Add cache for metric definitions.
This commit is contained in:
parent
a6ecbc35d1
commit
ba1a5fdebd
|
@ -36,6 +36,10 @@ verticaOutputProcessorConfiguration:
|
|||
monDeDuperConfiguration:
|
||||
dedupeRunFrequencySeconds: 30
|
||||
|
||||
verticaMetricRepositoryConfiguration:
|
||||
maxCacheSize: 2000000
|
||||
|
||||
|
||||
databaseConfiguration:
|
||||
driverClass: com.vertica.jdbc.Driver
|
||||
# url: jdbc:vertica://mon-aw1rdd1-vertica0001.rndd.aw1.hpcloud.net:5433/som
|
||||
|
|
|
@ -61,4 +61,13 @@ public class MonPersisterConfiguration extends Configuration {
|
|||
return monDeDuperConfiguration;
|
||||
}
|
||||
|
||||
@Valid
|
||||
@NotNull
|
||||
@JsonProperty
|
||||
private VerticaMetricRepositoryConfiguration verticaMetricRepositoryConfiguration = new VerticaMetricRepositoryConfiguration();
|
||||
|
||||
public VerticaMetricRepositoryConfiguration getVerticaMetricRepositoryConfiguration() {
|
||||
return verticaMetricRepositoryConfiguration;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,13 @@
|
|||
package com.hpcloud.configuration;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
public class VerticaMetricRepositoryConfiguration {
|
||||
|
||||
@JsonProperty
|
||||
Integer maxCacheSize;
|
||||
|
||||
public Integer getMaxCacheSize() {
|
||||
return maxCacheSize;
|
||||
}
|
||||
}
|
|
@ -140,6 +140,7 @@ public class MetricMessageEventHandler implements EventHandler<MetricMessageEven
|
|||
dimensionCounter.inc();
|
||||
}
|
||||
}
|
||||
|
||||
if (sequence % batchSize == (batchSize - 1)) {
|
||||
TimerContext context = commitTimer.time();
|
||||
flush();
|
||||
|
|
|
@ -1,5 +1,8 @@
|
|||
package com.hpcloud.repository;
|
||||
|
||||
import com.google.common.cache.Cache;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import com.hpcloud.configuration.MonPersisterConfiguration;
|
||||
import com.yammer.metrics.Metrics;
|
||||
import com.yammer.metrics.core.Timer;
|
||||
import com.yammer.metrics.core.TimerContext;
|
||||
|
@ -11,11 +14,18 @@ import org.slf4j.LoggerFactory;
|
|||
import javax.inject.Inject;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.sql.SQLException;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
public class VerticaMetricRepository extends VerticaRepository {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(VerticaMetricRepository.class);
|
||||
|
||||
private final MonPersisterConfiguration configuration;
|
||||
|
||||
private final Cache<byte[], byte[]> defIdCache;
|
||||
private final Set<byte[]> defIdSet = new HashSet<>();
|
||||
|
||||
private static final String SQL_INSERT_INTO_METRICS =
|
||||
"insert into MonMetrics.metrics (metric_definition_id, time_stamp, value) values (:metric_definition_id, :time_stamp, :value)";
|
||||
|
||||
|
@ -47,10 +57,17 @@ public class VerticaMetricRepository extends VerticaRepository {
|
|||
private final Timer flushTimer = Metrics.newTimer(this.getClass(), "staging-tables-flushed-timer");
|
||||
|
||||
@Inject
|
||||
public VerticaMetricRepository(DBI dbi) throws NoSuchAlgorithmException, SQLException {
|
||||
public VerticaMetricRepository(DBI dbi, MonPersisterConfiguration configuration) throws NoSuchAlgorithmException, SQLException {
|
||||
super(dbi);
|
||||
logger.debug("Instantiating: " + this);
|
||||
|
||||
this.configuration = configuration;
|
||||
|
||||
defIdCache = CacheBuilder.newBuilder()
|
||||
.maximumSize(configuration.getVerticaMetricRepositoryConfiguration().getMaxCacheSize()).build();
|
||||
|
||||
logger.info("Building temp staging tables...");
|
||||
|
||||
this.sDefs = this.toString().replaceAll("\\.", "_").replaceAll("\\@", "_") + "_staged_definitions";
|
||||
logger.debug("temp staging definitions table: " + sDefs);
|
||||
|
||||
|
@ -81,13 +98,19 @@ public class VerticaMetricRepository extends VerticaRepository {
|
|||
}
|
||||
|
||||
public void addToBatchStagingDefinitions(byte[] defId, String name, String tenantId, String region) {
|
||||
stagedDefinitionsBatch.add().bind(0, defId).bind(1, name).bind(2, tenantId).bind(3, region);
|
||||
if (defIdCache.getIfPresent(defId) == null) {
|
||||
stagedDefinitionsBatch.add().bind(0, defId).bind(1, name).bind(2, tenantId).bind(3, region);
|
||||
defIdSet.add(defId);
|
||||
}
|
||||
}
|
||||
|
||||
public void addToBatchStagingDimensions(byte[] defId, String name, String value) {
|
||||
stagedDimensionsBatch.add().bind(0, defId)
|
||||
.bind(1, name)
|
||||
.bind(2, value);
|
||||
if (defIdCache.getIfPresent(defId) == null) {
|
||||
stagedDimensionsBatch.add().bind(0, defId)
|
||||
.bind(1, name)
|
||||
.bind(2, value);
|
||||
defIdSet.add(defId);
|
||||
}
|
||||
}
|
||||
|
||||
public void flush() {
|
||||
|
@ -113,9 +136,17 @@ public class VerticaMetricRepository extends VerticaRepository {
|
|||
stagedDefinitionsBatch.execute();
|
||||
stagedDimensionsBatch.execute();
|
||||
handle.commit();
|
||||
updateDefIdCache();
|
||||
handle.begin();
|
||||
context.stop();
|
||||
long endTime = System.currentTimeMillis();
|
||||
logger.debug("Commiting batch took " + (endTime - startTime) / 1000 + " seconds");
|
||||
}
|
||||
|
||||
private void updateDefIdCache() {
|
||||
for (byte[] defId : defIdSet) {
|
||||
defIdCache.put(defId, defId);
|
||||
}
|
||||
defIdSet.clear();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,6 +36,9 @@ verticaOutputProcessorConfiguration:
|
|||
monDeDuperConfiguration:
|
||||
dedupeRunFrequencySeconds: 30
|
||||
|
||||
verticaMetricRepositoryConfiguration:
|
||||
maxCacheSize: 2000000
|
||||
|
||||
databaseConfiguration:
|
||||
driverClass: com.vertica.jdbc.Driver
|
||||
# url: jdbc:vertica://mon-aw1rdd1-vertica0001.rndd.aw1.hpcloud.net:5433/som
|
||||
|
@ -81,8 +84,8 @@ logging:
|
|||
|
||||
# Sets the level for 'com.example.app' to DEBUG.
|
||||
com.example.app: DEBUG
|
||||
com.hpcloud: debug
|
||||
com.hpcloud.repository: DEBUG
|
||||
# com.hpcloud: debug
|
||||
# com.hpcloud.repository: DEBUG
|
||||
|
||||
# Settings for logging to stdout.
|
||||
console:
|
||||
|
|
Loading…
Reference in New Issue