Streaming: SSE & WebSocket¶
Real-time data streaming for chat messages and video.
Server-Sent Events (SSE)¶
Used for streaming AI chat responses.
Chat Message Streaming¶
Endpoint:
POST {API_BASE_URL}/chats/{chatId}/messages
Request:
{ "prompt": "..." }
Response Stream:
data: {"type": "chunk", "text": "Based on"}
data: {"type": "chunk", "text": " your session data,"}
data: {"type": "chunk", "text": " the most effective"}
data: {"type": "chunk", "text": " plays were..."}
data: {"type": "done"}
Event Types¶
| Type | Description |
|---|---|
| chunk | Partial text content |
| done | Stream complete |
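For type safety on the client, the two event shapes can be modeled as a small discriminated union (illustrative only; the name StreamEvent is not part of the API):

type StreamEvent =
  | { type: "chunk"; text: string } // Partial text content
  | { type: "done" };               // Stream complete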
Client Implementation¶
const sendMessage = async (prompt: string) => {
  const response = await fetch(`${API_BASE_URL}/chats/${chatId}/messages`, {
    method: "POST",
    headers: {
      Authorization: `Bearer ${token}`,
      "Content-Type": "application/json"
    },
    body: JSON.stringify({ prompt })
  });
  if (!response.body) throw new Error("Response has no body");

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let accumulatedContent = "";
  let buffer = ""; // Carries a partial line across read() boundaries

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });
    // A network chunk may end mid-line; keep the trailing fragment for the next read
    const lines = buffer.split("\n");
    buffer = lines.pop() ?? "";
    for (const line of lines) {
      if (!line.startsWith("data: ")) continue;
      try {
        const json = JSON.parse(line.slice(6));
        switch (json.type) {
          case "chunk":
            accumulatedContent += json.text;
            updateMessageUI(accumulatedContent);
            break;
          case "done":
            finalizeMessage(accumulatedContent);
            break;
        }
      } catch (e) {
        // Ignore malformed JSON
      }
    }
  }
};
Debounced UI Updates¶
To prevent excessive re-renders during fast streaming:
import { useDebouncedCallback } from "use-debounce";
const updateMessageUI = useDebouncedCallback(
(content: string) => {
queryClient.setQueryData(["chat", chatId], (old) => ({
...old,
messages: [
...old.messages.slice(0, -1),
{ role: "assistant", content }
]
}));
},
50 // 50ms debounce
);
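One caveat: when the done event arrives, a debounced update may still be pending, so the final chunk could render late. use-debounce exposes a flush() method on the returned callback to run the pending call immediately (a sketch, assuming finalizeMessage can see updateMessageUI):

const finalizeMessage = (content: string) => {
  updateMessageUI.flush(); // Render any pending debounced update now
  // ...persist the completed message, clear loading state, etc.
};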
Optimistic Updates¶
// Before sending, add user message optimistically
queryClient.setQueryData(["chat", chatId], (old) => ({
...old,
messages: [
...old.messages,
{ role: "user", content: prompt },
{ role: "assistant", content: "" } // Placeholder
]
}));
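If the request fails, the optimistic messages should be rolled back. A common pattern (a sketch using the TanStack Query client already in scope) is to snapshot the cache before mutating and restore it on error:

const previous = queryClient.getQueryData(["chat", chatId]);
try {
  await sendMessage(prompt);
} catch (error) {
  // Restore the pre-optimistic cache state on failure
  queryClient.setQueryData(["chat", chatId], previous);
  toast.error("Failed to send message");
}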
WebSocket Streaming¶
Used for real-time video frames from cameras.
Connection URL¶
ws://{serverHostURL}:{WS_PORT}/ws/stream/{cameraId}
Environment:
- NEXT_PUBLIC_WS_PORT - WebSocket port (default: 7103)
- serverHostURL - From organization metadata
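Putting these together (a sketch; NEXT_PUBLIC_WS_PORT is read the standard Next.js way, and serverHostURL comes from organization metadata as noted above):

const WS_PORT = process.env.NEXT_PUBLIC_WS_PORT ?? "7103";
const wsUrl = `ws://${serverHostURL}:${WS_PORT}/ws/stream/${cameraId}`;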
Frame Data Format¶
interface FrameData {
frame: string; // Base64-encoded JPEG
timestamp: number; // Unix timestamp (ms)
camera_id: string;
detections?: Detection[];
}
interface Detection {
class: string; // Object class
confidence: number; // 0-1 score
bbox: [number, number, number, number]; // x, y, width, height
}
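An example message as received over the socket (illustrative values only):

const example: FrameData = {
  frame: "/9j/4AAQSkZJRg...", // Truncated Base64-encoded JPEG
  timestamp: 1700000000000,
  camera_id: "cam-01",
  detections: [
    { class: "person", confidence: 0.92, bbox: [120, 80, 64, 128] }
  ]
};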
Client Implementation¶
const startStreaming = (cameraId: string) => {
const wsUrl = `ws://${serverHostURL}:${WS_PORT}/ws/stream/${cameraId}`;
const ws = new WebSocket(wsUrl);
ws.onopen = () => {
console.log("WebSocket connected");
setIsStreaming(true);
};
ws.onmessage = (event) => {
const frameData: FrameData = JSON.parse(event.data);
// Update frame state for canvas rendering
setFrameData(frameData);
// Forward frame to backend for processing
postFrameToBackend(frameData);
};
ws.onerror = (error) => {
console.error("WebSocket error:", error);
toast.error("Stream connection error");
};
ws.onclose = () => {
console.log("WebSocket closed");
setIsStreaming(false);
};
  wsRef.current = ws;
  return ws; // Returned so callers (e.g. connectWithRetry below) can await "open"
};
const stopStreaming = () => {
if (wsRef.current) {
wsRef.current.close();
wsRef.current = null;
}
setIsStreaming(false);
};
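The close handler can also distinguish a deliberate stopStreaming() call from a dropped connection via the close code (a sketch; 1000 is the standard "normal closure" code):

ws.onclose = (event) => {
  setIsStreaming(false);
  if (event.code !== 1000) {
    // Anything other than a normal closure is unexpected
    toast.error(`Stream closed unexpectedly (code ${event.code})`);
  }
};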
Frame Processing¶
Each frame received over the WebSocket is forwarded to the backend for further processing:
const postFrameToBackend = async (frameData: FrameData) => {
await fetch(`${API_BASE_URL}/api/v1/actions/actions/send`, {
method: "POST",
headers: {
Authorization: `Bearer ${token}`,
"Content-Type": "application/json"
},
body: JSON.stringify({
streamName: "webcam",
streamType: "frame",
frame: frameData.frame,
timestamp: frameData.timestamp,
camera_id: frameData.camera_id
})
});
};
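Forwarding every frame at full video rate can flood the backend. A simple time-based throttle (a sketch, not part of the current implementation; the 200 ms interval is an assumed value) bounds the POST rate:

let lastPostTime = 0;
const MIN_POST_INTERVAL_MS = 200; // At most ~5 frames per second

const maybePostFrame = (frameData: FrameData) => {
  const now = Date.now();
  if (now - lastPostTime < MIN_POST_INTERVAL_MS) return; // Drop this frame
  lastPostTime = now;
  void postFrameToBackend(frameData);
};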
Canvas Rendering¶
const VideoCanvas = ({ frameData, showDetections }: { frameData: FrameData | null; showDetections: boolean }) => {
const canvasRef = useRef<HTMLCanvasElement>(null);
useEffect(() => {
if (!frameData?.frame || !canvasRef.current) return;
const canvas = canvasRef.current;
const ctx = canvas.getContext("2d");
if (!ctx) return; // getContext("2d") can return null
// Decode and draw frame
const img = new Image();
img.onload = () => {
canvas.width = img.width;
canvas.height = img.height;
ctx.drawImage(img, 0, 0);
// Draw detection boxes
if (showDetections && frameData.detections) {
ctx.strokeStyle = "#00ff00";
ctx.lineWidth = 2;
for (const det of frameData.detections) {
const [x, y, w, h] = det.bbox;
ctx.strokeRect(x, y, w, h);
ctx.fillStyle = "#00ff00";
ctx.fillText(
`${det.class} (${(det.confidence * 100).toFixed(0)}%)`,
x, y - 5
);
}
}
};
img.src = `data:image/jpeg;base64,${frameData.frame}`;
}, [frameData, showDetections]);
return <canvas ref={canvasRef} />;
};
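Usage, wiring in the frameData state set by the onmessage handler above (showDetections is assumed to be a UI toggle):

<VideoCanvas frameData={frameData} showDetections={showDetections} />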
Connection Management¶
Reconnection Logic¶
const MAX_RETRIES = 5;
const RETRY_DELAY = 1000;

// new WebSocket() only throws synchronously for malformed URLs; real
// connection failures arrive asynchronously, so a plain try/catch around
// startStreaming would never trigger a retry. Instead, wait for "open" or
// "close" on the socket returned by startStreaming before deciding.
const connectWithRetry = async (cameraId: string, attempt = 0): Promise<void> => {
  const ws = startStreaming(cameraId);
  const opened = await new Promise<boolean>((resolve) => {
    ws.addEventListener("open", () => resolve(true), { once: true });
    ws.addEventListener("close", () => resolve(false), { once: true });
  });
  if (opened) return;
  if (attempt < MAX_RETRIES) {
    // Exponential backoff: 1s, 2s, 4s, ...
    await new Promise((r) => setTimeout(r, RETRY_DELAY * Math.pow(2, attempt)));
    return connectWithRetry(cameraId, attempt + 1);
  }
  throw new Error(`WebSocket connection failed after ${MAX_RETRIES + 1} attempts`);
};
Cleanup¶
// In useCameraStream hook: close the socket when the component unmounts
useEffect(() => {
  return () => {
    if (wsRef.current) {
      // Detach the close handler first so it doesn't call setIsStreaming
      // (a state update) after the component has unmounted
      wsRef.current.onclose = null;
      wsRef.current.close();
    }
  };
}, []);
Error Handling¶
SSE Errors¶
if (!response.ok) {
throw new Error(`Stream error: ${response.status}`);
}
// After the read loop exits, flag streams that ended without content
// (e.g. aborted upstream)
if (accumulatedContent === "") {
  toast.error("Response was empty");
}
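Client-side cancellation (e.g. a "stop generating" button) can be layered on by passing an AbortController signal to fetch (a sketch; not currently wired into sendMessage):

const controller = new AbortController();

const response = await fetch(`${API_BASE_URL}/chats/${chatId}/messages`, {
  method: "POST",
  headers: {
    Authorization: `Bearer ${token}`,
    "Content-Type": "application/json"
  },
  body: JSON.stringify({ prompt }),
  signal: controller.signal // Aborting rejects the fetch and tears down the stream
});

// Elsewhere, e.g. in a "Stop" button handler:
controller.abort();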