Sync with Oslo-Incubator 1fd7694e

Change-Id: Iba4619e4a2b50c3e471133c4f65826a0be7c9e90
This commit is contained in:
Kiall Mac Innes 2012-11-29 23:41:08 +00:00
parent 5f38604642
commit 400dae7348
30 changed files with 475 additions and 440 deletions

View File

@ -27,8 +27,8 @@ class Service(wsgi.Service):
def __init__(self, backlog=128, threads=1000):
super(Service, self).__init__(threads) = cfg.CONF.api_host
self.port = cfg.CONF.api_port
self._host = cfg.CONF.api_host
self._port = cfg.CONF.api_port
self.backlog = backlog
config_path = utils.find_config(cfg.CONF.api_paste_config)

View File

@ -205,27 +205,13 @@ Option values may reference other values using PEP 292 string substitution::
Note that interpolation can be avoided by using '$$'.
For command line utilities that dispatch to other command line utilities, the
disable_interspersed_args() method is available. If this this method is called,
then parsing e.g.::
script --verbose cmd --debug /tmp/mything
will no longer return::
['cmd', '/tmp/mything']
as the leftover arguments, but will instead return::
['cmd', '--debug', '/tmp/mything']
i.e. argument parsing is stopped at the first non-option argument.
FIXME(markmc): document add_cli_subparsers()
Options may be declared as required so that an error is raised if the user
does not supply a value for the option.
Options may be declared as secret so that their values are not leaked into
log files:
log files::
opts = [
cfg.StrOpt('s3_store_access_key', secret=True),
@ -234,9 +220,9 @@ log files:
This module also contains a global instance of the CommonConfigOpts class
in order to support a common usage pattern in OpenStack:
in order to support a common usage pattern in OpenStack::
from openstack.common import cfg
from moniker.openstack.common import cfg
opts = [
cfg.StrOpt('bind_host', default=''),
@ -251,11 +237,11 @@ in order to support a common usage pattern in OpenStack:
import argparse
import collections
import copy
import functools
import glob
import optparse
import os
import string
import sys
@ -489,6 +475,8 @@ class Opt(object):
a single character CLI option name
the default value of the option
True if the option is a positional CLI argument
the name shown as the argument to a CLI option in --help output
@ -497,8 +485,8 @@ class Opt(object):
multi = False
def __init__(self, name, dest=None, short=None, default=None,
metavar=None, help=None, secret=False, required=False,
positional=False, metavar=None, help=None,
secret=False, required=False, deprecated_name=None):
"""Construct an Opt object.
The only required parameter is the option's name. However, it is
@ -508,6 +496,7 @@ class Opt(object):
:param dest: the name of the corresponding ConfigOpts property
:param short: a single character CLI option name
:param default: the default value of the option
:param positional: True if the option is a positional CLI argument
:param metavar: the option argument to show in --help
:param help: an explanation of how the option is used
:param secret: true iff the value should be obfuscated in log output
@ -521,6 +510,7 @@ class Opt(object):
self.dest = dest
self.short = short
self.default = default
self.positional = positional
self.metavar = metavar = help
self.secret = secret
@ -561,64 +551,72 @@ class Opt(object):
:param parser: the CLI option parser
:param group: an optional OptGroup object
container = self._get_optparse_container(parser, group)
kwargs = self._get_optparse_kwargs(group)
prefix = self._get_optparse_prefix('', group)
self._add_to_optparse(container,, self.short, kwargs, prefix,
container = self._get_argparse_container(parser, group)
kwargs = self._get_argparse_kwargs(group)
prefix = self._get_argparse_prefix('', group)
self._add_to_argparse(container,, self.short, kwargs, prefix,
self.positional, self.deprecated_name)
def _add_to_optparse(self, container, name, short, kwargs, prefix='',
"""Add an option to an optparse parser or group.
def _add_to_argparse(self, container, name, short, kwargs, prefix='',
positional=False, deprecated_name=None):
"""Add an option to an argparse parser or group.
:param container: an optparse.OptionContainer object
:param container: an argparse._ArgumentGroup object
:param name: the opt name
:param short: the short opt name
:param kwargs: the keyword arguments for add_option()
:param kwargs: the keyword arguments for add_argument()
:param prefix: an optional prefix to prepend to the opt name
:param position: whether the optional is a positional CLI argument
:raises: DuplicateOptError if a naming confict is detected
args = ['--' + prefix + name]
def hyphen(arg):
return arg if not positional else ''
args = [hyphen('--') + prefix + name]
if short:
args += ['-' + short]
args.append(hyphen('-') + short)
if deprecated_name:
args += ['--' + prefix + deprecated_name]
for a in args:
if container.has_option(a):
raise DuplicateOptError(a)
container.add_option(*args, **kwargs)
args.append(hyphen('--') + prefix + deprecated_name)
def _get_optparse_container(self, parser, group):
"""Returns an optparse.OptionContainer.
container.add_argument(*args, **kwargs)
except argparse.ArgumentError as e:
raise DuplicateOptError(e)
:param parser: an optparse.OptionParser
def _get_argparse_container(self, parser, group):
"""Returns an argparse._ArgumentGroup.
:param parser: an argparse.ArgumentParser
:param group: an (optional) OptGroup object
:returns: an optparse.OptionGroup if a group is given, else the parser
:returns: an argparse._ArgumentGroup if group is given, else parser
if group is not None:
return group._get_optparse_group(parser)
return group._get_argparse_group(parser)
return parser
def _get_optparse_kwargs(self, group, **kwargs):
"""Build a dict of keyword arguments for optparse's add_option().
def _get_argparse_kwargs(self, group, **kwargs):
"""Build a dict of keyword arguments for argparse's add_argument().
Most opt types extend this method to customize the behaviour of the
options added to optparse.
options added to argparse.
:param group: an optional group
:param kwargs: optional keyword arguments to add to
:returns: a dict of keyword arguments
if not self.positional:
dest = self.dest
if group is not None:
dest = + '_' + dest
kwargs.update({'dest': dest,
'metavar': self.metavar,
kwargs['dest'] = dest
kwargs['nargs'] = '?'
kwargs.update({'metavar': self.metavar,
'help':, })
return kwargs
def _get_optparse_prefix(self, prefix, group):
def _get_argparse_prefix(self, prefix, group):
"""Build a prefix for the CLI option name, if required.
CLI options in a group are prefixed with the group's name in order
@ -656,6 +654,11 @@ class BoolOpt(Opt):
_boolean_states = {'1': True, 'yes': True, 'true': True, 'on': True,
'0': False, 'no': False, 'false': False, 'off': False}
def __init__(self, *args, **kwargs):
if 'positional' in kwargs:
raise ValueError('positional boolean args not supported')
super(BoolOpt, self).__init__(*args, **kwargs)
def _get_from_config_parser(self, cparser, section):
"""Retrieve the opt value as a boolean from ConfigParser."""
def convert_bool(v):
@ -671,21 +674,32 @@ class BoolOpt(Opt):
def _add_to_cli(self, parser, group=None):
"""Extends the base class method to add the --nooptname option."""
super(BoolOpt, self)._add_to_cli(parser, group)
self._add_inverse_to_optparse(parser, group)
self._add_inverse_to_argparse(parser, group)
def _add_inverse_to_optparse(self, parser, group):
def _add_inverse_to_argparse(self, parser, group):
"""Add the --nooptname option to the option parser."""
container = self._get_optparse_container(parser, group)
kwargs = self._get_optparse_kwargs(group, action='store_false')
prefix = self._get_optparse_prefix('no', group)
container = self._get_argparse_container(parser, group)
kwargs = self._get_argparse_kwargs(group, action='store_false')
prefix = self._get_argparse_prefix('no', group)
kwargs["help"] = "The inverse of --" +
self._add_to_optparse(container,, None, kwargs, prefix,
self._add_to_argparse(container,, None, kwargs, prefix,
self.positional, self.deprecated_name)
def _get_optparse_kwargs(self, group, action='store_true', **kwargs):
"""Extends the base optparse keyword dict for boolean options."""
return super(BoolOpt,
self)._get_optparse_kwargs(group, action=action, **kwargs)
def _get_argparse_kwargs(self, group, action='store_true', **kwargs):
"""Extends the base argparse keyword dict for boolean options."""
kwargs = super(BoolOpt, self)._get_argparse_kwargs(group, **kwargs)
# metavar has no effect for BoolOpt
if 'metavar' in kwargs:
del kwargs['metavar']
if action != 'store_true':
action = 'store_false'
kwargs['action'] = action
return kwargs
class IntOpt(Opt):
@ -697,10 +711,10 @@ class IntOpt(Opt):
return [int(v) for v in self._cparser_get_with_deprecated(cparser,
def _get_optparse_kwargs(self, group, **kwargs):
"""Extends the base optparse keyword dict for integer options."""
def _get_argparse_kwargs(self, group, **kwargs):
"""Extends the base argparse keyword dict for integer options."""
return super(IntOpt,
self)._get_optparse_kwargs(group, type='int', **kwargs)
self)._get_argparse_kwargs(group, type=int, **kwargs)
class FloatOpt(Opt):
@ -712,10 +726,10 @@ class FloatOpt(Opt):
return [float(v) for v in
self._cparser_get_with_deprecated(cparser, section)]
def _get_optparse_kwargs(self, group, **kwargs):
"""Extends the base optparse keyword dict for float options."""
return super(FloatOpt,
self)._get_optparse_kwargs(group, type='float', **kwargs)
def _get_argparse_kwargs(self, group, **kwargs):
"""Extends the base argparse keyword dict for float options."""
return super(FloatOpt, self)._get_argparse_kwargs(group,
type=float, **kwargs)
class ListOpt(Opt):
@ -725,24 +739,27 @@ class ListOpt(Opt):
is a list containing these strings.
class _StoreListAction(argparse.Action):
An argparse action for parsing an option value into a list.
def __call__(self, parser, namespace, values, option_string=None):
if values is not None:
values = [a.strip() for a in values.split(',')]
setattr(namespace, self.dest, values)
def _get_from_config_parser(self, cparser, section):
"""Retrieve the opt value as a list from ConfigParser."""
return [v.split(',') for v in
self._cparser_get_with_deprecated(cparser, section)]
def _get_optparse_kwargs(self, group, **kwargs):
"""Extends the base optparse keyword dict for list options."""
return super(ListOpt,
def _get_argparse_kwargs(self, group, **kwargs):
"""Extends the base argparse keyword dict for list options."""
return Opt._get_argparse_kwargs(self,
def _parse_list(self, option, opt, value, parser):
"""An optparse callback for parsing an option value into a list."""
setattr(parser.values, self.dest, value.split(','))
class MultiStrOpt(Opt):
@ -752,10 +769,14 @@ class MultiStrOpt(Opt):
multi = True
def _get_optparse_kwargs(self, group, **kwargs):
"""Extends the base optparse keyword dict for multi str options."""
return super(MultiStrOpt,
self)._get_optparse_kwargs(group, action='append')
def _get_argparse_kwargs(self, group, **kwargs):
"""Extends the base argparse keyword dict for multi str options."""
kwargs = super(MultiStrOpt, self)._get_argparse_kwargs(group)
if not self.positional:
kwargs['action'] = 'append'
kwargs['nargs'] = '*'
return kwargs
def _cparser_get_with_deprecated(self, cparser, section):
"""If cannot find option as dest try deprecated_name alias."""
@ -800,19 +821,20 @@ class OptGroup(object): = help
self._opts = {} # dict of dicts of (opt:, override:, default:)
self._optparse_group = None
self._argparse_group = None
def _register_opt(self, opt):
def _register_opt(self, opt, cli=False):
"""Add an opt to this group.
:param opt: an Opt object
:param cli: whether this is a CLI option
:returns: False if previously registered, True otherwise
:raises: DuplicateOptError if a naming conflict is detected
if _is_opt_registered(self._opts, opt):
return False
self._opts[opt.dest] = {'opt': opt}
self._opts[opt.dest] = {'opt': opt, 'cli': cli}
return True
@ -824,16 +846,16 @@ class OptGroup(object):
if opt.dest in self._opts:
del self._opts[opt.dest]
def _get_optparse_group(self, parser):
"""Build an optparse.OptionGroup for this group."""
if self._optparse_group is None:
self._optparse_group = optparse.OptionGroup(parser, self.title,
def _get_argparse_group(self, parser):
if self._argparse_group is None:
"""Build an argparse._ArgumentGroup for this group."""
self._argparse_group = parser.add_argument_group(self.title,
return self._optparse_group
return self._argparse_group
def _clear(self):
"""Clear this group's option parsing state."""
self._optparse_group = None
self._argparse_group = None
class ParseError(iniparser.ParseError):
@ -928,26 +950,45 @@ class ConfigOpts(collections.Mapping):
self._groups = {}
self._args = None
# for subparser support, a root parser should be initialized earlier
# and conserved for later use
self._pre_init_parser = None
self._oparser = None
self._cparser = None
self._cli_values = {}
self.__cache = {}
self._config_opts = []
self._disable_interspersed_args = False
def _setup(self, project, prog, version, usage, default_config_files):
"""Initialize a ConfigOpts object for option parsing."""
def _pre_setup(self, project, prog, version, usage, default_config_files):
"""Initialize a ConfigCliParser object for option parsing."""
if prog is None:
prog = os.path.basename(sys.argv[0])
if default_config_files is None:
default_config_files = find_config_files(project, prog)
self._oparser = optparse.OptionParser(prog=prog,
if self._disable_interspersed_args:
# if _pre_init_parser does not exist, create one
if self._pre_init_parser is None:
self._oparser = argparse.ArgumentParser(prog=prog, usage=usage)
# otherwise, use the pre-initialized parser with subparsers
# and re-initialize parser
self._oparser = self._pre_init_parser
self._oparser.prog = prog
self._oparser.version = version
self._oparser.usage = usage
self._pre_init_parser = None
return prog, default_config_files
def _setup(self, project, prog, version, usage, default_config_files):
"""Initialize a ConfigOpts object for option parsing."""
self._config_opts = [
@ -1017,18 +1058,23 @@ class ConfigOpts(collections.Mapping):
:raises: SystemExit, ConfigFilesNotFoundError, ConfigFileParseError,
RequiredOptError, DuplicateOptError
prog, default_config_files = self._pre_setup(project,
self._setup(project, prog, version, usage, default_config_files)
self._cli_values, leftovers = self._parse_cli_opts(args)
self._cli_values = self._parse_cli_opts(args)
return leftovers
def __getattr__(self, name):
"""Look up an option value and perform string substitution.
@ -1055,6 +1101,13 @@ class ConfigOpts(collections.Mapping):
"""Return the number of options and option groups."""
return len(self._opts) + len(self._groups)
def add_cli_subparsers(self, **kwargs):
# only add subparsers to pre-initialized root parser
# to avoid cleared by self.clear()
if self._pre_init_parser is None:
self._pre_init_parser = argparse.ArgumentParser()
return self._pre_init_parser.add_subparsers(**kwargs)
def reset(self):
"""Clear the object state and unset overrides and defaults."""
@ -1072,7 +1125,7 @@ class ConfigOpts(collections.Mapping):
def register_opt(self, opt, group=None):
def register_opt(self, opt, group=None, cli=False):
"""Register an option schema.
Registering an option schema makes any option value which is previously
@ -1080,17 +1133,19 @@ class ConfigOpts(collections.Mapping):
as an attribute of this object.
:param opt: an instance of an Opt sub-class
:param cli: whether this is a CLI option
:param group: an optional OptGroup object or group name
:return: False if the opt was already register, True otherwise
:raises: DuplicateOptError
if group is not None:
return self._get_group(group, autocreate=True)._register_opt(opt)
group = self._get_group(group, autocreate=True)
return group._register_opt(opt, cli)
if _is_opt_registered(self._opts, opt):
return False
self._opts[opt.dest] = {'opt': opt}
self._opts[opt.dest] = {'opt': opt, 'cli': cli}
return True
@ -1116,7 +1171,7 @@ class ConfigOpts(collections.Mapping):
if self._args is not None:
raise ArgsAlreadyParsedError("cannot register CLI option")
return self.register_opt(opt, group, clear_cache=False)
return self.register_opt(opt, group, cli=True, clear_cache=False)
def register_cli_opts(self, opts, group=None):
@ -1243,9 +1298,10 @@ class ConfigOpts(collections.Mapping):
for info in group._opts.values():
yield info, group
def _all_opts(self):
"""A generator function for iteration opts."""
def _all_cli_opts(self):
"""A generator function for iterating CLI opts."""
for info, group in self._all_opt_infos():
if info['cli']:
yield info['opt'], group
def _unset_defaults_and_overrides(self):
@ -1254,31 +1310,6 @@ class ConfigOpts(collections.Mapping):
info.pop('default', None)
info.pop('override', None)
def disable_interspersed_args(self):
"""Set parsing to stop on the first non-option.
If this this method is called, then parsing e.g.
script --verbose cmd --debug /tmp/mything
will no longer return:
['cmd', '/tmp/mything']
as the leftover arguments, but will instead return:
['cmd', '--debug', '/tmp/mything']
i.e. argument parsing is stopped at the first non-option argument.
self._disable_interspersed_args = True
def enable_interspersed_args(self):
"""Set parsing to not stop on the first non-option.
This it the default behaviour."""
self._disable_interspersed_args = False
def find_file(self, name):
"""Locate a file located alongside the config files.
@ -1401,6 +1432,10 @@ class ConfigOpts(collections.Mapping):
if not opt.multi:
return value
# argparse ignores default=None for nargs='*'
if opt.positional and not value:
value = opt.default
return value + values
if values:
@ -1523,12 +1558,10 @@ class ConfigOpts(collections.Mapping):
self._args = args
for opt, group in self._all_opts():
for opt, group in self._all_cli_opts():
opt._add_to_cli(self._oparser, group)
values, leftovers = self._oparser.parse_args(args)
return vars(values), leftovers
return vars(self._oparser.parse_args(args))
class GroupAttr(collections.Mapping):
@ -1543,12 +1576,12 @@ class ConfigOpts(collections.Mapping):
:param conf: a ConfigOpts object
:param group: an OptGroup object
self.conf = conf = group
self._conf = conf
self._group = group
def __getattr__(self, name):
"""Look up an option value and perform template substitution."""
return self.conf._get(name,
return self._conf._get(name, self._group)
def __getitem__(self, key):
"""Look up an option value and perform string substitution."""
@ -1556,16 +1589,16 @@ class ConfigOpts(collections.Mapping):
def __contains__(self, key):
"""Return True if key is the name of a registered opt or group."""
return key in
return key in self._group._opts
def __iter__(self):
"""Iterate over all registered opt and group names."""
for key in
for key in self._group._opts.keys():
yield key
def __len__(self):
"""Return the number of options and option groups."""
return len(
return len(self._group._opts)
class StrSubWrapper(object):
@ -1623,12 +1656,12 @@ class CommonConfigOpts(ConfigOpts):
help='A logging.Formatter log message format string which may '
'use any of the available logging.LogRecord attributes. '
'Default: %default'),
'Default: %(default)s'),
help='Format string for %(asctime)s in log records. '
'Default: %default'),
help='Format string for %%(asctime)s in log records. '
'Default: %(default)s'),
help='(Optional) Name of log file to output to. '

View File

@ -46,7 +46,7 @@ def _find_objects(t):
def _print_greenthreads():
for i, gt in enumerate(find_objects(greenlet.greenlet)):
for i, gt in enumerate(_find_objects(greenlet.greenlet)):
print i, gt
@ -61,7 +61,7 @@ def initialize_if_enabled():
if CONF.backdoor_port is None:
return None
# NOTE(johannes): The standard sys.displayhook will print the value of
# the last expression and set it to __builtin__._, which overwrites
@ -73,6 +73,8 @@ def initialize_if_enabled():
sys.displayhook = displayhook
eventlet.listen(('localhost', CONF.backdoor_port)),
sock = eventlet.listen(('localhost', CONF.backdoor_port))
port = sock.getsockname()[1]
eventlet.spawn_n(eventlet.backdoor.backdoor_server, sock,
return port

View File

@ -21,17 +21,7 @@ Exceptions common to OpenStack projects
import logging
class ProcessExecutionError(IOError):
def __init__(self, stdout=None, stderr=None, exit_code=None, cmd=None,
if description is None:
description = "Unexpected error while running command."
if exit_code is None:
exit_code = '-'
message = "%s\nCommand: %s\nExit code: %s\nStdout: %r\nStderr: %r" % (
description, cmd, exit_code, stdout, stderr)
IOError.__init__(self, message)
from moniker.openstack.common.gettextutils import _
class Error(Exception):
@ -109,7 +99,7 @@ def wrap_exception(f):
except Exception, e:
if not isinstance(e, Error):
#exc_type, exc_value, exc_traceback = sys.exc_info()
logging.exception('Uncaught exception')
logging.exception(_('Uncaught exception'))
raise Error(str(e))

View File

@ -24,6 +24,8 @@ import logging
import sys
import traceback
from moniker.openstack.common.gettextutils import _
def save_and_reraise_exception():
@ -43,7 +45,7 @@ def save_and_reraise_exception():
except Exception:
logging.error('Original exception being dropped: %s' %
(traceback.format_exception(type_, value, tb)))
logging.error(_('Original exception being dropped: %s'),
traceback.format_exception(type_, value, tb))
raise type_, value, tb

View File

@ -20,7 +20,7 @@ gettext for openstack-common modules.
Usual usage in an openstack.common module:
from openstack.common.gettextutils import _
from moniker.openstack.common.gettextutils import _
import gettext

View File

@ -29,7 +29,7 @@ def import_class(import_str):
return getattr(sys.modules[mod_str], class_str)
except (ValueError, AttributeError), exc:
except (ValueError, AttributeError):
raise ImportError('Class %s cannot be found (%s)' %

View File

@ -126,7 +126,7 @@ def to_primitive(value, convert_instances=False, level=0):
level=level + 1)
return value
except TypeError, e:
except TypeError:
# Class objects are tricky since they may define something like
# __iter__ defined but it isn't callable as list().
return unicode(value)

View File

@ -50,7 +50,7 @@ from moniker.openstack.common import notifier
log_opts = [
default='%(asctime)s %(levelname)s %(name)s [%(request_id)s '
'%(user_id)s %(project_id)s] %(instance)s'
'%(user)s %(tenant)s] %(instance)s'
help='format string to use for log messages with context'),
@ -174,7 +174,7 @@ class ContextAdapter(logging.LoggerAdapter):
self.log(logging.AUDIT, msg, *args, **kwargs)
def deprecated(self, msg, *args, **kwargs):
stdmsg = _("Deprecated Config: %s") % msg
stdmsg = _("Deprecated: %s") % msg
if CONF.fatal_deprecations:
self.critical(stdmsg, *args, **kwargs)
raise DeprecatedConfig(msg=stdmsg)

View File

@ -24,6 +24,7 @@ from eventlet import greenthread
from moniker.openstack.common.gettextutils import _
from moniker.openstack.common import log as logging
from moniker.openstack.common import timeutils
LOG = logging.getLogger(__name__)
@ -62,10 +63,16 @@ class LoopingCall(object):
while self._running:
start = timeutils.utcnow()
self.f(*self.args, **
end = timeutils.utcnow()
if not self._running:
delay = interval - timeutils.delta_seconds(start, end)
if delay <= 0:
LOG.warn(_('task run outlasted interval by %s sec') %
greenthread.sleep(delay if delay > 0 else 0)
except LoopingCallDone, e:
@ -78,7 +85,7 @@ class LoopingCall(object):
self.done = done
return self.done
def stop(self):

View File

@ -137,10 +137,11 @@ def notify(context, publisher_id, event_type, priority, payload):
for driver in _get_drivers():
driver.notify(context, msg)
except Exception, e:
except Exception as e:
LOG.exception(_("Problem '%(e)s' attempting to "
"send to notification system. "
"Payload=%(payload)s") % locals())
% dict(e=e, payload=payload))
_drivers = None
@ -166,7 +167,7 @@ def add_driver(notification_driver):
driver = importutils.import_module(notification_driver)
_drivers[notification_driver] = driver
except ImportError as e:
except ImportError:
LOG.exception(_("Failed to load notifier %s. "
"These notifications will not be sent.") %

View File

@ -1,4 +1,4 @@
# Copyright 2011 OpenStack LLC.
# Copyright 2012 Red Hat, Inc.
# All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -14,33 +14,16 @@
# under the License.
from moniker.openstack.common import cfg
from moniker.openstack.common import context as req_context
from moniker.openstack.common.gettextutils import _
from moniker.openstack.common import log as logging
from moniker.openstack.common import rpc
from moniker.openstack.common.notifier import rpc_notifier
LOG = logging.getLogger(__name__)
notification_topic_opt = cfg.ListOpt(
'notification_topics', default=['notifications', ],
help='AMQP topic used for openstack notifications')
def notify(context, message):
"""Sends a notification to the RabbitMQ"""
if not context:
context = req_context.get_admin_context()
priority = message.get('priority',
priority = priority.lower()
for topic in CONF.notification_topics:
topic = '%s.%s' % (topic, priority)
rpc.notify(context, topic, message)
except Exception, e:
LOG.exception(_("Could not send notification to %(topic)s. "
"Payload=%(message)s"), locals())
"""Deprecated in Grizzly. Please use rpc_notifier instead."""
LOG.deprecated(_("The rabbit_notifier is now deprecated."
" Please use rpc_notifier instead."))
rpc_notifier.notify(context, message)

View File

@ -0,0 +1,46 @@
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from moniker.openstack.common import cfg
from moniker.openstack.common import context as req_context
from moniker.openstack.common.gettextutils import _
from moniker.openstack.common import log as logging
from moniker.openstack.common import rpc
LOG = logging.getLogger(__name__)
notification_topic_opt = cfg.ListOpt(
'notification_topics', default=['notifications', ],
help='AMQP topic used for openstack notifications')
def notify(context, message):
"""Sends a notification via RPC"""
if not context:
context = req_context.get_admin_context()
priority = message.get('priority',
priority = priority.lower()
for topic in CONF.notification_topics:
topic = '%s.%s' % (topic, priority)
rpc.notify(context, topic, message)
except Exception:
LOG.exception(_("Could not send notification to %(topic)s. "
"Payload=%(message)s"), locals())

View File

@ -250,7 +250,7 @@ def queue_get_for(context, topic, host):
Messages sent to the 'foo.<host>' topic are sent to the nova-foo service on
return '%s.%s' % (topic, host)
return '%s.%s' % (topic, host) if host else topic

View File

@ -26,7 +26,6 @@ AMQP, but is deprecated and predates this code.
import inspect
import logging
import sys
import uuid
@ -38,6 +37,7 @@ from moniker.openstack.common import cfg
from moniker.openstack.common import excutils
from moniker.openstack.common.gettextutils import _
from moniker.openstack.common import local
from moniker.openstack.common import log as logging
from moniker.openstack.common.rpc import common as rpc_common
@ -55,7 +55,7 @@ class Pool(pools.Pool):
# TODO(comstud): Timeout connections not used in a while
def create(self):
LOG.debug('Pool creating new connection')
LOG.debug(_('Pool creating new connection'))
return self.connection_cls(self.conf)
def empty(self):
@ -285,8 +285,8 @@ class ProxyCallback(object):
ctxt.reply(rval, None, connection_pool=self.connection_pool)
# This final None tells multicall that it is done.
ctxt.reply(ending=True, connection_pool=self.connection_pool)
except Exception as e:
LOG.exception('Exception during message handling')
except Exception:
LOG.exception(_('Exception during message handling'))
ctxt.reply(None, sys.exc_info(),
@ -410,8 +410,9 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg,
def notify(conf, context, topic, msg, connection_pool):
"""Sends a notification event on a topic."""
event_type = msg.get('event_type')
LOG.debug(_('Sending %(event_type)s on %(topic)s'), locals())
LOG.debug(_('Sending %(event_type)s on %(topic)s'),
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
conn.notify_send(topic, msg)

View File

@ -18,13 +18,13 @@
# under the License.
import copy
import logging
import traceback
from moniker.openstack.common.gettextutils import _
from moniker.openstack.common import importutils
from moniker.openstack.common import jsonutils
from moniker.openstack.common import local
from moniker.openstack.common import log as logging
LOG = logging.getLogger(__name__)
@ -40,7 +40,7 @@ class RPCException(Exception):
message = self.message % kwargs
except Exception as e:
except Exception:
# kwargs doesn't match a variable in the message
# log the issue and the kwargs
LOG.exception(_('Exception in string format operation'))
@ -273,7 +273,7 @@ def deserialize_remote_exception(conf, data):
# we cannot necessarily change an exception message so we must override
# the __str__ method.
failure.__class__ = new_ex_type
except TypeError as e:
except TypeError:
# NOTE(ameade): If a core exception then just add the traceback to the
# first exception argument.
failure.args = (message,) + failure.args[1:]

View File

@ -41,8 +41,8 @@ server side of the API at the same time. However, as the code stands today,
there can be both versioned and unversioned APIs implemented in the same code
Nova was the first project to use versioned rpc APIs. Consider the compute rpc
API as an example. The client side is in nova/compute/ and the server
@ -50,12 +50,13 @@ side is in nova/compute/
Example 1) Adding a new method.
Adding a new method is a backwards compatible change. It should be added to
nova/compute/, and RPC_API_VERSION should be bumped from X.Y to
X.Y+1. On the client side, the new method in nova/compute/ should
have a specific version specified to indicate the minimum API version that must
be implemented for the method to be supported. For example:
be implemented for the method to be supported. For example::
def get_host_uptime(self, ctxt, host):
topic = _compute_topic(self.topic, ctxt, host, None)
@ -67,10 +68,11 @@ get_host_uptime() method.
Example 2) Adding a new parameter.
Adding a new parameter to an rpc method can be made backwards compatible. The
RPC_API_VERSION on the server side (nova/compute/ should be bumped.
The implementation of the method must not expect the parameter to be present.
The implementation of the method must not expect the parameter to be present.::
def some_remote_method(self, arg1, arg2, newarg=None):
# The code needs to deal with newarg=None for cases

View File

@ -409,18 +409,18 @@ class Connection(object):
hostname, port = network_utils.parse_host_port(
adr, default_port=self.conf.rabbit_port)
params = {}
params = {
'hostname': hostname,
'port': port,
'userid': self.conf.rabbit_userid,
'password': self.conf.rabbit_password,
'virtual_host': self.conf.rabbit_virtual_host,
for sp_key, value in server_params.iteritems():
p_key = server_params_to_kombu_params.get(sp_key, sp_key)
params[p_key] = value
params.setdefault('hostname', hostname)
params.setdefault('port', port)
params.setdefault('userid', self.conf.rabbit_userid)
params.setdefault('password', self.conf.rabbit_password)
params.setdefault('virtual_host', self.conf.rabbit_virtual_host)
if self.conf.fake_rabbit:
params['transport'] = 'memory'
if self.conf.rabbit_use_ssl:
@ -785,7 +785,7 @@ def cast_to_server(conf, context, server_params, topic, msg):
def fanout_cast_to_server(conf, context, server_params, topic, msg):
"""Sends a message on a fanout exchange to a specific server."""
return rpc_amqp.cast_to_server(
return rpc_amqp.fanout_cast_to_server(
conf, context, server_params, topic, msg,
rpc_amqp.get_connection_pool(conf, Connection))

View File

@ -17,7 +17,6 @@
import functools
import itertools
import logging
import time
import uuid
@ -29,6 +28,7 @@ import qpid.messaging.exceptions
from moniker.openstack.common import cfg
from moniker.openstack.common.gettextutils import _
from moniker.openstack.common import jsonutils
from moniker.openstack.common import log as logging
from moniker.openstack.common.rpc import amqp as rpc_amqp
from moniker.openstack.common.rpc import common as rpc_common
@ -41,6 +41,9 @@ qpid_opts = [
help='Qpid broker port'),
help='Qpid HA cluster host:port pairs'),
help='Username for qpid connection'),
@ -50,24 +53,6 @@ qpid_opts = [
help='Space separated list of SASL mechanisms to use for auth'),
help='Automatically reconnect'),
help='Reconnection timeout in seconds'),
help='Max reconnections before giving up'),
help='Minimum seconds between reconnection attempts'),
help='Maximum seconds between reconnection attempts'),
help='Equivalent to setting max and min to the same value'),
help='Seconds between connection keepalive heartbeats'),
@ -294,50 +279,35 @@ class Connection(object):
self.consumer_thread = None
self.conf = conf
if server_params is None:
server_params = {}
params = {
'qpid_hosts': self.conf.qpid_hosts,
'username': self.conf.qpid_username,
'password': self.conf.qpid_password,
params.update(server_params or {})
default_params = dict(hostname=self.conf.qpid_hostname,
self.brokers = params['qpid_hosts']
self.username = params['username']
self.password = params['password']
params = server_params
for key in default_params.keys():
params.setdefault(key, default_params[key]) = params['hostname'] + ":" + str(params['port'])
def connection_create(self, broker):
# Create the connection - this does not open the connection
self.connection = qpid.messaging.Connection(
self.connection = qpid.messaging.Connection(broker)
# Check if flags are set and if so set them for the connection
# before we call open
self.connection.username = params['username']
self.connection.password = params['password']
self.connection.username = self.username
self.connection.password = self.password
self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms
self.connection.reconnect = self.conf.qpid_reconnect
if self.conf.qpid_reconnect_timeout:
self.connection.reconnect_timeout = (
if self.conf.qpid_reconnect_limit:
self.connection.reconnect_limit = self.conf.qpid_reconnect_limit
if self.conf.qpid_reconnect_interval_max:
self.connection.reconnect_interval_max = (
if self.conf.qpid_reconnect_interval_min:
self.connection.reconnect_interval_min = (
if self.conf.qpid_reconnect_interval:
self.connection.reconnect_interval = (
# Reconnection is done by self.reconnect()
self.connection.reconnect = False
self.connection.heartbeat = self.conf.qpid_heartbeat
self.connection.protocol = self.conf.qpid_protocol
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
# Open is part of reconnect -
# NOTE(WGH) not sure we need this with the reconnect flags
def _register_consumer(self, consumer):
self.consumers[str(consumer.get_receiver())] = consumer
@ -352,23 +322,36 @@ class Connection(object):
except qpid.messaging.exceptions.ConnectionError:
attempt = 0
delay = 1
while True:
broker = self.brokers[attempt % len(self.brokers)]
attempt += 1
except qpid.messaging.exceptions.ConnectionError, e:
LOG.error(_('Unable to connect to AMQP server: %s'), e)
time.sleep(self.conf.qpid_reconnect_interval or 1)
msg_dict = dict(e=e, delay=delay)
msg = _("Unable to connect to AMQP server: %(e)s. "
"Sleeping %(delay)s seconds") % msg_dict
delay = min(2 * delay, 60)
else:'Connected to AMQP server on %s'), broker)
break'Connected to AMQP server on %s'),
self.session = self.connection.session()
for consumer in self.consumers.itervalues():
if self.consumers:
consumers = self.consumers
self.consumers = {}
for consumer in consumers.itervalues():
LOG.debug(_("Re-established AMQP queues"))
def ensure(self, error_callback, method, *args, **kwargs):

View File

@ -21,10 +21,10 @@ return keys for direct exchanges, per (approximate) AMQP parlance.
import contextlib
import itertools
import json
import logging
from moniker.openstack.common import cfg
from moniker.openstack.common.gettextutils import _
from moniker.openstack.common import log as logging
matchmaker_opts = [

View File

@ -57,6 +57,11 @@ class Service(service.Service):
self.conn.create_consumer(self.topic, dispatcher, fanout=True)
# Hook to allow the manager to do other initializations after
# the rpc connection is created.
if callable(getattr(self.manager, 'initialize_service_hook', None)):
# Consume from all consumers in a thread

View File

@ -27,7 +27,7 @@ import sys
import time
import eventlet
import greenlet
import extras
import logging as std_logging
from moniker.openstack.common import cfg
@ -36,11 +36,8 @@ from moniker.openstack.common.gettextutils import _
from moniker.openstack.common import log as logging
from moniker.openstack.common import threadgroup
from openstack.common import rpc
except ImportError:
rpc = None
rpc = extras.try_import('moniker.openstack.common.rpc')
LOG = logging.getLogger(__name__)
@ -54,7 +51,7 @@ class Launcher(object):
:returns: None
self._services = []
self._services = threadgroup.ThreadGroup('launcher')
@ -75,8 +72,7 @@ class Launcher(object):
:returns: None
gt = eventlet.spawn(self.run_service, service)
self._services.add_thread(self.run_service, service)
def stop(self):
"""Stop all services which are currently running.
@ -84,8 +80,7 @@ class Launcher(object):
:returns: None
for service in self._services:
def wait(self):
"""Waits until all services have been stopped, and then returns.
@ -93,11 +88,7 @@ class Launcher(object):
:returns: None
for service in self._services:
except greenlet.GreenletExit:
class SignalExit(SystemExit):
@ -132,9 +123,9 @@ class ServiceLauncher(Launcher):
except SystemExit as exc:
status = exc.code
if rpc:
return status
@ -191,7 +182,7 @@ class ProcessLauncher(object):
# Close write to ensure only parent has it open
# Create greenthread to watch for parent to close pipe
# Reseed random number generator
@ -260,10 +251,12 @@ class ProcessLauncher(object):
if os.WIFSIGNALED(status):
sig = os.WTERMSIG(status)'Child %(pid)d killed by signal %(sig)d'), locals())'Child %(pid)d killed by signal %(sig)d'),
dict(pid=pid, sig=sig))
code = os.WEXITSTATUS(status)'Child %(pid)d exited with status %(code)d'), locals())'Child %(pid)s exited with status %(code)d'),
dict(pid=pid, code=code))
if pid not in self.children:
LOG.warning(_('pid %d not in child list'), pid)
@ -275,6 +268,10 @@ class ProcessLauncher(object):
def wait(self):
"""Loop waiting on children to die and respawning as necessary"""
LOG.debug(_('Full set of CONF:'))
CONF.log_opt_values(LOG, std_logging.DEBUG)
while self.running:
wrap = self._wait_child()
if not wrap:
@ -305,8 +302,8 @@ class ProcessLauncher(object):
class Service(object):
"""Service object for binaries running on hosts."""
def __init__(self): = threadgroup.ThreadGroup('service')
def __init__(self, threads=1000): = threadgroup.ThreadGroup('service', threads)
def start(self):

View File

@ -117,6 +117,10 @@ def write_requirements():
def _run_shell_command(cmd):
if == 'nt':
output = subprocess.Popen(["cmd.exe", "/C", cmd],
output = subprocess.Popen(["/bin/sh", "-c", cmd],
out = output.communicate()

View File

@ -18,7 +18,6 @@ from eventlet import greenlet
from eventlet import greenpool
from eventlet import greenthread
from moniker.openstack.common.gettextutils import _
from moniker.openstack.common import log as logging
from moniker.openstack.common import loopingcall
@ -27,19 +26,17 @@ LOG = logging.getLogger(__name__)
def _thread_done(gt, *args, **kwargs):
Callback function to be passed to when we spawn()
Calls the ThreadGroup to notify if.
""" Callback function to be passed to when we spawn()
Calls the :class:`ThreadGroup` to notify if.
class Thread(object):
Wrapper around a greenthread, that holds a reference to
the ThreadGroup. The Thread will notify the ThreadGroup
when it has done so it can be removed from the threads
""" Wrapper around a greenthread, that holds a reference to the
:class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when
it has done so it can be removed from the threads list.
def __init__(self, name, thread, group): = name
@ -47,18 +44,18 @@ class Thread(object):, group=group, thread=self)
def stop(self):
def wait(self):
return self.thread.wait()
class ThreadGroup():
The point of this class is to:
- keep track of timers and greenthreads (making it easier to stop them
class ThreadGroup(object):
""" The point of the ThreadGroup classis to:
* keep track of timers and greenthreads (making it easier to stop them
when need be).
- provide an easy API to add timers.
* provide an easy API to add timers.
def __init__(self, name, thread_pool_size=10): = name

View File

@ -87,6 +87,9 @@ def utcnow_ts():
def utcnow():
"""Overridable version of utils.utcnow."""
if utcnow.override_time:
return utcnow.override_time.pop(0)
except AttributeError:
return utcnow.override_time
return datetime.datetime.utcnow()
@ -95,13 +98,20 @@ utcnow.override_time = None
def set_time_override(override_time=datetime.datetime.utcnow()):
"""Override utils.utcnow to return a constant time."""
Override utils.utcnow to return a constant time or a list thereof,
one at a time.
utcnow.override_time = override_time
def advance_time_delta(timedelta):
"""Advance overridden time using a datetime.timedelta."""
assert(not utcnow.override_time is None)
for dt in utcnow.override_time:
dt += timedelta
except TypeError:
utcnow.override_time += timedelta
@ -135,3 +145,16 @@ def unmarshall_time(tyme):
def delta_seconds(before, after):
Compute the difference in seconds between two date, time, or
datetime objects (as a float, to microsecond resolution).
delta = after - before
return delta.total_seconds()
except AttributeError:
return ((delta.days * 24 * 3600) + delta.seconds +
float(delta.microseconds) / (10 ** 6))

View File

@ -20,15 +20,6 @@ System-level utilities and helper functions.
import logging
import random
import shlex
from import subprocess
from eventlet import greenthread
from moniker.openstack.common import exception
from moniker.openstack.common.gettextutils import _
LOG = logging.getLogger(__name__)
@ -38,7 +29,9 @@ def int_from_bool_as_string(subject):
Interpret a string as a boolean and return either 1 or 0.
Any string value in:
('True', 'true', 'On', 'on', '1')
is interpreted as a boolean True.
Useful for JSON-decoded stuff and config file parsing
@ -51,7 +44,9 @@ def bool_from_string(subject):
Interpret a string as a boolean.
Any string value in:
('True', 'true', 'On', 'on', 'Yes', 'yes', '1')
is interpreted as a boolean True.
Useful for JSON-decoded stuff and config file parsing
@ -62,79 +57,3 @@ def bool_from_string(subject):
if subject.strip().lower() in ('true', 'on', 'yes', '1'):
return True
return False
def execute(*cmd, **kwargs):
Helper method to execute command with optional retry.
:cmd Passed to subprocess.Popen.
:process_input Send to opened process.
:check_exit_code Defaults to 0. Raise exception.ProcessExecutionError
unless program exits with this code.
:delay_on_retry True | False. Defaults to True. If set to True, wait a
short amount of time before retrying.
:attempts How many times to retry cmd.
:run_as_root True | False. Defaults to False. If set to True,
the command is prefixed by the command specified
in the root_helper kwarg.
:root_helper command to prefix all cmd's with
:raises exception.Error on receiving unknown arguments
:raises exception.ProcessExecutionError
process_input = kwargs.pop('process_input', None)
check_exit_code = kwargs.pop('check_exit_code', 0)
delay_on_retry = kwargs.pop('delay_on_retry', True)
attempts = kwargs.pop('attempts', 1)
run_as_root = kwargs.pop('run_as_root', False)
root_helper = kwargs.pop('root_helper', '')
if len(kwargs):
raise exception.Error(_('Got unknown keyword args '
'to utils.execute: %r') % kwargs)
if run_as_root:
cmd = shlex.split(root_helper) + list(cmd)
cmd = map(str, cmd)
while attempts > 0:
attempts -= 1
LOG.debug(_('Running cmd (subprocess): %s'), ' '.join(cmd))
_PIPE = subprocess.PIPE # pylint: disable=E1101
obj = subprocess.Popen(cmd,
result = None
if process_input is not None:
result = obj.communicate(process_input)
result = obj.communicate()
obj.stdin.close() # pylint: disable=E1101
_returncode = obj.returncode # pylint: disable=E1101
if _returncode:
LOG.debug(_('Result was %s') % _returncode)
if (isinstance(check_exit_code, int) and
not isinstance(check_exit_code, bool) and
_returncode != check_exit_code):
(stdout, stderr) = result
raise exception.ProcessExecutionError(
cmd=' '.join(cmd))
return result
except exception.ProcessExecutionError:
if not attempts:
LOG.debug(_('%r failed. Retrying.'), cmd)
if delay_on_retry:
greenthread.sleep(random.randint(20, 200) / 100.0)
# NOTE(termie): this appears to be necessary to let the subprocess
# call clean something up in between calls, without
# it two execute calls in a row hangs the second one

View File

@ -0,0 +1,39 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2012 Intel Corporation.
# All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
UUID related utilities and helper functions.
import uuid
def generate_uuid():
return str(uuid.uuid4())
def is_uuid_like(val):
"""Returns validation of a value as a UUID.
For our purposes, a UUID is a canonical form string:
return str(uuid.UUID(val)) == val
except (TypeError, ValueError, AttributeError):
return False

View File

@ -56,8 +56,7 @@ class Service(service.Service):
def __init__(self, threads=1000):
super(Service, self).__init__()
self.pool = eventlet.GreenPool(threads)
super(Service, self).__init__(threads)
def start(self, application, port, host='', backlog=128):
"""Start serving this service using the provided server instance.
@ -67,7 +66,16 @@ class Service(service.Service):
super(Service, self).start()
socket = eventlet.listen((host, port), backlog=backlog)
self.pool.spawn_n(self._run, application, socket)
(self._host, self._port) = socket.getsockname(), application, socket)
def host(self):
return self._host
def port(self):
return self._port
def stop(self):
"""Stop serving this API.
@ -77,18 +85,10 @@ class Service(service.Service):
super(Service, self).stop()
def wait(self):
"""Wait until all servers have completed running."""
super(Service, self).wait()
except KeyboardInterrupt:
def _run(self, application, socket):
"""Start a WSGI server in a new green thread."""
logger = logging.getLogger('eventlet.wsgi.server')
eventlet.wsgi.server(socket, application, custom_pool=self.pool,
eventlet.wsgi.server(socket, application,,
@ -568,9 +568,9 @@ class RequestDeserializer(object):
"""Extract necessary pieces of the request.
:param request: Request object
:returns tuple of expected controller action name, dictionary of
:returns: tuple of (expected controller action name, dictionary of
keyword arguments to pass to the controller, the expected
content type of the response
content type of the response)
action_args = self.get_action_args(request.environ)

View File

@ -1,3 +1,3 @@

View File

@ -11,3 +11,4 @@ cliff