From 20945429470887125fe64c93deb4576a56cd53d3 Mon Sep 17 00:00:00 2001
From: Evan Langlais
Date: Mon, 4 Mar 2019 12:20:08 -0500
Subject: [PATCH] Adding more template code

---
 .../scala/com/powerpanel/kso/AppConfig.java   | 40 ++++++++++++++
 .../scala/com/powerpanel/kso/KafkaInput.scala | 29 +++++++++
 .../com/powerpanel/kso/KafkaPipeline.scala    | 45 +++++++++++++++
 .../com/powerpanel/kso/OpenTSDBOutput.scala   | 59 +++++++++++++++++++
 .../scala/com/powerpanel/kso/PPConsumer.scala | 36 ++++++++++++
 .../com/powerpanel/kso/StatReporter.scala     | 56 ++++++++++++++++++
 .../powerpanel/kso/model/StaticHelpers.java   | 35 +++++++++++
 7 files changed, 300 insertions(+)
 create mode 100644 src/main/scala/com/powerpanel/kso/AppConfig.java
 create mode 100644 src/main/scala/com/powerpanel/kso/KafkaInput.scala
 create mode 100644 src/main/scala/com/powerpanel/kso/KafkaPipeline.scala
 create mode 100644 src/main/scala/com/powerpanel/kso/OpenTSDBOutput.scala
 create mode 100644 src/main/scala/com/powerpanel/kso/PPConsumer.scala
 create mode 100644 src/main/scala/com/powerpanel/kso/StatReporter.scala
 create mode 100644 src/main/scala/com/powerpanel/kso/model/StaticHelpers.java

diff --git a/src/main/scala/com/powerpanel/kso/AppConfig.java b/src/main/scala/com/powerpanel/kso/AppConfig.java
new file mode 100644
index 0000000..8e64223
--- /dev/null
+++ b/src/main/scala/com/powerpanel/kso/AppConfig.java
@@ -0,0 +1,40 @@
+package com.powerpanel.kso;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+import org.apache.log4j.Logger;
+
+public class AppConfig
+{
+    private static final Logger LOGGER = Logger.getLogger(AppConfig.class);
+
+    private static Properties _properties;
+    private static final Object _lock = new Object();
+
+    public static Properties loadProperties()
+    {
+        synchronized (_lock)
+        {
+            if (_properties == null)
+            {
+                // Open the resource stream only on first use so repeated
+                // calls reuse the cached Properties instead of leaking streams.
+                InputStream is = AppConfig.class.getClassLoader().getResourceAsStream("application.properties");
+                _properties = new Properties();
+                try
+                {
+                    _properties.load(is);
+                    LOGGER.info("Properties loaded");
+                }
+                catch (IOException e)
+                {
+                    LOGGER.error("Failed to load properties", e);
+                    System.exit(1);
+                }
+            }
+            return _properties;
+        }
+    }
+}
diff --git a/src/main/scala/com/powerpanel/kso/KafkaInput.scala b/src/main/scala/com/powerpanel/kso/KafkaInput.scala
new file mode 100644
index 0000000..f406451
--- /dev/null
+++ b/src/main/scala/com/powerpanel/kso/KafkaInput.scala
@@ -0,0 +1,29 @@
+package com.powerpanel.kso;
+
+import kafka.serializer.StringDecoder
+import kafka.serializer.DefaultDecoder
+import com.powerpanel.kso.model._
+import org.apache.spark.streaming.kafka.KafkaUtils
+import org.apache.spark.streaming.StreamingContext
+
+class KafkaInput extends Serializable {
+  def readFromKafka(ssc: StreamingContext) = {
+    val props = AppConfig.loadProperties()
+    val topicsSet = props.getProperty("kafka.topic").split(",").toSet
+    val kafkaParams = collection.mutable.Map[String, String]("metadata.broker.list" -> props.getProperty("kafka.brokers"))
+    if (props.getProperty("kafka.consume_from_beginning").toBoolean) {
+      kafkaParams.put("auto.offset.reset", "smallest")
+    }
+    val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
+      ssc, kafkaParams.toMap, topicsSet).repartition(Integer.parseInt(props.getProperty("app.processing_parallelism")))
+
+    // Decode the Avro container format, building one decoder per partition
+    // rather than one per record.
+    val avroSchemaString = StaticHelpers.loadResourceFile("dataplatform-raw.avsc")
+    val rawMessages = messages.mapPartitions(partition => {
+      val eventDecoder = new DataPlatformEventDecoder(avroSchemaString)
+      partition.map(message => eventDecoder.decode(message._2))
+    })
+    rawMessages
+  }
+}
diff --git a/src/main/scala/com/powerpanel/kso/KafkaPipeline.scala b/src/main/scala/com/powerpanel/kso/KafkaPipeline.scala
new file mode 100644
index 0000000..79d84b2
--- /dev/null
+++ b/src/main/scala/com/powerpanel/kso/KafkaPipeline.scala
@@ -0,0 +1,45 @@
+package com.powerpanel.kso;
+
+import com.powerpanel.kso.model._
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.Seconds
+import org.apache.spark.SparkConf
+import org.apache.log4j.Logger
+
+class KafkaPipeline extends Serializable {
+
+  object Holder extends Serializable {
+    @transient lazy val logger = Logger.getLogger(getClass.getName)
+  }
+
+  def create() = {
+    // Pull the raw payload out of each decoded event; events without
+    // a payload are dropped by the flatMap.
+    val parseMessages = (messagesDstream: DStream[DataPlatformEvent]) => {
+      messagesDstream.flatMap(dataPlatformEvent => Option(dataPlatformEvent.getRawdata()))
+    }: DStream[String]
+
+    val props = AppConfig.loadProperties()
+    val checkpointDirectory = props.getProperty("app.checkpoint_path")
+    val batchSizeSeconds = Integer.parseInt(props.getProperty("app.batch_size_seconds"))
+
+    val sparkConf = new SparkConf()
+    Holder.logger.info("Creating new spark context with checkpoint directory: " + checkpointDirectory)
+    val ssc = new StreamingContext(sparkConf, Seconds(batchSizeSeconds))
+
+    if (checkpointDirectory.length() > 0) {
+      ssc.checkpoint(checkpointDirectory)
+    }
+
+    val inputStream = new KafkaInput().readFromKafka(ssc)
+    val parsedStream = parseMessages(inputStream)
+    val writeCounts: DStream[Integer] = new OpenTSDBOutput().putOpentsdb(
+      props.getProperty("opentsdb.ip"),
+      parsedStream)
+
+    // Print the total number of records written in each batch.
+    writeCounts.reduce(_ + _).print(1)
+    ssc
+  }: StreamingContext
+}
diff --git a/src/main/scala/com/powerpanel/kso/OpenTSDBOutput.scala b/src/main/scala/com/powerpanel/kso/OpenTSDBOutput.scala
new file mode 100644
index 0000000..bb289fb
--- /dev/null
+++ b/src/main/scala/com/powerpanel/kso/OpenTSDBOutput.scala
@@ -0,0 +1,59 @@
+package com.powerpanel.kso;
+
+import org.joda.time.DateTime
+import scala.util.control.NonFatal
+import org.apache.log4j.Logger
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.http.client.methods.HttpPost
+import org.apache.http.entity.StringEntity
+import org.apache.http.impl.client.DefaultHttpClient
+import org.apache.http.util.EntityUtils
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+import org.json4s.JsonDSL._
+
+class OpenTSDBOutput extends Serializable {
+
+  object Holder extends Serializable {
+    @transient lazy val logger = Logger.getLogger(getClass.getName)
+  }
+
+  def putOpentsdb(opentsdbIP: String, stream: DStream[String]) = {
+    stream.mapPartitions(partition => {
+      // One HTTP client per partition rather than per record.
+      val httpClient = new DefaultHttpClient()
+      val openTSDBUrl = "http://" + opentsdbIP + "/api/put"
+      var count = 0
+      partition.foreach(rowData => {
+        val json = parse(rowData.replace("'", "\""))
+        val host = compact(render(json \\ "host")).replace("\"", "")
+        val timestampStr = compact(render(json \\ "timestamp")).replace("\"", "")
+        val value = compact(render(json \\ "value")).replace("\"", "").toDouble
+        val collectd_type = compact(render(json \\ "collectd_type")).replace("\"", "")
+        val metric = "kso.collectd." + collectd_type
+        val timestamp = new DateTime(timestampStr).getMillis
+        // OpenTSDB accepts the value as a JSON number; send it unquoted.
+        val body = f"""{
+             | "metric": "$metric",
+             | "value": $value,
+             | "timestamp": $timestamp,
+             | "tags": {"host": "$host"}
+             |}""".stripMargin
+
+        try {
+          val post = new HttpPost(openTSDBUrl)
+          post.setHeader("Content-type", "application/json")
+          post.setEntity(new StringEntity(body))
+          val response = httpClient.execute(post)
+          // Consume the entity so the connection can be reused.
+          EntityUtils.consume(response.getEntity())
+        } catch {
+          case NonFatal(t) =>
+            Holder.logger.error("Failed to write to OpenTSDB: " + openTSDBUrl, t)
+        }
+        count += 1
+      })
+      Iterator[Integer](count)
+    })
+  }
+}
diff --git a/src/main/scala/com/powerpanel/kso/PPConsumer.scala b/src/main/scala/com/powerpanel/kso/PPConsumer.scala
new file mode 100644
index 0000000..b6ac4ed
--- /dev/null
+++ b/src/main/scala/com/powerpanel/kso/PPConsumer.scala
@@ -0,0 +1,36 @@
+package com.powerpanel.kso;
+
+import org.apache.spark.streaming.StreamingContext
+import org.apache.log4j.Logger
+
+object PPConsumer {
+
+  private[this] val logger = Logger.getLogger(getClass().getName())
+
+  def main(args: Array[String]) {
+    val props = AppConfig.loadProperties()
+    val loggerUrl = props.getProperty("environment.metric_logger_url")
+    val appName = props.getProperty("component.application")
+    val checkpointDirectory = props.getProperty("app.checkpoint_path")
+
+    val pipeline = new KafkaPipeline()
+    // Create the streaming context, or recover a saved one from the checkpoint directory.
+    val ssc = if (checkpointDirectory.length() > 0) {
+      StreamingContext.getOrCreate(checkpointDirectory, pipeline.create)
+    } else {
+      pipeline.create()
+    }
+
+    sys.ShutdownHookThread {
+      logger.info("Gracefully stopping Spark Streaming Application")
+      ssc.stop(stopSparkContext = true, stopGracefully = true)
+      logger.info("Application stopped")
+    }
+
+    logger.info("Starting spark streaming execution")
+    logger.info("Logger url: " + loggerUrl)
+    ssc.addStreamingListener(new StatReporter(appName, loggerUrl))
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
diff --git a/src/main/scala/com/powerpanel/kso/StatReporter.scala b/src/main/scala/com/powerpanel/kso/StatReporter.scala
new file mode 100644
index 0000000..e8d3871
--- /dev/null
+++ b/src/main/scala/com/powerpanel/kso/StatReporter.scala
@@ -0,0 +1,56 @@
+package com.powerpanel.kso
+
+import scala.util.control.NonFatal
+import java.io.StringWriter
+import java.io.PrintWriter
+import org.apache.spark.streaming.scheduler.StreamingListener
+import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted
+import org.apache.log4j.Logger
+import org.apache.http.client.methods.HttpPost
+import org.apache.http.entity.StringEntity
+import org.apache.http.impl.client.DefaultHttpClient
+import org.apache.http.util.EntityUtils
+
+class StatReporter(appName: String, metricsUrl: String) extends StreamingListener {
+
+  private[this] val logger = Logger.getLogger(getClass().getName())
+
+  override def onBatchCompleted(batch: StreamingListenerBatchCompleted) = {
+    def doSend(metricName: String, metricValue: String) = {
+      try {
+        val httpClient = new DefaultHttpClient()
+        val post = new HttpPost(metricsUrl)
+        post.setHeader("Content-type", "application/json")
+        val ts = java.lang.System.currentTimeMillis()
+        val body = f"""{
+             | "data": [{
+             | "source": "application.$appName",
+             | "metric": "application.kpi.$appName.$metricName",
+             | "value": "$metricValue",
+             | "timestamp": $ts%d
+             | }],
+             | "timestamp": $ts%d
+             |}""".stripMargin
+
+        logger.debug(body)
+        post.setEntity(new StringEntity(body))
+        val response = httpClient.execute(post)
+        if (response.getStatusLine.getStatusCode() != 200) {
+          logger.error("POST failed: " + metricsUrl + " response: " + response.getStatusLine.getStatusCode())
+        }
+        // Release the connection.
+        EntityUtils.consume(response.getEntity())
+      } catch {
+        case NonFatal(t) => {
+          logger.error("POST failed: " + metricsUrl)
+          val sw = new StringWriter
+          t.printStackTrace(new PrintWriter(sw))
+          logger.error(sw.toString)
+        }
+      }
+    }
+    // The delays are Options; only report them when they are defined.
+    batch.batchInfo.processingDelay.foreach(d => doSend("processing-delay", d.toString()))
+    batch.batchInfo.schedulingDelay.foreach(d => doSend("scheduling-delay", d.toString()))
+  }
+}
diff --git a/src/main/scala/com/powerpanel/kso/model/StaticHelpers.java b/src/main/scala/com/powerpanel/kso/model/StaticHelpers.java
new file mode 100644
index 0000000..4533d86
--- /dev/null
+++ b/src/main/scala/com/powerpanel/kso/model/StaticHelpers.java
@@ -0,0 +1,35 @@
+package com.powerpanel.kso.model;
+
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+
+public class StaticHelpers {
+    public static String loadResourceFile(String path)
+    {
+        InputStream is = StaticHelpers.class.getClassLoader().getResourceAsStream(path);
+        if (is == null)
+        {
+            return null;
+        }
+        // try-with-resources closes the reader (and stream) on all paths.
+        try (Reader r = new InputStreamReader(is, "UTF-8"))
+        {
+            char[] buf = new char[2048];
+            StringBuilder s = new StringBuilder();
+            while (true)
+            {
+                int n = r.read(buf);
+                if (n < 0)
+                    break;
+                s.append(buf, 0, n);
+            }
+            return s.toString();
+        }
+        catch (Exception e)
+        {
+            // Keep the original contract: null signals the resource could not be read.
+            return null;
+        }
+    }
+}
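
Note: for reference, a minimal application.properties covering every key this patch reads could look like the following; all values are illustrative placeholders, not settings taken from the original deployment:

    kafka.brokers=broker-1:9092,broker-2:9092
    kafka.topic=dataplatform-raw
    kafka.consume_from_beginning=false
    app.processing_parallelism=4
    app.batch_size_seconds=5
    app.checkpoint_path=hdfs:///checkpoints/kso
    opentsdb.ip=opentsdb.example.com:4242
    environment.metric_logger_url=http://metrics.example.com/api/log
    component.application=kso

Since opentsdb.ip is spliced into "http://" + opentsdbIP + "/api/put", it may carry a port as shown.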
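
OpenTSDBOutput assumes each record is a collectd-style JSON string carrying host, timestamp, value, and collectd_type fields (single quotes are tolerated and rewritten to double quotes before parsing). A hypothetical input record such as

    {'host': 'node-01', 'timestamp': '2019-03-04T12:00:00Z', 'value': '42.5', 'collectd_type': 'cpu.idle'}

would be posted to /api/put as

    {
     "metric": "kso.collectd.cpu.idle",
     "value": 42.5,
     "timestamp": 1551700800000,
     "tags": {"host": "node-01"}
    }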
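
To launch the consumer, a spark-submit invocation along these lines should work; the assembly jar name and the YARN master are assumptions, not part of this patch:

    spark-submit \
      --class com.powerpanel.kso.PPConsumer \
      --master yarn \
      kso-assembly.jar

application.properties and dataplatform-raw.avsc must be on the classpath (e.g. bundled into the assembly jar), since both are loaded as resources.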