GRPC多路复用

今天来讲一讲gRPC的多路复用,gRPC 的多路复用是指 一个 gRPC 服务器端可以运行多个 gRPC 服务,也允许多个客户端存根使用同一个 gRPC 客户端连接。 我们继续沿用前面的代码来讲解如何使用多路复用。 多个 gRPC 服务共享同一个服务器端 假如在订单服务中, 为了管理的需求,需要在同一个服务端运行另一个 RPC 服务,这样客户端就能重用同一个连接,这样就可以按需调用相应的服务。 通过相应的服务器端注册函数,可以使多个服务注册在同一个服务器端。 具体代码如下: func main() { initSampleData() lis, err := net.Listen("tcp", port) if err != nil { log.Fatalf("failed to isten: %v", err) } s := grpc.NewServer(grpc.UnaryInterceptor(orderUnaryServerInterceptor)) ordermgt_pb.RegisterOrderManagementServer(s, &orderMgtServer{}) user_pb.RegisterUserManagementServer(s, &userMgtServer{}) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } } 这样就是多个 gRPC 服务共享同一个服务端连接,这样就可以按需调用相应的服务。同理,通过客户端,可以在两个 gRPC 客户端存根间共享相同的 gRPC 客户端连接。 两个 gRPC 客户端存根共享同一个客户端连接 如代码所示,两个 gRPC 服务在同一个 gRPC 服务器端运行,所以可以创建一个 gRPC 连接,并在两个服务创建 gRPC 客户端实例时使用这个连接。 func main() { conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock()) if err != nil { log.Fatalf("did not connect: %v", err) } defer conn.Close() client := pb.NewOrderManagementClient(conn) ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() heclien := hwpb.NewGreetingClient(conn) } 小结 在我们的日常使用中,通常不会出现两个服务间共享同一个服务器端连接的情况。 ...

一月 9, 2022 · overstarry

Trace_in_sql

使用分布式链路追踪查看 sql 的执行情况 今天我们来讲一讲如何在 go 语言中使用 OpenTelemetry 链路追踪追踪 sql的执行情况(执行时间、语句等)。 初始化 我们这里需要有一个采用了数据库的项目,为了使用方便,我们这里采用了 ent 来进行数据库的操作。 初始化数据库实体 为了演示方便,我们这里简单定义一个 user 实体, user 有2个成员字段 id 和 name。 $ ent init User package schema import ( "entgo.io/ent" "entgo.io/ent/schema/field" ) // User holds the schema definition for the User entity. type User struct { ent.Schema } // Fields of the User. func (User) Fields() []ent.Field { return []ent.Field{ field.Int("id"), field.String("name"), } } // Edges of the User. func (User) Edges() []ent.Edge { return nil } 执行 go run -mod=mod entgo.io/ent/cmd/ent generate ./schema,就会生成一系列的文件。 ...

十二月 31, 2021 · overstarry

GRPC元数据

今天我来讲一讲 gRPC 元数据的使用,以及如何获取元数据。 gRPC 应用程序通常会通过 gRPC 服务和消费者之间的 RPC 来共享信息。在大多数场景中,某些与服务端业务逻辑相关的信息会作为远程调用方法的参数,但在某些场景可能存在与服务端业务上下文无关的数据,这些数据不应该通过参数来传递,而应该通过 gRPC 元数据来处理传递。元数据的结构构造是K-V 形式,其中 k 为字符串,v 为任意类型。 接下来就介绍如何在 客户端和服务端之间使用元数据来传递数据。 创建和查询元数据 在 gRPC 应用程序中, 创建元数据非常简单,在 go 语言中有两种方式创建元数据:1) 通过 metadata.New(map[string]string{“key1”:“val1”}) 函数创建;2) 通过 metadata.Pairs 来创建元数据对,相同的 key 会被合并为切片数组。 md := metadata.New(map[string]string{"foo": "bar"}) md := metadata.Pairs( "key1", "value1", "key2", "value2", ) 二进制数据也可以设置为元数据值,以元数据值形式所设置的二进制数据在发送前都会进行 base4 编码,在传输过程中会被解码。 在客户端或服务端读取元数据,可以通过传入的 RPC 上下文和 metadata.FromIncomingContext 函数来获取元数据。 md, metadataAvailble := metadata.FromIncomingContext(ctx) 接下来讲解客户端和服务端如何发送和接受元数据。 客户端发送接收元数据 在客户端,要发送元数据,可以创建元数据并将其设置到 RPC 上下文中。在 go 语言中有两种方式实现, 可以使用 NewOutgoingContext 函数创建,也可以使用 AppendToOutgoingContext 函数来将元数据附加到 RPC 上下文中, 使用 NewOutgoingContext 会替换掉上下文中已有的元数据。 在创建完带有元数据的上下文后,就可以用于 RPC 中了。在上下文中设置的元数据会转换成 header 信息。 // 客户端发送元数据 // ****** Metadata : Creation ***** md := metadata.Pairs( "timestamp", time.Now().Format(time.StampNano), "kn", "vn", ) mdCtx := metadata.NewOutgoingContext(context.Background(), md) ctxA := metadata.AppendToOutgoingContext(mdCtx, "k1", "v1", "k1", "v2", "k2", "v3") // Search Order searchStream, _ := client.SearchOrders(ctxA, &wrapper.StringValue{Value: "Google"}) for { searchOrder, err := searchStream.Recv() if err == io.EOF { log.Print("EOF") break } if err == nil { log.Print("Search Result : ", searchOrder) } } // 在 gRPC 中接受元数据 var header,trailer metadata.MD r,err := client.SomeRPC( ctx, someReq, grpc.Header(&header), grpc.Trailer(trailer), ) stream, err := client.SomeStreamingRPC(ctx) header, err := stream.Header() trailer := stream.Trailer() 从对应的 RPC 获取到值时,就可以像处理map一样进行,对元数据进行相应处理。 ...

