@@ -3603,6 +3603,180 @@ public void getScanner_CloseWithBeforeIndexScanFindsRolledForwardRecord_ShouldCl
36033603 verify (recoveryFuture , never ()).get ();
36043604 }
36053605
3606+ @ Test
3607+ void read_GetWithIndexAndRolledBackRecordWithNonMatchingIndexKey_ShouldFilterOutResult ()
3608+ throws Exception {
3609+ // Arrange
3610+ // Query: col_idx = ANY_TEXT_3, but after rollback the record has col_idx = ANY_TEXT_4
3611+ Get getWithIndex = prepareGetWithIndex (); // indexKey = ANY_NAME_3:ANY_TEXT_3
3612+ Get getForStorage = toGetForStorageFrom (getWithIndex );
3613+ TransactionContext context =
3614+ new TransactionContext (ANY_ID_1 , snapshot , Isolation .SNAPSHOT , false , false );
3615+
3616+ // Storage returns an uncommitted (PREPARED) record
3617+ TransactionResult preparedResult = prepareResult (TransactionState .PREPARED );
3618+ when (storage .get (getForStorage )).thenReturn (Optional .of (preparedResult ));
3619+
3620+ // After rollback, the recovered result has a different index column value
3621+ TransactionResult recoveredResult = prepareResultWithIndexColumnValue (ANY_TEXT_4 );
3622+ Snapshot .Key key = new Snapshot .Key (getWithIndex , preparedResult , TABLE_METADATA );
3623+ @ SuppressWarnings ("unchecked" )
3624+ Future <Void > recoveryFuture = mock (Future .class );
3625+ when (recoveryExecutor .execute (
3626+ eq (key ),
3627+ eq (getWithIndex ),
3628+ eq (preparedResult ),
3629+ eq (ANY_ID_1 ),
3630+ eq (RecoveryExecutor .RecoveryType .RETURN_LATEST_RESULT_AND_RECOVER )))
3631+ .thenReturn (
3632+ new RecoveryExecutor .Result (key , Optional .of (recoveredResult ), recoveryFuture , true ));
3633+
3634+ // Act
3635+ Optional <TransactionResult > result =
3636+ handler .read (null , getWithIndex , context , TRANSACTION_TABLE_METADATA );
3637+
3638+ // Assert
3639+ assertThat (result ).isEmpty ();
3640+ // The read set should NOT contain this key because the record still exists with a different
3641+ // index value. Caching Optional.empty() would incorrectly mark it as absent.
3642+ verify (snapshot , never ()).putIntoReadSet (any (), any ());
3643+ verify (snapshot ).putIntoGetSet (getWithIndex , Optional .empty ());
3644+ }
3645+
3646+ @ Test
3647+ void read_GetWithIndexAndRolledBackRecordWithMatchingIndexKey_ShouldReturnResult ()
3648+ throws Exception {
3649+ // Arrange
3650+ Get getWithIndex = prepareGetWithIndex (); // indexKey = ANY_NAME_3:ANY_TEXT_3
3651+ Get getForStorage = toGetForStorageFrom (getWithIndex );
3652+ TransactionContext context =
3653+ new TransactionContext (ANY_ID_1 , snapshot , Isolation .SNAPSHOT , false , false );
3654+
3655+ // Storage returns an uncommitted (PREPARED) record
3656+ TransactionResult preparedResult = prepareResult (TransactionState .PREPARED );
3657+ when (storage .get (getForStorage )).thenReturn (Optional .of (preparedResult ));
3658+
3659+ // After rollback, the recovered result still has the matching index column value
3660+ TransactionResult recoveredResult = prepareResultWithIndexColumnValue (ANY_TEXT_3 );
3661+ Snapshot .Key key = new Snapshot .Key (getWithIndex , preparedResult , TABLE_METADATA );
3662+ @ SuppressWarnings ("unchecked" )
3663+ Future <Void > recoveryFuture = mock (Future .class );
3664+ when (recoveryExecutor .execute (
3665+ eq (key ),
3666+ eq (getWithIndex ),
3667+ eq (preparedResult ),
3668+ eq (ANY_ID_1 ),
3669+ eq (RecoveryExecutor .RecoveryType .RETURN_LATEST_RESULT_AND_RECOVER )))
3670+ .thenReturn (
3671+ new RecoveryExecutor .Result (key , Optional .of (recoveredResult ), recoveryFuture , true ));
3672+
3673+ // Act
3674+ Optional <TransactionResult > result =
3675+ handler .read (null , getWithIndex , context , TRANSACTION_TABLE_METADATA );
3676+
3677+ // Assert
3678+ assertThat (result ).isPresent ();
3679+ assertThat (result .get ()).isEqualTo (recoveredResult );
3680+ }
3681+
3682+ @ ParameterizedTest
3683+ @ EnumSource (ScanType .class )
3684+ void scan_ScanWithIndexAndRolledBackRecordWithNonMatchingIndexKey_ShouldFilterOutResult (
3685+ ScanType scanType ) throws Exception {
3686+ // Arrange
3687+ Scan scanWithIndex = prepareScanWithIndex (); // indexKey = ANY_NAME_3:ANY_TEXT_3
3688+ Scan scanForStorage = toScanForStorageFrom (scanWithIndex );
3689+ TransactionContext context =
3690+ new TransactionContext (ANY_ID_1 , snapshot , Isolation .SNAPSHOT , false , false );
3691+
3692+ // Storage scan returns an uncommitted (PREPARED) record
3693+ TransactionResult preparedResult = prepareResult (TransactionState .PREPARED );
3694+ Scanner storageScanner = mock (Scanner .class );
3695+ when (storageScanner .iterator ())
3696+ .thenReturn (Collections .<Result >singletonList (preparedResult ).iterator ());
3697+ when (storageScanner .one ()).thenReturn (Optional .of (preparedResult )).thenReturn (Optional .empty ());
3698+ when (storage .scan (scanForStorage )).thenReturn (storageScanner );
3699+
3700+ // After rollback, the recovered result has a different index column value
3701+ TransactionResult recoveredResult = prepareResultWithIndexColumnValue (ANY_TEXT_4 );
3702+ Snapshot .Key key = new Snapshot .Key (scanWithIndex , preparedResult , TABLE_METADATA );
3703+ @ SuppressWarnings ("unchecked" )
3704+ Future <Void > recoveryFuture = mock (Future .class );
3705+ when (recoveryExecutor .execute (
3706+ eq (key ),
3707+ eq (scanWithIndex ),
3708+ eq (preparedResult ),
3709+ eq (ANY_ID_1 ),
3710+ eq (RecoveryExecutor .RecoveryType .RETURN_LATEST_RESULT_AND_RECOVER )))
3711+ .thenReturn (
3712+ new RecoveryExecutor .Result (key , Optional .of (recoveredResult ), recoveryFuture , true ));
3713+
3714+ // Act
3715+ List <Result > results = scanOrGetScanner (scanWithIndex , scanType , context );
3716+
3717+ // Assert
3718+ assertThat (results ).isEmpty ();
3719+ verify (snapshot , never ()).putIntoReadSet (any (), any ());
3720+ }
3721+
3722+ @ ParameterizedTest
3723+ @ EnumSource (ScanType .class )
3724+ void scan_ScanWithIndexAndRolledBackRecordWithMatchingIndexKey_ShouldReturnResult (
3725+ ScanType scanType ) throws Exception {
3726+ // Arrange
3727+ Scan scanWithIndex = prepareScanWithIndex (); // indexKey = ANY_NAME_3:ANY_TEXT_3
3728+ Scan scanForStorage = toScanForStorageFrom (scanWithIndex );
3729+ TransactionContext context =
3730+ new TransactionContext (ANY_ID_1 , snapshot , Isolation .SNAPSHOT , false , false );
3731+
3732+ // Storage scan returns an uncommitted (PREPARED) record
3733+ TransactionResult preparedResult = prepareResult (TransactionState .PREPARED );
3734+ Scanner storageScanner = mock (Scanner .class );
3735+ when (storageScanner .iterator ())
3736+ .thenReturn (Collections .<Result >singletonList (preparedResult ).iterator ());
3737+ when (storageScanner .one ()).thenReturn (Optional .of (preparedResult )).thenReturn (Optional .empty ());
3738+ when (storage .scan (scanForStorage )).thenReturn (storageScanner );
3739+
3740+ // After rollback, the recovered result still has the matching index column value
3741+ TransactionResult recoveredResult = prepareResultWithIndexColumnValue (ANY_TEXT_3 );
3742+ Snapshot .Key key = new Snapshot .Key (scanWithIndex , preparedResult , TABLE_METADATA );
3743+ @ SuppressWarnings ("unchecked" )
3744+ Future <Void > recoveryFuture = mock (Future .class );
3745+ when (recoveryExecutor .execute (
3746+ eq (key ),
3747+ eq (scanWithIndex ),
3748+ eq (preparedResult ),
3749+ eq (ANY_ID_1 ),
3750+ eq (RecoveryExecutor .RecoveryType .RETURN_LATEST_RESULT_AND_RECOVER )))
3751+ .thenReturn (
3752+ new RecoveryExecutor .Result (key , Optional .of (recoveredResult ), recoveryFuture , true ));
3753+
3754+ // Act
3755+ List <Result > results = scanOrGetScanner (scanWithIndex , scanType , context );
3756+
3757+ // Assert
3758+ assertThat (results ).hasSize (1 );
3759+ }
3760+
3761+ private TransactionResult prepareResultWithIndexColumnValue (String indexColumnValue ) {
3762+ ImmutableMap <String , Column <?>> columns =
3763+ ImmutableMap .<String , Column <?>>builder ()
3764+ .put (ANY_NAME_1 , TextColumn .of (ANY_NAME_1 , ANY_TEXT_1 ))
3765+ .put (ANY_NAME_2 , TextColumn .of (ANY_NAME_2 , ANY_TEXT_2 ))
3766+ .put (ANY_NAME_3 , TextColumn .of (ANY_NAME_3 , indexColumnValue ))
3767+ .put (ANY_NAME_4 , IntColumn .of (ANY_NAME_4 , ANY_INT_1 ))
3768+ .put (Attribute .ID , TextColumn .of (Attribute .ID , ANY_ID_2 ))
3769+ .put (Attribute .STATE , IntColumn .of (Attribute .STATE , TransactionState .COMMITTED .get ()))
3770+ .put (Attribute .VERSION , IntColumn .of (Attribute .VERSION , 2 ))
3771+ .put (Attribute .BEFORE_ID , TextColumn .of (Attribute .BEFORE_ID , ANY_ID_1 ))
3772+ .put (
3773+ Attribute .BEFORE_STATE ,
3774+ IntColumn .of (Attribute .BEFORE_STATE , TransactionState .COMMITTED .get ()))
3775+ .put (Attribute .BEFORE_VERSION , IntColumn .of (Attribute .BEFORE_VERSION , 1 ))
3776+ .build ();
3777+ return new TransactionResult (new ResultImpl (columns , TABLE_METADATA ));
3778+ }
3779+
36063780 private List <Result > scanOrGetScanner (Scan scan , ScanType scanType , TransactionContext context )
36073781 throws CrudException {
36083782 if (scanType == ScanType .SCAN ) {
0 commit comments