default.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. package config
  2. import (
  3. "bytes"
  4. "sync"
  5. "time"
  6. "IotAdmin/core/config/loader"
  7. "IotAdmin/core/config/loader/memory"
  8. "IotAdmin/core/config/reader"
  9. "IotAdmin/core/config/reader/json"
  10. "IotAdmin/core/config/source"
  11. )
  12. type config struct {
  13. exit chan bool
  14. opts Options
  15. sync.RWMutex
  16. // 当前快照
  17. snap *loader.Snapshot
  18. // 当前值
  19. vals reader.Values
  20. }
  21. type watcher struct {
  22. lw loader.Watcher
  23. rd reader.Reader
  24. path []string
  25. value reader.Value
  26. }
  27. func newConfig(opts ...Option) (Config, error) {
  28. var c config
  29. err := c.Init(opts...)
  30. if err != nil {
  31. return nil, err
  32. }
  33. go c.run()
  34. return &c, nil
  35. }
  36. func (c *config) Init(opts ...Option) error {
  37. c.opts = Options{
  38. Reader: json.NewReader(),
  39. }
  40. c.exit = make(chan bool)
  41. for _, o := range opts {
  42. o(&c.opts)
  43. }
  44. // 默认加载程序使用配置的读取器
  45. if c.opts.Loader == nil {
  46. c.opts.Loader = memory.NewLoader(memory.WithReader(c.opts.Reader))
  47. }
  48. err := c.opts.Loader.Load(c.opts.Source...)
  49. if err != nil {
  50. return err
  51. }
  52. c.snap, err = c.opts.Loader.Snapshot()
  53. if err != nil {
  54. return err
  55. }
  56. c.vals, err = c.opts.Reader.Values(c.snap.ChangeSet)
  57. if err != nil {
  58. return err
  59. }
  60. if c.opts.Entity != nil {
  61. _ = c.vals.Scan(c.opts.Entity)
  62. }
  63. return nil
  64. }
  65. func (c *config) Options() Options {
  66. return c.opts
  67. }
  68. // 运行监听配置改变
  69. func (c *config) run() {
  70. watch := func(w loader.Watcher) error {
  71. for {
  72. // 获取变更集
  73. snap, err := w.Next()
  74. if err != nil {
  75. return err
  76. }
  77. c.Lock()
  78. if c.snap.Version >= snap.Version {
  79. c.Unlock()
  80. continue
  81. }
  82. // 保存快照
  83. c.snap = snap
  84. // 设置值
  85. c.vals, _ = c.opts.Reader.Values(snap.ChangeSet)
  86. if c.opts.Entity != nil {
  87. _ = c.vals.Scan(c.opts.Entity)
  88. c.opts.Entity.OnChange()
  89. }
  90. c.Unlock()
  91. }
  92. }
  93. for {
  94. w, err := c.opts.Loader.Watch()
  95. if err != nil {
  96. time.Sleep(time.Second)
  97. continue
  98. }
  99. done := make(chan bool)
  100. // 停止监听
  101. go func() {
  102. select {
  103. case <-done:
  104. case <-c.exit:
  105. }
  106. _ = w.Stop()
  107. }()
  108. // block watch
  109. if err := watch(w); err != nil {
  110. // do something better
  111. time.Sleep(time.Second)
  112. }
  113. // close done chan
  114. close(done)
  115. // if the config is closed exit
  116. select {
  117. case <-c.exit:
  118. return
  119. default:
  120. }
  121. }
  122. }
  123. func (c *config) Map() map[string]interface{} {
  124. c.RLock()
  125. defer c.RUnlock()
  126. return c.vals.Map()
  127. }
  128. func (c *config) Scan(v interface{}) error {
  129. c.RLock()
  130. defer c.RUnlock()
  131. return c.vals.Scan(v)
  132. }
  133. // Sync 同步加载所有源,调用解析器并更新配置
  134. func (c *config) Sync() error {
  135. if err := c.opts.Loader.Sync(); err != nil {
  136. return err
  137. }
  138. snap, err := c.opts.Loader.Snapshot()
  139. if err != nil {
  140. return err
  141. }
  142. c.Lock()
  143. defer c.Unlock()
  144. c.snap = snap
  145. vals, err := c.opts.Reader.Values(snap.ChangeSet)
  146. if err != nil {
  147. return err
  148. }
  149. c.vals = vals
  150. return nil
  151. }
  152. func (c *config) Close() error {
  153. select {
  154. case <-c.exit:
  155. return nil
  156. default:
  157. close(c.exit)
  158. }
  159. return nil
  160. }
  161. func (c *config) Get(path ...string) reader.Value {
  162. c.RLock()
  163. defer c.RUnlock()
  164. // did sync actually work?
  165. if c.vals != nil {
  166. return c.vals.Get(path...)
  167. }
  168. // no value
  169. return newValue()
  170. }
  171. func (c *config) Set(val interface{}, path ...string) {
  172. c.Lock()
  173. defer c.Unlock()
  174. if c.vals != nil {
  175. c.vals.Set(val, path...)
  176. }
  177. return
  178. }
  179. func (c *config) Del(path ...string) {
  180. c.Lock()
  181. defer c.Unlock()
  182. if c.vals != nil {
  183. c.vals.Del(path...)
  184. }
  185. return
  186. }
  187. func (c *config) Bytes() []byte {
  188. c.RLock()
  189. defer c.RUnlock()
  190. if c.vals == nil {
  191. return []byte{}
  192. }
  193. return c.vals.Bytes()
  194. }
  195. func (c *config) Load(sources ...source.Source) error {
  196. if err := c.opts.Loader.Load(sources...); err != nil {
  197. return err
  198. }
  199. snap, err := c.opts.Loader.Snapshot()
  200. if err != nil {
  201. return err
  202. }
  203. c.Lock()
  204. defer c.Unlock()
  205. c.snap = snap
  206. vals, err := c.opts.Reader.Values(snap.ChangeSet)
  207. if err != nil {
  208. return err
  209. }
  210. c.vals = vals
  211. return nil
  212. }
  213. func (c *config) Watch(path ...string) (Watcher, error) {
  214. value := c.Get(path...)
  215. w, err := c.opts.Loader.Watch(path...)
  216. if err != nil {
  217. return nil, err
  218. }
  219. return &watcher{
  220. lw: w,
  221. rd: c.opts.Reader,
  222. path: path,
  223. value: value,
  224. }, nil
  225. }
  226. func (c *config) String() string {
  227. return "config"
  228. }
  229. func (w *watcher) Next() (reader.Value, error) {
  230. for {
  231. s, err := w.lw.Next()
  232. if err != nil {
  233. return nil, err
  234. }
  235. // only process changes
  236. if bytes.Equal(w.value.Bytes(), s.ChangeSet.Data) {
  237. continue
  238. }
  239. v, err := w.rd.Values(s.ChangeSet)
  240. if err != nil {
  241. return nil, err
  242. }
  243. w.value = v.Get()
  244. return w.value, nil
  245. }
  246. }
  247. func (w *watcher) Stop() error {
  248. return w.lw.Stop()
  249. }