gRPC客户端负载均衡

gRPC 中的负载均衡包括服务端负载均衡和客户端负载均衡,本文将介绍客户端负载均衡,gRPC中的客户端负载均衡主要有2个部分:1) Name Resolver 2) Load Balancing Policy 接下来将依次介绍。 Name Resolver gRPC 中的默认 name-system 是 DNS , 同时各种客户端还提供了插件以使用自定义 name-system。gRPC Name Resolver 会根据 name-system 进行对应的解析,将用户提供的名称转换为对应的地址。 Load Balancing Policy gRPC 中内置了多种负载均衡策略,本文将介绍常见的几种负载均衡策略:1) pick_first 2) round_robin pick_first pick_first 是默认的负载均衡策略,该策略从 Name Resolver 获得到服务器的地址列表,按顺序依次对每个服务器地址进行连接,直到连接成功,如果某个地址连接成功则所有的RPC请求都会发送到这个服务器地址。 round_robin round_robin 策略,该策略从 Name Resolver 获得到服务器的地址列表,依次将请求发送到每一个地址,例如第一个请求将发送到 backend1 ,第二个请求将发送到 backend2 。 接下来分别使用这两种策略进行测试。 例子 我们先创建服务端,循环创建了3个服务端,分别使用30051、30052、30053端口。 package main import ( "context" "fmt" "log" "net" "sync" "google.golang.org/grpc" pb "github.com/overstarry/grpc-example/proto/echo" ) var ( addrs = []string{":30051", ":30052",":30053"} ) type ecServer struct { pb.UnimplementedEchoServer addr string } func (s *ecServer) UnaryEcho(ctx context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) { return &pb.EchoResponse{Message: fmt.Sprintf("%s (from %s)", req.Message, s.addr)}, nil } func startServer(addr string) { lis, err := net.Listen("tcp", addr) if err != nil { log.Fatalf("failed to listen: %v", err) } s := grpc.NewServer() pb.RegisterEchoServer(s, &ecServer{addr: addr}) log.Printf("serving on %s\n", addr) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } } func main() { var wg sync.WaitGroup for _, addr := range addrs { wg.Add(1) go func(addr string) { defer wg.Done() startServer(addr) }(addr) } wg.Wait() } 接下来创建对应的客户端连接: ...

三月 30, 2024 · overstarry

gRPC请求重试

