Merge "add simple snapshots' tests"

This commit is contained in:
Jenkins 2015-01-15 12:16:24 +00:00 committed by Gerrit Code Review
commit d8c2508624
4 changed files with 257 additions and 7 deletions

View File

@ -151,9 +151,6 @@ def _format_volume(context, volume, os_volume, instances={},
snapshot_id = snapshot['id']
ec2_volume['snapshotId'] = snapshot_id
# NOTE(apavlov): code doesn't support 'volumeType' and
# 'encrypted' fields for volume.
return ec2_volume

View File

@ -199,8 +199,10 @@ LOCATION_IMAGE_1 = 'fake_bucket/fake.img.manifest.xml'
# volumes constants
ID_EC2_VOLUME_1 = random_ec2_id('vol')
ID_EC2_VOLUME_2 = random_ec2_id('vol')
ID_EC2_VOLUME_3 = random_ec2_id('vol')
ID_OS_VOLUME_1 = random_os_id()
ID_OS_VOLUME_2 = random_os_id()
ID_OS_VOLUME_3 = random_os_id()
# snapshots constants
@ -1108,9 +1110,48 @@ OS_IMAGE_2 = {
# snapshot objects
class OSSnapshot(object):
def __init__(self, snapshot_dict):
self.id = snapshot_dict['id']
def __init__(self, snapshot):
self.id = snapshot['id']
self.status = snapshot['status']
self.volume_id = snapshot['volume_id']
self.created_at = snapshot['created_at']
self.progress = snapshot['progress']
self.project_id = ID_OS_PROJECT
self.size = snapshot['size']
self.display_description = snapshot['description']
def get(self):
pass
def delete(self):
pass
def update(self, *args, **kwargs):
pass
TIME_CREATE_SNAPSHOT_1 = timeutils.isotime(None, True)
TIME_CREATE_SNAPSHOT_2 = timeutils.isotime(None, True)
EC2_SNAPSHOT_1 = {
'description': None,
'volumeId': ID_EC2_VOLUME_2,
'status': 'completed',
'volumeSize': 1,
'progress': '100%',
'startTime': TIME_CREATE_SNAPSHOT_1,
'snapshotId': ID_EC2_SNAPSHOT_1,
'ownerId': ID_OS_PROJECT
}
EC2_SNAPSHOT_2 = {
'description': 'fake description',
'volumeId': ID_EC2_VOLUME_2,
'status': 'completed',
'volumeSize': 1,
'progress': '100%',
'startTime': TIME_CREATE_SNAPSHOT_2,
'snapshotId': ID_EC2_SNAPSHOT_2,
'ownerId': ID_OS_PROJECT
}
DB_SNAPSHOT_1 = {
'id': ID_EC2_SNAPSHOT_1,
@ -1123,9 +1164,21 @@ DB_SNAPSHOT_2 = {
OS_SNAPSHOT_1 = {
'id': ID_OS_SNAPSHOT_1,
'status': 'available',
'volume_id': ID_OS_VOLUME_2,
'created_at': TIME_CREATE_SNAPSHOT_1,
'progress': '100%',
'size': 1,
'description': None,
}
OS_SNAPSHOT_2 = {
'id': ID_OS_SNAPSHOT_2,
'status': 'available',
'volume_id': ID_OS_VOLUME_2,
'created_at': TIME_CREATE_SNAPSHOT_2,
'progress': '100%',
'size': 1,
'description': 'fake description',
}
@ -1244,6 +1297,7 @@ class CinderVolume(object):
TIME_CREATE_VOLUME_1 = timeutils.isotime(None, True)
TIME_CREATE_VOLUME_2 = timeutils.isotime(None, True)
TIME_CREATE_VOLUME_3 = timeutils.isotime(None, True)
VOLUME_AVAILABILITY_ZONE = 'fake_zone'
EC2_VOLUME_1 = {
@ -1264,6 +1318,15 @@ EC2_VOLUME_2 = {
'status': 'available',
'attachmentSet': [],
}
EC2_VOLUME_3 = {
'volumeId': ID_EC2_VOLUME_3,
'snapshotId': ID_EC2_SNAPSHOT_1,
'availabilityZone': VOLUME_AVAILABILITY_ZONE,
'createTime': TIME_CREATE_VOLUME_3,
'size': 1,
'status': 'available',
'attachmentSet': [],
}
DB_VOLUME_1 = {
'id': ID_EC2_VOLUME_1,
@ -1273,6 +1336,10 @@ DB_VOLUME_2 = {
'id': ID_EC2_VOLUME_2,
'os_id': ID_OS_VOLUME_2,
}
DB_VOLUME_3 = {
'id': ID_EC2_VOLUME_3,
'os_id': ID_OS_VOLUME_3,
}
OS_VOLUME_1 = {
'id': ID_OS_VOLUME_1,
@ -1281,7 +1348,8 @@ OS_VOLUME_1 = {
'size': 1,
'created_at': TIME_CREATE_VOLUME_1,
'display_name': 'test-vol-name',
'display_description': 'test-vol-desc'
'display_description': 'test-vol-desc',
'snapshot_id': None
}
OS_VOLUME_2 = {
'id': ID_OS_VOLUME_2,
@ -1290,7 +1358,18 @@ OS_VOLUME_2 = {
'size': 1,
'created_at': TIME_CREATE_VOLUME_2,
'display_name': 'test-vol-name',
'display_description': 'test-vol-desc'
'display_description': 'test-vol-desc',
'snapshot_id': None
}
OS_VOLUME_3 = {
'id': ID_OS_VOLUME_3,
'status': 'available',
'availability_zone': VOLUME_AVAILABILITY_ZONE,
'size': 1,
'created_at': TIME_CREATE_VOLUME_3,
'display_name': 'test-vol-name',
'display_description': 'test-vol-desc',
'snapshot_id': ID_OS_SNAPSHOT_1
}

View File

@ -0,0 +1,145 @@
# Copyright 2014
# The Cloudscaling Group, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from ec2api.tests import base
from ec2api.tests import fakes
from ec2api.tests import matchers
from ec2api.tests import tools
class SnapshotTestCase(base.ApiTestCase):
def test_describe_snapshots(self):
self.cinder.volume_snapshots.list.return_value = [
fakes.OSSnapshot(fakes.OS_SNAPSHOT_1),
fakes.OSSnapshot(fakes.OS_SNAPSHOT_2)]
self.db_api.get_items.side_effect = (
fakes.get_db_api_get_items({
'snap': [fakes.DB_SNAPSHOT_1, fakes.DB_SNAPSHOT_2],
'vol': [fakes.DB_VOLUME_2]}))
resp = self.execute('DescribeSnapshots', {})
self.assertEqual(200, resp['http_status_code'])
resp.pop('http_status_code')
self.assertThat(resp, matchers.DictMatches(
{'snapshotSet': [fakes.EC2_SNAPSHOT_1, fakes.EC2_SNAPSHOT_2]},
orderless_lists=True))
self.db_api.get_items.assert_any_call(mock.ANY, 'vol')
self.db_api.get_items_by_ids.side_effect = (
lambda context, kind, ids: [fakes.DB_SNAPSHOT_1])
self.db_api.get_items_by_ids = tools.CopyingMock(
side_effect=self.db_api.get_items_by_ids.side_effect)
resp = self.execute('DescribeSnapshots',
{'SnapshotId.1': fakes.ID_EC2_SNAPSHOT_1})
self.assertEqual(200, resp['http_status_code'])
resp.pop('http_status_code')
self.assertThat(resp, matchers.DictMatches(
{'snapshotSet': [fakes.EC2_SNAPSHOT_1]},
orderless_lists=True))
self.db_api.get_items_by_ids.assert_any_call(
mock.ANY, 'snap', set([fakes.ID_EC2_SNAPSHOT_1]))
def test_describe_snapshots_invalid_parameters(self):
self.cinder.volume_snapshots.list.return_value = [
fakes.OSSnapshot(fakes.OS_SNAPSHOT_1),
fakes.OSSnapshot(fakes.OS_SNAPSHOT_2)]
resp = self.execute('DescribeSnapshots',
{'SnapshotId.1': fakes.random_ec2_id('snap')})
self.assertEqual(400, resp['http_status_code'])
self.assertEqual('InvalidSnapshot.NotFound', resp['Error']['Code'])
self.cinder.volume_snapshots.list.side_effect = lambda: []
resp = self.execute('DescribeSnapshots',
{'SnapshotId.1': fakes.ID_EC2_SNAPSHOT_1})
self.assertEqual(400, resp['http_status_code'])
self.assertEqual('InvalidSnapshot.NotFound', resp['Error']['Code'])
def test_create_snapshot_from_volume(self):
self.cinder.volume_snapshots.create.return_value = (
fakes.OSSnapshot(fakes.OS_SNAPSHOT_1))
self.db_api.add_item.side_effect = (
fakes.get_db_api_add_item(fakes.ID_EC2_SNAPSHOT_1))
self.db_api.get_item_by_id.side_effect = (
fakes.get_db_api_get_item_by_id({
fakes.ID_EC2_VOLUME_2: fakes.DB_VOLUME_2}))
self.cinder.volumes.get.side_effect = (
lambda vol_id: (fakes.CinderVolume(fakes.OS_VOLUME_2)
if vol_id == fakes.ID_OS_VOLUME_2
else None))
resp = self.execute(
'CreateSnapshot',
{'VolumeId': fakes.ID_EC2_VOLUME_2})
self.assertEqual(200, resp['http_status_code'])
self.assertThat(fakes.EC2_SNAPSHOT_1, matchers.DictMatches(
tools.purge_dict(resp, {'http_status_code'})))
self.cinder.volume_snapshots.create.assert_called_once(mock.ANY)
self.db_api.add_item.assert_called_once_with(
mock.ANY, 'snap',
tools.purge_dict(fakes.DB_SNAPSHOT_1, ('id',)))
self.cinder.volume_snapshots.create.assert_called_once_with(
fakes.ID_OS_VOLUME_2, force=True, display_description=None)
def test_format_snapshot_maps_status(self):
fake_snapshot = fakes.OSSnapshot(fakes.OS_SNAPSHOT_1)
self.cinder.volume_snapshots.list.return_value = [fake_snapshot]
self.db_api.get_items.side_effect = (
fakes.get_db_api_get_items({
'snap': [fakes.DB_SNAPSHOT_1],
'vol': [fakes.DB_VOLUME_2]}))
fake_snapshot.status = 'new'
resp = self.execute('DescribeSnapshots', {})
self.assertEqual(200, resp['http_status_code'])
self.assertEqual('pending', resp['snapshotSet'][0]['status'])
fake_snapshot.status = 'creating'
resp = self.execute('DescribeSnapshots', {})
self.assertEqual(200, resp['http_status_code'])
self.assertEqual('pending', resp['snapshotSet'][0]['status'])
fake_snapshot.status = 'available'
resp = self.execute('DescribeSnapshots', {})
self.assertEqual(200, resp['http_status_code'])
self.assertEqual('completed', resp['snapshotSet'][0]['status'])
fake_snapshot.status = 'active'
resp = self.execute('DescribeSnapshots', {})
self.assertEqual(200, resp['http_status_code'])
self.assertEqual('completed', resp['snapshotSet'][0]['status'])
fake_snapshot.status = 'deleting'
resp = self.execute('DescribeSnapshots', {})
self.assertEqual(200, resp['http_status_code'])
self.assertEqual('pending', resp['snapshotSet'][0]['status'])
fake_snapshot.status = 'error'
resp = self.execute('DescribeSnapshots', {})
self.assertEqual(200, resp['http_status_code'])
self.assertEqual('error', resp['snapshotSet'][0]['status'])
fake_snapshot.status = 'banana'
resp = self.execute('DescribeSnapshots', {})
self.assertEqual(200, resp['http_status_code'])
self.assertEqual('banana', resp['snapshotSet'][0]['status'])

View File

@ -89,6 +89,35 @@ class VolumeTestCase(base.ApiTestCase):
mock.ANY, 'vol',
tools.purge_dict(fakes.DB_VOLUME_1, ('id',)))
self.cinder.volumes.create.assert_called_once_with(
None, snapshot_id=None, volume_type=None,
availability_zone=fakes.VOLUME_AVAILABILITY_ZONE)
def test_create_volume_from_snapshot(self):
self.cinder.volumes.create.return_value = (
fakes.CinderVolume(fakes.OS_VOLUME_3))
self.db_api.add_item.side_effect = (
fakes.get_db_api_add_item(fakes.ID_EC2_VOLUME_3))
self.db_api.get_item_by_id.side_effect = (
fakes.get_db_api_get_item_by_id({
fakes.ID_EC2_SNAPSHOT_1: fakes.DB_SNAPSHOT_1}))
resp = self.execute(
'CreateVolume',
{'AvailabilityZone': fakes.VOLUME_AVAILABILITY_ZONE,
'SnapshotId': fakes.ID_EC2_SNAPSHOT_1})
self.assertEqual(200, resp['http_status_code'])
self.assertThat(fakes.EC2_VOLUME_3, matchers.DictMatches(
tools.purge_dict(resp, {'http_status_code'})))
self.cinder.volumes.create.assert_called_once(mock.ANY)
self.db_api.add_item.assert_called_once_with(
mock.ANY, 'vol',
tools.purge_dict(fakes.DB_VOLUME_3, ('id',)))
self.cinder.volumes.create.assert_called_once_with(
None, snapshot_id=fakes.ID_OS_SNAPSHOT_1, volume_type=None,
availability_zone=fakes.VOLUME_AVAILABILITY_ZONE)
def test_format_volume_maps_status(self):
fake_volume = fakes.CinderVolume(fakes.OS_VOLUME_1)
self.cinder.volumes.list.return_value = [fake_volume]