Skip to content

Commit 4b18af6

Browse files
Add v3 timing difference matching
1 parent 2eeb522 commit 4b18af6

3 files changed

Lines changed: 190 additions & 28 deletions

File tree

tests/test_v3_pipeline_runner.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ def test_run_v3_pipeline_creates_expected_outputs():
3232
assert result["canonical_ledger_count"] == 600
3333
assert result["validation_issue_count"] >= 1
3434
assert result["exact_match_count"] >= 1
35+
assert result["timing_match_count"] >= 0
36+
assert result["deterministic_match_count"] >= result["exact_match_count"]
3537
assert result["exception_count"] >= 1
3638

3739
reconciliation_links = pd.read_csv(output_dir / "reconciliation_links.csv")
@@ -50,6 +52,6 @@ def test_run_v3_pipeline_creates_expected_outputs():
5052
"schema_validation",
5153
"bank_standardization",
5254
"ledger_standardization",
53-
"deterministic_exact_matching",
55+
"deterministic_matching",
5456
"exception_queue_build",
5557
}

versions/v3/src/matching/deterministic_rules.py

Lines changed: 165 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import pandas as pd
66

77

8-
MATCH_KEY_COLUMNS = [
8+
EXACT_MATCH_KEY_COLUMNS = [
99
"account_id",
1010
"currency",
1111
"direction",
@@ -14,6 +14,13 @@
1414
"canonical_date",
1515
]
1616

17+
# Join keys for timing-difference matching. The canonical date is left out on
# purpose: it is compared separately as a day gap rather than by equality.
TIMING_MATCH_KEY_COLUMNS = [
    "account_id", "currency", "direction",
    "amount_numeric", "normalized_reference",
]
1724

1825
RECONCILIATION_LINK_COLUMNS = [
1926
"run_id",
@@ -42,44 +49,40 @@
4249
def _clean_value(value: Any) -> Any:
4350
if pd.isna(value):
4451
return None
45-
4652
return value
4753

4854

49-
def _eligible_for_exact_match(df: pd.DataFrame) -> pd.DataFrame:
50-
return df.dropna(subset=MATCH_KEY_COLUMNS).copy()
55+
def _empty_links() -> pd.DataFrame:
    """Build a zero-row links frame carrying the reconciliation link columns."""
    no_links = pd.DataFrame(columns=RECONCILIATION_LINK_COLUMNS)
    return no_links
57+
58+
59+
def _matched_ids(reconciliation_links: pd.DataFrame, column_name: str) -> set[int]:
60+
if reconciliation_links.empty or column_name not in reconciliation_links.columns:
61+
return set()
62+
return {int(value) for value in reconciliation_links[column_name].dropna()}
5163

5264

5365
def find_exact_matches(
5466
canonical_bank: pd.DataFrame,
5567
canonical_ledger: pd.DataFrame,
5668
run_id: str,
69+
link_start_index: int = 1,
5770
) -> pd.DataFrame:
58-
"""Find high-confidence exact matches between canonical bank and ledger rows.
59-
60-
Exact match criteria:
61-
- same account
62-
- same currency
63-
- same direction
64-
- same amount
65-
- same normalized reference
66-
- same canonical date
67-
"""
68-
bank_candidates = _eligible_for_exact_match(canonical_bank)
69-
ledger_candidates = _eligible_for_exact_match(canonical_ledger)
71+
bank_candidates = canonical_bank.dropna(subset=EXACT_MATCH_KEY_COLUMNS).copy()
72+
ledger_candidates = canonical_ledger.dropna(subset=EXACT_MATCH_KEY_COLUMNS).copy()
7073

7174
if bank_candidates.empty or ledger_candidates.empty:
72-
return pd.DataFrame(columns=RECONCILIATION_LINK_COLUMNS)
75+
return _empty_links()
7376

7477
merged = bank_candidates.merge(
7578
ledger_candidates,
76-
on=MATCH_KEY_COLUMNS,
79+
on=EXACT_MATCH_KEY_COLUMNS,
7780
suffixes=("_bank", "_ledger"),
7881
how="inner",
7982
)
8083

8184
if merged.empty:
82-
return pd.DataFrame(columns=RECONCILIATION_LINK_COLUMNS)
85+
return _empty_links()
8386

8487
merged = merged.sort_values(
8588
by=["source_row_id_bank", "source_row_id_ledger"],
@@ -96,14 +99,13 @@ def find_exact_matches(
9699

97100
if bank_source_row_id in matched_bank_rows:
98101
continue
99-
100102
if ledger_source_row_id in matched_ledger_rows:
101103
continue
102104

103105
matched_bank_rows.add(bank_source_row_id)
104106
matched_ledger_rows.add(ledger_source_row_id)
105107

106-
link_number = len(links) + 1
108+
link_number = link_start_index + len(links)
107109

108110
links.append(
109111
{
@@ -131,3 +133,145 @@ def find_exact_matches(
131133
)
132134

133135
return pd.DataFrame(links, columns=RECONCILIATION_LINK_COLUMNS)
136+
137+
138+
def _calendar_day_gap(bank_value: Any, ledger_value: Any) -> int | None:
    """Return the absolute calendar-day gap between two date-like values.

    Returns ``None`` when either value cannot be parsed as a date.
    """
    bank_ts = pd.to_datetime(bank_value, errors="coerce")
    ledger_ts = pd.to_datetime(ledger_value, errors="coerce")

    if pd.isna(bank_ts) or pd.isna(ledger_ts):
        return None

    # Subtract calendar dates, not timestamps: ``Timedelta.days`` floors toward
    # negative infinity, so a raw timestamp difference yields a different gap
    # depending on which side is later whenever a time-of-day is present.
    return abs((bank_ts.date() - ledger_ts.date()).days)


def find_timing_difference_matches(
    canonical_bank: pd.DataFrame,
    canonical_ledger: pd.DataFrame,
    run_id: str,
    existing_links: pd.DataFrame | None = None,
    max_day_gap: int = 2,
    link_start_index: int = 1,
) -> pd.DataFrame:
    """Link bank and ledger rows that agree on every key except the date.

    Rows are joined on ``TIMING_MATCH_KEY_COLUMNS``; a pair qualifies when its
    canonical dates differ by at least one and at most ``max_day_gap`` calendar
    days. Pairs are then assigned greedily, smallest gap first (ties broken by
    bank then ledger source row id), so each bank row and each ledger row is
    linked at most once.

    Args:
        canonical_bank: Bank rows; must provide ``source_row_id``,
            ``canonical_date`` and the timing match key columns.
        canonical_ledger: Ledger rows with the same columns.
        run_id: Identifier copied into every emitted link.
        existing_links: Links found by an earlier stage. Rows whose ids appear
            in its ``bank_source_row_id`` / ``ledger_source_row_id`` columns
            are excluded from matching.
        max_day_gap: Largest calendar-day gap that still counts as a match.
        link_start_index: Number used for the first ``link_id`` emitted here.

    Returns:
        A DataFrame with ``RECONCILIATION_LINK_COLUMNS``; zero rows when
        nothing matches.
    """
    if existing_links is None:
        existing_links = _empty_links()

    already_linked_bank = _matched_ids(existing_links, "bank_source_row_id")
    already_linked_ledger = _matched_ids(existing_links, "ledger_source_row_id")

    required_columns = TIMING_MATCH_KEY_COLUMNS + ["canonical_date"]

    bank_candidates = canonical_bank.dropna(subset=required_columns)
    bank_candidates = bank_candidates[
        ~bank_candidates["source_row_id"].isin(already_linked_bank)
    ].copy()

    ledger_candidates = canonical_ledger.dropna(subset=required_columns)
    ledger_candidates = ledger_candidates[
        ~ledger_candidates["source_row_id"].isin(already_linked_ledger)
    ].copy()

    if bank_candidates.empty or ledger_candidates.empty:
        return _empty_links()

    merged = bank_candidates.merge(
        ledger_candidates,
        on=TIMING_MATCH_KEY_COLUMNS,
        suffixes=("_bank", "_ledger"),
        how="inner",
    )

    if merged.empty:
        return _empty_links()

    # float64 keeps unparseable dates as NaN so the window test below simply
    # evaluates to False for them.
    gap_days = pd.Series(
        [
            _calendar_day_gap(bank_date, ledger_date)
            for bank_date, ledger_date in zip(
                merged["canonical_date_bank"], merged["canonical_date_ledger"]
            )
        ],
        index=merged.index,
        dtype="float64",
    )

    # A gap of 0 is an exact-date pair and is not a timing difference.
    within_window = gap_days.between(1, max_day_gap)

    if not within_window.any():
        return _empty_links()

    timing_df = (
        merged.loc[within_window]
        .assign(date_gap_days=gap_days[within_window].astype(int))
        .sort_values(
            by=["date_gap_days", "source_row_id_bank", "source_row_id_ledger"],
            kind="stable",
        )
    )

    # Rows linked by earlier stages were filtered out above, so only rows
    # claimed during this pass need tracking.
    claimed_bank_rows: set[int] = set()
    claimed_ledger_rows: set[int] = set()
    links: list[dict[str, Any]] = []

    for _, row in timing_df.iterrows():
        bank_source_row_id = int(row["source_row_id_bank"])
        ledger_source_row_id = int(row["source_row_id_ledger"])

        if bank_source_row_id in claimed_bank_rows:
            continue
        if ledger_source_row_id in claimed_ledger_rows:
            continue

        claimed_bank_rows.add(bank_source_row_id)
        claimed_ledger_rows.add(ledger_source_row_id)

        link_number = link_start_index + len(links)

        links.append(
            {
                "run_id": run_id,
                "link_id": f"LINK-{link_number:06d}",
                "match_type": "POTENTIAL_TIMING_DIFFERENCE",
                "stage_detected": "deterministic_timing",
                "confidence_score": 0.95,
                "bank_source_row_id": bank_source_row_id,
                "ledger_source_row_id": ledger_source_row_id,
                "bank_transaction_id": _clean_value(row.get("bank_transaction_id")),
                "ledger_transaction_id": _clean_value(row.get("ledger_transaction_id")),
                "account_id": row["account_id"],
                "currency": row["currency"],
                "direction": row["direction"],
                "amount_bank": row["amount_numeric"],
                "amount_internal": row["amount_numeric"],
                "transaction_date_bank": row["canonical_date_bank"],
                "transaction_date_internal": row["canonical_date_ledger"],
                "normalized_reference": row["normalized_reference"],
                "counterparty_bank": _clean_value(row.get("counterparty_bank")),
                "counterparty_internal": _clean_value(row.get("counterparty_ledger")),
                "rationale": f"Matched on account, currency, direction, amount, and normalized reference with a {int(row['date_gap_days'])}-day date gap.",
            }
        )

    return pd.DataFrame(links, columns=RECONCILIATION_LINK_COLUMNS)
250+
251+
252+
def find_deterministic_matches(
    canonical_bank: pd.DataFrame,
    canonical_ledger: pd.DataFrame,
    run_id: str,
) -> pd.DataFrame:
    """Run exact matching, then timing-difference matching, and combine the links.

    Exact links are numbered from 1. The timing pass receives them as
    ``existing_links`` and continues the link numbering right after the last
    exact link. Returns one frame holding both sets of links, or an empty
    links frame when neither pass produced any.
    """
    exact_links = find_exact_matches(
        canonical_bank=canonical_bank,
        canonical_ledger=canonical_ledger,
        run_id=run_id,
        link_start_index=1,
    )

    next_link_index = len(exact_links) + 1
    timing_links = find_timing_difference_matches(
        canonical_bank=canonical_bank,
        canonical_ledger=canonical_ledger,
        run_id=run_id,
        existing_links=exact_links,
        link_start_index=next_link_index,
    )

    # Only non-empty frames are passed to concat.
    populated = [frame for frame in (exact_links, timing_links) if not frame.empty]

    return pd.concat(populated, ignore_index=True) if populated else _empty_links()

versions/v3/src/reconciliation/run_v3_pipeline.py

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
standardize_bank_transactions,
1919
standardize_internal_ledger,
2020
)
21-
from versions.v3.src.matching.deterministic_rules import find_exact_matches # noqa: E402
21+
from versions.v3.src.matching.deterministic_rules import find_deterministic_matches # noqa: E402
2222
from versions.v3.src.reconciliation.exception_builder import build_exception_queue # noqa: E402
2323

2424

@@ -93,18 +93,32 @@ def run_v3_pipeline() -> dict[str, Any]:
9393
print(f"Canonical bank output: {canonical_bank_output_path}")
9494
print(f"Canonical ledger output: {canonical_ledger_output_path}")
9595

96-
print("Step 3/5: Running deterministic exact matching...")
96+
print("Step 3/5: Running deterministic matching...")
9797

98-
reconciliation_links = find_exact_matches(
98+
reconciliation_links = find_deterministic_matches(
9999
canonical_bank=canonical_bank,
100100
canonical_ledger=canonical_ledger,
101101
run_id=run_id,
102102
)
103103

104+
exact_match_count = (
105+
int((reconciliation_links["match_type"] == "EXACT_CANONICAL_MATCH").sum())
106+
if not reconciliation_links.empty
107+
else 0
108+
)
109+
110+
timing_match_count = (
111+
int((reconciliation_links["match_type"] == "POTENTIAL_TIMING_DIFFERENCE").sum())
112+
if not reconciliation_links.empty
113+
else 0
114+
)
115+
104116
reconciliation_links_output_path = V3_OUTPUT_DIR / "reconciliation_links.csv"
105117
write_csv(reconciliation_links, reconciliation_links_output_path)
106118

107-
print(f"Exact reconciliation links: {len(reconciliation_links)}")
119+
print(f"Exact reconciliation links: {exact_match_count}")
120+
print(f"Timing-difference links: {timing_match_count}")
121+
print(f"Total deterministic links: {len(reconciliation_links)}")
108122
print(f"Reconciliation links output: {reconciliation_links_output_path}")
109123

110124
print("Step 4/5: Building exception queue...")
@@ -146,7 +160,7 @@ def run_v3_pipeline() -> dict[str, Any]:
146160
},
147161
{
148162
"run_id": run_id,
149-
"stage": "deterministic_exact_matching",
163+
"stage": "deterministic_matching",
150164
"output_file": "reconciliation_links.csv",
151165
"record_count": len(reconciliation_links),
152166
},
@@ -170,7 +184,9 @@ def run_v3_pipeline() -> dict[str, Any]:
170184
"validation_issue_count": len(validation_issues_df),
171185
"canonical_bank_count": len(canonical_bank),
172186
"canonical_ledger_count": len(canonical_ledger),
173-
"exact_match_count": len(reconciliation_links),
187+
"exact_match_count": exact_match_count,
188+
"timing_match_count": timing_match_count,
189+
"deterministic_match_count": len(reconciliation_links),
174190
"exception_count": len(exception_queue),
175191
"summary_output_path": summary_output_path,
176192
}

0 commit comments

Comments
 (0)