service.go 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. package grpc
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. log "IotAdmin/core/logger"
  7. "IotAdmin/core/server/grpc/interceptors/logging"
  8. reqtags "IotAdmin/core/server/grpc/interceptors/request_tag"
  9. middleware "github.com/grpc-ecosystem/go-grpc-middleware"
  10. opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
  11. "google.golang.org/grpc"
  12. )
  13. type Service struct {
  14. Connection *grpc.ClientConn
  15. CallTimeout time.Duration
  16. }
  17. func (e *Service) Dial(
  18. endpoint string,
  19. callTimeout time.Duration,
  20. unary ...grpc.UnaryClientInterceptor) (err error) {
  21. log.Infof("configure service with endpoint: %s", endpoint)
  22. ctx, cancel := context.WithTimeout(context.Background(), callTimeout)
  23. defer cancel()
  24. if len(unary) == 0 {
  25. unary = defaultUnaryClientInterceptors()
  26. }
  27. e.Connection, err = grpc.DialContext(ctx,
  28. endpoint,
  29. grpc.WithInsecure(),
  30. grpc.WithStreamInterceptor(middleware.ChainStreamClient(defaultStreamClientInterceptors()...)),
  31. grpc.WithUnaryInterceptor(middleware.ChainUnaryClient(unary...)),
  32. grpc.WithDefaultCallOptions(grpc.WaitForReady(true), grpc.MaxCallRecvMsgSize(defaultMaxMsgSize)),
  33. )
  34. if err != nil {
  35. msg := fmt.Sprintf("connect gRPC service %s failed", endpoint)
  36. log.Errorf(msg, err)
  37. return fmt.Errorf("%w, "+msg, err)
  38. }
  39. return nil
  40. }
  41. func defaultUnaryClientInterceptors() []grpc.UnaryClientInterceptor {
  42. return []grpc.UnaryClientInterceptor{
  43. opentracing.UnaryClientInterceptor(),
  44. logging.UnaryClientInterceptor(),
  45. reqtags.UnaryClientInterceptor(),
  46. }
  47. }
  48. func defaultStreamClientInterceptors() []grpc.StreamClientInterceptor {
  49. return []grpc.StreamClientInterceptor{
  50. opentracing.StreamClientInterceptor(),
  51. logging.StreamClientInterceptor(),
  52. reqtags.StreamClientInterceptor(),
  53. }
  54. }