Move the public API out of oslo.messaging to oslo_messaging. Retain
the ability to import from the old namespace package for backwards
compatibility for this release cycle.
bp/drop-namespace-packages
Co-authored-by: Mehdi Abaakouk <mehdi.abaakouk@enovance.com>
Change-Id: Ia562010c152a214f1c0fed767c82022c7c2c52e7
It's up to the driver to set a suitable timeout for polling the broker.
It may differ from the timeout requested by the driver's caller, as
long as the caller's timeout is respected.
This change also adds a new driver listener API so that the listener
can be stopped cleanly, especially when timeout=None.
Closes bug: #1400268
Closes bug: #1399257
Change-Id: I674c0def1efb420c293897d49683593a0b10e291
We can now set the pool name of a notification listener
to create multiple groups/pools of listeners consuming notifications,
where each group/pool receives only one copy of each notification.
The AMQP implementation of this sets queue_name to the pool name.
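The pool semantics described above can be sketched in memory. This is
only an illustration: SketchNotifier and its methods are hypothetical
names, and delivery within a pool is assumed to be round-robin, which
the text above does not specify.

```python
import collections
import itertools


class SketchNotifier:
    """Hypothetical in-memory stand-in for a broker; not the driver."""

    def __init__(self):
        self._pools = collections.defaultdict(list)
        self._cursors = {}

    def add_listener(self, pool, inbox):
        self._pools[pool].append(inbox)
        # Rebuild the round-robin cursor so the new listener is included.
        self._cursors[pool] = itertools.cycle(self._pools[pool])

    def notify(self, message):
        # One copy per pool, handed to a single listener of that pool.
        for pool in self._pools:
            next(self._cursors[pool]).append(message)


broker = SketchNotifier()
audit_a, audit_b, billing = [], [], []
broker.add_listener("audit", audit_a)
broker.add_listener("audit", audit_b)
broker.add_listener("billing", billing)
for n in range(4):
    broker.notify("event-%d" % n)
```

Here the two "audit" listeners split the four notifications between
them, while the single "billing" listener receives all four.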
Implements blueprint notification-listener-pools
Closes-bug: #1356226
Change-Id: I8dc0549f5550f684a261c78c58737b798fcdd656
The exchanges lock in the fake driver was being disregarded,
probably due to a typo.
Closes-Bug: #1342088
Change-Id: I45261fce501a867b9253112e1608b14ed5afea26
There is a non-printable character (c2a0) in a comment line, which
in a new Neutron test case causes a StringException. There are
three issues here.
1) Encoding issue in the file.
2) No detection of encoding issues in files.
3) Neutron tests mask the failure detail.
This addresses the first issue. Bug 1334798 in Neutron was created
for the last issue. The Oslo team is aware of the second issue.
Related-Bug: #1334798
Change-Id: I8cfa6c93085a26666a8b4bee06dcd52709d04a95
The fanout queues of the fake driver are created at the first executor
poll. With the eventlet executor and the fake driver, if the sender
delivers a fanout message before the first poll, the message goes to the
topic queue instead of the server fanout queue.
This change fixes that by ensuring that all queues exist when the
listener is created.
Closes bug #1331453
Change-Id: I92e7c01dd87d634b741bbcaea92f48730fdd555e
Passing mutable objects as default args is a known Python pitfall.
We'd better avoid this.
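A minimal illustration of the pitfall, using hypothetical helper
functions that are not part of the driver:

```python
def add_topic_buggy(topic, topics=[]):
    # The default list is created once, when the function is defined,
    # and is shared by every call that omits `topics`.
    topics.append(topic)
    return topics


def add_topic_fixed(topic, topics=None):
    # Use None as the sentinel and build a fresh list on each call.
    if topics is None:
        topics = []
    topics.append(topic)
    return topics


first_buggy = list(add_topic_buggy("a"))
second_buggy = list(add_topic_buggy("b"))   # still contains "a"

first_fixed = add_topic_fixed("a")
second_fixed = add_topic_fixed("b")         # starts from an empty list
```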
Change-Id: I67cc0774a65886ef9fce0b72e52157b622248a85
Closes-Bug: #1327473
When an RPC client tries to make an RPC call and the server is
unreachable, the call hangs until the server comes back.
In most cases this is the desired behavior.
But sometimes we would prefer that the library raise an exception after
a certain number of retries.
For example in ceilometer, when publishing a
storage.objects.incoming.bytes sample from the Swift middleware to an
AMQP topic, you might not want to block the Swift client if the AMQP broker
is unavailable. Instead, you might have a queueing policy whereby,
if a single reconnection attempt fails, we queue the sample in memory and
try again when another sample is to be published.
This patch is the oslo.messaging part that allows this.
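As a rough sketch of the idea, assuming a hypothetical
connect_with_retry() helper and BrokerUnavailable exception (neither is
the library's API, and a real implementation would also wait between
attempts):

```python
class BrokerUnavailable(Exception):
    """Hypothetical error raised once the retry budget is exhausted."""


def connect_with_retry(connect, retry=None):
    """Call connect() until it succeeds.

    retry=None retries forever (the blocking behaviour described above);
    retry=N allows N retries after the first attempt, then raises.
    """
    attempts = 0
    while True:
        try:
            return connect()
        except ConnectionError as exc:
            if retry is not None and attempts >= retry:
                raise BrokerUnavailable(
                    "gave up after %d attempt(s)" % (attempts + 1)) from exc
            attempts += 1


def always_down():
    raise ConnectionError("broker unreachable")


pending = []   # in-memory queue of samples that could not be sent
try:
    connect_with_retry(always_down, retry=0)
except BrokerUnavailable:
    pending.append("storage.objects.incoming.bytes sample")
```

With retry=0 the caller gets an exception after a single failed attempt
and can apply its own queueing policy instead of blocking.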
Closes bug #1282639
Co-Authored-By: Ala Rezmerita <ala.rezmerita@cloudwatt.com>
Change-Id: I32086d0abf141c368343bf225d4b021da496c020
For asynchronous programming, a timeout parameter is required on the listener
so that it can be stopped at exit. poll() returns None on timeout.
I plan to use it in my new asyncio (Trollius) executor:
https://review.openstack.org/#/c/70983/
See also the related blueprint for the rationale:
https://wiki.openstack.org/wiki/Oslo/blueprints/asyncio
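The poll() contract described above can be sketched with the standard
library queue module. SketchListener and deliver() are hypothetical
names used only for illustration, not the driver's classes:

```python
import queue


class SketchListener:
    """Hypothetical listener; not the driver's actual class."""

    def __init__(self):
        self._incoming = queue.Queue()

    def deliver(self, message):
        self._incoming.put(message)

    def poll(self, timeout=None):
        # timeout=None blocks until a message arrives; otherwise give up
        # after `timeout` seconds and report that by returning None.
        try:
            return self._incoming.get(timeout=timeout)
        except queue.Empty:
            return None


listener = SketchListener()
empty_result = listener.poll(timeout=0.01)   # nothing queued yet
listener.deliver({"event": "ping"})
message = listener.poll(timeout=0.01)
```

An executor loop can then check a stop flag each time poll() returns
None rather than blocking forever.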
Change-Id: I918ae3c267743a0eaed1d6a210c79fb4a0eb8733
We don't need vim modelines in each source file; they can be set in
the user's vimrc.
Change-Id: I615f1ffc7e097f1ddbd908a512f1cb988489e871
Closes-Bug: #1229324
In commit d8d2ad9 we added support for notification listener endpoint
methods to return REQUEUE, but if a driver does not support this we
raise NotImplementedError when the application attempts to requeue
a message.
This requeuing behaviour might only be used by an application in
unusual, exceptional circumstances, so the error could catch users by
surprise.
Instead, let's require the application to assert that it needs this
feature in advance and raise NotImplementedError at that point if the
driver doesn't support it.
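A minimal sketch of this "assert the feature up front" pattern. The
class names, the supports_requeue attribute, and the make_listener()
helper are assumptions made for illustration, not the library's code:

```python
class BaseSketchDriver:
    """Hypothetical driver base class that cannot requeue."""

    supports_requeue = False

    def require_features(self, requeue=False):
        # Fail at setup time, not when a message is first requeued.
        if requeue and not self.supports_requeue:
            raise NotImplementedError(
                "%s does not support requeue" % type(self).__name__)


class RequeueingDriver(BaseSketchDriver):
    supports_requeue = True


def make_listener(driver, allow_requeue=False):
    driver.require_features(requeue=allow_requeue)
    return {"driver": driver, "allow_requeue": allow_requeue}


listener = make_listener(RequeueingDriver(), allow_requeue=True)

try:
    make_listener(BaseSketchDriver(), allow_requeue=True)
except NotImplementedError as exc:
    early_error = str(exc)
```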
Change-Id: Id0bb0e57d2dcc1ec7d752e98c9b1e8e48d99f35c
This patch allows the FakeListener to listen on multiple FakeExchanges.
Partial implements blueprint notification-subscriber-server
Change-Id: I9830cc01efdd931f6628853f7e84b947d7b855c9
This patch allows the notification received by the notification
listener to be requeued.
Partial implements blueprint notification-subscriber-server
Change-Id: I49c4ba91224c280e479edb19289ccb337a2ab843
This patch adds an abstraction layer to acknowledge a message.
Partial implements blueprint notification-subscriber-server
Change-Id: I6e37780cc28737cfd56b6719ec8d9cebbc9bb278
This patch allows the fake driver to consume multiple topics
with one listener.
Partial implements blueprint notification-subscriber-server
Change-Id: Ib52dc181e10b487854fbb398eda9f758232a1251
The driver reply() method is actually passed a full sys.exc_info()
tuple.
This was masked in the unit tests because the driver ended up basically
doing:
    raise (ValueError, ValueError, ...)
which caused a new ValueError to be instantiated and the test was
satisfied. However, if an exception type has some required arguments,
you'll get a TypeError when this statement attempts to instantiate it
with no arguments.
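The failure mode can be reproduced in isolation. The sketch below is
written for Python 3, where the tuple form of raise shown above is not
available, so it calls the exception type directly to show the
TypeError and then re-raises the captured instance instead. NeedsArgs,
capture() and reraise() are hypothetical names:

```python
import sys


class NeedsArgs(Exception):
    """Exception type whose constructor has a required argument."""

    def __init__(self, code):
        super().__init__("failed with code %s" % code)
        self.code = code


def capture():
    try:
        raise NeedsArgs(42)
    except NeedsArgs:
        return sys.exc_info()


def reraise(exc_info):
    # Re-raise the original instance; never re-instantiate the type.
    exc_type, exc_value, tb = exc_info
    raise exc_value.with_traceback(tb)


failure = capture()

try:
    failure[0]()   # instantiating the bare type with no arguments
except TypeError as exc:
    instantiate_error = exc

try:
    reraise(failure)
except NeedsArgs as exc:
    reraised = exc
```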
Change-Id: I4af9c5084954d7b9c5f02cdae3387d17c206985b
Nova's cells/rpc_driver.py has some code which allows users of the REST
API to update elements of a cell's transport URL (say, the host name of
the message broker) stored in the database. To achieve this, it has
a parse_transport_url() method which breaks the URL into its constituent
parts and an unparse_transport_url() which re-forms it again after
updating some of its parts.
This is all fine and, since it's fairly specialized, it wouldn't be a
big deal to leave this code in Nova for now ... except the unparse
method looks at CONF.rpc_backend to know what scheme to use in the
returned URL if no backend was specified.
oslo.messaging registers the rpc_backend option, but the ability to
reference any option registered by the library should not be relied upon
by users of the library. Imagine, for instance, if we renamed the option
in future (with backwards compat for old configurations), then this
would mean API breakage.
So, long story short: an API along these lines makes some sense,
especially since not having it would mean we'd need to add some way to
query the name of the transport driver.
In this commit, we add a simple new TransportURL class:
    >>> url = messaging.TransportURL.parse(cfg.CONF, 'foo:///')
    >>> str(url), url
    ('foo:///', <TransportURL transport='foo'>)
    >>> url.hosts.append(messaging.TransportHost(hostname='localhost'))
    >>> str(url), url
    ('foo://localhost/', <TransportURL transport='foo', hosts=[<TransportHost hostname='localhost'>]>)
    >>> url.transport = None
    >>> str(url), url
    ('kombu://localhost/', <TransportURL transport='kombu', hosts=[<TransportHost hostname='localhost'>]>)
    >>> cfg.CONF.set_override('rpc_backend', 'bar')
    >>> str(url), url
    ('bar://localhost/', <TransportURL transport='bar', hosts=[<TransportHost hostname='localhost'>]>)
The TransportURL.parse() method equates to parse_transport_url() and
TransportURL.__str__() equates to unparse_transport_url().
The transport drivers are also updated to take a TransportURL as a
required argument, which simplifies the handling of transport URLs in
the drivers.
Change-Id: Ic04173476329858e4a2c2d2707e9d4aeb212d127
My original thinking was that if you're using the exchange name to
separate two instances of an application (so their queues don't
collide) then the exchange name is pretty key to transport
configuration. In fact, it's really a virtual host that you'd use for
this (at least in the case of rabbit and qpid).
Also, Nova's cells code has already moved ahead with the assumption that
the path specifies a virtual host, so it'd only make sense to deviate
from that if there was a really good reason to.
Change-Id: Ic8b5dc3538b6b17afec524047acc2efa76366377
Currently we have an allowed_rpc_exception_modules configuration variable
which we use to configure a per-project list of modules which we will
allow exceptions to be instantiated from when deserializing remote
errors.
It makes no sense for this to be user configurable. Instead, the list of
modules should be set when you create a transport.
Closes-Bug: #1031719
Change-Id: Ib40e92cb920996ec5e8f63d6f2cbd88fd01a90f2
Review I4e7b19dc730342091fd70a717065741d56da4555 gives a lot of the
background here, but the idea is that some exceptions raised by an RPC
endpoint method do not indicate any sort of failure and should not be
logged by the server as an error.
The classic example of this is conductor's instance_get() method raising
InstanceNotFound. This is perfectly normal and should not be considered
an error.
The new API is a decorator which you can use with RPC endpoint methods
to indicate which exceptions are expected:
    @messaging.expected_exceptions(InstanceNotFound)
    def instance_get(self, context, instance_id):
        ...
but we also need to expose the ExpectedException type itself so that
direct "local" users of the endpoint class know what type will be used
to wrap expected exceptions. For example, Nova has an ExceptionHelper
class which unwraps the original exception from an ExpectedException and
re-raises it.
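To make the wrapping and unwrapping concrete, here is a self-contained
sketch of how such a decorator could work. It reuses the names from the
text above but is an illustration only, not the library's
implementation; Endpoint and call_locally() are hypothetical:

```python
import functools
import sys


class ExpectedException(Exception):
    """Wraps an exception that the endpoint declared as expected."""

    def __init__(self):
        super().__init__()
        # Captured while the original exception is still being handled.
        self.exc_info = sys.exc_info()


def expected_exceptions(*exceptions):
    def outer(func):
        @functools.wraps(func)
        def inner(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except exceptions:
                # Wrap so that a server can skip error logging.
                raise ExpectedException()
        return inner
    return outer


class InstanceNotFound(Exception):
    pass


class Endpoint:
    @expected_exceptions(InstanceNotFound)
    def instance_get(self, context, instance_id):
        raise InstanceNotFound(instance_id)


def call_locally(endpoint, *args):
    # A "local" caller unwraps and re-raises the original exception.
    try:
        return endpoint.instance_get(*args)
    except ExpectedException as wrapped:
        raise wrapped.exc_info[1]
```

A direct caller of Endpoint.instance_get() sees ExpectedException,
while call_locally() surfaces the original InstanceNotFound.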
I've changed from client_exceptions() and ClientException to make the
intent clearer. I felt that the "client" naming gave the
impression it was intended for use on the client side.
Change-Id: Ieec4600bd6b70cf31ac7925a98a517b84acada4d
Notifications are an unusual case in that we need users to manually opt
in to new incompatible message formats by editing configuration because
there may be external consumers expecting the old format.
Add a send_notification() method to the driver interface and add a
format version parameter to the method, to make it clear that this
version selection is specifically for notifications.
In the case of the rabbit/qpid drivers, the 2.0 format is where we added
the message envelope.
Change-Id: Ib4925c308b1252503749962aa16f043281f2b429
The target preconditions (e.g. you need at least a topic to send) are
the same for all drivers, so enforce them before we ever call into a
driver.
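A small sketch of checking the precondition once, ahead of any driver
call. Target, InvalidTarget, RecordingDriver and send() are illustrative
names assumed here, and only the "topic required to send" rule comes
from the text above:

```python
import collections

# Hypothetical, simplified target with just the fields used here.
Target = collections.namedtuple("Target", ["topic", "server"])


class InvalidTarget(ValueError):
    pass


class RecordingDriver:
    """Stand-in driver that records what it was asked to send."""

    def __init__(self):
        self.sent = []

    def send(self, target, message):
        self.sent.append((target, message))


def send(driver, target, message):
    # Same rule for every driver, so enforce it before calling one.
    if not target.topic:
        raise InvalidTarget("a topic is required to send")
    return driver.send(target, message)


driver = RecordingDriver()
send(driver, Target(topic="compute", server=None), "ping")

try:
    send(driver, Target(topic=None, server=None), "lost")
except InvalidTarget:
    rejected = True
```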
Change-Id: Ic4e9bd94bd9f060ec0662d2bb778c699903dddc4
We appear to not have a use for this. I had originally thought we might
use this to ack messages once they've been processed and replied to, but
we actually have always acked messages as soon as they have been
deserialized and queued for dispatching.
Change-Id: I8e1fd565814f3b5e3ba0f1bc77e62ed52ff08661
Pretty obvious that we need this.
The rabbit/qpid implementations just empty the connection pool, in the
same way their module-level cleanup() methods do now.
Change-Id: I70ba5cab3eb7a30f74cdd6cafe60087769a77b57
By storing the reply_q on the listener, we were assuming there was only
one message being dispatched at a time. Put it on the incoming message
instead and use it directly in reply().
Currently, if there are no servers listening on a topic then a message
to that topic just gets dropped by the fake driver.
This makes the tests intermittently fail if the server takes longer to
start.
Turn things on their head so that the client always creates the queues
on the exchange so that messages can get queued up even if there is no
server listening.
Now we also need to delete the "duplicate server on topic" test - it's
actually fine to have multiple servers listening on the one topic.
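The "client creates the queue" behaviour can be sketched as follows.
SketchExchange is a hypothetical stand-in and much simpler than the
fake driver's real exchange:

```python
import collections


class SketchExchange:
    """Hypothetical fake exchange; not the fake driver's real class."""

    def __init__(self):
        self._topic_queues = collections.defaultdict(collections.deque)

    def deliver_message(self, topic, message):
        # Sending creates the queue on demand, so nothing is dropped
        # when no server is listening yet.
        self._topic_queues[topic].append(message)

    def poll(self, topic):
        pending = self._topic_queues[topic]
        return pending.popleft() if pending else None


exchange = SketchExchange()
exchange.deliver_message("compute", "msg-1")   # no server started yet
late_server_got = exchange.poll("compute")
nothing_left = exchange.poll("compute")
```

Because the queue belongs to the topic rather than to a server, several
servers can poll the same topic and each message goes to whichever one
polls first.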