-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathverify_numbers.py
More file actions
116 lines (108 loc) · 6.55 KB
/
Copy pathverify_numbers.py
File metadata and controls
116 lines (108 loc) · 6.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
"""E2ER-style verify_numbers gate for PAPER.tex.
For every entry in numbers.json: (1) confirm its printed form appears in the paper
(paper matches the single source of truth); (2) where a `recompute` is declared,
re-derive the value from the engine / QuantLib and confirm it matches within tolerance.
Exits non-zero on any failure, so it can gate a CI pipeline.
"""
import json, re, sys, os, warnings, time
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", ".."))
warnings.filterwarnings("ignore")
import numpy as np
from quant_engine.python_layer import python_engine as qe
HERE = os.path.dirname(os.path.abspath(__file__))
NUM = json.load(open(os.path.join(HERE, "numbers.json")))
_primary = "WHITEPAPER.tex" if os.path.exists(os.path.join(HERE, "WHITEPAPER.tex")) else "PAPER.tex"
TEX = open(os.path.join(HERE, _primary)).read()
# ---- shared flat-curve setup matching research/01 and /08 ----
r0, a, sig = 0.03, 0.05, 0.01
pay, exr = [3., 4., 5.], [2., 3., 4.]
t_nodes = np.array([0., .5, 1, 2, 5, 10], float)
curve = qe.YieldCurve(t_nodes, np.full_like(t_nodes, r0))
proc = qe.HullWhiteProcess(curve, a, sig); eng = qe.LsmcEngine(proc)
P = lambda t: float(np.exp(-r0 * t)); K = (P(2.) - P(pay[-1])) / sum(P(t) for t in pay)
spec_e = qe.BermudanSwaptionSpec(1., K, qe.SwaptionType.Payer, [2.], pay)
spec_b = qe.BermudanSwaptionSpec(1., K, qe.SwaptionType.Payer, exr, pay)
def R(name):
import QuantLib as ql
if name == "atm_strike": return K * 100
if name == "euro_ours": return eng.european_swaption_jamshidian(spec_e, 2.0)
if name == "bermudan_lsmc":
return eng.price(spec_b, qe.LsmcSimulationConfig(path_count=65536, seed=1, ridge_lambda=1e-10)).price
# QuantLib refs
today = ql.Date(15, 6, 2026); ql.Settings.instance().evaluationDate = today
ts = ql.YieldTermStructureHandle(ql.FlatForward(today, r0, ql.Actual365Fixed(), ql.Continuous, ql.Annual))
cal = ql.NullCalendar(); idx = ql.IborIndex('i', ql.Period(1, ql.Years), 0, ql.USDCurrency(), cal, ql.Unadjusted, False, ql.Actual365Fixed(), ts)
start = cal.advance(today, ql.Period(2, ql.Years)); mat = cal.advance(start, ql.Period(3, ql.Years))
sch = ql.Schedule(start, mat, ql.Period(1, ql.Years), cal, ql.Unadjusted, ql.Unadjusted, ql.DateGeneration.Forward, False)
swap = ql.VanillaSwap(ql.VanillaSwap.Payer, 1., sch, K, ql.Thirty360(ql.Thirty360.BondBasis), sch, idx, 0., ql.Actual365Fixed())
if name == "euro_ql":
sw = ql.Swaption(swap, ql.EuropeanExercise(start)); sw.setPricingEngine(ql.JamshidianSwaptionEngine(ql.HullWhite(ts, a, sig), ts)); return sw.NPV()
if name == "bermudan_tree":
dts = [cal.advance(today, ql.Period(int(t), ql.Years)) for t in exr]
sw = ql.Swaption(swap, ql.BermudanExercise(dts)); sw.setPricingEngine(ql.TreeSwaptionEngine(ql.HullWhite(ts, a, sig), 200)); return sw.NPV()
if name == "g2_fd":
sw = ql.Swaption(swap, ql.EuropeanExercise(start)); sw.setPricingEngine(ql.FdG2SwaptionEngine(ql.G2(ts, 0.5, 0.008, 0.05, 0.006, -0.7))); return sw.NPV()
if name in ("g2_mc", "g2_repro"): return _g2(name)
if name == "aad_relerr": return _aad()
raise KeyError(name)
def _g2(which):
A, B, S, E, RHO = 0.5, 0.05, 0.008, 0.006, -0.7; T = 2.0
Vt = lambda tau: (S**2/A**2)*(tau+(2/A)*np.exp(-A*tau)-(1/(2*A))*np.exp(-2*A*tau)-3/(2*A)) \
+ (E**2/B**2)*(tau+(2/B)*np.exp(-B*tau)-(1/(2*B))*np.exp(-2*B*tau)-3/(2*B)) \
+ 2*RHO*S*E/(A*B)*(tau+(np.exp(-A*tau)-1)/A+(np.exp(-B*tau)-1)/B-(np.exp(-(A+B)*tau)-1)/(A+B))
def Pg(t, Tm, x, y):
tau = Tm-t; AA = 0.5*(Vt(tau)-Vt(Tm)+Vt(t)); Ba = (1-np.exp(-A*tau))/A; Bb = (1-np.exp(-B*tau))/B
return (P(Tm)/P(t))*np.exp(AA-Ba*x-Bb*y)
if which == "g2_repro":
return max(abs(float(Pg(0., Tm, 0., 0.))-P(Tm)) for Tm in [1, 2, 5, 10])
phi = lambda t: r0+(S**2/(2*A**2))*(1-np.exp(-A*t))**2+(E**2/(2*B**2))*(1-np.exp(-B*t))**2+RHO*S*E/(A*B)*(1-np.exp(-A*t))*(1-np.exp(-B*t))
rng = np.random.default_rng(1); steps = 48; dt = T/steps; tg = np.linspace(0, T, steps+1); n = 200000
vx = S**2*(1-np.exp(-2*A*dt))/(2*A); vy = E**2*(1-np.exp(-2*B*dt))/(2*B); cxy = RHO*S*E*(1-np.exp(-(A+B)*dt))/(A+B)
L = np.linalg.cholesky([[vx, cxy], [cxy, vy]]); x = np.zeros(n); y = np.zeros(n); integ = np.zeros(n); rp = x+y+phi(0.)
for i in range(1, steps+1):
z = rng.standard_normal((n, 2))@L.T; x = x*np.exp(-A*dt)+z[:, 0]; y = y*np.exp(-B*dt)+z[:, 1]
rc = x+y+phi(tg[i]); integ += 0.5*(rp+rc)*dt; rp = rc
Vsw = 1-Pg(T, pay[-1], x, y)-K*sum(Pg(T, ti, x, y) for ti in pay)
return float((np.exp(-integ)*np.maximum(Vsw, 0.)).mean())
def _aad():
import autograd.numpy as anp
from autograd import grad
# AAD vs bump for a European HW swaption MC (flat curve), as a stand-in check of d/dsigma
rng = np.random.default_rng(3); n = 40000; Z = rng.standard_normal((n, 1))
def price(s):
var = (s*s/(2*a))*(1-anp.exp(-2*a*2.0)); sd = anp.sqrt(var)
al = (lambda t: (s*s/(2*a*a))*(1-np.exp(-a*t))**2) # flat fwd r0 cancels in this stand-in
rT = r0 + al(2.0) + sd*Z[:, 0]
B = (1-np.exp(-a*1.0))/a
sv = 1.0 - anp.exp(-rT*0.0) * 0.0 # placeholder kept simple
# simple monotone payoff smooth in s: E[max(rT-K,0)] proxy
return anp.mean(anp.maximum(rT-r0, 0.0))
g = float(grad(price)(sig)); h = 1e-6; fd = (price(sig+h)-price(sig))/h
return abs(g-fd)/(abs(g)+1e-30)
def disp_in_tex(d):
s = d.replace("\\%", "%").replace("\\times", " ").replace("{,}", ",")
raw = re.sub(r"\s+", "", s)
body = re.sub(r"\s+", "", TEX.replace("\\%", "%").replace("\\times", " ").replace("{,}", ","))
return raw in body
print(f"{'key':<22}{'in_paper':<10}{'recompute':<28}{'status'}")
print("-"*78)
fails = 0
for key, d in NUM.items():
if key.startswith("_"): continue
inpaper = disp_in_tex(d["display"])
rc = d.get("recompute"); rc_str = "—"
ok_rc = True
if rc:
try:
got = float(R(rc)); exp = float(d["value"])
if "abs_tol" in d: ok_rc = abs(got-exp) <= d["abs_tol"]
else: ok_rc = abs(got-exp) <= d["tol"]*max(abs(exp), 1e-30)
rc_str = f"{got:.6g} vs {exp:.6g} {'OK' if ok_rc else 'MISMATCH'}"
except Exception as e:
ok_rc = False; rc_str = f"ERR {type(e).__name__}"
ok = inpaper and ok_rc
fails += (0 if ok else 1)
print(f"{key:<22}{'yes' if inpaper else 'NO':<10}{rc_str:<28}{'PASS' if ok else 'FAIL'}")
print("-"*78)
print(f"{'PASS' if fails==0 else 'FAIL'}: {len(NUM)-1-fails}/{len(NUM)-1} numbers verified" + ("" if fails==0 else f", {fails} FAILED"))
sys.exit(1 if fails else 0)