Skip to content

Commit ec6e930

Browse files
authored
[Fix-17767][Master] Fix executing a specified task in a workflow instance not taking effect (#18000)
1 parent 215f466 commit ec6e930

File tree

3 files changed

+374
-10
lines changed

3 files changed

+374
-10
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.server.master.engine.command.handler;
19+
20+
import static com.google.common.base.Preconditions.checkArgument;
21+
import static org.apache.dolphinscheduler.common.constants.CommandKeyConstants.CMD_PARAM_START_NODES;
22+
23+
import org.apache.dolphinscheduler.common.enums.CommandType;
24+
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
25+
import org.apache.dolphinscheduler.common.utils.JSONUtils;
26+
import org.apache.dolphinscheduler.dao.entity.Command;
27+
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
28+
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
29+
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
30+
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
31+
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
32+
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
33+
import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph;
34+
import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowExecutionGraph;
35+
import org.apache.dolphinscheduler.server.master.engine.graph.WorkflowGraphTopologyLogicalVisitor;
36+
import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnable;
37+
import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder;
38+
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder;
39+
40+
import org.apache.commons.collections4.CollectionUtils;
41+
import org.apache.commons.lang3.StringUtils;
42+
43+
import java.util.HashSet;
44+
import java.util.List;
45+
import java.util.Map;
46+
import java.util.Objects;
47+
import java.util.Set;
48+
import java.util.function.BiConsumer;
49+
import java.util.function.Function;
50+
import java.util.stream.Collectors;
51+
52+
import org.springframework.beans.factory.annotation.Autowired;
53+
import org.springframework.context.ApplicationContext;
54+
import org.springframework.stereotype.Component;
55+
56+
import com.google.common.base.Splitter;
57+
58+
/**
59+
* This handler is used to handle {@link CommandType#EXECUTE_TASK}.
60+
* <p> It will rerun the given start task and all its downstream tasks in the same workflow instance.
61+
*/
62+
@Component
63+
public class ExecuteTaskCommandHandler extends AbstractCommandHandler {
64+
65+
@Autowired
66+
private WorkflowInstanceDao workflowInstanceDao;
67+
68+
@Autowired
69+
private TaskInstanceDao taskInstanceDao;
70+
71+
@Autowired
72+
private ApplicationContext applicationContext;
73+
74+
@Autowired
75+
private MasterConfig masterConfig;
76+
77+
@Override
78+
protected void assembleWorkflowInstance(final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) {
79+
final Command command = workflowExecuteContextBuilder.getCommand();
80+
final int workflowInstanceId = command.getWorkflowInstanceId();
81+
final WorkflowInstance workflowInstance = workflowInstanceDao.queryOptionalById(workflowInstanceId)
82+
.orElseThrow(() -> new IllegalArgumentException("Cannot find WorkflowInstance:" + workflowInstanceId));
83+
workflowInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION, command.getCommandType().name());
84+
workflowInstance.setCommandType(command.getCommandType());
85+
if (command.getTaskDependType() != null) {
86+
workflowInstance.setTaskDependType(command.getTaskDependType());
87+
}
88+
workflowInstance.setHost(masterConfig.getMasterAddress());
89+
workflowInstanceDao.updateById(workflowInstance);
90+
workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance);
91+
}
92+
93+
@Override
94+
protected void assembleWorkflowExecutionGraph(final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) {
95+
final IWorkflowGraph workflowGraph = workflowExecuteContextBuilder.getWorkflowGraph();
96+
final List<String> startNodes = parseStartNodesFromCommand(workflowExecuteContextBuilder, workflowGraph);
97+
final Map<String, TaskInstance> taskInstanceMap = getValidTaskInstance(workflowExecuteContextBuilder
98+
.getWorkflowInstance())
99+
.stream()
100+
.collect(Collectors.toMap(TaskInstance::getName, Function.identity()));
101+
102+
// Mark the selected task and all its downstream task instances as invalid, then trigger them again.
103+
final Set<String> taskNamesNeedRerun = new HashSet<>();
104+
final WorkflowGraphTopologyLogicalVisitor markInvalidTaskVisitor =
105+
WorkflowGraphTopologyLogicalVisitor.builder()
106+
.onWorkflowGraph(workflowGraph)
107+
.taskDependType(workflowExecuteContextBuilder.getWorkflowInstance().getTaskDependType())
108+
.fromTask(startNodes)
109+
.doVisitFunction((task, successors) -> taskNamesNeedRerun.add(task))
110+
.build();
111+
markInvalidTaskVisitor.visit();
112+
113+
final List<TaskInstance> taskInstancesNeedRerun = taskNamesNeedRerun.stream()
114+
.map(taskInstanceMap::remove)
115+
.filter(Objects::nonNull)
116+
.collect(Collectors.toList());
117+
if (CollectionUtils.isNotEmpty(taskInstancesNeedRerun)) {
118+
taskInstanceDao.markTaskInstanceInvalid(taskInstancesNeedRerun);
119+
}
120+
121+
final WorkflowExecutionGraph workflowExecutionGraph = new WorkflowExecutionGraph();
122+
final BiConsumer<String, Set<String>> taskExecutionRunnableCreator = (task, successors) -> {
123+
final TaskExecutionRunnableBuilder taskExecutionRunnableBuilder =
124+
TaskExecutionRunnableBuilder
125+
.builder()
126+
.workflowExecutionGraph(workflowExecutionGraph)
127+
.workflowDefinition(workflowExecuteContextBuilder.getWorkflowDefinition())
128+
.project(workflowExecuteContextBuilder.getProject())
129+
.workflowInstance(workflowExecuteContextBuilder.getWorkflowInstance())
130+
.taskDefinition(workflowGraph.getTaskNodeByName(task))
131+
.taskInstance(taskInstanceMap.get(task))
132+
.workflowEventBus(workflowExecuteContextBuilder.getWorkflowEventBus())
133+
.applicationContext(applicationContext)
134+
.build();
135+
workflowExecutionGraph.addNode(new TaskExecutionRunnable(taskExecutionRunnableBuilder));
136+
workflowExecutionGraph.addEdge(task, successors);
137+
};
138+
139+
final WorkflowGraphTopologyLogicalVisitor workflowGraphTopologyLogicalVisitor =
140+
WorkflowGraphTopologyLogicalVisitor.builder()
141+
.taskDependType(workflowExecuteContextBuilder.getWorkflowInstance().getTaskDependType())
142+
.onWorkflowGraph(workflowGraph)
143+
.fromTask(startNodes)
144+
.doVisitFunction(taskExecutionRunnableCreator)
145+
.build();
146+
workflowGraphTopologyLogicalVisitor.visit();
147+
workflowExecutionGraph.removeUnReachableEdge();
148+
workflowExecuteContextBuilder.setWorkflowExecutionGraph(workflowExecutionGraph);
149+
}
150+
151+
private List<String> parseStartNodesFromCommand(final WorkflowExecuteContextBuilder workflowExecuteContextBuilder,
152+
final IWorkflowGraph workflowGraph) {
153+
final Command command = workflowExecuteContextBuilder.getCommand();
154+
final String startNodes = JSONUtils.getNodeString(command.getCommandParam(), CMD_PARAM_START_NODES);
155+
checkArgument(StringUtils.isNotBlank(startNodes),
156+
"Invalid command param, the start nodes is empty: " + command.getCommandParam());
157+
final List<Long> startNodeCodes = Splitter.on(',')
158+
.trimResults()
159+
.omitEmptyStrings()
160+
.splitToStream(startNodes)
161+
.map(Long::parseLong)
162+
.collect(Collectors.toList());
163+
checkArgument(CollectionUtils.isNotEmpty(startNodeCodes),
164+
"Invalid command param, cannot parse start nodes from command param: " + command.getCommandParam());
165+
return startNodeCodes
166+
.stream()
167+
.map(workflowGraph::getTaskNodeByCode)
168+
.map(TaskDefinition::getName)
169+
.collect(Collectors.toList());
170+
}
171+
172+
@Override
173+
public CommandType commandType() {
174+
return CommandType.EXECUTE_TASK;
175+
}
176+
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/TaskExecutionContextFactory.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,9 @@
4949
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
5050
import org.apache.dolphinscheduler.service.process.ProcessService;
5151

