Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,12 @@ func extractStreamingContent(ctx *RequestContext, chunkData map[string]interface
if content, ok := delta["content"].(string); ok && content != "" {
ctx.StreamingContent += content
}
// Reasoning models stream their thinking under delta.reasoning_content.
// Accumulate it so the reconstructed (cached) response carries the same
// reasoning the live stream delivered, instead of silently dropping it.
if reasoning, ok := delta["reasoning_content"].(string); ok && reasoning != "" {
ctx.StreamingReasoning += reasoning
}
}
if finishReason, ok := choice["finish_reason"].(string); ok && finishReason != "" {
ctx.StreamingMetadata["finish_reason"] = finishReason
Expand Down
5 changes: 5 additions & 0 deletions src/semantic-router/pkg/extproc/processor_res_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ func buildReconstructedStreamingResponse(
} else {
message["content"] = nil
}
// Preserve reasoning so a cache hit returns the same reasoning_content the
// live stream delivered (reasoning models stream it under delta.reasoning_content).
if ctx.StreamingReasoning != "" {
message["reasoning_content"] = ctx.StreamingReasoning
}

toolCalls := buildStreamingResponseToolCalls(ctx)
if includeToolCalls && len(toolCalls) > 0 {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package extproc

import (
"encoding/json"
"testing"

"github.com/openai/openai-go"
)

// A reasoning model streams its thinking under delta.reasoning_content. The
// streaming accumulator must capture it, and the reconstructed response that is
// cached must carry it — otherwise a later cache hit returns a response missing
// the reasoning the original live stream delivered.

// TestExtractStreamingContentAccumulatesReasoning pushes three streaming chunks
// through extractStreamingContent: two whose delta carries reasoning_content and
// one whose delta carries content. It then checks that the reasoning fragments
// were concatenated into StreamingReasoning and the answer text into
// StreamingContent.
func TestExtractStreamingContentAccumulatesReasoning(t *testing.T) {
	// deltaChunk wraps a single delta key/value pair in the
	// {"choices": [{"delta": {...}}]} envelope the accumulator is fed.
	deltaChunk := func(key, text string) map[string]interface{} {
		return map[string]interface{}{
			"choices": []interface{}{
				map[string]interface{}{
					"delta": map[string]interface{}{key: text},
				},
			},
		}
	}

	reqCtx := &RequestContext{StreamingMetadata: map[string]interface{}{}}

	stream := []map[string]interface{}{
		deltaChunk("reasoning_content", "Let me "),
		deltaChunk("reasoning_content", "think. "),
		deltaChunk("content", "Answer."),
	}
	for i := range stream {
		extractStreamingContent(reqCtx, stream[i])
	}

	if got := reqCtx.StreamingReasoning; got != "Let me think. " {
		t.Fatalf("reasoning not accumulated: %q", got)
	}
	if got := reqCtx.StreamingContent; got != "Answer." {
		t.Fatalf("content not accumulated: %q", got)
	}
}

// TestReconstructedStreamingResponseIncludesReasoning builds a reconstructed
// response from a context that holds both accumulated content and accumulated
// reasoning, decodes the resulting JSON, and checks that the single choice's
// message carries both values unchanged.
func TestReconstructedStreamingResponseIncludesReasoning(t *testing.T) {
	const (
		wantContent   = "The answer is 4."
		wantReasoning = "2+2 equals 4."
	)

	reqCtx := &RequestContext{
		StreamingContent:   wantContent,
		StreamingReasoning: wantReasoning,
		StreamingMetadata: map[string]interface{}{
			"id":      "chatcmpl-reasoning",
			"model":   "qwen3",
			"created": int64(1),
		},
	}

	payload, err := buildReconstructedStreamingResponse(reqCtx, openai.CompletionUsage{}, false)
	if err != nil {
		t.Fatalf("reconstruct error: %v", err)
	}

	// Decode only the fields under test; everything else in the body is ignored.
	type messageFields struct {
		Content          string `json:"content"`
		ReasoningContent string `json:"reasoning_content"`
	}
	type choiceFields struct {
		Message messageFields `json:"message"`
	}
	var decoded struct {
		Choices []choiceFields `json:"choices"`
	}
	if err := json.Unmarshal(payload, &decoded); err != nil {
		t.Fatalf("unmarshal: %v", err)
	}

	if n := len(decoded.Choices); n != 1 {
		t.Fatalf("expected 1 choice, got %d", n)
	}

	msg := decoded.Choices[0].Message
	if msg.ReasoningContent != wantReasoning {
		t.Fatalf("reconstructed response dropped reasoning_content: %q", msg.ReasoningContent)
	}
	if msg.Content != wantContent {
		t.Fatalf("content mismatch: %q", msg.Content)
	}
}

// TestReconstructedStreamingResponseOmitsEmptyReasoning checks that a response
// with no reasoning does not gain a reasoning_content field: when the context
// has no accumulated reasoning, the key must be absent from the reconstructed
// message, not merely empty.
//
// The body is decoded into a generic map so that key presence can be observed
// directly. Every step down to the message uses a checked type assertion, so a
// malformed body fails the test with a diagnostic instead of panicking.
func TestReconstructedStreamingResponseOmitsEmptyReasoning(t *testing.T) {
	ctx := &RequestContext{
		StreamingContent: "hello",
		StreamingMetadata: map[string]interface{}{
			"id":      "chatcmpl-plain",
			"model":   "qwen3",
			"created": int64(1),
		},
	}
	body, err := buildReconstructedStreamingResponse(ctx, openai.CompletionUsage{}, false)
	if err != nil {
		t.Fatalf("reconstruct error: %v", err)
	}
	var raw map[string]interface{}
	if err := json.Unmarshal(body, &raw); err != nil {
		t.Fatalf("unmarshal: %v", err)
	}

	choices, ok := raw["choices"].([]interface{})
	if !ok {
		t.Fatalf("choices is not an array: %T", raw["choices"])
	}
	if len(choices) == 0 {
		t.Fatal("expected at least 1 choice, got 0")
	}
	choice, ok := choices[0].(map[string]interface{})
	if !ok {
		t.Fatalf("choices[0] is not an object: %T", choices[0])
	}
	msg, ok := choice["message"].(map[string]interface{})
	if !ok {
		t.Fatalf("message is not an object: %T", choice["message"])
	}

	if _, present := msg["reasoning_content"]; present {
		t.Fatal("reasoning_content must be absent when no reasoning was streamed")
	}
}
1 change: 1 addition & 0 deletions src/semantic-router/pkg/extproc/request_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type RequestContext struct {
// Streaming accumulation for caching
HasStreamingChunks bool // True when at least one SSE chunk has been received
StreamingContent string // Accumulated content from delta.content
StreamingReasoning string // Accumulated reasoning from delta.reasoning_content
StreamingMetadata map[string]interface{} // id, model, created from first chunk
StreamingToolCalls map[int]*StreamingToolCallState // Accumulated delta.tool_calls keyed by tool index
StreamingComplete bool // True when [DONE] marker received
Expand Down
Loading