gRPC 客户端负载均衡

gRPC 中的负载均衡包括服务端负载均衡和客户端负载均衡,本文将介绍客户端负载均衡,gRPC 中的客户端负载均衡主要有 2 个部分:1) Name Resolver 2) Load Balancing Policy 接下来将依次介绍。 Name Resolver gRPC 中的默认 name-system 是 DNS , 同时各种客户端还提供了插件以使用自定义 name-system。gRPC Name Resolver 会根据 name-system 进行对应的解析,将用户提供的名称转换为对应的地址。 Load Balancing Policy gRPC 中内置了多种负载均衡策略,本文将介绍常见的几种负载均衡策略:1) pick_first 2) round_robin pick_first pick_first 是默认的负载均衡策略,该策略从 Name Resolver 获得到服务器的地址列表,按顺序依次对每个服务器地址进行连接,直到连接成功,如果某个地址连接成功则所有的 RPC 请求都会发送到这个服务器地址。 round_robin round_robin 策略,该策略从 Name Resolver 获得到服务器的地址列表,依次将请求发送到每一个地址,例如第一个请求将发送到 backend1 ,第二个请求将发送到 backend2。 接下来分别使用这两种策略进行测试。 例子 我们先创建服务端,循环创建了 3 个服务端,分别使用 30051、30052、30053 端口。 package main import ( "context" "fmt" "log" "net" "sync" "google.golang.org/grpc" pb "github.com/overstarry/grpc-example/proto/echo" ) var ( addrs = []string{":30051", ":30052",":30053"} ) type ecServer struct { pb.UnimplementedEchoServer addr string } func (s *ecServer) UnaryEcho(ctx context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) { return &pb.EchoResponse{Message: fmt.Sprintf("%s (from %s)", req.Message, s.addr)}, nil } func startServer(addr string) { lis, err := net.Listen("tcp", addr) if err != nil { log.Fatalf("failed to listen: %v", err) } s := grpc.NewServer() pb.RegisterEchoServer(s, &ecServer{addr: addr}) log.Printf("serving on %s\n", addr) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } } func main() { var wg sync.WaitGroup for _, addr := range addrs { wg.Add(1) go func(addr string) { defer wg.Done() startServer(addr) }(addr) } wg.Wait() } 接下来创建对应的客户端连接: ...

三月 30, 2024 · overstarry

gRPC 请求重试

