metrics_db_check: move check methods to each metrics drivers

monasca_api.healthcheck.metrics_db_check has two healthcheck methods for
InfluxDB and Cassandra. They should be in each metrics_repository.

Change-Id: I6c9cc637eba6c3096b5319979434cfd9f0b78a9a
This commit is contained in:
Akira Yoshiyama 2017-09-07 11:13:31 +09:00
parent 37b2111435
commit 0915993633
7 changed files with 108 additions and 237 deletions

View File

@ -29,6 +29,7 @@ from monasca_common.rest import utils as rest_utils
from monasca_api.common.repositories import exceptions
from monasca_api.common.repositories import metrics_repository
CONF = cfg.CONF
LOG = log.getLogger(__name__)
@ -36,13 +37,11 @@ class MetricsRepository(metrics_repository.AbstractMetricsRepository):
def __init__(self):
try:
self.conf = cfg.CONF
self._cassandra_cluster = Cluster(
self.conf.cassandra.cluster_ip_addresses
CONF.cassandra.cluster_ip_addresses
)
self.cassandra_session = self._cassandra_cluster.connect(
self.conf.cassandra.keyspace
CONF.cassandra.keyspace
)
except Exception as ex:
LOG.exception(ex)
@ -744,3 +743,16 @@ class MetricsRepository(metrics_repository.AbstractMetricsRepository):
except Exception as ex:
LOG.exception(ex)
raise exceptions.RepositoryException(ex)
@staticmethod
def check_status():
try:
cluster = Cluster(
CONF.cassandra.cluster_ip_addresses
)
session = cluster.connect(CONF.cassandra.keyspace)
session.shutdown()
except Exception as ex:
LOG.exception(str(ex))
return False, str(ex)
return True, 'OK'

View File

@ -21,6 +21,7 @@ from influxdb.exceptions import InfluxDBClientError
from oslo_config import cfg
from oslo_log import log
from oslo_utils import timeutils
import requests
from monasca_common.rest import utils as rest_utils
@ -29,6 +30,7 @@ from monasca_api.common.repositories import metrics_repository
MEASUREMENT_NOT_FOUND_MSG = "measurement not found"
CONF = cfg.CONF
LOG = log.getLogger(__name__)
@ -902,3 +904,16 @@ class MetricsRepository(metrics_repository.AbstractMetricsRepository):
except Exception as ex:
LOG.exception(ex)
raise exceptions.RepositoryException(ex)
@staticmethod
def check_status():
uri = 'http://{0}:{1}/ping'.format(CONF.influxdb.ip_address,
CONF.influxdb.port)
try:
resp = requests.head(url=uri)
except Exception as ex:
LOG.exception(str(ex))
return False, str(ex)
return resp.ok, 'OK' if resp.ok else 'Error: {0}'.format(
resp.status_code)

View File

@ -56,3 +56,8 @@ class AbstractMetricsRepository(object):
def alarm_history(self, tenant_id, alarm_id_list,
offset, limit, start_timestamp, end_timestamp):
pass
@staticmethod
@abc.abstractmethod
def check_status():
pass

View File

