Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/launcher/full/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ import (
"google.golang.org/adk/cmd/launcher/web"
"google.golang.org/adk/cmd/launcher/web/a2a"
"google.golang.org/adk/cmd/launcher/web/api"
"google.golang.org/adk/cmd/launcher/web/triggers/pubsub"
"google.golang.org/adk/cmd/launcher/web/webui"
)

// NewLauncher returnes the most versatile universal launcher with all options built-in.
func NewLauncher() launcher.Launcher {
return universal.NewLauncher(console.NewLauncher(), web.NewLauncher(webui.NewLauncher(), a2a.NewLauncher(), api.NewLauncher()))
return universal.NewLauncher(console.NewLauncher(), web.NewLauncher(webui.NewLauncher(), a2a.NewLauncher(), api.NewLauncher(), pubsub.NewLauncher()))
}
136 changes: 136 additions & 0 deletions cmd/launcher/web/triggers/pubsub/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package pubsub provides a sublauncher that adds PubSub trigger capabilities to ADK web server.
package pubsub

import (
"flag"
"fmt"
"net/http"
"strings"
"time"

"github.com/gorilla/mux"

"google.golang.org/adk/cmd/launcher"
"google.golang.org/adk/cmd/launcher/web"
"google.golang.org/adk/internal/cli/util"
"google.golang.org/adk/server/adkrest/controllers/triggers"
)

type pubsubConfig struct {
pathPrefix string
triggerMaxRetries int
triggerBaseDelay time.Duration
triggerMaxDelay time.Duration
triggerMaxRuns int
}

type pubsubLauncher struct {
flags *flag.FlagSet
config *pubsubConfig
}

// NewLauncher creates a new pubsub launcher. It extends Web launcher.
func NewLauncher() web.Sublauncher {
config := &pubsubConfig{}

fs := flag.NewFlagSet("pubsub", flag.ContinueOnError)
fs.StringVar(&config.pathPrefix, "path_prefix", "/api", "Path prefix for the PubSub trigger endpoint. Default is '/api'.")
fs.IntVar(&config.triggerMaxRetries, "trigger_max_retries", 3, "Maximum retries for HTTP 429 errors from triggers")
fs.DurationVar(&config.triggerBaseDelay, "trigger_base_delay", 1*time.Second, "Base delay for trigger retry exponential backoff")
fs.DurationVar(&config.triggerMaxDelay, "trigger_max_delay", 10*time.Second, "Maximum delay for trigger retry exponential backoff")
fs.IntVar(&config.triggerMaxRuns, "trigger_max_concurrent_runs", 100, "Maximum concurrent trigger runs")

return &pubsubLauncher{
config: config,
flags: fs,
}
}

// Keyword implements web.Sublauncher. Returns the command-line keyword for pubsub launcher.
func (p *pubsubLauncher) Keyword() string {
return "pubsub"
}

// Parse parses the command-line arguments for the pubsub launcher.
func (p *pubsubLauncher) Parse(args []string) ([]string, error) {
err := p.flags.Parse(args)
if err != nil || !p.flags.Parsed() {
return nil, fmt.Errorf("failed to parse pubsub flags: %v", err)
}
if p.config.triggerMaxRetries < 0 {
return nil, fmt.Errorf("trigger_max_retries must be >= 0")
}
if p.config.triggerBaseDelay < 0 {
return nil, fmt.Errorf("trigger_base_delay must be >= 0")
}
if p.config.triggerMaxDelay < 0 {
return nil, fmt.Errorf("trigger_max_delay must be >= 0")
}
if p.config.triggerMaxRuns < 0 {
return nil, fmt.Errorf("trigger_max_concurrent_runs must be >= 0")
}

prefix := p.config.pathPrefix
if !strings.HasPrefix(prefix, "/") {
prefix = "/" + prefix
}
p.config.pathPrefix = strings.TrimSuffix(prefix, "/")

return p.flags.Args(), nil
}

// CommandLineSyntax returns the command-line syntax for the pubsub launcher.
func (p *pubsubLauncher) CommandLineSyntax() string {
return util.FormatFlagUsage(p.flags)
}

// SimpleDescription implements web.Sublauncher.
func (p *pubsubLauncher) SimpleDescription() string {
return "starts ADK PubSub trigger endpoint server"
}

// SetupSubrouters adds the PubSub trigger endpoint to the parent router.
func (p *pubsubLauncher) SetupSubrouters(router *mux.Router, config *launcher.Config) error {
triggerConfig := triggers.TriggerConfig{
MaxRetries: p.config.triggerMaxRetries,
BaseDelay: p.config.triggerBaseDelay,
MaxDelay: p.config.triggerMaxDelay,
MaxConcurrentRuns: p.config.triggerMaxRuns,
}

controller := triggers.NewPubSubController(
config.SessionService,
config.AgentLoader,
config.MemoryService,
config.ArtifactService,
config.PluginConfig,
triggerConfig,
)

subrouter := router
if p.config.pathPrefix != "" && p.config.pathPrefix != "/" {
subrouter = router.PathPrefix(p.config.pathPrefix).Subrouter()
}

subrouter.HandleFunc("/apps/{app_name}/trigger/pubsub", controller.PubSubTriggerHandler).Methods(http.MethodPost)
return nil
}

// UserMessage implements web.Sublauncher.
func (p *pubsubLauncher) UserMessage(webURL string, printer func(v ...any)) {
printer(fmt.Sprintf(" pubsub: PubSub trigger endpoint is available at %s%s/apps/{app_name}/trigger/pubsub", webURL, p.config.pathPrefix))
}
97 changes: 97 additions & 0 deletions cmd/launcher/web/triggers/pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pubsub

