diff --git a/src/main/resources/datapackage.avsc b/src/main/resources/datapackage.avsc index 948b7dc..70ed178 100644 --- a/src/main/resources/datapackage.avsc +++ b/src/main/resources/datapackage.avsc @@ -7,6 +7,10 @@ "type": "int", "name": "generator" }, + { + "type": "int", + "name": "organization" + }, { "name": "region", "type": { diff --git a/src/main/scala/com/powerpanel/kso/KafkaInput.scala b/src/main/scala/com/powerpanel/kso/KafkaInput.scala index fb19b7e..fd8c11e 100644 --- a/src/main/scala/com/powerpanel/kso/KafkaInput.scala +++ b/src/main/scala/com/powerpanel/kso/KafkaInput.scala @@ -15,18 +15,18 @@ class KafkaInput extends Serializable { val props = AppConfig.loadProperties(); val topicsSet = props.getProperty("kafka.topic").split(",").toSet; - val kafkaParams = collection.mutable.Map[String, String]("bootstrap.servers" -> props.getProperty("kafka.brokers")) + val kafkaParams = collection.mutable.Map[String, String]("bootstrap.servers" -> props.getProperty("kafka.brokers")) if (props.getProperty("kafka.consume_from_beginning").toBoolean) { kafkaParams.put("auto.offset.reset", "earliest"); } kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - kafkaParams.put("group.id", props.getProperty("kafka.group_id")); + kafkaParams.put("group.id", props.getProperty("kafka.group_id")); val messages = KafkaUtils.createDirectStream[String, Array[Byte]]( - ssc, - LocationStrategies.PreferConsistent, - ConsumerStrategies.Subscribe[String, Array[Byte]](topicsSet, kafkaParams) + ssc, + LocationStrategies.PreferConsistent, + ConsumerStrategies.Subscribe[String, Array[Byte]](topicsSet, kafkaParams) ); //.repartition(Integer.parseInt(props.getProperty("app.processing_parallelism"))); diff --git a/src/main/scala/com/powerpanel/kso/OpenTSDBOutput.scala b/src/main/scala/com/powerpanel/kso/OpenTSDBOutput.scala index 10cb9e4..14bb4f9 100644 --- a/src/main/scala/com/powerpanel/kso/OpenTSDBOutput.scala +++ b/src/main/scala/com/powerpanel/kso/OpenTSDBOutput.scala @@ -26,21 +26,22 @@ class OpenTSDBOutput extends Serializable { { var body: String = "["; for(metricData <- generatorData.getData()) { - for(dataPoint <- metricData.getDatapoints()) { + for(dataPoint <- metricData.getDatapoints()) { val metric = metricData.getMetric(); val value = dataPoint.getValue(); val timestamp = dataPoint.getTimestamp(); val generator_id = generatorData.getGenerator(); + val organization_id = generatorData.getOrganization(); val region = generatorData.getRegion(); val state = generatorData.getState; val pstring = f"""{ "metric": "$metric", "value": $value, "timestamp": $timestamp, - "tags": {"generator": $generator_id, "state": "$state", "region": "$region"} + "tags": {"generator": $generator_id, "organization": $organization_id, "state": "$state", "region": "$region"} },""".stripMargin; body = body.concat(pstring); - } + } } body = body.dropRight(1); body = body.concat("]"); diff --git a/src/main/scala/com/powerpanel/kso/model/DataPackage.java b/src/main/scala/com/powerpanel/kso/model/DataPackage.java index 55cad87..47b0745 100644 --- a/src/main/scala/com/powerpanel/kso/model/DataPackage.java +++ b/src/main/scala/com/powerpanel/kso/model/DataPackage.java @@ -13,8 +13,8 @@ import org.apache.avro.message.SchemaStore; @SuppressWarnings("all") @org.apache.avro.specific.AvroGenerated public class DataPackage extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { - private static final long serialVersionUID = -2765501079323175813L; - public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"DataPackage\",\"namespace\":\"com.powerpanel.kso.model\",\"fields\":[{\"name\":\"generator\",\"type\":\"int\"},{\"name\":\"region\",\"type\":{\"type\":\"enum\",\"name\":\"Region\",\"symbols\":[\"NORTHEAST\",\"SOUTHEAST\",\"WEST\",\"MIDWEST\"]}},{\"name\":\"state\",\"type\":{\"type\":\"enum\",\"name\":\"State\",\"symbols\":[\"AL\",\"AK\",\"AZ\",\"AR\",\"CA\",\"CO\",\"CT\",\"DE\",\"FL\",\"GA\",\"HI\",\"ID\",\"IL\",\"IN\",\"IA\",\"KS\",\"KY\",\"LA\",\"ME\",\"MD\",\"MA\",\"MI\",\"MN\",\"MS\",\"MO\",\"MT\",\"NE\",\"NV\",\"NH\",\"NJ\",\"NM\",\"NY\",\"NC\",\"ND\",\"OH\",\"OK\",\"OR\",\"PA\",\"RI\",\"SC\",\"SD\",\"TN\",\"TX\",\"UT\",\"VT\",\"VA\",\"WA\",\"WV\",\"WI\",\"WY\"]}},{\"name\":\"data\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"MetricData\",\"fields\":[{\"name\":\"metric\",\"type\":\"string\"},{\"name\":\"datapoints\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"DataPoint\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"value\",\"type\":[\"long\",\"double\"]}]}}}]}}}]}"); + private static final long serialVersionUID = 7754366959731319639L; + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"DataPackage\",\"namespace\":\"com.powerpanel.kso.model\",\"fields\":[{\"name\":\"generator\",\"type\":\"int\"},{\"name\":\"organization\",\"type\":\"int\"},{\"name\":\"region\",\"type\":{\"type\":\"enum\",\"name\":\"Region\",\"symbols\":[\"NORTHEAST\",\"SOUTHEAST\",\"WEST\",\"MIDWEST\"]}},{\"name\":\"state\",\"type\":{\"type\":\"enum\",\"name\":\"State\",\"symbols\":[\"AL\",\"AK\",\"AZ\",\"AR\",\"CA\",\"CO\",\"CT\",\"DE\",\"FL\",\"GA\",\"HI\",\"ID\",\"IL\",\"IN\",\"IA\",\"KS\",\"KY\",\"LA\",\"ME\",\"MD\",\"MA\",\"MI\",\"MN\",\"MS\",\"MO\",\"MT\",\"NE\",\"NV\",\"NH\",\"NJ\",\"NM\",\"NY\",\"NC\",\"ND\",\"OH\",\"OK\",\"OR\",\"PA\",\"RI\",\"SC\",\"SD\",\"TN\",\"TX\",\"UT\",\"VT\",\"VA\",\"WA\",\"WV\",\"WI\",\"WY\"]}},{\"name\":\"data\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"MetricData\",\"fields\":[{\"name\":\"metric\",\"type\":\"string\"},{\"name\":\"datapoints\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"DataPoint\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"value\",\"type\":[\"long\",\"double\"]}]}}}]}}}]}"); public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } private static SpecificData MODEL$ = new SpecificData(); @@ -52,6 +52,7 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp } @Deprecated public int generator; + @Deprecated public int organization; @Deprecated public com.powerpanel.kso.model.Region region; @Deprecated public com.powerpanel.kso.model.State state; @Deprecated public java.util.List data; @@ -66,12 +67,14 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp /** * All-args constructor. * @param generator The new value for generator + * @param organization The new value for organization * @param region The new value for region * @param state The new value for state * @param data The new value for data */ - public DataPackage(java.lang.Integer generator, com.powerpanel.kso.model.Region region, com.powerpanel.kso.model.State state, java.util.List data) { + public DataPackage(java.lang.Integer generator, java.lang.Integer organization, com.powerpanel.kso.model.Region region, com.powerpanel.kso.model.State state, java.util.List data) { this.generator = generator; + this.organization = organization; this.region = region; this.state = state; this.data = data; @@ -82,9 +85,10 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp public java.lang.Object get(int field$) { switch (field$) { case 0: return generator; - case 1: return region; - case 2: return state; - case 3: return data; + case 1: return organization; + case 2: return region; + case 3: return state; + case 4: return data; default: throw new org.apache.avro.AvroRuntimeException("Bad index"); } } @@ -94,9 +98,10 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp public void put(int field$, java.lang.Object value$) { switch (field$) { case 0: generator = (java.lang.Integer)value$; break; - case 1: region = (com.powerpanel.kso.model.Region)value$; break; - case 2: state = (com.powerpanel.kso.model.State)value$; break; - case 3: data = (java.util.List)value$; break; + case 1: organization = (java.lang.Integer)value$; break; + case 2: region = (com.powerpanel.kso.model.Region)value$; break; + case 3: state = (com.powerpanel.kso.model.State)value$; break; + case 4: data = (java.util.List)value$; break; default: throw new org.apache.avro.AvroRuntimeException("Bad index"); } } @@ -117,6 +122,22 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp this.generator = value; } + /** + * Gets the value of the 'organization' field. + * @return The value of the 'organization' field. + */ + public java.lang.Integer getOrganization() { + return organization; + } + + /** + * Sets the value of the 'organization' field. + * @param value the value to set. + */ + public void setOrganization(java.lang.Integer value) { + this.organization = value; + } + /** * Gets the value of the 'region' field. * @return The value of the 'region' field. @@ -198,6 +219,7 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp implements org.apache.avro.data.RecordBuilder { private int generator; + private int organization; private com.powerpanel.kso.model.Region region; private com.powerpanel.kso.model.State state; private java.util.List data; @@ -217,18 +239,22 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp this.generator = data().deepCopy(fields()[0].schema(), other.generator); fieldSetFlags()[0] = true; } - if (isValidValue(fields()[1], other.region)) { - this.region = data().deepCopy(fields()[1].schema(), other.region); + if (isValidValue(fields()[1], other.organization)) { + this.organization = data().deepCopy(fields()[1].schema(), other.organization); fieldSetFlags()[1] = true; } - if (isValidValue(fields()[2], other.state)) { - this.state = data().deepCopy(fields()[2].schema(), other.state); + if (isValidValue(fields()[2], other.region)) { + this.region = data().deepCopy(fields()[2].schema(), other.region); fieldSetFlags()[2] = true; } - if (isValidValue(fields()[3], other.data)) { - this.data = data().deepCopy(fields()[3].schema(), other.data); + if (isValidValue(fields()[3], other.state)) { + this.state = data().deepCopy(fields()[3].schema(), other.state); fieldSetFlags()[3] = true; } + if (isValidValue(fields()[4], other.data)) { + this.data = data().deepCopy(fields()[4].schema(), other.data); + fieldSetFlags()[4] = true; + } } /** @@ -241,18 +267,22 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp this.generator = data().deepCopy(fields()[0].schema(), other.generator); fieldSetFlags()[0] = true; } - if (isValidValue(fields()[1], other.region)) { - this.region = data().deepCopy(fields()[1].schema(), other.region); + if (isValidValue(fields()[1], other.organization)) { + this.organization = data().deepCopy(fields()[1].schema(), other.organization); fieldSetFlags()[1] = true; } - if (isValidValue(fields()[2], other.state)) { - this.state = data().deepCopy(fields()[2].schema(), other.state); + if (isValidValue(fields()[2], other.region)) { + this.region = data().deepCopy(fields()[2].schema(), other.region); fieldSetFlags()[2] = true; } - if (isValidValue(fields()[3], other.data)) { - this.data = data().deepCopy(fields()[3].schema(), other.data); + if (isValidValue(fields()[3], other.state)) { + this.state = data().deepCopy(fields()[3].schema(), other.state); fieldSetFlags()[3] = true; } + if (isValidValue(fields()[4], other.data)) { + this.data = data().deepCopy(fields()[4].schema(), other.data); + fieldSetFlags()[4] = true; + } } /** @@ -293,6 +323,44 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp return this; } + /** + * Gets the value of the 'organization' field. + * @return The value. + */ + public java.lang.Integer getOrganization() { + return organization; + } + + /** + * Sets the value of the 'organization' field. + * @param value The value of 'organization'. + * @return This builder. + */ + public com.powerpanel.kso.model.DataPackage.Builder setOrganization(int value) { + validate(fields()[1], value); + this.organization = value; + fieldSetFlags()[1] = true; + return this; + } + + /** + * Checks whether the 'organization' field has been set. + * @return True if the 'organization' field has been set, false otherwise. + */ + public boolean hasOrganization() { + return fieldSetFlags()[1]; + } + + + /** + * Clears the value of the 'organization' field. + * @return This builder. + */ + public com.powerpanel.kso.model.DataPackage.Builder clearOrganization() { + fieldSetFlags()[1] = false; + return this; + } + /** * Gets the value of the 'region' field. * @return The value. @@ -307,9 +375,9 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp * @return This builder. */ public com.powerpanel.kso.model.DataPackage.Builder setRegion(com.powerpanel.kso.model.Region value) { - validate(fields()[1], value); + validate(fields()[2], value); this.region = value; - fieldSetFlags()[1] = true; + fieldSetFlags()[2] = true; return this; } @@ -318,7 +386,7 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp * @return True if the 'region' field has been set, false otherwise. */ public boolean hasRegion() { - return fieldSetFlags()[1]; + return fieldSetFlags()[2]; } @@ -328,7 +396,7 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp */ public com.powerpanel.kso.model.DataPackage.Builder clearRegion() { region = null; - fieldSetFlags()[1] = false; + fieldSetFlags()[2] = false; return this; } @@ -346,9 +414,9 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp * @return This builder. */ public com.powerpanel.kso.model.DataPackage.Builder setState(com.powerpanel.kso.model.State value) { - validate(fields()[2], value); + validate(fields()[3], value); this.state = value; - fieldSetFlags()[2] = true; + fieldSetFlags()[3] = true; return this; } @@ -357,7 +425,7 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp * @return True if the 'state' field has been set, false otherwise. */ public boolean hasState() { - return fieldSetFlags()[2]; + return fieldSetFlags()[3]; } @@ -367,7 +435,7 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp */ public com.powerpanel.kso.model.DataPackage.Builder clearState() { state = null; - fieldSetFlags()[2] = false; + fieldSetFlags()[3] = false; return this; } @@ -385,9 +453,9 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp * @return This builder. */ public com.powerpanel.kso.model.DataPackage.Builder setData(java.util.List value) { - validate(fields()[3], value); + validate(fields()[4], value); this.data = value; - fieldSetFlags()[3] = true; + fieldSetFlags()[4] = true; return this; } @@ -396,7 +464,7 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp * @return True if the 'data' field has been set, false otherwise. */ public boolean hasData() { - return fieldSetFlags()[3]; + return fieldSetFlags()[4]; } @@ -406,7 +474,7 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp */ public com.powerpanel.kso.model.DataPackage.Builder clearData() { data = null; - fieldSetFlags()[3] = false; + fieldSetFlags()[4] = false; return this; } @@ -416,9 +484,10 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp try { DataPackage record = new DataPackage(); record.generator = fieldSetFlags()[0] ? this.generator : (java.lang.Integer) defaultValue(fields()[0]); - record.region = fieldSetFlags()[1] ? this.region : (com.powerpanel.kso.model.Region) defaultValue(fields()[1]); - record.state = fieldSetFlags()[2] ? this.state : (com.powerpanel.kso.model.State) defaultValue(fields()[2]); - record.data = fieldSetFlags()[3] ? this.data : (java.util.List) defaultValue(fields()[3]); + record.organization = fieldSetFlags()[1] ? this.organization : (java.lang.Integer) defaultValue(fields()[1]); + record.region = fieldSetFlags()[2] ? this.region : (com.powerpanel.kso.model.Region) defaultValue(fields()[2]); + record.state = fieldSetFlags()[3] ? this.state : (com.powerpanel.kso.model.State) defaultValue(fields()[3]); + record.data = fieldSetFlags()[4] ? this.data : (java.util.List) defaultValue(fields()[4]); return record; } catch (java.lang.Exception e) { throw new org.apache.avro.AvroRuntimeException(e);