diff --git a/kuryr_kubernetes/controller/handlers/lbaas.py b/kuryr_kubernetes/controller/handlers/lbaas.py index 97a505867..e6330ef78 100644 --- a/kuryr_kubernetes/controller/handlers/lbaas.py +++ b/kuryr_kubernetes/controller/handlers/lbaas.py @@ -118,33 +118,7 @@ class LBaaSSpecHandler(k8s_base.ResourceEventHandler): def _has_lbaas_spec_changes(self, service, lbaas_spec): return (self._has_ip_changes(service, lbaas_spec) or - self._has_port_changes(service, lbaas_spec)) - - def _get_service_ports(self, service): - return [{'name': port.get('name'), - 'protocol': port.get('protocol', 'TCP'), - 'port': port['port'], - 'targetPort': port['targetPort']} - for port in service['spec']['ports']] - - def _has_port_changes(self, service, lbaas_spec): - link = service['metadata']['selfLink'] - - fields = obj_lbaas.LBaaSPortSpec.fields - svc_port_set = {tuple(port[attr] for attr in fields) - for port in self._get_service_ports(service)} - - spec_port_set = {tuple(getattr(port, attr) - for attr in fields - if port.obj_attr_is_set(attr)) - for port in lbaas_spec.ports} - - if svc_port_set != spec_port_set: - LOG.debug("LBaaS spec ports %(spec_ports)s != %(svc_ports)s " - "for %(link)s" % {'spec_ports': spec_port_set, - 'svc_ports': svc_port_set, - 'link': link}) - return svc_port_set != spec_port_set + utils.has_port_changes(service, lbaas_spec)) def _has_ip_changes(self, service, lbaas_spec): link = service['metadata']['selfLink'] @@ -166,7 +140,7 @@ class LBaaSSpecHandler(k8s_base.ResourceEventHandler): def _generate_lbaas_port_specs(self, service): return [obj_lbaas.LBaaSPortSpec(**port) - for port in self._get_service_ports(service)] + for port in utils.get_service_ports(service)] class LoadBalancerHandler(k8s_base.ResourceEventHandler): @@ -262,24 +236,25 @@ class LoadBalancerHandler(k8s_base.ResourceEventHandler): obj_lbaas.LBaaSServiceSpec()) def _should_ignore(self, endpoints, lbaas_spec): + # NOTE(ltomasbo): we must wait until service handler has annotated the + # endpoints to process them. Thus, if annotations are not updated to + # match the endpoints information, we should skip the event return not(lbaas_spec and self._has_pods(endpoints) and - self._is_lbaas_spec_in_sync(endpoints, lbaas_spec)) + self._svc_handler_annotations_updated(endpoints, + lbaas_spec)) - def _is_lbaas_spec_in_sync(self, endpoints, lbaas_spec): - ports = lbaas_spec.ports - ep_ports = list(set((port.get('name'), port.get('port')) - if ports[0].obj_attr_is_set('targetPort') - else port.get('name') - for subset in endpoints.get('subsets', []) - for port in subset.get('ports', []))) - - spec_ports = [(port.name, port.targetPort) - if port.obj_attr_is_set('targetPort') - else port.name - for port in ports] - - return sorted(ep_ports) == sorted(spec_ports) + def _svc_handler_annotations_updated(self, endpoints, lbaas_spec): + svc_link = self._get_service_link(endpoints) + k8s = clients.get_kubernetes_client() + service = k8s.get(svc_link) + if utils.has_port_changes(service, lbaas_spec): + # NOTE(ltomasbo): Ensuring lbaas_spec annotated on the endpoints + # is in sync with the service status, i.e., upon a service + # modification it will ensure endpoint modifications are not + # handled until the service handler has performed its annotations + return False + return True def _has_pods(self, endpoints): ep_subsets = endpoints.get('subsets', []) diff --git a/kuryr_kubernetes/objects/lbaas.py b/kuryr_kubernetes/objects/lbaas.py index 87352ea2e..b0c971327 100644 --- a/kuryr_kubernetes/objects/lbaas.py +++ b/kuryr_kubernetes/objects/lbaas.py @@ -128,7 +128,7 @@ class LBaaSPortSpec(k_obj.KuryrK8sObjectBase): 'name': obj_fields.StringField(nullable=True), 'protocol': obj_fields.StringField(), 'port': obj_fields.IntegerField(), - 'targetPort': obj_fields.IntegerField(), + 'targetPort': obj_fields.StringField(), } diff --git a/kuryr_kubernetes/tests/unit/controller/handlers/test_lbaas.py b/kuryr_kubernetes/tests/unit/controller/handlers/test_lbaas.py index 81ac42b63..014142406 100644 --- a/kuryr_kubernetes/tests/unit/controller/handlers/test_lbaas.py +++ b/kuryr_kubernetes/tests/unit/controller/handlers/test_lbaas.py @@ -193,7 +193,8 @@ class TestLBaaSSpecHandler(test_base.TestCase): m_drv_sg.get_security_groups.assert_called_once_with( service, project_id) - def test_has_lbaas_spec_changes(self): + @mock.patch('kuryr_kubernetes.utils.has_port_changes') + def test_has_lbaas_spec_changes(self, m_port_changes): m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler) service = mock.sentinel.service lbaas_spec = mock.sentinel.lbaas_spec @@ -201,65 +202,11 @@ class TestLBaaSSpecHandler(test_base.TestCase): for has_ip_changes in (True, False): for has_port_changes in (True, False): m_handler._has_ip_changes.return_value = has_ip_changes - m_handler._has_port_changes.return_value = has_port_changes + m_port_changes.return_value = has_port_changes ret = h_lbaas.LBaaSSpecHandler._has_lbaas_spec_changes( m_handler, service, lbaas_spec) self.assertEqual(has_ip_changes or has_port_changes, ret) - def test_get_service_ports(self): - m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler) - service = {'spec': {'ports': [ - {'port': 1, 'targetPort': 1}, - {'port': 2, 'name': 'X', 'protocol': 'UDP', 'targetPort': 2} - ]}} - expected_ret = [ - {'port': 1, 'name': None, 'protocol': 'TCP', 'targetPort': 1}, - {'port': 2, 'name': 'X', 'protocol': 'UDP', 'targetPort': 2}] - - ret = h_lbaas.LBaaSSpecHandler._get_service_ports(m_handler, service) - self.assertEqual(expected_ret, ret) - - def test_has_port_changes(self): - m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler) - m_service = mock.MagicMock() - m_handler._get_service_ports.return_value = [ - {'port': 1, 'name': 'X', 'protocol': 'TCP', 'targetPort': 1}, - ] - - m_lbaas_spec = mock.MagicMock() - m_lbaas_spec.ports = [ - obj_lbaas.LBaaSPortSpec(name='X', protocol='TCP', port=1, - targetPort=1), - obj_lbaas.LBaaSPortSpec(name='Y', protocol='TCP', port=2, - targetPort=2), - ] - - ret = h_lbaas.LBaaSSpecHandler._has_port_changes( - m_handler, m_service, m_lbaas_spec) - - self.assertTrue(ret) - - def test_has_port_changes__no_changes(self): - m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler) - m_service = mock.MagicMock() - m_handler._get_service_ports.return_value = [ - {'port': 1, 'name': 'X', 'protocol': 'TCP', 'targetPort': 1}, - {'port': 2, 'name': 'Y', 'protocol': 'TCP', 'targetPort': 2} - ] - - m_lbaas_spec = mock.MagicMock() - m_lbaas_spec.ports = [ - obj_lbaas.LBaaSPortSpec(name='X', protocol='TCP', port=1, - targetPort=1), - obj_lbaas.LBaaSPortSpec(name='Y', protocol='TCP', port=2, - targetPort=2), - ] - - ret = h_lbaas.LBaaSSpecHandler._has_port_changes( - m_handler, m_service, m_lbaas_spec) - - self.assertFalse(ret) - def test_has_ip_changes(self): m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler) m_service = mock.MagicMock() @@ -302,9 +249,10 @@ class TestLBaaSSpecHandler(test_base.TestCase): m_handler, m_service, m_lbaas_spec) self.assertFalse(ret) - def test_generate_lbaas_port_specs(self): + @mock.patch('kuryr_kubernetes.utils.get_service_ports') + def test_generate_lbaas_port_specs(self, m_get_service_ports): m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler) - m_handler._get_service_ports.return_value = [ + m_get_service_ports.return_value = [ {'port': 1, 'name': 'X', 'protocol': 'TCP'}, {'port': 2, 'name': 'Y', 'protocol': 'TCP'} ] @@ -316,12 +264,13 @@ class TestLBaaSSpecHandler(test_base.TestCase): ret = h_lbaas.LBaaSSpecHandler._generate_lbaas_port_specs( m_handler, mock.sentinel.service) self.assertEqual(expected_ports, ret) - m_handler._get_service_ports.assert_called_once_with( + m_get_service_ports.assert_called_once_with( mock.sentinel.service) - def test_generate_lbaas_port_specs_udp(self): + @mock.patch('kuryr_kubernetes.utils.get_service_ports') + def test_generate_lbaas_port_specs_udp(self, m_get_service_ports): m_handler = mock.Mock(spec=h_lbaas.LBaaSSpecHandler) - m_handler._get_service_ports.return_value = [ + m_get_service_ports.return_value = [ {'port': 1, 'name': 'X', 'protocol': 'TCP'}, {'port': 2, 'name': 'Y', 'protocol': 'UDP'} ] @@ -333,7 +282,7 @@ class TestLBaaSSpecHandler(test_base.TestCase): ret = h_lbaas.LBaaSSpecHandler._generate_lbaas_port_specs( m_handler, mock.sentinel.service) self.assertEqual(expected_ports, ret) - m_handler._get_service_ports.assert_called_once_with( + m_get_service_ports.assert_called_once_with( mock.sentinel.service) def test_set_lbaas_spec(self): @@ -657,30 +606,16 @@ class TestLoadBalancerHandler(test_base.TestCase): # REVISIT(ivc): ddt? m_handler = mock.Mock(spec=h_lbaas.LoadBalancerHandler) m_handler._has_pods.return_value = True - m_handler._is_lbaas_spec_in_sync.return_value = True + m_handler._svc_handler_annotations_updated.return_value = True ret = h_lbaas.LoadBalancerHandler._should_ignore( m_handler, endpoints, lbaas_spec) self.assertEqual(False, ret) m_handler._has_pods.assert_called_once_with(endpoints) - m_handler._is_lbaas_spec_in_sync.assert_called_once_with( + m_handler._svc_handler_annotations_updated.assert_called_once_with( endpoints, lbaas_spec) - def test_is_lbaas_spec_in_sync(self): - names = ['a', 'b', 'c'] - endpoints = {'subsets': [{'ports': [{'name': n, 'port': 1} - for n in names]}]} - lbaas_spec = obj_lbaas.LBaaSServiceSpec(ports=[ - obj_lbaas.LBaaSPortSpec(name=n, targetPort=1) - for n in reversed(names)]) - - m_handler = mock.Mock(spec=h_lbaas.LoadBalancerHandler) - ret = h_lbaas.LoadBalancerHandler._is_lbaas_spec_in_sync( - m_handler, endpoints, lbaas_spec) - - self.assertEqual(True, ret) - def test_has_pods(self): # REVISIT(ivc): ddt? endpoints = {'subsets': [ diff --git a/kuryr_kubernetes/tests/unit/test_object.py b/kuryr_kubernetes/tests/unit/test_object.py index b6d95a558..1ec788800 100644 --- a/kuryr_kubernetes/tests/unit/test_object.py +++ b/kuryr_kubernetes/tests/unit/test_object.py @@ -28,7 +28,7 @@ object_data = { 'LBaaSLoadBalancer': '1.3-8bc0a9bdbd160da67572aa38784378d1', 'LBaaSMember': '1.0-a770c6884c27d6d8c21186b27d0e2ccb', 'LBaaSPool': '1.1-6e77370d7632a902445444249eb77b01', - 'LBaaSPortSpec': '1.1-fcfa2fd07f4bc5619b96fa41bcdf6e23', + 'LBaaSPortSpec': '1.1-1b307f34630617086c7af70f2cb8b215', 'LBaaSPubIp': '1.0-83992edec2c60fb4ab8998ea42a4ff74', 'LBaaSRouteNotifEntry': '1.0-dd2f2be956f68814b1f47cb13483a885', 'LBaaSRouteNotifier': '1.0-f0bfd8e772434abe7557930d7e0180c1', diff --git a/kuryr_kubernetes/tests/unit/test_utils.py b/kuryr_kubernetes/tests/unit/test_utils.py index 74f26d3d7..f493f14b6 100644 --- a/kuryr_kubernetes/tests/unit/test_utils.py +++ b/kuryr_kubernetes/tests/unit/test_utils.py @@ -18,6 +18,7 @@ from oslo_config import cfg from kuryr_kubernetes import constants as k_const from kuryr_kubernetes import exceptions as k_exc +from kuryr_kubernetes.objects import lbaas as obj_lbaas from kuryr_kubernetes.objects import vif from kuryr_kubernetes.tests import base as test_base from kuryr_kubernetes.tests.unit import kuryr_fixtures as k_fix @@ -164,3 +165,53 @@ class TestUtils(test_base.TestCase): ret = utils.get_endpoints_link(service) expected_link = "/api/v1/namespaces/default/endpoints/test" self.assertEqual(expected_link, ret) + + def test_get_service_ports(self): + service = {'spec': {'ports': [ + {'port': 1, 'targetPort': 1}, + {'port': 2, 'name': 'X', 'protocol': 'UDP', 'targetPort': 2} + ]}} + expected_ret = [ + {'port': 1, 'name': None, 'protocol': 'TCP', 'targetPort': '1'}, + {'port': 2, 'name': 'X', 'protocol': 'UDP', 'targetPort': '2'}] + + ret = utils.get_service_ports(service) + self.assertEqual(expected_ret, ret) + + @mock.patch('kuryr_kubernetes.utils.get_service_ports') + def test_has_port_changes(self, m_get_service_ports): + service = mock.MagicMock() + m_get_service_ports.return_value = [ + {'port': 1, 'name': 'X', 'protocol': 'TCP', 'targetPort': 1}, + ] + + lbaas_spec = mock.MagicMock() + lbaas_spec.ports = [ + obj_lbaas.LBaaSPortSpec(name='X', protocol='TCP', port=1, + targetPort=1), + obj_lbaas.LBaaSPortSpec(name='Y', protocol='TCP', port=2, + targetPort=2), + ] + + ret = utils.has_port_changes(service, lbaas_spec) + self.assertTrue(ret) + + @mock.patch('kuryr_kubernetes.utils.get_service_ports') + def test_has_port_changes__no_changes(self, m_get_service_ports): + service = mock.MagicMock() + m_get_service_ports.return_value = [ + {'port': 1, 'name': 'X', 'protocol': 'TCP', 'targetPort': '1'}, + {'port': 2, 'name': 'Y', 'protocol': 'TCP', 'targetPort': '2'} + ] + + lbaas_spec = mock.MagicMock() + lbaas_spec.ports = [ + obj_lbaas.LBaaSPortSpec(name='X', protocol='TCP', port=1, + targetPort=1), + obj_lbaas.LBaaSPortSpec(name='Y', protocol='TCP', port=2, + targetPort=2), + ] + + ret = utils.has_port_changes(service, lbaas_spec) + + self.assertFalse(ret) diff --git a/kuryr_kubernetes/utils.py b/kuryr_kubernetes/utils.py index aa560e051..87c3cb1e1 100644 --- a/kuryr_kubernetes/utils.py +++ b/kuryr_kubernetes/utils.py @@ -263,3 +263,31 @@ def get_endpoints_link(service): link_parts[-2] = 'endpoints' return "/".join(link_parts) + + +def has_port_changes(service, lbaas_spec): + link = service['metadata']['selfLink'] + + fields = obj_lbaas.LBaaSPortSpec.fields + svc_port_set = {tuple(port[attr] for attr in fields) + for port in get_service_ports(service)} + + spec_port_set = {tuple(getattr(port, attr) + for attr in fields + if port.obj_attr_is_set(attr)) + for port in lbaas_spec.ports} + + if svc_port_set != spec_port_set: + LOG.debug("LBaaS spec ports %(spec_ports)s != %(svc_ports)s " + "for %(link)s" % {'spec_ports': spec_port_set, + 'svc_ports': svc_port_set, + 'link': link}) + return svc_port_set != spec_port_set + + +def get_service_ports(service): + return [{'name': port.get('name'), + 'protocol': port.get('protocol', 'TCP'), + 'port': port['port'], + 'targetPort': str(port['targetPort'])} + for port in service['spec']['ports']]