Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions packages/sdk/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export type {
RealTimeClientConnectOptions,
RealTimeClientInitialState,
} from "./realtime/client";
export type { SetInput } from "./realtime/methods";
export type {
ConnectionPhase,
DiagnosticEvent,
Expand All @@ -37,15 +38,14 @@ export type {
SelectedCandidatePairEvent,
SignalingStateEvent,
VideoStallEvent,
} from "./realtime/diagnostics";
export type { SetInput } from "./realtime/methods";
} from "./realtime/observability/diagnostics";
export type { WebRTCStats } from "./realtime/observability/webrtc-stats";
export type {
RealTimeSubscribeClient,
SubscribeEvents,
SubscribeOptions,
} from "./realtime/subscribe-client";
export type { ConnectionState } from "./realtime/types";
export type { WebRTCStats } from "./realtime/webrtc-stats";
export {
type CanonicalModel,
type CustomModelDefinition,
Expand Down
158 changes: 30 additions & 128 deletions packages/sdk/src/realtime/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,21 @@ import { type CustomModelDefinition, type ModelDefinition, modelDefinitionSchema
import { modelStateSchema } from "../shared/types";
import { classifyWebrtcError, type DecartSDKError } from "../utils/errors";
import type { Logger } from "../utils/logger";
import type { DiagnosticEvent } from "./diagnostics";
import { createEventBuffer } from "./event-buffer";
import { realtimeMethods, type SetInput } from "./methods";
import { createMirroredStream, type MirroredStream, shouldMirrorTrack } from "./mirror-stream";
import type { DiagnosticEvent } from "./observability/diagnostics";
import { RealtimeObservability } from "./observability/realtime-observability";
import type { WebRTCStats } from "./observability/webrtc-stats";
import {
decodeSubscribeToken,
encodeSubscribeToken,
type RealTimeSubscribeClient,
type SubscribeEvents,
type SubscribeOptions,
} from "./subscribe-client";
import { type ITelemetryReporter, NullTelemetryReporter, TelemetryReporter } from "./telemetry-reporter";
import type { ConnectionState, GenerationTickMessage, SessionIdMessage } from "./types";
import { WebRTCManager } from "./webrtc-manager";
import { type WebRTCStats, WebRTCStatsCollector } from "./webrtc-stats";

async function blobToBase64(blob: Blob): Promise<string> {
return new Promise((resolve, reject) => {
Expand Down Expand Up @@ -164,8 +164,17 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => {
}

let webrtcManager: WebRTCManager | undefined;
let telemetryReporter: ITelemetryReporter = new NullTelemetryReporter();
let handleConnectionStateChange: ((state: ConnectionState) => void) | null = null;
const { emitter: eventEmitter, emitOrBuffer, flush, stop } = createEventBuffer<Events>();

const observability = new RealtimeObservability({
telemetryEnabled: opts.telemetryEnabled,
apiKey,
model: options.model.name,
integration,
logger,
onDiagnostic: (event) => emitOrBuffer("diagnostic", event),
onStats: opts.telemetryEnabled ? (stats) => emitOrBuffer("stats", stats) : undefined,
Comment thread
cursor[bot] marked this conversation as resolved.
});
Comment thread
cursor[bot] marked this conversation as resolved.

try {
// Prepare initial image base64 before connection
Expand All @@ -181,20 +190,14 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => {

const url = `${baseUrl}${options.model.urlPath}`;

const { emitter: eventEmitter, emitOrBuffer, flush, stop } = createEventBuffer<Events>();

webrtcManager = new WebRTCManager({
webrtcUrl: `${url}?api_key=${encodeURIComponent(apiKey)}&model=${encodeURIComponent(options.model.name)}`,
integration,
logger,
onDiagnostic: (name, data) => {
emitOrBuffer("diagnostic", { name, data } as Events["diagnostic"]);
addTelemetryDiagnostic(name, data);
},
observability,
onRemoteStream,
onConnectionStateChange: (state) => {
emitOrBuffer("connectionChange", state);
handleConnectionStateChange?.(state);
},
onError: (error) => {
logger.error("WebRTC error", { error: error.message });
Expand All @@ -211,56 +214,11 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => {

let sessionId: string | null = null;
let subscribeToken: string | null = null;
const pendingTelemetryDiagnostics: Array<{
name: DiagnosticEvent["name"];
data: DiagnosticEvent["data"];
timestamp: number;
}> = [];
let telemetryReporterReady = false;

const addTelemetryDiagnostic = (
name: DiagnosticEvent["name"],
data: DiagnosticEvent["data"],
timestamp: number = Date.now(),
): void => {
if (!opts.telemetryEnabled) {
return;
}

if (!telemetryReporterReady) {
pendingTelemetryDiagnostics.push({ name, data, timestamp });
return;
}

telemetryReporter.addDiagnostic({ name, data, timestamp });
};

const sessionIdListener = (msg: SessionIdMessage) => {
subscribeToken = encodeSubscribeToken(msg.session_id, msg.server_ip, msg.server_port);
sessionId = msg.session_id;

// Start telemetry reporter now that we have a session ID
if (opts.telemetryEnabled) {
if (telemetryReporterReady) {
telemetryReporter.stop();
}

const reporter = new TelemetryReporter({
apiKey,
sessionId: msg.session_id,
model: options.model.name,
integration,
logger,
});
reporter.start();
telemetryReporter = reporter;
telemetryReporterReady = true;

for (const diagnostic of pendingTelemetryDiagnostics) {
telemetryReporter.addDiagnostic(diagnostic);
}
pendingTelemetryDiagnostics.length = 0;
}
observability.sessionStarted(msg.session_id);
};
manager.getWebsocketMessageEmitter().on("sessionId", sessionIdListener);

Expand All @@ -273,78 +231,13 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => {

const methods = realtimeMethods(manager, imageToBase64);

let statsCollector: WebRTCStatsCollector | null = null;
let statsCollectorPeerConnection: RTCPeerConnection | null = null;

// Video stall detection state (Twilio pattern: fps < 0.5 = stalled)
const STALL_FPS_THRESHOLD = 0.5;
let videoStalled = false;
let stallStartMs = 0;

const startStatsCollection = (): (() => void) => {
statsCollector?.stop();
videoStalled = false;
stallStartMs = 0;
statsCollector = new WebRTCStatsCollector();
const pc = manager.getPeerConnection();
statsCollectorPeerConnection = pc;
if (pc) {
statsCollector.start(pc, (stats) => {
emitOrBuffer("stats", stats);
telemetryReporter.addStats(stats);

// Stall detection: check if video fps dropped below threshold
const fps = stats.video?.framesPerSecond ?? 0;
if (!videoStalled && stats.video && fps < STALL_FPS_THRESHOLD) {
videoStalled = true;
stallStartMs = Date.now();
emitOrBuffer("diagnostic", { name: "videoStall", data: { stalled: true, durationMs: 0 } });
addTelemetryDiagnostic("videoStall", { stalled: true, durationMs: 0 }, stallStartMs);
} else if (videoStalled && fps >= STALL_FPS_THRESHOLD) {
const durationMs = Date.now() - stallStartMs;
videoStalled = false;
emitOrBuffer("diagnostic", { name: "videoStall", data: { stalled: false, durationMs } });
addTelemetryDiagnostic("videoStall", { stalled: false, durationMs });
}
});
}
return () => {
statsCollector?.stop();
statsCollector = null;
statsCollectorPeerConnection = null;
};
};

handleConnectionStateChange = (state) => {
if (!opts.telemetryEnabled) {
return;
}

if (state !== "connected" && state !== "generating") {
return;
}

const peerConnection = manager.getPeerConnection();
if (!peerConnection || peerConnection === statsCollectorPeerConnection) {
return;
}

startStatsCollection();
};

// Auto-start stats when telemetry is enabled
if (opts.telemetryEnabled) {
startStatsCollection();
}

const client: RealTimeClient = {
set: methods.set,
setPrompt: methods.setPrompt,
isConnected: () => manager.isConnected(),
getConnectionState: () => manager.getConnectionState(),
disconnect: () => {
statsCollector?.stop();
telemetryReporter.stop();
observability.stop();
stop();
manager.cleanup();
mirroredStream?.dispose();
Expand Down Expand Up @@ -372,7 +265,7 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => {
flush();
return client;
} catch (error) {
telemetryReporter.stop();
observability.stop();
webrtcManager?.cleanup();
mirroredStream?.dispose();
throw error;
Expand All @@ -386,15 +279,22 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => {
const { emitter: eventEmitter, emitOrBuffer, flush, stop } = createEventBuffer<SubscribeEvents>();

let webrtcManager: WebRTCManager | undefined;
const observability = new RealtimeObservability({
telemetryEnabled: opts.telemetryEnabled,
apiKey,
integration,
logger,
onDiagnostic: (event) => emitOrBuffer("diagnostic", event as SubscribeEvents["diagnostic"]),
onStats: opts.telemetryEnabled ? (stats) => emitOrBuffer("stats", stats) : undefined,
});
observability.sessionStarted(sid);
Comment thread
cursor[bot] marked this conversation as resolved.

try {
webrtcManager = new WebRTCManager({
webrtcUrl: subscribeUrl,
integration,
logger,
onDiagnostic: (name, data) => {
emitOrBuffer("diagnostic", { name, data } as SubscribeEvents["diagnostic"]);
},
observability,
onRemoteStream: options.onRemoteStream,
onConnectionStateChange: (state) => {
emitOrBuffer("connectionChange", state);
Expand All @@ -412,6 +312,7 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => {
isConnected: () => manager.isConnected(),
getConnectionState: () => manager.getConnectionState(),
disconnect: () => {
observability.stop();
stop();
manager.cleanup();
},
Expand All @@ -422,6 +323,7 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => {
flush();
return client;
} catch (error) {
observability.stop();
webrtcManager?.cleanup();
throw error;
}
Expand Down
Loading
Loading