redis.go 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. package queue
  2. import (
  3. redisqueue "IotAdmin/core/redis-queue"
  4. "IotAdmin/core/storage"
  5. "github.com/redis/go-redis/v9"
  6. )
  7. type Redis struct {
  8. client *redis.Client
  9. consumer *redisqueue.Consumer
  10. producer *redisqueue.Producer
  11. }
  12. // NewRedis redis模式
  13. func NewRedis(
  14. producerOptions *redisqueue.ProducerOptions,
  15. consumerOptions *redisqueue.ConsumerOptions,
  16. ) (*Redis, error) {
  17. var err error
  18. r := &Redis{}
  19. r.producer, err = r.newProducer(producerOptions)
  20. if err != nil {
  21. return nil, err
  22. }
  23. r.consumer, err = r.newConsumer(consumerOptions)
  24. if err != nil {
  25. return nil, err
  26. }
  27. return r, nil
  28. }
  29. func (*Redis) String() string {
  30. return "redis"
  31. }
  32. func (r *Redis) newConsumer(options *redisqueue.ConsumerOptions) (*redisqueue.Consumer, error) {
  33. if options == nil {
  34. options = &redisqueue.ConsumerOptions{}
  35. }
  36. return redisqueue.NewConsumerWithOptions(options)
  37. }
  38. func (r *Redis) newProducer(options *redisqueue.ProducerOptions) (*redisqueue.Producer, error) {
  39. if options == nil {
  40. options = &redisqueue.ProducerOptions{}
  41. }
  42. return redisqueue.NewProducerWithOptions(options)
  43. }
  44. func (r *Redis) Append(message storage.Message) error {
  45. err := r.producer.Enqueue(&redisqueue.Message{
  46. ID: message.GetID(),
  47. Stream: message.GetStream(),
  48. Values: message.GetValues(),
  49. })
  50. return err
  51. }
  52. func (r *Redis) Register(name string, f storage.ConsumerFunc) {
  53. r.consumer.Register(name, func(message *redisqueue.Message) error {
  54. m := new(Message)
  55. m.SetValues(message.Values)
  56. m.SetStream(message.Stream)
  57. m.SetID(message.ID)
  58. return f(m)
  59. })
  60. }
  61. func (r *Redis) Run() {
  62. r.consumer.Run()
  63. }
  64. func (r *Redis) Shutdown() {
  65. r.consumer.Shutdown()
  66. }