Skip to content

Commit 5e04b42

Browse files
authored
feat(agents): upgrade sdk to 0.11.1, add streamable transport (#1870)
Upgrades MCP kotlin-sdk from 0.8.1 to 0.11.1 and makes Streamable HTTP the primary MCP transport for both client and server closes #1674 closes [KG-792](https://youtrack.jetbrains.com/issue/KG-792) closes [KG-756](https://youtrack.jetbrains.com/issue/KG-756) closes [KG-49](https://youtrack.jetbrains.com/issue/KG-49) closes [KG-755](https://youtrack.jetbrains.com/issue/KG-755) ### DEPRECATED: * `startSseMcpServer(factory, port, host, tools)` -- use `startMcpServer(factory, tools, port, host)` * `startSseMcpServer(factory, host, tools)` -- use `startMcpServer(factory, tools, host)`
1 parent 510e6cf commit 5e04b42

14 files changed

Lines changed: 1053 additions & 63 deletions

File tree

agents/agents-mcp-metadata/src/commonMain/kotlin/ai/koog/agents/mcp/metadata/McpToolSupport.kt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,15 @@ public enum class McpTransportType(public val value: String) {
7676
*/
7777
Tcp("tcp"),
7878

79+
/**
80+
* HTTP-based Streamable HTTP transport.
81+
*
82+
* This transport uses HTTP POST for sending messages and SSE for receiving,
83+
* supporting bidirectional communication, session management, and reconnection.
84+
* It is the recommended transport for remote MCP servers, replacing the legacy SSE transport.
85+
*/
86+
StreamableHttp("http"),
87+
7988
/**
8089
* Represents an unknown or undefined transport protocol type.
8190
*

agents/agents-mcp-server/src/commonMain/kotlin/ai/koog/agents/mcp/server/McpServer.kt

Lines changed: 85 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,23 @@ import ai.koog.agents.core.tools.annotations.InternalAgentToolsApi
99
import ai.koog.serialization.JSONSerializer
1010
import ai.koog.serialization.kotlinx.KotlinxSerializer
1111
import ai.koog.serialization.kotlinx.toKoogJSONObject
12+
import io.ktor.server.application.Application
1213
import io.ktor.server.engine.ApplicationEngineFactory
1314
import io.ktor.server.engine.EmbeddedServer
1415
import io.ktor.server.engine.EngineConnectorConfig
1516
import io.ktor.server.engine.embeddedServer
1617
import io.modelcontextprotocol.kotlin.sdk.server.Server
1718
import io.modelcontextprotocol.kotlin.sdk.server.ServerOptions
1819
import io.modelcontextprotocol.kotlin.sdk.server.mcp
20+
import io.modelcontextprotocol.kotlin.sdk.server.mcpStreamableHttp
1921
import io.modelcontextprotocol.kotlin.sdk.types.CallToolResult
2022
import io.modelcontextprotocol.kotlin.sdk.types.EmptyJsonObject
2123
import io.modelcontextprotocol.kotlin.sdk.types.Implementation
2224
import io.modelcontextprotocol.kotlin.sdk.types.ServerCapabilities
2325
import io.modelcontextprotocol.kotlin.sdk.types.TextContent
2426
import io.modelcontextprotocol.kotlin.sdk.types.ToolSchema
2527
import kotlinx.coroutines.coroutineScope
28+
import kotlinx.coroutines.delay
2629
import kotlinx.coroutines.isActive
2730
import kotlinx.serialization.json.JsonObject
2831
import kotlinx.serialization.json.JsonObjectBuilder
@@ -32,53 +35,117 @@ import kotlinx.serialization.json.put
3235
import kotlinx.serialization.json.putJsonArray
3336
import kotlinx.serialization.json.putJsonObject
3437
import kotlin.coroutines.cancellation.CancellationException
38+
import kotlin.time.Duration.Companion.milliseconds
3539
import io.modelcontextprotocol.kotlin.sdk.types.Tool as SdkTool
3640

41+
/**
42+
* Supported MCP server transport types for the Ktor-based [startMcpServer].
43+
*
44+
* For stdio transport use the platform-specific `startStdioMcpServer(tools)` entry point.
45+
*/
46+
public enum class McpServerTransportType {
47+
/** Streamable HTTP transport. */
48+
StreamableHttp,
49+
50+
/** SSE transport. */
51+
SSE,
52+
}
53+
54+
/**
55+
* Starts a new MCP server with the given [tools] on the specified [port] and [host].
56+
* Defaults to Streamable HTTP transport.
57+
*
58+
* @param factory The Ktor application engine factory to use (e.g., CIO, Netty).
59+
* @param tools The tools to expose via the MCP server.
60+
* @param port The port to listen on.
61+
* @param host The host to bind to.
62+
* @param transport The transport type to use.
63+
* @return The MCP [Server].
64+
*/
65+
public suspend fun startMcpServer(
66+
factory: ApplicationEngineFactory<*, *>,
67+
tools: ToolRegistry,
68+
port: Int = 3000,
69+
host: String = "localhost",
70+
transport: McpServerTransportType = McpServerTransportType.StreamableHttp,
71+
): Server = doStartMcpServer(factory, port, host, tools) { server ->
72+
installMcpTransport(server, transport)
73+
}.first
74+
75+
/**
76+
* Starts a new MCP server with the given [tools] on an OS-allocated port on the passed [host].
77+
* The actual port can be obtained from the returned list of [EngineConnectorConfig].
78+
* Defaults to Streamable HTTP transport.
79+
*
80+
* @param factory The Ktor application engine factory to use (e.g., CIO, Netty).
81+
* @param tools The tools to expose via the MCP server.
82+
* @param host The host to bind to.
83+
* @param transport The transport type to use.
84+
* @return A pair of the MCP [Server] and the list of connectors (used to discover the allocated port).
85+
*/
86+
public suspend fun startMcpServer(
87+
factory: ApplicationEngineFactory<*, *>,
88+
tools: ToolRegistry,
89+
host: String = "localhost",
90+
transport: McpServerTransportType = McpServerTransportType.StreamableHttp,
91+
): Pair<Server, List<EngineConnectorConfig>> = doStartMcpServer(factory, 0, host, tools) { server ->
92+
installMcpTransport(server, transport)
93+
}
94+
3795
/**
3896
* Starts a new MCP server with the passed [tools] that listens to and writes
39-
* to the specified [port] on the passed [host].
40-
* A port can be obtained from the returned list of [EngineConnectorConfig].
97+
* to the specified [port] on the passed [host] using SSE transport.
4198
*/
99+
@Deprecated(
100+
"SSE transport is deprecated. Use startMcpServer() which defaults to Streamable HTTP.",
101+
ReplaceWith("startMcpServer(factory, tools, port, host)"),
102+
level = DeprecationLevel.WARNING,
103+
)
42104
public suspend fun startSseMcpServer(
43105
factory: ApplicationEngineFactory<*, *>,
44106
port: Int = 3000,
45107
host: String = "localhost",
46108
tools: ToolRegistry,
47-
): Server = doStartSseMcpServer(factory, port, host, tools, true).first
109+
): Server = doStartMcpServer(factory, port, host, tools) { server -> mcp { server } }.first
48110

49111
/**
50112
* Starts a new MCP server with the passed [tools] that listens to and writes
51-
* to the allocated port on the passed [host].
113+
* to the allocated port on the passed [host] using SSE transport.
52114
* A port can be obtained from the returned list of [EngineConnectorConfig].
53115
*/
116+
@Deprecated(
117+
"SSE transport is deprecated. Use startMcpServer() which defaults to Streamable HTTP.",
118+
ReplaceWith("startMcpServer(factory, tools, host)"),
119+
level = DeprecationLevel.WARNING,
120+
)
54121
public suspend fun startSseMcpServer(
55122
factory: ApplicationEngineFactory<*, *>,
56123
host: String = "localhost",
57124
tools: ToolRegistry,
58-
): Pair<Server, List<EngineConnectorConfig>> = doStartSseMcpServer(factory, 0, host, tools, false)
125+
): Pair<Server, List<EngineConnectorConfig>> =
126+
doStartMcpServer(factory, 0, host, tools) { server -> mcp { server } }
127+
128+
private fun Application.installMcpTransport(server: Server, transport: McpServerTransportType) {
129+
when (transport) {
130+
McpServerTransportType.StreamableHttp -> mcpStreamableHttp { server }
131+
McpServerTransportType.SSE -> mcp { server }
132+
}
133+
}
59134

