开源工作流引擎 Workflow Core 的研究和使用教程
一,工作流对象和使用前说明
1,IWorkflow / IWorkflowBuilder
2,EndWorkflow
3,容器
4,工作流的步骤点
二,IStepBuilder 节点
1,设置属性的方法
2,设置数据
3,Input / Output
三,工作流节点的逻辑和操作
1,CompensateWith
2,CompensateWithSequence
3,OnError
1,Delay
2,Schedule
3,Recur
1,End
2,CancelCondition
1,WaitFor
1,Then
2,Attach
1,Saga
容器操作
普通节点
事件
条件体和循环体
节点的异步或多线程
用于事务的操作
四,条件或开关
1,Parallel
1,When
2,While
3,If
1,ForEach
迭代
条件判断
节点并发
五,其它
开源工作流引擎 Workflow Core 的研究和使用教程
一,工作流对象和使用前说明
为了避免歧义,事先约定。
工作流有很多节点组成,一个节点成为步骤点(Step)。
1,IWorkflow / IWorkflowBuilder
Workflow Core 中,用于构建工作流的类继承 IWorkflow
,代表一条有任务规则的工作流,可以表示工作流任务的开始或者 Do() 方法,或工作流分支获取其它方法。
IWorkflow 有两个同名接口:
public interface IWorkflow<TData>
where TData : new()
{
string Id { get; }
int Version { get; }
void Build(IWorkflowBuilder<TData> builder);
}
public interface IWorkflow : IWorkflow<object>
{
}
Id:此工作流的唯一标识符;
Version:此工作流的版本。
void Build
:在此方法内构建工作流。
工作流运作过程中,可以传递数据。有两种传递方法:使用泛型,从运行工作流时就要传入;使用 object 简单类型,由单独的步骤产生并且传递给下一个节点。
IWorkflowBuilder 是工作流对象,构建一个具有逻辑规则的工作流。可以构建复杂的、具有循环、判断的工作流规则,或者并行或者异步处理工作流任务。
一个简单的工作流规则:
public class DeferSampleWorkflow : IWorkflow
{
public string Id => "DeferSampleWorkflow";
public int Version => 1;
public void Build(IWorkflowBuilder<object> builder)
{
builder
.StartWith(context =>
{
// 开始工作流任务
Console.WriteLine("Workflow started");
return ExecutionResult.Next();
})
.Then<SleepStep>()
.Input(step => step.Period, data => TimeSpan.FromSeconds(20))
.Then(context =>
{
Console.WriteLine("workflow complete");
return ExecutionResult.Next();
});
}
}
2,EndWorkflow
此对象表示当前工作流任务已经结束,可以表示主工作流或者工作流分支任务的完成。
/// Ends the workflow and marks it as complete
IStepBuilder<TData, TStepBody> EndWorkflow();
因为工作流是可以出现分支的,每个工作流各自独立工作,每个分支都有其生命周期。
3,容器
ForEach
、While
、If
、When
、Schedule
、Recur
是步骤容器。都返回IContainerStepBuilder<TData, Schedule, TStepBody>
Parallel、Saga是步骤的容器,都返回 IStepBuilder<TData, Sequence>
。
ForEach、While、If、When、Schedule、Recur 返回类型的接口:
public interface IContainerStepBuilder<TData, TStepBody, TReturnStep>
where TStepBody : IStepBody
where TReturnStep : IStepBody
{
/// The block of steps to execute
IStepBuilder<TData, TReturnStep> Do(Action<IWorkflowBuilder<TData>> builder);
Parallel、Saga :
/// Execute multiple blocks of steps in parallel
IParallelStepBuilder<TData, Sequence> Parallel();
/// Execute a sequence of steps in a container
IStepBuilder<TData, Sequence> Saga(Action<IWorkflowBuilder<TData>> builder);
也就是说,ForEach、While、If、When、Schedule、Recur 是真正的容器。
按照我的理解,继承了 IContainerStepBuilder
的,是一个容器,一个流程下的一个步骤/容器;因为 Workflow Core 作者对接口的命名很明显表达了 This a container
。
因为里面包含了一组操作,可以说是一个步骤里面包含了一个流程,这个流程由一系列操作组成,它是线性的,是顺序的。里面是一条工作流(Workflow)。
而 Parllel、Saga,相当于步骤点的容器。
更直观的理解是电路,继承 IContainerStepBuilder 的是串联设备的容器,是顺序的;
Parllel 是并联电路/设备的一个容器,它既是一个开关,使得一条电路变成多条并流的电路,又包含了这些电路的电器。里面可以产生多条工作流,是多分支的、不同步的、独立的。
从实现接口上看,ForEach、While、If、When、Schedule、Recur、Parllel 都实现了 Do()
方法,而 Saga 没有实现。
关于 Saga,后面说明。
4,工作流的步骤点
实现接口如下:
IStepBuilder<TData, TStep> StartWith<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;
IStepBuilder<TData, InlineStepBody> StartWith(Func<IStepExecutionContext, ExecutionResult> body);
IStepBuilder<TData, ActionStepBody> StartWith(Action<IStepExecutionContext> body);
IEnumerable<WorkflowStep> GetUpstreamSteps(int id);
IWorkflowBuilder<TData> UseDefaultErrorBehavior(WorkflowErrorHandling behavior, TimeSpan? retryInterval = null);
方法名称 | 说明 |
---|---|
StartWith | 任务的开始,必须调用此方法 |
GetUpstreamSteps | 获取上一个步骤(StepBody)的ID |
UseDefaultErrorBehavior | 不详 |
StepBody 是一个节点,IStepBuilder 构建一个节点,只有通过 StartWith,才能开始一个工作流、一个分支、异步任务等。
UseDefaultErrorBehavior
笔者没有使用到,不敢瞎说。貌似与事务有关,当一个步骤点发生异常时,可以终止、重试等。
二,IStepBuilder 节点
IStepBuilder 表示一个节点,或者说一个容器,里面可以含有其它操作,例如并行、异步、循环等。
1,设置属性的方法
Name:设置此步骤点的名称;
id:步骤点的唯一标识符。
/// Specifies a display name for the step
IStepBuilder<TData, TStepBody> Name(string name);
/// Specifies a custom Id to reference this step
IStepBuilder<TData, TStepBody> Id(string id);
2,设置数据
前面说到,工作流每个步骤点传递数据有两种方式。
TData(泛型) 是工作流中,随着流传的数据,这个对象会在整个工作流程生存。
例如 Mydata
class RecurSampleWorkflow : IWorkflow<MyData>
{
public string Id => "recur-sample";
public int Version => 1;
public void Build(IWorkflowBuilder<MyData> builder)
{
...
}
}
public class MyData
{
public int Counter { get; set; }
}
3,Input / Output
为当前步骤点(StepBody)设置数据,亦可为 TData 设置数据。
两类数据:每个步骤点都可以拥有很多字段、属性和方法等;工作流流转 TData。
Input、Output 是设置这些数据的具体方法。
IStepBuilder<TData, TStepBody> Input<TInput>(Expression<Func<TStepBody, TInput>> stepProperty, Expression<Func<TData, TInput>> value);
IStepBuilder<TData, TStepBody> Input<TInput>(Expression<Func<TStepBody, TInput>> stepProperty, Expression<Func<TData, IStepExecutionContext, TInput>> value);
IStepBuilder<TData, TStepBody> Input(Action<TStepBody, TData> action);
IStepBuilder<TData, TStepBody> Output<TOutput>(Expression<Func<TData, TOutput>> dataProperty, Expression<Func<TStepBody, object>> value);
三,工作流节点的逻辑和操作
容器操作
1,Saga
用于在容器中执行一系列操作。
/// Execute a sequence of steps in a container
IStepBuilder<TData, Sequence> Saga(Action<IWorkflowBuilder<TData>> builder);
虽然注释说明 “用于在容器中执行一系列操作”,但实际上它不是一个真正的”容器“。
因为它没有继承 IContainerStepBuilder
,也没有实现 Do()
。
但是它返回的 Sequence
实现了ContainerStepBody
。
如果说真正的容器相当于一条长河流中的一个湖泊(可以容纳和储水),而 Saga 可能只是某一段河流的命名,而不是具体的湖泊。
或者说 static void Main(string[] args)
里面的代码太多了,新建一个方法体,把部分代码放进去。总不能把所有代码写在一个方法里吧?那么创建一个类,把代码分成多个部分,放到不同方法中,增强可读性。本质还是没有变。
Saga 可以用来处理事务,进行重试或回滚等操作。后面说明。
普通节点
1,Then
用于创建下一个节点,创建一个普通节点。可以是主工作流的节点(最外层)、或者作为循环、条件节点里的节点、作为节点中节点的节点。
IStepBuilder<TData, TStep> Then<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;
IStepBuilder<TData, TStep> Then<TStep>(IStepBuilder<TData, TStep> newStep) where TStep : IStepBody;
IStepBuilder<TData, InlineStepBody> Then(Func<IStepExecutionContext, ExecutionResult> body);
IStepBuilder<TData, ActionStepBody> Then(Action<IStepExecutionContext> body);
2,Attach
Then 作为普通节点,按顺序执行。操作对象是类型、StepBody。
Attach 也是普通节点,无特殊意义,通过 id 来指定要执行 StepBody 。可以作为流程控制的跳转。
相当于 goto 语句。
/// Specify the next step in the workflow by Id
IStepBuilder<TData, TStepBody> Attach(string id);
事件
1,WaitFor
用于定义事件,将当前节点作为事件节点,然后在后台挂起,工作流会接着执行下一个节点。在工作流停止前,可以通过指定 标识符(Id) 触发事件。在一个工作流中,每个事件的标识符都是唯一的。
IStepBuilder<TData, WaitFor> WaitFor(string eventName, Expression<Func<TData, string>> eventKey, Expression<Func<TData, DateTime>> effectiveDate = null, Expression<Func<TData, bool>> cancelCondition = null);
IStepBuilder<TData, WaitFor> WaitFor(string eventName, Expression<Func<TData, IStepExecutionContext, string>> eventKey, Expression<Func<TData, DateTime>> effectiveDate = null, Expression<Func<TData, bool>> cancelCondition = null);
条件体和循环体
1,End
意思应该是结束一个节点的运行。
如果在 When 中使用,相当于 break。
IStepBuilder<TData, TStep> End<TStep>(string name) where TStep : IStepBody;
使用例子
builder
.StartWith<RandomOutput>(x => x.Name("Random Step"))
.When(0)
.Then<TaskA>()
.Then<TaskB>()
.End<RandomOutput>("Random Step")
.When(1)
.Then<TaskC>()
.Then<TaskD>()
.End<RandomOutput>("Random Step");
2,CancelCondition
在一个条件下过早地取消此步骤的执行。
应该相当于 contiune。
/// Prematurely cancel the execution of this step on a condition
IStepBuilder<TData, TStepBody> CancelCondition(Expression<Func<TData, bool>> cancelCondition, bool proceedAfterCancel = false);
节点的异步或多线程
1,Delay
延迟执行,使得当前节点延时执行。并非是阻塞当前的工作流运行。Delay 跟在节点后面,使得这个节点延时运行。可以理解成异步,工作流不会等待此节点执行完毕,会直接执行下一个节点/步骤。
/// Wait for a specified period
IStepBuilder<TData, Delay> Delay(Expression<Func<TData, TimeSpan>> period);
2,Schedule
预定执行。将当前节点设置一个时间,将在一段时间后执行。Schedule 不会阻塞工作流。
Schedule 是非阻塞的,工作流不会等待Schedule执行完毕,会直接执行下一个节点/步骤。
/// Schedule a block of steps to execute in parallel sometime in the future
IContainerStepBuilder<TData, Schedule, TStepBody> Schedule(Expression<Func<TData, TimeSpan>> time);
例子
builder
.StartWith(context => Console.WriteLine("Hello"))
.Schedule(data => TimeSpan.FromSeconds(5)).Do(schedule => schedule
.StartWith(context => Console.WriteLine("Doing scheduled tasks"))
)
.Then(context => Console.WriteLine("Doing normal tasks"));
3,Recur
用于重复执行某个节点,直至条件不符。
Recur 是非阻塞的,工作流不会等待 Rezur 执行完毕,会直接执行下一个节点/步骤。
/// Schedule a block of steps to execute in parallel sometime in the future at a recurring interval
IContainerStepBuilder<TData, Recur, TStepBody> Recur(Expression<Func<TData, TimeSpan>> interval, Expression<Func<TData, bool>> until);
用于事务的操作
相当于数据库中的事务,流程中某些步骤发生异常时的时候执行某些操作。
例如:
builder
.StartWith(context => Console.WriteLine("Begin"))
.Saga(saga => saga
.StartWith<Task1>()
.CompensateWith<UndoTask1>()
.Then<Task2>()
.CompensateWith<UndoTask2>()
.Then<Task3>()
.CompensateWith<UndoTask3>()
)
.OnError(Models.WorkflowErrorHandling.Retry, TimeSpan.FromSeconds(5))
.Then(context => Console.WriteLine("End"));
1,CompensateWith
如果此步骤引发未处理的异常,则撤消步骤;如果发生异常,则执行。
可以作为节点的 B计划。当节点执行任务没有问题时, CompensateWith 不会运行;如果节点发生错误,就会按一定要求执行 CompensateWith 。
/// Undo step if unhandled exception is thrown by this step
IStepBuilder<TData, TStepBody> CompensateWith<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;
IStepBuilder<TData, TStepBody> CompensateWith(Func<IStepExecutionContext, ExecutionResult> body);
IStepBuilder<TData, TStepBody> CompensateWith(Action<IStepExecutionContext> body);
2,CompensateWithSequence
如果此步骤引发未处理的异常,则撤消步骤;如果发生异常,则执行。与 CompensateWith 的区别是,传入参数前者是 Func,后者是 Action。
CompensateWith
的内部实现了 CompensateWith
,是对 CompensateWith
的封装。
/// Undo step if unhandled exception is thrown by this step
IStepBuilder<TData, TStepBody> CompensateWithSequence(Action<IWorkflowBuilder<TData>> builder);
3,OnError
用于事务操作,表示发生错误时如果回滚、设置时间等。一般与 Saga 一起使用。
OnError 是阻塞的。
/// Configure the behavior when this step throws an unhandled exception
IStepBuilder<TData, TStepBody> OnError(WorkflowErrorHandling behavior, TimeSpan? retryInterval = null);
OnError 可以捕获一个容器内,某个节点的异常,并执行回滚操作。如果直接在节点上使用而不是容器,可以发生回滚,然后执行下个节点。如果作用于容器,那么可以让容器进行重新运行,等一系列操作。
OnError 可以与 When、While 等节点容器一起使用,但他们本身带有循环功能,使用事务会让代码逻辑变得奇怪。
Saga 没有条件判断、没有循环,本身就是一个简单的袋子,是节点的容器。因此使用 Saga 作为事务操作的容器,十分适合,进行回滚、重试等一系列操作。
四,条件或开关
迭代
1,ForEach
迭代,也可以说是循环。内部使用 IEnumerable 来实现。
与 C# 中 Foreach 的区别是,C# 中是用来迭代数据;
而工作流中 ForEach 用来判断元素个数,标识应该循环多少次。
ForEach 是阻塞的。
/// Execute a block of steps, once for each item in a collection in a parallel foreach
IContainerStepBuilder<TData, Foreach, Foreach> ForEach(Expression<Func<TData, IEnumerable>> collection);
示例
builder
.StartWith<SayHello>()
.ForEach(data => new List<int>() { 1, 2, 3, 4 })
.Do(x => x
.StartWith<DisplayContext>()
.Input(step => step.Item, (data, context) => context.Item)
.Then<DoSomething>())
.Then<SayGoodbye>();
最终会循环5次。
条件判断
1,When
条件判断,条件是否真。
When 是阻塞的。
When 可以捕获上一个节点流转的数据(非 TData)。
/// Configure an outcome for this step, then wire it to another step
[Obsolete]
IStepOutcomeBuilder<TData> When(object outcomeValue, string label = null);
/// Configure an outcome for this step, then wire it to a sequence
IContainerStepBuilder<TData, When, OutcomeSwitch> When(Expression<Func<TData, object>> outcomeValue, string label = null);
前一个方法例如
When(0)
,会捕获 return ExecutionResult.Outcome(value);
的值,判断是否相等。但是这种方式已经过时。
需要使用表达式来判断。例如
.When(data => 1)
.When(data => data.value==1)
2,While
条件判断,条件是否真。与When有区别,When可以捕获 ExecutionResult.Outcome(value);
。
While 是阻塞的。
/// Repeat a block of steps until a condition becomes true
IContainerStepBuilder<TData, While, While> While(Expression<Func<TData, bool>> condition);
示例
builder
.StartWith<SayHello>()
.While(data => data.Counter < 3)
.Do(x => x
.StartWith<DoSomething>()
.Then<IncrementStep>()
.Input(step => step.Value1, data => data.Counter)
.Output(data => data.Counter, step => step.Value2))
.Then<SayGoodbye>();
3,If
条件判断,是否符合条件。
If是阻塞的。
/// Execute a block of steps if a condition is true
IContainerStepBuilder<TData, If, If> If(Expression<Func<TData, bool>> condition);
When、While、If的区别是,When、While 是条件是否为真,If是表达式是否为真。
实质上,是语言上的区别,与代码逻辑无关。
真假用 When/While,条件判断、表达式判断用 If 。
节点并发
1,Parallel
并行任务。作为容器,可以在里面设置多组任务,这些任务将会同时、并发运行。
Parallel 是阻塞的。
/// Execute multiple blocks of steps in parallel
IParallelStepBuilder<TData, Sequence> Parallel();
示例:
.StartWith<SayHello>()
.Parallel()
.Do(then =>
then.StartWith<PrintMessage>()
.Input(step => step.Message, data => "Item 1.1")
.Then<PrintMessage>()
.Input(step => step.Message, data => "Item 1.2"))
.Do(then =>
then.StartWith<PrintMessage>()
.Input(step => step.Message, data => "Item 2.1")
.Then<PrintMessage>()
.Input(step => step.Message, data => "Item 2.2")
.Then<PrintMessage>()
.Input(step => step.Message, data => "Item 2.3"))
.Do(then =>
then.StartWith<PrintMessage>()
.Input(step => step.Message, data => "Item 3.1")
.Then<PrintMessage>()
.Input(step => step.Message, data => "Item 3.2"))
.Join()
.Then<SayGoodbye>();
有三个 Do,代表三个并行任务。三个 Do 是并行的,Do 内的代码,会按顺序执行。
Paeallel 的 Do:
public interface IParallelStepBuilder<TData, TStepBody>
where TStepBody : IStepBody
{
IParallelStepBuilder<TData, TStepBody> Do(Action<IWorkflowBuilder<TData>> builder);
IStepBuilder<TData, Sequence> Join();
}
比起 ForEach、When、While、If,除了有 Do,还有 Join 方法。
对于其它节点类型来说,Do直接构建节点。
对于Parallel来说,Do收集任务,最终需要Join来构建节点和运行任务。
五,其它
写得长不好看,其它内容压缩一下。
数据传递和依赖注入
Workflow Core 支持对每个步骤点进行依赖注入。
支持数据持久化
Workflow Core 支持将构建的工作流存储到数据库中,以便以后再次调用。
支持 Sql Server、Mysql、SQLite、PostgreSQL、Redis、MongoDB、AWS、Azure、
Elasticsearch、RabbitMQ... ....
支持动态调用和动态生成工作流
你可以通过 C# 代码构建工作流,或者通过 Json、Yaml 动态构建工作流。
可以利用可视化设计器,将逻辑和任务生成配置文件,然后动态传递,使用 Workflow Core 动态创建工作流。
篇幅有限,不再赘述。
有兴趣请关注 Workflow Core:https://github.com/danielgerlag/workflow-core