| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- /*
- Package redis-queue provides a producer and consumer of a queue that uses Redis
- streams (https://redis.io/topics/streams-intro).
- # Features
- The features of this package include:
- - A `Producer` struct to make enqueuing messages easy.
- - A `Consumer` struct to make processing messages concurrenly.
- - Claiming and acknowledging messages if there's no error, so that if a consumer
- dies while processing, the message it was working on isn't lost. This
- guarantees at least once delivery.
- - A "visibility timeout" so that if a message isn't processed in a designated
- time frame, it will be be processed by another consumer.
- - A max length on the stream so that it doesn't store the messages indefinitely
- and run out of memory.
- - Graceful handling of Unix signals (`SIGINT` and `SIGTERM`) to let in-flight
- messages complete.
- - A channel that will surface any errors so you can handle them centrally.
- - Graceful handling of panics to avoid crashing the whole process.
- - A concurrency setting to control how many goroutines are spawned to process
- messages.
- - A batch size setting to limit the total messages in flight.
- - Support for multiple streams.
- # Example
- Here's an example of a producer that inserts 1000 messages into a queue:
- package main
- import (
- "fmt"
- "github.com/robinjoseph08/redis-queue/v2"
- )
- func main() {
- p, err := redis-queue.NewProducerWithOptions(&redis-queue.ProducerOptions{
- StreamMaxLength: 10000,
- ApproximateMaxLength: true,
- })
- if err != nil {
- panic(err)
- }
- for i := 0; i < 1000; i++ {
- err := p.Enqueue(&redis-queue.Message{
- Stream: "redis-queue:test",
- Values: map[string]interface{}{
- "index": i,
- },
- })
- if err != nil {
- panic(err)
- }
- if i%100 == 0 {
- fmt.Printf("enqueued %d\n", i)
- }
- }
- }
- And here's an example of a consumer that reads the messages off of that queue:
- package main
- import (
- "fmt"
- "time"
- "github.com/robinjoseph08/redis-queue/v2"
- )
- func main() {
- c, err := redis-queue.NewConsumerWithOptions(&redis-queue.ConsumerOptions{
- VisibilityTimeout: 60 * time.Second,
- BlockingTimeout: 5 * time.Second,
- ReclaimInterval: 1 * time.Second,
- BufferSize: 100,
- Concurrency: 10,
- })
- if err != nil {
- panic(err)
- }
- c.Register("redis-queue:test", process)
- go func() {
- for err := range c.Errors {
- // handle errors accordingly
- fmt.Printf("err: %+v\n", err)
- }
- }()
- fmt.Println("starting")
- c.Run()
- fmt.Println("stopped")
- }
- func process(msg *redis-queue.Message) error {
- fmt.Printf("processing message: %v\n", msg.Values["index"])
- return nil
- }
- */
- package redisqueue
|