libvirt: Kill rsync/scp processes before deleting instance

In the resize operation, during copying files from source to
destination compute node scp/rsync processes are not aborted after
the instance is deleted because linux kernel doesn't delete instance
files physically until all processes using the file handle is closed
completely. Hence rsync/scp process keeps on running until it
transfers 100% of file data.

Added new module instancejobtracker to libvirt driver which will add,
remove or terminate the processes running against particular instances.
Added callback methods to execute call which will store the pid of
scp/rsync process in cache as a key: value pair and to remove the
pid from the cache after process completion. Process id will be used to
kill the process if it is running while deleting the instance. Instance
uuid is used as a key in the cache and pid will be the value.

Conflicts:
        nova/virt/libvirt/driver.py

SecurityImpact

Closes-bug: #1387543
Change-Id: Ie03acc00a7c904aec13c90ae6a53938d08e5e0c9
(cherry picked from commit 7ab75d5b0b)
This commit is contained in:
abhishekkekane 2015-07-06 01:51:26 -07:00
parent 90e1eacee8
commit b5020a047f
5 changed files with 168 additions and 9 deletions

View File

@ -23,6 +23,7 @@ import os
import random
import re
import shutil
import signal
import threading
import time
import uuid
@ -9817,6 +9818,15 @@ Active: 8381604 kB
self.mox.ReplayAll()
self.assertTrue(drvr._is_storage_shared_with('foo', '/path'))
def test_store_pid_remove_pid(self):
instance = objects.Instance(**self.test_instance)
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
popen = mock.Mock(pid=3)
drvr.job_tracker.add_job(instance, popen.pid)
self.assertIn(3, drvr.job_tracker.jobs[instance.uuid])
drvr.job_tracker.remove_job(instance, popen.pid)
self.assertNotIn(instance.uuid, drvr.job_tracker.jobs)
@mock.patch('nova.virt.libvirt.host.Host.get_domain')
def test_get_domain_info_with_more_return(self, mock_get_domain):
instance = objects.Instance(**self.test_instance)
@ -11316,12 +11326,18 @@ class LibvirtDriverTestCase(test.NoDBTestCase):
def fake_execute(*args, **kwargs):
pass
def fake_copy_image(src, dest, host=None, receive=False,
on_execute=None, on_completion=None):
self.assertIsNotNone(on_execute)
self.assertIsNotNone(on_completion)
self.stubs.Set(self.drvr, 'get_instance_disk_info',
fake_get_instance_disk_info)
self.stubs.Set(self.drvr, '_destroy', fake_destroy)
self.stubs.Set(self.drvr, 'get_host_ip_addr',
fake_get_host_ip_addr)
self.stubs.Set(utils, 'execute', fake_execute)
self.stubs.Set(libvirt_utils, 'copy_image', fake_copy_image)
ins_ref = self._create_instance(params=params_for_instance)
@ -12425,6 +12441,28 @@ class LibvirtDriverTestCase(test.NoDBTestCase):
shutil.assert_called_with('/path_del')
self.assertTrue(result)
@mock.patch('shutil.rmtree')
@mock.patch('nova.utils.execute')
@mock.patch('os.path.exists')
@mock.patch('os.kill')
@mock.patch('nova.virt.libvirt.utils.get_instance_path')
def test_delete_instance_files_kill_running(
self, get_instance_path, kill, exists, exe, shutil):
get_instance_path.return_value = '/path'
instance = objects.Instance(uuid='fake-uuid', id=1)
self.drvr.job_tracker.jobs[instance.uuid] = [3, 4]
exists.side_effect = [False, False, True, False]
result = self.drvr.delete_instance_files(instance)
get_instance_path.assert_called_with(instance)
exe.assert_called_with('mv', '/path', '/path_del')
kill.assert_has_calls([mock.call(3, signal.SIGKILL), mock.call(3, 0),
mock.call(4, signal.SIGKILL), mock.call(4, 0)])
shutil.assert_called_with('/path_del')
self.assertTrue(result)
self.assertNotIn(instance.uuid, self.drvr.job_tracker.jobs)
@mock.patch('shutil.rmtree')
@mock.patch('nova.utils.execute')
@mock.patch('os.path.exists')

View File

@ -62,7 +62,8 @@ blah BLAH: bb
mock_execute.assert_called_once_with('cp', 'src', 'dest')
_rsync_call = functools.partial(mock.call,
'rsync', '--sparse', '--compress')
'rsync', '--sparse', '--compress',
on_execute=None, on_completion=None)
@mock.patch('nova.utils.execute')
def test_copy_image_rsync(self, mock_execute):
@ -85,7 +86,8 @@ blah BLAH: bb
mock_execute.assert_has_calls([
self._rsync_call('--dry-run', 'src', 'host:dest'),
mock.call('scp', 'src', 'host:dest'),
mock.call('scp', 'src', 'host:dest',
on_execute=None, on_completion=None),
])
self.assertEqual(2, mock_execute.call_count)
@ -110,7 +112,8 @@ blah BLAH: bb
mock_execute.assert_has_calls([
self._rsync_call('--dry-run', 'src', '[2600::]:dest'),
mock.call('scp', 'src', '[2600::]:dest'),
mock.call('scp', 'src', '[2600::]:dest',
on_execute=None, on_completion=None),
])
self.assertEqual(2, mock_execute.call_count)

View File

