Commit 7839968: Cleaning up project
Evan Langlais committed Mar 7, 2019
1 parent ee8c7e4 commit 7839968
Showing 14 changed files with 5 additions and 466 deletions.
5 changes: 1 addition & 4 deletions assembly.sbt
@@ -10,13 +10,10 @@ mergeStrategy in assembly := {
case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.last
case PathList("com", "codahale", xs @ _*) => MergeStrategy.last
case PathList("com", "yammer", xs @ _*) => MergeStrategy.last
case "about.html" => MergeStrategy.rename
case "META-INF/ECLIPSEF.RSA" => MergeStrategy.last
case "META-INF/mailcap" => MergeStrategy.last
case "META-INF/mimetypes.default" => MergeStrategy.last
case "plugin.properties" => MergeStrategy.last
case "log4j.properties" => MergeStrategy.last
case x =>
val oldStrategy = (mergeStrategy in assembly).value
oldStrategy(x)
}
}
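Note on the hunk above: sbt-assembly's mergeStrategy decides how duplicate entries coming from different dependency JARs are merged into the fat JAR, and the final case x => oldStrategy(x) clause delegates anything not matched explicitly to the plugin's default. A minimal sketch of the same pattern, using the key names this file already uses (the path cases here are illustrative, not the project's actual list):

mergeStrategy in assembly := {
  // Signature files and duplicate manifests from shaded dependencies are safe to drop.
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  // For plain resource collisions, keeping the last copy on the classpath is usually enough.
  case "log4j.properties" => MergeStrategy.last
  case x =>
    // Fall back to sbt-assembly's default behaviour for everything else.
    val oldStrategy = (mergeStrategy in assembly).value
    oldStrategy(x)
}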
2 changes: 1 addition & 1 deletion build.sbt
@@ -1,6 +1,6 @@
name := "PowerPanel Data Processor"

version := "0.0.2"
version := "0.9.0"

scalaVersion := "2.11.12"

3 changes: 0 additions & 3 deletions src/main/scala/com/powerpanel/kso/Consumer.scala
@@ -11,7 +11,6 @@ object Consumer {
def main(args: Array[String]) {

val props = AppConfig.loadProperties();
// val loggerUrl = props.getProperty("environment.metric_logger_url")
val appName = props.getProperty("component.application")
val checkpointDirectory = props.getProperty("app.checkpoint_path");
val batchSizeSeconds = Integer.parseInt(props.getProperty("app.batch_size_seconds"));
@@ -27,8 +26,6 @@ object Consumer {
}

logger.info("Starting spark streaming execution")
// logger.info("Logger url: " + loggerUrl)
// ssc.addStreamingListener(new StatReporter(appName, loggerUrl))
ssc.start()
ssc.awaitTermination()
}
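For context on the hunk above: the removed lines were the commented-out metric-logger/StatReporter wiring, and what remains is the plain driver lifecycle of load config, obtain a StreamingContext, start, and block. A rough sketch of that flow, assuming the repo's AppConfig and KafkaPipeline classes and that KafkaPipeline.create() builds the StreamingContext (the property key is the one shown in this diff):

import org.apache.spark.streaming.StreamingContext

object ConsumerSketch {
  def main(args: Array[String]): Unit = {
    val props = AppConfig.loadProperties()
    val checkpointDirectory = props.getProperty("app.checkpoint_path")

    // getOrCreate restores the context from the checkpoint after a driver restart,
    // otherwise it builds a fresh one from the pipeline definition.
    val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => new KafkaPipeline().create())

    ssc.start()            // begin consuming and processing micro-batches
    ssc.awaitTermination() // block the driver until the job is stopped
  }
}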
20 changes: 2 additions & 18 deletions src/main/scala/com/powerpanel/kso/KafkaInput.scala
@@ -8,15 +8,14 @@ import org.apache.spark.streaming.kafka010.ConsumerStrategies
import org.apache.spark.streaming.kafka010.LocationStrategies
import org.apache.avro.specific.SpecificDatumReader
import org.apache.avro.file.{DataFileReader, SeekableByteArrayInput}
// import scala.collection.JavaConverters._
import org.apache.spark.streaming.StreamingContext
import org.apache.avro.io.DecoderFactory
class KafkaInput extends Serializable {
def readFromKafka (ssc: StreamingContext) = {
val props = AppConfig.loadProperties();
val topicsSet = props.getProperty("kafka.topic").split(",").toSet;

val kafkaParams = collection.mutable.Map[String, String]("bootstrap.servers" -> props.getProperty("kafka.brokers"))
val kafkaParams = collection.mutable.Map[String, String]("bootstrap.servers" -> props.getProperty("kafka.brokers"))
if (props.getProperty("kafka.consume_from_beginning").toBoolean)
{
kafkaParams.put("auto.offset.reset", "earliest");
@@ -31,27 +30,12 @@ class KafkaInput extends Serializable {
);
//.repartition(Integer.parseInt(props.getProperty("app.processing_parallelism")));

// Decode avro container format
// val avroSchemaString = StaticHelpers.loadResourceFile("dataplatform-raw.avsc");
val rawMessages = messages.map(x => {
// val eventDecoder = new DataPlatformEventDecoder(avroSchemaString);
// val list = dataFileReader.iterator().asScala.toList
// DataFileReader<User> dataFileReader = new DataFileReader<User>(file, userDatumReader);
// val dataPackageDecoder = DataPackage.getDecoder();
val payload = x.value();
/*
val datumReader = new SpecificDatumReader[DataPackage](DataPackage.getClassSchema)
val inputStream = new SeekableByteArrayInput(payload)
val dataFileReader = new DataFileReader[DataPackage](inputStream, datumReader)
val dataPackage = dataFileReader.next()*/
val datumReader = new SpecificDatumReader[DataPackage](DataPackage.getClassSchema)
val inputStream = new SeekableByteArrayInput(payload)
val decoder = DecoderFactory.get.binaryDecoder(inputStream, null)

// while (!decoder.isEnd) {
val dataPackage = datumReader.read(null, decoder)
//}
// val dataPackage = dataPackageDecoder.decode(payload);
val dataPackage = datumReader.read(null, decoder)
dataPackage;
});
rawMessages;
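The surviving code in this hunk reads each Kafka record value as a single binary-encoded Avro DataPackage (the repo's generated SpecificRecord class); the deleted comments were earlier experiments with the Avro container-file API (DataFileReader). A sketch of the decode step on its own, with a hypothetical helper name (decodePayload):

import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.SpecificDatumReader

def decodePayload(payload: Array[Byte]): DataPackage = {
  val datumReader = new SpecificDatumReader[DataPackage](DataPackage.getClassSchema)
  // binaryDecoder works directly on the raw bytes; no Avro container header is expected.
  val decoder = DecoderFactory.get.binaryDecoder(payload, null)
  datumReader.read(null, decoder)
}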
10 changes: 0 additions & 10 deletions src/main/scala/com/powerpanel/kso/KafkaPipeline.scala
@@ -15,15 +15,6 @@ class KafkaPipeline extends Serializable {
}

def create() = {
/*
val parseMessages = (messagesDstream: DStream[DataPackage]) => {
val parsedMessages = messagesDstream.flatMap(dataPackage => {
val parsed = dataPlatformEvent.getRawdata();
Some(parsed);
});
parsedMessages
}: DStream[String];
*/

val props = AppConfig.loadProperties();
val checkpointDirectory = props.getProperty("app.checkpoint_path");
@@ -38,7 +29,6 @@ class KafkaPipeline extends Serializable {
}

val dataStream = new KafkaInput().readFromKafka(ssc);
// val parsedStream = parseMessages(inputStream);
val writeCounts: DStream[Integer] =

new OpenTSDBOutput().putOpentsdb(
Expand Down
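For orientation, create() in this file wires the whole pipeline together: build the StreamingContext, read the decoded DataPackage stream from Kafka, and hand it to OpenTSDBOutput, which reports write counts. A sketch of that wiring; the putOpentsdb parameter list and the opentsdb.ip property key are assumptions, since the diff cuts off mid-call:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

def createSketch(): StreamingContext = {
  val props = AppConfig.loadProperties()
  val appName = props.getProperty("component.application")
  val checkpointDirectory = props.getProperty("app.checkpoint_path")
  val batchSizeSeconds = Integer.parseInt(props.getProperty("app.batch_size_seconds"))

  val ssc = new StreamingContext(new SparkConf().setAppName(appName), Seconds(batchSizeSeconds))
  ssc.checkpoint(checkpointDirectory)

  // Kafka -> decoded DataPackage stream -> OpenTSDB writer that reports write counts.
  val dataStream: DStream[DataPackage] = new KafkaInput().readFromKafka(ssc)
  val writeCounts: DStream[Integer] = new OpenTSDBOutput().putOpentsdb(props.getProperty("opentsdb.ip"), dataStream)

  writeCounts.print() // keep an output operation so Spark has something to execute
  ssc
}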
18 changes: 1 addition & 17 deletions src/main/scala/com/powerpanel/kso/OpenTSDBOutput.scala
@@ -24,22 +24,6 @@ class OpenTSDBOutput extends Serializable {
var count = 0;
partition.foreach((generatorData: DataPackage) =>
{
/*
val json = parse(rowData.replace("'", "\""))
val host = compact(render((json \\ "host"))).replace("\"", "")
val timestampStr = compact(render((json \\ "timestamp"))).replace("\"", "")
val value = (compact(render((json \\ "value"))).replace("\"", "")).toDouble
val collectd_type = compact(render((json \\ "collectd_type"))).replace("\"", "")
var metric:String = "kso.collectd"
metric = metric.concat("." + collectd_type)
val timestamp = new DateTime(timestampStr).getMillis
val body = f"""{
| "metric": "$metric",
| "value": "$value",
| "timestamp": $timestamp,
| "tags": {"host": "$host"}
|}""".stripMargin
*/
var body: String = "[";
for(metricData <- generatorData.getData()) {
for(dataPoint <- metricData.getDatapoints()) {
@@ -60,7 +44,7 @@ class OpenTSDBOutput extends Serializable {
}
body = body.dropRight(1);
body = body.concat("]");
Holder.logger.info("Post body: " + body);
// Holder.logger.info("Post body: " + body);
var openTSDBUrl = "http://" + opentsdbIP + "/api/put"
try {
val httpClient = new DefaultHttpClient()
Expand Down
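The loop above assembles the OpenTSDB /api/put payload by hand: one JSON object per data point with metric, value, timestamp, and a host tag, collected into a JSON array. A sketch of the POST that follows it, using the same Apache HttpClient 4.x API as the DefaultHttpClient shown above (the helper name and error handling are illustrative; OpenTSDB answers 204 when every point is stored):

import org.apache.http.client.methods.HttpPost
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.DefaultHttpClient

def postToOpenTsdb(opentsdbIP: String, body: String): Int = {
  val openTSDBUrl = "http://" + opentsdbIP + "/api/put"
  val httpClient = new DefaultHttpClient()
  try {
    val post = new HttpPost(openTSDBUrl)
    post.setHeader("Content-Type", "application/json")
    post.setEntity(new StringEntity(body))
    val response = httpClient.execute(post)
    response.getStatusLine.getStatusCode // 204 means all data points were written
  } finally {
    httpClient.getConnectionManager.shutdown()
  }
}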
53 changes: 0 additions & 53 deletions src/main/scala/com/powerpanel/kso/StatReporter.scala

This file was deleted.

79 changes: 0 additions & 79 deletions src/main/scala/com/powerpanel/kso/model/DataPlatformEvent.java-bak

This file was deleted.

This file was deleted.

30 changes: 0 additions & 30 deletions src/main/scala/com/powerpanel/kso/model/StaticHelpers.java

This file was deleted.
