| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166 |
- package grpc
- import (
- "context"
- "crypto/tls"
- "math"
- "time"
- pbErr "IotAdmin/core/errors"
- log "IotAdmin/core/logger"
- "IotAdmin/core/server/grpc/interceptors/logging"
- requesttag "IotAdmin/core/server/grpc/interceptors/request_tag"
- recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
- ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
- opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
- prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
- "google.golang.org/grpc"
- )
// Default values applied by defaultOptions when the caller supplies no
// overriding Option.
const (
	// infinity is the largest representable time.Duration; defaultOptions uses
	// it as the default maxConnectionAge.
	infinity time.Duration = math.MaxInt64

	// defaultMaxMsgSize is 4 * 1024 * 1024 (4194304); presumably a size in
	// bytes — confirm against where maxMsgSize is consumed.
	defaultMaxMsgSize = 4 * 1024 * 1024

	// defaultMaxConcurrentStreams is the default for Options.maxConcurrentStreams.
	defaultMaxConcurrentStreams = 100000

	// defaultKeepAliveTime is the default for Options.keepAlive.
	defaultKeepAliveTime = time.Second * 30

	// defaultConnectionIdleTime is the default for Options.timeout.
	defaultConnectionIdleTime = time.Second * 10

	// defaultMaxServerConnectionAgeGrace is the default for
	// Options.maxConnectionAgeGrace.
	defaultMaxServerConnectionAgeGrace = time.Second * 10

	// defaultMiniKeepAliveTimeRate is not referenced in this part of the file.
	// NOTE(review): looks like a divisor/multiplier relating the minimum
	// keep-alive time to keepAlive — confirm at the use site.
	defaultMiniKeepAliveTimeRate = 2
)
- type Option func(*Options)
- type Options struct {
- id string
- domain string
- addr string
- tls *tls.Config
- keepAlive time.Duration
- timeout time.Duration
- maxConnectionAge time.Duration
- maxConnectionAgeGrace time.Duration
- maxConcurrentStreams int
- maxMsgSize int
- unaryServerInterceptors []grpc.UnaryServerInterceptor
- streamServerInterceptors []grpc.StreamServerInterceptor
- ctx context.Context
- }
- func WithContextOption(c context.Context) Option {
- return func(o *Options) {
- o.ctx = c
- }
- }
- func WithIDOption(s string) Option {
- return func(o *Options) {
- o.id = s
- }
- }
- func WithDomainOption(s string) Option {
- return func(o *Options) {
- o.domain = s
- }
- }
- func WithAddrOption(s string) Option {
- return func(o *Options) {
- o.addr = s
- }
- }
- func WithTlsOption(tls *tls.Config) Option {
- return func(o *Options) {
- o.tls = tls
- }
- }
- func WithKeepAliveOption(t time.Duration) Option {
- return func(o *Options) {
- o.keepAlive = t
- }
- }
- func WithTimeoutOption(t time.Duration) Option {
- return func(o *Options) {
- o.keepAlive = t
- }
- }
- func WithMaxConnectionAgeOption(t time.Duration) Option {
- return func(o *Options) {
- o.maxConnectionAge = t
- }
- }
- func WithMaxConnectionAgeGraceOption(t time.Duration) Option {
- return func(o *Options) {
- o.maxConnectionAgeGrace = t
- }
- }
- func WithMaxConcurrentStreamsOption(i int) Option {
- return func(o *Options) {
- o.maxConcurrentStreams = i
- }
- }
- func WithMaxMsgSizeOption(i int) Option {
- return func(o *Options) {
- o.maxMsgSize = i
- }
- }
- func WithUnaryServerInterceptorsOption(u ...grpc.UnaryServerInterceptor) Option {
- return func(o *Options) {
- if o.unaryServerInterceptors == nil {
- o.unaryServerInterceptors = make([]grpc.UnaryServerInterceptor, 0)
- }
- o.unaryServerInterceptors = append(o.unaryServerInterceptors, u...)
- }
- }
- func WithStreamServerInterceptorsOption(u ...grpc.StreamServerInterceptor) Option {
- return func(o *Options) {
- if o.streamServerInterceptors == nil {
- o.streamServerInterceptors = make([]grpc.StreamServerInterceptor, 0)
- }
- o.streamServerInterceptors = append(o.streamServerInterceptors, u...)
- }
- }
- func defaultOptions() *Options {
- return &Options{
- addr: ":0",
- keepAlive: defaultKeepAliveTime,
- timeout: defaultConnectionIdleTime,
- maxConnectionAge: infinity,
- maxConnectionAgeGrace: defaultMaxServerConnectionAgeGrace,
- maxConcurrentStreams: defaultMaxConcurrentStreams,
- maxMsgSize: defaultMaxMsgSize,
- unaryServerInterceptors: []grpc.UnaryServerInterceptor{
- requesttag.UnaryServerInterceptor(),
- ctxtags.UnaryServerInterceptor(),
- opentracing.UnaryServerInterceptor(),
- logging.UnaryServerInterceptor(),
- prometheus.UnaryServerInterceptor,
- recovery.UnaryServerInterceptor(recovery.WithRecoveryHandler(customRecovery("", ""))),
- },
- streamServerInterceptors: []grpc.StreamServerInterceptor{
- requesttag.StreamServerInterceptor(),
- ctxtags.StreamServerInterceptor(),
- opentracing.StreamServerInterceptor(),
- logging.StreamServerInterceptor(),
- prometheus.StreamServerInterceptor,
- recovery.StreamServerInterceptor(recovery.WithRecoveryHandler(customRecovery("", ""))),
- },
- }
- }
- func customRecovery(id, domain string) recovery.RecoveryHandlerFunc {
- return func(p interface{}) (err error) {
- log.Errorf("panic triggered: %v", p)
- return pbErr.New(id, domain, pbErr.InternalServerError)
- }
- }
|