Skip to content

Commit 31418a7

Browse files
[receiver/kafkametrics] use kadm.Client.Lag and do not record negative values (#48701) (#48774)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes #48701 <!--Describe what testing was performed and which tests were added.--> #### Testing <!--Describe the documentation added.--> #### Documentation <!--Authorship attestation. See AGENTS.md for details. AI agents must not check this box on behalf of the user; the human author must check it themselves before the PR is ready for review.--> #### Authorship - [x] I, a human, wrote this pull request description myself. <!--Please delete paragraphs that you did not use before submitting.-->
1 parent b6f0fa8 commit 31418a7

4 files changed

Lines changed: 155 additions & 81 deletions

File tree

.chloggen/48701.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
7+
component: receiver/kafka_metrics
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: use kadm.Client.Lag and do not record negative values
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [48701]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/kafkametricsreceiver/consumer_scraper_franz.go

Lines changed: 37 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616
"go.opentelemetry.io/collector/pdata/pmetric"
1717
"go.opentelemetry.io/collector/receiver"
1818
"go.opentelemetry.io/collector/scraper"
19-
"go.uber.org/multierr"
19+
"go.opentelemetry.io/collector/scraper/scrapererror"
2020

2121
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
2222
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata"
@@ -70,108 +70,69 @@ func (s *consumerScraperFranz) scrape(ctx context.Context) (pmetric.Metrics, err
7070
return pmetric.Metrics{}, err
7171
}
7272

73-
var scrapeErr error
74-
75-
// 1) list & filter groups
76-
lgs, err := s.adm.ListGroups(ctx)
73+
lgs, err := s.adm.ListGroupsByType(ctx, []string{"classic", "consumer"})
7774
if err != nil {
78-
return pmetric.Metrics{}, fmt.Errorf("franz-go: ListGroups failed: %w", err)
75+
return pmetric.Metrics{}, fmt.Errorf("franz-go: ListGroupsByType failed: %w", err)
7976
}
77+
8078
var matchedGrpIDs []string
81-
for _, g := range lgs.Sorted() {
79+
for _, g := range lgs {
8280
if s.groupFilter.MatchString(g.Group) {
8381
matchedGrpIDs = append(matchedGrpIDs, g.Group)
8482
}
8583
}
8684

87-
// 2) list & filter topics
88-
td, err := s.adm.ListTopics(ctx) // non-internal only, same as sarama default
89-
if err != nil {
90-
return pmetric.Metrics{}, fmt.Errorf("franz-go: ListTopics failed: %w", err)
91-
}
92-
var matchedTopics []string
93-
for t := range td {
94-
if s.topicFilter.MatchString(t) {
95-
matchedTopics = append(matchedTopics, t)
96-
}
97-
}
98-
99-
// 3) compute partition list + end offsets for matched topics
100-
endOffsets, err := s.adm.ListEndOffsets(ctx, matchedTopics...)
101-
if err != nil {
102-
return pmetric.Metrics{}, fmt.Errorf("franz-go: ListEndOffsets failed: %w", err)
103-
}
104-
// Build helpers equivalent to Sarama path
105-
topicPartitions := make(map[string][]int32, len(matchedTopics))
106-
topicPartitionOffset := make(map[string]map[int32]int64, len(matchedTopics))
107-
endOffsets.Each(func(lo kadm.ListedOffset) {
108-
// lo.Topic, lo.Partition, lo.Offset
109-
if _, ok := topicPartitions[lo.Topic]; !ok {
110-
topicPartitions[lo.Topic] = []int32{}
111-
}
112-
if _, ok := topicPartitionOffset[lo.Topic]; !ok {
113-
topicPartitionOffset[lo.Topic] = map[int32]int64{}
114-
}
115-
topicPartitions[lo.Topic] = append(topicPartitions[lo.Topic], lo.Partition)
116-
topicPartitionOffset[lo.Topic][lo.Partition] = lo.Offset
117-
})
118-
119-
// 4) describe groups for member counts
120-
dgs, err := s.adm.DescribeGroups(ctx, matchedGrpIDs...)
85+
dgls, err := s.adm.Lag(ctx, matchedGrpIDs...)
12186
if err != nil {
122-
return pmetric.Metrics{}, fmt.Errorf("franz-go: DescribeGroups failed: %w", err)
87+
return pmetric.Metrics{}, fmt.Errorf("franz-go: Lag failed: %w", err)
12388
}
12489

90+
scrapeErrs := scrapererror.ScrapeErrors{}
12591
now := pcommon.NewTimestampFromTime(time.Now())
126-
127-
// 5) per group: fetch committed offsets for matched topics and compute metrics
128-
gs := dgs.Sorted()
129-
for i := range gs {
130-
g := &gs[i]
131-
grpID := g.Group
132-
s.mb.RecordKafkaConsumerGroupMembersDataPoint(now, int64(len(g.Members)), grpID)
133-
134-
offsets, ferr := s.adm.FetchOffsetsForTopics(ctx, grpID, matchedTopics...)
135-
if ferr != nil {
136-
scrapeErr = multierr.Append(scrapeErr, fmt.Errorf("franz-go: FetchOffsetsForTopics(%s) failed: %w", grpID, ferr))
92+
for group := range dgls {
93+
dgl := dgls[group]
94+
if dgl.DescribeErr != nil {
95+
scrapeErrs.AddPartial(1, fmt.Errorf("franz-go: returned error from describing the group. group=%s, error=%w", group, dgl.DescribeErr))
13796
continue
13897
}
139-
140-
for topic, parts := range topicPartitions {
141-
isConsumed := false
142-
var lagSum int64
98+
s.mb.RecordKafkaConsumerGroupMembersDataPoint(now, int64(len(dgl.Members)), group)
99+
if dgl.FetchErr != nil {
100+
scrapeErrs.AddPartial(1, fmt.Errorf("franz-go: returned error from fetching offsets. group=%s, error=%w", group, dgl.FetchErr))
101+
continue
102+
}
103+
for topic := range dgl.Lag {
104+
if !s.topicFilter.MatchString(topic) {
105+
continue
106+
}
107+
gmls := dgl.Lag[topic]
108+
var isConsumed bool
143109
var offsetSum int64
144-
145-
for _, p := range parts {
146-
consumerOffset := int64(-1)
147-
if or, ok := offsets.Lookup(topic, p); ok && or.Err == nil {
148-
consumerOffset = or.At
110+
var lagSum int64
111+
for partition := range gmls {
112+
gml := gmls[partition]
113+
if gml.Err != nil {
114+
scrapeErrs.AddPartial(1, fmt.Errorf("franz-go: returned either the commit error, or the list end offsets error. group=%s, topic=%s, partition=%d, error=%w", group, topic, partition, gml.Err))
115+
continue
149116
}
150-
offsetSum += consumerOffset
151-
s.mb.RecordKafkaConsumerGroupOffsetDataPoint(now, consumerOffset, grpID, topic, int64(p))
152-
153-
var consumerLag int64 = -1
154-
if consumerOffset != -1 {
117+
if gml.Commit.At != -1 {
155118
isConsumed = true
156-
if end, ok := topicPartitionOffset[topic][p]; ok {
157-
consumerLag = end - consumerOffset
158-
lagSum += consumerLag
159-
}
119+
offsetSum += gml.Commit.At
120+
lagSum += gml.Lag // franz-go clamps Lag to >= 0 and only returns Lag == -1 when gml.Err != nil
121+
s.mb.RecordKafkaConsumerGroupOffsetDataPoint(now, gml.Commit.At, group, topic, int64(partition))
122+
s.mb.RecordKafkaConsumerGroupLagDataPoint(now, gml.Lag, group, topic, int64(partition))
160123
}
161-
s.mb.RecordKafkaConsumerGroupLagDataPoint(now, consumerLag, grpID, topic, int64(p))
162124
}
163-
164125
if isConsumed {
165-
s.mb.RecordKafkaConsumerGroupOffsetSumDataPoint(now, offsetSum, grpID, topic)
166-
s.mb.RecordKafkaConsumerGroupLagSumDataPoint(now, lagSum, grpID, topic)
126+
s.mb.RecordKafkaConsumerGroupOffsetSumDataPoint(now, offsetSum, group, topic)
127+
s.mb.RecordKafkaConsumerGroupLagSumDataPoint(now, lagSum, group, topic)
167128
}
168129
}
169130
}
170131

171132
rb := s.mb.NewResourceBuilder()
172133
rb.SetKafkaClusterAlias(s.config.ClusterAlias)
173134

174-
return s.mb.Emit(metadata.WithResource(rb.Emit())), scrapeErr
135+
return s.mb.Emit(metadata.WithResource(rb.Emit())), scrapeErrs.Combine()
175136
}
176137

177138
// Factory helper for franz-go path (selected under the feature gate later).

receiver/kafkametricsreceiver/consumer_scraper_franz_test.go

Lines changed: 90 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ func TestConsumerScraperFranz_ScrapeMetricValues(t *testing.T) {
115115
const (
116116
topic = "topic-a"
117117
group = "test-group"
118-
committed = int64(7)
118+
committed = int64(0)
119119
)
120120

121121
cluster, clientCfg := kafkatest.NewCluster(t, kfake.SeedTopics(1, topic))
@@ -158,9 +158,8 @@ func TestConsumerScraperFranz_ScrapeMetricValues(t *testing.T) {
158158
require.True(t, ok)
159159
require.Equal(t, "test-cluster", val.Str())
160160

161-
// We produced 1 record at partition 0, and committed offset = 7. End offset is 1 (record offset 0 + 1),
162-
// so lag = 1 - 7 = -6. The scraper records the raw difference; we just verify the metric is emitted
163-
// with the committed offset and a deterministic lag.
161+
// We produced 1 record at partition 0, and committed offset = 0. End offset is 1 (record offset 0 + 1),
162+
// so lag = 1 - 0 = 1.
164163
const expectedEnd = int64(1)
165164
const expectedLag = expectedEnd - committed
166165

@@ -216,6 +215,93 @@ func TestConsumerScraperFranz_ScrapeMetricValues(t *testing.T) {
216215
require.NoError(t, err)
217216
}
218217

218+
func TestConsumerScraperFranz_ScrapeNoEmittedDataPointsForUncommitted(t *testing.T) {
219+
const (
220+
topic = "topic-a"
221+
group = "test-group-uncommitted"
222+
)
223+
224+
cluster, clientCfg := kafkatest.NewCluster(t, kfake.SeedTopics(1, topic))
225+
cl, err := kgo.NewClient(kgo.SeedBrokers(cluster.ListenAddrs()...))
226+
require.NoError(t, err)
227+
t.Cleanup(cl.Close)
228+
229+
adm := kadm.NewClient(cl)
230+
231+
// Produce a record so the partition has a valid log end offset (1).
232+
produceResults := cl.ProduceSync(t.Context(), &kgo.Record{Topic: topic, Value: []byte("payload")})
233+
require.NoError(t, produceResults.FirstErr())
234+
235+
// Commit an explicit offset of -1 for the group at partition 0.
236+
// This maps directly to the gml.Commit.At == -1 check in the scraper logic.
237+
var os kadm.Offsets
238+
os.AddOffset(topic, 0, -1, -1)
239+
_, err = adm.CommitOffsets(t.Context(), group, os)
240+
require.NoError(t, err)
241+
242+
cfg := Config{
243+
ClientConfig: clientCfg,
244+
MetricsBuilderConfig: metadata.NewDefaultMetricsBuilderConfig(),
245+
ClusterAlias: "test-cluster",
246+
TopicMatch: ".*",
247+
GroupMatch: ".*",
248+
}
249+
cfg.ResourceAttributes.KafkaClusterAlias.Enabled = true
250+
251+
s, err := createConsumerScraperFranz(t.Context(), cfg, receivertest.NewNopSettings(metadata.Type))
252+
require.NoError(t, err)
253+
require.NoError(t, s.Start(t.Context(), componenttest.NewNopHost()))
254+
t.Cleanup(func() { require.NoError(t, s.Shutdown(t.Context())) })
255+
256+
md, err := s.ScrapeMetrics(t.Context())
257+
require.NoError(t, err)
258+
require.Equal(t, 1, md.ResourceMetrics().Len())
259+
require.Equal(t, 1, md.ResourceMetrics().At(0).ScopeMetrics().Len())
260+
261+
rm := md.ResourceMetrics().At(0)
262+
ms := rm.ScopeMetrics().At(0).Metrics()
263+
264+
seen := map[string]bool{}
265+
for i := 0; i < ms.Len(); i++ {
266+
m := ms.At(i)
267+
seen[m.Name()] = true
268+
switch m.Name() {
269+
case "kafka.consumer_group.offset":
270+
// Ensure no individual partition offset metrics were generated
271+
require.Zero(t, m.Gauge().DataPoints().Len(), "expected no offset datapoints for an uncommitted partition")
272+
case "kafka.consumer_group.lag":
273+
// Ensure no individual partition lag metrics were generated
274+
require.Zero(t, m.Gauge().DataPoints().Len(), "expected no lag datapoints for an uncommitted partition")
275+
case "kafka.consumer_group.offset_sum":
276+
// Ensure the sum block was bypassed since isConsumed should be false
277+
require.Zero(t, m.Gauge().DataPoints().Len(), "expected no offset_sum datapoints for an uncommitted partition")
278+
case "kafka.consumer_group.lag_sum":
279+
// Ensure the sum block was bypassed since isConsumed should be false
280+
require.Zero(t, m.Gauge().DataPoints().Len(), "expected no lag_sum datapoints for an uncommitted partition")
281+
case "kafka.consumer_group.members":
282+
// Group structural tracking still exists, so members is safely recorded as 0
283+
require.Equal(t, int64(0), m.Sum().DataPoints().At(0).IntValue())
284+
}
285+
}
286+
for _, name := range []string{
287+
"kafka.consumer_group.offset",
288+
"kafka.consumer_group.lag",
289+
"kafka.consumer_group.offset_sum",
290+
"kafka.consumer_group.lag_sum",
291+
} {
292+
require.False(t, seen[name], "metric %s emitted", name)
293+
}
294+
for _, name := range []string{
295+
"kafka.consumer_group.members",
296+
} {
297+
require.True(t, seen[name], "metric %s not emitted", name)
298+
}
299+
300+
// Clean up the group so the kfake goroutine exits smoothly.
301+
_, err = adm.DeleteGroups(t.Context(), group)
302+
require.NoError(t, err)
303+
}
304+
219305
func TestConsumerScraperFranz_ScrapeUnreachable(t *testing.T) {
220306
cluster, clientCfg := kafkatest.NewCluster(t, kfake.SeedTopics(1, "topic-a"))
221307
cfg := Config{

receiver/kafkametricsreceiver/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ require (
2424
go.opentelemetry.io/collector/scraper v0.154.0
2525
go.opentelemetry.io/collector/scraper/scraperhelper v0.154.0
2626
go.uber.org/goleak v1.3.0
27-
go.uber.org/multierr v1.11.0
2827
go.uber.org/zap v1.28.0
2928
)
3029

@@ -90,6 +89,7 @@ require (
9089
go.opentelemetry.io/otel/sdk v1.44.0 // indirect
9190
go.opentelemetry.io/otel/sdk/metric v1.44.0 // indirect
9291
go.opentelemetry.io/otel/trace v1.44.0 // indirect
92+
go.uber.org/multierr v1.11.0 // indirect
9393
go.yaml.in/yaml/v3 v3.0.4 // indirect
9494
golang.org/x/crypto v0.52.0 // indirect
9595
golang.org/x/net v0.55.0 // indirect

0 commit comments

Comments
 (0)