前面的文章介绍了 gRPC 相关的功能,今天继续介绍 gRPC 的功能,本文将介绍 gRPC 的重试功能。 介绍 请求的重试是一个常见的功能,在我们日常的使用中,如果需要重试请求往往需要使用外部包进行实现,在gRPC 中内置了重试了功能,不需要我们自己实现。 通过查阅 gRPC 的文档可以看到,gRPC 会根据开发者设定的策略进行失败RPC的重试,有两种策略 1)重试策略:重试失败的RPC请求 2) hedging 策略:并行发生相同RPC请求。单个RPC请求可以选择两种重试策略中的一种,不能同时选择多种策略。 重试策略有以下参数可以使用: maxAttempts: 必填 RPC 最大请求次数,包括原始请求 initialBackoff, maxBackoff, backoffMultiplier: 必填 决定下次重试前的延迟时间 random(0, min(initialBackoff*backoffMultiplier**(n-1), maxBackoff)) retryableStatusCodes: 必填 收到服务器非正常状态码时,根据 retryableStatusCodes 中的状态码列表决定是否重试请求 hedging 策略可以主动发送单个请求的多个副本,而无需等待响应。需要注意的是,此策略可能会导致后端多次执行,因此最好仅对可以多次执行不会有不利影响的请求开启此策略。有如下参数: maxAttempts 必填 hedgingDelay 可选 nonFatalStatusCodes 可选 一个请求在没有收到成功响应时,经过 hedgingDelay没收到响应 将继续发送请求,直至达到 maxAttempts 最大次数或请求成功。当收到成功响应时,所有未完成的其它请求将停止。本质上hedging 策略可以看作在收到失败响应前重试请求。 使用 接下来讲解如何在 gRPC go语言版本中配置使用重试功能。 服务端 服务端创建一个服务,只有当请求次数达到第三次时,才返回成功响应。 package main import ( "context" "flag" "fmt" "log" "net" "sync" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" pb "github.com/overstarry/grpc-example/proto/echo" ) var port = flag.Int("port", 9000, "port number") type failingServer struct { pb.UnimplementedEchoServer mu sync.Mutex reqCounter uint reqModulo uint } func (s *failingServer) maybeFailRequest() error { s.mu.Lock() defer s.mu.Unlock() s.reqCounter++ if (s.reqModulo > 0) && (s.reqCounter%s.reqModulo == 0) { return nil } return status.Errorf(codes.Unavailable, "maybeFailRequest: failing it") } func (s *failingServer) UnaryEcho(ctx context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) { if err := s.maybeFailRequest(); err != nil { log.Println("request failed count:", s.reqCounter) return nil, err } log.Println("request succeeded count:", s.reqCounter) return &pb.EchoResponse{Message: req.Message}, nil } func main() { flag.Parse() address := fmt.Sprintf(":%v", *port) lis, err := net.Listen("tcp", address) if err != nil { log.Fatalf("failed to listen: %v", err) } fmt.Println("listen on address", address) s := grpc.NewServer() failingservice := &failingServer{ reqCounter: 0, reqModulo: 3, } pb.RegisterEchoServer(s, failingservice) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } } 客户端 客户端通过 WithDefaultServiceConfig 设置配置好重试功能 ...

三月 23, 2024 · overstarry

apisix 代理 gRPC 服务

最近需要使用 apisix 来代理 gRPC 服务, 本文记录一下 apisix 代理 gRPC 服务以及实践过程中遇到的一些问题。 准备 在接下来的步骤前,我们需要准备一个 gRPC 服务,我们使用 kratos 简单启动一个 gRPC 服务: $ kratos new hellowrold $ cd helloworld $ kratos run 一个简单的 gRPC 服务就启动了,我们先直接请求 gRPC 服务看看,通过 postman 请求接口后,接口顺利返回相应的值。 接下来我们开始本篇的主要内容: apisix 代理服务。 apisix 代理 gRPC 服务 我们使用apisix admin 接口创建 Route: upstream 的 scheme 指定为 grpc 或 grpcs,nodes指定需要代理的服务地址。 curl http://127.0.0.1:9180/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d ' { "methods": ["POST", "GET"], "uri": "/helloworld.v1.Greeter/SayHello", "upstream": { "scheme": "grpc", "type": "roundrobin", "nodes": { "127.0.0.1:9001": 1 } } }' ...

十月 20, 2023 · overstarry

gRPC中间件

本文我来介绍 gRPC 中的中间件相关知识。 介绍 gRPC 中间件基于前面讲解的拦截器相关概念,它是一组拦截器、辅助、工具的集合,在我们使用 gRPC 开发应用时,往往会使用到各种中间件。它允许在服务端或客户端以拦截器链条形式应用多个中间件。 因为拦截器经常用来实现一些通用的功能 ,如鉴权认证、日志、监控等,所以使用 gRPC 中间件来实现这些可重用功能是十分合适的。下面的代码就分别暂时服务端和客户端使用中间件的例子: import "github.com/grpc-ecosystem/go-grpc-middleware" myServer := grpc.NewServer( grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( grpc_ctxtags.StreamServerInterceptor(), grpc_opentracing.StreamServerInterceptor(), grpc_prometheus.StreamServerInterceptor, grpc_zap.StreamServerInterceptor(zapLogger), grpc_auth.StreamServerInterceptor(myAuthFunction), grpc_recovery.StreamServerInterceptor(), )), grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( grpc_ctxtags.UnaryServerInterceptor(), grpc_opentracing.UnaryServerInterceptor(), grpc_prometheus.UnaryServerInterceptor, grpc_zap.UnaryServerInterceptor(zapLogger), grpc_auth.UnaryServerInterceptor(myAuthFunction), grpc_recovery.UnaryServerInterceptor(), )), ) import "github.com/grpc-ecosystem/go-grpc-middleware" clientConn, err = grpc.Dial( address, grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(monitoringClientUnary, retryUnary)), grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(monitoringClientStream, retryStream)), ) 常用的中间件 go-grpc-middleware 项目提供了一些常用的中间件: 认证 grpc_auth - 一个可定制的(通过AuthFunc)身份验证中间件 日志记录 grpc_ctxtags - 将Tag地图添加到上下文的库,数据从请求正文填充 grpc_zap - 将zap日志库集成到 gRPC 处理程序中。 grpc_logrus - 将logrus日志库集成到 gRPC 处理程序中。 grpc_kit - 将go-kit/log日志库集成到 gRPC 处理程序中。 grpc_grpc_logsettablegrpclog.LoggerV2 - 一个允许在运行时替换记录器的包装器(线程安全)。 监控 grpc_prometheus - Prometheus 客户端和服务器端监控中间件 otgrpc - OpenTracing客户端和服务器端拦截器 grpc_opentracing - OpenTracing客户端和服务器端拦截器,支持流式处理和处理程序返回的标签 otelgrpc - OpenTelemetry客户端和服务器端拦截器 客户端中间件 grpc_retry - 一个通用的 gRPC 响应代码重试机制,客户端中间件 ...

