Skip v2 block-storage tests when service is not found

We have skips in for the cloud functional tests. Add the same skips for the
resource layer tests.

Since the resource layer is tied to version 2, put in an explicit
config override in the setUp to set block_storage_api_version.

Shift block_storage v2 tests to use self.user_cloud and not self.conn.

In doing so, test_type turns out to need admin privs, so use self.op_cloud
for it.

Change-Id: I8b231e7b5e0ebad6751702d9f0cb85e3d2192ac5
This commit is contained in:
Monty Taylor 2019-01-07 13:45:08 +00:00
parent e18899d19f
commit 7027c17b32
No known key found for this signature in database
GPG Key ID: 7BAE94BC7141A594
5 changed files with 33 additions and 25 deletions

View File

@ -26,4 +26,8 @@ class BaseBlockStorageTest(base.BaseFunctionalTest):
def setUp(self):
super(BaseBlockStorageTest, self).setUp()
self.skipTest('block-storage v2 functional tests broken')
self._set_user_cloud(block_storage_api_version='2')
self._set_operator_cloud(block_storage_api_version='2')
if not self.user_cloud.has_service('block-storage'):
self.skipTest('block-storage service not supported by cloud')

View File

@ -20,7 +20,7 @@ class TestBackup(base.BaseBlockStorageTest):
def setUp(self):
super(TestBackup, self).setUp()
if not self.conn.has_service('object-store'):
if not self.user_cloud.has_service('object-store'):
self.skipTest('Object service is requred, but not available')
self.VOLUME_NAME = self.getUniqueString()
@ -28,10 +28,10 @@ class TestBackup(base.BaseBlockStorageTest):
self.BACKUP_NAME = self.getUniqueString()
self.BACKUP_ID = None
volume = self.conn.block_storage.create_volume(
volume = self.user_cloud.block_storage.create_volume(
name=self.VOLUME_NAME,
size=1)
self.conn.block_storage.wait_for_status(
self.user_cloud.block_storage.wait_for_status(
volume,
status='available',
failures=['error'],
@ -40,10 +40,10 @@ class TestBackup(base.BaseBlockStorageTest):
assert isinstance(volume, _volume.Volume)
self.VOLUME_ID = volume.id
backup = self.conn.block_storage.create_backup(
backup = self.user_cloud.block_storage.create_backup(
name=self.BACKUP_NAME,
volume_id=volume.id)
self.conn.block_storage.wait_for_status(
self.user_cloud.block_storage.wait_for_status(
backup,
status='available',
failures=['error'],
@ -54,15 +54,15 @@ class TestBackup(base.BaseBlockStorageTest):
self.BACKUP_ID = backup.id
def tearDown(self):
sot = self.conn.block_storage.delete_backup(
sot = self.user_cloud.block_storage.delete_backup(
self.BACKUP_ID,
ignore_missing=False)
sot = self.conn.block_storage.delete_volume(
sot = self.user_cloud.block_storage.delete_volume(
self.VOLUME_ID,
ignore_missing=False)
self.assertIsNone(sot)
super(TestBackup, self).tearDown()
def test_get(self):
sot = self.conn.block_storage.get_backup(self.BACKUP_ID)
sot = self.user_cloud.block_storage.get_backup(self.BACKUP_ID)
self.assertEqual(self.BACKUP_NAME, sot.name)

View File

@ -26,10 +26,10 @@ class TestSnapshot(base.BaseBlockStorageTest):
self.VOLUME_NAME = self.getUniqueString()
self.VOLUME_ID = None
volume = self.conn.block_storage.create_volume(
volume = self.user_cloud.block_storage.create_volume(
name=self.VOLUME_NAME,
size=1)
self.conn.block_storage.wait_for_status(
self.user_cloud.block_storage.wait_for_status(
volume,
status='available',
failures=['error'],
@ -38,10 +38,10 @@ class TestSnapshot(base.BaseBlockStorageTest):
assert isinstance(volume, _volume.Volume)
self.assertEqual(self.VOLUME_NAME, volume.name)
self.VOLUME_ID = volume.id
snapshot = self.conn.block_storage.create_snapshot(
snapshot = self.user_cloud.block_storage.create_snapshot(
name=self.SNAPSHOT_NAME,
volume_id=self.VOLUME_ID)
self.conn.block_storage.wait_for_status(
self.user_cloud.block_storage.wait_for_status(
snapshot,
status='available',
failures=['error'],
@ -52,17 +52,17 @@ class TestSnapshot(base.BaseBlockStorageTest):
self.SNAPSHOT_ID = snapshot.id
def tearDown(self):
snapshot = self.conn.block_storage.get_snapshot(self.SNAPSHOT_ID)
sot = self.conn.block_storage.delete_snapshot(
snapshot = self.user_cloud.block_storage.get_snapshot(self.SNAPSHOT_ID)
sot = self.user_cloud.block_storage.delete_snapshot(
snapshot, ignore_missing=False)
self.conn.block_storage.wait_for_delete(
self.user_cloud.block_storage.wait_for_delete(
snapshot, interval=2, wait=self._wait_for_timeout)
self.assertIsNone(sot)
sot = self.conn.block_storage.delete_volume(
sot = self.user_cloud.block_storage.delete_volume(
self.VOLUME_ID, ignore_missing=False)
self.assertIsNone(sot)
super(TestSnapshot, self).tearDown()
def test_get(self):
sot = self.conn.block_storage.get_snapshot(self.SNAPSHOT_ID)
sot = self.user_cloud.block_storage.get_snapshot(self.SNAPSHOT_ID)
self.assertEqual(self.SNAPSHOT_NAME, sot.name)

View File

@ -23,17 +23,18 @@ class TestType(base.BaseBlockStorageTest):
self.TYPE_NAME = self.getUniqueString()
self.TYPE_ID = None
sot = self.conn.block_storage.create_type(name=self.TYPE_NAME)
sot = self.operator_cloud.block_storage.create_type(
name=self.TYPE_NAME)
assert isinstance(sot, _type.Type)
self.assertEqual(self.TYPE_NAME, sot.name)
self.TYPE_ID = sot.id
def tearDown(self):
sot = self.conn.block_storage.delete_type(
sot = self.operator_cloud.block_storage.delete_type(
self.TYPE_ID, ignore_missing=False)
self.assertIsNone(sot)
super(TestType, self).tearDown()
def test_get(self):
sot = self.conn.block_storage.get_type(self.TYPE_ID)
sot = self.operator_cloud.block_storage.get_type(self.TYPE_ID)
self.assertEqual(self.TYPE_NAME, sot.name)

View File

@ -19,13 +19,16 @@ class TestVolume(base.BaseBlockStorageTest):
def setUp(self):
super(TestVolume, self).setUp()
if not self.user_cloud.has_service('block-storage'):
self.skipTest('block-storage service not supported by cloud')
self.VOLUME_NAME = self.getUniqueString()
self.VOLUME_ID = None
volume = self.conn.block_storage.create_volume(
volume = self.user_cloud.block_storage.create_volume(
name=self.VOLUME_NAME,
size=1)
self.conn.block_storage.wait_for_status(
self.user_cloud.block_storage.wait_for_status(
volume,
status='available',
failures=['error'],
@ -36,12 +39,12 @@ class TestVolume(base.BaseBlockStorageTest):
self.VOLUME_ID = volume.id
def tearDown(self):
sot = self.conn.block_storage.delete_volume(
sot = self.user_cloud.block_storage.delete_volume(
self.VOLUME_ID,
ignore_missing=False)
self.assertIsNone(sot)
super(TestVolume, self).tearDown()
def test_get(self):
sot = self.conn.block_storage.get_volume(self.VOLUME_ID)
sot = self.user_cloud.block_storage.get_volume(self.VOLUME_ID)
self.assertEqual(self.VOLUME_NAME, sot.name)