@ -95,6 +95,7 @@ from nova.virt.libvirt import firewall as libvirt_firewall
from nova.virt.libvirt import host
from nova.virt.libvirt import imagebackend
from nova.virt.libvirt import imagecache
from nova.virt.libvirt import instancejobtracker
from nova.virt.libvirt import lvm
from nova.virt.libvirt import rbd_utils
from nova.virt.libvirt import utils as libvirt_utils
@ -465,6 +466,8 @@ class LibvirtDriver(driver.ComputeDriver):
'expect': ', '.join("'%s'" % k for k in
sysinfo_serial_funcs.keys())})
self.job_tracker = instancejobtracker.InstanceJobTracker()
def _get_volume_drivers(self):
return libvirt_volume_drivers
@ -6301,6 +6304,11 @@ class LibvirtDriver(driver.ComputeDriver):
# finish_migration/_create_image to re-create it for us.
continue
on_execute = lambda process: self.job_tracker.add_job(
instance, process.pid)
on_completion = lambda process: self.job_tracker.remove_job(
instance, process.pid)
if info['type'] == 'qcow2' and info['backing_file']:
tmp_path = from_path + "_rbase"
# merge backing file
@ -6310,11 +6318,15 @@ class LibvirtDriver(driver.ComputeDriver):
if shared_storage:
utils.execute('mv', tmp_path, img_path)
else:
libvirt_utils.copy_image(tmp_path, img_path, host=dest)
libvirt_utils.copy_image(tmp_path, img_path, host=dest,
on_execute=on_execute,
on_completion=on_completion)
utils.execute('rm', '-f', tmp_path)
else: # raw or qcow2 with no backing file
libvirt_utils.copy_image(from_path, img_path, host=dest)
libvirt_utils.copy_image(from_path, img_path, host=dest,
on_execute=on_execute,
on_completion=on_completion)
except Exception:
with excutils.save_and_reraise_exception():
self._cleanup_remote_migration(dest, inst_base,
@ -6683,6 +6695,8 @@ class LibvirtDriver(driver.ComputeDriver):
# invocation failed due to the absence of both target and
# target_resize.
if not remaining_path and os.path.exists(target_del):
self.job_tracker.terminate_jobs(instance)
LOG.info(_LI('Deleting instance files %s'), target_del,
instance=instance)
remaining_path = target_del

View File

@ -0,0 +1,98 @@
# Copyright 2015 NTT corp.
# All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import errno
import os
import signal
from oslo_log import log as logging
from nova.i18n import _LE
from nova.i18n import _LW
LOG = logging.getLogger(__name__)
class InstanceJobTracker(object):
def __init__(self):
self.jobs = collections.defaultdict(list)
def add_job(self, instance, pid):
"""Appends process_id of instance to cache.
This method will store the pid of a process in cache as
a key: value pair which will be used to kill the process if it
is running while deleting the instance. Instance uuid is used as
a key in the cache and pid will be the value.
:param instance: Object of instance
:param pid: Id of the process
"""
self.jobs[instance.uuid].append(pid)
def remove_job(self, instance, pid):
"""Removes pid of process from cache.
This method will remove the pid of a process from the cache.
:param instance: Object of instance
:param pid: Id of the process
"""
uuid = instance.uuid
if uuid in self.jobs and pid in self.jobs[uuid]:
self.jobs[uuid].remove(pid)
# remove instance.uuid if no pid's remaining
if not self.jobs[uuid]:
self.jobs.pop(uuid, None)
def terminate_jobs(self, instance):
"""Kills the running processes for given instance.
This method is used to kill all running processes of the instance if
it is deleted in between.
:param instance: Object of instance
"""
pids_to_remove = list(self.jobs.get(instance.uuid, []))
for pid in pids_to_remove:
try:
# Try to kill the process
os.kill(pid, signal.SIGKILL)
except OSError as exc:
if exc.errno != errno.ESRCH:
LOG.error(_LE('Failed to kill process %(pid)s '
'due to %(reason)s, while deleting the '
'instance.'), {'pid': pid, 'reason': exc},
instance=instance)
try:
# Check if the process is still alive.
os.kill(pid, 0)
except OSError as exc:
if exc.errno != errno.ESRCH:
LOG.error(_LE('Unexpected error while checking process '
'%(pid)s.'), {'pid': pid},
instance=instance)
else:
# The process is still around
LOG.warn(_LW("Failed to kill a long running process "
"%(pid)s related to the instance when "
"deleting it."), {'pid': pid},
instance=instance)
self.remove_job(instance, pid)

View File

@ -294,13 +294,16 @@ def get_disk_backing_file(path, basename=True):
return backing_file
def copy_image(src, dest, host=None, receive=False):
def copy_image(src, dest, host=None, receive=False,
on_execute=None, on_completion=None):
"""Copy a disk image to an existing directory
:param src: Source image
:param dest: Destination path
:param host: Remote host
:param receive: Reverse the rsync direction
:param on_execute: Callback method to store pid of process in cache
:param on_completion: Callback method to remove pid of process from cache
"""
if not host:
@ -322,11 +325,14 @@ def copy_image(src, dest, host=None, receive=False):
# Do a relatively light weight test first, so that we
# can fall back to scp, without having run out of space
# on the destination for example.
execute('rsync', '--sparse', '--compress', '--dry-run', src, dest)
execute('rsync', '--sparse', '--compress', '--dry-run', src, dest,
on_execute=on_execute, on_completion=on_completion)
except processutils.ProcessExecutionError:
execute('scp', src, dest)
execute('scp', src, dest, on_execute=on_execute,
on_completion=on_completion)
else:
execute('rsync', '--sparse', '--compress', src, dest)
execute('rsync', '--sparse', '--compress', src, dest,
on_execute=on_execute, on_completion=on_completion)
def write_to_file(path, contents, umask=None):