Update common again.

The previous sync broke the RPC code; this commit also brings a general
update to the tip of trunk.
This commit is contained in:
Endre Karlson 2012-11-02 08:59:13 +01:00
parent 946f05c513
commit e0afcc425f
9 changed files with 65 additions and 108 deletions

View File

@ -22,18 +22,6 @@ Exceptions common to OpenStack projects
import logging
class ProcessExecutionError(IOError):
    """Raised when an executed command exits with an unexpected status."""

    def __init__(self, stdout=None, stderr=None, exit_code=None, cmd=None,
                 description=None):
        # Substitute placeholders for anything the caller did not supply.
        if description is None:
            description = "Unexpected error while running command."
        if exit_code is None:
            exit_code = '-'
        details = "%s\nCommand: %s\nExit code: %s\nStdout: %r\nStderr: %r"
        IOError.__init__(
            self, details % (description, cmd, exit_code, stdout, stderr))
class Error(Exception):
    """Base exception type carrying an optional message."""

    def __init__(self, message=None):
        # Let Exception store the (possibly None) message in self.args.
        Exception.__init__(self, message)

View File

@ -141,6 +141,9 @@ class ConnectionContext(rpc_common.Connection):
def consume_in_thread(self):
    """Ask the wrapped connection to consume in its own thread."""
    wrapped = self.connection
    wrapped.consume_in_thread()
def consume_in_thread_group(self, thread_group):
    """Forward ThreadGroup-based consumption to the wrapped connection."""
    wrapped = self.connection
    wrapped.consume_in_thread_group(thread_group)
def __getattr__(self, key):
"""Proxy all other calls to the Connection instance"""
if self.connection:

View File

@ -161,6 +161,19 @@ class Connection(object):
"""
raise NotImplementedError()
def consume_in_thread_group(self, thread_group):
    """Spawn a thread inside *thread_group* to handle incoming messages.

    The spawned thread is responsible for all messages arriving on
    consumers registered on this connection.  Implementations are
    expected to dispatch without blocking -- e.g. one thread pulling
    messages for every consumer while a pool hands them to the proxy
    objects.
    """
    # Abstract: concrete drivers must override this.
    raise NotImplementedError()
def _safe_log(log_func, msg, msg_data):
"""Sanitizes the msg_data field before logging."""

View File

@ -113,6 +113,9 @@ class Connection(object):
def consume_in_thread(self):
    """Do nothing -- this implementation is a deliberate no-op."""
    return None
def consume_in_thread_group(self, thread_group):
    """Do nothing -- the supplied thread_group is ignored (no-op)."""
    return None
def create_connection(conf, new=True):
"""Create a connection"""

View File

@ -703,17 +703,25 @@ class Connection(object):
except StopIteration:
return
def _consumer_thread_callback(self):
""" Consumer thread callback used by consume_in_* """
try:
self.consume()
except greenlet.GreenletExit:
return
def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread"""
# NOTE(review): this span is unified-diff output with the -/+ markers
# stripped, so OLD and NEW lines are interleaved.  The inline
# _consumer_thread helper and the eventlet.spawn(_consumer_thread) call
# belong to the pre-change version; the eventlet.spawn(
# self._consumer_thread_callback) call is the post-change replacement.
def _consumer_thread():
try:
self.consume()
except greenlet.GreenletExit:
return
# Lazily spawn a single consumer greenthread and reuse it thereafter.
if self.consumer_thread is None:
self.consumer_thread = eventlet.spawn(_consumer_thread)
self.consumer_thread = eventlet.spawn(
self._consumer_thread_callback)
return self.consumer_thread
def consume_in_thread_group(self, thread_group):
    """Consume from every queue/consumer via the supplied ThreadGroup."""
    callback = self._consumer_thread_callback
    thread_group.add_thread(callback)
