Skip to content

Commit fe31825

Browse files
committed
refactor(consensuscommit): make CoordinatorCommitHandler context-independent
Decouple the Coordinator-side commit handler from TransactionContext and WriteSetEncoder. CoordinatorCommitHandler now operates on a transaction id plus a pre-encoded WriteSet, so the orchestrator (CommitHandler) owns the WriteSetEncoder and supplies the encoded write set and committedAt. For group commit, the Emittable value becomes a (fullId, WriteSet) carrier (CoordinatorGroupCommitValue) so the Emitter is encoder-free: it derives each child id from the carried full id and stamps the pre-encoded write set. The WriteSetEncoder therefore lives only in the orchestrator layer, and WriteSetEncoder#encodeMultiGroupWriteSet and its key-manipulator dependency are dropped. The group-commit entry point is named groupCommitState (distinct from the base commitState, whose signature differs) so a caller cannot silently bind to the base direct-putState overload, and slot cancellation is id-based (cancelGroupCommit). Tests are reworked to the id-based signatures, with commitState delegation routed through protected hooks so the group-commit subclass asserts the 2-arg groupCommitState (committedAt determined at emit time) while the base asserts the 3-arg orchestrator-provided variant.
1 parent 2ebe241 commit fe31825

12 files changed

Lines changed: 552 additions & 610 deletions

