Update all columns in metrics on an update to refresh TTL

Cassandra Time to Live (TTL) is applied per-column in a
row, and with a default TTL for the table.  The Cassandra
driver appears to have been written with a misunderstanding
that the TTL was per-row, and as a result the update
statements were written to avoid writing all cells.
But that results in some cells in the row not being updated,
and after their TTL expires they are removed, so a metric
query may result in partial data (updated_at set but
metric_id and created_at missing).

This fix changes the statement used when a new measurement
is received to use the same insert statement for the metric
row and thus refresh the TTL for all the columns.

A follow on patch should be created to remove the Update
statements for Measurements, and consider refactoring/removing
the caching model.

Change-Id: I6b7636a52e8bdb2ce8ad97f839acb6184cd58a8b
Story: 2005832
Task: 35872
This commit is contained in:
Joseph Davis 2019-06-25 12:25:03 -07:00 committed by Witek Bedyk
parent c32885aed8
commit 4b7e7bf905
2 changed files with 6 additions and 11 deletions

View File

@ -64,6 +64,7 @@ public class CassandraCluster {
+ "set value = ?, value_meta = ?, region = ?, tenant_id = ?, metric_name = ?, dimensions = ? "
+ "where metric_id = ? and time_stamp = ?";
// TODO: Remove update statements, TTL issues
private static final String MEASUREMENT_UPDATE_CQL = "update monasca.measurements USING TTL ? "
+ "set value = ?, value_meta = ? " + "where metric_id = ? and time_stamp = ?";
@ -71,10 +72,6 @@ public class CassandraCluster {
+ "set metric_id = ?, created_at = ?, updated_at = ? "
+ "where region = ? and tenant_id = ? and metric_name = ? and dimensions = ? and dimension_names = ?";
private static final String METRICS_UPDATE_CQL = "update monasca.metrics USING TTL ? "
+ "set updated_at = ? "
+ "where region = ? and tenant_id = ? and metric_name = ? and dimensions = ? and dimension_names = ?";
private static final String DIMENSION_INSERT_CQL = "insert into monasca.dimensions "
+ "(region, tenant_id, name, value) values (?, ?, ?, ?)";
@ -115,7 +112,6 @@ public class CassandraCluster {
private PreparedStatement measurementInsertStmt;
private PreparedStatement measurementUpdateStmt;
private PreparedStatement metricInsertStmt;
private PreparedStatement metricUpdateStmt;
private PreparedStatement dimensionStmt;
private PreparedStatement dimensionMetricStmt;
private PreparedStatement metricDimensionStmt;
@ -182,9 +178,9 @@ public class CassandraCluster {
metricsSession = cluster.connect(dbConfig.getKeySpace());
measurementInsertStmt = metricsSession.prepare(MEASUREMENT_INSERT_CQL).setIdempotent(true);
// TODO: Remove update statements, TTL issues
measurementUpdateStmt = metricsSession.prepare(MEASUREMENT_UPDATE_CQL).setIdempotent(true);
metricInsertStmt = metricsSession.prepare(METRICS_INSERT_CQL).setIdempotent(true);
metricUpdateStmt = metricsSession.prepare(METRICS_UPDATE_CQL).setIdempotent(true);
dimensionStmt = metricsSession.prepare(DIMENSION_INSERT_CQL).setIdempotent(true);
dimensionMetricStmt = metricsSession.prepare(DIMENSION_METRIC_INSERT_CQL).setIdempotent(true);
metricDimensionStmt = metricsSession.prepare(METRIC_DIMENSION_INSERT_CQL).setIdempotent(true);
@ -232,6 +228,7 @@ public class CassandraCluster {
return measurementInsertStmt;
}
// TODO: Remove update statements, TTL issues
public PreparedStatement getMeasurementUpdateStmt() {
return measurementUpdateStmt;
}
@ -240,10 +237,6 @@ public class CassandraCluster {
return metricInsertStmt;
}
public PreparedStatement getMetricUpdateStmt() {
return metricUpdateStmt;
}
public PreparedStatement getDimensionStmt() {
return dimensionStmt;
}

View File

@ -155,7 +155,9 @@ public class CassandraMetricRepo extends CassandraRepo implements Repo<MetricEnv
metric.getValue(), metric.getValueMeta(), region, tenantId, metricName, dimensions, id));
} else {
metricCacheHitMeter.mark();
batches.addMetricQuery(cluster.getMetricUpdateStmt().bind(retention,
// MUST update all relevant columns to ensure TTL consistency in a row
batches.addMetricQuery(cluster.getMetricInsertStmt().bind(retention,
defIdShaHash.getSha1HashByteBuffer(), new Timestamp(metric.getTimestamp()),
new Timestamp(metric.getTimestamp()), region, tenantId, metricName,
getDimensionList(dimensions), new ArrayList<>(dimensions.keySet())));
batches.addMeasurementQuery(buildMeasurementUpdateQuery(defIdShaHash, metric.getTimestamp(),