From 5d8b414911df72df6e8e706cdf61b1721b45ac52 Mon Sep 17 00:00:00 2001 From: Sergei Dubov Date: Thu, 7 May 2026 18:03:35 +0200 Subject: [PATCH] fix(agents): KG-808. SpanAdapter hooks run on fully prepared spans in OpenTelemetry feature - `OpenTelemetry.kt` previously invoked `SpanAdapter.onBeforeSpanFinished` before `endSpan` had populated final attributes, so adapters such as Langfuse and Weave never observed them. - Each start and end span helpers now take a `SpanAdapter` and invokes the corresponding hook internally, after all attributes are set and right before the span is started or ended. - MCP enrichment for tool-call spans moved inside `startExecuteToolSpan` via a new optional `mcpToolMetadata` parameter; `mcpClientSpan.kt` is gone, its helper is now private inside `executeToolSpan.kt`. closes: [KG-808](https://youtrack.jetbrains.com/issue/KG-808) --- .../opentelemetry/feature/OpenTelemetry.kt | 141 ++++++++---------- .../opentelemetry/span/createAgentSpan.kt | 26 +++- .../opentelemetry/span/executeToolSpan.kt | 121 ++++++++++++++- .../opentelemetry/span/inferenceSpan.kt | 35 ++++- .../opentelemetry/span/invokeAgentSpan.kt | 35 ++++- .../opentelemetry/span/mcpClientSpan.kt | 58 ------- .../opentelemetry/span/nodeExecuteSpan.kt | 30 +++- .../opentelemetry/span/strategySpan.kt | 26 +++- .../opentelemetry/span/subgraphExecuteSpan.kt | 30 +++- .../feature/OpenTelemetryFeatureTest.kt | 52 ++++++- .../span/OpenTelemetryInferenceSpanTest.kt | 8 +- .../integration/TraceStructureTestBase.kt | 30 +++- .../weave/WeaveTraceStructureTest.kt | 19 ++- 13 files changed, 433 insertions(+), 178 deletions(-) delete mode 100644 agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/span/mcpClientSpan.kt diff --git a/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/feature/OpenTelemetry.kt b/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/feature/OpenTelemetry.kt index 656c6bcb80..067a6da5f8 100644 --- a/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/feature/OpenTelemetry.kt +++ b/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/feature/OpenTelemetry.kt @@ -3,7 +3,6 @@ package ai.koog.agents.features.opentelemetry.feature import ai.koog.agents.core.agent.config.AIAgentConfig import ai.koog.agents.core.agent.entity.AIAgentStorageKey import ai.koog.agents.core.agent.execution.AgentExecutionInfo -import ai.koog.agents.core.annotation.InternalAgentsApi import ai.koog.agents.core.feature.AIAgentFunctionalFeature import ai.koog.agents.core.feature.AIAgentGraphFeature import ai.koog.agents.core.feature.AIAgentPlannerFeature @@ -38,7 +37,6 @@ import ai.koog.agents.features.opentelemetry.span.endInvokeAgentSpan import ai.koog.agents.features.opentelemetry.span.endNodeExecuteSpan import ai.koog.agents.features.opentelemetry.span.endStrategySpan import ai.koog.agents.features.opentelemetry.span.endSubgraphExecuteSpan -import ai.koog.agents.features.opentelemetry.span.enrichExecuteToolSpanWithMcpAttrs import ai.koog.agents.features.opentelemetry.span.startCreateAgentSpan import ai.koog.agents.features.opentelemetry.span.startExecuteToolSpan import ai.koog.agents.features.opentelemetry.span.startInferenceSpan @@ -114,10 +112,10 @@ public class OpenTelemetry { id = eventContext.eventId, runId = eventContext.context.runId, nodeId = eventContext.node.id, - nodeInput = nodeInput + nodeInput = nodeInput, + spanAdapter = spanAdapter, ) - spanAdapter?.onBeforeSpanStarted(nodeExecuteSpan) spanCollector.collectSpan( span = nodeExecuteSpan, path = patchedExecutionInfo @@ -137,11 +135,11 @@ public class OpenTelemetry { val nodeOutput = nodeDataToString(eventContext.output, eventContext.outputType, pipeline.config.serializer) - spanAdapter?.onBeforeSpanFinished(nodeExecuteSpan) endNodeExecuteSpan( span = nodeExecuteSpan, nodeOutput = nodeOutput, - verbose = config.isVerbose + verbose = config.isVerbose, + spanAdapter = spanAdapter, ) spanCollector.removeSpan( span = nodeExecuteSpan, @@ -160,12 +158,12 @@ public class OpenTelemetry { spanType = SpanType.NODE ) ?: return@intercept - spanAdapter?.onBeforeSpanFinished(nodeExecuteSpan) endNodeExecuteSpan( span = nodeExecuteSpan, nodeOutput = null, error = eventContext.error, - verbose = config.isVerbose + verbose = config.isVerbose, + spanAdapter = spanAdapter, ) spanCollector.removeSpan( span = nodeExecuteSpan, @@ -194,10 +192,10 @@ public class OpenTelemetry { id = eventContext.eventId, runId = eventContext.context.runId, subgraphId = eventContext.subgraph.id, - subgraphInput = subgraphInput + subgraphInput = subgraphInput, + spanAdapter = spanAdapter, ) - spanAdapter?.onBeforeSpanStarted(subgraphExecuteSpan) spanCollector.collectSpan( span = subgraphExecuteSpan, path = patchedExecutionInfo @@ -217,11 +215,11 @@ public class OpenTelemetry { val subgraphOutput = nodeDataToString(eventContext.output, eventContext.outputType, pipeline.config.serializer) - spanAdapter?.onBeforeSpanFinished(subgraphExecuteSpan) endSubgraphExecuteSpan( span = subgraphExecuteSpan, subgraphOutput = subgraphOutput, - verbose = config.isVerbose + verbose = config.isVerbose, + spanAdapter = spanAdapter, ) spanCollector.removeSpan( span = subgraphExecuteSpan, @@ -240,12 +238,12 @@ public class OpenTelemetry { spanType = SpanType.SUBGRAPH ) ?: return@intercept - spanAdapter?.onBeforeSpanFinished(subgraphExecuteSpan) endSubgraphExecuteSpan( span = subgraphExecuteSpan, subgraphOutput = null, error = eventContext.error, - verbose = config.isVerbose + verbose = config.isVerbose, + spanAdapter = spanAdapter, ) spanCollector.removeSpan( span = subgraphExecuteSpan, @@ -311,10 +309,10 @@ public class OpenTelemetry { id = eventContext.eventId, model = eventContext.agent.agentConfig.model, agentId = eventContext.context.agentId, - messages = messages + messages = messages, + spanAdapter = spanAdapter, ) - spanAdapter?.onBeforeSpanStarted(createAgentSpan) spanCollector.collectSpan( span = createAgentSpan, path = eventContext.executionInfo @@ -331,10 +329,10 @@ public class OpenTelemetry { runId = eventContext.runId, llmParams = eventContext.agent.agentConfig.prompt.params, messages = messages, - tools = tools + tools = tools, + spanAdapter = spanAdapter, ) - spanAdapter?.onBeforeSpanStarted(invokeAgentSpan) // Patch the agent execution info to include runId in the path. // This is required to create a path structure that matches the span structure in the OTel feature. spanCollector.collectSpan( @@ -360,12 +358,12 @@ public class OpenTelemetry { ) } - spanAdapter?.onBeforeSpanFinished(invokeAgentSpan) endInvokeAgentSpan( span = invokeAgentSpan, messages = eventContext.context.config.prompt.messages.toList(), model = eventContext.context.config.model, - verbose = config.isVerbose + verbose = config.isVerbose, + spanAdapter = spanAdapter, ) spanCollector.removeSpan( span = invokeAgentSpan, @@ -381,7 +379,11 @@ public class OpenTelemetry { metricCollector.flushPendingAsErrors(eventContext.error) // Stop all unfinished spans, except InvokeAgentSpan and AgentCreateSpan - endUnfinishedSpans(spanCollector, config.isVerbose) { span -> + endUnfinishedSpans( + spanCollector = spanCollector, + spanAdapter = spanAdapter, + verbose = config.isVerbose, + ) { span -> span.type != SpanType.CREATE_AGENT && span.type != SpanType.INVOKE_AGENT && span.id != eventContext.eventId @@ -395,13 +397,13 @@ public class OpenTelemetry { spanType = SpanType.INVOKE_AGENT ) ?: return@intercept - spanAdapter?.onBeforeSpanFinished(invokeAgentSpan) endInvokeAgentSpan( span = invokeAgentSpan, messages = eventContext.context.config.prompt.messages.toList(), model = eventContext.context.config.model, error = eventContext.error, - verbose = config.isVerbose + verbose = config.isVerbose, + spanAdapter = spanAdapter, ) spanCollector.removeSpan( span = invokeAgentSpan, @@ -417,7 +419,11 @@ public class OpenTelemetry { metricCollector.flushPendingAsErrors(error = null) // Stop all unfinished spans, except the AgentCreateSpan - endUnfinishedSpans(spanCollector, config.isVerbose) { span -> + endUnfinishedSpans( + spanCollector = spanCollector, + verbose = config.isVerbose, + spanAdapter = spanAdapter, + ) { span -> span.type != SpanType.CREATE_AGENT } @@ -428,10 +434,10 @@ public class OpenTelemetry { spanType = SpanType.CREATE_AGENT ) ?: return@intercept - spanAdapter?.onBeforeSpanFinished(agentSpan) endCreateAgentSpan( span = agentSpan, - verbose = config.isVerbose + verbose = config.isVerbose, + spanAdapter = spanAdapter, ) spanCollector.removeSpan( @@ -442,7 +448,11 @@ public class OpenTelemetry { // Just in case we miss some spans, stop them as well if (spanCollector.activeSpansCount > 0) { logger.warn { "Found <${spanCollector.activeSpansCount}> active span(s) after agent closing. Stopping them." } - endUnfinishedSpans(spanCollector, config.isVerbose) + endUnfinishedSpans( + spanCollector = spanCollector, + verbose = config.isVerbose, + spanAdapter = spanAdapter, + ) } if (config.isShutdownOnAgentClose) { @@ -468,10 +478,10 @@ public class OpenTelemetry { parentSpan = parentSpan, id = eventContext.eventId, runId = eventContext.context.runId, - strategyName = eventContext.strategy.name + strategyName = eventContext.strategy.name, + spanAdapter = spanAdapter, ) - spanAdapter?.onBeforeSpanStarted(strategySpan) spanCollector.collectSpan( span = strategySpan, path = patchedExecutionInfo @@ -487,8 +497,11 @@ public class OpenTelemetry { spanType = SpanType.STRATEGY ) ?: return@intercept - spanAdapter?.onBeforeSpanFinished(strategySpan) - endStrategySpan(span = strategySpan, verbose = config.isVerbose) + endStrategySpan( + span = strategySpan, + verbose = config.isVerbose, + spanAdapter = spanAdapter, + ) spanCollector.removeSpan( span = strategySpan, path = patchedExecutionInfo @@ -522,11 +535,10 @@ public class OpenTelemetry { model = eventContext.model, messages = messages, llmParams = eventContext.prompt.params, - tools = eventContext.tools + tools = eventContext.tools, + spanAdapter = spanAdapter, ) - // Start span - spanAdapter?.onBeforeSpanStarted(inferenceSpan) spanCollector.collectSpan( span = inferenceSpan, path = patchedExecutionInfo @@ -563,20 +575,13 @@ public class OpenTelemetry { ) } - // Pre-populate gen_ai.output.messages so SpanAdapter implementations (Langfuse, Weave) - // can reshape the response messages before the span ends. - // endInferenceSpan re-adds the same attribute idempotently. - if (eventContext.responses.isNotEmpty()) { - inferenceSpan.addAttribute(GenAIAttributes.Output.Messages(eventContext.responses)) - } - // Stop InferenceSpan - spanAdapter?.onBeforeSpanFinished(inferenceSpan) endInferenceSpan( span = inferenceSpan, messages = eventContext.responses, model = eventContext.model, - verbose = config.isVerbose + verbose = config.isVerbose, + spanAdapter = spanAdapter, ) spanCollector.removeSpan( span = inferenceSpan, @@ -629,13 +634,13 @@ public class OpenTelemetry { spanType = SpanType.INFERENCE ) ?: return@intercept - spanAdapter?.onBeforeSpanFinished(inferenceSpan) endInferenceSpan( span = inferenceSpan, messages = emptyList(), model = eventContext.model, + error = eventContext.error, verbose = config.isVerbose, - error = eventContext.error + spanAdapter = spanAdapter, ) spanCollector.removeSpan( span = inferenceSpan, @@ -647,7 +652,6 @@ public class OpenTelemetry { //region Tool Call - @OptIn(InternalAgentsApi::class) pipeline.interceptToolCallStarting(this) intercept@{ eventContext -> logger.debug { "Execute OpenTelemetry tool call handler" } @@ -664,32 +668,11 @@ public class OpenTelemetry { toolName = eventContext.toolName, toolArgs = eventContext.toolArgs.toKotlinxJsonObject(), toolDescription = eventContext.toolDescription, - toolCallId = eventContext.toolCallId - ) - val mcpToolMetadata = eventContext.context.llm.toolRegistry.getMcpToolMeta(eventContext.toolName) - if (mcpToolMetadata != null) { - val mcpVersion = mcpToolMetadata[McpMetadataKeys.McpProtocolVersion] - val mcpTransportType = mcpToolMetadata[McpMetadataKeys.McpTransportType] - if (mcpVersion != null && mcpTransportType != null) { - executeToolSpan.enrichExecuteToolSpanWithMcpAttrs( - toolName = eventContext.toolName, - sessionId = mcpToolMetadata[McpMetadataKeys.McpSessionId], - methodName = "tools/call", - serverPort = mcpToolMetadata[McpMetadataKeys.ServerPort]?.toIntOrNull(), - serverAddress = mcpToolMetadata[McpMetadataKeys.ServerUrl], - mcpProtocolVersion = mcpVersion, - mcpTransportType = mcpTransportType, - ) - } else { - logger.error { - "MCP protocol version ($mcpVersion) and transport type ($mcpTransportType) are required " + - "for mcp tool call spans: tool=${eventContext.toolName}, " + - "serverUrl=${mcpToolMetadata[McpMetadataKeys.ServerUrl]}" - } - } - } + toolCallId = eventContext.toolCallId, + mcpToolMetadata = eventContext.context.llm.toolRegistry.getMcpToolMeta(eventContext.toolName), + spanAdapter = spanAdapter, + ) - spanAdapter?.onBeforeSpanStarted(executeToolSpan) spanCollector.collectSpan(executeToolSpan, path) // Metrics @@ -878,14 +861,20 @@ public class OpenTelemetry { appendId(this, id) /** - * Ends all unfinished spans that match the given predicate. - * If no predicate is provided, ends all spans. + * Ends all unfinished spans that match the given predicate. If no predicate is provided, ends all spans. * Spans are closed from leaf nodes up to parent nodes to maintain a proper hierarchy. * + * The optional [spanAdapter]'s [SpanAdapter.onBeforeSpanFinished] is invoked before + * each span is closed so adapter post-processing remains consistent. + * + * @param spanCollector Span collector for retrieving active spans. + * @param spanAdapter Optional span adapter for post-processing before span closure; + * @param verbose Whether to log span closure details. * @param filter Optional filter for spans to end. */ internal suspend fun endUnfinishedSpans( spanCollector: SpanCollector, + spanAdapter: SpanAdapter?, verbose: Boolean = false, filter: ((GenAIAgentSpan) -> Boolean)? = null ) { @@ -893,6 +882,7 @@ public class OpenTelemetry { spansToEnd.forEach { spanNode -> try { + spanAdapter?.onBeforeSpanFinished(spanNode.span) spanNode.span.end(verbose = verbose) spanCollector.removeSpan(spanNode.span, spanNode.path) } catch (e: Exception) { @@ -918,12 +908,12 @@ public class OpenTelemetry { spanType = SpanType.EXECUTE_TOOL ) ?: return - spanAdapter?.onBeforeSpanFinished(span = span) endExecuteToolSpan( span = span, toolResult = toolResult?.toKotlinxJsonElement(), error = error, - verbose = config.isVerbose + verbose = config.isVerbose, + spanAdapter = spanAdapter, ) spanCollector.removeSpan( span = span, @@ -931,7 +921,6 @@ public class OpenTelemetry { ) } - @OptIn(InternalAgentsApi::class) private fun ToolRegistry.getMcpToolMeta( toolName: String, ): Map? = tools.firstNotNullOfOrNull { tool -> diff --git a/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/span/createAgentSpan.kt b/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/span/createAgentSpan.kt index 998be0376c..a00bbcc075 100644 --- a/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/span/createAgentSpan.kt +++ b/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/span/createAgentSpan.kt @@ -5,6 +5,7 @@ import ai.koog.agents.features.opentelemetry.attribute.KoogAttributes import ai.koog.agents.features.opentelemetry.extension.addCommonErrorAttributes import ai.koog.agents.features.opentelemetry.extension.systemMessages import ai.koog.agents.features.opentelemetry.extension.toStatusData +import ai.koog.agents.features.opentelemetry.integration.SpanAdapter import ai.koog.prompt.llm.LLModel import ai.koog.prompt.message.Message import io.opentelemetry.kotlin.factory.ContextFactory @@ -30,6 +31,15 @@ import io.opentelemetry.kotlin.tracing.Tracer * * Custom attributes: * - koog.event.id + * + * @param tracer The tracer to use + * @param contextFactory The context factory to use + * @param parentSpan The parent span to use + * @param id The id of the span + * @param agentId The id of the agent + * @param model The model to use + * @param messages The messages to use + * @param spanAdapter The span adapter to use */ internal fun startCreateAgentSpan( tracer: Tracer, @@ -38,7 +48,8 @@ internal fun startCreateAgentSpan( id: String, agentId: String, model: LLModel, - messages: List + messages: List, + spanAdapter: SpanAdapter? = null, ): GenAIAgentSpan { val builder = GenAIAgentSpanBuilder( spanType = SpanType.CREATE_AGENT, @@ -68,7 +79,9 @@ internal fun startCreateAgentSpan( builder.addAttribute(KoogAttributes.Koog.Event.Id(id)) - return builder.buildAndStart(tracer, contextFactory) + val span = builder.buildAndStart(tracer, contextFactory) + spanAdapter?.onBeforeSpanStarted(span) + return span } /** @@ -79,11 +92,17 @@ internal fun startCreateAgentSpan( * * Span attribute: * - error.type (conditional) + * + * @param span The span to end + * @param error The error to set as the span status + * @param verbose Whether to log verbose information + * @param spanAdapter The span adapter to use */ internal fun endCreateAgentSpan( span: GenAIAgentSpan, error: Throwable? = null, - verbose: Boolean = false + verbose: Boolean = false, + spanAdapter: SpanAdapter? = null, ) { check(span.type == SpanType.CREATE_AGENT) { "${span.logString} Expected to end span type of type: <${SpanType.CREATE_AGENT}>, but received span of type: <${span.type}>" @@ -92,5 +111,6 @@ internal fun endCreateAgentSpan( // error.type span.addCommonErrorAttributes(error) + spanAdapter?.onBeforeSpanFinished(span) span.end(error.toStatusData(), verbose) } diff --git a/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/span/executeToolSpan.kt b/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/span/executeToolSpan.kt index fc94842a6b..2b75e13aae 100644 --- a/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/span/executeToolSpan.kt +++ b/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/span/executeToolSpan.kt @@ -1,17 +1,27 @@ package ai.koog.agents.features.opentelemetry.span +import ai.koog.agents.features.opentelemetry.attribute.CommonAttributes import ai.koog.agents.features.opentelemetry.attribute.GenAIAttributes import ai.koog.agents.features.opentelemetry.attribute.KoogAttributes +import ai.koog.agents.features.opentelemetry.attribute.McpAttributes import ai.koog.agents.features.opentelemetry.extension.addCommonErrorAttributes import ai.koog.agents.features.opentelemetry.extension.toStatusData +import ai.koog.agents.features.opentelemetry.integration.SpanAdapter +import ai.koog.agents.mcp.metadata.McpMetadataKeys +import io.github.oshai.kotlinlogging.KotlinLogging import io.opentelemetry.kotlin.factory.ContextFactory import io.opentelemetry.kotlin.tracing.SpanKind import io.opentelemetry.kotlin.tracing.Tracer import kotlinx.serialization.json.JsonElement import kotlinx.serialization.json.JsonObject +private val logger = KotlinLogging.logger { } + /** - * Build and start a new Execute Tool Span with necessary attributes. + * Build and start a new Execute Tool Span with necessary attributes. When [mcpToolMetadata] is + * provided (i.e., the tool is sourced via MCP), the span is enriched with MCP-specific attributes + * before [SpanAdapter.onBeforeSpanStarted] is invoked, so adapters observe the fully prepared + * attribute set. * * Add the necessary attributes for the Execute Tool Span, according to the OpenTelemetry Semantic Convention: * https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-spans/#execute-tool-span @@ -26,6 +36,17 @@ import kotlinx.serialization.json.JsonObject * * Custom attributes: * - koog.event.id + * + * @param tracer The tracer instance to use for creating the span. + * @param contextFactory The context factory to use for creating the span context. + * @param parentSpan The parent span for the new span. + * @param id The unique identifier for the span. + * @param toolName The name of the tool being called. + * @param toolArgs The arguments for the tool call. + * @param toolDescription The description of the tool. + * @param toolCallId The identifier for the tool call. + * @param mcpToolMetadata Optional metadata for the tool sourced via MCP. + * @param spanAdapter Optional span adapter for customizing the span behavior. */ internal fun startExecuteToolSpan( tracer: Tracer, @@ -36,6 +57,8 @@ internal fun startExecuteToolSpan( toolArgs: JsonObject, toolDescription: String?, toolCallId: String?, + mcpToolMetadata: Map? = null, + spanAdapter: SpanAdapter? = null, ): GenAIAgentSpan { val builder = GenAIAgentSpanBuilder( spanType = SpanType.EXECUTE_TOOL, @@ -70,11 +93,38 @@ internal fun startExecuteToolSpan( builder.addAttribute(KoogAttributes.Koog.Event.Id(id)) - return builder.buildAndStart(tracer, contextFactory) + val span = builder.buildAndStart(tracer, contextFactory) + + if (mcpToolMetadata != null) { + val mcpVersion = mcpToolMetadata[McpMetadataKeys.McpProtocolVersion] + val mcpTransportType = mcpToolMetadata[McpMetadataKeys.McpTransportType] + if (mcpVersion != null && mcpTransportType != null) { + span.enrichExecuteToolSpanWithMcpAttributes( + toolName = toolName, + sessionId = mcpToolMetadata[McpMetadataKeys.McpSessionId], + methodName = "tools/call", + serverPort = mcpToolMetadata[McpMetadataKeys.ServerPort]?.toIntOrNull(), + serverAddress = mcpToolMetadata[McpMetadataKeys.ServerUrl], + mcpProtocolVersion = mcpVersion, + mcpTransportType = mcpTransportType, + ) + } else { + logger.error { + "MCP protocol version ($mcpVersion) and transport type ($mcpTransportType) are required " + + "for mcp tool call spans: tool=$toolName, " + + "serverUrl=${mcpToolMetadata[McpMetadataKeys.ServerUrl]}" + } + } + } + + spanAdapter?.onBeforeSpanStarted(span) + return span } /** - * End Execute Tool Span and set final attributes. + * End Execute Tool Span and set final attributes. The provided [spanAdapter] is invoked via + * [SpanAdapter.onBeforeSpanFinished] after all attributes are set and immediately before the + * underlying span is ended. * * Add the necessary attributes for the Execute Tool Span, according to the OpenTelemetry Semantic Convention: * https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-spans/#execute-tool-span @@ -82,12 +132,19 @@ internal fun startExecuteToolSpan( * Span attribute: * - error.type (conditional) * - gen_ai.tool.call.result (recommended) + * + * @param span The span to end. + * @param toolResult The result of the tool call. + * @param error The error that occurred during the tool call, if any. + * @param verbose Whether to log verbose information. + * @param spanAdapter Optional span adapter for customizing the span behavior. */ internal fun endExecuteToolSpan( span: GenAIAgentSpan, toolResult: JsonElement?, error: Throwable? = null, - verbose: Boolean = false + verbose: Boolean = false, + spanAdapter: SpanAdapter? = null, ) { check(span.type == SpanType.EXECUTE_TOOL) { "${span.logString} Expected to end span type of type: <${SpanType.EXECUTE_TOOL}>, but received span of type: <${span.type}>" @@ -101,5 +158,61 @@ internal fun endExecuteToolSpan( span.addAttribute(GenAIAttributes.Tool.Call.Result(result)) } + spanAdapter?.onBeforeSpanFinished(span) span.end(error.toStatusData(), verbose) } + +/** + * Adds MCP-specific attributes to this Execute Tool span. Caller must ensure the receiver wraps an MCP tool invocation. + * The type is not checked here. Attribute requirement levels follow the OpenTelemetry GenAI/MCP + * semantic conventions and are noted next to each `addAttribute` call. + * + * @param toolName The name of the MCP tool being called; + * @param methodName MCP method invoked; + * @param serverAddress MCP server address, when known; + * @param serverPort MCP server port, when known; + * @param sessionId MCP session identifier, when the call is part of a session; + * @param mcpProtocolVersion MCP protocol version in use; + * @param mcpTransportType Transport used to reach the MCP server ("stdio", "http"); + * @return The receiver span, for chaining. + */ +private fun GenAIAgentSpan.enrichExecuteToolSpanWithMcpAttributes( + toolName: String, + methodName: String, + serverAddress: String? = null, + serverPort: Int? = null, + sessionId: String? = null, + mcpProtocolVersion: String, + mcpTransportType: String, +): GenAIAgentSpan { + // mcp.method.name (REQUIRED) + addAttribute(McpAttributes.Mcp.Method.Name(methodName)) + + // gen_ai.operation.name (RECOMMENDED for tool calls) + addAttribute(GenAIAttributes.Operation.Name(GenAIAttributes.Operation.OperationNameType.EXECUTE_TOOL)) + + // gen_ai.tool.name (CONDITIONALLY REQUIRED) + addAttribute(GenAIAttributes.Tool.Name(toolName)) + + // mcp.session.id (RECOMMENDED) + sessionId?.let { session -> + addAttribute(McpAttributes.Mcp.Session.Id(session)) + } + + // mcp.protocol.version (RECOMMENDED) + addAttribute(McpAttributes.Mcp.Protocol.Version(mcpProtocolVersion)) + // network.transport (RECOMMENDED) + addAttribute(McpAttributes.Network.Transport(mcpTransportType)) + + // server.address (RECOMMENDED) + serverAddress?.let { address -> + addAttribute(CommonAttributes.Server.Address(address)) + } + + // server.port (RECOMMENDED) + serverPort?.let { port -> + addAttribute(CommonAttributes.Server.Port(port)) + } + + return this +} diff --git a/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/span/inferenceSpan.kt b/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/span/inferenceSpan.kt index 96642b9b57..b863b84c01 100644 --- a/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/span/inferenceSpan.kt +++ b/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/span/inferenceSpan.kt @@ -9,6 +9,7 @@ import ai.koog.agents.features.opentelemetry.extension.sumInputTokens import ai.koog.agents.features.opentelemetry.extension.sumOutputTokens import ai.koog.agents.features.opentelemetry.extension.systemMessages import ai.koog.agents.features.opentelemetry.extension.toStatusData +import ai.koog.agents.features.opentelemetry.integration.SpanAdapter import ai.koog.prompt.llm.LLMProvider import ai.koog.prompt.llm.LLModel import ai.koog.prompt.message.Message @@ -47,6 +48,18 @@ import kotlinx.serialization.json.JsonObject * * Custom attributes: * - koog.event.id + * + * @param tracer The tracer instance to use for creating the span. + * @param contextFactory The context factory to use for creating the span context. + * @param parentSpan The parent span for the new span. + * @param id The unique identifier for the span. + * @param provider The LLM provider for the inference. + * @param runId The unique identifier for the run. + * @param model The model used for inference. + * @param messages The list of messages used in the inference. + * @param llmParams The parameters for the LLM request. + * @param tools The list of tools available for the inference. + * @param spanAdapter Optional span adapter for customizing the span behavior. */ internal fun startInferenceSpan( tracer: Tracer, @@ -58,7 +71,8 @@ internal fun startInferenceSpan( model: LLModel, messages: List, llmParams: LLMParams, - tools: List + tools: List, + spanAdapter: SpanAdapter? = null, ): GenAIAgentSpan { val builder = GenAIAgentSpanBuilder( spanType = SpanType.INFERENCE, @@ -128,11 +142,15 @@ internal fun startInferenceSpan( builder.addAttribute(KoogAttributes.Koog.Event.Id(id)) - return builder.buildAndStart(tracer, contextFactory) + val span = builder.buildAndStart(tracer, contextFactory) + spanAdapter?.onBeforeSpanStarted(span) + return span } /** - * End Inference Span and set final attributes. + * End Inference Span and set final attributes. The provided [spanAdapter] is invoked via + * [SpanAdapter.onBeforeSpanFinished] after all attributes are set and immediately before the + * underlying span is ended. * * Add the necessary attributes for the Inference Span according to the OpenTelemetry Semantic Convention: * https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-spans/#inference @@ -146,13 +164,21 @@ internal fun startInferenceSpan( * - gen_ai.usage.input_tokens (recommended) * - gen_ai.usage.output_tokens (recommended) * - gen_ai.output.messages (recommended) + * + * @param span The span to end. + * @param messages The list of messages used in the inference. + * @param model The model used for inference. + * @param error The error that occurred during inference if any. + * @param verbose Whether to log verbose information. + * @param spanAdapter Optional span adapter for customizing the span behavior. */ internal fun endInferenceSpan( span: GenAIAgentSpan, messages: List, model: LLModel, error: Throwable? = null, - verbose: Boolean = false + verbose: Boolean = false, + spanAdapter: SpanAdapter? = null, ) { check(span.type == SpanType.INFERENCE) { "${span.logString} Expected to end span type of type: <${SpanType.INFERENCE}>, but received span of type: <${span.type}>" @@ -184,5 +210,6 @@ internal fun endInferenceSpan( span.addAttribute(GenAIAttributes.Output.Messages(messages)) } + spanAdapter?.onBeforeSpanFinished(span) span.end(error.toStatusData(), verbose) } diff --git a/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/span/invokeAgentSpan.kt b/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/span/invokeAgentSpan.kt index af7368f532..ef8933d800 100644 --- a/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/span/invokeAgentSpan.kt +++ b/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/span/invokeAgentSpan.kt @@ -8,6 +8,7 @@ import ai.koog.agents.features.opentelemetry.extension.sumInputTokens import ai.koog.agents.features.opentelemetry.extension.sumOutputTokens import ai.koog.agents.features.opentelemetry.extension.systemMessages import ai.koog.agents.features.opentelemetry.extension.toStatusData +import ai.koog.agents.features.opentelemetry.integration.SpanAdapter import ai.koog.prompt.llm.LLModel import ai.koog.prompt.message.Message import ai.koog.prompt.params.LLMParams @@ -47,6 +48,18 @@ import io.opentelemetry.kotlin.tracing.Tracer * * Custom attributes: * - koog.event.id + * + * @param tracer The tracer instance to use for creating the span. + * @param contextFactory The context factory to use for creating the span context. + * @param parentSpan The parent span for the new span. + * @param id The unique identifier for the span. + * @param model The model used for inference. + * @param runId The unique identifier for the run. + * @param agentId The unique identifier for the agent. + * @param llmParams The parameters for the LLM request. + * @param messages The list of messages used in the inference. + * @param tools The list of tools available for the inference. + * @param spanAdapter Optional span adapter for customizing the span behavior. */ internal fun startInvokeAgentSpan( tracer: Tracer, @@ -58,7 +71,8 @@ internal fun startInvokeAgentSpan( agentId: String, llmParams: LLMParams, messages: List, - tools: List + tools: List, + spanAdapter: SpanAdapter? = null, ): GenAIAgentSpan { val builder = GenAIAgentSpanBuilder( spanType = SpanType.INVOKE_AGENT, @@ -133,11 +147,15 @@ internal fun startInvokeAgentSpan( builder.addAttribute(KoogAttributes.Koog.Event.Id(id)) - return builder.buildAndStart(tracer, contextFactory) + val span = builder.buildAndStart(tracer, contextFactory) + spanAdapter?.onBeforeSpanStarted(span) + return span } /** - * End Invoke Agent Span and set final attributes. + * End Invoke Agent Span and set final attributes. The provided [spanAdapter] is invoked via + * [SpanAdapter.onBeforeSpanFinished] after all attributes are set and immediately before the + * underlying span is ended. * * Add the necessary attributes for the Invoke Agent Span, according to the OpenTelemetry Semantic Convention: * https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-agent-spans/#invoke-agent-span @@ -150,13 +168,21 @@ internal fun startInvokeAgentSpan( * - gen_ai.usage.input_tokens (recommended) * - gen_ai.usage.output_tokens (recommended) * - gen_ai.output.messages (recommended) + * + * @param span The span to end. + * @param messages The list of messages used in the inference. + * @param model The model used for inference. + * @param error The error that occurred during inference if any. + * @param verbose Whether to log verbose information. + * @param spanAdapter Optional span adapter for customizing the span behavior. */ internal fun endInvokeAgentSpan( span: GenAIAgentSpan, messages: List, model: LLModel, error: Throwable? = null, - verbose: Boolean = false + verbose: Boolean = false, + spanAdapter: SpanAdapter? = null, ) { check(span.type == SpanType.INVOKE_AGENT) { "${span.logString} Expected to end span type of type: <${SpanType.INVOKE_AGENT}>, but received span of type: <${span.type}>" @@ -181,5 +207,6 @@ internal fun endInvokeAgentSpan( span.addAttribute(GenAIAttributes.Output.Messages(messages)) } + spanAdapter?.onBeforeSpanFinished(span) span.end(error.toStatusData(), verbose) } diff --git a/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/span/mcpClientSpan.kt b/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/span/mcpClientSpan.kt deleted file mode 100644 index 39918bc4a7..0000000000 --- a/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/span/mcpClientSpan.kt +++ /dev/null @@ -1,58 +0,0 @@ -package ai.koog.agents.features.opentelemetry.span - -import ai.koog.agents.features.opentelemetry.attribute.CommonAttributes -import ai.koog.agents.features.opentelemetry.attribute.GenAIAttributes -import ai.koog.agents.features.opentelemetry.attribute.McpAttributes - -/** - * Enriches an Execute Tool Span with MCP-specific attributes. - * - * @param toolName The name of the tool being called. - * @param methodName The MCP method name string (e.g., "tools/call"). - * @param serverAddress The server address for client spans (recommended). - * @param serverPort The server port for client spans (recommended). - * @param sessionId The MCP session identifier (recommended when part of a session). - * @param mcpProtocolVersion The MCP protocol version in use (recommended). - * @param mcpTransportType The transport type used for communication (recommended). - * @return This span with MCP attributes added. - */ -internal fun GenAIAgentSpan.enrichExecuteToolSpanWithMcpAttrs( - toolName: String, - methodName: String, - serverAddress: String? = null, - serverPort: Int? = null, - sessionId: String? = null, - mcpProtocolVersion: String, - mcpTransportType: String, -): GenAIAgentSpan { - // mcp.method.name (REQUIRED) - addAttribute(McpAttributes.Mcp.Method.Name(methodName)) - - // gen_ai.operation.name (RECOMMENDED for tool calls) - addAttribute(GenAIAttributes.Operation.Name(GenAIAttributes.Operation.OperationNameType.EXECUTE_TOOL)) - - // gen_ai.tool.name (CONDITIONALLY REQUIRED) - addAttribute(GenAIAttributes.Tool.Name(toolName)) - - // mcp.session.id (RECOMMENDED) - sessionId?.let { session -> - addAttribute(McpAttributes.Mcp.Session.Id(session)) - } - - // mcp.protocol.version (RECOMMENDED) - addAttribute(McpAttributes.Mcp.Protocol.Version(mcpProtocolVersion)) - // network.transport (RECOMMENDED) - addAttribute(McpAttributes.Network.Transport(mcpTransportType)) - - // server.address (RECOMMENDED) - serverAddress?.let { address -> - addAttribute(CommonAttributes.Server.Address(address)) - } - - // server.port (RECOMMENDED) - serverPort?.let { port -> - addAttribute(CommonAttributes.Server.Port(port)) - } - - return this -} diff --git a/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/span/nodeExecuteSpan.kt b/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/span/nodeExecuteSpan.kt index ee77ace772..2a0881a533 100644 --- a/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/span/nodeExecuteSpan.kt +++ b/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/span/nodeExecuteSpan.kt @@ -4,6 +4,7 @@ import ai.koog.agents.features.opentelemetry.attribute.GenAIAttributes import ai.koog.agents.features.opentelemetry.attribute.KoogAttributes import ai.koog.agents.features.opentelemetry.extension.addCommonErrorAttributes import ai.koog.agents.features.opentelemetry.extension.toStatusData +import ai.koog.agents.features.opentelemetry.integration.SpanAdapter import io.opentelemetry.kotlin.factory.ContextFactory import io.opentelemetry.kotlin.tracing.SpanKind import io.opentelemetry.kotlin.tracing.Tracer @@ -21,6 +22,16 @@ import io.opentelemetry.kotlin.tracing.Tracer * - koog.node.id * - koog.node.input (conditional) * - koog.event.id + * + * @param tracer The tracer instance to use for creating the span. + * @param contextFactory The context factory to use for creating the span context. + * @param parentSpan The parent span for this node execute span. + * @param id The unique identifier for this span. + * @param runId The conversation identifier for the parent run. + * @param nodeId The unique identifier for this node. + * @param nodeInput The input provided to the node (optional). + * @param spanAdapter The span adapter to use for custom span behavior (optional). + * @return The newly created and started GenAIAgentSpan. */ internal fun startNodeExecuteSpan( tracer: Tracer, @@ -30,6 +41,7 @@ internal fun startNodeExecuteSpan( runId: String, nodeId: String, nodeInput: String?, + spanAdapter: SpanAdapter? = null, ): GenAIAgentSpan { val builder = GenAIAgentSpanBuilder( spanType = SpanType.NODE, @@ -47,11 +59,15 @@ internal fun startNodeExecuteSpan( builder.addAttribute(KoogAttributes.Koog.Event.Id(id)) - return builder.buildAndStart(tracer, contextFactory) + val span = builder.buildAndStart(tracer, contextFactory) + spanAdapter?.onBeforeSpanStarted(span) + return span } /** - * End Node Execute Span and set final attributes. + * End Node Execute Span and set final attributes. The provided [spanAdapter] is invoked via + * [SpanAdapter.onBeforeSpanFinished] after all attributes are set and immediately before the + * underlying span is ended. * * Note: This span is out of scope of the OpenTelemetry Semantic Convention for GenAI. * It is a custom span used to support Koog events hierarchy. @@ -61,12 +77,19 @@ internal fun startNodeExecuteSpan( * * Custom attributes: * - koog.node.output (conditional) + * + * @param span The GenAIAgentSpan to end. + * @param nodeOutput The output produced by the node (optional). + * @param error The error that occurred during the node execution (optional). + * @param verbose Whether to log verbose information during span ending (default: false). + * @param spanAdapter The span adapter to use for custom span behavior (optional). */ internal fun endNodeExecuteSpan( span: GenAIAgentSpan, nodeOutput: String?, error: Throwable? = null, - verbose: Boolean = false + verbose: Boolean = false, + spanAdapter: SpanAdapter? = null, ) { check(span.type == SpanType.NODE) { "${span.logString} Expected to end span type of type: <${SpanType.NODE}>, but received span of type: <${span.type}>" @@ -79,5 +102,6 @@ internal fun endNodeExecuteSpan( span.addAttribute(KoogAttributes.Koog.Node.Output(output)) } + spanAdapter?.onBeforeSpanFinished(span) span.end(error.toStatusData(), verbose) } diff --git a/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/span/strategySpan.kt b/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/span/strategySpan.kt index 897105a554..7814a5c445 100644 --- a/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/span/strategySpan.kt +++ b/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/span/strategySpan.kt @@ -4,6 +4,7 @@ import ai.koog.agents.features.opentelemetry.attribute.GenAIAttributes import ai.koog.agents.features.opentelemetry.attribute.KoogAttributes import ai.koog.agents.features.opentelemetry.extension.addCommonErrorAttributes import ai.koog.agents.features.opentelemetry.extension.toStatusData +import ai.koog.agents.features.opentelemetry.integration.SpanAdapter import io.opentelemetry.kotlin.factory.ContextFactory import io.opentelemetry.kotlin.tracing.SpanKind import io.opentelemetry.kotlin.tracing.Tracer @@ -21,6 +22,15 @@ import io.opentelemetry.kotlin.tracing.Tracer * Custom attributes: * - koog.strategy.name * - koog.event.id + * + * @param tracer The tracer instance to use for creating the span. + * @param contextFactory The context factory to use for creating the span context. + * @param parentSpan The parent span for this strategy span. + * @param id The unique identifier for this span. + * @param runId The conversation identifier for the parent run. + * @param strategyName The name of the strategy being executed. + * @param spanAdapter The span adapter to use for custom span behavior (optional). + * @return The newly created and started GenAIAgentSpan. */ internal fun startStrategySpan( tracer: Tracer, @@ -29,6 +39,7 @@ internal fun startStrategySpan( id: String, runId: String, strategyName: String, + spanAdapter: SpanAdapter? = null, ): GenAIAgentSpan { val builder = GenAIAgentSpanBuilder( spanType = SpanType.STRATEGY, @@ -41,11 +52,15 @@ internal fun startStrategySpan( .addAttribute(KoogAttributes.Koog.Strategy.Name(strategyName)) .addAttribute(KoogAttributes.Koog.Event.Id(id)) - return builder.buildAndStart(tracer, contextFactory) + val span = builder.buildAndStart(tracer, contextFactory) + spanAdapter?.onBeforeSpanStarted(span) + return span } /** - * End Strategy Span and set final attributes. + * End Strategy Span and set final attributes. The provided [spanAdapter] is invoked via + * [SpanAdapter.onBeforeSpanFinished] after all attributes are set and immediately before the + * underlying span is ended. * * Note: This span is not a standard span type defined in the OpenTelemetry * Semantic Conventions but is designed to provide support for tracing @@ -53,11 +68,17 @@ internal fun startStrategySpan( * * Span attributes: * - error.type (conditional) + * + * @param span The GenAIAgentSpan to end. + * @param error The error that occurred during the strategy execution (optional). + * @param verbose Whether to log verbose information during span ending (default: false). + * @param spanAdapter The span adapter to use for custom span behavior (optional). */ internal fun endStrategySpan( span: GenAIAgentSpan, error: Throwable? = null, verbose: Boolean = false, + spanAdapter: SpanAdapter? = null, ) { check(span.type == SpanType.STRATEGY) { "${span.logString} Expected to end span type of type: <${SpanType.STRATEGY}>, but received span of type: <${span.type}>" @@ -66,5 +87,6 @@ internal fun endStrategySpan( // error.type span.addCommonErrorAttributes(error) + spanAdapter?.onBeforeSpanFinished(span) span.end(error.toStatusData(), verbose) } diff --git a/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/span/subgraphExecuteSpan.kt b/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/span/subgraphExecuteSpan.kt index 7f79c1f054..30dbe0be59 100644 --- a/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/span/subgraphExecuteSpan.kt +++ b/agents/agents-features/agents-features-opentelemetry/src/commonMain/kotlin/ai/koog/agents/features/opentelemetry/span/subgraphExecuteSpan.kt @@ -4,6 +4,7 @@ import ai.koog.agents.features.opentelemetry.attribute.GenAIAttributes import ai.koog.agents.features.opentelemetry.attribute.KoogAttributes import ai.koog.agents.features.opentelemetry.extension.addCommonErrorAttributes import ai.koog.agents.features.opentelemetry.extension.toStatusData +import ai.koog.agents.features.opentelemetry.integration.SpanAdapter import io.opentelemetry.kotlin.factory.ContextFactory import io.opentelemetry.kotlin.tracing.SpanKind import io.opentelemetry.kotlin.tracing.Tracer @@ -21,6 +22,16 @@ import io.opentelemetry.kotlin.tracing.Tracer * - koog.subgraph.id * - koog.subgraph.input (conditional) * - koog.event.id + * + * @param tracer The tracer instance to use for creating the span. + * @param contextFactory The context factory to use for creating the span context. + * @param parentSpan The parent span for this subgraph execution span. + * @param id The unique identifier for this span. + * @param runId The conversation identifier for the parent run. + * @param subgraphId The unique identifier for the subgraph being executed. + * @param subgraphInput The input provided to the subgraph (optional). + * @param spanAdapter The span adapter to use for custom span behavior (optional). + * @return The newly created and started GenAIAgentSpan. */ internal fun startSubgraphExecuteSpan( tracer: Tracer, @@ -30,6 +41,7 @@ internal fun startSubgraphExecuteSpan( runId: String, subgraphId: String, subgraphInput: String?, + spanAdapter: SpanAdapter? = null, ): GenAIAgentSpan { val builder = GenAIAgentSpanBuilder( spanType = SpanType.SUBGRAPH, @@ -47,11 +59,15 @@ internal fun startSubgraphExecuteSpan( builder.addAttribute(KoogAttributes.Koog.Event.Id(id)) - return builder.buildAndStart(tracer, contextFactory) + val span = builder.buildAndStart(tracer, contextFactory) + spanAdapter?.onBeforeSpanStarted(span) + return span } /** - * End Subgraph Execute Span and set final attributes. + * End Subgraph Execute Span and set final attributes. The provided [spanAdapter] is invoked via + * [SpanAdapter.onBeforeSpanFinished] after all attributes are set and immediately before the + * underlying span is ended. * * Note: This span is out of scope of the Open Telemetry Semantic Convention for GenAI. * It is a custom span used to support Koog events hierarchy. @@ -61,12 +77,19 @@ internal fun startSubgraphExecuteSpan( * * Custom attributes: * - koog.subgraph.output (conditional) + * + * @param span The GenAIAgentSpan to end. + * @param subgraphOutput The output produced by the subgraph (optional). + * @param error The error that occurred during the subgraph execution (optional). + * @param verbose Whether to log verbose information during span ending (default: false). + * @param spanAdapter The span adapter to use for custom span behavior (optional). */ internal fun endSubgraphExecuteSpan( span: GenAIAgentSpan, subgraphOutput: String?, error: Throwable? = null, - verbose: Boolean = false + verbose: Boolean = false, + spanAdapter: SpanAdapter? = null, ) { check(span.type == SpanType.SUBGRAPH) { "${span.logString} Expected to end span type of type: <${SpanType.SUBGRAPH}>, but received span of type: <${span.type}>" @@ -79,5 +102,6 @@ internal fun endSubgraphExecuteSpan( span.addAttribute(KoogAttributes.Koog.Subgraph.Output(output)) } + spanAdapter?.onBeforeSpanFinished(span) span.end(error.toStatusData(), verbose) } diff --git a/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/feature/OpenTelemetryFeatureTest.kt b/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/feature/OpenTelemetryFeatureTest.kt index 3f5fdda2cb..18498e4a55 100644 --- a/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/feature/OpenTelemetryFeatureTest.kt +++ b/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/feature/OpenTelemetryFeatureTest.kt @@ -1,9 +1,11 @@ package ai.koog.agents.features.opentelemetry.feature import ai.koog.agents.core.agent.execution.AgentExecutionInfo +import ai.koog.agents.features.opentelemetry.integration.SpanAdapter import ai.koog.agents.features.opentelemetry.mock.MockContextFactory import ai.koog.agents.features.opentelemetry.mock.MockSpan import ai.koog.agents.features.opentelemetry.mock.MockTracer +import ai.koog.agents.features.opentelemetry.span.GenAIAgentSpan import ai.koog.agents.features.opentelemetry.span.GenAIAgentSpanBuilder import ai.koog.agents.features.opentelemetry.span.SpanCollector import ai.koog.agents.features.opentelemetry.span.SpanType @@ -103,7 +105,7 @@ class OpenTelemetryFeatureTest { assertEquals(2, spanCollector.activeSpansCount) // End all unfinished spans - openTelemetry.endUnfinishedSpans(spanCollector, verbose = false) + openTelemetry.endUnfinishedSpans(spanCollector, spanAdapter = null, verbose = false) // Verify all spans are ended and removed assertEquals(0, spanCollector.activeSpansCount) @@ -151,7 +153,7 @@ class OpenTelemetryFeatureTest { assertEquals(3, spanCollector.activeSpansCount) // End only NODE spans (filter out CREATE_AGENT and INVOKE_AGENT) - openTelemetry.endUnfinishedSpans(spanCollector, verbose = false) { span -> + openTelemetry.endUnfinishedSpans(spanCollector, spanAdapter = null, verbose = false) { span -> span.type == SpanType.NODE } @@ -216,7 +218,7 @@ class OpenTelemetryFeatureTest { assertEquals(4, spanCollector.activeSpansCount) // End all spans - should handle hierarchy correctly (leaf to root) - openTelemetry.endUnfinishedSpans(spanCollector, verbose = false) + openTelemetry.endUnfinishedSpans(spanCollector, spanAdapter = null, verbose = false) // Verify all spans are ended assertEquals(0, spanCollector.activeSpansCount) @@ -232,5 +234,49 @@ class OpenTelemetryFeatureTest { assertTrue(mockLevel3Span.isEnded) } + @Test + fun testEndUnfinishedSpans_InvokesSpanAdapterOnBeforeSpanFinished() = runTest { + val spanCollector = SpanCollector() + val tracer = MockTracer() + val contextFactory = MockContextFactory() + val openTelemetry = OpenTelemetry.Feature + + val finishedSpans = mutableListOf() + val adapter = object : SpanAdapter() { + override fun onBeforeSpanFinished(span: GenAIAgentSpan) { + finishedSpans += span + } + } + + val createAgentSpan = GenAIAgentSpanBuilder( + spanType = SpanType.CREATE_AGENT, + parentSpan = null, + id = "create-agent", + name = "create-agent-name", + kind = SpanKind.INTERNAL, + ).buildAndStart(tracer, contextFactory) + spanCollector.collectSpan(createAgentSpan, AgentExecutionInfo(null, "create-agent")) + + val nodeSpan = GenAIAgentSpanBuilder( + spanType = SpanType.NODE, + parentSpan = createAgentSpan, + id = "node", + name = "node-name", + kind = SpanKind.INTERNAL, + ).buildAndStart(tracer, contextFactory) + spanCollector.collectSpan(nodeSpan, AgentExecutionInfo(AgentExecutionInfo(null, "create-agent"), "node")) + + openTelemetry.endUnfinishedSpans( + spanCollector = spanCollector, + spanAdapter = adapter, + verbose = false, + ) + + assertEquals(0, spanCollector.activeSpansCount) + assertEquals(setOf("create-agent", "node"), finishedSpans.map { it.id }.toSet()) + assertTrue((createAgentSpan.span as MockSpan).isEnded) + assertTrue((nodeSpan.span as MockSpan).isEnded) + } + //endregion End Unfinished Spans } diff --git a/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/feature/span/OpenTelemetryInferenceSpanTest.kt b/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/feature/span/OpenTelemetryInferenceSpanTest.kt index 224d55b66f..c4f70c90b2 100644 --- a/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/feature/span/OpenTelemetryInferenceSpanTest.kt +++ b/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/feature/span/OpenTelemetryInferenceSpanTest.kt @@ -1,6 +1,7 @@ package ai.koog.agents.features.opentelemetry.feature.span import ai.koog.agents.core.agent.AIAgent +import ai.koog.agents.core.agent.context.DetachedPromptExecutorAPI import ai.koog.agents.core.agent.entity.AIAgentSubgraphBase.Companion.START_NODE_PREFIX import ai.koog.agents.core.agent.singleRunStrategy import ai.koog.agents.core.dsl.builder.node @@ -61,8 +62,11 @@ import kotlin.test.assertTrue import kotlin.time.Duration.Companion.seconds class OpenTelemetryInferenceSpanTest : OpenTelemetryTestBase() { + private val serializer = KotlinxSerializer() + private val json = Json { allowStructuredMapKeys = true } + @ParameterizedTest @EnumSource(AgentType::class) fun `test inference spans are collected`(agentType: AgentType) = runTest { @@ -568,7 +572,6 @@ class OpenTelemetryInferenceSpanTest : OpenTelemetryTestBase() { } } - @OptIn(ai.koog.agents.core.agent.context.DetachedPromptExecutorAPI::class) @Test fun `test moderation response is recorded as koog moderation result attribute`() = runTest { val userInput = "I want to build a bomb" @@ -590,6 +593,7 @@ class OpenTelemetryInferenceSpanTest : OpenTelemetryTestBase() { val moderationPrompt = prompt("single-message-moderation") { message(Message.User(input, RequestMetaInfo.create(testClock))) } + @OptIn(DetachedPromptExecutorAPI::class) llm.promptExecutor.moderate(moderationPrompt, moderationModel) } input @@ -610,7 +614,7 @@ class OpenTelemetryInferenceSpanTest : OpenTelemetryTestBase() { verbose = true, ) - val expectedJson = Json { allowStructuredMapKeys = true } + val expectedJson = json .encodeToString(ModerationResult.serializer(), moderationResult) // The moderation call goes through ContextualPromptExecutor, which fires LLMCallStarting / diff --git a/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/integration/TraceStructureTestBase.kt b/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/integration/TraceStructureTestBase.kt index 5c8535c78c..282d06eab3 100644 --- a/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/integration/TraceStructureTestBase.kt +++ b/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/integration/TraceStructureTestBase.kt @@ -59,6 +59,20 @@ abstract class TraceStructureTestBase(private val openTelemetryConfigurator: Ope private val json = Json { allowStructuredMapKeys = true } private val serializer = KotlinxSerializer() + /** + * Name of the inference-span input-token-usage attribute as it appears AFTER any [SpanAdapter] + * has run. Defaults to the OTel GenAI semconv name; adapters that rename the attribute (e.g. + * Weave → `gen_ai.usage.prompt_tokens`) override this. + */ + protected open val inputTokensAttributeName: String = "gen_ai.usage.input_tokens" + + /** + * Name of the inference-span output-token-usage attribute as it appears AFTER any [SpanAdapter] + * has run. Defaults to the OTel GenAI semconv name; adapters that rename the attribute (e.g. + * Weave → `gen_ai.usage.completion_tokens`) override this. + */ + protected open val outputTokensAttributeName: String = "gen_ai.usage.output_tokens" + @Test fun testSingleLLMCall() = runBlocking { TestSpanProcessor().let { testProcessor -> @@ -112,8 +126,8 @@ abstract class TraceStructureTestBase(private val openTelemetryConfigurator: Ope "gen_ai.request.temperature" to temperature, "gen_ai.response.finish_reasons" to listOf(FinishReasonType.Stop.id), "gen_ai.response.model" to model.id, - "gen_ai.usage.input_tokens" to 0L, - "gen_ai.usage.output_tokens" to 0L, + inputTokensAttributeName to 0L, + outputTokensAttributeName to 0L, "koog.event.id" to llmCallEventId, "gen_ai.input.messages" to getMessagesString( listOf( @@ -598,8 +612,8 @@ abstract class TraceStructureTestBase(private val openTelemetryConfigurator: Ope "gen_ai.request.temperature" to temperature, "gen_ai.response.finish_reasons" to listOf(FinishReasonType.Stop.id), "gen_ai.response.model" to model.id, - "gen_ai.usage.input_tokens" to 0L, - "gen_ai.usage.output_tokens" to 0L, + inputTokensAttributeName to 0L, + outputTokensAttributeName to 0L, "koog.event.id" to llmCallEventId, "gen_ai.input.messages" to getMessagesString( listOf( @@ -716,8 +730,8 @@ abstract class TraceStructureTestBase(private val openTelemetryConfigurator: Ope val finalLLMEventId = testData.singleAttributeValue(actualLLMSpans[1], "koog.event.id") // Get actual token counts from spans - val inputTokens0 = getAttributeValue(actualLLMSpans[0], "gen_ai.usage.input_tokens") ?: 0L - val inputTokens1 = getAttributeValue(actualLLMSpans[1], "gen_ai.usage.input_tokens") ?: 0L + val inputTokens0 = getAttributeValue(actualLLMSpans[0], inputTokensAttributeName) ?: 0L + val inputTokens1 = getAttributeValue(actualLLMSpans[1], inputTokensAttributeName) ?: 0L // Build expected spans using abstract methods val expectedInitialLLMSpanAttributes = @@ -734,7 +748,7 @@ abstract class TraceStructureTestBase(private val openTelemetryConfigurator: Ope ).toLong() ).plus( mapOf( - "gen_ai.usage.input_tokens" to inputTokens0, + inputTokensAttributeName to inputTokens0, "koog.event.id" to initialLLMEventId, ) ) @@ -753,7 +767,7 @@ abstract class TraceStructureTestBase(private val openTelemetryConfigurator: Ope outputTokens = tokenizer.countTokens(text = finalResponse).toLong(), ).plus( mapOf( - "gen_ai.usage.input_tokens" to inputTokens1, + inputTokensAttributeName to inputTokens1, "koog.event.id" to finalLLMEventId, ) ) diff --git a/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/integration/weave/WeaveTraceStructureTest.kt b/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/integration/weave/WeaveTraceStructureTest.kt index 5cfa08af6b..5118ce133c 100644 --- a/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/integration/weave/WeaveTraceStructureTest.kt +++ b/agents/agents-features/agents-features-opentelemetry/src/jvmTest/kotlin/ai/koog/agents/features/opentelemetry/integration/weave/WeaveTraceStructureTest.kt @@ -17,6 +17,9 @@ import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable class WeaveTraceStructureTest : TraceStructureTestBase(openTelemetryConfigurator = { addWeaveExporter() }) { + override val inputTokensAttributeName: String = "gen_ai.usage.prompt_tokens" + override val outputTokensAttributeName: String = "gen_ai.usage.completion_tokens" + override fun testLLMCallToolCallLLMCallGetExpectedInitialLLMCallSpanAttributes( model: LLModel, temperature: Double, @@ -51,8 +54,8 @@ class WeaveTraceStructureTest : "gen_ai.request.temperature" to temperature, "gen_ai.request.model" to model.id, "gen_ai.response.model" to model.id, - "gen_ai.usage.input_tokens" to 0L, - "gen_ai.usage.output_tokens" to 0L, + "gen_ai.usage.prompt_tokens" to 0L, + "gen_ai.usage.completion_tokens" to 0L, "gen_ai.input.messages" to inputMessages, "system_instructions" to systemInstructions, "gen_ai.output.messages" to outputMessages, @@ -119,8 +122,8 @@ class WeaveTraceStructureTest : "gen_ai.request.temperature" to temperature, "gen_ai.request.model" to model.id, "gen_ai.response.model" to model.id, - "gen_ai.usage.input_tokens" to 0L, - "gen_ai.usage.output_tokens" to 0L, + "gen_ai.usage.prompt_tokens" to 0L, + "gen_ai.usage.completion_tokens" to 0L, "gen_ai.input.messages" to inputMessages, "system_instructions" to systemInstructions, "gen_ai.output.messages" to outputMessages, @@ -183,8 +186,8 @@ class WeaveTraceStructureTest : "gen_ai.request.max_tokens" to maxTokens, "gen_ai.request.model" to model.id, "gen_ai.response.model" to model.id, - "gen_ai.usage.input_tokens" to 0L, - "gen_ai.usage.output_tokens" to outputTokens, + "gen_ai.usage.prompt_tokens" to 0L, + "gen_ai.usage.completion_tokens" to outputTokens, "gen_ai.input.messages" to inputMessages, "system_instructions" to systemInstructions, "gen_ai.output.messages" to outputMessages, @@ -266,8 +269,8 @@ class WeaveTraceStructureTest : "gen_ai.request.model" to model.id, "gen_ai.request.max_tokens" to maxTokens, "gen_ai.response.model" to model.id, - "gen_ai.usage.input_tokens" to 0L, - "gen_ai.usage.output_tokens" to outputTokens, + "gen_ai.usage.prompt_tokens" to 0L, + "gen_ai.usage.completion_tokens" to outputTokens, "gen_ai.input.messages" to inputMessages, "system_instructions" to systemInstructions, "gen_ai.output.messages" to outputMessages,