Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Adding database and kafka status/fault support
  • Loading branch information
Evan Langlais committed Apr 29, 2019
1 parent 9aabb97 commit 5e3f416
Showing 1 changed file with 61 additions and 1 deletion.
62 changes: 61 additions & 1 deletion gateway.go
Expand Up @@ -26,7 +26,7 @@ var upgrader = websocket.Upgrader{}

func main() {

dbConnect() // Connect to the database
dbConnect() // Connect to the database
defer func() { // TODO: handle SIGINT
if err := dbConn.Close(); err != nil {
fmt.Printf("Error closing db connection: %v\n", err)
Expand Down Expand Up @@ -198,7 +198,67 @@ func (ws *socketConn) handleStatus(updates Packet) {
fmt.Printf("Got some status updates: %v\n", msg)
}

var generatorId = int32(genMap.mapping[ws.genSn].databaseId)
var dataPackage = avro.NewDataPackage()
dataPackage.Organization = int32(1)
dataPackage.State = avro.State(avro.StateCT)
dataPackage.Region = avro.Region(avro.RegionNORTHEAST)

var table = ""

// TODO: Database updating and emailing
for statusCode, value := range msg.Updates {
if strings.Contains(statusCode, "status.") {
// We have a status update
table = "generator_faults"
} else if strings.Contains(statusCode, "fault.") {
// We have fault update
table = "generator_statuses"
}

if value == 0 {
if _, err := dbConn.Exec("DELETE FROM ? WHERE gen_id = ? AND status_code = ?", table, generatorId, statusCode); err != nil {
fmt.Printf("Error removing status/fault")
}
} else {
if _, err := dbConn.Exec("INSERT INTO ? (gen_id, status_code) VALUES (?, '?') ON DUPLICATE KEY UPDATE gen_id = VALUES(gen_id), status_code = VALUES(status_code)", table, generatorId, statusCode); err != nil {
fmt.Printf("Error setting status/fault")
}
}

var metricData = avro.NewMetricData()
metricData.Metric = statusCode
var dataPoint = avro.NewDataPoint()
dataPoint.Timestamp = int64(msg.Timestamp)
dataPoint.Value = avro.UnionLongDouble{Long: int64(value), UnionType: avro.UnionLongDoubleTypeEnumLong}
metricData.Datapoints = append(metricData.Datapoints, dataPoint)
dataPackage.Data = append(dataPackage.Data, metricData)

}

out := new(bytes.Buffer)
if err := dataPackage.Serialize(out); err != nil {
fmt.Printf("Error serializing data: %v\n", err)
}

w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"sd5-data.engr.uconn.edu:9092"},
Topic: "GeneratorData",
Balancer: &kafka.LeastBytes{},
})

if err := w.WriteMessages(context.Background(),
kafka.Message{
Key: []byte("DataPackage"),
Value: out.Bytes(),
},
); err != nil {
fmt.Printf("Error writing to Kafka: %v\v", err)
}

if err := w.Close(); err != nil {
fmt.Printf("Error closing kafka? Weird: %v\n", err)
}

ws.lock.Lock()
if err := ws.conn.WriteJSON(Packet{updates.ID, kindResponse, []byte("received")}); err != nil {
Expand Down

0 comments on commit 5e3f416

Please sign in to comment.