Skip to content

Commit 09ec882

Browse files
committed
ensure that the length for replicationTasks and taskInfo are the same
Signed-off-by: fimanishi <fimanishi@gmail.com>
1 parent 937b02c commit 09ec882

2 files changed

Lines changed: 8 additions & 4 deletions

File tree

service/history/replication/dlq_handler.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -236,8 +236,8 @@ func (r *dlqHandlerImpl) readMessagesWithAckLevel(
236236
return nil, nil, nil, err
237237
}
238238

239-
taskInfo := make([]*types.ReplicationTaskInfo, 0, len(resp.Tasks))
240239
hydrated := make(map[int64]*types.ReplicationTask, len(resp.Tasks))
240+
taskInfos := make([]*types.ReplicationTaskInfo, 0, len(resp.Tasks)) // parallel to resp.Tasks
241241
var needHydration []*types.ReplicationTaskInfo
242242

243243
for _, task := range resp.Tasks {
@@ -256,7 +256,7 @@ func (r *dlqHandlerImpl) readMessagesWithAckLevel(
256256
NextEventID: info.NextEventID,
257257
ScheduledID: info.ScheduledID,
258258
}
259-
taskInfo = append(taskInfo, ti)
259+
taskInfos = append(taskInfos, ti)
260260

261261
if task.Task != nil {
262262
replicationTask, err := r.serializer.DeserializeReplicationDLQTask(task.Task)
@@ -291,15 +291,19 @@ func (r *dlqHandlerImpl) readMessagesWithAckLevel(
291291
}
292292
}
293293

294+
// Assemble both slices in the same loop so they always have equal length and matching order.
295+
// Tasks missing from hydration (source workflow deleted) are skipped from both.
294296
replicationTasks := make([]*types.ReplicationTask, 0, len(resp.Tasks))
295-
for _, task := range resp.Tasks {
297+
taskInfo := make([]*types.ReplicationTaskInfo, 0, len(resp.Tasks))
298+
for i, task := range resp.Tasks {
296299
rt, ok := hydrated[task.Info.TaskID]
297300
if !ok {
298301
r.logger.Warn("replication task not found after hydration",
299302
tag.WorkflowDomainID(task.Info.DomainID), tag.WorkflowID(task.Info.WorkflowID), tag.WorkflowRunID(task.Info.RunID), tag.TaskID(task.Info.TaskID))
300303
continue
301304
}
302305
replicationTasks = append(replicationTasks, rt)
306+
taskInfo = append(taskInfo, taskInfos[i])
303307
}
304308

305309
return replicationTasks, taskInfo, resp.NextPageToken, nil

service/history/replication/dlq_handler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -563,7 +563,7 @@ func (s *dlqHandlerSuite) TestReadMessagesWithAckLevel_MissingFromRemote() {
563563

564564
s.NoError(err)
565565
s.Empty(replicationTasks)
566-
s.Len(taskInfo, 1) // taskInfo is always populated regardless
566+
s.Empty(taskInfo) // task skipped from both slices when missing from remote
567567
}
568568

569569
func (s *dlqHandlerSuite) TestPurgeMessages() {

0 commit comments

Comments
 (0)