diff --git a/assembly.sbt b/assembly.sbt
index b345dae..2ada887 100644
--- a/assembly.sbt
+++ b/assembly.sbt
@@ -10,13 +10,10 @@ mergeStrategy in assembly := {
   case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.last
   case PathList("com", "codahale", xs @ _*) => MergeStrategy.last
   case PathList("com", "yammer", xs @ _*) => MergeStrategy.last
-  case "about.html" => MergeStrategy.rename
   case "META-INF/ECLIPSEF.RSA" => MergeStrategy.last
   case "META-INF/mailcap" => MergeStrategy.last
   case "META-INF/mimetypes.default" => MergeStrategy.last
-  case "plugin.properties" => MergeStrategy.last
-  case "log4j.properties" => MergeStrategy.last
   case x =>
     val oldStrategy = (mergeStrategy in assembly).value
     oldStrategy(x)
-}
+}
\ No newline at end of file
diff --git a/build.sbt b/build.sbt
index 0491e3c..e7b8f55 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,6 +1,6 @@
 name := "PowerPanel Data Processor"

-version := "0.0.2"
+version := "0.9.0"

 scalaVersion := "2.11.12"

diff --git a/src/main/scala/com/powerpanel/kso/Consumer.scala b/src/main/scala/com/powerpanel/kso/Consumer.scala
index 38a2c32..52b1d23 100644
--- a/src/main/scala/com/powerpanel/kso/Consumer.scala
+++ b/src/main/scala/com/powerpanel/kso/Consumer.scala
@@ -11,7 +11,6 @@ object Consumer {

   def main(args: Array[String]) {
     val props = AppConfig.loadProperties();
-    // val loggerUrl = props.getProperty("environment.metric_logger_url")
     val appName = props.getProperty("component.application")
     val checkpointDirectory = props.getProperty("app.checkpoint_path");
     val batchSizeSeconds = Integer.parseInt(props.getProperty("app.batch_size_seconds"));
@@ -27,8 +26,6 @@ object Consumer {
     }

     logger.info("Starting spark streaming execution")
-    // logger.info("Logger url: " + loggerUrl)
-    // ssc.addStreamingListener(new StatReporter(appName, loggerUrl))
     ssc.start()
     ssc.awaitTermination()
   }
diff --git a/src/main/scala/com/powerpanel/kso/KafkaInput.scala b/src/main/scala/com/powerpanel/kso/KafkaInput.scala
index 5b8eb5e..fb19b7e 100644
--- a/src/main/scala/com/powerpanel/kso/KafkaInput.scala
+++ b/src/main/scala/com/powerpanel/kso/KafkaInput.scala
@@ -8,7 +8,6 @@ import org.apache.spark.streaming.kafka010.ConsumerStrategies
 import org.apache.spark.streaming.kafka010.LocationStrategies
 import org.apache.avro.specific.SpecificDatumReader
 import org.apache.avro.file.{DataFileReader, SeekableByteArrayInput}
-// import scala.collection.JavaConverters._
 import org.apache.spark.streaming.StreamingContext
 import org.apache.avro.io.DecoderFactory
 class KafkaInput extends Serializable {
@@ -16,7 +15,7 @@ class KafkaInput extends Serializable {

     val props = AppConfig.loadProperties();
     val topicsSet = props.getProperty("kafka.topic").split(",").toSet;
-    val kafkaParams = collection.mutable.Map[String, String]("bootstrap.servers" -> props.getProperty("kafka.brokers")) 
+    val kafkaParams = collection.mutable.Map[String, String]("bootstrap.servers" -> props.getProperty("kafka.brokers"))

     if (props.getProperty("kafka.consume_from_beginning").toBoolean) {
       kafkaParams.put("auto.offset.reset", "earliest");
@@ -31,27 +30,12 @@ class KafkaInput extends Serializable {
     );
     //.repartition(Integer.parseInt(props.getProperty("app.processing_parallelism")));

-    // Decode avro container format
-    // val avroSchemaString = StaticHelpers.loadResourceFile("dataplatform-raw.avsc");
     val rawMessages = messages.map(x => {
-      // val eventDecoder = new DataPlatformEventDecoder(avroSchemaString);
-      // val list = dataFileReader.iterator().asScala.toList
-      // DataFileReader dataFileReader = new DataFileReader(file, userDatumReader);
-      // val dataPackageDecoder = DataPackage.getDecoder();
       val payload = x.value();
-      /*
-      val datumReader = new SpecificDatumReader[DataPackage](DataPackage.getClassSchema)
-      val inputStream = new SeekableByteArrayInput(payload)
-      val dataFileReader = new DataFileReader[DataPackage](inputStream, datumReader)
-      val dataPackage = dataFileReader.next()*/
       val datumReader = new SpecificDatumReader[DataPackage](DataPackage.getClassSchema)
       val inputStream = new SeekableByteArrayInput(payload)
       val decoder = DecoderFactory.get.binaryDecoder(inputStream, null)
-
-      // while (!decoder.isEnd) {
-      val dataPackage = datumReader.read(null, decoder)
-      //}
-      // val dataPackage = dataPackageDecoder.decode(payload);
+      val dataPackage = datumReader.read(null, decoder)
       dataPackage;
     });
     rawMessages;
diff --git a/src/main/scala/com/powerpanel/kso/KafkaPipeline.scala b/src/main/scala/com/powerpanel/kso/KafkaPipeline.scala
index c959fd9..668e5df 100644
--- a/src/main/scala/com/powerpanel/kso/KafkaPipeline.scala
+++ b/src/main/scala/com/powerpanel/kso/KafkaPipeline.scala
@@ -15,15 +15,6 @@ class KafkaPipeline extends Serializable {
   }

   def create() = {
-    /*
-    val parseMessages = (messagesDstream: DStream[DataPackage]) => {
-      val parsedMessages = messagesDstream.flatMap(dataPackage => {
-        val parsed = dataPlatformEvent.getRawdata();
-        Some(parsed);
-      });
-      parsedMessages
-    }: DStream[String];
-    */
     val props = AppConfig.loadProperties();
     val checkpointDirectory = props.getProperty("app.checkpoint_path");

@@ -38,7 +29,6 @@ class KafkaPipeline extends Serializable {
     }

     val dataStream = new KafkaInput().readFromKafka(ssc);
-    // val parsedStream = parseMessages(inputStream);

     val writeCounts: DStream[Integer] =
       new OpenTSDBOutput().putOpentsdb(
diff --git a/src/main/scala/com/powerpanel/kso/OpenTSDBOutput.scala b/src/main/scala/com/powerpanel/kso/OpenTSDBOutput.scala
index 512353e..10cb9e4 100644
--- a/src/main/scala/com/powerpanel/kso/OpenTSDBOutput.scala
+++ b/src/main/scala/com/powerpanel/kso/OpenTSDBOutput.scala
@@ -24,22 +24,6 @@ class OpenTSDBOutput extends Serializable {

         var count = 0;
         partition.foreach((generatorData: DataPackage) => {
-          /*
-          val json = parse(rowData.replace("'", "\""))
-          val host = compact(render((json \\ "host"))).replace("\"", "")
-          val timestampStr = compact(render((json \\ "timestamp"))).replace("\"", "")
-          val value = (compact(render((json \\ "value"))).replace("\"", "")).toDouble
-          val collectd_type = compact(render((json \\ "collectd_type"))).replace("\"", "")
-          var metric:String = "kso.collectd"
-          metric = metric.concat("." + collectd_type)
-          val timestamp = new DateTime(timestampStr).getMillis
-          val body = f"""{
-            | "metric": "$metric",
-            | "value": "$value",
-            | "timestamp": $timestamp,
-            | "tags": {"host": "$host"}
-            |}""".stripMargin
-          */
           var body: String = "[";
           for(metricData <- generatorData.getData()) {
             for(dataPoint <- metricData.getDatapoints()) {
@@ -60,7 +44,7 @@ class OpenTSDBOutput extends Serializable {
           }
           body = body.dropRight(1);
           body = body.concat("]");
-          Holder.logger.info("Post body: " + body);
+          // Holder.logger.info("Post body: " + body);
           var openTSDBUrl = "http://" + opentsdbIP + "/api/put"
           try {
             val httpClient = new DefaultHttpClient()
diff --git a/src/main/scala/com/powerpanel/kso/StatReporter.scala b/src/main/scala/com/powerpanel/kso/StatReporter.scala
deleted file mode 100644
index e8d3871..0000000
--- a/src/main/scala/com/powerpanel/kso/StatReporter.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-package com.powerpanel.kso
-
-import scala.util.control.NonFatal
-import java.io.StringWriter
-import java.io.PrintWriter
-import org.apache.spark.streaming.scheduler.StreamingListener
-import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted
-import org.apache.log4j.Logger
-import org.apache.http.client.methods.HttpPost
-import org.apache.http.entity.StringEntity
-import org.apache.http.impl.client.DefaultHttpClient
-
-class StatReporter(appName: String, metricsUrl: String) extends StreamingListener {
-
-  private[this] val logger = Logger.getLogger(getClass().getName())
-
-  override def onBatchCompleted(batch: StreamingListenerBatchCompleted) = {
-    def doSend(metricName: String, metricValue: String) = {
-      try {
-        val httpClient = new DefaultHttpClient()
-        val post = new HttpPost(metricsUrl)
-        post.setHeader("Content-type", "application/json")
-        val ts = java.lang.System.currentTimeMillis()
-        val body = f"""{
-          | "data": [{
-          |   "source": "application.$appName",
-          |   "metric": "application.kpi.$appName.$metricName",
-          |   "value": "$metricValue",
-          |   "timestamp": $ts%d
-          | }],
-          | "timestamp": $ts%d
-          |}""".stripMargin
-
-        logger.debug(body)
-        post.setEntity(new StringEntity(body))
-        val response = httpClient.execute(post)
-        if (response.getStatusLine.getStatusCode() != 200) {
-          logger.error("POST failed: " + metricsUrl + " response:" + response.getStatusLine.getStatusCode())
-        }
-
-      } catch {
-        case NonFatal(t) => {
-          logger.error("POST failed: " + metricsUrl)
-          val sw = new StringWriter
-          t.printStackTrace(new PrintWriter(sw))
-          logger.error(sw.toString)
-        }
-      }
-    }
-    doSend("processing-delay", batch.batchInfo.processingDelay.get.toString())
-    doSend("scheduling-delay", batch.batchInfo.schedulingDelay.get.toString())
-  }
-}
diff --git a/src/main/scala/com/powerpanel/kso/model/DataPlatformEvent.java-bak b/src/main/scala/com/powerpanel/kso/model/DataPlatformEvent.java-bak
deleted file mode 100644
index a87e347..0000000
--- a/src/main/scala/com/powerpanel/kso/model/DataPlatformEvent.java-bak
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Name: DataPlatformEvent
- * Purpose: Data model class for an avro event on Kafka
- * Author: PNDA team
- *
- * Created: 07/04/2016
- */
-
-package com.powerpanel.kso.model;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.JsonProcessingException;
-import org.codehaus.jackson.map.ObjectMapper;
-
-public class DataPlatformEvent implements Serializable {
-    private static final long serialVersionUID = 1L;
-    protected static ObjectMapper _mapper = new ObjectMapper();
-
-    private String _src;
-    private Long _timestamp;
-    private String _hostIp;
-    private String _rawdata;
-
-    public DataPlatformEvent(String src, Long timestamp, String host_ip, String rawdata) {
-        _src = src;
-        _timestamp = timestamp;
-        _hostIp = host_ip;
-        _rawdata = rawdata;
-    }
-
-    public String getSrc() {
-        return _src;
-    }
-
-    public Long getTimestamp() {
-        return _timestamp;
-    }
-
-    public String getHostIp() {
-        return _hostIp;
-    }
-
-    public String getRawdata() {
-        return _rawdata;
-    }
-
-    @Override
-    public String toString() {
-        try {
-            return _mapper.writeValueAsString(this);
-        } catch (Exception ex) {
-            return null;
-        }
-    }
-
-    @Override
-    public boolean equals(Object other) {
-        boolean result = false;
-        if (other instanceof DataPlatformEvent) {
-            DataPlatformEvent that = (DataPlatformEvent) other;
-            result = (this.getSrc() == that.getSrc() || (this.getSrc() != null && this.getSrc().equals(that.getSrc())))
-                && (this.getTimestamp() == that.getTimestamp()
-                    || (this.getTimestamp() != null && this.getTimestamp().equals(that.getTimestamp())))
-                && (this.getHostIp() == that.getHostIp()
-                    || (this.getHostIp() != null && this.getHostIp().equals(that.getHostIp())))
-                && (this.getRawdata() == that.getRawdata()
-                    || (this.getRawdata() != null && this.getRawdata().equals(that.getRawdata())));
-        }
-        return result;
-
-    }
-
-    public JsonNode RawdataAsJsonObj() throws JsonProcessingException, IOException {
-        return _mapper.readTree(_rawdata);
-    }
-}
\ No newline at end of file
diff --git a/src/main/scala/com/powerpanel/kso/model/DataPlatformEventDecoder.java-bak b/src/main/scala/com/powerpanel/kso/model/DataPlatformEventDecoder.java-bak
deleted file mode 100644
index 2e99169..0000000
--- a/src/main/scala/com/powerpanel/kso/model/DataPlatformEventDecoder.java-bak
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Name: DataPlatformEventDecoder
- * Purpose: Encodes and secodes binary event data from kafka to/from a DataPlatformEvent object
- * Author: PNDA team
- *
- * Created: 07/04/2016
- */
-
-package com.powerpanel.kso.model;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.Decoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.log4j.Logger;
-
-public class DataPlatformEventDecoder
-{
-    private static final Logger LOGGER = Logger.getLogger(DataPlatformEventDecoder.class.getName());
-
-    private Schema.Parser _parser = new Schema.Parser();
-    private DatumReader _reader;
-    private DatumWriter _writer;
-    private Schema _schema;
-    private String _schemaDef;
-
-    public DataPlatformEventDecoder(String schemaDef) throws IOException
-    {
-        _schemaDef = schemaDef;
-        _schema = _parser.parse(schemaDef);
-        _reader = new GenericDatumReader(_schema);
-        _writer = new GenericDatumWriter(_schema);
-    }
-
-    public DataPlatformEvent decode(byte[] data) throws IOException
-    {
-        try
-        {
-            Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
-            GenericRecord r = _reader.read(null, decoder);
-            return new DataPlatformEvent((String) r.get("src").toString(),
-                (Long) r.get("timestamp"),
-                (String) r.get("host_ip").toString(),
-                new String(((ByteBuffer)r.get("rawdata")).array()));
-        }
-        catch(Exception ex)
-        {
-            LOGGER.error("data:" + hexStr(data) + " schema: " + _schemaDef, ex);
-            throw new IOException(ex);
-        }
-    }
-
-    final protected static char[] hexArray = "0123456789ABCDEF".toCharArray();
-    public static String hexStr(byte[] bytes) {
-        char[] hexChars = new char[bytes.length * 2];
-        for ( int j = 0; j < bytes.length; j++ ) {
-            int v = bytes[j] & 0xFF;
-            hexChars[j * 2] = hexArray[v >>> 4];
-            hexChars[j * 2 + 1] = hexArray[v & 0x0F];
-        }
-        return new String(hexChars);
-    }
-
-    public byte[] encode(DataPlatformEvent e) throws IOException
-    {
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
-        GenericRecord datum = new GenericData.Record(_schema);
-        datum.put("src", e.getSrc());
-        datum.put("timestamp", e.getTimestamp());
-        datum.put("host_ip", e.getHostIp());
-        datum.put("rawdata", ByteBuffer.wrap(e.getRawdata().getBytes("UTF-8")));
-        _writer.write(datum, encoder);
-        encoder.flush();
-        out.close();
-        return out.toByteArray();
-    }
-}
\ No newline at end of file
diff --git a/src/main/scala/com/powerpanel/kso/model/StaticHelpers.java b/src/main/scala/com/powerpanel/kso/model/StaticHelpers.java
deleted file mode 100644
index 4533d86..0000000
--- a/src/main/scala/com/powerpanel/kso/model/StaticHelpers.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package com.powerpanel.kso.model;
-
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
-
-public class StaticHelpers {
-    public static String loadResourceFile(String path)
-    {
-        InputStream is = StaticHelpers.class.getClassLoader().getResourceAsStream(path);
-        try
-        {
-            char[] buf = new char[2048];
-            Reader r = new InputStreamReader(is, "UTF-8");
-            StringBuilder s = new StringBuilder();
-            while (true)
-            {
-                int n = r.read(buf);
-                if (n < 0)
-                    break;
-                s.append(buf, 0, n);
-            }
-            return s.toString();
-        }
-        catch (Exception e)
-        {
-            return null;
-        }
-    }
-}
diff --git a/src/universal/powerpanel/application.properties b/src/universal/powerpanel/application.properties
deleted file mode 100644
index aabe4b4..0000000
--- a/src/universal/powerpanel/application.properties
+++ /dev/null
@@ -1,9 +0,0 @@
-kafka.brokers=${environment_kafka_brokers}
-kafka.zookeeper=${environment_kafka_zookeeper}
-app.checkpoint_path=${component_checkpoint_path}
-app.job_name=${component_job_name}
-app.processing_parallelism=${component_processing_parallelism}
-app.batch_size_seconds=${component_batch_size_seconds}
-kafka.topic=${component_input_topic}
-kafka.consume_from_beginning=${component_consume_from_beginning}
-opentsdb.ip=${environment_opentsdb}
diff --git a/src/universal/powerpanel/log4j.properties b/src/universal/powerpanel/log4j.properties
deleted file mode 100644
index e0b077e..0000000
--- a/src/universal/powerpanel/log4j.properties
+++ /dev/null
@@ -1,11 +0,0 @@
-log4j.rootLogger=ERROR,rolling
-log4j.logger.com.cisco.pnda=${component_log_level},rolling
-log4j.additivity.com.cisco.pnda=false
-
-log4j.appender.rolling=org.apache.log4j.RollingFileAppender
-log4j.appender.rolling.layout=org.apache.log4j.PatternLayout
-log4j.appender.rolling.layout.conversionPattern=[%d] %p %m (%c)%n
-log4j.appender.rolling.maxFileSize=50MB
-log4j.appender.rolling.maxBackupIndex=1
-log4j.appender.rolling.file=${spark.yarn.app.container.log.dir}/spark.log
-log4j.appender.rolling.encoding=UTF-8
diff --git a/src/universal/powerpanel/opentsdb.json b/src/universal/powerpanel/opentsdb.json
deleted file mode 100644
index c81e1bc..0000000
--- a/src/universal/powerpanel/opentsdb.json
+++ /dev/null
@@ -1,131 +0,0 @@
-[
-  {
-    "name": "l1.l2.voltage"
-  },
-  {
-    "name": "l2.l3.voltage"
-  },
-  {
-    "name": "l3.l1.voltage"
-  },
-  {
-    "name": "l1.l0.voltage"
-  },
-  {
-    "name": "l2.l0.voltage"
-  },
-  {
-    "name": "l3.l0.voltage"
-  },
-  {
-    "name": "l1.current"
-  },
-  {
-    "name": "l2.current"
-  },
-  {
-    "name": "l3.current"
-  },
-  {
-    "name": "frequency"
-  },
-  {
-    "name": "total.kw"
-  },
-  {
-    "name": "rate.kw"
-  },
-  {
-    "name": "total.pf"
-  },
-  {
-    "name": "l1.kw"
-  },
-  {
-    "name": "l1.pf"
-  },
-  {
-    "name": "l2.kw"
-  },
-  {
-    "name": "l2.pf"
-  },
-  {
-    "name": "l3.kw"
-  },
-  {
-    "name": "l3.pf"
-  },
-  {
-    "name": "total.kvar"
-  },
-  {
-    "name": "l1.kvar"
-  },
-  {
-    "name": "l2.kvar"
-  },
-  {
-    "name": "l3.kvar"
-  },
-  {
-    "name": "total.kva"
-  },
-  {
-    "name": "l1.kva"
-  },
-  {
-    "name": "l2.kva"
-  },
-  {
-    "name": "l3.kva"
-  },
-  {
-    "name": "oil.pressure"
-  },
-  {
-    "name": "coolant.temp"
-  },
-  {
-    "name": "engine.rpm"
-  },
-  {
-    "name": "battery.voltage"
-  },
-  {
-    "name": "fuel.pressure"
-  },
-  {
-    "name": "fuel.temp"
-  },
-  {
-    "name": "fuel.rate"
-  },
-  {
-    "name": "coolant.pressure"
-  },
-  {
-    "name": "coolant.level"
-  },
-  {
-    "name": "oil.temp"
-  },
-  {
-    "name": "oil.level"
-  },
-  {
-    "name": "crankcase.pressure"
-  },
-  {
-    "name": "ambient.temp"
-  },
-  {
-    "name": "ecm.battery.voltage"
-  },
-  {
-    "name": "intake.temp"
-  },
-  {
-    "name": "intake.pressure"
-  }
-]
diff --git a/src/universal/powerpanel/properties.json b/src/universal/powerpanel/properties.json
deleted file mode 100644
index 55cd1bc..0000000
--- a/src/universal/powerpanel/properties.json
+++ /dev/null
@@ -1,11 +0,0 @@
-{
-    "main_class":"com.powerpanel.kso.Consumer",
-    "main_jar": "PowerPanelKSO.jar",
-    "log_level":"INFO",
-    "batch_size_seconds" : "2",
-    "processing_parallelism" : "1",
-    "checkpoint_path":"",
-    "input_topic": "GeneratorData",
-    "consume_from_beginning":"false",
-    "spark_submit_args": ""
-}