Commit af640d8

Adding new model for the DataPackage, updating Kafka spark connector, modifying pipeline
Evan Langlais committed Mar 5, 2019
1 parent 7343c96 commit af640d8
Showing 17 changed files with 1,359 additions and 642 deletions.
612 changes: 0 additions & 612 deletions .classpath

This file was deleted.

77 changes: 74 additions & 3 deletions .gitignore
@@ -1,5 +1,76 @@
# Created by https://www.gitignore.io/api/sbt,scala
# Edit at https://www.gitignore.io/?templates=sbt,scala

# Created by https://www.gitignore.io/api/sbt,scala,eclipse
# Edit at https://www.gitignore.io/?templates=sbt,scala,eclipse

### Eclipse ###

.metadata
bin/
tmp/
*.tmp
*.bak
*.swp
*~.nib
local.properties
.settings/
.loadpath
.recommenders

# External tool builders
.externalToolBuilders/

# Locally stored "Eclipse launch configurations"
*.launch

# PyDev specific (Python IDE for Eclipse)
*.pydevproject

# CDT-specific (C/C++ Development Tooling)
.cproject

# CDT- autotools
.autotools

# Java annotation processor (APT)
.factorypath

# PDT-specific (PHP Development Tools)
.buildpath

# sbteclipse plugin
.target

# Tern plugin
.tern-project

# TeXlipse plugin
.texlipse

# STS (Spring Tool Suite)
.springBeans

# Code Recommenders
.recommenders/

# Annotation Processing
.apt_generated/

# Scala IDE specific (Scala & Java development for Eclipse)
.cache-main
.scala_dependencies
.worksheet

### Eclipse Patch ###
# Eclipse Core
.project

# JDT-specific (Eclipse Java Development Tools)
.classpath

# Annotation Processing
.apt_generated

.sts4-cache/

### SBT ###
# Simple Build Tool
@@ -19,4 +90,4 @@ project/plugins/project/
*.class
*.log

# End of https://www.gitignore.io/api/sbt,scala
# End of https://www.gitignore.io/api/sbt,scala,eclipse
3 changes: 0 additions & 3 deletions .settings/org.eclipse.core.resources.prefs

This file was deleted.

4 changes: 2 additions & 2 deletions build.sbt
@@ -17,8 +17,8 @@ packageZipTarball in Universal := {
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.4.0",
"org.apache.spark" %% "spark-streaming" % "2.4.0",
"org.apache.spark" %% "spark-streaming-kafka" % "1.6.3",
"org.apache.kafka" %% "kafka" % "0.9.0.1",
"org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.0",
"org.apache.kafka" %% "kafka" % "2.0.0",
"org.apache.avro" % "avro" % "1.8.2",
"org.codehaus.jackson" % "jackson-mapper-asl" % "1.9.13",
"joda-time" % "joda-time" % "2.7",
7 changes: 7 additions & 0 deletions src/main/resources/application.proerties
@@ -0,0 +1,7 @@
job_name="pp.kso.collectd"
zookeeper_connect="sd5-data.engr.uconn.edu:2181"
broker_list="sd5-data.engr.uconn.edu:9092"
consumer_group="pp.kso.collectd"
zookeeper_timeout=6000
topic_list="GeneratorData"
opentsdb="sd5-data.engr.uconn.edu:4242"
68 changes: 68 additions & 0 deletions src/main/resources/datapackage.avsc
@@ -0,0 +1,68 @@
{
"namespace": "com.powerpanel.kso.model",
"type": "record",
"name": "DataPackage",
"fields": [
{
"type": "int",
"name": "generator"
},
{
"name": "region",
"type": {
"name": "Region",
"type": "enum",
"symbols": ["NORTHEAST", "SOUTHEAST", "WEST", "MIDWEST"]
}
},
{
"name": "state",
"type": {
"name": "State",
"type": "enum",
"symbols": ["AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DE", "FL", "GA", "HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MD", "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ", "NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI", "SC", "SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY"]
}
},
{
"name": "data",
"type": {
"type": "array",
"items": {
"name": "MetricData",
"type": "record",
"fields": [
{
"name": "metric",
"type": "string"
},
{
"name": "datapoints",
"type": {
"type": "array",
"items": {
"name": "DataPoint",
"type": "record",
"fields": [
{
"name": "timestamp",
"type": "long"
},
{
"name": "value",
"type": ["long", "double"]
}
]
}

}
}
]
}
}
},
{
"name": "rawdata",
"type": "bytes"
}
]
}
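
KafkaInput.scala below calls DataPackage.getDecoder(); the generated model classes are among the 17 changed files but are not shown in this excerpt. A plausible decoder for this schema, assuming the classes are produced by the Avro compiler (the helper name and wiring here are illustrative):

```scala
import java.io.ByteArrayInputStream
import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.SpecificDatumReader
import com.powerpanel.kso.model.DataPackage

// Illustrative decoder; whatever DataPackage.getDecoder() actually returns is
// defined in files not included in this excerpt.
class DataPackageDecoder extends Serializable {
  @transient private lazy val reader =
    new SpecificDatumReader[DataPackage](DataPackage.getClassSchema())

  def decode(payload: Array[Byte]): DataPackage = {
    val binaryDecoder = DecoderFactory.get().binaryDecoder(new ByteArrayInputStream(payload), null)
    reader.read(null, binaryDecoder)
  }
}
```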
4 changes: 2 additions & 2 deletions src/main/scala/com/powerpanel/kso/Consumer.scala
@@ -4,7 +4,7 @@ import com.powerpanel.kso._
import org.apache.spark.streaming.StreamingContext
import org.apache.log4j.Logger;

object PPConsumer {
object Consumer {

private[this] val logger = Logger.getLogger(getClass().getName());

@@ -27,7 +27,7 @@ object PPConsumer {
}

logger.info("Starting spark streaming execution")
logger.info("Logger url: " + loggerUrl)
// logger.info("Logger url: " + loggerUrl)
// ssc.addStreamingListener(new StatReporter(appName, loggerUrl))
ssc.start()
ssc.awaitTermination()
23 changes: 15 additions & 8 deletions src/main/scala/com/powerpanel/kso/KafkaInput.scala
@@ -3,7 +3,9 @@ package com.powerpanel.kso;
import kafka.serializer.StringDecoder
import kafka.serializer.DefaultDecoder
import com.powerpanel.kso.model._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies
import org.apache.spark.streaming.kafka010.LocationStrategies
import org.apache.spark.streaming.StreamingContext

class KafkaInput extends Serializable {
@@ -15,16 +17,21 @@ class KafkaInput extends Serializable {
{
kafkaParams.put("auto.offset.reset", "smallest");
}
val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
ssc, kafkaParams.toMap, topicsSet).repartition(Integer.parseInt(props.getProperty("app.processing_parallelism")));
val messages = KafkaUtils.createDirectStream[String, Array[Byte]](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, Array[Byte]](topicsSet, kafkaParams)
);
//.repartition(Integer.parseInt(props.getProperty("app.processing_parallelism")));

// Decode avro container format
val avroSchemaString = StaticHelpers.loadResourceFile("dataplatform-raw.avsc");
// val avroSchemaString = StaticHelpers.loadResourceFile("dataplatform-raw.avsc");
val rawMessages = messages.map(x => {
val eventDecoder = new DataPlatformEventDecoder(avroSchemaString);
val payload = x._2;
val dataPlatformEvent = eventDecoder.decode(payload);
dataPlatformEvent;
// val eventDecoder = new DataPlatformEventDecoder(avroSchemaString);
val dataPackageDecoder = DataPackage.getDecoder();
val payload = x.value();
val dataPackage = dataPackageDecoder.decode(payload);
dataPackage;
});
rawMessages;
};
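
The spark-streaming-kafka-0-10 integration configures the consumer through the kafkaParams passed to ConsumerStrategies.Subscribe rather than through the old 0.8-style decoder classes. A sketch of parameters that would fit the new API — the property names mirror application.proerties above, and the helper itself is hypothetical since the real setup sits in the unexpanded part of readFromKafka():

```scala
import java.util.Properties
import scala.collection.mutable.HashMap
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

// Illustrative consumer configuration for the kafka010 direct stream.
def kafkaParamsFor(props: Properties): HashMap[String, Object] = {
  val kafkaParams = new HashMap[String, Object]()
  kafkaParams.put("bootstrap.servers", props.getProperty("broker_list"))
  kafkaParams.put("group.id", props.getProperty("consumer_group"))
  kafkaParams.put("key.deserializer", classOf[StringDeserializer])
  kafkaParams.put("value.deserializer", classOf[ByteArrayDeserializer])
  // The 0.10 consumer only accepts "earliest"/"latest"/"none" here; the
  // "smallest" value retained in the diff above would fail validation.
  kafkaParams.put("auto.offset.reset", "earliest")
  kafkaParams
}
```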
13 changes: 7 additions & 6 deletions src/main/scala/com/powerpanel/kso/KafkaPipeline.scala
@@ -15,14 +15,15 @@ class KafkaPipeline extends Serializable {
}

def create() = {

val parseMessages = (messagesDstream: DStream[DataPlatformEvent]) => {
val parsedMessages = messagesDstream.flatMap(dataPlatformEvent => {
/*
val parseMessages = (messagesDstream: DStream[DataPackage]) => {
val parsedMessages = messagesDstream.flatMap(dataPackage => {
val parsed = dataPlatformEvent.getRawdata();
Some(parsed);
});
parsedMessages
}: DStream[String];
*/

val props = AppConfig.loadProperties();
val checkpointDirectory = props.getProperty("app.checkpoint_path");
@@ -36,13 +37,13 @@
ssc.checkpoint(checkpointDirectory);
}

val inputStream = new KafkaInput().readFromKafka(ssc);
val parsedStream = parseMessages(inputStream);
val dataStream = new KafkaInput().readFromKafka(ssc);
// val parsedStream = parseMessages(inputStream);
val writeCounts: DStream[Integer] =

new OpenTSDBOutput().putOpentsdb(
props.getProperty("opentsdb.ip"),
parsedStream);
dataStream);

writeCounts.reduce(_ + _).print(1);
ssc;
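
create() returns the StreamingContext with checkpointing already configured, which matches the start/awaitTermination calls in Consumer.scala above. One plausible way the two are wired together — the exact call site is not expanded in this diff, so treat this as a sketch:

```scala
import org.apache.spark.streaming.StreamingContext

// Hypothetical call site in Consumer: rebuild the pipeline from the checkpoint
// directory if present, otherwise create it via KafkaPipeline.create().
val checkpointDirectory = AppConfig.loadProperties().getProperty("app.checkpoint_path")
val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => new KafkaPipeline().create())
ssc.start()
ssc.awaitTermination()
```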
31 changes: 26 additions & 5 deletions src/main/scala/com/powerpanel/kso/OpenTSDBOutput.scala
@@ -2,6 +2,7 @@ package com.powerpanel.kso;

import org.joda.time.DateTime
import scala.util.control.NonFatal
import com.powerpanel.kso.model._;
import org.apache.log4j.Logger
import org.apache.spark.streaming.dstream.DStream
import org.apache.http.client.methods.HttpPost
@@ -10,17 +11,17 @@ import org.apache.http.impl.client.DefaultHttpClient
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.JsonDSL._
import scala.collection.JavaConversions._

class OpenTSDBOutput extends Serializable {


def putOpentsdb[T](opentsdbIP: String,
stream: DStream[String]) = {
def putOpentsdb[T](opentsdbIP: String, stream: DStream[DataPackage]) = {
stream.mapPartitions(partition => {
var count = 0;
partition.foreach(rowData =>
partition.foreach((generatorData: DataPackage) =>
{

/*
val json = parse(rowData.replace("'", "\""))
val host = compact(render((json \\ "host"))).replace("\"", "")
val timestampStr = compact(render((json \\ "timestamp"))).replace("\"", "")
Expand All @@ -35,7 +36,27 @@ class OpenTSDBOutput extends Serializable {
| "timestamp": $timestamp,
| "tags": {"host": "$host"}
|}""".stripMargin

*/
var body: String = "[";
for(metricData <- generatorData.getData()) {
for(dataPoint <- metricData.getDatapoints()) {
val metric = metricData.getMetric();
val value = dataPoint.getValue();
val timestamp = dataPoint.getTimestamp();
val generator_id = generatorData.getGenerator();
val region = generatorData.getRegion();
val state = generatorData.getState;
val pstring = f"""{
| "metric": "$metric",
| "value": "$value",
| "timestamp": $timestamp,
| "tags": {"generator": $generator_id, "state": "$state", "region": "$region"}
|},""";
body.concat(pstring);
}
}
body.concat("]");

var openTSDBUrl = "http://" + opentsdbIP + "/api/put"
try {
val httpClient = new DefaultHttpClient()
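
The new loop builds the OpenTSDB /api/put payload per partition, but String.concat returns a new string, so the body.concat(pstring) calls above discard their result and body never grows past "[". A sketch of the same step using a StringBuilder, plus the POST that the truncated remainder of the diff presumably performs (the header and entity handling are assumptions):

```scala
import scala.collection.JavaConversions._ // same conversion the diff imports for the Avro Java lists
import org.apache.http.client.methods.HttpPost
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.DefaultHttpClient
import com.powerpanel.kso.model.DataPackage

// Sketch only: accumulate the JSON array with a StringBuilder instead of the
// no-op String.concat calls, then post it to OpenTSDB.
def postToOpentsdb(opentsdbIP: String, generatorData: DataPackage): Unit = {
  val body = new StringBuilder("[")
  for (metricData <- generatorData.getData(); dataPoint <- metricData.getDatapoints()) {
    body.append(f"""{
      | "metric": "${metricData.getMetric()}",
      | "value": ${dataPoint.getValue()},
      | "timestamp": ${dataPoint.getTimestamp()},
      | "tags": {"generator": "${generatorData.getGenerator()}", "state": "${generatorData.getState()}", "region": "${generatorData.getRegion()}"}
      |},""".stripMargin)
  }
  val payload = body.toString.stripSuffix(",") + "]" // drop the trailing comma before closing the array

  val post = new HttpPost("http://" + opentsdbIP + "/api/put")
  post.setEntity(new StringEntity(payload))
  post.setHeader("Content-Type", "application/json")
  new DefaultHttpClient().execute(post)
}
```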
