Make Deduper delete and purge.

This commit is contained in:
Deklan Dieterly 2014-03-05 15:15:16 -07:00
parent 2dcb798abb
commit 570143fdd4
3 changed files with 30 additions and 13 deletions

View File

@ -53,11 +53,17 @@ public class MonDeDuper implements Managed {
private static final String DEDEUP_STAGING_DIMS =
"insert into MonMetrics.Dimensions select distinct * from MonMetrics.StagedDimensions where metric_definition_id not in (select metric_definition_id from MonMetrics.Dimensions)";
private static final String TRUNCATE_STAGING_DEFS =
"truncate table monmetrics.stageddefinitions";
private static final String DELETE_STAGING_DEFS =
"delete from monmetrics.stageddefinitions where metric_definition_id in (select metric_definition_id from MonMetrics.Definitions)";
private static final String TRUNCATE_STAGING_DIMS =
"truncate table monmetrics.stageddimensions";
private static final String PURGE_STAGING_DEFS =
"select purge_table('monmetrics.stageddefinitions')";
private static final String DELETE_STAGING_DIMS =
"delete from monmetrics.stageddimensions where metric_definition_id in (select metric_definition_id from MonMetrics.Dimensions)";
private static final String PURGE_STAGING_DIMS =
"select purge_table('monmetrics.stageddimensions')";
private DeDuperRunnable(MonPersisterConfiguration configuration, DBI dbi) {
this.configuration = configuration;
@ -76,27 +82,36 @@ public class MonDeDuper implements Managed {
@Override
public void run() {
int seconds = configuration.getMonDeDuperConfiguration().getDedupeRunFrequencySeconds();
long startTime;
long endTime;
for (; ; ) {
try {
Thread.sleep(seconds * 1000);
handle.begin();
logger.debug("Waited " + seconds + " seconds");
logger.debug("Waking up after sleeping " + seconds + " seconds, yawn...");
startTime = System.currentTimeMillis();
logger.debug("Executing: " + DEDUPE_STAGING_DEFS);
handle.execute(DEDUPE_STAGING_DEFS);
logger.debug("Executing: " + TRUNCATE_STAGING_DEFS);
handle.execute(TRUNCATE_STAGING_DEFS);
logger.debug("Executing: " + DELETE_STAGING_DEFS);
handle.execute(DELETE_STAGING_DEFS);
logger.debug("Executing: " + PURGE_STAGING_DEFS);
handle.execute(PURGE_STAGING_DEFS);
handle.commit();
endTime = System.currentTimeMillis();
logger.debug("Deduping metric defintitions took " + (endTime - startTime) / 1000 + " seconds");
handle.begin();
startTime = System.currentTimeMillis();
logger.debug("Executing: " + DEDEUP_STAGING_DIMS);
handle.execute(DEDEUP_STAGING_DIMS);
logger.debug("Executing: " + TRUNCATE_STAGING_DIMS);
handle.execute(TRUNCATE_STAGING_DIMS);
logger.debug("Executing: " + DELETE_STAGING_DIMS);
handle.execute(DELETE_STAGING_DIMS);
logger.debug("Executing: " + PURGE_STAGING_DIMS);
handle.execute(PURGE_STAGING_DIMS);
handle.commit();
endTime = System.currentTimeMillis();
logger.debug("Deduping metric dimensions took " + (endTime - startTime) / 1000 + " seconds");
} catch (InterruptedException e) {
logger.warn("Failed to wait for " + seconds + " between deduping", e);

View File

@ -48,7 +48,7 @@ public class MetricMessageEventHandler implements EventHandler<MetricMessageEven
return;
}
logger.info("Sequence number: " + sequence +
logger.debug("Sequence number: " + sequence +
" Ordinal: " + ordinal +
" Event: " + metricMessageEvent.getMetricMessage());

View File

@ -81,6 +81,8 @@ logging:
# Sets the level for 'com.example.app' to DEBUG.
com.example.app: DEBUG
com.hpcloud.dedupe: DEBUG
com.hpcloud.event: DEBUG
# Settings for logging to stdout.
console: