Merge "VMAX driver - Implement IPv6 support for Dell EMC VMAX driver"

This commit is contained in:
Zuul 2018-06-12 17:18:59 +00:00 committed by Gerrit Code Review
commit 11344744e4
3 changed files with 807 additions and 28 deletions

View File

@ -16,6 +16,7 @@
import copy
import random
import six
from oslo_config import cfg
from oslo_log import log
@ -33,7 +34,11 @@ from manila.share.drivers.dell_emc.plugins.vmax import (
from manila.share import utils as share_utils
from manila import utils
VERSION = "1.0.0"
"""Version history:
1.0.0 - Initial version
2.0.0 - Implement IPv6 support
"""
VERSION = "2.0.0"
LOG = log.getLogger(__name__)
@ -71,6 +76,7 @@ class VMAXStorageConnection(driver.StorageConnection):
self.reserved_percentage = None
self.driver_handles_share_servers = True
self.port_conf = None
self.ipv6_implemented = True
def create_share(self, context, share, share_server=None):
"""Create a share and export it based on protocol used."""
@ -171,7 +177,7 @@ class VMAXStorageConnection(driver.StorageConnection):
LOG.error(message)
raise exception.EMCVmaxXMLAPIError(err=message)
interface = server['interfaces'][0]
interface = enas_utils.export_unc_path(server['interfaces'][0])
self._get_context('CIFSShare').create(share_name, server['name'],
vdm_name)
@ -193,8 +199,11 @@ class VMAXStorageConnection(driver.StorageConnection):
self._get_context('NFSShare').create(share_name, vdm_name)
nfs_if = enas_utils.convert_ipv6_format_if_needed(
share_server['backend_details']['nfs_if'])
return ('%(nfs_if)s:/%(share_name)s'
% {'nfs_if': share_server['backend_details']['nfs_if'],
% {'nfs_if': nfs_if,
'share_name': share_name})
def create_share_from_snapshot(self, context, share, snapshot,
@ -222,10 +231,13 @@ class VMAXStorageConnection(driver.StorageConnection):
self._allocate_container_from_snapshot(
share, snapshot, share_server, pool_name)
nfs_if = enas_utils.convert_ipv6_format_if_needed(
share_server['backend_details']['nfs_if'])
if share_proto == 'NFS':
self._create_nfs_share(share_name, share_server)
location = ('%(nfs_if)s:/%(share_name)s'
% {'nfs_if': share_server['backend_details']['nfs_if'],
% {'nfs_if': nfs_if,
'share_name': share_name})
elif share_proto == 'CIFS':
location = self._create_cifs_share(share_name, share_server)
@ -412,7 +424,9 @@ class VMAXStorageConnection(driver.StorageConnection):
white_list = []
for rule in access_rules:
self.allow_access(context, share, rule, share_server)
white_list.append(rule['access_to'])
white_list.append(
enas_utils.convert_ipv6_format_if_needed(
rule['access_to']))
self.clear_access(share, share_server, white_list)
def clear_access(self, share, share_server, white_list):
@ -487,9 +501,9 @@ class VMAXStorageConnection(driver.StorageConnection):
status, server = self._get_context('CIFSServer').get(server_name,
vdm_name)
if status != constants.STATUS_OK:
message = (_("CIFS server %s not found.") % server_name)
LOG.error(message)
raise exception.EMCVmaxXMLAPIError(err=message)
message = (_("CIFS server %s not found.") % server_name)
LOG.error(message)
raise exception.EMCVmaxXMLAPIError(err=message)
self._get_context('CIFSShare').deny_share_access(
vdm_name,
@ -508,7 +522,7 @@ class VMAXStorageConnection(driver.StorageConnection):
LOG.warning("Only ip access type allowed.")
return
host_ip = access['access_to']
host_ip = enas_utils.convert_ipv6_format_if_needed(access['access_to'])
self._get_context('NFSShare').deny_share_access(share['id'], host_ip,
vdm_name)
@ -619,6 +633,7 @@ class VMAXStorageConnection(driver.StorageConnection):
'snapshot_support': True,
'create_share_from_snapshot_support': True,
'revert_to_snapshot_support': False,
'ipv6_support': True
}
stats_dict['pools'].append(pool_stat)
@ -686,20 +701,26 @@ class VMAXStorageConnection(driver.StorageConnection):
'share server...', vdm_name)
self._get_context('VDM').create(vdm_name, self.mover_name)
netmask = utils.cidr_to_netmask(network_info['cidr'])
devices = self.get_managed_ports()
for net_info in network_info['network_allocations']:
random.shuffle(devices)
ip_version = net_info['ip_version']
interface = {
'name': net_info['id'][-12:],
'device_name': devices[0],
'ip': net_info['ip_address'],
'mover_name': self.mover_name,
'net_mask': netmask,
'vlan_id': vlan_id if vlan_id else -1,
}
if ip_version == 6:
interface['ip_version'] = ip_version
interface['net_mask'] = six.text_type(
utils.cidr_to_prefixlen(
network_info['cidr']))
else:
interface['net_mask'] = utils.cidr_to_netmask(
network_info['cidr'])
self._get_context('MoverInterface').create(interface)

View File

@ -1160,19 +1160,21 @@ class MoverInterface(StorageObject):
mover_id = self._get_mover_id(mover_name, False)
params = dict(device=device_name,
ipAddress=six.text_type(ip_addr),
mover=mover_id,
name=name,
netMask=net_mask,
vlanid=six.text_type(vlan_id))
if interface.get('ip_version') == 6:
params['ipVersion'] = 'IPv6'
if self.xml_retry:
self.xml_retry = False
request = self._build_task_package(
self.elt_maker.NewMoverInterface(
device=device_name,
ipAddress=six.text_type(ip_addr),
mover=mover_id,
name=name,
netMask=net_mask,
vlanid=six.text_type(vlan_id)
)
)
self.elt_maker.NewMoverInterface(**params))
response = self._send_request(request)
@ -1871,13 +1873,14 @@ class NFSShare(StorageObject):
for field in fields:
field = field.strip()
if field.startswith('rw='):
nfs_share['RwHosts'] = field[3:].split(":")
nfs_share['RwHosts'] = vmax_utils.parse_ipaddr(field[3:])
elif field.startswith('access='):
nfs_share['AccessHosts'] = field[7:].split(":")
nfs_share['AccessHosts'] = vmax_utils.parse_ipaddr(
field[7:])
elif field.startswith('root='):
nfs_share['RootHosts'] = field[5:].split(":")
nfs_share['RootHosts'] = vmax_utils.parse_ipaddr(field[5:])
elif field.startswith('ro='):
nfs_share['RoHosts'] = field[3:].split(":")
nfs_share['RoHosts'] = vmax_utils.parse_ipaddr(field[3:])
self.nfs_share_map[name] = nfs_share
else:
@ -1898,6 +1901,9 @@ class NFSShare(StorageObject):
changed = False
rwhosts = share['RwHosts']
rohosts = share['RoHosts']
host_ip = vmax_utils.convert_ipv6_format_if_needed(host_ip)
if access_level == const.ACCESS_LEVEL_RW:
if host_ip not in rwhosts:
rwhosts.append(host_ip)

View File

@ -173,7 +173,51 @@ class StorageConnectionTestCase(test.TestCase):
ssh_calls = [mock.call(self.cifs_share.cmd_disable_access(), True)]
ssh_cmd_mock.assert_has_calls(ssh_calls)
self.assertEqual(location, r'\\192.168.1.1\%s' % share['name'],
self.assertEqual(location, r'\\%s\%s' %
(fakes.FakeData.network_allocations_ip1,
share['name']),
'CIFS export path is incorrect')
def test_create_cifs_share_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.CIFS_SHARE
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed(
interface1=fakes.FakeData.interface_name3,
interface2=fakes.FakeData.interface_name4))
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True,
ip_addr=fakes.FakeData.network_allocations_ip3))
hook.append(self.pool.resp_get_succeed())
hook.append(self.fs.resp_task_succeed())
hook.append(self.cifs_share.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append()
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
location = self.connection.create_share(None, share, share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
mock.call(self.pool.req_get()),
mock.call(self.fs.req_create_on_vdm()),
mock.call(self.cifs_share.req_create(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [mock.call(self.cifs_share.cmd_disable_access(), True)]
ssh_cmd_mock.assert_has_calls(ssh_calls)
self.assertEqual(location, r'\\%s.ipv6-literal.net\%s' %
(fakes.FakeData.network_allocations_ip3.replace(
':', '-'),
share['name']),
'CIFS export path is incorrect')
def test_create_nfs_share(self):
@ -207,6 +251,41 @@ class StorageConnectionTestCase(test.TestCase):
self.assertEqual(location, '192.168.1.2:/%s' % share['name'],
'NFS export path is incorrect')
def test_create_nfs_share_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.NFS_SHARE
hook = utils.RequestSideEffect()
hook.append(self.pool.resp_get_succeed())
hook.append(self.vdm.resp_get_succeed(
interface1=fakes.FakeData.interface_name3,
interface2=fakes.FakeData.interface_name4))
hook.append(self.fs.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.nfs_share.output_create())
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
location = self.connection.create_share(None, share, share_server)
expected_calls = [
mock.call(self.pool.req_get()),
mock.call(self.vdm.req_get()),
mock.call(self.fs.req_create_on_vdm()),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [mock.call(self.nfs_share.cmd_create(), True)]
ssh_cmd_mock.assert_has_calls(ssh_calls)
self.assertEqual(location, '[%s]:/%s' %
(fakes.FakeData.network_allocations_ip4,
share['name']),
'NFS export path is incorrect')
def test_create_cifs_share_without_share_server(self):
share = fakes.CIFS_SHARE
@ -336,6 +415,70 @@ class StorageConnectionTestCase(test.TestCase):
self.assertEqual(location, r'\\192.168.1.1\%s' % share['name'],
'CIFS export path is incorrect')
def test_create_cifs_share_from_snapshot_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.CIFS_SHARE
snapshot = fake_share.fake_snapshot(
name=fakes.FakeData.src_snap_name,
share_name=fakes.FakeData.src_share_name,
share_id=fakes.FakeData.src_share_name,
id=fakes.FakeData.src_snap_name)
hook = utils.RequestSideEffect()
hook.append(self.fs.resp_get_succeed())
hook.append(self.vdm.resp_get_succeed(
interface1=fakes.FakeData.interface_name3,
interface2=fakes.FakeData.interface_name4))
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True,
ip_addr=fakes.FakeData.network_allocations_ip3))
hook.append(self.cifs_share.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.mover.output_get_interconnect_id())
ssh_hook.append()
ssh_hook.append()
ssh_hook.append(self.fs.output_copy_ckpt)
ssh_hook.append(self.fs.output_info())
ssh_hook.append()
ssh_hook.append()
ssh_hook.append()
ssh_hook.append()
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
location = self.connection.create_share_from_snapshot(
None, share, snapshot, share_server)
expected_calls = [
mock.call(self.fs.req_get()),
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
mock.call(self.cifs_share.req_create(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.mover.cmd_get_interconnect_id(), False),
mock.call(self.fs.cmd_create_from_ckpt(), False),
mock.call(self.mount.cmd_server_mount('ro'), False),
mock.call(self.fs.cmd_copy_ckpt(), True),
mock.call(self.fs.cmd_nas_fs_info(), False),
mock.call(self.mount.cmd_server_umount(), False),
mock.call(self.fs.cmd_delete(), False),
mock.call(self.mount.cmd_server_mount('rw'), False),
mock.call(self.cifs_share.cmd_disable_access(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
self.assertEqual(location, r'\\%s.ipv6-literal.net\%s' %
(fakes.FakeData.network_allocations_ip3.replace(':',
'-'),
share['name']),
'CIFS export path is incorrect')
def test_create_nfs_share_from_snapshot(self):
share_server = fakes.SHARE_SERVER
share = fakes.NFS_SHARE
@ -385,6 +528,57 @@ class StorageConnectionTestCase(test.TestCase):
self.assertEqual(location, '192.168.1.2:/%s' % share['name'],
'NFS export path is incorrect')
def test_create_nfs_share_from_snapshot_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.NFS_SHARE
snapshot = fake_share.fake_snapshot(
name=fakes.FakeData.src_snap_name,
share_name=fakes.FakeData.src_share_name,
share_id=fakes.FakeData.src_share_name,
id=fakes.FakeData.src_snap_name)
hook = utils.RequestSideEffect()
hook.append(self.fs.resp_get_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.mover.output_get_interconnect_id())
ssh_hook.append()
ssh_hook.append()
ssh_hook.append(self.fs.output_copy_ckpt)
ssh_hook.append(self.fs.output_info())
ssh_hook.append()
ssh_hook.append()
ssh_hook.append()
ssh_hook.append(self.nfs_share.output_create())
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
location = self.connection.create_share_from_snapshot(
None, share, snapshot, share_server)
expected_calls = [mock.call(self.fs.req_get())]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.mover.cmd_get_interconnect_id(), False),
mock.call(self.fs.cmd_create_from_ckpt(), False),
mock.call(self.mount.cmd_server_mount('ro'), False),
mock.call(self.fs.cmd_copy_ckpt(), True),
mock.call(self.fs.cmd_nas_fs_info(), False),
mock.call(self.mount.cmd_server_umount(), False),
mock.call(self.fs.cmd_delete(), False),
mock.call(self.mount.cmd_server_mount('rw'), False),
mock.call(self.nfs_share.cmd_create(), True)
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
self.assertEqual(location, '[%s]:/%s' %
(fakes.FakeData.network_allocations_ip4,
share['name']),
'NFS export path is incorrect')
def test_create_share_with_incorrect_proto(self):
share_server = fakes.SHARE_SERVER
share = fake_share.fake_share(share_proto='FAKE_PROTO')
@ -440,6 +634,34 @@ class StorageConnectionTestCase(test.TestCase):
]
xml_req_mock.assert_has_calls(expected_calls)
def test_delete_cifs_share_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.CIFS_SHARE
hook = utils.RequestSideEffect()
hook.append(self.cifs_share.resp_get_succeed(self.vdm.vdm_id))
hook.append(self.vdm.resp_get_succeed(
interface1=fakes.FakeData.interface_name3,
interface2=fakes.FakeData.interface_name4))
hook.append(self.cifs_share.resp_task_succeed())
hook.append(self.mount.resp_task_succeed())
hook.append(self.fs.resp_get_succeed())
hook.append(self.fs.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
self.connection.delete_share(None, share, share_server)
expected_calls = [
mock.call(self.cifs_share.req_get()),
mock.call(self.vdm.req_get()),
mock.call(self.cifs_share.req_delete(self.vdm.vdm_id)),
mock.call(self.mount.req_delete(self.vdm.vdm_id)),
mock.call(self.fs.req_get()),
mock.call(self.fs.req_delete()),
]
xml_req_mock.assert_has_calls(expected_calls)
def test_delete_nfs_share(self):
share_server = fakes.SHARE_SERVER
share = fakes.NFS_SHARE
@ -476,6 +698,44 @@ class StorageConnectionTestCase(test.TestCase):
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_delete_nfs_share_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.NFS_SHARE
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed(
interface1=fakes.FakeData.interface_name3,
interface2=fakes.FakeData.interface_name4))
hook.append(self.mount.resp_task_succeed())
hook.append(self.fs.resp_get_succeed())
hook.append(self.fs.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=self.nfs_share.rw_hosts,
ro_hosts=self.nfs_share.ro_hosts))
ssh_hook.append(self.nfs_share.output_delete_succeed())
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.delete_share(None, share, share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.mount.req_delete(self.vdm.vdm_id)),
mock.call(self.fs.req_get()),
mock.call(self.fs.req_delete()),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.nfs_share.cmd_get(), False),
mock.call(self.nfs_share.cmd_delete(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_delete_share_without_share_server(self):
share = fakes.CIFS_SHARE
@ -538,6 +798,27 @@ class StorageConnectionTestCase(test.TestCase):
]
xml_req_mock.assert_has_calls(expected_calls)
def test_extend_share_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.CIFS_SHARE
new_size = fakes.FakeData.new_size
hook = utils.RequestSideEffect()
hook.append(self.fs.resp_get_succeed())
hook.append(self.pool.resp_get_succeed())
hook.append(self.fs.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
self.connection.extend_share(share, new_size, share_server)
expected_calls = [
mock.call(self.fs.req_get()),
mock.call(self.pool.req_get()),
mock.call(self.fs.req_extend()),
]
xml_req_mock.assert_has_calls(expected_calls)
def test_extend_share_without_pool_name(self):
share_server = fakes.SHARE_SERVER
share = fake_share.fake_share(host='HostA@BackendB',
@ -569,6 +850,27 @@ class StorageConnectionTestCase(test.TestCase):
]
xml_req_mock.assert_has_calls(expected_calls)
def test_create_snapshot_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
snapshot = fake_share.fake_snapshot(
id=fakes.FakeData.snapshot_name,
share_id=fakes.FakeData.filesystem_name,
share_name=fakes.FakeData.share_name)
hook = utils.RequestSideEffect()
hook.append(self.fs.resp_get_succeed())
hook.append(self.snap.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
self.connection.create_snapshot(None, snapshot, share_server)
expected_calls = [
mock.call(self.fs.req_get()),
mock.call(self.snap.req_create()),
]
xml_req_mock.assert_has_calls(expected_calls)
def test_create_snapshot_with_incorrect_share_info(self):
share_server = fakes.SHARE_SERVER
snapshot = fake_share.fake_snapshot(
@ -609,6 +911,27 @@ class StorageConnectionTestCase(test.TestCase):
]
xml_req_mock.assert_has_calls(expected_calls)
def test_delete_snapshot_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
snapshot = fake_share.fake_snapshot(
id=fakes.FakeData.snapshot_name,
share_id=fakes.FakeData.filesystem_name,
share_name=fakes.FakeData.share_name)
hook = utils.RequestSideEffect()
hook.append(self.snap.resp_get_succeed())
hook.append(self.snap.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
self.connection.delete_snapshot(None, snapshot, share_server)
expected_calls = [
mock.call(self.snap.req_get()),
mock.call(self.snap.req_delete()),
]
xml_req_mock.assert_has_calls(expected_calls)
@utils.patch_get_managed_ports_vnx(return_value=['cge-1-0'])
def test_setup_server(self):
hook = utils.RequestSideEffect()
@ -630,8 +953,8 @@ class StorageConnectionTestCase(test.TestCase):
self.connection.setup_server(fakes.NETWORK_INFO, None)
if_name_1 = fakes.FakeData.network_allocations_id1[-12:]
if_name_2 = fakes.FakeData.network_allocations_id2[-12:]
if_name_1 = fakes.FakeData.interface_name1
if_name_2 = fakes.FakeData.interface_name2
expected_calls = [
mock.call(self.vdm.req_get()),
@ -654,6 +977,59 @@ class StorageConnectionTestCase(test.TestCase):
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
@utils.patch_get_managed_ports_vnx(return_value=['cge-1-0'])
def test_setup_server_with_ipv6(self):
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_but_not_found())
hook.append(self.mover.resp_get_ref_succeed())
hook.append(self.vdm.resp_task_succeed())
hook.append(self.mover.resp_task_succeed())
hook.append(self.mover.resp_task_succeed())
hook.append(self.dns.resp_task_succeed())
hook.append(self.vdm.resp_get_succeed())
hook.append(self.cifs_server.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append()
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.setup_server(fakes.NETWORK_INFO_IPV6, None)
if_name_1 = fakes.FakeData.interface_name3
if_name_2 = fakes.FakeData.interface_name4
expect_ip_1 = fakes.FakeData.network_allocations_ip3
expect_ip_2 = fakes.FakeData.network_allocations_ip4
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.mover.req_get_ref()),
mock.call(self.vdm.req_create()),
mock.call(self.mover.req_create_interface_with_ipv6(
if_name=if_name_1,
ip=expect_ip_1)),
mock.call(self.mover.req_create_interface_with_ipv6(
if_name=if_name_2,
ip=expect_ip_2)),
mock.call(self.dns.req_create(
ip_addr=fakes.FakeData.dns_ipv6_address)),
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_create(
self.vdm.vdm_id,
ip_addr=fakes.FakeData.network_allocations_ip3)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.vdm.cmd_attach_nfs_interface(
interface=fakes.FakeData.interface_name4), False),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
@utils.patch_get_managed_ports_vnx(return_value=['cge-1-0'])
def test_setup_server_with_existing_vdm(self):
hook = utils.RequestSideEffect()
@ -834,6 +1210,51 @@ class StorageConnectionTestCase(test.TestCase):
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_teardown_server_with_ipv6(self):
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed())
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True))
hook.append(self.cifs_server.resp_task_succeed())
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=False))
hook.append(self.mover.resp_get_ref_succeed())
hook.append(self.mover.resp_task_succeed())
hook.append(self.mover.resp_task_succeed())
hook.append(self.vdm.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.vdm.output_get_interfaces_vdm())
ssh_hook.append()
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.teardown_server(fakes.SERVER_DETAIL_IPV6,
fakes.SECURITY_SERVICE_IPV6)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
mock.call(self.cifs_server.req_modify(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=False)),
mock.call(self.cifs_server.req_delete(self.vdm.vdm_id)),
mock.call(self.mover.req_get_ref()),
mock.call(self.mover.req_delete_interface(
fakes.FakeData.network_allocations_ip3)),
mock.call(self.mover.req_delete_interface(
fakes.FakeData.network_allocations_ip4)),
mock.call(self.vdm.req_delete()),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.vdm.cmd_get_interfaces(), False),
mock.call(self.vdm.cmd_detach_nfs_interface(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_teardown_server_without_server_detail(self):
self.connection.teardown_server(None, fakes.SECURITY_SERVICE)
@ -1006,6 +1427,40 @@ class StorageConnectionTestCase(test.TestCase):
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_update_access_add_cifs_rw_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.CIFS_SHARE
access = fakes.CIFS_RW_ACCESS
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed(
interface1=fakes.FakeData.interface_name3,
interface2=fakes.FakeData.interface_name4))
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True,
ip_addr=fakes.FakeData.network_allocations_ip3))
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.cifs_share.output_allow_access())
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.update_access(None, share, [], [access], [],
share_server=share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.cifs_share.cmd_change_access(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_update_access_deny_nfs(self):
share_server = fakes.SHARE_SERVER
share = fakes.NFS_SHARE
@ -1037,6 +1492,37 @@ class StorageConnectionTestCase(test.TestCase):
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_update_access_deny_nfs_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.NFS_SHARE
access = fakes.NFS_RW_ACCESS
rw_hosts = copy.deepcopy(fakes.FakeData.rw_hosts_ipv6)
rw_hosts.append(access['access_to'])
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=rw_hosts,
ro_hosts=fakes.FakeData.ro_hosts_ipv6))
ssh_hook.append(self.nfs_share.output_set_access_success())
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=fakes.FakeData.rw_hosts_ipv6,
ro_hosts=fakes.FakeData.ro_hosts_ipv6))
ssh_cmd_mock = utils.EMCNFSShareMock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.update_access(None, share, [], [], [access],
share_server=share_server)
ssh_calls = [
mock.call(self.nfs_share.cmd_get(), True),
mock.call(self.nfs_share.cmd_set_access(
rw_hosts=self.nfs_share.rw_hosts_ipv6,
ro_hosts=self.nfs_share.ro_hosts_ipv6), True),
mock.call(self.nfs_share.cmd_get(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_update_access_recover_nfs_rule(self):
share_server = fakes.SHARE_SERVER
share = fakes.NFS_SHARE
@ -1069,6 +1555,38 @@ class StorageConnectionTestCase(test.TestCase):
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_update_access_recover_nfs_rule_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.NFS_SHARE
access = fakes.NFS_RW_ACCESS_IPV6
hosts = ['fdf8:f53b:82e1::5']
rw_hosts = copy.deepcopy(fakes.FakeData.rw_hosts_ipv6)
rw_hosts.append(access['access_to'])
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=rw_hosts,
ro_hosts=fakes.FakeData.ro_hosts_ipv6))
ssh_hook.append(self.nfs_share.output_set_access_success())
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=hosts,
ro_hosts=[]))
ssh_cmd_mock = utils.EMCNFSShareMock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.update_access(None, share, [access], [], [],
share_server=share_server)
ssh_calls = [
mock.call(self.nfs_share.cmd_get(), True),
mock.call(self.nfs_share.cmd_set_access(
rw_hosts=hosts,
ro_hosts=[]), True),
mock.call(self.nfs_share.cmd_get(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_update_access_recover_cifs_rule(self):
share_server = fakes.SHARE_SERVER
share = fakes.CIFS_SHARE
@ -1106,6 +1624,46 @@ class StorageConnectionTestCase(test.TestCase):
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_update_access_recover_cifs_rule_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.CIFS_SHARE
access = fakes.CIFS_RW_ACCESS
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed(
interface1=fakes.FakeData.interface_name3,
interface2=fakes.FakeData.interface_name4))
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True,
ip_addr=fakes.FakeData.network_allocations_ip3))
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.cifs_share.output_allow_access())
ssh_hook.append(fakes.FakeData.cifs_access)
ssh_hook.append('Command succeeded')
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.update_access(None, share, [access], [], [],
share_server=share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.cifs_share.cmd_change_access(), True),
mock.call(self.cifs_share.cmd_get_access(), True),
mock.call(self.cifs_share.cmd_change_access(
action='revoke', user='guest'), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_cifs_clear_access_server_not_found(self):
server = fakes.SHARE_SERVER
@ -1157,6 +1715,39 @@ class StorageConnectionTestCase(test.TestCase):
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_allow_cifs_rw_access_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.CIFS_SHARE
access = fakes.CIFS_RW_ACCESS
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed(
interface1=fakes.FakeData.interface_name3,
interface2=fakes.FakeData.interface_name4))
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True,
ip_addr=fakes.FakeData.network_allocations_ip3))
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.cifs_share.output_allow_access())
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.allow_access(None, share, access, share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.cifs_share.cmd_change_access(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_allow_cifs_ro_access(self):
share_server = fakes.SHARE_SERVER
share = fakes.CIFS_SHARE
@ -1187,6 +1778,39 @@ class StorageConnectionTestCase(test.TestCase):
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_allow_cifs_ro_access_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.CIFS_SHARE
access = fakes.CIFS_RO_ACCESS
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed(
interface1=fakes.FakeData.interface_name3,
interface2=fakes.FakeData.interface_name4))
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True,
ip_addr=fakes.FakeData.network_allocations_ip3))
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.cifs_share.output_allow_access())
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.allow_access(None, share, access, share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.cifs_share.cmd_change_access('ro'), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_allow_ro_access_without_share_server_name(self):
share = fakes.CIFS_SHARE
share_server = copy.deepcopy(fakes.SHARE_SERVER)
@ -1277,6 +1901,37 @@ class StorageConnectionTestCase(test.TestCase):
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_allow_nfs_access_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.NFS_SHARE
access = fakes.NFS_RW_ACCESS_IPV6
rw_hosts = copy.deepcopy(fakes.FakeData.rw_hosts_ipv6)
rw_hosts.append(access['access_to'])
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=fakes.FakeData.rw_hosts_ipv6,
ro_hosts=fakes.FakeData.ro_hosts_ipv6))
ssh_hook.append(self.nfs_share.output_set_access_success())
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=rw_hosts,
ro_hosts=fakes.FakeData.ro_hosts_ipv6))
ssh_cmd_mock = utils.EMCNFSShareMock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.allow_access(None, share, access, share_server)
ssh_calls = [
mock.call(self.nfs_share.cmd_get(), True),
mock.call(self.nfs_share.cmd_set_access(
rw_hosts=rw_hosts,
ro_hosts=self.nfs_share.ro_hosts_ipv6),
True),
mock.call(self.nfs_share.cmd_get(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_allow_cifs_access_with_incorrect_access_type(self):
share_server = fakes.SHARE_SERVER
share = fakes.CIFS_SHARE
@ -1335,6 +1990,40 @@ class StorageConnectionTestCase(test.TestCase):
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_deny_cifs_rw_access_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.CIFS_SHARE
access = fakes.CIFS_RW_ACCESS
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed(
interface1=fakes.FakeData.interface_name3,
interface2=fakes.FakeData.interface_name4))
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True,
ip_addr=fakes.FakeData.network_allocations_ip3))
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.cifs_share.output_allow_access())
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.deny_access(None, share, access, share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.cifs_share.cmd_change_access(action='revoke'),
True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_deny_cifs_ro_access(self):
share_server = fakes.SHARE_SERVER
share = fakes.CIFS_SHARE
@ -1365,6 +2054,39 @@ class StorageConnectionTestCase(test.TestCase):
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_deny_cifs_ro_access_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.CIFS_SHARE
access = fakes.CIFS_RO_ACCESS
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed(
interface1=fakes.FakeData.interface_name3,
interface2=fakes.FakeData.interface_name4))
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True,
ip_addr=fakes.FakeData.network_allocations_ip3))
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.cifs_share.output_allow_access())
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.deny_access(None, share, access, share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.cifs_share.cmd_change_access('ro', 'revoke'), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_deny_cifs_access_with_invliad_share_server_name(self):
share_server = fakes.SHARE_SERVER
share = fakes.CIFS_SHARE
@ -1416,6 +2138,36 @@ class StorageConnectionTestCase(test.TestCase):
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_deny_nfs_access_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.NFS_SHARE
access = fakes.NFS_RW_ACCESS_IPV6
rw_hosts = copy.deepcopy(fakes.FakeData.rw_hosts_ipv6)
rw_hosts.append(access['access_to'])
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=rw_hosts,
ro_hosts=fakes.FakeData.ro_hosts_ipv6))
ssh_hook.append(self.nfs_share.output_set_access_success())
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=fakes.FakeData.rw_hosts_ipv6,
ro_hosts=fakes.FakeData.ro_hosts_ipv6))
ssh_cmd_mock = utils.EMCNFSShareMock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.deny_access(None, share, access, share_server)
ssh_calls = [
mock.call(self.nfs_share.cmd_get(), True),
mock.call(self.nfs_share.cmd_set_access(
rw_hosts=self.nfs_share.rw_hosts_ipv6,
ro_hosts=self.nfs_share.ro_hosts_ipv6), True),
mock.call(self.nfs_share.cmd_get(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_deny_access_with_incorrect_proto(self):
share_server = fakes.SHARE_SERVER
share = fake_share.fake_share(share_proto='FAKE_PROTO')