Skip v2 block-storage tests when service is not found
We have skips in for the cloud functional tests. Add the same skips for the resource layer tests. Since the resource layer is tied to version 2, put in an explicit config override in the setUp to set block_storage_api_version. Shift block_storage v2 tests to use self.user_cloud and not self.conn. In doing so, test_type turns out to need admin privs, so use self.op_cloud for it. Change-Id: I8b231e7b5e0ebad6751702d9f0cb85e3d2192ac5
This commit is contained in:
parent
e18899d19f
commit
7027c17b32
|
@ -26,4 +26,8 @@ class BaseBlockStorageTest(base.BaseFunctionalTest):
|
|||
|
||||
def setUp(self):
|
||||
super(BaseBlockStorageTest, self).setUp()
|
||||
self.skipTest('block-storage v2 functional tests broken')
|
||||
self._set_user_cloud(block_storage_api_version='2')
|
||||
self._set_operator_cloud(block_storage_api_version='2')
|
||||
|
||||
if not self.user_cloud.has_service('block-storage'):
|
||||
self.skipTest('block-storage service not supported by cloud')
|
||||
|
|
|
@ -20,7 +20,7 @@ class TestBackup(base.BaseBlockStorageTest):
|
|||
def setUp(self):
|
||||
super(TestBackup, self).setUp()
|
||||
|
||||
if not self.conn.has_service('object-store'):
|
||||
if not self.user_cloud.has_service('object-store'):
|
||||
self.skipTest('Object service is requred, but not available')
|
||||
|
||||
self.VOLUME_NAME = self.getUniqueString()
|
||||
|
@ -28,10 +28,10 @@ class TestBackup(base.BaseBlockStorageTest):
|
|||
self.BACKUP_NAME = self.getUniqueString()
|
||||
self.BACKUP_ID = None
|
||||
|
||||
volume = self.conn.block_storage.create_volume(
|
||||
volume = self.user_cloud.block_storage.create_volume(
|
||||
name=self.VOLUME_NAME,
|
||||
size=1)
|
||||
self.conn.block_storage.wait_for_status(
|
||||
self.user_cloud.block_storage.wait_for_status(
|
||||
volume,
|
||||
status='available',
|
||||
failures=['error'],
|
||||
|
@ -40,10 +40,10 @@ class TestBackup(base.BaseBlockStorageTest):
|
|||
assert isinstance(volume, _volume.Volume)
|
||||
self.VOLUME_ID = volume.id
|
||||
|
||||
backup = self.conn.block_storage.create_backup(
|
||||
backup = self.user_cloud.block_storage.create_backup(
|
||||
name=self.BACKUP_NAME,
|
||||
volume_id=volume.id)
|
||||
self.conn.block_storage.wait_for_status(
|
||||
self.user_cloud.block_storage.wait_for_status(
|
||||
backup,
|
||||
status='available',
|
||||
failures=['error'],
|
||||
|
@ -54,15 +54,15 @@ class TestBackup(base.BaseBlockStorageTest):
|
|||
self.BACKUP_ID = backup.id
|
||||
|
||||
def tearDown(self):
|
||||
sot = self.conn.block_storage.delete_backup(
|
||||
sot = self.user_cloud.block_storage.delete_backup(
|
||||
self.BACKUP_ID,
|
||||
ignore_missing=False)
|
||||
sot = self.conn.block_storage.delete_volume(
|
||||
sot = self.user_cloud.block_storage.delete_volume(
|
||||
self.VOLUME_ID,
|
||||
ignore_missing=False)
|
||||
self.assertIsNone(sot)
|
||||
super(TestBackup, self).tearDown()
|
||||
|
||||
def test_get(self):
|
||||
sot = self.conn.block_storage.get_backup(self.BACKUP_ID)
|
||||
sot = self.user_cloud.block_storage.get_backup(self.BACKUP_ID)
|
||||
self.assertEqual(self.BACKUP_NAME, sot.name)
|
||||
|
|
|
@ -26,10 +26,10 @@ class TestSnapshot(base.BaseBlockStorageTest):
|
|||
self.VOLUME_NAME = self.getUniqueString()
|
||||
self.VOLUME_ID = None
|
||||
|
||||
volume = self.conn.block_storage.create_volume(
|
||||
volume = self.user_cloud.block_storage.create_volume(
|
||||
name=self.VOLUME_NAME,
|
||||
size=1)
|
||||
self.conn.block_storage.wait_for_status(
|
||||
self.user_cloud.block_storage.wait_for_status(
|
||||
volume,
|
||||
status='available',
|
||||
failures=['error'],
|
||||
|
@ -38,10 +38,10 @@ class TestSnapshot(base.BaseBlockStorageTest):
|
|||
assert isinstance(volume, _volume.Volume)
|
||||
self.assertEqual(self.VOLUME_NAME, volume.name)
|
||||
self.VOLUME_ID = volume.id
|
||||
snapshot = self.conn.block_storage.create_snapshot(
|
||||
snapshot = self.user_cloud.block_storage.create_snapshot(
|
||||
name=self.SNAPSHOT_NAME,
|
||||
volume_id=self.VOLUME_ID)
|
||||
self.conn.block_storage.wait_for_status(
|
||||
self.user_cloud.block_storage.wait_for_status(
|
||||
snapshot,
|
||||
status='available',
|
||||
failures=['error'],
|
||||
|
@ -52,17 +52,17 @@ class TestSnapshot(base.BaseBlockStorageTest):
|
|||
self.SNAPSHOT_ID = snapshot.id
|
||||
|
||||
def tearDown(self):
|
||||
snapshot = self.conn.block_storage.get_snapshot(self.SNAPSHOT_ID)
|
||||
sot = self.conn.block_storage.delete_snapshot(
|
||||
snapshot = self.user_cloud.block_storage.get_snapshot(self.SNAPSHOT_ID)
|
||||
sot = self.user_cloud.block_storage.delete_snapshot(
|
||||
snapshot, ignore_missing=False)
|
||||
self.conn.block_storage.wait_for_delete(
|
||||
self.user_cloud.block_storage.wait_for_delete(
|
||||
snapshot, interval=2, wait=self._wait_for_timeout)
|
||||
self.assertIsNone(sot)
|
||||
sot = self.conn.block_storage.delete_volume(
|
||||
sot = self.user_cloud.block_storage.delete_volume(
|
||||
self.VOLUME_ID, ignore_missing=False)
|
||||
self.assertIsNone(sot)
|
||||
super(TestSnapshot, self).tearDown()
|
||||
|
||||
def test_get(self):
|
||||
sot = self.conn.block_storage.get_snapshot(self.SNAPSHOT_ID)
|
||||
sot = self.user_cloud.block_storage.get_snapshot(self.SNAPSHOT_ID)
|
||||
self.assertEqual(self.SNAPSHOT_NAME, sot.name)
|
||||
|
|
|
@ -23,17 +23,18 @@ class TestType(base.BaseBlockStorageTest):
|
|||
self.TYPE_NAME = self.getUniqueString()
|
||||
self.TYPE_ID = None
|
||||
|
||||
sot = self.conn.block_storage.create_type(name=self.TYPE_NAME)
|
||||
sot = self.operator_cloud.block_storage.create_type(
|
||||
name=self.TYPE_NAME)
|
||||
assert isinstance(sot, _type.Type)
|
||||
self.assertEqual(self.TYPE_NAME, sot.name)
|
||||
self.TYPE_ID = sot.id
|
||||
|
||||
def tearDown(self):
|
||||
sot = self.conn.block_storage.delete_type(
|
||||
sot = self.operator_cloud.block_storage.delete_type(
|
||||
self.TYPE_ID, ignore_missing=False)
|
||||
self.assertIsNone(sot)
|
||||
super(TestType, self).tearDown()
|
||||
|
||||
def test_get(self):
|
||||
sot = self.conn.block_storage.get_type(self.TYPE_ID)
|
||||
sot = self.operator_cloud.block_storage.get_type(self.TYPE_ID)
|
||||
self.assertEqual(self.TYPE_NAME, sot.name)
|
||||
|
|
|
@ -19,13 +19,16 @@ class TestVolume(base.BaseBlockStorageTest):
|
|||
def setUp(self):
|
||||
super(TestVolume, self).setUp()
|
||||
|
||||
if not self.user_cloud.has_service('block-storage'):
|
||||
self.skipTest('block-storage service not supported by cloud')
|
||||
|
||||
self.VOLUME_NAME = self.getUniqueString()
|
||||
self.VOLUME_ID = None
|
||||
|
||||
volume = self.conn.block_storage.create_volume(
|
||||
volume = self.user_cloud.block_storage.create_volume(
|
||||
name=self.VOLUME_NAME,
|
||||
size=1)
|
||||
self.conn.block_storage.wait_for_status(
|
||||
self.user_cloud.block_storage.wait_for_status(
|
||||
volume,
|
||||
status='available',
|
||||
failures=['error'],
|
||||
|
@ -36,12 +39,12 @@ class TestVolume(base.BaseBlockStorageTest):
|
|||
self.VOLUME_ID = volume.id
|
||||
|
||||
def tearDown(self):
|
||||
sot = self.conn.block_storage.delete_volume(
|
||||
sot = self.user_cloud.block_storage.delete_volume(
|
||||
self.VOLUME_ID,
|
||||
ignore_missing=False)
|
||||
self.assertIsNone(sot)
|
||||
super(TestVolume, self).tearDown()
|
||||
|
||||
def test_get(self):
|
||||
sot = self.conn.block_storage.get_volume(self.VOLUME_ID)
|
||||
sot = self.user_cloud.block_storage.get_volume(self.VOLUME_ID)
|
||||
self.assertEqual(self.VOLUME_NAME, sot.name)
|
||||
|
|
Loading…
Reference in New Issue