From 16eae2dd60803f144eb6cee7d32dc60d96688881 Mon Sep 17 00:00:00 2001 From: Evan Langlais Date: Wed, 6 Mar 2019 15:48:13 -0500 Subject: [PATCH] Adding OpenTSDB logging --- src/main/scala/com/powerpanel/kso/KafkaInput.scala | 2 +- src/main/scala/com/powerpanel/kso/OpenTSDBOutput.scala | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/main/scala/com/powerpanel/kso/KafkaInput.scala b/src/main/scala/com/powerpanel/kso/KafkaInput.scala index a6717a1..5b8eb5e 100644 --- a/src/main/scala/com/powerpanel/kso/KafkaInput.scala +++ b/src/main/scala/com/powerpanel/kso/KafkaInput.scala @@ -23,7 +23,7 @@ class KafkaInput extends Serializable { } kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - kafkaParams.put("group.id", props.getProperty("kafka.group_id")); + kafkaParams.put("group.id", props.getProperty("kafka.group_id")); val messages = KafkaUtils.createDirectStream[String, Array[Byte]]( ssc, LocationStrategies.PreferConsistent, diff --git a/src/main/scala/com/powerpanel/kso/OpenTSDBOutput.scala b/src/main/scala/com/powerpanel/kso/OpenTSDBOutput.scala index f59e082..c4b360d 100644 --- a/src/main/scala/com/powerpanel/kso/OpenTSDBOutput.scala +++ b/src/main/scala/com/powerpanel/kso/OpenTSDBOutput.scala @@ -15,7 +15,8 @@ import scala.collection.JavaConversions._ class OpenTSDBOutput extends Serializable { - + private[this] val logger = Logger.getLogger(getClass().getName()); + def putOpentsdb[T](opentsdbIP: String, stream: DStream[DataPackage]) = { stream.mapPartitions(partition => { var count = 0; @@ -57,6 +58,8 @@ class OpenTSDBOutput extends Serializable { } body.concat("]"); + logger.info(body); + var openTSDBUrl = "http://" + opentsdbIP + "/api/put" try { val httpClient = new DefaultHttpClient()