vlambda博客
学习文章列表

一次Impala upsert kudu执行缓慢问题排查总结

问题背景

BI同学会用ImpalaKudu表上跑一些ETL任务,最近,BI同学反馈一个Kudu表的ETL任务突然变慢,执行时间从原来的不到1分钟到现在的7分钟。

解决过程

下文中提到的软件环境为:

  • Impala 3.2.0-cdh6.2.0 RELEASE

  • Kudu 1.9.0-cdh6.2.0

我们主要从SQL语句执行的操作了解该SQL的复杂度,并阅读该SQLprofile信息一步步进行排查,找出产生该问题的原因。以下是排查步骤:

1、该ETL任务的SQL语句执行的是一个UPSERT...SELECT操作,大体结构如下:

UPSERT INTO TABLE rtl_ods_test.aSELECT ...FROM rtl_ods_test.test1LEFT JOIN ...

2、接着我们从执行该SQLimpalad节点获取SQLprofile信息,profile信息的Summary部分如下:

Operator #Hosts Avg Time Max Time #Rows Est. #Rows Peak Mem Est. Peak Mem Detail --------------------------------------------------------------------------------------------------------------------------------------------F00:KUDU WRITER 1 6m57s 6m57s 20.06 MB 20.00 MB 47:HASH JOIN 1 55.813ms 55.813ms 1.02M -1 45.76 MB 2.00 GB LEFT OUTER JOIN, BROADCAST |--72:EXCHANGE 1 179.577us 179.577us 7.71K -1 1.00 MB 152.95 KB BROADCAST | F24:EXCHANGE SENDER 1 4.155ms 4.155ms 2.55 KB 0 | 24:SCAN KUDU 1 7.886ms 7.886ms 7.71K -1 948.00 KB 1.88 MB rtl_ods_test.test24 t24 46:HASH JOIN 1 67.005ms 67.005ms 1.02M -1 44.93 MB 2.00 GB LEFT OUTER JOIN, BROADCAST |--71:EXCHANGE 1 842.112us 842.112us 39.38K -1 10.38 MB 431.86 KB BROADCAST | F23:EXCHANGE SENDER 1 29.824ms 29.824ms 920.00 B 0 | 70:AGGREGATE 1 62.423ms 62.423ms 39.38K -1 45.96 MB 128.00 MB FINALIZE | 69:EXCHANGE 1 873.411us 873.411us 39.38K -1 1.02 MB 431.96 KB HASH(hi.order_id) | F22:EXCHANGE SENDER 1 30.023ms 30.023ms 920.00 B 0 | 23:AGGREGATE 1 632.151ms 632.151ms 39.38K -1 46.57 MB 128.00 MB STREAMING | 22:SCAN KUDU 1 6.023ms 6.023ms 319.27K -1 16.95 MB 1.50 MB rtl_ods_test.test23 hi 45:HASH JOIN 1 70.527ms 70.527ms 1.02M -1 44.93 MB 2.00 GB LEFT OUTER JOIN, BROADCAST |--68:EXCHANGE 1 77.392us 77.392us 3.51K -1 480.00 KB 101.97 KB BROADCAST | F21:EXCHANGE SENDER 1 2.941ms 2.941ms 3.88 KB 0 | 21:SCAN KUDU 1 4.198ms 4.198ms 3.51K -1 321.00 KB 768.00 KB rtl_ods_test.test22 t22 44:HASH JOIN 1 63.624ms 63.624ms 1.02M -1 46.91 MB 2.00 GB LEFT OUTER JOIN, BROADCAST |--67:EXCHANGE 1 783.586us 783.586us 44.05K -1 3.04 MB 86.98 KB BROADCAST | F20:EXCHANGE SENDER 1 11.177ms 11.177ms 4.73 KB 0 | 20:SCAN KUDU 1 9.691ms 9.691ms 44.05K -1 1.22 MB 1.12 MB rtl_ods_test.test21 t21 43:HASH JOIN 1 40.688ms 40.688ms 654.69K -1 44.90 MB 2.00 GB LEFT OUTER JOIN, BROADCAST |--66:EXCHANGE 1 886.154us 886.154us 37.54K -1 1.73 MB 80.98 KB BROADCAST | F19:EXCHANGE SENDER 1 5.768ms 5.768ms 5.12 KB 0 | 19:SCAN KUDU 1 6.936ms 6.936ms 37.54K -1 2.11 MB 768.00 KB rtl_ods_test.test20 t20 42:HASH JOIN 1 35.599ms 35.599ms 527.37K -1 44.89 MB 2.00 GB LEFT OUTER JOIN, BROADCAST |--65:EXCHANGE 1 648.607us 648.607us 37.54K -1 1.73 MB 80.98 KB BROADCAST | F18:EXCHANGE SENDER 1 6.722ms 6.722ms 5.12 KB 0 | 18:SCAN KUDU 1 6.763ms 6.763ms 37.54K -1 2.27 MB 768.00 KB rtl_ods_test.test19 t19 41:HASH JOIN 1 74.309ms 74.309ms 527.37K -1 44.91 MB 2.00 GB LEFT OUTER JOIN, BROADCAST |--64:EXCHANGE 1 16.129us 16.129us 5 -1 16.00 KB 173.95 KB BROADCAST | F17:EXCHANGE SENDER 1 125.453us 125.453us 2.24 KB 0 | 17:SCAN KUDU 1 4.313ms 4.313ms 5 -1 179.00 KB 2.25 MB rtl_ods_test.test18 t18 40:HASH JOIN 1 18.218ms 18.218ms 266.00K -1 44.05 MB 2.00 GB LEFT OUTER JOIN, BROADCAST |--63:EXCHANGE 1 12.178us 12.178us 51 -1 16.00 KB 92.97 KB BROADCAST | F16:EXCHANGE SENDER 1 85.665us 85.665us 4.41 KB 0 | 16:SCAN KUDU 1 7.677ms 7.677ms 51 -1 95.00 KB 1.12 MB rtl_ods_test.test17 t17 39:HASH JOIN 1 10.809ms 10.809ms 132.23K -1 46.05 MB 2.00 GB LEFT OUTER JOIN, BROADCAST |--62:EXCHANGE 1 939.796us 939.796us 44.05K -1 3.04 MB 89.97 KB BROADCAST | F15:EXCHANGE SENDER 1 10.317ms 10.317ms 4.57 KB 0 | 15:SCAN KUDU 1 7.449ms 7.449ms 44.05K -1 2.34 MB 1.50 MB rtl_ods_test.test16 t16 38:HASH JOIN 1 8.885ms 8.885ms 73.13K -1 44.04 MB 2.00 GB LEFT OUTER JOIN, BROADCAST |--61:EXCHANGE 1 73.579us 73.579us 2.07K -1 160.00 KB 83.98 KB BROADCAST | F14:EXCHANGE SENDER 1 783.596us 783.596us 4.92 KB 0 | 14:SCAN KUDU 1 2.621ms 2.621ms 2.07K -1 172.00 KB 1.12 MB rtl_ods_test.test15 t15 37:HASH JOIN 1 9.465ms 9.465ms 73.13K -1 43.20 MB 2.00 GB LEFT OUTER JOIN, BROADCAST |--60:EXCHANGE 1 65.940us 65.940us 2.07K -1 160.00 KB 83.98 KB BROADCAST | F13:EXCHANGE SENDER 1 851.380us 851.380us 4.92 KB 0 | 13:SCAN KUDU 1 4.485ms 4.485ms 2.07K -1 172.00 KB 1.12 MB rtl_ods_test.test14 t14 36:HASH JOIN 1 11.477ms 11.477ms 73.13K -1 42.37 MB 2.00 GB LEFT OUTER JOIN, BROADCAST |--59:EXCHANGE 1 999.398us 999.398us 39.38K -1 5.12 MB 170.95 KB BROADCAST | F12:EXCHANGE SENDER 1 12.604ms 12.604ms 2.24 KB 0 | 12:SCAN KUDU 1 13.496ms 13.496ms 39.38K -1 1.40 MB 1.50 MB rtl_ods_test.test13 t13 35:HASH JOIN 1 10.903ms 10.903ms 73.13K -1 41.54 MB 2.00 GB LEFT OUTER JOIN, BROADCAST |--58:EXCHANGE 1 974.475us 974.475us 39.38K -1 5.12 MB 140.96 KB BROADCAST | F11:EXCHANGE SENDER 1 11.882ms 11.882ms 2.78 KB 0 | 11:SCAN KUDU 1 12.115ms 12.115ms 39.38K -1 1.57 MB 3.00 MB rtl_ods_test.test12 t12 34:HASH JOIN 1 11.990ms 11.990ms 73.13K -1 41.53 MB 2.00 GB LEFT OUTER JOIN, BROADCAST |--57:EXCHANGE 1 634.765us 634.765us 31.51K -1 4.12 MB 176.95 KB BROADCAST | F10:EXCHANGE SENDER 1 6.817ms 6.817ms 2.24 KB 0 | 10:SCAN KUDU 1 18.167ms 18.167ms 31.51K -1 1.22 MB 2.25 MB rtl_ods_test.test11 t11 33:HASH JOIN 1 14.638ms 14.638ms 73.13K -1 326.70 MB 2.00 GB LEFT OUTER JOIN, BROADCAST |--56:EXCHANGE 1 85.997ms 85.997ms 4.28M -1 14.01 MB 83.98 KB BROADCAST | F09:EXCHANGE SENDER 1 909.434ms 909.434ms 4.92 KB 0 | 09:SCAN KUDU 1 26.361ms 26.361ms 4.28M -1 5.12 MB 1.12 MB rtl_ods_test.test10 t10 32:HASH JOIN 1 782.440ms 782.440ms 73.13K -1 428.69 MB 2.00 GB LEFT OUTER JOIN, BROADCAST |--55:EXCHANGE 1 163.681ms 163.681ms 8.23M -1 16.06 MB 59.98 KB BROADCAST | F08:EXCHANGE SENDER 1 723.689ms 723.689ms 7.52 KB 0 | 08:SCAN KUDU 1 23.223ms 23.223ms 8.23M -1 1.51 MB 768.00 KB rtl_ods_test.test9 t9 31:HASH JOIN 1 11.533ms 11.533ms 60.13K -1 39.86 MB 2.00 GB LEFT OUTER JOIN, BROADCAST |--54:EXCHANGE 1 663.209us 663.209us 37.54K -1 1.73 MB 80.98 KB BROADCAST | F07:EXCHANGE SENDER 1 7.003ms 7.003ms 5.12 KB 0 | 07:SCAN KUDU 1 6.692ms 6.692ms 37.54K -1 2.35 MB 768.00 KB rtl_ods_test.test8 t8 30:HASH JOIN 1 13.723ms 13.723ms 39.38K -1 39.02 MB 2.00 GB LEFT OUTER JOIN, BROADCAST |--53:EXCHANGE 1 109.517us 109.517us 3.51K -1 256.00 KB 89.97 KB BROADCAST | F06:EXCHANGE SENDER 1 994.329us 994.329us 4.57 KB 0 | 06:SCAN KUDU 1 3.084ms 3.084ms 3.51K -1 276.00 KB 1.50 MB rtl_ods_test.test7 t7 29:HASH JOIN 1 6.471ms 6.471ms 39.38K -1 38.19 MB 2.00 GB LEFT OUTER JOIN, BROADCAST |--52:EXCHANGE 1 112.333us 112.333us 3.51K -1 256.00 KB 89.97 KB BROADCAST | F05:EXCHANGE SENDER 1 1.071ms 1.071ms 4.57 KB 0 | 05:SCAN KUDU 1 3.184ms 3.184ms 3.51K -1 276.00 KB 1.50 MB rtl_ods_test.test6 t6 28:HASH JOIN 1 9.355ms 9.355ms 39.38K -1 37.36 MB 2.00 GB LEFT OUTER JOIN, BROADCAST |--51:EXCHANGE 1 812.178us 812.178us 31.34K -1 7.87 MB 329.90 KB BROADCAST | F04:EXCHANGE SENDER 1 9.485ms 9.485ms 1.15 KB 0 | 04:SCAN KUDU 1 26.494ms 26.494ms 31.34K -1 2.00 MB 7.12 MB rtl_ods_test.test5 t5 27:HASH JOIN 1 8.991ms 8.991ms 39.38K -1 36.52 MB 2.00 GB LEFT OUTER JOIN, BROADCAST |--50:EXCHANGE 1 941.754us 941.754us 39.38K -1 5.12 MB 266.92 KB BROADCAST | F03:EXCHANGE SENDER 1 6.270ms 6.270ms 1.47 KB 0 | 03:SCAN KUDU 1 16.042ms 16.042ms 39.38K -1 950.00 KB 5.62 MB rtl_ods_test.test4 t4 26:HASH JOIN 1 8.908ms 8.908ms 39.38K -1 35.69 MB 2.00 GB LEFT OUTER JOIN, BROADCAST |--49:EXCHANGE 1 783.992us 783.992us 39.38K -1 5.12 MB 269.92 KB BROADCAST | F02:EXCHANGE SENDER 1 14.110ms 14.110ms 1.39 KB 0 | 02:SCAN KUDU 1 7.860ms 7.860ms 39.38K -1 2.59 MB 3.75 MB rtl_ods_test.test3 t3 25:HASH JOIN 1 44.403ms 44.403ms 39.38K -1 34.86 MB 2.00 GB LEFT OUTER JOIN, BROADCAST |--48:EXCHANGE 1 980.101us 980.101us 39.38K -1 5.12 MB 155.95 KB BROADCAST | F01:EXCHANGE SENDER 1 7.181ms 7.181ms 2.55 KB 0 | 01:SCAN KUDU 1 9.338ms 9.338ms 39.38K -1 1.39 MB 2.25 MB rtl_ods_test.test2 t2 00:SCAN KUDU                  1    6.483ms    6.483ms   39.38K          -1   16.66 MB       14.62 MB  rtl_ods_test.test1 t1

