Enhance unit tests for zfssa drivers

This fix improves the unit tests for the zfssa drivers
and increases the test coverage.

Change-Id: I5a143b7ca4bf8e6560c7e389e1fd9b0e23e4ea16
Closes-Bug: #1404339
This commit is contained in:
Abhiram Moturi 2015-07-21 00:58:23 +00:00
parent 814cbb8a93
commit af3a73e720
2 changed files with 479 additions and 314 deletions

View File

@ -1,4 +1,4 @@
# Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2014, 2015, Oracle and/or its affiliates. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -17,11 +17,14 @@ import json
import mock
from oslo_utils import units
import six
from cinder import exception
from cinder import test
from cinder.tests.unit import fake_utils
from cinder.volume import configuration as conf
from cinder.volume.drivers.zfssa import restclient as client
from cinder.volume.drivers.zfssa import webdavclient
from cinder.volume.drivers.zfssa import zfssaiscsi as iscsi
from cinder.volume.drivers.zfssa import zfssanfs
from cinder.volume.drivers.zfssa import zfssarest as rest
@ -31,267 +34,15 @@ nfs_logbias = 'latency'
nfs_compression = 'off'
class FakeZFSSA(object):
"""Fake ZFS SA."""
def __init__(self):
self.user = None
self.host = None
def login(self, user):
self.user = user
def set_host(self, host, timeout=None):
self.host = host
def create_project(self, pool, project, compression, logbias):
out = {}
if not self.host or not self.user:
return out
out = {"status": "online",
"name": "pool",
"usage": {"available": 10,
"total": 10,
"dedupratio": 100,
"used": 1},
"peer": "00000000-0000-0000-0000-000000000000",
"owner": "host",
"asn": "11111111-2222-3333-4444-555555555555"}
return out
def create_initiator(self, init, initgrp, chapuser, chapsecret):
out = {}
if not self.host or not self.user:
return out
out = {"href": "fake_href",
"alias": "fake_alias",
"initiator": "fake_iqn.1993-08.org.fake:01:000000000000",
"chapuser": "",
"chapsecret": ""
}
return out
def add_to_initiatorgroup(self, init, initgrp):
r = rest.ZFSSAApi()
type(r).rclient = mock.PropertyMock(return_value=FakeAddIni2InitGrp())
r.add_to_initiatorgroup(init, initgrp)
def create_target(self, tgtalias, inter, tchapuser, tchapsecret):
out = {}
if not self.host or not self.user:
return out
out = {"href": "fake_href",
"alias": "fake_tgtgrp",
"iqn": "iqn.1986-03.com.sun:02:00000-aaaa-bbbb-cccc-ddddd",
"auth": "none",
"targetchapuser": "",
"targetchapsecret": "",
"interfaces": ["eth0"]
}
return out
def add_to_targetgroup(self, iqn, tgtgrp):
out = {}
if not self.host or not self.user:
return {}
out = {"href": "fake_href",
"name": "fake_tgtgrp",
"targets": ["iqn.1986-03.com.sun:02:00000-aaaa-bbbb-cccc-ddddd"]
}
return out
def get_lun(self, pool, project, lun):
ret = {
'guid': '600144F0F8FBD5BD000053CE53AB0001',
'number': 0,
'initiatorgroup': 'fake_initgrp',
'size': 1 * units.Gi
}
return ret
def get_target(self, target):
return 'iqn.1986-03.com.sun:02:00000-aaaa-bbbb-cccc-ddddd'
def create_lun(self, pool, project, lun, volsize, targetgroup, specs):
out = {}
if not self.host and not self.user:
return out
out = {
"status": "online",
"lunguid": "600144F0F8FBD5BD000053CE53AB0001",
"initiatorgroup": ["fake_initgrp"],
"volsize": volsize,
"pool": pool,
"name": lun,
"project": project,
"targetgroup": targetgroup,
"lun": {"assignednumber": 0},
}
if specs:
out.update(specs)
return out
def delete_lun(self, pool, project, lun):
out = {}
if not self.host and not self.user:
return out
out = {"pool": pool,
"project": project,
"name": lun}
return out
def create_snapshot(self, pool, project, vol, snap):
out = {}
if not self.host and not self.user:
return {}
out = {"name": snap,
"numclones": 0,
"share": vol,
"project": project,
"pool": pool}
return out
def delete_snapshot(self, pool, project, vol, snap):
out = {}
if not self.host and not self.user:
return {}
out = {"name": snap,
"share": vol,
"project": project,
"pool": pool}
return out
def clone_snapshot(self, pool, project, pvol, snap, vol):
out = {}
if not self.host and not self.user:
return out
out = {"origin": {"project": project,
"snapshot": snap,
"share": pvol,
"pool": pool},
"logbias": "latency",
"assignednumber": 1,
"status": "online",
"lunguid": "600144F0F8FBD5BD000053CE67A50002",
"volsize": 1,
"pool": pool,
"name": vol,
"project": project}
return out
def set_lun_initiatorgroup(self, pool, project, vol, initgrp):
out = {}
if not self.host and not self.user:
return out
out = {"lunguid": "600144F0F8FBD5BD000053CE67A50002",
"pool": pool,
"name": vol,
"project": project,
"initiatorgroup": ["fake_initgrp"]}
return out
def has_clones(self, pool, project, vol, snapshot):
return False
def set_lun_props(self, pool, project, vol, **kargs):
out = {}
if not self.host and not self.user:
return out
out = {"pool": pool,
"name": vol,
"project": project,
"volsize": kargs['volsize']}
return out
def get_initiator_initiatorgroup(self, initiator):
ret = ['test-init-grp1']
return ret
class FakeResponse(object):
def __init__(self, statuscode, data='data'):
self.status = statuscode
self.data = data
class FakeNFSZFSSA(FakeZFSSA):
"""Fake ZFS SA for the NFS Driver."""
def set_webdav(self, https_path, auth_str):
self.webdavclient = https_path
def create_share(self, pool, project, share, args):
out = {}
if not self.host and not self.user:
return out
out = {"logbias": nfs_logbias,
"compression": nfs_compression,
"status": "online",
"pool": pool,
"name": share,
"project": project,
"mountpoint": '/export/nfs_share'}
return out
def get_share(self, pool, project, share):
out = {}
if not self.host and not self.user:
return out
out = {"logbias": nfs_logbias,
"compression": nfs_compression,
"encryption": "off",
"status": "online",
"pool": pool,
"name": share,
"project": project,
"mountpoint": '/export/nfs_share'}
return out
def create_snapshot_of_volume_file(self, src_file="", dst_file=""):
out = {}
if not self.host and not self.user:
return out
out = {"status": 201}
return out
def delete_snapshot_of_volume_file(self, src_file=""):
out = {}
if not self.host and not self.user:
return out
out = {"status": 204}
return out
def create_volume_from_snapshot_file(self, src_file="", dst_file="",
method='COPY'):
out = {}
if not self.host and not self.user:
return out
out = {"status": 202}
return out
def modify_service(self, service, args):
out = {}
if not self.host and not self.user:
return out
out = {"service": {"<status>": "online"}}
return out
def enable_service(self, service):
out = {}
if not self.host and not self.user:
return out
out = {"service": {"<status>": "online"}}
return out
class FakeSSL(object):
def _create_unverified_context(self):
return 'fakecontext'
class TestZFSSAISCSIDriver(test.TestCase):
@ -321,7 +72,7 @@ class TestZFSSAISCSIDriver(test.TestCase):
def setUp(self, _factory_zfssa):
super(TestZFSSAISCSIDriver, self).setUp()
self._create_fake_config()
_factory_zfssa.return_value = FakeZFSSA()
_factory_zfssa.return_value = mock.MagicMock(spec=rest.ZFSSAApi)
iscsi.ZFSSAISCSIDriver._execute = fake_utils.fake_execute
self.drv = iscsi.ZFSSAISCSIDriver(configuration=self.configuration)
self.drv.do_setup({})
@ -355,45 +106,117 @@ class TestZFSSAISCSIDriver(test.TestCase):
self.configuration.safe_get = self.fake_safe_get
def test_create_delete_volume(self):
self.drv.zfssa.get_lun.return_value = {'guid':
'00000000000000000000000000000',
'number': 0,
'initiatorgroup': 'default',
'size': 1,
'nodestroy': False}
lcfg = self.configuration
self.drv.create_volume(self.test_vol)
self.drv.zfssa.create_lun.assert_called_once_with(
lcfg.zfssa_pool,
lcfg.zfssa_project,
self.test_vol['name'],
six.text_type(self.test_vol['size']) + 'g',
lcfg.zfssa_target_group,
mock.ANY)
self.drv.delete_volume(self.test_vol)
self.drv.zfssa.get_lun.assert_called_once_with(lcfg.zfssa_pool,
lcfg.zfssa_project,
self.test_vol['name'])
self.drv.zfssa.delete_lun.assert_called_once_with(
pool=lcfg.zfssa_pool,
project=lcfg.zfssa_project,
lun=self.test_vol['name'])
def test_create_delete_snapshot(self):
self.drv.create_volume(self.test_vol)
self.drv.zfssa.has_clones.return_value = False
lcfg = self.configuration
self.drv.create_snapshot(self.test_snap)
self.drv.zfssa.create_snapshot.assert_called_once_with(
lcfg.zfssa_pool,
lcfg.zfssa_project,
self.test_snap['volume_name'],
self.test_snap['name'])
self.drv.delete_snapshot(self.test_snap)
self.drv.delete_volume(self.test_vol)
self.drv.zfssa.delete_snapshot.assert_called_once_with(
lcfg.zfssa_pool,
lcfg.zfssa_project,
self.test_snap['volume_name'],
self.test_snap['name'])
def test_create_volume_from_snapshot(self):
self.drv.create_volume(self.test_vol)
@mock.patch.object(iscsi.ZFSSAISCSIDriver, '_verify_clone_size')
def test_create_volume_from_snapshot(self, _verify_clone_size):
self.drv._verify_clone_size.return_value = True
lcfg = self.configuration
self.drv.create_snapshot(self.test_snap)
self.drv.zfssa.create_snapshot.assert_called_once_with(
lcfg.zfssa_pool,
lcfg.zfssa_project,
self.test_snap['volume_name'],
self.test_snap['name'])
self.drv.create_volume_from_snapshot(self.test_vol_snap,
self.test_snap)
self.drv.delete_volume(self.test_vol)
self.drv._verify_clone_size.assert_called_once_with(
self.test_snap,
self.test_vol_snap['size'] * units.Gi)
self.drv.zfssa.clone_snapshot.assert_called_once_with(
lcfg.zfssa_pool,
lcfg.zfssa_project,
self.test_snap['volume_name'],
self.test_snap['name'],
self.test_vol_snap['name'])
def test_remove_export(self):
self.drv.create_volume(self.test_vol)
self.drv.terminate_connection(self.test_vol, '')
self.drv.delete_volume(self.test_vol)
def test_volume_attach_detach(self):
self.drv.create_volume(self.test_vol)
@mock.patch.object(iscsi.ZFSSAISCSIDriver, '_get_provider_info')
def test_volume_attach_detach(self, _get_provider_info):
lcfg = self.configuration
test_target_iqn = 'iqn.1986-03.com.sun:02:00000-aaaa-bbbb-cccc-ddddd'
stub_val = {'provider_location':
'%s %s 0' % (lcfg.zfssa_target_portal, test_target_iqn)}
self.drv._get_provider_info.return_value = stub_val
connector = dict(initiator='iqn.1-0.org.deb:01:d7')
props = self.drv.initialize_connection(self.test_vol, connector)
self.drv._get_provider_info.assert_called_once_with(self.test_vol)
self.assertEqual('iscsi', props['driver_volume_type'])
self.assertEqual(self.test_vol['id'], props['data']['volume_id'])
self.assertEqual(lcfg.zfssa_target_portal,
props['data']['target_portal'])
self.assertEqual(test_target_iqn, props['data']['target_iqn'])
self.assertEqual('0', props['data']['target_lun'])
self.assertFalse(props['data']['target_discovered'])
self.drv.terminate_connection(self.test_vol, '')
self.drv.delete_volume(self.test_vol)
self.drv.zfssa.set_lun_initiatorgroup.assert_called_once_with(
lcfg.zfssa_pool,
lcfg.zfssa_project,
self.test_vol['name'],
'')
def test_get_volume_stats(self):
self.drv.get_volume_stats(refresh=False)
self.drv.zfssa.get_pool_stats.return_value = 2 * units.Gi, 3 * units.Gi
lcfg = self.configuration
stats = self.drv.get_volume_stats(refresh=True)
self.drv.zfssa.get_pool_stats.assert_called_once_with(lcfg.zfssa_pool)
self.assertEqual('Oracle', stats['vendor_name'])
self.assertEqual(self.configuration.volume_backend_name,
stats['volume_backend_name'])
self.assertEqual(self.drv.VERSION, stats['driver_version'])
self.assertEqual(self.drv.protocol, stats['storage_protocol'])
self.assertEqual(0, stats['reserved_percentage'])
self.assertFalse(stats['QoS_support'])
self.assertEqual(3, stats['total_capacity_gb'])
self.assertEqual(2, stats['free_capacity_gb'])
def test_extend_volume(self):
self.drv.create_volume(self.test_vol)
lcfg = self.configuration
self.drv.extend_volume(self.test_vol, 3)
self.drv.delete_volume(self.test_vol)
self.drv.zfssa.set_lun_props.assert_called_once_with(
lcfg.zfssa_pool,
lcfg.zfssa_project,
self.test_vol['name'],
volsize= 3 * units.Gi)
@mock.patch('cinder.volume.volume_types.get_volume_type_extra_specs')
def test_get_voltype_specs(self, get_volume_type_extra_specs):
@ -422,35 +245,6 @@ class TestZFSSAISCSIDriver(test.TestCase):
return val
class FakeAddIni2InitGrp(object):
def logout(self):
result = client.RestResult()
result.status = client.Status.ACCEPTED
return result
def get(self, path, **kwargs):
result = client.RestResult()
result.status = client.Status.OK
result.data = json.JSONEncoder().encode({'group':
{'initiators':
['iqn.1-0.org.deb:01:d7']}})
return result
def put(self, path, body="", **kwargs):
result = client.RestResult()
result.status = client.Status.ACCEPTED
return result
def post(self, path, body="", **kwargs):
result = client.RestResult()
result.status = client.Status.CREATED
return result
def islogin(self):
return True
class TestZFSSANFSDriver(test.TestCase):
test_vol = {
@ -476,7 +270,7 @@ class TestZFSSANFSDriver(test.TestCase):
def setUp(self, _factory_zfssa):
super(TestZFSSANFSDriver, self).setUp()
self._create_fake_config()
_factory_zfssa.return_value = FakeNFSZFSSA()
_factory_zfssa.return_value = mock.MagicMock(spec=rest.ZFSSANfsApi)
self.drv = zfssanfs.ZFSSANFSDriver(configuration=self.configuration)
self.drv._execute = fake_utils.fake_execute
self.drv.do_setup({})
@ -499,17 +293,31 @@ class TestZFSSANFSDriver(test.TestCase):
self.configuration.nfs_used_ratio = 1
def test_create_delete_snapshot(self):
lcfg = self.configuration
self.drv.create_snapshot(self.test_snap)
self.drv.zfssa.create_snapshot.assert_called_once_with(
lcfg.zfssa_nfs_pool,
lcfg.zfssa_nfs_project,
lcfg.zfssa_nfs_share,
mock.ANY)
self.drv.zfssa.create_snapshot_of_volume_file.assert_called_once_with(
src_file=mock.ANY,
dst_file=self.test_snap['name'])
self.drv.delete_snapshot(self.test_snap)
self.drv.zfssa.delete_snapshot_of_volume_file.assert_called_with(
src_file=self.test_snap['name'])
def test_create_volume_from_snapshot(self):
self.drv.create_snapshot(self.test_snap)
with mock.patch.object(self.drv, '_ensure_shares_mounted'):
prov_loc = self.drv.create_volume_from_snapshot(self.test_vol_snap,
self.test_snap,
method='COPY')
self.assertEqual('2.2.2.2:/export/nfs_share',
prov_loc['provider_location'])
self.drv.create_volume_from_snapshot(self.test_vol_snap,
self.test_snap,
method='COPY')
self.drv.zfssa.create_volume_from_snapshot_file.\
assert_called_once_with(src_file=self.test_snap['name'],
dst_file=self.test_vol_snap['name'],
method='COPY')
def test_get_volume_stats(self):
self.drv._mounted_shares = ['nfs_share']
@ -518,7 +326,360 @@ class TestZFSSANFSDriver(test.TestCase):
mock_get_share_capacity_info:
mock_get_share_capacity_info.return_value = (1073741824,
9663676416)
self.drv.get_volume_stats(refresh=True)
stats = self.drv.get_volume_stats(refresh=True)
self.assertEqual(1, stats['free_capacity_gb'])
self.assertEqual(10, stats['total_capacity_gb'])
def tearDown(self):
super(TestZFSSANFSDriver, self).tearDown()
class TestZFSSAApi(test.TestCase):
@mock.patch.object(rest, 'factory_restclient')
def setUp(self, _restclient):
super(TestZFSSAApi, self).setUp()
self.host = 'fakehost'
self.user = 'fakeuser'
self.url = None
self.pool = 'fakepool'
self.project = 'fakeproject'
self.vol = 'fakevol'
self.snap = 'fakesnapshot'
self.clone = 'fakeclone'
self.targetalias = 'fakealias'
_restclient.return_value = mock.MagicMock(spec=client.RestClientURL)
self.zfssa = rest.ZFSSAApi()
self.zfssa.set_host('fakehost')
self.pool_url = '/api/storage/v1/pools/'
def _create_response(self, status, data='data'):
response = FakeResponse(status, data)
return response
def test_create_project(self):
self.zfssa.rclient.get.return_value = self._create_response(
client.Status.OK)
self.zfssa.create_project(self.pool, self.project)
expected_svc = self.pool_url + self.pool + '/projects/' + self.project
self.zfssa.rclient.get.assert_called_with(expected_svc)
def test_create_initiator(self):
self.zfssa.rclient.get.return_value = self._create_response(
client.Status.OK)
initiator = 'iqn.1986-03.com.sun:02:00000-aaaa-bbbb-cccc-ddddd'
alias = 'init-group'
self.zfssa.create_initiator(initiator, alias)
self.zfssa.rclient.get.assert_called_with(
'/api/san/v1/iscsi/initiators/alias=' + alias)
def test_create_target(self):
self.zfssa.rclient.get.return_value = self._create_response(
client.Status.NOT_FOUND)
ret_val = json.dumps(
{'target': {'iqn':
'iqn.1986-03.com.sun:02:00000-aaaa-bbbb-cccc-ddddd'}})
self.zfssa.rclient.post.return_value = self._create_response(
client.Status.CREATED, ret_val)
alias = 'tgt-group'
self.zfssa.create_target(alias)
self.zfssa.rclient.post.assert_called_with('/api/san/v1/iscsi/targets',
{'alias': alias})
def test_get_target(self):
ret_val = json.dumps(
{'target': {'href': 'fake_href',
'alias': 'tgt-group',
'iqn':
'iqn.1986-03.com.sun:02:00000-aaaa-bbbb-cccc-ddddd',
'targetchapuser': '',
'targetchapsecret': '',
'interfaces': ['nge0']}})
self.zfssa.rclient.get.return_value = self._create_response(
client.Status.OK, ret_val)
ret = self.zfssa.get_target('tgt-group')
self.zfssa.rclient.get.assert_called_once_with(
'/api/san/v1/iscsi/targets/alias=tgt-group')
self.assertEqual('iqn.1986-03.com.sun:02:00000-aaaa-bbbb-cccc-ddddd',
ret)
def test_verify_pool(self):
self.zfssa.rclient.get.return_value = self._create_response(
client.Status.OK)
self.zfssa.verify_pool(self.pool)
self.zfssa.rclient.get.assert_called_with(self.pool_url + self.pool)
def test_verify_project(self):
self.zfssa.rclient.get.return_value = self._create_response(
client.Status.NOT_FOUND)
self.assertRaises(exception.VolumeBackendAPIException,
self.zfssa.verify_project,
self.pool,
self.project)
def test_verify_initiator(self):
self.zfssa.rclient.get.return_value = self._create_response(
client.Status.OK)
self.zfssa.verify_initiator('iqn.1-0.org.deb:01:d7')
self.zfssa.rclient.get.assert_called_with(
'/api/san/v1/iscsi/initiators/iqn.1-0.org.deb:01:d7')
def test_verify_target(self):
self.zfssa.rclient.get.return_value = self._create_response(
client.Status.BAD_REQUEST)
self.assertRaises(exception.VolumeBackendAPIException,
self.zfssa.verify_target,
self.targetalias)
def test_create_delete_lun(self):
arg = json.dumps({'name': self.vol,
'initiatorgroup': 'com.sun.ms.vss.hg.maskAll'})
self.zfssa.rclient.post.return_value = self._create_response(
client.Status.CREATED, data=arg)
self.zfssa.create_lun(self.pool, self.project, self.vol, 1, 'tgt-grp',
None)
expected_arg = {'name': self.vol,
'volsize': 1,
'targetgroup': 'tgt-grp',
'initiatorgroup': 'com.sun.ms.vss.hg.maskAll'}
self.zfssa.rclient.post.assert_called_with(
self.pool_url + self.pool + '/projects/' + self.project + '/luns',
expected_arg)
self.zfssa.rclient.delete.return_value = self._create_response(
client.Status.NO_CONTENT)
self.zfssa.delete_lun(self.pool, self.project, self.vol)
self.zfssa.rclient.delete.assert_called_with(
self.pool_url + self.pool + '/projects/' + self.project +
'/luns/' + self.vol)
def test_create_delete_snapshot(self):
self.zfssa.rclient.post.return_value = self._create_response(
client.Status.CREATED)
self.zfssa.create_snapshot(self.pool,
self.project,
self.vol,
self.snap)
expected_arg = {'name': self.snap}
self.zfssa.rclient.post.assert_called_with(
self.pool_url + self.pool + '/projects/' + self.project +
'/luns/' + self.vol + '/snapshots', expected_arg)
self.zfssa.rclient.delete.return_value = self._create_response(
client.Status.NO_CONTENT)
self.zfssa.delete_snapshot(self.pool,
self.project,
self.vol,
self.snap)
self.zfssa.rclient.delete.assert_called_with(
self.pool_url + self.pool + '/projects/' + self.project +
'/luns/' + self.vol + '/snapshots/' + self.snap)
def test_clone_snapshot(self):
self.zfssa.rclient.put.return_value = self._create_response(
client.Status.CREATED)
self.zfssa.clone_snapshot(self.pool,
self.project,
self.vol,
self.snap,
self.clone)
expected_svc = '/api/storage/v1/pools/' + self.pool + '/projects/' + \
self.project + '/luns/' + self.vol + '/snapshots/' + self.snap + \
'/clone'
expected_arg = {'project': self.project,
'share': self.clone,
'nodestroy': True}
self.zfssa.rclient.put.assert_called_with(expected_svc, expected_arg)
class TestZFSSANfsApi(test.TestCase):
@mock.patch.object(rest, 'factory_restclient')
def setUp(self, _restclient):
super(TestZFSSANfsApi, self).setUp()
self.host = 'fakehost'
self.user = 'fakeuser'
self.url = None
self.pool = 'fakepool'
self.project = 'fakeproject'
self.share = 'fakeshare'
self.snap = 'fakesnapshot'
self.targetalias = 'fakealias'
_restclient.return_value = mock.MagicMock(spec=client.RestClientURL)
self.webdavclient = mock.MagicMock(spec=webdavclient.ZFSSAWebDAVClient)
self.zfssa = rest.ZFSSANfsApi()
self.zfssa.set_host('fakehost')
self.pool_url = '/api/storage/v1/pools/'
def _create_response(self, status, data='data'):
response = FakeResponse(status, data)
return response
def test_verify_share(self):
self.zfssa.rclient.get.return_value = self._create_response(
client.Status.OK)
self.zfssa.verify_share(self.pool, self.project, self.share)
self.zfssa.rclient.get.assert_called_with(self.pool_url + self.pool +
'/projects/' + self.project +
'/filesystems/' + self.share)
def test_create_delete_snapshot(self):
self.zfssa.rclient.post.return_value = self._create_response(
client.Status.CREATED)
self.zfssa.create_snapshot(self.pool,
self.project,
self.share,
self.snap)
expected_arg = {'name': self.snap}
self.zfssa.rclient.post.assert_called_with(
self.pool_url + self.pool + '/projects/' + self.project +
'/filesystems/' + self.share + '/snapshots', expected_arg)
self.zfssa.rclient.delete.return_value = self._create_response(
client.Status.NO_CONTENT)
self.zfssa.delete_snapshot(self.pool,
self.project,
self.share,
self.snap)
self.zfssa.rclient.delete.assert_called_with(
self.pool_url + self.pool + '/projects/' + self.project +
'/filesystems/' + self.share + '/snapshots/' + self.snap)
def create_delete_snapshot_of_volume_file(self):
src_file = "fake_src_file"
dst_file = "fake_dst_file"
self.zfssa.create_snapshot_of_volume_file(src_file=src_file,
dst_file=dst_file)
self.zfssa.webdavclient.request.assert_called_once_with(
src_file=src_file,
dst_file=dst_file,
method='COPY')
self.zfssa.delete_snapshot_of_volume_file(src_file=src_file)
self.zfssa.webdavclient.request.assert_called_once_with(
src_file=src_file, method='DELETE')
def test_get_share(self):
ret_val = json.dumps({'filesystem': 'test_fs'})
self.zfssa.rclient.get.return_value = self._create_response(
client.Status.OK, ret_val)
ret = self.zfssa.get_share(self.pool, self.project, self.share)
self.zfssa.rclient.get.assert_called_with(self.pool_url + self.pool +
'/projects/' + self.project +
'/filesystems/' + self.share)
self.assertEqual('test_fs', ret)
def test_create_share(self):
self.zfssa.rclient.get.return_value = self._create_response(
client.Status.NOT_FOUND)
self.zfssa.rclient.post.return_value = self._create_response(
client.Status.BAD_REQUEST)
self.assertRaises(exception.VolumeBackendAPIException,
self.zfssa.create_share,
self.pool,
self.project,
self.share,
{})
@mock.patch.object(rest.ZFSSANfsApi, '_change_service_state')
@mock.patch.object(rest.ZFSSANfsApi, 'verify_service')
def test_enable_disable_modify_service(self,
verify_service,
_change_service_state):
self.zfssa.enable_service('http')
self.zfssa._change_service_state.assert_called_with(
'http', state='enable')
self.zfssa.verify_service.assert_called_with('http')
self.zfssa.disable_service('http')
self.zfssa._change_service_state.assert_called_with(
'http', state='disable')
self.zfssa.verify_service.assert_called_with('http', status='offline')
ret_val = json.dumps({'service': {
"href": "/api/service/v1/services/http",
"<status>": "online",
"require_login": False,
"protocols": "http/https",
"listen_port": 81,
"https_port": 443}})
self.zfssa.rclient.put.return_value = self._create_response(
client.Status.ACCEPTED, ret_val)
args = {'listen_port': 81}
self.zfssa.modify_service('http', args)
self.zfssa.rclient.put.called_with('/api/service/v1/services/http',
args)
class TestRestClientURL(test.TestCase):
def setUp(self):
super(TestRestClientURL, self).setUp()
self.timeout = 60
self.url = '1.1.1.1'
self.client = client.RestClientURL(self.url, timeout=self.timeout)
@mock.patch.object(client.RestClientURL, 'request')
def test_post(self, _request):
path = '/api/storage/v1/pools'
body = {'name': 'fakepool'}
self.client.post(path, body=body)
self.client.request.assert_called_with(path, 'POST', body)
@mock.patch.object(client.RestClientURL, 'request')
def test_get(self, _request):
path = '/api/storage/v1/pools'
self.client.get(path)
self.client.request.assert_called_with(path, 'GET')
@mock.patch.object(client.RestClientURL, 'request')
def test_put(self, _request):
path = '/api/storage/v1/pools'
body = {'name': 'fakepool'}
self.client.put(path, body=body)
self.client.request.assert_called_with(path, 'PUT', body)
@mock.patch.object(client.RestClientURL, 'request')
def test_delete(self, _request):
path = '/api/storage/v1/pools'
self.client.delete(path)
self.client.request.assert_called_with(path, 'DELETE')
@mock.patch.object(client.RestClientURL, 'request')
def test_head(self, _request):
path = '/api/storage/v1/pools'
self.client.head(path)
self.client.request.assert_called_with(path, 'HEAD')
@mock.patch.object(client, 'RestResult')
@mock.patch.object(client.urllib.request, 'Request')
@mock.patch.object(client.urllib.request, 'urlopen')
def test_request(self, _urlopen, _Request, _RestResult):
path = '/api/storage/v1/pools'
_urlopen.return_value = mock.Mock()
self.client.request(path, mock.ANY)
_Request.assert_called_with(self.url + path, None, self.client.headers)
_urlopen.assert_called_with(mock.ANY, timeout=self.timeout)
_RestResult.assert_called_with(response=mock.ANY)
@mock.patch.object(client, 'RestResult')
@mock.patch.object(client.urllib.request, 'Request')
@mock.patch.object(client.urllib.request, 'urlopen')
@mock.patch.object(client, 'ssl', new_callable=FakeSSL)
def test_ssl_with_context(self, _ssl, _urlopen, _Request, _RestResult):
"""Test PEP476 certificate opt_out fix. """
path = '/api/storage/v1/pools'
_urlopen.return_value = mock.Mock()
self.client.request(path, mock.ANY)
_urlopen.assert_called_once_with(mock.ANY,
timeout=self.timeout,
context='fakecontext')
@mock.patch.object(client, 'RestResult')
@mock.patch.object(client.urllib.request, 'Request')
@mock.patch.object(client.urllib.request, 'urlopen')
@mock.patch.object(client, 'ssl', new_callable=object)
def test_ssl_no_context(self, _ssl, _urlopen, _Request, _RestResult):
"""Verify the PEP476 fix backward compatibility. """
path = '/api/storage/v1/pools'
_urlopen.return_value = mock.Mock()
self.client.request(path, mock.ANY)
_urlopen.assert_called_once_with(mock.ANY, timeout=self.timeout)

View File

@ -26,6 +26,10 @@ from cinder.volume.drivers.zfssa import webdavclient
LOG = log.getLogger(__name__)
def factory_restclient(url, **kwargs):
return restclient.RestClientURL(url, **kwargs)
class ZFSSAApi(object):
"""ZFSSA API proxy class"""
@ -60,7 +64,7 @@ class ZFSSAApi(object):
def set_host(self, host, timeout=None):
self.host = host
self.url = "https://" + self.host + ":215"
self.rclient = restclient.RestClientURL(self.url, timeout=timeout)
self.rclient = factory_restclient(self.url, timeout=timeout)
def login(self, auth_str):
"""Login to the appliance"""