vlambda博客
学习文章列表

【技术干货】EdgeMesh使用和源码分析

作者/古强


KubeEdge介绍


KubeEdge是一个开源的系统,可将本机容器化应用编排和管理扩展到边缘端设备。它基于Kubernetes构建,为网络和应用程序提供核心基础架构支持,并在云端和边缘端部署应用,同步元数据。KubeEdge 还支持 MQTT 协议,允许开发人员编写客户逻辑,并在边缘端启用设备通信的资源约束。KubeEdge 包含云端和边缘端两部分。


KubeEdge 由以下组件构成:


云上部分


• CloudHub: CloudHub:是一个Web Socket服务端,负责监听云端的变化, 缓存并发送消息到 EdgeHub。

• EdgeController:EdgeController是一个扩展的Kubernetes控制器,管理边缘节点和Pods的元数据确保数据能够传递到指定的边缘节点。

• DeviceController:DeviceController是一个扩展的Kubernetes控制器,管理边缘设备,确保设备信息、设备状态的云边同步。


边缘部分


• EdgeHub:EdgeHub是一个Web Socket客户端,负责与边缘计算的云服务(例如KubeEdge架构图中的Edge Controller)交互,包括同步云端资源更新、报告边缘主机和设备状态变化到云端等功能。

• Edged:Edged是运行在边缘节点的代理,用于管理容器化的应用程序。

• EventBus:EventBus是一个与MQTT服务器(mosquitto)交互的MQTT客户端,为其他组件提供订阅和发布功能。

• ServiceBus:ServiceBus是一个运行在边缘的HTTP客户端,接受来自云上服务的请求,与运行在边缘端的HTTP服务器交互,提供了云上服务通过HTTP协议访问边缘端HTTP服务器的能力。

• DeviceTwin:DeviceTwin负责存储设备状态并将设备状态同步到云,它还为应用程序提供查询接口。

• MetaManager:MetaManager是消息处理器,位于Edged和Edgehub之间,它负责向轻量级数据库(SQLite)存储/检索元数据。


架构


【技术干货】EdgeMesh使用和源码分析


问题所在


KubeEdge为在边缘侧做到轻量化,对k8s的组件进行了大量精简:以EdgeCore代替Kubelet,实际上是一个裁剪的Kubelet;没有CNI支持,无容器网络。因此运行在边缘节点的Pod犹如一个个孤岛,无法利用k8s内置Service做服务发现。


解决方法


因此,KubeEdge引入了EdgeMesh作为服务发现框架,可以在边缘节点间提供基于k8s Service的服务发现能力,同时支持边缘到云端的单向服务访问。


限制


1.EdgeMesh目前只支持http 1.x服务的转发;

2.Service域名解析依赖docker0网桥,只支持docker做CRI;

3.没有健康检查;

4.访问云端服务要求节点具有公网IP。


使用方法


创建工作负载


kind: DeploymentapiVersion: apps/v1metadata:  name: edge-nginx  namespace: defaultspec:  replicas: 1  selector:    matchLabels:      app: edge-nginx  template:    metadata:      labels:        app: edge-nginx    spec:      containers:        - name: container1          image: 'nginx:latest'          ports:            - hostPort: 80 # 务必填写hostPort和containerPort              containerPort: 80 # 请求转发依赖于hostPort              protocol: TCP      affinity:        nodeAffinity:          requiredDuringSchedulingIgnoredDuringExecution:            nodeSelectorTerms:              - matchExpressions:                  - key: node-role.kubernetes.io/edge                    operator: Exists


验证pod服务正常


使用podIp访问服务


[root@localhost ~]# curl 172.17.0.3<!DOCTYPE html>...[root@localhost ~]# 创建Servicekind: ServiceapiVersion: v1metadata:  name: edge-nginx  namespace: defaultspec:  ports:    - name: http-0 # 名称务必以http开头,表明是http服务      protocol: TCP      port: 18080      targetPort: 80  selector:    app: edge-nginx  clusterIP: None # 不分配ClusterIP


测试连通性


sh-4.4# curl edge-nginx.default.svc.cluster.local:18080<!DOCTYPE html>...sh-4.4


注意访问时最好使用完整域名访问,若Service在default命名空间则可以简写。


工作原理


一次完整的HTTP请求过程如下图。EdgeMesh会在DNS解析和服务负载均衡两处劫持请求。


【技术干货】EdgeMesh使用和源码分析


劫持DNS请求


EdgeMesh启动时,会改写/etc/resolv.conf,将docker0网桥设置为第一个dns服务器。并且在一个协程中,使用UDP监听docker0网桥的53端口进行域名解析。


改写/etc/resolv.conf


// ensureResolvForHost adds edgemesh dns server to the head of /etc/resolv.conffunc ensureResolvForHost() {    ...}


监听dns请求


