Add processutils module

This change adds a processutils module, which includes several
process/job helpers.

Associating a job with a process will allow ensuring that when a
parent process is killed, processes associated with jobs (e.g. child
processes) will be killed as well.

This is especially useful for Cinder, which will use it for the
multiple backend per service scenario.

Job objects may have resource limits. We may use jobs in order to
implement resource limit support within oslo_concurrency (currently
not available on Windows).

Related-Bug: #1734334

Change-Id: Icc2bbe9191e6db685c0fd294abc1d0eb24bc420c
This commit is contained in:
Lucian Petrut 2017-11-22 14:28:02 +02:00
parent 0e58eac4a6
commit f913051b00
10 changed files with 490 additions and 2 deletions

View File

@ -199,6 +199,10 @@ class DNSException(OSWinException):
pass
class Timeout(OSWinException):
msg_fmt = _("Timed out waiting for the specified resource.")
class DNSZoneNotFound(NotFound, DNSException):
msg_fmt = _("DNS Zone not found: %(zone_name)s")

View File

@ -0,0 +1,195 @@
# Copyright 2017 Cloudbase Solutions Srl
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ddt
import mock
from os_win.tests.unit import test_base
from os_win.utils import processutils
from os_win.utils.winapi import constants as w_const
@ddt.ddt
class ProcessUtilsTestCase(test_base.OsWinBaseTestCase):
def setUp(self):
super(ProcessUtilsTestCase, self).setUp()
self._setup_lib_mocks()
self._procutils = processutils.ProcessUtils()
self._procutils._win32_utils = mock.Mock()
self._win32_utils = self._procutils._win32_utils
self._mock_run = self._win32_utils.run_and_check_output
self.addCleanup(mock.patch.stopall)
def _setup_lib_mocks(self):
self._ctypes = mock.Mock()
# This is used in order to easily make assertions on the variables
# passed by reference.
self._ctypes.byref = lambda x: (x, "byref")
self._ctypes.c_wchar_p = lambda x: (x, 'c_wchar_p')
self._ctypes.sizeof = lambda x: (x, 'sizeof')
self._ctypes_patcher = mock.patch.multiple(
processutils, ctypes=self._ctypes)
self._ctypes_patcher.start()
self._mock_kernel32 = mock.Mock()
mock.patch.multiple(processutils,
kernel32=self._mock_kernel32).start()
def test_create_job_object(self):
job_handle = self._procutils.create_job_object(mock.sentinel.name)
self._mock_run.assert_called_once_with(
self._mock_kernel32.CreateJobObjectW,
None,
self._ctypes.c_wchar_p(mock.sentinel.name),
error_ret_vals=[None],
kernel32_lib_func=True)
self.assertEqual(self._mock_run.return_value, job_handle)
def test_set_information_job_object(self):
self._procutils.set_information_job_object(
mock.sentinel.job_handle,
mock.sentinel.job_info_class,
mock.sentinel.job_info)
self._mock_run.assert_called_once_with(
self._mock_kernel32.SetInformationJobObject,
mock.sentinel.job_handle,
mock.sentinel.job_info_class,
self._ctypes.byref(mock.sentinel.job_info),
self._ctypes.sizeof(mock.sentinel.job_info),
kernel32_lib_func=True)
def test_assign_process_to_job_object(self):
self._procutils.assign_process_to_job_object(
mock.sentinel.job_handle,
mock.sentinel.process_handle)
self._mock_run.assert_called_once_with(
self._mock_kernel32.AssignProcessToJobObject,
mock.sentinel.job_handle,
mock.sentinel.process_handle,
kernel32_lib_func=True)
def test_open_process(self):
process_handle = self._procutils.open_process(
mock.sentinel.pid,
mock.sentinel.desired_access,
mock.sentinel.inherit_handle)
self._mock_run.assert_called_once_with(
self._mock_kernel32.OpenProcess,
mock.sentinel.desired_access,
mock.sentinel.inherit_handle,
mock.sentinel.pid,
error_ret_vals=[None],
kernel32_lib_func=True)
self.assertEqual(self._mock_run.return_value, process_handle)
@ddt.data({},
{'assign_job_exc': Exception})
@ddt.unpack
@mock.patch.object(processutils.ProcessUtils, 'open_process')
@mock.patch.object(processutils.ProcessUtils, 'create_job_object')
@mock.patch.object(processutils.ProcessUtils,
'set_information_job_object')
@mock.patch.object(processutils.ProcessUtils,
'assign_process_to_job_object')
@mock.patch.object(processutils.kernel32_struct,
'JOBOBJECT_EXTENDED_LIMIT_INFORMATION')
def test_kill_process_on_job_close(self, mock_job_limit_struct,
mock_assign_job,
mock_set_job_info,
mock_create_job,
mock_open_process,
assign_job_exc=None):
mock_assign_job.side_effect = assign_job_exc
mock_open_process.return_value = mock.sentinel.process_handle
mock_create_job.return_value = mock.sentinel.job_handle
if assign_job_exc:
self.assertRaises(assign_job_exc,
self._procutils.kill_process_on_job_close,
mock.sentinel.pid)
else:
self._procutils.kill_process_on_job_close(mock.sentinel.pid)
mock_open_process.assert_called_once_with(
mock.sentinel.pid,
w_const.PROCESS_SET_QUOTA | w_const.PROCESS_TERMINATE)
mock_create_job.assert_called_once_with()
mock_job_limit_struct.assert_called_once_with()
mock_job_limit = mock_job_limit_struct.return_value
self.assertEqual(w_const.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE,
mock_job_limit.BasicLimitInformation.LimitFlags)
mock_set_job_info.assert_called_once_with(
mock.sentinel.job_handle,
w_const.JobObjectExtendedLimitInformation,
mock_job_limit)
mock_assign_job.assert_called_once_with(
mock.sentinel.job_handle,
mock.sentinel.process_handle)
exp_closed_handles = [mock.sentinel.process_handle]
if assign_job_exc:
exp_closed_handles.append(mock.sentinel.job_handle)
self._win32_utils.close_handle.assert_has_calls(
[mock.call(handle) for handle in exp_closed_handles])
@ddt.data({},
{'wait_exc': Exception})
@ddt.unpack
@mock.patch.object(processutils.ProcessUtils, 'open_process')
def test_wait_for_multiple_processes(self, mock_open_process,
wait_exc=None):
pids = [mock.sentinel.pid0, mock.sentinel.pid1]
phandles = [mock.sentinel.process_handle_0,
mock.sentinel.process_handle_1]
mock_wait = self._win32_utils.wait_for_multiple_objects
mock_wait.side_effect = wait_exc
mock_open_process.side_effect = phandles
if wait_exc:
self.assertRaises(wait_exc,
self._procutils.wait_for_multiple_processes,
pids,
mock.sentinel.wait_all,
mock.sentinel.milliseconds)
else:
self._procutils.wait_for_multiple_processes(
pids,
mock.sentinel.wait_all,
mock.sentinel.milliseconds)
mock_open_process.assert_has_calls(
[mock.call(pid,
desired_access=w_const.SYNCHRONIZE)
for pid in pids])
self._win32_utils.close_handle.assert_has_calls(
[mock.call(handle) for handle in phandles])
mock_wait.assert_called_once_with(phandles,
mock.sentinel.wait_all,
mock.sentinel.milliseconds)

