资讯 小学 初中 高中 语言 会计职称 学历提升 法考 计算机考试 医护考试 建工考试 教育百科
栏目分类:
子分类:
返回
空麓网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
空麓网 > 计算机考试 > 软件开发 > 后端开发 > Java

GRPC 程序在 Kubernetes 中的负载均衡

Java 更新时间: 发布时间: 计算机考试归档 最新发布

GRPC 程序在 Kubernetes 中的负载均衡

本文的背景使用的是 kratos 框架。

背景

众所周知 grpc 底层使用 http2 协议,而 http2 是一个长链接多路复用的。在正常情况下客服端与服务端一对一不会需要负载均衡手段;但是当服务上云之后为了保障服务的可用性所以我们服务端一般是多副本,这种情况下客户端通过服务端的 service 名称与服务端建立连接而 service 会将流量轮训到后端的某一个 pod 这样就会造成客户端只会与某一个 pod 建立连接并且所有的请求流量都会到一个 pod 上面,其他任意数量的 pod 都是摆设。

解决方法

在讲述解决办法之前先知道以下一些概念

  • Headless Service
  • GRPC Name Resolver

Headless Service

首先 headless svc 是 k8s 一种服务,但它不为自己分配 IP 地址。相反,会返回与 svc 关联的 pod 的 ip,允许客户端直连到单个 pod(客户端和 pod 之间直接通信)。

和正常/普通的 svc 的却别就是普通的 svc 会分配一个 vip 地址,然后流量会通过这个 vip 轮训到后端的一个 pod;headless svc 不会分配任何 ip 地址并且返回 headless svc 会返回 pods ip。

GRPC Name Resolver

grpc 一些基础概念

  • Resolver:解析器,用于从注册中心实时获取当前服务端的列表,同步发送给 Balancer
  • Balancer:平衡器,一是接收从 Resolver 发送的服务端列表,建立并维护(长)连接状态;二是每次当 client 发起 rpc 调用时,按照算法从连接池中选择一个连接进行 rpc 调用
  • Register:注册,用于服务端初始化将自己信息上报到注册中心,主要信息有 ip、port

本文没有注册中心,使用 k8s api

简单的来说 Name Resolver(名称解析器)可以看作是一个 map[service-name][]backend-ip。它接收一个服务名称,并返回后端的 IP 列表。gRPC 中根据目标字符串中的 scheme 选择名称解析器。

解决方法的本质就是通过 headless svc + Name Resolver 自定一个解析器。

使用方法

通过 kuberesolver 程序实现;kuberesolver 是一个使用 k8s api 的 GRPC Name Resolver。

  • 创建 headless svc
  • 改造代码
apiVersion: v1kind: Servicemetadata:  name: headless  namespace: oasis-devspec:  ports:  - name: http    port: 8000    protocol: TCP    targetPort: 8000  - name: grpc    port: 9000    protocol: TCP    targetPort: 9000  selector:    app: oasis-dev-prizesinfrastructure  type: ClusterIP

改造源码

import "github.com/sercand/kuberesolver/v4"
kuberesolver.RegisterInCluster()ep := fmt.Sprintf("kubernetes:///%s.%s:%d", service, namespace, port)conn, err := transgrpc.DialInsecure(	context.Background(),	transgrpc.WithEndpoint(ep),	transgrpc.WithTimeout(60*time.Second),	transgrpc.WithMiddleware(		recovery.Recovery(),		tracing.Client(),	),)

使用缺点

  • 使用 kuberesolver 只能在集群内运行
  • 使用 kuberesolver 部署需要一个 service account

原理

上文讲到 kuberesolver 内部自定义了一个名称解析器;其中 kubeBuilder 和 kResolver 结构体分别实现了 grpc 的 Builder 接口和 Resolver 接口。

注册解析器

// RegisterInCluster registers the kuberesolver builder to grpc with kubernetes schema// kubernetesSchema = kubernetesfunc RegisterInCluster() {	RegisterInClusterWithSchema(kubernetesSchema)}// RegisterInClusterWithSchema registers the kuberesolver builder to the grpc with custom schemafunc RegisterInClusterWithSchema(schema string) {	resolver.Register(NewBuilder(nil, schema))}// NewBuilder creates a kubeBuilder which is used by grpc resolver.func NewBuilder(client K8sClient, schema string) resolver.Builder {	return &kubeBuilder{		k8sClient: client,		schema:    schema,	}}

