Merge remote-tracking branch 'origin/master' into merge-branch

Change-Id: Ice6dcc3867dff6b7fea5647a3b79d1c765bf9d73
Davanum Srinivas 2015-12-14 16:21:25 +02:00
commit cc0f8cc8a9
107 changed files with 3988 additions and 2796 deletions

.gitignore

@ -7,6 +7,7 @@ ChangeLog
.tox
.coverage
*.egg-info/
.eggs
*.egg
build/
doc/build/

bandit.yaml

@ -0,0 +1,362 @@
# optional: after how many files to update progress
#show_progress_every: 100
# optional: plugins directory name
#plugins_dir: plugins
# optional: plugins discovery name pattern
plugin_name_pattern: '*.py'
# optional: terminal escape sequences to display colors
#output_colors:
# DEFAULT: \033[0m
# HEADER: \033[95m
# LOW: \033[94m
# MEDIUM: \033[93m
# HIGH: \033[91m
# optional: log format string
#log_format: "[%(module)s]\t%(levelname)s\t%(message)s"
# globs of files which should be analyzed
include:
- '*.py'
- '*.pyw'
# a list of strings, which if found in the path will cause files to be excluded
# for example /tests/ - to remove all files in tests directory
exclude_dirs:
- '/tests/'
profiles:
oslo.messaging:
include:
- any_other_function_with_shell_equals_true
# Some occurrences in the oslo.messaging code, but not much to do
# to get rid of these warnings, so just skip this.
# - assert_used
- blacklist_calls
- blacklist_import_func
- blacklist_imports
- exec_used
- execute_with_run_as_root_equals_true
- hardcoded_bind_all_interfaces
- hardcoded_password_string
- hardcoded_password_funcarg
- hardcoded_password_default
- hardcoded_sql_expressions
- hardcoded_tmp_directory
- jinja2_autoescape_false
- linux_commands_wildcard_injection
- paramiko_calls
- password_config_option_not_marked_secret
- request_with_no_cert_validation
- set_bad_file_permissions
- subprocess_popen_with_shell_equals_true
- subprocess_without_shell_equals_true
- start_process_with_a_shell
- start_process_with_no_shell
- start_process_with_partial_path
- ssl_with_bad_defaults
- ssl_with_bad_version
- ssl_with_no_version
# This might be nice to have, but we currently ignore a lot of
# exceptions during the cleanup phases, so this throws a lot
# false positives.
# - try_except_pass
- use_of_mako_templates
- weak_cryptographic_key
XSS:
include:
- jinja2_autoescape_false
- use_of_mako_templates
ShellInjection:
include:
- subprocess_popen_with_shell_equals_true
- subprocess_without_shell_equals_true
- any_other_function_with_shell_equals_true
- start_process_with_a_shell
- start_process_with_no_shell
- start_process_with_partial_path
exclude:
SqlInjection:
include:
- hardcoded_sql_expressions
blacklist_calls:
bad_name_sets:
- pickle:
qualnames:
- pickle.loads
- pickle.load
- pickle.Unpickler
- cPickle.loads
- cPickle.load
- cPickle.Unpickler
message: >
Pickle library appears to be in use, possible security issue.
- marshal:
qualnames: [marshal.load, marshal.loads]
message: >
Deserialization with the marshal module is possibly dangerous.
- md5:
qualnames:
- hashlib.md5
- Crypto.Hash.MD2.new
- Crypto.Hash.MD4.new
- Crypto.Hash.MD5.new
- cryptography.hazmat.primitives.hashes.MD5
message: Use of insecure MD2, MD4, or MD5 hash function.
- ciphers:
qualnames:
- Crypto.Cipher.ARC2.new
- Crypto.Cipher.ARC4.new
- Crypto.Cipher.Blowfish.new
- Crypto.Cipher.DES.new
- Crypto.Cipher.XOR.new
- cryptography.hazmat.primitives.ciphers.algorithms.ARC4
- cryptography.hazmat.primitives.ciphers.algorithms.Blowfish
- cryptography.hazmat.primitives.ciphers.algorithms.IDEA
message: >
Use of insecure cipher {func}. Replace with a known secure
cipher such as AES.
level: HIGH
- cipher_modes:
qualnames:
- cryptography.hazmat.primitives.ciphers.modes.ECB
message: Use of insecure cipher mode {func}.
- mktemp_q:
qualnames: [tempfile.mktemp]
message: Use of insecure and deprecated function (mktemp).
- eval:
qualnames: [eval]
message: >
Use of possibly insecure function - consider using safer
ast.literal_eval.
- mark_safe:
names: [mark_safe]
message: >
Use of mark_safe() may expose cross-site scripting
vulnerabilities and should be reviewed.
- httpsconnection:
qualnames: [httplib.HTTPSConnection]
message: >
Use of HTTPSConnection does not provide security, see
https://wiki.openstack.org/wiki/OSSN/OSSN-0033
- yaml_load:
qualnames: [yaml.load]
message: >
Use of unsafe yaml load. Allows instantiation of arbitrary
objects. Consider yaml.safe_load().
- urllib_urlopen:
qualnames:
- urllib.urlopen
- urllib.urlretrieve
- urllib.URLopener
- urllib.FancyURLopener
- urllib2.urlopen
- urllib2.Request
message: >
Audit url open for permitted schemes. Allowing use of file:/ or
custom schemes is often unexpected.
- telnetlib:
qualnames:
- telnetlib.*
message: >
Telnet-related functions are being called. Telnet is considered
insecure. Use SSH or some other encrypted protocol.
level: HIGH
# Most of this is based off of Christian Heimes' work on defusedxml:
# https://pypi.python.org/pypi/defusedxml/#defusedxml-sax
- xml_bad_cElementTree:
qualnames:
- xml.etree.cElementTree.parse
- xml.etree.cElementTree.iterparse
- xml.etree.cElementTree.fromstring
- xml.etree.cElementTree.XMLParser
message: >
Using {func} to parse untrusted XML data is known to be
vulnerable to XML attacks. Replace {func} with its defusedxml
equivalent function.
- xml_bad_ElementTree:
qualnames:
- xml.etree.ElementTree.parse
- xml.etree.ElementTree.iterparse
- xml.etree.ElementTree.fromstring
- xml.etree.ElementTree.XMLParser
message: >
Using {func} to parse untrusted XML data is known to be
vulnerable to XML attacks. Replace {func} with its defusedxml
equivalent function.
- xml_bad_expatreader:
qualnames: [xml.sax.expatreader.create_parser]
message: >
Using {func} to parse untrusted XML data is known to be
vulnerable to XML attacks. Replace {func} with its defusedxml
equivalent function.
- xml_bad_expatbuilder:
qualnames:
- xml.dom.expatbuilder.parse
- xml.dom.expatbuilder.parseString
message: >
Using {func} to parse untrusted XML data is known to be
vulnerable to XML attacks. Replace {func} with its defusedxml
equivalent function.
- xml_bad_sax:
qualnames:
- xml.sax.parse
- xml.sax.parseString
- xml.sax.make_parser
message: >
Using {func} to parse untrusted XML data is known to be
vulnerable to XML attacks. Replace {func} with its defusedxml
equivalent function.
- xml_bad_minidom:
qualnames:
- xml.dom.minidom.parse
- xml.dom.minidom.parseString
message: >
Using {func} to parse untrusted XML data is known to be
vulnerable to XML attacks. Replace {func} with its defusedxml
equivalent function.
- xml_bad_pulldom:
qualnames:
- xml.dom.pulldom.parse
- xml.dom.pulldom.parseString
message: >
Using {func} to parse untrusted XML data is known to be
vulnerable to XML attacks. Replace {func} with its defusedxml
equivalent function.
- xml_bad_etree:
qualnames:
- lxml.etree.parse
- lxml.etree.fromstring
- lxml.etree.RestrictedElement
- lxml.etree.GlobalParserTLS
- lxml.etree.getDefaultParser
- lxml.etree.check_docinfo
message: >
Using {func} to parse untrusted XML data is known to be
vulnerable to XML attacks. Replace {func} with its defusedxml
equivalent function.
shell_injection:
# Start a process using the subprocess module, or one of its wrappers.
subprocess:
- subprocess.Popen
- subprocess.call
- subprocess.check_call
- subprocess.check_output
- utils.execute
- utils.execute_with_timeout
# Start a process with a function vulnerable to shell injection.
shell:
- os.system
- os.popen
- os.popen2
- os.popen3
- os.popen4
- popen2.popen2
- popen2.popen3
- popen2.popen4
- popen2.Popen3
- popen2.Popen4
- commands.getoutput
- commands.getstatusoutput
# Start a process with a function that is not vulnerable to shell injection.
no_shell:
- os.execl
- os.execle
- os.execlp
- os.execlpe
- os.execv
- os.execve
- os.execvp
- os.execvpe
- os.spawnl
- os.spawnle
- os.spawnlp
- os.spawnlpe
- os.spawnv
- os.spawnve
- os.spawnvp
- os.spawnvpe
- os.startfile
blacklist_imports:
bad_import_sets:
- telnet:
imports: [telnetlib]
level: HIGH
message: >
A telnet-related module is being imported. Telnet is
considered insecure. Use SSH or some other encrypted protocol.
- info_libs:
imports: [pickle, cPickle, subprocess, Crypto]
level: LOW
message: >
Consider possible security implications associated with
{module} module.
# Most of this is based off of Christian Heimes' work on defusedxml:
# https://pypi.python.org/pypi/defusedxml/#defusedxml-sax
- xml_libs:
imports:
- xml.etree.cElementTree
- xml.etree.ElementTree
- xml.sax.expatreader
- xml.sax
- xml.dom.expatbuilder
- xml.dom.minidom
- xml.dom.pulldom
- lxml.etree
- lxml
message: >
Using {module} to parse untrusted XML data is known to be
vulnerable to XML attacks. Replace {module} with the equivalent
defusedxml package.
level: LOW
- xml_libs_high:
imports: [xmlrpclib]
message: >
Using {module} to parse untrusted XML data is known to be
vulnerable to XML attacks. Use defused.xmlrpc.monkey_patch()
function to monkey-patch xmlrpclib and mitigate XML
vulnerabilities.
level: HIGH
hardcoded_tmp_directory:
tmp_dirs: [/tmp, /var/tmp, /dev/shm]
hardcoded_password:
# Support for full path, relative path and special "%(site_data_dir)s"
# substitution (/usr/{local}/share)
word_list: "%(site_data_dir)s/wordlist/default-passwords"
ssl_with_bad_version:
bad_protocol_versions:
- PROTOCOL_SSLv2
- SSLv2_METHOD
- SSLv23_METHOD
- PROTOCOL_SSLv3 # strict option
- PROTOCOL_TLSv1 # strict option
- SSLv3_METHOD # strict option
- TLSv1_METHOD # strict option
password_config_option_not_marked_secret:
function_names:
- oslo.config.cfg.StrOpt
- oslo_config.cfg.StrOpt
execute_with_run_as_root_equals_true:
function_names:
- ceilometer.utils.execute
- cinder.utils.execute
- neutron.agent.linux.utils.execute
- nova.utils.execute
- nova.utils.trycmd
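A hedged usage sketch: with this file at the repository root, a
profile-scoped scan could be run roughly as follows (the exact bandit
flags of this era are an assumption)::

    bandit -c bandit.yaml -r oslo_messaging -p oslo.messaging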


@ -6,7 +6,8 @@ I don't need notifications on the message bus. How do I disable them?
=====================================================================
Notification messages can be disabled using the ``noop`` notify
driver. Set ``notification_driver = noop`` in your configuration file.
driver. Set ``driver = noop`` in your configuration file under the
[oslo_messaging_notifications] section.
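For example, the resulting section of the configuration file would look
like::

    [oslo_messaging_notifications]
    driver = noop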
Why does the notification publisher create queues, too? Shouldn't the subscriber do that?
=========================================================================================
@ -26,9 +27,9 @@ notification "level". The default topic is ``notifications``, so an
info-level notification is published to the topic
``notifications.info``. A subscriber queue of the same name is created
automatically for each of these topics. To change the queue names,
change the notification topic using the ``notification_topics``
configuration option. The option accepts a list of values, so it is
possible to publish to multiple topics.
change the notification topic using the ``topics``
configuration option in ``[oslo_messaging_notifications]``. The option
accepts a list of values, so it is possible to publish to multiple topics.
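For example, a hedged sketch publishing to two topics (``custom`` is an
illustrative name, not a value from this commit)::

    [oslo_messaging_notifications]
    topics = notifications,custom

With this, an info-level notification is published to both
``notifications.info`` and ``custom.info``.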
What are the other choices of notification drivers available?
=============================================================


@ -23,6 +23,7 @@ Contents
opts
conffixture
drivers
supported-messaging-drivers
AMQP1.0
zmq_driver
FAQ


@ -0,0 +1,60 @@
=============================
Supported Messaging Drivers
=============================
RabbitMQ may not be sufficient for the entire community as the community
grows. Pluggability is still something we should maintain, but we should
have a very high standard for drivers that are shipped and documented
as being supported.
This document defines a very clear policy as to the requirements
for drivers to be carried in oslo.messaging and thus supported by the
OpenStack community as a whole. We will deprecate any drivers that do not
meet the requirements, and announce said deprecations in any appropriate
channels to give users time to signal their needs. Deprecation will last
for two release cycles before removing the code. We will also review and
update documentation to annotate which drivers are supported and which
are deprecated given these policies.
Policy
------
Testing
~~~~~~~
* Must have unit and/or functional test coverage of at least 60% as
reported by coverage report. Unit tests must be run for all versions
of python oslo.messaging currently gates on.
* Must have integration testing including at least 3 popular oslo.messaging
dependents, preferably at the minimum a devstack-gate job with Nova,
Cinder, and Neutron.
* All testing above must be voting in the gate of oslo.messaging.
Documentation
~~~~~~~~~~~~~
* Must have a reasonable amount of documentation including documentation
in the official OpenStack deployment guide.
Support
~~~~~~~
* Must have at least two individuals from the community committed to
triaging and fixing bugs, and responding to test failures in a timely
manner.
Prospective Drivers
~~~~~~~~~~~~~~~~~~~
* Drivers that intend to meet the requirements above, but that do not yet
meet them will be given one full release cycle, or 6 months, whichever
is longer, to comply before being marked for deprecation. Their use,
however, will not be supported by the community. This will prevent a
chicken and egg problem for new drivers.
.. note::
This work is licensed under a Creative Commons Attribution 3.0 Unported License.
http://creativecommons.org/licenses/by/3.0/legalcode


@ -25,6 +25,4 @@ different 3rd party libraries that don't ensure that. In certain
cases, with some drivers, it does work:
* rabbit: works only if no connection has already been established.
* qpid: doesn't work (The qpid library has a global state that uses
file descriptors that can't be reset)
* amqp1: works
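A minimal sketch of the safe pattern implied above, assuming the fork
happens before any connection is established::

    import os

    from oslo_config import cfg
    import oslo_messaging

    pid = os.fork()
    if pid == 0:
        # Child: create the transport (and thus any connections) only
        # after the fork, so no driver file descriptors are shared
        # with the parent.
        transport = oslo_messaging.get_transport(cfg.CONF)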


@ -8,9 +8,9 @@ ZeroMQ Driver Deployment Guide
Introduction
============
0MQ (also known as ZeroMQ or zmq) looks like an embeddable
networking library but acts like a concurrency framework. It gives
you sockets that carry atomic messages across various transports
0MQ (also known as ZeroMQ or zmq) is an embeddable networking library
but acts like a concurrency framework. It gives you sockets
that carry atomic messages across various transports
like in-process, inter-process, TCP, and multicast. You can connect
sockets N-to-N with patterns like fan-out, pub-sub, task distribution,
and request-reply. It's fast enough to be the fabric for clustered
@ -45,7 +45,7 @@ Juno release, as almost all the core projects in OpenStack have switched to
oslo_messaging, ZeroMQ can be the only RPC driver across the OpenStack cluster.
This document provides deployment information for this driver in oslo_messaging.
Other than AMQP-based drivers, like RabbitMQ or Qpid, ZeroMQ doesn't have
Unlike AMQP-based drivers, such as RabbitMQ, ZeroMQ doesn't have
any central brokers in oslo.messaging; instead, each host (running OpenStack
services) is both a ZeroMQ client and a server. As a result, each host needs to
listen to a certain TCP port for incoming connections and directly connect
@ -96,8 +96,9 @@ must be set to the hostname of the current node.
rpc_backend = zmq
rpc_zmq_host = {hostname}
Match Making (mandatory)
-------------------------
------------------------
The ZeroMQ driver implements a matching capability to discover hosts available
for communication when sending to a bare topic. This allows broker-less
@ -105,35 +106,20 @@ communications.
The MatchMaker is pluggable and it provides two different MatchMaker classes.
MatchMakerLocalhost: default matchmaker driver for all-in-one scenario (messages
DummyMatchMaker: default matchmaker driver for all-in-one scenario (messages
are sent to itself).
MatchMakerRing: loads a static hash table from a JSON file, sends messages to
a certain host via directed topics or cycles hosts per bare topic and supports
broker-less fanout messaging. On fanout messages returns an array of directed
topics (messages are sent to all destinations).
MatchMakerRedis: loads the hash table from a remote Redis server, supports
RedisMatchMaker: loads the hash table from a remote Redis server, supports
dynamic host/topic registrations, host expiration, and hooks for consuming
applications to acknowledge or neg-acknowledge topic.host service availability.
To set the MatchMaker class, use option 'rpc_zmq_matchmaker' in [DEFAULT].
rpc_zmq_matchmaker = local
or
rpc_zmq_matchmaker = ring
rpc_zmq_matchmaker = dummy
or
rpc_zmq_matchmaker = redis
To specify the ring file for MatchMakerRing, use option 'ringfile' in
[matchmaker_ring].
For example::
[matchmaker_ring]
ringfile = /etc/oslo/oslo_matchmaker_ring.json
To specify the Redis server for MatchMakerRedis, use options in
To specify the Redis server for RedisMatchMaker, use options in
[matchmaker_redis] of each project.
[matchmaker_redis]
@ -141,47 +127,36 @@ To specify the Redis server for MatchMakerRedis, use options in
port = 6379
password = None
MatchMaker Data Source (mandatory)
-----------------------------------
----------------------------------
The MatchMaker data source is stored in the files or Redis server discussed in
the previous section. Populating this data source correctly is the key to
making the ZeroMQ driver work.
If deploying the MatchMakerRing, a ring file is required. The format of the ring
file should contain a hash where each key is a base topic and the values are
hostname arrays to be sent to.
For example::
/etc/oslo/oslo_matchmaker_ring.json
{
"scheduler": ["host1", "host2"],
"conductor": ["host1", "host2"],
}
The AMQP-based methods like RabbitMQ and Qpid don't require any knowledge
about the source and destination of any topic. However, ZeroMQ driver
with MatchMakerRing does. The challenging task is that you should learn
and get all the (K, V) pairs from each OpenStack project to make up the
matchmaker ring file.
If deploying the MatchMakerRedis, a Redis server is required. Each (K, V) pair
If deploying the RedisMatchMaker, a Redis server is required. For each (K, V)
pair stored in Redis, the key is a base topic and the corresponding values are
arrays of hostnames to send to.
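For example, a hedged sketch of one such pair, mirroring the ring-file
entries shown above::

    "scheduler": ["host1", "host2"]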
Message Receivers (mandatory)
-------------------------------
Each machine running OpenStack services, or sending RPC messages, must run the
'oslo-messaging-zmq-receiver' daemon. This receives replies to call requests and
routes responses via IPC to blocked callers.
Proxy to avoid blocking (optional)
----------------------------------
The way to deploy the receiver process is to run it under a new user 'oslo'
and give all openstack daemons access via group membership of 'oslo' - this
supports using /var/run/openstack as a shared IPC directory for all openstack
processes, allowing different services to be hosted on the same server, served
by a single oslo-messaging-zmq-receiver process.
Each machine running OpenStack services, or sending RPC messages, may run the
'oslo-messaging-zmq-broker' daemon. This is needed to avoid blocking
if a listener (server) appears after the sender (client).
Whether or not to run the local broker (proxy) is defined by the option
'zmq_use_broker' (True by default). This option can be set in the [DEFAULT] section.
For example::
zmq_use_broker = False
When the broker is used, all publishers (clients) talk to servers through
the local broker, connecting to it via the IPC transport.
The IPC runtime directory, 'rpc_zmq_ipc_dir', can be set in the [DEFAULT] section.
@ -191,28 +166,14 @@ For example::
The parameters for the script oslo-messaging-zmq-receiver should be::
oslo-messaging-zmq-receiver
oslo-messaging-zmq-broker
--config-file /etc/oslo/zeromq.conf
--log-file /var/log/oslo/zmq-receiver.log
--log-file /var/log/oslo/zmq-broker.log
You can specify ZeroMQ options in /etc/oslo/zeromq.conf if necessary.
Thread Pool (optional)
-----------------------
Each service will launch threads for incoming requests. These threads are
maintained via a pool, the maximum number of threads is limited by
rpc_thread_pool_size. The default value is 1024. (This is a common RPC
configuration variable, also applicable to Kombu and Qpid)
This configuration can be set in [DEFAULT] section.
For example::
rpc_thread_pool_size = 1024
Listening Address (optional)
------------------------------
----------------------------
All services bind to an IP address or Ethernet adapter. By default, all services
bind to '*', effectively binding to 0.0.0.0. This may be changed with the option
@ -224,18 +185,40 @@ For example::
rpc_zmq_bind_address = *
Currently the zmq driver uses a dynamic port binding mechanism, which means
that each listener is allocated a random port from a range. The port range is
controlled by the two options 'rpc_zmq_min_port' and 'rpc_zmq_max_port'. Change
them to restrict the current service's port binding range.
'rpc_zmq_bind_port_retries' controls the number of retries before a 'ports
range exceeded' failure.
For example::
rpc_zmq_min_port = 9050
rpc_zmq_max_port = 10050
rpc_zmq_bind_port_retries = 100
DevStack Support
----------------
The ZeroMQ driver is supported by DevStack. The configuration is as follows::
ENABLED_SERVICES+=,-rabbit,-qpid,zeromq
ENABLED_SERVICES+=,-rabbit,zeromq
ZEROMQ_MATCHMAKER=redis
In the local.conf [localrc] section, you need to enable the zmq plugin, which
lives in the `devstack-plugin-zmq`_ repository.
For example::
enable_plugin zmq https://github.com/openstack/devstack-plugin-zmq.git
.. _devstack-plugin-zmq: https://github.com/openstack/devstack-plugin-zmq.git
Current Status
---------------
--------------
The current development status of ZeroMQ driver is shown in `wiki`_.
.. _wiki: https://wiki.openstack.org/ZeroMQ


@ -8,19 +8,18 @@
# OpenStack Infra <zanata@openstack.org>, 2015. #zanata
msgid ""
msgstr ""
"Project-Id-Version: oslo.messaging 2.5.1.dev7\n"
"Project-Id-Version: oslo.messaging 2.7.1.dev15\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2015-09-16 18:44+0000\n"
"POT-Creation-Date: 2015-10-23 06:27+0000\n"
"PO-Revision-Date: 2015-08-27 12:47+0000\n"
"Last-Translator: Andi Chandler <andi@gowling.com>\n"
"Language-Team: English (United Kingdom) (http://www.transifex.com/openstack/"
"oslomessaging/language/en_GB/)\n"
"Language-Team: English (United Kingdom)\n"
"Language: en-GB\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=UTF-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 2.0\n"
"Plural-Forms: nplurals=2; plural=(n != 1);\n"
"Generated-By: Babel 2.0\n"
"X-Generator: Zanata 3.7.1\n"
#, python-format


@ -8,19 +8,18 @@
# OpenStack Infra <zanata@openstack.org>, 2015. #zanata
msgid ""
msgstr ""
"Project-Id-Version: oslo.messaging 2.5.1.dev7\n"
"Project-Id-Version: oslo.messaging 2.7.1.dev15\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2015-09-16 18:44+0000\n"
"POT-Creation-Date: 2015-10-23 06:26+0000\n"
"PO-Revision-Date: 2015-08-27 12:47+0000\n"
"Last-Translator: Andi Chandler <andi@gowling.com>\n"
"Language-Team: English (United Kingdom) (http://www.transifex.com/openstack/"
"oslomessaging/language/en_GB/)\n"
"Language-Team: English (United Kingdom)\n"
"Language: en-GB\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=UTF-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 2.0\n"
"Plural-Forms: nplurals=2; plural=(n != 1);\n"
"Generated-By: Babel 2.0\n"
"X-Generator: Zanata 3.7.1\n"
#, python-format


@ -8,28 +8,24 @@
# OpenStack Infra <zanata@openstack.org>, 2015. #zanata
msgid ""
msgstr ""
"Project-Id-Version: oslo.messaging 2.5.1.dev7\n"
"Project-Id-Version: oslo.messaging 2.7.1.dev15\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2015-09-16 18:44+0000\n"
"POT-Creation-Date: 2015-10-23 06:27+0000\n"
"PO-Revision-Date: 2015-08-27 12:55+0000\n"
"Last-Translator: Andi Chandler <andi@gowling.com>\n"
"Language-Team: English (United Kingdom) (http://www.transifex.com/openstack/"
"oslomessaging/language/en_GB/)\n"
"Language-Team: English (United Kingdom)\n"
"Language: en-GB\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=UTF-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 2.0\n"
"Plural-Forms: nplurals=2; plural=(n != 1);\n"
"Generated-By: Babel 2.0\n"
"X-Generator: Zanata 3.7.1\n"
#, python-format
msgid "Failed to load any notifiers for %s"
msgstr "Failed to load any notifiers for %s"
msgid "start/stop/wait must be called in the same thread"
msgstr "start/stop/wait must be called in the same thread"
msgid ""
"wait() should be called after stop() as it waits for existing messages to "
"finish processing"


@ -6,21 +6,22 @@
# Translators:
# Adriana Chisco Landazábal <achisco94@gmail.com>, 2015
# Miriam Godinez <miriamgc@hotmail.com>, 2015
# OpenStack Infra <zanata@openstack.org>, 2015. #zanata
msgid ""
msgstr ""
"Project-Id-Version: oslo.messaging\n"
"Project-Id-Version: oslo.messaging 2.7.1.dev15\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2015-09-08 06:18+0000\n"
"PO-Revision-Date: 2015-09-07 22:46+0000\n"
"POT-Creation-Date: 2015-10-23 06:27+0000\n"
"PO-Revision-Date: 2015-09-07 10:46+0000\n"
"Last-Translator: Miriam Godinez <miriamgc@hotmail.com>\n"
"Language-Team: Spanish (http://www.transifex.com/openstack/oslomessaging/"
"language/es/)\n"
"Language-Team: Spanish\n"
"Language: es\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=UTF-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 2.0\n"
"Plural-Forms: nplurals=2; plural=(n != 1);\n"
"Generated-By: Babel 2.0\n"
"X-Generator: Zanata 3.7.1\n"
#, python-format
msgid "An exception occurred processing the API call: %s "


@ -1,26 +0,0 @@
# Translations template for oslo.messaging.
# Copyright (C) 2015 ORGANIZATION
# This file is distributed under the same license as the oslo.messaging
# project.
#
# Translators:
# Adriana Chisco Landazábal <achisco94@gmail.com>, 2015
# Lucía Pradillos <dreamers_88@hotmail.com>, 2015
msgid ""
msgstr ""
"Project-Id-Version: oslo.messaging\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2015-08-26 06:33+0000\n"
"PO-Revision-Date: 2015-08-26 03:46+0000\n"
"Last-Translator: Lucía Pradillos <dreamers_88@hotmail.com>\n"
"Language-Team: Spanish (http://www.transifex.com/openstack/oslomessaging/"
"language/es/)\n"
"Language: es\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=UTF-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 2.0\n"
"Plural-Forms: nplurals=2; plural=(n != 1);\n"
msgid "start/stop/wait must be called in the same thread"
msgstr "empezar/parar/esperar debe ser llamado en el mismo hilo"


@ -5,21 +5,22 @@
#
# Translators:
# Maxime COQUEREL <max.coquerel@gmail.com>, 2014
# OpenStack Infra <zanata@openstack.org>, 2015. #zanata
msgid ""
msgstr ""
"Project-Id-Version: oslo.messaging\n"
"Project-Id-Version: oslo.messaging 2.7.1.dev15\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2015-08-04 06:29+0000\n"
"POT-Creation-Date: 2015-10-23 06:27+0000\n"
"PO-Revision-Date: 2014-09-25 08:57+0000\n"
"Last-Translator: Maxime COQUEREL <max.coquerel@gmail.com>\n"
"Language-Team: French (http://www.transifex.com/openstack/oslomessaging/"
"language/fr/)\n"
"Language-Team: French\n"
"Language: fr\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=UTF-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 2.0\n"
"Plural-Forms: nplurals=2; plural=(n > 1);\n"
"Generated-By: Babel 2.0\n"
"X-Generator: Zanata 3.7.1\n"
#, python-format
msgid "An exception occurred processing the API call: %s "


@ -1,26 +0,0 @@
# Translations template for oslo.messaging.
# Copyright (C) 2015 ORGANIZATION
# This file is distributed under the same license as the oslo.messaging
# project.
#
# Translators:
# Lucas Mascaro <mascaro.lucas@yahoo.fr>, 2015
# Maxime COQUEREL <max.coquerel@gmail.com>, 2014
msgid ""
msgstr ""
"Project-Id-Version: oslo.messaging\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2015-08-18 06:33+0000\n"
"PO-Revision-Date: 2015-08-17 22:45+0000\n"
"Last-Translator: Lucas Mascaro <mascaro.lucas@yahoo.fr>\n"
"Language-Team: French (http://www.transifex.com/openstack/oslomessaging/"
"language/fr/)\n"
"Language: fr\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=UTF-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 2.0\n"
"Plural-Forms: nplurals=2; plural=(n > 1);\n"
msgid "start/stop/wait must be called in the same thread"
msgstr "start/stop/wait doivent être appellés dans le même thread "


@ -1,20 +0,0 @@
# Translations template for oslo.messaging.
# Copyright (C) 2015 ORGANIZATION
# This file is distributed under the same license as the oslo.messaging
# project.
# FIRST AUTHOR <EMAIL@ADDRESS>, 2015.
#
#, fuzzy
msgid ""
msgstr ""
"Project-Id-Version: oslo.messaging 2.1.0\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2015-07-29 06:39+0000\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=utf-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 2.0\n"


@ -7,20 +7,16 @@
#, fuzzy
msgid ""
msgstr ""
"Project-Id-Version: oslo.messaging 2.4.1.dev1\n"
"Project-Id-Version: oslo.messaging 2.7.1.dev15\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2015-08-26 06:33+0000\n"
"POT-Creation-Date: 2015-10-23 06:27+0000\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=utf-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 2.0\n"
#: oslo_messaging/server.py:145
msgid "start/stop/wait must be called in the same thread"
msgstr ""
"Generated-By: Babel 2.1.1\n"
#: oslo_messaging/server.py:178
msgid ""
@ -28,6 +24,14 @@ msgid ""
" finish processing"
msgstr ""
#: oslo_messaging/server.py:191
#, python-format
msgid ""
"wait() should have been called after stop() as wait() waits for existing "
"messages to finish processing, it has been %0.2f seconds and stop() still"
" has not been called"
msgstr ""
#: oslo_messaging/notify/_impl_routing.py:80
#, python-format
msgid "Failed to load any notifiers for %s"


@ -5,23 +5,24 @@
#
# Translators:
# kogamatranslator49 <r.podarov@yandex.ru>, 2015
# OpenStack Infra <zanata@openstack.org>, 2015. #zanata
msgid ""
msgstr ""
"Project-Id-Version: oslo.messaging\n"
"Project-Id-Version: oslo.messaging 2.7.1.dev15\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2015-08-04 06:29+0000\n"
"POT-Creation-Date: 2015-10-23 06:27+0000\n"
"PO-Revision-Date: 2015-07-05 11:39+0000\n"
"Last-Translator: kogamatranslator49 <r.podarov@yandex.ru>\n"
"Language-Team: Russian (http://www.transifex.com/openstack/oslomessaging/"
"language/ru/)\n"
"Language-Team: Russian\n"
"Language: ru\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=UTF-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Generated-By: Babel 2.0\n"
"Plural-Forms: nplurals=4; plural=(n%10==1 && n%100!=11 ? 0 : n%10>=2 && n"
"%10<=4 && (n%100<12 || n%100>14) ? 1 : n%10==0 || (n%10>=5 && n%10<=9) || (n"
"%100>=11 && n%100<=14)? 2 : 3);\n"
"Generated-By: Babel 2.0\n"
"X-Generator: Zanata 3.7.1\n"
#, python-format
msgid "An exception occurred processing the API call: %s "


@ -19,7 +19,7 @@
Shared code between AMQP based openstack.common.rpc implementations.
The code in this module is shared between the rpc implementations based on
AMQP. Specifically, this includes impl_kombu and impl_qpid. impl_carrot also
AMQP. Specifically, this includes impl_kombu. impl_carrot also
uses AMQP, but is deprecated and predates this code.
"""
@ -31,7 +31,6 @@ from oslo_config import cfg
import six
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers import pool
deprecated_durable_opts = [
cfg.DeprecatedOpt('amqp_durable_queues',
@ -49,139 +48,11 @@ amqp_opts = [
default=False,
deprecated_group='DEFAULT',
help='Auto-delete queues in AMQP.'),
cfg.BoolOpt('send_single_reply',
default=False,
help='Send a single AMQP reply to call message. The current '
'behaviour since oslo-incubator is to send two AMQP '
'replies - first one with the payload, a second one to '
'ensure the other side has finished receiving the payload. '
'We are going to remove it in the N release, but we must '
'remain backward compatible at the same time. This option '
'provides such compatibility - it defaults to False in '
'Liberty and can be turned on for early adopters with '
'new installations or for testing. Please note that '
'this option will be removed in the Mitaka release.')
]
UNIQUE_ID = '_unique_id'
LOG = logging.getLogger(__name__)
# NOTE(sileht): Even if rabbit/qpid have only one Connection class,
# a connection can be used for two purposes:
# * waiting for and receiving amqp messages (only reads on the socket)
# * sending messages to the broker (only writes on the socket)
# The code inside a connection class is not concurrency safe.
# Using one Connection class instance for both will result in eventlet
# complaining about multiple greenthreads reading/writing the same fd
# concurrently, because 'send' and 'listen' run in different
# greenthreads.
# So, a connection cannot be shared between threads/greenthreads, and
# these two variables define the purpose of the connection, allowing
# drivers to add special handling if needed (like heartbeat).
# amqp drivers create three kinds of connections:
# * driver.listen*(): each call creates a new 'PURPOSE_LISTEN' connection
# * driver.send*(): a pool of 'PURPOSE_SEND' connections is used
# * the driver internally has another 'PURPOSE_LISTEN' connection
# dedicated to waiting for replies to rpc calls
PURPOSE_LISTEN = 'listen'
PURPOSE_SEND = 'send'
class ConnectionPool(pool.Pool):
"""Class that implements a Pool of Connections."""
def __init__(self, conf, rpc_conn_pool_size, url, connection_cls):
self.connection_cls = connection_cls
self.conf = conf
self.url = url
super(ConnectionPool, self).__init__(rpc_conn_pool_size)
self.reply_proxy = None
# TODO(comstud): Timeout connections not used in a while
def create(self, purpose=None):
if purpose is None:
purpose = PURPOSE_SEND
LOG.debug('Pool creating new connection')
return self.connection_cls(self.conf, self.url, purpose)
def empty(self):
for item in self.iter_free():
item.close()
class ConnectionContext(rpc_common.Connection):
"""The class that is actually returned to the create_connection() caller.
This is essentially a wrapper around Connection that supports 'with'.
It can also return a new Connection, or one from a pool.
It also catches when an instance of this class is about to be deleted,
so that Connections can be returned to the pool on exceptions and so
forth without making the caller responsible for catching them. If
possible, it makes sure to return the connection to the pool.
"""
def __init__(self, connection_pool, purpose):
"""Create a new connection, or get one from the pool."""
self.connection = None
self.connection_pool = connection_pool
pooled = purpose == PURPOSE_SEND
if pooled:
self.connection = connection_pool.get()
else:
# a non-pooled connection is requested, so create a new connection
self.connection = connection_pool.create(purpose)
self.pooled = pooled
self.connection.pooled = pooled
def __enter__(self):
"""When with ConnectionContext() is used, return self."""
return self
def _done(self):
"""If the connection came from a pool, clean it up and put it back.
If it did not come from a pool, close it.
"""
if self.connection:
if self.pooled:
# Reset the connection so it's ready for the next caller
# to grab from the pool
try:
self.connection.reset()
except Exception:
LOG.exception("Fail to reset the connection, drop it")
try:
self.connection.close()
except Exception:
pass
self.connection = self.connection_pool.create()
finally:
self.connection_pool.put(self.connection)
else:
try:
self.connection.close()
except Exception:
pass
self.connection = None
def __exit__(self, exc_type, exc_value, tb):
"""End of 'with' statement. We're done here."""
self._done()
def __del__(self):
"""Caller is done with this connection. Make sure we cleaned up."""
self._done()
def close(self):
"""Caller is done with this connection."""
self._done()
def __getattr__(self, key):
"""Proxy all other calls to the Connection instance."""
if self.connection:
return getattr(self.connection, key)
else:
raise rpc_common.InvalidRPCConnectionReuse()
class RpcContext(rpc_common.CommonRpcContext):
"""Context that supports replying to a rpc.call."""


@ -17,6 +17,7 @@ __all__ = ['AMQPDriverBase']
import logging
import threading
import time
import uuid
import cachetools
@ -47,45 +48,27 @@ class AMQPIncomingMessage(base.IncomingMessage):
self.requeue_callback = message.requeue
self._obsolete_reply_queues = obsolete_reply_queues
def _send_reply(self, conn, reply=None, failure=None,
ending=False, log_failure=True):
if (self.reply_q and
not self._obsolete_reply_queues.reply_q_valid(self.reply_q,
self.msg_id)):
def _send_reply(self, conn, reply=None, failure=None, log_failure=True):
if not self._obsolete_reply_queues.reply_q_valid(self.reply_q,
self.msg_id):
return
if failure:
failure = rpc_common.serialize_remote_exception(failure,
log_failure)
msg = {'result': reply, 'failure': failure}
if ending:
msg['ending'] = True
# NOTE(sileht): ending can be removed in N*, see Listener.wait()
# for more detail.
msg = {'result': reply, 'failure': failure, 'ending': True,
'_msg_id': self.msg_id}
rpc_amqp._add_unique_id(msg)
unique_id = msg[rpc_amqp.UNIQUE_ID]
# If a reply_q exists, add the msg_id to the reply and pass the
# reply_q to direct_send() to use it as the response queue.
# Otherwise use the msg_id for backward compatibility.
if self.reply_q:
msg['_msg_id'] = self.msg_id
try:
if ending:
LOG.debug("sending reply msg_id: %(msg_id)s "
"reply queue: %(reply_q)s" % {
'msg_id': self.msg_id,
'unique_id': unique_id,
'reply_q': self.reply_q})
conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
except rpc_amqp.AMQPDestinationNotFound:
self._obsolete_reply_queues.add(self.reply_q, self.msg_id)
else:
# TODO(sileht): look at which version of oslo-incubator rpc
# send need this, but I guess this is older than icehouse
# if this is icehouse, we can drop this at Mitaka
# if this is havana, we can drop this now.
conn.direct_send(self.msg_id, rpc_common.serialize_msg(msg))
LOG.debug("sending reply msg_id: %(msg_id)s "
"reply queue: %(reply_q)s" % {
'msg_id': self.msg_id,
'unique_id': unique_id,
'reply_q': self.reply_q})
conn.direct_send(self.reply_q, rpc_common.serialize_msg(msg))
def reply(self, reply=None, failure=None, log_failure=True):
if not self.msg_id:
@ -94,19 +77,41 @@ class AMQPIncomingMessage(base.IncomingMessage):
return
# NOTE(sileht): return without holding a connection if possible
if (self.reply_q and
not self._obsolete_reply_queues.reply_q_valid(self.reply_q,
self.msg_id)):
if not self._obsolete_reply_queues.reply_q_valid(self.reply_q,
self.msg_id):
return
with self.listener.driver._get_connection(
rpc_amqp.PURPOSE_SEND) as conn:
if self.listener.driver.send_single_reply:
self._send_reply(conn, reply, failure, log_failure=log_failure,
ending=True)
else:
self._send_reply(conn, reply, failure, log_failure=log_failure)
self._send_reply(conn, ending=True)
# NOTE(sileht): we read the configuration value from the driver
# to be able to backport this change to previous versions that
# still have the qpid driver
duration = self.listener.driver.missing_destination_retry_timeout
timer = rpc_common.DecayingTimer(duration=duration)
timer.start()
while True:
try:
with self.listener.driver._get_connection(
rpc_common.PURPOSE_SEND) as conn:
self._send_reply(conn, reply, failure,
log_failure=log_failure)
return
except rpc_amqp.AMQPDestinationNotFound:
if timer.check_return() > 0:
LOG.debug(("The reply %(msg_id)s cannot be sent "
"%(reply_q)s reply queue don't exist, "
"retrying...") % {
'msg_id': self.msg_id,
'reply_q': self.reply_q})
time.sleep(0.25)
else:
self._obsolete_reply_queues.add(self.reply_q, self.msg_id)
LOG.info(_LI("The reply %(msg_id)s cannot be sent "
"%(reply_q)s reply queue don't exist after "
"%(duration)s sec abandoning...") % {
'msg_id': self.msg_id,
'reply_q': self.reply_q,
'duration': duration})
return
def acknowledge(self):
self.acknowledge_callback()
@ -187,12 +192,8 @@ class AMQPListener(base.Listener):
unique_id = self.msg_id_cache.check_duplicate_message(message)
if ctxt.reply_q:
LOG.debug(
"received message msg_id: %(msg_id)s reply to %(queue)s" % {
'queue': ctxt.reply_q, 'msg_id': ctxt.msg_id})
else:
LOG.debug("received message unique_id: %s " % unique_id)
LOG.debug("received message msg_id: %(msg_id)s reply to %(queue)s" % {
'queue': ctxt.reply_q, 'msg_id': ctxt.msg_id})
self.incoming.append(AMQPIncomingMessage(self,
ctxt.to_dict(),
@ -202,6 +203,7 @@ class AMQPListener(base.Listener):
ctxt.reply_q,
self._obsolete_reply_queues))
@base.batch_poll_helper
def poll(self, timeout=None):
while not self._stopped.is_set():
if self.incoming:
@ -345,10 +347,10 @@ class ReplyWaiter(object):
class AMQPDriverBase(base.BaseDriver):
missing_destination_retry_timeout = 0
def __init__(self, conf, url, connection_pool,
default_exchange=None, allowed_remote_exmods=None,
send_single_reply=False):
default_exchange=None, allowed_remote_exmods=None):
super(AMQPDriverBase, self).__init__(conf, url, default_exchange,
allowed_remote_exmods)
@ -361,14 +363,12 @@ class AMQPDriverBase(base.BaseDriver):
self._reply_q_conn = None
self._waiter = None
self.send_single_reply = send_single_reply
def _get_exchange(self, target):
return target.exchange or self._default_exchange
def _get_connection(self, purpose=rpc_amqp.PURPOSE_SEND):
return rpc_amqp.ConnectionContext(self._connection_pool,
purpose=purpose)
def _get_connection(self, purpose=rpc_common.PURPOSE_SEND):
return rpc_common.ConnectionContext(self._connection_pool,
purpose=purpose)
def _get_reply_q(self):
with self._reply_q_lock:
@ -377,7 +377,7 @@ class AMQPDriverBase(base.BaseDriver):
reply_q = 'reply_' + uuid.uuid4().hex
conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN)
conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
self._waiter = ReplyWaiter(reply_q, conn,
self._allowed_remote_exmods)
@ -422,7 +422,7 @@ class AMQPDriverBase(base.BaseDriver):
log_msg = "CAST unique_id: %s " % unique_id
try:
with self._get_connection(rpc_amqp.PURPOSE_SEND) as conn:
with self._get_connection(rpc_common.PURPOSE_SEND) as conn:
if notify:
exchange = self._get_exchange(target)
log_msg += "NOTIFY exchange '%(exchange)s'" \
@ -468,7 +468,7 @@ class AMQPDriverBase(base.BaseDriver):
envelope=(version == 2.0), notify=True, retry=retry)
def listen(self, target):
conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN)
conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
listener = AMQPListener(self, conn)
@ -484,7 +484,7 @@ class AMQPDriverBase(base.BaseDriver):
return listener
def listen_for_notifications(self, targets_and_priorities, pool):
conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN)
conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
listener = AMQPListener(self, conn)
for target, priority in targets_and_priorities:


@ -15,9 +15,12 @@
import abc
import six
from oslo_config import cfg
from oslo_utils import timeutils
import six
from six.moves import range as compat_range
from oslo_messaging import exceptions
base_opts = [
@ -28,6 +31,27 @@ base_opts = [
]
def batch_poll_helper(func):
"""Decorator to poll messages in batch
This decorator helps driver that polls message one by one,
to returns a list of message.
"""
def wrapper(in_self, timeout=None, prefetch_size=1):
incomings = []
watch = timeutils.StopWatch(duration=timeout)
with watch:
for __ in compat_range(prefetch_size):
msg = func(in_self, timeout=watch.leftover(return_none=True))
if msg is not None:
incomings.append(msg)
else:
# timeout reached or listener stopped
break
return incomings
return wrapper
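# A hedged usage sketch (toy listener, not part of oslo.messaging):
# decorating a one-message poll() lets callers request batches.
#
#     class _ToyListener(object):
#         def __init__(self, items):
#             self._items = list(items)
#
#         @batch_poll_helper
#         def poll(self, timeout=None):
#             # One pending message at a time; None when drained.
#             return self._items.pop(0) if self._items else None
#
#     _ToyListener([1, 2, 3]).poll(prefetch_size=2)  # -> [1, 2]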
class TransportDriverError(exceptions.MessagingException):
"""Base class for transport driver specific exceptions."""
@ -61,8 +85,9 @@ class Listener(object):
self.driver = driver
@abc.abstractmethod
def poll(self, timeout=None):
"""Blocking until a message is pending and return IncomingMessage.
def poll(self, timeout=None, prefetch_size=1):
"""Blocking until 'prefetch_size' message is pending and return
[IncomingMessage].
Return None after timeout seconds if timeout is set and no message is
ending or if the listener have been stopped.
"""


@ -109,7 +109,7 @@ class Timeout(RPCException):
:param info: Extra info to convey to the user
:param topic: The topic that the rpc call was sent to
:param rpc_method_name: The name of the rpc method being
:param method: The name of the rpc method being
called
"""
self.info = info
@ -348,3 +348,99 @@ class DecayingTimer(object):
if left <= 0 and timeout_callback is not None:
timeout_callback(*args, **kwargs)
return left if maximum is None else min(left, maximum)
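# Usage sketch (illustrative, not from this commit): a DecayingTimer
# counts down from 'duration' seconds after start(); check_return()
# reports the seconds left.
#
#     timer = DecayingTimer(duration=30)
#     timer.start()
#     remaining = timer.check_return()  # at most 30; 0 or less once expired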
# NOTE(sileht): Even if rabbit has only one Connection class,
# a connection can be used for two purposes:
# * waiting for and receiving amqp messages (only reads on the socket)
# * sending messages to the broker (only writes on the socket)
# The code inside a connection class is not concurrency safe.
# Using one Connection class instance for both will result in eventlet
# complaining about multiple greenthreads reading/writing the same fd
# concurrently, because 'send' and 'listen' run in different
# greenthreads.
# So, a connection cannot be shared between threads/greenthreads, and
# these two variables define the purpose of the connection, allowing
# drivers to add special handling if needed (like heartbeat).
# amqp drivers create three kinds of connections:
# * driver.listen*(): each call creates a new 'PURPOSE_LISTEN' connection
# * driver.send*(): a pool of 'PURPOSE_SEND' connections is used
# * the driver internally has another 'PURPOSE_LISTEN' connection
# dedicated to waiting for replies to rpc calls
PURPOSE_LISTEN = 'listen'
PURPOSE_SEND = 'send'
class ConnectionContext(Connection):
"""The class that is actually returned to the create_connection() caller.
This is essentially a wrapper around Connection that supports 'with'.
It can also return a new Connection, or one from a pool.
It also catches when an instance of this class is about to be deleted,
so that Connections can be returned to the pool on exceptions and so
forth without making the caller responsible for catching them. If
possible, it makes sure to return the connection to the pool.
"""
def __init__(self, connection_pool, purpose):
"""Create a new connection, or get one from the pool."""
self.connection = None
self.connection_pool = connection_pool
pooled = purpose == PURPOSE_SEND
if pooled:
self.connection = connection_pool.get()
else:
# a non-pooled connection is requested, so create a new connection
self.connection = connection_pool.create(purpose)
self.pooled = pooled
self.connection.pooled = pooled
def __enter__(self):
"""When with ConnectionContext() is used, return self."""
return self
def _done(self):
"""If the connection came from a pool, clean it up and put it back.
If it did not come from a pool, close it.
"""
if self.connection:
if self.pooled:
# Reset the connection so it's ready for the next caller
# to grab from the pool
try:
self.connection.reset()
except Exception:
LOG.exception("Fail to reset the connection, drop it")
try:
self.connection.close()
except Exception:
pass
self.connection = self.connection_pool.create()
finally:
self.connection_pool.put(self.connection)
else:
try:
self.connection.close()
except Exception:
pass
self.connection = None
def __exit__(self, exc_type, exc_value, tb):
"""End of 'with' statement. We're done here."""
self._done()
def __del__(self):
"""Caller is done with this connection. Make sure we cleaned up."""
self._done()
def close(self):
"""Caller is done with this connection."""
self._done()
def __getattr__(self, key):
"""Proxy all other calls to the Connection instance."""
if self.connection:
return getattr(self.connection, key)
else:
raise InvalidRPCConnectionReuse()
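# A hedged usage sketch ('pool' is a ConnectionPool instance;
# direct_send is assumed to exist on the underlying driver Connection):
#
#     with ConnectionContext(pool, PURPOSE_SEND) as conn:
#         conn.direct_send(reply_q, msg)
#     # On exit the connection is reset and returned to the pool.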


@ -54,6 +54,7 @@ class FakeListener(base.Listener):
exchange = self._exchange_manager.get_exchange(target.exchange)
exchange.ensure_queue(target, pool)
@base.batch_poll_helper
def poll(self, timeout=None):
if timeout is not None:
deadline = time.time() + timeout


@ -0,0 +1,364 @@
# Copyright (C) 2015 Cisco Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import threading
from oslo_messaging._drivers import base
from oslo_messaging._drivers import common as driver_common
from oslo_messaging._drivers import pool as driver_pool
from oslo_messaging._i18n import _LE
from oslo_messaging._i18n import _LW
from oslo_serialization import jsonutils
import kafka
from kafka.common import KafkaError
from oslo_config import cfg
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
PURPOSE_SEND = 'send'
PURPOSE_LISTEN = 'listen'
kafka_opts = [
cfg.StrOpt('kafka_default_host', default='localhost',
help='Default Kafka broker Host'),
cfg.IntOpt('kafka_default_port', default=9092,
help='Default Kafka broker Port'),
cfg.IntOpt('kafka_max_fetch_bytes', default=1024 * 1024,
help='Max fetch bytes of Kafka consumer'),
cfg.IntOpt('kafka_consumer_timeout', default=1.0,
help='Default timeout(s) for Kafka consumers'),
cfg.IntOpt('pool_size', default=10,
help='Pool Size for Kafka Consumers'),
]
CONF = cfg.CONF
def pack_context_with_message(ctxt, msg):
"""Pack context into msg."""
if isinstance(ctxt, dict):
context_d = ctxt
else:
context_d = ctxt.to_dict()
return {'message': msg, 'context': context_d}
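# For example:
#     pack_context_with_message({'user': 'admin'}, {'event': 'x'})
#     returns {'message': {'event': 'x'}, 'context': {'user': 'admin'}}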
def target_to_topic(target):
"""Convert target into topic string
:param target: Message destination target
:type target: oslo_messaging.Target
"""
if target.exchange is None:
return target.topic
return "%s_%s" % (target.exchange, target.topic)
class Connection(object):
def __init__(self, conf, url, purpose):
driver_conf = conf.oslo_messaging_kafka
self.conf = conf
self.kafka_client = None
self.producer = None
self.consumer = None
self.fetch_messages_max_bytes = driver_conf.kafka_max_fetch_bytes
self.consumer_timeout = float(driver_conf.kafka_consumer_timeout)
self.url = url
self._parse_url()
# TODO(Support for manual/auto_commit functionality)
# When auto_commit is False, the consumer can manually notify
# the completion of the subscription.
# Currently we don't support the non-auto-commit option
self.auto_commit = True
self._consume_loop_stopped = False
def _parse_url(self):
driver_conf = self.conf.oslo_messaging_kafka
try:
self.host = self.url.hosts[0].hostname
except (NameError, IndexError):
self.host = driver_conf.kafka_default_host
try:
self.port = self.url.hosts[0].port
except (NameError, IndexError):
self.port = driver_conf.kafka_default_port
if self.host is None:
self.host = driver_conf.kafka_default_host
if self.port is None:
self.port = driver_conf.kafka_default_port
def notify_send(self, topic, ctxt, msg, retry):
"""Send messages to Kafka broker.
:param topic: String of the topic
:param ctxt: context for the messages
:param msg: messages for publishing
:param retry: the number of retries
"""
message = pack_context_with_message(ctxt, msg)
self._ensure_connection()
self._send_and_retry(message, topic, retry)
def _send_and_retry(self, message, topic, retry):
current_retry = 0
if not isinstance(message, str):
message = jsonutils.dumps(message)
while message is not None:
try:
self._send(message, topic)
message = None
except Exception:
LOG.warn(_LW("Failed to publish a message of topic %s"), topic)
current_retry += 1
if retry is not None and current_retry >= retry:
LOG.exception(_LE("Failed to retry to send data "
"with max retry times"))
message = None
def _send(self, message, topic):
self.producer.send_messages(topic, message)
def consume(self, timeout=None):
"""recieve messages as many as max_fetch_messages.
In this functions, there are no while loop to subscribe.
This would be helpful when we wants to control the velocity of
subscription.
"""
duration = (self.consumer_timeout if timeout is None else timeout)
timer = driver_common.DecayingTimer(duration=duration)
timer.start()
def _raise_timeout():
LOG.debug('Timed out waiting for Kafka response')
raise driver_common.Timeout()
poll_timeout = (self.consumer_timeout if timeout is None
else min(timeout, self.consumer_timeout))
while True:
if self._consume_loop_stopped:
return
try:
next_timeout = poll_timeout * 1000.0
# TODO(use configure() method instead)
# Currently KafkaConsumer does not support for
# the case of updating only fetch_max_wait_ms parameter
self.consumer._config['fetch_max_wait_ms'] = next_timeout
messages = list(self.consumer.fetch_messages())
except Exception as e:
LOG.exception(_LE("Failed to consume messages: %s"), e)
messages = None
if not messages:
poll_timeout = timer.check_return(
_raise_timeout, maximum=self.consumer_timeout)
continue
return messages
def stop_consuming(self):
self._consume_loop_stopped = True
def reset(self):
"""Reset a connection so it can be used again."""
if self.kafka_client:
self.kafka_client.close()
self.kafka_client = None
if self.producer:
self.producer.stop()
self.producer = None
self.consumer = None
def close(self):
if self.kafka_client:
self.kafka_client.close()
self.kafka_client = None
if self.producer:
self.producer.stop()
self.consumer = None
def commit(self):
"""Commit is used by subscribers belonging to the same group.
After consuming messages, commit is called to prevent
other subscribers belonging to the same group
from re-consuming the same messages.
Currently the self.auto_commit option is always True,
so we don't need to call this function.
"""
self.consumer.commit()
def _ensure_connection(self):
if self.kafka_client:
return
try:
self.kafka_client = kafka.KafkaClient(
"%s:%s" % (self.host, str(self.port)))
self.producer = kafka.SimpleProducer(self.kafka_client)
except KafkaError as e:
LOG.exception(_LE("Kafka Connection is not available: %s"), e)
self.kafka_client = None
def declare_topic_consumer(self, topics, group=None):
self.consumer = kafka.KafkaConsumer(
*topics, group_id=group,
metadata_broker_list=["%s:%s" % (self.host, str(self.port))],
# auto_commit_enable=self.auto_commit,
fetch_message_max_bytes=self.fetch_messages_max_bytes)
class OsloKafkaMessage(base.IncomingMessage):
def __init__(self, listener, ctxt, message):
super(OsloKafkaMessage, self).__init__(listener, ctxt, message)
def requeue(self):
LOG.warn(_LW("requeue is not supported"))
def reply(self, reply=None, failure=None, log_failure=True):
LOG.warn(_LW("reply is not supported"))
class KafkaListener(base.Listener):
def __init__(self, driver, conn):
super(KafkaListener, self).__init__(driver)
self._stopped = threading.Event()
self.conn = conn
self.incoming_queue = []
@base.batch_poll_helper
def poll(self, timeout=None):
while not self._stopped.is_set():
if self.incoming_queue:
return self.incoming_queue.pop(0)
try:
messages = self.conn.consume(timeout=timeout)
for msg in messages:
message = msg.value
message = jsonutils.loads(message)
self.incoming_queue.append(OsloKafkaMessage(
listener=self, ctxt=message['context'],
message=message['message']))
except driver_common.Timeout:
return None
def stop(self):
self._stopped.set()
self.conn.stop_consuming()
def cleanup(self):
self.conn.close()
def commit(self):
# TODO(Support for manually/auto commit functionality)
# It's better to allow users to commit manually and support the
# self.auto_commit = False option. For now, this commit function
# is meaningless since users can't call it and the
# auto_commit option is always True.
self.conn.commit()
class KafkaDriver(base.BaseDriver):
"""Note: Current implementation of this driver is experimental.
We will have functional and/or integrated testing enabled for this driver.
"""
def __init__(self, conf, url, default_exchange=None,
allowed_remote_exmods=None):
opt_group = cfg.OptGroup(name='oslo_messaging_kafka',
title='Kafka driver options')
conf.register_group(opt_group)
conf.register_opts(kafka_opts, group=opt_group)
super(KafkaDriver, self).__init__(
conf, url, default_exchange, allowed_remote_exmods)
self.connection_pool = driver_pool.ConnectionPool(
self.conf, self.conf.oslo_messaging_kafka.pool_size,
self._url, Connection)
self.listeners = []
def cleanup(self):
for c in self.listeners:
c.close()
self.listeners = []
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
retry=None):
raise NotImplementedError(
'The RPC implementation for Kafka is not implemented')
def send_notification(self, target, ctxt, message, version, retry=None):
"""Send notification to Kafka brokers
:param target: Message destination target
:type target: oslo_messaging.Target
:param ctxt: Message context
:type ctxt: dict
:param message: Message payload to pass
:type message: dict
:param version: Messaging API version (currently not used)
:type version: str
:param retry: an optional connection retry configuration for sending
              None means to retry forever
              0 means no retry
              N means N retries
:type retry: int
"""
with self._get_connection(purpose=PURPOSE_SEND) as conn:
conn.notify_send(target_to_topic(target), ctxt, message, retry)
def listen(self, target):
raise NotImplementedError(
'The RPC implementation for Kafka is not implemented')
def listen_for_notifications(self, targets_and_priorities, pool=None):
"""Listen to a specified list of targets on Kafka brokers
:param targets_and_priorities: list of (target, priority) pairs;
                               priority is not used by the kafka
                               driver; each target is mapped to a
                               kafka topic via target_to_topic()
:type targets_and_priorities: list
:param pool: consumer group of Kafka consumers
:type pool: string
"""
conn = self._get_connection(purpose=PURPOSE_LISTEN)
topics = []
for target, priority in targets_and_priorities:
topics.append(target_to_topic(target))
conn.declare_topic_consumer(topics, pool)
listener = KafkaListener(self, conn)
return listener
def _get_connection(self, purpose):
return driver_common.ConnectionContext(self.connection_pool, purpose)
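
A note on the wire format the pieces above agree on: notify_send() on the connection is expected to serialize each notification as a single JSON object with 'context' and 'message' keys, which is exactly what KafkaListener.poll() reads back out of every consumed record. A minimal sketch of that round trip, using the standard json module in place of oslo.serialization's jsonutils (function names here are illustrative, not part of the driver):

import json

def encode_notification(ctxt, message):
    # The envelope layout KafkaListener.poll() expects: one JSON
    # object carrying the request context and the payload.
    return json.dumps({'context': ctxt, 'message': message})

def decode_notification(raw):
    # Mirrors the poll() loop above: load the record value and split
    # it into the context and the body of an OsloKafkaMessage.
    envelope = json.loads(raw)
    return envelope['context'], envelope['message']

raw = encode_notification({'request_id': 'req-1'}, {'event_type': 'demo'})
ctxt, msg = decode_notification(raw)
assert msg['event_type'] == 'demo'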


@ -1,800 +0,0 @@
# Copyright 2011 OpenStack Foundation
# Copyright 2011 - 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import itertools
import logging
import os
import random
import time
import warnings
from oslo_config import cfg
from oslo_serialization import jsonutils
from oslo_utils import importutils
from oslo_utils import netutils
import six
from oslo_messaging._drivers import amqp as rpc_amqp
from oslo_messaging._drivers import amqpdriver
from oslo_messaging._drivers import base
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._i18n import _
from oslo_messaging._i18n import _LE
from oslo_messaging._i18n import _LI
from oslo_messaging import exceptions
qpid_codec = importutils.try_import("qpid.codec010")
qpid_messaging = importutils.try_import("qpid.messaging")
qpid_exceptions = importutils.try_import("qpid.messaging.exceptions")
LOG = logging.getLogger(__name__)
qpid_opts = [
cfg.StrOpt('qpid_hostname',
default='localhost',
deprecated_group='DEFAULT',
help='Qpid broker hostname.'),
cfg.IntOpt('qpid_port',
default=5672,
deprecated_group='DEFAULT',
help='Qpid broker port.'),
cfg.ListOpt('qpid_hosts',
default=['$qpid_hostname:$qpid_port'],
deprecated_group='DEFAULT',
help='Qpid HA cluster host:port pairs.'),
cfg.StrOpt('qpid_username',
default='',
deprecated_group='DEFAULT',
help='Username for Qpid connection.'),
cfg.StrOpt('qpid_password',
default='',
deprecated_group='DEFAULT',
help='Password for Qpid connection.',
secret=True),
cfg.StrOpt('qpid_sasl_mechanisms',
default='',
deprecated_group='DEFAULT',
help='Space separated list of SASL mechanisms to use for '
'auth.'),
cfg.IntOpt('qpid_heartbeat',
default=60,
deprecated_group='DEFAULT',
help='Seconds between connection keepalive heartbeats.'),
cfg.StrOpt('qpid_protocol',
default='tcp',
deprecated_group='DEFAULT',
help="Transport to use, either 'tcp' or 'ssl'."),
cfg.BoolOpt('qpid_tcp_nodelay',
default=True,
deprecated_group='DEFAULT',
help='Whether to disable the Nagle algorithm.'),
cfg.IntOpt('qpid_receiver_capacity',
default=1,
deprecated_group='DEFAULT',
help='The number of prefetched messages held by receiver.'),
# NOTE(russellb) If any additional versions are added (beyond 1 and 2),
# this file could probably use some additional refactoring so that the
# differences between each version are split into different classes.
cfg.IntOpt('qpid_topology_version',
default=1,
deprecated_group='DEFAULT',
help="The qpid topology version to use. Version 1 is what "
"was originally used by impl_qpid. Version 2 includes "
"some backwards-incompatible changes that allow broker "
"federation to work. Users should update to version 2 "
"when they are able to take everything down, as it "
"requires a clean break."),
]
JSON_CONTENT_TYPE = 'application/json; charset=utf8'
def raise_invalid_topology_version(conf):
msg = (_("Invalid value for qpid_topology_version: %d") %
conf.qpid_topology_version)
LOG.error(msg)
raise Exception(msg)
class QpidMessage(dict):
def __init__(self, session, raw_message):
super(QpidMessage, self).__init__(
rpc_common.deserialize_msg(raw_message.content))
self._raw_message = raw_message
self._session = session
def acknowledge(self):
self._session.acknowledge(self._raw_message)
def requeue(self):
pass
class ConsumerBase(object):
"""Consumer base class."""
def __init__(self, conf, session, callback, node_name, node_opts,
link_name, link_opts):
"""Declare a queue on an amqp session.
'session' is the amqp session to use
'callback' is the callback to call when messages are received
'node_name' is the first part of the Qpid address string, before ';'
'node_opts' will be applied to the "x-declare" section of "node"
in the address string.
'link_name' goes into the "name" field of the "link" in the address
string
'link_opts' will be applied to the "x-declare" section of "link"
in the address string.
"""
self.callback = callback
self.receiver = None
self.rcv_capacity = conf.qpid_receiver_capacity
self.session = None
if conf.qpid_topology_version == 1:
addr_opts = {
"create": "always",
"node": {
"type": "topic",
"x-declare": {
"durable": True,
"auto-delete": True,
},
},
"link": {
"durable": True,
"x-declare": {
"durable": False,
"auto-delete": True,
"exclusive": False,
},
},
}
addr_opts["node"]["x-declare"].update(node_opts)
elif conf.qpid_topology_version == 2:
addr_opts = {
"link": {
"x-declare": {
"auto-delete": True,
"exclusive": False,
},
},
}
else:
raise_invalid_topology_version(conf)
addr_opts["link"]["x-declare"].update(link_opts)
if link_name:
addr_opts["link"]["name"] = link_name
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
self.connect(session)
def connect(self, session):
"""Declare the receiver on connect."""
self._declare_receiver(session)
def reconnect(self, session):
"""Re-declare the receiver after a Qpid reconnect."""
self._declare_receiver(session)
def _declare_receiver(self, session):
self.session = session
self.receiver = session.receiver(self.address)
self.receiver.capacity = self.rcv_capacity
def _unpack_json_msg(self, msg):
"""Load the JSON data in msg if msg.content_type indicates that it
is necessary. Put the loaded data back into msg.content and
update msg.content_type appropriately.
A Qpid Message containing a dict will have a content_type of
'amqp/map', whereas one containing a string that needs to be converted
back from JSON will have a content_type of JSON_CONTENT_TYPE.
:param msg: a Qpid Message object
:returns: None
"""
if msg.content_type == JSON_CONTENT_TYPE:
msg.content = jsonutils.loads(msg.content)
msg.content_type = 'amqp/map'
def consume(self):
"""Fetch the message and pass it to the callback object."""
message = self.receiver.fetch()
try:
self._unpack_json_msg(message)
self.callback(QpidMessage(self.session, message))
except Exception:
LOG.exception(_LE("Failed to process message... skipping it."))
self.session.acknowledge(message)
def get_receiver(self):
return self.receiver
def get_node_name(self):
return self.address.split(';')[0]
class DirectConsumer(ConsumerBase):
"""Queue/consumer class for 'direct'."""
def __init__(self, conf, session, msg_id, callback):
"""Init a 'direct' queue.
'session' is the amqp session to use
'msg_id' is the msg_id to listen on
'callback' is the callback to call when messages are received
"""
link_opts = {
"exclusive": True,
"durable": conf.amqp_durable_queues,
}
if conf.qpid_topology_version == 1:
node_name = "%s/%s" % (msg_id, msg_id)
node_opts = {"type": "direct"}
link_name = msg_id
elif conf.qpid_topology_version == 2:
node_name = "amq.direct/%s" % msg_id
node_opts = {}
link_name = msg_id
else:
raise_invalid_topology_version(conf)
super(DirectConsumer, self).__init__(conf, session, callback,
node_name, node_opts, link_name,
link_opts)
class TopicConsumer(ConsumerBase):
"""Consumer class for 'topic'."""
def __init__(self, conf, session, topic, callback, exchange_name,
name=None):
"""Init a 'topic' queue.
:param session: the amqp session to use
:param topic: is the topic to listen on
:paramtype topic: str
:param callback: the callback to call when messages are received
:param name: optional queue name, defaults to topic
"""
link_opts = {
"auto-delete": conf.amqp_auto_delete,
"durable": conf.amqp_durable_queues,
}
if conf.qpid_topology_version == 1:
node_name = "%s/%s" % (exchange_name, topic)
elif conf.qpid_topology_version == 2:
node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
else:
raise_invalid_topology_version(conf)
super(TopicConsumer, self).__init__(conf, session, callback, node_name,
{}, name or topic, link_opts)
class FanoutConsumer(ConsumerBase):
"""Consumer class for 'fanout'."""
def __init__(self, conf, session, topic, callback):
"""Init a 'fanout' queue.
'session' is the amqp session to use
'topic' is the topic to listen on
'callback' is the callback to call when messages are received
"""
self.conf = conf
link_opts = {"exclusive": True}
if conf.qpid_topology_version == 1:
node_name = "%s_fanout" % topic
node_opts = {"durable": False, "type": "fanout"}
elif conf.qpid_topology_version == 2:
node_name = "amq.topic/fanout/%s" % topic
node_opts = {}
else:
raise_invalid_topology_version(conf)
super(FanoutConsumer, self).__init__(conf, session, callback,
node_name, node_opts, None,
link_opts)
class Publisher(object):
"""Base Publisher class."""
def __init__(self, conf, session, node_name, node_opts=None):
"""Init the Publisher class with the exchange_name, routing_key,
and other options
"""
self.sender = None
self.session = session
if conf.qpid_topology_version == 1:
addr_opts = {
"create": "always",
"node": {
"type": "topic",
"x-declare": {
"durable": False,
# auto-delete isn't implemented for exchanges in qpid,
# but put in here anyway
"auto-delete": True,
},
},
}
if node_opts:
addr_opts["node"]["x-declare"].update(node_opts)
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
elif conf.qpid_topology_version == 2:
self.address = node_name
else:
raise_invalid_topology_version(conf)
self.reconnect(session)
def reconnect(self, session):
"""Re-establish the Sender after a reconnection."""
self.sender = session.sender(self.address)
def _pack_json_msg(self, msg):
"""Qpid cannot serialize dicts containing strings longer than 65535
characters. This function dumps the message content to a JSON
string, which Qpid is able to handle.
:param msg: May be either a Qpid Message object or a bare dict.
:returns: A Qpid Message with its content field JSON encoded.
"""
try:
msg.content = jsonutils.dumps(msg.content)
except AttributeError:
# Need to have a Qpid message so we can set the content_type.
msg = qpid_messaging.Message(jsonutils.dumps(msg))
msg.content_type = JSON_CONTENT_TYPE
return msg
def send(self, msg):
"""Send a message."""
try:
# Check if Qpid can encode the message
check_msg = msg
if not hasattr(check_msg, 'content_type'):
check_msg = qpid_messaging.Message(msg)
content_type = check_msg.content_type
enc, dec = qpid_messaging.message.get_codec(content_type)
enc(check_msg.content)
except qpid_codec.CodecException:
# This means the message couldn't be serialized as a dict.
msg = self._pack_json_msg(msg)
self.sender.send(msg)
class DirectPublisher(Publisher):
"""Publisher class for 'direct'."""
def __init__(self, conf, session, topic):
"""Init a 'direct' publisher."""
if conf.qpid_topology_version == 1:
node_name = "%s/%s" % (topic, topic)
node_opts = {"type": "direct"}
elif conf.qpid_topology_version == 2:
node_name = "amq.direct/%s" % topic
node_opts = {}
else:
raise_invalid_topology_version(conf)
super(DirectPublisher, self).__init__(conf, session, node_name,
node_opts)
class TopicPublisher(Publisher):
"""Publisher class for 'topic'."""
def __init__(self, conf, session, exchange_name, topic):
"""Init a 'topic' publisher.
"""
if conf.qpid_topology_version == 1:
node_name = "%s/%s" % (exchange_name, topic)
elif conf.qpid_topology_version == 2:
node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
else:
raise_invalid_topology_version(conf)
super(TopicPublisher, self).__init__(conf, session, node_name)
class FanoutPublisher(Publisher):
"""Publisher class for 'fanout'."""
def __init__(self, conf, session, topic):
"""Init a 'fanout' publisher.
"""
if conf.qpid_topology_version == 1:
node_name = "%s_fanout" % topic
node_opts = {"type": "fanout"}
elif conf.qpid_topology_version == 2:
node_name = "amq.topic/fanout/%s" % topic
node_opts = {}
else:
raise_invalid_topology_version(conf)
super(FanoutPublisher, self).__init__(conf, session, node_name,
node_opts)
class NotifyPublisher(Publisher):
"""Publisher class for notifications."""
def __init__(self, conf, session, exchange_name, topic):
"""Init a 'topic' publisher.
"""
node_opts = {"durable": True}
if conf.qpid_topology_version == 1:
node_name = "%s/%s" % (exchange_name, topic)
elif conf.qpid_topology_version == 2:
node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
else:
raise_invalid_topology_version(conf)
super(NotifyPublisher, self).__init__(conf, session, node_name,
node_opts)
class Connection(object):
"""Connection object."""
pools = {}
def __init__(self, conf, url, purpose):
if not qpid_messaging:
raise ImportError("Failed to import qpid.messaging")
self.connection = None
self.session = None
self.consumers = {}
self.conf = conf
self.driver_conf = conf.oslo_messaging_qpid
self._consume_loop_stopped = False
self.brokers_params = []
if url.hosts:
for host in url.hosts:
params = {
'username': host.username or '',
'password': host.password or '',
}
if host.port is not None:
params['host'] = '%s:%d' % (host.hostname, host.port)
else:
params['host'] = host.hostname
self.brokers_params.append(params)
else:
# Old configuration format
for adr in self.driver_conf.qpid_hosts:
hostname, port = netutils.parse_host_port(
adr, default_port=5672)
if ':' in hostname:
hostname = '[' + hostname + ']'
params = {
'host': '%s:%d' % (hostname, port),
'username': self.driver_conf.qpid_username,
'password': self.driver_conf.qpid_password,
}
self.brokers_params.append(params)
random.shuffle(self.brokers_params)
self.brokers = itertools.cycle(self.brokers_params)
self._initial_pid = os.getpid()
self.reconnect()
def _connect(self, broker):
# Create the connection - this does not open the connection
self.connection = qpid_messaging.Connection(broker['host'])
# Check if flags are set and if so set them for the connection
# before we call open
self.connection.username = broker['username']
self.connection.password = broker['password']
self.connection.sasl_mechanisms = self.driver_conf.qpid_sasl_mechanisms
# Reconnection is done by self.reconnect()
self.connection.reconnect = False
self.connection.heartbeat = self.driver_conf.qpid_heartbeat
self.connection.transport = self.driver_conf.qpid_protocol
self.connection.tcp_nodelay = self.driver_conf.qpid_tcp_nodelay
self.connection.open()
def _register_consumer(self, consumer):
self.consumers[six.text_type(consumer.get_receiver())] = consumer
def _lookup_consumer(self, receiver):
return self.consumers[six.text_type(receiver)]
def _disconnect(self):
# Close the session if necessary
if self.connection is not None and self.connection.opened():
try:
self.connection.close()
except qpid_exceptions.MessagingError:
pass
self.connection = None
def reconnect(self, retry=None):
"""Handles reconnecting and re-establishing sessions and queues.
Will retry up to retry number of times.
retry = None or -1 means to retry forever
retry = 0 means no retry
retry = N means N retries
"""
delay = 1
attempt = 0
loop_forever = False
if retry is None or retry < 0:
loop_forever = True
while True:
self._disconnect()
attempt += 1
broker = six.next(self.brokers)
try:
self._connect(broker)
except qpid_exceptions.MessagingError as e:
msg_dict = dict(e=e,
delay=delay,
retry=retry,
broker=broker)
if not loop_forever and attempt > retry:
msg = _('Unable to connect to AMQP server on '
'%(broker)s after %(retry)d '
'tries: %(e)s') % msg_dict
LOG.error(msg)
raise exceptions.MessageDeliveryFailure(msg)
else:
msg = _LE("Unable to connect to AMQP server on "
"%(broker)s: %(e)s. Sleeping %(delay)s seconds")
LOG.error(msg, msg_dict)
time.sleep(delay)
delay = min(delay + 1, 5)
else:
LOG.info(_LI('Connected to AMQP server on %s'), broker['host'])
break
self.session = self.connection.session()
if self.consumers:
consumers = self.consumers
self.consumers = {}
for consumer in six.itervalues(consumers):
consumer.reconnect(self.session)
self._register_consumer(consumer)
LOG.debug("Re-established AMQP queues")
def ensure(self, error_callback, method, retry=None):
current_pid = os.getpid()
if self._initial_pid != current_pid:
# NOTE(sileht):
# to get the same level of fork support that rabbit driver have
# (ie: allow fork before the first connection established)
# we could use the kombu workaround:
# https://github.com/celery/kombu/blob/master/kombu/transport/
# qpid_patches.py#L67
LOG.warn("Process forked! "
"This can result in unpredictable behavior. "
"See: http://docs.openstack.org/developer/"
"oslo_messaging/transport.html")
self._initial_pid = current_pid
while True:
try:
return method()
except (qpid_exceptions.Empty,
qpid_exceptions.MessagingError) as e:
if error_callback:
error_callback(e)
self.reconnect(retry=retry)
def close(self):
"""Close/release this connection."""
try:
self.connection.close()
except Exception:
# NOTE(dripton) Logging exceptions that happen during cleanup just
# causes confusion; there's really nothing useful we can do with
# them.
pass
self.connection = None
def reset(self):
"""Reset a connection so it can be used again."""
self.session.close()
self.session = self.connection.session()
self.consumers = {}
def declare_consumer(self, consumer_cls, topic, callback):
"""Create a Consumer using the class that was passed in and
add it to our list of consumers
"""
def _connect_error(exc):
log_info = {'topic': topic, 'err_str': exc}
LOG.error(_LE("Failed to declare consumer for topic '%(topic)s': "
"%(err_str)s"), log_info)
def _declare_consumer():
consumer = consumer_cls(self.driver_conf, self.session, topic,
callback)
self._register_consumer(consumer)
return consumer
return self.ensure(_connect_error, _declare_consumer)
def consume(self, timeout=None):
"""Consume from all queues/consumers."""
timer = rpc_common.DecayingTimer(duration=timeout)
timer.start()
def _raise_timeout(exc):
LOG.debug('Timed out waiting for RPC response: %s', exc)
raise rpc_common.Timeout()
def _error_callback(exc):
timer.check_return(_raise_timeout, exc)
LOG.exception(_LE('Failed to consume message from queue: %s'), exc)
def _consume():
# NOTE(sileht):
# maximum value chosen according the best practice from kombu:
# http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop
poll_timeout = 1 if timeout is None else min(timeout, 1)
while True:
if self._consume_loop_stopped:
self._consume_loop_stopped = False
return
try:
nxt_receiver = self.session.next_receiver(
timeout=poll_timeout)
except qpid_exceptions.Empty as exc:
poll_timeout = timer.check_return(_raise_timeout, exc,
maximum=1)
else:
break
try:
self._lookup_consumer(nxt_receiver).consume()
except Exception:
LOG.exception(_LE("Error processing message. "
"Skipping it."))
self.ensure(_error_callback, _consume)
def publisher_send(self, cls, topic, msg, retry=None, **kwargs):
"""Send to a publisher based on the publisher class."""
def _connect_error(exc):
log_info = {'topic': topic, 'err_str': exc}
LOG.exception(_LE("Failed to publish message to topic "
"'%(topic)s': %(err_str)s"), log_info)
def _publisher_send():
publisher = cls(self.driver_conf, self.session, topic=topic,
**kwargs)
publisher.send(msg)
return self.ensure(_connect_error, _publisher_send, retry=retry)
def declare_direct_consumer(self, topic, callback):
"""Create a 'direct' queue.
In nova's use, this is generally a msg_id queue used for
responses for call/multicall
"""
self.declare_consumer(DirectConsumer, topic, callback)
def declare_topic_consumer(self, exchange_name, topic, callback=None,
queue_name=None):
"""Create a 'topic' consumer."""
self.declare_consumer(functools.partial(TopicConsumer,
name=queue_name,
exchange_name=exchange_name,
),
topic, callback)
def declare_fanout_consumer(self, topic, callback):
"""Create a 'fanout' consumer."""
self.declare_consumer(FanoutConsumer, topic, callback)
def direct_send(self, msg_id, msg):
"""Send a 'direct' message."""
self.publisher_send(DirectPublisher, topic=msg_id, msg=msg)
def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None):
"""Send a 'topic' message."""
#
# We want to create a message with attributes, for example a TTL. We
# don't really need to keep 'msg' in its JSON format any longer
# so let's create an actual Qpid message here and get some
# value-add on the go.
#
# WARNING: Request timeout happens to be in the same units as
# Qpid's TTL (seconds). If this changes in the future, then this
# will need to be altered accordingly.
#
qpid_message = qpid_messaging.Message(content=msg, ttl=timeout)
self.publisher_send(TopicPublisher, topic=topic, msg=qpid_message,
exchange_name=exchange_name, retry=retry)
def fanout_send(self, topic, msg, retry=None):
"""Send a 'fanout' message."""
self.publisher_send(FanoutPublisher, topic=topic, msg=msg, retry=retry)
def notify_send(self, exchange_name, topic, msg, retry=None, **kwargs):
"""Send a notify message on a topic."""
self.publisher_send(NotifyPublisher, topic=topic, msg=msg,
exchange_name=exchange_name, retry=retry)
def stop_consuming(self):
self._consume_loop_stopped = True
class QpidDriver(amqpdriver.AMQPDriverBase):
"""qpidd Driver
.. deprecated:: 1.16 (Liberty)
"""
def __init__(self, conf, url,
default_exchange=None, allowed_remote_exmods=None):
warnings.warn(_('The Qpid driver has been deprecated. '
'The driver is planned to be removed during the '
'`Mitaka` development cycle.'),
DeprecationWarning, stacklevel=2)
opt_group = cfg.OptGroup(name='oslo_messaging_qpid',
title='QPID driver options')
conf.register_group(opt_group)
conf.register_opts(qpid_opts, group=opt_group)
conf.register_opts(rpc_amqp.amqp_opts, group=opt_group)
conf.register_opts(base.base_opts, group=opt_group)
connection_pool = rpc_amqp.ConnectionPool(
conf, conf.oslo_messaging_qpid.rpc_conn_pool_size,
url, Connection)
super(QpidDriver, self).__init__(
conf, url,
connection_pool,
default_exchange,
allowed_remote_exmods,
conf.oslo_messaging_qpid.send_single_reply,
)
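
For reference, the reconnect policy in the removed Connection.reconnect() above boils down to a small loop: cycle through the shuffled brokers, honor retry=None/-1 (forever), retry=0 (fail on the first error), retry=N (give up after N attempts), and back off linearly up to five seconds. A standalone sketch of just that policy, with the built-in ConnectionError standing in for qpid_exceptions.MessagingError:

import itertools
import time

def reconnect(brokers, connect, retry=None):
    # brokers: a pre-shuffled list of broker dicts; connect: a
    # callable that raises ConnectionError on failure, like _connect().
    delay, attempt = 1, 0
    loop_forever = retry is None or retry < 0
    for broker in itertools.cycle(brokers):
        attempt += 1
        try:
            return connect(broker)
        except ConnectionError:
            if not loop_forever and attempt > retry:
                raise
            time.sleep(delay)
            delay = min(delay + 1, 5)  # linear backoff, capped at 5s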


@ -37,6 +37,7 @@ from oslo_messaging._drivers import amqp as rpc_amqp
from oslo_messaging._drivers import amqpdriver
from oslo_messaging._drivers import base
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers import pool
from oslo_messaging._i18n import _
from oslo_messaging._i18n import _LE
from oslo_messaging._i18n import _LI
@ -72,27 +73,28 @@ rabbit_opts = [
deprecated_group='DEFAULT',
help='How long to wait before reconnecting in response to an '
'AMQP consumer cancel notification.'),
cfg.IntOpt('kombu_reconnect_timeout',
# NOTE(dhellmann): We want this to be similar to
# rpc_response_timeout, but we can't use
# "$rpc_response_timeout" as a default because that
# option may not have been defined by the time this
# option is accessed. Instead, document the intent in
# the help text for this option and provide a separate
# literal default value.
cfg.IntOpt('kombu_missing_consumer_retry_timeout',
deprecated_name="kombu_reconnect_timeout",
default=60,
help='How long to wait before considering a reconnect '
'attempt to have failed. This value should not be '
'longer than rpc_response_timeout.'),
help='How long to wait for a missing client before '
'abandoning the attempt to send it its replies. This '
'value should not be longer than rpc_response_timeout.'),
cfg.StrOpt('kombu_failover_strategy',
choices=('round-robin', 'shuffle'),
default='round-robin',
help='Determines how the next RabbitMQ node is chosen in case '
'the one we are currently connected to becomes '
'unavailable. Takes effect only if more than one '
'RabbitMQ node is provided in config.'),
cfg.StrOpt('rabbit_host',
default='localhost',
deprecated_group='DEFAULT',
help='The RabbitMQ broker address where a single node is '
'used.'),
cfg.IntOpt('rabbit_port',
default=5672,
deprecated_group='DEFAULT',
help='The RabbitMQ broker port where a single node is used.'),
cfg.PortOpt('rabbit_port',
default=5672,
deprecated_group='DEFAULT',
help='The RabbitMQ broker port where a single node is used.'),
cfg.ListOpt('rabbit_hosts',
default=['$rabbit_host:$rabbit_port'],
deprecated_group='DEFAULT',
@ -376,7 +378,9 @@ class Connection(object):
self.amqp_durable_queues = driver_conf.amqp_durable_queues
self.amqp_auto_delete = driver_conf.amqp_auto_delete
self.rabbit_use_ssl = driver_conf.rabbit_use_ssl
self.kombu_reconnect_timeout = driver_conf.kombu_reconnect_timeout
self.kombu_missing_consumer_retry_timeout = \
driver_conf.kombu_missing_consumer_retry_timeout
self.kombu_failover_strategy = driver_conf.kombu_failover_strategy
if self.rabbit_use_ssl:
self.kombu_ssl_version = driver_conf.kombu_ssl_version
@ -448,7 +452,7 @@ class Connection(object):
# NOTE(sileht): if purpose is PURPOSE_LISTEN
# we don't need the lock because we don't
# have a heartbeat thread
if purpose == rpc_amqp.PURPOSE_SEND:
if purpose == rpc_common.PURPOSE_SEND:
self._connection_lock = ConnectionLock()
else:
self._connection_lock = DummyConnectionLock()
@ -456,8 +460,8 @@ class Connection(object):
self.connection = kombu.connection.Connection(
self._url, ssl=self._fetch_ssl_params(),
login_method=self.login_method,
failover_strategy="shuffle",
heartbeat=self.heartbeat_timeout_threshold,
failover_strategy=self.kombu_failover_strategy,
transport_options={
'confirm_publish': True,
'on_blocked': self._on_connection_blocked,
@ -465,8 +469,8 @@ class Connection(object):
},
)
LOG.info(_LI('Connecting to AMQP server on %(hostname)s:%(port)s'),
self.connection.info())
LOG.debug('Connecting to AMQP server on %(hostname)s:%(port)s',
self.connection.info())
# NOTE(sileht): kombu recommend to run heartbeat_check every
# seconds, but we use a lock around the kombu connection
@ -488,11 +492,12 @@ class Connection(object):
# the consume code does the heartbeat stuff
# we don't need a thread
self._heartbeat_thread = None
if purpose == rpc_amqp.PURPOSE_SEND:
if purpose == rpc_common.PURPOSE_SEND:
self._heartbeat_start()
LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)s'),
self.connection.info())
LOG.debug('Connected to AMQP server on %(hostname)s:%(port)s '
'via [%(transport)s] client',
self.connection.info())
# NOTE(sileht): value chosen according the best practice from kombu
# http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop
@ -576,7 +581,10 @@ class Connection(object):
LOG.info(_LI("The broker has unblocked the connection"))
def ensure_connection(self):
self.ensure(method=lambda: True)
# NOTE(sileht): we reset the channel and ensure
# the kombu underlying connection works
self._set_current_channel(None)
self.ensure(method=lambda: self.connection.connection)
def ensure(self, method, retry=None,
recoverable_error_callback=None, error_callback=None,
@ -649,7 +657,7 @@ class Connection(object):
consumer.declare(self)
LOG.info(_LI('Reconnected to AMQP server on '
'%(hostname)s:%(port)s'),
'%(hostname)s:%(port)s via [%(transport)s] client'),
self.connection.info())
def execute_method(channel):
@ -695,6 +703,10 @@ class Connection(object):
'tries: %(err_str)s') % info
LOG.error(msg)
raise exceptions.MessageDeliveryFailure(msg)
except rpc_amqp.AMQPDestinationNotFound:
# NOTE(sileht): we must reraise this without
# trigger error_callback
raise
except Exception as exc:
error_callback and error_callback(exc)
raise
@ -727,7 +739,6 @@ class Connection(object):
for tag, consumer in enumerate(self._consumers):
consumer.cancel(tag=tag)
except recoverable_errors:
self._set_current_channel(None)
self.ensure_connection()
self._consumers = []
@ -848,7 +859,8 @@ class Connection(object):
raise rpc_common.Timeout()
def _recoverable_error_callback(exc):
self._new_consumers = self._consumers
if not isinstance(exc, rpc_common.Timeout):
self._new_consumers = self._consumers
timer.check_return(_raise_timeout, exc)
def _error_callback(exc):
@ -1033,32 +1045,20 @@ class Connection(object):
self._publish(exchange, msg, routing_key=routing_key, timeout=timeout)
def _publish_and_retry_on_missing_exchange(self, exchange, msg,
routing_key=None, timeout=None):
"""Publisher that retry if the exchange is missing.
"""
def _publish_and_raises_on_missing_exchange(self, exchange, msg,
routing_key=None,
timeout=None):
"""Publisher that raises exception if exchange is missing."""
if not exchange.passive:
raise RuntimeError("_publish_and_raises_on_missing_exchange() must "
"be called with a passive exchange.")
# TODO(sileht): use @retrying
# NOTE(sileht): no need to wait; the application expects a response
# before the timeout is exhausted
duration = (
timeout if timeout is not None
else self.kombu_reconnect_timeout
)
timer = rpc_common.DecayingTimer(duration=duration)
timer.start()
while True:
try:
self._publish(exchange, msg, routing_key=routing_key,
timeout=timeout)
return
except self.connection.channel_errors as exc:
try:
self._publish(exchange, msg, routing_key=routing_key,
timeout=timeout)
return
except self.connection.channel_errors as exc:
if exc.code == 404:
# NOTE(noelbk/sileht):
# If rabbit dies, the consumer can be disconnected before the
# publisher sends, and if the consumer hasn't declared the
@ -1067,24 +1067,9 @@ class Connection(object):
# So we set passive=True to the publisher exchange and catch
# the 404 kombu ChannelError and retry until the exchange
# appears
if exc.code == 404 and timer.check_return() > 0:
LOG.info(_LI("The exchange %(exchange)s to send to "
"%(routing_key)s doesn't exist yet, "
"retrying...") % {
'exchange': exchange.name,
'routing_key': routing_key})
time.sleep(0.25)
continue
elif exc.code == 404:
msg = _("The exchange %(exchange)s to send to "
"%(routing_key)s still doesn't exist after "
"%(duration)s sec abandoning...") % {
'duration': duration,
'exchange': exchange.name,
'routing_key': routing_key}
LOG.info(msg)
raise rpc_amqp.AMQPDestinationNotFound(msg)
raise
raise rpc_amqp.AMQPDestinationNotFound(
"exchange %s doesn't exist" % exchange.name)
raise
def direct_send(self, msg_id, msg):
"""Send a 'direct' message."""
@ -1094,7 +1079,7 @@ class Connection(object):
auto_delete=True,
passive=True)
self._ensure_publishing(self._publish_and_retry_on_missing_exchange,
self._ensure_publishing(self._publish_and_raises_on_missing_exchange,
exchange, msg, routing_key=msg_id)
def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None):
@ -1150,7 +1135,10 @@ class RabbitDriver(amqpdriver.AMQPDriverBase):
conf.register_opts(rpc_amqp.amqp_opts, group=opt_group)
conf.register_opts(base.base_opts, group=opt_group)
connection_pool = rpc_amqp.ConnectionPool(
self.missing_destination_retry_timeout = (
conf.oslo_messaging_rabbit.kombu_missing_consumer_retry_timeout)
connection_pool = pool.ConnectionPool(
conf, conf.oslo_messaging_rabbit.rpc_conn_pool_size,
url, Connection)
@ -1158,8 +1146,7 @@ class RabbitDriver(amqpdriver.AMQPDriverBase):
conf, url,
connection_pool,
default_exchange,
allowed_remote_exmods,
conf.oslo_messaging_rabbit.send_single_reply,
allowed_remote_exmods
)
def require_features(self, requeue=True):
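
The net behavioral change in this file's hunks: a publish to a missing destination no longer retries inside the connection; it fails fast with AMQPDestinationNotFound, and the driver layer retries within kombu_missing_consumer_retry_timeout. A sketch of that translation, with toy exception classes standing in for kombu's channel errors and oslo.messaging's internals:

class AMQPDestinationNotFound(Exception):
    pass

class ChannelError(Exception):
    def __init__(self, code):
        self.code = code

def publish_once(publish, exchange_name):
    # The exchange is declared passive, so publishing to a missing
    # exchange raises channel error 404 instead of creating it.
    try:
        publish()
    except ChannelError as exc:
        if exc.code == 404:
            raise AMQPDestinationNotFound(
                "exchange %s doesn't exist" % exchange_name)
        raise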


@ -13,6 +13,7 @@
# under the License.
import logging
import os
import pprint
import socket
import threading
@ -23,8 +24,11 @@ from stevedore import driver
from oslo_messaging._drivers import base
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.client import zmq_client
from oslo_messaging._drivers.zmq_driver.client import zmq_client_light
from oslo_messaging._drivers.zmq_driver.server import zmq_server
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._executors import impl_pooledexecutor # FIXME(markmc)
from oslo_messaging._i18n import _LE
pformat = pprint.pformat
@ -78,8 +82,23 @@ zmq_opts = [
'Poll raises timeout exception when timeout expired.'),
cfg.BoolOpt('zmq_use_broker',
default=True,
help='Shows whether zmq-messaging uses broker or not.')
default=False,
help='Configures zmq-messaging to use a broker or not.'),
cfg.PortOpt('rpc_zmq_min_port',
default=49152,
help='Minimal port number for random ports range.'),
cfg.IntOpt('rpc_zmq_max_port',
min=1,
max=65536,
default=65536,
help='Maximal port number for random ports range.'),
cfg.IntOpt('rpc_zmq_bind_port_retries',
default=100,
help='Number of retries to find free port number before '
'fail with ZMQBindError.')
]
@ -91,6 +110,7 @@ class LazyDriverItem(object):
self.item_class = item_cls
self.args = args
self.kwargs = kwargs
self.process_id = os.getpid()
def get(self):
# NOTE(ozamiatin): Lazy initialization.
@ -99,11 +119,12 @@ class LazyDriverItem(object):
# __init__, but 'fork' extensively used by services
# breaks all things.
if self.item is not None:
if self.item is not None and os.getpid() == self.process_id:
return self.item
self._lock.acquire()
if self.item is None:
if self.item is None or os.getpid() != self.process_id:
self.process_id = os.getpid()
self.item = self.item_class(*self.args, **self.kwargs)
self._lock.release()
return self.item
@ -143,6 +164,10 @@ class ZmqDriver(base.BaseDriver):
:param allowed_remote_exmods: remote exception passing options
:type allowed_remote_exmods: list
"""
zmq = zmq_async.import_zmq()
if zmq is None:
raise ImportError(_LE("ZeroMQ is not available!"))
conf.register_opts(zmq_opts)
conf.register_opts(impl_pooledexecutor._pool_opts)
conf.register_opts(base.base_opts)
@ -160,12 +185,15 @@ class ZmqDriver(base.BaseDriver):
self.notify_server = LazyDriverItem(
zmq_server.ZmqServer, self, self.conf, self.matchmaker)
client_cls = zmq_client_light.ZmqClientLight \
if conf.zmq_use_broker else zmq_client.ZmqClient
self.client = LazyDriverItem(
zmq_client.ZmqClient, self.conf, self.matchmaker,
client_cls, self.conf, self.matchmaker,
self.allowed_remote_exmods)
self.notifier = LazyDriverItem(
zmq_client.ZmqClient, self.conf, self.matchmaker,
client_cls, self.conf, self.matchmaker,
self.allowed_remote_exmods)
super(ZmqDriver, self).__init__(conf, url, default_exchange,
@ -229,7 +257,7 @@ class ZmqDriver(base.BaseDriver):
:param target: Message destination target
:type target: oslo_messaging.Target
"""
server = self.server.get()
server = zmq_server.ZmqServer(self, self.conf, self.matchmaker)
server.listen(target)
return server
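
The LazyDriverItem change above is a fork-safety guard: the cached object is tied to the pid that created it and rebuilt the first time get() runs in a child process. The pattern in isolation (ForkAwareLazyItem is an illustrative name; factory is any zero-argument constructor):

import os
import threading

class ForkAwareLazyItem(object):
    def __init__(self, factory):
        self._factory = factory
        self._item = None
        self._pid = os.getpid()
        self._lock = threading.Lock()

    def get(self):
        # Fast path: the item exists and we are still in the process
        # that created it.
        if self._item is not None and os.getpid() == self._pid:
            return self._item
        with self._lock:
            if self._item is None or os.getpid() != self._pid:
                self._pid = os.getpid()
                self._item = self._factory()
            return self._item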


@ -17,8 +17,13 @@ import abc
import collections
import threading
from oslo_log import log as logging
import six
from oslo_messaging._drivers import common
LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class Pool(object):
@ -86,3 +91,24 @@ class Pool(object):
@abc.abstractmethod
def create(self):
"""Construct a new item."""
class ConnectionPool(Pool):
"""Class that implements a Pool of Connections."""
def __init__(self, conf, rpc_conn_pool_size, url, connection_cls):
self.connection_cls = connection_cls
self.conf = conf
self.url = url
super(ConnectionPool, self).__init__(rpc_conn_pool_size)
self.reply_proxy = None
# TODO(comstud): Timeout connections not used in a while
def create(self, purpose=None):
if purpose is None:
purpose = common.PURPOSE_SEND
LOG.debug('Pool creating new connection')
return self.connection_cls(self.conf, self.url, purpose)
def empty(self):
for item in self.iter_free():
item.close()
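
Drivers never hold these connections directly: they check one out for a single purpose and hand it back when done (see _get_connection() in the kafka driver above, which wraps this pool in a ConnectionContext). A toy version of that borrow/return cycle, using six.moves for the queue as the rest of this tree does; ToyConnectionPool paraphrases the real classes rather than reproducing them:

import contextlib

from six import moves

class ToyConnectionPool(object):
    def __init__(self, size, connection_cls):
        self._free = moves.queue.Queue()
        for _ in range(size):
            self._free.put(connection_cls())

    @contextlib.contextmanager
    def connection(self):
        # The ConnectionContext idea: borrow on enter, give back on
        # exit, even if the body raises.
        conn = self._free.get()
        try:
            yield conn
        finally:
            self._free.put(conn)

pool = ToyConnectionPool(2, connection_cls=dict)
with pool.connection() as conn:
    conn['used'] = True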


@ -117,8 +117,12 @@ class ProtonListener(base.Listener):
super(ProtonListener, self).__init__(driver)
self.incoming = moves.queue.Queue()
def poll(self):
message = self.incoming.get()
@base.batch_poll_helper
def poll(self, timeout=None):
try:
message = self.incoming.get(True, timeout)
except moves.queue.Empty:
return
request, ctxt = unmarshal_request(message)
LOG.debug("Returning incoming message")
return ProtonIncomingMessage(self, ctxt, request, message)
@ -180,7 +184,7 @@ class ProtonDriver(base.BaseDriver):
"""Send a message to the given target."""
# TODO(kgiusti) need to add support for retry
if retry is not None:
raise NotImplementedError('"retry" not implemented by'
raise NotImplementedError('"retry" not implemented by '
'this transport driver')
request = marshal_request(message, ctxt, envelope)
@ -210,7 +214,7 @@ class ProtonDriver(base.BaseDriver):
"""Send a notification message to the given target."""
# TODO(kgiusti) need to add support for retry
if retry is not None:
raise NotImplementedError('"retry" not implemented by'
raise NotImplementedError('"retry" not implemented by '
'this transport driver')
return self.send(target, ctxt, message, envelope=(version == 2.0))
@ -226,7 +230,7 @@ class ProtonDriver(base.BaseDriver):
def listen_for_notifications(self, targets_and_priorities, pool):
LOG.debug("Listen for notifications %s", targets_and_priorities)
if pool:
raise NotImplementedError('"pool" not implemented by'
raise NotImplementedError('"pool" not implemented by '
'this transport driver')
listener = ProtonListener(self)
for target, priority in targets_and_priorities:
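
The new poll() contract above is the important part: rather than blocking forever on the incoming queue, it blocks for at most `timeout` seconds and returns None on expiry, which is what base.batch_poll_helper needs to assemble message batches. Reduced to its core (poll_once is an illustrative name):

from six import moves

def poll_once(incoming, timeout=None):
    # incoming: a moves.queue.Queue of raw messages; timeout=None
    # still blocks indefinitely, preserving the old behavior.
    try:
        return incoming.get(True, timeout)
    except moves.queue.Empty:
        return None  # timed out; the caller decides whether to loop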


@ -41,7 +41,7 @@ class SendTask(controller.Task):
"""Wait for the send to complete, and, optionally, a reply message from
the remote. Will raise MessagingTimeout if the send does not complete
or no reply is received within timeout seconds. If the request has
failed for any other reason, a MessagingException is raised."
failed for any other reason, a MessagingException is raised.
"""
try:
result = self._results_queue.get(timeout=timeout)


@ -64,6 +64,7 @@ amqp1_opts = [
cfg.StrOpt('ssl_key_password',
default=None,
deprecated_group='amqp1',
secret=True,
help='Password for decrypting ssl_key_file (if encrypted)'),
cfg.BoolOpt('allow_insecure_clients',
@ -94,5 +95,6 @@ amqp1_opts = [
cfg.StrOpt('password',
default='',
deprecated_group='amqp1',
secret=True,
help='Password for message broker authentication')
]
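
Marking these options with secret=True matters whenever a service logs its effective configuration: oslo.config masks secret values (rendering them as '****') instead of printing them. A short illustration with a throwaway ConfigOpts instance:

import logging

from oslo_config import cfg

logging.basicConfig(level=logging.DEBUG)

conf = cfg.ConfigOpts()
conf.register_opts([
    cfg.StrOpt('password', default='s3cret', secret=True,
               help='Password for message broker authentication'),
])
conf([])  # parse an empty command line so values are readable
# The password is logged masked, not in clear text.
conf.log_opt_values(logging.getLogger(__name__), logging.DEBUG)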


@ -16,7 +16,6 @@ import logging
import os
from oslo_utils import excutils
import six
from stevedore import driver
from oslo_messaging._drivers.zmq_driver.broker import zmq_queue_proxy
@ -51,11 +50,8 @@ class ZmqBroker(object):
).driver(self.conf)
self.context = zmq.Context()
self.queue = six.moves.queue.Queue()
self.proxies = [zmq_queue_proxy.OutgoingQueueProxy(
conf, self.context, self.queue, self.matchmaker),
zmq_queue_proxy.IncomingQueueProxy(
conf, self.context, self.queue)
self.proxies = [zmq_queue_proxy.UniversalQueueProxy(
conf, self.context, self.matchmaker)
]
def _create_ipc_dirs(self):


@ -14,65 +14,69 @@
import logging
import six
from oslo_messaging._drivers.zmq_driver.broker import zmq_base_proxy
from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_dealer_publisher
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_publisher_proxy
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LI
zmq = zmq_async.import_zmq(zmq_concurrency='native')
LOG = logging.getLogger(__name__)
class OutgoingQueueProxy(zmq_base_proxy.BaseProxy):
class UniversalQueueProxy(zmq_base_proxy.BaseProxy):
def __init__(self, conf, context, matchmaker):
super(UniversalQueueProxy, self).__init__(conf, context)
self.poller = zmq_async.get_poller(zmq_concurrency='native')
self.router_socket = context.socket(zmq.ROUTER)
self.router_socket.bind(zmq_address.get_broker_address(conf))
self.poller.register(self.router_socket, self._receive_in_request)
LOG.info(_LI("Polling at universal proxy"))
def __init__(self, conf, context, queue, matchmaker):
super(OutgoingQueueProxy, self).__init__(conf, context)
self.queue = queue
self.matchmaker = matchmaker
self.publisher = zmq_dealer_publisher.DealerPublisher(
conf, matchmaker)
LOG.info(_LI("Polling at outgoing proxy ..."))
reply_receiver = zmq_dealer_publisher_proxy.ReplyReceiver(self.poller)
self.publisher = zmq_dealer_publisher_proxy.DealerPublisherProxy(
conf, matchmaker, reply_receiver)
def run(self):
try:
request = self.queue.get(timeout=self.conf.rpc_poll_timeout)
LOG.info(_LI("Redirecting request %s to TCP publisher ...")
% request)
self.publisher.send_request(request)
except six.moves.queue.Empty:
message, socket = self.poller.poll(self.conf.rpc_poll_timeout)
if message is None:
return
if socket == self.router_socket:
self._redirect_in_request(message)
else:
self._redirect_reply(message)
class IncomingQueueProxy(zmq_base_proxy.BaseProxy):
def _redirect_in_request(self, request):
LOG.debug("-> Redirecting request %s to TCP publisher"
% request)
self.publisher.send_request(request)
def __init__(self, conf, context, queue):
super(IncomingQueueProxy, self).__init__(conf, context)
self.poller = zmq_async.get_poller(
zmq_concurrency='native')
self.queue = queue
self.socket = context.socket(zmq.ROUTER)
self.socket.bind(zmq_address.get_broker_address(conf))
self.poller.register(self.socket, self.receive_request)
LOG.info(_LI("Polling at incoming proxy ..."))
def run(self):
request, socket = self.poller.poll(self.conf.rpc_poll_timeout)
if request is None:
def _redirect_reply(self, reply):
LOG.debug("Reply proxy %s" % reply)
if reply[zmq_names.IDX_REPLY_TYPE] == zmq_names.ACK_TYPE:
LOG.debug("Acknowledge dropped %s" % reply)
return
LOG.info(_LI("Received request and queue it: %s") % str(request))
LOG.debug("<- Redirecting reply to ROUTER: reply: %s"
% reply[zmq_names.IDX_REPLY_BODY:])
self.queue.put(request)
self.router_socket.send_multipart(reply[zmq_names.IDX_REPLY_BODY:])
def receive_request(self, socket):
def _receive_in_request(self, socket):
reply_id = socket.recv()
assert reply_id is not None, "Valid id expected"
empty = socket.recv()
assert empty == b'', "Empty delimiter expected"
return socket.recv_pyobj()
envelope = socket.recv_pyobj()
if envelope[zmq_names.FIELD_MSG_TYPE] == zmq_names.CALL_TYPE:
envelope[zmq_names.FIELD_REPLY_ID] = reply_id
payload = socket.recv_multipart()
payload.insert(0, envelope)
return payload


@ -0,0 +1,194 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import threading
from concurrent import futures
import futurist
import oslo_messaging
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_publisher_base
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_socket
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class DealerCallPublisher(zmq_publisher_base.PublisherBase):
"""Thread-safe CALL publisher
Used as a faster and thread-safe publisher for CALL
instead of ReqPublisher.
"""
def __init__(self, conf, matchmaker):
super(DealerCallPublisher, self).__init__(conf)
self.matchmaker = matchmaker
self.reply_waiter = ReplyWaiter(conf)
self.sender = RequestSender(conf, matchmaker, self.reply_waiter) \
if not conf.zmq_use_broker else \
RequestSenderLight(conf, matchmaker, self.reply_waiter)
def send_request(self, request):
reply_future = self.sender.send_request(request)
try:
reply = reply_future.result(timeout=request.timeout)
except futures.TimeoutError:
raise oslo_messaging.MessagingTimeout(
"Timeout %s seconds was reached" % request.timeout)
finally:
self.reply_waiter.untrack_id(request.message_id)
LOG.debug("Received reply %s" % reply)
if reply[zmq_names.FIELD_FAILURE]:
raise rpc_common.deserialize_remote_exception(
reply[zmq_names.FIELD_FAILURE],
request.allowed_remote_exmods)
else:
return reply[zmq_names.FIELD_REPLY]
class RequestSender(zmq_publisher_base.PublisherMultisend):
def __init__(self, conf, matchmaker, reply_waiter):
super(RequestSender, self).__init__(conf, matchmaker, zmq.DEALER)
self.reply_waiter = reply_waiter
self.queue, self.empty_except = zmq_async.get_queue()
self.executor = zmq_async.get_executor(self.run_loop)
self.executor.execute()
def send_request(self, request):
reply_future = futurist.Future()
self.reply_waiter.track_reply(reply_future, request.message_id)
self.queue.put(request)
return reply_future
def _do_send_request(self, socket, request):
socket.send(b'', zmq.SNDMORE)
socket.send_pyobj(request)
LOG.debug("Sending message_id %(message)s to a target %(target)s"
% {"message": request.message_id,
"target": request.target})
def _check_hosts_connections(self, target, listener_type):
if str(target) in self.outbound_sockets:
socket = self.outbound_sockets[str(target)]
else:
hosts = self.matchmaker.get_hosts(
target, listener_type)
socket = zmq_socket.ZmqSocket(self.zmq_context, self.socket_type)
self.outbound_sockets[str(target)] = socket
for host in hosts:
self._connect_to_host(socket, host, target)
return socket
def run_loop(self):
try:
request = self.queue.get(timeout=self.conf.rpc_poll_timeout)
except self.empty_except:
return
socket = self._check_hosts_connections(
request.target, zmq_names.socket_type_str(zmq.ROUTER))
self._do_send_request(socket, request)
self.reply_waiter.poll_socket(socket)
class RequestSenderLight(RequestSender):
"""This class used with proxy.
Simplified address matching because there is only
one proxy IPC address.
"""
def __init__(self, conf, matchmaker, reply_waiter):
if not conf.zmq_use_broker:
raise rpc_common.RPCException("RequestSenderLight needs a proxy!")
super(RequestSenderLight, self).__init__(
conf, matchmaker, reply_waiter)
self.socket = None
def _check_hosts_connections(self, target, listener_type):
if self.socket is None:
self.socket = zmq_socket.ZmqSocket(self.zmq_context,
self.socket_type)
self.outbound_sockets[str(target)] = self.socket
address = zmq_address.get_broker_address(self.conf)
self._connect_to_address(self.socket, address, target)
return self.socket
def _do_send_request(self, socket, request):
LOG.debug("Sending %(type)s message_id %(message)s"
" to a target %(target)s"
% {"type": request.msg_type,
"message": request.message_id,
"target": request.target})
envelope = request.create_envelope()
socket.send(b'', zmq.SNDMORE)
socket.send_pyobj(envelope, zmq.SNDMORE)
socket.send_pyobj(request)
class ReplyWaiter(object):
def __init__(self, conf):
self.conf = conf
self.replies = {}
self.poller = zmq_async.get_poller()
self.executor = zmq_async.get_executor(self.run_loop)
self.executor.execute()
self._lock = threading.Lock()
def track_reply(self, reply_future, message_id):
self._lock.acquire()
self.replies[message_id] = reply_future
self._lock.release()
def untrack_id(self, message_id):
self._lock.acquire()
self.replies.pop(message_id)
self._lock.release()
def poll_socket(self, socket):
def _receive_method(socket):
empty = socket.recv()
assert empty == b'', "Empty expected!"
reply = socket.recv_pyobj()
LOG.debug("Received reply %s" % reply)
return reply
self.poller.register(socket, recv_method=_receive_method)
def run_loop(self):
reply, socket = self.poller.poll(
timeout=self.conf.rpc_poll_timeout)
if reply is not None:
call_future = self.replies[reply[zmq_names.FIELD_MSG_ID]]
call_future.set_result(reply)
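
The CALL flow above hinges on pairing each outgoing message_id with a future: send_request() registers one via ReplyWaiter.track_reply(), the ReplyWaiter thread resolves it, and the caller blocks on result(timeout=...). The pairing mechanism by itself, with concurrent.futures.Future as a stand-in for futurist.Future and ReplyTracker as an illustrative name:

from concurrent import futures

class ReplyTracker(object):
    def __init__(self):
        self._replies = {}

    def track(self, message_id):
        # Sending side: register a future before the request goes out.
        future = futures.Future()
        self._replies[message_id] = future
        return future

    def resolve(self, message_id, reply):
        # Receiving side: wake up the blocked caller.
        self._replies.pop(message_id).set_result(reply)

tracker = ReplyTracker()
fut = tracker.track('msg-1')
tracker.resolve('msg-1', {'reply': 42})
assert fut.result(timeout=1) == {'reply': 42}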


@ -18,7 +18,7 @@ from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_publisher_base
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LI, _LW
from oslo_messaging._i18n import _LW
LOG = logging.getLogger(__name__)
@ -29,14 +29,13 @@ class DealerPublisher(zmq_publisher_base.PublisherMultisend):
def __init__(self, conf, matchmaker):
super(DealerPublisher, self).__init__(conf, matchmaker, zmq.DEALER)
self.ack_receiver = AcknowledgementReceiver()
def send_request(self, request):
if request.msg_type == zmq_names.CALL_TYPE:
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
self._check_request_pattern(request)
dealer_socket, hosts = self._check_hosts_connections(request.target)
dealer_socket = self._check_hosts_connections(
request.target, zmq_names.socket_type_str(zmq.ROUTER))
if not dealer_socket.connections:
# NOTE(ozamiatin): Here we can provide
@ -47,29 +46,31 @@ class DealerPublisher(zmq_publisher_base.PublisherMultisend):
% request.msg_type)
return
self.ack_receiver.track_socket(dealer_socket.handle)
if request.msg_type in zmq_names.MULTISEND_TYPES:
for _ in range(dealer_socket.connections_count()):
self._send_request(dealer_socket, request)
else:
self._send_request(dealer_socket, request)
def _check_request_pattern(self, request):
if request.msg_type == zmq_names.CALL_TYPE:
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
def _send_request(self, socket, request):
socket.send(b'', zmq.SNDMORE)
socket.send_pyobj(request)
LOG.info(_LI("Sending message %(message)s to a target %(target)s")
% {"message": request.message,
"target": request.target})
LOG.debug("Sending message_id %(message)s to a target %(target)s"
% {"message": request.message_id,
"target": request.target})
def cleanup(self):
self.ack_receiver.cleanup()
super(DealerPublisher, self).cleanup()
class DealerPublisherLight(zmq_publisher_base.PublisherBase):
"""Used when publishing to proxy. """
def __init__(self, conf, address):
super(DealerPublisherLight, self).__init__(conf)
@ -81,7 +82,10 @@ class DealerPublisherLight(zmq_publisher_base.PublisherBase):
if request.msg_type == zmq_names.CALL_TYPE:
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
envelope = request.create_envelope()
self.socket.send(b'', zmq.SNDMORE)
self.socket.send_pyobj(envelope, zmq.SNDMORE)
self.socket.send_pyobj(request)
def cleanup(self):
@ -107,8 +111,7 @@ class AcknowledgementReceiver(object):
def poll_for_acknowledgements(self):
ack_message, socket = self.poller.poll()
LOG.info(_LI("Message %s acknowledged")
% ack_message[zmq_names.FIELD_ID])
LOG.debug("Message %s acknowledged" % ack_message[zmq_names.FIELD_ID])
def cleanup(self):
self.thread.stop()
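
Both the light publisher and the proxy publisher above emit the same three-frame layout: an empty delimiter frame, the pickled envelope, then the request body, chained with zmq.SNDMORE. A standalone pyzmq sketch of that framing (the address is illustrative; ZeroMQ connects lazily, so nothing has to be listening for this to run):

import zmq

context = zmq.Context()
socket = context.socket(zmq.DEALER)
socket.connect("tcp://127.0.0.1:5570")  # illustrative address

envelope = {'msg_type': 'cast', 'msg_id': 'msg-1'}
request = {'args': {}}

socket.send(b'', zmq.SNDMORE)             # empty delimiter frame
socket.send_pyobj(envelope, zmq.SNDMORE)  # pickled envelope
socket.send_pyobj(request)                # pickled request body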


@ -0,0 +1,87 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_publisher
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LI, _LW
zmq = zmq_async.import_zmq()
LOG = logging.getLogger(__name__)
class DealerPublisherProxy(zmq_dealer_publisher.DealerPublisher):
def __init__(self, conf, matchmaker, reply_receiver):
super(DealerPublisherProxy, self).__init__(conf, matchmaker)
self.reply_receiver = reply_receiver
def send_request(self, multipart_message):
envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE]
LOG.debug("Envelope: %s" % envelope)
target = envelope[zmq_names.FIELD_TARGET]
dealer_socket = self._check_hosts_connections(
target, zmq_names.socket_type_str(zmq.ROUTER))
if not dealer_socket.connections:
# NOTE(ozamiatin): Here we can provide
# a queue for keeping messages to send them later
# when some listener appears. However such approach
# being more reliable will consume additional memory.
LOG.warning(_LW("Request %s was dropped because no connection")
% envelope[zmq_names.FIELD_MSG_TYPE])
return
self.reply_receiver.track_socket(dealer_socket.handle)
LOG.debug("Sending message %(message)s to a target %(target)s"
% {"message": envelope[zmq_names.FIELD_MSG_ID],
"target": envelope[zmq_names.FIELD_TARGET]})
if envelope[zmq_names.FIELD_MSG_TYPE] in zmq_names.MULTISEND_TYPES:
for _ in range(dealer_socket.connections_count()):
self._send_request(dealer_socket, multipart_message)
else:
self._send_request(dealer_socket, multipart_message)
def _send_request(self, socket, multipart_message):
socket.send(b'', zmq.SNDMORE)
socket.send_pyobj(
multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE],
zmq.SNDMORE)
socket.send(multipart_message[zmq_names.MULTIPART_IDX_BODY])
class ReplyReceiver(object):
def __init__(self, poller):
self.poller = poller
LOG.info(_LI("Reply waiter created in broker"))
def _receive_reply(self, socket):
return socket.recv_multipart()
def track_socket(self, socket):
self.poller.register(socket, self._receive_reply)
def cleanup(self):
self.poller.close()


@ -18,7 +18,6 @@ from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_publisher_base
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LI
LOG = logging.getLogger(__name__)
@ -35,13 +34,14 @@ class PubPublisher(zmq_publisher_base.PublisherMultisend):
if request.msg_type not in zmq_names.NOTIFY_TYPES:
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
pub_socket, hosts = self._check_hosts_connections(request.target)
pub_socket = self._check_hosts_connections(
request.target, zmq_names.socket_type_str(zmq.SUB))
self._send_request(pub_socket, request)
def _send_request(self, socket, request):
super(PubPublisher, self)._send_request(socket, request)
LOG.info(_LI("Publishing message %(message)s to a target %(target)s")
% {"message": request.message,
"target": request.target})
LOG.debug("Publishing message %(message)s to a target %(target)s"
% {"message": request.message,
"target": request.target})


@ -14,6 +14,7 @@
import abc
import logging
import uuid
import six
@ -89,6 +90,11 @@ class PublisherBase(object):
:param request: Message data and destination container object
:type request: zmq_request.Request
"""
LOG.debug("Sending %(type)s message_id %(message)s to a target "
"%(target)s"
% {"type": request.msg_type,
"message": request.message_id,
"target": request.target})
socket.send_pyobj(request)
def cleanup(self):
@ -115,30 +121,33 @@ class PublisherMultisend(PublisherBase):
self.socket_type = socket_type
self.matchmaker = matchmaker
def _check_hosts_connections(self, target):
def _check_hosts_connections(self, target, listener_type):
# TODO(ozamiatin): Place for significant optimization
# Matchmaker cache should be implemented
hosts = self.matchmaker.get_hosts(target)
if str(target) in self.outbound_sockets:
socket = self.outbound_sockets[str(target)]
else:
hosts = self.matchmaker.get_hosts(target, listener_type)
socket = zmq_socket.ZmqSocket(self.zmq_context, self.socket_type)
self.outbound_sockets[str(target)] = socket
for host in hosts:
self._connect_to_host(socket, host, target)
for host in hosts:
self._connect_to_host(socket, host, target)
return socket
return socket, hosts
def _connect_to_host(self, socket, host, target):
address = zmq_address.get_tcp_direct_address(host)
LOG.info(address)
def _connect_to_address(self, socket, address, target):
stype = zmq_names.socket_type_str(self.socket_type)
try:
LOG.info(_LI("Connecting %(stype)s to %(address)s for %(target)s")
% {"stype": stype,
"address": address,
"target": target})
if six.PY3:
socket.setsockopt_string(zmq.IDENTITY, str(uuid.uuid1()))
else:
socket.handle.identity = str(uuid.uuid1())
socket.connect(address)
except zmq.ZMQError as e:
errmsg = _LE("Failed connecting %(stype) to %(address)s: %(e)s")\
@ -146,3 +155,7 @@ class PublisherMultisend(PublisherBase):
LOG.error(_LE("Failed connecting %(stype) to %(address)s: %(e)s")
% (stype, address, e))
raise rpc_common.RPCException(errmsg)
def _connect_to_host(self, socket, host, target):
address = zmq_address.get_tcp_direct_address(host)
self._connect_to_address(socket, address, target)
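The six.PY3 branch above exists because pyzmq's identity handling differs between Python versions; a standalone sketch of the same pattern (the socket type is illustrative)::

    import uuid

    import six
    import zmq

    sock = zmq.Context().socket(zmq.DEALER)
    if six.PY3:
        # On Python 3 the identity must go through setsockopt_string.
        sock.setsockopt_string(zmq.IDENTITY, str(uuid.uuid1()))
    else:
        # On Python 2 a plain str can be assigned directly.
        sock.identity = str(uuid.uuid1())
    print(sock.getsockopt(zmq.IDENTITY))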

View File

@ -18,7 +18,7 @@ from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_publisher_base
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LI, _LW
from oslo_messaging._i18n import _LW
LOG = logging.getLogger(__name__)
@ -35,7 +35,8 @@ class PushPublisher(zmq_publisher_base.PublisherMultisend):
if request.msg_type == zmq_names.CALL_TYPE:
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
push_socket, hosts = self._check_hosts_connections(request.target)
push_socket = self._check_hosts_connections(
request.target, zmq_names.socket_type_str(zmq.PULL))
if not push_socket.connections:
LOG.warning(_LW("Request %s was dropped because no connection")
@ -52,6 +53,6 @@ class PushPublisher(zmq_publisher_base.PublisherMultisend):
super(PushPublisher, self)._send_request(socket, request)
LOG.info(_LI("Publishing message %(message)s to a target %(target)s")
% {"message": request.message,
"target": request.target})
LOG.debug("Publishing message %(message)s to a target %(target)s"
% {"message": request.message,
"target": request.target})

View File

@ -14,6 +14,9 @@
import contextlib
import logging
import uuid
import six
import oslo_messaging
from oslo_messaging._drivers import common as rpc_common
@ -40,24 +43,35 @@ class ReqPublisher(zmq_publisher_base.PublisherBase):
if request.msg_type != zmq_names.CALL_TYPE:
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
socket = self._connect_to_host(request.target, request.timeout)
socket, connect_address = self._connect_to_host(request.target,
request.timeout)
request.host = connect_address
self._send_request(socket, request)
return self._receive_reply(socket, request)
def _resolve_host_address(self, target, timeout=0):
host = self.matchmaker.get_single_host(
target, zmq_names.socket_type_str(zmq.ROUTER), timeout)
return zmq_address.get_tcp_direct_address(host)
def _connect_to_host(self, target, timeout=0):
try:
self.zmq_context = zmq.Context()
socket = self.zmq_context.socket(zmq.REQ)
host = self.matchmaker.get_single_host(target, timeout)
connect_address = zmq_address.get_tcp_direct_address(host)
if six.PY3:
socket.setsockopt_string(zmq.IDENTITY, str(uuid.uuid1()))
else:
socket.identity = str(uuid.uuid1())
connect_address = self._resolve_host_address(target, timeout)
LOG.info(_LI("Connecting REQ to %s") % connect_address)
socket.connect(connect_address)
self.outbound_sockets[str(target)] = socket
return socket
return socket, connect_address
except zmq.ZMQError as e:
errmsg = _LE("Error connecting to socket: %s") % str(e)
@ -68,8 +82,11 @@ class ReqPublisher(zmq_publisher_base.PublisherBase):
def _receive_reply(socket, request):
def _receive_method(socket):
return socket.recv_pyobj()
reply = socket.recv_pyobj()
LOG.debug("Received reply %s" % reply)
return reply
LOG.debug("Start waiting reply")
# NOTE(ozamiatin): Check for retry here (no retries now)
with contextlib.closing(zmq_async.get_reply_poller()) as poller:
poller.register(socket, recv_method=_receive_method)
@ -77,6 +94,7 @@ class ReqPublisher(zmq_publisher_base.PublisherBase):
if reply is None:
raise oslo_messaging.MessagingTimeout(
"Timeout %s seconds was reached" % request.timeout)
LOG.debug("Received reply %s" % reply)
if reply[zmq_names.FIELD_FAILURE]:
raise rpc_common.deserialize_remote_exception(
reply[zmq_names.FIELD_FAILURE],
@ -87,3 +105,26 @@ class ReqPublisher(zmq_publisher_base.PublisherBase):
def close(self):
# For contextlib compatibility
self.cleanup()
class ReqPublisherLight(ReqPublisher):
def __init__(self, conf, matchmaker):
super(ReqPublisherLight, self).__init__(conf, matchmaker)
def _resolve_host_address(self, target, timeout=0):
return zmq_address.get_broker_address(self.conf)
def _send_request(self, socket, request):
LOG.debug("Sending %(type)s message_id %(message)s"
" to a target %(target)s, host:%(host)s"
% {"type": request.msg_type,
"message": request.message_id,
"target": request.target,
"host": request.host})
envelope = request.create_envelope()
socket.send_pyobj(envelope, zmq.SNDMORE)
socket.send_pyobj(request)
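The timeout behaviour above (the poller returns None and MessagingTimeout is raised) can be pictured with plain pyzmq; this hedged sketch uses zmq.Poller directly instead of the driver's poller wrapper, and a RuntimeError stands in for MessagingTimeout::

    import zmq

    def receive_reply(socket, timeout):
        """Wait up to `timeout` seconds for a reply on the given socket."""
        poller = zmq.Poller()
        poller.register(socket, zmq.POLLIN)
        events = dict(poller.poll(timeout * 1000))  # poll() takes milliseconds
        if socket not in events:
            raise RuntimeError("Timeout %s seconds was reached" % timeout)
        return socket.recv_pyobj()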

View File

@ -12,68 +12,33 @@
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
from oslo_messaging._drivers.zmq_driver.client.publishers\
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_call_publisher
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_publisher
from oslo_messaging._drivers.zmq_driver.client.publishers\
import zmq_req_publisher
from oslo_messaging._drivers.zmq_driver.client import zmq_request
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver.client import zmq_client_base
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
zmq = zmq_async.import_zmq()
class ZmqClient(object):
class ZmqClient(zmq_client_base.ZmqClientBase):
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
self.conf = conf
self.context = zmq.Context()
self.matchmaker = matchmaker
self.allowed_remote_exmods = allowed_remote_exmods or []
if conf.zmq_use_broker:
raise rpc_common.RPCException("This client doesn't need proxy!")
self.dealer_publisher = None
if self.conf.zmq_use_broker:
self.dealer_publisher = zmq_dealer_publisher.DealerPublisherLight(
conf, zmq_address.get_broker_address(self.conf))
else:
self.dealer_publisher = zmq_dealer_publisher.DealerPublisher(
conf, matchmaker)
super(ZmqClient, self).__init__(
conf, matchmaker, allowed_remote_exmods,
publishers={
zmq_names.CALL_TYPE:
zmq_dealer_call_publisher.DealerCallPublisher(
conf, matchmaker),
def send_call(self, target, context, message, timeout=None, retry=None):
with contextlib.closing(zmq_request.CallRequest(
target, context=context, message=message,
timeout=timeout, retry=retry,
allowed_remote_exmods=self.allowed_remote_exmods)) as request:
with contextlib.closing(zmq_req_publisher.ReqPublisher(
self.conf, self.matchmaker)) as req_publisher:
return req_publisher.send_request(request)
def send_cast(self, target, context, message, timeout=None, retry=None):
with contextlib.closing(zmq_request.CastRequest(
target, context=context, message=message,
timeout=timeout, retry=retry)) as request:
self.dealer_publisher.send_request(request)
def send_fanout(self, target, context, message, timeout=None, retry=None):
with contextlib.closing(zmq_request.FanoutRequest(
target, context=context, message=message,
timeout=timeout, retry=retry)) as request:
self.dealer_publisher.send_request(request)
def send_notify(self, target, context, message, version, retry=None):
with contextlib.closing(zmq_request.NotificationRequest(
target, context, message, version=version,
retry=retry)) as request:
self.dealer_publisher.send_request(request)
def send_notify_fanout(self, target, context, message, version,
retry=None):
with contextlib.closing(zmq_request.NotificationFanoutRequest(
target, context, message, version=version,
retry=retry)) as request:
self.dealer_publisher.send_request(request)
def cleanup(self):
self.dealer_publisher.cleanup()
"default": zmq_dealer_publisher.DealerPublisher(
conf, matchmaker)
}
)

View File

@ -0,0 +1,77 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
from oslo_messaging._drivers.zmq_driver.client import zmq_request
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
zmq = zmq_async.import_zmq()
class ZmqClientBase(object):
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None,
publishers=None):
self.conf = conf
self.context = zmq.Context()
self.matchmaker = matchmaker
self.allowed_remote_exmods = allowed_remote_exmods or []
self.publishers = publishers
self.call_publisher = publishers.get(zmq_names.CALL_TYPE) \
or publishers["default"]
self.cast_publisher = publishers.get(zmq_names.CAST_TYPE) \
or publishers["default"]
self.fanout_publisher = publishers.get(zmq_names.CAST_FANOUT_TYPE) \
or publishers["default"]
self.notify_publisher = publishers.get(zmq_names.NOTIFY_TYPE) \
or publishers["default"]
def send_call(self, target, context, message, timeout=None, retry=None):
with contextlib.closing(zmq_request.CallRequest(
target, context=context, message=message,
timeout=timeout, retry=retry,
allowed_remote_exmods=self.allowed_remote_exmods)) as request:
return self.call_publisher.send_request(request)
def send_cast(self, target, context, message, timeout=None, retry=None):
with contextlib.closing(zmq_request.CastRequest(
target, context=context, message=message,
timeout=timeout, retry=retry)) as request:
self.cast_publisher.send_request(request)
def send_fanout(self, target, context, message, timeout=None, retry=None):
with contextlib.closing(zmq_request.FanoutRequest(
target, context=context, message=message,
timeout=timeout, retry=retry)) as request:
self.fanout_publisher.send_request(request)
def send_notify(self, target, context, message, version, retry=None):
with contextlib.closing(zmq_request.NotificationRequest(
target, context, message, version=version,
retry=retry)) as request:
self.notify_publisher.send_request(request)
def send_notify_fanout(self, target, context, message, version,
retry=None):
with contextlib.closing(zmq_request.NotificationFanoutRequest(
target, context, message, version=version,
retry=retry)) as request:
self.notify_publisher.send_request(request)
def cleanup(self):
for publisher in self.publishers.values():
publisher.cleanup()
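The lookup convention in ZmqClientBase is a per-message-type table with a "default" fallback; a toy illustration with strings standing in for publisher objects::

    publishers = {
        'call': 'DealerCallPublisher',
        'default': 'DealerPublisher',
    }

    call_publisher = publishers.get('call') or publishers['default']
    cast_publisher = publishers.get('cast') or publishers['default']
    print(call_publisher)   # DealerCallPublisher
    print(cast_publisher)   # DealerPublisher (the fallback)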

View File

@ -0,0 +1,46 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_call_publisher
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
import zmq_dealer_publisher
from oslo_messaging._drivers.zmq_driver.client import zmq_client_base
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
zmq = zmq_async.import_zmq()
class ZmqClientLight(zmq_client_base.ZmqClientBase):
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
if not conf.zmq_use_broker:
raise rpc_common.RPCException(
"This client needs proxy to be configured!")
super(ZmqClientLight, self).__init__(
conf, matchmaker, allowed_remote_exmods,
publishers={
zmq_names.CALL_TYPE:
zmq_dealer_call_publisher.DealerCallPublisher(
conf, matchmaker),
"default": zmq_dealer_publisher.DealerPublisherLight(
conf, zmq_address.get_broker_address(self.conf))
}
)

View File

@ -63,6 +63,12 @@ class Request(object):
self.message = message
self.retry = retry
self.message_id = str(uuid.uuid1())
self.proxy_reply_id = None
def create_envelope(self):
return {'msg_type': self.msg_type,
'message_id': self.message_id,
'target': self.target}
@abc.abstractproperty
def msg_type(self):
@ -86,6 +92,11 @@ class RpcRequest(Request):
super(RpcRequest, self).__init__(*args, **kwargs)
def create_envelope(self):
envelope = super(RpcRequest, self).create_envelope()
envelope['timeout'] = self.timeout
return envelope
class CallRequest(RpcRequest):
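A self-contained sketch of the envelope layering shown in the two hunks above: the base request contributes the routing fields, and RpcRequest adds its timeout via super()::

    class Request(object):
        msg_type = 'base'

        def __init__(self, target, message_id):
            self.target = target
            self.message_id = message_id

        def create_envelope(self):
            return {'msg_type': self.msg_type,
                    'message_id': self.message_id,
                    'target': self.target}


    class RpcRequest(Request):
        msg_type = 'call'

        def __init__(self, target, message_id, timeout):
            super(RpcRequest, self).__init__(target, message_id)
            self.timeout = timeout

        def create_envelope(self):
            envelope = super(RpcRequest, self).create_envelope()
            envelope['timeout'] = self.timeout
            return envelope


    print(RpcRequest('topic.server', 'id-1', 30).create_envelope())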

View File

@ -20,6 +20,7 @@ import retrying
import six
import oslo_messaging
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._i18n import _LI, _LW
@ -35,27 +36,31 @@ class MatchMakerBase(object):
self.conf = conf
@abc.abstractmethod
def register(self, target, hostname):
def register(self, target, hostname, listener_type):
"""Register target on nameserver.
:param target: the target for host
:type target: Target
:param hostname: host for the topic in "host:port" format
:type hostname: String
:param listener_type: Listener socket type ROUTER, SUB etc.
:type listener_type: String
"""
@abc.abstractmethod
def unregister(self, target, hostname):
def unregister(self, target, hostname, listener_type):
"""Unregister target from nameserver.
:param target: the target for host
:type target: Target
:param hostname: host for the topic in "host:port" format
:type hostname: String
:param listener_type: Listener socket type ROUTER, SUB etc.
:type listener_type: String
"""
@abc.abstractmethod
def get_hosts(self, target):
def get_hosts(self, target, listener_type):
"""Get all hosts from nameserver by target.
:param target: the default target for invocations
@ -63,7 +68,7 @@ class MatchMakerBase(object):
:returns: a list of "hostname:port" hosts
"""
def get_single_host(self, target, timeout=None, retry=0):
def get_single_host(self, target, listener_type, timeout=None, retry=0):
"""Get a single host by target.
:param target: the target for messages
@ -101,7 +106,7 @@ class MatchMakerBase(object):
@_retry
def _get_single_host():
hosts = self.get_hosts(target)
hosts = self.get_hosts(target, listener_type)
try:
if not hosts:
err_msg = "No hosts were found for target %s." % target
@ -136,16 +141,16 @@ class DummyMatchMaker(MatchMakerBase):
self._cache = collections.defaultdict(list)
def register(self, target, hostname):
key = str(target)
def register(self, target, hostname, listener_type):
key = zmq_address.target_to_key(target, listener_type)
if hostname not in self._cache[key]:
self._cache[key].append(hostname)
def unregister(self, target, hostname):
key = str(target)
def unregister(self, target, hostname, listener_type):
key = zmq_address.target_to_key(target, listener_type)
if hostname in self._cache[key]:
self._cache[key].remove(hostname)
def get_hosts(self, target):
key = str(target)
def get_hosts(self, target, listener_type):
key = zmq_address.target_to_key(target, listener_type)
return self._cache[key]

View File

@ -17,6 +17,7 @@ from oslo_config import cfg
from oslo_utils import importutils
from oslo_messaging._drivers.zmq_driver.matchmaker import base
from oslo_messaging._drivers.zmq_driver import zmq_address
redis = importutils.try_import('redis')
LOG = logging.getLogger(__name__)
@ -26,9 +27,9 @@ matchmaker_redis_opts = [
cfg.StrOpt('host',
default='127.0.0.1',
help='Host to locate redis.'),
cfg.IntOpt('port',
default=6379,
help='Use this port to connect to redis host.'),
cfg.PortOpt('port',
default=6379,
help='Use this port to connect to redis host.'),
cfg.StrOpt('password',
default='',
secret=True,
@ -48,34 +49,32 @@ class RedisMatchMaker(base.MatchMakerBase):
password=self.conf.matchmaker_redis.password,
)
def _target_to_key(self, target):
attributes = ['topic', 'exchange', 'server']
prefix = "ZMQ-target"
key = ":".join((getattr(target, attr) or "*") for attr in attributes)
return "%s-%s" % (prefix, key)
def _get_keys_by_pattern(self, pattern):
return self._redis.keys(pattern)
def _get_hosts_by_key(self, key):
return self._redis.lrange(key, 0, -1)
def register(self, target, hostname):
key = self._target_to_key(target)
if hostname not in self._get_hosts_by_key(key):
self._redis.lpush(key, hostname)
def register(self, target, hostname, listener_type):
def unregister(self, target, hostname):
key = self._target_to_key(target)
if target.topic and target.server:
key = zmq_address.target_to_key(target, listener_type)
if hostname not in self._get_hosts_by_key(key):
self._redis.lpush(key, hostname)
if target.topic:
key = zmq_address.prefix_str(target.topic, listener_type)
if hostname not in self._get_hosts_by_key(key):
self._redis.lpush(key, hostname)
if target.server:
key = zmq_address.prefix_str(target.server, listener_type)
if hostname not in self._get_hosts_by_key(key):
self._redis.lpush(key, hostname)
def unregister(self, target, hostname, listener_type):
key = zmq_address.target_to_key(target, listener_type)
self._redis.lrem(key, 0, hostname)
def get_hosts(self, target):
pattern = self._target_to_key(target)
if "*" not in pattern:
# pattern has no placeholders, so this is a valid key
return self._get_hosts_by_key(pattern)
def get_hosts(self, target, listener_type):
hosts = []
for key in self._get_keys_by_pattern(pattern):
hosts.extend(self._get_hosts_by_key(key))
key = zmq_address.target_to_key(target, listener_type)
hosts.extend(self._get_hosts_by_key(key))
return hosts

View File

@ -38,12 +38,20 @@ class ThreadingPoller(zmq_poller.ZmqPoller):
self.recv_methods = {}
def register(self, socket, recv_method=None):
LOG.debug("Registering socket")
if socket in self.recv_methods:
return
if recv_method is not None:
self.recv_methods[socket] = recv_method
self.poller.register(socket, zmq.POLLIN)
def poll(self, timeout=None):
timeout *= 1000 # zmq poller waits milliseconds
LOG.debug("Entering poll method")
if timeout:
timeout *= 1000 # zmq poller waits milliseconds
sockets = None
try:

View File

@ -19,8 +19,9 @@ import six
from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_socket
from oslo_messaging._i18n import _LE, _LI
from oslo_messaging._i18n import _LE
LOG = logging.getLogger(__name__)
@ -43,10 +44,10 @@ class ConsumerBase(object):
self.conf, self.context, socket_type)
self.sockets.append(socket)
self.poller.register(socket, self.receive_message)
LOG.info(_LI("Run %(stype)s consumer on %(addr)s:%(port)d"),
{"stype": socket_type,
"addr": socket.bind_address,
"port": socket.port})
LOG.debug("Run %(stype)s consumer on %(addr)s:%(port)d",
{"stype": zmq_names.socket_type_str(socket_type),
"addr": socket.bind_address,
"port": socket.port})
return socket
except zmq.ZMQError as e:
errmsg = _LE("Failed binding to port %(port)d: %(e)s")\

View File

@ -56,9 +56,9 @@ class PullConsumer(zmq_consumer_base.SingleSocketConsumer):
assert msg_type is not None, 'Bad format: msg type expected'
context = socket.recv_pyobj()
message = socket.recv_pyobj()
LOG.info(_LI("Received %(msg_type)s message %(msg)s")
% {"msg_type": msg_type,
"msg": str(message)})
LOG.debug("Received %(msg_type)s message %(msg)s"
% {"msg_type": msg_type,
"msg": str(message)})
if msg_type in (zmq_names.CAST_TYPES + zmq_names.NOTIFY_TYPES):
return PullIncomingMessage(self.server, context, message)

View File

@ -21,7 +21,7 @@ from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LE, _LI
from oslo_messaging._i18n import _LE
LOG = logging.getLogger(__name__)
@ -43,11 +43,7 @@ class RouterIncomingMessage(base.IncomingMessage):
"""Reply is not needed for non-call messages"""
def acknowledge(self):
LOG.info("Sending acknowledge for %s", self.msg_id)
ack_message = {zmq_names.FIELD_ID: self.msg_id}
self.socket.send(self.reply_id, zmq.SNDMORE)
self.socket.send(b'', zmq.SNDMORE)
self.socket.send_pyobj(ack_message)
LOG.debug("Not sending acknowledge for %s", self.msg_id)
def requeue(self):
"""Requeue is not supported"""
@ -61,36 +57,41 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
self.targets = []
self.host = zmq_address.combine_address(self.conf.rpc_zmq_host,
self.port)
LOG.info("[%s] Run ROUTER consumer" % self.host)
def listen(self, target):
LOG.info("Listen to target %s on %s:%d" %
(target, self.address, self.port))
LOG.info("[%s] Listen to target %s" % (self.host, target))
self.targets.append(target)
self.matchmaker.register(target=target,
hostname=self.host)
self.matchmaker.register(target, self.host,
zmq_names.socket_type_str(zmq.ROUTER))
def cleanup(self):
super(RouterConsumer, self).cleanup()
for target in self.targets:
self.matchmaker.unregister(target, self.host)
self.matchmaker.unregister(target, self.host,
zmq_names.socket_type_str(zmq.ROUTER))
def _receive_request(self, socket):
reply_id = socket.recv()
empty = socket.recv()
assert empty == b'', 'Bad format: empty delimiter expected'
request = socket.recv_pyobj()
return request, reply_id
def receive_message(self, socket):
try:
reply_id = socket.recv()
empty = socket.recv()
assert empty == b'', 'Bad format: empty delimiter expected'
request = socket.recv_pyobj()
LOG.info(_LI("Received %(msg_type)s message %(msg)s")
% {"msg_type": request.msg_type,
"msg": str(request.message)})
request, reply_id = self._receive_request(socket)
LOG.debug("[%(host)s] Received %(type)s, %(id)s, %(target)s"
% {"host": self.host,
"type": request.msg_type,
"id": request.message_id,
"target": request.target})
if request.msg_type == zmq_names.CALL_TYPE:
return zmq_incoming_message.ZmqIncomingRequest(
self.server, request.context, request.message, socket,
reply_id, self.poller)
self.server, socket, reply_id, request, self.poller)
elif request.msg_type in zmq_names.NON_BLOCKING_TYPES:
return RouterIncomingMessage(
self.server, request.context, request.message, socket,
@ -100,3 +101,20 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
except zmq.ZMQError as e:
LOG.error(_LE("Receiving message failed: %s") % str(e))
class RouterConsumerBroker(RouterConsumer):
def __init__(self, conf, poller, server):
super(RouterConsumerBroker, self).__init__(conf, poller, server)
def _receive_request(self, socket):
reply_id = socket.recv()
empty = socket.recv()
assert empty == b'', 'Bad format: empty delimiter expected'
envelope = socket.recv_pyobj()
request = socket.recv_pyobj()
if zmq_names.FIELD_REPLY_ID in envelope:
request.proxy_reply_id = envelope[zmq_names.FIELD_REPLY_ID]
return request, reply_id

View File

@ -28,10 +28,12 @@ zmq = zmq_async.import_zmq()
class ZmqIncomingRequest(base.IncomingMessage):
def __init__(self, listener, context, message, socket, rep_id, poller):
super(ZmqIncomingRequest, self).__init__(listener, context, message)
def __init__(self, listener, socket, rep_id, request, poller):
super(ZmqIncomingRequest, self).__init__(listener, request.context,
request.message)
self.reply_socket = socket
self.reply_id = rep_id
self.request = request
self.received = None
self.poller = poller
@ -39,15 +41,22 @@ class ZmqIncomingRequest(base.IncomingMessage):
if failure is not None:
failure = rpc_common.serialize_remote_exception(failure,
log_failure)
message_reply = {zmq_names.FIELD_REPLY: reply,
message_reply = {zmq_names.FIELD_TYPE: zmq_names.REPLY_TYPE,
zmq_names.FIELD_REPLY: reply,
zmq_names.FIELD_FAILURE: failure,
zmq_names.FIELD_LOG_FAILURE: log_failure}
zmq_names.FIELD_LOG_FAILURE: log_failure,
zmq_names.FIELD_ID: self.request.proxy_reply_id,
zmq_names.FIELD_MSG_ID: self.request.message_id}
LOG.info("Replying %s REP", (str(message_reply)))
LOG.debug("Replying %s", (str(self.request.message_id)))
self.received = True
self.reply_socket.send(self.reply_id, zmq.SNDMORE)
self.reply_socket.send(b'', zmq.SNDMORE)
if self.request.proxy_reply_id:
self.reply_socket.send_string(zmq_names.REPLY_TYPE, zmq.SNDMORE)
self.reply_socket.send(self.request.proxy_reply_id, zmq.SNDMORE)
self.reply_socket.send(b'', zmq.SNDMORE)
self.reply_socket.send_pyobj(message_reply)
self.poller.resume_polling(self.reply_socket)

View File

@ -31,11 +31,16 @@ class ZmqServer(base.Listener):
super(ZmqServer, self).__init__(driver)
self.matchmaker = matchmaker
self.poller = zmq_async.get_poller()
self.rpc_consumer = zmq_router_consumer.RouterConsumer(
conf, self.poller, self)
if conf.zmq_use_broker:
self.rpc_consumer = zmq_router_consumer.RouterConsumerBroker(
conf, self.poller, self)
else:
self.rpc_consumer = zmq_router_consumer.RouterConsumer(
conf, self.poller, self)
self.notify_consumer = self.rpc_consumer
self.consumers = [self.rpc_consumer]
@base.batch_poll_helper
def poll(self, timeout=None):
message, socket = self.poller.poll(
timeout or self.conf.rpc_poll_timeout)

View File

@ -22,8 +22,27 @@ def get_tcp_direct_address(host):
def get_tcp_random_address(conf):
return "tcp://*"
return "tcp://%s" % conf.rpc_zmq_bind_address
def get_broker_address(conf):
return "ipc://%s/zmq-broker" % conf.rpc_zmq_ipc_dir
def prefix_str(key, listener_type):
return listener_type + "_" + key
def target_to_key(target, listener_type):
def prefix(key):
return prefix_str(key, listener_type)
if target.topic and target.server:
attributes = ['topic', 'server']
key = ".".join(getattr(target, attr) for attr in attributes)
return prefix(key)
if target.topic:
return prefix(target.topic)
if target.server:
return prefix(target.server)
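The key scheme above can be exercised standalone; this sketch restates prefix_str/target_to_key with a stand-in Target class so the fallback order (topic.server, then topic, then server) is visible::

    class Target(object):   # stand-in for oslo_messaging.Target
        def __init__(self, topic=None, server=None):
            self.topic = topic
            self.server = server


    def prefix_str(key, listener_type):
        return listener_type + "_" + key


    def target_to_key(target, listener_type):
        if target.topic and target.server:
            return prefix_str(target.topic + "." + target.server,
                              listener_type)
        if target.topic:
            return prefix_str(target.topic, listener_type)
        if target.server:
            return prefix_str(target.server, listener_type)


    print(target_to_key(Target('compute', 'host-1'), 'ROUTER'))
    # ROUTER_compute.host-1
    print(target_to_key(Target(topic='compute'), 'ROUTER'))
    # ROUTER_compute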

View File

@ -30,12 +30,10 @@ def import_zmq(zmq_concurrency='eventlet'):
_raise_error_if_invalid_config_value(zmq_concurrency)
imported_zmq = importutils.try_import(ZMQ_MODULES[zmq_concurrency],
default='zmq')
default=None)
if imported_zmq is None:
errmsg = _LE("ZeroMQ not found!")
LOG.error(_LE("ZeroMQ not found!"))
raise ImportError(errmsg)
return imported_zmq
@ -80,3 +78,13 @@ def _raise_error_if_invalid_config_value(zmq_concurrency):
if zmq_concurrency not in ZMQ_MODULES:
errmsg = _('Invalid zmq_concurrency value: %s')
raise ValueError(errmsg % zmq_concurrency)
def get_queue(zmq_concurrency='eventlet'):
_raise_error_if_invalid_config_value(zmq_concurrency)
if zmq_concurrency == 'eventlet' and _is_eventlet_zmq_available():
import eventlet
return eventlet.queue.Queue(), eventlet.queue.Empty
else:
import six
return six.moves.queue.Queue(), six.moves.queue.Empty
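A hedged sketch of the queue selection in get_queue() above: eventlet's queue when running under eventlet, otherwise the stdlib queue through six.moves (the use_eventlet flag replaces the concurrency and availability checks)::

    import six

    def get_queue(use_eventlet=False):
        if use_eventlet:
            import eventlet
            return eventlet.queue.Queue(), eventlet.queue.Empty
        return six.moves.queue.Queue(), six.moves.queue.Empty

    queue, empty_exc = get_queue()
    queue.put('message')
    try:
        while True:
            print(queue.get_nowait())
    except empty_exc:
        pass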

View File

@ -17,10 +17,23 @@ from oslo_messaging._drivers.zmq_driver import zmq_async
zmq = zmq_async.import_zmq()
FIELD_TYPE = 'type'
FIELD_FAILURE = 'failure'
FIELD_REPLY = 'reply'
FIELD_LOG_FAILURE = 'log_failure'
FIELD_ID = 'id'
FIELD_MSG_ID = 'message_id'
FIELD_MSG_TYPE = 'msg_type'
FIELD_REPLY_ID = 'reply_id'
FIELD_TARGET = 'target'
IDX_REPLY_TYPE = 1
IDX_REPLY_BODY = 2
MULTIPART_IDX_ENVELOPE = 0
MULTIPART_IDX_BODY = 1
CALL_TYPE = 'call'
CAST_TYPE = 'cast'
@ -28,6 +41,9 @@ CAST_FANOUT_TYPE = 'cast-f'
NOTIFY_TYPE = 'notify'
NOTIFY_FANOUT_TYPE = 'notify-f'
REPLY_TYPE = 'reply'
ACK_TYPE = 'ack'
MESSAGE_TYPES = (CALL_TYPE,
CAST_TYPE,
CAST_FANOUT_TYPE,

View File

@ -17,6 +17,8 @@ import logging
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LE
from oslo_messaging import exceptions
LOG = logging.getLogger(__name__)
@ -45,6 +47,9 @@ class ZmqSocket(object):
def setsockopt(self, *args, **kwargs):
self.handle.setsockopt(*args, **kwargs)
def setsockopt_string(self, *args, **kwargs):
self.handle.setsockopt_string(*args, **kwargs)
def send(self, *args, **kwargs):
self.handle.send(*args, **kwargs)
@ -57,6 +62,9 @@ class ZmqSocket(object):
def send_pyobj(self, *args, **kwargs):
self.handle.send_pyobj(*args, **kwargs)
def send_multipart(self, *args, **kwargs):
self.handle.send_multipart(*args, **kwargs)
def recv(self, *args, **kwargs):
return self.handle.recv(*args, **kwargs)
@ -69,14 +77,30 @@ class ZmqSocket(object):
def recv_pyobj(self, *args, **kwargs):
return self.handle.recv_pyobj(*args, **kwargs)
def recv_multipart(self, *args, **kwargs):
return self.handle.recv_multipart(*args, **kwargs)
def close(self, *args, **kwargs):
self.handle.close(*args, **kwargs)
class ZmqPortRangeExceededException(exceptions.MessagingException):
"""Raised by ZmqRandomPortSocket - wrapping zmq.ZMQBindError"""
class ZmqRandomPortSocket(ZmqSocket):
def __init__(self, conf, context, socket_type):
super(ZmqRandomPortSocket, self).__init__(context, socket_type)
self.conf = conf
self.bind_address = zmq_address.get_tcp_random_address(self.conf)
self.port = self.handle.bind_to_random_port(self.bind_address)
try:
self.port = self.handle.bind_to_random_port(
self.bind_address,
min_port=conf.rpc_zmq_min_port,
max_port=conf.rpc_zmq_max_port,
max_tries=conf.rpc_zmq_bind_port_retries)
except zmq.ZMQBindError:
LOG.error(_LE("Random ports range exceeded!"))
raise ZmqPortRangeExceededException()
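bind_to_random_port and ZMQBindError are plain pyzmq API, so the bounded-retry binding above can be sketched directly (the port range values are illustrative; the driver reads them from rpc_zmq_min_port and friends)::

    import zmq

    sock = zmq.Context().socket(zmq.ROUTER)
    try:
        port = sock.bind_to_random_port("tcp://*",
                                        min_port=49153,
                                        max_port=65536,
                                        max_tries=100)
        print("bound to port %d" % port)
    except zmq.ZMQBindError:
        # Mirrors the wrapping into ZmqPortRangeExceededException above.
        print("random ports range exceeded")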

View File

@ -14,28 +14,57 @@
# under the License.
import futurist
import threading
from oslo_messaging._executors import impl_pooledexecutor
from oslo_utils import timeutils
class FakeBlockingThread(object):
'''A minimal implementation of threading.Thread which does not create a
thread or start executing the target when start() is called. Instead, the
caller must explicitly execute the non-blocking thread.execute() method
after start() has been called.
'''
def __init__(self, target):
self._target = target
self._running = False
self._running_cond = threading.Condition()
def start(self):
self._target()
if self._running:
# Not a user error. No need to translate.
raise RuntimeError('FakeBlockingThread already started')
@staticmethod
def join(timeout=None):
pass
with self._running_cond:
self._running = True
self._running_cond.notify_all()
@staticmethod
def stop():
pass
def join(self, timeout=None):
with timeutils.StopWatch(duration=timeout) as w, self._running_cond:
while self._running:
self._running_cond.wait(w.leftover(return_none=True))
@staticmethod
def is_alive():
return False
# Thread.join() does not raise an exception on timeout. It is
# the caller's responsibility to check is_alive().
if w.expired():
return
def is_alive(self):
return self._running
def execute(self):
if not self._running:
# Not a user error. No need to translate.
raise RuntimeError('FakeBlockingThread not started')
try:
self._target()
finally:
with self._running_cond:
self._running = False
self._running_cond.notify_all()
class BlockingExecutor(impl_pooledexecutor.PooledExecutor):
@ -52,3 +81,22 @@ class BlockingExecutor(impl_pooledexecutor.PooledExecutor):
_executor_cls = lambda __, ___: futurist.SynchronousExecutor()
_thread_cls = FakeBlockingThread
def __init__(self, *args, **kwargs):
super(BlockingExecutor, self).__init__(*args, **kwargs)
def execute(self):
'''Explicitly run the executor in the current context.'''
# NOTE(mdbooth): Splitting start into start and execute for the
# blocking executor closes a potential race. On a non-blocking
# executor, calling start performs some initialisation synchronously
# before starting the executor and returning control to the caller. In
# the non-blocking caller there was no externally visible boundary
# between the completion of initialisation and the start of execution,
# meaning the caller cannot indicate to another thread that
# initialisation is complete. With the split, the start call for the
# blocking executor becomes analogous to the non-blocking case,
# indicating that initialisation is complete. The caller can then
# synchronously call execute.
if self._poller is not None:
self._poller.execute()
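A usage sketch, assuming the FakeBlockingThread class defined earlier in this file; it makes the caller-driven lifecycle behind the NOTE concrete::

    def work():
        print("running in the caller's thread")

    t = FakeBlockingThread(target=work)
    t.start()          # marks the thread running; nothing executes yet
    assert t.is_alive()
    t.execute()        # runs work() synchronously, then clears _running
    t.join(timeout=1)  # returns immediately since execution has finished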

View File

@ -33,7 +33,7 @@ _pool_opts = [
class PooledExecutor(base.ExecutorBase):
"""A message executor which integrates with some async executor.
"""A message executor which integrates with some executor.
This will create a message thread that polls for messages from a
dispatching thread and on reception of an incoming message places the
@ -93,8 +93,11 @@ class PooledExecutor(base.ExecutorBase):
@excutils.forever_retry_uncaught_exceptions
def _runner(self):
while not self._tombstone.is_set():
incoming = self.listener.poll()
if incoming is None:
incoming = self.listener.poll(
timeout=self.dispatcher.batch_timeout,
prefetch_size=self.dispatcher.batch_size)
if not incoming:
continue
callback = self.dispatcher(incoming, self._executor_callback)
was_submitted = self._do_submit(callback)
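A toy rendering of the _runner() loop shape after this change (listener and dispatch are stand-ins, not oslo.messaging objects): poll with the dispatcher's batch parameters, skip empty batches, submit the rest::

    import threading

    def runner(listener, dispatch, tombstone,
               batch_timeout=0.1, batch_size=10):
        while not tombstone.is_set():
            incoming = listener.poll(timeout=batch_timeout,
                                     prefetch_size=batch_size)
            if not incoming:
                continue
            dispatch(incoming)

    tombstone = threading.Event()  # set() it from another thread to stop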

View File

@ -46,57 +46,6 @@ def version_is_compatible(imp_version, version):
return True
class DispatcherExecutorContext(object):
"""Dispatcher executor context helper
A dispatcher can have work to do before and after the dispatch of the
request in the main server thread while the dispatch itself can be
done in its own thread.
The executor can use the helper like this:
callback = dispatcher(incoming)
callback.prepare()
thread = MyWhateverThread()
thread.on_done(callback.done)
thread.run(callback.run)
"""
def __init__(self, incoming, dispatch, executor_callback=None,
post=None):
self._result = None
self._incoming = incoming
self._dispatch = dispatch
self._post = post
self._executor_callback = executor_callback
def run(self):
"""The incoming message dispath itself
Can be run in an other thread/greenlet/corotine if the executor is
able to do it.
"""
try:
self._result = self._dispatch(self._incoming,
self._executor_callback)
except Exception:
msg = 'The dispatcher method must catch all exceptions'
LOG.exception(msg)
raise RuntimeError(msg)
def done(self):
"""Callback after the incoming message have been dispathed
Should be ran in the main executor thread/greenlet/corotine
"""
# FIXME(sileht): this is not currently true, this works only because
# the driver connection used for polling write on the wire only to
# ack/requeue message, but what if one day, the driver do something
# else
if self._post is not None:
self._post(self._incoming, self._result)
def fetch_current_thread_functor():
# Until https://github.com/eventlet/eventlet/issues/172 is resolved
# or addressed we have to use complicated workaround to get a object
@ -116,29 +65,6 @@ def fetch_current_thread_functor():
return lambda: threading.current_thread()
class DummyCondition(object):
def acquire(self):
pass
def notify(self):
pass
def notify_all(self):
pass
def wait(self, timeout=None):
pass
def release(self):
pass
def __enter__(self):
self.acquire()
def __exit__(self, type, value, traceback):
self.release()
class DummyLock(object):
def acquire(self):
pass

View File

@ -18,6 +18,7 @@ __all__ = ['ConfFixture']
import sys
import fixtures
from functools import wraps
def _import_opts(conf, module, opts, group=None):
@ -50,9 +51,6 @@ class ConfFixture(fixtures.Fixture):
_import_opts(self.conf,
'oslo_messaging._drivers.amqp', 'amqp_opts',
'oslo_messaging_rabbit')
_import_opts(self.conf,
'oslo_messaging._drivers.impl_qpid', 'qpid_opts',
'oslo_messaging_qpid')
_import_opts(self.conf,
'oslo_messaging._drivers.amqp', 'amqp_opts',
'oslo_messaging_qpid')
@ -69,15 +67,63 @@ class ConfFixture(fixtures.Fixture):
_import_opts(self.conf, 'oslo_messaging.rpc.client', '_client_opts')
_import_opts(self.conf, 'oslo_messaging.transport', '_transport_opts')
_import_opts(self.conf,
'oslo_messaging.notify.notifier', '_notifier_opts')
'oslo_messaging.notify.notifier',
'_notifier_opts',
'oslo_messaging_notifications')
def _setup_decorator(self):
# Support older test cases that still use the set_override
# with the old config key names
def decorator_for_set_override(wrapped_function):
@wraps(wrapped_function)
def _wrapper(*args, **kwargs):
group = 'oslo_messaging_notifications'
if args[0] == 'notification_driver':
args = ('driver', args[1], group)
elif args[0] == 'notification_transport_url':
args = ('transport_url', args[1], group)
elif args[0] == 'notification_topics':
args = ('topics', args[1], group)
return wrapped_function(*args, **kwargs)
_wrapper.wrapped = wrapped_function
return _wrapper
def decorator_for_clear_override(wrapped_function):
@wraps(wrapped_function)
def _wrapper(*args, **kwargs):
group = 'oslo_messaging_notifications'
if args[0] == 'notification_driver':
args = ('driver', group)
elif args[0] == 'notification_transport_url':
args = ('transport_url', group)
elif args[0] == 'notification_topics':
args = ('topics', group)
return wrapped_function(*args, **kwargs)
_wrapper.wrapped = wrapped_function
return _wrapper
if not hasattr(self.conf.set_override, 'wrapped'):
self.conf.set_override = decorator_for_set_override(
self.conf.set_override)
if not hasattr(self.conf.clear_override, 'wrapped'):
self.conf.clear_override = decorator_for_clear_override(
self.conf.clear_override)
def _teardown_decorator(self):
if hasattr(self.conf.set_override, 'wrapped'):
self.conf.set_override = self.conf.set_override.wrapped
if hasattr(self.conf.clear_override, 'wrapped'):
self.conf.clear_override = self.conf.clear_override.wrapped
def setUp(self):
super(ConfFixture, self).setUp()
self._setup_decorator()
self.addCleanup(self._teardown_decorator)
self.addCleanup(self.conf.reset)
@property
def transport_driver(self):
"""The transport driver - for example 'rabbit', 'qpid' or 'fake'."""
"""The transport driver - for example 'rabbit', 'amqp' or 'fake'."""
return self.conf.rpc_backend
@transport_driver.setter
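Stepping back to _setup_decorator above: it rewrites legacy option names in flight. A standalone sketch of the same @wraps pattern with a stand-in set_override (the names match the fixture; the print is illustrative)::

    from functools import wraps

    def decorator_for_set_override(wrapped_function):
        @wraps(wrapped_function)
        def _wrapper(*args, **kwargs):
            group = 'oslo_messaging_notifications'
            if args[0] == 'notification_driver':
                args = ('driver', args[1], group)
            return wrapped_function(*args, **kwargs)
        _wrapper.wrapped = wrapped_function
        return _wrapper

    def set_override(name, value, group=None):  # stand-in for conf.set_override
        print("override %s/%s = %r" % (group, name, value))

    set_override = decorator_for_set_override(set_override)
    set_override('notification_driver', ['log'])
    # override oslo_messaging_notifications/driver = ['log']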

View File

@ -0,0 +1,111 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import logging
import six
__all__ = [
"DispatcherBase",
"DispatcherExecutorContext"
]
LOG = logging.getLogger(__name__)
class DispatcherExecutorContext(object):
"""Dispatcher executor context helper
A dispatcher can have work to do before and after the dispatch of the
request in the main server thread while the dispatch itself can be
done in its own thread.
The executor can use the helper like this:
callback = dispatcher(incoming)
callback.prepare()
thread = MyWhateverThread()
thread.on_done(callback.done)
thread.run(callback.run)
"""
def __init__(self, incoming, dispatch, executor_callback=None,
post=None):
self._result = None
self._incoming = incoming
self._dispatch = dispatch
self._post = post
self._executor_callback = executor_callback
def run(self):
"""The incoming message dispath itself
Can be run in an other thread/greenlet/corotine if the executor is
able to do it.
"""
try:
self._result = self._dispatch(self._incoming,
self._executor_callback)
except Exception:
msg = 'The dispatcher method must catch all exceptions'
LOG.exception(msg)
raise RuntimeError(msg)
def done(self):
"""Callback after the incoming message have been dispathed
Should be ran in the main executor thread/greenlet/corotine
"""
# FIXME(sileht): this is not currently true, this works only because
# the driver connection used for polling write on the wire only to
# ack/requeue message, but what if one day, the driver do something
# else
if self._post is not None:
self._post(self._incoming, self._result)
@six.add_metaclass(abc.ABCMeta)
class DispatcherBase(object):
"Base class for dispatcher"
batch_size = 1
"Number of messages to wait before calling endpoints callacks"
batch_timeout = None
"Number of seconds to wait before calling endpoints callacks"
@abc.abstractmethod
def _listen(self, transport):
"""Initiate the driver Listener
Usually the driver Listener is started with the transport helper methods:
* transport._listen()
* transport._listen_for_notifications()
:param transport: the transport object
:type transport: oslo_messaging.transport.Transport
:returns: a driver Listener object
:rtype: oslo_messaging._drivers.base.Listener
"""
@abc.abstractmethod
def __call__(self, incoming, executor_callback=None):
"""Called by the executor to get the DispatcherExecutorContext
:param incoming: list of messages
:type incoming: oslo_messaging._drivers.base.IncomingMessage
:returns: DispatcherExecutorContext
:rtype: DispatcherExecutorContext
"""

View File

@ -15,7 +15,9 @@
__all__ = ['Notifier',
'LoggingNotificationHandler',
'get_notification_transport',
'get_notification_listener',
'get_batch_notification_listener',
'NotificationResult',
'NotificationFilter',
'PublishErrorsHandler',

View File

@ -16,6 +16,7 @@
# under the License.
import logging
import warnings
from oslo_serialization import jsonutils
from oslo_utils import strutils
@ -40,3 +41,7 @@ class LogDriver(notifier.Driver):
method = getattr(logger, priority.lower(), None)
if method:
method(strutils.mask_password(jsonutils.dumps(message)))
else:
warnings.warn('Unable to log message as notify cannot find a '
'logger with the specified priority '
'%s' % priority.lower())

View File

@ -0,0 +1,24 @@
# Copyright 2015 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from debtcollector import removals
from oslo_messaging.notify.messaging import * # noqa
# NOTE(mriedem): removal depends on how we can cap requirements in
# stable/liberty such that neutron does not try to load this
removals.removed_module(__name__,
oslo_messaging.notify.messaging.__name__,
removal_version='?')

View File

@ -27,11 +27,13 @@ from oslo_messaging.notify import notifier
LOG = logging.getLogger(__name__)
router_config = cfg.StrOpt('routing_notifier_config', default='',
router_config = cfg.StrOpt('routing_config', default='',
deprecated_group='DEFAULT',
deprecated_name='routing_notifier_config',
help='RoutingNotifier configuration file location.')
CONF = cfg.CONF
CONF.register_opt(router_config)
CONF.register_opt(router_config, group='oslo_messaging_notifications')
class RoutingDriver(notifier.Driver):
@ -56,12 +58,12 @@ class RoutingDriver(notifier.Driver):
"""One-time load of notifier config file."""
self.routing_groups = {}
self.used_drivers = set()
filename = CONF.routing_notifier_config
filename = CONF.oslo_messaging_notifications.routing_config
if not filename:
return
# Infer which drivers are used from the config file.
self.routing_groups = yaml.load(
self.routing_groups = yaml.safe_load(
self._get_notifier_config_file(filename))
if not self.routing_groups:
self.routing_groups = {} # In case we got None from load()
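The switch from yaml.load to yaml.safe_load above closes a code-execution hole: with the default Loader, yaml.load can instantiate arbitrary Python objects from the routing config file, while safe_load only builds plain data structures. A small illustration (the document contents are made up)::

    import yaml

    doc = "group_1:\n    rpc:\n        - messaging\n"
    print(yaml.safe_load(doc))
    # {'group_1': {'rpc': ['messaging']}}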

View File

@ -16,9 +16,10 @@
import itertools
import logging
import sys
from oslo_messaging import _utils as utils
import six
from oslo_messaging import dispatcher
from oslo_messaging import localcontext
from oslo_messaging import serializer as msg_serializer
@ -33,17 +34,7 @@ class NotificationResult(object):
REQUEUE = 'requeue'
class NotificationDispatcher(object):
"""A message dispatcher which understands Notification messages.
A MessageHandlingServer is constructed by passing a callable dispatcher
which is invoked with context and message dictionaries each time a message
is received.
NotificationDispatcher is one such dispatcher which passes a raw notification
message to the endpoints
"""
class _NotificationDispatcherBase(dispatcher.DispatcherBase):
def __init__(self, targets, endpoints, serializer, allow_requeue,
pool=None):
self.targets = targets
@ -65,21 +56,25 @@ class NotificationDispatcher(object):
priorities))
def _listen(self, transport):
transport._require_driver_features(requeue=self.allow_requeue)
return transport._listen_for_notifications(self._targets_priorities,
pool=self.pool)
def __call__(self, incoming, executor_callback=None):
return utils.DispatcherExecutorContext(
return dispatcher.DispatcherExecutorContext(
incoming, self._dispatch_and_handle_error,
executor_callback=executor_callback,
post=self._post_dispatch)
@staticmethod
def _post_dispatch(incoming, result):
if result == NotificationResult.HANDLED:
incoming.acknowledge()
else:
incoming.requeue()
def _post_dispatch(self, incoming, requeues):
for m in incoming:
try:
if requeues and m in requeues:
m.requeue()
else:
m.acknowledge()
except Exception:
LOG.error("Fail to ack/requeue message", exc_info=True)
def _dispatch_and_handle_error(self, incoming, executor_callback):
"""Dispatch a notification message to the appropriate endpoint method.
@ -88,24 +83,59 @@ class NotificationDispatcher(object):
:type ctxt: IncomingMessage
"""
try:
return self._dispatch(incoming.ctxt, incoming.message,
executor_callback)
return self._dispatch(incoming, executor_callback)
except Exception:
# sys.exc_info() is deleted by LOG.exception().
exc_info = sys.exc_info()
LOG.error('Exception during message handling',
exc_info=exc_info)
return NotificationResult.HANDLED
LOG.error('Exception during message handling', exc_info=True)
def _dispatch(self, ctxt, message, executor_callback=None):
"""Dispatch an RPC message to the appropriate endpoint method.
:param ctxt: the request context
:type ctxt: dict
:param message: the message payload
:type message: dict
def _dispatch(self, incoming, executor_callback=None):
"""Dispatch notification messages to the appropriate endpoint method.
"""
ctxt = self.serializer.deserialize_context(ctxt)
messages_grouped = itertools.groupby((
self._extract_user_message(m)
for m in incoming), lambda x: x[0])
requeues = set()
for priority, messages in messages_grouped:
__, raw_messages, messages = six.moves.zip(*messages)
raw_messages = list(raw_messages)
messages = list(messages)
if priority not in PRIORITIES:
LOG.warning('Unknown priority "%s"', priority)
continue
for screen, callback in self._callbacks_by_priority.get(priority,
[]):
if screen:
filtered_messages = [message for message in messages
if screen.match(
message["ctxt"],
message["publisher_id"],
message["event_type"],
message["metadata"],
message["payload"])]
else:
filtered_messages = messages
if not filtered_messages:
continue
ret = self._exec_callback(executor_callback, callback,
filtered_messages)
if self.allow_requeue and ret == NotificationResult.REQUEUE:
requeues.update(raw_messages)
break
return requeues
def _exec_callback(self, executor_callback, callback, *args):
if executor_callback:
ret = executor_callback(callback, *args)
else:
ret = callback(*args)
return NotificationResult.HANDLED if ret is None else ret
def _extract_user_message(self, incoming):
ctxt = self.serializer.deserialize_context(incoming.ctxt)
message = incoming.message
publisher_id = message.get('publisher_id')
event_type = message.get('event_type')
@ -114,28 +144,50 @@ class NotificationDispatcher(object):
'timestamp': message.get('timestamp')
}
priority = message.get('priority', '').lower()
if priority not in PRIORITIES:
LOG.warning('Unknown priority "%s"', priority)
return
payload = self.serializer.deserialize_entity(ctxt,
message.get('payload'))
return priority, incoming, dict(ctxt=ctxt,
publisher_id=publisher_id,
event_type=event_type,
payload=payload,
metadata=metadata)
for screen, callback in self._callbacks_by_priority.get(priority, []):
if screen and not screen.match(ctxt, publisher_id, event_type,
metadata, payload):
continue
localcontext._set_local_context(ctxt)
try:
if executor_callback:
ret = executor_callback(callback, ctxt, publisher_id,
event_type, payload, metadata)
else:
ret = callback(ctxt, publisher_id, event_type, payload,
metadata)
ret = NotificationResult.HANDLED if ret is None else ret
if self.allow_requeue and ret == NotificationResult.REQUEUE:
return ret
finally:
localcontext._clear_local_context()
return NotificationResult.HANDLED
class NotificationDispatcher(_NotificationDispatcherBase):
"""A message dispatcher which understands Notification messages.
A MessageHandlingServer is constructed by passing a callable dispatcher
which is invoked with context and message dictionaries each time a message
is received.
"""
def _exec_callback(self, executor_callback, callback, messages):
localcontext._set_local_context(
messages[0]["ctxt"])
try:
return super(NotificationDispatcher, self)._exec_callback(
executor_callback, callback,
messages[0]["ctxt"],
messages[0]["publisher_id"],
messages[0]["event_type"],
messages[0]["payload"],
messages[0]["metadata"])
finally:
localcontext._clear_local_context()
class BatchNotificationDispatcher(_NotificationDispatcherBase):
"""A message dispatcher which understands Notification messages.
A MessageHandlingServer is constructed by passing a callable dispatcher
which is invoked with a list of message dictionaries each time 'batch_size'
messages are received or 'batch_timeout' seconds have elapsed.
"""
def __init__(self, targets, endpoints, serializer, allow_requeue,
pool=None, batch_size=None, batch_timeout=None):
super(BatchNotificationDispatcher, self).__init__(targets, endpoints,
serializer,
allow_requeue,
pool)
self.batch_size = batch_size
self.batch_timeout = batch_timeout
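The grouping step in _dispatch() above leans on itertools.groupby, which only merges adjacent items, which is why the driver groups one polled batch at a time. A toy sketch of the (priority, raw, parsed) triples it consumes (values are made up)::

    import itertools

    extracted = [
        ('info', 'raw-1', {'event_type': 'compute.start'}),
        ('info', 'raw-2', {'event_type': 'compute.end'}),
        ('error', 'raw-3', {'event_type': 'compute.fail'}),
    ]

    for priority, group in itertools.groupby(extracted, lambda x: x[0]):
        _, raw_messages, messages = zip(*group)
        print(priority, list(messages))
    # info [{'event_type': 'compute.start'}, {'event_type': 'compute.end'}]
    # error [{'event_type': 'compute.fail'}]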

View File

@ -19,12 +19,13 @@ contain a set of methods. Each method corresponds to a notification priority.
To create a notification listener, you supply a transport, list of targets and
a list of endpoints.
A transport can be obtained simply by calling the get_transport() method::
A transport can be obtained simply by calling the get_notification_transport()
method::
transport = messaging.get_transport(conf)
transport = messaging.get_notification_transport(conf)
which will load the appropriate transport driver according to the user's
messaging configuration. See get_transport() for more details.
messaging configuration. See get_notification_transport() for more details.
The target supplied when creating a notification listener expresses the topic
and - optionally - the exchange to listen on. See Target for more details
@ -56,7 +57,7 @@ A simple example of a notification listener with multiple endpoints might be::
def error(self, ctxt, publisher_id, event_type, payload, metadata):
do_something(payload)
transport = oslo_messaging.get_transport(cfg.CONF)
transport = oslo_messaging.get_notification_transport(cfg.CONF)
targets = [
oslo_messaging.Target(topic='notifications')
oslo_messaging.Target(topic='notifications_bis')
@ -136,8 +137,49 @@ def get_notification_listener(transport, targets, endpoints,
:type pool: str
:raises: NotImplementedError
"""
transport._require_driver_features(requeue=allow_requeue)
dispatcher = notify_dispatcher.NotificationDispatcher(targets, endpoints,
serializer,
allow_requeue, pool)
return msg_server.MessageHandlingServer(transport, dispatcher, executor)
def get_batch_notification_listener(transport, targets, endpoints,
executor='blocking', serializer=None,
allow_requeue=False, pool=None,
batch_size=None, batch_timeout=None):
"""Construct a batch notification listener
The executor parameter controls how incoming messages will be received and
dispatched. By default, the most simple executor is used - the blocking
executor.
If the eventlet executor is used, the threading and time libraries need to be
monkeypatched.
:param transport: the messaging transport
:type transport: Transport
:param targets: the exchanges and topics to listen on
:type targets: list of Target
:param endpoints: a list of endpoint objects
:type endpoints: list
:param executor: name of a message executor - for example
'eventlet', 'blocking'
:type executor: str
:param serializer: an optional entity serializer
:type serializer: Serializer
:param allow_requeue: whether NotificationResult.REQUEUE support is needed
:type allow_requeue: bool
:param pool: the pool name
:type pool: str
:param batch_size: number of messages to wait before calling
endpoints callacks
:type batch_size: int
:param batch_timeout: number of seconds to wait before calling
endpoints callacks
:type batch_timeout: int
:raises: NotImplementedError
"""
dispatcher = notify_dispatcher.BatchNotificationDispatcher(
targets, endpoints, serializer, allow_requeue, pool,
batch_size, batch_timeout)
return msg_server.MessageHandlingServer(transport, dispatcher, executor)
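A hedged usage sketch for the new API (the topic and the batch endpoint signature are assumptions inferred from the dispatcher code above, which hands each callback a list of message dicts)::

    from oslo_config import cfg
    import oslo_messaging

    class BatchEndpoint(object):
        def info(self, messages):
            # Each entry carries ctxt/publisher_id/event_type/payload/metadata.
            for message in messages:
                print(message['event_type'])

    transport = oslo_messaging.get_notification_transport(cfg.CONF)
    targets = [oslo_messaging.Target(topic='notifications')]
    listener = oslo_messaging.get_batch_notification_listener(
        transport, targets, [BatchEndpoint()],
        batch_size=5, batch_timeout=2)
    listener.start()
    # ... later: listener.stop(); listener.wait()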

View File

@ -21,14 +21,15 @@ class LoggingErrorNotificationHandler(logging.Handler):
# at runtime.
import oslo_messaging
logging.Handler.__init__(self, *args, **kwargs)
self._transport = oslo_messaging.get_transport(cfg.CONF)
self._transport = oslo_messaging.get_notification_transport(cfg.CONF)
self._notifier = oslo_messaging.Notifier(
self._transport,
publisher_id='error.publisher')
def emit(self, record):
conf = self._transport.conf
# NOTE(bnemec): Notifier registers this opt with the transport.
if ('log' in self._transport.conf.notification_driver):
if ('log' in conf.oslo_messaging_notifications.driver):
# NOTE(lbragstad): If we detect that log is one of the
# notification drivers, then return. This protects from infinite
# recursion where something bad happens, it gets logged, the log

View File

@ -19,7 +19,6 @@ import logging
from oslo_config import cfg
from oslo_messaging.notify import notifier
from oslo_messaging import transport
class LoggingNotificationHandler(logging.Handler):
@ -34,7 +33,7 @@ class LoggingNotificationHandler(logging.Handler):
[handler_notifier]
class=oslo_messaging.LoggingNotificationHandler
level=ERROR
args=('qpid:///')
args=('rabbit:///')
"""
@ -47,7 +46,7 @@ class LoggingNotificationHandler(logging.Handler):
def __init__(self, url, publisher_id=None, driver=None,
topic=None, serializer=None):
self.notifier = notifier.Notifier(
transport.get_transport(self.CONF, url),
notifier.get_notification_transport(self.CONF, url),
publisher_id, driver,
topic,
serializer() if serializer else None)

View File

@ -22,7 +22,6 @@ import sys
import traceback as tb
from oslo_config import cfg
from oslo_context import context
from oslo_middleware import base
import six
import webob.dec
@ -59,7 +58,8 @@ class RequestNotifier(base.Middleware):
def __init__(self, app, **conf):
self.notifier = notify.Notifier(
oslo_messaging.get_transport(cfg.CONF, conf.get('url')),
oslo_messaging.get_notification_transport(cfg.CONF,
conf.get('url')),
publisher_id=conf.get('publisher_id',
os.path.basename(sys.argv[0])))
self.service_name = conf.get('service_name')
@ -84,7 +84,7 @@ class RequestNotifier(base.Middleware):
'request': self.environ_to_dict(request.environ),
}
self.notifier.info(context.get_admin_context(),
self.notifier.info({},
'http.request',
payload)
@ -107,7 +107,7 @@ class RequestNotifier(base.Middleware):
'traceback': tb.format_tb(traceback)
}
self.notifier.info(context.get_admin_context(),
self.notifier.info({},
'http.response',
payload)

View File

@ -25,17 +25,30 @@ import six
from stevedore import named
from oslo_messaging import serializer as msg_serializer
from oslo_messaging import transport as msg_transport
_notifier_opts = [
cfg.MultiStrOpt('notification_driver',
cfg.MultiStrOpt('driver',
default=[],
deprecated_name='notification_driver',
deprecated_group='DEFAULT',
help='The Drivers(s) to handle sending notifications. '
'Possible values are messaging, messagingv2, '
'routing, log, test, noop'),
cfg.ListOpt('notification_topics',
cfg.StrOpt('transport_url',
deprecated_name='notification_transport_url',
deprecated_group='DEFAULT',
help='A URL representing the messaging driver to use for '
'notifications. If not set, we fall back to the same '
'configuration used for RPC.'),
cfg.ListOpt('topics',
default=['notifications', ],
deprecated_name='topics',
deprecated_group='rpc_notifier2',
deprecated_opts=[
cfg.DeprecatedOpt('topics',
group='rpc_notifier2'),
cfg.DeprecatedOpt('notification_topics',
group='DEFAULT')
],
help='AMQP topic used for OpenStack notifications.'),
]
@ -75,6 +88,16 @@ class Driver(object):
pass
def get_notification_transport(conf, url=None,
allowed_remote_exmods=None, aliases=None):
conf.register_opts(_notifier_opts,
group='oslo_messaging_notifications')
if url is None:
url = conf.oslo_messaging_notifications.transport_url
return msg_transport.get_transport(conf, url,
allowed_remote_exmods, aliases)
class Notifier(object):
"""Send notification messages.
@ -94,16 +117,18 @@ class Notifier(object):
A Notifier object can be instantiated with a transport object and a
publisher ID:
notifier = messaging.Notifier(get_transport(CONF), 'compute')
notifier = messaging.Notifier(get_notification_transport(CONF),
'compute')
and notifications are sent via drivers chosen with the notification_driver
config option and on the topics chosen with the notification_topics config
option.
and notifications are sent via drivers chosen with the driver
config option and on the topics chosen with the topics config
option in the [oslo_messaging_notifications] section.
Alternatively, a Notifier object can be instantiated with a specific
driver or topic::
notifier = notifier.Notifier(RPC_TRANSPORT,
transport = notifier.get_notification_transport(CONF)
notifier = notifier.Notifier(transport,
'compute.host',
driver='messaging',
topic='notifications')
@ -138,24 +163,26 @@ class Notifier(object):
N means N retries
:type retry: int
"""
transport.conf.register_opts(_notifier_opts)
conf = transport.conf
conf.register_opts(_notifier_opts,
group='oslo_messaging_notifications')
self.transport = transport
self.publisher_id = publisher_id
self.retry = retry
self._driver_names = ([driver] if driver is not None
else transport.conf.notification_driver)
self._driver_names = ([driver] if driver is not None else
conf.oslo_messaging_notifications.driver)
self._topics = ([topic] if topic is not None
else transport.conf.notification_topics)
self._topics = ([topic] if topic is not None else
conf.oslo_messaging_notifications.topics)
self._serializer = serializer or msg_serializer.NoOpSerializer()
self._driver_mgr = named.NamedExtensionManager(
'oslo.messaging.notify.drivers',
names=self._driver_names,
invoke_on_load=True,
invoke_args=[transport.conf],
invoke_args=[conf],
invoke_kwds={
'topics': self._topics,
'transport': self.transport,
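Putting the new API together, a hedged end-to-end sketch; the publisher_id, event type, and payload are made up for illustration:

from oslo_config import cfg

import oslo_messaging

# A separate notification transport; it falls back to the RPC
# configuration when [oslo_messaging_notifications]/transport_url
# is unset.
transport = oslo_messaging.get_notification_transport(cfg.CONF)

notifier = oslo_messaging.Notifier(transport,
                                   publisher_id='compute.host1',
                                   driver='messaging',
                                   topic='notifications')

# The context is a plain dict; event type and payload are illustrative.
notifier.info({}, 'compute.instance.create.end',
              {'instance_id': 'abc-123'})

Note that the explicit driver and topic arguments override the config options; omitting them falls back to the [oslo_messaging_notifications] values.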

View File

@ -22,7 +22,6 @@ import itertools
from oslo_messaging._drivers import amqp
from oslo_messaging._drivers import base as drivers_base
from oslo_messaging._drivers import impl_qpid
from oslo_messaging._drivers import impl_rabbit
from oslo_messaging._drivers import impl_zmq
from oslo_messaging._drivers.protocols.amqp import opts as amqp_opts
@ -48,8 +47,6 @@ _opts = [
('oslo_messaging_amqp', amqp_opts.amqp1_opts),
('oslo_messaging_rabbit', list(itertools.chain(amqp.amqp_opts,
impl_rabbit.rabbit_opts))),
('oslo_messaging_qpid', list(itertools.chain(amqp.amqp_opts,
impl_qpid.qpid_opts)))
]

View File

@ -229,7 +229,7 @@ class RPCClient(object):
class TestClient(object):
def __init__(self, transport):
target = messaging.Target(topic='testtopic', version='2.0')
target = messaging.Target(topic='test', version='2.0')
self._client = messaging.RPCClient(transport, target)
def test(self, ctxt, arg):
@ -254,7 +254,7 @@ class RPCClient(object):
For example::
transport = messaging.get_transport(cfg.CONF)
target = messaging.Target(topic='testtopic', version='2.0')
target = messaging.Target(topic='test', version='2.0')
client = messaging.RPCClient(transport, target)
client.call(ctxt, 'test', arg=arg)
@ -356,6 +356,10 @@ class RPCClient(object):
Similarly, the request context must be a dict unless the client's
serializer supports serializing another type.
Note: cast doesn't guarantee that the remote method will be executed
on every destination, but it does ensure that the method is never
executed more than once per destination.
:param ctxt: a request context dict
:type ctxt: dict
:param method: the method name
@ -392,6 +396,12 @@ class RPCClient(object):
allowed_remote_exmods list, then a messaging.RemoteError exception is
raised with all details of the remote exception.
Note: call is done 'at-most-once'. If we cannot tell whether the call
completed successfully, because the response did not arrive in time,
a MessagingTimeout exception is raised. The real reason can vary:
a transport failure, or a worker that did not answer in time or
crashed.
:param ctxt: a request context dict
:type ctxt: dict
:param method: the method name
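A short sketch contrasting the two delivery semantics described above; the method names and arguments are hypothetical:

from oslo_config import cfg

import oslo_messaging as messaging

transport = messaging.get_transport(cfg.CONF)
target = messaging.Target(topic='test', version='2.0')
client = messaging.RPCClient(transport, target)

# cast: fire-and-forget. No result and no delivery guarantee, but the
# method is never executed twice on the same destination.
client.cast({}, 'update_cache', key='flavors')

# call: blocks for the reply; 'at-most-once' semantics.
try:
    result = client.call({}, 'add', x=1, y=2)
except messaging.MessagingTimeout:
    # No response in time: we cannot tell whether the server actually
    # ran the method (transport failure, slow worker, or crash).
    pass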

View File

@ -31,6 +31,7 @@ import six
from oslo_messaging._i18n import _LE
from oslo_messaging import _utils as utils
from oslo_messaging import dispatcher
from oslo_messaging import localcontext
from oslo_messaging import serializer as msg_serializer
from oslo_messaging import server as msg_server
@ -75,7 +76,7 @@ class UnsupportedVersion(RPCDispatcherError):
self.method = method
class RPCDispatcher(object):
class RPCDispatcher(dispatcher.DispatcherBase):
"""A message dispatcher which understands RPC messages.
A MessageHandlingServer is constructed by passing a callable dispatcher
@ -130,9 +131,9 @@ class RPCDispatcher(object):
return self.serializer.serialize_entity(ctxt, result)
def __call__(self, incoming, executor_callback=None):
incoming.acknowledge()
return utils.DispatcherExecutorContext(
incoming, self._dispatch_and_reply,
incoming[0].acknowledge()
return dispatcher.DispatcherExecutorContext(
incoming[0], self._dispatch_and_reply,
executor_callback=executor_callback)
def _dispatch_and_reply(self, incoming, executor_callback):
@ -145,7 +146,9 @@ class RPCDispatcher(object):
e.exc_info[1])
incoming.reply(failure=e.exc_info, log_failure=False)
except Exception as e:
# sys.exc_info() is deleted by LOG.exception().
# current sys.exc_info() content can be overridden
# by another exception raised by a log handler during
# LOG.exception(). So keep a copy and delete it later.
exc_info = sys.exc_info()
LOG.error(_LE('Exception during message handling: %s'), e,
exc_info=exc_info)

View File

@ -44,6 +44,7 @@ A simple example of an RPC server with multiple endpoints might be::
from oslo_config import cfg
import oslo_messaging
import time
class ServerControlEndpoint(object):
@ -54,7 +55,7 @@ A simple example of an RPC server with multiple endpoints might be::
self.server = server
def stop(self, ctx):
if server:
if self.server:
self.server.stop()
class TestEndpoint(object):
@ -70,7 +71,14 @@ A simple example of an RPC server with multiple endpoints might be::
]
server = oslo_messaging.get_rpc_server(transport, target, endpoints,
executor='blocking')
server.start()
try:
server.start()
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Stopping server")
server.stop()
server.wait()
Clients can invoke methods on the server by sending the request to a topic and

View File

@ -19,6 +19,7 @@ __all__ = ['Serializer', 'NoOpSerializer', 'JsonPayloadSerializer',
import abc
from debtcollector import removals
from oslo_context import context as common_context
from oslo_serialization import jsonutils
import six
@ -63,6 +64,7 @@ class Serializer(object):
"""
@removals.remove(version="2.9", removal_version="3.0")
class RequestContextSerializer(Serializer):
def __init__(self, base):

View File

@ -23,20 +23,25 @@ __all__ = [
'ServerListenError',
]
import functools
import inspect
import logging
import threading
import traceback
from oslo_service import service
from oslo_utils import timeutils
from stevedore import driver
from oslo_messaging._drivers import base as driver_base
from oslo_messaging._i18n import _LW
from oslo_messaging import _utils
from oslo_messaging import exceptions
LOG = logging.getLogger(__name__)
# The default number of seconds of waiting after which we will emit a log
# message
DEFAULT_LOG_AFTER = 30
class MessagingServerError(exceptions.MessagingException):
"""Base class for all MessageHandlingServer exceptions."""
@ -62,7 +67,223 @@ class ServerListenError(MessagingServerError):
self.ex = ex
class MessageHandlingServer(service.ServiceBase):
class TaskTimeout(MessagingServerError):
"""Raised if we timed out waiting for a task to complete."""
class _OrderedTask(object):
"""A task which must be executed in a particular order.
A caller may wait for this task to complete by calling
`wait_for_completion`.
A caller may run this task with `run_once`, which will ensure that however
many times the task is called it only runs once. Simultaneous callers will
block until the running task completes, which means that any caller can be
sure that the task has completed after run_once returns.
"""
INIT = 0 # The task has not yet started
RUNNING = 1 # The task is running somewhere
COMPLETE = 2 # The task has run somewhere
def __init__(self, name):
"""Create a new _OrderedTask.
:param name: The name of this task. Used in log messages.
"""
super(_OrderedTask, self).__init__()
self._name = name
self._cond = threading.Condition()
self._state = self.INIT
def _wait(self, condition, msg, log_after, timeout_timer):
"""Wait while condition() is true. Write a log message if condition()
has not become false within `log_after` seconds. Raise TaskTimeout if
timeout_timer expires while waiting.
"""
log_timer = None
if log_after != 0:
log_timer = timeutils.StopWatch(duration=log_after)
log_timer.start()
while condition():
if log_timer is not None and log_timer.expired():
LOG.warn('Possible hang: %s' % msg)
LOG.debug(''.join(traceback.format_stack()))
# Only log once. After that we wait indefinitely without
# logging.
log_timer = None
if timeout_timer is not None and timeout_timer.expired():
raise TaskTimeout(msg)
timeouts = []
if log_timer is not None:
timeouts.append(log_timer.leftover())
if timeout_timer is not None:
timeouts.append(timeout_timer.leftover())
wait = None
if timeouts:
wait = min(timeouts)
self._cond.wait(wait)
@property
def complete(self):
return self._state == self.COMPLETE
def wait_for_completion(self, caller, log_after, timeout_timer):
"""Wait until this task has completed.
:param caller: The name of the task which is waiting.
:param log_after: Emit a log message if waiting longer than `log_after`
seconds.
:param timeout_timer: Raise TaskTimeout if StopWatch object
`timeout_timer` expires while waiting.
"""
with self._cond:
msg = '%s is waiting for %s to complete' % (caller, self._name)
self._wait(lambda: not self.complete,
msg, log_after, timeout_timer)
def run_once(self, fn, log_after, timeout_timer):
"""Run a task exactly once. If it is currently running in another
thread, wait for it to complete. If it has already run, return
immediately without running it again.
:param fn: The task to run. It must be a callable taking no arguments.
It may optionally return another callable, which also takes
no arguments, which will be executed after completion has
been signaled to other threads.
:param log_after: Emit a log message if waiting longer than `log_after`
seconds.
:param timeout_timer: Raise TaskTimeout if StopWatch object
`timeout_timer` expires while waiting.
"""
with self._cond:
if self._state == self.INIT:
self._state = self.RUNNING
# Note that nothing waits on RUNNING, so no need to notify
# We need to release the condition lock before calling out to
# prevent deadlocks. Reacquire it immediately afterwards.
self._cond.release()
try:
post_fn = fn()
finally:
self._cond.acquire()
self._state = self.COMPLETE
self._cond.notify_all()
if post_fn is not None:
# Release the condition lock before calling out to prevent
# deadlocks. Reacquire it immediately afterwards.
self._cond.release()
try:
post_fn()
finally:
self._cond.acquire()
elif self._state == self.RUNNING:
msg = ('%s is waiting for another thread to complete'
% self._name)
self._wait(lambda: self._state == self.RUNNING,
msg, log_after, timeout_timer)
class _OrderedTaskRunner(object):
"""Mixin for a class which executes ordered tasks."""
def __init__(self, *args, **kwargs):
super(_OrderedTaskRunner, self).__init__(*args, **kwargs)
# Get a list of methods on this object which have the _ordered
# attribute
self._tasks = [name
for (name, member) in inspect.getmembers(self)
if inspect.ismethod(member) and
getattr(member, '_ordered', False)]
self.reset_states()
self._reset_lock = threading.Lock()
def reset_states(self):
# (Re)create the task states for all of the ordered tasks
self._states = {task: _OrderedTask(task) for task in self._tasks}
@staticmethod
def decorate_ordered(fn, state, after, reset_after):
@functools.wraps(fn)
def wrapper(self, *args, **kwargs):
# If the reset_after state has already completed, reset state so
# we can run again.
# NOTE(mdbooth): This is ugly and requires external locking to be
# deterministic when using multiple threads. Consider a thread that
# does: server.stop(), server.wait(). If another thread causes a
# reset between stop() and wait(), this will not have the intended
# behaviour. It is safe without external locking, if the caller
# instantiates a new object.
with self._reset_lock:
if (reset_after is not None and
self._states[reset_after].complete):
self.reset_states()
# Store the states we started with in case the state wraps on us
# while we're sleeping. We must wait and run_once in the same
# epoch. If the epoch ended while we were sleeping, run_once will
# safely do nothing.
states = self._states
log_after = kwargs.pop('log_after', DEFAULT_LOG_AFTER)
timeout = kwargs.pop('timeout', None)
timeout_timer = None
if timeout is not None:
timeout_timer = timeutils.StopWatch(duration=timeout)
timeout_timer.start()
# Wait for the given preceding state to complete
if after is not None:
states[after].wait_for_completion(state,
log_after, timeout_timer)
# Run this state
states[state].run_once(lambda: fn(self, *args, **kwargs),
log_after, timeout_timer)
return wrapper
def ordered(after=None, reset_after=None):
"""A method which will be executed as an ordered task. The method will be
called exactly once, however many times it is called. If it is called
multiple times simultaneously it will only be called once, but all callers
will wait until execution is complete.
If `after` is given, this method will not run until `after` has completed.
If `reset_after` is given and the target method has completed, allow this
task to run again by resetting all task states.
:param after: Optionally, the name of another `ordered` method. Wait for
the completion of `after` before executing this method.
:param reset_after: Optionally, the name of another `ordered` method. Reset
all states when calling this method if `reset_after`
has completed.
"""
def _ordered(fn):
# Set an attribute on the method so we can find it later
setattr(fn, '_ordered', True)
state = fn.__name__
return _OrderedTaskRunner.decorate_ordered(fn, state, after,
reset_after)
return _ordered
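A hypothetical sketch of how these pieces fit together, assuming it lives in the same module as _OrderedTaskRunner and ordered above; the Lifecycle class and its print statements are invented for illustration:

class Lifecycle(_OrderedTaskRunner):
    @ordered(reset_after='stop')
    def start(self):
        print('started')

    @ordered(after='start')
    def stop(self):
        print('stopped')


svc = Lifecycle()
svc.start()          # runs the task; concurrent callers would block
svc.start()          # no-op: 'start' is already COMPLETE in this epoch
svc.stop(timeout=5)  # waits (up to 5s) for 'start', then runs once
svc.start()          # 'stop' completed, so states reset and start reruns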
class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
"""Server for handling messages.
Connect a transport to a dispatcher that knows how to process the
@ -94,29 +315,20 @@ class MessageHandlingServer(service.ServiceBase):
self.dispatcher = dispatcher
self.executor = executor
# NOTE(sileht): we use a lock to protect the state change of the
# server, we don't want to call stop until the transport driver
# is fully started. Except for the blocking executor, whose
# start() doesn't return.
if self.executor != "blocking":
self._state_cond = threading.Condition()
self._dummy_cond = False
else:
self._state_cond = _utils.DummyCondition()
self._dummy_cond = True
try:
mgr = driver.DriverManager('oslo.messaging.executors',
self.executor)
except RuntimeError as ex:
raise ExecutorLoadFailure(self.executor, ex)
else:
self._executor_cls = mgr.driver
self._executor = None
self._running = False
self._executor_cls = mgr.driver
self._executor_obj = None
self._started = False
super(MessageHandlingServer, self).__init__()
@ordered(reset_after='stop')
def start(self):
"""Start handling incoming messages.
@ -131,21 +343,30 @@ class MessageHandlingServer(service.ServiceBase):
choose to dispatch messages in a new thread, coroutine or simply the
current thread.
"""
if self._executor is not None:
return
with self._state_cond:
if self._executor is not None:
return
try:
listener = self.dispatcher._listen(self.transport)
except driver_base.TransportDriverError as ex:
raise ServerListenError(self.target, ex)
self._running = True
self._executor = self._executor_cls(self.conf, listener,
self.dispatcher)
self._executor.start()
self._state_cond.notify_all()
# Warn that restarting will be deprecated
if self._started:
LOG.warn('Restarting a MessageHandlingServer is inherently racy. '
'It is deprecated, and will become a noop in a future '
'release of oslo.messaging. If you need to restart '
'MessageHandlingServer you should instantiate a new '
'object.')
self._started = True
try:
listener = self.dispatcher._listen(self.transport)
except driver_base.TransportDriverError as ex:
raise ServerListenError(self.target, ex)
executor = self._executor_cls(self.conf, listener, self.dispatcher)
executor.start()
self._executor_obj = executor
if self.executor == 'blocking':
# N.B. This will be executed unlocked and unordered, so
# we can't rely on the value of self._executor_obj when this runs.
# We explicitly pass the local variable.
return lambda: executor.execute()
@ordered(after='start')
def stop(self):
"""Stop handling incoming messages.
@ -154,12 +375,9 @@ class MessageHandlingServer(service.ServiceBase):
some messages, and underlying driver resources associated to this
server are still in use. See 'wait' for more details.
"""
with self._state_cond:
if self._executor is not None:
self._running = False
self._executor.stop()
self._state_cond.notify_all()
self._executor_obj.stop()
@ordered(after='stop')
def wait(self):
"""Wait for message processing to complete.
@ -170,37 +388,12 @@ class MessageHandlingServer(service.ServiceBase):
Once it's finished, the underlying driver resources associated to this
server are released (like closing useless network connections).
"""
with self._state_cond:
if self._running:
LOG.warn(_LW("wait() should be called after stop() as it "
"waits for existing messages to finish "
"processing"))
w = timeutils.StopWatch()
w.start()
while self._running:
# NOTE(harlowja): 1.0 seconds was mostly chosen at
# random, but it seems like a reasonable value to
# use to avoid spamming the logs with too much
# information.
self._state_cond.wait(1.0)
if self._running and not self._dummy_cond:
LOG.warn(
_LW("wait() should have been called"
" after stop() as wait() waits for existing"
" messages to finish processing, it has"
" been %0.2f seconds and stop() still has"
" not been called"), w.elapsed())
executor = self._executor
self._executor = None
if executor is not None:
# We are the lucky calling thread to wait on the executor to
# actually finish.
try:
executor.wait()
finally:
# Close listener connection after processing all messages
executor.listener.cleanup()
executor = None
try:
self._executor_obj.wait()
finally:
# Close listener connection after processing all messages
self._executor_obj.listener.cleanup()
self._executor_obj = None
def reset(self):
"""Reset service.

View File

@ -0,0 +1,288 @@
# Copyright (C) 2015 Cisco Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import kafka
from kafka.common import KafkaError
import mock
import testscenarios
from testtools.testcase import unittest
import time
import oslo_messaging
from oslo_messaging._drivers import common as driver_common
from oslo_messaging._drivers import impl_kafka as kafka_driver
from oslo_messaging.tests import utils as test_utils
load_tests = testscenarios.load_tests_apply_scenarios
KAFKA_BROKER = 'localhost:9092'
KAFKA_BROKER_URL = 'kafka://localhost:9092'
def _is_kafka_service_running():
"""Checks whether the Kafka service is running or not"""
kafka_running = True
try:
broker = KAFKA_BROKER
kafka.KafkaClient(broker)
except KafkaError:
# Kafka service is not running.
kafka_running = False
return kafka_running
class TestKafkaDriverLoad(test_utils.BaseTestCase):
def setUp(self):
super(TestKafkaDriverLoad, self).setUp()
self.messaging_conf.transport_driver = 'kafka'
def test_driver_load(self):
transport = oslo_messaging.get_transport(self.conf)
self.assertIsInstance(transport._driver, kafka_driver.KafkaDriver)
class TestKafkaTransportURL(test_utils.BaseTestCase):
scenarios = [
('none', dict(url=None,
expected=[dict(host='localhost', port=9092)])),
('empty', dict(url='kafka:///',
expected=[dict(host='localhost', port=9092)])),
('host', dict(url='kafka://127.0.0.1',
expected=[dict(host='127.0.0.1', port=9092)])),
('port', dict(url='kafka://localhost:1234',
expected=[dict(host='localhost', port=1234)])),
]
def setUp(self):
super(TestKafkaTransportURL, self).setUp()
self.messaging_conf.transport_driver = 'kafka'
def test_transport_url(self):
transport = oslo_messaging.get_transport(self.conf, self.url)
self.addCleanup(transport.cleanup)
driver = transport._driver
conn = driver._get_connection(kafka_driver.PURPOSE_SEND)
self.assertEqual(self.expected[0]['host'], conn.host)
self.assertEqual(self.expected[0]['port'], conn.port)
class TestKafkaDriver(test_utils.BaseTestCase):
"""Unit Test cases to test the kafka driver
"""
def setUp(self):
super(TestKafkaDriver, self).setUp()
self.messaging_conf.transport_driver = 'kafka'
transport = oslo_messaging.get_transport(self.conf)
self.driver = transport._driver
def test_send(self):
target = oslo_messaging.Target(topic="topic_test")
self.assertRaises(NotImplementedError,
self.driver.send, target, {}, {})
def test_send_notification(self):
target = oslo_messaging.Target(topic="topic_test")
with mock.patch.object(
kafka_driver.Connection, 'notify_send') as fake_send:
self.driver.send_notification(target, {}, {}, None)
self.assertEqual(1, len(fake_send.mock_calls))
def test_listen(self):
target = oslo_messaging.Target(topic="topic_test")
self.assertRaises(NotImplementedError, self.driver.listen, target)
class TestKafkaConnection(test_utils.BaseTestCase):
def setUp(self):
super(TestKafkaConnection, self).setUp()
self.messaging_conf.transport_driver = 'kafka'
transport = oslo_messaging.get_transport(self.conf)
self.driver = transport._driver
@mock.patch.object(kafka_driver.Connection, '_ensure_connection')
@mock.patch.object(kafka_driver.Connection, '_send')
def test_notify(self, fake_send, fake_ensure_connection):
conn = self.driver._get_connection(kafka_driver.PURPOSE_SEND)
conn.notify_send("fake_topic", {"fake_ctxt": "fake_param"},
{"fake_text": "fake_message_1"}, 10)
self.assertEqual(1, len(fake_send.mock_calls))
@mock.patch.object(kafka_driver.Connection, '_ensure_connection')
@mock.patch.object(kafka_driver.Connection, '_send')
def test_notify_with_retry(self, fake_send, fake_ensure_connection):
conn = self.driver._get_connection(kafka_driver.PURPOSE_SEND)
fake_send.side_effect = KafkaError("fake_exception")
conn.notify_send("fake_topic", {"fake_ctxt": "fake_param"},
{"fake_text": "fake_message_2"}, 10)
self.assertEqual(10, len(fake_send.mock_calls))
@mock.patch.object(kafka_driver.Connection, '_ensure_connection')
@mock.patch.object(kafka_driver.Connection, '_parse_url')
def test_consume(self, fake_parse_url, fake_ensure_connection):
fake_message = {
"context": {"fake": "fake_context_1"},
"message": {"fake": "fake_message_1"}}
conn = kafka_driver.Connection(
self.conf, '', kafka_driver.PURPOSE_LISTEN)
conn.consumer = mock.MagicMock()
conn.consumer.fetch_messages = mock.MagicMock(
return_value=iter([json.dumps(fake_message)]))
self.assertEqual(fake_message, json.loads(conn.consume()[0]))
self.assertEqual(1, len(conn.consumer.fetch_messages.mock_calls))
@mock.patch.object(kafka_driver.Connection, '_ensure_connection')
@mock.patch.object(kafka_driver.Connection, '_parse_url')
def test_consume_timeout(self, fake_parse_url, fake_ensure_connection):
deadline = time.time() + 3
conn = kafka_driver.Connection(
self.conf, '', kafka_driver.PURPOSE_LISTEN)
conn.consumer = mock.MagicMock()
conn.consumer.fetch_messages = mock.MagicMock(return_value=iter([]))
self.assertRaises(driver_common.Timeout, conn.consume, timeout=3)
self.assertEqual(0, int(deadline - time.time()))
@mock.patch.object(kafka_driver.Connection, '_ensure_connection')
@mock.patch.object(kafka_driver.Connection, '_parse_url')
def test_consume_with_default_timeout(
self, fake_parse_url, fake_ensure_connection):
deadline = time.time() + 1
conn = kafka_driver.Connection(
self.conf, '', kafka_driver.PURPOSE_LISTEN)
conn.consumer = mock.MagicMock()
conn.consumer.fetch_messages = mock.MagicMock(return_value=iter([]))
self.assertRaises(driver_common.Timeout, conn.consume)
self.assertEqual(0, int(deadline - time.time()))
@mock.patch.object(kafka_driver.Connection, '_ensure_connection')
@mock.patch.object(kafka_driver.Connection, '_parse_url')
def test_consume_timeout_without_consumers(
self, fake_parse_url, fake_ensure_connection):
deadline = time.time() + 3
conn = kafka_driver.Connection(
self.conf, '', kafka_driver.PURPOSE_LISTEN)
conn.consumer = mock.MagicMock(return_value=None)
self.assertRaises(driver_common.Timeout, conn.consume, timeout=3)
self.assertEqual(0, int(deadline - time.time()))
class TestKafkaListener(test_utils.BaseTestCase):
def setUp(self):
super(TestKafkaListener, self).setUp()
self.messaging_conf.transport_driver = 'kafka'
transport = oslo_messaging.get_transport(self.conf)
self.driver = transport._driver
@mock.patch.object(kafka_driver.Connection, '_ensure_connection')
@mock.patch.object(kafka_driver.Connection, 'declare_topic_consumer')
def test_create_listener(self, fake_consumer, fake_ensure_connection):
fake_target = oslo_messaging.Target(topic='fake_topic')
fake_targets_and_priorities = [(fake_target, 'info')]
listener = self.driver.listen_for_notifications(
fake_targets_and_priorities)
self.assertEqual(1, len(fake_consumer.mock_calls))
@mock.patch.object(kafka_driver.Connection, '_ensure_connection')
@mock.patch.object(kafka_driver.Connection, 'declare_topic_consumer')
def test_stop_listener(self, fake_consumer, fake_client):
fake_target = oslo_messaging.Target(topic='fake_topic')
fake_targets_and_priorities = [(fake_target, 'info')]
listener = self.driver.listen_for_notifications(
fake_targets_and_priorities)
listener.conn.consume = mock.MagicMock()
listener.conn.consume.return_value = (
iter([kafka.common.KafkaMessage(
topic='fake_topic', partition=0, offset=0,
key=None, value='{"message": {"fake": "fake_message_1"},'
'"context": {"fake": "fake_context_1"}}')]))
listener.poll()
self.assertEqual(1, len(listener.conn.consume.mock_calls))
listener.conn.stop_consuming = mock.MagicMock()
listener.stop()
fake_response = listener.poll()
self.assertEqual(1, len(listener.conn.consume.mock_calls))
self.assertEqual([], fake_response)
class TestWithRealKafkaBroker(test_utils.BaseTestCase):
def setUp(self):
super(TestWithRealKafkaBroker, self).setUp()
self.messaging_conf.transport_driver = 'kafka'
transport = oslo_messaging.get_transport(self.conf, KAFKA_BROKER_URL)
self.driver = transport._driver
@unittest.skipUnless(
_is_kafka_service_running(), "Kafka service is not available")
def test_send_and_receive_message(self):
target = oslo_messaging.Target(
topic="fake_topic", exchange='fake_exchange')
targets_and_priorities = [(target, 'fake_info')]
listener = self.driver.listen_for_notifications(
targets_and_priorities)
fake_context = {"fake_context_key": "fake_context_value"}
fake_message = {"fake_message_key": "fake_message_value"}
self.driver.send_notification(
target, fake_context, fake_message, None)
received_message = listener.poll()[0]
self.assertEqual(fake_context, received_message.ctxt)
self.assertEqual(fake_message, received_message.message)
@unittest.skipUnless(
_is_kafka_service_running(), "Kafka service is not available")
def test_send_and_receive_message_without_exchange(self):
target = oslo_messaging.Target(topic="fake_no_exchange_topic")
targets_and_priorities = [(target, 'fake_info')]
listener = self.driver.listen_for_notifications(
targets_and_priorities)
fake_context = {"fake_context_key": "fake_context_value"}
fake_message = {"fake_message_key": "fake_message_value"}
self.driver.send_notification(
target, fake_context, fake_message, None)
received_message = listener.poll()[0]
self.assertEqual(fake_context, received_message.ctxt)
self.assertEqual(fake_message, received_message.message)
@unittest.skipUnless(
_is_kafka_service_running(), "Kafka service is not available")
def test_receive_message_from_empty_topic_with_timeout(self):
target = oslo_messaging.Target(
topic="fake_empty_topic", exchange='fake_empty_exchange')
targets_and_priorities = [(target, 'fake_info')]
listener = self.driver.listen_for_notifications(
targets_and_priorities)
deadline = time.time() + 3
received_message = listener.poll(timeout=3)
self.assertEqual(0, int(deadline - time.time()))
self.assertEqual(None, received_message)

View File

@ -1,850 +0,0 @@
# Copyright (C) 2014 eNovance SAS <licensing@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import operator
import random
import threading
import time
try:
import qpid
except ImportError:
qpid = None
from six.moves import _thread
import testscenarios
import testtools
import oslo_messaging
from oslo_messaging._drivers import amqp
from oslo_messaging._drivers import impl_qpid as qpid_driver
from oslo_messaging.tests import utils as test_utils
from six.moves import mock
load_tests = testscenarios.load_tests_apply_scenarios
QPID_BROKER = 'localhost:5672'
class TestQpidDriverLoad(test_utils.BaseTestCase):
def setUp(self):
super(TestQpidDriverLoad, self).setUp()
self.messaging_conf.transport_driver = 'qpid'
def test_driver_load(self):
transport = oslo_messaging.get_transport(self.conf)
self.assertIsInstance(transport._driver, qpid_driver.QpidDriver)
def _is_qpidd_service_running():
"""this function checks if the qpid service is running or not."""
qpid_running = True
try:
broker = QPID_BROKER
connection = qpid.messaging.Connection(broker)
connection.open()
except Exception:
# qpid service is not running.
qpid_running = False
else:
connection.close()
return qpid_running
class _QpidBaseTestCase(test_utils.BaseTestCase):
@testtools.skipIf(qpid is None, "qpid not available")
def setUp(self):
super(_QpidBaseTestCase, self).setUp()
self.messaging_conf.transport_driver = 'qpid'
self.fake_qpid = not _is_qpidd_service_running()
if self.fake_qpid:
self.session_receive = get_fake_qpid_session()
self.session_send = get_fake_qpid_session()
else:
self.broker = QPID_BROKER
# create connection from the qpid.messaging
# connection for the Consumer.
self.con_receive = qpid.messaging.Connection(self.broker)
self.con_receive.open()
# session to receive the messages
self.session_receive = self.con_receive.session()
# connection for sending the message
self.con_send = qpid.messaging.Connection(self.broker)
self.con_send.open()
# session to send the messages
self.session_send = self.con_send.session()
# list to store the expected messages and
# the actual received messages
self._expected = []
self._messages = []
self.initialized = True
def tearDown(self):
super(_QpidBaseTestCase, self).tearDown()
if self.initialized:
if self.fake_qpid:
_fake_session.flush_exchanges()
else:
self.con_receive.close()
self.con_send.close()
class TestQpidTransportURL(_QpidBaseTestCase):
scenarios = [
('none', dict(url=None,
expected=[dict(host='localhost:5672',
username='',
password='')])),
('empty',
dict(url='qpid:///',
expected=[dict(host='localhost:5672',
username='',
password='')])),
('localhost',
dict(url='qpid://localhost/',
expected=[dict(host='localhost',
username='',
password='')])),
('no_creds',
dict(url='qpid://host/',
expected=[dict(host='host',
username='',
password='')])),
('no_port',
dict(url='qpid://user:password@host/',
expected=[dict(host='host',
username='user',
password='password')])),
('full_url',
dict(url='qpid://user:password@host:10/',
expected=[dict(host='host:10',
username='user',
password='password')])),
('full_two_url',
dict(url='qpid://user:password@host:10,'
'user2:password2@host2:12/',
expected=[dict(host='host:10',
username='user',
password='password'),
dict(host='host2:12',
username='user2',
password='password2')
]
)),
]
@mock.patch.object(qpid_driver.Connection, 'reconnect')
def test_transport_url(self, *args):
transport = oslo_messaging.get_transport(self.conf, self.url)
self.addCleanup(transport.cleanup)
driver = transport._driver
brokers_params = driver._get_connection().brokers_params
self.assertEqual(sorted(self.expected,
key=operator.itemgetter('host')),
sorted(brokers_params,
key=operator.itemgetter('host')))
class TestQpidInvalidTopologyVersion(_QpidBaseTestCase):
"""Unit test cases to test invalid qpid topology version."""
scenarios = [
('direct', dict(consumer_cls=qpid_driver.DirectConsumer,
consumer_kwargs={},
publisher_cls=qpid_driver.DirectPublisher,
publisher_kwargs={})),
('topic', dict(consumer_cls=qpid_driver.TopicConsumer,
consumer_kwargs={'exchange_name': 'openstack'},
publisher_cls=qpid_driver.TopicPublisher,
publisher_kwargs={'exchange_name': 'openstack'})),
('fanout', dict(consumer_cls=qpid_driver.FanoutConsumer,
consumer_kwargs={},
publisher_cls=qpid_driver.FanoutPublisher,
publisher_kwargs={})),
]
def setUp(self):
super(TestQpidInvalidTopologyVersion, self).setUp()
self.config(qpid_topology_version=-1,
group='oslo_messaging_qpid')
def test_invalid_topology_version(self):
def consumer_callback(msg):
pass
msgid_or_topic = 'test'
# not using self.assertRaises because
# 1. qpid driver raises Exception(msg) for invalid topology version
# 2. flake8 - H202 assertRaises Exception too broad
exception_msg = ("Invalid value for qpid_topology_version: %d" %
self.conf.oslo_messaging_qpid.qpid_topology_version)
recvd_exc_msg = ''
try:
self.consumer_cls(self.conf.oslo_messaging_qpid,
self.session_receive,
msgid_or_topic,
consumer_callback,
**self.consumer_kwargs)
except Exception as e:
recvd_exc_msg = e.message
self.assertEqual(exception_msg, recvd_exc_msg)
recvd_exc_msg = ''
try:
self.publisher_cls(self.conf.oslo_messaging_qpid,
self.session_send,
topic=msgid_or_topic,
**self.publisher_kwargs)
except Exception as e:
recvd_exc_msg = e.message
self.assertEqual(exception_msg, recvd_exc_msg)
class TestQpidDirectConsumerPublisher(_QpidBaseTestCase):
"""Unit test cases to test DirectConsumer and Direct Publisher."""
_n_qpid_topology = [
('v1', dict(qpid_topology=1)),
('v2', dict(qpid_topology=2)),
]
_n_msgs = [
('single', dict(no_msgs=1)),
('multiple', dict(no_msgs=10)),
]
@classmethod
def generate_scenarios(cls):
cls.scenarios = testscenarios.multiply_scenarios(cls._n_qpid_topology,
cls._n_msgs)
def consumer_callback(self, msg):
# This function will be called by the DirectConsumer
# when any message is received.
# Append the received message into the messages list
# so that the received messages can be validated
# with the expected messages
if isinstance(msg, dict):
self._messages.append(msg['content'])
else:
self._messages.append(msg)
def test_qpid_direct_consumer_producer(self):
self.msgid = str(random.randint(1, 100))
# create a DirectConsumer and DirectPublisher class objects
self.dir_cons = qpid_driver.DirectConsumer(
self.conf.oslo_messaging_qpid,
self.session_receive,
self.msgid,
self.consumer_callback)
self.dir_pub = qpid_driver.DirectPublisher(
self.conf.oslo_messaging_qpid,
self.session_send,
self.msgid)
def try_send_msg(no_msgs):
for i in range(no_msgs):
self._expected.append(str(i))
snd_msg = {'content_type': 'text/plain', 'content': str(i)}
self.dir_pub.send(snd_msg)
def try_receive_msg(no_msgs):
for i in range(no_msgs):
self.dir_cons.consume()
thread1 = threading.Thread(target=try_receive_msg,
args=(self.no_msgs,))
thread2 = threading.Thread(target=try_send_msg,
args=(self.no_msgs,))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
self.assertEqual(self.no_msgs, len(self._messages))
self.assertEqual(self._expected, self._messages)
TestQpidDirectConsumerPublisher.generate_scenarios()
class TestQpidTopicAndFanout(_QpidBaseTestCase):
"""Unit Test cases to test TopicConsumer and
TopicPublisher classes of the qpid driver
and FanoutConsumer and FanoutPublisher classes
of the qpid driver
"""
_n_qpid_topology = [
('v1', dict(qpid_topology=1)),
('v2', dict(qpid_topology=2)),
]
_n_msgs = [
('single', dict(no_msgs=1)),
('multiple', dict(no_msgs=10)),
]
_n_senders = [
('single', dict(no_senders=1)),
('multiple', dict(no_senders=10)),
]
_n_receivers = [
('single', dict(no_receivers=1)),
]
_exchange_class = [
('topic', dict(consumer_cls=qpid_driver.TopicConsumer,
consumer_kwargs={'exchange_name': 'openstack'},
publisher_cls=qpid_driver.TopicPublisher,
publisher_kwargs={'exchange_name': 'openstack'},
topic='topictest.test',
receive_topic='topictest.test')),
('fanout', dict(consumer_cls=qpid_driver.FanoutConsumer,
consumer_kwargs={},
publisher_cls=qpid_driver.FanoutPublisher,
publisher_kwargs={},
topic='fanouttest',
receive_topic='fanouttest')),
]
@classmethod
def generate_scenarios(cls):
cls.scenarios = testscenarios.multiply_scenarios(cls._n_qpid_topology,
cls._n_msgs,
cls._n_senders,
cls._n_receivers,
cls._exchange_class)
def setUp(self):
super(TestQpidTopicAndFanout, self).setUp()
# to store the expected messages and the
# actual received messages
#
# NOTE(dhellmann): These are dicts, where the base class uses
# lists.
self._expected = {}
self._messages = {}
self._senders = []
self._receivers = []
self._sender_threads = []
self._receiver_threads = []
def consumer_callback(self, msg):
"""callback function called by the ConsumerBase class of
qpid driver.
Message will be received in the format x-y
where x is the sender id and y is the msg number of the sender
extract the sender id 'x' and store the msg 'x-y' with 'x' as
the key
"""
if isinstance(msg, dict):
msgcontent = msg['content']
else:
msgcontent = msg
splitmsg = msgcontent.split('-')
key = _thread.get_ident()
if key not in self._messages:
self._messages[key] = dict()
tdict = self._messages[key]
if splitmsg[0] not in tdict:
tdict[splitmsg[0]] = []
tdict[splitmsg[0]].append(msgcontent)
def _try_send_msg(self, sender_id, no_msgs):
for i in range(no_msgs):
sendmsg = '%s-%s' % (str(sender_id), str(i))
key = str(sender_id)
# Store the message in the self._expected for each sender.
# This will be used later to
# validate the test by comparing it with the
# received messages by all the receivers
if key not in self._expected:
self._expected[key] = []
self._expected[key].append(sendmsg)
send_dict = {'content_type': 'text/plain', 'content': sendmsg}
self._senders[sender_id].send(send_dict)
def _try_receive_msg(self, receiver_id, no_msgs):
for i in range(self.no_senders * no_msgs):
no_of_attempts = 0
# ConsumerBase.consume blocks indefinitely until a message
# is received.
# So qpid_receiver.available() is called before calling
# ConsumerBase.consume() so that we are not
# blocked indefinitely
qpid_receiver = self._receivers[receiver_id].get_receiver()
while no_of_attempts < 50:
if qpid_receiver.available() > 0:
self._receivers[receiver_id].consume()
break
no_of_attempts += 1
time.sleep(0.05)
def test_qpid_topic_and_fanout(self):
for receiver_id in range(self.no_receivers):
consumer = self.consumer_cls(self.conf.oslo_messaging_qpid,
self.session_receive,
self.receive_topic,
self.consumer_callback,
**self.consumer_kwargs)
self._receivers.append(consumer)
# create receivers threads
thread = threading.Thread(target=self._try_receive_msg,
args=(receiver_id, self.no_msgs,))
self._receiver_threads.append(thread)
for sender_id in range(self.no_senders):
publisher = self.publisher_cls(self.conf.oslo_messaging_qpid,
self.session_send,
topic=self.topic,
**self.publisher_kwargs)
self._senders.append(publisher)
# create sender threads
thread = threading.Thread(target=self._try_send_msg,
args=(sender_id, self.no_msgs,))
self._sender_threads.append(thread)
for thread in self._receiver_threads:
thread.start()
for thread in self._sender_threads:
thread.start()
for thread in self._receiver_threads:
thread.join()
for thread in self._sender_threads:
thread.join()
# Each receiver should receive all the messages sent by
# the sender(s).
# So, iterate through each of the receiver items in
# self._messages and compare them with the expected
# messages.
self.assertEqual(self.no_senders, len(self._expected))
self.assertEqual(self.no_receivers, len(self._messages))
for key, messages in self._messages.iteritems():
self.assertEqual(self._expected, messages)
TestQpidTopicAndFanout.generate_scenarios()
class AddressNodeMatcher(object):
def __init__(self, node):
self.node = node
def __eq__(self, address):
return address.split(';')[0].strip() == self.node
class TestDriverInterface(_QpidBaseTestCase):
"""Unit Test cases to test the amqpdriver with qpid
"""
def setUp(self):
super(TestDriverInterface, self).setUp()
self.config(qpid_topology_version=2,
group='oslo_messaging_qpid')
transport = oslo_messaging.get_transport(self.conf)
self.driver = transport._driver
original_get_connection = self.driver._get_connection
p = mock.patch.object(self.driver, '_get_connection',
side_effect=lambda pooled=True:
original_get_connection(False))
p.start()
self.addCleanup(p.stop)
def test_listen_and_direct_send(self):
target = oslo_messaging.Target(exchange="exchange_test",
topic="topic_test",
server="server_test")
with mock.patch('qpid.messaging.Connection') as conn_cls:
conn = conn_cls.return_value
session = conn.session.return_value
session.receiver.side_effect = [mock.Mock(), mock.Mock(),
mock.Mock()]
listener = self.driver.listen(target)
listener.conn.direct_send("msg_id", {})
self.assertEqual(3, len(listener.conn.consumers))
expected_calls = [
mock.call(AddressNodeMatcher(
'amq.topic/topic/exchange_test/topic_test')),
mock.call(AddressNodeMatcher(
'amq.topic/topic/exchange_test/topic_test.server_test')),
mock.call(AddressNodeMatcher('amq.topic/fanout/topic_test')),
]
session.receiver.assert_has_calls(expected_calls)
session.sender.assert_called_with(
AddressNodeMatcher("amq.direct/msg_id"))
def test_send(self):
target = oslo_messaging.Target(exchange="exchange_test",
topic="topic_test",
server="server_test")
with mock.patch('qpid.messaging.Connection') as conn_cls:
conn = conn_cls.return_value
session = conn.session.return_value
self.driver.send(target, {}, {})
session.sender.assert_called_with(AddressNodeMatcher(
"amq.topic/topic/exchange_test/topic_test.server_test"))
def test_send_notification(self):
target = oslo_messaging.Target(exchange="exchange_test",
topic="topic_test.info")
with mock.patch('qpid.messaging.Connection') as conn_cls:
conn = conn_cls.return_value
session = conn.session.return_value
self.driver.send_notification(target, {}, {}, "2.0")
session.sender.assert_called_with(AddressNodeMatcher(
"amq.topic/topic/exchange_test/topic_test.info"))
class TestQpidReconnectOrder(test_utils.BaseTestCase):
"""Unit Test cases to test reconnection
"""
@testtools.skipIf(qpid is None, "qpid not available")
def test_reconnect_order(self):
brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
brokers_count = len(brokers)
self.config(qpid_hosts=brokers,
group='oslo_messaging_qpid')
with mock.patch('qpid.messaging.Connection') as conn_mock:
# starting from the first broker in the list
url = oslo_messaging.TransportURL.parse(self.conf, None)
connection = qpid_driver.Connection(self.conf, url,
amqp.PURPOSE_SEND)
# reconnect will advance to the next broker, one broker per
# attempt, and then wrap to the start of the list once the end is
# reached
for _ in range(brokers_count):
connection.reconnect()
expected = []
for broker in brokers:
expected.extend([mock.call("%s:5672" % broker),
mock.call().open(),
mock.call().session(),
mock.call().opened(),
mock.call().opened().__nonzero__(),
mock.call().close()])
conn_mock.assert_has_calls(expected, any_order=True)
def synchronized(func):
func.__lock__ = threading.Lock()
def synced_func(*args, **kws):
with func.__lock__:
return func(*args, **kws)
return synced_func
class FakeQpidMsgManager(object):
def __init__(self):
self._exchanges = {}
@synchronized
def add_exchange(self, exchange):
if exchange not in self._exchanges:
self._exchanges[exchange] = {'msgs': [], 'consumers': {}}
@synchronized
def add_exchange_consumer(self, exchange, consumer_id):
exchange_info = self._exchanges[exchange]
cons_dict = exchange_info['consumers']
cons_dict[consumer_id] = 0
@synchronized
def add_exchange_msg(self, exchange, msg):
exchange_info = self._exchanges[exchange]
exchange_info['msgs'].append(msg)
def get_exchange_msg(self, exchange, index):
exchange_info = self._exchanges[exchange]
return exchange_info['msgs'][index]
def get_no_exch_msgs(self, exchange):
exchange_info = self._exchanges[exchange]
return len(exchange_info['msgs'])
def get_exch_cons_index(self, exchange, consumer_id):
exchange_info = self._exchanges[exchange]
cons_dict = exchange_info['consumers']
return cons_dict[consumer_id]
@synchronized
def inc_consumer_index(self, exchange, consumer_id):
exchange_info = self._exchanges[exchange]
cons_dict = exchange_info['consumers']
cons_dict[consumer_id] += 1
_fake_qpid_msg_manager = FakeQpidMsgManager()
class FakeQpidSessionSender(object):
def __init__(self, session, id, target, options):
self.session = session
self.id = id
self.target = target
self.options = options
@synchronized
def send(self, object, sync=True, timeout=None):
_fake_qpid_msg_manager.add_exchange_msg(self.target, object)
def close(self, timeout=None):
pass
class FakeQpidSessionReceiver(object):
def __init__(self, session, id, source, options):
self.session = session
self.id = id
self.source = source
self.options = options
@synchronized
def fetch(self, timeout=None):
if timeout is None:
# if timeout is not given, use a default timeout
# of 30 seconds to avoid an indefinite loop
_timeout = 30
else:
_timeout = timeout
deadline = time.time() + _timeout
while time.time() <= deadline:
index = _fake_qpid_msg_manager.get_exch_cons_index(self.source,
self.id)
try:
msg = _fake_qpid_msg_manager.get_exchange_msg(self.source,
index)
except IndexError:
pass
else:
_fake_qpid_msg_manager.inc_consumer_index(self.source,
self.id)
return qpid.messaging.Message(msg)
time.sleep(0.050)
if timeout is None:
raise Exception('timed out waiting for reply')
def close(self, timeout=None):
pass
@synchronized
def available(self):
no_msgs = _fake_qpid_msg_manager.get_no_exch_msgs(self.source)
index = _fake_qpid_msg_manager.get_exch_cons_index(self.source,
self.id)
if no_msgs == 0 or index >= no_msgs:
return 0
else:
return no_msgs - index
class FakeQpidSession(object):
def __init__(self, connection=None, name=None, transactional=None):
self.connection = connection
self.name = name
self.transactional = transactional
self._receivers = {}
self.conf = None
self.url = None
self._senders = {}
self._sender_id = 0
self._receiver_id = 0
@synchronized
def sender(self, target, **options):
exchange_key = self._extract_exchange_key(target)
_fake_qpid_msg_manager.add_exchange(exchange_key)
sendobj = FakeQpidSessionSender(self, self._sender_id,
exchange_key, options)
self._senders[self._sender_id] = sendobj
self._sender_id = self._sender_id + 1
return sendobj
@synchronized
def receiver(self, source, **options):
exchange_key = self._extract_exchange_key(source)
_fake_qpid_msg_manager.add_exchange(exchange_key)
recvobj = FakeQpidSessionReceiver(self, self._receiver_id,
exchange_key, options)
self._receivers[self._receiver_id] = recvobj
_fake_qpid_msg_manager.add_exchange_consumer(exchange_key,
self._receiver_id)
self._receiver_id += 1
return recvobj
def acknowledge(self, message=None, disposition=None, sync=True):
pass
@synchronized
def flush_exchanges(self):
_fake_qpid_msg_manager._exchanges = {}
def _extract_exchange_key(self, exchange_msg):
"""This function extracts a unique key for the exchange.
This key is used in the dictionary as a 'key' for
this exchange.
Eg. if the exchange_msg (for qpid topology version 1)
is 33/33 ; {"node": {"x-declare": {"auto-delete": true, ....
then 33 is returned as the key.
Eg 2. For topology v2, if the
exchange_msg is - amq.direct/44 ; {"link": {"x-dec.......
then 44 is returned
"""
# first check for ';'
semicolon_split = exchange_msg.split(';')
# split the first item of semicolon_split with '/'
slash_split = semicolon_split[0].split('/')
# return the last element of the list as the key
key = slash_split[-1]
return key.strip()
def close(self):
pass
_fake_session = FakeQpidSession()
def get_fake_qpid_session():
return _fake_session
class QPidHATestCase(test_utils.BaseTestCase):
@testtools.skipIf(qpid is None, "qpid not available")
def setUp(self):
super(QPidHATestCase, self).setUp()
self.brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
self.config(qpid_hosts=self.brokers,
qpid_username=None,
qpid_password=None,
group='oslo_messaging_qpid')
hostname_sets = set()
self.info = {'attempt': 0,
'fail': False}
def _connect(myself, broker):
# do just enough work for the connection attempt to succeed
myself.connection = mock.Mock()
hostname = broker['host']
self.assertNotIn(hostname, hostname_sets)
hostname_sets.add(hostname)
self.info['attempt'] += 1
if self.info['fail']:
raise qpid.messaging.exceptions.ConnectionError
# just make sure connection instantiation does not fail with an
# exception
self.stubs.Set(qpid_driver.Connection, '_connect', _connect)
# starting from the first broker in the list
url = oslo_messaging.TransportURL.parse(self.conf, None)
self.connection = qpid_driver.Connection(self.conf, url,
amqp.PURPOSE_SEND)
self.addCleanup(self.connection.close)
self.info.update({'attempt': 0,
'fail': True})
hostname_sets.clear()
def test_reconnect_order(self):
self.assertRaises(oslo_messaging.MessageDeliveryFailure,
self.connection.reconnect,
retry=len(self.brokers) - 1)
self.assertEqual(len(self.brokers), self.info['attempt'])
def test_ensure_four_retries(self):
mock_callback = mock.Mock(
side_effect=qpid.messaging.exceptions.ConnectionError)
self.assertRaises(oslo_messaging.MessageDeliveryFailure,
self.connection.ensure, None, mock_callback,
retry=4)
self.assertEqual(5, self.info['attempt'])
self.assertEqual(1, mock_callback.call_count)
def test_ensure_one_retry(self):
mock_callback = mock.Mock(
side_effect=qpid.messaging.exceptions.ConnectionError)
self.assertRaises(oslo_messaging.MessageDeliveryFailure,
self.connection.ensure, None, mock_callback,
retry=1)
self.assertEqual(2, self.info['attempt'])
self.assertEqual(1, mock_callback.call_count)
def test_ensure_no_retry(self):
mock_callback = mock.Mock(
side_effect=qpid.messaging.exceptions.ConnectionError)
self.assertRaises(oslo_messaging.MessageDeliveryFailure,
self.connection.ensure, None, mock_callback,
retry=0)
self.assertEqual(1, self.info['attempt'])
self.assertEqual(1, mock_callback.call_count)

View File

@ -28,7 +28,6 @@ from oslotest import mockpatch
import testscenarios
import oslo_messaging
from oslo_messaging._drivers import amqp
from oslo_messaging._drivers import amqpdriver
from oslo_messaging._drivers import common as driver_common
from oslo_messaging._drivers import impl_rabbit as rabbit_driver
@ -91,10 +90,12 @@ class TestHeartbeat(test_utils.BaseTestCase):
if not heartbeat_side_effect:
self.assertEqual(1, fake_ensure_connection.call_count)
self.assertEqual(2, fake_logger.info.call_count)
self.assertEqual(2, fake_logger.debug.call_count)
self.assertEqual(0, fake_logger.info.call_count)
else:
self.assertEqual(2, fake_ensure_connection.call_count)
self.assertEqual(3, fake_logger.info.call_count)
self.assertEqual(2, fake_logger.debug.call_count)
self.assertEqual(1, fake_logger.info.call_count)
self.assertIn(mock.call(info, mock.ANY),
fake_logger.info.mock_calls)
@ -167,7 +168,7 @@ class TestRabbitDriverLoadSSL(test_utils.BaseTestCase):
'on_blocked': mock.ANY,
'on_unblocked': mock.ANY},
ssl=self.expected, login_method='AMQPLAIN',
heartbeat=60, failover_strategy="shuffle")
heartbeat=60, failover_strategy='round-robin')
class TestRabbitPublisher(test_utils.BaseTestCase):
@ -175,7 +176,7 @@ class TestRabbitPublisher(test_utils.BaseTestCase):
def test_send_with_timeout(self, fake_publish):
transport = oslo_messaging.get_transport(self.conf,
'kombu+memory:////')
with transport._driver._get_connection(amqp.PURPOSE_SEND) as pool_conn:
with transport._driver._get_connection(driver_common.PURPOSE_SEND) as pool_conn:
conn = pool_conn.connection
conn._publish(mock.Mock(), 'msg', routing_key='routing_key',
timeout=1)
@ -185,7 +186,7 @@ class TestRabbitPublisher(test_utils.BaseTestCase):
def test_send_no_timeout(self, fake_publish):
transport = oslo_messaging.get_transport(self.conf,
'kombu+memory:////')
with transport._driver._get_connection(amqp.PURPOSE_SEND) as pool_conn:
with transport._driver._get_connection(driver_common.PURPOSE_SEND) as pool_conn:
conn = pool_conn.connection
conn._publish(mock.Mock(), 'msg', routing_key='routing_key')
fake_publish.assert_called_with('msg', expiration=None)
@ -205,7 +206,7 @@ class TestRabbitPublisher(test_utils.BaseTestCase):
type='topic',
passive=False)
with transport._driver._get_connection(amqp.PURPOSE_SEND) as pool_conn:
with transport._driver._get_connection(driver_common.PURPOSE_SEND) as pool_conn:
conn = pool_conn.connection
exc = conn.connection.channel_errors[0]
@ -238,7 +239,7 @@ class TestRabbitConsume(test_utils.BaseTestCase):
'kombu+memory:////')
self.addCleanup(transport.cleanup)
deadline = time.time() + 6
with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn:
with transport._driver._get_connection(driver_common.PURPOSE_LISTEN) as conn:
self.assertRaises(driver_common.Timeout,
conn.consume, timeout=3)
@ -257,7 +258,7 @@ class TestRabbitConsume(test_utils.BaseTestCase):
transport = oslo_messaging.get_transport(self.conf,
'kombu+memory:////')
self.addCleanup(transport.cleanup)
with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn:
with transport._driver._get_connection(driver_common.PURPOSE_LISTEN) as conn:
channel = conn.connection.channel
with mock.patch('kombu.connection.Connection.connected',
new_callable=mock.PropertyMock,
@ -361,11 +362,6 @@ class TestSendReceive(test_utils.BaseTestCase):
('timeout', dict(timeout=0.01)), # FIXME(markmc): timeout=0 is broken?
]
_reply_ending = [
('old_behavior', dict(send_single_reply=False)),
('new_behavior', dict(send_single_reply=True)),
]
@classmethod
def generate_scenarios(cls):
cls.scenarios = testscenarios.multiply_scenarios(cls._n_senders,
@ -373,16 +369,13 @@ class TestSendReceive(test_utils.BaseTestCase):
cls._reply,
cls._reply_fail,
cls._failure,
cls._timeout,
cls._reply_ending)
cls._timeout)
def test_send_receive(self):
self.config(kombu_reconnect_timeout=0.5,
self.config(kombu_missing_consumer_retry_timeout=0.5,
group="oslo_messaging_rabbit")
self.config(heartbeat_timeout_threshold=0,
group="oslo_messaging_rabbit")
self.config(send_single_reply=self.send_single_reply,
group="oslo_messaging_rabbit")
transport = oslo_messaging.get_transport(self.conf,
'kombu+memory:////')
self.addCleanup(transport.cleanup)
@ -430,7 +423,7 @@ class TestSendReceive(test_utils.BaseTestCase):
for i in range(len(senders)):
senders[i].start()
received = listener.poll()
received = listener.poll()[0]
self.assertIsNotNone(received)
self.assertEqual(self.ctxt, received.ctxt)
self.assertEqual({'tx_id': i}, received.message)
@ -472,10 +465,10 @@ class TestSendReceive(test_utils.BaseTestCase):
if self.reply_failure_404:
# NOTE(sileht) all reply fail, first take
# kombu_reconnect_timeout seconds to fail
# kombu_missing_consumer_retry_timeout seconds to fail
# next immediately fail
dt = time.time() - start
timeout = self.conf.oslo_messaging_rabbit.kombu_reconnect_timeout
timeout = self.conf.oslo_messaging_rabbit.kombu_missing_consumer_retry_timeout
self.assertTrue(timeout <= dt < (timeout + 0.100), dt)
self.assertEqual(len(senders), len(replies))
@ -508,7 +501,7 @@ class TestPollAsync(test_utils.BaseTestCase):
target = oslo_messaging.Target(topic='testtopic')
listener = driver.listen(target)
received = listener.poll(timeout=0.050)
self.assertIsNone(received)
self.assertEqual([], received)
class TestRacyWaitForReply(test_utils.BaseTestCase):
@ -568,13 +561,13 @@ class TestRacyWaitForReply(test_utils.BaseTestCase):
senders[0].start()
notify_condition.wait()
msgs.append(listener.poll())
msgs.extend(listener.poll())
self.assertEqual({'tx_id': 0}, msgs[-1].message)
# Start the second guy, receive his message
senders[1].start()
msgs.append(listener.poll())
msgs.extend(listener.poll())
self.assertEqual({'tx_id': 1}, msgs[-1].message)
# Reply to both in order, making the second thread queue
@ -588,7 +581,7 @@ class TestRacyWaitForReply(test_utils.BaseTestCase):
# Start the 3rd guy, receive his message
senders[2].start()
msgs.append(listener.poll())
msgs.extend(listener.poll())
self.assertEqual({'tx_id': 2}, msgs[-1].message)
# Verify the _send_reply was not invoked by driver:
@@ -869,7 +862,7 @@ class TestReplyWireFormat(test_utils.BaseTestCase):
producer.publish(msg)
received = listener.poll()
received = listener.poll()[0]
self.assertIsNotNone(received)
self.assertEqual(self.expected_ctxt, received.ctxt)
self.assertEqual(self.expected, received.message)
@@ -894,13 +887,15 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
self.useFixture(mockpatch.Patch(
'kombu.connection.Connection.connect',
side_effect=self.kombu_connect))
self.useFixture(mockpatch.Patch(
'kombu.connection.Connection.connection'))
self.useFixture(mockpatch.Patch(
'kombu.connection.Connection.channel'))
# starting from the first broker in the list
url = oslo_messaging.TransportURL.parse(self.conf, None)
self.connection = rabbit_driver.Connection(self.conf, url,
amqp.PURPOSE_SEND)
driver_common.PURPOSE_SEND)
self.addCleanup(self.connection.close)
def test_ensure_four_retry(self):
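The edits to this file track two renames (the amqp.PURPOSE_* constants moving to driver_common, and kombu_reconnect_timeout becoming kombu_missing_consumer_retry_timeout) plus one behavioural change: a listener's poll() now returns a list of messages, with an empty list rather than None signalling a timeout, so single-message callers index the result with [0]. A minimal consumer sketch under the new contract, assuming nothing beyond a started listener object:

def drain(listener, timeout=0.05):
    """Acknowledge and collect every message currently pending."""
    messages = []
    while True:
        batch = listener.poll(timeout=timeout)  # a list; [] on timeout
        if not batch:  # the empty list replaces the old None sentinel
            return messages
        for message in batch:
            message.acknowledge()
            messages.append(message)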


@@ -62,47 +62,47 @@ class TestImplMatchmaker(test_utils.BaseTestCase):
self.host2 = b"test_host2"
def test_register(self):
self.test_matcher.register(self.target, self.host1)
self.test_matcher.register(self.target, self.host1, "test")
self.assertEqual(self.test_matcher.get_hosts(self.target),
self.assertEqual(self.test_matcher.get_hosts(self.target, "test"),
[self.host1])
self.assertEqual(self.test_matcher.get_single_host(self.target),
self.assertEqual(self.test_matcher.get_single_host(self.target, "test"),
self.host1)
def test_register_two_hosts(self):
self.test_matcher.register(self.target, self.host1)
self.test_matcher.register(self.target, self.host2)
self.test_matcher.register(self.target, self.host1, "test")
self.test_matcher.register(self.target, self.host2, "test")
self.assertItemsEqual(self.test_matcher.get_hosts(self.target),
self.assertItemsEqual(self.test_matcher.get_hosts(self.target, "test"),
[self.host1, self.host2])
self.assertIn(self.test_matcher.get_single_host(self.target),
self.assertIn(self.test_matcher.get_single_host(self.target, "test"),
[self.host1, self.host2])
def test_register_unsubscribe(self):
self.test_matcher.register(self.target, self.host1)
self.test_matcher.register(self.target, self.host2)
self.test_matcher.register(self.target, self.host1, "test")
self.test_matcher.register(self.target, self.host2, "test")
self.test_matcher.unregister(self.target, self.host2)
self.test_matcher.unregister(self.target, self.host2, "test")
self.assertItemsEqual(self.test_matcher.get_hosts(self.target),
self.assertItemsEqual(self.test_matcher.get_hosts(self.target, "test"),
[self.host1])
self.assertNotIn(self.test_matcher.get_single_host(self.target),
self.assertNotIn(self.test_matcher.get_single_host(self.target, "test"),
[self.host2])
def test_register_two_same_hosts(self):
self.test_matcher.register(self.target, self.host1)
self.test_matcher.register(self.target, self.host1)
self.test_matcher.register(self.target, self.host1, "test")
self.test_matcher.register(self.target, self.host1, "test")
self.assertEqual(self.test_matcher.get_hosts(self.target),
self.assertEqual(self.test_matcher.get_hosts(self.target, "test"),
[self.host1])
self.assertEqual(self.test_matcher.get_single_host(self.target),
self.assertEqual(self.test_matcher.get_single_host(self.target, "test"),
self.host1)
def test_get_hosts_wrong_topic(self):
target = oslo_messaging.Target(topic="no_such_topic")
self.assertEqual(self.test_matcher.get_hosts(target), [])
self.assertEqual(self.test_matcher.get_hosts(target, "test"), [])
def test_get_single_host_wrong_topic(self):
target = oslo_messaging.Target(topic="no_such_topic")
self.assertRaises(oslo_messaging.InvalidTarget,
self.test_matcher.get_single_host, target)
self.test_matcher.get_single_host, target, "test")
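Every matchmaker call in this file gains a listener-type key ("test") alongside the target. A toy, dict-backed illustration of the keyed registry these assertions now exercise (illustrative only, not the driver's implementation):

import collections

class ToyMatchmaker(object):
    """Stores hosts under a (topic, listener_type) key, mirroring the
    two-argument register()/get_hosts() calls above."""

    def __init__(self):
        self._hosts = collections.defaultdict(list)

    def register(self, target, host, listener_type):
        key = (target.topic, listener_type)
        if host not in self._hosts[key]:
            self._hosts[key].append(host)

    def unregister(self, target, host, listener_type):
        self._hosts[(target.topic, listener_type)].remove(host)

    def get_hosts(self, target, listener_type):
        return list(self._hosts[(target.topic, listener_type)])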


@@ -21,6 +21,7 @@ import testtools
import oslo_messaging
from oslo_messaging._drivers import impl_zmq
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_socket
from oslo_messaging._i18n import _
from oslo_messaging.tests import utils as test_utils
@@ -51,7 +52,8 @@ class TestServerListener(object):
def _run(self):
try:
message = self.listener.poll()
if message is not None:
if message:
message = message[0]
message.acknowledge()
self._received.set()
self.message = message
@@ -90,6 +92,33 @@ class ZmqBaseTestCase(test_utils.BaseTestCase):
self.addCleanup(stopRpc(self.__dict__))
class ZmqTestPortsRange(ZmqBaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(ZmqTestPortsRange, self).setUp()
# Set config values
kwargs = {'rpc_zmq_min_port': 5555,
'rpc_zmq_max_port': 5560}
self.config(**kwargs)
def test_ports_range(self):
listeners = []
for i in range(10):
try:
target = oslo_messaging.Target(topic='testtopic_'+str(i))
new_listener = self.driver.listen(target)
listeners.append(new_listener)
except zmq_socket.ZmqPortRangeExceededException:
pass
self.assertLessEqual(len(listeners), 5)
for l in listeners:
l.cleanup()
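The assertion of at most five listeners follows from the configured window, assuming the driver hands out one port per listener from a half-open [rpc_zmq_min_port, rpc_zmq_max_port) range and raises ZmqPortRangeExceededException once it is exhausted:

# 5555, 5556, 5557, 5558, 5559 -- five ports before the range runs out
available_ports = list(range(5555, 5560))
assert len(available_ports) == 5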
class TestConfZmqDriverLoad(test_utils.BaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")
@@ -196,6 +225,7 @@ class TestZmqBasics(ZmqBaseTestCase):
class TestPoller(test_utils.BaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(TestPoller, self).setUp()
self.poller = zmq_async.get_poller()


@@ -11,15 +11,19 @@
# under the License.
import mock
import testtools
from oslo_messaging._drivers.zmq_driver.poller import green_poller
from oslo_messaging._drivers.zmq_driver.poller import threading_poller
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging.tests import utils as test_utils
zmq = zmq_async.import_zmq()
class TestImportZmq(test_utils.BaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(TestImportZmq, self).setUp()
@@ -29,12 +33,12 @@ class TestImportZmq(test_utils.BaseTestCase):
zmq_async.importutils.try_import.return_value = 'mock zmq module'
self.assertEqual('mock zmq module', zmq_async.import_zmq('native'))
mock_try_import.assert_called_with('zmq', default='zmq')
mock_try_import.assert_called_with('zmq', default=None)
zmq_async.importutils.try_import.return_value = 'mock eventlet module'
self.assertEqual('mock eventlet module',
zmq_async.import_zmq('eventlet'))
mock_try_import.assert_called_with('eventlet.green.zmq', default='zmq')
mock_try_import.assert_called_with('eventlet.green.zmq', default=None)
def test_when_no_args_then_default_zmq_module_is_loaded(self):
mock_try_import = mock.Mock()
@@ -42,14 +46,7 @@ class TestImportZmq(test_utils.BaseTestCase):
zmq_async.import_zmq()
mock_try_import.assert_called_with('eventlet.green.zmq', default='zmq')
def test_when_import_fails_then_raise_ImportError(self):
zmq_async.importutils.try_import = mock.Mock()
zmq_async.importutils.try_import.return_value = None
with self.assertRaisesRegexp(ImportError, "ZeroMQ not found!"):
zmq_async.import_zmq('native')
mock_try_import.assert_called_with('eventlet.green.zmq', default=None)
def test_invalid_config_value_raise_ValueError(self):
invalid_opt = 'x'
@@ -61,6 +58,7 @@
class TestGetPoller(test_utils.BaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(TestGetPoller, self).setUp()
@@ -100,6 +98,7 @@
class TestGetReplyPoller(test_utils.BaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(TestGetReplyPoller, self).setUp()
@@ -134,6 +133,7 @@
class TestGetExecutor(test_utils.BaseTestCase):
@testtools.skipIf(zmq is None, "zmq not available")
def setUp(self):
super(TestGetExecutor, self).setUp()


@@ -44,7 +44,7 @@ try:
except ImportError:
impl_eventlet = None
from oslo_messaging._executors import impl_thread
from oslo_messaging import _utils as utils
from oslo_messaging import dispatcher as dispatcher_base
from oslo_messaging.tests import utils as test_utils
from six.moves import mock
@@ -81,6 +81,12 @@ class TestExecutor(test_utils.BaseTestCase):
aioeventlet_class = None
is_aioeventlet = (self.executor == aioeventlet_class)
if impl_blocking is not None:
blocking_class = impl_blocking.BlockingExecutor
else:
blocking_class = None
is_blocking = (self.executor == blocking_class)
if is_aioeventlet:
policy = aioeventlet.EventLoopPolicy()
trollius.set_event_loop_policy(policy)
@@ -110,8 +116,15 @@ class TestExecutor(test_utils.BaseTestCase):
endpoint = mock.MagicMock(return_value=simple_coroutine('result'))
event = eventlet.event.Event()
else:
elif is_blocking:
def run_executor(executor):
executor.start()
executor.execute()
executor.wait()
endpoint = mock.MagicMock(return_value='result')
event = None
else:
def run_executor(executor):
executor.start()
executor.wait()
@@ -119,11 +132,14 @@ class TestExecutor(test_utils.BaseTestCase):
endpoint = mock.MagicMock(return_value='result')
event = None
class Dispatcher(object):
class Dispatcher(dispatcher_base.DispatcherBase):
def __init__(self, endpoint):
self.endpoint = endpoint
self.result = "not set"
def _listen(self, transport):
pass
def callback(self, incoming, executor_callback):
if executor_callback is None:
result = self.endpoint(incoming.ctxt,
@@ -138,9 +154,8 @@ class TestExecutor(test_utils.BaseTestCase):
return result
def __call__(self, incoming, executor_callback=None):
return utils.DispatcherExecutorContext(incoming,
self.callback,
executor_callback)
return dispatcher_base.DispatcherExecutorContext(
incoming[0], self.callback, executor_callback)
return Dispatcher(endpoint), endpoint, event, run_executor
@@ -150,7 +165,7 @@ class TestExecutor(test_utils.BaseTestCase):
executor = self.executor(self.conf, listener, dispatcher)
incoming_message = mock.MagicMock(ctxt={}, message={'payload': 'data'})
def fake_poll(timeout=None):
def fake_poll(timeout=None, prefetch_size=1):
time.sleep(0.1)
if listener.poll.call_count == 10:
if event is not None:
@@ -178,9 +193,9 @@ class TestExecutor(test_utils.BaseTestCase):
executor = self.executor(self.conf, listener, dispatcher)
incoming_message = mock.MagicMock(ctxt={}, message={'payload': 'data'})
def fake_poll(timeout=None):
def fake_poll(timeout=None, prefetch_size=1):
if listener.poll.call_count == 1:
return incoming_message
return [incoming_message]
if event is not None:
event.wait()
executor.stop()


@@ -46,10 +46,6 @@ case $RPC_BACKEND in
sudo apt-get update -y
sudo apt-get install -y redis-server python-redis
;;
qpid)
sudo apt-get update -y
sudo apt-get install -y qpidd sasl2-bin
;;
amqp1)
sudo yum install -y qpid-cpp-server qpid-proton-c-devel python-qpid-proton cyrus-sasl-lib cyrus-sasl-plain
;;


@@ -51,8 +51,9 @@ class LoggingNotificationHandlerTestCase(utils.SkipIfNoTransportURL):
# NOTE(gtt): Using different topic to make tests run in parallel
topic = 'test_logging_%s_driver_%s' % (self.priority, self.driver)
self.conf.notification_driver = [self.driver]
self.conf.notification_topics = [topic]
self.config(driver=[self.driver],
topics=[topic],
group='oslo_messaging_notifications')
listener = self.useFixture(
utils.NotificationFixture(self.conf, self.url, [topic]))
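The legacy top-level notification_driver/notification_topics options become driver and topics inside the new oslo_messaging_notifications group. A stand-alone oslo.config sketch of that layout, using look-alike option definitions (the real ones are registered by oslo.messaging itself):

from oslo_config import cfg

conf = cfg.ConfigOpts()
group = cfg.OptGroup('oslo_messaging_notifications')
conf.register_group(group)
conf.register_opts([cfg.MultiStrOpt('driver', default=[]),
                    cfg.ListOpt('topics', default=['notifications'])],
                   group=group)

conf.set_override('driver', ['log'], group='oslo_messaging_notifications')
conf.set_override('topics', ['mytopic'], group='oslo_messaging_notifications')
assert conf.oslo_messaging_notifications.driver == ['log']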


@@ -16,6 +16,7 @@ import uuid
import concurrent.futures
from oslo_config import cfg
import six.moves
from testtools import matchers
import oslo_messaging
@@ -27,8 +28,8 @@ class CallTestCase(utils.SkipIfNoTransportURL):
def setUp(self):
super(CallTestCase, self).setUp(conf=cfg.ConfigOpts())
self.conf.prog="test_prog"
self.conf.project="test_project"
self.conf.prog = "test_prog"
self.conf.project = "test_project"
self.config(heartbeat_timeout_threshold=0,
group='oslo_messaging_rabbit')
@@ -324,3 +325,18 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
self.assertEqual(expected[1], actual[0])
self.assertEqual(expected[2], actual[1])
self.assertEqual(expected[3], actual[2])
def test_simple_batch(self):
listener = self.useFixture(
utils.BatchNotificationFixture(self.conf, self.url,
['test_simple_batch'],
batch_size=100, batch_timeout=2))
notifier = listener.notifier('abc')
for i in six.moves.range(0, 205):
notifier.info({}, 'test%s' % i, 'Hello World!')
events = listener.get_events(timeout=3)
self.assertEqual(3, len(events), events)
self.assertEqual(100, len(events[0][1]))
self.assertEqual(100, len(events[1][1]))
self.assertEqual(5, len(events[2][1]))
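The three expected batches follow from the fixture parameters: 205 notifications against batch_size=100 flush two full batches immediately, and the trailing five are flushed by the two-second batch_timeout, inside the three-second get_events() window:

sent, batch_size = 205, 100
batches = [min(batch_size, sent - i) for i in range(0, sent, batch_size)]
assert batches == [100, 100, 5]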


@@ -293,13 +293,14 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase):
class NotificationFixture(fixtures.Fixture):
def __init__(self, conf, url, topics):
def __init__(self, conf, url, topics, batch=None):
super(NotificationFixture, self).__init__()
self.conf = conf
self.url = url
self.topics = topics
self.events = moves.queue.Queue()
self.name = str(id(self))
self.batch = batch
def setUp(self):
super(NotificationFixture, self).setUp()
@@ -307,10 +308,7 @@ class NotificationFixture(fixtures.Fixture):
# add a special topic for internal notifications
targets.append(oslo_messaging.Target(topic=self.name))
transport = self.useFixture(TransportFixture(self.conf, self.url))
self.server = oslo_messaging.get_notification_listener(
transport.transport,
targets,
[self], 'eventlet')
self.server = self._get_server(transport, targets)
self._ctrl = self.notifier('internal', topic=self.name)
self._start()
transport.wait()
@@ -319,6 +317,12 @@ class NotificationFixture(fixtures.Fixture):
self._stop()
super(NotificationFixture, self).cleanUp()
def _get_server(self, transport, targets):
return oslo_messaging.get_notification_listener(
transport.transport,
targets,
[self], 'eventlet')
def _start(self):
self.thread = test_utils.ServerThreadHelper(self.server)
self.thread.start()
@@ -366,3 +370,39 @@ class NotificationFixture(fixtures.Fixture):
except moves.queue.Empty:
pass
return results
class BatchNotificationFixture(NotificationFixture):
def __init__(self, conf, url, topics, batch_size=5, batch_timeout=2):
super(BatchNotificationFixture, self).__init__(conf, url, topics)
self.batch_size = batch_size
self.batch_timeout = batch_timeout
def _get_server(self, transport, targets):
return oslo_messaging.get_batch_notification_listener(
transport.transport,
targets,
[self], 'eventlet',
batch_timeout=self.batch_timeout,
batch_size=self.batch_size)
def debug(self, messages):
self.events.put(['debug', messages])
def audit(self, messages):
self.events.put(['audit', messages])
def info(self, messages):
self.events.put(['info', messages])
def warn(self, messages):
self.events.put(['warn', messages])
def error(self, messages):
self.events.put(['error', messages])
def critical(self, messages):
self.events.put(['critical', messages])
def sample(self, messages):
pass # Just used for internal shutdown control


@@ -107,7 +107,7 @@ class TestDispatcher(test_utils.BaseTestCase):
sorted(dispatcher._targets_priorities))
incoming = mock.Mock(ctxt={}, message=msg)
callback = dispatcher(incoming)
callback = dispatcher([incoming])
callback.run()
callback.done()
@@ -144,7 +144,7 @@ class TestDispatcher(test_utils.BaseTestCase):
msg['priority'] = 'what???'
dispatcher = notify_dispatcher.NotificationDispatcher(
[mock.Mock()], [mock.Mock()], None, allow_requeue=True, pool=None)
callback = dispatcher(mock.Mock(ctxt={}, message=msg))
callback = dispatcher([mock.Mock(ctxt={}, message=msg)])
callback.run()
callback.done()
mylog.warning.assert_called_once_with('Unknown priority "%s"',
@@ -246,7 +246,7 @@ class TestDispatcherFilter(test_utils.BaseTestCase):
'timestamp': '2014-03-03 18:21:04.369234',
'message_id': '99863dda-97f0-443a-a0c1-6ed317b7fd45'}
incoming = mock.Mock(ctxt=self.context, message=message)
callback = dispatcher(incoming)
callback = dispatcher([incoming])
callback.run()
callback.done()


@@ -0,0 +1,30 @@
# Copyright 2015 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_messaging.tests import utils as test_utils
class TestDeprecationWarning(test_utils.BaseTestCase):
@mock.patch('warnings.warn')
def test_impl_messaging_deprecation_warning(self, mock_warn):
# Tests that we get a deprecation warning when loading a messaging
# driver out of oslo_messaging.notify._impl_messaging.
from oslo_messaging.notify import _impl_messaging as messaging
driver = messaging.MessagingV2Driver(
conf={}, topics=['notifications'], transport='rpc')
# Make sure we got a deprecation warning by loading from the alias
self.assertEqual(1, mock_warn.call_count)


@@ -21,7 +21,9 @@ import testscenarios
import oslo_messaging
from oslo_messaging.notify import dispatcher
from oslo_messaging.notify import notifier as msg_notifier
from oslo_messaging.tests import utils as test_utils
import six
from six.moves import mock
load_tests = testscenarios.load_tests_apply_scenarios
@@ -53,16 +55,18 @@ class ListenerSetupMixin(object):
def __init__(self):
self._received_msgs = 0
self.threads = []
self.lock = threading.Lock()
self.lock = threading.Condition()
def info(self, ctxt, publisher_id, event_type, payload, metadata):
def info(self, *args, **kwargs):
# NOTE(sileht): this runs in another thread
with self.lock:
self._received_msgs += 1
self.lock.notify_all()
def wait_for_messages(self, expect_messages):
while self._received_msgs < expect_messages:
time.sleep(0.01)
with self.lock:
while self._received_msgs < expect_messages:
self.lock.wait()
def stop(self):
for thread in self.threads:
@@ -83,7 +87,7 @@ class ListenerSetupMixin(object):
self.trackers = {}
def _setup_listener(self, transport, endpoints,
targets=None, pool=None):
targets=None, pool=None, batch=False):
if pool is None:
tracker_name = '__default__'
@@ -95,9 +99,15 @@ class ListenerSetupMixin(object):
tracker = self.trackers.setdefault(
tracker_name, self.ThreadTracker())
listener = oslo_messaging.get_notification_listener(
transport, targets=targets, endpoints=[tracker] + endpoints,
allow_requeue=True, pool=pool, executor='eventlet')
if batch:
listener = oslo_messaging.get_batch_notification_listener(
transport, targets=targets, endpoints=[tracker] + endpoints,
allow_requeue=True, pool=pool, executor='eventlet',
batch_size=batch[0], batch_timeout=batch[1])
else:
listener = oslo_messaging.get_notification_listener(
transport, targets=targets, endpoints=[tracker] + endpoints,
allow_requeue=True, pool=pool, executor='eventlet')
thread = RestartableServerThread(listener)
tracker.start(thread)
@@ -124,7 +134,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
ListenerSetupMixin.setUp(self)
def test_constructor(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = msg_notifier.get_notification_transport(
self.conf, url='fake:')
target = oslo_messaging.Target(topic='foo')
endpoints = [object()]
@@ -139,7 +150,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
self.assertEqual('blocking', listener.executor)
def test_no_target_topic(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = msg_notifier.get_notification_transport(
self.conf, url='fake:')
listener = oslo_messaging.get_notification_listener(
transport,
@@ -153,7 +165,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
self.assertTrue(False)
def test_unknown_executor(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = msg_notifier.get_notification_transport(
self.conf, url='fake:')
try:
oslo_messaging.get_notification_listener(transport, [], [],
@@ -164,9 +177,86 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
else:
self.assertTrue(False)
def test_one_topic(self):
def test_batch_timeout(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
endpoint = mock.Mock()
endpoint.info.return_value = None
listener_thread = self._setup_listener(transport, [endpoint],
batch=(5, 1))
notifier = self._setup_notifier(transport)
for i in six.moves.range(12):
notifier.info({}, 'an_event.start', 'test message')
self.wait_for_messages(3)
self.assertFalse(listener_thread.stop())
messages = [dict(ctxt={},
publisher_id='testpublisher',
event_type='an_event.start',
payload='test message',
metadata={'message_id': mock.ANY,
'timestamp': mock.ANY})]
endpoint.info.assert_has_calls([mock.call(messages * 5),
mock.call(messages * 5),
mock.call(messages * 2)])
def test_batch_size(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
endpoint = mock.Mock()
endpoint.info.return_value = None
listener_thread = self._setup_listener(transport, [endpoint],
batch=(5, None))
notifier = self._setup_notifier(transport)
for i in six.moves.range(10):
notifier.info({}, 'an_event.start', 'test message')
self.wait_for_messages(2)
self.assertFalse(listener_thread.stop())
messages = [dict(ctxt={},
publisher_id='testpublisher',
event_type='an_event.start',
payload='test message',
metadata={'message_id': mock.ANY,
'timestamp': mock.ANY})]
endpoint.info.assert_has_calls([mock.call(messages * 5),
mock.call(messages * 5)])
def test_batch_size_exception_path(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
endpoint = mock.Mock()
endpoint.info.side_effect = [None, Exception('boom!')]
listener_thread = self._setup_listener(transport, [endpoint],
batch=(5, None))
notifier = self._setup_notifier(transport)
for i in six.moves.range(10):
notifier.info({}, 'an_event.start', 'test message')
self.wait_for_messages(2)
self.assertFalse(listener_thread.stop())
messages = [dict(ctxt={},
publisher_id='testpublisher',
event_type='an_event.start',
payload='test message',
metadata={'message_id': mock.ANY,
'timestamp': mock.ANY})]
endpoint.info.assert_has_calls([mock.call(messages * 5)])
def test_one_topic(self):
transport = msg_notifier.get_notification_transport(
self.conf, url='fake:')
endpoint = mock.Mock()
endpoint.info.return_value = None
listener_thread = self._setup_listener(transport, [endpoint])
@@ -182,7 +272,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
{'message_id': mock.ANY, 'timestamp': mock.ANY})
def test_two_topics(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = msg_notifier.get_notification_transport(
self.conf, url='fake:')
endpoint = mock.Mock()
endpoint.info.return_value = None
@@ -208,7 +299,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
any_order=True)
def test_two_exchanges(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = msg_notifier.get_notification_transport(
self.conf, url='fake:')
endpoint = mock.Mock()
endpoint.info.return_value = None
@@ -252,7 +344,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
any_order=True)
def test_two_endpoints(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = msg_notifier.get_notification_transport(
self.conf, url='fake:')
endpoint1 = mock.Mock()
endpoint1.info.return_value = None
@@ -277,7 +370,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
'message_id': mock.ANY})
def test_requeue(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = msg_notifier.get_notification_transport(
self.conf, url='fake:')
endpoint = mock.Mock()
endpoint.info = mock.Mock()
@@ -301,7 +395,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
{'timestamp': mock.ANY, 'message_id': mock.ANY})])
def test_two_pools(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = msg_notifier.get_notification_transport(
self.conf, url='fake:')
endpoint1 = mock.Mock()
endpoint1.info.return_value = None
@@ -334,7 +429,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
mocked_endpoint_call(1)])
def test_two_pools_three_listener(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = msg_notifier.get_notification_transport(
self.conf, url='fake:')
endpoint1 = mock.Mock()
endpoint1.info.return_value = None


@@ -28,7 +28,8 @@ class PublishErrorsHandlerTestCase(test_utils.BaseTestCase):
def test_emit_cfg_log_notifier_in_notifier_drivers(self):
drivers = ['messaging', 'log']
self.config(notification_driver=drivers)
self.config(driver=drivers,
group='oslo_messaging_notifications')
self.stub_flg = True
transport = test_notifier._FakeTransport(self.conf)


@@ -49,7 +49,8 @@ class TestLogNotifier(test_utils.BaseTestCase):
def setUp(self):
super(TestLogNotifier, self).setUp()
self.addCleanup(oslo_messaging.notify._impl_test.reset)
self.config(notification_driver=['test'])
self.config(driver=['test'],
group='oslo_messaging_notifications')
# NOTE(jamespage) disable thread information logging for testing
# as this causes test failures when zmq tests monkey_patch via
# eventlet


@@ -156,8 +156,9 @@ class TestMessagingNotifier(test_utils.BaseTestCase):
if self.v2:
drivers.append('messagingv2')
self.config(notification_driver=drivers,
notification_topics=self.topics)
self.config(driver=drivers,
topics=self.topics,
group='oslo_messaging_notifications')
transport = _FakeTransport(self.conf)
@@ -269,7 +270,8 @@ class TestLogNotifier(test_utils.BaseTestCase):
@mock.patch('oslo_utils.timeutils.utcnow')
def test_notifier(self, mock_utcnow):
self.config(notification_driver=['log'])
self.config(driver=['log'],
group='oslo_messaging_notifications')
transport = _FakeTransport(self.conf)
@@ -338,7 +340,8 @@ class TestLogNotifier(test_utils.BaseTestCase):
class TestRoutingNotifier(test_utils.BaseTestCase):
def setUp(self):
super(TestRoutingNotifier, self).setUp()
self.config(notification_driver=['routing'])
self.config(driver=['routing'],
group='oslo_messaging_notifications')
transport = _FakeTransport(self.conf)
self.notifier = oslo_messaging.Notifier(transport)
@@ -360,13 +363,14 @@ class TestRoutingNotifier(test_utils.BaseTestCase):
self.assertTrue(self.router._should_load_plugin(ext))
def test_load_notifiers_no_config(self):
# default routing_notifier_config=""
# default routing_config=""
self.router._load_notifiers()
self.assertEqual({}, self.router.routing_groups)
self.assertEqual(0, len(self.router.used_drivers))
def test_load_notifiers_no_extensions(self):
self.config(routing_notifier_config="routing_notifier.yaml")
self.config(routing_config="routing_notifier.yaml",
group='oslo_messaging_notifications')
routing_config = r""
config_file = mock.MagicMock()
config_file.return_value = routing_config
@@ -382,7 +386,8 @@ class TestRoutingNotifier(test_utils.BaseTestCase):
self.assertEqual({}, self.router.routing_groups)
def test_load_notifiers_config(self):
self.config(routing_notifier_config="routing_notifier.yaml")
self.config(routing_config="routing_notifier.yaml",
group='oslo_messaging_notifications')
routing_config = r"""
group_1:
rpc : foo
@@ -412,7 +417,7 @@ group_1:
- blah.zoo.*
- zip
"""
groups = yaml.load(config)
groups = yaml.safe_load(config)
group = groups['group_1']
# No matching event ...
@@ -443,7 +448,7 @@ group_1:
- info
- error
"""
groups = yaml.load(config)
groups = yaml.safe_load(config)
group = groups['group_1']
# No matching priority
@@ -476,7 +481,7 @@ group_1:
accepted_events:
- foo.*
"""
groups = yaml.load(config)
groups = yaml.safe_load(config)
group = groups['group_1']
# Valid event, but no matching priority
@@ -519,7 +524,8 @@ group_1:
sorted(pm.map.call_args[0][6]))
def test_notify_filtered(self):
self.config(routing_notifier_config="routing_notifier.yaml")
self.config(routing_config="routing_notifier.yaml",
group='oslo_messaging_notifications')
routing_config = r"""
group_1:
rpc:
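The yaml.load() to yaml.safe_load() swaps in this file are a security hardening: safe_load() parses only plain data types, whereas yaml.load() with its default Loader will construct arbitrary Python objects from tagged input. For example:

import yaml

assert yaml.safe_load("group_1:\n  rpc: foo\n") == {'group_1': {'rpc': 'foo'}}

# Object-constructing tags are rejected instead of executed.
try:
    yaml.safe_load("!!python/object/apply:os.system ['true']")
except yaml.YAMLError:
    pass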


@@ -133,7 +133,7 @@ class TestDispatcher(test_utils.BaseTestCase):
incoming = mock.Mock(ctxt=self.ctxt, message=self.msg)
incoming.reply.side_effect = check_reply
callback = dispatcher(incoming)
callback = dispatcher([incoming])
callback.run()
callback.done()


@@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
import time
import threading
from oslo_config import cfg
@@ -20,6 +22,7 @@ import testscenarios
import mock
import oslo_messaging
from oslo_messaging import server as server_module
from oslo_messaging.tests import utils as test_utils
load_tests = testscenarios.load_tests_apply_scenarios
@@ -27,22 +30,38 @@ load_tests = testscenarios.load_tests_apply_scenarios
class ServerSetupMixin(object):
class Server(object):
class Server(threading.Thread):
def __init__(self, transport, topic, server, endpoint, serializer):
self.controller = ServerSetupMixin.ServerController()
target = oslo_messaging.Target(topic=topic, server=server)
self._server = oslo_messaging.get_rpc_server(transport,
target,
[endpoint, self],
serializer=serializer)
self.server = oslo_messaging.get_rpc_server(transport,
target,
[endpoint,
self.controller],
serializer=serializer)
super(ServerSetupMixin.Server, self).__init__()
self.daemon = True
def wait(self):
# Wait for the executor to process the stop message, indicating all
# test messages have been processed
self.controller.stopped.wait()
# Check start() does nothing with a running server
self.server.start()
self.server.stop()
self.server.wait()
def run(self):
self.server.start()
class ServerController(object):
def __init__(self):
self.stopped = threading.Event()
def stop(self, ctxt):
# Check start() does nothing with a running server
self._server.start()
self._server.stop()
self._server.wait()
def start(self):
self._server.start()
self.stopped.set()
class TestSerializer(object):
@@ -72,13 +91,14 @@ class ServerSetupMixin(object):
thread.daemon = True
thread.start()
return thread
return server
def _stop_server(self, client, server_thread, topic=None):
def _stop_server(self, client, server, topic=None):
if topic is not None:
client = client.prepare(topic=topic)
client.cast({}, 'stop')
server_thread.join(timeout=30)
server.wait()
def _setup_client(self, transport, topic='testtopic'):
return oslo_messaging.RPCClient(transport,
@@ -117,17 +137,26 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
endpoints = [object()]
serializer = object()
class MagicMockIgnoreArgs(mock.MagicMock):
'''A MagicMock which can never misinterpret the arguments passed to
it during construction.'''
def __init__(self, *args, **kwargs):
super(MagicMockIgnoreArgs, self).__init__()
server = oslo_messaging.get_rpc_server(transport, target, endpoints,
serializer=serializer)
# Mocking executor
server._executor = mock.Mock()
server._executor_cls = MagicMockIgnoreArgs
# Here assigning executor's listener object to listener variable
# before calling wait method, because in wait method we are
# setting executor to None.
listener = server._executor.listener
server.start()
listener = server._executor_obj.listener
server.stop()
# call server wait method
server.wait()
self.assertIsNone(server._executor)
self.assertIsNone(server._executor_obj)
self.assertEqual(1, listener.cleanup.call_count)
def test_no_target_server(self):
@@ -502,3 +531,302 @@ class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin):
TestMultipleServers.generate_scenarios()
class TestServerLocking(test_utils.BaseTestCase):
def setUp(self):
super(TestServerLocking, self).setUp(conf=cfg.ConfigOpts())
def _logmethod(name):
def method(self):
with self._lock:
self._calls.append(name)
return method
executors = []
class FakeExecutor(object):
def __init__(self, *args, **kwargs):
self._lock = threading.Lock()
self._calls = []
self.listener = mock.MagicMock()
executors.append(self)
start = _logmethod('start')
stop = _logmethod('stop')
wait = _logmethod('wait')
execute = _logmethod('execute')
self.executors = executors
self.server = oslo_messaging.MessageHandlingServer(mock.Mock(),
mock.Mock())
self.server._executor_cls = FakeExecutor
def test_start_stop_wait(self):
# Test a simple execution of start, stop, wait in order
thread = eventlet.spawn(self.server.start)
self.server.stop()
self.server.wait()
self.assertEqual(len(self.executors), 1)
executor = self.executors[0]
self.assertEqual(executor._calls,
['start', 'execute', 'stop', 'wait'])
self.assertTrue(executor.listener.cleanup.called)
def test_reversed_order(self):
# Test that if we call wait, stop, start, these will be correctly
# reordered
wait = eventlet.spawn(self.server.wait)
# This is non-deterministic, but there's not a great deal we can do
# about that
eventlet.sleep(0)
stop = eventlet.spawn(self.server.stop)
eventlet.sleep(0)
start = eventlet.spawn(self.server.start)
self.server.wait()
self.assertEqual(len(self.executors), 1)
executor = self.executors[0]
self.assertEqual(executor._calls,
['start', 'execute', 'stop', 'wait'])
def test_wait_for_running_task(self):
# Test that if 2 threads call a method simultaneously, both will wait,
# but only 1 will call the underlying executor method.
start_event = threading.Event()
finish_event = threading.Event()
running_event = threading.Event()
done_event = threading.Event()
runner = [None]
class SteppingFakeExecutor(self.server._executor_cls):
def start(self):
# Tell the test which thread won the race
runner[0] = eventlet.getcurrent()
running_event.set()
start_event.wait()
super(SteppingFakeExecutor, self).start()
done_event.set()
finish_event.wait()
self.server._executor_cls = SteppingFakeExecutor
start1 = eventlet.spawn(self.server.start)
start2 = eventlet.spawn(self.server.start)
# Wait until one of the threads starts running
running_event.wait()
runner = runner[0]
waiter = start2 if runner == start1 else start1
waiter_finished = threading.Event()
waiter.link(lambda _: waiter_finished.set())
# At this point, runner is running start(), and waiter is waiting for
# it to complete. runner has not yet logged anything.
self.assertEqual(1, len(self.executors))
executor = self.executors[0]
self.assertEqual(executor._calls, [])
self.assertFalse(waiter_finished.is_set())
# Let the runner log the call
start_event.set()
done_event.wait()
# We haven't signalled completion yet, so execute shouldn't have run
self.assertEqual(executor._calls, ['start'])
self.assertFalse(waiter_finished.is_set())
# Let the runner complete
finish_event.set()
waiter.wait()
runner.wait()
# Check that both threads have finished, start was only called once,
# and execute ran
self.assertTrue(waiter_finished.is_set())
self.assertEqual(executor._calls, ['start', 'execute'])
def test_start_stop_wait_stop_wait(self):
# Test that we behave correctly when calling stop/wait more than once.
# Subsequent calls should be noops.
self.server.start()
self.server.stop()
self.server.wait()
self.server.stop()
self.server.wait()
self.assertEqual(len(self.executors), 1)
executor = self.executors[0]
self.assertEqual(executor._calls,
['start', 'execute', 'stop', 'wait'])
self.assertTrue(executor.listener.cleanup.called)
def test_state_wrapping(self):
# Test that we behave correctly if a thread waits, and the server state
# has wrapped when it is next scheduled
# Ensure that if 2 threads wait for the completion of 'start', the
# first will wait until complete_event is signalled, but the second
# will continue
complete_event = threading.Event()
complete_waiting_callback = threading.Event()
start_state = self.server._states['start']
old_wait_for_completion = start_state.wait_for_completion
waited = [False]
def new_wait_for_completion(*args, **kwargs):
if not waited[0]:
waited[0] = True
complete_waiting_callback.set()
complete_event.wait()
old_wait_for_completion(*args, **kwargs)
start_state.wait_for_completion = new_wait_for_completion
# thread1 will wait for start to complete until we signal it
thread1 = eventlet.spawn(self.server.stop)
thread1_finished = threading.Event()
thread1.link(lambda _: thread1_finished.set())
self.server.start()
complete_waiting_callback.wait()
# The server should have started, but stop should not have been called
self.assertEqual(1, len(self.executors))
self.assertEqual(self.executors[0]._calls, ['start', 'execute'])
self.assertFalse(thread1_finished.is_set())
self.server.stop()
self.server.wait()
# We should have gone through all the states, and thread1 should still
# be waiting
self.assertEqual(1, len(self.executors))
self.assertEqual(self.executors[0]._calls, ['start', 'execute',
'stop', 'wait'])
self.assertFalse(thread1_finished.is_set())
# Start again
self.server.start()
# We should now record 2 executors
self.assertEqual(2, len(self.executors))
self.assertEqual(self.executors[0]._calls, ['start', 'execute',
'stop', 'wait'])
self.assertEqual(self.executors[1]._calls, ['start', 'execute'])
self.assertFalse(thread1_finished.is_set())
# Allow thread1 to complete
complete_event.set()
thread1_finished.wait()
# thread1 should now have finished, and stop should not have been
# called again on either the first or second executor
self.assertEqual(2, len(self.executors))
self.assertEqual(self.executors[0]._calls, ['start', 'execute',
'stop', 'wait'])
self.assertEqual(self.executors[1]._calls, ['start', 'execute'])
self.assertTrue(thread1_finished.is_set())
@mock.patch.object(server_module, 'DEFAULT_LOG_AFTER', 1)
@mock.patch.object(server_module, 'LOG')
def test_logging(self, mock_log):
# Test that we generate a log message if we wait longer than
# DEFAULT_LOG_AFTER
log_event = threading.Event()
mock_log.warn.side_effect = lambda _: log_event.set()
# Call stop without calling start. We should log a wait after 1 second
thread = eventlet.spawn(self.server.stop)
log_event.wait()
# Redundant given that we already waited, but it's nice to assert
self.assertTrue(mock_log.warn.called)
thread.kill()
@mock.patch.object(server_module, 'LOG')
def test_logging_explicit_wait(self, mock_log):
# Test that we generate a log message if we wait longer than
# the number of seconds passed to log_after
log_event = threading.Event()
mock_log.warn.side_effect = lambda _: log_event.set()
# Call stop without calling start. We should log a wait after 1 second
thread = eventlet.spawn(self.server.stop, log_after=1)
log_event.wait()
# Redundant given that we already waited, but it's nice to assert
self.assertTrue(mock_log.warn.called)
thread.kill()
@mock.patch.object(server_module, 'LOG')
def test_logging_with_timeout(self, mock_log):
# Test that we log a message after log_after seconds if we've also
# specified an absolute timeout
log_event = threading.Event()
mock_log.warn.side_effect = lambda _: log_event.set()
# Call stop without calling start. We should log a wait after 1 second
thread = eventlet.spawn(self.server.stop, log_after=1, timeout=2)
log_event.wait()
# Redundant given that we already waited, but it's nice to assert
self.assertTrue(mock_log.warn.called)
thread.kill()
def test_timeout_wait(self):
# Test that we will eventually timeout when passing the timeout option
# if a preceding condition is not satisfied.
self.assertRaises(server_module.TaskTimeout,
self.server.stop, timeout=1)
def test_timeout_running(self):
# Test that we will eventually timeout if we're waiting for another
# thread to complete this task
# Start the server, which will also instantiate an executor
self.server.start()
stop_called = threading.Event()
# Patch the executor's stop method to be very slow
def slow_stop():
stop_called.set()
eventlet.sleep(10)
self.executors[0].stop = slow_stop
# Call stop in a new thread
thread = eventlet.spawn(self.server.stop)
# Wait until the thread is in the slow stop method
stop_called.wait()
# Call stop again in the main thread with a timeout
self.assertRaises(server_module.TaskTimeout,
self.server.stop, timeout=1)
thread.kill()
@mock.patch.object(server_module, 'LOG')
def test_log_after_zero(self, mock_log):
# Test that we do not log a message after DEFAULT_LOG_AFTER if the
# caller gave log_after=0
# Call stop without calling start.
self.assertRaises(server_module.TaskTimeout,
self.server.stop, log_after=0, timeout=2)
# We timed out. Ensure we didn't log anything.
self.assertFalse(mock_log.warn.called)
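These locking tests coordinate their racing green threads with two eventlet primitives: spawn() returns a GreenThread, and link() registers a callback invoked with the finished thread. A stand-alone demonstration of the pattern, independent of the server under test:

import threading

import eventlet

finished = threading.Event()

def work():
    eventlet.sleep(0)  # yield, the same trick the tests use to order racers
    return 'done'

thread = eventlet.spawn(work)
thread.link(lambda gt: finished.set())  # gt is the completed GreenThread
assert thread.wait() == 'done'  # wait() returns the function's result
assert finished.is_set()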


@@ -38,6 +38,11 @@ pyngus = importutils.try_import("pyngus")
if pyngus:
from oslo_messaging._drivers.protocols.amqp import driver as amqp_driver
# The Cyrus-based SASL tests can only be run if the installed version of proton
# has been built with Cyrus SASL support.
_proton = importutils.try_import("proton")
CYRUS_ENABLED = (pyngus and pyngus.VERSION >= (2, 0, 0) and _proton
and getattr(_proton.SASL, "extended", lambda: False)())
LOG = logging.getLogger(__name__)
@@ -55,7 +60,7 @@ class _ListenerThread(threading.Thread):
def run(self):
LOG.debug("Listener started")
while self.msg_count > 0:
in_msg = self.listener.poll()
in_msg = self.listener.poll()[0]
self.messages.put(in_msg)
self.msg_count -= 1
if in_msg.message.get('method') == 'echo':
@@ -354,8 +359,7 @@ class TestAuthentication(test_utils.BaseTestCase):
driver.cleanup()
@testtools.skipUnless(pyngus and pyngus.VERSION >= (2, 0, 0),
"pyngus module not present")
@testtools.skipUnless(CYRUS_ENABLED, "Cyrus SASL not supported")
class TestCyrusAuthentication(test_utils.BaseTestCase):
"""Test the driver's Cyrus SASL integration"""


@@ -0,0 +1,82 @@
# Copyright 2015 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_messaging import conffixture
from oslo_messaging.tests import utils as test_utils
class TestConfFixture(test_utils.BaseTestCase):
def test_fixture_wraps_set_override(self):
conf = self.messaging_conf.conf
self.assertIsNotNone(conf.set_override.wrapped)
self.messaging_conf._teardown_decorator()
self.assertFalse(hasattr(conf.set_override, 'wrapped'))
def test_fixture_wraps_clear_override(self):
conf = self.messaging_conf.conf
self.assertIsNotNone(conf.clear_override.wrapped)
self.messaging_conf._teardown_decorator()
self.assertFalse(hasattr(conf.clear_override, 'wrapped'))
def test_fixture_setup_teardown_decorator(self):
conf = cfg.ConfigOpts()
self.assertFalse(hasattr(conf.set_override, 'wrapped'))
self.assertFalse(hasattr(conf.clear_override, 'wrapped'))
fixture = conffixture.ConfFixture(conf)
self.assertFalse(hasattr(conf.set_override, 'wrapped'))
self.assertFalse(hasattr(conf.clear_override, 'wrapped'))
self.useFixture(fixture)
self.assertTrue(hasattr(conf.set_override, 'wrapped'))
self.assertTrue(hasattr(conf.clear_override, 'wrapped'))
fixture._teardown_decorator()
self.assertFalse(hasattr(conf.set_override, 'wrapped'))
self.assertFalse(hasattr(conf.clear_override, 'wrapped'))
def test_fixture_properties(self):
conf = self.messaging_conf.conf
self.messaging_conf.transport_driver = 'fake'
self.assertEqual('fake',
self.messaging_conf.transport_driver)
self.assertEqual('fake',
conf.rpc_backend)
def test_old_notifications_config_override(self):
conf = self.messaging_conf.conf
conf.set_override(
"notification_driver", "messaging")
conf.set_override(
"notification_transport_url", "http://xyz")
conf.set_override(
"notification_topics", ['topic1'])
self.assertEqual("messaging",
conf.oslo_messaging_notifications.driver)
self.assertEqual("http://xyz",
conf.oslo_messaging_notifications.transport_url)
self.assertEqual(['topic1'],
conf.oslo_messaging_notifications.topics)
conf.clear_override("notification_driver")
conf.clear_override("notification_transport_url")
conf.clear_override("notification_topics")
self.assertEqual([],
conf.oslo_messaging_notifications.driver)
self.assertEqual(None,
conf.oslo_messaging_notifications.transport_url)
self.assertEqual(['notifications'],
conf.oslo_messaging_notifications.topics)
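What the wrapped/clear_override assertions exercise is the fixture decorating conf.set_override() so that deprecated top-level names are redirected into the oslo_messaging_notifications group, with the original method kept on a .wrapped attribute. A sketch of that mechanism under an assumed name map (not the fixture's actual code, which lives in oslo_messaging.conffixture):

import functools

LEGACY = {
    'notification_driver': ('oslo_messaging_notifications', 'driver'),
    'notification_transport_url': ('oslo_messaging_notifications',
                                   'transport_url'),
    'notification_topics': ('oslo_messaging_notifications', 'topics'),
}

def wrap_set_override(set_override):
    @functools.wraps(set_override)
    def wrapper(name, value, group=None, **kwargs):
        if group is None and name in LEGACY:
            group, name = LEGACY[name]  # redirect the legacy name
        return set_override(name, value, group=group, **kwargs)
    wrapper.wrapped = set_override  # what the first two tests assert on
    return wrapper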


@@ -32,14 +32,13 @@ class OptsTestCase(test_utils.BaseTestCase):
super(OptsTestCase, self).setUp()
def _test_list_opts(self, result):
self.assertEqual(5, len(result))
self.assertEqual(4, len(result))
groups = [g for (g, l) in result]
self.assertIn(None, groups)
self.assertIn('matchmaker_redis', groups)
self.assertIn('oslo_messaging_amqp', groups)
self.assertIn('oslo_messaging_rabbit', groups)
self.assertIn('oslo_messaging_qpid', groups)
opt_names = [o.name for (g, l) in result for o in l]
self.assertIn('rpc_backend', opt_names)


@@ -66,7 +66,6 @@ class ServerThreadHelper(threading.Thread):
self.daemon = True
self._server = server
self._stop_event = threading.Event()
self._wait_event = threading.Event()
def run(self):
self._server.start()
@@ -75,7 +74,6 @@ class ServerThreadHelper(threading.Thread):
self._server.start()
self._server.stop()
self._server.wait()
self._wait_event.set()
def stop(self):
self._stop_event.set()

Some files were not shown because too many files have changed in this diff.