Merge "Replace pexpect with processutils in volume.py"

This commit is contained in:
Jenkins 2016-12-23 19:39:40 +00:00 committed by Gerrit Code Review
commit c4468ac65d
4 changed files with 274 additions and 223 deletions

View File

@ -14,15 +14,14 @@
# under the License.
import os
import shlex
from tempfile import NamedTemporaryFile
import traceback
from oslo_log import log as logging
import pexpect
from trove.common import cfg
from trove.common.exception import GuestError
from trove.common.exception import ProcessExecutionError
from trove.common import exception
from trove.common.i18n import _
from trove.common import utils
from trove.guestagent.common import operating_system
@ -33,6 +32,12 @@ LOG = logging.getLogger(__name__)
CONF = cfg.CONF
def log_and_raise(message):
LOG.exception(message)
raise_msg = message + _("\nExc: %s") % traceback.format_exc()
raise exception.GuestError(original_message=raise_msg)
class VolumeDevice(object):
def __init__(self, device_path):
@ -48,9 +53,14 @@ class VolumeDevice(object):
target_dir = TMP_MOUNT_POINT
if target_subdir:
target_dir = target_dir + "/" + target_subdir
utils.execute("sudo", "rsync", "--safe-links", "--perms",
"--recursive", "--owner", "--group", "--xattrs",
"--sparse", source_dir, target_dir)
try:
utils.execute("rsync", "--safe-links", "--perms",
"--recursive", "--owner", "--group", "--xattrs",
"--sparse", source_dir, target_dir,
run_as_root=True, root_helper="sudo")
except exception.ProcessExecutionError:
msg = _("Could not migrate data.")
log_and_raise(msg)
self.unmount(TMP_MOUNT_POINT)
def _check_device_exists(self):
@ -64,46 +74,48 @@ class VolumeDevice(object):
num_tries = CONF.num_tries
LOG.debug("Checking if %s exists." % self.device_path)
utils.execute('sudo', 'blockdev', '--getsize64', self.device_path,
utils.execute("blockdev", "--getsize64", self.device_path,
run_as_root=True, root_helper="sudo",
attempts=num_tries)
except ProcessExecutionError:
LOG.exception(_("Error getting device status"))
raise GuestError(original_message=_(
"InvalidDevicePath(path=%s)") % self.device_path)
except exception.ProcessExecutionError:
msg = _("Device '%s' is not ready.") % self.device_path
log_and_raise(msg)
def _check_format(self):
"""Checks that an unmounted volume is formatted."""
cmd = "sudo dumpe2fs %s" % self.device_path
LOG.debug("Checking whether %s is formatted: %s." %
(self.device_path, cmd))
child = pexpect.spawn(cmd)
"""Checks that a volume is formatted."""
LOG.debug("Checking whether '%s' is formatted." % self.device_path)
try:
i = child.expect(['has_journal', 'Wrong magic number'])
if i == 0:
return
volume_fstype = CONF.volume_fstype
raise IOError(
_('Device path at {0} did not seem to be {1}.').format(
self.device_path, volume_fstype))
except pexpect.EOF:
raise IOError(_("Volume was not formatted."))
child.expect(pexpect.EOF)
stdout, stderr = utils.execute(
"dumpe2fs", self.device_path,
run_as_root=True, root_helper="sudo")
if 'has_journal' not in stdout:
msg = _("Volume '%s' does not appear to be formatted.") % (
self.device_path)
raise exception.GuestError(original_message=msg)
except exception.ProcessExecutionError as pe:
if 'Wrong magic number' in pe.stderr:
volume_fstype = CONF.volume_fstype
msg = _("'Device '%(dev)s' did not seem to be '%(type)s'.") % (
{'dev': self.device_path, 'type': volume_fstype})
log_and_raise(msg)
msg = _("Volume '%s' was not formatted.") % self.device_path
log_and_raise(msg)
def _format(self):
"""Calls mkfs to format the device at device_path."""
volume_fstype = CONF.volume_fstype
format_options = CONF.format_options
cmd = "sudo mkfs -t %s %s %s" % (volume_fstype,
format_options, self.device_path)
format_options = shlex.split(CONF.format_options)
format_options.append(self.device_path)
volume_format_timeout = CONF.volume_format_timeout
LOG.debug("Formatting %s. Executing: %s." %
(self.device_path, cmd))
child = pexpect.spawn(cmd, timeout=volume_format_timeout)
# child.expect("(y,n)")
# child.sendline('y')
child.expect(pexpect.EOF)
LOG.debug("Formatting '%s'." % self.device_path)
try:
utils.execute_with_timeout(
"mkfs", "--type", volume_fstype, *format_options,
run_as_root=True, root_helper="sudo",
timeout=volume_format_timeout)
except exception.ProcessExecutionError:
msg = _("Could not format '%s'.") % self.device_path
log_and_raise(msg)
def format(self):
"""Formats the device at device_path and checks the filesystem."""
@ -120,59 +132,77 @@ class VolumeDevice(object):
if write_to_fstab:
mount_point.write_to_fstab()
def _wait_for_mount(self, mount_point, timeout=2):
"""Wait for a fs to be mounted."""
def wait_for_mount():
return operating_system.is_mount(mount_point)
try:
utils.poll_until(wait_for_mount, sleep_time=1, time_out=timeout)
except exception.PollTimeOut:
return False
return True
def resize_fs(self, mount_point):
"""Resize the filesystem on the specified device."""
self._check_device_exists()
# Some OS's will mount a file systems after it's attached if
# an entry is put in the fstab file (like Trove does).
# Thus it may be necessary to wait for the mount and then unmount
# the fs again (since the volume was just attached).
if self._wait_for_mount(mount_point, timeout=2):
LOG.debug("Unmounting '%s' before resizing." % mount_point)
self.unmount(mount_point)
try:
# check if the device is mounted at mount_point before e2fsck
if not os.path.ismount(mount_point):
utils.execute("e2fsck", "-f", "-p", self.device_path,
run_as_root=True, root_helper="sudo")
utils.execute("e2fsck", "-f", "-p", self.device_path,
run_as_root=True, root_helper="sudo")
utils.execute("resize2fs", self.device_path,
run_as_root=True, root_helper="sudo")
except ProcessExecutionError:
LOG.exception(_("Error resizing file system."))
msg = _("Error resizing the filesystem with device '%(dev)s'.\n"
"Exc: %(exc)s") % (
{'dev': self.device_path,
'exc': traceback.format_exc()})
raise GuestError(original_message=msg)
except exception.ProcessExecutionError:
msg = _("Error resizing the filesystem with device '%s'.") % (
self.device_path)
log_and_raise(msg)
def unmount(self, mount_point):
if operating_system.is_mount(mount_point):
cmd = "sudo umount %s" % mount_point
child = pexpect.spawn(cmd)
child.expect(pexpect.EOF)
try:
utils.execute("umount", mount_point,
run_as_root=True, root_helper='sudo')
except exception.ProcessExecutionError:
msg = _("Error unmounting '%s'.") % mount_point
log_and_raise(msg)
else:
LOG.debug("'%s' is not a mounted fs, cannot unmount", mount_point)
def unmount_device(self, device_path):
# unmount if device is already mounted
mount_points = self.mount_points(device_path)
for mnt in mount_points:
LOG.info(_("Device %(device)s is already mounted in "
"%(mount_point)s. Unmounting now.") %
LOG.info(_("Device '%(device)s' is mounted on "
"'%(mount_point)s'. Unmounting now.") %
{'device': device_path, 'mount_point': mnt})
self.unmount(mnt)
def mount_points(self, device_path):
"""Returns a list of mount points on the specified device."""
stdout, stderr = utils.execute(
"grep %s /etc/mtab" % device_path,
"grep '^%s ' /etc/mtab" % device_path,
shell=True, check_exit_code=[0, 1])
return [entry.strip().split()[1] for entry in stdout.splitlines()]
def set_readahead_size(self, readahead_size,
execute_function=utils.execute):
def set_readahead_size(self, readahead_size):
"""Set the readahead size of disk."""
self._check_device_exists()
try:
execute_function("sudo", "blockdev", "--setra",
readahead_size, self.device_path)
except ProcessExecutionError:
LOG.exception(_("Error setting readhead size to %(size)s "
"for device %(device)s.") %
{'size': readahead_size, 'device': self.device_path})
raise GuestError(original_message=_(
"Error setting readhead size: %s.") % self.device_path)
utils.execute("blockdev", "--setra",
readahead_size, self.device_path,
run_as_root=True, root_helper="sudo")
except exception.ProcessExecutionError:
msg = _("Error setting readahead size to %(size)s "
"for device %(device)s.") % {
'size': readahead_size, 'device': self.device_path}
log_and_raise(msg)
class VolumeMountPoint(object):
@ -184,17 +214,21 @@ class VolumeMountPoint(object):
self.mount_options = CONF.mount_options
def mount(self):
if not os.path.exists(self.mount_point):
if not operating_system.exists(self.mount_point, is_directory=True,
as_root=True):
operating_system.create_directory(self.mount_point, as_root=True)
LOG.debug("Mounting volume. Device path:{0}, mount_point:{1}, "
"volume_type:{2}, mount options:{3}".format(
self.device_path, self.mount_point, self.volume_fstype,
self.mount_options))
cmd = ("sudo mount -t %s -o %s %s %s" %
(self.volume_fstype, self.mount_options, self.device_path,
self.mount_point))
child = pexpect.spawn(cmd)
child.expect(pexpect.EOF)
try:
utils.execute("mount", "-t", self.volume_fstype,
"-o", self.mount_options,
self.device_path, self.mount_point,
run_as_root=True, root_helper="sudo")
except exception.ProcessExecutionError:
msg = _("Could not mount '%s'.") % self.mount_point
log_and_raise(msg)
def write_to_fstab(self):
fstab_line = ("%s\t%s\t%s\t%s\t0\t0" %
@ -205,6 +239,11 @@ class VolumeMountPoint(object):
fstab_content = fstab.read()
with NamedTemporaryFile(mode='w', delete=False) as tempfstab:
tempfstab.write(fstab_content + fstab_line)
utils.execute("sudo", "install", "-o", "root", "-g", "root", "-m",
"644", tempfstab.name, "/etc/fstab")
try:
utils.execute("install", "-o", "root", "-g", "root",
"-m", "644", tempfstab.name, "/etc/fstab",
run_as_root=True, root_helper="sudo")
except exception.ProcessExecutionError:
msg = _("Could not add '%s' to fstab.") % self.mount_point
log_and_raise(msg)
os.remove(tempfstab.name)

View File

@ -1534,7 +1534,7 @@ def persist_instance_fault(notification, event_qualifier):
save_instance_fault(instance_id, message, details)
def save_instance_fault(instance_id, message, details):
def save_instance_fault(instance_id, message, details, skip_delta=None):
if instance_id:
try:
# Make sure it's a valid id - sometimes the error is related
@ -1544,8 +1544,19 @@ def save_instance_fault(instance_id, message, details):
det = utils.format_output(details)
try:
fault = DBInstanceFault.find_by(instance_id=instance_id)
fault.set_info(msg, det)
fault.save()
skip = False
# If we were passed in a skip_delta, only update the fault
# if the old one is at least skip_delta seconds in the past
if skip_delta:
skip_time = fault.updated + timedelta(seconds=skip_delta)
now = datetime.now()
skip = now < skip_time
if skip:
LOG.debug(
"Skipping fault message in favor of previous one")
else:
fault.set_info(msg, det)
fault.save()
except exception.ModelNotFoundError:
DBInstanceFault.create(
instance_id=instance_id,

View File

@ -364,7 +364,8 @@ class FreshInstanceTasks(FreshInstance, NotifyMixin, ConfigurationMixin):
finally:
if error_message:
inst_models.save_instance_fault(
self.id, error_message, error_details)
self.id, error_message, error_details,
skip_delta=USAGE_SLEEP_TIME + 1)
def create_instance(self, flavor, image_id, databases, users,
datastore_manager, packages, volume_size,

View File

@ -12,194 +12,191 @@
# License for the specific language governing permissions and limitations
# under the License.
import os
from mock import ANY, call, DEFAULT, patch, mock_open
from mock import Mock, MagicMock, patch, mock_open
import pexpect
from trove.common.exception import GuestError, ProcessExecutionError
from trove.common import exception
from trove.common import utils
from trove.guestagent.common import operating_system
from trove.guestagent import volume
from trove.tests.unittests import trove_testtools
def _setUp_fake_spawn(return_val=0):
fake_spawn = pexpect.spawn('echo')
fake_spawn.expect = Mock(return_value=return_val)
pexpect.spawn = Mock(return_value=fake_spawn)
return fake_spawn
class VolumeDeviceTest(trove_testtools.TestCase):
def setUp(self):
super(VolumeDeviceTest, self).setUp()
self.volumeDevice = volume.VolumeDevice('/dev/vdb')
self.exec_patcher = patch.object(
utils, 'execute', return_value=('has_journal', ''))
self.mock_exec = self.exec_patcher.start()
self.addCleanup(self.exec_patcher.stop)
self.ismount_patcher = patch.object(operating_system, 'is_mount')
self.mock_ismount = self.ismount_patcher.start()
self.addCleanup(self.ismount_patcher.stop)
def tearDown(self):
super(VolumeDeviceTest, self).tearDown()
@patch.object(pexpect, 'spawn', Mock())
def test_migrate_data(self):
origin_execute = utils.execute
utils.execute = Mock()
origin_os_path_exists = os.path.exists
os.path.exists = Mock()
fake_spawn = _setUp_fake_spawn()
origin_unmount = self.volumeDevice.unmount
self.volumeDevice.unmount = MagicMock()
self.volumeDevice.migrate_data('/')
self.assertEqual(1, fake_spawn.expect.call_count)
self.assertEqual(1, utils.execute.call_count)
self.assertEqual(1, self.volumeDevice.unmount.call_count)
utils.execute = origin_execute
self.volumeDevice.unmount = origin_unmount
os.path.exists = origin_os_path_exists
with patch.multiple(self.volumeDevice,
mount=DEFAULT, unmount=DEFAULT) as mocks:
self.volumeDevice.migrate_data('/')
self.assertEqual(1, mocks['mount'].call_count)
self.assertEqual(1, mocks['unmount'].call_count)
self.assertEqual(1, self.mock_exec.call_count)
calls = [
call('rsync', '--safe-links', '--perms', '--recursive',
'--owner', '--group', '--xattrs',
'--sparse', '/', '/mnt/volume',
root_helper='sudo', run_as_root=True),
]
self.mock_exec.assert_has_calls(calls)
def test__check_device_exists(self):
origin_execute = utils.execute
utils.execute = Mock()
self.volumeDevice._check_device_exists()
self.assertEqual(1, utils.execute.call_count)
utils.execute = origin_execute
self.assertEqual(1, self.mock_exec.call_count)
calls = [
call('blockdev', '--getsize64', '/dev/vdb', attempts=3,
root_helper='sudo', run_as_root=True)
]
self.mock_exec.assert_has_calls(calls)
@patch('trove.guestagent.volume.LOG')
def test_fail__check_device_exists(self, mock_logging):
with patch.object(utils, 'execute', side_effect=ProcessExecutionError):
self.assertRaises(GuestError,
with patch.object(utils, 'execute',
side_effect=exception.ProcessExecutionError):
self.assertRaises(exception.GuestError,
self.volumeDevice._check_device_exists)
@patch.object(pexpect, 'spawn', Mock())
def test__check_format(self):
fake_spawn = _setUp_fake_spawn()
self.volumeDevice._check_format()
self.assertEqual(1, fake_spawn.expect.call_count)
self.assertEqual(1, self.mock_exec.call_count)
calls = [
call('dumpe2fs', '/dev/vdb', root_helper='sudo', run_as_root=True)
]
self.mock_exec.assert_has_calls(calls)
@patch.object(pexpect, 'spawn', Mock())
def test__check_format_2(self):
fake_spawn = _setUp_fake_spawn(return_val=1)
@patch('trove.guestagent.volume.LOG')
def test__check_format_2(self, mock_logging):
self.assertEqual(0, self.mock_exec.call_count)
proc_err = exception.ProcessExecutionError()
proc_err.stderr = 'Wrong magic number'
self.mock_exec.side_effect = proc_err
self.assertRaises(exception.GuestError,
self.volumeDevice._check_format)
self.assertEqual(0, fake_spawn.expect.call_count)
self.assertRaises(IOError, self.volumeDevice._check_format)
@patch.object(pexpect, 'spawn', Mock())
def test__format(self):
fake_spawn = _setUp_fake_spawn()
self.volumeDevice._format()
self.assertEqual(1, fake_spawn.expect.call_count)
self.assertEqual(1, pexpect.spawn.call_count)
self.assertEqual(1, self.mock_exec.call_count)
calls = [
call('mkfs', '--type', 'ext3', '-m', '5', '/dev/vdb',
root_helper='sudo', run_as_root=True)
]
self.mock_exec.assert_has_calls(calls)
def test_format(self):
origin_check_device_exists = self.volumeDevice._check_device_exists
origin_format = self.volumeDevice._format
origin_check_format = self.volumeDevice._check_format
self.volumeDevice._check_device_exists = MagicMock()
self.volumeDevice._check_format = MagicMock()
self.volumeDevice._format = MagicMock()
self.volumeDevice.format()
self.assertEqual(1, self.volumeDevice._check_device_exists.call_count)
self.assertEqual(1, self.volumeDevice._format.call_count)
self.assertEqual(1, self.volumeDevice._check_format.call_count)
self.volumeDevice._check_device_exists = origin_check_device_exists
self.volumeDevice._format = origin_format
self.volumeDevice._check_format = origin_check_format
self.assertEqual(3, self.mock_exec.call_count)
calls = [
call('blockdev', '--getsize64', '/dev/vdb', attempts=3,
root_helper='sudo', run_as_root=True),
call('mkfs', '--type', 'ext3', '-m', '5', '/dev/vdb',
root_helper='sudo', run_as_root=True),
call('dumpe2fs', '/dev/vdb', root_helper='sudo', run_as_root=True)
]
self.mock_exec.assert_has_calls(calls)
def test_mount(self):
origin_ = volume.VolumeMountPoint.mount
volume.VolumeMountPoint.mount = Mock()
origin_os_path_exists = os.path.exists
os.path.exists = Mock()
origin_write_to_fstab = volume.VolumeMountPoint.write_to_fstab
volume.VolumeMountPoint.write_to_fstab = Mock()
self.volumeDevice.mount(Mock)
self.assertEqual(1, volume.VolumeMountPoint.mount.call_count)
self.assertEqual(1, volume.VolumeMountPoint.write_to_fstab.call_count)
volume.VolumeMountPoint.mount = origin_
volume.VolumeMountPoint.write_to_fstab = origin_write_to_fstab
os.path.exists = origin_os_path_exists
with patch.multiple(volume.VolumeMountPoint,
mount=DEFAULT, write_to_fstab=DEFAULT) as mocks:
self.volumeDevice.mount('/dev/vba')
self.assertEqual(1, mocks['mount'].call_count,
"Wrong number of calls to mount()")
self.assertEqual(1, mocks['write_to_fstab'].call_count,
"Wrong number of calls to write_to_fstab()")
self.mock_exec.assert_not_called()
def test_resize_fs(self):
origin_check_device_exists = self.volumeDevice._check_device_exists
origin_execute = utils.execute
utils.execute = Mock()
self.volumeDevice._check_device_exists = MagicMock()
origin_os_path_exists = os.path.exists
os.path.exists = Mock()
with patch.object(operating_system, 'is_mount', return_value=True):
mount_point = '/mnt/volume'
self.volumeDevice.resize_fs(mount_point)
self.assertEqual(4, self.mock_exec.call_count)
calls = [
call('blockdev', '--getsize64', '/dev/vdb', attempts=3,
root_helper='sudo', run_as_root=True),
call("umount", mount_point, run_as_root=True,
root_helper='sudo'),
call('e2fsck', '-f', '-p', '/dev/vdb', root_helper='sudo',
run_as_root=True),
call('resize2fs', '/dev/vdb', root_helper='sudo',
run_as_root=True)
]
self.mock_exec.assert_has_calls(calls)
self.volumeDevice.resize_fs('/mnt/volume')
self.assertEqual(1, self.volumeDevice._check_device_exists.call_count)
self.assertEqual(2, utils.execute.call_count)
self.volumeDevice._check_device_exists = origin_check_device_exists
os.path.exists = origin_os_path_exists
utils.execute = origin_execute
@patch.object(os.path, 'ismount', return_value=True)
@patch.object(utils, 'execute', side_effect=ProcessExecutionError)
@patch.object(utils, 'execute',
side_effect=exception.ProcessExecutionError)
@patch('trove.guestagent.volume.LOG')
def test_fail_resize_fs(self, mock_logging, mock_execute, mock_mount):
def test_fail_resize_fs(self, mock_logging, mock_execute):
with patch.object(self.volumeDevice, '_check_device_exists'):
self.assertRaises(GuestError,
self.assertRaises(exception.GuestError,
self.volumeDevice.resize_fs, '/mnt/volume')
self.assertEqual(1,
self.volumeDevice._check_device_exists.call_count)
self.assertEqual(1, mock_mount.call_count)
self.assertEqual(2, self.mock_ismount.call_count)
def test_unmount_positive(self):
self._test_unmount()
def test_unmount_negative(self):
self._test_unmount(False)
self._test_unmount(has_mount=False)
@patch.object(pexpect, 'spawn', Mock())
def _test_unmount(self, positive=True):
origin_is_mount = operating_system.is_mount
operating_system.is_mount = MagicMock(return_value=positive)
fake_spawn = _setUp_fake_spawn()
def _test_unmount(self, has_mount=True):
with patch.object(operating_system, 'is_mount',
return_value=has_mount):
self.volumeDevice.unmount('/mnt/volume')
if has_mount:
self.assertEqual(1, self.mock_exec.call_count)
else:
self.mock_exec.assert_not_called()
self.volumeDevice.unmount('/mnt/volume')
COUNT = 1
if not positive:
COUNT = 0
self.assertEqual(COUNT, fake_spawn.expect.call_count)
operating_system.is_mount = origin_is_mount
@patch.object(utils, 'execute')
def test_mount_points(self, mock_execute):
mock_execute.return_value = (
def test_mount_points(self):
self.mock_exec.return_value = (
("/dev/vdb /var/lib/mysql xfs rw 0 0", ""))
mount_point = self.volumeDevice.mount_points('/dev/vdb')
self.assertEqual(['/var/lib/mysql'], mount_point)
self.assertEqual(1, self.mock_exec.call_count)
calls = [
call("grep '^/dev/vdb ' /etc/mtab", check_exit_code=[0, 1],
shell=True)
]
self.mock_exec.assert_has_calls(calls)
def test_set_readahead_size(self):
origin_check_device_exists = self.volumeDevice._check_device_exists
self.volumeDevice._check_device_exists = MagicMock()
mock_execute = MagicMock(return_value=None)
readahead_size = 2048
self.volumeDevice.set_readahead_size(readahead_size,
execute_function=mock_execute)
blockdev = mock_execute.call_args_list[0]
blockdev.assert_called_with("sudo", "blockdev", "--setra",
readahead_size, "/dev/vdb")
self.volumeDevice._check_device_exists = origin_check_device_exists
self.volumeDevice.set_readahead_size(readahead_size)
self.assertEqual(2, self.mock_exec.call_count)
calls = [
call('blockdev', '--getsize64', '/dev/vdb', attempts=3,
root_helper='sudo', run_as_root=True),
call('blockdev', '--setra', readahead_size, '/dev/vdb',
root_helper='sudo', run_as_root=True),
]
self.mock_exec.assert_has_calls(calls)
@patch('trove.guestagent.volume.LOG')
def test_fail_set_readahead_size(self, mock_logging):
mock_execute = MagicMock(side_effect=ProcessExecutionError)
self.mock_exec.side_effect = exception.ProcessExecutionError
readahead_size = 2048
with patch.object(self.volumeDevice, '_check_device_exists'):
self.assertRaises(GuestError, self.volumeDevice.set_readahead_size,
readahead_size, execute_function=mock_execute)
self.volumeDevice._check_device_exists.assert_any_call()
self.assertRaises(exception.GuestError,
self.volumeDevice.set_readahead_size,
readahead_size)
self.assertEqual(1, self.mock_exec.call_count)
calls = [
call('blockdev', '--getsize64', '/dev/vdb', attempts=3,
root_helper='sudo', run_as_root=True),
]
self.mock_exec.assert_has_calls(calls)
class VolumeMountPointTest(trove_testtools.TestCase):
@ -208,32 +205,35 @@ class VolumeMountPointTest(trove_testtools.TestCase):
super(VolumeMountPointTest, self).setUp()
self.volumeMountPoint = volume.VolumeMountPoint('/mnt/device',
'/dev/vdb')
self.exec_patcher = patch.object(utils, 'execute',
return_value=('', ''))
self.mock_exec = self.exec_patcher.start()
self.addCleanup(self.exec_patcher.stop)
def tearDown(self):
super(VolumeMountPointTest, self).tearDown()
@patch.object(pexpect, 'spawn', Mock())
def test_mount(self):
origin_ = os.path.exists
os.path.exists = MagicMock(return_value=False)
fake_spawn = _setUp_fake_spawn()
with patch.object(utils, 'execute_with_timeout',
return_value=('0', '')):
with patch.object(operating_system, 'exists', return_value=False):
self.volumeMountPoint.mount()
self.assertEqual(1, os.path.exists.call_count)
self.assertEqual(1, utils.execute_with_timeout.call_count)
self.assertEqual(1, fake_spawn.expect.call_count)
os.path.exists = origin_
self.assertEqual(2, self.mock_exec.call_count)
calls = [
call('mkdir', '-p', '/dev/vdb', root_helper='sudo',
run_as_root=True),
call('mount', '-t', 'ext3', '-o', 'defaults,noatime',
'/mnt/device', '/dev/vdb', root_helper='sudo',
run_as_root=True)
]
self.mock_exec.assert_has_calls(calls)
def test_write_to_fstab(self):
origin_execute = utils.execute
utils.execute = Mock()
m = mock_open()
with patch('%s.open' % volume.__name__, m, create=True):
mock_file = mock_open()
with patch('%s.open' % volume.__name__, mock_file, create=True):
self.volumeMountPoint.write_to_fstab()
self.assertEqual(1, utils.execute.call_count)
utils.execute = origin_execute
self.assertEqual(1, self.mock_exec.call_count)
calls = [
call('install', '-o', 'root', '-g', 'root', '-m', '644',
ANY, '/etc/fstab', root_helper='sudo',
run_as_root=True)
]
self.mock_exec.assert_has_calls(calls)