Skip to content

Commit 4f25dc7

Browse files
authored
Merge branch 'dev' into fix-16754-datax-batchsize
2 parents 4931239 + 74baefa commit 4f25dc7

2 files changed

Lines changed: 151 additions & 23 deletions

File tree

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowInstanceServiceImpl.java

Lines changed: 48 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -784,31 +784,18 @@ public Map<String, Object> viewVariables(long projectCode, Integer workflowInsta
784784
timezone = commandParam.getTimeZone();
785785
}
786786

787-
Map<String, String> timeParams = BusinessTimeUtils
787+
Map<String, String> parameterMap = BusinessTimeUtils
788788
.getBusinessTime(workflowInstance.getCmdTypeIfComplement(),
789789
workflowInstance.getScheduleTime(), timezone);
790-
String userDefinedParams = workflowInstance.getGlobalParams();
791-
// global params
792-
List<Property> globalParams = new ArrayList<>();
793790

794-
// global param string
795-
String globalParamStr =
796-
ParameterUtils.convertParameterPlaceholders(GlobalParameterUtils.serializeGlobalParameter(globalParams),
797-
timeParams);
798-
globalParams = GlobalParameterUtils.deserializeGlobalParameter(globalParamStr);
799-
for (Property property : globalParams) {
800-
timeParams.put(property.getProp(), property.getValue());
801-
}
802-
803-
if (userDefinedParams != null && userDefinedParams.length() > 0) {
804-
globalParams = GlobalParameterUtils.deserializeGlobalParameter(userDefinedParams);
805-
}
791+
// finalGlobalParams
792+
List<Property> finalGlobalParams = processGlobalParams(workflowInstance, parameterMap);
806793

807-
Map<String, Map<String, Object>> localUserDefParams = getLocalParams(workflowInstance, timeParams);
794+
// localUserDefParams
795+
Map<String, Map<String, Object>> localUserDefParams = processLocalParams(workflowInstance, parameterMap);
808796

809797
Map<String, Object> resultMap = new HashMap<>();
810-
811-
resultMap.put(GLOBAL_PARAMS, globalParams);
798+
resultMap.put(GLOBAL_PARAMS, finalGlobalParams);
812799
resultMap.put(LOCAL_PARAMS, localUserDefParams);
813800

814801
result.put(DATA_LIST, resultMap);
@@ -817,25 +804,63 @@ public Map<String, Object> viewVariables(long projectCode, Integer workflowInsta
817804
}
818805