@ -13,17 +13,14 @@
# License for the specific language governing permissions and limitations
# under the License.
import requests
from monasca_common.simport import simport
from oslo_config import cfg
from oslo_log import log
from oslo_utils import importutils
from monasca_api.common.repositories import exceptions
from monasca_api.healthcheck import base
LOG = log.getLogger(__name__)
CONF = cfg.CONF
LOG = log.getLogger(__name__)
class MetricsDbCheck(base.BaseHealthCheck):
@ -32,67 +29,20 @@ class MetricsDbCheck(base.BaseHealthCheck):
Healthcheck what type of database is used (InfluxDB, Cassandra)
and provide health according to the given db.
Healthcheck for InfluxDB verifies if:
* check the db status by the /ping endpoint.
Healthcheck for the Cassandra verifies if:
* Cassandra is up and running (it is possible to create new connection)
* keyspace exists
If following conditions are met health check return healthy status.
Otherwise unhealthy status is returned with explanation.
"""
def __init__(self):
# Try to import cassandra. Not a problem if it can't be imported as long
# as the metrics db is influx
self._cluster = importutils.try_import('cassandra.cluster', None)
metric_driver = CONF.repositories.metrics_driver
self._db = self._detected_database_type(metric_driver)
if self._db == 'cassandra' and self._cluster is None:
# Should not happen, but log if it does somehow
LOG.error("Metrics Database is Cassandra but cassandra.cluster"
"not importable. Unable to do health check")
try:
self._metrics_repo = simport.load(
CONF.repositories.metrics_driver)
except Exception as ex:
LOG.exception(ex)
raise
def health_check(self):
if self._db == 'influxdb':
status = self._check_influxdb_status()
else:
status = self._check_cassandra_status()
status = self._metrics_repo.check_status()
return base.CheckResult(healthy=status[0],
message=status[1])
def _detected_database_type(self, driver):
if 'influxdb' in driver:
return 'influxdb'
elif 'cassandra' in driver:
return 'cassandra'
else:
raise exceptions.UnsupportedDriverException(
'Driver {0} is not supported by Healthcheck'.format(driver))
def _check_influxdb_status(self):
uri = 'http://{0}:{1}/ping'.format(CONF.influxdb.ip_address,
CONF.influxdb.port)
try:
resp = requests.head(url=uri)
except Exception as ex:
LOG.exception(str(ex))
return False, str(ex)
return resp.ok, 'OK' if resp.ok else 'Error: {0}'.format(
resp.status_code)
def _check_cassandra_status(self):
if self._cluster is None:
return False, "Cassandra driver not imported"
try:
cassandra = self._cluster.Cluster(
CONF.cassandra.cluster_ip_addresses
)
session = cassandra.connect(CONF.cassandra.keyspace)
session.shutdown()
except Exception as ex:
LOG.exception(str(ex))
return False, str(ex)
return True, 'OK'

View File

@ -35,7 +35,9 @@ class TestHealthChecks(test_base.BaseApiTestCase):
)
@mock.patch('monasca_api.healthcheck.alarms_db_check.sql_repository.get_engine')
def test_should_return_200_for_head(self, _):
@mock.patch(
'monasca_api.healthcheck.metrics_db_check.MetricsDbCheck')
def test_should_return_200_for_head(self, metrics_db_check, _):
self.set_route()
self.simulate_request(ENDPOINT, method='HEAD')
self.assertEqual(falcon.HTTP_NO_CONTENT, self.srmock.status)

View File

@ -13,12 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from cassandra import cluster as cl
import requests
import mock
from monasca_api.common.repositories import exceptions
from monasca_common.simport import simport
from monasca_api import config
from monasca_api.healthcheck import metrics_db_check as tdc
from monasca_api.tests import base
@ -27,183 +25,31 @@ CONF = config.CONF
class TestMetricsDbHealthCheck(base.BaseTestCase):
cassandra_conf = {
'cluster_ip_addresses': 'localhost',
'keyspace': 'test'
}
def __init__(self, *args, **kwargs):
super(TestMetricsDbHealthCheck, self).__init__(*args, **kwargs)
self._conf = None
def setUp(self):
super(TestMetricsDbHealthCheck, self).setUp()
self.conf_default(group='cassandra', **self.cassandra_conf)
def test_should_detect_influxdb_db(self):
@mock.patch("monasca_api.healthcheck.metrics_db_check.simport")
def test_health_check(self, simport_mock):
metrics_repo_mock = simport_mock.load.return_value
metrics_repo_mock.check_status.return_value = (True, 'OK')
db_health = tdc.MetricsDbCheck()
# check if influxdb is detected
self.assertEqual('influxdb', db_health._detected_database_type(
'influxdb.metrics_repository'))
def test_should_detect_cassandra_db(self):
db_health = tdc.MetricsDbCheck()
# check if cassandra is detected
self.assertEqual('cassandra', db_health._detected_database_type(
'cassandra.metrics_repository'))
def test_should_raise_exception_during_db_detection(self):
db_health = tdc.MetricsDbCheck()
# check exception
db = 'postgresql.metrics_repository'
self.assertRaises(exceptions.UnsupportedDriverException, db_health._detected_database_type, db)
@mock.patch.object(requests, 'head')
def test_should_fail_influxdb_connection(self, req):
response_mock = mock.Mock()
response_mock.ok = False
response_mock.status_code = 500
req.return_value = response_mock
influxdb_conf = {
'ip_address': 'localhost',
'port': 8086
}
messaging_conf = {
'metrics_driver': 'influxdb.metrics_repository:MetricsRepository'
}
self.conf_override(group='repositories', **messaging_conf)
self.conf_override(group='influxdb', **influxdb_conf)
db_health = tdc.MetricsDbCheck()
result = db_health.health_check()
self.assertFalse(result.healthy)
self.assertEqual('Error: 500', result.message)
@mock.patch.object(requests, 'head')
def test_should_fail_influxdb_wrong_port_number(self, req):
response_mock = mock.Mock()
response_mock.ok = False
response_mock.status_code = 404
req.return_value = response_mock
influxdb_conf = {
'ip_address': 'localhost',
'port': 8099
}
messaging_conf = {
'metrics_driver': 'influxdb.metrics_repository:MetricsRepository'
}
self.conf_override(group='repositories', **messaging_conf)
self.conf_override(group='influxdb', **influxdb_conf)
db_health = tdc.MetricsDbCheck()
result = db_health.health_check()
self.assertFalse(result.healthy)
self.assertEqual('Error: 404', result.message)
@mock.patch.object(requests, 'head')
def test_should_fail_influxdb_service_unavailable(self, req):
response_mock = mock.Mock()
req.side_effect = requests.HTTPError()
req.return_value = response_mock
influxdb_conf = {
'ip_address': 'localhost',
'port': 8096
}
messaging_conf = {
'metrics_driver': 'influxdb.metrics_repository:MetricsRepository'
}
self.conf_override(group='repositories', **messaging_conf)
self.conf_override(group='influxdb', **influxdb_conf)
db_health = tdc.MetricsDbCheck()
result = db_health.health_check()
self.assertFalse(result.healthy)
@mock.patch.object(requests, 'head')
def test_should_pass_infuxdb_available(self, req):
response_mock = mock.Mock()
response_mock.ok = True
response_mock.status_code = 204
req.return_value = response_mock
influxdb_conf = {
'ip_address': 'localhost',
'port': 8086
}
messaging_conf = {
'metrics_driver': 'influxdb.metrics_repository:MetricsRepository'
}
self.conf_override(group='repositories', **messaging_conf)
self.conf_override(group='influxdb', **influxdb_conf)
db_health = tdc.MetricsDbCheck()
result = db_health.health_check()
self.assertTrue(result.healthy)
self.assertEqual('OK', result.message)
@mock.patch('monasca_api.healthcheck.metrics_db_check.importutils.try_import')
def test_should_fail_cassandra_unavailable(self, try_import):
messaging_conf = {
'metrics_driver': 'cassandra.metrics_repository:MetricsRepository'
}
cassandra_conf = {
'cluster_ip_addresses': 'localhost',
'keyspace': 'test'
}
self.conf_override(group='repositories', **messaging_conf)
self.conf_override(group='cassandra', **cassandra_conf)
cluster = mock.Mock()
cas_mock = mock.Mock()
cas_mock.side_effect = cl.NoHostAvailable(message='Host unavailable',
errors='Unavailable')
cluster.Cluster = cas_mock
try_import.return_value = cluster
self.assertEqual(result.message, 'OK')
@mock.patch("monasca_api.healthcheck.metrics_db_check.simport")
def test_health_check_failed(self, simport_mock):
metrics_repo_mock = simport_mock.load.return_value
metrics_repo_mock.check_status.return_value = (False, 'Error')
db_health = tdc.MetricsDbCheck()
result = db_health.health_check()
self.assertFalse(result.healthy)
self.assertEqual(result.message, 'Error')
@mock.patch('monasca_api.healthcheck.metrics_db_check.importutils.try_import')
def test_should_fail_cassandra_no_driver(self, try_import):
messaging_conf = {
'metrics_driver': 'cassandra.metrics_repository:MetricsRepository'
}
cassandra_conf = {
'cluster_ip_addresses': 'localhost',
'keyspace': 'test'
}
self.conf_override(group='repositories', **messaging_conf)
self.conf_override(group='cassandra', **cassandra_conf)
# Simulate cassandra driver not available
try_import.return_value = None
db_health = tdc.MetricsDbCheck()
db_health.cluster = None
result = db_health.health_check()
self.assertFalse(result.healthy)
@mock.patch('monasca_api.healthcheck.metrics_db_check.importutils.try_import')
def test_should_pass_cassandra_is_available(self, _):
messaging_conf = {
'metrics_driver': 'cassandra.metrics_repository:MetricsRepository'
}
cassandra_conf = {
'cluster_ip_addresses': 'localhost',
'keyspace': 'test'
}
self.conf_override(group='repositories', **messaging_conf)
self.conf_override(group='cassandra', **cassandra_conf)
db_health = tdc.MetricsDbCheck()
result = db_health.health_check()
self.assertTrue(result.healthy)
@mock.patch("monasca_api.healthcheck.metrics_db_check.simport")
def test_health_check_load_failed(self, simport_mock):
simport_mock.load.side_effect = simport.ImportFailed(
"Failed to import 'foo'. Error: bar")
self.assertRaises(simport.ImportFailed, tdc.MetricsDbCheck)

View File

@ -18,6 +18,7 @@ import binascii
from collections import namedtuple
from datetime import datetime
import cassandra
from mock import patch
from oslo_config import cfg
@ -185,6 +186,26 @@ class TestRepoMetricsInfluxDB(base.BaseTestCase):
{u'dimension_name': u'service'}
])
@patch("monasca_api.common.repositories.influxdb."
"metrics_repository.requests.head")
def test_check_status(self, head_mock):
head_mock.return_value.ok = True
head_mock.return_value.status_code = 204
result = influxdb_repo.MetricsRepository.check_status()
self.assertEqual(result, (True, 'OK'))
@patch("monasca_api.common.repositories.influxdb."
"metrics_repository.requests.head")
def test_check_status_server_error(self, head_mock):
head_mock.return_value.status_code = 500
head_mock.return_value.ok = False
result = influxdb_repo.MetricsRepository.check_status()
self.assertEqual(result, (False, 'Error: 500'))
class TestRepoMetricsCassandra(base.BaseTestCase):
@ -479,6 +500,26 @@ class TestRepoMetricsCassandra(base.BaseTestCase):
]
}], result)
@patch("monasca_api.common.repositories.cassandra."
"metrics_repository.Cluster.connect")
def test_check_status(self, _):
repo = cassandra_repo.MetricsRepository()
result = repo.check_status()
self.assertEqual(result, (True, 'OK'))
@patch("monasca_api.common.repositories.cassandra."
"metrics_repository.Cluster.connect")
def test_check_status_server_error(self, cassandra_connect_mock):
repo = cassandra_repo.MetricsRepository()
cassandra_connect_mock.side_effect = \
cassandra.DriverException("Cluster is already shut down")
result = repo.check_status()
self.assertEqual(result, (False, 'Cluster is already shut down'))
@staticmethod
def _convert_time_string(date_time_string):
dt = timeutils.parse_isotime(date_time_string)