Skip to content

Commit b672f4d

Browse files
Add v3 schema validator
1 parent 0420f8e commit b672f4d

2 files changed

Lines changed: 209 additions & 0 deletions

File tree

requirements-v3.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pandas>=2.0.0
2+
PyYAML>=6.0.0
Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
from pathlib import Path
2+
from typing import Any
3+
4+
import pandas as pd
5+
import yaml
6+
7+
8+
# Repository root, taken as the fifth ancestor directory of this file.
REPO_ROOT = Path(__file__).resolve().parents[4]

# v2 input data and v3 schema / output locations, all relative to the root.
V2_DATA_DIR = REPO_ROOT.joinpath("versions", "v2", "data")
V3_SCHEMA_DIR = REPO_ROOT.joinpath("versions", "v3", "schemas")
V3_OUTPUT_DIR = REPO_ROOT.joinpath("versions", "v3", "output")
13+
14+
15+
def load_schema(schema_path: Path) -> list[dict[str, Any]]:
    """Load the field definitions from a YAML schema file.

    Args:
        schema_path: Path to a YAML document whose top-level mapping is
            expected to carry a ``fields`` list.

    Returns:
        The value stored under ``fields``, or an empty list when the
        document is empty or ``fields`` is absent or null.
    """
    with schema_path.open("r", encoding="utf-8") as file:
        # safe_load() returns None for an empty document; fall back to an
        # empty mapping so the lookup below cannot fail on None.
        schema = yaml.safe_load(file) or {}

    # A bare ``fields:`` key parses as None, so normalise it to a list.
    return schema.get("fields") or []
20+
21+
22+
def is_missing(value: Any) -> bool:
    """Return True when *value* should be treated as an absent cell.

    A value counts as missing when pandas regards it as null (for example
    ``None`` or ``NaN``) or when it is a string made up only of whitespace.

    Args:
        value: A single cell value.

    Returns:
        True for null scalars and blank strings, False otherwise. Non-scalar
        values (lists, arrays, ...) are never reported as missing.
    """
    if isinstance(value, str):
        return value.strip() == ""

    # pd.isna() answers element-wise for list-likes, and the resulting array
    # has no single truth value; guard so such input cannot raise here.
    if not pd.api.types.is_scalar(value):
        return False

    return bool(pd.isna(value))
30+
31+
32+
def add_issue(
33+
issues: list[dict[str, Any]],
34+
source_name: str,
35+
row_number: int | str,
36+
field_name: str,
37+
issue_code: str,
38+
severity: str,
39+
observed_value: Any,
40+
expected_rule: str,
41+
suggested_fix: str,
42+
) -> None:
43+
issues.append(
44+
{
45+
"source_name": source_name,
46+
"row_number": row_number,
47+
"field_name": field_name,
48+
"issue_code": issue_code,
49+
"severity": severity,
50+
"observed_value": observed_value,
51+
"expected_rule": expected_rule,
52+
"suggested_fix": suggested_fix,
53+
}
54+
)
55+
56+
57+
def _row_number(label: Any, position: int) -> Any:
    """Return the row number to report for one row of a frame.

    Integer index labels keep the ``label + 2`` convention (with a default
    zero-based index and a one-line header this is the line in the file).
    Any other label type falls back to the row's position so that a
    non-integer index cannot raise.
    """
    if pd.api.types.is_integer(label):
        return label + 2
    return position + 2


def _report_flagged_rows(
    issues: list[dict[str, Any]],
    source_name: str,
    field_name: str,
    series: pd.Series,
    flagged: list[bool],
    *,
    issue_code: str,
    severity: str,
    expected_rule: str,
    suggested_fix: str,
) -> None:
    """Record one issue for every row of *series* whose flag is True.

    *flagged* is aligned with *series* by position, not by index label, so
    repeated index labels are handled row by row.
    """
    for position, ((label, value), is_flagged) in enumerate(
        zip(series.items(), flagged)
    ):
        if not is_flagged:
            continue
        add_issue(
            issues=issues,
            source_name=source_name,
            row_number=_row_number(label, position),
            field_name=field_name,
            issue_code=issue_code,
            severity=severity,
            observed_value=value,
            expected_rule=expected_rule,
            suggested_fix=suggested_fix,
        )