60-
private suspend fun doStartSseMcpServer(
135+
private suspend fun doStartMcpServer(
61136
factory: ApplicationEngineFactory<*, *>,
62137
port: Int,
63138
host: String,
64139
tools: ToolRegistry,
65-
skipConnectors: Boolean,
140+
install: Application.(Server) -> Unit,
66141
): Pair<Server, List<EngineConnectorConfig>> {
67142
val server = configureMcpServer(tools)
68143

69-
val emb = embeddedServer(factory = factory, host = host, port = port) {
70-
mcp { server }
71-
}
144+
val emb = embeddedServer(factory = factory, host = host, port = port) { install(server) }
72145
.also { emb -> server.onClose { emb.stop(1000, 1000) } }
73146
.startSuspend(wait = false)
74147

75-
val connectors = if (skipConnectors) {
76-
emptyList()
77-
} else {
78-
emb.connectors()
79-
}
80-
81-
return server to connectors
148+
return server to emb.connectors()
82149
}
83150

84151
private suspend fun EmbeddedServer<*, *>.connectors(): List<EngineConnectorConfig> = coroutineScope {
@@ -90,6 +157,7 @@ private suspend fun EmbeddedServer<*, *>.connectors(): List<EngineConnectorConfi
90157
if (connectors.isNotEmpty()) {
91158
return@coroutineScope connectors
92159
}
160+
delay(50.milliseconds)
93161
}
94162

95163
return@coroutineScope emptyList()

agents/agents-mcp-server/src/jvmMain/kotlin/ai/koog/agents/mcp/server/McpServer.jvm.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import kotlinx.io.buffered
1313
public suspend fun startStdioMcpServer(tools: ToolRegistry): Server {
1414
return configureMcpServer(tools)
1515
.apply {
16-
connect(
16+
createSession(
1717
StdioServerTransport(
1818
inputStream = System.`in`.asSource().buffered(),
1919
outputStream = System.out.asSink().buffered()

agents/agents-mcp-server/src/jvmTest/kotlin/ai/koog/agents/mcp/server/KoogToolAsMcpToolTest.kt

Lines changed: 83 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import ai.koog.agents.testing.tools.RandomNumberTool
1111
import ai.koog.serialization.kotlinx.KotlinxSerializer
1212
import ai.koog.serialization.kotlinx.toKoogJSONObject
1313
import io.github.oshai.kotlinlogging.KotlinLogging
14+
import io.ktor.client.HttpClient
15+
import io.ktor.client.plugins.sse.SSE
1416
import io.ktor.server.cio.CIO
1517
import io.modelcontextprotocol.kotlin.sdk.types.EmptyJsonObject
1618
import io.modelcontextprotocol.kotlin.sdk.types.TextContent
@@ -47,7 +49,7 @@ class KoogToolAsMcpToolTest {
4749

4850
logger.info { "Result: ${mcpTool.encodeResultToString(result, serializer)}" }
4951

50-
val content = result?.content?.first() as TextContent
52+
val content = result.content.first() as TextContent
5153
assertEquals("${origin.last}", content.text)
5254
}
5355

@@ -64,7 +66,7 @@ class KoogToolAsMcpToolTest {
6466

6567
logger.info { "Result: ${mcpTool.encodeResultToString(result, serializer)}" }
6668

67-
val content = result?.content?.first() as TextContent
69+
val content = result.content.first() as TextContent
6870
assertEquals("${origin.last}", content.text)
6971
}
7072

@@ -80,7 +82,7 @@ class KoogToolAsMcpToolTest {
8082
}
8183
}
8284

83-
assertTrue(errorResult?.isError ?: false)
85+
assertTrue(errorResult.isError ?: false)
8486
}
8587

8688
// check that the server is still working
@@ -95,7 +97,7 @@ class KoogToolAsMcpToolTest {
9597

9698
logger.info { "Result: ${mcpTool.encodeResultToString(result, serializer)}" }
9799

98-
val content = result?.content?.first() as TextContent
100+
val content = result.content.first() as TextContent
99101
assertEquals("${origin.last}", content.text)
100102
}
101103
}
@@ -117,7 +119,7 @@ class KoogToolAsMcpToolTest {
117119
}
118120
}
119121

120-
assertTrue(errorResult?.isError ?: false)
122+
assertTrue(errorResult.isError ?: false)
121123

122124
val last = origin.last
123125
assertNotNull(last)
@@ -144,6 +146,23 @@ class KoogToolAsMcpToolTest {
144146
}
145147
}
146148

149+
@Test
150+
fun testKoogToolAsMcpToolViaStreamableHttp() = testMcpToolStreamableHttp(RandomNumberTool()) { mcpTool, origin ->
151+
val args = buildJsonObject { put("seed", "42") }
152+
153+
val result = withContext(Dispatchers.Default.limitedParallelism(1)) {
154+
withTimeout(20.seconds) {
155+
mcpTool.execute(args.toKoogJSONObject())
156+
}
157+
}
158+
159+
logger.info { "Result (Streamable HTTP): ${mcpTool.encodeResultToString(result, serializer)}" }
160+
161+
val content = result.content.first() as TextContent
162+
assertEquals("${origin.last}", content.text)
163+
}
164+
165+
@Suppress("DEPRECATION")
147166
private fun <T : Tool<*, *>> testMcpTool(
148167
tool: T,
149168
block: suspend (McpTool, T) -> Unit,
@@ -196,4 +215,63 @@ class KoogToolAsMcpToolTest {
196215
}
197216
}
198217
}
218+
219+
private fun <T : Tool<*, *>> testMcpToolStreamableHttp(
220+
tool: T,
221+
block: suspend (McpTool, T) -> Unit,
222+
) = runTest(timeout = 30.seconds) {
223+
assertIsNot<McpTool>(tool)
224+
225+
val (server, connectors) = startMcpServer(
226+
factory = CIO,
227+
tools = ToolRegistry {
228+
tool(tool)
229+
},
230+
)
231+
232+
val port = connectors.firstOrNull()?.port ?: 0
233+
assertNotEquals(0, port, "Port should not be 0")
234+
235+
val httpClient = HttpClient { install(SSE) }
236+
237+
try {
238+
val toolRegistry = withContext(Dispatchers.Default.limitedParallelism(1)) {
239+
withTimeout(20.seconds) {
240+
McpToolRegistryProvider.streamableHttp {
241+
url = "http://localhost:$port/mcp"
242+
this.httpClient = httpClient
243+
}
244+
}
245+
}
246+
247+
assertEquals(
248+
listOf(tool.descriptor),
249+
toolRegistry.tools.map { it.descriptor },
250+
)
251+
252+
val mcpTool = toolRegistry.getTool(tool.name) as McpTool
253+
block(mcpTool, tool)
254+
} finally {
255+
server.close()
256+
httpClient.close()
257+
258+
withContext(Dispatchers.Default.limitedParallelism(1)) {
259+
var result = Result.success(Unit)
260+
261+
for (attempt in 1..3) {
262+
result = runCatching {
263+
assertTrue(isPortAvailable(port), "Port $port should be available")
264+
}
265+
266+
if (result.isSuccess) {
267+
break
268+
} else {
269+
delay(1.seconds)
270+
}
271+
}
272+
273+
result.getOrThrow()
274+
}
275+
}
276+
}
199277
}