前面的文章介绍了 gRPC 相关的功能,今天继续介绍 gRPC 的功能,本文将介绍 gRPC 的重试功能。 介绍 请求的重试是一个常见的功能,在我们日常的使用中,如果需要重试请求往往需要使用外部包进行实现,在 gRPC 中内置了重试了功能,不需要我们自己实现。 通过查阅 gRPC 的文档可以看到,gRPC 会根据开发者设定的策略进行失败 RPC 的重试,有两种策略 1) 重试策略:重试失败的 RPC 请求 2) hedging 策略:并行发生相同 RPC 请求。单个 RPC 请求可以选择两种重试策略中的一种,不能同时选择多种策略。 重试策略有以下参数可以使用: maxAttempts: 必填 RPC 最大请求次数,包括原始请求 initialBackoff, maxBackoff, backoffMultiplier: 必填 决定下次重试前的延迟时间 random(0, min(initialBackoff*backoffMultiplier**(n-1), maxBackoff)) retryableStatusCodes: 必填 收到服务器非正常状态码时,根据 retryableStatusCodes 中的状态码列表决定是否重试请求 hedging 策略可以主动发送单个请求的多个副本,而无需等待响应。需要注意的是,此策略可能会导致后端多次执行,因此最好仅对可以多次执行不会有不利影响的请求开启此策略。有如下参数: maxAttempts 必填 hedgingDelay 可选 nonFatalStatusCodes 可选 一个请求在没有收到成功响应时,经过 hedgingDelay 没收到响应 将继续发送请求,直至达到 maxAttempts 最大次数或请求成功。当收到成功响应时,所有未完成的其它请求将停止。本质上 hedging 策略可以看作在收到失败响应前重试请求。 使用 接下来讲解如何在 gRPC go 语言版本中配置使用重试功能。 服务端 服务端创建一个服务,只有当请求次数达到第三次时,才返回成功响应。 package main import ( "context" "flag" "fmt" "log" "net" "sync" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" pb "github.com/overstarry/grpc-example/proto/echo" ) var port = flag.Int("port", 9000, "port number") type failingServer struct { pb.UnimplementedEchoServer mu sync.Mutex reqCounter uint reqModulo uint } func (s *failingServer) maybeFailRequest() error { s.mu.Lock() defer s.mu.Unlock() s.reqCounter++ if (s.reqModulo > 0) && (s.reqCounter%s.reqModulo == 0) { return nil } return status.Errorf(codes.Unavailable, "maybeFailRequest: failing it") } func (s *failingServer) UnaryEcho(ctx context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) { if err := s.maybeFailRequest(); err != nil { log.Println("request failed count:", s.reqCounter) return nil, err } log.Println("request succeeded count:", s.reqCounter) return &pb.EchoResponse{Message: req.Message}, nil } func main() { flag.Parse() address := fmt.Sprintf(":%v", *port) lis, err := net.Listen("tcp", address) if err != nil { log.Fatalf("failed to listen: %v", err) } fmt.Println("listen on address", address) s := grpc.NewServer() failingservice := &failingServer{ reqCounter: 0, reqModulo: 3, } pb.RegisterEchoServer(s, failingservice) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } } 客户端 客户端通过 WithDefaultServiceConfig 设置配置好重试功能 ...

三月 23, 2024 · overstarry

apisix 代理 gRPC 服务

最近需要使用 apisix 来代理 gRPC 服务,本文记录一下 apisix 代理 gRPC 服务以及实践过程中遇到的一些问题。 准备 在接下来的步骤前,我们需要准备一个 gRPC 服务,我们使用 kratos 简单启动一个 gRPC 服务: $ kratos new hellowrold $ cd helloworld $ kratos run 一个简单的 gRPC 服务就启动了,我们先直接请求 gRPC 服务看看,通过 postman 请求接口后,接口顺利返回相应的值。 接下来我们开始本篇的主要内容:apisix 代理服务。 apisix 代理 gRPC 服务 我们使用 apisix admin 接口创建 Route: upstream 的 scheme 指定为 grpc 或 grpcs,nodes 指定需要代理的服务地址。 curl http://127.0.0.1:9180/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d ' { "methods": ["POST", "GET"], "uri": "/helloworld.v1.Greeter/SayHello", "upstream": { "scheme": "grpc", "type": "roundrobin", "nodes": { "127.0.0.1:9001": 1 } } }' ...

十月 20, 2023 · overstarry

gRPC 中间件

本文我来介绍 gRPC 中的中间件相关知识。 介绍 gRPC 中间件基于前面讲解的拦截器相关概念,它是一组拦截器、辅助、工具的集合,在我们使用 gRPC 开发应用时,往往会使用到各种中间件。它允许在服务端或客户端以拦截器链条形式应用多个中间件。因为拦截器经常用来实现一些通用的功能 ,如鉴权认证、日志、监控等,所以使用 gRPC 中间件来实现这些可重用功能是十分合适的。下面的代码就分别暂时服务端和客户端使用中间件的例子: import "github.com/grpc-ecosystem/go-grpc-middleware" myServer := grpc.NewServer( grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( grpc_ctxtags.StreamServerInterceptor(), grpc_opentracing.StreamServerInterceptor(), grpc_prometheus.StreamServerInterceptor, grpc_zap.StreamServerInterceptor(zapLogger), grpc_auth.StreamServerInterceptor(myAuthFunction), grpc_recovery.StreamServerInterceptor(), )), grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( grpc_ctxtags.UnaryServerInterceptor(), grpc_opentracing.UnaryServerInterceptor(), grpc_prometheus.UnaryServerInterceptor, grpc_zap.UnaryServerInterceptor(zapLogger), grpc_auth.UnaryServerInterceptor(myAuthFunction), grpc_recovery.UnaryServerInterceptor(), )), ) import "github.com/grpc-ecosystem/go-grpc-middleware" clientConn, err = grpc.Dial( address, grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(monitoringClientUnary, retryUnary)), grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(monitoringClientStream, retryStream)), ) 常用的中间件 go-grpc-middleware 项目提供了一些常用的中间件: 认证 grpc_auth - 一个可定制的(通过 AuthFunc)身份验证中间件 日志记录 grpc_ctxtags - 将 Tag 地图添加到上下文的库,数据从请求正文填充 grpc_zap - 将 zap 日志库集成到 gRPC 处理程序中。 grpc_logrus - 将 logrus 日志库集成到 gRPC 处理程序中。 grpc_kit - 将 go-kit/log 日志库集成到 gRPC 处理程序中。 grpc_grpc_logsettablegrpclog.LoggerV2 - 一个允许在运行时替换记录器的包装器(线程安全)。 监控 grpc_prometheus - Prometheus 客户端和服务器端监控中间件 otgrpc - OpenTracing 客户端和服务器端拦截器 grpc_opentracing - OpenTracing 客户端和服务器端拦截器,支持流式处理和处理程序返回的标签 otelgrpc - OpenTelemetry 客户端和服务器端拦截器 客户端中间件 grpc_retry - 一个通用的 gRPC 响应代码重试机制,客户端中间件 ...

