diff --git a/kuryr_kubernetes/controller/drivers/vif_pool.py b/kuryr_kubernetes/controller/drivers/vif_pool.py index 2e594fd37..7d708dd0b 100644 --- a/kuryr_kubernetes/controller/drivers/vif_pool.py +++ b/kuryr_kubernetes/controller/drivers/vif_pool.py @@ -167,23 +167,25 @@ class BaseVIFPool(base.VIFPoolDriver): def update_vif_sgs(self, pod, sgs): self._drv_vif.update_vif_sgs(pod, sgs) - def _get_pool_size(self, pool_key=None): - return len(self._available_ports_pools.get(pool_key, [])) + def _get_pool_size(self, pool_key): + pool = self._available_ports_pools.get(pool_key, {}) + pool_members = [] + for port_list in pool.values(): + pool_members.extend(port_list) + return len(pool_members) def _get_host_addr(self, pod): return pod['status']['hostIP'] - def _get_pool_key(self, host, project_id, security_groups, net_id=None, - subnets=None): + def _get_pool_key(self, host, project_id, net_id=None, subnets=None): if not net_id and subnets: net_obj = list(subnets.values())[0] net_id = net_obj.id - pool_key = (host, project_id, tuple(sorted(security_groups)), - net_id) + pool_key = (host, project_id, net_id) return pool_key def _get_pool_key_net(self, pool_key): - return pool_key[3] + return pool_key[2] def request_vif(self, pod, project_id, subnets, security_groups): try: @@ -191,33 +193,37 @@ class BaseVIFPool(base.VIFPoolDriver): except KeyError: LOG.warning("Pod has not been scheduled yet.") raise - pool_key = self._get_pool_key(host_addr, project_id, security_groups, - None, subnets) + pool_key = self._get_pool_key(host_addr, project_id, None, subnets) try: - return self._get_port_from_pool(pool_key, pod, subnets) + return self._get_port_from_pool(pool_key, pod, subnets, + tuple(sorted(security_groups))) except exceptions.ResourceNotReady: LOG.warning("Ports pool does not have available ports!") - eventlet.spawn(self._populate_pool, pool_key, pod, subnets) + eventlet.spawn(self._populate_pool, pool_key, pod, subnets, + tuple(sorted(security_groups))) raise - def _get_port_from_pool(self, pool_key, pod, subnets): + def _get_port_from_pool(self, pool_key, pod, subnets, security_groups): raise NotImplementedError() - def _populate_pool(self, pool_key, pod, subnets): + def _populate_pool(self, pool_key, pod, subnets, security_groups): # REVISIT(ltomasbo): Drop the subnets parameter and get the information # from the pool_key, which will be required when multi-network is # supported now = time.time() - try: - if (now - oslo_cfg.CONF.vif_pool.ports_pool_update_frequency < - self._last_update.get(pool_key, 0)): - LOG.info("Not enough time since the last pool update") + pool_updates = self._last_update.get(pool_key) + if pool_updates: + last_update = pool_updates.get(security_groups, 0) + try: + if (now - oslo_cfg.CONF.vif_pool.ports_pool_update_frequency < + last_update): + LOG.info("Not enough time since the last pool update") + return + except AttributeError: + LOG.info("Kuryr-controller not yet ready to populate pools") return - except AttributeError: - LOG.info("Kuryr-controller not yet ready to populate pools") - return - self._last_update[pool_key] = now + self._last_update[pool_key] = {security_groups: now} pool_size = self._get_pool_size(pool_key) if pool_size < oslo_cfg.CONF.vif_pool.ports_pool_min: @@ -227,18 +233,19 @@ class BaseVIFPool(base.VIFPoolDriver): pod=pod, project_id=pool_key[1], subnets=subnets, - security_groups=list(pool_key[2]), + security_groups=security_groups, num_ports=num_ports) for vif in vifs: self._existing_vifs[vif.id] = vif - self._available_ports_pools.setdefault(pool_key, - []).append(vif.id) + self._available_ports_pools.setdefault( + pool_key, {}).setdefault( + security_groups, []).append(vif.id) def release_vif(self, pod, vif, project_id, security_groups): host_addr = self._get_host_addr(pod) - pool_key = self._get_pool_key(host_addr, project_id, security_groups, - vif.network.id, None) + pool_key = self._get_pool_key(host_addr, project_id, vif.network.id, + None) try: if not self._existing_vifs.get(vif.id): @@ -300,12 +307,10 @@ class BaseVIFPool(base.VIFPoolDriver): @lockutils.synchronized('return_to_pool_baremetal') @lockutils.synchronized('return_to_pool_nested') def sync_pools(self): - self._available_ports_pools = collections.defaultdict( - collections.deque) - self._existing_vifs = collections.defaultdict(collections.defaultdict) - self._recyclable_ports = collections.defaultdict( - collections.defaultdict) - self._last_update = collections.defaultdict(collections.defaultdict) + self._available_ports_pools = collections.defaultdict() + self._existing_vifs = collections.defaultdict() + self._recyclable_ports = collections.defaultdict() + self._last_update = collections.defaultdict() # NOTE(ltomasbo): Ensure previously created ports are recovered into # their respective pools self._recover_precreated_ports() @@ -377,11 +382,45 @@ class NeutronVIFPool(BaseVIFPool): def _get_host_addr(self, pod): return pod['spec']['nodeName'] - def _get_port_from_pool(self, pool_key, pod, subnets): + def _get_port_from_pool(self, pool_key, pod, subnets, security_groups): try: - port_id = self._available_ports_pools[pool_key].pop() - except (IndexError, AttributeError): + pool_ports = self._available_ports_pools[pool_key] + except (KeyError, AttributeError): raise exceptions.ResourceNotReady(pod) + try: + port_id = pool_ports[security_groups].pop() + except (KeyError, IndexError): + # Get another port from the pool and update the SG to the + # appropriate one. It uses a port from the group that was updated + # longer ago + pool_updates = self._last_update.get(pool_key, {}) + if not pool_updates: + # No pools update info. Selecting a random one + for sg_group, ports in pool_ports.items(): + if len(ports) > 0: + port_id = pool_ports[sg_group].pop() + break + else: + raise exceptions.ResourceNotReady(pod) + else: + min_date = -1 + for sg_group, date in pool_updates.items(): + if pool_ports.get(sg_group): + if min_date == -1 or date < min_date: + min_date = date + min_sg_group = sg_group + if min_date == -1: + # pool is empty, no port to reuse + raise exceptions.ResourceNotReady(pod) + port_id = pool_ports[min_sg_group].pop() + neutron = clients.get_neutron_client() + neutron.update_port( + port_id, + { + "port": { + 'security_groups': list(security_groups) + } + }) if config.CONF.kubernetes.port_debug: neutron = clients.get_neutron_client() neutron.update_port( @@ -395,7 +434,8 @@ class NeutronVIFPool(BaseVIFPool): # check if the pool needs to be populated if (self._get_pool_size(pool_key) < oslo_cfg.CONF.vif_pool.ports_pool_min): - eventlet.spawn(self._populate_pool, pool_key, pod, subnets) + eventlet.spawn(self._populate_pool, pool_key, pod, subnets, + security_groups) return self._existing_vifs[port_id] def _return_ports_to_pool(self): @@ -426,7 +466,8 @@ class NeutronVIFPool(BaseVIFPool): device_owner=kl_const.DEVICE_OWNER) for port in kuryr_ports: if port['id'] in self._recyclable_ports: - sg_current[port['id']] = port['security_groups'] + sg_current[port['id']] = tuple(sorted( + port['security_groups'])) for port_id, pool_key in self._recyclable_ports.copy().items(): if (not oslo_cfg.CONF.vif_pool.ports_pool_max or @@ -435,25 +476,24 @@ class NeutronVIFPool(BaseVIFPool): port_name = (constants.KURYR_PORT_NAME if config.CONF.kubernetes.port_debug else '') - if (config.CONF.kubernetes.port_debug or - list(pool_key[2]) != sg_current.get(port_id)): + if config.CONF.kubernetes.port_debug: try: neutron.update_port( port_id, { "port": { 'name': port_name, - 'device_id': '', - 'security_groups': list(pool_key[2]) + 'device_id': '' } }) except n_exc.NeutronClientException: - LOG.warning("Error preparing port %s to be " + LOG.warning("Error changing name for port %s to be " "reused, put back on the cleanable " "pool.", port_id) continue self._available_ports_pools.setdefault( - pool_key, []).append(port_id) + pool_key, {}).setdefault( + sg_current.get(port_id), []).append(port_id) else: try: del self._existing_vifs[port_id] @@ -502,12 +542,13 @@ class NeutronVIFPool(BaseVIFPool): net_obj = subnet[subnet_id] pool_key = self._get_pool_key(port_host, port['project_id'], - port['security_groups'], net_obj.id, None) self._existing_vifs[port['id']] = vif self._available_ports_pools.setdefault( - pool_key, []).append(port['id']) + pool_key, {}).setdefault( + tuple(sorted(port['security_groups'])), []).append( + port['id']) LOG.info("PORTS POOL: pools updated with pre-created ports") self._create_healthcheck_file() @@ -524,10 +565,13 @@ class NeutronVIFPool(BaseVIFPool): # on the available_ports_pools dict. The next call forces it to be on # that dict before cleaning it up self._trigger_return_to_pool() - for pool_key, ports_id in self._available_ports_pools.items(): + for pool_key, ports in self._available_ports_pools.items(): if self._get_pool_key_net(pool_key) != net_id: continue self._available_ports_pools[pool_key] = [] + ports_id = [] + for sg_ports in ports.values(): + ports_id.extend(sg_ports) for port_id in ports_id: try: del self._existing_vifs[port_id] @@ -560,11 +604,45 @@ class NestedVIFPool(BaseVIFPool): def set_vif_driver(self, driver): self._drv_vif = driver - def _get_port_from_pool(self, pool_key, pod, subnets): + def _get_port_from_pool(self, pool_key, pod, subnets, security_groups): try: - port_id = self._available_ports_pools[pool_key].pop() - except (IndexError, AttributeError): + pool_ports = self._available_ports_pools[pool_key] + except (KeyError, AttributeError): raise exceptions.ResourceNotReady(pod) + try: + port_id = pool_ports[security_groups].pop() + except (KeyError, IndexError): + # Get another port from the pool and update the SG to the + # appropriate one. It uses a port from the group that was updated + # longer ago + pool_updates = self._last_update.get(pool_key, {}) + if not pool_updates: + # No pools update info. Selecting a random one + for sg_group, ports in pool_ports.items(): + if len(ports) > 0: + port_id = pool_ports[sg_group].pop() + break + else: + raise exceptions.ResourceNotReady(pod) + else: + min_date = -1 + for sg_group, date in pool_updates.items(): + if pool_ports.get(sg_group): + if min_date == -1 or date < min_date: + min_date = date + min_sg_group = sg_group + if min_date == -1: + # pool is empty, no port to reuse + raise exceptions.ResourceNotReady(pod) + port_id = pool_ports[min_sg_group].pop() + neutron = clients.get_neutron_client() + neutron.update_port( + port_id, + { + "port": { + 'security_groups': list(security_groups) + } + }) if config.CONF.kubernetes.port_debug: neutron = clients.get_neutron_client() neutron.update_port( @@ -577,7 +655,8 @@ class NestedVIFPool(BaseVIFPool): # check if the pool needs to be populated if (self._get_pool_size(pool_key) < oslo_cfg.CONF.vif_pool.ports_pool_min): - eventlet.spawn(self._populate_pool, pool_key, pod, subnets) + eventlet.spawn(self._populate_pool, pool_key, pod, subnets, + security_groups) return self._existing_vifs[port_id] def _return_ports_to_pool(self): @@ -608,7 +687,8 @@ class NestedVIFPool(BaseVIFPool): device_owner=['trunk:subport', kl_const.DEVICE_OWNER]) for subport in kuryr_subports: if subport['id'] in self._recyclable_ports: - sg_current[subport['id']] = subport['security_groups'] + sg_current[subport['id']] = tuple(sorted( + subport['security_groups'])) for port_id, pool_key in self._recyclable_ports.copy().items(): if (not oslo_cfg.CONF.vif_pool.ports_pool_max or @@ -617,24 +697,23 @@ class NestedVIFPool(BaseVIFPool): port_name = (constants.KURYR_PORT_NAME if config.CONF.kubernetes.port_debug else '') - if (config.CONF.kubernetes.port_debug or - list(pool_key[2]) != sg_current.get(port_id)): + if config.CONF.kubernetes.port_debug: try: neutron.update_port( port_id, { "port": { 'name': port_name, - 'security_groups': list(pool_key[2]) } }) except n_exc.NeutronClientException: - LOG.warning("Error preparing port %s to be " + LOG.warning("Error changing name for port %s to be " "reused, put back on the cleanable " "pool.", port_id) continue self._available_ports_pools.setdefault( - pool_key, []).append(port_id) + pool_key, {}).setdefault( + sg_current.get(port_id), []).append(port_id) else: trunk_id = self._get_trunk_id(neutron, pool_key) try: @@ -713,8 +792,6 @@ class NestedVIFPool(BaseVIFPool): net_obj = subnet[subnet_id] pool_key = self._get_pool_key(host_addr, kuryr_subport['project_id'], - kuryr_subport[ - 'security_groups'], net_obj.id, None) if action == 'recover': @@ -723,7 +800,9 @@ class NestedVIFPool(BaseVIFPool): self._existing_vifs[kuryr_subport['id']] = vif self._available_ports_pools.setdefault( - pool_key, []).append(kuryr_subport['id']) + pool_key, {}).setdefault(tuple(sorted( + kuryr_subport['security_groups'])), + []).append(kuryr_subport['id']) elif action == 'free': try: @@ -733,8 +812,9 @@ class NestedVIFPool(BaseVIFPool): self._drv_vif._release_vlan_id( subport['segmentation_id']) del self._existing_vifs[kuryr_subport['id']] - self._available_ports_pools[pool_key].remove( - kuryr_subport['id']) + self._available_ports_pools[pool_key][ + tuple(sorted(kuryr_subport['security_groups'] + ))].remove(kuryr_subport['id']) except n_exc.PortNotFoundClient: LOG.debug('Unable to release port %s as it no ' 'longer exists.', kuryr_subport['id']) @@ -764,12 +844,11 @@ class NestedVIFPool(BaseVIFPool): num_ports=num_ports, trunk_ip=trunk_ip) - pool_key = self._get_pool_key(trunk_ip, project_id, security_groups, - None, subnets) + pool_key = self._get_pool_key(trunk_ip, project_id, None, subnets) for vif in vifs: self._existing_vifs[vif.id] = vif - self._available_ports_pools.setdefault(pool_key, - []).append(vif.id) + self._available_ports_pools.setdefault(pool_key, {}).setdefault( + tuple(sorted(security_groups)), []).append(vif.id) def free_pool(self, trunk_ips=None): """Removes subports from the pool and deletes neutron port resource. @@ -791,19 +870,21 @@ class NestedVIFPool(BaseVIFPool): # on the available_ports_pools dict. The next call forces it to be on # that dict before cleaning it up self._trigger_return_to_pool() - for pool_key, ports_ids in self._available_ports_pools.items(): + for pool_key, ports in self._available_ports_pools.items(): if self._get_pool_key_net(pool_key) != net_id: continue self._available_ports_pools[pool_key] = [] trunk_id = self._get_trunk_id(neutron, pool_key) + ports_id = [p_id for sg_ports in ports.values() + for p_id in sg_ports] try: - self._drv_vif._remove_subports(neutron, trunk_id, ports_ids) + self._drv_vif._remove_subports(neutron, trunk_id, ports_id) except n_exc.NeutronClientException: LOG.exception('Error removing subports from trunk: %s', trunk_id) continue - for port_id in ports_ids: + for port_id in ports_id: try: self._drv_vif._release_vlan_id( self._existing_vifs[port_id].vlan_id) diff --git a/kuryr_kubernetes/tests/unit/controller/drivers/test_vif_pool.py b/kuryr_kubernetes/tests/unit/controller/drivers/test_vif_pool.py index 8885dde35..ec1173371 100644 --- a/kuryr_kubernetes/tests/unit/controller/drivers/test_vif_pool.py +++ b/kuryr_kubernetes/tests/unit/controller/drivers/test_vif_pool.py @@ -187,15 +187,14 @@ class BaseVIFPool(test_base.TestCase): pod = mock.sentinel.pod project_id = str(uuid.uuid4()) subnets = mock.sentinel.subnets - security_groups = [mock.sentinel.security_groups] - pool_key = (mock.sentinel.host_addr, project_id, - tuple(security_groups)) + security_groups = 'test-sg' + pool_key = (mock.sentinel.host_addr, project_id) vif = osv_vif.VIFOpenVSwitch(id='0fa0e837-d34e-4580-a6c4-04f5f607d93e') vifs = [vif] m_driver._existing_vifs = {} m_driver._available_ports_pools = {} - m_driver._last_update = {pool_key: 1} + m_driver._last_update = {pool_key: {tuple(security_groups): 1}} oslo_cfg.CONF.set_override('ports_pool_min', 5, @@ -206,7 +205,8 @@ class BaseVIFPool(test_base.TestCase): m_driver._get_pool_size.return_value = 2 vif_driver.request_vifs.return_value = vifs - cls._populate_pool(m_driver, pool_key, pod, subnets) + cls._populate_pool(m_driver, pool_key, pod, subnets, + tuple(security_groups)) m_driver._get_pool_size.assert_called_once() m_driver._drv_vif.request_vifs.assert_called_once() @@ -218,16 +218,16 @@ class BaseVIFPool(test_base.TestCase): pod = mock.sentinel.pod project_id = str(uuid.uuid4()) subnets = mock.sentinel.subnets - security_groups = [mock.sentinel.security_groups] - pool_key = (mock.sentinel.host_addr, project_id, - tuple(security_groups)) + security_groups = 'test-sg' + pool_key = (mock.sentinel.host_addr, project_id) oslo_cfg.CONF.set_override('ports_pool_update_frequency', 15, group='vif_pool') - m_driver._last_update = {pool_key: 1} + m_driver._last_update = {pool_key: {tuple(security_groups): 1}} - cls._populate_pool(m_driver, pool_key, pod, subnets) + cls._populate_pool(m_driver, pool_key, pod, subnets, + tuple(security_groups)) m_driver._get_pool_size.assert_not_called() @mock.patch('time.time', return_value=50) @@ -244,9 +244,8 @@ class BaseVIFPool(test_base.TestCase): pod = mock.sentinel.pod project_id = str(uuid.uuid4()) subnets = mock.sentinel.subnets - security_groups = [mock.sentinel.security_groups] - pool_key = (mock.sentinel.host_addr, project_id, - tuple(security_groups)) + security_groups = 'test-sg' + pool_key = (mock.sentinel.host_addr, project_id) oslo_cfg.CONF.set_override('ports_pool_update_frequency', 15, @@ -254,10 +253,11 @@ class BaseVIFPool(test_base.TestCase): oslo_cfg.CONF.set_override('ports_pool_min', 5, group='vif_pool') - m_driver._last_update = {pool_key: 1} + m_driver._last_update = {pool_key: {tuple(security_groups): 1}} m_driver._get_pool_size.return_value = 10 - cls._populate_pool(m_driver, pool_key, pod, subnets) + cls._populate_pool(m_driver, pool_key, pod, subnets, + tuple(security_groups)) m_driver._get_pool_size.assert_called_once() m_driver._drv_vif.request_vifs.assert_not_called() @@ -341,11 +341,12 @@ class NeutronVIFPool(test_base.TestCase): port_id = str(uuid.uuid4()) port = mock.sentinel.port subnets = mock.sentinel.subnets + security_groups = 'test-sg' pod = get_pod_obj() m_driver._available_ports_pools = { - pool_key: collections.deque([port_id])} + pool_key: {tuple(security_groups): collections.deque([port_id])}} m_driver._existing_vifs = {port_id: port} m_get_port_name.return_value = get_pod_name(pod) @@ -362,7 +363,7 @@ class NeutronVIFPool(test_base.TestCase): m_driver._get_pool_size.return_value = pool_length self.assertEqual(port, cls._get_port_from_pool( - m_driver, pool_key, pod, subnets)) + m_driver, pool_key, pod, subnets, tuple(security_groups))) neutron.update_port.assert_called_once_with( port_id, @@ -386,11 +387,12 @@ class NeutronVIFPool(test_base.TestCase): port_id = str(uuid.uuid4()) port = mock.sentinel.port subnets = mock.sentinel.subnets + security_groups = 'test-sg' pod = get_pod_obj() m_driver._available_ports_pools = { - pool_key: collections.deque([port_id])} + pool_key: {tuple(security_groups): collections.deque([port_id])}} m_driver._existing_vifs = {port_id: port} m_get_port_name.return_value = get_pod_name(pod) @@ -404,7 +406,7 @@ class NeutronVIFPool(test_base.TestCase): m_driver._get_pool_size.return_value = pool_length self.assertEqual(port, cls._get_port_from_pool( - m_driver, pool_key, pod, subnets)) + m_driver, pool_key, pod, subnets, tuple(security_groups))) neutron.update_port.assert_called_once_with( port_id, @@ -424,11 +426,124 @@ class NeutronVIFPool(test_base.TestCase): pod = get_pod_obj() pool_key = mock.sentinel.pool_key subnets = mock.sentinel.subnets + security_groups = 'test-sg' - m_driver._available_ports_pools = {pool_key: collections.deque([])} + m_driver._available_ports_pools = { + pool_key: {tuple(security_groups): collections.deque([])}} + m_driver._last_update = {pool_key: {tuple(security_groups): 1}} self.assertRaises(exceptions.ResourceNotReady, cls._get_port_from_pool, - m_driver, pool_key, pod, subnets) + m_driver, pool_key, pod, subnets, + tuple(security_groups)) + + neutron.update_port.assert_not_called() + + @mock.patch('eventlet.spawn') + def test__get_port_from_pool_empty_pool_reuse(self, m_eventlet): + cls = vif_pool.NeutronVIFPool + m_driver = mock.MagicMock(spec=cls) + neutron = self.useFixture(k_fix.MockNeutronClient()).client + + pod = get_pod_obj() + port_id = str(uuid.uuid4()) + port = mock.sentinel.port + pool_key = mock.sentinel.pool_key + subnets = mock.sentinel.subnets + security_groups = 'test-sg' + security_groups_2 = 'test-sg2' + + oslo_cfg.CONF.set_override('port_debug', + False, + group='kubernetes') + pool_length = 5 + m_driver._get_pool_size.return_value = pool_length + + m_driver._available_ports_pools = { + pool_key: {tuple(security_groups): collections.deque([]), + tuple(security_groups_2): collections.deque([port_id])}} + m_driver._last_update = {pool_key: {tuple(security_groups): 1, + tuple(security_groups_2): 0}} + m_driver._existing_vifs = {port_id: port} + + self.assertEqual(port, cls._get_port_from_pool( + m_driver, pool_key, pod, subnets, tuple(security_groups))) + + neutron.update_port.assert_called_once_with( + port_id, + { + "port": { + 'security_groups': list(security_groups), + } + }) + m_eventlet.assert_not_called() + + @mock.patch('eventlet.spawn') + def test__get_port_from_pool_empty_pool_reuse_no_update_info(self, + m_eventlet): + cls = vif_pool.NeutronVIFPool + m_driver = mock.MagicMock(spec=cls) + neutron = self.useFixture(k_fix.MockNeutronClient()).client + + pod = get_pod_obj() + port_id = str(uuid.uuid4()) + port = mock.sentinel.port + pool_key = mock.sentinel.pool_key + subnets = mock.sentinel.subnets + security_groups = 'test-sg' + security_groups_2 = 'test-sg2' + + oslo_cfg.CONF.set_override('port_debug', + False, + group='kubernetes') + pool_length = 5 + m_driver._get_pool_size.return_value = pool_length + + m_driver._available_ports_pools = { + pool_key: {tuple(security_groups): collections.deque([]), + tuple(security_groups_2): collections.deque([port_id])}} + m_driver._last_update = {} + m_driver._existing_vifs = {port_id: port} + + self.assertEqual(port, cls._get_port_from_pool( + m_driver, pool_key, pod, subnets, tuple(security_groups))) + + neutron.update_port.assert_called_once_with( + port_id, + { + "port": { + 'security_groups': list(security_groups), + } + }) + m_eventlet.assert_not_called() + + def test__get_port_from_pool_empty_pool_reuse_no_ports(self): + cls = vif_pool.NeutronVIFPool + m_driver = mock.MagicMock(spec=cls) + neutron = self.useFixture(k_fix.MockNeutronClient()).client + + pod = get_pod_obj() + port_id = str(uuid.uuid4()) + port = mock.sentinel.port + pool_key = mock.sentinel.pool_key + subnets = mock.sentinel.subnets + security_groups = 'test-sg' + security_groups_2 = 'test-sg2' + + oslo_cfg.CONF.set_override('port_debug', + False, + group='kubernetes') + pool_length = 5 + m_driver._get_pool_size.return_value = pool_length + + m_driver._available_ports_pools = { + pool_key: {tuple(security_groups): collections.deque([]), + tuple(security_groups_2): collections.deque([])}} + m_driver._last_update = {} + m_driver._existing_vifs = {port_id: port} + + self.assertRaises(exceptions.ResourceNotReady, cls._get_port_from_pool, + m_driver, pool_key, pod, subnets, tuple( + security_groups)) neutron.update_port.assert_not_called() @@ -438,7 +553,7 @@ class NeutronVIFPool(test_base.TestCase): m_driver = mock.MagicMock(spec=cls) neutron = self.useFixture(k_fix.MockNeutronClient()).client - pool_key = ('node_ip', 'project_id', tuple(['security_group'])) + pool_key = ('node_ip', 'project_id') port_id = str(uuid.uuid4()) pool_length = 5 @@ -462,7 +577,6 @@ class NeutronVIFPool(test_base.TestCase): "port": { 'name': constants.KURYR_PORT_NAME, 'device_id': '', - 'security_groups': ['security_group'] } }) neutron.delete_port.assert_not_called() @@ -473,7 +587,7 @@ class NeutronVIFPool(test_base.TestCase): m_driver = mock.MagicMock(spec=cls) neutron = self.useFixture(k_fix.MockNeutronClient()).client - pool_key = ('node_ip', 'project_id', tuple(['security_group'])) + pool_key = ('node_ip', 'project_id') port_id = str(uuid.uuid4()) pool_length = 5 @@ -499,7 +613,7 @@ class NeutronVIFPool(test_base.TestCase): m_driver = mock.MagicMock(spec=cls) neutron = self.useFixture(k_fix.MockNeutronClient()).client - pool_key = ('node_ip', 'project_id', tuple(['security_group'])) + pool_key = ('node_ip', 'project_id') port_id = str(uuid.uuid4()) pool_length = 10 vif = mock.sentinel.vif @@ -524,7 +638,7 @@ class NeutronVIFPool(test_base.TestCase): m_driver = mock.MagicMock(spec=cls) neutron = self.useFixture(k_fix.MockNeutronClient()).client - pool_key = ('node_ip', 'project_id', tuple(['security_group'])) + pool_key = ('node_ip', 'project_id') port_id = str(uuid.uuid4()) pool_length = 5 @@ -552,7 +666,6 @@ class NeutronVIFPool(test_base.TestCase): "port": { 'name': constants.KURYR_PORT_NAME, 'device_id': '', - 'security_groups': ['security_group'] } }) neutron.delete_port.assert_not_called() @@ -562,7 +675,7 @@ class NeutronVIFPool(test_base.TestCase): m_driver = mock.MagicMock(spec=cls) neutron = self.useFixture(k_fix.MockNeutronClient()).client - pool_key = ('node_ip', 'project_id', tuple(['security_group'])) + pool_key = ('node_ip', 'project_id') port_id = str(uuid.uuid4()) pool_length = 10 vif = mock.sentinel.vif @@ -588,7 +701,7 @@ class NeutronVIFPool(test_base.TestCase): m_driver = mock.MagicMock(spec=cls) neutron = self.useFixture(k_fix.MockNeutronClient()).client - pool_key = ('node_ip', 'project_id', tuple(['security_group'])) + pool_key = ('node_ip', 'project_id') port_id = str(uuid.uuid4()) pool_length = 10 @@ -639,8 +752,7 @@ class NeutronVIFPool(test_base.TestCase): vif = mock.sentinel.vif m_to_osvif.return_value = vif - pool_key = (port['binding:host_id'], port['project_id'], - tuple(port['security_groups']), net_id) + pool_key = (port['binding:host_id'], port['project_id'], net_id) m_driver._get_pool_key.return_value = pool_key m_driver._get_trunks_info.return_value = ({}, {}, {}) @@ -652,7 +764,8 @@ class NeutronVIFPool(test_base.TestCase): m_to_osvif.assert_called_once_with(vif_plugin, port, subnet) self.assertEqual(m_driver._existing_vifs[port_id], vif) - self.assertEqual(m_driver._available_ports_pools[pool_key], [port_id]) + self.assertEqual(m_driver._available_ports_pools[pool_key], + {tuple(port['security_groups']): [port_id]}) @mock.patch('kuryr_kubernetes.os_vif_util.neutron_to_osvif_vif') @mock.patch('kuryr_kubernetes.utils.get_subnet') @@ -681,9 +794,10 @@ class NeutronVIFPool(test_base.TestCase): neutron = self.useFixture(k_fix.MockNeutronClient()).client net_id = mock.sentinel.net_id - pool_key = ('node_ip', 'project_id', tuple(['security_group'])) + pool_key = ('node_ip', 'project_id') port_id = str(uuid.uuid4()) - m_driver._available_ports_pools = {pool_key: [port_id]} + m_driver._available_ports_pools = {pool_key: { + tuple(['security_group']): [port_id]}} m_driver._existing_vifs = {port_id: mock.sentinel.vif} m_driver._get_pool_key_net.return_value = net_id @@ -701,9 +815,10 @@ class NeutronVIFPool(test_base.TestCase): neutron = self.useFixture(k_fix.MockNeutronClient()).client net_id = mock.sentinel.net_id - pool_key = ('node_ip', 'project_id', tuple(['security_group'])) + pool_key = ('node_ip', 'project_id') port_id = str(uuid.uuid4()) - m_driver._available_ports_pools = {pool_key: [port_id]} + m_driver._available_ports_pools = {pool_key: { + tuple(['security_group']): [port_id]}} m_driver._existing_vifs = {} neutron.delete_port.side_effect = n_exc.PortNotFoundClient @@ -765,11 +880,12 @@ class NestedVIFPool(test_base.TestCase): port_id = str(uuid.uuid4()) port = mock.sentinel.port subnets = mock.sentinel.subnets + security_groups = 'test-sg' pod = get_pod_obj() m_driver._available_ports_pools = { - pool_key: collections.deque([port_id])} + pool_key: {tuple(security_groups): collections.deque([port_id])}} m_driver._existing_vifs = {port_id: port} m_get_port_name.return_value = get_pod_name(pod) @@ -783,7 +899,7 @@ class NestedVIFPool(test_base.TestCase): m_driver._get_pool_size.return_value = pool_length self.assertEqual(port, cls._get_port_from_pool( - m_driver, pool_key, pod, subnets)) + m_driver, pool_key, pod, subnets, tuple(security_groups))) neutron.update_port.assert_called_once_with( port_id, @@ -806,11 +922,12 @@ class NestedVIFPool(test_base.TestCase): port_id = str(uuid.uuid4()) port = mock.sentinel.port subnets = mock.sentinel.subnets + security_groups = 'test-sg' pod = get_pod_obj() m_driver._available_ports_pools = { - pool_key: collections.deque([port_id])} + pool_key: {tuple(security_groups): collections.deque([port_id])}} m_driver._existing_vifs = {port_id: port} m_get_port_name.return_value = get_pod_name(pod) @@ -824,7 +941,7 @@ class NestedVIFPool(test_base.TestCase): m_driver._get_pool_size.return_value = pool_length self.assertEqual(port, cls._get_port_from_pool( - m_driver, pool_key, pod, subnets)) + m_driver, pool_key, pod, subnets, tuple(security_groups))) neutron.update_port.assert_called_once_with( port_id, @@ -843,11 +960,124 @@ class NestedVIFPool(test_base.TestCase): pod = mock.sentinel.pod pool_key = mock.sentinel.pool_key subnets = mock.sentinel.subnets + security_groups = 'test-sg' - m_driver._available_ports_pools = {pool_key: collections.deque([])} + m_driver._available_ports_pools = { + pool_key: {tuple(security_groups): collections.deque([])}} + m_driver._last_update = {pool_key: {tuple(security_groups): 1}} self.assertRaises(exceptions.ResourceNotReady, cls._get_port_from_pool, - m_driver, pool_key, pod, subnets) + m_driver, pool_key, pod, subnets, tuple( + security_groups)) + + neutron.update_port.assert_not_called() + + @mock.patch('eventlet.spawn') + def test__get_port_from_pool_empty_pool_reuse(self, m_eventlet): + cls = vif_pool.NestedVIFPool + m_driver = mock.MagicMock(spec=cls) + neutron = self.useFixture(k_fix.MockNeutronClient()).client + + pod = mock.sentinel.pod + port_id = str(uuid.uuid4()) + port = mock.sentinel.port + pool_key = mock.sentinel.pool_key + subnets = mock.sentinel.subnets + security_groups = 'test-sg' + security_groups_2 = 'test-sg2' + + oslo_cfg.CONF.set_override('port_debug', + False, + group='kubernetes') + pool_length = 5 + m_driver._get_pool_size.return_value = pool_length + + m_driver._available_ports_pools = { + pool_key: {tuple(security_groups): collections.deque([]), + tuple(security_groups_2): collections.deque([port_id])}} + m_driver._last_update = {pool_key: {tuple(security_groups): 1, + tuple(security_groups_2): 0}} + m_driver._existing_vifs = {port_id: port} + + self.assertEqual(port, cls._get_port_from_pool( + m_driver, pool_key, pod, subnets, tuple(security_groups))) + + neutron.update_port.assert_called_once_with( + port_id, + { + "port": { + 'security_groups': list(security_groups), + } + }) + m_eventlet.assert_not_called() + + @mock.patch('eventlet.spawn') + def test__get_port_from_pool_empty_pool_reuse_no_update_info(self, + m_eventlet): + cls = vif_pool.NestedVIFPool + m_driver = mock.MagicMock(spec=cls) + neutron = self.useFixture(k_fix.MockNeutronClient()).client + + pod = mock.sentinel.pod + port_id = str(uuid.uuid4()) + port = mock.sentinel.port + pool_key = mock.sentinel.pool_key + subnets = mock.sentinel.subnets + security_groups = 'test-sg' + security_groups_2 = 'test-sg2' + + oslo_cfg.CONF.set_override('port_debug', + False, + group='kubernetes') + pool_length = 5 + m_driver._get_pool_size.return_value = pool_length + + m_driver._available_ports_pools = { + pool_key: {tuple(security_groups): collections.deque([]), + tuple(security_groups_2): collections.deque([port_id])}} + m_driver._last_update = {} + m_driver._existing_vifs = {port_id: port} + + self.assertEqual(port, cls._get_port_from_pool( + m_driver, pool_key, pod, subnets, tuple(security_groups))) + + neutron.update_port.assert_called_once_with( + port_id, + { + "port": { + 'security_groups': list(security_groups), + } + }) + m_eventlet.assert_not_called() + + def test__get_port_from_pool_empty_pool_reuse_no_ports(self): + cls = vif_pool.NestedVIFPool + m_driver = mock.MagicMock(spec=cls) + neutron = self.useFixture(k_fix.MockNeutronClient()).client + + pod = mock.sentinel.pod + port_id = str(uuid.uuid4()) + port = mock.sentinel.port + pool_key = mock.sentinel.pool_key + subnets = mock.sentinel.subnets + security_groups = 'test-sg' + security_groups_2 = 'test-sg2' + + oslo_cfg.CONF.set_override('port_debug', + False, + group='kubernetes') + pool_length = 5 + m_driver._get_pool_size.return_value = pool_length + + m_driver._available_ports_pools = { + pool_key: {tuple(security_groups): collections.deque([]), + tuple(security_groups_2): collections.deque([])}} + m_driver._last_update = {} + m_driver._existing_vifs = {port_id: port} + + self.assertRaises(exceptions.ResourceNotReady, cls._get_port_from_pool, + m_driver, pool_key, pod, subnets, tuple( + security_groups)) neutron.update_port.assert_not_called() @@ -857,7 +1087,7 @@ class NestedVIFPool(test_base.TestCase): m_driver = mock.MagicMock(spec=cls) neutron = self.useFixture(k_fix.MockNeutronClient()).client - pool_key = ('node_ip', 'project_id', tuple(['security_group'])) + pool_key = ('node_ip', 'project_id') port_id = str(uuid.uuid4()) pool_length = 5 @@ -880,7 +1110,6 @@ class NestedVIFPool(test_base.TestCase): { "port": { 'name': constants.KURYR_PORT_NAME, - 'security_groups': ['security_group'] } }) neutron.delete_port.assert_not_called() @@ -891,7 +1120,7 @@ class NestedVIFPool(test_base.TestCase): m_driver = mock.MagicMock(spec=cls) neutron = self.useFixture(k_fix.MockNeutronClient()).client - pool_key = ('node_ip', 'project_id', tuple(['security_group'])) + pool_key = ('node_ip', 'project_id') port_id = str(uuid.uuid4()) pool_length = 5 @@ -920,7 +1149,7 @@ class NestedVIFPool(test_base.TestCase): vif_driver = mock.MagicMock(spec=cls_vif_driver) m_driver._drv_vif = vif_driver - pool_key = ('node_ip', 'project_id', tuple(['security_group'])) + pool_key = ('node_ip', 'project_id') port_id = str(uuid.uuid4()) pool_length = 10 vif = mock.MagicMock() @@ -953,7 +1182,7 @@ class NestedVIFPool(test_base.TestCase): m_driver = mock.MagicMock(spec=cls) neutron = self.useFixture(k_fix.MockNeutronClient()).client - pool_key = ('node_ip', 'project_id', tuple(['security_group'])) + pool_key = ('node_ip', 'project_id') port_id = str(uuid.uuid4()) pool_length = 5 @@ -977,7 +1206,6 @@ class NestedVIFPool(test_base.TestCase): { "port": { 'name': constants.KURYR_PORT_NAME, - 'security_groups': ['security_group'] } }) neutron.delete_port.assert_not_called() @@ -990,7 +1218,7 @@ class NestedVIFPool(test_base.TestCase): vif_driver = mock.MagicMock(spec=cls_vif_driver) m_driver._drv_vif = vif_driver - pool_key = ('node_ip', 'project_id', tuple(['security_group'])) + pool_key = ('node_ip', 'project_id') port_id = str(uuid.uuid4()) pool_length = 10 vif = mock.MagicMock() @@ -1027,7 +1255,7 @@ class NestedVIFPool(test_base.TestCase): vif_driver = mock.MagicMock(spec=cls_vif_driver) m_driver._drv_vif = vif_driver - pool_key = ('node_ip', 'project_id', tuple(['security_group'])) + pool_key = ('node_ip', 'project_id') port_id = str(uuid.uuid4()) pool_length = 10 trunk_id = str(uuid.uuid4()) @@ -1163,15 +1391,15 @@ class NestedVIFPool(test_base.TestCase): vif = mock.sentinel.vif m_to_osvif.return_value = vif - pool_key = (port['binding:host_id'], port['project_id'], - tuple(port['security_groups']), net_id) + pool_key = (port['binding:host_id'], port['project_id'], net_id) m_driver._get_pool_key.return_value = pool_key cls._precreated_ports(m_driver, 'recover') m_driver._get_trunks_info.assert_called_once() self.assertEqual(m_driver._existing_vifs[port_id], vif) - self.assertEqual(m_driver._available_ports_pools[pool_key], [port_id]) + self.assertEqual(m_driver._available_ports_pools[pool_key], + {tuple(port['security_groups']): [port_id]}) neutron.delete_port.assert_not_called() def test__precreated_ports_free(self): @@ -1200,10 +1428,10 @@ class NestedVIFPool(test_base.TestCase): m_driver._get_trunks_info.return_value = (p_ports, a_subports, subnets) - pool_key = (port['binding:host_id'], port['project_id'], - tuple(port['security_groups']), net_id) + pool_key = (port['binding:host_id'], port['project_id'], net_id) m_driver._get_pool_key.return_value = pool_key - m_driver._available_ports_pools = {pool_key: [port_id]} + m_driver._available_ports_pools = { + pool_key: {tuple(port['security_groups']): [port_id]}} m_driver._existing_vifs = {port_id: mock.sentinel.vif} cls._precreated_ports(m_driver, 'free') @@ -1214,7 +1442,8 @@ class NestedVIFPool(test_base.TestCase): m_driver._drv_vif._release_vlan_id.assert_called_once() self.assertEqual(m_driver._existing_vifs, {}) - self.assertEqual(m_driver._available_ports_pools[pool_key], []) + self.assertEqual(m_driver._available_ports_pools[pool_key][tuple( + port['security_groups'])], []) @mock.patch('kuryr_kubernetes.os_vif_util.' 'neutron_to_osvif_vif_nested_vlan') @@ -1302,8 +1531,7 @@ class NestedVIFPool(test_base.TestCase): vif = mock.sentinel.vif m_to_osvif.return_value = vif - pool_key = (port1['binding:host_id'], port1['project_id'], - tuple(port1['security_groups']), net_id) + pool_key = (port1['binding:host_id'], port1['project_id'], net_id) m_driver._get_pool_key.return_value = pool_key cls._precreated_ports(m_driver, 'recover') @@ -1311,7 +1539,8 @@ class NestedVIFPool(test_base.TestCase): self.assertEqual(m_driver._existing_vifs, {port_id1: vif, port_id2: vif}) self.assertEqual(m_driver._available_ports_pools[pool_key], - [port_id1, port_id2]) + {tuple(port1['security_groups']): [port_id1, + port_id2]}) neutron.delete_port.assert_not_called() @ddt.data(('recover'), ('free')) @@ -1382,13 +1611,14 @@ class NestedVIFPool(test_base.TestCase): neutron = self.useFixture(k_fix.MockNeutronClient()).client net_id = mock.sentinel.net_id - pool_key = ('node_ip', 'project_id', tuple(['security_group'])) + pool_key = ('node_ip', 'project_id') port_id = str(uuid.uuid4()) trunk_id = str(uuid.uuid4()) vif = mock.MagicMock() vlan_id = mock.sentinel.vlan_id vif.vlan_id = vlan_id - m_driver._available_ports_pools = {pool_key: [port_id]} + m_driver._available_ports_pools = {pool_key: { + tuple(['security_group']): [port_id]}} m_driver._existing_vifs = {port_id: vif} m_driver._get_trunk_id.return_value = trunk_id @@ -1415,13 +1645,14 @@ class NestedVIFPool(test_base.TestCase): neutron = self.useFixture(k_fix.MockNeutronClient()).client net_id = mock.sentinel.net_id - pool_key = ('node_ip', 'project_id', tuple(['security_group'])) + pool_key = ('node_ip', 'project_id') port_id = str(uuid.uuid4()) trunk_id = str(uuid.uuid4()) vif = mock.MagicMock() vlan_id = mock.sentinel.vlan_id vif.vlan_id = vlan_id - m_driver._available_ports_pools = {pool_key: [port_id]} + m_driver._available_ports_pools = {pool_key: { + tuple(['security_group']): [port_id]}} m_driver._existing_vifs = {port_id: vif} m_driver._get_trunk_id.return_value = trunk_id @@ -1450,13 +1681,14 @@ class NestedVIFPool(test_base.TestCase): neutron = self.useFixture(k_fix.MockNeutronClient()).client net_id = mock.sentinel.net_id - pool_key = ('node_ip', 'project_id', tuple(['security_group'])) + pool_key = ('node_ip', 'project_id') port_id = str(uuid.uuid4()) trunk_id = str(uuid.uuid4()) vif = mock.MagicMock() vlan_id = mock.sentinel.vlan_id vif.vlan_id = vlan_id - m_driver._available_ports_pools = {pool_key: [port_id]} + m_driver._available_ports_pools = {pool_key: { + tuple(['security_group']): [port_id]}} m_driver._existing_vifs = {} m_driver._get_trunk_id.return_value = trunk_id