libvirt: Kill rsync/scp processes before deleting instance
In the resize operation, while files are being copied from the source to the destination compute node, the scp/rsync processes are not aborted after the instance is deleted, because the Linux kernel doesn't physically delete the instance files until every process holding a file handle on them has closed it. Hence the rsync/scp process keeps running until it has transferred 100% of the file data. Added a new module, instancejobtracker, to the libvirt driver, which can add, remove or terminate the processes running against particular instances. Added callback methods to the execute call which store the pid of the scp/rsync process in a cache as a key: value pair and remove the pid from the cache after process completion. The process id is used to kill the process if it is still running while the instance is being deleted. The instance uuid is used as the key in the cache and the pid is the value. Conflicts: nova/tests/unit/virt/libvirt/test_driver.py nova/tests/unit/virt/libvirt/test_utils.py nova/virt/libvirt/driver.py nova/virt/libvirt/utils.py Note: The required unit tests are manually added to the paths below, as the new path for unit tests is not present in the stable/juno release. nova/tests/virt/libvirt/test_driver.py nova/tests/virt/libvirt/test_utils.py SecurityImpact Closes-bug: #1387543 Change-Id: Ie03acc00a7c904aec13c90ae6a53938d08e5e0c9 (cherry picked from commit 7ab75d5b0b
) (cherry picked from commit b5020a047f
)
This commit is contained in:
parent
7bce0d2db1
commit
539693e403
|
@ -23,6 +23,7 @@ import os
|
|||
import random
|
||||
import re
|
||||
import shutil
|
||||
import signal
|
||||
import tempfile
|
||||
import threading
|
||||
import time
|
||||
|
@ -9188,6 +9189,17 @@ Active: 8381604 kB
|
|||
self.mox.ReplayAll()
|
||||
self.assertTrue(conn._is_storage_shared_with('foo', '/path'))
|
||||
|
||||
def test_store_pid_remove_pid(self):
    """A pid registered with the driver's job tracker can be removed again.

    After ``add_job`` the pid must be listed under the instance uuid;
    after ``remove_job`` the uuid must no longer be a key of the
    tracker's ``jobs`` mapping.
    """
    instance_kwargs = self.test_instance
    instance_kwargs['pci_devices'] = objects.PciDeviceList()
    instance = objects.Instance(**instance_kwargs)
    driver = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
    tracker = driver.job_tracker
    # Stand-in for a process object; only its ``pid`` attribute is used.
    fake_process = mock.Mock(pid=3)

    tracker.add_job(instance, fake_process.pid)
    self.assertIn(3, tracker.jobs[instance.uuid])

    tracker.remove_job(instance, fake_process.pid)
    self.assertNotIn(instance.uuid, tracker.jobs)
|
||||
|
||||
@mock.patch('nova.virt.libvirt.driver.LibvirtDriver._lookup_by_name')
|
||||
def test_get_domain_info_with_more_return(self, lookup_mock):
|
||||
instance = fake_instance.fake_instance_obj(mock.sentinel.ctx)
|
||||
|
@ -11592,12 +11604,18 @@ class LibvirtDriverTestCase(test.TestCase):
|
|||
def fake_execute(*args, **kwargs):
|
||||
pass
|
||||
|
||||
def fake_copy_image(src, dest, host=None, receive=False,
|
||||
on_execute=None, on_completion=None):
|
||||
self.assertIsNotNone(on_execute)
|
||||
self.assertIsNotNone(on_completion)
|
||||
|
||||
self.stubs.Set(self.libvirtconnection, 'get_instance_disk_info',
|
||||
fake_get_instance_disk_info)
|
||||
self.stubs.Set(self.libvirtconnection, '_destroy', fake_destroy)
|
||||
self.stubs.Set(self.libvirtconnection, 'get_host_ip_addr',
|
||||
fake_get_host_ip_addr)
|
||||
self.stubs.Set(utils, 'execute', fake_execute)
|
||||
self.stubs.Set(libvirt_utils, 'copy_image', fake_copy_image)
|
||||
|
||||
ins_ref = self._create_instance()
|
||||
flavor = {'root_gb': 10, 'ephemeral_gb': 20}
|
||||
|
@ -12531,6 +12549,29 @@ class LibvirtDriverTestCase(test.TestCase):
|
|||
shutil.assert_called_with('/path_del')
|
||||
self.assertTrue(result)
|
||||
|
||||
@mock.patch('shutil.rmtree')
@mock.patch('nova.utils.execute')
@mock.patch('os.path.exists')
@mock.patch('os.kill')
@mock.patch('nova.virt.libvirt.utils.get_instance_path')
def test_delete_instance_files_kill_running(
        self, get_instance_path, kill, exists, exe, shutil):
    """Deleting instance files kills the jobs tracked for the instance.

    Two pids are pre-registered in the job tracker. The test expects
    each of them to receive SIGKILL followed by a signal-0 probe, the
    instance directory to be renamed and removed, and the tracker
    entry for the instance to be gone afterwards.
    """
    instance = objects.Instance(uuid='fake-uuid', id=1)
    driver = self.libvirtconnection
    get_instance_path.return_value = '/path'
    # Successive answers handed out by the mocked os.path.exists.
    exists.side_effect = [False, False, True, False]
    driver.job_tracker.jobs[instance.uuid] = [3, 4]

    result = driver.delete_instance_files(instance)

    # Each tracked pid is killed and then probed, in registration order.
    expected_kills = []
    for pid in (3, 4):
        expected_kills.append(mock.call(pid, signal.SIGKILL))
        expected_kills.append(mock.call(pid, 0))

    get_instance_path.assert_called_with(instance)
    exe.assert_called_with('mv', '/path', '/path_del')
    kill.assert_has_calls(expected_kills)
    shutil.assert_called_with('/path_del')
    self.assertTrue(result)
    self.assertNotIn(instance.uuid, driver.job_tracker.jobs)
