vlambda博客
学习文章列表

最新 Flink 1.13 的 Flink CEP 复杂事件处理快速入手、详细教程

Flink CEP

Flink CEP一、概述二、Pattern API1. 个体模式2. 组合模式3. 模式组4. 匹配后跳过策略三、模式的检测处理1. 将模式应用到流上2. 处理匹配事件3. 处理超时事件4. 处理迟到数据5. CEP 的状态机实现

一、概述

CEP,其实就是“复杂事件处理(Complex Event Processing)”的缩写;而 Flink CEP,就是 Flink 实现的一个用于复杂事件处理的库(library)。把事件流中的一个个简单事件,通过一定的规则匹配组合起来,这就是“复杂事件”;然后基于这些满足规则的一组组复杂事件进行转换处理,得到想要的结果进行输出。

CEP 做的事其实就是在流上进行模式匹配。主要用在风控管理【人工进一步判定用户是否有违规操作】、用户画像【行为轨迹画像特定行为习惯】和运维监控【灵活配置多指标、多依赖来实现更复杂的监控】。

FlinkCEP的步骤:定义一个模式(Pattern);将Pattern应用到DataStream上,检测满足规则的复杂事件,得到一个PatternStream;对 PatternStream 进行转换处理,将检测到的复杂事件提取出来,包装成报警信息输出。

二、Pattern API

1. 个体模式

模式(Pattern)其实就是将一组简单事件组合成复杂事件的“匹配规则”。把每个简单事件的匹配规则,叫作“个体模式”(Individual Pattern)。

Pattern.<LoginEvent>begin("first")    //连接词开始定义 
.where(new SimpleCondition<LoginEvent>() { //需要一个“过滤条件”
   @Override        

   public boolean filter(LoginEvent loginEvent) throws Exception {                    

            return loginEvent.eventType.equals("fail");        

  }
}) //默认情况下,其是单例模式,匹配接收一个事件;定义了量词,就成了循环模式,匹配接收多个事件。

   
//--------------------------------------------------------------      
//量词示例
// 匹配事件出现4次,或者不出现
pattern.times(4).optional();

// 匹配事件出现2, 3 或者4次,或者不出现,并且尽可能多地匹配[贪心]
pattern.times(2, 4).optional().greedy();

// 匹配事件出现1次或多次,或者不出现;并且尽可能多地匹配
pattern.oneOrMore().optional().greedy();

// 匹配事件出现2次或多次,或者不出现;并且尽可能多地匹配
pattern.timesOrMore(2).optional().greedy();


//--------------------------------------------------------------
//条件示例
// 限定子类型 当事件是 SubEvent 类型时,才可以满足当前模式 pattern 的匹配条件
pattern.subtype(SubEvent.class);

// 简单条件(Simple Conditions)
pattern.where(new SimpleCondition<Event>() {
  @Override    
  public boolean filter(Event value) {        
      return value.user.startsWith("A");
  }
});

//迭代条件(Iterative Conditions)
middle.oneOrMore() .where(new IterativeCondition<Event>() {
    @Override        
  public boolean filter(Event value, Context<Event> ctx) throws Exception {
      // 事件中的user必须以A开头
      if (!value.user.startsWith("A")){                
          return false;
      }
      int sum = value.amount;
      // 获取当前模式之前已经匹配的事件,求所有事件amount之和

      for (Event event : ctx.getEventsForPattern("middle")) {                 

                sum += event.amount;

        }
           // 在总数量小于100时,当前事件满足匹配规则,可以匹配成功            
           return sum < 100;
      }
  });

//组合条件(Combining Conditions):多个限定条件拆开组合
pattern.subtype(SubEvent.class) //逻辑与.where().where()、逻辑或.where().or()
.where(new SimpleCondition<SubEvent>() {
         @Override          
      public boolean filter(SubEvent value) {
               return ... // some condition          
          }
});

//终止条件(Stop Conditions):表示遇到某个特定事件时当前模式就不再继续循环匹配了。只与oneOrMore()或者oneOrMore().optional()结合使用。因为在这种循环模式下,把之前匹配的事件作为状态缓存起来继续等待;如果一直等下去,缓存的状态越来越多,最终会耗尽内存。故必须有个终点,当.until()指定的条件满足时,循环终止,清空状态释放内存。
pattern.oneOrMore().optional().until()

2. 组合模式

