From d1050491cd41792d2b7cbd8161d8efde66059d6d Mon Sep 17 00:00:00 2001 From: Shachar Snapiri Date: Thu, 26 Apr 2018 15:20:47 +0300 Subject: [PATCH] Separate db_change_logic loop from nb_api To be able to handle pubsub events (namely through db_change_callback) the nb_api forced the user to call the process_changes() method which is a blocking loop. This is unusable for applications that do other things or have event loops of their own. In this patch this logic was moved to the local_controller as it is specific to this use-case, and now the nb_api exposes a register_db_change_callback method for this use case. As a result of this change, there is no need to use the is_external_app flag, as all it does is makes sure the subscriber is not initialized, which happens now if the register_db_change_callback is not called. Change-Id: I2b02fde8018f0ac209175beadf75bbb20a6480d6 --- dragonflow/cli/df_db.py | 2 +- dragonflow/cmd/df_skydive_service.py | 2 +- .../cmd/eventlet/df_metadata_service.py | 2 +- dragonflow/controller/df_bgp_service.py | 2 +- dragonflow/controller/df_local_controller.py | 28 ++++++++++--- dragonflow/controller/df_publisher_service.py | 5 ++- dragonflow/controller/ryu_base_app.py | 10 +++-- dragonflow/db/api_nb.py | 40 +++++++------------ dragonflow/ovsdb/impl_idl.py | 16 ++++---- dragonflow/ovsdb/vswitch_impl.py | 13 +++--- dragonflow/tests/fullstack/test_base.py | 14 ++++++- .../tests/fullstack/test_ovsdb_monitor.py | 14 +++---- dragonflow/tests/fullstack/test_pub_sub.py | 4 +- .../tests/fullstack/test_ryu_base_app.py | 10 +++-- dragonflow/tests/unit/test_app_base.py | 5 ++- dragonflow/tests/unit/test_ryu_base_app.py | 3 +- 16 files changed, 100 insertions(+), 70 deletions(-) diff --git a/dragonflow/cli/df_db.py b/dragonflow/cli/df_db.py index a1414b667..123bbaf4c 100644 --- a/dragonflow/cli/df_db.py +++ b/dragonflow/cli/df_db.py @@ -179,7 +179,7 @@ def add_object_from_json(json_str, table): :param table: table name where object should be added :return: None """ - nb_api = api_nb.NbApi.get_instance(False, True) + nb_api = api_nb.NbApi.get_instance(False) try: model = model_framework.get_model(table) except KeyError: diff --git a/dragonflow/cmd/df_skydive_service.py b/dragonflow/cmd/df_skydive_service.py index 6e046a47f..14c1e0a91 100644 --- a/dragonflow/cmd/df_skydive_service.py +++ b/dragonflow/cmd/df_skydive_service.py @@ -279,7 +279,7 @@ def start(is_service): """main method""" df_config.init(sys.argv) df_utils.config_parse() - nb_api = api_nb.NbApi.get_instance(False, True) + nb_api = api_nb.NbApi.get_instance(False) if is_service: df_service.register_service('df-skydive-service', nb_api) service_manager = cotyledon.ServiceManager() diff --git a/dragonflow/cmd/eventlet/df_metadata_service.py b/dragonflow/cmd/eventlet/df_metadata_service.py index cd5e40e63..1851e7cbc 100644 --- a/dragonflow/cmd/eventlet/df_metadata_service.py +++ b/dragonflow/cmd/eventlet/df_metadata_service.py @@ -81,7 +81,7 @@ def main(): config.init(sys.argv[1:]) config.setup_logging() environment_setup() - nb_api = api_nb.NbApi.get_instance(False, True) + nb_api = api_nb.NbApi.get_instance(False) service_instance = metadata_service.DFMetadataProxyHandler( cfg.CONF, nb_api) df_service.register_service('df-metadata-service', nb_api) diff --git a/dragonflow/controller/df_bgp_service.py b/dragonflow/controller/df_bgp_service.py index df89cf695..070bca6e8 100644 --- a/dragonflow/controller/df_bgp_service.py +++ b/dragonflow/controller/df_bgp_service.py @@ -165,7 +165,7 @@ class BGPService(service.Service): def main(): df_config.init(sys.argv) - nb_api = api_nb.NbApi.get_instance(False, True) + nb_api = api_nb.NbApi.get_instance(False) server = BGPService(nb_api) df_service.register_service('df-bgp-service', nb_api) service.launch(cfg.CONF, server).wait() diff --git a/dragonflow/controller/df_local_controller.py b/dragonflow/controller/df_local_controller.py index 5a7c8159a..40ee34bd5 100644 --- a/dragonflow/controller/df_local_controller.py +++ b/dragonflow/controller/df_local_controller.py @@ -14,7 +14,9 @@ # under the License. import sys +import time +from eventlet import queue from oslo_log import log from oslo_service import loopingcall from ryu.app.ofctl import service as of_service @@ -47,9 +49,11 @@ class DfLocalController(object): def __init__(self, chassis_name, nb_api): self.db_store = db_store.get_instance() + self._queue = queue.PriorityQueue() self.chassis_name = chassis_name self.nb_api = nb_api + self.nb_api.set_db_change_callback(self.db_change_callback) self.ip = cfg.CONF.df.local_ip # Virtual tunnel port support multiple tunnel types together self.tunnel_types = cfg.CONF.df.tunnel_types @@ -67,6 +71,7 @@ class DfLocalController(object): nb_api=self.nb_api, vswitch_api=self.vswitch_api, neutron_server_notifier=self.neutron_notifier, + db_change_callback=self.db_change_callback ) # The OfctlService is needed to support the 'get_flows' method self.open_flow_service = app_mgr.instantiate(of_service.OfctlService) @@ -85,8 +90,21 @@ class DfLocalController(object): self.sync_rate_limiter = df_utils.RateLimiter( max_rate=1, time_unit=db_common.DB_SYNC_MINIMUM_INTERVAL) + def db_change_callback(self, table, key, action, value, topic=None): + update = db_common.DbUpdate(table, key, action, value, topic=topic) + LOG.debug("Pushing Update to Queue: %s", update) + self._queue.put(update) + time.sleep(0) + + def process_changes(self): + while True: + next_update = self._queue.get(block=True) + LOG.debug("Event update: %s", next_update) + self.nb_api._notification_cb(next_update) + self._queue.task_done() + def run(self): - self.vswitch_api.initialize(self.nb_api) + self.vswitch_api.initialize(self.db_change_callback) self.nb_api.register_notification_callback(self._handle_update) if cfg.CONF.df.enable_neutron_notifier: self.neutron_notifier.initialize(nb_api=self.nb_api, @@ -118,11 +136,11 @@ class DfLocalController(object): self._register_models() self.register_chassis() self.sync() - self.nb_api.process_changes() + self.process_changes() def _submit_sync_event(self): - self.nb_api.db_change_callback(None, None, - ctrl_const.CONTROLLER_SYNC, None) + self.db_change_callback(None, None, + ctrl_const.CONTROLLER_SYNC, None) def _register_models(self): for model in model_framework.iter_models_by_dependency_order(): @@ -300,7 +318,7 @@ class DfLocalController(object): action = update.action if action == ctrl_const.CONTROLLER_REINITIALIZE: self.db_store.clear() - self.vswitch_api.initialize(self.nb_api) + self.vswitch_api.initialize(self.db_change_callback) self.sync() elif action == ctrl_const.CONTROLLER_SYNC: self.sync() diff --git a/dragonflow/controller/df_publisher_service.py b/dragonflow/controller/df_publisher_service.py index fabe938a2..14228c7cf 100644 --- a/dragonflow/controller/df_publisher_service.py +++ b/dragonflow/controller/df_publisher_service.py @@ -147,7 +147,10 @@ class PublisherService(object): def main(): df_config.init(sys.argv) - nb_api = api_nb.NbApi.get_instance(False, True) + # PATCH(snapiri): Disable pub_sub as it creates a publisher in nb_api + # which collides with the publisher we create here. + cfg.CONF.set_override('enable_df_pub_sub', False, group='df') + nb_api = api_nb.NbApi.get_instance(False) service = PublisherService(nb_api) df_service.register_service('df-publisher-service', nb_api) service.initialize() diff --git a/dragonflow/controller/ryu_base_app.py b/dragonflow/controller/ryu_base_app.py index 6e623d069..b5fb9bdc3 100644 --- a/dragonflow/controller/ryu_base_app.py +++ b/dragonflow/controller/ryu_base_app.py @@ -38,6 +38,7 @@ class RyuDFAdapter(ofp_handler.OFPHandler): OF_AUTO_PORT_DESC_STATS_REQ_VER = 0x04 def __init__(self, vswitch_api, nb_api, + db_change_callback, neutron_server_notifier=None): super(RyuDFAdapter, self).__init__() self.dispatcher = dispatcher.AppDispatcher(cfg.CONF.df.apps_list) @@ -47,6 +48,7 @@ class RyuDFAdapter(ofp_handler.OFPHandler): self._datapath = None self.table_handlers = {} self.first_connect = True + self.db_change_callback = db_change_callback @property def datapath(self): @@ -111,11 +113,11 @@ class RyuDFAdapter(ofp_handler.OFPHandler): if not self.first_connect: # For reconnecting to the ryu controller, df needs a full sync # in case any resource added during the disconnection. - self.nb_api.db_change_callback(None, None, - constants.CONTROLLER_REINITIALIZE, - None) + self.db_change_callback(None, None, + constants.CONTROLLER_REINITIALIZE, + None) self.first_connect = False - self.vswitch_api.initialize(self.nb_api) + self.vswitch_api.initialize(self.db_change_callback) def _send_port_desc_stats_request(self, datapath): ofp_parser = datapath.ofproto_parser diff --git a/dragonflow/db/api_nb.py b/dragonflow/db/api_nb.py index a10b92eac..8fe252062 100644 --- a/dragonflow/db/api_nb.py +++ b/dragonflow/db/api_nb.py @@ -17,7 +17,6 @@ import time import traceback -from eventlet import queue from jsonmodels import errors from oslo_config import cfg from oslo_log import log @@ -60,7 +59,6 @@ class NbApi(object): super(NbApi, self).__init__() self.driver = db_driver self.controller = None - self._queue = queue.PriorityQueue() self.use_pubsub = use_pubsub self.publisher = None self.subscriber = None @@ -73,7 +71,7 @@ class NbApi(object): self.pub_sub_use_multiproc = cfg.CONF.df.pub_sub_use_multiproc @staticmethod - def get_instance(is_neutron_server, is_external_app=False): + def get_instance(is_neutron_server): global _nb_api if _nb_api is None: nb_driver = df_utils.load_driver( @@ -82,18 +80,16 @@ class NbApi(object): # Do not use pubsub for external apps - this causes issues with # threads and other issues. use_pubsub = cfg.CONF.df.enable_df_pub_sub - if is_external_app: - use_pubsub = False nb_api = NbApi( nb_driver, use_pubsub=use_pubsub, is_neutron_server=is_neutron_server) ip, port = get_db_ip_port() - nb_api.initialize(db_ip=ip, db_port=port) + nb_api._initialize(db_ip=ip, db_port=port) _nb_api = nb_api return _nb_api - def initialize(self, db_ip='127.0.0.1', db_port=4001): + def _initialize(self, db_ip='127.0.0.1', db_port=4001): self.driver.initialize(db_ip, db_port, config=cfg.CONF.df) if self.use_pubsub: self.publisher = self._get_publisher() @@ -113,14 +109,19 @@ class NbApi(object): if "active_port_detection" in cfg.CONF.df.apps_list: self.publisher.initialize() - # NOTE(gampel) we want to start queuing event as soon - # as possible - self._start_subscriber() + def set_db_change_callback(self, db_change_callback): + if self.use_pubsub and not self.is_neutron_server: + # NOTE(gampel) we want to start queuing event as soon + # as possible + if not self.subscriber.is_running: + self._start_subscriber(db_change_callback) # Register for DB Failover detection in NB Plugin self.subscriber.set_subscriber_for_failover( self.subscriber, - self.db_change_callback) + db_change_callback) self.subscriber.register_hamsg_for_db() + else: + LOG.warning('Subscriber is already initialized, ignoring call') def close(self): if self.publisher: @@ -152,8 +153,8 @@ class NbApi(object): df_utils.DF_PUBSUB_DRIVER_NAMESPACE) return pub_sub_driver.get_subscriber() - def _start_subscriber(self): - self.subscriber.initialize(self.db_change_callback) + def _start_subscriber(self, db_change_callback): + self.subscriber.initialize(db_change_callback) self.subscriber.register_topic(db_common.SEND_ALL_TOPIC) publishers_ips = cfg.CONF.df.publishers_ips uris = {'%s://%s:%s' % ( @@ -200,19 +201,6 @@ class NbApi(object): self.subscriber.register_topic(topic) self.subscriber.daemonize() - def db_change_callback(self, table, key, action, value, topic=None): - update = db_common.DbUpdate(table, key, action, value, topic=topic) - LOG.debug("Pushing Update to Queue: %s", update) - self._queue.put(update) - time.sleep(0) - - def process_changes(self): - while True: - next_update = self._queue.get(block=True) - LOG.debug("Event update: %s", next_update) - self._notification_cb(next_update) - self._queue.task_done() - def create(self, obj, skip_send_event=False): """Create the provided object in the database and publish an event about its creation. diff --git a/dragonflow/ovsdb/impl_idl.py b/dragonflow/ovsdb/impl_idl.py index 6372c47bb..7fd3ff00d 100644 --- a/dragonflow/ovsdb/impl_idl.py +++ b/dragonflow/ovsdb/impl_idl.py @@ -96,9 +96,9 @@ def _is_ovsport_update_valid(action, ovsport): class DFIdl(idl.Idl): - def __init__(self, nb_api, remote, schema): + def __init__(self, remote, schema, db_change_callback): super(DFIdl, self).__init__(remote, schema) - self.nb_api = nb_api + self.db_change_callback = db_change_callback def notify(self, event, row, updates=None): if not row or not hasattr(row, '_table'): @@ -109,7 +109,7 @@ class DFIdl(idl.Idl): local_interface = ovs.OvsPort.from_idl_row(row) action = event if event != 'update' else 'set' if _is_ovsport_update_valid(action, local_interface): - self.nb_api.db_change_callback( + self.db_change_callback( local_interface.table_name, local_interface.id, action, @@ -117,7 +117,8 @@ class DFIdl(idl.Idl): ) -def df_idl_from_server(nb_api, connection_string, schema_name): +def df_idl_from_server(connection_string, schema_name, + db_change_callback): """Create the Idl instance by pulling the schema from OVSDB server""" helper = idlutils.get_schema_helper(connection_string, schema_name) tables = ovsdb_monitor_table_filter_default @@ -126,7 +127,7 @@ def df_idl_from_server(nb_api, connection_string, schema_name): helper.register_table(table_name) else: helper.register_columns(table_name, columns) - return DFIdl(nb_api, connection_string, helper) + return DFIdl(connection_string, helper, db_change_callback) class DFOvsdbApi(impl_idl.OvsdbIdl): @@ -136,8 +137,9 @@ class DFOvsdbApi(impl_idl.OvsdbIdl): class OvsdbIdl has defined lots of command. Dragonflow can use them. And Dragonflow can extend its own commands in this class. """ - def __init__(self, nb_api, db_connection, timeout): - idl = df_idl_from_server(nb_api, db_connection, 'Open_vSwitch') + def __init__(self, db_connection, timeout, db_change_callback): + idl = df_idl_from_server(db_connection, 'Open_vSwitch', + db_change_callback) type(self).ovsdb_connection = None ovsdb_connection = connection.Connection(idl, timeout) super(DFOvsdbApi, self).__init__(ovsdb_connection) diff --git a/dragonflow/ovsdb/vswitch_impl.py b/dragonflow/ovsdb/vswitch_impl.py index e4669778e..8090fcb8a 100644 --- a/dragonflow/ovsdb/vswitch_impl.py +++ b/dragonflow/ovsdb/vswitch_impl.py @@ -54,17 +54,18 @@ class OvsApi(object): else: vlog.Vlog.init() - def initialize(self, nb_api): + def initialize(self, db_change_callback): db_connection = ('%s:%s:%s' % (self.protocol, self.ip, self.port)) - nb_api.db_change_callback(None, None, - constants.CONTROLLER_OVS_SYNC_STARTED, None) + db_change_callback(None, None, + constants.CONTROLLER_OVS_SYNC_STARTED, None) self.ovsdb = impl_idl.DFOvsdbApi( - nb_api, db_connection, self.vsctl_timeout) + db_connection, self.vsctl_timeout, + db_change_callback) - nb_api.db_change_callback(None, None, - constants.CONTROLLER_OVS_SYNC_FINISHED, None) + db_change_callback(None, None, + constants.CONTROLLER_OVS_SYNC_FINISHED, None) def _db_get_val(self, table, record, column, check_error=False, log_errors=True): diff --git a/dragonflow/tests/fullstack/test_base.py b/dragonflow/tests/fullstack/test_base.py index 635ae74aa..00e49cd51 100644 --- a/dragonflow/tests/fullstack/test_base.py +++ b/dragonflow/tests/fullstack/test_base.py @@ -13,7 +13,9 @@ import os import random import string +import time +from eventlet import queue from oslo_log import log from dragonflow.common import utils as df_utils @@ -57,14 +59,19 @@ class DFTestBase(base.BaseTestCase): self.conf = cfg.CONF.df self.integration_bridge = self.conf.integration_bridge + self._queue = queue.PriorityQueue() self.nb_api = api_nb.NbApi.get_instance(False) + # As we are running in the same process over and over, + # do not perform redundant calls to the subscriber + if not self.nb_api.subscriber.is_running: + self.nb_api.set_db_change_callback(self._db_change_callback) self.mgt_ip = self.conf.management_ip self.__objects_to_close = [] self.addCleanup(self._close_stored_objects) self.vswitch_api = utils.OvsTestApi(self.mgt_ip) - self.vswitch_api.initialize(self.nb_api) + self.vswitch_api.initialize(self._db_change_callback) if cfg.CONF.df.enable_selective_topology_distribution: self.start_subscribing() @@ -73,6 +80,11 @@ class DFTestBase(base.BaseTestCase): self._publish_log_event('started') self.addCleanup(self._publish_log_event, 'finished') + def _db_change_callback(self, table, key, action, value, topic=None): + update = db_common.DbUpdate(table, key, action, value, topic=topic) + self._queue.put(update) + time.sleep(0) + def _publish_log_event(self, event): global _publisher if _publisher is None: diff --git a/dragonflow/tests/fullstack/test_ovsdb_monitor.py b/dragonflow/tests/fullstack/test_ovsdb_monitor.py index 41b34b78b..1335eeeb5 100644 --- a/dragonflow/tests/fullstack/test_ovsdb_monitor.py +++ b/dragonflow/tests/fullstack/test_ovsdb_monitor.py @@ -64,22 +64,22 @@ class TestOvsdbMonitor(test_base.DFTestBase): return True def _get_wanted_vm_online(self, mac): - while self.nb_api._queue.qsize() > 0: - self.next_update = self.nb_api._queue.get() + while self._queue.qsize() > 0: + self.next_update = self._queue.get() if self._check_wanted_vm_online(self.next_update, mac): return True return False def _get_wanted_vm_offline(self, mac): - while self.nb_api._queue.qsize() > 0: - self.next_update = self.nb_api._queue.get() + while self._queue.qsize() > 0: + self.next_update = self._queue.get() if self._check_wanted_vm_offline(self.next_update, mac): return True return False def _get_all_wanted_vms_online(self, mac1, mac2): - while self.nb_api._queue.qsize() > 0: - self.next_update = self.nb_api._queue.get() + while self._queue.qsize() > 0: + self.next_update = self._queue.get() if self._check_wanted_vm_online(self.next_update, mac1): self.set_wanted_vms.add(mac1) if len(self.set_wanted_vms) == 2: @@ -93,7 +93,7 @@ class TestOvsdbMonitor(test_base.DFTestBase): return False def test_notify_message(self): - network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api)) + network = objects.NetworkTestObj(self.neutron, self.nb_api) network_id = network.create() subnet = self.store(objects.SubnetTestObj(self.neutron, self.nb_api, network_id)) diff --git a/dragonflow/tests/fullstack/test_pub_sub.py b/dragonflow/tests/fullstack/test_pub_sub.py index 4b0ec5826..20bca6cd9 100644 --- a/dragonflow/tests/fullstack/test_pub_sub.py +++ b/dragonflow/tests/fullstack/test_pub_sub.py @@ -374,7 +374,7 @@ class TestDbTableMonitors(PubSubTestBase): self.namespace.events = [] self.namespace.has_values = False self.publisher = self._get_server_publisher() - self.subscriber = self._get_subscriber(self._db_change_callback) + self.subscriber = self._get_subscriber(self._pubsub_change_callback) self.monitor = self._create_monitor('chassis') def tearDown(self): @@ -384,7 +384,7 @@ class TestDbTableMonitors(PubSubTestBase): self._stop_publisher(self.publisher) super(TestDbTableMonitors, self).tearDown() - def _db_change_callback(self, table, key, action, value, topic): + def _pubsub_change_callback(self, table, key, action, value, topic): self.namespace.events.append({ 'table': table, 'key': key, diff --git a/dragonflow/tests/fullstack/test_ryu_base_app.py b/dragonflow/tests/fullstack/test_ryu_base_app.py index 8432d8484..3f887ce9a 100644 --- a/dragonflow/tests/fullstack/test_ryu_base_app.py +++ b/dragonflow/tests/fullstack/test_ryu_base_app.py @@ -33,16 +33,18 @@ class TestRyuBaseApp(test_base.DFTestBase): ryu_cfg.CONF.ofp_listen_host = cfg.CONF.df_ryu.of_listen_address ryu_cfg.CONF.ofp_tcp_listen_port = cfg.CONF.df_ryu.of_listen_port + 1 app_mgr = app_manager.AppManager.get_instance() - self.open_flow_app = app_mgr.instantiate(ryu_base_app.RyuDFAdapter, - vswitch_api=mock.Mock(), - nb_api=mock.Mock()) + self.open_flow_app = app_mgr.instantiate( + ryu_base_app.RyuDFAdapter, + vswitch_api=mock.Mock(), + nb_api=mock.Mock(), + db_change_callback=self._db_change_callback) self.open_flow_app.load = mock.Mock() self.addCleanup(app_mgr.uninstantiate, self.open_flow_app.name) test_controller = ('tcp:' + cfg.CONF.df_ryu.of_listen_address + ':' + str(cfg.CONF.df_ryu.of_listen_port + 1)) self.vswitch_api = vswitch_impl.OvsApi(self.mgt_ip) - self.vswitch_api.initialize(self.nb_api) + self.vswitch_api.initialize(self._db_change_callback) cur_controllers = self.vswitch_api.ovsdb.get_controller( self.integration_bridge).execute() cur_controllers.append(test_controller) diff --git a/dragonflow/tests/unit/test_app_base.py b/dragonflow/tests/unit/test_app_base.py index 45d2968db..fc017065b 100644 --- a/dragonflow/tests/unit/test_app_base.py +++ b/dragonflow/tests/unit/test_app_base.py @@ -79,9 +79,10 @@ class DFAppTestBase(tests_base.BaseTestCase): self.vswitch_api = self.controller.vswitch_api = mock.MagicMock() kwargs = dict( nb_api=self.controller.nb_api, - vswitch_api=self.controller.vswitch_api, + vswitch_api=self.controller.vswitch_api ) - self.controller.open_flow_app = ryu_base_app.RyuDFAdapter(**kwargs) + self.controller.open_flow_app = ryu_base_app.RyuDFAdapter( + db_change_callback=self.controller.db_change_callback, **kwargs) self.open_flow_app = self.controller.open_flow_app self.datapath = self.open_flow_app._datapath = mock.Mock() self.open_flow_app.load(self.controller.open_flow_app, **kwargs) diff --git a/dragonflow/tests/unit/test_ryu_base_app.py b/dragonflow/tests/unit/test_ryu_base_app.py index 36fe024c2..7a340294d 100644 --- a/dragonflow/tests/unit/test_ryu_base_app.py +++ b/dragonflow/tests/unit/test_ryu_base_app.py @@ -31,7 +31,8 @@ class TestRyuDFAdapter(tests_base.BaseTestCase): super(TestRyuDFAdapter, self).setUp() self.ryu_df_adapter = ryu_base_app.RyuDFAdapter( vswitch_api=mock.Mock(), - nb_api=mock.Mock()) + nb_api=mock.Mock(), + db_change_callback=mock.Mock()) self.mock_app = mock.Mock(spec=[ 'router_updated', 'router_deleted',