From 9d45a87a59cbb249096e43046cb14b09edc8df20 Mon Sep 17 00:00:00 2001 From: Verion1 Date: Mon, 11 May 2026 17:32:45 +0300 Subject: [PATCH 1/3] refactor(realtime): introduce RealtimeObservability orchestrator (PR 1 of 3) Move diagnostics/telemetry/stats into a single observability/ folder. Adds RealtimeObservability orchestrator that owns telemetry reporting, stats collection, and diagnostic emission. WebRTCStats gains ~20 standard RTCStats fields. aiortc (webrtc-manager + webrtc-connection) migrates onto the orchestrator. No LiveKit code, no new dependencies, no public API break. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/sdk/src/index.ts | 6 +- packages/sdk/src/realtime/client.ts | 156 +----- .../{ => observability}/diagnostics.ts | 0 .../observability/realtime-observability.ts | 145 +++++ .../{ => observability}/telemetry-reporter.ts | 6 +- .../realtime/observability/webrtc-stats.ts | 511 ++++++++++++++++++ packages/sdk/src/realtime/subscribe-client.ts | 2 +- .../sdk/src/realtime/webrtc-connection.ts | 47 +- packages/sdk/src/realtime/webrtc-manager.ts | 40 +- packages/sdk/src/realtime/webrtc-stats.ts | 233 -------- packages/sdk/tests/unit.test.ts | 123 +++-- 11 files changed, 829 insertions(+), 440 deletions(-) rename packages/sdk/src/realtime/{ => observability}/diagnostics.ts (100%) create mode 100644 packages/sdk/src/realtime/observability/realtime-observability.ts rename packages/sdk/src/realtime/{ => observability}/telemetry-reporter.ts (97%) create mode 100644 packages/sdk/src/realtime/observability/webrtc-stats.ts delete mode 100644 packages/sdk/src/realtime/webrtc-stats.ts diff --git a/packages/sdk/src/index.ts b/packages/sdk/src/index.ts index 926c2b2c..a776fbd1 100644 --- a/packages/sdk/src/index.ts +++ b/packages/sdk/src/index.ts @@ -24,6 +24,7 @@ export type { RealTimeClientConnectOptions, RealTimeClientInitialState, } from "./realtime/client"; +export type { SetInput } from "./realtime/methods"; export type { ConnectionPhase, DiagnosticEvent, @@ -37,15 +38,14 @@ export type { SelectedCandidatePairEvent, SignalingStateEvent, VideoStallEvent, -} from "./realtime/diagnostics"; -export type { SetInput } from "./realtime/methods"; +} from "./realtime/observability/diagnostics"; +export type { WebRTCStats } from "./realtime/observability/webrtc-stats"; export type { RealTimeSubscribeClient, SubscribeEvents, SubscribeOptions, } from "./realtime/subscribe-client"; export type { ConnectionState } from "./realtime/types"; -export type { WebRTCStats } from "./realtime/webrtc-stats"; export { type CanonicalModel, type CustomModelDefinition, diff --git a/packages/sdk/src/realtime/client.ts b/packages/sdk/src/realtime/client.ts index 8327a37d..b7338ce8 100644 --- a/packages/sdk/src/realtime/client.ts +++ b/packages/sdk/src/realtime/client.ts @@ -3,9 +3,11 @@ import { type CustomModelDefinition, type ModelDefinition, modelDefinitionSchema import { modelStateSchema } from "../shared/types"; import { classifyWebrtcError, type DecartSDKError } from "../utils/errors"; import type { Logger } from "../utils/logger"; -import type { DiagnosticEvent } from "./diagnostics"; import { createEventBuffer } from "./event-buffer"; import { realtimeMethods, type SetInput } from "./methods"; +import type { DiagnosticEvent } from "./observability/diagnostics"; +import { RealtimeObservability } from "./observability/realtime-observability"; +import type { WebRTCStats } from "./observability/webrtc-stats"; import { decodeSubscribeToken, encodeSubscribeToken, @@ -13,10 +15,8 @@ import { type SubscribeEvents, type SubscribeOptions, } from "./subscribe-client"; -import { type ITelemetryReporter, NullTelemetryReporter, TelemetryReporter } from "./telemetry-reporter"; import type { ConnectionState, GenerationTickMessage, SessionIdMessage } from "./types"; import { WebRTCManager } from "./webrtc-manager"; -import { type WebRTCStats, WebRTCStatsCollector } from "./webrtc-stats"; async function blobToBase64(blob: Blob): Promise { return new Promise((resolve, reject) => { @@ -138,8 +138,17 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { const inputStream: MediaStream = stream ?? new MediaStream(); let webrtcManager: WebRTCManager | undefined; - let telemetryReporter: ITelemetryReporter = new NullTelemetryReporter(); - let handleConnectionStateChange: ((state: ConnectionState) => void) | null = null; + const { emitter: eventEmitter, emitOrBuffer, flush, stop } = createEventBuffer(); + + const observability = new RealtimeObservability({ + telemetryEnabled: opts.telemetryEnabled, + apiKey, + model: options.model.name, + integration, + logger, + onDiagnostic: (event) => emitOrBuffer("diagnostic", event), + onStats: (stats) => emitOrBuffer("stats", stats), + }); try { // Prepare initial image base64 before connection @@ -155,20 +164,14 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { const url = `${baseUrl}${options.model.urlPath}`; - const { emitter: eventEmitter, emitOrBuffer, flush, stop } = createEventBuffer(); - webrtcManager = new WebRTCManager({ webrtcUrl: `${url}?api_key=${encodeURIComponent(apiKey)}&model=${encodeURIComponent(options.model.name)}`, integration, logger, - onDiagnostic: (name, data) => { - emitOrBuffer("diagnostic", { name, data } as Events["diagnostic"]); - addTelemetryDiagnostic(name, data); - }, + observability, onRemoteStream, onConnectionStateChange: (state) => { emitOrBuffer("connectionChange", state); - handleConnectionStateChange?.(state); }, onError: (error) => { logger.error("WebRTC error", { error: error.message }); @@ -185,56 +188,11 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { let sessionId: string | null = null; let subscribeToken: string | null = null; - const pendingTelemetryDiagnostics: Array<{ - name: DiagnosticEvent["name"]; - data: DiagnosticEvent["data"]; - timestamp: number; - }> = []; - let telemetryReporterReady = false; - - const addTelemetryDiagnostic = ( - name: DiagnosticEvent["name"], - data: DiagnosticEvent["data"], - timestamp: number = Date.now(), - ): void => { - if (!opts.telemetryEnabled) { - return; - } - - if (!telemetryReporterReady) { - pendingTelemetryDiagnostics.push({ name, data, timestamp }); - return; - } - - telemetryReporter.addDiagnostic({ name, data, timestamp }); - }; const sessionIdListener = (msg: SessionIdMessage) => { subscribeToken = encodeSubscribeToken(msg.session_id, msg.server_ip, msg.server_port); sessionId = msg.session_id; - - // Start telemetry reporter now that we have a session ID - if (opts.telemetryEnabled) { - if (telemetryReporterReady) { - telemetryReporter.stop(); - } - - const reporter = new TelemetryReporter({ - apiKey, - sessionId: msg.session_id, - model: options.model.name, - integration, - logger, - }); - reporter.start(); - telemetryReporter = reporter; - telemetryReporterReady = true; - - for (const diagnostic of pendingTelemetryDiagnostics) { - telemetryReporter.addDiagnostic(diagnostic); - } - pendingTelemetryDiagnostics.length = 0; - } + observability.sessionStarted(msg.session_id); }; manager.getWebsocketMessageEmitter().on("sessionId", sessionIdListener); @@ -247,78 +205,13 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { const methods = realtimeMethods(manager, imageToBase64); - let statsCollector: WebRTCStatsCollector | null = null; - let statsCollectorPeerConnection: RTCPeerConnection | null = null; - - // Video stall detection state (Twilio pattern: fps < 0.5 = stalled) - const STALL_FPS_THRESHOLD = 0.5; - let videoStalled = false; - let stallStartMs = 0; - - const startStatsCollection = (): (() => void) => { - statsCollector?.stop(); - videoStalled = false; - stallStartMs = 0; - statsCollector = new WebRTCStatsCollector(); - const pc = manager.getPeerConnection(); - statsCollectorPeerConnection = pc; - if (pc) { - statsCollector.start(pc, (stats) => { - emitOrBuffer("stats", stats); - telemetryReporter.addStats(stats); - - // Stall detection: check if video fps dropped below threshold - const fps = stats.video?.framesPerSecond ?? 0; - if (!videoStalled && stats.video && fps < STALL_FPS_THRESHOLD) { - videoStalled = true; - stallStartMs = Date.now(); - emitOrBuffer("diagnostic", { name: "videoStall", data: { stalled: true, durationMs: 0 } }); - addTelemetryDiagnostic("videoStall", { stalled: true, durationMs: 0 }, stallStartMs); - } else if (videoStalled && fps >= STALL_FPS_THRESHOLD) { - const durationMs = Date.now() - stallStartMs; - videoStalled = false; - emitOrBuffer("diagnostic", { name: "videoStall", data: { stalled: false, durationMs } }); - addTelemetryDiagnostic("videoStall", { stalled: false, durationMs }); - } - }); - } - return () => { - statsCollector?.stop(); - statsCollector = null; - statsCollectorPeerConnection = null; - }; - }; - - handleConnectionStateChange = (state) => { - if (!opts.telemetryEnabled) { - return; - } - - if (state !== "connected" && state !== "generating") { - return; - } - - const peerConnection = manager.getPeerConnection(); - if (!peerConnection || peerConnection === statsCollectorPeerConnection) { - return; - } - - startStatsCollection(); - }; - - // Auto-start stats when telemetry is enabled - if (opts.telemetryEnabled) { - startStatsCollection(); - } - const client: RealTimeClient = { set: methods.set, setPrompt: methods.setPrompt, isConnected: () => manager.isConnected(), getConnectionState: () => manager.getConnectionState(), disconnect: () => { - statsCollector?.stop(); - telemetryReporter.stop(); + observability.stop(); stop(); manager.cleanup(); }, @@ -345,7 +238,7 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { flush(); return client; } catch (error) { - telemetryReporter.stop(); + observability.stop(); webrtcManager?.cleanup(); throw error; } @@ -358,15 +251,20 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { const { emitter: eventEmitter, emitOrBuffer, flush, stop } = createEventBuffer(); let webrtcManager: WebRTCManager | undefined; + const observability = new RealtimeObservability({ + telemetryEnabled: false, + apiKey, + integration, + logger, + onDiagnostic: (event) => emitOrBuffer("diagnostic", event as SubscribeEvents["diagnostic"]), + }); try { webrtcManager = new WebRTCManager({ webrtcUrl: subscribeUrl, integration, logger, - onDiagnostic: (name, data) => { - emitOrBuffer("diagnostic", { name, data } as SubscribeEvents["diagnostic"]); - }, + observability, onRemoteStream: options.onRemoteStream, onConnectionStateChange: (state) => { emitOrBuffer("connectionChange", state); @@ -384,6 +282,7 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { isConnected: () => manager.isConnected(), getConnectionState: () => manager.getConnectionState(), disconnect: () => { + observability.stop(); stop(); manager.cleanup(); }, @@ -394,6 +293,7 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { flush(); return client; } catch (error) { + observability.stop(); webrtcManager?.cleanup(); throw error; } diff --git a/packages/sdk/src/realtime/diagnostics.ts b/packages/sdk/src/realtime/observability/diagnostics.ts similarity index 100% rename from packages/sdk/src/realtime/diagnostics.ts rename to packages/sdk/src/realtime/observability/diagnostics.ts diff --git a/packages/sdk/src/realtime/observability/realtime-observability.ts b/packages/sdk/src/realtime/observability/realtime-observability.ts new file mode 100644 index 00000000..948e83fd --- /dev/null +++ b/packages/sdk/src/realtime/observability/realtime-observability.ts @@ -0,0 +1,145 @@ +import type { Logger } from "../../utils/logger"; +import type { DiagnosticEvent, DiagnosticEventName, DiagnosticEvents } from "./diagnostics"; +import { type ITelemetryReporter, NullTelemetryReporter, TelemetryReporter } from "./telemetry-reporter"; +import { type StatsProvider, type WebRTCStats, WebRTCStatsCollector } from "./webrtc-stats"; + +const STALL_FPS_THRESHOLD = 0.5; + +export type RealtimeObservabilityOptions = { + telemetryEnabled: boolean; + apiKey: string; + model?: string; + integration?: string; + logger: Logger; + onDiagnostic?: (event: DiagnosticEvent) => void; + onStats?: (stats: WebRTCStats) => void; +}; + +type PendingTelemetryDiagnostic = { + name: DiagnosticEvent["name"]; + data: DiagnosticEvent["data"]; + timestamp: number; +}; + +export class RealtimeObservability { + private telemetryReporter: ITelemetryReporter = new NullTelemetryReporter(); + private telemetryReporterReady = false; + private pendingTelemetryDiagnostics: PendingTelemetryDiagnostic[] = []; + private statsCollector: WebRTCStatsCollector | null = null; + private statsCollectorSource: StatsProvider | null = null; + private videoStalled = false; + private stallStartMs = 0; + + constructor(private readonly options: RealtimeObservabilityOptions) {} + + diagnostic(name: K, data: DiagnosticEvents[K], timestamp: number = Date.now()): void { + this.options.logger.debug(name, data as Record); + this.options.onDiagnostic?.({ name, data } as DiagnosticEvent); + this.addTelemetryDiagnostic(name, data, timestamp); + } + + sessionStarted(sessionId: string): void { + if (!this.options.telemetryEnabled) { + return; + } + + if (this.telemetryReporterReady) { + this.telemetryReporter.stop(); + } + + const reporter = new TelemetryReporter({ + apiKey: this.options.apiKey, + sessionId, + model: this.options.model, + integration: this.options.integration, + logger: this.options.logger, + }); + reporter.start(); + this.telemetryReporter = reporter; + this.telemetryReporterReady = true; + + for (const diagnostic of this.pendingTelemetryDiagnostics) { + this.telemetryReporter.addDiagnostic(diagnostic); + } + this.pendingTelemetryDiagnostics.length = 0; + } + + setStatsProvider(source: StatsProvider | null): void { + if (!source) { + this.stopStats(); + return; + } + + if (source === this.statsCollectorSource) { + return; + } + + this.stopStats(); + this.resetStallDetection(); + this.statsCollectorSource = source; + + if (!this.options.telemetryEnabled && !this.options.onStats) { + return; + } + + this.statsCollector = new WebRTCStatsCollector(); + this.statsCollector.start(source, (stats) => this.handleStats(stats)); + } + + stopStats(): void { + this.statsCollector?.stop(); + this.statsCollector = null; + this.statsCollectorSource = null; + this.resetStallDetection(); + } + + stop(): void { + this.stopStats(); + this.telemetryReporter.stop(); + this.telemetryReporter = new NullTelemetryReporter(); + this.telemetryReporterReady = false; + this.pendingTelemetryDiagnostics.length = 0; + } + + private handleStats(stats: WebRTCStats): void { + this.options.onStats?.(stats); + this.telemetryReporter.addStats(stats); + this.detectVideoStall(stats); + } + + private detectVideoStall(stats: WebRTCStats): void { + const fps = stats.video?.framesPerSecond ?? 0; + if (!this.videoStalled && stats.video && fps < STALL_FPS_THRESHOLD) { + this.videoStalled = true; + this.stallStartMs = Date.now(); + this.diagnostic("videoStall", { stalled: true, durationMs: 0 }, this.stallStartMs); + } else if (this.videoStalled && fps >= STALL_FPS_THRESHOLD) { + const durationMs = Date.now() - this.stallStartMs; + this.videoStalled = false; + this.diagnostic("videoStall", { stalled: false, durationMs }); + } + } + + private addTelemetryDiagnostic( + name: K, + data: DiagnosticEvents[K], + timestamp: number, + ): void { + if (!this.options.telemetryEnabled) { + return; + } + + const diagnostic = { name, data, timestamp } as PendingTelemetryDiagnostic; + if (!this.telemetryReporterReady) { + this.pendingTelemetryDiagnostics.push(diagnostic); + return; + } + + this.telemetryReporter.addDiagnostic(diagnostic); + } + + private resetStallDetection(): void { + this.videoStalled = false; + this.stallStartMs = 0; + } +} diff --git a/packages/sdk/src/realtime/telemetry-reporter.ts b/packages/sdk/src/realtime/observability/telemetry-reporter.ts similarity index 97% rename from packages/sdk/src/realtime/telemetry-reporter.ts rename to packages/sdk/src/realtime/observability/telemetry-reporter.ts index 7521418c..d8c7179f 100644 --- a/packages/sdk/src/realtime/telemetry-reporter.ts +++ b/packages/sdk/src/realtime/observability/telemetry-reporter.ts @@ -1,6 +1,6 @@ -import { buildAuthHeaders } from "../shared/request"; -import type { Logger } from "../utils/logger"; -import { VERSION } from "../version"; +import { buildAuthHeaders } from "../../shared/request"; +import type { Logger } from "../../utils/logger"; +import { VERSION } from "../../version"; import type { WebRTCStats } from "./webrtc-stats"; const DEFAULT_REPORT_INTERVAL_MS = 10_000; // 10 seconds diff --git a/packages/sdk/src/realtime/observability/webrtc-stats.ts b/packages/sdk/src/realtime/observability/webrtc-stats.ts new file mode 100644 index 00000000..d4571f0b --- /dev/null +++ b/packages/sdk/src/realtime/observability/webrtc-stats.ts @@ -0,0 +1,511 @@ +export type WebRTCStats = { + timestamp: number; + video: { + framesDecoded: number; + framesDropped: number; + framesReceived: number; + keyFramesDecoded: number; + framesPerSecond: number; + frameWidth: number; + frameHeight: number; + bytesReceived: number; + packetsReceived: number; + packetsLost: number; + jitter: number; + /** Estimated inbound bitrate in bits/sec, computed from bytesReceived delta. */ + bitrate: number; + freezeCount: number; + totalFreezesDuration: number; + /** Delta: packets lost since previous sample. */ + packetsLostDelta: number; + /** Delta: frames dropped since previous sample. */ + framesDroppedDelta: number; + /** Delta: freeze count since previous sample. */ + freezeCountDelta: number; + /** Delta: freeze duration (seconds) since previous sample. */ + freezeDurationDelta: number; + /** NACKs sent to the sender (requesting packet retransmission). */ + nackCount: number; + nackCountDelta: number; + /** PLIs sent to the sender (full frame retransmission request). */ + pliCount: number; + /** FIRs sent to the sender (forced intra-refresh request). */ + firCount: number; + /** + * Average decode time (ms/frame), cumulative since stream start. + * Derived from totalDecodeTime/framesDecoded. `null` if the browser + * hasn't produced the underlying counters yet. + */ + avgDecodeTimeMs: number | null; + /** Average jitter-buffer time (ms/frame emitted). Cumulative. */ + avgJitterBufferMs: number | null; + /** + * Average total processing delay (ms/frame decoded) — from network + * receive to decoder output. Cumulative. + */ + avgProcessingDelayMs: number | null; + /** Average inter-frame delay at the decoder (ms). */ + avgInterFrameDelayMs: number | null; + /** + * Std-dev of inter-frame delay (ms), computed from + * totalInterFrameDelay + totalSquaredInterFrameDelay. + */ + interFrameDelayVarianceMs: number | null; + /** Current target delay of the jitter buffer (ms). */ + jitterBufferTargetDelayMs: number | null; + /** Current minimum delay of the jitter buffer (ms). */ + jitterBufferMinimumDelayMs: number | null; + /** Which decoder the browser picked (e.g. "libvpx", "ExternalDecoder"). */ + decoderImplementation: string; + } | null; + audio: { + bytesReceived: number; + packetsReceived: number; + packetsLost: number; + jitter: number; + /** Estimated inbound bitrate in bits/sec, computed from bytesReceived delta. */ + bitrate: number; + /** Delta: packets lost since previous sample. */ + packetsLostDelta: number; + } | null; + /** Outbound video track stats (from the local camera/screen share being sent). */ + outboundVideo: { + /** Why the encoder is limiting quality: "none", "bandwidth", "cpu", or "other". */ + qualityLimitationReason: string; + /** Cumulative time (seconds) spent in each quality limitation state. */ + qualityLimitationDurations: Record; + bytesSent: number; + packetsSent: number; + framesPerSecond: number; + frameWidth: number; + frameHeight: number; + /** Estimated outbound bitrate in bits/sec, computed from bytesSent delta. */ + bitrate: number; + /** Encoder's current target bitrate in kbps (BWE output). */ + targetBitrateKbps: number | null; + /** Average encode time per frame (ms), cumulative. */ + avgEncodeTimeMs: number | null; + /** Average packet send delay (ms), cumulative. */ + avgPacketSendDelayMs: number | null; + /** Average quantization parameter across encoded frames (lower is better). */ + avgQp: number | null; + /** NACKs received from receiver (retransmission requests). */ + nackCount: number; + /** PLIs received from receiver. */ + pliCount: number; + /** FIRs received from receiver. */ + firCount: number; + retransmittedBytesSent: number; + retransmittedPacketsSent: number; + /** Which encoder the browser picked (e.g. "libvpx", "SimulcastEncoderAdapter"). */ + encoderImplementation: string; + } | null; + /** + * Remote-inbound stats — what the far end reports *about its reception + * of our outbound stream*. Answers "does the server think we're lossy?" + * independently of what we see locally. Populated from + * `remote-inbound-rtp` reports. + */ + remoteInbound: { + fractionLost: number | null; + /** In seconds. */ + jitter: number | null; + /** In seconds. Often more accurate than connection.currentRoundTripTime. */ + roundTripTime: number | null; + } | null; + connection: { + /** Current round-trip time in seconds, or null if unavailable. */ + currentRoundTripTime: number | null; + /** Available outgoing bitrate estimate in bits/sec, or null if unavailable. */ + availableOutgoingBitrate: number | null; + /** + * Selected ICE candidate pairs (usually one per PC). Populated from + * the `candidate-pair` report with state="succeeded" plus the matching + * `local-candidate` / `remote-candidate` lookups. Lets diagnostic tools + * tell direct-UDP sessions from TURN-relayed ones — the path affects + * jitter and failure modes, so this is essential signal for + * benchmarking and incident triage. + */ + selectedCandidatePairs: Array<{ + local: IceCandidateInfo; + remote: IceCandidateInfo; + }>; + }; +}; + +/** One side of an ICE candidate pair (sender or receiver). */ +export type IceCandidateInfo = { + /** "host" | "srflx" | "prflx" | "relay" */ + candidateType: string; + /** IP (v4 or v6). May be `""` for mDNS-obfuscated host candidates. */ + address: string; + port: number; + /** "udp" | "tcp" */ + protocol: string; +}; + +export type StatsOptions = { + /** Polling interval in milliseconds. Default: 1000. Minimum: 500. */ + intervalMs?: number; +}; + +/** + * Source of `RTCStatsReport`-shaped samples for telemetry. `RTCPeerConnection` + * satisfies this interface natively; alternative transports can plug in their + * own adapter that aggregates per-track stats. + */ +export interface StatsProvider { + getStats(): Promise; +} + +const DEFAULT_INTERVAL_MS = 1000; +const MIN_INTERVAL_MS = 500; + +export class WebRTCStatsCollector { + private source: StatsProvider | null = null; + private intervalId: ReturnType | null = null; + private prevBytesVideo = 0; + private prevBytesAudio = 0; + private prevBytesSentVideo = 0; + private prevTimestamp = 0; + // Previous cumulative values for delta computation + private prevPacketsLostVideo = 0; + private prevFramesDropped = 0; + private prevFreezeCount = 0; + private prevFreezeDuration = 0; + private prevPacketsLostAudio = 0; + private prevNackCountInbound = 0; + private onStats: ((stats: WebRTCStats) => void) | null = null; + private intervalMs: number; + + constructor(options: StatsOptions = {}) { + this.intervalMs = Math.max(options.intervalMs ?? DEFAULT_INTERVAL_MS, MIN_INTERVAL_MS); + } + + /** Attach to a stats provider and start polling. */ + start(source: StatsProvider, onStats: (stats: WebRTCStats) => void): void { + this.stop(); + this.source = source; + this.onStats = onStats; + this.prevBytesVideo = 0; + this.prevBytesAudio = 0; + this.prevBytesSentVideo = 0; + this.prevTimestamp = 0; + this.prevPacketsLostVideo = 0; + this.prevFramesDropped = 0; + this.prevFreezeCount = 0; + this.prevFreezeDuration = 0; + this.prevPacketsLostAudio = 0; + this.prevNackCountInbound = 0; + this.intervalId = setInterval(() => this.collect(), this.intervalMs); + } + + /** Stop polling and release resources. */ + stop(): void { + if (this.intervalId !== null) { + clearInterval(this.intervalId); + this.intervalId = null; + } + this.source = null; + this.onStats = null; + } + + isRunning(): boolean { + return this.intervalId !== null; + } + + private async collect(): Promise { + if (!this.source || !this.onStats) return; + + try { + const rawStats = await this.source.getStats(); + const stats = this.parse(rawStats); + this.onStats(stats); + } catch { + // Source might be closed; stop silently + this.stop(); + } + } + + private parse(rawStats: RTCStatsReport): WebRTCStats { + const now = performance.now(); + const elapsed = this.prevTimestamp > 0 ? (now - this.prevTimestamp) / 1000 : 0; + + // Explicit NonNullable aliases so TypeScript can track field + // mutations inside the `forEach` closure below — otherwise it narrows + // the `| null` union to `never` after the first assignment. + type OutboundVideo = NonNullable; + let video: WebRTCStats["video"] = null; + let audio: WebRTCStats["audio"] = null; + let outboundVideo: OutboundVideo | null = null; + let remoteInbound: WebRTCStats["remoteInbound"] = null; + const connection: WebRTCStats["connection"] = { + currentRoundTripTime: null, + availableOutgoingBitrate: null, + selectedCandidatePairs: [], + }; + + // First pass — collect succeeded candidate-pair IDs. Resolving them + // into local/remote candidate objects happens after the main forEach + // so we have access to every report (ordering of rawStats is not + // guaranteed: a succeeded pair's local-candidate may appear before + // or after it). + const succeededPairs: Array<{ localId: string; remoteId: string }> = []; + + rawStats.forEach((report) => { + if (report.type === "inbound-rtp" && report.kind === "video") { + const bytesReceived = ((report as Record).bytesReceived as number) ?? 0; + const bitrate = elapsed > 0 ? ((bytesReceived - this.prevBytesVideo) * 8) / elapsed : 0; + this.prevBytesVideo = bytesReceived; + + const r = report as Record; + const packetsLost = (r.packetsLost as number) ?? 0; + const framesDropped = (r.framesDropped as number) ?? 0; + const freezeCount = (r.freezeCount as number) ?? 0; + const freezeDuration = (r.totalFreezesDuration as number) ?? 0; + const framesDecoded = (r.framesDecoded as number) ?? 0; + const nackCount = (r.nackCount as number) ?? 0; + + // Browser cumulative counters — averages below are + // (cumulativeTotal / denominator). `jitterBufferEmittedCount` is + // the canonical denominator per the WebRTC stats spec for the + // `jitterBuffer*` averages; `framesDecoded` for decode/processing + // averages. + const jbEmitted = (r.jitterBufferEmittedCount as number) ?? 0; + const totalDecodeTime = (r.totalDecodeTime as number) ?? 0; + const totalProcessingDelay = (r.totalProcessingDelay as number) ?? 0; + const totalInterFrameDelay = (r.totalInterFrameDelay as number) ?? 0; + const totalSquaredInterFrameDelay = (r.totalSquaredInterFrameDelay as number) ?? 0; + const jitterBufferDelay = (r.jitterBufferDelay as number) ?? 0; + const jitterBufferTargetDelay = (r.jitterBufferTargetDelay as number) ?? 0; + const jitterBufferMinimumDelay = (r.jitterBufferMinimumDelay as number) ?? 0; + + const avgDecodeTimeMs = framesDecoded > 0 ? (totalDecodeTime / framesDecoded) * 1000 : null; + const avgProcessingDelayMs = framesDecoded > 0 ? (totalProcessingDelay / framesDecoded) * 1000 : null; + const avgInterFrameDelayMs = framesDecoded > 0 ? (totalInterFrameDelay / framesDecoded) * 1000 : null; + // Variance σ² = E[X²] - E[X]² ; std-dev = sqrt(σ²). Report std-dev + // in ms — more actionable than variance for a threshold-based + // "is the path jittery" check. + const interFrameDelayVarianceMs = + framesDecoded > 0 + ? Math.sqrt( + Math.max(0, totalSquaredInterFrameDelay / framesDecoded - (totalInterFrameDelay / framesDecoded) ** 2), + ) * 1000 + : null; + const avgJitterBufferMs = jbEmitted > 0 ? (jitterBufferDelay / jbEmitted) * 1000 : null; + const jitterBufferTargetDelayMs = jbEmitted > 0 ? (jitterBufferTargetDelay / jbEmitted) * 1000 : null; + const jitterBufferMinimumDelayMs = jbEmitted > 0 ? (jitterBufferMinimumDelay / jbEmitted) * 1000 : null; + + video = { + framesDecoded, + framesDropped, + framesReceived: (r.framesReceived as number) ?? 0, + keyFramesDecoded: (r.keyFramesDecoded as number) ?? 0, + framesPerSecond: (r.framesPerSecond as number) ?? 0, + frameWidth: (r.frameWidth as number) ?? 0, + frameHeight: (r.frameHeight as number) ?? 0, + bytesReceived, + packetsReceived: (r.packetsReceived as number) ?? 0, + packetsLost, + jitter: (r.jitter as number) ?? 0, + bitrate: Math.round(bitrate), + freezeCount, + totalFreezesDuration: freezeDuration, + packetsLostDelta: Math.max(0, packetsLost - this.prevPacketsLostVideo), + framesDroppedDelta: Math.max(0, framesDropped - this.prevFramesDropped), + freezeCountDelta: Math.max(0, freezeCount - this.prevFreezeCount), + freezeDurationDelta: Math.max(0, freezeDuration - this.prevFreezeDuration), + nackCount, + nackCountDelta: Math.max(0, nackCount - this.prevNackCountInbound), + pliCount: (r.pliCount as number) ?? 0, + firCount: (r.firCount as number) ?? 0, + avgDecodeTimeMs, + avgJitterBufferMs, + avgProcessingDelayMs, + avgInterFrameDelayMs, + interFrameDelayVarianceMs, + jitterBufferTargetDelayMs, + jitterBufferMinimumDelayMs, + decoderImplementation: (r.decoderImplementation as string) ?? "", + }; + this.prevPacketsLostVideo = packetsLost; + this.prevFramesDropped = framesDropped; + this.prevFreezeCount = freezeCount; + this.prevFreezeDuration = freezeDuration; + this.prevNackCountInbound = nackCount; + } + + if (report.type === "outbound-rtp" && report.kind === "video") { + // Simulcast produces one outbound-rtp report per spatial layer + // (3 layers is common). Earlier versions picked whichever layer + // `forEach` visited last, which (a) underreports total outbound + // traffic and (b) causes bitrate to go violently negative across + // ticks because layer byte counters are independent and the "last + // visited" layer alternates. Accumulate byte/packet totals across + // every layer; pick scalar fields (resolution, fps, quality- + // limitation reason) from the highest-resolution layer so the + // reported frame size matches what's actually on the wire. + const r = report as Record; + const bytesSent = (r.bytesSent as number) ?? 0; + const packetsSent = (r.packetsSent as number) ?? 0; + const frameWidth = (r.frameWidth as number) ?? 0; + const frameHeight = (r.frameHeight as number) ?? 0; + const pixels = frameWidth * frameHeight; + const framesEncoded = (r.framesEncoded as number) ?? 0; + const totalEncodeTime = (r.totalEncodeTime as number) ?? 0; + const totalPacketSendDelay = (r.totalPacketSendDelay as number) ?? 0; + const qpSum = (r.qpSum as number) ?? 0; + const nackCount = (r.nackCount as number) ?? 0; + const pliCount = (r.pliCount as number) ?? 0; + const firCount = (r.firCount as number) ?? 0; + const retransmittedBytesSent = (r.retransmittedBytesSent as number) ?? 0; + const retransmittedPacketsSent = (r.retransmittedPacketsSent as number) ?? 0; + const targetBitrate = (r.targetBitrate as number | undefined) ?? null; + + const avgEncodeTimeMs = framesEncoded > 0 ? (totalEncodeTime / framesEncoded) * 1000 : null; + const avgPacketSendDelayMs = packetsSent > 0 ? (totalPacketSendDelay / packetsSent) * 1000 : null; + const avgQp = framesEncoded > 0 ? qpSum / framesEncoded : null; + + if (outboundVideo === null) { + outboundVideo = { + qualityLimitationReason: (r.qualityLimitationReason as string) ?? "none", + qualityLimitationDurations: (r.qualityLimitationDurations as Record) ?? {}, + bytesSent, + packetsSent, + framesPerSecond: (r.framesPerSecond as number) ?? 0, + frameWidth, + frameHeight, + bitrate: 0, + targetBitrateKbps: targetBitrate != null ? Math.round(targetBitrate / 1000) : null, + avgEncodeTimeMs, + avgPacketSendDelayMs, + avgQp, + nackCount, + pliCount, + firCount, + retransmittedBytesSent, + retransmittedPacketsSent, + encoderImplementation: (r.encoderImplementation as string) ?? "", + }; + } else { + outboundVideo.bytesSent += bytesSent; + outboundVideo.packetsSent += packetsSent; + outboundVideo.nackCount += nackCount; + outboundVideo.pliCount += pliCount; + outboundVideo.firCount += firCount; + outboundVideo.retransmittedBytesSent += retransmittedBytesSent; + outboundVideo.retransmittedPacketsSent += retransmittedPacketsSent; + // Promote scalar fields whenever a higher-resolution layer + // appears — we want reported resolution to match the largest + // active layer, not the lowest. avgEncodeTime / targetBitrate / + // encoderImplementation are also most representative of the + // primary layer. + if (pixels > outboundVideo.frameWidth * outboundVideo.frameHeight) { + outboundVideo.frameWidth = frameWidth; + outboundVideo.frameHeight = frameHeight; + outboundVideo.framesPerSecond = (r.framesPerSecond as number) ?? 0; + outboundVideo.qualityLimitationReason = (r.qualityLimitationReason as string) ?? "none"; + outboundVideo.qualityLimitationDurations = (r.qualityLimitationDurations as Record) ?? {}; + outboundVideo.targetBitrateKbps = targetBitrate != null ? Math.round(targetBitrate / 1000) : null; + outboundVideo.avgEncodeTimeMs = avgEncodeTimeMs; + outboundVideo.avgPacketSendDelayMs = avgPacketSendDelayMs; + outboundVideo.avgQp = avgQp; + outboundVideo.encoderImplementation = (r.encoderImplementation as string) ?? ""; + } + } + } + + if (report.type === "remote-inbound-rtp" && report.kind === "video") { + const r = report as Record; + remoteInbound = { + fractionLost: (r.fractionLost as number | undefined) ?? null, + jitter: (r.jitter as number | undefined) ?? null, + roundTripTime: (r.roundTripTime as number | undefined) ?? null, + }; + } + + if (report.type === "inbound-rtp" && report.kind === "audio") { + const bytesReceived = ((report as Record).bytesReceived as number) ?? 0; + const bitrate = elapsed > 0 ? ((bytesReceived - this.prevBytesAudio) * 8) / elapsed : 0; + this.prevBytesAudio = bytesReceived; + + const r = report as Record; + const audioPacketsLost = (r.packetsLost as number) ?? 0; + audio = { + bytesReceived, + packetsReceived: (r.packetsReceived as number) ?? 0, + packetsLost: audioPacketsLost, + jitter: (r.jitter as number) ?? 0, + bitrate: Math.round(bitrate), + packetsLostDelta: Math.max(0, audioPacketsLost - this.prevPacketsLostAudio), + }; + this.prevPacketsLostAudio = audioPacketsLost; + } + + if (report.type === "candidate-pair") { + const r = report as Record; + if (r.state === "succeeded") { + connection.currentRoundTripTime = (r.currentRoundTripTime as number) ?? null; + connection.availableOutgoingBitrate = (r.availableOutgoingBitrate as number) ?? null; + const localId = r.localCandidateId as string | undefined; + const remoteId = r.remoteCandidateId as string | undefined; + if (localId && remoteId) { + succeededPairs.push({ localId, remoteId }); + } + } + } + }); + + // Resolve candidate IDs to their local/remote-candidate reports now + // that we've seen every entry in the rawStats map. `rawStats.get()` + // is O(1) on the spec-compliant Map, so per-pair resolution is cheap. + if (succeededPairs.length > 0) { + const toInfo = (id: string): IceCandidateInfo | null => { + const c = (rawStats as unknown as Map).get(id) as Record | undefined; + if (!c) return null; + return { + // browsers may report `ip` (older spec) or `address` (newer). Prefer `address`. + candidateType: (c.candidateType as string) ?? "", + address: ((c.address as string) ?? (c.ip as string) ?? "") as string, + port: (c.port as number) ?? 0, + protocol: (c.protocol as string) ?? "", + }; + }; + for (const { localId, remoteId } of succeededPairs) { + const local = toInfo(localId); + const remote = toInfo(remoteId); + if (local && remote) { + connection.selectedCandidatePairs.push({ local, remote }); + } + } + } + + // Compute outbound video bitrate after the loop, now that we know + // the summed bytesSent across all simulcast layers. Doing it per- + // report would misattribute deltas to whichever layer came last. + // + // Cast via `unknown` because TypeScript can't track the non-null + // assignment inside the forEach closure above — flow analysis sees + // only the initial `let outboundVideo = null` and narrows to `never`. + const ov = outboundVideo as unknown as OutboundVideo | null; + if (ov !== null) { + const outBitrate = elapsed > 0 ? ((ov.bytesSent - this.prevBytesSentVideo) * 8) / elapsed : 0; + // Clamp to zero: when tracks are added/removed mid-session (new + // simulcast layer, publisher swap) total bytesSent can transiently + // drop. Negative bitrate is nonsensical to downstream consumers. + ov.bitrate = Math.max(0, Math.round(outBitrate)); + this.prevBytesSentVideo = ov.bytesSent; + } + + this.prevTimestamp = now; + + return { + timestamp: Date.now(), + video, + audio, + outboundVideo, + connection, + remoteInbound, + }; + } +} diff --git a/packages/sdk/src/realtime/subscribe-client.ts b/packages/sdk/src/realtime/subscribe-client.ts index 6b1370f5..d8610d04 100644 --- a/packages/sdk/src/realtime/subscribe-client.ts +++ b/packages/sdk/src/realtime/subscribe-client.ts @@ -1,5 +1,5 @@ import type { DecartSDKError } from "../utils/errors"; -import type { DiagnosticEvent } from "./diagnostics"; +import type { DiagnosticEvent } from "./observability/diagnostics"; import type { ConnectionState } from "./types"; type TokenPayload = { diff --git a/packages/sdk/src/realtime/webrtc-connection.ts b/packages/sdk/src/realtime/webrtc-connection.ts index 6e59a863..7b3a2fc7 100644 --- a/packages/sdk/src/realtime/webrtc-connection.ts +++ b/packages/sdk/src/realtime/webrtc-connection.ts @@ -2,7 +2,8 @@ import mitt from "mitt"; import type { Logger } from "../utils/logger"; import { buildUserAgent } from "../utils/user-agent"; -import type { DiagnosticEmitter, IceCandidateEvent } from "./diagnostics"; +import type { IceCandidateEvent } from "./observability/diagnostics"; +import { RealtimeObservability } from "./observability/realtime-observability"; import type { ConnectionState, GenerationTickMessage, @@ -26,7 +27,7 @@ interface ConnectionCallbacks { initialImage?: string; initialPrompt?: { text: string; enhance?: boolean }; logger?: Logger; - onDiagnostic?: DiagnosticEmitter; + observability?: RealtimeObservability; } type WsMessageEvents = { @@ -36,20 +37,24 @@ type WsMessageEvents = { generationTick: GenerationTickMessage; }; -const noopDiagnostic: DiagnosticEmitter = () => {}; - export class WebRTCConnection { private pc: RTCPeerConnection | null = null; private ws: WebSocket | null = null; private localStream: MediaStream | null = null; private connectionReject: ((error: Error) => void) | null = null; private logger: Logger; - private emitDiagnostic: DiagnosticEmitter; + private observability: RealtimeObservability; state: ConnectionState = "disconnected"; websocketMessagesEmitter = mitt(); constructor(private callbacks: ConnectionCallbacks = {}) { this.logger = callbacks.logger ?? { debug() {}, info() {}, warn() {}, error() {} }; - this.emitDiagnostic = callbacks.onDiagnostic ?? noopDiagnostic; + this.observability = + callbacks.observability ?? + new RealtimeObservability({ + telemetryEnabled: false, + apiKey: "", + logger: this.logger, + }); } getPeerConnection(): RTCPeerConnection | null { @@ -87,7 +92,7 @@ export class WebRTCConnection { this.ws.onopen = () => { clearTimeout(timer); - this.emitDiagnostic("phaseTiming", { + this.observability.diagnostic("phaseTiming", { phase: "websocket", durationMs: performance.now() - wsStart, success: true, @@ -104,7 +109,7 @@ export class WebRTCConnection { this.ws.onerror = () => { clearTimeout(timer); const error = new Error("WebSocket error"); - this.emitDiagnostic("phaseTiming", { + this.observability.diagnostic("phaseTiming", { phase: "websocket", durationMs: performance.now() - wsStart, success: false, @@ -134,7 +139,7 @@ export class WebRTCConnection { }), connectAbort, ]); - this.emitDiagnostic("phaseTiming", { + this.observability.diagnostic("phaseTiming", { phase: "avatar-image", durationMs: performance.now() - imageStart, success: true, @@ -142,7 +147,7 @@ export class WebRTCConnection { } else if (this.callbacks.initialPrompt) { const promptStart = performance.now(); await Promise.race([this.sendInitialPrompt(this.callbacks.initialPrompt), connectAbort]); - this.emitDiagnostic("phaseTiming", { + this.observability.diagnostic("phaseTiming", { phase: "initial-prompt", durationMs: performance.now() - promptStart, success: true, @@ -151,7 +156,7 @@ export class WebRTCConnection { // No image and no prompt — send passthrough (skip for subscribe mode which has no local stream) const nullStart = performance.now(); await Promise.race([this.setImageBase64(null, { prompt: null }), connectAbort]); - this.emitDiagnostic("phaseTiming", { + this.observability.diagnostic("phaseTiming", { phase: "initial-prompt", durationMs: performance.now() - nullStart, success: true, @@ -166,7 +171,7 @@ export class WebRTCConnection { const checkConnection = setInterval(() => { if (this.state === "connected" || this.state === "generating") { clearInterval(checkConnection); - this.emitDiagnostic("phaseTiming", { + this.observability.diagnostic("phaseTiming", { phase: "webrtc-handshake", durationMs: performance.now() - handshakeStart, success: true, @@ -174,7 +179,7 @@ export class WebRTCConnection { resolve(); } else if (this.state === "disconnected") { clearInterval(checkConnection); - this.emitDiagnostic("phaseTiming", { + this.observability.diagnostic("phaseTiming", { phase: "webrtc-handshake", durationMs: performance.now() - handshakeStart, success: false, @@ -183,7 +188,7 @@ export class WebRTCConnection { reject(new Error("Connection lost during WebRTC handshake")); } else if (Date.now() >= deadline) { clearInterval(checkConnection); - this.emitDiagnostic("phaseTiming", { + this.observability.diagnostic("phaseTiming", { phase: "webrtc-handshake", durationMs: performance.now() - handshakeStart, success: false, @@ -198,7 +203,7 @@ export class WebRTCConnection { connectAbort, ]); - this.emitDiagnostic("phaseTiming", { + this.observability.diagnostic("phaseTiming", { phase: "total", durationMs: performance.now() - totalStart, success: true, @@ -282,7 +287,7 @@ export class WebRTCConnection { case "ice-candidate": if (msg.candidate) { await this.pc.addIceCandidate(msg.candidate); - this.emitDiagnostic("iceCandidate", { + this.observability.diagnostic("iceCandidate", { source: "remote", candidateType: (msg.candidate.candidate?.match(/typ (\w+)/)?.[1] as IceCandidateEvent["candidateType"]) ?? "unknown", @@ -439,7 +444,7 @@ export class WebRTCConnection { this.pc.onicecandidate = (e) => { this.send({ type: "ice-candidate", candidate: e.candidate }); if (e.candidate) { - this.emitDiagnostic("iceCandidate", { + this.observability.diagnostic("iceCandidate", { source: "local", candidateType: (e.candidate.type as IceCandidateEvent["candidateType"]) ?? "unknown", protocol: (e.candidate.protocol as IceCandidateEvent["protocol"]) ?? "unknown", @@ -453,7 +458,7 @@ export class WebRTCConnection { this.pc.onconnectionstatechange = () => { if (!this.pc) return; const s = this.pc.connectionState; - this.emitDiagnostic("peerConnectionStateChange", { + this.observability.diagnostic("peerConnectionStateChange", { state: s, previousState: prevPcState, timestampMs: performance.now(), @@ -475,7 +480,7 @@ export class WebRTCConnection { this.pc.oniceconnectionstatechange = () => { if (!this.pc) return; const newIceState = this.pc.iceConnectionState; - this.emitDiagnostic("iceStateChange", { + this.observability.diagnostic("iceStateChange", { state: newIceState, previousState: prevIceState, timestampMs: performance.now(), @@ -492,7 +497,7 @@ export class WebRTCConnection { this.pc.onsignalingstatechange = () => { if (!this.pc) return; const newState = this.pc.signalingState; - this.emitDiagnostic("signalingStateChange", { + this.observability.diagnostic("signalingStateChange", { state: newState, previousState: prevSignalingState, timestampMs: performance.now(), @@ -519,7 +524,7 @@ export class WebRTCConnection { if (r.id === report.remoteCandidateId) remoteCandidate = r as Record; }); if (localCandidate && remoteCandidate) { - this.emitDiagnostic("selectedCandidatePair", { + this.observability.diagnostic("selectedCandidatePair", { local: { candidateType: String(localCandidate.candidateType ?? "unknown"), protocol: String(localCandidate.protocol ?? "unknown"), diff --git a/packages/sdk/src/realtime/webrtc-manager.ts b/packages/sdk/src/realtime/webrtc-manager.ts index 174c897f..7e84f51e 100644 --- a/packages/sdk/src/realtime/webrtc-manager.ts +++ b/packages/sdk/src/realtime/webrtc-manager.ts @@ -1,7 +1,7 @@ import pRetry, { AbortError } from "p-retry"; import type { Logger } from "../utils/logger"; -import type { DiagnosticEmitter } from "./diagnostics"; +import { RealtimeObservability } from "./observability/realtime-observability"; import type { ConnectionState, OutgoingMessage } from "./types"; import { WebRTCConnection } from "./webrtc-connection"; @@ -9,7 +9,7 @@ export interface WebRTCConfig { webrtcUrl: string; integration?: string; logger?: Logger; - onDiagnostic?: DiagnosticEmitter; + observability?: RealtimeObservability; onRemoteStream: (stream: MediaStream) => void; onConnectionStateChange?: (state: ConnectionState) => void; onError?: (error: Error) => void; @@ -42,6 +42,7 @@ export class WebRTCManager { private connection: WebRTCConnection; private config: WebRTCConfig; private logger: Logger; + private observability: RealtimeObservability; private localStream: MediaStream | null = null; private subscribeMode = false; private managerState: ConnectionState = "disconnected"; @@ -49,10 +50,18 @@ export class WebRTCManager { private isReconnecting = false; private intentionalDisconnect = false; private reconnectGeneration = 0; + private statsProviderConnection: RTCPeerConnection | null = null; constructor(config: WebRTCConfig) { this.config = config; this.logger = config.logger ?? { debug() {}, info() {}, warn() {}, error() {} }; + this.observability = + config.observability ?? + new RealtimeObservability({ + telemetryEnabled: false, + apiKey: "", + logger: this.logger, + }); this.connection = new WebRTCConnection({ onRemoteStream: config.onRemoteStream, onStateChange: (state) => this.handleConnectionStateChange(state), @@ -63,7 +72,7 @@ export class WebRTCManager { initialImage: config.initialImage, initialPrompt: config.initialPrompt, logger: this.logger, - onDiagnostic: config.onDiagnostic, + observability: this.observability, }); } @@ -75,9 +84,22 @@ export class WebRTCManager { } } + private syncStatsProvider(): void { + const pc = this.getPeerConnection(); + const isLive = this.managerState === "connected" || this.managerState === "generating"; + if (isLive && pc && pc !== this.statsProviderConnection) { + this.statsProviderConnection = pc; + this.observability.setStatsProvider(pc); + } else if (!isLive && this.statsProviderConnection) { + this.statsProviderConnection = null; + this.observability.setStatsProvider(null); + } + } + private handleConnectionStateChange(state: ConnectionState): void { if (this.intentionalDisconnect) { this.emitState("disconnected"); + this.syncStatsProvider(); return; } @@ -86,6 +108,7 @@ export class WebRTCManager { if (state === "connected" || state === "generating") { this.isReconnecting = false; this.emitState(state); + this.syncStatsProvider(); } return; } @@ -98,6 +121,7 @@ export class WebRTCManager { } this.emitState(state); + this.syncStatsProvider(); } private async reconnect(): Promise { @@ -107,6 +131,8 @@ export class WebRTCManager { const reconnectGeneration = ++this.reconnectGeneration; this.isReconnecting = true; this.emitState("reconnecting"); + this.observability.setStatsProvider(null); + this.statsProviderConnection = null; const reconnectStart = performance.now(); try { @@ -144,7 +170,7 @@ export class WebRTCManager { return; } this.logger.warn("Reconnect attempt failed", { error: error.message, attempt: error.attemptNumber }); - this.config.onDiagnostic?.("reconnect", { + this.observability.diagnostic("reconnect", { attempt: error.attemptNumber, maxAttempts: RETRY_OPTIONS.retries + 1, durationMs: performance.now() - reconnectStart, @@ -162,7 +188,7 @@ export class WebRTCManager { }, }, ); - this.config.onDiagnostic?.("reconnect", { + this.observability.diagnostic("reconnect", { attempt: attemptCount, maxAttempts: RETRY_OPTIONS.retries + 1, durationMs: performance.now() - reconnectStart, @@ -223,6 +249,8 @@ export class WebRTCManager { this.reconnectGeneration += 1; this.connection.cleanup(); this.localStream = null; + this.statsProviderConnection = null; + this.observability.setStatsProvider(null); this.emitState("disconnected"); } @@ -244,7 +272,7 @@ export class WebRTCManager { setImage( imageBase64: string | null, - options?: { prompt?: string; enhance?: boolean; timeout?: number }, + options?: { prompt?: string | null; enhance?: boolean; timeout?: number }, ): Promise { return this.connection.setImageBase64(imageBase64, options); } diff --git a/packages/sdk/src/realtime/webrtc-stats.ts b/packages/sdk/src/realtime/webrtc-stats.ts deleted file mode 100644 index 42319a47..00000000 --- a/packages/sdk/src/realtime/webrtc-stats.ts +++ /dev/null @@ -1,233 +0,0 @@ -export type WebRTCStats = { - timestamp: number; - video: { - framesDecoded: number; - framesDropped: number; - framesPerSecond: number; - frameWidth: number; - frameHeight: number; - bytesReceived: number; - packetsReceived: number; - packetsLost: number; - jitter: number; - /** Estimated inbound bitrate in bits/sec, computed from bytesReceived delta. */ - bitrate: number; - freezeCount: number; - totalFreezesDuration: number; - /** Delta: packets lost since previous sample. */ - packetsLostDelta: number; - /** Delta: frames dropped since previous sample. */ - framesDroppedDelta: number; - /** Delta: freeze count since previous sample. */ - freezeCountDelta: number; - /** Delta: freeze duration (seconds) since previous sample. */ - freezeDurationDelta: number; - } | null; - audio: { - bytesReceived: number; - packetsReceived: number; - packetsLost: number; - jitter: number; - /** Estimated inbound bitrate in bits/sec, computed from bytesReceived delta. */ - bitrate: number; - /** Delta: packets lost since previous sample. */ - packetsLostDelta: number; - } | null; - /** Outbound video track stats (from the local camera/screen share being sent). */ - outboundVideo: { - /** Why the encoder is limiting quality: "none", "bandwidth", "cpu", or "other". */ - qualityLimitationReason: string; - /** Cumulative time (seconds) spent in each quality limitation state. */ - qualityLimitationDurations: Record; - bytesSent: number; - packetsSent: number; - framesPerSecond: number; - frameWidth: number; - frameHeight: number; - /** Estimated outbound bitrate in bits/sec, computed from bytesSent delta. */ - bitrate: number; - } | null; - connection: { - /** Current round-trip time in seconds, or null if unavailable. */ - currentRoundTripTime: number | null; - /** Available outgoing bitrate estimate in bits/sec, or null if unavailable. */ - availableOutgoingBitrate: number | null; - }; -}; - -export type StatsOptions = { - /** Polling interval in milliseconds. Default: 1000. Minimum: 500. */ - intervalMs?: number; -}; - -const DEFAULT_INTERVAL_MS = 1000; -const MIN_INTERVAL_MS = 500; - -export class WebRTCStatsCollector { - private pc: RTCPeerConnection | null = null; - private intervalId: ReturnType | null = null; - private prevBytesVideo = 0; - private prevBytesAudio = 0; - private prevBytesSentVideo = 0; - private prevTimestamp = 0; - // Previous cumulative values for delta computation - private prevPacketsLostVideo = 0; - private prevFramesDropped = 0; - private prevFreezeCount = 0; - private prevFreezeDuration = 0; - private prevPacketsLostAudio = 0; - private onStats: ((stats: WebRTCStats) => void) | null = null; - private intervalMs: number; - - constructor(options: StatsOptions = {}) { - this.intervalMs = Math.max(options.intervalMs ?? DEFAULT_INTERVAL_MS, MIN_INTERVAL_MS); - } - - /** Attach to a peer connection and start polling. */ - start(pc: RTCPeerConnection, onStats: (stats: WebRTCStats) => void): void { - this.stop(); - this.pc = pc; - this.onStats = onStats; - this.prevBytesVideo = 0; - this.prevBytesAudio = 0; - this.prevBytesSentVideo = 0; - this.prevTimestamp = 0; - this.prevPacketsLostVideo = 0; - this.prevFramesDropped = 0; - this.prevFreezeCount = 0; - this.prevFreezeDuration = 0; - this.prevPacketsLostAudio = 0; - this.intervalId = setInterval(() => this.collect(), this.intervalMs); - } - - /** Stop polling and release resources. */ - stop(): void { - if (this.intervalId !== null) { - clearInterval(this.intervalId); - this.intervalId = null; - } - this.pc = null; - this.onStats = null; - } - - isRunning(): boolean { - return this.intervalId !== null; - } - - private async collect(): Promise { - if (!this.pc || !this.onStats) return; - - try { - const rawStats = await this.pc.getStats(); - const stats = this.parse(rawStats); - this.onStats(stats); - } catch { - // PC might be closed; stop silently - this.stop(); - } - } - - private parse(rawStats: RTCStatsReport): WebRTCStats { - const now = performance.now(); - const elapsed = this.prevTimestamp > 0 ? (now - this.prevTimestamp) / 1000 : 0; - - let video: WebRTCStats["video"] = null; - let audio: WebRTCStats["audio"] = null; - let outboundVideo: WebRTCStats["outboundVideo"] = null; - const connection: WebRTCStats["connection"] = { - currentRoundTripTime: null, - availableOutgoingBitrate: null, - }; - - rawStats.forEach((report) => { - if (report.type === "inbound-rtp" && report.kind === "video") { - const bytesReceived = ((report as Record).bytesReceived as number) ?? 0; - const bitrate = elapsed > 0 ? ((bytesReceived - this.prevBytesVideo) * 8) / elapsed : 0; - this.prevBytesVideo = bytesReceived; - - const r = report as Record; - const packetsLost = (r.packetsLost as number) ?? 0; - const framesDropped = (r.framesDropped as number) ?? 0; - const freezeCount = (r.freezeCount as number) ?? 0; - const freezeDuration = (r.totalFreezesDuration as number) ?? 0; - - video = { - framesDecoded: (r.framesDecoded as number) ?? 0, - framesDropped, - framesPerSecond: (r.framesPerSecond as number) ?? 0, - frameWidth: (r.frameWidth as number) ?? 0, - frameHeight: (r.frameHeight as number) ?? 0, - bytesReceived, - packetsReceived: (r.packetsReceived as number) ?? 0, - packetsLost, - jitter: (r.jitter as number) ?? 0, - bitrate: Math.round(bitrate), - freezeCount, - totalFreezesDuration: freezeDuration, - packetsLostDelta: Math.max(0, packetsLost - this.prevPacketsLostVideo), - framesDroppedDelta: Math.max(0, framesDropped - this.prevFramesDropped), - freezeCountDelta: Math.max(0, freezeCount - this.prevFreezeCount), - freezeDurationDelta: Math.max(0, freezeDuration - this.prevFreezeDuration), - }; - this.prevPacketsLostVideo = packetsLost; - this.prevFramesDropped = framesDropped; - this.prevFreezeCount = freezeCount; - this.prevFreezeDuration = freezeDuration; - } - - if (report.type === "outbound-rtp" && report.kind === "video") { - const r = report as Record; - const bytesSent = (r.bytesSent as number) ?? 0; - const outBitrate = elapsed > 0 ? ((bytesSent - this.prevBytesSentVideo) * 8) / elapsed : 0; - this.prevBytesSentVideo = bytesSent; - - outboundVideo = { - qualityLimitationReason: (r.qualityLimitationReason as string) ?? "none", - qualityLimitationDurations: (r.qualityLimitationDurations as Record) ?? {}, - bytesSent, - packetsSent: (r.packetsSent as number) ?? 0, - framesPerSecond: (r.framesPerSecond as number) ?? 0, - frameWidth: (r.frameWidth as number) ?? 0, - frameHeight: (r.frameHeight as number) ?? 0, - bitrate: Math.round(outBitrate), - }; - } - - if (report.type === "inbound-rtp" && report.kind === "audio") { - const bytesReceived = ((report as Record).bytesReceived as number) ?? 0; - const bitrate = elapsed > 0 ? ((bytesReceived - this.prevBytesAudio) * 8) / elapsed : 0; - this.prevBytesAudio = bytesReceived; - - const r = report as Record; - const audioPacketsLost = (r.packetsLost as number) ?? 0; - audio = { - bytesReceived, - packetsReceived: (r.packetsReceived as number) ?? 0, - packetsLost: audioPacketsLost, - jitter: (r.jitter as number) ?? 0, - bitrate: Math.round(bitrate), - packetsLostDelta: Math.max(0, audioPacketsLost - this.prevPacketsLostAudio), - }; - this.prevPacketsLostAudio = audioPacketsLost; - } - - if (report.type === "candidate-pair") { - const r = report as Record; - if (r.state === "succeeded") { - connection.currentRoundTripTime = (r.currentRoundTripTime as number) ?? null; - connection.availableOutgoingBitrate = (r.availableOutgoingBitrate as number) ?? null; - } - } - }); - - this.prevTimestamp = now; - - return { - timestamp: Date.now(), - video, - audio, - outboundVideo, - connection, - }; - } -} diff --git a/packages/sdk/tests/unit.test.ts b/packages/sdk/tests/unit.test.ts index c937fd31..e4fda559 100644 --- a/packages/sdk/tests/unit.test.ts +++ b/packages/sdk/tests/unit.test.ts @@ -1664,12 +1664,12 @@ describe("Subscribe Client", () => { const mgr = this as unknown as { config: { onConnectionStateChange?: (state: import("../src/realtime/types").ConnectionState) => void; - onDiagnostic?: (name: string, data: unknown) => void; }; managerState: import("../src/realtime/types").ConnectionState; + observability: import("../src/realtime/observability/realtime-observability").RealtimeObservability; }; - mgr.config.onDiagnostic?.("phaseTiming", { + mgr.observability.diagnostic("phaseTiming", { phase: "websocket", durationMs: 12, success: true, @@ -1808,24 +1808,22 @@ describe("Subscribe Client", () => { it("restarts stats collection when peer connection changes after reconnect", async () => { const { createRealTimeClient } = await import("../src/realtime/client.js"); const { WebRTCManager } = await import("../src/realtime/webrtc-manager.js"); - const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + const { WebRTCStatsCollector } = await import("../src/realtime/observability/webrtc-stats.js"); - const firstPeerConnection = {} as RTCPeerConnection; - const secondPeerConnection = {} as RTCPeerConnection; + const firstPeerConnection = { getStats: vi.fn().mockResolvedValue(new Map()) } as unknown as RTCPeerConnection; + const secondPeerConnection = { getStats: vi.fn().mockResolvedValue(new Map()) } as unknown as RTCPeerConnection; let currentPeerConnection = firstPeerConnection; - let onConnectionStateChange: ((state: import("../src/realtime/types").ConnectionState) => void) | undefined; const startSpy = vi.spyOn(WebRTCStatsCollector.prototype, "start").mockImplementation(() => {}); const stopSpy = vi.spyOn(WebRTCStatsCollector.prototype, "stop").mockImplementation(() => {}); const connectSpy = vi.spyOn(WebRTCManager.prototype, "connect").mockImplementation(async function () { const mgr = this as unknown as { - config: { onConnectionStateChange?: (state: import("../src/realtime/types").ConnectionState) => void }; managerState: import("../src/realtime/types").ConnectionState; + handleConnectionStateChange: (state: import("../src/realtime/types").ConnectionState) => void; }; - onConnectionStateChange = mgr.config.onConnectionStateChange; mgr.managerState = "connected"; - mgr.config.onConnectionStateChange?.("connected"); + mgr.handleConnectionStateChange("connected"); return true; }); const stateSpy = vi.spyOn(WebRTCManager.prototype, "getConnectionState").mockReturnValue("connected"); @@ -1846,11 +1844,20 @@ describe("Subscribe Client", () => { onRemoteStream: vi.fn(), }); + const internalManager = connectSpy.mock.instances[0] as unknown as { + handleConnectionStateChange: (state: import("../src/realtime/types").ConnectionState) => void; + statsProviderConnection: RTCPeerConnection | null; + managerState: import("../src/realtime/types").ConnectionState; + }; + expect(startSpy).toHaveBeenCalledTimes(1); expect(startSpy.mock.calls[0][0]).toBe(firstPeerConnection); currentPeerConnection = secondPeerConnection; - onConnectionStateChange?.("connected"); + // Simulate underlying connection bouncing: disconnected then reconnected. Manager treats the + // second "connected" as a new peer connection and rotates the stats provider accordingly. + internalManager.statsProviderConnection = null; + internalManager.handleConnectionStateChange("connected"); expect(startSpy).toHaveBeenCalledTimes(2); expect(startSpy.mock.calls[1][0]).toBe(secondPeerConnection); @@ -1867,6 +1874,32 @@ describe("Subscribe Client", () => { } }); + it("keeps explicit observability stats listeners independent from telemetry upload", async () => { + const { RealtimeObservability } = await import("../src/realtime/observability/realtime-observability.js"); + const { WebRTCStatsCollector } = await import("../src/realtime/observability/webrtc-stats.js"); + + const source = { getStats: vi.fn().mockResolvedValue(new Map()) }; + const startSpy = vi.spyOn(WebRTCStatsCollector.prototype, "start").mockImplementation(() => {}); + + try { + const observability = new RealtimeObservability({ + telemetryEnabled: false, + apiKey: "test-key", + logger: { debug() {}, info() {}, warn() {}, error() {} }, + onStats: vi.fn(), + }); + + observability.setStatsProvider(source); + + expect(startSpy).toHaveBeenCalledTimes(1); + expect(startSpy.mock.calls[0][0]).toBe(source); + + observability.stop(); + } finally { + startSpy.mockRestore(); + } + }); + it("subscribe client buffers events until returned", async () => { const { encodeSubscribeToken } = await import("../src/realtime/subscribe-client.js"); const { createRealTimeClient } = await import("../src/realtime/client.js"); @@ -2063,7 +2096,7 @@ describe("WebRTC Error Classification", () => { describe("WebRTCStatsCollector", () => { it("starts and stops polling", async () => { - const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + const { WebRTCStatsCollector } = await import("../src/realtime/observability/webrtc-stats.js"); const collector = new WebRTCStatsCollector(); const mockPC = { @@ -2080,7 +2113,7 @@ describe("WebRTCStatsCollector", () => { }); it("parses inbound video stats", async () => { - const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + const { WebRTCStatsCollector } = await import("../src/realtime/observability/webrtc-stats.js"); vi.useFakeTimers(); try { @@ -2107,7 +2140,7 @@ describe("WebRTCStatsCollector", () => { getStats: vi.fn().mockResolvedValue(statsReport), } as unknown as RTCPeerConnection; - const receivedStats: Array = []; + const receivedStats: Array = []; collector.start(mockPC, (stats) => receivedStats.push(stats)); await vi.advanceTimersByTimeAsync(1000); @@ -2131,7 +2164,7 @@ describe("WebRTCStatsCollector", () => { }); it("parses inbound audio and candidate-pair stats", async () => { - const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + const { WebRTCStatsCollector } = await import("../src/realtime/observability/webrtc-stats.js"); vi.useFakeTimers(); try { @@ -2161,7 +2194,7 @@ describe("WebRTCStatsCollector", () => { getStats: vi.fn().mockResolvedValue(statsReport), } as unknown as RTCPeerConnection; - const receivedStats: Array = []; + const receivedStats: Array = []; collector.start(mockPC, (stats) => receivedStats.push(stats)); await vi.advanceTimersByTimeAsync(1000); @@ -2180,7 +2213,7 @@ describe("WebRTCStatsCollector", () => { }); it("computes video bitrate from bytesReceived delta", async () => { - const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + const { WebRTCStatsCollector } = await import("../src/realtime/observability/webrtc-stats.js"); vi.useFakeTimers(); try { @@ -2213,7 +2246,7 @@ describe("WebRTCStatsCollector", () => { }), } as unknown as RTCPeerConnection; - const receivedStats: Array = []; + const receivedStats: Array = []; collector.start(mockPC, (stats) => receivedStats.push(stats)); // First tick: no previous data, bitrate = 0 @@ -2231,7 +2264,7 @@ describe("WebRTCStatsCollector", () => { }); it("enforces minimum interval of 500ms", async () => { - const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + const { WebRTCStatsCollector } = await import("../src/realtime/observability/webrtc-stats.js"); vi.useFakeTimers(); try { @@ -2259,7 +2292,7 @@ describe("WebRTCStatsCollector", () => { }); it("stops silently if getStats throws", async () => { - const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + const { WebRTCStatsCollector } = await import("../src/realtime/observability/webrtc-stats.js"); vi.useFakeTimers(); try { @@ -2286,7 +2319,7 @@ describe("WebRTCStatsCollector", () => { describe("TelemetryReporter", () => { it("buffers stats and diagnostics then flushes on interval", async () => { - const { TelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + const { TelemetryReporter } = await import("../src/realtime/observability/telemetry-reporter.js"); vi.useFakeTimers(); @@ -2340,7 +2373,7 @@ describe("TelemetryReporter", () => { }); it("does not send empty reports", async () => { - const { TelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + const { TelemetryReporter } = await import("../src/realtime/observability/telemetry-reporter.js"); vi.useFakeTimers(); @@ -2370,7 +2403,7 @@ describe("TelemetryReporter", () => { }); it("stop discards buffered data without sending a request", async () => { - const { TelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + const { TelemetryReporter } = await import("../src/realtime/observability/telemetry-reporter.js"); const fetchMock = vi.fn().mockResolvedValue({ ok: true }); vi.stubGlobal("fetch", fetchMock); @@ -2405,7 +2438,7 @@ describe("TelemetryReporter", () => { }); it("silently handles fetch failures", async () => { - const { TelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + const { TelemetryReporter } = await import("../src/realtime/observability/telemetry-reporter.js"); const fetchMock = vi.fn().mockRejectedValue(new Error("network error")); vi.stubGlobal("fetch", fetchMock); @@ -2433,7 +2466,7 @@ describe("TelemetryReporter", () => { }); it("includes auth headers and sdk version in report", async () => { - const { TelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + const { TelemetryReporter } = await import("../src/realtime/observability/telemetry-reporter.js"); const fetchMock = vi.fn().mockResolvedValue({ ok: true }); vi.stubGlobal("fetch", fetchMock); @@ -2468,7 +2501,7 @@ describe("TelemetryReporter", () => { }); it("clears buffers after sending", async () => { - const { TelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + const { TelemetryReporter } = await import("../src/realtime/observability/telemetry-reporter.js"); vi.useFakeTimers(); @@ -2508,7 +2541,7 @@ describe("TelemetryReporter", () => { }); it("chunks reports when buffers exceed 120 items", async () => { - const { TelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + const { TelemetryReporter } = await import("../src/realtime/observability/telemetry-reporter.js"); const fetchMock = vi.fn().mockResolvedValue({ ok: true }); vi.stubGlobal("fetch", fetchMock); @@ -2546,7 +2579,7 @@ describe("TelemetryReporter", () => { }); it("warns on non-2xx response status", async () => { - const { TelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + const { TelemetryReporter } = await import("../src/realtime/observability/telemetry-reporter.js"); const fetchMock = vi.fn().mockResolvedValue({ ok: false, status: 500, statusText: "Internal Server Error" }); vi.stubGlobal("fetch", fetchMock); @@ -2583,7 +2616,7 @@ describe("TelemetryReporter", () => { }); it("includes model in report body and tags when provided", async () => { - const { TelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + const { TelemetryReporter } = await import("../src/realtime/observability/telemetry-reporter.js"); const fetchMock = vi.fn().mockResolvedValue({ ok: true }); vi.stubGlobal("fetch", fetchMock); @@ -2614,7 +2647,7 @@ describe("TelemetryReporter", () => { }); it("omits model from report when not provided", async () => { - const { TelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + const { TelemetryReporter } = await import("../src/realtime/observability/telemetry-reporter.js"); const fetchMock = vi.fn().mockResolvedValue({ ok: true }); vi.stubGlobal("fetch", fetchMock); @@ -2927,7 +2960,7 @@ describe("WebSockets Connection", () => { describe("NullTelemetryReporter", () => { it("all methods are no-ops", async () => { - const { NullTelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + const { NullTelemetryReporter } = await import("../src/realtime/observability/telemetry-reporter.js"); const reporter = new NullTelemetryReporter(); // None of these should throw @@ -2945,7 +2978,7 @@ describe("NullTelemetryReporter", () => { }); it("implements ITelemetryReporter interface", async () => { - const { NullTelemetryReporter } = await import("../src/realtime/telemetry-reporter.js"); + const { NullTelemetryReporter } = await import("../src/realtime/observability/telemetry-reporter.js"); const reporter = new NullTelemetryReporter(); expect(typeof reporter.start).toBe("function"); @@ -2958,7 +2991,7 @@ describe("NullTelemetryReporter", () => { describe("Outbound Video Stats", () => { it("parses outbound-rtp video with quality limitation tracking", async () => { - const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + const { WebRTCStatsCollector } = await import("../src/realtime/observability/webrtc-stats.js"); vi.useFakeTimers(); try { @@ -2981,7 +3014,7 @@ describe("Outbound Video Stats", () => { getStats: vi.fn().mockResolvedValue(statsReport), } as unknown as RTCPeerConnection; - const receivedStats: Array = []; + const receivedStats: Array = []; collector.start(mockPC, (stats) => receivedStats.push(stats)); await vi.advanceTimersByTimeAsync(1000); @@ -3006,7 +3039,7 @@ describe("Outbound Video Stats", () => { }); it("returns null outboundVideo when no outbound-rtp report", async () => { - const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + const { WebRTCStatsCollector } = await import("../src/realtime/observability/webrtc-stats.js"); vi.useFakeTimers(); try { @@ -3016,7 +3049,7 @@ describe("Outbound Video Stats", () => { getStats: vi.fn().mockResolvedValue(new Map()), } as unknown as RTCPeerConnection; - const receivedStats: Array = []; + const receivedStats: Array = []; collector.start(mockPC, (stats) => receivedStats.push(stats)); await vi.advanceTimersByTimeAsync(1000); @@ -3030,7 +3063,7 @@ describe("Outbound Video Stats", () => { }); it("computes outbound video bitrate from bytesSent delta", async () => { - const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + const { WebRTCStatsCollector } = await import("../src/realtime/observability/webrtc-stats.js"); vi.useFakeTimers(); try { @@ -3059,7 +3092,7 @@ describe("Outbound Video Stats", () => { }), } as unknown as RTCPeerConnection; - const receivedStats: Array = []; + const receivedStats: Array = []; collector.start(mockPC, (stats) => receivedStats.push(stats)); // First tick: no previous data, bitrate = 0 @@ -3079,7 +3112,7 @@ describe("Outbound Video Stats", () => { describe("Delta computation for cumulative counters", () => { it("computes packetsLostDelta, framesDroppedDelta, freezeCountDelta, freezeDurationDelta", async () => { - const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + const { WebRTCStatsCollector } = await import("../src/realtime/observability/webrtc-stats.js"); vi.useFakeTimers(); try { @@ -3119,7 +3152,7 @@ describe("Delta computation for cumulative counters", () => { }), } as unknown as RTCPeerConnection; - const receivedStats: Array = []; + const receivedStats: Array = []; collector.start(mockPC, (stats) => receivedStats.push(stats)); // First tick: delta = cumulative (since prev was 0) @@ -3143,7 +3176,7 @@ describe("Delta computation for cumulative counters", () => { }); it("computes audio packetsLostDelta", async () => { - const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + const { WebRTCStatsCollector } = await import("../src/realtime/observability/webrtc-stats.js"); vi.useFakeTimers(); try { @@ -3169,7 +3202,7 @@ describe("Delta computation for cumulative counters", () => { }), } as unknown as RTCPeerConnection; - const receivedStats: Array = []; + const receivedStats: Array = []; collector.start(mockPC, (stats) => receivedStats.push(stats)); await vi.advanceTimersByTimeAsync(1000); @@ -3185,7 +3218,7 @@ describe("Delta computation for cumulative counters", () => { }); it("clamps deltas to zero if cumulative counter resets", async () => { - const { WebRTCStatsCollector } = await import("../src/realtime/webrtc-stats.js"); + const { WebRTCStatsCollector } = await import("../src/realtime/observability/webrtc-stats.js"); vi.useFakeTimers(); try { @@ -3222,7 +3255,7 @@ describe("Delta computation for cumulative counters", () => { }), } as unknown as RTCPeerConnection; - const receivedStats: Array = []; + const receivedStats: Array = []; collector.start(mockPC, (stats) => receivedStats.push(stats)); await vi.advanceTimersByTimeAsync(1000); @@ -3242,7 +3275,7 @@ describe("Delta computation for cumulative counters", () => { describe("VideoStall Diagnostic", () => { it("videoStall event type exists in DiagnosticEvents", async () => { // Type-level check: videoStall is a valid DiagnosticEventName - const event: import("../src/realtime/diagnostics.js").DiagnosticEvent = { + const event: import("../src/realtime/observability/diagnostics.js").DiagnosticEvent = { name: "videoStall", data: { stalled: true, durationMs: 0 }, }; @@ -3252,7 +3285,7 @@ describe("VideoStall Diagnostic", () => { }); it("videoStall recovery includes duration", () => { - const event: import("../src/realtime/diagnostics.js").DiagnosticEvent = { + const event: import("../src/realtime/observability/diagnostics.js").DiagnosticEvent = { name: "videoStall", data: { stalled: false, durationMs: 1500 }, }; From 0f5169d1b417d962fef5c04a382f1b1918589684 Mon Sep 17 00:00:00 2001 From: Adir Amsalem Date: Wed, 13 May 2026 00:26:35 -0700 Subject: [PATCH 2/3] fix(realtime): propagate observability telemetry options (#133) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Propagate the SDK `telemetryEnabled` option through the realtime subscribe path instead of forcing telemetry off. - Require injected `RealtimeObservability` in `WebRTCManager` / `WebRTCConnection`, removing the hard-coded disabled observability fallback with an empty API key. - Rename inter-frame delay `Variance` stat to `StdDev` to match the actual computation. - `pnpm --filter @decartai/sdk typecheck` - `pnpm --filter @decartai/sdk format:check` (passes with existing warnings/info) - `pnpm --filter @decartai/sdk test -- --run` - `pnpm --filter @decartai/sdk build` (passes; existing tsdown warning about `define` option) - Independent Claude Code review of diff vs `origin/alon/livekit-pr1-observability` Base for DecartAI/sdk#132. --- > [!NOTE] > **Medium Risk** > Changes realtime telemetry behavior (subscribe path now honors `telemetryEnabled`) and makes `WebRTCManager`/`WebRTCConnection` require an injected `RealtimeObservability`, which can be a breaking change for any consumers constructing these classes directly. Renaming a `WebRTCStats` field also risks downstream type/runtime expectations. > > **Overview** > Realtime subscribe sessions now honor the SDK’s `telemetryEnabled` option and immediately call `observability.sessionStarted(sid)` so telemetry/diagnostics are associated with the session. > > `WebRTCManager` and `WebRTCConnection` no longer create an internal fallback `RealtimeObservability` (previously hard-coded telemetry off with an empty API key); callers must pass `observability` explicitly, and unit tests are updated accordingly. > > In `WebRTCStats`, the inter-frame delay metric is renamed from `interFrameDelayVarianceMs` to `interFrameDelayStdDevMs` to match the actual std-dev calculation. > > Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit 16388f0bc749fb1c8c8bbeea2077d96797e7f7ce. Bugbot is set up for automated code reviews on this repo. Configure [here](https://www.cursor.com/dashboard/bugbot). --- packages/sdk/src/realtime/client.ts | 3 +- .../realtime/observability/webrtc-stats.ts | 8 ++--- .../sdk/src/realtime/webrtc-connection.ts | 14 +++------ packages/sdk/src/realtime/webrtc-manager.ts | 12 ++----- packages/sdk/tests/unit.test.ts | 31 +++++++++++++------ 5 files changed, 34 insertions(+), 34 deletions(-) diff --git a/packages/sdk/src/realtime/client.ts b/packages/sdk/src/realtime/client.ts index b7338ce8..ab8be410 100644 --- a/packages/sdk/src/realtime/client.ts +++ b/packages/sdk/src/realtime/client.ts @@ -252,12 +252,13 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { let webrtcManager: WebRTCManager | undefined; const observability = new RealtimeObservability({ - telemetryEnabled: false, + telemetryEnabled: opts.telemetryEnabled, apiKey, integration, logger, onDiagnostic: (event) => emitOrBuffer("diagnostic", event as SubscribeEvents["diagnostic"]), }); + observability.sessionStarted(sid); try { webrtcManager = new WebRTCManager({ diff --git a/packages/sdk/src/realtime/observability/webrtc-stats.ts b/packages/sdk/src/realtime/observability/webrtc-stats.ts index d4571f0b..20c24f9b 100644 --- a/packages/sdk/src/realtime/observability/webrtc-stats.ts +++ b/packages/sdk/src/realtime/observability/webrtc-stats.ts @@ -50,7 +50,7 @@ export type WebRTCStats = { * Std-dev of inter-frame delay (ms), computed from * totalInterFrameDelay + totalSquaredInterFrameDelay. */ - interFrameDelayVarianceMs: number | null; + interFrameDelayStdDevMs: number | null; /** Current target delay of the jitter buffer (ms). */ jitterBufferTargetDelayMs: number | null; /** Current minimum delay of the jitter buffer (ms). */ @@ -283,10 +283,10 @@ export class WebRTCStatsCollector { const avgDecodeTimeMs = framesDecoded > 0 ? (totalDecodeTime / framesDecoded) * 1000 : null; const avgProcessingDelayMs = framesDecoded > 0 ? (totalProcessingDelay / framesDecoded) * 1000 : null; const avgInterFrameDelayMs = framesDecoded > 0 ? (totalInterFrameDelay / framesDecoded) * 1000 : null; - // Variance σ² = E[X²] - E[X]² ; std-dev = sqrt(σ²). Report std-dev + // Variance σ² = E[X²] - E[X]²; std-dev = sqrt(σ²). Report std-dev // in ms — more actionable than variance for a threshold-based // "is the path jittery" check. - const interFrameDelayVarianceMs = + const interFrameDelayStdDevMs = framesDecoded > 0 ? Math.sqrt( Math.max(0, totalSquaredInterFrameDelay / framesDecoded - (totalInterFrameDelay / framesDecoded) ** 2), @@ -323,7 +323,7 @@ export class WebRTCStatsCollector { avgJitterBufferMs, avgProcessingDelayMs, avgInterFrameDelayMs, - interFrameDelayVarianceMs, + interFrameDelayStdDevMs, jitterBufferTargetDelayMs, jitterBufferMinimumDelayMs, decoderImplementation: (r.decoderImplementation as string) ?? "", diff --git a/packages/sdk/src/realtime/webrtc-connection.ts b/packages/sdk/src/realtime/webrtc-connection.ts index 7b3a2fc7..e9ca389a 100644 --- a/packages/sdk/src/realtime/webrtc-connection.ts +++ b/packages/sdk/src/realtime/webrtc-connection.ts @@ -3,7 +3,7 @@ import mitt from "mitt"; import type { Logger } from "../utils/logger"; import { buildUserAgent } from "../utils/user-agent"; import type { IceCandidateEvent } from "./observability/diagnostics"; -import { RealtimeObservability } from "./observability/realtime-observability"; +import type { RealtimeObservability } from "./observability/realtime-observability"; import type { ConnectionState, GenerationTickMessage, @@ -27,7 +27,7 @@ interface ConnectionCallbacks { initialImage?: string; initialPrompt?: { text: string; enhance?: boolean }; logger?: Logger; - observability?: RealtimeObservability; + observability: RealtimeObservability; } type WsMessageEvents = { @@ -46,15 +46,9 @@ export class WebRTCConnection { private observability: RealtimeObservability; state: ConnectionState = "disconnected"; websocketMessagesEmitter = mitt(); - constructor(private callbacks: ConnectionCallbacks = {}) { + constructor(private callbacks: ConnectionCallbacks) { this.logger = callbacks.logger ?? { debug() {}, info() {}, warn() {}, error() {} }; - this.observability = - callbacks.observability ?? - new RealtimeObservability({ - telemetryEnabled: false, - apiKey: "", - logger: this.logger, - }); + this.observability = callbacks.observability; } getPeerConnection(): RTCPeerConnection | null { diff --git a/packages/sdk/src/realtime/webrtc-manager.ts b/packages/sdk/src/realtime/webrtc-manager.ts index 7e84f51e..b8993804 100644 --- a/packages/sdk/src/realtime/webrtc-manager.ts +++ b/packages/sdk/src/realtime/webrtc-manager.ts @@ -1,7 +1,7 @@ import pRetry, { AbortError } from "p-retry"; import type { Logger } from "../utils/logger"; -import { RealtimeObservability } from "./observability/realtime-observability"; +import type { RealtimeObservability } from "./observability/realtime-observability"; import type { ConnectionState, OutgoingMessage } from "./types"; import { WebRTCConnection } from "./webrtc-connection"; @@ -9,7 +9,7 @@ export interface WebRTCConfig { webrtcUrl: string; integration?: string; logger?: Logger; - observability?: RealtimeObservability; + observability: RealtimeObservability; onRemoteStream: (stream: MediaStream) => void; onConnectionStateChange?: (state: ConnectionState) => void; onError?: (error: Error) => void; @@ -55,13 +55,7 @@ export class WebRTCManager { constructor(config: WebRTCConfig) { this.config = config; this.logger = config.logger ?? { debug() {}, info() {}, warn() {}, error() {} }; - this.observability = - config.observability ?? - new RealtimeObservability({ - telemetryEnabled: false, - apiKey: "", - logger: this.logger, - }); + this.observability = config.observability; this.connection = new WebRTCConnection({ onRemoteStream: config.onRemoteStream, onStateChange: (state) => this.handleConnectionStateChange(state), diff --git a/packages/sdk/tests/unit.test.ts b/packages/sdk/tests/unit.test.ts index e4fda559..2d62a370 100644 --- a/packages/sdk/tests/unit.test.ts +++ b/packages/sdk/tests/unit.test.ts @@ -17,6 +17,7 @@ import { resolveCanonicalModelAlias, resolveModelAlias, } from "../src/index.js"; +import { RealtimeObservability } from "../src/realtime/observability/realtime-observability.js"; import { _resetDeprecationWarnings, canonicalImageModels, @@ -33,6 +34,14 @@ const MOCK_RESPONSE_DATA = new Uint8Array([0x00, 0x01, 0x02]).buffer; const TEST_API_KEY = "test-api-key"; const BASE_URL = "http://localhost"; +const TEST_LOGGER = { debug() {}, info() {}, warn() {}, error() {} }; +const createTestObservability = () => + new RealtimeObservability({ + telemetryEnabled: false, + apiKey: TEST_API_KEY, + logger: TEST_LOGGER, + }); + describe("Decart SDK", () => { describe("createDecartClient", () => { afterEach(() => { @@ -1163,14 +1172,14 @@ describe("WebRTCConnection", () => { describe("setImageBase64", () => { it("rejects immediately when WebSocket is not open", async () => { const { WebRTCConnection } = await import("../src/realtime/webrtc-connection.js"); - const connection = new WebRTCConnection(); + const connection = new WebRTCConnection({ observability: createTestObservability() }); await expect(connection.setImageBase64("base64data", { timeout: 5000 })).rejects.toThrow("WebSocket is not open"); }); it("rejects immediately with default timeout when WebSocket is not open", async () => { const { WebRTCConnection } = await import("../src/realtime/webrtc-connection.js"); - const connection = new WebRTCConnection(); + const connection = new WebRTCConnection({ observability: createTestObservability() }); await expect(connection.setImageBase64("base64data")).rejects.toThrow("WebSocket is not open"); }); @@ -1186,7 +1195,7 @@ describe("WebRTCConnection", () => { it("uses custom timeout when send succeeds but ack is not received", async () => { const { WebRTCConnection } = await import("../src/realtime/webrtc-connection.js"); - const connection = new WebRTCConnection(); + const connection = new WebRTCConnection({ observability: createTestObservability() }); const sendSpy = vi.spyOn(connection, "send").mockReturnValue(true); const customTimeout = 5000; @@ -1211,7 +1220,7 @@ describe("WebRTCConnection", () => { it("uses default timeout (30000ms) when send succeeds but ack is not received", async () => { const { WebRTCConnection } = await import("../src/realtime/webrtc-connection.js"); - const connection = new WebRTCConnection(); + const connection = new WebRTCConnection({ observability: createTestObservability() }); const sendSpy = vi.spyOn(connection, "send").mockReturnValue(true); let rejected = false; @@ -1238,7 +1247,7 @@ describe("WebRTCConnection", () => { vi.useFakeTimers(); try { const { WebRTCConnection } = await import("../src/realtime/webrtc-connection.js"); - const connection = new WebRTCConnection(); + const connection = new WebRTCConnection({ observability: createTestObservability() }); const sendSpy = vi.spyOn(connection, "send").mockReturnValue(true); const promise = connection.setImageBase64(null, { prompt: null }).catch(() => {}); @@ -1296,7 +1305,7 @@ describe("WebRTCConnection", () => { vi.stubGlobal("RTCPeerConnection", FakePeerConnection as unknown as typeof RTCPeerConnection); try { - const connection = new WebRTCConnection(); + const connection = new WebRTCConnection({ observability: createTestObservability() }); const internalConnection = connection as unknown as { handleSignalingMessage: (msg: unknown) => Promise; localStream: { getTracks: () => MediaStreamTrack[] }; @@ -1529,7 +1538,7 @@ describe("Subscribe Client", () => { vi.stubGlobal("RTCPeerConnection", FakePeerConnection as unknown as typeof RTCPeerConnection); try { - const connection = new WebRTCConnection(); + const connection = new WebRTCConnection({ observability: createTestObservability() }); const internal = connection as unknown as { handleSignalingMessage: (msg: unknown) => Promise; localStream: MediaStream | null; @@ -1555,6 +1564,7 @@ describe("Subscribe Client", () => { const manager = new WebRTCManager({ webrtcUrl: "wss://example.com", + observability: createTestObservability(), onRemoteStream: vi.fn(), onError: vi.fn(), }); @@ -2711,7 +2721,7 @@ describe("WebSockets Connection", () => { vi.stubGlobal("WebSocket", FakeWebSocket as unknown as typeof WebSocket); try { - const connection = new WebRTCConnection(); + const connection = new WebRTCConnection({ observability: createTestObservability() }); const internal = connection as unknown as { setState: (state: import("../src/realtime/types").ConnectionState) => void; setupNewPeerConnection: () => Promise; @@ -2765,7 +2775,7 @@ describe("WebSockets Connection", () => { vi.stubGlobal("WebSocket", FakeWebSocket as unknown as typeof WebSocket); try { - const connection = new WebRTCConnection(); + const connection = new WebRTCConnection({ observability: createTestObservability() }); const internal = connection as unknown as { setState: (state: import("../src/realtime/types").ConnectionState) => void; setupNewPeerConnection: () => Promise; @@ -2818,7 +2828,7 @@ describe("WebSockets Connection", () => { vi.stubGlobal("RTCPeerConnection", FakePeerConnection as unknown as typeof RTCPeerConnection); try { - const connection = new WebRTCConnection(); + const connection = new WebRTCConnection({ observability: createTestObservability() }); const internal = connection as unknown as { handleSignalingMessage: (msg: unknown) => Promise; localStream: { getTracks: () => MediaStreamTrack[] }; @@ -2848,6 +2858,7 @@ describe("WebSockets Connection", () => { const { WebRTCManager } = await import("../src/realtime/webrtc-manager.js"); const manager = new WebRTCManager({ webrtcUrl: "wss://example.com", + observability: createTestObservability(), onRemoteStream: vi.fn(), onError: vi.fn(), }); From 7fad89154e225c068c00541856eb02dffaca7ee6 Mon Sep 17 00:00:00 2001 From: Verion1 Date: Thu, 14 May 2026 12:15:24 +0300 Subject: [PATCH 3/3] fix Adir issues + publish stats events --- packages/sdk/src/realtime/client.ts | 3 +- .../observability/realtime-observability.ts | 1 - packages/sdk/src/realtime/subscribe-client.ts | 2 + packages/sdk/tests/unit.test.ts | 45 +++++++++++++++++++ 4 files changed, 49 insertions(+), 2 deletions(-) diff --git a/packages/sdk/src/realtime/client.ts b/packages/sdk/src/realtime/client.ts index ab8be410..b2747023 100644 --- a/packages/sdk/src/realtime/client.ts +++ b/packages/sdk/src/realtime/client.ts @@ -147,7 +147,7 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { integration, logger, onDiagnostic: (event) => emitOrBuffer("diagnostic", event), - onStats: (stats) => emitOrBuffer("stats", stats), + onStats: opts.telemetryEnabled ? (stats) => emitOrBuffer("stats", stats) : undefined, }); try { @@ -257,6 +257,7 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => { integration, logger, onDiagnostic: (event) => emitOrBuffer("diagnostic", event as SubscribeEvents["diagnostic"]), + onStats: opts.telemetryEnabled ? (stats) => emitOrBuffer("stats", stats) : undefined, }); observability.sessionStarted(sid); diff --git a/packages/sdk/src/realtime/observability/realtime-observability.ts b/packages/sdk/src/realtime/observability/realtime-observability.ts index 948e83fd..1070b713 100644 --- a/packages/sdk/src/realtime/observability/realtime-observability.ts +++ b/packages/sdk/src/realtime/observability/realtime-observability.ts @@ -75,7 +75,6 @@ export class RealtimeObservability { } this.stopStats(); - this.resetStallDetection(); this.statsCollectorSource = source; if (!this.options.telemetryEnabled && !this.options.onStats) { diff --git a/packages/sdk/src/realtime/subscribe-client.ts b/packages/sdk/src/realtime/subscribe-client.ts index d8610d04..0e93f8c5 100644 --- a/packages/sdk/src/realtime/subscribe-client.ts +++ b/packages/sdk/src/realtime/subscribe-client.ts @@ -1,5 +1,6 @@ import type { DecartSDKError } from "../utils/errors"; import type { DiagnosticEvent } from "./observability/diagnostics"; +import type { WebRTCStats } from "./observability/webrtc-stats"; import type { ConnectionState } from "./types"; type TokenPayload = { @@ -28,6 +29,7 @@ export type SubscribeEvents = { connectionChange: ConnectionState; error: DecartSDKError; diagnostic: DiagnosticEvent; + stats: WebRTCStats; }; export type RealTimeSubscribeClient = { diff --git a/packages/sdk/tests/unit.test.ts b/packages/sdk/tests/unit.test.ts index 2d62a370..ca96f30d 100644 --- a/packages/sdk/tests/unit.test.ts +++ b/packages/sdk/tests/unit.test.ts @@ -1884,6 +1884,51 @@ describe("Subscribe Client", () => { } }); + it("does not start stats collection when telemetry is disabled", async () => { + const { createRealTimeClient } = await import("../src/realtime/client.js"); + const { WebRTCManager } = await import("../src/realtime/webrtc-manager.js"); + const { WebRTCStatsCollector } = await import("../src/realtime/observability/webrtc-stats.js"); + + const peerConnection = { getStats: vi.fn().mockResolvedValue(new Map()) } as unknown as RTCPeerConnection; + + const startSpy = vi.spyOn(WebRTCStatsCollector.prototype, "start").mockImplementation(() => {}); + const connectSpy = vi.spyOn(WebRTCManager.prototype, "connect").mockImplementation(async function () { + const mgr = this as unknown as { + managerState: import("../src/realtime/types").ConnectionState; + handleConnectionStateChange: (state: import("../src/realtime/types").ConnectionState) => void; + }; + mgr.managerState = "connected"; + mgr.handleConnectionStateChange("connected"); + return true; + }); + const stateSpy = vi.spyOn(WebRTCManager.prototype, "getConnectionState").mockReturnValue("connected"); + const peerConnectionSpy = vi.spyOn(WebRTCManager.prototype, "getPeerConnection").mockReturnValue(peerConnection); + const cleanupSpy = vi.spyOn(WebRTCManager.prototype, "cleanup").mockImplementation(() => {}); + + try { + const realtime = createRealTimeClient({ + baseUrl: "wss://api3.decart.ai", + apiKey: "test-key", + logger: { debug() {}, info() {}, warn() {}, error() {} }, + telemetryEnabled: false, + }); + const client = await realtime.connect({} as MediaStream, { + model: models.realtime("mirage_v2"), + onRemoteStream: vi.fn(), + }); + + expect(startSpy).not.toHaveBeenCalled(); + + client.disconnect(); + } finally { + startSpy.mockRestore(); + connectSpy.mockRestore(); + stateSpy.mockRestore(); + peerConnectionSpy.mockRestore(); + cleanupSpy.mockRestore(); + } + }); + it("keeps explicit observability stats listeners independent from telemetry upload", async () => { const { RealtimeObservability } = await import("../src/realtime/observability/realtime-observability.js"); const { WebRTCStatsCollector } = await import("../src/realtime/observability/webrtc-stats.js");