Move Sync Thread to ML2 worker process

Neutron changed the way resources are extended after Pike milestone-1. As a
result, there's a race condition where the sync thread
makes calls on the plugin before the ML2 plugin has finished
initialization. ML2 supports the get_workers() command to spawn agents for ML2
drivers. This change moves the sync thread to from the API Service to a
child worker process.

Change-Id: Id36018093bc1356dbe7986cbe664f07c35d6c224
This commit is contained in:
Mark McClain 2017-06-08 17:26:36 +00:00 committed by mark mcclain
parent 4dc0f37aa1
commit 83a36f22c3
4 changed files with 77 additions and 45 deletions

View File

@ -18,8 +18,10 @@ import threading
from neutron_lib.api.definitions import portbindings
from neutron_lib import constants as n_const
from neutron_lib.plugins.ml2 import api as driver_api
from neutron_lib import worker
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import loopingcall
from oslo_utils import excutils
from neutron.common import constants as neutron_const
@ -52,6 +54,63 @@ def pretty_log(tag, obj):
LOG.debug(log_data)
class AristaSyncWorker(worker.BaseWorker):
def __init__(self, rpc, ndb):
super(AristaSyncWorker, self).__init__(worker_process_count=0)
self.ndb = ndb
self.rpc = rpc
self.sync_service = arista_ml2.SyncService(rpc, ndb)
rpc.sync_service = self.sync_service
self._loop = None
def start(self):
super(AristaSyncWorker, self).start()
self._sync_running = True
self._sync_event = threading.Event()
self._cleanup_db()
# Registering with EOS updates self.rpc.region_updated_time. Clear it
# to force an initial sync
self.rpc.clear_region_updated_time()
if self._loop is None:
self._loop = loopingcall.FixedIntervalLoopingCall(
self.sync_service.do_synchronize
)
self._loop.start(interval=cfg.CONF.ml2_arista.sync_interval)
def stop(self, graceful=False):
if self._loop is not None:
self._loop.stop()
def wait(self):
if self._loop is not None:
self._loop.wait()
def reset(self):
self.stop()
self.wait()
self.start()
def _cleanup_db(self):
"""Clean up any unnecessary entries in our DB."""
LOG.info('Arista Sync: DB Cleanup')
neutron_nets = self.ndb.get_all_networks()
arista_db_nets = db_lib.get_networks(tenant_id='any')
neutron_net_ids = set()
for net in neutron_nets:
neutron_net_ids.add(net['id'])
# Remove networks from the Arista DB if the network does not exist in
# Neutron DB
for net_id in set(arista_db_nets.keys()).difference(neutron_net_ids):
tenant_network = arista_db_nets[net_id]
db_lib.forget_network_segment(tenant_network['tenantId'], net_id)
db_lib.forget_all_ports_for_network(net_id)
class AristaDriver(driver_api.MechanismDriver):
"""Ml2 Mechanism driver for Arista networking hardware.
@ -69,7 +128,6 @@ class AristaDriver(driver_api.MechanismDriver):
confg = cfg.CONF.ml2_arista
self.segmentation_type = db_lib.VLAN_SEGMENTATION
self.timer = None
self.sync_timeout = confg['sync_interval']
self.managed_physnets = confg['managed_physnets']
self.manage_fabric = confg['manage_fabric']
self.eos_sync_lock = threading.Lock()
@ -94,21 +152,16 @@ class AristaDriver(driver_api.MechanismDriver):
LOG.error(msg)
raise arista_exc.AristaRpcError(msg=msg)
self.sync_service = arista_ml2.SyncService(self.rpc, self.ndb)
self.rpc.sync_service = self.sync_service
def initialize(self):
if self.rpc.check_cvx_availability():
self.rpc.register_with_eos()
self.rpc.check_supported_features()
self._cleanup_db()
# Registering with EOS updates self.rpc.region_updated_time. Clear it
# to force an initial sync
self.rpc.clear_region_updated_time()
self._synchronization_thread()
self.sg_handler = sec_group_callback.AristaSecurityGroupHandler(self)
def get_workers(self):
return [AristaSyncWorker(self.rpc, self.ndb)]
def create_network_precommit(self, context):
"""Remember the tenant, and network information."""
@ -956,34 +1009,6 @@ class AristaDriver(driver_api.MechanismDriver):
fqdns_used = cfg.CONF.ml2_arista['use_fqdn']
return hostname if fqdns_used else hostname.split('.')[0]
def _synchronization_thread(self):
with self.eos_sync_lock:
self.sync_service.do_synchronize()
self.timer = threading.Timer(self.sync_timeout,
self._synchronization_thread)
self.timer.start()
def stop_synchronization_thread(self):
if self.timer:
self.timer.cancel()
self.timer = None
def _cleanup_db(self):
"""Clean up any unnecessary entries in our DB."""
neutron_nets = self.ndb.get_all_networks()
arista_db_nets = db_lib.get_networks(tenant_id='any')
neutron_net_ids = set()
for net in neutron_nets:
neutron_net_ids.add(net['id'])
# Remove networks from the Arista DB if the network does not exist in
# Neutron DB
for net_id in set(arista_db_nets.keys()).difference(neutron_net_ids):
tenant_network = arista_db_nets[net_id]
db_lib.forget_network_segment(tenant_network['tenantId'], net_id)
db_lib.forget_all_ports_for_network(net_id)
def _network_provisioned(self, tenant_id, network_id,
segmentation_id=None, segment_id=None):
# If network does not exist under this tenant,

View File

@ -1594,7 +1594,6 @@ class RealNetStorageAristaDriverTestCase(testlib_api.SqlTestCase):
def tearDown(self):
super(RealNetStorageAristaDriverTestCase, self).tearDown()
self.drv.stop_synchronization_thread()
def test_create_and_delete_network(self):
tenant_id = 'ten-1'
@ -1727,13 +1726,19 @@ class RealNetStorageAristaDriverTestCase(testlib_api.SqlTestCase):
# Initialize the driver which should clean up the extra networks
self.drv.initialize()
adb_networks = db_lib.get_networks(tenant_id='any')
worker = self.drv.get_workers()[0]
# 'n3' should now be deleted from the Arista DB
self.assertEqual(
set(('n1', 'n2', 'ha-network')),
set(adb_networks.keys())
)
with mock.patch.object(worker.sync_service, 'do_synchronize') as ds:
worker.start()
adb_networks = db_lib.get_networks(tenant_id='any')
# 'n3' should now be deleted from the Arista DB
self.assertEqual(
set(('n1', 'n2', 'ha-network')),
set(adb_networks.keys())
)
ds.assert_called_once_with()
def _get_network_context(self, tenant_id, net_id, seg_id, session=None):
network = {'id': net_id,

View File

@ -42,7 +42,6 @@ class AristaDriverTestCase(testlib_api.SqlTestCase):
def tearDown(self):
super(AristaDriverTestCase, self).tearDown()
self.drv.stop_synchronization_thread()
def test_create_network_precommit(self):
tenant_id = 'ten-1'

View File

@ -6,7 +6,10 @@ pbr!=2.1.0,>=2.0.0 # Apache-2.0
alembic>=0.8.10 # MIT
neutron-lib>=1.7.0 # Apache-2.0
oslo.i18n!=3.15.2,>=2.1.0 # Apache-2.0
oslo.config>=4.0.0,!=4.3.0,!=4.4.0 # Apache-2.0
oslo.log>=3.22.0 # Apache-2.0
oslo.service>=1.10.0 # Apache-2.0
oslo.utils>=3.20.0 # Apache-2.0
requests>=2.14.2 # Apache-2.0
six>=1.9.0 # MIT
SQLAlchemy!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8,>=1.0.10 # MIT