diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index d531da9..ef01d86 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,9 +1,10 @@ -kafka.brokers="sd5-data.engr.uconn.edu:9092" -kafka.zookeeper="sd5-data.engr.uconn.edu:2181" -app.checkpoint_path="" -app.job_name="pp.kso" +kafka.brokers=sd5-data.engr.uconn.edu:9092 +kafka.zookeeper=sd5-data.engr.uconn.edu:2181 +app.checkpoint_path= +app.job_name=pp.kso app.processing_parallelism=1 app.batch_size_seconds=3 -kafka.topic="GeneratorData" -kafka.consume_from_beginning=true -opentsdb.ip="sd5-data.engr.uconn.edu:4242" +kafka.topic=GeneratorData +kafka.consume_from_beginning=false +kafka.group_id=pp.kso +opentsdb.ip=sd5-data.engr.uconn.edu:4242 diff --git a/src/main/scala/com/powerpanel/kso/KafkaInput.scala b/src/main/scala/com/powerpanel/kso/KafkaInput.scala index 49bbf10..4ff587c 100644 --- a/src/main/scala/com/powerpanel/kso/KafkaInput.scala +++ b/src/main/scala/com/powerpanel/kso/KafkaInput.scala @@ -20,7 +20,7 @@ class KafkaInput extends Serializable { } kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - + kafkaParams.put("group.id", props.getProperty("kafka.group_id")); val messages = KafkaUtils.createDirectStream[String, Array[Byte]]( ssc, LocationStrategies.PreferConsistent,