|
|
@@ -8,15 +8,19 @@ import (
|
|
|
"MeterService/dataStruct"
|
|
|
"encoding/json"
|
|
|
"fmt"
|
|
|
- "log"
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
+var (
|
|
|
+ errCount = 0
|
|
|
+)
|
|
|
+
|
|
|
func NewDownStreamService() {
|
|
|
logger.Info("启动DTU服务:%d", config.C.Vber.CmdPort)
|
|
|
server := tcpserver.New(fmt.Sprintf(":%d", config.C.Vber.CmdPort))
|
|
|
server.OnNewClient(func(conn *tcpserver.Client) {
|
|
|
logger.Info("新设备连接:%s", conn.GetClientHost())
|
|
|
+
|
|
|
})
|
|
|
server.OnNewMessage(func(conn *tcpserver.Client, msg []byte) {
|
|
|
//logger.Debug("[%s]收到新消息:%s", conn.GetClientHost(), string(msg))
|
|
|
@@ -50,27 +54,31 @@ func dealDtuUpMsg(conn *tcpserver.Client, msg []byte) bool {
|
|
|
|
|
|
// 处理DTU注册包及心跳包
|
|
|
func dealDtuRegisterHeartBeat(conn *tcpserver.Client, dtuMsg *dataStruct.DTUManageCmd) {
|
|
|
- if dtuMsg.Cmd == 0 || dtuMsg.Cmd == 1 {
|
|
|
- clientState, _ := data.DtuMapState.Get(dtuMsg.SN)
|
|
|
+ if dtuMsg.Cmd == dataStruct.CmdRegister || dtuMsg.Cmd == dataStruct.CmdHeartBeat {
|
|
|
+ //clientState, _ := data.DtuMapState.Get(dtuMsg.SN)
|
|
|
+ clientState, ok := data.OnlineSN.Get(dtuMsg.SN)
|
|
|
conn.SetClientRegisterID(dtuMsg.SN)
|
|
|
clientState.FD = conn
|
|
|
clientState.Online = true
|
|
|
clientState.UpdateTime()
|
|
|
- data.DtuMapState.Add(dtuMsg.SN, clientState)
|
|
|
- data.IpSN.Add(conn.GetClientHost(), dtuMsg.SN)
|
|
|
- onlineSn, ok := data.OnlineSN.Get(dtuMsg.SN)
|
|
|
- if !ok || onlineSn.FD.GetClientHost() != clientState.FD.GetClientHost() {
|
|
|
+
|
|
|
+ //data.IpSN.Add(conn.GetClientHost(), dtuMsg.SN)
|
|
|
+ //onlineSn, ok := data.OnlineSN.Get(dtuMsg.SN)
|
|
|
+ if !ok || clientState.FD.GetClientHost() != clientState.FD.GetClientHost() {
|
|
|
msg := &dataStruct.DtuRegisterChanMsg{
|
|
|
Sn: dtuMsg.SN,
|
|
|
Value: clientState,
|
|
|
}
|
|
|
data.TranSN <- msg
|
|
|
+ } else {
|
|
|
+ logger.Debug("[%s]设备上线:%s", clientState.FD.GetClientHost(), dtuMsg.SN)
|
|
|
+ data.OnlineSN.Add(dtuMsg.SN, clientState)
|
|
|
}
|
|
|
- if dtuMsg.Cmd == 0 {
|
|
|
- logger.Info(dtuMsg.SN + "[" + clientState.FD.GetClientHost() + "] Register")
|
|
|
+ if dtuMsg.Cmd == dataStruct.CmdRegister {
|
|
|
+ logger.Info("======【%s】====== Register %s", dtuMsg.SN, conn.GetClientHost())
|
|
|
}
|
|
|
- if dtuMsg.Cmd == 1 {
|
|
|
- logger.Info(dtuMsg.SN + "[" + clientState.FD.GetClientHost() + "] HeartBeat")
|
|
|
+ if dtuMsg.Cmd == dataStruct.CmdHeartBeat {
|
|
|
+ logger.Info("======【%s】====== HeartBeat %s", dtuMsg.SN, conn.GetClientHost())
|
|
|
}
|
|
|
} else {
|
|
|
logger.Error("Register || HeartBeat 失败")
|
|
|
@@ -83,13 +91,16 @@ func dealDTUDisconnect(conn *tcpserver.Client) {
|
|
|
addr := conn.GetClientHost()
|
|
|
logger.Info("[%s]设备断开连接:%s", addr, sn)
|
|
|
if sn != "" {
|
|
|
- if client, ok := data.DtuMapState.Get(sn); ok && client.FD.GetClientHost() == addr {
|
|
|
- client.Online = false
|
|
|
- data.DtuMapState.Add(sn, client)
|
|
|
- logger.Debug("[%s]设备离线:%s", addr, sn)
|
|
|
- }
|
|
|
+ //if client, ok := data.DtuMapState.Get(sn); ok && client.FD.GetClientHost() == addr {
|
|
|
+ // client.Online = false
|
|
|
+ // data.DtuMapState.Add(sn, client)
|
|
|
+ // logger.Debug("[%s]设备离线:%s", addr, sn)
|
|
|
+ //}
|
|
|
if client, ok := data.OnlineSN.Get(sn); ok && client.FD.GetClientHost() == addr {
|
|
|
data.OnlineSN.Remove(sn)
|
|
|
+ //client.Online = false
|
|
|
+ //data.OnlineSN.Add(sn, client)
|
|
|
+ logger.Debug("[%s]设备离线:%s", addr, sn)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -104,56 +115,72 @@ func runTran(msg <-chan *dataStruct.DtuRegisterChanMsg) {
|
|
|
logger.Info("【更新设备配置】 SN:%s", msg.Sn)
|
|
|
updateOnlineConf(msg)
|
|
|
case nowTime := <-ticker.C:
|
|
|
- minute := nowTime.Minute() //当前分钟
|
|
|
- if minute != lastMinute {
|
|
|
- count := 0
|
|
|
- onlineDtu := make(map[string]*dataStruct.ClientState)
|
|
|
-
|
|
|
- for _, v := range data.DtuMap.Map.Map {
|
|
|
- vv := v.(*dataStruct.DTUDeviceState)
|
|
|
- log.Println("===============", vv.Info.Name)
|
|
|
- }
|
|
|
- data.OnlineSN.Map.Range(func(key, value interface{}) bool {
|
|
|
- v := value.(*dataStruct.ClientState)
|
|
|
- dtuSn := key.(string)
|
|
|
- logger.Debug("【检查在线设备配置】 SN:%s", dtuSn)
|
|
|
- if v.Config != nil {
|
|
|
- onlineDtu[dtuSn] = v
|
|
|
- }
|
|
|
- count++
|
|
|
- return true
|
|
|
- })
|
|
|
- if count == 0 {
|
|
|
- logger.Info("没有在线设备。")
|
|
|
- }
|
|
|
- for dtuSn, client := range onlineDtu {
|
|
|
- if minute%client.Config.Secs == 0 {
|
|
|
- // 采集上报数据
|
|
|
- if dataArray := collectData(dtuSn, client.Config); len(dataArray) > 0 {
|
|
|
- logger.Debug("开始上报平台:%s", dtuSn)
|
|
|
- reportData(dataArray, client.Config)
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- lastMinute = minute
|
|
|
- }
|
|
|
+ collectAndReport(nowTime, lastMinute)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-// 更新设备配置
|
|
|
+// 设备上线更新配置
|
|
|
func updateOnlineConf(msg *dataStruct.DtuRegisterChanMsg) {
|
|
|
dtuConfig := &dataStruct.DtuConfig{}
|
|
|
if dtu, ok := data.DtuMap.Get(msg.Sn); ok {
|
|
|
if err := json.Unmarshal([]byte(dtu.Info.Configs), dtuConfig); err != nil {
|
|
|
- logger.Error("[%s]设备配置解析失败。", msg.Sn)
|
|
|
+ logger.Error("[%s]设备上线配置解析失败。", msg.Sn)
|
|
|
return
|
|
|
}
|
|
|
msg.Value.Config = dtuConfig
|
|
|
data.OnlineSN.Add(msg.Sn, msg.Value)
|
|
|
- logger.Debug("[%s]设备配置更新成功。 %v ", msg.Sn, *dtuConfig)
|
|
|
+ logger.Debug("[%s]设备上线:%s ;配置: %v", msg.Value.FD.GetClientHost(), msg.Sn, *dtuConfig)
|
|
|
} else {
|
|
|
- logger.Debug("【更新设备配置失败】[%s]设备不存在。", msg.Sn)
|
|
|
+ logger.Debug("【设备上线失败】[%s]设备不存在。", msg.Sn)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// 采集并上报数据
|
|
|
+func collectAndReport(nowTime time.Time, lastMinute int) {
|
|
|
+ minute := nowTime.Minute() //当前分钟
|
|
|
+ if minute != lastMinute {
|
|
|
+ lastMinute = minute
|
|
|
+ count := 0
|
|
|
+ onlineDtu := make(map[string]*dataStruct.ClientState)
|
|
|
+
|
|
|
+ //for _, v := range data.DtuMap.Map.Map {
|
|
|
+ // vv := v.(*dataStruct.DTUDeviceState)
|
|
|
+ // log.Println("===============", vv.Info.Name)
|
|
|
+ //}
|
|
|
+ data.OnlineSN.Map.Range(func(key, value interface{}) bool {
|
|
|
+ v := value.(*dataStruct.ClientState)
|
|
|
+ dtuSn := key.(string)
|
|
|
+ dtu, ok := data.DtuMap.Get(dtuSn)
|
|
|
+ if !ok || dtu.Info.Configs == "" {
|
|
|
+ logger.Error("【在线设备更新配置】设备不存在 SN:%s", dtuSn)
|
|
|
+ return true
|
|
|
+ }
|
|
|
+ dtuConfig := &dataStruct.DtuConfig{}
|
|
|
+ if err := json.Unmarshal([]byte(dtu.Info.Configs), dtuConfig); err != nil {
|
|
|
+ logger.Error("【在线设备更新配置】解析失败 SN:%s", dtuSn)
|
|
|
+ return true
|
|
|
+ }
|
|
|
+ v.Config = dtuConfig
|
|
|
+ onlineDtu[dtuSn] = v
|
|
|
+ count++
|
|
|
+ return true
|
|
|
+ })
|
|
|
+ if count == 0 && errCount < 10 {
|
|
|
+ errCount++
|
|
|
+ logger.Info("没有在线设备。")
|
|
|
+ return
|
|
|
+ }
|
|
|
+ for dtuSn, client := range onlineDtu {
|
|
|
+ if minute%client.Config.Secs == 0 {
|
|
|
+ // 采集上报数据
|
|
|
+ if dataArray := collectData(dtuSn, client.Config); len(dataArray) > 0 {
|
|
|
+ logger.Debug("开始上报平台:%s", dtuSn)
|
|
|
+ reportData(dataArray, client.Config)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ errCount = 0
|
|
|
+
|
|
|
}
|
|
|
}
|