Skip to content

Commit 711b770

Browse files
committed
feat: init trigger pub/sub route set up for event processing
1 parent 9a6efea commit 711b770

8 files changed

Lines changed: 624 additions & 3 deletions

File tree

cmd/launcher/launcher.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package launcher
1717

1818
import (
1919
"context"
20+
"time"
2021

2122
"github.com/a2aproject/a2a-go/a2asrv"
2223

@@ -54,6 +55,18 @@ type SubLauncher interface {
5455
Run(ctx context.Context, config *Config) error
5556
}
5657

58+
// TriggerConfig contains configuration options for triggers.
59+
type TriggerConfig struct {
60+
// MaxRetries is the maximum number of times to retry a failed agent execution.
61+
MaxRetries int
62+
// BaseDelay is the base delay between retries.
63+
BaseDelay time.Duration
64+
// MaxDelay is the maximum delay between retries.
65+
MaxDelay time.Duration
66+
// MaxConcurrentRuns is the maximum number of concurrent runs.
67+
MaxConcurrentRuns int
68+
}
69+
5770
// Config contains parameters for web & console execution: sessions, artifacts, agents etc
5871
type Config struct {
5972
SessionService session.Service
@@ -63,4 +76,6 @@ type Config struct {
6376
A2AOptions []a2asrv.RequestHandlerOption
6477
PluginConfig runner.PluginConfig
6578
TelemetryOptions []telemetry.Option
79+
TriggerSources []string
80+
TriggerConfig TriggerConfig
6681
}

cmd/launcher/web/api/api.go

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"flag"
2020
"fmt"
2121
"net/http"
22+
"slices"
2223
"strings"
2324
"time"
2425

@@ -31,11 +32,19 @@ import (
3132
"google.golang.org/adk/telemetry"
3233
)
3334

35+
// SupportedTriggers defines the allowed trigger sources for the ADK REST API.
36+
var SupportedTriggers = []string{"pubsub"}
37+
3438
// apiConfig contains parametres for lauching ADK REST API
3539
type apiConfig struct {
36-
frontendAddress string
37-
pathPrefix string
38-
sseWriteTimeout time.Duration
40+
frontendAddress string
41+
pathPrefix string
42+
sseWriteTimeout time.Duration
43+
triggerSources string
44+
triggerMaxRetries int
45+
triggerBaseDelay time.Duration
46+
triggerMaxDelay time.Duration
47+
triggerMaxRuns int
3948
}
4049

4150
// apiLauncher can launch ADK REST API
@@ -73,6 +82,25 @@ func (a *apiLauncher) UserMessage(webURL string, printer func(v ...any)) {
7382

7483
// SetupSubrouters adds the API router to the parent router.
7584
func (a *apiLauncher) SetupSubrouters(router *mux.Router, config *launcher.Config) error {
85+
if a.config.triggerSources != "" {
86+
sources := strings.Split(a.config.triggerSources, ",")
87+
for _, source := range sources {
88+
if !slices.Contains(SupportedTriggers, source) {
89+
return fmt.Errorf("invalid trigger source: %q. Any subset of %s is allowed. Values should be comma-separated", source, strings.Join(SupportedTriggers, ", "))
90+
}
91+
}
92+
// De-duplicate the input sources.
93+
slices.Sort(sources)
94+
config.TriggerSources = slices.Compact(sources)
95+
}
96+
97+
config.TriggerConfig = launcher.TriggerConfig{
98+
MaxRetries: a.config.triggerMaxRetries,
99+
BaseDelay: a.config.triggerBaseDelay,
100+
MaxDelay: a.config.triggerMaxDelay,
101+
MaxConcurrentRuns: a.config.triggerMaxRuns,
102+
}
103+
76104
// Create the ADK REST API handler
77105
restServer, err := adkrest.NewServer(adkrest.ServerConfig{
78106
SessionService: config.SessionService,
@@ -81,6 +109,7 @@ func (a *apiLauncher) SetupSubrouters(router *mux.Router, config *launcher.Confi
81109
ArtifactService: config.ArtifactService,
82110
SSEWriteTimeout: a.config.sseWriteTimeout,
83111
PluginConfig: config.PluginConfig,
112+
TriggerSources: config.TriggerSources,
84113
})
85114
if err != nil {
86115
return fmt.Errorf("failed to create REST server: %w", err)
@@ -115,6 +144,19 @@ func (a *apiLauncher) Parse(args []string) ([]string, error) {
115144
if err != nil || !a.flags.Parsed() {
116145
return nil, fmt.Errorf("failed to parse api flags: %v", err)
117146
}
147+
if a.config.triggerMaxRetries < 0 {
148+
return nil, fmt.Errorf("trigger_max_retries must be >= 0")
149+
}
150+
if a.config.triggerBaseDelay < 0 {
151+
return nil, fmt.Errorf("trigger_base_delay must be >= 0")
152+
}
153+
if a.config.triggerMaxDelay < 0 {
154+
return nil, fmt.Errorf("trigger_max_delay must be >= 0")
155+
}
156+
if a.config.triggerMaxRuns < 0 {
157+
return nil, fmt.Errorf("trigger_max_concurrent_runs must be >= 0")
158+
}
159+
118160
p := a.config.pathPrefix
119161
if !strings.HasPrefix(p, "/") {
120162
p = "/" + p
@@ -138,6 +180,11 @@ func NewLauncher() weblauncher.Sublauncher {
138180
fs.StringVar(&config.frontendAddress, "webui_address", "localhost:8080", "ADK WebUI address as seen from the user browser. It's used to allow CORS requests. Please specify only hostname and (optionally) port.")
139181
fs.StringVar(&config.pathPrefix, "path_prefix", "/api", "ADK REST API path prefix. Default is '/api'.")
140182
fs.DurationVar(&config.sseWriteTimeout, "sse-write-timeout", 120*time.Second, "SSE server write timeout (i.e. '10s', '2m' - see time.ParseDuration for details) - for writing the SSE response after reading the headers & body")
183+
fs.IntVar(&config.triggerMaxRetries, "trigger_max_retries", 3, "Maximum retries for HTTP 429 errors from triggers")
184+
fs.DurationVar(&config.triggerBaseDelay, "trigger_base_delay", 1*time.Second, "Base delay for trigger retry exponential backoff")
185+
fs.DurationVar(&config.triggerMaxDelay, "trigger_max_delay", 10*time.Second, "Maximum delay for trigger retry exponential backoff")
186+
fs.IntVar(&config.triggerMaxRuns, "trigger_max_concurrent_runs", 100, "Maximum concurrent trigger runs")
187+
fs.StringVar(&config.triggerSources, "trigger_sources", "", fmt.Sprintf("Comma-separated list of trigger sources to enable (any subset of %s)", strings.Join(SupportedTriggers, ", ")))
141188

142189
return &apiLauncher{
143190
config: config,

cmd/launcher/web/api/api_test.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
// Copyright 2026 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package api
16+
17+
import (
18+
"testing"
19+
20+
"github.com/google/go-cmp/cmp"
21+
"github.com/google/go-cmp/cmp/cmpopts"
22+
"github.com/gorilla/mux"
23+
24+
"google.golang.org/adk/cmd/launcher"
25+
)
26+
27+
func TestSetupSubrouters_TriggerSourcesValidation(t *testing.T) {
28+
tests := []struct {
29+
name string
30+
triggerSources string
31+
wantErr bool
32+
wantSources []string
33+
}{
34+
{
35+
name: "empty trigger sources",
36+
triggerSources: "",
37+
wantErr: false,
38+
wantSources: nil,
39+
},
40+
{
41+
name: "valid trigger sources single",
42+
triggerSources: "pubsub",
43+
wantErr: false,
44+
wantSources: []string{"pubsub"},
45+
},
46+
{
47+
name: "deduplicatedd trigger sources",
48+
triggerSources: "pubsub,pubsub,pubsub",
49+
wantErr: false,
50+
wantSources: []string{"pubsub"},
51+
},
52+
{
53+
name: "invalid trigger source",
54+
triggerSources: "invalid",
55+
wantErr: true,
56+
wantSources: nil,
57+
},
58+
{
59+
name: "mixed valid and invalid",
60+
triggerSources: "pubsub,invalid,bq",
61+
wantErr: true,
62+
wantSources: nil,
63+
},
64+
}
65+
66+
for _, tc := range tests {
67+
t.Run(tc.name, func(t *testing.T) {
68+
a := &apiLauncher{
69+
config: &apiConfig{
70+
triggerSources: tc.triggerSources,
71+
},
72+
}
73+
router := mux.NewRouter()
74+
config := &launcher.Config{}
75+
76+
err := a.SetupSubrouters(router, config)
77+
if tc.wantErr {
78+
if err == nil {
79+
t.Errorf("SetupSubrouters() error = nil, wantErr %v", tc.wantErr)
80+
}
81+
} else {
82+
if err != nil {
83+
t.Errorf("SetupSubrouters() error = %v, wantErr %v", err, tc.wantErr)
84+
}
85+
diff := cmp.Diff(tc.wantSources, config.TriggerSources, cmpopts.SortSlices(func(a, b string) bool {
86+
return a < b
87+
}))
88+
if diff != "" {
89+
t.Errorf("SetupSubrouters() config.TriggerSources mismatch (-want +got):\n%s", diff)
90+
}
91+
}
92+
})
93+
}
94+
}

0 commit comments

Comments
 (0)