Sync with Oslo 30a50c8a

Change-Id: I2614b234d0d59b698ddefbeb87541867843848b2
This commit is contained in:
Kiall Mac Innes 2013-01-28 23:42:27 +00:00
parent 00ea223c6e
commit 512f379cda
19 changed files with 408 additions and 424 deletions

View File

@ -217,7 +217,7 @@ log files::
...
]
This module also contains a global instance of the CommonConfigOpts class
This module also contains a global instance of the ConfigOpts class
in order to support a common usage pattern in OpenStack::
from moniker.openstack.common import cfg
@ -236,10 +236,11 @@ in order to support a common usage pattern in OpenStack::
Positional command line arguments are supported via a 'positional' Opt
constructor argument::
>>> CONF.register_cli_opt(MultiStrOpt('bar', positional=True))
>>> conf = ConfigOpts()
>>> conf.register_cli_opt(MultiStrOpt('bar', positional=True))
True
>>> CONF(['a', 'b'])
>>> CONF.bar
>>> conf(['a', 'b'])
>>> conf.bar
['a', 'b']
It is also possible to use argparse "sub-parsers" to parse additional
@ -249,10 +250,11 @@ command line arguments using the SubCommandOpt class:
... list_action = subparsers.add_parser('list')
... list_action.add_argument('id')
...
>>> CONF.register_cli_opt(SubCommandOpt('action', handler=add_parsers))
>>> conf = ConfigOpts()
>>> conf.register_cli_opt(SubCommandOpt('action', handler=add_parsers))
True
>>> CONF(['list', '10'])
>>> CONF.action.name, CONF.action.id
>>> conf(args=['list', '10'])
>>> conf.action.name, conf.action.id
('list', '10')
"""
@ -861,7 +863,7 @@ class SubCommandOpt(Opt):
description=self.description,
help=self.help)
if not self.handler is None:
if self.handler is not None:
self.handler(subparsers)
@ -1545,8 +1547,8 @@ class ConfigOpts(collections.Mapping):
group = group_or_name if isinstance(group_or_name, OptGroup) else None
group_name = group.name if group else group_or_name
if not group_name in self._groups:
if not group is None or not autocreate:
if group_name not in self._groups:
if group is not None or not autocreate:
raise NoSuchGroupError(group_name)
self.register_group(OptGroup(name=group_name))
@ -1566,7 +1568,7 @@ class ConfigOpts(collections.Mapping):
group = self._get_group(group)
opts = group._opts
if not opt_name in opts:
if opt_name not in opts:
raise NoSuchOptError(opt_name, group)
return opts[opt_name]
@ -1604,7 +1606,7 @@ class ConfigOpts(collections.Mapping):
opt = info['opt']
if opt.required:
if ('default' in info or 'override' in info):
if 'default' in info or 'override' in info:
continue
if self._get(opt.dest, group) is None:
@ -1726,62 +1728,4 @@ class ConfigOpts(collections.Mapping):
return value
class CommonConfigOpts(ConfigOpts):
DEFAULT_LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s"
DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
common_cli_opts = [
BoolOpt('debug',
short='d',
default=False,
help='Print debugging output'),
BoolOpt('verbose',
short='v',
default=False,
help='Print more verbose output'),
]
logging_cli_opts = [
StrOpt('log-config',
metavar='PATH',
help='If this option is specified, the logging configuration '
'file specified is used and overrides any other logging '
'options specified. Please see the Python logging module '
'documentation for details on logging configuration '
'files.'),
StrOpt('log-format',
default=DEFAULT_LOG_FORMAT,
metavar='FORMAT',
help='A logging.Formatter log message format string which may '
'use any of the available logging.LogRecord attributes. '
'Default: %(default)s'),
StrOpt('log-date-format',
default=DEFAULT_LOG_DATE_FORMAT,
metavar='DATE_FORMAT',
help='Format string for %%(asctime)s in log records. '
'Default: %(default)s'),
StrOpt('log-file',
metavar='PATH',
deprecated_name='logfile',
help='(Optional) Name of log file to output to. '
'If not set, logging will go to stdout.'),
StrOpt('log-dir',
deprecated_name='logdir',
help='(Optional) The directory to keep log files in '
'(will be prepended to --log-file)'),
BoolOpt('use-syslog',
default=False,
help='Use syslog for logging.'),
StrOpt('syslog-log-facility',
default='LOG_USER',
help='syslog facility to receive log lines')
]
def __init__(self):
super(CommonConfigOpts, self).__init__()
self.register_cli_opts(self.common_cli_opts)
self.register_cli_opts(self.logging_cli_opts)
CONF = CommonConfigOpts()
CONF = ConfigOpts()

View File

@ -57,3 +57,11 @@ def import_module(import_str):
"""Import a module."""
__import__(import_str)
return sys.modules[import_str]
def try_import(import_str, default=None):
"""Try to import a module and if it fails return default."""
try:
return import_module(import_str)
except ImportError:
return default

View File

@ -54,7 +54,7 @@ class BaseParser(object):
value = value.strip()
if ((value and value[0] == value[-1]) and
(value[0] == "\"" or value[0] == "'")):
(value[0] == "\"" or value[0] == "'")):
value = value[1:-1]
return key.strip(), [value]

View File

@ -47,21 +47,82 @@ from moniker.openstack.common import local
from moniker.openstack.common import notifier
_DEFAULT_LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s"
_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
common_cli_opts = [
cfg.BoolOpt('debug',
short='d',
default=False,
help='Print debugging output (set logging level to '
'DEBUG instead of default WARNING level).'),
cfg.BoolOpt('verbose',
short='v',
default=False,
help='Print more verbose output (set logging level to '
'INFO instead of default WARNING level).'),
]
logging_cli_opts = [
cfg.StrOpt('log-config',
metavar='PATH',
help='If this option is specified, the logging configuration '
'file specified is used and overrides any other logging '
'options specified. Please see the Python logging module '
'documentation for details on logging configuration '
'files.'),
cfg.StrOpt('log-format',
default=_DEFAULT_LOG_FORMAT,
metavar='FORMAT',
help='A logging.Formatter log message format string which may '
'use any of the available logging.LogRecord attributes. '
'Default: %(default)s'),
cfg.StrOpt('log-date-format',
default=_DEFAULT_LOG_DATE_FORMAT,
metavar='DATE_FORMAT',
help='Format string for %%(asctime)s in log records. '
'Default: %(default)s'),
cfg.StrOpt('log-file',
metavar='PATH',
deprecated_name='logfile',
help='(Optional) Name of log file to output to. '
'If not set, logging will go to stdout.'),
cfg.StrOpt('log-dir',
deprecated_name='logdir',
help='(Optional) The directory to keep log files in '
'(will be prepended to --log-file)'),
cfg.BoolOpt('use-syslog',
default=False,
help='Use syslog for logging.'),
cfg.StrOpt('syslog-log-facility',
default='LOG_USER',
help='syslog facility to receive log lines')
]
generic_log_opts = [
cfg.BoolOpt('use_stderr',
default=True,
help='Log output to standard error'),
cfg.StrOpt('logfile_mode',
default='0644',
help='Default file mode used when creating log files'),
]
log_opts = [
cfg.StrOpt('logging_context_format_string',
default='%(asctime)s.%(msecs)d %(levelname)s %(name)s '
default='%(asctime)s.%(msecs)03d %(levelname)s %(name)s '
'[%(request_id)s %(user)s %(tenant)s] %(instance)s'
'%(message)s',
help='format string to use for log messages with context'),
cfg.StrOpt('logging_default_format_string',
default='%(asctime)s.%(msecs)d %(process)d %(levelname)s '
default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
'%(name)s [-] %(instance)s%(message)s',
help='format string to use for log messages without context'),
cfg.StrOpt('logging_debug_format_suffix',
default='%(funcName)s %(pathname)s:%(lineno)d',
help='data to append to log format when level is DEBUG'),
cfg.StrOpt('logging_exception_prefix',
default='%(asctime)s.%(msecs)d %(process)d TRACE %(name)s '
default='%(asctime)s.%(msecs)03d %(process)d TRACE %(name)s '
'%(instance)s',
help='prefix each line of exception output with this format'),
cfg.ListOpt('default_log_levels',
@ -94,24 +155,9 @@ log_opts = [
'format it like this'),
]
generic_log_opts = [
cfg.StrOpt('logdir',
default=None,
help='Log output to a per-service log file in named directory'),
cfg.StrOpt('logfile',
default=None,
help='Log output to a named file'),
cfg.BoolOpt('use_stderr',
default=True,
help='Log output to standard error'),
cfg.StrOpt('logfile_mode',
default='0644',
help='Default file mode used when creating log files'),
]
CONF = cfg.CONF
CONF.register_cli_opts(common_cli_opts)
CONF.register_cli_opts(logging_cli_opts)
CONF.register_opts(generic_log_opts)
CONF.register_opts(log_opts)
@ -149,8 +195,8 @@ def _get_binary_name():
def _get_log_file_path(binary=None):
logfile = CONF.log_file or CONF.logfile
logdir = CONF.log_dir or CONF.logdir
logfile = CONF.log_file
logdir = CONF.log_dir
if logfile and not logdir:
return logfile
@ -259,7 +305,7 @@ class JSONFormatter(logging.Formatter):
class PublishErrorsHandler(logging.Handler):
def emit(self, record):
if ('moniker.openstack.common.notifier.log_notifier' in
CONF.notification_driver):
CONF.notification_driver):
return
notifier.api.notify(None, 'error.publisher',
'error_notification',
@ -361,10 +407,12 @@ def _setup_logging_from_conf(product_name):
datefmt=datefmt))
handler.setFormatter(LegacyFormatter(datefmt=datefmt))
if CONF.verbose or CONF.debug:
if CONF.debug:
log_root.setLevel(logging.DEBUG)
else:
elif CONF.verbose:
log_root.setLevel(logging.INFO)
else:
log_root.setLevel(logging.WARNING)
level = logging.NOTSET
for pair in CONF.default_log_levels:
@ -425,7 +473,7 @@ class LegacyFormatter(logging.Formatter):
self._fmt = CONF.logging_default_format_string
if (record.levelno == logging.DEBUG and
CONF.logging_debug_format_suffix):
CONF.logging_debug_format_suffix):
self._fmt += " " + CONF.logging_debug_format_suffix
# Cache this on the record, Logger will respect our formated copy

View File

@ -574,19 +574,19 @@ class ParseState(object):
for reduction, methname in self.reducers:
if (len(self.tokens) >= len(reduction) and
self.tokens[-len(reduction):] == reduction):
# Get the reduction method
meth = getattr(self, methname)
self.tokens[-len(reduction):] == reduction):
# Get the reduction method
meth = getattr(self, methname)
# Reduce the token stream
results = meth(*self.values[-len(reduction):])
# Reduce the token stream
results = meth(*self.values[-len(reduction):])
# Update the tokens and values
self.tokens[-len(reduction):] = [r[0] for r in results]
self.values[-len(reduction):] = [r[1] for r in results]
# Update the tokens and values
self.tokens[-len(reduction):] = [r[0] for r in results]
self.values[-len(reduction):] = [r[1] for r in results]
# Check for any more reductions
return self.reduce()
# Check for any more reductions
return self.reduce()
def shift(self, tok, value):
"""Adds one more token to the state. Calls reduce()."""

View File

@ -371,7 +371,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
conn = ConnectionContext(conf, connection_pool)
wait_msg = MulticallWaiter(conf, conn, timeout)
conn.declare_direct_consumer(msg_id, wait_msg)
conn.topic_send(topic, rpc_common.serialize_msg(msg))
conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
return wait_msg

View File

@ -170,7 +170,7 @@ def cast(conf, context, topic, msg):
pass
def notify(conf, context, topic, msg):
def notify(conf, context, topic, msg, envelope):
check_serialize(msg)

View File

@ -302,9 +302,15 @@ class Publisher(object):
channel=channel,
routing_key=self.routing_key)
def send(self, msg):
def send(self, msg, timeout=None):
"""Send a message"""
self.producer.publish(msg)
if timeout:
#
# AMQP TTL is in milliseconds when set in the header.
#
self.producer.publish(msg, headers={'ttl': (timeout * 1000)})
else:
self.producer.publish(msg)
class DirectPublisher(Publisher):
@ -653,7 +659,7 @@ class Connection(object):
for proxy_cb in self.proxy_callbacks:
proxy_cb.wait()
def publisher_send(self, cls, topic, msg, **kwargs):
def publisher_send(self, cls, topic, msg, timeout=None, **kwargs):
"""Send to a publisher based on the publisher class"""
def _error_callback(exc):
@ -663,7 +669,7 @@ class Connection(object):
def _publish():
publisher = cls(self.conf, self.channel, topic, **kwargs)
publisher.send(msg)
publisher.send(msg, timeout)
self.ensure(_error_callback, _publish)
@ -691,9 +697,9 @@ class Connection(object):
"""Send a 'direct' message"""
self.publisher_send(DirectPublisher, msg_id, msg)
def topic_send(self, topic, msg):
def topic_send(self, topic, msg, timeout=None):
"""Send a 'topic' message"""
self.publisher_send(TopicPublisher, topic, msg)
self.publisher_send(TopicPublisher, topic, msg, timeout)
def fanout_send(self, topic, msg):
"""Send a 'fanout' message"""
@ -701,7 +707,7 @@ class Connection(object):
def notify_send(self, topic, msg, **kwargs):
"""Send a notify message on a topic"""
self.publisher_send(NotifyPublisher, topic, msg, **kwargs)
self.publisher_send(NotifyPublisher, topic, msg, None, **kwargs)
def consume(self, limit=None):
"""Consume from all queues/consumers"""

View File

@ -22,16 +22,18 @@ import uuid
import eventlet
import greenlet
import qpid.messaging
import qpid.messaging.exceptions
from moniker.openstack.common import cfg
from moniker.openstack.common.gettextutils import _
from moniker.openstack.common import importutils
from moniker.openstack.common import jsonutils
from moniker.openstack.common import log as logging
from moniker.openstack.common.rpc import amqp as rpc_amqp
from moniker.openstack.common.rpc import common as rpc_common
qpid_messaging = importutils.try_import("qpid.messaging")
qpid_exceptions = importutils.try_import("qpid.messaging.exceptions")
LOG = logging.getLogger(__name__)
qpid_opts = [
@ -275,6 +277,9 @@ class Connection(object):
pool = None
def __init__(self, conf, server_params=None):
if not qpid_messaging:
raise ImportError("Failed to import qpid.messaging")
self.session = None
self.consumers = {}
self.consumer_thread = None
@ -303,7 +308,7 @@ class Connection(object):
def connection_create(self, broker):
# Create the connection - this does not open the connection
self.connection = qpid.messaging.Connection(broker)
self.connection = qpid_messaging.Connection(broker)
# Check if flags are set and if so set them for the connection
# before we call open
@ -328,7 +333,7 @@ class Connection(object):
if self.connection.opened():
try:
self.connection.close()
except qpid.messaging.exceptions.ConnectionError:
except qpid_exceptions.ConnectionError:
pass
attempt = 0
@ -340,7 +345,7 @@ class Connection(object):
try:
self.connection_create(broker)
self.connection.open()
except qpid.messaging.exceptions.ConnectionError, e:
except qpid_exceptions.ConnectionError, e:
msg_dict = dict(e=e, delay=delay)
msg = _("Unable to connect to AMQP server: %(e)s. "
"Sleeping %(delay)s seconds") % msg_dict
@ -367,8 +372,8 @@ class Connection(object):
while True:
try:
return method(*args, **kwargs)
except (qpid.messaging.exceptions.Empty,
qpid.messaging.exceptions.ConnectionError), e:
except (qpid_exceptions.Empty,
qpid_exceptions.ConnectionError), e:
if error_callback:
error_callback(e)
self.reconnect()
@ -408,7 +413,7 @@ class Connection(object):
"""Return an iterator that will consume from all queues/consumers"""
def _error_callback(exc):
if isinstance(exc, qpid.messaging.exceptions.Empty):
if isinstance(exc, qpid_exceptions.Empty):
LOG.exception(_('Timed out waiting for RPC response: %s') %
str(exc))
raise rpc_common.Timeout()
@ -481,9 +486,20 @@ class Connection(object):
"""Send a 'direct' message"""
self.publisher_send(DirectPublisher, msg_id, msg)
def topic_send(self, topic, msg):
def topic_send(self, topic, msg, timeout=None):
"""Send a 'topic' message"""
self.publisher_send(TopicPublisher, topic, msg)
#
# We want to create a message with attributes, e.g. a TTL. We
# don't really need to keep 'msg' in its JSON format any longer
# so let's create an actual qpid message here and get some
# value-add on the go.
#
# WARNING: Request timeout happens to be in the same units as
# qpid's TTL (seconds). If this changes in the future, then this
# will need to be altered accordingly.
#
qpid_message = qpid_messaging.Message(content=msg, ttl=timeout)
self.publisher_send(TopicPublisher, topic, qpid_message)
def fanout_send(self, topic, msg):
"""Send a 'fanout' message"""

View File

@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import os
import pprint
import socket
import string
@ -22,15 +23,16 @@ import types
import uuid
import eventlet
from eventlet.green import zmq
import greenlet
from moniker.openstack.common import cfg
from moniker.openstack.common.gettextutils import _
from moniker.openstack.common import importutils
from moniker.openstack.common import jsonutils
from moniker.openstack.common import processutils as utils
from moniker.openstack.common.rpc import common as rpc_common
zmq = importutils.try_import('eventlet.green.zmq')
# for convenience, are not modified.
pformat = pprint.pformat
@ -61,6 +63,10 @@ zmq_opts = [
cfg.IntOpt('rpc_zmq_contexts', default=1,
help='Number of ZeroMQ contexts, defaults to 1'),
cfg.IntOpt('rpc_zmq_topic_backlog', default=None,
help='Maximum number of ingress messages to locally buffer '
'per topic. Default is unlimited.'),
cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
help='Directory for holding IPC sockets'),
@ -70,9 +76,9 @@ zmq_opts = [
]
# These globals are defined in register_opts(conf),
# a mandatory initialization call
CONF = None
CONF = cfg.CONF
CONF.register_opts(zmq_opts)
ZMQ_CTX = None # ZeroMQ Context, must be global.
matchmaker = None # memoized matchmaker object
@ -107,7 +113,7 @@ class ZmqSocket(object):
"""
def __init__(self, addr, zmq_type, bind=True, subscribe=None):
self.sock = ZMQ_CTX.socket(zmq_type)
self.sock = _get_ctxt().socket(zmq_type)
self.addr = addr
self.type = zmq_type
self.subscriptions = []
@ -181,11 +187,15 @@ class ZmqSocket(object):
pass
self.subscriptions = []
# Linger -1 prevents lost/dropped messages
try:
self.sock.close(linger=-1)
# Default is to linger
self.sock.close()
except Exception:
pass
# While this is a bad thing to happen,
# it would be much worse if some of the code calling this
# were to fail. For now, lets log, and later evaluate
# if we can safely raise here.
LOG.error("ZeroMQ socket could not be closed.")
self.sock = None
def recv(self):
@ -202,7 +212,9 @@ class ZmqSocket(object):
class ZmqClient(object):
"""Client for ZMQ sockets."""
def __init__(self, addr, socket_type=zmq.PUSH, bind=False):
def __init__(self, addr, socket_type=None, bind=False):
if socket_type is None:
socket_type = zmq.PUSH
self.outq = ZmqSocket(addr, socket_type, bind=bind)
def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
@ -420,12 +432,6 @@ class ZmqProxy(ZmqBaseReactor):
super(ZmqProxy, self).__init__(conf)
self.topic_proxy = {}
ipc_dir = CONF.rpc_zmq_ipc_dir
self.topic_proxy['zmq_replies'] = \
ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ),
zmq.PUB, bind=True)
self.sockets.append(self.topic_proxy['zmq_replies'])
def consume(self, sock):
ipc_dir = CONF.rpc_zmq_ipc_dir
@ -450,21 +456,82 @@ class ZmqProxy(ZmqBaseReactor):
else:
sock_type = zmq.PUSH
if not topic in self.topic_proxy:
outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
sock_type, bind=True)
self.topic_proxy[topic] = outq
self.sockets.append(outq)
LOG.info(_("Created topic proxy: %s"), topic)
if topic not in self.topic_proxy:
def publisher(waiter):
LOG.info(_("Creating proxy for topic: %s"), topic)
# It takes some time for a pub socket to open,
# before we can have any faith in doing a send() to it.
if sock_type == zmq.PUB:
eventlet.sleep(.5)
try:
out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
(ipc_dir, topic),
sock_type, bind=True)
except RPCException:
waiter.send_exception(*sys.exc_info())
return
LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data})
self.topic_proxy[topic].send(data)
LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data})
self.topic_proxy[topic] = eventlet.queue.LightQueue(
CONF.rpc_zmq_topic_backlog)
self.sockets.append(out_sock)
# It takes some time for a pub socket to open,
# before we can have any faith in doing a send() to it.
if sock_type == zmq.PUB:
eventlet.sleep(.5)
waiter.send(True)
while(True):
data = self.topic_proxy[topic].get()
out_sock.send(data)
LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") %
{'data': data})
wait_sock_creation = eventlet.event.Event()
eventlet.spawn(publisher, wait_sock_creation)
try:
wait_sock_creation.wait()
except RPCException:
LOG.error(_("Topic socket file creation failed."))
return
try:
self.topic_proxy[topic].put_nowait(data)
LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") %
{'data': data})
except eventlet.queue.Full:
LOG.error(_("Local per-topic backlog buffer full for topic "
"%(topic)s. Dropping message.") % {'topic': topic})
def consume_in_thread(self):
"""Runs the ZmqProxy service"""
ipc_dir = CONF.rpc_zmq_ipc_dir
consume_in = "tcp://%s:%s" % \
(CONF.rpc_zmq_bind_address,
CONF.rpc_zmq_port)
consumption_proxy = InternalContext(None)
if not os.path.isdir(ipc_dir):
try:
utils.execute('mkdir', '-p', ipc_dir, run_as_root=True)
utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()),
ipc_dir, run_as_root=True)
utils.execute('chmod', '750', ipc_dir, run_as_root=True)
except utils.ProcessExecutionError:
LOG.error(_("Could not create IPC directory %s") %
(ipc_dir, ))
raise
try:
self.register(consumption_proxy,
consume_in,
zmq.PULL,
out_bind=True)
except zmq.ZMQError:
LOG.error(_("Could not create ZeroMQ receiver daemon. "
"Socket may already be in use."))
raise
super(ZmqProxy, self).consume_in_thread()
class ZmqReactor(ZmqBaseReactor):
@ -561,7 +628,8 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
conn.close()
def _call(addr, context, msg_id, topic, msg, timeout=None):
def _call(addr, context, msg_id, topic, msg, timeout=None,
serialize=True, force_envelope=False):
# timeout_response is how long we wait for a response
timeout = timeout or CONF.rpc_response_timeout
@ -596,7 +664,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
)
LOG.debug(_("Sending cast"))
_cast(addr, context, msg_id, topic, payload)
_cast(addr, context, msg_id, topic, payload,
serialize=serialize, force_envelope=force_envelope)
LOG.debug(_("Cast sent; Waiting reply"))
# Blocks until receives reply
@ -632,7 +701,7 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
conf = CONF
LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
queues = matchmaker.queues(topic)
queues = _get_matchmaker().queues(topic)
LOG.debug(_("Sending message(s) to: %s"), queues)
# Don't stack if we have no matchmaker results
@ -652,7 +721,8 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
_topic, _topic, msg, timeout, serialize,
force_envelope)
return
return method(_addr, context, _topic, _topic, msg, timeout)
return method(_addr, context, _topic, _topic, msg, timeout,
serialize, force_envelope)
def create_connection(conf, new=True):
@ -699,32 +769,29 @@ def notify(conf, context, topic, msg, **kwargs):
def cleanup():
"""Clean up resources in use by implementation."""
global ZMQ_CTX
global matchmaker
matchmaker = None
ZMQ_CTX.term()
if ZMQ_CTX:
ZMQ_CTX.term()
ZMQ_CTX = None
def register_opts(conf):
"""Registration of options for this driver."""
#NOTE(ewindisch): ZMQ_CTX and matchmaker
# are initialized here as this is as good
# an initialization method as any.
# We memoize through these globals
global ZMQ_CTX
global matchmaker
global CONF
matchmaker = None
if not CONF:
conf.register_opts(zmq_opts)
CONF = conf
# Don't re-set, if this method is called twice.
def _get_ctxt():
if not zmq:
raise ImportError("Failed to import eventlet.green.zmq")
global ZMQ_CTX
if not ZMQ_CTX:
ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts)
ZMQ_CTX = zmq.Context(CONF.rpc_zmq_contexts)
return ZMQ_CTX
def _get_matchmaker():
global matchmaker
if not matchmaker:
# rpc_zmq_matchmaker should be set to a 'module.Class'
mm_path = conf.rpc_zmq_matchmaker.split('.')
mm_path = CONF.rpc_zmq_matchmaker.split('.')
mm_module = '.'.join(mm_path[:-1])
mm_class = mm_path[-1]
@ -737,6 +804,4 @@ def register_opts(conf):
mm_impl = importutils.import_module(mm_module)
mm_constructor = getattr(mm_impl, mm_class)
matchmaker = mm_constructor()
register_opts(cfg.CONF)
return matchmaker

