openstack/common : rebase to latest oslo

Rebased to latest oslo, now at 8837013, note a few interfaces
changed requiring a some tweaks to the heat code

Change-Id: I7dfc634b7c459edebca19dee522b396e3736aecc
Signed-off-by: Steven Hardy <shardy@redhat.com>
This commit is contained in:
Steven Hardy 2013-01-18 11:26:01 +00:00
parent 54f1b7b846
commit 55c90321b4
22 changed files with 562 additions and 439 deletions

View File

@ -637,8 +637,8 @@ Commands:
"""
deferred_string = version.deferred_version_string(prefix='%prog ')
oparser = optparse.OptionParser(version=str(deferred_string),
version_string = version.version_string()
oparser = optparse.OptionParser(version=version_string,
usage=usage.strip())
create_options(oparser)
(opts, cmd, args) = parse_options(oparser, sys.argv[1:])

View File

@ -255,8 +255,8 @@ Commands:
metric-put-data Publish data-point for specified metric
"""
deferred_string = version.deferred_version_string(prefix='%prog ')
oparser = optparse.OptionParser(version=str(deferred_string),
version_string = version.version_string()
oparser = optparse.OptionParser(version=version_string,
usage=usage.strip())
create_options(oparser)
(opts, cmd, args) = parse_options(oparser, sys.argv[1:])

View File

@ -28,6 +28,7 @@ from eventlet.green import socket
from heat.common import wsgi
from heat.openstack.common import cfg
from heat.openstack.common import rpc
DEFAULT_PORT = 8000
@ -102,9 +103,6 @@ rpc_opts = [
help='Name of the engine node. '
'This can be an opaque identifier.'
'It is not necessarily a hostname, FQDN, or IP address.'),
cfg.StrOpt('control_exchange',
default='heat',
help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
cfg.StrOpt('engine_topic',
default='engine',
help='the topic engine nodes listen on')]
@ -113,6 +111,7 @@ rpc_opts = [
def register_api_opts():
cfg.CONF.register_opts(bind_opts)
cfg.CONF.register_opts(rpc_opts)
rpc.set_defaults(control_exchange='heat')
def register_engine_opts():
@ -120,6 +119,7 @@ def register_engine_opts():
cfg.CONF.register_opts(db_opts)
cfg.CONF.register_opts(service_opts)
cfg.CONF.register_opts(rpc_opts)
rpc.set_defaults(control_exchange='heat')
def setup_logging():

View File

@ -63,20 +63,18 @@ class EngineService(service.Service):
# stg == "Stack Thread Groups"
self.stg = {}
def _start_in_thread(self, stack_id, stack_name, func, *args, **kwargs):
def _start_in_thread(self, stack_id, func, *args, **kwargs):
if stack_id not in self.stg:
thr_name = '%s-%s' % (stack_name, stack_id)
self.stg[stack_id] = threadgroup.ThreadGroup(thr_name)
self.stg[stack_id] = threadgroup.ThreadGroup()
self.stg[stack_id].add_thread(func, *args, **kwargs)
def _timer_in_thread(self, stack_id, stack_name, func, *args, **kwargs):
def _timer_in_thread(self, stack_id, func, *args, **kwargs):
"""
Define a periodic task, to be run in a separate thread, in the stack
threadgroups. Periodicity is cfg.CONF.periodic_interval
"""
if stack_id not in self.stg:
thr_name = '%s-%s' % (stack_name, stack_id)
self.stg[stack_id] = threadgroup.ThreadGroup(thr_name)
self.stg[stack_id] = threadgroup.ThreadGroup()
self.stg[stack_id].add_timer(cfg.CONF.periodic_interval,
func, *args, **kwargs)
@ -102,9 +100,7 @@ class EngineService(service.Service):
admin_context = context.get_admin_context()
stacks = db_api.stack_get_all(admin_context)
for s in stacks:
self._timer_in_thread(s.id, s.name,
self._periodic_watcher_task,
sid=s.id)
self._timer_in_thread(s.id, self._periodic_watcher_task, sid=s.id)
@request_context
def identify_stack(self, context, stack_name):
@ -214,11 +210,10 @@ class EngineService(service.Service):
stack_id = stack.store()
self._start_in_thread(stack_id, stack_name, stack.create)
self._start_in_thread(stack_id, stack.create)
# Schedule a periodic watcher task for this stack
self._timer_in_thread(stack_id, stack_name,
self._periodic_watcher_task,
self._timer_in_thread(stack_id, self._periodic_watcher_task,
sid=stack_id)
return dict(stack.identifier())
@ -257,9 +252,7 @@ class EngineService(service.Service):
if response:
return {'Description': response}
self._start_in_thread(db_stack.id, db_stack.name,
current_stack.update,
updated_stack)
self._start_in_thread(db_stack.id, current_stack.update, updated_stack)
return dict(current_stack.identifier())

View File

@ -236,10 +236,11 @@ in order to support a common usage pattern in OpenStack::
Positional command line arguments are supported via a 'positional' Opt
constructor argument::
>>> CONF.register_cli_opt(MultiStrOpt('bar', positional=True))
>>> conf = ConfigOpts()
>>> conf.register_cli_opt(MultiStrOpt('bar', positional=True))
True
>>> CONF(['a', 'b'])
>>> CONF.bar
>>> conf(['a', 'b'])
>>> conf.bar
['a', 'b']
It is also possible to use argparse "sub-parsers" to parse additional
@ -249,10 +250,11 @@ command line arguments using the SubCommandOpt class:
... list_action = subparsers.add_parser('list')
... list_action.add_argument('id')
...
>>> CONF.register_cli_opt(SubCommandOpt('action', handler=add_parsers))
>>> conf = ConfigOpts()
>>> conf.register_cli_opt(SubCommandOpt('action', handler=add_parsers))
True
>>> CONF(['list', '10'])
>>> CONF.action.name, CONF.action.id
>>> conf(args=['list', '10'])
>>> conf.action.name, conf.action.id
('list', '10')
"""
@ -480,6 +482,13 @@ def _is_opt_registered(opts, opt):
return False
def set_defaults(opts, **kwargs):
for opt in opts:
if opt.dest in kwargs:
opt.default = kwargs[opt.dest]
break
class Opt(object):
"""Base class for all configuration options.
@ -771,7 +780,7 @@ class ListOpt(Opt):
def _get_from_config_parser(self, cparser, section):
"""Retrieve the opt value as a list from ConfigParser."""
return [v.split(',') for v in
return [[a.strip() for a in v.split(',')] for v in
self._cparser_get_with_deprecated(cparser, section)]
def _get_argparse_kwargs(self, group, **kwargs):
@ -1728,11 +1737,13 @@ class CommonConfigOpts(ConfigOpts):
BoolOpt('debug',
short='d',
default=False,
help='Print debugging output'),
help='Print debugging output (set logging level to '
'DEBUG instead of default WARNING level).'),
BoolOpt('verbose',
short='v',
default=False,
help='Print more verbose output'),
help='Print more verbose output (set logging level to '
'INFO instead of default WARNING level).'),
]
logging_cli_opts = [
@ -1756,11 +1767,13 @@ class CommonConfigOpts(ConfigOpts):
'Default: %(default)s'),
StrOpt('log-file',
metavar='PATH',
deprecated_name='logfile',
help='(Optional) Name of log file to output to. '
'If not set, logging will go to stdout.'),
StrOpt('log-dir',
deprecated_name='logdir',
help='(Optional) The directory to keep log files in '
'(will be prepended to --logfile)'),
'(will be prepended to --log-file)'),
BoolOpt('use-syslog',
default=False,
help='Use syslog for logging.'),

