We are all architects!
Follow 架构师 (JiaGouX) and add a "star"
to get daily technical content and become a great architect together.
To join the architects' technical group, add 若飞: 1321113940
For contributions, cooperation, and copyright matters: [email protected]
[root@hadoop ~]# cd /usr/local/src
[root@hadoop /usr/local/src]# ls
presto-server-0.243.2.tar.gz  presto-cli-0.243.2-executable.jar
[root@hadoop /usr/local/src]#
[root@hadoop /usr/local/src]# tar -zxvf presto-server-0.243.2.tar.gz
[root@hadoop /usr/local/src]# mv presto-server-0.243.2 /usr/local/presto-server
[root@hadoop /usr/local/src]# cd /usr/local/presto-server/
[root@hadoop /usr/local/presto-server]# ls
bin  lib  NOTICE  plugin  README.txt
[root@hadoop /usr/local/presto-server]#
[root@hadoop /usr/local/presto-server]# vim etc/config.properties   # coordinator configuration
coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=9090
discovery-server.enabled=true
discovery.uri=http://192.168.243.161:9090
[root@hadoop /usr/local/presto-server]# vim etc/node.properties   # node-specific configuration
# name of the Presto cluster
node.environment=presto_dev
# id of the current node
node.id=ffffffff-ffff-ffff-ffff-ffffffffff01
# data directory of the node
node.data-dir=/data/presto
[root@hadoop /usr/local/presto-server]# vim etc/jvm.config   # JVM options
-server
-Xmx8G
-XX:+UseG1GC
-XX:G1HeapRegionSize=32M
-XX:+UseGCOverheadLimit
-XX:+ExplicitGCInvokesConcurrent
-XX:+HeapDumpOnOutOfMemoryError
-XX:+ExitOnOutOfMemoryError
[root@hadoop /usr/local/presto-server]# vim etc/log.properties   # logging configuration
com.facebook.presto=INFO
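Log levels in this file are applied by logger-name prefix, so a more specific package can be tuned independently of the broad default. A sketch of what such a file might look like (the DEBUG entry is illustrative, not part of the original setup):

```properties
# default level for everything under com.facebook.presto
com.facebook.presto=INFO
# a more specific logger name overrides the broader prefix (illustrative)
com.facebook.presto.server=DEBUG
```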
[root@hadoop /usr/local/presto-server]# mkdir etc/catalog
[root@hadoop /usr/local/presto-server]# vim etc/catalog/jmx.properties
connector.name=jmx
[root@hadoop /usr/local/presto-server]# vim etc/catalog/hive.properties
connector.name=hive-hadoop2
hive.metastore.uri=thrift://192.168.243.161:9083
hive.config.resources=/usr/local/hadoop-2.8.5/etc/hadoop/hdfs-site.xml,/usr/local/hadoop-2.8.5/etc/hadoop/core-site.xml
hive.allow-drop-table=false
[root@hadoop /usr/local/presto-server]# bin/launcher run
...
2020-11-16T16:55:35.776+0800    INFO    main    com.facebook.presto.server.PrestoServer    ======== SERVER STARTED ========
[root@hadoop /usr/local/presto-server]# bin/launcher start
Started as 5908
[root@hadoop /usr/local/presto-server]#
[root@hadoop /usr/local/presto-server]# jps |grep -i presto
5908 PrestoServer
[root@hadoop /usr/local/presto-server]# netstat -lntp |grep 5908
tcp6       0      0 :::39225      :::*      LISTEN      5908/java
tcp6       0      0 :::42622      :::*      LISTEN      5908/java
tcp6       0      0 :::9090       :::*      LISTEN      5908/java
tcp6       0      0 :::36714      :::*      LISTEN      5908/java
tcp6       0      0 :::45066      :::*      LISTEN      5908/java
tcp6       0      0 :::32982      :::*      LISTEN      5908/java
[root@hadoop /usr/local/presto-server]#
Move the Presto CLI executable jar into the bin directory:
[root@hadoop /usr/local/presto-server]# mv /usr/local/src/presto-cli-0.243.2-executable.jar bin/presto-cli.jar
[root@hadoop /usr/local/presto-server]# chmod a+x bin/presto-cli.jar
[root@hadoop /usr/local/presto-server]# bin/presto-cli.jar --server localhost:9090 --catalog hive --user root
presto> show catalogs;
 Catalog
---------
 hive
 jmx
 system
(3 rows)

Query 20201116_091555_00001_cus94, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:00 [0 rows, 0B] [0 rows/s, 0B/s]

presto> show schemas;
       Schema
--------------------
 db01
 default
 information_schema
(3 rows)

Query 20201116_091557_00002_cus94, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:00 [3 rows, 44B] [16 rows/s, 243B/s]

presto> use db01;
USE
presto:db01> show tables;
  Table
----------
 log_dev
 log_dev2
(2 rows)

Query 20201116_091652_00004_cus94, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:00 [2 rows, 43B] [5 rows/s, 117B/s]

presto:db01> select * from log_dev;
 id |   name   | create_time | creator |      info
----+----------+-------------+---------+----------------
  4 | 更新用户 |  1554189515 | yarn    | 更新用户 test3
  6 | 创建用户 |  1554299345 | yarn    | 创建用户 test5
(2 rows)

Query 20201116_091705_00005_cus94, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:01 [2 rows, 84B] [2 rows/s, 84B/s]

presto:db01>
The contents of the pom file are as follows:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>presto-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>com.facebook.presto</groupId>
            <artifactId>presto-jdbc</artifactId>
            <version>0.243.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
package com.example.presto.demo;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

/**
 * Operating Presto via JDBC
 *
 * @author 01
 * @date 2020-11-16
 **/
public class JdbcTest {

    public static void main(String[] args) throws Exception {
        Class.forName("com.facebook.presto.jdbc.PrestoDriver");
        Connection connection = DriverManager.getConnection(
                "jdbc:presto://192.168.243.161:9090/hive/db01",
                "root", null);
        Statement statement = connection.createStatement();
        ResultSet resultSet = statement.executeQuery("select * from log_dev");
        while (resultSet.next()) {
            for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) {
                System.out.print(resultSet.getString(i) + "\t");
            }
            System.out.println();
        }
        resultSet.close();
        connection.close();
    }
}
@ScalarFunction marks the static method that implements the business logic
@Description describes what the function does; the text here is shown in SHOW FUNCTIONS
@SqlType marks the return type of the function
Add the following dependencies to the pom file:
<dependency>
    <groupId>com.facebook.presto</groupId>
    <artifactId>presto-spi</artifactId>
    <version>0.243</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>21.0</version>
</dependency>
package com.example.presto.demo.udf;

import com.facebook.presto.common.type.StandardTypes;
import com.facebook.presto.spi.function.Description;
import com.facebook.presto.spi.function.ScalarFunction;
import com.facebook.presto.spi.function.SqlType;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;

public class PrefixFunction {

    /**
     * Prepend a prefix to a string.
     * Presto has no String type; Slice is used instead.
     */
    @ScalarFunction("Prefix")
    @Description("Prepend a prefix to a string")
    @SqlType(StandardTypes.VARCHAR)
    public static Slice prefix(@SqlType(StandardTypes.VARCHAR) Slice value) {
        return Slices.utf8Slice("presto_udf_" + value.toStringUtf8());
    }
}
package com.example.presto.demo.udf;

import com.facebook.presto.common.type.StandardTypes;
import com.facebook.presto.spi.function.Description;
import com.facebook.presto.spi.function.ScalarFunction;
import com.facebook.presto.spi.function.SqlNullable;
import com.facebook.presto.spi.function.SqlType;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;

public class GenJson {

    /**
     * Generate a JSON string from the given data
     */
    @ScalarFunction("GenJson")
    @Description("Generate a JSON string from the given key and value")
    @SqlType(StandardTypes.VARCHAR)
    public static Slice genJson(@SqlType(StandardTypes.VARCHAR) Slice key,
                                @SqlNullable @SqlType(StandardTypes.VARCHAR) Slice value) {
        return Slices.utf8Slice(String.format(
                "{\"%s\":\"%s\"}", key.toStringUtf8(),
                value == null ? "" : value.toStringUtf8()));
    }
}
Next, create the implementation class of Plugin and register the UDFs we developed in its getFunctions method. The code is as follows:
package com.example.presto.demo.udf;

import com.facebook.presto.spi.Plugin;
import com.google.common.collect.ImmutableSet;

import java.util.Set;

public class ExampleFunctionsPlugin implements Plugin {

    @Override
    public Set<Class<?>> getFunctions() {
        return ImmutableSet.<Class<?>>builder()
                .add(PrefixFunction.class)
                .add(GenJson.class)
                .build();
    }
}
Create the META-INF/services/com.facebook.presto.spi.Plugin file under the resources directory, with the following content:
com.example.presto.demo.udf.ExampleFunctionsPlugin
Package the project; the build produces presto-test-1.0-SNAPSHOT.jar.
Then place the jar, together with its guava dependency, under the plugin directory:
[root@hadoop /usr/local/presto-server]# ls plugin/example-functions/
guava-26.0-jre.jar  presto-test-1.0-SNAPSHOT.jar
Restart presto-server so that the plugin is loaded, then test the UDFs in the CLI:
presto> use db01;
USE
presto:db01> select Prefix(name) from log_dev;
        _col0
---------------------
 presto_udf_更新用户
 presto_udf_创建用户
(2 rows)

Query 20201116_121815_00002_upy9p, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:01 [2 rows, 84B] [1 rows/s, 63B/s]

presto:db01> select GenJson(creator, name) from log_dev;
        _col0
---------------------
 {"yarn":"更新用户"}
 {"yarn":"创建用户"}
(2 rows)

Query 20201116_121905_00003_upy9p, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:00 [2 rows, 84B] [8 rows/s, 336B/s]

presto:db01>
input(state, data): the input function runs once for every row of data, on every node that holds data, eventually producing multiple accumulated state objects
combine(state1, state2): merges the state objects from all nodes, until they have been combined into a single final state, which is the output of the Aggregation function
output(final_state, out): writes the final result to a BlockBuilder
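The three-phase lifecycle above can be sketched in plain Java without any Presto dependency (all names here are illustrative): each node accumulates its rows into a partial state via input, partial states are merged via combine, and output renders the final state.

```java
public class AggregationLifecycleSketch {

    // partial state held on each node: a running concatenation
    static class State {
        String value = "";
    }

    // input(state, data): called once per row on the node that owns the row
    static void input(State state, String row) {
        state.value = state.value.isEmpty() ? row : state.value + "|" + row;
    }

    // combine(state1, state2): merges two partial states into one
    static void combine(State target, State other) {
        if (!other.value.isEmpty()) {
            target.value = target.value.isEmpty() ? other.value
                                                  : target.value + "|" + other.value;
        }
    }

    // output(final_state): renders the final state as the query result
    static String output(State state) {
        return state.value;
    }

    public static void main(String[] args) {
        // two "nodes", each feeding its own rows through input
        State node1 = new State();
        input(node1, "yarn");
        input(node1, "yarn");

        State node2 = new State();
        input(node2, "hdfs");

        // the partial states are merged, then the final state is emitted
        combine(node1, node2);
        System.out.println(output(node1)); // yarn|yarn|hdfs
    }
}
```

The real Presto implementation below follows the same shape, with the framework driving the three methods via annotations.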
@AggregationFunction marks the class as an Aggregation function
@InputFunction, @CombineFunction, and @OutputFunction mark the input function, the combine function, and the final output function respectively
First, define an interface that extends AccumulatorState, declaring the methods for setting and getting the value:
package com.example.presto.demo.udf;

import com.facebook.presto.spi.function.AccumulatorState;
import io.airlift.slice.Slice;

public interface StringValueState extends AccumulatorState {

    Slice getStringValue();

    void setStringValue(Slice value);
}
package com.example.presto.demo.udf;

import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.type.StandardTypes;
import com.facebook.presto.common.type.VarcharType;
import com.facebook.presto.spi.function.*;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;

/**
 * Aggregation function - implements string concatenation
 *
 * @author 01
 */
@AggregationFunction("ConcatStr")
public class ConCatFunction {

    @InputFunction
    public static void input(StringValueState state,
                             @SqlType(StandardTypes.VARCHAR) Slice value) {
        state.setStringValue(Slices.utf8Slice(
                checkNull(state.getStringValue()) + "|" + value.toStringUtf8()));
    }

    @CombineFunction
    public static void combine(StringValueState state,
                               StringValueState otherState) {
        state.setStringValue(Slices.utf8Slice(
                checkNull(state.getStringValue()) + "|" +
                checkNull(otherState.getStringValue())));
    }

    @OutputFunction(StandardTypes.VARCHAR)
    public static void output(StringValueState state,
                              BlockBuilder blockBuilder) {
        VarcharType.VARCHAR.writeSlice(blockBuilder, state.getStringValue());
    }

    private static String checkNull(Slice slice) {
        return slice == null ? "" : slice.toStringUtf8();
    }
}
Add this function in ExampleFunctionsPlugin:
public class ExampleFunctionsPlugin implements Plugin {

    @Override
    public Set<Class<?>> getFunctions() {
        return ImmutableSet.<Class<?>>builder()
                ...
                .add(ConCatFunction.class)
                .build();
    }
}
Repackage the project to produce a new presto-test-1.0-SNAPSHOT.jar.
Copy it over the old jar in the plugin directory:
cp: overwrite "/usr/local/presto-server/plugin/example-functions/presto-test-1.0-SNAPSHOT.jar"? yes
Restart presto-server and test the aggregation function:
[root@hadoop /usr/local/presto-server]# bin/presto-cli.jar --server localhost:9090 --catalog hive --user root
presto> use db01;
USE
presto:db01> select ConcatStr(creator) from log_dev2;
              _col0
---------------------------------
(1 row)

Query 20201116_124714_00001_inrgm, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
0:01 [6 rows, 825B] [4 rows/s, 571B/s]

presto:db01>
Presto provides the EventListener and EventListenerFactory interfaces for this. We implement an EventListener that listens for query events and writes the event information to log files. First, write the implementation class of EventListener, which holds the core logic. The code is as follows:
package com.example.presto.demo.eventlistener;

import com.facebook.presto.spi.eventlistener.EventListener;
import com.facebook.presto.spi.eventlistener.QueryCompletedEvent;
import com.facebook.presto.spi.eventlistener.QueryCreatedEvent;
import com.facebook.presto.spi.eventlistener.SplitCompletedEvent;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.time.Instant;
import java.util.Map;

public class QueryEventListener implements EventListener {

    private final String logPath;

    public QueryEventListener(Map<String, String> config) {
        logPath = config.get("log.path");
        System.out.println(logPath);
    }

    /**
     * Listens for the query-created event
     */
    @Override
    public void queryCreated(QueryCreatedEvent queryCreatedEvent) {
        String queryId = queryCreatedEvent.getMetadata().getQueryId();
        String query = queryCreatedEvent.getMetadata().getQuery();
        String user = queryCreatedEvent.getContext().getUser();

        String fileName = logPath + File.separator + queryId;
        File logFile = new File(fileName);
        if (!logFile.exists()) {
            try {
                boolean result = logFile.createNewFile();
                System.out.println(result);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        try (FileWriter fw = new FileWriter(fileName, true)) {
            fw.append(String.format("User:%s Id:%s Query:%s%n", user, queryId, query));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Listens for the query-completed event
     */
    @Override
    public void queryCompleted(QueryCompletedEvent queryCompletedEvent) {
        String queryId = queryCompletedEvent.getMetadata().getQueryId();
        long createTime = queryCompletedEvent.getCreateTime().toEpochMilli();
        long endTime = queryCompletedEvent.getEndTime().toEpochMilli();
        long totalBytes = queryCompletedEvent.getStatistics().getTotalBytes();
        String queryState = queryCompletedEvent.getMetadata().getQueryState();

        queryCompletedEvent.getFailureInfo().ifPresent(queryFailureInfo -> {
            int errCode = queryFailureInfo.getErrorCode().getCode();
            String failureType = queryFailureInfo.getFailureType().orElse("").toUpperCase();
            String failureHost = queryFailureInfo.getFailureHost().orElse("");
            String failureMessage = queryFailureInfo.getFailureMessage().orElse("");
        });

        String fileName = logPath + File.separator + queryId;
        try (FileWriter fw = new FileWriter(fileName, true)) {
            fw.append(String.format("Id:%s StartTime:%s EndTime:%s State:%s%n",
                    queryId, createTime, endTime, queryState));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Listens for the split-completed event
     */
    @Override
    public void splitCompleted(SplitCompletedEvent splitCompletedEvent) {
        long createTime = splitCompletedEvent.getCreateTime().toEpochMilli();
        long endTime = splitCompletedEvent.getEndTime().orElse(Instant.MAX).toEpochMilli();
        String queryId = splitCompletedEvent.getQueryId();
        String stageId = splitCompletedEvent.getStageId();
        String taskId = splitCompletedEvent.getTaskId();

        String fileName = logPath + File.separator + queryId;
        try (FileWriter fw = new FileWriter(fileName, true)) {
            fw.append(String.format("Id:%s StartTime:%s EndTime:%s StageId:%s TaskId:%s%n",
                    queryId, createTime, endTime, stageId, taskId));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Next, implement the EventListenerFactory interface, which is used to create our custom QueryEventListener:
package com.example.presto.demo.eventlistener;

import com.facebook.presto.spi.eventlistener.EventListener;
import com.facebook.presto.spi.eventlistener.EventListenerFactory;

import java.util.Map;

public class QueryEventListenerFactory implements EventListenerFactory {

    @Override
    public String getName() {
        // name of the EventListener
        return "query-event-listener";
    }

    @Override
    public EventListener create(Map<String, String> config) {
        if (!config.containsKey("log.path")) {
            throw new RuntimeException("missing log.path conf");
        }
        return new QueryEventListener(config);
    }
}
Then write the implementation class of Plugin and register our custom EventListener factory in the getEventListenerFactories method:
package com.example.presto.demo.eventlistener;

import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.eventlistener.EventListenerFactory;

import java.util.Collections;

public class QueryEventPlugin implements Plugin {

    @Override
    public Iterable<EventListenerFactory> getEventListenerFactories() {
        QueryEventListenerFactory queryEventListenerFactory = new QueryEventListenerFactory();
        return Collections.singletonList(queryEventListenerFactory);
    }
}
In the com.facebook.presto.spi.Plugin file, add the package path of the QueryEventPlugin class:
com.example.presto.demo.eventlistener.QueryEventPlugin
Package the project again to get an updated presto-test-1.0-SNAPSHOT.jar.
Place the jar, together with its guava dependency, under the plugin directory:
guava-26.0-jre.jar  presto-test-1.0-SNAPSHOT.jar
Since this jar also contains the UDF plugin, delete the example-functions directory; otherwise presto-server will fail at startup because the UDFs would be registered twice:
[root@hadoop /usr/local/presto-server]# vim etc/event-listener.properties
event-listener.name=query-event-listener
log.path=/data/presto/log
Restart presto-server and execute a few queries:
[root@hadoop /usr/local/presto-server]# bin/presto-cli.jar --server localhost:9090 --catalog hive --user root
presto> use db01;
USE
presto:db01> select * from log_dev;
 id |   name   | create_time | creator |      info
----+----------+-------------+---------+----------------
  4 | 更新用户 |  1554189515 | yarn    | 更新用户 test3
  6 | 创建用户 |  1554299345 | yarn    | 创建用户 test5
(2 rows)

Query 20201116_132643_00001_tvyva, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:01 [2 rows, 84B] [1 rows/s, 58B/s]

presto:db01> select * from log_dev2 limit 1;
 id |   name   | create_time | creator |      info
----+----------+-------------+---------+---------------
  1 | 创建用户 |  1554099545 | hdfs    | 创建用户 test
(1 row)

Query 20201116_132652_00002_tvyva, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
0:00 [1 rows, 825B] [3 rows/s, 2.48KB/s]

presto:db01>
[root@hadoop ~]# ls /data/presto/log/
20201116_132435_00000_tvyva  20201116_132643_00001_tvyva  20201116_132652_00002_tvyva
[root@hadoop ~]# cat /data/presto/log/20201116_132435_00000_tvyva
User:root Id:20201116_132435_00000_tvyva Query:use db01
Id:20201116_132435_00000_tvyva StartTime:1605533075986 EndTime:1605533076000 State:FINISHED
[root@hadoop ~]# cat /data/presto/log/20201116_132643_00001_tvyva
User:root Id:20201116_132643_00001_tvyva Query:select * from log_dev
Id:20201116_132643_00001_tvyva StartTime:1605533204999 EndTime:1605533205193 StageId:20201116_132643_00001_tvyva.1 TaskId:0
...
Id:20201116_132643_00001_tvyva StartTime:1605533203889 EndTime:1605533205297 State:FINISHED
[root@hadoop ~]# cat /data/presto/log/20201116_132652_00002_tvyva
User:root Id:20201116_132652_00002_tvyva Query:select * from log_dev2 limit 1
Id:20201116_132652_00002_tvyva StartTime:1605533212541 EndTime:1605533212644 StageId:20201116_132652_00002_tvyva.1 TaskId:0
...
Id:20201116_132652_00002_tvyva StartTime:1605533212413 EndTime:1605533212688 State:FINISHED
[root@hadoop ~]#
The query.low-memory-killer.policy parameter specifies the strategy for killing queries when the cluster runs low on memory. Its possible values are:
total-reservation-on-blocked-nodes (kill the query using the most memory on the blocked nodes), or
total-reservation (kill the query consuming the most memory overall)
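For example, the policy above would be set in etc/config.properties like this (the chosen value here is just for illustration):

```properties
query.low-memory-killer.policy=total-reservation
```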
query.max-memory: the maximum user memory a single query may use across the whole cluster
query.max-total-memory: the maximum (user + system) memory a single query may use across the whole cluster
query.max-memory-per-node: the maximum user memory a query may use on a single worker (the ReservedPool); defaults to 0.1 of the heap size
query.max-total-memory-per-node: the maximum (user + system) memory a query may use on a single worker
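As a quick illustration of the 0.1 × heap default mentioned above, a minimal sketch (the exact rounding inside Presto may differ; the figures are only illustrative):

```java
public class MemoryLimitSketch {

    // default query.max-memory-per-node: 10% of the JVM heap
    static long defaultMaxMemoryPerNode(long heapBytes) {
        return (long) (heapBytes * 0.1);
    }

    public static void main(String[] args) {
        long heapBytes = 8L * 1024 * 1024 * 1024; // -Xmx8G, as in jvm.config above
        System.out.println(defaultMaxMemoryPerNode(heapBytes) + " bytes"); // 858993459 bytes
    }
}
```

So with the 8G heap configured earlier, a single query gets roughly 800 MB of user memory per worker unless query.max-memory-per-node is raised explicitly.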
Source: https://www.jianshu.com/p/3c28e89c7813
Copyright notice: the content comes from the Internet and is shared for learning purposes only; the copyright belongs to the original author. Unless we cannot confirm it, we always credit the author and the source. If there is any infringement, please let us know and we will delete the content immediately with our apologies. Thank you!