Skip to content

Commit ce6af67

Browse files
committed
Fix index-based Get/Scan to filter out results whose index key no longer matches after lazy recovery rollback (#3488)
1 parent b3fb4a4 commit ce6af67

3 files changed

Lines changed: 405 additions & 16 deletions

File tree

core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java

Lines changed: 67 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.scalar.db.exception.storage.ExecutionException;
2525
import com.scalar.db.exception.transaction.CrudConflictException;
2626
import com.scalar.db.exception.transaction.CrudException;
27+
import com.scalar.db.io.Column;
2728
import com.scalar.db.util.ScalarDbUtils;
2829
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
2930
import java.io.IOException;
@@ -143,6 +144,7 @@ Optional<TransactionResult> read(
143144

144145
for (int i = 0; ; i++) {
145146
@Nullable Snapshot.Key key = originalKey;
147+
boolean indexKeyFilteredOut = false;
146148

147149
Optional<TransactionResult> result = getFromStorage(get, metadata, context.transactionId);
148150
if (result.isPresent() && !result.get().isCommitted()) {
@@ -156,12 +158,23 @@ Optional<TransactionResult> read(
156158

157159
RecoveryExecutor.Result recoveryResult = executeRecovery(key, get, result.get(), context);
158160
context.recoveryResults.add(recoveryResult);
159-
result = recoveryResult.recoveredResult;
161+
162+
// After recovery (e.g., rollback), the index column value may have changed back to its
163+
// original value, which might not match the queried index key. Filter out such results.
164+
if (recoveryResult.rolledBack && ScalarDbUtils.isSecondaryIndexSpecified(get, metadata)) {
165+
Optional<TransactionResult> unfiltered = recoveryResult.recoveredResult;
166+
result = unfiltered.filter(r -> resultMatchesIndexKey(get, r));
167+
if (unfiltered.isPresent() && !result.isPresent()) {
168+
indexKeyFilteredOut = true;
169+
}
170+
} else {
171+
result = recoveryResult.recoveredResult;
172+
}
160173
}
161174

175+
// Because we also get records whose before images match the conjunctions, we need to check if
176+
// the current status of the records actually matches the conjunctions.
162177
if (!get.getConjunctions().isEmpty()) {
163-
// Because we also get records whose before images match the conjunctions, we need to check
164-
// if the current status of the records actually match the conjunctions.
165178
result =
166179
result.filter(
167180
r ->
@@ -182,12 +195,24 @@ Optional<TransactionResult> read(
182195
continue;
183196
}
184197

185-
// Put the result in the snapshot
186-
if (result.isPresent() || get.getConjunctions().isEmpty()) {
187-
// We put the result into the read set only if a get operation has no conjunction or the
188-
// result exists. This is because we don’t know whether the record actually exists or not
189-
// due to the conjunction.
190-
198+
// Put the result in the snapshot.
199+
//
200+
// When the result is present, we always cache it. When the result is absent, we cache
201+
// Optional.empty() only when we are certain that no record exists for this key. That
202+
// requires both of the following conditions to hold:
203+
//
204+
// (a) get.getConjunctions().isEmpty(): the Get has no conjunctions (additional
205+
// predicates applied on top of the key/index lookup). If conjunctions exist, a
206+
// record may actually exist for this key but have been filtered out by the
207+
// conjunctions above. In that case we cannot conclude that the record is absent, so
208+
// we must not cache Optional.empty().
209+
//
210+
// (b) !indexKeyFilteredOut: the result was not discarded by the post-rollback
211+
// index-key check above. If it was, a record does exist for this key, but its
212+
// index column value was reverted by rollback to a value that no longer matches the
213+
// queried index key. Caching Optional.empty() in that case would be incorrect
214+
// because the record still exists with a different index value.
215+
if (result.isPresent() || (get.getConjunctions().isEmpty() && !indexKeyFilteredOut)) {
191216
if (key != null) {
192217
putIntoReadSetInSnapshot(key, result, context);
193218
} else {
@@ -263,7 +288,7 @@ private LinkedHashMap<Snapshot.Key, TransactionResult> scanInternal(
263288
TransactionResult result = new TransactionResult(r);
264289
Snapshot.Key key = new Snapshot.Key(scan, r, metadata);
265290
Optional<TransactionResult> processedScanResult =
266-
processScanResult(key, scan, result, context);
291+
processScanResult(key, scan, result, context, metadata);
267292
processedScanResult.ifPresent(res -> results.put(key, res));
268293

269294
if (scan.getLimit() > 0 && results.size() >= scan.getLimit()) {
@@ -307,28 +332,38 @@ && checkAndRecoverBeforeIndexRecords(scan, context, txMetadata)) {
307332
}
308333

309334
private Optional<TransactionResult> processScanResult(
310-
Snapshot.Key key, Scan scan, TransactionResult result, TransactionContext context)
335+
Snapshot.Key key,
336+
Scan scan,
337+
TransactionResult result,
338+
TransactionContext context,
339+
TableMetadata metadata)
311340
throws CrudException {
312341
Optional<TransactionResult> ret;
313342
if (!result.isCommitted()) {
314343
// Lazy recovery
315344
RecoveryExecutor.Result recoveryResult = executeRecovery(key, scan, result, context);
316345
context.recoveryResults.add(recoveryResult);
317-
ret = recoveryResult.recoveredResult;
346+
347+
// After recovery (e.g., rollback), the index column value may have changed back to its
348+
// original value, which might not match the queried index key. Filter out such results.
349+
if (recoveryResult.rolledBack && ScalarDbUtils.isSecondaryIndexSpecified(scan, metadata)) {
350+
ret = recoveryResult.recoveredResult.filter(r -> resultMatchesIndexKey(scan, r));
351+
} else {
352+
ret = recoveryResult.recoveredResult;
353+
}
318354
} else {
319355
ret = Optional.of(result);
320356
}
321357

358+
// Because we also get records whose before images match the conjunctions, we need to check if
359+
// the current status of the records actually matches the conjunctions.
322360
if (!scan.getConjunctions().isEmpty()) {
323-
// Because we also get records whose before images match the conjunctions, we need to check if
324-
// the current status of the records actually match the conjunctions.
325361
ret =
326362
ret.filter(
327363
r ->
328364
ScalarDbUtils.columnsMatchAnyOfConjunctions(
329365
r.getColumns(), scan.getConjunctions()));
330366
}
331-
332367
if (ret.isPresent()) {
333368
putIntoReadSetInSnapshot(key, ret, context);
334369
}
@@ -656,6 +691,22 @@ boolean requiresBeforeIndexCheck(Selection selection, TransactionTableMetadata m
656691
return false;
657692
}
658693

694+
/**
695+
* Checks if the result's index column value matches the queried index key value. This is needed
696+
* because after lazy recovery (e.g., rollback), the index column value may revert to its original
697+
* value, which might not match the queried index key.
698+
*
699+
* @param selection the index-based selection operation
700+
* @param result the result to check
701+
* @return true if the result's index column matches the queried index key
702+
*/
703+
private boolean resultMatchesIndexKey(Selection selection, TransactionResult result) {
704+
assert selection.getPartitionKey().getColumns().size() == 1;
705+
Column<?> indexColumn = selection.getPartitionKey().getColumns().get(0);
706+
Column<?> resultColumn = result.getColumns().get(indexColumn.getName());
707+
return resultColumn != null && resultColumn.equals(indexColumn);
708+
}
709+
659710
/**
660711
* Checks if there are PREPARED/DELETED records that match the before-image index conditions of
661712
* the given selection, and if so, executes recovery on them.
@@ -801,7 +852,7 @@ public Optional<Result> one() throws CrudException {
801852
TransactionResult result = new TransactionResult(r.get());
802853

803854
Optional<TransactionResult> processedScanResult =
804-
processScanResult(key, scan, result, context);
855+
processScanResult(key, scan, result, context, metadata);
805856
if (!processedScanResult.isPresent()) {
806857
continue;
807858
}

core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3603,6 +3603,180 @@ public void getScanner_CloseWithBeforeIndexScanFindsRolledForwardRecord_ShouldCl
36033603
verify(recoveryFuture, never()).get();
36043604
}
36053605

3606+
@Test
3607+
void read_GetWithIndexAndRolledBackRecordWithNonMatchingIndexKey_ShouldFilterOutResult()
3608+
throws Exception {
3609+
// Arrange
3610+
// Query: col_idx = ANY_TEXT_3, but after rollback the record has col_idx = ANY_TEXT_4
3611+
Get getWithIndex = prepareGetWithIndex(); // indexKey = ANY_NAME_3:ANY_TEXT_3
3612+
Get getForStorage = toGetForStorageFrom(getWithIndex);
3613+
TransactionContext context =
3614+
new TransactionContext(ANY_ID_1, snapshot, Isolation.SNAPSHOT, false, false);
3615+
3616+
// Storage returns an uncommitted (PREPARED) record
3617+
TransactionResult preparedResult = prepareResult(TransactionState.PREPARED);
3618+
when(storage.get(getForStorage)).thenReturn(Optional.of(preparedResult));
3619+
3620+
// After rollback, the recovered result has a different index column value
3621+
TransactionResult recoveredResult = prepareResultWithIndexColumnValue(ANY_TEXT_4);
3622+
Snapshot.Key key = new Snapshot.Key(getWithIndex, preparedResult, TABLE_METADATA);
3623+
@SuppressWarnings("unchecked")
3624+
Future<Void> recoveryFuture = mock(Future.class);
3625+
when(recoveryExecutor.execute(
3626+
eq(key),
3627+
eq(getWithIndex),
3628+
eq(preparedResult),
3629+
eq(ANY_ID_1),
3630+
eq(RecoveryExecutor.RecoveryType.RETURN_LATEST_RESULT_AND_RECOVER)))
3631+
.thenReturn(
3632+
new RecoveryExecutor.Result(key, Optional.of(recoveredResult), recoveryFuture, true));
3633+
3634+
// Act
3635+
Optional<TransactionResult> result =
3636+
handler.read(null, getWithIndex, context, TRANSACTION_TABLE_METADATA);
3637+
3638+
// Assert
3639+
assertThat(result).isEmpty();
3640+
// The read set should NOT contain this key because the record still exists with a different
3641+
// index value. Caching Optional.empty() would incorrectly mark it as absent.
3642+
verify(snapshot, never()).putIntoReadSet(any(), any());
3643+
verify(snapshot).putIntoGetSet(getWithIndex, Optional.empty());
3644+
}
3645+
3646+
@Test
3647+
void read_GetWithIndexAndRolledBackRecordWithMatchingIndexKey_ShouldReturnResult()
3648+
throws Exception {
3649+
// Arrange
3650+
Get getWithIndex = prepareGetWithIndex(); // indexKey = ANY_NAME_3:ANY_TEXT_3
3651+
Get getForStorage = toGetForStorageFrom(getWithIndex);
3652+
TransactionContext context =
3653+
new TransactionContext(ANY_ID_1, snapshot, Isolation.SNAPSHOT, false, false);
3654+
3655+
// Storage returns an uncommitted (PREPARED) record
3656+
TransactionResult preparedResult = prepareResult(TransactionState.PREPARED);
3657+
when(storage.get(getForStorage)).thenReturn(Optional.of(preparedResult));
3658+
3659+
// After rollback, the recovered result still has the matching index column value
3660+
TransactionResult recoveredResult = prepareResultWithIndexColumnValue(ANY_TEXT_3);
3661+
Snapshot.Key key = new Snapshot.Key(getWithIndex, preparedResult, TABLE_METADATA);
3662+
@SuppressWarnings("unchecked")
3663+
Future<Void> recoveryFuture = mock(Future.class);
3664+
when(recoveryExecutor.execute(
3665+
eq(key),
3666+
eq(getWithIndex),
3667+
eq(preparedResult),
3668+
eq(ANY_ID_1),
3669+
eq(RecoveryExecutor.RecoveryType.RETURN_LATEST_RESULT_AND_RECOVER)))
3670+
.thenReturn(
3671+
new RecoveryExecutor.Result(key, Optional.of(recoveredResult), recoveryFuture, true));
3672+
3673+
// Act
3674+
Optional<TransactionResult> result =
3675+
handler.read(null, getWithIndex, context, TRANSACTION_TABLE_METADATA);
3676+
3677+
// Assert
3678+
assertThat(result).isPresent();
3679+
assertThat(result.get()).isEqualTo(recoveredResult);
3680+
}
3681+
3682+
@ParameterizedTest
3683+
@EnumSource(ScanType.class)
3684+
void scan_ScanWithIndexAndRolledBackRecordWithNonMatchingIndexKey_ShouldFilterOutResult(
3685+
ScanType scanType) throws Exception {
3686+
// Arrange
3687+
Scan scanWithIndex = prepareScanWithIndex(); // indexKey = ANY_NAME_3:ANY_TEXT_3
3688+
Scan scanForStorage = toScanForStorageFrom(scanWithIndex);
3689+
TransactionContext context =
3690+
new TransactionContext(ANY_ID_1, snapshot, Isolation.SNAPSHOT, false, false);
3691+
3692+
// Storage scan returns an uncommitted (PREPARED) record
3693+
TransactionResult preparedResult = prepareResult(TransactionState.PREPARED);
3694+
Scanner storageScanner = mock(Scanner.class);
3695+
when(storageScanner.iterator())
3696+
.thenReturn(Collections.<Result>singletonList(preparedResult).iterator());
3697+
when(storageScanner.one()).thenReturn(Optional.of(preparedResult)).thenReturn(Optional.empty());
3698+
when(storage.scan(scanForStorage)).thenReturn(storageScanner);
3699+
3700+
// After rollback, the recovered result has a different index column value
3701+
TransactionResult recoveredResult = prepareResultWithIndexColumnValue(ANY_TEXT_4);
3702+
Snapshot.Key key = new Snapshot.Key(scanWithIndex, preparedResult, TABLE_METADATA);
3703+
@SuppressWarnings("unchecked")
3704+
Future<Void> recoveryFuture = mock(Future.class);
3705+
when(recoveryExecutor.execute(
3706+
eq(key),
3707+
eq(scanWithIndex),
3708+
eq(preparedResult),
3709+
eq(ANY_ID_1),
3710+
eq(RecoveryExecutor.RecoveryType.RETURN_LATEST_RESULT_AND_RECOVER)))
3711+
.thenReturn(
3712+
new RecoveryExecutor.Result(key, Optional.of(recoveredResult), recoveryFuture, true));
3713+
3714+
// Act
3715+
List<Result> results = scanOrGetScanner(scanWithIndex, scanType, context);
3716+
3717+
// Assert
3718+
assertThat(results).isEmpty();
3719+
verify(snapshot, never()).putIntoReadSet(any(), any());
3720+
}
3721+
3722+
@ParameterizedTest
3723+
@EnumSource(ScanType.class)
3724+
void scan_ScanWithIndexAndRolledBackRecordWithMatchingIndexKey_ShouldReturnResult(
3725+
ScanType scanType) throws Exception {
3726+
// Arrange
3727+
Scan scanWithIndex = prepareScanWithIndex(); // indexKey = ANY_NAME_3:ANY_TEXT_3
3728+
Scan scanForStorage = toScanForStorageFrom(scanWithIndex);
3729+
TransactionContext context =
3730+
new TransactionContext(ANY_ID_1, snapshot, Isolation.SNAPSHOT, false, false);
3731+
3732+
// Storage scan returns an uncommitted (PREPARED) record
3733+
TransactionResult preparedResult = prepareResult(TransactionState.PREPARED);
3734+
Scanner storageScanner = mock(Scanner.class);
3735+
when(storageScanner.iterator())
3736+
.thenReturn(Collections.<Result>singletonList(preparedResult).iterator());
3737+
when(storageScanner.one()).thenReturn(Optional.of(preparedResult)).thenReturn(Optional.empty());
3738+
when(storage.scan(scanForStorage)).thenReturn(storageScanner);
3739+
3740+
// After rollback, the recovered result still has the matching index column value
3741+
TransactionResult recoveredResult = prepareResultWithIndexColumnValue(ANY_TEXT_3);
3742+
Snapshot.Key key = new Snapshot.Key(scanWithIndex, preparedResult, TABLE_METADATA);
3743+
@SuppressWarnings("unchecked")
3744+
Future<Void> recoveryFuture = mock(Future.class);
3745+
when(recoveryExecutor.execute(
3746+
eq(key),
3747+
eq(scanWithIndex),
3748+
eq(preparedResult),
3749+
eq(ANY_ID_1),
3750+
eq(RecoveryExecutor.RecoveryType.RETURN_LATEST_RESULT_AND_RECOVER)))
3751+
.thenReturn(
3752+
new RecoveryExecutor.Result(key, Optional.of(recoveredResult), recoveryFuture, true));
3753+
3754+
// Act
3755+
List<Result> results = scanOrGetScanner(scanWithIndex, scanType, context);
3756+
3757+
// Assert
3758+
assertThat(results).hasSize(1);
3759+
}
3760+
3761+
private TransactionResult prepareResultWithIndexColumnValue(String indexColumnValue) {
3762+
ImmutableMap<String, Column<?>> columns =
3763+
ImmutableMap.<String, Column<?>>builder()
3764+
.put(ANY_NAME_1, TextColumn.of(ANY_NAME_1, ANY_TEXT_1))
3765+
.put(ANY_NAME_2, TextColumn.of(ANY_NAME_2, ANY_TEXT_2))
3766+
.put(ANY_NAME_3, TextColumn.of(ANY_NAME_3, indexColumnValue))
3767+
.put(ANY_NAME_4, IntColumn.of(ANY_NAME_4, ANY_INT_1))
3768+
.put(Attribute.ID, TextColumn.of(Attribute.ID, ANY_ID_2))
3769+
.put(Attribute.STATE, IntColumn.of(Attribute.STATE, TransactionState.COMMITTED.get()))
3770+
.put(Attribute.VERSION, IntColumn.of(Attribute.VERSION, 2))
3771+
.put(Attribute.BEFORE_ID, TextColumn.of(Attribute.BEFORE_ID, ANY_ID_1))
3772+
.put(
3773+
Attribute.BEFORE_STATE,
3774+
IntColumn.of(Attribute.BEFORE_STATE, TransactionState.COMMITTED.get()))
3775+
.put(Attribute.BEFORE_VERSION, IntColumn.of(Attribute.BEFORE_VERSION, 1))
3776+
.build();
3777+
return new TransactionResult(new ResultImpl(columns, TABLE_METADATA));
3778+
}
3779+
36063780
private List<Result> scanOrGetScanner(Scan scan, ScanType scanType, TransactionContext context)
36073781
throws CrudException {
36083782
if (scanType == ScanType.SCAN) {

0 commit comments

Comments
 (0)