server.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. package server
  2. import (
  3. "IotAdmin/core/logger"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "sync"
  8. )
  9. type Server struct {
  10. services map[string]Runnable
  11. mutex sync.Mutex
  12. errChan chan error
  13. waitForRunnable sync.WaitGroup
  14. internalCtx context.Context
  15. internalCancel context.CancelFunc
  16. internalProceduresStop chan struct{}
  17. shutdownCtx context.Context
  18. shutdownCancel context.CancelFunc
  19. logger *logger.Helper
  20. opts options
  21. }
  22. // New 实例化
  23. func New(opts ...Option) Manager {
  24. s := &Server{
  25. services: make(map[string]Runnable),
  26. errChan: make(chan error),
  27. internalProceduresStop: make(chan struct{}),
  28. }
  29. s.opts = setDefaultOptions()
  30. for i := range opts {
  31. opts[i](&s.opts)
  32. }
  33. return s
  34. }
  35. // Add runnable
  36. func (e *Server) Add(r ...Runnable) {
  37. if e.services == nil {
  38. e.services = make(map[string]Runnable)
  39. }
  40. for i := range r {
  41. e.services[r[i].String()] = r[i]
  42. }
  43. }
  44. // Start runnable
  45. func (e *Server) Start(ctx context.Context) (err error) {
  46. e.mutex.Lock()
  47. defer e.mutex.Unlock()
  48. e.internalCtx, e.internalCancel = context.WithCancel(ctx)
  49. stopComplete := make(chan struct{})
  50. defer close(stopComplete)
  51. defer func() {
  52. stopErr := e.engageStopProcedure(stopComplete)
  53. if stopErr != nil {
  54. if err != nil {
  55. err = fmt.Errorf("%s, %w", stopErr.Error(), err)
  56. } else {
  57. err = stopErr
  58. }
  59. }
  60. }()
  61. e.errChan = make(chan error)
  62. for k := range e.services {
  63. if !e.services[k].Attempt() {
  64. //先判断是否可以启动
  65. return errors.New("can't accept new runnable as stop procedure is already engaged")
  66. }
  67. }
  68. //按顺序启动
  69. for k := range e.services {
  70. e.startRunnable(e.services[k])
  71. }
  72. e.waitForRunnable.Wait()
  73. select {
  74. case <-ctx.Done():
  75. return nil
  76. case err := <-e.errChan:
  77. return err
  78. }
  79. }
  80. func (e *Server) startRunnable(r Runnable) {
  81. e.waitForRunnable.Add(1)
  82. go func() {
  83. defer e.waitForRunnable.Done()
  84. if err := r.Start(e.internalCtx); err != nil {
  85. e.errChan <- err
  86. }
  87. }()
  88. }
  89. func (e *Server) engageStopProcedure(stopComplete <-chan struct{}) error {
  90. var shutdownCancel context.CancelFunc
  91. if e.opts.gracefulShutdownTimeout > 0 {
  92. e.shutdownCtx, shutdownCancel = context.WithTimeout(
  93. context.Background(), e.opts.gracefulShutdownTimeout)
  94. } else {
  95. e.shutdownCtx, shutdownCancel = context.WithCancel(context.Background())
  96. }
  97. defer shutdownCancel()
  98. close(e.internalProceduresStop)
  99. e.internalCancel()
  100. go func() {
  101. for {
  102. select {
  103. case err, ok := <-e.errChan:
  104. if ok {
  105. e.logger.Error(err, "error received after stop sequence was engaged")
  106. }
  107. case <-stopComplete:
  108. return
  109. }
  110. }
  111. }()
  112. if e.opts.gracefulShutdownTimeout == 0 {
  113. return nil
  114. }
  115. e.mutex.Lock()
  116. defer e.mutex.Unlock()
  117. return e.waitForRunnableToEnd(shutdownCancel)
  118. }
  119. func (e *Server) waitForRunnableToEnd(shutdownCancel context.CancelFunc) error {
  120. go func() {
  121. e.waitForRunnable.Wait()
  122. shutdownCancel()
  123. }()
  124. <-e.shutdownCtx.Done()
  125. if err := e.shutdownCtx.Err(); err != nil && err != context.Canceled {
  126. return fmt.Errorf(
  127. "failed waiting for all runnables to end within grace period of %s: %w",
  128. e.opts.gracefulShutdownTimeout, err)
  129. }
  130. return nil
  131. }