From 04d2d540b2f70cfe1c65629361ed1589be33a0fb Mon Sep 17 00:00:00 2001
From: Deklan Dieterly
Date: Mon, 16 Jun 2014 13:26:59 -0600
Subject: [PATCH] Add InfluxDB support.

---
 pom.xml                                            | 694 +++++++++---------
 .../mon/Config/DatabaseConfiguration.java          |  11 +
 .../hpcloud/mon/Config/InfluxDBConfig.java         |  44 ++
 .../com/hpcloud/mon/MonApiConfiguration.java       |  50 +-
 .../java/com/hpcloud/mon/MonApiModule.java         |   2 +-
 .../infrastructure/InfrastructureModule.java       |  54 +-
 ...armStateHistoryInfluxDBRepositoryImpl.java      | 145 ++++
 .../MeasurementInfluxDBRepositoryImpl.java         |  99 +++
 ...etricDefinitionInfluxDBRepositoryImpl.java      |  70 ++
 .../StatisticInfluxDBRepositoryImpl.java           | 140 ++++
 .../persistence/StatisticRepositoryImpl.java       | 373 +++++-----
 11 files changed, 1115 insertions(+), 567 deletions(-)
 create mode 100644 src/main/java/com/hpcloud/mon/Config/DatabaseConfiguration.java
 create mode 100644 src/main/java/com/hpcloud/mon/Config/InfluxDBConfig.java
 create mode 100644 src/main/java/com/hpcloud/mon/infrastructure/persistence/AlarmStateHistoryInfluxDBRepositoryImpl.java
 create mode 100644 src/main/java/com/hpcloud/mon/infrastructure/persistence/MeasurementInfluxDBRepositoryImpl.java
 create mode 100644 src/main/java/com/hpcloud/mon/infrastructure/persistence/MetricDefinitionInfluxDBRepositoryImpl.java
 create mode 100644 src/main/java/com/hpcloud/mon/infrastructure/persistence/StatisticInfluxDBRepositoryImpl.java

diff --git a/pom.xml b/pom.xml
index a51a399fc..50a6fc3be 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,355 +1,363 @@
[This hunk rewrites the whole file, re-indenting it from two- to four-space indentation; the repositories, dependency list (mon-model, mon-persistence, mon-util, mon-kafka, the dropwizard modules, vertica-jdbc, mysql-connector-java, jsr305, CsMiddleware, curator-recipes, kafka_2.9.2, swagger-jaxrs, and the test dependencies), and build plugins are otherwise unchanged. The one substantive change is the new dependency below, shown with its neighbor for context.]
     <dependency>
       <groupId>com.wordnik</groupId>
       <artifactId>swagger-jaxrs_2.9.1</artifactId>
       <version>1.3.1</version>
     </dependency>
+    <dependency>
+      <groupId>org.influxdb</groupId>
+      <artifactId>influxdb-java</artifactId>
+      <version>1.0</version>
+    </dependency>
diff --git a/src/main/java/com/hpcloud/mon/Config/DatabaseConfiguration.java b/src/main/java/com/hpcloud/mon/Config/DatabaseConfiguration.java
new file mode 100644
index 000000000..bfb79e3b4
--- /dev/null
+++ b/src/main/java/com/hpcloud/mon/Config/DatabaseConfiguration.java
@@ -0,0 +1,11 @@
+package com.hpcloud.mon.Config;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class DatabaseConfiguration {
+
+    @JsonProperty
+    String databaseType;
+
+    public String getDatabaseType() { return databaseType; }
+}
diff --git a/src/main/java/com/hpcloud/mon/Config/InfluxDBConfig.java b/src/main/java/com/hpcloud/mon/Config/InfluxDBConfig.java
new file mode 100644
index 000000000..91a9f5689
--- /dev/null
+++ b/src/main/java/com/hpcloud/mon/Config/InfluxDBConfig.java
@@ -0,0 +1,44 @@
+package com.hpcloud.mon.Config;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class InfluxDBConfig {
+
+    @JsonProperty
+    String name;
+
+    public String getName() {
+        return name;
+    }
+
+    @JsonProperty
+    int replicationFactor;
+
+    public int getReplicationFactor() {
+        return replicationFactor;
+    }
+
+    @JsonProperty
+    String url;
+
+    public String getUrl() {
+        return url;
+    }
+
+    @JsonProperty
+    String user;
+
+    public String getUser() {
+        return user;
+    }
+
+    @JsonProperty
+    String password;
+
+    public String getPassword() {
+        return password;
+    }
+}
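These two classes bind from the API's YAML configuration via Jackson. A minimal sketch of the corresponding blocks in mon-api-config.yml-sample (the key names follow the field names above; the endpoint, database name, and credentials are hypothetical values):

    databaseConfiguration:
      databaseType: influxdb

    influxDB:
      name: mon
      replicationFactor: 1
      url: http://localhost:8086
      user: mon_api
      password: password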
diff --git a/src/main/java/com/hpcloud/mon/MonApiConfiguration.java b/src/main/java/com/hpcloud/mon/MonApiConfiguration.java
index 376ad7226..01a4e9aa3 100644
--- a/src/main/java/com/hpcloud/mon/MonApiConfiguration.java
+++ b/src/main/java/com/hpcloud/mon/MonApiConfiguration.java
@@ -16,26 +16,48 @@
  */
 package com.hpcloud.mon;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.hpcloud.messaging.kafka.KafkaConfiguration;
+import com.hpcloud.mon.Config.DatabaseConfiguration;
+import com.hpcloud.mon.Config.InfluxDBConfig;
+import com.hpcloud.mon.infrastructure.middleware.MiddlewareConfiguration;
 import io.dropwizard.Configuration;
 import io.dropwizard.db.DataSourceFactory;
+import org.hibernate.validator.constraints.NotEmpty;
 
 import javax.validation.Valid;
 import javax.validation.constraints.NotNull;
 
-import org.hibernate.validator.constraints.NotEmpty;
-
-import com.hpcloud.messaging.kafka.KafkaConfiguration;
-import com.hpcloud.mon.infrastructure.middleware.MiddlewareConfiguration;
-
 public class MonApiConfiguration extends Configuration {
-  @NotEmpty public String region;
-  @NotNull public Boolean accessedViaHttps;
-  @NotEmpty public String metricsTopic = "metrics";
-  @NotEmpty public String eventsTopic = "events";
-  @NotEmpty public String alarmStateTransitionsTopic = "alarm-state-transitions";
+    @NotEmpty
+    public String region;
+    @NotNull
+    public Boolean accessedViaHttps;
+    @NotEmpty
+    public String metricsTopic = "metrics";
+    @NotEmpty
+    public String eventsTopic = "events";
+    @NotEmpty
+    public String alarmStateTransitionsTopic = "alarm-state-transitions";
 
-  @Valid @NotNull public DataSourceFactory mysql;
-  @Valid @NotNull public DataSourceFactory vertica;
-  @Valid @NotNull public KafkaConfiguration kafka;
-  @Valid @NotNull public MiddlewareConfiguration middleware;
+    @Valid
+    @NotNull
+    public DataSourceFactory mysql;
+    @Valid
+    @NotNull
+    public DataSourceFactory vertica;
+    @Valid
+    @NotNull
+    public KafkaConfiguration kafka;
+    @Valid
+    @NotNull
+    public MiddlewareConfiguration middleware;
+    @Valid
+    @NotNull
+    public InfluxDBConfig influxDB;
+    @Valid
+    @NotNull
+    @JsonProperty
+    public DatabaseConfiguration databaseConfiguration;
 }
diff --git a/src/main/java/com/hpcloud/mon/MonApiModule.java b/src/main/java/com/hpcloud/mon/MonApiModule.java
index c58ad922c..1284ff26a 100644
--- a/src/main/java/com/hpcloud/mon/MonApiModule.java
+++ b/src/main/java/com/hpcloud/mon/MonApiModule.java
@@ -61,7 +61,7 @@ public class MonApiModule extends AbstractModule {
 
     install(new ApplicationModule());
     install(new DomainModule());
-    install(new InfrastructureModule());
+    install(new InfrastructureModule(this.config));
   }
 
   @Provides
diff --git a/src/main/java/com/hpcloud/mon/infrastructure/InfrastructureModule.java b/src/main/java/com/hpcloud/mon/infrastructure/InfrastructureModule.java
index 49f97c503..176214369 100644
--- a/src/main/java/com/hpcloud/mon/infrastructure/InfrastructureModule.java
+++ b/src/main/java/com/hpcloud/mon/infrastructure/InfrastructureModule.java
@@ -16,35 +16,51 @@
  */
 package com.hpcloud.mon.infrastructure;
 
-import javax.inject.Singleton;
-
 import com.google.inject.AbstractModule;
+import com.hpcloud.mon.MonApiConfiguration;
 import com.hpcloud.mon.domain.model.alarm.AlarmRepository;
 import com.hpcloud.mon.domain.model.alarmstatehistory.AlarmStateHistoryRepository;
 import com.hpcloud.mon.domain.model.measurement.MeasurementRepository;
 import com.hpcloud.mon.domain.model.metric.MetricDefinitionRepository;
 import com.hpcloud.mon.domain.model.notificationmethod.NotificationMethodRepository;
 import com.hpcloud.mon.domain.model.statistic.StatisticRepository;
-import com.hpcloud.mon.infrastructure.persistence.AlarmStateHistoryRepositoryImpl;
-import com.hpcloud.mon.infrastructure.persistence.AlarmRepositoryImpl;
-import com.hpcloud.mon.infrastructure.persistence.MeasurementRepositoryImpl;
-import com.hpcloud.mon.infrastructure.persistence.MetricDefinitionRepositoryImpl;
-import com.hpcloud.mon.infrastructure.persistence.NotificationMethodRepositoryImpl;
-import com.hpcloud.mon.infrastructure.persistence.StatisticRepositoryImpl;
+import com.hpcloud.mon.infrastructure.persistence.*;
+
+import javax.inject.Singleton;
 
 /**
  * Infrastructure layer bindings.
  */
 public class InfrastructureModule extends AbstractModule {
-  @Override
-  protected void configure() {
-    // Bind repositories
-    bind(AlarmRepository.class).to(AlarmRepositoryImpl.class).in(Singleton.class);
-    bind(AlarmStateHistoryRepository.class).to(AlarmStateHistoryRepositoryImpl.class).in(Singleton.class);
-    bind(MetricDefinitionRepository.class).to(MetricDefinitionRepositoryImpl.class).in(Singleton.class);
-    bind(MeasurementRepository.class).to(MeasurementRepositoryImpl.class).in(Singleton.class);
-    bind(StatisticRepository.class).to(StatisticRepositoryImpl.class).in(Singleton.class);
-    bind(NotificationMethodRepository.class).to(NotificationMethodRepositoryImpl.class).in(
-        Singleton.class);
-  }
+
+    private MonApiConfiguration config;
+
+    public InfrastructureModule(MonApiConfiguration config) {
+        this.config = config;
+    }
+
+    @Override
+    protected void configure() {
+        // Bind repositories for the configured database type.
+        bind(AlarmRepository.class).to(AlarmRepositoryImpl.class).in(Singleton.class);
+        if (config.databaseConfiguration.getDatabaseType().trim().equalsIgnoreCase("vertica")) {
+            bind(AlarmStateHistoryRepository.class).to(AlarmStateHistoryRepositoryImpl.class).in(Singleton.class);
+            bind(MetricDefinitionRepository.class).to(MetricDefinitionRepositoryImpl.class).in(Singleton.class);
+            bind(MeasurementRepository.class).to(MeasurementRepositoryImpl.class).in(Singleton.class);
+            bind(StatisticRepository.class).to(StatisticRepositoryImpl.class).in(Singleton.class);
+        } else if (config.databaseConfiguration.getDatabaseType().trim().equalsIgnoreCase("influxdb")) {
+            bind(AlarmStateHistoryRepository.class).to(AlarmStateHistoryInfluxDBRepositoryImpl.class).in(Singleton.class);
+            bind(MetricDefinitionRepository.class).to(MetricDefinitionInfluxDBRepositoryImpl.class).in(Singleton.class);
+            bind(MeasurementRepository.class).to(MeasurementInfluxDBRepositoryImpl.class).in(Singleton.class);
+            bind(StatisticRepository.class).to(StatisticInfluxDBRepositoryImpl.class).in(Singleton.class);
+        } else {
+            System.err.println("Unknown database type encountered: " + config.databaseConfiguration.getDatabaseType());
+            System.err.println("Supported databases are 'vertica' and 'influxdb'");
+            System.err.println("Check your config file.");
+            System.exit(1);
+        }
+
+        bind(NotificationMethodRepository.class).to(NotificationMethodRepositoryImpl.class).in(
+            Singleton.class);
+    }
 }
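All four InfluxDB-backed repositories below share the same connection pattern: InfluxDBFactory.connect(...) at construction, then InfluxDB.Query(database, query, precision). A standalone connectivity sketch of that same pattern against influxdb-java 1.0 (the class name, endpoint, credentials, and database name here are hypothetical):

    import org.influxdb.InfluxDB;
    import org.influxdb.InfluxDBFactory;
    import org.influxdb.dto.Serie;

    import java.util.List;
    import java.util.concurrent.TimeUnit;

    public class InfluxConnectivityCheck {
        public static void main(String[] args) {
            // Same connect/Query calls the repositories use.
            InfluxDB influxDB = InfluxDBFactory.connect("http://localhost:8086", "mon_api", "password");
            List<Serie> series = influxDB.Query("mon", "select * from alarm_state_history limit 1",
                TimeUnit.SECONDS);
            System.out.println("Series returned: " + series.size());
        }
    }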
diff --git a/src/main/java/com/hpcloud/mon/infrastructure/persistence/AlarmStateHistoryInfluxDBRepositoryImpl.java b/src/main/java/com/hpcloud/mon/infrastructure/persistence/AlarmStateHistoryInfluxDBRepositoryImpl.java
new file mode 100644
index 000000000..0007e1d48
--- /dev/null
+++ b/src/main/java/com/hpcloud/mon/infrastructure/persistence/AlarmStateHistoryInfluxDBRepositoryImpl.java
@@ -0,0 +1,145 @@
+package com.hpcloud.mon.infrastructure.persistence;
+
+import com.google.inject.Inject;
+import com.hpcloud.mon.MonApiConfiguration;
+import com.hpcloud.mon.common.model.alarm.AlarmState;
+import com.hpcloud.mon.domain.model.alarmstatehistory.AlarmStateHistory;
+import com.hpcloud.mon.domain.model.alarmstatehistory.AlarmStateHistoryRepository;
+import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDBFactory;
+import org.influxdb.dto.Serie;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.Handle;
+import org.skife.jdbi.v2.Query;
+import org.skife.jdbi.v2.util.StringMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.inject.Named;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+public class AlarmStateHistoryInfluxDBRepositoryImpl implements AlarmStateHistoryRepository {
+
+    private static final Logger logger = LoggerFactory.getLogger(AlarmStateHistoryInfluxDBRepositoryImpl.class);
+
+    private final MonApiConfiguration config;
+    private final InfluxDB influxDB;
+    private final DBI mysql;
+
+    private static final String FIND_ALARMS_SQL = "select distinct a.id from alarm as a "
+        + "join sub_alarm sa on a.id = sa.alarm_id "
+        + "left outer join sub_alarm_dimension dim on sa.id = dim.sub_alarm_id%s "
+        + "where a.tenant_id = :tenantId and a.deleted_at is NULL";
+
+    @Inject
+    public AlarmStateHistoryInfluxDBRepositoryImpl(@Named("mysql") DBI mysql, MonApiConfiguration config) {
+        this.mysql = mysql;
+        this.config = config;
+        this.influxDB = InfluxDBFactory.connect(this.config.influxDB.getUrl(), this.config.influxDB.getUser(),
+            this.config.influxDB.getPassword());
+    }
+
+    @Override
+    public List<AlarmStateHistory> findById(String tenantId, String alarmId) {
+        // InfluxDB orders query results by time stamp descending by default.
+        String query = String.format("select alarm_id, old_state, new_state, reason, reason_data "
+            + "from alarm_state_history "
+            + "where tenant_id = '%1$s' and alarm_id = '%2$s'", tenantId, alarmId);
+
+        return queryInfluxDBForAlarmStateHistory(query);
+    }
+
+    @Override
+    public Collection<AlarmStateHistory> find(String tenantId, Map<String, String> dimensions,
+        DateTime startTime, @Nullable DateTime endTime) {
+
+        // Find alarm ids for the given dimensions in MySQL.
+        List<String> alarmIds = null;
+        try (Handle h = mysql.open()) {
+            String sql = String.format(FIND_ALARMS_SQL, SubAlarmQueries.buildJoinClauseFor(dimensions));
+            Query<Map<String, Object>> query = h.createQuery(sql).bind("tenantId", tenantId);
+            DimensionQueries.bindDimensionsToQuery(query, dimensions);
+            alarmIds = query.map(StringMapper.FIRST).list();
+        }
+
+        if (alarmIds == null || alarmIds.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        String timePart = buildTimePart(startTime, endTime);
+        String alarmsPart = buildAlarmsPart(alarmIds);
+
+        String query = String.format("select alarm_id, old_state, new_state, reason, reason_data "
+            + "from alarm_state_history "
+            + "where tenant_id = '%1$s' %2$s %3$s", tenantId, timePart, alarmsPart);
+
+        return queryInfluxDBForAlarmStateHistory(query);
+    }
+
+    private String buildAlarmsPart(List<String> alarmIds) {
+        String s = "";
+        for (String alarmId : alarmIds) {
+            if (s.length() > 0) {
+                s += " or ";
+            }
+            s += String.format(" alarm_id = '%1$s' ", alarmId);
+        }
+
+        if (s.length() > 0) {
+            s = String.format(" and (%1$s)", s);
+        }
+        return s;
+    }
+
+    private String buildTimePart(DateTime startTime, DateTime endTime) {
+        String s = "";
+
+        if (startTime != null) {
+            s += String.format(" and time > %1$ds", startTime.getMillis() / 1000);
+        }
+
+        if (endTime != null) {
+            s += String.format(" and time < %1$ds", endTime.getMillis() / 1000);
+        }
+
+        return s;
+    }
+
+    private List<AlarmStateHistory> queryInfluxDBForAlarmStateHistory(String query) {
+        logger.debug("Query string: {}", query);
+
+        List<Serie> result = this.influxDB.Query(this.config.influxDB.getName(), query, TimeUnit.SECONDS);
+
+        List<AlarmStateHistory> alarmStateHistoryList = new LinkedList<>();
+
+        // Should only be one serie -- alarm_state_history.
+        for (Serie serie : result) {
+            Object[][] valObjArryArry = serie.getPoints();
+            for (int i = 0; i < valObjArryArry.length; i++) {
+                AlarmStateHistory alarmStateHistory = new AlarmStateHistory();
+                // Time is always in position 0 and sequence_number in position 1;
+                // the selected columns follow in select-list order, so old_state
+                // is position 3 and new_state position 4.
+                alarmStateHistory.setTimestamp(new DateTime(1000L * (Integer) valObjArryArry[i][0], DateTimeZone.UTC));
+                alarmStateHistory.setAlarmId((String) valObjArryArry[i][2]);
+                alarmStateHistory.setOldState(AlarmState.valueOf((String) valObjArryArry[i][3]));
+                alarmStateHistory.setNewState(AlarmState.valueOf((String) valObjArryArry[i][4]));
+                alarmStateHistory.setReason((String) valObjArryArry[i][5]);
+                alarmStateHistory.setReasonData((String) valObjArryArry[i][6]);
+
+                alarmStateHistoryList.add(alarmStateHistory);
+            }
+        }
+
+        return alarmStateHistoryList;
+    }
+}
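For a sense of the queries this repository emits: with tenant 'abc', one matching alarm id '123', and a start time of 2014-06-16T00:00:00Z (epoch 1402876800), find(...) above produces, modulo whitespace:

    select alarm_id, old_state, new_state, reason, reason_data from alarm_state_history
        where tenant_id = 'abc' and time > 1402876800s and ( alarm_id = '123' )

findById(...) emits the same select with just the tenant_id and alarm_id predicates. The 's' suffix on the epoch value is the seconds-precision time literal this InfluxDB version's query language accepts, matching the getMillis() / 1000 conversion in buildTimePart.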
diff --git a/src/main/java/com/hpcloud/mon/infrastructure/persistence/MeasurementInfluxDBRepositoryImpl.java b/src/main/java/com/hpcloud/mon/infrastructure/persistence/MeasurementInfluxDBRepositoryImpl.java
new file mode 100644
index 000000000..6b6726e8e
--- /dev/null
+++ b/src/main/java/com/hpcloud/mon/infrastructure/persistence/MeasurementInfluxDBRepositoryImpl.java
@@ -0,0 +1,99 @@
+package com.hpcloud.mon.infrastructure.persistence;
+
+import com.google.inject.Inject;
+import com.hpcloud.mon.MonApiConfiguration;
+import com.hpcloud.mon.domain.model.measurement.MeasurementRepository;
+import com.hpcloud.mon.domain.model.measurement.Measurements;
+import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDBFactory;
+import org.influxdb.dto.Serie;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+public class MeasurementInfluxDBRepositoryImpl implements MeasurementRepository {
+
+    private static final Logger logger = LoggerFactory.getLogger(MeasurementInfluxDBRepositoryImpl.class);
+
+    private final MonApiConfiguration config;
+    private final InfluxDB influxDB;
+
+    public static final DateTimeFormatter DATETIME_FORMATTER = ISODateTimeFormat.dateTimeNoMillis();
+
+    @Inject
+    public MeasurementInfluxDBRepositoryImpl(MonApiConfiguration config) {
+        this.config = config;
+        this.influxDB = InfluxDBFactory.connect(this.config.influxDB.getUrl(), this.config.influxDB.getUser(),
+            this.config.influxDB.getPassword());
+    }
+
+    @Override
+    public Collection<Measurements> find(String tenantId, String name, Map<String, String> dimensions,
+        DateTime startTime, @Nullable DateTime endTime) {
+
+        String dimWhereClause = "";
+        if (dimensions != null) {
+            for (String colName : dimensions.keySet()) {
+                dimWhereClause += String.format(" and %1$s = '%2$s'", colName, dimensions.get(colName));
+            }
+        }
+
+        String timePart = buildTimePart(startTime, endTime);
+        String query = String.format("select value "
+            + "from %1$s "
+            + "where tenant_id = '%2$s' %3$s %4$s",
+            name, tenantId, timePart, dimWhereClause);
+
+        logger.debug("Query string: {}", query);
+
+        List<Serie> result = this.influxDB.Query(this.config.influxDB.getName(), query, TimeUnit.MILLISECONDS);
+
+        Measurements measurements = new Measurements();
+        measurements.setName(name);
+        measurements.setDimensions(dimensions);
+        List<Object[]> valObjArryList = new LinkedList<>();
+        for (Serie serie : result) {
+            Object[][] valObjArry = serie.getPoints();
+            for (int i = 0; i < valObjArry.length; i++) {
+                Object[] objArry = new Object[3];
+                // sequence_number
+                objArry[0] = valObjArry[i][1];
+                // time
+                objArry[1] = DATETIME_FORMATTER.print((long) valObjArry[i][0]);
+                // value
+                objArry[2] = valObjArry[i][2];
+
+                valObjArryList.add(objArry);
+            }
+        }
+
+        measurements.setMeasurements(valObjArryList);
+
+        return Arrays.asList(measurements);
+    }
+
+    private String buildTimePart(DateTime startTime, DateTime endTime) {
+        String s = "";
+
+        if (startTime != null) {
+            s += String.format(" and time > %1$ds", startTime.getMillis() / 1000);
+        }
+
+        if (endTime != null) {
+            s += String.format(" and time < %1$ds", endTime.getMillis() / 1000);
+        }
+
+        return s;
+    }
+}
diff --git a/src/main/java/com/hpcloud/mon/infrastructure/persistence/MetricDefinitionInfluxDBRepositoryImpl.java b/src/main/java/com/hpcloud/mon/infrastructure/persistence/MetricDefinitionInfluxDBRepositoryImpl.java
new file mode 100644
index 000000000..ee1bb1a7d
--- /dev/null
+++ b/src/main/java/com/hpcloud/mon/infrastructure/persistence/MetricDefinitionInfluxDBRepositoryImpl.java
@@ -0,0 +1,70 @@
+package com.hpcloud.mon.infrastructure.persistence;
+
+import com.google.inject.Inject;
+import com.hpcloud.mon.MonApiConfiguration;
+import com.hpcloud.mon.common.model.metric.MetricDefinition;
+import com.hpcloud.mon.domain.model.metric.MetricDefinitionRepository;
+import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDBFactory;
+import org.influxdb.dto.Serie;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class MetricDefinitionInfluxDBRepositoryImpl implements MetricDefinitionRepository {
+
+    private static final Logger logger = LoggerFactory.getLogger(MetricDefinitionInfluxDBRepositoryImpl.class);
+
+    private final MonApiConfiguration config;
+    private final InfluxDB influxDB;
+
+    @Inject
+    public MetricDefinitionInfluxDBRepositoryImpl(MonApiConfiguration config) {
+        this.config = config;
+        this.influxDB = InfluxDBFactory.connect(this.config.influxDB.getUrl(), this.config.influxDB.getUser(),
+            this.config.influxDB.getPassword());
+    }
+
+    @Override
+    public List<MetricDefinition> find(String tenantId, String name, Map<String, String> dimensions) {
+
+        String dimWhereClause = "";
+        String dimColNames = "";
+        boolean first = true;
+        if (dimensions != null) {
+            for (String colName : dimensions.keySet()) {
+                if (first) {
+                    first = false;
+                } else {
+                    dimWhereClause += " and";
+                    dimColNames += ",";
+                }
+                dimWhereClause += String.format(" %1$s = '%2$s'", colName, dimensions.get(colName));
+                dimColNames += colName;
+            }
+            if (dimWhereClause.length() > 0) {
+                dimWhereClause = String.format(" and %1$s", dimWhereClause);
+            }
+        }
+        String query = String.format("select %1$s from /.*/ where tenant_id = '%2$s' %3$s", dimColNames,
+            tenantId, dimWhereClause);
+
+        logger.debug("Query string: {}", query);
+
+        List<Serie> result = this.influxDB.Query(this.config.influxDB.getName(), query, TimeUnit.SECONDS);
+
+        List<MetricDefinition> metricDefinitionList = new ArrayList<>();
+        for (Serie serie : result) {
+            MetricDefinition metricDefinition = new MetricDefinition();
+            metricDefinition.name = serie.getName();
+            metricDefinition.setDimensions(dimensions);
+            metricDefinitionList.add(metricDefinition);
+        }
+
+        return metricDefinitionList;
+    }
+}
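With dimensions {hostname: host1, region: uswest} and tenant 'abc', find(...) above yields roughly (modulo map iteration order and whitespace):

    select hostname,region from /.*/ where tenant_id = 'abc' and hostname = 'host1' and region = 'uswest'

The /.*/ regex targets every series in the database. One caveat visible in the code: with a null or empty dimensions map, dimColNames stays empty and the query degenerates to "select  from /.*/ ...", so callers appear to be expected to supply at least one dimension.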
diff --git a/src/main/java/com/hpcloud/mon/infrastructure/persistence/StatisticInfluxDBRepositoryImpl.java b/src/main/java/com/hpcloud/mon/infrastructure/persistence/StatisticInfluxDBRepositoryImpl.java
new file mode 100644
index 000000000..ea6f4e9e3
--- /dev/null
+++ b/src/main/java/com/hpcloud/mon/infrastructure/persistence/StatisticInfluxDBRepositoryImpl.java
@@ -0,0 +1,140 @@
+package com.hpcloud.mon.infrastructure.persistence;
+
+import com.google.inject.Inject;
+import com.hpcloud.mon.MonApiConfiguration;
+import com.hpcloud.mon.domain.model.statistic.StatisticRepository;
+import com.hpcloud.mon.domain.model.statistic.Statistics;
+import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDBFactory;
+import org.influxdb.dto.Serie;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+public class StatisticInfluxDBRepositoryImpl implements StatisticRepository {
+
+    private static final Logger logger = LoggerFactory.getLogger(StatisticInfluxDBRepositoryImpl.class);
+
+    private final MonApiConfiguration config;
+    private final InfluxDB influxDB;
+
+    public static final DateTimeFormatter DATETIME_FORMATTER = ISODateTimeFormat.dateTimeNoMillis();
+
+    @Inject
+    public StatisticInfluxDBRepositoryImpl(MonApiConfiguration config) {
+        this.config = config;
+        this.influxDB = InfluxDBFactory.connect(this.config.influxDB.getUrl(), this.config.influxDB.getUser(),
+            this.config.influxDB.getPassword());
+    }
+
+    @Override
+    public List<Statistics> find(String tenantId, String name, Map<String, String> dimensions,
+        DateTime startTime, @Nullable DateTime endTime,
+        List<String> statistics, int period) {
+
+        String statsPart = buildStatsPart(statistics);
+        String timePart = buildTimePart(startTime, endTime);
+        String dimsPart = buildDimPart(dimensions);
+        String periodPart = buildPeriodPart(period);
+
+        String query = String.format("select time %1$s from %2$s where tenant_id = '%3$s' %4$s %5$s %6$s",
+            statsPart, name, tenantId, timePart, dimsPart, periodPart);
+
+        logger.debug("Query string: {}", query);
+
+        List<Serie> result = this.influxDB.Query(this.config.influxDB.getName(), query, TimeUnit.MILLISECONDS);
+
+        List<Statistics> statisticsList = new LinkedList<>();
+
+        // Should only be one serie -- name.
+        for (Serie serie : result) {
+            Statistics stat = new Statistics();
+            stat.setName(serie.getName());
+            List<String> colNamesList = new LinkedList<>(statistics);
+            colNamesList.add(0, "timestamp");
+            stat.setColumns(colNamesList);
+            stat.setDimensions(dimensions);
+            List<List<Object>> valObjArryArry = new LinkedList<>();
+            stat.setStatistics(valObjArryArry);
+            Object[][] pointsArryArry = serie.getPoints();
+            for (int i = 0; i < pointsArryArry.length; i++) {
+                List<Object> valObjArry = new ArrayList<>();
+                // First column is always time.
+                valObjArry.add(DATETIME_FORMATTER.print((long) pointsArryArry[i][0]));
+                for (int j = 1; j < statistics.size() + 1; j++) {
+                    valObjArry.add(pointsArryArry[i][j]);
+                }
+                valObjArryArry.add(valObjArry);
+            }
+            statisticsList.add(stat);
+        }
+
+        return statisticsList;
+    }
+
+    private String buildPeriodPart(int period) {
+        String s = "";
+        if (period >= 1) {
+            s += String.format("group by time(%1$ds)", period);
+        }
+        return s;
+    }
+
+    private String buildDimPart(Map<String, String> dims) {
+        String s = "";
+        if (dims != null) {
+            for (String colName : dims.keySet()) {
+                if (s.length() > 0) {
+                    s += " and";
+                }
+                s += String.format(" %1$s = '%2$s'", colName, dims.get(colName));
+            }
+
+            if (s.length() > 0) {
+                s = " and " + s;
+            }
+        }
+        return s;
+    }
+
+    private String buildTimePart(DateTime startTime, DateTime endTime) {
+        String s = "";
+
+        if (startTime != null) {
+            s += String.format(" and time > %1$ds", startTime.getMillis() / 1000);
+        }
+
+        if (endTime != null) {
+            s += String.format(" and time < %1$ds", endTime.getMillis() / 1000);
+        }
+
+        return s;
+    }
+
+    private String buildStatsPart(List<String> statistics) {
+        String s = "";
+        for (String statistic : statistics) {
+            s += ",";
+            if (statistic.trim().equalsIgnoreCase("avg")) {
+                s += " mean(value)";
+            } else {
+                s += " " + statistic + "(value)";
+            }
+        }
+        return s;
+    }
+}
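A worked example of the generated series query: tenant 'abc', metric name cpu_user_perc, statistics [avg, max], a 300-second period, and the same start time as above give, modulo whitespace:

    select time , mean(value), max(value) from cpu_user_perc
        where tenant_id = 'abc' and time > 1402876800s group by time(300s)

Note that buildStatsPart translates "avg" to InfluxDB's mean(value); any other statistic name is passed through verbatim as name(value).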
diff --git a/src/main/java/com/hpcloud/mon/infrastructure/persistence/StatisticRepositoryImpl.java b/src/main/java/com/hpcloud/mon/infrastructure/persistence/StatisticRepositoryImpl.java
index 7b1a1d422..2e4aab192 100644
--- a/src/main/java/com/hpcloud/mon/infrastructure/persistence/StatisticRepositoryImpl.java
+++ b/src/main/java/com/hpcloud/mon/infrastructure/persistence/StatisticRepositoryImpl.java
@@ -16,17 +16,8 @@
  */
 package com.hpcloud.mon.infrastructure.persistence;
 
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import javax.inject.Inject;
-import javax.inject.Named;
-
+import com.hpcloud.mon.domain.model.statistic.StatisticRepository;
+import com.hpcloud.mon.domain.model.statistic.Statistics;
 import org.joda.time.DateTime;
 import org.joda.time.format.DateTimeFormatter;
 import org.joda.time.format.ISODateTimeFormat;
@@ -34,219 +25,221 @@
 import org.skife.jdbi.v2.DBI;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.Query;
 
-import com.hpcloud.mon.domain.model.statistic.StatisticRepository;
-import com.hpcloud.mon.domain.model.statistic.Statistics;
+import javax.inject.Inject;
+import javax.inject.Named;
+import java.sql.Timestamp;
+import java.util.*;
 
 /**
  * Vertica statistic repository implementation.
  */
 public class StatisticRepositoryImpl implements StatisticRepository {
[The remainder of the hunk re-indents the class body from two- to four-space indentation with no functional change; the resulting body follows, with the superseded two-space-indented originals omitted.]
+    public static final DateTimeFormatter DATETIME_FORMATTER = ISODateTimeFormat.dateTimeNoMillis()
+        .withZoneUTC();
+    private static final String FIND_BY_METRIC_DEF_SQL = "select dd.id, def.name, d.name as dname, d.value as dvalue "
+        + "from MonMetrics.Definitions def, MonMetrics.DefinitionDimensions dd "
+        + "left outer join MonMetrics.Dimensions d on d.dimension_set_id = dd.dimension_set_id%s "
+        + "where def.id = dd.definition_id and def.tenant_id = :tenantId%s order by dd.id";
+
+    private final DBI db;
+
+    @Inject
+    public StatisticRepositoryImpl(@Named("vertica") DBI db) {
+        this.db = db;
+    }
+
+    @Override
+    public List<Statistics> find(String tenantId, String name, Map<String, String> dimensions,
+        DateTime startTime, DateTime endTime, List<String> statistics, int period) {
+        List<Statistics> listStats = new ArrayList<>();
+        List<String> copyStatistics = createColumns(statistics);
+
+        try (Handle h = db.open()) {
+            Map<byte[], Statistics> byteMap = findDefIds(h, tenantId, name, dimensions, startTime, endTime);
+
+            for (byte[] bufferId : byteMap.keySet()) {
+
+                Query<Map<String, Object>> query = h.createQuery(
+                    createQuery(period, startTime, endTime, statistics))
+                    .bind("definition_id", bufferId)
+                    .bind("start_time", startTime)
+                    .bind("end_time", endTime);
+
+                // Execute
+                List<Map<String, Object>> rows = query.list();
+                List<Object> statisticsRow = new ArrayList<>();
+
+                for (Map<String, Object> row : rows) {
+                    Double sum = (Double) row.get("sum");
+                    Double average = (Double) row.get("avg");
+                    Double min = (Double) row.get("min");
+                    Double max = (Double) row.get("max");
+                    Long count = (Long) row.get("count");
+                    Timestamp time_stamp = (Timestamp) row.get("time_interval");
+
+                    if (time_stamp != null) {
+                        statisticsRow.add(DATETIME_FORMATTER.print(time_stamp.getTime()));
+                    }
+                    if (average != null) {
+                        statisticsRow.add(average);
+                    }
+                    if (count != null) {
+                        statisticsRow.add(count);
+                    }
+                    if (max != null) {
+                        statisticsRow.add(max);
+                    }
+                    if (min != null) {
+                        statisticsRow.add(min);
+                    }
+                    if (sum != null) {
+                        statisticsRow.add(sum);
+                    }
+                    byteMap.get(bufferId).addValues(statisticsRow);
+                    statisticsRow = new ArrayList<>();
+                }
+
+                byteMap.get(bufferId).setColumns(copyStatistics);
+                listStats.add(byteMap.get(bufferId));
+            }
+        }
+        return listStats;
+    }
+
+    private Map<byte[], Statistics> findDefIds(Handle h, String tenantId, String name,
+        Map<String, String> dimensions, DateTime startTime, DateTime endTime) {
+        List<byte[]> bytes = new ArrayList<>();
+
+        // Build query
+        StringBuilder sbWhere = new StringBuilder();
+
+        if (name != null)
+            sbWhere.append(" and def.name = :name");
+
+        String sql = String.format(FIND_BY_METRIC_DEF_SQL,
+            MetricQueries.buildJoinClauseFor(dimensions), sbWhere);
+
+        Query<Map<String, Object>> query = h.createQuery(sql)
+            .bind("tenantId", tenantId)
+            .bind("startTime", startTime);
+
+        if (name != null) {
+            query.bind("name", name);
+        }
+
+        if (endTime != null) {
+            query.bind("endTime", new Timestamp(endTime.getMillis()));
+        }
+
+        DimensionQueries.bindDimensionsToQuery(query, dimensions);
+
+        // Execute
+        List<Map<String, Object>> rows = query.list();
+
+        Map<byte[], Statistics> byteIdMap = new HashMap<>();
+
+        // Build results
+        byte[] currentId = null;
+        Map<String, String> dims = null;
+        for (Map<String, Object> row : rows) {
+            byte[] defId = (byte[]) row.get("id");
+            String defName = (String) row.get("name");
+            String demName = (String) row.get("dname");
+            String demValue = (String) row.get("dvalue");
+
+            if (defId == null || !Arrays.equals(currentId, defId)) {
+                currentId = defId;
+                dims = new HashMap<>();
+                dims.put(demName, demValue);
+
+                Statistics statistics = new Statistics();
+                statistics.setName(defName);
+                statistics.setDimensions(dims);
+                byteIdMap.put(currentId, statistics);
+            } else
+                dims.put(demName, demValue);
+        }
+
+        bytes.add(currentId);
+
+        return byteIdMap;
+    }
+
+    List<String> createColumns(List<String> list) {
+        List<String> copy = new ArrayList<>();
+        for (String string : list) {
+            copy.add(string);
+        }
+        Collections.sort(copy);
+        copy.add(0, "timestamp");
+
+        return copy;
+    }
+
+    private String createQuery(int period, DateTime startTime, DateTime endTime,
+        List<String> statistics) {
+        StringBuilder builder = new StringBuilder();
+
+        builder.append("SELECT " + getColumns(statistics));
+
+        if (period >= 1) {
+            builder.append(",MIN(time_stamp) as time_interval ");
+            builder.append(" FROM (Select FLOOR((EXTRACT('epoch' from time_stamp) - ");
+            builder.append(createOffset(period, startTime, endTime));
+            builder.append(" AS time_slice, time_stamp, value ");
+        }
+
+        builder.append(" FROM MonMetrics.Measurements ");
+        builder.append("WHERE definition_dimensions_id = :definition_id ");
+        builder.append(createWhereClause(startTime, endTime));
+
+        if (period >= 1) {
+            builder.append(") as TimeSlices group by time_slice order by time_slice");
+        }
+        return builder.toString();
+    }
+
+    private String createWhereClause(DateTime startTime, DateTime endTime) {
+        String clause = "";
+        if (startTime != null && endTime != null) {
+            clause = "AND time_stamp >= :start_time AND time_stamp <= :end_time ";
+        } else if (startTime != null) {
+            clause = "AND time_stamp >= :start_time ";
+        }
+        return clause;
+    }
+
+    private String createOffset(int period, DateTime startTime, DateTime endTime) {
+        StringBuilder offset = new StringBuilder();
+        offset.append("(select mod((select extract('epoch' from time_stamp) from MonMetrics.Measurements ");
+        offset.append("WHERE definition_dimensions_id = :definition_id ");
+        offset.append(createWhereClause(startTime, endTime));
+        offset.append("order by time_stamp limit 1");
+        offset.append("),");
+        offset.append(period + ")))/" + period + ")");
+
+        return offset.toString();
+    }
+
+    private String getColumns(List<String> statistics) {
+        StringBuilder buildColumns = new StringBuilder();
+
+        int size = statistics.size();
+        int count = 0;
+        for (String statistic : statistics) {
+            if (statistic.equals("average")) {
+                buildColumns.append("avg(value) as average ");
+            } else {
+                buildColumns.append(statistic + "(value) as " + statistic + " ");
+            }
+
+            if (size - 1 > count) {
+                buildColumns.append(",");
+            }
+            count++;
+        }
+        return buildColumns.toString();
+    }
 }
\ No newline at end of file
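For comparison with the InfluxDB queries above, createQuery(...) in the Vertica implementation assembles SQL of this shape for period = 300 and statistics = [average] with both time bounds supplied (reconstructed from the string concatenation, whitespace tidied):

    SELECT avg(value) as average ,MIN(time_stamp) as time_interval
      FROM (Select FLOOR((EXTRACT('epoch' from time_stamp) -
                (select mod((select extract('epoch' from time_stamp)
                               from MonMetrics.Measurements
                              WHERE definition_dimensions_id = :definition_id
                                AND time_stamp >= :start_time AND time_stamp <= :end_time
                              order by time_stamp limit 1),
                            300)))/300) AS time_slice, time_stamp, value
              FROM MonMetrics.Measurements
             WHERE definition_dimensions_id = :definition_id
               AND time_stamp >= :start_time AND time_stamp <= :end_time
           ) as TimeSlices group by time_slice order by time_slice

The mod subquery anchors the bucketing to the first sample's timestamp, so each row's time_slice is floor((epoch - first_epoch mod period) / period) and rows are then aggregated per slice; this plays the role that "group by time(300s)" plays in the InfluxDB implementation.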