diff --git a/gateway.go b/gateway.go index 1694a6b..ea4e09c 100644 --- a/gateway.go +++ b/gateway.go @@ -1,11 +1,15 @@ package main import ( + "Gateway-Server/model" + "bytes" + "context" "database/sql" "encoding/json" "fmt" _ "github.com/go-sql-driver/mysql" "github.com/gorilla/websocket" + "github.com/segmentio/kafka-go" "io/ioutil" "net/http" "os" @@ -117,8 +121,63 @@ func (ws *socketConn) handleData(data Packet) { fmt.Printf("Error unmarshalling data: %v\n", err) } else { fmt.Printf("got some data: %v\n", msg) - // TODO: Better response handling ws.lock.Lock() + var dataPackage = avro.NewDataPackage() + dataPackage.Generator = int32(genMap.mapping[ws.genId].databaseId) + // TODO: Have to get the generators state and region, probably in the same generator id map structure + dataPackage.State = avro.State(avro.StateCT) + dataPackage.Region = avro.Region(avro.RegionNORTHEAST) + for metric, stampedReadings := range msg { + var metricData = avro.NewMetricData() + metricData.Metric = metric + for _, reading := range stampedReadings { + var dataPoint = avro.NewDataPoint() + dataPoint.Timestamp = int64(reading.Timestamp) + var val = avro.UnionLongDouble{} + switch v := (reading.Value).(type) { + case int: + case int8: + case int16: + case int32: + case int64: + val.Long = int64(v) + val.UnionType = avro.UnionLongDoubleTypeEnumLong + break + case float32: + case float64: + val.Double = float64(v) + val.UnionType = avro.UnionLongDoubleTypeEnumDouble + break + } + dataPoint.Value = val + metricData.Datapoints = append(metricData.Datapoints, dataPoint) + } + dataPackage.Data = append(dataPackage.Data, metricData) + } + out := new(bytes.Buffer) + if err := dataPackage.Serialize(out); err != nil { + fmt.Printf("Error serializing data: %v\n", err) + } + + w := kafka.NewWriter(kafka.WriterConfig{ + Brokers: []string{"sd5-data.engr.uconn.edu:9092"}, + Topic: "GeneratorData", + Balancer: &kafka.LeastBytes{}, + }) + + if err := w.WriteMessages(context.Background(), + kafka.Message{ + Key: []byte("DataPackage"), + Value: out.Bytes(), + }, + ); err != nil { + fmt.Printf("Error writing to Kafka: %v\v", err) + } + + if err := w.Close(); err != nil { + fmt.Printf("Error closing kafka? Weird: %v\n", err) + } + if err := ws.conn.WriteJSON(Packet{data.ID, "response", []byte("received")}); err != nil { fmt.Printf("Error sending response: %v\n", err) } else { diff --git a/model/data_package.go b/model/data_package.go index 94f18e7..ed582f7 100644 --- a/model/data_package.go +++ b/model/data_package.go @@ -15,7 +15,6 @@ type DataPackage struct { Region Region State State Data []*MetricData - Rawdata []byte } func DeserializeDataPackage(r io.Reader) (*DataPackage, error) { @@ -31,7 +30,7 @@ func NewDataPackage() *DataPackage { } func (r *DataPackage) Schema() string { - return "{\"fields\":[{\"name\":\"generator\",\"type\":\"int\"},{\"name\":\"region\",\"type\":{\"name\":\"Region\",\"symbols\":[\"NORTHEAST\",\"SOUTHEAST\",\"WEST\",\"MIDWEST\"],\"type\":\"enum\"}},{\"name\":\"state\",\"type\":{\"name\":\"State\",\"symbols\":[\"AL\",\"AK\",\"AZ\",\"AR\",\"CA\",\"CO\",\"CT\",\"DE\",\"FL\",\"GA\",\"HI\",\"ID\",\"IL\",\"IN\",\"IA\",\"KS\",\"KY\",\"LA\",\"ME\",\"MD\",\"MA\",\"MI\",\"MN\",\"MS\",\"MO\",\"MT\",\"NE\",\"NV\",\"NH\",\"NJ\",\"NM\",\"NY\",\"NC\",\"ND\",\"OH\",\"OK\",\"OR\",\"PA\",\"RI\",\"SC\",\"SD\",\"TN\",\"TX\",\"UT\",\"VT\",\"VA\",\"WA\",\"WV\",\"WI\",\"WY\"],\"type\":\"enum\"}},{\"name\":\"data\",\"type\":{\"items\":{\"fields\":[{\"name\":\"metric\",\"type\":\"string\"},{\"name\":\"datapoints\",\"type\":{\"items\":{\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"value\",\"type\":[\"long\",\"double\"]}],\"name\":\"DataPoint\",\"type\":\"record\"},\"type\":\"array\"}}],\"name\":\"MetricData\",\"type\":\"record\"},\"type\":\"array\"}},{\"name\":\"rawdata\",\"type\":\"bytes\"}],\"name\":\"DataPackage\",\"namespace\":\"com.powerpanel.kso.model\",\"type\":\"record\"}" + return "{\"fields\":[{\"name\":\"generator\",\"type\":\"int\"},{\"name\":\"region\",\"type\":{\"name\":\"Region\",\"symbols\":[\"NORTHEAST\",\"SOUTHEAST\",\"WEST\",\"MIDWEST\"],\"type\":\"enum\"}},{\"name\":\"state\",\"type\":{\"name\":\"State\",\"symbols\":[\"AL\",\"AK\",\"AZ\",\"AR\",\"CA\",\"CO\",\"CT\",\"DE\",\"FL\",\"GA\",\"HI\",\"ID\",\"IL\",\"IN\",\"IA\",\"KS\",\"KY\",\"LA\",\"ME\",\"MD\",\"MA\",\"MI\",\"MN\",\"MS\",\"MO\",\"MT\",\"NE\",\"NV\",\"NH\",\"NJ\",\"NM\",\"NY\",\"NC\",\"ND\",\"OH\",\"OK\",\"OR\",\"PA\",\"RI\",\"SC\",\"SD\",\"TN\",\"TX\",\"UT\",\"VT\",\"VA\",\"WA\",\"WV\",\"WI\",\"WY\"],\"type\":\"enum\"}},{\"name\":\"data\",\"type\":{\"items\":{\"fields\":[{\"name\":\"metric\",\"type\":\"string\"},{\"name\":\"datapoints\",\"type\":{\"items\":{\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"value\",\"type\":[\"long\",\"double\"]}],\"name\":\"DataPoint\",\"type\":\"record\"},\"type\":\"array\"}}],\"name\":\"MetricData\",\"type\":\"record\"},\"type\":\"array\"}}],\"name\":\"DataPackage\",\"namespace\":\"com.powerpanel.kso.model\",\"type\":\"record\"}" } func (r *DataPackage) Serialize(w io.Writer) error { diff --git a/model/datapackage.avsc b/model/datapackage.avsc index 6e98a07..948b7dc 100644 --- a/model/datapackage.avsc +++ b/model/datapackage.avsc @@ -59,10 +59,6 @@ ] } } - }, - { - "name": "rawdata", - "type": "bytes" } ] } \ No newline at end of file diff --git a/model/primitive.go b/model/primitive.go index 4a68b10..16a8634 100644 --- a/model/primitive.go +++ b/model/primitive.go @@ -155,19 +155,6 @@ func readArrayMetricData(r io.Reader) ([]*MetricData, error) { return arr, nil } -func readBytes(r io.Reader) ([]byte, error) { - size, err := readLong(r) - if err != nil { - return nil, err - } - if size == 0 { - return []byte{}, nil - } - bb := make([]byte, size) - _, err = io.ReadFull(r, bb) - return bb, err -} - func readDataPackage(r io.Reader) (*DataPackage, error) { var str = &DataPackage{} var err error @@ -187,10 +174,6 @@ func readDataPackage(r io.Reader) (*DataPackage, error) { if err != nil { return nil, err } - str.Rawdata, err = readBytes(r) - if err != nil { - return nil, err - } return str, nil } @@ -359,15 +342,6 @@ func writeArrayMetricData(r []*MetricData, w io.Writer) error { return writeLong(0, w) } -func writeBytes(r []byte, w io.Writer) error { - err := writeLong(int64(len(r)), w) - if err != nil { - return err - } - _, err = w.Write(r) - return err -} - func writeDataPackage(r *DataPackage, w io.Writer) error { var err error err = writeInt(r.Generator, w) @@ -386,10 +360,6 @@ func writeDataPackage(r *DataPackage, w io.Writer) error { if err != nil { return err } - err = writeBytes(r.Rawdata, w) - if err != nil { - return err - } return nil }