Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Attempting to construct objects a bit differently
  • Loading branch information
Evan Langlais committed Mar 6, 2019
1 parent 4962f78 commit a974cb6
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 79 deletions.
4 changes: 0 additions & 4 deletions src/main/resources/datapackage.avsc
Expand Up @@ -59,10 +59,6 @@
]
}
}
},
{
"name": "rawdata",
"type": "bytes"
}
]
}
14 changes: 12 additions & 2 deletions src/main/scala/com/powerpanel/kso/KafkaInput.scala
Expand Up @@ -6,6 +6,10 @@ import com.powerpanel.kso.model._
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies
import org.apache.spark.streaming.kafka010.LocationStrategies
import org.apache.avro.specific.SpecificDatumReader
import org.apache.avro.file.{DataFileReader, SeekableByteArrayInput}
import scala.collection.JavaConverters._
// import org.apache.avro.generic.DatumReader
import org.apache.spark.streaming.StreamingContext

class KafkaInput extends Serializable {
Expand All @@ -32,9 +36,15 @@ class KafkaInput extends Serializable {
// val avroSchemaString = StaticHelpers.loadResourceFile("dataplatform-raw.avsc");
val rawMessages = messages.map(x => {
// val eventDecoder = new DataPlatformEventDecoder(avroSchemaString);
val dataPackageDecoder = DataPackage.getDecoder();
// val list = dataFileReader.iterator().asScala.toList
// DataFileReader<User> dataFileReader = new DataFileReader<User>(file, userDatumReader);
// val dataPackageDecoder = DataPackage.getDecoder();
val payload = x.value();
val dataPackage = dataPackageDecoder.decode(payload);
val datumReader = new SpecificDatumReader[DataPackage](DataPackage.getClassSchema)
val inputStream = new SeekableByteArrayInput(payload)
val dataFileReader = new DataFileReader[DataPackage](inputStream, datumReader)
val dataPackage = dataFileReader.next()
// val dataPackage = dataPackageDecoder.decode(payload);
dataPackage;
});
rawMessages;
Expand Down
76 changes: 3 additions & 73 deletions src/main/scala/com/powerpanel/kso/model/DataPackage.java
Expand Up @@ -13,8 +13,8 @@ import org.apache.avro.message.SchemaStore;
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class DataPackage extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
private static final long serialVersionUID = -1480426181877249637L;
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"DataPackage\",\"namespace\":\"com.powerpanel.kso.model\",\"fields\":[{\"name\":\"generator\",\"type\":\"int\"},{\"name\":\"region\",\"type\":{\"type\":\"enum\",\"name\":\"Region\",\"symbols\":[\"NORTHEAST\",\"SOUTHEAST\",\"WEST\",\"MIDWEST\"]}},{\"name\":\"state\",\"type\":{\"type\":\"enum\",\"name\":\"State\",\"symbols\":[\"AL\",\"AK\",\"AZ\",\"AR\",\"CA\",\"CO\",\"CT\",\"DE\",\"FL\",\"GA\",\"HI\",\"ID\",\"IL\",\"IN\",\"IA\",\"KS\",\"KY\",\"LA\",\"ME\",\"MD\",\"MA\",\"MI\",\"MN\",\"MS\",\"MO\",\"MT\",\"NE\",\"NV\",\"NH\",\"NJ\",\"NM\",\"NY\",\"NC\",\"ND\",\"OH\",\"OK\",\"OR\",\"PA\",\"RI\",\"SC\",\"SD\",\"TN\",\"TX\",\"UT\",\"VT\",\"VA\",\"WA\",\"WV\",\"WI\",\"WY\"]}},{\"name\":\"data\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"MetricData\",\"fields\":[{\"name\":\"metric\",\"type\":\"string\"},{\"name\":\"datapoints\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"DataPoint\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"value\",\"type\":[\"long\",\"double\"]}]}}}]}}},{\"name\":\"rawdata\",\"type\":\"bytes\"}]}");
private static final long serialVersionUID = -2765501079323175813L;
public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"DataPackage\",\"namespace\":\"com.powerpanel.kso.model\",\"fields\":[{\"name\":\"generator\",\"type\":\"int\"},{\"name\":\"region\",\"type\":{\"type\":\"enum\",\"name\":\"Region\",\"symbols\":[\"NORTHEAST\",\"SOUTHEAST\",\"WEST\",\"MIDWEST\"]}},{\"name\":\"state\",\"type\":{\"type\":\"enum\",\"name\":\"State\",\"symbols\":[\"AL\",\"AK\",\"AZ\",\"AR\",\"CA\",\"CO\",\"CT\",\"DE\",\"FL\",\"GA\",\"HI\",\"ID\",\"IL\",\"IN\",\"IA\",\"KS\",\"KY\",\"LA\",\"ME\",\"MD\",\"MA\",\"MI\",\"MN\",\"MS\",\"MO\",\"MT\",\"NE\",\"NV\",\"NH\",\"NJ\",\"NM\",\"NY\",\"NC\",\"ND\",\"OH\",\"OK\",\"OR\",\"PA\",\"RI\",\"SC\",\"SD\",\"TN\",\"TX\",\"UT\",\"VT\",\"VA\",\"WA\",\"WV\",\"WI\",\"WY\"]}},{\"name\":\"data\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"MetricData\",\"fields\":[{\"name\":\"metric\",\"type\":\"string\"},{\"name\":\"datapoints\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"DataPoint\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"value\",\"type\":[\"long\",\"double\"]}]}}}]}}}]}");
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }

