consumer.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416
  1. package redisqueue
  2. import (
  3. "context"
  4. "net"
  5. "os"
  6. "sync"
  7. "time"
  8. "github.com/pkg/errors"
  9. "github.com/redis/go-redis/v9"
  10. )
  11. // ConsumerFunc is a type alias for the functions that will be used to handle
  12. // and process Messages.
  13. type ConsumerFunc func(*Message) error
  14. type registeredConsumer struct {
  15. fn ConsumerFunc
  16. id string
  17. }
  18. // ConsumerOptions provide options to configure the Consumer.
  19. type ConsumerOptions struct {
  20. // Name sets the name of this consumer. This will be used when fetching from
  21. // Redis. If empty, the hostname will be used.
  22. Name string
  23. // GroupName sets the name of the consumer group. This will be used when
  24. // coordinating in Redis. If empty, the hostname will be used.
  25. GroupName string
  26. // VisibilityTimeout dictates the maximum amount of time a message should
  27. // stay in pending. If there is a message that has been idle for more than
  28. // this duration, the consumer will attempt to claim it.
  29. VisibilityTimeout time.Duration
  30. // BlockingTimeout designates how long the XREADGROUP call blocks for. If
  31. // this is 0, it will block indefinitely. While this is the most efficient
  32. // from a polling perspective, if this call never times out, there is no
  33. // opportunity to yield back to Go at a regular interval. This means it's
  34. // possible that if no messages are coming in, the consumer cannot
  35. // gracefully shutdown. Instead, it's recommended to set this to 1-5
  36. // seconds, or even longer, depending on how long your application can wait
  37. // to shutdown.
  38. BlockingTimeout time.Duration
  39. // ReclaimInterval is the amount of time in between calls to XPENDING to
  40. // attempt to reclaim jobs that have been idle for more than the visibility
  41. // timeout. A smaller duration will result in more frequent checks. This
  42. // will allow messages to be reaped faster, but it will put more load on
  43. // Redis.
  44. ReclaimInterval time.Duration
  45. // BufferSize determines the size of the channel uses to coordinate the
  46. // processing of the messages. This determines the maximum number of
  47. // in-flight messages.
  48. BufferSize int
  49. // Concurrency dictates how many goroutines to spawn to handle the messages.
  50. Concurrency int
  51. // RedisClient supersedes the RedisOptions field, and allows you to inject
  52. // an already-made Redis Client for use in the consumer. This may be either
  53. // the standard client or a cluster client.
  54. RedisClient redis.UniversalClient
  55. // RedisOptions allows you to configure the underlying Redis connection.
  56. // More info here:
  57. // https://pkg.go.dev/github.com/go-redis/redis/v7?tab=doc#Options.
  58. //
  59. // This field is used if RedisClient field is nil.
  60. RedisOptions *RedisOptions
  61. }
  62. // Consumer adds a convenient wrapper around dequeuing and managing concurrency.
  63. type Consumer struct {
  64. // Errors is a channel that you can receive from to centrally handle any
  65. // errors that may occur either by your ConsumerFuncs or by internal
  66. // processing functions. Because this is an unbuffered channel, you must
  67. // have a listener on it. If you don't parts of the consumer could stop
  68. // functioning when errors occur due to the blocking nature of unbuffered
  69. // channels.
  70. Errors chan error
  71. options *ConsumerOptions
  72. redis redis.UniversalClient
  73. consumers map[string]registeredConsumer
  74. streams []string
  75. queue chan *Message
  76. wg *sync.WaitGroup
  77. stopReclaim chan struct{}
  78. stopPoll chan struct{}
  79. stopWorkers chan struct{}
  80. }
  81. var defaultConsumerOptions = &ConsumerOptions{
  82. VisibilityTimeout: 60 * time.Second,
  83. BlockingTimeout: 5 * time.Second,
  84. ReclaimInterval: 1 * time.Second,
  85. BufferSize: 100,
  86. Concurrency: 10,
  87. }
  88. // NewConsumer uses a default set of options to create a Consumer. It sets Name
  89. // to the hostname, GroupName to "redis-queue", VisibilityTimeout to 60 seconds,
  90. // BufferSize to 100, and Concurrency to 10. In most production environments,
  91. // you'll want to use NewConsumerWithOptions.
  92. func NewConsumer() (*Consumer, error) {
  93. return NewConsumerWithOptions(defaultConsumerOptions)
  94. }
  95. // NewConsumerWithOptions creates a Consumer with custom ConsumerOptions. If
  96. // Name is left empty, it defaults to the hostname; if GroupName is left empty,
  97. // it defaults to "redis-queue"; if BlockingTimeout is 0, it defaults to 5
  98. // seconds; if ReclaimInterval is 0, it defaults to 1 second.
  99. func NewConsumerWithOptions(options *ConsumerOptions) (*Consumer, error) {
  100. hostname, _ := os.Hostname()
  101. if options.Name == "" {
  102. options.Name = hostname
  103. }
  104. if options.GroupName == "" {
  105. options.GroupName = "redis-queue"
  106. }
  107. if options.BlockingTimeout == 0 {
  108. options.BlockingTimeout = 5 * time.Second
  109. }
  110. if options.ReclaimInterval == 0 {
  111. options.ReclaimInterval = 1 * time.Second
  112. }
  113. var r redis.UniversalClient
  114. if options.RedisClient != nil {
  115. r = options.RedisClient
  116. } else {
  117. r = newRedisClient(options.RedisOptions)
  118. }
  119. if err := redisPreflightChecks(r); err != nil {
  120. return nil, err
  121. }
  122. return &Consumer{
  123. Errors: make(chan error),
  124. options: options,
  125. redis: r,
  126. consumers: make(map[string]registeredConsumer),
  127. streams: make([]string, 0),
  128. queue: make(chan *Message, options.BufferSize),
  129. wg: &sync.WaitGroup{},
  130. stopReclaim: make(chan struct{}, 1),
  131. stopPoll: make(chan struct{}, 1),
  132. stopWorkers: make(chan struct{}, options.Concurrency),
  133. }, nil
  134. }
  135. // RegisterWithLastID is the same as Register, except that it also lets you
  136. // specify the oldest message to receive when first creating the consumer group.
  137. // This can be any valid message ID, "0" for all messages in the stream, or "$"
  138. // for only new messages.
  139. //
  140. // If the consumer group already exists the id field is ignored, meaning you'll
  141. // receive unprocessed messages.
  142. func (c *Consumer) RegisterWithLastID(stream string, id string, fn ConsumerFunc) {
  143. if len(id) == 0 {
  144. id = "0"
  145. }
  146. c.consumers[stream] = registeredConsumer{
  147. fn: fn,
  148. id: id,
  149. }
  150. }
  151. // Register takes in a stream name and a ConsumerFunc that will be called when a
  152. // message comes in from that stream. Register must be called at least once
  153. // before Run is called. If the same stream name is passed in twice, the first
  154. // ConsumerFunc is overwritten by the second.
  155. func (c *Consumer) Register(stream string, fn ConsumerFunc) {
  156. c.RegisterWithLastID(stream, "0", fn)
  157. }
  158. // Run starts all of the worker goroutines and starts processing from the
  159. // streams that have been registered with Register. All errors will be sent to
  160. // the Errors channel. If Register was never called, an error will be sent and
  161. // Run will terminate early. The same will happen if an error occurs when
  162. // creating the consumer group in Redis. Run will block until Shutdown is called
  163. // and all of the in-flight messages have been processed.
  164. func (c *Consumer) Run() {
  165. if len(c.consumers) == 0 {
  166. c.Errors <- errors.New("at least one consumer function needs to be registered")
  167. return
  168. }
  169. for stream, consumer := range c.consumers {
  170. c.streams = append(c.streams, stream)
  171. err := c.redis.XGroupCreateMkStream(context.TODO(), stream, c.options.GroupName, consumer.id).Err()
  172. // ignoring the BUSYGROUP error makes this a noop
  173. if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
  174. c.Errors <- errors.Wrap(err, "error creating consumer group")
  175. return
  176. }
  177. }
  178. for i := 0; i < len(c.consumers); i++ {
  179. c.streams = append(c.streams, ">")
  180. }
  181. go c.reclaim()
  182. go c.poll()
  183. stop := newSignalHandler()
  184. go func() {
  185. <-stop
  186. c.Shutdown()
  187. }()
  188. c.wg.Add(c.options.Concurrency)
  189. for i := 0; i < c.options.Concurrency; i++ {
  190. go c.work()
  191. }
  192. c.wg.Wait()
  193. }
  194. // Shutdown stops new messages from being processed and tells the workers to
  195. // wait until all in-flight messages have been processed, and then they exit.
  196. // The order that things stop is 1) the reclaim process (if it's running), 2)
  197. // the polling process, and 3) the worker processes.
  198. func (c *Consumer) Shutdown() {
  199. c.stopReclaim <- struct{}{}
  200. if c.options.VisibilityTimeout == 0 {
  201. c.stopPoll <- struct{}{}
  202. }
  203. }
  204. // reclaim runs in a separate goroutine and checks the list of pending messages
  205. // in every stream. For every message, if it's been idle for longer than the
  206. // VisibilityTimeout, it will attempt to claim that message for this consumer.
  207. // If VisibilityTimeout is 0, this function returns early and no messages are
  208. // reclaimed. It checks the list of pending messages according to
  209. // ReclaimInterval.
  210. func (c *Consumer) reclaim() {
  211. if c.options.VisibilityTimeout == 0 {
  212. return
  213. }
  214. ticker := time.NewTicker(c.options.ReclaimInterval)
  215. for {
  216. select {
  217. case <-c.stopReclaim:
  218. // once the reclaim process has stopped, stop the polling process
  219. c.stopPoll <- struct{}{}
  220. return
  221. case <-ticker.C:
  222. for stream := range c.consumers {
  223. start := "-"
  224. end := "+"
  225. for {
  226. res, err := c.redis.XPendingExt(context.TODO(), &redis.XPendingExtArgs{
  227. Stream: stream,
  228. Group: c.options.GroupName,
  229. Start: start,
  230. End: end,
  231. Count: int64(c.options.BufferSize - len(c.queue)),
  232. }).Result()
  233. if err != nil && err != redis.Nil {
  234. c.Errors <- errors.Wrap(err, "error listing pending messages")
  235. break
  236. }
  237. if len(res) == 0 {
  238. break
  239. }
  240. msgs := make([]string, 0)
  241. for _, r := range res {
  242. if r.Idle >= c.options.VisibilityTimeout {
  243. claimres, err := c.redis.XClaim(context.TODO(), &redis.XClaimArgs{
  244. Stream: stream,
  245. Group: c.options.GroupName,
  246. Consumer: c.options.Name,
  247. MinIdle: c.options.VisibilityTimeout,
  248. Messages: []string{r.ID},
  249. }).Result()
  250. if err != nil && err != redis.Nil {
  251. c.Errors <- errors.Wrapf(err, "error claiming %d message(s)", len(msgs))
  252. break
  253. }
  254. // If the Redis nil error is returned, it means that
  255. // the message no longer exists in the stream.
  256. // However, it is still in a pending state. This
  257. // could happen if a message was claimed by a
  258. // consumer, that consumer died, and the message
  259. // gets deleted (either through a XDEL call or
  260. // through MAXLEN). Since the message no longer
  261. // exists, the only way we can get it out of the
  262. // pending state is to acknowledge it.
  263. if err == redis.Nil {
  264. err = c.redis.XAck(context.TODO(), stream, c.options.GroupName, r.ID).Err()
  265. if err != nil {
  266. c.Errors <- errors.Wrapf(err, "error acknowledging after failed claim for %q stream and %q message", stream, r.ID)
  267. continue
  268. }
  269. }
  270. c.enqueue(stream, claimres)
  271. }
  272. }
  273. newID, err := incrementMessageID(res[len(res)-1].ID)
  274. if err != nil {
  275. c.Errors <- err
  276. break
  277. }
  278. start = newID
  279. }
  280. }
  281. }
  282. }
  283. }
  284. // poll constantly checks the streams using XREADGROUP to see if there are any
  285. // messages for this consumer to process. It blocks for up to 5 seconds instead
  286. // of blocking indefinitely so that it can periodically check to see if Shutdown
  287. // was called.
  288. func (c *Consumer) poll() {
  289. for {
  290. select {
  291. case <-c.stopPoll:
  292. // once the polling has stopped (i.e. there will be no more messages
  293. // put onto c.queue), stop all of the workers
  294. for i := 0; i < c.options.Concurrency; i++ {
  295. c.stopWorkers <- struct{}{}
  296. }
  297. return
  298. default:
  299. res, err := c.redis.XReadGroup(context.TODO(), &redis.XReadGroupArgs{
  300. Group: c.options.GroupName,
  301. Consumer: c.options.Name,
  302. Streams: c.streams,
  303. Count: int64(c.options.BufferSize - len(c.queue)),
  304. Block: c.options.BlockingTimeout,
  305. }).Result()
  306. if err != nil {
  307. if err, ok := err.(net.Error); ok && err.Timeout() {
  308. continue
  309. }
  310. if err == redis.Nil {
  311. continue
  312. }
  313. c.Errors <- errors.Wrap(err, "error reading redis stream")
  314. continue
  315. }
  316. for _, r := range res {
  317. c.enqueue(r.Stream, r.Messages)
  318. }
  319. }
  320. }
  321. }
  322. // enqueue takes a slice of XMessages, creates corresponding Messages, and sends
  323. // them on the centralized channel for worker goroutines to process.
  324. func (c *Consumer) enqueue(stream string, msgs []redis.XMessage) {
  325. for _, m := range msgs {
  326. msg := &Message{
  327. ID: m.ID,
  328. Stream: stream,
  329. Values: m.Values,
  330. }
  331. c.queue <- msg
  332. }
  333. }
  334. // work is called in a separate goroutine. The number of work goroutines is
  335. // determined by Concurreny. Once it gets a message from the centralized
  336. // channel, it calls the corrensponding ConsumerFunc depending on the stream it
  337. // came from. If no error is returned from the ConsumerFunc, the message is
  338. // acknowledged in Redis.
  339. func (c *Consumer) work() {
  340. defer c.wg.Done()
  341. for {
  342. select {
  343. case msg := <-c.queue:
  344. err := c.process(msg)
  345. if err != nil {
  346. c.Errors <- errors.Wrapf(err, "error calling ConsumerFunc for %q stream and %q message", msg.Stream, msg.ID)
  347. continue
  348. }
  349. err = c.redis.XAck(context.TODO(), msg.Stream, c.options.GroupName, msg.ID).Err()
  350. if err != nil {
  351. c.Errors <- errors.Wrapf(err, "error acknowledging after success for %q stream and %q message", msg.Stream, msg.ID)
  352. continue
  353. }
  354. case <-c.stopWorkers:
  355. return
  356. }
  357. }
  358. }
  359. func (c *Consumer) process(msg *Message) (err error) {
  360. defer func() {
  361. if r := recover(); r != nil {
  362. if e, ok := r.(error); ok {
  363. err = errors.Wrap(e, "ConsumerFunc panic")
  364. return
  365. }
  366. err = errors.Errorf("ConsumerFunc panic: %v", r)
  367. }
  368. }()
  369. err = c.consumers[msg.Stream].fn(msg)
  370. return
  371. }