From 4aa39c44a4b08ee4e05548d5c258e795089b2bdd Mon Sep 17 00:00:00 2001 From: Matthew Booth Date: Fri, 7 Oct 2016 19:14:38 +0100 Subject: [PATCH] libvirt: Fix races with nfs volume mount/umount A single nfs export typically contains multiple volumes. We were handling this in the libvirt driver by: 1. On mount, we 'ensure' the mount is available, so we don't fail if another instance already has it mounted. 2. On umount, we trap and ignore 'device is busy' so we don't fail if another instance is already using it. Unfortunately, while this works for serial mounts and unmounts, there are multiple failure cases when volumes from the same export are mounted and unmounted simultaneously. It causes an error if an instance is stopped: as the qemu process is not actively using the mountpoint it will not prevent an unmount for another volume on the same mountpoint from succeeding. It will not be possible to restart the instance, because its mountpoint will not be mounted. To fix this, we create a singleton manager object, which tracks mounts and umount requests per export, and calls the real mount/umount only when required. It uses per-export locks to allow concurrency while avoiding races. Because we now expect to know the state of the host at all times, we no longer need to execute speculative mount/umount commands. As we track attachments (a mapping from volume to instance) rather than volumes, we also gracefully support multi-attach. This change implements this for nfs, but the solution is intended to be extended to all LibvirtBaseFileSystemVolumeDrivers. Closes-Bug: #1421550 Change-Id: I3155984d76df06371a6c45f633aa448168a96d64 --- .../unit/virt/libvirt/volume/test_mount.py | 611 ++++++++++++++++++ .../unit/virt/libvirt/volume/test_nfs.py | 102 ++- nova/virt/libvirt/driver.py | 6 + nova/virt/libvirt/volume/fs.py | 38 ++ nova/virt/libvirt/volume/mount.py | 428 ++++++++++++ nova/virt/libvirt/volume/nfs.py | 70 +- 6 files changed, 1137 insertions(+), 118 deletions(-) create mode 100644 nova/tests/unit/virt/libvirt/volume/test_mount.py create mode 100644 nova/virt/libvirt/volume/mount.py diff --git a/nova/tests/unit/virt/libvirt/volume/test_mount.py b/nova/tests/unit/virt/libvirt/volume/test_mount.py new file mode 100644 index 000000000000..8dd136c7eaaa --- /dev/null +++ b/nova/tests/unit/virt/libvirt/volume/test_mount.py @@ -0,0 +1,611 @@ +# Copyright 2017 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import os.path +import threading +import time + +import eventlet +import fixtures +import mock + +from oslo_concurrency import processutils + +from nova import exception +from nova import test +from nova.tests import uuidsentinel as uuids +from nova.virt.libvirt import config as libvirt_config +from nova.virt.libvirt import guest as libvirt_guest +from nova.virt.libvirt import host as libvirt_host +from nova.virt.libvirt.volume import mount + + +# We wait on events in a few cases. In normal execution the wait period should +# be in the order of fractions of a millisecond. 
However, if we've hit a bug we +# might deadlock and never return. To be nice to our test environment, we cut +# this short at MAX_WAIT seconds. This should be large enough that normal +# jitter won't trigger it, but not so long that it's annoying to wait for. +MAX_WAIT = 2 + + +class TestThreadController(object): + """Helper class for executing a test thread incrementally by waiting at + named waitpoints. + + def test(ctl): + things() + ctl.waitpoint('foo') + more_things() + ctl.waitpoint('bar') + final_things() + + ctl = TestThreadController(test) + ctl.runto('foo') + assert(things) + ctl.runto('bar') + assert(more_things) + ctl.finish() + assert(final_things) + + This gets more interesting when the waitpoints are mocked into non-test + code. + """ + + # A map of threads to controllers + all_threads = {} + + def __init__(self, fn): + """Create a TestThreadController. + + :param fn: A test function which takes a TestThreadController as its + only argument + """ + + # All updates to wait_at and waiting are guarded by wait_lock + self.wait_lock = threading.Condition() + # The name of the next wait point + self.wait_at = None + # True when waiting at a waitpoint + self.waiting = False + # Incremented every time we continue from a waitpoint + self.epoch = 1 + # The last epoch we waited at + self.last_epoch = 0 + + self.start_event = eventlet.event.Event() + self.running = False + self.complete = False + + # We must not execute fn() until the thread has been registered in + # all_threads. eventlet doesn't give us an API to do this directly, + # so we defer with an Event + def deferred_start(): + self.start_event.wait() + fn() + + with self.wait_lock: + self.complete = True + self.wait_lock.notify_all() + + self.thread = eventlet.greenthread.spawn(deferred_start) + self.all_threads[self.thread] = self + + @classmethod + def current(cls): + return cls.all_threads.get(eventlet.greenthread.getcurrent()) + + def _ensure_running(self): + if not self.running: + self.running = True + self.start_event.send() + + def waitpoint(self, name): + """Called by the test thread. Wait at a waitpoint called name""" + with self.wait_lock: + wait_since = time.time() + while name == self.wait_at: + self.waiting = True + self.wait_lock.notify_all() + self.wait_lock.wait(1) + assert(time.time() - wait_since < MAX_WAIT) + + self.epoch += 1 + self.waiting = False + self.wait_lock.notify_all() + + def runto(self, name): + """Called by the control thread. Cause the test thread to run until + reaching a waitpoint called name. When runto() exits, the test + thread is guaranteed to have reached this waitpoint. + """ + with self.wait_lock: + # Set a new wait point + self.wait_at = name + self.wait_lock.notify_all() + + # We deliberately don't do this first to avoid a race the first + # time we call runto() + self._ensure_running() + + # Wait until the test thread is at the wait point + wait_since = time.time() + while self.epoch == self.last_epoch or not self.waiting: + self.wait_lock.wait(1) + assert(time.time() - wait_since < MAX_WAIT) + + self.last_epoch = self.epoch + + def start(self): + """Called by the control thread. Cause the test thread to start + running, but to not wait for it to complete. + """ + self._ensure_running() + + def finish(self): + """Called by the control thread. Cause the test thread to run to + completion. When finish() exits, the test thread is guaranteed to + have completed. 
+ """ + self._ensure_running() + + wait_since = time.time() + with self.wait_lock: + self.wait_at = None + self.wait_lock.notify_all() + while not self.complete: + self.wait_lock.wait(1) + assert(time.time() - wait_since < MAX_WAIT) + + self.thread.wait() + + +class HostMountStateTestCase(test.NoDBTestCase): + def setUp(self): + super(HostMountStateTestCase, self).setUp() + + self.mounted = set() + + def fake_execute(cmd, *args, **kwargs): + if cmd == 'mount': + path = args[-1] + if path in self.mounted: + raise processutils.ProcessExecutionError('Already mounted') + self.mounted.add(path) + elif cmd == 'umount': + path = args[-1] + if path not in self.mounted: + raise processutils.ProcessExecutionError('Not mounted') + self.mounted.remove(path) + + def fake_ismount(path): + return path in self.mounted + + mock_execute = mock.MagicMock(side_effect=fake_execute) + + self.useFixture(fixtures.MonkeyPatch('nova.utils.execute', + mock_execute)) + self.useFixture(fixtures.MonkeyPatch('os.path.ismount', fake_ismount)) + + def test_init(self): + # Test that we initialise the state of MountManager correctly at + # startup + def fake_disk(disk): + libvirt_disk = libvirt_config.LibvirtConfigGuestDisk() + libvirt_disk.source_type = disk[0] + libvirt_disk.source_path = os.path.join(*disk[1]) + return libvirt_disk + + def mock_guest(uuid, disks): + guest = mock.create_autospec(libvirt_guest.Guest) + guest.uuid = uuid + guest.get_all_disks.return_value = map(fake_disk, disks) + return guest + + local_dir = '/local' + mountpoint_a = '/mnt/a' + mountpoint_b = '/mnt/b' + + self.mounted.add(mountpoint_a) + self.mounted.add(mountpoint_b) + + guests = map(mock_guest, [uuids.instance_a, uuids.instance_b], [ + # Local file root disk and a volume on each of mountpoints a and b + [ + ('file', (local_dir, uuids.instance_a, 'disk')), + ('file', (mountpoint_a, 'vola1')), + ('file', (mountpoint_b, 'volb1')), + ], + + # Local LVM root disk and a volume on each of mountpoints a and b + [ + ('block', ('/dev', 'vg', uuids.instance_b + '_disk')), + ('file', (mountpoint_a, 'vola2')), + ('file', (mountpoint_b, 'volb2')), + ] + ]) + + host = mock.create_autospec(libvirt_host.Host) + host.list_guests.return_value = guests + + m = mount._HostMountState(host, 0) + + self.assertEqual([mountpoint_a, mountpoint_b], + sorted(m.mountpoints.keys())) + + self.assertSetEqual(set([('vola1', uuids.instance_a), + ('vola2', uuids.instance_b)]), + m.mountpoints[mountpoint_a].attachments) + self.assertSetEqual(set([('volb1', uuids.instance_a), + ('volb2', uuids.instance_b)]), + m.mountpoints[mountpoint_b].attachments) + + @staticmethod + def _get_clean_hostmountstate(): + # list_guests returns no guests: _HostMountState initial state is + # clean. 
+ host = mock.create_autospec(libvirt_host.Host) + host.list_guests.return_value = [] + return mount._HostMountState(host, 0) + + def _sentinel_mount(self, m, vol, mountpoint=mock.sentinel.mountpoint, + instance=None): + if instance is None: + instance = mock.sentinel.instance + instance.uuid = uuids.instance + + m.mount(mock.sentinel.fstype, mock.sentinel.export, + vol, mountpoint, instance, + [mock.sentinel.option1, mock.sentinel.option2]) + + def _sentinel_umount(self, m, vol, mountpoint=mock.sentinel.mountpoint, + instance=mock.sentinel.instance): + m.umount(vol, mountpoint, instance) + + @staticmethod + def _expected_sentinel_mount_calls(mountpoint=mock.sentinel.mountpoint): + return [mock.call('mkdir', '-p', mountpoint), + mock.call('mount', '-t', mock.sentinel.fstype, + mock.sentinel.option1, mock.sentinel.option2, + mock.sentinel.export, mountpoint, + run_as_root=True)] + + @staticmethod + def _expected_sentinel_umount_calls(mountpoint=mock.sentinel.mountpoint): + return [mock.call('umount', mountpoint, + attempts=3, delay_on_retry=True, + run_as_root=True), + mock.call('rmdir', mountpoint)] + + def test_mount_umount(self): + # Mount 2 different volumes from the same export. Test that we only + # mount and umount once. + m = self._get_clean_hostmountstate() + + # Mount vol_a from export + self._sentinel_mount(m, mock.sentinel.vol_a) + expected_calls = self._expected_sentinel_mount_calls() + mount.utils.execute.assert_has_calls(expected_calls) + + # Mount vol_b from export. We shouldn't have mounted again + self._sentinel_mount(m, mock.sentinel.vol_b) + mount.utils.execute.assert_has_calls(expected_calls) + + # Unmount vol_a. We shouldn't have unmounted + self._sentinel_umount(m, mock.sentinel.vol_a) + mount.utils.execute.assert_has_calls(expected_calls) + + # Unmount vol_b. We should have umounted. + self._sentinel_umount(m, mock.sentinel.vol_b) + expected_calls.extend(self._expected_sentinel_umount_calls()) + mount.utils.execute.assert_has_calls(expected_calls) + + def test_mount_umount_multi_attach(self): + # Mount a volume from a single export for 2 different instances. Test + # that we only mount and umount once. + m = self._get_clean_hostmountstate() + + instance_a = mock.sentinel.instance_a + instance_a.uuid = uuids.instance_a + instance_b = mock.sentinel.instance_b + instance_b.uuid = uuids.instance_b + + # Mount vol_a for instance_a + self._sentinel_mount(m, mock.sentinel.vol_a, instance=instance_a) + expected_calls = self._expected_sentinel_mount_calls() + mount.utils.execute.assert_has_calls(expected_calls) + + # Mount vol_a for instance_b. We shouldn't have mounted again + self._sentinel_mount(m, mock.sentinel.vol_a, instance=instance_b) + mount.utils.execute.assert_has_calls(expected_calls) + + # Unmount vol_a for instance_a. We shouldn't have unmounted + self._sentinel_umount(m, mock.sentinel.vol_a, instance=instance_a) + mount.utils.execute.assert_has_calls(expected_calls) + + # Unmount vol_a for instance_b. We should have umounted. + self._sentinel_umount(m, mock.sentinel.vol_a, instance=instance_b) + expected_calls.extend(self._expected_sentinel_umount_calls()) + mount.utils.execute.assert_has_calls(expected_calls) + + def test_mount_concurrent(self): + # This is 2 tests in 1, because the first test is the precondition + # for the second. + + # The first test is that if 2 threads call mount simultaneously, + # only one of them will call mount + + # The second test is that we correctly handle the case where we + # delete a lock after umount. 
During the umount of the first test, + # which will delete the lock when it completes, we start 2 more + # threads which both call mount. These threads are holding a lock + # which is about to be deleted. We test that they still don't race, + # and only one of them calls mount. + m = self._get_clean_hostmountstate() + + def mount_a(): + # Mount vol_a from export + self._sentinel_mount(m, mock.sentinel.vol_a) + TestThreadController.current().waitpoint('mounted') + self._sentinel_umount(m, mock.sentinel.vol_a) + + def mount_b(): + # Mount vol_b from export + self._sentinel_mount(m, mock.sentinel.vol_b) + self._sentinel_umount(m, mock.sentinel.vol_b) + + def mount_c(): + self._sentinel_mount(m, mock.sentinel.vol_c) + + def mount_d(): + self._sentinel_mount(m, mock.sentinel.vol_d) + + ctl_a = TestThreadController(mount_a) + ctl_b = TestThreadController(mount_b) + ctl_c = TestThreadController(mount_c) + ctl_d = TestThreadController(mount_d) + + orig_execute = mount.utils.execute.side_effect + + def trap_mount_umount(cmd, *args, **kwargs): + # Conditionally wait at a waitpoint named after the command + # we're executing + ctl = TestThreadController.current() + ctl.waitpoint(cmd) + + orig_execute(cmd, *args, **kwargs) + + mount.utils.execute.side_effect = trap_mount_umount + + expected_calls = [] + + # Run the first thread until it's blocked while calling mount + ctl_a.runto('mount') + expected_calls.extend(self._expected_sentinel_mount_calls()) + + self.assertEqual(expected_calls, mount.utils.execute.call_args_list) + + # Start the second mount, and ensure it's got plenty of opportunity + # to race. + ctl_b.start() + time.sleep(0.01) + + self.assertEqual(expected_calls, mount.utils.execute.call_args_list) + + # Allow ctl_a to complete its mount + ctl_a.runto('mounted') + self.assertEqual(expected_calls, mount.utils.execute.call_args_list) + + # Allow ctl_b to finish. 
We should not have done a umount + ctl_b.finish() + self.assertEqual(expected_calls, mount.utils.execute.call_args_list) + + # Allow ctl_a to start umounting + ctl_a.runto('umount') + + expected_calls.extend(self._expected_sentinel_umount_calls()) + # We haven't executed rmdir yet, beause we've blocked during umount + rmdir = expected_calls.pop() + self.assertEqual(expected_calls, mount.utils.execute.call_args_list) + expected_calls.append(rmdir) + + # While ctl_a is umounting, simultaneously start both ctl_c and + # ctl_d, and ensure they have an opportunity to race + ctl_c.start() + ctl_d.start() + time.sleep(0.01) + + # Allow a, c, and d to complete + for ctl in (ctl_a, ctl_c, ctl_d): + ctl.finish() + + # We should have completed the previous umount, then remounted + # exactly once + expected_calls.extend(self._expected_sentinel_mount_calls()) + self.assertEqual(expected_calls, mount.utils.execute.call_args_list) + + def test_mount_concurrent_no_interfere(self): + # Test that concurrent calls to mount volumes in different exports + # run concurrently + m = self._get_clean_hostmountstate() + + def mount_a(): + # Mount vol on mountpoint a + self._sentinel_mount(m, mock.sentinel.vol, + mock.sentinel.mountpoint_a) + TestThreadController.current().waitpoint('mounted') + self._sentinel_umount(m, mock.sentinel.vol, + mock.sentinel.mountpoint_a) + + def mount_b(): + # Mount vol on mountpoint b + self._sentinel_mount(m, mock.sentinel.vol, + mock.sentinel.mountpoint_b) + self._sentinel_umount(m, mock.sentinel.vol, + mock.sentinel.mountpoint_b) + + ctl_a = TestThreadController(mount_a) + ctl_b = TestThreadController(mount_b) + + expected_calls = [] + + ctl_a.runto('mounted') + expected_calls.extend(self._expected_sentinel_mount_calls( + mock.sentinel.mountpoint_a)) + self.assertEqual(expected_calls, mount.utils.execute.call_args_list) + + ctl_b.finish() + expected_calls.extend(self._expected_sentinel_mount_calls( + mock.sentinel.mountpoint_b)) + expected_calls.extend(self._expected_sentinel_umount_calls( + mock.sentinel.mountpoint_b)) + self.assertEqual(expected_calls, mount.utils.execute.call_args_list) + + ctl_a.finish() + expected_calls.extend(self._expected_sentinel_umount_calls( + mock.sentinel.mountpoint_a)) + self.assertEqual(expected_calls, mount.utils.execute.call_args_list) + + def test_mount_after_failed_umount(self): + # Test that MountManager correctly tracks state when umount fails. + # Test that when umount fails a subsequent mount doesn't try to + # remount it. + + # We've already got a fake execute (see setUp) which is ensuring mount, + # umount, and ismount work as expected. We don't want to mess with + # that, except that we want umount to raise an exception. We store the + # original here so we can call it if we're not unmounting, and so we + # can restore it when we no longer want the exception. + orig_execute = mount.utils.execute.side_effect + + def raise_on_umount(cmd, *args, **kwargs): + if cmd == 'umount': + raise mount.processutils.ProcessExecutionError() + orig_execute(cmd, *args, **kwargs) + + mount.utils.execute.side_effect = raise_on_umount + + expected_calls = [] + + m = self._get_clean_hostmountstate() + + # Mount vol_a + self._sentinel_mount(m, mock.sentinel.vol_a) + expected_calls.extend(self._expected_sentinel_mount_calls()) + self.assertEqual(expected_calls, mount.utils.execute.call_args_list) + + # Umount vol_a. The umount command will fail. 
+ self._sentinel_umount(m, mock.sentinel.vol_a) + expected_calls.extend(self._expected_sentinel_umount_calls()) + + # We should not have called rmdir, because umount failed + expected_calls.pop() + self.assertEqual(expected_calls, mount.utils.execute.call_args_list) + + # Mount vol_a again. We should not have called mount, because umount + # failed. + self._sentinel_mount(m, mock.sentinel.vol_a) + self.assertEqual(expected_calls, mount.utils.execute.call_args_list) + + # Prevent future failure of umount + mount.utils.execute.side_effect = orig_execute + + # Umount vol_a successfully + self._sentinel_umount(m, mock.sentinel.vol_a) + expected_calls.extend(self._expected_sentinel_umount_calls()) + self.assertEqual(expected_calls, mount.utils.execute.call_args_list) + + @mock.patch.object(mount.LOG, 'error') + def test_umount_log_failure(self, mock_LOG_error): + # Test that we log an error when umount fails + orig_execute = mount.utils.execute.side_effect + + def raise_on_umount(cmd, *args, **kwargs): + if cmd == 'umount': + raise mount.processutils.ProcessExecutionError( + None, None, None, 'umount', 'umount: device is busy.') + orig_execute(cmd, *args, **kwargs) + + mount.utils.execute.side_effect = raise_on_umount + + m = self._get_clean_hostmountstate() + + self._sentinel_mount(m, mock.sentinel.vol_a) + self._sentinel_umount(m, mock.sentinel.vol_a) + + mock_LOG_error.assert_called() + + +class MountManagerTestCase(test.NoDBTestCase): + class FakeHostMountState(object): + def __init__(self, host, generation): + self.host = host + self.generation = generation + + ctl = TestThreadController.current() + if ctl is not None: + ctl.waitpoint('init') + + def setUp(self): + super(MountManagerTestCase, self).setUp() + + self.useFixture(fixtures.MonkeyPatch( + 'nova.virt.libvirt.volume.mount._HostMountState', + self.FakeHostMountState)) + + self.m = mount.get_manager() + self.m._reset_state() + + def _get_state(self): + with self.m.get_state() as state: + return state + + def test_host_up_down(self): + self.m.host_up(mock.sentinel.host) + state = self._get_state() + self.assertEqual(state.host, mock.sentinel.host) + self.assertEqual(state.generation, 0) + + self.m.host_down() + self.assertRaises(exception.HypervisorUnavailable, self._get_state) + + def test_host_up_waits_for_completion(self): + self.m.host_up(mock.sentinel.host) + + def txn(): + with self.m.get_state(): + TestThreadController.current().waitpoint('running') + + # Start a thread which blocks holding a state object + ctl = TestThreadController(txn) + ctl.runto('running') + + # Host goes down + self.m.host_down() + + # Call host_up in a separate thread because it will block, and give + # it plenty of time to race + host_up = eventlet.greenthread.spawn(self.m.host_up, + mock.sentinel.host) + time.sleep(0.01) + + # Assert that we haven't instantiated a new state while there's an + # ongoing operation from the previous state + self.assertRaises(exception.HypervisorUnavailable, self._get_state) + + # Allow the previous ongoing operation and host_up to complete + ctl.finish() + host_up.wait() + + # Assert that we've got a new state generation + state = self._get_state() + self.assertEqual(1, state.generation) diff --git a/nova/tests/unit/virt/libvirt/volume/test_nfs.py b/nova/tests/unit/virt/libvirt/volume/test_nfs.py index 8f58cbeec826..a101791aca3a 100644 --- a/nova/tests/unit/virt/libvirt/volume/test_nfs.py +++ b/nova/tests/unit/virt/libvirt/volume/test_nfs.py @@ -12,12 +12,13 @@ import os +import fixtures import mock -from 
oslo_concurrency import processutils from nova.tests.unit.virt.libvirt.volume import test_volume +from nova.tests import uuidsentinel as uuids from nova import utils -from nova.virt.libvirt import utils as libvirt_utils +from nova.virt.libvirt.volume import mount from nova.virt.libvirt.volume import nfs @@ -26,11 +27,38 @@ class LibvirtNFSVolumeDriverTestCase(test_volume.LibvirtVolumeBaseTestCase): def setUp(self): super(LibvirtNFSVolumeDriverTestCase, self).setUp() + + m = mount.get_manager() + m._reset_state() + self.mnt_base = '/mnt' + m.host_up(self.fake_host) self.flags(nfs_mount_point_base=self.mnt_base, group='libvirt') - @mock.patch.object(libvirt_utils, 'is_mounted', return_value=False) - def test_libvirt_nfs_driver(self, mock_is_mounted): + # Caution: this is also faked by the superclass + orig_execute = utils.execute + + mounted = [False] + + def fake_execute(*cmd, **kwargs): + orig_execute(*cmd, **kwargs) + + if cmd[0] == 'mount': + mounted[0] = True + + if cmd[0] == 'umount': + mounted[0] = False + + self.useFixture(fixtures.MonkeyPatch('nova.utils.execute', + fake_execute)) + + # Mock ismount to return the current mount state + # N.B. This is only valid for tests which mount and unmount a single + # directory. + self.useFixture(fixtures.MonkeyPatch('os.path.ismount', + lambda *args, **kwargs: mounted[0])) + + def test_libvirt_nfs_driver(self): libvirt_driver = nfs.LibvirtNFSVolumeDriver(self.fake_host) export_string = '192.168.1.1:/nfs/share1' @@ -39,8 +67,10 @@ class LibvirtNFSVolumeDriverTestCase(test_volume.LibvirtVolumeBaseTestCase): connection_info = {'data': {'export': export_string, 'name': self.name}} + instance = mock.sentinel.instance + instance.uuid = uuids.instance libvirt_driver.connect_volume(connection_info, self.disk_info, - mock.sentinel.instance) + instance) libvirt_driver.disconnect_volume(connection_info, "vde", mock.sentinel.instance) @@ -50,39 +80,9 @@ class LibvirtNFSVolumeDriverTestCase(test_volume.LibvirtVolumeBaseTestCase): expected_commands = [ ('mkdir', '-p', export_mnt_base), ('mount', '-t', 'nfs', export_string, export_mnt_base), - ('umount', export_mnt_base)] + ('umount', export_mnt_base), + ('rmdir', export_mnt_base)] self.assertEqual(expected_commands, self.executes) - self.assertTrue(mock_is_mounted.called) - - @mock.patch.object(nfs.utils, 'execute') - @mock.patch.object(nfs.LOG, 'debug') - @mock.patch.object(nfs.LOG, 'exception') - def test_libvirt_nfs_driver_umount_error(self, mock_LOG_exception, - mock_LOG_debug, mock_utils_exe): - export_string = '192.168.1.1:/nfs/share1' - connection_info = {'data': {'export': export_string, - 'name': self.name}} - libvirt_driver = nfs.LibvirtNFSVolumeDriver(self.fake_host) - mock_utils_exe.side_effect = processutils.ProcessExecutionError( - None, None, None, 'umount', 'umount: device is busy.') - libvirt_driver.disconnect_volume(connection_info, "vde", - mock.sentinel.instance) - self.assertTrue(mock_LOG_debug.called) - mock_utils_exe.side_effect = processutils.ProcessExecutionError( - None, None, None, 'umount', 'umount: target is busy.') - libvirt_driver.disconnect_volume(connection_info, "vde", - mock.sentinel.instance) - self.assertTrue(mock_LOG_debug.called) - mock_utils_exe.side_effect = processutils.ProcessExecutionError( - None, None, None, 'umount', 'umount: not mounted.') - libvirt_driver.disconnect_volume(connection_info, "vde", - mock.sentinel.instance) - self.assertTrue(mock_LOG_debug.called) - mock_utils_exe.side_effect = processutils.ProcessExecutionError( - None, None, None, 'umount', 
'umount: Other error.') - libvirt_driver.disconnect_volume(connection_info, "vde", - mock.sentinel.instance) - self.assertTrue(mock_LOG_exception.called) def test_libvirt_nfs_driver_get_config(self): libvirt_driver = nfs.LibvirtNFSVolumeDriver(self.fake_host) @@ -100,27 +100,7 @@ class LibvirtNFSVolumeDriverTestCase(test_volume.LibvirtVolumeBaseTestCase): self.assertEqual('raw', tree.find('./driver').get('type')) self.assertEqual('native', tree.find('./driver').get('io')) - @mock.patch.object(libvirt_utils, 'is_mounted', return_value=True) - def test_libvirt_nfs_driver_already_mounted(self, mock_is_mounted): - libvirt_driver = nfs.LibvirtNFSVolumeDriver(self.fake_host) - - export_string = '192.168.1.1:/nfs/share1' - export_mnt_base = os.path.join(self.mnt_base, - utils.get_hash_str(export_string)) - - connection_info = {'data': {'export': export_string, - 'name': self.name}} - libvirt_driver.connect_volume(connection_info, self.disk_info, - mock.sentinel.instance) - libvirt_driver.disconnect_volume(connection_info, "vde", - mock.sentinel.instance) - - expected_commands = [ - ('umount', export_mnt_base)] - self.assertEqual(expected_commands, self.executes) - - @mock.patch.object(libvirt_utils, 'is_mounted', return_value=False) - def test_libvirt_nfs_driver_with_opts(self, mock_is_mounted): + def test_libvirt_nfs_driver_with_opts(self): libvirt_driver = nfs.LibvirtNFSVolumeDriver(self.fake_host) export_string = '192.168.1.1:/nfs/share1' options = '-o intr,nfsvers=3' @@ -130,8 +110,10 @@ class LibvirtNFSVolumeDriverTestCase(test_volume.LibvirtVolumeBaseTestCase): connection_info = {'data': {'export': export_string, 'name': self.name, 'options': options}} + instance = mock.sentinel.instance + instance.uuid = uuids.instance libvirt_driver.connect_volume(connection_info, self.disk_info, - mock.sentinel.instance) + instance) libvirt_driver.disconnect_volume(connection_info, "vde", mock.sentinel.instance) @@ -140,6 +122,6 @@ class LibvirtNFSVolumeDriverTestCase(test_volume.LibvirtVolumeBaseTestCase): ('mount', '-t', 'nfs', '-o', 'intr,nfsvers=3', export_string, export_mnt_base), ('umount', export_mnt_base), + ('rmdir', export_mnt_base) ] self.assertEqual(expected_commands, self.executes) - self.assertTrue(mock_is_mounted.called) diff --git a/nova/virt/libvirt/driver.py b/nova/virt/libvirt/driver.py index 6b1a1e5855b7..3cc061d1aae9 100644 --- a/nova/virt/libvirt/driver.py +++ b/nova/virt/libvirt/driver.py @@ -108,6 +108,7 @@ from nova.virt.libvirt.storage import lvm from nova.virt.libvirt.storage import rbd_utils from nova.virt.libvirt import utils as libvirt_utils from nova.virt.libvirt import vif as libvirt_vif +from nova.virt.libvirt.volume import mount from nova.virt.libvirt.volume import remotefs from nova.virt import netutils from nova.volume import cinder @@ -3462,6 +3463,11 @@ class LibvirtDriver(driver.ComputeDriver): 'due to an unexpected exception.'), CONF.host, exc_info=True) + if enabled: + mount.get_manager().host_up(self._host) + else: + mount.get_manager().host_down() + def _get_guest_cpu_model_config(self): mode = CONF.libvirt.cpu_mode model = CONF.libvirt.cpu_model diff --git a/nova/virt/libvirt/volume/fs.py b/nova/virt/libvirt/volume/fs.py index df84f068a16c..b14f2942aeb0 100644 --- a/nova/virt/libvirt/volume/fs.py +++ b/nova/virt/libvirt/volume/fs.py @@ -18,6 +18,7 @@ import os import six from nova import utils +from nova.virt.libvirt.volume import mount from nova.virt.libvirt.volume import volume as libvirt_volume @@ -93,3 +94,40 @@ class LibvirtBaseFileSystemVolumeDriver( """ 
mount_path = self._get_mount_path(connection_info) return os.path.join(mount_path, connection_info['data']['name']) + + +@six.add_metaclass(abc.ABCMeta) +class LibvirtMountedFileSystemVolumeDriver(LibvirtBaseFileSystemVolumeDriver): + # NOTE(mdbooth): Hopefully we'll get to the point where everything which + # previously subclassed LibvirtBaseFileSystemVolumeDriver now subclasses + # LibvirtMountedFileSystemVolumeDriver. If we get there, we should fold + # this class into the base class. + def __init__(self, host, fstype): + super(LibvirtMountedFileSystemVolumeDriver, self).__init__(host) + + self.fstype = fstype + + def connect_volume(self, connection_info, disk_info, instance): + """Connect the volume.""" + export = connection_info['data']['export'] + vol_name = connection_info['data']['name'] + mountpoint = self._get_mount_path(connection_info) + + mount.mount(self.fstype, export, vol_name, mountpoint, instance, + self._mount_options(connection_info)) + + connection_info['data']['device_path'] = \ + self._get_device_path(connection_info) + + def disconnect_volume(self, connection_info, disk_dev, instance): + """Disconnect the volume.""" + vol_name = connection_info['data']['name'] + mountpoint = self._get_mount_path(connection_info) + + mount.umount(vol_name, mountpoint, instance) + + @abc.abstractmethod + def _mount_options(self, connection_info): + """Return a list of additional arguments to pass to the mount command. + """ + pass diff --git a/nova/virt/libvirt/volume/mount.py b/nova/virt/libvirt/volume/mount.py new file mode 100644 index 000000000000..b761a33494ba --- /dev/null +++ b/nova/virt/libvirt/volume/mount.py @@ -0,0 +1,428 @@ +# Copyright 2016,2017 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections +import contextlib +import os.path +import threading + +from oslo_concurrency import processutils +from oslo_log import log +import six + +import nova.conf +from nova import exception +from nova.i18n import _LE, _LW +from nova import utils + +CONF = nova.conf.CONF +LOG = log.getLogger(__name__) + + +class _HostMountStateManager(object): + """A global manager of filesystem mounts. + + _HostMountStateManager manages a _HostMountState object for the current + compute node. Primarily it creates one on host_up(), destroys it on + host_down(), and returns it via get_state(). + + _HostMountStateManager manages concurrency itself. Independent callers do + not need to consider interactions between multiple _HostMountStateManager + calls when designing their own locking. + + _HostMountStateManager is a singleton, and must only be accessed via: + + mount.get_manager() + """ + + def __init__(self): + self._reset_state() + + def _reset_state(self): + """Reset state of global _HostMountStateManager. + + Should only be called by __init__ and tests. + """ + + self.state = None + self.use_count = 0 + + # Guards both state and use_count + self.cond = threading.Condition() + + # Incremented each time we initialise a new mount state. Aids + # debugging. 
+ self.generation = 0 + + @contextlib.contextmanager + def get_state(self): + """Return the current mount state. + + _HostMountStateManager will not permit a new state object to be + created while any previous state object is still in use. + + get_state will raise HypervisorUnavailable if the libvirt connection is + currently down. + + :rtype: _HostMountState + """ + + # We hold the instance lock here so that if a _HostMountState is + # currently initialising we'll wait for it to complete rather than + # fail. + with self.cond: + state = self.state + if state is None: + raise exception.HypervisorUnavailable(host=CONF.host) + self.use_count += 1 + + try: + LOG.debug('Got _HostMountState generation %(gen)i', + {'gen': state.generation}) + + yield state + finally: + with self.cond: + self.use_count -= 1 + self.cond.notify_all() + + def host_up(self, host): + """Inialise a new _HostMountState when the libvirt connection comes + up. + + host_up will destroy and re-initialise the current state if one + already exists, but this is considered an error. + + host_up will block before creating a new state until all operations + using a previous state have completed. + + :param host: A connected libvirt Host object + """ + with self.cond: + if self.state is not None: + LOG.warning(_LW("host_up called, but we think host is " + "already up")) + self._host_down() + + # Wait until all operations using a previous state generation are + # complete before initialising a new one. Note that self.state is + # already None, set either by initialisation or by host_down. This + # means the current state will not be returned to any new callers, + # and use_count will eventually reach zero. + # We do this to avoid a race between _HostMountState initialisation + # and an on-going mount/unmount operation + while self.use_count != 0: + self.cond.wait() + + # Another thread might have initialised state while we were + # wait()ing + if self.state is None: + LOG.debug('Initialising _HostMountState generation %(gen)i', + {'gen': self.generation}) + self.state = _HostMountState(host, self.generation) + self.generation += 1 + + def host_down(self): + """Destroy the current _HostMountState when the libvirt connection + goes down. + """ + with self.cond: + if self.state is None: + LOG.warning(_LW("host_down called, but we don't think host " + "is up")) + return + + self._host_down() + + def _host_down(self): + LOG.debug('Destroying MountManager generation %(gen)i', + {'gen': self.state.generation}) + self.state = None + + +class _HostMountState(object): + """A data structure recording all managed mountpoints and the + attachments in use for each one. _HostMountState ensures that the compute + node only attempts to mount a single mountpoint in use by multiple + attachments once, and that it is not unmounted until it is no longer in use + by any attachments. + + Callers should not create a _HostMountState directly, but should obtain + it via: + + with mount.get_manager().get_state() as state: + state.mount(...) + + On creation _HostMountState inspects the compute host directly to discover + all current mountpoints and the attachments on them. After creation it + expects to have exclusive control of these mountpoints until it is + destroyed. + + _HostMountState manages concurrency itself. Independent callers do not need + to consider interactions between multiple _HostMountState calls when + designing their own locking. 
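    As a concrete illustration of the state.mount(...) call above, a minimal
    usage sketch: the export, volume name, mountpoint and options below are
    invented for the example, and 'instance' stands in for the Instance object
    being attached; only the call shapes come from this class.

        with mount.get_manager().get_state() as state:
            state.mount('nfs', 'nfshost:/export', 'volume-1234',
                        '/var/lib/nova/mnt/hashed_export', instance,
                        ['-o', 'vers=4'])
            # The attachment is now recorded. mount(8) only ran if the
            # mountpoint was not already mounted.
            state.umount('volume-1234', '/var/lib/nova/mnt/hashed_export',
                         instance)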
+ """ + + class _MountPoint(object): + """A single mountpoint, and the set of attachments in use on it.""" + def __init__(self): + # A guard for operations on this mountpoint + # N.B. Care is required using this lock, as it will be deleted + # if the containing _MountPoint is deleted. + self.lock = threading.Lock() + + # The set of attachments on this mountpoint. + self.attachments = set() + + def add_attachment(self, vol_name, instance_uuid): + self.attachments.add((vol_name, instance_uuid)) + + def remove_attachment(self, vol_name, instance_uuid): + self.attachments.remove((vol_name, instance_uuid)) + + def in_use(self): + return len(self.attachments) > 0 + + def __init__(self, host, generation): + """Initialise a _HostMountState by inspecting the current compute + host for mountpoints and the attachments in use on them. + + :param host: A connected libvirt Host object + :param generation: An integer indicating the generation of this + _HostMountState object. This is 0 for the first + _HostMountState created, and incremented for each + created subsequently. It is used in log messages to + aid debugging. + """ + self.generation = generation + self.mountpoints = collections.defaultdict(self._MountPoint) + + # Iterate over all guests on the connected libvirt + for guest in host.list_guests(only_running=False): + for disk in guest.get_all_disks(): + + # All remote filesystem volumes are files + if disk.source_type != 'file': + continue + + # NOTE(mdbooth): We're assuming that the mountpoint is our + # immediate parent, which is currently true for all + # volume drivers. We deliberately don't do anything clever + # here, because we don't want to, e.g.: + # * Add mountpoints for non-volume disks + # * Get it wrong when a non-running domain references a + # volume which isn't mounted because the host just rebooted. + # and this is good enough. We could probably do better here + # with more thought. + + mountpoint = os.path.dirname(disk.source_path) + if not os.path.ismount(mountpoint): + continue + + name = os.path.basename(disk.source_path) + mount = self.mountpoints[mountpoint] + mount.add_attachment(name, guest.uuid) + + LOG.debug('Discovered volume %(vol)s in use for existing ' + 'mountpoint %(mountpoint)s', + {'vol': name, 'mountpoint': mountpoint}) + + @contextlib.contextmanager + def _get_locked(self, mountpoint): + """Get a locked mountpoint object + + :param mountpoint: The path of the mountpoint whose object we should + return. + :rtype: _HostMountState._MountPoint + """ + # This dance is because we delete locks. We need to be sure that the + # lock we hold does not belong to an object which has been deleted. + # We do this by checking that mountpoint still refers to this object + # when we hold the lock. This is safe because: + # * we only delete an object from mountpounts whilst holding its lock + # * mountpoints is a defaultdict which will atomically create a new + # object on access + while True: + mount = self.mountpoints[mountpoint] + with mount.lock: + if self.mountpoints[mountpoint] is mount: + yield mount + break + + def mount(self, fstype, export, vol_name, mountpoint, instance, options): + """Ensure a mountpoint is available for an attachment, mounting it + if necessary. + + If this is the first attachment on this mountpoint, we will mount it + with: + + mount -t + + :param fstype: The filesystem type to be passed to mount command. + :param export: The type-specific identifier of the filesystem to be + mounted. e.g. for nfs 'host.example.com:/mountpoint'. 
+ :param vol_name: The name of the volume on the remote filesystem. + :param mountpoint: The directory where the filesystem will be + mounted on the local compute host. + :param instance: The instance the volume will be attached to. + :param options: An arbitrary list of additional arguments to be + passed to the mount command immediate before export + and mountpoint. + """ + + # NOTE(mdbooth): mount() may currently be called multiple times for a + # single attachment. Any operation which calls + # LibvirtDriver._hard_reboot will re-attach volumes which are probably + # already attached, resulting in multiple mount calls. + + LOG.debug('_HostMountState.mount(fstype=%(fstype)s, ' + 'export=%(export)s, vol_name=%(vol_name)s, %(mountpoint)s, ' + 'options=%(options)s) generation %(gen)s', + {'fstype': fstype, 'export': export, 'vol_name': vol_name, + 'mountpoint': mountpoint, 'options': options, + 'gen': self.generation}) + with self._get_locked(mountpoint) as mount: + if not os.path.ismount(mountpoint): + LOG.debug('Mounting %(mountpoint)s generation %(gen)s', + {'mountpoint': mountpoint, 'gen': self.generation}) + + utils.execute('mkdir', '-p', mountpoint) + + mount_cmd = ['mount', '-t', fstype] + if options is not None: + mount_cmd.extend(options) + mount_cmd.extend([export, mountpoint]) + + try: + utils.execute(*mount_cmd, run_as_root=True) + except Exception: + # Check to see if mountpoint is mounted despite the error + # eg it was already mounted + if os.path.ismount(mountpoint): + # We're not going to raise the exception because we're + # in the desired state anyway. However, this is still + # unusual so we'll log it. + LOG.exception(_LE('Error mounting %(fstype)s export ' + '%(export)s on %(mountpoint)s. ' + 'Continuing because mountpount is ' + 'mounted despite this.'), + {'fstype': fstype, 'export': export, + 'mountpoint': mountpoint}) + + else: + # If the mount failed there's no reason for us to keep + # a record of it. It will be created again if the + # caller retries. + + # Delete while holding lock + del self.mountpoints[mountpoint] + + raise + + mount.add_attachment(vol_name, instance.uuid) + + LOG.debug('_HostMountState.mount() for %(mountpoint)s ' + 'generation %(gen)s completed successfully', + {'mountpoint': mountpoint, 'gen': self.generation}) + + def umount(self, vol_name, mountpoint, instance): + """Mark an attachment as no longer in use, and unmount its mountpoint + if necessary. + + :param vol_name: The name of the volume on the remote filesystem. + :param mountpoint: The directory where the filesystem is be + mounted on the local compute host. + :param instance: The instance the volume was attached to. 
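        A short sketch of the multi-attach behaviour, with the volume name,
        mountpoint and the two instances as placeholders. Only the call that
        removes the last attachment runs the real umount(8) and rmdir:

            # instance_b still holds an attachment: no umount yet
            state.umount('volume-1234', mountpoint, instance_a)
            # last attachment gone: umount and rmdir are executed
            state.umount('volume-1234', mountpoint, instance_b)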
+ """ + LOG.debug('_HostMountState.umount(vol_name=%(vol_name)s, ' + 'mountpoint=%(mountpoint)s) generation %(gen)s', + {'vol_name': vol_name, 'mountpoint': mountpoint, + 'gen': self.generation}) + with self._get_locked(mountpoint) as mount: + try: + mount.remove_attachment(vol_name, instance.uuid) + except KeyError: + LOG.warning(_LW("Request to remove attachment " + "(%(vol_name)s, %(instance)s) from " + "%(mountpoint)s, but we don't think it's in " + "use."), + {'vol_name': vol_name, 'instance': instance.uuid, + 'mountpoint': mountpoint}) + + if not mount.in_use(): + mounted = os.path.ismount(mountpoint) + + if mounted: + mounted = self._real_umount(mountpoint) + + # Delete our record entirely if it's unmounted + if not mounted: + del self.mountpoints[mountpoint] + + LOG.debug('_HostMountState.umount() for %(mountpoint)s ' + 'generation %(gen)s completed successfully', + {'mountpoint': mountpoint, 'gen': self.generation}) + + def _real_umount(self, mountpoint): + # Unmount and delete a mountpoint. + # Return mount state after umount (i.e. True means still mounted) + LOG.debug('Unmounting %(mountpoint)s generation %(gen)s', + {'mountpoint': mountpoint, 'gen': self.generation}) + + try: + utils.execute('umount', mountpoint, run_as_root=True, + attempts=3, delay_on_retry=True) + except processutils.ProcessExecutionError as ex: + LOG.error(_LE("Couldn't unmount %(mountpoint)s: %(reason)s"), + {'mountpoint': mountpoint, 'reason': six.text_type(ex)}) + + if not os.path.ismount(mountpoint): + try: + utils.execute('rmdir', mountpoint) + except processutils.ProcessExecutionError as ex: + LOG.error(_LE("Couldn't remove directory %(mountpoint)s: " + "%(reason)s"), + {'mountpoint': mountpoint, + 'reason': six.text_type(ex)}) + return False + + return True + + +__manager__ = _HostMountStateManager() + + +def get_manager(): + """Return the _HostMountStateManager singleton. + + :rtype: _HostMountStateManager + """ + return __manager__ + + +def mount(fstype, export, vol_name, mountpoint, instance, options=None): + """A convenience wrapper around _HostMountState.mount(), called via the + _HostMountStateManager singleton. + """ + with __manager__.get_state() as mount_state: + mount_state.mount(fstype, export, vol_name, mountpoint, instance, + options) + + +def umount(vol_name, mountpoint, instance): + """A convenience wrapper around _HostMountState.umount(), called via the + _HostMountStateManager singleton. + """ + with __manager__.get_state() as mount_state: + mount_state.umount(vol_name, mountpoint, instance) diff --git a/nova/virt/libvirt/volume/nfs.py b/nova/virt/libvirt/volume/nfs.py index 327328f0e665..594950573ffa 100644 --- a/nova/virt/libvirt/volume/nfs.py +++ b/nova/virt/libvirt/volume/nfs.py @@ -11,14 +11,9 @@ # under the License. 
-from oslo_concurrency import processutils from oslo_log import log as logging -import six import nova.conf -from nova.i18n import _LE, _LW -from nova import utils -from nova.virt.libvirt import utils as libvirt_utils from nova.virt.libvirt.volume import fs LOG = logging.getLogger(__name__) @@ -26,9 +21,12 @@ LOG = logging.getLogger(__name__) CONF = nova.conf.CONF -class LibvirtNFSVolumeDriver(fs.LibvirtBaseFileSystemVolumeDriver): +class LibvirtNFSVolumeDriver(fs.LibvirtMountedFileSystemVolumeDriver): """Class implements libvirt part of volume driver for NFS.""" + def __init__(self, connection): + super(LibvirtNFSVolumeDriver, self).__init__(connection, 'nfs') + def _get_mount_point_base(self): return CONF.libvirt.nfs_mount_point_base @@ -43,57 +41,13 @@ class LibvirtNFSVolumeDriver(fs.LibvirtBaseFileSystemVolumeDriver): conf.driver_io = "native" return conf - def connect_volume(self, connection_info, disk_info, instance): - """Connect the volume.""" - self._ensure_mounted(connection_info) - - connection_info['data']['device_path'] = \ - self._get_device_path(connection_info) - - def disconnect_volume(self, connection_info, disk_dev, instance): - """Disconnect the volume.""" - - mount_path = self._get_mount_path(connection_info) - - try: - utils.execute('umount', mount_path, run_as_root=True) - except processutils.ProcessExecutionError as exc: - export = connection_info['data']['export'] - if ('device is busy' in six.text_type(exc) or - 'target is busy' in six.text_type(exc)): - LOG.debug("The NFS share %s is still in use.", export) - elif ('not mounted' in six.text_type(exc)): - LOG.debug("The NFS share %s has already been unmounted.", - export) - else: - LOG.exception(_LE("Couldn't unmount the NFS share %s"), export) - - def _ensure_mounted(self, connection_info): - """@type connection_info: dict - """ - nfs_export = connection_info['data']['export'] - mount_path = self._get_mount_path(connection_info) - if not libvirt_utils.is_mounted(mount_path, nfs_export): - options = connection_info['data'].get('options') - self._mount_nfs(mount_path, nfs_export, options, ensure=True) - return mount_path - - def _mount_nfs(self, mount_path, nfs_share, options=None, ensure=False): - """Mount nfs export to mount path.""" - utils.execute('mkdir', '-p', mount_path) - - # Construct the NFS mount command. - nfs_cmd = ['mount', '-t', 'nfs'] + def _mount_options(self, connection_info): + options = [] if CONF.libvirt.nfs_mount_options is not None: - nfs_cmd.extend(['-o', CONF.libvirt.nfs_mount_options]) - if options: - nfs_cmd.extend(options.split(' ')) - nfs_cmd.extend([nfs_share, mount_path]) + options.extend(['-o', CONF.libvirt.nfs_mount_options]) - try: - utils.execute(*nfs_cmd, run_as_root=True) - except processutils.ProcessExecutionError as exc: - if ensure and 'already mounted' in six.text_type(exc): - LOG.warning(_LW("%s is already mounted"), nfs_share) - else: - raise + conn_options = connection_info['data'].get('options') + if conn_options: + options.extend(conn_options.split(' ')) + + return options
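
The nfs driver above is intended as the template for converting the remaining
LibvirtBaseFileSystemVolumeDrivers. As a sketch of what such a conversion
looks like, here is a hypothetical driver for an imaginary 'foo' filesystem
type. The class name, the 'foo' fstype string and the foo_mount_point_base
option are invented for illustration, and get_config() is omitted (a real
driver supplies one, as the nfs driver does):

import nova.conf
from nova.virt.libvirt.volume import fs

CONF = nova.conf.CONF


class LibvirtFooVolumeDriver(fs.LibvirtMountedFileSystemVolumeDriver):
    """Hypothetical libvirt volume driver for an imaginary 'foo' filesystem."""

    def __init__(self, connection):
        # The second argument is the fstype passed to 'mount -t' by
        # _HostMountState.mount().
        super(LibvirtFooVolumeDriver, self).__init__(connection, 'foo')

    def _get_mount_point_base(self):
        # Hypothetical config option, analogous to
        # CONF.libvirt.nfs_mount_point_base.
        return CONF.libvirt.foo_mount_point_base

    def _mount_options(self, connection_info):
        # Extra arguments inserted into the mount command immediately before
        # the export and mountpoint.
        options = []
        conn_options = connection_info['data'].get('options')
        if conn_options:
            options.extend(conn_options.split(' '))
        return options

connect_volume() and disconnect_volume() are inherited from
LibvirtMountedFileSystemVolumeDriver, so mounts and umounts for every volume
on the same export are serialised through the _HostMountState machinery. The
manager itself is brought up and torn down by the driver.py hook above, and a
real driver would also need to be registered in the libvirt driver's volume
driver list, which is not part of this sketch.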