Pools support with Network Policies

This patch adapts the pools support to the use of Network Policies.
Unlike with the other drivers, when Network Policies are applied the
pods' ports changes their security groups while being used. That means
their original pool will not fit them anymore with the next two
consequences:
1.- Ports will have their original SG reapplied when pods are deleted,
with the consequent performance impact do to increasing the number of
calls to neutron
2.- Original pools may become useless, as different SGs are being used,
therefore wasting neutron ports

To accomodate for network policies, this patch removes the SG ids from
the pool key, merging all the pools with same network/project/host ids
but with different security groups into the same pool. This will not
change the behavior of the other drivers as there was a unique pool per
network/project/host ids already, i.e., the same SG ids were being used.
However, this will helps to avoid problem 1) as it is no longer
re-applying the SG, but simply putting the port back into its current
pool. And it will fix problem 2) as it will pick a port for an existing
pool that matches network/project/host ids. First it will search for one
with already matching SGs, and if not found, it will recycle one of the
others by reapplying the needed SGs (note it picks a port from one of
the pools that are less frequently used -- assumes they may belong to
a deleted NP that it is not needed anymore, thus removing the port
wastage problem)

Partially Implements: blueprint k8s-network-policies
Change-Id: I2c1e47fd5112c64b8e9984e5ac5d8572d91ac202
This commit is contained in:
Luis Tomas Bolivar 2019-02-04 12:32:12 +01:00
parent 375e61a566
commit e8c418c196
2 changed files with 447 additions and 134 deletions

View File

