queue.go 886 B

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647
  1. package runtime
  2. import "IotAdmin/core/storage"
  3. type Queue struct {
  4. prefix string
  5. queue storage.AdapterQueue
  6. }
  7. // NewQueue 创建对应上下文队列
  8. func NewQueue(prefix string, queue storage.AdapterQueue) storage.AdapterQueue {
  9. return &Queue{
  10. prefix: prefix,
  11. queue: queue,
  12. }
  13. }
  14. func (e *Queue) String() string {
  15. return e.queue.String()
  16. }
  17. // Register 注册消费者
  18. func (e *Queue) Register(name string, f storage.ConsumerFunc) {
  19. e.queue.Register(name, f)
  20. }
  21. // Append 增加数据到生产者
  22. func (e *Queue) Append(message storage.Message) error {
  23. values := message.GetValues()
  24. if values == nil {
  25. values = make(map[string]interface{})
  26. }
  27. values[storage.PrefixKey] = e.prefix
  28. return e.queue.Append(message)
  29. }
  30. // Run 运行
  31. func (e *Queue) Run() {
  32. e.queue.Run()
  33. }
  34. // Shutdown 停止
  35. func (e *Queue) Shutdown() {
  36. if e.queue != nil {
  37. e.queue.Shutdown()
  38. }
  39. }