From 2688b54b1ceaf8952912a9a6cefabf615682e97b Mon Sep 17 00:00:00 2001 From: Evan Langlais Date: Wed, 6 Mar 2019 15:34:00 -0500 Subject: [PATCH] Modifying byte reading --- .../scala/com/powerpanel/kso/KafkaInput.scala | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/main/scala/com/powerpanel/kso/KafkaInput.scala b/src/main/scala/com/powerpanel/kso/KafkaInput.scala index 5da9f8a..a6717a1 100644 --- a/src/main/scala/com/powerpanel/kso/KafkaInput.scala +++ b/src/main/scala/com/powerpanel/kso/KafkaInput.scala @@ -8,10 +8,9 @@ import org.apache.spark.streaming.kafka010.ConsumerStrategies import org.apache.spark.streaming.kafka010.LocationStrategies import org.apache.avro.specific.SpecificDatumReader import org.apache.avro.file.{DataFileReader, SeekableByteArrayInput} -import scala.collection.JavaConverters._ -// import org.apache.avro.generic.DatumReader +// import scala.collection.JavaConverters._ import org.apache.spark.streaming.StreamingContext - +import org.apache.avro.io.DecoderFactory class KafkaInput extends Serializable { def readFromKafka (ssc: StreamingContext) = { val props = AppConfig.loadProperties(); @@ -40,10 +39,18 @@ class KafkaInput extends Serializable { // DataFileReader dataFileReader = new DataFileReader(file, userDatumReader); // val dataPackageDecoder = DataPackage.getDecoder(); val payload = x.value(); + /* val datumReader = new SpecificDatumReader[DataPackage](DataPackage.getClassSchema) val inputStream = new SeekableByteArrayInput(payload) val dataFileReader = new DataFileReader[DataPackage](inputStream, datumReader) - val dataPackage = dataFileReader.next() + val dataPackage = dataFileReader.next()*/ + val datumReader = new SpecificDatumReader[DataPackage](DataPackage.getClassSchema) + val inputStream = new SeekableByteArrayInput(payload) + val decoder = DecoderFactory.get.binaryDecoder(inputStream, null) + + // while (!decoder.isEnd) { + val dataPackage = datumReader.read(null, decoder) + //} // val dataPackage = dataPackageDecoder.decode(payload); dataPackage; });