52-
import org.apache.commons.collections4.CollectionUtils;
5352
import org.apache.commons.lang3.StringUtils;
5453

55-
import java.util.Collections;
54+
import java.util.Comparator;
5655
import java.util.List;
5756
import java.util.Map;
5857
import java.util.Objects;
@@ -87,7 +86,8 @@ public TaskExecutionContext createTaskExecutionContext(TaskExecutionContextCreat
8786
final Project project = request.getProject();
8887

8988
final List<Property> varPools =
90-
generateTaskInstanceVarPool(request.getTaskDefinition(), request.getWorkflowExecutionGraph());
89+
generateTaskInstanceVarPool(workflowInstance, request.getTaskDefinition(),
90+
request.getWorkflowExecutionGraph());
9191
taskInstance.setVarPool(VarPoolUtils.serializeVarPool(varPools));
9292

9393
return TaskExecutionContextBuilder.get()
@@ -192,19 +192,26 @@ private Optional<String> getEnvironmentConfigFromDB(final TaskInstance taskInsta
192192
return Optional.ofNullable(environmentOptional.get().getConfig());
193193
}
194194

// The predecessors of the task instance will be used to generate the var pool.
// In execute-task(TASK_ONLY) scenario, the predecessor might be outside current execution sub-graph.
// For this case, fallback to workflow varPool to keep compatibility with historical behavior.
private List<Property> generateTaskInstanceVarPool(WorkflowInstance workflowInstance,
                                                   TaskDefinition taskDefinition,
                                                   IWorkflowExecutionGraph workflowExecutionGraph) {
    // A start node of the execution graph has no in-graph predecessors: seed it from the
    // workflow instance's accumulated var pool instead of returning an empty pool.
    final boolean isStartNode = workflowExecutionGraph.getStartNodes()
            .stream()
            .anyMatch(node -> node.getTaskDefinition().getCode() == taskDefinition.getCode());
    if (isStartNode) {
        return VarPoolUtils.deserializeVarPool(workflowInstance.getVarPool());
    }

    // Merge var pools from initialized predecessors, ordered by end time (nulls last) so that
    // later-finishing tasks override earlier ones on key conflicts; blank pools are skipped.
    List<String> varPoolsFromPredecessors = workflowExecutionGraph.getPredecessors(taskDefinition.getName())
            .stream()
            .filter(ITaskExecutionRunnable::isTaskInstanceInitialized)
            .map(ITaskExecutionRunnable::getTaskInstance)
            .sorted(Comparator.comparing(TaskInstance::getEndTime, Comparator.nullsLast(Comparator.naturalOrder())))
            .map(TaskInstance::getVarPool)
            .filter(StringUtils::isNotBlank)
            .collect(Collectors.toList());
    return VarPoolUtils.mergeVarPoolJsonString(varPoolsFromPredecessors);
}

0 commit comments

Comments
 (0)