Data Warehouse: Fast Export of Large Volumes of MySQL Data
Source: https://www.cnblogs.com/ITtangtang/p/7612237.html
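Background
This article describes some interesting problems I ran into while building the ETL synchronization for a data warehouse, and how I improved the program's throughput along the way.
Relational database:
Early in the project: the game's operational data was fairly light. A Java back-end program running aggregate queries against the relational database MySQL could handle it comfortably. A scheduled task computed the relevant statistics every day, and the results were ready whenever the operations staff queried them.
Mid to late project: as more game servers were opened and the number of players grew, the databases kept getting larger and queries from the operations back end kept getting slower. For an ordinary relational database such as MySQL, a commonly cited rule of thumb is that query performance degrades sharply once a single table holds more than about 5 million rows. We also rarely query a single table in isolation; there are multi-table joins as well. Suppose there are 100 game servers, each with 20 tables, and each table holds 5 million rows. Then:
Total rows = 100 * 20 * 5 million = 10 billion. With the table schema used at the time, this came to roughly 100 GB on disk.
Good grief: an ordinary single server cannot hold 100 GB of data in memory at once.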
https://www.zhihu.com/question/19719997
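With that in mind, Hive was proposed to solve the problem.
Data warehouse
Hive is well suited as a data warehouse tool for massive data sets, because warehouse data has two characteristics: it is the most complete historical record (hence massive), and it is relatively stable. Relatively stable means that, unlike a business system database where data is updated frequently, data that has entered the warehouse is rarely updated or deleted and is mostly just queried in bulk. Hive is built for data with exactly these two characteristics.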
https://my.oschina.net/leejun2005/blog/189035
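II. Project architecture design
First, a word about the early exploration of the architecture. The data ultimately flows from MySQL to Hive, and I moved it with JDBC. Why not use one of the following tools?
DataX: Alibaba's open-source data synchronization middleware. I had not studied it in detail.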
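1. Global cache queue
A producer-consumer model with an in-memory buffer in between; the data lands in txt files.
The producer first fetches the source rows via JDBC and puts them into a fixed-size cache queue. Meanwhile the consumer keeps reading from the queue, separates the rows by data type, and writes them one by one into the corresponding txt file.
Throughput: about 8,000 rows per second.
On the surface this looks great: a pipeline that handles each row as it arrives. In practice the consumer could not keep up with the producer, so rows piled up in the cache queue. Had the queue been unbounded, it would also have consumed a large amount of memory. To speed up writing, I moved on to the next design.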
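The single-queue design described above can be sketched roughly as follows. This is a minimal illustration rather than the project's code: the class `GlobalQueueSketch`, the `Row` type, the `END` marker used to stop the consumer, and the in-memory `StringWriter` sinks (standing in for the txt files) are all assumptions made for the example.

```java
import java.io.StringWriter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class GlobalQueueSketch {

    /** One exported record: the table it came from plus the already formatted line. */
    static final class Row {
        final String table;
        final String line;
        Row(String table, String line) { this.table = table; this.line = line; }
    }

    /** Hypothetical end marker: tells the consumer that the producer has finished. */
    private static final Row END = new Row("", "");

    /**
     * Pushes all rows through one bounded queue and routes each row to the
     * sink of its table. Returns table name -> text written for that table.
     */
    static Map<String, String> export(List<Row> source, int queueCapacity) {
        BlockingQueue<Row> queue = new ArrayBlockingQueue<>(queueCapacity);
        Map<String, StringWriter> sinks = new HashMap<>();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Row row = queue.take();           // blocks while the queue is empty
                    if (row == END) break;
                    sinks.computeIfAbsent(row.table, t -> new StringWriter())
                         .write(row.line + "\n");     // written record by record
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "global-queue-consumer");
        consumer.start();

        try {
            // Producer: in the real job these rows would come from JDBC.
            for (Row row : source) {
                queue.put(row);                       // blocks while the queue is full
            }
            queue.put(END);
            consumer.join();                          // consumer's writes are visible after join
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("export interrupted", e);
        }

        Map<String, String> result = new HashMap<>();
        sinks.forEach((table, sink) -> result.put(table, sink.toString()));
        return result;
    }
}
```

With a single consumer, every table's output is serialized through one thread, which is consistent with the bottleneck described above: the bounded queue protects memory, but the producer ends up waiting on `put`.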
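2. One cache queue and one writer interface per table
Each table gets its own producer-consumer pair, and the consumer initializes the corresponding writer when it starts. The design works as follows:
The producer for table1 fetches the source rows via JDBC and puts them into table1's own fixed-size cache queue. Meanwhile table1's consumer keeps reading from that queue, separates the rows by data type, and writes them one by one into the corresponding txt file.
Throughput: about 20,000 rows per second.
This way the producer threads run concurrently, and tuning the number of producer threads can greatly improve throughput. The key code of the project follows:
1) Thread pools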
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Task thread pools.
 */
public class DumpExecuteService {

    private static ExecutorService dumpServerWorkerService; // game-server tasks
    private static ExecutorService dumpTableWorkerService;  // table tasks
    private static ExecutorService dumpReaderWorkerService; // data-reading tasks
    private static ExecutorService dumpWriterWorkerService; // result-writing tasks

    /**
     * Initializes the task thread pools. Calling it again is a no-op.
     *
     * @param concurrencyDBCount number of databases processed concurrently (clamped to 1..2)
     */
    public synchronized static void startup(int concurrencyDBCount) {

        if (dumpServerWorkerService != null)
            return;

        if (concurrencyDBCount > 2)
            concurrencyDBCount = 2; // at most two database tasks run concurrently

        if (concurrencyDBCount < 1)
            concurrencyDBCount = 1;

        dumpServerWorkerService = Executors.newFixedThreadPool(concurrencyDBCount, new NamedThreadFactory(
                "DumpExecuteService.dumpServerWorkerService" + System.currentTimeMillis()));
        dumpTableWorkerService = Executors.newFixedThreadPool(2, new NamedThreadFactory("DumpExecuteService.dumpTableWorkerService"
                + System.currentTimeMillis()));
        dumpWriterWorkerService = Executors.newFixedThreadPool(8, new NamedThreadFactory("DumpExecuteService.dumpWriterWorkerService"
                + System.currentTimeMillis()));
        dumpReaderWorkerService = Executors.newFixedThreadPool(2, new NamedThreadFactory("DumpExecuteService.dumpReaderWorkerService"
                + System.currentTimeMillis()));
    }

    public static Future<Integer> submitDumpServerWorker(DumpServerWorkerLogic worker) {
        return dumpServerWorkerService.submit(worker);
    }

    public static Future<Integer> submitDumpWriteWorker(DumpWriteWorkerLogic worker) {
        return dumpWriterWorkerService.submit(worker);
    }

    public static Future<Integer> submitDumpReadWorker(DumpReadWorkerLogic worker) {
        return dumpReaderWorkerService.submit(worker);
    }

    public static Future<Integer> submitDumpTableWorker(DumpTableWorkerLogic worker) {
        return dumpTableWorkerService.submit(worker);
    }

    /**
     * Shuts down the thread pools.
     */
    public synchronized static void shutdown() {

        // perform the thread pool shutdown...
    }
}
Note: this class defines four thread pools, each used to run a different kind of task.
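The body of `shutdown()` is elided in the listing above. As a rough sketch of what an orderly shutdown of one such pool could look like, the following uses only standard `java.util.concurrent` calls. The class `PoolLifecycleSketch`, the helper `named` (a hypothetical stand-in for the article's `NamedThreadFactory`, whose source is not shown), and the timeout parameters are assumptions for illustration, not the project's implementation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PoolLifecycleSketch {

    /** Hypothetical stand-in for NamedThreadFactory: names threads prefix-1, prefix-2, ... */
    static ThreadFactory named(String prefix) {
        AtomicInteger seq = new AtomicInteger();
        return runnable -> new Thread(runnable, prefix + "-" + seq.incrementAndGet());
    }

    /**
     * Stops accepting new tasks, waits for submitted ones to finish, and
     * interrupts whatever is still running after the timeout.
     * Returns true if the pool terminated.
     */
    static boolean shutdownGracefully(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown();                              // no new tasks; queued tasks still run
        try {
            if (!pool.awaitTermination(timeout, unit)) {
                pool.shutdownNow();                   // interrupt tasks that are still running
                return pool.awaitTermination(timeout, unit);
            }
            return true;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();       // preserve the interrupt for the caller
            return false;
        }
    }
}
```

With four pools that feed each other, one plausible order is to shut down the server pool first and the reader and writer pools last, so that tasks already in flight can still hand their work downstream.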
2) Game-server task
import java.sql.Connection;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 1) Obtains a connection to the game server's log database.
 * 2) Processes the tables one by one.
 */
public class DumpServerWorkerLogic extends AbstractLogic implements Callable<Integer> {
    private static Logger logger = LoggerFactory.getLogger(DumpServerWorkerLogic.class);

    private final ServerPO server; // database

    private final String startDate; // start date

    private SourceType sourceType; // data source type

    private Map<String, Integer> resultDBMap; // per-table record counts

    private GameType gameType;

    public DumpServerWorkerLogic(ServerPO server, String startDate, SourceType sourceType, Map<String, Integer> resultDBMap,
            GameType gameType) {
        CheckUtil.checkNotNull("DumpServerWorkerLogic.server", server);
        CheckUtil.checkNotNull("DumpServerWorkerLogic.startDate", startDate);
        CheckUtil.checkNotNull("DumpServerWorkerLogic.sourceType", sourceType);
        CheckUtil.checkNotNull("DumpServerWorkerLogic.resultDBMap", resultDBMap);
        CheckUtil.checkNotNull("DumpServerWorkerLogic.gameType", gameType);

        this.server = server;
        this.startDate = startDate;
        this.sourceType = sourceType;
        this.resultDBMap = resultDBMap;
        this.gameType = gameType;
    }

    @Override
    public Integer call() {

        // Get a connection, then determine the tables to export from this database.
        Connection conn = null;
        try {
            conn = JdbcUtils.getDbConnection(server);
        }
        catch (Exception e) {
            throw new GameRuntimeException(e.getMessage(), e);
        }
        List<String> tableNames = null;
        DumpDbInfoBO dumpDbInfoBO = DumpConfig.getDumpDbInfoBO();

        int totalRecordCount = 0;
        try {
            switch (this.sourceType) {
            case GAME_LOG:
                tableNames = JdbcUtils.getAllTableNames(conn);
                break;
            case INFOCENTER:
                // Copy the configured list so that adding a table does not mutate the shared config.
                tableNames = new ArrayList<String>(dumpDbInfoBO.getIncludeInfoTables());
                tableNames.add("pay_action");
                break;
            case EVENT_LOG:
                // Event-log tables carry a month or day suffix derived from the start date.
                tableNames = new ArrayList<String>();
                Date date = DateTimeUtil.string2Date(startDate, "yyyy-MM-dd");
                String sdate = DateTimeUtil.date2String(date, "yyyyMMdd");
                String smonth = DateTimeUtil.date2String(date, "yyyyMM");
                tableNames.add("log_device_startup" + "_" + smonth);
                tableNames.add("log_device" + "_" + sdate);
                break;
            }

            // Iterate over the tables.
            for (String tableName : tableNames) {
                // Skip excluded tables.
                if (dumpDbInfoBO.getExcludeTables().contains(tableName))
                    continue;
                DumpTableWorkerLogic tableTask = new DumpTableWorkerLogic(conn, server, tableName, startDate, resultDBMap,
                        gameType, sourceType);
                Future<Integer> tableFuture = DumpExecuteService.submitDumpTableWorker(tableTask);
                // Wait for this table to finish before submitting the next one.
                int count = tableFuture.get();
                totalRecordCount += count;
                logger.info(String.format("DumpServerWorkerLogic %s-%s.%s be done", startDate, server.getLogDbName(), tableName));
            }
            return totalRecordCount;
        } catch (Exception e) {
            throw new GameRuntimeException(e, "DumpServerWorkerLogic fail. server={%s}, errorMsg={%s} ",server.getId(), e.getMessage());
        } finally {
            JdbcUtils.closeConnection(conn);
        }

    }

}
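The `EVENT_LOG` branch above derives date-suffixed table names through the project's own `DateTimeUtil`, whose source is not shown. A minimal equivalent using the standard `java.time` API might look like this; the class name `EventLogTableNames` and the method `forDate` are hypothetical names chosen for the sketch, and the input is assumed to be in `yyyy-MM-dd` form as in the listing.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

final class EventLogTableNames {
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyyMMdd");
    private static final DateTimeFormatter MONTH = DateTimeFormatter.ofPattern("yyyyMM");

    /** startDate is expected in ISO form (yyyy-MM-dd), for example "2017-09-29". */
    static List<String> forDate(String startDate) {
        LocalDate date = LocalDate.parse(startDate);            // ISO-8601 is the default format
        List<String> names = new ArrayList<>();
        names.add("log_device_startup_" + date.format(MONTH));  // month suffix
        names.add("log_device_" + date.format(DAY));            // day suffix
        return names;
    }
}
```

For the start date `2017-09-29` this yields `log_device_startup_201709` and `log_device_20170929`.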
3) Table task, one per table
import java.sql.Connection;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Creates the read and write tasks for one table's query results (one instance per table).
 */
public class DumpTableWorkerLogic implements Callable<Integer> {
    private static Logger logger = LoggerFactory.getLogger(DumpTableWorkerLogic.class);

    private final String tableName;
    private final Connection conn;

    private ServerPO server;

    private String startDate;

    private Map<String, Integer> resultDBMap; // per-table record counts

    private GameType gameType;

    private SourceType sourceType; // data source type

    public DumpTableWorkerLogic(Connection conn, ServerPO server, String tableName, String startDate, Map<String, Integer> resultDBMap,
            GameType gameType, SourceType sourceType) {
        CheckUtil.checkNotNull("DumpTableWorkerLogic.conn", conn);
        CheckUtil.checkNotNull("DumpTableWorkerLogic.tableName", tableName);
        CheckUtil.checkNotNull("DumpTableWorkerLogic.server", server);
        CheckUtil.checkNotNull("DumpTableWorkerLogic.startDate", startDate);
        CheckUtil.checkNotNull("DumpTableWorkerLogic.resultDBMap", resultDBMap);
        CheckUtil.checkNotNull("DumpTableWorkerLogic.gameType", gameType);
        CheckUtil.checkNotNull("DumpTableWorkerLogic.sourceType", sourceType);

        this.conn = conn;
        this.tableName = tableName;
        this.server = server;
        this.startDate = startDate;
        this.resultDBMap = resultDBMap;
        this.gameType = gameType;
        this.sourceType = sourceType;

        logger.info("DumpTableWorkerLogic[{}] Reg", tableName);
    }

    @Override
    public Integer call() {
        logger.info("DumpTableWorkerLogic[{}] Start", tableName);

        // Result-writing task (the consumer). It is submitted first so it is ready to drain the queue.
        DumpWriteWorkerLogic writerWorker = new DumpWriteWorkerLogic(server, tableName, startDate, resultDBMap, gameType,
                sourceType);
        Future<Integer> writeFuture = DumpExecuteService.submitDumpWriteWorker(writerWorker);
        logger.info("DumpTableWorkerLogic[{}] writer submitted", tableName);

        // Data-query task (the producer). It pushes rows into the writer's queue.
        DumpReadWorkerLogic readerWorker = new DumpReadWorkerLogic(conn, tableName, writerWorker, startDate);
        DumpExecuteService.submitDumpReadWorker(readerWorker);
        logger.info("DumpTableWorkerLogic[{}] reader submitted", tableName);

        try {
            // The writer finishes only after the reader has signalled the end and the queue is empty.
            int writeCount = writeFuture.get();
            logger.info("DumpTableWorkerLogic[{}] ---{}---{}---exported rows---{}", tableName, startDate, server.getId(), writeCount);
            return writeCount;
        } catch (Exception e) {
            throw new GameRuntimeException(e, "DumpTableWorkerLogic fail. tableName={%s}, errorMsg={%s} ",tableName, e.getMessage());
        }
    }

}
4) Single-table read task
import java.sql.Connection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Task that reads data from MySQL.
 */
public class DumpReadWorkerLogic implements Callable<Integer> {

    private static Logger logger = LoggerFactory.getLogger(DumpReadWorkerLogic.class);

    private String tableName;

    private final Connection conn;

    private DumpWriteWorkerLogic writerWorker; // task that writes the result data

    private String startDate; // export start date

    private static final int LIMIT = 50000; // maximum number of rows read per SQL query

    public DumpReadWorkerLogic(Connection conn, String tableName, DumpWriteWorkerLogic writerWorker, String startDate) {
        CheckUtil.checkNotNull("MysqlDataReadWorker.conn", conn);
        CheckUtil.checkNotNull("MysqlDataReadWorker.tableName", tableName);
        CheckUtil.checkNotNull("MysqlDataReadWorker.startDate", startDate);

        this.conn = conn;
        this.tableName = tableName;
        this.writerWorker = writerWorker;
        this.startDate = startDate;

        logger.info("DumpReadWorkerLogic Reg. tableName={}", this.tableName);
    }

    @Override
    public Integer call() {
        try {
            // Column metadata, used to build the SELECT statement.
            List<Map<String, Object>> result = JdbcUtils.queryForList(conn, "show full fields from " + tableName);

            int index = 0;
            String querySql = "";

            int totalCount = 0;
            while (true) {
                // Read one page of at most LIMIT rows, starting at the current offset.
                int offset = index * LIMIT;
                querySql = DumpLogic.getTableQuerySql(result, tableName, true, startDate) + " limit " + offset + "," + LIMIT;
                int row = DumpLogic.query(conn, querySql, writerWorker);
                totalCount += row;
                logger.info("tableName=" + tableName + ", offset=" + offset + ", index=" + index + ", row=" + row + ", limit=" + LIMIT);
                // A short page means the table has been read to the end.
                if (row < LIMIT)
                    break;
                index++;
            }
            logger.info(startDate + "---" + tableName + "---Read.End");
            return totalCount;
        }
        catch (Exception e) {
            throw new GameRuntimeException(e, "MysqlDataReadWorker fail. tableName={%s}, errorMsg={%s} ",tableName, e.getMessage());
        }
        finally {
            // Always signal the writer, even on failure, so that it does not wait forever.
            writerWorker.prepareClose();
        }
    }

}
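The paging loop in the read task can be isolated from JDBC to make its termination rule easier to see. The sketch below is illustrative only: `PagedReadSketch`, the `PageFetcher` interface, and the in-memory `overList` fetcher are hypothetical names introduced here, and a real fetcher would run the `limit offset,count` query as in the listing above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class PagedReadSketch {

    /** Hypothetical page source; a real one would run "... limit offset,limit" over JDBC. */
    interface PageFetcher {
        List<String> fetch(int offset, int limit);
    }

    /** Reads page after page until a short page signals the end; returns the rows read. */
    static int readAll(PageFetcher fetcher, int limit, Consumer<String> sink) {
        int total = 0;
        for (int index = 0; ; index++) {
            List<String> page = fetcher.fetch(index * limit, limit);
            page.forEach(sink);                 // hand every row to the writer side
            total += page.size();
            if (page.size() < limit) {
                return total;                   // fewer rows than requested: nothing left
            }
        }
    }

    /** In-memory fetcher used only to exercise the loop. */
    static PageFetcher overList(List<String> rows) {
        return (offset, limit) -> {
            int from = Math.min(offset, rows.size());
            int to = Math.min(offset + limit, rows.size());
            return new ArrayList<>(rows.subList(from, to));
        };
    }
}
```

Two properties of this loop are worth keeping in mind. When the row count is an exact multiple of the page size, one extra query returning zero rows is needed to detect the end. And since `limit offset,count` makes MySQL scan and discard the skipped rows, later pages of a very large table tend to cost more than earlier ones; paging also relies on a stable row order between queries, which an `ORDER BY` on an indexed column would guarantee.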
5) Single-table write task
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * MySQL data export task: drains the table's queue and writes the rows to a file.
 */
public class DumpWriteWorkerLogic implements Callable<Integer> {

    private static final Logger logger = LoggerFactory.getLogger(DumpWriteWorkerLogic.class);
    private String tableName; // table name

    private AtomicBoolean alive; // true while the reader may still produce rows

    private BufferedWriter writer;

    private ArrayBlockingQueue<String> queue; // message queue

    private ServerPO server; // server

    private String startDate; // start date

    private Map<String, Integer> resultDBMap; // row count per server and table for the day

    private GameType gameType;

    private SourceType sourceType; // data source type

    public DumpWriteWorkerLogic(ServerPO server, String tableName, String startDate, Map<String, Integer> resultDBMap, GameType gameType,
            SourceType sourceType) {
        CheckUtil.checkNotNull("DumpWriteWorkerLogic.tableName", tableName);
        CheckUtil.checkNotNull("DumpWriteWorkerLogic.server", server);
        CheckUtil.checkNotNull("DumpWriteWorkerLogic.startDate", startDate);
        CheckUtil.checkNotNull("DumpWriteWorkerLogic.resultDBMap", resultDBMap);
        CheckUtil.checkNotNull("DumpWriteWorkerLogic.gameType", gameType);
        CheckUtil.checkNotNull("DumpWriteWorkerLogic.sourceType", sourceType);

        this.tableName = tableName;
        this.server = server;
        this.startDate = startDate;
        this.queue = new ArrayBlockingQueue<>(65536);
        this.alive = new AtomicBoolean(true);
        this.gameType = gameType;
        this.sourceType = sourceType;
        this.writer = createWriter();
        this.resultDBMap = resultDBMap;

        logger.info("DumpWriteWorkerLogic Reg. tableName={}", this.tableName);
    }

    /**
     * Creates the writer; the target file is created first if it does not exist.
     *
     * @return a buffered UTF-8 writer that appends to the table's txt file
     */
    private BufferedWriter createWriter() {
        try {
            File toFile = FileUtils.getFilenameOfDumpTable(sourceType, tableName, startDate, gameType, ".txt");
            if (!toFile.exists()) {
                FileUtils.createFile(sourceType, tableName, startDate, gameType);
            }
            // Append mode with a 5 MB buffer.
            return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(toFile, true), Charsets.UTF_8), 5 * 1024 * 1024);
        } catch (Exception e) {
            throw new GameRuntimeException(e, "DumpWriteWorkerLogic createWriter fail. server={%s}, errorMsg={%s} ",server.getId(), e.getMessage());
        }
    }

    /**
     * Writes one record to the file.
     *
     * @param line
     *            a single record
     */
    private void writeToFile(String line) {
        try {
            this.writer.write(line + "\n");
        } catch (Exception e) {
            throw new GameRuntimeException(e, "DumpWriteWorkerLogic writeToFile fail. errorMsg={%s} ", e.getMessage());
        }
    }

    /**
     * Adds a record to the message queue; if the queue is full, blocks until there is room.
     *
     * @param line
     *            a single record
     */
    public void putToWriterQueue(String line) {

        CheckUtil.checkNotNull("DumpWriteWorkerLogic putToWriterQueue", line);

        try {
            queue.put(line);
        } catch (InterruptedException e) {
            throw new GameRuntimeException(e, "DumpWriteWorkerLogic putToWriterQueue fail. errorMsg={%s} ", e.getMessage());
        }
    }

    /**
     * Prepares to close: signals that all data to be processed has been queued,
     * so the task can finish once it has written what remains.
     */
    public void prepareClose() {
        alive.set(false);
    }

    @Override
    public Integer call() {
        logger.info("DumpWriteWorkerLogic Start. tableName={}", this.tableName);
        try {
            int totalCount = 0;
            // Keep draining until the reader has finished and the queue is empty.
            while (alive.get() || !queue.isEmpty()) {
                List<String> dataList = new ArrayList<String>();
                queue.drainTo(dataList);
                int count = processDataList(dataList);
                totalCount += count;
            }
            logger.info("DumpWriteWorkerLogic ---" + startDate + "---" + tableName + "---Writer.End");
            return totalCount;
        } catch (Exception exp) {
            throw new GameRuntimeException(exp, "DumpWriteWorkerLogic call() fail. errorMsg={%s} ", exp.getMessage());
        } finally {
            FileUtil.close(this.writer);
        }
    }

    /**
     * Processes a batch: writes the records to the local file and updates the count map.
     *
     * @param dataList
     *            the batch of records
     * @return the number of records written
     */
    private int processDataList(List<String> dataList) {
        int totalCount = 0;

        // Key under which all records of this server, table, and source type are counted.
        String key = server.getId() + "#" + tableName + "#" + sourceType.getIndex();
        if (dataList != null && dataList.size() > 0) {

            for (String line : dataList) {

                // Write the record to the file as one line.
                writeToFile(line);

                // Record the count in result_data_record_count.
                // merge() is atomic only if resultDBMap is a ConcurrentHashMap.
                resultDBMap.merge(key, 1, Integer::sum);

                totalCount++;
            }
        }

        return totalCount;
    }

}
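One detail of the write task's `call()` loop is that `drainTo` returns immediately when the queue is empty, so the loop keeps a CPU core busy while it waits for the reader. A possible variation, sketched below under stated assumptions, waits briefly with `poll` before draining. The class `BlockingWriterSketch`, its `export` helper, the 50 ms timeout, and the `StringWriter` sink (standing in for the `BufferedWriter` over a file) are all hypothetical and not part of the original project.

```java
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class BlockingWriterSketch {
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);
    private final AtomicBoolean alive = new AtomicBoolean(true);
    private final StringWriter out;   // stands in for the file writer

    BlockingWriterSketch(StringWriter out) { this.out = out; }

    /** Called by the reader once it has queued its last line. */
    void prepareClose() { alive.set(false); }

    /** Consumer loop: returns the number of lines written. */
    int drainLoop() {
        int total = 0;
        List<String> batch = new ArrayList<>();
        try {
            while (alive.get() || !queue.isEmpty()) {
                // Wait briefly for one line instead of spinning on an empty queue.
                String first = queue.poll(50, TimeUnit.MILLISECONDS);
                if (first == null) continue;
                batch.add(first);
                queue.drainTo(batch);             // take whatever else is already queued
                for (String line : batch) {
                    out.write(line + "\n");
                }
                total += batch.size();
                batch.clear();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // stop early, keep the interrupt flag
        }
        return total;
    }

    /** Feeds the lines from the calling thread while a second thread writes them. */
    static int export(List<String> lines, StringWriter out) {
        BlockingWriterSketch writer = new BlockingWriterSketch(out);
        int[] written = new int[1];
        Thread consumer = new Thread(() -> written[0] = writer.drainLoop(), "table-writer");
        consumer.start();
        try {
            for (String line : lines) {
                writer.queue.put(line);           // blocks while the queue is full
            }
            writer.prepareClose();
            consumer.join();
        } catch (InterruptedException e) {
            writer.prepareClose();                // let the consumer finish what is queued
            Thread.currentThread().interrupt();
        }
        return written[0];
    }
}
```

The exit condition is the same as in the listing: because the reader queues its last line before clearing the flag, the writer cannot observe `alive == false` with an empty queue while a line is still outstanding.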
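Memory optimization
1. When data is fetched via JDBC, reading a large table is very slow.
2. The process also uses a very large amount of memory that the GC cannot reclaim. The cause is that the MySQL JDBC driver, by default, loads the entire result set into memory at once. If the table being synchronized is very large, this can even exhaust the heap.
The fix is to stop JDBC from pulling all the data into memory in one go and to page through it instead, with at most 50,000 rows per query (the LIMIT value); see the read task above.
After this architectural optimization, exporting 50 million rows takes about 40 minutes, which works out to roughly 21,000 rows per second.
Note:
This article only records the design process of the project; the detailed code will be open-sourced later.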