Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Implementing organization id for datapoints
  • Loading branch information
Evan Langlais committed Mar 13, 2019
1 parent 7839968 commit 93d489b
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 44 deletions.
4 changes: 4 additions & 0 deletions src/main/resources/datapackage.avsc
Expand Up @@ -7,6 +7,10 @@
"type": "int",
"name": "generator"
},
{
"type": "int",
"name": "organization"
},
{
"name": "region",
"type": {
Expand Down
10 changes: 5 additions & 5 deletions src/main/scala/com/powerpanel/kso/KafkaInput.scala
Expand Up @@ -15,18 +15,18 @@ class KafkaInput extends Serializable {
val props = AppConfig.loadProperties();
val topicsSet = props.getProperty("kafka.topic").split(",").toSet;

val kafkaParams = collection.mutable.Map[String, String]("bootstrap.servers" -> props.getProperty("kafka.brokers"))
val kafkaParams = collection.mutable.Map[String, String]("bootstrap.servers" -> props.getProperty("kafka.brokers"))
if (props.getProperty("kafka.consume_from_beginning").toBoolean)
{
kafkaParams.put("auto.offset.reset", "earliest");
}
kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
kafkaParams.put("group.id", props.getProperty("kafka.group_id"));
kafkaParams.put("group.id", props.getProperty("kafka.group_id"));
val messages = KafkaUtils.createDirectStream[String, Array[Byte]](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, Array[Byte]](topicsSet, kafkaParams)
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, Array[Byte]](topicsSet, kafkaParams)
);
//.repartition(Integer.parseInt(props.getProperty("app.processing_parallelism")));

Expand Down
7 changes: 4 additions & 3 deletions src/main/scala/com/powerpanel/kso/OpenTSDBOutput.scala
Expand Up @@ -26,21 +26,22 @@ class OpenTSDBOutput extends Serializable {
{
var body: String = "[";
for(metricData <- generatorData.getData()) {
for(dataPoint <- metricData.getDatapoints()) {
for(dataPoint <- metricData.getDatapoints()) {
val metric = metricData.getMetric();
val value = dataPoint.getValue();
val timestamp = dataPoint.getTimestamp();
val generator_id = generatorData.getGenerator();
val organization_id = generatorData.getOrganization();
val region = generatorData.getRegion();
val state = generatorData.getState;
val pstring = f"""{
"metric": "$metric",
"value": $value,
"timestamp": $timestamp,
"tags": {"generator": $generator_id, "state": "$state", "region": "$region"}
"tags": {"generator": $generator_id, "organization": $organization_id, "state": "$state", "region": "$region"}
},""".stripMargin;
body = body.concat(pstring);
}
}
}
body = body.dropRight(1);
body = body.concat("]");
Expand Down
141 changes: 105 additions & 36 deletions src/main/scala/com/powerpanel/kso/model/DataPackage.java
Expand Up @@ -13,8 +13,8 @@ import org.apache.avro.message.SchemaStore;
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class DataPackage extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
private static final long serialVersionUID = -2765501079323175813L;
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"DataPackage\",\"namespace\":\"com.powerpanel.kso.model\",\"fields\":[{\"name\":\"generator\",\"type\":\"int\"},{\"name\":\"region\",\"type\":{\"type\":\"enum\",\"name\":\"Region\",\"symbols\":[\"NORTHEAST\",\"SOUTHEAST\",\"WEST\",\"MIDWEST\"]}},{\"name\":\"state\",\"type\":{\"type\":\"enum\",\"name\":\"State\",\"symbols\":[\"AL\",\"AK\",\"AZ\",\"AR\",\"CA\",\"CO\",\"CT\",\"DE\",\"FL\",\"GA\",\"HI\",\"ID\",\"IL\",\"IN\",\"IA\",\"KS\",\"KY\",\"LA\",\"ME\",\"MD\",\"MA\",\"MI\",\"MN\",\"MS\",\"MO\",\"MT\",\"NE\",\"NV\",\"NH\",\"NJ\",\"NM\",\"NY\",\"NC\",\"ND\",\"OH\",\"OK\",\"OR\",\"PA\",\"RI\",\"SC\",\"SD\",\"TN\",\"TX\",\"UT\",\"VT\",\"VA\",\"WA\",\"WV\",\"WI\",\"WY\"]}},{\"name\":\"data\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"MetricData\",\"fields\":[{\"name\":\"metric\",\"type\":\"string\"},{\"name\":\"datapoints\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"DataPoint\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"value\",\"type\":[\"long\",\"double\"]}]}}}]}}}]}");
private static final long serialVersionUID = 7754366959731319639L;
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"DataPackage\",\"namespace\":\"com.powerpanel.kso.model\",\"fields\":[{\"name\":\"generator\",\"type\":\"int\"},{\"name\":\"organization\",\"type\":\"int\"},{\"name\":\"region\",\"type\":{\"type\":\"enum\",\"name\":\"Region\",\"symbols\":[\"NORTHEAST\",\"SOUTHEAST\",\"WEST\",\"MIDWEST\"]}},{\"name\":\"state\",\"type\":{\"type\":\"enum\",\"name\":\"State\",\"symbols\":[\"AL\",\"AK\",\"AZ\",\"AR\",\"CA\",\"CO\",\"CT\",\"DE\",\"FL\",\"GA\",\"HI\",\"ID\",\"IL\",\"IN\",\"IA\",\"KS\",\"KY\",\"LA\",\"ME\",\"MD\",\"MA\",\"MI\",\"MN\",\"MS\",\"MO\",\"MT\",\"NE\",\"NV\",\"NH\",\"NJ\",\"NM\",\"NY\",\"NC\",\"ND\",\"OH\",\"OK\",\"OR\",\"PA\",\"RI\",\"SC\",\"SD\",\"TN\",\"TX\",\"UT\",\"VT\",\"VA\",\"WA\",\"WV\",\"WI\",\"WY\"]}},{\"name\":\"data\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"MetricData\",\"fields\":[{\"name\":\"metric\",\"type\":\"string\"},{\"name\":\"datapoints\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"DataPoint\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"value\",\"type\":[\"long\",\"double\"]}]}}}]}}}]}");
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }

