4、Golang——gRPC+protobuf
1、gRPC引入原因
(传输+序列化)
微服务架构中需要进行进程与进程、服务器与服务器之间的通信,需要发起网络调用
在微服务架构中,http虽然便捷,但是其性能较低,所以引入了RPC(远程过程调度),通过自定义协议发起TCP调度,来加快传输效率。
2、gRPC介绍
![](https://uploadfiles.nowcoder.com/files/20220914/5814143_1663155609382/.jpg)
数据在网络中传输的时候,需要被序列化,序列化协议有很多种,比如xml、json、protobuf等,其中gRPC默认使用protobuf,这是google开源的一套成熟的结构数据序列化机制。
![](https://uploadfiles.nowcoder.com/files/20220914/5814143_1663155609413/.jpg)
序列化的目的:便于传输以及存储。
3、protobuf
![](https://uploadfiles.nowcoder.com/files/20220914/5814143_1663155609380/.jpg)
4、protobuf安装
4.1 下载通用编译器:https://github.com/protocolbuffers/protobuf
4.2 配置环境变量
4.3 安装对应语言的protoc的生成器
![](https://uploadfiles.nowcoder.com/files/20220914/5814143_1663155609300/.jpg)
4.4 protobuf使用
![](https://uploadfiles.nowcoder.com/files/20220914/5814143_1663155609380/.jpg)
5、protobuf简单例子
文件目录:
. ├── go.mod ├── go.sum ├── main.go ├── pbfile │ └── user.proto └── service └── user.pb.go
user.proto
//指定语法版本 syntax ="proto3"; //对应生成的目录 option go_package="../service"; //option go_package= "path;name"; //path表示生成的go文件的存放地址,name表示生成的go文件所属的报名 //生成文件之后,它的包名 package service; //消息 传输的对象 message User{ string username=1; int32 age=2; }
执行
protoc --go_out=生成文件的位路径 proto文件的路径
protoc --go_out=./ ./user.proto
main.go
package main import ( "fmt" "google.golang.org/protobuf/proto" "myproto/service" ) func main() { user := &service.User{ Username: "hyh", Age: 18, } //序列化的过程 marshal, err := proto.Marshal(user) if err != nil { panic(err) } //反序列化的过程 newUser := &service.User{} err = proto.Unmarshal(marshal, newUser) if err != nil { panic(err) } fmt.Println(newUser.String()) }
6、proto文件说明
message:
5.1 关键字 optional repeated
//指定语法版本 syntax ="proto3"; //对应生成的目录 option go_package="../service"; //option go_package= "path;name"; //path表示生成的go文件的存放地址,name表示生成的go文件所属的包名 //生成文件之后,它的包名 package service; //消息 传输的对象 message User{ string username=1; int32 age=2; optional string password=3; //optional 关键字 生成指针 repeated string addresses=4; //repeated 关键字 生成切片 }
5.2 字段映射
![](https://uploadfiles.nowcoder.com/files/20220914/5814143_1663155609411/.jpg)
![](https://uploadfiles.nowcoder.com/files/20220914/5814143_1663155609489/.jpg)
![](https://uploadfiles.nowcoder.com/files/20220914/5814143_1663155609416/.jpg)
5.3 默认值
![](https://uploadfiles.nowcoder.com/files/20220914/5814143_1663155609370/.jpg)
5.4 标识号
标识号:在消息体的定义中,每个字段都必须有一个唯一的标识号,标识号[0,2^29-1]范围内的一个整数。
![](https://uploadfiles.nowcoder.com/files/20220914/5814143_1663155609361/.jpg)
5.5 定义多个消息类型
一个proto文件定义多个消息类型:
![](https://uploadfiles.nowcoder.com/files/20220914/5814143_1663155609352/.jpg)
5.6 定义嵌套消息
![](https://uploadfiles.nowcoder.com/files/20220914/5814143_1663155609427/.jpg)
![](https://uploadfiles.nowcoder.com/files/20220914/5814143_1663155609488/.jpg)
5.7 自定义服务
如果想让将消息类型用在RPC系统中,可以在.proto文件中定义一个RPC服务接口,protocol buffer编译器会根据所选择的不同语言生成服务接口代码以及存根(stub)
![](https://uploadfiles.nowcoder.com/files/20220914/5814143_1663155609444/.jpg)
以上代码在go语言中相当于定义了一个接口。
5.8 protobuf 序列化与反序列化
main.go:
package main import ( "fmt" "google.golang.org/protobuf/proto" "log" "test/user" ) func main() { article := &user.Article{ Aid: 1, Title: "protobuf", Views: 100, } //序列化 结构体转二进制 bytes, _ := proto.Marshal(article) fmt.Printf("bytes: %v\n", bytes) //反序列化 二进制转结构体 other := &user.Article{} err := proto.Unmarshal(bytes, other) if err != nil { log.Fatal("失败", err) } fmt.Println("other: %v %v %v", other.Aid, other.Title, other.Views) }
5.9 protobuf与json相互切换
user.proto:
syntax="proto3"; option go_package="./;blog"; package blog; message User{ int32 uid=1; string uname =2; int32 age=3; } message Article{ int32 aid=1; string title=2; int32 views=3; }
main.go
package main import ( "fmt" "google.golang.org/protobuf/encoding/protojson" "test/blog" ) func main() { article := &blog.Article{ Aid: 1, Title: "protobuf", Views: 100, } //message to json s := protojson.Format(article.ProtoReflect().Interface()) fmt.Printf("s:%v\n", s) //json to message m := article.ProtoReflect().Interface() protojson.Unmarshal([]byte(s), m) fmt.Printf("m:%v\n", m) }
6、gRPC
暂时无法在飞书文档外展示此内容
6.1 RPC与gRPC
![](https://uploadfiles.nowcoder.com/files/20220914/5814143_1663155609474/.jpg)
![](https://uploadfiles.nowcoder.com/files/20220914/5814143_1663155609277/.jpg)
![](https://uploadfiles.nowcoder.com/files/20220914/5814143_1663155609584/.jpg)
![](https://uploadfiles.nowcoder.com/files/20220914/5814143_1663155609475/.jpg)
官方文档:https://grpc.io/
底层协议:
![](https://uploadfiles.nowcoder.com/files/20220914/5814143_1663155609450/.jpg)
HTTP2:
![](https://uploadfiles.nowcoder.com/files/20220914/5814143_1663155609346/.jpg)
![](https://uploadfiles.nowcoder.com/files/20220914/5814143_1663155609589/.jpg)
![](https://uploadfiles.nowcoder.com/files/20220914/5814143_1663155609438/.jpg)
![](https://uploadfiles.nowcoder.com/files/20220914/5814143_1663155609540/.jpg)
![](https://uploadfiles.nowcoder.com/files/20220914/5814143_1663155609478/.jpg)
6.2 gRPC实例
代码结构:
myproto ├── client │ ├── grpc_client.go │ └── service │ └── product.pb.go ├── go.mod ├── go.sum ├── grpc_server.gp.go ├── pbfile │ └── product.proto └── service ├── product.go └── product.pb.go
说明:myproto/client/service/product.pb.go与myproto/service/product.pb.go内容完全相同,但是前者是为grpc_client.go提供grpc服务,后者是为grpc_server.gp.go提供grpc服务。
/myproto/pbfile/product.proto
syntax ="proto3"; option go_package="../service"; //option go_package= "path;name"; //path表示生成的go文件的存放地址,name表示生成的go文件所属的报名 package service; message ProductRequest{ int32 prod_id=1; } message ProductResponse{ int32 prod_stock=1; } //定义服务主体 service ProService{ //定义方法 rpc GetProductStock(ProductRequest) returns(ProductResponse); }
执行命令:
protoc --go_out=plugins=grpc:./ ./product.proto
/myproto/server/product.go
package service import "context" //定义一个结构体 type productService struct { } //定义一个结构体实例 var ProductService = &productService{} //实现该变量的ProServiceServer接口方法, func (p *productService) GetProductStock(ctx context.Context, request *ProductRequest) (*ProductResponse, error) { //todo 具体业务逻辑 stock := p.GetStockById(request.ProdId) return &ProductResponse{ProdStock: stock}, nil } func (p *productService) GetStockById(id int32) int32 { return 110 }
/myproto/client/client.go
package main import ( "context" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "log" "myproto/client/service" ) func main() { conn, err := grpc.Dial(":8008", grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatalln("服务端出错,连接不上!", err) } defer conn.Close() prodClient := service.NewProServiceClient(conn) requst := &service.ProductRequest{ ProdId: 123, } stockResponse, err := prodClient.GetProductStock(context.Background(), requst) if err != nil { log.Fatal("查询库存失败", err) } fmt.Println("查询成功,输出:", stockResponse.ProdStock) }
/myproto/grpc_server.go
package main import ( "fmt" "google.golang.org/grpc" "log" "myproto/service" "net" ) func main() { //new一个服务端 recServer := grpc.NewServer() //service 是我们自己的包,里面有一个RegisterProServiceServer的方法注册一个服务 //接口实现service.ProductService被注册上了 service.RegisterProServiceServer(recServer, service.ProductService) listener, err := net.Listen("tcp", ":8002") if err != nil { log.Fatalln("启动监听失败", err) } err = recServer.Serve(listener) if err != nil { log.Fatalln("启动服务失败", err) } fmt.Println("启动gRPC服务成功") }
7、gRPC安全传输
7.1 生成自签名证书
1) 下载安装openssl
2)生成密钥文件
openssl genrsa -des3 -out ca.key 2048
3)创建证书请求(对应的公钥生成)
openssl req -new -key ca.key -out ca.csr
4)ca.crt
openssl x509 -req -days 365 -in ca.csr -signkey ca.key -out ca.crt
5} 找到openssl.cnf 文件
- 打开copy_extensions = copy
- 打开 req_extensions = v3_req
- 找到[ v3_req ],添加 subjectAltName = @alt_names
- 添加新的标签 [ alt_names ] , 和标签字段
[ alt_names ]DNS.1 = *.zjtd.com
6)生成证书私钥server.key
openssl genpkey -algorithm RSA -out server.key
7)通过私钥server.key生成证书请求文件server.csr
openssl req -new -nodes -key server.key -out server.csr -days 3650 -config ./openssl.cnf -extensions v3_req
8)生成SAN证书
openssl x509 -req -days 365 -in server.csr -out server.pem -CA ca.crt -CAkey ca.key -CAcreateserial -extfile ./openssl.cnf -extensions v3_req
- key: 服务器上的私钥文件,用于对发送给客户端数据的加密,以及对从客户端接收到数据的解密。
- csr: 证书签名请求文件,用于提交给证书颁发机构(CA)对证书签名。
- crt: 由证书颁发机构(CA)签名后的证书,或者是开发者自签名的证书,包含证书持有人的信息,持有人的公钥,以及签署者的签名等信息。
- pem: 是基于Base64编码的证书格式,扩展名包括PEM、CRT和CER。
Q :什么是 SAN?
A :SAN(Subject Alternative Name)是 SSL 标准 x509 中定义的一个扩展。使用了 SAN 字段的 SSL 证书,可以扩展此证书支持的域名,使得一个证书可以支持多个不同域名的解析。
9)实例代码
目录:
. ├── cert │ ├── ca.crt │ ├── ca.csr │ ├── ca.key │ ├── ca.srl │ ├── openssl.cnf │ ├── server.csr │ ├── server.key │ └── server.pem ├── client │ ├── grpc_client.go │ └── service │ └── product.pb.go ├── go.mod ├── go.sum ├── grpc_server.gp ├── grpc_server.gp.go ├── pbfile │ └── product.proto └── service ├── product.go └── product.pb.go
grpc_server.gp.go:服务端
package main import ( "fmt" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "log" "myproto/service" "net" ) func main() { //添加证书 creds, err2 := credentials.NewServerTLSFromFile("cert/server.pem", "cert/server.key") if err2 != nil { log.Fatal("证书生成错误", err2) } //new一个服务端 recServer := grpc.NewServer(grpc.Creds(creds)) //service 是我们自己的包,里面有一个RegisterProServiceServer的方法注册一个服务 //接口实现service.ProductService被注册上了 // service.RegisterProServiceServer(服务端, 实现主体函数的任意实例 interface{}) service.RegisterProServiceServer(recServer, service.ProductService) listener, err := net.Listen("tcp", ":8008") if err != nil { log.Fatalln("启动监听失败", err) } err = recServer.Serve(listener) if err != nil { log.Fatalln("启动服务失败", err) } fmt.Println("启动gRPC服务成功") }
grpc_client.go:客户端
package main import ( "context" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "log" "myproto/client/service" ) func main() { //添加证书 creds, err2 := credentials.NewClientTLSFromFile("cert/server.pem", "*.zjtd.com") if err2 != nil { log.Fatal("证书错误", err2) } //无证书方式 //conn, err := grpc.Dial(":8008", grpc.WithTransportCredentials(insecure.NewCredentials())) //if err != nil { // log.Fatalln("服务端出错,连接不上!", err) //} conn, err := grpc.Dial(":8008", grpc.WithTransportCredentials(creds)) if err != nil { log.Fatalln("服务端出错,连接不上!", err) } defer conn.Close() prodClient := service.NewProServiceClient(conn) requst := &service.ProductRequest{ ProdId: 123, } stockResponse, err := prodClient.GetProductStock(context.Background(), requst) if err != nil { log.Fatal("查询库存失败", err) } fmt.Println("查询成功,输出:", stockResponse.ProdStock) }
上述认证方式为单向认证:
![](https://uploadfiles.nowcoder.com/files/20220914/5814143_1663155609492/.jpg)
中间人攻击
7.2 双向认证
![](https://uploadfiles.nowcoder.com/files/20220914/5814143_1663155609464/.jpg)
上面的server.pem和server.key 是服务端的 公钥和私钥。
如果双向认证,客户端也需要生成对应的公钥和私钥。
1)私钥:
openssl genpkey -algorithm RSA -out client.key
2)证书:
openssl req -new -nodes -key client.key -out client.csr -days 3650 -config ./openssl.cnf -extensions v3_req
3)SAN证书:
openssl x509 -req -days 365 -in client.csr -out client.pem -CA ca.crt -CAkey ca.key -CAcreateserial -extfile ./openssl.cnf -extensions v3_req
4)实例代码
目录:
. ├── cert │ ├── ca.crt │ ├── ca.csr │ ├── ca.key │ ├── ca.srl │ ├── client.csr │ ├── client.key │ ├── client.pem │ ├── openssl.cnf │ ├── server.csr │ ├── server.key │ └── server.pem ├── client │ ├── grpc_client.go │ └── service │ └── product.pb.go ├── go.mod ├── go.sum ├── grpc_server.gp ├── grpc_server.gp.go ├── pbfile │ └── product.proto └── service ├── product.go └── product.pb.go
grpc_server.gp.go:
package main import ( "crypto/tls" "crypto/x509" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "io/ioutil" "log" "myproto/service" "net" ) func main() { ////添加单向证书 //creds, err2 := credentials.NewServerTLSFromFile("cert/server.pem", "cert/server.key") //if err2 != nil { // log.Fatal("证书生成错误", err2) //} //添加双向证书 cert, err := tls.LoadX509KeyPair("cert/server.pem", "cert/server.key") if err != nil { log.Fatal("证书读取错误", err) } // 创建一个新的、空的 CertPool certPool := x509.NewCertPool() ca, err := ioutil.ReadFile("cert/ca.crt") if err != nil { log.Fatal("ca证书读取错误", err) } // 尝试解析所传入的 PEM 编码的证书。如果解析成功会将其加到 CertPool 中,便于后面的使用 certPool.AppendCertsFromPEM(ca) // 构建基于 TLS 的 TransportCredentials 选项 creds := credentials.NewTLS(&tls.Config{ // 设置证书链,允许包含一个或多个 Certificates: []tls.Certificate{cert}, // 要求必须校验客户端的证书。可以根据实际情况选用以下参数 ClientAuth: tls.RequireAndVerifyClientCert, // 设置根证书的集合,校验方式使用 ClientAuth 中设定的模式 ClientCAs: certPool, }) //new一个服务端 recServer := grpc.NewServer(grpc.Creds(creds)) //service 是我们自己的包,里面有一个RegisterProServiceServer的方法注册一个服务 //接口实现service.ProductService被注册上了 // service.RegisterProServiceServer(服务端, 实现主体函数的任意实例 interface{}) service.RegisterProServiceServer(recServer, service.ProductService) fmt.Println("认证成功") listener, err := net.Listen("tcp", ":8008") if err != nil { log.Fatalln("启动监听失败", err) } err = recServer.Serve(listener) if err != nil { log.Fatalln("启动服务失败", err) } fmt.Println("启动gRPC服务成功") }
grpc_client.go:
package main import ( "context" "crypto/tls" "crypto/x509" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "io/ioutil" "log" "myproto/client/service" ) func main() { ////添加证书 //creds, err2 := credentials.NewClientTLSFromFile("cert/server.pem", "*.zjtd.com") //if err2 != nil { // log.Fatal("证书错误", err2) //} ////无证书方式 //conn, err := grpc.Dial(":8008", grpc.WithTransportCredentials(insecure.NewCredentials())) //if err != nil { // log.Fatalln("服务端出错,连接不上!", err) //} //添加双向认证 // 证书认证-双向认证 // 从证书相关文件中读取和解析信息,得到证书公钥、密钥对 cert, _ := tls.LoadX509KeyPair("cert/client.pem", "cert/client.key") // 创建一个新的、空的 CertPool certPool := x509.NewCertPool() ca, _ := ioutil.ReadFile("cert/ca.crt") // 尝试解析所传入的 PEM 编码的证书。如果解析成功会将其加到 CertPool 中,便于后面的使用 certPool.AppendCertsFromPEM(ca) // 构建基于 TLS 的 TransportCredentials 选项 creds := credentials.NewTLS(&tls.Config{ // 设置证书链,允许包含一个或多个 Certificates: []tls.Certificate{cert}, // 要求必须校验客户端的证书。可以根据实际情况选用以下参数 ServerName: "*.zjtd.com", RootCAs: certPool, }) conn, err := grpc.Dial(":8008", grpc.WithTransportCredentials(creds)) if err != nil { log.Fatalln("服务端出错,连接不上!", err) } defer conn.Close() prodClient := service.NewProServiceClient(conn) requst := &service.ProductRequest{ ProdId: 123, } stockResponse, err := prodClient.GetProductStock(context.Background(), requst) if err != nil { log.Fatal("查询库存失败", err) } fmt.Println("查询成功,输出:", stockResponse.ProdStock) }
7.5 Token 认证
jwt等等
目录:
─ cert │ ├── ca.crt │ ├── ca.csr │ ├── ca.key │ ├── ca.srl │ ├── client.csr │ ├── client.key │ ├── client.pem │ ├── openssl.cnf │ ├── server.csr │ ├── server.key │ └── server.pem ├── client │ ├── auth │ │ └── auth.go │ ├── grpc_client.go │ └── service │ └── product.pb.go ├── go.mod ├── go.sum ├── grpc_server.gp ├── grpc_server.gp.go ├── pbfile │ └── product.proto └── service ├── product.go └── product.pb.go
1)服务器添加用户名密码的校验
package main import ( "context" "crypto/tls" "crypto/x509" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "io/ioutil" "log" "myproto/service" "net" ) func main() { ////添加单向证书 //creds, err2 := credentials.NewServerTLSFromFile("cert/server.pem", "cert/server.key") //if err2 != nil { // log.Fatal("证书生成错误", err2) //} //添加双向证书 cert, err := tls.LoadX509KeyPair("cert/server.pem", "cert/server.key") if err != nil { log.Fatal("证书读取错误", err) } // 创建一个新的、空的 CertPool certPool := x509.NewCertPool() ca, err := ioutil.ReadFile("cert/ca.crt") if err != nil { log.Fatal("ca证书读取错误", err) } // 尝试解析所传入的 PEM 编码的证书。如果解析成功会将其加到 CertPool 中,便于后面的使用 certPool.AppendCertsFromPEM(ca) // 构建基于 TLS 的 TransportCredentials 选项 creds := credentials.NewTLS(&tls.Config{ // 设置证书链,允许包含一个或多个 Certificates: []tls.Certificate{cert}, // 要求必须校验客户端的证书。可以根据实际情况选用以下参数 ClientAuth: tls.RequireAndVerifyClientCert, // 设置根证书的集合,校验方式使用 ClientAuth 中设定的模式 ClientCAs: certPool, }) //实现token认证,需要合法的用户名和密码 //实现一个拦截器 var AuthInterceptor grpc.UnaryServerInterceptor AuthInterceptor = func( ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { //拦截普通方法请求,验证tokrn err = Auth(ctx) if err != nil { return } return handler(ctx, req) } recServer := grpc.NewServer(grpc.Creds(creds), grpc.UnaryInterceptor(AuthInterceptor)) //new一个服务端 //recServer := grpc.NewServer(grpc.Creds(creds)) //service 是我们自己的包,里面有一个RegisterProServiceServer的方法注册一个服务 //接口实现service.ProductService被注册上了 // service.RegisterProServiceServer(服务端, 实现主体函数的任意实例 interface{}) service.RegisterProServiceServer(recServer, service.ProductService) fmt.Println("认证成功") listener, err := net.Listen("tcp", ":8008") if err != nil { log.Fatalln("启动监听失败", err) } err = recServer.Serve(listener) if err != nil { log.Fatalln("启动服务失败", err) } fmt.Println("启动gRPC服务成功") } func Auth(ctx context.Context) error { //实际上拿到传输的用户名和密码 md, ok := metadata.FromIncomingContext(ctx) if !ok { return fmt.Errorf("missing credentials") } var user string var password string if val, ok := md["user"]; ok { user = val[0] } if val, ok := md["password"]; ok { password = val[0] } if user != "hyh" || password != "123456" { return status.Error(codes.Unauthenticated, "token不合法") } return nil }
2)auth.go
package auth import "context" type Authentication struct { User string Password string } func (a *Authentication) GetRequestMetadata(context.Context, ...string) (map[string]string, error) { return map[string]string{"user": a.User, "password": a.Password}, nil } func (a *Authentication) RequireTransportSecurity() bool { return false }
3)客户端
package main import ( "context" "crypto/tls" "crypto/x509" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "io/ioutil" "log" "myproto/client/auth" "myproto/client/service" ) func main() { ////添加证书 //creds, err2 := credentials.NewClientTLSFromFile("cert/server.pem", "*.zjtd.com") //if err2 != nil { // log.Fatal("证书错误", err2) //} ////无证书方式 //conn, err := grpc.Dial(":8008", grpc.WithTransportCredentials(insecure.NewCredentials())) //if err != nil { // log.Fatalln("服务端出错,连接不上!", err) //} //添加双向认证 // 证书认证-双向认证 // 从证书相关文件中读取和解析信息,得到证书公钥、密钥对 cert, _ := tls.LoadX509KeyPair("cert/client.pem", "cert/client.key") // 创建一个新的、空的 CertPool certPool := x509.NewCertPool() ca, _ := ioutil.ReadFile("cert/ca.crt") // 尝试解析所传入的 PEM 编码的证书。如果解析成功会将其加到 CertPool 中,便于后面的使用 certPool.AppendCertsFromPEM(ca) // 构建基于 TLS 的 TransportCredentials 选项 creds := credentials.NewTLS(&tls.Config{ // 设置证书链,允许包含一个或多个 Certificates: []tls.Certificate{cert}, // 要求必须校验客户端的证书。可以根据实际情况选用以下参数 ServerName: "*.zjtd.com", RootCAs: certPool, }) //Token认证 token := &auth.Authentication{ User: "hyh", Password: "123456", } conn, err := grpc.Dial(":8008", grpc.WithTransportCredentials(creds), grpc.WithPerRPCCredentials(token)) if err != nil { log.Fatalln("服务端出错,连接不上!", err) } defer conn.Close() prodClient := service.NewProServiceClient(conn) requst := &service.ProductRequest{ ProdId: 123, } stockResponse, err := prodClient.GetProductStock(context.Background(), requst) if err != nil { log.Fatal("查询库存失败", err) } fmt.Println("查询成功,输出:", stockResponse.ProdStock) }
8、更换protoc-gen-go生成器
前面的课程中,我们使用的proto的go生成器,使用的命令是
go get github.com/golang/protobuf/protoc-gen-go,在https://www.grpc.io/docs/languages/go/quickstart/ 中,我们发现
![](https://uploadfiles.nowcoder.com/files/20220914/5814143_1663155609336/.jpg)
官方使用的和我们使用的并不一致。
github的方式,需要使用--go_out=plugins=grpc来去进行生成,而在golang.org方式中,弃用了这种方式,使用protoc-gen-go将不再支持gRPC service的定义,需要使用新的插件protoc-gen-go-grpc。
1. 使用google.golang.org/protobuf
- 安装插件
$ go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2
- 安装完成后会在gopath下的bin目录下生成
![](https://uploadfiles.nowcoder.com/files/20220914/5814143_1663155609383/.jpg)
- 利用proto文件重新生成go文件
#在myproto文件下运行 protoc --go_out=./service --go-grpc_out=./service pbfile/product.proto
- 修改之前的service实现
package service import ( "context" ) var ProductService = &productService{} type productService struct { } func (p *productService) GetProductStock(ctx context.Context, request *ProductRequest) (*ProductResponse, error) { return &ProductResponse{ProdStock: request.ProdId}, nil } func (p *productService) mustEmbedUnimplementedProdServiceServer() {}
9、import使用
1、import
文件目录:
. ├── cert │ ├── ca.crt │ ├── ca.csr │ ├── ca.key │ ├── ca.srl │ ├── client.csr │ ├── client.key │ ├── client.pem │ ├── openssl.cnf │ ├── server.csr │ ├── server.key │ └── server.pem ├── client │ ├── auth │ │ └── auth.go │ ├── grpc_client.go │ └── service │ ├── product.pb.go │ ├── product_grpc.pb.go │ └── user.pb.go ├── go.mod ├── go.sum ├── grpc_server.gp ├── grpc_server.gp.go ├── pbfile │ ├── product.proto │ └── user.proto └── service ├── product.go ├── product.pb.go ├── product_grpc.pb.go └── user.pb.go
product.proto:
syntax ="proto3"; import "pbfile/user.proto"; option go_package="../service"; //option go_package= "path;name"; //path表示生成的go文件的存放地址,name表示生成的go文件所属的报名 package service; message ProductRequest{ int32 prod_id=1; } message ProductResponse{ int32 prod_stock=1; User user=2; } //定义服务主体 service ProService{ //定义方法 rpc GetProductStock(ProductRequest) returns(ProductResponse); }
user.proto:
syntax = "proto3"; option go_package="../service"; package service; message User{ string username=1; int32 age=2; optional string password=3; repeated string addresses=4; }
编译proto文件:
protoc --go_out=./service --go-grpc_out=./service pbfile/user.proto protoc --go_out=./service --go-grpc_out=./service pbfile/product.proto
product.go:
package service import ( "context" ) //定义一个结构体 type productService struct { } //定义一个结构体实例 var ProductService = &productService{} //实现该变量的ProServiceServer接口方法, func (p *productService) GetProductStock(ctx context.Context, request *ProductRequest) (*ProductResponse, error) { //todo 具体业务逻辑 stock := p.GetStockById(request.ProdId) user := &User{Username: "hyh"} return &ProductResponse{ProdStock: stock, User: user}, nil } func (p *productService) GetStockById(id int32) int32 { return id } func (p *productService) mustEmbedUnimplementedProServiceServer() {}
grpc_client.go:
package main import ( "context" "crypto/tls" "crypto/x509" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "io/ioutil" "log" "zjtd.com/myproto/client/auth" "zjtd.com/myproto/client/service" ) func main() { ////添加证书 //creds, err2 := credentials.NewClientTLSFromFile("cert/server.pem", "*.zjtd.com") //if err2 != nil { // log.Fatal("证书错误", err2) //} ////无证书方式 //conn, err := grpc.Dial(":8008", grpc.WithTransportCredentials(insecure.NewCredentials())) //if err != nil { // log.Fatalln("服务端出错,连接不上!", err) //} //添加双向认证 // 证书认证-双向认证 // 从证书相关文件中读取和解析信息,得到证书公钥、密钥对 cert, _ := tls.LoadX509KeyPair("cert/client.pem", "cert/client.key") // 创建一个新的、空的 CertPool certPool := x509.NewCertPool() ca, _ := ioutil.ReadFile("cert/ca.crt") // 尝试解析所传入的 PEM 编码的证书。如果解析成功会将其加到 CertPool 中,便于后面的使用 certPool.AppendCertsFromPEM(ca) // 构建基于 TLS 的 TransportCredentials 选项 creds := credentials.NewTLS(&tls.Config{ // 设置证书链,允许包含一个或多个 Certificates: []tls.Certificate{cert}, // 要求必须校验客户端的证书。可以根据实际情况选用以下参数 ServerName: "*.zjtd.com", RootCAs: certPool, }) //Token认证 token := &auth.Authentication{ User: "hyh", Password: "123456", } conn, err := grpc.Dial(":8008", grpc.WithTransportCredentials(creds), grpc.WithPerRPCCredentials(token)) if err != nil { log.Fatalln("服务端出错,连接不上!", err) } defer conn.Close() prodClient := service.NewProServiceClient(conn) requst := &service.ProductRequest{ ProdId: 123, } stockResponse, err := prodClient.GetProductStock(context.Background(), requst) if err != nil { log.Fatal("查询库存失败", err) } fmt.Println("查询成功,输出:", stockResponse.ProdStock, stockResponse.User) }
2、任意类型
// 使用any类型,需要导入这个 import "google/protobuf/any.proto"; // 定义入参消息 message HelloParam{ // any,代表可以是任何类型 暂时不知道定义为什么类型 google.protobuf.Any data = 1; }
// 这个就是protobuf的中间文件// 指定的当前proto语法的版本,有2和3 syntax = "proto3"; //从执行 protoc这个命令的当前目录开始算起, import "user.proto"; // 使用any类型,需要导入这个 import "google/protobuf/any.proto"; option go_package="../service"; // 指定等会文件生成出来的package package service; // 定义request model message ProductRequest{ int32 prod_id = 1; // 1代表顺序 } message Content { string msg = 1; } // 定义response model message ProductResponse{ int32 prod_stock = 1; // 1代表顺序 User user = 2; google.protobuf.Any data = 3; } // 定义服务主体 service ProdService{ // 定义方法 rpc GetProductStock(ProductRequest) returns(ProductResponse); }
func (p *productService) GetProductStock(context context.Context, request *ProductRequest) (*ProductResponse, error) { //实现具体的业务逻辑 stock := p.GetStockById(request.ProdId) user := User{Username: "mszlu"} content := Content{Msg: "mszlu msg..."} //转换成any类型 any, _ := anypb.New(&content) return &ProductResponse{ProdStock: stock, User: &user, Data: any}, nil }
10、客户端流
在 HTTP/1.1 的时代,同一个时刻只能对一个请求进行处理或者响应,换句话说,下一个请求必须要等当前请求处理完才能继续进行。
HTTP/1.1需要注意的是,在服务端没有response的时候,客户端是可以发起多个request的,但服务端依旧是顺序对请求进行处理, 并按照收到请求的次序予以返回。
HTTP/2 的时代,多路复用的特性让一次同时处理多个请求成为了现实,并且同一个 TCP 通道中的请求不分先后、不会阻塞,HTTP/2 中引入了流(Stream) 和 帧(Frame) 的概念,当 TCP 通道建立以后,后续的所有操作都是以流的方式发送的,而二进制帧则是组成流的最小单位,属于协议层上的流式传输。
HTTP/2 在一个 TCP 连接的基础上虚拟出多个 Stream, Stream 之间可以并发的请求和处理, 并且 HTTP/2 以二进制帧 (frame) 的方式进行数据传送, 并引入了头部压缩 (HPACK), 大大提升了交互效率
10.1 流定义
1 // 普通 RPC2 rpc SimplePing(PingRequest) returns (PingReply); 34 // 客户端流式 RPC5 rpc ClientStreamPing(stream PingRequest) returns (PingReply); 67 // 服务器端流式 RPC8 rpc ServerStreamPing(PingRequest) returns (stream PingReply); 910 // 双向流式 RPC11 rpc BothStreamPing(stream PingRequest) returns (stream PingReply);
stream关键字,当该关键字修饰参数时,表示这是一个客户端流式的 gRPC 接口;当该参数修饰返回值时,表示这是一个服务器端流式的 gRPC 接口;当该关键字同时修饰参数和返回值时,表示这是一个双向流式的 gRPC 接口。
10.2 客户端流定义:
客户端可以源源不断的给服务端发送信息
rpc UpdateStockClientStream(stream ProductRequest) returns(ProductResponse);
product.proto:
定义客户端流rpc方法
syntax ="proto3";
import "pbfile/user.proto";
import "google/protobuf/any.proto";
option go_package="../service";
//option go_package= "path;name";
//path表示生成的go文件的存放地址,name表示生成的go文件所属的报名
package service;
message ProductRequest{
int32 prod_id=1;
}
message Content{
string msg=1;
}
message ProductResponse{
int32 prod_stock=1;
User user=2;
google.protobuf.Any data=3;
}
//定义服务主体
service ProService{
//定义方法
rpc GetProductStock(ProductRequest) returns(ProductResponse); rpc UpdateProductStockClientStream(stream ProductRequest) returns(ProductResponse); }
执行protoc命令:
protoc --go_out=./service --go-grpc_out=./service pbfile/product.proto
/service/product.go:
实现对应接口
package service import ( "context" "fmt" "google.golang.org/protobuf/types/known/anypb" ) //定义一个结构体 type productService struct { } //定义一个结构体实例 var ProductService = &productService{} //实现该变量的ProServiceServer接口方法, func (p *productService) GetProductStock(ctx context.Context, request *ProductRequest) (*ProductResponse, error) { //todo 具体业务逻辑 stock := p.GetStockById(request.ProdId) user := &User{Username: "hyh"} contnet := &Content{Msg: "zjtd msg..."} //转换为any类型 any, _ := anypb.New(contnet) return &ProductResponse{ProdStock: stock, User: user, Data: any}, nil } func (p *productService) UpdateProductStockClientStream(stream ProService_UpdateProductStockClientStreamServer) error { count := 0 for { //源源不断的接受客户端发来的信息 recv, err := stream.Recv() if err != nil { return err } fmt.Println("服务端接受到到流", recv.ProdId, count) count++ if count > 10 { rsp := &ProductResponse{ProdStock: recv.ProdId} err := stream.SendAndClose(rsp) if err != nil { return err } return nil } } } func (p *productService) GetStockById(id int32) int32 { return id } func (p *productService) mustEmbedUnimplementedProServiceServer() {}
/myproto/grpc_server.go
服务端代码 不需要做修改
package main import ( "context" "crypto/tls" "crypto/x509" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "io/ioutil" "log" "net" "zjtd.com/myproto/service" ) func main() { ////添加单向证书 //creds, err2 := credentials.NewServerTLSFromFile("cert/server.pem", "cert/server.key") //if err2 != nil { // log.Fatal("证书生成错误", err2) //} //添加双向证书 cert, err := tls.LoadX509KeyPair("cert/server.pem", "cert/server.key") if err != nil { log.Fatal("证书读取错误", err) } // 创建一个新的、空的 CertPool certPool := x509.NewCertPool() ca, err := ioutil.ReadFile("cert/ca.crt") if err != nil { log.Fatal("ca证书读取错误", err) } // 尝试解析所传入的 PEM 编码的证书。如果解析成功会将其加到 CertPool 中,便于后面的使用 certPool.AppendCertsFromPEM(ca) // 构建基于 TLS 的 TransportCredentials 选项 creds := credentials.NewTLS(&tls.Config{ // 设置证书链,允许包含一个或多个 Certificates: []tls.Certificate{cert}, // 要求必须校验客户端的证书。可以根据实际情况选用以下参数 ClientAuth: tls.RequireAndVerifyClientCert, // 设置根证书的集合,校验方式使用 ClientAuth 中设定的模式 ClientCAs: certPool, }) //实现token认证,需要合法的用户名和密码 //实现一个拦截器 var AuthInterceptor grpc.UnaryServerInterceptor AuthInterceptor = func( ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { //拦截普通方法请求,验证tokrn err = Auth(ctx) if err != nil { return } return handler(ctx, req) } recServer := grpc.NewServer(grpc.Creds(creds), grpc.UnaryInterceptor(AuthInterceptor)) //new一个服务端 //recServer := grpc.NewServer(grpc.Creds(creds)) //service 是我们自己的包,里面有一个RegisterProServiceServer的方法注册一个服务 //接口实现service.ProductService被注册上了 // service.RegisterProServiceServer(服务端, 实现主体函数的任意实例 interface{}) service.RegisterProServiceServer(recServer, service.ProductService) fmt.Println("认证成功") listener, err := net.Listen("tcp", ":8008") if err != nil { log.Fatalln("启动监听失败", err) } err = recServer.Serve(listener) if err != nil { log.Fatalln("启动服务失败", err) } fmt.Println("启动gRPC服务成功") } func Auth(ctx context.Context) error { //实际上拿到传输的用户名和密码 md, ok := metadata.FromIncomingContext(ctx) if !ok { return fmt.Errorf("missing credentials") } var user string var password string if val, ok := md["user.proto"]; ok { user = val[0] } if val, ok := md["password"]; ok { password = val[0] } if user != "hyh" || password != "123456" { return status.Error(codes.Unauthenticated, "token不合法") } return nil }
/myproto/client/grpc_client.go
需要远程调用服务端的 UpdateProductStockClientStream 函数
package main import ( "context" "crypto/tls" "crypto/x509" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "io/ioutil" "log" "time" "zjtd.com/myproto/client/auth" "zjtd.com/myproto/client/service" ) func main() { ////添加证书 //creds, err2 := credentials.NewClientTLSFromFile("cert/server.pem", "*.zjtd.com") //if err2 != nil { // log.Fatal("证书错误", err2) //} ////无证书方式 //conn, err := grpc.Dial(":8008", grpc.WithTransportCredentials(insecure.NewCredentials())) //if err != nil { // log.Fatalln("服务端出错,连接不上!", err) //} //添加双向认证 // 证书认证-双向认证 // 从证书相关文件中读取和解析信息,得到证书公钥、密钥对 cert, _ := tls.LoadX509KeyPair("cert/client.pem", "cert/client.key") // 创建一个新的、空的 CertPool certPool := x509.NewCertPool() ca, _ := ioutil.ReadFile("cert/ca.crt") // 尝试解析所传入的 PEM 编码的证书。如果解析成功会将其加到 CertPool 中,便于后面的使用 certPool.AppendCertsFromPEM(ca) // 构建基于 TLS 的 TransportCredentials 选项 creds := credentials.NewTLS(&tls.Config{ // 设置证书链,允许包含一个或多个 Certificates: []tls.Certificate{cert}, // 要求必须校验客户端的证书。可以根据实际情况选用以下参数 ServerName: "*.zjtd.com", RootCAs: certPool, }) //Token认证 token := &auth.Authentication{ User: "hyh", Password: "123456", } conn, err := grpc.Dial(":8008", grpc.WithTransportCredentials(creds), grpc.WithPerRPCCredentials(token)) if err != nil { log.Fatalln("服务端出错,连接不上!", err) } defer conn.Close() //声明一个客户端 prodClient := service.NewProServiceClient(conn) //requst := &service.ProductRequest{ // ProdId: 123, //} //stockResponse, err := prodClient.GetProductStock(context.Background(), requst) //if err != nil { // log.Fatal("查询库存失败", err) //} //fmt.Println("查询成功,输出:", stockResponse.ProdStock, stockResponse.User, stockResponse.Data) //客户端流 stream, err := prodClient.UpdateProductStockClientStream(context.Background()) if err != nil { log.Fatal("获取流出错", err) } rsp := make(chan struct{}, 1) //声明函数处理 stream go prodRequest(stream, rsp) select { case <-rsp: recv, err := stream.CloseAndRecv() if err != nil { log.Fatal(err) } stock := recv.ProdStock fmt.Println("客户端收到响应", stock) } } func prodRequest(stream service.ProService_UpdateProductStockClientStreamClient, rsp chan struct{}) { count := 0 for true { requst := &service.ProductRequest{ ProdId: 123, } err := stream.Send(requst) if err != nil { log.Fatal(err) } time.Sleep(time.Second) count++ if count > 10 { rsp <- struct{}{} break } } }
10.3 服务端流
定义:客户端一次发送,服务端可以源源不断的回消息
rpc GetProductStockServerStream(ProductRequest) returns(stream ProductResponse);
product.proto:
syntax ="proto3";
import "pbfile/user.proto";
import "google/protobuf/any.proto";
option go_package="../service";
//option go_package= "path;name";
//path表示生成的go文件的存放地址,name表示生成的go文件所属的报名
package service;
message ProductRequest{
int32 prod_id=1;
}
message Content{
string msg=1;
}
message ProductResponse{
int32 prod_stock=1;
User user=2;
google.protobuf.Any data=3;
}
//定义服务主体
service ProService{
//定义方法
rpc GetProductStock(ProductRequest) returns(ProductResponse);
//客户端流定义方法
rpc UpdateProductStockClientStream(stream ProductRequest) returns(ProductResponse);
//服务端流定义方法 rpc GetProductStockServerStream(ProductRequest) returns(stream ProductResponse); }
执行protoc命令:
protoc --go_out=./service --go-grpc_out=./service pbfile/product.proto
/service/product.go:
package service import ( "context" "fmt" "google.golang.org/protobuf/types/known/anypb" "time" ) //定义一个结构体 type productService struct { } //定义一个结构体实例 var ProductService = &productService{} //实现该变量的ProServiceServer接口方法, func (p *productService) GetProductStock(ctx context.Context, request *ProductRequest) (*ProductResponse, error) { //todo 具体业务逻辑 stock := p.GetStockById(request.ProdId) user := &User{Username: "hyh"} contnet := &Content{Msg: "zjtd msg..."} //转换为any类型 any, _ := anypb.New(contnet) return &ProductResponse{ProdStock: stock, User: user, Data: any}, nil } func (p *productService) UpdateProductStockClientStream(stream ProService_UpdateProductStockClientStreamServer) error { count := 0 for { //源源不断的接受客户端发来的信息 recv, err := stream.Recv() if err != nil { return err } fmt.Println("服务端接受到到流", recv.ProdId, count) count++ if count > 10 { rsp := &ProductResponse{ProdStock: recv.ProdId} err := stream.SendAndClose(rsp) if err != nil { return err } return nil } } } func (p *productService) GetProductStockServerStream(request *ProductRequest, stream ProService_GetProductStockServerStreamServer) error { count := 0 for { rsp := &ProductResponse{ProdStock: request.ProdId} err := stream.Send(rsp) if err != nil { return err } time.Sleep(time.Second) count++ if count > 10 { return nil } } } func (p *productService) GetStockById(id int32) int32 { return id } func (p *productService) mustEmbedUnimplementedProServiceServer() {}
/myproto/grpc_server.go
package main import ( "context" "crypto/tls" "crypto/x509" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "io/ioutil" "log" "net" "zjtd.com/myproto/service" ) func main() { ////添加单向证书 //creds, err2 := credentials.NewServerTLSFromFile("cert/server.pem", "cert/server.key") //if err2 != nil { // log.Fatal("证书生成错误", err2) //} //添加双向证书 cert, err := tls.LoadX509KeyPair("cert/server.pem", "cert/server.key") if err != nil { log.Fatal("证书读取错误", err) } // 创建一个新的、空的 CertPool certPool := x509.NewCertPool() ca, err := ioutil.ReadFile("cert/ca.crt") if err != nil { log.Fatal("ca证书读取错误", err) } // 尝试解析所传入的 PEM 编码的证书。如果解析成功会将其加到 CertPool 中,便于后面的使用 certPool.AppendCertsFromPEM(ca) // 构建基于 TLS 的 TransportCredentials 选项 creds := credentials.NewTLS(&tls.Config{ // 设置证书链,允许包含一个或多个 Certificates: []tls.Certificate{cert}, // 要求必须校验客户端的证书。可以根据实际情况选用以下参数 ClientAuth: tls.RequireAndVerifyClientCert, // 设置根证书的集合,校验方式使用 ClientAuth 中设定的模式 ClientCAs: certPool, }) //实现token认证,需要合法的用户名和密码 //实现一个拦截器 var AuthInterceptor grpc.UnaryServerInterceptor AuthInterceptor = func( ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { //拦截普通方法请求,验证tokrn err = Auth(ctx) if err != nil { return } return handler(ctx, req) } recServer := grpc.NewServer(grpc.Creds(creds), grpc.UnaryInterceptor(AuthInterceptor)) //new一个服务端 //recServer := grpc.NewServer(grpc.Creds(creds)) //service 是我们自己的包,里面有一个RegisterProServiceServer的方法注册一个服务 //接口实现service.ProductService被注册上了 // service.RegisterProServiceServer(服务端, 实现主体函数的任意实例 interface{}) service.RegisterProServiceServer(recServer, service.ProductService) fmt.Println("认证成功") listener, err := net.Listen("tcp", ":8008") if err != nil { log.Fatalln("启动监听失败", err) } err = recServer.Serve(listener) if err != nil { log.Fatalln("启动服务失败", err) } fmt.Println("启动gRPC服务成功") } func Auth(ctx context.Context) error { //实际上拿到传输的用户名和密码 md, ok := metadata.FromIncomingContext(ctx) if !ok { return fmt.Errorf("missing credentials") } var user string var password string if val, ok := md["user.proto"]; ok { user = val[0] } if val, ok := md["password"]; ok { password = val[0] } if user != "hyh" || password != "123456" { return status.Error(codes.Unauthenticated, "token不合法") } return nil }
/myproto/client/grpc_client.go
package main import ( "context" "crypto/tls" "crypto/x509" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "io" "io/ioutil" "log" "zjtd.com/myproto/client/auth" "zjtd.com/myproto/client/service" ) func main() { ////添加证书 //creds, err2 := credentials.NewClientTLSFromFile("cert/server.pem", "*.zjtd.com") //if err2 != nil { // log.Fatal("证书错误", err2) //} ////无证书方式 //conn, err := grpc.Dial(":8008", grpc.WithTransportCredentials(insecure.NewCredentials())) //if err != nil { // log.Fatalln("服务端出错,连接不上!", err) //} //添加双向认证 // 证书认证-双向认证 // 从证书相关文件中读取和解析信息,得到证书公钥、密钥对 cert, _ := tls.LoadX509KeyPair("cert/client.pem", "cert/client.key") // 创建一个新的、空的 CertPool certPool := x509.NewCertPool() ca, _ := ioutil.ReadFile("cert/ca.crt") // 尝试解析所传入的 PEM 编码的证书。如果解析成功会将其加到 CertPool 中,便于后面的使用 certPool.AppendCertsFromPEM(ca) // 构建基于 TLS 的 TransportCredentials 选项 creds := credentials.NewTLS(&tls.Config{ // 设置证书链,允许包含一个或多个 Certificates: []tls.Certificate{cert}, // 要求必须校验客户端的证书。可以根据实际情况选用以下参数 ServerName: "*.zjtd.com", RootCAs: certPool, }) //Token认证 token := &auth.Authentication{ User: "hyh", Password: "123456", } conn, err := grpc.Dial(":8008", grpc.WithTransportCredentials(creds), grpc.WithPerRPCCredentials(token)) if err != nil { log.Fatalln("服务端出错,连接不上!", err) } defer conn.Close() prodClient := service.NewProServiceClient(conn) //requst := &service.ProductRequest{ // ProdId: 123, //} //stockResponse, err := prodClient.GetProductStoGetProductStockck(context.Background(), requst) //if err != nil { // log.Fatal("查询库存失败", err) //} //fmt.Println("查询成功,输出:", stockResponse.ProdStock, stockResponse.User, stockResponse.Data) //客户端流 // stream, err := prodClient.UpdateProductStockClientStream(context.Background()) // if err != nil { // log.Fatal("获取流出错", err) // } // rsp := make(chan struct{}, 1) // go prodRequest(stream, rsp) // select { // case <-rsp: // recv, err := stream.CloseAndRecv() // if err != nil { // log.Fatal(err) // } // stock := recv.ProdStock // fmt.Println("客户端收到响应", stock) // } //服务端流 requst := &service.ProductRequest{ ProdId: 123, } stream, err := prodClient.GetProductStockServerStream(context.Background(), requst) if err != nil { log.Fatal("获取流出错") } for { recv, err := stream.Recv() if err != nil { if err == io.EOF { fmt.Println("客户端数据接受完成") err := stream.CloseSend() if err != nil { log.Fatal(err) } break } log.Fatal(err) } fmt.Println("客户端收到的流:", recv.ProdStock) } } //func prodRequest(stream service.ProService_UpdateProductStockClientStreamClient, rsp chan struct{}) { // count := 0 // for true { // requst := &service.ProductRequest{ // ProdId: 123, // } // err := stream.Send(requst) // if err != nil { // log.Fatal(err) // } // time.Sleep(time.Second) // count++ // if count > 10 { // rsp <- struct{}{} // break // } // } //}
10.4 双向流
双向通信使用双向流,资源消耗过高-》websocket
双向流用于心跳检测还是可以的
定义:互为服务端
product.proto:
syntax ="proto3"; import "pbfile/user.proto"; import "google/protobuf/any.proto"; option go_package="../service"; //option go_package= "path;name"; //path表示生成的go文件的存放地址,name表示生成的go文件所属的报名 package service; message ProductRequest{ int32 prod_id=1; } message Content{ string msg=1; } message ProductResponse{ int32 prod_stock=1; User user=2; google.protobuf.Any data=3; } //定义服务主体 service ProService{ //定义方法 rpc GetProductStock(ProductRequest) returns(ProductResponse); //客户端流定义方法 rpc UpdateProductStockClientStream(stream ProductRequest) returns(ProductResponse); //服务端流定义方法 rpc GetProductStockServerStream(ProductRequest) returns(stream ProductResponse); //双向流定义方法 rpc SayHelloStream(stream ProductRequest) returns(stream ProductResponse); }
执行protoc命令:
protoc --go_out=./service --go-grpc_out=./service pbfile/product.proto
/service/product.go:
package service import ( "context" "fmt" "google.golang.org/protobuf/types/known/anypb" "time" ) //定义一个结构体 type productService struct { } //定义一个结构体实例 var ProductService = &productService{} //实现该变量的ProServiceServer接口方法, func (p *productService) GetProductStock(ctx context.Context, request *ProductRequest) (*ProductResponse, error) { //todo 具体业务逻辑 stock := p.GetStockById(request.ProdId) user := &User{Username: "hyh"} contnet := &Content{Msg: "zjtd msg..."} //转换为any类型 any, _ := anypb.New(contnet) return &ProductResponse{ProdStock: stock, User: user, Data: any}, nil } func (p *productService) UpdateProductStockClientStream(stream ProService_UpdateProductStockClientStreamServer) error { count := 0 for { //源源不断的接受客户端发来的信息 recv, err := stream.Recv() if err != nil { return err } fmt.Println("服务端接受到到流", recv.ProdId, count) count++ if count > 10 { rsp := &ProductResponse{ProdStock: recv.ProdId} err := stream.SendAndClose(rsp) if err != nil { return err } return nil } } } func (p *productService) GetProductStockServerStream(request *ProductRequest, stream ProService_GetProductStockServerStreamServer) error { count := 0 for { rsp := &ProductResponse{ProdStock: request.ProdId} err := stream.Send(rsp) if err != nil { return err } time.Sleep(time.Second) count++ if count > 10 { return nil } } } func (p *productService) SayHelloStream(stream ProService_SayHelloStreamServer) error { for { recv, err := stream.Recv() if err != nil { return nil } fmt.Println("服务端收到客户端端消息", recv.ProdId) time.Sleep(time.Second) rsp := &ProductResponse{ProdStock: recv.ProdId} err = stream.Send(rsp) if err != nil { return nil } } } func (p *productService) GetStockById(id int32) int32 { return id } func (p *productService) mustEmbedUnimplementedProServiceServer() {}
/myproto/grpc_server.go
package main import ( "context" "crypto/tls" "crypto/x509" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "io/ioutil" "log" "net" "zjtd.com/myproto/service" ) func main() { ////添加单向证书 //creds, err2 := credentials.NewServerTLSFromFile("cert/server.pem", "cert/server.key") //if err2 != nil { // log.Fatal("证书生成错误", err2) //} //添加双向证书 cert, err := tls.LoadX509KeyPair("cert/server.pem", "cert/server.key") if err != nil { log.Fatal("证书读取错误", err) } // 创建一个新的、空的 CertPool certPool := x509.NewCertPool() ca, err := ioutil.ReadFile("cert/ca.crt") if err != nil { log.Fatal("ca证书读取错误", err) } // 尝试解析所传入的 PEM 编码的证书。如果解析成功会将其加到 CertPool 中,便于后面的使用 certPool.AppendCertsFromPEM(ca) // 构建基于 TLS 的 TransportCredentials 选项 creds := credentials.NewTLS(&tls.Config{ // 设置证书链,允许包含一个或多个 Certificates: []tls.Certificate{cert}, // 要求必须校验客户端的证书。可以根据实际情况选用以下参数 ClientAuth: tls.RequireAndVerifyClientCert, // 设置根证书的集合,校验方式使用 ClientAuth 中设定的模式 ClientCAs: certPool, }) //实现token认证,需要合法的用户名和密码 //实现一个拦截器 var AuthInterceptor grpc.UnaryServerInterceptor AuthInterceptor = func( ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { //拦截普通方法请求,验证tokrn err = Auth(ctx) if err != nil { return } return handler(ctx, req) } recServer := grpc.NewServer(grpc.Creds(creds), grpc.UnaryInterceptor(AuthInterceptor)) //new一个服务端 //recServer := grpc.NewServer(grpc.Creds(creds)) //service 是我们自己的包,里面有一个RegisterProServiceServer的方法注册一个服务 //接口实现service.ProductService被注册上了 // service.RegisterProServiceServer(服务端, 实现主体函数的任意实例 interface{}) service.RegisterProServiceServer(recServer, service.ProductService) fmt.Println("认证成功") listener, err := net.Listen("tcp", ":8008") if err != nil { log.Fatalln("启动监听失败", err) } err = recServer.Serve(listener) if err != nil { log.Fatalln("启动服务失败", err) } fmt.Println("启动gRPC服务成功") } func Auth(ctx context.Context) error { //实际上拿到传输的用户名和密码 md, ok := metadata.FromIncomingContext(ctx) if !ok { return fmt.Errorf("missing credentials") } var user string var password string if val, ok := md["user.proto"]; ok { user = val[0] } if val, ok := md["password"]; ok { password = val[0] } if user != "hyh" || password != "123456" { return status.Error(codes.Unauthenticated, "token不合法") } return nil }
/myproto/client/grpc_client.go
package main import ( "context" "crypto/tls" "crypto/x509" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "io/ioutil" "log" "time" "zjtd.com/myproto/client/auth" "zjtd.com/myproto/client/service" ) func main() { ////添加证书 //creds, err2 := credentials.NewClientTLSFromFile("cert/server.pem", "*.zjtd.com") //if err2 != nil { // log.Fatal("证书错误", err2) //} ////无证书方式 //conn, err := grpc.Dial(":8008", grpc.WithTransportCredentials(insecure.NewCredentials())) //if err != nil { // log.Fatalln("服务端出错,连接不上!", err) //} //添加双向认证 // 证书认证-双向认证 // 从证书相关文件中读取和解析信息,得到证书公钥、密钥对 cert, _ := tls.LoadX509KeyPair("cert/client.pem", "cert/client.key") // 创建一个新的、空的 CertPool certPool := x509.NewCertPool() ca, _ := ioutil.ReadFile("cert/ca.crt") // 尝试解析所传入的 PEM 编码的证书。如果解析成功会将其加到 CertPool 中,便于后面的使用 certPool.AppendCertsFromPEM(ca) // 构建基于 TLS 的 TransportCredentials 选项 creds := credentials.NewTLS(&tls.Config{ // 设置证书链,允许包含一个或多个 Certificates: []tls.Certificate{cert}, // 要求必须校验客户端的证书。可以根据实际情况选用以下参数 ServerName: "*.zjtd.com", RootCAs: certPool, }) //Token认证 token := &auth.Authentication{ User: "hyh", Password: "123456", } conn, err := grpc.Dial(":8008", grpc.WithTransportCredentials(creds), grpc.WithPerRPCCredentials(token)) if err != nil { log.Fatalln("服务端出错,连接不上!", err) } defer conn.Close() prodClient := service.NewProServiceClient(conn) //requst := &service.ProductRequest{ // ProdId: 123, //} //stockResponse, err := prodClient.GetProductStoGetProductStockck(context.Background(), requst) //if err != nil { // log.Fatal("查询库存失败", err) //} //fmt.Println("查询成功,输出:", stockResponse.ProdStock, stockResponse.User, stockResponse.Data) //客户端流 // stream, err := prodClient.UpdateProductStockClientStream(context.Background()) // if err != nil { // log.Fatal("获取流出错", err) // } // rsp := make(chan struct{}, 1) // go prodRequest(stream, rsp) // select { // case <-rsp: // recv, err := stream.CloseAndRecv() // if err != nil { // log.Fatal(err) // } // stock := recv.ProdStock // fmt.Println("客户端收到响应", stock) // } ////服务端流 //requst := &service.ProductRequest{ // ProdId: 123, //} //stream, err := prodClient.GetProductStockServerStream(context.Background(), requst) //if err != nil { // log.Fatal("获取流出错") //} //for { // recv, err := stream.Recv() // if err != nil { // if err == io.EOF { // fmt.Println("客户端数据接受完成") // err := stream.CloseSend() // if err != nil { // log.Fatal(err) // } // break // } // log.Fatal(err) // } // fmt.Println("客户端收到的流:", recv.ProdStock) //} //双向流 stream, err := prodClient.SayHelloStream(context.Background()) for { requst := &service.ProductRequest{ ProdId: 123, } err = stream.Send(requst) if err != nil { log.Fatal(err) } time.Sleep(time.Second) recv, err := stream.Recv() if err != nil { log.Fatal(err) } fmt.Println("客户端收到流的消息:", recv.ProdStock) } } //func prodRequest(stream service.ProService_UpdateProductStockClientStreamClient, rsp chan struct{}) { // count := 0 // for true { // requst := &service.ProductRequest{ // ProdId: 123, // } // err := stream.Send(requst) // if err != nil { // log.Fatal(err) // } // time.Sleep(time.Second) // count++ // if count > 10 { // rsp <- struct{}{} // break // } // } //}
11、Grpc 拦截器
通常客户端请求到达服务端的时候不会立即进行业务处理,而是进行一些预处理操作,比如监控数据采集(统计 QPS),链路追踪,身份信息校验,必传参数校验等等。gRPC 为此提供了一个拦截器(Interceptor)功能来实现这一系列的操作。按照通信方式可以分为一元拦截器(Unary Interceptor)和流拦截器(Streaming Interceptor),按照应用角色可以分为客户端拦截器(Client-Side Interceptor)和服务端拦截器(Server-Side Interceptor),具体类型如下
// grpc interceptor.go grpc.UnaryClientInterceptor grpc.UnaryServerInterceptor grpc.StreamClientInterceptor grpc.StreamServerInterceptor
1、介绍
grpc服务端和客户端都提供了拦截器interceptor功能,功能类似中间件middleware,很适合在这里处理验证、日志等流程。
grpc针对不同的rpc类型为我们设计了两种类型的拦截器:
- 一元拦截器 UnaryInterceptor,可以拦截一元rpc请求
- 流式拦截器 StreamInterceptor,可以拦截服务端流式rpc、客户端流式、双向流失rpc请求
2、拦截器的声明
2.1、 一元RPC拦截器
func UnaryServerInterceptor( ctx context.Context, // 请求上下文,可以做一些超时处理 req interface{}, // gRPC 请求参数 info *UnaryServerInfo, // gRPC 服务接口信息 handler UnaryHandler, // gRPC 实际调用方法 ) (resp interface{}, err error)
ctx context.Context 请求上下文 req interface{} 用户请求的参数 info *UnaryServerInfo RPC方法的所有信息,如:方法名 handler UnaryHandler 执行RPC的方法本身 type UnaryHandler func(ctx context.Context, req interface{}) (interface{}, error) resp interface{} 执行RPC方法后的响应结果
2.2.、流式RPC拦截器
func StreamServerInterceptor( srv interface{}, // 请求参数 ss ServerStream, // gRPC 服务端流信息 info *StreamServerInfo, // gRPC 服务接口信息 handler StreamHandler // gRPC 实际调用方法 ) error
3、拦截器作用?
我们可以通过拦截器执行以下操作:
- 在传递之前更新原始 gRPC 请求——例如,注入额外的信息,例如 token
- 操纵原始调用者函数的行为,例如绕过调用以便我们可以使用缓存的结果
- 在返回给客户端之前更新响应
接下来我们将实现日志记录拦截器示例。
4、拦截器的实现
4.1、服务端定义一元RPC拦截器
下面给出的代码说明了执行以下操作的一元拦截器:
- 它在 执行RPC 之前记录了请求的时间+请求的方法。
- 请求方式异常后将打印异常信息。
- 请求正常结束时,将记录结束的时间和具体的方法。
我们需要定义一个2.1中说明的 UnaryServerInterceptor 类型的函数,表示它为一元RPC拦截器:
func UnaryLoggerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { // 记录开始时间和请求的方法 log.Println("start:" + time.Now().Format("2006-01-02 15:04:05") + " " + info.FullMethod) // 执行具体rpc方法 resp, err = handler(ctx, req) if err != nil { // 记录异常日志信息 log.Println("error:" + time.Now().Format("2006-01-02 15:04:05") + " " + err.Error()) return resp, err } // 正常结束,记录结束时间和方法 log.Println("end:" + time.Now().Format("2006-01-02 15:04:05") + " " + info.FullMethod) return resp, nil }
4.2 服务器定义流式RPC拦截器
func StreamLoggerInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { log.Println(time.Now().Format("2006-01-02 15:04:05") + " " + info.FullMethod) err := handler(srv, ss) if err != nil { log.Println(time.Now().Format("2006-01-02 15:04:05") + " " + err.Error()) return err } log.Println(time.Now().Format("2006-01-02 15:04:05") + " " + info.FullMethod) return nil }
4.3服务端注册拦截器
// 注册一元RPC拦截器 server := grpc.NewServer(grpc.UnaryInterceptor(interceptor.UnaryLoggerInterceptor)) // 注册流式RPC拦截器 server := grpc.NewServer(grpc.StreamInterceptor(interceptor.StreamLoggerInterceptor))
5、如何将多个同类型拦截器串联
grpc中为我们提供了将多个同一类型的拦截器串起来的方法,当然也可以使用第三方框架go-grpc-middleware
// 添加多个一元rpc拦截器 unaryInterceptorChain := grpc.ChainUnaryInterceptor(interceptor.UnaryLoggerInterceptor,interceptor.UnaryLoggerInterceptor) // 添加多个流式rpc拦截器 streamInterceptorChain := grpc.ChainStreamInterceptor(interceptor.StreamLoggerInterceptor,interceptor.StreamLoggerInterceptor) // 注册拦截器 server := grpc.NewServer(unaryInterceptorChain,streamInterceptorChain)
6、拦截器的执行
![](https://uploadfiles.nowcoder.com/files/20220914/5814143_1663155609223/.jpg)
7、客户端拦截器
7.1 定义拦截器
// 定义客户端一元拦截器 func UnaryClientInterceptor( ctx context.Context, // 请求上下文,可以做一些超时处理 method string, // 请求方法 req, reply interface{}, // 请求和响应 cc *ClientConn, // 连接信息 invoker UnaryInvoker, // 调用的 gRPC 方法 opts ...CallOption // gRPC 调用预处理接***后处理接口 ) error // 定义客户端流式拦截器 func StreamClientInterceptor( ctx context.Context, // 请求上下文 desc *StreamDesc, // 调用 gRPC 方法流信息 cc *ClientConn, // 连接信息 method string, // 调用方法 streamer Streamer, // 流对象,通过 desc 初始化 opts ...CallOption // gRPC 调用预处理接***后处理接口 ) (ClientStream, error)
7.2注册拦截器
// 注册一元拦截器 clientUnaryLoggerInterceptor := grpc.WithUnaryInterceptor(ClientUnaryLoggerInterceptor) // 注册流式拦截器 clientStreamLoggerInterceptor := grpc.WithStreamInterceptor(ClientStreamLoggerInterceptor) grpc.Dial("127.0.0.1:8000",grpc.WithInsecure(),clientUnaryLoggerInterceptor,clientStreamLoggerInterceptor)
7.3客户端添加多个拦截器
// 串联拦截器 grpc.WithChainUnaryInterceptor(ClientUnaryLoggerInterceptor) grpc.WithChainStreamInterceptor(ClientStreamLoggerInterceptor)
8、拦截器实例
tree:
. ├── client.go ├── go.mod ├── go.sum ├── pb │ ├── user.pb.go │ └── user_grpc.pb.go ├── server │ └── server.go └── user.proto
user.proto:
syntax = "proto3"; import "google/protobuf/timestamp.proto"; package pb; option go_package = "./pb"; service ToDoService { rpc DoWork (TodoRequest) returns (TodoResponse); } enum Week { Sunday = 0; Monday = 1; Tuesday = 2; Wednesday = 3; Thursday = 4; Friday = 5; Saturday = 6; } message TodoRequest{ string todo = 1; Week week = 2; map <string,string> bookMap = 3; // BookMap map[string]string google.protobuf.Timestamp doneTime = 4; } message TodoResponse { bool done = 1; }
srever/server.go:
package main import ( "context" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "net" "reflect" pb2 "test/pb" "time" ) //自定义拦截器 // func MyInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { //从context 获取数据 md, ok := metadata.FromIncomingContext(ctx) if !ok { fmt.Println("拦截器 :metadata nil") return nil, status.Error(codes.Unauthenticated, "认证失败[metadata]") } token, ok := md["token"] if !ok { fmt.Println("拦截器 :token nil") return nil, status.Error(codes.Unauthenticated, "认证失败[token]") } servertoken := []string{"123456789"} if !reflect.DeepEqual(servertoken, token) { return nil, status.Error(codes.Unauthenticated, "密码错误[token]") } //fmt.Println("token:", token) now := time.Now() resp, err = handler(ctx, req) //前面的先执行; 下面的最后执行类似gin 中 next() 中间件 fmt.Println("req:", req) lastTime := time.Now().Sub(now) fmt.Println("执行时间:", lastTime.Milliseconds()) return } type TodoInfo struct { } //拦截器handle函数 func (t *TodoInfo) DoWork(ctx context.Context, td_req *pb2.TodoRequest) (*pb2.TodoResponse, error) { //接受context 参数 md, ok := metadata.FromIncomingContext(ctx) if !ok { fmt.Println("metadata false") } for k, v := range md { fmt.Println("k:", k, "== v:", v) } fmt.Println(td_req.Todo) fmt.Println(td_req.Week) fmt.Println(td_req.BookMap) return &pb2.TodoResponse{ Done: true, }, nil } func main() { //注册拦截器 serviceOption := grpc.UnaryInterceptor(MyInterceptor) //实例化grpc grpcServer := grpc.NewServer(serviceOption) //注册服务 pb2.RegisterToDoServiceServer(grpcServer, &TodoInfo{}) listen, err := net.Listen("tcp", "127.0.0.1:8081") if err != nil { panic(err) } err = grpcServer.Serve(listen) if err != nil { println(err) } }
client.go:
package main import ( "context" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/metadata" pb2 "test/pb" ) //继承PerRPCCredentials 实现认证 //重写GetRequestMetadata 和 RequireTransportSecurity type MyCredentials struct { } func (c *MyCredentials) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) { return map[string]string{ "token": "123456897", }, nil } func (c *MyCredentials) RequireTransportSecurity() bool { return false } func main() { //拦截器 //clientInterceptor := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error{ // md2 := metadata.Pairs("token","99663322poiopui") // ctx = metadata.NewOutgoingContext(context.Background(), md2) // // now := time.Now() // err := invoker(ctx, method, req, reply, cc, opts ...) // lastTime := time.Now().Sub(now) // fmt.Println("客户端执行时间:",lastTime.Milliseconds()) // return err //} //opt := grpc.WithUnaryInterceptor(clientInterceptor) //认证 opt := grpc.WithPerRPCCredentials(&MyCredentials{}) conn, err := grpc.Dial("127.0.0.1:8081", grpc.WithInsecure(), opt) defer conn.Close() if err != nil { panic(err) } //实例化 ToDo 服务 client := pb2.NewToDoServiceClient(conn) md1 := metadata.New(map[string]string{ "name": "hello grpc", }) //从过context 传递参数 ctx := metadata.NewOutgoingContext(context.Background(), md1) //调用服务端DoWork方法 rep, err := client.DoWork(ctx, &pb2.TodoRequest{ Todo: "拦截器", Week: pb2.Week_Monday, BookMap: map[string]string{ "age": "20", }, //DoneTime: time.Now(), }) if err != nil { panic(err) } fmt.Println("rep:", rep) }