Added tests of firewalls api.

Change-Id: I66b2d3080d259e8be50138c02749a9a7f7e58d19
This commit is contained in:
alexey-mr 2015-11-18 17:05:19 +03:00
parent 54e55e4b47
commit bee22697b8
5 changed files with 221 additions and 13 deletions

View File

@ -56,6 +56,9 @@ gce_opts = [
default=500,
help='Default new volume size if sizeGb, sourceSnapshot and '
'sourceImage are not provided'),
cfg.StrOpt('default_network_name',
default='default',
help='Default network name that expected to exists'),
cfg.StrOpt('default_network_ip_range',
default='10.240.0.0/16',
help='Default new network ip range if it is not provided'),

View File

@ -14,6 +14,7 @@
import copy
from oslo_config import cfg
from oslo_log import log as logging
from gceapi.api import base_api
@ -31,6 +32,7 @@ PROTOCOL_MAP = {
'17': 'udp',
}
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class API(base_api.API):
@ -76,9 +78,18 @@ class API(base_api.API):
return items
def add_item(self, context, name, body, scope=None):
network = self._get_network_by_url(context, body['network'])
# expected that either network is provided in parameters or
# default network exists (as in Google)
network = self._get_network_by_url(
context,
body.get('network', CONF.default_network_name)
)
self._check_rules(body)
group_description = body.get("description", "")
default_description = _("Firewall rules for network {}")
group_description = body.get(
"description",
default_description.format(network['name'])
)
client = clients.nova(context)
operation_util.start_operation(context)
sg = client.security_groups.create(body['name'], group_description)
@ -94,7 +105,6 @@ class API(base_api.API):
raise
new_firewall = utils.to_dict(client.security_groups.get(sg.id))
new_firewall = self._prepare_firewall(new_firewall)
new_firewall["creationTimestamp"] = 1
new_firewall["network_name"] = network["name"]
new_firewall = self._add_db_item(context, new_firewall)
self._process_callbacks(
@ -202,7 +212,7 @@ class API(base_api.API):
return network_api.API().get_item(context, network_name)
def _check_rules(self, firewall):
if not firewall.get('sourceRanges') or firewall.get('sourceTags'):
if not (firewall.get('sourceRanges') or firewall.get('sourceTags')):
msg = _("Not 'sourceRange' neither 'sourceTags' is provided")
raise exception.InvalidRequest(msg)
for allowed in firewall.get('allowed', []):

View File

@ -0,0 +1,195 @@
# Copyright 2015 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
from gceapi.tests.functional.api import test_networks
def ip_to_re_pattern(ip):
return test_networks.ip_to_re_pattern(ip)
class TestFirewallBase(test_networks.TestNetworksBase):
@property
def firewalls(self):
res = self.api.compute.firewalls()
self.assertIsNotNone(
res,
'Null firewalls object, api is not built properly')
return res
def _create_firewall(self, options):
project_id = self.cfg.project_id
self.trace('Crete firewall with options {}'.format(options))
request = self.firewalls.insert(
project=project_id,
body=options)
self._add_cleanup(self._delete_firewall, options['name'])
self._execute_async_request(request, project_id)
def _delete_firewall(self, name):
cfg = self.cfg
project_id = cfg.project_id
self.trace('Delete firewall: project_id={} firewall={}'.
format(project_id, name))
request = self.firewalls.delete(
project=project_id,
firewall=name)
self._execute_async_request(request, project_id)
self._remove_cleanup(self._delete_firewall, name)
def _list_firewalls(self, filter=None):
project_id = self.cfg.project_id
self.trace('List firewalls: project_id={}'.format(project_id))
request = self.firewalls.list(project=project_id, filter=filter)
self.trace_request(request)
result = request.execute()
self.trace('Firewalls: {}'.format(result))
self.api.validate_schema(value=result, schema_name='FirewallList')
return result
def _get_firewall(self, name):
project_id = self.cfg.project_id
self.trace('Get firewall: project_id={} firewall={}'.
format(project_id, name))
request = self.firewalls.get(
project=project_id,
firewall=name)
result = request.execute()
self.trace('Firewall: {}'.format(result))
self.api.validate_schema(value=result, schema_name='Firewall')
return result
def _get_expected_firewall(self, options):
firewall = copy.deepcopy(options)
firewall.setdefault('kind', u'compute#firewall')
self_link = 'global/firewalls/{}'.format(firewall['name'])
firewall.setdefault('selfLink', self.api.get_project_url(self_link))
# just to check on exist
firewall.setdefault('allowed', [])
# TODO(alexey-mr): OS GCE default firewall doesn't provide network
# firewall.setdefault('network', '.*')
return firewall
def _ensure_firewall_created(self, options):
result = self._get_firewall(options['name'])
expected = self._get_expected_firewall(options)
self.assertObject(expected, result)
return result
def _create_firewall_and_validate(self, options):
self._create_firewall(options)
result = self._get_firewall(options['name'])
expected = self._get_expected_firewall(options)
self.assertObject(expected, result)
return expected
class TestFirewalls(TestFirewallBase):
def test_list_default_firewalls(self):
result = self._list_firewalls()
for firewall in result['items']:
options = {
'name': firewall['name']
}
expected = self._get_expected_firewall(options)
self.assertObject(expected, firewall)
def test_create_delete_firewall_ip_range_tcp_port(self):
name = self._rand_name('testfirewall')
options = {
'name': name,
'allowed': [
{
'IPProtocol': 'tcp',
'ports': ['44444']
}
],
'sourceRanges': ['10.240.0.0/16']
}
self._create_firewall_and_validate(options)
self._delete_firewall(name)
def test_create_delete_firewall_source_tag_tcp_port_range(self):
if not self.is_real_gce:
self.skipTest('Skip because of OS GCE does not support tags')
return
name = self._rand_name('testfirewall')
options = {
'name': name,
'allowed': [
{
'IPProtocol': 'tcp',
'ports': ['50000-55000']
}
],
'sourceTags': ['no-ip']
}
self._create_firewall_and_validate(options)
self._delete_firewall(name)
def test_create_delete_firewall_target_tag_tcp_empty_ports(self):
if not self.is_real_gce:
self.skipTest('Skip because of OS GCE does not support tags')
return
name = self._rand_name('testfirewall')
options = {
'name': name,
'allowed': [
{
'IPProtocol': 'tcp'
}
],
'sourceTags': ['src-no-ip'],
'targetTags': ['trg-no-ip']
}
self._create_firewall_and_validate(options)
self._delete_firewall(name)
def _prepare_network(self):
name = self._rand_name('testnetwork')
options = {
'name': name,
'IPv4Range': '10.241.0.0/16',
}
self._create_network(options)
options['gatewayIPv4'] = '10.241.0.1'
return self._ensure_network_created(options)
def test_create_delete_firewall_custom_network(self):
if self.is_nova_network:
self.skipTest('Skip because of nova-network cannot create network')
return
network = self._prepare_network()
name = self._rand_name('testfirewall')
options = {
'name': name,
'allowed': [
{
'IPProtocol': 'udp',
'ports': ['30000', '40000', '50000-51000']
},
{
'IPProtocol': 'icmp'
}
],
'sourceRanges': [network['IPv4Range']],
'network': network['selfLink']
}
self._create_firewall_and_validate(options)
self._delete_firewall(name)
self._delete_network(network['name'])

View File

@ -93,10 +93,6 @@ class TestNetworksBase(test_base.GCETestCase):
class TestNetworks(TestNetworksBase):
@property
def _is_nova_network(self):
return self.cfg.networking == 'nova-network'
def test_get_default_network(self):
name = 'default'
network = self._get_network(name)
@ -127,7 +123,7 @@ class TestNetworks(TestNetworksBase):
self.assertObject(expected, result)
def test_create_network_default(self):
if self._is_nova_network:
if self.is_nova_network:
self.skipTest('Skip network because of nova-network')
return
name = self._rand_name('testnetwork')
@ -139,8 +135,8 @@ class TestNetworks(TestNetworksBase):
self._delete_network(name)
def test_create_network_with_ip_range(self):
if self._is_nova_network:
self.skipTest('Skip network because of nova-network')
if self.is_nova_network:
self.skipTest('Skip because of nova-network cannot create network')
return
name = self._rand_name('testnetwork')
options = {
@ -153,7 +149,7 @@ class TestNetworks(TestNetworksBase):
self._delete_network(name)
def test_create_network_with_gateway(self):
if self._is_nova_network:
if self.is_nova_network:
self.skipTest('Skip network because of nova-network')
return
name = self._rand_name('testnetwork')
@ -167,7 +163,7 @@ class TestNetworks(TestNetworksBase):
self._delete_network(name)
def test_list_networks_by_filter_name(self):
if self._is_nova_network:
if self.is_nova_network:
self.skipTest('Skip network because of nova-network')
return
names = [self._rand_name('testnetwork') for _ in range(0, 3)]

View File

@ -301,6 +301,10 @@ class GCETestCase(base.BaseTestCase):
def is_real_gce(self):
return self._credentials_provider.is_google_auth
@property
def is_nova_network(self):
return self.cfg.networking == 'nova-network'
def _get_operations_request(self, name, project, zone, region):
if zone is not None:
return self.api.compute.zoneOperations().get(