Skip to content

Commit 84aab20

Browse files
committed
Add an emit result type to the group commit framework
Generalize the group-commit framework with a trailing R type parameter so an emit can return a result that is handed back to every slot in the emitted group and returned from GroupCommitter#ready. The Coordinator layer adopts it as Void for now (no behavioral change); a later change uses it to thread the commit-phase timestamp.
1 parent 3d6d58a commit 84aab20

22 files changed

Lines changed: 470 additions & 358 deletions

core/src/main/java/com/scalar/db/transaction/consensuscommit/CoordinatorCommitHandlerWithGroupCommit.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ void cancelGroupCommitIfNeeded(TransactionContext context) {
112112
}
113113

114114
@VisibleForTesting
115-
static class Emitter implements Emittable<String, String, TransactionContext> {
115+
static class Emitter implements Emittable<String, String, TransactionContext, Void> {
116116
private final Coordinator coordinator;
117117
private final WriteSetEncoder writeSetEncoder;
118118

@@ -122,10 +122,10 @@ static class Emitter implements Emittable<String, String, TransactionContext> {
122122
}
123123

124124
@Override
125-
public void emitNormalGroup(String parentId, List<TransactionContext> contexts)
125+
public Void emitNormalGroup(String parentId, List<TransactionContext> contexts)
126126
throws CoordinatorException {
127127
if (contexts.isEmpty()) {
128-
return;
128+
return null;
129129
}
130130
if (KEY_MANIPULATOR.isFullKey(parentId)) {
131131
throw new AssertionError(
@@ -146,10 +146,11 @@ public void emitNormalGroup(String parentId, List<TransactionContext> contexts)
146146
"Transaction {} (parent ID) is committed successfully at {}",
147147
parentId,
148148
System.currentTimeMillis());
149+
return null;
149150
}
150151

151152
@Override
152-
public void emitDelayedGroup(String fullId, TransactionContext context)
153+
public Void emitDelayedGroup(String fullId, TransactionContext context)
153154
throws CoordinatorException {
154155
coordinator.putState(
155156
new State(
@@ -159,6 +160,7 @@ public void emitDelayedGroup(String fullId, TransactionContext context)
159160
System.currentTimeMillis()));
160161
logger.debug(
161162
"Transaction {} is committed successfully at {}", fullId, System.currentTimeMillis());
163+
return null;
162164
}
163165
}
164166
}

core/src/main/java/com/scalar/db/transaction/consensuscommit/CoordinatorGroupCommitter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import java.util.Optional;
77

88
public class CoordinatorGroupCommitter
9-
extends GroupCommitter<String, String, String, String, String, TransactionContext> {
9+
extends GroupCommitter<String, String, String, String, String, TransactionContext, Void> {
1010
CoordinatorGroupCommitter(GroupCommitConfig config) {
1111
super("coordinator", config, new CoordinatorGroupCommitKeyManipulator());
1212
}

core/src/main/java/com/scalar/db/util/groupcommit/DelayedGroup.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@
77

88
// A group for a delayed slot. This group contains only a single slot.
99
@ThreadSafe
10-
class DelayedGroup<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V>
11-
extends Group<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> {
10+
class DelayedGroup<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V, R>
11+
extends Group<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V, R> {
1212
private final FULL_KEY fullKey;
1313

1414
DelayedGroup(
1515
GroupCommitConfig config,
1616
FULL_KEY fullKey,
17-
Emittable<EMIT_PARENT_KEY, EMIT_FULL_KEY, V> emitter,
17+
Emittable<EMIT_PARENT_KEY, EMIT_FULL_KEY, V, R> emitter,
1818
GroupCommitKeyManipulator<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY>
1919
keyManipulator) {
2020
super(emitter, keyManipulator, 1, config.oldGroupAbortTimeoutMillis());
@@ -35,7 +35,7 @@ FULL_KEY fullKey(CHILD_KEY childKey) {
3535
// slot. But just in case.
3636
protected synchronized void delegateEmitTaskToWaiter() {
3737
assert slots.size() == 1;
38-
for (Slot<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> slot :
38+
for (Slot<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V, R> slot :
3939
slots.values()) {
4040
// Pass `emitter` to ask the receiver's thread to emit the value
4141
slot.delegateTaskToWaiter(
@@ -50,7 +50,7 @@ protected synchronized void delegateEmitTaskToWaiter() {
5050
@Nullable
5151
@Override
5252
protected synchronized FULL_KEY reserveNewSlot(
53-
Slot<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> slot) {
53+
Slot<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V, R> slot) {
5454
slot.changeParentGroupToDelayedGroup(this);
5555
FULL_KEY fullKey = super.reserveNewSlot(slot);
5656
if (fullKey == null) {
@@ -64,7 +64,7 @@ protected synchronized FULL_KEY reserveNewSlot(
6464
public boolean equals(Object o) {
6565
if (this == o) return true;
6666
if (!(o instanceof DelayedGroup)) return false;
67-
DelayedGroup<?, ?, ?, ?, ?, ?> that = (DelayedGroup<?, ?, ?, ?, ?, ?>) o;
67+
DelayedGroup<?, ?, ?, ?, ?, ?, ?> that = (DelayedGroup<?, ?, ?, ?, ?, ?, ?>) o;
6868
return Objects.equal(fullKey, that.fullKey);
6969
}
7070

core/src/main/java/com/scalar/db/util/groupcommit/DelayedSlotMoveWorker.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,21 @@
88
// A worker manages NormalGroup instances to move delayed slots to a new DelayedGroup.
99
// Ready NormalGroup is passed to GroupCleanupWorker.
1010
@ThreadSafe
11-
class DelayedSlotMoveWorker<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V>
11+
class DelayedSlotMoveWorker<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V, R>
1212
extends BackgroundWorker<
13-
NormalGroup<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V>> {
14-
private final GroupManager<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V>
13+
NormalGroup<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V, R>> {
14+
private final GroupManager<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V, R>
1515
groupManager;
1616
private final GroupCleanupWorker<
17-
PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V>
17+
PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V, R>
1818
groupCleanupWorker;
1919

2020
DelayedSlotMoveWorker(
2121
String label,
2222
long queueCheckIntervalInMillis,
23-
GroupManager<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> groupManager,
24-
GroupCleanupWorker<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V>
23+
GroupManager<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V, R>
24+
groupManager,
25+
GroupCleanupWorker<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V, R>
2526
groupCleanupWorker) {
2627
super(
2728
label + "-group-commit-delayed-slot-move",
@@ -34,7 +35,7 @@ class DelayedSlotMoveWorker<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EM
3435
}
3536

3637
@Override
37-
BlockingQueue<NormalGroup<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V>>
38+
BlockingQueue<NormalGroup<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V, R>>
3839
createQueue() {
3940
// Use a priority queue to prioritize groups based on their timeout values, processing groups
4041
// with smaller timeout values first.
@@ -44,7 +45,8 @@ class DelayedSlotMoveWorker<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EM
4445

4546
@Override
4647
boolean processItem(
47-
NormalGroup<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> normalGroup) {
48+
NormalGroup<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V, R>
49+
normalGroup) {
4850
if (normalGroup.isReady()) {
4951
groupCleanupWorker.add(normalGroup);
5052
// Already ready. Should remove the item.
Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,22 @@
11
package com.scalar.db.util.groupcommit;
22

33
import java.util.List;
4+
import javax.annotation.Nullable;
45

56
/**
67
* An emittable interface to emit multiple values at once.
78
*
8-
* @param <FULL_KEY> A full-key type that Emitter can interpret.
99
* @param <PARENT_KEY> A parent-key type that Emitter can interpret.
10+
* @param <FULL_KEY> A full-key type that Emitter can interpret.
1011
* @param <V> A value type to be set to a slot.
12+
* @param <R> A result type returned by an emit. The same result is handed back to every slot that
13+
* belongs to the emitted group, so a client can obtain it from {@link
14+
* GroupCommitter#ready(Object, Object)}. The result may be null.
1115
*/
12-
public interface Emittable<PARENT_KEY, FULL_KEY, V> {
13-
void emitNormalGroup(PARENT_KEY parentKey, List<V> values) throws Exception;
16+
public interface Emittable<PARENT_KEY, FULL_KEY, V, R> {
17+
@Nullable
18+
R emitNormalGroup(PARENT_KEY parentKey, List<V> values) throws Exception;
1419

15-
void emitDelayedGroup(FULL_KEY fullKey, V value) throws Exception;
20+
@Nullable
21+
R emitDelayedGroup(FULL_KEY fullKey, V value) throws Exception;
1622
}

core/src/main/java/com/scalar/db/util/groupcommit/Group.java

Lines changed: 33 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,17 @@
1010

1111
// An abstract class that has logics and implementations to manage slots and trigger to emit it.
1212
@ThreadSafe
13-
abstract class Group<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> {
13+
abstract class Group<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V, R> {
1414
private static final Logger logger = LoggerFactory.getLogger(Group.class);
1515

16-
protected final Emittable<EMIT_PARENT_KEY, EMIT_FULL_KEY, V> emitter;
16+
protected final Emittable<EMIT_PARENT_KEY, EMIT_FULL_KEY, V, R> emitter;
1717
protected final GroupCommitKeyManipulator<
1818
PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY>
1919
keyManipulator;
2020
private final int capacity;
2121
private final AtomicReference<Integer> size = new AtomicReference<>();
2222
protected final Map<
23-
CHILD_KEY, Slot<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V>>
23+
CHILD_KEY, Slot<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V, R>>
2424
slots;
2525
// Whether to reject a new value slot.
2626
protected final AtomicReference<Status> status = new AtomicReference<>(Status.OPEN);
@@ -61,7 +61,7 @@ enum Status {
6161
}
6262

6363
Group(
64-
Emittable<EMIT_PARENT_KEY, EMIT_FULL_KEY, V> emitter,
64+
Emittable<EMIT_PARENT_KEY, EMIT_FULL_KEY, V, R> emitter,
6565
GroupCommitKeyManipulator<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY>
6666
keyManipulator,
6767
int capacity,
@@ -82,7 +82,7 @@ private boolean noMoreSlot() {
8282
// If it returns null, the Group is already size-fixed and a retry is needed.
8383
@Nullable
8484
protected synchronized FULL_KEY reserveNewSlot(
85-
Slot<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> slot) {
85+
Slot<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V, R> slot) {
8686
if (isSizeFixed()) {
8787
return null;
8888
}
@@ -94,8 +94,8 @@ protected synchronized FULL_KEY reserveNewSlot(
9494
}
9595

9696
private void reserveSlot(
97-
Slot<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> slot) {
98-
Slot<?, ?, ?, ?, ?, ?> oldSlot = slots.put(slot.key(), slot);
97+
Slot<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V, R> slot) {
98+
Slot<?, ?, ?, ?, ?, ?, ?> oldSlot = slots.put(slot.key(), slot);
9999
if (oldSlot != null) {
100100
throw new AssertionError(
101101
String.format("An old slot exist unexpectedly. Slot: %s, Old slot: %s", slot, oldSlot));
@@ -106,7 +106,7 @@ private void reserveSlot(
106106
// This sync is for moving timed-out value slot from a normal buf to a new delayed buf.
107107
// Returns null if the state of the group is changed (e.g., the slot is moved to another group).
108108
@Nullable
109-
private synchronized Slot<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V>
109+
private synchronized Slot<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V, R>
110110
putValueToSlot(CHILD_KEY childKey, V value) {
111111
if (isReady()) {
112112
logger.debug(
@@ -116,7 +116,7 @@ private void reserveSlot(
116116
return null;
117117
}
118118

119-
Slot<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> slot =
119+
Slot<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V, R> slot =
120120
slots.get(childKey);
121121
if (slot == null) {
122122
return null;
@@ -125,20 +125,34 @@ private void reserveSlot(
125125
return slot;
126126
}
127127

128-
boolean putValueToSlotAndWait(CHILD_KEY childKey, V value) throws GroupCommitException {
129-
Slot<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> slot;
128+
// Puts the value to the slot and waits until the group is emitted. Returns the emit result
129+
// wrapped in an EmitResult on success, or null when a retry is needed (the slot was moved to
130+
// another group). The result is wrapped so that a successful emit carrying a null result can be
131+
// distinguished from the retry signal.
132+
@Nullable
133+
EmitResult<R> putValueToSlotAndWait(CHILD_KEY childKey, V value) throws GroupCommitException {
134+
Slot<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V, R> slot;
130135
synchronized (this) {
131136
slot = putValueToSlot(childKey, value);
132137
if (slot == null) {
133138
// Needs a retry since the state of the group is changed.
134-
return false;
139+
return null;
135140
}
136141
updateStatus();
137142

138143
delegateEmitTaskToWaiterIfReady();
139144
}
140-
slot.waitUntilEmit();
141-
return true;
145+
return new EmitResult<>(slot.waitUntilEmit());
146+
}
147+
148+
// Wraps the (possibly null) emit result so the caller can distinguish a successful emit (non-null
149+
// holder, possibly carrying a null result) from a "retry needed" signal (null holder).
150+
static final class EmitResult<R> {
151+
@Nullable final R value;
152+
153+
EmitResult(@Nullable R value) {
154+
this.value = value;
155+
}
142156
}
143157

144158
private synchronized void updateSlotsSize() {
@@ -185,7 +199,7 @@ synchronized void updateStatus() {
185199

186200
if (newStatus == Status.SIZE_FIXED) {
187201
int readySlotCount = 0;
188-
for (Slot<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> slot :
202+
for (Slot<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V, R> slot :
189203
slots.values()) {
190204
if (slot.isReady()) {
191205
readySlotCount++;
@@ -198,7 +212,7 @@ synchronized void updateStatus() {
198212

199213
if (newStatus == Status.READY) {
200214
int doneSlotCount = 0;
201-
for (Slot<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> slot :
215+
for (Slot<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V, R> slot :
202216
slots.values()) {
203217
if (slot.isDone()) {
204218
doneSlotCount++;
@@ -212,7 +226,7 @@ synchronized void updateStatus() {
212226
}
213227

214228
synchronized boolean removeSlot(CHILD_KEY childKey) {
215-
Slot<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> slot =
229+
Slot<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V, R> slot =
216230
slots.get(childKey);
217231
if (slot == null) {
218232
// Probably, the slot is already removed by the client or moved from NormalGroup to
@@ -231,7 +245,7 @@ synchronized boolean removeSlot(CHILD_KEY childKey) {
231245
return false;
232246
}
233247

234-
Slot<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> removed =
248+
Slot<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V, R> removed =
235249
slots.remove(childKey);
236250
assert removed != null;
237251

@@ -245,7 +259,7 @@ synchronized boolean removeSlot(CHILD_KEY childKey) {
245259
}
246260

247261
synchronized void abort() {
248-
for (Slot<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> slot :
262+
for (Slot<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V, R> slot :
249263
slots.values()) {
250264
// Tell the clients that the slots are aborted.
251265
slot.markAsFailed(

core/src/main/java/com/scalar/db/util/groupcommit/GroupCleanupWorker.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,17 @@
88

99
// A worker manages Group instances to removes completed ones.
1010
@ThreadSafe
11-
class GroupCleanupWorker<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V>
11+
class GroupCleanupWorker<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V, R>
1212
extends BackgroundWorker<
13-
Group<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V>> {
13+
Group<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V, R>> {
1414
private static final Logger logger = LoggerFactory.getLogger(GroupCleanupWorker.class);
15-
private final GroupManager<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V>
15+
private final GroupManager<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V, R>
1616
groupManager;
1717

1818
GroupCleanupWorker(
1919
String label,
2020
long queueCheckIntervalInMillis,
21-
GroupManager<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V>
21+
GroupManager<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V, R>
2222
groupManager) {
2323
super(
2424
label + "-group-commit-group-cleanup",
@@ -29,7 +29,7 @@ class GroupCleanupWorker<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_
2929
}
3030

3131
@Override
32-
BlockingQueue<Group<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V>>
32+
BlockingQueue<Group<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V, R>>
3333
createQueue() {
3434
// Use a normal queue because:
3535
// - The timeout of the queued groups is large since it's for "just in case"
@@ -41,7 +41,7 @@ class GroupCleanupWorker<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_
4141

4242
@Override
4343
boolean processItem(
44-
Group<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V> group) {
44+
Group<PARENT_KEY, CHILD_KEY, FULL_KEY, EMIT_PARENT_KEY, EMIT_FULL_KEY, V, R> group) {
4545
if (group.oldGroupAbortTimeoutAtMillis() < System.currentTimeMillis()) {
4646
groupManager.removeGroupFromMap(group);
4747
group.abort();

0 commit comments

Comments
 (0)