Skip to content

Commit 4560e32

Browse files
Backport to branch(3) : Decouple CoordinatorCommitHandler from TransactionContext and WriteSetEncoder (#3675)
Co-authored-by: Toshihiro Suzuki <brfrn169@gmail.com>
1 parent b7d0302 commit 4560e32

12 files changed

Lines changed: 621 additions & 684 deletions

core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import com.scalar.db.exception.transaction.UnknownTransactionStatusException;
1414
import com.scalar.db.exception.transaction.ValidationConflictException;
1515
import com.scalar.db.exception.transaction.ValidationException;
16+
import com.scalar.db.transaction.consensuscommit.proto.v1.WriteSet;
1617
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
1718
import java.util.Optional;
1819
import java.util.concurrent.Future;
@@ -39,19 +40,22 @@
3940
* (ConsensusCommit / TwoPhaseConsensusCommit / ConsensusCommitManager) can depend on the
4041
* orchestrator alone.
4142
*
42-
* <p>Public methods like {@code commitState}, {@code abortState}, {@code prepareRecords}, {@code
43-
* commitRecords}, {@code rollbackRecords}, {@code commitStateWithoutWriteSet}, and {@code
44-
* abortStateWithoutWriteSet} are exposed on this class as thin pass-throughs to the specialized
45-
* handlers so direct callers (notably {@link TwoPhaseConsensusCommit}) can drive individual commit
46-
* phases without depending on the handlers directly.
43+
* <p>Several public methods are exposed on this class so direct callers (notably {@link
44+
* TwoPhaseConsensusCommit}) can drive individual commit phases without depending on the specialized
45+
* handlers directly. {@code prepareRecords}, {@code commitRecords}, {@code rollbackRecords}, {@code
46+
* commitStateWithoutWriteSet}, and {@code abortStateWithoutWriteSet} are thin pass-throughs. {@code
47+
* commitState} and {@code abortState} are not: they encode the transaction's write set via the
48+
* orchestrator-owned {@link WriteSetEncoder} before delegating to the Coordinator-side handler.
4749
*/
4850
@ThreadSafe
4951
public class CommitHandler {
5052
private static final Logger logger = LoggerFactory.getLogger(CommitHandler.class);
5153

5254
private final CoordinatorCommitHandler coordinatorCommitHandler;
5355
private final ParticipantCommitHandler participantCommitHandler;
56+
protected final WriteSetEncoder writeSetEncoder;
5457
protected final boolean coordinatorWriteOmissionOnReadOnlyEnabled;
58+
protected final boolean coordinatorWriteSetLoggingEnabled;
5559

5660
@LazyInit @Nullable private BeforePreparationHook beforePreparationHook;
5761

@@ -66,11 +70,9 @@ public CommitHandler(
6670
boolean coordinatorWriteSetLoggingEnabled,
6771
boolean onePhaseCommitEnabled) {
6872
this.coordinatorWriteOmissionOnReadOnlyEnabled = coordinatorWriteOmissionOnReadOnlyEnabled;
69-
this.coordinatorCommitHandler =
70-
new CoordinatorCommitHandler(
71-
coordinator,
72-
new WriteSetEncoder(tableMetadataManager),
73-
coordinatorWriteSetLoggingEnabled);
73+
this.coordinatorWriteSetLoggingEnabled = coordinatorWriteSetLoggingEnabled;
74+
this.writeSetEncoder = new WriteSetEncoder(tableMetadataManager);
75+
this.coordinatorCommitHandler = new CoordinatorCommitHandler(coordinator);
7476
this.participantCommitHandler =
7577
new ParticipantCommitHandler(
7678
storage,
@@ -87,9 +89,13 @@ public CommitHandler(
8789
@SuppressFBWarnings("EI_EXPOSE_REP2")
8890
protected CommitHandler(
8991
boolean coordinatorWriteOmissionOnReadOnlyEnabled,
92+
boolean coordinatorWriteSetLoggingEnabled,
93+
WriteSetEncoder writeSetEncoder,
9094
CoordinatorCommitHandler coordinatorCommitHandler,
9195
ParticipantCommitHandler participantCommitHandler) {
9296
this.coordinatorWriteOmissionOnReadOnlyEnabled = coordinatorWriteOmissionOnReadOnlyEnabled;
97+
this.coordinatorWriteSetLoggingEnabled = coordinatorWriteSetLoggingEnabled;
98+
this.writeSetEncoder = checkNotNull(writeSetEncoder);
9399
this.coordinatorCommitHandler = checkNotNull(coordinatorCommitHandler);
94100
this.participantCommitHandler = checkNotNull(participantCommitHandler);
95101
}
@@ -296,22 +302,44 @@ void onePhaseCommitRecords(TransactionContext context)
296302

297303
public long commitState(TransactionContext context)
298304
throws CommitConflictException, UnknownTransactionStatusException {
299-
return coordinatorCommitHandler.commitState(context);
305+
return coordinatorCommitHandler.commitState(
306+
context.transactionId, encodeWriteSetIfLoggingEnabled(context));
307+
}
308+
309+
// The tx_write_set column is part of the Coordinator schema only when the opt-in
310+
// `coordinator.write_set_logging.enabled` config is on. When it is off, skip encoding entirely so
311+
// no WriteSet is persisted (the column is not part of the schema in that case).
312+
@Nullable
313+
private WriteSet encodeWriteSetIfLoggingEnabled(TransactionContext context) {
314+
return coordinatorWriteSetLoggingEnabled
315+
? writeSetEncoder.encodeSingleGroupWriteSet(context, false)
316+
: null;
300317
}
301318

319+
// 2PC-only. Delegates to commitState(id, null) / abortState(id, null), which the group-commit
320+
// coordinator handler overrides to route through the group committer. That null never reaches the
321+
// group committer here because 2PC forbids group commit
322+
// (TwoPhaseConsensusCommitManager#throwIfGroupCommitIsEnabled) and never builds a
323+
// CommitHandlerWithGroupCommit.
324+
//
325+
// TODO: revisit this if/when the Two-phase Commit I/F is removed.
302326
public long commitStateWithoutWriteSet(TransactionContext context)
303327
throws CommitConflictException, UnknownTransactionStatusException {
304-
return coordinatorCommitHandler.commitStateWithoutWriteSet(context);
328+
return coordinatorCommitHandler.commitState(context.transactionId, null);
305329
}
306330

307331
public TransactionState abortState(TransactionContext context)
308332
throws UnknownTransactionStatusException {
309-
return coordinatorCommitHandler.abortState(context);
333+
return coordinatorCommitHandler.abortState(
334+
context.transactionId, encodeWriteSetIfLoggingEnabled(context));
310335
}
311336

337+
// 2PC-only; see the null-write-set note on commitStateWithoutWriteSet above.
338+
//
339+
// TODO: revisit this if/when the Two-phase Commit I/F is removed.
312340
public TransactionState abortStateWithoutWriteSet(String id)
313341
throws UnknownTransactionStatusException {
314-
return coordinatorCommitHandler.abortStateWithoutWriteSet(id);
342+
return coordinatorCommitHandler.abortState(id, null);
315343
}
316344

317345
public TransactionState forceAbortState(String id) throws UnknownTransactionStatusException {

core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandlerWithGroupCommit.java

Lines changed: 50 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import static com.google.common.base.Preconditions.checkNotNull;
44

55
import com.scalar.db.api.DistributedStorage;
6+
import com.scalar.db.api.TransactionState;
67
import com.scalar.db.exception.transaction.CommitConflictException;
78
import com.scalar.db.exception.transaction.CommitException;
89
import com.scalar.db.exception.transaction.UnknownTransactionStatusException;
@@ -59,11 +60,10 @@ private CommitHandlerWithGroupCommit(
5960
CoordinatorGroupCommitter groupCommitter) {
6061
this(
6162
coordinatorWriteOmissionOnReadOnlyEnabled,
63+
coordinatorWriteSetLoggingEnabled,
64+
new WriteSetEncoder(tableMetadataManager),
6265
new CoordinatorCommitHandlerWithGroupCommit(
63-
coordinator,
64-
new WriteSetEncoder(tableMetadataManager),
65-
groupCommitter,
66-
coordinatorWriteSetLoggingEnabled),
66+
coordinator, groupCommitter, coordinatorWriteSetLoggingEnabled),
6767
participantCommitHandler);
6868
}
6969

@@ -72,9 +72,16 @@ private CommitHandlerWithGroupCommit(
7272
@SuppressFBWarnings("EI_EXPOSE_REP2")
7373
CommitHandlerWithGroupCommit(
7474
boolean coordinatorWriteOmissionOnReadOnlyEnabled,
75+
boolean coordinatorWriteSetLoggingEnabled,
76+
WriteSetEncoder writeSetEncoder,
7577
CoordinatorCommitHandlerWithGroupCommit coordinatorHandler,
7678
ParticipantCommitHandler participantCommitHandler) {
77-
super(coordinatorWriteOmissionOnReadOnlyEnabled, coordinatorHandler, participantCommitHandler);
79+
super(
80+
coordinatorWriteOmissionOnReadOnlyEnabled,
81+
coordinatorWriteSetLoggingEnabled,
82+
writeSetEncoder,
83+
coordinatorHandler,
84+
participantCommitHandler);
7885
this.coordinatorHandler = coordinatorHandler;
7986
}
8087

@@ -87,45 +94,68 @@ public void commit(TransactionContext context)
8794

8895
/**
8996
* Releases the group-commit slot reservation when the orchestrator will not write a Coordinator
90-
* state row through the normal 2-phase commit path. That happens when the transaction was begun
91-
* as non-read-only (and therefore reserved a slot) but turned out to have no writes/deletes,
92-
* combined with coordinator-write-omission being enabled — in which case {@link
93-
* CommitHandler#commit} skips {@code commitState} entirely and the reserved slot would otherwise
94-
* sit unused for the rest of the commit flow.
95-
*
96-
* <p>The other places that call {@link
97-
* CoordinatorCommitHandlerWithGroupCommit#cancelGroupCommitIfNeeded} (one-phase commit,
98-
* abort/conflict paths, before-commit failure cleanup) cannot share this predicate: at those
99-
* sites the transaction has writes/deletes, so the predicate would block the cancellation that
100-
* those paths actually need.
97+
* state row through the normal 2-phase commit path: a non-read-only transaction (which reserved a
98+
* slot) that turns out to have no writes/deletes while coordinator-write-omission is enabled, so
99+
* {@link CommitHandler#commit} skips {@code commitState} and the slot would otherwise sit unused.
101100
*/
102101
private void cancelGroupCommitIfCoordinatorStateOmitted(TransactionContext context) {
103102
if (!context.readOnly
104103
&& !context.snapshot.hasWritesOrDeletes()
105104
&& coordinatorWriteOmissionOnReadOnlyEnabled) {
106-
coordinatorHandler.cancelGroupCommitIfNeeded(context);
105+
cancelGroupCommitIfSlotReserved(context);
106+
}
107+
}
108+
109+
private void cancelGroupCommitIfSlotReserved(TransactionContext context) {
110+
// A transaction only holds a group commit slot when one was reserved at begin time (e.g., a
111+
// read-only transaction does not reserve a slot when coordinator write omission is enabled).
112+
// When no slot was reserved there is nothing to release.
113+
if (!context.groupCommitSlotReserved) {
114+
return;
107115
}
116+
117+
// FIXME: When a slot was reserved (so the early return above did not fire), this can run more
118+
// than once on a single failure path, because several cleanup callbacks each release the slot:
119+
// - onFailureBeforeCommit() runs on the pre-commit-state failure paths (via
120+
// safelyCallOnFailureBeforeCommit);
121+
// - abortState() additionally runs from abortStateAndRollbackRecordsIfNeeded() when the
122+
// transaction has writes/deletes, or coordinator write omission on read-only is disabled;
123+
// - onePhaseCommitRecords() pre-cancels the slot before its body, so a one-phase-commit
124+
// failure followed by onFailureBeforeCommit() also cancels twice.
125+
// It is currently safe only because groupCommitter.remove() is idempotent. The cleanup paths
126+
// should be reworked so the slot is released exactly once instead of relying on that
127+
// idempotency.
128+
coordinatorHandler.cancelGroupCommit(context.transactionId);
108129
}
109130

110131
@Override
111132
boolean canOnePhaseCommit(TransactionContext context) throws CommitException {
112133
try {
113134
return super.canOnePhaseCommit(context);
114135
} catch (CommitException e) {
115-
coordinatorHandler.cancelGroupCommitIfNeeded(context);
136+
cancelGroupCommitIfSlotReserved(context);
116137
throw e;
117138
}
118139
}
119140

120141
@Override
121142
void onePhaseCommitRecords(TransactionContext context)
122143
throws CommitConflictException, UnknownTransactionStatusException {
123-
coordinatorHandler.cancelGroupCommitIfNeeded(context);
144+
cancelGroupCommitIfSlotReserved(context);
124145
super.onePhaseCommitRecords(context);
125146
}
126147

127148
@Override
128149
protected void onFailureBeforeCommit(TransactionContext context) {
129-
coordinatorHandler.cancelGroupCommitIfNeeded(context);
150+
cancelGroupCommitIfSlotReserved(context);
151+
}
152+
153+
@Override
154+
public TransactionState abortState(TransactionContext context)
155+
throws UnknownTransactionStatusException {
156+
// Release the group-commit slot before writing the ABORTED state so it does not sit in the
157+
// group buffer waiting for a sibling.
158+
cancelGroupCommitIfSlotReserved(context);
159+
return super.abortState(context);
130160
}
131161
}

core/src/main/java/com/scalar/db/transaction/consensuscommit/CoordinatorCommitHandler.java

Lines changed: 25 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -18,64 +18,35 @@
1818
* Handles the Coordinator-side (state-table) operations of the Consensus Commit protocol: writing
1919
* the COMMITTED / ABORTED records to the Coordinator state table and resolving putState conflicts.
2020
*
21-
* <p>Methods here only touch the Coordinator state table via {@link Coordinator}; they never touch
22-
* user data tables. When {@code commitState} loses a putState race and the persisted state turns
23-
* out to be ABORTED (or absent), this handler reports a {@link CommitConflictException} and leaves
24-
* the rollback of the transaction's prepared records to the caller. This keeps the handler free of
25-
* any participant-side dependency.
21+
* <p>Operates on a transaction id and a pre-encoded {@link WriteSet} (plus, for the commit path,
22+
* the {@code committedAt} to stamp); callers encode the write set before calling.
23+
*
24+
* <p>When {@code commitState} loses a putState race and the persisted state turns out to be ABORTED
25+
* (or absent), this handler reports a {@link CommitConflictException} and leaves the rollback of
26+
* the transaction's prepared records to the caller.
2627
*/
2728
@ThreadSafe
2829
class CoordinatorCommitHandler {
2930
private static final Logger logger = LoggerFactory.getLogger(CoordinatorCommitHandler.class);
3031

3132
private final Coordinator coordinator;
32-
private final WriteSetEncoder writeSetEncoder;
33-
private final boolean coordinatorWriteSetLoggingEnabled;
3433

3534
@SuppressFBWarnings("EI_EXPOSE_REP2")
36-
CoordinatorCommitHandler(
37-
Coordinator coordinator,
38-
WriteSetEncoder writeSetEncoder,
39-
boolean coordinatorWriteSetLoggingEnabled) {
35+
CoordinatorCommitHandler(Coordinator coordinator) {
4036
this.coordinator = checkNotNull(coordinator);
41-
this.writeSetEncoder = checkNotNull(writeSetEncoder);
42-
this.coordinatorWriteSetLoggingEnabled = coordinatorWriteSetLoggingEnabled;
43-
}
44-
45-
/**
46-
* Writes the COMMITTED state. When coordinator write-set logging is enabled, the {@code
47-
* tx_write_set} column is populated from the given context's snapshot; otherwise the column is
48-
* omitted.
49-
*
50-
* @return the {@code committedAt} timestamp written to the COMMITTED Coordinator state row. The
51-
* caller stamps the committed data records with the same value so the row and the records
52-
* share one timestamp.
53-
*/
54-
long commitState(TransactionContext context)
55-
throws CommitConflictException, UnknownTransactionStatusException {
56-
// The tx_write_set column is added only when the opt-in `coordinator.write_set_logging.enabled`
57-
// config is enabled. When it's disabled the column is not part of the Coordinator schema, so we
58-
// must skip encoding/persisting the WriteSet entirely.
59-
WriteSet writeSet =
60-
coordinatorWriteSetLoggingEnabled
61-
? writeSetEncoder.encodeSingleGroupWriteSet(context, false)
62-
: null;
63-
return commitStateInternal(context, writeSet);
6437
}
6538

6639
/**
67-
* Writes the COMMITTED state without persisting a {@code tx_write_set}.
40+
* Writes the COMMITTED state for the transaction identified by {@code id}, persisting the given
41+
* pre-encoded {@code writeSet} (or none when {@code null}).
6842
*
69-
* @return the {@code committedAt} timestamp written to the COMMITTED Coordinator state row.
43+
* @return the {@code committedAt} stamped on the COMMITTED Coordinator state row — the value this
44+
* call generates, or, when this commit lost a putState race to an already-COMMITTED row, that
45+
* row's committedAt. The caller stamps the committed data records with the returned value so
46+
* the row and the records share one timestamp.
7047
*/
71-
long commitStateWithoutWriteSet(TransactionContext context)
48+
long commitState(String id, @Nullable WriteSet writeSet)
7249
throws CommitConflictException, UnknownTransactionStatusException {
73-
return commitStateInternal(context, null);
74-
}
75-
76-
private long commitStateInternal(TransactionContext context, @Nullable WriteSet writeSet)
77-
throws CommitConflictException, UnknownTransactionStatusException {
78-
String id = context.transactionId;
7950
try {
8051
long committedAt = System.currentTimeMillis();
8152
Coordinator.State state =
@@ -84,7 +55,7 @@ private long commitStateInternal(TransactionContext context, @Nullable WriteSet
8455
logger.debug("Transaction {} is committed successfully at {}", id, committedAt);
8556
return committedAt;
8657
} catch (CoordinatorConflictException e) {
87-
return handleCommitConflict(context, e);
58+
return handleCommitConflict(id, e);
8859
} catch (CoordinatorException e) {
8960
throw new UnknownTransactionStatusException(
9061
CoreError.CONSENSUS_COMMIT_UNKNOWN_COORDINATOR_STATUS.buildMessage(e.getMessage()),
@@ -95,21 +66,21 @@ private long commitStateInternal(TransactionContext context, @Nullable WriteSet
9566

9667
/**
9768
* Resolves a putState conflict. Returns the {@code committedAt} of the already-persisted
98-
* COMMITTED state when this transaction turns out to be already committed; otherwise rolls back
99-
* the records and throws.
69+
* COMMITTED state when this transaction turns out to be already committed; otherwise reports the
70+
* conflict (the caller rolls the records back) by throwing {@link CommitConflictException}.
10071
*/
101-
long handleCommitConflict(TransactionContext context, Exception cause)
72+
long handleCommitConflict(String id, Exception cause)
10273
throws CommitConflictException, UnknownTransactionStatusException {
10374
try {
104-
Optional<Coordinator.State> s = coordinator.getState(context.transactionId);
75+
Optional<Coordinator.State> s = coordinator.getState(id);
10576
if (s.isPresent()) {
10677
Coordinator.State persisted = s.get();
10778
if (persisted.getState() == TransactionState.ABORTED) {
10879
throw new CommitConflictException(
10980
CoreError.CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHEN_COMMITTING_STATE.buildMessage(
11081
cause.getMessage()),
11182
cause,
112-
context.transactionId);
83+
id);
11384
}
11485
// Otherwise the coordinator state is present and COMMITTED, which means this transaction
11586
// has already committed. Only Two-phase Commit I/F reaches this branch: there the same
@@ -163,37 +134,21 @@ long handleCommitConflict(TransactionContext context, Exception cause)
163134
CoreError.CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHEN_COMMITTING_STATE.buildMessage(
164135
cause.getMessage()),
165136
cause,
166-
context.transactionId);
137+
id);
167138
}
168139
} catch (CoordinatorException ex) {
169140
throw new UnknownTransactionStatusException(
170141
CoreError.CONSENSUS_COMMIT_CANNOT_GET_COORDINATOR_STATUS.buildMessage(ex.getMessage()),
171142
ex,
172-
context.transactionId);
143+
id);
173144
}
174145
}
175146

176147
/**
177-
* Writes the ABORTED state. When coordinator write-set logging is enabled, the {@code
178-
* tx_write_set} column is populated from the given context's snapshot; otherwise the column is
179-
* omitted.
148+
* Writes the ABORTED state for the transaction identified by {@code id}, persisting the given
149+
* pre-encoded {@code writeSet} (or none when {@code null}).
180150
*/
181-
TransactionState abortState(TransactionContext context) throws UnknownTransactionStatusException {
182-
// Same opt-in gating as commitState: skip WriteSet encoding when write-set logging is
183-
// disabled, since the Coordinator schema does not include the column in that case.
184-
WriteSet writeSet =
185-
coordinatorWriteSetLoggingEnabled
186-
? writeSetEncoder.encodeSingleGroupWriteSet(context, false)
187-
: null;
188-
return abortStateInternal(context.transactionId, writeSet);
189-
}
190-
191-
/** Writes the ABORTED state without persisting a {@code tx_write_set} via a single putState. */
192-
TransactionState abortStateWithoutWriteSet(String id) throws UnknownTransactionStatusException {
193-
return abortStateInternal(id, null);
194-
}
195-
196-
private TransactionState abortStateInternal(String id, @Nullable WriteSet writeSet)
151+
TransactionState abortState(String id, @Nullable WriteSet writeSet)
197152
throws UnknownTransactionStatusException {
198153
try {
199154
Coordinator.State state =

0 commit comments

Comments
 (0)