Skip to content
Permalink
2ef9a9413a
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
172 lines (141 sloc) 4.45 KB
package main
import (
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"strconv"
"time"
)
// gatewayClient bundles the HTTP client used to talk to the gateway.
type gatewayClient struct {
	// client performs the requests issued by sendData and sendHeartbeat.
	client *http.Client
}
// main wires up the gateway client and then runs the collection loop forever.
//
// It builds an HTTPS client whose root pool is filled from rootCA.pem, starts
// dataReadLoop in a goroutine, sends one heartbeat at startup, and then waits
// on three event sources:
//   - new readings from dataReadLoop, which are buffered per metric;
//   - the data ticker, which flushes the buffer through sendData;
//   - the heartbeat ticker, which calls sendHeartbeat.
//
// Every failure is reported on stdout and the loop keeps running.
func main() {
	// Upper bound for a single HTTP exchange. Without it a stalled request
	// would block the select loop below (and the reader goroutine feeding it)
	// indefinitely.
	const requestTimeout = 30 * time.Second

	rootCAs := x509.NewCertPool()
	rootCert, err := ioutil.ReadFile("rootCA.pem")
	if err != nil {
		// Startup continues as before, but the actual cause is now reported
		// instead of being discarded.
		fmt.Printf("Error reading gateway cert: %v\n", err)
	} else if ok := rootCAs.AppendCertsFromPEM(rootCert); !ok {
		fmt.Printf("Error adding gateway cert\n")
	}

	// Trust our own root CA for connections to the gateway.
	tr := &http.Transport{TLSClientConfig: &tls.Config{RootCAs: rootCAs}}
	client := gatewayClient{client: &http.Client{Transport: tr, Timeout: requestTimeout}}

	dataReadChan := make(chan map[string]StampedReading) // carries packets from the read loop
	dataBuffer := make(map[string][]StampedReading)      // readings waiting to be sent, per metric
	go dataReadLoop(dataReadChan)

	dataSendTick := time.NewTicker(time.Second * dataSendPeriod)
	heartbeatTick := time.NewTicker(time.Second * heartbeatPeriod)

	if err := client.sendHeartbeat(); err != nil {
		fmt.Printf("Error sending startup heartbeat: %v\n", err)
	}

	// Block until one of the event sources fires, handle it, and wait again.
	for {
		select {
		case reading := <-dataReadChan:
			fmt.Printf("New reading: %v\n", reading)
			// append copes with the nil slice of a metric seen for the first
			// time, so no separate "not found" branch is needed.
			for metric, value := range reading {
				dataBuffer[metric] = append(dataBuffer[metric], value)
			}

		case <-dataSendTick.C:
			if len(dataBuffer) == 0 {
				fmt.Println("Got data send signal but buffer is empty")
				continue
			}
			if err := client.sendData(dataBuffer); err != nil {
				// The buffer is kept so the readings go out with the next tick.
				// TODO: manage the map so that it doesn't run out of memory and kill everything if we don't send data
				fmt.Printf("Error sending data: %v\n", err)
				continue
			}
			fmt.Println("Sent data")
			dataBuffer = make(map[string][]StampedReading)

		case <-heartbeatTick.C:
			if err := client.sendHeartbeat(); err != nil {
				fmt.Printf("Error sending heartbeat: %v\n", err)
			} else {
				fmt.Println("Sent heartbeat")
			}
		}
	}
}
// dataReadLoop polls readData forever and publishes every packet on x.
//
// Each send blocks while the channel cannot accept the packet; once it has
// been handed over the loop pauses for one second before reading again. The
// function never returns.
func dataReadLoop(x chan map[string]StampedReading) {
	const pause = time.Second
	for {
		// TODO: error handling
		packet := readData()
		x <- packet
		time.Sleep(pause)
	}
}
// readData produces one packet of readings keyed by metric name.
//
// Real acquisition is not implemented yet (TODO: modbus). For now each metric
// is stamped with the current Unix time and given a pseudo-random value in
// the range [0, 150).
func readData() map[string]StampedReading {
	// sample builds one placeholder reading stamped with the current time.
	sample := func() StampedReading {
		return StampedReading{
			Timestamp: int(time.Now().Unix()),
			Value:     rand.Intn(150),
		}
	}

	packet := make(map[string]StampedReading, 2)
	packet["oil.temp"] = sample()
	packet["fuel.temp"] = sample()
	return packet
}
// sendData POSTs the buffered readings to dataEndpoint as a JSON document.
//
// data maps a metric name to the readings collected for it. The request
// carries the headers set by fillHeader. A nil return means the gateway
// answered with a 2xx status; marshalling failures, transport failures and
// any other status are returned as errors.
func (c *gatewayClient) sendData(data map[string][]StampedReading) error {
	payload, err := json.Marshal(data)
	if err != nil {
		return err
	}

	req, err := http.NewRequest(http.MethodPost, dataEndpoint, bytes.NewReader(payload))
	if err != nil {
		return err
	}
	fillHeader(req) // adds content type, device id and timestamp

	resp, err := c.client.Do(req)
	if err != nil {
		return err
	}
	// The body has to be closed even though it is never read here; otherwise
	// the transport cannot release the underlying connection.
	defer func() {
		if err := resp.Body.Close(); err != nil {
			fmt.Printf("Error: %v\n", err)
		}
	}()

	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		// No trailing newline: the caller formats the error and adds its own.
		return errors.New("received a non-2xx response: " + resp.Status)
	}
	return nil
}
// sendHeartbeat issues a GET request to heartbeatEndpoint.
//
// The request has no body and carries the headers set by fillHeader. A nil
// return means the gateway answered with a 2xx status; transport failures and
// any other status are returned as errors.
func (c *gatewayClient) sendHeartbeat() error {
	req, err := http.NewRequest(http.MethodGet, heartbeatEndpoint, nil)
	if err != nil {
		return err
	}
	fillHeader(req) // adds content type, device id and timestamp

	resp, err := c.client.Do(req)
	if err != nil {
		return err
	}
	// The body has to be closed even though it is never read here; otherwise
	// the transport cannot release the underlying connection.
	defer func() {
		if err := resp.Body.Close(); err != nil {
			fmt.Printf("Error: %v\n", err)
		}
	}()

	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		// No trailing newline: the caller formats the error and adds its own.
		return errors.New("received a non-2xx response: " + resp.Status)
	}
	return nil
}
// fillHeader stamps req with the headers sent on every gateway request: a
// JSON content type, the device id under headerUUID, and the current Unix
// time (base-10 seconds) under headerTimestamp.
func fillHeader(req *http.Request) {
	hdr := req.Header
	hdr.Set("Content-Type", "application/json")
	hdr.Set(headerUUID, "0000001bcd3f") // TODO: get this from somewhere

	unixNow := time.Now().Unix()
	hdr.Set(headerTimestamp, strconv.FormatInt(unixNow, 10))
}