diff --git a/networking_arista/ml2/mechanism_arista.py b/networking_arista/ml2/mechanism_arista.py index 8a2ebb9..d307bf5 100644 --- a/networking_arista/ml2/mechanism_arista.py +++ b/networking_arista/ml2/mechanism_arista.py @@ -18,8 +18,10 @@ import threading from neutron_lib.api.definitions import portbindings from neutron_lib import constants as n_const from neutron_lib.plugins.ml2 import api as driver_api +from neutron_lib import worker from oslo_config import cfg from oslo_log import log as logging +from oslo_service import loopingcall from oslo_utils import excutils from neutron.common import constants as neutron_const @@ -52,6 +54,63 @@ def pretty_log(tag, obj): LOG.debug(log_data) +class AristaSyncWorker(worker.BaseWorker): + def __init__(self, rpc, ndb): + super(AristaSyncWorker, self).__init__(worker_process_count=0) + self.ndb = ndb + self.rpc = rpc + self.sync_service = arista_ml2.SyncService(rpc, ndb) + rpc.sync_service = self.sync_service + self._loop = None + + def start(self): + super(AristaSyncWorker, self).start() + + self._sync_running = True + self._sync_event = threading.Event() + + self._cleanup_db() + # Registering with EOS updates self.rpc.region_updated_time. Clear it + # to force an initial sync + self.rpc.clear_region_updated_time() + + if self._loop is None: + self._loop = loopingcall.FixedIntervalLoopingCall( + self.sync_service.do_synchronize + ) + self._loop.start(interval=cfg.CONF.ml2_arista.sync_interval) + + def stop(self, graceful=False): + if self._loop is not None: + self._loop.stop() + + def wait(self): + if self._loop is not None: + self._loop.wait() + + def reset(self): + self.stop() + self.wait() + self.start() + + def _cleanup_db(self): + """Clean up any unnecessary entries in our DB.""" + + LOG.info('Arista Sync: DB Cleanup') + neutron_nets = self.ndb.get_all_networks() + arista_db_nets = db_lib.get_networks(tenant_id='any') + neutron_net_ids = set() + for net in neutron_nets: + neutron_net_ids.add(net['id']) + + # Remove networks from the Arista DB if the network does not exist in + # Neutron DB + for net_id in set(arista_db_nets.keys()).difference(neutron_net_ids): + tenant_network = arista_db_nets[net_id] + db_lib.forget_network_segment(tenant_network['tenantId'], net_id) + db_lib.forget_all_ports_for_network(net_id) + + class AristaDriver(driver_api.MechanismDriver): """Ml2 Mechanism driver for Arista networking hardware. @@ -69,7 +128,6 @@ class AristaDriver(driver_api.MechanismDriver): confg = cfg.CONF.ml2_arista self.segmentation_type = db_lib.VLAN_SEGMENTATION self.timer = None - self.sync_timeout = confg['sync_interval'] self.managed_physnets = confg['managed_physnets'] self.manage_fabric = confg['manage_fabric'] self.eos_sync_lock = threading.Lock() @@ -94,21 +152,16 @@ class AristaDriver(driver_api.MechanismDriver): LOG.error(msg) raise arista_exc.AristaRpcError(msg=msg) - self.sync_service = arista_ml2.SyncService(self.rpc, self.ndb) - self.rpc.sync_service = self.sync_service - def initialize(self): if self.rpc.check_cvx_availability(): self.rpc.register_with_eos() self.rpc.check_supported_features() - self._cleanup_db() - # Registering with EOS updates self.rpc.region_updated_time. Clear it - # to force an initial sync - self.rpc.clear_region_updated_time() - self._synchronization_thread() self.sg_handler = sec_group_callback.AristaSecurityGroupHandler(self) + def get_workers(self): + return [AristaSyncWorker(self.rpc, self.ndb)] + def create_network_precommit(self, context): """Remember the tenant, and network information.""" @@ -956,34 +1009,6 @@ class AristaDriver(driver_api.MechanismDriver): fqdns_used = cfg.CONF.ml2_arista['use_fqdn'] return hostname if fqdns_used else hostname.split('.')[0] - def _synchronization_thread(self): - with self.eos_sync_lock: - self.sync_service.do_synchronize() - - self.timer = threading.Timer(self.sync_timeout, - self._synchronization_thread) - self.timer.start() - - def stop_synchronization_thread(self): - if self.timer: - self.timer.cancel() - self.timer = None - - def _cleanup_db(self): - """Clean up any unnecessary entries in our DB.""" - neutron_nets = self.ndb.get_all_networks() - arista_db_nets = db_lib.get_networks(tenant_id='any') - neutron_net_ids = set() - for net in neutron_nets: - neutron_net_ids.add(net['id']) - - # Remove networks from the Arista DB if the network does not exist in - # Neutron DB - for net_id in set(arista_db_nets.keys()).difference(neutron_net_ids): - tenant_network = arista_db_nets[net_id] - db_lib.forget_network_segment(tenant_network['tenantId'], net_id) - db_lib.forget_all_ports_for_network(net_id) - def _network_provisioned(self, tenant_id, network_id, segmentation_id=None, segment_id=None): # If network does not exist under this tenant, diff --git a/networking_arista/tests/unit/ml2/test_arista_mechanism_driver.py b/networking_arista/tests/unit/ml2/test_arista_mechanism_driver.py index 04c22a4..8fe4d6e 100644 --- a/networking_arista/tests/unit/ml2/test_arista_mechanism_driver.py +++ b/networking_arista/tests/unit/ml2/test_arista_mechanism_driver.py @@ -1594,7 +1594,6 @@ class RealNetStorageAristaDriverTestCase(testlib_api.SqlTestCase): def tearDown(self): super(RealNetStorageAristaDriverTestCase, self).tearDown() - self.drv.stop_synchronization_thread() def test_create_and_delete_network(self): tenant_id = 'ten-1' @@ -1727,13 +1726,19 @@ class RealNetStorageAristaDriverTestCase(testlib_api.SqlTestCase): # Initialize the driver which should clean up the extra networks self.drv.initialize() - adb_networks = db_lib.get_networks(tenant_id='any') + worker = self.drv.get_workers()[0] - # 'n3' should now be deleted from the Arista DB - self.assertEqual( - set(('n1', 'n2', 'ha-network')), - set(adb_networks.keys()) - ) + with mock.patch.object(worker.sync_service, 'do_synchronize') as ds: + worker.start() + adb_networks = db_lib.get_networks(tenant_id='any') + + # 'n3' should now be deleted from the Arista DB + self.assertEqual( + set(('n1', 'n2', 'ha-network')), + set(adb_networks.keys()) + ) + + ds.assert_called_once_with() def _get_network_context(self, tenant_id, net_id, seg_id, session=None): network = {'id': net_id, diff --git a/networking_arista/tests/unit/ml2/test_mechanism_arista.py b/networking_arista/tests/unit/ml2/test_mechanism_arista.py index 38a0d66..4424fe0 100644 --- a/networking_arista/tests/unit/ml2/test_mechanism_arista.py +++ b/networking_arista/tests/unit/ml2/test_mechanism_arista.py @@ -42,7 +42,6 @@ class AristaDriverTestCase(testlib_api.SqlTestCase): def tearDown(self): super(AristaDriverTestCase, self).tearDown() - self.drv.stop_synchronization_thread() def test_create_network_precommit(self): tenant_id = 'ten-1' diff --git a/requirements.txt b/requirements.txt index 08f4700..f02f254 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,7 +6,10 @@ pbr!=2.1.0,>=2.0.0 # Apache-2.0 alembic>=0.8.10 # MIT neutron-lib>=1.7.0 # Apache-2.0 oslo.i18n!=3.15.2,>=2.1.0 # Apache-2.0 +oslo.config>=4.0.0,!=4.3.0,!=4.4.0 # Apache-2.0 oslo.log>=3.22.0 # Apache-2.0 +oslo.service>=1.10.0 # Apache-2.0 +oslo.utils>=3.20.0 # Apache-2.0 requests>=2.14.2 # Apache-2.0 six>=1.9.0 # MIT SQLAlchemy!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8,>=1.0.10 # MIT