We are all architects!
Follow 架构师 (JiaGouX) and add a "star"
to get daily technical content and become a great architect together.
To join the tech group, add Ruofei (若飞): 1321113940 for the architects group.
Submissions, cooperation, copyright, etc.: [email protected]
[root@hadoop ~]# cd /usr/local/src
[root@hadoop /usr/local/src]# ls
presto-server-0.243.2.tar.gz presto-cli-0.243.2-executable.jar
[root@hadoop /usr/local/src]#
[root@hadoop /usr/local/src]# tar -zxvf presto-server-0.243.2.tar.gz
[root@hadoop /usr/local/src]# mv presto-server-0.243.2 /usr/local/presto-server
[root@hadoop /usr/local/src]# cd /usr/local/presto-server/
[root@hadoop /usr/local/presto-server]# ls
bin lib NOTICE plugin README.txt
[root@hadoop /usr/local/presto-server]#
[root@hadoop /usr/local/presto-server]# vim etc/config.properties  # Presto server configuration
coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=9090
discovery-server.enabled=true
discovery.uri=http://192.168.243.161:9090
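The config above makes this single node act as both coordinator and worker. In a multi-node cluster, a dedicated worker's etc/config.properties would differ roughly as follows (a sketch; the discovery URI must point at the coordinator configured above):

```properties
coordinator=false
http-server.http.port=9090
discovery.uri=http://192.168.243.161:9090
```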
[root@hadoop /usr/local/presto-server]# vim etc/node.properties # per-node configuration
# name of the Presto cluster
node.environment=presto_dev
# id of the current node
node.id=ffffffff-ffff-ffff-ffff-ffffffffff01
# data directory of the node
node.data-dir=/data/presto
[root@hadoop /usr/local/presto-server]# vim etc/jvm.config # JVM options
-server
-Xmx8G
-XX:+UseG1GC
-XX:G1HeapRegionSize=32M
-XX:+UseGCOverheadLimit
-XX:+ExplicitGCInvokesConcurrent
-XX:+HeapDumpOnOutOfMemoryError
-XX:+ExitOnOutOfMemoryError
[root@hadoop /usr/local/presto-server]# vim etc/log.properties # logging configuration
com.facebook.presto=INFO
[root@hadoop /usr/local/presto-server]# mkdir etc/catalog
[root@hadoop /usr/local/presto-server]# vim etc/catalog/jmx.properties
connector.name=jmx
[root@hadoop /usr/local/presto-server]# vim etc/catalog/hive.properties
connector.name=hive-hadoop2
hive.metastore.uri=thrift://192.168.243.161:9083
hive.config.resources=/usr/local/hadoop-2.8.5/etc/hadoop/hdfs-site.xml,/usr/local/hadoop-2.8.5/etc/hadoop/core-site.xml
hive.allow-drop-table=false
[root@hadoop /usr/local/presto-server]# bin/launcher run
...
2020-11-16T16:55:35.776+0800 INFO main com.facebook.presto.server.PrestoServer ======== SERVER STARTED ========
[root@hadoop /usr/local/presto-server]# bin/launcher start
Started as 5908
[root@hadoop /usr/local/presto-server]#
[root@hadoop /usr/local/presto-server]# jps |grep -i presto
5908 PrestoServer
[root@hadoop /usr/local/presto-server]# netstat -lntp |grep 5908
tcp6 0 0 :::39225 :::* LISTEN 5908/java
tcp6 0 0 :::42622 :::* LISTEN 5908/java
tcp6 0 0 :::9090 :::* LISTEN 5908/java
tcp6 0 0 :::36714 :::* LISTEN 5908/java
tcp6 0 0 :::45066 :::* LISTEN 5908/java
tcp6 0 0 :::32982 :::* LISTEN 5908/java
[root@hadoop /usr/local/presto-server]#
With the server running, set up the client. Move the presto-cli jar into the bin directory:
[root@hadoop /usr/local/presto-server]# mv /usr/local/src/presto-cli-0.243.2-executable.jar bin/presto-cli.jar
[root@hadoop /usr/local/presto-server]# chmod a+x bin/presto-cli.jar
[root@hadoop /usr/local/presto-server]# bin/presto-cli.jar --server localhost:9090 --catalog hive --user root
presto> show catalogs;
Catalog
---------
hive
jmx
system
(3 rows)
Query 20201116_091555_00001_cus94, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:00 [0 rows, 0B] [0 rows/s, 0B/s]
presto> show schemas;
Schema
--------------------
db01
default
information_schema
(3 rows)
Query 20201116_091557_00002_cus94, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:00 [3 rows, 44B] [16 rows/s, 243B/s]
presto> use db01;
USE
presto:db01> show tables;
Table
----------
log_dev
log_dev2
(2 rows)
Query 20201116_091652_00004_cus94, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:00 [2 rows, 43B] [5 rows/s, 117B/s]
presto:db01> select * from log_dev;
id | name | create_time | creator | info
----+----------+-------------+---------+----------------
4 | 更新用户 | 1554189515 | yarn | 更新用户 test3
6 | 创建用户 | 1554299345 | yarn | 创建用户 test5
(2 rows)
Query 20201116_091705_00005_cus94, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:01 [2 rows, 84B] [2 rows/s, 84B/s]
presto:db01>
Next, operate Presto through JDBC from a Maven project. The pom file content is as follows:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>presto-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>com.facebook.presto</groupId>
            <artifactId>presto-jdbc</artifactId>
            <version>0.243.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
package com.example.presto.demo;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
/**
 * Operating Presto via JDBC
 *
 * @author 01
 * @date 2020-11-16
 **/
public class JdbcTest {

    public static void main(String[] args) throws Exception {
        Class.forName("com.facebook.presto.jdbc.PrestoDriver");
        Connection connection = DriverManager.getConnection(
                "jdbc:presto://192.168.243.161:9090/hive/db01",
                "root", null
        );
        Statement statement = connection.createStatement();
        ResultSet resultSet = statement.executeQuery("select * from log_dev");
        while (resultSet.next()) {
            for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) {
                System.out.print(resultSet.getString(i) + "\t");
            }
            System.out.println();
        }
        resultSet.close();
        connection.close();
    }
}
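The JDBC URL used above follows the pattern jdbc:presto://host:port/catalog/schema. A tiny helper (a hypothetical class for illustration, not part of the presto-jdbc API) makes the structure explicit:

```java
public class PrestoUrl {

    // build a Presto JDBC URL from its four components:
    // host, port, catalog (e.g. hive), and schema (e.g. db01)
    public static String url(String host, int port, String catalog, String schema) {
        return String.format("jdbc:presto://%s:%d/%s/%s", host, port, catalog, schema);
    }

    public static void main(String[] args) {
        // prints jdbc:presto://192.168.243.161:9090/hive/db01
        System.out.println(url("192.168.243.161", 9090, "hive", "db01"));
    }
}
```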
Developing a scalar UDF involves three annotations:
- @ScalarFunction marks the static method that implements the business logic
- @Description describes what the function does; this text is shown in SHOW FUNCTIONS
- @SqlType declares the function's return type
Add the following dependencies to the pom file:
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-spi</artifactId>
<version>0.243</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>21.0</version>
</dependency>
package com.example.presto.demo.udf;
import com.facebook.presto.common.type.StandardTypes;
import com.facebook.presto.spi.function.Description;
import com.facebook.presto.spi.function.ScalarFunction;
import com.facebook.presto.spi.function.SqlType;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
public class PrefixFunction {

    /**
     * Add a prefix to a string.
     * Presto has no String type; Slice is used instead.
     */
    @ScalarFunction("Prefix")
    @Description("adds a prefix to the given string")
    @SqlType(StandardTypes.VARCHAR)
    public static Slice prefix(@SqlType(StandardTypes.VARCHAR) Slice value) {
        return Slices.utf8Slice("presto_udf_" + value.toStringUtf8());
    }
}
package com.example.presto.demo.udf;
import com.facebook.presto.common.type.StandardTypes;
import com.facebook.presto.spi.function.Description;
import com.facebook.presto.spi.function.ScalarFunction;
import com.facebook.presto.spi.function.SqlNullable;
import com.facebook.presto.spi.function.SqlType;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
public class GenJson {

    /**
     * Generate a JSON string from the given key and value.
     */
    @ScalarFunction("GenJson")
    @Description("generates a json string from a key and a nullable value")
    @SqlType(StandardTypes.VARCHAR)
    public static Slice genJson(@SqlType(StandardTypes.VARCHAR) Slice key,
                                @SqlNullable @SqlType(StandardTypes.VARCHAR) Slice value) {
        return Slices.utf8Slice(
                String.format("{\"%s\":\"%s\"}", key.toStringUtf8(),
                        value == null ? "" : value.toStringUtf8())
        );
    }
}
Next, write an implementation of the Plugin interface and register the UDF classes we developed in its getFunctions method. The code is as follows:
package com.example.presto.demo.udf;
import com.facebook.presto.spi.Plugin;
import com.google.common.collect.ImmutableSet;
import java.util.Set;
public class ExampleFunctionsPlugin implements Plugin {

    @Override
    public Set<Class<?>> getFunctions() {
        return ImmutableSet.<Class<?>>builder()
                .add(PrefixFunction.class)
                .add(GenJson.class)
                .build();
    }
}
Then create the META-INF/services/com.facebook.presto.spi.Plugin file under the resources directory, containing the fully qualified name of the plugin class:

com.example.presto.demo.udf.ExampleFunctionsPlugin
Package the project with Maven, which produces:

presto-test-1.0-SNAPSHOT.jar

Copy the jar, together with its Guava dependency, into a new example-functions subdirectory under the plugin directory of presto-server, so the directory contains:

guava-26.0-jre.jar presto-test-1.0-SNAPSHOT.jar

Restart presto-server, then connect with the CLI to test the UDFs:
presto> use db01;
USE
presto:db01> select Prefix(name) from log_dev;
_col0
---------------------
presto_udf_更新用户
presto_udf_创建用户
(2 rows)
Query 20201116_121815_00002_upy9p, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:01 [2 rows, 84B] [1 rows/s, 63B/s]
presto:db01> select GenJson(creator, name) from log_dev;
_col0
---------------------
{"yarn":"更新用户"}
{"yarn":"创建用户"}
(2 rows)
Query 20201116_121905_00003_upy9p, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:00 [2 rows, 84B] [8 rows/s, 336B/s]
presto:db01>
An Aggregation function processes data in three phases:
- input(state, data): executed for every row, on every node that holds data, producing multiple accumulated partial states
- combine(state1, state2): merges the partial states from all nodes until a single final state remains, which is the Aggregation function's output
- output(final_state, out): writes the final result to a BlockBuilder

The corresponding annotations:
- @AggregationFunction marks a class as an Aggregation function
- @InputFunction, @CombineFunction, and @OutputFunction mark the accumulation method, the merge method, and the final output method respectively

First, define a state interface extending AccumulatorState that declares methods for setting and getting the value:
package com.example.presto.demo.udf;
import com.facebook.presto.spi.function.AccumulatorState;
import io.airlift.slice.Slice;
public interface StringValueState extends AccumulatorState {

    Slice getStringValue();

    void setStringValue(Slice value);
}
package com.example.presto.demo.udf;
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.type.StandardTypes;
import com.facebook.presto.common.type.VarcharType;
import com.facebook.presto.spi.function.*;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
/**
 * Aggregation function implementing string concatenation
 *
 * @author 01
 */
@AggregationFunction("ConcatStr")
public class ConCatFunction {

    @InputFunction
    public static void input(StringValueState state,
                             @SqlType(StandardTypes.VARCHAR) Slice value) {
        state.setStringValue(Slices.utf8Slice(
                checkNull(state.getStringValue()) + "|" +
                        value.toStringUtf8()
        ));
    }

    @CombineFunction
    public static void combine(StringValueState state,
                               StringValueState otherState) {
        state.setStringValue(Slices.utf8Slice(
                checkNull(state.getStringValue()) + "|" +
                        checkNull(otherState.getStringValue())
        ));
    }

    @OutputFunction(StandardTypes.VARCHAR)
    public static void output(StringValueState state,
                              BlockBuilder blockBuilder) {
        VarcharType.VARCHAR.writeSlice(blockBuilder, state.getStringValue());
    }

    private static String checkNull(Slice slice) {
        return slice == null ? "" : slice.toStringUtf8();
    }
}
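The way the three phases compose can be illustrated with a plain-Java sketch (no Presto API involved; the node-local state is modeled as a String, mirroring the checkNull-plus-"|" logic above):

```java
import java.util.Arrays;
import java.util.List;

public class ConcatFlow {

    // input phase: fold one row's value into a node-local state
    static String input(String state, String value) {
        return (state == null ? "" : state) + "|" + value;
    }

    // combine phase: merge two partial states into one
    static String combine(String s1, String s2) {
        return (s1 == null ? "" : s1) + "|" + (s2 == null ? "" : s2);
    }

    public static void main(String[] args) {
        // rows processed on two different worker nodes
        List<String> node1 = Arrays.asList("hdfs", "yarn");
        List<String> node2 = Arrays.asList("root");

        String state1 = null, state2 = null;
        for (String v : node1) state1 = input(state1, v);  // "|hdfs|yarn"
        for (String v : node2) state2 = input(state2, v);  // "|root"

        // the partial states are then combined; the output phase would
        // write this final value to a BlockBuilder
        System.out.println(combine(state1, state2));  // prints |hdfs|yarn||root
    }
}
```

Note the leading and doubled "|" separators: they come from checkNull turning an empty state into "" before concatenating, exactly as ConCatFunction does.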
Then register the function in ExampleFunctionsPlugin:
public class ExampleFunctionsPlugin implements Plugin {

    @Override
    public Set<Class<?>> getFunctions() {
        return ImmutableSet.<Class<?>>builder()
                ...
                .add(ConCatFunction.class)
                .build();
    }
}
Repackage the project and copy the new presto-test-1.0-SNAPSHOT.jar into the plugin directory, overwriting the old one:

cp: overwrite "/usr/local/presto-server/plugin/example-functions/presto-test-1.0-SNAPSHOT.jar"? yes

Restart presto-server and test the new function through the CLI:
[root@hadoop /usr/local/presto-server]# bin/presto-cli.jar --server localhost:9090 --catalog hive --user root
presto> use db01;
USE
presto:db01> select ConcatStr(creator) from log_dev2;
_col0
---------------------------------
(1 row)
Query 20201116_124714_00001_inrgm, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
0:01 [6 rows, 825B] [4 rows/s, 571B/s]
presto:db01>
Presto also allows plugging in a custom event listener by implementing the EventListener and EventListenerFactory interfaces. Here we implement an EventListener that listens for query events and writes the event information to log files. First, write the EventListener implementation class, which contains the core logic. The code is as follows:
package com.example.presto.demo.eventlistener;
import com.facebook.presto.spi.eventlistener.EventListener;
import com.facebook.presto.spi.eventlistener.QueryCompletedEvent;
import com.facebook.presto.spi.eventlistener.QueryCreatedEvent;
import com.facebook.presto.spi.eventlistener.SplitCompletedEvent;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.time.Instant;
import java.util.Map;
public class QueryEventListener implements EventListener {

    private final String logPath;

    public QueryEventListener(Map<String, String> config) {
        logPath = config.get("log.path");
        System.out.println(logPath);
    }

    /**
     * Listens for query-created events
     */
    @Override
    public void queryCreated(QueryCreatedEvent queryCreatedEvent) {
        String queryId = queryCreatedEvent.getMetadata().getQueryId();
        String query = queryCreatedEvent.getMetadata().getQuery();
        String user = queryCreatedEvent.getContext().getUser();

        String fileName = logPath + File.separator + queryId;
        File logFile = new File(fileName);
        if (!logFile.exists()) {
            try {
                boolean result = logFile.createNewFile();
                System.out.println(result);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        try (FileWriter fw = new FileWriter(fileName, true)) {
            fw.append(String.format("User:%s Id:%s Query:%s%n", user, queryId, query));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Listens for query-completed events
     */
    @Override
    public void queryCompleted(QueryCompletedEvent queryCompletedEvent) {
        String queryId = queryCompletedEvent.getMetadata().getQueryId();
        long createTime = queryCompletedEvent.getCreateTime().toEpochMilli();
        long endTime = queryCompletedEvent.getEndTime().toEpochMilli();
        long totalBytes = queryCompletedEvent.getStatistics().getTotalBytes();
        String queryState = queryCompletedEvent.getMetadata().getQueryState();

        // failure details are extracted here and could also be written to the log
        queryCompletedEvent.getFailureInfo().ifPresent(queryFailureInfo -> {
            int errCode = queryFailureInfo.getErrorCode().getCode();
            String failureType = queryFailureInfo.getFailureType().orElse("").toUpperCase();
            String failureHost = queryFailureInfo.getFailureHost().orElse("");
            String failureMessage = queryFailureInfo.getFailureMessage().orElse("");
        });

        String fileName = logPath + File.separator + queryId;
        try (FileWriter fw = new FileWriter(fileName, true)) {
            fw.append(String.format("Id:%s StartTime:%s EndTime:%s State:%s%n",
                    queryId, createTime, endTime, queryState));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Listens for split-completed events
     */
    @Override
    public void splitCompleted(SplitCompletedEvent splitCompletedEvent) {
        long createTime = splitCompletedEvent.getCreateTime().toEpochMilli();
        long endTime = splitCompletedEvent.getEndTime().orElse(Instant.MAX).toEpochMilli();
        String queryId = splitCompletedEvent.getQueryId();
        String stageId = splitCompletedEvent.getStageId();
        String taskId = splitCompletedEvent.getTaskId();

        String fileName = logPath + File.separator + queryId;
        try (FileWriter fw = new FileWriter(fileName, true)) {
            fw.append(String.format("Id:%s StartTime:%s EndTime:%s StageId:%s TaskId:%s%n",
                    queryId, createTime, endTime, stageId, taskId));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Next, implement the EventListenerFactory interface, which is responsible for creating our custom QueryEventListener:
package com.example.presto.demo.eventlistener;
import com.facebook.presto.spi.eventlistener.EventListener;
import com.facebook.presto.spi.eventlistener.EventListenerFactory;
import java.util.Map;
public class QueryEventListenerFactory implements EventListenerFactory {

    @Override
    public String getName() {
        // the name of this EventListener
        return "query-event-listener";
    }

    @Override
    public EventListener create(Map<String, String> config) {
        if (!config.containsKey("log.path")) {
            throw new RuntimeException("missing log.path conf");
        }

        return new QueryEventListener(config);
    }
}
Then write an implementation of the Plugin interface and return our EventListener factory from its getEventListenerFactories method:
package com.example.presto.demo.eventlistener;
import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.eventlistener.EventListenerFactory;
import java.util.Collections;
public class QueryEventPlugin implements Plugin {

    @Override
    public Iterable<EventListenerFactory> getEventListenerFactories() {
        QueryEventListenerFactory queryEventListenerFactory = new QueryEventListenerFactory();
        return Collections.singletonList(queryEventListenerFactory);
    }
}
In the META-INF/services/com.facebook.presto.spi.Plugin file, set the content to the fully qualified name of the QueryEventPlugin class:

com.example.presto.demo.eventlistener.QueryEventPlugin
Repackage the project into presto-test-1.0-SNAPSHOT.jar and copy it, together with the Guava jar, into a new subdirectory under the plugin directory of presto-server:

guava-26.0-jre.jar presto-test-1.0-SNAPSHOT.jar

Note: delete the previously created example-functions directory first, otherwise presto-server will fail on startup because the UDFs would be registered twice.

Then create the etc/event-listener.properties configuration file with the following content:
event-listener.name=query-event-listener
log.path=/data/presto/log
Restart presto-server for the configuration to take effect, then run a few queries through the CLI:
[root@hadoop /usr/local/presto-server]# bin/presto-cli.jar --server localhost:9090 --catalog hive --user root
presto> use db01;
USE
presto:db01> select * from log_dev;
id | name | create_time | creator | info
----+----------+-------------+---------+----------------
4 | 更新用户 | 1554189515 | yarn | 更新用户 test3
6 | 创建用户 | 1554299345 | yarn | 创建用户 test5
(2 rows)
Query 20201116_132643_00001_tvyva, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:01 [2 rows, 84B] [1 rows/s, 58B/s]
presto:db01> select * from log_dev2 limit 1;
id | name | create_time | creator | info
----+----------+-------------+---------+---------------
1 | 创建用户 | 1554099545 | hdfs | 创建用户 test
(1 row)
Query 20201116_132652_00002_tvyva, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
0:00 [1 rows, 825B] [3 rows/s, 2.48KB/s]
presto:db01>
[root@hadoop ~]# ls /data/presto/log/
20201116_132435_00000_tvyva 20201116_132643_00001_tvyva 20201116_132652_00002_tvyva
[root@hadoop ~]# cat /data/presto/log/20201116_132435_00000_tvyva
User:root Id:20201116_132435_00000_tvyva Query:use db01
Id:20201116_132435_00000_tvyva StartTime:1605533075986 EndTime:1605533076000 State:FINISHED
[root@hadoop ~]# cat /data/presto/log/20201116_132643_00001_tvyva
User:root Id:20201116_132643_00001_tvyva Query:select * from log_dev
Id:20201116_132643_00001_tvyva StartTime:1605533204999 EndTime:1605533205193 StageId:20201116_132643_00001_tvyva.1 TaskId:0
...
Id:20201116_132643_00001_tvyva StartTime:1605533203889 EndTime:1605533205297 State:FINISHED
[root@hadoop ~]# cat /data/presto/log/20201116_132652_00002_tvyva
User:root Id:20201116_132652_00002_tvyva Query:select * from log_dev2 limit 1
Id:20201116_132652_00002_tvyva StartTime:1605533212541 EndTime:1605533212644 StageId:20201116_132652_00002_tvyva.1 TaskId:0
...
Id:20201116_132652_00002_tvyva StartTime:1605533212413 EndTime:1605533212688 State:FINISHED
[root@hadoop ~]#
Presto can kill queries when the cluster runs low on memory. The query.low-memory-killer.policy parameter specifies the kill strategy; its possible values are total-reservation-on-blocked-nodes (kill the query using the most memory on the blocked nodes) or total-reservation (kill the most memory-hungry query).

The main memory limits:
- query.max-memory: the maximum user memory a single query may occupy across the whole cluster
- query.max-total-memory: the maximum (user + system) memory a single query may occupy across the whole cluster
- query.max-memory-per-node: the maximum user memory a query may occupy on a single worker, i.e. the ReservedPool, which defaults to 0.1 of the heap size
- query.max-total-memory-per-node: the maximum (user + system) memory a query may occupy on a single worker
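As an illustration only (the values below are placeholders, not tuned recommendations), these limits would be set in etc/config.properties like so:

```properties
query.low-memory-killer.policy=total-reservation-on-blocked-nodes
query.max-memory=8GB
query.max-total-memory=10GB
query.max-memory-per-node=2GB
query.max-total-memory-per-node=3GB
```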
·END·
Source: https://www.jianshu.com/p/3c28e89c7813
Copyright notice: this content comes from the Internet and is shared for learning purposes only; copyright belongs to the original author. Unless we cannot confirm it, we always credit the author and source. If there is any infringement, please let us know and we will delete it immediately with our apologies. Thank you!