九月 18, 2022 · overstarry

gRPC 请求抓包

前言 本文来简单介绍如何使用 wireshark 来获取 gRPC 请求。 wireshark 配置 在进行对 gRPC 请求抓包前,得来几个准备。 设置 proto 文件路径 依次打开 编辑 > 首选项 > Protocols > ProtoBuf, 点击如图选项,添加 proto 文件所在的路径。记得勾选右边的 “Load all files” 选项。 设置 TCP 消息解码 默认情况下,界面上显示的都是 TCP 数据包。依次点击菜单栏的 分析 -> 解码为… (或者右击随便一行)。 把 9000 (你的 gRPC 服务端端口) 端口的 TCP 消息解码成 HTTP2 协议信息。 开始抓包 现在开始捕获 gRPC 请求消息,为了避免其他无关的流量,在捕获选项设置筛选 tcp port 9000 只获得跟服务端相关的流量。 我们使用 postman 向服务端发送请求。回到 wireshark 界面,我们就可以看到许多流量,通过前面设置的解码,我们可以很方便的获得 gRPC 消息的具体内容。 小结 本文简单介绍了如何使用 wireshark 捕获 gRPC 请求流量。在使用 Wireshark 抓包时把未识别的 HTTP/2 协议手动设置为 HTTP/2,这样会方便很多。 ...

九月 3, 2022 · overstarry

gRPC 服务反射协议

本文主要介绍 gRPC 的服务反射协议和相关的应用。 介绍 gRPC 服务反射协议 (server reflection) 是在 gRPC 服务端定义的一个服务,它能提供该服务器端上可公开使用的 gRPC 服务的信息,简单的来说,就是服务反射向客户端提供了服务端注册的服务的信息。因此客户端不需要预编译服务定义就能与服务端交互了。 客户端想要与服务端程序进行通信,必须要有所定义的服务信息,需要编译生产客户端存根,借助 gRPC 服务反射协议,我们就可以无需编译服务定义就能通信。 使用 该如何开启服务反射协议呢?很简单,只需要通过一行代码即可开启:reflection.Register() package main import ( "context" "flag" "fmt" "log" "net" "google.golang.org/grpc" "google.golang.org/grpc/reflection" ecpb "google.golang.org/grpc/examples/features/proto/echo" hwpb "google.golang.org/grpc/examples/helloworld/helloworld" ) var port = flag.Int("port", 50051, "the port to serve on") // hwServer is used to implement helloworld.GreeterServer. type hwServer struct { hwpb.UnimplementedGreeterServer } // SayHello implements helloworld.GreeterServer func (s *hwServer) SayHello(ctx context.Context, in *hwpb.HelloRequest) (*hwpb.HelloReply, error) { return &hwpb.HelloReply{Message: "Hello " + in.Name}, nil } type ecServer struct { ecpb.UnimplementedEchoServer } func (s *ecServer) UnaryEcho(ctx context.Context, req *ecpb.EchoRequest) (*ecpb.EchoResponse, error) { return &ecpb.EchoResponse{Message: req.Message}, nil } func main() { flag.Parse() lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port)) if err != nil { log.Fatalf("failed to listen: %v", err) } fmt.Printf("server listening at %v\n", lis.Addr()) s := grpc.NewServer() // Register Greeter on the server. hwpb.RegisterGreeterServer(s, &hwServer{}) // Register RouteGuide on the same server. ecpb.RegisterEchoServer(s, &ecServer{}) // Register reflection service on gRPC server. reflection.Register(s) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } } 服务端开启服务反射协议后,就可以通过 gRPC CLI 工具来检查服务端了。这里就不多介绍了,接下来我们来看看服务反射协议在 kratos 中的使用。 ...

