-
-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathevents.go
More file actions
146 lines (124 loc) · 3.69 KB
/
Copy pathevents.go
File metadata and controls
146 lines (124 loc) · 3.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package main
import (
"context"
"fmt"
"time"
"github.com/jackc/pgx/v5/pgxpool"
libpack_logger "github.com/lukaszraczylo/graphql-monitoring-proxy/logging"
)
const (
initialDelay = 60 * time.Second
cleanupInterval = 1 * time.Hour
)
// Use parameterized queries to prevent SQL injection
// Cast $1 to interval type to allow proper parameterized interval values
var delQueries = [...]string{
"DELETE FROM hdb_catalog.event_invocation_logs WHERE created_at < NOW() - $1::INTERVAL",
"DELETE FROM hdb_catalog.event_log WHERE created_at < NOW() - $1::INTERVAL",
"DELETE FROM hdb_catalog.hdb_action_log WHERE created_at < NOW() - $1::INTERVAL",
"DELETE FROM hdb_catalog.hdb_cron_event_invocation_logs WHERE created_at < NOW() - $1::INTERVAL",
"DELETE FROM hdb_catalog.hdb_scheduled_event_invocation_logs WHERE created_at < NOW() - $1::INTERVAL",
}
func enableHasuraEventCleaner(ctx context.Context) error {
cfgMutex.RLock()
if !cfg.HasuraEventCleaner.Enable {
cfgMutex.RUnlock()
return nil
}
eventMetadataDb := cfg.HasuraEventCleaner.EventMetadataDb
if eventMetadataDb == "" {
logger := cfg.Logger
cfgMutex.RUnlock()
logger.Warning(&libpack_logger.LogMessage{
Message: "Event metadata db URL not specified, event cleaner not active",
})
return nil
}
clearOlderThan := cfg.HasuraEventCleaner.ClearOlderThan
logger := cfg.Logger
cfgMutex.RUnlock()
logger.Info(&libpack_logger.LogMessage{
Message: "Event cleaner enabled",
Pairs: map[string]any{"interval_in_days": clearOlderThan},
})
// Parse pool configuration
poolConfig, err := pgxpool.ParseConfig(eventMetadataDb)
if err != nil {
return err
}
// Set connection pool limits
poolConfig.MaxConns = 10
poolConfig.MinConns = 2
poolConfig.MaxConnLifetime = time.Hour
poolConfig.MaxConnIdleTime = 30 * time.Minute
pool, err := pgxpool.NewWithConfig(ctx, poolConfig)
if err != nil {
logger.Error(&libpack_logger.LogMessage{
Message: "Failed to create connection pool",
Pairs: map[string]any{"error": err.Error()},
})
return err
}
go func() {
defer pool.Close()
// Wait for initial delay or context cancellation
select {
case <-ctx.Done():
return
case <-time.After(initialDelay):
}
logger.Info(&libpack_logger.LogMessage{
Message: "Initial cleanup of old events",
})
cleanEvents(ctx, pool, clearOlderThan, logger)
ticker := time.NewTicker(cleanupInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
logger.Info(&libpack_logger.LogMessage{
Message: "Stopping event cleaner",
})
return
case <-ticker.C:
logger.Info(&libpack_logger.LogMessage{
Message: "Cleaning up old events",
})
cleanEvents(ctx, pool, clearOlderThan, logger)
}
}
}()
return nil
}
func cleanEvents(ctx context.Context, pool *pgxpool.Pool, clearOlderThan int, logger *libpack_logger.Logger) {
var errors []error
var failedQueries []string
// Format interval parameter for PostgreSQL
interval := fmt.Sprintf("%d days", clearOlderThan)
for _, query := range delQueries {
// Use parameterized query with bound parameter to prevent SQL injection
_, err := pool.Exec(ctx, query, interval)
if err != nil {
errors = append(errors, err)
failedQueries = append(failedQueries, query)
} else {
logger.Debug(&libpack_logger.LogMessage{
Message: "Successfully executed query",
Pairs: map[string]any{"query": query, "interval": interval},
})
}
}
if len(errors) > 0 {
var errMsgs []string
for _, err := range errors {
errMsgs = append(errMsgs, err.Error())
}
logger.Error(&libpack_logger.LogMessage{
Message: "Failed to execute some queries",
Pairs: map[string]any{
"failed_queries": failedQueries,
"errors": errMsgs,
},
})
}
}