Merge "os-brick refactor get_connector_properties"

This commit is contained in:
Jenkins 2016-05-04 17:17:37 +00:00 committed by Gerrit Code Review
commit 6d71d55caf
4 changed files with 376 additions and 51 deletions

View File

@ -38,13 +38,11 @@ from oslo_concurrency import lockutils
from oslo_concurrency import processutils as putils
from oslo_log import log as logging
from oslo_service import loopingcall
from oslo_utils import importutils
from oslo_utils import strutils
import six
from six.moves import urllib
S390X = "s390x"
S390 = "s390"
from os_brick import exception
from os_brick import executor
from os_brick import utils
@ -54,7 +52,6 @@ from os_brick.initiator import linuxfc
from os_brick.initiator import linuxrbd
from os_brick.initiator import linuxscsi
from os_brick.initiator import linuxsheepdog
from os_brick.privileged import rootwrap as priv_rootwrap
from os_brick.remotefs import remotefs
from os_brick.i18n import _, _LE, _LI, _LW
@ -66,6 +63,15 @@ MULTIPATH_ERROR_REGEX = re.compile("\w{3} \d+ \d\d:\d\d:\d\d \|.*$")
MULTIPATH_DEV_CHECK_REGEX = re.compile("\s+dm-\d+\s+")
MULTIPATH_PATH_CHECK_REGEX = re.compile("\s+\d+:\d+:\d+:\d+\s+")
PLATFORM_ALL = 'ALL'
PLATFORM_x86 = 'X86'
PLATFORM_S390 = 'S390'
OS_TYPE_ALL = 'ALL'
OS_TYPE_LINUX = 'LINUX'
S390X = "s390x"
S390 = "s390"
ISCSI = "ISCSI"
ISER = "ISER"
FIBRE_CHANNEL = "FIBRE_CHANNEL"
@ -84,19 +90,21 @@ DISCO = "DISCO"
VZSTORAGE = "VZSTORAGE"
SHEEPDOG = "SHEEPDOG"
def _check_multipathd_running(root_helper, enforce_multipath):
try:
priv_rootwrap.execute('multipathd', 'show', 'status',
run_as_root=True, root_helper=root_helper)
except putils.ProcessExecutionError as err:
LOG.error(_LE('multipathd is not running: exit code %(err)s'),
{'err': err.exit_code})
if enforce_multipath:
raise
return False
return True
connector_list = [
'os_brick.initiator.connector.InitiatorConnector',
'os_brick.initiator.connector.ISCSIConnector',
'os_brick.initiator.connector.FibreChannelConnector',
'os_brick.initiator.connector.FibreChannelConnectorS390X',
'os_brick.initiator.connector.AoEConnector',
'os_brick.initiator.connector.RemoteFsConnector',
'os_brick.initiator.connector.RBDConnector',
'os_brick.initiator.connector.LocalConnector',
'os_brick.initiator.connector.DRBDConnector',
'os_brick.initiator.connector.HuaweiStorHyperConnector',
'os_brick.initiator.connector.HGSTConnector',
'os_brick.initiator.connector.ScaleIOConnector',
'os_brick.initiator.connector.DISCOConnector',
]
def get_connector_properties(root_helper, my_ip, multipath, enforce_multipath,
@ -123,32 +131,39 @@ def get_connector_properties(root_helper, my_ip, multipath, enforce_multipath,
:type enforce_multipath: bool
:returns: dict containing all of the collected initiator values.
"""
iscsi = ISCSIConnector(root_helper=root_helper)
fc = linuxfc.LinuxFibreChannel(root_helper=root_helper)
props = {}
props['ip'] = my_ip
props['host'] = host if host else socket.gethostname()
initiator = iscsi.get_initiator()
if initiator:
props['initiator'] = initiator
wwpns = fc.get_fc_wwpns()
if wwpns:
props['wwpns'] = wwpns
wwnns = fc.get_fc_wwnns()
if wwnns:
props['wwnns'] = wwnns
props['multipath'] = (multipath and
_check_multipathd_running(root_helper,
enforce_multipath))
props['platform'] = platform.machine()
props['os_type'] = sys.platform
props['ip'] = my_ip
props['host'] = host if host else socket.gethostname()
for item in connector_list:
connector = importutils.import_class(item)
if (utils.platform_matches(props['platform'], connector.platform) and
utils.os_matches(props['os_type'], connector.os_type)):
LOG.debug("Fetching connector for %s" % connector.__name__)
props = utils.merge_dict(props,
connector.get_connector_properties(
root_helper,
host=host,
multipath=multipath,
enforce_multipath=enforce_multipath))
return props
@six.add_metaclass(abc.ABCMeta)
class InitiatorConnector(executor.Executor):
# This object can be used on any platform (x86, S390)
platform = PLATFORM_ALL
# This object can be used on any os type (linux, windows)
# TODO(walter-boring) This class stil has a reliance on
# linuxscsi object, making it specific to linux. Need to fix that.
os_type = OS_TYPE_LINUX
def __init__(self, root_helper, driver=None, execute=None,
device_scan_attempts=DEVICE_SCAN_ATTEMPTS_DEFAULT,
*args, **kwargs):
@ -162,9 +177,21 @@ class InitiatorConnector(executor.Executor):
def set_driver(self, driver):
"""The driver is used to find used LUNs."""
self.driver = driver
@staticmethod
def get_connector_properties(root_helper, *args, **kwargs):
"""The generic connector properties."""
multipath = kwargs['multipath']
enforce_multipath = kwargs['enforce_multipath']
props = {}
# TODO(walter-boring) move this into platform specific lib
props['multipath'] = (multipath and
linuxscsi.LinuxSCSI.is_multipath_running(
enforce_multipath, root_helper))
return props
@staticmethod
def factory(protocol, root_helper, driver=None,
use_multipath=False,
@ -514,6 +541,17 @@ class ISCSIConnector(InitiatorConnector):
self.use_multipath = use_multipath
self.transport = self._validate_iface_transport(transport)
@staticmethod
def get_connector_properties(root_helper, *args, **kwargs):
"""The iSCSI connector properties."""
props = {}
iscsi = ISCSIConnector(root_helper=root_helper)
initiator = iscsi.get_initiator()
if initiator:
props['initiator'] = initiator
return props
def get_search_path(self):
"""Where do we look for iSCSI based volumes."""
return '/dev/disk/by-path'
@ -1318,6 +1356,21 @@ class FibreChannelConnector(InitiatorConnector):
self._linuxscsi.set_execute(execute)
self._linuxfc.set_execute(execute)
@staticmethod
def get_connector_properties(root_helper, *args, **kwargs):
"""The Fibre Channel connector properties."""
props = {}
fc = linuxfc.LinuxFibreChannel(root_helper)
wwpns = fc.get_fc_wwpns()
if wwpns:
props['wwpns'] = wwpns
wwnns = fc.get_fc_wwnns()
if wwnns:
props['wwnns'] = wwnns
return props
def get_search_path(self):
"""Where do we look for FC based volumes."""
return '/dev/disk/by-path'
@ -1550,6 +1603,7 @@ class FibreChannelConnector(InitiatorConnector):
class FibreChannelConnectorS390X(FibreChannelConnector):
"""Connector class to attach/detach Fibre Channel volumes on S390X arch."""
platform = PLATFORM_S390
def __init__(self, root_helper, driver=None,
execute=None, use_multipath=False,
@ -1613,6 +1667,7 @@ class FibreChannelConnectorS390X(FibreChannelConnector):
class AoEConnector(InitiatorConnector):
"""Connector class to attach/detach AoE volumes."""
def __init__(self, root_helper, driver=None,
device_scan_attempts=DEVICE_SCAN_ATTEMPTS_DEFAULT,
*args, **kwargs):
@ -1622,6 +1677,11 @@ class AoEConnector(InitiatorConnector):
device_scan_attempts=device_scan_attempts,
*args, **kwargs)
@staticmethod
def get_connector_properties(root_helper, *args, **kwargs):
"""The AoE connector properties."""
return {}
def get_search_path(self):
return '/dev/etherd'
@ -1785,6 +1845,11 @@ class RemoteFsConnector(InitiatorConnector):
device_scan_attempts=device_scan_attempts,
*args, **kwargs)
@staticmethod
def get_connector_properties(root_helper, *args, **kwargs):
"""The RemoteFS connector properties."""
return {}
def set_execute(self, execute):
super(RemoteFsConnector, self).set_execute(execute)
self._remotefsclient.set_execute(execute)
@ -1852,6 +1917,11 @@ class RBDConnector(InitiatorConnector):
device_scan_attempts,
*args, **kwargs)
@staticmethod
def get_connector_properties(root_helper, *args, **kwargs):
"""The RBD connector properties."""
return {}
def get_volume_paths(self, connection_properties):
# TODO(walter-boring): don't know where the connector
# looks for RBD volumes.
@ -1938,6 +2008,11 @@ class LocalConnector(InitiatorConnector):
super(LocalConnector, self).__init__(root_helper, driver=driver,
*args, **kwargs)
@staticmethod
def get_connector_properties(root_helper, *args, **kwargs):
"""The Local connector properties."""
return {}
def get_volume_paths(self, connection_properties):
path = connection_properties['device_path']
return [path]
@ -1987,6 +2062,20 @@ class LocalConnector(InitiatorConnector):
class DRBDConnector(InitiatorConnector):
""""Connector class to attach/detach DRBD resources."""
def __init__(self, root_helper, driver=None,
execute=putils.execute, *args, **kwargs):
super(DRBDConnector, self).__init__(root_helper, driver=driver,
execute=execute, *args, **kwargs)
self._execute = execute
self._root_helper = root_helper
@staticmethod
def get_connector_properties(root_helper, *args, **kwargs):
"""The DRBD connector properties."""
return {}
def check_valid_device(self, path, run_as_root=True):
"""Verify an existing volume."""
# TODO(linbit): check via drbdsetup first, to avoid blocking/hanging
@ -2058,6 +2147,7 @@ class DRBDConnector(InitiatorConnector):
class HuaweiStorHyperConnector(InitiatorConnector):
""""Connector class to attach/detach SDSHypervisor volumes."""
attached_success_code = 0
has_been_attached_code = 50151401
attach_mnid_done_code = 50151405
@ -2080,6 +2170,11 @@ class HuaweiStorHyperConnector(InitiatorConnector):
driver=driver,
*args, **kwargs)
@staticmethod
def get_connector_properties(root_helper, *args, **kwargs):
"""The HuaweiStor connector properties."""
return {}
def get_search_path(self):
# TODO(walter-boring): Where is the location on the filesystem to
# look for Huawei volumes to show up?
@ -2216,6 +2311,7 @@ class HuaweiStorHyperConnector(InitiatorConnector):
class HGSTConnector(InitiatorConnector):
"""Connector class to attach/detach HGST volumes."""
VGCCLUSTER = 'vgc-cluster'
def __init__(self, root_helper, driver=None,
@ -2227,6 +2323,11 @@ class HGSTConnector(InitiatorConnector):
*args, **kwargs)
self._vgc_host = None
@staticmethod
def get_connector_properties(root_helper, *args, **kwargs):
"""The HGST connector properties."""
return {}
def _log_cli_err(self, err):
"""Dumps the full command output to a logfile in error cases."""
LOG.error(_LE("CLI fail: '%(cmd)s' = %(code)s\nout: %(stdout)s\n"
@ -2363,6 +2464,7 @@ class HGSTConnector(InitiatorConnector):
class ScaleIOConnector(InitiatorConnector):
"""Class implements the connector driver for ScaleIO."""
OK_STATUS_CODE = 200
VOLUME_NOT_MAPPED_ERROR = 84
VOLUME_ALREADY_MAPPED_ERROR = 81
@ -2390,6 +2492,11 @@ class ScaleIOConnector(InitiatorConnector):
self.iops_limit = None
self.bandwidth_limit = None
@staticmethod
def get_connector_properties(root_helper, *args, **kwargs):
"""The ScaleIO connector properties."""
return {}
def get_search_path(self):
return "/dev/disk/by-id"
@ -2830,6 +2937,11 @@ class DISCOConnector(InitiatorConnector):
self.server_port = None
self.server_ip = None
@staticmethod
def get_connector_properties(root_helper, *args, **kwargs):
"""The DISCO connector properties."""
return {}
def get_search_path(self):
"""Get directory path where to get DISCO volumes."""
return "/dev"
@ -2990,6 +3102,11 @@ class SheepdogConnector(InitiatorConnector):
device_scan_attempts,
*args, **kwargs)
@staticmethod
def get_connector_properties(root_helper, *args, **kwargs):
"""The Sheepdog connector properties."""
return {}
def get_volume_paths(self, connection_properties):
# TODO(lixiaoy1): don't know where the connector
# looks for sheepdog volumes.

View File

@ -25,8 +25,10 @@ from oslo_log import log as logging
from os_brick import exception
from os_brick import executor
from os_brick.i18n import _LE
from os_brick.i18n import _LI
from os_brick.i18n import _LW
from os_brick.privileged import rootwrap as priv_rootwrap
from os_brick import utils
LOG = logging.getLogger(__name__)
@ -110,6 +112,21 @@ class LinuxSCSI(executor.Executor):
root_helper=self._root_helper)
return out.strip()
@staticmethod
def is_multipath_running(enforce_multipath, root_helper):
try:
priv_rootwrap.execute('multipathd', 'show', 'status',
run_as_root=True,
root_helper=root_helper)
except putils.ProcessExecutionError as err:
LOG.error(_LE('multipathd is not running: exit code %(err)s'),
{'err': err.exit_code})
if enforce_multipath:
raise
return False
return True
def remove_multipath_device(self, device):
"""This removes LUNs associated with a multipath device
and the multipath device itself.

View File

@ -14,6 +14,7 @@
import os.path
import platform
import sys
import tempfile
import time
@ -77,6 +78,29 @@ class ConnectorUtilsTestCase(base.TestCase):
'platform': platform}
self.assertEqual(props, props_actual)
def test_brick_get_connector_properties_connectors_called(self):
"""Make sure every connector is called."""
mock_list = []
# Make sure every connector is called
for item in connector.connector_list:
patched = mock.MagicMock()
patched.platform = platform.machine()
patched.os_type = sys.platform
patched.__name__ = item
patched.get_connector_properties.return_value = {}
patcher = mock.patch(item, new=patched)
patcher.start()
self.addCleanup(patcher.stop)
mock_list.append(patched)
connector.get_connector_properties('sudo',
MY_IP,
True, True)
for item in mock_list:
assert item.get_connector_properties.called
def test_brick_get_connector_properties(self):
self._test_brick_get_connector_properties(False, False, False)
@ -138,6 +162,37 @@ class ConnectorTestCase(base.TestCase):
def test_disconnect_volume(self):
self.connector = connector.FakeConnector(None)
def test_get_connector_properties(self):
with mock.patch.object(priv_rootwrap, 'execute') as mock_exec:
mock_exec.return_value = True
multipath = True
enforce_multipath = True
props = connector.InitiatorConnector.get_connector_properties(
'sudo', multipath=multipath,
enforce_multipath=enforce_multipath)
expected_props = {'multipath': True}
self.assertEqual(expected_props, props)
multipath = False
enforce_multipath = True
props = connector.InitiatorConnector.get_connector_properties(
'sudo', multipath=multipath,
enforce_multipath=enforce_multipath)
expected_props = {'multipath': False}
self.assertEqual(expected_props, props)
with mock.patch.object(priv_rootwrap, 'execute',
side_effect=putils.ProcessExecutionError):
multipath = True
enforce_multipath = True
self.assertRaises(
putils.ProcessExecutionError,
connector.InitiatorConnector.get_connector_properties,
'sudo', multipath=multipath,
enforce_multipath=enforce_multipath)
def test_factory(self):
obj = connector.InitiatorConnector.factory('iscsi', None)
self.assertEqual(obj.__class__.__name__, "ISCSIConnector")
@ -217,6 +272,7 @@ class ISCSIConnectorTestCase(ConnectorTestCase):
mock.patch.object(self.connector._linuxscsi, 'get_name_from_path',
return_value="/dev/sdb").start()
self.addCleanup(mock.patch.stopall)
self._fake_iqn = 'iqn.1234-56.foo.bar:01:23456789abc'
def generate_device(self, location, iqn, transport=None, lun=1):
dev_format = "ip-%s-iscsi-%s-lun-%s" % (location, iqn, lun)
@ -267,29 +323,41 @@ class ISCSIConnectorTestCase(ConnectorTestCase):
}
}
def _initiator_get_text(self, *arg, **kwargs):
text = ('## DO NOT EDIT OR REMOVE THIS FILE!\n'
'## If you remove this file, the iSCSI daemon '
'will not start.\n'
'## If you change the InitiatorName, existing '
'access control lists\n'
'## may reject this initiator. The InitiatorName must '
'be unique\n'
'## for each iSCSI initiator. Do NOT duplicate iSCSI '
'InitiatorNames.\n'
'InitiatorName=%s' % self._fake_iqn)
return text, None
def test_get_initiator(self):
def initiator_no_file(*args, **kwargs):
raise putils.ProcessExecutionError('No file')
def initiator_get_text(*arg, **kwargs):
text = ('## DO NOT EDIT OR REMOVE THIS FILE!\n'
'## If you remove this file, the iSCSI daemon '
'will not start.\n'
'## If you change the InitiatorName, existing '
'access control lists\n'
'## may reject this initiator. The InitiatorName must '
'be unique\n'
'## for each iSCSI initiator. Do NOT duplicate iSCSI '
'InitiatorNames.\n'
'InitiatorName=iqn.1234-56.foo.bar:01:23456789abc')
return text, None
self.connector._execute = initiator_no_file
initiator = self.connector.get_initiator()
self.assertIsNone(initiator)
self.connector._execute = initiator_get_text
self.connector._execute = self._initiator_get_text
initiator = self.connector.get_initiator()
self.assertEqual(initiator, 'iqn.1234-56.foo.bar:01:23456789abc')
self.assertEqual(initiator, self._fake_iqn)
def test_get_connector_properties(self):
with mock.patch.object(priv_rootwrap, 'execute') as mock_exec:
mock_exec.return_value = self._initiator_get_text()
multipath = True
enforce_multipath = True
props = connector.ISCSIConnector.get_connector_properties(
'sudo', multipath=multipath,
enforce_multipath=enforce_multipath)
expected_props = {'initiator': self._fake_iqn}
self.assertEqual(expected_props, props)
@mock.patch.object(connector.ISCSIConnector, '_run_iscsiadm_bare')
def test_brick_iscsi_validate_transport(self, mock_iscsiadm):
@ -1185,6 +1253,20 @@ class FibreChannelConnectorTestCase(ConnectorTestCase):
'target_lun': 1,
}}
@mock.patch.object(linuxfc.LinuxFibreChannel, 'get_fc_hbas')
def test_get_connector_properties(self, mock_hbas):
mock_hbas.return_value = self.fake_get_fc_hbas()
multipath = True
enforce_multipath = True
props = connector.FibreChannelConnector.get_connector_properties(
'sudo', multipath=multipath,
enforce_multipath=enforce_multipath)
hbas = self.fake_get_fc_hbas()
expected_props = {'wwpns': [hbas[0]['port_name'].replace('0x', '')],
'wwnns': [hbas[0]['node_name'].replace('0x', '')]}
self.assertEqual(expected_props, props)
def test_get_search_path(self):
search_path = self.connector.get_search_path()
expected = "/dev/disk/by-path"
@ -1592,6 +1674,13 @@ class AoEConnectorTestCase(ConnectorTestCase):
paths = self.connector.get_volume_paths(self.connection_properties)
self.assertEqual(expected, paths)
def test_get_connector_properties(self):
props = connector.AoEConnector.get_connector_properties(
'sudo', multipath=True, enforce_multipath=True)
expected_props = {}
self.assertEqual(expected_props, props)
@mock.patch.object(os.path, 'exists', side_effect=[True, True])
def test_connect_volume(self, exists_mock):
"""Ensure that if path exist aoe-revalidate was called."""
@ -1669,6 +1758,13 @@ class RemoteFsConnectorTestCase(ConnectorTestCase):
connector.RemoteFsConnector('scality', root_helper='sudo')
self.assertEqual(1, mock_scality_remotefs_client.call_count)
def test_get_connector_properties(self):
props = connector.RemoteFsConnector.get_connector_properties(
'sudo', multipath=True, enforce_multipath=True)
expected_props = {}
self.assertEqual(expected_props, props)
def test_get_search_path(self):
expected = self.TEST_BASE
actual = self.connector.get_search_path()
@ -1707,6 +1803,13 @@ class LocalConnectorTestCase(ConnectorTestCase):
'device_path': '/tmp/bar'}
self.connector = connector.LocalConnector(None)
def test_get_connector_properties(self):
props = connector.LocalConnector.get_connector_properties(
'sudo', multipath=True, enforce_multipath=True)
expected_props = {}
self.assertEqual(expected_props, props)
def test_get_search_path(self):
actual = self.connector.get_search_path()
self.assertIsNone(actual)
@ -1802,6 +1905,13 @@ class HuaweiStorHyperConnectorTestCase(ConnectorTestCase):
HuaweiStorHyperConnectorTestCase.attached = True
return 'ret_code=330155007', None
def test_get_connector_properties(self):
props = connector.HuaweiStorHyperConnector.get_connector_properties(
'sudo', multipath=True, enforce_multipath=True)
expected_props = {}
self.assertEqual(expected_props, props)
def test_get_search_path(self):
actual = self.connector.get_search_path()
self.assertIsNone(actual)
@ -2054,6 +2164,13 @@ Request Succeeded
self.assertEqual('space', dev_info['device'])
self.assertEqual('/dev/space', dev_info['path'])
def test_get_connector_properties(self):
props = connector.HGSTConnector.get_connector_properties(
'sudo', multipath=True, enforce_multipath=True)
expected_props = {}
self.assertEqual(expected_props, props)
def test_connect_volume_nohost_fail(self):
"""This host should not be found, connect should fail."""
self._fail_set_apphosts = False
@ -2169,6 +2286,13 @@ class RBDConnectorTestCase(ConnectorTestCase):
actual = rbd.get_volume_paths(self.connection_properties)
self.assertEqual(expected, actual)
def test_get_connector_properties(self):
props = connector.RBDConnector.get_connector_properties(
'sudo', multipath=True, enforce_multipath=True)
expected_props = {}
self.assertEqual(expected_props, props)
@mock.patch('os_brick.initiator.linuxrbd.rbd')
@mock.patch('os_brick.initiator.linuxrbd.rados')
def test_connect_volume(self, mock_rados, mock_rbd):
@ -2240,6 +2364,13 @@ class DRBDConnectorTestCase(ConnectorTestCase):
# out, err
return ('', '')
def test_get_connector_properties(self):
props = connector.DRBDConnector.get_connector_properties(
'sudo', multipath=True, enforce_multipath=True)
expected_props = {}
self.assertEqual(expected_props, props)
def test_connect_volume(self):
"""Test connect_volume."""
@ -2417,6 +2548,13 @@ class ScaleIOConnectorTestCase(ConnectorTestCase):
self.fake_connection_properties)
self.assertEqual(expected, actual)
def test_get_connector_properties(self):
props = connector.ScaleIOConnector.get_connector_properties(
'sudo', multipath=True, enforce_multipath=True)
expected_props = {}
self.assertEqual(expected_props, props)
def test_connect_volume(self):
"""Successful connect to volume"""
self.connector.connect_volume(self.fake_connection_properties)
@ -2584,6 +2722,13 @@ class DISCOConnectorTestCase(ConnectorTestCase):
volume_path = ''.join(volume_items)
return [volume_path]
def test_get_connector_properties(self):
props = connector.DISCOConnector.get_connector_properties(
'sudo', multipath=True, enforce_multipath=True)
expected_props = {}
self.assertEqual(expected_props, props)
def test_get_search_path(self):
"""DISCO volumes should be under /dev."""
expected = "/dev"
@ -2665,6 +2810,13 @@ class SheepdogConnectorTestCase(ConnectorTestCase):
'ports': self.ports,
}
def test_get_connector_properties(self):
props = connector.SheepdogConnector.get_connector_properties(
'sudo', multipath=True, enforce_multipath=True)
expected_props = {}
self.assertEqual(expected_props, props)
def test_get_search_path(self):
sheepdog = connector.SheepdogConnector(None)
path = sheepdog.get_search_path()

View File

@ -56,3 +56,42 @@ def retry(exceptions, interval=1, retries=3, backoff_rate=2):
return _wrapper
return _decorator
def platform_matches(current_platform, connector_platform):
curr_p = current_platform.upper()
conn_p = connector_platform.upper()
if conn_p == 'ALL':
return True
# Add tests against families of platforms
if curr_p == conn_p:
return True
return False
def os_matches(current_os, connector_os):
curr_os = current_os.upper()
conn_os = connector_os.upper()
if conn_os == 'ALL':
return True
# add tests against OSs
if (conn_os == curr_os or
conn_os in curr_os):
return True
return False
def merge_dict(dict1, dict2):
"""Try to safely merge 2 dictionaries."""
if type(dict1) is not dict:
raise Exception("dict1 is not a dictionary")
if type(dict2) is not dict:
raise Exception("dict2 is not a dictionary")
dict3 = dict1.copy()
dict3.update(dict2)
return dict3