Allow fake driver to consume multiple topics

This patch allow the fake driver to comsume multiple topics
with one listener.

Partial implements blueprint notification-subscriber-server

Change-Id: Ib52dc181e10b487854fbb398eda9f758232a1251
This commit is contained in:
Mehdi Abaakouk 2013-12-02 10:17:24 +01:00 committed by Mehdi Abaakouk
parent f81cde600b
commit 11a90eabc9
4 changed files with 22 additions and 16 deletions

View File

@ -66,8 +66,8 @@ class AMQPIncomingMessage(base.IncomingMessage):
class AMQPListener(base.Listener):
def __init__(self, driver, target, conn):
super(AMQPListener, self).__init__(driver, target)
def __init__(self, driver, conn):
super(AMQPListener, self).__init__(driver)
self.conn = conn
self.msg_id_cache = rpc_amqp._MsgIdCache()
self.incoming = []
@ -395,7 +395,7 @@ class AMQPDriverBase(base.BaseDriver):
def listen(self, target):
conn = self._get_connection(pooled=False)
listener = AMQPListener(self, target, conn)
listener = AMQPListener(self, conn)
conn.declare_topic_consumer(target.topic, listener)
conn.declare_topic_consumer('%s.%s' % (target.topic, target.server),

View File

@ -41,10 +41,9 @@ class IncomingMessage(object):
@six.add_metaclass(abc.ABCMeta)
class Listener(object):
def __init__(self, driver, target):
def __init__(self, driver):
self.conf = driver.conf
self.driver = driver
self.target = target
@abc.abstractmethod
def poll(self):

View File

@ -39,15 +39,17 @@ class FakeIncomingMessage(base.IncomingMessage):
class FakeListener(base.Listener):
def __init__(self, driver, target, exchange):
super(FakeListener, self).__init__(driver, target)
def __init__(self, driver, exchange, targets):
super(FakeListener, self).__init__(driver)
self._exchange = exchange
self._targets = targets
def poll(self):
while True:
(ctxt, message, reply_q) = self._exchange.poll(self.target)
if message is not None:
return FakeIncomingMessage(self, ctxt, message, reply_q)
for target in self._targets:
(ctxt, message, reply_q) = self._exchange.poll(target)
if message is not None:
return FakeIncomingMessage(self, ctxt, message, reply_q)
time.sleep(.05)
@ -80,8 +82,9 @@ class FakeExchange(object):
def poll(self, target):
with self._queues_lock:
queue = self._get_server_queue(target.topic, target.server)
if not queue:
if target.server:
queue = self._get_server_queue(target.topic, target.server)
else:
queue = self._get_topic_queue(target.topic)
return queue.pop(0) if queue else (None, None, None)
@ -152,7 +155,11 @@ class FakeDriver(base.BaseDriver):
exchange = self._get_exchange(target.exchange or
self._default_exchange)
return FakeListener(self, target, exchange)
listener = FakeListener(self, exchange,
[messaging.Target(topic=target.topic,
server=target.server),
messaging.Target(topic=target.topic)])
return listener
def cleanup(self):
pass

View File

@ -846,8 +846,8 @@ class ZmqIncomingMessage(base.IncomingMessage):
class ZmqListener(base.Listener):
def __init__(self, driver, target):
super(ZmqListener, self).__init__(driver, target)
def __init__(self, driver):
super(ZmqListener, self).__init__(driver)
self.incoming_queue = moves.queue.Queue()
def dispatch(self, ctxt, version, method, namespace, **kwargs):
@ -948,7 +948,7 @@ class ZmqDriver(base.BaseDriver):
def listen(self, target):
conn = create_connection(self.conf)
listener = ZmqListener(self, target)
listener = ZmqListener(self)
conn.create_consumer(target.topic, listener)
conn.create_consumer('%s.%s' % (target.topic, target.server),