View File

@ -31,6 +31,7 @@ from os_win.utils.dns import dnsutils
from os_win.utils import hostutils
from os_win.utils.network import networkutils
from os_win.utils import pathutils
from os_win.utils import processutils
from os_win.utils.storage import diskutils
from os_win.utils.storage.initiator import iscsi_utils
from os_win.utils.storage import smbutils
@ -135,3 +136,8 @@ class TestHyperVUtilsFactory(test_base.OsWinBaseTestCase):
self._check_get_class(
expected_class=migrationutils.MigrationUtils,
class_type='migrationutils')
def test_get_processutils(self):
self._check_get_class(
expected_class=processutils.ProcessUtils,
class_type='processutils')

View File

@ -22,6 +22,7 @@ from os_win import _utils
from os_win import exceptions
from os_win.utils import win32utils
from os_win.utils.winapi import constants as w_const
from os_win.utils.winapi import wintypes
@ddt.ddt
@ -46,7 +47,6 @@ class Win32UtilsTestCase(base.BaseTestCase):
mock.patch.multiple(win32utils,
kernel32=mock.DEFAULT,
wintypes=mock.DEFAULT,
create=True).start()
@mock.patch.object(win32utils.Win32Utils, 'get_error_message')
@ -208,3 +208,36 @@ class Win32UtilsTestCase(base.BaseTestCase):
mock_localfree.assert_any_call(mock.sentinel.handle)
self.assertEqual(bool(ret_val), mock_log_exc.called)
@mock.patch.object(win32utils.Win32Utils, 'run_and_check_output')
def test_wait_for_multiple_objects(self, mock_helper):
fake_handles = [10, 11]
ret_val = self._win32_utils.wait_for_multiple_objects(
fake_handles, mock.sentinel.wait_all, mock.sentinel.milliseconds)
mock_helper.assert_called_once_with(
win32utils.kernel32.WaitForMultipleObjects,
len(fake_handles),
mock.ANY,
mock.sentinel.wait_all,
mock.sentinel.milliseconds,
kernel32_lib_func=True,
error_ret_vals=[w_const.WAIT_FAILED])
self.assertEqual(mock_helper.return_value, ret_val)
handles_arg = mock_helper.call_args_list[0][0][2]
self.assertIsInstance(handles_arg,
wintypes.HANDLE * len(fake_handles))
self.assertEqual(fake_handles, handles_arg[:])
@mock.patch.object(win32utils.Win32Utils, 'run_and_check_output')
def test_wait_for_multiple_objects_timeout(self, mock_helper):
fake_handles = [10]
mock_helper.return_value = w_const.ERROR_WAIT_TIMEOUT
self.assertRaises(
exceptions.Timeout,
self._win32_utils.wait_for_multiple_objects,
fake_handles, mock.sentinel.wait_all,
mock.sentinel.milliseconds)

