Testing basic Network policy IPBlock functionality

Creating network policy with ipblock_cidr for ingress and egress
and testing that appropriate Security group rules were created

Change-Id: Id97a4a9c0a3e45300a18251ab30ca7dd72a415e0
This commit is contained in:
Genadi Chereshnya 2019-07-15 17:01:42 +03:00 committed by Itzik Brown
parent b197432a1d
commit 4175a85a16
2 changed files with 83 additions and 12 deletions

View File

@ -79,26 +79,57 @@ class BaseKuryrScenarioTest(manager.NetworkScenarioTest):
@classmethod
def create_network_policy(cls, name=None, namespace='default',
match_labels=None):
match_labels=None, match_expressions=None,
ingress_port=None, ingress_port_protocol='TCP',
ingress_ipblock_cidr=None,
ingress_ipblock_except=[],
egress_port=None, egress_port_protocol='TCP',
egress_ipblock_cidr=None,
egress_ipblock_except=[]):
if not name:
name = data_utils.rand_name(prefix='kuryr-network-policy')
np = cls.k8s_client.V1NetworkPolicy()
np = k8s_client.V1NetworkPolicy()
np.kind = 'NetworkPolicy'
np.api_version = 'networking.k8s.io/v1'
np.metadata = cls.k8s_client.V1ObjectMeta(name=name,
namespace=namespace)
np.spec = cls.k8s_client.V1NetworkPolicySpec(
egress=[cls.k8s_client.V1NetworkPolicyEgressRule(ports=None,
to=None)],
ingress=[cls.k8s_client.V1NetworkPolicyIngressRule(_from=None,
ports=None)],
pod_selector=cls.k8s_client.V1LabelSelector(
match_expressions=None,
np.metadata = k8s_client.V1ObjectMeta(name=name,
namespace=namespace)
to, _from = None, None
if egress_ipblock_cidr:
to = [k8s_client.V1NetworkPolicyPeer(
ip_block=k8s_client.V1IPBlock(cidr=egress_ipblock_cidr,
_except=egress_ipblock_except))]
if ingress_ipblock_cidr:
_from = [k8s_client.V1NetworkPolicyPeer(
ip_block=k8s_client.V1IPBlock(cidr=ingress_ipblock_cidr,
_except=ingress_ipblock_except))]
if ingress_port:
ingress_port = [k8s_client.V1NetworkPolicyPort(
port=ingress_port, protocol=ingress_port_protocol)]
if egress_port:
egress_port = [k8s_client.V1NetworkPolicyPort(
port=egress_port, protocol=egress_port_protocol)]
np.spec = k8s_client.V1NetworkPolicySpec(
egress=[k8s_client.V1NetworkPolicyEgressRule(
ports=egress_port,
to=to)],
ingress=[k8s_client.V1NetworkPolicyIngressRule(
ports=ingress_port,
_from=_from)],
pod_selector=k8s_client.V1LabelSelector(
match_expressions=match_expressions,
match_labels=match_labels),
policy_types=['Ingress', 'Egress'])
return cls.k8s_client.NetworkingV1Api(
return k8s_client.NetworkingV1Api(
).create_namespaced_network_policy(namespace=namespace, body=np)
@classmethod
def list_security_group_rules(cls, security_group_id):
rules = cls.os_admin.security_groups_client.show_security_group(
security_group_id)['security_group']['security_group_rules']
return rules
@classmethod
def update_network_policy(cls, np):
np_name = np.metadata.name

View File

@ -38,6 +38,46 @@ class TestNetworkPolicyScenario(base.BaseKuryrScenarioTest):
raise cls.skipException('Network Policy driver and handler must '
'be enabled to run this tests')
@decorators.idempotent_id('a9db5bc5-e921-4719-8201-5431537c86f8')
def test_ipblock_network_policy_sg_rules(self):
ingress_ipblock = "5.5.5.0/24"
egress_ipblock = "4.4.4.0/24"
namespace_name, namespace = self.create_namespace()
self.addCleanup(self.delete_namespace, namespace_name)
np = self.create_network_policy(namespace=namespace_name,
ingress_ipblock_cidr=ingress_ipblock,
egress_ipblock_cidr=egress_ipblock,
ingress_port=2500)
LOG.debug("Creating network policy %s", np)
self.addCleanup(self.delete_network_policy, np.metadata.name,
namespace_name)
network_policy_name = np.metadata.name
kuryr_netpolicy_crd_name = 'np-' + network_policy_name
kuryrnetpolicies = None
start = time.time()
while time.time() - start < TIMEOUT_PERIOD:
try:
kuryrnetpolicies = self.get_kuryr_netpolicy_crds(
name=kuryr_netpolicy_crd_name,
namespace=namespace_name)
break
except kubernetes.client.rest.ApiException:
time.sleep(1)
continue
self.assertIsNotNone(kuryrnetpolicies)
sg_id = kuryrnetpolicies['spec']['securityGroupId']
sec_group_rules = self.list_security_group_rules(sg_id)
ingress_block_found, egress_block_found = False, False
for rule in sec_group_rules:
if (rule['direction'] == 'ingress' and
rule['remote_ip_prefix'] == ingress_ipblock):
ingress_block_found = True
if (rule['direction'] == 'egress' and
rule['remote_ip_prefix'] == egress_ipblock):
egress_block_found = True
self.assertTrue(ingress_block_found)
self.assertTrue(egress_block_found)
@decorators.idempotent_id('24577a9b-1d29-409b-8b60-da3b49d776b1')
def test_create_delete_network_policy(self):
np = self.create_network_policy()