implement sending health notifications

this change implements sending notifications about
health check creation and updates. code was refactored
to avoid duplicated code.

Partially-implements blueprint: cluster-verification

Change-Id: Iac74a7bdec0f59a3720e17a682268faea36a45f3
This commit is contained in:
Vitaly Gridnev 2016-02-17 14:08:45 +03:00
parent 96722ea467
commit 99765e3295
5 changed files with 53 additions and 40 deletions

View File

@ -127,8 +127,8 @@ def _cluster_create(values, plugin):
ctx = context.ctx()
cluster = conductor.cluster_create(ctx, values)
context.set_current_cluster_id(cluster.id)
sender.notify(ctx, cluster.id, cluster.name, "New",
"create")
sender.status_notify(cluster.id, cluster.name, "New",
"create")
_add_ports_for_auto_sg(ctx, cluster, plugin)
# validating cluster
@ -167,8 +167,8 @@ def terminate_cluster(id):
return
OPS.terminate_cluster(id)
sender.notify(context.ctx(), cluster.id, cluster.name, cluster.status,
"delete")
sender.status_notify(cluster.id, cluster.name, cluster.status,
"delete")
def update_cluster(id, values):

View File

@ -26,6 +26,7 @@ from sahara.i18n import _LE
from sahara.plugins import base as plugin_base
from sahara.service.health import common
from sahara.utils import cluster as cluster_utils
from sahara.utils.notification import sender
cond = conductor.API
LOG = logging.getLogger(__name__)
@ -91,11 +92,17 @@ class BasicHealthCheck(object):
self.health_check_id = cond.cluster_health_check_add(
context.ctx(), vid, {'status': common.HEALTH_STATUS_CHECKING,
'name': self.get_health_check_name()}).id
self.health_check = cond.cluster_health_check_get(
context.ctx(), self.health_check_id)
sender.health_notify(self.cluster, self.health_check)
def _write_result(self, status, description):
cond.cluster_health_check_update(
context.ctx(), self.health_check_id,
{'status': status, 'description': description})
self.health_check = cond.cluster_health_check_get(
context.ctx(), self.health_check_id)
sender.health_notify(self.cluster, self.health_check)
def execute(self):
if not self.is_available():

View File

@ -34,8 +34,8 @@ class NotificationTest(base.SaharaTestCase):
self.cluster_name = 'someName'
self.cluster_status = 'someStatus'
sender.notify(ctx, self.cluster_id, self.cluster_name,
self.cluster_status, "update")
sender.status_notify(self.cluster_id, self.cluster_name,
self.cluster_status, "update")
self.create_mock('update')

View File

@ -96,8 +96,8 @@ def change_cluster_status(cluster, status, status_description=None):
LOG.info(_LI("Cluster status has been changed. New status="
"{status}").format(status=cluster.status))
sender.notify(ctx, cluster.id, cluster.name, cluster.status,
"update")
sender.status_notify(cluster.id, cluster.name, cluster.status,
"update")
return cluster

View File

@ -16,12 +16,14 @@
from oslo_config import cfg
from oslo_log import log as logging
from sahara import context
from sahara.utils import rpc as messaging
LOG = logging.getLogger(__name__)
SERVICE = 'sahara'
EVENT_TEMPLATE = "sahara.cluster.%s"
CLUSTER_EVENT_TEMPLATE = "sahara.cluster.%s"
HEALTH_EVENT_TYPE = CLUSTER_EVENT_TEMPLATE % "health"
notifier_opts = [
cfg.StrOpt('level',
@ -53,40 +55,44 @@ def _get_publisher():
return publisher_id
def _notify(context, event_type, level, body):
client = messaging.get_notifier(_get_publisher())
method = getattr(client, level.lower())
method(context, event_type, body)
def _body(
cluster_id,
cluster_name,
cluster_status,
tenant_id,
user_id):
result = {
'cluster_id': cluster_id,
'cluster_name': cluster_name,
'cluster_status': cluster_status,
'project_id': tenant_id,
'user_id': user_id,
}
return result
def notify(context, cluster_id, cluster_name, cluster_status, ev_type):
"""Sends notification about creating/updating/deleting cluster."""
def _notify(event_type, body):
if not cfg.CONF.oslo_messaging_notifications.enable:
return
LOG.debug("Notification about cluster is going to be sent. Notification "
"type={type}, cluster status = {status}"
.format(type=ev_type, status=cluster_status))
"type={type}".format(type=event_type))
ctx = context.ctx()
level = CONF.oslo_messaging_notifications.level
_notify(context, EVENT_TEMPLATE % ev_type, level,
_body(cluster_id, cluster_name, cluster_status, context.tenant_id,
context.user_id))
body.update({'project_id': ctx.tenant_id, 'user_id': ctx.user_id})
client = messaging.get_notifier(_get_publisher())
method = getattr(client, level.lower())
method(ctx, event_type, body)
def _health_notification_body(cluster, health_check):
verification = cluster.verification
return {
'cluster_id': cluster.id,
'cluster_name': cluster.name,
'verification_id': verification['id'],
'health_check_status': health_check['status'],
'health_check_name': health_check['name'],
'health_check_description': health_check['description'],
'created_at': health_check['created_at'],
'updated_at': health_check['updated_at']
}
def status_notify(cluster_id, cluster_name, cluster_status, ev_type):
"""Sends notification about creating/updating/deleting cluster."""
_notify(CLUSTER_EVENT_TEMPLATE % ev_type, {
'cluster_id': cluster_id, 'cluster_name': cluster_name,
'cluster_status': cluster_status})
def health_notify(cluster, health_check):
"""Sends notification about current cluster health."""
_notify(HEALTH_EVENT_TYPE,
_health_notification_body(cluster, health_check))