Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Modifying properties for new Kafka versions
  • Loading branch information
eal13009 committed Mar 6, 2019
1 parent f4a25ac commit 5b4d680
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 9 deletions.
16 changes: 9 additions & 7 deletions src/main/resources/application.properties
@@ -1,7 +1,9 @@
job_name="pp.kso.collectd"
zookeeper_connect="sd5-data.engr.uconn.edu:2181"
broker_list="sd5-data.engr.uconn.edu:9092"
consumer_group="pp.kso.collectd"
zookeeper_timeout=6000
topic_list="GeneratorData"
opentsdb="sd5-data.engr.uconn.edu:4242"
kafka.brokers="sd5-data.engr.uconn.edu:9092"
kafka.zookeeper="sd5-data.engr.uconn.edu:2181"
app.checkpoint_path=""
app.job_name="pp.kso"
app.processing_parallelism=1
app.batch_size_seconds=3
kafka.topic="GeneratorData"
kafka.consume_from_beginning=true
opentsdb.ip="sd5-data.engr.uconn.edu:4242"
8 changes: 6 additions & 2 deletions src/main/scala/com/powerpanel/kso/KafkaInput.scala
Expand Up @@ -12,11 +12,15 @@ class KafkaInput extends Serializable {
def readFromKafka (ssc: StreamingContext) = {
val props = AppConfig.loadProperties();
val topicsSet = props.getProperty("kafka.topic").split(",").toSet;
val kafkaParams = collection.mutable.Map[String, String]("metadata.broker.list" -> props.getProperty("kafka.brokers"))

val kafkaParams = collection.mutable.Map[String, String]("bootstrap.servers" -> props.getProperty("kafka.brokers"))
if (props.getProperty("kafka.consume_from_beginning").toBoolean)
{
kafkaParams.put("auto.offset.reset", "smallest");
kafkaParams.put("auto.offset.reset", "earliest");
}
kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

val messages = KafkaUtils.createDirectStream[String, Array[Byte]](
ssc,
LocationStrategies.PreferConsistent,
Expand Down

0 comments on commit 5b4d680

Please sign in to comment.