From 30369502bba47a652a9152673e26a2b35917972a Mon Sep 17 00:00:00 2001
From: Luis Tomas Bolivar
Date: Fri, 7 Dec 2018 18:31:50 +0100
Subject: [PATCH] Add support for podSelector

This includes support for both cases: when the pod selector is used
alone and when it is used together with a namespace selector.

TODO in follow-up patch sets:
- React to new pods/namespaces created with labels
- React to pod/namespaces relabeling/deletion

Partially Implements: blueprint k8s-network-policies

Change-Id: Ie29b9da64fcd5df7b9a0e9af7b4835208f76da66
---
 .../controller/drivers/network_policy.py      | 273 ++++++++++--------
 kuryr_kubernetes/controller/drivers/utils.py  |  50 +++-
 .../controller/drivers/test_network_policy.py |   3 +-
 3 files changed, 206 insertions(+), 120 deletions(-)

diff --git a/kuryr_kubernetes/controller/drivers/network_policy.py b/kuryr_kubernetes/controller/drivers/network_policy.py
index 9912eefab..912b4596f 100644
--- a/kuryr_kubernetes/controller/drivers/network_policy.py
+++ b/kuryr_kubernetes/controller/drivers/network_policy.py
@@ -12,9 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
-from six.moves.urllib.parse import urlencode
-
 from oslo_log import log as logging
 
 from neutronclient.common import exceptions as n_exc
@@ -72,7 +69,7 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
             LOG.debug("Already existing CRD %s", crd_name)
             sg_id = crd['spec']['securityGroupId']
             # Fetch existing SG rules from kuryrnetpolicy CRD
-            existing_sg_rules = None
+            existing_sg_rules = []
             existing_i_rules = crd['spec'].get('ingressSgRules')
             existing_e_rules = crd['spec'].get('egressSgRules')
             if existing_i_rules or existing_e_rules:
@@ -158,7 +155,7 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
             for e_rule in e_rules:
                 sgr_id = self._create_security_group_rule(e_rule)
                 e_rule['security_group_rule']['id'] = sgr_id
-        except n_exc.NeutronClientException:
+        except (n_exc.NeutronClientException, exceptions.ResourceNotReady):
             LOG.exception("Error creating security group for network policy "
                           " %s", policy['metadata']['name'])
             # If there's any issue creating sg rules, remove them
@@ -183,36 +180,161 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
             LOG.exception('Error annotating network policy')
             raise
 
-    def _get_namespaces_cidr(self, namespace_selector):
+    def _get_pods_ips(self, pod_selector, namespace=None,
+                      namespace_selector=None):
+        ips = []
+        matching_pods = []
+        if namespace_selector:
+            matching_namespaces = utils.get_namespaces(namespace_selector)
+            for ns in matching_namespaces.get('items'):
+                matching_pods = utils.get_pods(pod_selector,
+                                               ns['metadata']['name'])
+        else:
+            matching_pods = utils.get_pods(pod_selector, namespace)
+        for pod in matching_pods.get('items'):
+            if pod['status']['podIP']:
+                ips.append(pod['status']['podIP'])
+        return ips
+
+    def _get_namespace_subnet_cidr(self, namespace):
+        try:
+            ns_annotations = namespace['metadata']['annotations']
+            ns_name = ns_annotations[constants.K8S_ANNOTATION_NET_CRD]
+        except KeyError:
+            LOG.exception('Namespace handler must be enabled to support '
+                          'Network Policies with namespaceSelector')
+            raise exceptions.ResourceNotReady(namespace)
+        try:
+            net_crd = self.kubernetes.get('{}/kuryrnets/{}'.format(
+                constants.K8S_API_CRD, ns_name))
+        except exceptions.K8sClientException:
+            LOG.exception("Kubernetes Client Exception.")
+            raise
+        return net_crd['spec']['subnetCIDR']
+
+    def _get_namespaces_cidr(self, namespace_selector, namespace=None):
         cidrs = []
-        namespace_label = urlencode(namespace_selector[
-            'matchLabels'])
-        # NOTE(maysams): K8s API does not accept &, so we need to replace
-        # it with ',' or '%2C' instead
-        namespace_label = namespace_label.replace('&', ',')
-        matching_namespaces = self.kubernetes.get(
-            '{}/namespaces?labelSelector={}'.format(
-                constants.K8S_API_BASE, namespace_label)).get('items')
-        for ns in matching_namespaces:
-            # NOTE(ltomasbo): This requires the namespace handler to be
-            # also enabled
-            try:
-                ns_annotations = ns['metadata']['annotations']
-                ns_name = ns_annotations[constants.K8S_ANNOTATION_NET_CRD]
-            except KeyError:
-                LOG.exception('Namespace handler must be enabled to support '
-                              'Network Policies with namespaceSelector')
-                raise
-            try:
-                net_crd = self.kubernetes.get('{}/kuryrnets/{}'.format(
-                    constants.K8S_API_CRD, ns_name))
-            except exceptions.K8sClientException:
-                LOG.exception("Kubernetes Client Exception.")
-                raise
-            ns_cidr = net_crd['spec']['subnetCIDR']
+        if not namespace_selector and namespace:
+            ns = self.kubernetes.get(
+                '{}/namespaces/{}'.format(constants.K8S_API_BASE, namespace))
+            ns_cidr = self._get_namespace_subnet_cidr(ns)
             cidrs.append(ns_cidr)
+        else:
+            matching_namespaces = utils.get_namespaces(namespace_selector)
+            for ns in matching_namespaces.get('items'):
+                # NOTE(ltomasbo): This requires the namespace handler to be
+                # also enabled
+                ns_cidr = self._get_namespace_subnet_cidr(ns)
+                cidrs.append(ns_cidr)
         return cidrs
 
+    def _parse_selectors(self, rule_block, rule_direction, policy_namespace):
+        allowed_cidrs = []
+        allow_all = False
+        selectors = False
+        for rule in rule_block.get(rule_direction, []):
+            namespace_selector = rule.get('namespaceSelector')
+            pod_selector = rule.get('podSelector')
+            if namespace_selector == {}:
+                selectors = True
+                if pod_selector:
+                    # allow matching pods in all namespaces
+                    allowed_cidrs.extend(self._get_pods_ips(
+                        pod_selector))
+                else:
+                    # allow from all
+                    allow_all = True
+            elif namespace_selector:
+                selectors = True
+                if pod_selector:
+                    # allow matching pods on matching namespaces
+                    allowed_cidrs.extend(self._get_pods_ips(
+                        pod_selector,
+                        namespace_selector=namespace_selector))
+                else:
+                    # allow from/to all on the matching namespaces
+                    allowed_cidrs.extend(self._get_namespaces_cidr(
+                        namespace_selector))
+            else:
+                if pod_selector == {}:
+                    # allow from/to all pods on the network policy
+                    # namespace
+                    selectors = True
+                    allowed_cidrs.extend(self._get_namespaces_cidr(
+                        None,
+                        namespace=policy_namespace))
+                elif pod_selector:
+                    # allow matching pods on the network policy
+                    # namespace
+                    selectors = True
+                    allowed_cidrs.extend(self._get_pods_ips(
+                        pod_selector,
+                        namespace=policy_namespace))
+        return allow_all, selectors, allowed_cidrs
+
+    def _parse_sg_rules(self, sg_rule_body_list, direction, policy, sg_id):
+        rule_list = policy['spec'].get(direction)
+        if not rule_list:
+            return
+
+        policy_namespace = policy['metadata']['namespace']
+        rule_direction = 'from'
+        if direction == 'egress':
+            rule_direction = 'to'
+
+        if rule_list[0] == {}:
+            LOG.debug('Applying default all open policy from %s',
+                      policy['metadata']['selfLink'])
+            rule = self._create_security_group_rule_body(
+                sg_id, direction, port_range_min=1, port_range_max=65535)
+            sg_rule_body_list.append(rule)
+
+        for rule_block in rule_list:
+            LOG.debug('Parsing %(dir)s Rule %(rule)s', {'dir': direction,
+                                                        'rule': rule_block})
+            allow_all, selectors, allowed_cidrs = self._parse_selectors(
+                rule_block, rule_direction, policy_namespace)
+
+            if 'ports' in rule_block:
+                for port in rule_block['ports']:
+                    if allowed_cidrs or allow_all or selectors:
+                        for cidr in allowed_cidrs:
+                            rule = self._create_security_group_rule_body(
+                                sg_id, direction, port.get('port'),
+                                protocol=port.get('protocol'),
+                                cidr=cidr)
+                            sg_rule_body_list.append(rule)
+                        if allow_all:
+                            rule = self._create_security_group_rule_body(
+                                sg_id, direction, port.get('port'),
+                                protocol=port.get('protocol'))
+                            sg_rule_body_list.append(rule)
+                    else:
+                        rule = self._create_security_group_rule_body(
+                            sg_id, direction, port.get('port'),
+                            protocol=port.get('protocol'))
+                        sg_rule_body_list.append(rule)
+            elif allowed_cidrs or allow_all or selectors:
+                for cidr in allowed_cidrs:
+                    rule = self._create_security_group_rule_body(
+                        sg_id, direction,
+                        port_range_min=1,
+                        port_range_max=65535,
+                        cidr=cidr)
+                    sg_rule_body_list.append(rule)
+                if allow_all:
+                    rule = self._create_security_group_rule_body(
+                        sg_id, direction,
+                        port_range_min=1,
+                        port_range_max=65535)
+                    sg_rule_body_list.append(rule)
+            else:
+                LOG.debug('This network policy specifies no %(direction)s '
+                          '%(rule_direction)s and no ports: %(policy)s',
+                          {'direction': direction,
+                           'rule_direction': rule_direction,
+                           'policy': policy['metadata']['selfLink']})
+
     def parse_network_policy_rules(self, policy, sg_id):
         """Create security group rule bodies out of network policies.
 
@@ -221,94 +343,13 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
         ingress and egress ports blocks.
         """
         LOG.debug('Parsing Network Policy %s' % policy['metadata']['name'])
-        ingress_rule_list = policy['spec'].get('ingress')
-        egress_rule_list = policy['spec'].get('egress')
         ingress_sg_rule_body_list = []
         egress_sg_rule_body_list = []
 
-        if ingress_rule_list:
-            if ingress_rule_list[0] == {}:
-                LOG.debug('Applying default all open policy from %s',
-                          policy['metadata']['selfLink'])
-                i_rule = self._create_security_group_rule_body(
-                    sg_id, 'ingress', port_range_min=1, port_range_max=65535)
-                ingress_sg_rule_body_list.append(i_rule)
-            for ingress_rule in ingress_rule_list:
-                LOG.debug('Parsing Ingress Rule %s', ingress_rule)
-                allowed_cidrs = []
-                for from_rule in ingress_rule.get('from', []):
-                    namespace_selector = from_rule.get('namespaceSelector')
-                    if namespace_selector:
-                        allowed_cidrs = self._get_namespaces_cidr(
-                            namespace_selector)
-                if 'ports' in ingress_rule:
-                    for port in ingress_rule['ports']:
-                        if allowed_cidrs:
-                            for cidr in allowed_cidrs:
-                                i_rule = self._create_security_group_rule_body(
-                                    sg_id, 'ingress', port.get('port'),
-                                    protocol=port.get('protocol'),
-                                    cidr=cidr)
-                                ingress_sg_rule_body_list.append(i_rule)
-                        else:
-                            i_rule = self._create_security_group_rule_body(
-                                sg_id, 'ingress', port.get('port'),
-                                protocol=port.get('protocol'))
-                            ingress_sg_rule_body_list.append(i_rule)
-                elif allowed_cidrs:
-                    for cidr in allowed_cidrs:
-                        i_rule = self._create_security_group_rule_body(
-                            sg_id, 'ingress',
-                            port_range_min=1,
-                            port_range_max=65535,
-                            cidr=cidr)
-                        ingress_sg_rule_body_list.append(i_rule)
-                else:
-                    LOG.debug('This network policy specifies no ingress from '
-                              'and no ports: %s',
-                              policy['metadata']['selfLink'])
-
-        if egress_rule_list:
-            if egress_rule_list[0] == {}:
-                LOG.debug('Applying default all open policy from %s',
-                          policy['metadata']['selfLink'])
-                e_rule = self._create_security_group_rule_body(
-                    sg_id, 'egress', port_range_min=1, port_range_max=65535)
-                egress_sg_rule_body_list.append(e_rule)
-            for egress_rule in egress_rule_list:
-                LOG.debug('Parsing Egress Rule %s', egress_rule)
-                allowed_cidrs = []
-                for from_rule in egress_rule.get('to', []):
-                    namespace_selector = from_rule.get('namespaceSelector')
-                    if namespace_selector:
-                        allowed_cidrs = self._get_namespaces_cidr(
-                            namespace_selector)
-                if 'ports' in egress_rule:
-                    for port in egress_rule['ports']:
-                        if allowed_cidrs:
-                            for cidr in allowed_cidrs:
-                                e_rule = self._create_security_group_rule_body(
-                                    sg_id, 'egress', port.get('port'),
-                                    protocol=port.get('protocol'),
-                                    cidr=cidr)
-                                egress_sg_rule_body_list.append(e_rule)
-                        else:
-                            e_rule = self._create_security_group_rule_body(
-                                sg_id, 'egress', port.get('port'),
-                                protocol=port.get('protocol'))
-                            egress_sg_rule_body_list.append(e_rule)
-                elif allowed_cidrs:
-                    for cidr in allowed_cidrs:
-                        e_rule = self._create_security_group_rule_body(
-                            sg_id, 'egress',
-                            port_range_min=1,
-                            port_range_max=65535,
-                            cidr=cidr)
-                        egress_sg_rule_body_list.append(e_rule)
-                else:
-                    LOG.debug('This network policy specifies no egrees to '
-                              'and no ports: %s',
-                              policy['metadata']['selfLink'])
+        self._parse_sg_rules(ingress_sg_rule_body_list, 'ingress', policy,
+                             sg_id)
+        self._parse_sg_rules(egress_sg_rule_body_list, 'egress', policy,
+                             sg_id)
 
         return ingress_sg_rule_body_list, egress_sg_rule_body_list
 
diff --git a/kuryr_kubernetes/controller/drivers/utils.py b/kuryr_kubernetes/controller/drivers/utils.py
index 63f1430d3..871830a35 100644
--- a/kuryr_kubernetes/controller/drivers/utils.py
+++ b/kuryr_kubernetes/controller/drivers/utils.py
@@ -66,6 +66,18 @@ def is_host_network(pod):
 
 
 def get_pods(selector, namespace):
+    """Return a k8s object list with the pods matching the selector.
+
+    It accepts an optional parameter to state the namespace where the pod
+    selector will be applied. If no namespace is passed, then the pod
+    selector is applied in all namespaces.
+
+    param selector: k8s selector of types matchLabels or matchExpressions
+    param namespace: namespace name where the selector will be applied. If
+        None, the pod selector is applied in all namespaces
+    return: k8s list object containing all matching pods
+
+    """
     kubernetes = clients.get_kubernetes_client()
     labels = selector.get('matchLabels', None)
     if labels:
@@ -83,13 +95,45 @@ def get_pods(selector, namespace):
         else:
             labels = parse.quote(exps)
 
-    pods = kubernetes.get(
-        '{}/namespaces/{}/pods?labelSelector={}'.format(
-            constants.K8S_API_BASE, namespace, labels))
+    if namespace:
+        pods = kubernetes.get(
+            '{}/namespaces/{}/pods?labelSelector={}'.format(
+                constants.K8S_API_BASE, namespace, labels))
+    else:
+        pods = kubernetes.get(
+            '{}/pods?labelSelector={}'.format(constants.K8S_API_BASE, labels))
 
     return pods
 
 
+def get_namespaces(selector):
+    """Return a k8s object list with the namespaces matching the selector.
+
+    param selector: k8s selector of types matchLabels or matchExpressions
+    return: k8s list object containing all matching namespaces
+
+    """
+    kubernetes = clients.get_kubernetes_client()
+    labels = selector.get('matchLabels', None)
+    if labels:
+        labels = replace_encoded_characters(labels)
+
+    exps = selector.get('matchExpressions', None)
+    if exps:
+        exps = ', '.join(format_expression(exp) for exp in exps)
+        if labels:
+            expressions = parse.quote("," + exps)
+            labels += expressions
+        else:
+            labels = parse.quote(exps)
+
+    namespaces = kubernetes.get(
+        '{}/namespaces?labelSelector={}'.format(
+            constants.K8S_API_BASE, labels))
+
+    return namespaces
+
+
 def format_expression(expression):
     key = expression['key']
     operator = expression['operator'].lower()
diff --git a/kuryr_kubernetes/tests/unit/controller/drivers/test_network_policy.py b/kuryr_kubernetes/tests/unit/controller/drivers/test_network_policy.py
index d012a33a4..563e29bf4 100644
--- a/kuryr_kubernetes/tests/unit/controller/drivers/test_network_policy.py
+++ b/kuryr_kubernetes/tests/unit/controller/drivers/test_network_policy.py
@@ -306,7 +306,8 @@ class TestNetworkPolicyDriver(test_base.TestCase):
         pod = get_pod_obj()
         self.kubernetes.get.return_value = {'items': [pod]}
 
-        self.assertRaises(KeyError, self._driver._get_namespaces_cidr,
+        self.assertRaises(exceptions.ResourceNotReady,
+                          self._driver._get_namespaces_cidr,
                           namespace_selector)
         self.kubernetes.get.assert_called_once()
 
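Below is a minimal standalone sketch (not part of the patch) of the per-entry
classification that the new _parse_selectors helper performs on each 'from'/'to'
block. The placeholder strings stand in for the pod IPs and namespace subnet
CIDRs that the driver would actually collect via _get_pods_ips and
_get_namespaces_cidr, so the returned values here are illustrative only:

# Standalone sketch of the selector classification done by _parse_selectors.
# Kuryr lookups are replaced with placeholder strings so the control flow can
# be exercised without a running cluster or Neutron.


def classify(rule):
    """Return (allow_all, selectors, sources) for one 'from'/'to' entry."""
    namespace_selector = rule.get('namespaceSelector')
    pod_selector = rule.get('podSelector')
    if namespace_selector == {}:
        if pod_selector:
            # matching pods in all namespaces
            return False, True, ['<ips of matching pods, any namespace>']
        # empty namespaceSelector alone: allow from/to everywhere
        return True, True, []
    if namespace_selector:
        if pod_selector:
            # matching pods restricted to the matching namespaces
            return False, True, ['<ips of matching pods in matching ns>']
        # whole subnets of the matching namespaces
        return False, True, ['<subnet CIDRs of matching namespaces>']
    if pod_selector == {}:
        # every pod in the policy's own namespace
        return False, True, ['<subnet CIDR of the policy namespace>']
    if pod_selector:
        # matching pods in the policy's own namespace
        return False, True, ['<ips of matching pods in policy namespace>']
    return False, False, []


if __name__ == '__main__':
    # e.g. spec.ingress[0].from[0] of a NetworkPolicy
    print(classify({'podSelector': {'matchLabels': {'role': 'db'}}}))
    print(classify({'namespaceSelector': {}}))

Feeding it the two example entries at the bottom shows the two extremes the
patch handles: a bare podSelector stays scoped to the policy's own namespace,
while an empty namespaceSelector with no podSelector opens the rule to all
sources.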