message.go 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. package queue
  2. import (
  3. redisqueue "IotAdmin/core/redis-queue"
  4. "IotAdmin/core/storage"
  5. "sync"
  6. )
  7. type Message struct {
  8. redisqueue.Message
  9. ErrorCount int
  10. mux sync.RWMutex
  11. }
  12. func (m *Message) GetID() string {
  13. return m.ID
  14. }
  15. func (m *Message) SetID(id string) {
  16. m.ID = id
  17. }
  18. func (m *Message) GetStream() string {
  19. m.mux.Lock()
  20. defer m.mux.Unlock()
  21. return m.Stream
  22. }
  23. func (m *Message) SetStream(stream string) {
  24. m.mux.Lock()
  25. defer m.mux.Unlock()
  26. m.Stream = stream
  27. }
  28. func (m *Message) GetValues() map[string]interface{} {
  29. m.mux.Lock()
  30. defer m.mux.Unlock()
  31. return m.Values
  32. }
  33. func (m *Message) SetValues(values map[string]interface{}) {
  34. m.mux.Lock()
  35. defer m.mux.Unlock()
  36. m.Values = values
  37. }
  38. func (m *Message) GetPrefix() (prefix string) {
  39. m.mux.Lock()
  40. defer m.mux.Unlock()
  41. if m.Values == nil {
  42. return
  43. }
  44. v, _ := m.Values[storage.PrefixKey]
  45. prefix, _ = v.(string)
  46. return
  47. }
  48. func (m *Message) SetPrefix(prefix string) {
  49. m.mux.Lock()
  50. defer m.mux.Unlock()
  51. if m.Values == nil {
  52. m.Values = make(map[string]interface{})
  53. }
  54. m.Values[storage.PrefixKey] = prefix
  55. }
  56. func (m *Message) GetErrorCount() int {
  57. return m.ErrorCount
  58. }
  59. func (m *Message) SetErrorCount(count int) {
  60. m.ErrorCount = count
  61. }