Merge "Add the ability to give vertica a hint to satisfy query locally"
This commit is contained in:
commit
4ac3459ffb
|
@ -75,6 +75,14 @@ vertica:
|
|||
minSize: 4
|
||||
maxSize: 32
|
||||
checkConnectionWhileIdle: true
|
||||
#
|
||||
# vertica database hint to be added to SELECT
|
||||
# statements. For example, the hint below is used
|
||||
# to tell vertica that the query can be satisfied
|
||||
# locally (replicated projection).
|
||||
#
|
||||
# dbHint: "/*+KV(01)*/"
|
||||
dbHint: ""
|
||||
|
||||
middleware:
|
||||
enabled: true
|
||||
|
|
|
@ -55,6 +55,14 @@ vertica:
|
|||
minSize: 4
|
||||
maxSize: 32
|
||||
checkConnectionWhileIdle: false
|
||||
#
|
||||
# vertica database hint to be added to SELECT
|
||||
# statements. For example, the hint below is used
|
||||
# to tell vertica that the query can be satisfied
|
||||
# locally (replicated projection).
|
||||
#
|
||||
# dbHint: "/*+KV(01)*/"
|
||||
dbHint: ""
|
||||
|
||||
middleware:
|
||||
enabled: true
|
||||
|
|
|
@ -17,6 +17,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
|
|||
import monasca.common.hibernate.configuration.HibernateDbConfiguration;
|
||||
import monasca.common.messaging.kafka.KafkaConfiguration;
|
||||
import monasca.api.infrastructure.middleware.MiddlewareConfiguration;
|
||||
import monasca.api.infrastructure.persistence.vertica.VerticaDataSourceFactory;
|
||||
import monasca.common.configuration.DatabaseConfiguration;
|
||||
|
||||
import monasca.common.configuration.InfluxDbConfiguration;
|
||||
|
@ -48,7 +49,7 @@ public class ApiConfig extends Configuration {
|
|||
public DataSourceFactory mysql;
|
||||
@Valid
|
||||
@NotNull
|
||||
public DataSourceFactory vertica;
|
||||
public VerticaDataSourceFactory vertica;
|
||||
@Valid
|
||||
@NotNull
|
||||
public KafkaConfiguration kafka;
|
||||
|
|
|
@ -15,6 +15,7 @@ package monasca.api.infrastructure.persistence.vertica;
|
|||
import monasca.api.domain.exception.MultipleMetricsException;
|
||||
import monasca.api.domain.model.measurement.MeasurementRepo;
|
||||
import monasca.api.domain.model.measurement.Measurements;
|
||||
import monasca.api.ApiConfig;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
@ -50,7 +51,7 @@ public class MeasurementVerticaRepoImpl implements MeasurementRepo {
|
|||
ISODateTimeFormat.dateTime().withZoneUTC();
|
||||
|
||||
private static final String FIND_BY_METRIC_DEF_SQL =
|
||||
"SELECT to_hex(mes.definition_dimensions_id) as def_dims_id, "
|
||||
"SELECT %s to_hex(mes.definition_dimensions_id) as def_dims_id, "
|
||||
+ "mes.time_stamp, mes.value, mes.value_meta "
|
||||
+ "FROM MonMetrics.Measurements mes "
|
||||
+ "WHERE to_hex(mes.definition_dimensions_id) %s " // Sub select query
|
||||
|
@ -62,7 +63,7 @@ public class MeasurementVerticaRepoImpl implements MeasurementRepo {
|
|||
|
||||
private static final String
|
||||
DEFDIM_IDS_SELECT =
|
||||
"SELECT defDims.id "
|
||||
"SELECT %s defDims.id "
|
||||
+ "FROM MonMetrics.DefinitionDimensions defDims "
|
||||
+ "WHERE defDims.id IN (%s)";
|
||||
|
||||
|
@ -72,11 +73,14 @@ public class MeasurementVerticaRepoImpl implements MeasurementRepo {
|
|||
|
||||
private final static TypeReference VALUE_META_TYPE = new TypeReference<Map<String, String>>() {};
|
||||
|
||||
private final String dbHint;
|
||||
|
||||
@Inject
|
||||
public MeasurementVerticaRepoImpl(
|
||||
@Named("vertica") DBI db) {
|
||||
|
||||
@Named("vertica") DBI db, ApiConfig config)
|
||||
{
|
||||
this.db = db;
|
||||
this.dbHint = config.vertica.dbHint;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -144,7 +148,7 @@ public class MeasurementVerticaRepoImpl implements MeasurementRepo {
|
|||
|
||||
}
|
||||
|
||||
String sql = String.format(FIND_BY_METRIC_DEF_SQL, defDimInClause, sb, orderById);
|
||||
String sql = String.format(FIND_BY_METRIC_DEF_SQL, this.dbHint, defDimInClause, sb, orderById);
|
||||
|
||||
Query<Map<String, Object>> query = h.createQuery(sql)
|
||||
.bind("startTime", new Timestamp(startTime.getMillis()))
|
||||
|
@ -251,6 +255,7 @@ public class MeasurementVerticaRepoImpl implements MeasurementRepo {
|
|||
|
||||
String defDimSql = String.format(
|
||||
MetricQueries.FIND_METRIC_DEFS_SQL,
|
||||
this.dbHint,
|
||||
MetricQueries.buildMetricDefinitionSubSql(name, dimensions));
|
||||
|
||||
Query<Map<String, Object>> query = h.createQuery(defDimSql).bind("tenantId", tenantId);
|
||||
|
|
|
@ -16,6 +16,7 @@ package monasca.api.infrastructure.persistence.vertica;
|
|||
import monasca.api.domain.model.metric.MetricDefinitionRepo;
|
||||
import monasca.api.domain.model.metric.MetricName;
|
||||
import monasca.api.resource.exception.Exceptions;
|
||||
import monasca.api.ApiConfig;
|
||||
import monasca.common.model.metric.MetricDefinition;
|
||||
|
||||
import org.apache.commons.codec.DecoderException;
|
||||
|
@ -58,7 +59,7 @@ public class MetricDefinitionVerticaRepoImpl implements MetricDefinitionRepo {
|
|||
+ "%s "; // limit goes here
|
||||
|
||||
private static final String FIND_METRIC_NAMES_SQL =
|
||||
"SELECT distinct def.id, def.name "
|
||||
"SELECT %s distinct def.id, def.name "
|
||||
+ "FROM MonMetrics.Definitions def "
|
||||
+ "WHERE def.id IN (%s) " // Subselect goes here
|
||||
+ "ORDER BY def.id ASC ";
|
||||
|
@ -74,7 +75,7 @@ public class MetricDefinitionVerticaRepoImpl implements MetricDefinitionRepo {
|
|||
+ "ORDER BY max_id ASC %s"; // Limit goes here.
|
||||
|
||||
private static final String DEFDIM_IDS_SELECT =
|
||||
"SELECT to_hex(defDims.id) AS id "
|
||||
"SELECT %s to_hex(defDims.id) AS id "
|
||||
+ "FROM MonMetrics.Definitions def, MonMetrics.DefinitionDimensions defDims "
|
||||
+ "WHERE defDims.definition_id = def.id "
|
||||
+ "AND def.tenant_id = :tenantId "
|
||||
|
@ -92,11 +93,13 @@ public class MetricDefinitionVerticaRepoImpl implements MetricDefinitionRepo {
|
|||
|
||||
private final DBI db;
|
||||
|
||||
private final String dbHint;
|
||||
|
||||
@Inject
|
||||
public MetricDefinitionVerticaRepoImpl(@Named("vertica") DBI db) {
|
||||
|
||||
public MetricDefinitionVerticaRepoImpl(@Named("vertica") DBI db, ApiConfig config)
|
||||
{
|
||||
this.db = db;
|
||||
|
||||
this.dbHint = config.vertica.dbHint;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -275,6 +278,7 @@ public class MetricDefinitionVerticaRepoImpl implements MetricDefinitionRepo {
|
|||
|
||||
String sql =
|
||||
String.format(MetricQueries.FIND_METRIC_DEFS_SQL,
|
||||
this.dbHint,
|
||||
String.format(METRIC_DEF_SUB_QUERY,
|
||||
namePart,
|
||||
offsetPart,
|
||||
|
@ -344,6 +348,7 @@ public class MetricDefinitionVerticaRepoImpl implements MetricDefinitionRepo {
|
|||
|
||||
String defDimSql = String.format(
|
||||
DEFDIM_IDS_SELECT,
|
||||
this.dbHint,
|
||||
namePart,
|
||||
MetricQueries.buildDimensionAndClause(dimensions, "defDims"));
|
||||
|
||||
|
|
|
@ -35,7 +35,7 @@ final class MetricQueries {
|
|||
static final Splitter offsetSplitter = Splitter.on(OFFSET_SEPARATOR).omitEmptyStrings().trimResults();
|
||||
|
||||
static final String FIND_METRIC_DEFS_SQL =
|
||||
"SELECT TO_HEX(defDims.id) as defDimsId, def.name, dims.name as dName, dims.value AS dValue "
|
||||
"SELECT %s TO_HEX(defDims.id) as defDimsId, def.name, dims.name as dName, dims.value AS dValue "
|
||||
+ "FROM MonMetrics.Definitions def "
|
||||
+ "JOIN MonMetrics.DefinitionDimensions defDims ON def.id = defDims.definition_id "
|
||||
// Outer join needed in case there are no dimensions for a definition.
|
||||
|
|
|
@ -16,6 +16,7 @@ package monasca.api.infrastructure.persistence.vertica;
|
|||
import monasca.api.domain.exception.MultipleMetricsException;
|
||||
import monasca.api.domain.model.statistic.StatisticRepo;
|
||||
import monasca.api.domain.model.statistic.Statistics;
|
||||
import monasca.api.ApiConfig;
|
||||
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.format.DateTimeFormatter;
|
||||
|
@ -46,12 +47,14 @@ public class StatisticVerticaRepoImpl implements StatisticRepo {
|
|||
ISODateTimeFormat.dateTime().withZoneUTC();
|
||||
|
||||
private final DBI db;
|
||||
private final String dbHint;
|
||||
|
||||
@Inject
|
||||
public StatisticVerticaRepoImpl(@Named("vertica") DBI db) {
|
||||
|
||||
public StatisticVerticaRepoImpl(@Named("vertica") DBI db,
|
||||
ApiConfig config)
|
||||
{
|
||||
this.db = db;
|
||||
|
||||
this.dbHint = config.vertica.dbHint;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -213,6 +216,7 @@ public class StatisticVerticaRepoImpl implements StatisticRepo {
|
|||
|
||||
String sql = String.format(
|
||||
MetricQueries.FIND_METRIC_DEFS_SQL,
|
||||
this.dbHint,
|
||||
MetricQueries.buildMetricDefinitionSubSql(name, dimensions));
|
||||
|
||||
Query<Map<String, Object>> query =
|
||||
|
@ -300,7 +304,7 @@ public class StatisticVerticaRepoImpl implements StatisticRepo {
|
|||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
|
||||
sb.append("SELECT ");
|
||||
sb.append("SELECT " + this.dbHint + " ");
|
||||
if (groupBy != null && !groupBy.isEmpty()) {
|
||||
sb.append(" to_hex(definition_dimensions_id) AS id, ");
|
||||
}
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
/*
|
||||
* Copyright (c) 2016 Hewlett-Packard Development Company, L.P.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
* or implied. See the License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package monasca.api.infrastructure.persistence.vertica;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import io.dropwizard.db.DataSourceFactory;
|
||||
|
||||
public class VerticaDataSourceFactory extends DataSourceFactory {
|
||||
|
||||
@JsonProperty
|
||||
String dbHint = "";
|
||||
|
||||
public String getDbHint() {
|
||||
return dbHint;
|
||||
}
|
||||
|
||||
}
|
|
@ -58,6 +58,14 @@ vertica:
|
|||
minSize: 4
|
||||
maxSize: 32
|
||||
checkConnectionWhileIdle: false
|
||||
#
|
||||
# vertica database hint to be added to SELECT
|
||||
# statements. For example, the hint below is used
|
||||
# to tell vertica that the query can be satisfied
|
||||
# locally (replicated projection).
|
||||
#
|
||||
# dbHint: "/*+KV(01)*/"
|
||||
dbHint: ""
|
||||
|
||||
middleware:
|
||||
enabled: true
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
|
||||
package monasca.api.infrastructure.persistence.vertica;
|
||||
|
||||
import monasca.api.ApiConfig;
|
||||
import monasca.api.domain.model.measurement.MeasurementRepo;
|
||||
import monasca.api.domain.model.measurement.Measurements;
|
||||
|
||||
|
@ -33,6 +34,7 @@ import static org.testng.Assert.assertEquals;
|
|||
|
||||
@Test(groups = "database")
|
||||
public class MeasurementVerticaRepositoryImplTest {
|
||||
private ApiConfig config;
|
||||
private DBI db;
|
||||
private Handle handle;
|
||||
private MeasurementRepo repo;
|
||||
|
@ -42,7 +44,8 @@ public class MeasurementVerticaRepositoryImplTest {
|
|||
Class.forName("com.vertica.jdbc.Driver");
|
||||
db = new DBI("jdbc:vertica://192.168.10.4/mon", "dbadmin", "password");
|
||||
handle = db.open();
|
||||
repo = new MeasurementVerticaRepoImpl(db);
|
||||
config = new ApiConfig();
|
||||
repo = new MeasurementVerticaRepoImpl(db, config);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
package monasca.api.infrastructure.persistence.vertica;
|
||||
|
||||
import monasca.common.model.metric.MetricDefinition;
|
||||
import monasca.api.ApiConfig;
|
||||
import monasca.api.domain.model.metric.MetricDefinitionRepo;
|
||||
|
||||
import org.joda.time.DateTime;
|
||||
|
@ -36,13 +37,15 @@ public class MetricDefinitionVerticaRepositoryImplTest {
|
|||
private DBI db;
|
||||
private Handle handle;
|
||||
private MetricDefinitionRepo repo;
|
||||
private ApiConfig config;
|
||||
|
||||
@BeforeClass
|
||||
protected void setupClass() throws Exception {
|
||||
Class.forName("com.vertica.jdbc.Driver");
|
||||
db = new DBI("jdbc:vertica://192.168.10.4/mon", "dbadmin", "password");
|
||||
handle = db.open();
|
||||
repo = new MetricDefinitionVerticaRepoImpl(db);
|
||||
config = new ApiConfig();
|
||||
repo = new MetricDefinitionVerticaRepoImpl(db, config);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
|
|
@ -11,8 +11,7 @@ metricsTopic: metrics
|
|||
eventsTopic: events
|
||||
|
||||
databaseConfiguration:
|
||||
# vertica | influxdb
|
||||
#databaseType: influxdb
|
||||
# vertica | influxdb
|
||||
databaseType: influxdb
|
||||
|
||||
kafka:
|
||||
|
@ -44,6 +43,14 @@ mysql:
|
|||
# minSize: 4
|
||||
# maxSize: 32
|
||||
# checkConnectionWhileIdle: false
|
||||
# #
|
||||
# # vertica database hint to be added to SELECT
|
||||
# # statements. For example, the hint below is used
|
||||
# # to tell vertica that the query can be satisfied
|
||||
# # locally (replicated projection).
|
||||
# #
|
||||
# # dbHint: "/*+KV(01)*/"
|
||||
# dbHint: ""
|
||||
|
||||
|
||||
influxDB:
|
||||
|
|
Loading…
Reference in New Issue