Sync common

This commit is contained in:
Endre Karlson 2012-11-02 08:43:33 +01:00
parent 5eeddfc1d1
commit aa4b7123c9
15 changed files with 65 additions and 89 deletions

View File

@ -236,10 +236,10 @@ log files:
This module also contains a global instance of the CommonConfigOpts class
in order to support a common usage pattern in OpenStack:
from billistix.openstack.common import cfg
from openstack.common import cfg
opts = [
cfg.StrOpt('bind_host' default='0.0.0.0'),
cfg.StrOpt('bind_host', default='0.0.0.0'),
cfg.IntOpt('bind_port', default=9292),
]

View File

@ -16,11 +16,11 @@
# under the License.
"""
gettext for billistix.openstack-common modules.
gettext for openstack-common modules.
Usual usage in an billistix.openstack.common module:
Usual usage in an openstack.common module:
from billistix.openstack.common.gettextutils import _
from openstack.common.gettextutils import _
"""
import gettext

View File

@ -76,6 +76,9 @@ log_opts = [
cfg.BoolOpt('publish_errors',
default=False,
help='publish error events'),
cfg.BoolOpt('fatal_deprecations',
default=False,
help='make deprecations fatal'),
# NOTE(mikal): there are two options here because sometimes we are handed
# a full instance (and could include more information), and other times we
@ -170,6 +173,14 @@ class ContextAdapter(logging.LoggerAdapter):
def audit(self, msg, *args, **kwargs):
self.log(logging.AUDIT, msg, *args, **kwargs)
def deprecated(self, msg, *args, **kwargs):
stdmsg = _("Deprecated Config: %s") % msg
if CONF.fatal_deprecations:
self.critical(stdmsg, *args, **kwargs)
raise DeprecatedConfig(msg=stdmsg)
else:
self.warn(stdmsg, *args, **kwargs)
def process(self, msg, kwargs):
if 'extra' not in kwargs:
kwargs['extra'] = {}
@ -246,7 +257,7 @@ class JSONFormatter(logging.Formatter):
class PublishErrorsHandler(logging.Handler):
def emit(self, record):
if ('openstack.common.notifier.log_notifier' in
if ('billistix.openstack.common.notifier.log_notifier' in
CONF.notification_driver):
return
notifier.api.notify(None, 'error.publisher',
@ -450,3 +461,10 @@ class ColorHandler(logging.StreamHandler):
def format(self, record):
record.color = self.LEVEL_COLORS[record.levelno]
return logging.StreamHandler.format(self, record)
class DeprecatedConfig(Exception):
message = _("Fatal call to deprecated config: %(msg)s")
def __init__(self, msg):
super(Exception, self).__init__(self.message % dict(msg=msg))

View File

@ -24,12 +24,12 @@ CONF = cfg.CONF
def notify(_context, message):
"""Notifies the recipient of the desired event given the model.
Log notifications using billistix.openstack's default logging system"""
Log notifications using openstack's default logging system"""
priority = message.get('priority',
CONF.default_notification_level)
priority = priority.lower()
logger = logging.getLogger(
'openstack.common.notification.%s' %
'billistix.openstack.common.notification.%s' %
message['event_type'])
getattr(logger, priority)(jsonutils.dumps(message))

View File

@ -24,7 +24,7 @@ LOG = logging.getLogger(__name__)
notification_topic_opt = cfg.ListOpt(
'notification_topics', default=['notifications', ],
help='AMQP topic used for billistix.openstack notifications')
help='AMQP topic used for openstack notifications')
CONF = cfg.CONF
CONF.register_opt(notification_topic_opt)

View File

@ -47,7 +47,7 @@ rpc_opts = [
help='Seconds to wait before a cast expires (TTL). '
'Only supported by impl_zmq.'),
cfg.ListOpt('allowed_rpc_exception_modules',
default=['openstack.common.exception',
default=['billistix.openstack.common.exception',
'nova.exception',
'cinder.exception',
],
@ -80,7 +80,7 @@ def create_connection(new=True):
implementation is free to return an existing connection from a
pool.
:returns: An instance of billistix.openstack.common.rpc.common.Connection
:returns: An instance of openstack.common.rpc.common.Connection
"""
return _get_impl().create_connection(cfg.CONF, new=new)
@ -92,7 +92,7 @@ def call(context, topic, msg, timeout=None):
request.
:param topic: The topic to send the rpc message to. This correlates to the
topic argument of
billistix.openstack.common.rpc.common.Connection.create_consumer()
openstack.common.rpc.common.Connection.create_consumer()
and only applies when the consumer was created with
fanout=False.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
@ -102,7 +102,7 @@ def call(context, topic, msg, timeout=None):
:returns: A dict from the remote method.
:raises: billistix.openstack.common.rpc.common.Timeout if a complete response
:raises: openstack.common.rpc.common.Timeout if a complete response
is not received before the timeout is reached.
"""
return _get_impl().call(cfg.CONF, context, topic, msg, timeout)
@ -115,7 +115,7 @@ def cast(context, topic, msg):
request.
:param topic: The topic to send the rpc message to. This correlates to the
topic argument of
billistix.openstack.common.rpc.common.Connection.create_consumer()
openstack.common.rpc.common.Connection.create_consumer()
and only applies when the consumer was created with
fanout=False.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
@ -136,7 +136,7 @@ def fanout_cast(context, topic, msg):
request.
:param topic: The topic to send the rpc message to. This correlates to the
topic argument of
billistix.openstack.common.rpc.common.Connection.create_consumer()
openstack.common.rpc.common.Connection.create_consumer()
and only applies when the consumer was created with
fanout=True.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
@ -158,7 +158,7 @@ def multicall(context, topic, msg, timeout=None):
request.
:param topic: The topic to send the rpc message to. This correlates to the
topic argument of
billistix.openstack.common.rpc.common.Connection.create_consumer()
openstack.common.rpc.common.Connection.create_consumer()
and only applies when the consumer was created with
fanout=False.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
@ -171,7 +171,7 @@ def multicall(context, topic, msg, timeout=None):
returned and X is the Nth value that was returned by the remote
method.
:raises: billistix.openstack.common.rpc.common.Timeout if a complete response
:raises: openstack.common.rpc.common.Timeout if a complete response
is not received before the timeout is reached.
"""
return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout)

View File

@ -18,7 +18,7 @@
# under the License.
"""
Shared code between AMQP based billistix.openstack.common.rpc implementations.
Shared code between AMQP based openstack.common.rpc implementations.
The code in this module is shared between the rpc implemenations based on AMQP.
Specifically, this includes impl_kombu and impl_qpid. impl_carrot also uses
@ -141,9 +141,6 @@ class ConnectionContext(rpc_common.Connection):
def consume_in_thread(self):
self.connection.consume_in_thread()
def consume_in_thread_group(self, thread_group):
self.connection.consume_in_thread_group(thread_group)
def __getattr__(self, key):
"""Proxy all other calls to the Connection instance"""
if self.connection:

View File

@ -161,19 +161,6 @@ class Connection(object):
"""
raise NotImplementedError()
def consume_in_thread_group(self, thread_group):
"""Spawn a thread to handle incoming messages in the supplied ThreadGroup.
Spawn a thread that will be responsible for handling all incoming
messages for consumers that were set up on this connection.
Message dispatching inside of this is expected to be implemented in a
non-blocking manner. An example implementation would be having this
thread pull messages in for all of the consumers, but utilize a thread
pool for dispatching the messages to the proxy objects.
"""
raise NotImplementedError()
def _safe_log(log_func, msg, msg_data):
"""Sanitizes the msg_data field before logging."""

View File

@ -113,9 +113,6 @@ class Connection(object):
def consume_in_thread(self):
pass
def consume_in_thread_group(self, thread_group):
pass
def create_connection(conf, new=True):
"""Create a connection"""

View File

@ -31,9 +31,9 @@ import kombu.messaging
from billistix.openstack.common import cfg
from billistix.openstack.common.gettextutils import _
from billistix.openstack.common import network_utils
from billistix.openstack.common.rpc import amqp as rpc_amqp
from billistix.openstack.common.rpc import common as rpc_common
from billistix.openstack.common import network_utils
kombu_opts = [
cfg.StrOpt('kombu_ssl_version',
@ -267,6 +267,7 @@ class FanoutConsumer(ConsumerBase):
# Default options
options = {'durable': False,
'queue_arguments': _get_queue_arguments(conf),
'auto_delete': True,
'exclusive': True}
options.update(kwargs)
@ -702,25 +703,17 @@ class Connection(object):
except StopIteration:
return
def _consumer_thread_callback(self):
""" Consumer thread callback used by consume_in_* """
try:
self.consume()
except greenlet.GreenletExit:
return
def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread"""
def _consumer_thread():
try:
self.consume()
except greenlet.GreenletExit:
return
if self.consumer_thread is None:
self.consumer_thread = eventlet.spawn(
self._consumer_thread_callback)
self.consumer_thread = eventlet.spawn(_consumer_thread)
return self.consumer_thread
def consume_in_thread_group(self, thread_group):
""" Consume from all queues/consumers in the supplied ThreadGroup"""
thread_group.add_thread(self._consumer_thread_callback)
def create_consumer(self, topic, proxy, fanout=False):
"""Create a consumer that calls a method in a proxy object"""
proxy_cb = rpc_amqp.ProxyCallback(

View File

@ -494,13 +494,6 @@ class Connection(object):
"""Send a notify message on a topic"""
self.publisher_send(NotifyPublisher, topic, msg)
def _consumer_thread_callback(self):
""" Consumer thread callback used by consume_in_* """
try:
self.consume()
except greenlet.GreenletExit:
return
def consume(self, limit=None):
"""Consume from all queues/consumers"""
it = self.iterconsume(limit=limit)
@ -512,16 +505,15 @@ class Connection(object):
def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread"""
def _consumer_thread():
try:
self.consume()
except greenlet.GreenletExit:
return
if self.consumer_thread is None:
self.consumer_thread = eventlet.spawn(
self._consumer_thread_callback)
self.consumer_thread = eventlet.spawn(_consumer_thread)
return self.consumer_thread
def consume_in_thread_group(self, thread_group):
""" Consume from all queues/consumers in the supplied ThreadGroup"""
thread_group.add_thread(self._consumer_thread_callback)
def create_consumer(self, topic, proxy, fanout=False):
"""Create a consumer that calls a method in a proxy object"""
proxy_cb = rpc_amqp.ProxyCallback(

View File

@ -49,7 +49,7 @@ zmq_opts = [
# The module.Class to use for matchmaking.
cfg.StrOpt(
'rpc_zmq_matchmaker',
default=('openstack.common.rpc.'
default=('billistix.openstack.common.rpc.'
'matchmaker.MatchMakerLocalhost'),
help='MatchMaker driver',
),
@ -370,24 +370,17 @@ class ZmqBaseReactor(ConsumerBase):
LOG.info(_("Out reactor registered"))
def _consumer_thread_callback(self, sock):
""" Consumer thread callback used by consume_in_* """
LOG.info(_("Consuming socket"))
while True:
self.consume(sock)
def consume_in_thread(self):
def _consume(sock):
LOG.info(_("Consuming socket"))
while True:
self.consume(sock)
for k in self.proxies.keys():
self.threads.append(
self.pool.spawn(self._consumer_thread_callback, k)
self.pool.spawn(_consume, k)
)
def consume_in_thread_group(self, thread_group):
""" Consume from all queues/consumers in the supplied ThreadGroup"""
for k in self.proxies.keys():
thread_group.add_thread(self._consumer_thread_callback, k)
def wait(self):
for t in self.threads:
t.wait()
@ -530,9 +523,6 @@ class Connection(rpc_common.Connection):
def consume_in_thread(self):
self.reactor.consume_in_thread()
def consume_in_thread_group(self, thread_group):
self.reactor.consume_in_thread_group(thread_group)
def _cast(addr, context, msg_id, topic, msg, timeout=None):
timeout_cast = timeout or CONF.rpc_cast_timeout
@ -556,7 +546,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
timeout = timeout or CONF.rpc_response_timeout
# The msg_id is used to track replies.
msg_id = str(uuid.uuid4().hex)
msg_id = uuid.uuid4().hex
# Replies always come into the reply service.
reply_topic = "zmq_replies.%s" % CONF.rpc_zmq_host

View File

@ -58,7 +58,7 @@ class Service(service.Service):
self.conn.create_consumer(self.topic, dispatcher, fanout=True)
# Consume from all consumers in a thread
self.conn.consume_in_thread_group(self.tg)
self.conn.consume_in_thread()
def stop(self):
# Try to shut the connection down, but if we get any sort of

View File

@ -136,15 +136,17 @@ def _get_git_next_version_suffix(branch_name):
_run_shell_command("git fetch origin +refs/meta/*:refs/remotes/meta/*")
milestone_cmd = "git show meta/openstack/release:%s" % branch_name
milestonever = _run_shell_command(milestone_cmd)
if not milestonever:
milestonever = ""
if milestonever:
first_half = "%s~%s" % (milestonever, datestamp)
else:
first_half = datestamp
post_version = _get_git_post_version()
# post version should look like:
# 0.1.1.4.gcc9e28a
# where the bit after the last . is the short sha, and the bit between
# the last and second to last is the revno count
(revno, sha) = post_version.split(".")[-2:]
first_half = "%s~%s" % (milestonever, datestamp)
second_half = "%s%s.%s" % (revno_prefix, revno, sha)
return ".".join((first_half, second_half))

View File

@ -23,8 +23,8 @@ import logging
import random
import shlex
from eventlet import greenthread
from eventlet.green import subprocess
from eventlet import greenthread
from billistix.openstack.common import exception
from billistix.openstack.common.gettextutils import _