京东:Flink SQL 优化实战
摘要:本文作者为京东算法服务部的张颖和段学浩,并由 Apache Hive PMC,阿里巴巴技术专家李锐帮忙校对。主要内容为:
-
背景 -
Flink SQL 的优化 -
总结
一、背景
二、 Flink SQL 的优化
1. UDF 重用
-
一个 taskmanager 里面可能会有多个 subtask,所以这个 cache 要么是 thread (THREAD LOCAL) 级别要么是 tm 级别;
-
为了防止出现一些情况导致清理 cache 的逻辑走不到,一定要在 close 方法里将 cache 清掉; -
为了防止内存无限增大,选取的 cache 最好可以主动控制 size;至于 “超时时间”,建议可以配置一下,但是最好不要小于 UDF 先后调用的时间; -
上文有提到过,一个 tm 里面可能会有多个 subtask,相当于 tm 里面是个多线程的环境。首先我们的 cache 需要是线程安全的,然后可以根据业务判断需不需要锁。
public class RandomFunction extends ScalarFunction {
private static Cache<String, Integer> cache = CacheBuilder.newBuilder()
.maximumSize(2)
.expireAfterWrite(3, TimeUnit.SECONDS)
.build();
public int eval(String pvid) {
profileLog.error("RandomFunction invoked:" + atomicInteger.incrementAndGet());
Integer result = cache.getIfPresent(pvid);
if (null == result) {
int tmp = (int)(Math.random() * 1000);
cache.put("pvid", tmp);
return tmp;
}
return result;
}
@Override
public void close() throws Exception {
super.close();
cache.cleanUp();
}
}
2. 单元测试
public static HiveConf createHiveConf() {
ClassLoader classLoader = new HiveOperatorTest().getClass().getClassLoader();
HiveConf.setHiveSiteLocation(classLoader.getResource(HIVE_SITE_XML));
try {
TEMPORARY_FOLDER.create();
String warehouseDir = TEMPORARY_FOLDER.newFolder().getAbsolutePath() + "/metastore_db";
String warehouseUri = String.format(HIVE_WAREHOUSE_URI_FORMAT, warehouseDir);
HiveConf hiveConf = new HiveConf();
hiveConf.setVar(
HiveConf.ConfVars.METASTOREWAREHOUSE,
TEMPORARY_FOLDER.newFolder("hive_warehouse").getAbsolutePath());
hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, warehouseUri);
hiveConf.set("datanucleus.connectionPoolingType", "None");
hiveConf.set("hive.metastore.schema.verification", "false");
hiveConf.set("datanucleus.schema.autoCreateTables", "true");
return hiveConf;
} catch (IOException e) {
throw new CatalogException("Failed to create test HiveConf to HiveCatalog.", e);
}
}
public static void createCatalog() throws Exception{
Class clazz = HiveCatalog.class;
Constructor c1 = clazz.getDeclaredConstructor(new Class[]{String.class, String.class, HiveConf.class, String.class, boolean.class});
c1.setAccessible(true);
hiveCatalog = (HiveCatalog)c1.newInstance(new Object[]{"test-catalog", null, createHiveConf(), "2.3.4", true});
hiveCatalog.open();
}
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
TableConfig tableConfig = tableEnv.getConfig();
Configuration configuration = new Configuration();
configuration.setInteger("table.exec.resource.default-parallelism", 1);
tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
tableEnv.useCatalog(hiveCatalog.getName());
public static void closeCatalog() {
if (hiveCatalog != null) {
hiveCatalog.close();
}
}
CollectionTableFactory.reset();
CollectionTableFactory.initData(Arrays.asList(Row.of("this is a test"), Row.of("zhangying480"), Row.of("just for test"), Row.of("a test case")));
StringBuilder sbFilesSource = new StringBuilder();
sbFilesSource.append("CREATE temporary TABLE db1.`search_realtime_table_dump_p13`(" + " `pvid` string) with ('connector.type'='COLLECTION','is-bounded' = 'true')");
tableEnv.executeSql(sbFilesSource.toString());
3. join 方式的选择
传统的离线 Batch SQL (面向有界数据集的 SQL) 有三种基础的实现方式,分别是 Nested-loop Join、Sort-Merge Join 和 Hash Join。
Nested-loop Join 最为简单直接,将两个数据集加载到内存,并用内嵌遍历的方式来逐个比较两个数据集内的元素是否符合 Join 条件。Nested-loop Join 的时间效率以及空间效率都是最低的,可以使用:table.exec.disabled-operators:NestedLoopJoin 来禁用。
以下两张图片是禁用前和禁用后的效果 (如果你的禁用没有生效,先看一下是不是 Equi-Join):
-
Sort-Merge Join 分为 Sort 和 Merge 两个阶段:首先将两个数据集进行分别排序,然后再对两个有序数据集分别进行遍历和匹配,类似于归并排序的合并。(Sort-Merge Join 要求对两个数据集进行排序,但是如果两个输入是有序的数据集,则可以作为一种优化方案)。
Hash Join 同样分为两个阶段:首先将一个数据集转换为 Hash Table,然后遍历另外一个数据集元素并与 Hash Table 内的元素进行匹配。
-
第一阶段和第一个数据集分别称为 build 阶段和 build table; -
第二个阶段和第二个数据集分别称为 probe 阶段和 probe table。
-
Repartition-Repartition strategy: Join 的两个数据集分别对它们的 key 使用相同的分区函数进行分区,并经过网络发送数据; -
Broadcast-Forward strategy: 大的数据集不做处理,另一个比较小的数据集全部复制到集群中一部分数据的机器上。
-
如果两个数据集有较大差距,建议采用 Broadcast-Forward strategy; -
如果两个数据集差不多,建议采用 Repartition-Repartition strategy。
4. multiple input
5. 对象重用
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
6. SQL 任务的 failover 策略
7. shuffle
-
pipeline shuffle 性能好,但是对资源的要求高,而且容错比较差 (会将该 operator 分到前面的一个 region 里面,对于 batch 任务来说,如果这个算子出问题,将从上一个 region 恢复); -
blocking shuffle 就是传统的 batch shuffle,会将数据落盘,这种 shuffle 的容错好,但是会产生大量的磁盘、网络 io (如果为了省心的话,建议用 blocking suffle)。blocking shuffle 又分为 hash shuffle 和 sort shuffle, -
如果你的磁盘是 ssd 并且并发不太大的话,可以选择使用 hash shuffle,这种 shuffle 方式产生的文件多、随机读多,对磁盘 io 影响较大; -
如果你是 sata 并且并发比较大,可以选择用 sort-merge shuffle,这种 shuffle 产生的数据少,顺序读,不会产生大量的磁盘 io,不过开销会更大一些 (sort merge)。
三、总结
【 活动推荐 】
8 月 7 日 Apache Flink Meetup,
4 位技术大咖将分享 Flink 在腾讯和第四范式的实践应用,
并且也将带来 Flink 1.14 版本的新特性预览。
(扫码报名)