From d5df70370a2e5dc891596c0992f6aff9c6e46207 Mon Sep 17 00:00:00 2001 From: William Reid Date: Sun, 3 Mar 2019 20:30:21 -0500 Subject: [PATCH] Changed back to websockets for simplifying bi-directional communication. Lots of organizational changes. --- constants.go | 11 ++- dataTypes.go | 10 ++- server.go | 169 +++++++++---------------------------------- socketManager.go | 183 +++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 232 insertions(+), 141 deletions(-) create mode 100644 socketManager.go diff --git a/constants.go b/constants.go index b0a4a07..3b417a1 100644 --- a/constants.go +++ b/constants.go @@ -2,13 +2,20 @@ package main const ( // Gateway addresses - dataEndpoint = "https://sd5-endpoint.engr.uconn.edu:48820/data" - heartbeatEndpoint = "https://sd5-endpoint.engr.uconn.edu:48820/heartbeat" + websocketEndpoint = "wss://sd5-endpoint.engr.uconn.edu:48820/ws" // Header constants headerUUID = "uuid" headerTimestamp = "time" + // message types + // outgoing + kindData = "data" + kindHeartbeat = "heartbeat" + // incoming + kindResponse = "response" + messageReceived = "received" + // Time (in seconds) to wait between performing read/send operations dataReadPeriod = 1 dataSendPeriod = 10 diff --git a/dataTypes.go b/dataTypes.go index 580dff5..93f9dc9 100644 --- a/dataTypes.go +++ b/dataTypes.go @@ -1,6 +1,12 @@ package main type StampedReading struct { - Timestamp int - Value interface{} + Timestamp int `json:"timestamp"` + Value interface{} `json:"value"` +} + +type Packet struct { + ID int `json:"id"` + Kind string `json:"kind"` + Message []byte `json:"message"` } diff --git a/server.go b/server.go index 8db30dd..4d37d47 100644 --- a/server.go +++ b/server.go @@ -1,95 +1,54 @@ package main import ( - "bytes" - "crypto/tls" - "crypto/x509" - "encoding/json" - "errors" "fmt" - "io/ioutil" "math/rand" - "net/http" - "strconv" "time" ) -type gatewayClient struct { - client *http.Client -} +var dataBuffer = make(map[string][]StampedReading) // Map to store readings in until they are sent func main() { - rootCAs := x509.NewCertPool() // Initialize a cert pool - rootCert, _ := ioutil.ReadFile("rootCA.pem") // Read our root CA cert - - if ok := rootCAs.AppendCertsFromPEM(rootCert); !ok { - fmt.Printf("Error adding gateway cert") - } - - config := &tls.Config{RootCAs: rootCAs,} // Trust our root CA - - tr := &http.Transport{TLSClientConfig: config,} - client := gatewayClient{&http.Client{Transport: tr}} - - dataReadChan := make(chan map[string]StampedReading) // Channel to handle data readings - dataBuffer := make(map[string][]StampedReading) // Map to store readings in until they are sent - - go dataReadLoop(dataReadChan) // Start our async read loop - dataSendTick := time.NewTicker(time.Second * dataSendPeriod) // Ticker to send data - heartbeatTick := time.NewTicker(time.Second * heartbeatPeriod) // Ticker to send heartbeats - - if err := client.sendHeartbeat(); err != nil { - fmt.Printf("Error sending startup heartbeat: %v\n", err) - } + go dataReadLoop() // Start our async read loop + client.wait.Add(1) // Make sure data and heartbeat loops don't do anything without a valid client + InitializeClient() // Get our client in this thread + go client.readIncoming() // Start our loop to read incoming messages + go dataSendLoop() // Start our async send loop + go heartbeatLoop() // Start our async heartbeat loop + + select {} // Immortality +} - // Non-blocking channel reads +func dataReadLoop() { // Read data all day every day for { - select { - - case reading := <-dataReadChan: - // We got a data packet - fmt.Printf("New reading: %v\n", reading) - - // Put our reading in the map - for metric, value := range reading { - if record, found := dataBuffer[metric]; !found { - dataBuffer[metric] = []StampedReading{value} - } else { - dataBuffer[metric] = append(record, value) - } - } - - case <-dataSendTick.C: - // Time to send some data - if len(dataBuffer) > 0 { // Only send if we have data - if err := client.sendData(dataBuffer); err != nil { // Something went wrong - fmt.Printf("Error sending data: %v\n", err) - } else { // Data sent, clear the map - fmt.Println("Sent data") - dataBuffer = make(map[string][]StampedReading) - } - } else { // There was no data to send - fmt.Println("Got data send signal but buffer is empty") - // TODO: manage the map so that it doesn't run out of memory and kill everything if we don't send data - } - - case <-heartbeatTick.C: - // Send a heartbeat - if err := client.sendHeartbeat(); err != nil { - fmt.Printf("Error sending heartbeat: %v\n", err) + reading := readData() // TODO: error handling + for metric, value := range reading { // Put our data in the buffer + if record, found := dataBuffer[metric]; !found { + dataBuffer[metric] = []StampedReading{value} } else { - fmt.Println("Sent heartbeat") + dataBuffer[metric] = append(record, value) } } + time.Sleep(time.Second * dataReadPeriod) + } +} + +func dataSendLoop() { + for { + time.Sleep(time.Second * dataSendPeriod) + if len(dataBuffer) == 0 { // Only send if we have data + fmt.Println("No data to send... check logs (that don't exist yet) for data reading errors (also don't exist)") + continue + } + go client.sendData(dataBuffer) + dataBuffer = make(map[string][]StampedReading) } } -func dataReadLoop(x chan map[string]StampedReading) { +func heartbeatLoop() { for { - // Send our read packets down the channel - // TODO: error handling - x <- readData() - time.Sleep(time.Second) + go client.sendHeartbeat() + time.Sleep(time.Second * heartbeatPeriod) } } @@ -106,67 +65,3 @@ func readData() map[string]StampedReading { }, } } - -func (client *gatewayClient) sendData(data map[string][]StampedReading) error { - - jsonStr, err := json.Marshal(data) // Dump our data map into a json string - if err != nil { - return err - } - - req, err := http.NewRequest("POST", dataEndpoint, bytes.NewBuffer(jsonStr)) // Create our request - if err != nil { - return err - } - - fillHeader(req) // Get id and timestamp - - resp, err := client.client.Do(req) // Do the request - if err != nil { - return err - } - - defer func() { - if err := resp.Body.Close(); err != nil { // Honestly not sure what this is for - fmt.Printf("Error: %v\n", err) - } - }() - - if resp.StatusCode < 200 || resp.StatusCode >= 300 { // Make sure it was successful - return errors.New(fmt.Sprintf("Received a non-200 response: %v\n", resp.StatusCode)) - } - - return nil -} - -func (client *gatewayClient) sendHeartbeat() error { - req, err := http.NewRequest("GET", heartbeatEndpoint, nil) // Send to heartbeat endpoint - if err != nil { - return err - } - - fillHeader(req) // Get id and timestamp - - resp, err := client.client.Do(req) // Do it - if err != nil { - return err - } - - defer func() { - if err := resp.Body.Close(); err != nil { // This thing again - fmt.Printf("Error: %v\n", err) - } - }() - - if resp.StatusCode < 200 || resp.StatusCode >= 300 { // Make sure it was successful - return errors.New(fmt.Sprintf("Received a non-200 response: %v\n", resp.StatusCode)) - } - - return nil -} - -func fillHeader(req *http.Request) { // Sets the uuid and timestamp for a request - req.Header.Set("Content-Type", "application/json") - req.Header.Set(headerUUID, "0000001bcd3f") // TODO: get this from somewhere - req.Header.Set(headerTimestamp, strconv.FormatInt(time.Now().Unix(), 10)) -} diff --git a/socketManager.go b/socketManager.go new file mode 100644 index 0000000..23a422b --- /dev/null +++ b/socketManager.go @@ -0,0 +1,183 @@ +package main + +import ( + "crypto/tls" + "crypto/x509" + "encoding/json" + "errors" + "fmt" + "github.com/gorilla/websocket" + "io/ioutil" + "net/http" + "strconv" + "sync" + "time" +) + +var client = &socketClient{&websocket.Conn{}, sync.Mutex{}, sync.WaitGroup{}} +var responseMap = &ResponseMap{make(map[int]*ResponseListener), sync.Mutex{}} + +type socketClient struct { + // Manage our connection + conn *websocket.Conn + lock sync.Mutex + wait sync.WaitGroup +} + +func InitializeClient() { + client.lock.Lock() // Shouldn't matter because the other processes will be waiting, but I'm gonna do it anyways + defer client.lock.Unlock() + rootCAs := x509.NewCertPool() // Initialize a cert pool + rootCert, _ := ioutil.ReadFile("rootCA.pem") // Read our root CA cert + rootCAs.AppendCertsFromPEM(rootCert) // Add it to the pool + + config := &tls.Config{RootCAs: rootCAs,} // Trust our root CA + + dialer := websocket.Dialer{ // Create a websocket dialer with our tls config + TLSClientConfig: config, + } + + for { + conn, _, err := dialer.Dial(websocketEndpoint, getHeader()) // send our dial request with a header containing id + if err != nil { + fmt.Printf("Error getting websocket client: %v\n", err) + fmt.Println("Retrying in 5 seconds...") // TODO: back off after repeated failures + time.Sleep(5 * time.Second) + continue + } + + client.conn = conn + client.wait.Done() // Now that we have a connection, let communication proceed + return + } +} + +func (client *socketClient) sendData(data map[string][]StampedReading) { + byteMsg, err := json.Marshal(data) // Send data as a byte slice for easy unmarshalling on the gateway side + if err != nil { + fmt.Printf("Error marshalling data: %v\n", err) + fmt.Println("NOT RETRYING") + return + } + client.sendPacket(Packet{ID: nextId(), Kind: kindData, Message: byteMsg}) +} + +func (client *socketClient) sendHeartbeat() { + client.sendPacket(Packet{ID: nextId(), Kind: kindHeartbeat, Message: nil}) +} + +func (client *socketClient) sendPacket(p Packet) { + client.wait.Wait() // Make sure client is initialized + client.lock.Lock() // Lock to send + if err := client.conn.WriteJSON(p); err != nil { // Try to send + fmt.Printf("Error sending %v packet %v: %v\n", p.Kind, p.ID, err) // If we fail, try to reconnect + client.wait.Add(1) + go InitializeClient() // Reconnect + } + client.lock.Unlock() // Unlock after + defer responseMap.addPacket(p) // Wait for a response -- also requeues the message if it failed the first time. + // FIXME: Upon reconnection, this spams data at the gateway in lots of small packets -- change to consolidate packets somehow. + // Probably combine the above change with some backoff for read times and send attempts. Works for now. + // Probably don't need to retry sending heartbeats + fmt.Printf("Sending packet %v\n", p.ID) +} + +func getHeader() http.Header { + h := http.Header{} + h.Set("Content-Type", "application/json") + h.Set(headerUUID, "0000000000000001") // TODO: get this from somewhere + h.Set(headerTimestamp, strconv.FormatInt(time.Now().Unix(), 10)) + return h +} + +var idManager = packetId{0, sync.Mutex{}} // One ID manager to rule them all + +type packetId struct { + _id int + lock sync.Mutex +} + +func nextId() int { // Make sure we get a different number every time this is called + idManager.lock.Lock() + defer idManager.lock.Unlock() + idManager._id++ + return idManager._id +} + +type ResponseListener struct { + // Struct to manage waiting for responses + Packet Packet + Timer *time.Timer + Recv chan error +} + +type ResponseMap struct { + // Struct to manage which responses we are waiting for + mapping map[int]*ResponseListener + lock sync.Mutex +} + +func (rmap *ResponseMap) addPacket(p Packet) { // Add a packet to the response map and start waiting for it + timer := time.NewTimer(10 * time.Second) + rl := &ResponseListener{p, timer, make(chan error)} + rmap.lock.Lock() // Safe map access + defer rmap.lock.Unlock() + rmap.mapping[p.ID] = rl + go rl.awaitResponse() // Start waiting +} + +func (rmap *ResponseMap) received(p Packet) { + rmap.lock.Lock() + listener, found := rmap.mapping[p.ID] // Get the response listener + rmap.lock.Unlock() + + message := string(p.Message) // See what the message was + + if message != messageReceived { // The only one that's implemented right now is messageReceived so this shouldn't fail + listener.Recv <- errors.New(fmt.Sprintf("Unexpected response message for packet %v: %v\n", p.ID, message)) + return + } + + if found { // Got our listener + fmt.Printf("Got good response for packet %v\n", p.ID) + listener.Recv <- nil // Tell it there's no error + rmap.lock.Lock() + delete(rmap.mapping, p.ID) // Get rid of it in the response map + rmap.lock.Unlock() + } +} + +func (rl *ResponseListener) awaitResponse() { // Gets a message from the timer or the Recv channel + select { + case <-rl.Timer.C: // Timer expires, no response + fmt.Printf("Didn't get a response for packet %v, resending...\n", rl.Packet.ID) + go client.sendPacket(rl.Packet) + + case err := <-rl.Recv: // We got a response + rl.Timer.Stop() // Stop that timer + if err != nil { // Something went wrong, resend TODO: probably check to make sure this is a server-side error + fmt.Printf("Response to packet %v was an error: %v\n", rl.Packet.ID, err) + fmt.Printf("Resending packet %v...\n", rl.Packet.ID) + go client.sendPacket(rl.Packet) + } + } +} + +func (client *socketClient) readIncoming() { // Read messages from gateway + for { + client.wait.Wait() // Wait for the client to be ready + var packet Packet + if err := client.conn.ReadJSON(&packet); err != nil { // Get a packet + fmt.Printf("Error reading from websocket: %v\n", err) // Connection is screwed + client.wait.Add(1) + go InitializeClient() // Reconnect + } else { + switch packet.Kind { + case kindResponse: // Got a response + go responseMap.received(packet) // Tell the response map + default: + fmt.Printf("Got unknown packet type: %v\n", packet.Kind) + } + } + } +}