From eb290ec8659477a9755d6271a08b2bb67beefe0b Mon Sep 17 00:00:00 2001 From: Evan Langlais Date: Thu, 14 Mar 2019 09:43:03 -0400 Subject: [PATCH] Adding organization id to schema --- gateway.go | 7 ++++--- model/data_package.go | 11 ++++++----- model/datapackage.avsc | 4 ++++ model/primitive.go | 8 ++++++++ 4 files changed, 22 insertions(+), 8 deletions(-) diff --git a/gateway.go b/gateway.go index ea4e09c..22e2c49 100644 --- a/gateway.go +++ b/gateway.go @@ -26,7 +26,7 @@ var upgrader = websocket.Upgrader{} func main() { - dbConnect() // Connect to the database + dbConnect() // Connect to the database defer func() { // TODO: handle SIGINT if err := dbConn.Close(); err != nil { fmt.Printf("Error closing db connection: %v\n", err) @@ -125,6 +125,7 @@ func (ws *socketConn) handleData(data Packet) { var dataPackage = avro.NewDataPackage() dataPackage.Generator = int32(genMap.mapping[ws.genId].databaseId) // TODO: Have to get the generators state and region, probably in the same generator id map structure + dataPackage.Organization = int32(1) dataPackage.State = avro.State(avro.StateCT) dataPackage.Region = avro.Region(avro.RegionNORTHEAST) for metric, stampedReadings := range msg { @@ -160,8 +161,8 @@ func (ws *socketConn) handleData(data Packet) { } w := kafka.NewWriter(kafka.WriterConfig{ - Brokers: []string{"sd5-data.engr.uconn.edu:9092"}, - Topic: "GeneratorData", + Brokers: []string{"sd5-data.engr.uconn.edu:9092"}, + Topic: "GeneratorData", Balancer: &kafka.LeastBytes{}, }) diff --git a/model/data_package.go b/model/data_package.go index ed582f7..9bc61ee 100644 --- a/model/data_package.go +++ b/model/data_package.go @@ -11,10 +11,11 @@ import ( ) type DataPackage struct { - Generator int32 - Region Region - State State - Data []*MetricData + Generator int32 + Organization int32 + Region Region + State State + Data []*MetricData } func DeserializeDataPackage(r io.Reader) (*DataPackage, error) { @@ -30,7 +31,7 @@ func NewDataPackage() *DataPackage { } func (r *DataPackage) Schema() string { - return "{\"fields\":[{\"name\":\"generator\",\"type\":\"int\"},{\"name\":\"region\",\"type\":{\"name\":\"Region\",\"symbols\":[\"NORTHEAST\",\"SOUTHEAST\",\"WEST\",\"MIDWEST\"],\"type\":\"enum\"}},{\"name\":\"state\",\"type\":{\"name\":\"State\",\"symbols\":[\"AL\",\"AK\",\"AZ\",\"AR\",\"CA\",\"CO\",\"CT\",\"DE\",\"FL\",\"GA\",\"HI\",\"ID\",\"IL\",\"IN\",\"IA\",\"KS\",\"KY\",\"LA\",\"ME\",\"MD\",\"MA\",\"MI\",\"MN\",\"MS\",\"MO\",\"MT\",\"NE\",\"NV\",\"NH\",\"NJ\",\"NM\",\"NY\",\"NC\",\"ND\",\"OH\",\"OK\",\"OR\",\"PA\",\"RI\",\"SC\",\"SD\",\"TN\",\"TX\",\"UT\",\"VT\",\"VA\",\"WA\",\"WV\",\"WI\",\"WY\"],\"type\":\"enum\"}},{\"name\":\"data\",\"type\":{\"items\":{\"fields\":[{\"name\":\"metric\",\"type\":\"string\"},{\"name\":\"datapoints\",\"type\":{\"items\":{\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"value\",\"type\":[\"long\",\"double\"]}],\"name\":\"DataPoint\",\"type\":\"record\"},\"type\":\"array\"}}],\"name\":\"MetricData\",\"type\":\"record\"},\"type\":\"array\"}}],\"name\":\"DataPackage\",\"namespace\":\"com.powerpanel.kso.model\",\"type\":\"record\"}" + return "{\"fields\":[{\"name\":\"generator\",\"type\":\"int\"},{\"name\":\"organization\",\"type\":\"int\"},{\"name\":\"region\",\"type\":{\"name\":\"Region\",\"symbols\":[\"NORTHEAST\",\"SOUTHEAST\",\"WEST\",\"MIDWEST\"],\"type\":\"enum\"}},{\"name\":\"state\",\"type\":{\"name\":\"State\",\"symbols\":[\"AL\",\"AK\",\"AZ\",\"AR\",\"CA\",\"CO\",\"CT\",\"DE\",\"FL\",\"GA\",\"HI\",\"ID\",\"IL\",\"IN\",\"IA\",\"KS\",\"KY\",\"LA\",\"ME\",\"MD\",\"MA\",\"MI\",\"MN\",\"MS\",\"MO\",\"MT\",\"NE\",\"NV\",\"NH\",\"NJ\",\"NM\",\"NY\",\"NC\",\"ND\",\"OH\",\"OK\",\"OR\",\"PA\",\"RI\",\"SC\",\"SD\",\"TN\",\"TX\",\"UT\",\"VT\",\"VA\",\"WA\",\"WV\",\"WI\",\"WY\"],\"type\":\"enum\"}},{\"name\":\"data\",\"type\":{\"items\":{\"fields\":[{\"name\":\"metric\",\"type\":\"string\"},{\"name\":\"datapoints\",\"type\":{\"items\":{\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"value\",\"type\":[\"long\",\"double\"]}],\"name\":\"DataPoint\",\"type\":\"record\"},\"type\":\"array\"}}],\"name\":\"MetricData\",\"type\":\"record\"},\"type\":\"array\"}}],\"name\":\"DataPackage\",\"namespace\":\"com.powerpanel.kso.model\",\"type\":\"record\"}" } func (r *DataPackage) Serialize(w io.Writer) error { diff --git a/model/datapackage.avsc b/model/datapackage.avsc index 948b7dc..70ed178 100644 --- a/model/datapackage.avsc +++ b/model/datapackage.avsc @@ -7,6 +7,10 @@ "type": "int", "name": "generator" }, + { + "type": "int", + "name": "organization" + }, { "name": "region", "type": { diff --git a/model/primitive.go b/model/primitive.go index 16a8634..21febc6 100644 --- a/model/primitive.go +++ b/model/primitive.go @@ -162,6 +162,10 @@ func readDataPackage(r io.Reader) (*DataPackage, error) { if err != nil { return nil, err } + str.Organization, err = readInt(r) + if err != nil { + return nil, err + } str.Region, err = readRegion(r) if err != nil { return nil, err @@ -348,6 +352,10 @@ func writeDataPackage(r *DataPackage, w io.Writer) error { if err != nil { return err } + err = writeInt(r.Organization, w) + if err != nil { + return err + } err = writeRegion(r.Region, w) if err != nil { return err