From 0b95415c1482ac71b8bd101e3689b4d58456db30 Mon Sep 17 00:00:00 2001 From: Maysa Macedo Date: Mon, 17 Dec 2018 12:53:51 +0000 Subject: [PATCH] Update CRD when NP has podSelectors When a pod gets created, deleted or updated and its labels matches the PodSelector of a NP, the sg must be updated. Partially Implements: blueprint k8s-network-policies Change-Id: Ic0dd3bc93e2453460c4d8dea360efd414b6ae42b --- kuryr_kubernetes/controller/drivers/base.py | 21 ++ .../drivers/default_security_groups.py | 12 ++ .../drivers/namespace_security_groups.py | 12 ++ .../controller/drivers/network_policy.py | 89 ++------ .../drivers/network_policy_security_groups.py | 204 +++++++++++++++--- kuryr_kubernetes/controller/drivers/utils.py | 128 +++++++++++ .../controller/handlers/pod_label.py | 2 + kuryr_kubernetes/controller/handlers/vif.py | 14 +- kuryr_kubernetes/opts.py | 2 + .../controller/drivers/test_network_policy.py | 37 ++-- .../test_network_policy_security_groups.py | 185 +++++++++++++++- 11 files changed, 588 insertions(+), 118 deletions(-) diff --git a/kuryr_kubernetes/controller/drivers/base.py b/kuryr_kubernetes/controller/drivers/base.py index 68db9ea7b..00bc5685f 100644 --- a/kuryr_kubernetes/controller/drivers/base.py +++ b/kuryr_kubernetes/controller/drivers/base.py @@ -247,6 +247,27 @@ class PodSecurityGroupsDriver(DriverBase): """ raise NotImplementedError() + def create_sg_rules(self, pod): + """Create security group rules for a pod. + + :param pod: dict containing Kubernetes Pod object + """ + raise NotImplementedError() + + def delete_sg_rules(self, pod): + """Delete security group rules for a pod + + :param pod: dict containing Kubernetes Pod object + """ + raise NotImplementedError() + + def update_sg_rules(self, pod): + """Update security group rules for a pod + + :param pod: dict containing Kubernetes Pod object + """ + raise NotImplementedError() + @six.add_metaclass(abc.ABCMeta) class ServiceSecurityGroupsDriver(DriverBase): diff --git a/kuryr_kubernetes/controller/drivers/default_security_groups.py b/kuryr_kubernetes/controller/drivers/default_security_groups.py index 88faee0da..0a4dd9573 100644 --- a/kuryr_kubernetes/controller/drivers/default_security_groups.py +++ b/kuryr_kubernetes/controller/drivers/default_security_groups.py @@ -47,6 +47,18 @@ class DefaultPodSecurityGroupsDriver(base.PodSecurityGroupsDriver): LOG.debug("Security group driver does not implement deleting " "SGs.") + def create_sg_rules(self, pod): + LOG.debug("Security group driver does not create SG rules for " + "the pods.") + + def delete_sg_rules(self, pod): + LOG.debug("Security group driver does not delete SG rules for " + "the pods.") + + def update_sg_rules(self, pod): + LOG.debug("Security group driver does not update SG rules for " + "the pods.") + class DefaultServiceSecurityGroupsDriver(base.ServiceSecurityGroupsDriver): """Provides security groups for Service based on a configuration option.""" diff --git a/kuryr_kubernetes/controller/drivers/namespace_security_groups.py b/kuryr_kubernetes/controller/drivers/namespace_security_groups.py index cf44bbec2..3e25b56c2 100644 --- a/kuryr_kubernetes/controller/drivers/namespace_security_groups.py +++ b/kuryr_kubernetes/controller/drivers/namespace_security_groups.py @@ -131,6 +131,18 @@ class NamespacePodSecurityGroupsDriver(base.PodSecurityGroupsDriver): LOG.exception("Error deleting security group %s.", sg_id) raise + def create_sg_rules(self, pod): + LOG.debug("Security group driver does not create SG rules for " + "the pods.") + + def delete_sg_rules(self, pod): + LOG.debug("Security group driver does not delete SG rules for " + "the pods.") + + def update_sg_rules(self, pod): + LOG.debug("Security group driver does not update SG rules for " + "the pods.") + class NamespaceServiceSecurityGroupsDriver(base.ServiceSecurityGroupsDriver): """Provides security groups for Service based on a configuration option.""" diff --git a/kuryr_kubernetes/controller/drivers/network_policy.py b/kuryr_kubernetes/controller/drivers/network_policy.py index 912b4596f..80162571c 100644 --- a/kuryr_kubernetes/controller/drivers/network_policy.py +++ b/kuryr_kubernetes/controller/drivers/network_policy.py @@ -93,14 +93,14 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver): current_sg_rules] for sg_rule in sg_rules_to_delete: try: - self._delete_security_group_rule(sgr_ids[sg_rule]) + utils.delete_security_group_rule(sgr_ids[sg_rule]) except n_exc.NotFound: LOG.debug('Trying to delete non existing sg_rule %s', sg_rule) # Create new rules that weren't already on the security group sg_rules_to_add = [rule for rule in current_sg_rules if rule not in existing_sg_rules] for sg_rule in sg_rules_to_add: - sgr_id = self._create_security_group_rule(sg_rule) + sgr_id = utils.create_security_group_rule(sg_rule) if sg_rule['security_group_rule'].get('direction') == 'ingress': for i_rule in i_rules: if sg_rule == i_rule: @@ -111,17 +111,9 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver): e_rule["security_group_rule"]["id"] = sgr_id # Annotate kuryrnetpolicy CRD with current policy and ruleset pod_selector = policy['spec'].get('podSelector') - LOG.debug('Patching KuryrNetPolicy CRD %s' % crd_name) - try: - self.kubernetes.patch('spec', crd['metadata']['selfLink'], - {'ingressSgRules': i_rules, - 'egressSgRules': e_rules, - 'podSelector': pod_selector, - 'networkpolicy_spec': policy['spec']}) + utils.patch_kuryr_crd(crd, i_rules, e_rules, pod_selector, + np_spec=policy['spec']) - except exceptions.K8sClientException: - LOG.exception('Error updating kuryrnetpolicy CRD %s', crd_name) - raise if existing_pod_selector != pod_selector: return existing_pod_selector return False @@ -150,11 +142,13 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver): sg_id = sg['security_group']['id'] i_rules, e_rules = self.parse_network_policy_rules(policy, sg_id) for i_rule in i_rules: - sgr_id = self._create_security_group_rule(i_rule) + sgr_id = utils.create_security_group_rule(i_rule) i_rule['security_group_rule']['id'] = sgr_id + for e_rule in e_rules: - sgr_id = self._create_security_group_rule(e_rule) + sgr_id = utils.create_security_group_rule(e_rule) e_rule['security_group_rule']['id'] = sgr_id + except (n_exc.NeutronClientException, exceptions.ResourceNotReady): LOG.exception("Error creating security group for network policy " " %s", policy['metadata']['name']) @@ -270,6 +264,7 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver): allowed_cidrs.extend(self._get_pods_ips( pod_selector, namespace=policy_namespace)) + return allow_all, selectors, allowed_cidrs def _parse_sg_rules(self, sg_rule_body_list, direction, policy, sg_id): @@ -285,7 +280,7 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver): if rule_list[0] == {}: LOG.debug('Applying default all open policy from %s', policy['metadata']['selfLink']) - rule = self._create_security_group_rule_body( + rule = utils.create_security_group_rule_body( sg_id, direction, port_range_min=1, port_range_max=65535) sg_rule_body_list.append(rule) @@ -299,31 +294,31 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver): for port in rule_block['ports']: if allowed_cidrs or allow_all or selectors: for cidr in allowed_cidrs: - rule = self._create_security_group_rule_body( + rule = utils.create_security_group_rule_body( sg_id, direction, port.get('port'), protocol=port.get('protocol'), cidr=cidr) sg_rule_body_list.append(rule) if allow_all: - rule = self._create_security_group_rule_body( + rule = utils.create_security_group_rule_body( sg_id, direction, port.get('port'), protocol=port.get('protocol')) sg_rule_body_list.append(rule) else: - rule = self._create_security_group_rule_body( + rule = utils.create_security_group_rule_body( sg_id, direction, port.get('port'), protocol=port.get('protocol')) sg_rule_body_list.append(rule) elif allowed_cidrs or allow_all or selectors: for cidr in allowed_cidrs: - rule = self._create_security_group_rule_body( + rule = utils.create_security_group_rule_body( sg_id, direction, port_range_min=1, port_range_max=65535, cidr=cidr) sg_rule_body_list.append(rule) if allow_all: - rule = self._create_security_group_rule_body( + rule = utils.create_security_group_rule_body( sg_id, direction, port_range_min=1, port_range_max=65535) @@ -353,59 +348,6 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver): return ingress_sg_rule_body_list, egress_sg_rule_body_list - def _create_security_group_rule_body( - self, security_group_id, direction, port_range_min, - port_range_max=None, protocol=None, ethertype='IPv4', cidr=None, - description="Kuryr-Kubernetes NetPolicy SG rule"): - if not port_range_min: - port_range_min = 1 - port_range_max = 65535 - elif not port_range_max: - port_range_max = port_range_min - if not protocol: - protocol = 'TCP' - security_group_rule_body = { - u'security_group_rule': { - u'ethertype': ethertype, - u'security_group_id': security_group_id, - u'description': description, - u'direction': direction, - u'protocol': protocol.lower(), - u'port_range_min': port_range_min, - u'port_range_max': port_range_max - } - } - if cidr: - security_group_rule_body[u'security_group_rule'][ - u'remote_ip_prefix'] = cidr - LOG.debug("Creating sg rule body %s", security_group_rule_body) - return security_group_rule_body - - def _create_security_group_rule(self, body): - sgr = '' - try: - sgr = self.neutron.create_security_group_rule( - body=body) - except n_exc.Conflict: - LOG.debug("Failed to create already existing security group " - "rule %s", body) - except n_exc.NeutronClientException: - LOG.debug("Error creating security group rule") - raise - return sgr["security_group_rule"]["id"] - - def _delete_security_group_rule(self, security_group_rule_id): - try: - self.neutron.delete_security_group_rule( - security_group_rule=security_group_rule_id) - except n_exc.NotFound: - LOG.debug("Error deleting security group rule as it does not " - "exist: %s", security_group_rule_id) - except n_exc.NeutronClientException: - LOG.debug("Error deleting security group rule: %s", - security_group_rule_id) - raise - def release_network_policy(self, netpolicy_crd): if netpolicy_crd is not None: try: @@ -460,7 +402,6 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver): netpolicy_crd_name = "np-" + networkpolicy_name namespace = policy['metadata']['namespace'] pod_selector = policy['spec'].get('podSelector') - netpolicy_crd = { 'apiVersion': 'openstack.org/v1', 'kind': constants.K8S_OBJ_KURYRNETPOLICY, diff --git a/kuryr_kubernetes/controller/drivers/network_policy_security_groups.py b/kuryr_kubernetes/controller/drivers/network_policy_security_groups.py index 641af9369..10e6d1a94 100644 --- a/kuryr_kubernetes/controller/drivers/network_policy_security_groups.py +++ b/kuryr_kubernetes/controller/drivers/network_policy_security_groups.py @@ -17,6 +17,7 @@ from kuryr_kubernetes import clients from kuryr_kubernetes import config from kuryr_kubernetes import constants from kuryr_kubernetes.controller.drivers import base +from kuryr_kubernetes.controller.drivers import utils as driver_utils from kuryr_kubernetes import exceptions from oslo_config import cfg @@ -29,12 +30,15 @@ OPERATORS_WITH_VALUES = [constants.K8S_OPERATOR_IN, constants.K8S_OPERATOR_NOT_IN] -def _get_kuryrnetpolicy_crds(namespace='default'): +def _get_kuryrnetpolicy_crds(namespace=None): kubernetes = clients.get_kubernetes_client() try: - knp_path = '{}/{}/kuryrnetpolicies'.format( - constants.K8S_API_CRD_NAMESPACES, namespace) + if namespace: + knp_path = '{}/{}/kuryrnetpolicies'.format( + constants.K8S_API_CRD_NAMESPACES, namespace) + else: + knp_path = constants.K8S_API_CRD_KURYRNETPOLICIES LOG.debug("K8s API Query %s", knp_path) knps = kubernetes.get(knp_path) LOG.debug("Return Kuryr Network Policies with label %s", knps) @@ -47,26 +51,26 @@ def _get_kuryrnetpolicy_crds(namespace='default'): return knps -def _match_expressions(expressions, pod_labels): +def _match_expressions(expressions, labels): for exp in expressions: exp_op = exp['operator'].lower() - if pod_labels: + if labels: if exp_op in OPERATORS_WITH_VALUES: exp_values = exp['values'] - pod_value = pod_labels.get(str(exp['key']), None) + label_value = labels.get(str(exp['key']), None) if exp_op == constants.K8S_OPERATOR_IN: - if pod_value is None or pod_value not in exp_values: + if label_value is None or label_value not in exp_values: return False elif exp_op == constants.K8S_OPERATOR_NOT_IN: - if pod_value in exp_values: + if label_value in exp_values: return False else: if exp_op == constants.K8S_OPERATOR_EXISTS: - exists = pod_labels.get(str(exp['key']), None) + exists = labels.get(str(exp['key']), None) if exists is None: return False elif exp_op == constants.K8S_OPERATOR_DOES_NOT_EXIST: - exists = pod_labels.get(str(exp['key']), None) + exists = labels.get(str(exp['key']), None) if exists is not None: return False else: @@ -76,14 +80,118 @@ def _match_expressions(expressions, pod_labels): return True -def _match_labels(crd_labels, pod_labels): - for label_key, label_value in crd_labels.items(): - pod_value = pod_labels.get(label_key, None) - if not pod_value or label_value != pod_value: +def _match_labels(crd_labels, labels): + for crd_key, crd_value in crd_labels.items(): + label_value = labels.get(crd_key, None) + if not label_value or crd_value != label_value: return False return True +def _match_selector(selector, labels): + crd_labels = selector.get('matchLabels', None) + crd_expressions = selector.get('matchExpressions', None) + + match_exp = match_lb = True + if crd_expressions: + match_exp = _match_expressions(crd_expressions, + labels) + if crd_labels and labels: + match_lb = _match_labels(crd_labels, labels) + return match_exp and match_lb + + +def _get_namespace_labels(namespace): + kubernetes = clients.get_kubernetes_client() + + try: + path = '{}/{}'.format( + constants.K8S_API_NAMESPACES, namespace) + LOG.debug("K8s API Query %s", path) + namespaces = kubernetes.get(path) + LOG.debug("Return Namespace: %s", namespaces) + except exceptions.K8sResourceNotFound: + LOG.exception("Namespace not found") + raise + except exceptions.K8sClientException: + LOG.exception("Kubernetes Client Exception") + raise + return namespaces['metadata'].get('labels') + + +def _create_sg_rules(crd, pod, namespace_selector, pod_selector, + rule_block, crd_rules, direction, + matched): + pod_labels = pod['metadata'].get('labels') + + # NOTE (maysams) No need to differentiate between podSelector + # with empty value or with '{}', as they have same result in here. + if (pod_selector and + _match_selector(pod_selector, pod_labels)): + matched = True + pod_ip = driver_utils.get_pod_ip(pod) + sg_id = crd['spec']['securityGroupId'] + if 'ports' in rule_block: + for port in rule_block['ports']: + sg_rule = driver_utils.create_security_group_rule_body( + sg_id, direction, port.get('port'), + protocol=port.get('protocol'), cidr=pod_ip) + sgr_id = driver_utils.create_security_group_rule(sg_rule) + sg_rule['security_group_rule']['id'] = sgr_id + crd_rules.append(sg_rule) + else: + sg_rule = driver_utils.create_security_group_rule_body( + sg_id, direction, + port_range_min=1, + port_range_max=65535, + cidr=pod_ip) + sgr_id = driver_utils.create_security_group_rule(sg_rule) + sg_rule['security_group_rule']['id'] = sgr_id + crd_rules.append(sg_rule) + return matched + + +def _parse_rules(direction, crd, pod): + policy = crd['spec']['networkpolicy_spec'] + + pod_namespace = pod['metadata']['namespace'] + pod_namespace_labels = _get_namespace_labels(pod_namespace) + policy_namespace = crd['metadata']['namespace'] + + rule_direction = 'from' + crd_rules = crd['spec'].get('ingressSgRules') + if direction == 'egress': + rule_direction = 'to' + crd_rules = crd['spec'].get('egressSgRules') + + matched = False + rule_list = policy.get(direction, None) + for rule_block in rule_list: + for rule in rule_block.get(rule_direction, []): + namespace_selector = rule.get('namespaceSelector') + pod_selector = rule.get('podSelector') + if namespace_selector == {}: + if _create_sg_rules(crd, pod, namespace_selector, + pod_selector, rule_block, crd_rules, + direction, matched): + matched = True + elif namespace_selector: + if (pod_namespace_labels and + _match_selector(namespace_selector, + pod_namespace_labels)): + if _create_sg_rules(crd, pod, namespace_selector, + pod_selector, rule_block, crd_rules, + direction, matched): + matched = True + else: + if pod_namespace == policy_namespace: + if _create_sg_rules(crd, pod, namespace_selector, + pod_selector, rule_block, crd_rules, + direction, matched): + matched = True + return matched, crd_rules + + class NetworkPolicySecurityGroupsDriver(base.PodSecurityGroupsDriver): """Provides security groups for pods based on network policies""" @@ -97,16 +205,7 @@ class NetworkPolicySecurityGroupsDriver(base.PodSecurityGroupsDriver): for crd in knp_crds.get('items'): pod_selector = crd['spec'].get('podSelector') if pod_selector: - crd_labels = pod_selector.get('matchLabels', None) - crd_expressions = pod_selector.get('matchExpressions', None) - - match_exp = match_lb = True - if crd_expressions: - match_exp = _match_expressions(crd_expressions, - pod_labels) - if crd_labels and pod_labels: - match_lb = _match_labels(crd_labels, pod_labels) - if match_exp and match_lb: + if _match_selector(pod_selector, pod_labels): LOG.debug("Appending %s", str(crd['spec']['securityGroupId'])) sg_list.append(str(crd['spec']['securityGroupId'])) @@ -124,6 +223,63 @@ class NetworkPolicySecurityGroupsDriver(base.PodSecurityGroupsDriver): return sg_list[:] + def create_sg_rules(self, pod): + LOG.debug("Creating sg rule for pod: %s", pod['metadata']['name']) + knp_crds = _get_kuryrnetpolicy_crds() + for crd in knp_crds.get('items'): + crd_selector = crd['spec'].get('podSelector') + + i_matched, i_rules = _parse_rules('ingress', crd, pod) + e_matched, e_rules = _parse_rules('egress', crd, pod) + + if i_matched or e_matched: + driver_utils.patch_kuryr_crd(crd, i_rules, + e_rules, crd_selector) + + def delete_sg_rules(self, pod): + LOG.debug("Deleting sg rule for pod: %s", pod['metadata']['name']) + pod_ip = driver_utils.get_pod_ip(pod) + + knp_crds = _get_kuryrnetpolicy_crds() + for crd in knp_crds.get('items'): + crd_selector = crd['spec'].get('podSelector') + ingress_rule_list = crd['spec'].get('ingressSgRules') + egress_rule_list = crd['spec'].get('egressSgRules') + i_rules = [] + e_rules = [] + + matched = False + for i_rule in ingress_rule_list: + LOG.debug("Parsing ingress rule: %r", i_rule) + remote_ip_prefix = i_rule['security_group_rule'].get( + 'remote_ip_prefix') + if remote_ip_prefix and remote_ip_prefix == pod_ip: + matched = True + driver_utils.delete_security_group_rule( + i_rule['security_group_rule']['id']) + else: + i_rules.append(i_rule) + + for e_rule in egress_rule_list: + LOG.debug("Parsing egress rule: %r", e_rule) + remote_ip_prefix = e_rule['security_group_rule'].get( + 'remote_ip_prefix') + if remote_ip_prefix and remote_ip_prefix == pod_ip: + matched = True + driver_utils.delete_security_group_rule( + e_rule['security_group_rule']['id']) + else: + e_rules.append(e_rule) + + if matched: + driver_utils.patch_kuryr_crd(crd, i_rules, e_rules, + crd_selector) + + def update_sg_rules(self, pod): + LOG.debug("Updating sg rule for pod: %s", pod['metadata']['name']) + self.delete_sg_rules(pod) + self.create_sg_rules(pod) + def create_namespace_sg(self, namespace, project_id, crd_spec): LOG.debug("Security group driver does not create SGs for the " "namespaces.") diff --git a/kuryr_kubernetes/controller/drivers/utils.py b/kuryr_kubernetes/controller/drivers/utils.py index e53cca4d7..7dd0ec3db 100644 --- a/kuryr_kubernetes/controller/drivers/utils.py +++ b/kuryr_kubernetes/controller/drivers/utils.py @@ -13,6 +13,9 @@ # License for the specific language governing permissions and limitations # under the License. +from oslo_cache import core as cache +from oslo_config import cfg +from oslo_log import log from oslo_serialization import jsonutils from six.moves.urllib import parse @@ -22,9 +25,29 @@ from kuryr_kubernetes import exceptions as k_exc from kuryr_kubernetes import os_vif_util as ovu from kuryr_kubernetes import utils +from neutronclient.common import exceptions as n_exc + OPERATORS_WITH_VALUES = [constants.K8S_OPERATOR_IN, constants.K8S_OPERATOR_NOT_IN] +LOG = log.getLogger(__name__) + +CONF = cfg.CONF + +pod_ip_caching_opts = [ + cfg.BoolOpt('caching', default=True), + cfg.IntOpt('cache_time', default=3600), +] + +CONF.register_opts(pod_ip_caching_opts, "pod_ip_caching") + +cache.configure(CONF) +pod_ip_cache_region = cache.create_region() +MEMOIZE = cache.get_memoization_decorator( + CONF, pod_ip_cache_region, "pod_ip_caching") + +cache.configure_cache_region(CONF, pod_ip_cache_region) + def get_network_id(subnets): ids = ovu.osvif_to_neutron_network_ids(subnets) @@ -155,3 +178,108 @@ def replace_encoded_characters(labels): # the matchLabels with ',' or '%2C' instead labels = labels.replace('&', ',') return labels + + +def create_security_group_rule(body): + neutron = clients.get_neutron_client() + sgr = '' + try: + sgr = neutron.create_security_group_rule( + body=body) + except n_exc.Conflict as ex: + LOG.debug("Failed to create already existing security group " + "rule %s", body) + # Get existent sg rule id from exception message + sgr_id = str(ex).split("Rule id is", 1)[1].split()[0][:-1] + return sgr_id + except n_exc.NeutronClientException: + LOG.debug("Error creating security group rule") + raise + return sgr["security_group_rule"]["id"] + + +def delete_security_group_rule(security_group_rule_id): + neutron = clients.get_neutron_client() + try: + LOG.debug("Deleting sg rule with ID: %s", security_group_rule_id) + neutron.delete_security_group_rule( + security_group_rule=security_group_rule_id) + except n_exc.NotFound: + LOG.debug("Error deleting security group rule as it does not " + "exist: %s", security_group_rule_id) + except n_exc.NeutronClientException: + LOG.debug("Error deleting security group rule: %s", + security_group_rule_id) + raise + + +def patch_kuryr_crd(crd, i_rules, e_rules, pod_selector, np_spec=None): + kubernetes = clients.get_kubernetes_client() + crd_name = crd['metadata']['name'] + if not np_spec: + np_spec = crd['spec']['networkpolicy_spec'] + LOG.debug('Patching KuryrNetPolicy CRD %s' % crd_name) + try: + kubernetes.patch('spec', crd['metadata']['selfLink'], + {'ingressSgRules': i_rules, + 'egressSgRules': e_rules, + 'podSelector': pod_selector, + 'networkpolicy_spec': np_spec}) + except k_exc.K8sClientException: + LOG.exception('Error updating kuryrnetpolicy CRD %s', crd_name) + raise + + +def create_security_group_rule_body( + security_group_id, direction, port_range_min, + port_range_max=None, protocol=None, ethertype='IPv4', cidr=None, + description="Kuryr-Kubernetes NetPolicy SG rule"): + if not port_range_min: + port_range_min = 1 + port_range_max = 65535 + elif not port_range_max: + port_range_max = port_range_min + if not protocol: + protocol = 'TCP' + security_group_rule_body = { + u'security_group_rule': { + u'ethertype': ethertype, + u'security_group_id': security_group_id, + u'description': description, + u'direction': direction, + u'protocol': protocol.lower(), + u'port_range_min': port_range_min, + u'port_range_max': port_range_max, + } + } + if cidr: + security_group_rule_body[u'security_group_rule'][ + u'remote_ip_prefix'] = cidr + LOG.debug("Creating sg rule body %s", security_group_rule_body) + return security_group_rule_body + + +@MEMOIZE +def get_pod_ip(pod): + vif = pod['metadata']['annotations'].get('openstack.org/kuryr-vif') + if vif is None: + return vif + vif = jsonutils.loads(vif) + vif = vif['versioned_object.data']['default_vif'] + network = (vif['versioned_object.data']['network'] + ['versioned_object.data']) + first_subnet = (network['subnets']['versioned_object.data'] + ['objects'][0]['versioned_object.data']) + first_subnet_ip = (first_subnet['ips']['versioned_object.data'] + ['objects'][0]['versioned_object.data']['address']) + return first_subnet_ip + + +def get_pod_annotated_labels(pod): + try: + annotations = pod['metadata']['annotations'] + pod_labels_annotation = annotations[constants.K8S_ANNOTATION_LABEL] + except KeyError: + return None + pod_labels = jsonutils.loads(pod_labels_annotation) + return pod_labels diff --git a/kuryr_kubernetes/controller/handlers/pod_label.py b/kuryr_kubernetes/controller/handlers/pod_label.py index 44dd47e1f..c745d2dc0 100644 --- a/kuryr_kubernetes/controller/handlers/pod_label.py +++ b/kuryr_kubernetes/controller/handlers/pod_label.py @@ -57,6 +57,8 @@ class PodLabelHandler(k8s_base.ResourceEventHandler): if current_pod_labels == previous_pod_labels: return + self._drv_sg.update_sg_rules(pod) + project_id = self._drv_project.get_project(pod) security_groups = self._drv_sg.get_security_groups(pod, project_id) self._drv_vif_pool.update_vif_sgs(pod, security_groups) diff --git a/kuryr_kubernetes/controller/handlers/vif.py b/kuryr_kubernetes/controller/handlers/vif.py index 0568b4476..4ba15542d 100644 --- a/kuryr_kubernetes/controller/handlers/vif.py +++ b/kuryr_kubernetes/controller/handlers/vif.py @@ -125,11 +125,14 @@ class VIFHandler(k8s_base.ResourceEventHandler): changed = True if changed: self._set_pod_state(pod, state) + self._drv_sg.create_sg_rules(pod) def on_deleted(self, pod): if driver_utils.is_host_network(pod): return + project_id = self._drv_project.get_project(pod) + self._drv_sg.delete_sg_rules(pod) try: security_groups = self._drv_sg.get_security_groups(pod, project_id) except k_exc.ResourceNotReady: @@ -176,6 +179,14 @@ class VIFHandler(k8s_base.ResourceEventHandler): annotation = jsonutils.dumps(state_dict, sort_keys=True) LOG.debug("Setting VIFs annotation: %r", annotation) + labels = pod['metadata'].get('labels') + if not labels: + LOG.debug("Removing Label annotation: %r", labels) + labels_annotation = None + else: + labels_annotation = jsonutils.dumps(labels, sort_keys=True) + LOG.debug("Setting Labels annotation: %r", labels_annotation) + # NOTE(dulek): We don't care about compatibility with Queens format # here, as eventually all Kuryr services will be upgraded # and cluster will start working normally. Meanwhile @@ -184,5 +195,6 @@ class VIFHandler(k8s_base.ResourceEventHandler): k8s = clients.get_kubernetes_client() k8s.annotate(pod['metadata']['selfLink'], - {constants.K8S_ANNOTATION_VIF: annotation}, + {constants.K8S_ANNOTATION_VIF: annotation, + constants.K8S_ANNOTATION_LABEL: labels_annotation}, resource_version=pod['metadata']['resourceVersion']) diff --git a/kuryr_kubernetes/opts.py b/kuryr_kubernetes/opts.py index a2507df0b..28de8e95e 100644 --- a/kuryr_kubernetes/opts.py +++ b/kuryr_kubernetes/opts.py @@ -18,6 +18,7 @@ from kuryr_kubernetes.cni import health as cni_health from kuryr_kubernetes import config from kuryr_kubernetes.controller.drivers import namespace_security_groups from kuryr_kubernetes.controller.drivers import namespace_subnet +from kuryr_kubernetes.controller.drivers import utils as driver_utils from kuryr_kubernetes.controller.drivers import vif_pool from kuryr_kubernetes.controller.handlers import namespace from kuryr_kubernetes.controller.handlers import policy @@ -47,6 +48,7 @@ _kuryr_k8s_opts = [ ('namespace_handler_caching', namespace.namespace_handler_caching_opts), ('np_handler_caching', policy.np_handler_caching_opts), ('vif_handler_caching', vif.vif_handler_caching_opts), + ('pod_ip_caching', driver_utils.pod_ip_caching_opts), ] diff --git a/kuryr_kubernetes/tests/unit/controller/drivers/test_network_policy.py b/kuryr_kubernetes/tests/unit/controller/drivers/test_network_policy.py index 563e29bf4..cce530383 100644 --- a/kuryr_kubernetes/tests/unit/controller/drivers/test_network_policy.py +++ b/kuryr_kubernetes/tests/unit/controller/drivers/test_network_policy.py @@ -119,6 +119,7 @@ class TestNetworkPolicyDriver(test_base.TestCase): 'security_group_id': self._sg_id, 'id': mock.sentinel.id }}], + 'networkpolicy_spec': self._policy['spec'], 'securityGroupId': self._sg_id, 'securityGroupName': mock.sentinel.sg_name}} @@ -245,8 +246,8 @@ class TestNetworkPolicyDriver(test_base.TestCase): self._driver.create_security_group_rules_from_network_policy, self._policy, self._project_id) - @mock.patch.object(network_policy.NetworkPolicyDriver, - '_create_security_group_rule') + @mock.patch('kuryr_kubernetes.controller.drivers.utils.' + 'create_security_group_rule') @mock.patch.object(network_policy.NetworkPolicyDriver, 'get_kuryrnetpolicy_crd') @mock.patch.object(network_policy.NetworkPolicyDriver, @@ -261,8 +262,8 @@ class TestNetworkPolicyDriver(test_base.TestCase): policy) m_parse.assert_called_with(policy, self._sg_id) - @mock.patch.object(network_policy.NetworkPolicyDriver, - '_create_security_group_rule') + @mock.patch('kuryr_kubernetes.controller.drivers.utils.' + 'create_security_group_rule') @mock.patch.object(network_policy.NetworkPolicyDriver, 'get_kuryrnetpolicy_crd') @mock.patch.object(network_policy.NetworkPolicyDriver, @@ -313,8 +314,8 @@ class TestNetworkPolicyDriver(test_base.TestCase): @mock.patch.object(network_policy.NetworkPolicyDriver, '_get_namespaces_cidr') - @mock.patch.object(network_policy.NetworkPolicyDriver, - '_create_security_group_rule_body') + @mock.patch('kuryr_kubernetes.controller.drivers.utils.' + 'create_security_group_rule_body') def test_parse_network_policy_rules_with_rules(self, m_create, m_get_ns_cidr): subnet_cidr = '10.10.0.0/24' @@ -325,8 +326,8 @@ class TestNetworkPolicyDriver(test_base.TestCase): @mock.patch.object(network_policy.NetworkPolicyDriver, '_get_namespaces_cidr') - @mock.patch.object(network_policy.NetworkPolicyDriver, - '_create_security_group_rule_body') + @mock.patch('kuryr_kubernetes.controller.drivers.utils.' + 'create_security_group_rule_body') def test_parse_network_policy_rules_with_no_rules(self, m_create, m_get_ns_cidr): policy = self._policy.copy() @@ -342,8 +343,8 @@ class TestNetworkPolicyDriver(test_base.TestCase): @mock.patch.object(network_policy.NetworkPolicyDriver, '_get_namespaces_cidr') - @mock.patch.object(network_policy.NetworkPolicyDriver, - '_create_security_group_rule_body') + @mock.patch('kuryr_kubernetes.controller.drivers.utils.' + 'create_security_group_rule_body') def test_parse_network_policy_rules_with_no_pod_selector(self, m_create, m_get_ns_cidr): policy = self._policy.copy() @@ -357,23 +358,23 @@ class TestNetworkPolicyDriver(test_base.TestCase): @mock.patch.object(network_policy.NetworkPolicyDriver, '_get_namespaces_cidr') - @mock.patch.object(network_policy.NetworkPolicyDriver, - '_create_security_group_rule_body') + @mock.patch('kuryr_kubernetes.controller.drivers.utils.' + 'create_security_group_rule_body') def test_parse_network_policy_rules_with_no_ports(self, m_create, m_get_ns_cidr): subnet_cidr = '10.10.0.0/24' m_get_ns_cidr.return_value = [subnet_cidr] policy = self._policy.copy() + selectors = {'namespaceSelector': { + 'matchLabels': { + 'project': 'myproject'}}} policy['spec']['egress'] = [ {'to': - [{'namespaceSelector': { - 'matchLabels': { - 'project': 'myproject'}}}]}] + [selectors]}] policy['spec']['ingress'] = [ {'from': - [{'namespaceSelector': { - 'matchLabels': { - 'project': 'myproject'}}}]}] + [selectors]}] + selectors = {'namespace_selector': selectors['namespaceSelector']} self._driver.parse_network_policy_rules(policy, self._sg_id) m_get_ns_cidr.assert_called() calls = [mock.call(self._sg_id, 'ingress', port_range_min=1, diff --git a/kuryr_kubernetes/tests/unit/controller/drivers/test_network_policy_security_groups.py b/kuryr_kubernetes/tests/unit/controller/drivers/test_network_policy_security_groups.py index 7d2c6fefb..3ca030ceb 100644 --- a/kuryr_kubernetes/tests/unit/controller/drivers/test_network_policy_security_groups.py +++ b/kuryr_kubernetes/tests/unit/controller/drivers/test_network_policy_security_groups.py @@ -136,7 +136,10 @@ class TestNetworkPolicySecurityGroupsDriver(test_base.TestCase): 'namespace': self._namespace, 'labels': { 'run': 'demo', - 'environment': 'development'}}, + 'environment': 'development'}, + 'annotations': { + 'openstack.org/kuryr-pod-label': '{' + '"run": "demo","environment": "development"}'}}, 'spec': { 'containers': [{ 'image': 'kuryr/demo', @@ -163,6 +166,186 @@ class TestNetworkPolicySecurityGroupsDriver(test_base.TestCase): self._driver = ( network_policy_security_groups.NetworkPolicySecurityGroupsDriver()) + self._crd_sg_id = mock.sentinel.crd_sg_id + self._crd_without_rules = { + "apiVersion": "openstack.org/v1", + "kind": "KuryrNetPolicy", + "metadata": {"name": "np-test-network-policy", + "namespace": "default"}, + "spec": { + "egressSgRules": [], + "ingressSgRules": [], + "networkpolicy_spec": { + "ingress": [ + {"from": [ + {"namespaceSelector": { + "matchLabels": {"name": "dev"}}, + "podSelector": { + "matchLabels": {"tier": "backend"}}}], + "ports": [ + {"port": 6379, + "protocol": "TCP"}]}], + "podSelector": {"matchLabels": {"app": "demo"}}, + "policyTypes": ["Ingress"]}, + "podSelector": {"matchLabels": {"app": "demo"}}, + "securityGroupId": self._crd_sg_id}} + + self._pod_ip = mock.sentinel.pod_ip + self._pod_dev_namespace = { + 'apiVersion': 'v1', + 'kind': 'Pod', + 'metadata': { + 'name': mock.sentinel.pod_name, + 'namespace': 'dev', + 'labels': { + 'tier': 'backend'}, + 'annotations': { + 'openstack.org/kuryr-pod-label': '{"tier": "backend"}'}}, + 'spec': { + 'containers': [{ + 'image': 'kuryr/demo', + 'imagePullPolicy': 'Always', + 'name': mock.sentinel.pod_name + }]}, + 'status': {'podIP': self._pod_ip}} + + self._sg_rule_body = { + u'security_group_rule': { + u'direction': 'ingress', + u'protocol': u'tcp', + u'description': 'Kuryr-Kubernetes NetPolicy SG rule', + u'ethertype': 'IPv4', + u'port_range_max': 6379, + u'security_group_id': self._crd_sg_id, + u'port_range_min': 6379, + u'remote_ip_prefix': self._pod_ip}} + + self._new_rule_id = mock.sentinel.id + self._crd_with_rule = { + "apiVersion": "openstack.org/v1", + "kind": "KuryrNetPolicy", + "metadata": {"name": "np-test-network-policy", + "namespace": "default"}, + "spec": { + "egressSgRules": [], + "ingressSgRules": [{ + "security_group_rule": { + "description": "Kuryr-Kubernetes NetPolicy SG rule", + "direction": "ingress", + "ethertype": "IPv4", + "id": self._new_rule_id, + "port_range_max": 6379, + "port_range_min": 6379, + "protocol": "tcp", + "remote_ip_prefix": self._pod_ip, + "security_group_id": self._crd_sg_id}}], + "networkpolicy_spec": { + "ingress": [ + {"from": [ + {"namespaceSelector": { + "matchLabels": {"name": "dev"}}, + "podSelector": { + "matchLabels": {"tier": "backend"}}}], + "ports": [ + {"port": 6379, + "protocol": "TCP"}]}], + "podSelector": {"matchLabels": {"app": "demo"}}, + "policyTypes": ["Ingress"]}, + "podSelector": {"matchLabels": {"app": "demo"}}, + "securityGroupId": self._crd_sg_id}} + + @mock.patch('kuryr_kubernetes.controller.drivers.utils.' + 'create_security_group_rule') + @mock.patch('kuryr_kubernetes.controller.drivers.utils.' + 'create_security_group_rule_body') + @mock.patch.object(network_policy_security_groups, + '_match_selector', return_value=True) + @mock.patch('kuryr_kubernetes.controller.drivers.utils.get_pod_ip') + def test__create_sg_rules(self, m_get_pod_ip, + m_match_selector, + m_create_sg_rule_body, + m_create_sg_rule): + m_get_pod_ip.return_value = self._pod_ip + m_create_sg_rule_body.return_value = self._sg_rule_body + sgr_id = mock.sentinel.sgr_id + m_create_sg_rule.return_value = sgr_id + crd = self._crd_without_rules + pod = self._pod_dev_namespace + matched = False + new_sg_rule = self._sg_rule_body + + policy = crd['spec']['networkpolicy_spec'] + rule_list = policy.get('ingress', None) + crd_rules = crd['spec'].get('ingressSgRules') + + for rule_block in rule_list: + for rule in rule_block.get('from', []): + namespace_selector = rule.get('namespaceSelector') + pod_selector = rule.get('podSelector') + matched = network_policy_security_groups._create_sg_rules( + crd, pod, namespace_selector, pod_selector, rule_block, + crd_rules, 'ingress', matched) + new_sg_rule['namespaceSelector'] = namespace_selector + new_sg_rule['podSelector'] = pod_selector + new_sg_rule['security_group_rule']['id'] = sgr_id + m_match_selector.assert_called_once_with( + pod_selector, pod['metadata']['labels']) + m_get_pod_ip.assert_called_once_with(pod) + m_create_sg_rule_body.assert_called_once() + m_create_sg_rule.assert_called_once() + self.assertEqual([new_sg_rule], crd_rules) + self.assertEqual(matched, True) + + @mock.patch.object(network_policy_security_groups, + '_match_selector', return_value=False) + def test__create_sg_rules_no_match(self, m_match_selector): + crd = self._crd_without_rules + pod = self._pod2 + + policy = crd['spec']['networkpolicy_spec'] + rule_list = policy.get('ingress', None) + crd_rules = crd['spec'].get('ingressSgRules') + + for rule_block in rule_list: + for rule in rule_block.get('from', []): + namespace_selector = rule.get('namespaceSelector') + pod_selector = rule.get('podSelector') + matched = network_policy_security_groups._create_sg_rules( + crd, pod, namespace_selector, pod_selector, rule_block, + crd_rules, 'ingress', False) + self.assertEqual(matched, False) + + @mock.patch('kuryr_kubernetes.controller.drivers.utils.' + 'patch_kuryr_crd') + @mock.patch.object(network_policy_security_groups, + '_get_kuryrnetpolicy_crds') + @mock.patch('kuryr_kubernetes.controller.drivers.utils.' + 'delete_security_group_rule') + @mock.patch('kuryr_kubernetes.controller.drivers.utils.get_pod_ip') + def test_delete_sg_rules(self, m_get_pod_ip, m_delete_sg_rule, + m_get_knp_crds, m_patch_kuryr_crd): + crd = self._crd_with_rule + i_rule = crd['spec'].get('ingressSgRules')[0] + sgr_id = i_rule['security_group_rule'].get('id') + m_get_pod_ip.return_value = self._pod_ip + m_get_knp_crds.return_value = { + "apiVersion": "v1", + "items": [crd], + "kind": "List", + "metadata": { + "resourceVersion": "", + "selfLink": mock.sentinel.selfLink}} + i_rules = e_rules = [] + pod = self._pod_dev_namespace + + self._driver.delete_sg_rules(pod) + + m_get_knp_crds.assert_called_once() + m_get_pod_ip.assert_called_once_with(pod) + m_delete_sg_rule.assert_called_once_with(sgr_id) + m_patch_kuryr_crd.assert_called_with( + crd, i_rules, e_rules, crd['spec'].get('podSelector')) + @mock.patch('kuryr_kubernetes.config.CONF') @mock.patch.object(network_policy_security_groups, '_get_kuryrnetpolicy_crds')