-
-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathretry_budget.go
More file actions
234 lines (201 loc) · 6.15 KB
/
Copy pathretry_budget.go
File metadata and controls
234 lines (201 loc) · 6.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
package main
import (
"context"
"sync"
"sync/atomic"
"time"
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
)
// RetryBudget implements a token bucket algorithm to limit the rate of retries
// This prevents retry storms and cascading failures
type RetryBudget struct {
tokensPerSecond float64
maxTokens int64
currentTokens atomic.Int64
lastRefill atomic.Int64 // Unix timestamp in nanoseconds
enabled bool
logger *libpack_logger.Logger
ctx context.Context
cancel context.CancelFunc
// Statistics
totalAttempts atomic.Int64
allowedRetries atomic.Int64
deniedRetries atomic.Int64
}
// RetryBudgetConfig holds configuration for retry budget
type RetryBudgetConfig struct {
TokensPerSecond float64 // Rate at which tokens are refilled
MaxTokens int // Maximum number of tokens (burst capacity)
Enabled bool // Whether retry budget is enabled
}
// NewRetryBudget creates a new retry budget (deprecated, use NewRetryBudgetWithContext)
func NewRetryBudget(config RetryBudgetConfig, logger *libpack_logger.Logger) *RetryBudget {
return NewRetryBudgetWithContext(context.Background(), config, logger)
}
// NewRetryBudgetWithContext creates a new retry budget with context for graceful shutdown
func NewRetryBudgetWithContext(ctx context.Context, config RetryBudgetConfig, logger *libpack_logger.Logger) *RetryBudget {
budgetCtx, cancel := context.WithCancel(ctx)
rb := &RetryBudget{
tokensPerSecond: config.TokensPerSecond,
maxTokens: int64(config.MaxTokens),
enabled: config.Enabled,
logger: logger,
ctx: budgetCtx,
cancel: cancel,
}
// Initialize with full bucket
rb.currentTokens.Store(rb.maxTokens)
rb.lastRefill.Store(time.Now().UnixNano())
// Start refill goroutine
if rb.enabled {
go rb.refillLoop()
}
return rb
}
// AllowRetry checks if a retry is allowed based on the current budget
func (rb *RetryBudget) AllowRetry() bool {
rb.totalAttempts.Add(1)
if !rb.enabled {
rb.allowedRetries.Add(1)
return true
}
// Try to consume a token
for {
current := rb.currentTokens.Load()
if current <= 0 {
rb.deniedRetries.Add(1)
if rb.logger != nil {
rb.logger.Debug(&libpack_logger.LogMessage{
Message: "Retry denied: budget exhausted",
Pairs: map[string]any{
"current_tokens": current,
"denied_count": rb.deniedRetries.Load(),
},
})
}
return false
}
if rb.currentTokens.CompareAndSwap(current, current-1) {
rb.allowedRetries.Add(1)
return true
}
}
}
// refillLoop periodically refills tokens
func (rb *RetryBudget) refillLoop() {
ticker := time.NewTicker(100 * time.Millisecond) // Refill every 100ms
defer ticker.Stop()
for {
select {
case <-rb.ctx.Done():
return
case <-ticker.C:
rb.refill()
}
}
}
// Shutdown stops the retry budget goroutine
func (rb *RetryBudget) Shutdown() {
if rb.cancel != nil {
rb.cancel()
}
}
// refill adds tokens to the bucket based on elapsed time
func (rb *RetryBudget) refill() {
now := time.Now().UnixNano()
last := rb.lastRefill.Load()
// Calculate elapsed time in seconds
elapsed := float64(now-last) / float64(time.Second)
// Calculate tokens to add
tokensToAdd := int64(elapsed * rb.tokensPerSecond)
if tokensToAdd > 0 {
// Update last refill time
if rb.lastRefill.CompareAndSwap(last, now) {
// Add tokens, capped at maxTokens
for {
current := rb.currentTokens.Load()
newValue := current + tokensToAdd
if newValue > rb.maxTokens {
newValue = rb.maxTokens
}
if rb.currentTokens.CompareAndSwap(current, newValue) {
break
}
}
}
}
}
// GetStats returns current statistics
func (rb *RetryBudget) GetStats() map[string]any {
totalAttempts := rb.totalAttempts.Load()
allowedRetries := rb.allowedRetries.Load()
deniedRetries := rb.deniedRetries.Load()
var denialRate float64
if totalAttempts > 0 {
denialRate = float64(deniedRetries) / float64(totalAttempts) * 100
}
return map[string]any{
"enabled": rb.enabled,
"current_tokens": rb.currentTokens.Load(),
"max_tokens": rb.maxTokens,
"tokens_per_sec": rb.tokensPerSecond,
"total_attempts": totalAttempts,
"allowed_retries": allowedRetries,
"denied_retries": deniedRetries,
"denial_rate_pct": denialRate,
}
}
// Reset resets the retry budget statistics
func (rb *RetryBudget) Reset() {
rb.totalAttempts.Store(0)
rb.allowedRetries.Store(0)
rb.deniedRetries.Store(0)
rb.currentTokens.Store(rb.maxTokens)
}
// UpdateConfig updates the retry budget configuration
func (rb *RetryBudget) UpdateConfig(config RetryBudgetConfig) {
rb.tokensPerSecond = config.TokensPerSecond
rb.maxTokens = int64(config.MaxTokens)
rb.enabled = config.Enabled
// Reset to full capacity
rb.currentTokens.Store(rb.maxTokens)
if rb.logger != nil {
rb.logger.Info(&libpack_logger.LogMessage{
Message: "Retry budget configuration updated",
Pairs: map[string]any{
"tokens_per_sec": config.TokensPerSecond,
"max_tokens": config.MaxTokens,
"enabled": config.Enabled,
},
})
}
}
// Global retry budget instance
var (
retryBudget *RetryBudget
retryBudgetOnce sync.Once
)
// InitializeRetryBudget initializes the global retry budget (deprecated, use InitializeRetryBudgetWithContext)
func InitializeRetryBudget(config RetryBudgetConfig, logger *libpack_logger.Logger) *RetryBudget {
return InitializeRetryBudgetWithContext(context.Background(), config, logger)
}
// InitializeRetryBudgetWithContext initializes the global retry budget with context for graceful shutdown
func InitializeRetryBudgetWithContext(ctx context.Context, config RetryBudgetConfig, logger *libpack_logger.Logger) *RetryBudget {
retryBudgetOnce.Do(func() {
retryBudget = NewRetryBudgetWithContext(ctx, config, logger)
if logger != nil && config.Enabled {
logger.Info(&libpack_logger.LogMessage{
Message: "Retry budget initialized",
Pairs: map[string]any{
"tokens_per_sec": config.TokensPerSecond,
"max_tokens": config.MaxTokens,
},
})
}
})
return retryBudget
}
// GetRetryBudget returns the global retry budget instance
func GetRetryBudget() *RetryBudget {
return retryBudget
}