diff --git a/constants.go b/constants.go index a8c54f0..eb47905 100644 --- a/constants.go +++ b/constants.go @@ -6,6 +6,7 @@ const ( headerTimestamp = "time" // Data packet types - kindData = "data" - kindHeartbeat = "heartbeat" + kindData = "data" + kindStatus = "status" + kindResponse = "response" ) diff --git a/data-types.go b/data-types.go index db5e1c2..031cbbf 100644 --- a/data-types.go +++ b/data-types.go @@ -13,6 +13,11 @@ type Packet struct { Message []byte `json:"message"` } +type StatusUpdates struct { + Timestamp int `json:"timestamp"` + Updates map[string]int `json:"updates"` +} + type generator struct { databaseId int serverSn string diff --git a/gateway.go b/gateway.go index 82b657e..29d97bb 100644 --- a/gateway.go +++ b/gateway.go @@ -1,7 +1,7 @@ package main import ( - avro "Gateway-Server/model" + "Gateway-Server/model" "bytes" "context" "database/sql" @@ -26,7 +26,7 @@ var upgrader = websocket.Upgrader{} func main() { - dbConnect() // Connect to the database + dbConnect() // Connect to the database defer func() { // TODO: handle SIGINT if err := dbConn.Close(); err != nil { fmt.Printf("Error closing db connection: %v\n", err) @@ -45,7 +45,7 @@ func main() { } func dbConnect() { - sqlConf, err := ioutil.ReadFile("/root/.mysql_goconf") // Read the db config file + sqlConf, err := ioutil.ReadFile("/opt/gateway/.mysql_goconf") // Read the db config file if err != nil { fmt.Printf("Error reading MySQL config file: %v\n", err) os.Exit(1) // If we can't connect to the database there's not much point in continuing @@ -57,7 +57,8 @@ func dbConnect() { connInfo[0], connInfo[1], connInfo[2], - connInfo[3])) + connInfo[3], + )) if err != nil { fmt.Printf("Error connecting to the database: %v\n", err) @@ -70,7 +71,7 @@ type socketConn struct { // Struct to handle our websocket connection conn *websocket.Conn lock sync.Mutex - genId string + genSn string } func connectionHandler(w http.ResponseWriter, r *http.Request) { @@ -92,7 +93,7 @@ func connectionHandler(w http.ResponseWriter, r *http.Request) { } else { fmt.Printf("Got generator: %v\n", g) } - ws.genId = g.serverSn + ws.genSn = g.serverSn genMap.add(g.serverSn, g) // Add to map of SNs to generator IDs/ws connections DBIDMap.add(g.databaseId, g.serverSn) // Add to map of database IDs to SNs @@ -101,13 +102,14 @@ func connectionHandler(w http.ResponseWriter, r *http.Request) { err := conn.ReadJSON(&packet) // Get our next packet if err != nil { fmt.Printf("Error reading message: %v\n", err) - return // End this connection + delete(genMap.mapping, g.serverSn) // Remove connection mapping + return // End this connection } else { switch packet.Kind { case kindData: go ws.handleData(packet) - case kindHeartbeat: - go ws.handleHeartbeat(packet) + case kindStatus: + go ws.handleStatus(packet) default: fmt.Printf("Received unknown packet type: %v\n", packet.Kind) } @@ -121,9 +123,8 @@ func (ws *socketConn) handleData(data Packet) { fmt.Printf("Error unmarshalling data: %v\n", err) } else { fmt.Printf("got some data: %v\n", msg) - ws.lock.Lock() var dataPackage = avro.NewDataPackage() - dataPackage.Generator = int32(genMap.mapping[ws.genId].databaseId) + dataPackage.Generator = int32(genMap.mapping[ws.genSn].databaseId) // TODO: Have to get the generators state and region, probably in the same generator id map structure dataPackage.Organization = int32(1) dataPackage.State = avro.State(avro.StateCT) @@ -179,8 +180,9 @@ func (ws *socketConn) handleData(data Packet) { fmt.Printf("Error closing kafka? Weird: %v\n", err) } - if err := ws.conn.WriteJSON(Packet{data.ID, "response", []byte("received")}); err != nil { - fmt.Printf("Error sending response: %v\n", err) + ws.lock.Lock() + if err := ws.conn.WriteJSON(Packet{data.ID, kindResponse, []byte("received")}); err != nil { + fmt.Printf("Error sending response for packet %v: %v\n", data.ID, err) } else { fmt.Printf("Sent response for packet %v\n", data.ID) } @@ -188,13 +190,21 @@ func (ws *socketConn) handleData(data Packet) { } } -func (ws *socketConn) handleHeartbeat(data Packet) { - fmt.Printf("Heartbeat from %v\n", ws.conn.RemoteAddr()) +func (ws *socketConn) handleStatus(updates Packet) { + var msg StatusUpdates + if err := json.Unmarshal(updates.Message, &msg); err != nil { + fmt.Printf("Error unmarshalling status updates: %v\n", err) + } else { + fmt.Printf("Got some status updates: %v\n", msg) + } + + // TODO: Database updating and emailing + ws.lock.Lock() - if err := ws.conn.WriteJSON(Packet{data.ID, "response", []byte("received")}); err != nil { - fmt.Printf("Error sending response: %v\n", err) + if err := ws.conn.WriteJSON(Packet{updates.ID, kindResponse, []byte("received")}); err != nil { + fmt.Printf("Error sending response for packet %v: %v\n", updates.ID, err) } else { - fmt.Printf("Sent response for packet %v\n", data.ID) + fmt.Printf("Sent response for packet %v\n", updates.ID) } ws.lock.Unlock() }