Skip to content

Streaming: SSE & WebSocket

Real-time data streaming for chat messages and video.

Server-Sent Events (SSE)

Used for streaming AI chat responses.

Chat Message Streaming

Endpoint:

POST /api/v1/actions/chats/{chatId}/messages

Request:

{
  "prompt": "What plays worked well last game?"
}

Response Stream:

data: {"type": "chunk", "text": "Based on"}
data: {"type": "chunk", "text": " your session data,"}
data: {"type": "chunk", "text": " the most effective"}
data: {"type": "chunk", "text": " plays were..."}
data: {"type": "done"}

Event Types

Type Description
chunk Partial text content
done Stream complete

Client Implementation

const sendMessage = async (prompt: string) => {
  const response = await fetch(`${API_BASE_URL}/chats/${chatId}/messages`, {
    method: "POST",
    headers: {
      Authorization: `Bearer ${token}`,
      "Content-Type": "application/json"
    },
    body: JSON.stringify({ prompt })
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let accumulatedContent = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    const text = decoder.decode(value, { stream: true });
    const lines = text.split("\n");

    for (const line of lines) {
      if (!line.startsWith("data: ")) continue;

      try {
        const json = JSON.parse(line.slice(6));

        switch (json.type) {
          case "chunk":
            accumulatedContent += json.text;
            updateMessageUI(accumulatedContent);
            break;

          case "done":
            finalizeMessage(accumulatedContent);
            break;
        }
      } catch (e) {
        // Ignore malformed JSON
      }
    }
  }
};

Debounced UI Updates

To prevent excessive re-renders during fast streaming:

import { useDebouncedCallback } from "use-debounce";

const updateMessageUI = useDebouncedCallback(
  (content: string) => {
    queryClient.setQueryData(["chat", chatId], (old) => ({
      ...old,
      messages: [
        ...old.messages.slice(0, -1),
        { role: "assistant", content }
      ]
    }));
  },
  50 // 50ms debounce
);

Optimistic Updates

// Before sending, add user message optimistically
queryClient.setQueryData(["chat", chatId], (old) => ({
  ...old,
  messages: [
    ...old.messages,
    { role: "user", content: prompt },
    { role: "assistant", content: "" } // Placeholder
  ]
}));

WebSocket Streaming

Used for real-time video frames from cameras.

Connection URL

ws://{serverHostURL}:{WS_PORT}/ws/stream/{cameraId}

Environment: - NEXT_PUBLIC_WS_PORT - WebSocket port (default: 7103) - serverHostURL - From organization metadata

Frame Data Format

interface FrameData {
  frame: string;      // Base64-encoded JPEG
  timestamp: number;  // Unix timestamp (ms)
  camera_id: string;
  detections?: Detection[];
}

interface Detection {
  class: string;           // Object class
  confidence: number;      // 0-1 score
  bbox: [number, number, number, number]; // x, y, width, height
}

Client Implementation

const startStreaming = (cameraId: string) => {
  const wsUrl = `ws://${serverHostURL}:${WS_PORT}/ws/stream/${cameraId}`;
  const ws = new WebSocket(wsUrl);

  ws.onopen = () => {
    console.log("WebSocket connected");
    setIsStreaming(true);
  };

  ws.onmessage = (event) => {
    const frameData: FrameData = JSON.parse(event.data);

    // Update frame state for canvas rendering
    setFrameData(frameData);

    // Forward frame to backend for processing
    postFrameToBackend(frameData);
  };

  ws.onerror = (error) => {
    console.error("WebSocket error:", error);
    toast.error("Stream connection error");
  };

  ws.onclose = () => {
    console.log("WebSocket closed");
    setIsStreaming(false);
  };

  wsRef.current = ws;
};

const stopStreaming = () => {
  if (wsRef.current) {
    wsRef.current.close();
    wsRef.current = null;
  }
  setIsStreaming(false);
};

Frame Processing

Frames are forwarded to the backend for potential processing:

const postFrameToBackend = async (frameData: FrameData) => {
  await fetch(`${API_BASE_URL}/api/v1/actions/actions/send`, {
    method: "POST",
    headers: {
      Authorization: `Bearer ${token}`,
      "Content-Type": "application/json"
    },
    body: JSON.stringify({
      streamName: "webcam",
      streamType: "frame",
      frame: frameData.frame,
      timestamp: frameData.timestamp,
      camera_id: frameData.camera_id
    })
  });
};

Canvas Rendering

const VideoCanvas = ({ frameData, showDetections }) => {
  const canvasRef = useRef<HTMLCanvasElement>(null);

  useEffect(() => {
    if (!frameData?.frame || !canvasRef.current) return;

    const canvas = canvasRef.current;
    const ctx = canvas.getContext("2d");

    // Decode and draw frame
    const img = new Image();
    img.onload = () => {
      canvas.width = img.width;
      canvas.height = img.height;
      ctx.drawImage(img, 0, 0);

      // Draw detection boxes
      if (showDetections && frameData.detections) {
        ctx.strokeStyle = "#00ff00";
        ctx.lineWidth = 2;

        for (const det of frameData.detections) {
          const [x, y, w, h] = det.bbox;
          ctx.strokeRect(x, y, w, h);
          ctx.fillStyle = "#00ff00";
          ctx.fillText(
            `${det.class} (${(det.confidence * 100).toFixed(0)}%)`,
            x, y - 5
          );
        }
      }
    };
    img.src = `data:image/jpeg;base64,${frameData.frame}`;
  }, [frameData, showDetections]);

  return <canvas ref={canvasRef} />;
};

Connection Management

Reconnection Logic

const MAX_RETRIES = 5;
const RETRY_DELAY = 1000;

const connectWithRetry = async (cameraId: string, attempt = 0) => {
  try {
    startStreaming(cameraId);
  } catch (error) {
    if (attempt < MAX_RETRIES) {
      await new Promise(r => setTimeout(r, RETRY_DELAY * Math.pow(2, attempt)));
      return connectWithRetry(cameraId, attempt + 1);
    }
    throw error;
  }
};

Cleanup

// In useCameraStream hook
useEffect(() => {
  return () => {
    if (wsRef.current) {
      wsRef.current.close();
    }
  };
}, []);

Error Handling

SSE Errors

if (!response.ok) {
  throw new Error(`Stream error: ${response.status}`);
}

// Handle aborted streams
if (done && accumulatedContent === "") {
  toast.error("Response was empty");
}

WebSocket Errors

ws.onerror = (error) => {
  logger.error("WebSocket error", { error, cameraId });
  toast.error("Stream connection lost");
  setIsStreaming(false);
};