| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 |
- package queue
- import (
- redisqueue "IotAdmin/core/redis-queue"
- "IotAdmin/core/storage"
- "github.com/redis/go-redis/v9"
- )
- type Redis struct {
- client *redis.Client
- consumer *redisqueue.Consumer
- producer *redisqueue.Producer
- }
- // NewRedis redis模式
- func NewRedis(
- producerOptions *redisqueue.ProducerOptions,
- consumerOptions *redisqueue.ConsumerOptions,
- ) (*Redis, error) {
- var err error
- r := &Redis{}
- r.producer, err = r.newProducer(producerOptions)
- if err != nil {
- return nil, err
- }
- r.consumer, err = r.newConsumer(consumerOptions)
- if err != nil {
- return nil, err
- }
- return r, nil
- }
- func (*Redis) String() string {
- return "redis"
- }
- func (r *Redis) newConsumer(options *redisqueue.ConsumerOptions) (*redisqueue.Consumer, error) {
- if options == nil {
- options = &redisqueue.ConsumerOptions{}
- }
- return redisqueue.NewConsumerWithOptions(options)
- }
- func (r *Redis) newProducer(options *redisqueue.ProducerOptions) (*redisqueue.Producer, error) {
- if options == nil {
- options = &redisqueue.ProducerOptions{}
- }
- return redisqueue.NewProducerWithOptions(options)
- }
- func (r *Redis) Append(message storage.Message) error {
- err := r.producer.Enqueue(&redisqueue.Message{
- ID: message.GetID(),
- Stream: message.GetStream(),
- Values: message.GetValues(),
- })
- return err
- }
- func (r *Redis) Register(name string, f storage.ConsumerFunc) {
- r.consumer.Register(name, func(message *redisqueue.Message) error {
- m := new(Message)
- m.SetValues(message.Values)
- m.SetStream(message.Stream)
- m.SetID(message.ID)
- return f(m)
- })
- }
- func (r *Redis) Run() {
- r.consumer.Run()
- }
- func (r *Redis) Shutdown() {
- r.consumer.Shutdown()
- }
|