queue.go 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. package config
import (
	"errors"
	"time"

	redisqueue "IotAdmin/core/redis-queue"
	"IotAdmin/core/storage"
	"IotAdmin/core/storage/queue"

	"github.com/redis/go-redis/v9"
)
  9. var QueueConfig = new(Queue)
  10. type Queue struct {
  11. Redis *QueueRedis
  12. Memory *QueueMemory
  13. NSQ *QueueNSQ `json:"nsq" yaml:"nsq"`
  14. }
  15. type QueueRedis struct {
  16. RedisConnectOptions
  17. Producer *redisqueue.ProducerOptions
  18. Consumer *redisqueue.ConsumerOptions
  19. }
  20. type QueueMemory struct {
  21. PoolSize uint
  22. }
  23. type QueueNSQ struct {
  24. NSQOptions
  25. ChannelPrefix string
  26. }
  27. // Empty 空设置
  28. func (e Queue) Empty() bool {
  29. return e.Memory == nil && e.Redis == nil && e.NSQ == nil
  30. }
  31. // Setup 启用顺序 redis > 其他 > memory
  32. func (e Queue) Setup() (storage.AdapterQueue, error) {
  33. if e.Redis != nil {
  34. e.Redis.Consumer.ReclaimInterval = e.Redis.Consumer.ReclaimInterval * time.Second
  35. e.Redis.Consumer.BlockingTimeout = e.Redis.Consumer.BlockingTimeout * time.Second
  36. e.Redis.Consumer.VisibilityTimeout = e.Redis.Consumer.VisibilityTimeout * time.Second
  37. client := GetRedisClient()
  38. if client == nil {
  39. options, err := e.Redis.RedisConnectOptions.GetRedisOptions()
  40. if err != nil {
  41. return nil, err
  42. }
  43. client = redis.NewClient(options)
  44. _redis = client
  45. }
  46. universalClient := newUniversalClient(client)
  47. e.Redis.Producer.RedisClient = universalClient
  48. e.Redis.Consumer.RedisClient = universalClient
  49. return queue.NewRedis(e.Redis.Producer, e.Redis.Consumer)
  50. }
  51. if e.NSQ != nil {
  52. cfg, err := e.NSQ.GetNSQOptions()
  53. if err != nil {
  54. return nil, err
  55. }
  56. return queue.NewNSQ(e.NSQ.Addresses, cfg, e.NSQ.ChannelPrefix)
  57. }
  58. return queue.NewMemory(e.Memory.PoolSize), nil
  59. }
  60. func newUniversalClient(client *redis.Client) redis.UniversalClient {
  61. addr := make([]string, 1)
  62. addr = append(addr, client.Options().Addr)
  63. universalClient := redis.NewUniversalClient(&redis.UniversalOptions{
  64. Addrs: addr,
  65. Username: client.Options().Username,
  66. Password: client.Options().Password,
  67. DB: client.Options().DB,
  68. PoolSize: client.Options().PoolSize,
  69. MaxRetries: client.Options().MaxRetries,
  70. TLSConfig: client.Options().TLSConfig,
  71. })
  72. return universalClient
  73. }