package config

import (
	"time"

	"github.com/redis/go-redis/v9"

	redisqueue "IotAdmin/core/redis-queue"
	"IotAdmin/core/storage"
	"IotAdmin/core/storage/queue"
)

// QueueConfig is the package-level queue configuration instance.
var QueueConfig = new(Queue)

// Queue selects the message-queue backend. Each field is an optional
// section; Setup picks the first one that is set, in the order
// Redis, NSQ, Memory.
type Queue struct {
	Redis  *QueueRedis
	Memory *QueueMemory
	NSQ    *QueueNSQ `json:"nsq" yaml:"nsq"`
}

// QueueRedis configures the Redis-backed queue: the connection settings plus
// the producer and consumer options handed to queue.NewRedis.
type QueueRedis struct {
	RedisConnectOptions
	Producer *redisqueue.ProducerOptions
	Consumer *redisqueue.ConsumerOptions
}

// QueueMemory configures the in-memory queue.
type QueueMemory struct {
	// PoolSize is passed unchanged to queue.NewMemory.
	PoolSize uint
}

// QueueNSQ configures the NSQ-backed queue.
type QueueNSQ struct {
	NSQOptions
	// ChannelPrefix is passed unchanged to queue.NewNSQ.
	ChannelPrefix string
}

// Empty reports whether no queue backend is configured at all.
func (e Queue) Empty() bool {
	return e.Redis == nil && e.NSQ == nil && e.Memory == nil
}

// Setup builds the queue adapter for the configured backend.
//
// Priority: Redis first, then NSQ, and the in-memory queue as the fallback.
//
// For Redis, the consumer's ReclaimInterval, BlockingTimeout and
// VisibilityTimeout are multiplied by time.Second in place (the configured
// values are treated as second counts). Because e.Redis is a pointer, this
// rescaling is written back to the shared configuration.
// NOTE(review): calling Setup more than once on the same configuration
// rescales the already-scaled values again — confirm it is only called once.
//
// The Redis branch reuses the client returned by GetRedisClient when there is
// one; otherwise it creates a client from the connection options and stores
// it in the package-level _redis variable.
func (e Queue) Setup() (storage.AdapterQueue, error) {
	if e.Redis != nil {
		// A Redis section without producer/consumer sub-sections used to
		// panic on a nil dereference. Fall back to zero-valued options,
		// which is what an empty sub-section would decode to.
		if e.Redis.Producer == nil {
			e.Redis.Producer = new(redisqueue.ProducerOptions)
		}
		if e.Redis.Consumer == nil {
			e.Redis.Consumer = new(redisqueue.ConsumerOptions)
		}
		producer, consumer := e.Redis.Producer, e.Redis.Consumer

		consumer.ReclaimInterval *= time.Second
		consumer.BlockingTimeout *= time.Second
		consumer.VisibilityTimeout *= time.Second

		client := GetRedisClient()
		if client == nil {
			options, err := e.Redis.RedisConnectOptions.GetRedisOptions()
			if err != nil {
				return nil, err
			}
			client = redis.NewClient(options)
			_redis = client
		}

		universalClient := newUniversalClient(client)
		producer.RedisClient = universalClient
		consumer.RedisClient = universalClient
		return queue.NewRedis(producer, consumer)
	}

	if e.NSQ != nil {
		cfg, err := e.NSQ.GetNSQOptions()
		if err != nil {
			return nil, err
		}
		return queue.NewNSQ(e.NSQ.Addresses, cfg, e.NSQ.ChannelPrefix)
	}

	// Memory is the fallback even when no section is configured (the state
	// Empty reports), so e.Memory may be nil here. Use a zero pool size in
	// that case — the same value a Memory section with PoolSize unset
	// yields — instead of dereferencing a nil pointer.
	var poolSize uint
	if e.Memory != nil {
		poolSize = e.Memory.PoolSize
	}
	return queue.NewMemory(poolSize), nil
}

// newUniversalClient creates a redis.UniversalClient that targets the same
// server as client, copying its address, credentials, DB, pool size, retry
// count and TLS configuration. Other options of client are not carried over.
//
// Exactly one address is supplied. go-redis chooses the client type from the
// number of addresses: more than one (with no MasterName) produces a cluster
// client, so a stray empty entry would change the kind of client returned.
func newUniversalClient(client *redis.Client) redis.UniversalClient {
	opts := client.Options()
	return redis.NewUniversalClient(&redis.UniversalOptions{
		Addrs:      []string{opts.Addr},
		Username:   opts.Username,
		Password:   opts.Password,
		DB:         opts.DB,
		PoolSize:   opts.PoolSize,
		MaxRetries: opts.MaxRetries,
		TLSConfig:  opts.TLSConfig,
	})
}