View File

@ -0,0 +1,130 @@
# Copyright 2017 Cloudbase Solutions Srl
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ctypes
from oslo_log import log as logging
from os_win.utils import win32utils
from os_win.utils.winapi import constants as w_const
from os_win.utils.winapi import libs as w_lib
from os_win.utils.winapi.libs import kernel32 as kernel32_struct
kernel32 = w_lib.get_shared_lib_handle(w_lib.KERNEL32)
LOG = logging.getLogger(__name__)
class ProcessUtils(object):
def __init__(self):
self._win32_utils = win32utils.Win32Utils()
def _run_and_check_output(self, *args, **kwargs):
kwargs.update(kernel32_lib_func=True)
return self._win32_utils.run_and_check_output(*args, **kwargs)
def create_job_object(self, name=None):
"""Create or open a job object.
:param name: (Optional) the job name.
:returns: a handle of the created job.
"""
pname = None if name is None else ctypes.c_wchar_p(name)
return self._run_and_check_output(kernel32.CreateJobObjectW,
None, # job security attributes
pname,
error_ret_vals=[None])
def set_information_job_object(self, job_handle, job_object_info_class,
job_object_info):
self._run_and_check_output(kernel32.SetInformationJobObject,
job_handle,
job_object_info_class,
ctypes.byref(job_object_info),
ctypes.sizeof(job_object_info))
def assign_process_to_job_object(self, job_handle, process_handle):
self._run_and_check_output(kernel32.AssignProcessToJobObject,
job_handle, process_handle)
def open_process(self, pid, desired_access, inherit_handle=False):
"""Open an existing process."""
return self._run_and_check_output(kernel32.OpenProcess,
desired_access,
inherit_handle,
pid,
error_ret_vals=[None])
def kill_process_on_job_close(self, pid):
"""Associates a new job to the specified process.
The process is immediately killed when the last job handle is closed.
This mechanism can be useful when ensuring that child processes get
killed along with a parent process.
This method does not check if the specified process is already part of
a job. Starting with WS 2012, nested jobs are available.
:returns: the job handle, if a job was successfully created and
associated with the process, otherwise "None".
"""
process_handle = None
job_handle = None
job_associated = False
try:
desired_process_access = (w_const.PROCESS_SET_QUOTA |
w_const.PROCESS_TERMINATE)
process_handle = self.open_process(pid, desired_process_access)
job_handle = self.create_job_object()
job_info = kernel32_struct.JOBOBJECT_EXTENDED_LIMIT_INFORMATION()
job_info.BasicLimitInformation.LimitFlags = (
w_const.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE)
job_info_class = w_const.JobObjectExtendedLimitInformation
self.set_information_job_object(job_handle,
job_info_class,
job_info)
self.assign_process_to_job_object(job_handle, process_handle)
job_associated = True
finally:
if process_handle:
self._win32_utils.close_handle(process_handle)
if not job_associated and job_handle:
# We have an unassociated job object. Closing the handle
# will also destroy the job object.
self._win32_utils.close_handle(job_handle)
return job_handle
def wait_for_multiple_processes(self, pids, wait_all=True,
milliseconds=w_const.INFINITE):
handles = []
try:
for pid in pids:
handle = self.open_process(pid,
desired_access=w_const.SYNCHRONIZE)
handles.append(handle)
return self._win32_utils.wait_for_multiple_objects(
handles, wait_all, milliseconds)
finally:
for handle in handles:
self._win32_utils.close_handle(handle)

