libvirt: Fix races with nfs volume mount/umount

A single nfs export typically contains multiple volumes. We were
handling this in the libvirt driver by:

1. On mount, we 'ensure' the mount is available, so we don't fail if
   another instance already has it mounted.

2. On umount, we trap and ignore 'device is busy' so we don't fail if
   another instance is already using it.

Unfortunately, while this works for serial mounts and unmounts, there
are multiple failure cases when volumes from the same export are
mounted and unmounted simultaneously. For example, if an instance is
stopped, its qemu process no longer holds the mountpoint open, so it
does not prevent an unmount for another volume on the same mountpoint
from succeeding. It is then impossible to restart the instance,
because its mountpoint is no longer mounted.

To fix this, we create a singleton manager object which tracks mount
and umount requests per export, and calls the real mount/umount only
when required. It uses per-export locks to allow concurrency while
avoiding races. Because we now expect to know the state of the host at
all times, we no longer need to execute speculative mount/umount
commands.

As we track attachments (a mapping from volume to instance) rather
than volumes, we also gracefully support multi-attach.

This change implements this for nfs, but the solution is intended to
be extended to all LibvirtBaseFileSystemVolumeDrivers.
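
As a rough sketch of the call pattern this introduces (the
example_attach_detach() helper and its host/instance/connection_info
arguments are placeholders, not code from this change; the mount module
functions are the ones added below), a filesystem volume driver now goes
through the manager instead of shelling out directly:

    from nova.virt.libvirt.volume import mount

    def example_attach_detach(host, instance, connection_info, mountpoint):
        # The libvirt driver notifies the singleton manager when its
        # connection to libvirt comes up; host_down() when it goes away.
        mount.get_manager().host_up(host)

        # connect_volume() delegates to the manager, which runs the real
        # mount command only for the first attachment on a mountpoint.
        data = connection_info['data']
        mount.mount('nfs', data['export'], data['name'], mountpoint,
                    instance, options=None)

        # disconnect_volume() likewise; the real umount and rmdir only run
        # once the last attachment on the mountpoint has been removed.
        mount.umount(data['name'], mountpoint, instance)

        mount.get_manager().host_down()

LibvirtMountedFileSystemVolumeDriver in the change below follows this
pattern from its connect_volume() and disconnect_volume() methods.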

Closes-Bug: #1421550
Change-Id: I3155984d76df06371a6c45f633aa448168a96d64
Author: Matthew Booth, 2016-10-07 19:14:38 +01:00
Commit: 4aa39c44a4 (parent: b66b7d4f9d)
6 changed files with 1137 additions and 118 deletions


@@ -0,0 +1,611 @@
# Copyright 2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os.path
import threading
import time
import eventlet
import fixtures
import mock
from oslo_concurrency import processutils
from nova import exception
from nova import test
from nova.tests import uuidsentinel as uuids
from nova.virt.libvirt import config as libvirt_config
from nova.virt.libvirt import guest as libvirt_guest
from nova.virt.libvirt import host as libvirt_host
from nova.virt.libvirt.volume import mount
# We wait on events in a few cases. In normal execution the wait period should
# be in the order of fractions of a millisecond. However, if we've hit a bug we
# might deadlock and never return. To be nice to our test environment, we cut
# this short at MAX_WAIT seconds. This should be large enough that normal
# jitter won't trigger it, but not so long that it's annoying to wait for.
MAX_WAIT = 2
class TestThreadController(object):
"""Helper class for executing a test thread incrementally by waiting at
named waitpoints.
def test(ctl):
things()
ctl.waitpoint('foo')
more_things()
ctl.waitpoint('bar')
final_things()
ctl = TestThreadController(test)
ctl.runto('foo')
assert(things)
ctl.runto('bar')
assert(more_things)
ctl.finish()
assert(final_things)
This gets more interesting when the waitpoints are mocked into non-test
code.
"""
# A map of threads to controllers
all_threads = {}
def __init__(self, fn):
"""Create a TestThreadController.
:param fn: A test function which takes a TestThreadController as its
only argument
"""
# All updates to wait_at and waiting are guarded by wait_lock
self.wait_lock = threading.Condition()
# The name of the next wait point
self.wait_at = None
# True when waiting at a waitpoint
self.waiting = False
# Incremented every time we continue from a waitpoint
self.epoch = 1
# The last epoch we waited at
self.last_epoch = 0
self.start_event = eventlet.event.Event()
self.running = False
self.complete = False
# We must not execute fn() until the thread has been registered in
# all_threads. eventlet doesn't give us an API to do this directly,
# so we defer with an Event
def deferred_start():
self.start_event.wait()
fn()
with self.wait_lock:
self.complete = True
self.wait_lock.notify_all()
self.thread = eventlet.greenthread.spawn(deferred_start)
self.all_threads[self.thread] = self
@classmethod
def current(cls):
return cls.all_threads.get(eventlet.greenthread.getcurrent())
def _ensure_running(self):
if not self.running:
self.running = True
self.start_event.send()
def waitpoint(self, name):
"""Called by the test thread. Wait at a waitpoint called name"""
with self.wait_lock:
wait_since = time.time()
while name == self.wait_at:
self.waiting = True
self.wait_lock.notify_all()
self.wait_lock.wait(1)
assert(time.time() - wait_since < MAX_WAIT)
self.epoch += 1
self.waiting = False
self.wait_lock.notify_all()
def runto(self, name):
"""Called by the control thread. Cause the test thread to run until
reaching a waitpoint called name. When runto() exits, the test
thread is guaranteed to have reached this waitpoint.
"""
with self.wait_lock:
# Set a new wait point
self.wait_at = name
self.wait_lock.notify_all()
# We deliberately don't do this first to avoid a race the first
# time we call runto()
self._ensure_running()
# Wait until the test thread is at the wait point
wait_since = time.time()
while self.epoch == self.last_epoch or not self.waiting:
self.wait_lock.wait(1)
assert(time.time() - wait_since < MAX_WAIT)
self.last_epoch = self.epoch
def start(self):
"""Called by the control thread. Cause the test thread to start
running, but to not wait for it to complete.
"""
self._ensure_running()
def finish(self):
"""Called by the control thread. Cause the test thread to run to
completion. When finish() exits, the test thread is guaranteed to
have completed.
"""
self._ensure_running()
wait_since = time.time()
with self.wait_lock:
self.wait_at = None
self.wait_lock.notify_all()
while not self.complete:
self.wait_lock.wait(1)
assert(time.time() - wait_since < MAX_WAIT)
self.thread.wait()
class HostMountStateTestCase(test.NoDBTestCase):
def setUp(self):
super(HostMountStateTestCase, self).setUp()
self.mounted = set()
def fake_execute(cmd, *args, **kwargs):
if cmd == 'mount':
path = args[-1]
if path in self.mounted:
raise processutils.ProcessExecutionError('Already mounted')
self.mounted.add(path)
elif cmd == 'umount':
path = args[-1]
if path not in self.mounted:
raise processutils.ProcessExecutionError('Not mounted')
self.mounted.remove(path)
def fake_ismount(path):
return path in self.mounted
mock_execute = mock.MagicMock(side_effect=fake_execute)
self.useFixture(fixtures.MonkeyPatch('nova.utils.execute',
mock_execute))
self.useFixture(fixtures.MonkeyPatch('os.path.ismount', fake_ismount))
def test_init(self):
# Test that we initialise the state of MountManager correctly at
# startup
def fake_disk(disk):
libvirt_disk = libvirt_config.LibvirtConfigGuestDisk()
libvirt_disk.source_type = disk[0]
libvirt_disk.source_path = os.path.join(*disk[1])
return libvirt_disk
def mock_guest(uuid, disks):
guest = mock.create_autospec(libvirt_guest.Guest)
guest.uuid = uuid
guest.get_all_disks.return_value = map(fake_disk, disks)
return guest
local_dir = '/local'
mountpoint_a = '/mnt/a'
mountpoint_b = '/mnt/b'
self.mounted.add(mountpoint_a)
self.mounted.add(mountpoint_b)
guests = map(mock_guest, [uuids.instance_a, uuids.instance_b], [
# Local file root disk and a volume on each of mountpoints a and b
[
('file', (local_dir, uuids.instance_a, 'disk')),
('file', (mountpoint_a, 'vola1')),
('file', (mountpoint_b, 'volb1')),
],
# Local LVM root disk and a volume on each of mountpoints a and b
[
('block', ('/dev', 'vg', uuids.instance_b + '_disk')),
('file', (mountpoint_a, 'vola2')),
('file', (mountpoint_b, 'volb2')),
]
])
host = mock.create_autospec(libvirt_host.Host)
host.list_guests.return_value = guests
m = mount._HostMountState(host, 0)
self.assertEqual([mountpoint_a, mountpoint_b],
sorted(m.mountpoints.keys()))
self.assertSetEqual(set([('vola1', uuids.instance_a),
('vola2', uuids.instance_b)]),
m.mountpoints[mountpoint_a].attachments)
self.assertSetEqual(set([('volb1', uuids.instance_a),
('volb2', uuids.instance_b)]),
m.mountpoints[mountpoint_b].attachments)
@staticmethod
def _get_clean_hostmountstate():
# list_guests returns no guests: _HostMountState initial state is
# clean.
host = mock.create_autospec(libvirt_host.Host)
host.list_guests.return_value = []
return mount._HostMountState(host, 0)
def _sentinel_mount(self, m, vol, mountpoint=mock.sentinel.mountpoint,
instance=None):
if instance is None:
instance = mock.sentinel.instance
instance.uuid = uuids.instance
m.mount(mock.sentinel.fstype, mock.sentinel.export,
vol, mountpoint, instance,
[mock.sentinel.option1, mock.sentinel.option2])
def _sentinel_umount(self, m, vol, mountpoint=mock.sentinel.mountpoint,
instance=mock.sentinel.instance):
m.umount(vol, mountpoint, instance)
@staticmethod
def _expected_sentinel_mount_calls(mountpoint=mock.sentinel.mountpoint):
return [mock.call('mkdir', '-p', mountpoint),
mock.call('mount', '-t', mock.sentinel.fstype,
mock.sentinel.option1, mock.sentinel.option2,
mock.sentinel.export, mountpoint,
run_as_root=True)]
@staticmethod
def _expected_sentinel_umount_calls(mountpoint=mock.sentinel.mountpoint):
return [mock.call('umount', mountpoint,
attempts=3, delay_on_retry=True,
run_as_root=True),
mock.call('rmdir', mountpoint)]
def test_mount_umount(self):
# Mount 2 different volumes from the same export. Test that we only
# mount and umount once.
m = self._get_clean_hostmountstate()
# Mount vol_a from export
self._sentinel_mount(m, mock.sentinel.vol_a)
expected_calls = self._expected_sentinel_mount_calls()
mount.utils.execute.assert_has_calls(expected_calls)
# Mount vol_b from export. We shouldn't have mounted again
self._sentinel_mount(m, mock.sentinel.vol_b)
mount.utils.execute.assert_has_calls(expected_calls)
# Unmount vol_a. We shouldn't have unmounted
self._sentinel_umount(m, mock.sentinel.vol_a)
mount.utils.execute.assert_has_calls(expected_calls)
# Unmount vol_b. We should have umounted.
self._sentinel_umount(m, mock.sentinel.vol_b)
expected_calls.extend(self._expected_sentinel_umount_calls())
mount.utils.execute.assert_has_calls(expected_calls)
def test_mount_umount_multi_attach(self):
# Mount a volume from a single export for 2 different instances. Test
# that we only mount and umount once.
m = self._get_clean_hostmountstate()
instance_a = mock.sentinel.instance_a
instance_a.uuid = uuids.instance_a
instance_b = mock.sentinel.instance_b
instance_b.uuid = uuids.instance_b
# Mount vol_a for instance_a
self._sentinel_mount(m, mock.sentinel.vol_a, instance=instance_a)
expected_calls = self._expected_sentinel_mount_calls()
mount.utils.execute.assert_has_calls(expected_calls)
# Mount vol_a for instance_b. We shouldn't have mounted again
self._sentinel_mount(m, mock.sentinel.vol_a, instance=instance_b)
mount.utils.execute.assert_has_calls(expected_calls)
# Unmount vol_a for instance_a. We shouldn't have unmounted
self._sentinel_umount(m, mock.sentinel.vol_a, instance=instance_a)
mount.utils.execute.assert_has_calls(expected_calls)
# Unmount vol_a for instance_b. We should have umounted.
self._sentinel_umount(m, mock.sentinel.vol_a, instance=instance_b)
expected_calls.extend(self._expected_sentinel_umount_calls())
mount.utils.execute.assert_has_calls(expected_calls)
def test_mount_concurrent(self):
# This is 2 tests in 1, because the first test is the precondition
# for the second.
# The first test is that if 2 threads call mount simultaneously,
# only one of them will call mount
# The second test is that we correctly handle the case where we
# delete a lock after umount. During the umount of the first test,
# which will delete the lock when it completes, we start 2 more
# threads which both call mount. These threads are holding a lock
# which is about to be deleted. We test that they still don't race,
# and only one of them calls mount.
m = self._get_clean_hostmountstate()
def mount_a():
# Mount vol_a from export
self._sentinel_mount(m, mock.sentinel.vol_a)
TestThreadController.current().waitpoint('mounted')
self._sentinel_umount(m, mock.sentinel.vol_a)
def mount_b():
# Mount vol_b from export
self._sentinel_mount(m, mock.sentinel.vol_b)
self._sentinel_umount(m, mock.sentinel.vol_b)
def mount_c():
self._sentinel_mount(m, mock.sentinel.vol_c)
def mount_d():
self._sentinel_mount(m, mock.sentinel.vol_d)
ctl_a = TestThreadController(mount_a)
ctl_b = TestThreadController(mount_b)
ctl_c = TestThreadController(mount_c)
ctl_d = TestThreadController(mount_d)
orig_execute = mount.utils.execute.side_effect
def trap_mount_umount(cmd, *args, **kwargs):
# Conditionally wait at a waitpoint named after the command
# we're executing
ctl = TestThreadController.current()
ctl.waitpoint(cmd)
orig_execute(cmd, *args, **kwargs)
mount.utils.execute.side_effect = trap_mount_umount
expected_calls = []
# Run the first thread until it's blocked while calling mount
ctl_a.runto('mount')
expected_calls.extend(self._expected_sentinel_mount_calls())
self.assertEqual(expected_calls, mount.utils.execute.call_args_list)
# Start the second mount, and ensure it's got plenty of opportunity
# to race.
ctl_b.start()
time.sleep(0.01)
self.assertEqual(expected_calls, mount.utils.execute.call_args_list)
# Allow ctl_a to complete its mount
ctl_a.runto('mounted')
self.assertEqual(expected_calls, mount.utils.execute.call_args_list)
# Allow ctl_b to finish. We should not have done a umount
ctl_b.finish()
self.assertEqual(expected_calls, mount.utils.execute.call_args_list)
# Allow ctl_a to start umounting
ctl_a.runto('umount')
expected_calls.extend(self._expected_sentinel_umount_calls())
# We haven't executed rmdir yet, because we've blocked during umount
rmdir = expected_calls.pop()
self.assertEqual(expected_calls, mount.utils.execute.call_args_list)
expected_calls.append(rmdir)
# While ctl_a is umounting, simultaneously start both ctl_c and
# ctl_d, and ensure they have an opportunity to race
ctl_c.start()
ctl_d.start()
time.sleep(0.01)
# Allow a, c, and d to complete
for ctl in (ctl_a, ctl_c, ctl_d):
ctl.finish()
# We should have completed the previous umount, then remounted
# exactly once
expected_calls.extend(self._expected_sentinel_mount_calls())
self.assertEqual(expected_calls, mount.utils.execute.call_args_list)
def test_mount_concurrent_no_interfere(self):
# Test that concurrent calls to mount volumes in different exports
# run concurrently
m = self._get_clean_hostmountstate()
def mount_a():
# Mount vol on mountpoint a
self._sentinel_mount(m, mock.sentinel.vol,
mock.sentinel.mountpoint_a)
TestThreadController.current().waitpoint('mounted')
self._sentinel_umount(m, mock.sentinel.vol,
mock.sentinel.mountpoint_a)
def mount_b():
# Mount vol on mountpoint b
self._sentinel_mount(m, mock.sentinel.vol,
mock.sentinel.mountpoint_b)
self._sentinel_umount(m, mock.sentinel.vol,
mock.sentinel.mountpoint_b)
ctl_a = TestThreadController(mount_a)
ctl_b = TestThreadController(mount_b)
expected_calls = []
ctl_a.runto('mounted')
expected_calls.extend(self._expected_sentinel_mount_calls(
mock.sentinel.mountpoint_a))
self.assertEqual(expected_calls, mount.utils.execute.call_args_list)
ctl_b.finish()
expected_calls.extend(self._expected_sentinel_mount_calls(
mock.sentinel.mountpoint_b))
expected_calls.extend(self._expected_sentinel_umount_calls(
mock.sentinel.mountpoint_b))
self.assertEqual(expected_calls, mount.utils.execute.call_args_list)
ctl_a.finish()
expected_calls.extend(self._expected_sentinel_umount_calls(
mock.sentinel.mountpoint_a))
self.assertEqual(expected_calls, mount.utils.execute.call_args_list)
def test_mount_after_failed_umount(self):
# Test that MountManager correctly tracks state when umount fails.
# Test that when umount fails a subsequent mount doesn't try to
# remount it.
# We've already got a fake execute (see setUp) which is ensuring mount,
# umount, and ismount work as expected. We don't want to mess with
# that, except that we want umount to raise an exception. We store the
# original here so we can call it if we're not unmounting, and so we
# can restore it when we no longer want the exception.
orig_execute = mount.utils.execute.side_effect
def raise_on_umount(cmd, *args, **kwargs):
if cmd == 'umount':
raise mount.processutils.ProcessExecutionError()
orig_execute(cmd, *args, **kwargs)
mount.utils.execute.side_effect = raise_on_umount
expected_calls = []
m = self._get_clean_hostmountstate()
# Mount vol_a
self._sentinel_mount(m, mock.sentinel.vol_a)
expected_calls.extend(self._expected_sentinel_mount_calls())
self.assertEqual(expected_calls, mount.utils.execute.call_args_list)
# Umount vol_a. The umount command will fail.
self._sentinel_umount(m, mock.sentinel.vol_a)
expected_calls.extend(self._expected_sentinel_umount_calls())
# We should not have called rmdir, because umount failed
expected_calls.pop()
self.assertEqual(expected_calls, mount.utils.execute.call_args_list)
# Mount vol_a again. We should not have called mount, because umount
# failed.
self._sentinel_mount(m, mock.sentinel.vol_a)
self.assertEqual(expected_calls, mount.utils.execute.call_args_list)
# Prevent future failure of umount
mount.utils.execute.side_effect = orig_execute
# Umount vol_a successfully
self._sentinel_umount(m, mock.sentinel.vol_a)
expected_calls.extend(self._expected_sentinel_umount_calls())
self.assertEqual(expected_calls, mount.utils.execute.call_args_list)
@mock.patch.object(mount.LOG, 'error')
def test_umount_log_failure(self, mock_LOG_error):
# Test that we log an error when umount fails
orig_execute = mount.utils.execute.side_effect
def raise_on_umount(cmd, *args, **kwargs):
if cmd == 'umount':
raise mount.processutils.ProcessExecutionError(
None, None, None, 'umount', 'umount: device is busy.')
orig_execute(cmd, *args, **kwargs)
mount.utils.execute.side_effect = raise_on_umount
m = self._get_clean_hostmountstate()
self._sentinel_mount(m, mock.sentinel.vol_a)
self._sentinel_umount(m, mock.sentinel.vol_a)
mock_LOG_error.assert_called()
class MountManagerTestCase(test.NoDBTestCase):
class FakeHostMountState(object):
def __init__(self, host, generation):
self.host = host
self.generation = generation
ctl = TestThreadController.current()
if ctl is not None:
ctl.waitpoint('init')
def setUp(self):
super(MountManagerTestCase, self).setUp()
self.useFixture(fixtures.MonkeyPatch(
'nova.virt.libvirt.volume.mount._HostMountState',
self.FakeHostMountState))
self.m = mount.get_manager()
self.m._reset_state()
def _get_state(self):
with self.m.get_state() as state:
return state
def test_host_up_down(self):
self.m.host_up(mock.sentinel.host)
state = self._get_state()
self.assertEqual(state.host, mock.sentinel.host)
self.assertEqual(state.generation, 0)
self.m.host_down()
self.assertRaises(exception.HypervisorUnavailable, self._get_state)
def test_host_up_waits_for_completion(self):
self.m.host_up(mock.sentinel.host)
def txn():
with self.m.get_state():
TestThreadController.current().waitpoint('running')
# Start a thread which blocks holding a state object
ctl = TestThreadController(txn)
ctl.runto('running')
# Host goes down
self.m.host_down()
# Call host_up in a separate thread because it will block, and give
# it plenty of time to race
host_up = eventlet.greenthread.spawn(self.m.host_up,
mock.sentinel.host)
time.sleep(0.01)
# Assert that we haven't instantiated a new state while there's an
# ongoing operation from the previous state
self.assertRaises(exception.HypervisorUnavailable, self._get_state)
# Allow the previous ongoing operation and host_up to complete
ctl.finish()
host_up.wait()
# Assert that we've got a new state generation
state = self._get_state()
self.assertEqual(1, state.generation)


@@ -12,12 +12,13 @@
import os
import fixtures
import mock
from oslo_concurrency import processutils
from nova.tests.unit.virt.libvirt.volume import test_volume
from nova.tests import uuidsentinel as uuids
from nova import utils
from nova.virt.libvirt import utils as libvirt_utils
from nova.virt.libvirt.volume import mount
from nova.virt.libvirt.volume import nfs
@@ -26,11 +27,38 @@ class LibvirtNFSVolumeDriverTestCase(test_volume.LibvirtVolumeBaseTestCase):
def setUp(self):
super(LibvirtNFSVolumeDriverTestCase, self).setUp()
m = mount.get_manager()
m._reset_state()
self.mnt_base = '/mnt'
m.host_up(self.fake_host)
self.flags(nfs_mount_point_base=self.mnt_base, group='libvirt')
@mock.patch.object(libvirt_utils, 'is_mounted', return_value=False)
def test_libvirt_nfs_driver(self, mock_is_mounted):
# Caution: this is also faked by the superclass
orig_execute = utils.execute
mounted = [False]
def fake_execute(*cmd, **kwargs):
orig_execute(*cmd, **kwargs)
if cmd[0] == 'mount':
mounted[0] = True
if cmd[0] == 'umount':
mounted[0] = False
self.useFixture(fixtures.MonkeyPatch('nova.utils.execute',
fake_execute))
# Mock ismount to return the current mount state
# N.B. This is only valid for tests which mount and unmount a single
# directory.
self.useFixture(fixtures.MonkeyPatch('os.path.ismount',
lambda *args, **kwargs: mounted[0]))
def test_libvirt_nfs_driver(self):
libvirt_driver = nfs.LibvirtNFSVolumeDriver(self.fake_host)
export_string = '192.168.1.1:/nfs/share1'
@@ -39,8 +67,10 @@ class LibvirtNFSVolumeDriverTestCase(test_volume.LibvirtVolumeBaseTestCase):
connection_info = {'data': {'export': export_string,
'name': self.name}}
instance = mock.sentinel.instance
instance.uuid = uuids.instance
libvirt_driver.connect_volume(connection_info, self.disk_info,
mock.sentinel.instance)
instance)
libvirt_driver.disconnect_volume(connection_info, "vde",
mock.sentinel.instance)
@@ -50,39 +80,9 @@ class LibvirtNFSVolumeDriverTestCase(test_volume.LibvirtVolumeBaseTestCase):
expected_commands = [
('mkdir', '-p', export_mnt_base),
('mount', '-t', 'nfs', export_string, export_mnt_base),
('umount', export_mnt_base)]
('umount', export_mnt_base),
('rmdir', export_mnt_base)]
self.assertEqual(expected_commands, self.executes)
self.assertTrue(mock_is_mounted.called)
@mock.patch.object(nfs.utils, 'execute')
@mock.patch.object(nfs.LOG, 'debug')
@mock.patch.object(nfs.LOG, 'exception')
def test_libvirt_nfs_driver_umount_error(self, mock_LOG_exception,
mock_LOG_debug, mock_utils_exe):
export_string = '192.168.1.1:/nfs/share1'
connection_info = {'data': {'export': export_string,
'name': self.name}}
libvirt_driver = nfs.LibvirtNFSVolumeDriver(self.fake_host)
mock_utils_exe.side_effect = processutils.ProcessExecutionError(
None, None, None, 'umount', 'umount: device is busy.')
libvirt_driver.disconnect_volume(connection_info, "vde",
mock.sentinel.instance)
self.assertTrue(mock_LOG_debug.called)
mock_utils_exe.side_effect = processutils.ProcessExecutionError(
None, None, None, 'umount', 'umount: target is busy.')
libvirt_driver.disconnect_volume(connection_info, "vde",
mock.sentinel.instance)
self.assertTrue(mock_LOG_debug.called)
mock_utils_exe.side_effect = processutils.ProcessExecutionError(
None, None, None, 'umount', 'umount: not mounted.')
libvirt_driver.disconnect_volume(connection_info, "vde",
mock.sentinel.instance)
self.assertTrue(mock_LOG_debug.called)
mock_utils_exe.side_effect = processutils.ProcessExecutionError(
None, None, None, 'umount', 'umount: Other error.')
libvirt_driver.disconnect_volume(connection_info, "vde",
mock.sentinel.instance)
self.assertTrue(mock_LOG_exception.called)
def test_libvirt_nfs_driver_get_config(self):
libvirt_driver = nfs.LibvirtNFSVolumeDriver(self.fake_host)
@@ -100,27 +100,7 @@ class LibvirtNFSVolumeDriverTestCase(test_volume.LibvirtVolumeBaseTestCase):
self.assertEqual('raw', tree.find('./driver').get('type'))
self.assertEqual('native', tree.find('./driver').get('io'))
@mock.patch.object(libvirt_utils, 'is_mounted', return_value=True)
def test_libvirt_nfs_driver_already_mounted(self, mock_is_mounted):
libvirt_driver = nfs.LibvirtNFSVolumeDriver(self.fake_host)
export_string = '192.168.1.1:/nfs/share1'
export_mnt_base = os.path.join(self.mnt_base,
utils.get_hash_str(export_string))
connection_info = {'data': {'export': export_string,
'name': self.name}}
libvirt_driver.connect_volume(connection_info, self.disk_info,
mock.sentinel.instance)
libvirt_driver.disconnect_volume(connection_info, "vde",
mock.sentinel.instance)
expected_commands = [
('umount', export_mnt_base)]
self.assertEqual(expected_commands, self.executes)
@mock.patch.object(libvirt_utils, 'is_mounted', return_value=False)
def test_libvirt_nfs_driver_with_opts(self, mock_is_mounted):
def test_libvirt_nfs_driver_with_opts(self):
libvirt_driver = nfs.LibvirtNFSVolumeDriver(self.fake_host)
export_string = '192.168.1.1:/nfs/share1'
options = '-o intr,nfsvers=3'
@@ -130,8 +110,10 @@ class LibvirtNFSVolumeDriverTestCase(test_volume.LibvirtVolumeBaseTestCase):
connection_info = {'data': {'export': export_string,
'name': self.name,
'options': options}}
instance = mock.sentinel.instance
instance.uuid = uuids.instance
libvirt_driver.connect_volume(connection_info, self.disk_info,
mock.sentinel.instance)
instance)
libvirt_driver.disconnect_volume(connection_info, "vde",
mock.sentinel.instance)
@@ -140,6 +122,6 @@ class LibvirtNFSVolumeDriverTestCase(test_volume.LibvirtVolumeBaseTestCase):
('mount', '-t', 'nfs', '-o', 'intr,nfsvers=3',
export_string, export_mnt_base),
('umount', export_mnt_base),
('rmdir', export_mnt_base)
]
self.assertEqual(expected_commands, self.executes)
self.assertTrue(mock_is_mounted.called)


@@ -108,6 +108,7 @@ from nova.virt.libvirt.storage import lvm
from nova.virt.libvirt.storage import rbd_utils
from nova.virt.libvirt import utils as libvirt_utils
from nova.virt.libvirt import vif as libvirt_vif
from nova.virt.libvirt.volume import mount
from nova.virt.libvirt.volume import remotefs
from nova.virt import netutils
from nova.volume import cinder
@@ -3462,6 +3463,11 @@ class LibvirtDriver(driver.ComputeDriver):
'due to an unexpected exception.'), CONF.host,
exc_info=True)
if enabled:
mount.get_manager().host_up(self._host)
else:
mount.get_manager().host_down()
def _get_guest_cpu_model_config(self):
mode = CONF.libvirt.cpu_mode
model = CONF.libvirt.cpu_model


@@ -18,6 +18,7 @@ import os
import six
from nova import utils
from nova.virt.libvirt.volume import mount
from nova.virt.libvirt.volume import volume as libvirt_volume
@@ -93,3 +94,40 @@ class LibvirtBaseFileSystemVolumeDriver(
"""
mount_path = self._get_mount_path(connection_info)
return os.path.join(mount_path, connection_info['data']['name'])
@six.add_metaclass(abc.ABCMeta)
class LibvirtMountedFileSystemVolumeDriver(LibvirtBaseFileSystemVolumeDriver):
# NOTE(mdbooth): Hopefully we'll get to the point where everything which
# previously subclassed LibvirtBaseFileSystemVolumeDriver now subclasses
# LibvirtMountedFileSystemVolumeDriver. If we get there, we should fold
# this class into the base class.
def __init__(self, host, fstype):
super(LibvirtMountedFileSystemVolumeDriver, self).__init__(host)
self.fstype = fstype
def connect_volume(self, connection_info, disk_info, instance):
"""Connect the volume."""
export = connection_info['data']['export']
vol_name = connection_info['data']['name']
mountpoint = self._get_mount_path(connection_info)
mount.mount(self.fstype, export, vol_name, mountpoint, instance,
self._mount_options(connection_info))
connection_info['data']['device_path'] = \
self._get_device_path(connection_info)
def disconnect_volume(self, connection_info, disk_dev, instance):
"""Disconnect the volume."""
vol_name = connection_info['data']['name']
mountpoint = self._get_mount_path(connection_info)
mount.umount(vol_name, mountpoint, instance)
@abc.abstractmethod
def _mount_options(self, connection_info):
"""Return a list of additional arguments to pass to the mount command.
"""
pass


@@ -0,0 +1,428 @@
# Copyright 2016,2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import contextlib
import os.path
import threading
from oslo_concurrency import processutils
from oslo_log import log
import six
import nova.conf
from nova import exception
from nova.i18n import _LE, _LW
from nova import utils
CONF = nova.conf.CONF
LOG = log.getLogger(__name__)
class _HostMountStateManager(object):
"""A global manager of filesystem mounts.
_HostMountStateManager manages a _HostMountState object for the current
compute node. Primarily it creates one on host_up(), destroys it on
host_down(), and returns it via get_state().
_HostMountStateManager manages concurrency itself. Independent callers do
not need to consider interactions between multiple _HostMountStateManager
calls when designing their own locking.
_HostMountStateManager is a singleton, and must only be accessed via:
mount.get_manager()
"""
def __init__(self):
self._reset_state()
def _reset_state(self):
"""Reset state of global _HostMountStateManager.
Should only be called by __init__ and tests.
"""
self.state = None
self.use_count = 0
# Guards both state and use_count
self.cond = threading.Condition()
# Incremented each time we initialise a new mount state. Aids
# debugging.
self.generation = 0
@contextlib.contextmanager
def get_state(self):
"""Return the current mount state.
_HostMountStateManager will not permit a new state object to be
created while any previous state object is still in use.
get_state will raise HypervisorUnavailable if the libvirt connection is
currently down.
:rtype: _HostMountState
"""
# We hold the instance lock here so that if a _HostMountState is
# currently initialising we'll wait for it to complete rather than
# fail.
with self.cond:
state = self.state
if state is None:
raise exception.HypervisorUnavailable(host=CONF.host)
self.use_count += 1
try:
LOG.debug('Got _HostMountState generation %(gen)i',
{'gen': state.generation})
yield state
finally:
with self.cond:
self.use_count -= 1
self.cond.notify_all()
def host_up(self, host):
"""Inialise a new _HostMountState when the libvirt connection comes
up.
host_up will destroy and re-initialise the current state if one
already exists, but this is considered an error.
host_up will block before creating a new state until all operations
using a previous state have completed.
:param host: A connected libvirt Host object
"""
with self.cond:
if self.state is not None:
LOG.warning(_LW("host_up called, but we think host is "
"already up"))
self._host_down()
# Wait until all operations using a previous state generation are
# complete before initialising a new one. Note that self.state is
# already None, set either by initialisation or by host_down. This
# means the current state will not be returned to any new callers,
# and use_count will eventually reach zero.
# We do this to avoid a race between _HostMountState initialisation
# and an on-going mount/unmount operation
while self.use_count != 0:
self.cond.wait()
# Another thread might have initialised state while we were
# wait()ing
if self.state is None:
LOG.debug('Initialising _HostMountState generation %(gen)i',
{'gen': self.generation})
self.state = _HostMountState(host, self.generation)
self.generation += 1
def host_down(self):
"""Destroy the current _HostMountState when the libvirt connection
goes down.
"""
with self.cond:
if self.state is None:
LOG.warning(_LW("host_down called, but we don't think host "
"is up"))
return
self._host_down()
def _host_down(self):
LOG.debug('Destroying MountManager generation %(gen)i',
{'gen': self.state.generation})
self.state = None
class _HostMountState(object):
"""A data structure recording all managed mountpoints and the
attachments in use for each one. _HostMountState ensures that the compute
node only attempts to mount a single mountpoint in use by multiple
attachments once, and that it is not unmounted until it is no longer in use
by any attachments.
Callers should not create a _HostMountState directly, but should obtain
it via:
with mount.get_manager().get_state() as state:
state.mount(...)
On creation _HostMountState inspects the compute host directly to discover
all current mountpoints and the attachments on them. After creation it
expects to have exclusive control of these mountpoints until it is
destroyed.
_HostMountState manages concurrency itself. Independent callers do not need
to consider interactions between multiple _HostMountState calls when
designing their own locking.
"""
class _MountPoint(object):
"""A single mountpoint, and the set of attachments in use on it."""
def __init__(self):
# A guard for operations on this mountpoint
# N.B. Care is required using this lock, as it will be deleted
# if the containing _MountPoint is deleted.
self.lock = threading.Lock()
# The set of attachments on this mountpoint.
self.attachments = set()
def add_attachment(self, vol_name, instance_uuid):
self.attachments.add((vol_name, instance_uuid))
def remove_attachment(self, vol_name, instance_uuid):
self.attachments.remove((vol_name, instance_uuid))
def in_use(self):
return len(self.attachments) > 0
def __init__(self, host, generation):
"""Initialise a _HostMountState by inspecting the current compute
host for mountpoints and the attachments in use on them.
:param host: A connected libvirt Host object
:param generation: An integer indicating the generation of this
_HostMountState object. This is 0 for the first
_HostMountState created, and incremented for each
created subsequently. It is used in log messages to
aid debugging.
"""
self.generation = generation
self.mountpoints = collections.defaultdict(self._MountPoint)
# Iterate over all guests on the connected libvirt
for guest in host.list_guests(only_running=False):
for disk in guest.get_all_disks():
# All remote filesystem volumes are files
if disk.source_type != 'file':
continue
# NOTE(mdbooth): We're assuming that the mountpoint is our
# immediate parent, which is currently true for all
# volume drivers. We deliberately don't do anything clever
# here, because we don't want to, e.g.:
# * Add mountpoints for non-volume disks
# * Get it wrong when a non-running domain references a
# volume which isn't mounted because the host just rebooted.
# and this is good enough. We could probably do better here
# with more thought.
mountpoint = os.path.dirname(disk.source_path)
if not os.path.ismount(mountpoint):
continue
name = os.path.basename(disk.source_path)
mount = self.mountpoints[mountpoint]
mount.add_attachment(name, guest.uuid)
LOG.debug('Discovered volume %(vol)s in use for existing '
'mountpoint %(mountpoint)s',
{'vol': name, 'mountpoint': mountpoint})
@contextlib.contextmanager
def _get_locked(self, mountpoint):
"""Get a locked mountpoint object
:param mountpoint: The path of the mountpoint whose object we should
return.
:rtype: _HostMountState._MountPoint
"""
# This dance is because we delete locks. We need to be sure that the
# lock we hold does not belong to an object which has been deleted.
# We do this by checking that mountpoint still refers to this object
# when we hold the lock. This is safe because:
# * we only delete an object from mountpoints whilst holding its lock
# * mountpoints is a defaultdict which will atomically create a new
# object on access
while True:
mount = self.mountpoints[mountpoint]
with mount.lock:
if self.mountpoints[mountpoint] is mount:
yield mount
break
def mount(self, fstype, export, vol_name, mountpoint, instance, options):
"""Ensure a mountpoint is available for an attachment, mounting it
if necessary.
If this is the first attachment on this mountpoint, we will mount it
with:
mount -t <fstype> <options> <export> <mountpoint>
:param fstype: The filesystem type to be passed to mount command.
:param export: The type-specific identifier of the filesystem to be
mounted. e.g. for nfs 'host.example.com:/mountpoint'.
:param vol_name: The name of the volume on the remote filesystem.
:param mountpoint: The directory where the filesystem will be
mounted on the local compute host.
:param instance: The instance the volume will be attached to.
:param options: An arbitrary list of additional arguments to be
passed to the mount command immediately before export
and mountpoint.
"""
# NOTE(mdbooth): mount() may currently be called multiple times for a
# single attachment. Any operation which calls
# LibvirtDriver._hard_reboot will re-attach volumes which are probably
# already attached, resulting in multiple mount calls.
LOG.debug('_HostMountState.mount(fstype=%(fstype)s, '
'export=%(export)s, vol_name=%(vol_name)s, %(mountpoint)s, '
'options=%(options)s) generation %(gen)s',
{'fstype': fstype, 'export': export, 'vol_name': vol_name,
'mountpoint': mountpoint, 'options': options,
'gen': self.generation})
with self._get_locked(mountpoint) as mount:
if not os.path.ismount(mountpoint):
LOG.debug('Mounting %(mountpoint)s generation %(gen)s',
{'mountpoint': mountpoint, 'gen': self.generation})
utils.execute('mkdir', '-p', mountpoint)
mount_cmd = ['mount', '-t', fstype]
if options is not None:
mount_cmd.extend(options)
mount_cmd.extend([export, mountpoint])
try:
utils.execute(*mount_cmd, run_as_root=True)
except Exception:
# Check to see if mountpoint is mounted despite the error
# eg it was already mounted
if os.path.ismount(mountpoint):
# We're not going to raise the exception because we're
# in the desired state anyway. However, this is still
# unusual so we'll log it.
LOG.exception(_LE('Error mounting %(fstype)s export '
'%(export)s on %(mountpoint)s. '
'Continuing because mountpoint is '
'mounted despite this.'),
{'fstype': fstype, 'export': export,
'mountpoint': mountpoint})
else:
# If the mount failed there's no reason for us to keep
# a record of it. It will be created again if the
# caller retries.
# Delete while holding lock
del self.mountpoints[mountpoint]
raise
mount.add_attachment(vol_name, instance.uuid)
LOG.debug('_HostMountState.mount() for %(mountpoint)s '
'generation %(gen)s completed successfully',
{'mountpoint': mountpoint, 'gen': self.generation})
def umount(self, vol_name, mountpoint, instance):
"""Mark an attachment as no longer in use, and unmount its mountpoint
if necessary.
:param vol_name: The name of the volume on the remote filesystem.
:param mountpoint: The directory where the filesystem is
mounted on the local compute host.
:param instance: The instance the volume was attached to.
"""
LOG.debug('_HostMountState.umount(vol_name=%(vol_name)s, '
'mountpoint=%(mountpoint)s) generation %(gen)s',
{'vol_name': vol_name, 'mountpoint': mountpoint,
'gen': self.generation})
with self._get_locked(mountpoint) as mount:
try:
mount.remove_attachment(vol_name, instance.uuid)
except KeyError:
LOG.warning(_LW("Request to remove attachment "
"(%(vol_name)s, %(instance)s) from "
"%(mountpoint)s, but we don't think it's in "
"use."),
{'vol_name': vol_name, 'instance': instance.uuid,
'mountpoint': mountpoint})
if not mount.in_use():
mounted = os.path.ismount(mountpoint)
if mounted:
mounted = self._real_umount(mountpoint)
# Delete our record entirely if it's unmounted
if not mounted:
del self.mountpoints[mountpoint]
LOG.debug('_HostMountState.umount() for %(mountpoint)s '
'generation %(gen)s completed successfully',
{'mountpoint': mountpoint, 'gen': self.generation})
def _real_umount(self, mountpoint):
# Unmount and delete a mountpoint.
# Return mount state after umount (i.e. True means still mounted)
LOG.debug('Unmounting %(mountpoint)s generation %(gen)s',
{'mountpoint': mountpoint, 'gen': self.generation})
try:
utils.execute('umount', mountpoint, run_as_root=True,
attempts=3, delay_on_retry=True)
except processutils.ProcessExecutionError as ex:
LOG.error(_LE("Couldn't unmount %(mountpoint)s: %(reason)s"),
{'mountpoint': mountpoint, 'reason': six.text_type(ex)})
if not os.path.ismount(mountpoint):
try:
utils.execute('rmdir', mountpoint)
except processutils.ProcessExecutionError as ex:
LOG.error(_LE("Couldn't remove directory %(mountpoint)s: "
"%(reason)s"),
{'mountpoint': mountpoint,
'reason': six.text_type(ex)})
return False
return True
__manager__ = _HostMountStateManager()
def get_manager():
"""Return the _HostMountStateManager singleton.
:rtype: _HostMountStateManager
"""
return __manager__
def mount(fstype, export, vol_name, mountpoint, instance, options=None):
"""A convenience wrapper around _HostMountState.mount(), called via the
_HostMountStateManager singleton.
"""
with __manager__.get_state() as mount_state:
mount_state.mount(fstype, export, vol_name, mountpoint, instance,
options)
def umount(vol_name, mountpoint, instance):
"""A convenience wrapper around _HostMountState.umount(), called via the
_HostMountStateManager singleton.
"""
with __manager__.get_state() as mount_state:
mount_state.umount(vol_name, mountpoint, instance)


@@ -11,14 +11,9 @@
# under the License.
from oslo_concurrency import processutils
from oslo_log import log as logging
import six
import nova.conf
from nova.i18n import _LE, _LW
from nova import utils
from nova.virt.libvirt import utils as libvirt_utils
from nova.virt.libvirt.volume import fs
LOG = logging.getLogger(__name__)
@@ -26,9 +21,12 @@ LOG = logging.getLogger(__name__)
CONF = nova.conf.CONF
class LibvirtNFSVolumeDriver(fs.LibvirtBaseFileSystemVolumeDriver):
class LibvirtNFSVolumeDriver(fs.LibvirtMountedFileSystemVolumeDriver):
"""Class implements libvirt part of volume driver for NFS."""
def __init__(self, connection):
super(LibvirtNFSVolumeDriver, self).__init__(connection, 'nfs')
def _get_mount_point_base(self):
return CONF.libvirt.nfs_mount_point_base
@@ -43,57 +41,13 @@ class LibvirtNFSVolumeDriver(fs.LibvirtBaseFileSystemVolumeDriver):
conf.driver_io = "native"
return conf
def connect_volume(self, connection_info, disk_info, instance):
"""Connect the volume."""
self._ensure_mounted(connection_info)
connection_info['data']['device_path'] = \
self._get_device_path(connection_info)
def disconnect_volume(self, connection_info, disk_dev, instance):
"""Disconnect the volume."""
mount_path = self._get_mount_path(connection_info)
try:
utils.execute('umount', mount_path, run_as_root=True)
except processutils.ProcessExecutionError as exc:
export = connection_info['data']['export']
if ('device is busy' in six.text_type(exc) or
'target is busy' in six.text_type(exc)):
LOG.debug("The NFS share %s is still in use.", export)
elif ('not mounted' in six.text_type(exc)):
LOG.debug("The NFS share %s has already been unmounted.",
export)
else:
LOG.exception(_LE("Couldn't unmount the NFS share %s"), export)
def _ensure_mounted(self, connection_info):
"""@type connection_info: dict
"""
nfs_export = connection_info['data']['export']
mount_path = self._get_mount_path(connection_info)
if not libvirt_utils.is_mounted(mount_path, nfs_export):
options = connection_info['data'].get('options')
self._mount_nfs(mount_path, nfs_export, options, ensure=True)
return mount_path
def _mount_nfs(self, mount_path, nfs_share, options=None, ensure=False):
"""Mount nfs export to mount path."""
utils.execute('mkdir', '-p', mount_path)
# Construct the NFS mount command.
nfs_cmd = ['mount', '-t', 'nfs']
def _mount_options(self, connection_info):
options = []
if CONF.libvirt.nfs_mount_options is not None:
nfs_cmd.extend(['-o', CONF.libvirt.nfs_mount_options])
if options:
nfs_cmd.extend(options.split(' '))
nfs_cmd.extend([nfs_share, mount_path])
options.extend(['-o', CONF.libvirt.nfs_mount_options])
try:
utils.execute(*nfs_cmd, run_as_root=True)
except processutils.ProcessExecutionError as exc:
if ensure and 'already mounted' in six.text_type(exc):
LOG.warning(_LW("%s is already mounted"), nfs_share)
else:
raise
conn_options = connection_info['data'].get('options')
if conn_options:
options.extend(conn_options.split(' '))
return options