A problem when using Flink SQL
Recently a reader asked 浪尖 whether sharing a DataStream or a temporary table in Flink causes the shared part to be computed more than once.
For a Flink DataStream job, suppose the result of some computation on a source stream is shared by two downstream computations, compute1 and compute2. The shared upstream logic then runs only once, and the data is pulled from the source only once.
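As a minimal illustration of that sharing (the class name and the toy map functions here are made up for demonstration, not taken from any real job):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SharedStreamDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> source = env.fromElements("apple", "pear", "banana");

        // The shared computation is a single operator in the job graph and
        // runs once, no matter how many downstream consumers it has.
        DataStream<String> shared = source.map(s -> s.toUpperCase());

        // compute1 and compute2 both consume `shared`; the upstream map is
        // not duplicated, its output is sent to both downstream operators.
        shared.map(s -> "compute1: " + s).print();
        shared.map(s -> "compute2: " + s).print();

        env.execute("shared-datastream-demo");
    }
}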
What about Flink SQL? Suppose compute1 and compute2 read from a temporary table that is itself the result of an expensive computation. What happens when downstream SQL statements use that table directly?
The answer first: registering a temporary table does not materialize it. If several sqlUpdate statements later depend on that table, the table's computation is repeated once per statement.
This is not hard to understand: every sqlUpdate call parses and expands the full SQL syntax tree, which is similar to Spark's lineage. But Flink SQL has no counterpart to caching or checkpointing a Spark RDD to cut the lineage, because the planner does not recognize common sub-plans or distribute a common node's output to multiple consumers.
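With this version of the Table API, one workaround is to do the sharing at the DataStream level yourself: convert the temporary table into a DataStream once, then register that stream as a new table, so every later statement chains onto the same DataStream node. Below is a rough sketch against the test program that follows; the table name "materialized" is my own, and it assumes record timestamps and watermarks survive the conversion so the event-time attribute can be re-declared with the ".rowtime" suffix:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

// ... same env/tEnv/source/sink setup as in the full program below ...

Table table = tEnv.sqlQuery("select * from source");

// Translate the table into a DataStream exactly once; this adds a single
// operator chain to the job instead of a plan that gets re-expanded.
DataStream<Row> shared = tEnv.toAppendStream(table, Row.class);

// Register the stream as a new table. The ".rowtime" suffix re-declares
// the event-time attribute from the stream's record timestamps.
tEnv.registerDataStream("materialized", shared, "rowtime.rowtime, fruit, number");

// Both statements now read from the same DataStream node, so the shared
// part is computed once.
tEnv.sqlUpdate("insert into sink select fruit,sum(number),TUMBLE_END(rowtime, INTERVAL '5' SECOND) from materialized group by fruit,TUMBLE(rowtime, INTERVAL '5' SECOND)");
tEnv.sqlUpdate("insert into sink1 select fruit,sum(number),TUMBLE_END(rowtime, INTERVAL '5' SECOND) from materialized group by fruit,TUMBLE(rowtime, INTERVAL '5' SECOND)");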
The test program is given below for reference.
package org.table.kafka;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Schema;

public class kafka2kafka {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // Source table: JSON records from the "jsontest" topic, with the
        // event-time attribute "rowtime" extracted from the "eventtime" field.
        tEnv.connect(
                new Kafka()
                        .version("0.10") // "0.8", "0.9", "0.10", "0.11", and "universal"
                        .topic("jsontest")
                        .property("bootstrap.servers", "localhost:9093")
                        .property("group.id", "test")
                        .startFromLatest()
        )
                .withFormat(
                        new Json()
                                .failOnMissingField(false)
                                .deriveSchema()
                )
                .withSchema(
                        new Schema()
                                .field("rowtime", Types.SQL_TIMESTAMP)
                                .rowtime(new Rowtime()
                                        .timestampsFromField("eventtime")
                                        .watermarksPeriodicBounded(2000))
                                .field("fruit", Types.STRING)
                                .field("number", Types.INT)
                )
                .inAppendMode()
                .registerTableSource("source");

        // First sink table, writing JSON to the "test" topic.
        tEnv.connect(
                new Kafka()
                        .version("0.10") // "0.8", "0.9", "0.10", "0.11", and "universal"
                        .topic("test")
                        .property("acks", "all")
                        .property("retries", "0")
                        .property("batch.size", "16384")
                        .property("linger.ms", "10")
                        .property("bootstrap.servers", "localhost:9093")
                        .sinkPartitionerFixed())
                .inAppendMode()
                .withFormat(new Json().deriveSchema())
                .withSchema(
                        new Schema()
                                .field("fruit", Types.STRING)
                                .field("total", Types.INT)
                                .field("time", Types.SQL_TIMESTAMP)
                )
                .registerTableSink("sink");

        // Second sink table, identical to the first except for the name.
        tEnv.connect(
                new Kafka()
                        .version("0.10") // "0.8", "0.9", "0.10", "0.11", and "universal"
                        .topic("test")
                        .property("acks", "all")
                        .property("retries", "0")
                        .property("batch.size", "16384")
                        .property("linger.ms", "10")
                        .property("bootstrap.servers", "localhost:9093")
                        .sinkPartitionerFixed())
                .inAppendMode()
                .withFormat(new Json().deriveSchema())
                .withSchema(
                        new Schema()
                                .field("fruit", Types.STRING)
                                .field("total", Types.INT)
                                .field("time", Types.SQL_TIMESTAMP)
                )
                .registerTableSink("sink1");

        // Register a temporary table over the source. Registration does NOT
        // materialize the query; it only records the table's logical plan.
        Table table = tEnv.sqlQuery("select * from source");
        tEnv.registerTable("view", table);

        // Two statements depend on the same temporary table. Each sqlUpdate
        // re-expands the view's plan, so the view is computed twice.
        tEnv.sqlUpdate("insert into sink select fruit,sum(number),TUMBLE_END(rowtime, INTERVAL '5' SECOND) from view group by fruit,TUMBLE(rowtime, INTERVAL '5' SECOND)");
        tEnv.sqlUpdate("insert into sink1 select fruit,sum(number),TUMBLE_END(rowtime, INTERVAL '5' SECOND) from view group by fruit,TUMBLE(rowtime, INTERVAL '5' SECOND)");

        // Print the JSON execution plan instead of running the job.
        System.out.println(env.getExecutionPlan());
        // env.execute();
    }
}
Plan visualizer:
https://flink.apache.org/visualizer/
Paste the JSON plan printed by env.getExecutionPlan() into the visualizer and you will see two fully independent pipelines, each with its own Kafka source and window aggregation, which confirms that the temporary table is computed once per sqlUpdate.