From 28fbe115ebd97760b128d4c9efd21d7d426922d3 Mon Sep 17 00:00:00 2001 From: Itzik Brown Date: Tue, 29 Sep 2020 12:48:08 +0300 Subject: [PATCH] Add test_network_policy_add_remove_pod - Create a service and check connectivity to the service - Create network policies with a podSelector with labels of the pods in the service as well as for the tester pod and check connectivity to the service - Deleting one of the service pods and creating a new one - Check connectivity to the service and verify only one pod answering - Create a new pod with the same label as the one we deleted - Check connectivity to the service, expecting two pods answering Also adds an option to create_network_policy to apply a rule to allow traffic only from pods with specific labels Change-Id: Ibf8113d9667c02a1b6d080aa50e91fde0809fd75 Depends-On: If23b311ed07578b3fbe85f46aa4a314e6a05b7f3 --- kuryr_tempest_plugin/tests/scenario/base.py | 58 +++++- .../tests/scenario/base_network_policy.py | 188 +++++++++++++++++- 2 files changed, 234 insertions(+), 12 deletions(-) diff --git a/kuryr_tempest_plugin/tests/scenario/base.py b/kuryr_tempest_plugin/tests/scenario/base.py index 58f670c3..cba8e1ad 100644 --- a/kuryr_tempest_plugin/tests/scenario/base.py +++ b/kuryr_tempest_plugin/tests/scenario/base.py @@ -93,7 +93,9 @@ class BaseKuryrScenarioTest(manager.NetworkScenarioTest): ingress_ipblock_except=[], egress_port=None, egress_port_protocol='TCP', egress_ipblock_cidr=None, - egress_ipblock_except=[]): + egress_ipblock_except=[], + ingress_match_expressions=None, + egress_match_expressions=None): if not name: name = data_utils.rand_name(prefix='kuryr-network-policy') np = k8s_client.V1NetworkPolicy() @@ -101,15 +103,15 @@ class BaseKuryrScenarioTest(manager.NetworkScenarioTest): np.api_version = 'networking.k8s.io/v1' np.metadata = k8s_client.V1ObjectMeta(name=name, namespace=namespace) - to, _from = None, None + to, _from = [], [] if egress_ipblock_cidr: - to = [k8s_client.V1NetworkPolicyPeer( + to.append(k8s_client.V1NetworkPolicyPeer( ip_block=k8s_client.V1IPBlock(cidr=egress_ipblock_cidr, - _except=egress_ipblock_except))] + _except=egress_ipblock_except))) if ingress_ipblock_cidr: - _from = [k8s_client.V1NetworkPolicyPeer( + _from.append(k8s_client.V1NetworkPolicyPeer( ip_block=k8s_client.V1IPBlock(cidr=ingress_ipblock_cidr, - _except=ingress_ipblock_except))] + _except=ingress_ipblock_except))) if ingress_port: ingress_port = [k8s_client.V1NetworkPolicyPort( port=ingress_port, protocol=ingress_port_protocol)] @@ -117,6 +119,14 @@ class BaseKuryrScenarioTest(manager.NetworkScenarioTest): egress_port = [k8s_client.V1NetworkPolicyPort( port=egress_port, protocol=egress_port_protocol)] + if ingress_match_expressions: + _from.append(k8s_client.V1NetworkPolicyPeer( + pod_selector=k8s_client.V1LabelSelector( + match_expressions=ingress_match_expressions))) + if egress_match_expressions: + to.append(k8s_client.V1NetworkPolicyPeer( + pod_selector=k8s_client.V1LabelSelector( + match_expressions=egress_match_expressions))) np.spec = k8s_client.V1NetworkPolicySpec( egress=[k8s_client.V1NetworkPolicyEgressRule( ports=egress_port, @@ -746,6 +756,10 @@ class BaseKuryrScenarioTest(manager.NetworkScenarioTest): return [pod.metadata.name for pod in self.get_pod_list( namespace=namespace, label_selector=label_selector)] + def get_pod_ip_list(self, namespace='default', label_selector=''): + return [pod.status.pod_ip for pod in self.get_pod_list( + namespace=namespace, label_selector=label_selector)] + def get_controller_pod_names(self): controller_label = CONF.kuryr_kubernetes.controller_label controller_pod_names = self.get_pod_name_list( @@ -1200,21 +1214,43 @@ class BaseKuryrScenarioTest(manager.NetworkScenarioTest): def check_service_internal_connectivity(self, service_port='80', protocol='TCP', namespace='default', + labels=None, + pod_num=None, + pod_name=None, cleanup=True): + """Verify client pod to service connectivity + + Create a pod unless a value for the pod_name parameter is provided and + check connectivity to a service from that pod. + + :param service_port - The port of the service we check + :param protocol - The service protocol we check + :namespace - The namespace of the client pod + :param labels - The labels of the client pod + :param pod_num - The number of pods expected to serve the service + :param pod_name - If supplied no pod will be created and instead a pod + with this name will be used + :param cleanup - Whether to add a cleanup function for the created pod + :returns: The name of the client pod that was created or passed to + the function + """ # FIXME(itzikb): Use the clusterIP to # check service status as there are some issues with the FIPs # and OVN gates clusterip_svc_ip = self.get_service_ip(self.service_name, spec_type='ClusterIP', namespace=namespace) - pod_name, pod = self.create_pod(namespace=namespace) - if cleanup: - self.addClassResourceCleanup(self.delete_pod, pod_name, - namespace=namespace) + pod_num = pod_num or self.pod_num + if not pod_name: + pod_name, _ = self.create_pod(namespace=namespace, labels=labels) + if cleanup: + self.addClassResourceCleanup(self.delete_pod, pod_name, + namespace=namespace) self.assert_backend_amount_from_pod( clusterip_svc_ip, - self.pod_num, + pod_num, pod_name, service_port, protocol, namespace_name=namespace) + return pod_name diff --git a/kuryr_tempest_plugin/tests/scenario/base_network_policy.py b/kuryr_tempest_plugin/tests/scenario/base_network_policy.py index d8bfa3fa..c06d85ec 100644 --- a/kuryr_tempest_plugin/tests/scenario/base_network_policy.py +++ b/kuryr_tempest_plugin/tests/scenario/base_network_policy.py @@ -27,7 +27,7 @@ from kuryr_tempest_plugin.tests.scenario import consts LOG = logging.getLogger(__name__) CONF = config.CONF -TIMEOUT_PERIOD = 120 +TIMEOUT_PERIOD = 180 class TestNetworkPolicyScenario(base.BaseKuryrScenarioTest): @@ -39,6 +39,59 @@ class TestNetworkPolicyScenario(base.BaseKuryrScenarioTest): raise cls.skipException('Network Policy driver and handler must ' 'be enabled to run this tests') + def get_sg_rules_for_np(self, namespace, network_policy_name): + start = time.time() + while time.time() - start < TIMEOUT_PERIOD: + try: + time.sleep(1) + sg_id, _ = self.get_np_crd_info(name=network_policy_name, + namespace=namespace) + if sg_id: + break + except kubernetes.client.rest.ApiException: + continue + self.assertIsNotNone(sg_id) + return self.list_security_group_rules(sg_id) + + def check_sg_rules_for_np(self, namespace, np, + ingress_cidrs_should_exist=(), + egress_cidrs_should_exist=(), + ingress_cidrs_shouldnt_exist=(), + egress_cidrs_shouldnt_exist=()): + ingress_cidrs_found = set() + egress_cidrs_found = set() + ingress_cidrs_should_exist = set(ingress_cidrs_should_exist) + egress_cidrs_should_exist = set(egress_cidrs_should_exist) + ingress_cidrs_shouldnt_exist = set(ingress_cidrs_shouldnt_exist) + egress_cidrs_shouldnt_exist = set(egress_cidrs_shouldnt_exist) + + rules_match = False + start = time.time() + + while ((not rules_match) and (time.time() - start < TIMEOUT_PERIOD)): + sg_rules = self.get_sg_rules_for_np(namespace, np) + + for rule in sg_rules: + if rule['direction'] == 'ingress': + ingress_cidrs_found.add(rule['remote_ip_prefix']) + elif rule['direction'] == 'egress': + egress_cidrs_found.add(rule['remote_ip_prefix']) + + if (ingress_cidrs_should_exist.issubset(ingress_cidrs_found) + and (not ingress_cidrs_shouldnt_exist + or not ingress_cidrs_shouldnt_exist.issubset( + ingress_cidrs_found)) + and egress_cidrs_should_exist.issubset(egress_cidrs_found) + and (not egress_cidrs_shouldnt_exist + or not egress_cidrs_shouldnt_exist.issubset( + egress_cidrs_found))): + rules_match = True + else: + time.sleep(10) + if not rules_match: + msg = 'Timed out waiting sg rules for np %s to match' % np + raise lib_exc.TimeoutException(msg) + @decorators.idempotent_id('a9db5bc5-e921-4719-8201-5431537c86f8') @decorators.unstable_test(bug="1860554") def test_ipblock_network_policy_sg_rules(self): @@ -374,3 +427,136 @@ class TestNetworkPolicyScenario(base.BaseKuryrScenarioTest): time.sleep(1) if time.time() - start >= TIMEOUT_PERIOD: raise lib_exc.TimeoutException('Sec group ID still exists') + + @decorators.idempotent_id('a93b5bc5-e931-4719-8201-54315c5c86f8') + def test_network_policy_add_remove_pod(self): + np_name_server = 'allow-all-server' + np_name_client = 'allow-all-client' + server_label = {'app': 'server'} + client_label = {'app': 'client'} + namespace_name, namespace = self.create_namespace() + self.addCleanup(self.delete_namespace, namespace_name) + + self.create_setup_for_service_test(label='server', + namespace=namespace_name, + cleanup=False) + LOG.debug("A service %s and two pods were created in namespace %s", + self.service_name, namespace_name) + service_pods = self.get_pod_list(namespace=namespace_name, + label_selector='app=server') + service_pods_cidrs = [pod.status.pod_ip+'/32' for pod in service_pods] + (first_server_pod_cidr, first_server_pod_name) = ( + service_pods[0].status.pod_ip+"/32", + service_pods[0].metadata.name) + client_pod_name = self.check_service_internal_connectivity( + namespace=namespace_name, + labels=client_label, + cleanup=False) + client_pod_ip = self.get_pod_ip(client_pod_name, + namespace=namespace_name) + client_pod_cidr = client_pod_ip + "/32" + LOG.debug("Client pod %s was created", client_pod_name) + LOG.debug("Connection to service %s from %s was successful", + self.service_name, client_pod_name) + # Check connectivity in the same namespace + connect_to_service_cmd = ["/bin/sh", "-c", "curl {dst_ip}".format( + dst_ip=self.service_ip)] + blocked_pod, _ = self.create_pod(namespace=namespace_name) + self.assertIn(consts.POD_OUTPUT, + self.exec_command_in_pod(blocked_pod, + connect_to_service_cmd, + namespace_name)) + + pods_server_match_expression = {'key': 'app', + 'operator': 'In', + 'values': ['server']} + pods_client_match_expression = {'key': 'app', + 'operator': 'In', + 'values': ['client']} + np_server = self.create_network_policy( + name=np_name_server, + namespace=namespace_name, + match_labels=server_label, + ingress_match_expressions=[pods_client_match_expression], + egress_match_expressions=[pods_client_match_expression]) + LOG.debug("Network policy %s with match expression %s was created", + np_server, pods_server_match_expression) + self.addCleanup(self.delete_network_policy, np_server.metadata.name, + namespace_name) + np_client = self.create_network_policy( + name=np_name_client, + namespace=namespace_name, + match_labels=client_label, + ingress_match_expressions=[pods_server_match_expression], + egress_match_expressions=[pods_server_match_expression]) + LOG.debug("Network policy %s with match expression %s was created", + np_client, pods_client_match_expression) + self.addCleanup(self.delete_network_policy, np_client.metadata.name, + namespace_name) + self.check_sg_rules_for_np( + namespace_name, np_name_server, + ingress_cidrs_should_exist=[client_pod_cidr], + egress_cidrs_should_exist=[client_pod_cidr], + ingress_cidrs_shouldnt_exist=[], + egress_cidrs_shouldnt_exist=[]) + self.check_sg_rules_for_np( + namespace_name, np_name_client, + ingress_cidrs_should_exist=service_pods_cidrs, + egress_cidrs_should_exist=service_pods_cidrs, + ingress_cidrs_shouldnt_exist=[], + egress_cidrs_shouldnt_exist=[]) + self.check_service_internal_connectivity(namespace=namespace_name, + pod_name=client_pod_name) + LOG.debug("Connection to service %s from %s was successful after " + "network policy was applied", + self.service_name, client_pod_name) + + # Check no connectivity from a pod not in the NP + self.assertNotIn(consts.POD_OUTPUT, + self.exec_command_in_pod(blocked_pod, + connect_to_service_cmd, + namespace_name)) + + self.delete_pod(first_server_pod_name, namespace=namespace_name) + LOG.debug("Deleted pod %s from service %s", + first_server_pod_name, self.service_name) + self.verify_lbaas_endpoints_configured(self.service_name, + 1, namespace_name) + self.check_service_internal_connectivity(namespace=namespace_name, + pod_name=client_pod_name, + pod_num=1) + LOG.debug("Connection to service %s with one pod from %s was " + "successful", self.service_name, client_pod_name) + + pod_name, pod = self.create_pod(labels=server_label, + namespace=namespace_name) + LOG.debug("Pod server %s with label %s was created", + pod_name, server_label) + self.verify_lbaas_endpoints_configured(self.service_name, + 2, namespace_name) + service_pods = self.get_pod_list(namespace=namespace_name, + label_selector='app=server') + service_pods_cidrs = [pod.status.pod_ip+'/32' for pod in service_pods] + self.check_sg_rules_for_np( + namespace_name, np_name_server, + ingress_cidrs_should_exist=[client_pod_cidr], + egress_cidrs_should_exist=[client_pod_cidr], + ingress_cidrs_shouldnt_exist=[], + egress_cidrs_shouldnt_exist=[]) + self.check_sg_rules_for_np( + namespace_name, np_name_client, + ingress_cidrs_should_exist=service_pods_cidrs, + egress_cidrs_should_exist=service_pods_cidrs, + ingress_cidrs_shouldnt_exist=[ + first_server_pod_cidr], + egress_cidrs_shouldnt_exist=[ + first_server_pod_cidr]) + self.check_service_internal_connectivity(namespace=namespace_name, + pod_name=client_pod_name) + LOG.debug("Connection to service %s from %s was successful", + self.service_name, client_pod_name) + # Check no connectivity from a pod not in the NP + self.assertNotIn(consts.POD_OUTPUT, + self.exec_command_in_pod(blocked_pod, + connect_to_service_cmd, + namespace_name))