tcpclient.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. // Copyright 2014 Quoc-Viet Nguyen. All rights reserved.
  2. // This software may be modified and distributed under the terms
  3. // of the BSD license. See the LICENSE file for details.
  4. package modbus
  5. import (
  6. "encoding/binary"
  7. "fmt"
  8. "io"
  9. "log"
  10. "net"
  11. "sync"
  12. "sync/atomic"
  13. "time"
  14. )
const (
	// tcpProtocolIdentifier is the value Encode writes into the protocol
	// identifier field of every request header.
	tcpProtocolIdentifier uint16 = 0x0000

	// Modbus Application Protocol
	// tcpHeaderSize is the header length in bytes: transaction identifier (2),
	// protocol identifier (2), length (2) and unit identifier (1).
	tcpHeaderSize = 7
	// tcpMaxLength is the size in bytes of the buffer Send reads a response
	// frame into; longer frames are rejected.
	tcpMaxLength = 260
	// tcpTimeout is the default Timeout assigned by NewTCPClientHandler.
	tcpTimeout = 10 * time.Second
	// tcpIdleTimeout is the default IdleTimeout assigned by NewTCPClientHandler.
	tcpIdleTimeout = 60 * time.Second
)
// TCPClientHandler implements Packager and Transporter interface.
//
// It embeds tcpPackager (frame encoding, verification and decoding) and
// tcpTransporter (connection handling), so their exported fields such as
// SlaveId, Address, Timeout, IdleTimeout and Logger are set directly on the
// handler.
type TCPClientHandler struct {
	tcpPackager
	tcpTransporter
}
  29. // NewTCPClientHandler allocates a new TCPClientHandler.
  30. func NewTCPClientHandler(address string) *TCPClientHandler {
  31. h := &TCPClientHandler{}
  32. h.Address = address
  33. h.Timeout = tcpTimeout
  34. h.IdleTimeout = tcpIdleTimeout
  35. return h
  36. }
  37. // TCPClient creates TCP client with default handler and given connect string.
  38. func TCPClient(address string) Client {
  39. handler := NewTCPClientHandler(address)
  40. return NewClient(handler)
  41. }