View File

@ -22,6 +22,7 @@ from os_win import _utils
from os_win import exceptions
from os_win.utils.winapi import constants as w_const
from os_win.utils.winapi import libs as w_lib
from os_win.utils.winapi import wintypes
kernel32 = w_lib.get_shared_lib_handle(w_lib.KERNEL32)
@ -128,3 +129,19 @@ class Win32Utils(object):
def close_handle(self, handle):
kernel32.CloseHandle(handle)
def wait_for_multiple_objects(self, handles, wait_all=True,
milliseconds=w_const.INFINITE):
handle_array = (wintypes.HANDLE * len(handles))(*handles)
ret_val = self.run_and_check_output(
kernel32.WaitForMultipleObjects,
len(handles),
handle_array,
wait_all,
milliseconds,
kernel32_lib_func=True,
error_ret_vals=[w_const.WAIT_FAILED])
if ret_val == w_const.ERROR_WAIT_TIMEOUT:
raise exceptions.Timeout()
return ret_val

View File

@ -39,6 +39,11 @@ FORMAT_MESSAGE_FROM_SYSTEM = 0x00001000
FORMAT_MESSAGE_ALLOCATE_BUFFER = 0x00000100
FORMAT_MESSAGE_IGNORE_INSERTS = 0x00000200
JobObjectBasicLimitInformation = 2
JobObjectExtendedLimitInformation = 9
INFINITE = 0xFFFFFFFF # Infinite timeout
# FileAPI.h
OPEN_EXISTING = 3
@ -57,6 +62,20 @@ GROUP_SECURITY_INFORMATION = 0x00000002
DACL_SECURITY_INFORMATION = 0x00000004
SACL_SECURITY_INFORMATION = 0x00000008
# If the following flag is set, all processes associated with
# the job are terminated when the last job handle is closed.
JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE = 0x00002000
# The following flags specify access rights that may be
# requested when opening proccesses.
#
# Allows setting process limits.
PROCESS_SET_QUOTA = 0x0100
# Allows terminating a process.
PROCESS_TERMINATE = 0x0001
# Allows waiting for a process.
SYNCHRONIZE = 0x00100000
# winioctl.h
FILE_DEVICE_DISK = 7

