Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Implementing Kafka datapath with data package avro serialization
  • Loading branch information
Evan Langlais committed Mar 6, 2019
1 parent b101a74 commit c12fb74
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 37 deletions.
61 changes: 60 additions & 1 deletion gateway.go
@@ -1,11 +1,15 @@
package main

import (
"Gateway-Server/model"
"bytes"
"context"
"database/sql"
"encoding/json"
"fmt"
_ "github.com/go-sql-driver/mysql"
"github.com/gorilla/websocket"
"github.com/segmentio/kafka-go"
"io/ioutil"
"net/http"
"os"
Expand Down Expand Up @@ -117,8 +121,63 @@ func (ws *socketConn) handleData(data Packet) {
fmt.Printf("Error unmarshalling data: %v\n", err)
} else {
fmt.Printf("got some data: %v\n", msg)
// TODO: Better response handling
ws.lock.Lock()
var dataPackage = avro.NewDataPackage()
dataPackage.Generator = int32(genMap.mapping[ws.genId].databaseId)
// TODO: Have to get the generators state and region, probably in the same generator id map structure
dataPackage.State = avro.State(avro.StateCT)
dataPackage.Region = avro.Region(avro.RegionNORTHEAST)
for metric, stampedReadings := range msg {
var metricData = avro.NewMetricData()
metricData.Metric = metric
for _, reading := range stampedReadings {
var dataPoint = avro.NewDataPoint()
dataPoint.Timestamp = int64(reading.Timestamp)
var val = avro.UnionLongDouble{}
switch v := (reading.Value).(type) {
case int:
case int8:
case int16:
case int32:
case int64:
val.Long = int64(v)
val.UnionType = avro.UnionLongDoubleTypeEnumLong
break
case float32:
case float64:
val.Double = float64(v)
val.UnionType = avro.UnionLongDoubleTypeEnumDouble
break
}
dataPoint.Value = val
metricData.Datapoints = append(metricData.Datapoints, dataPoint)
}
dataPackage.Data = append(dataPackage.Data, metricData)
}
out := new(bytes.Buffer)
if err := dataPackage.Serialize(out); err != nil {
fmt.Printf("Error serializing data: %v\n", err)
}

w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"sd5-data.engr.uconn.edu:9092"},
Topic: "GeneratorData",
Balancer: &kafka.LeastBytes{},
})

if err := w.WriteMessages(context.Background(),
kafka.Message{
Key: []byte("DataPackage"),
Value: out.Bytes(),
},
); err != nil {
fmt.Printf("Error writing to Kafka: %v\v", err)
}

if err := w.Close(); err != nil {
fmt.Printf("Error closing kafka? Weird: %v\n", err)
}

if err := ws.conn.WriteJSON(Packet{data.ID, "response", []byte("received")}); err != nil {
fmt.Printf("Error sending response: %v\n", err)
} else {
Expand Down
3 changes: 1 addition & 2 deletions model/data_package.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 0 additions & 4 deletions model/datapackage.avsc
Expand Up @@ -59,10 +59,6 @@
]
}
}
},
{
"name": "rawdata",
"type": "bytes"
}
]
}
30 changes: 0 additions & 30 deletions model/primitive.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit c12fb74

Please sign in to comment.