View File

@ -49,19 +49,20 @@ from heat.openstack.common import notifier
log_opts = [
cfg.StrOpt('logging_context_format_string',
default='%(asctime)s %(levelname)s %(name)s [%(request_id)s '
'%(user)s %(tenant)s] %(instance)s'
default='%(asctime)s.%(msecs)03d %(levelname)s %(name)s '
'[%(request_id)s %(user)s %(tenant)s] %(instance)s'
'%(message)s',
help='format string to use for log messages with context'),
cfg.StrOpt('logging_default_format_string',
default='%(asctime)s %(process)d %(levelname)s %(name)s [-]'
' %(instance)s%(message)s',
default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
'%(name)s [-] %(instance)s%(message)s',
help='format string to use for log messages without context'),
cfg.StrOpt('logging_debug_format_suffix',
default='%(funcName)s %(pathname)s:%(lineno)d',
help='data to append to log format when level is DEBUG'),
cfg.StrOpt('logging_exception_prefix',
default='%(asctime)s %(process)d TRACE %(name)s %(instance)s',
default='%(asctime)s.%(msecs)03d %(process)d TRACE %(name)s '
'%(instance)s',
help='prefix each line of exception output with this format'),
cfg.ListOpt('default_log_levels',
default=[
@ -289,6 +290,12 @@ def setup(product_name):
_setup_logging_from_conf(product_name)
def set_defaults(logging_context_format_string):
cfg.set_defaults(log_opts,
logging_context_format_string=
logging_context_format_string)
def _find_facility_from_conf():
facility_names = logging.handlers.SysLogHandler.facility_names
facility = getattr(logging.handlers.SysLogHandler,
@ -354,10 +361,12 @@ def _setup_logging_from_conf(product_name):
datefmt=datefmt))
handler.setFormatter(LegacyFormatter(datefmt=datefmt))
if CONF.verbose or CONF.debug:
if CONF.debug:
log_root.setLevel(logging.DEBUG)
else:
elif CONF.verbose:
log_root.setLevel(logging.INFO)
else:
log_root.setLevel(logging.WARNING)
level = logging.NOTSET
for pair in CONF.default_log_levels:

View File

@ -50,25 +50,26 @@ rpc_opts = [
default=['heat.openstack.common.exception',
'nova.exception',
'cinder.exception',
'exceptions',
],
help='Modules of exceptions that are permitted to be recreated'
'upon receiving exception data from an rpc call.'),
cfg.BoolOpt('fake_rabbit',
default=False,
help='If passed, use a fake RabbitMQ provider'),
#
# The following options are not registered here, but are expected to be
# present. The project using this library must register these options with
# the configuration so that project-specific defaults may be defined.
#
#cfg.StrOpt('control_exchange',
# default='nova',
# help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
cfg.StrOpt('control_exchange',
default='openstack',
help='AMQP exchange to connect to if using RabbitMQ or Qpid'),
]
cfg.CONF.register_opts(rpc_opts)
def set_defaults(control_exchange):
cfg.set_defaults(rpc_opts,
control_exchange=control_exchange)
def create_connection(new=True):
"""Create a connection to the message bus used for rpc.
@ -177,17 +178,18 @@ def multicall(context, topic, msg, timeout=None):
return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout)
def notify(context, topic, msg):
def notify(context, topic, msg, envelope=False):
"""Send notification event.
:param context: Information that identifies the user that has made this
request.
:param topic: The topic to send the notification to.
:param msg: This is a dict of content of event.
:param envelope: Set to True to enable message envelope for notifications.
:returns: None
"""
return _get_impl().notify(cfg.CONF, context, topic, msg)
return _get_impl().notify(cfg.CONF, context, topic, msg, envelope)
def cleanup():

View File

@ -33,7 +33,6 @@ from eventlet import greenpool
from eventlet import pools
from eventlet import semaphore
from heat.openstack.common import cfg
from heat.openstack.common import excutils
from heat.openstack.common.gettextutils import _
from heat.openstack.common import local
@ -150,7 +149,7 @@ class ConnectionContext(rpc_common.Connection):
def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
ending=False):
ending=False, log_failure=True):
"""Sends a reply or an error on the channel signified by msg_id.
Failure should be a sys.exc_info() tuple.
@ -158,7 +157,8 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
"""
with ConnectionContext(conf, connection_pool) as conn:
if failure:
failure = rpc_common.serialize_remote_exception(failure)
failure = rpc_common.serialize_remote_exception(failure,
log_failure)
try:
msg = {'result': reply, 'failure': failure}
@ -168,7 +168,7 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
'failure': failure}
if ending:
msg['ending'] = True
conn.direct_send(msg_id, msg)
conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
class RpcContext(rpc_common.CommonRpcContext):
@ -185,10 +185,10 @@ class RpcContext(rpc_common.CommonRpcContext):
return self.__class__(**values)
def reply(self, reply=None, failure=None, ending=False,
connection_pool=None):
connection_pool=None, log_failure=True):
if self.msg_id:
msg_reply(self.conf, self.msg_id, connection_pool, reply, failure,
ending)
ending, log_failure)
if ending:
self.msg_id = None
@ -282,11 +282,21 @@ class ProxyCallback(object):
ctxt.reply(rval, None, connection_pool=self.connection_pool)
# This final None tells multicall that it is done.
ctxt.reply(ending=True, connection_pool=self.connection_pool)
except rpc_common.ClientException as e:
LOG.debug(_('Expected exception during message handling (%s)') %
e._exc_info[1])
ctxt.reply(None, e._exc_info,
connection_pool=self.connection_pool,
log_failure=False)
except Exception:
LOG.exception(_('Exception during message handling'))
ctxt.reply(None, sys.exc_info(),
connection_pool=self.connection_pool)
def wait(self):
"""Wait for all callback threads to exit."""
self.pool.waitall()
class MulticallWaiter(object):
def __init__(self, conf, connection, timeout):
@ -349,7 +359,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
# that will continue to use the connection. When it's done,
# connection.close() will get called which will put it back into
# the pool
LOG.debug(_('Making asynchronous call on %s ...'), topic)
LOG.debug(_('Making synchronous call on %s ...'), topic)
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
LOG.debug(_('MSG_ID is %s') % (msg_id))
@ -358,7 +368,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
conn = ConnectionContext(conf, connection_pool)
wait_msg = MulticallWaiter(conf, conn, timeout)
conn.declare_direct_consumer(msg_id, wait_msg)
conn.topic_send(topic, msg)
conn.topic_send(topic, rpc_common.serialize_msg(msg))
return wait_msg
@ -377,7 +387,7 @@ def cast(conf, context, topic, msg, connection_pool):
LOG.debug(_('Making asynchronous cast on %s...'), topic)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
conn.topic_send(topic, msg)
conn.topic_send(topic, rpc_common.serialize_msg(msg))
def fanout_cast(conf, context, topic, msg, connection_pool):
@ -385,7 +395,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool):
LOG.debug(_('Making asynchronous fanout cast...'))
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
conn.fanout_send(topic, msg)
conn.fanout_send(topic, rpc_common.serialize_msg(msg))
def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
@ -393,7 +403,7 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
conn.topic_send(topic, msg)
conn.topic_send(topic, rpc_common.serialize_msg(msg))
def fanout_cast_to_server(conf, context, server_params, topic, msg,
@ -402,16 +412,18 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg,
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
conn.fanout_send(topic, msg)
conn.fanout_send(topic, rpc_common.serialize_msg(msg))
def notify(conf, context, topic, msg, connection_pool):
def notify(conf, context, topic, msg, connection_pool, envelope):
"""Sends a notification event on a topic."""
LOG.debug(_('Sending %(event_type)s on %(topic)s'),
dict(event_type=msg.get('event_type'),
topic=topic))
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
if envelope:
msg = rpc_common.serialize_msg(msg, force_envelope=True)
conn.notify_send(topic, msg)
@ -421,7 +433,4 @@ def cleanup(connection_pool):
def get_control_exchange(conf):
try:
return conf.control_exchange
except cfg.NoSuchOptError:
return 'openstack'
return conf.control_exchange

View File

@ -18,8 +18,10 @@
# under the License.
import copy
import sys
import traceback
from heat.openstack.common import cfg
from heat.openstack.common.gettextutils import _
from heat.openstack.common import importutils
from heat.openstack.common import jsonutils
@ -27,9 +29,50 @@ from heat.openstack.common import local
from heat.openstack.common import log as logging
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
'''RPC Envelope Version.
This version number applies to the top level structure of messages sent out.
It does *not* apply to the message payload, which must be versioned
independently. For example, when using rpc APIs, a version number is applied
for changes to the API being exposed over rpc. This version number is handled
in the rpc proxy and dispatcher modules.
This version number applies to the message envelope that is used in the
serialization done inside the rpc layer. See serialize_msg() and
deserialize_msg().
The current message format (version 2.0) is very simple. It is:
{
'heat.version': <RPC Envelope Version as a String>,
'heat.message': <Application Message Payload, JSON encoded>
}
Message format version '1.0' is just considered to be the messages we sent
without a message envelope.
So, the current message envelope just includes the envelope version. It may
eventually contain additional information, such as a signature for the message
payload.
We will JSON encode the application message payload. The message envelope,
which includes the JSON encoded application message body, will be passed down
to the messaging libraries as a dict.
'''
_RPC_ENVELOPE_VERSION = '2.0'
_VERSION_KEY = 'heat.version'
_MESSAGE_KEY = 'heat.message'
# TODO(russellb) Turn this on after Grizzly.
_SEND_RPC_ENVELOPE = False
class RPCException(Exception):
message = _("An unknown RPC related exception occurred.")
@ -90,6 +133,11 @@ class UnsupportedRpcVersion(RPCException):
"this endpoint.")
class UnsupportedRpcEnvelopeVersion(RPCException):
message = _("Specified RPC envelope version, %(version)s, "
"not supported by this endpoint.")
class Connection(object):
"""A connection, returned by rpc.create_connection().
@ -164,8 +212,12 @@ class Connection(object):
def _safe_log(log_func, msg, msg_data):
"""Sanitizes the msg_data field before logging."""
SANITIZE = {'set_admin_password': ('new_pass',),
'run_instance': ('admin_password',), }
SANITIZE = {'set_admin_password': [('args', 'new_pass')],
'run_instance': [('args', 'admin_password')],
'route_message': [('args', 'message', 'args', 'method_info',
'method_kwargs', 'password'),
('args', 'message', 'args', 'method_info',
'method_kwargs', 'admin_password')]}
has_method = 'method' in msg_data and msg_data['method'] in SANITIZE
has_context_token = '_context_auth_token' in msg_data
@ -177,14 +229,16 @@ def _safe_log(log_func, msg, msg_data):
msg_data = copy.deepcopy(msg_data)
if has_method:
method = msg_data['method']
if method in SANITIZE:
args_to_sanitize = SANITIZE[method]
for arg in args_to_sanitize:
try:
msg_data['args'][arg] = "<SANITIZED>"
except KeyError:
pass
for arg in SANITIZE.get(msg_data['method'], []):
try:
d = msg_data
for elem in arg[:-1]:
d = d[elem]
d[arg[-1]] = '<SANITIZED>'
except KeyError, e:
LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'),
{'item': arg,
'err': e})
if has_context_token:
msg_data['_context_auth_token'] = '<SANITIZED>'
@ -195,7 +249,7 @@ def _safe_log(log_func, msg, msg_data):
return log_func(msg, msg_data)
def serialize_remote_exception(failure_info):
def serialize_remote_exception(failure_info, log_failure=True):
"""Prepares exception data to be sent over rpc.
Failure_info should be a sys.exc_info() tuple.
@ -203,8 +257,9 @@ def serialize_remote_exception(failure_info):
"""
tb = traceback.format_exception(*failure_info)
failure = failure_info[1]
LOG.error(_("Returning exception %s to caller"), unicode(failure))
LOG.error(tb)
if log_failure:
LOG.error(_("Returning exception %s to caller"), unicode(failure))
LOG.error(tb)
kwargs = {}
if hasattr(failure, 'kwargs'):
@ -309,3 +364,107 @@ class CommonRpcContext(object):
context.values['read_deleted'] = read_deleted
return context
class ClientException(Exception):
"""This encapsulates some actual exception that is expected to be
hit by an RPC proxy object. Merely instantiating it records the
current exception information, which will be passed back to the
RPC client without exceptional logging."""
def __init__(self):
self._exc_info = sys.exc_info()
def catch_client_exception(exceptions, func, *args, **kwargs):
try:
return func(*args, **kwargs)
except Exception, e:
if type(e) in exceptions:
raise ClientException()
else:
raise
def client_exceptions(*exceptions):
"""Decorator for manager methods that raise expected exceptions.
Marking a Manager method with this decorator allows the declaration
of expected exceptions that the RPC layer should not consider fatal,
and not log as if they were generated in a real error scenario. Note
that this will cause listed exceptions to be wrapped in a
ClientException, which is used internally by the RPC layer."""
def outer(func):
def inner(*args, **kwargs):
return catch_client_exception(exceptions, func, *args, **kwargs)
return inner
return outer
def version_is_compatible(imp_version, version):
"""Determine whether versions are compatible.
:param imp_version: The version implemented
:param version: The version requested by an incoming message.
"""
version_parts = version.split('.')
imp_version_parts = imp_version.split('.')
if int(version_parts[0]) != int(imp_version_parts[0]): # Major
return False
if int(version_parts[1]) > int(imp_version_parts[1]): # Minor
return False
return True
def serialize_msg(raw_msg, force_envelope=False):
if not _SEND_RPC_ENVELOPE and not force_envelope:
return raw_msg
# NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
# information about this format.
msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
_MESSAGE_KEY: jsonutils.dumps(raw_msg)}
return msg
def deserialize_msg(msg):
# NOTE(russellb): Hang on to your hats, this road is about to
# get a little bumpy.
#
# Robustness Principle:
# "Be strict in what you send, liberal in what you accept."
#
# At this point we have to do a bit of guessing about what it
# is we just received. Here is the set of possibilities:
#
# 1) We received a dict. This could be 2 things:
#
# a) Inspect it to see if it looks like a standard message envelope.
# If so, great!
#
# b) If it doesn't look like a standard message envelope, it could either
# be a notification, or a message from before we added a message
# envelope (referred to as version 1.0).
# Just return the message as-is.
#
# 2) It's any other non-dict type. Just return it and hope for the best.
# This case covers return values from rpc.call() from before message
# envelopes were used. (messages to call a method were always a dict)
if not isinstance(msg, dict):
# See #2 above.
return msg
base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
if not all(map(lambda key: key in msg, base_envelope_keys)):
# See #1.b above.
return msg
# At this point we think we have the message envelope
# format we were expecting. (#1.a above)
if not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]):
raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY])
raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
return raw_msg