// startDNS starts edgemesh dns serverfunc startDNS() {    // init meta client    metaClient = client.New()    // get dns listen ip    lip, err := common.GetInterfaceIP(ifi) // ifi=docker0    // ...    laddr := &net.UDPAddr{        IP:   lip,        Port: 53,    }    udpConn, err := net.ListenUDP("udp", laddr)  // ...    defer udpConn.Close()    dnsConn = udpConn    for {        req := make([]byte, bufSize)        n, from, err := dnsConn.ReadFromUDP(req)        // ...        que, err := parseDNSQuery(req[:n])        if err != nil {            continue        }
       que.from = from
       rsp := make([]byte, 0)        rsp, err = recordHandle(que, req[:n])        // ...        if _, err = dnsConn.WriteTo(rsp, from); err != nil {            klog.Warningf("[EdgeMesh] failed to write: %v", err)        }    }}


之后,在StartDNS方法的无限循环中,recordHandle方法会进行DNS解析。


// recordHandle returns the answer for the dns questionfunc recordHandle(que *dnsQuestion, req []byte) (rsp []byte, err error) {    var exist bool    var ip string    // qType should be 1 for ipv4    if que.name != nil && que.qType == aRecord {        domainName := string(que.name)        exist, ip = lookupFromMetaManager(domainName)    }
   if !exist || que.event == eventUpstream {        // if this service doesn't belongs to this cluster        go getFromRealDNS(req, que.from)        return rsp, fmt.Errorf("get from real dns")    }
   address := net.ParseIP(ip).To4()    if address == nil {        que.event = eventNxDomain    }    // gen    pre := modifyRspPrefix(que)    rsp = append(rsp, pre...)    if que.event != eventNothing {        return rsp, nil    }    // create a deceptive resp, if no error    dnsAns := &dnsAnswer{        name:    que.name,        qType:   que.qType,        qClass:  que.qClass,        ttl:     ttl,        dataLen: uint16(len(address)),        addr:    address,    }    ans := dnsAns.getAnswer()    rsp = append(rsp, ans...)
   return rsp, nil}


recordHandle方法的逻辑很清晰,首先尝试从metaManager获取域名所表示的Service。如果域名不符合<service_name>.<service_namespace>.svc.<cluster>.<local>格式,那么使用/etc/resolv.conf中的DNS服务器进行解析;否则,返回一个EdgeMesh管理的fake IP,fake IP默认属于网段9.251.0.0/16,Fake IP更详细的介绍请参阅Fake IP分配章节。


建立TCP连接


经过DNS解析后,客户端得到的是一个9.251.0.0/16网段的IP。EdgeMesh通过iptables规则将这个网段的流量转发到172.17.0.1:40001端口,该端口由EdgeMesh的lister监听。


iptables -t nat -A PREROUTING -d 9.251.0.0/16 -i docker0 -p tcp -j EDGE-MESH
iptables -t nat -A EDGE-MESH -p tcp -j DNAT --to-destination 172.17.0.1:40001


建立TCP连接的请求进入到lister后,目标IP被从请求中提取出来,再交给go-chassis框架进行负载均衡。


// Start starts the EdgeMesh listenerfunc Start() {    for {        conn, err := config.Config.Listener.Accept()        // ...        ip, port, err := realServerAddress(&conn) // 提取目标IP        // ...        proto, err := newProtocolFromSock(ip, port, conn)    // ...        go proto.Process()    }}



服务负载均衡


go-chassis提供了一系列接口,其中包括了发现服务实际后端的接口FindMicroServiceInstances。


// FindMicroServiceInstances find micro-service instances (subnets)func (esd *EdgeServiceDiscovery) FindMicroServiceInstances(consumerID, microServiceName string, tags utiltags.Tags) ([]*registry.MicroServiceInstance, error) {    // parse microServiceName    name, namespace, port, err := parseServiceURL(microServiceName)  ...    // get service    service, err := esd.getService(name, namespace)    ...    // get pods    pods, err := esd.getPods(name, namespace)    ...    // get targetPort    var targetPort int    for _, p := range service.Spec.Ports {        if p.Protocol == "TCP" && int(p.Port) == port {            targetPort = p.TargetPort.IntValue()            break        }    }    // port not found    if targetPort == 0 {        ...    }
   // gen    var microServiceInstances []*registry.MicroServiceInstance    var hostPort int32    // all pods share the same hostport, get from pods[0]    if pods[0].Spec.HostNetwork {        // host network        hostPort = int32(targetPort)    } else {        // container network        for _, container := range pods[0].Spec.Containers {            for _, port := range container.Ports {                if port.ContainerPort == int32(targetPort) {                    hostPort = port.HostPort                }            }        }    }    for _, p := range pods {        if p.Status.Phase == v1.PodRunning {            microServiceInstances = append(microServiceInstances, &registry.MicroServiceInstance{                InstanceID:   "",                ServiceID:    name + "." + namespace,                HostName:     "",                EndpointsMap: map[string]string{"rest": fmt.Sprintf("%s:%d", p.Status.HostIP, hostPort)},            })        }    }
   return microServiceInstances, nil


