vlambda博客
学习文章列表

Flink双流join的3种方式及IntervalJoin源码分析

概述

在数据库中的静态表上做 OLAP 分析时,两表 join 是非常常见的操作。同理,在流式处理作业中,有时也需要在两条流上做 join 以获得更丰富的信息。Flink DataStream API 为用户提供了3个算子来实现双流 join,分别是:

  • join()

  • coGroup()

  • intervalJoin()

join()

join() 算子提供的语义为"Window join",即按照指定字段和(滚动/滑动/会话)窗口进行 inner join,支持处理时间和事件时间两种时间特征。

 paymentInfo_ds.join(orderInfo__ds) .where(_.order_id) .equalTo(_.order_id) .window(TumblingEventTimeWindows.of(Time.seconds(20)))   .apply(new JoinFunction[PaymentInfo,OrderInfo,PaymentWide]{ override def join(first: PaymentInfo, second: OrderInfo): PaymentWide = { //处理逻辑 new PaymentWide(first, second) } })

coGroup()

只有 inner join 肯定还不够,如何实现 left/right outer join 呢?答案就是利用 coGroup() 算子。

它的调用方式类似于 join() 算子,也需要开窗,但是 CoGroupFunction 比 JoinFunction 更加灵活,可以按照用户指定的逻辑匹配左流和/或右流的数据并输出。

 paymentInfo_wm_ds.coGroup(orderInfo_ds) .where(_.order_id) .equalTo(_.order_id) .window(TumblingEventTimeWindows.of(Time.seconds(20))) .apply(new CoGroupFunction[PaymentInfo,OrderInfo,PaymentWide](){ override def coGroup(first: lang.Iterable[PaymentInfo], second: lang.Iterable[OrderInfo], out: Collector[PaymentWide]): Unit = { val f = first.iterator() while (f.hasNext){ //处理左流数据  //处理右流数据 } } })

intervalJoin()

join() 和 coGroup() 都是基于窗口做关联的。但是在某些情况下,两条流的数据步调未必一致。例如,订单流的数据有可能在点击流的购买动作发生之后很久才被写入,如果用窗口来圈定,很容易 join 不上。

所以 Flink 又提供了"Interval join"的语义,按照指定字段以及右流相对左流偏移的时间区间进行关联,即:right.timestamp ∈ [left.timestamp + lowerBound; left.timestamp + upperBound]

interval join 也是 inner join,虽然不需要开窗,但是需要用户指定偏移区间的上下界,并且只支持事件时间。注意在运行之前,需要分别在两个流上应用 assignTimestampsAndWatermarks() 方法获取事件时间戳和水印。

paymentInfo_ds.keyBy(_.order_id) .intervalJoin(orderWide_wm_ds.keyBy(_.order_id)) .between(Time.minutes(-15), Time.minutes(0)) .process(new ProcessJoinFunction[PaymentInfo, OrderInfo, PaymentWide]() { override def processElement(in1: PaymentInfo, in2: OrderInfo, context: ProcessJoinFunction[PaymentInfo, OrderInfo, PaymentWide]#Context, collector: Collector[PaymentWide]): Unit = { collector.collect(new PaymentWide(in1, in2)) } })

由上可见,interval join 与 window join 不同,是两个 KeyedStream 之上的操作,并且需要调用 between() 方法指定偏移区间的上下界。如果想令上下界是开区间,可以调用 upperBoundExclusive()/lowerBoundExclusive() 方法。

interval join 的实现原理及源码分析

org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator

 /** * Completes the join operation with the given user function that is executed for each joined pair * of elements. This methods allows for passing explicit type information for the output type. * * @param processJoinFunction The user-defined process join function. * @param outputType The type information for the output type. * @param <OUT> The output type. * @return The transformed {@link DataStream}. */ @PublicEvolving public <OUT> SingleOutputStreamOperator<OUT> process( ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction, TypeInformation<OUT> outputType) { Preconditions.checkNotNull(processJoinFunction); Preconditions.checkNotNull(outputType);  final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction);  final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator = new IntervalJoinOperator<>( lowerBound, upperBound, lowerBoundInclusive, upperBoundInclusive, left.getType().createSerializer(left.getExecutionConfig()), right.getType().createSerializer(right.getExecutionConfig()), cleanedUdf );  return left .connect(right) .keyBy(keySelector1, keySelector2) .transform("Interval Join", outputType, operator); }

可见是先对两条流执行 connect()keyBy() 操作,然后利用 IntervalJoinOperator 算子进行转换。在 IntervalJoinOperator 中,会利用两个 MapState 分别缓存左流和右流的数据

org.apache.flink.streaming.api.operators.co.IntervalJoinOperator#initializeState 

private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer; private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;   @Override public void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context);  this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>( LEFT_BUFFER, LongSerializer.INSTANCE, new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer)) ));  this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>( RIGHT_BUFFER, LongSerializer.INSTANCE, new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer)) )); }
  • Long 表示事件时间戳

  • List表示该时刻到来的数据记录

当左流和右流有数据到达时,会分别调用 processElement1() 和 processElement2() 方法,它们都调用了 processElement() 方法,代码如下。

/** * Process a {@link StreamRecord} from the left stream. Whenever an {@link StreamRecord} * arrives at the left stream, it will get added to the left buffer. Possible join candidates * for that element will be looked up from the right buffer and if the pair lies within the * user defined boundaries, it gets passed to the {@link ProcessJoinFunction}. * * @param record An incoming record to be joined * @throws Exception Can throw an Exception during state access */ @Override public void processElement1(StreamRecord<T1> record) throws Exception { processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true); }  /** * Process a {@link StreamRecord} from the right stream. Whenever a {@link StreamRecord} * arrives at the right stream, it will get added to the right buffer. Possible join candidates * for that element will be looked up from the left buffer and if the pair lies within the user * defined boundaries, it gets passed to the {@link ProcessJoinFunction}. * * @param record An incoming record to be joined * @throws Exception Can throw an exception during state access */ @Override public void processElement2(StreamRecord<T2> record) throws Exception { processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false); }  @SuppressWarnings("unchecked") private <THIS, OTHER> void processElement( final StreamRecord<THIS> record, final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer, final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer, final long relativeLowerBound, final long relativeUpperBound, final boolean isLeft) throws Exception {  final THIS ourValue = record.getValue(); final long ourTimestamp = record.getTimestamp();  if (ourTimestamp == Long.MIN_VALUE) { throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " + "interval stream joins need to have timestamps meaningful timestamps."); }  if (isLate(ourTimestamp)) { return; }  addToBuffer(ourBuffer, ourValue, ourTimestamp);  for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) { final long timestamp = bucket.getKey();  if (timestamp < ourTimestamp + relativeLowerBound || timestamp > ourTimestamp + relativeUpperBound) { continue; }  for (BufferEntry<OTHER> entry: bucket.getValue()) { if (isLeft) { collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp); } else { collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp); } } }  long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp; if (isLeft) { internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime); } else { internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime); } }
  • 取得当前流 StreamRecord 的时间戳,调用 isLate() 方法判断它是否是迟到数据(即时间戳小于当前水印值),如是则丢弃。

  • 调用 addToBuffer() 方法,将时间戳和数据一起插入当前流对应的 MapState。

  • 遍历另外一个流的 MapState,如果数据满足前述的时间区间条件,则调用 collect() 方法将该条数据投递给用户定义的 ProcessJoinFunction 进行处理。collect() 方法的代码如下,注意结果对应的时间戳是左右流时间戳里较大的那个。

private boolean isLate(long timestamp) { long currentWatermark = internalTimerService.currentWatermark(); return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark; }  private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception { final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);  collector.setAbsoluteTimestamp(resultTimestamp); context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);  userFunction.processElement(left, right, context, collector); }  private static <T> void addToBuffer( final MapState<Long, List<IntervalJoinOperator.BufferEntry<T>>> buffer, final T value, final long timestamp) throws Exception { List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp); if (elemsInBucket == null) { elemsInBucket = new ArrayList<>(); } elemsInBucket.add(new BufferEntry<>(value, false)); buffer.put(timestamp, elemsInBucket); }

调用 TimerService.registerEventTimeTimer() 注册时间戳为 timestamp + relativeUpperBound 的定时器,该定时器负责在水印超过区间的上界时执行状态的清理逻辑,防止数据堆积。注意左右流的定时器所属的 namespace 是不同的,具体逻辑则位于 onEventTime() 方法中。

@Override public void onEventTime(InternalTimer<K, String> timer) throws Exception {  long timerTimestamp = timer.getTimestamp(); String namespace = timer.getNamespace();  logger.trace("onEventTime @ {}", timerTimestamp);  switch (namespace) { case CLEANUP_NAMESPACE_LEFT: { long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound; logger.trace("Removing from left buffer @ {}", timestamp); leftBuffer.remove(timestamp); break; } case CLEANUP_NAMESPACE_RIGHT: { long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp; logger.trace("Removing from right buffer @ {}", timestamp); rightBuffer.remove(timestamp); break; } default: throw new RuntimeException("Invalid namespace " + namespace); } }