ec2-api/ec2api/tests/unit/base.py

193 lines
7.8 KiB
Python

# Copyright 2014
# The Cloudscaling Group, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import mock
from oslo.config import cfg
from oslotest import base as test_base
import ec2api.api.apirequest
from ec2api.api import ec2utils
from ec2api.tests.unit import fakes
from ec2api.tests.unit import matchers
from ec2api.tests.unit import tools
import ec2api.wsgi
def skip_not_implemented(test_item):
def decorator(test_item):
test_item.skip('The feature is not yet implemented')
return decorator
class ApiTestCase(test_base.BaseTestCase):
def setUp(self):
super(ApiTestCase, self).setUp()
neutron_patcher = mock.patch('neutronclient.v2_0.client.Client')
self.neutron = neutron_patcher.start().return_value
self.addCleanup(neutron_patcher.stop)
nova_patcher = mock.patch('novaclient.v1_1.client.Client')
nova_mock = nova_patcher.start()
self.nova_availability_zones = (
nova_mock.return_value.availability_zones)
self.nova_servers = nova_mock.return_value.servers
self.nova_flavors = nova_mock.return_value.flavors
self.nova_floating_ips = nova_mock.return_value.floating_ips
self.nova_fixed_ips = nova_mock.return_value.fixed_ips
self.nova_key_pairs = nova_mock.return_value.keypairs
self.nova_security_groups = nova_mock.return_value.security_groups
self.nova_security_group_rules = (
nova_mock.return_value.security_group_rules)
self.nova_volumes = nova_mock.return_value.volumes
self.nova_quotas = nova_mock.return_value.quotas
self.addCleanup(nova_patcher.stop)
glance_patcher = mock.patch('glanceclient.client.Client')
self.glance = glance_patcher.start().return_value
self.addCleanup(glance_patcher.stop)
cinder_patcher = mock.patch('cinderclient.v1.client.Client')
self.cinder = cinder_patcher.start().return_value
self.addCleanup(cinder_patcher.stop)
db_api_patcher = mock.patch('ec2api.db.api.IMPL')
self.db_api = db_api_patcher.start()
self.addCleanup(db_api_patcher.stop)
isotime_patcher = mock.patch('ec2api.openstack.common.timeutils.'
'isotime')
self.isotime = isotime_patcher.start()
self.addCleanup(isotime_patcher.stop)
conf = cfg.CONF
conf.set_override('fatal_exception_format_errors', True)
self.addCleanup(conf.reset)
def execute(self, action, args):
ec2_request = ec2api.api.apirequest.APIRequest(action, 'fake_v1', args)
ec2_context = self._create_context()
environ = {'REQUEST_METHOD': 'FAKE',
'ec2.request': ec2_request,
'ec2api.context': ec2_context}
request = ec2api.wsgi.Request(environ)
response = request.send(ec2api.api.Executor())
return self._check_and_transform_response(response, action)
def _create_context(self):
return ec2api.context.RequestContext(
fakes.ID_OS_USER, fakes.ID_OS_PROJECT,
service_catalog=[{'type': 'network',
'endpoints': [{'publicUrl': 'fake_url'}]}])
def _check_and_transform_response(self, response, action):
body = tools.parse_xml(response.body)
if response.status_code == 200:
action_tag = '%sResponse' % action
self.assertIn(action_tag, body)
body = body.pop(action_tag)
self.assertIn('requestId', body)
body.pop('requestId')
else:
self.assertIn('Response', body)
body = body.pop('Response')
self.assertIn('RequestID', body)
body.pop('RequestID')
self.assertEqual(1, len(body))
self.assertIn('Errors', body)
body = body.pop('Errors')
self.assertEqual(1, len(body))
self.assertIn('Error', body)
self.assertEqual(2, len(body['Error']))
body['http_status_code'] = response.status_code
return body
def assert_any_call(self, func, *args, **kwargs):
calls = func.mock_calls
for call in calls:
call_args = call[1]
if matchers.ListMatches(call_args, args, orderless_lists=True):
return
self.assertEqual(False, True)
def check_filtering(self, operation, resultset_key, filters):
for name, value in filters:
resp = self.execute(operation,
{'Filter.1.Name': name,
'Filter.1.Value.1': str(value)})
self.assertEqual(200, resp['http_status_code'])
self.assertTrue(len(resp[resultset_key]) > 0,
'Filter by %s does not work' % name)
resp = self.execute(operation,
{'Filter.1.Name': name,
'Filter.1.Value.1': 'dummy filter value'})
self.assertEqual(200, resp['http_status_code'])
self.assertTrue(resp[resultset_key] is None or
len(resp[resultset_key]) == 0)
def check_tag_support(self, operation, resultset_key, sample_item_id,
id_key, item_kinds=[]):
self.db_api.get_tags = tools.CopyingMock(
return_value=[{'item_id': sample_item_id,
'key': 'fake_key',
'value': 'fake_value'}])
ec2_tags = [{'key': 'fake_key',
'value': 'fake_value'}]
resp = self.execute(operation, {})
self.assertEqual(200, resp['http_status_code'])
tag_found = False
if type(resultset_key) is list:
resp_items = itertools.chain(*(r[resultset_key[1]]
for r in resp[resultset_key[0]]))
else:
resp_items = resp[resultset_key]
resultset_key = [resultset_key]
for resp_item in resp_items:
if resp_item[id_key] == sample_item_id:
self.assertIn('tagSet', resp_item)
self.assertThat(resp_item['tagSet'],
matchers.ListMatches(ec2_tags))
tag_found = True
else:
self.assertTrue('tagSet' not in resp_item or
resp_item['tagSet'] == [])
self.assertTrue(tag_found)
if not item_kinds:
item_kinds = (ec2utils.get_ec2_id_kind(sample_item_id),)
self.assertTrue(self.db_api.get_tags.call_count == 1 and
(self.db_api.get_tags.mock_calls[0] in
(mock.call(mock.ANY, item_kinds, set()),
mock.call(mock.ANY, item_kinds, None))))
self.db_api.reset_mock()
id_param = '%s%s.1' % (id_key[0].capitalize(), id_key[1:])
resp = self.execute(operation, {id_param: sample_item_id})
self.assertEqual(200, resp['http_status_code'])
self.assertTrue(
self.db_api.get_tags.call_count == 1 and
(self.db_api.get_tags.mock_calls[0] in
(mock.call(mock.ANY, item_kinds, set([sample_item_id])),
mock.call(mock.ANY, item_kinds, [sample_item_id]))))
self.check_filtering(
operation, resultset_key[0],
[('tag-key', 'fake_key'),
('tag-value', 'fake_value'),
('tag:fake_key', 'fake_value')])