From 7343c967bc93650169c0fdf0a93475ba44d87954 Mon Sep 17 00:00:00 2001
From: Evan Langlais
Date: Mon, 4 Mar 2019 17:44:10 -0500
Subject: [PATCH] Updating things

---
 .classpath                                    | 516 +++++++++---------
 .settings/org.eclipse.core.resources.prefs    |   2 +-
 .../kso/{PPConsumer.scala => Consumer.scala}  |   4 +-
 .../scala/com/powerpanel/kso/KafkaInput.scala |   8 +-
 .../com/powerpanel/kso/KafkaPipeline.scala    |   7 +-
 .../kso/model/DataPlatformEvent.java          |  79 +++
 .../kso/model/DataPlatformEventDecoder.java   |  90 +++
 7 files changed, 437 insertions(+), 269 deletions(-)
 rename src/main/scala/com/powerpanel/kso/{PPConsumer.scala => Consumer.scala} (88%)
 create mode 100644 src/main/scala/com/powerpanel/kso/model/DataPlatformEvent.java
 create mode 100644 src/main/scala/com/powerpanel/kso/model/DataPlatformEventDecoder.java

diff --git a/.classpath b/.classpath
index 9040be1..baa990f 100644
--- a/.classpath
+++ b/.classpath
@@ -1,610 +1,610 @@
[hunk body lost in extraction: the <classpathentry .../> XML elements were
stripped, leaving only their +/- markers; the hunk rewrites all 610 Eclipse
classpath entry lines]

diff --git a/.settings/org.eclipse.core.resources.prefs b/.settings/org.eclipse.core.resources.prefs
index 5f5f066..284c145 100644
--- a/.settings/org.eclipse.core.resources.prefs
+++ b/.settings/org.eclipse.core.resources.prefs
@@ -1,3 +1,3 @@
 #Generated by sbteclipse
-#Tue Feb 26 19:32:31 EST 2019
+#Mon Mar 04 17:38:21 EST 2019
 encoding/=UTF-8

diff --git a/src/main/scala/com/powerpanel/kso/PPConsumer.scala b/src/main/scala/com/powerpanel/kso/Consumer.scala
similarity index 88%
rename from src/main/scala/com/powerpanel/kso/PPConsumer.scala
rename to src/main/scala/com/powerpanel/kso/Consumer.scala
index b6ac4ed..a2acadb 100644
--- a/src/main/scala/com/powerpanel/kso/PPConsumer.scala
+++ b/src/main/scala/com/powerpanel/kso/Consumer.scala
@@ -11,7 +11,7 @@ object PPConsumer {
   def main(args: Array[String]) {
     val props = AppConfig.loadProperties();
-    val loggerUrl = props.getProperty("environment.metric_logger_url")
+    // val loggerUrl = props.getProperty("environment.metric_logger_url")
     val appName = props.getProperty("component.application")
     val checkpointDirectory = props.getProperty("app.checkpoint_path");
     val batchSizeSeconds = Integer.parseInt(props.getProperty("app.batch_size_seconds"));
 
@@ -28,7 +28,7 @@
 
     logger.info("Starting spark streaming execution")
-    logger.info("Logger url: " + loggerUrl)
-    ssc.addStreamingListener(new StatReporter(appName, loggerUrl))
+    // logger.info("Logger url: " + loggerUrl)
+    // ssc.addStreamingListener(new StatReporter(appName, loggerUrl))
     ssc.start()
     ssc.awaitTermination()
   }
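
Note: with `val loggerUrl` commented out, the log line and listener that
reference it must be disabled too (as above), or the file will not compile.
If the metric reporter is meant to be optional rather than removed outright,
one alternative is to gate it on the property being present. A minimal
sketch, reusing this file's existing AppConfig, StatReporter, logger, and
property names:

    // Sketch only: attach StatReporter only when a metric logger URL is configured.
    val loggerUrl = Option(props.getProperty("environment.metric_logger_url"))
    loggerUrl match {
      case Some(url) =>
        logger.info("Logger url: " + url)
        ssc.addStreamingListener(new StatReporter(appName, url))
      case None =>
        logger.info("environment.metric_logger_url not set; StatReporter disabled")
    }
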
diff --git a/src/main/scala/com/powerpanel/kso/KafkaInput.scala b/src/main/scala/com/powerpanel/kso/KafkaInput.scala
index f406451..b25764a 100644
--- a/src/main/scala/com/powerpanel/kso/KafkaInput.scala
+++ b/src/main/scala/com/powerpanel/kso/KafkaInput.scala
@@ -21,10 +21,10 @@
     // Decode avro container format
     val avroSchemaString = StaticHelpers.loadResourceFile("dataplatform-raw.avsc");
     val rawMessages = messages.map(x => {
-        val eventDecoder = new DataPlatformEventDecoder(avroSchemaString);
-        val payload = x._2;
-        val dataPlatformEvent = eventDecoder.decode(payload);
-        dataPlatformEvent;
+      val eventDecoder = new DataPlatformEventDecoder(avroSchemaString);
+      val payload = x._2;
+      val dataPlatformEvent = eventDecoder.decode(payload);
+      dataPlatformEvent;
     });
     rawMessages;
   };

diff --git a/src/main/scala/com/powerpanel/kso/KafkaPipeline.scala b/src/main/scala/com/powerpanel/kso/KafkaPipeline.scala
index 79d84b2..716619d 100644
--- a/src/main/scala/com/powerpanel/kso/KafkaPipeline.scala
+++ b/src/main/scala/com/powerpanel/kso/KafkaPipeline.scala
@@ -11,16 +11,15 @@
 import org.apache.log4j.Logger;
 
 class KafkaPipeline extends Serializable {
   object Holder extends Serializable {
-     @transient lazy val logger = Logger.getLogger(getClass.getName)
+    @transient lazy val logger = Logger.getLogger(getClass.getName)
   }
 
   def create() = {
     val parseMessages = (messagesDstream: DStream[DataPlatformEvent]) => {
-
       val parsedMessages = messagesDstream.flatMap(dataPlatformEvent => {
-          val parsed = dataPlatformEvent.getRawdata();
-          Some(parsed);
+        val parsed = dataPlatformEvent.getRawdata();
+        Some(parsed);
       });
       parsedMessages
     }: DStream[String];
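
Note: the map in KafkaInput.scala constructs a DataPlatformEventDecoder, and
so re-parses the Avro schema, for every record. One decoder per partition is
a common alternative; a sketch under the same assumptions (messages is a
DStream[(String, Array[Byte])] and the decoder API is as defined below):

    // Sketch only: one schema parse / decoder per partition instead of per record.
    val rawMessages = messages.mapPartitions(partition => {
      val eventDecoder = new DataPlatformEventDecoder(avroSchemaString)
      partition.map { case (_, payload) => eventDecoder.decode(payload) }
    })
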
diff --git a/src/main/scala/com/powerpanel/kso/model/DataPlatformEvent.java b/src/main/scala/com/powerpanel/kso/model/DataPlatformEvent.java
new file mode 100644
index 0000000..a87e347
--- /dev/null
+++ b/src/main/scala/com/powerpanel/kso/model/DataPlatformEvent.java
@@ -0,0 +1,79 @@
+/**
+ * Name: DataPlatformEvent
+ * Purpose: Data model class for an Avro event on Kafka
+ * Author: PNDA team
+ *
+ * Created: 07/04/2016
+ */
+
+package com.powerpanel.kso.model;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonProcessingException;
+import org.codehaus.jackson.map.ObjectMapper;
+
+public class DataPlatformEvent implements Serializable {
+    private static final long serialVersionUID = 1L;
+    protected static ObjectMapper _mapper = new ObjectMapper();
+
+    private String _src;
+    private Long _timestamp;
+    private String _hostIp;
+    private String _rawdata;
+
+    public DataPlatformEvent(String src, Long timestamp, String host_ip, String rawdata) {
+        _src = src;
+        _timestamp = timestamp;
+        _hostIp = host_ip;
+        _rawdata = rawdata;
+    }
+
+    public String getSrc() {
+        return _src;
+    }
+
+    public Long getTimestamp() {
+        return _timestamp;
+    }
+
+    public String getHostIp() {
+        return _hostIp;
+    }
+
+    public String getRawdata() {
+        return _rawdata;
+    }
+
+    @Override
+    public String toString() {
+        try {
+            return _mapper.writeValueAsString(this);
+        } catch (Exception ex) {
+            return null;
+        }
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        boolean result = false;
+        if (other instanceof DataPlatformEvent) {
+            DataPlatformEvent that = (DataPlatformEvent) other;
+            result = (this.getSrc() == that.getSrc() || (this.getSrc() != null && this.getSrc().equals(that.getSrc())))
+                    && (this.getTimestamp() == that.getTimestamp()
+                            || (this.getTimestamp() != null && this.getTimestamp().equals(that.getTimestamp())))
+                    && (this.getHostIp() == that.getHostIp()
+                            || (this.getHostIp() != null && this.getHostIp().equals(that.getHostIp())))
+                    && (this.getRawdata() == that.getRawdata()
+                            || (this.getRawdata() != null && this.getRawdata().equals(that.getRawdata())));
+        }
+        return result;
+    }
+
+    public JsonNode RawdataAsJsonObj() throws JsonProcessingException, IOException {
+        return _mapper.readTree(_rawdata);
+    }
+}
\ No newline at end of file
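
For reference, a quick exercise of the model class from the Scala side
(field values are hypothetical; RawdataAsJsonObj parses the rawdata string
with Jackson):

    // Sketch only: construct an event and inspect its JSON rawdata payload.
    val event = new DataPlatformEvent("netprobe", 1551738250000L, "10.0.0.1", """{"reading": 42}""")
    println(event)                                    // JSON via toString(), or null on failure
    println(event.RawdataAsJsonObj().get("reading"))  // 42
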
diff --git a/src/main/scala/com/powerpanel/kso/model/DataPlatformEventDecoder.java b/src/main/scala/com/powerpanel/kso/model/DataPlatformEventDecoder.java
new file mode 100644
index 0000000..7d26f2d
--- /dev/null
+++ b/src/main/scala/com/powerpanel/kso/model/DataPlatformEventDecoder.java
@@ -0,0 +1,90 @@
+/**
+ * Name: DataPlatformEventDecoder
+ * Purpose: Encodes and decodes binary event data from Kafka to/from a DataPlatformEvent object
+ * Author: PNDA team
+ *
+ * Created: 07/04/2016
+ */
+
+package com.powerpanel.kso.model;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.log4j.Logger;
+
+public class DataPlatformEventDecoder
+{
+    private static final Logger LOGGER = Logger.getLogger(DataPlatformEventDecoder.class.getName());
+
+    private Schema.Parser _parser = new Schema.Parser();
+    private DatumReader<GenericRecord> _reader;
+    private DatumWriter<GenericRecord> _writer;
+    private Schema _schema;
+    private String _schemaDef;
+
+    public DataPlatformEventDecoder(String schemaDef) throws IOException
+    {
+        _schemaDef = schemaDef;
+        _schema = _parser.parse(schemaDef);
+        _reader = new GenericDatumReader<GenericRecord>(_schema);
+        _writer = new GenericDatumWriter<GenericRecord>(_schema);
+    }
+
+    public DataPlatformEvent decode(byte[] data) throws IOException
+    {
+        try
+        {
+            Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
+            GenericRecord r = _reader.read(null, decoder);
+            // Decode rawdata as UTF-8 to mirror encode() below.
+            return new DataPlatformEvent(r.get("src").toString(),
+                    (Long) r.get("timestamp"),
+                    r.get("host_ip").toString(),
+                    new String(((ByteBuffer) r.get("rawdata")).array(), "UTF-8"));
+        }
+        catch (Exception ex)
+        {
+            LOGGER.error("data:" + hexStr(data) + " schema: " + _schemaDef, ex);
+            throw new IOException(ex);
+        }
+    }
+
+    protected static final char[] hexArray = "0123456789ABCDEF".toCharArray();
+
+    public static String hexStr(byte[] bytes) {
+        char[] hexChars = new char[bytes.length * 2];
+        for (int j = 0; j < bytes.length; j++) {
+            int v = bytes[j] & 0xFF;
+            hexChars[j * 2] = hexArray[v >>> 4];
+            hexChars[j * 2 + 1] = hexArray[v & 0x0F];
+        }
+        return new String(hexChars);
+    }
+
+    public byte[] encode(DataPlatformEvent e) throws IOException
+    {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+        GenericRecord datum = new GenericData.Record(_schema);
+        datum.put("src", e.getSrc());
+        datum.put("timestamp", e.getTimestamp());
+        datum.put("host_ip", e.getHostIp());
+        datum.put("rawdata", ByteBuffer.wrap(e.getRawdata().getBytes("UTF-8")));
+        _writer.write(datum, encoder);
+        encoder.flush();
+        out.close();
+        return out.toByteArray();
+    }
+}
\ No newline at end of file
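
A round-trip check for the codec; a minimal sketch, assuming
dataplatform-raw.avsc is on the classpath and StaticHelpers from
KafkaInput.scala (field values are hypothetical):

    // Sketch only: encode an event to Avro binary and decode it back.
    val schemaDef = StaticHelpers.loadResourceFile("dataplatform-raw.avsc")
    val codec = new DataPlatformEventDecoder(schemaDef)
    val original = new DataPlatformEvent("netprobe", 1551738250000L, "10.0.0.1", """{"reading": 42}""")
    val bytes = codec.encode(original)   // Avro binary via the parsed schema
    val decoded = codec.decode(bytes)
    assert(decoded == original)          // field-wise equals defined on DataPlatformEvent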