View File

@ -103,21 +103,6 @@ class RpcDispatcher(object):
self.callbacks = callbacks
super(RpcDispatcher, self).__init__()
@staticmethod
def _is_compatible(mversion, version):
"""Determine whether versions are compatible.
:param mversion: The API version implemented by a callback.
:param version: The API version requested by an incoming message.
"""
version_parts = version.split('.')
mversion_parts = mversion.split('.')
if int(version_parts[0]) != int(mversion_parts[0]): # Major
return False
if int(version_parts[1]) > int(mversion_parts[1]): # Minor
return False
return True
def dispatch(self, ctxt, version, method, **kwargs):
"""Dispatch a message based on a requested version.
@ -139,7 +124,8 @@ class RpcDispatcher(object):
rpc_api_version = proxyobj.RPC_API_VERSION
else:
rpc_api_version = '1.0'
is_compatible = self._is_compatible(rpc_api_version, version)
is_compatible = rpc_common.version_is_compatible(rpc_api_version,
version)
had_compatible = had_compatible or is_compatible
if not hasattr(proxyobj, method):
continue

View File

@ -79,6 +79,8 @@ class Consumer(object):
else:
res.append(rval)
done.send(res)
except rpc_common.ClientException as e:
done.send_exception(e._exc_info[1])
except Exception as e:
done.send_exception(e)
@ -165,7 +167,7 @@ def cast(conf, context, topic, msg):
pass
def notify(conf, context, topic, msg):
def notify(conf, context, topic, msg, envelope):
check_serialize(msg)

View File

@ -162,7 +162,8 @@ class ConsumerBase(object):
def _callback(raw_message):
message = self.channel.message_to_python(raw_message)
try:
callback(message.payload)
msg = rpc_common.deserialize_msg(message.payload)
callback(msg)
message.ack()
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
@ -196,7 +197,7 @@ class DirectConsumer(ConsumerBase):
# Default options
options = {'durable': False,
'auto_delete': True,
'exclusive': True}
'exclusive': False}
options.update(kwargs)
exchange = kombu.entity.Exchange(name=msg_id,
type='direct',
@ -269,7 +270,7 @@ class FanoutConsumer(ConsumerBase):
options = {'durable': False,
'queue_arguments': _get_queue_arguments(conf),
'auto_delete': True,
'exclusive': True}
'exclusive': False}
options.update(kwargs)
exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
durable=options['durable'],
@ -316,7 +317,7 @@ class DirectPublisher(Publisher):
options = {'durable': False,
'auto_delete': True,
'exclusive': True}
'exclusive': False}
options.update(kwargs)
super(DirectPublisher, self).__init__(channel, msg_id, msg_id,
type='direct', **options)
@ -350,7 +351,7 @@ class FanoutPublisher(Publisher):
"""
options = {'durable': False,
'auto_delete': True,
'exclusive': True}
'exclusive': False}
options.update(kwargs)
super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
None, type='fanout', **options)
@ -387,6 +388,7 @@ class Connection(object):
def __init__(self, conf, server_params=None):
self.consumers = []
self.consumer_thread = None
self.proxy_callbacks = []
self.conf = conf
self.max_retries = self.conf.rabbit_max_retries
# Try forever?
@ -469,7 +471,7 @@ class Connection(object):
LOG.info(_("Reconnecting to AMQP server on "
"%(hostname)s:%(port)d") % params)
try:
self.connection.close()
self.connection.release()
except self.connection_errors:
pass
# Setting this in case the next statement fails, though
@ -573,12 +575,14 @@ class Connection(object):
def close(self):
"""Close/release this connection"""
self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.connection.release()
self.connection = None
def reset(self):
"""Reset a connection so it can be used again"""
self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.channel.close()
self.channel = self.connection.channel()
# work around 'memory' transport bug in 1.1.3
@ -644,6 +648,11 @@ class Connection(object):
pass
self.consumer_thread = None
def wait_on_proxy_callbacks(self):
"""Wait for all proxy callback threads to exit."""
for proxy_cb in self.proxy_callbacks:
proxy_cb.wait()
def publisher_send(self, cls, topic, msg, **kwargs):
"""Send to a publisher based on the publisher class"""
@ -719,6 +728,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
self.proxy_callbacks.append(proxy_cb)
if fanout:
self.declare_fanout_consumer(topic, proxy_cb)
@ -730,6 +740,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
self.proxy_callbacks.append(proxy_cb)
self.declare_topic_consumer(topic, proxy_cb, pool_name)
@ -782,11 +793,12 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg):
rpc_amqp.get_connection_pool(conf, Connection))
def notify(conf, context, topic, msg):
def notify(conf, context, topic, msg, envelope):
"""Sends a notification event on a topic."""
return rpc_amqp.notify(
conf, context, topic, msg,
rpc_amqp.get_connection_pool(conf, Connection))
rpc_amqp.get_connection_pool(conf, Connection),
envelope)
def cleanup():