将多个个体模式组合起来的完整模式,就叫作“组合模式”(Combining Pattern),也叫作“模式序列”(Pattern Sequence)。是用诸如 begin、next、followedBy 等表示先后顺序的“连接词”将个体模式串连起来得到的。

  • 初始模式(Initial Pattern)所有的组合模式,都必须以一个“初始模式”开头;其必须通过调用 Pattern 的静态方法.begin()来创建。

    Pattern<Event, ?> start = Pattern.<Event>begin("start");

    Pattern 有两个泛型参数,第一个就是检测事件的基本类型 Event,跟 begin 指定的类型一致;第二个则是当前模式里事件的子类型,由子类型限制条件指定。用类型通配符(?)代替,就可以从上下文直接推断。

  • 近邻条件(Contiguity Conditions)模式之间的组合是通过连接词方法实现的,连接词指明了先后事件之间有着怎样的近邻关系,这就是所谓的“近邻条件”(也叫“连续性条件”)。Flink CEP 中提供了三种近邻关系:

    • 严格近邻(Strict Contiguity)匹配的事件严格地按顺序一个接一个出现,中间不会有任何其他事件。Pattern 的.next()方法。

    • 宽松近邻(Relaxed Contiguity)只关心事件发生的顺序,两个匹配的事件之间可以有其他不匹配的事件出现。对应 .followedBy()方法。

    • 非确定性宽松近邻(Non-Deterministic Relaxed Contiguity)可以重复使用之前已经匹配过的事件,对应.followedByAny()方法。

  • 其他限制条件一些否定的“连接词”,如.notNext()表示前一个模式匹配到的事件后面,不能紧跟着某种事件。.notFollowedBy()表示前一个模式匹配到的事件后面,不会出现某种事件,不能以其结尾,主要用来表示“两个事件中间不会出现某种事件”。可以模式指定时间限制,通过调用.within()方法。方法传入一个时间参数,表示第一个事件到最后一个事件之间的最大时间间隔,只有在这期间成功匹配的复杂事件才是有效的。一个模式序列中只能有一个时间限制,调用位置不限;如果多次调用则会以最小的时间间隔为准。

  • 循环模式中的近邻条件循环模式虽是个体模式,却可以匹配多个事件。默认内部采用的是宽松近邻,调用 .consecutive() 设置为严格近邻。调用 .allowCombinations() 指定非确定性宽松近邻。

3. 模式组

将模式序列作为参数传给连接词,得到的就是一个模式组。例如:

// 在start后定义严格近邻的模式序列,并重复匹配两次 
Pattern<Event, ?> strict = start.next(
Pattern.<Event>begin("next_start").where(...)
  .followedBy("next_middle").where(...)
).times(2);

4. 匹配后跳过策略

由于有循环模式和非确定性宽松近邻的存在,同一个事件有可能会重复利用,被分配到不同的匹配结果中。匹配后跳过策略就专门用来精准控制循环模式的匹配结果。这个策略可以在 Pattern 的初始模式定义中,作为 begin()的第二个参数传入。

Pattern.begin("start", AfterMatchSkipStrategy.noSkip()) .where(...)

检测事件a非确定性宽松近邻b,如果输入事件序列“a1 a2 a3 b”:

  • 不跳过(NO_SKIP):AfterMatchSkipStrategy.noSkip(),默认策略,所有可能的匹配都会输出。该模式会检测到 6 个匹配结果(a1 a2 a3 b),(a1 a2 b),(a1 b),(a2 a3 b),(a2 b),(a3 b)。

  • 跳至下一个(SKIP_TO_NEXT):AfterMatchSkipStrategy.skipToNext(),找到一个 a1 开始的最大匹配之后,跳过 a1 开始的所有其他匹配,直接从下一个 a2 开始匹配起。当然 a2 也是如此跳过其他匹配。该模式会检测到3个匹配结果(a1 a2 a3 b),(a2 a3 b),(a3 b)。跟使用.greedy()效果是相同的。

  • 跳过所有子匹配(SKIP_PAST_LAST_EVENT):AfterMatchSkipStrategy.skipPastLastEvent(),找到 a1 开始的匹配(a1 a2 a3 b)之后,直接跳过所有 a1 直到 a3 开头的匹配,相当于把这些子匹配都跳过了。该模式会检测到1个匹配结果(a1 a2 a3 b)。

  • 跳至第一个(SKIP_TO_FIRST[a]),AfterMatchSkipStrategy.skipToFirst(“a”),找到 a1 开始的匹配(a1 a2 a3 b)后,跳到以最开始一个 a(也就是 a1)为开始的匹配,相当于只留下 a1 开始的匹配。该模式会检测到3个匹配结果(a1 a2 a3 b),(a1 a2 b),(a1 b)。

  • 跳至最后一个(SKIP_TO_LAST[a])AfterMatchSkipStrategy.skipToLast(“a”),找到 a1 开始的匹配(a1 a2 a3 b)后,跳过所有 a1、a2 开始的匹配,跳到以最后一个 a(也就是 a3)为开始的匹配。该模式会检测到2个匹配结果(a1 a2 a3 b),(a3 b)。

