From 5e3f416ed57b2403d8dbb66d11c308a050c97e5e Mon Sep 17 00:00:00 2001 From: Evan Langlais Date: Sun, 28 Apr 2019 22:27:58 -0400 Subject: [PATCH] Adding database and kafka status/fault support --- gateway.go | 62 +++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 61 insertions(+), 1 deletion(-) diff --git a/gateway.go b/gateway.go index 29d97bb..96e8514 100644 --- a/gateway.go +++ b/gateway.go @@ -26,7 +26,7 @@ var upgrader = websocket.Upgrader{} func main() { - dbConnect() // Connect to the database + dbConnect() // Connect to the database defer func() { // TODO: handle SIGINT if err := dbConn.Close(); err != nil { fmt.Printf("Error closing db connection: %v\n", err) @@ -198,7 +198,67 @@ func (ws *socketConn) handleStatus(updates Packet) { fmt.Printf("Got some status updates: %v\n", msg) } + var generatorId = int32(genMap.mapping[ws.genSn].databaseId) + var dataPackage = avro.NewDataPackage() + dataPackage.Organization = int32(1) + dataPackage.State = avro.State(avro.StateCT) + dataPackage.Region = avro.Region(avro.RegionNORTHEAST) + + var table = "" + // TODO: Database updating and emailing + for statusCode, value := range msg.Updates { + if strings.Contains(statusCode, "status.") { + // We have a status update + table = "generator_faults" + } else if strings.Contains(statusCode, "fault.") { + // We have fault update + table = "generator_statuses" + } + + if value == 0 { + if _, err := dbConn.Exec("DELETE FROM ? WHERE gen_id = ? AND status_code = ?", table, generatorId, statusCode); err != nil { + fmt.Printf("Error removing status/fault") + } + } else { + if _, err := dbConn.Exec("INSERT INTO ? (gen_id, status_code) VALUES (?, '?') ON DUPLICATE KEY UPDATE gen_id = VALUES(gen_id), status_code = VALUES(status_code)", table, generatorId, statusCode); err != nil { + fmt.Printf("Error setting status/fault") + } + } + + var metricData = avro.NewMetricData() + metricData.Metric = statusCode + var dataPoint = avro.NewDataPoint() + dataPoint.Timestamp = int64(msg.Timestamp) + dataPoint.Value = avro.UnionLongDouble{Long: int64(value), UnionType: avro.UnionLongDoubleTypeEnumLong} + metricData.Datapoints = append(metricData.Datapoints, dataPoint) + dataPackage.Data = append(dataPackage.Data, metricData) + + } + + out := new(bytes.Buffer) + if err := dataPackage.Serialize(out); err != nil { + fmt.Printf("Error serializing data: %v\n", err) + } + + w := kafka.NewWriter(kafka.WriterConfig{ + Brokers: []string{"sd5-data.engr.uconn.edu:9092"}, + Topic: "GeneratorData", + Balancer: &kafka.LeastBytes{}, + }) + + if err := w.WriteMessages(context.Background(), + kafka.Message{ + Key: []byte("DataPackage"), + Value: out.Bytes(), + }, + ); err != nil { + fmt.Printf("Error writing to Kafka: %v\v", err) + } + + if err := w.Close(); err != nil { + fmt.Printf("Error closing kafka? Weird: %v\n", err) + } ws.lock.Lock() if err := ws.conn.WriteJSON(Packet{updates.ID, kindResponse, []byte("received")}); err != nil {