八月 27, 2022 · overstarry

gRPC 健康探针

简介 gRPC 健康探针 grpc-health-probe 是社区提供的一个工具,用来检查 gRPC 服务的健康状态,此工具 是通过 gRPC 健康检查协议公开服务的状态。 使用 我在本地使用 kratos 创建一个使用 9000 端口的 gRPC 的服务。通过 grpc-health-probe 可以检查服务的健康状态。 grpc-health-probe -addr=localhost:9000 status: SERVING 可以看到此服务目前是健康的,不健康的服务将以非零退出代码退出。 grpc_health_probe -addr=localhost:9000 -connect-timeout 250ms -rpc-timeout 100ms failed to connect service at "localhost:9000": context deadline exceeded exit status 2 grpc_health_probe 发送了一个对 /grpc.health.v1.Health/Check 的 RPC 请求。如果已 SERVING 状态作为响应,就会正常成功退出,否则会给出一个非零的退出。 Kubernetes 使用 grpc_health_probe 可用于 Kubernetes 对 Pod 中运行的 gRPC 服务器进行健康检查。建议您使用 Kubernetes exec 探针并为您的 gRPC 服务器 pod 定义活跃度和/或就绪性检查。 您可以将静态编译 grpc_health_probe 的内容捆绑到您的容器映像中。 ...

七月 31, 2022 · overstarry

GRPC 健康检查

本篇文章我来介绍 gRPC 中的健康检查相关的知识。 简介 服务的健康检测一般是指的是检测服务是否正常运行: 是否存在,因为程序逻辑错误或者 OOM 等进程不存在; 存在是否可以正常的响应请求,尽管进程存在但可能因为请求量过大或者程序逻辑错误,导致服务 hang 住,无法正常对外请求。 gRPC 定义了一个健康检查协议,它允许使用了 gRPC 服务暴露服务器的状态,这样服务的消费者就能获得服务的健康状态。服务器的健康状态是由服务器是否响应非健康状态来确定的。 当服务器还没准备好处理 rpc 请求或者根本没有响应健康探针的请求时,就会发生这种情况。 健康服务定义 gRPC 健康检查协议基于 gRPC 定义了 API。下面就是服务定义: syntax = "proto3"; package grpc.health.v1; message HealthCheckRequest { string service = 1; } message HealthCheckResponse { enum ServingStatus { UNKNOWN = 0; SERVING = 1; NOT_SERVING = 2; } ServingStatus status = 1; } service Health { rpc Check(HealthCheckRequest) returns (HealthCheckResponse); rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse); } 客户端应该调用 Check 服务来判断服务是否正常运行,并且设置 deadline。客户端可以设置需要查询的服务名称,来返回对应的服务是否正常。 服务器应手动注册所有的服务并单个设置状态,包括空服务名称及其状态。对于收到的每一个请求,从注册表中查询服务的状态并返回。如果未找到,返回 NOT_FOUND 状态。服务器也可以根据实际的业务逻辑提供更为复杂的状态返回。 ...

七月 23, 2022 · overstarry

GRPC 单向安全连接

