vlambda博客
学习文章列表

flink sql使用中的一个问题


最近有人问了浪尖一个flink共享datastream或者临时表会否重复计算的问题。

对于 flink 的datastream ,比如上图,source 经过datastream计算之后的结果想共享给compute1和compute2计算,这样可以避免之前的逻辑重复计算,而且数据也只需拉去一次。


而对于flink的sql呢?假如compute1和compute2之前是经过复杂计算的临时表,直接给下游sql计算使用会出现什么问题呢?


先告诉大家答案 ,临时表注册完了之后,实际上并没有完成物化功能,这时候后续有多个sqlupdate操作依赖这个临时表的话,会导致临时表多次计算的。


这个其实也不难理解,因为每次sqlupdate都是完成sql 语法树的解析,实际上也是类似于spark的血缘关系,但是flink sql不能像spark rdd血缘关系那样使用cache或者Checkpoint来避免重复计算,因为它并不能支持公共节点识别和公共节点数据的多次分发。


sql代码如下,供大家测试参考

package org.table.kafka;
import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.TableEnvironment;import org.apache.flink.table.api.java.StreamTableEnvironment;import org.apache.flink.table.descriptors.Json;import org.apache.flink.table.descriptors.Kafka;import org.apache.flink.table.descriptors.Rowtime;import org.apache.flink.table.descriptors.Schema;
public class kafka2kafka { public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); tEnv.connect( new Kafka() .version("0.10") // "0.8", "0.9", "0.10", "0.11", and "universal" .topic("jsontest") .property("bootstrap.servers", "localhost:9093") .property("group.id","test") .startFromLatest() ) .withFormat( new Json() .failOnMissingField(false) .deriveSchema() ) .withSchema(
new Schema() .field("rowtime",Types.SQL_TIMESTAMP) .rowtime(new Rowtime() .timestampsFromField("eventtime") .watermarksPeriodicBounded(2000) ) .field("fruit", Types.STRING) .field("number", Types.INT) ) .inAppendMode() .registerTableSource("source");
tEnv.connect( new Kafka() .version("0.10") // "0.8", "0.9", "0.10", "0.11", and "universal" .topic("test") .property("acks", "all") .property("retries", "0") .property("batch.size", "16384") .property("linger.ms", "10") .property("bootstrap.servers", "localhost:9093") .sinkPartitionerFixed() ).inAppendMode() .withFormat( new Json().deriveSchema() ) .withSchema( new Schema() .field("fruit", Types.STRING) .field("total", Types.INT) .field("time", Types.SQL_TIMESTAMP) ) .registerTableSink("sink");
tEnv.connect( new Kafka() .version("0.10") // "0.8", "0.9", "0.10", "0.11", and "universal" .topic("test") .property("acks", "all") .property("retries", "0") .property("batch.size", "16384") .property("linger.ms", "10") .property("bootstrap.servers", "localhost:9093") .sinkPartitionerFixed() ).inAppendMode() .withFormat( new Json().deriveSchema() ) .withSchema( new Schema() .field("fruit", Types.STRING) .field("total", Types.INT) .field("time", Types.SQL_TIMESTAMP) ) .registerTableSink("sink1");
Table table = tEnv.sqlQuery("select * from source"); tEnv.registerTable("view",table);

tEnv.sqlUpdate("insert into sink select fruit,sum(number),TUMBLE_END(rowtime, INTERVAL '5' SECOND) from view group by fruit,TUMBLE(rowtime, INTERVAL '5' SECOND)"); tEnv.sqlUpdate("insert into sink1 select fruit,sum(number),TUMBLE_END(rowtime, INTERVAL '5' SECOND) from view group by fruit,TUMBLE(rowtime, INTERVAL '5' SECOND)");
System.out.println(env.getExecutionPlan());// env.execute(); }}


可视化页面链接:

https://flink.apache.org/visualizer/


使用的过程中避免重要的账号密码被泄露。