Separate db_change_logic loop from nb_api

To be able to handle pubsub events (namely through db_change_callback)
the nb_api forced the user to call the process_changes() method which is
a blocking loop.
This is unusable for applications that do other things or have event
loops of their own.
In this patch this logic was moved to the local_controller as it is
specific to this use-case, and now the nb_api exposes a
register_db_change_callback method for this use case.
As a result of this change, there is no need to use the is_external_app
flag, as all it does is makes sure the subscriber is not initialized,
which happens now if the register_db_change_callback is not called.

Change-Id: I2b02fde8018f0ac209175beadf75bbb20a6480d6
This commit is contained in:
Shachar Snapiri 2018-04-26 15:20:47 +03:00 committed by Shachar Snapiri
parent f414212d79
commit d1050491cd
16 changed files with 100 additions and 70 deletions

View File

@ -179,7 +179,7 @@ def add_object_from_json(json_str, table):
:param table: table name where object should be added
:return: None
"""
nb_api = api_nb.NbApi.get_instance(False, True)
nb_api = api_nb.NbApi.get_instance(False)
try:
model = model_framework.get_model(table)
except KeyError:

View File

@ -279,7 +279,7 @@ def start(is_service):
"""main method"""
df_config.init(sys.argv)
df_utils.config_parse()
nb_api = api_nb.NbApi.get_instance(False, True)
nb_api = api_nb.NbApi.get_instance(False)
if is_service:
df_service.register_service('df-skydive-service', nb_api)
service_manager = cotyledon.ServiceManager()

View File

@ -81,7 +81,7 @@ def main():
config.init(sys.argv[1:])
config.setup_logging()
environment_setup()
nb_api = api_nb.NbApi.get_instance(False, True)
nb_api = api_nb.NbApi.get_instance(False)
service_instance = metadata_service.DFMetadataProxyHandler(
cfg.CONF, nb_api)
df_service.register_service('df-metadata-service', nb_api)

View File

@ -165,7 +165,7 @@ class BGPService(service.Service):
def main():
df_config.init(sys.argv)
nb_api = api_nb.NbApi.get_instance(False, True)
nb_api = api_nb.NbApi.get_instance(False)
server = BGPService(nb_api)
df_service.register_service('df-bgp-service', nb_api)
service.launch(cfg.CONF, server).wait()

View File

@ -14,7 +14,9 @@
# under the License.
import sys
import time
from eventlet import queue
from oslo_log import log
from oslo_service import loopingcall
from ryu.app.ofctl import service as of_service
@ -47,9 +49,11 @@ class DfLocalController(object):
def __init__(self, chassis_name, nb_api):
self.db_store = db_store.get_instance()
self._queue = queue.PriorityQueue()
self.chassis_name = chassis_name
self.nb_api = nb_api
self.nb_api.set_db_change_callback(self.db_change_callback)
self.ip = cfg.CONF.df.local_ip
# Virtual tunnel port support multiple tunnel types together
self.tunnel_types = cfg.CONF.df.tunnel_types
@ -67,6 +71,7 @@ class DfLocalController(object):
nb_api=self.nb_api,
vswitch_api=self.vswitch_api,
neutron_server_notifier=self.neutron_notifier,
db_change_callback=self.db_change_callback
)
# The OfctlService is needed to support the 'get_flows' method
self.open_flow_service = app_mgr.instantiate(of_service.OfctlService)
@ -85,8 +90,21 @@ class DfLocalController(object):
self.sync_rate_limiter = df_utils.RateLimiter(
max_rate=1, time_unit=db_common.DB_SYNC_MINIMUM_INTERVAL)
def db_change_callback(self, table, key, action, value, topic=None):
update = db_common.DbUpdate(table, key, action, value, topic=topic)
LOG.debug("Pushing Update to Queue: %s", update)
self._queue.put(update)
time.sleep(0)
def process_changes(self):
while True:
next_update = self._queue.get(block=True)
LOG.debug("Event update: %s", next_update)
self.nb_api._notification_cb(next_update)
self._queue.task_done()
def run(self):
self.vswitch_api.initialize(self.nb_api)
self.vswitch_api.initialize(self.db_change_callback)
self.nb_api.register_notification_callback(self._handle_update)
if cfg.CONF.df.enable_neutron_notifier:
self.neutron_notifier.initialize(nb_api=self.nb_api,
@ -118,11 +136,11 @@ class DfLocalController(object):
self._register_models()
self.register_chassis()
self.sync()
self.nb_api.process_changes()
self.process_changes()
def _submit_sync_event(self):
self.nb_api.db_change_callback(None, None,
ctrl_const.CONTROLLER_SYNC, None)
self.db_change_callback(None, None,
ctrl_const.CONTROLLER_SYNC, None)
def _register_models(self):
for model in model_framework.iter_models_by_dependency_order():
@ -300,7 +318,7 @@ class DfLocalController(object):
action = update.action
if action == ctrl_const.CONTROLLER_REINITIALIZE:
self.db_store.clear()
self.vswitch_api.initialize(self.nb_api)
self.vswitch_api.initialize(self.db_change_callback)
self.sync()
elif action == ctrl_const.CONTROLLER_SYNC:
self.sync()

View File

@ -147,7 +147,10 @@ class PublisherService(object):
def main():
df_config.init(sys.argv)
nb_api = api_nb.NbApi.get_instance(False, True)
# PATCH(snapiri): Disable pub_sub as it creates a publisher in nb_api
# which collides with the publisher we create here.
cfg.CONF.set_override('enable_df_pub_sub', False, group='df')
nb_api = api_nb.NbApi.get_instance(False)
service = PublisherService(nb_api)
df_service.register_service('df-publisher-service', nb_api)
service.initialize()

View File

@ -38,6 +38,7 @@ class RyuDFAdapter(ofp_handler.OFPHandler):
OF_AUTO_PORT_DESC_STATS_REQ_VER = 0x04
def __init__(self, vswitch_api, nb_api,
db_change_callback,
neutron_server_notifier=None):
super(RyuDFAdapter, self).__init__()
self.dispatcher = dispatcher.AppDispatcher(cfg.CONF.df.apps_list)
@ -47,6 +48,7 @@ class RyuDFAdapter(ofp_handler.OFPHandler):
self._datapath = None
self.table_handlers = {}
self.first_connect = True
self.db_change_callback = db_change_callback
@property
def datapath(self):
@ -111,11 +113,11 @@ class RyuDFAdapter(ofp_handler.OFPHandler):
if not self.first_connect:
# For reconnecting to the ryu controller, df needs a full sync
# in case any resource added during the disconnection.
self.nb_api.db_change_callback(None, None,
constants.CONTROLLER_REINITIALIZE,
None)
self.db_change_callback(None, None,
constants.CONTROLLER_REINITIALIZE,
None)
self.first_connect = False
self.vswitch_api.initialize(self.nb_api)
self.vswitch_api.initialize(self.db_change_callback)
def _send_port_desc_stats_request(self, datapath):
ofp_parser = datapath.ofproto_parser

View File

@ -17,7 +17,6 @@
import time
import traceback
from eventlet import queue
from jsonmodels import errors
from oslo_config import cfg
from oslo_log import log
@ -60,7 +59,6 @@ class NbApi(object):
super(NbApi, self).__init__()
self.driver = db_driver
self.controller = None
self._queue = queue.PriorityQueue()
self.use_pubsub = use_pubsub
self.publisher = None
self.subscriber = None
@ -73,7 +71,7 @@ class NbApi(object):
self.pub_sub_use_multiproc = cfg.CONF.df.pub_sub_use_multiproc
@staticmethod
def get_instance(is_neutron_server, is_external_app=False):
def get_instance(is_neutron_server):
global _nb_api
if _nb_api is None:
nb_driver = df_utils.load_driver(
@ -82,18 +80,16 @@ class NbApi(object):
# Do not use pubsub for external apps - this causes issues with
# threads and other issues.
use_pubsub = cfg.CONF.df.enable_df_pub_sub
if is_external_app:
use_pubsub = False
nb_api = NbApi(
nb_driver,
use_pubsub=use_pubsub,
is_neutron_server=is_neutron_server)
ip, port = get_db_ip_port()
nb_api.initialize(db_ip=ip, db_port=port)
nb_api._initialize(db_ip=ip, db_port=port)
_nb_api = nb_api
return _nb_api
def initialize(self, db_ip='127.0.0.1', db_port=4001):
def _initialize(self, db_ip='127.0.0.1', db_port=4001):
self.driver.initialize(db_ip, db_port, config=cfg.CONF.df)
if self.use_pubsub:
self.publisher = self._get_publisher()
@ -113,14 +109,19 @@ class NbApi(object):
if "active_port_detection" in cfg.CONF.df.apps_list:
self.publisher.initialize()
# NOTE(gampel) we want to start queuing event as soon
# as possible
self._start_subscriber()
def set_db_change_callback(self, db_change_callback):
if self.use_pubsub and not self.is_neutron_server:
# NOTE(gampel) we want to start queuing event as soon
# as possible
if not self.subscriber.is_running:
self._start_subscriber(db_change_callback)
# Register for DB Failover detection in NB Plugin
self.subscriber.set_subscriber_for_failover(
self.subscriber,
self.db_change_callback)
db_change_callback)
self.subscriber.register_hamsg_for_db()
else:
LOG.warning('Subscriber is already initialized, ignoring call')
def close(self):
if self.publisher:
@ -152,8 +153,8 @@ class NbApi(object):
df_utils.DF_PUBSUB_DRIVER_NAMESPACE)
return pub_sub_driver.get_subscriber()
def _start_subscriber(self):
self.subscriber.initialize(self.db_change_callback)
def _start_subscriber(self, db_change_callback):
self.subscriber.initialize(db_change_callback)
self.subscriber.register_topic(db_common.SEND_ALL_TOPIC)
publishers_ips = cfg.CONF.df.publishers_ips
uris = {'%s://%s:%s' % (
@ -200,19 +201,6 @@ class NbApi(object):
self.subscriber.register_topic(topic)
self.subscriber.daemonize()
def db_change_callback(self, table, key, action, value, topic=None):
update = db_common.DbUpdate(table, key, action, value, topic=topic)
LOG.debug("Pushing Update to Queue: %s", update)
self._queue.put(update)
time.sleep(0)
def process_changes(self):
while True:
next_update = self._queue.get(block=True)
LOG.debug("Event update: %s", next_update)
self._notification_cb(next_update)
self._queue.task_done()
def create(self, obj, skip_send_event=False):
"""Create the provided object in the database and publish an event
about its creation.

