diff --git a/src/main/resources/datapackage.avsc b/src/main/resources/datapackage.avsc index 6e98a07..948b7dc 100644 --- a/src/main/resources/datapackage.avsc +++ b/src/main/resources/datapackage.avsc @@ -59,10 +59,6 @@ ] } } - }, - { - "name": "rawdata", - "type": "bytes" } ] } \ No newline at end of file diff --git a/src/main/scala/com/powerpanel/kso/KafkaInput.scala b/src/main/scala/com/powerpanel/kso/KafkaInput.scala index 4ff587c..5da9f8a 100644 --- a/src/main/scala/com/powerpanel/kso/KafkaInput.scala +++ b/src/main/scala/com/powerpanel/kso/KafkaInput.scala @@ -6,6 +6,10 @@ import com.powerpanel.kso.model._ import org.apache.spark.streaming.kafka010.KafkaUtils import org.apache.spark.streaming.kafka010.ConsumerStrategies import org.apache.spark.streaming.kafka010.LocationStrategies +import org.apache.avro.specific.SpecificDatumReader +import org.apache.avro.file.{DataFileReader, SeekableByteArrayInput} +import scala.collection.JavaConverters._ +// import org.apache.avro.generic.DatumReader import org.apache.spark.streaming.StreamingContext class KafkaInput extends Serializable { @@ -32,9 +36,15 @@ class KafkaInput extends Serializable { // val avroSchemaString = StaticHelpers.loadResourceFile("dataplatform-raw.avsc"); val rawMessages = messages.map(x => { // val eventDecoder = new DataPlatformEventDecoder(avroSchemaString); - val dataPackageDecoder = DataPackage.getDecoder(); + // val list = dataFileReader.iterator().asScala.toList + // DataFileReader dataFileReader = new DataFileReader(file, userDatumReader); + // val dataPackageDecoder = DataPackage.getDecoder(); val payload = x.value(); - val dataPackage = dataPackageDecoder.decode(payload); + val datumReader = new SpecificDatumReader[DataPackage](DataPackage.getClassSchema) + val inputStream = new SeekableByteArrayInput(payload) + val dataFileReader = new DataFileReader[DataPackage](inputStream, datumReader) + val dataPackage = dataFileReader.next() + // val dataPackage = dataPackageDecoder.decode(payload); dataPackage; }); rawMessages; diff --git a/src/main/scala/com/powerpanel/kso/model/DataPackage.java b/src/main/scala/com/powerpanel/kso/model/DataPackage.java index 9864d9c..55cad87 100644 --- a/src/main/scala/com/powerpanel/kso/model/DataPackage.java +++ b/src/main/scala/com/powerpanel/kso/model/DataPackage.java @@ -13,8 +13,8 @@ import org.apache.avro.message.SchemaStore; @SuppressWarnings("all") @org.apache.avro.specific.AvroGenerated public class DataPackage extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { - private static final long serialVersionUID = -1480426181877249637L; - public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"DataPackage\",\"namespace\":\"com.powerpanel.kso.model\",\"fields\":[{\"name\":\"generator\",\"type\":\"int\"},{\"name\":\"region\",\"type\":{\"type\":\"enum\",\"name\":\"Region\",\"symbols\":[\"NORTHEAST\",\"SOUTHEAST\",\"WEST\",\"MIDWEST\"]}},{\"name\":\"state\",\"type\":{\"type\":\"enum\",\"name\":\"State\",\"symbols\":[\"AL\",\"AK\",\"AZ\",\"AR\",\"CA\",\"CO\",\"CT\",\"DE\",\"FL\",\"GA\",\"HI\",\"ID\",\"IL\",\"IN\",\"IA\",\"KS\",\"KY\",\"LA\",\"ME\",\"MD\",\"MA\",\"MI\",\"MN\",\"MS\",\"MO\",\"MT\",\"NE\",\"NV\",\"NH\",\"NJ\",\"NM\",\"NY\",\"NC\",\"ND\",\"OH\",\"OK\",\"OR\",\"PA\",\"RI\",\"SC\",\"SD\",\"TN\",\"TX\",\"UT\",\"VT\",\"VA\",\"WA\",\"WV\",\"WI\",\"WY\"]}},{\"name\":\"data\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"MetricData\",\"fields\":[{\"name\":\"metric\",\"type\":\"string\"},{\"name\":\"datapoints\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"DataPoint\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"value\",\"type\":[\"long\",\"double\"]}]}}}]}}},{\"name\":\"rawdata\",\"type\":\"bytes\"}]}"); + private static final long serialVersionUID = -2765501079323175813L; + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"DataPackage\",\"namespace\":\"com.powerpanel.kso.model\",\"fields\":[{\"name\":\"generator\",\"type\":\"int\"},{\"name\":\"region\",\"type\":{\"type\":\"enum\",\"name\":\"Region\",\"symbols\":[\"NORTHEAST\",\"SOUTHEAST\",\"WEST\",\"MIDWEST\"]}},{\"name\":\"state\",\"type\":{\"type\":\"enum\",\"name\":\"State\",\"symbols\":[\"AL\",\"AK\",\"AZ\",\"AR\",\"CA\",\"CO\",\"CT\",\"DE\",\"FL\",\"GA\",\"HI\",\"ID\",\"IL\",\"IN\",\"IA\",\"KS\",\"KY\",\"LA\",\"ME\",\"MD\",\"MA\",\"MI\",\"MN\",\"MS\",\"MO\",\"MT\",\"NE\",\"NV\",\"NH\",\"NJ\",\"NM\",\"NY\",\"NC\",\"ND\",\"OH\",\"OK\",\"OR\",\"PA\",\"RI\",\"SC\",\"SD\",\"TN\",\"TX\",\"UT\",\"VT\",\"VA\",\"WA\",\"WV\",\"WI\",\"WY\"]}},{\"name\":\"data\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"MetricData\",\"fields\":[{\"name\":\"metric\",\"type\":\"string\"},{\"name\":\"datapoints\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"DataPoint\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"value\",\"type\":[\"long\",\"double\"]}]}}}]}}}]}"); public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } private static SpecificData MODEL$ = new SpecificData(); @@ -55,7 +55,6 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp @Deprecated public com.powerpanel.kso.model.Region region; @Deprecated public com.powerpanel.kso.model.State state; @Deprecated public java.util.List data; - @Deprecated public java.nio.ByteBuffer rawdata; /** * Default constructor. Note that this does not initialize fields @@ -70,14 +69,12 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp * @param region The new value for region * @param state The new value for state * @param data The new value for data - * @param rawdata The new value for rawdata */ - public DataPackage(java.lang.Integer generator, com.powerpanel.kso.model.Region region, com.powerpanel.kso.model.State state, java.util.List data, java.nio.ByteBuffer rawdata) { + public DataPackage(java.lang.Integer generator, com.powerpanel.kso.model.Region region, com.powerpanel.kso.model.State state, java.util.List data) { this.generator = generator; this.region = region; this.state = state; this.data = data; - this.rawdata = rawdata; } public org.apache.avro.Schema getSchema() { return SCHEMA$; } @@ -88,7 +85,6 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp case 1: return region; case 2: return state; case 3: return data; - case 4: return rawdata; default: throw new org.apache.avro.AvroRuntimeException("Bad index"); } } @@ -101,7 +97,6 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp case 1: region = (com.powerpanel.kso.model.Region)value$; break; case 2: state = (com.powerpanel.kso.model.State)value$; break; case 3: data = (java.util.List)value$; break; - case 4: rawdata = (java.nio.ByteBuffer)value$; break; default: throw new org.apache.avro.AvroRuntimeException("Bad index"); } } @@ -170,22 +165,6 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp this.data = value; } - /** - * Gets the value of the 'rawdata' field. - * @return The value of the 'rawdata' field. - */ - public java.nio.ByteBuffer getRawdata() { - return rawdata; - } - - /** - * Sets the value of the 'rawdata' field. - * @param value the value to set. - */ - public void setRawdata(java.nio.ByteBuffer value) { - this.rawdata = value; - } - /** * Creates a new DataPackage RecordBuilder. * @return A new DataPackage RecordBuilder @@ -222,7 +201,6 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp private com.powerpanel.kso.model.Region region; private com.powerpanel.kso.model.State state; private java.util.List data; - private java.nio.ByteBuffer rawdata; /** Creates a new Builder */ private Builder() { @@ -251,10 +229,6 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp this.data = data().deepCopy(fields()[3].schema(), other.data); fieldSetFlags()[3] = true; } - if (isValidValue(fields()[4], other.rawdata)) { - this.rawdata = data().deepCopy(fields()[4].schema(), other.rawdata); - fieldSetFlags()[4] = true; - } } /** @@ -279,10 +253,6 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp this.data = data().deepCopy(fields()[3].schema(), other.data); fieldSetFlags()[3] = true; } - if (isValidValue(fields()[4], other.rawdata)) { - this.rawdata = data().deepCopy(fields()[4].schema(), other.rawdata); - fieldSetFlags()[4] = true; - } } /** @@ -440,45 +410,6 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp return this; } - /** - * Gets the value of the 'rawdata' field. - * @return The value. - */ - public java.nio.ByteBuffer getRawdata() { - return rawdata; - } - - /** - * Sets the value of the 'rawdata' field. - * @param value The value of 'rawdata'. - * @return This builder. - */ - public com.powerpanel.kso.model.DataPackage.Builder setRawdata(java.nio.ByteBuffer value) { - validate(fields()[4], value); - this.rawdata = value; - fieldSetFlags()[4] = true; - return this; - } - - /** - * Checks whether the 'rawdata' field has been set. - * @return True if the 'rawdata' field has been set, false otherwise. - */ - public boolean hasRawdata() { - return fieldSetFlags()[4]; - } - - - /** - * Clears the value of the 'rawdata' field. - * @return This builder. - */ - public com.powerpanel.kso.model.DataPackage.Builder clearRawdata() { - rawdata = null; - fieldSetFlags()[4] = false; - return this; - } - @Override @SuppressWarnings("unchecked") public DataPackage build() { @@ -488,7 +419,6 @@ public class DataPackage extends org.apache.avro.specific.SpecificRecordBase imp record.region = fieldSetFlags()[1] ? this.region : (com.powerpanel.kso.model.Region) defaultValue(fields()[1]); record.state = fieldSetFlags()[2] ? this.state : (com.powerpanel.kso.model.State) defaultValue(fields()[2]); record.data = fieldSetFlags()[3] ? this.data : (java.util.List) defaultValue(fields()[3]); - record.rawdata = fieldSetFlags()[4] ? this.rawdata : (java.nio.ByteBuffer) defaultValue(fields()[4]); return record; } catch (java.lang.Exception e) { throw new org.apache.avro.AvroRuntimeException(e);