Commit Graph

66 Commits

Author SHA1 Message Date
Doug Hellmann e55a83e832 Move files out of the namespace package
Move the public API out of oslo.messaging to oslo_messaging. Retain
the ability to import from the old namespace package for backwards
compatibility for this release cycle.

bp/drop-namespace-packages

Co-authored-by: Mehdi Abaakouk <mehdi.abaakouk@enovance.com>
Change-Id: Ia562010c152a214f1c0fed767c82022c7c2c52e7
2015-01-12 12:50:41 -05:00
Jenkins 8102f25dd2 Merge "Fix some comments in a backporting review session" 2015-01-08 19:23:39 +00:00
Mehdi Abaakouk c18f9f7c61 Don't log each received message
oslo.utils.strutils.mask_password takes too much time on
big payloads (nova-conductor can receive payloads of ~66k in
largeops jobs, for example).

This change removes this logging until we make mask_password
more efficient or make the oslo.messaging logging smart enough
not to log everything.

Change-Id: Ib1f1d70c5cb820e8ff2de10e6064037808ea1f3a
Closes-bug: #1408362
2015-01-07 18:27:32 +01:00
Jie Li 3e2d142a87 Fix some comments in a backporting review session
* add i18n support to some exception messages
* remove the return value of DecayingTimer.start()

Relates to review: I898a236a384b3466147026abc7a1ee21801e8ca1

Change-Id: I7adf5478732f1f46db1009b059b66ff8af6ecdc3
2014-12-27 12:01:25 +08:00
Mehdi Abaakouk 15aa5cbda8 The executor doesn't need to set the timeout
It's up to the driver to set a suitable timeout for polling the broker.
This timeout can differ from the one requested by the driver's
caller, as long as the caller's timeout is respected.

This change also adds a new driver listener API to be able
to stop it cleanly, especially in the case of timeout=None.

Closes bug: #1400268
Closes bug: #1399257
Change-Id: I674c0def1efb420c293897d49683593a0b10e291
2014-12-08 12:59:33 +01:00
Mehdi Abaakouk 023b7f44e2 rabbit: more precise iterconsume timeout
iterconsume always sets the kombu timeout to 1 second,
even when the requested timeout is more precise or < 1 second.

This change fixes that.

Related bug: #1400268
Related bug: #1399257

Change-Id: I157dab80cdb4afcf9a5f26fa900f96f0696db502
2014-12-08 12:08:01 +01:00
Mehdi Abaakouk cb78f2e43d Rabbit: Fixes debug message format
Add the missing 's' for the message format and
just print the number of queues instead of printing a list
full of '<object object at 0xXXXXXXXXXX>'.

Change-Id: Idaab4057bc6a41523a1944ae0c8e15e5e885b390
2014-12-02 15:32:34 +01:00
Jenkins 42a2df15dd Merge "Have the timeout decrement inside the wait() method" 2014-12-02 11:10:50 +00:00
Jenkins f2234d291f Merge "Show what the threshold is being increased to" 2014-12-02 07:05:38 +00:00
Joshua Harlow c7d99bf28f Show what the threshold is being increased to
Change-Id: I2182ca88126d636970885fe4403779a879f32aa7
2014-12-01 21:14:23 -08:00
Joshua Harlow f1c7e78a56 Have the timeout decrement inside the wait() method
Currently it appears the timeout provided to the wait()
method will be reused X number of times (where X may be
indeterminate depending on reconnections, loss of messages
and so on). Instead of reusing it, which can potentially
result in an infinite number of calls, have a new object be
used that will cause the subsequent timeouts used elsewhere
in the wait function to actually decay correctly.
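
The decaying behaviour described above can be sketched roughly as follows. This is only an illustration: `Deadline`, `wait` and `poll_once` are hypothetical names, not the library's actual classes or signatures.

```python
import time


class Deadline:
    """Hypothetical helper: tracks how much of an overall timeout is left."""

    def __init__(self, timeout, clock=time.monotonic):
        self._clock = clock
        # None means "no deadline": wait forever.
        self._end = None if timeout is None else clock() + timeout

    def remaining(self):
        if self._end is None:
            return None
        # Never hand a negative timeout to the caller.
        return max(0.0, self._end - self._clock())


def wait(poll_once, timeout):
    """Call poll_once(remaining) until it returns a result or time runs out."""
    deadline = Deadline(timeout)
    while True:
        left = deadline.remaining()
        result = poll_once(left)  # each attempt sees a shrinking budget
        if result is not None:
            return result
        if left is not None and left <= 0:
            raise TimeoutError("no reply within %s seconds" % timeout)
```

Because every retry asks the same `Deadline` for the time left, repeated reconnections cannot extend the total wait beyond the caller's timeout.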

Closes-bug: #1379394

Change-Id: I12c4ea1eef6b857d12246db0483adaf7c87e740c
2014-11-27 11:22:20 -08:00
Jenkins 0b839c57b7 Merge "Notification listener pools" 2014-11-27 11:56:42 +00:00
Mehdi Abaakouk 7306680bfa Remove useless params of the ConnectionContext
The ConnectionContext depends on the ConnectionPool.
A ConnectionPool already knows the connection classes,
the configuration object and the url needed to create a new connection.

But we pass this information again when we create a ConnectionContext.
This is useless; we can reuse it from the connection pool even when we
want a non-pooled connection.

This change removes the useless ConnectionContext parameters.
It also ensures that connections created with or without the pool
are created in the same way and in only one place (the create method
of the connection pool).

Change-Id: I4bd43d202fa2774ad5dcb0f8dd05e58ba60c6009
2014-11-18 13:58:33 +01:00
Mehdi Abaakouk 30e0aea877 Notification listener pools
We can now set the pool name of a notification listener
to create multiple groups/pools of listeners consuming notifications,
so that each group/pool receives only one copy of each notification.

The AMQP implementation of that is to set queue_name with the pool name.

Implements blueprint notification-listener-pools
Closes-bug: #1356226

Change-Id: I8dc0549f5550f684a261c78c58737b798fcdd656
2014-11-14 10:20:18 +01:00
Mehdi Abaakouk f61f7c570f Don't put the message payload into warning log
When a caller msg doesn't exist anymore but a reply is ready for it,
the amqp driver dumps the whole message into a logging.warn. That can be
quite large in some cases.

This change writes only the message id at the WARN level and puts the queues
and the messages at the debug level.

Change-Id: Ibcc6b066171cdea48f102ca1bd85f81c639fbbb5
2014-10-28 11:38:14 +01:00
Abhijeet Malawade f37800943e Cleanup listener after stopping rpc server
If you don't close the AMQP connection, it remains open, and
the next time messages are sent on the listening topic, some of
them will not be processed because there is no dispatcher running
to process them.

Closes-Bug: #1335086
Change-Id: I1f39eedf1500b6b6209ae0222f32e08e304895e0
2014-09-23 03:35:43 -07:00
Alexei Kornienko 7381ccdc2a Should not send replies for cast messages
Cast messages do not contain a msg_id, and we're currently
sending messages to the default exchange with an empty msg_id.
To fix this, we should not send replies if there is no
msg_id in the message.
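
A minimal sketch of the server-side check described above, assuming the incoming message is a plain dict. The function name `maybe_reply`, the `send` callable and the `_msg_id` key spelling are assumptions made for illustration.

```python
def maybe_reply(message, result, send):
    """Send a reply only for calls; casts carry no msg_id and get none."""
    msg_id = message.get("_msg_id")
    if not msg_id:
        # Cast message: nobody is waiting, so do not publish anything.
        return False
    send({"_msg_id": msg_id, "result": result})
    return True
```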

Change-Id: I5b1142029d2c718c3929cf6cf1f6e958b95a5c96
Closes-bug: #1355058
2014-08-19 13:30:40 +03:00
Victor Stinner 71588adbb5 Fix AMQPListener for polling with timeout
On timeout, poll() should return None, not raise an exception (Timeout).

Also add a unit test.

Change-Id: I1ed5ae2f093841111f0b691dddb879c16d218b73
2014-07-07 14:23:00 +00:00
ChangBo Guo(gcb) 821ee096a6 Removes the use of mutables as default args
Passing mutable objects as default args is a known Python pitfall.
We'd better avoid this.
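
For readers unfamiliar with the pitfall, here is a small self-contained illustration. The function names are made up for this example and do not come from the changed code.

```python
def append_shared(item, bucket=[]):
    # The default list is created once, at function definition time,
    # so every call without an explicit bucket mutates the same object.
    bucket.append(item)
    return bucket


def append_fresh(item, bucket=None):
    # Using None as the default and creating the list inside the
    # function gives each call its own list.
    if bucket is None:
        bucket = []
    bucket.append(item)
    return bucket
```

Calling `append_shared` twice without a bucket returns a list containing both items, whereas `append_fresh` returns a one-element list each time.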

Change-Id: I67cc0774a65886ef9fce0b72e52157b622248a85
Closes-Bug: #1327473
2014-06-21 11:41:36 +08:00
Jenkins a64692689a Merge "Add an optional timeout parameter to Listener.poll" 2014-06-20 18:50:16 +00:00
Christian Berendt 409108c74f replace string format arguments with function parameters
There are files containing string format arguments inside
logging messages. Using logging function parameters should
be preferred.
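
The difference can be illustrated with the standard logging module. `CountingPayload`, `log_eager` and `log_lazy` are hypothetical names used only in this sketch.

```python
import logging

LOG = logging.getLogger(__name__)


class CountingPayload:
    """Counts how often it is rendered to a string."""

    def __init__(self):
        self.renders = 0

    def __str__(self):
        self.renders += 1
        return "payload"


def log_eager(obj):
    # The string is formatted before LOG.debug() is even called,
    # whether or not debug logging is enabled.
    LOG.debug("received %s" % obj)


def log_lazy(obj):
    # The logger formats the message only if the record is emitted.
    LOG.debug("received %s", obj)
```

With the logger level above DEBUG, `log_eager` still pays the cost of rendering the object, while `log_lazy` does not.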

Change-Id: I4a7ce6916aefb2f2d445f0ebd977c824b1c51e24
Partial-Bug: #1321274
2014-06-20 14:18:40 +01:00
Mehdi Abaakouk 1ea9c35ab4 Transport reconnection retries for notification
This patch adds support for reconnection retries to the
messaging notifier.

Related bug #1282639
Change-Id: Ia30331f8306ff0f6952d83ef42ff8bee6b900427
2014-06-18 18:41:33 +02:00
Mehdi Abaakouk 948c05417c Add transport reconnection retries
When an RPC client tries to make an RPC call and the server is unreachable,
the RPC call hangs until the server comes back.

In most cases this is the desired behavior.

But sometimes we would prefer that the library raise an exception after a
certain number of retries.

For example in ceilometer, when publishing a
storage.objects.incoming.bytes sample from the Swift middleware to an
AMQP topic, you might not want to block the Swift client if the AMQP broker
is unavailable - instead, you might have a queueing policy whereby
if a single reconnection attempt fails we queue the sample in memory and
try again when another sample is to be published.

This patch is the oslo.messaging part that allows this.

Closes bug #1282639
Co-Authored-By: Ala Rezmerita <ala.rezmerita@cloudwatt.com>

Change-Id: I32086d0abf141c368343bf225d4b021da496c020
2014-06-13 16:21:59 +02:00
Victor Stinner 7fe2ef7334 Add an optional timeout parameter to Listener.poll
For asynchronous programming, a timeout parameter is required on the listener
so that it can be stopped at exit. poll() returns None on timeout.
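
As a rough illustration of this contract, a listener backed by an in-memory queue could look like the sketch below. `QueueListener` and `deliver` are hypothetical and are not the driver classes touched by this change.

```python
import queue


class QueueListener:
    """Toy listener: poll() returns a message, or None on timeout."""

    def __init__(self):
        self._incoming = queue.Queue()

    def deliver(self, message):
        self._incoming.put(message)

    def poll(self, timeout=None):
        try:
            # timeout=None blocks until a message arrives.
            return self._incoming.get(timeout=timeout)
        except queue.Empty:
            # Report the timeout as None instead of raising, so the
            # caller's loop can check a stop flag and poll again.
            return None
```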

I plan to use it in my new asyncio (Trollius) executor:
https://review.openstack.org/#/c/70983/

See also the related blueprint for the rationale:
https://wiki.openstack.org/wiki/Oslo/blueprints/asyncio

Change-Id: I918ae3c267743a0eaed1d6a210c79fb4a0eb8733
2014-06-13 10:20:39 +02:00
Mehdi Abaakouk f4da213539 Remove amqp default exchange hack
This change removes the hack that sets the default exchange of a transport in the
amqp driver. The rabbit and qpid drivers no longer use the configuration object to
get the default exchange; instead, the value passed to the driver constructor is
used in all amqp publisher and consumer classes/methods that need it.

Closes-bug: #1256345
Change-Id: Iba54ca79a49f8545854205c1451b2403735c1006
2014-05-28 14:12:30 +02:00
Jenkins 17375f41ce Merge "Full support of multiple hosts in transport url" 2014-05-05 15:46:34 +00:00
ChangBo Guo(gcb) 23edc1b4ce Remove str() from LOG.* and exceptions
gettextutils is expecting to receive unicode strings
rather than basestrings. A basestring can cause an
unhandled exception in the logging code. To help avoid
such issues we should remove str() from LOG.* messages and
exceptions. We have verified that the %s formatting code
properly handles converting strings to unicode where necessary.

Copied from https://review.openstack.org/#/c/77722

Change-Id: I082af5c9ae8bf9859382c2c387b10b48358e10b3
Related-Bug: #1286306
2014-04-27 10:21:21 +08:00
Mehdi Abaakouk 53b9d741a8 Full support of multiple hosts in transport url
This patch adds support for multiple hosts in the transport url for the rabbit and
qpid drivers. It also fixes the amqp connection pool management to allow
one pool per transport.

Implements blueprint multiple-hosts-support-in-url

Co-Authored-By: Ala Rezmerita <ala.rezmerita@cloudwatt.com>
Change-Id: I5aff24d292b67a7b65e33e7083e245efbbe82024
2014-04-22 17:02:52 +02:00
Mark McLoughlin d5dcfe91db Use driver's notify_send() method again
Drivers have a notify_send() method which allows for special semantics
to be implemented for notification sending. During the port over to
oslo.messaging, it appears we stopped using this and switched to just
using topic_send() instead.

Let's restore the use of notify_send() so that we don't lose behaviours
like I9f8dfd6eaf3996be58fecff6ad91508bdcef23f3 added.

Change-Id: Id0ad0431d3e2b1bfe21723e7f3b1842c2d3a9546
2014-03-18 07:17:49 +00:00
Mark McLoughlin 5464229e63 Do not leak _unique_id out of amqp drivers
In commit d8d2ad9 we delayed when a message's unique ID gets added to
the duplicate message cache in order to allow for message requeueing.

However, as part of this, we exposed the private _unique_id field
outside of the driver. This commit reverses that change by storing
the ID in the AMQPIncomingMessage object.

Change-Id: Ibeb7896de7ad9abf3c6a43495c1a87aabb762c0d
2014-03-03 07:18:54 -08:00
Mehdi Abaakouk d8d2ad95d7 Allow to requeue the notification message
This patch allows requeueing the notification received by the
notification listener.

Partial implements blueprint notification-subscriber-server

Change-Id: I49c4ba91224c280e479edb19289ccb337a2ab843
2014-03-03 09:27:57 +01:00
Mehdi Abaakouk e785a5d994 Make the dispatcher responsible of the message ack
This patch makes the dispatcher responsible for the message
acknowledgement.

This is the preliminary step to be able to requeue messages.

Partial implements blueprint notification-subscriber-server

Change-Id: If74b47d5e23976d407deb27df7395b1982963c75
2014-02-18 08:31:30 +01:00
Mehdi Abaakouk 0d102aa361 Abstract the acknowledge layer of a message
This patch adds an abstraction layer to acknowledge a message.

Partial implements blueprint notification-subscriber-server

Change-Id: I6e37780cc28737cfd56b6719ec8d9cebbc9bb278
2014-02-14 16:06:26 +01:00
Mehdi Abaakouk 9f58e2c3fe Implements notification listener and dispatcher
This patch makes it possible to quickly create a listener to receive
notification messages.

Example of the api:

class Endpoint(object):
    def warn(self, ctxt, publisher_id, event_type, payload):
        do_something(payload)

target = messaging.Target(topic='notifications', exchange='cinder')
listener = notify.get_notification_listener(transport, [target],
                                            [Endpoint()],
                                            executor,
                                            serializer)

Implements blueprint notification-subscriber-server

Change-Id: I434bc487c382a2048670df726d9bebd640150bb9
2014-02-14 16:06:26 +01:00
Mehdi Abaakouk 11a90eabc9 Allow fake driver to consume multiple topics
This patch allows the fake driver to consume multiple topics
with one listener.

Partial implements blueprint notification-subscriber-server

Change-Id: Ib52dc181e10b487854fbb398eda9f758232a1251
2014-01-30 13:40:42 +01:00
Chang Bo Guo 2278ce0b95 Use six.moves.queue instead of Queue
The Queue module has been renamed to queue in Python 3.
Use six.moves.queue to fit the change.

http://docs.python.org/2/library/queue.html#module-Queue

Change-Id: I2940f34d161b2e3cbc5464619f76e6adea4ef9f6
2013-12-07 19:35:41 -08:00
Chang Bo Guo 0b078b6062 Fix some typos and adjust capitalization
Change-Id: I61cf108f9746fc44a08d83e11d44ed1007a6a1fa
2013-11-03 07:07:51 -08:00
Mark McLoughlin 7914181398 Properly handle transport URL config on the client
On the client side, in the rabbit and qpid drivers, we use a connection
pool to avoid opening a connection for each message we send. However,
there is only currently one connection pool per process:

 def get_connection_pool(conf, connection_cls):
     with _pool_create_sem:
         # Make sure only one thread tries to create the connection pool.
         if not connection_cls.pool:
             connection_cls.pool = ConnectionPool(conf, connection_cls)
     return connection_cls.pool

This is a nasty artifact of the original RPC having no concept of a
transport context - everything was a global. We'll fix this soon enough.

In the meantime, we need to make sure we only use this connection pool
where we're using the default transport configuration from the
config file - i.e. not where we supply a transport URL.

The use case here is cells - we send messages to a remote cell by
connecting to it using a transport URL. In our devstack testing, the
two cells are on the same Rabbit broker but under different virtual
hosts. Because we were always using the connection pool on the client
side, we were seeing both cells always send messages to the '/' virtual
host.

Note - avoiding the connection pool in the case of cells is the same
behaviour as the current RPC code:

 def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
     ...
     with ConnectionContext(conf, connection_pool, pooled=False,
                            server_params=server_params) as conn:

Change-Id: I2f35b45ef237bb85ab8faf58a408c03fcb1de9d7
2013-10-25 07:18:36 +01:00
Mark McLoughlin 2ec1fb7572 Properly handle transport:///vhost URL
Currently, if we are supplied with a transport URL with only the virtual
host specified, we completely ignore it. Instead, the behaviour should
be that we use that virtual host with the host, port and credentials
from the config file.
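
The intended fallback can be sketched with the standard library URL parser. This is only an illustration under stated assumptions: `resolve_connection_params` and the layout of the `defaults` dict are hypothetical and do not reflect the driver's real code.

```python
from urllib.parse import urlsplit


def resolve_connection_params(url, defaults):
    """Take each field from the URL if present, else from the config defaults."""
    parts = urlsplit(url)
    # The URL path carries the virtual host; drop the leading slash.
    vhost = parts.path[1:] if parts.path.startswith("/") else parts.path
    return {
        "hostname": parts.hostname or defaults["hostname"],
        "port": parts.port or defaults["port"],
        "username": parts.username or defaults["username"],
        "password": parts.password or defaults["password"],
        "virtual_host": vhost or defaults["virtual_host"],
    }
```

With a URL such as `rabbit:///cell1`, only the virtual host comes from the URL and everything else comes from `defaults`.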

Change-Id: Ic97aa511ddf9bce69b1a5069d9f6468f4bd6dd4c
2013-10-23 07:45:16 +01:00
Jenkins 5c2c32010c Merge "Don't include msg_id or reply_q in casts" 2013-08-26 13:16:42 +00:00
Mark McLoughlin aebe53f242 Fix race-condition in rabbit reply processing
Concurrency. Sigh.

A sequence of events like this is possible:

  - We send a request from thread A
  - Thread B, which is waiting for a response, gets scheduled
  - Thread B receives our response and queues it up
  - Thread B receives its own response and drops the connection lock
  - Thread A grabs the connection lock and waits for a response to arrive

The obvious solution is that when we grab the connection lock, we should
check whether a previous lock-holding thread had already received our
response and queued it up.
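
The fix can be pictured with a simplified sketch like the one below. It is not the driver's code: `ReplyWaiter` and the `receive` callable (returning a `(msg_id, reply)` pair) are assumptions, and a real implementation would also handle timeouts.

```python
import threading


class ReplyWaiter:
    """Toy model of several threads sharing one reply connection."""

    def __init__(self, receive):
        self._receive = receive           # blocks, returns (msg_id, reply)
        self._conn_lock = threading.Lock()
        self._queued = {}                 # replies read on behalf of others

    def wait(self, msg_id):
        with self._conn_lock:
            while True:
                # Check first: a previous lock holder may already have
                # received our reply and queued it up.
                if msg_id in self._queued:
                    return self._queued.pop(msg_id)
                other_id, reply = self._receive()
                self._queued[other_id] = reply
```

Without the check at the top of the loop, a thread whose reply was already queued would block on the connection waiting for a message that never arrives.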

Change-Id: I88b0d55d5a40814a84d82ed4f42d5ba85d2ef7e0
2013-08-26 12:47:04 +01:00
Mark McLoughlin f7cf85333c Don't include msg_id or reply_q in casts
On the server side, we only send replies if the request included a
_msg_id key. Also, the _reply_q key is only used when we wish to send a
reply.

So, in order to retain the exact same on-the-wire behaviour and ensure
servers aren't sending replies where none is needed, only include these
keys if we're doing a call (i.e. wait_for_reply=True).
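
On the client side, the rule above might look like this sketch. `build_message` is a hypothetical helper; only the `_msg_id` and `_reply_q` keys and the `wait_for_reply` flag are taken from the description.

```python
import uuid


def build_message(payload, reply_q, wait_for_reply=False):
    """Add reply routing keys only when the caller expects a reply."""
    msg = dict(payload)
    if wait_for_reply:
        # Calls need a unique id and a queue to receive the reply on.
        msg["_msg_id"] = uuid.uuid4().hex
        msg["_reply_q"] = reply_q
    return msg
```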

Change-Id: Iac329493252be7d94b1ebe24f00e4d3f5c61d269
2013-08-26 10:23:06 +01:00
Davanum Srinivas 0555e939be Fix dictionary changed size during iteration
Make a copy of the keys before we operate on it
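
A minimal illustration of the technique, using a made-up cache of expiry times rather than the code that was actually fixed:

```python
def drop_expired(cache, now):
    """Remove entries whose expiry time has passed."""
    # list(cache) snapshots the keys, so deleting from the dict inside
    # the loop cannot raise "dictionary changed size during iteration".
    for key in list(cache):
        if cache[key] <= now:
            del cache[key]
    return cache
```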

Fixes LP# : 1212854

Change-Id: I431ffb3878863e9be6d1a35078f7d7c3edf4b133
2013-08-17 22:04:05 -04:00
Mark McLoughlin c79bd1f24c Fix rabbit driver handling of None, etc. replies
Just like the bug in the fake driver, we have a similar rather
embarrassing and obvious bug - we're currently not allowing endpoint
methods to send replies of None, '', False, [], {}, etc.

Change-Id: Icf0abdfcf122c5757dd3737f66130b3a53769ef6
2013-08-17 21:40:02 +01:00
Mark McLoughlin c846cf35b8 Add a TransportURL class to the public API
Nova's cells/rpc_driver.py has some code which allows users of the REST
API to update elements of a cell's transport URL (say, the host name of
the message broker) stored in the database. To achieve this, it has
a parse_transport_url() method which breaks the URL into its constituent
parts and an unparse_transport_url() which re-forms it again after
updating some of its parts.

This is all fine and, since it's fairly specialized, it wouldn't be a
big deal to leave this code in Nova for now ... except the unparse
method looks at CONF.rpc_backend to know what scheme to use in the
returned URL if no backend was specified.

oslo.messaging registers the rpc_backend option, but the ability to
reference any option registered by the library should not be relied upon
by users of the library. Imagine, for instance, if we renamed the option
in future (with backwards compat for old configurations), then this
would mean API breakage.

So, long story short - an API along these lines makes some sense,
especially since not having it would mean we'd need to add some way to
query the name of the transport driver.

In this commit, we add a simple new TransportURL class:

  >>> url = messaging.TransportURL.parse(cfg.CONF, 'foo:///')
  >>> str(url), url
  ('foo:///', <TransportURL transport='foo'>)
  >>> url.hosts.append(messaging.TransportHost(hostname='localhost'))
  >>> str(url), url
  ('foo://localhost/', <TransportURL transport='foo', hosts=[<TransportHost hostname='localhost'>]>)
  >>> url.transport = None
  >>> str(url), url
  ('kombu://localhost/', <TransportURL transport='kombu', hosts=[<TransportHost hostname='localhost'>]>)
  >>> cfg.CONF.set_override('rpc_backend', 'bar')
  >>> str(url), url
  ('bar://localhost/', <TransportURL transport='bar', hosts=[<TransportHost hostname='localhost'>]>)

The TransportURL.parse() method equates to parse_transport_url() and
TransportURL.__str__() equates to unparse_transport_url().

The transport drivers are also updated to take a TransportURL as a
required argument, which simplifies the handling of transport URLs in
the drivers.

Change-Id: Ic04173476329858e4a2c2d2707e9d4aeb212d127
2013-08-12 23:30:43 +01:00
Jenkins 514e91cc95 Merge "Add transport URL support to rabbit driver" 2013-08-12 09:24:42 +00:00
Jenkins 3a7dde4fe8 Merge "Kill ability to specify exchange in transport URL" 2013-08-12 09:24:08 +00:00
Jenkins f3b30fde49 Merge "Add thread-local store of request context" 2013-08-12 09:23:42 +00:00
Mark McLoughlin 9c110a4c94 Add transport URL support to rabbit driver
If a transport URL is supplied, transform it into the server_params
format which was previously used for cast_to_server() etc.

Change-Id: I453734a71748dc8d3ffc02ead7bfb92ffb0a6c7c
2013-08-12 07:50:30 +01:00
Mark McLoughlin 9cc66e1e01 Kill ability to specify exchange in transport URL
My original thinking was that if you're using the exchange name to
separate two instances of the application (so their queues don't
collide) then the exchange name is pretty key to transport
configuration. In fact, it's really a virtual host that you'd use for
this (at least in the case of rabbit and qpid).

Also, Nova's cells code has already moved ahead with the assumption that
the path specifies a virtual host, so it'd only make sense to deviate
from that if there was a really good reason to.

Change-Id: Ic8b5dc3538b6b17afec524047acc2efa76366377
2013-08-12 06:09:48 +01:00