Commit Graph

35 Commits

Author SHA1 Message Date
Doug Hellmann e55a83e832 Move files out of the namespace package
Move the public API out of oslo.messaging to oslo_messaging. Retain
the ability to import from the old namespace package for backwards
compatibility for this release cycle.

bp/drop-namespace-packages

Co-authored-by: Mehdi Abaakouk <mehdi.abaakouk@enovance.com>
Change-Id: Ia562010c152a214f1c0fed767c82022c7c2c52e7
2015-01-12 12:50:41 -05:00
Mehdi Abaakouk 15aa5cbda8 The executor doesn't need to set the timeout
It's up to the driver to set a suitable timeout for polling the broker,
this one can be different that the one requested by the driver
caller as long as the caller timeout is respected.

This change also adds a new driver listener API, to be able
to stop it cleanly, specially in case of timeout=None.

Closes bug: #1400268
Closes bug: #1399257
Change-Id: I674c0def1efb420c293897d49683593a0b10e291
2014-12-08 12:59:33 +01:00
Mehdi Abaakouk 30e0aea877 Notification listener pools
We can now set the pool name of a notification listener
to create multiple groups/pools of listeners consuming notifications
and that each group/pool only receives one copy of each notification.

The AMQP implementation of that is to set queue_name with the pool name.

Implements blueprint notification-listener-pools
Closes-bug: #1356226

Change-Id: I8dc0549f5550f684a261c78c58737b798fcdd656
2014-11-14 10:20:18 +01:00
Nejc Saje 1cc3e5d866 Fixes incorrect exchange lock in fake driver
The exchanges lock in the fake driver was being disregarded,
probably due to a typo.

Closes-Bug: #1342088
Change-Id: I45261fce501a867b9253112e1608b14ed5afea26
2014-07-15 12:20:16 +00:00
Paul Michali 0cfafac246 encoding error in file
There is a non-printable character (c2a0) in a comment line, which
in a new Neutron test case causes a StringException. There are
three issues here.

1) Encoding issue in the file.
2) No detection of encoding issues in files.
3) Neutron tests mask the failure detail

This addresses the first issue. Bug 1334798 in Neutron was created
for the last issue. Oslo team is aware of the second issue.

Related-Bug: #1334798

Change-Id: I8cfa6c93085a26666a8b4bee06dcd52709d04a95
2014-06-26 18:53:35 +00:00
Mehdi Abaakouk e582da68f4 Ensures listener queues exist in fake driver
The fanout queues of the fake driver are created at the first executor
poll, but if we use eventlet executor and the fake driver, when the sender
delivers a fanout message before the first poll, the message goes to the
topic queue instead of the server fanout queue.

The changes fixes that by ensuring the all queues exists when the
listener is created.

Closes bug #1331453

Change-Id: I92e7c01dd87d634b741bbcaea92f48730fdd555e
2014-06-23 09:10:21 +02:00
ChangBo Guo(gcb) 821ee096a6 Removes the use of mutables as default args
Passing mutable objects as default args is a known Python pitfall.
We'd better avoid this.

Change-Id: I67cc0774a65886ef9fce0b72e52157b622248a85
Closes-Bug: #1327473
2014-06-21 11:41:36 +08:00
Jenkins a64692689a Merge "Add an optional timeout parameter to Listener.poll" 2014-06-20 18:50:16 +00:00
Mehdi Abaakouk 1ea9c35ab4 Transport reconnection retries for notification
This patch add support of reconnection retries for the
messaging notifier.

Related bug #1282639
Change-Id: Ia30331f8306ff0f6952d83ef42ff8bee6b900427
2014-06-18 18:41:33 +02:00
Mehdi Abaakouk 948c05417c Add transport reconnection retries
When a rpc client try to make a RPC call and the server is unreachable
The rpc call hang until the server come back.

In most case this is the desired behavior.

But sometimes, we can prefer that the library raise an exception after a
certain number of retries.

For example in ceilometer, when publishing a
storage.objects.incoming.bytes sample from the Swift middleware to an
AMQP topic, you might not want to block the Swift client if the AMQP broker
is unavailable - instead, you might have a queueing policy whereby
if a single reconection attempt fails we queue the sample in memory and
try again when another sample is to be published.