九月 18, 2022 · overstarry

gRPC请求抓包

前言 本文来简单介绍如何使用 wireshark 来获取 gRPC 请求。 wireshark 配置 在进行对 gRPC 请求抓包前,得来几个准备。 设置 proto 文件路径 依次打开 编辑 > 首选项 > Protocols > ProtoBuf, 点击如图选项,添加 proto 文件所在的路径。记得勾选右边的 “Load all files” 选项。 设置TCP 消息解码 默认情况下,界面上显示的都是 TCP 数据包。依次点击菜单栏的 分析 -> 解码为… (或者右击随便一行)。 把 9000 (你的 gRPC 服务端端口) 端口的 TCP 消息解码成 HTTP2 协议信息。 开始抓包 现在开始捕获 gRPC 请求消息,为了避免其他无关的流量,在捕获选项设置筛选 tcp port 9000 只获得跟服务端相关的流量。 我们使用 postman 向服务端发送请求。回到 wireshark 界面,我们就可以看到许多流量,通过前面设置的解码,我们可以很方便的获得 gRPC 消息的具体内容。 小结 本文简单介绍了如何使用 wireshark 捕获 gRPC 请求流量。在使用Wireshark抓包时把未识别的HTTP/2协议手动设置为HTTP/2,这样会方便很多。 ...

九月 3, 2022 · overstarry

gRPC服务反射协议

本文主要介绍 gRPC 的服务反射协议和相关的应用。 介绍 gRPC 服务反射协议 (server reflection) 是在 gRPC 服务端定义的一个服务,它能提供该服务器端上可公开使用的 gRPC 服务的信息,简单的来说,就是服务反射向客户端提供了服务端注册的服务的信息。 因此客户端不需要预编译服务定义就能与服务端交互了。 客户端想要与服务端程序进行通信,必须要有所定义的服务信息,需要编译生产客户端存根,借助 gRPC 服务反射协议,我们就可以无需编译服务定义就能通信。 使用 该如何开启服务反射协议呢? 很简单,只需要通过一行代码即可开启: reflection.Register() package main import ( "context" "flag" "fmt" "log" "net" "google.golang.org/grpc" "google.golang.org/grpc/reflection" ecpb "google.golang.org/grpc/examples/features/proto/echo" hwpb "google.golang.org/grpc/examples/helloworld/helloworld" ) var port = flag.Int("port", 50051, "the port to serve on") // hwServer is used to implement helloworld.GreeterServer. type hwServer struct { hwpb.UnimplementedGreeterServer } // SayHello implements helloworld.GreeterServer func (s *hwServer) SayHello(ctx context.Context, in *hwpb.HelloRequest) (*hwpb.HelloReply, error) { return &hwpb.HelloReply{Message: "Hello " + in.Name}, nil } type ecServer struct { ecpb.UnimplementedEchoServer } func (s *ecServer) UnaryEcho(ctx context.Context, req *ecpb.EchoRequest) (*ecpb.EchoResponse, error) { return &ecpb.EchoResponse{Message: req.Message}, nil } func main() { flag.Parse() lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port)) if err != nil { log.Fatalf("failed to listen: %v", err) } fmt.Printf("server listening at %v\n", lis.Addr()) s := grpc.NewServer() // Register Greeter on the server. hwpb.RegisterGreeterServer(s, &hwServer{}) // Register RouteGuide on the same server. ecpb.RegisterEchoServer(s, &ecServer{}) // Register reflection service on gRPC server. reflection.Register(s) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } } 服务端开启服务反射协议后,就可以通过 gRPC CLI 工具来检查服务端了。这里就不多介绍了,接下来我们来看看服务反射协议在 kratos 中的使用。 ...

八月 27, 2022 · overstarry

gRPC健康探针

