【技术干货】EdgeMesh使用和源码分析
作者/古强
KubeEdge介绍
KubeEdge是一个开源的系统,可将本机容器化应用编排和管理扩展到边缘端设备。它基于Kubernetes构建,为网络和应用程序提供核心基础架构支持,并在云端和边缘端部署应用,同步元数据。KubeEdge 还支持 MQTT 协议,允许开发人员编写客户逻辑,并在边缘端启用设备通信的资源约束。KubeEdge 包含云端和边缘端两部分。
KubeEdge 由以下组件构成:
云上部分
• CloudHub: CloudHub:是一个Web Socket服务端,负责监听云端的变化, 缓存并发送消息到 EdgeHub。
• EdgeController:EdgeController是一个扩展的Kubernetes控制器,管理边缘节点和Pods的元数据确保数据能够传递到指定的边缘节点。
• DeviceController:DeviceController是一个扩展的Kubernetes控制器,管理边缘设备,确保设备信息、设备状态的云边同步。
边缘部分
• EdgeHub:EdgeHub是一个Web Socket客户端,负责与边缘计算的云服务(例如KubeEdge架构图中的Edge Controller)交互,包括同步云端资源更新、报告边缘主机和设备状态变化到云端等功能。
• Edged:Edged是运行在边缘节点的代理,用于管理容器化的应用程序。
• EventBus:EventBus是一个与MQTT服务器(mosquitto)交互的MQTT客户端,为其他组件提供订阅和发布功能。
• ServiceBus:ServiceBus是一个运行在边缘的HTTP客户端,接受来自云上服务的请求,与运行在边缘端的HTTP服务器交互,提供了云上服务通过HTTP协议访问边缘端HTTP服务器的能力。
• DeviceTwin:DeviceTwin负责存储设备状态并将设备状态同步到云,它还为应用程序提供查询接口。
• MetaManager:MetaManager是消息处理器,位于Edged和Edgehub之间,它负责向轻量级数据库(SQLite)存储/检索元数据。
架构
问题所在
KubeEdge为在边缘侧做到轻量化,对k8s的组件进行了大量精简:以EdgeCore代替Kubelet,实际上是一个裁剪的Kubelet;没有CNI支持,无容器网络。因此运行在边缘节点的Pod犹如一个个孤岛,无法利用k8s内置Service做服务发现。
解决方法
因此,KubeEdge引入了EdgeMesh作为服务发现框架,可以在边缘节点间提供基于k8s Service的服务发现能力,同时支持边缘到云端的单向服务访问。
限制
1.EdgeMesh目前只支持http 1.x服务的转发;
2.Service域名解析依赖docker0网桥,只支持docker做CRI;
3.没有健康检查;
4.访问云端服务要求节点具有公网IP。
使用方法
创建工作负载
kind: DeploymentapiVersion: apps/v1metadata:name: edge-nginxnamespace: defaultspec:replicas: 1selector:matchLabels:app: edge-nginxtemplate:metadata:labels:app: edge-nginxspec:containers:- name: container1image: 'nginx:latest'ports:- hostPort: 80 # 务必填写hostPort和containerPortcontainerPort: 80 # 请求转发依赖于hostPortprotocol: TCPaffinity:nodeAffinity:requiredDuringSchedulingIgnoredDuringExecution:nodeSelectorTerms:- matchExpressions:- key: node-role.kubernetes.io/edgeoperator: Exists
验证pod服务正常
使用podIp访问服务
[root@localhost ~]# curl 172.17.0.3<!DOCTYPE html>...[root@localhost ~]#创建Servicekind: ServiceapiVersion: v1metadata:name: edge-nginxnamespace: defaultspec:ports:- name: http-0 # 名称务必以http开头,表明是http服务protocol: TCPport: 18080targetPort: 80selector:app: edge-nginxclusterIP: None # 不分配ClusterIP
测试连通性
sh-4.4# curl edge-nginx.default.svc.cluster.local:18080<!DOCTYPE html>...sh-4.4
注意访问时最好使用完整域名访问,若Service在default命名空间则可以简写。
工作原理
一次完整的HTTP请求过程如下图。EdgeMesh会在DNS解析和服务负载均衡两处劫持请求。
劫持DNS请求
EdgeMesh启动时,会改写/etc/resolv.conf,将docker0网桥设置为第一个dns服务器。并且在一个协程中,使用UDP监听docker0网桥的53端口进行域名解析。
改写/etc/resolv.conf
// ensureResolvForHost adds edgemesh dns server to the head of /etc/resolv.conffunc ensureResolvForHost() {...}
监听dns请求
// startDNS starts edgemesh dns serverfunc startDNS() {// init meta clientmetaClient = client.New()// get dns listen iplip, err := common.GetInterfaceIP(ifi) // ifi=docker0// ...laddr := &net.UDPAddr{IP: lip,Port: 53,}udpConn, err := net.ListenUDP("udp", laddr)// ...defer udpConn.Close()dnsConn = udpConnfor {req := make([]byte, bufSize)n, from, err := dnsConn.ReadFromUDP(req)// ...que, err := parseDNSQuery(req[:n])if err != nil {continue}que.from = fromrsp := make([]byte, 0)rsp, err = recordHandle(que, req[:n])// ...if _, err = dnsConn.WriteTo(rsp, from); err != nil {klog.Warningf("[EdgeMesh] failed to write: %v", err)}}}
之后,在StartDNS方法的无限循环中,recordHandle方法会进行DNS解析。
// recordHandle returns the answer for the dns questionfunc recordHandle(que *dnsQuestion, req []byte) (rsp []byte, err error) {var exist boolvar ip string// qType should be 1 for ipv4if que.name != nil && que.qType == aRecord {domainName := string(que.name)exist, ip = lookupFromMetaManager(domainName)}if !exist || que.event == eventUpstream {// if this service doesn't belongs to this clustergo getFromRealDNS(req, que.from)return rsp, fmt.Errorf("get from real dns")}address := net.ParseIP(ip).To4()if address == nil {que.event = eventNxDomain}// genpre := modifyRspPrefix(que)rsp = append(rsp, pre...)if que.event != eventNothing {return rsp, nil}// create a deceptive resp, if no errordnsAns := &dnsAnswer{name: que.name,qType: que.qType,qClass: que.qClass,ttl: ttl,dataLen: uint16(len(address)),addr: address,}ans := dnsAns.getAnswer()rsp = append(rsp, ans...)return rsp, nil}
recordHandle方法的逻辑很清晰,首先尝试从metaManager获取域名所表示的Service。如果域名不符合<service_name>.<service_namespace>.svc.<cluster>.<local>格式,那么使用/etc/resolv.conf中的DNS服务器进行解析;否则,返回一个EdgeMesh管理的fake IP,fake IP默认属于网段9.251.0.0/16,Fake IP更详细的介绍请参阅Fake IP分配章节。
建立TCP连接
经过DNS解析后,客户端得到的是一个9.251.0.0/16网段的IP。EdgeMesh通过iptables规则将这个网段的流量转发到172.17.0.1:40001端口,该端口由EdgeMesh的lister监听。
iptables -t nat -A PREROUTING -d 9.251.0.0/16 -i docker0 -p tcp -j EDGE-MESH
iptables -t nat -A EDGE-MESH -p tcp -j DNAT --to-destination 172.17.0.1:40001
建立TCP连接的请求进入到lister后,目标IP被从请求中提取出来,再交给go-chassis框架进行负载均衡。
// Start starts the EdgeMesh listenerfunc Start() {for {conn, err := config.Config.Listener.Accept()// ...ip, port, err := realServerAddress(&conn) // 提取目标IP// ...proto, err := newProtocolFromSock(ip, port, conn)// ...go proto.Process()}}
服务负载均衡
go-chassis提供了一系列接口,其中包括了发现服务实际后端的接口FindMicroServiceInstances。
// FindMicroServiceInstances find micro-service instances (subnets)func (esd *EdgeServiceDiscovery) FindMicroServiceInstances(consumerID, microServiceName string, tags utiltags.Tags) ([]*registry.MicroServiceInstance, error) {// parse microServiceNamename, namespace, port, err := parseServiceURL(microServiceName)...// get serviceservice, err := esd.getService(name, namespace)...// get podspods, err := esd.getPods(name, namespace)...// get targetPortvar targetPort intfor _, p := range service.Spec.Ports {if p.Protocol == "TCP" && int(p.Port) == port {targetPort = p.TargetPort.IntValue()break}}// port not foundif targetPort == 0 {...}// genvar microServiceInstances []*registry.MicroServiceInstancevar hostPort int32// all pods share the same hostport, get from pods[0]if pods[0].Spec.HostNetwork {// host networkhostPort = int32(targetPort)} else {// container networkfor _, container := range pods[0].Spec.Containers {for _, port := range container.Ports {if port.ContainerPort == int32(targetPort) {hostPort = port.HostPort}}}}for _, p := range pods {if p.Status.Phase == v1.PodRunning {microServiceInstances = append(microServiceInstances, ®istry.MicroServiceInstance{InstanceID: "",ServiceID: name + "." + namespace,HostName: "",EndpointsMap: map[string]string{"rest": fmt.Sprintf("%s:%d", p.Status.HostIP, hostPort)},})}}return microServiceInstances, nil
可以看到,EdgeMesh目前支持HostNetwork和ContainerNetwork两种网络类型,无论哪种方式,都需要在宿主机暴露访问端口。同时,EdgeMesh没有配置后端服务的健康检查,因此无法避免将流量转发的到隔离的网络或者Pod不可用的宿主机上。
TCP连接升级
当服务后端准备就绪后,EdgeMesh是如何知道一个TCP连接应该交给哪个后端处理呢?我们继续分析go-chassis提供的Protocol接口,EdgeMesh实现了一个处理http协议的Protocol接口对象。
// Process handles http protocolfunc (p *HTTP) Process() {defer p.Conn.Close()for {// parse http requestreq, err := http.ReadRequest(bufio.NewReader(p.Conn))...// http: Request.RequestURI can't be set in client requests// just reset it before transportreq.RequestURI = ""// create invocationinv := invocation.New(context.Background())// set invocationinv.MicroServiceName = req.Hostinv.SourceServiceID = ""inv.Protocol = "rest"inv.Strategy = config.Config.LBStrategyinv.Args = reqinv.Reply = &http.Response{}// create handlerchainc, err := handler.CreateChain(common.Consumer, "http", handler.Loadbalance, handler.Transport)...// start to handlep.req = reqc.Next(inv, p.responseCallback)}
EdgeMesh首先将TCP连接转换为http连接,这里的实现方法只支持http 1.x版本。然后根据请求的Host构造一个go-chassis所用的invocation,交给go-chassis框架进行负载均衡和透明转发。
所以,EdgeMesh完整的转发流程是这样的:
Fake IP分配
EdgeMesh至关重要的一步,是如何为Service分配Fake IP。EdgeMesh中listener的MsgProcess方法会处理edgehub发送过来的Service事件和Pod事件。
// MsgProcess processes messages from metaManagerfunc MsgProcess(msg model.Message) {// process servicesif svcs := filterResourceTypeService(msg); len(svcs) != 0 {...for i := range svcs {svcName := svcs[i].Namespace + "." + svcs[i].NamesvcPorts := getSvcPorts(svcs[i], svcName)switch msg.GetOperation() {case "insert":cache.GetMeshCache().Add("service"+"."+svcName, &svcs[i])...addServer(svcName, svcPorts)case "update":cache.GetMeshCache().Add("service"+"."+svcName, &svcs[i])...updateServer(svcName, svcPorts)case "delete":cache.GetMeshCache().Remove("service" + "." + svcName)...delServer(svcName)default:...}}return}// process podsif getResourceType(msg.GetResource()) == model.ResourceTypePodlist {...pods := make([]v1.Pod, 0)content, err := json.Marshal(msg.GetContent())...pods, err = handlePodListMessage(content)...podListName := getResourceName(msg.GetResource())podListNamespace := getResourceNamespace(msg.GetResource())switch msg.GetOperation() {case "insert", "update":cache.GetMeshCache().Add("pods"+"."+podListNamespace+"."+podListName, pods)...case "delete":cache.GetMeshCache().Remove("pods" + "." + podListNamespace + "." + podListName)...default:...}}}
getSvcPorts方法会以protocol,svcPort,containerPort|svcName的格式组织服务转发信息。protocol从svc.spec.ports[*].name字段获取,又由于EdgeMesh目前只支持http服务,所以要求ports的name要符合http-数字编号的格式。
然后,就是分配Fake IP。
// addServer adds a serverfunc addServer(svcName, svcPorts string) {ip := svcDesc.getIP(svcName)...if len(unused) == 0 {// try to expandexpandPool()...}ip = unused[0]unused = unused[1:]svcDesc.set(svcName, ip, svcPorts)err := metaClient.Listener().Add(svcName, ip)...}
IP不足时,expandPool方法尝试创建更多IP。
// expandPool expands fakeIP pool, each time with size of 256func expandPool() {end := indexOfPool + uint16(255)for ; indexOfPool <= end; indexOfPool++ {// avoid 255.255if indexOfPool > maxPoolSize {return}ip := defaultNetworkPrefix + getSubNet(indexOfPool)// if ip is not used, append it to unusedif svcDesc.getSvcPorts(ip) == "" {unused = append(unused, ip)}}}
EdgeMesh会将DNS记录存入SQLite数据库,以便重启后恢复。因此对于新生成的IP,需要检查是否正在使用。由于Fake IP默认所属网段是9.251.0.0/16,最多能分配的IP为65534个。同时,EdgeMesh没有保证多个节点对同一个服务分配相同的Fake IP,因为dns解析和负载均衡都是在服务请求方节点的EdgeMesh完成的,不需要这种保证。
-End-
