From dbb3989692e165faab29a1d3f3d9901981c27038 Mon Sep 17 00:00:00 2001 From: theohsiung Date: Thu, 11 Jun 2026 00:40:26 +0800 Subject: [PATCH] [Router] preserve reasoning_content when caching streaming responses MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The streaming accumulator only captured delta.content; the reconstructed chat.completion written to the semantic cache therefore dropped the reasoning that reasoning models stream under delta.reasoning_content. A later cache hit for a semantically similar request then returned a response WITHOUT the reasoning the original live stream delivered — a silent fidelity loss (the non-streaming cache preserves it because it stores the raw upstream body). Accumulate delta.reasoning_content into ctx.StreamingReasoning and include it as message.reasoning_content in the reconstructed response when present. reasoning_content is already a recognized field elsewhere (looper extraction, memory, anthropic outbound). The message.reasoning_content field is omitted from the reconstructed response when no reasoning was streamed. 
Signed-off-by: theohsiung --- .../extproc/processor_res_body_streaming.go | 6 ++ .../pkg/extproc/processor_res_cache.go | 5 + .../processor_res_cache_reasoning_test.go | 102 ++++++++++++++++++ .../pkg/extproc/request_context.go | 1 + 4 files changed, 114 insertions(+) create mode 100644 src/semantic-router/pkg/extproc/processor_res_cache_reasoning_test.go diff --git a/src/semantic-router/pkg/extproc/processor_res_body_streaming.go b/src/semantic-router/pkg/extproc/processor_res_body_streaming.go index f1d78f4eaa..0e2b118a95 100644 --- a/src/semantic-router/pkg/extproc/processor_res_body_streaming.go +++ b/src/semantic-router/pkg/extproc/processor_res_body_streaming.go @@ -156,6 +156,12 @@ func extractStreamingContent(ctx *RequestContext, chunkData map[string]interface if content, ok := delta["content"].(string); ok && content != "" { ctx.StreamingContent += content } + // Reasoning models stream their thinking under delta.reasoning_content. + // Accumulate it so the reconstructed (cached) response carries the same + // reasoning the live stream delivered, instead of silently dropping it. + if reasoning, ok := delta["reasoning_content"].(string); ok && reasoning != "" { + ctx.StreamingReasoning += reasoning + } } if finishReason, ok := choice["finish_reason"].(string); ok && finishReason != "" { ctx.StreamingMetadata["finish_reason"] = finishReason diff --git a/src/semantic-router/pkg/extproc/processor_res_cache.go b/src/semantic-router/pkg/extproc/processor_res_cache.go index 635b561f6c..559d83f5a7 100644 --- a/src/semantic-router/pkg/extproc/processor_res_cache.go +++ b/src/semantic-router/pkg/extproc/processor_res_cache.go @@ -162,6 +162,11 @@ func buildReconstructedStreamingResponse( } else { message["content"] = nil } + // Preserve reasoning so a cache hit returns the same reasoning_content the + // live stream delivered (reasoning models stream it under delta.reasoning_content). 
+ if ctx.StreamingReasoning != "" { + message["reasoning_content"] = ctx.StreamingReasoning + } toolCalls := buildStreamingResponseToolCalls(ctx) if includeToolCalls && len(toolCalls) > 0 { diff --git a/src/semantic-router/pkg/extproc/processor_res_cache_reasoning_test.go b/src/semantic-router/pkg/extproc/processor_res_cache_reasoning_test.go new file mode 100644 index 0000000000..b2ad8d3dcd --- /dev/null +++ b/src/semantic-router/pkg/extproc/processor_res_cache_reasoning_test.go @@ -0,0 +1,102 @@ +package extproc + +import ( + "encoding/json" + "testing" + + "github.com/openai/openai-go" +) + +// A reasoning model streams its thinking under delta.reasoning_content. The +// streaming accumulator must capture it, and the reconstructed response that is +// cached must carry it — otherwise a later cache hit returns a response missing +// the reasoning the original live stream delivered. + +func TestExtractStreamingContentAccumulatesReasoning(t *testing.T) { + ctx := &RequestContext{StreamingMetadata: map[string]interface{}{}} + + chunks := []map[string]interface{}{ + {"choices": []interface{}{map[string]interface{}{ + "delta": map[string]interface{}{"reasoning_content": "Let me "}, + }}}, + {"choices": []interface{}{map[string]interface{}{ + "delta": map[string]interface{}{"reasoning_content": "think. "}, + }}}, + {"choices": []interface{}{map[string]interface{}{ + "delta": map[string]interface{}{"content": "Answer."}, + }}}, + } + for _, c := range chunks { + extractStreamingContent(ctx, c) + } + + if ctx.StreamingReasoning != "Let me think. " { + t.Fatalf("reasoning not accumulated: %q", ctx.StreamingReasoning) + } + if ctx.StreamingContent != "Answer." 
{ + t.Fatalf("content not accumulated: %q", ctx.StreamingContent) + } +} + +func TestReconstructedStreamingResponseIncludesReasoning(t *testing.T) { + ctx := &RequestContext{ + StreamingContent: "The answer is 4.", + StreamingReasoning: "2+2 equals 4.", + StreamingMetadata: map[string]interface{}{ + "id": "chatcmpl-reasoning", + "model": "qwen3", + "created": int64(1), + }, + } + + body, err := buildReconstructedStreamingResponse(ctx, openai.CompletionUsage{}, false) + if err != nil { + t.Fatalf("reconstruct error: %v", err) + } + + var parsed struct { + Choices []struct { + Message struct { + Content string `json:"content"` + ReasoningContent string `json:"reasoning_content"` + } `json:"message"` + } `json:"choices"` + } + if err := json.Unmarshal(body, &parsed); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if len(parsed.Choices) != 1 { + t.Fatalf("expected 1 choice, got %d", len(parsed.Choices)) + } + if parsed.Choices[0].Message.ReasoningContent != "2+2 equals 4." { + t.Fatalf("reconstructed response dropped reasoning_content: %q", parsed.Choices[0].Message.ReasoningContent) + } + if parsed.Choices[0].Message.Content != "The answer is 4." { + t.Fatalf("content mismatch: %q", parsed.Choices[0].Message.Content) + } +} + +// A response with no reasoning must not gain an empty reasoning_content field. 
+func TestReconstructedStreamingResponseOmitsEmptyReasoning(t *testing.T) { + ctx := &RequestContext{ + StreamingContent: "hello", + StreamingMetadata: map[string]interface{}{ + "id": "chatcmpl-plain", + "model": "qwen3", + "created": int64(1), + }, + } + body, err := buildReconstructedStreamingResponse(ctx, openai.CompletionUsage{}, false) + if err != nil { + t.Fatalf("reconstruct error: %v", err) + } + var raw map[string]interface{} + if err := json.Unmarshal(body, &raw); err != nil { + t.Fatalf("unmarshal: %v", err) + } + choice := raw["choices"].([]interface{})[0].(map[string]interface{}) + msg := choice["message"].(map[string]interface{}) + if _, present := msg["reasoning_content"]; present { + t.Fatal("reasoning_content must be absent when no reasoning was streamed") + } +} diff --git a/src/semantic-router/pkg/extproc/request_context.go b/src/semantic-router/pkg/extproc/request_context.go index d2bf9e1d0d..4dc53bfe87 100644 --- a/src/semantic-router/pkg/extproc/request_context.go +++ b/src/semantic-router/pkg/extproc/request_context.go @@ -74,6 +74,7 @@ type RequestContext struct { // Streaming accumulation for caching HasStreamingChunks bool // True when at least one SSE chunk has been received StreamingContent string // Accumulated content from delta.content + StreamingReasoning string // Accumulated reasoning from delta.reasoning_content StreamingMetadata map[string]interface{} // id, model, created from first chunk StreamingToolCalls map[int]*StreamingToolCallState // Accumulated delta.tool_calls keyed by tool index StreamingComplete bool // True when [DONE] marker received