View File

@ -27,17 +27,17 @@ import sys
import time
import eventlet
import extras
import logging as std_logging
from moniker.openstack.common import cfg
from moniker.openstack.common import eventlet_backdoor
from moniker.openstack.common.gettextutils import _
from moniker.openstack.common import importutils
from moniker.openstack.common import log as logging
from moniker.openstack.common import threadgroup
rpc = extras.try_import('moniker.openstack.common.rpc')
rpc = importutils.try_import('moniker.openstack.common.rpc')
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
@ -51,7 +51,7 @@ class Launcher(object):
:returns: None
"""
self._services = threadgroup.ThreadGroup('launcher')
self._services = threadgroup.ThreadGroup()
eventlet_backdoor.initialize_if_enabled()
@staticmethod
@ -310,7 +310,7 @@ class Service(object):
"""Service object for binaries running on hosts."""
def __init__(self, threads=1000):
self.tg = threadgroup.ThreadGroup('service', threads)
self.tg = threadgroup.ThreadGroup(threads)
def start(self):
pass

View File

@ -1,6 +1,7 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# Copyright 2012-2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -19,7 +20,7 @@
Utilities with minimum-depends for use in setup.py
"""
import datetime
import email
import os
import re
import subprocess
@ -33,11 +34,12 @@ def parse_mailmap(mailmap='.mailmap'):
if os.path.exists(mailmap):
with open(mailmap, 'r') as fp:
for l in fp:
l = l.strip()
if not l.startswith('#') and ' ' in l:
canonical_email, alias = [x for x in l.split(' ')
if x.startswith('<')]
mapping[alias] = canonical_email
try:
canonical_email, alias = re.match(
r'[^#]*?(<.+>).*(<.+>).*', l).groups()
except AttributeError:
continue
mapping[alias] = canonical_email
return mapping
@ -45,8 +47,8 @@ def canonicalize_emails(changelog, mapping):
"""Takes in a string and an email alias mapping and replaces all
instances of the aliases in the string with their real email.
"""
for alias, email in mapping.iteritems():
changelog = changelog.replace(alias, email)
for alias, email_address in mapping.iteritems():
changelog = changelog.replace(alias, email_address)
return changelog
@ -106,23 +108,17 @@ def parse_dependency_links(requirements_files=['requirements.txt',
return dependency_links
def write_requirements():
venv = os.environ.get('VIRTUAL_ENV', None)
if venv is not None:
with open("requirements.txt", "w") as req_file:
output = subprocess.Popen(["pip", "-E", venv, "freeze", "-l"],
stdout=subprocess.PIPE)
requirements = output.communicate()[0].strip()
req_file.write(requirements)
def _run_shell_command(cmd):
def _run_shell_command(cmd, throw_on_error=False):
if os.name == 'nt':
output = subprocess.Popen(["cmd.exe", "/C", cmd],
stdout=subprocess.PIPE)
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
else:
output = subprocess.Popen(["/bin/sh", "-c", cmd],
stdout=subprocess.PIPE)
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
if output.returncode and throw_on_error:
raise Exception("%s returned %d" % cmd, output.returncode)
out = output.communicate()
if len(out) == 0:
return None
@ -131,57 +127,6 @@ def _run_shell_command(cmd):
return out[0].strip()
def _get_git_next_version_suffix(branch_name):
datestamp = datetime.datetime.now().strftime('%Y%m%d')
if branch_name == 'milestone-proposed':
revno_prefix = "r"
else:
revno_prefix = ""
_run_shell_command("git fetch origin +refs/meta/*:refs/remotes/meta/*")
milestone_cmd = "git show meta/openstack/release:%s" % branch_name
milestonever = _run_shell_command(milestone_cmd)
if milestonever:
first_half = "%s~%s" % (milestonever, datestamp)
else:
first_half = datestamp
post_version = _get_git_post_version()
# post version should look like:
# 0.1.1.4.gcc9e28a
# where the bit after the last . is the short sha, and the bit between
# the last and second to last is the revno count
(revno, sha) = post_version.split(".")[-2:]
second_half = "%s%s.%s" % (revno_prefix, revno, sha)
return ".".join((first_half, second_half))
def _get_git_current_tag():
return _run_shell_command("git tag --contains HEAD")
def _get_git_tag_info():
return _run_shell_command("git describe --tags")
def _get_git_post_version():
current_tag = _get_git_current_tag()
if current_tag is not None:
return current_tag
else:
tag_info = _get_git_tag_info()
if tag_info is None:
base_version = "0.0"
cmd = "git --no-pager log --oneline"
out = _run_shell_command(cmd)
revno = len(out.split("\n"))
sha = _run_shell_command("git describe --always")
else:
tag_infos = tag_info.split("-")
base_version = "-".join(tag_infos[:-2])
(revno, sha) = tag_infos[-2:]
return "%s.%s.%s" % (base_version, revno, sha)
def write_git_changelog():
"""Write a changelog based on the git changelog."""
new_changelog = 'ChangeLog'
@ -227,26 +172,6 @@ _rst_template = """%(heading)s
"""
def read_versioninfo(project):
"""Read the versioninfo file. If it doesn't exist, we're in a github
zipball, and there's really no way to know what version we really
are, but that should be ok, because the utility of that should be
just about nil if this code path is in use in the first place."""
versioninfo_path = os.path.join(project, 'versioninfo')
if os.path.exists(versioninfo_path):
with open(versioninfo_path, 'r') as vinfo:
version = vinfo.read().strip()
else:
version = "0.0.0"
return version
def write_versioninfo(project, version):
"""Write a simple file containing the version of the package."""
with open(os.path.join(project, 'versioninfo'), 'w') as fil:
fil.write("%s\n" % version)
def get_cmdclass():
"""Return dict of commands to run from setup.py."""
@ -333,42 +258,78 @@ def get_cmdclass():
return cmdclass
def get_git_branchname():
for branch in _run_shell_command("git branch --color=never").split("\n"):
if branch.startswith('*'):
_branch_name = branch.split()[1].strip()
if _branch_name == "(no":
_branch_name = "no-branch"
return _branch_name
def _get_revno():
"""Return the number of commits since the most recent tag.
We use git-describe to find this out, but if there are no
tags then we fall back to counting commits since the beginning
of time.
"""
describe = _run_shell_command("git describe --always")
if "-" in describe:
return describe.rsplit("-", 2)[-2]
# no tags found
revlist = _run_shell_command("git rev-list --abbrev-commit HEAD")
return len(revlist.splitlines())
def get_pre_version(projectname, base_version):
"""Return a version which is leading up to a version that will
be released in the future."""
if os.path.isdir('.git'):
current_tag = _get_git_current_tag()
if current_tag is not None:
version = current_tag
else:
branch_name = os.getenv('BRANCHNAME',
os.getenv('GERRIT_REFNAME',
get_git_branchname()))
version_suffix = _get_git_next_version_suffix(branch_name)
version = "%s~%s" % (base_version, version_suffix)
write_versioninfo(projectname, version)
return version
else:
version = read_versioninfo(projectname)
return version
def get_post_version(projectname):
def get_version_from_git(pre_version):
"""Return a version which is equal to the tag that's on the current
revision if there is one, or tag plus number of additional revisions
if the current revision has no tag."""
if os.path.isdir('.git'):
version = _get_git_post_version()
write_versioninfo(projectname, version)
if pre_version:
try:
return _run_shell_command(
"git describe --exact-match",
throw_on_error=True).replace('-', '.')
except Exception:
sha = _run_shell_command("git log -n1 --pretty=format:%h")
return "%s.a%s.g%s" % (pre_version, _get_revno(), sha)
else:
return _run_shell_command(
"git describe --always").replace('-', '.')
return None
def get_version_from_pkg_info(package_name):
"""Get the version from PKG-INFO file if we can."""
try:
pkg_info_file = open('PKG-INFO', 'r')
except (IOError, OSError):
return None
try:
pkg_info = email.message_from_file(pkg_info_file)
except email.MessageError:
return None
# Check to make sure we're in our own dir
if pkg_info.get('Name', None) != package_name:
return None
return pkg_info.get('Version', None)
def get_version(package_name, pre_version=None):
"""Get the version of the project. First, try getting it from PKG-INFO, if
it exists. If it does, that means we're in a distribution tarball or that
install has happened. Otherwise, if there is no PKG-INFO file, pull the
version from git.
We do not support setup.py version sanity in git archive tarballs, nor do
we support packagers directly sucking our git repo into theirs. We expect
that a source tarball be made from our git repo - or that if someone wants
to make a source tarball from a fork of our repo with additional tags in it
that they understand and desire the results of doing that.
"""
version = os.environ.get("OSLO_PACKAGE_VERSION", None)
if version:
return version
return read_versioninfo(projectname)
version = get_version_from_pkg_info(package_name)
if version:
return version
version = get_version_from_git(pre_version)
if version:
return version
raise Exception("Versioning for this project requires either an sdist"
" tarball, or access to an upstream git repository.")

View File

@ -38,8 +38,7 @@ class Thread(object):
:class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when
it has done so it can be removed from the threads list.
"""
def __init__(self, name, thread, group):
self.name = name
def __init__(self, thread, group):
self.thread = thread
self.thread.link(_thread_done, group=group, thread=self)
@ -57,8 +56,7 @@ class ThreadGroup(object):
when need be).
* provide an easy API to add timers.
"""
def __init__(self, name, thread_pool_size=10):
self.name = name
def __init__(self, thread_pool_size=10):
self.pool = greenpool.GreenPool(thread_pool_size)
self.threads = []
self.timers = []
@ -72,7 +70,7 @@ class ThreadGroup(object):
def add_thread(self, callback, *args, **kwargs):
gt = self.pool.spawn(callback, *args, **kwargs)
th = Thread(callback.__name__, gt, self)
th = Thread(gt, self)
self.threads.append(th)
def thread_done(self, thread):

View File

@ -98,6 +98,11 @@ def utcnow():
return datetime.datetime.utcnow()
def iso8601_from_timestamp(timestamp):
"""Returns a iso8601 formated date from timestamp"""
return isotime(datetime.datetime.utcfromtimestamp(timestamp))
utcnow.override_time = None
@ -162,3 +167,16 @@ def delta_seconds(before, after):
except AttributeError:
return ((delta.days * 24 * 3600) + delta.seconds +
float(delta.microseconds) / (10 ** 6))
def is_soon(dt, window):
"""
Determines if time is going to happen in the next window seconds.
:params dt: the time
:params window: minimum seconds to remain to consider the time not soon
:return: True if expiration is within the given duration
"""
soon = (utcnow() + datetime.timedelta(seconds=window))
return normalize_time(dt) < soon

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC
# Copyright 2012-2013 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -15,116 +15,56 @@
# under the License.
"""
Utilities for consuming the auto-generated versioninfo files.
Utilities for consuming the version from pkg_resources.
"""
import datetime
import pkg_resources
import setup
class VersionInfo(object):
def __init__(self, package, python_package=None, pre_version=None):
def __init__(self, package):
"""Object that understands versioning for a package
:param package: name of the top level python namespace. For glance,
this would be "glance" for python-glanceclient, it
would be "glanceclient"
:param python_package: optional name of the project name. For
glance this can be left unset. For
python-glanceclient, this would be
"python-glanceclient"
:param pre_version: optional version that the project is working to
:param package: name of the python package, such as glance, or
python-glanceclient
"""
self.package = package
if python_package is None:
self.python_package = package
else:
self.python_package = python_package
self.pre_version = pre_version
self.release = None
self.version = None
self._cached_version = None
def _generate_version(self):
"""Defer to the openstack.common.setup routines for making a
version from git."""
if self.pre_version is None:
return setup.get_post_version(self.package)
else:
return setup.get_pre_version(self.package, self.pre_version)
def _get_version_from_pkg_resources(self):
"""Get the version of the package from the pkg_resources record
associated with the package."""
requirement = pkg_resources.Requirement.parse(self.package)
provider = pkg_resources.get_provider(requirement)
return provider.version
def _newer_version(self, pending_version):
"""Check to see if we're working with a stale version or not.
We expect a version string that either looks like:
2012.2~f3~20120708.10.4426392
which is an unreleased version of a pre-version, or:
0.1.1.4.gcc9e28a
which is an unreleased version of a post-version, or:
0.1.1
Which is a release and which should match tag.
For now, if we have a date-embedded version, check to see if it's
old, and if so re-generate. Otherwise, just deal with it.
"""
try:
version_date = int(self.version.split("~")[-1].split('.')[0])
if version_date < int(datetime.date.today().strftime('%Y%m%d')):
return self._generate_version()
else:
return pending_version
except Exception:
return pending_version
def version_string_with_vcs(self, always=False):
def release_string(self):
"""Return the full version of the package including suffixes indicating
VCS status.
For instance, if we are working towards the 2012.2 release,
canonical_version_string should return 2012.2 if this is a final
release, or else something like 2012.2~f1~20120705.20 if it's not.
:param always: if true, skip all version caching
"""
if always:
self.version = self._generate_version()
if self.release is None:
self.release = self._get_version_from_pkg_resources()
return self.release
def version_string(self):
"""Return the short version minus any alpha/beta tags."""
if self.version is None:
requirement = pkg_resources.Requirement.parse(self.python_package)
versioninfo = "%s/versioninfo" % self.package
try:
raw_version = pkg_resources.resource_string(requirement,
versioninfo)
self.version = self._newer_version(raw_version.strip())
except (IOError, pkg_resources.DistributionNotFound):
self.version = self._generate_version()
parts = []
for part in self.release_string().split('.'):
if part[0].isdigit():
parts.append(part)
else:
break
self.version = ".".join(parts)
return self.version
def canonical_version_string(self, always=False):
"""Return the simple version of the package excluding any suffixes.
For instance, if we are working towards the 2012.2 release,
canonical_version_string should return 2012.2 in all cases.
:param always: if true, skip all version caching
"""
return self.version_string_with_vcs(always).split('~')[0]
def version_string(self, always=False):
"""Return the base version of the package.
For instance, if we are working towards the 2012.2 release,
version_string should return 2012.2 if this is a final release, or
2012.2-dev if it is not.
:param always: if true, skip all version caching
"""
version_parts = self.version_string_with_vcs(always).split('~')
if len(version_parts) == 1:
return version_parts[0]
else:
return '%s-dev' % (version_parts[0],)
# Compatibility functions
canonical_version_string = version_string
version_string_with_vcs = release_string
def cached_version_string(self, prefix=""):
"""Generate an object which will expand in a string context to

View File

@ -257,7 +257,7 @@ class Request(webob.Request):
Does not do any body introspection, only checks header
"""
if not "Content-Type" in self.headers:
if "Content-Type" not in self.headers:
return None
content_type = self.content_type

View File

@ -1,18 +0,0 @@
# Copyright 2012 Managed I.T.
#
# Author: Kiall Mac Innes <kiall@managedit.ie>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from moniker.openstack.common import version as common_version
version_info = common_version.VersionInfo('moniker')

View File

@ -17,7 +17,6 @@
from setuptools import setup, find_packages
import textwrap
from moniker.openstack.common import setup as common_setup
from moniker.version import version_info as version
install_requires = common_setup.parse_requirements(['tools/pip-requires'])
install_options = common_setup.parse_requirements(['tools/pip-options'])
@ -32,7 +31,7 @@ dependency_links = common_setup.parse_dependency_links([
setup(
name='moniker',
version=version.canonical_version_string(always=True),
version=common_setup.get_version('moniker'),
description='DNS as a Service',
author='Kiall Mac Innes',
author_email='kiall@managedit.ie',

View File

@ -1 +0,0 @@
setuptools-git>=0.4