Summary部分,我们看到该SQLSCAN KUDUHASH JOIN操作都非常快,时间主要花费在了往Kudu写数据的操作上,F00:KUDU WRITER的平均时间Avg Time和最大时间Max Time都为6m57s

3、F00Impala profile中一个Fragment的编号,接着我们通过搜索F00关键字找到Summary下面执行明细部分的Fragment

Fragment F00: Instance 424107027ccceb5c:1f03c57400000018 (host=kudu07:22000):(Total: 7m5s, non-child: 0.000ns, % non-child: 0.00%) Last report received time: 2020-02-10 15:07:54.309 Hdfs split stats (<volume id>:<# splits>/<split lengths>):  Fragment Instance Lifecycle Event Timeline: 7m5s - Prepare Finished: 6.511ms (6.511ms) - Open Finished: 7s632ms (7s626ms) - First Batch Produced: 7s632ms (206.874us) - First Batch Sent: 7s669ms (36.252ms) - ExecInternal Finished: 7m5s (6m57s) - MemoryUsage (8s000ms): 366.28 MB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.47 GB, 1.46 GB, 1.46 GB - ThreadUsage (8s000ms): 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 - AverageThreadTokens: 1.73  - BloomFilterBytes: 0 - ExchangeScanRatio: 0.00  - PeakMemoryUsage: 1.47 GB (1583111129) - PeakReservation: 1.43 GB (1530920960) - PeakUsedReservation: 0 - PerHostPeakMemUsage: 1.47 GB (1583111129) - RowsProduced: 1.02M (1024494) - TotalNetworkReceiveTime: 1s388ms - TotalNetworkSendTime: 0.000ns - TotalStorageWaitTime: 154.348ms - TotalThreadsInvoluntaryContextSwitches: 995 (995) - TotalThreadsTotalWallClockTime: 12m15s - TotalThreadsSysTime: 3s400ms - TotalThreadsUserTime: 47s997ms - TotalThreadsVoluntaryContextSwitches: 12.47K (12471) Buffer pool: - AllocTime: 0.000ns - CumulativeAllocationBytes: 0 - CumulativeAllocations: 0 (0) - PeakReservation: 0 - PeakUnpinnedBytes: 0 - PeakUsedReservation: 0 - ReadIoBytes: 0 - ReadIoOps: 0 (0) - ReadIoWaitTime: 0.000ns - ReservationLimit: 0 - SystemAllocTime: 0.000ns - WriteIoBytes: 0 - WriteIoOps: 0 (0) - WriteIoWaitTime: 0.000ns Fragment Instance Lifecycle Timings: - ExecTime: 6m57s - ExecTreeExecTime: 514.258ms - OpenTime: 7s626ms - ExecTreeOpenTime: 1s763ms - PrepareTime: 6.340ms - ExecTreePrepareTime: 3.879ms KuduTableSink:(Total: 6m57s, non-child: 6m57s, % non-child: 100.00%) - KuduApplyTimer: 6m27s - NumRowErrors: 0 (0) - PeakMemoryUsage: 20.06 MB (21037056) - RowsProcessedRate: 2.46 K/sec - TotalNumRows: 1.02M (1024494) HASH_JOIN_NODE (id=47):(Total: 2s282ms, non-child: 55.813ms, % non-child: 2.23%) ExecOption: Probe Side Codegen Enabled, Join Build-Side Prepared Asynchronously Node Lifecycle Event Timeline: 7m5s - Open Started: 5s867ms (5s867ms) - Open Finished: 7s631ms (1s763ms) - First Batch Requested: 7s632ms (1.524ms) - First Batch Returned: 7s632ms (202.665us) - Last Batch Returned: 7m5s (6m57s) - Closed: 7m5s (178.375ms) - BuildRows: 7.71K (7708) - BuildTime: 39.969ms - NumHashTableBuildsSkipped: 0 (0) - PeakMemoryUsage: 45.76 MB (47980800) - ProbeRows: 1.02M (1024494) - ProbeRowsPartitioned: 0 (0) - ProbeTime: 51.260ms - RowsReturned: 1.02M (1024494) - RowsReturnedRate: 448.83 K/sec

从这里我们可以看到,这个FragmentHASH_JOIN操作共返回结果1024494条(RowsReturned: 1.02M (1024494)),接着Impala开始向Kudu写入数据,向Kudu写数据的profile信息如下:

KuduTableSink:(Total: 6m57s, non-child: 6m57s, % non-child: 100.00%) - KuduApplyTimer: 6m27s - NumRowErrors: 0 (0) - PeakMemoryUsage: 20.06 MB (21037056) - RowsProcessedRate: 2.46 K/sec - TotalNumRows: 1.02M (1024494)

KuduTableSink中的信息表明,向Kudu写入数据总花费时间6m57s(Total: 6m57s),内存使用最大值20.06 MB(PeakMemoryUsage: 20.06 MB),每秒处理数据2.46 K/sec(RowsProcessedRate: 2.46 K/sec),总写入数据为1024494条(TotalNumRows: 1.02M (1024494))

Kudu是通过主键来判断记录是否重复的,它实现upsert操作的原理主要是通过判断插入的记录是否存在主键冲突来决定是插入还是更新,当出现主键冲突时进行更新操作,若无冲突则进行插入操作。我们将这些信息反馈给BI同学,他们查了SQL后发现这条SQL返回的结果大部分都是重复的,最终导致将结果集upsertKudu时,花费了很长时间。他们对同步任务的相关表调整后,该ETL任务的执行时间恢复正常。

KuduTableSink源码分析

上面提到的KuduTableSinkImpalaKudu写数据的一个类,该类的声明如下:

class KuduTableSink : public DataSink

KuduTableSink类实现了DataSink接口,DataSink接口是Impala为不同sink操作定义的一个抽象接口,比如向HDFS表写数据操作、 通过网络发送数据操作、为join操作构建hash tables操作等等。该接口的实现关系如下图:

KuduTableSink类的成员信息如下:


KuduTableSink定义了我们在上面的Fragment中看到的metric信息:

  • kudu_apply_timer_:花费在Kudu操作上的时间,在正常情况下,Apply()应该是可以忽略不计,因为它在启用AUTO_FLUSH_BACKGROUND情况下是异步的,在Apply()中花费的大量时间可能表明,Kudu不能像sink写行那样快地缓冲和发送行

  • total_rows_:处理的总行数,包括写入Kudu的行数和有错误的行数

  • num_row_errors_:有错误的行数

  • rows_processed_rate_sink消费和处理行的速率

除这些metric之外,KuduTableSink定义了操作Kudu的方法,这些方法主要遵循DataSink接口定义的方法实现标准,这些方法的作用如下:

  • virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker):该方法在Send()、Open()、 Close()方法执行之前调用,主要为KuduTableSink准备上下文运行环境

  • virtual Status Open(RuntimeState* state):该方法在Send()方法之前调用,主要完成连接Kudu操作并创建KuduSession对象

  • virtual Status Send(RuntimeState* state, RowBatch* batch):该方法会被调用多次,主要完成往Kudu表写数据操作

  • virtual Status FlushFinal(RuntimeState* state):该方法在close()方法之前调用,负责清空缓存中剩余的数据,将这些数据刷新到Kudu

  • virtual void Close(RuntimeState* state):关闭KuduSession,释放资源

