Namespace isolation tempest coverage

It creates a new test to ensure proper isolation between namespaces.
It creates different pods in different namespaces and check that
traffic is block between them, unless it is the default namespace,
which can reach all the namespaces and be reached by all of them.

Depends-On: Ibf63841b2a6b0c339c4c76980f1489e26af016d7
Implements: blueprint openshift-project-isolation-support

Change-Id: Ie72c93564f38bc51a3abd009085beda92430daea
This commit is contained in:
Luis Tomas Bolivar 2018-07-06 16:48:48 +02:00
parent f02385f453
commit 08ba88d048
1 changed files with 82 additions and 3 deletions

View File

@ -72,8 +72,10 @@ class TestNamespaceScenario(base.BaseKuryrScenarioTest):
pod_name, pod = self.create_pod(labels={"app": 'pod-label'},
namespace=namespace_name)
svc_name, _ = self.create_service(pod_label=pod.metadata.labels,
spec_type='LoadBalancer',
namespace=namespace_name)
svc_service_ip = self.get_service_ip(service_name=svc_name,
spec_type='LoadBalancer',
namespace=namespace_name)
self.wait_service_status(svc_service_ip,
CONF.kuryr_kubernetes.lb_build_timeout)
@ -87,19 +89,96 @@ class TestNamespaceScenario(base.BaseKuryrScenarioTest):
raise lib_exc.UnexpectedResponseCode()
# Check resources are deleted
self.delete_namespace(namespace_name)
self._delete_namespace_resources(namespace_name, kuryr_net_crd_name,
subnet_name)
def test_namespace_sg_isolation(self):
# Check security group resources are created
ns1_name, ns1 = self.create_namespace()
ns2_name, ns2 = self.create_namespace()
existing_namespaces = [ns.metadata.name
for ns in self.list_namespaces().items]
self.assertIn(ns1_name, existing_namespaces)
self.assertIn(ns2_name, existing_namespaces)
self.assertIn('default', existing_namespaces)
subnet_ns1_name = 'ns/' + ns1_name + '-subnet'
subnet_ns2_name = 'ns/' + ns2_name + '-subnet'
net_crd_ns1_name = 'ns-' + ns1_name
net_crd_ns2_name = 'ns-' + ns2_name
net_crd_ns1 = self.get_kuryr_net_crds(net_crd_ns1_name)
net_crd_ns2 = self.get_kuryr_net_crds(net_crd_ns2_name)
self.assertIn(net_crd_ns1_name, net_crd_ns1['metadata']['name'])
self.assertIn(net_crd_ns2_name, net_crd_ns2['metadata']['name'])
seen_sgs = self.os_admin.security_groups_client.list_security_groups()
seen_sg_ids = [sg['id'] for sg in seen_sgs['security_groups']]
self.assertIn(net_crd_ns1['spec']['sgId'], seen_sg_ids)
self.assertIn(net_crd_ns2['spec']['sgId'], seen_sg_ids)
# Create pods in different namespaces
pod_ns1_name, pod_ns1 = self.create_pod(labels={"app": 'pod-label'},
namespace=ns1_name)
pod_ns2_name, pod_ns2 = self.create_pod(labels={"app": 'pod-label'},
namespace=ns2_name)
pod_nsdefault_name, pod_nsdefault = self.create_pod(
labels={"app": 'pod-label'}, namespace='default')
self.addCleanup(self.delete_pod, pod_nsdefault_name)
# Check namespace pod connectivity and isolation
pod_ns2_ip = self.get_pod_ip(pod_ns2_name, ns2_name)
pod_nsdefault_ip = self.get_pod_ip(pod_nsdefault_name)
# check connectivity from NS1 to default
cmd = ["/bin/sh", "-c", "curl {dst_ip}:8080".format(
dst_ip=pod_nsdefault_ip)]
self.assertIn('HELLO! I AM ALIVE!!!',
self.exec_command_in_pod(pod_ns1_name, cmd, ns1_name))
# check no connectivity from NS1 to NS2
cmd = ["/bin/sh", "-c", "curl {dst_ip}:8080".format(
dst_ip=pod_ns2_ip)]
self.assertNotIn('HELLO! I AM ALIVE!!!',
self.exec_command_in_pod(pod_ns1_name, cmd, ns1_name))
# check connectivity from default to NS2
cmd = ["/bin/sh", "-c", "curl {dst_ip}:8080".format(
dst_ip=pod_ns2_ip)]
self.assertIn('HELLO! I AM ALIVE!!!',
self.exec_command_in_pod(pod_nsdefault_name, cmd))
self._delete_namespace_resources(ns1_name, net_crd_ns1_name,
subnet_ns1_name)
self._delete_namespace_resources(ns2_name, net_crd_ns2_name,
subnet_ns2_name)
def _delete_namespace_resources(self, namespace, net_crd, subnet):
# Check resources are deleted
self.delete_namespace(namespace)
while True:
time.sleep(1)
try:
self.get_kuryr_net_crds(kuryr_net_crd_name)
self.get_kuryr_net_crds(net_crd)
except kubernetes.client.rest.ApiException:
break
existing_namespaces = [ns.metadata.name
for ns in self.list_namespaces().items]
self.assertNotIn(namespace, existing_namespaces)
seen_subnets = self.os_admin.subnets_client.list_subnets()
seen_subnet_names = [n['name'] for n in seen_subnets['subnets']]
self.assertNotIn(subnet, seen_subnet_names)
self.assertNotIn(subnet_name, seen_subnet_names)
seen_sgs = self.os_admin.security_groups_client.list_security_groups()
seen_sg_ids = [sg['id'] for sg in seen_sgs['security_groups']]
if net_crd['spec'].get('sgId', None):
self.assertNotIn(net_crd['spec']['sgId'], seen_sg_ids)