optionNsq.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. package config
  2. import (
  3. "time"
  4. "github.com/nsqio/go-nsq"
  5. )
  6. type NSQOptions struct {
  7. DialTimeout time.Duration `opt:"dial_timeout" default:"1s"`
  8. // Deadlines for network reads and writes
  9. ReadTimeout time.Duration `opt:"read_timeout" min:"100ms" max:"5m" default:"60s"`
  10. WriteTimeout time.Duration `opt:"write_timeout" min:"100ms" max:"5m" default:"1s"`
  11. // Addresses is the local address to use when dialing an nsqd.
  12. Addresses []string `opt:"addresses"`
  13. // Duration between polling lookupd for new producers, and fractional jitter to add to
  14. // the lookupd pool loop. this helps evenly distribute requests even if multiple consumers
  15. // restart at the same time
  16. //
  17. // NOTE: when not using nsqlookupd, LookupdPollInterval represents the duration of time between
  18. // reconnection attempts
  19. LookupdPollInterval time.Duration `opt:"lookupd_poll_interval" min:"10ms" max:"5m" default:"60s"`
  20. LookupdPollJitter float64 `opt:"lookupd_poll_jitter" min:"0" max:"1" default:"0.3"`
  21. // Maximum duration when REQueueing (for doubling of deferred requeue)
  22. MaxRequeueDelay time.Duration `opt:"max_requeue_delay" min:"0" max:"60m" default:"15m"`
  23. DefaultRequeueDelay time.Duration `opt:"default_requeue_delay" min:"0" max:"60m" default:"90s"`
  24. // Maximum amount of time to backoff when processing fails 0 == no backoff
  25. MaxBackoffDuration time.Duration `opt:"max_backoff_duration" min:"0" max:"60m" default:"2m"`
  26. // Unit of time for calculating consumer backoff
  27. BackoffMultiplier time.Duration `opt:"backoff_multiplier" min:"0" max:"60m" default:"1s"`
  28. // Maximum number of times this consumer will attempt to process a message before giving up
  29. MaxAttempts uint16 `opt:"max_attempts" min:"0" max:"65535" default:"5"`
  30. // Duration to wait for a message from an nsqd when in a state where RDY
  31. // counts are re-distributed (e.g. max_in_flight < num_producers)
  32. LowRdyIdleTimeout time.Duration `opt:"low_rdy_idle_timeout" min:"1s" max:"5m" default:"10s"`
  33. // Duration to wait until redistributing RDY for an nsqd regardless of LowRdyIdleTimeout
  34. LowRdyTimeout time.Duration `opt:"low_rdy_timeout" min:"1s" max:"5m" default:"30s"`
  35. // Duration between redistributing max-in-flight to connections
  36. RDYRedistributeInterval time.Duration `opt:"rdy_redistribute_interval" min:"1ms" max:"5s" default:"5s"`
  37. // Identifiers sent to nsqd representing this client
  38. // UserAgent is in the spirit of HTTP (default: "<client_library_name>/<version>")
  39. ClientID string `opt:"client_id"` // (defaults: short hostname)
  40. Hostname string `opt:"hostname"`
  41. UserAgent string `opt:"user_agent"`
  42. // Duration of time between heartbeats. This must be less than ReadTimeout
  43. HeartbeatInterval time.Duration `opt:"heartbeat_interval" default:"30s"`
  44. // Integer percentage to sample the channel (requires nsqd 0.2.25+)
  45. SampleRate int32 `opt:"sample_rate" min:"0" max:"99"`
  46. Tls *Tls `yaml:"tls" json:"tls"`
  47. // Compression Settings
  48. Deflate bool `opt:"deflate"`
  49. DeflateLevel int `opt:"deflate_level" min:"1" max:"9" default:"6"`
  50. Snappy bool `opt:"snappy"`
  51. // Size of the buffer (in bytes) used by nsqd for buffering writes to this connection
  52. OutputBufferSize int64 `opt:"output_buffer_size" default:"16384"`
  53. // Timeout used by nsqd before flushing buffered writes (set to 0 to disable).
  54. //
  55. // WARNING: configuring clients with an extremely low
  56. // (< 25ms) output_buffer_timeout has a significant effect
  57. // on nsqd CPU usage (particularly with > 50 clients connected).
  58. OutputBufferTimeout time.Duration `opt:"output_buffer_timeout" default:"250ms"`
  59. // Maximum number of messages to allow in flight (concurrency knob)
  60. MaxInFlight int `opt:"max_in_flight" min:"0" default:"1"`
  61. // The server-side message timeout for messages delivered to this client
  62. MsgTimeout time.Duration `opt:"msg_timeout" min:"0"`
  63. // secret for nsqd authentication (requires nsqd 0.2.29+)
  64. AuthSecret string `opt:"auth_secret"`
  65. }
  66. func (e NSQOptions) GetNSQOptions() (*nsq.Config, error) {
  67. cfg := nsq.NewConfig()
  68. var err error
  69. cfg.TlsConfig, err = getTLS(e.Tls)
  70. if err != nil {
  71. return nil, err
  72. }
  73. if e.DialTimeout > 0 {
  74. cfg.DialTimeout = e.DialTimeout * time.Second
  75. }
  76. if e.ReadTimeout > 0 {
  77. cfg.ReadTimeout = e.ReadTimeout * time.Second
  78. }
  79. if e.WriteTimeout > 0 {
  80. cfg.WriteTimeout = e.WriteTimeout * time.Second
  81. }
  82. if e.LookupdPollInterval > 0 {
  83. cfg.LookupdPollInterval = e.LookupdPollInterval * time.Second
  84. }
  85. if e.MaxRequeueDelay > 0 {
  86. cfg.MaxRequeueDelay = e.MaxRequeueDelay * time.Second
  87. }
  88. if e.DefaultRequeueDelay > 0 {
  89. cfg.DefaultRequeueDelay = e.DefaultRequeueDelay * time.Second
  90. }
  91. if e.MaxBackoffDuration > 0 {
  92. cfg.MaxBackoffDuration = e.MaxBackoffDuration * time.Millisecond
  93. }
  94. if e.BackoffMultiplier > 0 {
  95. cfg.BackoffMultiplier = e.BackoffMultiplier * time.Second
  96. }
  97. if e.LowRdyIdleTimeout > 0 {
  98. cfg.LowRdyIdleTimeout = e.LowRdyIdleTimeout * time.Second
  99. }
  100. if e.LowRdyTimeout > 0 {
  101. cfg.LowRdyTimeout = e.LowRdyTimeout * time.Second
  102. }
  103. if e.RDYRedistributeInterval > 0 {
  104. cfg.RDYRedistributeInterval = e.RDYRedistributeInterval * time.Second
  105. }
  106. if e.HeartbeatInterval > 0 {
  107. cfg.HeartbeatInterval = e.HeartbeatInterval * time.Second
  108. }
  109. if e.OutputBufferTimeout > 0 {
  110. cfg.OutputBufferTimeout = e.OutputBufferTimeout * time.Second
  111. }
  112. if e.MsgTimeout > 0 {
  113. cfg.MsgTimeout = e.MsgTimeout * time.Second
  114. }
  115. if e.LookupdPollJitter > 0 {
  116. cfg.LookupdPollJitter = e.LookupdPollJitter
  117. }
  118. cfg.MaxAttempts = e.MaxAttempts
  119. if e.ClientID != "" {
  120. cfg.ClientID = e.ClientID
  121. }
  122. if e.Hostname != "" {
  123. cfg.Hostname = e.Hostname
  124. }
  125. if e.UserAgent != "" {
  126. cfg.UserAgent = e.UserAgent
  127. }
  128. if e.SampleRate > 0 {
  129. cfg.SampleRate = e.SampleRate
  130. }
  131. cfg.Deflate = e.Deflate
  132. if e.DeflateLevel >= 6 && e.DeflateLevel <= 9 {
  133. cfg.DeflateLevel = e.DeflateLevel
  134. }
  135. cfg.Snappy = e.Snappy
  136. if e.OutputBufferSize > 0 {
  137. cfg.OutputBufferSize = e.OutputBufferSize
  138. }
  139. if e.MaxInFlight > 0 {
  140. cfg.MaxInFlight = e.MaxInFlight
  141. }
  142. if e.AuthSecret != "" {
  143. cfg.AuthSecret = e.AuthSecret
  144. }
  145. return cfg, nil
  146. }