Kudu提交数据有三种策略,Impala使用的是异步刷新模式向Kudu提交数据KuduTableSinkOpen()方法中设置KuduSessionFlushModeAUTO_FLUSH_BACKGROUND

KUDU_RETURN_IF_ERROR(session_->SetFlushMode(kudu::client::KuduSession::AUTO_FLUSH_BACKGROUND), "Unable to set flush mode");

在这里简要说下三种Kudu提交数据策略的含义:

  • AUTO_FLUSH_SYNC:同步刷新模式。调用KuduSession.apply()方法后,客户端会等数据刷新到服务器后再返回,这种情况就不能批量插入数据,调用KuduSession.flush()方法不会起任何作用,因为此时缓冲区数据已经被刷新到了服务器

  • AUTO_FLUSH_BACKGROUND:异步刷新模式。调用KuduSession.apply()方法后,客户端会立即返回,但是写入将在后台发送,可能与来自同一会话的其他写入一起进行批处理。如果没有足够的缓冲空间,KuduSession.apply()会阻塞直到缓冲空间可用。因为写入操作是在后台进行的,因此任何错误都将存储在一个会话本地(session-local)缓冲区中,调用countPendingErrors()或者getPendingErrors()可以获取错误相关的信息。注意:这种模式可能会导致数据写入的时候乱序,这是因为在这种模式下,多个写操作可以并行发送到服务器

  • MANUAL_FLUSH:手动刷新模式。调用KuduSession.apply()方法后,客户端会立即返回,但是写入请求不会被立即发送,需要我们手动调用KuduSession.flush()来发送写入请求。如果缓冲区超过了配置的大小,会返回错误