@ -155,23 +155,25 @@ class BaseVIFPool(base.VIFPoolDriver):
def update_vif_sgs(self, pod, sgs):
self._drv_vif.update_vif_sgs(pod, sgs)
def _get_pool_size(self, pool_key=None):
return len(self._available_ports_pools.get(pool_key, []))
def _get_pool_size(self, pool_key):
pool = self._available_ports_pools.get(pool_key, {})
pool_members = []
for port_list in pool.values():
pool_members.extend(port_list)
return len(pool_members)
def _get_host_addr(self, pod):
return pod['status']['hostIP']
def _get_pool_key(self, host, project_id, security_groups, net_id=None,
subnets=None):
def _get_pool_key(self, host, project_id, net_id=None, subnets=None):
if not net_id and subnets:
net_obj = list(subnets.values())[0]
net_id = net_obj.id
pool_key = (host, project_id, tuple(sorted(security_groups)),
net_id)
pool_key = (host, project_id, net_id)
return pool_key
def _get_pool_key_net(self, pool_key):
return pool_key[3]
return pool_key[2]
def request_vif(self, pod, project_id, subnets, security_groups):
try:
@ -179,33 +181,37 @@ class BaseVIFPool(base.VIFPoolDriver):
except KeyError:
LOG.warning("Pod has not been scheduled yet.")
raise
pool_key = self._get_pool_key(host_addr, project_id, security_groups,
None, subnets)
pool_key = self._get_pool_key(host_addr, project_id, None, subnets)
try:
return self._get_port_from_pool(pool_key, pod, subnets)
return self._get_port_from_pool(pool_key, pod, subnets,
tuple(sorted(security_groups)))
except exceptions.ResourceNotReady:
LOG.warning("Ports pool does not have available ports!")
eventlet.spawn(self._populate_pool, pool_key, pod, subnets)
eventlet.spawn(self._populate_pool, pool_key, pod, subnets,
tuple(sorted(security_groups)))
raise
def _get_port_from_pool(self, pool_key, pod, subnets):
def _get_port_from_pool(self, pool_key, pod, subnets, security_groups):
raise NotImplementedError()
def _populate_pool(self, pool_key, pod, subnets):
def _populate_pool(self, pool_key, pod, subnets, security_groups):
# REVISIT(ltomasbo): Drop the subnets parameter and get the information
# from the pool_key, which will be required when multi-network is
# supported
now = time.time()
try:
if (now - oslo_cfg.CONF.vif_pool.ports_pool_update_frequency <
self._last_update.get(pool_key, 0)):
LOG.info("Not enough time since the last pool update")
pool_updates = self._last_update.get(pool_key)
if pool_updates:
last_update = pool_updates.get(security_groups, 0)
try:
if (now - oslo_cfg.CONF.vif_pool.ports_pool_update_frequency <
last_update):
LOG.info("Not enough time since the last pool update")
return
except AttributeError:
LOG.info("Kuryr-controller not yet ready to populate pools")
return
except AttributeError:
LOG.info("Kuryr-controller not yet ready to populate pools")
return
self._last_update[pool_key] = now
self._last_update[pool_key] = {security_groups: now}
pool_size = self._get_pool_size(pool_key)
if pool_size < oslo_cfg.CONF.vif_pool.ports_pool_min:
@ -215,18 +221,19 @@ class BaseVIFPool(base.VIFPoolDriver):
pod=pod,
project_id=pool_key[1],
subnets=subnets,
security_groups=list(pool_key[2]),
security_groups=security_groups,
num_ports=num_ports)
for vif in vifs:
self._existing_vifs[vif.id] = vif
self._available_ports_pools.setdefault(pool_key,
[]).append(vif.id)
self._available_ports_pools.setdefault(
pool_key, {}).setdefault(
security_groups, []).append(vif.id)
def release_vif(self, pod, vif, project_id, security_groups):
host_addr = self._get_host_addr(pod)
pool_key = self._get_pool_key(host_addr, project_id, security_groups,
vif.network.id, None)
pool_key = self._get_pool_key(host_addr, project_id, vif.network.id,
None)
try:
if not self._existing_vifs.get(vif.id):
@ -288,12 +295,10 @@ class BaseVIFPool(base.VIFPoolDriver):
@lockutils.synchronized('return_to_pool_baremetal')
@lockutils.synchronized('return_to_pool_nested')
def sync_pools(self):
self._available_ports_pools = collections.defaultdict(
collections.deque)
self._existing_vifs = collections.defaultdict(collections.defaultdict)
self._recyclable_ports = collections.defaultdict(
collections.defaultdict)
self._last_update = collections.defaultdict(collections.defaultdict)
self._available_ports_pools = collections.defaultdict()
self._existing_vifs = collections.defaultdict()
self._recyclable_ports = collections.defaultdict()
self._last_update = collections.defaultdict()
# NOTE(ltomasbo): Ensure previously created ports are recovered into
# their respective pools
self._recover_precreated_ports()
@ -365,11 +370,45 @@ class NeutronVIFPool(BaseVIFPool):
def _get_host_addr(self, pod):
return pod['spec']['nodeName']
def _get_port_from_pool(self, pool_key, pod, subnets):
def _get_port_from_pool(self, pool_key, pod, subnets, security_groups):
try:
port_id = self._available_ports_pools[pool_key].pop()
except (IndexError, AttributeError):
pool_ports = self._available_ports_pools[pool_key]
except (KeyError, AttributeError):
raise exceptions.ResourceNotReady(pod)
try:
port_id = pool_ports[security_groups].pop()
except (KeyError, IndexError):
# Get another port from the pool and update the SG to the
# appropriate one. It uses a port from the group that was updated
# longer ago
pool_updates = self._last_update.get(pool_key, {})
if not pool_updates:
# No pools update info. Selecting a random one
for sg_group, ports in pool_ports.items():
if len(ports) > 0:
port_id = pool_ports[sg_group].pop()
break
else:
raise exceptions.ResourceNotReady(pod)
else:
min_date = -1
for sg_group, date in pool_updates.items():
if pool_ports.get(sg_group):
if min_date == -1 or date < min_date:
min_date = date
min_sg_group = sg_group
if min_date == -1:
# pool is empty, no port to reuse
raise exceptions.ResourceNotReady(pod)
port_id = pool_ports[min_sg_group].pop()
neutron = clients.get_neutron_client()
neutron.update_port(
port_id,
{
"port": {
'security_groups': list(security_groups)
}
})
if config.CONF.kubernetes.port_debug:
neutron = clients.get_neutron_client()
neutron.update_port(
@ -383,7 +422,8 @@ class NeutronVIFPool(BaseVIFPool):
# check if the pool needs to be populated
if (self._get_pool_size(pool_key) <
oslo_cfg.CONF.vif_pool.ports_pool_min):
eventlet.spawn(self._populate_pool, pool_key, pod, subnets)
eventlet.spawn(self._populate_pool, pool_key, pod, subnets,
security_groups)
return self._existing_vifs[port_id]
def _return_ports_to_pool(self):
@ -414,7 +454,8 @@ class NeutronVIFPool(BaseVIFPool):
device_owner=kl_const.DEVICE_OWNER)
for port in kuryr_ports:
if port['id'] in self._recyclable_ports:
sg_current[port['id']] = port['security_groups']
sg_current[port['id']] = tuple(sorted(
port['security_groups']))
for port_id, pool_key in self._recyclable_ports.copy().items():
if (not oslo_cfg.CONF.vif_pool.ports_pool_max or
@ -423,25 +464,24 @@ class NeutronVIFPool(BaseVIFPool):
port_name = (constants.KURYR_PORT_NAME
if config.CONF.kubernetes.port_debug
else '')
if (config.CONF.kubernetes.port_debug or
list(pool_key[2]) != sg_current.get(port_id)):
if config.CONF.kubernetes.port_debug:
try:
neutron.update_port(
port_id,
{
"port": {
'name': port_name,
'device_id': '',
'security_groups': list(pool_key[2])
'device_id': ''
}
})
except n_exc.NeutronClientException:
LOG.warning("Error preparing port %s to be "
LOG.warning("Error changing name for port %s to be "
"reused, put back on the cleanable "
"pool.", port_id)
continue
self._available_ports_pools.setdefault(
pool_key, []).append(port_id)
pool_key, {}).setdefault(
sg_current.get(port_id), []).append(port_id)
else:
try:
del self._existing_vifs[port_id]
@ -490,12 +530,13 @@ class NeutronVIFPool(BaseVIFPool):
net_obj = subnet[subnet_id]
pool_key = self._get_pool_key(port_host,
port['project_id'],
port['security_groups'],
net_obj.id, None)
self._existing_vifs[port['id']] = vif
self._available_ports_pools.setdefault(
pool_key, []).append(port['id'])
pool_key, {}).setdefault(
tuple(sorted(port['security_groups'])), []).append(
port['id'])
LOG.info("PORTS POOL: pools updated with pre-created ports")
self._create_healthcheck_file()
@ -512,10 +553,13 @@ class NeutronVIFPool(BaseVIFPool):
# on the available_ports_pools dict. The next call forces it to be on
# that dict before cleaning it up
self._trigger_return_to_pool()
for pool_key, ports_id in self._available_ports_pools.items():
for pool_key, ports in self._available_ports_pools.items():
if self._get_pool_key_net(pool_key) != net_id:
continue
self._available_ports_pools[pool_key] = []
ports_id = []
for sg_ports in ports.values():
ports_id.extend(sg_ports)
for port_id in ports_id:
try:
del self._existing_vifs[port_id]
@ -548,11 +592,45 @@ class NestedVIFPool(BaseVIFPool):
def set_vif_driver(self, driver):
self._drv_vif = driver
def _get_port_from_pool(self, pool_key, pod, subnets):
def _get_port_from_pool(self, pool_key, pod, subnets, security_groups):
try:
port_id = self._available_ports_pools[pool_key].pop()
except (IndexError, AttributeError):
pool_ports = self._available_ports_pools[pool_key]
except (KeyError, AttributeError):
raise exceptions.ResourceNotReady(pod)
try:
port_id = pool_ports[security_groups].pop()
except (KeyError, IndexError):
# Get another port from the pool and update the SG to the
# appropriate one. It uses a port from the group that was updated
# longer ago
pool_updates = self._last_update.get(pool_key, {})
if not pool_updates:
# No pools update info. Selecting a random one
for sg_group, ports in pool_ports.items():
if len(ports) > 0:
port_id = pool_ports[sg_group].pop()
break
else:
raise exceptions.ResourceNotReady(pod)
else:
min_date = -1
for sg_group, date in pool_updates.items():
if pool_ports.get(sg_group):
if min_date == -1 or date < min_date:
min_date = date
min_sg_group = sg_group
if min_date == -1:
# pool is empty, no port to reuse
raise exceptions.ResourceNotReady(pod)
port_id = pool_ports[min_sg_group].pop()
neutron = clients.get_neutron_client()
neutron.update_port(
port_id,
{
"port": {
'security_groups': list(security_groups)
}
})
if config.CONF.kubernetes.port_debug:
neutron = clients.get_neutron_client()
neutron.update_port(
@ -565,7 +643,8 @@ class NestedVIFPool(BaseVIFPool):
# check if the pool needs to be populated
if (self._get_pool_size(pool_key) <
oslo_cfg.CONF.vif_pool.ports_pool_min):
eventlet.spawn(self._populate_pool, pool_key, pod, subnets)
eventlet.spawn(self._populate_pool, pool_key, pod, subnets,
security_groups)
return self._existing_vifs[port_id]
def _return_ports_to_pool(self):
@ -596,7 +675,8 @@ class NestedVIFPool(BaseVIFPool):
device_owner=['trunk:subport', kl_const.DEVICE_OWNER])
for subport in kuryr_subports:
if subport['id'] in self._recyclable_ports:
sg_current[subport['id']] = subport['security_groups']
sg_current[subport['id']] = tuple(sorted(
subport['security_groups']))
for port_id, pool_key in self._recyclable_ports.copy().items():
if (not oslo_cfg.CONF.vif_pool.ports_pool_max or
@ -605,24 +685,23 @@ class NestedVIFPool(BaseVIFPool):
port_name = (constants.KURYR_PORT_NAME
if config.CONF.kubernetes.port_debug
else '')
if (config.CONF.kubernetes.port_debug or
list(pool_key[2]) != sg_current.get(port_id)):
if config.CONF.kubernetes.port_debug:
try:
neutron.update_port(
port_id,
{
"port": {
'name': port_name,
'security_groups': list(pool_key[2])
}
})
except n_exc.NeutronClientException:
LOG.warning("Error preparing port %s to be "
LOG.warning("Error changing name for port %s to be "
"reused, put back on the cleanable "
"pool.", port_id)
continue
self._available_ports_pools.setdefault(
pool_key, []).append(port_id)
pool_key, {}).setdefault(
sg_current.get(port_id), []).append(port_id)
else:
trunk_id = self._get_trunk_id(neutron, pool_key)
try:
@ -701,8 +780,6 @@ class NestedVIFPool(BaseVIFPool):
net_obj = subnet[subnet_id]
pool_key = self._get_pool_key(host_addr,
kuryr_subport['project_id'],
kuryr_subport[
'security_groups'],
net_obj.id, None)
if action == 'recover':
@ -711,7 +788,9 @@ class NestedVIFPool(BaseVIFPool):
self._existing_vifs[kuryr_subport['id']] = vif
self._available_ports_pools.setdefault(
pool_key, []).append(kuryr_subport['id'])
pool_key, {}).setdefault(tuple(sorted(
kuryr_subport['security_groups'])),
[]).append(kuryr_subport['id'])
elif action == 'free':
try:
@ -721,8 +800,9 @@ class NestedVIFPool(BaseVIFPool):
self._drv_vif._release_vlan_id(
subport['segmentation_id'])
del self._existing_vifs[kuryr_subport['id']]
self._available_ports_pools[pool_key].remove(
kuryr_subport['id'])
self._available_ports_pools[pool_key][
tuple(sorted(kuryr_subport['security_groups']
))].remove(kuryr_subport['id'])
except n_exc.PortNotFoundClient:
LOG.debug('Unable to release port %s as it no '
'longer exists.', kuryr_subport['id'])
@ -752,12 +832,11 @@ class NestedVIFPool(BaseVIFPool):
num_ports=num_ports,
trunk_ip=trunk_ip)
pool_key = self._get_pool_key(trunk_ip, project_id, security_groups,
None, subnets)
pool_key = self._get_pool_key(trunk_ip, project_id, None, subnets)
for vif in vifs:
self._existing_vifs[vif.id] = vif
self._available_ports_pools.setdefault(pool_key,
[]).append(vif.id)
self._available_ports_pools.setdefault(pool_key, {}).setdefault(
tuple(sorted(security_groups)), []).append(vif.id)
def free_pool(self, trunk_ips=None):
"""Removes subports from the pool and deletes neutron port resource.
@ -779,19 +858,21 @@ class NestedVIFPool(BaseVIFPool):
# on the available_ports_pools dict. The next call forces it to be on
# that dict before cleaning it up
self._trigger_return_to_pool()
for pool_key, ports_ids in self._available_ports_pools.items():
for pool_key, ports in self._available_ports_pools.items():
if self._get_pool_key_net(pool_key) != net_id:
continue
self._available_ports_pools[pool_key] = []
trunk_id = self._get_trunk_id(neutron, pool_key)
ports_id = [p_id for sg_ports in ports.values()
for p_id in sg_ports]
try:
self._drv_vif._remove_subports(neutron, trunk_id, ports_ids)
self._drv_vif._remove_subports(neutron, trunk_id, ports_id)
except n_exc.NeutronClientException:
LOG.exception('Error removing subports from trunk: %s',
trunk_id)
continue
for port_id in ports_ids:
for port_id in ports_id:
try:
self._drv_vif._release_vlan_id(
self._existing_vifs[port_id].vlan_id)

View File

@ -187,15 +187,14 @@ class BaseVIFPool(test_base.TestCase):
pod = mock.sentinel.pod
project_id = str(uuid.uuid4())
subnets = mock.sentinel.subnets
security_groups = [mock.sentinel.security_groups]
pool_key = (mock.sentinel.host_addr, project_id,
tuple(security_groups))
security_groups = 'test-sg'
pool_key = (mock.sentinel.host_addr, project_id)
vif = osv_vif.VIFOpenVSwitch(id='0fa0e837-d34e-4580-a6c4-04f5f607d93e')
vifs = [vif]
m_driver._existing_vifs = {}
m_driver._available_ports_pools = {}
m_driver._last_update = {pool_key: 1}
m_driver._last_update = {pool_key: {tuple(security_groups): 1}}
oslo_cfg.CONF.set_override('ports_pool_min',
5,
@ -206,7 +205,8 @@ class BaseVIFPool(test_base.TestCase):
m_driver._get_pool_size.return_value = 2
vif_driver.request_vifs.return_value = vifs
cls._populate_pool(m_driver, pool_key, pod, subnets)
cls._populate_pool(m_driver, pool_key, pod, subnets,
tuple(security_groups))
m_driver._get_pool_size.assert_called_once()
m_driver._drv_vif.request_vifs.assert_called_once()
@ -218,16 +218,16 @@ class BaseVIFPool(test_base.TestCase):
pod = mock.sentinel.pod
project_id = str(uuid.uuid4())
subnets = mock.sentinel.subnets
security_groups = [mock.sentinel.security_groups]
pool_key = (mock.sentinel.host_addr, project_id,
tuple(security_groups))
security_groups = 'test-sg'
pool_key = (mock.sentinel.host_addr, project_id)
oslo_cfg.CONF.set_override('ports_pool_update_frequency',
15,
group='vif_pool')
m_driver._last_update = {pool_key: 1}
m_driver._last_update = {pool_key: {tuple(security_groups): 1}}
cls._populate_pool(m_driver, pool_key, pod, subnets)
cls._populate_pool(m_driver, pool_key, pod, subnets,
tuple(security_groups))
m_driver._get_pool_size.assert_not_called()
@mock.patch('time.time', return_value=50)
@ -244,9 +244,8 @@ class BaseVIFPool(test_base.TestCase):
pod = mock.sentinel.pod
project_id = str(uuid.uuid4())
subnets = mock.sentinel.subnets
security_groups = [mock.sentinel.security_groups]
pool_key = (mock.sentinel.host_addr, project_id,
tuple(security_groups))
security_groups = 'test-sg'
pool_key = (mock.sentinel.host_addr, project_id)
oslo_cfg.CONF.set_override('ports_pool_update_frequency',
15,
@ -254,10 +253,11 @@ class BaseVIFPool(test_base.TestCase):
oslo_cfg.CONF.set_override('ports_pool_min',
5,
group='vif_pool')
m_driver._last_update = {pool_key: 1}
m_driver._last_update = {pool_key: {tuple(security_groups): 1}}
m_driver._get_pool_size.return_value = 10
cls._populate_pool(m_driver, pool_key, pod, subnets)
cls._populate_pool(m_driver, pool_key, pod, subnets,
tuple(security_groups))
m_driver._get_pool_size.assert_called_once()
m_driver._drv_vif.request_vifs.assert_not_called()
@ -341,11 +341,12 @@ class NeutronVIFPool(test_base.TestCase):
port_id = str(uuid.uuid4())
port = mock.sentinel.port
subnets = mock.sentinel.subnets
security_groups = 'test-sg'
pod = get_pod_obj()
m_driver._available_ports_pools = {
pool_key: collections.deque([port_id])}
pool_key: {tuple(security_groups): collections.deque([port_id])}}
m_driver._existing_vifs = {port_id: port}
m_get_port_name.return_value = get_pod_name(pod)
@ -362,7 +363,7 @@ class NeutronVIFPool(test_base.TestCase):
m_driver._get_pool_size.return_value = pool_length
self.assertEqual(port, cls._get_port_from_pool(
m_driver, pool_key, pod, subnets))
m_driver, pool_key, pod, subnets, tuple(security_groups)))
neutron.update_port.assert_called_once_with(
port_id,
@ -386,11 +387,12 @@ class NeutronVIFPool(test_base.TestCase):
port_id = str(uuid.uuid4())
port = mock.sentinel.port
subnets = mock.sentinel.subnets
security_groups = 'test-sg'
pod = get_pod_obj()
m_driver._available_ports_pools = {
pool_key: collections.deque([port_id])}
pool_key: {tuple(security_groups): collections.deque([port_id])}}
m_driver._existing_vifs = {port_id: port}
m_get_port_name.return_value = get_pod_name(pod)
@ -404,7 +406,7 @@ class NeutronVIFPool(test_base.TestCase):
m_driver._get_pool_size.return_value = pool_length
self.assertEqual(port, cls._get_port_from_pool(
m_driver, pool_key, pod, subnets))
m_driver, pool_key, pod, subnets, tuple(security_groups)))
neutron.update_port.assert_called_once_with(
port_id,
@ -424,11 +426,124 @@ class NeutronVIFPool(test_base.TestCase):
pod = get_pod_obj()
pool_key = mock.sentinel.pool_key
subnets = mock.sentinel.subnets
security_groups = 'test-sg'
m_driver._available_ports_pools = {pool_key: collections.deque([])}
m_driver._available_ports_pools = {
pool_key: {tuple(security_groups): collections.deque([])}}
m_driver._last_update = {pool_key: {tuple(security_groups): 1}}
self.assertRaises(exceptions.ResourceNotReady, cls._get_port_from_pool,
m_driver, pool_key, pod, subnets)
m_driver, pool_key, pod, subnets,
tuple(security_groups))
neutron.update_port.assert_not_called()
@mock.patch('eventlet.spawn')
def test__get_port_from_pool_empty_pool_reuse(self, m_eventlet):
cls = vif_pool.NeutronVIFPool
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pod = get_pod_obj()
port_id = str(uuid.uuid4())
port = mock.sentinel.port
pool_key = mock.sentinel.pool_key
subnets = mock.sentinel.subnets
security_groups = 'test-sg'
security_groups_2 = 'test-sg2'
oslo_cfg.CONF.set_override('port_debug',
False,
group='kubernetes')
pool_length = 5
m_driver._get_pool_size.return_value = pool_length
m_driver._available_ports_pools = {
pool_key: {tuple(security_groups): collections.deque([]),
tuple(security_groups_2): collections.deque([port_id])}}
m_driver._last_update = {pool_key: {tuple(security_groups): 1,
tuple(security_groups_2): 0}}
m_driver._existing_vifs = {port_id: port}
self.assertEqual(port, cls._get_port_from_pool(
m_driver, pool_key, pod, subnets, tuple(security_groups)))
neutron.update_port.assert_called_once_with(
port_id,
{
"port": {
'security_groups': list(security_groups),
}
})
m_eventlet.assert_not_called()
@mock.patch('eventlet.spawn')
def test__get_port_from_pool_empty_pool_reuse_no_update_info(self,
m_eventlet):
cls = vif_pool.NeutronVIFPool
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pod = get_pod_obj()
port_id = str(uuid.uuid4())
port = mock.sentinel.port
pool_key = mock.sentinel.pool_key
subnets = mock.sentinel.subnets
security_groups = 'test-sg'
security_groups_2 = 'test-sg2'
oslo_cfg.CONF.set_override('port_debug',
False,
group='kubernetes')
pool_length = 5
m_driver._get_pool_size.return_value = pool_length
m_driver._available_ports_pools = {
pool_key: {tuple(security_groups): collections.deque([]),
tuple(security_groups_2): collections.deque([port_id])}}
m_driver._last_update = {}
m_driver._existing_vifs = {port_id: port}
self.assertEqual(port, cls._get_port_from_pool(
m_driver, pool_key, pod, subnets, tuple(security_groups)))
neutron.update_port.assert_called_once_with(
port_id,
{
"port": {
'security_groups': list(security_groups),
}
})
m_eventlet.assert_not_called()
def test__get_port_from_pool_empty_pool_reuse_no_ports(self):
cls = vif_pool.NeutronVIFPool
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pod = get_pod_obj()
port_id = str(uuid.uuid4())
port = mock.sentinel.port
pool_key = mock.sentinel.pool_key
subnets = mock.sentinel.subnets
security_groups = 'test-sg'
security_groups_2 = 'test-sg2'
oslo_cfg.CONF.set_override('port_debug',
False,
group='kubernetes')
pool_length = 5
m_driver._get_pool_size.return_value = pool_length
m_driver._available_ports_pools = {
pool_key: {tuple(security_groups): collections.deque([]),
tuple(security_groups_2): collections.deque([])}}
m_driver._last_update = {}
m_driver._existing_vifs = {port_id: port}
self.assertRaises(exceptions.ResourceNotReady, cls._get_port_from_pool,
m_driver, pool_key, pod, subnets, tuple(
security_groups))
neutron.update_port.assert_not_called()
@ -438,7 +553,7 @@ class NeutronVIFPool(test_base.TestCase):
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
pool_key = ('node_ip', 'project_id')
port_id = str(uuid.uuid4())
pool_length = 5
@ -462,7 +577,6 @@ class NeutronVIFPool(test_base.TestCase):
"port": {
'name': constants.KURYR_PORT_NAME,
'device_id': '',
'security_groups': ['security_group']
}
})
neutron.delete_port.assert_not_called()
@ -473,7 +587,7 @@ class NeutronVIFPool(test_base.TestCase):
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
pool_key = ('node_ip', 'project_id')
port_id = str(uuid.uuid4())
pool_length = 5
@ -499,7 +613,7 @@ class NeutronVIFPool(test_base.TestCase):
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
pool_key = ('node_ip', 'project_id')
port_id = str(uuid.uuid4())
pool_length = 10
vif = mock.sentinel.vif
@ -524,7 +638,7 @@ class NeutronVIFPool(test_base.TestCase):
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
pool_key = ('node_ip', 'project_id')
port_id = str(uuid.uuid4())
pool_length = 5
@ -552,7 +666,6 @@ class NeutronVIFPool(test_base.TestCase):
"port": {
'name': constants.KURYR_PORT_NAME,
'device_id': '',
'security_groups': ['security_group']
}
})
neutron.delete_port.assert_not_called()
@ -562,7 +675,7 @@ class NeutronVIFPool(test_base.TestCase):
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
pool_key = ('node_ip', 'project_id')
port_id = str(uuid.uuid4())
pool_length = 10
vif = mock.sentinel.vif
@ -588,7 +701,7 @@ class NeutronVIFPool(test_base.TestCase):
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
pool_key = ('node_ip', 'project_id')
port_id = str(uuid.uuid4())
pool_length = 10
@ -639,8 +752,7 @@ class NeutronVIFPool(test_base.TestCase):
vif = mock.sentinel.vif
m_to_osvif.return_value = vif
pool_key = (port['binding:host_id'], port['project_id'],
tuple(port['security_groups']), net_id)
pool_key = (port['binding:host_id'], port['project_id'], net_id)
m_driver._get_pool_key.return_value = pool_key
m_driver._get_trunks_info.return_value = ({}, {}, {})
@ -652,7 +764,8 @@ class NeutronVIFPool(test_base.TestCase):
m_to_osvif.assert_called_once_with(vif_plugin, port, subnet)
self.assertEqual(m_driver._existing_vifs[port_id], vif)
self.assertEqual(m_driver._available_ports_pools[pool_key], [port_id])
self.assertEqual(m_driver._available_ports_pools[pool_key],
{tuple(port['security_groups']): [port_id]})
@mock.patch('kuryr_kubernetes.os_vif_util.neutron_to_osvif_vif')
@mock.patch('kuryr_kubernetes.utils.get_subnet')
@ -681,9 +794,10 @@ class NeutronVIFPool(test_base.TestCase):
neutron = self.useFixture(k_fix.MockNeutronClient()).client
net_id = mock.sentinel.net_id
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
pool_key = ('node_ip', 'project_id')
port_id = str(uuid.uuid4())
m_driver._available_ports_pools = {pool_key: [port_id]}
m_driver._available_ports_pools = {pool_key: {
tuple(['security_group']): [port_id]}}
m_driver._existing_vifs = {port_id: mock.sentinel.vif}
m_driver._get_pool_key_net.return_value = net_id
@ -701,9 +815,10 @@ class NeutronVIFPool(test_base.TestCase):
neutron = self.useFixture(k_fix.MockNeutronClient()).client
net_id = mock.sentinel.net_id
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
pool_key = ('node_ip', 'project_id')
port_id = str(uuid.uuid4())
m_driver._available_ports_pools = {pool_key: [port_id]}
m_driver._available_ports_pools = {pool_key: {
tuple(['security_group']): [port_id]}}
m_driver._existing_vifs = {}
neutron.delete_port.side_effect = n_exc.PortNotFoundClient
@ -765,11 +880,12 @@ class NestedVIFPool(test_base.TestCase):
port_id = str(uuid.uuid4())
port = mock.sentinel.port
subnets = mock.sentinel.subnets
security_groups = 'test-sg'
pod = get_pod_obj()
m_driver._available_ports_pools = {
pool_key: collections.deque([port_id])}
pool_key: {tuple(security_groups): collections.deque([port_id])}}
m_driver._existing_vifs = {port_id: port}
m_get_port_name.return_value = get_pod_name(pod)
@ -783,7 +899,7 @@ class NestedVIFPool(test_base.TestCase):
m_driver._get_pool_size.return_value = pool_length
self.assertEqual(port, cls._get_port_from_pool(
m_driver, pool_key, pod, subnets))
m_driver, pool_key, pod, subnets, tuple(security_groups)))
neutron.update_port.assert_called_once_with(
port_id,
@ -806,11 +922,12 @@ class NestedVIFPool(test_base.TestCase):
port_id = str(uuid.uuid4())
port = mock.sentinel.port
subnets = mock.sentinel.subnets
security_groups = 'test-sg'
pod = get_pod_obj()
m_driver._available_ports_pools = {
pool_key: collections.deque([port_id])}
pool_key: {tuple(security_groups): collections.deque([port_id])}}
m_driver._existing_vifs = {port_id: port}
m_get_port_name.return_value = get_pod_name(pod)
@ -824,7 +941,7 @@ class NestedVIFPool(test_base.TestCase):
m_driver._get_pool_size.return_value = pool_length
self.assertEqual(port, cls._get_port_from_pool(
m_driver, pool_key, pod, subnets))
m_driver, pool_key, pod, subnets, tuple(security_groups)))
neutron.update_port.assert_called_once_with(
port_id,
@ -843,11 +960,124 @@ class NestedVIFPool(test_base.TestCase):
pod = mock.sentinel.pod
pool_key = mock.sentinel.pool_key
subnets = mock.sentinel.subnets
security_groups = 'test-sg'
m_driver._available_ports_pools = {pool_key: collections.deque([])}
m_driver._available_ports_pools = {
pool_key: {tuple(security_groups): collections.deque([])}}
m_driver._last_update = {pool_key: {tuple(security_groups): 1}}
self.assertRaises(exceptions.ResourceNotReady, cls._get_port_from_pool,
m_driver, pool_key, pod, subnets)
m_driver, pool_key, pod, subnets, tuple(
security_groups))
neutron.update_port.assert_not_called()
@mock.patch('eventlet.spawn')
def test__get_port_from_pool_empty_pool_reuse(self, m_eventlet):
cls = vif_pool.NestedVIFPool
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pod = mock.sentinel.pod
port_id = str(uuid.uuid4())
port = mock.sentinel.port
pool_key = mock.sentinel.pool_key
subnets = mock.sentinel.subnets
security_groups = 'test-sg'
security_groups_2 = 'test-sg2'
oslo_cfg.CONF.set_override('port_debug',
False,
group='kubernetes')
pool_length = 5
m_driver._get_pool_size.return_value = pool_length
m_driver._available_ports_pools = {
pool_key: {tuple(security_groups): collections.deque([]),
tuple(security_groups_2): collections.deque([port_id])}}
m_driver._last_update = {pool_key: {tuple(security_groups): 1,
tuple(security_groups_2): 0}}
m_driver._existing_vifs = {port_id: port}
self.assertEqual(port, cls._get_port_from_pool(
m_driver, pool_key, pod, subnets, tuple(security_groups)))
neutron.update_port.assert_called_once_with(
port_id,
{
"port": {
'security_groups': list(security_groups),
}
})
m_eventlet.assert_not_called()
@mock.patch('eventlet.spawn')
def test__get_port_from_pool_empty_pool_reuse_no_update_info(self,
m_eventlet):
cls = vif_pool.NestedVIFPool
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pod = mock.sentinel.pod
port_id = str(uuid.uuid4())
port = mock.sentinel.port
pool_key = mock.sentinel.pool_key
subnets = mock.sentinel.subnets
security_groups = 'test-sg'
security_groups_2 = 'test-sg2'
oslo_cfg.CONF.set_override('port_debug',
False,
group='kubernetes')
pool_length = 5
m_driver._get_pool_size.return_value = pool_length
m_driver._available_ports_pools = {
pool_key: {tuple(security_groups): collections.deque([]),
tuple(security_groups_2): collections.deque([port_id])}}
m_driver._last_update = {}
m_driver._existing_vifs = {port_id: port}
self.assertEqual(port, cls._get_port_from_pool(
m_driver, pool_key, pod, subnets, tuple(security_groups)))
neutron.update_port.assert_called_once_with(
port_id,
{
"port": {
'security_groups': list(security_groups),
}
})
m_eventlet.assert_not_called()
def test__get_port_from_pool_empty_pool_reuse_no_ports(self):
cls = vif_pool.NestedVIFPool
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pod = mock.sentinel.pod
port_id = str(uuid.uuid4())
port = mock.sentinel.port
pool_key = mock.sentinel.pool_key
subnets = mock.sentinel.subnets
security_groups = 'test-sg'
security_groups_2 = 'test-sg2'
oslo_cfg.CONF.set_override('port_debug',
False,
group='kubernetes')
pool_length = 5
m_driver._get_pool_size.return_value = pool_length
m_driver._available_ports_pools = {
pool_key: {tuple(security_groups): collections.deque([]),
tuple(security_groups_2): collections.deque([])}}
m_driver._last_update = {}
m_driver._existing_vifs = {port_id: port}
self.assertRaises(exceptions.ResourceNotReady, cls._get_port_from_pool,
m_driver, pool_key, pod, subnets, tuple(
security_groups))
neutron.update_port.assert_not_called()
@ -857,7 +1087,7 @@ class NestedVIFPool(test_base.TestCase):
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
pool_key = ('node_ip', 'project_id')
port_id = str(uuid.uuid4())
pool_length = 5
@ -880,7 +1110,6 @@ class NestedVIFPool(test_base.TestCase):
{
"port": {
'name': constants.KURYR_PORT_NAME,
'security_groups': ['security_group']
}
})
neutron.delete_port.assert_not_called()
@ -891,7 +1120,7 @@ class NestedVIFPool(test_base.TestCase):
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
pool_key = ('node_ip', 'project_id')
port_id = str(uuid.uuid4())
pool_length = 5
@ -920,7 +1149,7 @@ class NestedVIFPool(test_base.TestCase):
vif_driver = mock.MagicMock(spec=cls_vif_driver)
m_driver._drv_vif = vif_driver
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
pool_key = ('node_ip', 'project_id')
port_id = str(uuid.uuid4())
pool_length = 10
vif = mock.MagicMock()
@ -953,7 +1182,7 @@ class NestedVIFPool(test_base.TestCase):
m_driver = mock.MagicMock(spec=cls)
neutron = self.useFixture(k_fix.MockNeutronClient()).client
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
pool_key = ('node_ip', 'project_id')
port_id = str(uuid.uuid4())
pool_length = 5
@ -977,7 +1206,6 @@ class NestedVIFPool(test_base.TestCase):
{
"port": {
'name': constants.KURYR_PORT_NAME,
'security_groups': ['security_group']
}
})
neutron.delete_port.assert_not_called()
@ -990,7 +1218,7 @@ class NestedVIFPool(test_base.TestCase):
vif_driver = mock.MagicMock(spec=cls_vif_driver)
m_driver._drv_vif = vif_driver
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
pool_key = ('node_ip', 'project_id')
port_id = str(uuid.uuid4())
pool_length = 10
vif = mock.MagicMock()
@ -1027,7 +1255,7 @@ class NestedVIFPool(test_base.TestCase):
vif_driver = mock.MagicMock(spec=cls_vif_driver)
m_driver._drv_vif = vif_driver
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
pool_key = ('node_ip', 'project_id')
port_id = str(uuid.uuid4())
pool_length = 10
trunk_id = str(uuid.uuid4())
@ -1163,15 +1391,15 @@ class NestedVIFPool(test_base.TestCase):
vif = mock.sentinel.vif
m_to_osvif.return_value = vif
pool_key = (port['binding:host_id'], port['project_id'],
tuple(port['security_groups']), net_id)
pool_key = (port['binding:host_id'], port['project_id'], net_id)
m_driver._get_pool_key.return_value = pool_key
cls._precreated_ports(m_driver, 'recover')
m_driver._get_trunks_info.assert_called_once()
self.assertEqual(m_driver._existing_vifs[port_id], vif)
self.assertEqual(m_driver._available_ports_pools[pool_key], [port_id])
self.assertEqual(m_driver._available_ports_pools[pool_key],
{tuple(port['security_groups']): [port_id]})
neutron.delete_port.assert_not_called()
def test__precreated_ports_free(self):
@ -1200,10 +1428,10 @@ class NestedVIFPool(test_base.TestCase):
m_driver._get_trunks_info.return_value = (p_ports, a_subports,
subnets)
pool_key = (port['binding:host_id'], port['project_id'],
tuple(port['security_groups']), net_id)
pool_key = (port['binding:host_id'], port['project_id'], net_id)
m_driver._get_pool_key.return_value = pool_key
m_driver._available_ports_pools = {pool_key: [port_id]}
m_driver._available_ports_pools = {
pool_key: {tuple(port['security_groups']): [port_id]}}
m_driver._existing_vifs = {port_id: mock.sentinel.vif}
cls._precreated_ports(m_driver, 'free')
@ -1214,7 +1442,8 @@ class NestedVIFPool(test_base.TestCase):
m_driver._drv_vif._release_vlan_id.assert_called_once()
self.assertEqual(m_driver._existing_vifs, {})
self.assertEqual(m_driver._available_ports_pools[pool_key], [])
self.assertEqual(m_driver._available_ports_pools[pool_key][tuple(
port['security_groups'])], [])
@mock.patch('kuryr_kubernetes.os_vif_util.'
'neutron_to_osvif_vif_nested_vlan')
@ -1302,8 +1531,7 @@ class NestedVIFPool(test_base.TestCase):
vif = mock.sentinel.vif
m_to_osvif.return_value = vif
pool_key = (port1['binding:host_id'], port1['project_id'],
tuple(port1['security_groups']), net_id)
pool_key = (port1['binding:host_id'], port1['project_id'], net_id)
m_driver._get_pool_key.return_value = pool_key
cls._precreated_ports(m_driver, 'recover')
@ -1311,7 +1539,8 @@ class NestedVIFPool(test_base.TestCase):
self.assertEqual(m_driver._existing_vifs, {port_id1: vif,
port_id2: vif})
self.assertEqual(m_driver._available_ports_pools[pool_key],
[port_id1, port_id2])
{tuple(port1['security_groups']): [port_id1,
port_id2]})
neutron.delete_port.assert_not_called()
@ddt.data(('recover'), ('free'))
@ -1382,13 +1611,14 @@ class NestedVIFPool(test_base.TestCase):
neutron = self.useFixture(k_fix.MockNeutronClient()).client
net_id = mock.sentinel.net_id
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
pool_key = ('node_ip', 'project_id')
port_id = str(uuid.uuid4())
trunk_id = str(uuid.uuid4())
vif = mock.MagicMock()
vlan_id = mock.sentinel.vlan_id
vif.vlan_id = vlan_id
m_driver._available_ports_pools = {pool_key: [port_id]}
m_driver._available_ports_pools = {pool_key: {
tuple(['security_group']): [port_id]}}
m_driver._existing_vifs = {port_id: vif}
m_driver._get_trunk_id.return_value = trunk_id
@ -1415,13 +1645,14 @@ class NestedVIFPool(test_base.TestCase):
neutron = self.useFixture(k_fix.MockNeutronClient()).client
net_id = mock.sentinel.net_id
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
pool_key = ('node_ip', 'project_id')
port_id = str(uuid.uuid4())
trunk_id = str(uuid.uuid4())
vif = mock.MagicMock()
vlan_id = mock.sentinel.vlan_id
vif.vlan_id = vlan_id
m_driver._available_ports_pools = {pool_key: [port_id]}
m_driver._available_ports_pools = {pool_key: {
tuple(['security_group']): [port_id]}}
m_driver._existing_vifs = {port_id: vif}
m_driver._get_trunk_id.return_value = trunk_id
@ -1450,13 +1681,14 @@ class NestedVIFPool(test_base.TestCase):
neutron = self.useFixture(k_fix.MockNeutronClient()).client
net_id = mock.sentinel.net_id
pool_key = ('node_ip', 'project_id', tuple(['security_group']))
pool_key = ('node_ip', 'project_id')
port_id = str(uuid.uuid4())
trunk_id = str(uuid.uuid4())
vif = mock.MagicMock()
vlan_id = mock.sentinel.vlan_id
vif.vlan_id = vlan_id
m_driver._available_ports_pools = {pool_key: [port_id]}
m_driver._available_ports_pools = {pool_key: {
tuple(['security_group']): [port_id]}}
m_driver._existing_vifs = {}
m_driver._get_trunk_id.return_value = trunk_id