Skip to content

Commit d119753

Browse files
[pkg/stanza] Fix container parser operator logging errors at ERROR level when on_error is set to quiet mode (#45704)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Fix container parser operator logging errors at ERROR level when `on_error` is set to quiet mode (`send_quiet` or `drop_quiet`) <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Relates to #42646 Assisted-by: Claude Opus 4.7 --------- Signed-off-by: Paulo Dias <paulodias.gm@gmail.com> Co-authored-by: Andrzej Stencel <andrzej.stencel@elastic.co>
1 parent 2a74f66 commit d119753

9 files changed

Lines changed: 614 additions & 75 deletions

File tree

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: 'bug_fix'
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
7+
component: 'pkg/stanza'
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: 'Fix stanza container operator logging errors at ERROR level when `on_error` is set to a quiet mode'
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [42646]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
Previously, the `container` operator logged entry-level processing errors at
20+
ERROR level even when `on_error` was set to `drop_quiet` or `send_quiet`.
21+
These errors are now logged at DEBUG level in quiet modes, matching the
22+
documented behavior. Downstream delivery failures continue to propagate so
23+
the pipeline can react to them.
24+
25+
# If your change doesn't affect end users or the exported elements of any package,
26+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
27+
# Optional: The change log or logs in which this entry should be included.
28+
# e.g. '[user]' or '[user, api]'
29+
# Include 'user' if the change is relevant to end users.
30+
# Include 'api' if there is a change to a library API.
31+
# Default: '[user]'
32+
change_logs: [user]

pkg/stanza/operator/helper/parser.go

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@ import (
1414
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/stanzaerrors"
1515
)
1616

17+
// ErrEntryHandled signals that ParseWith already handled the entry
18+
// (logged and optionally written downstream) and the caller must not
19+
// write it again or propagate an error.
20+
var ErrEntryHandled = errors.New("entry handled by parser in quiet mode")
21+
1722
// NewParserConfig creates a new parser config with default values
1823
func NewParserConfig(operatorID, operatorType string) ParserConfig {
1924
return ParserConfig{
@@ -117,7 +122,7 @@ func (p *ParserOperator) ProcessBatchWithCallback(ctx context.Context, entries [
117122
}
118123

119124
if err = p.ParseWith(ctx, ent, parse, write); err != nil {
120-
if p.OnError != DropOnErrorQuiet && p.OnError != SendOnErrorQuiet {
125+
if !errors.Is(err, ErrEntryHandled) {
121126
errs = append(errs, err)
122127
}
123128
continue
@@ -153,15 +158,13 @@ func (p *ParserOperator) ProcessWithCallback(ctx context.Context, entry *entry.E
153158
}
154159

155160
if err = p.ParseWith(ctx, entry, parse, p.Write); err != nil {
156-
if p.OnError == DropOnErrorQuiet || p.OnError == SendOnErrorQuiet {
161+
if errors.Is(err, ErrEntryHandled) {
157162
return nil
158163
}
159-
160164
return err
161165
}
162166
if cb != nil {
163-
err = cb(entry)
164-
if err != nil {
167+
if err = cb(entry); err != nil {
165168
return p.HandleEntryError(ctx, entry, err)
166169
}
167170
}
@@ -170,24 +173,35 @@ func (p *ParserOperator) ProcessWithCallback(ctx context.Context, entry *entry.E
170173
}
171174

172175
// ParseWith will process an entry's field with a parser function.
176+
// In quiet on_error modes any entry-level error is handled internally and
177+
// ErrEntryHandled is returned so callers do not write or propagate again.
173178
func (p *ParserOperator) ParseWith(ctx context.Context, entry *entry.Entry, parse ParseFunction, write WriteFunction) error {
179+
// handle translates a nil-in-quiet-mode return from HandleEntryErrorWithWrite
180+
// into ErrEntryHandled.
181+
handle := func(err error) error {
182+
handled := p.HandleEntryErrorWithWrite(ctx, entry, err, write)
183+
if handled == nil && p.isQuietMode() {
184+
return ErrEntryHandled
185+
}
186+
return handled
187+
}
188+
174189
value, ok := entry.Get(p.ParseFrom)
175190
if !ok {
176-
err := stanzaerrors.NewError(
191+
return handle(stanzaerrors.NewError(
177192
"Entry is missing the expected parse_from field.",
178193
"Ensure that all incoming entries contain the parse_from field.",
179194
"parse_from", p.ParseFrom.String(),
180-
)
181-
return p.HandleEntryErrorWithWrite(ctx, entry, err, write)
195+
))
182196
}
183197

184198
newValue, err := parse(value)
185199
if err != nil {
186-
return p.HandleEntryErrorWithWrite(ctx, entry, err, write)
200+
return handle(err)
187201
}
188202

189203
if err := entry.Set(p.ParseTo, newValue); err != nil {
190-
return p.HandleEntryErrorWithWrite(ctx, entry, fmt.Errorf("set parse_to: %w", err), write)
204+
return handle(fmt.Errorf("set parse_to: %w", err))
191205
}
192206

193207
if p.BodyField != nil {
@@ -218,16 +232,16 @@ func (p *ParserOperator) ParseWith(ctx context.Context, entry *entry.Entry, pars
218232

219233
// Handle parsing errors after attempting to parse all
220234
if timeParseErr != nil {
221-
return p.HandleEntryErrorWithWrite(ctx, entry, fmt.Errorf("time parser: %w", timeParseErr), write)
235+
return handle(fmt.Errorf("time parser: %w", timeParseErr))
222236
}
223237
if severityParseErr != nil {
224-
return p.HandleEntryErrorWithWrite(ctx, entry, fmt.Errorf("severity parser: %w", severityParseErr), write)
238+
return handle(fmt.Errorf("severity parser: %w", severityParseErr))
225239
}
226240
if traceParseErr != nil {
227-
return p.HandleEntryErrorWithWrite(ctx, entry, fmt.Errorf("trace parser: %w", traceParseErr), write)
241+
return handle(fmt.Errorf("trace parser: %w", traceParseErr))
228242
}
229243
if scopeNameParserErr != nil {
230-
return p.HandleEntryErrorWithWrite(ctx, entry, fmt.Errorf("scope_name parser: %w", scopeNameParserErr), write)
244+
return handle(fmt.Errorf("scope_name parser: %w", scopeNameParserErr))
231245
}
232246
return nil
233247
}

pkg/stanza/operator/helper/parser_test.go

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -703,6 +703,147 @@ func NewTestParserConfig() ParserConfig {
703703
return expect
704704
}
705705

706+
func TestProcessWithCallback_QuietMode(t *testing.T) {
707+
// Quiet mode swallows processing errors, but send_quiet still surfaces
708+
// downstream write failures so the pipeline can react to delivery errors.
709+
// drop_quiet never calls write so its parse failure is fully suppressed.
710+
testCases := []struct {
711+
name string
712+
onError string
713+
expectWriteWrap bool
714+
}{
715+
{
716+
name: "SendOnErrorQuiet_WriteFailure_PropagatesWriteError",
717+
onError: SendOnErrorQuiet,
718+
expectWriteWrap: true,
719+
},
720+
{
721+
name: "DropOnErrorQuiet_ParseFailure_Suppressed",
722+
onError: DropOnErrorQuiet,
723+
},
724+
}
725+
726+
for _, tc := range testCases {
727+
t.Run(tc.name, func(t *testing.T) {
728+
fakeOut := testutil.NewFakeOutputWithProcessError(t)
729+
set := componenttest.NewNopTelemetrySettings()
730+
set.Logger = zaptest.NewLogger(t)
731+
writer := &WriterOperator{
732+
BasicOperator: BasicOperator{
733+
OperatorID: "test-id",
734+
OperatorType: "test-type",
735+
set: set,
736+
},
737+
OutputIDs: []string{fakeOut.ID()},
738+
}
739+
require.NoError(t, writer.SetOutputs([]operator.Operator{fakeOut}))
740+
741+
parser := ParserOperator{
742+
TransformerOperator: TransformerOperator{
743+
WriterOperator: *writer,
744+
OnError: tc.onError,
745+
},
746+
ParseFrom: entry.NewBodyField(),
747+
ParseTo: entry.NewAttributeField(),
748+
}
749+
750+
parse := func(_ any) (any, error) {
751+
return nil, errors.New("parse failure")
752+
}
753+
754+
ctx := t.Context()
755+
testEntry := entry.New()
756+
err := parser.ProcessWithCallback(ctx, testEntry, parse, nil)
757+
if tc.expectWriteWrap {
758+
require.Error(t, err)
759+
require.Contains(t, err.Error(), "failed to send entry after error")
760+
} else {
761+
require.NoError(t, err, "quiet mode must not propagate the processing error")
762+
}
763+
})
764+
}
765+
}
766+
767+
// TestProcessWithCallback_QuietMode_SkipAndCallbackPaths exercises the four
768+
// code paths in helper/parser.go that previously did not honor quiet mode:
769+
// - ProcessWithCallback Skip error
770+
// - ProcessWithCallback callback error
771+
// - ProcessBatchWithCallback Skip error
772+
// - ProcessBatchWithCallback callback error
773+
//
774+
// All four must be suppressed when the operator is configured in quiet mode.
775+
func TestProcessWithCallback_QuietMode_SkipAndCallbackPaths(t *testing.T) {
776+
parse := func(v any) (any, error) { return v, nil }
777+
okCb := func(_ *entry.Entry) error { return nil }
778+
failingCb := func(_ *entry.Entry) error { return errors.New("callback failure") }
779+
780+
// An "if" expression that always errors during evaluation (because the
781+
// attribute is missing and types do not match), driving the Skip error
782+
// path.
783+
badIfExpr, err := ExprCompileBool(`attributes["missing"] + 1 == 2`)
784+
require.NoError(t, err)
785+
786+
for _, onError := range []string{DropOnErrorQuiet, SendOnErrorQuiet} {
787+
t.Run("ProcessWithCallback_Skip_"+onError, func(t *testing.T) {
788+
writer, _ := writerWithFakeOut(t)
789+
parser := ParserOperator{
790+
TransformerOperator: TransformerOperator{
791+
WriterOperator: *writer,
792+
OnError: onError,
793+
IfExpr: badIfExpr,
794+
},
795+
ParseFrom: entry.NewBodyField(),
796+
ParseTo: entry.NewAttributeField(),
797+
}
798+
require.NoError(t, parser.ProcessWithCallback(t.Context(), entry.New(), parse, okCb),
799+
"Skip error must be suppressed in quiet mode")
800+
})
801+
802+
t.Run("ProcessWithCallback_Callback_"+onError, func(t *testing.T) {
803+
writer, _ := writerWithFakeOut(t)
804+
parser := ParserOperator{
805+
TransformerOperator: TransformerOperator{
806+
WriterOperator: *writer,
807+
OnError: onError,
808+
},
809+
ParseFrom: entry.NewBodyField(),
810+
ParseTo: entry.NewAttributeField(),
811+
}
812+
require.NoError(t, parser.ProcessWithCallback(t.Context(), entry.New(), parse, failingCb),
813+
"callback error must be suppressed in quiet mode")
814+
})
815+
816+
t.Run("ProcessBatchWithCallback_Skip_"+onError, func(t *testing.T) {
817+
writer, _ := writerWithFakeOut(t)
818+
parser := ParserOperator{
819+
TransformerOperator: TransformerOperator{
820+
WriterOperator: *writer,
821+
OnError: onError,
822+
IfExpr: badIfExpr,
823+
},
824+
ParseFrom: entry.NewBodyField(),
825+
ParseTo: entry.NewAttributeField(),
826+
}
827+
require.NoError(t, parser.ProcessBatchWithCallback(t.Context(), []*entry.Entry{entry.New()}, parse, okCb),
828+
"batch Skip error must be suppressed in quiet mode")
829+
})
830+
831+
t.Run("ProcessBatchWithCallback_Callback_"+onError, func(t *testing.T) {
832+
writer, _ := writerWithFakeOut(t)
833+
parser := ParserOperator{
834+
TransformerOperator: TransformerOperator{
835+
WriterOperator: *writer,
836+
OnError: onError,
837+
},
838+
ParseFrom: entry.NewBodyField(),
839+
ParseTo: entry.NewAttributeField(),
840+
}
841+
require.NoError(t, parser.ProcessBatchWithCallback(t.Context(), []*entry.Entry{entry.New()}, parse, failingCb),
842+
"batch callback error must be suppressed in quiet mode")
843+
})
844+
}
845+
}
846+
706847
func writerWithFakeOut(t *testing.T) (*WriterOperator, *testutil.FakeOutput) {
707848
fakeOut := testutil.NewFakeOutput(t)
708849
set := componenttest.NewNopTelemetrySettings()

pkg/stanza/operator/helper/transformer.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -106,12 +106,7 @@ func (t *TransformerOperator) ProcessBatchWithTransform(ctx context.Context, ent
106106
}
107107

108108
if err = transform(ent); err != nil {
109-
if handleErr := t.HandleEntryErrorWithWrite(ctx, ent, err, write); handleErr != nil {
110-
// Only append error if not in quiet mode
111-
if !t.isQuietMode() {
112-
errs = append(errs, handleErr)
113-
}
114-
}
109+
errs = append(errs, t.HandleEntryErrorWithWrite(ctx, ent, err, write))
115110
continue
116111
}
117112

@@ -135,21 +130,22 @@ func (t *TransformerOperator) ProcessWith(ctx context.Context, entry *entry.Entr
135130
}
136131

137132
if err := transform(entry); err != nil {
138-
handleErr := t.HandleEntryError(ctx, entry, err)
139-
// Return nil for quiet modes to prevent error from bubbling up
140-
if t.isQuietMode() {
141-
return nil
142-
}
143-
return handleErr
133+
return t.HandleEntryError(ctx, entry, err)
144134
}
145135
return t.Write(ctx, entry)
146136
}
147137

148-
// HandleEntryError will handle an entry error using the on_error strategy.
138+
// HandleEntryError handles an entry error using the on_error strategy.
139+
// In quiet modes (drop_quiet, send_quiet) the processing error is swallowed,
140+
// but a downstream write error from send_quiet is still returned so the
141+
// pipeline can react to delivery failures.
149142
func (t *TransformerOperator) HandleEntryError(ctx context.Context, entry *entry.Entry, err error) error {
150143
return t.HandleEntryErrorWithWrite(ctx, entry, err, t.Write)
151144
}
152145

146+
// HandleEntryErrorWithWrite is like HandleEntryError but uses the supplied write function.
147+
// In quiet modes the processing error is swallowed; a downstream write error
148+
// from send_quiet is still returned.
153149
func (t *TransformerOperator) HandleEntryErrorWithWrite(ctx context.Context, entry *entry.Entry, err error, write WriteFunction) error {
154150
if entry == nil {
155151
return errors.New("got a nil entry, this should not happen and is potentially a bug")
@@ -163,12 +159,16 @@ func (t *TransformerOperator) HandleEntryErrorWithWrite(ctx context.Context, ent
163159
} else {
164160
t.Logger().Error("Failed to process entry", zapAttributes(entry, t.OnError, err)...)
165161
}
162+
166163
if t.OnError == SendOnError || t.OnError == SendOnErrorQuiet {
167164
if writeErr := write(ctx, entry); writeErr != nil {
168-
err = fmt.Errorf("failed to send entry after error: %w", writeErr)
165+
return fmt.Errorf("failed to send entry after error: %w", writeErr)
169166
}
170167
}
171168

169+
if t.isQuietMode() {
170+
return nil
171+
}
172172
return err
173173
}
174174

0 commit comments

Comments
 (0)