从本篇开始,我将介绍加强 gRPC 的安全性的一系列措施。本篇介绍使用 TLS 加密 gRPC 通信的第一篇文章:gRPC 单向安全连接。 TLS 协议介绍 传输层安全性协议(英语:Transport Layer Security,缩写作 TLS),及其前身安全套接层(Secure Sockets Layer,缩写作 SSL)是一种安全协议,目的是为互联网通信提供安全及数据完整性保障。网景公司(Netscape)在 1994 年推出首版网页浏览器,网景导航者时,推出 HTTPS 协议,以 SSL 进行加密,这是 SSL 的起源。IETF 将 SSL 进行标准化,1999 年公布第一版 TLS 标准文件。随后又公布 RFC 5246(2008 年 8 月)与 RFC 6176(2011 年 3 月)。在浏览器、邮箱、即时通信、VoIP、网络传真等应用程序中,广泛支持这个协议。主要的网站,如 Google、Facebook 等也以这个协议来创建安全连线,发送数据。目前已成为互联网上保密通信的工业标准。 SSL 包含记录层(Record Layer)和传输层,记录层协议确定传输层数据的封装格式。传输层安全协议使用 X.509 认证,之后利用非对称加密演算来对通信方做身份认证,之后交换对称密钥作为会谈密钥(Session key)。这个会谈密钥是用来将通信两方交换的数据做加密,保证两个应用间通信的保密性和可靠性,使客户与服务器应用之间的通信不被攻击者窃听。 单向安全连接 通过安全的连接进行传输数据非常重要,那么如何在 gRPC 中使用 TLS 保护 gRPC 通信呢?TLS 认证机制集成在了 gRPC 库中,这使得 gRPC 可以很方便使用 TLS 进行安全连接。 客户端和服务端之间的安全传输可以采用单向或双向的方式来实现。本文主要介绍 单向安全连接。 在单向安全连接中,只有客户端会校验服务端,以确保它所接收的数据来自预期的服务器,在建立连接时,服务端会与客户端共享其公开证书,客户端会校验收到的证书。这是通过证书授权中心完成的。证书校验完成后,客户端会使用密钥加密数据。 要启用 TLS,需要证书和密钥 (xx.key,xx.pem/xx.crt),前者是用于签名和扔着公钥,后者用于分发自签名 X.509 公钥。证书和密钥的生成这里就不过多介绍了,需要的可以自行了解。 ...

二月 12, 2022 · overstarry

GRPC 多路复用

今天来讲一讲 gRPC 的多路复用,gRPC 的多路复用是指 一个 gRPC 服务器端可以运行多个 gRPC 服务,也允许多个客户端存根使用同一个 gRPC 客户端连接。 我们继续沿用前面的代码来讲解如何使用多路复用。 多个 gRPC 服务共享同一个服务器端 假如在订单服务中,为了管理的需求,需要在同一个服务端运行另一个 RPC 服务,这样客户端就能重用同一个连接,这样就可以按需调用相应的服务。通过相应的服务器端注册函数,可以使多个服务注册在同一个服务器端。 具体代码如下: func main() { initSampleData() lis, err := net.Listen("tcp", port) if err != nil { log.Fatalf("failed to isten: %v", err) } s := grpc.NewServer(grpc.UnaryInterceptor(orderUnaryServerInterceptor)) ordermgt_pb.RegisterOrderManagementServer(s, &orderMgtServer{}) user_pb.RegisterUserManagementServer(s, &userMgtServer{}) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } } 这样就是多个 gRPC 服务共享同一个服务端连接,这样就可以按需调用相应的服务。同理,通过客户端,可以在两个 gRPC 客户端存根间共享相同的 gRPC 客户端连接。 两个 gRPC 客户端存根共享同一个客户端连接 如代码所示,两个 gRPC 服务在同一个 gRPC 服务器端运行,所以可以创建一个 gRPC 连接,并在两个服务创建 gRPC 客户端实例时使用这个连接。 func main() { conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock()) if err != nil { log.Fatalf("did not connect: %v", err) } defer conn.Close() client := pb.NewOrderManagementClient(conn) ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() heclien := hwpb.NewGreetingClient(conn) } 小结 在我们的日常使用中,通常不会出现两个服务间共享同一个服务器端连接的情况。 ...

一月 9, 2022 · overstarry