From 9c33366fbd6535e1e23435463c9675d32c840bb8 Mon Sep 17 00:00:00 2001 From: Doug Hellmann Date: Thu, 14 Mar 2013 08:03:40 -0700 Subject: [PATCH] Update openstack.common The main motivation for this update is to fix the way logging is configured so that all libraries are able to log at the default level. The other changes come in because there's no clean way to update the common libs piecemeal. bug 1152584 Change-Id: I22563f7c0aa7c48edc81cb85328c3ae3ec8c2f11 Signed-off-by: Doug Hellmann --- bin/ceilometer-rpc-zmq-receiver | 4 +- ceilometer/openstack/common/context.py | 8 +- .../openstack/common/eventlet_backdoor.py | 2 +- ceilometer/openstack/common/excutils.py | 2 +- ceilometer/openstack/common/importutils.py | 2 +- ceilometer/openstack/common/jsonutils.py | 6 - ceilometer/openstack/common/local.py | 2 +- ceilometer/openstack/common/log.py | 11 +- ceilometer/openstack/common/network_utils.py | 2 +- .../openstack/common/notifier/__init__.py | 2 +- ceilometer/openstack/common/notifier/api.py | 2 +- .../openstack/common/notifier/log_notifier.py | 2 +- .../common/notifier/no_op_notifier.py | 2 +- .../openstack/common/notifier/rpc_notifier.py | 2 +- .../common/notifier/rpc_notifier2.py | 2 +- .../common/notifier/test_notifier.py | 2 +- ceilometer/openstack/common/policy.py | 2 +- ceilometer/openstack/common/rpc/amqp.py | 59 +++++- ceilometer/openstack/common/rpc/common.py | 4 + ceilometer/openstack/common/rpc/impl_fake.py | 2 +- ceilometer/openstack/common/rpc/impl_kombu.py | 6 +- ceilometer/openstack/common/rpc/impl_qpid.py | 6 +- ceilometer/openstack/common/rpc/impl_zmq.py | 44 +++-- ceilometer/openstack/common/rpc/matchmaker.py | 169 +++++++++++++++++- .../openstack/common/rpc/matchmaker_redis.py | 149 +++++++++++++++ ceilometer/openstack/common/setup.py | 54 ++++-- ceilometer/openstack/common/timeutils.py | 18 +- ceilometer/openstack/common/version.py | 2 +- 28 files changed, 488 insertions(+), 80 deletions(-) create mode 100644 ceilometer/openstack/common/rpc/matchmaker_redis.py diff --git a/bin/ceilometer-rpc-zmq-receiver b/bin/ceilometer-rpc-zmq-receiver index 8ac28b6ef7..f6e878eff1 100755 --- a/bin/ceilometer-rpc-zmq-receiver +++ b/bin/ceilometer-rpc-zmq-receiver @@ -1,7 +1,7 @@ #!/usr/bin/env python # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2011 OpenStack LLC +# Copyright 2011 OpenStack Foundation # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -39,10 +39,10 @@ from ceilometer.openstack.common.rpc import impl_zmq CONF = cfg.CONF CONF.register_opts(rpc.rpc_opts) CONF.register_opts(impl_zmq.zmq_opts) -CONF(sys.argv[1:], project='ceilometer') def main(): + CONF(sys.argv[1:], project='ceilometer') logging.setup("ceilometer") with contextlib.closing(impl_zmq.ZmqProxy(CONF)) as reactor: diff --git a/ceilometer/openstack/common/context.py b/ceilometer/openstack/common/context.py index dd7dd04c38..e9cfd73cc1 100644 --- a/ceilometer/openstack/common/context.py +++ b/ceilometer/openstack/common/context.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -37,9 +37,9 @@ class RequestContext(object): accesses the system, as well as additional request information. """ - def __init__(self, auth_tok=None, user=None, tenant=None, is_admin=False, + def __init__(self, auth_token=None, user=None, tenant=None, is_admin=False, read_only=False, show_deleted=False, request_id=None): - self.auth_tok = auth_tok + self.auth_token = auth_token self.user = user self.tenant = tenant self.is_admin = is_admin @@ -55,7 +55,7 @@ class RequestContext(object): 'is_admin': self.is_admin, 'read_only': self.read_only, 'show_deleted': self.show_deleted, - 'auth_token': self.auth_tok, + 'auth_token': self.auth_token, 'request_id': self.request_id} diff --git a/ceilometer/openstack/common/eventlet_backdoor.py b/ceilometer/openstack/common/eventlet_backdoor.py index 8b81ebf8eb..c0ad460fe6 100644 --- a/ceilometer/openstack/common/eventlet_backdoor.py +++ b/ceilometer/openstack/common/eventlet_backdoor.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright (c) 2012 Openstack, LLC. +# Copyright (c) 2012 OpenStack Foundation. # Administrator of the National Aeronautics and Space Administration. # All Rights Reserved. # diff --git a/ceilometer/openstack/common/excutils.py b/ceilometer/openstack/common/excutils.py index 414cc27e95..fce39595cf 100644 --- a/ceilometer/openstack/common/excutils.py +++ b/ceilometer/openstack/common/excutils.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # Copyright 2012, Red Hat, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/ceilometer/openstack/common/importutils.py b/ceilometer/openstack/common/importutils.py index 9dec764fb4..3bd277f47e 100644 --- a/ceilometer/openstack/common/importutils.py +++ b/ceilometer/openstack/common/importutils.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/ceilometer/openstack/common/jsonutils.py b/ceilometer/openstack/common/jsonutils.py index c34a8c327c..ab3620c3b8 100644 --- a/ceilometer/openstack/common/jsonutils.py +++ b/ceilometer/openstack/common/jsonutils.py @@ -38,14 +38,10 @@ import functools import inspect import itertools import json -import logging import xmlrpclib -from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import timeutils -LOG = logging.getLogger(__name__) - def to_primitive(value, convert_instances=False, convert_datetime=True, level=0, max_depth=3): @@ -85,8 +81,6 @@ def to_primitive(value, convert_instances=False, convert_datetime=True, return 'mock' if level > max_depth: - LOG.error(_('Max serialization depth exceeded on object: %d %s'), - level, value) return '?' # The try block may not be necessary after the class check above, diff --git a/ceilometer/openstack/common/local.py b/ceilometer/openstack/common/local.py index 8bdc837a91..f1bfc824bf 100644 --- a/ceilometer/openstack/common/local.py +++ b/ceilometer/openstack/common/local.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/ceilometer/openstack/common/log.py b/ceilometer/openstack/common/log.py index 01d2a68fde..2c482f3431 100644 --- a/ceilometer/openstack/common/log.py +++ b/ceilometer/openstack/common/log.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # Copyright 2010 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. # All Rights Reserved. @@ -328,7 +328,7 @@ def setup(product_name): if CONF.log_config: logging.config.fileConfig(CONF.log_config) else: - _setup_logging_from_conf(product_name) + _setup_logging_from_conf() sys.excepthook = _create_logging_excepthook(product_name) @@ -362,8 +362,8 @@ def _find_facility_from_conf(): return facility -def _setup_logging_from_conf(product_name): - log_root = getLogger(product_name).logger +def _setup_logging_from_conf(): + log_root = getLogger(None).logger for handler in log_root.handlers: log_root.removeHandler(handler) @@ -401,7 +401,8 @@ def _setup_logging_from_conf(product_name): if CONF.log_format: handler.setFormatter(logging.Formatter(fmt=CONF.log_format, datefmt=datefmt)) - handler.setFormatter(LegacyFormatter(datefmt=datefmt)) + else: + handler.setFormatter(LegacyFormatter(datefmt=datefmt)) if CONF.debug: log_root.setLevel(logging.DEBUG) diff --git a/ceilometer/openstack/common/network_utils.py b/ceilometer/openstack/common/network_utils.py index 69f6732163..5224e01aa9 100644 --- a/ceilometer/openstack/common/network_utils.py +++ b/ceilometer/openstack/common/network_utils.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2012 OpenStack LLC. +# Copyright 2012 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/ceilometer/openstack/common/notifier/__init__.py b/ceilometer/openstack/common/notifier/__init__.py index 482d54e4fd..45c3b46ae9 100644 --- a/ceilometer/openstack/common/notifier/__init__.py +++ b/ceilometer/openstack/common/notifier/__init__.py @@ -1,4 +1,4 @@ -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/ceilometer/openstack/common/notifier/api.py b/ceilometer/openstack/common/notifier/api.py index 1361faa5aa..c1c897dd24 100644 --- a/ceilometer/openstack/common/notifier/api.py +++ b/ceilometer/openstack/common/notifier/api.py @@ -1,4 +1,4 @@ -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/ceilometer/openstack/common/notifier/log_notifier.py b/ceilometer/openstack/common/notifier/log_notifier.py index 496fa6d271..fc63e9de84 100644 --- a/ceilometer/openstack/common/notifier/log_notifier.py +++ b/ceilometer/openstack/common/notifier/log_notifier.py @@ -1,4 +1,4 @@ -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/ceilometer/openstack/common/notifier/no_op_notifier.py b/ceilometer/openstack/common/notifier/no_op_notifier.py index ee1ddbdcac..bc7a56ca7a 100644 --- a/ceilometer/openstack/common/notifier/no_op_notifier.py +++ b/ceilometer/openstack/common/notifier/no_op_notifier.py @@ -1,4 +1,4 @@ -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/ceilometer/openstack/common/notifier/rpc_notifier.py b/ceilometer/openstack/common/notifier/rpc_notifier.py index 55111007b6..ee6c03615b 100644 --- a/ceilometer/openstack/common/notifier/rpc_notifier.py +++ b/ceilometer/openstack/common/notifier/rpc_notifier.py @@ -1,4 +1,4 @@ -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/ceilometer/openstack/common/notifier/rpc_notifier2.py b/ceilometer/openstack/common/notifier/rpc_notifier2.py index 5de2e8a292..f8a299c2c1 100644 --- a/ceilometer/openstack/common/notifier/rpc_notifier2.py +++ b/ceilometer/openstack/common/notifier/rpc_notifier2.py @@ -1,4 +1,4 @@ -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/ceilometer/openstack/common/notifier/test_notifier.py b/ceilometer/openstack/common/notifier/test_notifier.py index 5e348803dc..96c1746bf4 100644 --- a/ceilometer/openstack/common/notifier/test_notifier.py +++ b/ceilometer/openstack/common/notifier/test_notifier.py @@ -1,4 +1,4 @@ -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/ceilometer/openstack/common/policy.py b/ceilometer/openstack/common/policy.py index 3c482548ae..155de8b040 100644 --- a/ceilometer/openstack/common/policy.py +++ b/ceilometer/openstack/common/policy.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright (c) 2012 OpenStack, LLC. +# Copyright (c) 2012 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may diff --git a/ceilometer/openstack/common/rpc/amqp.py b/ceilometer/openstack/common/rpc/amqp.py index 3574891b0b..d8e6ba06f9 100644 --- a/ceilometer/openstack/common/rpc/amqp.py +++ b/ceilometer/openstack/common/rpc/amqp.py @@ -25,25 +25,27 @@ Specifically, this includes impl_kombu and impl_qpid. impl_carrot also uses AMQP, but is deprecated and predates this code. """ +import collections import inspect import sys import uuid from eventlet import greenpool from eventlet import pools -from eventlet import semaphore from eventlet import queue - +from eventlet import semaphore # TODO(pekowsk): Remove import cfg and below comment in Havana. # This import should no longer be needed when the amqp_rpc_single_reply_queue # option is removed. from oslo.config import cfg + from ceilometer.openstack.common import excutils from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import local from ceilometer.openstack.common import log as logging from ceilometer.openstack.common.rpc import common as rpc_common + # TODO(pekowski): Remove this option in Havana. amqp_opts = [ cfg.BoolOpt('amqp_rpc_single_reply_queue', @@ -54,6 +56,7 @@ amqp_opts = [ cfg.CONF.register_opts(amqp_opts) +UNIQUE_ID = '_unique_id' LOG = logging.getLogger(__name__) @@ -236,6 +239,7 @@ def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None, 'failure': failure} if ending: msg['ending'] = True + _add_unique_id(msg) # If a reply_q exists, add the msg_id to the reply and pass the # reply_q to direct_send() to use it as the response queue. # Otherwise use the msg_id for backward compatibilty. @@ -302,6 +306,37 @@ def pack_context(msg, context): msg.update(context_d) +class _MsgIdCache(object): + """This class checks any duplicate messages.""" + + # NOTE: This value is considered can be a configuration item, but + # it is not necessary to change its value in most cases, + # so let this value as static for now. + DUP_MSG_CHECK_SIZE = 16 + + def __init__(self, **kwargs): + self.prev_msgids = collections.deque([], + maxlen=self.DUP_MSG_CHECK_SIZE) + + def check_duplicate_message(self, message_data): + """AMQP consumers may read same message twice when exceptions occur + before ack is returned. This method prevents doing it. + """ + if UNIQUE_ID in message_data: + msg_id = message_data[UNIQUE_ID] + if msg_id not in self.prev_msgids: + self.prev_msgids.append(msg_id) + else: + raise rpc_common.DuplicateMessageError(msg_id=msg_id) + + +def _add_unique_id(msg): + """Add unique_id for checking duplicate messages.""" + unique_id = uuid.uuid4().hex + msg.update({UNIQUE_ID: unique_id}) + LOG.debug(_('UNIQUE_ID is %s.') % (unique_id)) + + class _ThreadPoolWithWait(object): """Base class for a delayed invocation manager used by the Connection class to start up green threads @@ -349,6 +384,7 @@ class ProxyCallback(_ThreadPoolWithWait): connection_pool=connection_pool, ) self.proxy = proxy + self.msg_id_cache = _MsgIdCache() def __call__(self, message_data): """Consumer callback to call a method on a proxy object. @@ -368,6 +404,7 @@ class ProxyCallback(_ThreadPoolWithWait): if hasattr(local.store, 'context'): del local.store.context rpc_common._safe_log(LOG.debug, _('received %s'), message_data) + self.msg_id_cache.check_duplicate_message(message_data) ctxt = unpack_context(self.conf, message_data) method = message_data.get('method') args = message_data.get('args', {}) @@ -406,9 +443,11 @@ class ProxyCallback(_ThreadPoolWithWait): connection_pool=self.connection_pool, log_failure=False) except Exception: - LOG.exception(_('Exception during message handling')) - ctxt.reply(None, sys.exc_info(), - connection_pool=self.connection_pool) + # sys.exc_info() is deleted by LOG.exception(). + exc_info = sys.exc_info() + LOG.error(_('Exception during message handling'), + exc_info=exc_info) + ctxt.reply(None, exc_info, connection_pool=self.connection_pool) class MulticallProxyWaiter(object): @@ -422,6 +461,7 @@ class MulticallProxyWaiter(object): self._dataqueue = queue.LightQueue() # Add this caller to the reply proxy's call_waiters self._reply_proxy.add_call_waiter(self, self._msg_id) + self.msg_id_cache = _MsgIdCache() def put(self, data): self._dataqueue.put(data) @@ -435,6 +475,7 @@ class MulticallProxyWaiter(object): def _process_data(self, data): result = None + self.msg_id_cache.check_duplicate_message(data) if data['failure']: failure = data['failure'] result = rpc_common.deserialize_remote_exception(self._conf, @@ -479,6 +520,7 @@ class MulticallWaiter(object): self._done = False self._got_ending = False self._conf = conf + self.msg_id_cache = _MsgIdCache() def done(self): if self._done: @@ -490,6 +532,7 @@ class MulticallWaiter(object): def __call__(self, data): """The consume() callback will call this. Store the result.""" + self.msg_id_cache.check_duplicate_message(data) if data['failure']: failure = data['failure'] self._result = rpc_common.deserialize_remote_exception(self._conf, @@ -542,6 +585,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool): msg_id = uuid.uuid4().hex msg.update({'_msg_id': msg_id}) LOG.debug(_('MSG_ID is %s') % (msg_id)) + _add_unique_id(msg) pack_context(msg, context) # TODO(pekowski): Remove this flag and the code under the if clause @@ -575,6 +619,7 @@ def call(conf, context, topic, msg, timeout, connection_pool): def cast(conf, context, topic, msg, connection_pool): """Sends a message on a topic without waiting for a response.""" LOG.debug(_('Making asynchronous cast on %s...'), topic) + _add_unique_id(msg) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: conn.topic_send(topic, rpc_common.serialize_msg(msg)) @@ -583,6 +628,7 @@ def cast(conf, context, topic, msg, connection_pool): def fanout_cast(conf, context, topic, msg, connection_pool): """Sends a message on a fanout exchange without waiting for a response.""" LOG.debug(_('Making asynchronous fanout cast...')) + _add_unique_id(msg) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: conn.fanout_send(topic, rpc_common.serialize_msg(msg)) @@ -590,6 +636,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool): def cast_to_server(conf, context, server_params, topic, msg, connection_pool): """Sends a message on a topic to a specific server.""" + _add_unique_id(msg) pack_context(msg, context) with ConnectionContext(conf, connection_pool, pooled=False, server_params=server_params) as conn: @@ -599,6 +646,7 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool): def fanout_cast_to_server(conf, context, server_params, topic, msg, connection_pool): """Sends a message on a fanout exchange to a specific server.""" + _add_unique_id(msg) pack_context(msg, context) with ConnectionContext(conf, connection_pool, pooled=False, server_params=server_params) as conn: @@ -610,6 +658,7 @@ def notify(conf, context, topic, msg, connection_pool, envelope): LOG.debug(_('Sending %(event_type)s on %(topic)s'), dict(event_type=msg.get('event_type'), topic=topic)) + _add_unique_id(msg) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: if envelope: diff --git a/ceilometer/openstack/common/rpc/common.py b/ceilometer/openstack/common/rpc/common.py index 88ebe42d9e..b0f5c694a0 100644 --- a/ceilometer/openstack/common/rpc/common.py +++ b/ceilometer/openstack/common/rpc/common.py @@ -125,6 +125,10 @@ class Timeout(RPCException): message = _("Timeout while waiting on RPC response.") +class DuplicateMessageError(RPCException): + message = _("Found duplicate message(%(msg_id)s). Skipping it.") + + class InvalidRPCConnectionReuse(RPCException): message = _("Invalid reuse of an RPC connection.") diff --git a/ceilometer/openstack/common/rpc/impl_fake.py b/ceilometer/openstack/common/rpc/impl_fake.py index 72041bfe65..7c3e69caae 100644 --- a/ceilometer/openstack/common/rpc/impl_fake.py +++ b/ceilometer/openstack/common/rpc/impl_fake.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2011 OpenStack LLC +# Copyright 2011 OpenStack Foundation # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain diff --git a/ceilometer/openstack/common/rpc/impl_kombu.py b/ceilometer/openstack/common/rpc/impl_kombu.py index d3c86a62e0..9e2a34bceb 100644 --- a/ceilometer/openstack/common/rpc/impl_kombu.py +++ b/ceilometer/openstack/common/rpc/impl_kombu.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2011 OpenStack LLC +# Copyright 2011 OpenStack Foundation # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -624,8 +624,8 @@ class Connection(object): def _error_callback(exc): if isinstance(exc, socket.timeout): - LOG.exception(_('Timed out waiting for RPC response: %s') % - str(exc)) + LOG.debug(_('Timed out waiting for RPC response: %s') % + str(exc)) raise rpc_common.Timeout() else: LOG.exception(_('Failed to consume message from queue: %s') % diff --git a/ceilometer/openstack/common/rpc/impl_qpid.py b/ceilometer/openstack/common/rpc/impl_qpid.py index 6bdec04ef6..cd28cc34e5 100644 --- a/ceilometer/openstack/common/rpc/impl_qpid.py +++ b/ceilometer/openstack/common/rpc/impl_qpid.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2011 OpenStack LLC +# Copyright 2011 OpenStack Foundation # Copyright 2011 - 2012, Red Hat, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -415,8 +415,8 @@ class Connection(object): def _error_callback(exc): if isinstance(exc, qpid_exceptions.Empty): - LOG.exception(_('Timed out waiting for RPC response: %s') % - str(exc)) + LOG.debug(_('Timed out waiting for RPC response: %s') % + str(exc)) raise rpc_common.Timeout() else: LOG.exception(_('Failed to consume message from queue: %s') % diff --git a/ceilometer/openstack/common/rpc/impl_zmq.py b/ceilometer/openstack/common/rpc/impl_zmq.py index 4a3f5ff348..1ee5554abe 100644 --- a/ceilometer/openstack/common/rpc/impl_zmq.py +++ b/ceilometer/openstack/common/rpc/impl_zmq.py @@ -16,6 +16,7 @@ import os import pprint +import re import socket import sys import types @@ -25,6 +26,7 @@ import eventlet import greenlet from oslo.config import cfg +from ceilometer.openstack.common import excutils from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import importutils from ceilometer.openstack.common import jsonutils @@ -91,8 +93,8 @@ def _serialize(data): try: return jsonutils.dumps(data, ensure_ascii=True) except TypeError: - LOG.error(_("JSON serialization failed.")) - raise + with excutils.save_and_reraise_exception(): + LOG.error(_("JSON serialization failed.")) def _deserialize(data): @@ -430,6 +432,8 @@ class ZmqProxy(ZmqBaseReactor): def __init__(self, conf): super(ZmqProxy, self).__init__(conf) + pathsep = set((os.path.sep or '', os.path.altsep or '', '/', '\\')) + self.badchars = re.compile(r'[%s]' % re.escape(''.join(pathsep))) self.topic_proxy = {} @@ -455,6 +459,13 @@ class ZmqProxy(ZmqBaseReactor): LOG.info(_("Creating proxy for topic: %s"), topic) try: + # The topic is received over the network, + # don't trust this input. + if self.badchars.search(topic) is not None: + emsg = _("Topic contained dangerous characters.") + LOG.warn(emsg) + raise RPCException(emsg) + out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic), sock_type, bind=True) @@ -511,9 +522,9 @@ class ZmqProxy(ZmqBaseReactor): ipc_dir, run_as_root=True) utils.execute('chmod', '750', ipc_dir, run_as_root=True) except utils.ProcessExecutionError: - LOG.error(_("Could not create IPC directory %s") % - (ipc_dir, )) - raise + with excutils.save_and_reraise_exception(): + LOG.error(_("Could not create IPC directory %s") % + (ipc_dir, )) try: self.register(consumption_proxy, @@ -521,9 +532,9 @@ class ZmqProxy(ZmqBaseReactor): zmq.PULL, out_bind=True) except zmq.ZMQError: - LOG.error(_("Could not create ZeroMQ receiver daemon. " - "Socket may already be in use.")) - raise + with excutils.save_and_reraise_exception(): + LOG.error(_("Could not create ZeroMQ receiver daemon. " + "Socket may already be in use.")) super(ZmqProxy, self).consume_in_thread() @@ -594,6 +605,9 @@ class Connection(rpc_common.Connection): self.reactor = ZmqReactor(conf) def create_consumer(self, topic, proxy, fanout=False): + # Register with matchmaker. + _get_matchmaker().register(topic, CONF.rpc_zmq_host) + # Subscription scenarios if fanout: sock_type = zmq.SUB @@ -620,6 +634,10 @@ class Connection(rpc_common.Connection): self.topics.append(topic) def close(self): + _get_matchmaker().stop_heartbeat() + for topic in self.topics: + _get_matchmaker().unregister(topic, CONF.rpc_zmq_host) + self.reactor.close() self.topics = [] @@ -627,6 +645,7 @@ class Connection(rpc_common.Connection): self.reactor.wait() def consume_in_thread(self): + _get_matchmaker().start_heartbeat() self.reactor.consume_in_thread() @@ -742,7 +761,7 @@ def _multi_send(method, context, topic, msg, timeout=None, LOG.warn(_("No matchmaker results. Not casting.")) # While not strictly a timeout, callers know how to handle # this exception and a timeout isn't too big a lie. - raise rpc_common.Timeout, "No match from matchmaker." + raise rpc_common.Timeout(_("No match from matchmaker.")) # This supports brokerless fanout (addresses > 1) for queue in queues: @@ -785,7 +804,7 @@ def fanout_cast(conf, context, topic, msg, **kwargs): _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs) -def notify(conf, context, topic, msg, **kwargs): +def notify(conf, context, topic, msg, envelope): """ Send notification event. Notifications are sent to topic-priority. @@ -793,9 +812,8 @@ def notify(conf, context, topic, msg, **kwargs): """ # NOTE(ewindisch): dot-priority in rpc notifier does not # work with our assumptions. - topic.replace('.', '-') - kwargs['envelope'] = kwargs.get('envelope', True) - cast(conf, context, topic, msg, **kwargs) + topic = topic.replace('.', '-') + cast(conf, context, topic, msg, envelope=envelope) def cleanup(): diff --git a/ceilometer/openstack/common/rpc/matchmaker.py b/ceilometer/openstack/common/rpc/matchmaker.py index ed5cebacbb..7ae6b7e8c4 100644 --- a/ceilometer/openstack/common/rpc/matchmaker.py +++ b/ceilometer/openstack/common/rpc/matchmaker.py @@ -22,6 +22,7 @@ import contextlib import itertools import json +import eventlet from oslo.config import cfg from ceilometer.openstack.common.gettextutils import _ @@ -33,6 +34,12 @@ matchmaker_opts = [ cfg.StrOpt('matchmaker_ringfile', default='/etc/nova/matchmaker_ring.json', help='Matchmaker ring file (JSON)'), + cfg.IntOpt('matchmaker_heartbeat_freq', + default='300', + help='Heartbeat frequency'), + cfg.IntOpt('matchmaker_heartbeat_ttl', + default='600', + help='Heartbeat time-to-live.'), ] CONF = cfg.CONF @@ -70,12 +77,73 @@ class Binding(object): class MatchMakerBase(object): - """Match Maker Base Class.""" - + """ + Match Maker Base Class. + Build off HeartbeatMatchMakerBase if building a + heartbeat-capable MatchMaker. + """ def __init__(self): # Array of tuples. Index [2] toggles negation, [3] is last-if-true self.bindings = [] + self.no_heartbeat_msg = _('Matchmaker does not implement ' + 'registration or heartbeat.') + + def register(self, key, host): + """ + Register a host on a backend. + Heartbeats, if applicable, may keepalive registration. + """ + pass + + def ack_alive(self, key, host): + """ + Acknowledge that a key.host is alive. + Used internally for updating heartbeats, + but may also be used publically to acknowledge + a system is alive (i.e. rpc message successfully + sent to host) + """ + pass + + def is_alive(self, topic, host): + """ + Checks if a host is alive. + """ + pass + + def expire(self, topic, host): + """ + Explicitly expire a host's registration. + """ + pass + + def send_heartbeats(self): + """ + Send all heartbeats. + Use start_heartbeat to spawn a heartbeat greenthread, + which loops this method. + """ + pass + + def unregister(self, key, host): + """ + Unregister a topic. + """ + pass + + def start_heartbeat(self): + """ + Spawn heartbeat greenthread. + """ + pass + + def stop_heartbeat(self): + """ + Destroys the heartbeat greenthread. + """ + pass + def add_binding(self, binding, rule, last=True): self.bindings.append((binding, rule, False, last)) @@ -99,6 +167,103 @@ class MatchMakerBase(object): return workers +class HeartbeatMatchMakerBase(MatchMakerBase): + """ + Base for a heart-beat capable MatchMaker. + Provides common methods for registering, + unregistering, and maintaining heartbeats. + """ + def __init__(self): + self.hosts = set() + self._heart = None + self.host_topic = {} + + super(HeartbeatMatchMakerBase, self).__init__() + + def send_heartbeats(self): + """ + Send all heartbeats. + Use start_heartbeat to spawn a heartbeat greenthread, + which loops this method. + """ + for key, host in self.host_topic: + self.ack_alive(key, host) + + def ack_alive(self, key, host): + """ + Acknowledge that a host.topic is alive. + Used internally for updating heartbeats, + but may also be used publically to acknowledge + a system is alive (i.e. rpc message successfully + sent to host) + """ + raise NotImplementedError("Must implement ack_alive") + + def backend_register(self, key, host): + """ + Implements registration logic. + Called by register(self,key,host) + """ + raise NotImplementedError("Must implement backend_register") + + def backend_unregister(self, key, key_host): + """ + Implements de-registration logic. + Called by unregister(self,key,host) + """ + raise NotImplementedError("Must implement backend_unregister") + + def register(self, key, host): + """ + Register a host on a backend. + Heartbeats, if applicable, may keepalive registration. + """ + self.hosts.add(host) + self.host_topic[(key, host)] = host + key_host = '.'.join((key, host)) + + self.backend_register(key, key_host) + + self.ack_alive(key, host) + + def unregister(self, key, host): + """ + Unregister a topic. + """ + if (key, host) in self.host_topic: + del self.host_topic[(key, host)] + + self.hosts.discard(host) + self.backend_unregister(key, '.'.join((key, host))) + + LOG.info(_("Matchmaker unregistered: %s, %s" % (key, host))) + + def start_heartbeat(self): + """ + Implementation of MatchMakerBase.start_heartbeat + Launches greenthread looping send_heartbeats(), + yielding for CONF.matchmaker_heartbeat_freq seconds + between iterations. + """ + if len(self.hosts) == 0: + raise MatchMakerException( + _("Register before starting heartbeat.")) + + def do_heartbeat(): + while True: + self.send_heartbeats() + eventlet.sleep(CONF.matchmaker_heartbeat_freq) + + self._heart = eventlet.spawn(do_heartbeat) + + def stop_heartbeat(self): + """ + Destroys the heartbeat greenthread. + """ + if self._heart: + self._heart.kill() + + class DirectBinding(Binding): """ Specifies a host in the key via a '.' character diff --git a/ceilometer/openstack/common/rpc/matchmaker_redis.py b/ceilometer/openstack/common/rpc/matchmaker_redis.py new file mode 100644 index 0000000000..325a6512da --- /dev/null +++ b/ceilometer/openstack/common/rpc/matchmaker_redis.py @@ -0,0 +1,149 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 Cloudscaling Group, Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +""" +The MatchMaker classes should accept a Topic or Fanout exchange key and +return keys for direct exchanges, per (approximate) AMQP parlance. +""" + +from oslo.config import cfg + +from ceilometer.openstack.common import importutils +from ceilometer.openstack.common import log as logging +from ceilometer.openstack.common.rpc import matchmaker as mm_common + +redis = importutils.try_import('redis') + + +matchmaker_redis_opts = [ + cfg.StrOpt('host', + default='127.0.0.1', + help='Host to locate redis'), + cfg.IntOpt('port', + default=6379, + help='Use this port to connect to redis host.'), + cfg.StrOpt('password', + default=None, + help='Password for Redis server. (optional)'), +] + +CONF = cfg.CONF +opt_group = cfg.OptGroup(name='matchmaker_redis', + title='Options for Redis-based MatchMaker') +CONF.register_group(opt_group) +CONF.register_opts(matchmaker_redis_opts, opt_group) +LOG = logging.getLogger(__name__) + + +class RedisExchange(mm_common.Exchange): + def __init__(self, matchmaker): + self.matchmaker = matchmaker + self.redis = matchmaker.redis + super(RedisExchange, self).__init__() + + +class RedisTopicExchange(RedisExchange): + """ + Exchange where all topic keys are split, sending to second half. + i.e. "compute.host" sends a message to "compute" running on "host" + """ + def run(self, topic): + while True: + member_name = self.redis.srandmember(topic) + + if not member_name: + # If this happens, there are no + # longer any members. + break + + if not self.matchmaker.is_alive(topic, member_name): + continue + + host = member_name.split('.', 1)[1] + return [(member_name, host)] + return [] + + +class RedisFanoutExchange(RedisExchange): + """ + Return a list of all hosts. + """ + def run(self, topic): + topic = topic.split('~', 1)[1] + hosts = self.redis.smembers(topic) + good_hosts = filter( + lambda host: self.matchmaker.is_alive(topic, host), hosts) + + return [(x, x.split('.', 1)[1]) for x in good_hosts] + + +class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase): + """ + MatchMaker registering and looking-up hosts with a Redis server. + """ + def __init__(self): + super(MatchMakerRedis, self).__init__() + + if not redis: + raise ImportError("Failed to import module redis.") + + self.redis = redis.StrictRedis( + host=CONF.matchmaker_redis.host, + port=CONF.matchmaker_redis.port, + password=CONF.matchmaker_redis.password) + + self.add_binding(mm_common.FanoutBinding(), RedisFanoutExchange(self)) + self.add_binding(mm_common.DirectBinding(), mm_common.DirectExchange()) + self.add_binding(mm_common.TopicBinding(), RedisTopicExchange(self)) + + def ack_alive(self, key, host): + topic = "%s.%s" % (key, host) + if not self.redis.expire(topic, CONF.matchmaker_heartbeat_ttl): + # If we could not update the expiration, the key + # might have been pruned. Re-register, creating a new + # key in Redis. + self.register(self.topic_host[host], host) + + def is_alive(self, topic, host): + if self.redis.ttl(host) == -1: + self.expire(topic, host) + return False + return True + + def expire(self, topic, host): + with self.redis.pipeline() as pipe: + pipe.multi() + pipe.delete(host) + pipe.srem(topic, host) + pipe.execute() + + def backend_register(self, key, key_host): + with self.redis.pipeline() as pipe: + pipe.multi() + pipe.sadd(key, key_host) + + # No value is needed, we just + # care if it exists. Sets aren't viable + # because only keys can expire. + pipe.set(key_host, '') + + pipe.execute() + + def backend_unregister(self, key, key_host): + with self.redis.pipeline() as pipe: + pipe.multi() + pipe.srem(key, key_host) + pipe.delete(key_host) + pipe.execute() diff --git a/ceilometer/openstack/common/setup.py b/ceilometer/openstack/common/setup.py index 22f864d50e..030df61c99 100644 --- a/ceilometer/openstack/common/setup.py +++ b/ceilometer/openstack/common/setup.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # Copyright 2012-2013 Hewlett-Packard Development Company, L.P. # All Rights Reserved. # @@ -43,6 +43,11 @@ def parse_mailmap(mailmap='.mailmap'): return mapping +def _parse_git_mailmap(git_dir, mailmap='.mailmap'): + mailmap = os.path.join(os.path.dirname(git_dir), mailmap) + return parse_mailmap(mailmap) + + def canonicalize_emails(changelog, mapping): """Takes in a string and an email alias mapping and replaces all instances of the aliases in the string with their real email. @@ -127,14 +132,26 @@ def _run_shell_command(cmd, throw_on_error=False): return out[0].strip() +def _get_git_directory(): + parent_dir = os.path.dirname(__file__) + while True: + git_dir = os.path.join(parent_dir, '.git') + if os.path.exists(git_dir): + return git_dir + parent_dir, child = os.path.split(parent_dir) + if not child: # reached to root dir + return None + + def write_git_changelog(): """Write a changelog based on the git changelog.""" new_changelog = 'ChangeLog' + git_dir = _get_git_directory() if not os.getenv('SKIP_WRITE_GIT_CHANGELOG'): - if os.path.exists('.git'): - git_log_cmd = 'git log --stat' + if git_dir: + git_log_cmd = 'git --git-dir=%s log' % git_dir changelog = _run_shell_command(git_log_cmd) - mailmap = parse_mailmap() + mailmap = _parse_git_mailmap(git_dir) with open(new_changelog, "w") as changelog_file: changelog_file.write(canonicalize_emails(changelog, mailmap)) else: @@ -146,13 +163,15 @@ def generate_authors(): jenkins_email = 'jenkins@review.(openstack|stackforge).org' old_authors = 'AUTHORS.in' new_authors = 'AUTHORS' + git_dir = _get_git_directory() if not os.getenv('SKIP_GENERATE_AUTHORS'): - if os.path.exists('.git'): + if git_dir: # don't include jenkins email address in AUTHORS file - git_log_cmd = ("git log --format='%aN <%aE>' | sort -u | " + git_log_cmd = ("git --git-dir=" + git_dir + + " log --format='%aN <%aE>' | sort -u | " "egrep -v '" + jenkins_email + "'") changelog = _run_shell_command(git_log_cmd) - mailmap = parse_mailmap() + mailmap = _parse_git_mailmap(git_dir) with open(new_authors, 'w') as new_authors_fh: new_authors_fh.write(canonicalize_emails(changelog, mailmap)) if os.path.exists(old_authors): @@ -258,19 +277,21 @@ def get_cmdclass(): return cmdclass -def _get_revno(): +def _get_revno(git_dir): """Return the number of commits since the most recent tag. We use git-describe to find this out, but if there are no tags then we fall back to counting commits since the beginning of time. """ - describe = _run_shell_command("git describe --always") + describe = _run_shell_command( + "git --git-dir=%s describe --always" % git_dir) if "-" in describe: return describe.rsplit("-", 2)[-2] # no tags found - revlist = _run_shell_command("git rev-list --abbrev-commit HEAD") + revlist = _run_shell_command( + "git --git-dir=%s rev-list --abbrev-commit HEAD" % git_dir) return len(revlist.splitlines()) @@ -279,18 +300,21 @@ def _get_version_from_git(pre_version): revision if there is one, or tag plus number of additional revisions if the current revision has no tag.""" - if os.path.exists('.git'): + git_dir = _get_git_directory() + if git_dir: if pre_version: try: return _run_shell_command( - "git describe --exact-match", + "git --git-dir=" + git_dir + " describe --exact-match", throw_on_error=True).replace('-', '.') except Exception: - sha = _run_shell_command("git log -n1 --pretty=format:%h") - return "%s.a%s.g%s" % (pre_version, _get_revno(), sha) + sha = _run_shell_command( + "git --git-dir=" + git_dir + " log -n1 --pretty=format:%h") + return "%s.a%s.g%s" % (pre_version, _get_revno(git_dir), sha) else: return _run_shell_command( - "git describe --always").replace('-', '.') + "git --git-dir=" + git_dir + " describe --always").replace( + '-', '.') return None diff --git a/ceilometer/openstack/common/timeutils.py b/ceilometer/openstack/common/timeutils.py index e2c2740573..6094365907 100644 --- a/ceilometer/openstack/common/timeutils.py +++ b/ceilometer/openstack/common/timeutils.py @@ -1,6 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 -# Copyright 2011 OpenStack LLC. +# Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -25,18 +25,22 @@ import datetime import iso8601 -TIME_FORMAT = "%Y-%m-%dT%H:%M:%S" -PERFECT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f" +# ISO 8601 extended time format with microseconds +_ISO8601_TIME_FORMAT_SUBSECOND = '%Y-%m-%dT%H:%M:%S.%f' +_ISO8601_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S' +PERFECT_TIME_FORMAT = _ISO8601_TIME_FORMAT_SUBSECOND -def isotime(at=None): +def isotime(at=None, subsecond=False): """Stringify time in ISO 8601 format""" if not at: at = utcnow() - str = at.strftime(TIME_FORMAT) + st = at.strftime(_ISO8601_TIME_FORMAT + if not subsecond + else _ISO8601_TIME_FORMAT_SUBSECOND) tz = at.tzinfo.tzname(None) if at.tzinfo else 'UTC' - str += ('Z' if tz == 'UTC' else tz) - return str + st += ('Z' if tz == 'UTC' else tz) + return st def parse_isotime(timestr): diff --git a/ceilometer/openstack/common/version.py b/ceilometer/openstack/common/version.py index 5500f1b98b..d1f8368275 100644 --- a/ceilometer/openstack/common/version.py +++ b/ceilometer/openstack/common/version.py @@ -1,5 +1,5 @@ -# Copyright 2012 OpenStack LLC +# Copyright 2012 OpenStack Foundation # Copyright 2012-2013 Hewlett-Packard Development Company, L.P. # # Licensed under the Apache License, Version 2.0 (the "License"); you may