-
-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathrequest_coalescing.go
More file actions
243 lines (200 loc) · 6.06 KB
/
Copy pathrequest_coalescing.go
File metadata and controls
243 lines (200 loc) · 6.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
package main
import (
"sync"
"sync/atomic"
"time"
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
libpack_monitoring "github.com/lukaszraczylo/graphql-monitoring-proxy/monitoring"
)
// CoalescedResponse represents the shared response
type CoalescedResponse struct {
Body []byte
StatusCode int
Headers map[string]string
Err error
CachedAt time.Time
}
// RequestCoalescer implements the single-flight pattern to deduplicate identical concurrent requests
type RequestCoalescer struct {
inflight sync.Map // key: hash, value: *inflightRequest
logger *libpack_logger.Logger
monitoring *libpack_monitoring.MetricsSetup
enabled bool
// Statistics
totalRequests atomic.Int64
coalescedRequests atomic.Int64
inflightCount atomic.Int64
}
// inflightRequest represents a request currently in flight
type inflightRequest struct {
wg sync.WaitGroup
response *CoalescedResponse
waiters atomic.Int32
createdAt time.Time
mu sync.RWMutex
}
// NewRequestCoalescer creates a new request coalescer
func NewRequestCoalescer(enabled bool, logger *libpack_logger.Logger, monitoring *libpack_monitoring.MetricsSetup) *RequestCoalescer {
rc := &RequestCoalescer{
logger: logger,
monitoring: monitoring,
enabled: enabled,
}
if logger != nil && enabled {
logger.Info(&libpack_logger.LogMessage{
Message: "Request coalescing enabled",
})
}
return rc
}
// Do executes a function, deduplicating concurrent calls with the same key
func (rc *RequestCoalescer) Do(key string, fn func() (*CoalescedResponse, error)) (*CoalescedResponse, error) {
rc.totalRequests.Add(1)
if !rc.enabled {
return fn()
}
// Try to load existing inflight request
if existing, loaded := rc.inflight.Load(key); loaded {
inflight := existing.(*inflightRequest)
// Increment waiter count
waiters := inflight.waiters.Add(1)
rc.coalescedRequests.Add(1)
if rc.logger != nil {
rc.logger.Debug(&libpack_logger.LogMessage{
Message: "Request coalesced with in-flight request",
Pairs: map[string]any{
"key": key[:min(len(key), 32)] + "...",
"waiters": waiters,
},
})
}
// Wait for the inflight request to complete
inflight.wg.Wait()
// Return the shared response
inflight.mu.RLock()
defer inflight.mu.RUnlock()
if rc.monitoring != nil {
rc.monitoring.Increment("graphql_proxy_coalesced_requests_total", nil)
}
return inflight.response, nil
}
// Create a new inflight request
inflight := &inflightRequest{
createdAt: time.Now(),
}
inflight.wg.Add(1)
inflight.waiters.Store(1) // This request is the first waiter
// Try to store it (another goroutine might have just done the same)
actual, loaded := rc.inflight.LoadOrStore(key, inflight)
if loaded {
// Someone else beat us to it, wait for their result
existingInflight := actual.(*inflightRequest)
waiters := existingInflight.waiters.Add(1)
rc.coalescedRequests.Add(1)
if rc.logger != nil {
rc.logger.Debug(&libpack_logger.LogMessage{
Message: "Request coalesced (race condition)",
Pairs: map[string]any{
"key": key[:min(len(key), 32)] + "...",
"waiters": waiters,
},
})
}
existingInflight.wg.Wait()
existingInflight.mu.RLock()
defer existingInflight.mu.RUnlock()
if rc.monitoring != nil {
rc.monitoring.Increment("graphql_proxy_coalesced_requests_total", nil)
}
return existingInflight.response, nil
}
// We're the primary request, execute the function
rc.inflightCount.Add(1)
defer rc.inflightCount.Add(-1)
// Execute the request
response, err := fn()
// Store the result
inflight.mu.Lock()
if err != nil {
inflight.response = &CoalescedResponse{
Err: err,
}
} else {
inflight.response = response
}
inflight.mu.Unlock()
// Clean up and notify waiters
rc.inflight.Delete(key)
inflight.wg.Done()
// Log statistics
waiters := inflight.waiters.Load()
duration := time.Since(inflight.createdAt)
if rc.logger != nil && waiters > 1 {
rc.logger.Info(&libpack_logger.LogMessage{
Message: "Request completed, served coalesced waiters",
Pairs: map[string]any{
"key": key[:min(len(key), 32)] + "...",
"waiters": waiters,
"duration_ms": duration.Milliseconds(),
"saved_calls": waiters - 1,
},
})
}
if rc.monitoring != nil {
rc.monitoring.Increment("graphql_proxy_primary_requests_total", nil)
if waiters > 1 {
rc.monitoring.Update("graphql_proxy_coalescing_wait_duration", nil, duration.Seconds())
}
}
return inflight.response, nil
}
// GetStats returns coalescing statistics
func (rc *RequestCoalescer) GetStats() map[string]any {
totalRequests := rc.totalRequests.Load()
coalescedRequests := rc.coalescedRequests.Load()
var coalescingRate float64
if totalRequests > 0 {
coalescingRate = float64(coalescedRequests) / float64(totalRequests) * 100
}
primaryRequests := totalRequests - coalescedRequests
var savings float64
if primaryRequests > 0 {
savings = float64(coalescedRequests) / float64(primaryRequests) * 100
}
return map[string]any{
"enabled": rc.enabled,
"total_requests": totalRequests,
"primary_requests": primaryRequests,
"coalesced_requests": coalescedRequests,
"inflight_count": rc.inflightCount.Load(),
"coalescing_rate_pct": coalescingRate,
"backend_savings_pct": savings,
}
}
// Reset resets coalescing statistics
func (rc *RequestCoalescer) Reset() {
rc.totalRequests.Store(0)
rc.coalescedRequests.Store(0)
}
// Global request coalescer
var (
requestCoalescer *RequestCoalescer
requestCoalescerOnce sync.Once
)
// InitializeRequestCoalescer initializes the global request coalescer
func InitializeRequestCoalescer(enabled bool, logger *libpack_logger.Logger, monitoring *libpack_monitoring.MetricsSetup) *RequestCoalescer {
requestCoalescerOnce.Do(func() {
requestCoalescer = NewRequestCoalescer(enabled, logger, monitoring)
})
return requestCoalescer
}
// GetRequestCoalescer returns the global request coalescer
func GetRequestCoalescer() *RequestCoalescer {
return requestCoalescer
}
func min(a, b int) int {
if a < b {
return a
}
return b
}