简介 gRPC 健康探针 grpc-health-probe 是社区提供的一个工具,用来检查 gRPC 服务的健康状态,此工具 是通过 gRPC 健康检查协议公开服务的状态。 使用 我在本地使用 kratos 创建一个使用 9000 端口的 gRPC 的服务。通过 grpc-health-probe 可以检查服务的健康状态。 grpc-health-probe -addr=localhost:9000 status: SERVING 可以看到此服务目前是健康的,不健康的服务将以非零退出代码退出。 grpc_health_probe -addr=localhost:9000 -connect-timeout 250ms -rpc-timeout 100ms failed to connect service at "localhost:9000": context deadline exceeded exit status 2 grpc_health_probe 发送了一个对 /grpc.health.v1.Health/Check 的RPC 请求。如果已 SERVING 状态作为响应,就会正常成功退出,否则会给出一个非零的退出。 Kubernetes 使用 grpc_health_probe 可用于 Kubernetes对 Pod 中运行的 gRPC 服务器进行健康检查。建议您使用Kubernetes exec探针并为您的 gRPC 服务器 pod 定义活跃度和/或就绪性检查。 您可以将静态编译grpc_health_probe的内容捆绑到您的容器映像中。 RUN GRPC_HEALTH_PROBE_VERSION=v0.3.1 && \ wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \ chmod +x /bin/grpc_health_probe 在您的 Kubernetes Pod 规范清单中,为容器指定一个livenessProbe和/或 :readinessProbe ...

七月 31, 2022 · overstarry

GRPC健康检查

本篇文章我来介绍 gRPC 中的健康检查相关的知识。 简介 服务的健康检测一般是指的是检测服务是否正常运行: 是否存在,因为程序逻辑错误或者 OOM 等进程不存在; 存在是否可以正常的响应请求,尽管进程存在但可能因为请求量过大或者程序逻辑错误,导致服务 hang 住,无法正常对外请求。 gRPC 定义了一个健康检查协议,它允许使用了 gRPC 服务暴露服务器的状态,这样服务的消费者就能获得服务的健康状态。服务器的健康状态是由服务器是否响应非健康状态来确定的。 当服务器还没准备好处理 rpc 请求或者根本没有响应健康探针的请求时,就会发生这种情况。 健康服务定义 gRPC 健康检查协议基于 gRPC 定义了 API。下面就是服务定义: syntax = "proto3"; package grpc.health.v1; message HealthCheckRequest { string service = 1; } message HealthCheckResponse { enum ServingStatus { UNKNOWN = 0; SERVING = 1; NOT_SERVING = 2; } ServingStatus status = 1; } service Health { rpc Check(HealthCheckRequest) returns (HealthCheckResponse); rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse); } 客户端应该调用 Check 服务来判断服务是否正常运行,并且设置 deadline。客户端可以设置需要查询的服务名称,来返回对应的服务是否正常。 服务器应手动注册所有的服务并单个设置状态,包括空服务名称及其状态。对于收到的每一个请求,从注册表中查询服务的状态并返回。如果未找到,返回 NOT_FOUND 状态。 服务器也可以根据实际的业务逻辑提供更为复杂的状态返回。 ...

七月 23, 2022 · overstarry

GRPC单向安全连接

