diff --git a/src/main/scala/com/powerpanel/kso/KafkaInput.scala b/src/main/scala/com/powerpanel/kso/KafkaInput.scala
index 5da9f8a..a6717a1 100644
--- a/src/main/scala/com/powerpanel/kso/KafkaInput.scala
+++ b/src/main/scala/com/powerpanel/kso/KafkaInput.scala
@@ -8,10 +8,9 @@ import org.apache.spark.streaming.kafka010.ConsumerStrategies
 import org.apache.spark.streaming.kafka010.LocationStrategies
 import org.apache.avro.specific.SpecificDatumReader
 import org.apache.avro.file.{DataFileReader, SeekableByteArrayInput}
-import scala.collection.JavaConverters._
-// import org.apache.avro.generic.DatumReader
+// import scala.collection.JavaConverters._
 import org.apache.spark.streaming.StreamingContext
-
+import org.apache.avro.io.DecoderFactory
 class KafkaInput extends Serializable {
   def readFromKafka (ssc: StreamingContext) = {
     val props = AppConfig.loadProperties();
@@ -40,10 +39,18 @@ class KafkaInput extends Serializable {
       // DataFileReader dataFileReader = new DataFileReader(file, userDatumReader);
       // val dataPackageDecoder = DataPackage.getDecoder();
       val payload = x.value();
+      /*
       val datumReader = new SpecificDatumReader[DataPackage](DataPackage.getClassSchema)
       val inputStream = new SeekableByteArrayInput(payload)
       val dataFileReader = new DataFileReader[DataPackage](inputStream, datumReader)
-      val dataPackage = dataFileReader.next()
+      val dataPackage = dataFileReader.next()*/
+      val datumReader = new SpecificDatumReader[DataPackage](DataPackage.getClassSchema)
+      val inputStream = new SeekableByteArrayInput(payload)
+      val decoder = DecoderFactory.get.binaryDecoder(inputStream, null)
+
+      // while (!decoder.isEnd) {
+      val dataPackage = datumReader.read(null, decoder)
+      //}
       // val dataPackage = dataPackageDecoder.decode(payload);
       dataPackage;
     });
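Note on the decoding change (not part of the diff): switching from `DataFileReader` to `DecoderFactory.binaryDecoder` suggests the Kafka payloads are raw Avro-binary datums rather than Avro object-container files; `DataFileReader` expects the container header and would fail on raw bytes. Below is a minimal sketch of how this decode path could be factored out, assuming `DataPackage` is the Avro-generated specific class used in the diff. The `DataPackageDecoder` object and `decodeAll` helper are hypothetical names, not code from this PR; the sketch also enables the commented-out `while (!decoder.isEnd)` loop so a payload carrying several concatenated records is fully drained rather than yielding only its first record.

```scala
import org.apache.avro.io.{BinaryDecoder, DecoderFactory}
import org.apache.avro.specific.SpecificDatumReader
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper (names are illustrative, not from this PR): decodes
// raw Avro-binary DataPackage records out of a Kafka message value.
object DataPackageDecoder {
  private val datumReader =
    new SpecificDatumReader[DataPackage](DataPackage.getClassSchema)

  def decodeAll(payload: Array[Byte]): Seq[DataPackage] = {
    // binaryDecoder accepts the byte array directly; the SeekableByteArrayInput
    // wrapper is only required by DataFileReader's seekable-input API.
    val decoder: BinaryDecoder = DecoderFactory.get.binaryDecoder(payload, null)
    val records = ArrayBuffer.empty[DataPackage]
    // isEnd reports whether the decoder has consumed all of its input,
    // which is what the commented-out loop in the diff hints at.
    while (!decoder.isEnd) {
      records += datumReader.read(null, decoder)
    }
    records.toSeq
  }
}
```

A caller inside the stream transformation might then look like `stream.flatMap(x => DataPackageDecoder.decodeAll(x.value()))` (again, assuming the surrounding code from this file). Reusing the returned `BinaryDecoder` across calls, instead of passing `null`, would cut per-message allocation if throughput makes that worthwhile.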