Merge "IPv4 and IPv6 Security Groups Scenario Tests"
This commit is contained in:
commit
1c6e1328bd
|
@ -42,6 +42,7 @@ from cloudcafe.networking.networks.extensions.security_groups_api.composites \
|
|||
import SecurityGroupsComposite
|
||||
from cloudcafe.networking.networks.extensions.security_groups_api.models.\
|
||||
response import SecurityGroup, SecurityGroupRule
|
||||
from cloudcafe.networking.networks.personas import ServerPersona
|
||||
|
||||
|
||||
class NetworkingFixture(BaseTestFixture):
|
||||
|
@ -991,6 +992,9 @@ class NetworkingComputeFixture(NetworkingSecurityGroupsFixture):
|
|||
# Other reusable values
|
||||
cls.flavor_ref = cls.flavors.config.primary_flavor
|
||||
cls.image_ref = cls.images.config.primary_image
|
||||
cls.ssh_username = (cls.images.config.primary_image_default_user or
|
||||
'root')
|
||||
cls.auth_strategy = cls.servers.config.instance_auth_strategy or 'key'
|
||||
|
||||
cls.delete_servers = []
|
||||
cls.failed_servers = []
|
||||
|
@ -1042,22 +1046,183 @@ class NetworkingComputeFixture(NetworkingSecurityGroupsFixture):
|
|||
|
||||
# Using rand_name to avoid HTTP 409 Conflict due to duplicate names
|
||||
name = rand_name(name)
|
||||
cls.fixture_log.info('Creating test server keypair')
|
||||
resp = cls.keypairs.client.create_keypair(name)
|
||||
msg = ('Unable to create server keypair: received HTTP {0} instead of '
|
||||
'the expected HTTP {1} response').format(
|
||||
resp.status_code, ComputeResponseCodes.CREATE_KEYPAIR)
|
||||
assert resp.status_code == ComputeResponseCodes.CREATE_KEYPAIR, msg
|
||||
cls.delete_keypairs.append(resp.entity.name)
|
||||
return resp.entity
|
||||
|
||||
def verify_remote_client_auth(self, server, remote_client,
|
||||
sec_group=None):
|
||||
msg = ('Remote client unable to authenticate for server {0} {1} '
|
||||
'with security group: {2}').format(server.name, server.id,
|
||||
sec_group)
|
||||
self.assertTrue(remote_client.can_authenticate(), msg)
|
||||
@classmethod
|
||||
def create_server_network(cls, name, ipv4=False, ipv6=False):
|
||||
"""
|
||||
@summary: Create an isolated network
|
||||
@param name: network name
|
||||
@type name: str
|
||||
@param ipv4: flag to create network with IPv4 subnet
|
||||
@type ipv4: bool
|
||||
@param ipv6: flag to create network with IPv6 subnet
|
||||
@type ipv6: bool
|
||||
"""
|
||||
|
||||
net_msg = 'Creating {0} isolated network'.format(name)
|
||||
cls.fixture_log.info(net_msg)
|
||||
net_req = cls.networks.behaviors.create_network(name=name)
|
||||
network = net_req.response.entity
|
||||
cls.delete_networks.append(network.id)
|
||||
|
||||
if ipv4:
|
||||
cls.fixture_log.info('Creating IPv4 subnet')
|
||||
cls.subnets.behaviors.create_subnet(network_id=network.id,
|
||||
ip_version=4)
|
||||
if ipv6:
|
||||
cls.fixture_log.info('Creating IPv6 subnet')
|
||||
cls.subnets.behaviors.create_subnet(network_id=network.id,
|
||||
ip_version=6)
|
||||
|
||||
return network
|
||||
|
||||
@classmethod
|
||||
def create_multiple_servers(cls, server_names, keypair_name=None,
|
||||
networks=None, pnet=True, snet=True):
|
||||
"""
|
||||
@summary: Create multiple test servers
|
||||
@param server_names: names of servers to create
|
||||
@type server_names: list(str)
|
||||
@param keypair_name: (optional) keypair to create servers with
|
||||
@type keypair_name: str
|
||||
@param networks: (optional) isolated network ids to create servers with
|
||||
@type networks: list(str)
|
||||
@param pnet: flag to create server with public network
|
||||
@type pnet: bool
|
||||
@param snet: flag to create server with service (private) network
|
||||
@type snet: bool
|
||||
@return: server entity objects
|
||||
@rtype: dict(server name: server entity object)
|
||||
"""
|
||||
|
||||
cls.fixture_log.debug('Defining the network IDs to be used')
|
||||
network_ids = []
|
||||
|
||||
if pnet:
|
||||
network_ids.append(cls.public_network_id)
|
||||
if snet:
|
||||
network_ids.append(cls.service_network_id)
|
||||
if networks:
|
||||
network_ids.extend(networks)
|
||||
|
||||
# Response dict where the key will be the server name and the value the
|
||||
# server entity object
|
||||
servers = dict()
|
||||
server_ids = []
|
||||
for name in server_names:
|
||||
server = cls.create_test_server(name=name, key_name=keypair_name,
|
||||
network_ids=network_ids,
|
||||
active_server=False)
|
||||
server_ids.append(server.id)
|
||||
servers[name] = server
|
||||
|
||||
# Waiting for the servers to be active
|
||||
cls.net.behaviors.wait_for_servers_to_be_active(
|
||||
server_id_list=server_ids)
|
||||
|
||||
return servers
|
||||
|
||||
@classmethod
|
||||
def create_multiple_personas(cls, persona_servers, persona_kwargs=None):
|
||||
"""
|
||||
@summary: Create multiple server personas
|
||||
@param persona_servers: servers to create personas from
|
||||
@type persona_servers: dict(persona_label: server)
|
||||
@param persona_kwargs: (optional) server persona attributes, excluding
|
||||
the server one that is at the persona_servers
|
||||
@type persona_kwargs: dict
|
||||
@return: servers personas
|
||||
@rtype: dict(persona_label: persona)
|
||||
"""
|
||||
|
||||
# In case serer personas are created with default values
|
||||
if not persona_kwargs:
|
||||
persona_kwargs = dict()
|
||||
|
||||
# Response dict where the key will be the persona label and the value
|
||||
# the persona object
|
||||
personas = dict()
|
||||
for persona_label, persona_server in persona_servers.items():
|
||||
persona_kwargs.update(server=persona_server)
|
||||
server_persona = ServerPersona(**persona_kwargs)
|
||||
personas[persona_label] = server_persona
|
||||
|
||||
return personas
|
||||
|
||||
@classmethod
|
||||
def update_server_ports_w_sec_groups(cls, port_ids, security_groups,
|
||||
raise_exception=True):
|
||||
"""
|
||||
@summary: Updates server ports with security groups
|
||||
@param port_ids: ports to update
|
||||
@type port_ids: list(str)
|
||||
@param security_groups: security groups to add to the ports
|
||||
@type security_groups: list(str)
|
||||
@param raise_exception: raise exception port was not updated
|
||||
@type raise_exception: bool
|
||||
"""
|
||||
|
||||
for port_id in port_ids:
|
||||
cls.ports.behaviors.update_port(
|
||||
port_id=port_id, security_groups=security_groups,
|
||||
raise_exception=raise_exception)
|
||||
|
||||
def verify_remote_clients_auth(self, servers, remote_clients,
|
||||
sec_groups=None):
|
||||
"""
|
||||
@summary: verifying remote clients authentication
|
||||
@param servers: server entities
|
||||
@type servers: list of server entities
|
||||
@param remote_clients: remote instance clients from servers
|
||||
@type remote_clients: list of remote instance clients
|
||||
@param sec_groups: security group applied to the server
|
||||
@type sec_groups: list of security groups entities
|
||||
"""
|
||||
error_msg = ('Remote client unable to authenticate for server {0} '
|
||||
'with security group: {1}')
|
||||
errors = []
|
||||
|
||||
# In case there are no security groups associated with the servers
|
||||
if not sec_groups:
|
||||
sec_groups = [''] * len(remote_clients)
|
||||
|
||||
for server, remote_client, sec_group in (
|
||||
zip(servers, remote_clients, sec_groups)):
|
||||
|
||||
if not remote_client.can_authenticate():
|
||||
msg = error_msg.format(server.id, sec_group)
|
||||
errors.append(msg)
|
||||
|
||||
return errors
|
||||
|
||||
def verify_ping(self, remote_client, ip_address, ip_version=4,
|
||||
count=3, accepted_packet_loss=0):
|
||||
"""
|
||||
@summary: Verify ICMP connectivity between two servers
|
||||
@param remote_client: remote client server to ping from
|
||||
@type remote_client: cloudcafe.compute.common.clients.
|
||||
remote_instance.linux.linux_client.LinuxClient
|
||||
@param ip_address: IP address to ping
|
||||
@type ip_address: str
|
||||
@param ip_version: version of IP address
|
||||
@type ip_version: int
|
||||
@param count: number of pings, for ex. ping -c count (by default 3)
|
||||
@type count: int
|
||||
@param accepted_packet_loss: fail if packet loss greater (by default 0)
|
||||
@type accepted_packet_loss: int
|
||||
"""
|
||||
|
||||
count = self.config.ping_count or count
|
||||
accepted_packet_loss = self.config.accepted_packet_loss or (
|
||||
accepted_packet_loss)
|
||||
ping_packet_loss_regex = '(\d{1,3})\.?\d*\%.*loss'
|
||||
|
||||
if ip_version == 6:
|
||||
|
@ -1087,10 +1252,10 @@ class NetworkingComputeFixture(NetworkingSecurityGroupsFixture):
|
|||
expected_data, ip_version=4):
|
||||
"""
|
||||
@summary: Verify UDP port connectivity between two servers
|
||||
@param listener_client: remote client server that receives TCP packages
|
||||
@param listener_client: remote client server that receives UDP packages
|
||||
@type listener_client: cloudcafe.compute.common.clients.
|
||||
remote_instance.linux.linux_client.LinuxClient
|
||||
@param sender_client: remote client server that sends TCP packages
|
||||
@param sender_client: remote client server that sends UDP packages
|
||||
@type sender_client: cloudcafe.compute.common.clients.
|
||||
remote_instance.linux.linux_client.LinuxClient
|
||||
@param listener_ip: public, service or isolated network IP
|
||||
|
@ -1104,7 +1269,7 @@ class NetworkingComputeFixture(NetworkingSecurityGroupsFixture):
|
|||
@type expected_data: str
|
||||
"""
|
||||
file_name = 'udp_transfer'
|
||||
|
||||
|
||||
# Can be set as the default_file_path property in the config
|
||||
# file servers section, or to be set to /root by default
|
||||
dir_path = self.servers.config.default_file_path or '/root'
|
||||
|
|
|
@ -0,0 +1,514 @@
|
|||
"""
|
||||
Copyright 2015 Rackspace
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
"""
|
||||
import time
|
||||
|
||||
from cafe.drivers.unittest.decorators import tags
|
||||
from cloudroast.networking.networks.fixtures import NetworkingComputeFixture
|
||||
|
||||
|
||||
# TCP ports to open on listener
|
||||
TCP_PORT1 = '443'
|
||||
TCP_PORT2 = '444'
|
||||
|
||||
# TCP port range to check from sender (SG rule port range 443-445)
|
||||
TCP_PORT_RANGE = '442-445'
|
||||
|
||||
# UDP ports for sending a file: port 750 within UDP egress rule, 749 not
|
||||
UDP_PORT_750 = '750'
|
||||
UDP_PORT_749 = '749'
|
||||
|
||||
# Operation now in progress if a reply from a port outside the rule
|
||||
TCP_RULE_EXPECTED_DATA = ['442 (tcp) timed out: Operation now in progress',
|
||||
'443 port [tcp/*] succeeded!',
|
||||
'444 port [tcp/*] succeeded!',
|
||||
'445 (tcp) failed: Connection refused']
|
||||
|
||||
TCP_EXPECTED_DATA = ['442 (tcp) failed: Connection refused',
|
||||
'443 port [tcp/*] succeeded!',
|
||||
'444 port [tcp/*] succeeded!',
|
||||
'445 (tcp) failed: Connection refused']
|
||||
|
||||
|
||||
class SecurityGroupsEgressIPv4Test(NetworkingComputeFixture):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super(SecurityGroupsEgressIPv4Test, cls).setUpClass()
|
||||
|
||||
base_name = 'sg_egress_v4_{0}'
|
||||
keypair_name = base_name.format('keypair')
|
||||
network_name = base_name.format('net')
|
||||
|
||||
cls.network = cls.create_server_network(name=network_name, ipv4=True)
|
||||
cls.keypair = cls.create_keypair(name=keypair_name)
|
||||
|
||||
server_labels = ['listener', 'sender', 'icmp_sender', 'other_sender']
|
||||
server_names = [base_name.format(label) for label in server_labels]
|
||||
|
||||
# Creating servers on the same isolated network and
|
||||
# getting a dict with the server name as key and server obj as value
|
||||
servers = cls.create_multiple_servers(server_names=server_names,
|
||||
keypair_name=cls.keypair.name,
|
||||
networks=[cls.network.id])
|
||||
|
||||
# Setting the servers as class attributes identified by server label
|
||||
for label, name in zip(server_labels, server_names):
|
||||
setattr(cls, label, servers[name])
|
||||
|
||||
# Creating the security group and rules for IPv4 TCP testing
|
||||
cls.fixture_log.debug('Creating the security groups and rules')
|
||||
sg_tcp_ipv4_req = cls.sec.behaviors.create_security_group(
|
||||
name='sg_tcp_ipv4_egress',
|
||||
description='SG for testing IPv4 TCP egress rules')
|
||||
cls.sec_group_tcp_ipv4 = sg_tcp_ipv4_req.response.entity
|
||||
cls.delete_secgroups.append(cls.sec_group_tcp_ipv4.id)
|
||||
|
||||
egress_tcp_ipv4_rule_req = (
|
||||
cls.sec.behaviors.create_security_group_rule(
|
||||
security_group_id=cls.sec_group_tcp_ipv4.id,
|
||||
direction='egress', ethertype='IPv4', protocol='tcp',
|
||||
port_range_min=443, port_range_max=445))
|
||||
egress_tcp_rule = egress_tcp_ipv4_rule_req.response.entity
|
||||
cls.delete_secgroups_rules.append(egress_tcp_rule.id)
|
||||
|
||||
# Creating the security group rule for IPv4 UDP testing
|
||||
egress_udp_ipv4_rule_req = (
|
||||
cls.sec.behaviors.create_security_group_rule(
|
||||
security_group_id=cls.sec_group_tcp_ipv4.id,
|
||||
direction='egress', ethertype='IPv4', protocol='udp',
|
||||
port_range_min=750, port_range_max=752))
|
||||
egress_udp_rule = egress_udp_ipv4_rule_req.response.entity
|
||||
cls.delete_secgroups_rules.append(egress_udp_rule.id)
|
||||
|
||||
# Adding rules for remote client connectivity
|
||||
cls.create_ping_ssh_ingress_rules(
|
||||
sec_group_id=cls.sec_group_tcp_ipv4.id)
|
||||
|
||||
# Creating the security group and rules for IPv4 ICMP testing
|
||||
sg_icmp_ipv4_req = cls.sec.behaviors.create_security_group(
|
||||
name='sg_icmp_ipv4_egress',
|
||||
description='SG for testing IPv4 ICMP egress rules')
|
||||
cls.sec_group_icmp_ipv4 = sg_icmp_ipv4_req.response.entity
|
||||
cls.delete_secgroups.append(cls.sec_group_icmp_ipv4.id)
|
||||
|
||||
egress_icmp_ipv4_rule_req = (
|
||||
cls.sec.behaviors.create_security_group_rule(
|
||||
security_group_id=cls.sec_group_icmp_ipv4.id,
|
||||
direction='egress', ethertype='IPv4', protocol='icmp'))
|
||||
egress_icmp_ipv4_rule = egress_icmp_ipv4_rule_req.response.entity
|
||||
cls.delete_secgroups_rules.append(egress_icmp_ipv4_rule.id)
|
||||
|
||||
# Adding rules for remote client connectivity
|
||||
cls.create_ping_ssh_ingress_rules(
|
||||
sec_group_id=cls.sec_group_icmp_ipv4.id)
|
||||
|
||||
cls.security_group_ids = [cls.sec_group_tcp_ipv4.id,
|
||||
cls.sec_group_icmp_ipv4.id]
|
||||
|
||||
cls.sec_group_tcp_ipv4 = cls.sec.behaviors.get_security_group(
|
||||
cls.security_group_ids[0]).response.entity
|
||||
cls.sec_group_icmp_ipv4 = cls.sec.behaviors.get_security_group(
|
||||
cls.security_group_ids[1]).response.entity
|
||||
|
||||
# Defining the server personas
|
||||
cls.fixture_log.debug('Defining the server personas for quick port '
|
||||
'and IP address access')
|
||||
|
||||
# Persona labels as keys and the server to create the persona as value
|
||||
persona_servers = {'lp': cls.listener, 'op': cls.other_sender,
|
||||
'sp': cls.sender, 'spi': cls.icmp_sender}
|
||||
persona_kwargs = dict(inet=True, network=cls.network,
|
||||
inet_port_count=1, inet_fix_ipv4_count=1)
|
||||
|
||||
# Getting a dict with persona label as key and persona object as value
|
||||
personas = cls.create_multiple_personas(
|
||||
persona_servers=persona_servers, persona_kwargs=persona_kwargs)
|
||||
|
||||
# Setting the personas as class attributes identified by persona label
|
||||
for persona_label, persona in personas.items():
|
||||
setattr(cls, persona_label, persona)
|
||||
|
||||
# Updating server ports with security groups
|
||||
ports_to_update = [{'port_ids': [cls.sp.pnet_port_ids[0],
|
||||
cls.sp.snet_port_ids[0],
|
||||
cls.sp.inet_port_ids[0]],
|
||||
'security_groups': [cls.security_group_ids[0]]},
|
||||
{'port_ids': [cls.spi.pnet_port_ids[0],
|
||||
cls.spi.snet_port_ids[0],
|
||||
cls.spi.inet_port_ids[0]],
|
||||
'security_groups': [cls.security_group_ids[1]]}]
|
||||
|
||||
for ports in ports_to_update:
|
||||
cls.update_server_ports_w_sec_groups(
|
||||
port_ids=ports['port_ids'],
|
||||
security_groups=ports['security_groups'])
|
||||
|
||||
# Wait time for security groups to be enabled on server ports
|
||||
delay_msg = 'data plane delay {0}'.format(
|
||||
cls.sec.config.data_plane_delay)
|
||||
cls.fixture_log.debug(delay_msg)
|
||||
time.sleep(cls.sec.config.data_plane_delay)
|
||||
|
||||
def setUp(self):
|
||||
""" Creating the remote clients """
|
||||
self.fixture_log.debug('Creating the Remote Clients')
|
||||
self.lp_rc = self.servers.behaviors.get_remote_instance_client(
|
||||
server=self.listener, ip_address=self.lp.pnet_fix_ipv4[0],
|
||||
username=self.ssh_username, key=self.keypair.private_key,
|
||||
auth_strategy=self.auth_strategy)
|
||||
self.op_rc = self.servers.behaviors.get_remote_instance_client(
|
||||
server=self.other_sender, ip_address=self.op.pnet_fix_ipv4[0],
|
||||
username=self.ssh_username, key=self.keypair.private_key,
|
||||
auth_strategy=self.auth_strategy)
|
||||
|
||||
self.fixture_log.debug('Sender Remote Clients require ingress and '
|
||||
'egress rules working for ICMP and ingress '
|
||||
'rules for TCP')
|
||||
self.sp_rc = self.servers.behaviors.get_remote_instance_client(
|
||||
server=self.sender, ip_address=self.sp.pnet_fix_ipv4[0],
|
||||
username=self.ssh_username, key=self.keypair.private_key,
|
||||
auth_strategy=self.auth_strategy)
|
||||
self.spi_rc = self.servers.behaviors.get_remote_instance_client(
|
||||
server=self.icmp_sender, ip_address=self.spi.pnet_fix_ipv4[0],
|
||||
username=self.ssh_username, key=self.keypair.private_key,
|
||||
auth_strategy=self.auth_strategy)
|
||||
|
||||
@tags('publicnet', 'servicenet', 'isolatednet')
|
||||
def test_remote_client_connectivity(self):
|
||||
"""
|
||||
@summary: Testing the remote clients
|
||||
"""
|
||||
|
||||
servers = [self.listener, self.other_sender, self.sender,
|
||||
self.icmp_sender]
|
||||
remote_clients = [self.lp_rc, self.op_rc, self.sp_rc, self.spi_rc]
|
||||
|
||||
# Empty string for servers without security group
|
||||
sec_groups = ['', '', self.sec_group_tcp_ipv4,
|
||||
self.sec_group_icmp_ipv4]
|
||||
|
||||
result = self.verify_remote_clients_auth(
|
||||
servers=servers, remote_clients=remote_clients,
|
||||
sec_groups=sec_groups)
|
||||
|
||||
self.assertFalse(result)
|
||||
|
||||
@tags('publicnet')
|
||||
def test_publicnet_ping(self):
|
||||
"""
|
||||
@summary: Testing ping from other sender without security rules
|
||||
"""
|
||||
ip_address = self.lp.pnet_fix_ipv4[0]
|
||||
self.verify_ping(remote_client=self.op_rc, ip_address=ip_address)
|
||||
|
||||
@tags('servicenet')
|
||||
def test_servicenet_ping(self):
|
||||
"""
|
||||
@summary: Testing ping from other sender without security rules
|
||||
"""
|
||||
ip_address = self.lp.snet_fix_ipv4[0]
|
||||
self.verify_ping(remote_client=self.op_rc, ip_address=ip_address)
|
||||
|
||||
@tags('isolatednet')
|
||||
def test_isolatednet_ping(self):
|
||||
"""
|
||||
@summary: Testing ping from other sender without security rules
|
||||
"""
|
||||
ip_address = self.lp.inet_fix_ipv4[0]
|
||||
self.verify_ping(remote_client=self.op_rc, ip_address=ip_address)
|
||||
|
||||
@tags('publicnet')
|
||||
def test_publicnet_ping_w_icmp_egress(self):
|
||||
"""
|
||||
@summary: Testing ICMP egress rule on publicnet
|
||||
"""
|
||||
ip_address = self.lp.pnet_fix_ipv4[0]
|
||||
self.verify_ping(remote_client=self.spi_rc, ip_address=ip_address)
|
||||
|
||||
@tags('servicenet')
|
||||
def test_servicenet_ping_w_icmp_egress(self):
|
||||
"""
|
||||
@summary: Testing ICMP egress rule on servicenet
|
||||
"""
|
||||
ip_address = self.lp.snet_fix_ipv4[0]
|
||||
self.verify_ping(remote_client=self.spi_rc, ip_address=ip_address)
|
||||
|
||||
def test_isolatednet_ping_w_icmp_egress(self):
|
||||
"""
|
||||
@summary: Testing ICMP egress rule on isolatednet
|
||||
"""
|
||||
ip_address = self.lp.inet_fix_ipv4[0]
|
||||
self.verify_ping(remote_client=self.spi_rc, ip_address=ip_address)
|
||||
|
||||
@tags('publicnet')
|
||||
def test_publicnet_ports_w_tcp(self):
|
||||
"""
|
||||
@summary: Testing TCP ports on publicnet
|
||||
"""
|
||||
self.verify_tcp_connectivity(listener_client=self.lp_rc,
|
||||
sender_client=self.op_rc,
|
||||
listener_ip=self.lp.pnet_fix_ipv4[0],
|
||||
port1=TCP_PORT1, port2=TCP_PORT2,
|
||||
port_range=TCP_PORT_RANGE,
|
||||
expected_data=TCP_EXPECTED_DATA)
|
||||
|
||||
@tags('servicenet')
|
||||
def test_servicenet_ports_w_tcp(self):
|
||||
"""
|
||||
@summary: Testing TCP ports on servicenet
|
||||
"""
|
||||
self.verify_tcp_connectivity(listener_client=self.lp_rc,
|
||||
sender_client=self.op_rc,
|
||||
listener_ip=self.lp.snet_fix_ipv4[0],
|
||||
port1=TCP_PORT1, port2=TCP_PORT2,
|
||||
port_range=TCP_PORT_RANGE,
|
||||
expected_data=TCP_EXPECTED_DATA)
|
||||
|
||||
@tags('isolatednet')
|
||||
def test_isolatednet_ports_w_tcp(self):
|
||||
"""
|
||||
@summary: Testing TCP ports on isolatednet
|
||||
"""
|
||||
self.verify_tcp_connectivity(listener_client=self.lp_rc,
|
||||
sender_client=self.op_rc,
|
||||
listener_ip=self.lp.inet_fix_ipv4[0],
|
||||
port1=TCP_PORT1, port2=TCP_PORT2,
|
||||
port_range=TCP_PORT_RANGE,
|
||||
expected_data=TCP_EXPECTED_DATA)
|
||||
|
||||
@tags('publicnet')
|
||||
def test_publicnet_ports_w_tcp_egress(self):
|
||||
"""
|
||||
@summary: Testing TCP egress rule on publicnet
|
||||
"""
|
||||
self.verify_tcp_connectivity(listener_client=self.lp_rc,
|
||||
sender_client=self.sp_rc,
|
||||
listener_ip=self.lp.pnet_fix_ipv4[0],
|
||||
port1=TCP_PORT1, port2=TCP_PORT2,
|
||||
port_range=TCP_PORT_RANGE,
|
||||
expected_data=TCP_RULE_EXPECTED_DATA)
|
||||
|
||||
@tags('servicenet')
|
||||
def test_servicenet_ports_w_tcp_egress(self):
|
||||
"""
|
||||
@summary: Testing TCP egress rule on servicenet
|
||||
"""
|
||||
self.verify_tcp_connectivity(listener_client=self.lp_rc,
|
||||
sender_client=self.sp_rc,
|
||||
listener_ip=self.lp.snet_fix_ipv4[0],
|
||||
port1=TCP_PORT1, port2=TCP_PORT2,
|
||||
port_range=TCP_PORT_RANGE,
|
||||
expected_data=TCP_RULE_EXPECTED_DATA)
|
||||
|
||||
@tags('isolatednet')
|
||||
def test_isolatednet_ports_w_tcp_egress(self):
|
||||
"""
|
||||
@summary: Testing TCP egress rule on isolatednet
|
||||
"""
|
||||
self.verify_tcp_connectivity(listener_client=self.lp_rc,
|
||||
sender_client=self.sp_rc,
|
||||
listener_ip=self.lp.inet_fix_ipv4[0],
|
||||
port1=TCP_PORT1, port2=TCP_PORT2,
|
||||
port_range=TCP_PORT_RANGE,
|
||||
expected_data=TCP_RULE_EXPECTED_DATA)
|
||||
|
||||
@tags('isolatednet')
|
||||
def test_isolatednet_udp_port_750(self):
|
||||
"""
|
||||
@summary: Testing UDP from other sender without security rules
|
||||
over isolatednet on port 750
|
||||
"""
|
||||
|
||||
file_content = 'Security Groups UDP 750 testing from other sender'
|
||||
expected_data = 'XXXXX{0}'.format(file_content)
|
||||
|
||||
# UDP rule NOT applied to sender so the port is not limited here
|
||||
self.verify_udp_connectivity(
|
||||
listener_client=self.lp_rc, sender_client=self.op_rc,
|
||||
listener_ip=self.lp.inet_fix_ipv4[0], port=UDP_PORT_750,
|
||||
file_content=file_content, expected_data=expected_data)
|
||||
|
||||
@tags('isolatednet')
|
||||
def test_isolatednet_udp_port_749(self):
|
||||
"""
|
||||
@summary: Testing UDP from other sender without security rules
|
||||
over isolatednet on port 749
|
||||
"""
|
||||
|
||||
file_content = 'Security Groups UDP 749 testing from other sender'
|
||||
expected_data = 'XXXXX{0}'.format(file_content)
|
||||
|
||||
# Other sender server has no rules applied, both ports should work
|
||||
self.verify_udp_connectivity(
|
||||
listener_client=self.lp_rc, sender_client=self.op_rc,
|
||||
listener_ip=self.lp.inet_fix_ipv4[0], port=UDP_PORT_749,
|
||||
file_content=file_content, expected_data=expected_data)
|
||||
|
||||
@tags('isolatednet')
|
||||
def test_isolatednet_udp_port_750_w_udp_egress(self):
|
||||
"""
|
||||
@summary: Testing UDP from sender with security egress rules on
|
||||
port 750 that is part of the egress rule
|
||||
"""
|
||||
|
||||
file_content = 'Security Groups UDP 750 testing from sender'
|
||||
expected_data = 'XXXXX{0}'.format(file_content)
|
||||
|
||||
self.verify_udp_connectivity(
|
||||
listener_client=self.lp_rc, sender_client=self.sp_rc,
|
||||
listener_ip=self.lp.inet_fix_ipv4[0], port=UDP_PORT_750,
|
||||
file_content=file_content, expected_data=expected_data)
|
||||
|
||||
@tags('isolatednet')
|
||||
def test_isolatednet_udp_port_749_w_udp_egress(self):
|
||||
"""
|
||||
@summary: Testing UDP from sender with security egress rules on
|
||||
port 749 that is NOT part of the egress rule
|
||||
"""
|
||||
|
||||
file_content = 'Security Groups UDP 749 testing from other sender'
|
||||
expected_data = ''
|
||||
|
||||
# Port 749 NOT within rule, data should not be transmitted
|
||||
self.verify_udp_connectivity(
|
||||
listener_client=self.lp_rc, sender_client=self.sp_rc,
|
||||
listener_ip=self.lp.inet_fix_ipv4[0], port=UDP_PORT_749,
|
||||
file_content=file_content, expected_data=expected_data)
|
||||
|
||||
@tags('servicenet')
|
||||
def test_servicenet_udp_port_750(self):
|
||||
"""
|
||||
@summary: Testing UDP from other sender without security rules
|
||||
over servicenet on port 750
|
||||
"""
|
||||
|
||||
file_content = 'Security Groups UDP 750 testing from other sender'
|
||||
expected_data = 'XXXXX{0}'.format(file_content)
|
||||
|
||||
# UDP rule NOT applied to sender so the port is not limited here
|
||||
self.verify_udp_connectivity(
|
||||
listener_client=self.lp_rc, sender_client=self.op_rc,
|
||||
listener_ip=self.lp.snet_fix_ipv4[0], port=UDP_PORT_750,
|
||||
file_content=file_content, expected_data=expected_data)
|
||||
|
||||
@tags('servicenet')
|
||||
def test_servicenet_udp_port_749(self):
|
||||
"""
|
||||
@summary: Testing UDP from other sender without security rules
|
||||
over servicenet on port 749
|
||||
"""
|
||||
|
||||
file_content = 'Security Groups UDP 749 testing from other sender'
|
||||
expected_data = 'XXXXX{0}'.format(file_content)
|
||||
|
||||
# Other sender server has no rules applied, both ports should work
|
||||
self.verify_udp_connectivity(
|
||||
listener_client=self.lp_rc, sender_client=self.op_rc,
|
||||
listener_ip=self.lp.snet_fix_ipv4[0], port=UDP_PORT_749,
|
||||
file_content=file_content, expected_data=expected_data)
|
||||
|
||||
@tags('servicenet')
|
||||
def test_servicenet_udp_port_750_w_udp_egress(self):
|
||||
"""
|
||||
@summary: Testing UDP from sender with security egress rules on
|
||||
port 750 that is part of the egress rule
|
||||
"""
|
||||
|
||||
file_content = 'Security Groups UDP 750 testing from sender'
|
||||
expected_data = 'XXXXX{0}'.format(file_content)
|
||||
|
||||
self.verify_udp_connectivity(
|
||||
listener_client=self.lp_rc, sender_client=self.sp_rc,
|
||||
listener_ip=self.lp.snet_fix_ipv4[0], port=UDP_PORT_750,
|
||||
file_content=file_content, expected_data=expected_data)
|
||||
|
||||
@tags('servicenet')
|
||||
def test_servicenet_udp_port_749_w_udp_egress(self):
|
||||
"""
|
||||
@summary: Testing UDP from sender with security egress rules on
|
||||
port 749 that is NOT part of the egress rule
|
||||
"""
|
||||
|
||||
file_content = 'Security Groups UDP 749 testing from other sender'
|
||||
expected_data = ''
|
||||
|
||||
# Port 749 NOT within rule, data should not be transmitted
|
||||
self.verify_udp_connectivity(
|
||||
listener_client=self.lp_rc, sender_client=self.sp_rc,
|
||||
listener_ip=self.lp.snet_fix_ipv4[0], port=UDP_PORT_749,
|
||||
file_content=file_content, expected_data=expected_data)
|
||||
|
||||
@tags('publicnet')
|
||||
def test_publicnet_udp_port_750(self):
|
||||
"""
|
||||
@summary: Testing UDP from other sender without security rules
|
||||
over publicnet on port 750
|
||||
"""
|
||||
|
||||
file_content = 'Security Groups UDP 750 testing from other sender'
|
||||
expected_data = 'XXXXX{0}'.format(file_content)
|
||||
|
||||
# UDP rule NOT applied to sender so the port is not limited here
|
||||
self.verify_udp_connectivity(
|
||||
listener_client=self.lp_rc, sender_client=self.op_rc,
|
||||
listener_ip=self.lp.pnet_fix_ipv4[0], port=UDP_PORT_750,
|
||||
file_content=file_content, expected_data=expected_data)
|
||||
|
||||
@tags('publicnet')
|
||||
def test_publicnet_udp_port_749(self):
|
||||
"""
|
||||
@summary: Testing UDP from other sender without security rules
|
||||
over publicnet on port 749
|
||||
"""
|
||||
|
||||
file_content = 'Security Groups UDP 749 testing from other sender'
|
||||
expected_data = 'XXXXX{0}'.format(file_content)
|
||||
|
||||
# Other sender server has no rules applied, both ports should work
|
||||
self.verify_udp_connectivity(
|
||||
listener_client=self.lp_rc, sender_client=self.op_rc,
|
||||
listener_ip=self.lp.pnet_fix_ipv4[0], port=UDP_PORT_749,
|
||||
file_content=file_content, expected_data=expected_data)
|
||||
|
||||
@tags('publicnet')
|
||||
def test_publicnet_udp_port_750_w_udp_egress(self):
|
||||
"""
|
||||
@summary: Testing UDP from sender with security egress rules on
|
||||
port 750 that is part of the egress rule
|
||||
"""
|
||||
|
||||
file_content = 'Security Groups UDP 750 testing from sender'
|
||||
expected_data = 'XXXXX{0}'.format(file_content)
|
||||
|
||||
self.verify_udp_connectivity(
|
||||
listener_client=self.lp_rc, sender_client=self.sp_rc,
|
||||
listener_ip=self.lp.pnet_fix_ipv4[0], port=UDP_PORT_750,
|
||||
file_content=file_content, expected_data=expected_data)
|
||||
|
||||
@tags('publicnet')
|
||||
def test_publicnet_udp_port_749_w_udp_egress(self):
|
||||
"""
|
||||
@summary: Testing UDP from sender with security egress rules on
|
||||
port 749 that is NOT part of the egress rule
|
||||
"""
|
||||
|
||||
file_content = 'Security Groups UDP 749 testing from other sender'
|
||||
expected_data = ''
|
||||
|
||||
# Port 749 NOT within rule, data should not be transmitted
|
||||
self.verify_udp_connectivity(
|
||||
listener_client=self.lp_rc, sender_client=self.sp_rc,
|
||||
listener_ip=self.lp.pnet_fix_ipv4[0], port=UDP_PORT_749,
|
||||
file_content=file_content, expected_data=expected_data)
|
|
@ -0,0 +1,444 @@
|
|||
"""
|
||||
Copyright 2015 Rackspace
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
"""
|
||||
import time
|
||||
|
||||
from cafe.drivers.unittest.decorators import tags
|
||||
from cloudroast.networking.networks.fixtures import NetworkingComputeFixture
|
||||
|
||||
|
||||
# For TCP testing
|
||||
TCP_PORT1 = '993'
|
||||
TCP_PORT2 = '994'
|
||||
TCP_PORT_RANGE = '992-995'
|
||||
|
||||
# UDP ports for sending a file: port 750 within UDP egress rule, 749 not
|
||||
UDP_PORT_750 = '750'
|
||||
UDP_PORT_749 = '749'
|
||||
|
||||
# Operation now in progress if a reply from a port outside the rule
|
||||
TCP_RULE_EXPECTED_DATA = ['992 (tcp) timed out: Operation now in progress',
|
||||
'993 port [tcp/*] succeeded!',
|
||||
'994 port [tcp/*] succeeded!',
|
||||
'995 (tcp) failed: Connection refused']
|
||||
|
||||
TCP_EXPECTED_DATA = ['992 (tcp) failed: Connection refused',
|
||||
'993 port [tcp/*] succeeded!',
|
||||
'994 port [tcp/*] succeeded!',
|
||||
'995 (tcp) failed: Connection refused']
|
||||
|
||||
|
||||
class SecurityGroupsEgressIPv6Test(NetworkingComputeFixture):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
super(SecurityGroupsEgressIPv6Test, cls).setUpClass()
|
||||
|
||||
base_name = 'sg_egress_v6_{0}'
|
||||
keypair_name = base_name.format('keypair')
|
||||
network_name = base_name.format('net')
|
||||
|
||||
cls.network = cls.create_server_network(name=network_name, ipv6=True)
|
||||
cls.keypair = cls.create_keypair(name=keypair_name)
|
||||
|
||||
server_labels = ['listener', 'sender', 'icmp_sender', 'other_sender']
|
||||
server_names = [base_name.format(label) for label in server_labels]
|
||||
|
||||
# Creating servers on the same isolated network and
|
||||
# getting a dict with the server name as key and server obj as value
|
||||
servers = cls.create_multiple_servers(server_names=server_names,
|
||||
keypair_name=cls.keypair.name,
|
||||
networks=[cls.network.id])
|
||||
|
||||
# Setting the servers as class attributes identified by server label
|
||||
for label, name in zip(server_labels, server_names):
|
||||
setattr(cls, label, servers[name])
|
||||
|
||||
# Creating the security group and rules for IPv6 TCP testing
|
||||
cls.fixture_log.debug('Creating the security groups and rules')
|
||||
sg_tcp_ipv6_req = cls.sec.behaviors.create_security_group(
|
||||
name='sg_tcp_ipv6_egress',
|
||||
description='SG for testing IPv6 TCP egress rules')
|
||||
cls.sec_group_tcp_ipv6 = sg_tcp_ipv6_req.response.entity
|
||||
cls.delete_secgroups.append(cls.sec_group_tcp_ipv6.id)
|
||||
|
||||
egress_tcp_ipv6_rule_req = (
|
||||
cls.sec.behaviors.create_security_group_rule(
|
||||
security_group_id=cls.sec_group_tcp_ipv6.id,
|
||||
direction='egress', ethertype='IPv6', protocol='tcp',
|
||||
port_range_min=993, port_range_max=995))
|
||||
egress_tcp_rule = egress_tcp_ipv6_rule_req.response.entity
|
||||
cls.delete_secgroups_rules.append(egress_tcp_rule.id)
|
||||
|
||||
# Creating the security group rule for IPv6 UDP testing
|
||||
egress_udp_ipv6_rule_req = (
|
||||
cls.sec.behaviors.create_security_group_rule(
|
||||
security_group_id=cls.sec_group_tcp_ipv6.id,
|
||||
direction='egress', ethertype='IPv6', protocol='udp',
|
||||
port_range_min=750, port_range_max=752))
|
||||
egress_udp_rule = egress_udp_ipv6_rule_req.response.entity
|
||||
cls.delete_secgroups_rules.append(egress_udp_rule.id)
|
||||
|
||||
cls.create_ping_ssh_ingress_rules(
|
||||
sec_group_id=cls.sec_group_tcp_ipv6.id)
|
||||
|
||||
# Creating the security group and rules for IPv6 ICMP testing
|
||||
sg_icmp_ipv6_req = cls.sec.behaviors.create_security_group(
|
||||
name='sg_icmp_ipv6_egress',
|
||||
description='SG for testing IPv6 ICMP egress rules')
|
||||
cls.sec_group_icmp_ipv6 = sg_icmp_ipv6_req.response.entity
|
||||
cls.delete_secgroups.append(cls.sec_group_icmp_ipv6.id)
|
||||
|
||||
egress_icmp_ipv6_rule_req = (
|
||||
cls.sec.behaviors.create_security_group_rule(
|
||||
security_group_id=cls.sec_group_icmp_ipv6.id,
|
||||
direction='egress', ethertype='IPv6', protocol='icmp'))
|
||||
egress_icmp_ipv6_rule = egress_icmp_ipv6_rule_req.response.entity
|
||||
cls.delete_secgroups_rules.append(egress_icmp_ipv6_rule.id)
|
||||
|
||||
# ICMP ingress rules are also required to see the reply
|
||||
egress_icmp_ipv6_rule_req = (
|
||||
cls.sec.behaviors.create_security_group_rule(
|
||||
security_group_id=cls.sec_group_icmp_ipv6.id,
|
||||
direction='ingress', ethertype='IPv6', protocol='icmp'))
|
||||
egress_icmp_ipv6_rule = egress_icmp_ipv6_rule_req.response.entity
|
||||
cls.delete_secgroups_rules.append(egress_icmp_ipv6_rule.id)
|
||||
|
||||
cls.create_ping_ssh_ingress_rules(
|
||||
sec_group_id=cls.sec_group_icmp_ipv6.id)
|
||||
|
||||
cls.security_group_ids = [cls.sec_group_tcp_ipv6.id,
|
||||
cls.sec_group_icmp_ipv6.id]
|
||||
|
||||
cls.sec_group_tcp_ipv6 = cls.sec.behaviors.get_security_group(
|
||||
cls.security_group_ids[0]).response.entity
|
||||
cls.sec_group_icmp_ipv6 = cls.sec.behaviors.get_security_group(
|
||||
cls.security_group_ids[1]).response.entity
|
||||
|
||||
# Defining the server personas
|
||||
cls.fixture_log.debug('Defining the server personas for quick port '
|
||||
'and IP address access')
|
||||
|
||||
# Persona labels as keys and the server to create the persona as value
|
||||
persona_servers = {'lp': cls.listener, 'op': cls.other_sender,
|
||||
'sp': cls.sender, 'spi': cls.icmp_sender}
|
||||
persona_kwargs = dict(inet=True, network=cls.network,
|
||||
inet_port_count=1, inet_fix_ipv6_count=1)
|
||||
|
||||
# Getting a dict with persona label as key and persona object as value
|
||||
personas = cls.create_multiple_personas(
|
||||
persona_servers=persona_servers, persona_kwargs=persona_kwargs)
|
||||
|
||||
# Setting the personas as class attributes identified by persona label
|
||||
for persona_label, persona in personas.items():
|
||||
setattr(cls, persona_label, persona)
|
||||
|
||||
# Creating personas as class attributes, for ex. cls.lp, etc.
|
||||
cls.fixture_log.debug('Defining the server personas for quick port '
|
||||
'and IP address access')
|
||||
persona_servers = {'lp': cls.listener, 'op': cls.other_sender,
|
||||
'sp': cls.sender, 'spi': cls.icmp_sender}
|
||||
persona_kwargs = dict(inet=True, network=cls.network,
|
||||
inet_port_count=1, inet_fix_ipv4_count=1)
|
||||
|
||||
cls.create_multiple_personas(persona_servers=persona_servers,
|
||||
persona_kwargs=persona_kwargs)
|
||||
|
||||
# Updating server ports with security groups
|
||||
ports_to_update = [{'port_ids': [cls.sp.pnet_port_ids[0],
|
||||
cls.sp.snet_port_ids[0],
|
||||
cls.sp.inet_port_ids[0]],
|
||||
'security_groups': [cls.security_group_ids[0]]},
|
||||
{'port_ids': [cls.spi.pnet_port_ids[0],
|
||||
cls.spi.snet_port_ids[0],
|
||||
cls.spi.inet_port_ids[0]],
|
||||
'security_groups': [cls.security_group_ids[1]]}]
|
||||
|
||||
for ports in ports_to_update:
|
||||
cls.update_server_ports_w_sec_groups(
|
||||
port_ids=ports['port_ids'],
|
||||
security_groups=ports['security_groups'])
|
||||
|
||||
# Wait time for security groups to be enabled on server ports
|
||||
delay_msg = 'data plane delay {0}'.format(
|
||||
cls.sec.config.data_plane_delay)
|
||||
cls.fixture_log.debug(delay_msg)
|
||||
time.sleep(cls.sec.config.data_plane_delay)
|
||||
|
||||
def setUp(self):
|
||||
""" Creating the remote clients """
|
||||
|
||||
self.fixture_log.debug('Creating the Remote Clients')
|
||||
self.lp_rc = self.servers.behaviors.get_remote_instance_client(
|
||||
server=self.listener, ip_address=self.lp.pnet_fix_ipv4[0],
|
||||
username=self.ssh_username, key=self.keypair.private_key,
|
||||
auth_strategy=self.auth_strategy)
|
||||
self.op_rc = self.servers.behaviors.get_remote_instance_client(
|
||||
server=self.other_sender, ip_address=self.op.pnet_fix_ipv4[0],
|
||||
username=self.ssh_username, key=self.keypair.private_key,
|
||||
auth_strategy=self.auth_strategy)
|
||||
|
||||
self.fixture_log.debug('Sender Remote Clients require ingress and '
|
||||
'egress rules working for ICMP and ingress '
|
||||
'rules for TCP')
|
||||
self.sp_rc = self.servers.behaviors.get_remote_instance_client(
|
||||
server=self.sender, ip_address=self.sp.pnet_fix_ipv4[0],
|
||||
username=self.ssh_username, key=self.keypair.private_key,
|
||||
auth_strategy=self.auth_strategy)
|
||||
self.spi_rc = self.servers.behaviors.get_remote_instance_client(
|
||||
server=self.icmp_sender, ip_address=self.spi.pnet_fix_ipv4[0],
|
||||
username=self.ssh_username, key=self.keypair.private_key,
|
||||
auth_strategy=self.auth_strategy)
|
||||
|
||||
@tags('publicnet', 'isolatednet')
|
||||
def test_remote_client_connectivity_v6(self):
|
||||
"""
|
||||
@summary: Testing the remote clients
|
||||
"""
|
||||
|
||||
servers = [self.listener, self.other_sender, self.sender,
|
||||
self.icmp_sender]
|
||||
remote_clients = [self.lp_rc, self.op_rc, self.sp_rc, self.spi_rc]
|
||||
|
||||
# Empty string for servers without security group
|
||||
sec_groups = ['', '', self.sec_group_tcp_ipv6,
|
||||
self.sec_group_icmp_ipv6]
|
||||
|
||||
result = self.verify_remote_clients_auth(
|
||||
servers=servers, remote_clients=remote_clients,
|
||||
sec_groups=sec_groups)
|
||||
|
||||
self.assertFalse(result)
|
||||
|
||||
@tags('publicnet')
|
||||
def test_publicnet_ping_v6(self):
|
||||
"""
|
||||
@summary: Testing ping from other sender without security rules
|
||||
"""
|
||||
ip_address = self.lp.pnet_fix_ipv6[0]
|
||||
self.verify_ping(remote_client=self.op_rc, ip_address=ip_address,
|
||||
ip_version=6)
|
||||
|
||||
@tags('isolatednet')
|
||||
def test_isolatednet_ping_v6(self):
|
||||
"""
|
||||
@summary: Testing ping from other sender without security rules
|
||||
"""
|
||||
ip_address = self.lp.inet_fix_ipv6[0]
|
||||
self.verify_ping(remote_client=self.op_rc, ip_address=ip_address,
|
||||
ip_version=6)
|
||||
|
||||
@tags('publicnet')
|
||||
def test_publicnet_ping_w_icmp_egress_v6(self):
|
||||
"""
|
||||
@summary: Testing ICMP egress rule on publicnet
|
||||
"""
|
||||
ip_address = self.lp.pnet_fix_ipv6[0]
|
||||
self.verify_ping(remote_client=self.spi_rc, ip_address=ip_address,
|
||||
ip_version=6)
|
||||
|
||||
@tags('isolatednet')
|
||||
def test_isolatednet_ping_w_icmp_egress_v6(self):
|
||||
"""
|
||||
@summary: Testing ICMP egress rule on isolatednet
|
||||
"""
|
||||
ip_address = self.lp.inet_fix_ipv6[0]
|
||||
self.verify_ping(remote_client=self.spi_rc, ip_address=ip_address,
|
||||
ip_version=6)
|
||||
|
||||
@tags('publicnet')
|
||||
def test_publicnet_ports_w_tcp_v6(self):
|
||||
"""
|
||||
@summary: Testing TCP ports on publicnet
|
||||
"""
|
||||
self.verify_tcp_connectivity(listener_client=self.lp_rc,
|
||||
sender_client=self.op_rc,
|
||||
listener_ip=self.lp.pnet_fix_ipv6[0],
|
||||
port1=TCP_PORT1, port2=TCP_PORT2,
|
||||
port_range=TCP_PORT_RANGE,
|
||||
expected_data=TCP_EXPECTED_DATA,
|
||||
ip_version=6)
|
||||
|
||||
@tags('isolatednet')
|
||||
def test_isolatednet_ports_w_tcp_v6(self):
|
||||
"""
|
||||
@summary: Testing TCP ports on isolatednet
|
||||
"""
|
||||
self.verify_tcp_connectivity(listener_client=self.lp_rc,
|
||||
sender_client=self.op_rc,
|
||||
listener_ip=self.lp.inet_fix_ipv6[0],
|
||||
port1=TCP_PORT1, port2=TCP_PORT2,
|
||||
port_range=TCP_PORT_RANGE,
|
||||
expected_data=TCP_EXPECTED_DATA,
|
||||
ip_version=6)
|
||||
|
||||
@tags('publicnet')
|
||||
def test_publicnet_ports_w_tcp_egress_v6(self):
|
||||
"""
|
||||
@summary: Testing TCP egress rule on publicnet
|
||||
"""
|
||||
self.verify_tcp_connectivity(listener_client=self.lp_rc,
|
||||
sender_client=self.sp_rc,
|
||||
listener_ip=self.lp.pnet_fix_ipv6[0],
|
||||
port1=TCP_PORT1, port2=TCP_PORT2,
|
||||
port_range=TCP_PORT_RANGE,
|
||||
expected_data=TCP_RULE_EXPECTED_DATA,
|
||||
ip_version=6)
|
||||
|
||||
@tags('isolatednet')
|
||||
def test_isolatednet_ports_w_tcp_egress_v6(self):
|
||||
"""
|
||||
@summary: Testing TCP egress rule on isolatednet
|
||||
"""
|
||||
self.verify_tcp_connectivity(listener_client=self.lp_rc,
|
||||
sender_client=self.sp_rc,
|
||||
listener_ip=self.lp.inet_fix_ipv6[0],
|
||||
port1=TCP_PORT1, port2=TCP_PORT2,
|
||||
port_range=TCP_PORT_RANGE,
|
||||
expected_data=TCP_RULE_EXPECTED_DATA,
|
||||
ip_version=6)
|
||||
|
||||
@tags('isolatednet')
|
||||
def test_isolatednet_udp_port_750_v6(self):
|
||||
"""
|
||||
@summary: Testing UDP from other sender without security rules
|
||||
over isolatednet on port 750
|
||||
"""
|
||||
|
||||
file_content = 'Security Groups UDP 750 testing from other sender'
|
||||
expected_data = 'XXXXX{0}'.format(file_content)
|
||||
|
||||
# UDP rule NOT applied to sender so the port is not limited here
|
||||
self.verify_udp_connectivity(
|
||||
listener_client=self.lp_rc, sender_client=self.op_rc,
|
||||
listener_ip=self.lp.inet_fix_ipv6[0], port=UDP_PORT_750,
|
||||
file_content=file_content, expected_data=expected_data,
|
||||
ip_version=6)
|
||||
|
||||
@tags('isolatednet')
|
||||
def test_isolatednet_udp_port_749_v6(self):
|
||||
"""
|
||||
@summary: Testing UDP from other sender without security rules
|
||||
over isolatednet on port 749
|
||||
"""
|
||||
|
||||
file_content = 'Security Groups UDP 749 testing from other sender'
|
||||
expected_data = 'XXXXX{0}'.format(file_content)
|
||||
|
||||
# Other sender server has no rules applied, both ports should work
|
||||
self.verify_udp_connectivity(
|
||||
listener_client=self.lp_rc, sender_client=self.op_rc,
|
||||
listener_ip=self.lp.inet_fix_ipv6[0], port=UDP_PORT_749,
|
||||
file_content=file_content, expected_data=expected_data,
|
||||
ip_version=6)
|
||||
|
||||
@tags('isolatednet')
|
||||
def test_isolatednet_udp_port_750_w_udp_egress_v6(self):
|
||||
"""
|
||||
@summary: Testing UDP from sender with security egress rules on
|
||||
port 750 that is part of the egress rule
|
||||
"""
|
||||
|
||||
file_content = 'Security Groups UDP 750 testing from sender'
|
||||
expected_data = 'XXXXX{0}'.format(file_content)
|
||||
|
||||
self.verify_udp_connectivity(
|
||||
listener_client=self.lp_rc, sender_client=self.sp_rc,
|
||||
listener_ip=self.lp.inet_fix_ipv6[0], port=UDP_PORT_750,
|
||||
file_content=file_content, expected_data=expected_data,
|
||||
ip_version=6)
|
||||
|
||||
@tags('isolatednet')
|
||||
def test_isolatednet_udp_port_749_w_udp_egress_v6(self):
|
||||
"""
|
||||
@summary: Testing UDP from sender with security egress rules on
|
||||
port 749 that is NOT part of the egress rule
|
||||
"""
|
||||
|
||||
file_content = 'Security Groups UDP 749 testing from other sender'
|
||||
expected_data = ''
|
||||
|
||||
# Port 749 NOT within rule, data should not be transmitted
|
||||
self.verify_udp_connectivity(
|
||||
listener_client=self.lp_rc, sender_client=self.sp_rc,
|
||||
listener_ip=self.lp.inet_fix_ipv6[0], port=UDP_PORT_749,
|
||||
file_content=file_content, expected_data=expected_data,
|
||||
ip_version=6)
|
||||
|
||||
@tags('publicnet')
|
||||
def test_publicnet_udp_port_750_v6(self):
|
||||
"""
|
||||
@summary: Testing UDP from other sender without security rules
|
||||
over publicnet on port 750
|
||||
"""
|
||||
|
||||
file_content = 'Security Groups UDP 750 testing from other sender'
|
||||
expected_data = 'XXXXX{0}'.format(file_content)
|
||||
|
||||
# UDP rule NOT applied to sender so the port is not limited here
|
||||
self.verify_udp_connectivity(
|
||||
listener_client=self.lp_rc, sender_client=self.op_rc,
|
||||
listener_ip=self.lp.pnet_fix_ipv6[0], port=UDP_PORT_750,
|
||||
file_content=file_content, expected_data=expected_data,
|
||||
ip_version=6)
|
||||
|
||||
@tags('publicnet')
|
||||
def test_publicnet_udp_port_749_v6(self):
|
||||
"""
|
||||
@summary: Testing UDP from other sender without security rules
|
||||
over publicnet on port 749
|
||||
"""
|
||||
|
||||
file_content = 'Security Groups UDP 749 testing from other sender'
|
||||
expected_data = 'XXXXX{0}'.format(file_content)
|
||||
|
||||
# Other sender server has no rules applied, both ports should work
|
||||
self.verify_udp_connectivity(
|
||||
listener_client=self.lp_rc, sender_client=self.op_rc,
|
||||
listener_ip=self.lp.pnet_fix_ipv6[0], port=UDP_PORT_749,
|
||||
file_content=file_content, expected_data=expected_data,
|
||||
ip_version=6)
|
||||
|
||||
@tags('publicnet')
|
||||
def test_publicnet_udp_port_750_w_udp_egress_v6(self):
|
||||
"""
|
||||
@summary: Testing UDP from sender with security egress rules on
|
||||
port 750 that is part of the egress rule
|
||||
"""
|
||||
|
||||
file_content = 'Security Groups UDP 750 testing from sender'
|
||||
expected_data = 'XXXXX{0}'.format(file_content)
|
||||
|
||||
self.verify_udp_connectivity(
|
||||
listener_client=self.lp_rc, sender_client=self.sp_rc,
|
||||
listener_ip=self.lp.pnet_fix_ipv6[0], port=UDP_PORT_750,
|
||||
file_content=file_content, expected_data=expected_data,
|
||||
ip_version=6)
|
||||
|
||||
@tags('publicnet')
|
||||
def test_publicnet_udp_port_749_w_udp_egress_v6(self):
|
||||
"""
|
||||
@summary: Testing UDP from sender with security egress rules on
|
||||
port 749 that is NOT part of the egress rule
|
||||
"""
|
||||
|
||||
file_content = 'Security Groups UDP 749 testing from other sender'
|
||||
expected_data = ''
|
||||
|
||||
# Port 749 NOT within rule, data should not be transmitted
|
||||
self.verify_udp_connectivity(
|
||||
listener_client=self.lp_rc, sender_client=self.sp_rc,
|
||||
listener_ip=self.lp.pnet_fix_ipv6[0], port=UDP_PORT_749,
|
||||
file_content=file_content, expected_data=expected_data,
|
||||
ip_version=6)
|
Loading…
Reference in New Issue