Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Modifying byte reading
  • Loading branch information
Evan Langlais committed Mar 6, 2019
1 parent a974cb6 commit 2688b54
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions src/main/scala/com/powerpanel/kso/KafkaInput.scala
Expand Up @@ -8,10 +8,9 @@ import org.apache.spark.streaming.kafka010.ConsumerStrategies
import org.apache.spark.streaming.kafka010.LocationStrategies
import org.apache.avro.specific.SpecificDatumReader
import org.apache.avro.file.{DataFileReader, SeekableByteArrayInput}
import scala.collection.JavaConverters._
// import org.apache.avro.generic.DatumReader
// import scala.collection.JavaConverters._
import org.apache.spark.streaming.StreamingContext

import org.apache.avro.io.DecoderFactory
class KafkaInput extends Serializable {
def readFromKafka (ssc: StreamingContext) = {
val props = AppConfig.loadProperties();
Expand Down Expand Up @@ -40,10 +39,18 @@ class KafkaInput extends Serializable {
// DataFileReader<User> dataFileReader = new DataFileReader<User>(file, userDatumReader);
// val dataPackageDecoder = DataPackage.getDecoder();
val payload = x.value();
/*
val datumReader = new SpecificDatumReader[DataPackage](DataPackage.getClassSchema)
val inputStream = new SeekableByteArrayInput(payload)
val dataFileReader = new DataFileReader[DataPackage](inputStream, datumReader)
val dataPackage = dataFileReader.next()
val dataPackage = dataFileReader.next()*/
val datumReader = new SpecificDatumReader[DataPackage](DataPackage.getClassSchema)
val inputStream = new SeekableByteArrayInput(payload)
val decoder = DecoderFactory.get.binaryDecoder(inputStream, null)

// while (!decoder.isEnd) {
val dataPackage = datumReader.read(null, decoder)
//}
// val dataPackage = dataPackageDecoder.decode(payload);
dataPackage;
});
Expand Down

0 comments on commit 2688b54

Please sign in to comment.