Flink Installation and Deployment: Flink on Kubernetes Session Mode
How a Flink Session cluster is built on Kubernetes
1) Implement org.apache.flink.kubernetes.cli.KubernetesSessionCli (a main program) to provide the Kubernetes command line.
2) In org.apache.flink.client.cli.CliFrontend (a main program), add the logic that loads the KubernetesSessionCli command line.
3) Implement KubernetesClusterDescriptor, which implements org.apache.flink.client.deployment.ClusterDescriptor and carries the logic for creating the JobManager Service and the JobManager Deployment.
4) Implement the JobManager entry command and KubernetesSessionClusterEntrypoint (a main program), which extends org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint. KubernetesSessionClusterEntrypoint implements createDispatcherResourceManagerComponentFactory, the factory method that creates the Dispatcher and ResourceManager components and is responsible for starting the JobManager's runtime components and basic services.
5) Implement the KubernetesResourceManager class, which sends requests for TaskManager Pods to the Kubernetes API server according to resource demand and keeps track of the state of those resources.
6) Implement the TaskManager entry command and the entry class org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner (a main program), which loads the relevant configuration and then builds and starts the TaskExecutorRunner. The user-facing workflow that results is sketched below.
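From the user's perspective, the pieces above surface as the native Kubernetes session commands. A minimal sketch of that workflow, where my-session-cluster is an illustrative cluster id:

# Start a session cluster natively on Kubernetes (kubernetes-session.sh ships with the Flink distribution)
./bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=my-session-cluster \
  -Dtaskmanager.numberOfTaskSlots=2

# Submit a job to that session through the CLI support described in steps 1) and 2)
./bin/flink run \
  --target kubernetes-session \
  -Dkubernetes.cluster-id=my-session-cluster \
  ./examples/streaming/TopSpeedWindowing.jar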
The Session model has two notable drawbacks:
① Low resource utilization: the number of TaskManagers is fixed up front. If jobs need fewer resources than provisioned, many TaskManagers sit idle; if they need more, the TaskManagers run short of resources.
② Poor job isolation: tasks from multiple jobs compete for the same resources and affect one another, and if one job's failure brings down a TaskManager, every job with tasks on that TaskManager is restarted.
Deployed on Kubernetes, a Flink Session cluster consists of:
(1) a Deployment that runs the JobManager;
(2) a Deployment that runs a set of TaskManagers;
(3) a Service that exposes the REST interface and ports externally.
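Once the manifests in the rest of this section have been applied, these objects can be sanity-checked with kubectl; the app: flink label comes from the pod templates in the Deployments below:

kubectl get deployments,services
kubectl get pods -l app=flink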
(1) Define the configuration, Service, and Deployments
1) Create the ConfigMap, flink-configuration-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender
    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO
    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size = 100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10
    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
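Note that Flink reads flink-conf.yaml only at process startup, so editing this ConfigMap does not affect already-running pods. A plausible update cycle, once the Deployments defined below are running, is:

kubectl apply -f flink-configuration-configmap.yaml
kubectl rollout restart deployment/flink-jobmanager deployment/flink-taskmanager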
2) Create jobmanager-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
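This Service is what makes the jobmanager.rpc.address: flink-jobmanager entry in the ConfigMap resolvable inside the cluster. If connectivity is in doubt, in-cluster DNS can be checked from a throwaway pod (busybox:1.28 is just a convenient image that ships nslookup):

kubectl run -it --rm dns-test --image=busybox:1.28 --restart=Never -- nslookup flink-jobmanager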
3) Create jobmanager-session-deployment.yaml; the non-HA version of the configuration is shown below
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: apache/flink:1.14.3-scala_2.11
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
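Once this Deployment has been created in step (2) below, the rollout can be awaited before submitting jobs; this also confirms the liveness probe on port 6123 is passing:

kubectl rollout status deployment/flink-jobmanager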
4) Create taskmanager-session-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: apache/flink:1.14.3-scala_2.11
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
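The session's total capacity is replicas × taskmanager.numberOfTaskSlots, i.e. 2 × 2 = 4 slots with the values above. Since the TaskManagers are an ordinary Deployment, capacity can be changed at any time after creation, for example:

kubectl scale deployment flink-taskmanager --replicas=4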
(2) Create the ConfigMap, Service, JobManager Deployment, and TaskManager Deployment
$ kubectl create -f flink-configuration-configmap.yaml
$ kubectl create -f jobmanager-service.yaml
$ kubectl create -f jobmanager-session-deployment.yaml
$ kubectl create -f taskmanager-session-deployment.yaml
kubectl get pods
NAME                                READY   STATUS    RESTARTS   AGE
flink-jobmanager-5d84d6b644-8jjkm   1/1     Running   0          4s
flink-taskmanager-9fdb95fd8-9mpgc   1/1     Running   1          14m
flink-taskmanager-9fdb95fd8-vbtwv   1/1     Running   1          14m
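If a pod is stuck outside Running, its logs usually explain why; kubectl can follow the logs of a Deployment directly and will pick one of its pods:

kubectl logs -f deployment/flink-jobmanager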
(3) Expose the ports externally
1) Using kubectl proxy
kubectl proxy --address='192.0.0.1' -p=8081 --accept-hosts='^*$'
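kubectl proxy exposes the web UI through the API server's service proxy path. Assuming the default namespace, with the proxy above listening on 192.0.0.1:8081 (substitute whatever address and port the proxy actually binds), the Flink REST API can then be reached at, for example:

curl http://192.0.0.1:8081/api/v1/namespaces/default/services/flink-jobmanager:webui/proxy/overview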
2) Using kubectl port-forward
kubectl port-forward ${flink-jobmanager-pod} 8081:8081
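The web UI is then available at http://localhost:8081. Forwarding through the Service instead of a concrete pod avoids having to look up the pod name first:

kubectl port-forward service/flink-jobmanager 8081:8081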
3) Using a jobmanager-rest-service: create jobmanager-rest-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager
kubectl create -f jobmanager-rest-service.yaml
kubectl get svc flink-jobmanager-rest
NAME                    TYPE       CLUSTER-IP   EXTERNAL-IP   PORT(S)          AGE
flink-jobmanager-rest   NodePort   10.0.0.244   <none>        8081:31319/TCP   161m
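Note that the output above reports node port 31319 rather than the 30081 requested in the manifest, so always use the port that kubectl prints. With that port, the REST API is reachable from outside the cluster (<node-ip> is a placeholder for any node's address):

curl http://<node-ip>:31319/overview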
(4) Submit a job
kubectl exec -it flink-jobmanager-5d84d6b644-8jjkm -- /bin/sh
./bin/flink run -m flink-jobmanager:8081 ./examples/streaming/TopSpeedWindowing.jar
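From the same shell inside the JobManager pod, the submitted job can be checked with the CLI; the same information is also exposed by the REST API at http://flink-jobmanager:8081/jobs:

./bin/flink list -m flink-jobmanager:8081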
(5) Release the resources
$ kubectl delete -f jobmanager-service.yaml
$ kubectl delete -f flink-configuration-configmap.yaml
$ kubectl delete -f taskmanager-session-deployment.yaml
$ kubectl delete -f jobmanager-session-deployment.yaml
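If all of the manifests are kept in a single directory, the same cleanup can be done in one command:

$ kubectl delete -f .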
