Skip to content

Commit 0725213

Browse files
committed
package webhook
1 parent 243f3cd commit 0725213

1 file changed

Lines changed: 270 additions & 0 deletions

File tree

webhook/dispatcher.go

Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
// Package webhook extends the base events package with a typed dispatcher
2+
// that routes OBIE event notifications to strongly-typed handlers, supports
3+
// multiple subscribers per event type, and provides dead-letter queue (DLQ)
4+
// semantics for failed handlers.
5+
package webhook
6+
7+
import (
8+
"context"
9+
"encoding/json"
10+
"fmt"
11+
"sync"
12+
"time"
13+
)
14+
15+
// ────────────────────────────────────────────────────────────────────────────
16+
// Event types and envelope
17+
// ────────────────────────────────────────────────────────────────────────────
18+
19+
// EventType is the URN identifying an OBIE event type.
20+
type EventType string
21+
22+
const (
23+
EventTypeResourceUpdate EventType = "urn:uk:org:openbanking:events:resource-update"
24+
EventTypeConsentAuthRevoked EventType = "urn:uk:org:openbanking:events:consent-authorization-revoked"
25+
EventTypeAccountAccessConsentLinkedAccountUpdate EventType = "urn:uk:org:openbanking:events:account-access-consent-linked-account-update"
26+
)
27+
28+
// Envelope is the top-level OBIE event notification structure.
29+
type Envelope struct {
30+
Iss string `json:"iss"`
31+
Iat int64 `json:"iat"`
32+
Jti string `json:"jti"`
33+
Aud string `json:"aud"`
34+
Sub string `json:"sub"`
35+
Txn string `json:"txn"`
36+
Toe int64 `json:"toe"`
37+
Events map[EventType]json.RawMessage `json:"events"`
38+
}
39+
40+
// ReceivedAt returns the time-of-event as a time.Time.
41+
func (e *Envelope) ReceivedAt() time.Time {
42+
return time.Unix(e.Toe, 0)
43+
}
44+
45+
// ────────────────────────────────────────────────────────────────────────────
46+
// Typed event payloads
47+
// ────────────────────────────────────────────────────────────────────────────
48+
49+
// ResourceUpdateEvent is the payload for EventTypeResourceUpdate.
50+
type ResourceUpdateEvent struct {
51+
Subject ResourceUpdateSubject `json:"subject"`
52+
}
53+
54+
// ResourceUpdateSubject describes the updated resource.
55+
type ResourceUpdateSubject struct {
56+
SubjectType string `json:"subject_type"`
57+
HTTPStatusCode int `json:"http_status_code"`
58+
Links map[string]string `json:"links"`
59+
Version string `json:"version"`
60+
}
61+
62+
// ConsentAuthRevokedEvent is the payload for EventTypeConsentAuthRevoked.
63+
type ConsentAuthRevokedEvent struct {
64+
Reason string `json:"reason,omitempty"`
65+
}
66+
67+
// ────────────────────────────────────────────────────────────────────────────
68+
// Handler types
69+
// ────────────────────────────────────────────────────────────────────────────
70+
71+
// Handler is a function that processes a decoded event payload.
72+
// The raw bytes are the JSON value of the event within the "events" map.
73+
type Handler func(ctx context.Context, envelope *Envelope, raw json.RawMessage) error
74+
75+
// ────────────────────────────────────────────────────────────────────────────
76+
// Dead-letter queue
77+
// ────────────────────────────────────────────────────────────────────────────
78+
79+
// DeadLetterItem stores a failed event delivery attempt.
80+
type DeadLetterItem struct {
81+
EventType EventType
82+
Envelope *Envelope
83+
Raw json.RawMessage
84+
Err error
85+
FailedAt time.Time
86+
Attempts int
87+
}
88+
89+
// DLQ is a simple in-memory dead-letter queue.
90+
type DLQ struct {
91+
mu sync.Mutex
92+
items []DeadLetterItem
93+
cap int
94+
}
95+
96+
// NewDLQ creates a DLQ with the given capacity. When full, oldest items are
97+
// dropped to make room.
98+
func NewDLQ(capacity int) *DLQ {
99+
if capacity <= 0 {
100+
capacity = 1000
101+
}
102+
return &DLQ{cap: capacity}
103+
}
104+
105+
// Push adds an item to the DLQ.
106+
func (q *DLQ) Push(item DeadLetterItem) {
107+
q.mu.Lock()
108+
defer q.mu.Unlock()
109+
if len(q.items) >= q.cap {
110+
q.items = q.items[1:] // drop oldest
111+
}
112+
q.items = append(q.items, item)
113+
}
114+
115+
// Drain returns and removes all items from the DLQ.
116+
func (q *DLQ) Drain() []DeadLetterItem {
117+
q.mu.Lock()
118+
defer q.mu.Unlock()
119+
out := make([]DeadLetterItem, len(q.items))
120+
copy(out, q.items)
121+
q.items = nil
122+
return out
123+
}
124+
125+
// Len returns the number of items in the DLQ.
126+
func (q *DLQ) Len() int {
127+
q.mu.Lock()
128+
defer q.mu.Unlock()
129+
return len(q.items)
130+
}
131+
132+
// ────────────────────────────────────────────────────────────────────────────
133+
// Dispatcher
134+
// ────────────────────────────────────────────────────────────────────────────
135+
136+
// Dispatcher routes incoming OBIE event notifications to registered handlers.
137+
// It supports multiple handlers per event type and delivers to all of them.
138+
type Dispatcher struct {
139+
mu sync.RWMutex
140+
handlers map[EventType][]Handler
141+
catch []Handler // wildcard handlers receive every event
142+
dlq *DLQ
143+
log Logger
144+
}
145+
146+
// Logger is the logging interface used by Dispatcher.
147+
type Logger interface {
148+
Errorf(format string, args ...any)
149+
Infof(format string, args ...any)
150+
}
151+
152+
type nopLog struct{}
153+
func (nopLog) Errorf(_ string, _ ...any) {}
154+
func (nopLog) Infof(_ string, _ ...any) {}
155+
156+
// NewDispatcher creates a Dispatcher. Pass a non-nil DLQ to enable
157+
// dead-lettering of failed handler invocations.
158+
func NewDispatcher(dlq *DLQ, log Logger) *Dispatcher {
159+
if log == nil {
160+
log = nopLog{}
161+
}
162+
return &Dispatcher{
163+
handlers: make(map[EventType][]Handler),
164+
dlq: dlq,
165+
log: log,
166+
}
167+
}
168+
169+
// On registers handler for the given eventType.
170+
// Multiple handlers per event type are supported and all will be invoked.
171+
func (d *Dispatcher) On(eventType EventType, handler Handler) {
172+
d.mu.Lock()
173+
defer d.mu.Unlock()
174+
d.handlers[eventType] = append(d.handlers[eventType], handler)
175+
}
176+
177+
// OnAny registers a handler that is invoked for every event type.
178+
func (d *Dispatcher) OnAny(handler Handler) {
179+
d.mu.Lock()
180+
defer d.mu.Unlock()
181+
d.catch = append(d.catch, handler)
182+
}
183+
184+
// Dispatch decodes env and delivers each event to its registered handlers.
185+
// If a handler returns an error it is recorded in the DLQ (if set) and
186+
// dispatch continues to remaining handlers — no error is returned to the caller.
187+
func (d *Dispatcher) Dispatch(ctx context.Context, env *Envelope) {
188+
d.mu.RLock()
189+
defer d.mu.RUnlock()
190+
191+
for evType, rawPayload := range env.Events {
192+
handlers := append(d.handlers[evType], d.catch...)
193+
for _, h := range handlers {
194+
if err := h(ctx, env, rawPayload); err != nil {
195+
d.log.Errorf("webhook: handler error for %s (jti=%s): %v", evType, env.Jti, err)
196+
if d.dlq != nil {
197+
d.dlq.Push(DeadLetterItem{
198+
EventType: evType,
199+
Envelope: env,
200+
Raw: rawPayload,
201+
Err: err,
202+
FailedAt: time.Now(),
203+
Attempts: 1,
204+
})
205+
}
206+
} else {
207+
d.log.Infof("webhook: delivered %s (jti=%s)", evType, env.Jti)
208+
}
209+
}
210+
}
211+
}
212+
213+
// DispatchJSON is a convenience wrapper that parses raw JSON into an Envelope
214+
// and then calls Dispatch.
215+
func (d *Dispatcher) DispatchJSON(ctx context.Context, body []byte) error {
216+
var env Envelope
217+
if err := json.Unmarshal(body, &env); err != nil {
218+
return fmt.Errorf("webhook: decode envelope: %w", err)
219+
}
220+
d.Dispatch(ctx, &env)
221+
return nil
222+
}
223+
224+
// ────────────────────────────────────────────────────────────────────────────
225+
// Typed handler helpers
226+
// ────────────────────────────────────────────────────────────────────────────
227+
228+
// OnResourceUpdate registers a typed handler for resource-update events.
229+
func (d *Dispatcher) OnResourceUpdate(fn func(ctx context.Context, env *Envelope, ev ResourceUpdateEvent) error) {
230+
d.On(EventTypeResourceUpdate, func(ctx context.Context, env *Envelope, raw json.RawMessage) error {
231+
var ev ResourceUpdateEvent
232+
if err := json.Unmarshal(raw, &ev); err != nil {
233+
return fmt.Errorf("webhook: decode ResourceUpdateEvent: %w", err)
234+
}
235+
return fn(ctx, env, ev)
236+
})
237+
}
238+
239+
// OnConsentRevoked registers a typed handler for consent-authorization-revoked events.
240+
func (d *Dispatcher) OnConsentRevoked(fn func(ctx context.Context, env *Envelope, ev ConsentAuthRevokedEvent) error) {
241+
d.On(EventTypeConsentAuthRevoked, func(ctx context.Context, env *Envelope, raw json.RawMessage) error {
242+
var ev ConsentAuthRevokedEvent
243+
if err := json.Unmarshal(raw, &ev); err != nil {
244+
return fmt.Errorf("webhook: decode ConsentAuthRevokedEvent: %w", err)
245+
}
246+
return fn(ctx, env, ev)
247+
})
248+
}
249+
250+
// ReplayDLQ attempts to re-deliver every item currently in the DLQ.
251+
// Successfully re-delivered items are removed; persistent failures remain.
252+
func (d *Dispatcher) ReplayDLQ(ctx context.Context) {
253+
if d.dlq == nil {
254+
return
255+
}
256+
items := d.dlq.Drain()
257+
for _, item := range items {
258+
d.mu.RLock()
259+
handlers := append(d.handlers[item.EventType], d.catch...)
260+
d.mu.RUnlock()
261+
262+
for _, h := range handlers {
263+
if err := h(ctx, item.Envelope, item.Raw); err != nil {
264+
item.Attempts++
265+
item.Err = err
266+
d.dlq.Push(item) // back to DLQ
267+
}
268+
}
269+
}
270+
}

0 commit comments

Comments
 (0)