A pitfall when using Flink SQL
Recently a reader asked Langjian (浪尖) whether sharing a Flink DataStream, or a temporary table, leads to repeated computation.
With Flink's DataStream API, as in the figure above, the result that the source produces after some DataStream computation can be shared by compute1 and compute2. This avoids recomputing the upstream logic, and the data only needs to be pulled from the source once.
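For reference, here is a minimal sketch of that sharing in the DataStream API. The socket source and the toy computations are placeholders of my own, not from the original example; the point is only that both branches hang off the same physical node:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SharedStreamSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The "expensive" upstream logic runs once; its physical node is shared.
        DataStream<String> shared = env
                .socketTextStream("localhost", 9999)   // hypothetical source
                .filter(s -> !s.isEmpty())             // stands in for the costly computation
                .map(String::toUpperCase);

        // compute1 and compute2 both consume the same physical stream:
        // the source is read once and records are fanned out to both branches.
        shared.map(s -> "compute1: " + s).print();
        shared.map(s -> "compute2: " + s).print();

        env.execute("shared-datastream-sketch");
    }
}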
What about Flink SQL? Suppose compute1 and compute2 both read from a temporary table that was produced by an expensive computation. What happens when downstream SQL uses that table directly?
The answer up front: registering a temporary table does not materialize it. If several subsequent sqlUpdate operations depend on that temporary table, the temporary table is computed once per operation.
This is not hard to understand. Each sqlUpdate parses and translates its own SQL syntax tree, which is conceptually similar to Spark's lineage. But unlike a Spark RDD lineage, Flink SQL cannot use cache or checkpoint to avoid recomputation, because it does not identify common sub-plans or fan the output of a common node out to multiple consumers.
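One way around this, sketched below using the same old Table API as the full listing that follows (the toAppendStream / registerDataStream calls and the ".rowtime" field suffix are the pre-1.9 API as I recall it; verify against your Flink version), is to translate the temporary table into a DataStream once and register that stream back as a table. The translation pins the query to a single physical subgraph that both downstream statements then consume:

import org.apache.flink.types.Row;
import org.apache.flink.streaming.api.datastream.DataStream;

// In place of tEnv.registerTable("view", table) in the listing below:
Table table = tEnv.sqlQuery("select * from source");

// Translate once: the query becomes one physical subgraph.
DataStream<Row> shared = tEnv.toAppendStream(table, Row.class);

// Re-register the stream as "view"; the ".rowtime" suffix re-declares
// the event-time attribute (field order must match the table schema).
tEnv.registerDataStream("view", shared, "rowtime.rowtime, fruit, number");

// The two sqlUpdate statements on "view" now share one upstream pipeline.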
The code is given below for your own testing.
package org.table.kafka;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Schema;

public class kafka2kafka {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // Source: JSON records from the "jsontest" topic, with an event-time
        // attribute derived from the "eventtime" field.
        tEnv.connect(
                new Kafka()
                        .version("0.10") // "0.8", "0.9", "0.10", "0.11", and "universal"
                        .topic("jsontest")
                        .property("bootstrap.servers", "localhost:9093")
                        .property("group.id", "test")
                        .startFromLatest())
                .withFormat(new Json()
                        .failOnMissingField(false)
                        .deriveSchema())
                .withSchema(new Schema()
                        .field("rowtime", Types.SQL_TIMESTAMP)
                        .rowtime(new Rowtime()
                                .timestampsFromField("eventtime")
                                .watermarksPeriodicBounded(2000))
                        .field("fruit", Types.STRING)
                        .field("number", Types.INT))
                .inAppendMode()
                .registerTableSource("source");

        // First sink: JSON records to the "test" topic.
        tEnv.connect(
                new Kafka()
                        .version("0.10")
                        .topic("test")
                        .property("acks", "all")
                        .property("retries", "0")
                        .property("batch.size", "16384")
                        .property("linger.ms", "10")
                        .property("bootstrap.servers", "localhost:9093")
                        .sinkPartitionerFixed())
                .inAppendMode()
                .withFormat(new Json().deriveSchema())
                .withSchema(new Schema()
                        .field("fruit", Types.STRING)
                        .field("total", Types.INT)
                        .field("time", Types.SQL_TIMESTAMP))
                .registerTableSink("sink");

        // Second sink: identical layout, registered under the name "sink1".
        tEnv.connect(
                new Kafka()
                        .version("0.10")
                        .topic("test")
                        .property("acks", "all")
                        .property("retries", "0")
                        .property("batch.size", "16384")
                        .property("linger.ms", "10")
                        .property("bootstrap.servers", "localhost:9093")
                        .sinkPartitionerFixed())
                .inAppendMode()
                .withFormat(new Json().deriveSchema())
                .withSchema(new Schema()
                        .field("fruit", Types.STRING)
                        .field("total", Types.INT)
                        .field("time", Types.SQL_TIMESTAMP))
                .registerTableSink("sink1");

        // Register the query result as a temporary table named "view".
        Table table = tEnv.sqlQuery("select * from source");
        tEnv.registerTable("view", table);

        // Two sqlUpdate calls against the same temporary table: each one is
        // translated independently, so "view" is computed twice.
        tEnv.sqlUpdate("insert into sink select fruit,sum(number),TUMBLE_END(rowtime, INTERVAL '5' SECOND) from view group by fruit,TUMBLE(rowtime, INTERVAL '5' SECOND)");
        tEnv.sqlUpdate("insert into sink1 select fruit,sum(number),TUMBLE_END(rowtime, INTERVAL '5' SECOND) from view group by fruit,TUMBLE(rowtime, INTERVAL '5' SECOND)");

        // Print the execution plan instead of running the job,
        // so the duplication can be inspected.
        System.out.println(env.getExecutionPlan());
        // env.execute();
    }
}
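If you paste the JSON printed by env.getExecutionPlan() into the visualizer linked below, you should see two independent pipelines, each with its own Kafka source and window aggregation: the temporary table "view" is computed once per sink, confirming the answer above.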
Plan visualizer:
https://flink.apache.org/visualizer/
When using it, take care not to leak important credentials such as account passwords, since the plan JSON you paste may contain connector properties.
