Fix the mocking mess

Our tests' mocking is a little messy, we have tests that start a mocking
but don't stop it, others try to stop it but use a mock object instead
of a patcher object, so they don't stop it, and we keep using stopall to
stop the patchings instead of individual stops, and we've seen in Cinder
that this can be problematic.

This patch fixes these issue, most of them just by using base TestCase
mock_object method, and some calling the right stop method (ie:
VolumeEncryptorTestCase).

Change-Id: I545abfa8749e778e923c37e0b908efc578f70619
This commit is contained in:
Gorka Eguileor 2016-08-03 12:49:33 +02:00
parent c5e3d8affb
commit 45184cb9b0
9 changed files with 31 additions and 43 deletions

View File

@ -25,9 +25,10 @@ from oslo_utils import strutils
class TestCase(testtools.TestCase):
"""Test case base class for all unit tests."""
SENTINEL = object()
def setUp(self):
"""Run before each test method to initialize test environment."""
super(TestCase, self).setUp()
@ -82,18 +83,19 @@ class TestCase(testtools.TestCase):
log_root = logging.getLogger(None).logger
log_root.setLevel(level)
def mock_object(self, obj, attr_name, new_attr=None, **kwargs):
def mock_object(self, obj, attr_name, new_attr=SENTINEL, **kwargs):
"""Use python mock to mock an object attribute
Mocks the specified objects attribute with the given value.
Automatically performs 'addCleanup' for the mock.
"""
if not new_attr:
new_attr = mock.Mock()
patcher = mock.patch.object(obj, attr_name, new_attr, **kwargs)
patcher.start()
args = [obj, attr_name]
if new_attr is not self.SENTINEL:
args.append(new_attr)
patcher = mock.patch.object(*args, **kwargs)
mocked = patcher.start()
self.addCleanup(patcher.stop)
return mocked
# Useful assertions
def assertDictMatch(self, d1, d2, approx_equal=False, tolerance=0.001):

View File

@ -36,9 +36,9 @@ class VolumeEncryptorTestCase(base.TestCase):
":volume-fake_uuid-lun-1",
},
}
self.mock_execute = (
mock.patch("os_brick.privileged.rootwrap.execute").start())
self.addCleanup(self.mock_execute.stop)
patcher = mock.patch("os_brick.privileged.rootwrap.execute")
self.mock_execute = patcher.start()
self.addCleanup(patcher.stop)
_hex = codecs.getdecoder("hex_codec")('0' * 32)[0]
self.encryption_key = array.array('B', _hex).tolist()
self.root_helper = None

View File

@ -52,9 +52,8 @@ class AoEConnectorTestCase(test_connector.ConnectorTestCase):
self.connector = aoe.AoEConnector('sudo')
self.connection_properties = {'target_shelf': 'fake_shelf',
'target_lun': 'fake_lun'}
mock.patch.object(loopingcall, 'FixedIntervalLoopingCall',
FakeFixedIntervalLoopingCall).start()
self.addCleanup(mock.patch.stopall)
self.mock_object(loopingcall, 'FixedIntervalLoopingCall',
FakeFixedIntervalLoopingCall)
def test_get_search_path(self):
expected = "/dev/etherd"

View File

