From 6d1031fbbf917dfb1bf897b204b3a9715a2e2714 Mon Sep 17 00:00:00 2001 From: aris-cub Date: Tue, 14 Apr 2026 14:04:16 +0300 Subject: [PATCH 1/4] add W3C Baggage propagation support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The telemetry-tracing module only propagates W3C Trace Context (traceparent/tracestate) but not W3C Baggage. This prevents downstream consumers from receiving arbitrary key-value metadata through Pub/Sub messages, which is needed for use cases like message routing and queue splitting. Uses CompositePropagator to combine W3CTraceContextPropagator and W3CBaggagePropagator. No new dependencies required — both are already in @opentelemetry/core. --- handwritten/pubsub/src/telemetry-tracing.ts | 39 +++++-- handwritten/pubsub/test/telemetry-tracing.ts | 105 ++++++++++++++++++- handwritten/pubsub/test/tracing.ts | 11 ++ 3 files changed, 143 insertions(+), 12 deletions(-) diff --git a/handwritten/pubsub/src/telemetry-tracing.ts b/handwritten/pubsub/src/telemetry-tracing.ts index 54a516d1b6b4..94f5dec4d92b 100644 --- a/handwritten/pubsub/src/telemetry-tracing.ts +++ b/handwritten/pubsub/src/telemetry-tracing.ts @@ -26,7 +26,11 @@ import { Context, Link, } from '@opentelemetry/api'; -import {W3CTraceContextPropagator} from '@opentelemetry/core'; +import { + W3CTraceContextPropagator, + W3CBaggagePropagator, + CompositePropagator, +} from '@opentelemetry/core'; import {Attributes, PubsubMessage} from './publisher/pubsub-message'; import {Duration} from './temporal'; @@ -76,7 +80,9 @@ export enum OpenTelemetryLevel { * @private * @internal */ -const w3cTraceContextPropagator = new W3CTraceContextPropagator(); +const compositePropagator = new CompositePropagator({ + propagators: [new W3CTraceContextPropagator(), new W3CBaggagePropagator()], +}); // True if user code elsewhere wants to enable OpenTelemetry support. let globallyEnabled = false; @@ -225,6 +231,7 @@ export function spanContextToContext( * @internal */ export const modernAttributeName = 'googclient_traceparent'; +export const baggageAttributeName = 'googclient_baggage'; export interface AttributeParams { // Fully qualified. @@ -768,9 +775,20 @@ export function injectSpan(span: Span, message: MessageWithAttributes): void { delete message.attributes[modernAttributeName]; } - // Always do propagation injection with the trace context. - const context = trace.setSpanContext(ROOT_CONTEXT, span.spanContext()); - w3cTraceContextPropagator.inject(context, message, pubsubSetter); + if (message.attributes[baggageAttributeName]) { + console.warn( + `${baggageAttributeName} key set as message attribute, but will be overridden.`, + ); + + delete message.attributes[baggageAttributeName]; + } + + // Always do propagation injection with the trace and baggage context. + const propagationContext = trace.setSpanContext( + context.active(), + span.spanContext(), + ); + compositePropagator.inject(propagationContext, message, pubsubSetter); // Also put the direct reference to the Span object for while we're // passing it around in the client library. @@ -822,12 +840,11 @@ export function extractSpan( let context: Context | undefined; - if (keys.includes(modernAttributeName)) { - context = w3cTraceContextPropagator.extract( - ROOT_CONTEXT, - message, - pubsubGetter, - ); + if ( + keys.includes(modernAttributeName) || + keys.includes(baggageAttributeName) + ) { + context = compositePropagator.extract(ROOT_CONTEXT, message, pubsubGetter); } const span = PubsubSpans.createReceiveSpan( diff --git a/handwritten/pubsub/test/telemetry-tracing.ts b/handwritten/pubsub/test/telemetry-tracing.ts index 4ef42f105575..d21b5e220499 100644 --- a/handwritten/pubsub/test/telemetry-tracing.ts +++ b/handwritten/pubsub/test/telemetry-tracing.ts @@ -20,7 +20,12 @@ import {describe, it, beforeEach} from 'mocha'; import * as trace from '@opentelemetry/sdk-trace-base'; import * as otel from '../src/telemetry-tracing'; import {exporter} from './tracing'; -import {SpanKind} from '@opentelemetry/api'; +import { + SpanKind, + context, + propagation, + trace as otelTrace, +} from '@opentelemetry/api'; import sinon = require('sinon'); import {PubsubMessage} from '../src/publisher'; import {Duration} from '../src/temporal'; @@ -191,6 +196,104 @@ describe('OpenTelemetryTracer', () => { 'd4cda95b652f4a1592b449d5929fda1b', ); }); + + it('round-trips baggage through inject and extract', () => { + // Verify that baggage set on a message via the composite propagator + // can be extracted on the subscriber side. + const baggage = propagation.createBaggage({ + 'test-key': {value: 'test-value'}, + }); + + // Build a context with both a span and baggage. + const publishMessage: PubsubMessage = { + attributes: {}, + }; + const span = otel.PubsubSpans.createPublisherSpan( + publishMessage, + 'projects/test/topics/topicfoo', + 'tests', + ); + assert.ok(span); + + // Simulate what injectSpan does, but with baggage on the context. + const ctxWithBaggage = propagation.setBaggage(context.active(), baggage); + const propagationCtx = otelTrace.setSpanContext( + ctxWithBaggage, + span.spanContext(), + ); + + // Use the pubsub setter/getter directly with propagation API. + propagation.inject(propagationCtx, publishMessage, otel.pubsubSetter); + + // Verify baggage attribute was set on the message. + assert.strictEqual( + Object.getOwnPropertyNames(publishMessage.attributes).includes( + otel.baggageAttributeName, + ), + true, + ); + assert.ok( + ( + publishMessage.attributes![otel.baggageAttributeName] as string + ).includes('test-key=test-value'), + ); + }); + + it('should issue a warning if baggage attribute key is set', () => { + const message: PubsubMessage = { + attributes: { + [otel.baggageAttributeName]: 'bazbar', + }, + }; + const span = otel.PubsubSpans.createPublisherSpan( + message, + 'projects/test/topics/topicfoo', + 'tests', + ); + assert.ok(span); + + const warnSpy = sinon.spy(console, 'warn'); + try { + otel.injectSpan(span, message); + assert.strictEqual(warnSpy.callCount, 1); + } finally { + warnSpy.restore(); + } + }); + + it('extracts baggage from message attributes', () => { + const message = { + attributes: { + [otel.modernAttributeName]: + '00-d4cda95b652f4a1592b449d5929fda1b-553964cd9101a314-01', + [otel.baggageAttributeName]: 'test-key=test-value', + }, + }; + + const childSpan = otel.extractSpan( + message, + 'projects/test/subscriptions/subfoo', + ); + assert.ok(childSpan); + assert.strictEqual( + childSpan.spanContext().traceId, + 'd4cda95b652f4a1592b449d5929fda1b', + ); + }); + + it('extracts span when only baggage is present', () => { + const message = { + attributes: { + [otel.baggageAttributeName]: 'test-key=test-value', + }, + }; + + const childSpan = otel.extractSpan( + message, + 'projects/test/subscriptions/subfoo', + ); + assert.ok(childSpan); + }); }); describe('attribute creation', () => { diff --git a/handwritten/pubsub/test/tracing.ts b/handwritten/pubsub/test/tracing.ts index 7689253ad437..541ffe85af6a 100644 --- a/handwritten/pubsub/test/tracing.ts +++ b/handwritten/pubsub/test/tracing.ts @@ -19,6 +19,12 @@ import { InMemorySpanExporter, SimpleSpanProcessor, } from '@opentelemetry/sdk-trace-base'; +import {propagation} from '@opentelemetry/api'; +import { + CompositePropagator, + W3CTraceContextPropagator, + W3CBaggagePropagator, +} from '@opentelemetry/core'; /** * This file is used to initialise a global tracing provider and span exporter @@ -39,3 +45,8 @@ export const exporter: InMemorySpanExporter = new InMemorySpanExporter(); export const provider: BasicTracerProvider = new BasicTracerProvider(); provider.addSpanProcessor(new SimpleSpanProcessor(exporter)); provider.register(); +propagation.setGlobalPropagator( + new CompositePropagator({ + propagators: [new W3CTraceContextPropagator(), new W3CBaggagePropagator()], + }), +); From da77cecf32598a978030a3305e4f121bf407d631 Mon Sep 17 00:00:00 2001 From: aris-cub Date: Tue, 14 Apr 2026 14:23:18 +0300 Subject: [PATCH 2/4] update baggage extraction check and injection test - Add baggage key to `containsSpanContext` - Use `context.with()` in injection test --- handwritten/pubsub/package.json | 1 + handwritten/pubsub/src/telemetry-tracing.ts | 4 ++- handwritten/pubsub/test/telemetry-tracing.ts | 26 ++++++-------------- handwritten/pubsub/test/tracing.ts | 5 +++- 4 files changed, 16 insertions(+), 20 deletions(-) diff --git a/handwritten/pubsub/package.json b/handwritten/pubsub/package.json index 617dabc9f63e..bfe42f13bdd8 100644 --- a/handwritten/pubsub/package.json +++ b/handwritten/pubsub/package.json @@ -72,6 +72,7 @@ }, "devDependencies": { "@grpc/proto-loader": "^0.8.0", + "@opentelemetry/context-async-hooks": "^2.6.1", "@opentelemetry/sdk-trace-base": "^1.17.0", "@types/duplexify": "^3.6.4", "@types/extend": "^3.0.4", diff --git a/handwritten/pubsub/src/telemetry-tracing.ts b/handwritten/pubsub/src/telemetry-tracing.ts index 94f5dec4d92b..833924999941 100644 --- a/handwritten/pubsub/src/telemetry-tracing.ts +++ b/handwritten/pubsub/src/telemetry-tracing.ts @@ -811,7 +811,9 @@ export function containsSpanContext(message: MessageWithAttributes): boolean { } const keys = Object.getOwnPropertyNames(message.attributes); - return !!keys.find(n => n === modernAttributeName); + return !!keys.find( + n => n === modernAttributeName || n === baggageAttributeName, + ); } /** diff --git a/handwritten/pubsub/test/telemetry-tracing.ts b/handwritten/pubsub/test/telemetry-tracing.ts index d21b5e220499..8eb44b15db88 100644 --- a/handwritten/pubsub/test/telemetry-tracing.ts +++ b/handwritten/pubsub/test/telemetry-tracing.ts @@ -20,12 +20,7 @@ import {describe, it, beforeEach} from 'mocha'; import * as trace from '@opentelemetry/sdk-trace-base'; import * as otel from '../src/telemetry-tracing'; import {exporter} from './tracing'; -import { - SpanKind, - context, - propagation, - trace as otelTrace, -} from '@opentelemetry/api'; +import {SpanKind, context, propagation} from '@opentelemetry/api'; import sinon = require('sinon'); import {PubsubMessage} from '../src/publisher'; import {Duration} from '../src/temporal'; @@ -197,14 +192,11 @@ describe('OpenTelemetryTracer', () => { ); }); - it('round-trips baggage through inject and extract', () => { - // Verify that baggage set on a message via the composite propagator - // can be extracted on the subscriber side. + it('injects baggage from the active context into message attributes', () => { const baggage = propagation.createBaggage({ 'test-key': {value: 'test-value'}, }); - // Build a context with both a span and baggage. const publishMessage: PubsubMessage = { attributes: {}, }; @@ -215,17 +207,15 @@ describe('OpenTelemetryTracer', () => { ); assert.ok(span); - // Simulate what injectSpan does, but with baggage on the context. + // Set the baggage on the active context. const ctxWithBaggage = propagation.setBaggage(context.active(), baggage); - const propagationCtx = otelTrace.setSpanContext( - ctxWithBaggage, - span.spanContext(), - ); - // Use the pubsub setter/getter directly with propagation API. - propagation.inject(propagationCtx, publishMessage, otel.pubsubSetter); + // Execute injectSpan within the scope of the active context. + context.with(ctxWithBaggage, () => { + otel.injectSpan(span, publishMessage); + }); - // Verify baggage attribute was set on the message. + // Verify baggage attribute was set on the message by the compositePropagator. assert.strictEqual( Object.getOwnPropertyNames(publishMessage.attributes).includes( otel.baggageAttributeName, diff --git a/handwritten/pubsub/test/tracing.ts b/handwritten/pubsub/test/tracing.ts index 541ffe85af6a..81053b738a17 100644 --- a/handwritten/pubsub/test/tracing.ts +++ b/handwritten/pubsub/test/tracing.ts @@ -19,12 +19,13 @@ import { InMemorySpanExporter, SimpleSpanProcessor, } from '@opentelemetry/sdk-trace-base'; -import {propagation} from '@opentelemetry/api'; +import {context, propagation} from '@opentelemetry/api'; import { CompositePropagator, W3CTraceContextPropagator, W3CBaggagePropagator, } from '@opentelemetry/core'; +import {AsyncLocalStorageContextManager} from '@opentelemetry/context-async-hooks'; /** * This file is used to initialise a global tracing provider and span exporter @@ -44,6 +45,8 @@ import { export const exporter: InMemorySpanExporter = new InMemorySpanExporter(); export const provider: BasicTracerProvider = new BasicTracerProvider(); provider.addSpanProcessor(new SimpleSpanProcessor(exporter)); +const contextManager = new AsyncLocalStorageContextManager(); +context.setGlobalContextManager(contextManager); provider.register(); propagation.setGlobalPropagator( new CompositePropagator({ From edbd0eb0d0efe8a2203b6b2ae67ef72c018d6939 Mon Sep 17 00:00:00 2001 From: aris-cub Date: Wed, 15 Apr 2026 17:41:06 +0300 Subject: [PATCH 3/4] chore: retrigger CLA check From 058f05764734b3f84ae8cc5b87eb36b6e9fb05f7 Mon Sep 17 00:00:00 2001 From: aris-cub Date: Wed, 15 Apr 2026 22:35:24 +0300 Subject: [PATCH 4/4] feat(pubsub): propagate W3C Baggage to subscriber callbacks retain propagation context to prevent dropping baggage - Store context on message and activate for user callbacks. - Rename containsSpanContext to containsPropagationContext. - Update copyright year. --- handwritten/pubsub/src/lease-manager.ts | 8 +++++- handwritten/pubsub/src/subscriber.ts | 10 +++++++ handwritten/pubsub/src/telemetry-tracing.ts | 13 +++++++-- handwritten/pubsub/test/telemetry-tracing.ts | 29 ++++++++++++++++---- 4 files changed, 51 insertions(+), 9 deletions(-) diff --git a/handwritten/pubsub/src/lease-manager.ts b/handwritten/pubsub/src/lease-manager.ts index d8afdb11a359..76f5f56cb6d9 100644 --- a/handwritten/pubsub/src/lease-manager.ts +++ b/handwritten/pubsub/src/lease-manager.ts @@ -15,6 +15,7 @@ */ import {EventEmitter} from 'events'; +import {context as otelContext} from '@opentelemetry/api'; import {AckError, Message, Subscriber} from './subscriber'; import {defaultOptions} from './default-options'; @@ -297,8 +298,13 @@ export class LeaseManager extends EventEmitter { message.ackId, ); message.subSpans.processingStart(this._subscriber.name); + const emitCallback = () => this._subscriber.emit('message', message); try { - this._subscriber.emit('message', message); + if (message.parentContext) { + otelContext.with(message.parentContext, emitCallback); + } else { + emitCallback(); + } } catch (e: unknown) { logs.callbackExceptions.error( 'message (ID %s, ackID %s) caused a user callback exception: %o', diff --git a/handwritten/pubsub/src/subscriber.ts b/handwritten/pubsub/src/subscriber.ts index 46834636e1b7..604f505f54ef 100644 --- a/handwritten/pubsub/src/subscriber.ts +++ b/handwritten/pubsub/src/subscriber.ts @@ -14,6 +14,7 @@ * limitations under the License. */ +import {Context} from '@opentelemetry/api'; import {DateStruct, PreciseDate} from '@google-cloud/precise-date'; import {replaceProjectIdToken} from '@google-cloud/projectify'; import {promisify} from '@google-cloud/promisify'; @@ -289,6 +290,15 @@ export class Message implements tracing.MessageWithAttributes { */ parentSpan?: tracing.Span; + /** + * @private + * + * Tracks the propagation context (including baggage) extracted from the + * incoming message. Used to set the active context when dispatching to + * user callbacks so that baggage is accessible. + */ + parentContext?: Context; + /** * We'll save the state of the subscription's exactly once delivery flag at the * time the message was received. This is pretty much only for tracing, as we will diff --git a/handwritten/pubsub/src/telemetry-tracing.ts b/handwritten/pubsub/src/telemetry-tracing.ts index 833924999941..2ee65b0ac734 100644 --- a/handwritten/pubsub/src/telemetry-tracing.ts +++ b/handwritten/pubsub/src/telemetry-tracing.ts @@ -1,5 +1,5 @@ /*! - * Copyright 2020-2024 Google LLC + * Copyright 2020-2026 Google LLC * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -123,6 +123,7 @@ export function isEnabled(): OpenTelemetryLevel { export interface MessageWithAttributes { attributes?: Attributes | null | undefined; parentSpan?: Span; + parentContext?: Context; } /** @@ -796,12 +797,15 @@ export function injectSpan(span: Span, message: MessageWithAttributes): void { } /** - * Returns true if this message potentially contains a span context. + * Returns true if this message potentially contains a propagation context + * (trace context or baggage). * * @private * @internal */ -export function containsSpanContext(message: MessageWithAttributes): boolean { +export function containsPropagationContext( + message: MessageWithAttributes, +): boolean { if (message.parentSpan) { return true; } @@ -856,5 +860,8 @@ export function extractSpan( 'extractSpan', ); message.parentSpan = span; + if (context) { + message.parentContext = context; + } return span; } diff --git a/handwritten/pubsub/test/telemetry-tracing.ts b/handwritten/pubsub/test/telemetry-tracing.ts index 8eb44b15db88..b82732223089 100644 --- a/handwritten/pubsub/test/telemetry-tracing.ts +++ b/handwritten/pubsub/test/telemetry-tracing.ts @@ -160,7 +160,7 @@ describe('OpenTelemetryTracer', () => { } }); - it('should be able to determine if attributes are present', () => { + it('should be able to determine if propagation attributes are present', () => { let message: otel.MessageWithAttributes; message = { @@ -168,10 +168,17 @@ describe('OpenTelemetryTracer', () => { [otel.modernAttributeName]: 'foobar', }, }; - assert.strictEqual(otel.containsSpanContext(message), true); + assert.strictEqual(otel.containsPropagationContext(message), true); + + message = { + attributes: { + [otel.baggageAttributeName]: 'key=value', + }, + }; + assert.strictEqual(otel.containsPropagationContext(message), true); message = {}; - assert.strictEqual(otel.containsSpanContext(message), false); + assert.strictEqual(otel.containsPropagationContext(message), false); }); it('extracts a trace context', () => { @@ -252,7 +259,7 @@ describe('OpenTelemetryTracer', () => { }); it('extracts baggage from message attributes', () => { - const message = { + const message: otel.MessageWithAttributes = { attributes: { [otel.modernAttributeName]: '00-d4cda95b652f4a1592b449d5929fda1b-553964cd9101a314-01', @@ -269,10 +276,16 @@ describe('OpenTelemetryTracer', () => { childSpan.spanContext().traceId, 'd4cda95b652f4a1592b449d5929fda1b', ); + + // Verify baggage is accessible on the extracted context. + assert.ok(message.parentContext); + const baggage = propagation.getBaggage(message.parentContext!); + assert.ok(baggage); + assert.strictEqual(baggage!.getEntry('test-key')?.value, 'test-value'); }); it('extracts span when only baggage is present', () => { - const message = { + const message: otel.MessageWithAttributes = { attributes: { [otel.baggageAttributeName]: 'test-key=test-value', }, @@ -283,6 +296,12 @@ describe('OpenTelemetryTracer', () => { 'projects/test/subscriptions/subfoo', ); assert.ok(childSpan); + + // Verify baggage is accessible even without a trace context. + assert.ok(message.parentContext); + const baggage = propagation.getBaggage(message.parentContext!); + assert.ok(baggage); + assert.strictEqual(baggage!.getEntry('test-key')?.value, 'test-value'); }); });