|
||||
|
||||
@mock.patch('shutil.rmtree')
|
||||
@mock.patch('nova.utils.execute')
|
||||
@mock.patch('os.path.exists')
|
||||
|
|
|
@ -53,7 +53,8 @@ blah BLAH: bb
|
|||
mock_execute.assert_called_once_with('cp', 'src', 'dest')
|
||||
|
||||
_rsync_call = functools.partial(mock.call,
|
||||
'rsync', '--sparse', '--compress')
|
||||
'rsync', '--sparse', '--compress',
|
||||
on_execute=None, on_completion=None)
|
||||
|
||||
@mock.patch('nova.utils.execute')
|
||||
def test_copy_image_rsync(self, mock_execute):
|
||||
|
@ -76,7 +77,8 @@ blah BLAH: bb
|
|||
|
||||
mock_execute.assert_has_calls([
|
||||
self._rsync_call('--dry-run', 'src', 'host:dest'),
|
||||
mock.call('scp', 'src', 'host:dest'),
|
||||
mock.call('scp', 'src', 'host:dest',
|
||||
on_execute=None, on_completion=None),
|
||||
])
|
||||
self.assertEqual(2, mock_execute.call_count)
|
||||
|
||||
|
|
|
@ -100,6 +100,7 @@ from nova.virt.libvirt import dmcrypt
|
|||
from nova.virt.libvirt import firewall as libvirt_firewall
|
||||
from nova.virt.libvirt import imagebackend
|
||||
from nova.virt.libvirt import imagecache
|
||||
from nova.virt.libvirt import instancejobtracker
|
||||
from nova.virt.libvirt import lvm
|
||||
from nova.virt.libvirt import rbd_utils
|
||||
from nova.virt.libvirt import utils as libvirt_utils
|
||||
|
@ -459,6 +460,8 @@ class LibvirtDriver(driver.ComputeDriver):
|
|||
'expect': ', '.join("'%s'" % k for k in
|
||||
sysinfo_serial_funcs.keys())})
|
||||
|
||||
self.job_tracker = instancejobtracker.InstanceJobTracker()
|
||||
|
||||
@property
|
||||
def disk_cachemode(self):
|
||||
if self._disk_cachemode is None:
|
||||
|
@ -5967,6 +5970,11 @@ class LibvirtDriver(driver.ComputeDriver):
|
|||
# finish_migration/_create_image to re-create it for us.
|
||||
continue
|
||||
|
||||
on_execute = lambda process: self.job_tracker.add_job(
|
||||
instance, process.pid)
|
||||
on_completion = lambda process: self.job_tracker.remove_job(
|
||||
instance, process.pid)
|
||||
|
||||
if info['type'] == 'qcow2' and info['backing_file']:
|
||||
tmp_path = from_path + "_rbase"
|
||||
# merge backing file
|
||||
|
@ -5976,11 +5984,15 @@ class LibvirtDriver(driver.ComputeDriver):
|
|||
if shared_storage:
|
||||
utils.execute('mv', tmp_path, img_path)
|
||||
else:
|
||||
libvirt_utils.copy_image(tmp_path, img_path, host=dest)
|
||||
libvirt_utils.copy_image(tmp_path, img_path, host=dest,
|
||||
on_execute=on_execute,
|
||||
on_completion=on_completion)
|
||||
utils.execute('rm', '-f', tmp_path)
|
||||
|
||||
else: # raw or qcow2 with no backing file
|
||||
libvirt_utils.copy_image(from_path, img_path, host=dest)
|
||||
libvirt_utils.copy_image(from_path, img_path, host=dest,
|
||||
on_execute=on_execute,
|
||||
on_completion=on_completion)
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
self._cleanup_remote_migration(dest, inst_base,
|
||||
|
@ -6349,6 +6361,8 @@ class LibvirtDriver(driver.ComputeDriver):
|
|||
# invocation failed due to the absence of both target and
|
||||
# target_resize.
|
||||
if not remaining_path and os.path.exists(target_del):
|
||||
self.job_tracker.terminate_jobs(instance)
|
||||
|
||||
LOG.info(_LI('Deleting instance files %s'), target_del,
|
||||
instance=instance)
|
||||
remaining_path = target_del
|
||||
|
|
|
@ -0,0 +1,97 @@
|
|||
# Copyright 2015 NTT corp.
|
||||
# All Rights Reserved.
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
import collections
|
||||
import errno
|
||||
import os
|
||||
import signal
|
||||
|
||||
from nova.i18n import _LE
|
||||
from nova.i18n import _LW
|
||||
from nova.openstack.common import log as logging
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class InstanceJobTracker(object):
    """Keeps track of process ids started on behalf of instances.

    ``jobs`` maps an instance uuid to the list of pids currently
    registered for that instance. Entries are removed as soon as their
    pid list becomes empty.
    """

    def __init__(self):
        """Create a tracker with no registered jobs."""
        self.jobs = collections.defaultdict(list)

    def add_job(self, instance, pid):
        """Appends process_id of instance to cache.

        This method will store the pid of a process in cache as
        a key: value pair which will be used to kill the process if it
        is running while deleting the instance. Instance uuid is used as
        a key in the cache and pid will be the value.

        :param instance: Object of instance
        :param pid: Id of the process
        """
        self.jobs[instance.uuid].append(pid)

    def remove_job(self, instance, pid):
        """Removes pid of process from cache.

        This method will remove the pid of a process from the cache.
        Unknown instances or pids are ignored. The instance uuid is
        dropped from the cache once no pids remain for it.

        :param instance: Object of instance
        :param pid: Id of the process
        """
        uuid = instance.uuid
        # Use .get() so that looking up an untracked instance does not
        # make the defaultdict create (and then discard) an empty entry.
        pids = self.jobs.get(uuid)
        if pids is None:
            return

        if pid in pids:
            pids.remove(pid)

        # remove instance.uuid if no pid's remaining
        if not pids:
            self.jobs.pop(uuid, None)

    def terminate_jobs(self, instance):
        """Kills the running processes for given instance.

        This method is used to kill all running processes of the instance if
        it is deleted in between. Every tracked pid is sent SIGKILL and is
        then probed with signal 0; failures other than "no such process"
        are logged. The pid is removed from the cache in every case.

        :param instance: Object of instance
        """
        # Iterate over a copy: remove_job() mutates the tracked list.
        pids_to_remove = list(self.jobs.get(instance.uuid, []))
        for pid in pids_to_remove:
            try:
                # Try to kill the process
                os.kill(pid, signal.SIGKILL)
            except OSError as exc:
                # ESRCH means the process is already gone, which is fine.
                if exc.errno != errno.ESRCH:
                    LOG.error(_LE('Failed to kill process %(pid)s '
                                  'due to %(reason)s, while deleting the '
                                  'instance.'), {'pid': pid, 'reason': exc},
                              instance=instance)

            try:
                # Check if the process is still alive.
                os.kill(pid, 0)
            except OSError as exc:
                if exc.errno != errno.ESRCH:
                    # Include the reason so the failure can be diagnosed.
                    LOG.error(_LE('Unexpected error while checking process '
                                  '%(pid)s: %(reason)s'),
                              {'pid': pid, 'reason': exc},
                              instance=instance)
            else:
                # The process is still around
                LOG.warning(_LW("Failed to kill a long running process "
                                "%(pid)s related to the instance when "
                                "deleting it."), {'pid': pid},
                            instance=instance)

            self.remove_job(instance, pid)
|
|
@ -259,12 +259,15 @@ def get_disk_backing_file(path, basename=True):
|
|||
return backing_file
|
||||
|
||||
|
||||
def copy_image(src, dest, host=None):
|
||||
def copy_image(src, dest, host=None, on_execute=None,
|
||||
on_completion=None):
|
||||
"""Copy a disk image to an existing directory
|
||||
|
||||
:param src: Source image
|
||||
:param dest: Destination path
|
||||
:param host: Remote host
|
||||
:param on_execute: Callback method to store pid of process in cache
|
||||
:param on_completion: Callback method to remove pid of process from cache
|
||||
"""
|
||||
|
||||
if not host:
|
||||
|
@ -283,11 +286,14 @@ def copy_image(src, dest, host=None):
|
|||
# Do a relatively light weight test first, so that we
|
||||
# can fall back to scp, without having run out of space
|
||||
# on the destination for example.
|
||||
execute('rsync', '--sparse', '--compress', '--dry-run', src, dest)
|
||||
execute('rsync', '--sparse', '--compress', '--dry-run', src, dest,
|
||||
on_execute=on_execute, on_completion=on_completion)
|
||||
except processutils.ProcessExecutionError:
|
||||
execute('scp', src, dest)
|
||||
execute('scp', src, dest, on_execute=on_execute,
|
||||
on_completion=on_completion)
|
||||
else:
|
||||
execute('rsync', '--sparse', '--compress', src, dest)
|
||||
execute('rsync', '--sparse', '--compress', src, dest,
|
||||
on_execute=on_execute, on_completion=on_completion)
|
||||
|
||||
|
||||
def write_to_file(path, contents, umask=None):
|
||||
|
|
Loading…
Reference in New Issue