Adding more template code
eal13009 committed Mar 4, 2019
1 parent 9053740 commit 2094542
Showing 7 changed files with 297 additions and 0 deletions.
39 changes: 39 additions & 0 deletions src/main/scala/com/powerpanel/kso/AppConfig.java
@@ -0,0 +1,39 @@
package com.powerpanel.kso;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import org.apache.log4j.Logger;

public class AppConfig
{
    private static final Logger LOGGER = Logger.getLogger(AppConfig.class);

    private static Properties _properties;
    private static final Object _lock = new Object();

    public static Properties loadProperties()
    {
        synchronized (_lock)
        {
            if (_properties == null)
            {
                InputStream is = AppConfig.class.getClassLoader().getResourceAsStream("application.properties");
                _properties = new Properties();
                try
                {
                    _properties.load(is);
                    LOGGER.info("Properties loaded");
                }
                catch (IOException e)
                {
                    LOGGER.error("Failed to load properties", e);
                    System.exit(1);
                }
            }
            return _properties;
        }
    }
}
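
The loader above expects an application.properties file on the classpath. Pulling together the keys read across the files in this commit, a minimal configuration might look like the following; the values are illustrative placeholders, not settings from the repository:

# Placeholder values for illustration only
kafka.brokers=broker1:9092,broker2:9092
kafka.topic=dataplatform-raw
kafka.consume_from_beginning=false
app.processing_parallelism=4
app.batch_size_seconds=5
app.checkpoint_path=hdfs:///checkpoints/kso
opentsdb.ip=opentsdb-host:4242
environment.metric_logger_url=http://metrics-host/api/metrics
component.application=kso-consumer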
31 changes: 31 additions & 0 deletions src/main/scala/com/powerpanel/kso/KafkaInput.scala
@@ -0,0 +1,31 @@
package com.powerpanel.kso;

import kafka.serializer.StringDecoder
import kafka.serializer.DefaultDecoder
import com.powerpanel.kso.model._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.StreamingContext

class KafkaInput extends Serializable {
  def readFromKafka(ssc: StreamingContext) = {
    val props = AppConfig.loadProperties();
    val topicsSet = props.getProperty("kafka.topic").split(",").toSet;
    val kafkaParams = collection.mutable.Map[String, String]("metadata.broker.list" -> props.getProperty("kafka.brokers"))
    if (props.getProperty("kafka.consume_from_beginning").toBoolean)
    {
      // Start from the earliest available offsets instead of only new messages.
      kafkaParams.put("auto.offset.reset", "smallest");
    }
    val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      ssc, kafkaParams.toMap, topicsSet).repartition(Integer.parseInt(props.getProperty("app.processing_parallelism")));

    // Decode avro container format
    val avroSchemaString = StaticHelpers.loadResourceFile("dataplatform-raw.avsc");
    val rawMessages = messages.map(x => {
      val eventDecoder = new DataPlatformEventDecoder(avroSchemaString);
      val payload = x._2;
      val dataPlatformEvent = eventDecoder.decode(payload);
      dataPlatformEvent;
    });
    rawMessages;
  };
}
51 changes: 51 additions & 0 deletions src/main/scala/com/powerpanel/kso/KafkaPipeline.scala
@@ -0,0 +1,51 @@
package com.powerpanel.kso;

import com.powerpanel.kso._
import com.powerpanel.kso.model._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.SparkConf
import org.apache.log4j.Logger;

class KafkaPipeline extends Serializable {

  object Holder extends Serializable {
    @transient lazy val logger = Logger.getLogger(getClass.getName)
  }

  def create() = {

    // Extract the raw payload string from each decoded DataPlatformEvent.
    val parseMessages = (messagesDstream: DStream[DataPlatformEvent]) => {
      val parsedMessages = messagesDstream.flatMap(dataPlatformEvent => {
        val parsed = dataPlatformEvent.getRawdata();
        Some(parsed);
      });
      parsedMessages
    }: DStream[String];

    val props = AppConfig.loadProperties();
    val checkpointDirectory = props.getProperty("app.checkpoint_path");
    val batchSizeSeconds = Integer.parseInt(props.getProperty("app.batch_size_seconds"));

    val sparkConf = new SparkConf();
    Holder.logger.info("Creating new spark context with checkpoint directory: " + checkpointDirectory)
    val ssc = new StreamingContext(sparkConf, Seconds(batchSizeSeconds));

    if (checkpointDirectory.length() > 0) {
      ssc.checkpoint(checkpointDirectory);
    }

    val inputStream = new KafkaInput().readFromKafka(ssc);
    val parsedStream = parseMessages(inputStream);
    val writeCounts: DStream[Integer] = new OpenTSDBOutput().putOpentsdb(
      props.getProperty("opentsdb.ip"),
      parsedStream);

    // Sum the per-partition write counts and print them to force an output action on the stream.
    writeCounts.reduce(_ + _).print(1);
    ssc;
  }: StreamingContext
}
58 changes: 58 additions & 0 deletions src/main/scala/com/powerpanel/kso/OpenTSDBOutput.scala
@@ -0,0 +1,58 @@
package com.powerpanel.kso;

import org.joda.time.DateTime
import scala.util.control.NonFatal
import org.apache.log4j.Logger
import org.apache.spark.streaming.dstream.DStream
import org.apache.http.client.methods.HttpPost
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.DefaultHttpClient
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.JsonDSL._

class OpenTSDBOutput extends Serializable {

  def putOpentsdb[T](opentsdbIP: String, stream: DStream[String]) = {
    stream.mapPartitions(partition => {
      var count = 0;
      partition.foreach(rowData => {
        // Pull the fields needed for an OpenTSDB datapoint out of the row's JSON.
        val json = parse(rowData.replace("'", "\""))
        val host = compact(render((json \\ "host"))).replace("\"", "")
        val timestampStr = compact(render((json \\ "timestamp"))).replace("\"", "")
        val value = (compact(render((json \\ "value"))).replace("\"", "")).toDouble
        val collectd_type = compact(render((json \\ "collectd_type"))).replace("\"", "")
        var metric: String = "kso.collectd"
        metric = metric.concat("." + collectd_type)
        val timestamp = new DateTime(timestampStr).getMillis
        val body = f"""{
        | "metric": "$metric",
        | "value": "$value",
        | "timestamp": $timestamp,
        | "tags": {"host": "$host"}
        |}""".stripMargin

        val openTSDBUrl = "http://" + opentsdbIP + "/api/put"
        try {
          val httpClient = new DefaultHttpClient()
          val post = new HttpPost(openTSDBUrl)
          post.setHeader("Content-type", "application/json")
          post.setEntity(new StringEntity(body))
          httpClient.execute(post)
        } catch {
          case NonFatal(t) =>
            // Swallow per-row HTTP failures so one bad record does not abort the partition.
        }

        count += 1
      });
      // One count per partition; the pipeline sums these to report rows written per batch.
      Iterator[Integer](count)
    });
  }
}
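
As a sketch of the transformation above, assuming the upstream rows are the single-quoted, collectd-style JSON implied by the parsing code, an input row with illustrative values such as

{'host': 'node-01', 'collectd_type': 'cpu', 'timestamp': '2019-03-04T12:00:00Z', 'value': '0.42'}

would be posted to http://<opentsdb.ip>/api/put as

{
  "metric": "kso.collectd.cpu",
  "value": "0.42",
  "timestamp": 1551700800000,
  "tags": {"host": "node-01"}
}

where the timestamp is the epoch-millisecond instant Joda-Time derives from the ISO string.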
35 changes: 35 additions & 0 deletions src/main/scala/com/powerpanel/kso/PPConsumer.scala
@@ -0,0 +1,35 @@
package com.powerpanel.kso;

import com.powerpanel.kso._
import org.apache.spark.streaming.StreamingContext
import org.apache.log4j.Logger;

object PPConsumer {

  private[this] val logger = Logger.getLogger(getClass().getName());

  def main(args: Array[String]) {

    val props = AppConfig.loadProperties();
    val loggerUrl = props.getProperty("environment.metric_logger_url")
    val appName = props.getProperty("component.application")
    val checkpointDirectory = props.getProperty("app.checkpoint_path");
    val batchSizeSeconds = Integer.parseInt(props.getProperty("app.batch_size_seconds"));

    val pipeline = new KafkaPipeline()
    // Create the streaming context, or load a saved one from disk
    val ssc = if (checkpointDirectory.length() > 0) StreamingContext.getOrCreate(checkpointDirectory, pipeline.create) else pipeline.create();

    sys.ShutdownHookThread {
      logger.info("Gracefully stopping Spark Streaming Application")
      ssc.stop(true, true)
      logger.info("Application stopped")
    }

    logger.info("Starting spark streaming execution")
    logger.info("Logger url: " + loggerUrl)
    ssc.addStreamingListener(new StatReporter(appName, loggerUrl))
    ssc.start()
    ssc.awaitTermination()
  }
}
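
Assuming the project is packaged as an assembly jar (the jar name and master below are placeholders), the driver above would typically be launched with spark-submit:

spark-submit \
  --class com.powerpanel.kso.PPConsumer \
  --master yarn \
  kso-assembly.jar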
53 changes: 53 additions & 0 deletions src/main/scala/com/powerpanel/kso/StatReporter.scala
@@ -0,0 +1,53 @@
package com.powerpanel.kso

import scala.util.control.NonFatal
import java.io.StringWriter
import java.io.PrintWriter
import org.apache.spark.streaming.scheduler.StreamingListener
import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted
import org.apache.log4j.Logger
import org.apache.http.client.methods.HttpPost
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.DefaultHttpClient

class StatReporter(appName: String, metricsUrl: String) extends StreamingListener {

  private[this] val logger = Logger.getLogger(getClass().getName())

  override def onBatchCompleted(batch: StreamingListenerBatchCompleted) = {
    def doSend(metricName: String, metricValue: String) = {
      try {
        val httpClient = new DefaultHttpClient()
        val post = new HttpPost(metricsUrl)
        post.setHeader("Content-type", "application/json")
        val ts = java.lang.System.currentTimeMillis()
        val body = f"""{
        | "data": [{
        |   "source": "application.$appName",
        |   "metric": "application.kpi.$appName.$metricName",
        |   "value": "$metricValue",
        |   "timestamp": $ts%d
        | }],
        | "timestamp": $ts%d
        |}""".stripMargin

        logger.debug(body)
        post.setEntity(new StringEntity(body))
        val response = httpClient.execute(post)
        if (response.getStatusLine.getStatusCode() != 200) {
          logger.error("POST failed: " + metricsUrl + " response:" + response.getStatusLine.getStatusCode())
        }
      } catch {
        case NonFatal(t) => {
          logger.error("POST failed: " + metricsUrl)
          val sw = new StringWriter
          t.printStackTrace(new PrintWriter(sw))
          logger.error(sw.toString)
        }
      }
    }
    doSend("processing-delay", batch.batchInfo.processingDelay.get.toString())
    doSend("scheduling-delay", batch.batchInfo.schedulingDelay.get.toString())
  }
}
30 changes: 30 additions & 0 deletions src/main/scala/com/powerpanel/kso/model/StaticHelpers.java
@@ -0,0 +1,30 @@
package com.powerpanel.kso.model;

import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;

public class StaticHelpers {
    public static String loadResourceFile(String path)
    {
        InputStream is = StaticHelpers.class.getClassLoader().getResourceAsStream(path);
        try
        {
            char[] buf = new char[2048];
            Reader r = new InputStreamReader(is, "UTF-8");
            StringBuilder s = new StringBuilder();
            while (true)
            {
                int n = r.read(buf);
                if (n < 0)
                    break;
                s.append(buf, 0, n);
            }
            return s.toString();
        }
        catch (Exception e)
        {
            // Callers receive null if the resource is missing or unreadable.
            return null;
        }
    }
}
