metrics_db_check: move check methods to each metrics drivers

monasca_api.healthcheck.metrics_db_check has two healthcheck methods for
InfluxDB and Cassandra. They should be in each metrics_repository.

Change-Id: I6c9cc637eba6c3096b5319979434cfd9f0b78a9a
This commit is contained in:
Akira Yoshiyama 2017-09-07 11:13:31 +09:00
parent 37b2111435
commit 0915993633
7 changed files with 108 additions and 237 deletions

View File

@ -29,6 +29,7 @@ from monasca_common.rest import utils as rest_utils
from monasca_api.common.repositories import exceptions from monasca_api.common.repositories import exceptions
from monasca_api.common.repositories import metrics_repository from monasca_api.common.repositories import metrics_repository
CONF = cfg.CONF
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@ -36,13 +37,11 @@ class MetricsRepository(metrics_repository.AbstractMetricsRepository):
def __init__(self): def __init__(self):
try: try:
self.conf = cfg.CONF
self._cassandra_cluster = Cluster( self._cassandra_cluster = Cluster(
self.conf.cassandra.cluster_ip_addresses CONF.cassandra.cluster_ip_addresses
) )
self.cassandra_session = self._cassandra_cluster.connect( self.cassandra_session = self._cassandra_cluster.connect(
self.conf.cassandra.keyspace CONF.cassandra.keyspace
) )
except Exception as ex: except Exception as ex:
LOG.exception(ex) LOG.exception(ex)
@ -744,3 +743,16 @@ class MetricsRepository(metrics_repository.AbstractMetricsRepository):
except Exception as ex: except Exception as ex:
LOG.exception(ex) LOG.exception(ex)
raise exceptions.RepositoryException(ex) raise exceptions.RepositoryException(ex)
@staticmethod
def check_status():
try:
cluster = Cluster(
CONF.cassandra.cluster_ip_addresses
)
session = cluster.connect(CONF.cassandra.keyspace)
session.shutdown()
except Exception as ex:
LOG.exception(str(ex))
return False, str(ex)
return True, 'OK'

View File

@ -21,6 +21,7 @@ from influxdb.exceptions import InfluxDBClientError
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log from oslo_log import log
from oslo_utils import timeutils from oslo_utils import timeutils
import requests
from monasca_common.rest import utils as rest_utils from monasca_common.rest import utils as rest_utils
@ -29,6 +30,7 @@ from monasca_api.common.repositories import metrics_repository
MEASUREMENT_NOT_FOUND_MSG = "measurement not found" MEASUREMENT_NOT_FOUND_MSG = "measurement not found"
CONF = cfg.CONF
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@ -902,3 +904,16 @@ class MetricsRepository(metrics_repository.AbstractMetricsRepository):
except Exception as ex: except Exception as ex:
LOG.exception(ex) LOG.exception(ex)
raise exceptions.RepositoryException(ex) raise exceptions.RepositoryException(ex)
@staticmethod
def check_status():
uri = 'http://{0}:{1}/ping'.format(CONF.influxdb.ip_address,
CONF.influxdb.port)
try:
resp = requests.head(url=uri)
except Exception as ex:
LOG.exception(str(ex))
return False, str(ex)
return resp.ok, 'OK' if resp.ok else 'Error: {0}'.format(
resp.status_code)

View File

@ -56,3 +56,8 @@ class AbstractMetricsRepository(object):
def alarm_history(self, tenant_id, alarm_id_list, def alarm_history(self, tenant_id, alarm_id_list,
offset, limit, start_timestamp, end_timestamp): offset, limit, start_timestamp, end_timestamp):
pass pass
@staticmethod
@abc.abstractmethod
def check_status():
pass

View File