三、模式的检测处理

定义好模式还只是整个复杂事件处理的第一步,接下来还需要将模式应用到事件流上、检测提取匹配的复杂事件并定义处理转换的方法,最终得到想要的输出信息。

以检测用户连续三次登录失败的复杂事件为例。

1. 将模式应用到流上

默认是先后顺序事件时间语义的时间戳先后顺序,若处理时间语义就是时间到达顺序,对于时间戳相同或是同时到达的事件,可传入一个比较器,进行更精确的排序。

DataStream<Event> inputStream = ...  //可以是 keyBy 得到的 KeyedStream
Pattern<Event, ?> pattern = ...  
PatternStream<Event> patternStream = CEP.pattern(inputStream, pattern);

EventComparator<Event> comparator = ...   //定义比较器,更精确的排序。
PatternStream<Event> patternStream = CEP.pattern(inputStream, pattern, comparator);

2. 处理匹配事件

PatternStream 的转换操作主要可以分成两种:简单便捷的选择提取(select)操作,和更加通用、更加强大的处理(process)操作。

  • 匹配事件的选择提取(select)

    从 PatternStream 中直接把匹配的复杂事件提取出来,包装成想要的信息输出,这个操作就是“选择”(select)。

    .select()传入 PatternSelectFunction 或者 .flatSelect()传入PatternFlatSelectFunction。

    // 1. 定义Pattern,登录失败事件,循环检测3次 
    Pattern<LoginEvent, LoginEvent> pattern = Pattern.<LoginEvent>begin("fails")
          .where(new SimpleCondition<LoginEvent>() {
               @Override            
               public boolean filter(LoginEvent loginEvent) throws Exception {                 return loginEvent.eventType.equals("fail");
              }
          }).times(3).consecutive();

    // 2. 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStream
    PatternStream<LoginEvent> patternStream = CEP.pattern(stream, pattern);

    // 3. .select() 将匹配到的复杂事件选择出来,然后包装成报警信息输出
    patternStream.select(new PatternSelectFunction<LoginEvent, String>() {
    @Override
    public String select(Map<String, List<LoginEvent>> map) throws Exception {
             // 只有一个模式,匹配到了3个事件,放在List中
             LoginEvent first = map.get("fails").get(0);
             LoginEvent second = map.get("fails").get(1);                
             LoginEvent third = map.get("fails").get(2);                
             return first.userId + " 连续三次登录失败!登录时间:" + first.timestamp
                 + ", " + second.timestamp + ", " + third.timestamp;
    }
    }).print("warning");

    // 3. .flatSelect() 将匹配到的复杂事件选择出来,然后包装成报警信息输出
    patternStream.flatSelect(new PatternFlatSelectFunction<LoginEvent, String>() {
    @Override
    public void flatSelect(Map<String, List<LoginEvent>> map, Collector<String> out) throws Exception {
    LoginEvent first = map.get("fails").get(0);
    LoginEvent second = map.get("fails").get(1);
           LoginEvent third = map.get("fails").get(2);
           out.collect(first.userId + " 连续三次登录失败!登录时间:" + first.timestamp +
    ", " + second.timestamp + ", " + third.timestamp);
      }
    }).print("warning");
  • 匹配事件的通用处理(process)1.8 版本后引入,调用 .process() ,传入PatternProcessFunction。事实上,PatternSelectFunction 和 PatternFlatSelectFunction在 CEP 内部执行时也会被转换成 PatternProcessFunction。

    // 3. .process() 将匹配到的复杂事件选择出来,然后包装成报警信息输出 
    patternStream.process(new PatternProcessFunction<LoginEvent, String>() {
    @Override
    public void processMatch(Map<String, List<LoginEvent>> map, Context ctx, Collector<String> out) throws Exception {
    LoginEvent first = map.get("fails").get(0);
    LoginEvent second = map.get("fails").get(1);
           LoginEvent third = map.get("fails").get(2);
           out.collect(first.userId + " 连续三次登录失败!登录时间:" + first.timestamp +
    ", " + second.timestamp + ", " + third.timestamp);
      }
    }).print("warning");

