
Commit

Updating things
Evan Langlais committed Mar 4, 2019
1 parent 2094542 commit 7343c96
Showing 7 changed files with 437 additions and 269 deletions.
516 changes: 258 additions & 258 deletions .classpath

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion .settings/org.eclipse.core.resources.prefs
@@ -1,3 +1,3 @@
 #Generated by sbteclipse
-#Tue Feb 26 19:32:31 EST 2019
+#Mon Mar 04 17:38:21 EST 2019
 encoding/<project>=UTF-8
4 changes: 2 additions & 2 deletions src/main/scala/com/powerpanel/kso/PPConsumer.scala
@@ -11,7 +11,7 @@ object PPConsumer {
  def main(args: Array[String]) {

    val props = AppConfig.loadProperties();
-    val loggerUrl = props.getProperty("environment.metric_logger_url")
+    // val loggerUrl = props.getProperty("environment.metric_logger_url")
    val appName = props.getProperty("component.application")
    val checkpointDirectory = props.getProperty("app.checkpoint_path");
    val batchSizeSeconds = Integer.parseInt(props.getProperty("app.batch_size_seconds"));
@@ -28,7 +28,7 @@ object PPConsumer {

    logger.info("Starting spark streaming execution")
    logger.info("Logger url: " + loggerUrl)
-    ssc.addStreamingListener(new StatReporter(appName, loggerUrl))
+    // ssc.addStreamingListener(new StatReporter(appName, loggerUrl))
    ssc.start()
    ssc.awaitTermination()
  }
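Side note: with loggerUrl commented out, the unchanged context line logger.info("Logger url: " + loggerUrl) above still references it, so this file will no longer compile as shown. A minimal sketch of a consistent version, assuming the intent was to disable metric reporting entirely (AppConfig and StatReporter are the repo's own classes; the rest is standard Spark Streaming; not part of the commit):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PPConsumerSketch {
  def main(args: Array[String]): Unit = {
    val props = AppConfig.loadProperties()
    val appName = props.getProperty("component.application")
    val checkpointDirectory = props.getProperty("app.checkpoint_path")
    val batchSizeSeconds = Integer.parseInt(props.getProperty("app.batch_size_seconds"))

    val ssc = new StreamingContext(new SparkConf().setAppName(appName), Seconds(batchSizeSeconds))
    ssc.checkpoint(checkpointDirectory)
    // ... attach the Kafka input and processing pipeline here (omitted) ...

    // Metric reporting fully disabled: no loggerUrl lookup, no StatReporter
    // listener, and no log statement that references loggerUrl.
    ssc.start()
    ssc.awaitTermination()
  }
}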
8 changes: 4 additions & 4 deletions src/main/scala/com/powerpanel/kso/KafkaInput.scala
@@ -21,10 +21,10 @@ class KafkaInput extends Serializable {
    // Decode avro container format
    val avroSchemaString = StaticHelpers.loadResourceFile("dataplatform-raw.avsc");
    val rawMessages = messages.map(x => {
-      val eventDecoder = new DataPlatformEventDecoder(avroSchemaString);
-      val payload = x._2;
-      val dataPlatformEvent = eventDecoder.decode(payload);
-      dataPlatformEvent;
+      val eventDecoder = new DataPlatformEventDecoder(avroSchemaString);
+      val payload = x._2;
+      val dataPlatformEvent = eventDecoder.decode(payload);
+      dataPlatformEvent;
    });
    rawMessages;
  };
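The change above is whitespace-only re-indentation, but it highlights that the map creates a new DataPlatformEventDecoder, and therefore re-parses the Avro schema, for every record. A sketch of a per-partition variant that amortizes that cost, assuming messages has the DStream[(String, Array[Byte])] shape implied by x._2 (illustration only, not part of the commit):

val rawMessages = messages.mapPartitions(partition => {
  // One decoder per partition instead of one per record: the schema string
  // is parsed once per task rather than once per message.
  val eventDecoder = new DataPlatformEventDecoder(avroSchemaString)
  partition.map { case (_, payload) => eventDecoder.decode(payload) }
})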
7 changes: 3 additions & 4 deletions src/main/scala/com/powerpanel/kso/KafkaPipeline.scala
@@ -11,16 +11,15 @@ import org.apache.log4j.Logger;
class KafkaPipeline extends Serializable {

  object Holder extends Serializable {
-    @transient lazy val logger = Logger.getLogger(getClass.getName)
+    @transient lazy val logger = Logger.getLogger(getClass.getName)
  }

  def create() = {

    val parseMessages = (messagesDstream: DStream[DataPlatformEvent]) => {
-
      val parsedMessages = messagesDstream.flatMap(dataPlatformEvent => {
-        val parsed = dataPlatformEvent.getRawdata();
-        Some(parsed);
+        val parsed = dataPlatformEvent.getRawdata();
+        Some(parsed);
      });
      parsedMessages
    }: DStream[String];
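In the flatMap above, Some(parsed) is always non-empty, so the flatMap never filters anything and behaves like a plain map. If the Option was meant to skip events whose raw data is absent, a sketch using Option(...), which turns null into None, would do that, assuming getRawdata() can return null (not part of the commit):

val parsedMessages = messagesDstream.flatMap(dataPlatformEvent =>
  // Option(null) == None, so events without rawdata are dropped silently.
  Option(dataPlatformEvent.getRawdata())
)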
79 changes: 79 additions & 0 deletions src/main/scala/com/powerpanel/kso/model/DataPlatformEvent.java
@@ -0,0 +1,79 @@
/**
* Name: DataPlatformEvent
* Purpose: Data model class for an avro event on Kafka
* Author: PNDA team
*
* Created: 07/04/2016
*/

package com.powerpanel.kso.model;

import java.io.IOException;
import java.io.Serializable;

import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.JsonProcessingException;
import org.codehaus.jackson.map.ObjectMapper;

public class DataPlatformEvent implements Serializable {
    private static final long serialVersionUID = 1L;
    protected static ObjectMapper _mapper = new ObjectMapper();

    private String _src;
    private Long _timestamp;
    private String _hostIp;
    private String _rawdata;

    public DataPlatformEvent(String src, Long timestamp, String host_ip, String rawdata) {
        _src = src;
        _timestamp = timestamp;
        _hostIp = host_ip;
        _rawdata = rawdata;
    }

    public String getSrc() {
        return _src;
    }

    public Long getTimestamp() {
        return _timestamp;
    }

    public String getHostIp() {
        return _hostIp;
    }

    public String getRawdata() {
        return _rawdata;
    }

    @Override
    public String toString() {
        try {
            return _mapper.writeValueAsString(this);
        } catch (Exception ex) {
            return null;
        }
    }

    @Override
    public boolean equals(Object other) {
        boolean result = false;
        if (other instanceof DataPlatformEvent) {
            DataPlatformEvent that = (DataPlatformEvent) other;
            result = (this.getSrc() == that.getSrc() || (this.getSrc() != null && this.getSrc().equals(that.getSrc())))
                    && (this.getTimestamp() == that.getTimestamp()
                            || (this.getTimestamp() != null && this.getTimestamp().equals(that.getTimestamp())))
                    && (this.getHostIp() == that.getHostIp()
                            || (this.getHostIp() != null && this.getHostIp().equals(that.getHostIp())))
                    && (this.getRawdata() == that.getRawdata()
                            || (this.getRawdata() != null && this.getRawdata().equals(that.getRawdata())));
        }
        return result;
    }

    public JsonNode RawdataAsJsonObj() throws JsonProcessingException, IOException {
        return _mapper.readTree(_rawdata);
    }
}
90 changes: 90 additions & 0 deletions src/main/scala/com/powerpanel/kso/model/DataPlatformEventDecoder.java
@@ -0,0 +1,90 @@
/**
* Name: DataPlatformEventDecoder
* Purpose: Encodes and decodes binary event data from Kafka to/from a DataPlatformEvent object
* Author: PNDA team
*
* Created: 07/04/2016
*/

package com.powerpanel.kso.model;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.log4j.Logger;

public class DataPlatformEventDecoder
{
    private static final Logger LOGGER = Logger.getLogger(DataPlatformEventDecoder.class.getName());

    private Schema.Parser _parser = new Schema.Parser();
    private DatumReader<GenericRecord> _reader;
    private DatumWriter<GenericRecord> _writer;
    private Schema _schema;
    private String _schemaDef;

    public DataPlatformEventDecoder(String schemaDef) throws IOException
    {
        _schemaDef = schemaDef;
        _schema = _parser.parse(schemaDef);
        _reader = new GenericDatumReader<GenericRecord>(_schema);
        _writer = new GenericDatumWriter<GenericRecord>(_schema);
    }

    public DataPlatformEvent decode(byte[] data) throws IOException
    {
        try
        {
            Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
            GenericRecord r = _reader.read(null, decoder);
            return new DataPlatformEvent((String) r.get("src").toString(),
                    (Long) r.get("timestamp"),
                    (String) r.get("host_ip").toString(),
                    new String(((ByteBuffer) r.get("rawdata")).array()));
        }
        catch (Exception ex)
        {
            LOGGER.error("data:" + hexStr(data) + " schema: " + _schemaDef, ex);
            throw new IOException(ex);
        }
    }

    final protected static char[] hexArray = "0123456789ABCDEF".toCharArray();

    public static String hexStr(byte[] bytes) {
        char[] hexChars = new char[bytes.length * 2];
        for (int j = 0; j < bytes.length; j++) {
            int v = bytes[j] & 0xFF;
            hexChars[j * 2] = hexArray[v >>> 4];
            hexChars[j * 2 + 1] = hexArray[v & 0x0F];
        }
        return new String(hexChars);
    }

    public byte[] encode(DataPlatformEvent e) throws IOException
    {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        GenericRecord datum = new GenericData.Record(_schema);
        datum.put("src", e.getSrc());
        datum.put("timestamp", e.getTimestamp());
        datum.put("host_ip", e.getHostIp());
        datum.put("rawdata", ByteBuffer.wrap(e.getRawdata().getBytes("UTF-8")));
        _writer.write(datum, encoder);
        encoder.flush();
        out.close();
        return out.toByteArray();
    }
}
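A minimal round-trip sketch of the two new model classes from the Scala side, assuming dataplatform-raw.avsc is on the classpath and StaticHelpers.loadResourceFile returns its contents as in KafkaInput above (illustration only, not part of the commit):

val schemaString = StaticHelpers.loadResourceFile("dataplatform-raw.avsc")
val codec = new DataPlatformEventDecoder(schemaString)

val event = new DataPlatformEvent("test-src", System.currentTimeMillis(), "10.0.0.1", "{\"power\": 42}")
val bytes = codec.encode(event)    // Avro-encode into the on-the-wire container format
val decoded = codec.decode(bytes)  // decode back into a DataPlatformEvent

// equals() compares all four fields, so a lossless round trip satisfies this.
assert(decoded == event)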