可以看到,EdgeMesh目前支持HostNetwork和ContainerNetwork两种网络类型,无论哪种方式,都需要在宿主机暴露访问端口。同时,EdgeMesh没有配置后端服务的健康检查,因此无法避免将流量转发的到隔离的网络或者Pod不可用的宿主机上。


TCP连接升级


当服务后端准备就绪后,EdgeMesh是如何知道一个TCP连接应该交给哪个后端处理呢?我们继续分析go-chassis提供的Protocol接口,EdgeMesh实现了一个处理http协议的Protocol接口对象。


// Process handles http protocolfunc (p *HTTP) Process() {    defer p.Conn.Close()
   for {        // parse http request        req, err := http.ReadRequest(bufio.NewReader(p.Conn))        ...        // http: Request.RequestURI can't be set in client requests        // just reset it before transport        req.RequestURI = ""
       // create invocation        inv := invocation.New(context.Background())
       // set invocation        inv.MicroServiceName = req.Host        inv.SourceServiceID = ""        inv.Protocol = "rest"        inv.Strategy = config.Config.LBStrategy        inv.Args = req        inv.Reply = &http.Response{}
       // create handlerchain        c, err := handler.CreateChain(common.Consumer, "http", handler.Loadbalance, handler.Transport)        ...        // start to handle        p.req = req        c.Next(inv, p.responseCallback)    }


EdgeMesh首先将TCP连接转换为http连接,这里的实现方法只支持http 1.x版本。然后根据请求的Host构造一个go-chassis所用的invocation,交给go-chassis框架进行负载均衡和透明转发。


所以,EdgeMesh完整的转发流程是这样的:



Fake IP分配


EdgeMesh至关重要的一步,是如何为Service分配Fake IP。EdgeMesh中listener的MsgProcess方法会处理edgehub发送过来的Service事件和Pod事件。


// MsgProcess processes messages from metaManagerfunc MsgProcess(msg model.Message) {    // process services    if svcs := filterResourceTypeService(msg); len(svcs) != 0 {        ...        for i := range svcs {            svcName := svcs[i].Namespace + "." + svcs[i].Name            svcPorts := getSvcPorts(svcs[i], svcName)            switch msg.GetOperation() {            case "insert":                cache.GetMeshCache().Add("service"+"."+svcName, &svcs[i])                ...                addServer(svcName, svcPorts)            case "update":                cache.GetMeshCache().Add("service"+"."+svcName, &svcs[i])                ...                updateServer(svcName, svcPorts)            case "delete":                cache.GetMeshCache().Remove("service" + "." + svcName)                ...                delServer(svcName)            default:                ...            }        }        return    }    // process pods    if getResourceType(msg.GetResource()) == model.ResourceTypePodlist {        ...        pods := make([]v1.Pod, 0)        content, err := json.Marshal(msg.GetContent())        ...        pods, err = handlePodListMessage(content)        ...        podListName := getResourceName(msg.GetResource())        podListNamespace := getResourceNamespace(msg.GetResource())        switch msg.GetOperation() {        case "insert", "update":            cache.GetMeshCache().Add("pods"+"."+podListNamespace+"."+podListName, pods)            ...        case "delete":            cache.GetMeshCache().Remove("pods" + "." + podListNamespace + "." + podListName)            ...        default:            ...        }    }}


getSvcPorts方法会以protocol,svcPort,containerPort|svcName的格式组织服务转发信息。protocol从svc.spec.ports[*].name字段获取,又由于EdgeMesh目前只支持http服务,所以要求ports的name要符合http-数字编号的格式。


然后,就是分配Fake IP。


// addServer adds a serverfunc addServer(svcName, svcPorts string) {    ip := svcDesc.getIP(svcName)    ...    if len(unused) == 0 {        // try to expand        expandPool()        ...    }    ip = unused[0]    unused = unused[1:]
   svcDesc.set(svcName, ip, svcPorts)    err := metaClient.Listener().Add(svcName, ip)    ...}


IP不足时,expandPool方法尝试创建更多IP。


// expandPool expands fakeIP pool, each time with size of 256func expandPool() {    end := indexOfPool + uint16(255)    for ; indexOfPool <= end; indexOfPool++ {        // avoid 255.255        if indexOfPool > maxPoolSize {            return        }        ip := defaultNetworkPrefix + getSubNet(indexOfPool)        // if ip is not used, append it to unused        if svcDesc.getSvcPorts(ip) == "" {            unused = append(unused, ip)        }    }}


EdgeMesh会将DNS记录存入SQLite数据库,以便重启后恢复。因此对于新生成的IP,需要检查是否正在使用。由于Fake IP默认所属网段是9.251.0.0/16,最多能分配的IP为65534个。同时,EdgeMesh没有保证多个节点对同一个服务分配相同的Fake IP,因为dns解析和负载均衡都是在服务请求方节点的EdgeMesh完成的,不需要这种保证。


-End-