Permalink
Cannot retrieve contributors at this time
Name already in use
A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Gateway-Server/gateway.go
Go to fileThis commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
274 lines (240 sloc)
8.24 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"Gateway-Server/model" | |
"bytes" | |
"context" | |
"database/sql" | |
"encoding/json" | |
"fmt" | |
_ "github.com/go-sql-driver/mysql" | |
"github.com/gorilla/websocket" | |
"github.com/segmentio/kafka-go" | |
"io/ioutil" | |
"net/http" | |
"os" | |
"strings" | |
"sync" | |
) | |
// TODO: log to file

// Package-level state shared by the connection handler goroutines.
var genMap = newGenMap()  // maps generator serial number -> *generator (populated in connectionHandler)
var DBIDMap = newIDMap()  // maps database generator id -> serial number (populated in connectionHandler)
var dbConn *sql.DB        // shared MySQL handle, opened by dbConnect at startup
// Default websocket upgrader. NOTE(review): no CheckOrigin is configured —
// confirm the default origin policy is what's intended for generator clients.
var upgrader = websocket.Upgrader{}
func main() { | |
dbConnect() // Connect to the database | |
defer func() { // TODO: handle SIGINT | |
if err := dbConn.Close(); err != nil { | |
fmt.Printf("Error closing db connection: %v\n", err) | |
} | |
}() | |
http.HandleFunc("/ws", connectionHandler) // All incoming websocket connections use this | |
if err := http.ListenAndServeTLS( | |
":48820", | |
"/etc/pki/tls/certs/gateway.crt", | |
"/etc/pki/tls/private/gateway.key", | |
nil); err != nil { | |
fmt.Printf("Error serving TLS: %v\n", err) | |
} | |
} | |
func dbConnect() { | |
sqlConf, err := ioutil.ReadFile("/opt/gateway/.mysql_goconf") // Read the db config file | |
if err != nil { | |
fmt.Printf("Error reading MySQL config file: %v\n", err) | |
os.Exit(1) // If we can't connect to the database there's not much point in continuing | |
} | |
connInfo := strings.Fields(string(sqlConf)) | |
dbConn, err = sql.Open("mysql", fmt.Sprintf("%v:%v@tcp(%v)/%v", | |
connInfo[0], | |
connInfo[1], | |
connInfo[2], | |
connInfo[3], | |
)) | |
if err != nil { | |
fmt.Printf("Error connecting to the database: %v\n", err) | |
// TODO: maybe not die here and probably handle the database going down and coming back up | |
os.Exit(1) | |
} | |
} | |
// socketConn bundles a generator's websocket connection with a mutex that
// serializes writes to it (the handler goroutines lock it around WriteJSON)
// and the generator's serial number.
type socketConn struct {
	// Struct to handle our websocket connection
	conn  *websocket.Conn
	lock  sync.Mutex // held around conn.WriteJSON in handleData/handleStatus
	genSn string     // generator serial number; key into genMap
}
func connectionHandler(w http.ResponseWriter, r *http.Request) { | |
conn, err := upgrader.Upgrade(w, r, nil) // Upgrade to websocket | |
if err != nil { | |
fmt.Printf("Error upgrading websocket: %v\n", err) | |
return | |
} else { | |
fmt.Printf("Got connection from %v@%v\n", r.Header.Get(headerUUID), conn.RemoteAddr()) | |
} | |
ws := &socketConn{conn: conn, lock: sync.Mutex{}} | |
g := &generator{socket: ws} | |
row := dbConn.QueryRow( // Figure out the database id of this generator | |
"SELECT gen_id, server_id FROM generators WHERE server_id = ?", | |
r.Header.Get(headerUUID)) | |
if err := row.Scan(&g.databaseId, &g.serverSn); err != nil { // Dump that info into our generator struct | |
fmt.Printf("Error with database results: %v\n", err) | |
} else { | |
fmt.Printf("Got generator: %v\n", g) | |
} | |
ws.genSn = g.serverSn | |
genMap.add(g.serverSn, g) // Add to map of SNs to generator IDs/ws connections | |
DBIDMap.add(g.databaseId, g.serverSn) // Add to map of database IDs to SNs | |
for { // Do forever | |
if _, err := dbConn.Exec(fmt.Sprintf("INSERT INTO `sd5-state`.generator_state (gen_id, gen_ip, ttl) VALUES (%d, '%s', DATE_ADD(CURRENT_TIME(), INTERVAL 15 MINUTE )) ON DUPLICATE KEY UPDATE gen_id = VALUES(gen_id), gen_ip = VALUES(gen_ip), ttl = VALUES(ttl)", g.databaseId, ws.conn.RemoteAddr().String())); err != nil { | |
fmt.Printf("Error updating generator state: %v\n", err) | |
} | |
var packet Packet | |
err := conn.ReadJSON(&packet) // Get our next packet | |
if err != nil { | |
fmt.Printf("Error reading message: %v\n", err) | |
delete(genMap.mapping, g.serverSn) // Remove connection mapping | |
return // End this connection | |
} else { | |
switch packet.Kind { | |
case kindData: | |
go ws.handleData(packet) | |
case kindStatus: | |
go ws.handleStatus(packet) | |
default: | |
fmt.Printf("Received unknown packet type: %v\n", packet.Kind) | |
} | |
} | |
} | |
} | |
func (ws *socketConn) handleData(data Packet) { | |
var msg map[string][]StampedReading // The message should always be a byte slice that will unmarshal into this | |
if err := json.Unmarshal(data.Message, &msg); err != nil { | |
fmt.Printf("Error unmarshalling data: %v\n", err) | |
} else { | |
fmt.Printf("got some data: %v\n", msg) | |
var dataPackage = avro.NewDataPackage() | |
dataPackage.Generator = int32(genMap.mapping[ws.genSn].databaseId) | |
// TODO: Have to get the generators state and region, probably in the same generator id map structure | |
dataPackage.Organization = int32(1) | |
dataPackage.State = avro.State(avro.StateCT) | |
dataPackage.Region = avro.Region(avro.RegionNORTHEAST) | |
for metric, stampedReadings := range msg { | |
var metricData = avro.NewMetricData() | |
metricData.Metric = metric | |
for _, reading := range stampedReadings { | |
var dataPoint = avro.NewDataPoint() | |
dataPoint.Timestamp = int64(reading.Timestamp) | |
var val = avro.UnionLongDouble{} | |
switch v := (reading.Value).(type) { | |
case int: | |
case int8: | |
case int16: | |
case int32: | |
case int64: | |
val.Long = int64(v) | |
val.UnionType = avro.UnionLongDoubleTypeEnumLong | |
break | |
case float32: | |
case float64: | |
val.Double = float64(v) | |
val.UnionType = avro.UnionLongDoubleTypeEnumDouble | |
break | |
} | |
dataPoint.Value = val | |
metricData.Datapoints = append(metricData.Datapoints, dataPoint) | |
} | |
dataPackage.Data = append(dataPackage.Data, metricData) | |
} | |
out := new(bytes.Buffer) | |
if err := dataPackage.Serialize(out); err != nil { | |
fmt.Printf("Error serializing data: %v\n", err) | |
} | |
w := kafka.NewWriter(kafka.WriterConfig{ | |
Brokers: []string{"sd5-data.engr.uconn.edu:9092"}, | |
Topic: "GeneratorData", | |
Balancer: &kafka.LeastBytes{}, | |
}) | |
if err := w.WriteMessages(context.Background(), | |
kafka.Message{ | |
Key: []byte("DataPackage"), | |
Value: out.Bytes(), | |
}, | |
); err != nil { | |
fmt.Printf("Error writing to Kafka: %v\v", err) | |
} | |
if err := w.Close(); err != nil { | |
fmt.Printf("Error closing kafka? Weird: %v\n", err) | |
} | |
ws.lock.Lock() | |
if err := ws.conn.WriteJSON(Packet{data.ID, kindResponse, []byte("received")}); err != nil { | |
fmt.Printf("Error sending response for packet %v: %v\n", data.ID, err) | |
} else { | |
fmt.Printf("Sent response for packet %v\n", data.ID) | |
} | |
ws.lock.Unlock() | |
} | |
} | |
func (ws *socketConn) handleStatus(updates Packet) { | |
var msg StatusUpdates | |
if err := json.Unmarshal(updates.Message, &msg); err != nil { | |
fmt.Printf("Error unmarshalling status updates: %v\n", err) | |
} else { | |
fmt.Printf("Got some status updates: %v\n", msg) | |
} | |
var generatorId = int32(genMap.mapping[ws.genSn].databaseId) | |
var dataPackage = avro.NewDataPackage() | |
dataPackage.Organization = int32(1) | |
dataPackage.State = avro.State(avro.StateCT) | |
dataPackage.Region = avro.Region(avro.RegionNORTHEAST) | |
var table = "" | |
for statusCode, value := range msg.Updates { | |
if strings.Contains(statusCode, "fault.") { | |
// We have a status update | |
table = "generator_faults" | |
} else if strings.Contains(statusCode, "status.") { | |
// We have fault update | |
table = "generator_statuses" | |
} | |
var query = "" | |
if value == 0 { | |
query = fmt.Sprintf("DELETE FROM `sd5-state`.%s WHERE gen_id = %d AND status_code = '%s'", table, generatorId, statusCode) | |
} else { | |
query = fmt.Sprintf("INSERT INTO `sd5-state`.%s (gen_id, status_code) VALUES (%d, '%s') ON DUPLICATE KEY UPDATE gen_id = VALUES(gen_id), status_code = VALUES(status_code)", table, generatorId, statusCode) | |
} | |
if _, err := dbConn.Exec(query); err != nil { | |
fmt.Printf("Error processing statuses/faults: %v\n", err) | |
} | |
var metricData = avro.NewMetricData() | |
metricData.Metric = statusCode | |
var dataPoint = avro.NewDataPoint() | |
dataPoint.Timestamp = int64(msg.Timestamp) | |
dataPoint.Value = avro.UnionLongDouble{Long: int64(value), UnionType: avro.UnionLongDoubleTypeEnumLong} | |
metricData.Datapoints = append(metricData.Datapoints, dataPoint) | |
dataPackage.Data = append(dataPackage.Data, metricData) | |
} | |
out := new(bytes.Buffer) | |
if err := dataPackage.Serialize(out); err != nil { | |
fmt.Printf("Error serializing data: %v\n", err) | |
} | |
w := kafka.NewWriter(kafka.WriterConfig{ | |
Brokers: []string{"sd5-data.engr.uconn.edu:9092"}, | |
Topic: "GeneratorData", | |
Balancer: &kafka.LeastBytes{}, | |
}) | |
if err := w.WriteMessages(context.Background(), | |
kafka.Message{ | |
Key: []byte("DataPackage"), | |
Value: out.Bytes(), | |
}, | |
); err != nil { | |
fmt.Printf("Error writing to Kafka: %v\v", err) | |
} | |
if err := w.Close(); err != nil { | |
fmt.Printf("Error closing kafka? Weird: %v\n", err) | |
} | |
ws.lock.Lock() | |
if err := ws.conn.WriteJSON(Packet{updates.ID, kindResponse, []byte("received")}); err != nil { | |
fmt.Printf("Error sending response for packet %v: %v\n", updates.ID, err) | |
} else { | |
fmt.Printf("Sent response for packet %v\n", updates.ID) | |
} | |
ws.lock.Unlock() | |
} |