819806
/**
820-
* get local params
807+
* Process global parameters: resolve placeholders and merge into context.
808+
*
809+
* @param workflowInstance The workflow instance.
810+
* @param parameterMap Context parameters for placeholder replacement and merging
811+
* @return Deserialized global properties list
812+
*/
813+
private List<Property> processGlobalParams(WorkflowInstance workflowInstance, Map<String, String> parameterMap) {
814+
List<Property> finalGlobalParams = new ArrayList<>();
815+
816+
String globalParamsJson = workflowInstance.getGlobalParams();
817+
if (StringUtils.isNotEmpty(globalParamsJson)) {
818+
// Replace placeholders
819+
String replacedJsonStr = ParameterUtils.convertParameterPlaceholders(globalParamsJson, parameterMap);
820+
finalGlobalParams = GlobalParameterUtils.deserializeGlobalParameter(replacedJsonStr);
821+
822+
// Merge into context map
823+
if (finalGlobalParams != null) {
824+
for (Property property : finalGlobalParams) {
825+
if (property.getProp() != null && property.getValue() != null) {
826+
parameterMap.put(property.getProp(), property.getValue());
827+
}
828+
}
829+
}
830+
}
831+
return finalGlobalParams;
832+
}
833+
834+
/**
835+
* Process local parameters for tasks within a workflow instance.
836+
*
837+
* @param workflowInstance The workflow instance.
838+
* @param parameterMap Context parameters for placeholder replacement.
839+
* @return Map of task name to its local parameters and type.
821840
*/
822-
private Map<String, Map<String, Object>> getLocalParams(WorkflowInstance workflowInstance,
823-
Map<String, String> timeParams) {
841+
private Map<String, Map<String, Object>> processLocalParams(WorkflowInstance workflowInstance,
842+
Map<String, String> parameterMap) {
824843
Map<String, Map<String, Object>> localUserDefParams = new HashMap<>();
844+
845+
// Fetch valid task instances for the workflow
825846
List<TaskInstance> taskInstanceList =
826847
taskInstanceMapper.findValidTaskListByWorkflowInstanceId(workflowInstance.getId(), Flag.YES);
848+
827849
for (TaskInstance taskInstance : taskInstanceList) {
828850
TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
829851
taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion());
830852

831853
String localParams = JSONUtils.getNodeString(taskDefinitionLog.getTaskParams(), LOCAL_PARAMS);
854+
832855
if (!StringUtils.isEmpty(localParams)) {
833-
localParams = ParameterUtils.convertParameterPlaceholders(localParams, timeParams);
856+
// Replace placeholders and deserialize
857+
localParams = ParameterUtils.convertParameterPlaceholders(localParams, parameterMap);
834858
List<Property> localParamsList = JSONUtils.toList(localParams, Property.class);
835859

836860
Map<String, Object> localParamsMap = new HashMap<>();
837861
localParamsMap.put(TASK_TYPE, taskDefinitionLog.getTaskType());
838862
localParamsMap.put(LOCAL_PARAMS_LIST, localParamsList);
863+
839864
if (CollectionUtils.isNotEmpty(localParamsList)) {
840865
localUserDefParams.put(taskDefinitionLog.getName(), localParamsMap);
841866
}

dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowInstanceServiceTest.java

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
4343
import org.apache.dolphinscheduler.common.utils.DateUtils;
4444
import org.apache.dolphinscheduler.common.utils.JSONUtils;
45+
import org.apache.dolphinscheduler.dao.AlertDao;
4546
import org.apache.dolphinscheduler.dao.entity.DependentResultTaskInstanceContext;
4647
import org.apache.dolphinscheduler.dao.entity.Project;
4748
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
@@ -56,26 +57,30 @@
5657
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
5758
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
5859
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
60+
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
5961
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
6062
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionLogMapper;
6163
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper;
6264
import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper;
6365
import org.apache.dolphinscheduler.dao.repository.TaskInstanceContextDao;
6466
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
6567
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
68+
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceMapDao;
6669
import org.apache.dolphinscheduler.extract.master.command.RunWorkflowCommandParam;
6770
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
6871
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
6972
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
7073
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
7174
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
7275
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
76+
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
7377
import org.apache.dolphinscheduler.service.model.TaskNode;
7478
import org.apache.dolphinscheduler.service.process.ProcessService;
7579

7680
import java.io.IOException;
7781
import java.text.MessageFormat;
7882
import java.util.ArrayList;
83+
import java.util.Calendar;
7984
import java.util.Date;
8085
import java.util.HashMap;
8186
import java.util.List;
@@ -145,6 +150,21 @@ public class WorkflowInstanceServiceTest {
145150
@Mock
146151
private TaskInstanceContextDao taskInstanceContextDao;
147152

153+
@Mock
154+
TaskInstanceMapper taskInstanceMapper;
155+
156+
@Mock
157+
CuringParamsService curingGlobalParamsService;
158+
159+
@Mock
160+
TaskInstanceService taskInstanceService;
161+
162+
@Mock
163+
WorkflowInstanceMapDao workflowInstanceMapDao;
164+
165+
@Mock
166+
AlertDao alertDao;
167+
148168
private String shellJson = "[{\"name\":\"\",\"preTaskCode\":0,\"preTaskVersion\":0,\"postTaskCode\":123456789,"
149169
+ "\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"},{\"name\":\"\",\"preTaskCode\":123456789,"
150170
+ "\"preTaskVersion\":1,\"postTaskCode\":123451234,\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"}]";
@@ -768,6 +788,89 @@ public void testViewVariables() {
768788
Assertions.assertEquals(Status.WORKFLOW_INSTANCE_NOT_EXIST, processNotExist.get(Constants.STATUS));
769789
}
770790

791+
@Test
792+
public void testViewVariables_WithTimePlaceholders() {
793+
String globalParamsJson = "[{\"prop\":\"biz_date\",\"value\":\"$[yyyyMMdd]\",\"type\":\"VARCHAR\"}," +
794+
"{\"prop\":\"env\",\"value\":\"${ENV_TYPE}\",\"type\":\"VARCHAR\"}]";
795+
796+
WorkflowInstance workflowInstance = getProcessInstance();
797+
workflowInstance.setId(1);
798+
workflowInstance.setCommandType(CommandType.SCHEDULER);
799+
800+
Calendar calendar = Calendar.getInstance();
801+
calendar.set(2026, Calendar.MARCH, 13, 10, 0, 0);
802+
workflowInstance.setScheduleTime(calendar.getTime());
803+
workflowInstance.setGlobalParams(globalParamsJson);
804+
workflowInstance.setWorkflowDefinitionCode(100L);
805+
806+
when(workflowInstanceMapper.queryDetailById(1)).thenReturn(workflowInstance);
807+
808+
WorkflowDefinition workflowDefinition = new WorkflowDefinition();
809+
workflowDefinition.setCode(100L);
810+
workflowDefinition.setProjectCode(1L);
811+
when(workflowDefinitionMapper.queryByCode(100L)).thenReturn(workflowDefinition);
812+
813+
Map<String, Object> result = workflowInstanceService.viewVariables(1L, 1);
814+
815+
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
816+
817+
Map<String, Object> dataList = (Map<String, Object>) result.get(Constants.DATA_LIST);
818+
Assertions.assertNotNull(dataList);
819+
820+
List<Property> globalParams = (List<Property>) dataList.get(Constants.GLOBAL_PARAMS);
821+
Assertions.assertNotNull(globalParams);
822+
Assertions.assertEquals(2, globalParams.size());
823+
824+
Property dateParam = globalParams.stream()
825+
.filter(p -> "biz_date".equals(p.getProp()))
826+
.findFirst()
827+
.orElse(null);
828+
Assertions.assertNotNull(dateParam);
829+
Assertions.assertEquals("20260313", dateParam.getValue(),
830+
"Time placeholder $[yyyyMMdd] should be replaced with schedule time");
831+
832+
Property envParam = globalParams.stream()
833+
.filter(p -> "env".equals(p.getProp()))
834+
.findFirst()
835+
.orElse(null);
836+
Assertions.assertNotNull(envParam);
837+
}
838+
839+
@Test
840+
public void testViewVariables_InstanceNotFound() {
841+
when(workflowInstanceMapper.queryDetailById(999)).thenReturn(null);
842+
843+
Map<String, Object> result = workflowInstanceService.viewVariables(1L, 999);
844+
845+
Assertions.assertEquals(Status.WORKFLOW_INSTANCE_NOT_EXIST, result.get(Constants.STATUS));
846+
Assertions.assertNull(result.get(Constants.DATA_LIST));
847+
}
848+
849+
@Test
850+
public void testViewVariables_EmptyGlobalParams() {
851+
WorkflowInstance workflowInstance = getProcessInstance();
852+
workflowInstance.setId(2);
853+
workflowInstance.setCommandType(CommandType.START_PROCESS);
854+
workflowInstance.setScheduleTime(new Date());
855+
workflowInstance.setGlobalParams("");
856+
workflowInstance.setWorkflowDefinitionCode(101L);
857+
858+
when(workflowInstanceMapper.queryDetailById(2)).thenReturn(workflowInstance);
859+
860+
WorkflowDefinition workflowDefinition = new WorkflowDefinition();
861+
workflowDefinition.setCode(101L);
862+
workflowDefinition.setProjectCode(1L);
863+
when(workflowDefinitionMapper.queryByCode(101L)).thenReturn(workflowDefinition);
864+
865+
Map<String, Object> result = workflowInstanceService.viewVariables(1L, 2);
866+
867+
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
868+
869+
Map<String, Object> dataList = (Map<String, Object>) result.get(Constants.DATA_LIST);
870+
List<Property> globalParams = (List<Property>) dataList.get(Constants.GLOBAL_PARAMS);
871+
Assertions.assertTrue(globalParams.isEmpty(), "Global params list should be empty when input is empty string");
872+
}
873+
771874
@Test
772875
public void testViewGantt() throws Exception {
773876
WorkflowInstance workflowInstance = getProcessInstance();

0 commit comments

Comments
 (0)