Skip to content

Commit 81e7884

Browse files
authored
fix: rework reentrant concurrency model for Java interop (#1945)
## Problem For more stable Java interop, we need to properly handle the reentrant scenario `Suspendable Koog code` → `Blocking user Java code` → `Suspendable Koog code`. Naively wrapping Java-interacting components in `runBlocking(context)` or `withContext(context)` introduces a risk of deadlock or thread starvation. Consider the following example: ```kotlin val singleThreadDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() fun suspendableKotlinCodeOne() { runBlocking(singleThreadDispatcher) { blockingJavaCodeTwo() } } fun suspendableKotlinCodeThree() { runBlocking(singleThreadDispatcher) { // do something } } ``` ```java public void blockingJavaCodeTwo() { suspendableKotlinCodeThree(); } ``` `suspendableKotlinCodeOne` dispatches a block onto `singleThreadDispatcher`, which then calls into `blockingJavaCodeTwo`. The Java method calls back into Kotlin via `suspendableKotlinCodeThree`, which again tries to dispatch onto the same single-threaded executor. Even though execution stays on the same thread throughout, the coroutine context tracking this is lost when crossing the Kotlin → Java → Kotlin boundary. As a result, `suspendableKotlinCodeThree` cannot tell that it is already running on the correct context and attempts to redispatch — causing a deadlock, since the only thread is already occupied. The redispatch was never actually necessary; the inner block could have executed directly in place. ## Current state The solution is to store the coroutine context in a thread local, then check it on dispatch to determine whether the requested context is already active. Several previous attempts have tried to implement this and fix related bugs, the most recent being #1716. However, the current implementation still has major problems: * After the last fix, `runBlockingIfRequired` ignores its `context` argument, meaning it never dispatches to the requested dispatcher. * Thread-local-based dispatcher tracking stores the context on the wrong thread — the current thread running `runBlockingIfRequired`, rather than the thread associated with the supplied `context`. * There is a bunch of similar-looking helper functions implemented separately, leading to duplication. The naming is also confusing. ## Solution I've narrowed the scope down to two functions with (hopefully) clearer names: `runBlockingReentrant` and `withContextReentrant`. They implement the approach described above correctly, avoiding unnecessary dispatch. A few tests have been added to verify the behavior. All existing utility functions were deleted and all implementations have been migrated. ## Important note The case where a user manually submits a task to the executor outside of Koog — for example, running the agent itself on the same single-threaded executor — is deliberately out of scope. In such cases, it is technically impossible to reliably determine whether we are already on the same executor/thread. This should be treated as user error rather than something Koog tries to handle. DEPRECATED: * Constructors and methods in `AIAgentConfig` on the JVM that accept `ExecutorService` parameters. Constructors and methods that accept more generic `Executor` should be used instead. closes [KG-750](https://youtrack.jetbrains.com/issue/KG-750)
1 parent cc3ebc5 commit 81e7884

39 files changed

Lines changed: 649 additions & 706 deletions

File tree

agents/agents-core/src/jvmCommonMain/kotlin/ai/koog/agents/core/agent/AIAgent.kt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,13 @@ import ai.koog.agents.core.agent.entity.AIAgentGraphStrategy
1010
import ai.koog.agents.core.agent.session.AIAgentRunSession
1111
import ai.koog.agents.core.annotation.InternalAgentsApi
1212
import ai.koog.agents.core.tools.ToolRegistry
13-
import ai.koog.agents.core.utils.runOnStrategyDispatcher
13+
import ai.koog.agents.core.utils.runBlockingOnLLMDispatcher
1414
import ai.koog.agents.planner.AIAgentPlannerStrategy
1515
import ai.koog.agents.planner.PlannerAIAgent
1616
import ai.koog.prompt.executor.model.PromptExecutor
1717
import ai.koog.prompt.llm.LLModel
1818
import ai.koog.prompt.processor.ResponseProcessor
19+
import ai.koog.utils.annotations.InternalKoogUtils
1920
import ai.koog.utils.io.Closeable
2021
import ai.koog.utils.time.KoogClock
2122
import java.util.concurrent.ExecutorService
@@ -37,14 +38,17 @@ public actual abstract class AIAgent<Input, Output> : Closeable {
3738
* If not provided, the default coroutine context is used.
3839
* @return The output resulting from the execution of the AI agent with the given input.
3940
*/
41+
@OptIn(InternalKoogUtils::class)
4042
@JavaAPI
4143
@JvmOverloads
4244
@JvmName("run")
4345
public final fun javaNonSuspendRun(
4446
agentInput: Input,
4547
sessionId: String? = null,
4648
executorService: ExecutorService? = null
47-
): Output = agentConfig.runOnStrategyDispatcher(executorService) { run(agentInput, sessionId) }
49+
): Output = agentConfig.runBlockingOnLLMDispatcher(executorService) {
50+
run(agentInput, sessionId)
51+
}
4852

4953
// Common (multiplatform) methods:
5054
public actual abstract suspend fun run(agentInput: Input, sessionId: String?): Output

agents/agents-core/src/jvmCommonMain/kotlin/ai/koog/agents/core/agent/AIAgentService.kt

Lines changed: 6 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
@file:Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING", "MissingKDocForPublicAPI")
2-
@file:OptIn(InternalAgentsApi::class)
32

43
package ai.koog.agents.core.agent
54

@@ -8,13 +7,14 @@ import ai.koog.agents.core.agent.config.AIAgentConfig
87
import ai.koog.agents.core.agent.entity.AIAgentGraphStrategy
98
import ai.koog.agents.core.annotation.InternalAgentsApi
109
import ai.koog.agents.core.tools.ToolRegistry
11-
import ai.koog.agents.core.utils.runOnStrategyDispatcher
10+
import ai.koog.agents.core.utils.runBlockingOnStrategyDispatcher
1211
import ai.koog.prompt.executor.model.PromptExecutor
1312
import ai.koog.prompt.llm.LLModel
1413
import ai.koog.prompt.processor.ResponseProcessor
1514
import ai.koog.utils.time.KoogClock
1615
import java.util.concurrent.ExecutorService
1716

17+
@OptIn(InternalAgentsApi::class)
1818
public actual abstract class AIAgentService<Input, Output, TAgent : AIAgent<Input, Output>> {
1919
public actual abstract val promptExecutor: PromptExecutor
2020
public actual abstract val agentConfig: AIAgentConfig
@@ -56,7 +56,7 @@ public actual abstract class AIAgentService<Input, Output, TAgent : AIAgent<Inpu
5656
agentConfig: AIAgentConfig = this.agentConfig,
5757
executorService: ExecutorService? = null,
5858
clock: KoogClock = KoogClock.System
59-
): TAgent = agentConfig.runOnStrategyDispatcher(executorService) {
59+
): TAgent = agentConfig.runBlockingOnStrategyDispatcher(executorService) {
6060
createAgent(id, additionalToolRegistry, agentConfig, clock)
6161
}
6262

@@ -98,7 +98,7 @@ public actual abstract class AIAgentService<Input, Output, TAgent : AIAgent<Inpu
9898
public fun removeAgent(
9999
agent: TAgent,
100100
executorService: ExecutorService? = null
101-
): Boolean = agentConfig.runOnStrategyDispatcher(executorService) {
101+
): Boolean = agentConfig.runBlockingOnStrategyDispatcher(executorService) {
102102
removeAgent(agent)
103103
}
104104

@@ -114,7 +114,7 @@ public actual abstract class AIAgentService<Input, Output, TAgent : AIAgent<Inpu
114114
public fun removeAgentWithId(
115115
id: String,
116116
executorService: ExecutorService? = null
117-
): Boolean = agentConfig.runOnStrategyDispatcher(executorService) {
117+
): Boolean = agentConfig.runBlockingOnStrategyDispatcher(executorService) {
118118
removeAgentWithId(id)
119119
}
120120

@@ -133,59 +133,10 @@ public actual abstract class AIAgentService<Input, Output, TAgent : AIAgent<Inpu
133133
public fun agentById(
134134
id: String,
135135
executorService: ExecutorService? = null
136-
): TAgent? = agentConfig.runOnStrategyDispatcher(executorService) {
136+
): TAgent? = agentConfig.runBlockingOnStrategyDispatcher(executorService) {
137137
agentById(id)
138138
}
139139

140-
/**
141-
* Lists all currently active agents using the configured strategy dispatcher.
142-
*
143-
* If an optional `executorService` is provided, it will be used as the execution context
144-
* for running the operation. Otherwise, a default executor service or dispatcher is utilized.
145-
*
146-
* @param executorService The optional `ExecutorService` to use for task execution. Defaults to `null`.
147-
* @return A list of currently active agents of type `TAgent`.
148-
*/
149-
@JavaAPI
150-
@JvmOverloads
151-
public fun listActiveAgents(
152-
executorService: ExecutorService? = null
153-
): List<TAgent> = agentConfig.runOnStrategyDispatcher(executorService) {
154-
listActiveAgents()
155-
}
156-
157-
/**
158-
* Retrieves a list of all inactive agents.
159-
*
160-
* This method utilizes the provided `ExecutorService` for dispatching, if specified.
161-
* If no `ExecutorService` is provided, a default strategy-specific executor will be used.
162-
*
163-
* @param executorService An optional `ExecutorService` to execute the operation.
164-
* If `null`, the default executor service of the agent's configuration is used.
165-
* @return A list of `TAgent` objects representing the inactive agents.
166-
*/
167-
@JavaAPI
168-
@JvmOverloads
169-
public fun listInactiveAgents(
170-
executorService: ExecutorService? = null
171-
): List<TAgent> = agentConfig.runOnStrategyDispatcher(executorService) {
172-
listInactiveAgents()
173-
}
174-
175-
/**
176-
* Retrieves a list of finished agents.
177-
*
178-
* @param executorService The optional executor service to be used for the operation. If no executor service is provided, a default one will be used.
179-
* @return A list of finished agents.
180-
*/
181-
@JavaAPI
182-
@JvmOverloads
183-
public fun listFinishedAgents(
184-
executorService: ExecutorService? = null
185-
): List<TAgent> = agentConfig.runOnStrategyDispatcher(executorService) {
186-
listFinishedAgents()
187-
}
188-
189140
public actual companion object {
190141
@JvmStatic
191142
public actual fun builder(): AIAgentServiceBuilder = AIAgentServiceBuilder()

0 commit comments

Comments
 (0)