// tcpPackager implements Packager interface.
// It builds request frames (Encode), checks a response header against its
// request (Verify) and extracts the PDU from a response frame (Decode).
type tcpPackager struct {
	// For synchronization between messages of server & client.
	// Encode increments it atomically and writes its low 16 bits into the
	// transaction identifier field.
	transactionId uint32
	// Broadcast address is 0.
	// Encode copies this value into the unit identifier byte of the header.
	SlaveId byte
}
  49. // Encode adds modbus application protocol header:
  50. //
  51. // Transaction identifier: 2 bytes
  52. // Protocol identifier: 2 bytes
  53. // Length: 2 bytes
  54. // Unit identifier: 1 byte
  55. // Function code: 1 byte
  56. // Data: n bytes
  57. func (mb *tcpPackager) Encode(pdu *ProtocolDataUnit) (adu []byte, err error) {
  58. adu = make([]byte, tcpHeaderSize+1+len(pdu.Data))
  59. // Transaction identifier
  60. transactionId := atomic.AddUint32(&mb.transactionId, 1)
  61. binary.BigEndian.PutUint16(adu, uint16(transactionId))
  62. // Protocol identifier
  63. binary.BigEndian.PutUint16(adu[2:], tcpProtocolIdentifier)
  64. // Length = sizeof(SlaveId) + sizeof(FunctionCode) + Data
  65. length := uint16(1 + 1 + len(pdu.Data))
  66. binary.BigEndian.PutUint16(adu[4:], length)
  67. // Unit identifier
  68. adu[6] = mb.SlaveId
  69. // PDU
  70. adu[tcpHeaderSize] = pdu.FunctionCode
  71. copy(adu[tcpHeaderSize+1:], pdu.Data)
  72. return
  73. }
  74. // Verify confirms transaction, protocol and unit id.
  75. func (mb *tcpPackager) Verify(aduRequest []byte, aduResponse []byte) (err error) {
  76. // Transaction id
  77. responseVal := binary.BigEndian.Uint16(aduResponse)
  78. requestVal := binary.BigEndian.Uint16(aduRequest)
  79. if responseVal != requestVal {
  80. err = fmt.Errorf("modbus: response transaction id '%v' does not match request '%v'", responseVal, requestVal)
  81. return
  82. }
  83. // Protocol id
  84. responseVal = binary.BigEndian.Uint16(aduResponse[2:])
  85. requestVal = binary.BigEndian.Uint16(aduRequest[2:])
  86. if responseVal != requestVal {
  87. err = fmt.Errorf("modbus: response protocol id '%v' does not match request '%v'", responseVal, requestVal)
  88. return
  89. }
  90. // Unit id (1 byte)
  91. if aduResponse[6] != aduRequest[6] {
  92. err = fmt.Errorf("modbus: response unit id '%v' does not match request '%v'", aduResponse[6], aduRequest[6])
  93. return
  94. }
  95. return
  96. }
  97. // Decode extracts PDU from TCP frame:
  98. //
  99. // Transaction identifier: 2 bytes
  100. // Protocol identifier: 2 bytes
  101. // Length: 2 bytes
  102. // Unit identifier: 1 byte
  103. func (mb *tcpPackager) Decode(adu []byte) (pdu *ProtocolDataUnit, err error) {
  104. // Read length value in the header
  105. length := binary.BigEndian.Uint16(adu[4:])
  106. pduLength := len(adu) - tcpHeaderSize
  107. if pduLength <= 0 || pduLength != int(length-1) {
  108. err = fmt.Errorf("modbus: length in response '%v' does not match pdu data length '%v'", length-1, pduLength)
  109. return
  110. }
  111. pdu = &ProtocolDataUnit{}
  112. // The first byte after header is function code
  113. pdu.FunctionCode = adu[tcpHeaderSize]
  114. pdu.Data = adu[tcpHeaderSize+1:]
  115. return
  116. }
