Flink Installation and Deployment: Flink on Kubernetes Session Mode
How a Flink on Kubernetes Session cluster is built
1) Implement org.apache.flink.kubernetes.cli.KubernetesSessionCli (the main class) to support the Kubernetes command line (see the start-up sketch after this list).
2) In org.apache.flink.client.cli.CliFrontend (the main class), add the logic that loads the KubernetesSessionCli command line.
3) Implement KubernetesClusterDescriptor, which implements org.apache.flink.client.deployment.ClusterDescriptor and contains the logic for creating the JobManager Service and the JobManager Deployment.
4) Implement the JobManager entry command and KubernetesSessionClusterEntrypoint, which extends org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint (the main class). It implements createDispatcherResourceManagerComponentFactory, the factory method that creates the Dispatcher and ResourceManager components; this method is responsible for starting the JobManager's runtime components and basic services.
5) Implement KubernetesResourceManager, which requests TaskManager Pods from the Kubernetes API Server according to resource demand and maintains the state of those resources.
6) Implement the TaskManager entry command and the entry class org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner (the main class); KubernetesTaskExecutorRunner loads the relevant configuration to build and start the TaskExecutorRunner.
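These six pieces make up Flink's native Kubernetes integration. For reference, the session cluster they implement is normally started with the kubernetes-session.sh script shipped with the Flink distribution; a minimal sketch (the cluster-id and resource values are arbitrary examples; the rest of this article instead deploys a standalone session cluster from plain Kubernetes manifests):

$ ./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-session-cluster \
    -Dkubernetes.taskmanager.cpu=1 \
    -Dtaskmanager.memory.process.size=1728m \
    -Dtaskmanager.numberOfTaskSlots=2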
Two drawbacks of the Session mode:
① Low resource utilization: the number of TaskManagers has to be decided in advance. If jobs need fewer resources than that, many TaskManagers sit idle; if they need more, the TaskManagers run short of resources.
② Poor job isolation: tasks of different jobs compete for the same resources and affect each other. If one job misbehaves and brings a TaskManager down, every job with tasks on that TaskManager is restarted.
A standalone Flink Session cluster on Kubernetes consists of:
(1) a Deployment that runs the JobManager;
(2) a Deployment that runs a group of TaskManagers;
(3) a Service that exposes the REST interface and ports externally.
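Once the manifests below are applied, a quick way to confirm these three objects exist (names as defined in this article's manifests) is:

$ kubectl get deployment flink-jobmanager flink-taskmanager
$ kubectl get service flink-jobmanager
$ kubectl get pods -l app=flink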
(1) Define the configuration, the Service, and the Deployments
1) Create the ConfigMap: flink-configuration-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender
    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO
    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10
    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
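One thing worth noting in this configuration: with taskmanager.numberOfTaskSlots: 2 and the TaskManager Deployment below running replicas: 2, the session cluster exposes 2 × 2 = 4 task slots in total, while parallelism.default: 2 means a job uses 2 of them unless its parallelism is set explicitly. After the ConfigMap is created, the rendered content can be inspected with:

$ kubectl get configmap flink-config -o yaml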
2) Create jobmanager-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
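TaskManagers locate the JobManager through jobmanager.rpc.address: flink-jobmanager in the ConfigMap, which cluster DNS resolves to this ClusterIP Service. A hypothetical resolution check from a throwaway pod (assumes a busybox image can be pulled):

$ kubectl run dns-test --rm -it --restart=Never --image=busybox -- nslookup flink-jobmanager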
3) Create jobmanager-session-deployment.yaml; the non-HA (non-high-availability) configuration is as follows:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: apache/flink:1.14.3-scala_2.11
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
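Once this Deployment is applied, the JobManager's start-up can be followed through its logs (the deployment/ shorthand picks one pod of the Deployment):

$ kubectl logs -f deployment/flink-jobmanager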
4) Create taskmanager-session-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: apache/flink:1.14.3-scala_2.11
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
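Because the number of TaskManagers is fixed by replicas (the resource-utilization drawback noted earlier), resizing the session cluster is a manual step; newly added TaskManagers register with the JobManager automatically. For example:

$ kubectl scale deployment flink-taskmanager --replicas=4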
(2) Create the ConfigMap, Service, JobManager Deployment, and TaskManager Deployment
$ kubectl create -f flink-configuration-configmap.yaml
$ kubectl create -f jobmanager-service.yaml
$ kubectl create -f jobmanager-session-deployment.yaml
$ kubectl create -f taskmanager-session-deployment.yaml
$ kubectl get pods
NAME                                READY   STATUS    RESTARTS   AGE
flink-jobmanager-5d84d6b644-8jjkm   1/1     Running   0          4s
flink-taskmanager-9fdb95fd8-9mpgc   1/1     Running   1          14m
flink-taskmanager-9fdb95fd8-vbtwv   1/1     Running   1          14m
(3) Expose the port externally
1. Using kubectl proxy
kubectl proxy --address='192.0.0.1' -p=8081 --accept-hosts='^*$'
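With the proxy running, the web UI should be reachable through the API server's service proxy path (assuming the default namespace and the webui port name defined in the Service above), roughly:

http://192.0.0.1:8081/api/v1/namespaces/default/services/flink-jobmanager:webui/proxy/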
2. Using kubectl port-forward
kubectl port-forward ${flink-jobmanager-pod} 8081:8081
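Either way, once port 8081 is reachable locally, Flink's REST API offers a quick health check, for example:

$ curl http://localhost:8081/overview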
3. Using a JobManager REST Service: create jobmanager-rest-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager
$ kubectl create -f jobmanager-rest-service.yaml
$ kubectl get svc flink-jobmanager-rest
NAME                    TYPE       CLUSTER-IP   EXTERNAL-IP   PORT(S)          AGE
flink-jobmanager-rest   NodePort   10.0.0.244   <none>        8081:31319/TCP   161m
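The web UI is then reachable at http://<node-ip>:<nodePort>. Note that the sample output above shows an automatically assigned port (31319); with the nodePort: 30081 field from the manifest above, Kubernetes would pin the port to 30081 instead.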
(4) Submit a job
$ kubectl exec -it flink-jobmanager-5d84d6b644-8jjkm -- /bin/sh
./bin/flink run -m flink-jobmanager:8081 ./examples/streaming/TopSpeedWindowing.jar
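From the same shell, the job can be listed or cancelled with the regular Flink CLI (TopSpeedWindowing runs until cancelled); <jobId> is the ID printed by flink run or shown by flink list:

./bin/flink list -m flink-jobmanager:8081
./bin/flink cancel -m flink-jobmanager:8081 <jobId>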
(5) Release resources
$ kubectl delete -f jobmanager-service.yaml
$ kubectl delete -f flink-configuration-configmap.yaml
$ kubectl delete -f taskmanager-session-deployment.yaml
$ kubectl delete -f jobmanager-session-deployment.yaml