producer.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. package redisqueue
  2. import (
  3. "context"
  4. "github.com/redis/go-redis/v9"
  5. )
  6. // ProducerOptions provide options to configure the Producer.
  7. type ProducerOptions struct {
  8. // StreamMaxLength sets the MAXLEN option when calling XADD. This creates a
  9. // capped stream to prevent the stream from taking up memory indefinitely.
  10. // It's important to note though that this isn't the maximum number of
  11. // _completed_ messages, but the maximum number of _total_ messages. This
  12. // means that if all consumers are down, but producers are still enqueuing,
  13. // and the maximum is reached, unprocessed message will start to be dropped.
  14. // So ideally, you'll set this number to be as high as you can makee it.
  15. // More info here: https://redis.io/commands/xadd#capped-streams.
  16. StreamMaxLength int64
  17. // ApproximateMaxLength determines whether to use the ~ with the MAXLEN
  18. // option. This allows the stream trimming to done in a more efficient
  19. // manner. More info here: https://redis.io/commands/xadd#capped-streams.
  20. ApproximateMaxLength bool
  21. // RedisClient supersedes the RedisOptions field, and allows you to inject
  22. // an already-made Redis Client for use in the consumer. This may be either
  23. // the standard client or a cluster client.
  24. RedisClient redis.UniversalClient
  25. // RedisOptions allows you to configure the underlying Redis connection.
  26. // More info here:
  27. // https://pkg.go.dev/github.com/go-redis/redis/v7?tab=doc#Options.
  28. //
  29. // This field is used if RedisClient field is nil.
  30. RedisOptions *RedisOptions
  31. }
  32. // Producer adds a convenient wrapper around enqueuing messages that will be
  33. // processed later by a Consumer.
  34. type Producer struct {
  35. options *ProducerOptions
  36. redis redis.UniversalClient
  37. }
  38. var defaultProducerOptions = &ProducerOptions{
  39. StreamMaxLength: 1000,
  40. ApproximateMaxLength: true,
  41. }
  42. // NewProducer uses a default set of options to create a Producer. It sets
  43. // StreamMaxLength to 1000 and ApproximateMaxLength to true. In most production
  44. // environments, you'll want to use NewProducerWithOptions.
  45. func NewProducer() (*Producer, error) {
  46. return NewProducerWithOptions(defaultProducerOptions)
  47. }
  48. // NewProducerWithOptions creates a Producer using custom ProducerOptions.
  49. func NewProducerWithOptions(options *ProducerOptions) (*Producer, error) {
  50. var r redis.UniversalClient
  51. if options.RedisClient != nil {
  52. r = options.RedisClient
  53. } else {
  54. r = newRedisClient(options.RedisOptions)
  55. }
  56. if err := redisPreflightChecks(r); err != nil {
  57. return nil, err
  58. }
  59. return &Producer{
  60. options: options,
  61. redis: r,
  62. }, nil
  63. }
  64. // Enqueue takes in a pointer to Message and enqueues it into the stream set at
  65. // msg.Stream. While you can set msg.ID, unless you know what you're doing, you
  66. // should let Redis auto-generate the ID. If an ID is auto-generated, it will be
  67. // set on msg.ID for your reference. msg.Values is also required.
  68. func (p *Producer) Enqueue(msg *Message) error {
  69. args := &redis.XAddArgs{
  70. ID: msg.ID,
  71. Stream: msg.Stream,
  72. Values: msg.Values,
  73. }
  74. //if p.options.ApproximateMaxLength {
  75. // args.MaxLenApprox = p.options.StreamMaxLength
  76. //} else {
  77. args.MaxLen = p.options.StreamMaxLength
  78. //}
  79. id, err := p.redis.XAdd(context.TODO(), args).Result()
  80. if err != nil {
  81. return err
  82. }
  83. msg.ID = id
  84. return nil
  85. }