nsq.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package queue
  import (
  	"errors"

  	"IotAdmin/core/storage"

  	json "github.com/json-iterator/go"
  	"github.com/nsqio/go-nsq"
  )
  7. type NSQ struct {
  8. addresses []string
  9. cfg *nsq.Config
  10. producer *nsq.Producer
  11. consumer *nsq.Consumer
  12. channelPrefix string
  13. }
  14. // NewNSQ nsq模式 只能监听一个channel
  15. func NewNSQ(addresses []string, cfg *nsq.Config, channelPrefix string) (*NSQ, error) {
  16. n := &NSQ{
  17. addresses: addresses,
  18. cfg: cfg,
  19. channelPrefix: channelPrefix,
  20. }
  21. var err error
  22. n.producer, err = n.newProducer()
  23. return n, err
  24. }
  25. // String 字符串类型
  26. func (*NSQ) String() string {
  27. return "nsq"
  28. }
  29. // switchAddress ⚠️生产环境至少配置三个节点
  30. func (e *NSQ) switchAddress() {
  31. if len(e.addresses) > 1 {
  32. e.addresses[0], e.addresses[len(e.addresses)-1] =
  33. e.addresses[1],
  34. e.addresses[0]
  35. }
  36. }
  37. func (e *NSQ) newProducer() (*nsq.Producer, error) {
  38. if e.cfg == nil {
  39. e.cfg = nsq.NewConfig()
  40. }
  41. return nsq.NewProducer(e.addresses[0], e.cfg)
  42. }
  43. func (e *NSQ) newConsumer(topic string, h nsq.Handler) (err error) {
  44. if e.cfg == nil {
  45. e.cfg = nsq.NewConfig()
  46. }
  47. if e.consumer == nil {
  48. e.consumer, err = nsq.NewConsumer(topic, e.channelPrefix+topic, e.cfg)
  49. if err != nil {
  50. return err
  51. }
  52. }
  53. e.consumer.AddHandler(h)
  54. err = e.consumer.ConnectToNSQDs(e.addresses)
  55. return err
  56. }
  57. // Append 消息入生产者
  58. func (e *NSQ) Append(message storage.Message) error {
  59. rb, err := json.Marshal(message.GetValues())
  60. if err != nil {
  61. return err
  62. }
  63. return e.producer.Publish(message.GetStream(), rb)
  64. }
  65. // Register 监听消费者
  66. func (e *NSQ) Register(name string, f storage.ConsumerFunc) {
  67. h := &nsqConsumerHandler{f}
  68. err := e.newConsumer(name, h)
  69. if err != nil {
  70. //目前不支持动态注册
  71. panic(err)
  72. }
  73. }
  74. func (e *NSQ) Run() {
  75. }
  76. func (e *NSQ) Shutdown() {
  77. if e.producer != nil {
  78. e.producer.Stop()
  79. }
  80. if e.consumer != nil {
  81. e.consumer.Stop()
  82. }
  83. }
  84. type nsqConsumerHandler struct {
  85. f storage.ConsumerFunc
  86. }
  87. func (e nsqConsumerHandler) HandleMessage(message *nsq.Message) error {
  88. m := new(Message)
  89. data := make(map[string]interface{})
  90. err := json.Unmarshal(message.Body, &data)
  91. if err != nil {
  92. return err
  93. }
  94. m.SetValues(data)
  95. return e.f(m)
  96. }