Skip to content

Commit e786a42

Browse files
feat(scheduler): Enforce concurrency_limit for CONCURRENT overlap policy in scheduler (#8024)
<!-- If you are new to contributing or want a refresher, please read ./pull_request_guidance.md --> **What changed?** - Implement SchedulePolicies.concurrency_limit for CONCURRENT by tracking multiple in-flight target runs and enforcing the cap in processScheduleFireActivity **Why?** - SchedulePolicies.ConcurrencyLimit was defined in the IDL but was never used — CONCURRENT always allowed unlimited parallel runs. We need a way to cap how many instances of a target workflow run simultaneously - **Why RunningWorkflows?** Unlike LastStartedWorkflow (which tracks only the last started workflow), bounded CONCURRENT must track all in-flight workflows. On each fire, the activity describes every tracked workflow, prunes completed entries, and compares the surviving count against the cap. RunningWorkflows is the state that carries this set across fires and across ContinueAsNew. - **Why MaxConcurrencyLimitSystemLimit = 1000?** RunningWorkflows is serialized into the ContinueAsNew payload. An unbounded user-configured limit could grow this slice enough to breach Cadence's 2MB BlobSizeLimitError, which causes the workflow to fail with no graceful recovery. This mirrors the existing MaxBufferedFiresSystemLimit pattern for the BUFFER policy. - **Why handleUpdate cleanup?** When a schedule switches away from bounded CONCURRENT to a different policy, or its limit is set to 0, the tracked list becomes meaningless. handleUpdate clears it to avoid carrying stale entries.
**How did you test it?** - Unit tests for processScheduleFireActivity: at-capacity skip, room-available start, completed-workflow pruning, describe error propagation, and AlreadyStartedError handling - Metrics tests: bounded-CONCURRENT skip emits SchedulerFireSkippedCountPerDomain with the correct overlap-policy tag - handleUpdate tests: RunningWorkflows is cleared when leaving bounded CONCURRENT (policy change or limit → 0), and preserved when only the limit changes between positive values **Potential risks** - ConcurrencyLimit flag is missing from CLI command. A user running cadence schedule create --help sees no mention of concurrency_limit at all. - Early return guard in schedule_commands silently lost the value. https://github.com/cadence-workflow/cadence/blob/95a7d9d56a25988b7bbfd7143f68597dbfe3fbcd/tools/cli/schedule_commands.go#L371 **Release notes** N/A **Documentation Changes** - consider adding an operator note that ConcurrencyLimit should be kept below a certain threshold to avoid workflow-state size concerns --------- Signed-off-by: YaweiZhang-930 <yawei930@gmail.com>
1 parent fd93219 commit e786a42

5 files changed

Lines changed: 383 additions & 2 deletions

File tree

service/worker/scheduler/activity.go

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,32 @@ func processScheduleFireActivity(ctx context.Context, req ProcessFireRequest) (r
7575
policy = types.ScheduleOverlapPolicySkipNew
7676
}
7777

78+
// Bounded CONCURRENT: describe each tracked in-flight workflow, prune
79+
// completed entries, and enforce the cap. When under the cap, falls through
80+
// to the shared start block; stillRunning is used there to build
81+
// result.ActiveWorkflows. When at or over the cap, returns early with a skip.
82+
isBoundedConcurrent := policy == types.ScheduleOverlapPolicyConcurrent && req.ConcurrencyLimit > 0
83+
var stillRunning []RunningWorkflowInfo
84+
if isBoundedConcurrent {
85+
effectiveLimit := effectiveConcurrencyLimit(req.ConcurrencyLimit)
86+
for _, wf := range req.RunningWorkflows {
87+
running, err := isWorkflowRunning(ctx, sc.FrontendClient, req.Domain, &wf)
88+
if err != nil {
89+
return nil, err
90+
}
91+
if running {
92+
stillRunning = append(stillRunning, wf)
93+
}
94+
}
95+
if len(stillRunning) >= int(effectiveLimit) {
96+
scope.Tagged(metrics.OverlapPolicyTag(policy.String()), metrics.TriggerSourceTag(string(req.TriggerSource))).
97+
IncCounter(metrics.SchedulerFireSkippedCountPerDomain)
98+
result.SkippedDelta = 1
99+
result.ActiveWorkflows = stillRunning
100+
return result, nil
101+
}
102+
}
103+
78104
if policy != types.ScheduleOverlapPolicyConcurrent && req.LastStartedWorkflow != nil {
79105
running, err := isWorkflowRunning(ctx, sc.FrontendClient, req.Domain, req.LastStartedWorkflow)
80106
if err != nil {
@@ -135,11 +161,15 @@ func processScheduleFireActivity(ctx context.Context, req ProcessFireRequest) (r
135161
var alreadyStarted *types.WorkflowExecutionAlreadyStartedError
136162
if errors.As(err, &alreadyStarted) {
137163
scope.Tagged(metrics.TriggerSourceTag(string(req.TriggerSource))).IncCounter(metrics.SchedulerFireAlreadyRunningCountPerDomain)
138-
result.SkippedDelta = 1
139-
result.StartedWorkflow = &RunningWorkflowInfo{
164+
existing := &RunningWorkflowInfo{
140165
WorkflowID: workflowID,
141166
RunID: alreadyStarted.RunID,
142167
}
168+
result.SkippedDelta = 1
169+
result.StartedWorkflow = existing
170+
if isBoundedConcurrent {
171+
result.ActiveWorkflows = append(stillRunning, *existing)
172+
}
143173
return result, nil
144174
}
145175
return nil, fmt.Errorf("failed to start workflow: %w", err)
@@ -155,6 +185,9 @@ func processScheduleFireActivity(ctx context.Context, req ProcessFireRequest) (r
155185
WorkflowID: workflowID,
156186
RunID: resp.GetRunID(),
157187
}
188+
if isBoundedConcurrent {
189+
result.ActiveWorkflows = append(stillRunning, *result.StartedWorkflow)
190+
}
158191
return result, nil
159192
}
160193

service/worker/scheduler/activity_test.go

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,175 @@ func TestProcessScheduleFireActivity(t *testing.T) {
383383
StartedWorkflow: &RunningWorkflowInfo{WorkflowID: expectedWfID, RunID: "new-run"},
384384
},
385385
},
386+
{
387+
name: "CONCURRENT with cap: at capacity, skips new fire",
388+
req: func() ProcessFireRequest {
389+
r := baseReq
390+
r.OverlapPolicy = types.ScheduleOverlapPolicyConcurrent
391+
r.ConcurrencyLimit = 2
392+
r.RunningWorkflows = []RunningWorkflowInfo{
393+
{WorkflowID: "wf-1", RunID: "run-1"},
394+
{WorkflowID: "wf-2", RunID: "run-2"},
395+
}
396+
return r
397+
}(),
398+
setupMock: func(m *frontend.MockClient) {
399+
m.EXPECT().DescribeWorkflowExecution(gomock.Any(), gomock.Any()).
400+
Return(&types.DescribeWorkflowExecutionResponse{
401+
WorkflowExecutionInfo: &types.WorkflowExecutionInfo{CloseStatus: nil},
402+
}, nil).Times(2)
403+
},
404+
wantResult: &ProcessFireResult{
405+
SkippedDelta: 1,
406+
ActiveWorkflows: []RunningWorkflowInfo{
407+
{WorkflowID: "wf-1", RunID: "run-1"},
408+
{WorkflowID: "wf-2", RunID: "run-2"},
409+
},
410+
},
411+
},
412+
{
413+
name: "CONCURRENT with cap: slot available, starts new workflow",
414+
req: func() ProcessFireRequest {
415+
r := baseReq
416+
r.OverlapPolicy = types.ScheduleOverlapPolicyConcurrent
417+
r.ConcurrencyLimit = 3
418+
r.RunningWorkflows = []RunningWorkflowInfo{
419+
{WorkflowID: "wf-1", RunID: "run-1"},
420+
{WorkflowID: "wf-2", RunID: "run-2"},
421+
}
422+
return r
423+
}(),
424+
setupMock: func(m *frontend.MockClient) {
425+
m.EXPECT().DescribeWorkflowExecution(gomock.Any(), gomock.Any()).
426+
Return(&types.DescribeWorkflowExecutionResponse{
427+
WorkflowExecutionInfo: &types.WorkflowExecutionInfo{CloseStatus: nil},
428+
}, nil).Times(2)
429+
m.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any()).
430+
Return(&types.StartWorkflowExecutionResponse{RunID: "new-run"}, nil)
431+
},
432+
wantResult: &ProcessFireResult{
433+
TotalDelta: 1,
434+
StartedWorkflow: &RunningWorkflowInfo{WorkflowID: expectedWfID, RunID: "new-run"},
435+
ActiveWorkflows: []RunningWorkflowInfo{
436+
{WorkflowID: "wf-1", RunID: "run-1"},
437+
{WorkflowID: "wf-2", RunID: "run-2"},
438+
{WorkflowID: expectedWfID, RunID: "new-run"},
439+
},
440+
},
441+
},
442+
{
443+
name: "CONCURRENT with cap: completed workflows pruned, freed slot starts new workflow",
444+
req: func() ProcessFireRequest {
445+
r := baseReq
446+
r.OverlapPolicy = types.ScheduleOverlapPolicyConcurrent
447+
r.ConcurrencyLimit = 2
448+
r.RunningWorkflows = []RunningWorkflowInfo{
449+
{WorkflowID: "wf-1", RunID: "run-1"},
450+
{WorkflowID: "wf-2", RunID: "run-2"},
451+
}
452+
return r
453+
}(),
454+
setupMock: func(m *frontend.MockClient) {
455+
closed := types.WorkflowExecutionCloseStatus(0)
456+
// wf-1 is closed; wf-2 is still running
457+
gomock.InOrder(
458+
m.EXPECT().DescribeWorkflowExecution(gomock.Any(), gomock.Any()).
459+
Return(&types.DescribeWorkflowExecutionResponse{
460+
WorkflowExecutionInfo: &types.WorkflowExecutionInfo{CloseStatus: &closed},
461+
}, nil),
462+
m.EXPECT().DescribeWorkflowExecution(gomock.Any(), gomock.Any()).
463+
Return(&types.DescribeWorkflowExecutionResponse{
464+
WorkflowExecutionInfo: &types.WorkflowExecutionInfo{CloseStatus: nil},
465+
}, nil),
466+
)
467+
m.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any()).
468+
Return(&types.StartWorkflowExecutionResponse{RunID: "new-run"}, nil)
469+
},
470+
wantResult: &ProcessFireResult{
471+
TotalDelta: 1,
472+
StartedWorkflow: &RunningWorkflowInfo{WorkflowID: expectedWfID, RunID: "new-run"},
473+
ActiveWorkflows: []RunningWorkflowInfo{
474+
{WorkflowID: "wf-2", RunID: "run-2"},
475+
{WorkflowID: expectedWfID, RunID: "new-run"},
476+
},
477+
},
478+
},
479+
{
480+
name: "CONCURRENT with cap: all tracked workflows completed, starts new workflow",
481+
req: func() ProcessFireRequest {
482+
r := baseReq
483+
r.OverlapPolicy = types.ScheduleOverlapPolicyConcurrent
484+
r.ConcurrencyLimit = 1
485+
r.RunningWorkflows = []RunningWorkflowInfo{
486+
{WorkflowID: "wf-1", RunID: "run-1"},
487+
}
488+
return r
489+
}(),
490+
setupMock: func(m *frontend.MockClient) {
491+
closed := types.WorkflowExecutionCloseStatus(0)
492+
m.EXPECT().DescribeWorkflowExecution(gomock.Any(), gomock.Any()).
493+
Return(&types.DescribeWorkflowExecutionResponse{
494+
WorkflowExecutionInfo: &types.WorkflowExecutionInfo{CloseStatus: &closed},
495+
}, nil)
496+
m.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any()).
497+
Return(&types.StartWorkflowExecutionResponse{RunID: "new-run"}, nil)
498+
},
499+
wantResult: &ProcessFireResult{
500+
TotalDelta: 1,
501+
StartedWorkflow: &RunningWorkflowInfo{WorkflowID: expectedWfID, RunID: "new-run"},
502+
ActiveWorkflows: []RunningWorkflowInfo{
503+
{WorkflowID: expectedWfID, RunID: "new-run"},
504+
},
505+
},
506+
},
507+
{
508+
name: "CONCURRENT with cap: describe error during slot check propagates",
509+
req: func() ProcessFireRequest {
510+
r := baseReq
511+
r.OverlapPolicy = types.ScheduleOverlapPolicyConcurrent
512+
r.ConcurrencyLimit = 2
513+
r.RunningWorkflows = []RunningWorkflowInfo{
514+
{WorkflowID: "wf-1", RunID: "run-1"},
515+
}
516+
return r
517+
}(),
518+
setupMock: func(m *frontend.MockClient) {
519+
m.EXPECT().DescribeWorkflowExecution(gomock.Any(), gomock.Any()).
520+
Return(nil, errors.New("connection refused"))
521+
},
522+
wantErr: true,
523+
},
524+
{
525+
name: "CONCURRENT with cap: already-started includes workflow in active set",
526+
req: func() ProcessFireRequest {
527+
r := baseReq
528+
r.OverlapPolicy = types.ScheduleOverlapPolicyConcurrent
529+
r.ConcurrencyLimit = 3
530+
r.RunningWorkflows = []RunningWorkflowInfo{
531+
{WorkflowID: "wf-1", RunID: "run-1"},
532+
}
533+
return r
534+
}(),
535+
setupMock: func(m *frontend.MockClient) {
536+
m.EXPECT().DescribeWorkflowExecution(gomock.Any(), gomock.Any()).
537+
Return(&types.DescribeWorkflowExecutionResponse{
538+
WorkflowExecutionInfo: &types.WorkflowExecutionInfo{CloseStatus: nil},
539+
}, nil)
540+
m.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any()).
541+
Return(nil, &types.WorkflowExecutionAlreadyStartedError{
542+
Message: "already started",
543+
RunID: "existing-run",
544+
})
545+
},
546+
wantResult: &ProcessFireResult{
547+
SkippedDelta: 1,
548+
StartedWorkflow: &RunningWorkflowInfo{WorkflowID: expectedWfID, RunID: "existing-run"},
549+
ActiveWorkflows: []RunningWorkflowInfo{
550+
{WorkflowID: "wf-1", RunID: "run-1"},
551+
{WorkflowID: expectedWfID, RunID: "existing-run"},
552+
},
553+
},
554+
},
386555
{
387556
name: "AlreadyStartedError returns skipped with RunID",
388557
req: baseReq,
@@ -648,6 +817,32 @@ func TestProcessScheduleFireActivityMetrics(t *testing.T) {
648817
metrics.SchedulerFireErrorCountPerDomain,
649818
},
650819
},
820+
{
821+
name: "CONCURRENT with cap at capacity: emits skipped counter",
822+
req: func() ProcessFireRequest {
823+
r := baseReq
824+
r.OverlapPolicy = types.ScheduleOverlapPolicyConcurrent
825+
r.ConcurrencyLimit = 2
826+
r.RunningWorkflows = []RunningWorkflowInfo{
827+
{WorkflowID: "wf-1", RunID: "run-1"},
828+
{WorkflowID: "wf-2", RunID: "run-2"},
829+
}
830+
return r
831+
}(),
832+
setupMock: func(m *frontend.MockClient) {
833+
m.EXPECT().DescribeWorkflowExecution(gomock.Any(), gomock.Any()).
834+
Return(&types.DescribeWorkflowExecutionResponse{
835+
WorkflowExecutionInfo: &types.WorkflowExecutionInfo{CloseStatus: nil},
836+
}, nil).Times(2)
837+
},
838+
wantCounters: []metrics.MetricIdx{metrics.SchedulerFireSkippedCountPerDomain},
839+
wantNoCounter: []metrics.MetricIdx{
840+
metrics.SchedulerFireStartedCountPerDomain,
841+
metrics.SchedulerFireBufferedCountPerDomain,
842+
metrics.SchedulerFireAlreadyRunningCountPerDomain,
843+
metrics.SchedulerFireErrorCountPerDomain,
844+
},
845+
},
651846
{
652847
name: "start error emits error counter",
653848
req: baseReq,
@@ -819,3 +1014,22 @@ func TestProcessScheduleFireActivityLatency(t *testing.T) {
8191014
})
8201015
}
8211016
}
1017+
1018+
func TestEffectiveConcurrencyLimit(t *testing.T) {
1019+
tests := []struct {
1020+
name string
1021+
userLimit int32
1022+
want int32
1023+
}{
1024+
{"below system limit returned as-is", 1, 1},
1025+
{"typical value returned as-is", 10, 10},
1026+
{"at system limit returned as-is", MaxConcurrencyLimitSystemLimit, MaxConcurrencyLimitSystemLimit},
1027+
{"one above system limit clamped", MaxConcurrencyLimitSystemLimit + 1, MaxConcurrencyLimitSystemLimit},
1028+
{"large value clamped to system limit", 10000, MaxConcurrencyLimitSystemLimit},
1029+
}
1030+
for _, tc := range tests {
1031+
t.Run(tc.name, func(t *testing.T) {
1032+
assert.Equal(t, tc.want, effectiveConcurrencyLimit(tc.userLimit))
1033+
})
1034+
}
1035+
}