View File

@ -22,16 +22,18 @@ import uuid
import eventlet
import greenlet
import qpid.messaging
import qpid.messaging.exceptions
from heat.openstack.common import cfg
from heat.openstack.common.gettextutils import _
from heat.openstack.common import importutils
from heat.openstack.common import jsonutils
from heat.openstack.common import log as logging
from heat.openstack.common.rpc import amqp as rpc_amqp
from heat.openstack.common.rpc import common as rpc_common
qpid_messaging = importutils.try_import("qpid.messaging")
qpid_exceptions = importutils.try_import("qpid.messaging.exceptions")
LOG = logging.getLogger(__name__)
qpid_opts = [
@ -124,7 +126,8 @@ class ConsumerBase(object):
"""Fetch the message and pass it to the callback object"""
message = self.receiver.fetch()
try:
self.callback(message.content)
msg = rpc_common.deserialize_msg(message.content)
self.callback(msg)
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
finally:
@ -274,11 +277,22 @@ class Connection(object):
pool = None
def __init__(self, conf, server_params=None):
if not qpid_messaging:
raise ImportError("Failed to import qpid.messaging")
self.session = None
self.consumers = {}
self.consumer_thread = None
self.proxy_callbacks = []
self.conf = conf
if server_params and 'hostname' in server_params:
# NOTE(russellb) This enables support for cast_to_server.
server_params['qpid_hosts'] = [
'%s:%d' % (server_params['hostname'],
server_params.get('port', 5672))
]
params = {
'qpid_hosts': self.conf.qpid_hosts,
'username': self.conf.qpid_username,
@ -294,7 +308,7 @@ class Connection(object):
def connection_create(self, broker):
# Create the connection - this does not open the connection
self.connection = qpid.messaging.Connection(broker)
self.connection = qpid_messaging.Connection(broker)
# Check if flags are set and if so set them for the connection
# before we call open
@ -319,7 +333,7 @@ class Connection(object):
if self.connection.opened():
try:
self.connection.close()
except qpid.messaging.exceptions.ConnectionError:
except qpid_exceptions.ConnectionError:
pass
attempt = 0
@ -331,7 +345,7 @@ class Connection(object):
try:
self.connection_create(broker)
self.connection.open()
except qpid.messaging.exceptions.ConnectionError, e:
except qpid_exceptions.ConnectionError, e:
msg_dict = dict(e=e, delay=delay)
msg = _("Unable to connect to AMQP server: %(e)s. "
"Sleeping %(delay)s seconds") % msg_dict
@ -358,8 +372,8 @@ class Connection(object):
while True:
try:
return method(*args, **kwargs)
except (qpid.messaging.exceptions.Empty,
qpid.messaging.exceptions.ConnectionError), e:
except (qpid_exceptions.Empty,
qpid_exceptions.ConnectionError), e:
if error_callback:
error_callback(e)
self.reconnect()
@ -367,12 +381,14 @@ class Connection(object):
def close(self):
"""Close/release this connection"""
self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.connection.close()
self.connection = None
def reset(self):
"""Reset a connection so it can be used again"""
self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.session.close()
self.session = self.connection.session()
self.consumers = {}
@ -397,7 +413,7 @@ class Connection(object):
"""Return an iterator that will consume from all queues/consumers"""
def _error_callback(exc):
if isinstance(exc, qpid.messaging.exceptions.Empty):
if isinstance(exc, qpid_exceptions.Empty):
LOG.exception(_('Timed out waiting for RPC response: %s') %
str(exc))
raise rpc_common.Timeout()
@ -427,6 +443,11 @@ class Connection(object):
pass
self.consumer_thread = None
def wait_on_proxy_callbacks(self):
"""Wait for all proxy callback threads to exit."""
for proxy_cb in self.proxy_callbacks:
proxy_cb.wait()
def publisher_send(self, cls, topic, msg):
"""Send to a publisher based on the publisher class"""
@ -502,6 +523,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
self.proxy_callbacks.append(proxy_cb)
if fanout:
consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
@ -517,6 +539,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
self.proxy_callbacks.append(proxy_cb)
consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
name=pool_name)
@ -575,10 +598,11 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg):
rpc_amqp.get_connection_pool(conf, Connection))
def notify(conf, context, topic, msg):
def notify(conf, context, topic, msg, envelope):
"""Sends a notification event on a topic."""
return rpc_amqp.notify(conf, context, topic, msg,
rpc_amqp.get_connection_pool(conf, Connection))
rpc_amqp.get_connection_pool(conf, Connection),
envelope)
def cleanup():

