Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Changed back to websockets for simplifying bi-directional communicati…
…on. Lots of organizational changes.
  • Loading branch information
wrr14001 committed Mar 4, 2019
1 parent 2ef9a94 commit d5df703
Show file tree
Hide file tree
Showing 4 changed files with 232 additions and 141 deletions.
11 changes: 9 additions & 2 deletions constants.go
Expand Up @@ -2,13 +2,20 @@ package main

const (
// Gateway addresses
dataEndpoint = "https://sd5-endpoint.engr.uconn.edu:48820/data"
heartbeatEndpoint = "https://sd5-endpoint.engr.uconn.edu:48820/heartbeat"
websocketEndpoint = "wss://sd5-endpoint.engr.uconn.edu:48820/ws"

// Header constants
headerUUID = "uuid"
headerTimestamp = "time"

// message types
// outgoing
kindData = "data"
kindHeartbeat = "heartbeat"
// incoming
kindResponse = "response"
messageReceived = "received"

// Time (in seconds) to wait between performing read/send operations
dataReadPeriod = 1
dataSendPeriod = 10
Expand Down
10 changes: 8 additions & 2 deletions dataTypes.go
@@ -1,6 +1,12 @@
package main

type StampedReading struct {
Timestamp int
Value interface{}
Timestamp int `json:"timestamp"`
Value interface{} `json:"value"`
}

type Packet struct {
ID int `json:"id"`
Kind string `json:"kind"`
Message []byte `json:"message"`
}
169 changes: 32 additions & 137 deletions server.go
@@ -1,95 +1,54 @@
package main

import (
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"strconv"
"time"
)

type gatewayClient struct {
client *http.Client
}
var dataBuffer = make(map[string][]StampedReading) // Map to store readings in until they are sent

func main() {
rootCAs := x509.NewCertPool() // Initialize a cert pool
rootCert, _ := ioutil.ReadFile("rootCA.pem") // Read our root CA cert

if ok := rootCAs.AppendCertsFromPEM(rootCert); !ok {
fmt.Printf("Error adding gateway cert")
}

config := &tls.Config{RootCAs: rootCAs,} // Trust our root CA

tr := &http.Transport{TLSClientConfig: config,}
client := gatewayClient{&http.Client{Transport: tr}}

dataReadChan := make(chan map[string]StampedReading) // Channel to handle data readings
dataBuffer := make(map[string][]StampedReading) // Map to store readings in until they are sent

go dataReadLoop(dataReadChan) // Start our async read loop
dataSendTick := time.NewTicker(time.Second * dataSendPeriod) // Ticker to send data
heartbeatTick := time.NewTicker(time.Second * heartbeatPeriod) // Ticker to send heartbeats

if err := client.sendHeartbeat(); err != nil {
fmt.Printf("Error sending startup heartbeat: %v\n", err)
}
go dataReadLoop() // Start our async read loop
client.wait.Add(1) // Make sure data and heartbeat loops don't do anything without a valid client
InitializeClient() // Get our client in this thread
go client.readIncoming() // Start our loop to read incoming messages
go dataSendLoop() // Start our async send loop
go heartbeatLoop() // Start our async heartbeat loop

select {} // Immortality
}

// Non-blocking channel reads
func dataReadLoop() { // Read data all day every day
for {
select {

case reading := <-dataReadChan:
// We got a data packet
fmt.Printf("New reading: %v\n", reading)

// Put our reading in the map
for metric, value := range reading {
if record, found := dataBuffer[metric]; !found {
dataBuffer[metric] = []StampedReading{value}
} else {
dataBuffer[metric] = append(record, value)
}
}

case <-dataSendTick.C:
// Time to send some data
if len(dataBuffer) > 0 { // Only send if we have data
if err := client.sendData(dataBuffer); err != nil { // Something went wrong
fmt.Printf("Error sending data: %v\n", err)
} else { // Data sent, clear the map
fmt.Println("Sent data")
dataBuffer = make(map[string][]StampedReading)
}
} else { // There was no data to send
fmt.Println("Got data send signal but buffer is empty")
// TODO: manage the map so that it doesn't run out of memory and kill everything if we don't send data
}

case <-heartbeatTick.C:
// Send a heartbeat
if err := client.sendHeartbeat(); err != nil {
fmt.Printf("Error sending heartbeat: %v\n", err)
reading := readData() // TODO: error handling
for metric, value := range reading { // Put our data in the buffer
if record, found := dataBuffer[metric]; !found {
dataBuffer[metric] = []StampedReading{value}
} else {
fmt.Println("Sent heartbeat")
dataBuffer[metric] = append(record, value)
}
}
time.Sleep(time.Second * dataReadPeriod)
}
}

func dataSendLoop() {
for {
time.Sleep(time.Second * dataSendPeriod)
if len(dataBuffer) == 0 { // Only send if we have data
fmt.Println("No data to send... check logs (that don't exist yet) for data reading errors (also don't exist)")
continue
}
go client.sendData(dataBuffer)
dataBuffer = make(map[string][]StampedReading)
}
}

func dataReadLoop(x chan map[string]StampedReading) {
func heartbeatLoop() {
for {
// Send our read packets down the channel
// TODO: error handling
x <- readData()
time.Sleep(time.Second)
go client.sendHeartbeat()
time.Sleep(time.Second * heartbeatPeriod)
}
}

Expand All @@ -106,67 +65,3 @@ func readData() map[string]StampedReading {
},
}
}

func (client *gatewayClient) sendData(data map[string][]StampedReading) error {

jsonStr, err := json.Marshal(data) // Dump our data map into a json string
if err != nil {
return err
}

req, err := http.NewRequest("POST", dataEndpoint, bytes.NewBuffer(jsonStr)) // Create our request
if err != nil {
return err
}

fillHeader(req) // Get id and timestamp

resp, err := client.client.Do(req) // Do the request
if err != nil {
return err
}

defer func() {
if err := resp.Body.Close(); err != nil { // Honestly not sure what this is for
fmt.Printf("Error: %v\n", err)
}
}()

if resp.StatusCode < 200 || resp.StatusCode >= 300 { // Make sure it was successful
return errors.New(fmt.Sprintf("Received a non-200 response: %v\n", resp.StatusCode))
}

return nil
}

func (client *gatewayClient) sendHeartbeat() error {
req, err := http.NewRequest("GET", heartbeatEndpoint, nil) // Send to heartbeat endpoint
if err != nil {
return err
}

fillHeader(req) // Get id and timestamp

resp, err := client.client.Do(req) // Do it
if err != nil {
return err
}

defer func() {
if err := resp.Body.Close(); err != nil { // This thing again
fmt.Printf("Error: %v\n", err)
}
}()

if resp.StatusCode < 200 || resp.StatusCode >= 300 { // Make sure it was successful
return errors.New(fmt.Sprintf("Received a non-200 response: %v\n", resp.StatusCode))
}

return nil
}

func fillHeader(req *http.Request) { // Sets the uuid and timestamp for a request
req.Header.Set("Content-Type", "application/json")
req.Header.Set(headerUUID, "0000001bcd3f") // TODO: get this from somewhere
req.Header.Set(headerTimestamp, strconv.FormatInt(time.Now().Unix(), 10))
}

0 comments on commit d5df703

Please sign in to comment.