Skip to content

Commit 46d8616

Browse files
[pkg/stanza/fileconsumer] Optimize indexed fingerprint matching (#47660)
This PR was implemented as part of #45203 but decided to split the changes done to make them easier to review. Updates #27404 Changes fingerprint matching in `fileset`. Before this change, matching was done with linear scans over the tracked files. After this change, matching uses an internal index based on fingerprint length and fingerprint content, so exact and prefix matches no longer depend on scanning the full set every time. As part of that implementation, `Fileset.Pop()` changes from removing the first element to removing the last element. In other words, it changes from FIFO behavior to LIFO behavior. This is an internal change that makes removal O(1) and fits the indexed structure. The benchmark was run before and after the change with `go test ./fileconsumer -run '^$' -bench BenchmarkPollManyFiles -benchmem -count=5`. | Files watched | Before | After | Improvement | | --- | ---: | ---: | ---: | | 100 | 2.448 ms/op | 2.196 ms/op | 10.3% | | 500 | 12.27 ms/op | 10.77 ms/op | 12.3% | | 1000 | 28.20 ms/op | 21.91 ms/op | 22.3% | | 2000 | 64.59 ms/op | 44.90 ms/op | 30.5% | | 2500 | 87.65 ms/op | 57.22 ms/op | 34.7% | | 3000 | 112.98 ms/op | 69.61 ms/op | 38.4% | | Geomean | 28.55 ms/op | 21.27 ms/op | 25.5% | --------- Signed-off-by: Israel Blancas <iblancasa@gmail.com> Co-authored-by: Andrzej Stencel <andrzej.stencel@elastic.co>
1 parent defe51c commit 46d8616

11 files changed

Lines changed: 516 additions & 38 deletions

File tree

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
7+
component: receiver/file_log
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: "Improve polling performance when watching many files by indexing fingerprint matching."
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [27404]
14+
15+
# If your change doesn't affect end users or the exported elements of any package,
16+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
17+
# Optional: The change log or logs in which this entry should be included.
18+
# e.g. '[user]' or '[user, api]'
19+
# Include 'user' if the change is relevant to end users.
20+
# Include 'api' if the change is relevant to users of libraries or exported APIs.
21+
# Include 'developer' if the change is relevant only for developers of this repo.
22+
# Default: '[user]'
23+
change_logs: []

pkg/stanza/fileconsumer/benchmark_test.go

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,50 @@ func BenchmarkConsumeFiles(b *testing.B) {
342342
}
343343
}
344344

