Testing basic Network policy IPBlock functionality
Creating network policy with ipblock_cidr for ingress and egress and testing that appropriate Security group rules were created Change-Id: Id97a4a9c0a3e45300a18251ab30ca7dd72a415e0
This commit is contained in:
parent
b197432a1d
commit
4175a85a16
|
@ -79,26 +79,57 @@ class BaseKuryrScenarioTest(manager.NetworkScenarioTest):
|
|||
|
||||
@classmethod
|
||||
def create_network_policy(cls, name=None, namespace='default',
|
||||
match_labels=None):
|
||||
match_labels=None, match_expressions=None,
|
||||
ingress_port=None, ingress_port_protocol='TCP',
|
||||
ingress_ipblock_cidr=None,
|
||||
ingress_ipblock_except=[],
|
||||
egress_port=None, egress_port_protocol='TCP',
|
||||
egress_ipblock_cidr=None,
|
||||
egress_ipblock_except=[]):
|
||||
if not name:
|
||||
name = data_utils.rand_name(prefix='kuryr-network-policy')
|
||||
np = cls.k8s_client.V1NetworkPolicy()
|
||||
np = k8s_client.V1NetworkPolicy()
|
||||
np.kind = 'NetworkPolicy'
|
||||
np.api_version = 'networking.k8s.io/v1'
|
||||
np.metadata = cls.k8s_client.V1ObjectMeta(name=name,
|
||||
namespace=namespace)
|
||||
np.spec = cls.k8s_client.V1NetworkPolicySpec(
|
||||
egress=[cls.k8s_client.V1NetworkPolicyEgressRule(ports=None,
|
||||
to=None)],
|
||||
ingress=[cls.k8s_client.V1NetworkPolicyIngressRule(_from=None,
|
||||
ports=None)],
|
||||
pod_selector=cls.k8s_client.V1LabelSelector(
|
||||
match_expressions=None,
|
||||
np.metadata = k8s_client.V1ObjectMeta(name=name,
|
||||
namespace=namespace)
|
||||
to, _from = None, None
|
||||
if egress_ipblock_cidr:
|
||||
to = [k8s_client.V1NetworkPolicyPeer(
|
||||
ip_block=k8s_client.V1IPBlock(cidr=egress_ipblock_cidr,
|
||||
_except=egress_ipblock_except))]
|
||||
if ingress_ipblock_cidr:
|
||||
_from = [k8s_client.V1NetworkPolicyPeer(
|
||||
ip_block=k8s_client.V1IPBlock(cidr=ingress_ipblock_cidr,
|
||||
_except=ingress_ipblock_except))]
|
||||
if ingress_port:
|
||||
ingress_port = [k8s_client.V1NetworkPolicyPort(
|
||||
port=ingress_port, protocol=ingress_port_protocol)]
|
||||
if egress_port:
|
||||
egress_port = [k8s_client.V1NetworkPolicyPort(
|
||||
port=egress_port, protocol=egress_port_protocol)]
|
||||
|
||||
np.spec = k8s_client.V1NetworkPolicySpec(
|
||||
egress=[k8s_client.V1NetworkPolicyEgressRule(
|
||||
ports=egress_port,
|
||||
to=to)],
|
||||
ingress=[k8s_client.V1NetworkPolicyIngressRule(
|
||||
ports=ingress_port,
|
||||
_from=_from)],
|
||||
pod_selector=k8s_client.V1LabelSelector(
|
||||
match_expressions=match_expressions,
|
||||
match_labels=match_labels),
|
||||
policy_types=['Ingress', 'Egress'])
|
||||
return cls.k8s_client.NetworkingV1Api(
|
||||
|
||||
return k8s_client.NetworkingV1Api(
|
||||
).create_namespaced_network_policy(namespace=namespace, body=np)
|
||||
|
||||
@classmethod
|
||||
def list_security_group_rules(cls, security_group_id):
|
||||
rules = cls.os_admin.security_groups_client.show_security_group(
|
||||
security_group_id)['security_group']['security_group_rules']
|
||||
return rules
|
||||
|
||||
@classmethod
|
||||
def update_network_policy(cls, np):
|
||||
np_name = np.metadata.name
|
||||
|
|
|
@ -38,6 +38,46 @@ class TestNetworkPolicyScenario(base.BaseKuryrScenarioTest):
|
|||
raise cls.skipException('Network Policy driver and handler must '
|
||||
'be enabled to run this tests')
|
||||
|
||||
@decorators.idempotent_id('a9db5bc5-e921-4719-8201-5431537c86f8')
|
||||
def test_ipblock_network_policy_sg_rules(self):
|
||||
ingress_ipblock = "5.5.5.0/24"
|
||||
egress_ipblock = "4.4.4.0/24"
|
||||
namespace_name, namespace = self.create_namespace()
|
||||
self.addCleanup(self.delete_namespace, namespace_name)
|
||||
np = self.create_network_policy(namespace=namespace_name,
|
||||
ingress_ipblock_cidr=ingress_ipblock,
|
||||
egress_ipblock_cidr=egress_ipblock,
|
||||
ingress_port=2500)
|
||||
LOG.debug("Creating network policy %s", np)
|
||||
self.addCleanup(self.delete_network_policy, np.metadata.name,
|
||||
namespace_name)
|
||||
network_policy_name = np.metadata.name
|
||||
kuryr_netpolicy_crd_name = 'np-' + network_policy_name
|
||||
kuryrnetpolicies = None
|
||||
start = time.time()
|
||||
while time.time() - start < TIMEOUT_PERIOD:
|
||||
try:
|
||||
kuryrnetpolicies = self.get_kuryr_netpolicy_crds(
|
||||
name=kuryr_netpolicy_crd_name,
|
||||
namespace=namespace_name)
|
||||
break
|
||||
except kubernetes.client.rest.ApiException:
|
||||
time.sleep(1)
|
||||
continue
|
||||
self.assertIsNotNone(kuryrnetpolicies)
|
||||
sg_id = kuryrnetpolicies['spec']['securityGroupId']
|
||||
sec_group_rules = self.list_security_group_rules(sg_id)
|
||||
ingress_block_found, egress_block_found = False, False
|
||||
for rule in sec_group_rules:
|
||||
if (rule['direction'] == 'ingress' and
|
||||
rule['remote_ip_prefix'] == ingress_ipblock):
|
||||
ingress_block_found = True
|
||||
if (rule['direction'] == 'egress' and
|
||||
rule['remote_ip_prefix'] == egress_ipblock):
|
||||
egress_block_found = True
|
||||
self.assertTrue(ingress_block_found)
|
||||
self.assertTrue(egress_block_found)
|
||||
|
||||
@decorators.idempotent_id('24577a9b-1d29-409b-8b60-da3b49d776b1')
|
||||
def test_create_delete_network_policy(self):
|
||||
np = self.create_network_policy()
|
||||
|
|
Loading…
Reference in New Issue