This patch is the oslo.messaging part that allow this.

Closes bug #1282639
Co-Authored-By: Ala Rezmerita <ala.rezmerita@cloudwatt.com>

Change-Id: I32086d0abf141c368343bf225d4b021da496c020
2014-06-13 16:21:59 +02:00
Victor Stinner 7fe2ef7334 Add an optional timeout parameter to Listener.poll
For asynchronous programming, a timeout parameter is required on the listener
to allow to stop it at exit. poll() returns None on timeout.

It plan to use it in my new asyncio (Trollius) executor:
https://review.openstack.org/#/c/70983/

See also the related blueprint for the rationale:
https://wiki.openstack.org/wiki/Oslo/blueprints/asyncio

Change-Id: I918ae3c267743a0eaed1d6a210c79fb4a0eb8733
2014-06-13 10:20:39 +02:00
ChangBo Guo(gcb) 16a4c9ea7a Remove vim header
We don't need vim modelines in each source file, it can be set in
user's vimrc.

Change-Id: I615f1ffc7e097f1ddbd908a512f1cb988489e871
Closes-Bug: #1229324
2014-03-18 05:51:12 +00:00
Jenkins 8cccf06c65 Merge "notification listener: add allow_requeue param" 2014-03-03 20:49:54 +00:00
Mark McLoughlin 5bd31315c2 notification listener: add allow_requeue param
In commit d8d2ad9 we added support for notification listener endpoint
methods to return REQUEUE, but if a driver does not support this we
raise NotImplementedError when the application attempts to requeue
a message.

This requeuing behaviour might only be used by an application in
unusual, exceptional circumstances and catch users by surprise.

Instead, let's require the application to assert that it needs this
feature in advance and raise NotImplementError at that point if the
driver doesn't support it.

Change-Id: Id0bb0e57d2dcc1ec7d752e98c9b1e8e48d99f35c
2014-03-03 07:51:18 -08:00
Mehdi Abaakouk 71ac681a73 Add multiple exchange per listerner in fake driver
This patch allow the FakeListener to listen on multiple FakeExchange.

Partial implements blueprint notification-subscriber-server

Change-Id: I9830cc01efdd931f6628853f7e84b947d7b855c9
2014-03-03 09:27:57 +01:00
Mehdi Abaakouk d8d2ad95d7 Allow to requeue the notification message
This patch allow to requeue the notification received by the
notification listener.

Partial implements blueprint notification-subscriber-server

Change-Id: I49c4ba91224c280e479edb19289ccb337a2ab843
2014-03-03 09:27:57 +01:00
Mehdi Abaakouk 0d102aa361 Abstract the acknowledge layer of a message
The patch add ta abstraction layer to acknowledge a message.

Partial implements blueprint notification-subscriber-server

Change-Id: I6e37780cc28737cfd56b6719ec8d9cebbc9bb278
2014-02-14 16:06:26 +01:00
Mehdi Abaakouk 9f58e2c3fe Implements notification listener and dispatcher
This patch allows to quickly create a listener to receive
notification messages.

Example of the api:

class Endpoint(object):
    def warn(self, ctxt, publisher_id, event_type, payload):
        do_something(payload)

target = messaging.Target(topic='notifications', exchange='cinder')
listener = notify.get_notification_listener(transport, [target],
                                            [Endpoint()],
                                            executor,
                                            serializer)

Implements blueprint notification-subscriber-server

Change-Id: I434bc487c382a2048670df726d9bebd640150bb9
2014-02-14 16:06:26 +01:00
Mehdi Abaakouk 11a90eabc9 Allow fake driver to consume multiple topics
This patch allow the fake driver to comsume multiple topics
with one listener.

Partial implements blueprint notification-subscriber-server

Change-Id: Ib52dc181e10b487854fbb398eda9f758232a1251
2014-01-30 13:40:42 +01:00
Chang Bo Guo 2278ce0b95 Use six.moves.queue instead of Queue
The Queue module has been renamed to queue in Python 3.
Use six.moves.queue to fit the change.

