Skip to content

Commit 483d3d8

Browse files
authored
feat: Write DLQ tasks when Standby processing timeout is reached (#8022)
<!-- If you are new to contributing or want a refresher, please read ./pull_request_guidance.md --> **What changed?** Adds a new post action function that writes to the History Task DLQ rather than discarding the task. **Why?** Discarding a task is a complete loss of data. In rare scenarios, it could be possible that tasks are discarded from both clusters (in a multi-cluster setup), resulting in the inability to continue a workflow. **How did you test it?** To run the unit tests: ``` go test ./service/history/... ``` Running a full integration test requires additional persistence changes, which are coming in a follow-up PR. **Potential risks** This is hidden behind a feature flag. Premature enablement of this flag should not result in running the code path, as the DLQWriter has yet to be wired up to enable writing to the DLQ. **Release notes** N/A **Documentation Changes** N/A
1 parent ab4e4eb commit 483d3d8

20 files changed

Lines changed: 1048 additions & 29 deletions

service/history/config/config.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,10 @@ type Config struct {
345345
GlobalRatelimiterDecayAfter dynamicproperties.DurationPropertyFn
346346
GlobalRatelimiterGCAfter dynamicproperties.DurationPropertyFn
347347

348+
// History Task DLQ Configuration
349+
HistoryTaskDLQMode dynamicproperties.StringPropertyFnWithDomainFilter
350+
HistoryTaskProcessingInterval dynamicproperties.DurationPropertyFnWithShardIDFilter
351+
348352
// HostName for machine running the service
349353
HostName string
350354
}
@@ -612,6 +616,9 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, i
612616
GlobalRatelimiterDecayAfter: dc.GetDurationProperty(dynamicproperties.HistoryGlobalRatelimiterDecayAfter),
613617
GlobalRatelimiterGCAfter: dc.GetDurationProperty(dynamicproperties.HistoryGlobalRatelimiterGCAfter),
614618

619+
HistoryTaskDLQMode: dc.GetStringPropertyFilteredByDomain(dynamicproperties.HistoryTaskDeadLetterQueueMode),
620+
HistoryTaskProcessingInterval: dc.GetDurationPropertyFilteredByShardID(dynamicproperties.HistoryTaskDLQProcessorInterval),
621+
615622
HostName: hostname,
616623
}
617624

