diff --git a/constants.go b/constants.go index 1eba521..0058a5d 100644 --- a/constants.go +++ b/constants.go @@ -10,16 +10,23 @@ const ( // message types // outgoing - kindData = "data" - kindHeartbeat = "heartbeat" + kindData = "data" + kindStatus = "status" // incoming kindResponse = "response" messageReceived = "received" // Time (in seconds) to wait between performing read/send operations - dataReadPeriod = 300 - dataSendPeriod = 300 - heartbeatPeriod = 10 + dataReadPeriod = 300 + dataSendPeriod = 300 + statusReadPeriod = 300 + + // Modbus data types + WORD = "WORD" + SWORD = "SWORD" + + // Maximum number of modbus registers we can read at a time + maxRegisterRead = 50 ) var metrics = [...]Metric{ diff --git a/dataTypes.go b/dataTypes.go index 5e4369d..1504b4b 100644 --- a/dataTypes.go +++ b/dataTypes.go @@ -1,8 +1,8 @@ package main type StampedReading struct { - Timestamp int `json:"timestamp"` - Value float32 `json:"value"` + Timestamp int `json:"timestamp"` + Value int32 `json:"value"` } type Packet struct { @@ -16,3 +16,8 @@ type Metric struct { LBound float32 UBound float32 } + +type StatusUpdates struct { + Timestamp int `json:"timestamp"` + Updates map[string]int `json:"updates"` +} diff --git a/modbusManager.go b/modbusManager.go new file mode 100644 index 0000000..82b7a72 --- /dev/null +++ b/modbusManager.go @@ -0,0 +1,233 @@ +package main + +import ( + "encoding/binary" + "fmt" + "github.com/goburrow/modbus" + "sort" + "strconv" + "strings" + "sync" + "time" +) + +var modbusClient = &_modbusClient{ + modbus.NewClient(nil), + sync.Mutex{}, + sync.WaitGroup{}, + _registers, +} + +type _modbusClient struct { + conn modbus.Client + lock sync.Mutex + wait sync.WaitGroup + registers map[int][2]string +} + +func (m *_modbusClient) start() { + m.wait.Add(1) + go m.connect() +} + +func (m *_modbusClient) connect() { + handler := modbus.NewRTUClientHandler("/dev/ttyUSB0") + handler.BaudRate = 19200 + handler.DataBits = 8 + handler.Parity = "N" + handler.StopBits = 1 + handler.SlaveId = 1 + handler.Timeout = 5 * time.Second + if err := handler.Connect(); err != nil { + fmt.Printf("Error connecting to modbus RTU client: %v\n", err) + } + m.conn = modbus.NewClient(handler) + m.wait.Done() +} + +func (m *_modbusClient) readHoldingRegisters(start uint16, num uint16) []byte { + m.wait.Wait() + m.lock.Lock() + defer m.lock.Unlock() + data, err := m.conn.ReadHoldingRegisters(start, num) + if err != nil { + fmt.Printf("Error reading %v holding registers starting with %v: %v\n", start, num, err) + return []byte{} + } else { + return data + } +} + +func (m *_modbusClient) readAllRegisterData() map[string]StampedReading { + m.wait.Wait() // Wait for client to start up + var keys []int // Array for register numbers + for k := range m.registers { + keys = append(keys, k) + } + sort.Ints(keys) // Make sure it's ascending + + results := make(map[string]StampedReading) // Result map + intervals := m.readsForAllRegisters() // Modbus reading intervals + var data []byte // Data from modbus + + m.lock.Lock() // One reader at a time to be safe + for _, interval := range intervals { // Do readings for all intervals + start := uint16(interval[0]) + number := uint16(interval[1]) + reading, err := m.conn.ReadHoldingRegisters(start, number) // Read the registers + if err != nil { // Make sure it worked + fmt.Printf("Error reading %v registers starting at %v: %v\n", number, start, err) + continue + } + data = append(data, reading...) // Accumulate the data + } + m.lock.Unlock() + + timestamp := int(time.Now().Unix()) + for _, interval := range intervals { // Do mappings for all intervals + start := uint16(interval[0]) + number := uint16(interval[1]) + for r := 0; r < int(start+number); r++ { // Check all the registers in this range + if val, found := m.registers[r]; found { // If we have a metric for this, add it to results + switch val[1] { + case WORD: + results[val[0]] = StampedReading{timestamp, int32(binary.LittleEndian.Uint16(data[2*r : 2*r+2]))} + case SWORD: + results[val[0]] = StampedReading{timestamp, int32(int16(binary.LittleEndian.Uint16(data[2*r : 2*r+2])))} + } + } + } + } + + return results +} + +func (m *_modbusClient) readsForAllRegisters() [][2]int { + var reads [][2]int + var keys []int + for k := range m.registers { + keys = append(keys, k) + } + sort.Ints(keys) + + startReg := keys[0] + var i int + for i = 0; i < len(keys); i++ { + if keys[i]-startReg > 49 { + reads = append(reads, [2]int{startReg, keys[i-1] - startReg + 1}) + startReg = keys[i] + } + } + reads = append(reads, [2]int{startReg, keys[i-1] - startReg + 1}) + + return reads +} + +func reverse(s string) string { + runes := []rune(s) + for i, j := 0, len(runes)-1; i < j; i, j = i+1, j-1 { + runes[i], runes[j] = runes[j], runes[i] + } + return string(runes) +} + +func (m *_modbusClient) getStatusCodes() string { + data := m.readHoldingRegisters(uint16(1499), uint16(16)) + binstr := make([]string, len(data)) + for i := 0; i < len(data); i += 2 { + binstr[i] = reverse(fmt.Sprintf("%08s", strconv.FormatInt(int64(data[i+1]), 2))) + binstr[i+1] = reverse(fmt.Sprintf("%08s", strconv.FormatInt(int64(data[i]), 2))) + } + return strings.Join(binstr, "") +} + +func (m *_modbusClient) getStatusUpdates() map[string]int { + statuses := m.getStatusCodes() + fmt.Printf("Statuses: %v\n", statuses) + statusMap := make(map[string]int) + for k, v := range _statuses { + status := statuses[k] - 48 + statusMap[v] = int(status) + } + if len(_lastStatus) == 0 { + _lastStatus = statusMap + return statusMap + } else { + updateMap := make(map[string]int) + for k, v := range _lastStatus { + newv := statusMap[k] + if newv != v { + updateMap[k] = newv + } + } + _lastStatus = statusMap + return updateMap + } +} + +var _registers = map[int][2]string{ + 0: {"l1.l2.voltage", WORD}, + 1: {"l2.l3.voltage", WORD}, + 2: {"l3.l1.voltage", WORD}, + 3: {"l1.l0.voltage", WORD}, + 4: {"l2.l0.voltage", WORD}, + 5: {"l3.l0.voltage", WORD}, + 6: {"l1.current", WORD}, + 7: {"l2.current", WORD}, + 8: {"l3.current", WORD}, + 9: {"frequency", WORD}, + 10: {"total.kw", WORD}, + 11: {"rate.kw", WORD}, + 12: {"total.pf", SWORD}, + 13: {"l1.kw", WORD}, + 14: {"l1.pf", SWORD}, + 15: {"l2.kw", WORD}, + 16: {"l2.pf", SWORD}, + 17: {"l3.kw", WORD}, + 18: {"l3.pf", SWORD}, + 19: {"total.kvar", SWORD}, + 20: {"l1.kvar", SWORD}, + 21: {"l2.kvar", SWORD}, + 22: {"l3.kvar", SWORD}, + 23: {"total.kva", WORD}, + 24: {"l1.kva", WORD}, + 25: {"l2.kva", WORD}, + 26: {"l3.kva", WORD}, + 32: {"oil.pressure", SWORD}, + 33: {"coolant.temp", SWORD}, + 34: {"engine.rpm", WORD}, + 35: {"battery.voltage", WORD}, + 36: {"fuel.pressure", WORD}, + 37: {"fuel.temp", SWORD}, + 38: {"fuel.rate", WORD}, + 40: {"coolant.pressure", WORD}, + 41: {"coolant.level", WORD}, + 42: {"oil.temp", SWORD}, + 43: {"oil.level", WORD}, + 44: {"crankcase.pressure", WORD}, + 45: {"ambient.temp", SWORD}, + 46: {"ecm.battery.voltage", WORD}, + 48: {"intake.temp", SWORD}, + 49: {"intake.pressure", WORD}, +} + +var _statuses = map[int]string{ + 0: "fault.emstop", + 1: "fault.overspeed", + 2: "fault.overcrank", + 5: "fault.coolant.temp.low", + 6: "fault.fuel.low", + 7: "fault.engine.temp.high", + 8: "fault.oil.pressure.low", + 9: "status.notinauto", + 11: "fault.batt.volt.low", + 12: "fault.batt.volt.high", + 14: "status.system.ready", + 19: "fault.coolant.level.low", + 27: "fault.crank.volt.low", + 34: "status.eps", + 39: "status.gen.running", + 207: "fault.common", +} + +var _lastStatus = map[string]int{} diff --git a/responseManager.go b/responseManager.go index 8852e2a..845fd89 100644 --- a/responseManager.go +++ b/responseManager.go @@ -25,7 +25,7 @@ type responseListener struct { } func (rman *_responseManager) addPacket(p Packet) { // Add a packet to the response map and start waiting for it - rl := &responseListener{p, time.NewTimer(10 * time.Second), make(chan error), client.resend} + rl := &responseListener{p, time.NewTimer(10 * time.Second), make(chan error), socketClient.packetResend} rman.lock.Lock() // Safe map access defer rman.lock.Unlock() rman.mapping[p.ID] = rl @@ -44,7 +44,7 @@ func (rman *_responseManager) received(p Packet) { var err error = nil - message := string(p.Message) // See what the message was + message := string(p.Message) // See what the message was if message != messageReceived { // The only one that's implemented right now is messageReceived so this shouldn't fail err = errors.New(fmt.Sprintf("Unexpected response message for packet %v: %v\n", p.ID, message)) } else { @@ -64,15 +64,15 @@ func (rl *responseListener) awaitResponse() { // Gets a message from the timer o select { case <-rl.timer.C: // Timer expires, no response fmt.Printf("Didn't get a response for packet %v, queueing for resend...\n", rl.packet.ID) - client.resend <- rl.packet + socketClient.packetResend <- rl.packet responseManager.remove(rl.packet.ID) case err := <-rl.recv: // We got a response - rl.timer.Stop() // Stop that timer + rl.timer.Stop() // Stop that timer if err != nil { // Something went wrong, resend TODO: probably check to make sure this is a server-side error fmt.Printf("Response to packet %v was an error: %v\n", rl.packet.ID, err) fmt.Printf("Queueing packet %v for resend...\n", rl.packet.ID) - client.resend <- rl.packet + socketClient.packetResend <- rl.packet } responseManager.remove(rl.packet.ID) } diff --git a/server.go b/server.go index a7ab522..91cfc92 100644 --- a/server.go +++ b/server.go @@ -1,6 +1,7 @@ package main import ( + "math" "math/rand" "time" ) @@ -8,9 +9,11 @@ import ( // This file should only handle initialization of actors func main() { - go dataReadLoop() // Start our async read loop - client.start() // Start our client - go dataSendLoop() // Start our loop to tell the client when to send + go dataReadLoop() // Start our async read loop + socketClient.start() // Start our client + modbusClient.start() + go dataSendLoop() // Start our loop to tell the client when to send + go statusReadLoop() // Start our loop to read status updates select {} // Immortality } @@ -18,7 +21,7 @@ func main() { func dataSendLoop() { for { time.Sleep(time.Second * dataSendPeriod) - client.sendData() + socketClient.sendData() } } @@ -26,7 +29,7 @@ func dataSendLoop() { func dataReadLoop() { // Read data all day every day for { dataBuffer := make(map[string][]StampedReading) - reading := readData() // TODO: error handling + reading := readDummyData() // TODO: error handling for metric, value := range reading { // Put our data in the buffer if record, found := dataBuffer[metric]; !found { dataBuffer[metric] = []StampedReading{value} @@ -34,22 +37,56 @@ func dataReadLoop() { // Read data all day every day dataBuffer[metric] = append(record, value) } } - client.send <- dataBuffer + socketClient.dataSend <- dataBuffer time.Sleep(time.Second * dataReadPeriod) } } +func statusReadLoop() { // + for { + updates := readDummyStatuses() + if len(updates) > 0 { + packetData := StatusUpdates{int(time.Now().Unix()), updates} + go socketClient.sendStatusUpdates(packetData) // Sends statuses immediately + } + time.Sleep(time.Second * statusReadPeriod) + } +} + var oldReadings = map[string]StampedReading{} -func readData() map[string]StampedReading { +func readDummyData() map[string]StampedReading { newReadings := map[string]StampedReading{} for _, element := range metrics { newRead := StampedReading{ int(time.Now().Unix()), - 0.875*oldReadings[element.Name].Value + 0.125*((rand.Float32()*(element.UBound-element.LBound))+element.LBound), + int32(math.Round(0.875*float64(oldReadings[element.Name].Value) + 0.125*float64((rand.Float32()*(element.UBound-element.LBound))+element.LBound))), } newReadings[element.Name] = newRead } oldReadings = newReadings return newReadings } + +var oldStatuses = map[string]int{} + +func readDummyStatuses() map[string]int { + newStatuses := make(map[string]int) + for _, status := range _statuses { + newStatuses[status] = rand.Intn(2) + } + updateMap := make(map[string]int) + if len(oldStatuses) == 0 { + oldStatuses = newStatuses + updateMap = newStatuses + } else { + for k, v := range oldStatuses { + newv := newStatuses[k] + if newv != v { + updateMap[k] = newv + } + } + oldStatuses = newStatuses + } + return updateMap +} diff --git a/socketManager.go b/socketManager.go index b8a5404..3e24942 100644 --- a/socketManager.go +++ b/socketManager.go @@ -13,41 +13,43 @@ import ( "time" ) -var client = &_client{ // Our global client +var socketClient = &_socketClient{ // Our global client &websocket.Conn{}, sync.Mutex{}, sync.WaitGroup{}, + make(chan map[string]int), make(chan map[string][]StampedReading), make(chan Packet), make(map[string][]StampedReading), sync.Mutex{}, } -type _client struct { +type _socketClient struct { // Client struct definition - conn *websocket.Conn // The websocket connection - connLock sync.Mutex // Lock to manage concurrent writing - wait sync.WaitGroup // Waitgroup to manage connection (re)initialization - send chan map[string][]StampedReading // Channel to accept incoming data to send - resend chan Packet // Channel to accept packets that need resending - sendBuffer map[string][]StampedReading // Buffer to store data to be sent - bufferLock sync.Mutex // Lock to control concurrent access to send buffer + conn *websocket.Conn // The websocket connection + connLock sync.Mutex // Lock to manage concurrent writing + wait sync.WaitGroup // Waitgroup to manage connection (re)initialization + statusSend chan map[string]int + dataSend chan map[string][]StampedReading // Channel to accept incoming data to send + packetResend chan Packet // Channel to accept packets that need resending + sendBuffer map[string][]StampedReading // Buffer to store data to be sent + bufferLock sync.Mutex // Lock to control concurrent access to send buffer } -func (c *_client) start() { - client.wait.Add(1) // Make sure other loops don't do anything without a valid client +func (c *_socketClient) start() { + c.wait.Add(1) // Make sure other loops don't do anything without a valid client go c.connect() // Do the connecty thing go c.handlePackets() // Loop to handle incoming packets from send and resend channels go c.readIncoming() // Loop to read incoming messages from the gateway } -func (c *_client) connect() { +func (c *_socketClient) connect() { c.connLock.Lock() // Shouldn't matter because the other processes will be waiting, but I'm gonna do it anyways defer c.connLock.Unlock() - rootCAs := x509.NewCertPool() // Initialize a cert pool - rootCert, _ := ioutil.ReadFile("/opt/emulation-server/rootCA.pem") // Read our root CA cert - rootCAs.AppendCertsFromPEM(rootCert) // Add it to the pool + rootCAs := x509.NewCertPool() // Initialize a cert pool + rootCert, _ := ioutil.ReadFile("rootCA.pem") // Read our root CA cert + rootCAs.AppendCertsFromPEM(rootCert) // Add it to the pool config := &tls.Config{RootCAs: rootCAs} // Trust our root CA @@ -72,7 +74,7 @@ func (c *_client) connect() { } } -func (c *_client) addReadings(readings map[string][]StampedReading) { +func (c *_socketClient) addReadings(readings map[string][]StampedReading) { c.bufferLock.Lock() defer c.bufferLock.Unlock() for metric, value := range readings { // Put our data in the buffer @@ -84,23 +86,46 @@ func (c *_client) addReadings(readings map[string][]StampedReading) { } } -func (c *_client) handlePackets() { +func (c *_socketClient) handlePackets() { for { select { - case readings := <-c.send: // Data packet from the send channel + case readings := <-c.dataSend: // Data packet from the send channel c.addReadings(readings) - case packet := <-c.resend: // Packet to be resent - var readings map[string][]StampedReading - if err := json.Unmarshal(packet.Message, &readings); err != nil { // Pull data out of the packet - fmt.Printf("Error unmarshalling data from packet %v to resend: %v\n", packet.ID, err) - } else { - c.addReadings(readings) + case packet := <-c.packetResend: // Packet to be resent + switch packet.Kind { + case kindData: + var readings map[string][]StampedReading + if err := json.Unmarshal(packet.Message, &readings); err != nil { // Pull data out of the packet + fmt.Printf("Error unmarshalling data from packet %v to resend: %v\n", packet.ID, err) + } else { + c.addReadings(readings) + } + case kindStatus: + var statuses StatusUpdates + if err := json.Unmarshal(packet.Message, &statuses); err != nil { + fmt.Printf("Error unmarshalling status updates from packet %v to resend: %v\n", packet.ID, err) + } else { + c.sendStatusUpdates(statuses) + } } } } } -func (c *_client) sendData() { +func (c *_socketClient) sendPacket(p Packet) { + fmt.Printf("Sending %v packet with ID %v\n", p.Kind, p.ID) + c.connLock.Lock() // Lock to send + if err := c.conn.WriteJSON(p); err != nil { // Try to send + fmt.Printf("Error sending %v packet %v: %v\n", p.Kind, p.ID, err) // If we fail, try to reconnect + c.wait.Add(1) // Prevent client things from happening + go c.connect() // Reconnect + } + c.connLock.Unlock() // Unlock after + + responseManager.addPacket(p) // Wait for a response -- also requeues the data if sending it failed the first time. +} + +func (c *_socketClient) sendData() { c.wait.Wait() // Make sure client is connected c.bufferLock.Lock() // Lock to grab and replace buffer buffer := c.sendBuffer // Grab all the stuff there is to send @@ -115,20 +140,23 @@ func (c *_client) sendData() { } p := Packet{nextId(), kindData, byteMsg} // Wrap it into a packet - fmt.Printf("Sending packet %v\n", p.ID) + c.sendPacket(p) +} - c.connLock.Lock() // Lock to send - if err := c.conn.WriteJSON(p); err != nil { // Try to send - fmt.Printf("Error sending %v packet %v: %v\n", p.Kind, p.ID, err) // If we fail, try to reconnect - c.wait.Add(1) // Prevent client things from happening - go c.connect() // Reconnect +func (c *_socketClient) sendStatusUpdates(u StatusUpdates) { + c.wait.Wait() + byteMsg, err := json.Marshal(u) + if err != nil { + fmt.Printf("Error marshalling data: %v\n", err) + fmt.Println("NOT RETRYING") // TODO: probably something + return } - c.connLock.Unlock() // Unlock after - responseManager.addPacket(p) // Wait for a response -- also requeues the data if sending it failed the first time. + p := Packet{nextId(), kindStatus, byteMsg} + c.sendPacket(p) } -func (c *_client) readIncoming() { // Read messages from gateway +func (c *_socketClient) readIncoming() { // Read messages from gateway for { c.wait.Wait() // Wait for the client to be ready var packet Packet @@ -150,7 +178,7 @@ func (c *_client) readIncoming() { // Read messages from gateway func getHeader() http.Header { h := http.Header{} h.Set("Content-Type", "application/json") - h.Set(headerUUID, "0000000000000001") // TODO: get this from somewhere + h.Set(headerUUID, "0000000000000002") // TODO: get this from somewhere h.Set(headerTimestamp, strconv.FormatInt(time.Now().Unix(), 10)) return h }