-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathgrpc.go
143 lines (128 loc) · 3.4 KB
/
grpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package istio
import (
"fmt"
"net"
"time"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/nilorg/pkg/logger"
"github.com/nilorg/sdk/log"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/reflection"
)
// GrpcServer wraps a *grpc.Server together with the service name and the
// listen address it is served on.
type GrpcServer struct {
// serviceName identifies the service in this server's log messages.
serviceName string
// address is the TCP address handed to net.Listen by Run.
address string
// server is the wrapped gRPC server instance.
server *grpc.Server
// Log receives listen/serve errors and stop warnings.
Log log.Logger
}
// GetSrv returns the wrapped *grpc.Server.
func (s *GrpcServer) GetSrv() *grpc.Server {
return s.server
}
// register installs the gRPC reflection service on the wrapped server.
func (s *GrpcServer) register() {
// Register the reflection service on the gRPC server.
reflection.Register(s.server)
}
// Run registers the reflection service, opens a TCP listener on the
// configured address and serves gRPC requests on it.
//
// The call returns only after listening has failed or Serve has returned.
// A listen failure or a non-nil Serve error is reported through s.Log;
// nothing is returned to the caller.
func (s *GrpcServer) Run() {
	s.register()

	listener, listenErr := net.Listen("tcp", s.address)
	if listenErr != nil {
		s.Log.Errorf("%s grpc server failed to listen: %v", s.serviceName, listenErr)
		return
	}

	serveErr := s.server.Serve(listener)
	if serveErr == nil {
		return
	}
	s.Log.Errorf("%s grpc server failed to serve: %v", s.serviceName, serveErr)
}
// Start launches Run on a new goroutine and returns immediately.
func (s *GrpcServer) Start() {
	go s.Run()
}
// Stop stops the wrapped gRPC server by calling its Stop method.
//
// When no server is set, a warning is logged and nothing else happens.
func (s *GrpcServer) Stop() {
	if s.server != nil {
		s.server.Stop()
		return
	}
	s.Log.Warningf("stop %s grpc server is nil", s.serviceName)
}
// NewGrpcServer creates a GrpcServer called name that will listen on address.
//
// Each non-empty interceptor slice is collapsed into a single chained
// interceptor and passed to grpc.NewServer as a server option. If
// logger.Default() returns nil, logger.Init() is called first; the default
// logger is then used as the server's Log.
func NewGrpcServer(name string, address string, streamServerInterceptors []grpc.StreamServerInterceptor, unaryServerInterceptors []grpc.UnaryServerInterceptor) *GrpcServer {
	var serverOpts []grpc.ServerOption
	if len(streamServerInterceptors) != 0 {
		streamChain := grpc_middleware.ChainStreamServer(streamServerInterceptors...)
		serverOpts = append(serverOpts, grpc.StreamInterceptor(streamChain))
	}
	if len(unaryServerInterceptors) != 0 {
		unaryChain := grpc_middleware.ChainUnaryServer(unaryServerInterceptors...)
		serverOpts = append(serverOpts, grpc.UnaryInterceptor(unaryChain))
	}

	result := &GrpcServer{
		serviceName: name,
		address:     address,
		server:      grpc.NewServer(serverOpts...),
	}

	// Make sure a default logger exists before handing it to the server.
	if logger.Default() == nil {
		logger.Init()
	}
	result.Log = logger.Default()
	return result
}
// GrpcClient wraps a *grpc.ClientConn dialed for a named service.
type GrpcClient struct {
// serviceName is the target service name; it is also used in log messages.
serviceName string
conn *grpc.ClientConn // connection
// Log receives warnings and errors raised while closing the connection.
Log log.Logger
}
// GetConn returns the wrapped client connection.
func (c *GrpcClient) GetConn() *grpc.ClientConn {
return c.conn
}
// Close closes the wrapped client connection.
//
// If the client holds no connection, a warning is logged and nothing else
// happens. An error returned by the connection's Close is logged through
// c.Log; it is not returned to the caller.
func (c *GrpcClient) Close() {
	if c.conn == nil {
		c.Log.Warningf("close %s grpc client is nil", c.serviceName)
		return
	}
	if err := c.conn.Close(); err != nil {
		// The format has two verbs (%s and %v): pass the service name as
		// well as the error so the message is not rendered with a missing
		// argument.
		c.Log.Errorf("close %s grpc client: %v", c.serviceName, err)
	}
}
// NewGrpcClient dials "serviceName:port" and returns a GrpcClient wrapping
// the resulting connection.
//
// The connection is dialed with grpc.WithInsecure and with client keepalive
// parameters Time=10s, Timeout=100ms, PermitWithoutStream=true. All supplied
// stream and unary interceptors are installed as chains, in the order given.
//
// A dial error is logged through logrus and is not returned; in that case
// the returned client carries whatever connection grpc.Dial produced, which
// may be nil, so callers should check GetConn before using it.
func NewGrpcClient(serviceName string, port int, streamClientInterceptors []grpc.StreamClientInterceptor, unaryClientInterceptors []grpc.UnaryClientInterceptor) *GrpcClient {
	opts := []grpc.DialOption{
		grpc.WithInsecure(),
		grpc.WithKeepaliveParams(
			keepalive.ClientParameters{
				Time:                10 * time.Second,
				Timeout:             100 * time.Millisecond,
				PermitWithoutStream: true,
			},
		),
	}

	// WithStreamInterceptor / WithUnaryInterceptor each hold a single
	// interceptor, so appending one such option per element would leave only
	// the last interceptor active. Install the whole slice as a chain instead.
	if len(streamClientInterceptors) > 0 {
		opts = append(opts, grpc.WithChainStreamInterceptor(streamClientInterceptors...))
	}
	if len(unaryClientInterceptors) > 0 {
		opts = append(opts, grpc.WithChainUnaryInterceptor(unaryClientInterceptors...))
	}

	target := fmt.Sprintf("%s:%d", serviceName, port)
	conn, err := grpc.Dial(target, opts...)
	if err != nil {
		logrus.Errorf("%s grpc client dial error: %v", serviceName, err)
	}

	return &GrpcClient{
		serviceName: serviceName,
		conn:        conn,
		Log:         logrus.StandardLogger(),
	}
}