@ -13,17 +13,14 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import requests from monasca_common.simport import simport
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log from oslo_log import log
from oslo_utils import importutils
from monasca_api.common.repositories import exceptions
from monasca_api.healthcheck import base from monasca_api.healthcheck import base
LOG = log.getLogger(__name__)
CONF = cfg.CONF CONF = cfg.CONF
LOG = log.getLogger(__name__)
class MetricsDbCheck(base.BaseHealthCheck): class MetricsDbCheck(base.BaseHealthCheck):
@ -32,67 +29,20 @@ class MetricsDbCheck(base.BaseHealthCheck):
Healthcheck what type of database is used (InfluxDB, Cassandra) Healthcheck what type of database is used (InfluxDB, Cassandra)
and provide health according to the given db. and provide health according to the given db.
Healthcheck for InfluxDB verifies if:
* check the db status by the /ping endpoint.
Healthcheck for the Cassandra verifies if:
* Cassandra is up and running (it is possible to create new connection)
* keyspace exists
If following conditions are met health check return healthy status. If following conditions are met health check return healthy status.
Otherwise unhealthy status is returned with explanation. Otherwise unhealthy status is returned with explanation.
""" """
def __init__(self): def __init__(self):
# Try to import cassandra. Not a problem if it can't be imported as long try:
# as the metrics db is influx self._metrics_repo = simport.load(
self._cluster = importutils.try_import('cassandra.cluster', None) CONF.repositories.metrics_driver)
metric_driver = CONF.repositories.metrics_driver
self._db = self._detected_database_type(metric_driver) except Exception as ex:
if self._db == 'cassandra' and self._cluster is None: LOG.exception(ex)
# Should not happen, but log if it does somehow raise
LOG.error("Metrics Database is Cassandra but cassandra.cluster"
"not importable. Unable to do health check")
def health_check(self): def health_check(self):
if self._db == 'influxdb': status = self._metrics_repo.check_status()
status = self._check_influxdb_status()
else:
status = self._check_cassandra_status()
return base.CheckResult(healthy=status[0], return base.CheckResult(healthy=status[0],
message=status[1]) message=status[1])
def _detected_database_type(self, driver):
if 'influxdb' in driver:
return 'influxdb'
elif 'cassandra' in driver:
return 'cassandra'
else:
raise exceptions.UnsupportedDriverException(
'Driver {0} is not supported by Healthcheck'.format(driver))
def _check_influxdb_status(self):
uri = 'http://{0}:{1}/ping'.format(CONF.influxdb.ip_address,
CONF.influxdb.port)
try:
resp = requests.head(url=uri)
except Exception as ex:
LOG.exception(str(ex))
return False, str(ex)
return resp.ok, 'OK' if resp.ok else 'Error: {0}'.format(
resp.status_code)
def _check_cassandra_status(self):
if self._cluster is None:
return False, "Cassandra driver not imported"
try:
cassandra = self._cluster.Cluster(
CONF.cassandra.cluster_ip_addresses
)
session = cassandra.connect(CONF.cassandra.keyspace)
session.shutdown()
except Exception as ex:
LOG.exception(str(ex))
return False, str(ex)
return True, 'OK'

View File

@ -35,7 +35,9 @@ class TestHealthChecks(test_base.BaseApiTestCase):
) )
@mock.patch('monasca_api.healthcheck.alarms_db_check.sql_repository.get_engine') @mock.patch('monasca_api.healthcheck.alarms_db_check.sql_repository.get_engine')
def test_should_return_200_for_head(self, _): @mock.patch(
'monasca_api.healthcheck.metrics_db_check.MetricsDbCheck')
def test_should_return_200_for_head(self, metrics_db_check, _):
self.set_route() self.set_route()
self.simulate_request(ENDPOINT, method='HEAD') self.simulate_request(ENDPOINT, method='HEAD')
self.assertEqual(falcon.HTTP_NO_CONTENT, self.srmock.status) self.assertEqual(falcon.HTTP_NO_CONTENT, self.srmock.status)

View File

