
Flink Installation and Deployment: Flink on Kubernetes Session Mode | 清风

The defining characteristic of Flink Session Mode on Kubernetes is that the cluster runs long-term and can execute multiple Flink jobs; every job is submitted to this shared cluster for processing. The number of TaskManagers must be decided in advance rather than allocated dynamically. The JobManager and TaskManagers are defined in Kubernetes resource description files and are typically created with kubectl. Each TaskManager registers itself with the ResourceManager component. When a Flink client submits a job, the Dispatcher receives the submission request and forwards it to the JobManager, which in turn distributes the tasks to the TaskManagers.

How a Flink on Kubernetes Session cluster is built


Implementing Session Mode for Flink on Kubernetes requires the following pieces.

1) Implement org.apache.flink.kubernetes.cli.KubernetesSessionCli (the main program) to support the Kubernetes command line.

2) Make org.apache.flink.client.cli.CliFrontend (the main program) load the KubernetesSessionCli command line.

3) Implement KubernetesClusterDescriptor, which extends org.apache.flink.client.deployment.ClusterDescriptor and contains the logic for creating the JobManager Service and the JobManager Deployment.

4) Implement the JobManager entry command and KubernetesClusterEntrypoint, which extends org.apache.flink.kubernetes.entrypoint.SessionClusterEntrypoint (the main program). KubernetesClusterEntrypoint implements createDispatcherResourceManagerComponentFactory, the factory method that creates the Dispatcher and ResourceManager components and is responsible for starting the JobManager's runtime components and basic services.

5) Implement the KubernetesResourceManager class, which, based on resource demand, sends requests to the Kubernetes API Server to allocate TaskManager Pods and maintains the state of those resources.

6) Implement the TaskManager entry command and the entry class org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner (the main program). KubernetesTaskExecutorRunner loads the relevant configuration, then builds and starts the TaskExecutorRunner.


Advantages of this mode:
The Flink cluster is started in advance. When a user submits a job, the job can be assigned to TaskManagers immediately, so jobs start quickly.

Disadvantages of this mode:

Low resource utilization: the number of TaskManagers is fixed in advance. If jobs need few resources, many TaskManagers sit idle; if jobs need more, the TaskManagers run short of resources.

Poor job isolation: the tasks of multiple jobs compete for resources and affect one another. If one job misbehaves and brings down a TaskManager, every job with tasks on that TaskManager is restarted.


Deployment guide

A Flink session cluster deployed on Kubernetes consists of at least the following three components:

(1) a Deployment that runs the JobManager;

(2) a Deployment that runs a group of TaskManagers;

(3) a Service that exposes the JobManager's REST interface and ports.


The deployment steps are as follows:

(1) Define the configuration, the Service, and the Deployments

1) Create the ConfigMap: flink-configuration-configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender
    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO
    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10
    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
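Once the ConfigMap has actually been applied (step 2 below), you can dump the rendered flink-conf.yaml back out of the cluster as a quick sanity check. The jsonpath expression below is just one illustrative way to do it; the backslash-escaped dots in the key name are required by kubectl's jsonpath syntax:

# Print the flink-conf.yaml entry stored in the ConfigMap (dots in the key must be escaped)
kubectl get configmap flink-config -o jsonpath='{.data.flink-conf\.yaml}'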

  

2) Create jobmanager-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
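Note that the Service name flink-jobmanager is exactly what jobmanager.rpc.address in the ConfigMap points to, so TaskManagers reach the JobManager through cluster DNS. If you want to confirm the name resolves inside the cluster, a throwaway pod is one way to check (the pod name dns-check and the busybox image are arbitrary choices for this sketch):

# Resolve the JobManager Service name from inside the cluster, then delete the test pod
kubectl run dns-check --rm -it --image=busybox --restart=Never -- nslookup flink-jobmanager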


3) Create jobmanager-session-deployment.yaml; the non-high-availability configuration is shown below

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: apache/flink:1.14.3-scala_2.11
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
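After creating this Deployment (step 2 below), it can be useful to wait for the rollout and then tail the JobManager log; both commands are standard kubectl usage rather than anything Flink-specific:

# Wait until the JobManager Deployment has become available
kubectl rollout status deployment/flink-jobmanager
# Follow the JobManager log (the log4j console appender above writes to stdout)
kubectl logs -f deployment/flink-jobmanager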


4) Create taskmanager-session-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: apache/flink:1.14.3-scala_2.11
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
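Because the number of TaskManagers is fixed up front in this mode (see the drawbacks above), changing capacity later is a manual operation. A minimal sketch of how you might do it with plain kubectl; the replica count 4 is just an example, and the total number of task slots is then replicas × taskmanager.numberOfTaskSlots (2 in the ConfigMap above):

# Manually scale the TaskManager Deployment, e.g. from 2 to 4 replicas (4 * 2 = 8 slots)
kubectl scale deployment flink-taskmanager --replicas=4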


(2) Create the ConfigMap, the Service, the JobManager Deployment, and the TaskManager Deployment

$ kubectl create -f flink-configuration-configmap.yaml
$ kubectl create -f jobmanager-service.yaml
$ kubectl create -f jobmanager-session-deployment.yaml
$ kubectl create -f taskmanager-session-deployment.yaml


Verify that the pods were created successfully:
kubectl get pods
If you see output similar to the following, the creation succeeded:
NAME                                READY   STATUS    RESTARTS   AGE
flink-jobmanager-5d84d6b644-8jjkm   1/1     Running   0          4s
flink-taskmanager-9fdb95fd8-9mpgc   1/1     Running   1          14m
flink-taskmanager-9fdb95fd8-vbtwv   1/1     Running   1          14m
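Beyond the pod status, you can also check in the JobManager log whether the TaskManagers actually registered with the ResourceManager. The grep pattern below is an assumption about the wording of Flink's registration log message, so adjust it if nothing matches:

# Look for TaskManager registration messages in the JobManager log
kubectl logs deployment/flink-jobmanager | grep -i "registering taskmanager"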


(3) Expose the ports externally

1. Using kubectl proxy

kubectl proxy --address='192.0.0.1' -p=8081 --accept-hosts='^*$'
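With the proxy running, the Flink Web UI is reached through the Kubernetes API proxy path rather than directly on the Service port. Assuming the default namespace and the flink-jobmanager Service defined earlier (whose 8081 port is named webui), the URL would look like the following; replace the proxy address, proxy port, and namespace to match your setup:

# Flink Web UI via the kubectl proxy (default namespace assumed)
http://<proxy-address>:8081/api/v1/namespaces/default/services/flink-jobmanager:webui/proxy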

2. Using kubectl port-forward

kubectl port-forward ${flink-jobmanager-pod} 8081:8081
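Here ${flink-jobmanager-pod} stands for the JobManager pod name (e.g. flink-jobmanager-5d84d6b644-8jjkm from the output above). If you prefer not to look the pod name up by hand, kubectl can also forward to the Deployment directly; this is a sketch of that alternative, not a required step:

# Port-forward via the Deployment instead of a specific pod name
kubectl port-forward deployment/flink-jobmanager 8081:8081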


3. Using a jobmanager-rest-service: create jobmanager-rest-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager


In this article we use the jobmanager-rest-service approach. Create the service:
$ kubectl create -f jobmanager-rest-service.yaml


Check the port mapping:
$ kubectl get svc flink-jobmanager-rest
NAME                    TYPE       CLUSTER-IP   EXTERNAL-IP   PORT(S)          AGE
flink-jobmanager-rest   NodePort   10.0.0.244   <none>        8081:31319/TCP   161m
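Once the NodePort Service is up, the Flink REST API and Web UI are reachable on that port on any node's IP. A quick way to confirm the REST endpoint responds is to query the cluster overview; <node-ip> is a placeholder for one of your node addresses, and the port should match whatever kubectl get svc reports in your cluster:

# Query the Flink REST API through the NodePort (replace <node-ip> and the port as reported above)
curl http://<node-ip>:31319/overview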


(4) Submit a job

Submit with the command line:
$ kubectl exec -it flink-jobmanager-5d84d6b644-8jjkm -- /bin/sh
$ ./bin/flink run -m flink-jobmanager:8081 ./examples/streaming/TopSpeedWindowing.jar
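After submission, jobs on the session cluster can be inspected or cancelled with the same CLI from inside the JobManager pod. Both commands below are standard bin/flink subcommands; <jobId> is a placeholder for the ID printed by flink run or flink list:

# List running and scheduled jobs on the session cluster
./bin/flink list -m flink-jobmanager:8081
# Cancel a job by its ID
./bin/flink cancel -m flink-jobmanager:8081 <jobId>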


(5) Release resources

$ kubectl delete -f jobmanager-service.yaml
$ kubectl delete -f flink-configuration-configmap.yaml
$ kubectl delete -f taskmanager-session-deployment.yaml
$ kubectl delete -f jobmanager-session-deployment.yaml
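The jobmanager-rest-service created in step (3) is not covered by the commands above; if you created it, remove it as well:

$ kubectl delete -f jobmanager-rest-service.yaml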

   

Feature analysis
In this type of Flink cluster, the JobManager and TaskManagers run long-term in the Kubernetes cluster as Kubernetes Deployments, and the Flink session cluster must be created before any job is submitted. Multiple jobs can run in the same cluster at the same time; they share the cluster's Dispatcher and ResourceManager, so resource isolation is weak and jobs can affect one another, although each job gets its own JobMaster. This mode is well suited to short-running jobs, ad-hoc queries, frequent job submission, or scenarios that are sensitive to job startup latency.
这种类型的 Flink 集群,JobManager 和 TaskManager 是以Kubernetes deployment的形式长期运行在 Kubernetes 集群中。在提交作业之前,必须先创建好 Flink session 集群。多个任务可以同时运行在同一个集群内,任务之间共享 K8S ResourceManager 和 Dispatcher,资源隔离性差,任务间会相互影响,但是 JobManager 是单独的。 这种方式比较适合运行短时作业、即席查询、任务提交频繁、或者对任务启动时长比较敏感的场景。