Skip to content
Permalink
af640d86a0
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
79 lines (71 sloc) 2.96 KB
package com.powerpanel.kso;
import org.joda.time.DateTime
import scala.util.control.NonFatal
import com.powerpanel.kso.model._;
import org.apache.log4j.Logger
import org.apache.spark.streaming.dstream.DStream
import org.apache.http.client.methods.HttpPost
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.DefaultHttpClient
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.JsonDSL._
import scala.collection.JavaConversions._
/**
 * Writes generator metrics carried by a Spark stream to OpenTSDB through its
 * HTTP `/api/put` endpoint.
 */
class OpenTSDBOutput extends Serializable {

  /**
   * Posts the data points of every [[DataPackage]] in `stream` to OpenTSDB.
   *
   * Each package is turned into one JSON array (one object per data point,
   * tagged with the package's generator, state and region) and sent in a
   * single POST to `http://<opentsdbIP>/api/put`. Delivery is best-effort:
   * HTTP failures are logged and the package is still counted.
   *
   * Note: the work happens inside a `mapPartitions` transformation, so nothing
   * is sent until the returned stream is consumed by an output operation.
   *
   * @param opentsdbIP host (optionally `host:port`) placed verbatim after `http://`
   * @param stream     stream of packages to publish
   * @tparam T unused; kept so existing call sites keep compiling
   * @return a stream holding, for each partition, the number of packages processed
   */
  def putOpentsdb[T](opentsdbIP: String, stream: DStream[DataPackage]): DStream[Integer] = {
    stream.mapPartitions(partition => {
      // Looked up inside the closure so the logger is never part of the
      // serialized task.
      val log = Logger.getLogger(classOf[OpenTSDBOutput])
      val openTSDBUrl = "http://" + opentsdbIP + "/api/put"
      var count = 0

      partition.foreach((generatorData: DataPackage) => {
        // Tag values are the same for every data point of this package.
        val generator_id = generatorData.getGenerator()
        val region = generatorData.getRegion()
        val state = generatorData.getState

        // Collect the entries and join them afterwards: String.concat returns
        // a new string, so the result must not be discarded, and joining with
        // mkString avoids a trailing comma that would make the array invalid.
        val entries = scala.collection.mutable.ArrayBuffer.empty[String]
        for (metricData <- generatorData.getData()) {
          val metric = metricData.getMetric()
          for (dataPoint <- metricData.getDatapoints()) {
            val value = dataPoint.getValue()
            val timestamp = dataPoint.getTimestamp()
            // NOTE(review): "generator" is emitted unquoted, as in the original
            // template; this is only valid JSON if the id is numeric - confirm.
            entries += s"""{"metric": "$metric", "value": "$value", "timestamp": $timestamp, "tags": {"generator": $generator_id, "state": "$state", "region": "$region"}}"""
          }
        }

        // Nothing to publish for a package without data points.
        if (entries.nonEmpty) {
          val body = entries.mkString("[", ",", "]")
          val httpClient = new DefaultHttpClient()
          try {
            val post = new HttpPost(openTSDBUrl)
            post.setHeader("Content-type", "application/json")
            post.setEntity(new StringEntity(body))
            val response = httpClient.execute(post)
            val status = response.getStatusLine.getStatusCode
            if (status >= 300) {
              log.warn("OpenTSDB put to " + openTSDBUrl + " returned HTTP " + status)
            }
          } catch {
            // Best-effort delivery: report the failure instead of hiding it,
            // but keep processing the remaining packages.
            case NonFatal(t) =>
              log.error("Failed to post data points to " + openTSDBUrl, t)
          } finally {
            // Release the connection(s) held by this client.
            httpClient.getConnectionManager.shutdown()
          }
        }
        count += 1
      })

      Iterator[Integer](count)
    })
  }
}