除刷新方式设置外,还有以下参数会影响客户端的写入行为:

  • kudu_mutation_buffer_sizeKudu客户端缓存操作数据的字节数,KuduTableSink中定义的默认值为10MB,该参数通过KuduSessionSetMutationBufferSpace()方法设置。可以在impalad的启动项中自定义kudu_mutation_buffer_size的大小

  • kudu_error_buffer_sizeKuduSession操作异常的buffer大小,KuduTableSink中定义的默认值为10MB,该参数通过KuduSessionSetErrorBufferSpace()方法设置。可以在impalad的启动项中自定义kudu_error_buffer_size的大小

  • 触发flush操作的缓存阈值:仅在AUTO_FLUSH_BACKGROUND刷新模式下生效。KuduTableSink中定义的默认值为70%。当缓存大小达到70%的时候,Kudu客户端开始将缓存的数据发送给相应的tablet服务。Kudu客户端定义的阈值为50%。该阈值通过KuduSessionSetMutationBufferFlushWatermark()方法设置

  • 每个KuduSession对象的最大缓存数:KuduTableSink将其设置为0,表示无限制。该参数通过KuduSessionSetMutationBufferMaxNum()方法设置

以上参数在KuduTableSinkOpen()方法中的代码为:

 session_ = client_->NewSession(); session_->SetTimeoutMillis(FLAGS_kudu_operation_timeout_ms);
