Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.exception.transaction.CrudConflictException;
import com.scalar.db.exception.transaction.CrudException;
import com.scalar.db.io.Column;
import com.scalar.db.util.ScalarDbUtils;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
Expand Down Expand Up @@ -143,6 +144,7 @@ Optional<TransactionResult> read(

for (int i = 0; ; i++) {
@Nullable Snapshot.Key key = originalKey;
boolean indexKeyFilteredOut = false;

Optional<TransactionResult> result = getFromStorage(get, metadata, context.transactionId);
if (result.isPresent() && !result.get().isCommitted()) {
Expand All @@ -156,12 +158,23 @@ Optional<TransactionResult> read(

RecoveryExecutor.Result recoveryResult = executeRecovery(key, get, result.get(), context);
context.recoveryResults.add(recoveryResult);
result = recoveryResult.recoveredResult;

// After recovery (e.g., rollback), the index column value may have changed back to its
// original value, which might not match the queried index key. Filter out such results.
if (recoveryResult.rolledBack && ScalarDbUtils.isSecondaryIndexSpecified(get, metadata)) {
Optional<TransactionResult> unfiltered = recoveryResult.recoveredResult;
result = unfiltered.filter(r -> resultMatchesIndexKey(get, r));
if (unfiltered.isPresent() && !result.isPresent()) {
indexKeyFilteredOut = true;
}
} else {
result = recoveryResult.recoveredResult;
}
Comment thread
brfrn169 marked this conversation as resolved.
Comment thread
brfrn169 marked this conversation as resolved.
}

// Because we also get records whose before images match the conjunctions, we need to check if
// the current status of the records actually match the conjunctions.
if (!get.getConjunctions().isEmpty()) {
// Because we also get records whose before images match the conjunctions, we need to check
// if the current status of the records actually match the conjunctions.
result =
result.filter(
r ->
Expand All @@ -182,12 +195,24 @@ Optional<TransactionResult> read(
continue;
}

// Put the result in the snapshot
if (result.isPresent() || get.getConjunctions().isEmpty()) {
// We put the result into the read set only if a get operation has no conjunction or the
// result exists. This is because we don’t know whether the record actually exists or not
// due to the conjunction.

// Put the result in the snapshot.
//
// When the result is present, we always cache it. When the result is absent, we cache
// Optional.empty() only when we are certain that no record exists for this key. That
// requires both of the following conditions to hold:
//
// (a) get.getConjunctions().isEmpty(): the Get has no conjunctions (additional
// predicates applied on top of the key/index lookup). If conjunctions exist, a
// record may actually exist for this key but have been filtered out by the
// conjunctions above. In that case we cannot conclude that the record is absent, so
// we must not cache Optional.empty().
//
// (b) !indexKeyFilteredOut: the result was not discarded by the post-rollback
// index-key check above. If it was, a record does exist for this key, but its
// index column value was reverted by rollback to a value that no longer matches the
// queried index key. Caching Optional.empty() in that case would be incorrect
// because the record still exists with a different index value.
if (result.isPresent() || (get.getConjunctions().isEmpty() && !indexKeyFilteredOut)) {
if (key != null) {
putIntoReadSetInSnapshot(key, result, context);
} else {
Expand Down Expand Up @@ -263,7 +288,7 @@ private LinkedHashMap<Snapshot.Key, TransactionResult> scanInternal(
TransactionResult result = new TransactionResult(r);
Snapshot.Key key = new Snapshot.Key(scan, r, metadata);
Optional<TransactionResult> processedScanResult =
processScanResult(key, scan, result, context);
processScanResult(key, scan, result, context, metadata);
processedScanResult.ifPresent(res -> results.put(key, res));

if (scan.getLimit() > 0 && results.size() >= scan.getLimit()) {
Expand Down Expand Up @@ -307,28 +332,38 @@ && checkAndRecoverBeforeIndexRecords(scan, context, txMetadata)) {
}

private Optional<TransactionResult> processScanResult(
Snapshot.Key key, Scan scan, TransactionResult result, TransactionContext context)
Snapshot.Key key,
Scan scan,
TransactionResult result,
TransactionContext context,
TableMetadata metadata)
throws CrudException {
Optional<TransactionResult> ret;
if (!result.isCommitted()) {
// Lazy recovery
RecoveryExecutor.Result recoveryResult = executeRecovery(key, scan, result, context);
context.recoveryResults.add(recoveryResult);
ret = recoveryResult.recoveredResult;

// After recovery (e.g., rollback), the index column value may have changed back to its
// original value, which might not match the queried index key. Filter out such results.
if (recoveryResult.rolledBack && ScalarDbUtils.isSecondaryIndexSpecified(scan, metadata)) {
ret = recoveryResult.recoveredResult.filter(r -> resultMatchesIndexKey(scan, r));
} else {
ret = recoveryResult.recoveredResult;
}
} else {
ret = Optional.of(result);
}

// Because we also get records whose before images match the conjunctions, we need to check if
// the current status of the records actually match the conjunctions.
if (!scan.getConjunctions().isEmpty()) {
// Because we also get records whose before images match the conjunctions, we need to check if
// the current status of the records actually match the conjunctions.
ret =
ret.filter(
r ->
ScalarDbUtils.columnsMatchAnyOfConjunctions(
r.getColumns(), scan.getConjunctions()));
}

if (ret.isPresent()) {
putIntoReadSetInSnapshot(key, ret, context);
}
Expand Down Expand Up @@ -656,6 +691,22 @@ boolean requiresBeforeIndexCheck(Selection selection, TransactionTableMetadata m
return false;
}

/**
* Checks if the result's index column value matches the queried index key value. This is needed
* because after lazy recovery (e.g., rollback), the index column value may revert to its original
* value, which might not match the queried index key.
*
* @param selection the index-based selection operation
* @param result the result to check
* @return true if the result's index column matches the queried index key
*/
private boolean resultMatchesIndexKey(Selection selection, TransactionResult result) {
assert selection.getPartitionKey().getColumns().size() == 1;
Column<?> indexColumn = selection.getPartitionKey().getColumns().get(0);
Comment thread
brfrn169 marked this conversation as resolved.
Column<?> resultColumn = result.getColumns().get(indexColumn.getName());
return resultColumn != null && resultColumn.equals(indexColumn);
}

/**
* Checks if there are PREPARED/DELETED records that match the before-image index conditions of
* the given selection, and if so, executes recovery on them.
Expand Down Expand Up @@ -801,7 +852,7 @@ public Optional<Result> one() throws CrudException {
TransactionResult result = new TransactionResult(r.get());

Optional<TransactionResult> processedScanResult =
processScanResult(key, scan, result, context);
processScanResult(key, scan, result, context, metadata);
if (!processedScanResult.isPresent()) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3603,6 +3603,180 @@ public void getScanner_CloseWithBeforeIndexScanFindsRolledForwardRecord_ShouldCl
verify(recoveryFuture, never()).get();
}

@Test
void read_GetWithIndexAndRolledBackRecordWithNonMatchingIndexKey_ShouldFilterOutResult()
throws Exception {
// Arrange
// Query: col_idx = ANY_TEXT_3, but after rollback the record has col_idx = ANY_TEXT_4
Get getWithIndex = prepareGetWithIndex(); // indexKey = ANY_NAME_3:ANY_TEXT_3
Get getForStorage = toGetForStorageFrom(getWithIndex);
TransactionContext context =
new TransactionContext(ANY_ID_1, snapshot, Isolation.SNAPSHOT, false, false);

// Storage returns an uncommitted (PREPARED) record
TransactionResult preparedResult = prepareResult(TransactionState.PREPARED);
when(storage.get(getForStorage)).thenReturn(Optional.of(preparedResult));

// After rollback, the recovered result has a different index column value
TransactionResult recoveredResult = prepareResultWithIndexColumnValue(ANY_TEXT_4);
Snapshot.Key key = new Snapshot.Key(getWithIndex, preparedResult, TABLE_METADATA);
@SuppressWarnings("unchecked")
Future<Void> recoveryFuture = mock(Future.class);
when(recoveryExecutor.execute(
eq(key),
eq(getWithIndex),
eq(preparedResult),
eq(ANY_ID_1),
eq(RecoveryExecutor.RecoveryType.RETURN_LATEST_RESULT_AND_RECOVER)))
.thenReturn(
new RecoveryExecutor.Result(key, Optional.of(recoveredResult), recoveryFuture, true));

// Act
Optional<TransactionResult> result =
handler.read(null, getWithIndex, context, TRANSACTION_TABLE_METADATA);

// Assert
assertThat(result).isEmpty();
// The read set should NOT contain this key because the record still exists with a different
// index value. Caching Optional.empty() would incorrectly mark it as absent.
verify(snapshot, never()).putIntoReadSet(any(), any());
verify(snapshot).putIntoGetSet(getWithIndex, Optional.empty());
}

@Test
void read_GetWithIndexAndRolledBackRecordWithMatchingIndexKey_ShouldReturnResult()
throws Exception {
// Arrange
Get getWithIndex = prepareGetWithIndex(); // indexKey = ANY_NAME_3:ANY_TEXT_3
Get getForStorage = toGetForStorageFrom(getWithIndex);
TransactionContext context =
new TransactionContext(ANY_ID_1, snapshot, Isolation.SNAPSHOT, false, false);

// Storage returns an uncommitted (PREPARED) record
TransactionResult preparedResult = prepareResult(TransactionState.PREPARED);
when(storage.get(getForStorage)).thenReturn(Optional.of(preparedResult));

// After rollback, the recovered result still has the matching index column value
TransactionResult recoveredResult = prepareResultWithIndexColumnValue(ANY_TEXT_3);
Snapshot.Key key = new Snapshot.Key(getWithIndex, preparedResult, TABLE_METADATA);
@SuppressWarnings("unchecked")
Future<Void> recoveryFuture = mock(Future.class);
when(recoveryExecutor.execute(
eq(key),
eq(getWithIndex),
eq(preparedResult),
eq(ANY_ID_1),
eq(RecoveryExecutor.RecoveryType.RETURN_LATEST_RESULT_AND_RECOVER)))
.thenReturn(
new RecoveryExecutor.Result(key, Optional.of(recoveredResult), recoveryFuture, true));

// Act
Optional<TransactionResult> result =
handler.read(null, getWithIndex, context, TRANSACTION_TABLE_METADATA);

// Assert
assertThat(result).isPresent();
assertThat(result.get()).isEqualTo(recoveredResult);
}

@ParameterizedTest
@EnumSource(ScanType.class)
void scan_ScanWithIndexAndRolledBackRecordWithNonMatchingIndexKey_ShouldFilterOutResult(
ScanType scanType) throws Exception {
// Arrange
Scan scanWithIndex = prepareScanWithIndex(); // indexKey = ANY_NAME_3:ANY_TEXT_3
Scan scanForStorage = toScanForStorageFrom(scanWithIndex);
TransactionContext context =
new TransactionContext(ANY_ID_1, snapshot, Isolation.SNAPSHOT, false, false);

// Storage scan returns an uncommitted (PREPARED) record
TransactionResult preparedResult = prepareResult(TransactionState.PREPARED);
Scanner storageScanner = mock(Scanner.class);
when(storageScanner.iterator())
.thenReturn(Collections.<Result>singletonList(preparedResult).iterator());
when(storageScanner.one()).thenReturn(Optional.of(preparedResult)).thenReturn(Optional.empty());
when(storage.scan(scanForStorage)).thenReturn(storageScanner);

// After rollback, the recovered result has a different index column value
TransactionResult recoveredResult = prepareResultWithIndexColumnValue(ANY_TEXT_4);
Snapshot.Key key = new Snapshot.Key(scanWithIndex, preparedResult, TABLE_METADATA);
@SuppressWarnings("unchecked")
Future<Void> recoveryFuture = mock(Future.class);
when(recoveryExecutor.execute(
eq(key),
eq(scanWithIndex),
eq(preparedResult),
eq(ANY_ID_1),
eq(RecoveryExecutor.RecoveryType.RETURN_LATEST_RESULT_AND_RECOVER)))
.thenReturn(
new RecoveryExecutor.Result(key, Optional.of(recoveredResult), recoveryFuture, true));

// Act
List<Result> results = scanOrGetScanner(scanWithIndex, scanType, context);

// Assert
assertThat(results).isEmpty();
verify(snapshot, never()).putIntoReadSet(any(), any());
}

@ParameterizedTest
@EnumSource(ScanType.class)
void scan_ScanWithIndexAndRolledBackRecordWithMatchingIndexKey_ShouldReturnResult(
ScanType scanType) throws Exception {
// Arrange
Scan scanWithIndex = prepareScanWithIndex(); // indexKey = ANY_NAME_3:ANY_TEXT_3
Scan scanForStorage = toScanForStorageFrom(scanWithIndex);
TransactionContext context =
new TransactionContext(ANY_ID_1, snapshot, Isolation.SNAPSHOT, false, false);

// Storage scan returns an uncommitted (PREPARED) record
TransactionResult preparedResult = prepareResult(TransactionState.PREPARED);
Scanner storageScanner = mock(Scanner.class);
when(storageScanner.iterator())
.thenReturn(Collections.<Result>singletonList(preparedResult).iterator());
when(storageScanner.one()).thenReturn(Optional.of(preparedResult)).thenReturn(Optional.empty());
when(storage.scan(scanForStorage)).thenReturn(storageScanner);

// After rollback, the recovered result still has the matching index column value
TransactionResult recoveredResult = prepareResultWithIndexColumnValue(ANY_TEXT_3);
Snapshot.Key key = new Snapshot.Key(scanWithIndex, preparedResult, TABLE_METADATA);
@SuppressWarnings("unchecked")
Future<Void> recoveryFuture = mock(Future.class);
when(recoveryExecutor.execute(
eq(key),
eq(scanWithIndex),
eq(preparedResult),
eq(ANY_ID_1),
eq(RecoveryExecutor.RecoveryType.RETURN_LATEST_RESULT_AND_RECOVER)))
.thenReturn(
new RecoveryExecutor.Result(key, Optional.of(recoveredResult), recoveryFuture, true));

// Act
List<Result> results = scanOrGetScanner(scanWithIndex, scanType, context);

// Assert
assertThat(results).hasSize(1);
}

private TransactionResult prepareResultWithIndexColumnValue(String indexColumnValue) {
ImmutableMap<String, Column<?>> columns =
ImmutableMap.<String, Column<?>>builder()
.put(ANY_NAME_1, TextColumn.of(ANY_NAME_1, ANY_TEXT_1))
.put(ANY_NAME_2, TextColumn.of(ANY_NAME_2, ANY_TEXT_2))
.put(ANY_NAME_3, TextColumn.of(ANY_NAME_3, indexColumnValue))
.put(ANY_NAME_4, IntColumn.of(ANY_NAME_4, ANY_INT_1))
.put(Attribute.ID, TextColumn.of(Attribute.ID, ANY_ID_2))
.put(Attribute.STATE, IntColumn.of(Attribute.STATE, TransactionState.COMMITTED.get()))
.put(Attribute.VERSION, IntColumn.of(Attribute.VERSION, 2))
.put(Attribute.BEFORE_ID, TextColumn.of(Attribute.BEFORE_ID, ANY_ID_1))
.put(
Attribute.BEFORE_STATE,
IntColumn.of(Attribute.BEFORE_STATE, TransactionState.COMMITTED.get()))
.put(Attribute.BEFORE_VERSION, IntColumn.of(Attribute.BEFORE_VERSION, 1))
.build();
return new TransactionResult(new ResultImpl(columns, TABLE_METADATA));
}

private List<Result> scanOrGetScanner(Scan scan, ScanType scanType, TransactionContext context)
throws CrudException {
if (scanType == ScanType.SCAN) {
Expand Down
Loading