@ -12,7 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import glob
import mock
import os
from os_brick import exception
@ -54,14 +53,11 @@ class DISCOConnectorTestCase(test_connector.ConnectorTestCase):
self.request_status = 'success'
# Patch the request and os calls to fake versions
mock.patch.object(disco.DISCOConnector,
'_send_disco_vol_cmd',
self.perform_disco_request).start()
mock.patch.object(os.path,
'exists', self.is_volume_attached).start()
mock.patch.object(glob,
'glob', self.list_disco_volume).start()
self.addCleanup(mock.patch.stopall)
self.mock_object(disco.DISCOConnector,
'_send_disco_vol_cmd',
self.perform_disco_request)
self.mock_object(os.path, 'exists', self.is_volume_attached)
self.mock_object(glob, 'glob', self.list_disco_volume)
# The actual DISCO connector
self.connector = disco.DISCOConnector(

View File

@ -37,9 +37,8 @@ class ISCSIConnectorTestCase(test_connector.ConnectorTestCase):
self.connector_with_multipath = iscsi.ISCSIConnector(
None, execute=self.fake_execute, use_multipath=True)
mock.patch.object(self.connector._linuxscsi, 'get_name_from_path',
return_value="/dev/sdb").start()
self.addCleanup(mock.patch.stopall)
self.mock_object(self.connector._linuxscsi, 'get_name_from_path',
return_value="/dev/sdb")
self._fake_iqn = 'iqn.1234-56.foo.bar:01:23456789abc'
def generate_device(self, location, iqn, transport=None, lun=1):

View File

@ -78,15 +78,11 @@ class ScaleIOConnectorTestCase(test_connector.ConnectorTestCase):
), status_code=404)
# Patch the request and os calls to fake versions
mock.patch.object(
requests, 'get', self.handle_scaleio_request).start()
mock.patch.object(
requests, 'post', self.handle_scaleio_request).start()
mock.patch.object(os.path, 'isdir', return_value=True).start()
mock.patch.object(
os, 'listdir', return_value=["emc-vol-{}".format(self.vol['id'])]
).start()
self.addCleanup(mock.patch.stopall)
self.mock_object(requests, 'get', self.handle_scaleio_request)
self.mock_object(requests, 'post', self.handle_scaleio_request)
self.mock_object(os.path, 'isdir', return_value=True)
self.mock_object(os, 'listdir',
return_value=["emc-vol-{}".format(self.vol['id'])])
# The actual ScaleIO connector
self.connector = scaleio.ScaleIOConnector(
@ -239,9 +235,7 @@ class ScaleIOConnectorTestCase(test_connector.ConnectorTestCase):
@mock.patch('time.sleep')
def test_error_path_not_found(self, sleep_mock):
"""Timeout waiting for volume to map to local file system"""
mock.patch.object(
os, 'listdir', return_value=["emc-vol-no-volume"]
).start()
self.mock_object(os, 'listdir', return_value=["emc-vol-no-volume"])
self.assertRaises(exception.BrickException, self.test_connect_volume)
self.assertTrue(sleep_mock.called)

View File

@ -26,8 +26,7 @@ class LinuxFCTestCase(base.TestCase):
super(LinuxFCTestCase, self).setUp()
self.cmds = []
mock.patch.object(os.path, 'exists', return_value=True).start()
self.addCleanup(mock.patch.stopall)
self.mock_object(os.path, 'exists', return_value=True)
self.lfc = linuxfc.LinuxFibreChannel(None, execute=self.fake_execute)
def fake_execute(self, *cmd, **kwargs):

View File

@ -31,9 +31,8 @@ class LinuxSCSITestCase(base.TestCase):
def setUp(self):
super(LinuxSCSITestCase, self).setUp()
self.cmds = []
mock.patch.object(os.path, 'realpath', return_value='/dev/sdc').start()
mock.patch.object(os, 'stat', returns=os.stat(__file__)).start()
self.addCleanup(mock.patch.stopall)
self.mock_object(os.path, 'realpath', return_value='/dev/sdc')
self.mock_object(os, 'stat', returns=os.stat(__file__))
self.linuxscsi = linuxscsi.LinuxSCSI(None, execute=self.fake_execute)
def fake_execute(self, *cmd, **kwargs):

View File

@ -28,7 +28,7 @@ class RemoteFsClientTestCase(base.TestCase):
def setUp(self):
super(RemoteFsClientTestCase, self).setUp()
self.mock_execute = mock.patch.object(priv_rootwrap, 'execute').start()
self.mock_execute = self.mock_object(priv_rootwrap, 'execute')
@mock.patch.object(remotefs.RemoteFsClient, '_read_mounts',
return_value=[])