View File

@ -20,6 +20,42 @@ from os_win.utils.winapi import wintypes
lib_handle = None
class IO_COUNTERS(ctypes.Structure):
_fields_ = [
('ReadOperationCount', wintypes.ULONGLONG),
('WriteOperationCount', wintypes.ULONGLONG),
('OtherOperationCount', wintypes.ULONGLONG),
('ReadTransferCount', wintypes.ULONGLONG),
('WriteTransferCount', wintypes.ULONGLONG),
('OtherTransferCount', wintypes.ULONGLONG)
]
class JOBOBJECT_BASIC_LIMIT_INFORMATION(ctypes.Structure):
_fields_ = [
('PerProcessUserTimeLimit', wintypes.LARGE_INTEGER),
('PerJobUserTimeLimit', wintypes.LARGE_INTEGER),
('LimitFlags', wintypes.DWORD),
('MinimumWorkingSetSize', ctypes.c_size_t),
('MaximumWorkingSetSize', ctypes.c_size_t),
('ActiveProcessLimit', wintypes.DWORD),
('Affinity', wintypes.PULONG),
('PriorityClass', wintypes.DWORD),
('SchedulingClass', wintypes.DWORD)
]
class JOBOBJECT_EXTENDED_LIMIT_INFORMATION(ctypes.Structure):
_fields_ = [
('BasicLimitInformation', JOBOBJECT_BASIC_LIMIT_INFORMATION),
('IoInfo', IO_COUNTERS),
('ProcessMemoryLimit', ctypes.c_size_t),
('JobMemoryLimit', ctypes.c_size_t),
('PeakProcessMemoryUsed', ctypes.c_size_t),
('PeakJobMemoryUsed', ctypes.c_size_t)
]
def register():
global lib_handle
lib_handle = ctypes.windll.kernel32
@ -139,3 +175,38 @@ def register():
wintypes.LPOVERLAPPED_COMPLETION_ROUTINE
]
lib_handle.WriteFileEx.restype = wintypes.BOOL
lib_handle.CreateJobObjectW.argtypes = [
wintypes.LPCVOID,
wintypes.LPCWSTR
]
lib_handle.CreateJobObjectW.restype = wintypes.HANDLE
lib_handle.SetInformationJobObject.argtypes = [
wintypes.HANDLE,
wintypes.INT,
wintypes.LPVOID,
wintypes.DWORD
]
lib_handle.SetInformationJobObject.restype = wintypes.BOOL
lib_handle.AssignProcessToJobObject.argtypes = [
wintypes.HANDLE,
wintypes.HANDLE
]
lib_handle.AssignProcessToJobObject.restype = wintypes.BOOL
lib_handle.OpenProcess.argtypes = [
wintypes.DWORD,
wintypes.BOOL,
wintypes.DWORD
]
lib_handle.OpenProcess.restype = wintypes.HANDLE
lib_handle.WaitForMultipleObjects.argtypes = [
wintypes.DWORD,
wintypes.LPHANDLE,
wintypes.BOOL,
wintypes.DWORD
]
lib_handle.WaitForMultipleObjects.restype = wintypes.DWORD

View File

@ -132,7 +132,12 @@ utils_map = {
'DNSUtils': {
'min_version': 6.2,
'max_version': None,
'path': 'os_win.utils.dns.dnsutils.DNSUtils'}}
'path': 'os_win.utils.dns.dnsutils.DNSUtils'}},
'processutils': {
'ProcessUtils': {
'min_version': 6.2,
'max_version': None,
'path': 'os_win.utils.processutils.ProcessUtils'}}
}
@ -230,3 +235,7 @@ def get_dnsutils():
def get_migrationutils():
return _get_class(class_type='migrationutils')
def get_processutils():
return _get_class(class_type='processutils')

View File

@ -0,0 +1,4 @@
---
features:
- |
os-win now allows mapping processes to job objects.