Refactor influxdb and vertica classes and code

Clean up code for cleaner separation.

Change-Id: I30b7d404713eba9ae2ee64be1e4dfbecd8f6c1d2
This commit is contained in:
Deklan Dieterly 2015-04-03 19:19:13 -06:00
parent 0f233cd76b
commit 16daf95a1d
26 changed files with 943 additions and 754 deletions

View File

@ -51,15 +51,15 @@ import monasca.persister.pipeline.event.AlarmStateTransitionedEventHandlerFactor
import monasca.persister.pipeline.event.MetricHandler;
import monasca.persister.pipeline.event.MetricHandlerFactory;
import monasca.persister.repository.AlarmRepo;
import monasca.persister.repository.InfluxV8AlarmRepo;
import monasca.persister.repository.InfluxV8MetricRepo;
import monasca.persister.repository.InfluxV8RepoWriter;
import monasca.persister.repository.InfluxV9AlarmRepo;
import monasca.persister.repository.InfluxV9MetricRepo;
import monasca.persister.repository.InfluxV9RepoWriter;
import monasca.persister.repository.MetricRepo;
import monasca.persister.repository.VerticaAlarmRepo;
import monasca.persister.repository.VerticaMetricRepo;
import monasca.persister.repository.influxdb.InfluxV8AlarmRepo;
import monasca.persister.repository.influxdb.InfluxV8MetricRepo;
import monasca.persister.repository.influxdb.InfluxV8RepoWriter;
import monasca.persister.repository.influxdb.InfluxV9AlarmRepo;
import monasca.persister.repository.influxdb.InfluxV9MetricRepo;
import monasca.persister.repository.influxdb.InfluxV9RepoWriter;
import monasca.persister.repository.vertica.VerticaAlarmRepo;
import monasca.persister.repository.vertica.VerticaMetricRepo;
public class PersisterModule extends AbstractModule {

View File

@ -45,8 +45,7 @@ public abstract class KafkaConsumer<T> {
public void start() {
executorService = Executors.newFixedThreadPool(1);
KafkaConsumerRunnableBasic<T> kafkaConsumerRunnableBasic =
createRunnable(kafkaChannel, this.threadNum);
kafkaConsumerRunnableBasic = createRunnable(kafkaChannel, this.threadNum);
executorService.submit(kafkaConsumerRunnableBasic);
}

View File

@ -20,6 +20,6 @@ package monasca.persister.consumer;
import monasca.persister.pipeline.MetricPipeline;
public interface KafkaMetricsConsumerFactory {
public KafkaMetricsConsumer create(KafkaChannel kafkaChannel, int threadNum,
MetricPipeline pipeline);
KafkaMetricsConsumer create(KafkaChannel kafkaChannel, int threadNum,
MetricPipeline pipeline);
}

View File

@ -22,154 +22,51 @@ import com.google.inject.assistedinject.Assisted;
import com.codahale.metrics.Counter;
import org.apache.commons.codec.digest.DigestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.TimeZone;
import java.util.TreeMap;
import io.dropwizard.setup.Environment;
import monasca.common.model.metric.Metric;
import monasca.common.model.metric.MetricEnvelope;
import monasca.persister.configuration.PipelineConfig;
import monasca.persister.repository.MetricRepo;
import monasca.persister.repository.Sha1HashId;
import static monasca.persister.repository.VerticaMetricsConstants.MAX_COLUMN_LENGTH;
public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
private static final Logger logger = LoggerFactory.getLogger(MetricHandler.class);
private static final String TENANT_ID = "tenantId";
private static final String REGION = "region";
private final int ordinal;
private final SimpleDateFormat simpleDateFormat;
private final MetricRepo metricRepo;
private final Counter metricCounter;
private final Counter definitionCounter;
private final Counter dimensionCounter;
private final Counter definitionDimensionsCounter;
@Inject
public MetricHandler(MetricRepo metricRepo,
@Assisted PipelineConfig configuration, Environment environment,
@Assisted("ordinal") int ordinal, @Assisted("batchSize") int batchSize) {
public MetricHandler(MetricRepo metricRepo, @Assisted PipelineConfig configuration,
Environment environment, @Assisted("ordinal") int ordinal,
@Assisted("batchSize") int batchSize) {
super(configuration, environment, ordinal, batchSize, MetricHandler.class.getName());
final String handlerName = String.format("%s[%d]", MetricHandler.class.getName(), ordinal);
this.metricRepo = metricRepo;
final String handlerName = String.format("%s[%d]", MetricHandler.class.getName(), ordinal);
this.metricCounter =
environment.metrics().counter(handlerName + "." + "metrics-added-to-batch-counter");
this.definitionCounter =
environment.metrics()
.counter(handlerName + "." + "metric-definitions-added-to-batch-counter");
this.dimensionCounter =
environment.metrics()
.counter(handlerName + "." + "metric-dimensions-added-to-batch-counter");
this.definitionDimensionsCounter =
environment.metrics()
.counter(handlerName + "." + "metric-definition-dimensions-added-to-batch-counter");
this.ordinal = ordinal;
simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
simpleDateFormat.setTimeZone(TimeZone.getTimeZone("GMT-0"));
}
@Override
public int process(MetricEnvelope[] metricEnvelopes) throws Exception {
int metricCount = 0;
for (final MetricEnvelope metricEnvelope : metricEnvelopes) {
metricCount += processEnvelope(metricEnvelope);
processEnvelope(metricEnvelope);
}
return metricCount;
return metricEnvelopes.length;
}
private int processEnvelope(MetricEnvelope metricEnvelope) {
int metricCount = 0;
Metric metric = metricEnvelope.metric;
Map<String, Object> meta = metricEnvelope.meta;
private void processEnvelope(MetricEnvelope metricEnvelope) {
logger.debug("ordinal: {}", ordinal);
logger.debug("metric: {}", metric);
logger.debug("meta: {}", meta);
this.metricRepo.addToBatch(metricEnvelope);
String tenantId = "";
if (meta.containsKey(TENANT_ID)) {
tenantId = (String) meta.get(TENANT_ID);
} else {
logger.warn(
"Failed to find tenantId in message envelope meta data. Metric message may be malformed. Setting tenantId to empty string.");
logger.warn("metric: {}", metric.toString());
logger.warn("meta: {}", meta.toString());
}
String region = "";
if (meta.containsKey(REGION)) {
region = (String) meta.get(REGION);
} else {
logger.warn(
"Failed to find region in message envelope meta data. Metric message may be malformed. Setting region to empty string.");
logger.warn("metric: {}", metric.toString());
logger.warn("meta: {}", meta.toString());
}
// Add the definition to the batch.
StringBuilder
definitionIdStringToHash =
new StringBuilder(trunc(metric.getName(), MAX_COLUMN_LENGTH));
definitionIdStringToHash.append(trunc(tenantId, MAX_COLUMN_LENGTH));
definitionIdStringToHash.append(trunc(region, MAX_COLUMN_LENGTH));
byte[] definitionIdSha1Hash = DigestUtils.sha(definitionIdStringToHash.toString());
Sha1HashId definitionSha1HashId = new Sha1HashId((definitionIdSha1Hash));
metricRepo
.addDefinitionToBatch(definitionSha1HashId, trunc(metric.getName(), MAX_COLUMN_LENGTH),
trunc(tenantId, MAX_COLUMN_LENGTH), trunc(region, MAX_COLUMN_LENGTH));
definitionCounter.inc();
// Calculate dimensions sha1 hash id.
StringBuilder dimensionIdStringToHash = new StringBuilder();
Map<String, String> preppedDimMap = prepDimensions(metric.getDimensions());
for (Map.Entry<String, String> entry : preppedDimMap.entrySet()) {
dimensionIdStringToHash.append(entry.getKey());
dimensionIdStringToHash.append(entry.getValue());
}
byte[] dimensionIdSha1Hash = DigestUtils.sha(dimensionIdStringToHash.toString());
Sha1HashId dimensionsSha1HashId = new Sha1HashId(dimensionIdSha1Hash);
// Add the dimension name/values to the batch.
metricRepo.addDimensionsToBatch(dimensionsSha1HashId, preppedDimMap);
// Add the definition dimensions to the batch.
StringBuilder
definitionDimensionsIdStringToHash =
new StringBuilder(definitionSha1HashId.toHexString());
definitionDimensionsIdStringToHash.append(dimensionsSha1HashId.toHexString());
byte[]
definitionDimensionsIdSha1Hash =
DigestUtils.sha(definitionDimensionsIdStringToHash.toString());
Sha1HashId definitionDimensionsSha1HashId = new Sha1HashId(definitionDimensionsIdSha1Hash);
metricRepo
.addDefinitionDimensionToBatch(definitionDimensionsSha1HashId, definitionSha1HashId,
dimensionsSha1HashId);
definitionDimensionsCounter.inc();
// Add the measurement to the batch.
String timeStamp = simpleDateFormat.format(new Date(metric.getTimestamp()));
double value = metric.getValue();
metricRepo.addMetricToBatch(definitionDimensionsSha1HashId, timeStamp, value,
metric.getValueMeta());
metricCounter.inc();
metricCount++;
return metricCount;
}
@Override
@ -177,37 +74,4 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
metricRepo.flush();
}
private Map<String, String> prepDimensions(Map<String, String> dimMap) {
Map<String, String> newDimMap = new TreeMap<>();
if (dimMap != null) {
for (String dimName : dimMap.keySet()) {
if (dimName != null && !dimName.isEmpty()) {
String dimValue = dimMap.get(dimName);
if (dimValue != null && !dimValue.isEmpty()) {
newDimMap.put(trunc(dimName, MAX_COLUMN_LENGTH), trunc(dimValue, MAX_COLUMN_LENGTH));
dimensionCounter.inc();
}
}
}
}
return newDimMap;
}
private String trunc(String s, int l) {
if (s == null) {
return "";
} else if (s.length() <= l) {
return s;
} else {
String r = s.substring(0, l);
logger.warn("Input string exceeded max column length. Truncating input string {} to {} chars",
s, l);
logger.warn("Resulting string {}", r);
return r;
}
}
}

View File

@ -21,7 +21,7 @@ import monasca.common.model.event.AlarmStateTransitionedEvent;
public interface AlarmRepo {
public void addToBatch(final AlarmStateTransitionedEvent message);
void addToBatch(final AlarmStateTransitionedEvent message);
public void flush();
void flush();
}

View File

@ -1,263 +0,0 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.repository;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import org.apache.commons.codec.digest.DigestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import io.dropwizard.setup.Environment;
public abstract class InfluxMetricRepo implements MetricRepo {
private static final Logger logger = LoggerFactory.getLogger(InfluxMetricRepo.class);
protected final Map<Sha1HashId, Def> defMap = new HashMap<>();
protected final Map<Sha1HashId, Set<Dim>> dimMap = new HashMap<>();
protected final Map<Sha1HashId, DefDim> defDimMap = new HashMap<>();
protected final Map<Sha1HashId, List<Measurement>> measurementMap = new HashMap<>();
public final com.codahale.metrics.Timer flushTimer;
public final Meter measurementMeter;
private static final Sha1HashId BLANK_SHA_1_HASH_ID = new Sha1HashId(DigestUtils.sha(""));
private static final Set<Dim> EMPTY_DIM_TREE_SET = new TreeSet<>();
protected abstract void write() throws Exception;
public InfluxMetricRepo(final Environment env) {
this.flushTimer = env.metrics().timer(this.getClass().getName() + "." +
"flush-timer");
this.measurementMeter = env.metrics().meter(this.getClass().getName() + "." +
"measurement-meter");
}
@Override
public void addMetricToBatch(final Sha1HashId defDimsId, final String timeStamp,
final double value, final Map<String, String> valueMeta) {
final Measurement measurement = new Measurement(defDimsId, timeStamp, value, valueMeta);
List<Measurement> measurementList = this.measurementMap.get(defDimsId);
if (measurementList == null) {
measurementList = new LinkedList<>();
this.measurementMap.put(defDimsId, measurementList);
}
measurementList.add(measurement);
}
@Override
public void addDefinitionToBatch(final Sha1HashId defId, final String name, final String tenantId,
final String region) {
if (!this.defMap.containsKey(defId)) {
final Def def = new Def(defId, name, tenantId, region);
this.defMap.put(defId, def);
}
}
@Override
public void addDimensionsToBatch(final Sha1HashId dimSetId, Map<String, String> dimMap) {
if (!this.dimMap.containsKey(dimSetId)) {
final Set<Dim> dimSet = new TreeSet<>();
this.dimMap.put(dimSetId, dimSet);
for (Map.Entry<String, String> entry : dimMap.entrySet()) {
final String name = entry.getKey();
final String value = entry.getValue();
final Dim dim = new Dim(dimSetId, name, value);
dimSet.add(dim);
}
}
}
@Override
public void addDefinitionDimensionToBatch(final Sha1HashId defDimsId, final Sha1HashId defId,
Sha1HashId dimId) {
if (!this.defDimMap.containsKey(defDimsId)) {
final DefDim defDim = new DefDim(defDimsId, defId, dimId);
this.defDimMap.put(defDimsId, defDim);
}
}
@Override
public void flush() {
try {
final long startTime = System.currentTimeMillis();
final Timer.Context context = flushTimer.time();
write();
final long endTime = System.currentTimeMillis();
context.stop();
logger.debug("Writing measurements, definitions, and dimensions to InfluxDB took {} seconds",
(endTime - startTime) / 1000);
} catch (Exception e) {
logger.error("Failed to write measurements to InfluxDB", e);
}
clearBuffers();
}
protected Def getDef(final Sha1HashId defId) throws Exception {
final Def def = this.defMap.get(defId);
if (def == null) {
throw new Exception("Failed to find definition for defId: " + defId);
}
return def;
}
protected Set<Dim> getDimSet(final Sha1HashId dimId) throws Exception {
// If there were no dimensions, then "" was used in the hash id and nothing was
// ever added to the dimension map for this dimension set.
if (dimId.equals(BLANK_SHA_1_HASH_ID)) {
return EMPTY_DIM_TREE_SET;
}
final Set<Dim> dimSet = this.dimMap.get(dimId);
if (dimSet == null) {
throw new Exception("Failed to find dimension set for dimId: " + dimId);
}
return dimSet;
}
private void clearBuffers() {
this.measurementMap.clear();
this.defMap.clear();
this.dimMap.clear();
this.defDimMap.clear();
}
static protected final class Measurement {
protected final Sha1HashId defDimsId;
protected final String time;
protected final double value;
protected final Map<String, String> valueMeta;
private Measurement(final Sha1HashId defDimsId, final String time, final double value,
final Map<String, String> valueMeta) {
this.defDimsId = defDimsId;
this.time = time;
this.value = value;
this.valueMeta = valueMeta;
}
@Override
public String toString() {
return "Measurement{" + "defDimsId=" + this.defDimsId + ", time='" + this.time + '\'' + ", " +
"value=" + this.value + '}';
}
}
static protected final class Def {
protected final Sha1HashId defId;
protected final String name;
protected final String tenantId;
protected final String region;
private Def(final Sha1HashId defId, final String name, final String tenantId,
final String region) {
this.defId = defId;
this.name = name;
this.tenantId = tenantId;
this.region = region;
}
@Override
public String toString() {
return "Definition{" + "defId=" + this.defId + ", name='" + this.name + '\'' + ", " +
"tenantId='" + this.tenantId + '\'' + ", region='" + this.region + '\'' + '}';
}
}
static protected final class Dim implements Comparable<Dim> {
protected final Sha1HashId dimSetId;
protected final String name;
protected final String value;
private Dim(final Sha1HashId dimSetId, final String name, final String value) {
this.dimSetId = dimSetId;
this.name = name;
this.value = value;
}
@Override
public String toString() {
return "Dimension{" + "dimSetId=" + this.dimSetId + ", name='" + this.name + '\'' + ", " +
"value='" + this.value + '\'' + '}';
}
@Override
public int compareTo(Dim o) {
int nameCmp = String.CASE_INSENSITIVE_ORDER.compare(name, o.name);
return (nameCmp != 0 ? nameCmp : String.CASE_INSENSITIVE_ORDER.compare(this.value, o.value));
}
}
static protected final class DefDim {
protected final Sha1HashId defDimId;
protected final Sha1HashId defId;
protected final Sha1HashId dimId;
private DefDim(final Sha1HashId defDimId, final Sha1HashId defId, final Sha1HashId dimId) {
this.defDimId = defDimId;
this.defId = defId;
this.dimId = dimId;
}
@Override
public String toString() {
return "DefinitionDimension{" + "defDimId=" + this.defDimId + ", defId=" + this.defId + ", " +
"dimId=" + this.dimId + '}';
}
}
}

View File

@ -1,153 +0,0 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.repository;
import com.google.inject.Inject;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.influxdb.dto.Serie;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import io.dropwizard.setup.Environment;
public class InfluxV8MetricRepo extends InfluxMetricRepo
{
private static final Logger logger = LoggerFactory.getLogger(InfluxV8MetricRepo.class);
private static final String[] COL_NAMES_STRING_ARRY = {"time", "value", "value_meta"};
private final InfluxV8RepoWriter influxV8RepoWriter;
private final SimpleDateFormat simpleDateFormat =
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS zzz");
private final ObjectMapper objectMapper = new ObjectMapper();
@Inject
public InfluxV8MetricRepo(final Environment env,
final InfluxV8RepoWriter influxV8RepoWriter) {
super(env);
this.influxV8RepoWriter = influxV8RepoWriter;
}
@Override
protected void write() throws Exception {
this.influxV8RepoWriter.write(TimeUnit.MILLISECONDS, getSeries());
}
private Serie[] getSeries() throws Exception {
final List<Serie> serieList = new LinkedList<>();
for (Map.Entry<Sha1HashId, List<Measurement>> entry : this.measurementMap.entrySet()) {
Sha1HashId defDimId = entry.getKey();
final DefDim defDim = this.defDimMap.get(defDimId);
final Def def = getDef(defDim.defId);
final Set<Dim> dimSet = getDimSet(defDim.dimId);
final Serie.Builder builder = new Serie.Builder(buildSerieName(def, dimSet));
builder.columns(COL_NAMES_STRING_ARRY);
for (final Measurement measurement : entry.getValue()) {
final Object[] colValsObjArry = new Object[COL_NAMES_STRING_ARRY.length];
final Date date = this.simpleDateFormat.parse(measurement.time + " UTC");
final Long time = date.getTime();
colValsObjArry[0] = time;
logger.debug("Added column value to colValsObjArry[{}] = {}", 0, colValsObjArry[0]);
colValsObjArry[1] = measurement.value;
logger.debug("Added column value to colValsObjArry[{}] = {}", 1, colValsObjArry[1]);
if (measurement.valueMeta != null && !measurement.valueMeta.isEmpty()) {
try {
final String valueMetaJson = objectMapper.writeValueAsString(measurement.valueMeta);
colValsObjArry[2] = valueMetaJson;
logger.debug("Added column value to colValsObjArry[{}] = {}", 2, valueMetaJson);
}
catch (JsonProcessingException e) {
logger.error("Unable to serialize value meta data {}", measurement.valueMeta, e);
}
}
builder.values(colValsObjArry);
this.measurementMeter.mark();
}
final Serie serie = builder.build();
if (logger.isDebugEnabled()) {
this.influxV8RepoWriter.logColValues(serie);
}
serieList.add(serie);
logger.debug("Added serie: {} to serieList", serie.getName());
}
return serieList.toArray(new Serie[serieList.size()]);
}
private String buildSerieName(final Def def, final Set<Dim> dimList)
throws UnsupportedEncodingException {
logger.debug("Creating serie name");
final StringBuilder serieNameBuilder = new StringBuilder();
logger.debug("Adding tenant_id to serie name: {}", def.tenantId);
serieNameBuilder.append(urlEncodeUTF8(def.tenantId));
serieNameBuilder.append("?");
logger.debug("Adding region to serie name: {}", def.region);
serieNameBuilder.append(urlEncodeUTF8(def.region));
serieNameBuilder.append("&");
logger.debug("Adding name to serie name: {}", def.name);
serieNameBuilder.append(urlEncodeUTF8(def.name));
for (final Dim dim : dimList) {
serieNameBuilder.append("&");
logger.debug("Adding dimension name to serie name: {}", dim.name);
serieNameBuilder.append(urlEncodeUTF8(dim.name));
serieNameBuilder.append("=");
logger.debug("Adding dimension value to serie name: {}", dim.value);
serieNameBuilder.append(urlEncodeUTF8(dim.value));
}
final String serieName = serieNameBuilder.toString();
logger.debug("Created serie name: {}", serieName);
return serieName;
}
private String urlEncodeUTF8(final String s) throws UnsupportedEncodingException {
return URLEncoder.encode(s, "UTF-8");
}
}

View File

@ -1,117 +0,0 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.repository;
import com.google.inject.Inject;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import io.dropwizard.setup.Environment;
import monasca.persister.repository.influxdb.InfluxPoint;
public class InfluxV9MetricRepo extends InfluxMetricRepo {
private static final Logger logger = LoggerFactory.getLogger(InfluxV9MetricRepo.class);
private final ObjectMapper objectMapper = new ObjectMapper();
private final SimpleDateFormat simpleDateFormat =
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS zzz");
private final InfluxV9RepoWriter influxV9RepoWriter;
@Inject
public InfluxV9MetricRepo(final Environment env,
final InfluxV9RepoWriter influxV9RepoWriter) {
super(env);
this.influxV9RepoWriter = influxV9RepoWriter;
}
@Override
protected void write() throws Exception {
this.influxV9RepoWriter.write(getInfluxPointArry());
}
private InfluxPoint[] getInfluxPointArry() throws Exception {
DateTimeFormatter dateFormatter = ISODateTimeFormat.dateTime();
List<InfluxPoint> influxPointList = new LinkedList<>();
for (final Sha1HashId defDimId : this.measurementMap.keySet()) {
final DefDim defDim = this.defDimMap.get(defDimId);
final Def def = getDef(defDim.defId);
final Set<Dim> dimSet = getDimSet(defDim.dimId);
Map<String, String> tagMap = new HashMap<>();
for (Dim dim : dimSet) {
tagMap.put(dim.name, dim.value);
}
tagMap.put("_tenant_id", def.tenantId);
tagMap.put("_region", def.region);
for (final Measurement measurement : this.measurementMap.get(defDimId)) {
Date date = this.simpleDateFormat.parse(measurement.time + " UTC");
DateTime dateTime = new DateTime(date.getTime(), DateTimeZone.UTC);
String dateString = dateFormatter.print(dateTime);
Map<String, Object> valueMap = new HashMap<>();
valueMap.put("value", measurement.value);
if (measurement.valueMeta != null && !measurement.valueMeta.isEmpty()) {
try {
final String valueMetaJson = objectMapper.writeValueAsString(measurement.valueMeta);
logger.debug("Added value for value_meta of {}", valueMetaJson);
valueMap.put("value_meta", valueMetaJson);
} catch (JsonProcessingException e) {
logger.error("Unable to serialize {}", measurement.valueMeta, e);
}
}
InfluxPoint influxPoint = new InfluxPoint(def.name, tagMap, dateString, valueMap);
influxPointList.add(influxPoint);
this.measurementMeter.mark();
}
}
return influxPointList.toArray(new InfluxPoint[influxPointList.size()]);
}
}

View File

@ -17,16 +17,11 @@
package monasca.persister.repository;
import java.util.Map;
import monasca.common.model.metric.MetricEnvelope;
public interface MetricRepo {
void addMetricToBatch(Sha1HashId defDimsId, String timeStamp, double value, Map<String, String> valueMeta);
void addDefinitionToBatch(Sha1HashId defId, String name, String tenantId, String region);
void addDimensionsToBatch(Sha1HashId dimSetId, Map<String, String> dimMap);
void addDefinitionDimensionToBatch(Sha1HashId defDimsId, Sha1HashId defId, Sha1HashId dimId);
void addToBatch(MetricEnvelope metricEnvelope);
void flush();
}

View File

@ -1,22 +0,0 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.repository;
public final class VerticaMetricsConstants {
public static final int MAX_COLUMN_LENGTH = 255;
}

View File

@ -0,0 +1,89 @@
/*
* Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package monasca.persister.repository.influxdb;
public final class Definition {
private static final int MAX_DEFINITION_NAME_LENGTH = 255;
private static final int MAX_TENANT_ID_LENGTH = 255;
private static final int MAX_REGION_LENGTH = 255;
public final String name;
public final String tenantId;
public final String region;
public Definition(String name, String tenantId, String region) {
if (name.length() > MAX_DEFINITION_NAME_LENGTH) {
name = name.substring(0, MAX_DEFINITION_NAME_LENGTH);
}
this.name = name;
if (tenantId.length() > MAX_TENANT_ID_LENGTH) {
tenantId = tenantId.substring(0, MAX_TENANT_ID_LENGTH);
}
this.tenantId = tenantId;
if (region.length() > MAX_REGION_LENGTH) {
region = region.substring(0, MAX_REGION_LENGTH);
}
this.region = region;
}
public String getName() {
return name;
}
public String getTenantId() {
return tenantId;
}
public String getRegion() {
return region;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof Definition)) {
return false;
}
Definition that = (Definition) o;
if (!name.equals(that.name)) {
return false;
}
if (!tenantId.equals(that.tenantId)) {
return false;
}
return region.equals(that.region);
}
@Override
public int hashCode() {
int result = name.hashCode();
result = 31 * result + tenantId.hashCode();
result = 31 * result + region.hashCode();
return result;
}
}

View File

@ -0,0 +1,103 @@
/*
* Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package monasca.persister.repository.influxdb;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import javax.annotation.Nullable;
public class Dimensions {
private static final int MAX_DIMENSIONS_NAME_LENGTH = 255;
private static final int MAX_DIMENSIONS_VALUE_LENGTH = 255;
private final Map<String, String> dimensionsMap;
public Dimensions(@Nullable Map<String, String> dimensionsMap) {
this.dimensionsMap = new TreeMap<>();
if (dimensionsMap != null) {
for (String name : dimensionsMap.keySet()) {
if (name != null && !name.isEmpty()) {
String value = dimensionsMap.get(name);
if (value != null && !value.isEmpty()) {
if (name.length() > MAX_DIMENSIONS_NAME_LENGTH) {
name = name.substring(0, MAX_DIMENSIONS_NAME_LENGTH);
}
if (value.length() > MAX_DIMENSIONS_VALUE_LENGTH) {
value = value.substring(0, MAX_DIMENSIONS_VALUE_LENGTH);
}
this.dimensionsMap.put(name, value);
}
}
}
}
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof Dimensions)) {
return false;
}
Dimensions that = (Dimensions) o;
return dimensionsMap.equals(that.dimensionsMap);
}
@Override
public int hashCode() {
return dimensionsMap.hashCode();
}
public Set<String> keySet() {
return this.dimensionsMap.keySet();
}
public Set<Map.Entry<String, String>> entrySet() {
return this.dimensionsMap.entrySet();
}
public String get(String key) {
return this.dimensionsMap.get(key);
}
}

View File

@ -12,7 +12,10 @@
* the License.
*/
package monasca.persister.repository;
package monasca.persister.repository.influxdb;
import monasca.common.model.event.AlarmStateTransitionedEvent;
import monasca.persister.repository.AlarmRepo;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
@ -25,7 +28,6 @@ import java.util.LinkedList;
import java.util.List;
import io.dropwizard.setup.Environment;
import monasca.common.model.event.AlarmStateTransitionedEvent;
public abstract class InfluxAlarmRepo implements AlarmRepo {

View File

@ -0,0 +1,103 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.repository.influxdb;
import monasca.common.model.metric.Metric;
import monasca.common.model.metric.MetricEnvelope;
import monasca.persister.repository.MetricRepo;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import io.dropwizard.setup.Environment;
public abstract class InfluxMetricRepo implements MetricRepo {
private static final Logger logger = LoggerFactory.getLogger(InfluxMetricRepo.class);
protected final MeasurementBuffer measurementBuffer = new MeasurementBuffer();
public final com.codahale.metrics.Timer flushTimer;
public final Meter measurementMeter;
protected abstract void write() throws Exception;
public InfluxMetricRepo(final Environment env) {
this.flushTimer = env.metrics().timer(this.getClass().getName() + ".flush-timer");
this.measurementMeter = env.metrics().meter(this.getClass().getName() + ".measurement-meter");
}
@Override
public void addToBatch(MetricEnvelope metricEnvelope) {
Metric metric = metricEnvelope.metric;
Map<String, Object> meta = metricEnvelope.meta;
Definition
definition =
new Definition(metric.getName(), (String) meta.get("tenantId"),
(String) meta.get("region"));
Dimensions dimensions = new Dimensions(metric.getDimensions());
Measurement
measurement =
new Measurement(metric.getTimestamp(), metric.getValue(), metric.getValueMeta());
this.measurementBuffer.put(definition, dimensions, measurement);
this.measurementMeter.mark();
}
@Override
public void flush() {
try {
final long startTime = System.currentTimeMillis();
final Timer.Context context = flushTimer.time();
write();
final long endTime = System.currentTimeMillis();
context.stop();
logger.debug("Writing measurements, definitions, and dimensions to InfluxDB took {} seconds",
(endTime - startTime) / 1000);
} catch (Exception e) {
logger.error("Failed to write measurements to InfluxDB", e);
}
clearBuffers();
}
private void clearBuffers() {
this.measurementBuffer.clear();
}
}

View File

@ -15,7 +15,9 @@
* limitations under the License.
*/
package monasca.persister.repository;
package monasca.persister.repository.influxdb;
import monasca.common.model.event.AlarmStateTransitionedEvent;
import com.google.inject.Inject;
@ -30,7 +32,6 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
import io.dropwizard.setup.Environment;
import monasca.common.model.event.AlarmStateTransitionedEvent;
public class InfluxV8AlarmRepo extends InfluxAlarmRepo {

View File

@ -0,0 +1,150 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.repository.influxdb;
import com.google.inject.Inject;
import org.influxdb.dto.Serie;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.text.ParseException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import io.dropwizard.setup.Environment;
public class InfluxV8MetricRepo extends InfluxMetricRepo
{
private static final Logger logger = LoggerFactory.getLogger(InfluxV8MetricRepo.class);
private static final String[] COL_NAMES_STRING_ARRY = {"time", "value", "value_meta"};
private final InfluxV8RepoWriter influxV8RepoWriter;
@Inject
public InfluxV8MetricRepo(final Environment env,
final InfluxV8RepoWriter influxV8RepoWriter) {
super(env);
this.influxV8RepoWriter = influxV8RepoWriter;
}
@Override
protected void write() throws Exception {
this.influxV8RepoWriter.write(TimeUnit.MILLISECONDS, getSeries());
}
private Serie[] getSeries() throws UnsupportedEncodingException, ParseException {
final List<Serie> serieList = new LinkedList<>();
for (Map.Entry<Definition, Map<Dimensions, List<Measurement>>> definitionMapEntry
: this.measurementBuffer.entrySet()) {
Definition definition = definitionMapEntry.getKey();
Map<Dimensions, List<Measurement>> dimensionsMap = definitionMapEntry.getValue();
for (Map.Entry<Dimensions, List<Measurement>> dimensionsMapEntry : dimensionsMap.entrySet()) {
Dimensions dimensions = dimensionsMapEntry.getKey();
List<Measurement> measurementList = dimensionsMapEntry.getValue();
final Serie.Builder builder = new Serie.Builder(buildSerieName(definition, dimensions));
builder.columns(COL_NAMES_STRING_ARRY);
for (Measurement measurement : measurementList) {
final Object[] valObjArry = new Object[COL_NAMES_STRING_ARRY.length];
valObjArry[0] = measurement.getTime();
logger.debug("Added column value to valObjArry[{}] = {}", 0, valObjArry[0]);
valObjArry[1] = measurement.getValue();
logger.debug("Added column value to valObjArry[{}] = {}", 1, valObjArry[1]);
valObjArry[2] = measurement.getValueMetaJSONString();
logger.debug("Added column value to valObjArry[{}] = {}", 2, valObjArry[2]);
builder.values(valObjArry);
this.measurementMeter.mark();
}
final Serie serie = builder.build();
if (logger.isDebugEnabled()) {
this.influxV8RepoWriter.logColValues(serie);
}
serieList.add(serie);
logger.debug("Added serie: {} to serieList", serie.getName());
}
}
return serieList.toArray(new Serie[serieList.size()]);
}
private String buildSerieName(final Definition definition, final Dimensions dimensions)
throws UnsupportedEncodingException {
logger.debug("Creating serie name");
final StringBuilder serieNameBuilder = new StringBuilder();
logger.debug("Adding tenant_id to serie name: {}", definition.getTenantId());
serieNameBuilder.append(urlEncodeUTF8(definition.getTenantId()));
serieNameBuilder.append("?");
logger.debug("Adding region to serie name: {}", definition.getRegion());
serieNameBuilder.append(urlEncodeUTF8(definition.getRegion()));
serieNameBuilder.append("&");
logger.debug("Adding name to serie name: {}", definition.getName());
serieNameBuilder.append(urlEncodeUTF8(definition.getName()));
for (final String name : dimensions.keySet()) {
final String value = dimensions.get(name);
serieNameBuilder.append("&");
logger.debug("Adding dimension name to serie name: {}", name);
serieNameBuilder.append(urlEncodeUTF8(name));
serieNameBuilder.append("=");
logger.debug("Adding dimension value to serie name: {}", value);
serieNameBuilder.append(urlEncodeUTF8(value));
}
final String serieName = serieNameBuilder.toString();
logger.debug("Created serie name: {}", serieName);
return serieName;
}
private String urlEncodeUTF8(final String s) throws UnsupportedEncodingException {
return URLEncoder.encode(s, "UTF-8");
}
}

View File

@ -15,7 +15,9 @@
* limitations under the License.
*/
package monasca.persister.repository;
package monasca.persister.repository.influxdb;
import monasca.persister.configuration.PersisterConfig;
import com.google.inject.Inject;
@ -30,7 +32,6 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import io.dropwizard.setup.Environment;
import monasca.persister.configuration.PersisterConfig;
public class InfluxV8RepoWriter {

View File

@ -15,7 +15,9 @@
* limitations under the License.
*/
package monasca.persister.repository;
package monasca.persister.repository.influxdb;
import monasca.common.model.event.AlarmStateTransitionedEvent;
import com.google.inject.Inject;
@ -35,8 +37,6 @@ import java.util.List;
import java.util.Map;
import io.dropwizard.setup.Environment;
import monasca.common.model.event.AlarmStateTransitionedEvent;
import monasca.persister.repository.influxdb.InfluxPoint;
public class InfluxV9AlarmRepo extends InfluxAlarmRepo {

View File

@ -0,0 +1,119 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monasca.persister.repository.influxdb;
import com.google.inject.Inject;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import io.dropwizard.setup.Environment;
public class InfluxV9MetricRepo extends InfluxMetricRepo {
private final InfluxV9RepoWriter influxV9RepoWriter;
@Inject
public InfluxV9MetricRepo(final Environment env,
final InfluxV9RepoWriter influxV9RepoWriter) {
super(env);
this.influxV9RepoWriter = influxV9RepoWriter;
}
@Override
protected void write() throws Exception {
this.influxV9RepoWriter.write(getInfluxPointArry());
}
private InfluxPoint[] getInfluxPointArry() throws Exception {
List<InfluxPoint> influxPointList = new LinkedList<>();
for (Map.Entry<Definition, Map<Dimensions, List<Measurement>>> definitionMapEntry :
this.measurementBuffer.entrySet()) {
Definition definition = definitionMapEntry.getKey();
Map<Dimensions, List<Measurement>> dimensionsMap = definitionMapEntry.getValue();
for (Map.Entry<Dimensions, List<Measurement>> dimensionsMapEntry : dimensionsMap.entrySet()) {
Dimensions dimensions = dimensionsMapEntry.getKey();
List<Measurement> measurementList = dimensionsMapEntry.getValue();
Map<String, String> tagMap = buildTagMap(definition, dimensions);
for (Measurement measurement : measurementList) {
InfluxPoint
influxPoint =
new InfluxPoint(definition.getName(),
tagMap,
measurement.getISOFormattedTimeString(),
buildValueMap(measurement));
influxPointList.add(influxPoint);
}
}
}
return influxPointList.toArray(new InfluxPoint[influxPointList.size()]);
}
private Map<String, Object> buildValueMap(Measurement measurement) {
Map<String, Object> valueMap = new HashMap<>();
valueMap.put("value", measurement.getValue());
String valueMetaJSONString = measurement.getValueMetaJSONString();
if (valueMetaJSONString != null) {
valueMap.put("value_meta", valueMetaJSONString);
}
return valueMap;
}
private Map<String, String> buildTagMap(Definition definition, Dimensions dimensions) {
Map<String,String> tagMap = new HashMap<>();
for (Map.Entry<String, String> dimensionsEntry : dimensions.entrySet()) {
String name = dimensionsEntry.getKey();
String value = dimensionsEntry.getValue();
tagMap.put(name, value);
}
tagMap.put("_tenant_id", definition.getTenantId());
tagMap.put("_region", definition.getRegion());
return tagMap;
}
}

View File

@ -15,7 +15,9 @@
* limitations under the License.
*/
package monasca.persister.repository;
package monasca.persister.repository.influxdb;
import monasca.persister.configuration.PersisterConfig;
import com.google.inject.Inject;
@ -47,10 +49,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import monasca.persister.configuration.PersisterConfig;
import monasca.persister.repository.influxdb.InfluxPoint;
import monasca.persister.repository.influxdb.InfluxWrite;
public class InfluxV9RepoWriter {
private static final Logger logger = LoggerFactory.getLogger(InfluxV9RepoWriter.class);

View File

@ -0,0 +1,94 @@
/*
* Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package monasca.persister.repository.influxdb;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
public final class Measurement {
private static final Logger logger = LoggerFactory.getLogger(Measurement.class);
private final ObjectMapper objectMapper = new ObjectMapper();
public final long time;
public final double value;
public final Map<String, String> valueMeta;
public Measurement(final long time, final double value,
final @Nullable Map<String, String> valueMeta) {
this.time = time;
this.value = value;
this.valueMeta = valueMeta == null ? new HashMap<String, String>() : valueMeta;
}
public String getISOFormattedTimeString() {
DateTimeFormatter dateFormatter = ISODateTimeFormat.dateTime();
Date date = new Date(this.time);
DateTime dateTime = new DateTime(date.getTime(), DateTimeZone.UTC);
return dateFormatter.print(dateTime);
}
public long getTime() {
return time;
}
public double getValue() {
return value;
}
public Map<String, String> getValueMeta() {
return valueMeta;
}
public String getValueMetaJSONString() {
if (!this.valueMeta.isEmpty()) {
try {
return objectMapper.writeValueAsString(this.valueMeta);
} catch (JsonProcessingException e) {
logger.error("Failed to serialize value meta {}", this.valueMeta, e);
}
}
return null;
}
}

View File

@ -0,0 +1,79 @@
/*
* Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package monasca.persister.repository.influxdb;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class MeasurementBuffer {
private final Map<Definition, Map<Dimensions, List<Measurement>>>
measurementMap = new HashMap<>();
public void put(Definition definition, Dimensions dimensions, Measurement measurement) {
Map<Dimensions, List<Measurement>> dimensionsMap = this.measurementMap.get(definition);
if (dimensionsMap == null) {
dimensionsMap = initDimensionsMap(definition, dimensions);
}
List<Measurement> measurementList = dimensionsMap.get(dimensions);
if (measurementList == null) {
measurementList = initMeasurementList(dimensionsMap, dimensions);
}
measurementList.add(measurement);
}
public Set<Map.Entry<Definition, Map<Dimensions, List<Measurement>>>> entrySet() {
return this.measurementMap.entrySet();
}
public void clear() {
this.measurementMap.clear();
}
private Map<Dimensions, List<Measurement>> initDimensionsMap(Definition definition,
Dimensions dimensions) {
Map<Dimensions, List<Measurement>> dimensionsMap = new HashMap<>();
List<Measurement> measurementList = new LinkedList<>();
dimensionsMap.put(dimensions, measurementList);
this.measurementMap.put(definition, dimensionsMap);
return dimensionsMap;
}
private List<Measurement> initMeasurementList(Map<Dimensions, List<Measurement>> dimensionsMap,
Dimensions dimensions) {
List<Measurement> measurementList = new LinkedList<>();
dimensionsMap.put(dimensions, measurementList);
return measurementList;
}
}

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package monasca.persister.repository;
package monasca.persister.repository.vertica;
import org.apache.commons.codec.binary.Hex;

View File

@ -15,15 +15,14 @@
* limitations under the License.
*/
package monasca.persister.repository;
package monasca.persister.repository.vertica;
import monasca.common.model.event.AlarmStateTransitionedEvent;
import monasca.persister.configuration.PersisterConfig;
import monasca.persister.repository.AlarmRepo;
import com.codahale.metrics.Timer;
import io.dropwizard.setup.Environment;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.PreparedBatch;
import org.slf4j.Logger;
@ -37,6 +36,8 @@ import java.util.TimeZone;
import javax.inject.Inject;
import io.dropwizard.setup.Environment;
public class VerticaAlarmRepo extends VerticaRepo implements AlarmRepo {
private static final Logger logger = LoggerFactory.getLogger(VerticaAlarmRepo.class);

View File

@ -15,14 +15,22 @@
* limitations under the License.
*/
package monasca.persister.repository;
package monasca.persister.repository.vertica;
import monasca.common.model.metric.Metric;
import monasca.common.model.metric.MetricEnvelope;
import monasca.persister.configuration.PersisterConfig;
import monasca.persister.pipeline.event.MetricHandler;
import monasca.persister.repository.MetricRepo;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import org.apache.commons.codec.digest.DigestUtils;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.PreparedBatch;
import org.slf4j.Logger;
@ -30,19 +38,30 @@ import org.slf4j.LoggerFactory;
import java.security.NoSuchAlgorithmException;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TimeZone;
import java.util.TreeMap;
import javax.inject.Inject;
import io.dropwizard.setup.Environment;
import monasca.persister.configuration.PersisterConfig;
public class VerticaMetricRepo extends VerticaRepo implements MetricRepo {
private static final Logger logger = LoggerFactory.getLogger(VerticaMetricRepo.class);
public static final int MAX_COLUMN_LENGTH = 255;
private final SimpleDateFormat simpleDateFormat;
private static final String TENANT_ID = "tenantId";
private static final String REGION = "region";
private final Environment environment;
private final Cache<Sha1HashId, Sha1HashId> definitionsIdCache;
@ -56,8 +75,6 @@ public class VerticaMetricRepo extends VerticaRepo implements MetricRepo {
private static final String SQL_INSERT_INTO_METRICS =
"insert into MonMetrics.measurements (definition_dimensions_id, time_stamp, value) values (:definition_dimension_id, :time_stamp, :value)";
// If any of the columns change size be sure to update VerticaMetricConstants.java as well.
private static final String DEFINITIONS_TEMP_STAGING_TABLE = "(" + " id BINARY(20) NOT NULL,"
+ " name VARCHAR(255) NOT NULL," + " tenant_id VARCHAR(255) NOT NULL,"
+ " region VARCHAR(255) NOT NULL" + ")";
@ -83,6 +100,12 @@ public class VerticaMetricRepo extends VerticaRepo implements MetricRepo {
private final String dimensionsTempStagingTableInsertStmt;
private final String definitionDimensionsTempStagingTableInsertStmt;
private final Counter metricCounter;
private final Counter definitionCounter;
private final Counter dimensionCounter;
private final Counter definitionDimensionsCounter;
private final Timer flushTimer;
public final Meter measurementMeter;
public final Meter definitionCacheMissMeter;
@ -98,7 +121,24 @@ public class VerticaMetricRepo extends VerticaRepo implements MetricRepo {
super(dbi);
logger.debug("Instantiating: " + this.getClass().getName());
simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
simpleDateFormat.setTimeZone(TimeZone.getTimeZone("GMT-0"));
this.environment = environment;
final String handlerName = String.format("%s[%d]", MetricHandler.class.getName(), new Random().nextInt());
this.metricCounter =
environment.metrics().counter(handlerName + "." + "metrics-added-to-batch-counter");
this.definitionCounter =
environment.metrics()
.counter(handlerName + "." + "metric-definitions-added-to-batch-counter");
this.dimensionCounter =
environment.metrics()
.counter(handlerName + "." + "metric-dimensions-added-to-batch-counter");
this.definitionDimensionsCounter =
environment.metrics()
.counter(handlerName + "." + "metric-definition-dimensions-added-to-batch-counter");
this.flushTimer =
this.environment.metrics().timer(this.getClass().getName() + "." + "flush-timer");
this.measurementMeter =
@ -203,18 +243,93 @@ public class VerticaMetricRepo extends VerticaRepo implements MetricRepo {
}
@Override
public void addToBatch(MetricEnvelope metricEnvelope) {
Metric metric = metricEnvelope.metric;
Map<String, Object> meta = metricEnvelope.meta;
logger.debug("metric: {}", metric);
logger.debug("meta: {}", meta);
String tenantId = "";
if (meta.containsKey(TENANT_ID)) {
tenantId = (String) meta.get(TENANT_ID);
} else {
logger.warn(
"Failed to find tenantId in message envelope meta data. Metric message may be malformed"
+ ". Setting tenantId to empty string.");
logger.warn("metric: {}", metric.toString());
logger.warn("meta: {}", meta.toString());
}
String region = "";
if (meta.containsKey(REGION)) {
region = (String) meta.get(REGION);
} else {
logger.warn(
"Failed to find region in message envelope meta data. Metric message may be malformed. "
+ "Setting region to empty string.");
logger.warn("metric: {}", metric.toString());
logger.warn("meta: {}", meta.toString());
}
// Add the definition to the batch.
StringBuilder
definitionIdStringToHash =
new StringBuilder(trunc(metric.getName(), MAX_COLUMN_LENGTH));
definitionIdStringToHash.append(trunc(tenantId, MAX_COLUMN_LENGTH));
definitionIdStringToHash.append(trunc(region, MAX_COLUMN_LENGTH));
byte[] definitionIdSha1Hash = DigestUtils.sha(definitionIdStringToHash.toString());
Sha1HashId definitionSha1HashId = new Sha1HashId((definitionIdSha1Hash));
this.addDefinitionToBatch(definitionSha1HashId, trunc(metric.getName(), MAX_COLUMN_LENGTH),
trunc(tenantId, MAX_COLUMN_LENGTH), trunc(region, MAX_COLUMN_LENGTH));
definitionCounter.inc();
// Calculate dimensions sha1 hash id.
StringBuilder dimensionIdStringToHash = new StringBuilder();
Map<String, String> preppedDimMap = prepDimensions(metric.getDimensions());
for (Map.Entry<String, String> entry : preppedDimMap.entrySet()) {
dimensionIdStringToHash.append(entry.getKey());
dimensionIdStringToHash.append(entry.getValue());
}
byte[] dimensionIdSha1Hash = DigestUtils.sha(dimensionIdStringToHash.toString());
Sha1HashId dimensionsSha1HashId = new Sha1HashId(dimensionIdSha1Hash);
// Add the dimension name/values to the batch.
this.addDimensionsToBatch(dimensionsSha1HashId, preppedDimMap);
// Add the definition dimensions to the batch.
StringBuilder
definitionDimensionsIdStringToHash =
new StringBuilder(definitionSha1HashId.toHexString());
definitionDimensionsIdStringToHash.append(dimensionsSha1HashId.toHexString());
byte[]
definitionDimensionsIdSha1Hash =
DigestUtils.sha(definitionDimensionsIdStringToHash.toString());
Sha1HashId definitionDimensionsSha1HashId = new Sha1HashId(definitionDimensionsIdSha1Hash);
this.addDefinitionDimensionToBatch(definitionDimensionsSha1HashId, definitionSha1HashId,
dimensionsSha1HashId);
definitionDimensionsCounter.inc();
// Add the measurement to the batch.
String timeStamp = simpleDateFormat.format(new Date(metric.getTimestamp()));
double value = metric.getValue();
this.addMetricToBatch(definitionDimensionsSha1HashId, timeStamp, value, metric.getValueMeta());
this.metricCounter.inc();
}
public void addMetricToBatch(Sha1HashId defDimsId, String timeStamp, double value,
Map<String, String> valueMeta) {
// TODO: Actually handle valueMeta
logger.debug("Adding metric to batch: defDimsId: {}, time: {}, value: {}",
defDimsId.toHexString(), timeStamp, value);
defDimsId.toHexString(), timeStamp, value);
metricsBatch.add().bind("definition_dimension_id", defDimsId.getSha1Hash())
.bind("time_stamp", timeStamp).bind("value", value);
measurementMeter.mark();
}
@Override
public void addDefinitionToBatch(Sha1HashId defId, String name, String tenantId, String region) {
private void addDefinitionToBatch(Sha1HashId defId, String name, String tenantId, String region) {
if (definitionsIdCache.getIfPresent(defId) == null) {
@ -236,8 +351,7 @@ public class VerticaMetricRepo extends VerticaRepo implements MetricRepo {
}
}
@Override
public void addDimensionsToBatch(Sha1HashId dimSetId, Map<String, String> dimMap) {
private void addDimensionsToBatch(Sha1HashId dimSetId, Map<String, String> dimMap) {
if (dimensionsIdCache.getIfPresent(dimSetId) == null) {
@ -266,8 +380,7 @@ public class VerticaMetricRepo extends VerticaRepo implements MetricRepo {
}
}
@Override
public void addDefinitionDimensionToBatch(Sha1HashId defDimsId, Sha1HashId defId,
private void addDefinitionDimensionToBatch(Sha1HashId defDimsId, Sha1HashId defId,
Sha1HashId dimId) {
if (definitionDimensionsIdCache.getIfPresent(defDimsId) == null) {
@ -354,4 +467,37 @@ public class VerticaMetricRepo extends VerticaRepo implements MetricRepo {
dimensionIdSet.clear();
definitionDimensionsIdSet.clear();
}
private Map<String, String> prepDimensions(Map<String, String> dimMap) {
Map<String, String> newDimMap = new TreeMap<>();
if (dimMap != null) {
for (String dimName : dimMap.keySet()) {
if (dimName != null && !dimName.isEmpty()) {
String dimValue = dimMap.get(dimName);
if (dimValue != null && !dimValue.isEmpty()) {
newDimMap.put(trunc(dimName, MAX_COLUMN_LENGTH), trunc(dimValue, MAX_COLUMN_LENGTH));
dimensionCounter.inc();
}
}
}
}
return newDimMap;
}
private String trunc(String s, int l) {
if (s == null) {
return "";
} else if (s.length() <= l) {
return s;
} else {
String r = s.substring(0, l);
logger.warn("Input string exceeded max column length. Truncating input string {} to {} chars",
s, l);
logger.warn("Resulting string {}", r);
return r;
}
}
}

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package monasca.persister.repository;
package monasca.persister.repository.vertica;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;