def create_consumer(self, topic, proxy, fanout=False):
"""Create a consumer that calls a method in a proxy object"""
proxy_cb = rpc_amqp.ProxyCallback(
@ -777,7 +785,7 @@ def cast_to_server(conf, context, server_params, topic, msg):
def fanout_cast_to_server(conf, context, server_params, topic, msg):
    """Sends a message on a fanout exchange to a specific server.

    :param conf: driver configuration
    :param context: request context forwarded with the message
    :param server_params: connection parameters for the target server
    :param topic: topic to publish on
    :param msg: message to send
    """
    # Fix: the rendered diff left two conflicting delegate calls here.  The
    # correct target is rpc_amqp.fanout_cast_to_server; delegating to
    # rpc_amqp.cast_to_server would perform a direct (non-fanout) cast.
    return rpc_amqp.fanout_cast_to_server(
        conf, context, server_params, topic, msg,
        rpc_amqp.get_connection_pool(conf, Connection))

View File

@ -494,6 +494,13 @@ class Connection(object):
"""Send a notify message on a topic"""
self.publisher_send(NotifyPublisher, topic, msg)
def _consumer_thread_callback(self):
""" Consumer thread callback used by consume_in_* """
try:
self.consume()
except greenlet.GreenletExit:
return
def consume(self, limit=None):
"""Consume from all queues/consumers"""
it = self.iterconsume(limit=limit)
@ -505,15 +512,16 @@ class Connection(object):
def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread"""
# NOTE(review): diff residue -- OLD and NEW lines are interleaved because
# the -/+ markers were stripped.  The inline _consumer_thread helper and
# the eventlet.spawn(_consumer_thread) call are the pre-change version;
# the eventlet.spawn(self._consumer_thread_callback) call replaced them.
def _consumer_thread():
try:
self.consume()
except greenlet.GreenletExit:
return
# Spawn the consumer greenthread only once; later calls reuse it.
if self.consumer_thread is None:
self.consumer_thread = eventlet.spawn(_consumer_thread)
self.consumer_thread = eventlet.spawn(
self._consumer_thread_callback)
return self.consumer_thread
def consume_in_thread_group(self, thread_group):
    """Register the consumer callback with the supplied ThreadGroup."""
    runner = self._consumer_thread_callback
    thread_group.add_thread(runner)
def create_consumer(self, topic, proxy, fanout=False):
"""Create a consumer that calls a method in a proxy object"""
proxy_cb = rpc_amqp.ProxyCallback(

View File

@ -370,17 +370,24 @@ class ZmqBaseReactor(ConsumerBase):
LOG.info(_("Out reactor registered"))
def consume_in_thread(self):
# NOTE(review): this is the PRE-change version of consume_in_thread kept
# in the rendered diff; its replacement appears a few lines below.  The
# nested _consume helper logs once, then consumes from the socket forever.
def _consume(sock):
LOG.info(_("Consuming socket"))
while True:
self.consume(sock)
def _consumer_thread_callback(self, sock):
""" Consumer thread callback used by consume_in_* """
# Log once, then consume from this socket indefinitely; the loop only
# ends when the hosting thread/greenthread is killed externally.
LOG.info(_("Consuming socket"))
while True:
self.consume(sock)
def consume_in_thread(self):
# NOTE(review): diff residue -- both the old self.pool.spawn(_consume, k)
# line and its replacement spawning self._consumer_thread_callback appear
# here; only the latter belongs in the post-change code.
for k in self.proxies.keys():
self.threads.append(
self.pool.spawn(_consume, k)
self.pool.spawn(self._consumer_thread_callback, k)
)
def consume_in_thread_group(self, thread_group):
    """Add one consumer thread per proxy socket to the ThreadGroup."""
    for sock in self.proxies:
        thread_group.add_thread(self._consumer_thread_callback, sock)
def wait(self):
    """Block until every tracked consumer thread has finished."""
    for worker in self.threads:
        worker.wait()
@ -523,6 +530,9 @@ class Connection(rpc_common.Connection):
def consume_in_thread(self):
    """Hand thread-based consumption off to the reactor."""
    reactor = self.reactor
    reactor.consume_in_thread()
def consume_in_thread_group(self, thread_group):
    """Hand the supplied ThreadGroup off to the reactor for consumption."""
    reactor = self.reactor
    reactor.consume_in_thread_group(thread_group)
def _cast(addr, context, msg_id, topic, msg, timeout=None):
timeout_cast = timeout or CONF.rpc_cast_timeout

View File

@ -58,7 +58,7 @@ class Service(service.Service):
self.conn.create_consumer(self.topic, dispatcher, fanout=True)
# Consume from all consumers in a thread
self.conn.consume_in_thread()
self.conn.consume_in_thread_group(self.tg)
def stop(self):
# Try to shut the connection down, but if we get any sort of

View File

@ -62,79 +62,3 @@ def bool_from_string(subject):
if subject.strip().lower() in ('true', 'on', 'yes', '1'):
return True
return False
def execute(*cmd, **kwargs):
    """Helper method to execute command with optional retry.

    :param cmd:             Passed to subprocess.Popen.
    :param process_input:   Send to opened process.
    :param check_exit_code: Defaults to 0. Raise
                            exception.ProcessExecutionError unless the
                            program exits with this code.
    :param delay_on_retry:  True | False. Defaults to True. If True, wait
                            a short amount of time before retrying.
    :param attempts:        How many times to retry cmd.
    :param run_as_root:     True | False. Defaults to False. If True, the
                            command is prefixed by the command specified
                            in the root_helper kwarg.
    :param root_helper:     command to prefix all cmd's with
    :raises exception.Error: on receiving unknown keyword arguments
    :raises exception.ProcessExecutionError: when the exit-code check fails
    :returns: the (stdout, stderr) tuple from Popen.communicate()
    """
    process_input = kwargs.pop('process_input', None)
    check_exit_code = kwargs.pop('check_exit_code', 0)
    delay_on_retry = kwargs.pop('delay_on_retry', True)
    attempts = kwargs.pop('attempts', 1)
    run_as_root = kwargs.pop('run_as_root', False)
    root_helper = kwargs.pop('root_helper', '')
    if len(kwargs):
        raise exception.Error(_('Got unknown keyword args '
                                'to utils.execute: %r') % kwargs)
    if run_as_root:
        cmd = shlex.split(root_helper) + list(cmd)
    # Fix: materialize as a list rather than map() -- on Python 3 a map
    # iterator would be exhausted by the first ' '.join(cmd) below, leaving
    # every later join (retry logging, error reporting) an empty string.
    cmd = [str(part) for part in cmd]
    while attempts > 0:
        attempts -= 1
        try:
            LOG.debug(_('Running cmd (subprocess): %s'), ' '.join(cmd))
            _PIPE = subprocess.PIPE  # pylint: disable=E1101
            obj = subprocess.Popen(cmd,
                                   stdin=_PIPE,
                                   stdout=_PIPE,
                                   stderr=_PIPE,
                                   close_fds=True)
            result = None
            if process_input is not None:
                result = obj.communicate(process_input)
            else:
                result = obj.communicate()
            obj.stdin.close()  # pylint: disable=E1101
            _returncode = obj.returncode  # pylint: disable=E1101
            if _returncode:
                LOG.debug(_('Result was %s') % _returncode)
                # Only enforce the exit code when an int (and not a bool,
                # which is an int subclass) was supplied; any other value
                # disables the check.
                if (isinstance(check_exit_code, int) and
                        not isinstance(check_exit_code, bool) and
                        _returncode != check_exit_code):
                    (stdout, stderr) = result
                    raise exception.ProcessExecutionError(
                        exit_code=_returncode,
                        stdout=stdout,
                        stderr=stderr,
                        cmd=' '.join(cmd))
            return result
        except exception.ProcessExecutionError:
            if not attempts:
                raise
            else:
                LOG.debug(_('%r failed. Retrying.'), cmd)
                if delay_on_retry:
                    greenthread.sleep(random.randint(20, 200) / 100.0)
        finally:
            # NOTE(termie): this appears to be necessary to let the subprocess
            #               call clean something up in between calls, without
            #               it two execute calls in a row hangs the second one
            greenthread.sleep(0)