grpc.DialInsecure 或者 grpc.Dial 函数中会先根据 kubernetes 这个 scheme 找到我们通过 RegisterInCluster 函数注册的 kubeBuilder,然后调用它的 Build() 方法构建我们自定义的 kResolver。kResolver 的 watch 方法一直以协程的方式监听我们指定的 headless svc 对应的 endpoints(如果 pod 上面或者销毁会被程序立即感知到)。

func (b *kubeBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {	if b.k8sClient == nil {		if cl, err := NewInClusterK8sClient(); err == nil {			b.k8sClient = cl		} else {			return nil, err		}	}	ti, err := parseResolverTarget(target)	if err != nil {		return nil, err	}	if ti.serviceNamespace == "" {		ti.serviceNamespace = getCurrentNamespaceOrDefault()	}	ctx, cancel := context.WithCancel(context.Background())	r := &kResolver{		target:    ti,		ctx:       ctx,		cancel:    cancel,		cc:        cc,		rn:        make(chan struct{}, 1),		k8sClient: b.k8sClient,		t:         time.NewTimer(defaultFreq),		freq:      defaultFreq,		endpoints: endpointsForTarget.WithLabelValues(ti.String()),		addresses: addressesForTarget.WithLabelValues(ti.String()),	}	go until(func() {		r.wg.Add(1)		err := r.watch()		if err != nil && err != io.EOF {			grpclog.Errorf("kuberesolver: watching ended with error='%v', will reconnect again", err)		}	}, time.Second, ctx.Done())	return r, nil}

watch 中一共有三个地方会触发 grpc address 的更新分别如下

  • k.t.C:定时器触发 30 min 一次
  • k.rn:ResolveNow 触发
  • up, hasMore := <-sw.ResultChan():streamWatcher 有新消息(事件)通知时,调用 k.handle(up.Object) 处理事件,即监听的资源发生了改变(创建/销毁/更新)
func (k *kResolver) watch() error {	defer k.wg.Done()	// watch endpoints lists existing endpoints at start	sw, err := watchEndpoints(k.ctx, k.k8sClient, k.target.serviceNamespace, k.target.serviceName)	if err != nil {		return err	}	for {		select {		case <-k.ctx.Done():			return nil		case <-k.t.C:			k.resolve()		case <-k.rn:			k.resolve()		case up, hasMore := <-sw.ResultChan():			if hasMore {				k.handle(up.Object)			} else {				return nil			}		}	}}
func (k *kResolver) ResolveNow(resolver.ResolveNowOptions) {	select {	case k.rn <- struct{}{}:	default:	}}

以上三种途径最后都会通过 k.handle() 处理。

func (k *kResolver) resolve() {	e, err := getEndpoints(k.k8sClient, k.target.serviceNamespace, k.target.serviceName)	if err == nil {		k.handle(e)	} else {		grpclog.Errorf("kuberesolver: lookup endpoints failed: %v", err)	}	// Next lookup should happen after an interval defined by k.freq.	k.t.Reset(k.freq)}

k.makeAddresses 会循环 endpoints 的 subsets 返回一个 []resolver.Address 用于解析器调用 NewAddress 来通知 ClientConn 一个新的解析地址列表(即服务列表更新通知接口)

func (k *kResolver) handle(e Endpoints) {	result, _ := k.makeAddresses(e)	//	k.cc.NewServiceConfig(sc)	if len(result) > 0 {		k.cc.NewAddress(result)	}	k.endpoints.Set(float64(len(e.Subsets)))	k.addresses.Set(float64(len(result)))}
转载请注明:文章转载自 http://www.konglu.com/
本文地址:http://www.konglu.com/it/1097753.html
免责声明:

我们致力于保护作者版权,注重分享,被刊用文章【GRPC 程序在 Kubernetes 中的负载均衡】因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理,本文部分文字与图片资源来自于网络,转载此文是出于传递更多信息之目的,若有来源标注错误或侵犯了您的合法权益,请立即通知我们,情况属实,我们会第一时间予以删除,并同时向您表示歉意,谢谢!

我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2023 成都空麓科技有限公司

ICP备案号:蜀ICP备2023000828号-2