service/history/config/config_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,8 @@ func TestNewConfig(t *testing.T) {
282282
"EnableCorruptionAutoRepair": {dynamicproperties.EnableCorruptionAutoRepair, true},
283283
"CorruptionRepairTimeout": {dynamicproperties.CorruptionRepairTimeout, time.Duration(1)},
284284
"RequireChecksumMatchAfterRebuildRepair": {dynamicproperties.RequireChecksumMatchAfterRebuildRepair, true},
285+
"HistoryTaskDLQMode": {dynamicproperties.HistoryTaskDeadLetterQueueMode, "enabled"},
286+
"HistoryTaskProcessingInterval": {dynamicproperties.HistoryTaskDLQProcessorInterval, time.Second},
285287
}
286288
client := dynamicconfig.NewInMemoryClient()
287289
for fieldName, expected := range fields {

service/history/constants/constants.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,12 @@ var (
3636
ErrHistoryHostThrottle = &types.ServiceBusyError{Message: "History host rps exceeded"}
3737
ErrShuttingDown = &types.InternalServiceError{Message: "Shutting down"}
3838
)
39+
40+
// History Task DLQ modes, consumed as the value of the HistoryTaskDLQMode
// dynamic config property (per domain).
const (
	// HistoryTaskDLQModeEnabled routes a timed-out standby task to the DLQ
	// instead of discarding it.
	HistoryTaskDLQModeEnabled = "enabled"
	// HistoryTaskDLQModeDisabled keeps the legacy behavior: the task is
	// discarded once the standby processing timeout is reached.
	HistoryTaskDLQModeDisabled = "disabled"
	// HistoryTaskDLQModeShadow dual-writes the task to the DLQ for
	// observation, but still discards it exactly as in disabled mode.
	HistoryTaskDLQModeShadow = "shadow"
)

service/history/queue/timer_queue_processor.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ func NewTimerQueueProcessor(
128128
shard.GetMetricsClient(),
129129
clusterName,
130130
config,
131+
nil, // TODO(c-warren): wire DLQ writer once persistence layer is written
131132
)
132133
standbyTaskExecutors = append(standbyTaskExecutors, standbyTaskExecutor)
133134
standbyQueueProcessors[clusterName], standbyQueueTimerGates[clusterName] = newTimerQueueStandbyProcessor(

service/history/queue/transfer_queue_processor.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ func NewTransferQueueProcessor(
136136
standByLogger,
137137
clusterName,
138138
config,
139+
nil, // TODO(c-warren): wire DLQ writer once persistence layer is written
139140
)
140141
standbyQueueProcessors[clusterName] = newTransferQueueStandbyProcessor(
141142
clusterName,

service/history/queuev2/timer_queue_factory.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ func (f *timerQueueFactory) createQueuev2(
122122
shard.GetMetricsClient(),
123123
shard.GetClusterMetadata().GetCurrentClusterName(),
124124
shard.GetConfig(),
125+
nil, // TODO(c-warren): wire DLQ writer once persistence layer is written
125126
)
126127
executorWrapper := task.NewExecutorWrapper(
127128
shard.GetClusterMetadata().GetCurrentClusterName(),

service/history/queuev2/transfer_queue_factory.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ func (f *transferQueueFactory) createQueuev2(
128128
logger,
129129
shard.GetClusterMetadata().GetCurrentClusterName(),
130130
shard.GetConfig(),
131+
nil, // TODO(c-warren): wire DLQ writer once persistence layer is written
131132
)
132133

133134
executorWrapper := task.NewExecutorWrapper(

service/history/task/standby_task_util.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,18 +28,27 @@ import (
2828

2929
"github.com/uber/cadence/common"
3030
"github.com/uber/cadence/common/activecluster"
31+
"github.com/uber/cadence/common/dynamicconfig/dynamicproperties"
3132
"github.com/uber/cadence/common/log"
3233
"github.com/uber/cadence/common/log/tag"
3334
"github.com/uber/cadence/common/persistence"
3435
"github.com/uber/cadence/common/types"
36+
"github.com/uber/cadence/service/history/constants"
3537
"github.com/uber/cadence/service/history/execution"
38+
"github.com/uber/cadence/service/history/shard"
39+
"github.com/uber/cadence/service/history/taskdlq"
3640
)
3741

3842
type (
	// standbyActionFn inspects a workflow execution's mutable state and
	// returns task-specific post-action info; a nil result means there is
	// nothing for the post action to act on.
	standbyActionFn func(context.Context, execution.Context, execution.MutableState) (interface{}, error)
	// standbyPostActionFn reacts to a non-nil action result for a standby
	// task, e.g. by discarding the task or writing it to the DLQ.
	standbyPostActionFn func(context.Context, persistence.Task, interface{}, log.Logger) error

	// standbyCurrentTimeFn returns the reference time for a standby task
	// (presumably used to decide how long it has been pending — see callers).
	standbyCurrentTimeFn func(persistence.Task) (time.Time, error)

	// TaskDLQWriter is the subset of taskdlq.HistoryTaskDLQStore used by standby task executors.
	TaskDLQWriter interface {
		AddTask(ctx context.Context, request taskdlq.AddTaskRequest) error
	}
)
4453

4554
var (
@@ -91,6 +100,82 @@ func standbyTaskPostActionTaskDiscarded(
91100
return ErrTaskDiscarded
92101
}
93102

103+
// standbyTaskPostActionWriteToDLQ returns a standbyPostActionFn that writes the task to the DLQ.
104+
// If writer is nil, it falls back to standbyTaskPostActionTaskDiscarded (preserving the old discard
105+
// behavior until the DLQ persistence backend is wired up).
106+
func standbyTaskPostActionWriteToDLQ(
107+
writer TaskDLQWriter,
108+
shard shard.Context,
109+
enabled dynamicproperties.StringPropertyFnWithDomainFilter,
110+
) standbyPostActionFn {
111+
if writer == nil {
112+
return standbyTaskPostActionTaskDiscarded
113+
}
114+
115+
shardID := shard.GetShardID()
116+
117+
return func(ctx context.Context, task persistence.Task, postActionInfo interface{}, logger log.Logger) error {
118+
domainID := task.GetDomainID()
119+
isDeadLetterQueueEnabled := enabled(domainID)
120+
121+
if postActionInfo == nil {
122+
return nil
123+
}
124+
125+
clusterAttribute, err := getClusterAttributesForTask(ctx, shard, task)
126+
if err != nil {
127+
if errors.Is(err, errActiveClusterSelectionPolicyNotFound) {
128+
logger.Warn("Active cluster selection policy not found. Defaulting to default scope and name.")
129+
} else {
130+
return err
131+
}
132+
}
133+
134+
if clusterAttribute == nil {
135+
clusterAttribute = &types.ClusterAttribute{
136+
Scope: taskdlq.DefaultClusterAttributeScope,
137+
Name: taskdlq.DefaultClusterAttributeName,
138+
}
139+
}
140+
141+
logger.Warn("Attempting to write standby task to DLQ due to task being pending for too long.",
142+
tag.WorkflowID(task.GetWorkflowID()),
143+
tag.WorkflowRunID(task.GetRunID()),
144+
tag.WorkflowDomainID(task.GetDomainID()),
145+
tag.TaskID(task.GetTaskID()),
146+
tag.TaskType(task.GetTaskType()),
147+
tag.FailoverVersion(task.GetVersion()),
148+
tag.Timestamp(task.GetVisibilityTimestamp()),
149+
tag.IsShadowModeEnabled(isDeadLetterQueueEnabled == constants.HistoryTaskDLQModeShadow))
150+
151+
// TODO(c-warren): Move this logic into the writer instead, and return a ErrHistoryDLQNotEnabled error to be handled here
152+
switch isDeadLetterQueueEnabled {
153+
case constants.HistoryTaskDLQModeEnabled:
154+
return writer.AddTask(ctx, taskdlq.AddTaskRequest{
155+
ShardID: shardID,
156+
DomainID: task.GetDomainID(),
157+
ClusterAttributeScope: clusterAttribute.Scope,
158+
ClusterAttributeName: clusterAttribute.Name,
159+
Task: task,
160+
})
161+
case constants.HistoryTaskDLQModeShadow:
162+
err := writer.AddTask(ctx, taskdlq.AddTaskRequest{
163+
ShardID: shardID,
164+
DomainID: task.GetDomainID(),
165+
ClusterAttributeScope: clusterAttribute.Scope,
166+
ClusterAttributeName: clusterAttribute.Name,
167+
Task: task,
168+
})
169+
if err != nil {
170+
logger.Warn("Failed to write standby task to DLQ in shadow mode. Will discard the task.")
171+
}
172+
return standbyTaskPostActionTaskDiscarded(ctx, task, postActionInfo, logger)
173+
default:
174+
return standbyTaskPostActionTaskDiscarded(ctx, task, postActionInfo, logger)
175+
}
176+
}
177+
}
178+
94179
type (
95180
historyResendInfo struct {
96181
// used by NDC

0 commit comments

Comments
 (0)