
Real-Time Data Warehouse | A Summary of Real-Time Dimension Table Joins in Flink (with Project Source Code)


Speaker: 中国好胖子

Edited and compiled by: 柠檬妹

Published by: 数据仓库与Python大数据 (Data Warehouse and Python Big Data)


Contents

  1. ETL Background
  2. Solutions
    2.1 Query the Store Directly, Refresh on a Schedule
    2.2 Async I/O
    2.3 Broadcast
    2.4 Async I/O with a Cache
  3. Full Source Code




1. ETL Background

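In day-to-day real-time data warehouse work we keep getting real-time requirements, and most of them are widening requirements: enriching the stream with dimension attributes so that the real-time warehouse can serve OLAP and ad-hoc queries. Some of the dimension tables involved change over time, for example as slowly changing dimensions, so Flink has to query another data source for them. The obvious approach is to issue one lookup per incoming record, and that certainly works.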

But at big-data volumes, doesn't one lookup per record start to feel slow?

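Is there a better solution? When I write code I often ask myself that question. But because we have to deliver to users, there usually isn't much time to think it through; the product team is always pushing. So let's look at the options we have: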

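The figure above shows a real-time architecture. Our company has in fact already adopted ClickHouse, so a real-time warehouse built this way is no longer what we are aiming for, but that does not change the requirement. Let's look at the data.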

{"dt":"2019-11-19 20:33:39","countryCode":"TW","data": [{"type":"s1","score":0.8,"level":"D"},{"type":"s2","score":0.1,"level":"B"}]} 
{"dt":"2019-11-19 20:33:41","countryCode":"KW","data": [{"type":"s2","score":0.2,"level":"A"},{"type":"s1","score":0.2,"level":"D"}]}
{"dt":"2019-11-19 20:33:43","countryCode":"HK","data": [{"type":"s5","score":0.5,"level":"C"},{"type":"s2","score":0.8,"level":"B"}]}
{"dt":"2019-11-19 20:33:39","countryCode":"TW","data": [{"type":"s1","score":0.8,"level":"D"},{"type":"s2","score":0.1,"level":"B"}]}

The above is our simulated input data. Next, let's look at what the business users need:

{"dt":"2019-11-19 20:33:39","countryCode":"AREA_CT","type":"s1","score":0.8,"level":"D"}
{"dt":"2019-11-19 20:33:39","countryCode":"AREA_CT","type":"s2","score":0.1,"level":"B"}

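So the country is essentially replaced by its area (region), so that once the data lands in the warehouse we can run real-time, area-level OLAP analysis, such as real-time performance evaluation or marketing campaigns. We won't go into that further, since this is all mock data anyway.

Comparing the raw data with the result, we can also see that each record has been split: one input record carries several type values (the live-streaming platform), and the result breaks it into two rows. Isn't that just a UDTF?

At the same time, the country code is converted into an area code, and we assume this mapping can change, for example when the organization is restructured or its structure evolves.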
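Both steps, inverting the area mapping and exploding one record into several rows, can be sketched in plain Java without Flink. This sketch assumes the dimension data is laid out the way the code in section 2 reads it from Redis: a hash named areas whose fields are area codes and whose values are comma-separated country codes (for example AREA_CT → TW,HK). The class and method names (DimJoinSketch, invert, explode) are hypothetical, and output rows are plain CSV strings rather than the OutData type used later.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the two transformations; not part of the original project
class DimJoinSketch {

    // Inverts area -> "C1,C2,..." into country -> area, the direction a per-record lookup needs
    static Map<String, String> invert(Map<String, String> areaToCountries) {
        Map<String, String> countryToArea = new HashMap<>();
        for (Map.Entry<String, String> e : areaToCountries.entrySet()) {
            for (String country : e.getValue().split(",")) {
                countryToArea.put(country.trim(), e.getKey());
            }
        }
        return countryToArea;
    }

    // UDTF-style explode: a record with N (type, score, level) entries becomes N rows,
    // each carrying the area code instead of the country code (null if unmapped)
    static List<String> explode(String dt, String countryCode, List<String[]> entries,
                                Map<String, String> countryToArea) {
        String area = countryToArea.get(countryCode);
        List<String> rows = new ArrayList<>();
        for (String[] entry : entries) {
            rows.add(dt + "," + area + "," + entry[0] + "," + entry[1] + "," + entry[2]);
        }
        return rows;
    }

    public static void main(String[] args) {
        Map<String, String> areas = new HashMap<>();
        areas.put("AREA_CT", "TW,HK");
        areas.put("AREA_US", "US");
        Map<String, String> countryToArea = invert(areas);

        List<String[]> entries = new ArrayList<>();
        entries.add(new String[]{"s1", "0.8", "D"});
        entries.add(new String[]{"s2", "0.1", "B"});

        // prints:
        // 2019-11-19 20:33:39,AREA_CT,s1,0.8,D
        // 2019-11-19 20:33:39,AREA_CT,s2,0.1,B
        for (String row : explode("2019-11-19 20:33:39", "TW", entries, countryToArea)) {
            System.out.println(row);
        }
    }
}
```

Building the inverted map once and reusing it is what makes each per-record lookup a cheap in-memory get; the approaches below differ mainly in how and when that map is refreshed.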

So what solutions are there?

2. Solutions

2.1 Query the Store Directly, Refresh on a Schedule



static class SimpleFlatMapFunction extends RichFlatMapFunction<String, OutData> {

    // country code -> area code. The refresh thread replaces the whole map, so the
    // field must be volatile for the task thread to see the new reference.
    private transient volatile Map<String, String> hashMap = null;
    private transient ScheduledExecutorService scheduledExecutorService;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Load once synchronously so that the first records already find a mapping
        hashMap = loadAreas();

        // Then rebuild the map every 3 seconds on a background thread
        scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            try {
                hashMap = loadAreas();
            } catch (Exception e) {
                // An exception escaping the task would cancel all later refreshes,
                // so log it and keep serving the previous map
                e.printStackTrace();
            }
        }, 3, 3, TimeUnit.SECONDS);
    }

    // Reads the "areas" hash (area -> "TW,HK,...") and inverts it into country -> area.
    // HGETALL returns the whole hash; a single HSCAN from cursor "0" may return only the first page.
    private static Map<String, String> loadAreas() {
        try (Jedis jedis = RedisFactory.getJedisCluster()) {
            Map<String, String> map = new HashMap<>();
            for (Map.Entry<String, String> entry : jedis.hgetAll("areas").entrySet()) {
                for (String country : entry.getValue().split(",")) {
                    map.put(country, entry.getKey());
                }
            }
            System.out.println("Cache refreshed");
            return map;
        }
    }

    @Override
    public void close() throws Exception {
        // Stop the refresh thread, otherwise it outlives the task
        if (scheduledExecutorService != null) {
            scheduledExecutorService.shutdownNow();
        }
        super.close();
    }

    @Override
    public void flatMap(String s, Collector<OutData> collector) throws Exception {
        OriginData originData = JSONObject.parseObject(s, OriginData.class);
        // Area code for this record's country (null if the country is not mapped)
        String areaCode = hashMap.get(originData.countryCode);
        // One input record carries several entries: emit one output row per entry
        for (Data datum : originData.data) {
            collector.collect(OutData.of(originData.dt, areaCode, datum.type, datum.score, datum.level));
        }
    }
}
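The essential trick in 2.1 is that the lookup map is never modified in place: a background task builds a complete new map and swaps the reference in one step, so readers see either the old snapshot or the new one, never a half-built map. Here is a minimal, Flink-independent sketch of that pattern; RefreshingLookup and its loader parameter are hypothetical names, and the loader stands in for the Redis read.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical helper: an immutable snapshot that a background thread replaces periodically
class RefreshingLookup implements AutoCloseable {
    private final AtomicReference<Map<String, String>> snapshot = new AtomicReference<>();
    private final Supplier<Map<String, String>> loader;
    private final ScheduledExecutorService scheduler;

    RefreshingLookup(Supplier<Map<String, String>> loader, long periodSeconds) {
        this.loader = loader;
        // Load synchronously once so lookups work immediately
        refresh();
        this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "dim-refresh");
            t.setDaemon(true); // do not keep the JVM alive just for refreshing
            return t;
        });
        scheduler.scheduleAtFixedRate(this::safeRefresh, periodSeconds, periodSeconds, TimeUnit.SECONDS);
    }

    // Builds a complete copy, then publishes it with a single atomic reference swap
    void refresh() {
        Map<String, String> fresh = new HashMap<>(loader.get());
        snapshot.set(Collections.unmodifiableMap(fresh));
    }

    // An exception escaping a scheduled task cancels all later runs, so keep the old snapshot instead
    private void safeRefresh() {
        try {
            refresh();
        } catch (RuntimeException e) {
            System.err.println("refresh failed, keeping previous snapshot: " + e);
        }
    }

    String get(String key) {
        return snapshot.get().get(key);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

In a RichFlatMapFunction you would create it in open(), call get() in flatMap(), and call close() in close(). The trade-off is staleness: a changed mapping becomes visible only at the next refresh, up to one period late.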

2.2 Async I/O

static class SimpaleAsyncIoFunction extends RichAsyncFunction<String, OutData> {
    private transient Vertx vertx;
    private transient RedisClient redisClient;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        RedisOptions config = new RedisOptions();
        config.setHost("hadoop01");
        config.setPort(6379);

        VertxOptions vo = new VertxOptions();
        vo.setEventLoopPoolSize(10);
        vo.setWorkerPoolSize(20);

        vertx = Vertx.vertx(vo);
        redisClient = RedisClient.create(vertx, config);
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (redisClient != null) {
            redisClient.close(ar -> { });
        }
        // Release the Vert.x event loop threads as well
        if (vertx != null) {
            vertx.close();
        }
    }

    @Override
    public void asyncInvoke(String s, ResultFuture<OutData> resultFuture) throws Exception {
        OriginData originData = JSONObject.parseObject(s, OriginData.class);
        String countryCode = originData.countryCode;

        // Non-blocking read of the whole "areas" hash for every record.
        // HSCAN reply: [cursor, [field1, value1, field2, value2, ...]]
        redisClient.hscan("areas", "0", ScanOptions.NONE, result -> {
            if (result.failed()) {
                // Fail the record so the restart strategy applies instead of silently dropping it
                resultFuture.completeExceptionally(result.cause());
                return;
            }
            if (result.result() == null) {
                // Emit nothing: pass an empty collection, never null
                resultFuture.complete(Collections.emptyList());
                return;
            }
            // e.g. ["AREA_US","US","AREA_CT","TW,HK","AREA_AR","PK,KW,SA,XX","AREA_IN","IN"]
            JsonArray jsonArray = result.result().getJsonArray(1);
            Map<String, String> countryToArea = new HashMap<>();
            for (int i = 0; i + 1 < jsonArray.size(); i += 2) {
                String area = jsonArray.getString(i);
                for (String country : jsonArray.getString(i + 1).split(",")) {
                    countryToArea.put(country, area);
                }
            }
            String areaCode = countryToArea.get(countryCode);

            // A ResultFuture can be completed only once, so collect all exploded rows first
            List<OutData> rows = new ArrayList<>();
            for (Data datum : originData.data) {
                rows.add(OutData.of(originData.dt, areaCode, datum.type, datum.score, datum.level));
            }
            resultFuture.complete(rows);
        });
    }
}

2.3 Broadcast

static class BroadcastSourceFunction extends RichSourceFunction<String> {

    // Set to false by cancel() so that run() can exit
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        while (running) {
            // Build the full country -> area snapshot from the "areas" hash.
            // HGETALL returns the whole hash; a single HSCAN from cursor "0" may return only part of it.
            HashMap<String, String> hashMap = new HashMap<>();
            try (Jedis jedis = RedisFactory.getJedisCluster()) {
                for (Map.Entry<String, String> entry : jedis.hgetAll("areas").entrySet()) {
                    for (String country : entry.getValue().split(",")) {
                        hashMap.put(country, entry.getKey());
                    }
                }
            }
            // Emit the snapshot as JSON; downstream it is broadcast to every parallel instance
            synchronized (sourceContext.getCheckpointLock()) {
                sourceContext.collect(JSON.toJSONString(hashMap));
            }
            TimeUnit.SECONDS.sleep(3);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

2.4 Async I/O with a Cache

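If you have followed the basic approaches above, this one is simple enough that I won't write it out.

It just comes down to choosing a cache eviction policy: on a cache hit, use the cached value; on a miss, fetch it from Redis asynchronously.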
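As a rough illustration of that idea, here is a Flink-independent sketch of a cache with LRU eviction plus a time-to-live, so that changed dimension rows are eventually re-read. The class AsyncLruCache and its asyncLoader parameter are hypothetical: the loader stands in for a non-blocking Redis lookup such as the Vert.x client in 2.2. Inside asyncInvoke you would call get(countryCode) and complete the ResultFuture once in a whenComplete callback. Production code would more likely use a library such as Caffeine, whose AsyncLoadingCache covers the same ground.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch: LRU + TTL cache in front of an asynchronous loader
class AsyncLruCache {
    // Cached value plus its load time, so entries expire when the dimension may have changed
    private static final class CachedValue {
        final String value;
        final long loadedAtMillis;

        CachedValue(String value, long loadedAtMillis) {
            this.value = value;
            this.loadedAtMillis = loadedAtMillis;
        }
    }

    private final Map<String, CachedValue> cache;
    private final long ttlMillis;
    private final Function<String, CompletableFuture<String>> asyncLoader;

    AsyncLruCache(int maxEntries, long ttlMillis, Function<String, CompletableFuture<String>> asyncLoader) {
        this.ttlMillis = ttlMillis;
        this.asyncLoader = asyncLoader;
        // accessOrder = true keeps entries in least-recently-used order;
        // removeEldestEntry evicts the LRU entry once the size limit is exceeded
        this.cache = new LinkedHashMap<String, CachedValue>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, CachedValue> eldest) {
                return size() > maxEntries;
            }
        };
    }

    CompletableFuture<String> get(String key) {
        synchronized (cache) {
            CachedValue cached = cache.get(key);
            if (cached != null && System.currentTimeMillis() - cached.loadedAtMillis < ttlMillis) {
                return CompletableFuture.completedFuture(cached.value); // fresh hit
            }
        }
        // Miss or expired entry: load asynchronously and cache the result when it arrives
        return asyncLoader.apply(key).thenApply(value -> {
            if (value != null) {
                synchronized (cache) {
                    cache.put(key, new CachedValue(value, System.currentTimeMillis()));
                }
            }
            return value;
        });
    }

    int size() {
        synchronized (cache) {
            return cache.size();
        }
    }
}
```

One design choice worth noting: concurrent misses on the same key each issue their own load. Caching the CompletableFuture itself instead of the resolved value would deduplicate those in-flight requests.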

3. Full Source Code

package com.bigdata.dim.join;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.json.JsonArray;
import io.vertx.redis.RedisClient;
import io.vertx.redis.RedisOptions;
import io.vertx.redis.op.ScanOptions;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.ScanResult;

import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Copyright (c) 2019 bigdata ALL Rights Reserved
 * Project: learning
 * Package: com.bigdata.dim.join
 * Version: 1.0
 *
 * @author qingzhi.wu 2020/11/8 11:12
 */
public class Main {
    private static final int RESTART_ATTEMPTS = 5;
    private static final int RESTART_INTERVAL = 20;
    private static Logger logger = LoggerFactory.getLogger(Main.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Checkpoint settings
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        env.enableCheckpointing(5000L);
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        checkpointConfig.setCheckpointTimeout(100000L);
        checkpointConfig.setFailOnCheckpointingErrors(true);
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // No state backend needed in the test environment
        // FsStateBackend fsStateBackend = new FsStateBackend(CheckpointUtils.getCheckpointDir());
        // env.setStateBackend(fsStateBackend);
        // Fixed-delay restart strategy
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                RESTART_ATTEMPTS, // number of restart attempts
                org.apache.flink.api.common.time.Time.of(RESTART_INTERVAL, TimeUnit.SECONDS) // delay between attempts
        ));

        // Custom source that generates mock data
        DataStreamSource<String> dataStreamSource = env.addSource(new DataSource());

        // Enable exactly ONE of the three approaches; declaring the same variable twice does not compile.

        // 1. Load Redis into memory and refresh it on a timer
        // SingleOutputStreamOperator<OutData> outDataSingleOutputStreamOperator =
        //         dataStreamSource.flatMap(new SimpleFlatMapFunction());

        // 2. Async I/O
        // SingleOutputStreamOperator<OutData> outDataSingleOutputStreamOperator =
        //         AsyncDataStream.unorderedWait(dataStreamSource, new SimpaleAsyncIoFunction(), 2000, TimeUnit.MILLISECONDS);

        // 3. Broadcast the dimension snapshot
        final MapStateDescriptor<String, String> broadcastDes = new MapStateDescriptor<>(
                "broadcast", String.class, String.class);
        BroadcastStream<String> broadcast = env.addSource(new BroadcastSourceFunction()).broadcast(broadcastDes);

        SingleOutputStreamOperator<OutData> outDataSingleOutputStreamOperator = dataStreamSource
                .connect(broadcast)
                .process(new BroadcastProcessFunction<String, String, OutData>() {
                    @Override
                    public void processElement(String s, ReadOnlyContext readOnlyContext, Collector<OutData> collector) throws Exception {
                        ReadOnlyBroadcastState<String, String> broadcastState = readOnlyContext.getBroadcastState(broadcastDes);
                        String snapshot = broadcastState.get("broadcastState");
                        // Records can arrive before the first snapshot; avoid a NullPointerException
                        Map<String, String> data = new HashMap<>();
                        if (snapshot != null) {
                            data = JSONObject.parseObject(snapshot, HashMap.class);
                        }
                        OriginData originData = JSONObject.parseObject(s, OriginData.class);
                        String areaCode = data.get(originData.countryCode);
                        // Emit one output row per entry of the input record
                        for (Data datum : originData.data) {
                            collector.collect(OutData.of(originData.dt, areaCode, datum.type, datum.score, datum.level));
                        }
                    }

                    @Override
                    public void processBroadcastElement(String s, Context context, Collector<OutData> collector) throws Exception {
                        BroadcastState<String, String> broadcastState = context.getBroadcastState(broadcastDes);
                        // put() overwrites the previous snapshot, so no remove() is needed
                        broadcastState.put("broadcastState", s);
                    }
                });

        SingleOutputStreamOperator<String> map = outDataSingleOutputStreamOperator.map(new MapFunction<OutData, String>() {
            @Override
            public String map(OutData outData) throws Exception {
                return JSON.toJSONString(outData);
            }
        });
        map.print();
        env.execute();
    }
    static class SimpleFlatMapFunction extends RichFlatMapFunction<String, OutData> {

        // country code -> area code; replaced wholesale by the refresh thread, hence volatile
        private transient volatile Map<String, String> hashMap = null;
        private transient ScheduledExecutorService scheduledExecutorService;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // Initial synchronous load, then rebuild the map every 3 seconds
            hashMap = loadAreas();
            scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
            scheduledExecutorService.scheduleAtFixedRate(() -> {
                try {
                    hashMap = loadAreas();
                } catch (Exception e) {
                    // An exception escaping the task would cancel all later refreshes
                    e.printStackTrace();
                }
            }, 3, 3, TimeUnit.SECONDS);
        }

        // Inverts the "areas" hash (area -> "TW,HK,...") into country -> area.
        // HGETALL returns the whole hash; one HSCAN from cursor "0" may return only the first page.
        private static Map<String, String> loadAreas() {
            try (Jedis jedis = RedisFactory.getJedisCluster()) {
                Map<String, String> map = new HashMap<>();
                for (Map.Entry<String, String> entry : jedis.hgetAll("areas").entrySet()) {
                    for (String country : entry.getValue().split(",")) {
                        map.put(country, entry.getKey());
                    }
                }
                System.out.println("Cache refreshed");
                return map;
            }
        }

        @Override
        public void close() throws Exception {
            if (scheduledExecutorService != null) {
                scheduledExecutorService.shutdownNow();
            }
            super.close();
        }

        @Override
        public void flatMap(String s, Collector<OutData> collector) throws Exception {
            OriginData originData = JSONObject.parseObject(s, OriginData.class);
            String areaCode = hashMap.get(originData.countryCode);
            // Emit one output row per entry of the input record
            for (Data datum : originData.data) {
                collector.collect(OutData.of(originData.dt, areaCode, datum.type, datum.score, datum.level));
            }
        }
    }

    static class SimpaleAsyncIoFunction extends RichAsyncFunction<String, OutData> {
        private transient Vertx vertx;
        private transient RedisClient redisClient;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            RedisOptions config = new RedisOptions();
            config.setHost("hadoop01");
            config.setPort(6379);

            VertxOptions vo = new VertxOptions();
            vo.setEventLoopPoolSize(10);
            vo.setWorkerPoolSize(20);

            vertx = Vertx.vertx(vo);
            redisClient = RedisClient.create(vertx, config);
        }

        @Override
        public void close() throws Exception {
            super.close();
            if (redisClient != null) {
                redisClient.close(ar -> { });
            }
            if (vertx != null) {
                vertx.close();
            }
        }

        @Override
        public void asyncInvoke(String s, ResultFuture<OutData> resultFuture) throws Exception {
            OriginData originData = JSONObject.parseObject(s, OriginData.class);
            String countryCode = originData.countryCode;

            // HSCAN reply: [cursor, [field1, value1, field2, value2, ...]]
            redisClient.hscan("areas", "0", ScanOptions.NONE, result -> {
                if (result.failed()) {
                    // Fail the record so the restart strategy applies instead of silently dropping it
                    resultFuture.completeExceptionally(result.cause());
                    return;
                }
                if (result.result() == null) {
                    // Emit nothing: pass an empty collection, never null
                    resultFuture.complete(Collections.emptyList());
                    return;
                }
                // e.g. ["AREA_US","US","AREA_CT","TW,HK","AREA_AR","PK,KW,SA,XX","AREA_IN","IN"]
                JsonArray jsonArray = result.result().getJsonArray(1);
                Map<String, String> countryToArea = new HashMap<>();
                for (int i = 0; i + 1 < jsonArray.size(); i += 2) {
                    String area = jsonArray.getString(i);
                    for (String country : jsonArray.getString(i + 1).split(",")) {
                        countryToArea.put(country, area);
                    }
                }
                String areaCode = countryToArea.get(countryCode);

                // A ResultFuture can be completed only once, so collect all rows first
                List<OutData> rows = new ArrayList<>();
                for (Data datum : originData.data) {
                    rows.add(OutData.of(originData.dt, areaCode, datum.type, datum.score, datum.level));
                }
                resultFuture.complete(rows);
            });
        }
    }

    static class BroadcastSourceFunction extends RichSourceFunction<String> {
        // Set to false by cancel() so that run() can exit
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<String> sourceContext) throws Exception {
            while (running) {
                // Build the full country -> area snapshot from the "areas" hash
                HashMap<String, String> hashMap = new HashMap<>();
                try (Jedis jedis = RedisFactory.getJedisCluster()) {
                    for (Map.Entry<String, String> entry : jedis.hgetAll("areas").entrySet()) {
                        for (String country : entry.getValue().split(",")) {
                            hashMap.put(country, entry.getKey());
                        }
                    }
                }
                // Emit the snapshot as JSON; downstream it is broadcast to every parallel instance
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect(JSON.toJSONString(hashMap));
                }
                TimeUnit.SECONDS.sleep(3);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
    // Despite the method name, this returns a plain single-node Jedis connection, not a JedisCluster.
    // Every call opens a new connection, which the caller must close.
    static class RedisFactory {
        private RedisFactory() {
        }

        public static Jedis getJedisCluster() {
            return new Jedis(new HostAndPort("hadoop01", 6379));
        }
    }

    // Input record: one country code plus several (type, score, level) entries
    static class OriginData {
        public String dt;
        public String countryCode;
        public ArrayList<Data> data;

        public OriginData() {
        }

        public OriginData(String dt, String countryCode, ArrayList<Data> data) {
            this.dt = dt;
            this.countryCode = countryCode;
            this.data = data;
        }

        public static OriginData of(String dt, String countryCode, ArrayList<Data> data) {
            return new OriginData(dt, countryCode, data);
        }
    }

    static class Data {
        public String type;
        public Double score;
        public String level;

        public Data() {
        }

        public Data(String type, Double score, String level) {
            this.type = type;
            this.score = score;
            this.level = level;
        }

        public static Data of(String type, Double score, String level) {
            return new Data(type, score, level);
        }
    }

    // Output row: one per entry, with countryCode holding the area code
    static class OutData {
        public String dt;
        public String countryCode;
        public String type;
        public Double score;
        public String level;

        public OutData() {
        }

        public OutData(String dt, String countryCode, String type, Double score, String level) {
            this.dt = dt;
            this.countryCode = countryCode;
            this.type = type;
            this.score = score;
            this.level = level;
        }

        public static OutData of(String dt, String countryCode, String type, Double score, String level) {
            return new OutData(dt, countryCode, type, score, level);
        }
    }
    // Mock source: emits one OriginData JSON record per second
    static class DataSource extends RichSourceFunction<String> {
        private static final String YYYYMMDDHHMMSS = "yyyy-MM-dd HH:mm:ss";
        private static Random random = new Random();
        private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat(YYYYMMDDHHMMSS);
        private static String[] countryCodes = {"US", "TW", "HK", "PK", "KW", "SA", "XX", "IN"};
        private static String[] users = {"s1", "s2", "s3", "s4", "s5", "s6", "s7", "s8",
                "s9", "s10", "s11", "s12", "s13", "s14", "s15", "s16"};
        private static String[] levels = {"A", "B", "C", "D"};

        // Set to false by cancel() so that run() can exit
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<String> sourceContext) throws Exception {
            while (running) {
                // Event time is offset 0-3 seconds from the current time
                int i = random.nextInt(4);
                long time = System.currentTimeMillis() + 1000 * i;
                String resDate = simpleDateFormat.format(time);

                // First entry; its index also picks the record's country code
                i = random.nextInt(users.length);
                String user1 = users[i];
                // Round to one decimal place without the locale dependence of String.format
                Double score1 = Math.round(random.nextDouble() * 10) / 10.0;
                String countCode1 = countryCodes[i % countryCodes.length];
                String level1 = levels[i % levels.length];

                // Second entry of the same record
                i = random.nextInt(users.length);
                String user2 = users[i];
                String level2 = levels[i % levels.length];
                Double score2 = Math.round(random.nextDouble() * 10) / 10.0;

                ArrayList<Data> datas = new ArrayList<>();
                datas.add(Data.of(user1, score1, level1));
                datas.add(Data.of(user2, score2, level2));
                String s = JSON.toJSONString(OriginData.of(resDate, countCode1, datas));
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect(s);
                }
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
