Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improvement-16887][Dependent Task] Dependent task improvement #16910

Merged
merged 20 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.CommandKeyConstants;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.ContextType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.graph.DAG;
Expand All @@ -53,11 +54,15 @@
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.AbstractTaskInstanceContext;
import org.apache.dolphinscheduler.dao.entity.DependentResultTaskInstanceContext;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.RelationSubWorkflow;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstanceContext;
import org.apache.dolphinscheduler.dao.entity.TaskInstanceDependentDetails;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
Expand All @@ -70,6 +75,7 @@
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceContextDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceMapDao;
Expand All @@ -96,6 +102,7 @@

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
Expand Down Expand Up @@ -174,6 +181,9 @@ public class WorkflowInstanceServiceImpl extends BaseServiceImpl implements Work
@Autowired
private CuringParamsService curingGlobalParamsService;

@Autowired
private TaskInstanceContextDao taskInstanceContextDao;

/**
* return top n SUCCESS workflow instance order by running time which started between startTime and endTime
*/
Expand All @@ -184,7 +194,7 @@ public Map<String, Object> queryTopNLongestRunningWorkflowInstance(User loginUse
// check user access for project
Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode,
ApiFuncIdentificationConstant.WORKFLOW_INSTANCE);
WORKFLOW_INSTANCE);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
Expand Down Expand Up @@ -233,7 +243,7 @@ public Map<String, Object> queryWorkflowInstanceById(User loginUser, long projec
// check user access for project
Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode,
ApiFuncIdentificationConstant.WORKFLOW_INSTANCE);
WORKFLOW_INSTANCE);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
Expand All @@ -245,7 +255,7 @@ public Map<String, Object> queryWorkflowInstanceById(User loginUser, long projec
workflowInstance.getWorkflowDefinitionVersion());

if (workflowDefinition == null || projectCode != workflowDefinition.getProjectCode()) {
log.error("workflow definition does not exist, projectCode:{}.", projectCode);
log.error("workflow definition does not exist, projectCode: {}.", projectCode);
putMsg(result, Status.WORKFLOW_DEFINITION_NOT_EXIST, workflowInstanceId);
} else {
workflowInstance.setLocations(workflowDefinition.getLocations());
Expand Down Expand Up @@ -443,7 +453,7 @@ public Map<String, Object> queryTaskListByWorkflowInstanceId(User loginUser, lon
// check user access for project
Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode,
ApiFuncIdentificationConstant.WORKFLOW_INSTANCE);
WORKFLOW_INSTANCE);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
Expand All @@ -460,15 +470,45 @@ public Map<String, Object> queryTaskListByWorkflowInstanceId(User loginUser, lon
List<TaskInstance> taskInstanceList =
taskInstanceDao.queryValidTaskListByWorkflowInstanceId(workflowInstanceId,
workflowInstance.getTestFlag());
List<TaskInstanceDependentDetails> taskInstanceDependentDetailsList =
setTaskInstanceDependentResult(taskInstanceList);

Map<String, Object> resultMap = new HashMap<>();
resultMap.put(WORKFLOW_INSTANCE_STATE, workflowInstance.getState().toString());
resultMap.put(TASK_LIST, taskInstanceList);
resultMap.put(TASK_LIST, taskInstanceDependentDetailsList);
result.put(DATA_LIST, resultMap);

putMsg(result, Status.SUCCESS);
return result;
}

private List<TaskInstanceDependentDetails> setTaskInstanceDependentResult(List<TaskInstance> taskInstanceList) {
ruanwenjun marked this conversation as resolved.
Show resolved Hide resolved
List<TaskInstanceDependentDetails> taskInstanceDependentDetailsList = taskInstanceList.stream()
.map(taskInstance -> {
TaskInstanceDependentDetails taskInstanceDependentDetails = new TaskInstanceDependentDetails();
BeanUtils.copyProperties(taskInstance, taskInstanceDependentDetails);
return taskInstanceDependentDetails;
}).collect(Collectors.toList());
List<Integer> taskInstanceIdList = taskInstanceList.stream()
.map(TaskInstance::getId).collect(Collectors.toList());
List<TaskInstanceContext> taskInstanceContextList =
taskInstanceContextDao.batchQueryByTaskInstanceIdsAndContextType(taskInstanceIdList,
ContextType.DEPENDENT_RESULT_CONTEXT);
for (TaskInstanceContext taskInstanceContext : taskInstanceContextList) {
for (AbstractTaskInstanceContext dependentResultTaskInstanceContext : taskInstanceContext
.getTaskInstanceContext()) {
for (TaskInstanceDependentDetails taskInstanceDependentDetails : taskInstanceDependentDetailsList) {
if (taskInstanceDependentDetails.getId().equals(taskInstanceContext.getTaskInstanceId())) {
taskInstanceDependentDetails
.setTaskInstanceDependentResult(
(DependentResultTaskInstanceContext) dependentResultTaskInstanceContext);
}
}
}
}
return taskInstanceDependentDetailsList;
}