从本篇开始,我将介绍加强 gRPC 的安全性的一系列措施。本篇介绍使用 TLS 加密 gRPC 通信的第一篇文章: gRPC 单向安全连接。 TLS 协议介绍 传输层安全性协议(英语:Transport Layer Security,缩写作TLS),及其前身安全套接层(Secure Sockets Layer,缩写作SSL)是一种安全协议,目的是为互联网通信提供安全及数据完整性保障。网景公司(Netscape)在1994年推出首版网页浏览器,网景导航者时,推出HTTPS协议,以SSL进行加密,这是SSL的起源。IETF将SSL进行标准化,1999年公布第一版TLS标准文件。随后又公布RFC 5246 (2008年8月)与RFC 6176(2011年3月)。在浏览器、邮箱、即时通信、VoIP、网络传真等应用程序中,广泛支持这个协议。主要的网站,如Google、Facebook等也以这个协议来创建安全连线,发送数据。目前已成为互联网上保密通信的工业标准。 SSL包含记录层(Record Layer)和传输层,记录层协议确定传输层数据的封装格式。传输层安全协议使用X.509认证,之后利用非对称加密演算来对通信方做身份认证,之后交换对称密钥作为会谈密钥(Session key)。这个会谈密钥是用来将通信两方交换的数据做加密,保证两个应用间通信的保密性和可靠性,使客户与服务器应用之间的通信不被攻击者窃听。 单向安全连接 通过安全的连接进行传输数据非常重要,那么如何在 gRPC 中使用 TLS 保护 gRPC 通信呢? TLS 认证机制集成在了 gRPC 库中,这使得 gRPC 可以很方便使用 TLS 进行安全连接。 客户端和服务端之间的安全传输可以采用单向或双向的方式来实现。本文主要介绍 单向安全连接。 在单向安全连接中,只有客户端会校验服务端,以确保它所接收的数据来自预期的服务器,在建立连接时,服务端会与客户端共享其公开证书,客户端会校验收到的证书。这是通过证书授权中心完成的。 证书校验完成后,客户端会使用密钥加密数据。 要启用 TLS ,需要证书和密钥(xx.key,xx.pem/xx.crt),前者是用于签名和扔着公钥,后者用于分发自签名 X.509 公钥。证书和密钥的生成这里就不过多介绍了,需要的可以自行了解。 在 gRPC 服务端启用单向安全连接 在 gRPC 服务端启用单向安全连接的主要流程如下: 1 读取和解析公钥-私钥,创建启用 TLS 的证书 2 添加证书作为 TLS 服务凭证,为所有连接启用 TLS. 3 通过 TLS 凭证创建新的 gRPC 连接 接下来的流程跟普通的流程差不多,就不多介绍了,直接上代码: package main import ( "context" "crypto/tls" "errors" wrapper "github.com/golang/protobuf/ptypes/wrappers" "github.com/google/uuid" "google.golang.org/grpc" "google.golang.org/grpc/credentials" pb "grpc-demo/proto" "log" "net" ) const ( port = ":50051" crtFile = "./server.crt" keyFile = "./server.key" ) type server struct { pb.UnimplementedProductInfoServer productMap map[string]*pb.Product } func (s *server) AddProduct(ctx context.Context, in *pb.Product) (*wrapper.StringValue, error) { out, err := uuid.NewUUID() if err != nil { log.Fatal(err) } in.Id = out.String() if s.productMap == nil { s.productMap = make(map[string]*pb.Product) } s.productMap[in.Id] = in return &wrapper.StringValue{Value: in.Id}, nil } func (s *server) GetProduct(ctx context.Context, in *wrapper.StringValue) (*pb.Product, error) { value, exists := s.productMap[in.Value] if exists { return value, nil } return nil, errors.New("Product does not exist for the ID" + in.Value) } func main() { cert, err := tls.LoadX509KeyPair(crtFile, keyFile) if err != nil { log.Fatalf("failed to load key pair: %s", err) } opts := []grpc.ServerOption{ grpc.Creds(credentials.NewServerTLSFromCert(&cert)), } s := grpc.NewServer(opts...) pb.RegisterProductInfoServer(s, &server{}) lis, err := net.Listen("tcp", port) if err != nil { log.Fatalf("failed to listen: %v", err) } if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } } 在客户端启用单向安全连接 为了与服务器连接,客户都需要服务端的自认证公钥。具体流程如下: ...

二月 12, 2022 · overstarry

GRPC多路复用

今天来讲一讲gRPC的多路复用,gRPC 的多路复用是指 一个 gRPC 服务器端可以运行多个 gRPC 服务,也允许多个客户端存根使用同一个 gRPC 客户端连接。 我们继续沿用前面的代码来讲解如何使用多路复用。 多个 gRPC 服务共享同一个服务器端 假如在订单服务中, 为了管理的需求,需要在同一个服务端运行另一个 RPC 服务,这样客户端就能重用同一个连接,这样就可以按需调用相应的服务。 通过相应的服务器端注册函数,可以使多个服务注册在同一个服务器端。 具体代码如下: func main() { initSampleData() lis, err := net.Listen("tcp", port) if err != nil { log.Fatalf("failed to isten: %v", err) } s := grpc.NewServer(grpc.UnaryInterceptor(orderUnaryServerInterceptor)) ordermgt_pb.RegisterOrderManagementServer(s, &orderMgtServer{}) user_pb.RegisterUserManagementServer(s, &userMgtServer{}) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } } 这样就是多个 gRPC 服务共享同一个服务端连接,这样就可以按需调用相应的服务。同理,通过客户端,可以在两个 gRPC 客户端存根间共享相同的 gRPC 客户端连接。 两个 gRPC 客户端存根共享同一个客户端连接 如代码所示,两个 gRPC 服务在同一个 gRPC 服务器端运行,所以可以创建一个 gRPC 连接,并在两个服务创建 gRPC 客户端实例时使用这个连接。 func main() { conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock()) if err != nil { log.Fatalf("did not connect: %v", err) } defer conn.Close() client := pb.NewOrderManagementClient(conn) ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() heclien := hwpb.NewGreetingClient(conn) } 小结 在我们的日常使用中,通常不会出现两个服务间共享同一个服务器端连接的情况。 ...

一月 9, 2022 · overstarry