// tcpTransporter implements Transporter interface.
// It owns a single TCP connection that is dialed on demand by Send or
// Connect and closed either explicitly or by the idle timer.
type tcpTransporter struct {
	// Connect string
	Address string
	// Connect & Read timeout.
	// Used as the dial timeout and, when positive, as the deadline for the
	// write and reads of each Send; otherwise no deadline is set.
	Timeout time.Duration
	// Idle timeout to close the connection.
	// A value <= 0 disables the idle timer.
	IdleTimeout time.Duration
	// Transmission logger. Nothing is logged when it is nil.
	Logger *log.Logger

	// mu is held by Send, Connect, Close and closeIdle while they use the
	// fields below.
	mu sync.Mutex
	// TCP connection; nil while disconnected.
	conn net.Conn
	// closeTimer runs closeIdle after IdleTimeout; created and re-armed by
	// startCloseTimer.
	closeTimer *time.Timer
	// lastActivity is the start time of the most recent Send.
	lastActivity time.Time
}
  133. // Send sends data to server and ensures response length is greater than header length.
  134. func (mb *tcpTransporter) Send(aduRequest []byte) (aduResponse []byte, err error) {
  135. mb.mu.Lock()
  136. defer mb.mu.Unlock()
  137. // Establish a new connection if not connected
  138. if err = mb.connect(); err != nil {
  139. return
  140. }
  141. // Set timer to close when idle
  142. mb.lastActivity = time.Now()
  143. mb.startCloseTimer()
  144. // Set write and read timeout
  145. var timeout time.Time
  146. if mb.Timeout > 0 {
  147. timeout = mb.lastActivity.Add(mb.Timeout)
  148. }
  149. if err = mb.conn.SetDeadline(timeout); err != nil {
  150. return
  151. }
  152. // Send data
  153. mb.logf("modbus: sending % x", aduRequest)
  154. if _, err = mb.conn.Write(aduRequest); err != nil {
  155. return
  156. }
  157. // Read header first
  158. var data [tcpMaxLength]byte
  159. if _, err = io.ReadFull(mb.conn, data[:tcpHeaderSize]); err != nil {
  160. return
  161. }
  162. // Read length, ignore transaction & protocol id (4 bytes)
  163. length := int(binary.BigEndian.Uint16(data[4:]))
  164. if length <= 0 {
  165. mb.flush(data[:])
  166. err = fmt.Errorf("modbus: length in response header '%v' must not be zero", length)
  167. return
  168. }
  169. if length > (tcpMaxLength - (tcpHeaderSize - 1)) {
  170. mb.flush(data[:])
  171. err = fmt.Errorf("modbus: length in response header '%v' must not greater than '%v'", length, tcpMaxLength-tcpHeaderSize+1)
  172. return
  173. }
  174. // Skip unit id
  175. length += tcpHeaderSize - 1
  176. if _, err = io.ReadFull(mb.conn, data[tcpHeaderSize:length]); err != nil {
  177. return
  178. }
  179. aduResponse = data[:length]
  180. mb.logf("modbus: received % x\n", aduResponse)
  181. return
  182. }
  183. // Connect establishes a new connection to the address in Address.
  184. // Connect and Close are exported so that multiple requests can be done with one session
  185. func (mb *tcpTransporter) Connect() error {
  186. mb.mu.Lock()
  187. defer mb.mu.Unlock()
  188. return mb.connect()
  189. }
  190. func (mb *tcpTransporter) connect() error {
  191. if mb.conn == nil {
  192. dialer := net.Dialer{Timeout: mb.Timeout}
  193. conn, err := dialer.Dial("tcp", mb.Address)
  194. if err != nil {
  195. return err
  196. }
  197. mb.conn = conn
  198. }
  199. return nil
  200. }
  201. func (mb *tcpTransporter) startCloseTimer() {
  202. if mb.IdleTimeout <= 0 {
  203. return
  204. }
  205. if mb.closeTimer == nil {
  206. mb.closeTimer = time.AfterFunc(mb.IdleTimeout, mb.closeIdle)
  207. } else {
  208. mb.closeTimer.Reset(mb.IdleTimeout)
  209. }
  210. }
  211. // Close closes current connection.
  212. func (mb *tcpTransporter) Close() error {
  213. mb.mu.Lock()
  214. defer mb.mu.Unlock()
  215. return mb.close()
  216. }
  217. // flush flushes pending data in the connection,
  218. // returns io.EOF if connection is closed.
  219. func (mb *tcpTransporter) flush(b []byte) (err error) {
  220. if err = mb.conn.SetReadDeadline(time.Now()); err != nil {
  221. return
  222. }
  223. // Timeout setting will be reset when reading
  224. if _, err = mb.conn.Read(b); err != nil {
  225. // Ignore timeout error
  226. if netError, ok := err.(net.Error); ok && netError.Timeout() {
  227. err = nil
  228. }
  229. }
  230. return
  231. }
  232. func (mb *tcpTransporter) logf(format string, v ...interface{}) {
  233. if mb.Logger != nil {
  234. mb.Logger.Printf(format, v...)
  235. }
  236. }
  237. // closeLocked closes current connection. Caller must hold the mutex before calling this method.
  238. func (mb *tcpTransporter) close() (err error) {
  239. if mb.conn != nil {
  240. err = mb.conn.Close()
  241. mb.conn = nil
  242. }
  243. return
  244. }
  245. // closeIdle closes the connection if last activity is passed behind IdleTimeout.
  246. func (mb *tcpTransporter) closeIdle() {
  247. mb.mu.Lock()
  248. defer mb.mu.Unlock()
  249. if mb.IdleTimeout <= 0 {
  250. return
  251. }
  252. idle := time.Now().Sub(mb.lastActivity)
  253. if idle >= mb.IdleTimeout {
  254. mb.logf("modbus: closing connection due to idle timeout: %v", idle)
  255. mb.close()
  256. }
  257. }