From 091599363350bdc114c8a6f72225834afc8ec18c Mon Sep 17 00:00:00 2001 From: Akira Yoshiyama Date: Thu, 7 Sep 2017 11:13:31 +0900 Subject: [PATCH] metrics_db_check: move check methods to each metrics drivers monasca_api.healthcheck.metrics_db_check has two healthcheck methods for InfluxDB and Cassandra. They should be in each metrics_repository. Change-Id: I6c9cc637eba6c3096b5319979434cfd9f0b78a9a --- .../cassandra/metrics_repository.py | 20 +- .../influxdb/metrics_repository.py | 15 ++ .../common/repositories/metrics_repository.py | 5 + monasca_api/healthcheck/metrics_db_check.py | 70 +------ monasca_api/tests/test_healthchecks.py | 4 +- .../tests/test_metrics_db_health_check.py | 190 ++---------------- monasca_api/tests/test_repositories.py | 41 ++++ 7 files changed, 108 insertions(+), 237 deletions(-) diff --git a/monasca_api/common/repositories/cassandra/metrics_repository.py b/monasca_api/common/repositories/cassandra/metrics_repository.py index 140e10fd6..6b9659b07 100644 --- a/monasca_api/common/repositories/cassandra/metrics_repository.py +++ b/monasca_api/common/repositories/cassandra/metrics_repository.py @@ -29,6 +29,7 @@ from monasca_common.rest import utils as rest_utils from monasca_api.common.repositories import exceptions from monasca_api.common.repositories import metrics_repository +CONF = cfg.CONF LOG = log.getLogger(__name__) @@ -36,13 +37,11 @@ class MetricsRepository(metrics_repository.AbstractMetricsRepository): def __init__(self): try: - - self.conf = cfg.CONF self._cassandra_cluster = Cluster( - self.conf.cassandra.cluster_ip_addresses + CONF.cassandra.cluster_ip_addresses ) self.cassandra_session = self._cassandra_cluster.connect( - self.conf.cassandra.keyspace + CONF.cassandra.keyspace ) except Exception as ex: LOG.exception(ex) @@ -744,3 +743,16 @@ class MetricsRepository(metrics_repository.AbstractMetricsRepository): except Exception as ex: LOG.exception(ex) raise exceptions.RepositoryException(ex) + + @staticmethod + def check_status(): + try: + cluster = Cluster( + CONF.cassandra.cluster_ip_addresses + ) + session = cluster.connect(CONF.cassandra.keyspace) + session.shutdown() + except Exception as ex: + LOG.exception(str(ex)) + return False, str(ex) + return True, 'OK' diff --git a/monasca_api/common/repositories/influxdb/metrics_repository.py b/monasca_api/common/repositories/influxdb/metrics_repository.py index ee2d1d507..6160f1a40 100644 --- a/monasca_api/common/repositories/influxdb/metrics_repository.py +++ b/monasca_api/common/repositories/influxdb/metrics_repository.py @@ -21,6 +21,7 @@ from influxdb.exceptions import InfluxDBClientError from oslo_config import cfg from oslo_log import log from oslo_utils import timeutils +import requests from monasca_common.rest import utils as rest_utils @@ -29,6 +30,7 @@ from monasca_api.common.repositories import metrics_repository MEASUREMENT_NOT_FOUND_MSG = "measurement not found" +CONF = cfg.CONF LOG = log.getLogger(__name__) @@ -902,3 +904,16 @@ class MetricsRepository(metrics_repository.AbstractMetricsRepository): except Exception as ex: LOG.exception(ex) raise exceptions.RepositoryException(ex) + + @staticmethod + def check_status(): + uri = 'http://{0}:{1}/ping'.format(CONF.influxdb.ip_address, + CONF.influxdb.port) + try: + resp = requests.head(url=uri) + except Exception as ex: + LOG.exception(str(ex)) + return False, str(ex) + + return resp.ok, 'OK' if resp.ok else 'Error: {0}'.format( + resp.status_code) diff --git a/monasca_api/common/repositories/metrics_repository.py b/monasca_api/common/repositories/metrics_repository.py index e40e3b1b0..c63f98894 100644 --- a/monasca_api/common/repositories/metrics_repository.py +++ b/monasca_api/common/repositories/metrics_repository.py @@ -56,3 +56,8 @@ class AbstractMetricsRepository(object): def alarm_history(self, tenant_id, alarm_id_list, offset, limit, start_timestamp, end_timestamp): pass + + @staticmethod + @abc.abstractmethod + def check_status(): + pass diff --git a/monasca_api/healthcheck/metrics_db_check.py b/monasca_api/healthcheck/metrics_db_check.py index 4839b9f60..9c368abd6 100644 --- a/monasca_api/healthcheck/metrics_db_check.py +++ b/monasca_api/healthcheck/metrics_db_check.py @@ -13,17 +13,14 @@ # License for the specific language governing permissions and limitations # under the License. -import requests - +from monasca_common.simport import simport from oslo_config import cfg from oslo_log import log -from oslo_utils import importutils -from monasca_api.common.repositories import exceptions from monasca_api.healthcheck import base -LOG = log.getLogger(__name__) CONF = cfg.CONF +LOG = log.getLogger(__name__) class MetricsDbCheck(base.BaseHealthCheck): @@ -32,67 +29,20 @@ class MetricsDbCheck(base.BaseHealthCheck): Healthcheck what type of database is used (InfluxDB, Cassandra) and provide health according to the given db. - Healthcheck for InfluxDB verifies if: - * check the db status by the /ping endpoint. - - Healthcheck for the Cassandra verifies if: - * Cassandra is up and running (it is possible to create new connection) - * keyspace exists - If following conditions are met health check return healthy status. Otherwise unhealthy status is returned with explanation. """ def __init__(self): - # Try to import cassandra. Not a problem if it can't be imported as long - # as the metrics db is influx - self._cluster = importutils.try_import('cassandra.cluster', None) - metric_driver = CONF.repositories.metrics_driver - self._db = self._detected_database_type(metric_driver) - if self._db == 'cassandra' and self._cluster is None: - # Should not happen, but log if it does somehow - LOG.error("Metrics Database is Cassandra but cassandra.cluster" - "not importable. Unable to do health check") + try: + self._metrics_repo = simport.load( + CONF.repositories.metrics_driver) + + except Exception as ex: + LOG.exception(ex) + raise def health_check(self): - if self._db == 'influxdb': - status = self._check_influxdb_status() - else: - status = self._check_cassandra_status() - + status = self._metrics_repo.check_status() return base.CheckResult(healthy=status[0], message=status[1]) - - def _detected_database_type(self, driver): - if 'influxdb' in driver: - return 'influxdb' - elif 'cassandra' in driver: - return 'cassandra' - else: - raise exceptions.UnsupportedDriverException( - 'Driver {0} is not supported by Healthcheck'.format(driver)) - - def _check_influxdb_status(self): - uri = 'http://{0}:{1}/ping'.format(CONF.influxdb.ip_address, - CONF.influxdb.port) - try: - resp = requests.head(url=uri) - except Exception as ex: - LOG.exception(str(ex)) - return False, str(ex) - return resp.ok, 'OK' if resp.ok else 'Error: {0}'.format( - resp.status_code) - - def _check_cassandra_status(self): - if self._cluster is None: - return False, "Cassandra driver not imported" - try: - cassandra = self._cluster.Cluster( - CONF.cassandra.cluster_ip_addresses - ) - session = cassandra.connect(CONF.cassandra.keyspace) - session.shutdown() - except Exception as ex: - LOG.exception(str(ex)) - return False, str(ex) - return True, 'OK' diff --git a/monasca_api/tests/test_healthchecks.py b/monasca_api/tests/test_healthchecks.py index 5589fb6a6..03016b8c4 100644 --- a/monasca_api/tests/test_healthchecks.py +++ b/monasca_api/tests/test_healthchecks.py @@ -35,7 +35,9 @@ class TestHealthChecks(test_base.BaseApiTestCase): ) @mock.patch('monasca_api.healthcheck.alarms_db_check.sql_repository.get_engine') - def test_should_return_200_for_head(self, _): + @mock.patch( + 'monasca_api.healthcheck.metrics_db_check.MetricsDbCheck') + def test_should_return_200_for_head(self, metrics_db_check, _): self.set_route() self.simulate_request(ENDPOINT, method='HEAD') self.assertEqual(falcon.HTTP_NO_CONTENT, self.srmock.status) diff --git a/monasca_api/tests/test_metrics_db_health_check.py b/monasca_api/tests/test_metrics_db_health_check.py index e1ef69520..53cbadb2e 100644 --- a/monasca_api/tests/test_metrics_db_health_check.py +++ b/monasca_api/tests/test_metrics_db_health_check.py @@ -13,12 +13,10 @@ # License for the specific language governing permissions and limitations # under the License. -from cassandra import cluster as cl -import requests - import mock -from monasca_api.common.repositories import exceptions +from monasca_common.simport import simport + from monasca_api import config from monasca_api.healthcheck import metrics_db_check as tdc from monasca_api.tests import base @@ -27,183 +25,31 @@ CONF = config.CONF class TestMetricsDbHealthCheck(base.BaseTestCase): - cassandra_conf = { - 'cluster_ip_addresses': 'localhost', - 'keyspace': 'test' - } - def __init__(self, *args, **kwargs): - super(TestMetricsDbHealthCheck, self).__init__(*args, **kwargs) - self._conf = None - - def setUp(self): - super(TestMetricsDbHealthCheck, self).setUp() - self.conf_default(group='cassandra', **self.cassandra_conf) - - def test_should_detect_influxdb_db(self): + @mock.patch("monasca_api.healthcheck.metrics_db_check.simport") + def test_health_check(self, simport_mock): + metrics_repo_mock = simport_mock.load.return_value + metrics_repo_mock.check_status.return_value = (True, 'OK') db_health = tdc.MetricsDbCheck() - # check if influxdb is detected - self.assertEqual('influxdb', db_health._detected_database_type( - 'influxdb.metrics_repository')) - - def test_should_detect_cassandra_db(self): - db_health = tdc.MetricsDbCheck() - # check if cassandra is detected - self.assertEqual('cassandra', db_health._detected_database_type( - 'cassandra.metrics_repository')) - - def test_should_raise_exception_during_db_detection(self): - db_health = tdc.MetricsDbCheck() - # check exception - db = 'postgresql.metrics_repository' - self.assertRaises(exceptions.UnsupportedDriverException, db_health._detected_database_type, db) - - @mock.patch.object(requests, 'head') - def test_should_fail_influxdb_connection(self, req): - response_mock = mock.Mock() - response_mock.ok = False - response_mock.status_code = 500 - req.return_value = response_mock - - influxdb_conf = { - 'ip_address': 'localhost', - 'port': 8086 - } - messaging_conf = { - 'metrics_driver': 'influxdb.metrics_repository:MetricsRepository' - } - self.conf_override(group='repositories', **messaging_conf) - self.conf_override(group='influxdb', **influxdb_conf) - - db_health = tdc.MetricsDbCheck() - result = db_health.health_check() - - self.assertFalse(result.healthy) - self.assertEqual('Error: 500', result.message) - - @mock.patch.object(requests, 'head') - def test_should_fail_influxdb_wrong_port_number(self, req): - response_mock = mock.Mock() - response_mock.ok = False - response_mock.status_code = 404 - req.return_value = response_mock - influxdb_conf = { - 'ip_address': 'localhost', - 'port': 8099 - } - messaging_conf = { - 'metrics_driver': 'influxdb.metrics_repository:MetricsRepository' - } - self.conf_override(group='repositories', **messaging_conf) - self.conf_override(group='influxdb', **influxdb_conf) - - db_health = tdc.MetricsDbCheck() - result = db_health.health_check() - - self.assertFalse(result.healthy) - self.assertEqual('Error: 404', result.message) - - @mock.patch.object(requests, 'head') - def test_should_fail_influxdb_service_unavailable(self, req): - response_mock = mock.Mock() - req.side_effect = requests.HTTPError() - req.return_value = response_mock - influxdb_conf = { - 'ip_address': 'localhost', - 'port': 8096 - } - messaging_conf = { - 'metrics_driver': 'influxdb.metrics_repository:MetricsRepository' - } - self.conf_override(group='repositories', **messaging_conf) - self.conf_override(group='influxdb', **influxdb_conf) - - db_health = tdc.MetricsDbCheck() - result = db_health.health_check() - - self.assertFalse(result.healthy) - - @mock.patch.object(requests, 'head') - def test_should_pass_infuxdb_available(self, req): - response_mock = mock.Mock() - response_mock.ok = True - response_mock.status_code = 204 - req.return_value = response_mock - influxdb_conf = { - 'ip_address': 'localhost', - 'port': 8086 - } - messaging_conf = { - 'metrics_driver': 'influxdb.metrics_repository:MetricsRepository' - } - self.conf_override(group='repositories', **messaging_conf) - self.conf_override(group='influxdb', **influxdb_conf) - - db_health = tdc.MetricsDbCheck() result = db_health.health_check() self.assertTrue(result.healthy) - self.assertEqual('OK', result.message) - - @mock.patch('monasca_api.healthcheck.metrics_db_check.importutils.try_import') - def test_should_fail_cassandra_unavailable(self, try_import): - messaging_conf = { - 'metrics_driver': 'cassandra.metrics_repository:MetricsRepository' - } - cassandra_conf = { - 'cluster_ip_addresses': 'localhost', - 'keyspace': 'test' - } - self.conf_override(group='repositories', **messaging_conf) - self.conf_override(group='cassandra', **cassandra_conf) - - cluster = mock.Mock() - cas_mock = mock.Mock() - cas_mock.side_effect = cl.NoHostAvailable(message='Host unavailable', - errors='Unavailable') - cluster.Cluster = cas_mock - try_import.return_value = cluster + self.assertEqual(result.message, 'OK') + @mock.patch("monasca_api.healthcheck.metrics_db_check.simport") + def test_health_check_failed(self, simport_mock): + metrics_repo_mock = simport_mock.load.return_value + metrics_repo_mock.check_status.return_value = (False, 'Error') db_health = tdc.MetricsDbCheck() + result = db_health.health_check() self.assertFalse(result.healthy) + self.assertEqual(result.message, 'Error') - @mock.patch('monasca_api.healthcheck.metrics_db_check.importutils.try_import') - def test_should_fail_cassandra_no_driver(self, try_import): - messaging_conf = { - 'metrics_driver': 'cassandra.metrics_repository:MetricsRepository' - } - cassandra_conf = { - 'cluster_ip_addresses': 'localhost', - 'keyspace': 'test' - } - self.conf_override(group='repositories', **messaging_conf) - self.conf_override(group='cassandra', **cassandra_conf) - - # Simulate cassandra driver not available - try_import.return_value = None - - db_health = tdc.MetricsDbCheck() - db_health.cluster = None - result = db_health.health_check() - - self.assertFalse(result.healthy) - - @mock.patch('monasca_api.healthcheck.metrics_db_check.importutils.try_import') - def test_should_pass_cassandra_is_available(self, _): - messaging_conf = { - 'metrics_driver': 'cassandra.metrics_repository:MetricsRepository' - } - cassandra_conf = { - 'cluster_ip_addresses': 'localhost', - 'keyspace': 'test' - } - self.conf_override(group='repositories', **messaging_conf) - self.conf_override(group='cassandra', **cassandra_conf) - - db_health = tdc.MetricsDbCheck() - result = db_health.health_check() - - self.assertTrue(result.healthy) + @mock.patch("monasca_api.healthcheck.metrics_db_check.simport") + def test_health_check_load_failed(self, simport_mock): + simport_mock.load.side_effect = simport.ImportFailed( + "Failed to import 'foo'. Error: bar") + self.assertRaises(simport.ImportFailed, tdc.MetricsDbCheck) diff --git a/monasca_api/tests/test_repositories.py b/monasca_api/tests/test_repositories.py index 6ef39eb21..431d56657 100644 --- a/monasca_api/tests/test_repositories.py +++ b/monasca_api/tests/test_repositories.py @@ -18,6 +18,7 @@ import binascii from collections import namedtuple from datetime import datetime +import cassandra from mock import patch from oslo_config import cfg @@ -185,6 +186,26 @@ class TestRepoMetricsInfluxDB(base.BaseTestCase): {u'dimension_name': u'service'} ]) + @patch("monasca_api.common.repositories.influxdb." + "metrics_repository.requests.head") + def test_check_status(self, head_mock): + head_mock.return_value.ok = True + head_mock.return_value.status_code = 204 + + result = influxdb_repo.MetricsRepository.check_status() + + self.assertEqual(result, (True, 'OK')) + + @patch("monasca_api.common.repositories.influxdb." + "metrics_repository.requests.head") + def test_check_status_server_error(self, head_mock): + head_mock.return_value.status_code = 500 + head_mock.return_value.ok = False + + result = influxdb_repo.MetricsRepository.check_status() + + self.assertEqual(result, (False, 'Error: 500')) + class TestRepoMetricsCassandra(base.BaseTestCase): @@ -479,6 +500,26 @@ class TestRepoMetricsCassandra(base.BaseTestCase): ] }], result) + @patch("monasca_api.common.repositories.cassandra." + "metrics_repository.Cluster.connect") + def test_check_status(self, _): + repo = cassandra_repo.MetricsRepository() + + result = repo.check_status() + + self.assertEqual(result, (True, 'OK')) + + @patch("monasca_api.common.repositories.cassandra." + "metrics_repository.Cluster.connect") + def test_check_status_server_error(self, cassandra_connect_mock): + repo = cassandra_repo.MetricsRepository() + cassandra_connect_mock.side_effect = \ + cassandra.DriverException("Cluster is already shut down") + + result = repo.check_status() + + self.assertEqual(result, (False, 'Cluster is already shut down')) + @staticmethod def _convert_time_string(date_time_string): dt = timeutils.parse_isotime(date_time_string)