View File

@ -96,9 +96,9 @@ def _is_ovsport_update_valid(action, ovsport):
class DFIdl(idl.Idl):
def __init__(self, nb_api, remote, schema):
def __init__(self, remote, schema, db_change_callback):
super(DFIdl, self).__init__(remote, schema)
self.nb_api = nb_api
self.db_change_callback = db_change_callback
def notify(self, event, row, updates=None):
if not row or not hasattr(row, '_table'):
@ -109,7 +109,7 @@ class DFIdl(idl.Idl):
local_interface = ovs.OvsPort.from_idl_row(row)
action = event if event != 'update' else 'set'
if _is_ovsport_update_valid(action, local_interface):
self.nb_api.db_change_callback(
self.db_change_callback(
local_interface.table_name,
local_interface.id,
action,
@ -117,7 +117,8 @@ class DFIdl(idl.Idl):
)
def df_idl_from_server(nb_api, connection_string, schema_name):
def df_idl_from_server(connection_string, schema_name,
db_change_callback):
"""Create the Idl instance by pulling the schema from OVSDB server"""
helper = idlutils.get_schema_helper(connection_string, schema_name)
tables = ovsdb_monitor_table_filter_default
@ -126,7 +127,7 @@ def df_idl_from_server(nb_api, connection_string, schema_name):
helper.register_table(table_name)
else:
helper.register_columns(table_name, columns)
return DFIdl(nb_api, connection_string, helper)
return DFIdl(connection_string, helper, db_change_callback)
class DFOvsdbApi(impl_idl.OvsdbIdl):
@ -136,8 +137,9 @@ class DFOvsdbApi(impl_idl.OvsdbIdl):
class OvsdbIdl has defined lots of command. Dragonflow can use
them. And Dragonflow can extend its own commands in this class.
"""
def __init__(self, nb_api, db_connection, timeout):
idl = df_idl_from_server(nb_api, db_connection, 'Open_vSwitch')
def __init__(self, db_connection, timeout, db_change_callback):
idl = df_idl_from_server(db_connection, 'Open_vSwitch',
db_change_callback)
type(self).ovsdb_connection = None
ovsdb_connection = connection.Connection(idl, timeout)
super(DFOvsdbApi, self).__init__(ovsdb_connection)

View File

@ -54,17 +54,18 @@ class OvsApi(object):
else:
vlog.Vlog.init()
def initialize(self, nb_api):
def initialize(self, db_change_callback):
db_connection = ('%s:%s:%s' % (self.protocol, self.ip, self.port))
nb_api.db_change_callback(None, None,
constants.CONTROLLER_OVS_SYNC_STARTED, None)
db_change_callback(None, None,
constants.CONTROLLER_OVS_SYNC_STARTED, None)
self.ovsdb = impl_idl.DFOvsdbApi(
nb_api, db_connection, self.vsctl_timeout)
db_connection, self.vsctl_timeout,
db_change_callback)
nb_api.db_change_callback(None, None,
constants.CONTROLLER_OVS_SYNC_FINISHED, None)
db_change_callback(None, None,
constants.CONTROLLER_OVS_SYNC_FINISHED, None)
def _db_get_val(self, table, record, column, check_error=False,
log_errors=True):

View File

@ -13,7 +13,9 @@
import os
import random
import string
import time
from eventlet import queue
from oslo_log import log
from dragonflow.common import utils as df_utils
@ -57,14 +59,19 @@ class DFTestBase(base.BaseTestCase):
self.conf = cfg.CONF.df
self.integration_bridge = self.conf.integration_bridge
self._queue = queue.PriorityQueue()
self.nb_api = api_nb.NbApi.get_instance(False)
# As we are running in the same process over and over,
# do not perform redundant calls to the subscriber
if not self.nb_api.subscriber.is_running:
self.nb_api.set_db_change_callback(self._db_change_callback)
self.mgt_ip = self.conf.management_ip
self.__objects_to_close = []
self.addCleanup(self._close_stored_objects)
self.vswitch_api = utils.OvsTestApi(self.mgt_ip)
self.vswitch_api.initialize(self.nb_api)
self.vswitch_api.initialize(self._db_change_callback)
if cfg.CONF.df.enable_selective_topology_distribution:
self.start_subscribing()
@ -73,6 +80,11 @@ class DFTestBase(base.BaseTestCase):
self._publish_log_event('started')
self.addCleanup(self._publish_log_event, 'finished')
def _db_change_callback(self, table, key, action, value, topic=None):
update = db_common.DbUpdate(table, key, action, value, topic=topic)
self._queue.put(update)
time.sleep(0)
def _publish_log_event(self, event):
global _publisher
if _publisher is None:

View File

@ -64,22 +64,22 @@ class TestOvsdbMonitor(test_base.DFTestBase):
return True
def _get_wanted_vm_online(self, mac):
while self.nb_api._queue.qsize() > 0:
self.next_update = self.nb_api._queue.get()
while self._queue.qsize() > 0:
self.next_update = self._queue.get()
if self._check_wanted_vm_online(self.next_update, mac):
return True
return False
def _get_wanted_vm_offline(self, mac):
while self.nb_api._queue.qsize() > 0:
self.next_update = self.nb_api._queue.get()
while self._queue.qsize() > 0:
self.next_update = self._queue.get()
if self._check_wanted_vm_offline(self.next_update, mac):
return True
return False
def _get_all_wanted_vms_online(self, mac1, mac2):
while self.nb_api._queue.qsize() > 0:
self.next_update = self.nb_api._queue.get()
while self._queue.qsize() > 0:
self.next_update = self._queue.get()
if self._check_wanted_vm_online(self.next_update, mac1):
self.set_wanted_vms.add(mac1)
if len(self.set_wanted_vms) == 2:
@ -93,7 +93,7 @@ class TestOvsdbMonitor(test_base.DFTestBase):
return False
def test_notify_message(self):
network = self.store(objects.NetworkTestObj(self.neutron, self.nb_api))
network = objects.NetworkTestObj(self.neutron, self.nb_api)
network_id = network.create()
subnet = self.store(objects.SubnetTestObj(self.neutron, self.nb_api,
network_id))

View File

@ -374,7 +374,7 @@ class TestDbTableMonitors(PubSubTestBase):
self.namespace.events = []
self.namespace.has_values = False
self.publisher = self._get_server_publisher()
self.subscriber = self._get_subscriber(self._db_change_callback)
self.subscriber = self._get_subscriber(self._pubsub_change_callback)
self.monitor = self._create_monitor('chassis')
def tearDown(self):
@ -384,7 +384,7 @@ class TestDbTableMonitors(PubSubTestBase):
self._stop_publisher(self.publisher)
super(TestDbTableMonitors, self).tearDown()
def _db_change_callback(self, table, key, action, value, topic):
def _pubsub_change_callback(self, table, key, action, value, topic):
self.namespace.events.append({
'table': table,
'key': key,

View File

@ -33,16 +33,18 @@ class TestRyuBaseApp(test_base.DFTestBase):
ryu_cfg.CONF.ofp_listen_host = cfg.CONF.df_ryu.of_listen_address
ryu_cfg.CONF.ofp_tcp_listen_port = cfg.CONF.df_ryu.of_listen_port + 1
app_mgr = app_manager.AppManager.get_instance()
self.open_flow_app = app_mgr.instantiate(ryu_base_app.RyuDFAdapter,
vswitch_api=mock.Mock(),
nb_api=mock.Mock())
self.open_flow_app = app_mgr.instantiate(
ryu_base_app.RyuDFAdapter,
vswitch_api=mock.Mock(),
nb_api=mock.Mock(),
db_change_callback=self._db_change_callback)
self.open_flow_app.load = mock.Mock()
self.addCleanup(app_mgr.uninstantiate, self.open_flow_app.name)
test_controller = ('tcp:' + cfg.CONF.df_ryu.of_listen_address + ':' +
str(cfg.CONF.df_ryu.of_listen_port + 1))
self.vswitch_api = vswitch_impl.OvsApi(self.mgt_ip)
self.vswitch_api.initialize(self.nb_api)
self.vswitch_api.initialize(self._db_change_callback)
cur_controllers = self.vswitch_api.ovsdb.get_controller(
self.integration_bridge).execute()
cur_controllers.append(test_controller)

View File

@ -79,9 +79,10 @@ class DFAppTestBase(tests_base.BaseTestCase):
self.vswitch_api = self.controller.vswitch_api = mock.MagicMock()
kwargs = dict(
nb_api=self.controller.nb_api,
vswitch_api=self.controller.vswitch_api,
vswitch_api=self.controller.vswitch_api
)
self.controller.open_flow_app = ryu_base_app.RyuDFAdapter(**kwargs)
self.controller.open_flow_app = ryu_base_app.RyuDFAdapter(
db_change_callback=self.controller.db_change_callback, **kwargs)
self.open_flow_app = self.controller.open_flow_app
self.datapath = self.open_flow_app._datapath = mock.Mock()
self.open_flow_app.load(self.controller.open_flow_app, **kwargs)

View File

@ -31,7 +31,8 @@ class TestRyuDFAdapter(tests_base.BaseTestCase):
super(TestRyuDFAdapter, self).setUp()
self.ryu_df_adapter = ryu_base_app.RyuDFAdapter(
vswitch_api=mock.Mock(),
nb_api=mock.Mock())
nb_api=mock.Mock(),
db_change_callback=mock.Mock())
self.mock_app = mock.Mock(spec=[
'router_updated',
'router_deleted',