private static SpecificData MODEL$ = new SpecificData();
Expand Down Expand Up @@ -52,6 +52,7 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp
}

@Deprecated public int generator;
@Deprecated public int organization;
@Deprecated public com.powerpanel.kso.model.Region region;
@Deprecated public com.powerpanel.kso.model.State state;
@Deprecated public java.util.List<com.powerpanel.kso.model.MetricData> data;
Expand All @@ -66,12 +67,14 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp
/**
* All-args constructor.
* @param generator The new value for generator
* @param organization The new value for organization
* @param region The new value for region
* @param state The new value for state
* @param data The new value for data
*/
public DataPackage(java.lang.Integer generator, com.powerpanel.kso.model.Region region, com.powerpanel.kso.model.State state, java.util.List<com.powerpanel.kso.model.MetricData> data) {
public DataPackage(java.lang.Integer generator, java.lang.Integer organization, com.powerpanel.kso.model.Region region, com.powerpanel.kso.model.State state, java.util.List<com.powerpanel.kso.model.MetricData> data) {
this.generator = generator;
this.organization = organization;
this.region = region;
this.state = state;
this.data = data;
Expand All @@ -82,9 +85,10 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp
public java.lang.Object get(int field$) {
switch (field$) {
case 0: return generator;
case 1: return region;
case 2: return state;
case 3: return data;
case 1: return organization;
case 2: return region;
case 3: return state;
case 4: return data;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}
Expand All @@ -94,9 +98,10 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp
public void put(int field$, java.lang.Object value$) {
switch (field$) {
case 0: generator = (java.lang.Integer)value$; break;
case 1: region = (com.powerpanel.kso.model.Region)value$; break;
case 2: state = (com.powerpanel.kso.model.State)value$; break;
case 3: data = (java.util.List<com.powerpanel.kso.model.MetricData>)value$; break;
case 1: organization = (java.lang.Integer)value$; break;
case 2: region = (com.powerpanel.kso.model.Region)value$; break;
case 3: state = (com.powerpanel.kso.model.State)value$; break;
case 4: data = (java.util.List<com.powerpanel.kso.model.MetricData>)value$; break;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}
Expand All @@ -117,6 +122,22 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp
this.generator = value;
}

/**
* Gets the value of the 'organization' field.
* @return The value of the 'organization' field.
*/
public java.lang.Integer getOrganization() {
return organization;
}

/**
* Sets the value of the 'organization' field.
* @param value the value to set.
*/
public void setOrganization(java.lang.Integer value) {
this.organization = value;
}

/**
* Gets the value of the 'region' field.
* @return The value of the 'region' field.
Expand Down Expand Up @@ -198,6 +219,7 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp
implements org.apache.avro.data.RecordBuilder<DataPackage> {

private int generator;
private int organization;
private com.powerpanel.kso.model.Region region;
private com.powerpanel.kso.model.State state;
private java.util.List<com.powerpanel.kso.model.MetricData> data;
Expand All @@ -217,18 +239,22 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp
this.generator = data().deepCopy(fields()[0].schema(), other.generator);
fieldSetFlags()[0] = true;
}
if (isValidValue(fields()[1], other.region)) {
this.region = data().deepCopy(fields()[1].schema(), other.region);
if (isValidValue(fields()[1], other.organization)) {
this.organization = data().deepCopy(fields()[1].schema(), other.organization);
fieldSetFlags()[1] = true;
}
if (isValidValue(fields()[2], other.state)) {
this.state = data().deepCopy(fields()[2].schema(), other.state);
if (isValidValue(fields()[2], other.region)) {
this.region = data().deepCopy(fields()[2].schema(), other.region);
fieldSetFlags()[2] = true;
}
if (isValidValue(fields()[3], other.data)) {
this.data = data().deepCopy(fields()[3].schema(), other.data);
if (isValidValue(fields()[3], other.state)) {
this.state = data().deepCopy(fields()[3].schema(), other.state);
fieldSetFlags()[3] = true;
}
if (isValidValue(fields()[4], other.data)) {
this.data = data().deepCopy(fields()[4].schema(), other.data);
fieldSetFlags()[4] = true;
}
}

/**
Expand All @@ -241,18 +267,22 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp
this.generator = data().deepCopy(fields()[0].schema(), other.generator);
fieldSetFlags()[0] = true;
}
if (isValidValue(fields()[1], other.region)) {
this.region = data().deepCopy(fields()[1].schema(), other.region);
if (isValidValue(fields()[1], other.organization)) {
this.organization = data().deepCopy(fields()[1].schema(), other.organization);
fieldSetFlags()[1] = true;
}
if (isValidValue(fields()[2], other.state)) {
this.state = data().deepCopy(fields()[2].schema(), other.state);
if (isValidValue(fields()[2], other.region)) {
this.region = data().deepCopy(fields()[2].schema(), other.region);
fieldSetFlags()[2] = true;
}
if (isValidValue(fields()[3], other.data)) {
this.data = data().deepCopy(fields()[3].schema(), other.data);
if (isValidValue(fields()[3], other.state)) {
this.state = data().deepCopy(fields()[3].schema(), other.state);
fieldSetFlags()[3] = true;
}
if (isValidValue(fields()[4], other.data)) {
this.data = data().deepCopy(fields()[4].schema(), other.data);
fieldSetFlags()[4] = true;
}
}

/**
Expand Down Expand Up @@ -293,6 +323,44 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp
return this;
}

/**
* Gets the value of the 'organization' field.
* @return The value.
*/
public java.lang.Integer getOrganization() {
return organization;
}

/**
* Sets the value of the 'organization' field.
* @param value The value of 'organization'.
* @return This builder.
*/
public com.powerpanel.kso.model.DataPackage.Builder setOrganization(int value) {
validate(fields()[1], value);
this.organization = value;
fieldSetFlags()[1] = true;
return this;
}

/**
* Checks whether the 'organization' field has been set.
* @return True if the 'organization' field has been set, false otherwise.
*/
public boolean hasOrganization() {
return fieldSetFlags()[1];
}


/**
* Clears the value of the 'organization' field.
* @return This builder.
*/
public com.powerpanel.kso.model.DataPackage.Builder clearOrganization() {
fieldSetFlags()[1] = false;
return this;
}

/**
* Gets the value of the 'region' field.
* @return The value.
Expand All @@ -307,9 +375,9 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp
* @return This builder.
*/
public com.powerpanel.kso.model.DataPackage.Builder setRegion(com.powerpanel.kso.model.Region value) {
validate(fields()[1], value);
validate(fields()[2], value);
this.region = value;
fieldSetFlags()[1] = true;
fieldSetFlags()[2] = true;
return this;
}

Expand All @@ -318,7 +386,7 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp
* @return True if the 'region' field has been set, false otherwise.
*/
public boolean hasRegion() {
return fieldSetFlags()[1];
return fieldSetFlags()[2];
}


Expand All @@ -328,7 +396,7 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp
*/
public com.powerpanel.kso.model.DataPackage.Builder clearRegion() {
region = null;
fieldSetFlags()[1] = false;
fieldSetFlags()[2] = false;
return this;
}

Expand All @@ -346,9 +414,9 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp
* @return This builder.
*/
public com.powerpanel.kso.model.DataPackage.Builder setState(com.powerpanel.kso.model.State value) {
validate(fields()[2], value);
validate(fields()[3], value);
this.state = value;
fieldSetFlags()[2] = true;
fieldSetFlags()[3] = true;
return this;
}

Expand All @@ -357,7 +425,7 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp
* @return True if the 'state' field has been set, false otherwise.
*/
public boolean hasState() {
return fieldSetFlags()[2];
return fieldSetFlags()[3];
}


Expand All @@ -367,7 +435,7 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp
*/
public com.powerpanel.kso.model.DataPackage.Builder clearState() {
state = null;
fieldSetFlags()[2] = false;
fieldSetFlags()[3] = false;
return this;
}

Expand All @@ -385,9 +453,9 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp
* @return This builder.
*/
public com.powerpanel.kso.model.DataPackage.Builder setData(java.util.List<com.powerpanel.kso.model.MetricData> value) {
validate(fields()[3], value);
validate(fields()[4], value);
this.data = value;
fieldSetFlags()[3] = true;
fieldSetFlags()[4] = true;
return this;
}

Expand All @@ -396,7 +464,7 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp
* @return True if the 'data' field has been set, false otherwise.
*/
public boolean hasData() {
return fieldSetFlags()[3];
return fieldSetFlags()[4];
}


Expand All @@ -406,7 +474,7 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp
*/
public com.powerpanel.kso.model.DataPackage.Builder clearData() {
data = null;
fieldSetFlags()[3] = false;
fieldSetFlags()[4] = false;
return this;
}

Expand All @@ -416,9 +484,10 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp
try {
DataPackage record = new DataPackage();
record.generator = fieldSetFlags()[0] ? this.generator : (java.lang.Integer) defaultValue(fields()[0]);
record.region = fieldSetFlags()[1] ? this.region : (com.powerpanel.kso.model.Region) defaultValue(fields()[1]);
record.state = fieldSetFlags()[2] ? this.state : (com.powerpanel.kso.model.State) defaultValue(fields()[2]);
record.data = fieldSetFlags()[3] ? this.data : (java.util.List<com.powerpanel.kso.model.MetricData>) defaultValue(fields()[3]);
record.organization = fieldSetFlags()[1] ? this.organization : (java.lang.Integer) defaultValue(fields()[1]);
record.region = fieldSetFlags()[2] ? this.region : (com.powerpanel.kso.model.Region) defaultValue(fields()[2]);
record.state = fieldSetFlags()[3] ? this.state : (com.powerpanel.kso.model.State) defaultValue(fields()[3]);
record.data = fieldSetFlags()[4] ? this.data : (java.util.List<com.powerpanel.kso.model.MetricData>) defaultValue(fields()[4]);
return record;
} catch (java.lang.Exception e) {
throw new org.apache.avro.AvroRuntimeException(e);
Expand Down

0 comments on commit 93d489b

Please sign in to comment.