| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- package config
- import (
- redisqueue "IotAdmin/core/redis-queue"
- "IotAdmin/core/storage"
- "IotAdmin/core/storage/queue"
- "time"
- "github.com/redis/go-redis/v9"
- )
- var QueueConfig = new(Queue)
- type Queue struct {
- Redis *QueueRedis
- Memory *QueueMemory
- NSQ *QueueNSQ `json:"nsq" yaml:"nsq"`
- }
- type QueueRedis struct {
- RedisConnectOptions
- Producer *redisqueue.ProducerOptions
- Consumer *redisqueue.ConsumerOptions
- }
- type QueueMemory struct {
- PoolSize uint
- }
- type QueueNSQ struct {
- NSQOptions
- ChannelPrefix string
- }
- // Empty 空设置
- func (e Queue) Empty() bool {
- return e.Memory == nil && e.Redis == nil && e.NSQ == nil
- }
- // Setup 启用顺序 redis > 其他 > memory
- func (e Queue) Setup() (storage.AdapterQueue, error) {
- if e.Redis != nil {
- e.Redis.Consumer.ReclaimInterval = e.Redis.Consumer.ReclaimInterval * time.Second
- e.Redis.Consumer.BlockingTimeout = e.Redis.Consumer.BlockingTimeout * time.Second
- e.Redis.Consumer.VisibilityTimeout = e.Redis.Consumer.VisibilityTimeout * time.Second
- client := GetRedisClient()
- if client == nil {
- options, err := e.Redis.RedisConnectOptions.GetRedisOptions()
- if err != nil {
- return nil, err
- }
- client = redis.NewClient(options)
- _redis = client
- }
- universalClient := newUniversalClient(client)
- e.Redis.Producer.RedisClient = universalClient
- e.Redis.Consumer.RedisClient = universalClient
- return queue.NewRedis(e.Redis.Producer, e.Redis.Consumer)
- }
- if e.NSQ != nil {
- cfg, err := e.NSQ.GetNSQOptions()
- if err != nil {
- return nil, err
- }
- return queue.NewNSQ(e.NSQ.Addresses, cfg, e.NSQ.ChannelPrefix)
- }
- return queue.NewMemory(e.Memory.PoolSize), nil
- }
- func newUniversalClient(client *redis.Client) redis.UniversalClient {
- addr := make([]string, 1)
- addr = append(addr, client.Options().Addr)
- universalClient := redis.NewUniversalClient(&redis.UniversalOptions{
- Addrs: addr,
- Username: client.Options().Username,
- Password: client.Options().Password,
- DB: client.Options().DB,
- PoolSize: client.Options().PoolSize,
- MaxRetries: client.Options().MaxRetries,
- TLSConfig: client.Options().TLSConfig,
- })
- return universalClient
- }
|