-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathjobs.go
More file actions
99 lines (87 loc) · 2.75 KB
/
Copy pathjobs.go
File metadata and controls
99 lines (87 loc) · 2.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package main
import (
"context"
"log"
"time"
"github.com/fiatjaf/eventstore/badger"
"github.com/nbd-wtf/go-nostr"
)
func cleanDatabase() {
// The in-memory store self-evicts; only the persistent badger backend needs
// an external prune + value-log GC cycle.
b, ok := store.(*badger.BadgerBackend)
if !ok {
return
}
// Run cleanup more frequently but with targeted queries
// This reduces the batch size and spreads the load
ticker := time.NewTicker(time.Duration(config.KeepNotesFor/2+1) * time.Minute)
defer ticker.Stop()
for range ticker.C {
pruneOldEvents()
// Reclaim disk space from Badger's value log. Deleting events only
// writes tombstones; without GC the .vlog files grow unbounded under
// this relay's write-then-delete (ephemeral event) workload.
runValueLogGC(b)
}
}
// runValueLogGC repeatedly rewrites Badger value-log files until there is
// nothing left worth reclaiming (RunValueLogGC returns a non-nil error, e.g.
// badger.ErrNoRewrite). Each successful call rewrites at most one file.
func runValueLogGC(b *badger.BadgerBackend) {
if b == nil || b.DB == nil {
return
}
for {
if err := b.RunValueLogGC(0.5); err != nil {
// ErrNoRewrite (nothing to collect) is the normal stop condition.
return
}
}
}
func pruneOldEvents() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Calculate cutoff timestamp - only query events older than retention period
cutoff := nostr.Timestamp(time.Now().Add(-time.Duration(config.KeepNotesFor) * time.Minute).Unix())
// Use Until filter to only fetch expired events instead of full table scan
filter := nostr.Filter{
Kinds: []int{24133, 24135},
Until: &cutoff,
}
// Delete events as they stream in. Badger uses MVCC, so deleting while the
// query's read iterator is still open is safe, and avoids buffering the
// entire expired set in memory.
//
// The eventstore producer goroutine sends without selecting on ctx, so we
// must always drain the channel to completion (even after a timeout) to
// avoid leaking it; we simply stop issuing deletes once ctx is done.
timedOut := false
for _, qe := range relay.QueryEvents {
ch, err := qe(ctx, filter)
if err != nil {
log.Printf("can't read from database: %s", err.Error())
continue
}
for ev := range ch {
if timedOut {
continue // drain remaining events, but stop deleting
}
if ctx.Err() != nil {
log.Printf("cleanup timeout while pruning expired events")
timedOut = true
continue
}
deleteEvent(ctx, ev)
}
}
}
func deleteEvent(ctx context.Context, ev *nostr.Event) bool {
for _, de := range relay.DeleteEvent {
if err := de(ctx, ev); err != nil {
log.Printf("can't delete event %s: %s", ev.ID, err.Error())
return false
}
}
return true
}