http://docs.python.org/2/library/queue.html#module-Queue

Change-Id: I2940f34d161b2e3cbc5464619f76e6adea4ef9f6
2013-12-07 19:35:41 -08:00
Mark McLoughlin 001d66e6e5 Fix fake driver handling of failure replies
The driver reply() method is actually passed a full sys.exc_info()
tuple.

This was masked in the unit tests because the driver ended up basically
doing:

  raise (ValueError, ValueError, ...)

which caused a new ValueError to be instantiated and the test was
satisified. However, if an exception type has some required arguments,
you'll get a TypeError when this statement attempts to instantiate it
with no arguments.

Change-Id: I4af9c5084954d7b9c5f02cdae3387d17c206985b
2013-08-16 11:08:50 +01:00
Mark McLoughlin c846cf35b8 Add a TransportURL class to the public API
Nova's cells/rpc_driver.py has some code which allows user of the REST
API to update elements of a cell's transport URL (say, the host name of
the message broker) stored in the database. To achieve this, it has
a parse_transport_url() method which breaks the URL into its constituent
parts and an unparse_transport_url() which re-forms it again after
updating some of its parts.

This is all fine and, since it's fairly specialized, it wouldn't be a
big deal to leave this code in Nova for now ... except the unparse
method looks at CONF.rpc_backend to know what scheme to use in the
returned URL if now backend was specified.

oslo.messaging registers the rpc_backend option, but the ability to
reference any option registered by the library should not be relied upon
by users of the library. Imagine, for instance, if we renamed the option
in future (with backwards compat for old configurations), then this
would mean API breakage.

So, long story short - an API along these lines makes some sense, but
especially since not having it would mean we'd need to add some way to
query the name of the transport driver.

In this commit, we add a simple new TransportURL class:

  >>> url = messaging.TransportURL.parse(cfg.CONF, 'foo:///')
  >>> str(url), url
  ('foo:///', <TransportURL transport='foo'>)
  >>> url.hosts.append(messaging.TransportHost(hostname='localhost'))
  >>> str(url), url
  ('foo://localhost/', <TransportURL transport='foo', hosts=[<TransportHost hostname='localhost'>]>)
  >>> url.transport = None
  >>> str(url), url
  ('kombu://localhost/', <TransportURL transport='kombu', hosts=[<TransportHost hostname='localhost'>]>)
  >>> cfg.CONF.set_override('rpc_backend', 'bar')
  >>> str(url), url
  ('bar://localhost/', <TransportURL transport='bar', hosts=[<TransportHost hostname='localhost'>]>)

The TransportURL.parse() method equates to parse_transport_url() and
TransportURL.__str__() equates to unparse_transport().

The transport drivers are also updated to take a TransportURL as a
required argument, which simplifies the handling of transport URLs in
the drivers.

Change-Id: Ic04173476329858e4a2c2d2707e9d4aeb212d127
2013-08-12 23:30:43 +01:00
Mark McLoughlin 9cc66e1e01 Kill ability to specify exchange in transport URL
My original thinking was that if you're using the exchange name to
separate two instances of the applications (so their queues don't
collide) then the exchange name is pretty key to transport
configuration. In fact, it's really a virtual host that you'd use for
this (at least in the case of rabbit and qpid).

Also, Nova's cells code has already moved ahead with the assumption that
the path specifies a virtual host, so it'd only make sense to deviate
from that if there was a really good reason to.

Change-Id: Ic8b5dc3538b6b17afec524047acc2efa76366377
2013-08-12 06:09:48 +01:00
Mark McLoughlin ac2176cde3 Add a per-transport allow_remote_exmods API
Currently we have a allowed_rpc_exception_modules configuration variable
which we use to configure a per-project list of modules which we will
allow exceptions to be instantiated from when deserializing remote
errors.

It makes no sense for this to be user configurable, instead the list of
modules should be set when you create a transport.

