Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Merge branch 'develop' into emulation_generator
# Conflicts:
#	constants.go
#	socketManager.go

Implemented emulated status updates
  • Loading branch information
Evan Langlais committed Apr 30, 2019
2 parents 89aa727 + 2033523 commit b5b1585
Show file tree
Hide file tree
Showing 6 changed files with 365 additions and 55 deletions.
17 changes: 12 additions & 5 deletions constants.go
Expand Up @@ -10,16 +10,23 @@ const (

// message types
// outgoing
kindData = "data"
kindHeartbeat = "heartbeat"
kindData = "data"
kindStatus = "status"
// incoming
kindResponse = "response"
messageReceived = "received"

// Time (in seconds) to wait between performing read/send operations
dataReadPeriod = 300
dataSendPeriod = 300
heartbeatPeriod = 10
dataReadPeriod = 300
dataSendPeriod = 300
statusReadPeriod = 300

// Modbus data types
WORD = "WORD"
SWORD = "SWORD"

// Maximum number of modbus registers we can read at a time
maxRegisterRead = 50
)

var metrics = [...]Metric{
Expand Down
9 changes: 7 additions & 2 deletions dataTypes.go
@@ -1,8 +1,8 @@
package main

type StampedReading struct {
Timestamp int `json:"timestamp"`
Value float32 `json:"value"`
Timestamp int `json:"timestamp"`
Value int32 `json:"value"`
}

type Packet struct {
Expand All @@ -16,3 +16,8 @@ type Metric struct {
LBound float32
UBound float32
}

type StatusUpdates struct {
Timestamp int `json:"timestamp"`
Updates map[string]int `json:"updates"`
}
233 changes: 233 additions & 0 deletions modbusManager.go
@@ -0,0 +1,233 @@
package main

import (
"encoding/binary"
"fmt"
"github.com/goburrow/modbus"
"sort"
"strconv"
"strings"
"sync"
"time"
)

var modbusClient = &_modbusClient{
modbus.NewClient(nil),
sync.Mutex{},
sync.WaitGroup{},
_registers,
}

type _modbusClient struct {
conn modbus.Client
lock sync.Mutex
wait sync.WaitGroup
registers map[int][2]string
}

func (m *_modbusClient) start() {
m.wait.Add(1)
go m.connect()
}

func (m *_modbusClient) connect() {
handler := modbus.NewRTUClientHandler("/dev/ttyUSB0")
handler.BaudRate = 19200
handler.DataBits = 8
handler.Parity = "N"
handler.StopBits = 1
handler.SlaveId = 1
handler.Timeout = 5 * time.Second
if err := handler.Connect(); err != nil {
fmt.Printf("Error connecting to modbus RTU client: %v\n", err)
}
m.conn = modbus.NewClient(handler)
m.wait.Done()
}

func (m *_modbusClient) readHoldingRegisters(start uint16, num uint16) []byte {
m.wait.Wait()
m.lock.Lock()
defer m.lock.Unlock()
data, err := m.conn.ReadHoldingRegisters(start, num)
if err != nil {
fmt.Printf("Error reading %v holding registers starting with %v: %v\n", start, num, err)
return []byte{}
} else {
return data
}
}

func (m *_modbusClient) readAllRegisterData() map[string]StampedReading {
m.wait.Wait() // Wait for client to start up
var keys []int // Array for register numbers
for k := range m.registers {
keys = append(keys, k)
}
sort.Ints(keys) // Make sure it's ascending

results := make(map[string]StampedReading) // Result map
intervals := m.readsForAllRegisters() // Modbus reading intervals
var data []byte // Data from modbus

m.lock.Lock() // One reader at a time to be safe
for _, interval := range intervals { // Do readings for all intervals
start := uint16(interval[0])
number := uint16(interval[1])
reading, err := m.conn.ReadHoldingRegisters(start, number) // Read the registers
if err != nil { // Make sure it worked
fmt.Printf("Error reading %v registers starting at %v: %v\n", number, start, err)
continue
}
data = append(data, reading...) // Accumulate the data
}
m.lock.Unlock()

timestamp := int(time.Now().Unix())
for _, interval := range intervals { // Do mappings for all intervals
start := uint16(interval[0])
number := uint16(interval[1])
for r := 0; r < int(start+number); r++ { // Check all the registers in this range
if val, found := m.registers[r]; found { // If we have a metric for this, add it to results
switch val[1] {
case WORD:
results[val[0]] = StampedReading{timestamp, int32(binary.LittleEndian.Uint16(data[2*r : 2*r+2]))}
case SWORD:
results[val[0]] = StampedReading{timestamp, int32(int16(binary.LittleEndian.Uint16(data[2*r : 2*r+2])))}
}
}
}
}

return results
}

func (m *_modbusClient) readsForAllRegisters() [][2]int {
var reads [][2]int
var keys []int
for k := range m.registers {
keys = append(keys, k)
}
sort.Ints(keys)

startReg := keys[0]
var i int
for i = 0; i < len(keys); i++ {
if keys[i]-startReg > 49 {
reads = append(reads, [2]int{startReg, keys[i-1] - startReg + 1})
startReg = keys[i]
}
}
reads = append(reads, [2]int{startReg, keys[i-1] - startReg + 1})

return reads
}

func reverse(s string) string {
runes := []rune(s)
for i, j := 0, len(runes)-1; i < j; i, j = i+1, j-1 {
runes[i], runes[j] = runes[j], runes[i]
}
return string(runes)
}

func (m *_modbusClient) getStatusCodes() string {
data := m.readHoldingRegisters(uint16(1499), uint16(16))
binstr := make([]string, len(data))
for i := 0; i < len(data); i += 2 {
binstr[i] = reverse(fmt.Sprintf("%08s", strconv.FormatInt(int64(data[i+1]), 2)))
binstr[i+1] = reverse(fmt.Sprintf("%08s", strconv.FormatInt(int64(data[i]), 2)))
}
return strings.Join(binstr, "")
}

func (m *_modbusClient) getStatusUpdates() map[string]int {
statuses := m.getStatusCodes()
fmt.Printf("Statuses: %v\n", statuses)
statusMap := make(map[string]int)
for k, v := range _statuses {
status := statuses[k] - 48
statusMap[v] = int(status)
}
if len(_lastStatus) == 0 {
_lastStatus = statusMap
return statusMap
} else {
updateMap := make(map[string]int)
for k, v := range _lastStatus {
newv := statusMap[k]
if newv != v {
updateMap[k] = newv
}
}
_lastStatus = statusMap
return updateMap
}
}

var _registers = map[int][2]string{
0: {"l1.l2.voltage", WORD},
1: {"l2.l3.voltage", WORD},
2: {"l3.l1.voltage", WORD},
3: {"l1.l0.voltage", WORD},
4: {"l2.l0.voltage", WORD},
5: {"l3.l0.voltage", WORD},
6: {"l1.current", WORD},
7: {"l2.current", WORD},
8: {"l3.current", WORD},
9: {"frequency", WORD},
10: {"total.kw", WORD},
11: {"rate.kw", WORD},
12: {"total.pf", SWORD},
13: {"l1.kw", WORD},
14: {"l1.pf", SWORD},
15: {"l2.kw", WORD},
16: {"l2.pf", SWORD},
17: {"l3.kw", WORD},
18: {"l3.pf", SWORD},
19: {"total.kvar", SWORD},
20: {"l1.kvar", SWORD},
21: {"l2.kvar", SWORD},
22: {"l3.kvar", SWORD},
23: {"total.kva", WORD},
24: {"l1.kva", WORD},
25: {"l2.kva", WORD},
26: {"l3.kva", WORD},
32: {"oil.pressure", SWORD},
33: {"coolant.temp", SWORD},
34: {"engine.rpm", WORD},
35: {"battery.voltage", WORD},
36: {"fuel.pressure", WORD},
37: {"fuel.temp", SWORD},
38: {"fuel.rate", WORD},
40: {"coolant.pressure", WORD},
41: {"coolant.level", WORD},
42: {"oil.temp", SWORD},
43: {"oil.level", WORD},
44: {"crankcase.pressure", WORD},
45: {"ambient.temp", SWORD},
46: {"ecm.battery.voltage", WORD},
48: {"intake.temp", SWORD},
49: {"intake.pressure", WORD},
}

var _statuses = map[int]string{
0: "fault.emstop",
1: "fault.overspeed",
2: "fault.overcrank",
5: "fault.coolant.temp.low",
6: "fault.fuel.low",
7: "fault.engine.temp.high",
8: "fault.oil.pressure.low",
9: "status.notinauto",
11: "fault.batt.volt.low",
12: "fault.batt.volt.high",
14: "status.system.ready",
19: "fault.coolant.level.low",
27: "fault.crank.volt.low",
34: "status.eps",
39: "status.gen.running",
207: "fault.common",
}

var _lastStatus = map[string]int{}
10 changes: 5 additions & 5 deletions responseManager.go
Expand Up @@ -25,7 +25,7 @@ type responseListener struct {
}

func (rman *_responseManager) addPacket(p Packet) { // Add a packet to the response map and start waiting for it
rl := &responseListener{p, time.NewTimer(10 * time.Second), make(chan error), client.resend}
rl := &responseListener{p, time.NewTimer(10 * time.Second), make(chan error), socketClient.packetResend}
rman.lock.Lock() // Safe map access
defer rman.lock.Unlock()
rman.mapping[p.ID] = rl
Expand All @@ -44,7 +44,7 @@ func (rman *_responseManager) received(p Packet) {

var err error = nil

message := string(p.Message) // See what the message was
message := string(p.Message) // See what the message was
if message != messageReceived { // The only one that's implemented right now is messageReceived so this shouldn't fail
err = errors.New(fmt.Sprintf("Unexpected response message for packet %v: %v\n", p.ID, message))
} else {
Expand All @@ -64,15 +64,15 @@ func (rl *responseListener) awaitResponse() { // Gets a message from the timer o
select {
case <-rl.timer.C: // Timer expires, no response
fmt.Printf("Didn't get a response for packet %v, queueing for resend...\n", rl.packet.ID)
client.resend <- rl.packet
socketClient.packetResend <- rl.packet
responseManager.remove(rl.packet.ID)

case err := <-rl.recv: // We got a response
rl.timer.Stop() // Stop that timer
rl.timer.Stop() // Stop that timer
if err != nil { // Something went wrong, resend TODO: probably check to make sure this is a server-side error
fmt.Printf("Response to packet %v was an error: %v\n", rl.packet.ID, err)
fmt.Printf("Queueing packet %v for resend...\n", rl.packet.ID)
client.resend <- rl.packet
socketClient.packetResend <- rl.packet
}
responseManager.remove(rl.packet.ID)
}
Expand Down

0 comments on commit b5b1585

Please sign in to comment.