@ -13,12 +13,10 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from cassandra import cluster as cl
import requests
import mock import mock
from monasca_api.common.repositories import exceptions from monasca_common.simport import simport
from monasca_api import config from monasca_api import config
from monasca_api.healthcheck import metrics_db_check as tdc from monasca_api.healthcheck import metrics_db_check as tdc
from monasca_api.tests import base from monasca_api.tests import base
@ -27,183 +25,31 @@ CONF = config.CONF
class TestMetricsDbHealthCheck(base.BaseTestCase): class TestMetricsDbHealthCheck(base.BaseTestCase):
cassandra_conf = {
'cluster_ip_addresses': 'localhost',
'keyspace': 'test'
}
def __init__(self, *args, **kwargs): @mock.patch("monasca_api.healthcheck.metrics_db_check.simport")
super(TestMetricsDbHealthCheck, self).__init__(*args, **kwargs) def test_health_check(self, simport_mock):
self._conf = None metrics_repo_mock = simport_mock.load.return_value
metrics_repo_mock.check_status.return_value = (True, 'OK')
def setUp(self):
super(TestMetricsDbHealthCheck, self).setUp()
self.conf_default(group='cassandra', **self.cassandra_conf)
def test_should_detect_influxdb_db(self):
db_health = tdc.MetricsDbCheck() db_health = tdc.MetricsDbCheck()
# check if influxdb is detected
self.assertEqual('influxdb', db_health._detected_database_type(
'influxdb.metrics_repository'))
def test_should_detect_cassandra_db(self):
db_health = tdc.MetricsDbCheck()
# check if cassandra is detected
self.assertEqual('cassandra', db_health._detected_database_type(
'cassandra.metrics_repository'))
def test_should_raise_exception_during_db_detection(self):
db_health = tdc.MetricsDbCheck()
# check exception
db = 'postgresql.metrics_repository'
self.assertRaises(exceptions.UnsupportedDriverException, db_health._detected_database_type, db)
@mock.patch.object(requests, 'head')
def test_should_fail_influxdb_connection(self, req):
response_mock = mock.Mock()
response_mock.ok = False
response_mock.status_code = 500
req.return_value = response_mock
influxdb_conf = {
'ip_address': 'localhost',
'port': 8086
}
messaging_conf = {
'metrics_driver': 'influxdb.metrics_repository:MetricsRepository'
}
self.conf_override(group='repositories', **messaging_conf)
self.conf_override(group='influxdb', **influxdb_conf)
db_health = tdc.MetricsDbCheck()
result = db_health.health_check()
self.assertFalse(result.healthy)
self.assertEqual('Error: 500', result.message)
@mock.patch.object(requests, 'head')
def test_should_fail_influxdb_wrong_port_number(self, req):
response_mock = mock.Mock()
response_mock.ok = False
response_mock.status_code = 404
req.return_value = response_mock
influxdb_conf = {
'ip_address': 'localhost',
'port': 8099
}
messaging_conf = {
'metrics_driver': 'influxdb.metrics_repository:MetricsRepository'
}
self.conf_override(group='repositories', **messaging_conf)
self.conf_override(group='influxdb', **influxdb_conf)
db_health = tdc.MetricsDbCheck()
result = db_health.health_check()
self.assertFalse(result.healthy)
self.assertEqual('Error: 404', result.message)
@mock.patch.object(requests, 'head')
def test_should_fail_influxdb_service_unavailable(self, req):
response_mock = mock.Mock()
req.side_effect = requests.HTTPError()
req.return_value = response_mock
influxdb_conf = {
'ip_address': 'localhost',
'port': 8096
}
messaging_conf = {
'metrics_driver': 'influxdb.metrics_repository:MetricsRepository'
}
self.conf_override(group='repositories', **messaging_conf)
self.conf_override(group='influxdb', **influxdb_conf)
db_health = tdc.MetricsDbCheck()
result = db_health.health_check()
self.assertFalse(result.healthy)
@mock.patch.object(requests, 'head')
def test_should_pass_infuxdb_available(self, req):
response_mock = mock.Mock()
response_mock.ok = True
response_mock.status_code = 204
req.return_value = response_mock
influxdb_conf = {
'ip_address': 'localhost',
'port': 8086
}
messaging_conf = {
'metrics_driver': 'influxdb.metrics_repository:MetricsRepository'
}
self.conf_override(group='repositories', **messaging_conf)
self.conf_override(group='influxdb', **influxdb_conf)
db_health = tdc.MetricsDbCheck()
result = db_health.health_check() result = db_health.health_check()
self.assertTrue(result.healthy) self.assertTrue(result.healthy)
self.assertEqual('OK', result.message) self.assertEqual(result.message, 'OK')
@mock.patch('monasca_api.healthcheck.metrics_db_check.importutils.try_import')
def test_should_fail_cassandra_unavailable(self, try_import):
messaging_conf = {
'metrics_driver': 'cassandra.metrics_repository:MetricsRepository'
}
cassandra_conf = {
'cluster_ip_addresses': 'localhost',
'keyspace': 'test'
}
self.conf_override(group='repositories', **messaging_conf)
self.conf_override(group='cassandra', **cassandra_conf)
cluster = mock.Mock()
cas_mock = mock.Mock()
cas_mock.side_effect = cl.NoHostAvailable(message='Host unavailable',
errors='Unavailable')
cluster.Cluster = cas_mock
try_import.return_value = cluster
@mock.patch("monasca_api.healthcheck.metrics_db_check.simport")
def test_health_check_failed(self, simport_mock):
metrics_repo_mock = simport_mock.load.return_value
metrics_repo_mock.check_status.return_value = (False, 'Error')
db_health = tdc.MetricsDbCheck() db_health = tdc.MetricsDbCheck()
result = db_health.health_check() result = db_health.health_check()
self.assertFalse(result.healthy) self.assertFalse(result.healthy)
self.assertEqual(result.message, 'Error')
@mock.patch('monasca_api.healthcheck.metrics_db_check.importutils.try_import') @mock.patch("monasca_api.healthcheck.metrics_db_check.simport")
def test_should_fail_cassandra_no_driver(self, try_import): def test_health_check_load_failed(self, simport_mock):
messaging_conf = { simport_mock.load.side_effect = simport.ImportFailed(
'metrics_driver': 'cassandra.metrics_repository:MetricsRepository' "Failed to import 'foo'. Error: bar")
} self.assertRaises(simport.ImportFailed, tdc.MetricsDbCheck)
cassandra_conf = {
'cluster_ip_addresses': 'localhost',
'keyspace': 'test'
}
self.conf_override(group='repositories', **messaging_conf)
self.conf_override(group='cassandra', **cassandra_conf)
# Simulate cassandra driver not available
try_import.return_value = None
db_health = tdc.MetricsDbCheck()
db_health.cluster = None
result = db_health.health_check()
self.assertFalse(result.healthy)
@mock.patch('monasca_api.healthcheck.metrics_db_check.importutils.try_import')
def test_should_pass_cassandra_is_available(self, _):
messaging_conf = {
'metrics_driver': 'cassandra.metrics_repository:MetricsRepository'
}
cassandra_conf = {
'cluster_ip_addresses': 'localhost',
'keyspace': 'test'
}
self.conf_override(group='repositories', **messaging_conf)
self.conf_override(group='cassandra', **cassandra_conf)
db_health = tdc.MetricsDbCheck()
result = db_health.health_check()
self.assertTrue(result.healthy)

