|
2 | 2 |
|
3 | 3 | import static com.scalar.db.transaction.consensuscommit.ConsensusCommitOperationAttributes.isImplicitPreReadEnabled; |
4 | 4 | import static com.scalar.db.transaction.consensuscommit.ConsensusCommitOperationAttributes.isInsertModeEnabled; |
5 | | -import static com.scalar.db.transaction.consensuscommit.ConsensusCommitUtils.getTransactionTableMetadata; |
6 | 5 |
|
7 | 6 | import com.google.common.annotations.VisibleForTesting; |
8 | 7 | import com.google.common.base.MoreObjects; |
9 | 8 | import com.google.common.collect.ComparisonChain; |
10 | 9 | import com.google.common.collect.Iterators; |
11 | 10 | import com.scalar.db.api.ConditionSetBuilder; |
| 11 | +import com.scalar.db.api.ConditionalExpression; |
12 | 12 | import com.scalar.db.api.Delete; |
13 | 13 | import com.scalar.db.api.DistributedStorage; |
14 | 14 | import com.scalar.db.api.Get; |
|
20 | 20 | import com.scalar.db.api.ScanAll; |
21 | 21 | import com.scalar.db.api.ScanWithIndex; |
22 | 22 | import com.scalar.db.api.Scanner; |
| 23 | +import com.scalar.db.api.Selection; |
23 | 24 | import com.scalar.db.api.Selection.Conjunction; |
24 | 25 | import com.scalar.db.api.TableMetadata; |
25 | 26 | import com.scalar.db.common.CoreError; |
@@ -560,22 +561,43 @@ void toSerializable(DistributedStorage storage) |
560 | 561 |
|
561 | 562 | // Scan set is re-validated to check if there is no anti-dependency |
562 | 563 | for (Map.Entry<Scan, LinkedHashMap<Key, TransactionResult>> entry : scanSet.entrySet()) { |
563 | | - tasks.add(() -> validateScanResults(storage, entry.getKey(), entry.getValue(), false)); |
| 564 | + tasks.add( |
| 565 | + () -> { |
| 566 | + TransactionTableMetadata txMetadata = getTransactionTableMetadata(entry.getKey()); |
| 567 | + validateScanResults( |
| 568 | + storage, entry.getKey(), entry.getValue(), false, txMetadata.getTableMetadata()); |
| 569 | + validateBeforeIndex(storage, entry.getKey(), txMetadata); |
| 570 | + }); |
564 | 571 | } |
565 | 572 |
|
566 | 573 | // Scanner set is re-validated to check if there is no anti-dependency |
567 | 574 | for (ScannerInfo scannerInfo : scannerSet) { |
568 | | - tasks.add(() -> validateScanResults(storage, scannerInfo.scan, scannerInfo.results, true)); |
| 575 | + tasks.add( |
| 576 | + () -> { |
| 577 | + TransactionTableMetadata txMetadata = getTransactionTableMetadata(scannerInfo.scan); |
| 578 | + validateScanResults( |
| 579 | + storage, |
| 580 | + scannerInfo.scan, |
| 581 | + scannerInfo.results, |
| 582 | + true, |
| 583 | + txMetadata.getTableMetadata()); |
| 584 | + validateBeforeIndex(storage, scannerInfo.scan, txMetadata); |
| 585 | + }); |
569 | 586 | } |
570 | 587 |
|
571 | 588 | // Get set is re-validated to check if there is no anti-dependency |
572 | 589 | for (Map.Entry<Get, Optional<TransactionResult>> entry : getSet.entrySet()) { |
573 | 590 | Get get = entry.getKey(); |
574 | | - TableMetadata metadata = getTableMetadata(get); |
| 591 | + TransactionTableMetadata txMetadata = getTransactionTableMetadata(get); |
| 592 | + TableMetadata metadata = txMetadata.getTableMetadata(); |
575 | 593 |
|
576 | 594 | if (ScalarDbUtils.isSecondaryIndexSpecified(get, metadata)) { |
577 | 595 | // For Get with index |
578 | | - tasks.add(() -> validateGetWithIndexResult(storage, get, entry.getValue(), metadata)); |
| 596 | + tasks.add( |
| 597 | + () -> { |
| 598 | + validateGetWithIndexResult(storage, get, entry.getValue(), metadata); |
| 599 | + validateBeforeIndex(storage, get, txMetadata); |
| 600 | + }); |
579 | 601 | } else { |
580 | 602 | // For other Get |
581 | 603 |
|
@@ -610,20 +632,19 @@ void toSerializable(DistributedStorage storage) |
610 | 632 | * @param results the results of the scan |
611 | 633 | * @param notFullyScannedScanner if this is a validation for a scanner that has not been fully |
612 | 634 | * scanned |
| 635 | + * @param metadata the table metadata for the scanned table |
613 | 636 | * @throws ExecutionException if a storage operation fails |
614 | 637 | * @throws ValidationConflictException if the scan results are changed by another transaction |
615 | 638 | */ |
616 | 639 | private void validateScanResults( |
617 | 640 | DistributedStorage storage, |
618 | 641 | Scan scan, |
619 | 642 | LinkedHashMap<Key, TransactionResult> results, |
620 | | - boolean notFullyScannedScanner) |
| 643 | + boolean notFullyScannedScanner, |
| 644 | + TableMetadata metadata) |
621 | 645 | throws ExecutionException, ValidationConflictException { |
622 | | - Scanner scanner = null; |
623 | | - try { |
624 | | - TableMetadata metadata = getTableMetadata(scan); |
625 | | - |
626 | | - scanner = storage.scan(ConsensusCommitUtils.prepareScanForStorage(scan, metadata)); |
| 646 | + try (Scanner scanner = |
| 647 | + storage.scan(ConsensusCommitUtils.prepareScanForStorage(scan, metadata))) { |
627 | 648 |
|
628 | 649 | // Initialize the iterator for the latest scan results |
629 | 650 | Optional<Result> latestResult = getNextResult(scanner, scan); |
@@ -722,14 +743,8 @@ private void validateScanResults( |
722 | 743 | throwExceptionDueToAntiDependency(); |
723 | 744 | } |
724 | 745 | } |
725 | | - } finally { |
726 | | - if (scanner != null) { |
727 | | - try { |
728 | | - scanner.close(); |
729 | | - } catch (IOException e) { |
730 | | - logger.warn("Failed to close the scanner. Transaction ID: {}", id, e); |
731 | | - } |
732 | | - } |
| 746 | + } catch (IOException e) { |
| 747 | + logger.warn("Failed to close the scanner. Transaction ID: {}", id, e); |
733 | 748 | } |
734 | 749 | } |
735 | 750 |
|
@@ -780,7 +795,7 @@ private void validateGetWithIndexResult( |
780 | 795 | originalResult.ifPresent(r -> results.put(new Snapshot.Key(scanWithIndex, r, metadata), r)); |
781 | 796 |
|
782 | 797 | // Validate the result to check if there is no anti-dependency |
783 | | - validateScanResults(storage, scanWithIndex, results, false); |
| 798 | + validateScanResults(storage, scanWithIndex, results, false, metadata); |
784 | 799 | } |
785 | 800 |
|
786 | 801 | private void validateGetResult( |
@@ -810,10 +825,99 @@ private void validateGetResult( |
810 | 825 | } |
811 | 826 | } |
812 | 827 |
|
813 | | - private TableMetadata getTableMetadata(Operation operation) throws ExecutionException { |
814 | | - TransactionTableMetadata transactionTableMetadata = |
815 | | - getTransactionTableMetadata(tableMetadataManager, operation); |
816 | | - return transactionTableMetadata.getTableMetadata(); |
| 828 | + /** |
| 829 | + * Validates that there are no uncommitted records on the before-image index that could cause |
| 830 | + * phantom reads. |
| 831 | + * |
| 832 | + * <p>This is needed because when another transaction updates a record's indexed column (e.g., |
| 833 | + * from 10 to 20) and is in PREPARED/DELETED state, the regular index scan (e.g., index_col=10) |
| 834 | + * won't find that record since its current value is 20. However, the record's committed |
| 835 | + * (before-image) value is still 10. Without this check, a phantom could go undetected: a record |
| 836 | + * committed with index_col=10 but updated to 20 by another PREPARED transaction would be |
| 837 | + * invisible to both the original scan and the validation re-scan. |
| 838 | + * |
| 839 | + * <p>This method is only called in the SERIALIZABLE extra-read validation phase. In SERIALIZABLE, |
| 840 | + * {@link ConsensusCommitOperationChecker} rejects index-based operations on tables without |
| 841 | + * before-image indexes, so the existence of before-image indexes is guaranteed when this method |
| 842 | + * is called. Therefore, this method only needs to check whether the selection is an index-based |
| 843 | + * operation (Get with index, Scan with index, or ScanAll with indexed column conditions), without |
| 844 | + * checking for the existence of before-image indexes. |
| 845 | + * |
| 846 | + * @param storage a distributed storage |
| 847 | + * @param selection the original selection operation (Get with index, ScanWithIndex, or ScanAll) |
| 848 | + * @throws ExecutionException if a storage operation fails |
| 849 | + * @throws ValidationConflictException if uncommitted records are found on the before-image index |
| 850 | + */ |
| 851 | + private void validateBeforeIndex( |
| 852 | + DistributedStorage storage, Selection selection, TransactionTableMetadata txMetadata) |
| 853 | + throws ExecutionException, ValidationConflictException { |
| 854 | + if (!isIndexBasedOperation(selection, txMetadata.getTableMetadata())) { |
| 855 | + return; |
| 856 | + } |
| 857 | + |
| 858 | + Scan beforeIndexScan; |
| 859 | + if (selection instanceof ScanAll) { |
| 860 | + beforeIndexScan = |
| 861 | + ConsensusCommitUtils.createBeforeIndexScanAll( |
| 862 | + (ScanAll) selection, txMetadata.getTableMetadata()); |
| 863 | + } else { |
| 864 | + beforeIndexScan = ConsensusCommitUtils.createBeforeIndexScan(selection); |
| 865 | + } |
| 866 | + |
| 867 | + try (Scanner scanner = storage.scan(beforeIndexScan)) { |
| 868 | + for (Result result : scanner) { |
| 869 | + TransactionResult txResult = new TransactionResult(result); |
| 870 | + // Conservatively fail if any uncommitted record from another transaction is found on the |
| 871 | + // before-image index. This may cause false positives (e.g., when the record will be |
| 872 | + // rolled forward and its committed index value won't actually match the scan condition), |
| 873 | + // but it guarantees correctness. On retry, the record should be committed, so the retry |
| 874 | + // will succeed. |
| 875 | + if (!txResult.isCommitted() && !id.equals(txResult.getId())) { |
| 876 | + throwExceptionDueToAntiDependency(); |
| 877 | + } |
| 878 | + } |
| 879 | + } catch (RuntimeException e) { |
| 880 | + if (e.getCause() instanceof ExecutionException) { |
| 881 | + throw (ExecutionException) e.getCause(); |
| 882 | + } |
| 883 | + throw e; |
| 884 | + } catch (IOException e) { |
| 885 | + logger.warn("Failed to close the scanner. Transaction ID: {}", id, e); |
| 886 | + } |
| 887 | + } |
| 888 | + |
| 889 | + /** |
| 890 | + * Checks if the given selection is an index-based operation that requires before-image index |
| 891 | + * validation. This includes Get with index, Scan with index, and ScanAll with conditions on |
| 892 | + * indexed columns. |
| 893 | + * |
| 894 | + * <p>For ScanAll, whether the underlying storage actually uses the index depends on the storage |
| 895 | + * implementation. However, this method considers ScanAll with conditions on indexed columns as an |
| 896 | + * index-based operation regardless. |
| 897 | + * |
| 898 | + * @param selection the selection operation to check |
| 899 | + * @param metadata the table metadata |
| 900 | + * @return true if the selection is an index-based operation |
| 901 | + */ |
| 902 | + private boolean isIndexBasedOperation(Selection selection, TableMetadata metadata) { |
| 903 | + if (ScalarDbUtils.isSecondaryIndexSpecified(selection, metadata)) { |
| 904 | + return true; |
| 905 | + } |
| 906 | + if (selection instanceof ScanAll) { |
| 907 | + for (Selection.Conjunction conjunction : selection.getConjunctions()) { |
| 908 | + for (ConditionalExpression condition : conjunction.getConditions()) { |
| 909 | + if (metadata.getSecondaryIndexNames().contains(condition.getColumn().getName())) { |
| 910 | + return true; |
| 911 | + } |
| 912 | + } |
| 913 | + } |
| 914 | + } |
| 915 | + return false; |
| 916 | + } |
| 917 | + |
| 918 | + private TransactionTableMetadata getTransactionTableMetadata(Operation operation) |
| 919 | + throws ExecutionException { |
| 920 | + return ConsensusCommitUtils.getTransactionTableMetadata(tableMetadataManager, operation); |
817 | 921 | } |
818 | 922 |
|
819 | 923 | private boolean isChanged( |
|
0 commit comments