Use individual local temp staging tables.

Deklan Dieterly 2014-03-07 17:01:14 -07:00
parent 262d52f44e
commit 75787e5b37
8 changed files with 136 additions and 155 deletions
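
The gist of the change, for orientation: the shared MonMetrics.StagedDefinitions/StagedDimensions tables and the MonDeDuper service that periodically merged and purged them are removed; each VerticaMetricRepository instance now stages rows in its own session-local temp tables and merges them itself when a heartbeat event arrives. A minimal sketch of that pattern over plain JDBC; the DSN and column types here are assumptions (the real DDL comes from the defs/dims strings this diff only references), not the project's actual code:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Statement;

public class LocalTempStagingSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical DSN; the real service gets its handle from JDBI via Guice.
        try (Connection c = DriverManager.getConnection("jdbc:vertica://localhost:5433/mon")) {
            c.setAutoCommit(false);
            // Session-local table name: visible only to this connection,
            // so no cross-writer dedupe pass is needed.
            String staging = "staged_definitions_sketch";
            try (Statement s = c.createStatement()) {
                // Column types are assumed; "on commit preserve rows" keeps
                // staged rows alive across intermediate batch commits.
                s.execute("create local temp table " + staging
                        + " (metric_definition_id binary(20), name varchar,"
                        + " tenant_id varchar, region varchar)"
                        + " on commit preserve rows");
            }
            try (PreparedStatement ps = c.prepareStatement(
                    "insert into " + staging + " values (?, ?, ?, ?)")) {
                ps.setBytes(1, new byte[20]); // placeholder 20-byte definition id
                ps.setString(2, "cpu_idle_perc");
                ps.setString(3, "tenant-1");
                ps.setString(4, "region-a");
                ps.addBatch();
                ps.executeBatch();
            }
            try (Statement s = c.createStatement()) {
                // Flush: merge only unseen definitions, then empty the staging table.
                s.execute("insert into MonMetrics.Definitions select distinct * from " + staging
                        + " where metric_definition_id not in"
                        + " (select metric_definition_id from MonMetrics.Definitions)");
                s.execute("truncate table " + staging);
            }
            c.commit();
        }
    }
}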

View File

@@ -31,7 +31,7 @@ disruptorConfiguration:
   numProcessors: 2
 verticaOutputProcessorConfiguration:
-  batchSize: 10
+  batchSize: 2
 monDeDuperConfiguration:
   dedupeRunFrequencySeconds: 30

View File

@@ -9,7 +9,7 @@ import com.hpcloud.consumer.KafkaConsumerRunnableBasic;
 import com.hpcloud.consumer.KafkaConsumerRunnableBasicFactory;
 import com.hpcloud.consumer.MonConsumer;
 import com.hpcloud.dbi.DBIProvider;
-import com.hpcloud.dedupe.MonDeDuper;
+import com.hpcloud.dedupe.MonDeDuperHeartbeat;
 import com.hpcloud.disruptor.DisruptorExceptionHandler;
 import com.hpcloud.disruptor.DisruptorProvider;
 import com.hpcloud.disruptor.event.MetricMessageEventHandler;
@@ -53,7 +53,7 @@ public class MonPersisterModule extends AbstractModule {
         bind(DBI.class).toProvider(DBIProvider.class).in(Scopes.SINGLETON);
         bind(MonConsumer.class);
-        bind(MonDeDuper.class);
+        bind(MonDeDuperHeartbeat.class);
     }
 }

View File

@@ -4,7 +4,7 @@ import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.hpcloud.configuration.MonPersisterConfiguration;
 import com.hpcloud.consumer.MonConsumer;
-import com.hpcloud.dedupe.MonDeDuper;
+import com.hpcloud.dedupe.MonDeDuperHeartbeat;
 import com.hpcloud.healthcheck.SimpleHealthCheck;
 import com.hpcloud.resource.Resource;
 import com.yammer.dropwizard.Service;
@@ -35,8 +35,8 @@ public class MonPersisterService extends Service<MonPersisterConfiguration> {
         MonConsumer monConsumer = injector.getInstance(MonConsumer.class);
         environment.manage(monConsumer);
-        MonDeDuper monDeDuper = injector.getInstance(MonDeDuper.class);
-        environment.manage(monDeDuper);
+        MonDeDuperHeartbeat monDeDuperHeartbeat = injector.getInstance(MonDeDuperHeartbeat.class);
+        environment.manage(monDeDuperHeartbeat);
     }
 }
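
For readers new to the lifecycle hook used above: environment.manage(...) registers a com.yammer.dropwizard.lifecycle.Managed object whose start()/stop() methods run with the service. A minimal illustrative sketch, not project code; note that the committed MonDeDuperHeartbeat.stop() later in this diff is empty, so its heartbeat thread is never interrupted on shutdown:

import com.yammer.dropwizard.lifecycle.Managed;

public class HeartbeatLifecycleSketch implements Managed {

    private Thread worker;

    @Override
    public void start() throws Exception {
        worker = new Thread(new Runnable() {
            @Override
            public void run() {
                // periodic work would go here
                System.out.println("heartbeat loop running");
            }
        });
        worker.setDaemon(true); // lets the JVM exit even if stop() never interrupts it
        worker.start();
    }

    @Override
    public void stop() throws Exception {
        // The committed stop() is a no-op; interrupting is one way to honor shutdown.
        if (worker != null) {
            worker.interrupt();
        }
    }
}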

View File

@@ -1,140 +0,0 @@
-package com.hpcloud.dedupe;
-
-import com.google.inject.Inject;
-import com.hpcloud.configuration.MonPersisterConfiguration;
-import com.yammer.dropwizard.lifecycle.Managed;
-import com.yammer.metrics.Metrics;
-import com.yammer.metrics.core.Timer;
-import com.yammer.metrics.core.TimerContext;
-import org.skife.jdbi.v2.DBI;
-import org.skife.jdbi.v2.Handle;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.SQLException;
-
-public class MonDeDuper implements Managed {
-
-    private static Logger logger = LoggerFactory.getLogger(MonDeDuper.class);
-
-    private final MonPersisterConfiguration configuration;
-    private final DBI dbi;
-    private final DeDuperRunnable deDuperRunnable;
-
-    @Inject
-    public MonDeDuper(MonPersisterConfiguration configuration,
-                      DBI dbi) {
-        this.configuration = configuration;
-        this.dbi = dbi;
-        this.deDuperRunnable = new DeDuperRunnable(configuration, dbi);
-    }
-
-    @Override
-    public void start() throws Exception {
-        Thread deduperThread = new Thread(deDuperRunnable);
-        deduperThread.start();
-    }
-
-    @Override
-    public void stop() throws Exception {
-    }
-
-    private static class DeDuperRunnable implements Runnable {
-
-        private static Logger logger = LoggerFactory.getLogger(DeDuperRunnable.class);
-
-        private final MonPersisterConfiguration configuration;
-        private final DBI dbi;
-        private final Handle handle;
-
-        private final Timer dedupeTimer = Metrics.newTimer(this.getClass(), "dedupe-execution-timer");
-
-        private static final String DEDUPE_STAGING_DEFS =
-                "insert into MonMetrics.Definitions select distinct * from MonMetrics.StagedDefinitions where metric_definition_id not in (select metric_definition_id from MonMetrics.Definitions)";
-        private static final String DEDEUP_STAGING_DIMS =
-                "insert into MonMetrics.Dimensions select distinct * from MonMetrics.StagedDimensions where metric_definition_id not in (select metric_definition_id from MonMetrics.Dimensions)";
-        private static final String DELETE_STAGING_DEFS =
-                "delete from monmetrics.stageddefinitions where metric_definition_id in (select metric_definition_id from MonMetrics.Definitions)";
-        private static final String PURGE_STAGING_DEFS =
-                "select purge_table('monmetrics.stageddefinitions')";
-        private static final String DELETE_STAGING_DIMS =
-                "delete from monmetrics.stageddimensions where metric_definition_id in (select metric_definition_id from MonMetrics.Dimensions)";
-        private static final String PURGE_STAGING_DIMS =
-                "select purge_table('monmetrics.stageddimensions')";
-
-        private DeDuperRunnable(MonPersisterConfiguration configuration, DBI dbi) {
-            this.configuration = configuration;
-            this.dbi = dbi;
-            this.handle = this.dbi.open();
-            this.handle.execute("SET TIME ZONE TO 'UTC'");
-            try {
-                this.handle.getConnection().setAutoCommit(false);
-            } catch (SQLException e) {
-                logger.error("Failed to set autocommit to false", e);
-                System.exit(-1);
-            }
-        }
-
-        @Override
-        public void run() {
-            int seconds = configuration.getMonDeDuperConfiguration().getDedupeRunFrequencySeconds();
-            long startTime;
-            long endTime;
-            for (; ; ) {
-                try {
-                    Thread.sleep(seconds * 1000);
-                    logger.debug("Waking up after sleeping " + seconds + " seconds, yawn...");
-
-                    TimerContext context = dedupeTimer.time();
-
-                    handle.begin();
-                    startTime = System.currentTimeMillis();
-                    logger.debug("Executing: " + DELETE_STAGING_DEFS);
-                    handle.execute(DELETE_STAGING_DEFS);
-                    logger.debug("Executing: " + DEDUPE_STAGING_DEFS);
-                    handle.execute(DEDUPE_STAGING_DEFS);
-                    logger.debug("Executing: " + DELETE_STAGING_DEFS);
-                    handle.execute(DELETE_STAGING_DEFS);
-                    logger.debug("Executing: " + PURGE_STAGING_DEFS);
-                    handle.execute(PURGE_STAGING_DEFS);
-                    handle.commit();
-                    endTime = System.currentTimeMillis();
-                    logger.debug("Deduping metric defintitions took " + (endTime - startTime) / 1000 + " seconds");
-
-                    handle.begin();
-                    startTime = System.currentTimeMillis();
-                    logger.debug("Executing: " + DELETE_STAGING_DIMS);
-                    handle.execute(DELETE_STAGING_DIMS);
-                    logger.debug("Executing: " + DEDEUP_STAGING_DIMS);
-                    handle.execute(DEDEUP_STAGING_DIMS);
-                    logger.debug("Executing: " + DELETE_STAGING_DIMS);
-                    handle.execute(DELETE_STAGING_DIMS);
-                    logger.debug("Executing: " + PURGE_STAGING_DIMS);
-                    handle.execute(PURGE_STAGING_DIMS);
-                    handle.commit();
-                    endTime = System.currentTimeMillis();
-                    logger.debug("Deduping metric dimensions took " + (endTime - startTime) / 1000 + " seconds");
-
-                    context.stop();
-                } catch (InterruptedException e) {
-                    logger.warn("Failed to wait for " + seconds + " between deduping", e);
-                } catch (Exception e) {
-                    logger.error("Failed to dedupe", e);
-                }
-            }
-        }
-    }
-}

View File

@@ -0,0 +1,79 @@
+package com.hpcloud.dedupe;
+
+import com.google.inject.Inject;
+import com.hpcloud.configuration.MonPersisterConfiguration;
+import com.hpcloud.disruptor.event.MetricMessageEvent;
+import com.lmax.disruptor.EventTranslator;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.yammer.dropwizard.lifecycle.Managed;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MonDeDuperHeartbeat implements Managed {
+
+    private static Logger logger = LoggerFactory.getLogger(MonDeDuperHeartbeat.class);
+
+    private final MonPersisterConfiguration configuration;
+    private final Disruptor disruptor;
+    private final DeDuperRunnable deDuperRunnable;
+
+    @Inject
+    public MonDeDuperHeartbeat(MonPersisterConfiguration configuration,
+                               Disruptor disruptor) {
+        this.configuration = configuration;
+        this.disruptor = disruptor;
+        this.deDuperRunnable = new DeDuperRunnable(configuration, disruptor);
+    }
+
+    @Override
+    public void start() throws Exception {
+        Thread deduperThread = new Thread(deDuperRunnable);
+        deduperThread.start();
+    }
+
+    @Override
+    public void stop() throws Exception {
+    }
+
+    private static class DeDuperRunnable implements Runnable {
+
+        private static Logger logger = LoggerFactory.getLogger(DeDuperRunnable.class);
+
+        private final MonPersisterConfiguration configuration;
+        private final Disruptor disruptor;
+
+        private DeDuperRunnable(MonPersisterConfiguration configuration, Disruptor disruptor) {
+            this.configuration = configuration;
+            this.disruptor = disruptor;
+        }
+
+        @Override
+        public void run() {
+            int seconds = configuration.getMonDeDuperConfiguration().getDedupeRunFrequencySeconds();
+            for (; ; ) {
+                try {
+                    Thread.sleep(seconds * 1000);
+                    logger.debug("Waking up after sleeping " + seconds + " seconds, yawn...");
+
+                    // Send heartbeat
+                    logger.debug("Sending dedupe heartbeat message");
+                    disruptor.publishEvent(new EventTranslator<MetricMessageEvent>() {
+
+                        @Override
+                        public void translateTo(MetricMessageEvent event, long sequence) {
+                            event.setMetricEnvelope(null);
+                        }
+                    });
+                } catch (Exception e) {
+                    logger.error("Failed to send dedupe heartbeat", e);
+                }
+            }
+        }
+    }
+}
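
The heartbeat is an in-band control message: the translator claims a ring-buffer slot and nulls its payload, and the handler in the next file treats a null envelope as a flush signal. A self-contained sketch of the same pattern; it assumes Disruptor 3.3+ APIs (this 2014 commit would have used the older Executor-based constructor) and a stand-in event type:

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.dsl.Disruptor;

import java.util.concurrent.Executors;

public class HeartbeatSketch {

    // Stand-in for MetricMessageEvent: one mutable payload slot.
    static class Event {
        Object envelope;
    }

    public static void main(String[] args) {
        Disruptor<Event> disruptor = new Disruptor<>(new EventFactory<Event>() {
            @Override
            public Event newInstance() {
                return new Event();
            }
        }, 1024, Executors.defaultThreadFactory());

        EventHandler<Event> handler = new EventHandler<Event>() {
            @Override
            public void onEvent(Event event, long sequence, boolean endOfBatch) {
                // Null payload marks a control message, not a real metric.
                if (event.envelope == null) {
                    System.out.println("heartbeat: flush staging tables");
                }
            }
        };
        disruptor.handleEventsWith(handler);
        disruptor.start();

        // Publish a heartbeat: the translator nulls the payload in-place.
        disruptor.publishEvent(new EventTranslator<Event>() {
            @Override
            public void translateTo(Event event, long sequence) {
                event.envelope = null;
            }
        });

        disruptor.shutdown(); // drains pending events, then halts the processors
    }
}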

View File

@@ -58,6 +58,12 @@ public class MetricMessageEventHandler implements EventHandler<MetricMessageEven
     @Override
     public void onEvent(MetricMessageEvent metricMessageEvent, long sequence, boolean b) throws Exception {
+        if (metricMessageEvent.getMetricEnvelope() == null) {
+            logger.debug("Received heartbeat message. Flushing staging tables.");
+            verticaMetricRepository.flush();
+            return;
+        }
+
         if (((sequence / batchSize) % this.numProcessors) != this.ordinal) {
             return;
         }
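
A quick worked example of the untouched partitioning guard above: ring-buffer sequences are claimed in contiguous runs of batchSize, round-robin across processors, so each processor commits whole batches. With hypothetical batchSize = 3 and numProcessors = 2:

public class PartitionSketch {
    public static void main(String[] args) {
        int batchSize = 3, numProcessors = 2; // hypothetical values
        for (long sequence = 0; sequence < 12; sequence++) {
            // Same expression as the handler's guard: which processor owns this slot?
            long owner = (sequence / batchSize) % numProcessors;
            System.out.println("sequence " + sequence + " -> processor " + owner);
        }
        // sequences 0-2 -> 0, 3-5 -> 1, 6-8 -> 0, 9-11 -> 1
    }
}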
@@ -66,7 +72,7 @@ public class MetricMessageEventHandler implements EventHandler<MetricMessageEven
         logger.debug("Sequence number: " + sequence +
                 " Ordinal: " + ordinal +
-                " Event: " + metricMessageEvent.getMetricEnvelope());
+                " Event: " + metricMessageEvent.getMetricEnvelope().metric);
 
         MetricMessage metricMessage = metricMessageEvent.getMetricEnvelope().metric;
         Map<String, Object> meta = metricMessageEvent.getMetricEnvelope().meta;

View File

@@ -15,6 +15,7 @@ public class VerticaMetricRepository extends VerticaRepository {
     private static final String SQL_INSERT_INTO_METRICS =
             "insert into MonMetrics.metrics (metric_definition_id, time_stamp, value) values (:metric_definition_id, :time_stamp, :value)";
     private static final String SQL_INSERT_INTO_STAGING_DEFINITIONS =
             "insert into MonMetrics.stagedDefinitions values (:metric_definition_id, :name, :tenant_id," +
                     ":region)";
@@ -35,26 +36,43 @@
             ")";
 
     private PreparedBatch metricsBatch;
     private PreparedBatch stagedDefinitionsBatch;
     private PreparedBatch stagedDimensionsBatch;
+
+    private final String sDefs;
+    private final String sDims;
+    private final String dsDefs;
+    private final String dsDims;
 
     @Inject
     public VerticaMetricRepository(DBI dbi) throws NoSuchAlgorithmException, SQLException {
         super(dbi);
         logger.debug("Instantiating: " + this);
-        String sDefs = this.toString().replaceAll(".", "_").replaceAll("@", "_") + "staged_definitions";
-        String sDims = this.toString().replaceAll(".", "_").replaceAll("@", "_") + "staged_dimensions";
-        handle.execute("drop table if exists" + sDefs + " cascade");
-        handle.execute("drop table if exists" + sDims + " cascade");
+        this.sDefs = this.toString().replaceAll("\\.", "_").replaceAll("\\@", "_") + "_staged_definitions";
+        logger.debug("temp staging definitions table: " + sDefs);
+        this.sDims = this.toString().replaceAll("\\.", "_").replaceAll("\\@", "_") + "_staged_dimensions";
+        logger.debug("temp staging dimensions table: " + sDims);
+        this.dsDefs = "insert into MonMetrics.Definitions select distinct * from " + sDefs + " where metric_definition_id not in (select metric_definition_id from MonMetrics.Definitions)";
+        logger.debug("insert stmt: " + dsDefs);
+        this.dsDims = "insert into MonMetrics.Dimensions select distinct * from " + sDims + " where metric_definition_id not in (select metric_definition_id from MonMetrics.Dimensions)";
+        logger.debug("insert stmt: " + dsDims);
+        handle.execute("drop table if exists " + sDefs + " cascade");
+        handle.execute("drop table if exists " + sDims + " cascade");
+        handle.execute("create local temp table " + sDefs + " " + defs + " on commit preserve rows");
+        handle.execute("create local temp table " + sDims + " " + dims + " on commit preserve rows");
         handle.getConnection().setAutoCommit(false);
         metricsBatch = handle.prepareBatch(SQL_INSERT_INTO_METRICS);
-        stagedDefinitionsBatch = handle.prepareBatch("insert into " + sDefs + " values (:metric_definition_id, :name, :tenant_id, ");
-        stagedDimensionsBatch = handle.prepareBatch(SQL_INSERT_INTO_STAGING_DIMENSIONS);
+        stagedDefinitionsBatch = handle.prepareBatch("insert into " + sDefs + " values (:metric_definition_id, :name, :tenant_id, :region)");
+        stagedDimensionsBatch = handle.prepareBatch("insert into " + sDims + " values (:metric_definition_id, :name, :value)");
         handle.begin();
     }
@@ -72,11 +90,28 @@
                 .bind(2, value);
     }
 
     public void flush() {
+        commitBatch();
+        long startTime = System.currentTimeMillis();
+        handle.execute(dsDefs);
+        handle.execute("truncate table " + sDefs);
+        handle.execute(dsDims);
+        handle.execute("truncate table " + sDims);
+        handle.commit();
+        handle.begin();
+        long endTime = System.currentTimeMillis();
+        logger.debug("Flushing staging tables took " + (endTime - startTime) / 1000 + " seconds");
+    }
+
+    public void commitBatch() {
         long startTime = System.currentTimeMillis();
         metricsBatch.execute();
         stagedDefinitionsBatch.execute();
         stagedDimensionsBatch.execute();
         handle.commit();
         handle.begin();
         long endTime = System.currentTimeMillis();
         logger.debug("Commiting batch took " + (endTime - startTime) / 1000 + " seconds");
     }
 }
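
A note on the staging-table names built in the constructor above: with no toString() override, a repository instance renders as something like com.hpcloud.repository.VerticaMetricRepository@1a2b3c, and the dots and '@' must become underscores to form a legal SQL identifier that is unique per instance. The removed lines also show the bug being fixed: replaceAll takes a regex, so the unescaped "." matched and replaced every character. A tiny illustration using a hypothetical stand-in class:

public class TableNameSketch {
    public static void main(String[] args) {
        Object repo = new Object(); // stands in for a VerticaMetricRepository instance
        // Default toString() is "java.lang.Object@1b6d3586"; sanitize it.
        String base = repo.toString().replaceAll("\\.", "_").replaceAll("@", "_");
        System.out.println(base + "_staged_definitions");
        // e.g. java_lang_Object_1b6d3586_staged_definitions
        // The old code's replaceAll(".", "_") would have turned the whole
        // string into underscores, since "." is the regex wildcard.
    }
}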

View File

@@ -31,7 +31,7 @@ disruptorConfiguration:
   numProcessors: 2
 verticaOutputProcessorConfiguration:
-  batchSize: 50000
+  batchSize: 25000
 monDeDuperConfiguration:
   dedupeRunFrequencySeconds: 30
@@ -81,7 +81,8 @@ logging:
   # Sets the level for 'com.example.app' to DEBUG.
   com.example.app: DEBUG
-  com.hpcloud: DEBUG
+  com.hpcloud.repository: DEBUG
+  com.hpcloud.disruptor.event: INFO
 
   # Settings for logging to stdout.
   console: