From 7e0dd37681bfca96ee995fa690fcaa6585168077 Mon Sep 17 00:00:00 2001 From: Fabio Verboso Date: Fri, 14 Sep 2018 15:53:26 +0200 Subject: [PATCH] Update RPC service Change-Id: Id6954a7a7b5e92ed4855cd9c000fb456d7b167c6 --- iotronic/common/rpc.py | 113 +++++++++++++++++++------------------ iotronic/common/service.py | 10 ++++ 2 files changed, 67 insertions(+), 56 deletions(-) diff --git a/iotronic/common/rpc.py b/iotronic/common/rpc.py index 8e834cb..71f3a5e 100644 --- a/iotronic/common/rpc.py +++ b/iotronic/common/rpc.py @@ -16,87 +16,70 @@ from oslo_config import cfg import oslo_messaging as messaging from oslo_messaging.rpc import dispatcher -from oslo_serialization import jsonutils +from osprofiler import profiler from iotronic.common import context as iotronic_context from iotronic.common import exception -__all__ = [ - 'init', - 'cleanup', - 'set_defaults', - 'add_extra_exmods', - 'clear_extra_exmods', - 'get_allowed_exmods', - 'RequestContextSerializer', - 'get_client', - 'get_server', - 'get_notifier', - 'TRANSPORT_ALIASES', -] CONF = cfg.CONF + TRANSPORT = None -NOTIFIER = None +NOTIFICATION_TRANSPORT = None +SENSORS_NOTIFIER = None +VERSIONED_NOTIFIER = None ALLOWED_EXMODS = [ exception.__name__, ] EXTRA_EXMODS = [] -# NOTE(lucasagomes): The iotronic.openstack.common.rpc entries are for -# backwards compat with IceHouse rpc_backend configuration values. -TRANSPORT_ALIASES = { - 'iotronic.openstack.common.rpc.impl_kombu': 'rabbit', - 'iotronic.openstack.common.rpc.impl_qpid': 'qpid', - 'iotronic.openstack.common.rpc.impl_zmq': 'zmq', - 'iotronic.rpc.impl_kombu': 'rabbit', - 'iotronic.rpc.impl_qpid': 'qpid', - 'iotronic.rpc.impl_zmq': 'zmq', -} - def init(conf): - global TRANSPORT, NOTIFIER + global TRANSPORT, NOTIFICATION_TRANSPORT + global SENSORS_NOTIFIER, VERSIONED_NOTIFIER exmods = get_allowed_exmods() - TRANSPORT = messaging.get_transport(conf, - allowed_remote_exmods=exmods, - aliases=TRANSPORT_ALIASES) - serializer = RequestContextSerializer(JsonPayloadSerializer()) - NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer) + TRANSPORT = messaging.get_rpc_transport(conf, + allowed_remote_exmods=exmods) + NOTIFICATION_TRANSPORT = messaging.get_notification_transport( + conf, + allowed_remote_exmods=exmods) + + serializer = RequestContextSerializer(messaging.JsonPayloadSerializer()) + SENSORS_NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT, + serializer=serializer) + if conf.notification_level is None: + VERSIONED_NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT, + serializer=serializer, + driver='noop') + else: + VERSIONED_NOTIFIER = messaging.Notifier(NOTIFICATION_TRANSPORT, + serializer=serializer, + topics=['iotronic_versioned_' + 'notifications']) def cleanup(): - global TRANSPORT, NOTIFIER + global TRANSPORT, NOTIFICATION_TRANSPORT + global SENSORS_NOTIFIER, VERSIONED_NOTIFIER assert TRANSPORT is not None - assert NOTIFIER is not None + assert NOTIFICATION_TRANSPORT is not None + assert SENSORS_NOTIFIER is not None + assert VERSIONED_NOTIFIER is not None TRANSPORT.cleanup() - TRANSPORT = NOTIFIER = None + NOTIFICATION_TRANSPORT.cleanup() + TRANSPORT = NOTIFICATION_TRANSPORT = None + SENSORS_NOTIFIER = VERSIONED_NOTIFIER = None def set_defaults(control_exchange): messaging.set_transport_defaults(control_exchange) -def add_extra_exmods(*args): - EXTRA_EXMODS.extend(args) - - -def clear_extra_exmods(): - del EXTRA_EXMODS[:] - - def get_allowed_exmods(): return ALLOWED_EXMODS + EXTRA_EXMODS -class JsonPayloadSerializer(messaging.NoOpSerializer): - - @staticmethod - def serialize_entity(context, entity): - return jsonutils.to_primitive(entity, convert_instances=True) - - class RequestContextSerializer(messaging.Serializer): def __init__(self, base): @@ -113,14 +96,26 @@ class RequestContextSerializer(messaging.Serializer): return self._base.deserialize_entity(context, entity) def serialize_context(self, context): - return context.to_dict() + _context = context.to_dict() + prof = profiler.get() + if prof: + trace_info = { + "hmac_key": prof.hmac_key, + "base_id": prof.get_base_id(), + "parent_id": prof.get_id() + } + _context.update({"trace_info": trace_info}) + return _context def deserialize_context(self, context): + trace_info = context.pop("trace_info", None) + if trace_info: + profiler.init(**trace_info) return iotronic_context.RequestContext.from_dict(context) def get_transport_url(url_str=None): - return messaging.TransportURL.parse(CONF, url_str, TRANSPORT_ALIASES) + return messaging.TransportURL.parse(CONF, url_str) def get_client(target, version_cap=None, serializer=None): @@ -134,8 +129,8 @@ def get_client(target, version_cap=None, serializer=None): def get_server(target, endpoints, serializer=None): assert TRANSPORT is not None - access_policy = dispatcher.DefaultRPCAccessPolicy serializer = RequestContextSerializer(serializer) + access_policy = dispatcher.DefaultRPCAccessPolicy return messaging.get_rpc_server(TRANSPORT, target, endpoints, @@ -144,8 +139,14 @@ def get_server(target, endpoints, serializer=None): access_policy=access_policy) -def get_notifier(service=None, host=None, publisher_id=None): - assert NOTIFIER is not None +def get_sensors_notifier(service=None, host=None, publisher_id=None): + assert SENSORS_NOTIFIER is not None if not publisher_id: publisher_id = "%s.%s" % (service, host or CONF.host) - return NOTIFIER.prepare(publisher_id=publisher_id) + return SENSORS_NOTIFIER.prepare(publisher_id=publisher_id) + + +def get_versioned_notifier(publisher_id=None): + assert VERSIONED_NOTIFIER is not None + assert publisher_id is not None + return VERSIONED_NOTIFIER.prepare(publisher_id=publisher_id) diff --git a/iotronic/common/service.py b/iotronic/common/service.py index fbe2cc7..a22a254 100644 --- a/iotronic/common/service.py +++ b/iotronic/common/service.py @@ -24,6 +24,7 @@ import oslo_messaging as messaging from oslo_utils import importutils from iotronic.common import config +from iotronic.common.i18n import _ from iotronic.common.i18n import _LE from iotronic.common.i18n import _LI from iotronic.common import rpc @@ -42,6 +43,15 @@ service_opts = [ 'However, the board name must be valid within ' 'an AMQP key, and if using ZeroMQ, a valid ' 'hostname, FQDN, or IP address.'), + cfg.StrOpt('notification_level', + choices=[('debug', _('"debug" level')), + ('info', _('"info" level')), + ('warning', _('"warning" level')), + ('error', _('"error" level')), + ('critical', _('"critical" level'))], + help=_('Specifies the minimum level for which to send ' + 'notifications. If not set, no notifications will ' + 'be sent. The default is for this option to be unset.')), ] cfg.CONF.register_opts(service_opts)