diff --git a/responseManager.go b/responseManager.go new file mode 100644 index 0000000..8852e2a --- /dev/null +++ b/responseManager.go @@ -0,0 +1,79 @@ +package main + +import ( + "errors" + "fmt" + "sync" + "time" +) + +var responseManager = &_responseManager{make(map[int]*responseListener), sync.Mutex{}, make(chan int)} + +type _responseManager struct { + // Struct to manage which responses we are waiting for + mapping map[int]*responseListener + lock sync.Mutex + resolved chan int +} + +type responseListener struct { + // Struct to manage waiting for responses + packet Packet + timer *time.Timer + recv chan error + resend chan Packet +} + +func (rman *_responseManager) addPacket(p Packet) { // Add a packet to the response map and start waiting for it + rl := &responseListener{p, time.NewTimer(10 * time.Second), make(chan error), client.resend} + rman.lock.Lock() // Safe map access + defer rman.lock.Unlock() + rman.mapping[p.ID] = rl + go rl.awaitResponse() // Start waiting +} + +func (rman *_responseManager) received(p Packet) { + rman.lock.Lock() + listener, found := rman.mapping[p.ID] // Get the response listener + rman.lock.Unlock() + + if !found { + fmt.Printf("Well, we received a response for packet %v, which is unexpected because there's no listener for it\n", p.ID) + return + } + + var err error = nil + + message := string(p.Message) // See what the message was + if message != messageReceived { // The only one that's implemented right now is messageReceived so this shouldn't fail + err = errors.New(fmt.Sprintf("Unexpected response message for packet %v: %v\n", p.ID, message)) + } else { + fmt.Printf("Got good response for packet %v\n", p.ID) + } + + listener.recv <- err +} + +func (rman *_responseManager) remove(id int) { + rman.lock.Lock() + defer rman.lock.Unlock() + delete(rman.mapping, id) +} + +func (rl *responseListener) awaitResponse() { // Gets a message from the timer or the Recv channel + select { + case <-rl.timer.C: // Timer expires, no response + fmt.Printf("Didn't get a response for packet %v, queueing for resend...\n", rl.packet.ID) + client.resend <- rl.packet + responseManager.remove(rl.packet.ID) + + case err := <-rl.recv: // We got a response + rl.timer.Stop() // Stop that timer + if err != nil { // Something went wrong, resend TODO: probably check to make sure this is a server-side error + fmt.Printf("Response to packet %v was an error: %v\n", rl.packet.ID, err) + fmt.Printf("Queueing packet %v for resend...\n", rl.packet.ID) + client.resend <- rl.packet + } + responseManager.remove(rl.packet.ID) + } +} diff --git a/server.go b/server.go index 4d37d47..060ff4b 100644 --- a/server.go +++ b/server.go @@ -1,26 +1,31 @@ package main import ( - "fmt" "math/rand" "time" ) -var dataBuffer = make(map[string][]StampedReading) // Map to store readings in until they are sent +// This file should only handle initialization of actors func main() { - go dataReadLoop() // Start our async read loop - client.wait.Add(1) // Make sure data and heartbeat loops don't do anything without a valid client - InitializeClient() // Get our client in this thread - go client.readIncoming() // Start our loop to read incoming messages - go dataSendLoop() // Start our async send loop - go heartbeatLoop() // Start our async heartbeat loop + go dataReadLoop() // Start our async read loop + client.start() // Start our client + go dataSendLoop() // Start our loop to tell the client when to send select {} // Immortality } +func dataSendLoop() { + for { + time.Sleep(time.Second * dataSendPeriod) + client.sendData() + } +} + +// TODO: when we have a real modbus connection the next two funcs will be moved into a modbus manager func dataReadLoop() { // Read data all day every day for { + dataBuffer := make(map[string][]StampedReading) reading := readData() // TODO: error handling for metric, value := range reading { // Put our data in the buffer if record, found := dataBuffer[metric]; !found { @@ -29,31 +34,12 @@ func dataReadLoop() { // Read data all day every day dataBuffer[metric] = append(record, value) } } + client.send <- dataBuffer time.Sleep(time.Second * dataReadPeriod) } } -func dataSendLoop() { - for { - time.Sleep(time.Second * dataSendPeriod) - if len(dataBuffer) == 0 { // Only send if we have data - fmt.Println("No data to send... check logs (that don't exist yet) for data reading errors (also don't exist)") - continue - } - go client.sendData(dataBuffer) - dataBuffer = make(map[string][]StampedReading) - } -} - -func heartbeatLoop() { - for { - go client.sendHeartbeat() - time.Sleep(time.Second * heartbeatPeriod) - } -} - func readData() map[string]StampedReading { - // TODO: modbus return map[string]StampedReading{ "oil.temp": { Timestamp: int(time.Now().Unix()), diff --git a/socketManager.go b/socketManager.go index 23a422b..9fadfd7 100644 --- a/socketManager.go +++ b/socketManager.go @@ -4,7 +4,6 @@ import ( "crypto/tls" "crypto/x509" "encoding/json" - "errors" "fmt" "github.com/gorilla/websocket" "io/ioutil" @@ -14,19 +13,38 @@ import ( "time" ) -var client = &socketClient{&websocket.Conn{}, sync.Mutex{}, sync.WaitGroup{}} -var responseMap = &ResponseMap{make(map[int]*ResponseListener), sync.Mutex{}} +var client = &_client{ // Our global client + &websocket.Conn{}, + sync.Mutex{}, + sync.WaitGroup{}, + make(chan map[string][]StampedReading), + make(chan Packet), + make(map[string][]StampedReading), + sync.Mutex{}, +} -type socketClient struct { - // Manage our connection - conn *websocket.Conn - lock sync.Mutex - wait sync.WaitGroup +type _client struct { + // Client struct definition + conn *websocket.Conn // The websocket connection + connLock sync.Mutex // Lock to manage concurrent writing + wait sync.WaitGroup // Waitgroup to manage connection (re)initialization + send chan map[string][]StampedReading // Channel to accept incoming data to send + resend chan Packet // Channel to accept packets that need resending + sendBuffer map[string][]StampedReading // Buffer to store data to be sent + bufferLock sync.Mutex // Lock to control concurrent access to send buffer } -func InitializeClient() { - client.lock.Lock() // Shouldn't matter because the other processes will be waiting, but I'm gonna do it anyways - defer client.lock.Unlock() +func (c *_client) start() { + client.wait.Add(1) // Make sure other loops don't do anything without a valid client + go c.connect() // Do the connecty thing + go c.handlePackets() // Loop to handle incoming packets from send and resend channels + go c.readIncoming() // Loop to read incoming messages from the gateway +} + +func (c *_client) connect() { + c.connLock.Lock() // Shouldn't matter because the other processes will be waiting, but I'm gonna do it anyways + defer c.connLock.Unlock() + rootCAs := x509.NewCertPool() // Initialize a cert pool rootCert, _ := ioutil.ReadFile("rootCA.pem") // Read our root CA cert rootCAs.AppendCertsFromPEM(rootCert) // Add it to the pool @@ -37,49 +55,96 @@ func InitializeClient() { TLSClientConfig: config, } - for { + for { // Do this until we succeed conn, _, err := dialer.Dial(websocketEndpoint, getHeader()) // send our dial request with a header containing id if err != nil { fmt.Printf("Error getting websocket client: %v\n", err) - fmt.Println("Retrying in 5 seconds...") // TODO: back off after repeated failures + fmt.Println("Retrying in 5 seconds...") + // TODO: back off after repeated failures + // TODO: read data less frequently after repeated errors time.Sleep(5 * time.Second) continue } - client.conn = conn - client.wait.Done() // Now that we have a connection, let communication proceed + c.conn = conn + c.wait.Done() // Now that we have a connection, let communication proceed return } } -func (client *socketClient) sendData(data map[string][]StampedReading) { - byteMsg, err := json.Marshal(data) // Send data as a byte slice for easy unmarshalling on the gateway side +func (c *_client) addReadings(readings map[string][]StampedReading) { + c.bufferLock.Lock() + defer c.bufferLock.Unlock() + for metric, value := range readings { // Put our data in the buffer + if record, found := c.sendBuffer[metric]; !found { + c.sendBuffer[metric] = value // If there are no readings for this metric yet, add a new map key + } else { + c.sendBuffer[metric] = append(record, value...) // Otherwise just throw them in at the end + } + } +} + +func (c *_client) handlePackets() { + for { + select { + case readings := <-c.send: // Data packet from the send channel + c.addReadings(readings) + case packet := <-c.resend: // Packet to be resent + var readings map[string][]StampedReading + if err := json.Unmarshal(packet.Message, &readings); err != nil { // Pull data out of the packet + fmt.Printf("Error unmarshalling data from packet %v to resend: %v\n", packet.ID, err) + } else { + c.addReadings(readings) + } + } + } +} + +func (c *_client) sendData() { + c.wait.Wait() // Make sure client is connected + c.bufferLock.Lock() // Lock to grab and replace buffer + buffer := c.sendBuffer // Grab all the stuff there is to send + c.sendBuffer = make(map[string][]StampedReading) // Clear the buffer + c.bufferLock.Unlock() // Unlock after + + byteMsg, err := json.Marshal(buffer) // Send data as a byte slice for easy unmarshalling on the gateway side if err != nil { fmt.Printf("Error marshalling data: %v\n", err) - fmt.Println("NOT RETRYING") + fmt.Println("NOT RETRYING") // TODO: probably something return } - client.sendPacket(Packet{ID: nextId(), Kind: kindData, Message: byteMsg}) -} -func (client *socketClient) sendHeartbeat() { - client.sendPacket(Packet{ID: nextId(), Kind: kindHeartbeat, Message: nil}) -} + p := Packet{nextId(), kindData, byteMsg} // Wrap it into a packet + fmt.Printf("Sending packet %v\n", p.ID) -func (client *socketClient) sendPacket(p Packet) { - client.wait.Wait() // Make sure client is initialized - client.lock.Lock() // Lock to send - if err := client.conn.WriteJSON(p); err != nil { // Try to send + c.connLock.Lock() // Lock to send + if err := c.conn.WriteJSON(p); err != nil { // Try to send fmt.Printf("Error sending %v packet %v: %v\n", p.Kind, p.ID, err) // If we fail, try to reconnect - client.wait.Add(1) - go InitializeClient() // Reconnect + c.wait.Add(1) // Prevent client things from happening + go c.connect() // Reconnect + } + c.connLock.Unlock() // Unlock after + + responseManager.addPacket(p) // Wait for a response -- also requeues the data if sending it failed the first time. +} + +func (c *_client) readIncoming() { // Read messages from gateway + for { + c.wait.Wait() // Wait for the client to be ready + var packet Packet + if err := c.conn.ReadJSON(&packet); err != nil { // Get a packet + fmt.Printf("Error reading from websocket: %v\n", err) // Connection is screwed + c.wait.Add(1) + go c.connect() // Reconnect + } else { + switch packet.Kind { + case kindResponse: // Got a response + go responseManager.received(packet) // Tell the response map + default: + fmt.Printf("Got unknown packet type: %v\n", packet.Kind) + } + } } - client.lock.Unlock() // Unlock after - defer responseMap.addPacket(p) // Wait for a response -- also requeues the message if it failed the first time. - // FIXME: Upon reconnection, this spams data at the gateway in lots of small packets -- change to consolidate packets somehow. - // Probably combine the above change with some backoff for read times and send attempts. Works for now. - // Probably don't need to retry sending heartbeats - fmt.Printf("Sending packet %v\n", p.ID) } func getHeader() http.Header { @@ -103,81 +168,3 @@ func nextId() int { // Make sure we get a different number every time this is ca idManager._id++ return idManager._id } - -type ResponseListener struct { - // Struct to manage waiting for responses - Packet Packet - Timer *time.Timer - Recv chan error -} - -type ResponseMap struct { - // Struct to manage which responses we are waiting for - mapping map[int]*ResponseListener - lock sync.Mutex -} - -func (rmap *ResponseMap) addPacket(p Packet) { // Add a packet to the response map and start waiting for it - timer := time.NewTimer(10 * time.Second) - rl := &ResponseListener{p, timer, make(chan error)} - rmap.lock.Lock() // Safe map access - defer rmap.lock.Unlock() - rmap.mapping[p.ID] = rl - go rl.awaitResponse() // Start waiting -} - -func (rmap *ResponseMap) received(p Packet) { - rmap.lock.Lock() - listener, found := rmap.mapping[p.ID] // Get the response listener - rmap.lock.Unlock() - - message := string(p.Message) // See what the message was - - if message != messageReceived { // The only one that's implemented right now is messageReceived so this shouldn't fail - listener.Recv <- errors.New(fmt.Sprintf("Unexpected response message for packet %v: %v\n", p.ID, message)) - return - } - - if found { // Got our listener - fmt.Printf("Got good response for packet %v\n", p.ID) - listener.Recv <- nil // Tell it there's no error - rmap.lock.Lock() - delete(rmap.mapping, p.ID) // Get rid of it in the response map - rmap.lock.Unlock() - } -} - -func (rl *ResponseListener) awaitResponse() { // Gets a message from the timer or the Recv channel - select { - case <-rl.Timer.C: // Timer expires, no response - fmt.Printf("Didn't get a response for packet %v, resending...\n", rl.Packet.ID) - go client.sendPacket(rl.Packet) - - case err := <-rl.Recv: // We got a response - rl.Timer.Stop() // Stop that timer - if err != nil { // Something went wrong, resend TODO: probably check to make sure this is a server-side error - fmt.Printf("Response to packet %v was an error: %v\n", rl.Packet.ID, err) - fmt.Printf("Resending packet %v...\n", rl.Packet.ID) - go client.sendPacket(rl.Packet) - } - } -} - -func (client *socketClient) readIncoming() { // Read messages from gateway - for { - client.wait.Wait() // Wait for the client to be ready - var packet Packet - if err := client.conn.ReadJSON(&packet); err != nil { // Get a packet - fmt.Printf("Error reading from websocket: %v\n", err) // Connection is screwed - client.wait.Add(1) - go InitializeClient() // Reconnect - } else { - switch packet.Kind { - case kindResponse: // Got a response - go responseMap.received(packet) // Tell the response map - default: - fmt.Printf("Got unknown packet type: %v\n", packet.Kind) - } - } - } -}