Closes-Bug: #1031719
Change-Id: Ib40e92cb920996ec5e8f63d6f2cbd88fd01a90f2
2013-08-07 13:11:46 +01:00
Mark McLoughlin 9ac9f615b2 Implement failure replies in the fake driver
Change-Id: Ifd9ede7cb17a471ae2f9024b49ef6bbdc645476a
2013-08-07 12:52:58 +01:00
Mark McLoughlin f6df32d943 Add API for expected endpoint exceptions
Review I4e7b19dc730342091fd70a717065741d56da4555 gives a lot of the
background here, but the idea is that some exceptions raised by an RPC
endpoint method do not indicate any sort of failure and should not be
logged by the server as an error.

The classic example of this is conductor's instance_get() method raising
InstanceNotFound. This is perfectly normal and should not be considered
an error.

The new API is a decorator which you can use with RPC endpoints methods
to indicate which exceptions are expected:

    @messaging.expected_exceptions(InstanceNotFound)
    def instance_get(self, context, instance_id):
        ...

but we also need to expose the ExpectedException type itself so that
direct "local" users of the endpoint class know what type will be used
to wrap expected exceptions. For example, Nova has an ExceptionHelper
class which unwraps the original exception from an ExpectedException and
re-raises it.

I've changed from client_exceptions() and ClientException to make it
more clear it's intent. I felt that the "client" naming gave the
impression it was intended for use on the client side.

Change-Id: Ieec4600bd6b70cf31ac7925a98a517b84acada4d
2013-08-07 12:52:54 +01:00
Mark McLoughlin 206c19e99e Add a driver method specifically for sending notifications
Notifications are an unusual case in that we need users to manually opt
in to new incompatible message formats by editing configuration because
there may be external consumers expecting the old format.

Add a send_notification() method to the driver interface and add a
format version paramater to the method, to make it clear that this
version selection is specifically for notifications.

In the case of the rabbit/qpid drivers, the 2.0 format is where we added
the message envelope.

Change-Id: Ib4925c308b1252503749962aa16f043281f2b429
2013-08-07 06:51:35 +01:00
Mark McLoughlin 294c99a6d2 Enforce target preconditions outside of drivers
The target preconditions (e.g. you need at least a topic to send) are
the same for all drivers, so enforce them before we ever call into a
driver.

Change-Id: Ic4e9bd94bd9f060ec0662d2bb778c699903dddc4
2013-08-07 06:51:24 +01:00
Mark McLoughlin 84a0693737 Remove unused IncomingMessage.done()
We appear to not have a use for this. I had originally thought we might
use this to ack messages one they've been processed and replied to, but
we actually have always acked messages as soon as they have been
deserialized and queued for dispatching.

Change-Id: I8e1fd565814f3b5e3ba0f1bc77e62ed52ff08661
2013-08-07 06:51:24 +01:00
Mark McLoughlin 1f0874857c Add a transport cleanup() method
Pretty obvious that we need this.

The rabbit/qpid implementations just empty the connection pool, in the
same way their module-level cleanup() methods do now.

Change-Id: I70ba5cab3eb7a30f74cdd6cafe60087769a77b57
2013-08-02 06:47:57 +01:00
Mark McLoughlin 29cb5b0b58 Move url utils into messaging._urls 2013-06-16 15:27:36 +01:00
Mark McLoughlin 9cb803ee10 Fix fake driver with eventlet
By storing the reply_q on the listener, we were assuming there was only
one message being dispatched at the time. Put it on the incoming message
instead and use it directly in reply().
2013-06-16 12:23:11 +01:00
Mark McLoughlin de88d62998 Remove a fixed fixme 2013-06-15 21:19:05 +01:00
Mark McLoughlin 978d19c256 Rework how queues get created in fake driver
Currently, if there are no servers listening on a topic then a message
to that topic just gets dropped by the fake driver.

This makes the tests intermittently fail if the server takes longer to
start.

Turn things on their head so that the client always creates the queues
on the exchange so that messages can get queued up even if there is no
server listening.

Now we also need to delete the "duplicate server on topic" test - it's
actually fine to have multiple servers listening on the one topic.
2013-06-15 18:56:11 +01:00
Mark McLoughlin cbfb1452a4 Move files to new locations for oslo.messaging 2013-06-15 08:43:54 +01:00