IPv4 and IPv6 Security Groups Scenario Tests

- IPv4 Security Rules egress tests for publicnet, servicenet
  and isolatednet
- IPv6 Security Rules egress tests for publicnet and isolatednet
- Covering ICMP, TCP and UDP protocols

Change-Id: If79dbce8cc5dd9e444f55e6d22759dc87d428329
This commit is contained in:
Leonardo Maycotte 2016-03-01 14:03:50 -06:00
parent ec4869da92
commit 23644d63e0
3 changed files with 1132 additions and 9 deletions

View File

@ -42,6 +42,7 @@ from cloudcafe.networking.networks.extensions.security_groups_api.composites \
import SecurityGroupsComposite
from cloudcafe.networking.networks.extensions.security_groups_api.models.\
response import SecurityGroup, SecurityGroupRule
from cloudcafe.networking.networks.personas import ServerPersona
class NetworkingFixture(BaseTestFixture):
@ -991,6 +992,9 @@ class NetworkingComputeFixture(NetworkingSecurityGroupsFixture):
# Other reusable values
cls.flavor_ref = cls.flavors.config.primary_flavor
cls.image_ref = cls.images.config.primary_image
cls.ssh_username = (cls.images.config.primary_image_default_user or
'root')
cls.auth_strategy = cls.servers.config.instance_auth_strategy or 'key'
cls.delete_servers = []
cls.failed_servers = []
@ -1042,22 +1046,183 @@ class NetworkingComputeFixture(NetworkingSecurityGroupsFixture):
# Using rand_name to avoid HTTP 409 Conflict due to duplicate names
name = rand_name(name)
cls.fixture_log.info('Creating test server keypair')
resp = cls.keypairs.client.create_keypair(name)
msg = ('Unable to create server keypair: received HTTP {0} instead of '
'the expected HTTP {1} response').format(
resp.status_code, ComputeResponseCodes.CREATE_KEYPAIR)
assert resp.status_code == ComputeResponseCodes.CREATE_KEYPAIR, msg
cls.delete_keypairs.append(resp.entity.name)
return resp.entity
def verify_remote_client_auth(self, server, remote_client,
sec_group=None):
msg = ('Remote client unable to authenticate for server {0} {1} '
'with security group: {2}').format(server.name, server.id,
sec_group)
self.assertTrue(remote_client.can_authenticate(), msg)
@classmethod
def create_server_network(cls, name, ipv4=False, ipv6=False):
"""
@summary: Create an isolated network
@param name: network name
@type name: str
@param ipv4: flag to create network with IPv4 subnet
@type ipv4: bool
@param ipv6: flag to create network with IPv6 subnet
@type ipv6: bool
"""
net_msg = 'Creating {0} isolated network'.format(name)
cls.fixture_log.info(net_msg)
net_req = cls.networks.behaviors.create_network(name=name)
network = net_req.response.entity
cls.delete_networks.append(network.id)
if ipv4:
cls.fixture_log.info('Creating IPv4 subnet')
cls.subnets.behaviors.create_subnet(network_id=network.id,
ip_version=4)
if ipv6:
cls.fixture_log.info('Creating IPv6 subnet')
cls.subnets.behaviors.create_subnet(network_id=network.id,
ip_version=6)
return network
@classmethod
def create_multiple_servers(cls, server_names, keypair_name=None,
networks=None, pnet=True, snet=True):
"""
@summary: Create multiple test servers
@param server_names: names of servers to create
@type server_names: list(str)
@param keypair_name: (optional) keypair to create servers with
@type keypair_name: str
@param networks: (optional) isolated network ids to create servers with
@type networks: list(str)
@param pnet: flag to create server with public network
@type pnet: bool
@param snet: flag to create server with service (private) network
@type snet: bool
@return: server entity objects
@rtype: dict(server name: server entity object)
"""
cls.fixture_log.debug('Defining the network IDs to be used')
network_ids = []
if pnet:
network_ids.append(cls.public_network_id)
if snet:
network_ids.append(cls.service_network_id)
if networks:
network_ids.extend(networks)
# Response dict where the key will be the server name and the value the
# server entity object
servers = dict()
server_ids = []
for name in server_names:
server = cls.create_test_server(name=name, key_name=keypair_name,
network_ids=network_ids,
active_server=False)
server_ids.append(server.id)
servers[name] = server
# Waiting for the servers to be active
cls.net.behaviors.wait_for_servers_to_be_active(
server_id_list=server_ids)
return servers
@classmethod
def create_multiple_personas(cls, persona_servers, persona_kwargs=None):
"""
@summary: Create multiple server personas
@param persona_servers: servers to create personas from
@type persona_servers: dict(persona_label: server)
@param persona_kwargs: (optional) server persona attributes, excluding
the server one that is at the persona_servers
@type persona_kwargs: dict
@return: servers personas
@rtype: dict(persona_label: persona)
"""
# In case serer personas are created with default values
if not persona_kwargs:
persona_kwargs = dict()
# Response dict where the key will be the persona label and the value
# the persona object
personas = dict()
for persona_label, persona_server in persona_servers.items():
persona_kwargs.update(server=persona_server)
server_persona = ServerPersona(**persona_kwargs)
personas[persona_label] = server_persona
return personas
@classmethod
def update_server_ports_w_sec_groups(cls, port_ids, security_groups,
raise_exception=True):
"""
@summary: Updates server ports with security groups
@param port_ids: ports to update
@type port_ids: list(str)
@param security_groups: security groups to add to the ports
@type security_groups: list(str)
@param raise_exception: raise exception port was not updated
@type raise_exception: bool
"""
for port_id in port_ids:
cls.ports.behaviors.update_port(
port_id=port_id, security_groups=security_groups,
raise_exception=raise_exception)
def verify_remote_clients_auth(self, servers, remote_clients,
sec_groups=None):
"""
@summary: verifying remote clients authentication
@param servers: server entities
@type servers: list of server entities
@param remote_clients: remote instance clients from servers
@type remote_clients: list of remote instance clients
@param sec_groups: security group applied to the server
@type sec_groups: list of security groups entities
"""
error_msg = ('Remote client unable to authenticate for server {0} '
'with security group: {1}')
errors = []
# In case there are no security groups associated with the servers
if not sec_groups:
sec_groups = [''] * len(remote_clients)
for server, remote_client, sec_group in (
zip(servers, remote_clients, sec_groups)):
if not remote_client.can_authenticate():
msg = error_msg.format(server.id, sec_group)
errors.append(msg)
return errors
def verify_ping(self, remote_client, ip_address, ip_version=4,
count=3, accepted_packet_loss=0):
"""
@summary: Verify ICMP connectivity between two servers
@param remote_client: remote client server to ping from
@type remote_client: cloudcafe.compute.common.clients.
remote_instance.linux.linux_client.LinuxClient
@param ip_address: IP address to ping
@type ip_address: str
@param ip_version: version of IP address
@type ip_version: int
@param count: number of pings, for ex. ping -c count (by default 3)
@type count: int
@param accepted_packet_loss: fail if packet loss greater (by default 0)
@type accepted_packet_loss: int
"""
count = self.config.ping_count or count
accepted_packet_loss = self.config.accepted_packet_loss or (
accepted_packet_loss)
ping_packet_loss_regex = '(\d{1,3})\.?\d*\%.*loss'
if ip_version == 6:
@ -1087,10 +1252,10 @@ class NetworkingComputeFixture(NetworkingSecurityGroupsFixture):
expected_data, ip_version=4):
"""
@summary: Verify UDP port connectivity between two servers
@param listener_client: remote client server that receives TCP packages
@param listener_client: remote client server that receives UDP packages
@type listener_client: cloudcafe.compute.common.clients.
remote_instance.linux.linux_client.LinuxClient
@param sender_client: remote client server that sends TCP packages
@param sender_client: remote client server that sends UDP packages
@type sender_client: cloudcafe.compute.common.clients.
remote_instance.linux.linux_client.LinuxClient
@param listener_ip: public, service or isolated network IP
@ -1104,7 +1269,7 @@ class NetworkingComputeFixture(NetworkingSecurityGroupsFixture):
@type expected_data: str
"""
file_name = 'udp_transfer'
# Can be set as the default_file_path property in the config
# file servers section, or to be set to /root by default
dir_path = self.servers.config.default_file_path or '/root'

