Fixes sync between Arista ML2 driver and EOS

A bug in the sync code prevented the Arista ML2 driver from syncing with EOS.
This patch fixes the bug and adds some unit tests to ensure that the sync
behaves as expected.

This patch is not being submitted as a back-port because the fix
was already submitted to stackforge/networking-arista as part of the
decomposition effort, in patch:
https://review.openstack.org/#/c/155940/


Change-Id: I8557496ac8447752d14ba756af2236ad83d1a630
Closes-Bug: 1411383
This commit is contained in:
Shashank Hegde 2015-06-10 16:08:47 -07:00 committed by Sukhdev Kapur
parent 18333f918f
commit ea63062898
2 changed files with 344 additions and 109 deletions

View File

@ -45,7 +45,6 @@ class AristaRPCWrapper(object):
self.keystone_conf = cfg.CONF.keystone_authtoken
self.region = cfg.CONF.ml2_arista.region_name
self.sync_interval = cfg.CONF.ml2_arista.sync_interval
self._region_updated_time = None
# The cli_commands dict stores the mapping between the CLI command key
# and the actual CLI command.
self.cli_commands = {}
@ -200,18 +199,6 @@ class AristaRPCWrapper(object):
'exit']
self._run_openstack_cmds(cmds)
def sync_start(self):
"""Sends indication to EOS that ML2->EOS sync has started."""
sync_start_cmd = ['sync start']
self._run_openstack_cmds(sync_start_cmd)
def sync_end(self):
"""Sends indication to EOS that ML2->EOS sync has completed."""
sync_end_cmd = ['sync end']
self._run_openstack_cmds(sync_end_cmd)
def create_network(self, tenant_id, network):
"""Creates a single network on Arista hardware
@ -411,17 +398,9 @@ class AristaRPCWrapper(object):
self._run_openstack_cmds(cmds, commands_to_log=log_cmds)
def clear_region_updated_time(self):
"""Clear the region updated time which forces a resync."""
self._region_updated_time = None
def region_in_sync(self):
"""Check whether EOS is in sync with Neutron."""
eos_region_updated_times = self.get_region_updated_time()
return (self._region_updated_time and
(self._region_updated_time['regionTimestamp'] ==
eos_region_updated_times['regionTimestamp']))
# TODO(shashank): Remove this once the call is removed from the ML2
# driver.
pass
def get_region_updated_time(self):
"""Return the timestamp of the last update.
@ -498,7 +477,6 @@ class AristaRPCWrapper(object):
full_command.extend(self._get_exit_mode_cmds(['region',
'openstack',
'cvx']))
full_command.extend(self.cli_commands['timestamp'])
return full_command
def _run_openstack_cmds(self, commands, commands_to_log=None):
@ -518,11 +496,7 @@ class AristaRPCWrapper(object):
full_log_command = self._build_command(commands_to_log)
else:
full_log_command = None
ret = self._run_eos_cmds(full_command, full_log_command)
# Remove return values for 'configure terminal',
# 'service openstack' and 'exit' commands
if self.cli_commands['timestamp']:
self._region_updated_time = ret[-1]
self._run_eos_cmds(full_command, full_log_command)
def _eapi_host_url(self):
self._validate_config()
@ -557,41 +531,37 @@ class SyncService(object):
self._rpc = rpc_wrapper
self._ndb = neutron_db
self._force_sync = True
self._region_updated_time = None
def force_sync(self):
"""Sets the force_sync flag."""
self._force_sync = True
def do_synchronize(self):
try:
# Send trigger to EOS that the ML2->EOS sync has started.
self._rpc.sync_start()
LOG.info(_('Sync start trigger sent to EOS'))
except arista_exc.AristaRpcError:
LOG.warning(EOS_UNREACHABLE_MSG)
"""Periodically check whether EOS is in sync with ML2 driver.
If ML2 database is not in sync with EOS, then compute the diff and
send it down to EOS.
"""
if not self._sync_required():
return
# Perform the sync
self.synchronize()
# Send 'sync start' marker.
if not self._sync_start():
return
try:
# Send trigger to EOS that the ML2->EOS sync is Complete.
self._rpc.sync_end()
except arista_exc.AristaRpcError:
LOG.warning(EOS_UNREACHABLE_MSG)
# Perform the actual synchronization.
self.synchronize()
# Send 'sync end' marker.
if not self._sync_end():
return
self._set_region_updated_time()
def synchronize(self):
"""Sends data to EOS which differs from neutron DB."""
LOG.info(_('Syncing Neutron <-> EOS'))
try:
# Get the time at which entities in the region were updated.
# If the times match, then ML2 is in sync with EOS. Otherwise
# perform a complete sync.
if not self._force_sync and self._rpc.region_in_sync():
LOG.info(_('OpenStack and EOS are in sync!'))
return
except arista_exc.AristaRpcError:
LOG.warning(EOS_UNREACHABLE_MSG)
self._force_sync = True
return
try:
#Always register with EOS to ensure that it has correct credentials
self._rpc.register_with_eos()
@ -603,24 +573,6 @@ class SyncService(object):
db_tenants = db.get_tenants()
if not db_tenants and eos_tenants:
# No tenants configured in Neutron. Clear all EOS state
try:
self._rpc.delete_this_region()
msg = _('No Tenants configured in Neutron DB. But %d '
'tenants discovered in EOS during synchronization.'
'Entire EOS region is cleared') % len(eos_tenants)
LOG.info(msg)
# Re-register with EOS so that the timestamp is updated.
self._rpc.register_with_eos()
# Region has been completely cleaned. So there is nothing to
# synchronize
self._force_sync = False
except arista_exc.AristaRpcError:
LOG.warning(EOS_UNREACHABLE_MSG)
self._force_sync = True
return
# Delete tenants that are in EOS, but not in the database
tenants_to_delete = frozenset(eos_tenants.keys()).difference(
db_tenants.keys())
@ -637,6 +589,7 @@ class SyncService(object):
# operations fail, then force_sync is set to true
self._force_sync = False
vms_to_update = {}
for tenant in db_tenants:
db_nets = db.get_networks(tenant)
db_vms = db.get_vms(tenant)
@ -658,7 +611,7 @@ class SyncService(object):
nets_to_update = db_nets_key_set.difference(eos_nets_key_set)
# Find the VMs that are present in Neutron DB, but not on EOS
vms_to_update = db_vms_key_set.difference(eos_vms_key_set)
vms_to_update[tenant] = db_vms_key_set.difference(eos_vms_key_set)
try:
if vms_to_delete:
@ -672,27 +625,91 @@ class SyncService(object):
self._ndb.get_all_networks_for_tenant(tenant)
)
networks = [
{'network_id': net_id,
'segmentation_id':
networks = [{
'network_id': net_id,
'segmentation_id':
db_nets[net_id]['segmentationTypeId'],
'network_name':
neutron_nets.get(net_id, {'name': ''})['name'], }
'network_name':
neutron_nets.get(net_id, {'name': ''})['name'],
}
for net_id in nets_to_update
]
self._rpc.create_network_bulk(tenant, networks)
if vms_to_update:
# Filter the ports to only the vms that we are interested
# in.
vm_ports = [
port for port in self._ndb.get_all_ports_for_tenant(
tenant) if port['device_id'] in vms_to_update
]
except arista_exc.AristaRpcError:
LOG.warning(EOS_UNREACHABLE_MSG)
self._force_sync = True
# Now update the VMs
for tenant in vms_to_update:
if not vms_to_update[tenant]:
continue
try:
# Filter the ports to only the vms that we are interested
# in.
vm_ports = [
port for port in self._ndb.get_all_ports_for_tenant(
tenant) if port['device_id'] in vms_to_update[tenant]
]
if vm_ports:
db_vms = db.get_vms(tenant)
self._rpc.create_vm_port_bulk(tenant, vm_ports, db_vms)
except arista_exc.AristaRpcError:
LOG.warning(EOS_UNREACHABLE_MSG)
self._force_sync = True
def _sync_start(self):
    """Let EOS know that a sync is being initiated.

    Sends the 'sync start' marker through the RPC wrapper.

    :returns: True if the marker was sent. False if the RPC raised
              AristaRpcError, in which case _force_sync is set.
    """
    try:
        self._rpc._run_openstack_cmds(['sync start'])
    except arista_exc.AristaRpcError:
        # Report the failure instead of swallowing it silently, the same
        # way the other EOS calls in this service do.
        LOG.warning(EOS_UNREACHABLE_MSG)
        self._force_sync = True
        return False
    return True
def _sync_end(self):
    """Let EOS know that the sync is complete.

    Sends the 'sync end' marker through the RPC wrapper.

    :returns: True if the marker was sent. False if the RPC raised
              AristaRpcError, in which case _force_sync is set.
    """
    try:
        self._rpc._run_openstack_cmds(['sync end'])
    except arista_exc.AristaRpcError:
        # Report the failure instead of swallowing it silently, the same
        # way the other EOS calls in this service do.
        LOG.warning(EOS_UNREACHABLE_MSG)
        self._force_sync = True
        return False
    return True
def _region_in_sync(self):
    """Check whether the region is in sync with EOS.

    Compares the 'regionTimestamp' stored locally with the one that EOS
    currently reports.

    :returns: True only when a local timestamp has been recorded and it
              equals the timestamp reported by EOS; False otherwise.
    """
    # EOS is queried unconditionally, before looking at the local value,
    # so that any RPC error still surfaces to the caller.
    eos_region_updated_times = self._rpc.get_region_updated_time()
    if not self._region_updated_time:
        # Nothing recorded locally yet, so the region cannot be in sync.
        return False
    return (self._region_updated_time['regionTimestamp'] ==
            eos_region_updated_times['regionTimestamp'])
def _sync_required(self):
    """Check whether the sync is required.

    :returns: False when no sync is forced and the local region
              timestamp matches the one on EOS; True otherwise,
              including when EOS cannot be reached.
    """
    try:
        # Matching region timestamps mean ML2 is already in sync with
        # EOS; anything else calls for a complete sync.
        in_sync = not self._force_sync and self._region_in_sync()
    except arista_exc.AristaRpcError:
        LOG.warning(EOS_UNREACHABLE_MSG)
        # Force an update in case of an error.
        self._force_sync = True
        return True

    if not in_sync:
        return True

    LOG.info(_('OpenStack and EOS are in sync!'))
    # The 'sync end' marker is still sent when there is nothing to do.
    self._sync_end()
    return False
def _set_region_updated_time(self):
    """Get the region updated time from EOS and store it locally.

    If the RPC raises AristaRpcError, the stored time is left untouched
    and _force_sync is set.
    """
    try:
        self._region_updated_time = self._rpc.get_region_updated_time()
    except arista_exc.AristaRpcError:
        # Report the failure instead of swallowing it silently, the same
        # way the other EOS calls in this service do.
        LOG.warning(EOS_UNREACHABLE_MSG)
        # Force an update in case of an error.
        self._force_sync = True
def _get_eos_networks(self, eos_tenants, tenant):
networks = {}
if eos_tenants and tenant in eos_tenants:
@ -766,7 +783,8 @@ class AristaDriver(driver_api.MechanismDriver):
network_dict = {
'network_id': network_id,
'segmentation_id': vlan_id,
'network_name': network_name}
'network_name': network_name,
}
self.rpc.create_network(tenant_id, network_dict)
except arista_exc.AristaRpcError:
LOG.info(EOS_UNREACHABLE_MSG)
@ -797,7 +815,7 @@ class AristaDriver(driver_api.MechanismDriver):
"""
new_network = context.current
orig_network = context.original
if new_network['name'] != orig_network['name']:
if(new_network['name'] != orig_network['name']):
network_id = new_network['id']
network_name = new_network['name']
tenant_id = new_network['tenant_id']
@ -808,7 +826,8 @@ class AristaDriver(driver_api.MechanismDriver):
network_dict = {
'network_id': network_id,
'segmentation_id': vlan_id,
'network_name': network_name}
'network_name': network_name,
}
self.rpc.create_network(tenant_id, network_dict)
except arista_exc.AristaRpcError:
LOG.info(EOS_UNREACHABLE_MSG)
@ -863,6 +882,7 @@ class AristaDriver(driver_api.MechanismDriver):
network_id = port['network_id']
tenant_id = port['tenant_id']
with self.eos_sync_lock:
db.remember_tenant(tenant_id)
db.remember_vm(device_id, host, port_id,
network_id, tenant_id)

View File

@ -220,24 +220,6 @@ class PositiveRPCWrapperValidConfigTestCase(base.BaseTestCase):
def test_no_exception_on_correct_configuration(self):
self.assertIsNotNone(self.drv)
def test_sync_start(self):
self.drv.sync_start()
cmds = ['enable', 'configure', 'cvx', 'service openstack',
'region RegionOne',
'sync start',
'exit', 'exit', 'exit']
self.drv._server.runCmds.assert_called_once_with(version=1, cmds=cmds)
def test_sync_end(self):
self.drv.sync_end()
cmds = ['enable', 'configure', 'cvx', 'service openstack',
'region RegionOne',
'sync end',
'exit', 'exit', 'exit']
self.drv._server.runCmds.assert_called_once_with(version=1, cmds=cmds)
def test_plug_host_into_network(self):
tenant_id = 'ten-1'
vm_id = 'vm-1'
@ -311,7 +293,8 @@ class PositiveRPCWrapperValidConfigTestCase(base.BaseTestCase):
network = {
'network_id': 'net-id',
'network_name': 'net-name',
'segmentation_id': 123}
'segmentation_id': 123,
}
self.drv.create_network(tenant_id, network)
cmds = ['enable', 'configure', 'cvx', 'service openstack',
'region RegionOne',
@ -326,7 +309,8 @@ class PositiveRPCWrapperValidConfigTestCase(base.BaseTestCase):
networks = [{
'network_id': 'net-id-%d' % net_id,
'network_name': 'net-name-%d' % net_id,
'segmentation_id': net_id} for net_id in range(1, num_networks)
'segmentation_id': net_id,
} for net_id in range(1, num_networks)
]
self.drv.create_network_bulk(tenant_id, networks)
@ -769,3 +753,234 @@ class FakePortContext(object):
@property
def original_host(self):
return self._original_port.get(portbindings.HOST_ID)
class SyncServiceTest(base.BaseTestCase):
    """Test cases for the sync service.

    Each test drives SyncService against a MagicMock RPC wrapper and
    checks the sequence of calls recorded on that mock.
    """

    def setUp(self):
        super(SyncServiceTest, self).setUp()
        self.rpc = mock.MagicMock()
        ndb = db.NeutronNets()
        self.sync_service = arista.SyncService(self.rpc, ndb)
        # Clear the force flag so that the tests exercise the region
        # timestamp comparison rather than an unconditional sync.
        self.sync_service._force_sync = False

    def _remember_network(self, tenant_id, network_id, segmentation_id):
        """Store a tenant and one network in the DB for this test only.

        The matching forget_*() calls are registered as cleanups, so the
        DB is restored even when an assertion in the test fails. Cleanups
        run in reverse order: the network is forgotten before its tenant.
        """
        db.remember_tenant(tenant_id)
        self.addCleanup(db.forget_tenant, tenant_id)
        db.remember_network(tenant_id, network_id, segmentation_id)
        self.addCleanup(db.forget_network, tenant_id, network_id)

    @staticmethod
    def _create_network_call(tenant_id, network_id, segmentation_id):
        """Return the create_network_bulk() call expected for a network."""
        return mock.call.create_network_bulk(
            tenant_id,
            [{'network_id': network_id,
              'segmentation_id': segmentation_id,
              'network_name': ''}])

    def test_region_in_sync(self):
        """Tests whether _region_in_sync() behaves as expected."""
        region_updated_time = {
            'regionName': 'RegionOne',
            'regionTimestamp': '12345'
        }
        self.rpc.get_region_updated_time.return_value = region_updated_time

        # Without a locally stored timestamp the region is never in sync.
        self.sync_service._region_updated_time = None
        self.assertFalse(self.sync_service._region_in_sync())

        self.sync_service._region_updated_time = region_updated_time
        self.assertTrue(self.sync_service._region_in_sync())

    def test_synchronize_required(self):
        """Tests whether do_synchronize() sends the right commands.

        This test verifies a scenario when the sync is required.
        """
        region_updated_time = {
            'regionName': 'RegionOne',
            'regionTimestamp': '12345'
        }
        self.rpc.get_region_updated_time.return_value = region_updated_time
        # A different local timestamp makes the region look out of sync.
        self.sync_service._region_updated_time = {
            'regionName': 'RegionOne',
            'regionTimestamp': '0',
        }

        tenant_id = 'tenant-1'
        network_id = 'net-1'
        segmentation_id = 42
        self._remember_network(tenant_id, network_id, segmentation_id)

        self.rpc.get_tenants.return_value = {}

        self.sync_service.do_synchronize()

        expected_calls = [
            mock.call.get_region_updated_time(),
            mock.call._run_openstack_cmds(['sync start']),
            mock.call.register_with_eos(),
            mock.call.get_tenants(),
            self._create_network_call(tenant_id, network_id,
                                      segmentation_id),
            mock.call._run_openstack_cmds(['sync end']),
            mock.call.get_region_updated_time()
        ]
        self.assertEqual(expected_calls, self.rpc.mock_calls)

    def test_synchronize_not_required(self):
        """Tests whether do_synchronize() sends the right commands.

        This test verifies a scenario when the sync is not required.
        """
        region_updated_time = {
            'regionName': 'RegionOne',
            'regionTimestamp': '424242'
        }
        self.rpc.get_region_updated_time.return_value = region_updated_time
        self.sync_service._region_updated_time = {
            'regionName': 'RegionOne',
            'regionTimestamp': '424242',
        }

        self.sync_service.do_synchronize()

        # If the timestamps do match, then the sync should not be executed.
        expected_calls = [
            mock.call.get_region_updated_time(),
            mock.call._run_openstack_cmds(['sync end']),
        ]
        self.assertEqual(expected_calls, self.rpc.mock_calls)

    def test_synchronize_one_network(self):
        """Test to ensure that only the required resources are sent to EOS."""
        # Store two tenants in a db and a single tenant in EOS.
        # The sync should send details of the second tenant to EOS
        tenant_1_id = 'tenant-1'
        tenant_1_net_1_id = 'ten-1-net-1'
        tenant_1_net_1_seg_id = 11
        self._remember_network(tenant_1_id, tenant_1_net_1_id,
                               tenant_1_net_1_seg_id)

        tenant_2_id = 'tenant-2'
        tenant_2_net_1_id = 'ten-2-net-1'
        tenant_2_net_1_seg_id = 21
        self._remember_network(tenant_2_id, tenant_2_net_1_id,
                               tenant_2_net_1_seg_id)

        # NOTE(review): the 'segmenationType' key spelling is kept exactly
        # as in the original fixture - confirm against real EOS output.
        self.rpc.get_tenants.return_value = {
            tenant_1_id: {
                'tenantVmInstances': {},
                'tenantNetworks': {
                    tenant_1_net_1_id: {
                        'networkId': tenant_1_net_1_id,
                        'networkName': 'Net1',
                        'segmenationType': 'vlan',
                        'segmentationTypeId': tenant_1_net_1_seg_id,
                    }
                }
            }
        }

        self.sync_service.do_synchronize()

        expected_calls = [
            mock.call.get_region_updated_time(),
            mock.call._run_openstack_cmds(['sync start']),
            mock.call.register_with_eos(),
            mock.call.get_tenants(),
            self._create_network_call(tenant_2_id, tenant_2_net_1_id,
                                      tenant_2_net_1_seg_id),
            mock.call._run_openstack_cmds(['sync end']),
            mock.call.get_region_updated_time()
        ]
        self.assertEqual(expected_calls, self.rpc.mock_calls)

    def test_synchronize_all_networks(self):
        """Test to ensure that all missing resources are sent to EOS."""
        # Store two tenants in a db and none on EOS.
        # The sync should send details of all tenants to EOS
        tenant_1_id = u'tenant-1'
        tenant_1_net_1_id = u'ten-1-net-1'
        tenant_1_net_1_seg_id = 11
        self._remember_network(tenant_1_id, tenant_1_net_1_id,
                               tenant_1_net_1_seg_id)

        tenant_2_id = u'tenant-2'
        tenant_2_net_1_id = u'ten-2-net-1'
        tenant_2_net_1_seg_id = 21
        self._remember_network(tenant_2_id, tenant_2_net_1_id,
                               tenant_2_net_1_seg_id)

        self.rpc.get_tenants.return_value = {}

        self.sync_service.do_synchronize()

        expected_calls = [
            mock.call.get_region_updated_time(),
            mock.call._run_openstack_cmds(['sync start']),
            mock.call.register_with_eos(),
            mock.call.get_tenants(),
            self._create_network_call(tenant_1_id, tenant_1_net_1_id,
                                      tenant_1_net_1_seg_id),
            self._create_network_call(tenant_2_id, tenant_2_net_1_id,
                                      tenant_2_net_1_seg_id),
            mock.call._run_openstack_cmds(['sync end']),
            mock.call.get_region_updated_time()
        ]

        seen_calls = self.rpc.mock_calls
        failure_msg = "Seen: %s\nExpected: %s" % (seen_calls, expected_calls)

        # No call may be missing and no extra call may be made.
        self.assertEqual(len(expected_calls), len(seen_calls), failure_msg)

        # The create_network_bulk() can be called in different order. So
        # split it up. The first part checks if the initial set of methods
        # are invoked.
        self.assertEqual(expected_calls[:4], seen_calls[:4], failure_msg)

        # Each tenant's networks must be created exactly once, in either
        # order: both expected calls have to show up in the two slots.
        for expected_call in expected_calls[4:6]:
            self.assertIn(expected_call, seen_calls[4:6], failure_msg)

        # Check if the sync end methods are invoked.
        self.assertEqual(expected_calls[6:], seen_calls[6:], failure_msg)