Merge "Sync rpc fix from oslo-incubator"
This commit is contained in:
commit
c99ac7a7fe
|
@ -18,7 +18,6 @@
|
||||||
import functools
|
import functools
|
||||||
import itertools
|
import itertools
|
||||||
import time
|
import time
|
||||||
import uuid
|
|
||||||
|
|
||||||
import eventlet
|
import eventlet
|
||||||
import greenlet
|
import greenlet
|
||||||
|
@ -124,7 +123,6 @@ class ConsumerBase(object):
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"link": {
|
"link": {
|
||||||
"name": link_name,
|
|
||||||
"durable": True,
|
"durable": True,
|
||||||
"x-declare": {
|
"x-declare": {
|
||||||
"durable": False,
|
"durable": False,
|
||||||
|
@ -139,6 +137,7 @@ class ConsumerBase(object):
|
||||||
"link": {
|
"link": {
|
||||||
"x-declare": {
|
"x-declare": {
|
||||||
"auto-delete": True,
|
"auto-delete": True,
|
||||||
|
"exclusive": False,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -146,6 +145,8 @@ class ConsumerBase(object):
|
||||||
raise_invalid_topology_version()
|
raise_invalid_topology_version()
|
||||||
|
|
||||||
addr_opts["link"]["x-declare"].update(link_opts)
|
addr_opts["link"]["x-declare"].update(link_opts)
|
||||||
|
if link_name:
|
||||||
|
addr_opts["link"]["name"] = link_name
|
||||||
|
|
||||||
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
|
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
|
||||||
|
|
||||||
|
@ -220,14 +221,16 @@ class DirectConsumer(ConsumerBase):
|
||||||
if conf.qpid_topology_version == 1:
|
if conf.qpid_topology_version == 1:
|
||||||
node_name = "%s/%s" % (msg_id, msg_id)
|
node_name = "%s/%s" % (msg_id, msg_id)
|
||||||
node_opts = {"type": "direct"}
|
node_opts = {"type": "direct"}
|
||||||
|
link_name = msg_id
|
||||||
elif conf.qpid_topology_version == 2:
|
elif conf.qpid_topology_version == 2:
|
||||||
node_name = "amq.direct/%s" % msg_id
|
node_name = "amq.direct/%s" % msg_id
|
||||||
node_opts = {}
|
node_opts = {}
|
||||||
|
link_name = None
|
||||||
else:
|
else:
|
||||||
raise_invalid_topology_version()
|
raise_invalid_topology_version()
|
||||||
|
|
||||||
super(DirectConsumer, self).__init__(conf, session, callback,
|
super(DirectConsumer, self).__init__(conf, session, callback,
|
||||||
node_name, node_opts, msg_id,
|
node_name, node_opts, link_name,
|
||||||
link_opts)
|
link_opts)
|
||||||
|
|
||||||
|
|
||||||
|
@ -279,30 +282,16 @@ class FanoutConsumer(ConsumerBase):
|
||||||
if conf.qpid_topology_version == 1:
|
if conf.qpid_topology_version == 1:
|
||||||
node_name = "%s_fanout" % topic
|
node_name = "%s_fanout" % topic
|
||||||
node_opts = {"durable": False, "type": "fanout"}
|
node_opts = {"durable": False, "type": "fanout"}
|
||||||
link_name = "%s_fanout_%s" % (topic, uuid.uuid4().hex)
|
|
||||||
elif conf.qpid_topology_version == 2:
|
elif conf.qpid_topology_version == 2:
|
||||||
node_name = "amq.topic/fanout/%s" % topic
|
node_name = "amq.topic/fanout/%s" % topic
|
||||||
node_opts = {}
|
node_opts = {}
|
||||||
link_name = ""
|
|
||||||
else:
|
else:
|
||||||
raise_invalid_topology_version()
|
raise_invalid_topology_version()
|
||||||
|
|
||||||
super(FanoutConsumer, self).__init__(conf, session, callback,
|
super(FanoutConsumer, self).__init__(conf, session, callback,
|
||||||
node_name, node_opts, link_name,
|
node_name, node_opts, None,
|
||||||
link_opts)
|
link_opts)
|
||||||
|
|
||||||
def reconnect(self, session):
|
|
||||||
topic = self.get_node_name().rpartition('_fanout')[0]
|
|
||||||
params = {
|
|
||||||
'session': session,
|
|
||||||
'topic': topic,
|
|
||||||
'callback': self.callback,
|
|
||||||
}
|
|
||||||
|
|
||||||
self.__init__(conf=self.conf, **params)
|
|
||||||
|
|
||||||
super(FanoutConsumer, self).reconnect(session)
|
|
||||||
|
|
||||||
|
|
||||||
class Publisher(object):
|
class Publisher(object):
|
||||||
"""Base Publisher class."""
|
"""Base Publisher class."""
|
||||||
|
|
Loading…
Reference in New Issue