options.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. package grpc
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "math"
  6. "time"
  7. pbErr "IotAdmin/core/errors"
  8. log "IotAdmin/core/logger"
  9. "IotAdmin/core/server/grpc/interceptors/logging"
  10. requesttag "IotAdmin/core/server/grpc/interceptors/request_tag"
  11. recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
  12. ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
  13. opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
  14. prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
  15. "google.golang.org/grpc"
  16. )
  // Default values used by defaultOptions when the caller supplies no
  // overriding Option.
  const (
  	// infinity is the largest representable duration; defaultOptions uses
  	// it as the maxConnectionAge default.
  	infinity = time.Duration(math.MaxInt64)

  	// defaultMaxMsgSize is 4 MiB expressed in bytes.
  	defaultMaxMsgSize = 4 * 1024 * 1024

  	// defaultMaxConcurrentStreams is the default for Options.maxConcurrentStreams.
  	defaultMaxConcurrentStreams = 100000

  	// defaultKeepAliveTime is the default for Options.keepAlive.
  	defaultKeepAliveTime = 30 * time.Second

  	// defaultConnectionIdleTime is the default for Options.timeout.
  	defaultConnectionIdleTime = 10 * time.Second

  	// defaultMaxServerConnectionAgeGrace is the default for
  	// Options.maxConnectionAgeGrace.
  	defaultMaxServerConnectionAgeGrace = 10 * time.Second

  	// defaultMiniKeepAliveTimeRate is not referenced in this part of the file.
  	// NOTE(review): presumably a divisor applied to the keep-alive time when
  	// deriving a minimum client ping interval — confirm against its users.
  	defaultMiniKeepAliveTimeRate = 2
  )
  26. type Option func(*Options)
  27. type Options struct {
  28. id string
  29. domain string
  30. addr string
  31. tls *tls.Config
  32. keepAlive time.Duration
  33. timeout time.Duration
  34. maxConnectionAge time.Duration
  35. maxConnectionAgeGrace time.Duration
  36. maxConcurrentStreams int
  37. maxMsgSize int
  38. unaryServerInterceptors []grpc.UnaryServerInterceptor
  39. streamServerInterceptors []grpc.StreamServerInterceptor
  40. ctx context.Context
  41. }
  42. func WithContextOption(c context.Context) Option {
  43. return func(o *Options) {
  44. o.ctx = c
  45. }
  46. }
  47. func WithIDOption(s string) Option {
  48. return func(o *Options) {
  49. o.id = s
  50. }
  51. }
  52. func WithDomainOption(s string) Option {
  53. return func(o *Options) {
  54. o.domain = s
  55. }
  56. }
  57. func WithAddrOption(s string) Option {
  58. return func(o *Options) {
  59. o.addr = s
  60. }
  61. }
  62. func WithTlsOption(tls *tls.Config) Option {
  63. return func(o *Options) {
  64. o.tls = tls
  65. }
  66. }
  67. func WithKeepAliveOption(t time.Duration) Option {
  68. return func(o *Options) {
  69. o.keepAlive = t
  70. }
  71. }
  72. func WithTimeoutOption(t time.Duration) Option {
  73. return func(o *Options) {
  74. o.keepAlive = t
  75. }
  76. }
  77. func WithMaxConnectionAgeOption(t time.Duration) Option {
  78. return func(o *Options) {
  79. o.maxConnectionAge = t
  80. }
  81. }
  82. func WithMaxConnectionAgeGraceOption(t time.Duration) Option {
  83. return func(o *Options) {
  84. o.maxConnectionAgeGrace = t
  85. }
  86. }
  87. func WithMaxConcurrentStreamsOption(i int) Option {
  88. return func(o *Options) {
  89. o.maxConcurrentStreams = i
  90. }
  91. }
  92. func WithMaxMsgSizeOption(i int) Option {
  93. return func(o *Options) {
  94. o.maxMsgSize = i
  95. }
  96. }
  97. func WithUnaryServerInterceptorsOption(u ...grpc.UnaryServerInterceptor) Option {
  98. return func(o *Options) {
  99. if o.unaryServerInterceptors == nil {
  100. o.unaryServerInterceptors = make([]grpc.UnaryServerInterceptor, 0)
  101. }
  102. o.unaryServerInterceptors = append(o.unaryServerInterceptors, u...)
  103. }
  104. }
  105. func WithStreamServerInterceptorsOption(u ...grpc.StreamServerInterceptor) Option {
  106. return func(o *Options) {
  107. if o.streamServerInterceptors == nil {
  108. o.streamServerInterceptors = make([]grpc.StreamServerInterceptor, 0)
  109. }
  110. o.streamServerInterceptors = append(o.streamServerInterceptors, u...)
  111. }
  112. }
  113. func defaultOptions() *Options {
  114. return &Options{
  115. addr: ":0",
  116. keepAlive: defaultKeepAliveTime,
  117. timeout: defaultConnectionIdleTime,
  118. maxConnectionAge: infinity,
  119. maxConnectionAgeGrace: defaultMaxServerConnectionAgeGrace,
  120. maxConcurrentStreams: defaultMaxConcurrentStreams,
  121. maxMsgSize: defaultMaxMsgSize,
  122. unaryServerInterceptors: []grpc.UnaryServerInterceptor{
  123. requesttag.UnaryServerInterceptor(),
  124. ctxtags.UnaryServerInterceptor(),
  125. opentracing.UnaryServerInterceptor(),
  126. logging.UnaryServerInterceptor(),
  127. prometheus.UnaryServerInterceptor,
  128. recovery.UnaryServerInterceptor(recovery.WithRecoveryHandler(customRecovery("", ""))),
  129. },
  130. streamServerInterceptors: []grpc.StreamServerInterceptor{
  131. requesttag.StreamServerInterceptor(),
  132. ctxtags.StreamServerInterceptor(),
  133. opentracing.StreamServerInterceptor(),
  134. logging.StreamServerInterceptor(),
  135. prometheus.StreamServerInterceptor,
  136. recovery.StreamServerInterceptor(recovery.WithRecoveryHandler(customRecovery("", ""))),
  137. },
  138. }
  139. }
  140. func customRecovery(id, domain string) recovery.RecoveryHandlerFunc {
  141. return func(p interface{}) (err error) {
  142. log.Errorf("panic triggered: %v", p)
  143. return pbErr.New(id, domain, pbErr.InternalServerError)
  144. }
  145. }