core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,18 +39,20 @@
3939
* (ConsensusCommit / TwoPhaseConsensusCommit / ConsensusCommitManager) can depend on the
4040
* orchestrator alone.
4141
*
42-
* <p>Public methods like {@code commitState}, {@code abortState}, {@code prepareRecords}, {@code
43-
* commitRecords}, {@code rollbackRecords}, {@code commitStateWithoutWriteSet}, and {@code
44-
* abortStateWithoutWriteSet} are exposed on this class as thin pass-throughs to the specialized
45-
* handlers so direct callers (notably {@link TwoPhaseConsensusCommit}) can drive individual commit
46-
* phases without depending on the handlers directly.
42+
* <p>Several public methods are exposed on this class so direct callers (notably {@link
43+
* TwoPhaseConsensusCommit}) can drive individual commit phases without depending on the specialized
44+
* handlers directly. {@code prepareRecords}, {@code commitRecords}, {@code rollbackRecords}, {@code
45+
* commitStateWithoutWriteSet}, and {@code abortStateWithoutWriteSet} are thin pass-throughs. {@code
46+
* commitState} and {@code abortState} are not: they encode the transaction's write set via the
47+
* orchestrator-owned {@link WriteSetEncoder} before delegating to the Coordinator-side handler.
4748
*/
4849
@ThreadSafe
4950
public class CommitHandler {
5051
private static final Logger logger = LoggerFactory.getLogger(CommitHandler.class);
5152

5253
private final CoordinatorCommitHandler coordinatorCommitHandler;
5354
private final ParticipantCommitHandler participantCommitHandler;
55+
protected final WriteSetEncoder writeSetEncoder;
5456
protected final boolean coordinatorWriteOmissionOnReadOnlyEnabled;
5557

5658
@LazyInit @Nullable private BeforePreparationHook beforePreparationHook;
@@ -65,8 +67,8 @@ public CommitHandler(
6567
boolean coordinatorWriteOmissionOnReadOnlyEnabled,
6668
boolean onePhaseCommitEnabled) {
6769
this.coordinatorWriteOmissionOnReadOnlyEnabled = coordinatorWriteOmissionOnReadOnlyEnabled;
68-
this.coordinatorCommitHandler =
69-
new CoordinatorCommitHandler(coordinator, new WriteSetEncoder(tableMetadataManager));
70+
this.writeSetEncoder = new WriteSetEncoder(tableMetadataManager);
71+
this.coordinatorCommitHandler = new CoordinatorCommitHandler(coordinator);
7072
this.participantCommitHandler =
7173
new ParticipantCommitHandler(
7274
storage,
@@ -83,9 +85,11 @@ public CommitHandler(
8385
@SuppressFBWarnings("EI_EXPOSE_REP2")
8486
protected CommitHandler(
8587
boolean coordinatorWriteOmissionOnReadOnlyEnabled,
88+
WriteSetEncoder writeSetEncoder,
8689
CoordinatorCommitHandler coordinatorCommitHandler,
8790
ParticipantCommitHandler participantCommitHandler) {
8891
this.coordinatorWriteOmissionOnReadOnlyEnabled = coordinatorWriteOmissionOnReadOnlyEnabled;
92+
this.writeSetEncoder = checkNotNull(writeSetEncoder);
8993
this.coordinatorCommitHandler = checkNotNull(coordinatorCommitHandler);
9094
this.participantCommitHandler = checkNotNull(participantCommitHandler);
9195
}
@@ -292,22 +296,34 @@ void onePhaseCommitRecords(TransactionContext context)
292296

293297
public long commitState(TransactionContext context)
294298
throws CommitConflictException, UnknownTransactionStatusException {
295-
return coordinatorCommitHandler.commitState(context);
299+
return coordinatorCommitHandler.commitState(
300+
context.transactionId, writeSetEncoder.encodeSingleGroupWriteSet(context, false));
296301
}
297302

303+
// 2PC-only. Delegates to commitState(id, null) / abortState(id, null), which the group-commit
304+
// coordinator handler overrides to route through the group committer. That null never reaches the
305+
// group committer here because 2PC forbids group commit
306+
// (TwoPhaseConsensusCommitManager#throwIfGroupCommitIsEnabled) and never builds a
307+
// CommitHandlerWithGroupCommit.
308+
//
309+
// TODO: revisit this if/when the Two-phase Commit I/F is removed.
298310
public long commitStateWithoutWriteSet(TransactionContext context)
299311
throws CommitConflictException, UnknownTransactionStatusException {
300-
return coordinatorCommitHandler.commitStateWithoutWriteSet(context);
312+
return coordinatorCommitHandler.commitState(context.transactionId, null);
301313
}
302314

303315
public TransactionState abortState(TransactionContext context)
304316
throws UnknownTransactionStatusException {
305-
return coordinatorCommitHandler.abortState(context);
317+
return coordinatorCommitHandler.abortState(
318+
context.transactionId, writeSetEncoder.encodeSingleGroupWriteSet(context, false));
306319
}
307320

321+
// 2PC-only; see the null-write-set note on commitStateWithoutWriteSet above.
322+
//
323+
// TODO: revisit this if/when the Two-phase Commit I/F is removed.
308324
public TransactionState abortStateWithoutWriteSet(String id)
309325
throws UnknownTransactionStatusException {
310-
return coordinatorCommitHandler.abortStateWithoutWriteSet(id);
326+
return coordinatorCommitHandler.abortState(id, null);
311327
}
312328

313329
public TransactionState forceAbortState(String id) throws UnknownTransactionStatusException {

core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java

Lines changed: 47 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import static com.google.common.base.Preconditions.checkNotNull;
44

55
import com.scalar.db.api.DistributedStorage;
6+
import com.scalar.db.api.TransactionState;
67
import com.scalar.db.exception.transaction.CommitConflictException;
78
import com.scalar.db.exception.transaction.CommitException;
89
import com.scalar.db.exception.transaction.UnknownTransactionStatusException;
@@ -56,8 +57,8 @@ private CommitHandlerWithGroupCommit(
5657
CoordinatorGroupCommitter groupCommitter) {
5758
this(
5859
coordinatorWriteOmissionOnReadOnlyEnabled,
59-
new CoordinatorCommitHandlerWithGroupCommit(
60-
coordinator, new WriteSetEncoder(tableMetadataManager), groupCommitter),
60+
new WriteSetEncoder(tableMetadataManager),
61+
new CoordinatorCommitHandlerWithGroupCommit(coordinator, groupCommitter),
6162
participantCommitHandler);
6263
}
6364

@@ -66,9 +67,14 @@ coordinator, new WriteSetEncoder(tableMetadataManager), groupCommitter),
6667
@SuppressFBWarnings("EI_EXPOSE_REP2")
6768
CommitHandlerWithGroupCommit(
6869
boolean coordinatorWriteOmissionOnReadOnlyEnabled,
70+
WriteSetEncoder writeSetEncoder,
6971
CoordinatorCommitHandlerWithGroupCommit coordinatorHandler,
7072
ParticipantCommitHandler participantCommitHandler) {
71-
super(coordinatorWriteOmissionOnReadOnlyEnabled, coordinatorHandler, participantCommitHandler);
73+
super(
74+
coordinatorWriteOmissionOnReadOnlyEnabled,
75+
writeSetEncoder,
76+
coordinatorHandler,
77+
participantCommitHandler);
7278
this.coordinatorHandler = coordinatorHandler;
7379
}
7480

@@ -81,45 +87,68 @@ public void commit(TransactionContext context)
8187

8288
/**
8389
* Releases the group-commit slot reservation when the orchestrator will not write a Coordinator
84-
* state row through the normal 2-phase commit path. That happens when the transaction was begun
85-
* as non-read-only (and therefore reserved a slot) but turned out to have no writes/deletes,
86-
* combined with coordinator-write-omission being enabled — in which case {@link
87-
* CommitHandler#commit} skips {@code commitState} entirely and the reserved slot would otherwise
88-
* sit unused for the rest of the commit flow.
89-
*
90-
* <p>The other places that call {@link
91-
* CoordinatorCommitHandlerWithGroupCommit#cancelGroupCommitIfNeeded} (one-phase commit,
92-
* abort/conflict paths, before-commit failure cleanup) cannot share this predicate: at those
93-
* sites the transaction has writes/deletes, so the predicate would block the cancellation that
94-
* those paths actually need.
90+
* state row through the normal 2-phase commit path: a non-read-only transaction (which reserved a
91+
* slot) that turns out to have no writes/deletes while coordinator-write-omission is enabled, so
92+
* {@link CommitHandler#commit} skips {@code commitState} and the slot would otherwise sit unused.
9593
*/
9694
private void cancelGroupCommitIfCoordinatorStateOmitted(TransactionContext context) {
9795
if (!context.readOnly
9896
&& !context.snapshot.hasWritesOrDeletes()
9997
&& coordinatorWriteOmissionOnReadOnlyEnabled) {
100-
coordinatorHandler.cancelGroupCommitIfNeeded(context);
98+
cancelGroupCommitIfSlotReserved(context);
10199
}
102100
}
103101

102+
private void cancelGroupCommitIfSlotReserved(TransactionContext context) {
103+
// A transaction only holds a group commit slot when one was reserved at begin time (e.g., a
104+
// read-only transaction does not reserve a slot when coordinator write omission is enabled).
105+
// When no slot was reserved there is nothing to release.
106+
if (!context.groupCommitSlotReserved) {
107+
return;
108+
}
109+
110+
// FIXME: When a slot was reserved (so the early return above did not fire), this can run more
111+
// than once on a single failure path, because several cleanup callbacks each release the slot:
112+
// - onFailureBeforeCommit() runs on the pre-commit-state failure paths (via
113+
// safelyCallOnFailureBeforeCommit);
114+
// - abortState() additionally runs from abortStateAndRollbackRecordsIfNeeded() when the
115+
// transaction has writes/deletes, or coordinator write omission on read-only is disabled;
116+
// - onePhaseCommitRecords() pre-cancels the slot before its body, so a one-phase-commit
117+
// failure followed by onFailureBeforeCommit() also cancels twice.
118+
// It is currently safe only because groupCommitter.remove() is idempotent. The cleanup paths
119+
// should be reworked so the slot is released exactly once instead of relying on that
120+
// idempotency.
121+
coordinatorHandler.cancelGroupCommit(context.transactionId);
122+
}
123+
104124
@Override
105125
boolean canOnePhaseCommit(TransactionContext context) throws CommitException {
106126
try {
107127
return super.canOnePhaseCommit(context);
108128
} catch (CommitException e) {
109-
coordinatorHandler.cancelGroupCommitIfNeeded(context);
129+
cancelGroupCommitIfSlotReserved(context);
110130
throw e;
111131
}
112132
}
113133

114134
@Override
115135
void onePhaseCommitRecords(TransactionContext context)
116136
throws CommitConflictException, UnknownTransactionStatusException {
117-
coordinatorHandler.cancelGroupCommitIfNeeded(context);
137+
cancelGroupCommitIfSlotReserved(context);
118138
super.onePhaseCommitRecords(context);
119139
}
120140

121141
@Override
122142
protected void onFailureBeforeCommit(TransactionContext context) {
123-
coordinatorHandler.cancelGroupCommitIfNeeded(context);
143+
cancelGroupCommitIfSlotReserved(context);
144+
}
145+
146+
@Override
147+
public TransactionState abortState(TransactionContext context)
148+
throws UnknownTransactionStatusException {
149+
// Release the group-commit slot before writing the ABORTED state so it does not sit in the
150+
// group buffer waiting for a sibling.
151+
cancelGroupCommitIfSlotReserved(context);
152+
return super.abortState(context);
124153
}
125154
}

core/src/main/java/com/scalar/db/transaction/consensuscommit/CoordinatorCommitHandler.java

Lines changed: 25 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -18,51 +18,35 @@
1818
* Handles the Coordinator-side (state-table) operations of the Consensus Commit protocol: writing
1919
* the COMMITTED / ABORTED records to the Coordinator state table and resolving putState conflicts.
2020
*
21-
* <p>Methods here only touch the Coordinator state table via {@link Coordinator}; they never touch
22-
* user data tables. When {@code commitState} loses a putState race and the persisted state turns
23-
* out to be ABORTED (or absent), this handler reports a {@link CommitConflictException} and leaves
24-
* the rollback of the transaction's prepared records to the caller. This keeps the handler free of
25-
* any participant-side dependency.
21+
* <p>Operates on a transaction id and a pre-encoded {@link WriteSet} (plus, for the commit path,
22+
* the {@code committedAt} to stamp); callers encode the write set before calling.
23+
*
24+
* <p>When {@code commitState} loses a putState race and the persisted state turns out to be ABORTED
25+
* (or absent), this handler reports a {@link CommitConflictException} and leaves the rollback of
26+
* the transaction's prepared records to the caller.
2627
*/
2728
@ThreadSafe
2829
class CoordinatorCommitHandler {
2930
private static final Logger logger = LoggerFactory.getLogger(CoordinatorCommitHandler.class);
3031

3132
private final Coordinator coordinator;
32-
private final WriteSetEncoder writeSetEncoder;
3333

3434
@SuppressFBWarnings("EI_EXPOSE_REP2")
35-
CoordinatorCommitHandler(Coordinator coordinator, WriteSetEncoder writeSetEncoder) {
35+
CoordinatorCommitHandler(Coordinator coordinator) {
3636
this.coordinator = checkNotNull(coordinator);
37-
this.writeSetEncoder = checkNotNull(writeSetEncoder);
38-
}
39-
40-
/**
41-
* Writes the COMMITTED state with the {@code tx_write_set} populated from the given context's
42-
* snapshot.
43-
*
44-
* @return the {@code committedAt} timestamp written to the COMMITTED Coordinator state row. The
45-
* caller stamps the committed data records with the same value so the row and the records
46-
* share one timestamp.
47-
*/
48-
long commitState(TransactionContext context)
49-
throws CommitConflictException, UnknownTransactionStatusException {
50-
return commitStateInternal(context, writeSetEncoder.encodeSingleGroupWriteSet(context, false));
5137
}
5238

5339
/**
54-
* Writes the COMMITTED state without persisting a {@code tx_write_set}.
40+
* Writes the COMMITTED state for the transaction identified by {@code id}, persisting the given
41+
* pre-encoded {@code writeSet} (or none when {@code null}).
5542
*
56-
* @return the {@code committedAt} timestamp written to the COMMITTED Coordinator state row.
43+
* @return the {@code committedAt} stamped on the COMMITTED Coordinator state row — the value this
44+
* call generates, or, when this commit lost a putState race to an already-COMMITTED row, that
45+
* row's committedAt. The caller stamps the committed data records with the returned value so
46+
* the row and the records share one timestamp.
5747
*/
58-
long commitStateWithoutWriteSet(TransactionContext context)
48+
long commitState(String id, @Nullable WriteSet writeSet)
5949
throws CommitConflictException, UnknownTransactionStatusException {
60-
return commitStateInternal(context, null);
61-
}
62-
63-
private long commitStateInternal(TransactionContext context, @Nullable WriteSet writeSet)
64-
throws CommitConflictException, UnknownTransactionStatusException {
65-
String id = context.transactionId;
6650
try {
6751
long committedAt = System.currentTimeMillis();
6852
Coordinator.State state =
@@ -71,7 +55,7 @@ private long commitStateInternal(TransactionContext context, @Nullable WriteSet
7155
logger.debug("Transaction {} is committed successfully at {}", id, committedAt);
7256
return committedAt;
7357
} catch (CoordinatorConflictException e) {
74-
return handleCommitConflict(context, e);
58+
return handleCommitConflict(id, e);
7559
} catch (CoordinatorException e) {
7660
throw new UnknownTransactionStatusException(
7761
CoreError.CONSENSUS_COMMIT_UNKNOWN_COORDINATOR_STATUS.buildMessage(e.getMessage()),
@@ -82,21 +66,21 @@ private long commitStateInternal(TransactionContext context, @Nullable WriteSet
8266

8367
/**
8468
* Resolves a putState conflict. Returns the {@code committedAt} of the already-persisted
85-
* COMMITTED state when this transaction turns out to be already committed; otherwise rolls back
86-
* the records and throws.
69+
* COMMITTED state when this transaction turns out to be already committed; otherwise reports the
70+
* conflict (the caller rolls the records back) by throwing {@link CommitConflictException}.
8771
*/
88-
long handleCommitConflict(TransactionContext context, Exception cause)
72+
long handleCommitConflict(String id, Exception cause)
8973
throws CommitConflictException, UnknownTransactionStatusException {
9074
try {
91-
Optional<Coordinator.State> s = coordinator.getState(context.transactionId);
75+
Optional<Coordinator.State> s = coordinator.getState(id);
9276
if (s.isPresent()) {
9377
Coordinator.State persisted = s.get();
9478
if (persisted.getState() == TransactionState.ABORTED) {
9579
throw new CommitConflictException(
9680
CoreError.CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHEN_COMMITTING_STATE.buildMessage(
9781
cause.getMessage()),
9882
cause,
99-
context.transactionId);
83+
id);
10084
}
10185
// Otherwise the coordinator state is present and COMMITTED, which means this transaction
10286
// has already committed. Only Two-phase Commit I/F reaches this branch: there the same
@@ -150,31 +134,21 @@ long handleCommitConflict(TransactionContext context, Exception cause)
150134
CoreError.CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHEN_COMMITTING_STATE.buildMessage(
151135
cause.getMessage()),
152136
cause,
153-
context.transactionId);
137+
id);
154138
}
155139
} catch (CoordinatorException ex) {
156140
throw new UnknownTransactionStatusException(
157141
CoreError.CONSENSUS_COMMIT_CANNOT_GET_COORDINATOR_STATUS.buildMessage(ex.getMessage()),
158142
ex,
159-
context.transactionId);
143+
id);
160144
}
161145
}
162146

163147
/**
164-
* Writes the ABORTED state with the {@code tx_write_set} populated from the given context's
165-
* snapshot.
148+
* Writes the ABORTED state for the transaction identified by {@code id}, persisting the given
149+
* pre-encoded {@code writeSet} (or none when {@code null}).
166150
*/
167-
TransactionState abortState(TransactionContext context) throws UnknownTransactionStatusException {
168-
return abortStateInternal(
169-
context.transactionId, writeSetEncoder.encodeSingleGroupWriteSet(context, false));
170-
}
171-
172-
/** Writes the ABORTED state without persisting a {@code tx_write_set} via a single putState. */
173-
TransactionState abortStateWithoutWriteSet(String id) throws UnknownTransactionStatusException {
174-
return abortStateInternal(id, null);
175-
}
176-
177-
private TransactionState abortStateInternal(String id, @Nullable WriteSet writeSet)
151+
TransactionState abortState(String id, @Nullable WriteSet writeSet)
178152
throws UnknownTransactionStatusException {
179153
try {
180154
Coordinator.State state =

0 commit comments

Comments
 (0)