3. 处理超时事件

复杂事件的检测结果一般只有两种:要么匹配,要么不匹配。但在有时间限制下,超出 .within() 指定模式检测的时间间隔,当前这组检测就应该失败了。超时是一种“部分成功匹配”,开头能够正常匹配的前提下,没有等到后续的匹配事件才会超时,往往不应该直接丢弃,而是要输出一个提示或报警信息。

  • 使用 PatternProcessFunction 的侧输出流(官方推荐)

    class MyPatternProcessFunction extends PatternProcessFunction<Event, String> implements TimedOutPartialMatchHandler<Event> { 
    // 正常匹配事件的处理
    @Override    
    public void processMatch(Map<String, List<Event>> match, Context ctx, Collector<String> out) throws Exception{      
    ...
    }
    // 超时部分匹配事件的处理
    @Override    
    public void processTimedOutMatch(Map<String, List<Event>> match, Context ctx) throws Exception{
      Event startEvent = match.get("start").get(0);
      OutputTag<Event> outputTag = new OutputTag<Event>("time-out"){};         ctx.output(outputTag, startEvent);
    }
    }
  • 使用 PatternTimeoutFunction

    // 定义一个侧输出流标签,用于标识超时侧输出流 
    OutputTag<String> timeoutTag = new OutputTag<String>("timeout"){};

    // 将匹配到的,和超时部分匹配的复杂事件提取出来,然后包装成提示信息输出
    SingleOutputStreamOperator<String> resultStream = patternStream.select(timeoutTag,
    // 超时部分匹配事件的处理
    new PatternTimeoutFunction<Event, String>() {
    @Override
         public String timeout(Map<String, List<Event>> pattern, long timeoutTimestamp) throws Exception {
    Event event = pattern.get("start").get(0);
    return "超时:" + event.toString();
    }
    },
    // 正常匹配事件的处理
    new PatternSelectFunction<Event, String>() {
    @Override
    public String select(Map<String, List<Event>> pattern) throws Exception {
    ...
    }
    });

    // 将正常匹配和超时部分匹配的处理结果流打印输出
    resultStream.print("matched");
    resultStream.getSideOutput(timeoutTag).print("timeout");

    一个应用案例就是下单后15分钟内未支付的用户监测

4. 处理迟到数据

在 Flink CEP 中沿用了通过设置水位线(watermark)延迟来处理乱序数据的做法。当一个事件到来时,并不会立即做检测匹配处理,而是先放入一个缓冲区(buffer)。缓冲区内的数据,会按照时间戳由小到大排序;当一个水位线到来时,就会将缓冲区中所有时间戳小于水位线的事件依次取出,进行检测匹配。

水位线延迟时间不可能保证将所有乱序数据完美包括进来,总会有一些事件延迟比较大的“迟到数据”就只能被直接丢弃了,与窗口对迟到数据的默认处理一致。

如果不希望迟到数据丢掉,应该也可以借鉴窗口的做法。将迟到事件输出到侧输出流。

PatternStream<Event> patternStream = CEP.pattern(input, pattern); 

// 定义一个侧输出流的标签
OutputTag<String> lateDataOutputTag = new OutputTag<String>("late-data"){};  
SingleOutputStreamOperator<ComplexEvent> result = patternStream
  .sideOutputLateData(lateDataOutputTag)    // 将迟到数据输出到侧输出流
  .select(     // 处理正常匹配数据
       new PatternSelectFunction<Event, ComplexEvent>() {...}    
);

// 从结果中提取侧输出流
DataStream<String> lateData = result.getSideOutput(lateDataOutputTag);

5. CEP 的状态机实现

CEP 中模式的定义方式比较复杂,而且与正则表达式非常相似:正则表达式在字符串上匹配符合模板的字符序列,而Flink CEP 则是在事件流上匹配符合模式定义的复杂事件。

检测匹配事件的过程中会有初始(没有任何匹配)、检测中(部分匹配成功)、匹配成功、匹配失败等不同的“状态”。随着每个事件的到来,都会改变当前检测的“状态”;而这种改变跟当前事件的特性有关、也跟当前所处的状态有关。其实就是一个“状态机”(state machine),也正是正则表达式底层引擎的实现原理。

