108 lines
4.2 KiB
Python
108 lines
4.2 KiB
Python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
|
|
|
# Copyright 2010 United States Government as represented by the
|
|
# Administrator of the National Aeronautics and Space Administration.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from eventlet import greenthread
|
|
|
|
from nova import context
|
|
from nova import db
|
|
from nova import flags
|
|
from nova import log as logging
|
|
from nova import rpc
|
|
from nova import test
|
|
from nova import utils
|
|
from nova.auth import manager
|
|
from nova.api.ec2 import admin
|
|
from nova.image import fake
|
|
|
|
|
|
FLAGS = flags.FLAGS
|
|
LOG = logging.getLogger('nova.tests.adminapi')
|
|
|
|
|
|
class AdminApiTestCase(test.TestCase):
|
|
def setUp(self):
|
|
super(AdminApiTestCase, self).setUp()
|
|
self.flags(connection_type='fake')
|
|
|
|
self.conn = rpc.Connection.instance()
|
|
|
|
# set up our cloud
|
|
self.api = admin.AdminController()
|
|
|
|
# set up services
|
|
self.compute = self.start_service('compute')
|
|
self.scheduter = self.start_service('scheduler')
|
|
self.network = self.start_service('network')
|
|
self.volume = self.start_service('volume')
|
|
self.image_service = utils.import_object(FLAGS.image_service)
|
|
|
|
self.manager = manager.AuthManager()
|
|
self.user = self.manager.create_user('admin', 'admin', 'admin', True)
|
|
self.project = self.manager.create_project('proj', 'admin', 'proj')
|
|
self.context = context.RequestContext(user=self.user,
|
|
project=self.project)
|
|
|
|
def fake_show(meh, context, id):
|
|
return {'id': 1, 'properties': {'kernel_id': 1, 'ramdisk_id': 1,
|
|
'type': 'machine', 'image_state': 'available'}}
|
|
|
|
self.stubs.Set(fake._FakeImageService, 'show', fake_show)
|
|
self.stubs.Set(fake._FakeImageService, 'show_by_name', fake_show)
|
|
|
|
# NOTE(vish): set up a manual wait so rpc.cast has a chance to finish
|
|
rpc_cast = rpc.cast
|
|
|
|
def finish_cast(*args, **kwargs):
|
|
rpc_cast(*args, **kwargs)
|
|
greenthread.sleep(0.2)
|
|
|
|
self.stubs.Set(rpc, 'cast', finish_cast)
|
|
|
|
def tearDown(self):
|
|
self.manager.delete_project(self.project)
|
|
self.manager.delete_user(self.user)
|
|
super(AdminApiTestCase, self).tearDown()
|
|
|
|
def test_block_external_ips(self):
|
|
"""Make sure provider firewall rules are created."""
|
|
result = self.api.block_external_addresses(self.context, '1.1.1.1/32')
|
|
self.api.remove_external_address_block(self.context, '1.1.1.1/32')
|
|
self.assertEqual('OK', result['status'])
|
|
self.assertEqual('Added 3 rules', result['message'])
|
|
|
|
def test_list_blocked_ips(self):
|
|
"""Make sure we can see the external blocks that exist."""
|
|
self.api.block_external_addresses(self.context, '1.1.1.2/32')
|
|
result = self.api.describe_external_address_blocks(self.context)
|
|
num = len(db.provider_fw_rule_get_all(self.context))
|
|
self.api.remove_external_address_block(self.context, '1.1.1.2/32')
|
|
# we only list IP, not tcp/udp/icmp rules
|
|
self.assertEqual(num / 3, len(result['externalIpBlockInfo']))
|
|
|
|
def test_remove_ip_block(self):
|
|
"""Remove ip blocks."""
|
|
result = self.api.block_external_addresses(self.context, '1.1.1.3/32')
|
|
self.assertEqual('OK', result['status'])
|
|
num0 = len(db.provider_fw_rule_get_all(self.context))
|
|
result = self.api.remove_external_address_block(self.context,
|
|
'1.1.1.3/32')
|
|
self.assertEqual('OK', result['status'])
|
|
self.assertEqual('Deleted 3 rules', result['message'])
|
|
num1 = len(db.provider_fw_rule_get_all(self.context))
|
|
self.assert_(num1 < num0)
|