diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index 316e68b..d531da9 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -1,7 +1,9 @@
-job_name="pp.kso.collectd"
-zookeeper_connect="sd5-data.engr.uconn.edu:2181"
-broker_list="sd5-data.engr.uconn.edu:9092"
-consumer_group="pp.kso.collectd"
-zookeeper_timeout=6000
-topic_list="GeneratorData"
-opentsdb="sd5-data.engr.uconn.edu:4242"
\ No newline at end of file
+kafka.brokers="sd5-data.engr.uconn.edu:9092"
+kafka.zookeeper="sd5-data.engr.uconn.edu:2181"
+app.checkpoint_path=""
+app.job_name="pp.kso"
+app.processing_parallelism=1
+app.batch_size_seconds=3
+kafka.topic="GeneratorData"
+kafka.consume_from_beginning=true
+opentsdb.ip="sd5-data.engr.uconn.edu:4242"
diff --git a/src/main/scala/com/powerpanel/kso/KafkaInput.scala b/src/main/scala/com/powerpanel/kso/KafkaInput.scala
index 84582bb..49bbf10 100644
--- a/src/main/scala/com/powerpanel/kso/KafkaInput.scala
+++ b/src/main/scala/com/powerpanel/kso/KafkaInput.scala
@@ -12,11 +12,15 @@ class KafkaInput extends Serializable {
   def readFromKafka (ssc: StreamingContext) = {
     val props = AppConfig.loadProperties();
     val topicsSet = props.getProperty("kafka.topic").split(",").toSet;
-    val kafkaParams = collection.mutable.Map[String, String]("metadata.broker.list" -> props.getProperty("kafka.brokers"))
+
+    val kafkaParams = collection.mutable.Map[String, String]("bootstrap.servers" -> props.getProperty("kafka.brokers"))
     if (props.getProperty("kafka.consume_from_beginning").toBoolean) {
-      kafkaParams.put("auto.offset.reset", "smallest");
+      kafkaParams.put("auto.offset.reset", "earliest");
     }
+    kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+    kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+
     val messages = KafkaUtils.createDirectStream[String, Array[Byte]](
       ssc,
       LocationStrategies.PreferConsistent,
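Note: the second hunk is cut off mid-call. Under the spark-streaming-kafka-0-10 integration this diff migrates to, a `createDirectStream` call of this shape is normally completed with `ConsumerStrategies.Subscribe`, as in the sketch below. This is a sketch, not the rest of the PR: `topicsSet`, `kafkaParams`, `props`, and `ssc` are the values built in the hunk above, and the `group.id` entry is an assumption (the new consumer API requires one to subscribe, and the visible part of the diff never sets it).

    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

    // Assumed group id; reusing app.job_name here is a guess, the diff
    // does not show which value the project actually uses.
    kafkaParams.put("group.id", props.getProperty("app.job_name"))

    // Complete the direct stream: Subscribe takes the topic set and the
    // consumer config built above, typed to match the deserializers.
    val messages = KafkaUtils.createDirectStream[String, Array[Byte]](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, Array[Byte]](topicsSet, kafkaParams))

The config renames in the hunk track the same migration: `metadata.broker.list`/`smallest` belong to the old Kafka 0.8 consumer, while `bootstrap.servers`/`earliest` are their 0.10+ equivalents, and the new consumer additionally requires explicit key/value deserializers, which is why the two `put` calls were added.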