Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Better organization hand handling of data resending
  • Loading branch information
wrr14001 committed Apr 1, 2019
1 parent d5df703 commit 57b36ec
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 141 deletions.
79 changes: 79 additions & 0 deletions responseManager.go
@@ -0,0 +1,79 @@
package main

import (
"errors"
"fmt"
"sync"
"time"
)

var responseManager = &_responseManager{make(map[int]*responseListener), sync.Mutex{}, make(chan int)}

type _responseManager struct {
// Struct to manage which responses we are waiting for
mapping map[int]*responseListener
lock sync.Mutex
resolved chan int
}

type responseListener struct {
// Struct to manage waiting for responses
packet Packet
timer *time.Timer
recv chan error
resend chan Packet
}

func (rman *_responseManager) addPacket(p Packet) { // Add a packet to the response map and start waiting for it
rl := &responseListener{p, time.NewTimer(10 * time.Second), make(chan error), client.resend}
rman.lock.Lock() // Safe map access
defer rman.lock.Unlock()
rman.mapping[p.ID] = rl
go rl.awaitResponse() // Start waiting
}

func (rman *_responseManager) received(p Packet) {
rman.lock.Lock()
listener, found := rman.mapping[p.ID] // Get the response listener
rman.lock.Unlock()

if !found {
fmt.Printf("Well, we received a response for packet %v, which is unexpected because there's no listener for it\n", p.ID)
return
}

var err error = nil

message := string(p.Message) // See what the message was
if message != messageReceived { // The only one that's implemented right now is messageReceived so this shouldn't fail
err = errors.New(fmt.Sprintf("Unexpected response message for packet %v: %v\n", p.ID, message))
} else {
fmt.Printf("Got good response for packet %v\n", p.ID)
}

listener.recv <- err
}

func (rman *_responseManager) remove(id int) {
rman.lock.Lock()
defer rman.lock.Unlock()
delete(rman.mapping, id)
}

func (rl *responseListener) awaitResponse() { // Gets a message from the timer or the Recv channel
select {
case <-rl.timer.C: // Timer expires, no response
fmt.Printf("Didn't get a response for packet %v, queueing for resend...\n", rl.packet.ID)
client.resend <- rl.packet
responseManager.remove(rl.packet.ID)

case err := <-rl.recv: // We got a response
rl.timer.Stop() // Stop that timer
if err != nil { // Something went wrong, resend TODO: probably check to make sure this is a server-side error
fmt.Printf("Response to packet %v was an error: %v\n", rl.packet.ID, err)
fmt.Printf("Queueing packet %v for resend...\n", rl.packet.ID)
client.resend <- rl.packet
}
responseManager.remove(rl.packet.ID)
}
}
42 changes: 14 additions & 28 deletions server.go
@@ -1,26 +1,31 @@
package main

import (
"fmt"
"math/rand"
"time"
)

var dataBuffer = make(map[string][]StampedReading) // Map to store readings in until they are sent
// This file should only handle initialization of actors

func main() {
go dataReadLoop() // Start our async read loop
client.wait.Add(1) // Make sure data and heartbeat loops don't do anything without a valid client
InitializeClient() // Get our client in this thread
go client.readIncoming() // Start our loop to read incoming messages
go dataSendLoop() // Start our async send loop
go heartbeatLoop() // Start our async heartbeat loop
go dataReadLoop() // Start our async read loop
client.start() // Start our client
go dataSendLoop() // Start our loop to tell the client when to send

select {} // Immortality
}

func dataSendLoop() {
for {
time.Sleep(time.Second * dataSendPeriod)
client.sendData()
}
}

// TODO: when we have a real modbus connection the next two funcs will be moved into a modbus manager
func dataReadLoop() { // Read data all day every day
for {
dataBuffer := make(map[string][]StampedReading)
reading := readData() // TODO: error handling
for metric, value := range reading { // Put our data in the buffer
if record, found := dataBuffer[metric]; !found {
Expand All @@ -29,31 +34,12 @@ func dataReadLoop() { // Read data all day every day
dataBuffer[metric] = append(record, value)
}
}
client.send <- dataBuffer
time.Sleep(time.Second * dataReadPeriod)
}
}

func dataSendLoop() {
for {
time.Sleep(time.Second * dataSendPeriod)
if len(dataBuffer) == 0 { // Only send if we have data
fmt.Println("No data to send... check logs (that don't exist yet) for data reading errors (also don't exist)")
continue
}
go client.sendData(dataBuffer)
dataBuffer = make(map[string][]StampedReading)
}
}

func heartbeatLoop() {
for {
go client.sendHeartbeat()
time.Sleep(time.Second * heartbeatPeriod)
}
}

func readData() map[string]StampedReading {
// TODO: modbus
return map[string]StampedReading{
"oil.temp": {
Timestamp: int(time.Now().Unix()),
Expand Down

0 comments on commit 57b36ec

Please sign in to comment.