Change cleanup behaviour in test_pub_sub.py
Changed the subscriber to connect to the loopback - as the publisher is bound on that address, instead of the local host IP. Modified cleanup to use addCleanup methods instead of manually cleaning-up. This is because if an assertion fails before the cleanup today, it will not clean-up and the next tests may fail. Change-Id: I290d4b1e5378831e963d2f48f8e5a9fa8b17cd1a
This commit is contained in:
parent
e2dc45a798
commit
3ed05afe66
|
@ -35,18 +35,20 @@ class Namespace(object):
|
|||
|
||||
|
||||
class PubSubTestBase(test_base.DFTestBase):
|
||||
def get_server_publisher(self, bind_address="127.0.0.1", port=12345):
|
||||
def _get_server_publisher(self, bind_address="127.0.0.1", port=12345):
|
||||
cfg.CONF.set_override('publisher_port', port, group='df')
|
||||
cfg.CONF.set_override('publisher_bind_address',
|
||||
bind_address, group='df')
|
||||
return self._get_publisher(cfg.CONF.df.pub_sub_driver)
|
||||
|
||||
def stop_publisher(self, publisher):
|
||||
def _stop_publisher(self, publisher):
|
||||
if publisher:
|
||||
publisher.close()
|
||||
publisher = None
|
||||
|
||||
def get_subscriber(self, callback):
|
||||
def _get_subscriber(self, callback, host_address=None):
|
||||
if not host_address:
|
||||
host_address = "127.0.0.1"
|
||||
pub_sub_driver = df_utils.load_driver(
|
||||
cfg.CONF.df.pub_sub_driver,
|
||||
df_utils.DF_PUBSUB_DRIVER_NAMESPACE)
|
||||
|
@ -55,7 +57,7 @@ class PubSubTestBase(test_base.DFTestBase):
|
|||
subscriber.register_topic(db_common.SEND_ALL_TOPIC)
|
||||
uri = '%s://%s:%s' % (
|
||||
cfg.CONF.df.publisher_transport,
|
||||
cfg.CONF.host,
|
||||
host_address,
|
||||
cfg.CONF.df.publisher_port
|
||||
)
|
||||
subscriber.register_listen_address(uri)
|
||||
|
@ -85,7 +87,8 @@ class TestPubSub(PubSubTestBase):
|
|||
def _db_change_callback(table, key, action, value, topic):
|
||||
global events_num
|
||||
events_num += 1
|
||||
subscriber = self.get_subscriber(_db_change_callback)
|
||||
subscriber = self._get_subscriber(_db_change_callback)
|
||||
self.addCleanup(subscriber.close)
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network_id = network.create()
|
||||
if cfg.CONF.df.enable_selective_topology_distribution:
|
||||
|
@ -114,7 +117,6 @@ class TestPubSub(PubSubTestBase):
|
|||
self.assertNotEqual(local_event_num, events_num)
|
||||
if cfg.CONF.df.enable_selective_topology_distribution:
|
||||
subscriber.unregister_topic(topic)
|
||||
subscriber.close()
|
||||
self.assertFalse(network.exists())
|
||||
|
||||
def test_pub_sub_update_port(self):
|
||||
|
@ -129,7 +131,8 @@ class TestPubSub(PubSubTestBase):
|
|||
def _db_change_callback(table, key, action, value, topic):
|
||||
ns.events_num += 1
|
||||
|
||||
subscriber = self.get_subscriber(_db_change_callback)
|
||||
subscriber = self._get_subscriber(_db_change_callback)
|
||||
self.addCleanup(subscriber.close)
|
||||
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
|
||||
network_id = network.create()
|
||||
if cfg.CONF.df.enable_selective_topology_distribution:
|
||||
|
@ -166,7 +169,6 @@ class TestPubSub(PubSubTestBase):
|
|||
self.assertNotEqual(local_event_num, events_num)
|
||||
if cfg.CONF.df.enable_selective_topology_distribution:
|
||||
subscriber.unregister_topic(topic)
|
||||
subscriber.close()
|
||||
self.assertFalse(network.exists())
|
||||
|
||||
def test_pub_sub_event_number_different_port(self):
|
||||
|
@ -183,8 +185,10 @@ class TestPubSub(PubSubTestBase):
|
|||
ns.events_num += 1
|
||||
ns.events_action = action
|
||||
|
||||
publisher = self.get_server_publisher()
|
||||
subscriber = self.get_subscriber(_db_change_callback)
|
||||
publisher = self._get_server_publisher()
|
||||
self.addCleanup(self._stop_publisher, publisher)
|
||||
subscriber = self._get_subscriber(_db_change_callback)
|
||||
self.addCleanup(subscriber.close)
|
||||
|
||||
time.sleep(const.DEFAULT_CMD_TIMEOUT)
|
||||
local_events_num = ns.events_num
|
||||
|
@ -203,8 +207,6 @@ class TestPubSub(PubSubTestBase):
|
|||
time.sleep(const.DEFAULT_CMD_TIMEOUT)
|
||||
|
||||
self.assertEqual(local_events_num + 100, ns.events_num)
|
||||
subscriber.close()
|
||||
self.stop_publisher(publisher)
|
||||
|
||||
def test_pub_sub_add_topic(self):
|
||||
if not self.do_test:
|
||||
|
@ -219,8 +221,10 @@ class TestPubSub(PubSubTestBase):
|
|||
self.events_num_t += 1
|
||||
self.events_action_t = action
|
||||
|
||||
publisher = self.get_server_publisher()
|
||||
subscriber = self.get_subscriber(_db_change_callback_topic)
|
||||
publisher = self._get_server_publisher()
|
||||
self.addCleanup(self._stop_publisher, publisher)
|
||||
subscriber = self._get_subscriber(_db_change_callback_topic)
|
||||
self.addCleanup(subscriber.close)
|
||||
time.sleep(const.DEFAULT_CMD_TIMEOUT)
|
||||
topic = "topic"
|
||||
subscriber.register_topic(topic)
|
||||
|
@ -250,8 +254,6 @@ class TestPubSub(PubSubTestBase):
|
|||
subscriber.unregister_topic(topic)
|
||||
publisher.send_event(update, topic)
|
||||
self.assertIsNone(self.events_action_t)
|
||||
subscriber.close()
|
||||
self.stop_publisher(publisher)
|
||||
|
||||
def test_pub_sub_register_addr(self):
|
||||
if not self.do_test:
|
||||
|
@ -266,8 +268,10 @@ class TestPubSub(PubSubTestBase):
|
|||
ns.events_num += 1
|
||||
ns.events_action = action
|
||||
|
||||
publisher = self.get_server_publisher()
|
||||
subscriber = self.get_subscriber(_db_change_callback)
|
||||
publisher = self._get_server_publisher()
|
||||
self.addCleanup(self._stop_publisher, publisher)
|
||||
subscriber = self._get_subscriber(_db_change_callback)
|
||||
self.addCleanup(subscriber.close)
|
||||
time.sleep(const.DEFAULT_CMD_TIMEOUT)
|
||||
action = "log"
|
||||
update = db_common.DbUpdate(
|
||||
|
@ -282,7 +286,8 @@ class TestPubSub(PubSubTestBase):
|
|||
time.sleep(const.DEFAULT_CMD_TIMEOUT)
|
||||
self.assertEqual(ns.events_action, action)
|
||||
|
||||
publisher2 = self.get_server_publisher(port=12346)
|
||||
publisher2 = self._get_server_publisher(port=12346)
|
||||
self.addCleanup(self._stop_publisher, publisher2)
|
||||
uri = '%s://%s:%s' % (
|
||||
cfg.CONF.df.publisher_transport,
|
||||
'127.0.0.1',
|
||||
|
@ -295,9 +300,6 @@ class TestPubSub(PubSubTestBase):
|
|||
publisher2.send_event(update)
|
||||
time.sleep(const.DEFAULT_CMD_TIMEOUT)
|
||||
self.assertEqual(ns.events_action, action)
|
||||
subscriber.close()
|
||||
self.stop_publisher(publisher)
|
||||
self.stop_publisher(publisher2)
|
||||
|
||||
|
||||
class TestMultiprocPubSub(PubSubTestBase):
|
||||
|
@ -319,7 +321,7 @@ class TestMultiprocPubSub(PubSubTestBase):
|
|||
def tearDown(self):
|
||||
if self.subscriber:
|
||||
self.subscriber.close()
|
||||
self.stop_publisher(self.publisher)
|
||||
self._stop_publisher(self.publisher)
|
||||
super(TestMultiprocPubSub, self).tearDown()
|
||||
|
||||
def _handle_received_event(self, table, key, action, value, topic):
|
||||
|
@ -371,15 +373,15 @@ class TestDbTableMonitors(PubSubTestBase):
|
|||
self.namespace = Namespace()
|
||||
self.namespace.events = []
|
||||
self.namespace.has_values = False
|
||||
self.publisher = self.get_server_publisher()
|
||||
self.subscriber = self.get_subscriber(self._db_change_callback)
|
||||
self.publisher = self._get_server_publisher()
|
||||
self.subscriber = self._get_subscriber(self._db_change_callback)
|
||||
self.monitor = self._create_monitor('chassis')
|
||||
|
||||
def tearDown(self):
|
||||
if self.do_test:
|
||||
self.monitor.stop()
|
||||
self.subscriber.close()
|
||||
self.stop_publisher(self.publisher)
|
||||
self._stop_publisher(self.publisher)
|
||||
super(TestDbTableMonitors, self).tearDown()
|
||||
|
||||
def _db_change_callback(self, table, key, action, value, topic):
|
||||
|
|
Loading…
Reference in New Issue