View File

@ -18,6 +18,7 @@ import binascii
from collections import namedtuple from collections import namedtuple
from datetime import datetime from datetime import datetime
import cassandra
from mock import patch from mock import patch
from oslo_config import cfg from oslo_config import cfg
@ -185,6 +186,26 @@ class TestRepoMetricsInfluxDB(base.BaseTestCase):
{u'dimension_name': u'service'} {u'dimension_name': u'service'}
]) ])
@patch("monasca_api.common.repositories.influxdb."
"metrics_repository.requests.head")
def test_check_status(self, head_mock):
head_mock.return_value.ok = True
head_mock.return_value.status_code = 204
result = influxdb_repo.MetricsRepository.check_status()
self.assertEqual(result, (True, 'OK'))
@patch("monasca_api.common.repositories.influxdb."
"metrics_repository.requests.head")
def test_check_status_server_error(self, head_mock):
head_mock.return_value.status_code = 500
head_mock.return_value.ok = False
result = influxdb_repo.MetricsRepository.check_status()
self.assertEqual(result, (False, 'Error: 500'))
class TestRepoMetricsCassandra(base.BaseTestCase): class TestRepoMetricsCassandra(base.BaseTestCase):
@ -479,6 +500,26 @@ class TestRepoMetricsCassandra(base.BaseTestCase):
] ]
}], result) }], result)
@patch("monasca_api.common.repositories.cassandra."
"metrics_repository.Cluster.connect")
def test_check_status(self, _):
repo = cassandra_repo.MetricsRepository()
result = repo.check_status()
self.assertEqual(result, (True, 'OK'))
@patch("monasca_api.common.repositories.cassandra."
"metrics_repository.Cluster.connect")
def test_check_status_server_error(self, cassandra_connect_mock):
repo = cassandra_repo.MetricsRepository()
cassandra_connect_mock.side_effect = \
cassandra.DriverException("Cluster is already shut down")
result = repo.check_status()
self.assertEqual(result, (False, 'Cluster is already shut down'))
@staticmethod @staticmethod
def _convert_time_string(date_time_string): def _convert_time_string(date_time_string):
dt = timeutils.parse_isotime(date_time_string) dt = timeutils.parse_isotime(date_time_string)