import (
"net/http"
"net/http/httptest"
"testing"

"github.com/gorilla/mux"

"google.golang.org/adk/cmd/launcher"
)

func TestParse(t *testing.T) {
tests := []struct {
name string
args []string
wantPrefix string
wantRetry int
wantErr bool
}{
{
name: "default values",
args: []string{},
wantPrefix: "/api",
wantRetry: 3,
wantErr: false,
},
{
name: "custom prefix and retries",
args: []string{"-path_prefix=/custom", "-trigger_max_retries=5"},
wantPrefix: "/custom",
wantRetry: 5,
wantErr: false,
},
{
name: "invalid retry count",
args: []string{"-trigger_max_retries=-1"},
wantPrefix: "/api",
wantRetry: 3,
wantErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := NewLauncher().(*pubsubLauncher)
_, err := l.Parse(tt.args)
if (err != nil) != tt.wantErr {
t.Errorf("Parse() error = %v, wantErr %v", err, tt.wantErr)
return
}
if tt.wantErr {
return
}
if l.config.pathPrefix != tt.wantPrefix {
t.Errorf("Parse() pathPrefix = %v, want %v", l.config.pathPrefix, tt.wantPrefix)
}
if l.config.triggerMaxRetries != tt.wantRetry {
t.Errorf("Parse() triggerMaxRetries = %v, want %v", l.config.triggerMaxRetries, tt.wantRetry)
}
})
}
}

func TestSetupSubrouters(t *testing.T) {
l := NewLauncher().(*pubsubLauncher)
_, _ = l.Parse([]string{"-path_prefix=/api"})

router := mux.NewRouter()
config := &launcher.Config{}

err := l.SetupSubrouters(router, config)
if err != nil {
t.Fatalf("SetupSubrouters() failed: %v", err)
}

// Verify route is registered
req := httptest.NewRequest(http.MethodPost, "/api/apps/my-app/trigger/pubsub", nil)
var match mux.RouteMatch
if !router.Match(req, &match) {
t.Errorf("SetupSubrouters() did not register expected route")
}
}
29 changes: 29 additions & 0 deletions server/adkrest/controllers/triggers/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package triggers

import "time"

// TriggerConfig contains configuration options for triggers.
type TriggerConfig struct {
// MaxRetries is the maximum number of times to retry a failed agent execution.
MaxRetries int
// BaseDelay is the base delay between retries.
BaseDelay time.Duration
// MaxDelay is the maximum delay between retries.
MaxDelay time.Duration
// MaxConcurrentRuns is the maximum number of concurrent runs.
MaxConcurrentRuns int
}
99 changes: 99 additions & 0 deletions server/adkrest/controllers/triggers/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package triggers

import (
"encoding/json"
"fmt"
"net/http"

"google.golang.org/adk/agent"
"google.golang.org/adk/artifact"
"google.golang.org/adk/memory"
"google.golang.org/adk/runner"
"google.golang.org/adk/server/adkrest/internal/models"
"google.golang.org/adk/session"
)

const defaultUserID = "pubsub-caller"

// PubSubController handles the PubSub trigger endpoints.
type PubSubController struct {
sessionService session.Service
agentLoader agent.Loader
memoryService memory.Service
artifactService artifact.Service
pluginConfig runner.PluginConfig
triggerConfig TriggerConfig
semaphore chan struct{}
}

// NewPubSubController creates a new PubSubController.
func NewPubSubController(sessionService session.Service, agentLoader agent.Loader, memoryService memory.Service, artifactService artifact.Service, pluginConfig runner.PluginConfig, triggerConfig TriggerConfig) *PubSubController {
return &PubSubController{
sessionService: sessionService,
agentLoader: agentLoader,
memoryService: memoryService,
artifactService: artifactService,
pluginConfig: pluginConfig,
triggerConfig: triggerConfig,
semaphore: make(chan struct{}, triggerConfig.MaxConcurrentRuns),
}
}

// PubSubTriggerHandler handles the PubSub trigger endpoint.
func (c *PubSubController) PubSubTriggerHandler(w http.ResponseWriter, r *http.Request) {
if c.semaphore != nil {
c.semaphore <- struct{}{}
defer func() { <-c.semaphore }()
}

// Parse the request to the request model.
var req models.PubSubTriggerRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
c.respondError(w, http.StatusBadRequest, fmt.Sprintf("failed to decode request: %v", err))
return
}

// Decode base64 message data.
var messageContent string
if len(req.Message.Data) > 0 {
messageContent = string(req.Message.Data)
// Use Attributes if Data is empty.
} else if len(req.Message.Attributes) > 0 {
attrBytes, err := json.Marshal(req.Message.Attributes)
if err != nil {
c.respondError(w, http.StatusInternalServerError, fmt.Sprintf("failed to marshal attributes: %v", err))
return
}
messageContent = string(attrBytes)
} else {
c.respondError(w, http.StatusBadRequest, "pubsub message contains no data or attributes")
return
}

appName, err := c.appName(r)
if err != nil {
c.respondError(w, http.StatusInternalServerError, err.Error())
return
}

if _, err := c.runAgent(r.Context(), appName, req.Subscription, messageContent); err != nil {
c.respondError(w, http.StatusInternalServerError, fmt.Sprintf("failed to run agent: %v", err))
return
}

c.respondSuccess(w)
}
Loading
Loading