Skip to content
Permalink
90909e4de0
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
170 lines (148 sloc) 5.8 KB
package main
import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"github.com/gorilla/websocket"
"io/ioutil"
"net/http"
"strconv"
"sync"
"time"
)
// client is the single package-wide gateway client.
//
// conn starts out as an empty placeholder and is replaced by connect once a
// dial succeeds. The mutexes and the wait group are left at their zero
// values, which are ready to use.
var client = &_client{
	conn:       &websocket.Conn{},
	send:       make(chan map[string][]StampedReading),
	resend:     make(chan Packet),
	sendBuffer: make(map[string][]StampedReading),
}
// _client bundles the websocket connection to the gateway with the channels,
// buffer and locks used to queue readings until they are sent.
type _client struct {
// Connection state
conn *websocket.Conn // Current websocket connection; assigned by connect after a successful dial
connLock sync.Mutex // Held by connect while dialing and by sendData while writing to conn
wait sync.WaitGroup // Raised while a (re)connect is pending; sendData and readIncoming Wait on it before touching conn
send chan map[string][]StampedReading // Incoming readings to queue; drained by handlePackets
resend chan Packet // Packets whose readings must be queued again; drained by handlePackets
sendBuffer map[string][]StampedReading // Readings per metric, accumulated until the next sendData call
bufferLock sync.Mutex // Guards sendBuffer against concurrent access
}
// start launches the client's background goroutines: one that dials the
// gateway, one that drains the send/resend channels into the send buffer, and
// one that reads packets coming back from the gateway.
//
// The wait group is raised on the receiver (not on the package-level client)
// before anything is started, so sendData and readIncoming block in Wait until
// connect has stored a live connection and called Done.
func (c *_client) start() {
	c.wait.Add(1) // Balanced by the Done in connect once the dial succeeds
	go c.connect()
	go c.handlePackets()
	go c.readIncoming()
}
// connect dials the gateway over TLS, retrying every five seconds until a
// websocket connection is established, then stores it in c.conn and calls
// c.wait.Done so that goroutines blocked in c.wait.Wait may proceed.
//
// Callers must have called c.wait.Add(1) beforehand. connLock is held for the
// whole call, including the retry sleeps, so no write can happen on a
// connection that is being replaced.
//
// Problems loading the root CA are reported but do not abort the attempt:
// the dial then runs with an empty trust pool and its TLS errors show up in
// the retry log below.
func (c *_client) connect() {
	c.connLock.Lock()
	defer c.connLock.Unlock()

	const rootCAPath = "/opt/emulation-server/rootCA.pem"

	rootCAs := x509.NewCertPool()
	rootCert, readErr := ioutil.ReadFile(rootCAPath)
	if readErr != nil {
		fmt.Printf("Error reading root CA certificate %v: %v\n", rootCAPath, readErr)
	} else if !rootCAs.AppendCertsFromPEM(rootCert) {
		// AppendCertsFromPEM reports false when no certificate could be parsed.
		fmt.Printf("Error parsing root CA certificate %v: no valid PEM certificates found\n", rootCAPath)
	}

	dialer := websocket.Dialer{
		TLSClientConfig: &tls.Config{RootCAs: rootCAs}, // Trust only our root CA
	}

	for { // Keep dialing until we succeed
		conn, _, err := dialer.Dial(websocketEndpoint, getHeader()) // Header carries our id and a timestamp
		if err != nil {
			fmt.Printf("Error getting websocket client: %v\n", err)
			fmt.Println("Retrying in 5 seconds...")
			// TODO: back off after repeated failures
			// TODO: read data less frequently after repeated errors
			time.Sleep(5 * time.Second)
			continue
		}
		c.conn = conn
		c.wait.Done() // Now that we have a connection, let communication proceed
		return
	}
}
// addReadings merges readings into the send buffer under bufferLock.
// A metric that is not buffered yet gets the incoming slice stored as-is;
// otherwise the incoming readings are appended after the buffered ones.
func (c *_client) addReadings(readings map[string][]StampedReading) {
	c.bufferLock.Lock()
	defer c.bufferLock.Unlock()

	for metric, incoming := range readings {
		buffered, ok := c.sendBuffer[metric]
		if ok {
			c.sendBuffer[metric] = append(buffered, incoming...)
			continue
		}
		c.sendBuffer[metric] = incoming
	}
}
// handlePackets loops forever, moving data from the send and resend channels
// into the send buffer. Packets from resend have their Message decoded back
// into readings first; a packet that fails to decode is logged and dropped.
func (c *_client) handlePackets() {
	for {
		select {
		case fresh := <-c.send:
			c.addReadings(fresh)

		case failed := <-c.resend:
			var recovered map[string][]StampedReading
			decodeErr := json.Unmarshal(failed.Message, &recovered)
			if decodeErr != nil {
				fmt.Printf("Error unmarshalling data from packet %v to resend: %v\n", failed.ID, decodeErr)
				continue // Nothing usable in this packet; go back to waiting
			}
			c.addReadings(recovered)
		}
	}
}
// sendData flushes the send buffer to the gateway as one data packet.
//
// It waits for a live connection, swaps the buffer for an empty one under
// bufferLock, marshals the old contents to JSON and writes the packet while
// holding connLock. A marshalling failure is logged and the data is dropped.
// A write failure is logged and triggers a reconnect. In both the success and
// the write-failure case the packet is handed to responseManager.addPacket
// (NOTE(review): presumably that is what tracks the acknowledgement and
// requeues unsent data; confirm against responseManager).
func (c *_client) sendData() {
	c.wait.Wait() // Block until connect has produced a connection

	// Take ownership of everything buffered so far and leave a fresh map behind.
	c.bufferLock.Lock()
	pending := c.sendBuffer
	c.sendBuffer = make(map[string][]StampedReading)
	c.bufferLock.Unlock()

	payload, marshalErr := json.Marshal(pending)
	if marshalErr != nil {
		fmt.Printf("Error marshalling data: %v\n", marshalErr)
		fmt.Println("NOT RETRYING") // TODO: probably something
		return
	}

	packet := Packet{nextId(), kindData, payload}
	fmt.Printf("Sending packet %v\n", packet.ID)

	// Writes are serialized with connLock; connect takes the same lock, so
	// the reconnect started below only proceeds once we unlock.
	c.connLock.Lock()
	writeErr := c.conn.WriteJSON(packet)
	if writeErr != nil {
		fmt.Printf("Error sending %v packet %v: %v\n", packet.Kind, packet.ID, writeErr)
		c.wait.Add(1) // Hold back other client activity until reconnected
		go c.connect()
	}
	c.connLock.Unlock()

	responseManager.addPacket(packet)
}
// readIncoming loops forever reading JSON packets from the gateway.
// Each iteration first waits for a live connection. A read error is logged
// and starts a reconnect. Response packets are passed to
// responseManager.received on their own goroutine; any other kind is logged.
func (c *_client) readIncoming() {
	for {
		c.wait.Wait() // Do not touch conn while a (re)connect is pending

		var incoming Packet
		if readErr := c.conn.ReadJSON(&incoming); readErr != nil {
			fmt.Printf("Error reading from websocket: %v\n", readErr)
			c.wait.Add(1) // Balanced by the Done in connect
			go c.connect()
			continue
		}

		if incoming.Kind == kindResponse {
			go responseManager.received(incoming)
		} else {
			fmt.Printf("Got unknown packet type: %v\n", incoming.Kind)
		}
	}
}
// getHeader builds the HTTP header sent with the websocket dial request:
// a JSON content type, the device UUID and the current Unix time in seconds
// formatted as a base-10 string.
func getHeader() http.Header {
	timestamp := strconv.FormatInt(time.Now().Unix(), 10)

	header := make(http.Header)
	header.Set("Content-Type", "application/json")
	header.Set(headerUUID, "0000000000000001") // TODO: get this from somewhere
	header.Set(headerTimestamp, timestamp)
	return header
}
// idManager is the single source of packet IDs for the whole process.
// Its zero value (counter at 0, unlocked mutex) is the starting state.
var idManager packetId

// packetId is a mutex-guarded counter.
type packetId struct {
	_id  int        // Last ID handed out
	lock sync.Mutex // Guards _id
}

// nextId returns a fresh packet ID: the counter is incremented under the
// lock and the new value returned, so the first ID is 1 and concurrent
// callers never receive the same number.
func nextId() int {
	idManager.lock.Lock()
	idManager._id++
	id := idManager._id
	idManager.lock.Unlock()
	return id
}