View File

@ -0,0 +1,514 @@
"""
Copyright 2015 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import time
from cafe.drivers.unittest.decorators import tags
from cloudroast.networking.networks.fixtures import NetworkingComputeFixture
# TCP ports to open on listener
TCP_PORT1 = '443'
TCP_PORT2 = '444'
# TCP port range to check from sender (SG rule port range 443-445)
TCP_PORT_RANGE = '442-445'
# UDP ports for sending a file: port 750 within UDP egress rule, 749 not
UDP_PORT_750 = '750'
UDP_PORT_749 = '749'
# Operation now in progress if a reply from a port outside the rule
TCP_RULE_EXPECTED_DATA = ['442 (tcp) timed out: Operation now in progress',
'443 port [tcp/*] succeeded!',
'444 port [tcp/*] succeeded!',
'445 (tcp) failed: Connection refused']
TCP_EXPECTED_DATA = ['442 (tcp) failed: Connection refused',
'443 port [tcp/*] succeeded!',
'444 port [tcp/*] succeeded!',
'445 (tcp) failed: Connection refused']
class SecurityGroupsEgressIPv4Test(NetworkingComputeFixture):
@classmethod
def setUpClass(cls):
super(SecurityGroupsEgressIPv4Test, cls).setUpClass()
base_name = 'sg_egress_v4_{0}'
keypair_name = base_name.format('keypair')
network_name = base_name.format('net')
cls.network = cls.create_server_network(name=network_name, ipv4=True)
cls.keypair = cls.create_keypair(name=keypair_name)
server_labels = ['listener', 'sender', 'icmp_sender', 'other_sender']
server_names = [base_name.format(label) for label in server_labels]
# Creating servers on the same isolated network and
# getting a dict with the server name as key and server obj as value
servers = cls.create_multiple_servers(server_names=server_names,
keypair_name=cls.keypair.name,
networks=[cls.network.id])
# Setting the servers as class attributes identified by server label
for label, name in zip(server_labels, server_names):
setattr(cls, label, servers[name])
# Creating the security group and rules for IPv4 TCP testing
cls.fixture_log.debug('Creating the security groups and rules')
sg_tcp_ipv4_req = cls.sec.behaviors.create_security_group(
name='sg_tcp_ipv4_egress',
description='SG for testing IPv4 TCP egress rules')
cls.sec_group_tcp_ipv4 = sg_tcp_ipv4_req.response.entity
cls.delete_secgroups.append(cls.sec_group_tcp_ipv4.id)
egress_tcp_ipv4_rule_req = (
cls.sec.behaviors.create_security_group_rule(
security_group_id=cls.sec_group_tcp_ipv4.id,
direction='egress', ethertype='IPv4', protocol='tcp',
port_range_min=443, port_range_max=445))
egress_tcp_rule = egress_tcp_ipv4_rule_req.response.entity
cls.delete_secgroups_rules.append(egress_tcp_rule.id)
# Creating the security group rule for IPv4 UDP testing
egress_udp_ipv4_rule_req = (
cls.sec.behaviors.create_security_group_rule(
security_group_id=cls.sec_group_tcp_ipv4.id,
direction='egress', ethertype='IPv4', protocol='udp',
port_range_min=750, port_range_max=752))
egress_udp_rule = egress_udp_ipv4_rule_req.response.entity
cls.delete_secgroups_rules.append(egress_udp_rule.id)
# Adding rules for remote client connectivity
cls.create_ping_ssh_ingress_rules(
sec_group_id=cls.sec_group_tcp_ipv4.id)
# Creating the security group and rules for IPv4 ICMP testing
sg_icmp_ipv4_req = cls.sec.behaviors.create_security_group(
name='sg_icmp_ipv4_egress',
description='SG for testing IPv4 ICMP egress rules')
cls.sec_group_icmp_ipv4 = sg_icmp_ipv4_req.response.entity
cls.delete_secgroups.append(cls.sec_group_icmp_ipv4.id)
egress_icmp_ipv4_rule_req = (
cls.sec.behaviors.create_security_group_rule(
security_group_id=cls.sec_group_icmp_ipv4.id,
direction='egress', ethertype='IPv4', protocol='icmp'))
egress_icmp_ipv4_rule = egress_icmp_ipv4_rule_req.response.entity
cls.delete_secgroups_rules.append(egress_icmp_ipv4_rule.id)
# Adding rules for remote client connectivity
cls.create_ping_ssh_ingress_rules(
sec_group_id=cls.sec_group_icmp_ipv4.id)
cls.security_group_ids = [cls.sec_group_tcp_ipv4.id,
cls.sec_group_icmp_ipv4.id]
cls.sec_group_tcp_ipv4 = cls.sec.behaviors.get_security_group(
cls.security_group_ids[0]).response.entity
cls.sec_group_icmp_ipv4 = cls.sec.behaviors.get_security_group(
cls.security_group_ids[1]).response.entity
# Defining the server personas
cls.fixture_log.debug('Defining the server personas for quick port '
'and IP address access')
# Persona labels as keys and the server to create the persona as value
persona_servers = {'lp': cls.listener, 'op': cls.other_sender,
'sp': cls.sender, 'spi': cls.icmp_sender}
persona_kwargs = dict(inet=True, network=cls.network,
inet_port_count=1, inet_fix_ipv4_count=1)
# Getting a dict with persona label as key and persona object as value
personas = cls.create_multiple_personas(
persona_servers=persona_servers, persona_kwargs=persona_kwargs)
# Setting the personas as class attributes identified by persona label
for persona_label, persona in personas.items():
setattr(cls, persona_label, persona)
# Updating server ports with security groups
ports_to_update = [{'port_ids': [cls.sp.pnet_port_ids[0],
cls.sp.snet_port_ids[0],
cls.sp.inet_port_ids[0]],
'security_groups': [cls.security_group_ids[0]]},
{'port_ids': [cls.spi.pnet_port_ids[0],
cls.spi.snet_port_ids[0],
cls.spi.inet_port_ids[0]],
'security_groups': [cls.security_group_ids[1]]}]
for ports in ports_to_update:
cls.update_server_ports_w_sec_groups(
port_ids=ports['port_ids'],
security_groups=ports['security_groups'])
# Wait time for security groups to be enabled on server ports
delay_msg = 'data plane delay {0}'.format(
cls.sec.config.data_plane_delay)
cls.fixture_log.debug(delay_msg)
time.sleep(cls.sec.config.data_plane_delay)
def setUp(self):
""" Creating the remote clients """
self.fixture_log.debug('Creating the Remote Clients')
self.lp_rc = self.servers.behaviors.get_remote_instance_client(
server=self.listener, ip_address=self.lp.pnet_fix_ipv4[0],
username=self.ssh_username, key=self.keypair.private_key,
auth_strategy=self.auth_strategy)
self.op_rc = self.servers.behaviors.get_remote_instance_client(
server=self.other_sender, ip_address=self.op.pnet_fix_ipv4[0],
username=self.ssh_username, key=self.keypair.private_key,
auth_strategy=self.auth_strategy)
self.fixture_log.debug('Sender Remote Clients require ingress and '
'egress rules working for ICMP and ingress '
'rules for TCP')
self.sp_rc = self.servers.behaviors.get_remote_instance_client(
server=self.sender, ip_address=self.sp.pnet_fix_ipv4[0],
username=self.ssh_username, key=self.keypair.private_key,
auth_strategy=self.auth_strategy)
self.spi_rc = self.servers.behaviors.get_remote_instance_client(
server=self.icmp_sender, ip_address=self.spi.pnet_fix_ipv4[0],
username=self.ssh_username, key=self.keypair.private_key,
auth_strategy=self.auth_strategy)
@tags('publicnet', 'servicenet', 'isolatednet')
def test_remote_client_connectivity(self):
"""
@summary: Testing the remote clients
"""
servers = [self.listener, self.other_sender, self.sender,
self.icmp_sender]
remote_clients = [self.lp_rc, self.op_rc, self.sp_rc, self.spi_rc]
# Empty string for servers without security group
sec_groups = ['', '', self.sec_group_tcp_ipv4,
self.sec_group_icmp_ipv4]
result = self.verify_remote_clients_auth(
servers=servers, remote_clients=remote_clients,
sec_groups=sec_groups)
self.assertFalse(result)
@tags('publicnet')
def test_publicnet_ping(self):
"""
@summary: Testing ping from other sender without security rules
"""
ip_address = self.lp.pnet_fix_ipv4[0]
self.verify_ping(remote_client=self.op_rc, ip_address=ip_address)
@tags('servicenet')
def test_servicenet_ping(self):
"""
@summary: Testing ping from other sender without security rules
"""
ip_address = self.lp.snet_fix_ipv4[0]
self.verify_ping(remote_client=self.op_rc, ip_address=ip_address)
@tags('isolatednet')
def test_isolatednet_ping(self):
"""
@summary: Testing ping from other sender without security rules
"""
ip_address = self.lp.inet_fix_ipv4[0]
self.verify_ping(remote_client=self.op_rc, ip_address=ip_address)
@tags('publicnet')
def test_publicnet_ping_w_icmp_egress(self):
"""
@summary: Testing ICMP egress rule on publicnet
"""
ip_address = self.lp.pnet_fix_ipv4[0]
self.verify_ping(remote_client=self.spi_rc, ip_address=ip_address)
@tags('servicenet')
def test_servicenet_ping_w_icmp_egress(self):
"""
@summary: Testing ICMP egress rule on servicenet
"""
ip_address = self.lp.snet_fix_ipv4[0]
self.verify_ping(remote_client=self.spi_rc, ip_address=ip_address)
def test_isolatednet_ping_w_icmp_egress(self):
"""
@summary: Testing ICMP egress rule on isolatednet
"""
ip_address = self.lp.inet_fix_ipv4[0]
self.verify_ping(remote_client=self.spi_rc, ip_address=ip_address)
@tags('publicnet')
def test_publicnet_ports_w_tcp(self):
"""
@summary: Testing TCP ports on publicnet
"""
self.verify_tcp_connectivity(listener_client=self.lp_rc,
sender_client=self.op_rc,
listener_ip=self.lp.pnet_fix_ipv4[0],
port1=TCP_PORT1, port2=TCP_PORT2,
port_range=TCP_PORT_RANGE,
expected_data=TCP_EXPECTED_DATA)
@tags('servicenet')
def test_servicenet_ports_w_tcp(self):
"""
@summary: Testing TCP ports on servicenet
"""
self.verify_tcp_connectivity(listener_client=self.lp_rc,
sender_client=self.op_rc,
listener_ip=self.lp.snet_fix_ipv4[0],
port1=TCP_PORT1, port2=TCP_PORT2,
port_range=TCP_PORT_RANGE,
expected_data=TCP_EXPECTED_DATA)
@tags('isolatednet')
def test_isolatednet_ports_w_tcp(self):
"""
@summary: Testing TCP ports on isolatednet
"""
self.verify_tcp_connectivity(listener_client=self.lp_rc,
sender_client=self.op_rc,
listener_ip=self.lp.inet_fix_ipv4[0],
port1=TCP_PORT1, port2=TCP_PORT2,
port_range=TCP_PORT_RANGE,
expected_data=TCP_EXPECTED_DATA)
@tags('publicnet')
def test_publicnet_ports_w_tcp_egress(self):
"""
@summary: Testing TCP egress rule on publicnet
"""
self.verify_tcp_connectivity(listener_client=self.lp_rc,
sender_client=self.sp_rc,
listener_ip=self.lp.pnet_fix_ipv4[0],
port1=TCP_PORT1, port2=TCP_PORT2,
port_range=TCP_PORT_RANGE,
expected_data=TCP_RULE_EXPECTED_DATA)
@tags('servicenet')
def test_servicenet_ports_w_tcp_egress(self):
"""
@summary: Testing TCP egress rule on servicenet
"""
self.verify_tcp_connectivity(listener_client=self.lp_rc,
sender_client=self.sp_rc,
listener_ip=self.lp.snet_fix_ipv4[0],
port1=TCP_PORT1, port2=TCP_PORT2,
port_range=TCP_PORT_RANGE,
expected_data=TCP_RULE_EXPECTED_DATA)
@tags('isolatednet')
def test_isolatednet_ports_w_tcp_egress(self):
"""
@summary: Testing TCP egress rule on isolatednet
"""
self.verify_tcp_connectivity(listener_client=self.lp_rc,
sender_client=self.sp_rc,
listener_ip=self.lp.inet_fix_ipv4[0],
port1=TCP_PORT1, port2=TCP_PORT2,
port_range=TCP_PORT_RANGE,
expected_data=TCP_RULE_EXPECTED_DATA)
@tags('isolatednet')
def test_isolatednet_udp_port_750(self):
"""
@summary: Testing UDP from other sender without security rules
over isolatednet on port 750
"""
file_content = 'Security Groups UDP 750 testing from other sender'
expected_data = 'XXXXX{0}'.format(file_content)
# UDP rule NOT applied to sender so the port is not limited here
self.verify_udp_connectivity(
listener_client=self.lp_rc, sender_client=self.op_rc,
listener_ip=self.lp.inet_fix_ipv4[0], port=UDP_PORT_750,
file_content=file_content, expected_data=expected_data)
@tags('isolatednet')
def test_isolatednet_udp_port_749(self):
"""
@summary: Testing UDP from other sender without security rules
over isolatednet on port 749
"""
file_content = 'Security Groups UDP 749 testing from other sender'
expected_data = 'XXXXX{0}'.format(file_content)
# Other sender server has no rules applied, both ports should work
self.verify_udp_connectivity(
listener_client=self.lp_rc, sender_client=self.op_rc,
listener_ip=self.lp.inet_fix_ipv4[0], port=UDP_PORT_749,
file_content=file_content, expected_data=expected_data)
@tags('isolatednet')
def test_isolatednet_udp_port_750_w_udp_egress(self):
"""
@summary: Testing UDP from sender with security egress rules on
port 750 that is part of the egress rule
"""
file_content = 'Security Groups UDP 750 testing from sender'
expected_data = 'XXXXX{0}'.format(file_content)
self.verify_udp_connectivity(
listener_client=self.lp_rc, sender_client=self.sp_rc,
listener_ip=self.lp.inet_fix_ipv4[0], port=UDP_PORT_750,
file_content=file_content, expected_data=expected_data)
@tags('isolatednet')
def test_isolatednet_udp_port_749_w_udp_egress(self):
"""
@summary: Testing UDP from sender with security egress rules on
port 749 that is NOT part of the egress rule
"""
file_content = 'Security Groups UDP 749 testing from other sender'
expected_data = ''
# Port 749 NOT within rule, data should not be transmitted
self.verify_udp_connectivity(
listener_client=self.lp_rc, sender_client=self.sp_rc,
listener_ip=self.lp.inet_fix_ipv4[0], port=UDP_PORT_749,
file_content=file_content, expected_data=expected_data)
@tags('servicenet')
def test_servicenet_udp_port_750(self):
"""
@summary: Testing UDP from other sender without security rules
over servicenet on port 750
"""
file_content = 'Security Groups UDP 750 testing from other sender'
expected_data = 'XXXXX{0}'.format(file_content)
# UDP rule NOT applied to sender so the port is not limited here
self.verify_udp_connectivity(
listener_client=self.lp_rc, sender_client=self.op_rc,
listener_ip=self.lp.snet_fix_ipv4[0], port=UDP_PORT_750,
file_content=file_content, expected_data=expected_data)
@tags('servicenet')
def test_servicenet_udp_port_749(self):
"""
@summary: Testing UDP from other sender without security rules
over servicenet on port 749
"""
file_content = 'Security Groups UDP 749 testing from other sender'
expected_data = 'XXXXX{0}'.format(file_content)
# Other sender server has no rules applied, both ports should work
self.verify_udp_connectivity(
listener_client=self.lp_rc, sender_client=self.op_rc,
listener_ip=self.lp.snet_fix_ipv4[0], port=UDP_PORT_749,
file_content=file_content, expected_data=expected_data)
@tags('servicenet')
def test_servicenet_udp_port_750_w_udp_egress(self):
"""
@summary: Testing UDP from sender with security egress rules on
port 750 that is part of the egress rule
"""
file_content = 'Security Groups UDP 750 testing from sender'
expected_data = 'XXXXX{0}'.format(file_content)
self.verify_udp_connectivity(
listener_client=self.lp_rc, sender_client=self.sp_rc,
listener_ip=self.lp.snet_fix_ipv4[0], port=UDP_PORT_750,
file_content=file_content, expected_data=expected_data)
@tags('servicenet')
def test_servicenet_udp_port_749_w_udp_egress(self):
"""
@summary: Testing UDP from sender with security egress rules on
port 749 that is NOT part of the egress rule
"""
file_content = 'Security Groups UDP 749 testing from other sender'
expected_data = ''
# Port 749 NOT within rule, data should not be transmitted
self.verify_udp_connectivity(
listener_client=self.lp_rc, sender_client=self.sp_rc,
listener_ip=self.lp.snet_fix_ipv4[0], port=UDP_PORT_749,
file_content=file_content, expected_data=expected_data)
@tags('publicnet')
def test_publicnet_udp_port_750(self):
"""
@summary: Testing UDP from other sender without security rules
over publicnet on port 750
"""
file_content = 'Security Groups UDP 750 testing from other sender'
expected_data = 'XXXXX{0}'.format(file_content)
# UDP rule NOT applied to sender so the port is not limited here
self.verify_udp_connectivity(
listener_client=self.lp_rc, sender_client=self.op_rc,
listener_ip=self.lp.pnet_fix_ipv4[0], port=UDP_PORT_750,
file_content=file_content, expected_data=expected_data)
@tags('publicnet')
def test_publicnet_udp_port_749(self):
"""
@summary: Testing UDP from other sender without security rules
over publicnet on port 749
"""
file_content = 'Security Groups UDP 749 testing from other sender'
expected_data = 'XXXXX{0}'.format(file_content)
# Other sender server has no rules applied, both ports should work
self.verify_udp_connectivity(
listener_client=self.lp_rc, sender_client=self.op_rc,
listener_ip=self.lp.pnet_fix_ipv4[0], port=UDP_PORT_749,
file_content=file_content, expected_data=expected_data)
@tags('publicnet')
def test_publicnet_udp_port_750_w_udp_egress(self):
"""
@summary: Testing UDP from sender with security egress rules on
port 750 that is part of the egress rule
"""
file_content = 'Security Groups UDP 750 testing from sender'
expected_data = 'XXXXX{0}'.format(file_content)
self.verify_udp_connectivity(
listener_client=self.lp_rc, sender_client=self.sp_rc,
listener_ip=self.lp.pnet_fix_ipv4[0], port=UDP_PORT_750,
file_content=file_content, expected_data=expected_data)
@tags('publicnet')
def test_publicnet_udp_port_749_w_udp_egress(self):
"""
@summary: Testing UDP from sender with security egress rules on
port 749 that is NOT part of the egress rule
"""
file_content = 'Security Groups UDP 749 testing from other sender'
expected_data = ''
# Port 749 NOT within rule, data should not be transmitted
self.verify_udp_connectivity(
listener_client=self.lp_rc, sender_client=self.sp_rc,
listener_ip=self.lp.pnet_fix_ipv4[0], port=UDP_PORT_749,
file_content=file_content, expected_data=expected_data)

View File

@ -0,0 +1,444 @@
"""
Copyright 2015 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import time
from cafe.drivers.unittest.decorators import tags
from cloudroast.networking.networks.fixtures import NetworkingComputeFixture
# For TCP testing
TCP_PORT1 = '993'
TCP_PORT2 = '994'
TCP_PORT_RANGE = '992-995'
# UDP ports for sending a file: port 750 within UDP egress rule, 749 not
UDP_PORT_750 = '750'
UDP_PORT_749 = '749'
# Operation now in progress if a reply from a port outside the rule
TCP_RULE_EXPECTED_DATA = ['992 (tcp) timed out: Operation now in progress',
'993 port [tcp/*] succeeded!',
'994 port [tcp/*] succeeded!',
'995 (tcp) failed: Connection refused']
TCP_EXPECTED_DATA = ['992 (tcp) failed: Connection refused',
'993 port [tcp/*] succeeded!',
'994 port [tcp/*] succeeded!',
'995 (tcp) failed: Connection refused']
class SecurityGroupsEgressIPv6Test(NetworkingComputeFixture):
@classmethod
def setUpClass(cls):
super(SecurityGroupsEgressIPv6Test, cls).setUpClass()
base_name = 'sg_egress_v6_{0}'
keypair_name = base_name.format('keypair')
network_name = base_name.format('net')
cls.network = cls.create_server_network(name=network_name, ipv6=True)
cls.keypair = cls.create_keypair(name=keypair_name)
server_labels = ['listener', 'sender', 'icmp_sender', 'other_sender']
server_names = [base_name.format(label) for label in server_labels]
# Creating servers on the same isolated network and
# getting a dict with the server name as key and server obj as value
servers = cls.create_multiple_servers(server_names=server_names,
keypair_name=cls.keypair.name,
networks=[cls.network.id])
# Setting the servers as class attributes identified by server label
for label, name in zip(server_labels, server_names):
setattr(cls, label, servers[name])
# Creating the security group and rules for IPv6 TCP testing
cls.fixture_log.debug('Creating the security groups and rules')
sg_tcp_ipv6_req = cls.sec.behaviors.create_security_group(
name='sg_tcp_ipv6_egress',
description='SG for testing IPv6 TCP egress rules')
cls.sec_group_tcp_ipv6 = sg_tcp_ipv6_req.response.entity
cls.delete_secgroups.append(cls.sec_group_tcp_ipv6.id)
egress_tcp_ipv6_rule_req = (
cls.sec.behaviors.create_security_group_rule(
security_group_id=cls.sec_group_tcp_ipv6.id,
direction='egress', ethertype='IPv6', protocol='tcp',
port_range_min=993, port_range_max=995))
egress_tcp_rule = egress_tcp_ipv6_rule_req.response.entity
cls.delete_secgroups_rules.append(egress_tcp_rule.id)
# Creating the security group rule for IPv6 UDP testing
egress_udp_ipv6_rule_req = (
cls.sec.behaviors.create_security_group_rule(
security_group_id=cls.sec_group_tcp_ipv6.id,
direction='egress', ethertype='IPv6', protocol='udp',
port_range_min=750, port_range_max=752))
egress_udp_rule = egress_udp_ipv6_rule_req.response.entity
cls.delete_secgroups_rules.append(egress_udp_rule.id)
cls.create_ping_ssh_ingress_rules(
sec_group_id=cls.sec_group_tcp_ipv6.id)
# Creating the security group and rules for IPv6 ICMP testing
sg_icmp_ipv6_req = cls.sec.behaviors.create_security_group(
name='sg_icmp_ipv6_egress',
description='SG for testing IPv6 ICMP egress rules')
cls.sec_group_icmp_ipv6 = sg_icmp_ipv6_req.response.entity
cls.delete_secgroups.append(cls.sec_group_icmp_ipv6.id)
egress_icmp_ipv6_rule_req = (
cls.sec.behaviors.create_security_group_rule(
security_group_id=cls.sec_group_icmp_ipv6.id,
direction='egress', ethertype='IPv6', protocol='icmp'))
egress_icmp_ipv6_rule = egress_icmp_ipv6_rule_req.response.entity
cls.delete_secgroups_rules.append(egress_icmp_ipv6_rule.id)
# ICMP ingress rules are also required to see the reply
egress_icmp_ipv6_rule_req = (
cls.sec.behaviors.create_security_group_rule(
security_group_id=cls.sec_group_icmp_ipv6.id,
direction='ingress', ethertype='IPv6', protocol='icmp'))
egress_icmp_ipv6_rule = egress_icmp_ipv6_rule_req.response.entity
cls.delete_secgroups_rules.append(egress_icmp_ipv6_rule.id)
cls.create_ping_ssh_ingress_rules(
sec_group_id=cls.sec_group_icmp_ipv6.id)
cls.security_group_ids = [cls.sec_group_tcp_ipv6.id,
cls.sec_group_icmp_ipv6.id]
cls.sec_group_tcp_ipv6 = cls.sec.behaviors.get_security_group(
cls.security_group_ids[0]).response.entity
cls.sec_group_icmp_ipv6 = cls.sec.behaviors.get_security_group(
cls.security_group_ids[1]).response.entity
# Defining the server personas
cls.fixture_log.debug('Defining the server personas for quick port '
'and IP address access')
# Persona labels as keys and the server to create the persona as value
persona_servers = {'lp': cls.listener, 'op': cls.other_sender,
'sp': cls.sender, 'spi': cls.icmp_sender}
persona_kwargs = dict(inet=True, network=cls.network,
inet_port_count=1, inet_fix_ipv6_count=1)
# Getting a dict with persona label as key and persona object as value
personas = cls.create_multiple_personas(
persona_servers=persona_servers, persona_kwargs=persona_kwargs)
# Setting the personas as class attributes identified by persona label
for persona_label, persona in personas.items():
setattr(cls, persona_label, persona)
# Creating personas as class attributes, for ex. cls.lp, etc.
cls.fixture_log.debug('Defining the server personas for quick port '
'and IP address access')
persona_servers = {'lp': cls.listener, 'op': cls.other_sender,
'sp': cls.sender, 'spi': cls.icmp_sender}
persona_kwargs = dict(inet=True, network=cls.network,
inet_port_count=1, inet_fix_ipv4_count=1)
cls.create_multiple_personas(persona_servers=persona_servers,
persona_kwargs=persona_kwargs)
# Updating server ports with security groups
ports_to_update = [{'port_ids': [cls.sp.pnet_port_ids[0],
cls.sp.snet_port_ids[0],
cls.sp.inet_port_ids[0]],
'security_groups': [cls.security_group_ids[0]]},
{'port_ids': [cls.spi.pnet_port_ids[0],
cls.spi.snet_port_ids[0],
cls.spi.inet_port_ids[0]],
'security_groups': [cls.security_group_ids[1]]}]
for ports in ports_to_update:
cls.update_server_ports_w_sec_groups(
port_ids=ports['port_ids'],
security_groups=ports['security_groups'])
# Wait time for security groups to be enabled on server ports
delay_msg = 'data plane delay {0}'.format(
cls.sec.config.data_plane_delay)
cls.fixture_log.debug(delay_msg)
time.sleep(cls.sec.config.data_plane_delay)
def setUp(self):
""" Creating the remote clients """
self.fixture_log.debug('Creating the Remote Clients')
self.lp_rc = self.servers.behaviors.get_remote_instance_client(
server=self.listener, ip_address=self.lp.pnet_fix_ipv4[0],
username=self.ssh_username, key=self.keypair.private_key,
auth_strategy=self.auth_strategy)
self.op_rc = self.servers.behaviors.get_remote_instance_client(
server=self.other_sender, ip_address=self.op.pnet_fix_ipv4[0],
username=self.ssh_username, key=self.keypair.private_key,
auth_strategy=self.auth_strategy)
self.fixture_log.debug('Sender Remote Clients require ingress and '
'egress rules working for ICMP and ingress '
'rules for TCP')
self.sp_rc = self.servers.behaviors.get_remote_instance_client(
server=self.sender, ip_address=self.sp.pnet_fix_ipv4[0],
username=self.ssh_username, key=self.keypair.private_key,
auth_strategy=self.auth_strategy)
self.spi_rc = self.servers.behaviors.get_remote_instance_client(
server=self.icmp_sender, ip_address=self.spi.pnet_fix_ipv4[0],
username=self.ssh_username, key=self.keypair.private_key,
auth_strategy=self.auth_strategy)
@tags('publicnet', 'isolatednet')
def test_remote_client_connectivity_v6(self):
"""
@summary: Testing the remote clients
"""
servers = [self.listener, self.other_sender, self.sender,
self.icmp_sender]
remote_clients = [self.lp_rc, self.op_rc, self.sp_rc, self.spi_rc]
# Empty string for servers without security group
sec_groups = ['', '', self.sec_group_tcp_ipv6,
self.sec_group_icmp_ipv6]
result = self.verify_remote_clients_auth(
servers=servers, remote_clients=remote_clients,
sec_groups=sec_groups)
self.assertFalse(result)
@tags('publicnet')
def test_publicnet_ping_v6(self):
"""
@summary: Testing ping from other sender without security rules
"""
ip_address = self.lp.pnet_fix_ipv6[0]
self.verify_ping(remote_client=self.op_rc, ip_address=ip_address,
ip_version=6)
@tags('isolatednet')
def test_isolatednet_ping_v6(self):
"""
@summary: Testing ping from other sender without security rules
"""
ip_address = self.lp.inet_fix_ipv6[0]
self.verify_ping(remote_client=self.op_rc, ip_address=ip_address,
ip_version=6)
@tags('publicnet')
def test_publicnet_ping_w_icmp_egress_v6(self):
"""
@summary: Testing ICMP egress rule on publicnet
"""
ip_address = self.lp.pnet_fix_ipv6[0]
self.verify_ping(remote_client=self.spi_rc, ip_address=ip_address,
ip_version=6)
@tags('isolatednet')
def test_isolatednet_ping_w_icmp_egress_v6(self):
"""
@summary: Testing ICMP egress rule on isolatednet
"""
ip_address = self.lp.inet_fix_ipv6[0]
self.verify_ping(remote_client=self.spi_rc, ip_address=ip_address,
ip_version=6)
@tags('publicnet')
def test_publicnet_ports_w_tcp_v6(self):
"""
@summary: Testing TCP ports on publicnet
"""
self.verify_tcp_connectivity(listener_client=self.lp_rc,
sender_client=self.op_rc,
listener_ip=self.lp.pnet_fix_ipv6[0],
port1=TCP_PORT1, port2=TCP_PORT2,
port_range=TCP_PORT_RANGE,
expected_data=TCP_EXPECTED_DATA,
ip_version=6)
@tags('isolatednet')
def test_isolatednet_ports_w_tcp_v6(self):
"""
@summary: Testing TCP ports on isolatednet
"""
self.verify_tcp_connectivity(listener_client=self.lp_rc,
sender_client=self.op_rc,
listener_ip=self.lp.inet_fix_ipv6[0],
port1=TCP_PORT1, port2=TCP_PORT2,
port_range=TCP_PORT_RANGE,
expected_data=TCP_EXPECTED_DATA,
ip_version=6)
@tags('publicnet')
def test_publicnet_ports_w_tcp_egress_v6(self):
"""
@summary: Testing TCP egress rule on publicnet
"""
self.verify_tcp_connectivity(listener_client=self.lp_rc,
sender_client=self.sp_rc,
listener_ip=self.lp.pnet_fix_ipv6[0],
port1=TCP_PORT1, port2=TCP_PORT2,
port_range=TCP_PORT_RANGE,
expected_data=TCP_RULE_EXPECTED_DATA,
ip_version=6)
@tags('isolatednet')
def test_isolatednet_ports_w_tcp_egress_v6(self):
"""
@summary: Testing TCP egress rule on isolatednet
"""
self.verify_tcp_connectivity(listener_client=self.lp_rc,
sender_client=self.sp_rc,
listener_ip=self.lp.inet_fix_ipv6[0],
port1=TCP_PORT1, port2=TCP_PORT2,
port_range=TCP_PORT_RANGE,
expected_data=TCP_RULE_EXPECTED_DATA,
ip_version=6)
@tags('isolatednet')
def test_isolatednet_udp_port_750_v6(self):
"""
@summary: Testing UDP from other sender without security rules
over isolatednet on port 750
"""
file_content = 'Security Groups UDP 750 testing from other sender'
expected_data = 'XXXXX{0}'.format(file_content)
# UDP rule NOT applied to sender so the port is not limited here
self.verify_udp_connectivity(
listener_client=self.lp_rc, sender_client=self.op_rc,
listener_ip=self.lp.inet_fix_ipv6[0], port=UDP_PORT_750,
file_content=file_content, expected_data=expected_data,
ip_version=6)
@tags('isolatednet')
def test_isolatednet_udp_port_749_v6(self):
"""
@summary: Testing UDP from other sender without security rules
over isolatednet on port 749
"""
file_content = 'Security Groups UDP 749 testing from other sender'
expected_data = 'XXXXX{0}'.format(file_content)
# Other sender server has no rules applied, both ports should work
self.verify_udp_connectivity(
listener_client=self.lp_rc, sender_client=self.op_rc,
listener_ip=self.lp.inet_fix_ipv6[0], port=UDP_PORT_749,
file_content=file_content, expected_data=expected_data,
ip_version=6)
@tags('isolatednet')
def test_isolatednet_udp_port_750_w_udp_egress_v6(self):
"""
@summary: Testing UDP from sender with security egress rules on
port 750 that is part of the egress rule
"""
file_content = 'Security Groups UDP 750 testing from sender'
expected_data = 'XXXXX{0}'.format(file_content)
self.verify_udp_connectivity(
listener_client=self.lp_rc, sender_client=self.sp_rc,
listener_ip=self.lp.inet_fix_ipv6[0], port=UDP_PORT_750,
file_content=file_content, expected_data=expected_data,
ip_version=6)
@tags('isolatednet')
def test_isolatednet_udp_port_749_w_udp_egress_v6(self):
"""
@summary: Testing UDP from sender with security egress rules on
port 749 that is NOT part of the egress rule
"""
file_content = 'Security Groups UDP 749 testing from other sender'
expected_data = ''
# Port 749 NOT within rule, data should not be transmitted
self.verify_udp_connectivity(
listener_client=self.lp_rc, sender_client=self.sp_rc,
listener_ip=self.lp.inet_fix_ipv6[0], port=UDP_PORT_749,
file_content=file_content, expected_data=expected_data,
ip_version=6)
@tags('publicnet')
def test_publicnet_udp_port_750_v6(self):
"""
@summary: Testing UDP from other sender without security rules
over publicnet on port 750
"""
file_content = 'Security Groups UDP 750 testing from other sender'
expected_data = 'XXXXX{0}'.format(file_content)
# UDP rule NOT applied to sender so the port is not limited here
self.verify_udp_connectivity(
listener_client=self.lp_rc, sender_client=self.op_rc,
listener_ip=self.lp.pnet_fix_ipv6[0], port=UDP_PORT_750,
file_content=file_content, expected_data=expected_data,
ip_version=6)
@tags('publicnet')
def test_publicnet_udp_port_749_v6(self):
"""
@summary: Testing UDP from other sender without security rules
over publicnet on port 749
"""
file_content = 'Security Groups UDP 749 testing from other sender'
expected_data = 'XXXXX{0}'.format(file_content)
# Other sender server has no rules applied, both ports should work
self.verify_udp_connectivity(
listener_client=self.lp_rc, sender_client=self.op_rc,
listener_ip=self.lp.pnet_fix_ipv6[0], port=UDP_PORT_749,
file_content=file_content, expected_data=expected_data,
ip_version=6)
@tags('publicnet')
def test_publicnet_udp_port_750_w_udp_egress_v6(self):
"""
@summary: Testing UDP from sender with security egress rules on
port 750 that is part of the egress rule
"""
file_content = 'Security Groups UDP 750 testing from sender'
expected_data = 'XXXXX{0}'.format(file_content)
self.verify_udp_connectivity(
listener_client=self.lp_rc, sender_client=self.sp_rc,
listener_ip=self.lp.pnet_fix_ipv6[0], port=UDP_PORT_750,
file_content=file_content, expected_data=expected_data,
ip_version=6)
@tags('publicnet')
def test_publicnet_udp_port_749_w_udp_egress_v6(self):
"""
@summary: Testing UDP from sender with security egress rules on
port 749 that is NOT part of the egress rule
"""
file_content = 'Security Groups UDP 749 testing from other sender'
expected_data = ''
# Port 749 NOT within rule, data should not be transmitted
self.verify_udp_connectivity(
listener_client=self.lp_rc, sender_client=self.sp_rc,
listener_ip=self.lp.pnet_fix_ipv6[0], port=UDP_PORT_749,
file_content=file_content, expected_data=expected_data,
ip_version=6)