十二月 25, 2021 · overstarry

GRPC错误处理

今天来讲一讲 gRPC 错误处理的方式,以及如何自定义错误处理。当发起 gRPC 调用时,客户端会接受成功状态的响应或带有错误信息状态的错误响应。 考虑程序的健壮性,我们需要在编写客户端处理信息时,要处理所有可能的错误。编写服务端代码也要处理错误,并构建合适的错误状态码。 当 gRPC 发生错误时,会返回一个错误码。并附带一条可选的信息, 错误状态信息由一个整形状态码和一条字符串消息组成,适用于不同的语言实现。 下图展示了 gRPC 内置的错误码: 缺陷 gRPC 提供的错误模型非常有限,并且与底层的数据格式无关,最常用的数据格式就是 protocol buffers。 如果使用了 protocol buffers, google.rpc 提供了更丰富的错误模型,但语言兼容性待测试。 错误处理例子 接下来继续沿用前面的代码,来讲解如何运用错误处理。 假如我们需要在订单添加处理中处理非法 ID 请求。 如果我们传了一个不合法的 ID 如 -1,需要返回错误给客户端消费者。 func (s *server) AddOrder(ctx context.Context, orderReq *pb.Order) (*wrappers.StringValue, error) { if orderReq.Id == "-1" { log.Printf("Order ID is vaild : %s", orderReq.Id) errorStatus := status.New(codes.InvalidArgument, "Order ID is not valid") ds, err := errorStatus.WithDetails( &epb.BadRequest_FieldViolation{ Field: "ID", Description: fmt.Sprintf("Order ID received is not valid %s : %s", orderReq.Id, orderReq.Description), }, ) if err != nil { return nil, errorStatus.Err() } return nil, ds.Err() } orderMap[orderReq.Id] = *orderReq log.Println("Order : ", orderReq.Id, " -> Added") return &wrapper.StringValue{Value: "Order Added: " + orderReq.Id}, nil } 通过 status 包可以很方便的创建所需的错误码和相应错误详情。使用 Google API 的相应包可以设置更丰富的错误详情。 ...

十二月 18, 2021 · overstarry

GRPC拦截器

本篇文章我来介绍一下gRPC拦截器的使用,拦截器主要用于在服务器端和客户端拦截 RPC. 拦截器可以在 gRPC 中拦截 RPC 的执行,来满足一些特殊的需求,如日志,认证,访问控制等。 gRPC 提供了简单的接口,用来在客户端和服务端的 gRPC 协议中添加拦截器。 根据所使用的 gRPC 通信模式的不同,主要分为2种拦截器:1)一元拦截器,2)流拦截器。 既可以在客户端使用拦截器,也可以在服务端使用拦截器。 接下来,我会依次介绍在服务端和客户端的使用。 服务端拦截器 当客户端调用 gRPC 的远程调用方法时,可以通过服务端拦截器,在执行一些方法前,执行一些通用的操作。如果希望在 Rpc 服务中添加服务端拦截器,只需实现该拦截器,并在创建服务端时注册进来。 下面依次介绍两种服务端拦截器:1)一元拦截器,2)流拦截器。 一元拦截器 如果想在服务端拦截 一元 RPC 调用时,需要在服务端实现相应的函数,此函数的签名为: type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error) 我们在上一篇文章的代码中添加如下代码: func orderUnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { log.Println("======= [Server Interceptor] ", info.FullMethod) log.Printf(" Pre Proc Message : %s", req) m, err := handler(ctx, req) if err != nil { log.Printf("Error : %v", err) } log.Printf(" Post Proc Message : %s", m) return m, err } func main() { initSampleData() lis, err := net.Listen("tcp", port) if err != nil { log.Fatalf("failed to listen: %v", err) } s := grpc.NewServer(grpc.UnaryInterceptor(orderUnaryServerInterceptor)) pb.RegisterOrderManagementServer(s, &server{}) // Register reflection service on gRPC server. // reflection.Register(s) if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } } 一元拦截器的实现主要分为三个部分: 前置处理、调用 RPC 服务、后置处理。在前置处理阶段可以通过检查参数获得 RPC 的信息,比如 RPC 上下文、请求和服务端信息。 ...

十二月 11, 2021 · overstarry