Add more tests

Change-Id: I4427eb78b9c6b73a0a492cb246e9ecd532dc58d6
This commit is contained in:
Andrey Pavlov 2015-02-09 15:53:38 +03:00
parent c116351a17
commit b51104258e
14 changed files with 2246 additions and 12 deletions

View File

@ -1,4 +1,7 @@
[DEFAULT]
test_command=OS_STDOUT_CAPTURE=1 OS_STDERR_CAPTURE=1 OS_LOG_CAPTURE=1 ${PYTHON:-python} -m subunit.run discover -t ./ ec2api/tests/unit $LISTOPT $IDOPTION
test_command=OS_STDOUT_CAPTURE=1 \
OS_STDERR_CAPTURE=1 \
OS_LOG_CAPTURE=1 \
${PYTHON:-python} -m subunit.run discover -t ./ ec2api/tests/unit $LISTOPT $IDOPTION
test_id_option=--load-list $IDFILE
test_list_option=--list

View File

@ -248,7 +248,7 @@ def assign_private_ip_addresses(context, network_interface_id,
fixed_ips = os_port['fixed_ips'] or []
if private_ip_address is not None:
for ip_address in private_ip_address:
if ip_address not in subnet_ipnet:
if netaddr.IPAddress(ip_address) not in subnet_ipnet:
raise exception.InvalidParameterValue(
value=str(ip_address),
parameter='private_ip_address',

View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/bin/bash -x
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -14,24 +14,30 @@
# This script is executed inside post_test_hook function in devstack gate.
CONFIG_DIR="."
export TEST_CONFIG_DIR=$(readlink -f $CONFIG_DIR)
# Sleep some time until all services are started
sleep 5
export TEST_CONFIG_DIR=$(readlink -f .)
export TEST_CONFIG="functional_tests.conf"
if [[ ! -f $CONFIG_DIR/$TEST_CONFIG ]]; then
if [[ ! -f $TEST_CONFIG_DIR/$TEST_CONFIG ]]; then
IMAGE_ID=$(euca-describe-images | grep "ami-" | head -n 1 | awk '{print $2}')
IMAGE_ID=$(euca-describe-images | grep "cirros" | grep "ami-" | head -n 1 | awk '{print $2}')
cat > $CONFIG_DIR/$TEST_CONFIG <<EOF
sudo bash -c "cat > $TEST_CONFIG_DIR/$TEST_CONFIG <<EOF
[aws]
ec2_url = $EC2_URL
aws_access = $EC2_ACCESS_KEY
aws_secret = $EC2_SECRET_KEY
image_id = $IMAGE_ID
EOF
EOF"
fi
python -m testtools.run discover -v -t ./ ec2api/tests/functional
sudo pip install -r test-requirements.txt
# botocore not in openstack requirements now, so install it manually
sudo pip install botocore==0.85
sudo OS_STDOUT_CAPTURE=-1 OS_STDERR_CAPTURE=-1 OS_TEST_TIMEOUT=500 OS_TEST_LOCK_PATH=${TMPDIR:-'/tmp'} \
python -m subunit.run discover -t ./ ./ec2api/tests/functional | subunit-2to1 | tools/colorizer.py
RETVAL=$?
# Here can be some commands for log archiving, etc...

View File

@ -0,0 +1,182 @@
# Copyright 2014 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from tempest_lib.openstack.common import log
from ec2api.tests.functional import base
LOG = log.getLogger(__name__)
class DhcpOptionsTest(base.EC2TestCase):
VPC_CIDR = '10.12.0.0/24'
vpc_id = None
def test_create_delete_dhcp_options(self):
kwargs = {
'DhcpConfigurations': [
{'Key': 'domain-name',
'Values': ['my.com', 'it.com']},
{'Key': 'domain-name-servers',
'Values': ['8.8.8.8', '8.8.4.4']},
{'Key': 'ntp-servers',
'Values': ['1.2.3.4']},
{'Key': 'netbios-name-servers',
'Values': ['4.3.2.1']},
{'Key': 'netbios-node-type',
'Values': ['2']},
],
}
resp, data = self.client.CreateDhcpOptions(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
options = data['DhcpOptions']
id = options['DhcpOptionsId']
res_clean = self.addResourceCleanUp(self.client.DeleteDhcpOptions,
DhcpOptionsId=id)
self.assertEqual(5, len(options['DhcpConfigurations']))
for cfg in options['DhcpConfigurations']:
self.assertEqual(2, len(cfg))
if cfg['Key'] == 'domain-name':
self.assertEqual(2, len(cfg['Values']))
values = [i['Value'] for i in cfg['Values']]
self.assertIn('my.com', values)
self.assertIn('it.com', values)
elif cfg['Key'] == 'domain-name-servers':
self.assertEqual(2, len(cfg['Values']))
values = [i['Value'] for i in cfg['Values']]
self.assertIn('8.8.8.8', values)
self.assertIn('8.8.4.4', values)
elif cfg['Key'] == 'ntp-servers':
self.assertEqual(1, len(cfg['Values']))
self.assertEqual('1.2.3.4', cfg['Values'][0]['Value'])
elif cfg['Key'] == 'netbios-name-servers':
self.assertEqual(1, len(cfg['Values']))
self.assertEqual('4.3.2.1', cfg['Values'][0]['Value'])
elif cfg['Key'] == 'netbios-node-type':
self.assertEqual(1, len(cfg['Values']))
self.assertEqual('2', cfg['Values'][0]['Value'])
else:
self.fail('Unknown key name in result - %s' % cfg['Key'])
resp, data = self.client.DeleteDhcpOptions(DhcpOptionsId=id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
def test_invalid_create_delete(self):
kwargs = {
'DhcpConfigurations': [
],
}
resp, data = self.client.CreateDhcpOptions(*[], **kwargs)
self.assertEqual(400, resp.status_code)
self.assertEqual('MissingParameter', data['Error']['Code'])
kwargs = {
'DhcpConfigurations': [{'Key': 'aaa', 'Values': []}],
}
resp, data = self.client.CreateDhcpOptions(*[], **kwargs)
if resp.status_code == 200:
self.addResourceCleanUp(self.client.DeleteDhcpOptions,
DhcpOptionsId=data['DhcpOptions']['DhcpOptionsId'])
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidParameterValue', data['Error']['Code'])
kwargs = {
'DhcpConfigurations': [{'Key': 'domain-name', 'Values': []}],
}
resp, data = self.client.CreateDhcpOptions(*[], **kwargs)
if resp.status_code == 200:
self.addResourceCleanUp(self.client.DeleteDhcpOptions,
DhcpOptionsId=data['DhcpOptions']['DhcpOptionsId'])
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidParameterValue', data['Error']['Code'])
def test_describe_dhcp_options(self):
kwargs = {
'DhcpConfigurations': [
{'Key': 'domain-name',
'Values': ['my.com']},
],
}
resp, data = self.client.CreateDhcpOptions(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
options = data['DhcpOptions']
id = options['DhcpOptionsId']
res_clean = self.addResourceCleanUp(self.client.DeleteDhcpOptions,
DhcpOptionsId=id)
time.sleep(10)
kwargs = {
'DhcpOptionsIds': [id],
}
resp, data = self.client.DescribeDhcpOptions(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(1, len(data['DhcpOptions']))
options = data['DhcpOptions'][0]
self.assertEqual(id, options['DhcpOptionsId'])
self.assertEqual(1, len(options['DhcpConfigurations']))
cfg = options['DhcpConfigurations'][0]
self.assertEqual(2, len(cfg))
self.assertEqual('domain-name', cfg['Key'])
self.assertEqual(1, len(cfg['Values']))
self.assertIn('my.com', cfg['Values'][0]['Value'])
resp, data = self.client.DeleteDhcpOptions(DhcpOptionsId=id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
def test_associate_dhcp_options(self):
kwargs = {
'DhcpConfigurations': [
{'Key': 'domain-name',
'Values': ['my.com']},
],
}
resp, data = self.client.CreateDhcpOptions(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
options = data['DhcpOptions']
id = options['DhcpOptionsId']
res_clean = self.addResourceCleanUp(self.client.DeleteDhcpOptions,
DhcpOptionsId=id)
cidr = '10.0.0.0/24'
resp, data = self.client.CreateVpc(CidrBlock=cidr)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
vpc_id = data['Vpc']['VpcId']
dv_clean = self.addResourceCleanUp(self.client.DeleteVpc, VpcId=vpc_id)
kwargs = {
'DhcpOptionsId': id,
'VpcId': vpc_id,
}
resp, data = self.client.AssociateDhcpOptions(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
resp, data = self.client.DeleteDhcpOptions(DhcpOptionsId=id)
self.assertEqual(400, resp.status_code)
self.assertEqual('DependencyViolation', data['Error']['Code'])
resp, data = self.client.DeleteVpc(VpcId=vpc_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(dv_clean)
self.get_vpc_waiter().wait_delete(vpc_id)
resp, data = self.client.DeleteDhcpOptions(DhcpOptionsId=id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)

View File

@ -0,0 +1,257 @@
# Copyright 2014 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from tempest_lib.openstack.common import log
import testtools
from ec2api.tests.functional import base
from ec2api.tests.functional import config
CONF = config.CONF
LOG = log.getLogger(__name__)
class InternetGatewayTest(base.EC2TestCase):
VPC_CIDR = '10.4.0.0/20'
VPC_CIDR_ALT = '10.5.0.0/20'
vpc_id = None
vpc_id_alt = None
@classmethod
@base.safe_setup
def setUpClass(cls):
super(InternetGatewayTest, cls).setUpClass()
resp, data = cls.client.CreateVpc(CidrBlock=cls.VPC_CIDR)
if resp.status_code != 200:
LOG.error(base.EC2ErrorConverter(data))
assert 200 == resp.status_code
cls.vpc_id = data['Vpc']['VpcId']
cls.get_vpc_waiter().wait_available(cls.vpc_id)
cls.addResourceCleanUpStatic(cls.client.DeleteVpc, VpcId=cls.vpc_id)
resp, data = cls.client.CreateVpc(CidrBlock=cls.VPC_CIDR_ALT)
if resp.status_code != 200:
LOG.error(base.EC2ErrorConverter(data))
assert 200 == resp.status_code
cls.vpc_id_alt = data['Vpc']['VpcId']
cls.get_vpc_waiter().wait_available(cls.vpc_id_alt)
cls.addResourceCleanUpStatic(cls.client.DeleteVpc,
VpcId=cls.vpc_id_alt)
def test_create_attach_internet_gateway(self):
resp, data = self.client.CreateInternetGateway()
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
gw_id = data['InternetGateway']['InternetGatewayId']
res_clean = self.addResourceCleanUp(self.client.DeleteInternetGateway,
InternetGatewayId=gw_id)
self.assertEmpty(data['InternetGateway'].get('Attachments', []))
resp, data = self.client.AttachInternetGateway(VpcId=self.vpc_id,
InternetGatewayId=gw_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
resp, data = self.client.DetachInternetGateway(VpcId=self.vpc_id,
InternetGatewayId=gw_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
resp, data = self.client.DeleteInternetGateway(InternetGatewayId=gw_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
resp, data = self.client.DescribeInternetGateways(
InternetGatewayIds=[gw_id])
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidInternetGatewayID.NotFound',
data['Error']['Code'])
def test_delete_attached_internet_gateway(self):
resp, data = self.client.CreateInternetGateway()
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
gw_id = data['InternetGateway']['InternetGatewayId']
res_clean = self.addResourceCleanUp(self.client.DeleteInternetGateway,
InternetGatewayId=gw_id)
self.assertEmpty(data['InternetGateway'].get('Attachments', []))
resp, data = self.client.AttachInternetGateway(VpcId=self.vpc_id,
InternetGatewayId=gw_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
resp, data = self.client.DeleteInternetGateway(InternetGatewayId=gw_id)
self.assertEqual(400, resp.status_code)
self.assertEqual('DependencyViolation', data['Error']['Code'])
resp, data = self.client.DetachInternetGateway(VpcId=self.vpc_id,
InternetGatewayId=gw_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
resp, data = self.client.DeleteInternetGateway(InternetGatewayId=gw_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
@testtools.skipUnless(CONF.aws.run_incompatible_tests,
"Another error code returned - InvalidParameterValue")
def test_attach_detach_invalid_internet_gateway(self):
gw_id = "gw-1"
resp, data = self.client.AttachInternetGateway(VpcId=self.vpc_id,
InternetGatewayId=gw_id)
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidInternetGatewayID.NotFound',
data['Error']['Code'])
resp, data = self.client.DetachInternetGateway(VpcId=self.vpc_id,
InternetGatewayId=gw_id)
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidInternetGatewayID.NotFound',
data['Error']['Code'])
def test_double_attach_internet_gateway(self):
resp, data = self.client.CreateInternetGateway()
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
gw_id = data['InternetGateway']['InternetGatewayId']
res_clean = self.addResourceCleanUp(self.client.DeleteInternetGateway,
InternetGatewayId=gw_id)
self.assertEmpty(data['InternetGateway'].get('Attachments', []))
resp, data = self.client.AttachInternetGateway(VpcId=self.vpc_id,
InternetGatewayId=gw_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
resp, data = self.client.AttachInternetGateway(VpcId=self.vpc_id,
InternetGatewayId=gw_id)
self.assertEqual(400, resp.status_code)
self.assertEqual('Resource.AlreadyAssociated', data['Error']['Code'])
resp, data = self.client.DetachInternetGateway(VpcId=self.vpc_id,
InternetGatewayId=gw_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
resp, data = self.client.DeleteInternetGateway(InternetGatewayId=gw_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
def test_attach_one_internet_gateway_to_two_vpcs(self):
resp, data = self.client.CreateInternetGateway()
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
gw_id = data['InternetGateway']['InternetGatewayId']
res_clean = self.addResourceCleanUp(self.client.DeleteInternetGateway,
InternetGatewayId=gw_id)
self.assertEmpty(data['InternetGateway'].get('Attachments', []))
resp, data = self.client.AttachInternetGateway(VpcId=self.vpc_id,
InternetGatewayId=gw_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
resp, data = self.client.AttachInternetGateway(VpcId=self.vpc_id_alt,
InternetGatewayId=gw_id)
self.assertEqual(400, resp.status_code)
self.assertEqual('Resource.AlreadyAssociated', data['Error']['Code'])
resp, data = self.client.DetachInternetGateway(VpcId=self.vpc_id,
InternetGatewayId=gw_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
resp, data = self.client.DeleteInternetGateway(InternetGatewayId=gw_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
def test_describe_internet_gateways_base(self):
resp, data = self.client.CreateInternetGateway()
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
gw_id = data['InternetGateway']['InternetGatewayId']
res_clean = self.addResourceCleanUp(self.client.DeleteInternetGateway,
InternetGatewayId=gw_id)
self.assertEmpty(data['InternetGateway'].get('Attachments', []))
resp, data = self.client.AttachInternetGateway(VpcId=self.vpc_id,
InternetGatewayId=gw_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.addResourceCleanUp(self.client.DetachInternetGateway,
VpcId=self.vpc_id,
InternetGatewayId=gw_id)
time.sleep(2)
# NOTE(andrey-mp): by real id
resp, data = self.client.DescribeInternetGateways(
InternetGatewayIds=[gw_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(1, len(data['InternetGateways']))
# NOTE(andrey-mp): by fake id
resp, data = self.client.DescribeInternetGateways(
InternetGatewayIds=['igw-0'])
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidInternetGatewayID.NotFound',
data['Error']['Code'])
resp, data = self.client.DetachInternetGateway(VpcId=self.vpc_id,
InternetGatewayId=gw_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
resp, data = self.client.DeleteInternetGateway(InternetGatewayId=gw_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
def test_describe_internet_gateways_filters(self):
# NOTE(andrey-mp): by filter real vpc-id before creation
resp, data = self.client.DescribeInternetGateways(
Filters=[{'Name': 'attachment.vpc-id', 'Values': [self.vpc_id]}])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(0, len(data['InternetGateways']))
resp, data = self.client.CreateInternetGateway()
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
gw_id = data['InternetGateway']['InternetGatewayId']
res_clean = self.addResourceCleanUp(self.client.DeleteInternetGateway,
InternetGatewayId=gw_id)
self.assertEmpty(data['InternetGateway'].get('Attachments', []))
resp, data = self.client.AttachInternetGateway(VpcId=self.vpc_id,
InternetGatewayId=gw_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.addResourceCleanUp(self.client.DetachInternetGateway,
VpcId=self.vpc_id,
InternetGatewayId=gw_id)
time.sleep(2)
# NOTE(andrey-mp): by filter real vpc-id
resp, data = self.client.DescribeInternetGateways(
Filters=[{'Name': 'attachment.vpc-id', 'Values': [self.vpc_id]}])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(1, len(data['InternetGateways']))
self.assertEqual(gw_id,
data['InternetGateways'][0]['InternetGatewayId'])
# NOTE(andrey-mp): by filter fake vpc-id
resp, data = self.client.DescribeInternetGateways(
Filters=[{'Name': 'attachment.vpc-id', 'Values': ['vpc-0']}])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(0, len(data['InternetGateways']))
# NOTE(andrey-mp): by fake filter
resp, data = self.client.DescribeInternetGateways(
Filters=[{'Name': 'fake', 'Values': ['fake']}])
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidParameterValue', data['Error']['Code'])
resp, data = self.client.DetachInternetGateway(VpcId=self.vpc_id,
InternetGatewayId=gw_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
resp, data = self.client.DeleteInternetGateway(InternetGatewayId=gw_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)

View File

@ -0,0 +1,459 @@
# Copyright 2014 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from tempest_lib.openstack.common import log
from ec2api.tests.functional import base
from ec2api.tests.functional import config
CONF = config.CONF
LOG = log.getLogger(__name__)
class NetworkInterfaceTest(base.EC2TestCase):
VPC_CIDR = '10.7.0.0/20'
vpc_id = None
SUBNET_CIDR = '10.7.0.0/28'
subnet_id = None
@classmethod
@base.safe_setup
def setUpClass(cls):
super(NetworkInterfaceTest, cls).setUpClass()
resp, data = cls.client.CreateVpc(CidrBlock=cls.VPC_CIDR)
if resp.status_code != 200:
LOG.error(base.EC2ErrorConverter(data))
assert 200 == resp.status_code
cls.vpc_id = data['Vpc']['VpcId']
cls.addResourceCleanUpStatic(cls.client.DeleteVpc, VpcId=cls.vpc_id)
cls.get_vpc_waiter().wait_available(cls.vpc_id)
aws_zone = CONF.aws.aws_zone
resp, data = cls.client.CreateSubnet(VpcId=cls.vpc_id,
CidrBlock=cls.SUBNET_CIDR,
AvailabilityZone=aws_zone)
if resp.status_code != 200:
LOG.error(base.EC2ErrorConverter(data))
assert 200 == resp.status_code
cls.subnet_id = data['Subnet']['SubnetId']
cls.addResourceCleanUpStatic(cls.client.DeleteSubnet,
SubnetId=cls.subnet_id)
cls.get_subnet_waiter().wait_available(cls.subnet_id)
def _wait_assignment(self, ni_id, resp, data):
# NOTE(andrey-mp): Amazon don't do it quickly and there is no way
# to wait this request
time.sleep(5)
def test_delete_subnet_with_network_interface(self):
resp, data = self.client.CreateSubnet(VpcId=self.vpc_id,
CidrBlock='10.7.1.0/28')
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
subnet_id = data['Subnet']['SubnetId']
res_clean_subnet = self.addResourceCleanUp(self.client.DeleteSubnet,
SubnetId=subnet_id)
self.get_subnet_waiter().wait_available(subnet_id)
kwargs = {
'SubnetId': subnet_id,
'Description': base.rand_name('ni')
}
resp, data = self.client.CreateNetworkInterface(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
ni_id = data['NetworkInterface']['NetworkInterfaceId']
res_clean_ni = self.addResourceCleanUp(
self.client.DeleteNetworkInterface, NetworkInterfaceId=ni_id)
self.get_network_interface_waiter().wait_available(ni_id)
resp, data = self.client.DeleteSubnet(SubnetId=subnet_id)
self.assertEqual(400, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual('DependencyViolation', data['Error']['Code'])
resp, data = self.client.DeleteNetworkInterface(
NetworkInterfaceId=ni_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean_ni)
self.get_network_interface_waiter().wait_delete(ni_id)
resp, data = self.client.DeleteSubnet(SubnetId=subnet_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean_subnet)
self.get_subnet_waiter().wait_delete(subnet_id)
def test_create_network_interface(self):
kwargs = {
'SubnetId': self.subnet_id,
'Description': base.rand_name('ni')
}
resp, data = self.client.CreateNetworkInterface(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
ni_id = data['NetworkInterface']['NetworkInterfaceId']
res_clean = self.addResourceCleanUp(self.client.DeleteNetworkInterface,
NetworkInterfaceId=ni_id)
ni = data['NetworkInterface']
self.assertEqual(self.vpc_id, ni['VpcId'])
self.assertEqual(self.subnet_id, ni['SubnetId'])
self.assertEqual(kwargs['Description'], ni['Description'])
self.assertNotEmpty(ni.get('Groups'))
self.assertEqual('default', ni['Groups'][0]['GroupName'])
address = ni.get('PrivateIpAddress')
self.assertIsNotNone(address)
addresses = ni.get('PrivateIpAddresses')
self.assertIsNotNone(addresses)
self.assertEqual(1, len(addresses))
self.assertTrue(addresses[0]['Primary'])
self.assertEqual(address, addresses[0]['PrivateIpAddress'])
self.assertIsNotNone(ni.get('MacAddress'))
self.assertIsNotNone(ni.get('OwnerId'))
self.assertIsNotNone(ni.get('RequesterManaged'))
self.assertIsNotNone(ni.get('SourceDestCheck'))
self.get_network_interface_waiter().wait_available(ni_id)
resp, data = self.client.DeleteNetworkInterface(
NetworkInterfaceId=ni_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
self.get_network_interface_waiter().wait_delete(ni_id)
resp, data = self.client.DescribeNetworkInterfaces(
NetworkInterfaceIds=[ni_id])
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidNetworkInterfaceID.NotFound',
data['Error']['Code'])
# TODO(andrey-mp): add creation with addresses
def test_create_max_network_interface(self):
# NOTE(andrey-mp): wait some time while all ports will be deleted
# for this subnet(that are deleting after previous test)
time.sleep(5)
resp, data = self.client.DescribeSubnets(SubnetIds=[self.subnet_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
count_before = data['Subnets'][0]['AvailableIpAddressCount']
kwargs = {
'SubnetId': self.subnet_id,
}
addresses = []
while True:
resp, data = self.client.CreateNetworkInterface(*[], **kwargs)
if resp.status_code != 200:
break
ni_id = data['NetworkInterface']['NetworkInterfaceId']
res_clean = self.addResourceCleanUp(
self.client.DeleteNetworkInterface,
NetworkInterfaceId=ni_id)
addresses.append((ni_id, res_clean))
self.assertEqual(400, resp.status_code)
self.assertEqual('NetworkInterfaceLimitExceeded',
data['Error']['Code'])
resp, data = self.client.DescribeSubnets(SubnetIds=[self.subnet_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
count_after = data['Subnets'][0]['AvailableIpAddressCount']
# NOTE(andrey-mp): This is strange but Amazon can't create last NI
# and Openstack can
self.assertIn(count_after, [0, 1])
self.assertEqual(len(addresses), count_before - count_after)
for addr in addresses:
kwargs = {
'NetworkInterfaceId': addr[0],
}
resp, data = self.client.DeleteNetworkInterface(*[], **kwargs)
self.assertEqual(200, resp.status_code,
base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(addr[1])
self.get_network_interface_waiter().wait_delete(addr[0])
def test_unassign_primary_addresses(self):
kwargs = {
'SubnetId': self.subnet_id,
}
resp, data = self.client.CreateNetworkInterface(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
ni_id = data['NetworkInterface']['NetworkInterfaceId']
res_clean = self.addResourceCleanUp(self.client.DeleteNetworkInterface,
NetworkInterfaceId=ni_id)
primary_address = data['NetworkInterface'].get('PrivateIpAddress')
self.get_network_interface_waiter().wait_available(ni_id)
resp, data = self.client.UnassignPrivateIpAddresses(
NetworkInterfaceId=ni_id,
PrivateIpAddresses=[primary_address])
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidParameterValue', data['Error']['Code'])
resp, data = self.client.DeleteNetworkInterface(
NetworkInterfaceId=ni_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
self.get_network_interface_waiter().wait_delete(ni_id)
def test_assign_unassign_private_addresses_by_count(self):
resp, data = self.client.DescribeSubnets(SubnetIds=[self.subnet_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
count = data['Subnets'][0]['AvailableIpAddressCount']
kwargs = {
'SubnetId': self.subnet_id,
}
resp, data = self.client.CreateNetworkInterface(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
ni_id = data['NetworkInterface']['NetworkInterfaceId']
res_clean = self.addResourceCleanUp(self.client.DeleteNetworkInterface,
NetworkInterfaceId=ni_id)
self.get_network_interface_waiter().wait_available(ni_id)
resp, data = self.client.AssignPrivateIpAddresses(
NetworkInterfaceId=ni_id,
SecondaryPrivateIpAddressCount=2)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self._wait_assignment(ni_id, resp, data)
resp, data = self.client.DescribeSubnets(SubnetIds=[self.subnet_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
count_after = data['Subnets'][0]['AvailableIpAddressCount']
self.assertEqual(count - 3, count_after)
resp, data = self.client.DescribeNetworkInterfaces(
NetworkInterfaceIds=[ni_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
addresses = []
for addr in data['NetworkInterfaces'][0]['PrivateIpAddresses']:
if not addr['Primary']:
addresses.append(addr['PrivateIpAddress'])
self.assertEqual(2, len(addresses))
resp, data = self.client.UnassignPrivateIpAddresses(
NetworkInterfaceId=ni_id,
PrivateIpAddresses=addresses)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self._wait_assignment(ni_id, resp, data)
resp, data = self.client.DescribeSubnets(SubnetIds=[self.subnet_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
count_after = data['Subnets'][0]['AvailableIpAddressCount']
self.assertEqual(count - 1, count_after)
resp, data = self.client.DeleteNetworkInterface(
NetworkInterfaceId=ni_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
self.get_network_interface_waiter().wait_delete(ni_id)
def test_assign_unassign_private_addresses_by_addresses(self):
resp, data = self.client.DescribeSubnets(SubnetIds=[self.subnet_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
count = data['Subnets'][0]['AvailableIpAddressCount']
kwargs = {
'SubnetId': self.subnet_id,
}
resp, data = self.client.CreateNetworkInterface(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
ni_id = data['NetworkInterface']['NetworkInterfaceId']
res_clean = self.addResourceCleanUp(self.client.DeleteNetworkInterface,
NetworkInterfaceId=ni_id)
self.get_network_interface_waiter().wait_available(ni_id)
addresses = ['10.7.0.10', '10.7.0.11']
resp, data = self.client.AssignPrivateIpAddresses(
NetworkInterfaceId=ni_id,
PrivateIpAddresses=addresses)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self._wait_assignment(ni_id, resp, data)
resp, data = self.client.DescribeSubnets(SubnetIds=[self.subnet_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
count_after = data['Subnets'][0]['AvailableIpAddressCount']
# NOTE(Alex): Amazon misses 1 IP address by some reason.
self.assertIn(count_after, [count - 3, count - 4])
resp, data = self.client.DescribeNetworkInterfaces(
NetworkInterfaceIds=[ni_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
assigned_addresses = []
for addr in data['NetworkInterfaces'][0]['PrivateIpAddresses']:
if not addr['Primary']:
self.assertIn(addr['PrivateIpAddress'], addresses)
assigned_addresses.append(addr['PrivateIpAddress'])
self.assertEqual(2, len(assigned_addresses))
resp, data = self.client.UnassignPrivateIpAddresses(
NetworkInterfaceId=ni_id,
PrivateIpAddresses=addresses)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self._wait_assignment(ni_id, resp, data)
resp, data = self.client.DescribeSubnets(SubnetIds=[self.subnet_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
count_after = data['Subnets'][0]['AvailableIpAddressCount']
self.assertIn(count_after, [count - 1, count - 2])
resp, data = self.client.DeleteNetworkInterface(
NetworkInterfaceId=ni_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
self.get_network_interface_waiter().wait_delete(ni_id)
def test_network_interface_attribute(self):
desc = base.rand_name('ni')
kwargs = {
'SubnetId': self.subnet_id,
'Description': desc
}
resp, data = self.client.CreateNetworkInterface(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
ni_id = data['NetworkInterface']['NetworkInterfaceId']
res_clean = self.addResourceCleanUp(self.client.DeleteNetworkInterface,
NetworkInterfaceId=ni_id)
self.get_network_interface_waiter().wait_available(ni_id)
resp, data = self.client.DescribeNetworkInterfaceAttribute(
NetworkInterfaceId=ni_id,
attribute='description')
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(desc, data['Description']['Value'])
new_desc = base.rand_name('new-ni')
kwargs = {
'NetworkInterfaceId': ni_id,
'Description': {'Value': new_desc}
}
resp, data = self.client.ModifyNetworkInterfaceAttribute(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
resp, data = self.client.DescribeNetworkInterfaceAttribute(
NetworkInterfaceId=ni_id,
attribute='description')
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(new_desc, data['Description']['Value'])
kwargs = {
'NetworkInterfaceId': ni_id,
'SourceDestCheck': {'Value': False}
}
resp, data = self.client.ModifyNetworkInterfaceAttribute(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
resp, data = self.client.DescribeNetworkInterfaceAttribute(
NetworkInterfaceId=ni_id,
attribute='sourceDestCheck')
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(False, data['SourceDestCheck']['Value'])
# NOTE(andrey-mp): ResetNetworkInterfaceAttribute has inadequate json
# scheme in botocore.
kwargs = {
'NetworkInterfaceId': ni_id,
'SourceDestCheck': {'Value': True}
}
resp, data = self.client.ModifyNetworkInterfaceAttribute(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
resp, data = self.client.DescribeNetworkInterfaceAttribute(
NetworkInterfaceId=ni_id,
attribute='sourceDestCheck')
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(True, data['SourceDestCheck']['Value'])
resp, data = self.client.DeleteNetworkInterface(
NetworkInterfaceId=ni_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
self.get_network_interface_waiter().wait_delete(ni_id)
def test_attach_network_interface(self):
instance_type = CONF.aws.instance_type
image_id = CONF.aws.image_id
if not image_id:
raise self.skipException('aws image_id does not provided')
kwargs = {
'SubnetId': self.subnet_id,
}
resp, data = self.client.CreateNetworkInterface(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
ni_id = data['NetworkInterface']['NetworkInterfaceId']
self.addResourceCleanUp(self.client.DeleteNetworkInterface,
NetworkInterfaceId=ni_id)
ni = data['NetworkInterface']
address = ni.get('PrivateIpAddress')
self.assertIsNotNone(address)
self.get_network_interface_waiter().wait_available(ni_id)
resp, data = self.client.RunInstances(
ImageId=image_id, InstanceType=instance_type, MinCount=1,
MaxCount=1, SubnetId=self.subnet_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
instance_id = data['Instances'][0]['InstanceId']
self.addResourceCleanUp(self.client.TerminateInstances,
InstanceIds=[instance_id])
self.get_instance_waiter().wait_available(instance_id,
final_set=('running'))
if CONF.aws.run_incompatible_tests:
# NOTE(andrey-mp): Amazon can't attach to device index = 0
kwargs = {
'DeviceIndex': 0,
'InstanceId': instance_id,
'NetworkInterfaceId': ni_id
}
resp, data = self.client.AttachNetworkInterface(*[], **kwargs)
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidParameterValue', data['Error']['Code'])
kwargs = {
'DeviceIndex': 2,
'InstanceId': instance_id,
'NetworkInterfaceId': ni_id
}
resp, data = self.client.AttachNetworkInterface(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
attachment_id = data['AttachmentId']
resp, data = self.client.DescribeInstances(InstanceIds=[instance_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
reservations = data.get('Reservations', [])
self.assertNotEmpty(reservations)
instances = reservations[0].get('Instances', [])
self.assertNotEmpty(instances)
instance = instances[0]
nis = instance.get('NetworkInterfaces', [])
self.assertEqual(2, len(nis))
ids = [nis[0]['Attachment']['AttachmentId'],
nis[1]['Attachment']['AttachmentId']]
self.assertIn(attachment_id, ids)
resp, data = self.client.DeleteNetworkInterface(
NetworkInterfaceId=ni_id)
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidParameterValue', data['Error']['Code'])
kwargs = {
'AttachmentId': attachment_id,
}
resp, data = self.client.DetachNetworkInterface(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))

View File

@ -0,0 +1,368 @@
# Copyright 2014 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest_lib.openstack.common import log
from ec2api.tests.functional import base
LOG = log.getLogger(__name__)
class RouteTest(base.EC2TestCase):
VPC_CIDR = '10.14.0.0/20'
SUBNET_CIDR = '10.14.0.0/24'
vpc_id = None
@classmethod
@base.safe_setup
def setUpClass(cls):
super(RouteTest, cls).setUpClass()
resp, data = cls.client.CreateVpc(CidrBlock=cls.VPC_CIDR)
if resp.status_code != 200:
LOG.error(base.EC2ErrorConverter(data))
assert 200 == resp.status_code
cls.vpc_id = data['Vpc']['VpcId']
cls.addResourceCleanUpStatic(cls.client.DeleteVpc, VpcId=cls.vpc_id)
cls.get_vpc_waiter().wait_available(cls.vpc_id)
def test_create_delete_route_table(self):
resp, data = self.client.CreateRouteTable(VpcId=self.vpc_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
rt_id = data['RouteTable']['RouteTableId']
res_clean = self.addResourceCleanUp(self.client.DeleteRouteTable,
RouteTableId=rt_id)
rt = data['RouteTable']
self.assertEqual(self.vpc_id, rt['VpcId'])
self.assertEqual(1, len(rt['Routes']))
route = rt['Routes'][0]
self.assertEqual(self.VPC_CIDR, route['DestinationCidrBlock'])
self.assertEqual('active', route['State'])
resp, data = self.client.DeleteRouteTable(RouteTableId=rt_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
resp, data = self.client.DescribeRouteTables(RouteTableIds=[rt_id])
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidRouteTableID.NotFound', data['Error']['Code'])
resp, data = self.client.DeleteRouteTable(RouteTableId=rt_id)
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidRouteTableID.NotFound', data['Error']['Code'])
def test_describe_route_tables_base(self):
resp, data = self.client.CreateRouteTable(VpcId=self.vpc_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
rt_id = data['RouteTable']['RouteTableId']
res_clean = self.addResourceCleanUp(self.client.DeleteRouteTable,
RouteTableId=rt_id)
# NOTE(andrey-mp): by real id
resp, data = self.client.DescribeRouteTables(RouteTableIds=[rt_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(1, len(data['RouteTables']))
# NOTE(andrey-mp): by fake id
resp, data = self.client.DescribeRouteTables(RouteTableIds=['rtb-0'])
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidRouteTableID.NotFound', data['Error']['Code'])
resp, data = self.client.DeleteRouteTable(RouteTableId=rt_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
def test_describe_route_tables_filters(self):
resp, data = self.client.CreateRouteTable(VpcId=self.vpc_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
rt_id = data['RouteTable']['RouteTableId']
self.addResourceCleanUp(self.client.DeleteRouteTable,
RouteTableId=rt_id)
resp, data = self.client.CreateSubnet(VpcId=self.vpc_id,
CidrBlock=self.SUBNET_CIDR)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
subnet_id = data['Subnet']['SubnetId']
self.addResourceCleanUp(self.client.DeleteSubnet,
SubnetId=subnet_id)
self.get_subnet_waiter().wait_available(subnet_id)
resp, data = self.client.AssociateRouteTable(RouteTableId=rt_id,
SubnetId=subnet_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
assoc_id = data['AssociationId']
self.addResourceCleanUp(self.client.DisassociateRouteTable,
AssociationId=assoc_id)
# NOTE(andrey-mp): by association_id
resp, data = self.client.DescribeRouteTables(
Filters=[{'Name': 'association.route-table-association-id',
'Values': [assoc_id]}])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(1, len(data['RouteTables']))
# NOTE(andrey-mp): by route table id
resp, data = self.client.DescribeRouteTables(
Filters=[{'Name': 'association.route-table-id',
'Values': [rt_id]}])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(1, len(data['RouteTables']))
# NOTE(andrey-mp): by subnet id
resp, data = self.client.DescribeRouteTables(
Filters=[{'Name': 'association.subnet-id',
'Values': [subnet_id]}])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(1, len(data['RouteTables']))
# NOTE(andrey-mp): by filter real vpc
resp, data = self.client.DescribeRouteTables(
Filters=[{'Name': 'vpc-id', 'Values': [self.vpc_id]}])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertLess(0, len(data['RouteTables']))
# NOTE(andrey-mp): by filter fake vpc
resp, data = self.client.DescribeRouteTables(
Filters=[{'Name': 'vpc-id', 'Values': ['vpc-0']}])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(0, len(data['RouteTables']))
# NOTE(andrey-mp): by fake filter
resp, data = self.client.DescribeRouteTables(
Filters=[{'Name': 'fake', 'Values': ['fake']}])
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidParameterValue', data['Error']['Code'])
def test_associate_disassociate_route_table(self):
resp, data = self.client.CreateRouteTable(VpcId=self.vpc_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
rt_id = data['RouteTable']['RouteTableId']
res_clean_rt = self.addResourceCleanUp(self.client.DeleteRouteTable,
RouteTableId=rt_id)
resp, data = self.client.CreateSubnet(VpcId=self.vpc_id,
CidrBlock=self.SUBNET_CIDR)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
subnet_id = data['Subnet']['SubnetId']
res_clean_subnet = self.addResourceCleanUp(self.client.DeleteSubnet,
SubnetId=subnet_id)
self.get_subnet_waiter().wait_available(subnet_id)
resp, data = self.client.AssociateRouteTable(RouteTableId=rt_id,
SubnetId=subnet_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
assoc_id = data['AssociationId']
res_clean = self.addResourceCleanUp(self.client.DisassociateRouteTable,
AssociationId=assoc_id)
resp, data = self.client.DisassociateRouteTable(AssociationId=assoc_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
resp, data = self.client.DeleteSubnet(SubnetId=subnet_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean_subnet)
self.get_subnet_waiter().wait_delete(subnet_id)
resp, data = self.client.DeleteRouteTable(RouteTableId=rt_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean_rt)
def test_replace_route_table(self):
resp, data = self.client.CreateSubnet(VpcId=self.vpc_id,
CidrBlock=self.SUBNET_CIDR)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
subnet_id = data['Subnet']['SubnetId']
res_clean_subnet = self.addResourceCleanUp(self.client.DeleteSubnet,
SubnetId=subnet_id)
self.get_subnet_waiter().wait_available(subnet_id)
# NOTE(andrey-mp): by vpc id
resp, data = self.client.DescribeRouteTables(
Filters=[{'Name': 'vpc-id', 'Values': [self.vpc_id]}])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(1, len(data['RouteTables']))
self.assertEqual(1, len(data['RouteTables'][0]['Associations']))
default_rt_id = data['RouteTables'][0]['RouteTableId']
main_assoc = data['RouteTables'][0]['Associations'][0]
self.assertTrue(main_assoc['Main'])
main_assoc_id = main_assoc['RouteTableAssociationId']
resp, data = self.client.CreateRouteTable(VpcId=self.vpc_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
rt_id = data['RouteTable']['RouteTableId']
res_clean_rt = self.addResourceCleanUp(self.client.DeleteRouteTable,
RouteTableId=rt_id)
resp, data = self.client.ReplaceRouteTableAssociation(
RouteTableId=rt_id, AssociationId=main_assoc_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
assoc_id = data['NewAssociationId']
res_clean = self.addResourceCleanUp(
self.client.ReplaceRouteTableAssociation,
RouteTableId=default_rt_id,
AssociationId=assoc_id)
# NOTE(andrey-mp): by vpc id
resp, data = self.client.DescribeRouteTables(
Filters=[{'Name': 'vpc-id', 'Values': [self.vpc_id]}])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(2, len(data['RouteTables']))
for rt in data['RouteTables']:
if rt['RouteTableId'] == rt_id:
self.assertEqual(1, len(rt['Associations']))
self.assertTrue(rt['Associations'][0]['Main'])
else:
self.assertEmpty(rt.get('Associations', []))
resp, data = self.client.DeleteRouteTable(RouteTableId=rt_id)
self.assertEqual(400, resp.status_code)
self.assertEqual('DependencyViolation', data['Error']['Code'])
resp, data = self.client.DisassociateRouteTable(AssociationId=assoc_id)
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidParameterValue', data['Error']['Code'])
resp, data = self.client.ReplaceRouteTableAssociation(
RouteTableId=default_rt_id,
AssociationId=assoc_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
resp, data = self.client.DeleteRouteTable(RouteTableId=rt_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean_rt)
resp, data = self.client.DeleteSubnet(SubnetId=subnet_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean_subnet)
self.get_subnet_waiter().wait_delete(subnet_id)
def test_create_delete_route(self):
resp, data = self.client.CreateSubnet(VpcId=self.vpc_id,
CidrBlock=self.SUBNET_CIDR)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
subnet_id = data['Subnet']['SubnetId']
res_clean_subnet = self.addResourceCleanUp(self.client.DeleteSubnet,
SubnetId=subnet_id)
self.get_subnet_waiter().wait_available(subnet_id)
kwargs = {
'SubnetId': subnet_id,
}
resp, data = self.client.CreateNetworkInterface(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
ni_id = data['NetworkInterface']['NetworkInterfaceId']
res_clean_ni = self.addResourceCleanUp(
self.client.DeleteNetworkInterface,
NetworkInterfaceId=ni_id)
resp, data = self.client.CreateRouteTable(VpcId=self.vpc_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
rt_id = data['RouteTable']['RouteTableId']
res_clean_rt = self.addResourceCleanUp(self.client.DeleteRouteTable,
RouteTableId=rt_id)
kwargs = {
'DestinationCidrBlock': self.VPC_CIDR,
'RouteTableId': rt_id,
'NetworkInterfaceId': ni_id
}
resp, data = self.client.CreateRoute(*[], **kwargs)
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidParameterValue', data['Error']['Code'])
# can create wider route
kwargs = {
'DestinationCidrBlock': '10.14.0.0/19',
'RouteTableId': rt_id,
'NetworkInterfaceId': ni_id
}
resp, data = self.client.CreateRoute(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
# can create to another vpc
kwargs = {
'DestinationCidrBlock': '10.15.0.0/20',
'RouteTableId': rt_id,
'NetworkInterfaceId': ni_id
}
resp, data = self.client.CreateRoute(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
resp, data = self.client.DescribeRouteTables(RouteTableIds=[rt_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(1, len(data['RouteTables']))
self.assertEqual(3, len(data['RouteTables'][0]['Routes']))
kwargs = {
'DestinationCidrBlock': '10.15.0.0/24',
'RouteTableId': rt_id,
}
resp, data = self.client.DeleteRoute(*[], **kwargs)
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidRoute.NotFound', data['Error']['Code'])
kwargs = {
'DestinationCidrBlock': self.VPC_CIDR,
'RouteTableId': rt_id,
}
resp, data = self.client.DeleteRoute(*[], **kwargs)
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidParameterValue', data['Error']['Code'])
kwargs = {
'DestinationCidrBlock': self.SUBNET_CIDR,
'RouteTableId': rt_id,
}
resp, data = self.client.DeleteRoute(*[], **kwargs)
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidRoute.NotFound', data['Error']['Code'])
kwargs = {
'DestinationCidrBlock': '10.16.0.0/24',
'RouteTableId': rt_id,
}
resp, data = self.client.DeleteRoute(*[], **kwargs)
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidRoute.NotFound', data['Error']['Code'])
kwargs = {
'DestinationCidrBlock': '10.15.0.0/20',
'RouteTableId': rt_id,
}
resp, data = self.client.DeleteRoute(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
kwargs = {
'DestinationCidrBlock': '10.14.0.0/19',
'RouteTableId': rt_id,
}
resp, data = self.client.DeleteRoute(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
resp, data = self.client.DeleteRouteTable(RouteTableId=rt_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean_rt)
resp, data = self.client.DeleteNetworkInterface(
NetworkInterfaceId=ni_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean_ni)
self.get_network_interface_waiter().wait_delete(ni_id)
resp, data = self.client.DeleteSubnet(SubnetId=subnet_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean_subnet)
self.get_subnet_waiter().wait_delete(subnet_id)

View File

@ -0,0 +1,160 @@
# Copyright 2014 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from tempest_lib.openstack.common import log
import testtools
from ec2api.tests.functional import base
from ec2api.tests.functional import config
CONF = config.CONF
LOG = log.getLogger(__name__)
class SecurityGroupTest(base.EC2TestCase):
VPC_CIDR = '10.10.0.0/20'
vpc_id = None
@classmethod
@base.safe_setup
def setUpClass(cls):
super(SecurityGroupTest, cls).setUpClass()
resp, data = cls.client.CreateVpc(CidrBlock=cls.VPC_CIDR)
if resp.status_code != 200:
LOG.error(base.EC2ErrorConverter(data))
assert 200 == resp.status_code
cls.vpc_id = data['Vpc']['VpcId']
cls.addResourceCleanUpStatic(cls.client.DeleteVpc, VpcId=cls.vpc_id)
cls.get_vpc_waiter().wait_available(cls.vpc_id)
def test_create_delete_security_group(self):
name = base.rand_name('sgName')
desc = base.rand_name('sgDesc')
resp, data = self.client.CreateSecurityGroup(VpcId=self.vpc_id,
GroupName=name,
Description=desc)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
group_id = data['GroupId']
res_clean = self.addResourceCleanUp(self.client.DeleteSecurityGroup,
GroupId=group_id)
time.sleep(2)
resp, data = self.client.DeleteSecurityGroup(GroupId=group_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
resp, data = self.client.DescribeSecurityGroups(GroupIds=[group_id])
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidGroup.NotFound', data['Error']['Code'])
resp, data = self.client.DeleteSecurityGroup(GroupId=group_id)
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidGroup.NotFound', data['Error']['Code'])
@testtools.skipUnless(CONF.aws.run_incompatible_tests,
"MismatchError: 'InvalidParameterValue' != 'ValidationError'")
def test_create_invalid_name_desc(self):
valid = base.rand_name('sgName')
invalid = 'name%"'
resp, data = self.client.CreateSecurityGroup(VpcId=self.vpc_id,
GroupName=invalid,
Description=valid)
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidParameterValue', data['Error']['Code'])
resp, data = self.client.CreateSecurityGroup(VpcId=self.vpc_id,
GroupName=valid,
Description=invalid)
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidParameterValue', data['Error']['Code'])
resp, data = self.client.CreateSecurityGroup(VpcId=self.vpc_id,
GroupName=valid)
self.assertEqual(400, resp.status_code)
self.assertEqual('MissingParameter', data['Error']['Code'])
resp, data = self.client.CreateSecurityGroup(VpcId=self.vpc_id,
Description=valid)
self.assertEqual(400, resp.status_code)
self.assertEqual('MissingParameter', data['Error']['Code'])
def test_ingress_rules(self):
self._test_rules(self.client.AuthorizeSecurityGroupIngress,
self.client.RevokeSecurityGroupIngress,
'IpPermissions')
def test_egress_rules(self):
self._test_rules(self.client.AuthorizeSecurityGroupEgress,
self.client.RevokeSecurityGroupEgress,
'IpPermissionsEgress')
def _test_rules(self, add_func, del_func, field):
name = base.rand_name('sgName')
desc = base.rand_name('sgDesc')
resp, data = self.client.CreateSecurityGroup(VpcId=self.vpc_id,
GroupName=name,
Description=desc)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
group_id = data['GroupId']
res_clean = self.addResourceCleanUp(self.client.DeleteSecurityGroup,
GroupId=group_id)
time.sleep(2)
resp, data = self.client.DescribeSecurityGroups(GroupIds=[group_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
count = len(data['SecurityGroups'][0][field])
kwargs = {
'GroupId': group_id,
'IpPermissions': [{
'IpProtocol': 'icmp',
'FromPort': -1,
'ToPort': -1,
'IpRanges': [{
'CidrIp': '10.0.0.0/8'
}],
}]
}
resp, data = add_func(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
resp, data = self.client.DescribeSecurityGroups(GroupIds=[group_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(1, len(data['SecurityGroups']))
self.assertEqual(count + 1, len(data['SecurityGroups'][0][field]))
found = False
for perm in data['SecurityGroups'][0][field]:
cidrs = [v['CidrIp'] for v in perm.get('IpRanges', [])]
if (perm.get('FromPort') == -1 and
perm.get('ToPort') == -1 and
perm.get('IpProtocol') == 'icmp' and
len(perm.get('IpRanges')) == 1 and
'10.0.0.0/8' in cidrs):
found = True
self.assertTrue(found)
resp, data = del_func(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
resp, data = del_func(*[], **kwargs)
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidPermission.NotFound', data['Error']['Code'])
resp, data = self.client.DeleteSecurityGroup(GroupId=group_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)

View File

@ -0,0 +1,211 @@
# Copyright 2014 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest_lib.openstack.common import log
import testtools
from ec2api.tests.functional import base
from ec2api.tests.functional import config
CONF = config.CONF
LOG = log.getLogger(__name__)
class SubnetTest(base.EC2TestCase):
VPC_CIDR = '10.2.0.0/20'
vpc_id = None
@classmethod
@base.safe_setup
def setUpClass(cls):
super(SubnetTest, cls).setUpClass()
resp, data = cls.client.CreateVpc(CidrBlock=cls.VPC_CIDR)
if resp.status_code != 200:
LOG.error(base.EC2ErrorConverter(data))
assert 200 == resp.status_code
cls.vpc_id = data['Vpc']['VpcId']
cls.addResourceCleanUpStatic(cls.client.DeleteVpc, VpcId=cls.vpc_id)
cls.get_vpc_waiter().wait_available(cls.vpc_id)
def test_create_delete_subnet(self):
cidr = '10.2.0.0/24'
resp, data = self.client.CreateSubnet(VpcId=self.vpc_id,
CidrBlock=cidr)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
subnet_id = data['Subnet']['SubnetId']
res_clean = self.addResourceCleanUp(self.client.DeleteSubnet,
SubnetId=subnet_id)
self.assertEqual(cidr, data['Subnet']['CidrBlock'])
self.assertIsNotNone(data['Subnet'].get('AvailableIpAddressCount'))
self.get_subnet_waiter().wait_available(subnet_id)
resp, data = self.client.DeleteSubnet(SubnetId=subnet_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
self.get_subnet_waiter().wait_delete(subnet_id)
resp, data = self.client.DescribeSubnets(SubnetIds=[subnet_id])
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidSubnetID.NotFound', data['Error']['Code'])
resp, data = self.client.DeleteSubnet(SubnetId=subnet_id)
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidSubnetID.NotFound', data['Error']['Code'])
def test_dependency_subnet_to_vpc(self):
resp, data = self.client.CreateVpc(CidrBlock=self.VPC_CIDR)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
vpc_id = data['Vpc']['VpcId']
vpc_clean = self.addResourceCleanUp(self.client.DeleteVpc,
VpcId=vpc_id)
self.get_vpc_waiter().wait_available(vpc_id)
cidr = '10.2.0.0/24'
resp, data = self.client.CreateSubnet(VpcId=vpc_id,
CidrBlock=cidr)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
subnet_id = data['Subnet']['SubnetId']
res_clean = self.addResourceCleanUp(self.client.DeleteSubnet,
SubnetId=subnet_id)
self.get_subnet_waiter().wait_available(subnet_id)
resp, data = self.client.DeleteVpc(VpcId=vpc_id)
self.assertEqual(400, resp.status_code)
self.assertEqual('DependencyViolation', data['Error']['Code'])
resp, data = self.client.DeleteSubnet(SubnetId=subnet_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
self.get_subnet_waiter().wait_delete(subnet_id)
self.client.DeleteVpc(VpcId=vpc_id)
self.cancelResourceCleanUp(vpc_clean)
@testtools.skipUnless(CONF.aws.run_incompatible_tests,
"bug with overlapped subnets")
def test_create_overlapped_subnet(self):
cidr = '10.2.0.0/24'
resp, data = self.client.CreateSubnet(VpcId=self.vpc_id,
CidrBlock=cidr)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
subnet_id = data['Subnet']['SubnetId']
res_clean = self.addResourceCleanUp(self.client.DeleteSubnet,
SubnetId=subnet_id)
self.get_subnet_waiter().wait_available(subnet_id)
cidr = '10.2.0.128/26'
resp, data = self.client.CreateSubnet(VpcId=self.vpc_id,
CidrBlock=cidr)
if resp.status_code == 200:
self.addResourceCleanUp(self.client.DeleteSubnet,
SubnetId=data['Subnet']['SubnetId'])
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidSubnet.Conflict', data['Error']['Code'])
resp, data = self.client.DeleteSubnet(SubnetId=subnet_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
self.get_subnet_waiter().wait_delete(subnet_id)
def test_create_subnet_invalid_cidr(self):
# NOTE(andrey-mp): another cidr than VPC has
cidr = '10.1.0.0/24'
resp, data = self.client.CreateSubnet(VpcId=self.vpc_id,
CidrBlock=cidr)
if resp.status_code == 200:
self.addResourceCleanUp(self.client.DeleteSubnet,
SubnetId=data['Subnet']['SubnetId'])
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidSubnet.Range', data['Error']['Code'])
# NOTE(andrey-mp): bigger cidr than VPC has
cidr = '10.2.0.0/19'
resp, data = self.client.CreateSubnet(VpcId=self.vpc_id,
CidrBlock=cidr)
if resp.status_code == 200:
self.addResourceCleanUp(self.client.DeleteSubnet,
SubnetId=data['Subnet']['SubnetId'])
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidSubnet.Range', data['Error']['Code'])
# NOTE(andrey-mp): too small cidr
cidr = '10.2.0.0/29'
resp, data = self.client.CreateSubnet(VpcId=self.vpc_id,
CidrBlock=cidr)
if resp.status_code == 200:
self.addResourceCleanUp(self.client.DeleteSubnet,
SubnetId=data['Subnet']['SubnetId'])
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidSubnet.Range', data['Error']['Code'])
def test_describe_subnets_base(self):
cidr = '10.2.0.0/24'
resp, data = self.client.CreateSubnet(VpcId=self.vpc_id,
CidrBlock=cidr)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
subnet_id = data['Subnet']['SubnetId']
res_clean = self.addResourceCleanUp(self.client.DeleteSubnet,
SubnetId=subnet_id)
self.get_subnet_waiter().wait_available(subnet_id)
# NOTE(andrey-mp): by real id
resp, data = self.client.DescribeSubnets(SubnetIds=[subnet_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(1, len(data['Subnets']))
# NOTE(andrey-mp): by fake id
resp, data = self.client.DescribeSubnets(SubnetIds=['subnet-0'])
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidSubnetID.NotFound', data['Error']['Code'])
resp, data = self.client.DeleteSubnet(SubnetId=subnet_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
self.get_subnet_waiter().wait_delete(subnet_id)
def test_describe_subnets_filters(self):
cidr = '10.2.0.0/24'
resp, data = self.client.CreateSubnet(VpcId=self.vpc_id,
CidrBlock=cidr)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
subnet_id = data['Subnet']['SubnetId']
res_clean = self.addResourceCleanUp(self.client.DeleteSubnet,
SubnetId=subnet_id)
self.get_subnet_waiter().wait_available(subnet_id)
# NOTE(andrey-mp): by filter real cidr
resp, data = self.client.DescribeSubnets(
Filters=[{'Name': 'cidr', 'Values': [cidr]}])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(1, len(data['Subnets']))
# NOTE(andrey-mp): by filter fake cidr
resp, data = self.client.DescribeSubnets(
Filters=[{'Name': 'cidr', 'Values': ['123.0.0.0/16']}])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(0, len(data['Subnets']))
# NOTE(andrey-mp): by fake filter
resp, data = self.client.DescribeSubnets(
Filters=[{'Name': 'fake', 'Values': ['fake']}])
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidParameterValue', data['Error']['Code'])
resp, data = self.client.DeleteSubnet(SubnetId=subnet_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
self.get_subnet_waiter().wait_delete(subnet_id)

View File

@ -0,0 +1,241 @@
# Copyright 2014 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from ec2api.tests.functional import base
from ec2api.tests.functional import config
CONF = config.CONF
class VPCTest(base.EC2TestCase):
def test_create_delete_vpc(self):
cidr = '10.1.0.0/16'
resp, data = self.client.CreateVpc(CidrBlock=cidr)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
vpc_id = data['Vpc']['VpcId']
dv_clean = self.addResourceCleanUp(self.client.DeleteVpc, VpcId=vpc_id)
self.assertEqual(cidr, data['Vpc']['CidrBlock'])
if CONF.aws.run_incompatible_tests:
# NOTE(andrey-mp): not ready
self.assertEqual('default', data['Vpc']['InstanceTenancy'])
self.assertIsNotNone(data['Vpc'].get('DhcpOptionsId'))
self.get_vpc_waiter().wait_available(vpc_id)
resp, data = self.client.DeleteVpc(VpcId=vpc_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(dv_clean)
self.get_vpc_waiter().wait_delete(vpc_id)
resp, data = self.client.DescribeVpcs(VpcIds=[vpc_id])
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidVpcID.NotFound', data['Error']['Code'])
resp, data = self.client.DeleteVpc(VpcId=vpc_id)
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidVpcID.NotFound', data['Error']['Code'])
def test_create_more_than_one_vpc(self):
cidr = '10.0.0.0/16'
resp, data = self.client.CreateVpc(CidrBlock=cidr)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
vpc_id1 = data['Vpc']['VpcId']
rc1 = self.addResourceCleanUp(self.client.DeleteVpc, VpcId=vpc_id1)
self.get_vpc_waiter().wait_available(vpc_id1)
cidr = '10.1.0.0/16'
resp, data = self.client.CreateVpc(CidrBlock=cidr)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
vpc_id2 = data['Vpc']['VpcId']
rc2 = self.addResourceCleanUp(self.client.DeleteVpc, VpcId=vpc_id2)
self.get_vpc_waiter().wait_available(vpc_id2)
resp, data = self.client.DeleteVpc(VpcId=vpc_id1)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(rc1)
self.get_vpc_waiter().wait_delete(vpc_id1)
resp, data = self.client.DeleteVpc(VpcId=vpc_id2)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(rc2)
self.get_vpc_waiter().wait_delete(vpc_id2)
def test_describe_vpcs_base(self):
cidr = '10.1.0.0/16'
resp, data = self.client.CreateVpc(CidrBlock=cidr)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
vpc_id = data['Vpc']['VpcId']
dv_clean = self.addResourceCleanUp(self.client.DeleteVpc, VpcId=vpc_id)
self.get_vpc_waiter().wait_available(vpc_id)
# NOTE(andrey-mp): by real id
resp, data = self.client.DescribeVpcs(VpcIds=[vpc_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(1, len(data['Vpcs']))
# NOTE(andrey-mp): by fake id
resp, data = self.client.DescribeVpcs(VpcIds=['vpc-0'])
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidVpcID.NotFound', data['Error']['Code'])
resp, data = self.client.DeleteVpc(VpcId=vpc_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(dv_clean)
self.get_vpc_waiter().wait_delete(vpc_id)
def test_describe_vpcs_filters(self):
cidr = '10.1.0.0/16'
resp, data = self.client.CreateVpc(CidrBlock=cidr)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
vpc_id = data['Vpc']['VpcId']
dv_clean = self.addResourceCleanUp(self.client.DeleteVpc, VpcId=vpc_id)
self.get_vpc_waiter().wait_available(vpc_id)
# NOTE(andrey-mp): by filter real cidr
resp, data = self.client.DescribeVpcs(
Filters=[{'Name': 'cidr', 'Values': [cidr]}])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(1, len(data['Vpcs']))
# NOTE(andrey-mp): by filter fake cidr
resp, data = self.client.DescribeVpcs(
Filters=[{'Name': 'cidr', 'Values': ['123.0.0.0/16']}])
self.assertEqual(200, resp.status_code)
self.assertEqual(0, len(data['Vpcs']))
if CONF.aws.run_incompatible_tests:
# NOTE(andrey-mp): describe no attributes
resp, data = self.client.DescribeVpcAttribute(VpcId=vpc_id)
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidParameterCombination',
data['Error']['Code'])
# NOTE(andrey-mp): by fake filter
resp, data = self.client.DescribeVpcs(
Filters=[{'Name': 'fake', 'Values': ['fake']}])
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidParameterValue',
data['Error']['Code'])
resp, data = self.client.DeleteVpc(VpcId=vpc_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(dv_clean)
self.get_vpc_waiter().wait_delete(vpc_id)
@testtools.skipUnless(CONF.aws.run_incompatible_tests,
"Invalid request on checking vpc atributes.")
def test_vpc_attributes(self):
cidr = '10.1.0.0/16'
resp, data = self.client.CreateVpc(CidrBlock=cidr)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
vpc_id = data['Vpc']['VpcId']
dv_clean = self.addResourceCleanUp(self.client.DeleteVpc, VpcId=vpc_id)
self.get_vpc_waiter().wait_available(vpc_id)
self._check_attribute(vpc_id, 'EnableDnsHostnames')
self._check_attribute(vpc_id, 'EnableDnsSupport')
resp, data = self.client.DeleteVpc(VpcId=vpc_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(dv_clean)
self.get_vpc_waiter().wait_delete(vpc_id)
def _check_attribute(self, vpc_id, attribute):
req_attr = attribute[0].lower() + attribute[1:]
resp, data = self.client.DescribeVpcAttribute(VpcId=vpc_id,
Attribute=req_attr)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
attr = data[attribute].get('Value')
self.assertIsNotNone(attr)
kwargs = {'VpcId': vpc_id, attribute: {'Value': not attr}}
resp, data = self.client.ModifyVpcAttribute(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
resp, data = self.client.DescribeVpcAttribute(VpcId=vpc_id,
Attribute=req_attr)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertNotEqual(attr, data[attribute].get('Value'))
@testtools.skipUnless(CONF.aws.run_incompatible_tests,
"InvalidParameterCombination' != 'InvalidRequest")
def test_describe_invalid_attributes(self):
cidr = '10.1.0.0/16'
resp, data = self.client.CreateVpc(CidrBlock=cidr)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
vpc_id = data['Vpc']['VpcId']
dv_clean = self.addResourceCleanUp(self.client.DeleteVpc, VpcId=vpc_id)
self.get_vpc_waiter().wait_available(vpc_id)
# NOTE(andrey-mp): describe no attributes
resp, data = self.client.DescribeVpcAttribute(VpcId=vpc_id)
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidParameterCombination',
data['Error']['Code'])
resp, data = self.client.DeleteVpc(VpcId=vpc_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(dv_clean)
self.get_vpc_waiter().wait_delete(vpc_id)
def test_create_with_invalid_cidr(self):
# NOTE(andrey-mp): The largest uses a /16 netmask
resp, data = self.client.CreateVpc(CidrBlock='10.0.0.0/15')
if resp.status_code == 200:
self.addResourceCleanUp(self.client.DeleteVpc,
VpcId=data['Vpc']['VpcId'])
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidVpc.Range', data['Error']['Code'])
# NOTE(andrey-mp): The smallest VPC you can create uses a /28 netmask
resp, data = self.client.CreateVpc(CidrBlock='10.0.0.0/29')
if resp.status_code == 200:
self.addResourceCleanUp(self.client.DeleteVpc,
VpcId=data['Vpc']['VpcId'])
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidVpc.Range', data['Error']['Code'])
def test_describe_non_existing_vpc_by_id(self):
vpc_id = 'vpc-00000000'
resp, data = self.client.DescribeVpcs(VpcIds=[vpc_id])
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidVpcID.NotFound', data['Error']['Code'])
def test_describe_non_existing_vpc_by_cidr(self):
resp, data = self.client.DescribeVpcs(
Filters=[{'Name': 'cidr', 'Values': ['123.0.0.0/16']}])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(0, len(data['Vpcs']))
def test_describe_with_invalid_filter(self):
cidr = '10.1.0.0/16'
resp, data = self.client.CreateVpc(CidrBlock=cidr)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
vpc_id = data['Vpc']['VpcId']
dv_clean = self.addResourceCleanUp(self.client.DeleteVpc, VpcId=vpc_id)
self.get_vpc_waiter().wait_available(vpc_id)
resp, data = self.client.DescribeVpcs(
Filters=[{'Name': 'unknown', 'Values': ['unknown']}])
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidParameterValue', data['Error']['Code'])
resp, data = self.client.DeleteVpc(VpcId=vpc_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(dv_clean)
self.get_vpc_waiter().wait_delete(vpc_id)

View File

@ -14,6 +14,7 @@
# under the License.
import logging
import random
import sys
import time
import traceback
@ -36,6 +37,15 @@ logging.getLogger(
).setLevel(logging.WARNING)
# TODO(andrey-mp): remove it when new tempest_lib with this will be
def rand_name(name=''):
randbits = str(random.randint(1, 0x7fffffff))
if name:
return name + '-' + randbits
else:
return randbits
def safe_setup(f):
"""A decorator used to wrap the setUpClass for safe setup."""
@ -456,3 +466,9 @@ class EC2TestCase(base.BaseTestCase):
@classmethod
def get_snapshot_waiter(cls):
return EC2Waiter(cls._snapshot_get_state)
def assertEmpty(self, list_obj, msg=None):
self.assertTrue(len(list_obj) == 0, msg)
def assertNotEmpty(self, list_obj, msg=None):
self.assertTrue(len(list_obj) > 0, msg)

View File

@ -27,5 +27,3 @@ SQLAlchemy>=0.9.7,<=0.9.99
sqlalchemy-migrate>=0.9.1,!=0.9.2
stevedore>=1.1.0 # Apache-2.0
WebOb>=1.2.3

View File

@ -9,5 +9,6 @@ oslotest
pylint==0.25.2
python-subunit>=0.0.18
sphinx>=1.1.2,!=1.2.0,<1.3
tempest-lib>=0.2.0
testrepository>=0.0.18
testtools>=0.9.34

332
tools/colorizer.py Executable file
View File

@ -0,0 +1,332 @@
#!/usr/bin/env python
# Copyright (c) 2013, Nebula, Inc.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# Colorizer Code is borrowed from Twisted:
# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Display a subunit stream through a colorized unittest test runner."""
import heapq
import sys
import unittest
import subunit
import testtools
class _AnsiColorizer(object):
"""
A colorizer is an object that loosely wraps around a stream, allowing
callers to write text to the stream in a particular color.
Colorizer classes must implement C{supported()} and C{write(text, color)}.
"""
_colors = dict(black=30, red=31, green=32, yellow=33,
blue=34, magenta=35, cyan=36, white=37)
def __init__(self, stream):
self.stream = stream
def supported(cls, stream=sys.stdout):
"""
A class method that returns True if the current platform supports
coloring terminal output using this method. Returns False otherwise.
"""
if not stream.isatty():
return False # auto color only on TTYs
try:
import curses
except ImportError:
return False
else:
try:
try:
return curses.tigetnum("colors") > 2
except curses.error:
curses.setupterm()
return curses.tigetnum("colors") > 2
except Exception:
# guess false in case of error
return False
supported = classmethod(supported)
def write(self, text, color):
"""
Write the given text to the stream in the given color.
@param text: Text to be written to the stream.
@param color: A string label for a color. e.g. 'red', 'white'.
"""
color = self._colors[color]
self.stream.write('\x1b[%s;1m%s\x1b[0m' % (color, text))
class _Win32Colorizer(object):
"""
See _AnsiColorizer docstring.
"""
def __init__(self, stream):
import win32console
red, green, blue, bold = (win32console.FOREGROUND_RED,
win32console.FOREGROUND_GREEN,
win32console.FOREGROUND_BLUE,
win32console.FOREGROUND_INTENSITY)
self.stream = stream
self.screenBuffer = win32console.GetStdHandle(
win32console.STD_OUT_HANDLE)
self._colors = {'normal': red | green | blue,
'red': red | bold,
'green': green | bold,
'blue': blue | bold,
'yellow': red | green | bold,
'magenta': red | blue | bold,
'cyan': green | blue | bold,
'white': red | green | blue | bold}
def supported(cls, stream=sys.stdout):
try:
import win32console
screenBuffer = win32console.GetStdHandle(
win32console.STD_OUT_HANDLE)
except ImportError:
return False
import pywintypes
try:
screenBuffer.SetConsoleTextAttribute(
win32console.FOREGROUND_RED |
win32console.FOREGROUND_GREEN |
win32console.FOREGROUND_BLUE)
except pywintypes.error:
return False
else:
return True
supported = classmethod(supported)
def write(self, text, color):
color = self._colors[color]
self.screenBuffer.SetConsoleTextAttribute(color)
self.stream.write(text)
self.screenBuffer.SetConsoleTextAttribute(self._colors['normal'])
class _NullColorizer(object):
"""
See _AnsiColorizer docstring.
"""
def __init__(self, stream):
self.stream = stream
def supported(cls, stream=sys.stdout):
return True
supported = classmethod(supported)
def write(self, text, color):
self.stream.write(text)
def get_elapsed_time_color(elapsed_time):
if elapsed_time > 1.0:
return 'red'
elif elapsed_time > 0.25:
return 'yellow'
else:
return 'green'
class NovaTestResult(testtools.TestResult):
def __init__(self, stream, descriptions, verbosity):
super(NovaTestResult, self).__init__()
self.stream = stream
self.showAll = verbosity > 1
self.num_slow_tests = 10
self.slow_tests = [] # this is a fixed-sized heap
self.colorizer = None
# NOTE(vish): reset stdout for the terminal check
stdout = sys.stdout
sys.stdout = sys.__stdout__
for colorizer in [_Win32Colorizer, _AnsiColorizer, _NullColorizer]:
if colorizer.supported():
self.colorizer = colorizer(self.stream)
break
sys.stdout = stdout
self.start_time = None
self.last_time = {}
self.results = {}
self.last_written = None
def _writeElapsedTime(self, elapsed):
color = get_elapsed_time_color(elapsed)
self.colorizer.write(" %.2f" % elapsed, color)
def _addResult(self, test, *args):
try:
name = test.id()
except AttributeError:
name = 'Unknown.unknown'
test_class, test_name = name.rsplit('.', 1)
elapsed = (self._now() - self.start_time).total_seconds()
item = (elapsed, test_class, test_name)
if len(self.slow_tests) >= self.num_slow_tests:
heapq.heappushpop(self.slow_tests, item)
else:
heapq.heappush(self.slow_tests, item)
self.results.setdefault(test_class, [])
self.results[test_class].append((test_name, elapsed) + args)
self.last_time[test_class] = self._now()
self.writeTests()
def _writeResult(self, test_name, elapsed, long_result, color,
short_result, success):
if self.showAll:
self.stream.write(' %s' % str(test_name).ljust(66))
self.colorizer.write(long_result, color)
if success:
self._writeElapsedTime(elapsed)
self.stream.writeln()
else:
self.colorizer.write(short_result, color)
def addSuccess(self, test):
super(NovaTestResult, self).addSuccess(test)
self._addResult(test, 'OK', 'green', '.', True)
def addFailure(self, test, err):
if test.id() == 'process-returncode':
return
super(NovaTestResult, self).addFailure(test, err)
self._addResult(test, 'FAIL', 'red', 'F', False)
def addError(self, test, err):
super(NovaTestResult, self).addFailure(test, err)
self._addResult(test, 'ERROR', 'red', 'E', False)
def addSkip(self, test, reason=None, details=None):
super(NovaTestResult, self).addSkip(test, reason, details)
self._addResult(test, 'SKIP', 'blue', 'S', True)
def startTest(self, test):
self.start_time = self._now()
super(NovaTestResult, self).startTest(test)
def writeTestCase(self, cls):
if not self.results.get(cls):
return
if cls != self.last_written:
self.colorizer.write(cls, 'white')
self.stream.writeln()
for result in self.results[cls]:
self._writeResult(*result)
del self.results[cls]
self.stream.flush()
self.last_written = cls
def writeTests(self):
time = self.last_time.get(self.last_written, self._now())
if not self.last_written or (self._now() - time).total_seconds() > 2.0:
diff = 3.0
while diff > 2.0:
classes = self.results.keys()
oldest = min(classes, key=lambda x: self.last_time[x])
diff = (self._now() - self.last_time[oldest]).total_seconds()
self.writeTestCase(oldest)
else:
self.writeTestCase(self.last_written)
def done(self):
self.stopTestRun()
def stopTestRun(self):
for cls in list(self.results.iterkeys()):
self.writeTestCase(cls)
self.stream.writeln()
self.writeSlowTests()
def writeSlowTests(self):
# Pare out 'fast' tests
slow_tests = [item for item in self.slow_tests
if get_elapsed_time_color(item[0]) != 'green']
if slow_tests:
slow_total_time = sum(item[0] for item in slow_tests)
slow = ("Slowest %i tests took %.2f secs:"
% (len(slow_tests), slow_total_time))
self.colorizer.write(slow, 'yellow')
self.stream.writeln()
last_cls = None
# sort by name
for elapsed, cls, name in sorted(slow_tests,
key=lambda x: x[1] + x[2]):
if cls != last_cls:
self.colorizer.write(cls, 'white')
self.stream.writeln()
last_cls = cls
self.stream.write(' %s' % str(name).ljust(68))
self._writeElapsedTime(elapsed)
self.stream.writeln()
def printErrors(self):
if self.showAll:
self.stream.writeln()
self.printErrorList('ERROR', self.errors)
self.printErrorList('FAIL', self.failures)
def printErrorList(self, flavor, errors):
for test, err in errors:
self.colorizer.write("=" * 70, 'red')
self.stream.writeln()
self.colorizer.write(flavor, 'red')
self.stream.writeln(": %s" % test.id())
self.colorizer.write("-" * 70, 'red')
self.stream.writeln()
self.stream.writeln("%s" % err)
test = subunit.ProtocolTestCase(sys.stdin, passthrough=None)
if sys.version_info[0:2] <= (2, 6):
runner = unittest.TextTestRunner(verbosity=2)
else:
runner = unittest.TextTestRunner(verbosity=2, resultclass=NovaTestResult)
if runner.run(test).wasSuccessful():
exit_code = 0
else:
exit_code = 1
sys.exit(exit_code)