diff --git a/moniker/openstack/common/cfg.py b/moniker/openstack/common/cfg.py index a35d22c08..3e47c8874 100644 --- a/moniker/openstack/common/cfg.py +++ b/moniker/openstack/common/cfg.py @@ -217,7 +217,7 @@ log files:: ... ] -This module also contains a global instance of the CommonConfigOpts class +This module also contains a global instance of the ConfigOpts class in order to support a common usage pattern in OpenStack:: from moniker.openstack.common import cfg @@ -236,10 +236,11 @@ in order to support a common usage pattern in OpenStack:: Positional command line arguments are supported via a 'positional' Opt constructor argument:: - >>> CONF.register_cli_opt(MultiStrOpt('bar', positional=True)) + >>> conf = ConfigOpts() + >>> conf.register_cli_opt(MultiStrOpt('bar', positional=True)) True - >>> CONF(['a', 'b']) - >>> CONF.bar + >>> conf(['a', 'b']) + >>> conf.bar ['a', 'b'] It is also possible to use argparse "sub-parsers" to parse additional @@ -249,10 +250,11 @@ command line arguments using the SubCommandOpt class: ... list_action = subparsers.add_parser('list') ... list_action.add_argument('id') ... - >>> CONF.register_cli_opt(SubCommandOpt('action', handler=add_parsers)) + >>> conf = ConfigOpts() + >>> conf.register_cli_opt(SubCommandOpt('action', handler=add_parsers)) True - >>> CONF(['list', '10']) - >>> CONF.action.name, CONF.action.id + >>> conf(args=['list', '10']) + >>> conf.action.name, conf.action.id ('list', '10') """ @@ -861,7 +863,7 @@ class SubCommandOpt(Opt): description=self.description, help=self.help) - if not self.handler is None: + if self.handler is not None: self.handler(subparsers) @@ -1545,8 +1547,8 @@ class ConfigOpts(collections.Mapping): group = group_or_name if isinstance(group_or_name, OptGroup) else None group_name = group.name if group else group_or_name - if not group_name in self._groups: - if not group is None or not autocreate: + if group_name not in self._groups: + if group is not None or not autocreate: raise NoSuchGroupError(group_name) self.register_group(OptGroup(name=group_name)) @@ -1566,7 +1568,7 @@ class ConfigOpts(collections.Mapping): group = self._get_group(group) opts = group._opts - if not opt_name in opts: + if opt_name not in opts: raise NoSuchOptError(opt_name, group) return opts[opt_name] @@ -1604,7 +1606,7 @@ class ConfigOpts(collections.Mapping): opt = info['opt'] if opt.required: - if ('default' in info or 'override' in info): + if 'default' in info or 'override' in info: continue if self._get(opt.dest, group) is None: @@ -1726,62 +1728,4 @@ class ConfigOpts(collections.Mapping): return value -class CommonConfigOpts(ConfigOpts): - - DEFAULT_LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s" - DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S" - - common_cli_opts = [ - BoolOpt('debug', - short='d', - default=False, - help='Print debugging output'), - BoolOpt('verbose', - short='v', - default=False, - help='Print more verbose output'), - ] - - logging_cli_opts = [ - StrOpt('log-config', - metavar='PATH', - help='If this option is specified, the logging configuration ' - 'file specified is used and overrides any other logging ' - 'options specified. Please see the Python logging module ' - 'documentation for details on logging configuration ' - 'files.'), - StrOpt('log-format', - default=DEFAULT_LOG_FORMAT, - metavar='FORMAT', - help='A logging.Formatter log message format string which may ' - 'use any of the available logging.LogRecord attributes. 
' - 'Default: %(default)s'), - StrOpt('log-date-format', - default=DEFAULT_LOG_DATE_FORMAT, - metavar='DATE_FORMAT', - help='Format string for %%(asctime)s in log records. ' - 'Default: %(default)s'), - StrOpt('log-file', - metavar='PATH', - deprecated_name='logfile', - help='(Optional) Name of log file to output to. ' - 'If not set, logging will go to stdout.'), - StrOpt('log-dir', - deprecated_name='logdir', - help='(Optional) The directory to keep log files in ' - '(will be prepended to --log-file)'), - BoolOpt('use-syslog', - default=False, - help='Use syslog for logging.'), - StrOpt('syslog-log-facility', - default='LOG_USER', - help='syslog facility to receive log lines') - ] - - def __init__(self): - super(CommonConfigOpts, self).__init__() - self.register_cli_opts(self.common_cli_opts) - self.register_cli_opts(self.logging_cli_opts) - - -CONF = CommonConfigOpts() +CONF = ConfigOpts() diff --git a/moniker/openstack/common/importutils.py b/moniker/openstack/common/importutils.py index 2a28b455e..9dec764fb 100644 --- a/moniker/openstack/common/importutils.py +++ b/moniker/openstack/common/importutils.py @@ -57,3 +57,11 @@ def import_module(import_str): """Import a module.""" __import__(import_str) return sys.modules[import_str] + + +def try_import(import_str, default=None): + """Try to import a module and if it fails return default.""" + try: + return import_module(import_str) + except ImportError: + return default diff --git a/moniker/openstack/common/iniparser.py b/moniker/openstack/common/iniparser.py index 241284449..9bf399f0c 100644 --- a/moniker/openstack/common/iniparser.py +++ b/moniker/openstack/common/iniparser.py @@ -54,7 +54,7 @@ class BaseParser(object): value = value.strip() if ((value and value[0] == value[-1]) and - (value[0] == "\"" or value[0] == "'")): + (value[0] == "\"" or value[0] == "'")): value = value[1:-1] return key.strip(), [value] diff --git a/moniker/openstack/common/log.py b/moniker/openstack/common/log.py index b6c559c8a..3c2466833 100644 --- a/moniker/openstack/common/log.py +++ b/moniker/openstack/common/log.py @@ -47,21 +47,82 @@ from moniker.openstack.common import local from moniker.openstack.common import notifier +_DEFAULT_LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s" +_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S" + +common_cli_opts = [ + cfg.BoolOpt('debug', + short='d', + default=False, + help='Print debugging output (set logging level to ' + 'DEBUG instead of default WARNING level).'), + cfg.BoolOpt('verbose', + short='v', + default=False, + help='Print more verbose output (set logging level to ' + 'INFO instead of default WARNING level).'), +] + +logging_cli_opts = [ + cfg.StrOpt('log-config', + metavar='PATH', + help='If this option is specified, the logging configuration ' + 'file specified is used and overrides any other logging ' + 'options specified. Please see the Python logging module ' + 'documentation for details on logging configuration ' + 'files.'), + cfg.StrOpt('log-format', + default=_DEFAULT_LOG_FORMAT, + metavar='FORMAT', + help='A logging.Formatter log message format string which may ' + 'use any of the available logging.LogRecord attributes. ' + 'Default: %(default)s'), + cfg.StrOpt('log-date-format', + default=_DEFAULT_LOG_DATE_FORMAT, + metavar='DATE_FORMAT', + help='Format string for %%(asctime)s in log records. ' + 'Default: %(default)s'), + cfg.StrOpt('log-file', + metavar='PATH', + deprecated_name='logfile', + help='(Optional) Name of log file to output to. 
' + 'If not set, logging will go to stdout.'), + cfg.StrOpt('log-dir', + deprecated_name='logdir', + help='(Optional) The directory to keep log files in ' + '(will be prepended to --log-file)'), + cfg.BoolOpt('use-syslog', + default=False, + help='Use syslog for logging.'), + cfg.StrOpt('syslog-log-facility', + default='LOG_USER', + help='syslog facility to receive log lines') +] + +generic_log_opts = [ + cfg.BoolOpt('use_stderr', + default=True, + help='Log output to standard error'), + cfg.StrOpt('logfile_mode', + default='0644', + help='Default file mode used when creating log files'), +] + log_opts = [ cfg.StrOpt('logging_context_format_string', - default='%(asctime)s.%(msecs)d %(levelname)s %(name)s ' + default='%(asctime)s.%(msecs)03d %(levelname)s %(name)s ' '[%(request_id)s %(user)s %(tenant)s] %(instance)s' '%(message)s', help='format string to use for log messages with context'), cfg.StrOpt('logging_default_format_string', - default='%(asctime)s.%(msecs)d %(process)d %(levelname)s ' + default='%(asctime)s.%(msecs)03d %(process)d %(levelname)s ' '%(name)s [-] %(instance)s%(message)s', help='format string to use for log messages without context'), cfg.StrOpt('logging_debug_format_suffix', default='%(funcName)s %(pathname)s:%(lineno)d', help='data to append to log format when level is DEBUG'), cfg.StrOpt('logging_exception_prefix', - default='%(asctime)s.%(msecs)d %(process)d TRACE %(name)s ' + default='%(asctime)s.%(msecs)03d %(process)d TRACE %(name)s ' '%(instance)s', help='prefix each line of exception output with this format'), cfg.ListOpt('default_log_levels', @@ -94,24 +155,9 @@ log_opts = [ 'format it like this'), ] - -generic_log_opts = [ - cfg.StrOpt('logdir', - default=None, - help='Log output to a per-service log file in named directory'), - cfg.StrOpt('logfile', - default=None, - help='Log output to a named file'), - cfg.BoolOpt('use_stderr', - default=True, - help='Log output to standard error'), - cfg.StrOpt('logfile_mode', - default='0644', - help='Default file mode used when creating log files'), -] - - CONF = cfg.CONF +CONF.register_cli_opts(common_cli_opts) +CONF.register_cli_opts(logging_cli_opts) CONF.register_opts(generic_log_opts) CONF.register_opts(log_opts) @@ -149,8 +195,8 @@ def _get_binary_name(): def _get_log_file_path(binary=None): - logfile = CONF.log_file or CONF.logfile - logdir = CONF.log_dir or CONF.logdir + logfile = CONF.log_file + logdir = CONF.log_dir if logfile and not logdir: return logfile @@ -259,7 +305,7 @@ class JSONFormatter(logging.Formatter): class PublishErrorsHandler(logging.Handler): def emit(self, record): if ('moniker.openstack.common.notifier.log_notifier' in - CONF.notification_driver): + CONF.notification_driver): return notifier.api.notify(None, 'error.publisher', 'error_notification', @@ -361,10 +407,12 @@ def _setup_logging_from_conf(product_name): datefmt=datefmt)) handler.setFormatter(LegacyFormatter(datefmt=datefmt)) - if CONF.verbose or CONF.debug: + if CONF.debug: log_root.setLevel(logging.DEBUG) - else: + elif CONF.verbose: log_root.setLevel(logging.INFO) + else: + log_root.setLevel(logging.WARNING) level = logging.NOTSET for pair in CONF.default_log_levels: @@ -425,7 +473,7 @@ class LegacyFormatter(logging.Formatter): self._fmt = CONF.logging_default_format_string if (record.levelno == logging.DEBUG and - CONF.logging_debug_format_suffix): + CONF.logging_debug_format_suffix): self._fmt += " " + CONF.logging_debug_format_suffix # Cache this on the record, Logger will respect our formated copy diff --git 
a/moniker/openstack/common/policy.py b/moniker/openstack/common/policy.py index d8791496b..dd3388882 100644 --- a/moniker/openstack/common/policy.py +++ b/moniker/openstack/common/policy.py @@ -574,19 +574,19 @@ class ParseState(object): for reduction, methname in self.reducers: if (len(self.tokens) >= len(reduction) and - self.tokens[-len(reduction):] == reduction): - # Get the reduction method - meth = getattr(self, methname) + self.tokens[-len(reduction):] == reduction): + # Get the reduction method + meth = getattr(self, methname) - # Reduce the token stream - results = meth(*self.values[-len(reduction):]) + # Reduce the token stream + results = meth(*self.values[-len(reduction):]) - # Update the tokens and values - self.tokens[-len(reduction):] = [r[0] for r in results] - self.values[-len(reduction):] = [r[1] for r in results] + # Update the tokens and values + self.tokens[-len(reduction):] = [r[0] for r in results] + self.values[-len(reduction):] = [r[1] for r in results] - # Check for any more reductions - return self.reduce() + # Check for any more reductions + return self.reduce() def shift(self, tok, value): """Adds one more token to the state. Calls reduce().""" diff --git a/moniker/openstack/common/rpc/amqp.py b/moniker/openstack/common/rpc/amqp.py index 6de6fa368..2f1b19102 100644 --- a/moniker/openstack/common/rpc/amqp.py +++ b/moniker/openstack/common/rpc/amqp.py @@ -371,7 +371,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool): conn = ConnectionContext(conf, connection_pool) wait_msg = MulticallWaiter(conf, conn, timeout) conn.declare_direct_consumer(msg_id, wait_msg) - conn.topic_send(topic, rpc_common.serialize_msg(msg)) + conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout) return wait_msg diff --git a/moniker/openstack/common/rpc/impl_fake.py b/moniker/openstack/common/rpc/impl_fake.py index 0a90fd049..cdcb832a5 100644 --- a/moniker/openstack/common/rpc/impl_fake.py +++ b/moniker/openstack/common/rpc/impl_fake.py @@ -170,7 +170,7 @@ def cast(conf, context, topic, msg): pass -def notify(conf, context, topic, msg): +def notify(conf, context, topic, msg, envelope): check_serialize(msg) diff --git a/moniker/openstack/common/rpc/impl_kombu.py b/moniker/openstack/common/rpc/impl_kombu.py index cad919c35..5056efb2f 100644 --- a/moniker/openstack/common/rpc/impl_kombu.py +++ b/moniker/openstack/common/rpc/impl_kombu.py @@ -302,9 +302,15 @@ class Publisher(object): channel=channel, routing_key=self.routing_key) - def send(self, msg): + def send(self, msg, timeout=None): """Send a message""" - self.producer.publish(msg) + if timeout: + # + # AMQP TTL is in milliseconds when set in the header. 
+ # + self.producer.publish(msg, headers={'ttl': (timeout * 1000)}) + else: + self.producer.publish(msg) class DirectPublisher(Publisher): @@ -653,7 +659,7 @@ class Connection(object): for proxy_cb in self.proxy_callbacks: proxy_cb.wait() - def publisher_send(self, cls, topic, msg, **kwargs): + def publisher_send(self, cls, topic, msg, timeout=None, **kwargs): """Send to a publisher based on the publisher class""" def _error_callback(exc): @@ -663,7 +669,7 @@ class Connection(object): def _publish(): publisher = cls(self.conf, self.channel, topic, **kwargs) - publisher.send(msg) + publisher.send(msg, timeout) self.ensure(_error_callback, _publish) @@ -691,9 +697,9 @@ class Connection(object): """Send a 'direct' message""" self.publisher_send(DirectPublisher, msg_id, msg) - def topic_send(self, topic, msg): + def topic_send(self, topic, msg, timeout=None): """Send a 'topic' message""" - self.publisher_send(TopicPublisher, topic, msg) + self.publisher_send(TopicPublisher, topic, msg, timeout) def fanout_send(self, topic, msg): """Send a 'fanout' message""" @@ -701,7 +707,7 @@ class Connection(object): def notify_send(self, topic, msg, **kwargs): """Send a notify message on a topic""" - self.publisher_send(NotifyPublisher, topic, msg, **kwargs) + self.publisher_send(NotifyPublisher, topic, msg, None, **kwargs) def consume(self, limit=None): """Consume from all queues/consumers""" diff --git a/moniker/openstack/common/rpc/impl_qpid.py b/moniker/openstack/common/rpc/impl_qpid.py index d74b1c34b..84dfa73e5 100644 --- a/moniker/openstack/common/rpc/impl_qpid.py +++ b/moniker/openstack/common/rpc/impl_qpid.py @@ -22,16 +22,18 @@ import uuid import eventlet import greenlet -import qpid.messaging -import qpid.messaging.exceptions from moniker.openstack.common import cfg from moniker.openstack.common.gettextutils import _ +from moniker.openstack.common import importutils from moniker.openstack.common import jsonutils from moniker.openstack.common import log as logging from moniker.openstack.common.rpc import amqp as rpc_amqp from moniker.openstack.common.rpc import common as rpc_common +qpid_messaging = importutils.try_import("qpid.messaging") +qpid_exceptions = importutils.try_import("qpid.messaging.exceptions") + LOG = logging.getLogger(__name__) qpid_opts = [ @@ -275,6 +277,9 @@ class Connection(object): pool = None def __init__(self, conf, server_params=None): + if not qpid_messaging: + raise ImportError("Failed to import qpid.messaging") + self.session = None self.consumers = {} self.consumer_thread = None @@ -303,7 +308,7 @@ class Connection(object): def connection_create(self, broker): # Create the connection - this does not open the connection - self.connection = qpid.messaging.Connection(broker) + self.connection = qpid_messaging.Connection(broker) # Check if flags are set and if so set them for the connection # before we call open @@ -328,7 +333,7 @@ class Connection(object): if self.connection.opened(): try: self.connection.close() - except qpid.messaging.exceptions.ConnectionError: + except qpid_exceptions.ConnectionError: pass attempt = 0 @@ -340,7 +345,7 @@ class Connection(object): try: self.connection_create(broker) self.connection.open() - except qpid.messaging.exceptions.ConnectionError, e: + except qpid_exceptions.ConnectionError, e: msg_dict = dict(e=e, delay=delay) msg = _("Unable to connect to AMQP server: %(e)s. 
" "Sleeping %(delay)s seconds") % msg_dict @@ -367,8 +372,8 @@ class Connection(object): while True: try: return method(*args, **kwargs) - except (qpid.messaging.exceptions.Empty, - qpid.messaging.exceptions.ConnectionError), e: + except (qpid_exceptions.Empty, + qpid_exceptions.ConnectionError), e: if error_callback: error_callback(e) self.reconnect() @@ -408,7 +413,7 @@ class Connection(object): """Return an iterator that will consume from all queues/consumers""" def _error_callback(exc): - if isinstance(exc, qpid.messaging.exceptions.Empty): + if isinstance(exc, qpid_exceptions.Empty): LOG.exception(_('Timed out waiting for RPC response: %s') % str(exc)) raise rpc_common.Timeout() @@ -481,9 +486,20 @@ class Connection(object): """Send a 'direct' message""" self.publisher_send(DirectPublisher, msg_id, msg) - def topic_send(self, topic, msg): + def topic_send(self, topic, msg, timeout=None): """Send a 'topic' message""" - self.publisher_send(TopicPublisher, topic, msg) + # + # We want to create a message with attributes, e.g. a TTL. We + # don't really need to keep 'msg' in its JSON format any longer + # so let's create an actual qpid message here and get some + # value-add on the go. + # + # WARNING: Request timeout happens to be in the same units as + # qpid's TTL (seconds). If this changes in the future, then this + # will need to be altered accordingly. + # + qpid_message = qpid_messaging.Message(content=msg, ttl=timeout) + self.publisher_send(TopicPublisher, topic, qpid_message) def fanout_send(self, topic, msg): """Send a 'fanout' message""" diff --git a/moniker/openstack/common/rpc/impl_zmq.py b/moniker/openstack/common/rpc/impl_zmq.py index 853803d99..59859d2ab 100644 --- a/moniker/openstack/common/rpc/impl_zmq.py +++ b/moniker/openstack/common/rpc/impl_zmq.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. +import os import pprint import socket import string @@ -22,15 +23,16 @@ import types import uuid import eventlet -from eventlet.green import zmq import greenlet from moniker.openstack.common import cfg from moniker.openstack.common.gettextutils import _ from moniker.openstack.common import importutils from moniker.openstack.common import jsonutils +from moniker.openstack.common import processutils as utils from moniker.openstack.common.rpc import common as rpc_common +zmq = importutils.try_import('eventlet.green.zmq') # for convenience, are not modified. pformat = pprint.pformat @@ -61,6 +63,10 @@ zmq_opts = [ cfg.IntOpt('rpc_zmq_contexts', default=1, help='Number of ZeroMQ contexts, defaults to 1'), + cfg.IntOpt('rpc_zmq_topic_backlog', default=None, + help='Maximum number of ingress messages to locally buffer ' + 'per topic. Default is unlimited.'), + cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack', help='Directory for holding IPC sockets'), @@ -70,9 +76,9 @@ zmq_opts = [ ] -# These globals are defined in register_opts(conf), -# a mandatory initialization call -CONF = None +CONF = cfg.CONF +CONF.register_opts(zmq_opts) + ZMQ_CTX = None # ZeroMQ Context, must be global. 
matchmaker = None # memoized matchmaker object @@ -107,7 +113,7 @@ class ZmqSocket(object): """ def __init__(self, addr, zmq_type, bind=True, subscribe=None): - self.sock = ZMQ_CTX.socket(zmq_type) + self.sock = _get_ctxt().socket(zmq_type) self.addr = addr self.type = zmq_type self.subscriptions = [] @@ -181,11 +187,15 @@ class ZmqSocket(object): pass self.subscriptions = [] - # Linger -1 prevents lost/dropped messages try: - self.sock.close(linger=-1) + # Default is to linger + self.sock.close() except Exception: - pass + # While this is a bad thing to happen, + # it would be much worse if some of the code calling this + # were to fail. For now, lets log, and later evaluate + # if we can safely raise here. + LOG.error("ZeroMQ socket could not be closed.") self.sock = None def recv(self): @@ -202,7 +212,9 @@ class ZmqSocket(object): class ZmqClient(object): """Client for ZMQ sockets.""" - def __init__(self, addr, socket_type=zmq.PUSH, bind=False): + def __init__(self, addr, socket_type=None, bind=False): + if socket_type is None: + socket_type = zmq.PUSH self.outq = ZmqSocket(addr, socket_type, bind=bind) def cast(self, msg_id, topic, data, serialize=True, force_envelope=False): @@ -420,12 +432,6 @@ class ZmqProxy(ZmqBaseReactor): super(ZmqProxy, self).__init__(conf) self.topic_proxy = {} - ipc_dir = CONF.rpc_zmq_ipc_dir - - self.topic_proxy['zmq_replies'] = \ - ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ), - zmq.PUB, bind=True) - self.sockets.append(self.topic_proxy['zmq_replies']) def consume(self, sock): ipc_dir = CONF.rpc_zmq_ipc_dir @@ -450,21 +456,82 @@ class ZmqProxy(ZmqBaseReactor): else: sock_type = zmq.PUSH - if not topic in self.topic_proxy: - outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic), - sock_type, bind=True) - self.topic_proxy[topic] = outq - self.sockets.append(outq) - LOG.info(_("Created topic proxy: %s"), topic) + if topic not in self.topic_proxy: + def publisher(waiter): + LOG.info(_("Creating proxy for topic: %s"), topic) - # It takes some time for a pub socket to open, - # before we can have any faith in doing a send() to it. - if sock_type == zmq.PUB: - eventlet.sleep(.5) + try: + out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" % + (ipc_dir, topic), + sock_type, bind=True) + except RPCException: + waiter.send_exception(*sys.exc_info()) + return - LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data}) - self.topic_proxy[topic].send(data) - LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data}) + self.topic_proxy[topic] = eventlet.queue.LightQueue( + CONF.rpc_zmq_topic_backlog) + self.sockets.append(out_sock) + + # It takes some time for a pub socket to open, + # before we can have any faith in doing a send() to it. + if sock_type == zmq.PUB: + eventlet.sleep(.5) + + waiter.send(True) + + while(True): + data = self.topic_proxy[topic].get() + out_sock.send(data) + LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % + {'data': data}) + + wait_sock_creation = eventlet.event.Event() + eventlet.spawn(publisher, wait_sock_creation) + + try: + wait_sock_creation.wait() + except RPCException: + LOG.error(_("Topic socket file creation failed.")) + return + + try: + self.topic_proxy[topic].put_nowait(data) + LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") % + {'data': data}) + except eventlet.queue.Full: + LOG.error(_("Local per-topic backlog buffer full for topic " + "%(topic)s. 
Dropping message.") % {'topic': topic}) + + def consume_in_thread(self): + """Runs the ZmqProxy service""" + ipc_dir = CONF.rpc_zmq_ipc_dir + consume_in = "tcp://%s:%s" % \ + (CONF.rpc_zmq_bind_address, + CONF.rpc_zmq_port) + consumption_proxy = InternalContext(None) + + if not os.path.isdir(ipc_dir): + try: + utils.execute('mkdir', '-p', ipc_dir, run_as_root=True) + utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()), + ipc_dir, run_as_root=True) + utils.execute('chmod', '750', ipc_dir, run_as_root=True) + except utils.ProcessExecutionError: + LOG.error(_("Could not create IPC directory %s") % + (ipc_dir, )) + raise + + try: + self.register(consumption_proxy, + consume_in, + zmq.PULL, + out_bind=True) + except zmq.ZMQError: + LOG.error(_("Could not create ZeroMQ receiver daemon. " + "Socket may already be in use.")) + raise + + super(ZmqProxy, self).consume_in_thread() class ZmqReactor(ZmqBaseReactor): @@ -561,7 +628,8 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True, conn.close() -def _call(addr, context, msg_id, topic, msg, timeout=None): +def _call(addr, context, msg_id, topic, msg, timeout=None, + serialize=True, force_envelope=False): # timeout_response is how long we wait for a response timeout = timeout or CONF.rpc_response_timeout @@ -596,7 +664,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None): ) LOG.debug(_("Sending cast")) - _cast(addr, context, msg_id, topic, payload) + _cast(addr, context, msg_id, topic, payload, + serialize=serialize, force_envelope=force_envelope) LOG.debug(_("Cast sent; Waiting reply")) # Blocks until receives reply @@ -632,7 +701,7 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True, conf = CONF LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))}) - queues = matchmaker.queues(topic) + queues = _get_matchmaker().queues(topic) LOG.debug(_("Sending message(s) to: %s"), queues) # Don't stack if we have no matchmaker results @@ -652,7 +721,8 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True, _topic, _topic, msg, timeout, serialize, force_envelope) return - return method(_addr, context, _topic, _topic, msg, timeout) + return method(_addr, context, _topic, _topic, msg, timeout, + serialize, force_envelope) def create_connection(conf, new=True): @@ -699,32 +769,29 @@ def notify(conf, context, topic, msg, **kwargs): def cleanup(): """Clean up resources in use by implementation.""" global ZMQ_CTX - global matchmaker - matchmaker = None - ZMQ_CTX.term() + if ZMQ_CTX: + ZMQ_CTX.term() ZMQ_CTX = None - -def register_opts(conf): - """Registration of options for this driver.""" - #NOTE(ewindisch): ZMQ_CTX and matchmaker - # are initialized here as this is as good - # an initialization method as any. - - # We memoize through these globals - global ZMQ_CTX global matchmaker - global CONF + matchmaker = None - if not CONF: - conf.register_opts(zmq_opts) - CONF = conf - # Don't re-set, if this method is called twice. 
+ +def _get_ctxt(): + if not zmq: + raise ImportError("Failed to import eventlet.green.zmq") + + global ZMQ_CTX if not ZMQ_CTX: - ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts) + ZMQ_CTX = zmq.Context(CONF.rpc_zmq_contexts) + return ZMQ_CTX + + +def _get_matchmaker(): + global matchmaker if not matchmaker: # rpc_zmq_matchmaker should be set to a 'module.Class' - mm_path = conf.rpc_zmq_matchmaker.split('.') + mm_path = CONF.rpc_zmq_matchmaker.split('.') mm_module = '.'.join(mm_path[:-1]) mm_class = mm_path[-1] @@ -737,6 +804,4 @@ def register_opts(conf): mm_impl = importutils.import_module(mm_module) mm_constructor = getattr(mm_impl, mm_class) matchmaker = mm_constructor() - - -register_opts(cfg.CONF) + return matchmaker diff --git a/moniker/openstack/common/service.py b/moniker/openstack/common/service.py index b695c33a2..bd711df43 100644 --- a/moniker/openstack/common/service.py +++ b/moniker/openstack/common/service.py @@ -27,17 +27,17 @@ import sys import time import eventlet -import extras import logging as std_logging from moniker.openstack.common import cfg from moniker.openstack.common import eventlet_backdoor from moniker.openstack.common.gettextutils import _ +from moniker.openstack.common import importutils from moniker.openstack.common import log as logging from moniker.openstack.common import threadgroup -rpc = extras.try_import('moniker.openstack.common.rpc') +rpc = importutils.try_import('moniker.openstack.common.rpc') CONF = cfg.CONF LOG = logging.getLogger(__name__) @@ -51,7 +51,7 @@ class Launcher(object): :returns: None """ - self._services = threadgroup.ThreadGroup('launcher') + self._services = threadgroup.ThreadGroup() eventlet_backdoor.initialize_if_enabled() @staticmethod @@ -310,7 +310,7 @@ class Service(object): """Service object for binaries running on hosts.""" def __init__(self, threads=1000): - self.tg = threadgroup.ThreadGroup('service', threads) + self.tg = threadgroup.ThreadGroup(threads) def start(self): pass diff --git a/moniker/openstack/common/setup.py b/moniker/openstack/common/setup.py index ec37a7f83..cc8b99e3a 100644 --- a/moniker/openstack/common/setup.py +++ b/moniker/openstack/common/setup.py @@ -1,6 +1,7 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright 2011 OpenStack LLC. +# Copyright 2012-2013 Hewlett-Packard Development Company, L.P. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -19,7 +20,7 @@ Utilities with minimum-depends for use in setup.py """ -import datetime +import email import os import re import subprocess @@ -33,11 +34,12 @@ def parse_mailmap(mailmap='.mailmap'): if os.path.exists(mailmap): with open(mailmap, 'r') as fp: for l in fp: - l = l.strip() - if not l.startswith('#') and ' ' in l: - canonical_email, alias = [x for x in l.split(' ') - if x.startswith('<')] - mapping[alias] = canonical_email + try: + canonical_email, alias = re.match( + r'[^#]*?(<.+>).*(<.+>).*', l).groups() + except AttributeError: + continue + mapping[alias] = canonical_email return mapping @@ -45,8 +47,8 @@ def canonicalize_emails(changelog, mapping): """Takes in a string and an email alias mapping and replaces all instances of the aliases in the string with their real email. 
""" - for alias, email in mapping.iteritems(): - changelog = changelog.replace(alias, email) + for alias, email_address in mapping.iteritems(): + changelog = changelog.replace(alias, email_address) return changelog @@ -106,23 +108,17 @@ def parse_dependency_links(requirements_files=['requirements.txt', return dependency_links -def write_requirements(): - venv = os.environ.get('VIRTUAL_ENV', None) - if venv is not None: - with open("requirements.txt", "w") as req_file: - output = subprocess.Popen(["pip", "-E", venv, "freeze", "-l"], - stdout=subprocess.PIPE) - requirements = output.communicate()[0].strip() - req_file.write(requirements) - - -def _run_shell_command(cmd): +def _run_shell_command(cmd, throw_on_error=False): if os.name == 'nt': output = subprocess.Popen(["cmd.exe", "/C", cmd], - stdout=subprocess.PIPE) + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) else: output = subprocess.Popen(["/bin/sh", "-c", cmd], - stdout=subprocess.PIPE) + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + if output.returncode and throw_on_error: + raise Exception("%s returned %d" % cmd, output.returncode) out = output.communicate() if len(out) == 0: return None @@ -131,57 +127,6 @@ def _run_shell_command(cmd): return out[0].strip() -def _get_git_next_version_suffix(branch_name): - datestamp = datetime.datetime.now().strftime('%Y%m%d') - if branch_name == 'milestone-proposed': - revno_prefix = "r" - else: - revno_prefix = "" - _run_shell_command("git fetch origin +refs/meta/*:refs/remotes/meta/*") - milestone_cmd = "git show meta/openstack/release:%s" % branch_name - milestonever = _run_shell_command(milestone_cmd) - if milestonever: - first_half = "%s~%s" % (milestonever, datestamp) - else: - first_half = datestamp - - post_version = _get_git_post_version() - # post version should look like: - # 0.1.1.4.gcc9e28a - # where the bit after the last . is the short sha, and the bit between - # the last and second to last is the revno count - (revno, sha) = post_version.split(".")[-2:] - second_half = "%s%s.%s" % (revno_prefix, revno, sha) - return ".".join((first_half, second_half)) - - -def _get_git_current_tag(): - return _run_shell_command("git tag --contains HEAD") - - -def _get_git_tag_info(): - return _run_shell_command("git describe --tags") - - -def _get_git_post_version(): - current_tag = _get_git_current_tag() - if current_tag is not None: - return current_tag - else: - tag_info = _get_git_tag_info() - if tag_info is None: - base_version = "0.0" - cmd = "git --no-pager log --oneline" - out = _run_shell_command(cmd) - revno = len(out.split("\n")) - sha = _run_shell_command("git describe --always") - else: - tag_infos = tag_info.split("-") - base_version = "-".join(tag_infos[:-2]) - (revno, sha) = tag_infos[-2:] - return "%s.%s.%s" % (base_version, revno, sha) - - def write_git_changelog(): """Write a changelog based on the git changelog.""" new_changelog = 'ChangeLog' @@ -227,26 +172,6 @@ _rst_template = """%(heading)s """ -def read_versioninfo(project): - """Read the versioninfo file. 
If it doesn't exist, we're in a github - zipball, and there's really no way to know what version we really - are, but that should be ok, because the utility of that should be - just about nil if this code path is in use in the first place.""" - versioninfo_path = os.path.join(project, 'versioninfo') - if os.path.exists(versioninfo_path): - with open(versioninfo_path, 'r') as vinfo: - version = vinfo.read().strip() - else: - version = "0.0.0" - return version - - -def write_versioninfo(project, version): - """Write a simple file containing the version of the package.""" - with open(os.path.join(project, 'versioninfo'), 'w') as fil: - fil.write("%s\n" % version) - - def get_cmdclass(): """Return dict of commands to run from setup.py.""" @@ -333,42 +258,78 @@ def get_cmdclass(): return cmdclass -def get_git_branchname(): - for branch in _run_shell_command("git branch --color=never").split("\n"): - if branch.startswith('*'): - _branch_name = branch.split()[1].strip() - if _branch_name == "(no": - _branch_name = "no-branch" - return _branch_name +def _get_revno(): + """Return the number of commits since the most recent tag. + + We use git-describe to find this out, but if there are no + tags then we fall back to counting commits since the beginning + of time. + """ + describe = _run_shell_command("git describe --always") + if "-" in describe: + return describe.rsplit("-", 2)[-2] + + # no tags found + revlist = _run_shell_command("git rev-list --abbrev-commit HEAD") + return len(revlist.splitlines()) -def get_pre_version(projectname, base_version): - """Return a version which is leading up to a version that will - be released in the future.""" - if os.path.isdir('.git'): - current_tag = _get_git_current_tag() - if current_tag is not None: - version = current_tag - else: - branch_name = os.getenv('BRANCHNAME', - os.getenv('GERRIT_REFNAME', - get_git_branchname())) - version_suffix = _get_git_next_version_suffix(branch_name) - version = "%s~%s" % (base_version, version_suffix) - write_versioninfo(projectname, version) - return version - else: - version = read_versioninfo(projectname) - return version - - -def get_post_version(projectname): +def get_version_from_git(pre_version): """Return a version which is equal to the tag that's on the current revision if there is one, or tag plus number of additional revisions if the current revision has no tag.""" if os.path.isdir('.git'): - version = _get_git_post_version() - write_versioninfo(projectname, version) + if pre_version: + try: + return _run_shell_command( + "git describe --exact-match", + throw_on_error=True).replace('-', '.') + except Exception: + sha = _run_shell_command("git log -n1 --pretty=format:%h") + return "%s.a%s.g%s" % (pre_version, _get_revno(), sha) + else: + return _run_shell_command( + "git describe --always").replace('-', '.') + return None + + +def get_version_from_pkg_info(package_name): + """Get the version from PKG-INFO file if we can.""" + try: + pkg_info_file = open('PKG-INFO', 'r') + except (IOError, OSError): + return None + try: + pkg_info = email.message_from_file(pkg_info_file) + except email.MessageError: + return None + # Check to make sure we're in our own dir + if pkg_info.get('Name', None) != package_name: + return None + return pkg_info.get('Version', None) + + +def get_version(package_name, pre_version=None): + """Get the version of the project. First, try getting it from PKG-INFO, if + it exists. If it does, that means we're in a distribution tarball or that + install has happened. 
Otherwise, if there is no PKG-INFO file, pull the + version from git. + + We do not support setup.py version sanity in git archive tarballs, nor do + we support packagers directly sucking our git repo into theirs. We expect + that a source tarball be made from our git repo - or that if someone wants + to make a source tarball from a fork of our repo with additional tags in it + that they understand and desire the results of doing that. + """ + version = os.environ.get("OSLO_PACKAGE_VERSION", None) + if version: return version - return read_versioninfo(projectname) + version = get_version_from_pkg_info(package_name) + if version: + return version + version = get_version_from_git(pre_version) + if version: + return version + raise Exception("Versioning for this project requires either an sdist" + " tarball, or access to an upstream git repository.") diff --git a/moniker/openstack/common/threadgroup.py b/moniker/openstack/common/threadgroup.py index eaa087da0..2a0e87917 100644 --- a/moniker/openstack/common/threadgroup.py +++ b/moniker/openstack/common/threadgroup.py @@ -38,8 +38,7 @@ class Thread(object): :class:`ThreadGroup`. The Thread will notify the :class:`ThreadGroup` when it has done so it can be removed from the threads list. """ - def __init__(self, name, thread, group): - self.name = name + def __init__(self, thread, group): self.thread = thread self.thread.link(_thread_done, group=group, thread=self) @@ -57,8 +56,7 @@ class ThreadGroup(object): when need be). * provide an easy API to add timers. """ - def __init__(self, name, thread_pool_size=10): - self.name = name + def __init__(self, thread_pool_size=10): self.pool = greenpool.GreenPool(thread_pool_size) self.threads = [] self.timers = [] @@ -72,7 +70,7 @@ class ThreadGroup(object): def add_thread(self, callback, *args, **kwargs): gt = self.pool.spawn(callback, *args, **kwargs) - th = Thread(callback.__name__, gt, self) + th = Thread(gt, self) self.threads.append(th) def thread_done(self, thread): diff --git a/moniker/openstack/common/timeutils.py b/moniker/openstack/common/timeutils.py index 0f346087f..5a011e818 100644 --- a/moniker/openstack/common/timeutils.py +++ b/moniker/openstack/common/timeutils.py @@ -98,6 +98,11 @@ def utcnow(): return datetime.datetime.utcnow() +def iso8601_from_timestamp(timestamp): + """Returns a iso8601 formated date from timestamp""" + return isotime(datetime.datetime.utcfromtimestamp(timestamp)) + + utcnow.override_time = None @@ -162,3 +167,16 @@ def delta_seconds(before, after): except AttributeError: return ((delta.days * 24 * 3600) + delta.seconds + float(delta.microseconds) / (10 ** 6)) + + +def is_soon(dt, window): + """ + Determines if time is going to happen in the next window seconds. + + :params dt: the time + :params window: minimum seconds to remain to consider the time not soon + + :return: True if expiration is within the given duration + """ + soon = (utcnow() + datetime.timedelta(seconds=window)) + return normalize_time(dt) < soon diff --git a/moniker/openstack/common/version.py b/moniker/openstack/common/version.py index c04a695ff..3653ad0d0 100644 --- a/moniker/openstack/common/version.py +++ b/moniker/openstack/common/version.py @@ -1,6 +1,6 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright 2012 OpenStack LLC +# Copyright 2012-2013 Hewlett-Packard Development Company, L.P. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -15,116 +15,56 @@ # under the License. 
""" -Utilities for consuming the auto-generated versioninfo files. +Utilities for consuming the version from pkg_resources. """ -import datetime import pkg_resources -import setup - class VersionInfo(object): - def __init__(self, package, python_package=None, pre_version=None): + def __init__(self, package): """Object that understands versioning for a package - :param package: name of the top level python namespace. For glance, - this would be "glance" for python-glanceclient, it - would be "glanceclient" - :param python_package: optional name of the project name. For - glance this can be left unset. For - python-glanceclient, this would be - "python-glanceclient" - :param pre_version: optional version that the project is working to + :param package: name of the python package, such as glance, or + python-glanceclient """ self.package = package - if python_package is None: - self.python_package = package - else: - self.python_package = python_package - self.pre_version = pre_version + self.release = None self.version = None self._cached_version = None - def _generate_version(self): - """Defer to the openstack.common.setup routines for making a - version from git.""" - if self.pre_version is None: - return setup.get_post_version(self.package) - else: - return setup.get_pre_version(self.package, self.pre_version) + def _get_version_from_pkg_resources(self): + """Get the version of the package from the pkg_resources record + associated with the package.""" + requirement = pkg_resources.Requirement.parse(self.package) + provider = pkg_resources.get_provider(requirement) + return provider.version - def _newer_version(self, pending_version): - """Check to see if we're working with a stale version or not. - We expect a version string that either looks like: - 2012.2~f3~20120708.10.4426392 - which is an unreleased version of a pre-version, or: - 0.1.1.4.gcc9e28a - which is an unreleased version of a post-version, or: - 0.1.1 - Which is a release and which should match tag. - For now, if we have a date-embedded version, check to see if it's - old, and if so re-generate. Otherwise, just deal with it. - """ - try: - version_date = int(self.version.split("~")[-1].split('.')[0]) - if version_date < int(datetime.date.today().strftime('%Y%m%d')): - return self._generate_version() - else: - return pending_version - except Exception: - return pending_version - - def version_string_with_vcs(self, always=False): + def release_string(self): """Return the full version of the package including suffixes indicating VCS status. - - For instance, if we are working towards the 2012.2 release, - canonical_version_string should return 2012.2 if this is a final - release, or else something like 2012.2~f1~20120705.20 if it's not. 
- - :param always: if true, skip all version caching """ - if always: - self.version = self._generate_version() + if self.release is None: + self.release = self._get_version_from_pkg_resources() + return self.release + + def version_string(self): + """Return the short version minus any alpha/beta tags.""" if self.version is None: - - requirement = pkg_resources.Requirement.parse(self.python_package) - versioninfo = "%s/versioninfo" % self.package - try: - raw_version = pkg_resources.resource_string(requirement, - versioninfo) - self.version = self._newer_version(raw_version.strip()) - except (IOError, pkg_resources.DistributionNotFound): - self.version = self._generate_version() + parts = [] + for part in self.release_string().split('.'): + if part[0].isdigit(): + parts.append(part) + else: + break + self.version = ".".join(parts) return self.version - def canonical_version_string(self, always=False): - """Return the simple version of the package excluding any suffixes. - - For instance, if we are working towards the 2012.2 release, - canonical_version_string should return 2012.2 in all cases. - - :param always: if true, skip all version caching - """ - return self.version_string_with_vcs(always).split('~')[0] - - def version_string(self, always=False): - """Return the base version of the package. - - For instance, if we are working towards the 2012.2 release, - version_string should return 2012.2 if this is a final release, or - 2012.2-dev if it is not. - - :param always: if true, skip all version caching - """ - version_parts = self.version_string_with_vcs(always).split('~') - if len(version_parts) == 1: - return version_parts[0] - else: - return '%s-dev' % (version_parts[0],) + # Compatibility functions + canonical_version_string = version_string + version_string_with_vcs = release_string def cached_version_string(self, prefix=""): """Generate an object which will expand in a string context to diff --git a/moniker/openstack/common/wsgi.py b/moniker/openstack/common/wsgi.py index 13af36ac6..e94455838 100644 --- a/moniker/openstack/common/wsgi.py +++ b/moniker/openstack/common/wsgi.py @@ -257,7 +257,7 @@ class Request(webob.Request): Does not do any body introspection, only checks header """ - if not "Content-Type" in self.headers: + if "Content-Type" not in self.headers: return None content_type = self.content_type diff --git a/moniker/version.py b/moniker/version.py deleted file mode 100644 index 53fb75576..000000000 --- a/moniker/version.py +++ /dev/null @@ -1,18 +0,0 @@ -# Copyright 2012 Managed I.T. -# -# Author: Kiall Mac Innes -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. 
-from moniker.openstack.common import version as common_version - -version_info = common_version.VersionInfo('moniker') diff --git a/setup.py b/setup.py index 26c2a272c..c08830af0 100755 --- a/setup.py +++ b/setup.py @@ -17,7 +17,6 @@ from setuptools import setup, find_packages import textwrap from moniker.openstack.common import setup as common_setup -from moniker.version import version_info as version install_requires = common_setup.parse_requirements(['tools/pip-requires']) install_options = common_setup.parse_requirements(['tools/pip-options']) @@ -32,7 +31,7 @@ dependency_links = common_setup.parse_dependency_links([ setup( name='moniker', - version=version.canonical_version_string(always=True), + version=common_setup.get_version('moniker'), description='DNS as a Service', author='Kiall Mac Innes', author_email='kiall@managedit.ie', diff --git a/tools/setup-requires b/tools/setup-requires index 72c473c69..e69de29bb 100644 --- a/tools/setup-requires +++ b/tools/setup-requires @@ -1 +0,0 @@ -setuptools-git>=0.4
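
A note on the configuration change in this sync: CommonConfigOpts is removed, cfg.CONF becomes a plain ConfigOpts instance, and the debug/verbose and log-* CLI options are now registered on the global CONF by the log module itself at import time, with the default level dropping from INFO to WARNING. A minimal sketch of a service entry point under the new layout; the log.setup(product_name) call and the project name handling are assumptions based on the synced oslo log module, not something shown in the diff:

    # Sketch only: assumes log.setup(product_name) as provided by the
    # synced oslo log module; argv handling is illustrative.
    import sys

    from moniker.openstack.common import cfg
    from moniker.openstack.common import log as logging

    CONF = cfg.CONF          # a plain ConfigOpts() after this sync

    def main():
        # Importing the log module above already registered --debug,
        # --verbose and the log-* CLI options on CONF, so they parse here.
        CONF(sys.argv[1:], project='moniker')
        logging.setup('moniker')
        # Default level is now WARNING; -v gives INFO, -d gives DEBUG.
        LOG = logging.getLogger(__name__)
        LOG.warning('service starting')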
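
The new importutils.try_import() is what lets impl_qpid and impl_zmq tolerate a missing optional dependency until the driver is actually exercised. A small sketch of the pattern, mirroring the guard added to impl_qpid.Connection.__init__ (the broker address is a placeholder):

    from moniker.openstack.common import importutils

    qpid_messaging = importutils.try_import('qpid.messaging')

    def connect(broker='localhost:5672'):
        # Fail only when the qpid driver is actually used, not at import
        # time of the module that happens to reference it.
        if not qpid_messaging:
            raise ImportError("Failed to import qpid.messaging")
        return qpid_messaging.Connection(broker)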
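
The multicall() change threads the caller's timeout down to topic_send(), so it now becomes a per-message TTL rather than only a client-side wait. The two drivers express that differently; this sketch restates the conversions from the hunks above with the collaborators passed in as parameters:

    # 'producer' is a kombu Producer; 'qpid_messaging' is the qpid.messaging
    # module as obtained via importutils.try_import().
    def kombu_topic_send(producer, msg, timeout=None):
        if timeout:
            # The AMQP header TTL is expressed in milliseconds.
            producer.publish(msg, headers={'ttl': timeout * 1000})
        else:
            producer.publish(msg)

    def qpid_topic_message(qpid_messaging, msg, timeout=None):
        # qpid's per-message TTL is in seconds, the same unit as the RPC
        # timeout, so the value is passed through unchanged.
        return qpid_messaging.Message(content=msg, ttl=timeout)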
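
On the versioning switch: setup.py drops moniker/version.py and the versioninfo machinery in favour of common_setup.get_version(), which checks the OSLO_PACKAGE_VERSION environment variable first, then PKG-INFO, then git, and raises if none is available. A small usage sketch; the pinned value is illustrative and the first call assumes an sdist or a git checkout is present:

    import os
    from moniker.openstack.common import setup as common_setup

    # Typical use (also visible in setup.py above): PKG-INFO in an sdist,
    # or 'git describe' in a checkout, decides the version.
    version = common_setup.get_version('moniker')

    # Packagers can pin the version explicitly; the environment variable is
    # checked first and wins over both sources ('2013.1' is illustrative).
    os.environ['OSLO_PACKAGE_VERSION'] = '2013.1'
    assert common_setup.get_version('moniker') == '2013.1'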
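
With versioninfo files gone, the reworked VersionInfo reads everything from the pkg_resources record installed with the package. A sketch of how a caller uses it once the package is installed; the printed strings are illustrative, not actual output:

    from moniker.openstack.common import version as common_version

    version_info = common_version.VersionInfo('moniker')

    # Full version as recorded by pkg_resources, e.g. '2013.1.a12.gdeadbee'
    # (illustrative); the result is cached after the first lookup.
    print(version_info.release_string())

    # Numeric prefix only, e.g. '2013.1' -- alpha/beta/sha suffixes dropped.
    print(version_info.version_string())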
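
The two timeutils additions are small but merit a usage note; the exact isotime() output format comes from the pre-existing helper and is assumed here:

    import datetime
    from moniker.openstack.common import timeutils

    # Render a POSIX timestamp as an ISO 8601 string via the existing
    # isotime() helper (assumed to look like '1970-01-01T00:00:00Z').
    print(timeutils.iso8601_from_timestamp(0))

    # True when the given time falls within the next `window` seconds.
    expires = timeutils.utcnow() + datetime.timedelta(seconds=5)
    print(timeutils.is_soon(expires, window=10))   # True
    print(timeutils.is_soon(expires, window=2))    # False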