153 lines
6.2 KiB
Python
153 lines
6.2 KiB
Python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
|
|
|
# Copyright 2010 United States Government as represented by the
|
|
# Administrator of the National Aeronautics and Space Administration.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import logging
|
|
|
|
from nova import context
|
|
from nova import db
|
|
from nova import exception
|
|
from nova import flags
|
|
from nova import quota
|
|
from nova import test
|
|
from nova import utils
|
|
from nova.auth import manager
|
|
from nova.api.ec2 import cloud
|
|
|
|
|
|
FLAGS = flags.FLAGS
|
|
|
|
|
|
class QuotaTestCase(test.TrialTestCase):
|
|
def setUp(self):
|
|
logging.getLogger().setLevel(logging.DEBUG)
|
|
super(QuotaTestCase, self).setUp()
|
|
self.flags(connection_type='fake',
|
|
quota_instances=2,
|
|
quota_cores=4,
|
|
quota_volumes=2,
|
|
quota_gigabytes=20,
|
|
quota_floating_ips=1)
|
|
|
|
self.cloud = cloud.CloudController()
|
|
self.manager = manager.AuthManager()
|
|
self.user = self.manager.create_user('admin', 'admin', 'admin', True)
|
|
self.project = self.manager.create_project('admin', 'admin', 'admin')
|
|
self.network = utils.import_object(FLAGS.network_manager)
|
|
self.context = context.RequestContext(project=self.project,
|
|
user=self.user)
|
|
|
|
def tearDown(self):
|
|
manager.AuthManager().delete_project(self.project)
|
|
manager.AuthManager().delete_user(self.user)
|
|
super(QuotaTestCase, self).tearDown()
|
|
|
|
def _create_instance(self, cores=2):
|
|
"""Create a test instance"""
|
|
inst = {}
|
|
inst['image_id'] = 'ami-test'
|
|
inst['reservation_id'] = 'r-fakeres'
|
|
inst['user_id'] = self.user.id
|
|
inst['project_id'] = self.project.id
|
|
inst['instance_type'] = 'm1.large'
|
|
inst['vcpus'] = cores
|
|
inst['mac_address'] = utils.generate_mac()
|
|
return db.instance_create(self.context, inst)['id']
|
|
|
|
def _create_volume(self, size=10):
|
|
"""Create a test volume"""
|
|
vol = {}
|
|
vol['user_id'] = self.user.id
|
|
vol['project_id'] = self.project.id
|
|
vol['size'] = size
|
|
return db.volume_create(self.context, vol)['id']
|
|
|
|
def test_quota_overrides(self):
|
|
"""Make sure overriding a projects quotas works"""
|
|
num_instances = quota.allowed_instances(self.context, 100, 'm1.small')
|
|
self.assertEqual(num_instances, 2)
|
|
db.quota_create(self.context, {'project_id': self.project.id,
|
|
'instances': 10})
|
|
num_instances = quota.allowed_instances(self.context, 100, 'm1.small')
|
|
self.assertEqual(num_instances, 4)
|
|
db.quota_update(self.context, self.project.id, {'cores': 100})
|
|
num_instances = quota.allowed_instances(self.context, 100, 'm1.small')
|
|
self.assertEqual(num_instances, 10)
|
|
db.quota_destroy(self.context, self.project.id)
|
|
|
|
def test_too_many_instances(self):
|
|
instance_ids = []
|
|
for i in range(FLAGS.quota_instances):
|
|
instance_id = self._create_instance()
|
|
instance_ids.append(instance_id)
|
|
self.assertRaises(cloud.QuotaError, self.cloud.run_instances,
|
|
self.context,
|
|
min_count=1,
|
|
max_count=1,
|
|
instance_type='m1.small')
|
|
for instance_id in instance_ids:
|
|
db.instance_destroy(self.context, instance_id)
|
|
|
|
def test_too_many_cores(self):
|
|
instance_ids = []
|
|
instance_id = self._create_instance(cores=4)
|
|
instance_ids.append(instance_id)
|
|
self.assertRaises(cloud.QuotaError, self.cloud.run_instances,
|
|
self.context,
|
|
min_count=1,
|
|
max_count=1,
|
|
instance_type='m1.small')
|
|
for instance_id in instance_ids:
|
|
db.instance_destroy(self.context, instance_id)
|
|
|
|
def test_too_many_volumes(self):
|
|
volume_ids = []
|
|
for i in range(FLAGS.quota_volumes):
|
|
volume_id = self._create_volume()
|
|
volume_ids.append(volume_id)
|
|
self.assertRaises(cloud.QuotaError, self.cloud.create_volume,
|
|
self.context,
|
|
size=10)
|
|
for volume_id in volume_ids:
|
|
db.volume_destroy(self.context, volume_id)
|
|
|
|
def test_too_many_gigabytes(self):
|
|
volume_ids = []
|
|
volume_id = self._create_volume(size=20)
|
|
volume_ids.append(volume_id)
|
|
self.assertRaises(cloud.QuotaError,
|
|
self.cloud.create_volume,
|
|
self.context,
|
|
size=10)
|
|
for volume_id in volume_ids:
|
|
db.volume_destroy(self.context, volume_id)
|
|
|
|
def test_too_many_addresses(self):
|
|
address = '192.168.0.100'
|
|
try:
|
|
db.floating_ip_get_by_address(context.get_admin_context(), address)
|
|
except exception.NotFound:
|
|
db.floating_ip_create(context.get_admin_context(), {'address': address,
|
|
'host': FLAGS.host})
|
|
float_addr = self.network.allocate_floating_ip(self.context,
|
|
self.project.id)
|
|
# NOTE(vish): This assert never fails. When cloud attempts to
|
|
# make an rpc.call, the test just finishes with OK. It
|
|
# appears to be something in the magic inline callbacks
|
|
# that is breaking.
|
|
self.assertRaises(cloud.QuotaError, self.cloud.allocate_address, self.context)
|