Skip to content

Commit 5f13f8b

Browse files
committed
refactor(consensuscommit): unify commit-phase timestamps via orchestrator-provided timestamps
Previously every record/row written during a single ConsensusCommit transaction generated its own System.currentTimeMillis() at the call site, so preparedAt/committedAt could differ across rows of the same transaction and between the data records and the COMMITTED Coordinator state row.

Align timestamps where multiple writes must share one value: each timestamp is generated once and handed on explicitly, either as a method argument or as a return value.
- preparedAt: generated once by the caller and passed to prepareRecords, so every prepared row shares it -- and, with the upcoming TwoPhaseCommit interfaces, so multiple participants can be given the same value.
- committedAt: generated once when commitState writes the COMMITTED Coordinator state row, returned to the caller, and then passed to commitRecords, so the COMMITTED Coordinator state row and every committed data row share it (group-commit-disabled).

The no-timestamp constructors of PrepareMutationComposer and CommitMutationComposer are removed so the caller always supplies the value (RecoveryHandler's roll-forward passes System.currentTimeMillis() explicitly). AbstractMutationComposer keeps only the explicit-timestamp constructor and no longer calls System.currentTimeMillis() itself.

Deliberately NOT threaded, because there is no cross-operation alignment to enforce -- these generate their own timestamp internally:
- one-phase commit (onePhaseCommitRecords / OnePhaseCommitMutationComposer): the single-participant fast path; one composer writes all rows, with no Coordinator state row, so its rows already share one timestamp for both PREPARED_AT and COMMITTED_AT.
- abort (abortState / abortStateWithoutWriteSet): writes a single Coordinator row.
- rollback (rollbackRecords / RollbackMutationComposer): restores before-images and writes no fresh timestamp.
- Coordinator.forceAbort: its parent-id marker and full-id ABORTED rows can be written by different lazy-recovery runs, so no invariant binds their createdAt; Coordinator.java is left unchanged.
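- group-commit Emitter's batched COMMITTED row: a shared row cannot carry a per-transaction value, so it keeps an emit-time timestamp; that single value is returned to every transaction in the batch, which stamps its committed data records with it.
- RecoveryHandler/RecoveryExecutor: independent transactions, out of scope.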
1 parent 84aab20 commit 5f13f8b

22 files changed

Lines changed: 544 additions & 206 deletions

core/src/main/java/com/scalar/db/transaction/consensuscommit/AbstractMutationComposer.java

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.scalar.db.transaction.consensuscommit;
22

3-
import com.google.common.annotations.VisibleForTesting;
43
import com.google.common.collect.ImmutableList;
54
import com.scalar.db.api.Mutation;
65
import com.scalar.db.api.PutBuilder;
@@ -15,22 +14,14 @@
1514
public abstract class AbstractMutationComposer implements MutationComposer {
1615
protected final String id;
1716
protected final List<Mutation> mutations;
18-
protected final long current;
17+
protected final long timestamp;
1918
protected final TransactionTableMetadataManager tableMetadataManager;
2019

21-
public AbstractMutationComposer(String id, TransactionTableMetadataManager tableMetadataManager) {
20+
public AbstractMutationComposer(
21+
String id, long timestamp, TransactionTableMetadataManager tableMetadataManager) {
2222
this.id = id;
2323
this.mutations = new ArrayList<>();
24-
this.current = System.currentTimeMillis();
25-
this.tableMetadataManager = tableMetadataManager;
26-
}
27-
28-
@VisibleForTesting
29-
AbstractMutationComposer(
30-
String id, long current, TransactionTableMetadataManager tableMetadataManager) {
31-
this.id = id;
32-
this.mutations = new ArrayList<>();
33-
this.current = current;
24+
this.timestamp = timestamp;
3425
this.tableMetadataManager = tableMetadataManager;
3526
}
3627

core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitHandler.java

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ public void commit(TransactionContext context)
172172

173173
if (hasWritesOrDeletesInSnapshot) {
174174
try {
175-
prepareRecords(context);
175+
prepareRecords(context, System.currentTimeMillis());
176176
} catch (PreparationException e) {
177177
safelyCallOnFailureBeforeCommit(context);
178178
abortState(context);
@@ -207,8 +207,12 @@ public void commit(TransactionContext context)
207207
context, beforePreparationHookFuture.orElse(null), hasWritesOrDeletesInSnapshot);
208208

209209
if (hasWritesOrDeletesInSnapshot || !coordinatorWriteOmissionOnReadOnlyEnabled) {
210+
long committedAt;
210211
try {
211-
commitState(context);
212+
// commitState writes the COMMITTED Coordinator state row and returns the committedAt it
213+
// stamped (in group commit, the emit-time value shared across the batch). The data records
214+
// are then committed with the same value so the row and the records share one timestamp.
215+
committedAt = commitState(context);
212216
} catch (CommitConflictException e) {
213217
// The COMMITTED-state write lost a putState race that resolved to ABORTED (or absent): the
214218
// transaction is aborted. The Coordinator-side handler only reports the conflict; the
@@ -219,9 +223,9 @@ public void commit(TransactionContext context)
219223
}
220224
throw e;
221225
}
222-
}
223-
if (hasWritesOrDeletesInSnapshot) {
224-
commitRecords(context);
226+
if (hasWritesOrDeletesInSnapshot) {
227+
commitRecords(context, committedAt);
228+
}
225229
}
226230
}
227231

@@ -251,16 +255,17 @@ private void abortStateAndRollbackRecordsIfNeeded(
251255
//
252256
// TODO: revisit this if/when the Two-phase Commit I/F is removed.
253257

254-
public void prepareRecords(TransactionContext context) throws PreparationException {
255-
participantCommitHandler.prepareRecords(context);
258+
public void prepareRecords(TransactionContext context, long preparedAt)
259+
throws PreparationException {
260+
participantCommitHandler.prepareRecords(context, preparedAt);
256261
}
257262

258263
public void validateRecords(TransactionContext context) throws ValidationException {
259264
participantCommitHandler.validateRecords(context);
260265
}
261266

262-
public void commitRecords(TransactionContext context) {
263-
participantCommitHandler.commitRecords(context);
267+
public void commitRecords(TransactionContext context, long committedAt) {
268+
participantCommitHandler.commitRecords(context, committedAt);
264269
}
265270

266271
public void rollbackRecords(TransactionContext context) {
@@ -285,14 +290,14 @@ void onePhaseCommitRecords(TransactionContext context)
285290
participantCommitHandler.onePhaseCommitRecords(context);
286291
}
287292

288-
public void commitState(TransactionContext context)
293+
public long commitState(TransactionContext context)
289294
throws CommitConflictException, UnknownTransactionStatusException {
290-
coordinatorCommitHandler.commitState(context);
295+
return coordinatorCommitHandler.commitState(context);
291296
}
292297

293-
public void commitStateWithoutWriteSet(TransactionContext context)
298+
public long commitStateWithoutWriteSet(TransactionContext context)
294299
throws CommitConflictException, UnknownTransactionStatusException {
295-
coordinatorCommitHandler.commitStateWithoutWriteSet(context);
300+
return coordinatorCommitHandler.commitStateWithoutWriteSet(context);
296301
}
297302

298303
public TransactionState abortState(TransactionContext context)

core/src/main/java/com/scalar/db/transaction/consensuscommit/CommitMutationComposer.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import static com.scalar.db.transaction.consensuscommit.Attribute.STATE;
66
import static com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils.getTransactionTableMetadata;
77

8-
import com.google.common.annotations.VisibleForTesting;
98
import com.scalar.db.api.ConditionBuilder;
109
import com.scalar.db.api.Consistency;
1110
import com.scalar.db.api.Delete;
@@ -31,14 +30,9 @@
3130
public class CommitMutationComposer extends AbstractMutationComposer {
3231
private static final Logger logger = LoggerFactory.getLogger(CommitMutationComposer.class);
3332

34-
public CommitMutationComposer(String id, TransactionTableMetadataManager tableMetadataManager) {
35-
super(id, tableMetadataManager);
36-
}
37-
38-
@VisibleForTesting
39-
CommitMutationComposer(
40-
String id, long current, TransactionTableMetadataManager tableMetadataManager) {
41-
super(id, current, tableMetadataManager);
33+
public CommitMutationComposer(
34+
String id, long timestamp, TransactionTableMetadataManager tableMetadataManager) {
35+
super(id, timestamp, tableMetadataManager);
4236
}
4337

4438
@Override
@@ -95,7 +89,7 @@ private Put composePut(Operation base, @Nullable TransactionResult result)
9589
ConditionBuilder.column(STATE)
9690
.isEqualToInt(TransactionState.PREPARED.get()))
9791
.build())
98-
.bigIntValue(COMMITTED_AT, current)
92+
.bigIntValue(COMMITTED_AT, timestamp)
9993
.intValue(STATE, TransactionState.COMMITTED.get())
10094
.consistency(Consistency.LINEARIZABLE);
10195
getClusteringKey(base, result).ifPresent(putBuilder::clusteringKey);

core/src/main/java/com/scalar/db/transaction/consensuscommit/CoordinatorCommitHandler.java

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -40,30 +40,38 @@ class CoordinatorCommitHandler {
4040
/**
4141
* Writes the COMMITTED state with the {@code tx_write_set} populated from the given context's
4242
* snapshot.
43+
*
44+
* @return the {@code committedAt} timestamp written to the COMMITTED Coordinator state row. The
45+
* caller stamps the committed data records with the same value so the row and the records
46+
* share one timestamp.
4347
*/
44-
void commitState(TransactionContext context)
48+
long commitState(TransactionContext context)
4549
throws CommitConflictException, UnknownTransactionStatusException {
46-
commitStateInternal(context, writeSetEncoder.encodeSingleGroupWriteSet(context, false));
50+
return commitStateInternal(context, writeSetEncoder.encodeSingleGroupWriteSet(context, false));
4751
}
4852

49-
/** Writes the COMMITTED state without persisting a {@code tx_write_set}. */
50-
void commitStateWithoutWriteSet(TransactionContext context)
53+
/**
54+
* Writes the COMMITTED state without persisting a {@code tx_write_set}.
55+
*
56+
* @return the {@code committedAt} timestamp written to the COMMITTED Coordinator state row.
57+
*/
58+
long commitStateWithoutWriteSet(TransactionContext context)
5159
throws CommitConflictException, UnknownTransactionStatusException {
52-
commitStateInternal(context, null);
60+
return commitStateInternal(context, null);
5361
}
5462

55-
private void commitStateInternal(TransactionContext context, @Nullable WriteSet writeSet)
63+
private long commitStateInternal(TransactionContext context, @Nullable WriteSet writeSet)
5664
throws CommitConflictException, UnknownTransactionStatusException {
5765
String id = context.transactionId;
5866
try {
67+
long committedAt = System.currentTimeMillis();
5968
Coordinator.State state =
60-
new Coordinator.State(
61-
id, writeSet, TransactionState.COMMITTED, System.currentTimeMillis());
69+
new Coordinator.State(id, writeSet, TransactionState.COMMITTED, committedAt);
6270
coordinator.putState(state);
63-
logger.debug(
64-
"Transaction {} is committed successfully at {}", id, System.currentTimeMillis());
71+
logger.debug("Transaction {} is committed successfully at {}", id, committedAt);
72+
return committedAt;
6573
} catch (CoordinatorConflictException e) {
66-
handleCommitConflict(context, e);
74+
return handleCommitConflict(context, e);
6775
} catch (CoordinatorException e) {
6876
throw new UnknownTransactionStatusException(
6977
CoreError.CONSENSUS_COMMIT_UNKNOWN_COORDINATOR_STATUS.buildMessage(e.getMessage()),
@@ -72,13 +80,18 @@ private void commitStateInternal(TransactionContext context, @Nullable WriteSet
7280
}
7381
}
7482

75-
void handleCommitConflict(TransactionContext context, Exception cause)
83+
/**
84+
* Resolves a putState conflict. Returns the {@code committedAt} of the already-persisted
85+
* COMMITTED state when this transaction turns out to be already committed; otherwise rolls back
86+
* the records and throws.
87+
*/
88+
long handleCommitConflict(TransactionContext context, Exception cause)
7689
throws CommitConflictException, UnknownTransactionStatusException {
7790
try {
7891
Optional<Coordinator.State> s = coordinator.getState(context.transactionId);
7992
if (s.isPresent()) {
80-
TransactionState state = s.get().getState();
81-
if (state == TransactionState.ABORTED) {
93+
Coordinator.State persisted = s.get();
94+
if (persisted.getState() == TransactionState.ABORTED) {
8295
throw new CommitConflictException(
8396
CoreError.CONSENSUS_COMMIT_CONFLICT_OCCURRED_WHEN_COMMITTING_STATE.buildMessage(
8497
cause.getMessage()),
@@ -93,10 +106,13 @@ void handleCommitConflict(TransactionContext context, Exception cause)
93106
// putIfNotExists race and observes it here. With One-phase Commit I/F this is unreachable:
94107
// this commit is the only writer of the COMMITTED state and it just lost the race, so the
95108
// conflicting row was an ABORTED from a lazy recovery, handled above. The transaction is
96-
// committed, so return normally and let the caller commit the records.
109+
// committed, so return the persisted row's committedAt and let the caller commit the
110+
// records with it (keeping the row and the records on a single timestamp).
97111
//
98112
// TODO: revisit this if/when the Two-phase Commit I/F is removed -- it would then be
99113
// unreachable (a COMMITTED state could never be observed after a conflict here).
114+
115+
return persisted.getCreatedAt();
100116
} else {
101117
// The coordinator state is absent: a row existed when our putIfNotExists lost the race, but
102118
// it is gone now. In both interfaces this means the conflicting row was an ABORTED written

core/src/main/java/com/scalar/db/transaction/consensuscommit/CoordinatorCommitHandlerWithGroupCommit.java

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -44,23 +44,32 @@ class CoordinatorCommitHandlerWithGroupCommit extends CoordinatorCommitHandler {
4444
this.groupCommitter = groupCommitter;
4545
}
4646

47+
/**
48+
* Group commits the COMMITTED state. The {@code committedAt} stamped on the Coordinator row is
49+
* generated by the Emitter when it writes the batched row, and that single value is handed back
50+
* to every transaction in the batch through {@link CoordinatorGroupCommitter#ready}. This method
51+
* returns it so the caller can stamp this transaction's committed data records with the same
52+
* value, keeping the (batched) Coordinator row and the records on a single timestamp.
53+
*/
4754
@Override
48-
void commitState(TransactionContext context)
55+
long commitState(TransactionContext context)
4956
throws CommitConflictException, UnknownTransactionStatusException {
5057
String id = context.transactionId;
5158
try {
52-
// Group commit the state by internally calling `groupCommitState()` via the emitter.
53-
groupCommitter.ready(id, context);
54-
logger.debug(
55-
"Transaction {} is committed successfully at {}", id, System.currentTimeMillis());
59+
// Group commit the state by internally calling `groupCommitState()` via the emitter. The
60+
// returned value is the emit-time committedAt the Emitter wrote to the batched row.
61+
Long committedAt = groupCommitter.ready(id, context);
62+
assert committedAt != null;
63+
logger.debug("Transaction {} is committed successfully at {}", id, committedAt);
64+
return committedAt;
5665
} catch (GroupCommitConflictException e) {
5766
cancelGroupCommitIfNeeded(context);
58-
handleCommitConflict(context, e);
67+
return handleCommitConflict(context, e);
5968
} catch (GroupCommitException e) {
6069
cancelGroupCommitIfNeeded(context);
6170
Throwable cause = e.getCause();
6271
if (cause instanceof CoordinatorConflictException) {
63-
handleCommitConflict(context, (CoordinatorConflictException) cause);
72+
return handleCommitConflict(context, (CoordinatorConflictException) cause);
6473
} else {
6574
// Failed to access the coordinator state. The state is unknown.
6675
throw new UnknownTransactionStatusException("Coordinator status is unknown", cause, id);
@@ -112,7 +121,7 @@ void cancelGroupCommitIfNeeded(TransactionContext context) {
112121
}
113122

114123
@VisibleForTesting
115-
static class Emitter implements Emittable<String, String, TransactionContext, Void> {
124+
static class Emitter implements Emittable<String, String, TransactionContext, Long> {
116125
private final Coordinator coordinator;
117126
private final WriteSetEncoder writeSetEncoder;
118127

@@ -122,7 +131,7 @@ static class Emitter implements Emittable<String, String, TransactionContext, Vo
122131
}
123132

124133
@Override
125-
public Void emitNormalGroup(String parentId, List<TransactionContext> contexts)
134+
public Long emitNormalGroup(String parentId, List<TransactionContext> contexts)
126135
throws CoordinatorException {
127136
if (contexts.isEmpty()) {
128137
return null;
@@ -135,32 +144,33 @@ public Void emitNormalGroup(String parentId, List<TransactionContext> contexts)
135144
for (TransactionContext context : contexts) {
136145
childIds.add(KEY_MANIPULATOR.keysFromFullKey(context.transactionId).childKey);
137146
}
147+
// Generate one committedAt for the whole batched row and return it so every transaction in
148+
// this group stamps its committed data records with the same value.
149+
long committedAt = System.currentTimeMillis();
138150
coordinator.putState(
139151
new State(
140152
parentId,
141153
childIds,
142154
writeSetEncoder.encodeMultiGroupWriteSet(contexts, false),
143155
TransactionState.COMMITTED,
144-
System.currentTimeMillis()));
156+
committedAt));
145157
logger.debug(
146-
"Transaction {} (parent ID) is committed successfully at {}",
147-
parentId,
148-
System.currentTimeMillis());
149-
return null;
158+
"Transaction {} (parent ID) is committed successfully at {}", parentId, committedAt);
159+
return committedAt;
150160
}
151161

152162
@Override
153-
public Void emitDelayedGroup(String fullId, TransactionContext context)
163+
public Long emitDelayedGroup(String fullId, TransactionContext context)
154164
throws CoordinatorException {
165+
long committedAt = System.currentTimeMillis();
155166
coordinator.putState(
156167
new State(
157168
fullId,
158169
writeSetEncoder.encodeSingleGroupWriteSet(context, false),
159170
TransactionState.COMMITTED,
160-
System.currentTimeMillis()));
161-
logger.debug(
162-
"Transaction {} is committed successfully at {}", fullId, System.currentTimeMillis());
163-
return null;
171+
committedAt));
172+
logger.debug("Transaction {} is committed successfully at {}", fullId, committedAt);
173+
return committedAt;
164174
}
165175
}
166176
}

core/src/main/java/com/scalar/db/transaction/consensuscommit/CoordinatorGroupCommitter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import java.util.Optional;
77

88
public class CoordinatorGroupCommitter
9-
extends GroupCommitter<String, String, String, String, String, TransactionContext, Void> {
9+
extends GroupCommitter<String, String, String, String, String, TransactionContext, Long> {
1010
CoordinatorGroupCommitter(GroupCommitConfig config) {
1111
super("coordinator", config, new CoordinatorGroupCommitKeyManipulator());
1212
}

core/src/main/java/com/scalar/db/transaction/consensuscommit/OnePhaseCommitMutationComposer.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,18 @@
2323
@NotThreadSafe
2424
public class OnePhaseCommitMutationComposer extends AbstractMutationComposer {
2525

26+
// One-phase commit is the single-participant fast path, so there is no cross-participant
27+
// timestamp to coordinate; the composer generates its own timestamp for both PREPARED_AT and
28+
// COMMITTED_AT.
2629
public OnePhaseCommitMutationComposer(
2730
String id, TransactionTableMetadataManager tableMetadataManager) {
28-
super(id, tableMetadataManager);
31+
super(id, System.currentTimeMillis(), tableMetadataManager);
2932
}
3033

3134
@VisibleForTesting
3235
OnePhaseCommitMutationComposer(
33-
String id, long current, TransactionTableMetadataManager tableMetadataManager) {
34-
super(id, current, tableMetadataManager);
36+
String id, long timestamp, TransactionTableMetadataManager tableMetadataManager) {
37+
super(id, timestamp, tableMetadataManager);
3538
}
3639

3740
@Override
@@ -66,8 +69,8 @@ private Put composePut(Put base, @Nullable TransactionResult result) throws Exec
6669

6770
putBuilder.textValue(Attribute.ID, id);
6871
putBuilder.intValue(Attribute.STATE, TransactionState.COMMITTED.get());
69-
putBuilder.bigIntValue(Attribute.PREPARED_AT, current);
70-
putBuilder.bigIntValue(Attribute.COMMITTED_AT, current);
72+
putBuilder.bigIntValue(Attribute.PREPARED_AT, timestamp);
73+
putBuilder.bigIntValue(Attribute.COMMITTED_AT, timestamp);
7174

7275
if (!isInsertModeEnabled(base) && result != null) { // overwrite existing record
7376
int version = result.getVersion();

0 commit comments

Comments
 (0)