agents/agents-mcp/src/commonMain/kotlin/ai/koog/agents/mcp/McpTool.kt

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import ai.koog.serialization.kotlinx.toKotlinxJsonObject
1212
import ai.koog.serialization.typeToken
1313
import io.modelcontextprotocol.kotlin.sdk.client.Client
1414
import io.modelcontextprotocol.kotlin.sdk.types.CallToolResult
15+
import io.modelcontextprotocol.kotlin.sdk.types.TextContent
1516
import kotlinx.serialization.builtins.nullable
1617
import kotlinx.serialization.json.Json
1718
import kotlinx.serialization.json.JsonElement
@@ -72,8 +73,23 @@ public class McpTool(
7273

7374
/**
7475
* Postprocess result string representation for LLMs a bit, removing unnecessary meta fields.
76+
* When the result indicates an error (isError == true), returns a clearly prefixed error string
77+
* so the LLM can recognize the failure and adjust its strategy.
78+
*
79+
* If the error result has no [TextContent] (or only blank text), falls back to encoding the full
80+
* [CallToolResult] as JSON so non-text content (e.g. images, embedded resources) is not silently
81+
* dropped.
7582
*/
7683
override fun encodeResultToString(result: CallToolResult?, serializer: JSONSerializer): String {
84+
if (result?.isError == true) {
85+
val errorText = result.content.filterIsInstance<TextContent>().joinToString("\n") { it.text }
86+
if (errorText.isNotBlank()) {
87+
return "Error: $errorText"
88+
}
89+
val fallbackJson = json.encodeToJsonElement(resultSerializer, result).toKoogJSONElement()
90+
return "Error: ${serializer.encodeJSONElementToString(fallbackJson)}"
91+
}
92+
7793
val preparedResultJson: JsonElement = result
7894
?.let {
7995
JsonObject(

0 commit comments

Comments
 (0)