Change cleanup behaviour in test_pub_sub.py

Changed the subscriber to connect to the loopback - as the publisher is
bound on that address, instead of the local host IP.
Modified cleanup to use addCleanup methods instead of manually
cleaning-up. This is because if an assertion fails before the cleanup
today, it will not clean-up and the next tests may fail.

Change-Id: I290d4b1e5378831e963d2f48f8e5a9fa8b17cd1a
This commit is contained in:
Shachar Snapiri 2018-03-11 15:05:17 +02:00
parent e2dc45a798
commit 3ed05afe66
1 changed files with 28 additions and 26 deletions

View File

@ -35,18 +35,20 @@ class Namespace(object):
class PubSubTestBase(test_base.DFTestBase):
def get_server_publisher(self, bind_address="127.0.0.1", port=12345):
def _get_server_publisher(self, bind_address="127.0.0.1", port=12345):
cfg.CONF.set_override('publisher_port', port, group='df')
cfg.CONF.set_override('publisher_bind_address',
bind_address, group='df')
return self._get_publisher(cfg.CONF.df.pub_sub_driver)
def stop_publisher(self, publisher):
def _stop_publisher(self, publisher):
if publisher:
publisher.close()
publisher = None
def get_subscriber(self, callback):
def _get_subscriber(self, callback, host_address=None):
if not host_address:
host_address = "127.0.0.1"
pub_sub_driver = df_utils.load_driver(
cfg.CONF.df.pub_sub_driver,
df_utils.DF_PUBSUB_DRIVER_NAMESPACE)
@ -55,7 +57,7 @@ class PubSubTestBase(test_base.DFTestBase):
subscriber.register_topic(db_common.SEND_ALL_TOPIC)
uri = '%s://%s:%s' % (
cfg.CONF.df.publisher_transport,
cfg.CONF.host,
host_address,
cfg.CONF.df.publisher_port
)
subscriber.register_listen_address(uri)
@ -85,7 +87,8 @@ class TestPubSub(PubSubTestBase):
def _db_change_callback(table, key, action, value, topic):
global events_num
events_num += 1
subscriber = self.get_subscriber(_db_change_callback)
subscriber = self._get_subscriber(_db_change_callback)
self.addCleanup(subscriber.close)
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
network_id = network.create()
if cfg.CONF.df.enable_selective_topology_distribution:
@ -114,7 +117,6 @@ class TestPubSub(PubSubTestBase):
self.assertNotEqual(local_event_num, events_num)
if cfg.CONF.df.enable_selective_topology_distribution:
subscriber.unregister_topic(topic)
subscriber.close()
self.assertFalse(network.exists())
def test_pub_sub_update_port(self):
@ -129,7 +131,8 @@ class TestPubSub(PubSubTestBase):
def _db_change_callback(table, key, action, value, topic):
ns.events_num += 1
subscriber = self.get_subscriber(_db_change_callback)
subscriber = self._get_subscriber(_db_change_callback)
self.addCleanup(subscriber.close)
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
network_id = network.create()
if cfg.CONF.df.enable_selective_topology_distribution:
@ -166,7 +169,6 @@ class TestPubSub(PubSubTestBase):
self.assertNotEqual(local_event_num, events_num)
if cfg.CONF.df.enable_selective_topology_distribution:
subscriber.unregister_topic(topic)
subscriber.close()
self.assertFalse(network.exists())
def test_pub_sub_event_number_different_port(self):
@ -183,8 +185,10 @@ class TestPubSub(PubSubTestBase):
ns.events_num += 1
ns.events_action = action
publisher = self.get_server_publisher()
subscriber = self.get_subscriber(_db_change_callback)
publisher = self._get_server_publisher()
self.addCleanup(self._stop_publisher, publisher)
subscriber = self._get_subscriber(_db_change_callback)
self.addCleanup(subscriber.close)
time.sleep(const.DEFAULT_CMD_TIMEOUT)
local_events_num = ns.events_num
@ -203,8 +207,6 @@ class TestPubSub(PubSubTestBase):
time.sleep(const.DEFAULT_CMD_TIMEOUT)
self.assertEqual(local_events_num + 100, ns.events_num)
subscriber.close()
self.stop_publisher(publisher)
def test_pub_sub_add_topic(self):
if not self.do_test:
@ -219,8 +221,10 @@ class TestPubSub(PubSubTestBase):
self.events_num_t += 1
self.events_action_t = action
publisher = self.get_server_publisher()
subscriber = self.get_subscriber(_db_change_callback_topic)
publisher = self._get_server_publisher()
self.addCleanup(self._stop_publisher, publisher)
subscriber = self._get_subscriber(_db_change_callback_topic)
self.addCleanup(subscriber.close)
time.sleep(const.DEFAULT_CMD_TIMEOUT)
topic = "topic"
subscriber.register_topic(topic)
@ -250,8 +254,6 @@ class TestPubSub(PubSubTestBase):
subscriber.unregister_topic(topic)
publisher.send_event(update, topic)
self.assertIsNone(self.events_action_t)
subscriber.close()
self.stop_publisher(publisher)
def test_pub_sub_register_addr(self):
if not self.do_test:
@ -266,8 +268,10 @@ class TestPubSub(PubSubTestBase):
ns.events_num += 1
ns.events_action = action
publisher = self.get_server_publisher()
subscriber = self.get_subscriber(_db_change_callback)
publisher = self._get_server_publisher()
self.addCleanup(self._stop_publisher, publisher)
subscriber = self._get_subscriber(_db_change_callback)
self.addCleanup(subscriber.close)
time.sleep(const.DEFAULT_CMD_TIMEOUT)
action = "log"
update = db_common.DbUpdate(
@ -282,7 +286,8 @@ class TestPubSub(PubSubTestBase):
time.sleep(const.DEFAULT_CMD_TIMEOUT)
self.assertEqual(ns.events_action, action)
publisher2 = self.get_server_publisher(port=12346)
publisher2 = self._get_server_publisher(port=12346)
self.addCleanup(self._stop_publisher, publisher2)
uri = '%s://%s:%s' % (
cfg.CONF.df.publisher_transport,
'127.0.0.1',
@ -295,9 +300,6 @@ class TestPubSub(PubSubTestBase):
publisher2.send_event(update)
time.sleep(const.DEFAULT_CMD_TIMEOUT)
self.assertEqual(ns.events_action, action)
subscriber.close()
self.stop_publisher(publisher)
self.stop_publisher(publisher2)
class TestMultiprocPubSub(PubSubTestBase):
@ -319,7 +321,7 @@ class TestMultiprocPubSub(PubSubTestBase):
def tearDown(self):
if self.subscriber:
self.subscriber.close()
self.stop_publisher(self.publisher)
self._stop_publisher(self.publisher)
super(TestMultiprocPubSub, self).tearDown()
def _handle_received_event(self, table, key, action, value, topic):
@ -371,15 +373,15 @@ class TestDbTableMonitors(PubSubTestBase):
self.namespace = Namespace()
self.namespace.events = []
self.namespace.has_values = False
self.publisher = self.get_server_publisher()
self.subscriber = self.get_subscriber(self._db_change_callback)
self.publisher = self._get_server_publisher()
self.subscriber = self._get_subscriber(self._db_change_callback)
self.monitor = self._create_monitor('chassis')
def tearDown(self):
if self.do_test:
self.monitor.stop()
self.subscriber.close()
self.stop_publisher(self.publisher)
self._stop_publisher(self.publisher)
super(TestDbTableMonitors, self).tearDown()
def _db_change_callback(self, table, key, action, value, topic):