Streaming¶
Track streaming LLM responses with automatic chunk accumulation.
Starting a Streaming Span¶
// Start a streaming span
ctx, streamSpan, _ := opik.StartStreamingSpan(ctx, "stream-response",
    opik.WithSpanType(opik.SpanTypeLLM),
    opik.WithSpanModel("gpt-4"),
)
Adding Chunks¶
As chunks arrive from your LLM, add them to the span:
for chunk := range streamChannel {
    // Add chunk content
    streamSpan.AddChunk(chunk.Content)
}
With Chunk Options¶
// Add chunk with token count
streamSpan.AddChunk(chunk.Content,
    opik.WithChunkTokenCount(chunk.TokenCount),
)

// Mark final chunk with finish reason
streamSpan.AddChunk(lastChunk.Content,
    opik.WithChunkTokenCount(lastChunk.TokenCount),
    opik.WithChunkFinishReason("stop"),
)
Ending the Streaming Span¶
// End span - accumulated content is automatically captured
streamSpan.End(ctx)
Complete Example¶
func streamCompletion(ctx context.Context, prompt string) (string, error) {
    // Start streaming span
    ctx, streamSpan, err := opik.StartStreamingSpan(ctx, "openai-stream",
        opik.WithSpanType(opik.SpanTypeLLM),
        opik.WithSpanModel("gpt-4"),
        opik.WithSpanProvider("openai"),
        opik.WithSpanInput(map[string]any{"prompt": prompt}),
    )
    if err != nil {
        return "", err
    }

    // Create streaming request to OpenAI
    stream, err := openaiClient.CreateChatCompletionStream(ctx, openai.ChatCompletionRequest{
        Model:    "gpt-4",
        Messages: []openai.ChatCompletionMessage{{Role: "user", Content: prompt}},
        Stream:   true,
    })
    if err != nil {
        streamSpan.End(ctx)
        return "", err
    }
    defer stream.Close()

    // Collect chunks
    var fullResponse strings.Builder
    for {
        response, err := stream.Recv()
        if err == io.EOF {
            // Mark final chunk
            streamSpan.AddChunk("", opik.WithChunkFinishReason("stop"))
            break
        }
        if err != nil {
            streamSpan.End(ctx)
            return "", err
        }

        content := response.Choices[0].Delta.Content
        fullResponse.WriteString(content)

        // Add chunk to span
        streamSpan.AddChunk(content)

        // Optionally output to user in real-time
        fmt.Print(content)
    }

    // End span with accumulated output
    streamSpan.End(ctx)
    return fullResponse.String(), nil
}
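For context, a minimal caller for the function above might look like this. The prompt and output handling are illustrative, and it assumes only the standard library log and fmt packages:

func main() {
    ctx := context.Background()

    response, err := streamCompletion(ctx, "Summarize the benefits of streaming.")
    if err != nil {
        log.Fatalf("streaming completion failed: %v", err)
    }

    fmt.Printf("\n\nReceived %d bytes of response\n", len(response))
}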
Chunk Options¶
| Option | Description |
|---|---|
| `WithChunkTokenCount(n)` | Number of tokens in this chunk |
| `WithChunkFinishReason(reason)` | Finish reason (e.g., `"stop"`, `"length"`) |
Stream Accumulator¶
The streaming span automatically accumulates the following (a conceptual sketch follows the list):
- Content: All chunk content concatenated
- Token count: Sum of all chunk token counts
- Duration: Time from first chunk to last
- Finish reason: From the final chunk
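Conceptually, the accumulator behaves like the sketch below. This is an illustration of the semantics listed above, not the SDK's actual implementation; it uses only the standard library strings and time packages:

// Illustrative only: the SDK manages this state internally.
type streamAccumulator struct {
    content      strings.Builder // all chunk content, concatenated
    totalTokens  int             // sum of per-chunk token counts
    firstChunkAt time.Time       // set on the first chunk
    lastChunkAt  time.Time       // updated on every chunk
    finishReason string          // taken from the final chunk
}

func (a *streamAccumulator) addChunk(content string, tokens int) {
    if a.firstChunkAt.IsZero() {
        a.firstChunkAt = time.Now()
    }
    a.lastChunkAt = time.Now()
    a.content.WriteString(content)
    a.totalTokens += tokens
}

// duration is measured from the first chunk to the last.
func (a *streamAccumulator) duration() time.Duration {
    return a.lastChunkAt.Sub(a.firstChunkAt)
}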
Accessing Accumulated Data¶
// Get the accumulated content before ending
accumulated := streamSpan.AccumulatedContent()
// Get chunk count
count := streamSpan.ChunkCount()
// Get total tokens
tokens := streamSpan.TotalTokens()
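Assuming these accessors can be called while the stream is still in flight, as the comment above suggests, they can also be used to report progress as chunks arrive. A sketch, where the 50-chunk interval and log format are illustrative:

for chunk := range streamChannel {
    streamSpan.AddChunk(chunk.Content)

    // Periodically report progress using the accessors above
    if streamSpan.ChunkCount()%50 == 0 {
        log.Printf("progress: %d chunks, %d tokens",
            streamSpan.ChunkCount(), streamSpan.TotalTokens())
    }
}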
Best Practices¶
- Start the span before streaming: Create the span before initiating the stream
- Add chunks as they arrive: Don't buffer; add each chunk immediately for accurate timing
- Include token counts: If available, include them for cost tracking
- Mark the finish reason: Always mark the final chunk with a finish reason
- Handle errors: End the span even if streaming fails (see the sketch after this list)
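For the last point, one way to guarantee the span ends on every path is to defer End right after the span is created, instead of calling it on each error return as the Complete Example does. A minimal sketch using only the calls shown above, with the streaming body stubbed out:

func streamWithCleanup(ctx context.Context, prompt string) (string, error) {
    ctx, streamSpan, err := opik.StartStreamingSpan(ctx, "stream-response",
        opik.WithSpanType(opik.SpanTypeLLM),
    )
    if err != nil {
        return "", err
    }
    // End runs on every return path, including mid-stream failures
    defer streamSpan.End(ctx)

    // ... create the stream and call streamSpan.AddChunk(...) as shown above ...
    return "", nil
}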