@Override
public List<DynamicSubWorkflowDto> queryDynamicSubWorkflowInstances(User loginUser, Integer taskId) {
TaskInstance taskInstance = taskInstanceDao.queryById(taskId);
Expand All @@ -488,7 +528,7 @@ public List<DynamicSubWorkflowDto> queryDynamicSubWorkflowInstances(User loginUs
.queryAllSubWorkflowInstance((long) taskInstance.getWorkflowInstanceId(),
taskInstance.getTaskCode());
List<Long> allSubWorkflowInstanceId = relationSubWorkflows.stream()
.map(RelationSubWorkflow::getSubWorkflowInstanceId).collect(java.util.stream.Collectors.toList());
.map(RelationSubWorkflow::getSubWorkflowInstanceId).collect(Collectors.toList());
List<WorkflowInstance> allSubWorkflows = workflowInstanceDao.queryByIds(allSubWorkflowInstanceId);

if (allSubWorkflows == null || allSubWorkflows.isEmpty()) {
Expand Down Expand Up @@ -539,7 +579,7 @@ public Map<String, Object> querySubWorkflowInstanceByTaskId(User loginUser, long
// check user access for project
Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode,
ApiFuncIdentificationConstant.WORKFLOW_INSTANCE);
WORKFLOW_INSTANCE);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
Expand Down Expand Up @@ -693,7 +733,7 @@ public Map<String, Object> updateWorkflowInstance(User loginUser, long projectCo
"Update task relations complete, projectCode:{}, workflowDefinitionCode:{}, workflowDefinitionVersion:{}.",
projectCode, workflowDefinition.getCode(), insertVersion);
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, workflowDefinition);
result.put(DATA_LIST, workflowDefinition);
} else {
log.info(
"Update task relations error, projectCode:{}, workflowDefinitionCode:{}, workflowDefinitionVersion:{}.",
Expand Down Expand Up @@ -750,7 +790,7 @@ public Map<String, Object> queryParentInstanceBySubId(User loginUser, long proje
// check user access for project
Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode,
ApiFuncIdentificationConstant.WORKFLOW_INSTANCE);
WORKFLOW_INSTANCE);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
Expand Down Expand Up @@ -824,7 +864,7 @@ public Map<String, Object> viewVariables(long projectCode, Integer workflowInsta
if (workflowInstance == null) {
log.error("workflow instance does not exist, projectCode:{}, workflowInstanceId:{}.", projectCode,
workflowInstanceId);
putMsg(result, Status.WORKFLOW_INSTANCE_NOT_EXIST, workflowInstanceId);
putMsg(result, WORKFLOW_INSTANCE_NOT_EXIST, workflowInstanceId);
return result;
}

Expand Down Expand Up @@ -918,7 +958,7 @@ public Map<String, Object> viewGantt(long projectCode, Integer workflowInstanceI
if (workflowInstance == null) {
log.error("workflow instance does not exist, projectCode:{}, workflowInstanceId:{}.", projectCode,
workflowInstanceId);
putMsg(result, Status.WORKFLOW_INSTANCE_NOT_EXIST, workflowInstanceId);
putMsg(result, WORKFLOW_INSTANCE_NOT_EXIST, workflowInstanceId);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ContextType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
Expand All @@ -42,10 +43,12 @@
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.DependentResultTaskInstanceContext;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstanceContext;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
Expand All @@ -60,10 +63,12 @@
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceContextDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceMapDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
import org.apache.dolphinscheduler.service.model.TaskNode;
Expand All @@ -90,6 +95,7 @@
import org.mockito.quality.Strictness;

import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.google.common.collect.Lists;

@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
Expand Down Expand Up @@ -154,6 +160,9 @@ public class WorkflowInstanceServiceTest {
@Mock
private WorkflowInstanceMapDao workflowInstanceMapDao;

@Mock
private TaskInstanceContextDao taskInstanceContextDao;

private String shellJson = "[{\"name\":\"\",\"preTaskCode\":0,\"preTaskVersion\":0,\"postTaskCode\":123456789,"
+ "\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"},{\"name\":\"\",\"preTaskCode\":123456789,"
+ "\"preTaskVersion\":1,\"postTaskCode\":123451234,\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"}]";
Expand Down Expand Up @@ -465,6 +474,18 @@ public void testQueryTaskListByWorkflowInstanceId() throws IOException {
taskInstance.setTaskType("SHELL");
List<TaskInstance> taskInstanceList = new ArrayList<>();
taskInstanceList.add(taskInstance);
List<DependentResultTaskInstanceContext> dependentResultTaskInstanceContextList = new ArrayList<>();
TaskInstanceContext taskInstanceContext = new TaskInstanceContext();
taskInstanceContext.setTaskInstanceId(0);
taskInstanceContext.setContextType(ContextType.DEPENDENT_RESULT_CONTEXT);
DependentResultTaskInstanceContext dependentResultTaskInstanceContext =
new DependentResultTaskInstanceContext();
dependentResultTaskInstanceContext.setProjectCode(projectCode);
dependentResultTaskInstanceContext.setDependentResult(DependResult.SUCCESS);
taskInstanceContext.setTaskInstanceContext(
Lists.asList(dependentResultTaskInstanceContext, new DependentResultTaskInstanceContext[0]));
List<Integer> taskInstanceIdList = new ArrayList<>();
taskInstanceIdList.add(0);
Result res = new Result();
res.setCode(Status.SUCCESS.ordinal());
res.setData("xxx");
Expand All @@ -476,6 +497,9 @@ public void testQueryTaskListByWorkflowInstanceId() throws IOException {
workflowInstance.getTestFlag()))
.thenReturn(taskInstanceList);
when(loggerService.queryLog(loginUser, taskInstance.getId(), 0, 4098)).thenReturn(res);
when(taskInstanceContextDao.batchQueryByTaskInstanceIdsAndContextType(taskInstanceIdList,
ContextType.DEPENDENT_RESULT_CONTEXT))
.thenReturn(Lists.asList(taskInstanceContext, new TaskInstanceContext[0]));
Map<String, Object> successRes =
workflowInstanceService.queryTaskListByWorkflowInstanceId(loginUser, projectCode, 1);
Assertions.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,11 +284,13 @@ public final class Constants {
public static final String SUBWORKFLOW_INSTANCE_ID = "subWorkflowInstanceId";
public static final String WORKFLOW_INSTANCE_STATE = "workflowInstanceState";
public static final String PARENT_WORKFLOW_INSTANCE = "parentWorkflowInstance";
public static final String DEPENDENCE = "dependence";
public static final String TASK_LIST = "taskList";
public static final String QUEUE = "queue";
public static final String QUEUE_NAME = "queueName";
public static final String DEPENDENT_SPLIT = ":||";

/**
* dependent task
*/
public static final long DEPENDENT_ALL_TASK_CODE = -1;
public static final long DEPENDENT_WORKFLOW_CODE = 0;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.common.enums;

import lombok.Getter;

@Getter
public enum ContextType {

DEPENDENT_RESULT_CONTEXT, RUNTIME_CONTEXT;
ruanwenjun marked this conversation as resolved.
Show resolved Hide resolved

public static ContextType of(String name) {
for (ContextType contextType : values()) {
if (contextType.name().equalsIgnoreCase(name)) {
return contextType;
}
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.dao.entity;

import org.apache.dolphinscheduler.common.enums.ContextType;

import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
public abstract class AbstractTaskInstanceContext implements ITaskInstanceContext {

private ContextType contextType;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.dao.entity;

import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;

@EqualsAndHashCode(callSuper = true)

Check notice

Code scanning / CodeQL

Missing Override annotation Note

This method overrides
AbstractTaskInstanceContext.canEqual
; it is advisable to add an Override annotation.
@Data
@NoArgsConstructor
public class DependentResultTaskInstanceContext extends AbstractTaskInstanceContext {

private Long projectCode;

private Long workflowDefinitionCode;

private Long taskDefinitionCode;

private String dateCycle;

private DependResult dependentResult;

}
Loading
Loading