vlambda博客
学习文章列表

一个 DAG 工作流引擎的设计实现源代码实例

任务Task

package com.bytedance.ecop.kunlun.engine.scheduler;

import java.util.UUID;

import static java.lang.Thread.sleep;

public class Task implements Executor {
private Long id;
private String name;
private int state;
private long timeout;

public Task() {
}

public Task(Long id, String name, int state, long timeout) {
this.id = id;
this.name = name;
this.state = state;
this.timeout = timeout;
}

public Long getId() {
return id;
}

public void setId(Long id) {
this.id = id;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public int getState() {
return state;
}

public void setState(int state) {
this.state = state;
}

public long getTimeout() {
return timeout;
}

public void setTimeout(long timeout) {
this.timeout = timeout;
}

public boolean execute(TaskCallBack callBack) {
System.out.println("Task id: [" + id + "], " + "task name: [" + name + "] is running");
state = 1;
try {
sleep(3000L);
} catch (InterruptedException e) {
}

TaskInstance taskInstance = new TaskInstance();
taskInstance.setTaskId(id);
String taskInstanceId = UUID.randomUUID().toString();
taskInstance.setTaskInstanceId(taskInstanceId);

TaskInstanceResult taskInstanceResult = new TaskInstanceResult("Task[" + id + "], taskInstanceId=" + taskInstanceId + " TaskInstanceResult = " + UUID.randomUUID());
callBack.invoke(taskInstanceResult);
return true;
}

public boolean hasExecuted() {
return state == 1;
}
}



package com.bytedance.ecop.kunlun.engine.scheduler;

public interface Executor {
boolean execute(TaskCallBack callBack);
}


package com.bytedance.ecop.kunlun.engine.scheduler;

public interface TaskCallBack {
Object invoke(TaskInstanceResult result);
}


任务运行实例TaskInstance

package com.bytedance.ecop.kunlun.engine.scheduler;

public class TaskInstance {
private String taskInstanceId;
private Long taskId;
private String name;
private int state;

public TaskInstance() {
}

public TaskInstance(String taskInstanceId, Long taskId, String name, int state) {
this.taskInstanceId = taskInstanceId;
this.taskId = taskId;
this.name = name;
this.state = state;
}

public String getTaskInstanceId() {
return taskInstanceId;
}

public void setTaskInstanceId(String taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}

public Long getTaskId() {
return taskId;
}

public void setTaskId(Long taskId) {
this.taskId = taskId;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public int getState() {
return state;
}

public void setState(int state) {
this.state = state;
}
}


package com.bytedance.ecop.kunlun.engine.scheduler;

public class TaskInstanceResult {
String resultJson;

@Override
public String toString() {
return "TaskInstanceResult{" +
"resultJson='" + resultJson + '\'' +
'}';
}

public TaskInstanceResult(String resultJson) {
this.resultJson = resultJson;
}

public String getResultJson() {
return resultJson;
}

public void setResultJson(String resultJson) {
this.resultJson = resultJson;
}
}

DAG流程定义 Process

package com.bytedance.ecop.kunlun.engine.scheduler;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* DAG工作流
*/

public class Process {
private Long graphId;
private Set<Task> tasks;
private Map<Task, Set<Task>> map;

public Process() {
this.tasks = new HashSet<Task>();
this.map = new HashMap<Task, Set<Task>>();
}

public void addEdge(Task task, Task prev) {
if (!tasks.contains(task) || !tasks.contains(prev)) {
throw new IllegalArgumentException();
}
Set<Task> prevs = map.get(task);
if (prevs == null) {
prevs = new HashSet<Task>();
map.put(task, prevs);
}
if (prevs.contains(prev)) {
throw new IllegalArgumentException();
}
prevs.add(prev);
}

public void addTask(Task task) {
if (tasks.contains(task)) {
throw new IllegalArgumentException();
}
tasks.add(task);
}

public void remove(Task task) {
if (!tasks.contains(task)) {
return;
}
if (map.containsKey(task)) {
map.remove(task);
}
for (Set<Task> set : map.values()) {
if (set.contains(task)) {
set.remove(task);
}
}
}

public Set<Task> getTasks() {
return tasks;
}

public void setTasks(Set<Task> tasks) {
this.tasks = tasks;
}

public Map<Task, Set<Task>> getMap() {
return map;
}

public void setMap(Map<Task, Set<Task>> map) {
this.map = map;
}
}

DAG流程实例 ProcessInstance

package com.bytedance.ecop.kunlun.engine.scheduler;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* DAG工作流
*/

public class ProcessInstance {
Long processId;
String processInstanceId;

public ProcessInstance(Long processId, String processInstanceId) {
this.processId = processId;
this.processInstanceId = processInstanceId;
}

public Long getProcessId() {
return processId;
}

public void setProcessId(Long processId) {
this.processId = processId;
}

public String getProcessInstanceId() {
return processInstanceId;
}

public void setProcessInstanceId(String processInstanceId) {
this.processInstanceId = processInstanceId;
}
}

DAG工作流程调度器

package com.bytedance.ecop.kunlun.engine.scheduler;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class Scheduler {

public void schedule(Process process) {
while (true) {
// 1、构建 todoTaskList
List<Task> todoTaskList = new ArrayList<Task>();

for (Task task : process.getTasks()) {
if (!task.hasExecuted()) {
Set<Task> prevs = process.getMap().get(task);
if (prevs != null && !prevs.isEmpty()) {
boolean toAdd = true;
for (Task tsk : prevs) {
if (!tsk.hasExecuted()) {
toAdd = false;
break;
}
}
if (toAdd) {
todoTaskList.add(task);
}
} else {
todoTaskList.add(task);
}
}
}

// 2.执行 todoTaskList
if (!todoTaskList.isEmpty()) {
for (Task task : todoTaskList) {

task.execute(new TaskCallBack() {
@Override
public Object invoke(TaskInstanceResult taskInstanceResult) {
System.out.println(taskInstanceResult);
return null;
}
});

}
} else {
break;
}

}
}

}

测试运行效果

package com.bytedance.ecop.kunlun.engine.scheduler;

public class MainApplication {

public static void main(String[] args) {
// 创建工作流
Process process = new Process();
// 注册任务
Task task1 = new Task(1L, "task1", 0, -1);
Task task2 = new Task(2L, "task2", 0, -1);
Task task3 = new Task(3L, "task3", 0, -1);
Task task4 = new Task(4L, "task4", 0, -1);
Task task5 = new Task(5L, "task5", 0, -1);
Task task6 = new Task(6L, "task6", 0, -1);

process.addTask(task1);
process.addTask(task2);
process.addTask(task3);
process.addTask(task4);
process.addTask(task5);
process.addTask(task6);

process.addEdge(task1, task2);
process.addEdge(task1, task5);
process.addEdge(task6, task2);
process.addEdge(task2, task3);
process.addEdge(task2, task4);

// 创建调度器,执行DAG调度
Scheduler scheduler = new Scheduler();
scheduler.schedule(process);
}

}

Task id: [4], task name: [task4] is running
TaskInstanceResult{resultJson='Task[4], taskInstanceId=b9fc6d44-d564-4131-8995-debf9a90f954 TaskInstanceResult = 37f41979-65b8-4c2d-baf1-b13176c19d02'}
Task id: [5], task name: [task5] is running
TaskInstanceResult{resultJson='Task[5], taskInstanceId=3da4db7f-4a63-482d-900c-64db48030627 TaskInstanceResult = 4f71de6d-8655-4d0f-8a64-c3c8ce8bdd23'}
Task id: [3], task name: [task3] is running
TaskInstanceResult{resultJson='Task[3], taskInstanceId=145a130a-5e39-41ea-98f3-87f62a8dfff8 TaskInstanceResult = d8746617-1df2-46f8-a129-d75070478228'}
Task id: [2], task name: [task2] is running
TaskInstanceResult{resultJson='Task[2], taskInstanceId=f7640b21-58b1-45bd-a886-aac425644d8e TaskInstanceResult = 7182f6c6-6395-4046-b50c-0df19f6de4ae'}
Task id: [1], task name: [task1] is running
TaskInstanceResult{resultJson='Task[1], taskInstanceId=be9ac46f-df32-4da8-b54c-74698c39d309 TaskInstanceResult = ac78327d-8825-4651-b309-c1bb5da6416c'}
Task id: [6], task name: [task6] is running
TaskInstanceResult{resultJson='Task[6], taskInstanceId=cb138e08-6ebe-4b4b-87ab-a1b230a06740 TaskInstanceResult = c0be1628-3a0c-4bfd-b863-500ca512ce80'}

Process finished with exit code 0