// KuduSession Set* methods here and below return a status for API compatibility. // As long as the Kudu client is statically linked, these shouldn't fail and thus these // calls could also DCHECK status is OK for debug builds (while still returning errors // for release). KUDU_RETURN_IF_ERROR(session_->SetFlushMode( kudu::client::KuduSession::AUTO_FLUSH_BACKGROUND), "Unable to set flush mode");
const int32_t buf_size = FLAGS_kudu_mutation_buffer_size; if (buf_size < 1024 * 1024) { return Status(strings::Substitute( "Invalid kudu_mutation_buffer_size: '$0'. Must be greater than 1MB.", buf_size)); } KUDU_RETURN_IF_ERROR(session_->SetMutationBufferSpace(buf_size), "Couldn't set mutation buffer size");
// Configure client memory used for buffering. // Internally, the Kudu client keeps one or more buffers for writing operations. When a // single buffer is flushed, it is locked (that space cannot be reused) until all // operations within it complete, so it is important to have a number of buffers. In // our testing, we found that allowing a total of 10MB of buffer space to provide good // results; this is the default. Then, because of some existing 8MB limits in Kudu, we // want to have that total space broken up into 7MB buffers (INDIVIDUAL_BUFFER_SIZE). // The mutation flush watermark is set to flush every INDIVIDUAL_BUFFER_SIZE. // TODO: simplify/remove this logic when Kudu simplifies the API (KUDU-1808). int num_buffers = FLAGS_kudu_mutation_buffer_size / INDIVIDUAL_BUFFER_SIZE; if (num_buffers == 0) num_buffers = 1; KUDU_RETURN_IF_ERROR(session_->SetMutationBufferFlushWatermark(1.0 / num_buffers), "Couldn't set mutation buffer watermark");
// No limit on the buffer count since the settings above imply a max number of buffers. // Note that the Kudu client API has a few too many knobs for configuring the size and // number of these buffers; there are a few ways to accomplish similar behaviors. KUDU_RETURN_IF_ERROR(session_->SetMutationBufferMaxNum(0), "Couldn't set mutation buffer count");
KUDU_RETURN_IF_ERROR(session_->SetErrorBufferSpace(error_buffer_size), "Failed to set error buffer space");

最后,在Send()FlushFinal()方法执行过程中,会调用CheckForErrors()方法检查写入是否有错误发生,并记录错误信息和错误的行数(num_row_errors_)。

参考资料

  • 源码链接:https://github.com/apache/impala/blob/3.2.0/be/src/exec/kudu-table-sink.cc

  • SessionConfiguration.FlushMode官方文档

  • Kudu C++ client API