private static SpecificData MODEL$ = new SpecificData();
Expand Down Expand Up @@ -55,7 +55,6 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp
@Deprecated public com.powerpanel.kso.model.Region region;
@Deprecated public com.powerpanel.kso.model.State state;
@Deprecated public java.util.List<com.powerpanel.kso.model.MetricData> data;
@Deprecated public java.nio.ByteBuffer rawdata;

/**
* Default constructor. Note that this does not initialize fields
Expand All @@ -70,14 +69,12 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp
* @param region The new value for region
* @param state The new value for state
* @param data The new value for data
* @param rawdata The new value for rawdata
*/
public DataPackage(java.lang.Integer generator, com.powerpanel.kso.model.Region region, com.powerpanel.kso.model.State state, java.util.List<com.powerpanel.kso.model.MetricData> data, java.nio.ByteBuffer rawdata) {
public DataPackage(java.lang.Integer generator, com.powerpanel.kso.model.Region region, com.powerpanel.kso.model.State state, java.util.List<com.powerpanel.kso.model.MetricData> data) {
this.generator = generator;
this.region = region;
this.state = state;
this.data = data;
this.rawdata = rawdata;
}

public org.apache.avro.Schema getSchema() { return SCHEMA$; }
Expand All @@ -88,7 +85,6 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp
case 1: return region;
case 2: return state;
case 3: return data;
case 4: return rawdata;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}
Expand All @@ -101,7 +97,6 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp
case 1: region = (com.powerpanel.kso.model.Region)value$; break;
case 2: state = (com.powerpanel.kso.model.State)value$; break;
case 3: data = (java.util.List<com.powerpanel.kso.model.MetricData>)value$; break;
case 4: rawdata = (java.nio.ByteBuffer)value$; break;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}
Expand Down Expand Up @@ -170,22 +165,6 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp
this.data = value;
}

/**
* Gets the value of the 'rawdata' field.
* @return The value of the 'rawdata' field.
*/
public java.nio.ByteBuffer getRawdata() {
return rawdata;
}

/**
* Sets the value of the 'rawdata' field.
* @param value the value to set.
*/
public void setRawdata(java.nio.ByteBuffer value) {
this.rawdata = value;
}

/**
* Creates a new DataPackage RecordBuilder.
* @return A new DataPackage RecordBuilder
Expand Down Expand Up @@ -222,7 +201,6 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp
private com.powerpanel.kso.model.Region region;
private com.powerpanel.kso.model.State state;
private java.util.List<com.powerpanel.kso.model.MetricData> data;
private java.nio.ByteBuffer rawdata;

/** Creates a new Builder */
private Builder() {
Expand Down Expand Up @@ -251,10 +229,6 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp
this.data = data().deepCopy(fields()[3].schema(), other.data);
fieldSetFlags()[3] = true;
}
if (isValidValue(fields()[4], other.rawdata)) {
this.rawdata = data().deepCopy(fields()[4].schema(), other.rawdata);
fieldSetFlags()[4] = true;
}
}

/**
Expand All @@ -279,10 +253,6 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp
this.data = data().deepCopy(fields()[3].schema(), other.data);
fieldSetFlags()[3] = true;
}
if (isValidValue(fields()[4], other.rawdata)) {
this.rawdata = data().deepCopy(fields()[4].schema(), other.rawdata);
fieldSetFlags()[4] = true;
}
}

/**
Expand Down Expand Up @@ -440,45 +410,6 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp
return this;
}

/**
* Gets the value of the 'rawdata' field.
* @return The value.
*/
public java.nio.ByteBuffer getRawdata() {
return rawdata;
}

/**
* Sets the value of the 'rawdata' field.
* @param value The value of 'rawdata'.
* @return This builder.
*/
public com.powerpanel.kso.model.DataPackage.Builder setRawdata(java.nio.ByteBuffer value) {
validate(fields()[4], value);
this.rawdata = value;
fieldSetFlags()[4] = true;
return this;
}

/**
* Checks whether the 'rawdata' field has been set.
* @return True if the 'rawdata' field has been set, false otherwise.
*/
public boolean hasRawdata() {
return fieldSetFlags()[4];
}


/**
* Clears the value of the 'rawdata' field.
* @return This builder.
*/
public com.powerpanel.kso.model.DataPackage.Builder clearRawdata() {
rawdata = null;
fieldSetFlags()[4] = false;
return this;
}

@Override
@SuppressWarnings("unchecked")
public DataPackage build() {
Expand All @@ -488,7 +419,6 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp
record.region = fieldSetFlags()[1] ? this.region : (com.powerpanel.kso.model.Region) defaultValue(fields()[1]);
record.state = fieldSetFlags()[2] ? this.state : (com.powerpanel.kso.model.State) defaultValue(fields()[2]);
record.data = fieldSetFlags()[3] ? this.data : (java.util.List<com.powerpanel.kso.model.MetricData>) defaultValue(fields()[3]);
record.rawdata = fieldSetFlags()[4] ? this.rawdata : (java.nio.ByteBuffer) defaultValue(fields()[4]);
return record;
} catch (java.lang.Exception e) {
throw new org.apache.avro.AvroRuntimeException(e);
Expand Down

0 comments on commit a974cb6

Please sign in to comment.