Handle multiple controllers in vpcid allocate db
This changeset includes changes to vpcid allocate db to handle collisions when two controllers are trying to allocate same id at the same time. Changeset also addresses comments from review of 1691822. Change-Id: Iad00f6a468bb2c3fcd35c6e39b75349aa8ff7fe2 Closes-bug: #1701267
This commit is contained in:
parent
dbcdea1275
commit
c7c014abcb
|
@ -105,8 +105,7 @@ class ML2MechCiscoConfig(object):
|
|||
else:
|
||||
for if_id in value[0].split(','):
|
||||
if_type, port = (
|
||||
nexus_help.split_interface_name(
|
||||
if_id))
|
||||
nexus_help.split_interface_name(if_id))
|
||||
interface = nexus_help.format_interface_name(
|
||||
if_type, port)
|
||||
nxos_db.add_host_mapping(
|
||||
|
|
|
@ -67,7 +67,7 @@ class NexusHostMappingNotFound(exceptions.NeutronException):
|
|||
|
||||
class NexusVPCAllocNotFound(exceptions.NeutronException):
|
||||
"""Nexus VPC alloc is not present."""
|
||||
message = _("Nexus VPC Alloc (%(filters)s) is not present")
|
||||
message = _("Nexus VPC Alloc (%(filters)s) is not present.")
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
filters = ','.join('%s=%s' % i for i in kwargs.items())
|
||||
|
@ -78,7 +78,7 @@ class NexusVPCAllocInvalidArgValue(exceptions.NeutronException):
|
|||
"""Nexus VPC alloc arg values not valid."""
|
||||
message = _("Nexus VPC Alloc init failed. Args "
|
||||
"start (%(vpcstart)s) and end (%(vpcend)s) "
|
||||
"difference should be greater than 0. ")
|
||||
"difference should be greater than 0.")
|
||||
|
||||
|
||||
class NexusVPCAllocIncorrectArgCount(exceptions.NeutronException):
|
||||
|
|
|
@ -1538,7 +1538,13 @@ class CiscoNexusMechanismDriver(api.MechanismDriver):
|
|||
nexus_port)
|
||||
self.driver.delete_port_channel(switch_ip,
|
||||
nexus_port)
|
||||
nxos_db.free_vpcid_for_switch(nexus_port, switch_ip)
|
||||
try:
|
||||
nxos_db.free_vpcid_for_switch(nexus_port, switch_ip)
|
||||
except excep.NexusVPCAllocNotFound:
|
||||
# Not all learned port channels will be in this db when
|
||||
# they're outside the configured vpc_pool so
|
||||
# this exception may be possible.
|
||||
pass
|
||||
|
||||
def _delete_switch_entry(self, port, vlan_id, device_id, host_id, vni,
|
||||
is_provider_vlan):
|
||||
|
|
|
@ -15,6 +15,9 @@
|
|||
#
|
||||
|
||||
from oslo_log import log as logging
|
||||
from random import shuffle
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.orm import aliased
|
||||
import sqlalchemy.orm.exc as sa_exc
|
||||
from sqlalchemy.sql import func
|
||||
|
||||
|
@ -593,6 +596,32 @@ def _lookup_vpc_count_min_max(session=None, **bfilter):
|
|||
raise c_exc.NexusVPCAllocNotFound(**bfilter)
|
||||
|
||||
|
||||
def _lookup_vpcids(query_type, session=None, **bfilter):
|
||||
"""Look up 'query_type' Nexus VPC Allocs matching the filter.
|
||||
|
||||
:param query_type: 'all', 'one' or 'first'
|
||||
:param session: db session
|
||||
:param bfilter: filter for mappings query
|
||||
:return: VPCIDs if query gave a result, else
|
||||
raise NexusVPCAllocNotFound.
|
||||
"""
|
||||
|
||||
if session is None:
|
||||
session = db.get_session()
|
||||
|
||||
query_method = getattr(session.query(
|
||||
nexus_models_v2.NexusVPCAlloc.vpc_id).filter_by(**bfilter), query_type)
|
||||
|
||||
try:
|
||||
vpcs = query_method()
|
||||
if vpcs:
|
||||
return vpcs
|
||||
except sa_exc.NoResultFound:
|
||||
pass
|
||||
|
||||
raise c_exc.NexusVPCAllocNotFound(**bfilter)
|
||||
|
||||
|
||||
def _lookup_all_vpc_allocs(session=None, **bfilter):
|
||||
return _lookup_vpc_allocs('all', session, **bfilter)
|
||||
|
||||
|
@ -601,6 +630,34 @@ def _lookup_one_vpc_allocs(session=None, **bfilter):
|
|||
return _lookup_vpc_allocs('one', session, **bfilter)
|
||||
|
||||
|
||||
def _lookup_all_vpcids(session=None, **bfilter):
|
||||
return _lookup_vpcids('all', session, **bfilter)
|
||||
|
||||
|
||||
def _get_free_vpcids_on_switches(switch_ip_list):
|
||||
'''Get intersect list of free vpcids in list of switches.'''
|
||||
|
||||
session = db.get_session()
|
||||
|
||||
prev_view = aliased(nexus_models_v2.NexusVPCAlloc)
|
||||
query = session.query(prev_view.vpc_id)
|
||||
prev_swip = switch_ip_list[0]
|
||||
|
||||
for ip in switch_ip_list[1:]:
|
||||
cur_view = aliased(nexus_models_v2.NexusVPCAlloc)
|
||||
cur_swip = ip
|
||||
query = query.join(cur_view, sa.and_(
|
||||
prev_view.switch_ip == prev_swip, prev_view.active == False, # noqa
|
||||
cur_view.switch_ip == cur_swip, cur_view.active == False, # noqa
|
||||
prev_view.vpc_id == cur_view.vpc_id))
|
||||
prev_view = cur_view
|
||||
prev_swip = cur_swip
|
||||
|
||||
unique_vpcids = query.all()
|
||||
shuffle(unique_vpcids)
|
||||
return unique_vpcids
|
||||
|
||||
|
||||
def get_all_switch_vpc_allocs(switch_ip):
|
||||
return(_lookup_all_vpc_allocs(switch_ip=switch_ip))
|
||||
|
||||
|
@ -648,55 +705,45 @@ def init_vpc_entries(nexus_ip, vpc_start, vpc_end):
|
|||
session.flush()
|
||||
|
||||
|
||||
def update_vpc_entry(nexus_ip, vpc_id, learned, active):
|
||||
def update_vpc_entry(nexus_ips, vpc_id, learned, active):
|
||||
"""Change active state in vpc_allocate data base."""
|
||||
|
||||
LOG.debug("update_vpc_entry called")
|
||||
|
||||
session = db.get_session()
|
||||
try:
|
||||
vpc_alloc = _lookup_one_vpc_allocs(
|
||||
switch_ip=nexus_ip,
|
||||
vpc_id=vpc_id)
|
||||
except c_exc.NexusVPCAllocNotFound:
|
||||
return None
|
||||
|
||||
vpc_alloc.learned = learned
|
||||
vpc_alloc.active = active
|
||||
|
||||
session.merge(vpc_alloc)
|
||||
session.flush()
|
||||
|
||||
return vpc_alloc
|
||||
with session.begin():
|
||||
for n_ip in nexus_ips:
|
||||
flipit = not active
|
||||
x = session.execute(
|
||||
sa.update(nexus_models_v2.NexusVPCAlloc).values({
|
||||
'learned': learned,
|
||||
'active': active}).where(sa.and_(
|
||||
nexus_models_v2.NexusVPCAlloc.switch_ip == n_ip,
|
||||
nexus_models_v2.NexusVPCAlloc.vpc_id == vpc_id,
|
||||
nexus_models_v2.NexusVPCAlloc.active == flipit
|
||||
)))
|
||||
if x.rowcount != 1:
|
||||
raise c_exc.NexusVPCAllocNotFound(
|
||||
switch_ip=n_ip, vpc_id=vpc_id, active=active)
|
||||
|
||||
|
||||
def alloc_vpcid(nexus_ips):
|
||||
"""Allocate a vpc id for the given list of switch_ips."""
|
||||
|
||||
LOG.debug("alloc_vpc() called")
|
||||
vpc_list = []
|
||||
|
||||
# First build a set of vlans for each switch
|
||||
for n_ip in nexus_ips:
|
||||
switch_free_list = get_free_switch_vpc_allocs(n_ip)
|
||||
vpc_set = set()
|
||||
for switch_ip, vpcid, learned, active in switch_free_list:
|
||||
vpc_set.add(vpcid[1])
|
||||
vpc_list.append(vpc_set)
|
||||
|
||||
# Now get intersection
|
||||
intersect = vpc_list[0]
|
||||
for switch_list in vpc_list:
|
||||
intersect = intersect & switch_list
|
||||
|
||||
intersect = list(intersect)
|
||||
if len(intersect) > 0:
|
||||
intersect.sort()
|
||||
vpc_id = intersect[0] # get smallest
|
||||
for n_ip in nexus_ips:
|
||||
update_vpc_entry(n_ip, vpc_id, False, True)
|
||||
else:
|
||||
vpc_id = 0
|
||||
vpc_id = 0
|
||||
intersect = _get_free_vpcids_on_switches(nexus_ips)
|
||||
for intersect_tuple in intersect:
|
||||
try:
|
||||
update_vpc_entry(nexus_ips, intersect_tuple.vpc_id,
|
||||
False, True)
|
||||
vpc_id = intersect_tuple.vpc_id
|
||||
break
|
||||
except Exception:
|
||||
# Another controller may have beaten us to this vpcid
|
||||
pass
|
||||
|
||||
return vpc_id
|
||||
|
||||
|
@ -705,18 +752,13 @@ def free_vpcid_for_switch_list(vpc_id, nexus_ips):
|
|||
"""Free a vpc id for the given list of switch_ips."""
|
||||
|
||||
LOG.debug("free_vpcid_for_switch_list() called")
|
||||
if vpc_id == 0:
|
||||
return
|
||||
|
||||
for n_ip in nexus_ips:
|
||||
update_vpc_entry(n_ip, vpc_id, False, False)
|
||||
if vpc_id != 0:
|
||||
update_vpc_entry(nexus_ips, vpc_id, False, False)
|
||||
|
||||
|
||||
def free_vpcid_for_switch(vpc_id, nexus_ip):
|
||||
"""Free a vpc id for the given switch_ip."""
|
||||
|
||||
LOG.debug("free_vpcid_for_switch() called")
|
||||
if vpc_id == 0:
|
||||
return
|
||||
|
||||
update_vpc_entry(nexus_ip, vpc_id, False, False)
|
||||
if vpc_id != 0:
|
||||
update_vpc_entry([nexus_ip], vpc_id, False, False)
|
||||
|
|
|
@ -354,8 +354,13 @@ class CiscoNexusRestapiDriver(basedrvr.CiscoNexusBaseDriver):
|
|||
second=this_if)
|
||||
# if newly learned ch_grp
|
||||
if ch_grp_saved == 0:
|
||||
nxos_db.update_vpc_entry(
|
||||
nexus_host, ch_grp, learned, True)
|
||||
try:
|
||||
nxos_db.update_vpc_entry(
|
||||
nexus_host, ch_grp, learned, True)
|
||||
except cexc.NexusVPCAllocNotFound:
|
||||
# Will get this error if learned ch_grp
|
||||
# not part of configured vpc_pool
|
||||
pass
|
||||
elif learned_ch_grp != 0:
|
||||
# Remove port-channels just created
|
||||
for y in range(i):
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
# under the License.
|
||||
|
||||
import collections
|
||||
import mock
|
||||
import testtools
|
||||
|
||||
from networking_cisco.plugins.ml2.drivers.cisco.nexus import exceptions
|
||||
|
@ -216,6 +217,17 @@ class TestCiscoNexusVpcAllocDbTest(testlib_api.SqlTestCase):
|
|||
"""Unit tests for Cisco mechanism driver's Nexus vpc alloc database."""
|
||||
|
||||
def setUp(self):
|
||||
original_intersect = nexus_db_v2._get_free_vpcids_on_switches
|
||||
|
||||
def new_get_free_vpcids_on_switches(nexus_ips):
|
||||
intersect = list(original_intersect(nexus_ips))
|
||||
intersect.sort()
|
||||
return intersect
|
||||
|
||||
mock.patch.object(nexus_db_v2,
|
||||
'_get_free_vpcids_on_switches',
|
||||
new=new_get_free_vpcids_on_switches).start()
|
||||
|
||||
super(TestCiscoNexusVpcAllocDbTest, self).setUp()
|
||||
|
||||
def test_vpcalloc_init(self):
|
||||
|
@ -227,9 +239,15 @@ class TestCiscoNexusVpcAllocDbTest(testlib_api.SqlTestCase):
|
|||
allocs = nexus_db_v2.get_free_switch_vpc_allocs(this_ip)
|
||||
self.assertEqual(len(allocs), 25)
|
||||
|
||||
nexus_db_v2.update_vpc_entry('1.1.1.1', 1001, False, True)
|
||||
nexus_db_v2.update_vpc_entry('2.2.2.2', 1002, False, True)
|
||||
nexus_db_v2.update_vpc_entry('3.3.3.3', 1003, False, True)
|
||||
nexus_db_v2.update_vpc_entry(['1.1.1.1'], 1001, False, True)
|
||||
nexus_db_v2.update_vpc_entry(['2.2.2.2'], 1002, False, True)
|
||||
nexus_db_v2.update_vpc_entry(['3.3.3.3'], 1003, False, True)
|
||||
|
||||
# Verify this update fails since entry already active
|
||||
self.assertRaises(
|
||||
exceptions.NexusVPCAllocNotFound,
|
||||
nexus_db_v2.update_vpc_entry,
|
||||
['3.3.3.3'], 1003, False, True)
|
||||
|
||||
new_vpcid = nexus_db_v2.alloc_vpcid(nexus_ips)
|
||||
self.assertEqual(new_vpcid, 1004)
|
||||
|
@ -237,9 +255,77 @@ class TestCiscoNexusVpcAllocDbTest(testlib_api.SqlTestCase):
|
|||
nexus_db_v2.free_vpcid_for_switch(1002, '2.2.2.2')
|
||||
nexus_db_v2.free_vpcid_for_switch_list(1004, nexus_ips)
|
||||
|
||||
# verify vpc 1002 can now be reused
|
||||
new_vpcid = nexus_db_v2.alloc_vpcid(nexus_ips)
|
||||
self.assertEqual(new_vpcid, 1002)
|
||||
|
||||
def test_vpcalloc_rollback(self):
|
||||
|
||||
nexus_ips = ['1.1.1.1', '2.2.2.2', '3.3.3.3']
|
||||
|
||||
for this_ip in nexus_ips:
|
||||
nexus_db_v2.init_vpc_entries(this_ip, 1001, 1025)
|
||||
|
||||
nexus_db_v2.update_vpc_entry(
|
||||
nexus_ips, 1001, False, True)
|
||||
allocs = nexus_db_v2.get_free_switch_vpc_allocs('1.1.1.1')
|
||||
self.assertEqual(len(allocs), 24)
|
||||
allocs = nexus_db_v2.get_free_switch_vpc_allocs('2.2.2.2')
|
||||
self.assertEqual(len(allocs), 24)
|
||||
allocs = nexus_db_v2.get_free_switch_vpc_allocs('3.3.3.3')
|
||||
self.assertEqual(len(allocs), 24)
|
||||
|
||||
nexus_db_v2.update_vpc_entry(
|
||||
nexus_ips, 1001, False, False)
|
||||
allocs = nexus_db_v2.get_free_switch_vpc_allocs('1.1.1.1')
|
||||
self.assertEqual(len(allocs), 25)
|
||||
allocs = nexus_db_v2.get_free_switch_vpc_allocs('2.2.2.2')
|
||||
self.assertEqual(len(allocs), 25)
|
||||
allocs = nexus_db_v2.get_free_switch_vpc_allocs('3.3.3.3')
|
||||
self.assertEqual(len(allocs), 25)
|
||||
|
||||
nexus_db_v2.update_vpc_entry(['3.3.3.3'], 1001, False, True)
|
||||
try:
|
||||
nexus_db_v2.update_vpc_entry(
|
||||
nexus_ips, 1001, False, True)
|
||||
except exceptions.NexusVPCAllocNotFound:
|
||||
allocs = nexus_db_v2.get_free_switch_vpc_allocs('1.1.1.1')
|
||||
self.assertEqual(len(allocs), 25)
|
||||
allocs = nexus_db_v2.get_free_switch_vpc_allocs('2.2.2.2')
|
||||
self.assertEqual(len(allocs), 25)
|
||||
allocs = nexus_db_v2.get_free_switch_vpc_allocs('3.3.3.3')
|
||||
self.assertEqual(len(allocs), 24)
|
||||
|
||||
def test_vpcalloc_test_alloc_collision(self):
|
||||
|
||||
def new_get_free_vpcids_on_switches(nexus_ips):
|
||||
results = nexus_db_v2.get_free_switch_vpc_allocs('4.4.4.4')
|
||||
return results
|
||||
|
||||
nexus_ips = ['1.1.1.1', '2.2.2.2', '3.3.3.3']
|
||||
|
||||
for this_ip in nexus_ips:
|
||||
nexus_db_v2.init_vpc_entries(this_ip, 1001, 1025)
|
||||
# IP 4.4.4.4 is added only to return a list of vpc ids
|
||||
# in same format as sql will return.
|
||||
nexus_db_v2.init_vpc_entries('4.4.4.4', 1001, 1003)
|
||||
mock.patch.object(nexus_db_v2,
|
||||
'_get_free_vpcids_on_switches',
|
||||
new=new_get_free_vpcids_on_switches).start()
|
||||
|
||||
# configure '3.3.3.3', vpcid 1001 so alloc_vpcid will fail
|
||||
# on 1001 after updating 1.1.1.1 and 2.2.2.2 and rollback
|
||||
# occurs. Then moves onto successfully allocating 1002.
|
||||
nexus_db_v2.update_vpc_entry(['3.3.3.3'], 1001, False, True)
|
||||
vpc_id = nexus_db_v2.alloc_vpcid(nexus_ips)
|
||||
self.assertEqual(vpc_id, 1002)
|
||||
allocs = nexus_db_v2.get_free_switch_vpc_allocs('1.1.1.1')
|
||||
self.assertEqual(len(allocs), 24)
|
||||
allocs = nexus_db_v2.get_free_switch_vpc_allocs('2.2.2.2')
|
||||
self.assertEqual(len(allocs), 24)
|
||||
allocs = nexus_db_v2.get_free_switch_vpc_allocs('3.3.3.3')
|
||||
self.assertEqual(len(allocs), 23)
|
||||
|
||||
def test_vpcalloc_min_max(self):
|
||||
|
||||
# Initialize 3 switch vpc entries
|
||||
|
|
|
@ -27,6 +27,7 @@ apply to ssh only OR because rerunning the test would be
|
|||
redundant.
|
||||
"""
|
||||
|
||||
import mock
|
||||
import unittest
|
||||
|
||||
from oslo_config import cfg
|
||||
|
@ -784,7 +785,16 @@ class TestCiscoNexusRestBaremetalDevice(
|
|||
|
||||
def setUp(self):
|
||||
"""Sets up mock ncclient, and switch and credentials dictionaries."""
|
||||
original_intersect = nxos_db._get_free_vpcids_on_switches
|
||||
|
||||
def new_get_free_vpcids_on_switches(nexus_ips):
|
||||
intersect = list(original_intersect(nexus_ips))
|
||||
intersect.sort()
|
||||
return intersect
|
||||
|
||||
mock.patch.object(nxos_db,
|
||||
'_get_free_vpcids_on_switches',
|
||||
new=new_get_free_vpcids_on_switches).start()
|
||||
cfg.CONF.set_override('nexus_driver', 'restapi', 'ml2_cisco')
|
||||
cfg.CONF.set_override('never_cache_ssh_connection', False, 'ml2_cisco')
|
||||
super(TestCiscoNexusRestBaremetalDevice, self).setUp()
|
||||
|
|
|
@ -30,6 +30,7 @@ redundant.
|
|||
|
||||
import unittest
|
||||
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
|
||||
from networking_cisco.plugins.ml2.drivers.cisco.nexus import (
|
||||
|
@ -869,7 +870,16 @@ class TestCiscoNexusRestBaremetalReplay(
|
|||
|
||||
def setUp(self):
|
||||
"""Sets up mock ncclient, and switch and credentials dictionaries."""
|
||||
original_intersect = nxos_db._get_free_vpcids_on_switches
|
||||
|
||||
def new_get_free_vpcids_on_switches(nexus_ips):
|
||||
intersect = list(original_intersect(nexus_ips))
|
||||
intersect.sort()
|
||||
return intersect
|
||||
|
||||
mock.patch.object(nxos_db,
|
||||
'_get_free_vpcids_on_switches',
|
||||
new=new_get_free_vpcids_on_switches).start()
|
||||
cfg.CONF.set_override('nexus_driver', 'restapi', 'ml2_cisco')
|
||||
cfg.CONF.set_override('never_cache_ssh_connection', False, 'ml2_cisco')
|
||||
super(TestCiscoNexusRestBaremetalReplay, self).setUp()
|
||||
|
|
Loading…
Reference in New Issue