Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Merge pull request #1 from SeniorDesignGroup5/kafka-avro
Implementing Kafka datapath for data packages utilizing Avro
  • Loading branch information
eal13009 committed Mar 7, 2019
2 parents 97fada5 + c12fb74 commit f1c82f8
Show file tree
Hide file tree
Showing 9 changed files with 900 additions and 1 deletion.
61 changes: 60 additions & 1 deletion gateway.go
@@ -1,11 +1,15 @@
package main

import (
"Gateway-Server/model"
"bytes"
"context"
"database/sql"
"encoding/json"
"fmt"
_ "github.com/go-sql-driver/mysql"
"github.com/gorilla/websocket"
"github.com/segmentio/kafka-go"
"io/ioutil"
"net/http"
"os"
Expand Down Expand Up @@ -117,8 +121,63 @@ func (ws *socketConn) handleData(data Packet) {
fmt.Printf("Error unmarshalling data: %v\n", err)
} else {
fmt.Printf("got some data: %v\n", msg)
// TODO: Better response handling
ws.lock.Lock()
var dataPackage = avro.NewDataPackage()
dataPackage.Generator = int32(genMap.mapping[ws.genId].databaseId)
// TODO: Have to get the generators state and region, probably in the same generator id map structure
dataPackage.State = avro.State(avro.StateCT)
dataPackage.Region = avro.Region(avro.RegionNORTHEAST)
for metric, stampedReadings := range msg {
var metricData = avro.NewMetricData()
metricData.Metric = metric
for _, reading := range stampedReadings {
var dataPoint = avro.NewDataPoint()
dataPoint.Timestamp = int64(reading.Timestamp)
var val = avro.UnionLongDouble{}
switch v := (reading.Value).(type) {
case int:
case int8:
case int16:
case int32:
case int64:
val.Long = int64(v)
val.UnionType = avro.UnionLongDoubleTypeEnumLong
break
case float32:
case float64:
val.Double = float64(v)
val.UnionType = avro.UnionLongDoubleTypeEnumDouble
break
}
dataPoint.Value = val
metricData.Datapoints = append(metricData.Datapoints, dataPoint)
}
dataPackage.Data = append(dataPackage.Data, metricData)
}
out := new(bytes.Buffer)
if err := dataPackage.Serialize(out); err != nil {
fmt.Printf("Error serializing data: %v\n", err)
}

w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"sd5-data.engr.uconn.edu:9092"},
Topic: "GeneratorData",
Balancer: &kafka.LeastBytes{},
})

if err := w.WriteMessages(context.Background(),
kafka.Message{
Key: []byte("DataPackage"),
Value: out.Bytes(),
},
); err != nil {
fmt.Printf("Error writing to Kafka: %v\v", err)
}

if err := w.Close(); err != nil {
fmt.Printf("Error closing kafka? Weird: %v\n", err)
}

if err := ws.conn.WriteJSON(Packet{data.ID, "response", []byte("received")}); err != nil {
fmt.Printf("Error sending response: %v\n", err)
} else {
Expand Down
38 changes: 38 additions & 0 deletions model/data_package.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 34 additions & 0 deletions model/data_point.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 64 additions & 0 deletions model/datapackage.avsc
@@ -0,0 +1,64 @@
{
"namespace": "com.powerpanel.kso.model",
"type": "record",
"name": "DataPackage",
"fields": [
{
"type": "int",
"name": "generator"
},
{
"name": "region",
"type": {
"name": "Region",
"type": "enum",
"symbols": ["NORTHEAST", "SOUTHEAST", "WEST", "MIDWEST"]
}
},
{
"name": "state",
"type": {
"name": "State",
"type": "enum",
"symbols": ["AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DE", "FL", "GA", "HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MD", "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ", "NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI", "SC", "SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY"]
}
},
{
"name": "data",
"type": {
"type": "array",
"items": {
"name": "MetricData",
"type": "record",
"fields": [
{
"name": "metric",
"type": "string"
},
{
"name": "datapoints",
"type": {
"type": "array",
"items": {
"name": "DataPoint",
"type": "record",
"fields": [
{
"name": "timestamp",
"type": "long"
},
{
"name": "value",
"type": ["long", "double"]
}
]
}

}
}
]
}
}
}
]
}
36 changes: 36 additions & 0 deletions model/metric_data.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit f1c82f8

Please sign in to comment.