diff --git a/monasca_common/healthcheck/__init__.py b/monasca_common/healthcheck/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/monasca_common/healthcheck/checks/__init__.py b/monasca_common/healthcheck/checks/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/monasca_common/healthcheck/checks/kafka.py b/monasca_common/healthcheck/checks/kafka.py new file mode 100644 index 00000000..da1e10b9 --- /dev/null +++ b/monasca_common/healthcheck/checks/kafka.py @@ -0,0 +1,103 @@ +# Copyright 2016 FUJITSU LIMITED +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from kafka import client +from oslo_config import cfg +from oslo_log import log + +from monasca_common.healthcheck import result + +LOG = log.getLogger(__name__) +CONF = cfg.CONF + +kafka_check_opts = [ + cfg.StrOpt('kafka_url', + required=True, + help='Url to kafka server'), + cfg.ListOpt('kafka_topics', + required=True, + default=['logs'], + help='Verify existence of configured topics') +] +kafka_check_group = cfg.OptGroup(name='kafka_healthcheck', + title='kafka_healthcheck') + +cfg.CONF.register_group(kafka_check_group) +cfg.CONF.register_opts(kafka_check_opts, kafka_check_group) + + +class KafkaHealthCheck(object): + """Evaluates kafka health + + Healthcheck verifies if: + + * kafka server is up and running + * there is a configured topic in kafka + + If following conditions are met healthcheck returns healthy status. + Otherwise unhealthy status is returned with explanation. + + Example of middleware configuration: + + .. code-block:: ini + + [kafka_healthcheck] + kafka_url = localhost:8900 + kafka_topics = log + + Note: + It is possible to specify multiple topics if necessary. + Just separate them with , + + """ + + def healthcheck(self): + url = CONF.kafka_healthcheck.kafka_url + + try: + kafka_client = client.KafkaClient(hosts=url) + except client.KafkaUnavailableError as ex: + LOG.error(repr(ex)) + error_str = 'Could not connect to kafka at %s' % url + return result.HealthCheckResult(healthy=False, message=error_str) + + self._disconnect_gracefully(kafka_client) + + return self._verify_topics(kafka_client) + + # noinspection PyMethodMayBeStatic + def _verify_topics(self, kafka_client): + topics = CONF.kafka_healthcheck.kafka_topics + + for t in topics: + # kafka client loads metadata for topics as fast + # as possible (happens in __init__), therefore this + # topic_partitions is sure to be filled + for_topic = t in kafka_client.topic_partitions + if not for_topic: + error_str = 'Kafka: Topic %s not found' % t + LOG.error(error_str) + return result.HealthCheckResult(healthy=False, message=error_str) + + return result.HealthCheckResult(healthy=True, message='OK') + + # noinspection PyMethodMayBeStatic + def _disconnect_gracefully(self, kafka_client): + # at this point, client is connected so it must be closed + # regardless of topic existence + try: + kafka_client.close() + except Exception as ex: + # log that something went wrong and move on + LOG.error(repr(ex)) diff --git a/monasca_common/healthcheck/result.py b/monasca_common/healthcheck/result.py new file mode 100644 index 00000000..91f20100 --- /dev/null +++ b/monasca_common/healthcheck/result.py @@ -0,0 +1,19 @@ +# Copyright 2016 FUJITSU LIMITED +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections + +HealthCheckResult = collections.namedtuple('CheckResult', + ['healthy', 'message']) +"""Result from the healthcheck, contains healthy(boolean) and message"""