def validate_dataframe(
    df: pd.DataFrame,
    schema_fields: list[dict[str, Any]],
    source_name: str,
) -> list[dict[str, Any]]:
    """Check *df* against the schema fields and collect every violation.

    For each schema field the checks run in this order:

    * column presence (``MISSING_REQUIRED_COLUMN`` or
      ``MISSING_EXPECTED_COLUMN``, reported with row number ``"file"``);
    * required values (``MISSING_REQUIRED_VALUE``);
    * parseable dates for ``type: date`` (``INVALID_DATE``);
    * parseable numbers for ``type: number`` (``INVALID_NUMBER``);
    * membership in ``constraints.enum`` (``VALUE_NOT_ALLOWED``).

    Missing cells are only reported by the required-value check; the type
    and enum checks skip them.

    Args:
        df: The data to validate.
        schema_fields: Field definitions; each needs a ``name`` and may
            carry ``type`` (default ``"string"``) and ``constraints``.
        source_name: Name recorded on every issue.

    Returns:
        A list of issue records in the order they were detected.

    Raises:
        KeyError: If a schema field has no ``name`` entry.
    """
    issues: list[dict[str, Any]] = []
    columns = set(df.columns)

    for field in schema_fields:
        field_name = field["name"]
        field_type = field.get("type", "string")
        constraints = field.get("constraints", {}) or {}
        is_required = constraints.get("required", False)
        allowed_values = constraints.get("enum")

        if field_name not in columns:
            add_issue(
                issues=issues,
                source_name=source_name,
                row_number="file",
                field_name=field_name,
                issue_code=(
                    "MISSING_REQUIRED_COLUMN"
                    if is_required
                    else "MISSING_EXPECTED_COLUMN"
                ),
                severity="High" if is_required else "Medium",
                observed_value="column not found",
                expected_rule=f"Column '{field_name}' should exist in source file.",
                suggested_fix=f"Add or map the '{field_name}' column before reconciliation.",
            )
            continue

        series = df[field_name]
        # Evaluate missingness once per cell; every check below reuses it.
        missing = [is_missing(value) for value in series]

        if is_required:
            _report_flagged_rows(
                issues,
                source_name,
                field_name,
                series,
                missing,
                issue_code="MISSING_REQUIRED_VALUE",
                severity="High",
                expected_rule=f"'{field_name}' is required and cannot be empty.",
                suggested_fix=f"Populate '{field_name}' or route the row to data-quality review.",
            )

        if field_type == "date":
            # NOTE(review): pandas >= 2.0 appears to infer a single format
            # for the whole column, so values in a different but valid
            # format may be coerced to NaT and reported here - confirm that
            # this is the intended strictness.
            unparsable = pd.to_datetime(series, errors="coerce").isna().tolist()
            _report_flagged_rows(
                issues,
                source_name,
                field_name,
                series,
                [bad and not gone for gone, bad in zip(missing, unparsable)],
                issue_code="INVALID_DATE",
                severity="High",
                expected_rule=f"'{field_name}' should be a valid date.",
                suggested_fix="Convert the value to a valid date format before reconciliation.",
            )

        if field_type == "number":
            unparsable = pd.to_numeric(series, errors="coerce").isna().tolist()
            _report_flagged_rows(
                issues,
                source_name,
                field_name,
                series,
                [bad and not gone for gone, bad in zip(missing, unparsable)],
                issue_code="INVALID_NUMBER",
                severity="High",
                expected_rule=f"'{field_name}' should be numeric.",
                suggested_fix="Convert the value to a valid numeric amount before reconciliation.",
            )

        if allowed_values:
            _report_flagged_rows(
                issues,
                source_name,
                field_name,
                series,
                [
                    not gone and value not in allowed_values
                    for gone, value in zip(missing, series)
                ],
                issue_code="VALUE_NOT_ALLOWED",
                severity="Medium",
                expected_rule=f"'{field_name}' should be one of: {allowed_values}.",
                suggested_fix=f"Map '{field_name}' to an approved value before reconciliation.",
            )

    return issues
160+
161+
162+
def validate_source_file(
    source_name: str,
    csv_path: Path,
    schema_path: Path,
) -> list[dict[str, Any]]:
    """Validate one CSV file against one YAML schema.

    Args:
        source_name: Name recorded on every issue found in this file.
        csv_path: Location of the CSV file to read.
        schema_path: Location of the YAML schema describing the file.

    Returns:
        The issue records produced by ``validate_dataframe``.
    """
    # The CSV is read before the schema, matching the keyword order below.
    return validate_dataframe(
        df=pd.read_csv(csv_path),
        schema_fields=load_schema(schema_path),
        source_name=source_name,
    )
175+
176+
177+
def main() -> None:
    """Validate both v2 source files and write the combined issue report.

    Reads ``<source>_v2.csv`` from ``V2_DATA_DIR`` and
    ``<source>.schema.yaml`` from ``V3_SCHEMA_DIR`` for each source, then
    writes ``validation_issues.csv`` into ``V3_OUTPUT_DIR`` (created when
    absent) and prints a short summary.
    """
    V3_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    validation_issues: list[dict[str, Any]] = []
    for source_name in ("bank_statement", "internal_cash_ledger"):
        validation_issues.extend(
            validate_source_file(
                source_name=source_name,
                csv_path=V2_DATA_DIR / f"{source_name}_v2.csv",
                schema_path=V3_SCHEMA_DIR / f"{source_name}.schema.yaml",
            )
        )

    # Fix the column set so a run with zero issues still writes a header row;
    # without it the output has no columns and cannot be read back as CSV.
    report_columns = [
        "source_name",
        "row_number",
        "field_name",
        "issue_code",
        "severity",
        "observed_value",
        "expected_rule",
        "suggested_fix",
    ]

    output_path = V3_OUTPUT_DIR / "validation_issues.csv"
    pd.DataFrame(validation_issues, columns=report_columns).to_csv(
        output_path, index=False
    )

    print("Validation complete.")
    print(f"Issues found: {len(validation_issues)}")
    print(f"Output written to: {output_path}")
204+
205+
206+
# Run the validation only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)