Commit 1a47f20

[exporter/kafka] Add producer.max_broker_write_bytes config option (#49007)
#### Description

The maximum size of a single write to a broker was previously fixed at the underlying franz-go default of 100 MiB and could not be configured. As a result, setting `producer.max_message_bytes` above 100 MiB passed configuration validation but caused the collector to fail on startup with an unrecoverable error.

The new `producer.max_broker_write_bytes` setting (default 104857600, i.e. 100 MiB) exposes this limit. To send messages larger than 100 MiB, raise it so it is greater than or equal to `max_message_bytes`. Configuration is now validated up front: the collector reports a clear error if `max_broker_write_bytes` is below the 100 MiB minimum or smaller than `max_message_bytes`, rather than failing at runtime.

#### Link to tracking issue

Fixes #47492

#### Testing

Added unit tests.

#### Documentation

Updated the README.md with this new option.

#### Authorship

- [X] I, a human, wrote this pull request description myself.

---------

Signed-off-by: Paulo Dias <paulodias.gm@gmail.com>
1 parent 0abbb14 commit 1a47f20

7 files changed

Lines changed: 144 additions & 2 deletions

Lines changed: 38 additions & 0 deletions
@@ -0,0 +1,38 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
+component: exporter/kafka
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add `producer.max_broker_write_bytes` config
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [47492]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: |
+  The maximum size of a single write to a broker was previously fixed at the underlying
+  franz-go default of 100 MiB and could not be configured. As a result, setting
+  `producer.max_message_bytes` above 100 MiB passed configuration validation but caused the
+  collector to fail on startup with an unrecoverable error ("max broker write bytes ... is
+  erroneously less than max record batch bytes ...").
+
+  The new `producer.max_broker_write_bytes` setting (default 104857600, i.e. 100 MiB) exposes
+  this limit. To send messages larger than 100 MiB, raise it so it is greater than or equal to
+  `max_message_bytes`. Configuration is now validated up front: the collector reports a clear
+  error if `max_broker_write_bytes` is below the 100 MiB minimum or smaller than
+  `max_message_bytes`, rather than failing at runtime.
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]

exporter/kafkaexporter/README.md

Lines changed: 1 addition & 0 deletions
@@ -105,6 +105,7 @@ The following settings can be optionally configured:
 - `requests_per_second` is the average number of requests per seconds.
 - `producer`
 - `max_message_bytes` (default = 1000000) the maximum permitted size of a message in bytes, calculated before compression.
+- `max_broker_write_bytes` (default = 104857600) the maximum bytes the producer will write to a broker in a single request. Must be greater than or equal to `max_message_bytes`. Increase this when raising `max_message_bytes` above 100 MiB.
 - `required_acks` (default = 1) controls when a message is regarded as transmitted. <https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#acks>
 - `compression` (default = 'none') the compression used when producing messages to kafka. The options are: `none`, `gzip`, `snappy`, `lz4`, and `zstd` <https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#compression-type>
 - `compression_params`

internal/kafka/franz_client.go

Lines changed: 1 addition & 0 deletions
@@ -70,6 +70,7 @@ func NewFranzSyncProducer(
 		kgo.ProducerBatchCompression(codec),
 		kgo.ProducerLinger(cfg.Linger),
 		kgo.ProducerBatchMaxBytes(int32(cfg.MaxMessageBytes)),
+		kgo.BrokerMaxWriteBytes(int32(cfg.MaxBrokerWriteBytes)),
 		kgo.MaxBufferedRecords(cfg.FlushMaxMessages),
 	)...)
 	if err != nil {

pkg/kafka/configkafka/config.go

Lines changed: 45 additions & 1 deletion
@@ -6,6 +6,7 @@ package configkafka // import "github.com/open-telemetry/opentelemetry-collector
 import (
 	"errors"
 	"fmt"
+	"math"
 	"strings"
 	"time"

@@ -268,10 +269,26 @@ type AutoCommitConfig struct {
 	Interval time.Duration `mapstructure:"interval"`
 }

+// franzGoMinBrokerWriteBytes is franz-go's hardcoded 100 MiB floor for
+// kgo.BrokerMaxWriteBytes: values below it are rejected by franz-go at client
+// construction. It is also franz-go's default for that option, so it doubles as
+// the default for ProducerConfig.MaxBrokerWriteBytes, preserving the prior
+// (non-configurable) behavior when left unset.
+const franzGoMinBrokerWriteBytes = 100 << 20 // 104857600
+
 type ProducerConfig struct {
-	// Maximum message bytes the producer will accept to produce (default 1000000)
+	// MaxMessageBytes is the maximum message bytes the producer will accept to
+	// produce. It must be less than or equal to MaxBrokerWriteBytes, and must
+	// fit in an int32 as it maps to franz-go's kgo.ProducerBatchMaxBytes.
+	// (default 1000000)
 	MaxMessageBytes int `mapstructure:"max_message_bytes"`

+	// MaxBrokerWriteBytes is the maximum bytes the producer will write to a
+	// broker in a single request. It must be >= MaxMessageBytes. Maps to
+	// franz-go's kgo.BrokerMaxWriteBytes, whose default (and minimum accepted
+	// value) is 100 MiB. (default 104857600)
+	MaxBrokerWriteBytes int `mapstructure:"max_broker_write_bytes"`
+
 	// RequiredAcks holds the number acknowledgements required before producing
 	// returns successfully. See:
 	// https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#acks
@@ -309,6 +326,7 @@ type ProducerConfig struct {
 func NewDefaultProducerConfig() ProducerConfig {
 	return ProducerConfig{
 		MaxMessageBytes: 1000000,
+		MaxBrokerWriteBytes: franzGoMinBrokerWriteBytes,
 		RequiredAcks: WaitForLocal,
 		Compression: "none",
 		FlushMaxMessages: 10000,
@@ -335,6 +353,32 @@ func (c ProducerConfig) Validate() error {
 	if c.MaxMessageBytes < 0 {
 		return fmt.Errorf("max_message_bytes (%d) must be non-negative", c.MaxMessageBytes)
 	}
+	// Both limits are passed to franz-go as int32, so reject anything that would
+	// overflow on conversion and silently become a negative/invalid size.
+	if c.MaxMessageBytes > math.MaxInt32 {
+		return fmt.Errorf("max_message_bytes (%d) must not exceed %d", c.MaxMessageBytes, math.MaxInt32)
+	}
+	if c.MaxBrokerWriteBytes < 0 {
+		return fmt.Errorf("max_broker_write_bytes (%d) must be non-negative", c.MaxBrokerWriteBytes)
+	}
+	if c.MaxBrokerWriteBytes < franzGoMinBrokerWriteBytes {
+		return fmt.Errorf(
+			"max_broker_write_bytes (%d) must be at least %d (%d MiB, franz-go minimum)",
+			c.MaxBrokerWriteBytes,
+			franzGoMinBrokerWriteBytes,
+			franzGoMinBrokerWriteBytes>>20,
+		)
+	}
+	if c.MaxBrokerWriteBytes > math.MaxInt32 {
+		return fmt.Errorf("max_broker_write_bytes (%d) must not exceed %d", c.MaxBrokerWriteBytes, math.MaxInt32)
+	}
+	if c.MaxMessageBytes > c.MaxBrokerWriteBytes {
+		return fmt.Errorf(
+			"max_message_bytes (%d) cannot be greater than max_broker_write_bytes (%d)",
+			c.MaxMessageBytes,
+			c.MaxBrokerWriteBytes,
+		)
+	}
 	if c.FlushMaxMessages < 1 {
 		return fmt.Errorf("flush_max_messages (%d) must be at least 1", c.FlushMaxMessages)
 	}

pkg/kafka/configkafka/config.schema.yaml

Lines changed: 4 additions & 1 deletion
@@ -194,8 +194,11 @@ $defs:
   description: Linger controls the linger time for the producer. (default 10ms).
   type: string
   format: duration
+max_broker_write_bytes:
+  description: MaxBrokerWriteBytes is the maximum bytes the producer will write to a broker in a single request. It must be >= MaxMessageBytes. Maps to franz-go's kgo.BrokerMaxWriteBytes, whose default (and minimum accepted value) is 100 MiB. (default 104857600)
+  type: integer
 max_message_bytes:
-  description: Maximum message bytes the producer will accept to produce (default 1000000)
+  description: MaxMessageBytes is the maximum message bytes the producer will accept to produce. It must be less than or equal to MaxBrokerWriteBytes, and must fit in an int32 as it maps to franz-go's kgo.ProducerBatchMaxBytes. (default 1000000)
   type: integer
 required_acks:
   description: 'RequiredAcks holds the number acknowledgements required before producing returns successfully. See: https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#acks Acceptable values are: 0 (NoResponse) Does not wait for any acknowledgements. 1 (WaitForLocal) Waits for only the leader to write the record to its local log, but does not wait for followers to acknowledge. (default) -1 (WaitForAll) Waits for all in-sync replicas to acknowledge. In YAML configuration, "all" is accepted as an alias for -1.'

pkg/kafka/configkafka/config_test.go

Lines changed: 46 additions & 0 deletions
@@ -4,7 +4,10 @@
 package configkafka // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/configkafka"

 import (
+	"fmt"
+	"math"
 	"path/filepath"
+	"strconv"
 	"testing"
 	"time"

@@ -296,6 +299,14 @@ func TestProducerConfig(t *testing.T) {
 				return cfg
 			}(),
 		},
+		"large_message": {
+			expected: func() ProducerConfig {
+				cfg := NewDefaultProducerConfig()
+				cfg.MaxMessageBytes = 209715200
+				cfg.MaxBrokerWriteBytes = 268435456
+				return cfg
+			}(),
+		},

 		// Invalid configurations
 		"invalid_compression": {
@@ -313,6 +324,41 @@ func TestProducerConfig(t *testing.T) {
 		"max_message_bytes_negative": {
 			expectedErr: "max_message_bytes (-1000) must be non-negative",
 		},
+		"max_broker_write_bytes_negative": {
+			expectedErr: "max_broker_write_bytes (-1000) must be non-negative",
+		},
+		"max_broker_write_bytes_too_small": {
+			expectedErr: fmt.Sprintf("max_broker_write_bytes (1000) must be at least %d (%d MiB, franz-go minimum)", franzGoMinBrokerWriteBytes, franzGoMinBrokerWriteBytes>>20),
+		},
+		"max_message_bytes_exceeds_broker": {
+			expectedErr: fmt.Sprintf("max_message_bytes (209715200) cannot be greater than max_broker_write_bytes (%d)", franzGoMinBrokerWriteBytes),
+		},
+	})
+}
+
+// TestProducerConfigInt32Overflow verifies that size limits exceeding the int32
+// range used by franz-go are rejected. Values above math.MaxInt32 are only
+// representable on platforms where int is 64-bit, so this is skipped elsewhere.
+func TestProducerConfigInt32Overflow(t *testing.T) {
+	if strconv.IntSize < 64 {
+		t.Skip("int is not wide enough to exceed math.MaxInt32 on this architecture")
+	}
+	// Use a runtime int64 value so the conversion to int is not a compile-time
+	// constant (which would overflow int and fail to build on 32-bit platforms).
+	maxInt32 := int64(math.MaxInt32)
+	overflow := int(maxInt32 + 1)
+
+	t.Run("max_message_bytes", func(t *testing.T) {
+		cfg := NewDefaultProducerConfig()
+		cfg.MaxMessageBytes = overflow
+		require.EqualError(t, cfg.Validate(),
+			fmt.Sprintf("max_message_bytes (%d) must not exceed %d", overflow, math.MaxInt32))
+	})
+	t.Run("max_broker_write_bytes", func(t *testing.T) {
+		cfg := NewDefaultProducerConfig()
+		cfg.MaxBrokerWriteBytes = overflow
+		require.EqualError(t, cfg.Validate(),
+			fmt.Sprintf("max_broker_write_bytes (%d) must not exceed %d", overflow, math.MaxInt32))
 	})
 }

pkg/kafka/configkafka/testdata/producer_config.yaml

Lines changed: 9 additions & 0 deletions
@@ -25,6 +25,9 @@ kafka/required_acks_all:
   required_acks: all
 kafka/disable_auto_topic_creation:
   allow_auto_topic_creation: false
+kafka/large_message:
+  max_message_bytes: 209715200
+  max_broker_write_bytes: 268435456

 # Invalid configurations
 kafka/invalid_compression:
@@ -37,6 +40,12 @@ kafka/flush_max_messages_negative:
   flush_max_messages: -1
 kafka/max_message_bytes_negative:
   max_message_bytes: -1000
+kafka/max_broker_write_bytes_too_small:
+  max_broker_write_bytes: 1000
+kafka/max_message_bytes_exceeds_broker:
+  max_message_bytes: 209715200
+kafka/max_broker_write_bytes_negative:
+  max_broker_write_bytes: -1000

 kafka/producer_linger:
   linger: 100ms
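
The byte values in the test data are easier to read as MiB multiples. The short sketch below reproduces the arithmetic; `mibToBytes` is a hypothetical helper introduced only for this illustration.

```go
package main

import "fmt"

// mibToBytes converts a size in MiB to bytes (1 MiB = 1 << 20 bytes).
func mibToBytes(mib int64) int64 {
	return mib << 20
}

func main() {
	fmt.Println(mibToBytes(100)) // 104857600: default and minimum max_broker_write_bytes
	fmt.Println(mibToBytes(200)) // 209715200: max_message_bytes in the large_message case
	fmt.Println(mibToBytes(256)) // 268435456: max_broker_write_bytes in the large_message case
}
```

Read this way, the `large_message` case pairs a 200 MiB message limit with a 256 MiB write limit, while `max_message_bytes_exceeds_broker` leaves the write limit at its 100 MiB default.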
