From e8c418c196cb2297b226be6830275e13563c08a4 Mon Sep 17 00:00:00 2001 From: Luis Tomas Bolivar Date: Mon, 4 Feb 2019 12:32:12 +0100 Subject: [PATCH] Pools support with Network Policies This patch adapts the pools support to the use of Network Policies. Unlike with the other drivers, when Network Policies are applied the pods' ports changes their security groups while being used. That means their original pool will not fit them anymore with the next two consequences: 1.- Ports will have their original SG reapplied when pods are deleted, with the consequent performance impact do to increasing the number of calls to neutron 2.- Original pools may become useless, as different SGs are being used, therefore wasting neutron ports To accomodate for network policies, this patch removes the SG ids from the pool key, merging all the pools with same network/project/host ids but with different security groups into the same pool. This will not change the behavior of the other drivers as there was a unique pool per network/project/host ids already, i.e., the same SG ids were being used. However, this will helps to avoid problem 1) as it is no longer re-applying the SG, but simply putting the port back into its current pool. And it will fix problem 2) as it will pick a port for an existing pool that matches network/project/host ids. First it will search for one with already matching SGs, and if not found, it will recycle one of the others by reapplying the needed SGs (note it picks a port from one of the pools that are less frequently used -- assumes they may belong to a deleted NP that it is not needed anymore, thus removing the port wastage problem) Partially Implements: blueprint k8s-network-policies Change-Id: I2c1e47fd5112c64b8e9984e5ac5d8572d91ac202 --- .../controller/drivers/vif_pool.py | 217 +++++++---- .../unit/controller/drivers/test_vif_pool.py | 364 ++++++++++++++---- 2 files changed, 447 insertions(+), 134 deletions(-) diff --git a/kuryr_kubernetes/controller/drivers/vif_pool.py b/kuryr_kubernetes/controller/drivers/vif_pool.py index 9ad60ebe4..0e34f78f2 100644 --- a/kuryr_kubernetes/controller/drivers/vif_pool.py +++ b/kuryr_kubernetes/controller/drivers/vif_pool.py @@ -155,23 +155,25 @@ class BaseVIFPool(base.VIFPoolDriver): def update_vif_sgs(self, pod, sgs): self._drv_vif.update_vif_sgs(pod, sgs) - def _get_pool_size(self, pool_key=None): - return len(self._available_ports_pools.get(pool_key, [])) + def _get_pool_size(self, pool_key): + pool = self._available_ports_pools.get(pool_key, {}) + pool_members = [] + for port_list in pool.values(): + pool_members.extend(port_list) + return len(pool_members) def _get_host_addr(self, pod): return pod['status']['hostIP'] - def _get_pool_key(self, host, project_id, security_groups, net_id=None, - subnets=None): + def _get_pool_key(self, host, project_id, net_id=None, subnets=None): if not net_id and subnets: net_obj = list(subnets.values())[0] net_id = net_obj.id - pool_key = (host, project_id, tuple(sorted(security_groups)), - net_id) + pool_key = (host, project_id, net_id) return pool_key def _get_pool_key_net(self, pool_key): - return pool_key[3] + return pool_key[2] def request_vif(self, pod, project_id, subnets, security_groups): try: @@ -179,33 +181,37 @@ class BaseVIFPool(base.VIFPoolDriver): except KeyError: LOG.warning("Pod has not been scheduled yet.") raise - pool_key = self._get_pool_key(host_addr, project_id, security_groups, - None, subnets) + pool_key = self._get_pool_key(host_addr, project_id, None, subnets) try: - return self._get_port_from_pool(pool_key, pod, subnets) + return self._get_port_from_pool(pool_key, pod, subnets, + tuple(sorted(security_groups))) except exceptions.ResourceNotReady: LOG.warning("Ports pool does not have available ports!") - eventlet.spawn(self._populate_pool, pool_key, pod, subnets) + eventlet.spawn(self._populate_pool, pool_key, pod, subnets, + tuple(sorted(security_groups))) raise - def _get_port_from_pool(self, pool_key, pod, subnets): + def _get_port_from_pool(self, pool_key, pod, subnets, security_groups): raise NotImplementedError() - def _populate_pool(self, pool_key, pod, subnets): + def _populate_pool(self, pool_key, pod, subnets, security_groups): # REVISIT(ltomasbo): Drop the subnets parameter and get the information # from the pool_key, which will be required when multi-network is # supported now = time.time() - try: - if (now - oslo_cfg.CONF.vif_pool.ports_pool_update_frequency < - self._last_update.get(pool_key, 0)): - LOG.info("Not enough time since the last pool update") + pool_updates = self._last_update.get(pool_key) + if pool_updates: + last_update = pool_updates.get(security_groups, 0) + try: + if (now - oslo_cfg.CONF.vif_pool.ports_pool_update_frequency < + last_update): + LOG.info("Not enough time since the last pool update") + return + except AttributeError: + LOG.info("Kuryr-controller not yet ready to populate pools") return - except AttributeError: - LOG.info("Kuryr-controller not yet ready to populate pools") - return - self._last_update[pool_key] = now + self._last_update[pool_key] = {security_groups: now} pool_size = self._get_pool_size(pool_key) if pool_size < oslo_cfg.CONF.vif_pool.ports_pool_min: @@ -215,18 +221,19 @@ class BaseVIFPool(base.VIFPoolDriver): pod=pod, project_id=pool_key[1], subnets=subnets, - security_groups=list(pool_key[2]), + security_groups=security_groups, num_ports=num_ports) for vif in vifs: self._existing_vifs[vif.id] = vif - self._available_ports_pools.setdefault(pool_key, - []).append(vif.id) + self._available_ports_pools.setdefault( + pool_key, {}).setdefault( + security_groups, []).append(vif.id) def release_vif(self, pod, vif, project_id, security_groups): host_addr = self._get_host_addr(pod) - pool_key = self._get_pool_key(host_addr, project_id, security_groups, - vif.network.id, None) + pool_key = self._get_pool_key(host_addr, project_id, vif.network.id, + None) try: if not self._existing_vifs.get(vif.id): @@ -288,12 +295,10 @@ class BaseVIFPool(base.VIFPoolDriver): @lockutils.synchronized('return_to_pool_baremetal') @lockutils.synchronized('return_to_pool_nested') def sync_pools(self): - self._available_ports_pools = collections.defaultdict( - collections.deque) - self._existing_vifs = collections.defaultdict(collections.defaultdict) - self._recyclable_ports = collections.defaultdict( - collections.defaultdict) - self._last_update = collections.defaultdict(collections.defaultdict) + self._available_ports_pools = collections.defaultdict() + self._existing_vifs = collections.defaultdict() + self._recyclable_ports = collections.defaultdict() + self._last_update = collections.defaultdict() # NOTE(ltomasbo): Ensure previously created ports are recovered into # their respective pools self._recover_precreated_ports() @@ -365,11 +370,45 @@ class NeutronVIFPool(BaseVIFPool): def _get_host_addr(self, pod): return pod['spec']['nodeName'] - def _get_port_from_pool(self, pool_key, pod, subnets): + def _get_port_from_pool(self, pool_key, pod, subnets, security_groups): try: - port_id = self._available_ports_pools[pool_key].pop() - except (IndexError, AttributeError): + pool_ports = self._available_ports_pools[pool_key] + except (KeyError, AttributeError): raise exceptions.ResourceNotReady(pod) + try: + port_id = pool_ports[security_groups].pop() + except (KeyError, IndexError): + # Get another port from the pool and update the SG to the + # appropriate one. It uses a port from the group that was updated + # longer ago + pool_updates = self._last_update.get(pool_key, {}) + if not pool_updates: + # No pools update info. Selecting a random one + for sg_group, ports in pool_ports.items(): + if len(ports) > 0: + port_id = pool_ports[sg_group].pop() + break + else: + raise exceptions.ResourceNotReady(pod) + else: + min_date = -1 + for sg_group, date in pool_updates.items(): + if pool_ports.get(sg_group): + if min_date == -1 or date < min_date: + min_date = date + min_sg_group = sg_group + if min_date == -1: + # pool is empty, no port to reuse + raise exceptions.ResourceNotReady(pod) + port_id = pool_ports[min_sg_group].pop() + neutron = clients.get_neutron_client() + neutron.update_port( + port_id, + { + "port": { + 'security_groups': list(security_groups) + } + }) if config.CONF.kubernetes.port_debug: neutron = clients.get_neutron_client() neutron.update_port( @@ -383,7 +422,8 @@ class NeutronVIFPool(BaseVIFPool): # check if the pool needs to be populated if (self._get_pool_size(pool_key) < oslo_cfg.CONF.vif_pool.ports_pool_min): - eventlet.spawn(self._populate_pool, pool_key, pod, subnets) + eventlet.spawn(self._populate_pool, pool_key, pod, subnets, + security_groups) return self._existing_vifs[port_id] def _return_ports_to_pool(self): @@ -414,7 +454,8 @@ class NeutronVIFPool(BaseVIFPool): device_owner=kl_const.DEVICE_OWNER) for port in kuryr_ports: if port['id'] in self._recyclable_ports: - sg_current[port['id']] = port['security_groups'] + sg_current[port['id']] = tuple(sorted( + port['security_groups'])) for port_id, pool_key in self._recyclable_ports.copy().items(): if (not oslo_cfg.CONF.vif_pool.ports_pool_max or @@ -423,25 +464,24 @@ class NeutronVIFPool(BaseVIFPool): port_name = (constants.KURYR_PORT_NAME if config.CONF.kubernetes.port_debug else '') - if (config.CONF.kubernetes.port_debug or - list(pool_key[2]) != sg_current.get(port_id)): + if config.CONF.kubernetes.port_debug: try: neutron.update_port( port_id, { "port": { 'name': port_name, - 'device_id': '', - 'security_groups': list(pool_key[2]) + 'device_id': '' } }) except n_exc.NeutronClientException: - LOG.warning("Error preparing port %s to be " + LOG.warning("Error changing name for port %s to be " "reused, put back on the cleanable " "pool.", port_id) continue self._available_ports_pools.setdefault( - pool_key, []).append(port_id) + pool_key, {}).setdefault( + sg_current.get(port_id), []).append(port_id) else: try: del self._existing_vifs[port_id] @@ -490,12 +530,13 @@ class NeutronVIFPool(BaseVIFPool): net_obj = subnet[subnet_id] pool_key = self._get_pool_key(port_host, port['project_id'], - port['security_groups'], net_obj.id, None) self._existing_vifs[port['id']] = vif self._available_ports_pools.setdefault( - pool_key, []).append(port['id']) + pool_key, {}).setdefault( + tuple(sorted(port['security_groups'])), []).append( + port['id']) LOG.info("PORTS POOL: pools updated with pre-created ports") self._create_healthcheck_file() @@ -512,10 +553,13 @@ class NeutronVIFPool(BaseVIFPool): # on the available_ports_pools dict. The next call forces it to be on # that dict before cleaning it up self._trigger_return_to_pool() - for pool_key, ports_id in self._available_ports_pools.items(): + for pool_key, ports in self._available_ports_pools.items(): if self._get_pool_key_net(pool_key) != net_id: continue self._available_ports_pools[pool_key] = [] + ports_id = [] + for sg_ports in ports.values(): + ports_id.extend(sg_ports) for port_id in ports_id: try: del self._existing_vifs[port_id] @@ -548,11 +592,45 @@ class NestedVIFPool(BaseVIFPool): def set_vif_driver(self, driver): self._drv_vif = driver - def _get_port_from_pool(self, pool_key, pod, subnets): + def _get_port_from_pool(self, pool_key, pod, subnets, security_groups): try: - port_id = self._available_ports_pools[pool_key].pop() - except (IndexError, AttributeError): + pool_ports = self._available_ports_pools[pool_key] + except (KeyError, AttributeError): raise exceptions.ResourceNotReady(pod) + try: + port_id = pool_ports[security_groups].pop() + except (KeyError, IndexError): + # Get another port from the pool and update the SG to the + # appropriate one. It uses a port from the group that was updated + # longer ago + pool_updates = self._last_update.get(pool_key, {}) + if not pool_updates: + # No pools update info. Selecting a random one + for sg_group, ports in pool_ports.items(): + if len(ports) > 0: + port_id = pool_ports[sg_group].pop() + break + else: + raise exceptions.ResourceNotReady(pod) + else: + min_date = -1 + for sg_group, date in pool_updates.items(): + if pool_ports.get(sg_group): + if min_date == -1 or date < min_date: + min_date = date + min_sg_group = sg_group + if min_date == -1: + # pool is empty, no port to reuse + raise exceptions.ResourceNotReady(pod) + port_id = pool_ports[min_sg_group].pop() + neutron = clients.get_neutron_client() + neutron.update_port( + port_id, + { + "port": { + 'security_groups': list(security_groups) + } + }) if config.CONF.kubernetes.port_debug: neutron = clients.get_neutron_client() neutron.update_port( @@ -565,7 +643,8 @@ class NestedVIFPool(BaseVIFPool): # check if the pool needs to be populated if (self._get_pool_size(pool_key) < oslo_cfg.CONF.vif_pool.ports_pool_min): - eventlet.spawn(self._populate_pool, pool_key, pod, subnets) + eventlet.spawn(self._populate_pool, pool_key, pod, subnets, + security_groups) return self._existing_vifs[port_id] def _return_ports_to_pool(self): @@ -596,7 +675,8 @@ class NestedVIFPool(BaseVIFPool): device_owner=['trunk:subport', kl_const.DEVICE_OWNER]) for subport in kuryr_subports: if subport['id'] in self._recyclable_ports: - sg_current[subport['id']] = subport['security_groups'] + sg_current[subport['id']] = tuple(sorted( + subport['security_groups'])) for port_id, pool_key in self._recyclable_ports.copy().items(): if (not oslo_cfg.CONF.vif_pool.ports_pool_max or @@ -605,24 +685,23 @@ class NestedVIFPool(BaseVIFPool): port_name = (constants.KURYR_PORT_NAME if config.CONF.kubernetes.port_debug else '') - if (config.CONF.kubernetes.port_debug or - list(pool_key[2]) != sg_current.get(port_id)): + if config.CONF.kubernetes.port_debug: try: neutron.update_port( port_id, { "port": { 'name': port_name, - 'security_groups': list(pool_key[2]) } }) except n_exc.NeutronClientException: - LOG.warning("Error preparing port %s to be " + LOG.warning("Error changing name for port %s to be " "reused, put back on the cleanable " "pool.", port_id) continue self._available_ports_pools.setdefault( - pool_key, []).append(port_id) + pool_key, {}).setdefault( + sg_current.get(port_id), []).append(port_id) else: trunk_id = self._get_trunk_id(neutron, pool_key) try: @@ -701,8 +780,6 @@ class NestedVIFPool(BaseVIFPool): net_obj = subnet[subnet_id] pool_key = self._get_pool_key(host_addr, kuryr_subport['project_id'], - kuryr_subport[ - 'security_groups'], net_obj.id, None) if action == 'recover': @@ -711,7 +788,9 @@ class NestedVIFPool(BaseVIFPool): self._existing_vifs[kuryr_subport['id']] = vif self._available_ports_pools.setdefault( - pool_key, []).append(kuryr_subport['id']) + pool_key, {}).setdefault(tuple(sorted( + kuryr_subport['security_groups'])), + []).append(kuryr_subport['id']) elif action == 'free': try: @@ -721,8 +800,9 @@ class NestedVIFPool(BaseVIFPool): self._drv_vif._release_vlan_id( subport['segmentation_id']) del self._existing_vifs[kuryr_subport['id']] - self._available_ports_pools[pool_key].remove( - kuryr_subport['id']) + self._available_ports_pools[pool_key][ + tuple(sorted(kuryr_subport['security_groups'] + ))].remove(kuryr_subport['id']) except n_exc.PortNotFoundClient: LOG.debug('Unable to release port %s as it no ' 'longer exists.', kuryr_subport['id']) @@ -752,12 +832,11 @@ class NestedVIFPool(BaseVIFPool): num_ports=num_ports, trunk_ip=trunk_ip) - pool_key = self._get_pool_key(trunk_ip, project_id, security_groups, - None, subnets) + pool_key = self._get_pool_key(trunk_ip, project_id, None, subnets) for vif in vifs: self._existing_vifs[vif.id] = vif - self._available_ports_pools.setdefault(pool_key, - []).append(vif.id) + self._available_ports_pools.setdefault(pool_key, {}).setdefault( + tuple(sorted(security_groups)), []).append(vif.id) def free_pool(self, trunk_ips=None): """Removes subports from the pool and deletes neutron port resource. @@ -779,19 +858,21 @@ class NestedVIFPool(BaseVIFPool): # on the available_ports_pools dict. The next call forces it to be on # that dict before cleaning it up self._trigger_return_to_pool() - for pool_key, ports_ids in self._available_ports_pools.items(): + for pool_key, ports in self._available_ports_pools.items(): if self._get_pool_key_net(pool_key) != net_id: continue self._available_ports_pools[pool_key] = [] trunk_id = self._get_trunk_id(neutron, pool_key) + ports_id = [p_id for sg_ports in ports.values() + for p_id in sg_ports] try: - self._drv_vif._remove_subports(neutron, trunk_id, ports_ids) + self._drv_vif._remove_subports(neutron, trunk_id, ports_id) except n_exc.NeutronClientException: LOG.exception('Error removing subports from trunk: %s', trunk_id) continue - for port_id in ports_ids: + for port_id in ports_id: try: self._drv_vif._release_vlan_id( self._existing_vifs[port_id].vlan_id) diff --git a/kuryr_kubernetes/tests/unit/controller/drivers/test_vif_pool.py b/kuryr_kubernetes/tests/unit/controller/drivers/test_vif_pool.py index 8885dde35..ec1173371 100644 --- a/kuryr_kubernetes/tests/unit/controller/drivers/test_vif_pool.py +++ b/kuryr_kubernetes/tests/unit/controller/drivers/test_vif_pool.py @@ -187,15 +187,14 @@ class BaseVIFPool(test_base.TestCase): pod = mock.sentinel.pod project_id = str(uuid.uuid4()) subnets = mock.sentinel.subnets - security_groups = [mock.sentinel.security_groups] - pool_key = (mock.sentinel.host_addr, project_id, - tuple(security_groups)) + security_groups = 'test-sg' + pool_key = (mock.sentinel.host_addr, project_id) vif = osv_vif.VIFOpenVSwitch(id='0fa0e837-d34e-4580-a6c4-04f5f607d93e') vifs = [vif] m_driver._existing_vifs = {} m_driver._available_ports_pools = {} - m_driver._last_update = {pool_key: 1} + m_driver._last_update = {pool_key: {tuple(security_groups): 1}} oslo_cfg.CONF.set_override('ports_pool_min', 5, @@ -206,7 +205,8 @@ class BaseVIFPool(test_base.TestCase): m_driver._get_pool_size.return_value = 2 vif_driver.request_vifs.return_value = vifs - cls._populate_pool(m_driver, pool_key, pod, subnets) + cls._populate_pool(m_driver, pool_key, pod, subnets, + tuple(security_groups)) m_driver._get_pool_size.assert_called_once() m_driver._drv_vif.request_vifs.assert_called_once() @@ -218,16 +218,16 @@ class BaseVIFPool(test_base.TestCase): pod = mock.sentinel.pod project_id = str(uuid.uuid4()) subnets = mock.sentinel.subnets - security_groups = [mock.sentinel.security_groups] - pool_key = (mock.sentinel.host_addr, project_id, - tuple(security_groups)) + security_groups = 'test-sg' + pool_key = (mock.sentinel.host_addr, project_id) oslo_cfg.CONF.set_override('ports_pool_update_frequency', 15, group='vif_pool') - m_driver._last_update = {pool_key: 1} + m_driver._last_update = {pool_key: {tuple(security_groups): 1}} - cls._populate_pool(m_driver, pool_key, pod, subnets) + cls._populate_pool(m_driver, pool_key, pod, subnets, + tuple(security_groups)) m_driver._get_pool_size.assert_not_called() @mock.patch('time.time', return_value=50) @@ -244,9 +244,8 @@ class BaseVIFPool(test_base.TestCase): pod = mock.sentinel.pod project_id = str(uuid.uuid4()) subnets = mock.sentinel.subnets - security_groups = [mock.sentinel.security_groups] - pool_key = (mock.sentinel.host_addr, project_id, - tuple(security_groups)) + security_groups = 'test-sg' + pool_key = (mock.sentinel.host_addr, project_id) oslo_cfg.CONF.set_override('ports_pool_update_frequency', 15, @@ -254,10 +253,11 @@ class BaseVIFPool(test_base.TestCase): oslo_cfg.CONF.set_override('ports_pool_min', 5, group='vif_pool') - m_driver._last_update = {pool_key: 1} + m_driver._last_update = {pool_key: {tuple(security_groups): 1}} m_driver._get_pool_size.return_value = 10 - cls._populate_pool(m_driver, pool_key, pod, subnets) + cls._populate_pool(m_driver, pool_key, pod, subnets, + tuple(security_groups)) m_driver._get_pool_size.assert_called_once() m_driver._drv_vif.request_vifs.assert_not_called() @@ -341,11 +341,12 @@ class NeutronVIFPool(test_base.TestCase): port_id = str(uuid.uuid4()) port = mock.sentinel.port subnets = mock.sentinel.subnets + security_groups = 'test-sg' pod = get_pod_obj() m_driver._available_ports_pools = { - pool_key: collections.deque([port_id])} + pool_key: {tuple(security_groups): collections.deque([port_id])}} m_driver._existing_vifs = {port_id: port} m_get_port_name.return_value = get_pod_name(pod) @@ -362,7 +363,7 @@ class NeutronVIFPool(test_base.TestCase): m_driver._get_pool_size.return_value = pool_length self.assertEqual(port, cls._get_port_from_pool( - m_driver, pool_key, pod, subnets)) + m_driver, pool_key, pod, subnets, tuple(security_groups))) neutron.update_port.assert_called_once_with( port_id, @@ -386,11 +387,12 @@ class NeutronVIFPool(test_base.TestCase): port_id = str(uuid.uuid4()) port = mock.sentinel.port subnets = mock.sentinel.subnets + security_groups = 'test-sg' pod = get_pod_obj() m_driver._available_ports_pools = { - pool_key: collections.deque([port_id])} + pool_key: {tuple(security_groups): collections.deque([port_id])}} m_driver._existing_vifs = {port_id: port} m_get_port_name.return_value = get_pod_name(pod) @@ -404,7 +406,7 @@ class NeutronVIFPool(test_base.TestCase): m_driver._get_pool_size.return_value = pool_length self.assertEqual(port, cls._get_port_from_pool( - m_driver, pool_key, pod, subnets)) + m_driver, pool_key, pod, subnets, tuple(security_groups))) neutron.update_port.assert_called_once_with( port_id, @@ -424,11 +426,124 @@ class NeutronVIFPool(test_base.TestCase): pod = get_pod_obj() pool_key = mock.sentinel.pool_key subnets = mock.sentinel.subnets + security_groups = 'test-sg' - m_driver._available_ports_pools = {pool_key: collections.deque([])} + m_driver._available_ports_pools = { + pool_key: {tuple(security_groups): collections.deque([])}} + m_driver._last_update = {pool_key: {tuple(security_groups): 1}} self.assertRaises(exceptions.ResourceNotReady, cls._get_port_from_pool, - m_driver, pool_key, pod, subnets) + m_driver, pool_key, pod, subnets, + tuple(security_groups)) + + neutron.update_port.assert_not_called() + + @mock.patch('eventlet.spawn') + def test__get_port_from_pool_empty_pool_reuse(self, m_eventlet): + cls = vif_pool.NeutronVIFPool + m_driver = mock.MagicMock(spec=cls) + neutron = self.useFixture(k_fix.MockNeutronClient()).client + + pod = get_pod_obj() + port_id = str(uuid.uuid4()) + port = mock.sentinel.port + pool_key = mock.sentinel.pool_key + subnets = mock.sentinel.subnets + security_groups = 'test-sg' + security_groups_2 = 'test-sg2' + + oslo_cfg.CONF.set_override('port_debug', + False, + group='kubernetes') + pool_length = 5 + m_driver._get_pool_size.return_value = pool_length + + m_driver._available_ports_pools = { + pool_key: {tuple(security_groups): collections.deque([]), + tuple(security_groups_2): collections.deque([port_id])}} + m_driver._last_update = {pool_key: {tuple(security_groups): 1, + tuple(security_groups_2): 0}} + m_driver._existing_vifs = {port_id: port} + + self.assertEqual(port, cls._get_port_from_pool( + m_driver, pool_key, pod, subnets, tuple(security_groups))) + + neutron.update_port.assert_called_once_with( + port_id, + { + "port": { + 'security_groups': list(security_groups), + } + }) + m_eventlet.assert_not_called() + + @mock.patch('eventlet.spawn') + def test__get_port_from_pool_empty_pool_reuse_no_update_info(self, + m_eventlet): + cls = vif_pool.NeutronVIFPool + m_driver = mock.MagicMock(spec=cls) + neutron = self.useFixture(k_fix.MockNeutronClient()).client + + pod = get_pod_obj() + port_id = str(uuid.uuid4()) + port = mock.sentinel.port + pool_key = mock.sentinel.pool_key + subnets = mock.sentinel.subnets + security_groups = 'test-sg' + security_groups_2 = 'test-sg2' + + oslo_cfg.CONF.set_override('port_debug', + False, + group='kubernetes') + pool_length = 5 + m_driver._get_pool_size.return_value = pool_length + + m_driver._available_ports_pools = { + pool_key: {tuple(security_groups): collections.deque([]), + tuple(security_groups_2): collections.deque([port_id])}} + m_driver._last_update = {} + m_driver._existing_vifs = {port_id: port} + + self.assertEqual(port, cls._get_port_from_pool( + m_driver, pool_key, pod, subnets, tuple(security_groups))) + + neutron.update_port.assert_called_once_with( + port_id, + { + "port": { + 'security_groups': list(security_groups), + } + }) + m_eventlet.assert_not_called() + + def test__get_port_from_pool_empty_pool_reuse_no_ports(self): + cls = vif_pool.NeutronVIFPool + m_driver = mock.MagicMock(spec=cls) + neutron = self.useFixture(k_fix.MockNeutronClient()).client + + pod = get_pod_obj() + port_id = str(uuid.uuid4()) + port = mock.sentinel.port + pool_key = mock.sentinel.pool_key + subnets = mock.sentinel.subnets + security_groups = 'test-sg' + security_groups_2 = 'test-sg2' + + oslo_cfg.CONF.set_override('port_debug', + False, + group='kubernetes') + pool_length = 5 + m_driver._get_pool_size.return_value = pool_length + + m_driver._available_ports_pools = { + pool_key: {tuple(security_groups): collections.deque([]), + tuple(security_groups_2): collections.deque([])}} + m_driver._last_update = {} + m_driver._existing_vifs = {port_id: port} + + self.assertRaises(exceptions.ResourceNotReady, cls._get_port_from_pool, + m_driver, pool_key, pod, subnets, tuple( + security_groups)) neutron.update_port.assert_not_called() @@ -438,7 +553,7 @@ class NeutronVIFPool(test_base.TestCase): m_driver = mock.MagicMock(spec=cls) neutron = self.useFixture(k_fix.MockNeutronClient()).client - pool_key = ('node_ip', 'project_id', tuple(['security_group'])) + pool_key = ('node_ip', 'project_id') port_id = str(uuid.uuid4()) pool_length = 5 @@ -462,7 +577,6 @@ class NeutronVIFPool(test_base.TestCase): "port": { 'name': constants.KURYR_PORT_NAME, 'device_id': '', - 'security_groups': ['security_group'] } }) neutron.delete_port.assert_not_called() @@ -473,7 +587,7 @@ class NeutronVIFPool(test_base.TestCase): m_driver = mock.MagicMock(spec=cls) neutron = self.useFixture(k_fix.MockNeutronClient()).client - pool_key = ('node_ip', 'project_id', tuple(['security_group'])) + pool_key = ('node_ip', 'project_id') port_id = str(uuid.uuid4()) pool_length = 5 @@ -499,7 +613,7 @@ class NeutronVIFPool(test_base.TestCase): m_driver = mock.MagicMock(spec=cls) neutron = self.useFixture(k_fix.MockNeutronClient()).client - pool_key = ('node_ip', 'project_id', tuple(['security_group'])) + pool_key = ('node_ip', 'project_id') port_id = str(uuid.uuid4()) pool_length = 10 vif = mock.sentinel.vif @@ -524,7 +638,7 @@ class NeutronVIFPool(test_base.TestCase): m_driver = mock.MagicMock(spec=cls) neutron = self.useFixture(k_fix.MockNeutronClient()).client - pool_key = ('node_ip', 'project_id', tuple(['security_group'])) + pool_key = ('node_ip', 'project_id') port_id = str(uuid.uuid4()) pool_length = 5 @@ -552,7 +666,6 @@ class NeutronVIFPool(test_base.TestCase): "port": { 'name': constants.KURYR_PORT_NAME, 'device_id': '', - 'security_groups': ['security_group'] } }) neutron.delete_port.assert_not_called() @@ -562,7 +675,7 @@ class NeutronVIFPool(test_base.TestCase): m_driver = mock.MagicMock(spec=cls) neutron = self.useFixture(k_fix.MockNeutronClient()).client - pool_key = ('node_ip', 'project_id', tuple(['security_group'])) + pool_key = ('node_ip', 'project_id') port_id = str(uuid.uuid4()) pool_length = 10 vif = mock.sentinel.vif @@ -588,7 +701,7 @@ class NeutronVIFPool(test_base.TestCase): m_driver = mock.MagicMock(spec=cls) neutron = self.useFixture(k_fix.MockNeutronClient()).client - pool_key = ('node_ip', 'project_id', tuple(['security_group'])) + pool_key = ('node_ip', 'project_id') port_id = str(uuid.uuid4()) pool_length = 10 @@ -639,8 +752,7 @@ class NeutronVIFPool(test_base.TestCase): vif = mock.sentinel.vif m_to_osvif.return_value = vif - pool_key = (port['binding:host_id'], port['project_id'], - tuple(port['security_groups']), net_id) + pool_key = (port['binding:host_id'], port['project_id'], net_id) m_driver._get_pool_key.return_value = pool_key m_driver._get_trunks_info.return_value = ({}, {}, {}) @@ -652,7 +764,8 @@ class NeutronVIFPool(test_base.TestCase): m_to_osvif.assert_called_once_with(vif_plugin, port, subnet) self.assertEqual(m_driver._existing_vifs[port_id], vif) - self.assertEqual(m_driver._available_ports_pools[pool_key], [port_id]) + self.assertEqual(m_driver._available_ports_pools[pool_key], + {tuple(port['security_groups']): [port_id]}) @mock.patch('kuryr_kubernetes.os_vif_util.neutron_to_osvif_vif') @mock.patch('kuryr_kubernetes.utils.get_subnet') @@ -681,9 +794,10 @@ class NeutronVIFPool(test_base.TestCase): neutron = self.useFixture(k_fix.MockNeutronClient()).client net_id = mock.sentinel.net_id - pool_key = ('node_ip', 'project_id', tuple(['security_group'])) + pool_key = ('node_ip', 'project_id') port_id = str(uuid.uuid4()) - m_driver._available_ports_pools = {pool_key: [port_id]} + m_driver._available_ports_pools = {pool_key: { + tuple(['security_group']): [port_id]}} m_driver._existing_vifs = {port_id: mock.sentinel.vif} m_driver._get_pool_key_net.return_value = net_id @@ -701,9 +815,10 @@ class NeutronVIFPool(test_base.TestCase): neutron = self.useFixture(k_fix.MockNeutronClient()).client net_id = mock.sentinel.net_id - pool_key = ('node_ip', 'project_id', tuple(['security_group'])) + pool_key = ('node_ip', 'project_id') port_id = str(uuid.uuid4()) - m_driver._available_ports_pools = {pool_key: [port_id]} + m_driver._available_ports_pools = {pool_key: { + tuple(['security_group']): [port_id]}} m_driver._existing_vifs = {} neutron.delete_port.side_effect = n_exc.PortNotFoundClient @@ -765,11 +880,12 @@ class NestedVIFPool(test_base.TestCase): port_id = str(uuid.uuid4()) port = mock.sentinel.port subnets = mock.sentinel.subnets + security_groups = 'test-sg' pod = get_pod_obj() m_driver._available_ports_pools = { - pool_key: collections.deque([port_id])} + pool_key: {tuple(security_groups): collections.deque([port_id])}} m_driver._existing_vifs = {port_id: port} m_get_port_name.return_value = get_pod_name(pod) @@ -783,7 +899,7 @@ class NestedVIFPool(test_base.TestCase): m_driver._get_pool_size.return_value = pool_length self.assertEqual(port, cls._get_port_from_pool( - m_driver, pool_key, pod, subnets)) + m_driver, pool_key, pod, subnets, tuple(security_groups))) neutron.update_port.assert_called_once_with( port_id, @@ -806,11 +922,12 @@ class NestedVIFPool(test_base.TestCase): port_id = str(uuid.uuid4()) port = mock.sentinel.port subnets = mock.sentinel.subnets + security_groups = 'test-sg' pod = get_pod_obj() m_driver._available_ports_pools = { - pool_key: collections.deque([port_id])} + pool_key: {tuple(security_groups): collections.deque([port_id])}} m_driver._existing_vifs = {port_id: port} m_get_port_name.return_value = get_pod_name(pod) @@ -824,7 +941,7 @@ class NestedVIFPool(test_base.TestCase): m_driver._get_pool_size.return_value = pool_length self.assertEqual(port, cls._get_port_from_pool( - m_driver, pool_key, pod, subnets)) + m_driver, pool_key, pod, subnets, tuple(security_groups))) neutron.update_port.assert_called_once_with( port_id, @@ -843,11 +960,124 @@ class NestedVIFPool(test_base.TestCase): pod = mock.sentinel.pod pool_key = mock.sentinel.pool_key subnets = mock.sentinel.subnets + security_groups = 'test-sg' - m_driver._available_ports_pools = {pool_key: collections.deque([])} + m_driver._available_ports_pools = { + pool_key: {tuple(security_groups): collections.deque([])}} + m_driver._last_update = {pool_key: {tuple(security_groups): 1}} self.assertRaises(exceptions.ResourceNotReady, cls._get_port_from_pool, - m_driver, pool_key, pod, subnets) + m_driver, pool_key, pod, subnets, tuple( + security_groups)) + + neutron.update_port.assert_not_called() + + @mock.patch('eventlet.spawn') + def test__get_port_from_pool_empty_pool_reuse(self, m_eventlet): + cls = vif_pool.NestedVIFPool + m_driver = mock.MagicMock(spec=cls) + neutron = self.useFixture(k_fix.MockNeutronClient()).client + + pod = mock.sentinel.pod + port_id = str(uuid.uuid4()) + port = mock.sentinel.port + pool_key = mock.sentinel.pool_key + subnets = mock.sentinel.subnets + security_groups = 'test-sg' + security_groups_2 = 'test-sg2' + + oslo_cfg.CONF.set_override('port_debug', + False, + group='kubernetes') + pool_length = 5 + m_driver._get_pool_size.return_value = pool_length + + m_driver._available_ports_pools = { + pool_key: {tuple(security_groups): collections.deque([]), + tuple(security_groups_2): collections.deque([port_id])}} + m_driver._last_update = {pool_key: {tuple(security_groups): 1, + tuple(security_groups_2): 0}} + m_driver._existing_vifs = {port_id: port} + + self.assertEqual(port, cls._get_port_from_pool( + m_driver, pool_key, pod, subnets, tuple(security_groups))) + + neutron.update_port.assert_called_once_with( + port_id, + { + "port": { + 'security_groups': list(security_groups), + } + }) + m_eventlet.assert_not_called() + + @mock.patch('eventlet.spawn') + def test__get_port_from_pool_empty_pool_reuse_no_update_info(self, + m_eventlet): + cls = vif_pool.NestedVIFPool + m_driver = mock.MagicMock(spec=cls) + neutron = self.useFixture(k_fix.MockNeutronClient()).client + + pod = mock.sentinel.pod + port_id = str(uuid.uuid4()) + port = mock.sentinel.port + pool_key = mock.sentinel.pool_key + subnets = mock.sentinel.subnets + security_groups = 'test-sg' + security_groups_2 = 'test-sg2' + + oslo_cfg.CONF.set_override('port_debug', + False, + group='kubernetes') + pool_length = 5 + m_driver._get_pool_size.return_value = pool_length + + m_driver._available_ports_pools = { + pool_key: {tuple(security_groups): collections.deque([]), + tuple(security_groups_2): collections.deque([port_id])}} + m_driver._last_update = {} + m_driver._existing_vifs = {port_id: port} + + self.assertEqual(port, cls._get_port_from_pool( + m_driver, pool_key, pod, subnets, tuple(security_groups))) + + neutron.update_port.assert_called_once_with( + port_id, + { + "port": { + 'security_groups': list(security_groups), + } + }) + m_eventlet.assert_not_called() + + def test__get_port_from_pool_empty_pool_reuse_no_ports(self): + cls = vif_pool.NestedVIFPool + m_driver = mock.MagicMock(spec=cls) + neutron = self.useFixture(k_fix.MockNeutronClient()).client + + pod = mock.sentinel.pod + port_id = str(uuid.uuid4()) + port = mock.sentinel.port + pool_key = mock.sentinel.pool_key + subnets = mock.sentinel.subnets + security_groups = 'test-sg' + security_groups_2 = 'test-sg2' + + oslo_cfg.CONF.set_override('port_debug', + False, + group='kubernetes') + pool_length = 5 + m_driver._get_pool_size.return_value = pool_length + + m_driver._available_ports_pools = { + pool_key: {tuple(security_groups): collections.deque([]), + tuple(security_groups_2): collections.deque([])}} + m_driver._last_update = {} + m_driver._existing_vifs = {port_id: port} + + self.assertRaises(exceptions.ResourceNotReady, cls._get_port_from_pool, + m_driver, pool_key, pod, subnets, tuple( + security_groups)) neutron.update_port.assert_not_called() @@ -857,7 +1087,7 @@ class NestedVIFPool(test_base.TestCase): m_driver = mock.MagicMock(spec=cls) neutron = self.useFixture(k_fix.MockNeutronClient()).client - pool_key = ('node_ip', 'project_id', tuple(['security_group'])) + pool_key = ('node_ip', 'project_id') port_id = str(uuid.uuid4()) pool_length = 5 @@ -880,7 +1110,6 @@ class NestedVIFPool(test_base.TestCase): { "port": { 'name': constants.KURYR_PORT_NAME, - 'security_groups': ['security_group'] } }) neutron.delete_port.assert_not_called() @@ -891,7 +1120,7 @@ class NestedVIFPool(test_base.TestCase): m_driver = mock.MagicMock(spec=cls) neutron = self.useFixture(k_fix.MockNeutronClient()).client - pool_key = ('node_ip', 'project_id', tuple(['security_group'])) + pool_key = ('node_ip', 'project_id') port_id = str(uuid.uuid4()) pool_length = 5 @@ -920,7 +1149,7 @@ class NestedVIFPool(test_base.TestCase): vif_driver = mock.MagicMock(spec=cls_vif_driver) m_driver._drv_vif = vif_driver - pool_key = ('node_ip', 'project_id', tuple(['security_group'])) + pool_key = ('node_ip', 'project_id') port_id = str(uuid.uuid4()) pool_length = 10 vif = mock.MagicMock() @@ -953,7 +1182,7 @@ class NestedVIFPool(test_base.TestCase): m_driver = mock.MagicMock(spec=cls) neutron = self.useFixture(k_fix.MockNeutronClient()).client - pool_key = ('node_ip', 'project_id', tuple(['security_group'])) + pool_key = ('node_ip', 'project_id') port_id = str(uuid.uuid4()) pool_length = 5 @@ -977,7 +1206,6 @@ class NestedVIFPool(test_base.TestCase): { "port": { 'name': constants.KURYR_PORT_NAME, - 'security_groups': ['security_group'] } }) neutron.delete_port.assert_not_called() @@ -990,7 +1218,7 @@ class NestedVIFPool(test_base.TestCase): vif_driver = mock.MagicMock(spec=cls_vif_driver) m_driver._drv_vif = vif_driver - pool_key = ('node_ip', 'project_id', tuple(['security_group'])) + pool_key = ('node_ip', 'project_id') port_id = str(uuid.uuid4()) pool_length = 10 vif = mock.MagicMock() @@ -1027,7 +1255,7 @@ class NestedVIFPool(test_base.TestCase): vif_driver = mock.MagicMock(spec=cls_vif_driver) m_driver._drv_vif = vif_driver - pool_key = ('node_ip', 'project_id', tuple(['security_group'])) + pool_key = ('node_ip', 'project_id') port_id = str(uuid.uuid4()) pool_length = 10 trunk_id = str(uuid.uuid4()) @@ -1163,15 +1391,15 @@ class NestedVIFPool(test_base.TestCase): vif = mock.sentinel.vif m_to_osvif.return_value = vif - pool_key = (port['binding:host_id'], port['project_id'], - tuple(port['security_groups']), net_id) + pool_key = (port['binding:host_id'], port['project_id'], net_id) m_driver._get_pool_key.return_value = pool_key cls._precreated_ports(m_driver, 'recover') m_driver._get_trunks_info.assert_called_once() self.assertEqual(m_driver._existing_vifs[port_id], vif) - self.assertEqual(m_driver._available_ports_pools[pool_key], [port_id]) + self.assertEqual(m_driver._available_ports_pools[pool_key], + {tuple(port['security_groups']): [port_id]}) neutron.delete_port.assert_not_called() def test__precreated_ports_free(self): @@ -1200,10 +1428,10 @@ class NestedVIFPool(test_base.TestCase): m_driver._get_trunks_info.return_value = (p_ports, a_subports, subnets) - pool_key = (port['binding:host_id'], port['project_id'], - tuple(port['security_groups']), net_id) + pool_key = (port['binding:host_id'], port['project_id'], net_id) m_driver._get_pool_key.return_value = pool_key - m_driver._available_ports_pools = {pool_key: [port_id]} + m_driver._available_ports_pools = { + pool_key: {tuple(port['security_groups']): [port_id]}} m_driver._existing_vifs = {port_id: mock.sentinel.vif} cls._precreated_ports(m_driver, 'free') @@ -1214,7 +1442,8 @@ class NestedVIFPool(test_base.TestCase): m_driver._drv_vif._release_vlan_id.assert_called_once() self.assertEqual(m_driver._existing_vifs, {}) - self.assertEqual(m_driver._available_ports_pools[pool_key], []) + self.assertEqual(m_driver._available_ports_pools[pool_key][tuple( + port['security_groups'])], []) @mock.patch('kuryr_kubernetes.os_vif_util.' 'neutron_to_osvif_vif_nested_vlan') @@ -1302,8 +1531,7 @@ class NestedVIFPool(test_base.TestCase): vif = mock.sentinel.vif m_to_osvif.return_value = vif - pool_key = (port1['binding:host_id'], port1['project_id'], - tuple(port1['security_groups']), net_id) + pool_key = (port1['binding:host_id'], port1['project_id'], net_id) m_driver._get_pool_key.return_value = pool_key cls._precreated_ports(m_driver, 'recover') @@ -1311,7 +1539,8 @@ class NestedVIFPool(test_base.TestCase): self.assertEqual(m_driver._existing_vifs, {port_id1: vif, port_id2: vif}) self.assertEqual(m_driver._available_ports_pools[pool_key], - [port_id1, port_id2]) + {tuple(port1['security_groups']): [port_id1, + port_id2]}) neutron.delete_port.assert_not_called() @ddt.data(('recover'), ('free')) @@ -1382,13 +1611,14 @@ class NestedVIFPool(test_base.TestCase): neutron = self.useFixture(k_fix.MockNeutronClient()).client net_id = mock.sentinel.net_id - pool_key = ('node_ip', 'project_id', tuple(['security_group'])) + pool_key = ('node_ip', 'project_id') port_id = str(uuid.uuid4()) trunk_id = str(uuid.uuid4()) vif = mock.MagicMock() vlan_id = mock.sentinel.vlan_id vif.vlan_id = vlan_id - m_driver._available_ports_pools = {pool_key: [port_id]} + m_driver._available_ports_pools = {pool_key: { + tuple(['security_group']): [port_id]}} m_driver._existing_vifs = {port_id: vif} m_driver._get_trunk_id.return_value = trunk_id @@ -1415,13 +1645,14 @@ class NestedVIFPool(test_base.TestCase): neutron = self.useFixture(k_fix.MockNeutronClient()).client net_id = mock.sentinel.net_id - pool_key = ('node_ip', 'project_id', tuple(['security_group'])) + pool_key = ('node_ip', 'project_id') port_id = str(uuid.uuid4()) trunk_id = str(uuid.uuid4()) vif = mock.MagicMock() vlan_id = mock.sentinel.vlan_id vif.vlan_id = vlan_id - m_driver._available_ports_pools = {pool_key: [port_id]} + m_driver._available_ports_pools = {pool_key: { + tuple(['security_group']): [port_id]}} m_driver._existing_vifs = {port_id: vif} m_driver._get_trunk_id.return_value = trunk_id @@ -1450,13 +1681,14 @@ class NestedVIFPool(test_base.TestCase): neutron = self.useFixture(k_fix.MockNeutronClient()).client net_id = mock.sentinel.net_id - pool_key = ('node_ip', 'project_id', tuple(['security_group'])) + pool_key = ('node_ip', 'project_id') port_id = str(uuid.uuid4()) trunk_id = str(uuid.uuid4()) vif = mock.MagicMock() vlan_id = mock.sentinel.vlan_id vif.vlan_id = vlan_id - m_driver._available_ports_pools = {pool_key: [port_id]} + m_driver._available_ports_pools = {pool_key: { + tuple(['security_group']): [port_id]}} m_driver._existing_vifs = {} m_driver._get_trunk_id.return_value = trunk_id