Flink CEP 的底层工作原理其实与正则表达式是一致的,是一个“非确定有限状态自动机”(Nondeterministic Finite Automaton,NFA)。

检测用户连续三次登录失败的复杂事件。用 Flink CEP 中的 Pattern API 可以很方便地把它定义出来;如果我们现在不用 CEP,而是用 DataStream API 和处理函数来实现,应该怎么做呢?当然因为这个需求不是很复杂,我们也可以用嵌套的 if-else 条件判断将它实现,不过这样做的代码可读性和扩展性都会很差。更好的方式,就是实现一个状态机。

从初始状态(INITIAL)出发,遇到一个类型为 fail 的登录失败事件,就开始进入部分匹配的状态;目前只有一个 fail 事件,我们把当前状态记作 S1。基于 S1 状态,如果继续遇到 fail 事件,那么就有两个 fail 事件,记作 S2。基于 S2 状态如果再次遇到 fail 事件,那么就找到了一组匹配的复杂事件,把当前状态记作 Matched,就可以输出报警信息了。需要注意的是,报警完毕,需要立即重置状态回 S2;因为如果接下来再遇到 fail 事件,就又满足了新的连续三次登录失败,需要再次报警。

而不论是初始状态,还是 S1、S2 状态,只要遇到类型为 success 的登录成功事件,就会跳转到结束状态,记作 Terminal。此时当前检测完毕,之前的部分匹配应该全部清空,所以需要立即重置状态到 Initial,重新开始下一轮检测。所以这里我们真正参与状态转移的,其实只有 Initial、S1、S2 三个状态,Matched 和 Terminal 是为了方便我们做其他操作(比如输出报警、清空状态)的“临时标记状态”,不等新事件到来马上就会跳转。

// 将数据依次输入状态机进行处理 
DataStream<String> alertStream = stream.flatMap(new StateMachineMapper());


@SuppressWarnings("serial")
public static class StateMachineMapper extends RichFlatMapFunction<LoginEvent, String> {
// 声明当前用户对应的状态
private ValueState<State> currentState;

@Override
   public void open(Configuration conf) {
currentState = getRuntimeContext().getState( // 获取状态对象
           new ValueStateDescriptor<>("state", State.class));
  }

@Override
public void flatMap(LoginEvent event, Collector<String> out) throws Exception {
// 获取状态,如果状态为空,置为初始状态
State state = currentState.value();
       if (state == null) {
           state = State.Initial;
}

// 基于当前状态,输入当前事件时跳转到下一状态
       State nextState = state.transition(event.eventType);
       if (nextState == State.Matched) {
           // 如果检测到匹配的复杂事件,输出报警信息
           out.collect(event.userId + " 连续三次登录失败");
           // 需要跳转回S2状态,这里直接不更新状态就可以了
      } else if (nextState == State.Terminal) {
           // 如果到了终止状态,就重置状态,准备重新开始
           currentState.update(State.Initial);
      } else {
           // 如果还没结束,更新状态(状态跳转),继续读取事件
           currentState.update(nextState);
      }
  }
}


// 状态机实现
public enum State {
Terminal,    // 匹配失败,当前匹配终止
Matched,    // 匹配成功

// S2状态
S2(new Transition("fail", Matched), new Transition("success", Terminal)),

// S1状态
   S1(new Transition("fail", S2), new Transition("success", Terminal)),

// 初始状态
   Initial(new Transition("fail", S1), new Transition("success", Terminal));
   
   private final Transition[] transitions;    // 状态转移规则

// 状态的构造方法,可以传入一组状态转移规则来定义状态
State(Transition... transitions) {
       this.transitions = transitions;
  }
// 状态的转移方法,根据当前输入事件类型,从定义好的转移规则中找到下一个状态
public State transition(String eventType) {
       for (Transition t : transitions) {
           if (t.getEventType().equals(eventType)) {
               return t.getTargetState();
}
}
// 如果没有找到转移规则,说明已经结束,回到初始状态
       return Initial;
  }
}


// 定义状态转移类,包括两个属性:当前事件类型和目标状态
public static class Transition implements Serializable {
   private static final long serialVersionUID = 1L;
// 触发状态转移的当前事件类型
private final String eventType;
// 转移的目标状态
private final State targetState;
public Transition(String eventType, State targetState) {
       this.eventType = checkNotNull(eventType);
       this.targetState = checkNotNull(targetState);
  }
   public String getEventType() {
       return eventType;
}
public State getTargetState() {
       return targetState;
}