Skip to content

Commit 8bb62c9

Browse files
Integrate candidate links into v3 pipeline
1 parent 0bb7103 commit 8bb62c9

2 files changed

Lines changed: 41 additions & 5 deletions

File tree

tests/test_v3_pipeline_runner.py

Lines changed: 13 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@ def test_run_v3_pipeline_creates_expected_outputs():
2020
"canonical_bank_transactions.csv",
2121
"canonical_internal_transactions.csv",
2222
"reconciliation_links.csv",
23+
"candidate_links.csv",
2324
"exception_queue.csv",
2425
"pipeline_run_summary.csv",
2526
]
@@ -34,13 +35,24 @@ def test_run_v3_pipeline_creates_expected_outputs():
3435
assert result["exact_match_count"] >= 1
3536
assert result["timing_match_count"] >= 0
3637
assert result["deterministic_match_count"] >= result["exact_match_count"]
38+
assert result["candidate_link_count"] >= 0
3739
assert result["amount_mismatch_count"] >= 0
3840
assert result["exception_count"] >= 1
3941

4042
reconciliation_links = pd.read_csv(output_dir / "reconciliation_links.csv")
4143
assert not reconciliation_links.empty
4244
assert "EXACT_CANONICAL_MATCH" in set(reconciliation_links["match_type"])
4345

46+
candidate_links = pd.read_csv(output_dir / "candidate_links.csv")
47+
assert {
48+
"candidate_id",
49+
"candidate_status",
50+
"confidence_score",
51+
"bank_source_row_id",
52+
"ledger_source_row_id",
53+
"rationale",
54+
}.issubset(set(candidate_links.columns))
55+
4456
exception_queue = pd.read_csv(output_dir / "exception_queue.csv")
4557
assert not exception_queue.empty
4658
assert {"UNMATCHED_BANK_TRANSACTION", "UNMATCHED_LEDGER_TRANSACTION"} & set(
@@ -54,5 +66,6 @@ def test_run_v3_pipeline_creates_expected_outputs():
5466
"bank_standardization",
5567
"ledger_standardization",
5668
"deterministic_matching",
69+
"candidate_link_generation",
5770
"exception_queue_build",
5871
}

versions/v3/src/reconciliation/run_v3_pipeline.py

Lines changed: 28 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@
1818
standardize_bank_transactions,
1919
standardize_internal_ledger,
2020
)
21+
from versions.v3.src.matching.candidate_links import build_candidate_links # noqa: E402
2122
from versions.v3.src.matching.deterministic_rules import find_deterministic_matches # noqa: E402
2223
from versions.v3.src.reconciliation.exception_builder import build_exception_queue # noqa: E402
2324

@@ -49,7 +50,7 @@ def run_v3_pipeline() -> dict[str, Any]:
4950
ledger_schema_path = V3_SCHEMA_DIR / "internal_cash_ledger.schema.yaml"
5051

5152
print(f"Starting v3 pipeline run: {run_id}")
52-
print("Step 1/5: Running schema validation...")
53+
print("Step 1/6: Running schema validation...")
5354

5455
validation_issues = []
5556
validation_issues.extend(
@@ -74,7 +75,7 @@ def run_v3_pipeline() -> dict[str, Any]:
7475
print(f"Validation issues found: {len(validation_issues_df)}")
7576
print(f"Validation output: {validation_output_path}")
7677

77-
print("Step 2/5: Standardizing source transactions...")
78+
print("Step 2/6: Standardizing source transactions...")
7879

7980
bank_df = pd.read_csv(bank_input_path)
8081
ledger_df = pd.read_csv(ledger_input_path)
@@ -93,7 +94,7 @@ def run_v3_pipeline() -> dict[str, Any]:
9394
print(f"Canonical bank output: {canonical_bank_output_path}")
9495
print(f"Canonical ledger output: {canonical_ledger_output_path}")
9596

96-
print("Step 3/5: Running deterministic matching...")
97+
print("Step 3/6: Running deterministic matching...")
9798

9899
reconciliation_links = find_deterministic_matches(
99100
canonical_bank=canonical_bank,
@@ -121,7 +122,22 @@ def run_v3_pipeline() -> dict[str, Any]:
121122
print(f"Total deterministic links: {len(reconciliation_links)}")
122123
print(f"Reconciliation links output: {reconciliation_links_output_path}")
123124

124-
print("Step 4/5: Building exception queue...")
125+
print("Step 4/6: Building candidate links for analyst review...")
126+
127+
candidate_links = build_candidate_links(
128+
canonical_bank=canonical_bank,
129+
canonical_ledger=canonical_ledger,
130+
reconciliation_links=reconciliation_links,
131+
run_id=run_id,
132+
)
133+
134+
candidate_links_output_path = V3_OUTPUT_DIR / "candidate_links.csv"
135+
write_csv(candidate_links, candidate_links_output_path)
136+
137+
print(f"Candidate links for review: {len(candidate_links)}")
138+
print(f"Candidate links output: {candidate_links_output_path}")
139+
140+
print("Step 5/6: Building exception queue...")
125141

126142
exception_queue = build_exception_queue(
127143
canonical_bank=canonical_bank,
@@ -143,7 +159,7 @@ def run_v3_pipeline() -> dict[str, Any]:
143159
print(f"Exception queue rows: {len(exception_queue)}")
144160
print(f"Exception queue output: {exception_queue_output_path}")
145161

146-
print("Step 5/5: Writing pipeline summary...")
162+
print("Step 6/6: Writing pipeline summary...")
147163

148164
summary = pd.DataFrame(
149165
[
@@ -171,6 +187,12 @@ def run_v3_pipeline() -> dict[str, Any]:
171187
"output_file": "reconciliation_links.csv",
172188
"record_count": len(reconciliation_links),
173189
},
190+
{
191+
"run_id": run_id,
192+
"stage": "candidate_link_generation",
193+
"output_file": "candidate_links.csv",
194+
"record_count": len(candidate_links),
195+
},
174196
{
175197
"run_id": run_id,
176198
"stage": "exception_queue_build",
@@ -194,6 +216,7 @@ def run_v3_pipeline() -> dict[str, Any]:
194216
"exact_match_count": exact_match_count,
195217
"timing_match_count": timing_match_count,
196218
"deterministic_match_count": len(reconciliation_links),
219+
"candidate_link_count": len(candidate_links),
197220
"amount_mismatch_count": amount_mismatch_count,
198221
"exception_count": len(exception_queue),
199222
"summary_output_path": summary_output_path,

0 commit comments

Comments
 (0)