345+
func BenchmarkPollManyFiles(b *testing.B) {
346+
fileCounts := []int{100, 500, 1000, 2000, 2500, 3000}
347+
for _, numFiles := range fileCounts {
348+
b.Run(fmt.Sprintf("Files_%d", numFiles), func(b *testing.B) {
349+
rootDir := b.TempDir()
350+
351+
for i := range numFiles {
352+
path := filepath.Join(rootDir, fmt.Sprintf("file%04d.log", i))
353+
f := filetest.OpenFile(b, path)
354+
_, err := f.WriteString(path + "\n")
355+
require.NoError(b, err)
356+
require.NoError(b, f.Close())
357+
}
358+
359+
cfg := NewConfig()
360+
cfg.Include = []string{filepath.Join(rootDir, "*.log")}
361+
cfg.StartAt = "beginning"
362+
cfg.MaxConcurrentFiles = numFiles * 2
363+
cfg.PollInterval = time.Microsecond
364+
365+
set := componenttest.NewNopTelemetrySettings()
366+
callback := func(context.Context, [][]byte, map[string]any, int64, []int64) error {
367+
return nil
368+
}
369+
370+
op, err := cfg.Build(set, callback)
371+
require.NoError(b, err)
372+
373+
ctx := b.Context()
374+
persister := testutil.NewUnscopedMockPersister()
375+
op.instantiateTracker(ctx, persister)
376+
op.persister = persister
377+
378+
op.poll(ctx)
379+
380+
b.ReportAllocs()
381+
b.ResetTimer()
382+
for b.Loop() {
383+
op.poll(ctx)
384+
}
385+
})
386+
}
387+
}
388+
345389
// BenchmarkFingerprintComparison benchmarks fingerprint comparison operations
346390
// This isolates the cost of fingerprint matching from file I/O
347391
func BenchmarkFingerprintComparison(b *testing.B) {
@@ -411,8 +455,8 @@ func BenchmarkFilesetMatch(b *testing.B) {
411455
fs := fileset.New[*reader.Metadata](size)
412456
fs.Add(metadatas...)
413457

414-
// Measure the Match operation
415-
result := fs.Match(targetFp, fileset.Equal)
458+
// Measure the indexed exact-match operation.
459+
result := fs.MatchEqual(targetFp)
416460
if result == nil {
417461
b.Fatal("expected to find match")
418462
}

pkg/stanza/fileconsumer/internal/archive/archive.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func (a *archive) FindFiles(ctx context.Context, fps []*fingerprint.Fingerprint)
9393
// we've already found a match for this index, continue
9494
continue
9595
}
96-
if md := data.Match(fp, fileset.StartsWith); md != nil {
96+
if md := data.MatchStartsWith(fp); md != nil {
9797
// update the matched metada for the index
9898
matchedMetadata[j] = md
9999
archiveModified = true

pkg/stanza/fileconsumer/internal/fileset/fileset.go

Lines changed: 182 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,29 @@ type Matchable interface {
2222
GetFingerprint() *fingerprint.Fingerprint
2323
}
2424

25+
type bucket struct {
26+
indices []int
27+
}
28+
29+
type bucketPos struct {
30+
bucket *bucket
31+
idx int
32+
}
33+
2534
type Fileset[T Matchable] struct {
26-
readers []T
35+
readers []T
36+
positions []bucketPos
37+
buckets map[int]map[string]*bucket
38+
bucketLengths []int
2739
}
2840

2941
func New[T Matchable](capacity int) *Fileset[T] {
30-
return &Fileset[T]{readers: make([]T, 0, capacity)}
42+
return &Fileset[T]{
43+
readers: make([]T, 0, capacity),
44+
positions: make([]bucketPos, 0, capacity),
45+
buckets: make(map[int]map[string]*bucket),
46+
bucketLengths: make([]int, 0),
47+
}
3148
}
3249

3350
func (set *Fileset[T]) Len() int {
@@ -38,33 +55,188 @@ func (set *Fileset[T]) Get() []T {
3855
return set.readers
3956
}
4057

58+
func (set *Fileset[T]) Reset() {
59+
var zero T
60+
for i := range set.readers {
61+
set.readers[i] = zero
62+
}
63+
set.readers = set.readers[:0]
64+
for i := range set.positions {
65+
set.positions[i] = bucketPos{}
66+
}
67+
set.positions = set.positions[:0]
68+
clear(set.buckets)
69+
set.bucketLengths = set.bucketLengths[:0]
70+
}
71+
72+
func (set *Fileset[T]) Reindex() {
73+
if len(set.readers) == 0 {
74+
clear(set.buckets)
75+
set.bucketLengths = set.bucketLengths[:0]
76+
return
77+
}
78+
79+
clear(set.buckets)
80+
set.bucketLengths = set.bucketLengths[:0]
81+
82+
for idx, reader := range set.readers {
83+
pos := bucketPos{}
84+
if fp := reader.GetFingerprint(); fp != nil && fp.Len() > 0 {
85+
pos = set.insertIntoBucket(idx, fp)
86+
}
87+
set.positions[idx] = pos
88+
}
89+
}
90+
4191
func (set *Fileset[T]) Pop() (T, error) {
42-
// return first element from the array and remove it
4392
var val T
4493
if len(set.readers) == 0 {
4594
return val, errFilesetEmpty
4695
}
47-
r := set.readers[0]
48-
set.readers = slices.Delete(set.readers, 0, 1)
49-
return r, nil
96+
return set.removeAt(len(set.readers) - 1), nil
5097
}
5198

5299
func (set *Fileset[T]) Add(readers ...T) {
53-
// add open readers
54-
set.readers = append(set.readers, readers...)
100+
for _, reader := range readers {
101+
set.readers = append(set.readers, reader)
102+
idx := len(set.readers) - 1
103+
pos := bucketPos{}
104+
if fp := reader.GetFingerprint(); fp != nil && fp.Len() > 0 {
105+
pos = set.insertIntoBucket(idx, fp)
106+
}
107+
set.positions = append(set.positions, pos)
108+
}
109+
}
110+
111+
func (set *Fileset[T]) MatchEqual(fp *fingerprint.Fingerprint) T {
112+
var zero T
113+
if fp == nil || fp.Len() == 0 {
114+
return zero
115+
}
116+
bucketsByLength, ok := set.buckets[fp.Len()]
117+
if !ok {
118+
return zero
119+
}
120+
bucket := bucketsByLength[fp.Key()]
121+
if bucket == nil {
122+
return zero
123+
}
124+
for _, idx := range slices.Backward(bucket.indices) {
125+
if fp.Equal(set.readers[idx].GetFingerprint()) {
126+
return set.removeAt(idx)
127+
}
128+
}
129+
return zero
130+
}
131+
132+
func (set *Fileset[T]) MatchStartsWith(fp *fingerprint.Fingerprint) T {
133+
var zero T
134+
if fp == nil || fp.Len() == 0 {
135+
return zero
136+
}
137+
fpKey := fp.Key()
138+
for _, length := range set.bucketLengths {
139+
if length > len(fpKey) {
140+
continue
141+
}
142+
bucketsByLength, ok := set.buckets[length]
143+
if !ok {
144+
continue
145+
}
146+
bucket := bucketsByLength[fpKey[:length]]
147+
if bucket == nil {
148+
continue
149+
}
150+
for _, idx := range slices.Backward(bucket.indices) {
151+
if fp.StartsWith(set.readers[idx].GetFingerprint()) {
152+
return set.removeAt(idx)
153+
}
154+
}
155+
}
156+
return zero
55157
}
56158

57159
func (set *Fileset[T]) Match(fp *fingerprint.Fingerprint, cmp func(a, b *fingerprint.Fingerprint) bool) T {
58160
var val T
59161
for idx, r := range set.readers {
60162
if cmp(fp, r.GetFingerprint()) {
61-
set.readers = append(set.readers[:idx], set.readers[idx+1:]...)
62-
return r
163+
return set.removeAt(idx)
63164
}
64165
}
65166
return val
66167
}
67168

169+
func (set *Fileset[T]) insertIntoBucket(idx int, fp *fingerprint.Fingerprint) bucketPos {
170+
keyLen := fp.Len()
171+
key := fp.Key()
172+
173+
bucketsByLength := set.buckets[keyLen]
174+
if bucketsByLength == nil {
175+
bucketsByLength = make(map[string]*bucket)
176+
set.buckets[keyLen] = bucketsByLength
177+
set.bucketLengths = append(set.bucketLengths, keyLen)
178+
slices.SortFunc(set.bucketLengths, func(a, b int) int {
179+
return b - a
180+
})
181+
}
182+
183+
b := bucketsByLength[key]
184+
if b == nil {
185+
b = &bucket{}
186+
bucketsByLength[key] = b
187+
}
188+
b.indices = append(b.indices, idx)
189+
190+
return bucketPos{
191+
bucket: b,
192+
idx: len(b.indices) - 1,
193+
}
194+
}
195+
196+
func (set *Fileset[T]) removeAt(idx int) T {
197+
val := set.readers[idx]
198+
set.removeFromBucket(idx)
199+
200+
lastIdx := len(set.readers) - 1
201+
if idx != lastIdx {
202+
set.readers[idx] = set.readers[lastIdx]
203+
set.positions[idx] = set.positions[lastIdx]
204+
set.updateBucketPosition(idx)
205+
}
206+
207+
var zero T
208+
set.readers[lastIdx] = zero
209+
set.readers = set.readers[:lastIdx]
210+
set.positions[lastIdx] = bucketPos{}
211+
set.positions = set.positions[:lastIdx]
212+
213+
return val
214+
}
215+
216+
func (set *Fileset[T]) removeFromBucket(idx int) {
217+
pos := set.positions[idx]
218+
if pos.bucket == nil {
219+
return
220+
}
221+
222+
last := len(pos.bucket.indices) - 1
223+
swapIdx := pos.bucket.indices[last]
224+
pos.bucket.indices[pos.idx] = swapIdx
225+
pos.bucket.indices = pos.bucket.indices[:last]
226+
if pos.idx < last {
227+
set.positions[swapIdx].idx = pos.idx
228+
}
229+
set.positions[idx] = bucketPos{}
230+
}
231+
232+
func (set *Fileset[T]) updateBucketPosition(idx int) {
233+
pos := &set.positions[idx]
234+
if pos.bucket == nil {
235+
return
236+
}
237+
pos.bucket.indices[pos.idx] = idx
238+
}
239+
68240
// comparators
69241
func StartsWith(a, b *fingerprint.Fingerprint) bool {
70242
return a.StartsWith(b)

0 commit comments

Comments
 (0)