diff --git a/gateway.go b/gateway.go index 1694a6b..ea4e09c 100644 --- a/gateway.go +++ b/gateway.go @@ -1,11 +1,15 @@ package main import ( + "Gateway-Server/model" + "bytes" + "context" "database/sql" "encoding/json" "fmt" _ "github.com/go-sql-driver/mysql" "github.com/gorilla/websocket" + "github.com/segmentio/kafka-go" "io/ioutil" "net/http" "os" @@ -117,8 +121,63 @@ func (ws *socketConn) handleData(data Packet) { fmt.Printf("Error unmarshalling data: %v\n", err) } else { fmt.Printf("got some data: %v\n", msg) - // TODO: Better response handling ws.lock.Lock() + var dataPackage = avro.NewDataPackage() + dataPackage.Generator = int32(genMap.mapping[ws.genId].databaseId) + // TODO: Have to get the generators state and region, probably in the same generator id map structure + dataPackage.State = avro.State(avro.StateCT) + dataPackage.Region = avro.Region(avro.RegionNORTHEAST) + for metric, stampedReadings := range msg { + var metricData = avro.NewMetricData() + metricData.Metric = metric + for _, reading := range stampedReadings { + var dataPoint = avro.NewDataPoint() + dataPoint.Timestamp = int64(reading.Timestamp) + var val = avro.UnionLongDouble{} + switch v := (reading.Value).(type) { + case int: + case int8: + case int16: + case int32: + case int64: + val.Long = int64(v) + val.UnionType = avro.UnionLongDoubleTypeEnumLong + break + case float32: + case float64: + val.Double = float64(v) + val.UnionType = avro.UnionLongDoubleTypeEnumDouble + break + } + dataPoint.Value = val + metricData.Datapoints = append(metricData.Datapoints, dataPoint) + } + dataPackage.Data = append(dataPackage.Data, metricData) + } + out := new(bytes.Buffer) + if err := dataPackage.Serialize(out); err != nil { + fmt.Printf("Error serializing data: %v\n", err) + } + + w := kafka.NewWriter(kafka.WriterConfig{ + Brokers: []string{"sd5-data.engr.uconn.edu:9092"}, + Topic: "GeneratorData", + Balancer: &kafka.LeastBytes{}, + }) + + if err := w.WriteMessages(context.Background(), + kafka.Message{ + Key: []byte("DataPackage"), + Value: out.Bytes(), + }, + ); err != nil { + fmt.Printf("Error writing to Kafka: %v\v", err) + } + + if err := w.Close(); err != nil { + fmt.Printf("Error closing kafka? Weird: %v\n", err) + } + if err := ws.conn.WriteJSON(Packet{data.ID, "response", []byte("received")}); err != nil { fmt.Printf("Error sending response: %v\n", err) } else { diff --git a/model/data_package.go b/model/data_package.go new file mode 100644 index 0000000..ed582f7 --- /dev/null +++ b/model/data_package.go @@ -0,0 +1,38 @@ +// Code generated by github.com/actgardner/gogen-avro. DO NOT EDIT. +/* + * SOURCE: + * datapackage.avsc + */ + +package avro + +import ( + "io" +) + +type DataPackage struct { + Generator int32 + Region Region + State State + Data []*MetricData +} + +func DeserializeDataPackage(r io.Reader) (*DataPackage, error) { + return readDataPackage(r) +} + +func NewDataPackage() *DataPackage { + v := &DataPackage{ + Data: make([]*MetricData, 0), + } + + return v +} + +func (r *DataPackage) Schema() string { + return "{\"fields\":[{\"name\":\"generator\",\"type\":\"int\"},{\"name\":\"region\",\"type\":{\"name\":\"Region\",\"symbols\":[\"NORTHEAST\",\"SOUTHEAST\",\"WEST\",\"MIDWEST\"],\"type\":\"enum\"}},{\"name\":\"state\",\"type\":{\"name\":\"State\",\"symbols\":[\"AL\",\"AK\",\"AZ\",\"AR\",\"CA\",\"CO\",\"CT\",\"DE\",\"FL\",\"GA\",\"HI\",\"ID\",\"IL\",\"IN\",\"IA\",\"KS\",\"KY\",\"LA\",\"ME\",\"MD\",\"MA\",\"MI\",\"MN\",\"MS\",\"MO\",\"MT\",\"NE\",\"NV\",\"NH\",\"NJ\",\"NM\",\"NY\",\"NC\",\"ND\",\"OH\",\"OK\",\"OR\",\"PA\",\"RI\",\"SC\",\"SD\",\"TN\",\"TX\",\"UT\",\"VT\",\"VA\",\"WA\",\"WV\",\"WI\",\"WY\"],\"type\":\"enum\"}},{\"name\":\"data\",\"type\":{\"items\":{\"fields\":[{\"name\":\"metric\",\"type\":\"string\"},{\"name\":\"datapoints\",\"type\":{\"items\":{\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"value\",\"type\":[\"long\",\"double\"]}],\"name\":\"DataPoint\",\"type\":\"record\"},\"type\":\"array\"}}],\"name\":\"MetricData\",\"type\":\"record\"},\"type\":\"array\"}}],\"name\":\"DataPackage\",\"namespace\":\"com.powerpanel.kso.model\",\"type\":\"record\"}" +} + +func (r *DataPackage) Serialize(w io.Writer) error { + return writeDataPackage(r, w) +} diff --git a/model/data_point.go b/model/data_point.go new file mode 100644 index 0000000..bf1a56d --- /dev/null +++ b/model/data_point.go @@ -0,0 +1,34 @@ +// Code generated by github.com/actgardner/gogen-avro. DO NOT EDIT. +/* + * SOURCE: + * datapackage.avsc + */ + +package avro + +import ( + "io" +) + +type DataPoint struct { + Timestamp int64 + Value UnionLongDouble +} + +func DeserializeDataPoint(r io.Reader) (*DataPoint, error) { + return readDataPoint(r) +} + +func NewDataPoint() *DataPoint { + v := &DataPoint{} + + return v +} + +func (r *DataPoint) Schema() string { + return "{\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"value\",\"type\":[\"long\",\"double\"]}],\"name\":\"DataPoint\",\"type\":\"record\"}" +} + +func (r *DataPoint) Serialize(w io.Writer) error { + return writeDataPoint(r, w) +} diff --git a/model/datapackage.avsc b/model/datapackage.avsc new file mode 100644 index 0000000..948b7dc --- /dev/null +++ b/model/datapackage.avsc @@ -0,0 +1,64 @@ +{ + "namespace": "com.powerpanel.kso.model", + "type": "record", + "name": "DataPackage", + "fields": [ + { + "type": "int", + "name": "generator" + }, + { + "name": "region", + "type": { + "name": "Region", + "type": "enum", + "symbols": ["NORTHEAST", "SOUTHEAST", "WEST", "MIDWEST"] + } + }, + { + "name": "state", + "type": { + "name": "State", + "type": "enum", + "symbols": ["AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DE", "FL", "GA", "HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MD", "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ", "NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI", "SC", "SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY"] + } + }, + { + "name": "data", + "type": { + "type": "array", + "items": { + "name": "MetricData", + "type": "record", + "fields": [ + { + "name": "metric", + "type": "string" + }, + { + "name": "datapoints", + "type": { + "type": "array", + "items": { + "name": "DataPoint", + "type": "record", + "fields": [ + { + "name": "timestamp", + "type": "long" + }, + { + "name": "value", + "type": ["long", "double"] + } + ] + } + + } + } + ] + } + } + } + ] +} \ No newline at end of file diff --git a/model/metric_data.go b/model/metric_data.go new file mode 100644 index 0000000..9c0833a --- /dev/null +++ b/model/metric_data.go @@ -0,0 +1,36 @@ +// Code generated by github.com/actgardner/gogen-avro. DO NOT EDIT. +/* + * SOURCE: + * datapackage.avsc + */ + +package avro + +import ( + "io" +) + +type MetricData struct { + Metric string + Datapoints []*DataPoint +} + +func DeserializeMetricData(r io.Reader) (*MetricData, error) { + return readMetricData(r) +} + +func NewMetricData() *MetricData { + v := &MetricData{ + Datapoints: make([]*DataPoint, 0), + } + + return v +} + +func (r *MetricData) Schema() string { + return "{\"fields\":[{\"name\":\"metric\",\"type\":\"string\"},{\"name\":\"datapoints\",\"type\":{\"items\":{\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"value\",\"type\":[\"long\",\"double\"]}],\"name\":\"DataPoint\",\"type\":\"record\"},\"type\":\"array\"}}],\"name\":\"MetricData\",\"type\":\"record\"}" +} + +func (r *MetricData) Serialize(w io.Writer) error { + return writeMetricData(r, w) +} diff --git a/model/primitive.go b/model/primitive.go new file mode 100644 index 0000000..16a8634 --- /dev/null +++ b/model/primitive.go @@ -0,0 +1,448 @@ +// Code generated by github.com/actgardner/gogen-avro. DO NOT EDIT. +/* + * SOURCE: + * datapackage.avsc + */ + +package avro + +import ( + "encoding/binary" + "fmt" + "io" + "math" +) + +type ByteWriter interface { + Grow(int) + WriteByte(byte) error +} + +type StringWriter interface { + WriteString(string) (int, error) +} + +func encodeFloat(w io.Writer, byteCount int, bits uint64) error { + var err error + var bb []byte + bw, ok := w.(ByteWriter) + if ok { + bw.Grow(byteCount) + } else { + bb = make([]byte, 0, byteCount) + } + for i := 0; i < byteCount; i++ { + if bw != nil { + err = bw.WriteByte(byte(bits & 255)) + if err != nil { + return err + } + } else { + bb = append(bb, byte(bits&255)) + } + bits = bits >> 8 + } + if bw == nil { + _, err = w.Write(bb) + return err + } + return nil +} + +func encodeInt(w io.Writer, byteCount int, encoded uint64) error { + var err error + var bb []byte + bw, ok := w.(ByteWriter) + // To avoid reallocations, grow capacity to the largest possible size + // for this integer + if ok { + bw.Grow(byteCount) + } else { + bb = make([]byte, 0, byteCount) + } + + if encoded == 0 { + if bw != nil { + err = bw.WriteByte(0) + if err != nil { + return err + } + } else { + bb = append(bb, byte(0)) + } + } else { + for encoded > 0 { + b := byte(encoded & 127) + encoded = encoded >> 7 + if !(encoded == 0) { + b |= 128 + } + if bw != nil { + err = bw.WriteByte(b) + if err != nil { + return err + } + } else { + bb = append(bb, b) + } + } + } + if bw == nil { + _, err := w.Write(bb) + return err + } + return nil + +} + +func readArrayDataPoint(r io.Reader) ([]*DataPoint, error) { + var err error + var blkSize int64 + var arr = make([]*DataPoint, 0) + for { + blkSize, err = readLong(r) + if err != nil { + return nil, err + } + if blkSize == 0 { + break + } + if blkSize < 0 { + blkSize = -blkSize + _, err = readLong(r) + if err != nil { + return nil, err + } + } + for i := int64(0); i < blkSize; i++ { + elem, err := readDataPoint(r) + if err != nil { + return nil, err + } + arr = append(arr, elem) + } + } + return arr, nil +} + +func readArrayMetricData(r io.Reader) ([]*MetricData, error) { + var err error + var blkSize int64 + var arr = make([]*MetricData, 0) + for { + blkSize, err = readLong(r) + if err != nil { + return nil, err + } + if blkSize == 0 { + break + } + if blkSize < 0 { + blkSize = -blkSize + _, err = readLong(r) + if err != nil { + return nil, err + } + } + for i := int64(0); i < blkSize; i++ { + elem, err := readMetricData(r) + if err != nil { + return nil, err + } + arr = append(arr, elem) + } + } + return arr, nil +} + +func readDataPackage(r io.Reader) (*DataPackage, error) { + var str = &DataPackage{} + var err error + str.Generator, err = readInt(r) + if err != nil { + return nil, err + } + str.Region, err = readRegion(r) + if err != nil { + return nil, err + } + str.State, err = readState(r) + if err != nil { + return nil, err + } + str.Data, err = readArrayMetricData(r) + if err != nil { + return nil, err + } + + return str, nil +} + +func readDataPoint(r io.Reader) (*DataPoint, error) { + var str = &DataPoint{} + var err error + str.Timestamp, err = readLong(r) + if err != nil { + return nil, err + } + str.Value, err = readUnionLongDouble(r) + if err != nil { + return nil, err + } + + return str, nil +} + +func readDouble(r io.Reader) (float64, error) { + buf := make([]byte, 8) + _, err := io.ReadFull(r, buf) + if err != nil { + return 0, err + } + bits := binary.LittleEndian.Uint64(buf) + val := math.Float64frombits(bits) + return val, nil +} + +func readInt(r io.Reader) (int32, error) { + var v int + buf := make([]byte, 1) + for shift := uint(0); ; shift += 7 { + if _, err := io.ReadFull(r, buf); err != nil { + return 0, err + } + b := buf[0] + v |= int(b&127) << shift + if b&128 == 0 { + break + } + } + datum := (int32(v>>1) ^ -int32(v&1)) + return datum, nil +} + +func readLong(r io.Reader) (int64, error) { + var v uint64 + buf := make([]byte, 1) + for shift := uint(0); ; shift += 7 { + if _, err := io.ReadFull(r, buf); err != nil { + return 0, err + } + b := buf[0] + v |= uint64(b&127) << shift + if b&128 == 0 { + break + } + } + datum := (int64(v>>1) ^ -int64(v&1)) + return datum, nil +} + +func readMetricData(r io.Reader) (*MetricData, error) { + var str = &MetricData{} + var err error + str.Metric, err = readString(r) + if err != nil { + return nil, err + } + str.Datapoints, err = readArrayDataPoint(r) + if err != nil { + return nil, err + } + + return str, nil +} + +func readRegion(r io.Reader) (Region, error) { + val, err := readInt(r) + return Region(val), err +} + +func readState(r io.Reader) (State, error) { + val, err := readInt(r) + return State(val), err +} + +func readString(r io.Reader) (string, error) { + len, err := readLong(r) + if err != nil { + return "", err + } + + // makeslice can fail depending on available memory. + // We arbitrarily limit string size to sane default (~2.2GB). + if len < 0 || len > math.MaxInt32 { + return "", fmt.Errorf("string length out of range: %d", len) + } + + if len == 0 { + return "", nil + } + + bb := make([]byte, len) + _, err = io.ReadFull(r, bb) + if err != nil { + return "", err + } + return string(bb), nil +} + +func readUnionLongDouble(r io.Reader) (UnionLongDouble, error) { + field, err := readLong(r) + var unionStr UnionLongDouble + if err != nil { + return unionStr, err + } + unionStr.UnionType = UnionLongDoubleTypeEnum(field) + switch unionStr.UnionType { + case UnionLongDoubleTypeEnumLong: + val, err := readLong(r) + if err != nil { + return unionStr, err + } + unionStr.Long = val + case UnionLongDoubleTypeEnumDouble: + val, err := readDouble(r) + if err != nil { + return unionStr, err + } + unionStr.Double = val + + default: + return unionStr, fmt.Errorf("invalid value for UnionLongDouble") + } + return unionStr, nil +} + +func writeArrayDataPoint(r []*DataPoint, w io.Writer) error { + err := writeLong(int64(len(r)), w) + if err != nil || len(r) == 0 { + return err + } + for _, e := range r { + err = writeDataPoint(e, w) + if err != nil { + return err + } + } + return writeLong(0, w) +} + +func writeArrayMetricData(r []*MetricData, w io.Writer) error { + err := writeLong(int64(len(r)), w) + if err != nil || len(r) == 0 { + return err + } + for _, e := range r { + err = writeMetricData(e, w) + if err != nil { + return err + } + } + return writeLong(0, w) +} + +func writeDataPackage(r *DataPackage, w io.Writer) error { + var err error + err = writeInt(r.Generator, w) + if err != nil { + return err + } + err = writeRegion(r.Region, w) + if err != nil { + return err + } + err = writeState(r.State, w) + if err != nil { + return err + } + err = writeArrayMetricData(r.Data, w) + if err != nil { + return err + } + + return nil +} +func writeDataPoint(r *DataPoint, w io.Writer) error { + var err error + err = writeLong(r.Timestamp, w) + if err != nil { + return err + } + err = writeUnionLongDouble(r.Value, w) + if err != nil { + return err + } + + return nil +} + +func writeDouble(r float64, w io.Writer) error { + bits := uint64(math.Float64bits(r)) + const byteCount = 8 + return encodeFloat(w, byteCount, bits) +} + +func writeInt(r int32, w io.Writer) error { + downShift := uint32(31) + encoded := uint64((uint32(r) << 1) ^ uint32(r>>downShift)) + const maxByteSize = 5 + return encodeInt(w, maxByteSize, encoded) +} + +func writeLong(r int64, w io.Writer) error { + downShift := uint64(63) + encoded := uint64((r << 1) ^ (r >> downShift)) + const maxByteSize = 10 + return encodeInt(w, maxByteSize, encoded) +} + +func writeMetricData(r *MetricData, w io.Writer) error { + var err error + err = writeString(r.Metric, w) + if err != nil { + return err + } + err = writeArrayDataPoint(r.Datapoints, w) + if err != nil { + return err + } + + return nil +} + +func writeRegion(r Region, w io.Writer) error { + return writeInt(int32(r), w) +} + +func writeState(r State, w io.Writer) error { + return writeInt(int32(r), w) +} + +func writeString(r string, w io.Writer) error { + err := writeLong(int64(len(r)), w) + if err != nil { + return err + } + if sw, ok := w.(StringWriter); ok { + _, err = sw.WriteString(r) + } else { + _, err = w.Write([]byte(r)) + } + return err +} + +func writeUnionLongDouble(r UnionLongDouble, w io.Writer) error { + err := writeLong(int64(r.UnionType), w) + if err != nil { + return err + } + switch r.UnionType { + case UnionLongDoubleTypeEnumLong: + return writeLong(r.Long, w) + case UnionLongDoubleTypeEnumDouble: + return writeDouble(r.Double, w) + + } + return fmt.Errorf("invalid value for UnionLongDouble") +} diff --git a/model/region.go b/model/region.go new file mode 100644 index 0000000..ae0e471 --- /dev/null +++ b/model/region.go @@ -0,0 +1,31 @@ +// Code generated by github.com/actgardner/gogen-avro. DO NOT EDIT. +/* + * SOURCE: + * datapackage.avsc + */ + +package avro + +type Region int32 + +const ( + RegionNORTHEAST Region = 0 + RegionSOUTHEAST Region = 1 + RegionWEST Region = 2 + RegionMIDWEST Region = 3 +) + +func (e Region) String() string { + switch e { + case RegionNORTHEAST: + return "NORTHEAST" + case RegionSOUTHEAST: + return "SOUTHEAST" + case RegionWEST: + return "WEST" + case RegionMIDWEST: + return "MIDWEST" + + } + return "unknown" +} diff --git a/model/state.go b/model/state.go new file mode 100644 index 0000000..2b354da --- /dev/null +++ b/model/state.go @@ -0,0 +1,169 @@ +// Code generated by github.com/actgardner/gogen-avro. DO NOT EDIT. +/* + * SOURCE: + * datapackage.avsc + */ + +package avro + +type State int32 + +const ( + StateAL State = 0 + StateAK State = 1 + StateAZ State = 2 + StateAR State = 3 + StateCA State = 4 + StateCO State = 5 + StateCT State = 6 + StateDE State = 7 + StateFL State = 8 + StateGA State = 9 + StateHI State = 10 + StateID State = 11 + StateIL State = 12 + StateIN State = 13 + StateIA State = 14 + StateKS State = 15 + StateKY State = 16 + StateLA State = 17 + StateME State = 18 + StateMD State = 19 + StateMA State = 20 + StateMI State = 21 + StateMN State = 22 + StateMS State = 23 + StateMO State = 24 + StateMT State = 25 + StateNE State = 26 + StateNV State = 27 + StateNH State = 28 + StateNJ State = 29 + StateNM State = 30 + StateNY State = 31 + StateNC State = 32 + StateND State = 33 + StateOH State = 34 + StateOK State = 35 + StateOR State = 36 + StatePA State = 37 + StateRI State = 38 + StateSC State = 39 + StateSD State = 40 + StateTN State = 41 + StateTX State = 42 + StateUT State = 43 + StateVT State = 44 + StateVA State = 45 + StateWA State = 46 + StateWV State = 47 + StateWI State = 48 + StateWY State = 49 +) + +func (e State) String() string { + switch e { + case StateAL: + return "AL" + case StateAK: + return "AK" + case StateAZ: + return "AZ" + case StateAR: + return "AR" + case StateCA: + return "CA" + case StateCO: + return "CO" + case StateCT: + return "CT" + case StateDE: + return "DE" + case StateFL: + return "FL" + case StateGA: + return "GA" + case StateHI: + return "HI" + case StateID: + return "ID" + case StateIL: + return "IL" + case StateIN: + return "IN" + case StateIA: + return "IA" + case StateKS: + return "KS" + case StateKY: + return "KY" + case StateLA: + return "LA" + case StateME: + return "ME" + case StateMD: + return "MD" + case StateMA: + return "MA" + case StateMI: + return "MI" + case StateMN: + return "MN" + case StateMS: + return "MS" + case StateMO: + return "MO" + case StateMT: + return "MT" + case StateNE: + return "NE" + case StateNV: + return "NV" + case StateNH: + return "NH" + case StateNJ: + return "NJ" + case StateNM: + return "NM" + case StateNY: + return "NY" + case StateNC: + return "NC" + case StateND: + return "ND" + case StateOH: + return "OH" + case StateOK: + return "OK" + case StateOR: + return "OR" + case StatePA: + return "PA" + case StateRI: + return "RI" + case StateSC: + return "SC" + case StateSD: + return "SD" + case StateTN: + return "TN" + case StateTX: + return "TX" + case StateUT: + return "UT" + case StateVT: + return "VT" + case StateVA: + return "VA" + case StateWA: + return "WA" + case StateWV: + return "WV" + case StateWI: + return "WI" + case StateWY: + return "WY" + + } + return "unknown" +} diff --git a/model/union_long_double.go b/model/union_long_double.go new file mode 100644 index 0000000..92af4e2 --- /dev/null +++ b/model/union_long_double.go @@ -0,0 +1,20 @@ +// Code generated by github.com/actgardner/gogen-avro. DO NOT EDIT. +/* + * SOURCE: + * datapackage.avsc + */ + +package avro + +type UnionLongDouble struct { + Long int64 + Double float64 + UnionType UnionLongDoubleTypeEnum +} + +type UnionLongDoubleTypeEnum int + +const ( + UnionLongDoubleTypeEnumLong UnionLongDoubleTypeEnum = 0 + UnionLongDoubleTypeEnumDouble UnionLongDoubleTypeEnum = 1 +)