Skip to content

Commit 3d6d58a

Browse files
committed
refactor(consensuscommit): decouple CoordinatorCommitHandler from ParticipantCommitHandler
CoordinatorCommitHandler held a ParticipantCommitHandler purely to roll back the transaction's prepared records when a commitState putState race resolved to ABORTED/absent. That made the Coordinator-side (state-table) handler depend on the participant-side (data-record) handler, which blocks reusing the two handlers independently (e.g. from callers other than CommitHandler). Move that responsibility to the orchestrator: handleCommitConflict now only reports the conflict (throws CommitConflictException, or returns normally when the state is already COMMITTED), and CommitHandler.commit() catches it and rolls back the records via its own ParticipantCommitHandler. No behavioral change. - Drop the ParticipantCommitHandler dependency from CoordinatorCommitHandler and CoordinatorCommitHandlerWithGroupCommit. - Order the two handlers Coordinator-before-Participant in CommitHandler's fields/constructors now that they are independent. - Drop the "one of the two specialized handlers that CommitHandler delegates to" framing from the handler Javadocs; describe each handler by its own responsibility so it reads correctly for non-CommitHandler callers.
1 parent 29e3cdd commit 3d6d58a

11 files changed

Lines changed: 158 additions & 96 deletions

core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@
4949
public class CommitHandler {
5050
private static final Logger logger = LoggerFactory.getLogger(CommitHandler.class);
5151

52-
private final ParticipantCommitHandler participantCommitHandler;
5352
private final CoordinatorCommitHandler coordinatorCommitHandler;
53+
private final ParticipantCommitHandler participantCommitHandler;
5454
protected final boolean coordinatorWriteOmissionOnReadOnlyEnabled;
5555

5656
@LazyInit @Nullable private BeforePreparationHook beforePreparationHook;
@@ -65,16 +65,15 @@ public CommitHandler(
6565
boolean coordinatorWriteOmissionOnReadOnlyEnabled,
6666
boolean onePhaseCommitEnabled) {
6767
this.coordinatorWriteOmissionOnReadOnlyEnabled = coordinatorWriteOmissionOnReadOnlyEnabled;
68+
this.coordinatorCommitHandler =
69+
new CoordinatorCommitHandler(coordinator, new WriteSetEncoder(tableMetadataManager));
6870
this.participantCommitHandler =
6971
new ParticipantCommitHandler(
7072
storage,
7173
tableMetadataManager,
7274
parallelExecutor,
7375
mutationsGrouper,
7476
onePhaseCommitEnabled);
75-
this.coordinatorCommitHandler =
76-
new CoordinatorCommitHandler(
77-
coordinator, new WriteSetEncoder(tableMetadataManager), participantCommitHandler);
7877
}
7978

8079
// Constructor for subclasses (CommitHandlerWithGroupCommit) that need to inject a
@@ -84,11 +83,11 @@ public CommitHandler(
8483
@SuppressFBWarnings("EI_EXPOSE_REP2")
8584
protected CommitHandler(
8685
boolean coordinatorWriteOmissionOnReadOnlyEnabled,
87-
ParticipantCommitHandler participantCommitHandler,
88-
CoordinatorCommitHandler coordinatorCommitHandler) {
86+
CoordinatorCommitHandler coordinatorCommitHandler,
87+
ParticipantCommitHandler participantCommitHandler) {
8988
this.coordinatorWriteOmissionOnReadOnlyEnabled = coordinatorWriteOmissionOnReadOnlyEnabled;
90-
this.participantCommitHandler = checkNotNull(participantCommitHandler);
9189
this.coordinatorCommitHandler = checkNotNull(coordinatorCommitHandler);
90+
this.participantCommitHandler = checkNotNull(participantCommitHandler);
9291
}
9392

9493
/**
@@ -208,7 +207,18 @@ public void commit(TransactionContext context)
208207
context, beforePreparationHookFuture.orElse(null), hasWritesOrDeletesInSnapshot);
209208

210209
if (hasWritesOrDeletesInSnapshot || !coordinatorWriteOmissionOnReadOnlyEnabled) {
211-
commitState(context);
210+
try {
211+
commitState(context);
212+
} catch (CommitConflictException e) {
213+
// The COMMITTED-state write lost a putState race that resolved to ABORTED (or absent): the
214+
// transaction is aborted. The Coordinator-side handler only reports the conflict; the
215+
// orchestrator owns the records, so roll back the prepared records here before surfacing
216+
// it.
217+
if (hasWritesOrDeletesInSnapshot) {
218+
rollbackRecords(context);
219+
}
220+
throw e;
221+
}
212222
}
213223
if (hasWritesOrDeletesInSnapshot) {
214224
commitRecords(context);

core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ public CommitHandlerWithGroupCommit(
4646
checkNotNull(groupCommitter));
4747
}
4848

49-
// Second-stage delegating constructor so the ParticipantCommitHandler is available when we build
50-
// the CoordinatorCommitHandlerWithGroupCommit.
49+
// Second-stage delegating constructor that builds the two specialized handlers before handing
50+
// them to the package-private constructor.
5151
private CommitHandlerWithGroupCommit(
5252
TransactionTableMetadataManager tableMetadataManager,
5353
boolean coordinatorWriteOmissionOnReadOnlyEnabled,
@@ -56,22 +56,19 @@ private CommitHandlerWithGroupCommit(
5656
CoordinatorGroupCommitter groupCommitter) {
5757
this(
5858
coordinatorWriteOmissionOnReadOnlyEnabled,
59-
participantCommitHandler,
6059
new CoordinatorCommitHandlerWithGroupCommit(
61-
coordinator,
62-
new WriteSetEncoder(tableMetadataManager),
63-
participantCommitHandler,
64-
groupCommitter));
60+
coordinator, new WriteSetEncoder(tableMetadataManager), groupCommitter),
61+
participantCommitHandler);
6562
}
6663

67-
// Package-private so test code can inject Mockito spies of ParticipantCommitHandler /
68-
// CoordinatorCommitHandlerWithGroupCommit via constructor injection rather than via reflection.
64+
// Package-private so test code can inject Mockito spies of the two specialized handlers via
65+
// constructor injection rather than via reflection.
6966
@SuppressFBWarnings("EI_EXPOSE_REP2")
7067
CommitHandlerWithGroupCommit(
7168
boolean coordinatorWriteOmissionOnReadOnlyEnabled,
72-
ParticipantCommitHandler participantCommitHandler,
73-
CoordinatorCommitHandlerWithGroupCommit coordinatorHandler) {
74-
super(coordinatorWriteOmissionOnReadOnlyEnabled, participantCommitHandler, coordinatorHandler);
69+
CoordinatorCommitHandlerWithGroupCommit coordinatorHandler,
70+
ParticipantCommitHandler participantCommitHandler) {
71+
super(coordinatorWriteOmissionOnReadOnlyEnabled, coordinatorHandler, participantCommitHandler);
7572
this.coordinatorHandler = coordinatorHandler;
7673
}
7774

core/src/main/java/com/scalar/db/transaction/consensuscommit/CoordinatorCommitHandler.java

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,32 +18,23 @@
1818
* Handles the Coordinator-side (state-table) operations of the Consensus Commit protocol: writing
1919
* the COMMITTED / ABORTED records to the Coordinator state table and resolving putState conflicts.
2020
*
21-
* <p>This is one of the two specialized handlers that {@link CommitHandler} delegates to. The other
22-
* is {@link ParticipantCommitHandler}, which owns participant-side data-record operations.
23-
*
24-
* <p>Methods here only touch the Coordinator state table via {@link Coordinator}. They never write
25-
* to user data tables — except indirectly: when {@code commitState} loses a putState race and the
26-
* persisted state turns out to be ABORTED (or absent), this handler asks the orchestrator-provided
27-
* {@link ParticipantCommitHandler} to roll back the records that the transaction had prepared. That
28-
* cross-handler call keeps the "commit conflict → rollback records" invariant atomic from the
29-
* orchestrator's perspective.
21+
* <p>Methods here only touch the Coordinator state table via {@link Coordinator}; they never touch
22+
* user data tables. When {@code commitState} loses a putState race and the persisted state turns
23+
* out to be ABORTED (or absent), this handler reports a {@link CommitConflictException} and leaves
24+
* the rollback of the transaction's prepared records to the caller. This keeps the handler free of
25+
* any participant-side dependency.
3026
*/
3127
@ThreadSafe
3228
class CoordinatorCommitHandler {
3329
private static final Logger logger = LoggerFactory.getLogger(CoordinatorCommitHandler.class);
3430

3531
private final Coordinator coordinator;
3632
private final WriteSetEncoder writeSetEncoder;
37-
private final ParticipantCommitHandler participantCommitHandler;
3833

3934
@SuppressFBWarnings("EI_EXPOSE_REP2")
40-
CoordinatorCommitHandler(
41-
Coordinator coordinator,
42-
WriteSetEncoder writeSetEncoder,
43-
ParticipantCommitHandler participantCommitHandler) {
35+
CoordinatorCommitHandler(Coordinator coordinator, WriteSetEncoder writeSetEncoder) {
4436
this.coordinator = checkNotNull(coordinator);
4537
this.writeSetEncoder = checkNotNull(writeSetEncoder);
46-
this.participantCommitHandler = checkNotNull(participantCommitHandler);
4738
}
4839

4940
/**
@@ -88,7 +79,6 @@ void handleCommitConflict(TransactionContext context, Exception cause)
8879
if (s.isPresent()) {
8980
TransactionState state = s.get().getState();
9081
if (state == TransactionState.ABORTED) {
91-
participantCommitHandler.rollbackRecords(context);
9282
throw new CommitConflictException(
9383
CoreError.CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHEN_COMMITTING_STATE.buildMessage(
9484
cause.getMessage()),
@@ -111,8 +101,9 @@ void handleCommitConflict(TransactionContext context, Exception cause)
111101
// The coordinator state is absent: a row existed when our putIfNotExists lost the race, but
112102
// it is gone now. In both interfaces this means the conflicting row was an ABORTED written
113103
// by a lazy recovery (which also rolled the records back) and later removed by the
114-
// Coordinator state cleanup process, so the transaction is definitively aborted. Roll the
115-
// records back and report a conflict -- the same outcome as the present-ABORTED case above.
104+
// Coordinator state cleanup process, so the transaction is definitively aborted. Report a
105+
// conflict (the orchestrator rolls the records back) -- the same outcome as the
106+
// present-ABORTED case above.
116107
//
117108
// A COMMITTED row can be ruled out here in both interfaces:
118109
// - One-phase Commit I/F: this commit is the only writer of this transaction's COMMITTED
@@ -139,7 +130,6 @@ void handleCommitConflict(TransactionContext context, Exception cause)
139130
//
140131
// TODO: revisit this if/when the Two-phase Commit I/F is removed.
141132

142-
participantCommitHandler.rollbackRecords(context);
143133
throw new CommitConflictException(
144134
CoreError.CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHEN_COMMITTING_STATE.buildMessage(
145135
cause.getMessage()),

core/src/main/java/com/scalar/db/transaction/consensuscommit/CoordinatorCommitHandlerWithGroupCommit.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,6 @@
2222
* Group-commit-aware variant of {@link CoordinatorCommitHandler}. Routes {@code commitState} into
2323
* the group-commit emitter so that multiple transactions' COMMITTED states can be batched into one
2424
* Coordinator-table row. Cancels the group commit slot reservation on the abort path.
25-
*
26-
* <p>Used by {@link CommitHandlerWithGroupCommit} (the orchestrator variant) in place of the base
27-
* {@link CoordinatorCommitHandler} when group commit is enabled.
2825
*/
2926
@ThreadSafe
3027
class CoordinatorCommitHandlerWithGroupCommit extends CoordinatorCommitHandler {
@@ -39,9 +36,8 @@ class CoordinatorCommitHandlerWithGroupCommit extends CoordinatorCommitHandler {
3936
CoordinatorCommitHandlerWithGroupCommit(
4037
Coordinator coordinator,
4138
WriteSetEncoder writeSetEncoder,
42-
ParticipantCommitHandler participantCommitHandler,
4339
CoordinatorGroupCommitter groupCommitter) {
44-
super(coordinator, writeSetEncoder, participantCommitHandler);
40+
super(coordinator, writeSetEncoder);
4541
checkNotNull(groupCommitter);
4642
// The methods of this emitter will be called via GroupCommitter.ready().
4743
groupCommitter.setEmitter(new Emitter(coordinator, writeSetEncoder));

core/src/main/java/com/scalar/db/transaction/consensuscommit/ParticipantCommitHandler.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,6 @@
3333
* preparing, validating, committing, and rolling back the records the transaction touches in user
3434
* data tables.
3535
*
36-
* <p>This is one of the two specialized handlers that {@link CommitHandler} delegates to. The other
37-
* is {@link CoordinatorCommitHandler}, which owns Coordinator-table state writes.
38-
*
3936
* <p>Methods here only touch user data tables via {@link DistributedStorage} and the {@link
4037
* MutationsGrouper} / {@link ParallelExecutor} stack. They never write to the Coordinator table.
4138
*/

core/src/main/java/com/scalar/db/transaction/consensuscommit/TwoPhaseConsensusCommit.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -234,8 +234,13 @@ public void commit() throws CommitConflictException, UnknownTransactionStatusExc
234234

235235
try {
236236
commit.commitStateWithoutWriteSet(context);
237-
} catch (CommitConflictException | UnknownTransactionStatusException e) {
238-
// no need to rollback because the transaction has already been rolled back
237+
} catch (CommitConflictException e) {
238+
commit.rollbackRecords(context);
239+
needRollback = false;
240+
throw e;
241+
} catch (UnknownTransactionStatusException e) {
242+
// The coordinator state is unknown, so we must not roll back the records: the transaction may
243+
// have committed.
239244
needRollback = false;
240245

241246
throw e;

core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ protected void extraCleanup() {}
6868
protected CommitHandler createCommitHandler(boolean coordinatorWriteOmissionOnReadOnlyEnabled) {
6969
return new CommitHandler(
7070
coordinatorWriteOmissionOnReadOnlyEnabled,
71-
participantCommitHandler,
72-
coordinatorCommitHandler);
71+
coordinatorCommitHandler,
72+
participantCommitHandler);
7373
}
7474

7575
@BeforeEach
@@ -306,6 +306,29 @@ public void commit_ValidateThrowsGenericException_WithWrites_ShouldDelegateAbort
306306
verify(participantCommitHandler).rollbackRecords(context);
307307
}
308308

309+
@Test
310+
public void commit_CommitStateThrowsConflict_WithWrites_ShouldRollbackRecordsAndThrowConflict()
311+
throws Exception {
312+
// The Coordinator-side handler only reports the commit-state putState conflict; the
313+
// orchestrator
314+
// owns the records, so it rolls them back here before surfacing the conflict.
315+
316+
// Arrange
317+
Snapshot snapshot = snapshotWithWrites();
318+
doThrow(new CommitConflictException("conflict", anyId()))
319+
.when(coordinatorCommitHandler)
320+
.commitState(any());
321+
TransactionContext context =
322+
createTransactionContext(anyId(), snapshot, Isolation.SNAPSHOT, false, false);
323+
324+
// Act Assert
325+
assertThatThrownBy(() -> handler.commit(context)).isInstanceOf(CommitConflictException.class);
326+
327+
verify(coordinatorCommitHandler).commitState(context);
328+
verify(participantCommitHandler).rollbackRecords(context);
329+
verify(participantCommitHandler, never()).commitRecords(any());
330+
}
331+
309332
@Test
310333
public void commit_ValidateThrows_NoWrites_OmissionEnabled_ShouldNotDelegateAbortNorRollback()
311334
throws Exception {

core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommitTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ protected CommitHandler createCommitHandler(boolean coordinatorWriteOmissionOnRe
4646
coordinatorCommitHandler = groupCommitCoordinatorHandler;
4747
return new CommitHandlerWithGroupCommit(
4848
coordinatorWriteOmissionOnReadOnlyEnabled,
49-
participantCommitHandler,
50-
groupCommitCoordinatorHandler);
49+
groupCommitCoordinatorHandler,
50+
participantCommitHandler);
5151
}
5252

5353
// =========================================================================

core/src/test/java/com/scalar/db/transaction/consensuscommit/CoordinatorCommitHandlerTest.java

Lines changed: 21 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ class CoordinatorCommitHandlerTest {
4040

4141
@Mock private Coordinator coordinator;
4242
@Mock private TransactionTableMetadataManager tableMetadataManager;
43-
@Mock private ParticipantCommitHandler participantCommitHandler;
4443
@Mock private ConsensusCommitConfig config;
4544

4645
private ParallelExecutor parallelExecutor;
@@ -63,8 +62,7 @@ void tearDown() {
6362
}
6463

6564
private CoordinatorCommitHandler createHandler() {
66-
return new CoordinatorCommitHandler(
67-
coordinator, new WriteSetEncoder(tableMetadataManager), participantCommitHandler);
65+
return new CoordinatorCommitHandler(coordinator, new WriteSetEncoder(tableMetadataManager));
6866
}
6967

7068
private Put preparePut1() {
@@ -127,9 +125,11 @@ void commitState_WhenSuccessful_ShouldPutCommittedState()
127125
}
128126

129127
@Test
130-
void
131-
commitState_WhenCoordinatorConflictAndAbortedReturnedInGetState_ShouldRollbackRecordsAndThrowConflict()
132-
throws Exception {
128+
void commitState_WhenCoordinatorConflictAndAbortedReturnedInGetState_ShouldThrowConflict()
129+
throws Exception {
130+
// Record rollback is the orchestrator's responsibility (see CommitHandlerTest); here we only
131+
// assert that the conflict is surfaced.
132+
133133
// Arrange
134134
Snapshot snapshot = prepareSnapshotWithWrite();
135135
TransactionContext context = createTransactionContext(snapshot);
@@ -149,8 +149,6 @@ void commitState_WhenSuccessful_ShouldPutCommittedState()
149149
// Act Assert
150150
assertThatThrownBy(() -> handler.commitState(context))
151151
.isInstanceOf(CommitConflictException.class);
152-
153-
verify(participantCommitHandler).rollbackRecords(context);
154152
}
155153

156154
@Test
@@ -175,17 +173,16 @@ void commitState_WhenCoordinatorConflictAndCommittedReturnedInGetState_ShouldRet
175173
.when(coordinator)
176174
.getState(anyId());
177175

178-
// Act (must not throw)
176+
// Act Assert (must not throw — the transaction is already committed)
179177
handler.commitState(context);
180-
181-
// Assert
182-
verify(participantCommitHandler, never()).rollbackRecords(any());
183178
}
184179

185180
@Test
186-
void
187-
commitState_WhenCoordinatorConflictAndNoStatePersisted_ShouldRollbackRecordsAndThrowConflict()
188-
throws Exception {
181+
void commitState_WhenCoordinatorConflictAndNoStatePersisted_ShouldThrowConflict()
182+
throws Exception {
183+
// Record rollback is the orchestrator's responsibility (see CommitHandlerTest); here we only
184+
// assert that the conflict is surfaced.
185+
189186
// Arrange
190187
Snapshot snapshot = prepareSnapshotWithWrite();
191188
TransactionContext context = createTransactionContext(snapshot);
@@ -200,8 +197,6 @@ void commitState_WhenCoordinatorConflictAndCommittedReturnedInGetState_ShouldRet
200197
// Act Assert
201198
assertThatThrownBy(() -> handler.commitState(context))
202199
.isInstanceOf(CommitConflictException.class);
203-
204-
verify(participantCommitHandler).rollbackRecords(context);
205200
}
206201

207202
@Test
@@ -435,8 +430,10 @@ void forceAbortState_WhenCoordinatorExceptionThrown_ShouldThrowUnknown()
435430
// ---------- handleCommitConflict ----------
436431

437432
@Test
438-
void handleCommitConflict_WhenAbortedStatePersisted_ShouldRollbackRecordsAndThrowConflict()
439-
throws Exception {
433+
void handleCommitConflict_WhenAbortedStatePersisted_ShouldThrowConflict() throws Exception {
434+
// Record rollback is the orchestrator's responsibility (see CommitHandlerTest); here we only
435+
// assert that the conflict is surfaced.
436+
440437
// Arrange
441438
Snapshot snapshot = prepareSnapshotWithWrite();
442439
TransactionContext context = createTransactionContext(snapshot);
@@ -452,7 +449,6 @@ void handleCommitConflict_WhenAbortedStatePersisted_ShouldRollbackRecordsAndThro
452449
assertThatThrownBy(() -> handler.handleCommitConflict(context, cause))
453450
.isInstanceOf(CommitConflictException.class)
454451
.hasCause(cause);
455-
verify(participantCommitHandler).rollbackRecords(context);
456452
}
457453

458454
@Test
@@ -467,16 +463,15 @@ void handleCommitConflict_WhenCommittedStatePersisted_ShouldReturnNormally() thr
467463
.when(coordinator)
468464
.getState(anyId());
469465

470-
// Act (must not throw)
466+
// Act Assert (must not throw)
471467
handler.handleCommitConflict(context, new RuntimeException("conflict"));
472-
473-
// Assert
474-
verify(participantCommitHandler, never()).rollbackRecords(any());
475468
}
476469

477470
@Test
478-
void handleCommitConflict_WhenNoStatePersisted_ShouldRollbackRecordsAndThrowConflict()
479-
throws Exception {
471+
void handleCommitConflict_WhenNoStatePersisted_ShouldThrowConflict() throws Exception {
472+
// Record rollback is the orchestrator's responsibility (see CommitHandlerTest); here we only
473+
// assert that the conflict is surfaced.
474+
480475
// Arrange
481476
Snapshot snapshot = prepareSnapshotWithWrite();
482477
TransactionContext context = createTransactionContext(snapshot);
@@ -487,7 +482,6 @@ void handleCommitConflict_WhenNoStatePersisted_ShouldRollbackRecordsAndThrowConf
487482
assertThatThrownBy(() -> handler.handleCommitConflict(context, cause))
488483
.isInstanceOf(CommitConflictException.class)
489484
.hasCause(cause);
490-
verify(participantCommitHandler).rollbackRecords(context);
491485
}
492486

493487
@Test

0 commit comments

Comments
 (0)