doc.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. /*
  2. Package redisqueue provides a producer and consumer of a queue that uses Redis
  3. streams (https://redis.io/topics/streams-intro).
  4. # Features
  5. The features of this package include:
  6. - A `Producer` struct to make enqueuing messages easy.
  7. - A `Consumer` struct to make processing messages concurrently easy.
  8. - Claiming and acknowledging messages if there's no error, so that if a consumer
  9. dies while processing, the message it was working on isn't lost. This
  10. guarantees at-least-once delivery.
  11. - A "visibility timeout" so that if a message isn't processed in a designated
  12. time frame, it will be processed by another consumer.
  13. - A max length on the stream so that it doesn't store the messages indefinitely
  14. and run out of memory.
  15. - Graceful handling of Unix signals (`SIGINT` and `SIGTERM`) to let in-flight
  16. messages complete.
  17. - A channel that will surface any errors so you can handle them centrally.
  18. - Graceful handling of panics to avoid crashing the whole process.
  19. - A concurrency setting to control how many goroutines are spawned to process
  20. messages.
  21. - A batch size setting to limit the total messages in flight.
  22. - Support for multiple streams.
  23. # Example
  24. Here's an example of a producer that inserts 1000 messages into a queue:
  25. package main
  26. import (
  27. "fmt"
  28. "github.com/robinjoseph08/redis-queue/v2"
  29. )
  30. func main() {
  31. p, err := redisqueue.NewProducerWithOptions(&redisqueue.ProducerOptions{
  32. StreamMaxLength: 10000,
  33. ApproximateMaxLength: true,
  34. })
  35. if err != nil {
  36. panic(err)
  37. }
  38. for i := 0; i < 1000; i++ {
  39. err := p.Enqueue(&redisqueue.Message{
  40. Stream: "redis-queue:test",
  41. Values: map[string]interface{}{
  42. "index": i,
  43. },
  44. })
  45. if err != nil {
  46. panic(err)
  47. }
  48. if i%100 == 0 {
  49. fmt.Printf("enqueued %d\n", i)
  50. }
  51. }
  52. }
  53. And here's an example of a consumer that reads the messages off of that queue:
  54. package main
  55. import (
  56. "fmt"
  57. "time"
  58. "github.com/robinjoseph08/redis-queue/v2"
  59. )
  60. func main() {
  61. c, err := redisqueue.NewConsumerWithOptions(&redisqueue.ConsumerOptions{
  62. VisibilityTimeout: 60 * time.Second,
  63. BlockingTimeout: 5 * time.Second,
  64. ReclaimInterval: 1 * time.Second,
  65. BufferSize: 100,
  66. Concurrency: 10,
  67. })
  68. if err != nil {
  69. panic(err)
  70. }
  71. c.Register("redis-queue:test", process)
  72. go func() {
  73. for err := range c.Errors {
  74. // handle errors accordingly
  75. fmt.Printf("err: %+v\n", err)
  76. }
  77. }()
  78. fmt.Println("starting")
  79. c.Run()
  80. fmt.Println("stopped")
  81. }
  82. func process(msg *redisqueue.Message) error {
  83. fmt.Printf("processing message: %v\n", msg.Values["index"])
  84. return nil
  85. }
  86. */
  87. package redisqueue