Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Receives status packets from the generator server
  • Loading branch information
wrr14001 committed Apr 26, 2019
1 parent 18bb22c commit 9aabb97
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 20 deletions.
5 changes: 3 additions & 2 deletions constants.go
Expand Up @@ -6,6 +6,7 @@ const (
headerTimestamp = "time"

// Data packet types
kindData = "data"
kindHeartbeat = "heartbeat"
kindData = "data"
kindStatus = "status"
kindResponse = "response"
)
5 changes: 5 additions & 0 deletions data-types.go
Expand Up @@ -13,6 +13,11 @@ type Packet struct {
Message []byte `json:"message"`
}

type StatusUpdates struct {
Timestamp int `json:"timestamp"`
Updates map[string]int `json:"updates"`
}

type generator struct {
databaseId int
serverSn string
Expand Down
46 changes: 28 additions & 18 deletions gateway.go
@@ -1,7 +1,7 @@
package main

import (
avro "Gateway-Server/model"
"Gateway-Server/model"
"bytes"
"context"
"database/sql"
Expand All @@ -26,7 +26,7 @@ var upgrader = websocket.Upgrader{}

func main() {

dbConnect() // Connect to the database
dbConnect() // Connect to the database
defer func() { // TODO: handle SIGINT
if err := dbConn.Close(); err != nil {
fmt.Printf("Error closing db connection: %v\n", err)
Expand All @@ -45,7 +45,7 @@ func main() {
}

func dbConnect() {
sqlConf, err := ioutil.ReadFile("/root/.mysql_goconf") // Read the db config file
sqlConf, err := ioutil.ReadFile("/opt/gateway/.mysql_goconf") // Read the db config file
if err != nil {
fmt.Printf("Error reading MySQL config file: %v\n", err)
os.Exit(1) // If we can't connect to the database there's not much point in continuing
Expand All @@ -57,7 +57,8 @@ func dbConnect() {
connInfo[0],
connInfo[1],
connInfo[2],
connInfo[3]))
connInfo[3],
))

if err != nil {
fmt.Printf("Error connecting to the database: %v\n", err)
Expand All @@ -70,7 +71,7 @@ type socketConn struct {
// Struct to handle our websocket connection
conn *websocket.Conn
lock sync.Mutex
genId string
genSn string
}

func connectionHandler(w http.ResponseWriter, r *http.Request) {
Expand All @@ -92,7 +93,7 @@ func connectionHandler(w http.ResponseWriter, r *http.Request) {
} else {
fmt.Printf("Got generator: %v\n", g)
}
ws.genId = g.serverSn
ws.genSn = g.serverSn
genMap.add(g.serverSn, g) // Add to map of SNs to generator IDs/ws connections
DBIDMap.add(g.databaseId, g.serverSn) // Add to map of database IDs to SNs

Expand All @@ -101,13 +102,14 @@ func connectionHandler(w http.ResponseWriter, r *http.Request) {
err := conn.ReadJSON(&packet) // Get our next packet
if err != nil {
fmt.Printf("Error reading message: %v\n", err)
return // End this connection
delete(genMap.mapping, g.serverSn) // Remove connection mapping
return // End this connection
} else {
switch packet.Kind {
case kindData:
go ws.handleData(packet)
case kindHeartbeat:
go ws.handleHeartbeat(packet)
case kindStatus:
go ws.handleStatus(packet)
default:
fmt.Printf("Received unknown packet type: %v\n", packet.Kind)
}
Expand All @@ -121,9 +123,8 @@ func (ws *socketConn) handleData(data Packet) {
fmt.Printf("Error unmarshalling data: %v\n", err)
} else {
fmt.Printf("got some data: %v\n", msg)
ws.lock.Lock()
var dataPackage = avro.NewDataPackage()
dataPackage.Generator = int32(genMap.mapping[ws.genId].databaseId)
dataPackage.Generator = int32(genMap.mapping[ws.genSn].databaseId)
// TODO: Have to get the generators state and region, probably in the same generator id map structure
dataPackage.Organization = int32(1)
dataPackage.State = avro.State(avro.StateCT)
Expand Down Expand Up @@ -179,22 +180,31 @@ func (ws *socketConn) handleData(data Packet) {
fmt.Printf("Error closing kafka? Weird: %v\n", err)
}

if err := ws.conn.WriteJSON(Packet{data.ID, "response", []byte("received")}); err != nil {
fmt.Printf("Error sending response: %v\n", err)
ws.lock.Lock()
if err := ws.conn.WriteJSON(Packet{data.ID, kindResponse, []byte("received")}); err != nil {
fmt.Printf("Error sending response for packet %v: %v\n", data.ID, err)
} else {
fmt.Printf("Sent response for packet %v\n", data.ID)
}
ws.lock.Unlock()
}
}

func (ws *socketConn) handleHeartbeat(data Packet) {
fmt.Printf("Heartbeat from %v\n", ws.conn.RemoteAddr())
func (ws *socketConn) handleStatus(updates Packet) {
var msg StatusUpdates
if err := json.Unmarshal(updates.Message, &msg); err != nil {
fmt.Printf("Error unmarshalling status updates: %v\n", err)
} else {
fmt.Printf("Got some status updates: %v\n", msg)
}

// TODO: Database updating and emailing

ws.lock.Lock()
if err := ws.conn.WriteJSON(Packet{data.ID, "response", []byte("received")}); err != nil {
fmt.Printf("Error sending response: %v\n", err)
if err := ws.conn.WriteJSON(Packet{updates.ID, kindResponse, []byte("received")}); err != nil {
fmt.Printf("Error sending response for packet %v: %v\n", updates.ID, err)
} else {
fmt.Printf("Sent response for packet %v\n", data.ID)
fmt.Printf("Sent response for packet %v\n", updates.ID)
}
ws.lock.Unlock()
}

0 comments on commit 9aabb97

Please sign in to comment.