Skip to content

Commit ef52b63

Browse files
authored
Merge pull request #78 from QWED-AI/feat/phase-19-trust
feat(guards): implement SovereigntyGuard for PII data residency and local routing
2 parents 047713b + 45503af commit ef52b63

2 files changed

Lines changed: 150 additions & 0 deletions

File tree

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import re
2+
3+
class SovereigntyGuard:
4+
"""
5+
Enforces Data Residency and Sovereignty.
6+
Source: Alignment with privacy-sensitive legal deployments [Source 60, 61].
7+
"""
8+
def __init__(self, required_local_providers=None):
9+
if required_local_providers is None:
10+
required_local_providers = ["ollama", "vllm_local"]
11+
self.local_providers = list(required_local_providers)
12+
self._local_providers_casefold = {p.casefold() for p in self.local_providers}
13+
14+
# Regex for SSN variants and Confidentiality markers
15+
self.sensitive_patterns = [
16+
re.compile(r"\b\d{3}-\d{2}-\d{4}\b"), # SSN: dash-separated
17+
re.compile(r"\b\d{3}\s\d{2}\s\d{4}\b"), # SSN: space-separated
18+
re.compile(r"\b\d{9}\b"), # SSN: contiguous (use with care — broad)
19+
re.compile(r"CONFIDENTIAL", re.IGNORECASE),
20+
]
21+
22+
def verify_routing(self, prompt: str, target_provider: str) -> dict:
23+
if not prompt:
24+
raise ValueError("prompt must be a non-empty string.")
25+
if not target_provider:
26+
raise ValueError("target_provider must be a non-empty string.")
27+
28+
# ISSUE: Does this prompt contain sensitive data destined for an external provider?
29+
is_sensitive = any(p.search(prompt) for p in self.sensitive_patterns)
30+
31+
# RULE: Sensitive data must never be routed to non-local infrastructure.
32+
# [Source: MSLR, data residency compliance]
33+
34+
# APPLICATION: Evaluate whether target_provider is an approved local provider.
35+
if is_sensitive and target_provider.casefold() not in self._local_providers_casefold:
36+
# CONCLUSION: Violation — block routing.
37+
return {
38+
"verified": False,
39+
"risk": "DATA_SOVEREIGNTY_VIOLATION",
40+
"message": f"Sensitive data detected. Routing to external provider '{target_provider}' is blocked. Must use local infrastructure."
41+
}
42+
43+
# CONCLUSION: No violation detected — permit routing.
44+
return {"verified": True, "risk": None, "message": "Routing permitted."}

tests/test_phase19_sovereignty.py

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
from qwed_sdk.guards.sovereignty_guard import SovereigntyGuard
2+
3+
def test_sovereignty_guard_allows_safe_data_to_external():
4+
guard = SovereigntyGuard()
5+
result = guard.verify_routing(prompt="What is the capital of France?", target_provider="openai")
6+
7+
assert result["verified"] is True
8+
assert result["risk"] is None
9+
assert "message" in result
10+
11+
def test_sovereignty_guard_blocks_sensitive_data_to_external():
12+
guard = SovereigntyGuard()
13+
prompt = "Here is the CONFIDENTIAL contract for our new client."
14+
result = guard.verify_routing(prompt=prompt, target_provider="openai")
15+
16+
assert result["verified"] is False
17+
assert result["risk"] == "DATA_SOVEREIGNTY_VIOLATION"
18+
assert "message" in result
19+
20+
def test_sovereignty_guard_blocks_ssn_to_external():
21+
guard = SovereigntyGuard()
22+
prompt = "My SSN is 123-45-6789. Please process my application."
23+
result = guard.verify_routing(prompt=prompt, target_provider="anthropic")
24+
25+
assert result["verified"] is False
26+
assert result["risk"] == "DATA_SOVEREIGNTY_VIOLATION"
27+
assert "message" in result
28+
29+
def test_sovereignty_guard_allows_sensitive_data_to_local():
30+
guard = SovereigntyGuard()
31+
prompt = "Here is the CONFIDENTIAL contract. SSN: 123-45-6789."
32+
# Ollama is in the default local_providers list
33+
result = guard.verify_routing(prompt=prompt, target_provider="ollama")
34+
35+
assert result["verified"] is True
36+
assert result["risk"] is None
37+
assert "message" in result
38+
39+
def test_sovereignty_guard_blocks_space_separated_ssn_to_external():
40+
guard = SovereigntyGuard()
41+
prompt = "My SSN is 123 45 6789. Please process my application."
42+
result = guard.verify_routing(prompt=prompt, target_provider="anthropic")
43+
44+
assert result["verified"] is False
45+
assert result["risk"] == "DATA_SOVEREIGNTY_VIOLATION"
46+
assert "message" in result
47+
48+
def test_sovereignty_guard_blocks_contiguous_ssn_to_external():
49+
guard = SovereigntyGuard()
50+
prompt = "SSN: 123456789"
51+
result = guard.verify_routing(prompt=prompt, target_provider="openai")
52+
53+
assert result["verified"] is False
54+
assert result["risk"] == "DATA_SOVEREIGNTY_VIOLATION"
55+
assert "message" in result
56+
57+
def test_sovereignty_guard_allows_vllm_local():
58+
guard = SovereigntyGuard()
59+
prompt = "Here is the CONFIDENTIAL contract. SSN: 123-45-6789."
60+
result = guard.verify_routing(prompt=prompt, target_provider="vllm_local")
61+
62+
assert result["verified"] is True
63+
assert result["risk"] is None
64+
assert "message" in result
65+
66+
def test_sovereignty_guard_case_insensitive_provider():
67+
guard = SovereigntyGuard()
68+
prompt = "Here is the CONFIDENTIAL contract. SSN: 123-45-6789."
69+
result = guard.verify_routing(prompt=prompt, target_provider="OLLAMA")
70+
71+
assert result["verified"] is True
72+
assert result["risk"] is None
73+
assert "message" in result
74+
75+
def test_sovereignty_guard_custom_local_provider():
76+
guard = SovereigntyGuard(required_local_providers=["my_local_llm"])
77+
prompt = "Here is the CONFIDENTIAL contract. SSN: 123-45-6789."
78+
79+
# Allowed local provider
80+
result_local = guard.verify_routing(prompt=prompt, target_provider="my_local_llm")
81+
assert result_local["verified"] is True
82+
assert "message" in result_local
83+
84+
# Default 'ollama' is now external since required_local_providers overrode it
85+
result_external = guard.verify_routing(prompt=prompt, target_provider="ollama")
86+
assert result_external["verified"] is False
87+
assert result_external["risk"] == "DATA_SOVEREIGNTY_VIOLATION"
88+
assert "message" in result_external
89+
90+
import pytest
91+
92+
def test_sovereignty_guard_raises_on_empty_prompt():
93+
guard = SovereigntyGuard()
94+
with pytest.raises(ValueError, match="prompt must be a non-empty string"):
95+
guard.verify_routing(prompt="", target_provider="ollama")
96+
97+
with pytest.raises(ValueError, match="prompt must be a non-empty string"):
98+
guard.verify_routing(prompt=None, target_provider="ollama")
99+
100+
def test_sovereignty_guard_raises_on_empty_provider():
101+
guard = SovereigntyGuard()
102+
with pytest.raises(ValueError, match="target_provider must be a non-empty string"):
103+
guard.verify_routing(prompt="Valid prompt", target_provider="")
104+
105+
with pytest.raises(ValueError, match="target_provider must be a non-empty string"):
106+
guard.verify_routing(prompt="Valid prompt", target_provider=None)

0 commit comments

Comments
 (0)