From 203352332db60c4d9fb19219f7b669e2c2fab65d Mon Sep 17 00:00:00 2001 From: William Reid Date: Fri, 26 Apr 2019 15:39:21 -0400 Subject: [PATCH] Reads data and statuses from controller via modbus --- constants.go | 17 +++- dataTypes.go | 9 +- modbusManager.go | 233 +++++++++++++++++++++++++++++++++++++++++++++ responseManager.go | 6 +- server.go | 33 +++---- socketManager.go | 90 +++++++++++------ 6 files changed, 329 insertions(+), 59 deletions(-) create mode 100644 modbusManager.go diff --git a/constants.go b/constants.go index 29ae098..e2d8289 100644 --- a/constants.go +++ b/constants.go @@ -10,16 +10,23 @@ const ( // message types // outgoing - kindData = "data" - kindHeartbeat = "heartbeat" + kindData = "data" + kindStatus = "status" // incoming kindResponse = "response" messageReceived = "received" // Time (in seconds) to wait between performing read/send operations - dataReadPeriod = 5 - dataSendPeriod = 30 - heartbeatPeriod = 10 + dataReadPeriod = 5 + dataSendPeriod = 30 + statusReadPeriod = 5 + + // Modbus data types + WORD = "WORD" + SWORD = "SWORD" + + // Maximum number of modbus registers we can read at a time + maxRegisterRead = 50 ) var metrics = [...]Metric{ diff --git a/dataTypes.go b/dataTypes.go index 5e4369d..1504b4b 100644 --- a/dataTypes.go +++ b/dataTypes.go @@ -1,8 +1,8 @@ package main type StampedReading struct { - Timestamp int `json:"timestamp"` - Value float32 `json:"value"` + Timestamp int `json:"timestamp"` + Value int32 `json:"value"` } type Packet struct { @@ -16,3 +16,8 @@ type Metric struct { LBound float32 UBound float32 } + +type StatusUpdates struct { + Timestamp int `json:"timestamp"` + Updates map[string]int `json:"updates"` +} diff --git a/modbusManager.go b/modbusManager.go new file mode 100644 index 0000000..d4ad8fd --- /dev/null +++ b/modbusManager.go @@ -0,0 +1,233 @@ +package main + +import ( + "encoding/binary" + "fmt" + "github.com/goburrow/modbus" + "sort" + "strconv" + "strings" + "sync" + "time" +) + +var modbusClient = &_modbusClient{ + modbus.NewClient(nil), + sync.Mutex{}, + sync.WaitGroup{}, + _registers, +} + +type _modbusClient struct { + conn modbus.Client + lock sync.Mutex + wait sync.WaitGroup + registers map[int][2]string +} + +func (m *_modbusClient) start() { + m.wait.Add(1) + go m.connect() +} + +func (m *_modbusClient) connect() { + handler := modbus.NewRTUClientHandler("/dev/ttyUSB0") + handler.BaudRate = 19200 + handler.DataBits = 8 + handler.Parity = "N" + handler.StopBits = 1 + handler.SlaveId = 1 + handler.Timeout = 5 * time.Second + if err := handler.Connect(); err != nil { + fmt.Printf("Error connecting to modbus RTU client: %v\n", err) + } + m.conn = modbus.NewClient(handler) + m.wait.Done() +} + +func (m *_modbusClient) readHoldingRegisters(start uint16, num uint16) []byte { + m.wait.Wait() + m.lock.Lock() + defer m.lock.Unlock() + data, err := m.conn.ReadHoldingRegisters(start, num) + if err != nil { + fmt.Printf("Error reading %v holding registers starting with %v: %v\n", start, num, err) + return []byte{} + } else { + return data + } +} + +func (m *_modbusClient) readAllRegisterData() map[string]StampedReading { + m.wait.Wait() // Wait for client to start up + var keys []int // Array for register numbers + for k := range m.registers { + keys = append(keys, k) + } + sort.Ints(keys) // Make sure it's ascending + + results := make(map[string]StampedReading) // Result map + intervals := m.readsForAllRegisters() // Modbus reading intervals + var data []byte // Data from modbus + + m.lock.Lock() // One reader at a time to be safe + for _, interval := range intervals { // Do readings for all intervals + start := uint16(interval[0]) + number := uint16(interval[1]) + reading, err := m.conn.ReadHoldingRegisters(start, number) // Read the registers + if err != nil { // Make sure it worked + fmt.Printf("Error reading %v registers starting at %v: %v\n", number, start, err) + continue + } + data = append(data, reading...) // Accumulate the data + } + m.lock.Unlock() + + timestamp := int(time.Now().Unix()) + for _, interval := range intervals { // Do mappings for all intervals + start := uint16(interval[0]) + number := uint16(interval[1]) + for r := 0; r < int(start+number); r++ { // Check all the registers in this range + if val, found := m.registers[r]; found { // If we have a metric for this, add it to results + switch val[1] { + case WORD: + results[val[0]] = StampedReading{timestamp, int32(binary.LittleEndian.Uint16(data[2*r : 2*r+2]))} + case SWORD: + results[val[0]] = StampedReading{timestamp, int32(int16(binary.LittleEndian.Uint16(data[2*r : 2*r+2])))} + } + } + } + } + + return results +} + +func (m *_modbusClient) readsForAllRegisters() [][2]int { + var reads [][2]int + var keys []int + for k := range m.registers { + keys = append(keys, k) + } + sort.Ints(keys) + + startReg := keys[0] + var i int + for i = 0; i < len(keys); i++ { + if keys[i]-startReg > 49 { + reads = append(reads, [2]int{startReg, keys[i-1] - startReg + 1}) + startReg = keys[i] + } + } + reads = append(reads, [2]int{startReg, keys[i-1] - startReg + 1}) + + return reads +} + +func reverse(s string) string { + runes := []rune(s) + for i, j := 0, len(runes)-1; i < j; i, j = i+1, j-1 { + runes[i], runes[j] = runes[j], runes[i] + } + return string(runes) +} + +func (m *_modbusClient) getStatusCodes() string { + data := m.readHoldingRegisters(uint16(1499), uint16(16)) + binstr := make([]string, len(data)) + for i := 0; i < len(data); i += 2 { + binstr[i] = reverse(fmt.Sprintf("%08s", strconv.FormatInt(int64(data[i+1]), 2))) + binstr[i+1] = reverse(fmt.Sprintf("%08s", strconv.FormatInt(int64(data[i]), 2))) + } + return strings.Join(binstr, "") +} + +func (m *_modbusClient) getStatusUpdates() map[string]int { + statuses := m.getStatusCodes() + fmt.Printf("Statuses: %v\n", statuses) + statusMap := make(map[string]int) + for k, v := range _statuses { + status := statuses[k] - 48 + statusMap[v] = int(status) + } + if len(_lastStatus) == 0 { + _lastStatus = statusMap + return statusMap + } else { + updateMap := make(map[string]int) + for k, v := range _lastStatus { + newv := statusMap[k] + if newv != v { + updateMap[k] = newv + } + } + _lastStatus = statusMap + return updateMap + } +} + +var _registers = map[int][2]string{ + 0: {"l1.l2.voltage", WORD}, + 1: {"l2.l3.voltage", WORD}, + 2: {"l3.l1.voltage", WORD}, + 3: {"l1.l0.voltage", WORD}, + 4: {"l2.l0.voltage", WORD}, + 5: {"l3.l0.voltage", WORD}, + 6: {"l1.current", WORD}, + 7: {"l2.current", WORD}, + 8: {"l3.current", WORD}, + 9: {"frequency", WORD}, + 10: {"total.kw", WORD}, + 11: {"rate.kw", WORD}, + 12: {"total.pf", SWORD}, + 13: {"l1.kw", WORD}, + 14: {"l1.pf", SWORD}, + 15: {"l2.kw", WORD}, + 16: {"l2.pf", SWORD}, + 17: {"l3.kw", WORD}, + 18: {"l3.pf", SWORD}, + 19: {"total.kvar", SWORD}, + 20: {"l1.kvar", SWORD}, + 21: {"l2.kvar", SWORD}, + 22: {"l3.kvar", SWORD}, + 23: {"total.kva", WORD}, + 24: {"l1.kva", WORD}, + 25: {"l2.kva", WORD}, + 26: {"l3.kva", WORD}, + 32: {"oil.pressure", SWORD}, + 33: {"coolant.temp", SWORD}, + 34: {"engine.rpm", WORD}, + 35: {"battery.voltage", WORD}, + 36: {"fuel.pressure", WORD}, + 37: {"fuel.temp", SWORD}, + 38: {"fuel.rate", WORD}, + 40: {"coolant.pressure", WORD}, + 41: {"coolant.level", WORD}, + 42: {"oil.temp", SWORD}, + 43: {"oil.level", WORD}, + 44: {"crankcase.pressure", WORD}, + 45: {"ambient.temp", SWORD}, + 46: {"ecm.battery.voltage", WORD}, + 48: {"intake.temp", SWORD}, + 49: {"intake.pressure", WORD}, +} + +var _statuses = map[int]string{ + 0: "fault.emstop", + 1: "fault.overspeed", + 2: "fault.overcrank", + 5: "fault.coolant.temp.low", + 6: "fault.fuel.low", + 7: "fault.engine.temp.high", + 8: "fault.oil.pressure.low", + 9: "status.notinauto", + 11: "fault.batt.volt.low", + 12: "fault.batt.volt.high", + 14: "status.system.ready", + 19: "fault.coolant.level.low", + 27: "fault.crank.volt.low", + 34: "status.eps", + 39: "status.gen.running", + 207: "fault.common", +} + +var _lastStatus = map[string]int{} diff --git a/responseManager.go b/responseManager.go index 8852e2a..3a793c9 100644 --- a/responseManager.go +++ b/responseManager.go @@ -25,7 +25,7 @@ type responseListener struct { } func (rman *_responseManager) addPacket(p Packet) { // Add a packet to the response map and start waiting for it - rl := &responseListener{p, time.NewTimer(10 * time.Second), make(chan error), client.resend} + rl := &responseListener{p, time.NewTimer(10 * time.Second), make(chan error), socketClient.packetResend} rman.lock.Lock() // Safe map access defer rman.lock.Unlock() rman.mapping[p.ID] = rl @@ -64,7 +64,7 @@ func (rl *responseListener) awaitResponse() { // Gets a message from the timer o select { case <-rl.timer.C: // Timer expires, no response fmt.Printf("Didn't get a response for packet %v, queueing for resend...\n", rl.packet.ID) - client.resend <- rl.packet + socketClient.packetResend <- rl.packet responseManager.remove(rl.packet.ID) case err := <-rl.recv: // We got a response @@ -72,7 +72,7 @@ func (rl *responseListener) awaitResponse() { // Gets a message from the timer o if err != nil { // Something went wrong, resend TODO: probably check to make sure this is a server-side error fmt.Printf("Response to packet %v was an error: %v\n", rl.packet.ID, err) fmt.Printf("Queueing packet %v for resend...\n", rl.packet.ID) - client.resend <- rl.packet + socketClient.packetResend <- rl.packet } responseManager.remove(rl.packet.ID) } diff --git a/server.go b/server.go index a7ab522..bac8d7b 100644 --- a/server.go +++ b/server.go @@ -1,16 +1,17 @@ package main import ( - "math/rand" "time" ) // This file should only handle initialization of actors func main() { - go dataReadLoop() // Start our async read loop - client.start() // Start our client - go dataSendLoop() // Start our loop to tell the client when to send + go dataReadLoop() // Start our async read loop + socketClient.start() // Start our client + modbusClient.start() + go dataSendLoop() // Start our loop to tell the client when to send + go statusReadLoop() // Start our loop to read status updates select {} // Immortality } @@ -18,7 +19,7 @@ func main() { func dataSendLoop() { for { time.Sleep(time.Second * dataSendPeriod) - client.sendData() + socketClient.sendData() } } @@ -26,7 +27,7 @@ func dataSendLoop() { func dataReadLoop() { // Read data all day every day for { dataBuffer := make(map[string][]StampedReading) - reading := readData() // TODO: error handling + reading := modbusClient.readAllRegisterData() // TODO: error handling for metric, value := range reading { // Put our data in the buffer if record, found := dataBuffer[metric]; !found { dataBuffer[metric] = []StampedReading{value} @@ -34,22 +35,18 @@ func dataReadLoop() { // Read data all day every day dataBuffer[metric] = append(record, value) } } - client.send <- dataBuffer + socketClient.dataSend <- dataBuffer time.Sleep(time.Second * dataReadPeriod) } } -var oldReadings = map[string]StampedReading{} - -func readData() map[string]StampedReading { - newReadings := map[string]StampedReading{} - for _, element := range metrics { - newRead := StampedReading{ - int(time.Now().Unix()), - 0.875*oldReadings[element.Name].Value + 0.125*((rand.Float32()*(element.UBound-element.LBound))+element.LBound), +func statusReadLoop() { // + for { + updates := modbusClient.getStatusUpdates() + if len(updates) > 0 { + packetData := StatusUpdates{int(time.Now().Unix()), updates} + go socketClient.sendStatusUpdates(packetData) // Sends statuses immediately } - newReadings[element.Name] = newRead + time.Sleep(time.Second * statusReadPeriod) } - oldReadings = newReadings - return newReadings } diff --git a/socketManager.go b/socketManager.go index 9fadfd7..6c9b5cc 100644 --- a/socketManager.go +++ b/socketManager.go @@ -13,35 +13,37 @@ import ( "time" ) -var client = &_client{ // Our global client +var socketClient = &_socketClient{ // Our global client &websocket.Conn{}, sync.Mutex{}, sync.WaitGroup{}, + make(chan map[string]int), make(chan map[string][]StampedReading), make(chan Packet), make(map[string][]StampedReading), sync.Mutex{}, } -type _client struct { +type _socketClient struct { // Client struct definition - conn *websocket.Conn // The websocket connection - connLock sync.Mutex // Lock to manage concurrent writing - wait sync.WaitGroup // Waitgroup to manage connection (re)initialization - send chan map[string][]StampedReading // Channel to accept incoming data to send - resend chan Packet // Channel to accept packets that need resending - sendBuffer map[string][]StampedReading // Buffer to store data to be sent - bufferLock sync.Mutex // Lock to control concurrent access to send buffer + conn *websocket.Conn // The websocket connection + connLock sync.Mutex // Lock to manage concurrent writing + wait sync.WaitGroup // Waitgroup to manage connection (re)initialization + statusSend chan map[string]int + dataSend chan map[string][]StampedReading // Channel to accept incoming data to send + packetResend chan Packet // Channel to accept packets that need resending + sendBuffer map[string][]StampedReading // Buffer to store data to be sent + bufferLock sync.Mutex // Lock to control concurrent access to send buffer } -func (c *_client) start() { - client.wait.Add(1) // Make sure other loops don't do anything without a valid client +func (c *_socketClient) start() { + c.wait.Add(1) // Make sure other loops don't do anything without a valid client go c.connect() // Do the connecty thing go c.handlePackets() // Loop to handle incoming packets from send and resend channels go c.readIncoming() // Loop to read incoming messages from the gateway } -func (c *_client) connect() { +func (c *_socketClient) connect() { c.connLock.Lock() // Shouldn't matter because the other processes will be waiting, but I'm gonna do it anyways defer c.connLock.Unlock() @@ -72,7 +74,7 @@ func (c *_client) connect() { } } -func (c *_client) addReadings(readings map[string][]StampedReading) { +func (c *_socketClient) addReadings(readings map[string][]StampedReading) { c.bufferLock.Lock() defer c.bufferLock.Unlock() for metric, value := range readings { // Put our data in the buffer @@ -84,23 +86,46 @@ func (c *_client) addReadings(readings map[string][]StampedReading) { } } -func (c *_client) handlePackets() { +func (c *_socketClient) handlePackets() { for { select { - case readings := <-c.send: // Data packet from the send channel + case readings := <-c.dataSend: // Data packet from the send channel c.addReadings(readings) - case packet := <-c.resend: // Packet to be resent - var readings map[string][]StampedReading - if err := json.Unmarshal(packet.Message, &readings); err != nil { // Pull data out of the packet - fmt.Printf("Error unmarshalling data from packet %v to resend: %v\n", packet.ID, err) - } else { - c.addReadings(readings) + case packet := <-c.packetResend: // Packet to be resent + switch packet.Kind { + case kindData: + var readings map[string][]StampedReading + if err := json.Unmarshal(packet.Message, &readings); err != nil { // Pull data out of the packet + fmt.Printf("Error unmarshalling data from packet %v to resend: %v\n", packet.ID, err) + } else { + c.addReadings(readings) + } + case kindStatus: + var statuses StatusUpdates + if err := json.Unmarshal(packet.Message, &statuses); err != nil { + fmt.Printf("Error unmarshalling status updates from packet %v to resend: %v\n", packet.ID, err) + } else { + c.sendStatusUpdates(statuses) + } } } } } -func (c *_client) sendData() { +func (c *_socketClient) sendPacket(p Packet) { + fmt.Printf("Sending %v packet with ID %v\n", p.Kind, p.ID) + c.connLock.Lock() // Lock to send + if err := c.conn.WriteJSON(p); err != nil { // Try to send + fmt.Printf("Error sending %v packet %v: %v\n", p.Kind, p.ID, err) // If we fail, try to reconnect + c.wait.Add(1) // Prevent client things from happening + go c.connect() // Reconnect + } + c.connLock.Unlock() // Unlock after + + responseManager.addPacket(p) // Wait for a response -- also requeues the data if sending it failed the first time. +} + +func (c *_socketClient) sendData() { c.wait.Wait() // Make sure client is connected c.bufferLock.Lock() // Lock to grab and replace buffer buffer := c.sendBuffer // Grab all the stuff there is to send @@ -115,20 +140,23 @@ func (c *_client) sendData() { } p := Packet{nextId(), kindData, byteMsg} // Wrap it into a packet - fmt.Printf("Sending packet %v\n", p.ID) + c.sendPacket(p) +} - c.connLock.Lock() // Lock to send - if err := c.conn.WriteJSON(p); err != nil { // Try to send - fmt.Printf("Error sending %v packet %v: %v\n", p.Kind, p.ID, err) // If we fail, try to reconnect - c.wait.Add(1) // Prevent client things from happening - go c.connect() // Reconnect +func (c *_socketClient) sendStatusUpdates(u StatusUpdates) { + c.wait.Wait() + byteMsg, err := json.Marshal(u) + if err != nil { + fmt.Printf("Error marshalling data: %v\n", err) + fmt.Println("NOT RETRYING") // TODO: probably something + return } - c.connLock.Unlock() // Unlock after - responseManager.addPacket(p) // Wait for a response -- also requeues the data if sending it failed the first time. + p := Packet{nextId(), kindStatus, byteMsg} + c.sendPacket(p) } -func (c *_client) readIncoming() { // Read messages from gateway +func (c *_socketClient) readIncoming() { // Read messages from gateway for { c.wait.Wait() // Wait for the client to be ready var packet Packet