fix: rework reentrant concurrency model for Java interop#1945
fix: rework reentrant concurrency model for Java interop#1945EugeneTheDev merged 2 commits intodevelopfrom
Conversation
5cbd3b4 to
4e9d616
Compare
sdubov
left a comment
There was a problem hiding this comment.
@EugeneTheDev , nice work! Thank you for the changes. I've left several general related comments.
| * Sets serializer for underlying tool calls and LLM requests | ||
| * | ||
| * @param serializer The JSON serializer to configure the AI agent with. | ||
| * @return The updated instance of [Companion.AIAgentConfigBuilder] |
There was a problem hiding this comment.
Why did you delete the return part of KDoc?
There was a problem hiding this comment.
They seemed too verbose, without any actual value. You can see the return type yourself in your IDE, so that part only bloats the KDoc, IMHO, because it doesn't provide any additional valuable info on the returned value
| @JavaAPI | ||
| public fun executionInfo(): AgentExecutionInfo = executionInfo | ||
|
|
||
| @OptIn(InternalKoogUtils::class) |
There was a problem hiding this comment.
Would not it be better to annotate the exact internal call instead of a whole function?
There was a problem hiding this comment.
I would say that's gonna be too granular. Prefering method-level opt-ins to class level opt-ins is fine, I agree here. But going ever more granular might not be the best idea from the readability side of things. I don't really have any strong opinion here, but putting opt-in on the method level is already an established pattern in the whole project, so I'm just following it
| executorService: ExecutorService? = null | ||
| ): Message.Response = config.runOnLLMDispatcher(executorService) { | ||
| ): Message.Response = runBlockingReentrant( | ||
| executorService?.asCoroutineDispatcher() ?: config.strategyDispatcher |
There was a problem hiding this comment.
strategy or llm request dispatcher?
There was a problem hiding this comment.
Good catch, will fix
| public final fun javaNonSuspendRun( | ||
| agentInput: Input, | ||
| sessionId: String? = null, | ||
| executorService: ExecutorService? = null |
There was a problem hiding this comment.
The context from the executorService is not used here. Is it by design?
There was a problem hiding this comment.
Good catch, will fix
| @property:PublishedApi | ||
| internal var strategyExecutorService: ExecutorService? = null | ||
| @InternalAgentsApi | ||
| public var strategyDispatcher: CoroutineDispatcher = Dispatchers.Default |
There was a problem hiding this comment.
Why do these dispatchers lives in the AIAgentConfig? I would assume that they should be located closer to the executor, instead of configurator. What do you think?
There was a problem hiding this comment.
That's the only central configuration object we currently have. So it's the only option to put global configurations that should affect the whole agent
| */ | ||
| @InternalKoogUtils | ||
| @JvmOverloads | ||
| public fun <T> runBlockingReentrant( |
There was a problem hiding this comment.
Previously in the CoroutineUtils we had thow helper methods: runOnLLMDispatcher and runOnStrategyDispatcher`. It looks like a handy shortcut to delegate execution on a correct thread instead of writing:
runBlockingReentrant(dispatcher ?: config.strategyDispatcher) { ... }
Maybe it worth thinking about returning them?
TBD: @EugeneTheDev , also I have some concern that we currently have a contract for using suspend Kotlin calls from Java API that seems easy to break. Maybe we should expose two API for such cases and hide the rest from a public API, limiting the probability of mistakes. What do you think?
There was a problem hiding this comment.
Maybe it worth thinking about returning them?
Initially, I thought that this inline approach would be more readable, but I guess you're right, and these shorthands did simplify things a bit. I'll return them back
we currently have a contract for using suspend Kotlin calls from Java API that seems easy to break
Can you clarify, please, wdym? I don't quite get it
| @@ -1,5 +1,5 @@ | |||
| @file:Suppress("EXPECT_ACTUAL_CLASSIFIERS_ARE_IN_BETA_WARNING") | |||
| @file:OptIn(InternalAgentsApi::class) | |||
| @file:OptIn(InternalAgentsApi::class, InternalKoogUtils::class) | |||
There was a problem hiding this comment.
Not related to a current change (feel free to skip this comment), but I often see OptIn on file level in our code base. @EugeneTheDev, do you consider this a good practice? I would add annotation for a particular invocation statement instead.
There was a problem hiding this comment.
Oh, that's a good comment. I actually don't like file-level opt-ins or suppresions unless there's a really good reason (like you need to opt in on the same API that is used by several top-level functions in this file). But, reiterating on my comment above, I think that statement-level opt-in might be too granular. IMHO method/function level is a sweet spot
65431ad to
a3eecea
Compare
947dc60 to
82f798c
Compare
Problem
For more stable Java interop, we need to properly handle the reentrant scenario
Suspendable Koog code→Blocking user Java code→Suspendable Koog code. Naively wrapping Java-interacting components inrunBlocking(context)orwithContext(context)introduces a risk of deadlock or thread starvation.Consider the following example:
suspendableKotlinCodeOnedispatches a block ontosingleThreadDispatcher, which then calls intoblockingJavaCodeTwo. The Java method calls back into Kotlin viasuspendableKotlinCodeThree, which again tries to dispatch onto the same single-threaded executor.Even though execution stays on the same thread throughout, the coroutine context tracking this is lost when crossing the Kotlin → Java → Kotlin boundary. As a result,
suspendableKotlinCodeThreecannot tell that it is already running on the correct context and attempts to redispatch — causing a deadlock, since the only thread is already occupied. The redispatch was never actually necessary; the inner block could have executed directly in place.Current state
The solution is to store the coroutine context in a thread local, then check it on dispatch to determine whether the requested context is already active. Several previous attempts have tried to implement this and fix related bugs, the most recent being #1716. However, the current implementation still has major problems:
runBlockingIfRequiredignores itscontextargument, meaning it never dispatches to the requested dispatcher.runBlockingIfRequired, rather than the thread associated with the suppliedcontext.Solution
I've narrowed the scope down to two functions with (hopefully) clearer names:
runBlockingReentrantandwithContextReentrant. They implement the approach described above correctly, avoiding unnecessary dispatch. A few tests have been added to verify the behavior. All existing utility functions were deleted and all implementations have been migrated.Important note
The case where a user manually submits a task to the executor outside of Koog — for example, running the agent itself on the same single-threaded executor — is deliberately out of scope. In such cases, it is technically impossible to reliably determine whether we are already on the same executor/thread. This should be treated as user error rather than something Koog tries to handle.
DEPRECATED:
AIAgentConfigon the JVM that acceptExecutorServiceparameters. Constructors and methods that accept more genericExecutorshould be used instead.closes KG-750