【技术干货】EdgeMesh使用和源码分析
作者/古强
KubeEdge介绍
KubeEdge是一个开源的系统,可将本机容器化应用编排和管理扩展到边缘端设备。它基于Kubernetes构建,为网络和应用程序提供核心基础架构支持,并在云端和边缘端部署应用,同步元数据。KubeEdge 还支持 MQTT 协议,允许开发人员编写客户逻辑,并在边缘端启用设备通信的资源约束。KubeEdge 包含云端和边缘端两部分。
KubeEdge 由以下组件构成:
云上部分
• CloudHub: CloudHub:是一个Web Socket服务端,负责监听云端的变化, 缓存并发送消息到 EdgeHub。
• EdgeController:EdgeController是一个扩展的Kubernetes控制器,管理边缘节点和Pods的元数据确保数据能够传递到指定的边缘节点。
• DeviceController:DeviceController是一个扩展的Kubernetes控制器,管理边缘设备,确保设备信息、设备状态的云边同步。
边缘部分
• EdgeHub:EdgeHub是一个Web Socket客户端,负责与边缘计算的云服务(例如KubeEdge架构图中的Edge Controller)交互,包括同步云端资源更新、报告边缘主机和设备状态变化到云端等功能。
• Edged:Edged是运行在边缘节点的代理,用于管理容器化的应用程序。
• EventBus:EventBus是一个与MQTT服务器(mosquitto)交互的MQTT客户端,为其他组件提供订阅和发布功能。
• ServiceBus:ServiceBus是一个运行在边缘的HTTP客户端,接受来自云上服务的请求,与运行在边缘端的HTTP服务器交互,提供了云上服务通过HTTP协议访问边缘端HTTP服务器的能力。
• DeviceTwin:DeviceTwin负责存储设备状态并将设备状态同步到云,它还为应用程序提供查询接口。
• MetaManager:MetaManager是消息处理器,位于Edged和Edgehub之间,它负责向轻量级数据库(SQLite)存储/检索元数据。
架构
问题所在
KubeEdge为在边缘侧做到轻量化,对k8s的组件进行了大量精简:以EdgeCore代替Kubelet,实际上是一个裁剪的Kubelet;没有CNI支持,无容器网络。因此运行在边缘节点的Pod犹如一个个孤岛,无法利用k8s内置Service做服务发现。
解决方法
因此,KubeEdge引入了EdgeMesh作为服务发现框架,可以在边缘节点间提供基于k8s Service的服务发现能力,同时支持边缘到云端的单向服务访问。
限制
1.EdgeMesh目前只支持http 1.x服务的转发;
2.Service域名解析依赖docker0网桥,只支持docker做CRI;
3.没有健康检查;
4.访问云端服务要求节点具有公网IP。
使用方法
创建工作负载
kind: Deployment
apiVersion: apps/v1
metadata:
name: edge-nginx
namespace: default
spec:
replicas: 1
selector:
matchLabels:
app: edge-nginx
template:
metadata:
labels:
app: edge-nginx
spec:
containers:
- name: container1
image: 'nginx:latest'
ports:
- hostPort: 80 # 务必填写hostPort和containerPort
containerPort: 80 # 请求转发依赖于hostPort
protocol: TCP
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: node-role.kubernetes.io/edge
operator: Exists
验证pod服务正常
使用podIp访问服务
[root@localhost ~]# curl 172.17.0.3
<!DOCTYPE html>
...
[root@localhost ~]#
创建Service
kind: Service
apiVersion: v1
metadata:
name: edge-nginx
namespace: default
spec:
ports:
- name: http-0 # 名称务必以http开头,表明是http服务
protocol: TCP
port: 18080
targetPort: 80
selector:
app: edge-nginx
clusterIP: None # 不分配ClusterIP
测试连通性
sh-4.4# curl edge-nginx.default.svc.cluster.local:18080
<!DOCTYPE html>
...
sh-4.4
注意访问时最好使用完整域名访问,若Service在default命名空间则可以简写。
工作原理
一次完整的HTTP请求过程如下图。EdgeMesh会在DNS解析和服务负载均衡两处劫持请求。
劫持DNS请求
EdgeMesh启动时,会改写/etc/resolv.conf,将docker0网桥设置为第一个dns服务器。并且在一个协程中,使用UDP监听docker0网桥的53端口进行域名解析。
改写/etc/resolv.conf
// ensureResolvForHost adds edgemesh dns server to the head of /etc/resolv.conf
func ensureResolvForHost() {
...
}
监听dns请求
// startDNS starts edgemesh dns server
func startDNS() {
// init meta client
metaClient = client.New()
// get dns listen ip
lip, err := common.GetInterfaceIP(ifi) // ifi=docker0
// ...
laddr := &net.UDPAddr{
IP: lip,
Port: 53,
}
udpConn, err := net.ListenUDP("udp", laddr)
// ...
defer udpConn.Close()
dnsConn = udpConn
for {
req := make([]byte, bufSize)
n, from, err := dnsConn.ReadFromUDP(req)
// ...
que, err := parseDNSQuery(req[:n])
if err != nil {
continue
}
que.from = from
rsp := make([]byte, 0)
rsp, err = recordHandle(que, req[:n])
// ...
if _, err = dnsConn.WriteTo(rsp, from); err != nil {
klog.Warningf("[EdgeMesh] failed to write: %v", err)
}
}
}
之后,在StartDNS方法的无限循环中,recordHandle方法会进行DNS解析。
// recordHandle returns the answer for the dns question
func recordHandle(que *dnsQuestion, req []byte) (rsp []byte, err error) {
var exist bool
var ip string
// qType should be 1 for ipv4
if que.name != nil && que.qType == aRecord {
domainName := string(que.name)
exist, ip = lookupFromMetaManager(domainName)
}
if !exist || que.event == eventUpstream {
// if this service doesn't belongs to this cluster
go getFromRealDNS(req, que.from)
return rsp, fmt.Errorf("get from real dns")
}
address := net.ParseIP(ip).To4()
if address == nil {
que.event = eventNxDomain
}
// gen
pre := modifyRspPrefix(que)
rsp = append(rsp, pre...)
if que.event != eventNothing {
return rsp, nil
}
// create a deceptive resp, if no error
dnsAns := &dnsAnswer{
name: que.name,
qType: que.qType,
qClass: que.qClass,
ttl: ttl,
dataLen: uint16(len(address)),
addr: address,
}
ans := dnsAns.getAnswer()
rsp = append(rsp, ans...)
return rsp, nil
}
recordHandle方法的逻辑很清晰,首先尝试从metaManager获取域名所表示的Service。如果域名不符合<service_name>.<service_namespace>.svc.<cluster>.<local>格式,那么使用/etc/resolv.conf中的DNS服务器进行解析;否则,返回一个EdgeMesh管理的fake IP,fake IP默认属于网段9.251.0.0/16,Fake IP更详细的介绍请参阅Fake IP分配章节。
建立TCP连接
经过DNS解析后,客户端得到的是一个9.251.0.0/16网段的IP。EdgeMesh通过iptables规则将这个网段的流量转发到172.17.0.1:40001端口,该端口由EdgeMesh的lister监听。
iptables -t nat -A PREROUTING -d 9.251.0.0/16 -i docker0 -p tcp -j EDGE-MESH
iptables -t nat -A EDGE-MESH -p tcp -j DNAT --to-destination 172.17.0.1:40001
建立TCP连接的请求进入到lister后,目标IP被从请求中提取出来,再交给go-chassis框架进行负载均衡。
// Start starts the EdgeMesh listener
func Start() {
for {
conn, err := config.Config.Listener.Accept()
// ...
ip, port, err := realServerAddress(&conn) // 提取目标IP
// ...
proto, err := newProtocolFromSock(ip, port, conn)
// ...
go proto.Process()
}
}
服务负载均衡
go-chassis提供了一系列接口,其中包括了发现服务实际后端的接口FindMicroServiceInstances。
// FindMicroServiceInstances find micro-service instances (subnets)
func (esd *EdgeServiceDiscovery) FindMicroServiceInstances(consumerID, microServiceName string, tags utiltags.Tags) ([]*registry.MicroServiceInstance, error) {
// parse microServiceName
name, namespace, port, err := parseServiceURL(microServiceName)
...
// get service
service, err := esd.getService(name, namespace)
...
// get pods
pods, err := esd.getPods(name, namespace)
...
// get targetPort
var targetPort int
for _, p := range service.Spec.Ports {
if p.Protocol == "TCP" && int(p.Port) == port {
targetPort = p.TargetPort.IntValue()
break
}
}
// port not found
if targetPort == 0 {
...
}
// gen
var microServiceInstances []*registry.MicroServiceInstance
var hostPort int32
// all pods share the same hostport, get from pods[0]
if pods[0].Spec.HostNetwork {
// host network
hostPort = int32(targetPort)
} else {
// container network
for _, container := range pods[0].Spec.Containers {
for _, port := range container.Ports {
if port.ContainerPort == int32(targetPort) {
hostPort = port.HostPort
}
}
}
}
for _, p := range pods {
if p.Status.Phase == v1.PodRunning {
microServiceInstances = append(microServiceInstances, ®istry.MicroServiceInstance{
InstanceID: "",
ServiceID: name + "." + namespace,
HostName: "",
EndpointsMap: map[string]string{"rest": fmt.Sprintf("%s:%d", p.Status.HostIP, hostPort)},
})
}
}
return microServiceInstances, nil
可以看到,EdgeMesh目前支持HostNetwork和ContainerNetwork两种网络类型,无论哪种方式,都需要在宿主机暴露访问端口。同时,EdgeMesh没有配置后端服务的健康检查,因此无法避免将流量转发的到隔离的网络或者Pod不可用的宿主机上。
TCP连接升级
当服务后端准备就绪后,EdgeMesh是如何知道一个TCP连接应该交给哪个后端处理呢?我们继续分析go-chassis提供的Protocol接口,EdgeMesh实现了一个处理http协议的Protocol接口对象。
// Process handles http protocol
func (p *HTTP) Process() {
defer p.Conn.Close()
for {
// parse http request
req, err := http.ReadRequest(bufio.NewReader(p.Conn))
...
// http: Request.RequestURI can't be set in client requests
// just reset it before transport
req.RequestURI = ""
// create invocation
inv := invocation.New(context.Background())
// set invocation
inv.MicroServiceName = req.Host
inv.SourceServiceID = ""
inv.Protocol = "rest"
inv.Strategy = config.Config.LBStrategy
inv.Args = req
inv.Reply = &http.Response{}
// create handlerchain
c, err := handler.CreateChain(common.Consumer, "http", handler.Loadbalance, handler.Transport)
...
// start to handle
p.req = req
c.Next(inv, p.responseCallback)
}
EdgeMesh首先将TCP连接转换为http连接,这里的实现方法只支持http 1.x版本。然后根据请求的Host构造一个go-chassis所用的invocation,交给go-chassis框架进行负载均衡和透明转发。
所以,EdgeMesh完整的转发流程是这样的:
Fake IP分配
EdgeMesh至关重要的一步,是如何为Service分配Fake IP。EdgeMesh中listener的MsgProcess方法会处理edgehub发送过来的Service事件和Pod事件。
// MsgProcess processes messages from metaManager
func MsgProcess(msg model.Message) {
// process services
if svcs := filterResourceTypeService(msg); len(svcs) != 0 {
...
for i := range svcs {
svcName := svcs[i].Namespace + "." + svcs[i].Name
svcPorts := getSvcPorts(svcs[i], svcName)
switch msg.GetOperation() {
case "insert":
cache.GetMeshCache().Add("service"+"."+svcName, &svcs[i])
...
addServer(svcName, svcPorts)
case "update":
cache.GetMeshCache().Add("service"+"."+svcName, &svcs[i])
...
updateServer(svcName, svcPorts)
case "delete":
cache.GetMeshCache().Remove("service" + "." + svcName)
...
delServer(svcName)
default:
...
}
}
return
}
// process pods
if getResourceType(msg.GetResource()) == model.ResourceTypePodlist {
...
pods := make([]v1.Pod, 0)
content, err := json.Marshal(msg.GetContent())
...
pods, err = handlePodListMessage(content)
...
podListName := getResourceName(msg.GetResource())
podListNamespace := getResourceNamespace(msg.GetResource())
switch msg.GetOperation() {
case "insert", "update":
cache.GetMeshCache().Add("pods"+"."+podListNamespace+"."+podListName, pods)
...
case "delete":
cache.GetMeshCache().Remove("pods" + "." + podListNamespace + "." + podListName)
...
default:
...
}
}
}
getSvcPorts方法会以protocol,svcPort,containerPort|svcName的格式组织服务转发信息。protocol从svc.spec.ports[*].name字段获取,又由于EdgeMesh目前只支持http服务,所以要求ports的name要符合http-数字编号的格式。
然后,就是分配Fake IP。
// addServer adds a server
func addServer(svcName, svcPorts string) {
ip := svcDesc.getIP(svcName)
...
if len(unused) == 0 {
// try to expand
expandPool()
...
}
ip = unused[0]
unused = unused[1:]
svcDesc.set(svcName, ip, svcPorts)
err := metaClient.Listener().Add(svcName, ip)
...
}
IP不足时,expandPool方法尝试创建更多IP。
// expandPool expands fakeIP pool, each time with size of 256
func expandPool() {
end := indexOfPool + uint16(255)
for ; indexOfPool <= end; indexOfPool++ {
// avoid 255.255
if indexOfPool > maxPoolSize {
return
}
ip := defaultNetworkPrefix + getSubNet(indexOfPool)
// if ip is not used, append it to unused
if svcDesc.getSvcPorts(ip) == "" {
unused = append(unused, ip)
}
}
}
EdgeMesh会将DNS记录存入SQLite数据库,以便重启后恢复。因此对于新生成的IP,需要检查是否正在使用。由于Fake IP默认所属网段是9.251.0.0/16,最多能分配的IP为65534个。同时,EdgeMesh没有保证多个节点对同一个服务分配相同的Fake IP,因为dns解析和负载均衡都是在服务请求方节点的EdgeMesh完成的,不需要这种保证。
-End-