diff --git a/monasca_persister/conf/elasticsearch.py b/monasca_persister/conf/elasticsearch.py index 7054aa83..1480cdf4 100644 --- a/monasca_persister/conf/elasticsearch.py +++ b/monasca_persister/conf/elasticsearch.py @@ -18,27 +18,32 @@ from oslo_config import cfg from monasca_persister.conf import types elasticsearch_opts = [ - cfg.StrOpt('index_name', - help='Index name where events are stored', - default='monevents'), - cfg.ListOpt('hosts', - help='List of Elasticsearch nodes in format host[:port]', - default=['localhost:9200'], - item_type=types.HostAddressPortType()), - cfg.BoolOpt('sniff_on_start', - help='Flag indicating whether to obtain a list of nodes from the cluser at startup time', - default=False), - cfg.BoolOpt('sniff_on_connection_fail', - help='Flag controlling if connection failure triggers a sniff', - default=False), - cfg.IntOpt('sniffer_timeout', - help='Number of seconds between automatic sniffs', - default=None), - cfg.IntOpt('max_retries', - help='Maximum number of retries before an exception is propagated', - default=3, - min=1) -] + cfg.StrOpt( + 'index_name', + help='Index name where events are stored', + default='monevents'), + cfg.ListOpt( + 'hosts', + help='List of Elasticsearch nodes in format host[:port]', + default=['localhost:9200'], + item_type=types.HostAddressPortType()), + cfg.BoolOpt( + 'sniff_on_start', + help='Flag indicating whether to obtain a list of nodes from the cluser at startup time', + default=False), + cfg.BoolOpt( + 'sniff_on_connection_fail', + help='Flag controlling if connection failure triggers a sniff', + default=False), + cfg.IntOpt( + 'sniffer_timeout', + help='Number of seconds between automatic sniffs', + default=None), + cfg.IntOpt( + 'max_retries', + help='Maximum number of retries before an exception is propagated', + default=3, + min=1)] elasticsearch_group = cfg.OptGroup(name='elasticsearch', title='elasticsearch') diff --git a/monasca_persister/conf/kafka_alarm_history.py b/monasca_persister/conf/kafka_alarm_history.py index 77c24eea..141b92b9 100644 --- a/monasca_persister/conf/kafka_alarm_history.py +++ b/monasca_persister/conf/kafka_alarm_history.py @@ -42,9 +42,10 @@ kafka_alarm_history_opts = [ cfg.StrOpt('zookeeper_path', help='Path in zookeeper for kafka consumer group partitioning algorithm', default='/persister_partitions/$kafka_alarm_history.topic'), - cfg.IntOpt('batch_size', - help='Maximum number of alarm state history messages to buffer before writing to database', - default=1), + cfg.IntOpt( + 'batch_size', + help='Maximum number of alarm state history messages to buffer before writing to database', + default=1), ] diff --git a/monasca_persister/conf/repositories.py b/monasca_persister/conf/repositories.py index 4d92172b..4b3dca8b 100644 --- a/monasca_persister/conf/repositories.py +++ b/monasca_persister/conf/repositories.py @@ -17,15 +17,21 @@ from oslo_config import cfg repositories_opts = [ - cfg.StrOpt(name='metrics_driver', - help='The repository driver to use for metrics', - default='monasca_persister.repositories.influxdb.metrics_repository:MetricInfluxdbRepository'), - cfg.StrOpt(name='alarm_state_history_driver', - help='The repository driver to use for alarm state history', - default='monasca_persister.repositories.influxdb.metrics_repository:MetricInfluxdbRepository'), - cfg.StrOpt(name='events_driver', - help='The repository driver to use for events', - default='monasca_persister.repositories.elasticsearch.events_repository:ElasticSearchEventsRepository')] + cfg.StrOpt( + name='metrics_driver', + help='The repository driver to use for metrics', + default=('monasca_persister.repositories.influxdb.metrics_repository:' + 'MetricInfluxdbRepository')), + cfg.StrOpt( + name='alarm_state_history_driver', + help='The repository driver to use for alarm state history', + default=('monasca_persister.repositories.influxdb.metrics_repository:' + 'MetricInfluxdbRepository')), + cfg.StrOpt( + name='events_driver', + help='The repository driver to use for events', + default=('monasca_persister.repositories.elasticsearch.events_repository:' + 'ElasticSearchEventsRepository'))] repositories_group = cfg.OptGroup(name='repositories', title='repositories') diff --git a/monasca_persister/persister.py b/monasca_persister/persister.py index 36bc9532..c7bfd0c3 100644 --- a/monasca_persister/persister.py +++ b/monasca_persister/persister.py @@ -61,7 +61,8 @@ def clean_exit(signum, frame=None): for process in processors: try: if process.is_alive(): - process.terminate() # Sends sigterm which any processes after a notification is sent attempt to handle + # Sends sigterm which any processes after a notification is sent attempt to handle + process.terminate() wait_for_exit = True except Exception: # nosec # There is really nothing to do if the kill fails, so just go on. diff --git a/monasca_persister/repositories/cassandra/alarm_state_history_repository.py b/monasca_persister/repositories/cassandra/alarm_state_history_repository.py index fe0c0a08..548b71c2 100644 --- a/monasca_persister/repositories/cassandra/alarm_state_history_repository.py +++ b/monasca_persister/repositories/cassandra/alarm_state_history_repository.py @@ -25,9 +25,10 @@ from monasca_persister.repositories.utils import parse_alarm_state_hist_message LOG = log.getLogger(__name__) -UPSERT_CQL = ('update monasca.alarm_state_history USING TTL ? ' - 'set metric = ?, old_state = ?, new_state = ?, sub_alarms = ?, reason = ?, reason_data = ? ' - 'where tenant_id = ? and alarm_id = ? and time_stamp = ?') +UPSERT_CQL = ( + 'update monasca.alarm_state_history USING TTL ? ' + 'set metric = ?, old_state = ?, new_state = ?, sub_alarms = ?, reason = ?, reason_data = ? ' + 'where tenant_id = ? and alarm_id = ? and time_stamp = ?') class AlarmStateHistCassandraRepository(abstract_repository.AbstractCassandraRepository): diff --git a/monasca_persister/repositories/cassandra/connection_util.py b/monasca_persister/repositories/cassandra/connection_util.py index fce085cb..73d15a31 100644 --- a/monasca_persister/repositories/cassandra/connection_util.py +++ b/monasca_persister/repositories/cassandra/connection_util.py @@ -47,5 +47,6 @@ def create_cluster(): def create_session(cluster): session = cluster.connect(conf.cassandra.keyspace) session.default_timeout = conf.cassandra.read_timeout - session.default_consistency_level = ConsistencyLevel.name_to_value[conf.cassandra.consistency_level] + session.default_consistency_level = \ + ConsistencyLevel.name_to_value[conf.cassandra.consistency_level] return session diff --git a/monasca_persister/repositories/cassandra/metric_batch.py b/monasca_persister/repositories/cassandra/metric_batch.py index 04a6c60b..bd06c5f5 100644 --- a/monasca_persister/repositories/cassandra/metric_batch.py +++ b/monasca_persister/repositories/cassandra/metric_batch.py @@ -68,7 +68,10 @@ class MetricBatch(object): self.batch_query_by_replicas(bound_stmt, self.measurement_queries) def batch_query_by_replicas(self, bound_stmt, query_map): - hosts = tuple(self.lb_policy.make_query_plan(working_keyspace=bound_stmt.keyspace, query=bound_stmt)) + hosts = tuple( + self.lb_policy.make_query_plan( + working_keyspace=bound_stmt.keyspace, + query=bound_stmt)) queue = query_map.get(hosts, None) if not queue: @@ -96,18 +99,21 @@ class MetricBatch(object): @staticmethod def log_token_batch_map(name, query_map): - LOG.info('%s : Size: %s; Tokens: |%s|' % (name, len(query_map), - '|'.join(['%s: %s' % ( - token, - ','.join([str(counter.value()) for (batch, counter) in queue])) - for token, queue in query_map.items()]))) + LOG.info('%s : Size: %s; Tokens: |%s|' % + (name, len(query_map), + '|'.join(['%s: %s' % ( + token, + ','.join([str(counter.value()) for (batch, counter) in queue])) + for token, queue in query_map.items()]))) @staticmethod def log_replica_batch_map(name, query_map): - LOG.info('%s : Size: %s; Replicas: |%s|' % (name, len(query_map), '|'.join([ - '%s: %s' % ( - ','.join([h.address for h in hosts]), ','.join([str(counter.value()) for (batch, counter) in queue])) - for hosts, queue in query_map.items()]))) + LOG.info('%s : Size: %s; Replicas: |%s|' % + (name, len(query_map), '|'.join([ + '%s: %s' % ( + ','.join([h.address for h in hosts]), + ','.join([str(counter.value()) for (batch, counter) in queue])) + for hosts, queue in query_map.items()]))) def get_all_batches(self): self.log_token_batch_map("metric batches", self.metric_queries) diff --git a/monasca_persister/repositories/cassandra/metrics_repository.py b/monasca_persister/repositories/cassandra/metrics_repository.py index 7eee8b35..fbd0695c 100644 --- a/monasca_persister/repositories/cassandra/metrics_repository.py +++ b/monasca_persister/repositories/cassandra/metrics_repository.py @@ -31,9 +31,10 @@ from monasca_persister.repositories.utils import parse_measurement_message LOG = log.getLogger(__name__) -MEASUREMENT_INSERT_CQL = ('update monasca.measurements USING TTL ? ' - 'set value = ?, value_meta = ?, region = ?, tenant_id = ?, metric_name = ?, dimensions = ? ' - 'where metric_id = ? and time_stamp = ?') +MEASUREMENT_INSERT_CQL = ( + 'update monasca.measurements USING TTL ? ' + 'set value = ?, value_meta = ?, region = ?, tenant_id = ?, metric_name = ?, dimensions = ? ' + 'where metric_id = ? and time_stamp = ?') MEASUREMENT_UPDATE_CQL = ('update monasca.measurements USING TTL ? ' 'set value = ?, value_meta = ? where metric_id = ? and time_stamp = ?') @@ -66,8 +67,16 @@ RETRIEVE_METRIC_DIMENSION_CQL = ('select region, tenant_id, metric_name, ' 'WHERE token(region, tenant_id, metric_name) > ? ' 'and token(region, tenant_id, metric_name) <= ? ') -Metric = namedtuple('Metric', ['id', 'region', 'tenant_id', 'name', 'dimension_list', 'dimension_names', - 'time_stamp', 'value', 'value_meta']) +Metric = namedtuple('Metric', + ['id', + 'region', + 'tenant_id', + 'name', + 'dimension_list', + 'dimension_names', + 'time_stamp', + 'value', + 'value_meta']) class MetricCassandraRepository(abstract_repository.AbstractCassandraRepository): @@ -101,7 +110,10 @@ class MetricCassandraRepository(abstract_repository.AbstractCassandraRepository) self._retrieve_metric_dimension_stmt = self._session.prepare(RETRIEVE_METRIC_DIMENSION_CQL) - self._metric_batch = MetricBatch(self._cluster.metadata, self._cluster.load_balancing_policy, self._max_batches) + self._metric_batch = MetricBatch( + self._cluster.metadata, + self._cluster.load_balancing_policy, + self._max_batches) self._metric_id_cache = LRUCache(self._cache_size) self._dimension_cache = LRUCache(self._cache_size) @@ -178,34 +190,29 @@ class MetricCassandraRepository(abstract_repository.AbstractCassandraRepository) self._metric_batch.add_dimension_query(dimension_bound_stmt) self._dimension_cache[dim_key] = dim_key - metric_dim_key = self._get_metric_dimnesion_key(metric.region, metric.tenant_id, metric.name, name, - value) + metric_dim_key = self._get_metric_dimnesion_key( + metric.region, metric.tenant_id, metric.name, name, value) if not self._metric_dimension_cache.get(metric_dim_key, None): - dimension_metric_bound_stmt = self._dimension_metric_stmt.bind((metric.region, - metric.tenant_id, - name, - value, - metric.name)) + dimension_metric_bound_stmt = self._dimension_metric_stmt.bind( + (metric.region, metric.tenant_id, name, value, metric.name)) self._metric_batch.add_dimension_metric_query(dimension_metric_bound_stmt) - metric_dimension_bound_stmt = self._metric_dimension_stmt.bind((metric.region, - metric.tenant_id, - metric.name, - name, - value)) + metric_dimension_bound_stmt = self._metric_dimension_stmt.bind( + (metric.region, metric.tenant_id, metric.name, name, value)) self._metric_batch.add_metric_dimension_query(metric_dimension_bound_stmt) self._metric_dimension_cache[metric_dim_key] = metric_dim_key - measurement_insert_bound_stmt = self._measurement_insert_stmt.bind((self._retention, - metric.value, - metric.value_meta, - metric.region, - metric.tenant_id, - metric.name, - metric.dimension_list, - id_bytes, - metric.time_stamp)) + measurement_insert_bound_stmt = self._measurement_insert_stmt.bind( + (self._retention, + metric.value, + metric.value_meta, + metric.region, + metric.tenant_id, + metric.name, + metric.dimension_list, + id_bytes, + metric.time_stamp)) self._metric_batch.add_measurement_query(measurement_insert_bound_stmt) return metric @@ -240,7 +247,9 @@ class MetricCassandraRepository(abstract_repository.AbstractCassandraRepository) key = self._get_dimnesion_key(row.region, row.tenant_id, row.name, row.value) self._dimension_cache[key] = key - LOG.info("loaded %s dimension entries cache from database into cache." % self._dimension_cache.currsize) + LOG.info( + "loaded %s dimension entries cache from database into cache." % + self._dimension_cache.currsize) @staticmethod def _get_dimnesion_key(region, tenant_id, name, value): @@ -258,16 +267,22 @@ class MetricCassandraRepository(abstract_repository.AbstractCassandraRepository) cnt = 0 for row in rows: - key = self._get_metric_dimnesion_key(row.region, row.tenant_id, row.metric_name, row.dimension_name, - row.dimension_value) + key = self._get_metric_dimnesion_key( + row.region, + row.tenant_id, + row.metric_name, + row.dimension_name, + row.dimension_value) self._metric_dimension_cache[key] = key cnt += 1 LOG.info("loaded %s metric dimension entries from database into cache." % cnt) LOG.info( - "total loaded %s metric dimension entries in cache." % self._metric_dimension_cache.currsize) + "total loaded %s metric dimension entries in cache." % + self._metric_dimension_cache.currsize) @staticmethod def _get_metric_dimnesion_key(region, tenant_id, metric_name, dimension_name, dimension_value): - return '%s\0%s\0%s\0%s\0%s' % (region, tenant_id, metric_name, dimension_name, dimension_value) + return '%s\0%s\0%s\0%s\0%s' % (region, tenant_id, metric_name, + dimension_name, dimension_value) diff --git a/monasca_persister/repositories/cassandra/retry_policy.py b/monasca_persister/repositories/cassandra/retry_policy.py index 163337d6..51052099 100644 --- a/monasca_persister/repositories/cassandra/retry_policy.py +++ b/monasca_persister/repositories/cassandra/retry_policy.py @@ -46,4 +46,8 @@ class MonascaRetryPolicy(RetryPolicy): def on_unavailable(self, query, consistency, required_replicas, alive_replicas, retry_num): - return (self.RETRY_NEXT_HOST, consistency) if retry_num < self.unavailable_attempts else (self.RETHROW, None) + return ( + self.RETRY_NEXT_HOST, + consistency) if retry_num < self.unavailable_attempts else ( + self.RETHROW, + None) diff --git a/monasca_persister/repositories/cassandra/token_range_query_manager.py b/monasca_persister/repositories/cassandra/token_range_query_manager.py index e90a47fa..fc436d2d 100644 --- a/monasca_persister/repositories/cassandra/token_range_query_manager.py +++ b/monasca_persister/repositories/cassandra/token_range_query_manager.py @@ -63,5 +63,6 @@ class TokenRangeQueryManager(object): def execute_query_token_range(token_range): - results = TokenRangeQueryManager.session.execute(TokenRangeQueryManager.prepared.bind(token_range)) + results = TokenRangeQueryManager.session.execute( + TokenRangeQueryManager.prepared.bind(token_range)) TokenRangeQueryManager.result_handler(results) diff --git a/monasca_persister/repositories/influxdb/alarm_state_history_repository.py b/monasca_persister/repositories/influxdb/alarm_state_history_repository.py index 416393b2..9e1da344 100644 --- a/monasca_persister/repositories/influxdb/alarm_state_history_repository.py +++ b/monasca_persister/repositories/influxdb/alarm_state_history_repository.py @@ -24,7 +24,7 @@ LOG = log.getLogger(__name__) class AlarmStateHistInfluxdbRepository( - abstract_repository.AbstractInfluxdbRepository): + abstract_repository.AbstractInfluxdbRepository): def __init__(self): @@ -36,7 +36,7 @@ class AlarmStateHistInfluxdbRepository( lifecycle_state, state_change_reason, sub_alarms_json_snake_case, tenant_id, time_stamp) = parse_alarm_state_hist_message( - message) + message) name = u'alarm_state_history' fields = [] diff --git a/monasca_persister/repositories/influxdb/metrics_repository.py b/monasca_persister/repositories/influxdb/metrics_repository.py index 162d9143..e9641cdc 100644 --- a/monasca_persister/repositories/influxdb/metrics_repository.py +++ b/monasca_persister/repositories/influxdb/metrics_repository.py @@ -55,7 +55,8 @@ class MetricInfluxdbRepository(abstract_repository.AbstractInfluxdbRepository): value_field = u'value={}'.format(value) value_meta_field = u'value_meta=' + value_meta_str - data = key_values + u' ' + value_field + u',' + value_meta_field + u' ' + str(int(time_stamp)) + data = key_values + u' ' + value_field + u',' + \ + value_meta_field + u' ' + str(int(time_stamp)) LOG.debug(data) diff --git a/monasca_persister/repositories/persister.py b/monasca_persister/repositories/persister.py index f504910c..48095402 100644 --- a/monasca_persister/repositories/persister.py +++ b/monasca_persister/repositories/persister.py @@ -33,14 +33,14 @@ class Persister(object): self._batch_size = kafka_conf.batch_size self._consumer = consumer.KafkaConsumer( - kafka_conf.uri, - zookeeper_conf.uri, - kafka_conf.zookeeper_path, - kafka_conf.group_id, - kafka_conf.topic, - repartition_callback=self._flush, - commit_callback=self._flush, - commit_timeout=kafka_conf.max_wait_time_seconds) + kafka_conf.uri, + zookeeper_conf.uri, + kafka_conf.zookeeper_path, + kafka_conf.group_id, + kafka_conf.topic, + repartition_callback=self._flush, + commit_callback=self._flush, + commit_timeout=kafka_conf.max_wait_time_seconds) self.repository = repository() @@ -52,7 +52,7 @@ class Persister(object): self.repository.write_batch(self._data_points) LOG.info("Processed {} messages from topic '{}'".format( - len(self._data_points), self._kafka_topic)) + len(self._data_points), self._kafka_topic)) self._data_points = [] self._consumer.commit() @@ -79,7 +79,7 @@ class Persister(object): self._flush() except Exception: LOG.exception( - 'Persister encountered fatal exception processing ' - 'messages. ' - 'Shutting down all threads and exiting') + 'Persister encountered fatal exception processing ' + 'messages. ' + 'Shutting down all threads and exiting') os._exit(1) diff --git a/monasca_persister/repositories/utils.py b/monasca_persister/repositories/utils.py index fdf15e5c..0f165ffb 100644 --- a/monasca_persister/repositories/utils.py +++ b/monasca_persister/repositories/utils.py @@ -78,21 +78,21 @@ def parse_alarm_state_hist_message(message): sub_alarms_json = json.dumps(sub_alarms, ensure_ascii=False) sub_alarms_json_snake_case = sub_alarms_json.replace( - '"subAlarmExpression":', - '"sub_alarm_expression":') + '"subAlarmExpression":', + '"sub_alarm_expression":') sub_alarms_json_snake_case = sub_alarms_json_snake_case.replace( - '"currentValues":', - '"current_values":') + '"currentValues":', + '"current_values":') # jobrs: I do not think that this shows up sub_alarms_json_snake_case = sub_alarms_json_snake_case.replace( - '"metricDefinition":', - '"metric_definition":') + '"metricDefinition":', + '"metric_definition":') sub_alarms_json_snake_case = sub_alarms_json_snake_case.replace( - '"subAlarmState":', - '"sub_alarm_state":') + '"subAlarmState":', + '"sub_alarm_state":') else: sub_alarms_json_snake_case = "[]" diff --git a/monasca_persister/tests/test_persister_main.py b/monasca_persister/tests/test_persister_main.py index 6ca03249..964df307 100644 --- a/monasca_persister/tests/test_persister_main.py +++ b/monasca_persister/tests/test_persister_main.py @@ -119,7 +119,7 @@ class TestPersister(base.BaseTestCase): def test_active_children_are_killed_during_exit(self): with patch.object(self.persister.multiprocessing, 'active_children') as active_children,\ - patch.object(self.persister.os, 'kill') as mock_kill: + patch.object(self.persister.os, 'kill') as mock_kill: active_children.return_value = [Mock(name='child-1', pid=1), Mock(name='child-2', pid=2)] diff --git a/tox.ini b/tox.ini index 638d703f..768878b2 100644 --- a/tox.ini +++ b/tox.ini @@ -67,11 +67,10 @@ commands = flake8 monasca_persister [flake8] -max-line-length = 120 +max-line-length = 100 # TODO: ignored checks should be enabled in the future # H405 multi line docstring summary not separated with an empty line -# H904 Wrap long lines in parentheses instead of a backslash -ignore = F821,H405,H904,E126,E125,H306,E302,E122 +ignore = F821,H405,H306,E302 exclude=.venv,.git,.tox,dist,*egg,build [bandit]