View File

@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import os
import pprint
import socket
import string
@ -22,15 +23,16 @@ import types
import uuid
import eventlet
from eventlet.green import zmq
import greenlet
from heat.openstack.common import cfg
from heat.openstack.common.gettextutils import _
from heat.openstack.common import importutils
from heat.openstack.common import jsonutils
from heat.openstack.common import processutils as utils
from heat.openstack.common.rpc import common as rpc_common
zmq = importutils.try_import('eventlet.green.zmq')
# for convenience, are not modified.
pformat = pprint.pformat
@ -61,6 +63,10 @@ zmq_opts = [
cfg.IntOpt('rpc_zmq_contexts', default=1,
help='Number of ZeroMQ contexts, defaults to 1'),
cfg.IntOpt('rpc_zmq_topic_backlog', default=None,
help='Maximum number of ingress messages to locally buffer '
'per topic. Default is unlimited.'),
cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
help='Directory for holding IPC sockets'),
@ -70,9 +76,9 @@ zmq_opts = [
]
# These globals are defined in register_opts(conf),
# a mandatory initialization call
CONF = None
CONF = cfg.CONF
CONF.register_opts(zmq_opts)
ZMQ_CTX = None # ZeroMQ Context, must be global.
matchmaker = None # memoized matchmaker object
@ -107,7 +113,7 @@ class ZmqSocket(object):
"""
def __init__(self, addr, zmq_type, bind=True, subscribe=None):
self.sock = ZMQ_CTX.socket(zmq_type)
self.sock = _get_ctxt().socket(zmq_type)
self.addr = addr
self.type = zmq_type
self.subscriptions = []
@ -181,11 +187,15 @@ class ZmqSocket(object):
pass
self.subscriptions = []
# Linger -1 prevents lost/dropped messages
try:
self.sock.close(linger=-1)
# Default is to linger
self.sock.close()
except Exception:
pass
# While this is a bad thing to happen,
# it would be much worse if some of the code calling this
# were to fail. For now, lets log, and later evaluate
# if we can safely raise here.
LOG.error("ZeroMQ socket could not be closed.")
self.sock = None
def recv(self):
@ -202,10 +212,14 @@ class ZmqSocket(object):
class ZmqClient(object):
"""Client for ZMQ sockets."""
def __init__(self, addr, socket_type=zmq.PUSH, bind=False):
def __init__(self, addr, socket_type=None, bind=False):
if socket_type is None:
socket_type = zmq.PUSH
self.outq = ZmqSocket(addr, socket_type, bind=bind)
def cast(self, msg_id, topic, data):
def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
if serialize:
data = rpc_common.serialize_msg(data, force_envelope)
self.outq.send([str(msg_id), str(topic), str('cast'),
_serialize(data)])
@ -250,7 +264,7 @@ class InternalContext(object):
"""Process a curried message and cast the result to topic."""
LOG.debug(_("Running func with context: %s"), ctx.to_dict())
data.setdefault('version', None)
data.setdefault('args', [])
data.setdefault('args', {})
try:
result = proxy.dispatch(
@ -259,7 +273,14 @@ class InternalContext(object):
except greenlet.GreenletExit:
# ignore these since they are just from shutdowns
pass
except rpc_common.ClientException, e:
LOG.debug(_("Expected exception during message handling (%s)") %
e._exc_info[1])
return {'exc':
rpc_common.serialize_remote_exception(e._exc_info,
log_failure=False)}
except Exception:
LOG.error(_("Exception during message handling"))
return {'exc':
rpc_common.serialize_remote_exception(sys.exc_info())}
@ -314,7 +335,7 @@ class ConsumerBase(object):
return
data.setdefault('version', None)
data.setdefault('args', [])
data.setdefault('args', {})
proxy.dispatch(ctx, data['version'],
data['method'], **data['args'])
@ -404,12 +425,6 @@ class ZmqProxy(ZmqBaseReactor):
super(ZmqProxy, self).__init__(conf)
self.topic_proxy = {}
ipc_dir = CONF.rpc_zmq_ipc_dir
self.topic_proxy['zmq_replies'] = \
ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ),
zmq.PUB, bind=True)
self.sockets.append(self.topic_proxy['zmq_replies'])
def consume(self, sock):
ipc_dir = CONF.rpc_zmq_ipc_dir
@ -426,7 +441,7 @@ class ZmqProxy(ZmqBaseReactor):
sock_type = zmq.PUB
elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB
inside = _deserialize(in_msg)
inside = rpc_common.deserialize_msg(_deserialize(in_msg))
msg_id = inside[-1]['args']['msg_id']
response = inside[-1]['args']['response']
LOG.debug(_("->response->%s"), response)
@ -435,20 +450,81 @@ class ZmqProxy(ZmqBaseReactor):
sock_type = zmq.PUSH
if not topic in self.topic_proxy:
outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
sock_type, bind=True)
self.topic_proxy[topic] = outq
self.sockets.append(outq)
LOG.info(_("Created topic proxy: %s"), topic)
def publisher(waiter):
LOG.info(_("Creating proxy for topic: %s"), topic)
# It takes some time for a pub socket to open,
# before we can have any faith in doing a send() to it.
if sock_type == zmq.PUB:
eventlet.sleep(.5)
try:
out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
(ipc_dir, topic),
sock_type, bind=True)
except RPCException:
waiter.send_exception(*sys.exc_info())
return
LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data})
self.topic_proxy[topic].send(data)
LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data})
self.topic_proxy[topic] = eventlet.queue.LightQueue(
CONF.rpc_zmq_topic_backlog)
self.sockets.append(out_sock)
# It takes some time for a pub socket to open,
# before we can have any faith in doing a send() to it.
if sock_type == zmq.PUB:
eventlet.sleep(.5)
waiter.send(True)
while(True):
data = self.topic_proxy[topic].get()
out_sock.send(data)
LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") %
{'data': data})
wait_sock_creation = eventlet.event.Event()
eventlet.spawn(publisher, wait_sock_creation)
try:
wait_sock_creation.wait()
except RPCException:
LOG.error(_("Topic socket file creation failed."))
return
try:
self.topic_proxy[topic].put_nowait(data)
LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") %
{'data': data})
except eventlet.queue.Full:
LOG.error(_("Local per-topic backlog buffer full for topic "
"%(topic)s. Dropping message.") % {'topic': topic})
def consume_in_thread(self):
"""Runs the ZmqProxy service"""
ipc_dir = CONF.rpc_zmq_ipc_dir
consume_in = "tcp://%s:%s" % \
(CONF.rpc_zmq_bind_address,
CONF.rpc_zmq_port)
consumption_proxy = InternalContext(None)
if not os.path.isdir(ipc_dir):
try:
utils.execute('mkdir', '-p', ipc_dir, run_as_root=True)
utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()),
ipc_dir, run_as_root=True)
utils.execute('chmod', '750', ipc_dir, run_as_root=True)
except utils.ProcessExecutionError:
LOG.error(_("Could not create IPC directory %s") %
(ipc_dir, ))
raise
try:
self.register(consumption_proxy,
consume_in,
zmq.PULL,
out_bind=True)
except zmq.ZMQError:
LOG.error(_("Could not create ZeroMQ receiver daemon. "
"Socket may already be in use."))
raise
super(ZmqProxy, self).consume_in_thread()
class ZmqReactor(ZmqBaseReactor):
@ -473,7 +549,7 @@ class ZmqReactor(ZmqBaseReactor):
msg_id, topic, style, in_msg = data
ctx, request = _deserialize(in_msg)
ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg))
ctx = RpcContext.unmarshal(ctx)
proxy = self.proxies[sock]
@ -524,7 +600,8 @@ class Connection(rpc_common.Connection):
self.reactor.consume_in_thread()
def _cast(addr, context, msg_id, topic, msg, timeout=None):
def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
force_envelope=False):
timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]
@ -533,7 +610,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None):
conn = ZmqClient(addr)
# assumes cast can't return an exception
conn.cast(msg_id, topic, payload)
conn.cast(msg_id, topic, payload, serialize, force_envelope)
except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception")
finally:
@ -541,7 +618,8 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None):
conn.close()
def _call(addr, context, msg_id, topic, msg, timeout=None):
def _call(addr, context, msg_id, topic, msg, timeout=None,
serialize=True, force_envelope=False):
# timeout_response is how long we wait for a response
timeout = timeout or CONF.rpc_response_timeout
@ -576,7 +654,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
)
LOG.debug(_("Sending cast"))
_cast(addr, context, msg_id, topic, payload)
_cast(addr, context, msg_id, topic, payload,
serialize=serialize, force_envelope=force_envelope)
LOG.debug(_("Cast sent; Waiting reply"))
# Blocks until receives reply
@ -602,7 +681,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
return responses[-1]
def _multi_send(method, context, topic, msg, timeout=None):
def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
force_envelope=False):
"""
Wraps the sending of messages,
dispatches to the matchmaker and sends
@ -611,7 +691,7 @@ def _multi_send(method, context, topic, msg, timeout=None):
conf = CONF
LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
queues = matchmaker.queues(topic)
queues = _get_matchmaker().queues(topic)
LOG.debug(_("Sending message(s) to: %s"), queues)
# Don't stack if we have no matchmaker results
@ -628,9 +708,11 @@ def _multi_send(method, context, topic, msg, timeout=None):
if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context,
_topic, _topic, msg, timeout)
_topic, _topic, msg, timeout, serialize,
force_envelope)
return
return method(_addr, context, _topic, _topic, msg, timeout)
return method(_addr, context, _topic, _topic, msg, timeout,
serialize, force_envelope)
def create_connection(conf, new=True):
@ -669,38 +751,37 @@ def notify(conf, context, topic, msg, **kwargs):
# NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions.
topic.replace('.', '-')
kwargs['serialize'] = kwargs.pop('envelope')
kwargs['force_envelope'] = True
cast(conf, context, topic, msg, **kwargs)
def cleanup():
"""Clean up resources in use by implementation."""
global ZMQ_CTX
global matchmaker
matchmaker = None
ZMQ_CTX.term()
if ZMQ_CTX:
ZMQ_CTX.term()
ZMQ_CTX = None
def register_opts(conf):
"""Registration of options for this driver."""
#NOTE(ewindisch): ZMQ_CTX and matchmaker
# are initialized here as this is as good
# an initialization method as any.
# We memoize through these globals
global ZMQ_CTX
global matchmaker
global CONF
matchmaker = None
if not CONF:
conf.register_opts(zmq_opts)
CONF = conf
# Don't re-set, if this method is called twice.
def _get_ctxt():
if not zmq:
raise ImportError("Failed to import eventlet.green.zmq")
global ZMQ_CTX
if not ZMQ_CTX:
ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts)
ZMQ_CTX = zmq.Context(CONF.rpc_zmq_contexts)
return ZMQ_CTX
def _get_matchmaker():
global matchmaker
if not matchmaker:
# rpc_zmq_matchmaker should be set to a 'module.Class'
mm_path = conf.rpc_zmq_matchmaker.split('.')
mm_path = CONF.rpc_zmq_matchmaker.split('.')
mm_module = '.'.join(mm_path[:-1])
mm_class = mm_path[-1]
@ -713,6 +794,4 @@ def register_opts(conf):
mm_impl = importutils.import_module(mm_module)
mm_constructor = getattr(mm_impl, mm_class)
matchmaker = mm_constructor()
register_opts(cfg.CONF)
return matchmaker