service/worker/scheduler/types.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,14 @@ const (
7575
// entries stays well within the workflow input size limit.
7676
MaxBufferedFiresSystemLimit = 1000
7777

78+
// MaxConcurrencyLimitSystemLimit caps ConcurrencyLimit for the bounded CONCURRENT
79+
// overlap policy regardless of the user-configured value. It bounds the
80+
// RunningWorkflows slice carried in ContinueAsNew payload: each RunningWorkflowInfo
81+
// is ~110 bytes of JSON, so 1000 entries add ~107KB — well within the 2MB hard limit
82+
// and leaving headroom for the rest of the workflow state. Exceeding the 2MB limit
83+
// causes Cadence to fail the workflow entirely with no graceful degradation.
84+
MaxConcurrencyLimitSystemLimit = 1000
85+
7886
// signal_type tag values for scheduler_signal_received_count metric.
7987
signalTypeTagPause = "pause"
8088
signalTypeTagUnpause = "unpause"
@@ -152,6 +160,9 @@ type SchedulerWorkflowState struct {
152160
// the overlap policy can check whether it is still running before starting
153161
// the next one. Nil when no workflow has been started yet.
154162
LastStartedWorkflow *RunningWorkflowInfo `json:"lastStartedWorkflow,omitempty"`
163+
// RunningWorkflows holds in-flight target workflows under bounded CONCURRENT
164+
// (ConcurrencyLimit > 0); completed entries are pruned by the activity on each fire.
165+
RunningWorkflows []RunningWorkflowInfo `json:"runningWorkflows,omitempty"`
155166
}
156167

157168
// BufferedFire is a schedule fire queued for sequential execution by the BUFFER
@@ -259,6 +270,11 @@ type ProcessFireRequest struct {
259270
TriggerSource TriggerSource `json:"triggerSource"`
260271
OverlapPolicy types.ScheduleOverlapPolicy `json:"overlapPolicy"`
261272
LastStartedWorkflow *RunningWorkflowInfo `json:"lastStartedWorkflow,omitempty"`
273+
// ConcurrencyLimit mirrors SchedulePolicies.ConcurrencyLimit; 0 = unlimited.
274+
ConcurrencyLimit int32 `json:"concurrencyLimit,omitempty"`
275+
// RunningWorkflows is the current in-flight set from workflow state; used
276+
// only when OverlapPolicy==CONCURRENT and ConcurrencyLimit > 0.
277+
RunningWorkflows []RunningWorkflowInfo `json:"runningWorkflows,omitempty"`
262278
}
263279

264280
// ProcessFireResult is the output of processScheduleFireActivity. The workflow
@@ -272,4 +288,7 @@ type ProcessFireResult struct {
272288
// appends the fire to state.BufferedFires and retries draining on the
273289
// next loop iteration.
274290
Buffered bool `json:"buffered,omitempty"`
291+
// ActiveWorkflows is the updated in-flight set for bounded CONCURRENT; the workflow
292+
// replaces state.RunningWorkflows with it after each fire. Nil for all other policies.
293+
ActiveWorkflows []RunningWorkflowInfo `json:"activeWorkflows,omitempty"`
275294
}

service/worker/scheduler/workflow.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,20 @@ func handleUpdate(logger *zap.Logger, sig UpdateSignal, input *SchedulerWorkflow
422422
state.SkippedRuns += int64(len(state.BufferedFires))
423423
state.BufferedFires = nil
424424
}
425+
// Drop running-workflow tracking when leaving bounded CONCURRENT: the
426+
// list is meaningless under any other policy or when limit becomes 0.
427+
newOverlap := input.Policies.OverlapPolicy
428+
newLimit := input.Policies.ConcurrencyLimit
429+
if previousOverlap == types.ScheduleOverlapPolicyConcurrent &&
430+
(newOverlap != types.ScheduleOverlapPolicyConcurrent || newLimit == 0) &&
431+
len(state.RunningWorkflows) > 0 {
432+
logger.Warn("policy change cleared running workflows tracking",
433+
zap.String("from", previousOverlap.String()),
434+
zap.String("to", newOverlap.String()),
435+
zap.Int32("newLimit", newLimit),
436+
zap.Int("clearedCount", len(state.RunningWorkflows)))
437+
state.RunningWorkflows = nil
438+
}
425439
}
426440
if changed {
427441
logger.Info("schedule updated")
@@ -533,6 +547,8 @@ func tryStartFire(ctx workflow.Context, logger *zap.Logger, input *SchedulerWork
533547
TriggerSource: trigger,
534548
OverlapPolicy: input.Policies.OverlapPolicy,
535549
LastStartedWorkflow: state.LastStartedWorkflow,
550+
ConcurrencyLimit: input.Policies.ConcurrencyLimit,
551+
RunningWorkflows: state.RunningWorkflows,
536552
}
537553

538554
var result ProcessFireResult
@@ -554,6 +570,9 @@ func tryStartFire(ctx workflow.Context, logger *zap.Logger, input *SchedulerWork
554570
if result.StartedWorkflow != nil {
555571
state.LastStartedWorkflow = result.StartedWorkflow
556572
}
573+
if result.ActiveWorkflows != nil {
574+
state.RunningWorkflows = result.ActiveWorkflows
575+
}
557576

558577
if result.TotalDelta > 0 && result.StartedWorkflow != nil {
559578
logger.Info("scheduled workflow started",
@@ -612,6 +631,18 @@ func effectiveBufferLimit(userLimit int32) (effective int, reason string) {
612631
return int(userLimit), BufferOverflowReasonUserLimit
613632
}
614633

634+
// effectiveConcurrencyLimit returns the concurrency cap enforced for the bounded
635+
// CONCURRENT overlap policy. Values above the system ceiling are silently clamped
636+
// so RunningWorkflows never grows large enough to bloat the ContinueAsNew payload
637+
// toward Cadence's BlobSizeLimitError (default 2MB). Only called when userLimit > 0
638+
// (i.e., isBoundedConcurrent is true).
639+
func effectiveConcurrencyLimit(userLimit int32) int32 {
640+
if userLimit > MaxConcurrencyLimitSystemLimit {
641+
return MaxConcurrencyLimitSystemLimit
642+
}
643+
return userLimit
644+
}
645+
615646
// drainBufferedFires executes queued fires in FIFO order, stopping as soon as
616647
// one re-buffers (previous target workflow still running) or
617648
// maxDrainFiresPerExecution fires have been processed. Returns true when more

0 commit comments

Comments
 (0)