server.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. /*
  2. * @Author: lwnmengjing
  3. * @Date: 2021/6/2 4:26 下午
  4. * @Last Modified by: lwnmengjing
  5. * @Last Modified time: 2021/6/2 4:26 下午
  6. */
  7. package grpc
  8. import (
  9. "context"
  10. "errors"
  11. "fmt"
  12. "net"
  13. "sync"
  14. log "IotAdmin/core/logger"
  15. middleware "github.com/grpc-ecosystem/go-grpc-middleware"
  16. prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
  17. "google.golang.org/grpc"
  18. "google.golang.org/grpc/keepalive"
  19. )
  20. type Server struct {
  21. name string
  22. srv *grpc.Server
  23. mux sync.Mutex
  24. started bool
  25. options Options
  26. }
  27. // New grpc server
  28. func New(name string, options ...Option) *Server {
  29. s := &Server{name: name}
  30. s.Options(options...)
  31. s.NewServer()
  32. return s
  33. }
  34. // String string
  35. func (e *Server) String() string {
  36. return e.name
  37. }
  38. func (e *Server) Options(options ...Option) {
  39. e.options = *defaultOptions()
  40. for _, o := range options {
  41. o(&e.options)
  42. }
  43. }
  44. func (e *Server) Server() *grpc.Server {
  45. return e.srv
  46. }
  47. func (e *Server) NewServer() {
  48. grpc.EnableTracing = false
  49. e.srv = grpc.NewServer(e.initGrpcServerOptions()...)
  50. }
  51. func (e *Server) Register(do func(server *Server)) {
  52. do(e)
  53. prometheus.Register(e.srv)
  54. }
  55. func (e *Server) initGrpcServerOptions() []grpc.ServerOption {
  56. return []grpc.ServerOption{
  57. grpc.UnaryInterceptor(middleware.ChainUnaryServer(e.options.unaryServerInterceptors...)),
  58. grpc.StreamInterceptor(middleware.ChainStreamServer(e.options.streamServerInterceptors...)),
  59. grpc.MaxConcurrentStreams(uint32(e.options.maxConcurrentStreams)),
  60. grpc.MaxRecvMsgSize(e.options.maxMsgSize),
  61. grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
  62. MinTime: e.options.keepAlive / defaultMiniKeepAliveTimeRate,
  63. }),
  64. grpc.KeepaliveParams(keepalive.ServerParameters{
  65. Time: e.options.keepAlive,
  66. Timeout: e.options.timeout,
  67. MaxConnectionAge: e.options.maxConnectionAge,
  68. MaxConnectionAgeGrace: e.options.maxConnectionAgeGrace,
  69. }),
  70. }
  71. }
  72. func (e *Server) Start(ctx context.Context) error {
  73. e.mux.Lock()
  74. defer e.mux.Unlock()
  75. if e.started {
  76. return errors.New("gRPC Server was started more than once. " +
  77. "This is likely to be caused by being added to a manager multiple times")
  78. }
  79. // Set the internal context
  80. if e.options.ctx != nil {
  81. ctx = e.options.ctx
  82. }
  83. ts, err := net.Listen("tcp", e.options.addr)
  84. if err != nil {
  85. return fmt.Errorf("gRPC Server listening on %s failed: %w", e.options.addr, err)
  86. }
  87. log.Infof("gRPC Server listening on %s", ts.Addr().String())
  88. go func() {
  89. if err = e.srv.Serve(ts); err != nil {
  90. log.Errorf("gRPC Server start error: %s", err.Error())
  91. }
  92. }()
  93. e.started = true
  94. <-ctx.Done()
  95. return e.Shutdown(ctx)
  96. }
  97. func (e *Server) Attempt() bool {
  98. return !e.started
  99. }
  100. func (e *Server) Shutdown(ctx context.Context) error {
  101. <-ctx.Done()
  102. log.Infof("gRPC Server will be shutdown gracefully")
  103. e.srv.GracefulStop()
  104. return nil
  105. }