View File

@ -51,7 +51,7 @@ class Launcher(object):
:returns: None
"""
self._services = threadgroup.ThreadGroup('launcher')
self._services = threadgroup.ThreadGroup()
eventlet_backdoor.initialize_if_enabled()
@staticmethod
@ -310,7 +310,7 @@ class Service(object):
"""Service object for binaries running on hosts."""
def __init__(self, threads=1000):
self.tg = threadgroup.ThreadGroup('service', threads)
self.tg = threadgroup.ThreadGroup(threads)
def start(self):
pass

View File

@ -1,6 +1,7 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# Copyright 2012-2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -19,7 +20,7 @@
Utilities with minimum-depends for use in setup.py
"""
import datetime
import email
import os
import re
import subprocess
@ -33,11 +34,12 @@ def parse_mailmap(mailmap='.mailmap'):
if os.path.exists(mailmap):
with open(mailmap, 'r') as fp:
for l in fp:
l = l.strip()
if not l.startswith('#') and ' ' in l:
canonical_email, alias = [x for x in l.split(' ')
if x.startswith('<')]
mapping[alias] = canonical_email
try:
canonical_email, alias = re.match(
r'[^#]*?(<.+>).*(<.+>).*', l).groups()
except AttributeError:
continue
mapping[alias] = canonical_email
return mapping
@ -45,8 +47,8 @@ def canonicalize_emails(changelog, mapping):
"""Takes in a string and an email alias mapping and replaces all
instances of the aliases in the string with their real email.
"""
for alias, email in mapping.iteritems():
changelog = changelog.replace(alias, email)
for alias, email_address in mapping.iteritems():
changelog = changelog.replace(alias, email_address)
return changelog
@ -106,16 +108,6 @@ def parse_dependency_links(requirements_files=['requirements.txt',
return dependency_links
def write_requirements():
venv = os.environ.get('VIRTUAL_ENV', None)
if venv is not None:
with open("requirements.txt", "w") as req_file:
output = subprocess.Popen(["pip", "-E", venv, "freeze", "-l"],
stdout=subprocess.PIPE)
requirements = output.communicate()[0].strip()
req_file.write(requirements)
def _run_shell_command(cmd):
if os.name == 'nt':
output = subprocess.Popen(["cmd.exe", "/C", cmd],
@ -131,57 +123,6 @@ def _run_shell_command(cmd):
return out[0].strip()
def _get_git_next_version_suffix(branch_name):
datestamp = datetime.datetime.now().strftime('%Y%m%d')
if branch_name == 'milestone-proposed':
revno_prefix = "r"
else:
revno_prefix = ""
_run_shell_command("git fetch origin +refs/meta/*:refs/remotes/meta/*")
milestone_cmd = "git show meta/openstack/release:%s" % branch_name
milestonever = _run_shell_command(milestone_cmd)
if milestonever:
first_half = "%s~%s" % (milestonever, datestamp)
else:
first_half = datestamp
post_version = _get_git_post_version()
# post version should look like:
# 0.1.1.4.gcc9e28a
# where the bit after the last . is the short sha, and the bit between
# the last and second to last is the revno count
(revno, sha) = post_version.split(".")[-2:]
second_half = "%s%s.%s" % (revno_prefix, revno, sha)
return ".".join((first_half, second_half))
def _get_git_current_tag():
return _run_shell_command("git tag --contains HEAD")
def _get_git_tag_info():
return _run_shell_command("git describe --tags")
def _get_git_post_version():
current_tag = _get_git_current_tag()
if current_tag is not None:
return current_tag
else:
tag_info = _get_git_tag_info()
if tag_info is None:
base_version = "0.0"
cmd = "git --no-pager log --oneline"
out = _run_shell_command(cmd)
revno = len(out.split("\n"))
sha = _run_shell_command("git describe --always")
else:
tag_infos = tag_info.split("-")
base_version = "-".join(tag_infos[:-2])
(revno, sha) = tag_infos[-2:]
return "%s.%s.%s" % (base_version, revno, sha)
def write_git_changelog():
"""Write a changelog based on the git changelog."""
new_changelog = 'ChangeLog'
@ -227,26 +168,6 @@ _rst_template = """%(heading)s
"""
def read_versioninfo(project):
"""Read the versioninfo file. If it doesn't exist, we're in a github
zipball, and there's really no way to know what version we really
are, but that should be ok, because the utility of that should be
just about nil if this code path is in use in the first place."""
versioninfo_path = os.path.join(project, 'versioninfo')
if os.path.exists(versioninfo_path):
with open(versioninfo_path, 'r') as vinfo:
version = vinfo.read().strip()
else:
version = "0.0.0"
return version
def write_versioninfo(project, version):
"""Write a simple file containing the version of the package."""
with open(os.path.join(project, 'versioninfo'), 'w') as fil:
fil.write("%s\n" % version)
def get_cmdclass():
"""Return dict of commands to run from setup.py."""
@ -276,6 +197,9 @@ def get_cmdclass():
from sphinx.setup_command import BuildDoc
class LocalBuildDoc(BuildDoc):
builders = ['html', 'man']
def generate_autoindex(self):
print "**Autodocumenting from %s" % os.path.abspath(os.curdir)
modules = {}
@ -311,56 +235,69 @@ def get_cmdclass():
if not os.getenv('SPHINX_DEBUG'):
self.generate_autoindex()
for builder in ['html', 'man']:
for builder in self.builders:
self.builder = builder
self.finalize_options()
self.project = self.distribution.get_name()
self.version = self.distribution.get_version()
self.release = self.distribution.get_version()
BuildDoc.run(self)
class LocalBuildLatex(LocalBuildDoc):
builders = ['latex']
cmdclass['build_sphinx'] = LocalBuildDoc
cmdclass['build_sphinx_latex'] = LocalBuildLatex
except ImportError:
pass
return cmdclass
def get_git_branchname():
for branch in _run_shell_command("git branch --color=never").split("\n"):
if branch.startswith('*'):
_branch_name = branch.split()[1].strip()
if _branch_name == "(no":
_branch_name = "no-branch"
return _branch_name
def get_pre_version(projectname, base_version):
"""Return a version which is leading up to a version that will
be released in the future."""
if os.path.isdir('.git'):
current_tag = _get_git_current_tag()
if current_tag is not None:
version = current_tag
else:
branch_name = os.getenv('BRANCHNAME',
os.getenv('GERRIT_REFNAME',
get_git_branchname()))
version_suffix = _get_git_next_version_suffix(branch_name)
version = "%s~%s" % (base_version, version_suffix)
write_versioninfo(projectname, version)
return version
else:
version = read_versioninfo(projectname)
return version
def get_post_version(projectname):
def get_version_from_git():
"""Return a version which is equal to the tag that's on the current
revision if there is one, or tag plus number of additional revisions
if the current revision has no tag."""
if os.path.isdir('.git'):
version = _get_git_post_version()
write_versioninfo(projectname, version)
return _run_shell_command(
"git describe --always").replace('-', '.')
return None
def get_version_from_pkg_info(package_name):
"""Get the version from PKG-INFO file if we can."""
try:
pkg_info_file = open('PKG-INFO', 'r')
except (IOError, OSError):
return None
try:
pkg_info = email.message_from_file(pkg_info_file)
except email.MessageError:
return None
# Check to make sure we're in our own dir
if pkg_info.get('Name', None) != package_name:
return None
return pkg_info.get('Version', None)
def get_version(package_name):
"""Get the version of the project. First, try getting it from PKG-INFO, if
it exists. If it does, that means we're in a distribution tarball or that
install has happened. Otherwise, if there is no PKG-INFO file, pull the
version from git.
We do not support setup.py version sanity in git archive tarballs, nor do
we support packagers directly sucking our git repo into theirs. We expect
that a source tarball be made from our git repo - or that if someone wants
to make a source tarball from a fork of our repo with additional tags in it
that they understand and desire the results of doing that.
"""
version = get_version_from_pkg_info(package_name)
if version:
return version
return read_versioninfo(projectname)
version = get_version_from_git()
if version:
return version
raise Exception("Versioning for this project requires either an sdist"
" tarball, or access to an upstream git repository.")

View File

@ -38,8 +38,7 @@ class Thread(object):
:class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when
it has done so it can be removed from the threads list.
"""
def __init__(self, name, thread, group):
self.name = name
def __init__(self, thread, group):
self.thread = thread
self.thread.link(_thread_done, group=group, thread=self)
@ -57,8 +56,7 @@ class ThreadGroup(object):
when need be).
* provide an easy API to add timers.
"""
def __init__(self, name, thread_pool_size=10):
self.name = name
def __init__(self, thread_pool_size=10):
self.pool = greenpool.GreenPool(thread_pool_size)
self.threads = []
self.timers = []
@ -72,7 +70,7 @@ class ThreadGroup(object):
def add_thread(self, callback, *args, **kwargs):
gt = self.pool.spawn(callback, *args, **kwargs)
th = Thread(callback.__name__, gt, self)
th = Thread(gt, self)
self.threads.append(th)
def thread_done(self, thread):

View File

@ -71,11 +71,15 @@ def normalize_time(timestamp):
def is_older_than(before, seconds):
"""Return True if before is older than seconds."""
if isinstance(before, basestring):
before = parse_strtime(before).replace(tzinfo=None)
return utcnow() - before > datetime.timedelta(seconds=seconds)
def is_newer_than(after, seconds):
"""Return True if after is newer than seconds."""
if isinstance(after, basestring):
after = parse_strtime(after).replace(tzinfo=None)
return after - utcnow() > datetime.timedelta(seconds=seconds)

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC
# Copyright 2012-2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -15,154 +15,51 @@
# under the License.
"""
Utilities for consuming the auto-generated versioninfo files.
Utilities for consuming the version from pkg_resources.
"""
import datetime
import pkg_resources
import setup
class _deferred_version_string(str):
"""Internal helper class which provides delayed version calculation."""
def __new__(cls, version_info, prefix):
new_obj = str.__new__(cls, "")
new_obj._version_info = version_info
new_obj._prefix = prefix
new_obj._cached_version = None
return new_obj
def _get_cached_version(self):
if not self._cached_version:
self._cached_version = \
"%s%s" % (self._prefix,
self._version_info.version_string())
return self._cached_version
def __len__(self):
return self._get_cached_version().__len__()
def __contains__(self, item):
return self._get_cached_version().__contains__(item)
def __getslice__(self, i, j):
return self._get_cached_version().__getslice__(i, j)
def __str__(self):
return self._get_cached_version()
def __repr__(self):
return self._get_cached_version()
class VersionInfo(object):
def __init__(self, package, python_package=None, pre_version=None):
def __init__(self, package):
"""Object that understands versioning for a package
:param package: name of the top level python namespace. For glance,
this would be "glance" for python-glanceclient, it
would be "glanceclient"
:param python_package: optional name of the project name. For
glance this can be left unset. For
python-glanceclient, this would be
"python-glanceclient"
:param pre_version: optional version that the project is working to
:param package: name of the python package, such as glance, or
python-glanceclient
"""
self.package = package
if python_package is None:
self.python_package = package
else:
self.python_package = python_package
self.pre_version = pre_version
self.version = None
self._cached_version = None
def _generate_version(self):
"""Defer to the openstack.common.setup routines for making a
version from git."""
if self.pre_version is None:
return setup.get_post_version(self.python_package)
else:
return setup.get_pre_version(self.python_package, self.pre_version)
def _get_version_from_pkg_resources(self):
"""Get the version of the package from the pkg_resources record
associated with the package."""
requirement = pkg_resources.Requirement.parse(self.package)
provider = pkg_resources.get_provider(requirement)
return provider.version
def _newer_version(self, pending_version):
"""Check to see if we're working with a stale version or not.
We expect a version string that either looks like:
2012.2~f3~20120708.10.4426392
which is an unreleased version of a pre-version, or:
0.1.1.4.gcc9e28a
which is an unreleased version of a post-version, or:
0.1.1
Which is a release and which should match tag.
For now, if we have a date-embedded version, check to see if it's
old, and if so re-generate. Otherwise, just deal with it.
"""
try:
version_date = int(self.version.split("~")[-1].split('.')[0])
if version_date < int(datetime.date.today().strftime('%Y%m%d')):
return self._generate_version()
else:
return pending_version
except Exception:
return pending_version
def version_string_with_vcs(self, always=False):
def version_string(self):
"""Return the full version of the package including suffixes indicating
VCS status.
For instance, if we are working towards the 2012.2 release,
canonical_version_string should return 2012.2 if this is a final
release, or else something like 2012.2~f1~20120705.20 if it's not.
:param always: if true, skip all version caching
"""
if always:
self.version = self._generate_version()
if self.version is None:
requirement = pkg_resources.Requirement.parse(self.python_package)
versioninfo = "%s/versioninfo" % self.package
try:
raw_version = pkg_resources.resource_string(requirement,
versioninfo)
self.version = self._newer_version(raw_version.strip())
except (IOError, pkg_resources.DistributionNotFound):
self.version = self._generate_version()
self.version = self._get_version_from_pkg_resources()
return self.version
def canonical_version_string(self, always=False):
"""Return the simple version of the package excluding any suffixes.
# Compatibility functions
canonical_version_string = version_string
version_string_with_vcs = version_string
For instance, if we are working towards the 2012.2 release,
canonical_version_string should return 2012.2 in all cases.
:param always: if true, skip all version caching
"""
return self.version_string_with_vcs(always).split('~')[0]
def version_string(self, always=False):
"""Return the base version of the package.
For instance, if we are working towards the 2012.2 release,
version_string should return 2012.2 if this is a final release, or
2012.2-dev if it is not.
:param always: if true, skip all version caching
"""
version_parts = self.version_string_with_vcs(always).split('~')
if len(version_parts) == 1:
return version_parts[0]
else:
return '%s-dev' % (version_parts[0],)
def deferred_version_string(self, prefix=""):
def cached_version_string(self, prefix=""):
"""Generate an object which will expand in a string context to
the results of version_string(). We do this so that don't
call into pkg_resources every time we start up a program when
passing version information into the CONF constructor, but
rather only do the calculation when and if a version is requested
"""
return _deferred_version_string(self, prefix)
if not self._cached_version:
self._cached_version = "%s%s" % (prefix,
self.version_string())
return self._cached_version

View File

@ -172,8 +172,7 @@ class stackServiceCreateUpdateDeleteTest(unittest.TestCase):
stack.validate().AndReturn(None)
self.m.StubOutWithMock(threadgroup, 'ThreadGroup')
name_match = mox.StrContains(stack.name)
threadgroup.ThreadGroup(name_match).AndReturn(DummyThreadGroup())
threadgroup.ThreadGroup().AndReturn(DummyThreadGroup())
self.m.ReplayAll()

View File

@ -16,5 +16,4 @@
from heat.openstack.common import version as common_version
NEXT_VERSION = '2013.1'
version_info = common_version.VersionInfo('heat', pre_version=NEXT_VERSION)
version_info = common_version.VersionInfo('heat')

View File

@ -16,13 +16,14 @@
import setuptools
from heat.openstack.common import setup
from heat.version import version_info as version
requires = setup.parse_requirements()
project = 'heat'
setuptools.setup(
name='heat',
version=version.canonical_version_string(always=True),
name=project,
version=setup.get_version(project),
description='The heat project provides services for provisioning '
'virtual machines',
license='Apache License (2.0)',