diff --git a/kuryr_kubernetes/config.py b/kuryr_kubernetes/config.py index 3ad930c41..1c5253cf5 100644 --- a/kuryr_kubernetes/config.py +++ b/kuryr_kubernetes/config.py @@ -63,6 +63,10 @@ k8s_opts = [ cfg.StrOpt('pod_vif_driver', help=_("The driver that provides VIFs for Kubernetes Pods."), default='generic'), + cfg.StrOpt('endpoints_lbaas_driver', + help=_("The driver that provides LoadBalancers for Kubernetes " + "Endpoints"), + default='lbaasv2'), ] neutron_defaults = [ diff --git a/kuryr_kubernetes/constants.py b/kuryr_kubernetes/constants.py index d81e96075..e2237d739 100644 --- a/kuryr_kubernetes/constants.py +++ b/kuryr_kubernetes/constants.py @@ -26,6 +26,7 @@ K8S_POD_STATUS_PENDING = 'Pending' K8S_ANNOTATION_PREFIX = 'openstack.org/kuryr' K8S_ANNOTATION_VIF = K8S_ANNOTATION_PREFIX + '-vif' K8S_ANNOTATION_LBAAS_SPEC = K8S_ANNOTATION_PREFIX + '-lbaas-spec' +K8S_ANNOTATION_LBAAS_STATE = K8S_ANNOTATION_PREFIX + '-lbaas-state' K8S_OS_VIF_NOOP_PLUGIN = "noop" diff --git a/kuryr_kubernetes/controller/drivers/base.py b/kuryr_kubernetes/controller/drivers/base.py index ae507b5a8..3230827cb 100644 --- a/kuryr_kubernetes/controller/drivers/base.py +++ b/kuryr_kubernetes/controller/drivers/base.py @@ -257,7 +257,7 @@ class PodVIFDriver(DriverBase): class LBaaSDriver(DriverBase): """Manages Neutron/Octavia load balancer to support Kubernetes Services.""" - ALIAS = 'lbaas' + ALIAS = 'endpoints_lbaas' @abc.abstractmethod def ensure_loadbalancer(self, endpoints, project_id, subnet_id, ip, diff --git a/kuryr_kubernetes/controller/handlers/lbaas.py b/kuryr_kubernetes/controller/handlers/lbaas.py index 375ed9482..940d2f967 100644 --- a/kuryr_kubernetes/controller/handlers/lbaas.py +++ b/kuryr_kubernetes/controller/handlers/lbaas.py @@ -53,7 +53,6 @@ class LBaaSSpecHandler(k8s_base.ResourceEventHandler): spec = service['spec'] if spec.get('type') == 'ClusterIP': return spec.get('clusterIP') - return None def _get_subnet_id(self, service, project_id, ip): subnets_mapping = self._drv_subnets.get_subnets(service, project_id) @@ -181,3 +180,315 @@ class LBaaSSpecHandler(k8s_base.ResourceEventHandler): obj = obj_lbaas.LBaaSServiceSpec.obj_from_primitive(obj_dict) LOG.debug("Got LBaaSServiceSpec from annotation: %r", obj) return obj + + +class LoadBalancerHandler(k8s_base.ResourceEventHandler): + """LoadBalancerHandler handles K8s Endpoints events. + + LoadBalancerHandler handles K8s Endpoints events and tracks changes in + LBaaSServiceSpec to update Neutron LBaaS accordingly and to reflect its' + actual state in LBaaSState. + """ + + OBJECT_KIND = k_const.K8S_OBJ_ENDPOINTS + + def __init__(self): + self._drv_lbaas = drv_base.LBaaSDriver.get_instance() + self._drv_pod_project = drv_base.PodProjectDriver.get_instance() + self._drv_pod_subnets = drv_base.PodSubnetsDriver.get_instance() + + def on_present(self, endpoints): + lbaas_spec = self._get_lbaas_spec(endpoints) + if self._should_ignore(endpoints, lbaas_spec): + return + + lbaas_state = self._get_lbaas_state(endpoints) + if not lbaas_state: + lbaas_state = obj_lbaas.LBaaSState() + + if self._sync_lbaas_members(endpoints, lbaas_state, lbaas_spec): + # REVISIT(ivc): since _sync_lbaas_members is responsible for + # creating all lbaas components (i.e. load balancer, listeners, + # pools, members), it is currently possible for it to fail (due + # to invalid Kuryr/K8s/Neutron configuration, e.g. Members' IPs + # not belonging to configured Neutron subnet or Service IP being + # in use by gateway or VMs) leaving some Neutron entities without + # properly updating annotation. Some sort of failsafe mechanism is + # required to deal with such situations (e.g. cleanup, or skip + # failing items, or validate configuration) to prevent annotation + # being out of sync with the actual Neutron state. + self._set_lbaas_state(endpoints, lbaas_state) + + def on_deleted(self, endpoints): + lbaas_state = self._get_lbaas_state(endpoints) + if not lbaas_state: + return + # NOTE(ivc): deleting pool deletes its members + lbaas_state.members = [] + self._sync_lbaas_members(endpoints, lbaas_state, + obj_lbaas.LBaaSServiceSpec()) + + def _should_ignore(self, endpoints, lbaas_spec): + return not(lbaas_spec and + self._has_pods(endpoints) and + self._is_lbaas_spec_in_sync(endpoints, lbaas_spec)) + + def _is_lbaas_spec_in_sync(self, endpoints, lbaas_spec): + # REVISIT(ivc): consider other options instead of using 'name' + ep_ports = list(set(port.get('name') + for subset in endpoints.get('subsets', []) + for port in subset.get('ports', []))) + spec_ports = [port.name for port in lbaas_spec.ports] + + return sorted(ep_ports) == sorted(spec_ports) + + def _has_pods(self, endpoints): + return any(True + for subset in endpoints.get('subsets', []) + for address in subset.get('addresses', []) + if address.get('targetRef', {}).get('kind') == 'Pod') + + def _sync_lbaas_members(self, endpoints, lbaas_state, lbaas_spec): + changed = False + + if self._remove_unused_members(endpoints, lbaas_state, lbaas_spec): + changed = True + + if self._sync_lbaas_pools(endpoints, lbaas_state, lbaas_spec): + changed = True + + if self._add_new_members(endpoints, lbaas_state, lbaas_spec): + changed = True + + return changed + + def _add_new_members(self, endpoints, lbaas_state, lbaas_spec): + changed = False + + lsnr_by_id = {l.id: l for l in lbaas_state.listeners} + pool_by_lsnr_port = {(lsnr_by_id[p.listener_id].protocol, + lsnr_by_id[p.listener_id].port): p + for p in lbaas_state.pools} + pool_by_tgt_name = {p.name: pool_by_lsnr_port[p.protocol, p.port] + for p in lbaas_spec.ports} + current_targets = {(str(m.ip), m.port) for m in lbaas_state.members} + + for subset in endpoints.get('subsets', []): + subset_ports = subset.get('ports', []) + for subset_address in subset.get('addresses', []): + try: + target_ip = subset_address['ip'] + target_ref = subset_address['targetRef'] + if target_ref['kind'] != k_const.K8S_OBJ_POD: + continue + except KeyError: + continue + + for subset_port in subset_ports: + target_port = subset_port['port'] + if (target_ip, target_port) in current_targets: + continue + port_name = subset_port.get('name') + pool = pool_by_tgt_name[port_name] + target_subnet_id = self._get_pod_subnet(target_ref, + target_ip) + member = self._drv_lbaas.ensure_member( + endpoints=endpoints, + loadbalancer=lbaas_state.loadbalancer, + pool=pool, + subnet_id=target_subnet_id, + ip=target_ip, + port=target_port, + target_ref=target_ref) + lbaas_state.members.append(member) + changed = True + + return changed + + def _get_pod_subnet(self, target_ref, ip): + # REVISIT(ivc): consider using true pod object instead + pod = {'kind': target_ref['kind'], + 'metadata': {'name': target_ref['name'], + 'namespace': target_ref['namespace']}} + project_id = self._drv_pod_project.get_project(pod) + subnets_map = self._drv_pod_subnets.get_subnets(pod, project_id) + # FIXME(ivc): potentially unsafe [0] index + return [subnet_id for subnet_id, network in six.iteritems(subnets_map) + for subnet in network.subnets.objects + if ip in subnet.cidr][0] + + def _remove_unused_members(self, endpoints, lbaas_state, lbaas_spec): + spec_port_names = {p.name for p in lbaas_spec.ports} + current_targets = {(a['ip'], p['port']) + for s in endpoints['subsets'] + for a in s['addresses'] + for p in s['ports'] + if p.get('name') in spec_port_names} + removed_ids = set() + for member in lbaas_state.members: + if (str(member.ip), member.port) in current_targets: + continue + self._drv_lbaas.release_member(endpoints, + lbaas_state.loadbalancer, + member) + removed_ids.add(member.id) + if removed_ids: + lbaas_state.members = [m for m in lbaas_state.members + if m.id not in removed_ids] + return bool(removed_ids) + + def _sync_lbaas_pools(self, endpoints, lbaas_state, lbaas_spec): + changed = False + + if self._remove_unused_pools(endpoints, lbaas_state, lbaas_spec): + changed = True + + if self._sync_lbaas_listeners(endpoints, lbaas_state, lbaas_spec): + changed = True + + if self._add_new_pools(endpoints, lbaas_state, lbaas_spec): + changed = True + + return changed + + def _add_new_pools(self, endpoints, lbaas_state, lbaas_spec): + changed = False + + current_listeners_ids = {pool.listener_id + for pool in lbaas_state.pools} + for listener in lbaas_state.listeners: + if listener.id in current_listeners_ids: + continue + pool = self._drv_lbaas.ensure_pool(endpoints, + lbaas_state.loadbalancer, + listener) + lbaas_state.pools.append(pool) + changed = True + + return changed + + def _remove_unused_pools(self, endpoints, lbaas_state, lbaas_spec): + current_pools = {m.pool_id for m in lbaas_state.members} + removed_ids = set() + for pool in lbaas_state.pools: + if pool.id in current_pools: + continue + self._drv_lbaas.release_pool(endpoints, + lbaas_state.loadbalancer, + pool) + removed_ids.add(pool.id) + if removed_ids: + lbaas_state.pools = [p for p in lbaas_state.pools + if p.id not in removed_ids] + return bool(removed_ids) + + def _sync_lbaas_listeners(self, endpoints, lbaas_state, lbaas_spec): + changed = False + + if self._remove_unused_listeners(endpoints, lbaas_state, lbaas_spec): + changed = True + + if self._sync_lbaas_loadbalancer(endpoints, lbaas_state, lbaas_spec): + changed = True + + if self._add_new_listeners(endpoints, lbaas_spec, lbaas_state): + changed = True + + return changed + + def _add_new_listeners(self, endpoints, lbaas_spec, lbaas_state): + changed = False + current_port_tuples = {(listener.protocol, listener.port) + for listener in lbaas_state.listeners} + for port_spec in lbaas_spec.ports: + protocol = port_spec.protocol + port = port_spec.port + if (protocol, port) in current_port_tuples: + continue + + listener = self._drv_lbaas.ensure_listener( + endpoints=endpoints, + loadbalancer=lbaas_state.loadbalancer, + protocol=protocol, + port=port) + lbaas_state.listeners.append(listener) + changed = True + return changed + + def _remove_unused_listeners(self, endpoints, lbaas_state, lbaas_spec): + current_listeners = {p.listener_id for p in lbaas_state.pools} + + removed_ids = set() + for listener in lbaas_state.listeners: + if listener.id in current_listeners: + continue + self._drv_lbaas.release_listener(endpoints, + lbaas_state.loadbalancer, + listener) + removed_ids.add(listener.id) + if removed_ids: + lbaas_state.listeners = [l for l in lbaas_state.listeners + if l.id not in removed_ids] + return bool(removed_ids) + + def _sync_lbaas_loadbalancer(self, endpoints, lbaas_state, lbaas_spec): + changed = False + lb = lbaas_state.loadbalancer + + if lb and lb.ip != lbaas_spec.ip: + self._drv_lbaas.release_loadbalancer( + endpoints=endpoints, + loadbalancer=lb) + lb = None + changed = True + + if not lb and lbaas_spec.ip: + lb = self._drv_lbaas.ensure_loadbalancer( + endpoints=endpoints, + project_id=lbaas_spec.project_id, + subnet_id=lbaas_spec.subnet_id, + ip=lbaas_spec.ip, + security_groups_ids=lbaas_spec.security_groups_ids) + changed = True + + lbaas_state.loadbalancer = lb + return changed + + def _get_lbaas_spec(self, endpoints): + # TODO(ivc): same as '_get_lbaas_state' + try: + annotations = endpoints['metadata']['annotations'] + annotation = annotations[k_const.K8S_ANNOTATION_LBAAS_SPEC] + except KeyError: + return None + obj_dict = jsonutils.loads(annotation) + obj = obj_lbaas.LBaaSServiceSpec.obj_from_primitive(obj_dict) + LOG.debug("Got LBaaSServiceSpec from annotation: %r", obj) + return obj + + def _set_lbaas_state(self, endpoints, lbaas_state): + # TODO(ivc): extract annotation interactions + if lbaas_state is None: + LOG.debug("Removing LBaaSState annotation: %r", lbaas_state) + annotation = None + else: + lbaas_state.obj_reset_changes(recursive=True) + LOG.debug("Setting LBaaSState annotation: %r", lbaas_state) + annotation = jsonutils.dumps(lbaas_state.obj_to_primitive(), + sort_keys=True) + k8s = clients.get_kubernetes_client() + k8s.annotate(endpoints['metadata']['selfLink'], + {k_const.K8S_ANNOTATION_LBAAS_STATE: annotation}, + resource_version=endpoints['metadata']['resourceVersion']) + + def _get_lbaas_state(self, endpoints): + # TODO(ivc): same as '_set_lbaas_state' + try: + annotations = endpoints['metadata']['annotations'] + annotation = annotations[k_const.K8S_ANNOTATION_LBAAS_STATE] + except KeyError: + return None + obj_dict = jsonutils.loads(annotation) + obj = obj_lbaas.LBaaSState.obj_from_primitive(obj_dict) + LOG.debug("Got LBaaSState from annotation: %r", obj) + return obj diff --git a/kuryr_kubernetes/controller/service.py b/kuryr_kubernetes/controller/service.py index 56258fcd4..e9dd8e063 100644 --- a/kuryr_kubernetes/controller/service.py +++ b/kuryr_kubernetes/controller/service.py @@ -45,6 +45,7 @@ class KuryrK8sService(service.Service): self.watcher.add("%s/%s" % (constants.K8S_API_BASE, resource)) pipeline.register(h_vif.VIFHandler()) pipeline.register(h_lbaas.LBaaSSpecHandler()) + pipeline.register(h_lbaas.LoadBalancerHandler()) def start(self): LOG.info("Service '%s' starting", self.__class__.__name__) diff --git a/kuryr_kubernetes/tests/unit/controller/handlers/test_lbaas.py b/kuryr_kubernetes/tests/unit/controller/handlers/test_lbaas.py index 6b46eee46..894439a54 100644 --- a/kuryr_kubernetes/tests/unit/controller/handlers/test_lbaas.py +++ b/kuryr_kubernetes/tests/unit/controller/handlers/test_lbaas.py @@ -13,10 +13,13 @@ # License for the specific language governing permissions and limitations # under the License. +import itertools import mock import os_vif.objects.network as osv_network import os_vif.objects.subnet as osv_subnet +from oslo_utils import uuidutils +from kuryr_kubernetes import constants as k_const from kuryr_kubernetes.controller.drivers import base as drv_base from kuryr_kubernetes.controller.handlers import lbaas as h_lbaas from kuryr_kubernetes import exceptions as k_exc @@ -321,3 +324,303 @@ class TestLBaaSSpecHandler(test_base.TestCase): def test_get_lbaas_spec(self): self.skipTest("skipping until generalised annotation handling is " "implemented") + + +class FakeLBaaSDriver(drv_base.LBaaSDriver): + def ensure_loadbalancer(self, endpoints, project_id, subnet_id, ip, + security_groups_ids): + name = str(ip) + return obj_lbaas.LBaaSLoadBalancer(name=name, + project_id=project_id, + subnet_id=subnet_id, + ip=ip, + id=uuidutils.generate_uuid()) + + def ensure_listener(self, endpoints, loadbalancer, protocol, port): + name = "%s:%s:%s" % (loadbalancer.name, protocol, port) + return obj_lbaas.LBaaSListener(name=name, + project_id=loadbalancer.project_id, + loadbalancer_id=loadbalancer.id, + protocol=protocol, + port=port, + id=uuidutils.generate_uuid()) + + def ensure_pool(self, endpoints, loadbalancer, listener): + return obj_lbaas.LBaaSPool(name=listener.name, + project_id=loadbalancer.project_id, + loadbalancer_id=loadbalancer.id, + listener_id=listener.id, + protocol=listener.protocol, + id=uuidutils.generate_uuid()) + + def ensure_member(self, endpoints, loadbalancer, pool, subnet_id, ip, port, + target_ref): + name = "%s:%s:%s" % (loadbalancer.name, ip, port) + return obj_lbaas.LBaaSMember(name=name, + project_id=pool.project_id, + pool_id=pool.id, + subnet_id=subnet_id, + ip=ip, + port=port, + id=uuidutils.generate_uuid()) + + def release_loadbalancer(self, endpoints, loadbalancer): + pass + + def release_listener(self, endpoints, loadbalancer, listener): + pass + + def release_pool(self, endpoints, loadbalancer, pool): + pass + + def release_member(self, endpoints, loadbalancer, member): + pass + + +class TestLoadBalancerHandler(test_base.TestCase): + @mock.patch('kuryr_kubernetes.controller.drivers.base' + '.PodSubnetsDriver.get_instance') + @mock.patch('kuryr_kubernetes.controller.drivers.base' + '.PodProjectDriver.get_instance') + @mock.patch('kuryr_kubernetes.controller.drivers.base' + '.LBaaSDriver.get_instance') + def test_init(self, m_get_drv_lbaas, m_get_drv_project, m_get_drv_subnets): + m_get_drv_lbaas.return_value = mock.sentinel.drv_lbaas + m_get_drv_project.return_value = mock.sentinel.drv_project + m_get_drv_subnets.return_value = mock.sentinel.drv_subnets + + handler = h_lbaas.LoadBalancerHandler() + + self.assertEqual(mock.sentinel.drv_lbaas, handler._drv_lbaas) + self.assertEqual(mock.sentinel.drv_project, handler._drv_pod_project) + self.assertEqual(mock.sentinel.drv_subnets, handler._drv_pod_subnets) + + def test_on_present(self): + lbaas_spec = mock.sentinel.lbaas_spec + lbaas_state = mock.sentinel.lbaas_state + endpoints = mock.sentinel.endpoints + + m_handler = mock.Mock(spec=h_lbaas.LoadBalancerHandler) + m_handler._get_lbaas_spec.return_value = lbaas_spec + m_handler._should_ignore.return_value = False + m_handler._get_lbaas_state.return_value = lbaas_state + m_handler._sync_lbaas_members.return_value = True + + h_lbaas.LoadBalancerHandler.on_present(m_handler, endpoints) + + m_handler._get_lbaas_spec.assert_called_once_with(endpoints) + m_handler._should_ignore.assert_called_once_with(endpoints, lbaas_spec) + m_handler._get_lbaas_state.assert_called_once_with(endpoints) + m_handler._sync_lbaas_members.assert_called_once_with( + endpoints, lbaas_state, lbaas_spec) + m_handler._set_lbaas_state.assert_called_once_with( + endpoints, lbaas_state) + + @mock.patch('kuryr_kubernetes.objects.lbaas' + '.LBaaSServiceSpec') + def test_on_deleted(self, m_svc_spec_ctor): + endpoints = mock.sentinel.endpoints + empty_spec = mock.sentinel.empty_spec + lbaas_state = mock.sentinel.lbaas_state + m_svc_spec_ctor.return_value = empty_spec + + m_handler = mock.Mock(spec=h_lbaas.LoadBalancerHandler) + m_handler._get_lbaas_state.return_value = lbaas_state + + h_lbaas.LoadBalancerHandler.on_deleted(m_handler, endpoints) + + m_handler._get_lbaas_state.assert_called_once_with(endpoints) + m_handler._sync_lbaas_members.assert_called_once_with( + endpoints, lbaas_state, empty_spec) + + def test_should_ignore(self): + endpoints = mock.sentinel.endpoints + lbaas_spec = mock.sentinel.lbaas_spec + + # REVISIT(ivc): ddt? + m_handler = mock.Mock(spec=h_lbaas.LoadBalancerHandler) + m_handler._has_pods.return_value = True + m_handler._is_lbaas_spec_in_sync.return_value = True + + ret = h_lbaas.LoadBalancerHandler._should_ignore( + m_handler, endpoints, lbaas_spec) + self.assertEqual(False, ret) + + m_handler._has_pods.assert_called_once_with(endpoints) + m_handler._is_lbaas_spec_in_sync.assert_called_once_with( + endpoints, lbaas_spec) + + def test_is_lbaas_spec_in_sync(self): + names = ['a', 'b', 'c'] + endpoints = {'subsets': [{'ports': [{'name': n} for n in names]}]} + lbaas_spec = obj_lbaas.LBaaSServiceSpec(ports=[ + obj_lbaas.LBaaSPortSpec(name=n) for n in reversed(names)]) + + m_handler = mock.Mock(spec=h_lbaas.LoadBalancerHandler) + ret = h_lbaas.LoadBalancerHandler._is_lbaas_spec_in_sync( + m_handler, endpoints, lbaas_spec) + + self.assertEqual(True, ret) + + def test_has_pods(self): + # REVISIT(ivc): ddt? + endpoints = {'subsets': [ + {}, + {'addresses': []}, + {'addresses': [{'targetRef': {}}]}, + {'addresses': [{'targetRef': {'kind': k_const.K8S_OBJ_POD}}]} + ]} + + m_handler = mock.Mock(spec=h_lbaas.LoadBalancerHandler) + + ret = h_lbaas.LoadBalancerHandler._has_pods(m_handler, endpoints) + + self.assertEqual(True, ret) + + def test_get_pod_subnet(self): + subnet_id = mock.sentinel.subnet_id + project_id = mock.sentinel.project_id + target_ref = {'kind': k_const.K8S_OBJ_POD, + 'name': 'pod-name', + 'namespace': 'default'} + ip = '1.2.3.4' + m_handler = mock.Mock(spec=h_lbaas.LoadBalancerHandler) + m_drv_pod_project = mock.Mock() + m_drv_pod_project.get_project.return_value = project_id + m_handler._drv_pod_project = m_drv_pod_project + m_drv_pod_subnets = mock.Mock() + m_drv_pod_subnets.get_subnets.return_value = { + subnet_id: osv_network.Network(subnets=osv_subnet.SubnetList( + objects=[osv_subnet.Subnet(cidr='1.2.3.0/24')]))} + m_handler._drv_pod_subnets = m_drv_pod_subnets + + observed_subnet_id = h_lbaas.LoadBalancerHandler._get_pod_subnet( + m_handler, target_ref, ip) + + self.assertEqual(subnet_id, observed_subnet_id) + + def _generate_lbaas_state(self, vip, targets, project_id, subnet_id): + endpoints = mock.sentinel.endpoints + drv = FakeLBaaSDriver() + lb = drv.ensure_loadbalancer( + endpoints, project_id, subnet_id, vip, None) + listeners = {} + pools = {} + members = {} + for ip, (listen_port, target_port) in targets.items(): + lsnr = listeners.setdefault(listen_port, drv.ensure_listener( + endpoints, lb, 'TCP', listen_port)) + pool = pools.setdefault(listen_port, drv.ensure_pool( + endpoints, lb, lsnr)) + members.setdefault((ip, listen_port, target_port), + drv.ensure_member(endpoints, lb, pool, + subnet_id, ip, + target_port, None)) + return obj_lbaas.LBaaSState( + loadbalancer=lb, + listeners=list(listeners.values()), + pools=list(pools.values()), + members=list(members.values())) + + def _generate_lbaas_spec(self, vip, targets, project_id, subnet_id): + return obj_lbaas.LBaaSServiceSpec( + ip=vip, + project_id=project_id, + subnet_id=subnet_id, + ports=[obj_lbaas.LBaaSPortSpec(name=str(port), + protocol='TCP', + port=port) + for port in set(t[0] for t in targets.values())]) + + def _generate_endpoints(self, targets): + def _target_to_port(item): + _, (listen_port, target_port) = item + return {'port': target_port, 'name': str(listen_port)} + port_with_addrs = [ + (p, [e[0] for e in grp]) + for p, grp in itertools.groupby( + sorted(targets.items()), _target_to_port)] + return { + 'subsets': [ + { + 'addresses': [ + { + 'ip': ip, + 'targetRef': { + 'kind': k_const.K8S_OBJ_POD, + 'name': ip, + 'namespace': 'default' + } + } + for ip in addrs + ], + 'ports': [port] + } + for port, addrs in port_with_addrs + ] + } + + @mock.patch('kuryr_kubernetes.controller.drivers.base' + '.PodSubnetsDriver.get_instance') + @mock.patch('kuryr_kubernetes.controller.drivers.base' + '.PodProjectDriver.get_instance') + @mock.patch('kuryr_kubernetes.controller.drivers.base' + '.LBaaSDriver.get_instance') + def test_sync_lbaas_members(self, m_get_drv_lbaas, m_get_drv_project, + m_get_drv_subnets): + # REVISIT(ivc): test methods separately and verify ensure/release + project_id = uuidutils.generate_uuid() + subnet_id = uuidutils.generate_uuid() + current_ip = '1.1.1.1' + current_targets = { + '1.1.1.101': (1001, 10001), + '1.1.1.111': (1001, 10001), + '1.1.1.201': (2001, 20001)} + expected_ip = '2.2.2.2' + expected_targets = { + '2.2.2.101': (1201, 12001), + '2.2.2.111': (1201, 12001), + '2.2.2.201': (2201, 22001)} + endpoints = self._generate_endpoints(expected_targets) + state = self._generate_lbaas_state( + current_ip, current_targets, project_id, subnet_id) + spec = self._generate_lbaas_spec(expected_ip, expected_targets, + project_id, subnet_id) + + m_drv_lbaas = mock.Mock(wraps=FakeLBaaSDriver()) + m_drv_project = mock.Mock() + m_drv_project.get_project.return_value = project_id + m_drv_subnets = mock.Mock() + m_drv_subnets.get_subnets.return_value = { + subnet_id: mock.sentinel.subnet} + m_get_drv_lbaas.return_value = m_drv_lbaas + m_get_drv_project.return_value = m_drv_project + m_get_drv_subnets.return_value = m_drv_subnets + + handler = h_lbaas.LoadBalancerHandler() + + with mock.patch.object(handler, '_get_pod_subnet') as m_get_pod_subnet: + m_get_pod_subnet.return_value = subnet_id + handler._sync_lbaas_members(endpoints, state, spec) + + lsnrs = {lsnr.id: lsnr for lsnr in state.listeners} + pools = {pool.id: pool for pool in state.pools} + observed_targets = sorted( + (str(member.ip), ( + lsnrs[pools[member.pool_id].listener_id].port, + member.port)) + for member in state.members) + self.assertEqual(sorted(expected_targets.items()), observed_targets) + self.assertEqual(expected_ip, str(state.loadbalancer.ip)) + + def test_get_lbaas_spec(self): + self.skipTest("skipping until generalised annotation handling is " + "implemented") + + def test_get_lbaas_state(self): + self.skipTest("skipping until generalised annotation handling is " + "implemented") + + def test_set_lbaas_state(self): + self.skipTest("skipping until generalised annotation handling is " + "implemented") diff --git a/setup.cfg b/setup.cfg index e6352cb9f..156c1f49b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -61,6 +61,9 @@ kuryr_kubernetes.controller.drivers.pod_vif = generic = kuryr_kubernetes.controller.drivers.generic_vif:GenericPodVIFDriver nested-vlan = kuryr_kubernetes.controller.drivers.nested_vlan_vif:NestedVlanPodVIFDriver +kuryr_kubernetes.controller.drivers.endpoints_lbaas = + lbaasv2 = kuryr_kubernetes.controller.drivers.lbaasv2:LBaaSv2Driver + [files] packages = kuryr_kubernetes