Modified to use Metric in mon-common.

This commit is contained in:
Roland Hochmuth 2014-04-15 13:03:30 -06:00
parent ed82aee920
commit 9ff005daa6
24 changed files with 158 additions and 269 deletions

View File

@ -28,7 +28,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<artifactNamedVersion>${project.name}-${project.version}-${timestamp}-${buildNumber}
</artifactNamedVersion>
<mon.common.version>1.0.0.27</mon.common.version>
<mon.common.version>1.0.0.31</mon.common.version>
</properties>
<!--Needed for buildnumber-maven-plugin-->

View File

@ -7,10 +7,9 @@ import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
import com.hpcloud.mon.persister.consumer.*;
import com.hpcloud.mon.persister.dbi.DBIProvider;
import com.hpcloud.mon.persister.disruptor.*;
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedMessageEventHandler;
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedMessageEventHandlerFactory;
import com.hpcloud.mon.persister.disruptor.event.MetricMessageEventHandler;
import com.hpcloud.mon.persister.disruptor.event.MetricMessageEventHandlerFactory;
import com.hpcloud.mon.persister.disruptor.event.*;
import com.hpcloud.mon.persister.disruptor.event.MetricHandler;
import com.hpcloud.mon.persister.disruptor.event.MetricHandlerFactory;
import com.hpcloud.mon.persister.repository.RepositoryCommitHeartbeat;
import com.lmax.disruptor.ExceptionHandler;
import io.dropwizard.setup.Environment;
@ -33,12 +32,12 @@ public class MonPersisterModule extends AbstractModule {
bind(Environment.class).toInstance(environment);
install(new FactoryModuleBuilder()
.implement(MetricMessageEventHandler.class, MetricMessageEventHandler.class)
.build(MetricMessageEventHandlerFactory.class));
.implement(MetricHandler.class, MetricHandler.class)
.build(MetricHandlerFactory.class));
install(new FactoryModuleBuilder()
.implement(AlarmStateTransitionedMessageEventHandler.class, AlarmStateTransitionedMessageEventHandler.class)
.build(AlarmStateTransitionedMessageEventHandlerFactory.class));
.implement(AlarmStateTransitionedEventHandler.class, AlarmStateTransitionedEventHandler.class)
.build(AlarmStateTransitionedEventHandlerFactory.class));
install(new FactoryModuleBuilder()
.implement(KafkaMetricsConsumerRunnableBasic.class, KafkaMetricsConsumerRunnableBasic.class)

View File

@ -5,7 +5,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.hpcloud.mon.persister.disruptor.AlarmStateHistoryDisruptor;
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedMessageEvent;
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventHolder;
import com.lmax.disruptor.EventTranslator;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
@ -45,14 +45,14 @@ public class KafkaAlarmStateTransitionConsumerRunnableBasic implements Runnable
logger.debug("Thread " + threadNumber + ": " + s);
try {
final AlarmStateTransitionedEvent message = objectMapper.readValue(s, AlarmStateTransitionedEvent.class);
final AlarmStateTransitionedEvent event = objectMapper.readValue(s, AlarmStateTransitionedEvent.class);
logger.debug(message.toString());
logger.debug(event.toString());
disruptor.publishEvent(new EventTranslator<AlarmStateTransitionedMessageEvent>() {
disruptor.publishEvent(new EventTranslator<AlarmStateTransitionedEventHolder>() {
@Override
public void translateTo(AlarmStateTransitionedMessageEvent event, long sequence) {
event.setMessage(message);
public void translateTo(AlarmStateTransitionedEventHolder eventHolder, long sequence) {
eventHolder.setEvent(event);
}
});
} catch (Exception e) {

View File

@ -5,8 +5,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.hpcloud.mon.persister.disruptor.MetricDisruptor;
import com.hpcloud.mon.persister.disruptor.event.MetricMessageEvent;
import com.hpcloud.mon.persister.message.MetricEnvelope;
import com.hpcloud.mon.persister.disruptor.event.MetricHolder;
import com.hpcloud.mon.common.model.metric.MetricEnvelope;
import com.lmax.disruptor.EventTranslator;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
@ -49,9 +49,9 @@ public class KafkaMetricsConsumerRunnableBasic implements Runnable {
logger.debug(envelope.toString());
disruptor.publishEvent(new EventTranslator<MetricMessageEvent>() {
disruptor.publishEvent(new EventTranslator<MetricHolder>() {
@Override
public void translateTo(MetricMessageEvent event, long sequence) {
public void translateTo(MetricHolder event, long sequence) {
event.setEnvelope(envelope);
}

View File

@ -3,8 +3,8 @@ package com.hpcloud.mon.persister.disruptor;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedMessageEventFactory;
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedMessageEventHandlerFactory;
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventFactory;
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventHandlerFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
import org.slf4j.Logger;
@ -18,13 +18,13 @@ public class AlarmHistoryDisruptorProvider implements Provider<AlarmStateHistory
private static final Logger logger = LoggerFactory.getLogger(AlarmHistoryDisruptorProvider.class);
private final MonPersisterConfiguration configuration;
private final AlarmStateTransitionedMessageEventHandlerFactory eventHandlerFactory;
private final AlarmStateTransitionedEventHandlerFactory eventHandlerFactory;
private final ExceptionHandler exceptionHandler;
private final AlarmStateHistoryDisruptor instance;
@Inject
public AlarmHistoryDisruptorProvider(MonPersisterConfiguration configuration,
AlarmStateTransitionedMessageEventHandlerFactory eventHandlerFactory,
AlarmStateTransitionedEventHandlerFactory eventHandlerFactory,
ExceptionHandler exceptionHandler) {
this.configuration = configuration;
this.eventHandlerFactory = eventHandlerFactory;
@ -37,7 +37,7 @@ public class AlarmHistoryDisruptorProvider implements Provider<AlarmStateHistory
logger.debug("Creating disruptor...");
Executor executor = Executors.newCachedThreadPool();
AlarmStateTransitionedMessageEventFactory eventFactory = new AlarmStateTransitionedMessageEventFactory();
AlarmStateTransitionedEventFactory eventFactory = new AlarmStateTransitionedEventFactory();
int bufferSize = configuration.getDisruptorConfiguration().getBufferSize();
logger.debug("Buffer size for instance of disruptor [" + bufferSize + "]");

View File

@ -1,6 +1,6 @@
package com.hpcloud.mon.persister.disruptor;
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedMessageEvent;
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventHolder;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
@ -8,7 +8,7 @@ import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.Executor;
public class AlarmStateHistoryDisruptor extends Disruptor<AlarmStateTransitionedMessageEvent> {
public class AlarmStateHistoryDisruptor extends Disruptor<AlarmStateTransitionedEventHolder> {
public AlarmStateHistoryDisruptor(EventFactory eventFactory, int ringBufferSize, Executor executor) {
super(eventFactory, ringBufferSize, executor);
}

View File

@ -1,6 +1,6 @@
package com.hpcloud.mon.persister.disruptor;
import com.hpcloud.mon.persister.disruptor.event.MetricMessageEvent;
import com.hpcloud.mon.persister.disruptor.event.MetricHolder;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
@ -8,7 +8,7 @@ import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.Executor;
public class MetricDisruptor extends Disruptor<MetricMessageEvent> {
public class MetricDisruptor extends Disruptor<MetricHolder> {
public MetricDisruptor(EventFactory eventFactory, int ringBufferSize, Executor executor) {
super(eventFactory, ringBufferSize, executor);
}

View File

@ -3,8 +3,8 @@ package com.hpcloud.mon.persister.disruptor;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
import com.hpcloud.mon.persister.disruptor.event.MetricMessageEventFactory;
import com.hpcloud.mon.persister.disruptor.event.MetricMessageEventHandlerFactory;
import com.hpcloud.mon.persister.disruptor.event.MetricFactory;
import com.hpcloud.mon.persister.disruptor.event.MetricHandlerFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
import org.slf4j.Logger;
@ -18,17 +18,17 @@ public class MetricDisruptorProvider implements Provider<MetricDisruptor> {
private static final Logger logger = LoggerFactory.getLogger(MetricDisruptorProvider.class);
private final MonPersisterConfiguration configuration;
private final MetricMessageEventHandlerFactory metricMessageEventHandlerFactory;
private final MetricHandlerFactory eventHandlerFactory;
private final ExceptionHandler exceptionHandler;
private final MetricDisruptor instance;
@Inject
public MetricDisruptorProvider(MonPersisterConfiguration configuration,
MetricMessageEventHandlerFactory metricMessageEventHandlerFactory,
MetricHandlerFactory eventHandlerFactory,
ExceptionHandler exceptionHandler) {
this.configuration = configuration;
this.metricMessageEventHandlerFactory = metricMessageEventHandlerFactory;
this.eventHandlerFactory = eventHandlerFactory;
this.exceptionHandler = exceptionHandler;
this.instance = createInstance();
}
@ -38,12 +38,12 @@ public class MetricDisruptorProvider implements Provider<MetricDisruptor> {
logger.debug("Creating disruptor...");
Executor executor = Executors.newCachedThreadPool();
MetricMessageEventFactory metricMessageEventFactory = new MetricMessageEventFactory();
MetricFactory eventFactory = new MetricFactory();
int bufferSize = configuration.getDisruptorConfiguration().getBufferSize();
logger.debug("Buffer size for instance of disruptor [" + bufferSize + "]");
MetricDisruptor disruptor = new MetricDisruptor(metricMessageEventFactory, bufferSize, executor);
MetricDisruptor disruptor = new MetricDisruptor(eventFactory, bufferSize, executor);
disruptor.handleExceptionsWith(exceptionHandler);
int batchSize = configuration.getVerticaOutputProcessorConfiguration().getBatchSize();
@ -55,7 +55,7 @@ public class MetricDisruptorProvider implements Provider<MetricDisruptor> {
EventHandler[] eventHandlers = new EventHandler[numOutputProcessors];
for (int i = 0; i < numOutputProcessors; ++i) {
eventHandlers[i] = metricMessageEventHandlerFactory.create(i, numOutputProcessors, batchSize);
eventHandlers[i] = eventHandlerFactory.create(i, numOutputProcessors, batchSize);
}
disruptor.handleEventsWith(eventHandlers);

View File

@ -0,0 +1,13 @@
package com.hpcloud.mon.persister.disruptor.event;
import com.lmax.disruptor.EventFactory;
public class AlarmStateTransitionedEventFactory implements EventFactory<AlarmStateTransitionedEventHolder> {
public static final AlarmStateTransitionedEventFactory INSTANCE = new AlarmStateTransitionedEventFactory();
@Override
public AlarmStateTransitionedEventHolder newInstance() {
return new AlarmStateTransitionedEventHolder();
}
}

View File

@ -13,9 +13,9 @@ import io.dropwizard.setup.Environment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AlarmStateTransitionedMessageEventHandler implements EventHandler<AlarmStateTransitionedMessageEvent> {
public class AlarmStateTransitionedEventHandler implements EventHandler<AlarmStateTransitionedEventHolder> {
private static final Logger logger = LoggerFactory.getLogger(AlarmStateTransitionedMessageEventHandler.class);
private static final Logger logger = LoggerFactory.getLogger(AlarmStateTransitionedEventHandler.class);
private final int ordinal;
private final int numProcessors;
private final int batchSize;
@ -34,12 +34,12 @@ public class AlarmStateTransitionedMessageEventHandler implements EventHandler<A
private final Timer commitTimer;
@Inject
public AlarmStateTransitionedMessageEventHandler(VerticaAlarmStateHistoryRepository repository,
MonPersisterConfiguration configuration,
Environment environment,
@Assisted("ordinal") int ordinal,
@Assisted("numProcessors") int numProcessors,
@Assisted("batchSize") int batchSize) {
public AlarmStateTransitionedEventHandler(VerticaAlarmStateHistoryRepository repository,
MonPersisterConfiguration configuration,
Environment environment,
@Assisted("ordinal") int ordinal,
@Assisted("numProcessors") int numProcessors,
@Assisted("batchSize") int batchSize) {
this.repository = repository;
this.configuration = configuration;
@ -58,9 +58,9 @@ public class AlarmStateTransitionedMessageEventHandler implements EventHandler<A
}
@Override
public void onEvent(AlarmStateTransitionedMessageEvent event, long sequence, boolean b) throws Exception {
public void onEvent(AlarmStateTransitionedEventHolder eventHolder, long sequence, boolean b) throws Exception {
if (event.getMessage() == null) {
if (eventHolder.getEvent() == null) {
logger.debug("Received heartbeat message. Checking last flush time.");
if (millisSinceLastFlush + millisBetweenFlushes < System.currentTimeMillis()) {
logger.debug("It's been more than " + secondsBetweenFlushes + " seconds since last flush. Flushing staging tables now...");
@ -79,10 +79,10 @@ public class AlarmStateTransitionedMessageEventHandler implements EventHandler<A
logger.debug("Sequence number: " + sequence +
" Ordinal: " + ordinal +
" Event: " + event.getMessage());
" Event: " + eventHolder.getEvent());
AlarmStateTransitionedEvent message = event.getMessage();
repository.addToBatch(message);
AlarmStateTransitionedEvent event = eventHolder.getEvent();
repository.addToBatch(event);
if (sequence % batchSize == (batchSize - 1)) {
Timer.Context context = commitTimer.time();

View File

@ -0,0 +1,9 @@
package com.hpcloud.mon.persister.disruptor.event;
import com.google.inject.assistedinject.Assisted;
public interface AlarmStateTransitionedEventHandlerFactory {
AlarmStateTransitionedEventHandler create(@Assisted("ordinal") int ordinal,
@Assisted("numProcessors") int numProcessors,
@Assisted("batchSize") int batchSize);
}

View File

@ -0,0 +1,16 @@
package com.hpcloud.mon.persister.disruptor.event;
import com.hpcloud.mon.common.event.AlarmStateTransitionedEvent;
public class AlarmStateTransitionedEventHolder
{
AlarmStateTransitionedEvent event;
public AlarmStateTransitionedEvent getEvent() {
return event;
}
public void setEvent(AlarmStateTransitionedEvent event) {
this.event = event;
}
}

View File

@ -1,16 +0,0 @@
package com.hpcloud.mon.persister.disruptor.event;
import com.hpcloud.mon.common.event.AlarmStateTransitionedEvent;
public class AlarmStateTransitionedMessageEvent
{
private AlarmStateTransitionedEvent message;
public AlarmStateTransitionedEvent getMessage() {
return message;
}
public void setMessage(AlarmStateTransitionedEvent message) {
this.message = message;
}
}

View File

@ -1,13 +0,0 @@
package com.hpcloud.mon.persister.disruptor.event;
import com.lmax.disruptor.EventFactory;
public class AlarmStateTransitionedMessageEventFactory implements EventFactory<AlarmStateTransitionedMessageEvent> {
public static final AlarmStateTransitionedMessageEventFactory INSTANCE = new AlarmStateTransitionedMessageEventFactory();
@Override
public AlarmStateTransitionedMessageEvent newInstance() {
return new AlarmStateTransitionedMessageEvent();
}
}

View File

@ -1,9 +0,0 @@
package com.hpcloud.mon.persister.disruptor.event;
import com.google.inject.assistedinject.Assisted;
public interface AlarmStateTransitionedMessageEventHandlerFactory {
AlarmStateTransitionedMessageEventHandler create(@Assisted("ordinal") int ordinal,
@Assisted("numProcessors") int numProcessors,
@Assisted("batchSize") int batchSize);
}

View File

@ -0,0 +1,13 @@
package com.hpcloud.mon.persister.disruptor.event;
import com.lmax.disruptor.EventFactory;
public class MetricFactory implements EventFactory<MetricHolder> {
public static final MetricFactory INSTANCE = new MetricFactory();
@Override
public MetricHolder newInstance() {
return new MetricHolder();
}
}

View File

@ -5,8 +5,8 @@ import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.hpcloud.mon.common.model.metric.Metric;
import com.hpcloud.mon.persister.configuration.MonPersisterConfiguration;
import com.hpcloud.mon.persister.message.MetricMessage;
import com.hpcloud.mon.persister.repository.Sha1HashId;
import com.hpcloud.mon.persister.repository.VerticaMetricRepository;
import com.lmax.disruptor.EventHandler;
@ -21,10 +21,11 @@ import java.util.Map;
import java.util.TimeZone;
import java.util.TreeMap;
public class MetricMessageEventHandler implements EventHandler<MetricMessageEvent> {
public class MetricHandler implements EventHandler<MetricHolder> {
private static final Logger logger = LoggerFactory.getLogger(MetricMessageEventHandler.class);
private static final Logger logger = LoggerFactory.getLogger(MetricHandler.class);
private static final String TENANT_ID = "tenantId";
private static final String REGION = "region";
private final int ordinal;
private final int numProcessors;
@ -44,17 +45,17 @@ public class MetricMessageEventHandler implements EventHandler<MetricMessageEven
private final Counter definitionCounter;
private final Counter dimensionCounter;
private final Counter definitionDimensionsCounter;
private final Meter metricMessageMeter;
private final Meter metricMeter;
private final Meter commitMeter;
private final Timer commitTimer;
@Inject
public MetricMessageEventHandler(VerticaMetricRepository verticaMetricRepository,
MonPersisterConfiguration configuration,
Environment environment,
@Assisted("ordinal") int ordinal,
@Assisted("numProcessors") int numProcessors,
@Assisted("batchSize") int batchSize) {
public MetricHandler(VerticaMetricRepository verticaMetricRepository,
MonPersisterConfiguration configuration,
Environment environment,
@Assisted("ordinal") int ordinal,
@Assisted("numProcessors") int numProcessors,
@Assisted("batchSize") int batchSize) {
this.verticaMetricRepository = verticaMetricRepository;
this.configuration = configuration;
@ -63,7 +64,7 @@ public class MetricMessageEventHandler implements EventHandler<MetricMessageEven
this.definitionCounter = this.environment.metrics().counter(this.getClass().getName() + "." + "metric-definitions-added-to-batch-counter");
this.dimensionCounter = this.environment.metrics().counter(this.getClass().getName() + "." + "metric-dimensions-added-to-batch-counter");
this.definitionDimensionsCounter = this.environment.metrics().counter(this.getClass().getName() + "." + "metric-definition-dimensions-added-to-batch-counter");
this.metricMessageMeter = this.environment.metrics().meter(this.getClass().getName() + "." + "metrics-messages-processed-meter");
this.metricMeter = this.environment.metrics().meter(this.getClass().getName() + "." + "metrics-messages-processed-meter");
this.commitMeter = this.environment.metrics().meter(this.getClass().getName() + "." + "commits-executed-meter");
this.commitTimer = this.environment.metrics().timer(this.getClass().getName() + "." + "total-commit-and-flush-timer");
@ -81,9 +82,9 @@ public class MetricMessageEventHandler implements EventHandler<MetricMessageEven
}
@Override
public void onEvent(MetricMessageEvent metricMessageEvent, long sequence, boolean b) throws Exception {
public void onEvent(MetricHolder metricEvent, long sequence, boolean b) throws Exception {
if (metricMessageEvent.getMetricEnvelope() == null) {
if (metricEvent.getMetricEnvelope() == null) {
logger.debug("Received heartbeat message. Checking last flush time.");
if (millisSinceLastFlush + millisBetweenFlushes < System.currentTimeMillis()) {
logger.debug("It's been more than " + secondsBetweenFlushes + " seconds since last flush. Flushing staging tables now...");
@ -98,34 +99,39 @@ public class MetricMessageEventHandler implements EventHandler<MetricMessageEven
return;
}
metricMessageMeter.mark();
metricMeter.mark();
logger.debug("Sequence number: " + sequence +
" Ordinal: " + ordinal +
" Event: " + metricMessageEvent.getMetricEnvelope().metric);
" Event: " + metricEvent.getMetricEnvelope().metric);
MetricMessage metricMessage = metricMessageEvent.getMetricEnvelope().metric;
Map<String, Object> meta = metricMessageEvent.getMetricEnvelope().meta;
Metric metric = metricEvent.getMetricEnvelope().metric;
Map<String, Object> meta = metricEvent.getMetricEnvelope().meta;
String tenantId = "";
if (!meta.containsKey(TENANT_ID)) {
logger.warn("Failed to find 'tenantId' in message envelope meta data. Metric message may be mal-formed. Setting 'tenantId' to empty string.");
logger.warn(metricMessage.toString());
logger.warn(metric.toString());
logger.warn("meta" + meta.toString());
} else {
tenantId = (String) meta.get(TENANT_ID);
}
String definitionIdStringToHash = metricMessage.getName() + tenantId + metricMessage.getRegion();
String region = "";
if (meta.containsKey(REGION)) {
region = (String) meta.get(REGION);
}
String definitionIdStringToHash = metric.getName() + tenantId + region;
byte[] definitionIdSha1Hash = DigestUtils.sha(definitionIdStringToHash);
Sha1HashId definitionSha1HashId = new Sha1HashId((definitionIdSha1Hash));
verticaMetricRepository.addToBatchStagingDefinitions(definitionSha1HashId, metricMessage.getName(), tenantId, metricMessage.getRegion());
verticaMetricRepository.addToBatchStagingDefinitions(definitionSha1HashId, metric.getName(), tenantId, region);
definitionCounter.inc();
String dimensionIdStringToHash = "";
if (metricMessage.getDimensions() != null) {
if (metric.getDimensions() != null) {
// Sort the dimensions on name and value.
TreeMap<String, String> dimensionTreeMap = new TreeMap<>(metricMessage.getDimensions());
TreeMap<String, String> dimensionTreeMap = new TreeMap<>(metric.getDimensions());
for (String dimensionName : dimensionTreeMap.keySet()) {
String dimensionValue = dimensionTreeMap.get(dimensionName);
dimensionIdStringToHash += dimensionName + dimensionValue;
@ -134,8 +140,8 @@ public class MetricMessageEventHandler implements EventHandler<MetricMessageEven
byte[] dimensionIdSha1Hash = DigestUtils.sha(dimensionIdStringToHash);
Sha1HashId dimensionsSha1HashId = new Sha1HashId(dimensionIdSha1Hash);
if (metricMessage.getDimensions() != null) {
TreeMap<String, String> dimensionTreeMap = new TreeMap<>(metricMessage.getDimensions());
if (metric.getDimensions() != null) {
TreeMap<String, String> dimensionTreeMap = new TreeMap<>(metric.getDimensions());
for (String dimensionName : dimensionTreeMap.keySet()) {
String dimensionValue = dimensionTreeMap.get(dimensionName);
verticaMetricRepository.addToBatchStagingDimensions(dimensionsSha1HashId, dimensionName, dimensionValue);
@ -149,22 +155,19 @@ public class MetricMessageEventHandler implements EventHandler<MetricMessageEven
verticaMetricRepository.addToBatchStagingdefinitionDimensions(definitionDimensionsSha1HashId, definitionSha1HashId, dimensionsSha1HashId);
definitionDimensionsCounter.inc();
if (metricMessage.getValue() != null && metricMessage.getTimestamp() != null) {
String timeStamp = simpleDateFormat.format(new Date(Long.parseLong(metricMessage.getTimestamp()) * 1000));
Double value = metricMessage.getValue();
if (metric.getTimeValues() == null) {
String timeStamp = simpleDateFormat.format(new Date(metric.getTimestamp() * 1000));
double value = metric.getValue();
verticaMetricRepository.addToBatchMetrics(definitionDimensionsSha1HashId, timeStamp, value);
metricCounter.inc();
}
if (metricMessage.getTime_values() != null) {
if (metricMessage.getTime_values() != null) {
for (Double[] timeValuePairs : metricMessage.getTime_values()) {
String timeStamp = simpleDateFormat.format(new Date((long) (timeValuePairs[0] * 1000)));
Double value = timeValuePairs[1];
verticaMetricRepository.addToBatchMetrics(definitionDimensionsSha1HashId, timeStamp, value);
metricCounter.inc();
else {
for (double[] timeValuePairs : metric.getTimeValues()) {
String timeStamp = simpleDateFormat.format(new Date((long) (timeValuePairs[0] * 1000)));
double value = timeValuePairs[1];
verticaMetricRepository.addToBatchMetrics(definitionDimensionsSha1HashId, timeStamp, value);
metricCounter.inc();
}
}
}

View File

@ -0,0 +1,9 @@
package com.hpcloud.mon.persister.disruptor.event;
import com.google.inject.assistedinject.Assisted;
public interface MetricHandlerFactory {
MetricHandler create(@Assisted("ordinal") int ordinal,
@Assisted("numProcessors") int numProcessors,
@Assisted("batchSize") int batchSize);
}

View File

@ -1,8 +1,10 @@
package com.hpcloud.mon.persister.disruptor.event;
import com.hpcloud.mon.persister.message.MetricEnvelope;
import com.hpcloud.mon.common.model.metric.MetricEnvelope;
public class MetricMessageEvent {
public class MetricHolder {
MetricEnvelope metricEnvelope;
public MetricEnvelope getMetricEnvelope() {
return metricEnvelope;
@ -11,7 +13,4 @@ public class MetricMessageEvent {
public void setEnvelope(MetricEnvelope metricEnvelope) {
this.metricEnvelope = metricEnvelope;
}
public MetricEnvelope metricEnvelope;
}

View File

@ -1,13 +0,0 @@
package com.hpcloud.mon.persister.disruptor.event;
import com.lmax.disruptor.EventFactory;
public class MetricMessageEventFactory implements EventFactory<MetricMessageEvent> {
public static final MetricMessageEventFactory INSTANCE = new MetricMessageEventFactory();
@Override
public MetricMessageEvent newInstance() {
return new MetricMessageEvent();
}
}

View File

@ -1,9 +0,0 @@
package com.hpcloud.mon.persister.disruptor.event;
import com.google.inject.assistedinject.Assisted;
public interface MetricMessageEventHandlerFactory {
MetricMessageEventHandler create(@Assisted("ordinal") int ordinal,
@Assisted("numProcessors") int numProcessors,
@Assisted("batchSize") int batchSize);
}

View File

@ -1,33 +0,0 @@
package com.hpcloud.mon.persister.message;
import com.google.common.base.Preconditions;
import java.util.Map;
public class MetricEnvelope {
public MetricMessage metric;
public Map<String, Object> meta;
protected MetricEnvelope() {
}
public MetricEnvelope(MetricMessage metric) {
Preconditions.checkNotNull(metric, "metric");
this.metric = metric;
}
public MetricEnvelope(MetricMessage metric, Map<String, Object> meta) {
Preconditions.checkNotNull(metric, "metric");
Preconditions.checkNotNull(meta, "meta");
this.metric = metric;
this.meta = meta;
}
@Override
public String toString() {
return "MetricEnvelope{" +
"metric=" + metric +
", meta=" + meta +
'}';
}
}

View File

@ -1,79 +0,0 @@
package com.hpcloud.mon.persister.message;
import java.util.Arrays;
import java.util.Map;
public class MetricMessage {
String name = null;
String region = "";
Map<String, String> dimensions = null;
String timestamp = null;
Double value = null;
Double[][] time_values = null;
@Override
public String toString() {
return "MetricMessage{" +
"name='" + name + '\'' +
", region='" + region + '\'' +
", dimensions=" + dimensions +
", timeStamp='" + timestamp + '\'' +
", value=" + value +
", time_values=" + Arrays.toString(time_values) +
'}';
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getRegion() {
return region;
}
public void setRegion(String region) {
this.region = region;
}
public Map<String, String> getDimensions() {
return dimensions;
}
public void setDimensions(Map<String, String> dimensions) {
this.dimensions = dimensions;
}
public String getTimestamp() {
return timestamp;
}
public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}
public Double getValue() {
return value;
}
public void setValue(Double value) {
this.value = value;
}
public Double[][] getTime_values() {
return time_values;
}
public void setTime_values(Double[][] time_values) {
this.time_values = time_values;
}
}

View File

@ -3,8 +3,8 @@ package com.hpcloud.mon.persister.repository;
import com.google.inject.Inject;
import com.hpcloud.mon.persister.disruptor.AlarmStateHistoryDisruptor;
import com.hpcloud.mon.persister.disruptor.MetricDisruptor;
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedMessageEvent;
import com.hpcloud.mon.persister.disruptor.event.MetricMessageEvent;
import com.hpcloud.mon.persister.disruptor.event.AlarmStateTransitionedEventHolder;
import com.hpcloud.mon.persister.disruptor.event.MetricHolder;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.dsl.Disruptor;
import io.dropwizard.lifecycle.Managed;
@ -58,18 +58,18 @@ public class RepositoryCommitHeartbeat implements Managed {
// Send heartbeat
logger.debug("Sending heartbeat message");
metricDisruptor.publishEvent(new EventTranslator<MetricMessageEvent>() {
metricDisruptor.publishEvent(new EventTranslator<MetricHolder>() {
@Override
public void translateTo(MetricMessageEvent event, long sequence) {
public void translateTo(MetricHolder event, long sequence) {
event.setEnvelope(null);
}
});
alarmHistoryDisruptor.publishEvent(new EventTranslator<AlarmStateTransitionedMessageEvent>() {
alarmHistoryDisruptor.publishEvent(new EventTranslator<AlarmStateTransitionedEventHolder>() {
@Override
public void translateTo(AlarmStateTransitionedMessageEvent event, long sequence) {
event.setMessage(null);
public void translateTo(AlarmStateTransitionedEventHolder event, long sequence) {
event.setEvent(null);
}
});