Add instance job terminate timeout

This change lets the caller set a timeout for the job terminate
operation. If the timeout is None or 0, this operation will wait
indefinitely for the jobs to be terminated (while attempting to
stop them).

Also, jobs in an error state are now handled properly; previously
they were not considered to be finished.

Change-Id: I47beec8b36cecd9db3960c2deff09e342418046a
(cherry picked from commit b5d60890e2)
This commit is contained in:
Lucian Petrut 2016-07-01 15:52:32 +03:00 committed by Claudiu Belu
parent cb18c29f81
commit 07a68ebd8b
5 changed files with 141 additions and 35 deletions

View File

@ -89,8 +89,26 @@ def get_wrapped_function(function):
return _get_wrapped_function(function)
def retry_decorator(max_retry_count=5, inc_sleep_time=1,
def retry_decorator(max_retry_count=5, timeout=None, inc_sleep_time=1,
max_sleep_time=1, exceptions=(), error_codes=()):
"""Retries invoking the decorated method in case of expected exceptions.
:param max_retry_count: The maximum number of retries performed. If 0, no
retry is performed. If None, there will be no limit
on the number of retries.
:param timeout: The maximum time for which we'll retry invoking the method.
If 0 or None, there will be no time limit.
:param inc_sleep_time: The time sleep increment used between retries.
:param max_sleep_time: The maximum time to wait between retries.
:param exceptions: A list of expected exceptions for which retries will be
performed.
:param error_codes: A list of expected error codes. The error code is
retrieved from the 'error_code' exception attribute,
for example in case of Win32Exception. If this argument
is not passed, retries will be performed for any of the
expected exceptions.
"""
if isinstance(error_codes, six.integer_types):
error_codes = (error_codes, )
@ -98,6 +116,7 @@ def retry_decorator(max_retry_count=5, inc_sleep_time=1,
def inner(*args, **kwargs):
try_count = 0
sleep_time = 0
time_start = time.time()
while True:
try:
@ -107,25 +126,42 @@ def retry_decorator(max_retry_count=5, inc_sleep_time=1,
err_code = getattr(exc, 'error_code', None)
expected_err_code = (err_code in error_codes
or not error_codes)
should_retry = (expected_err_code
and try_count < max_retry_count)
time_elapsed = time.time() - time_start
time_left = (timeout - time_elapsed
if timeout else 'undefined')
tries_left = (max_retry_count - try_count
if max_retry_count is not None
else 'undefined')
should_retry = (
expected_err_code
and tries_left
and (time_left == 'undefined'
or time_left > 0))
ctxt.reraise = not should_retry
if should_retry:
try_count += 1
func_name = reflection.get_callable_name(f)
LOG.debug("Got expected exception %(exc)s while "
"calling function %(func_name)s. "
"Retries left: %(retries_left)d. "
"Retrying in %(sleep_time)s seconds.",
dict(exc=exc,
func_name=func_name,
retries_left=(
max_retry_count - try_count),
sleep_time=sleep_time))
sleep_time = min(sleep_time + inc_sleep_time,
max_sleep_time)
if timeout:
sleep_time = min(sleep_time, time_left)
LOG.debug("Got expected exception %(exc)s while "
"calling function %(func_name)s. "
"Retries left: %(retries_left)s. "
"Time left: %(time_left)s. "
"Time elapsed: %(time_elapsed)s "
"Retrying in %(sleep_time)s seconds.",
dict(exc=exc,
func_name=func_name,
retries_left=tries_left,
time_left=time_left,
time_elapsed=time_elapsed,
sleep_time=sleep_time))
time.sleep(sleep_time)
return inner
return wrapper

View File

@ -102,6 +102,7 @@ VM_GEN_2 = 2
JOB_STATE_COMPLETED = 7
JOB_STATE_TERMINATED = 8
JOB_STATE_KILLED = 9
JOB_STATE_EXCEPTION = 10
JOB_STATE_COMPLETED_WITH_WARNINGS = 32768
# Special vlan_id value in ovs_vlan_allocations table indicating flat network

View File

@ -68,28 +68,66 @@ class UtilsTestCase(base.BaseTestCase):
return fake_func, func_side_effect
@mock.patch('time.sleep')
def test_retry_decorator(self, mock_sleep):
@mock.patch.object(_utils, 'time')
def test_retry_decorator(self, mock_time):
err_code = 1
max_retry_count = 5
max_sleep_time = 4
max_sleep_time = 2
timeout = max_retry_count + 1
mock_time.time.side_effect = range(timeout)
raised_exc = exceptions.Win32Exception(message='fake_exc',
error_code=err_code)
side_effect = [raised_exc] * max_retry_count
side_effect.append(mock.sentinel.ret_val)
fake_func = self._get_fake_func_with_retry_decorator(
(fake_func,
fake_func_side_effect) = self._get_fake_func_with_retry_decorator(
error_codes=err_code,
exceptions=exceptions.Win32Exception,
max_retry_count=max_retry_count,
max_sleep_time=max_sleep_time,
side_effect=side_effect)[0]
timeout=timeout,
side_effect=side_effect)
ret_val = fake_func()
ret_val = fake_func(mock.sentinel.arg,
kwarg=mock.sentinel.kwarg)
self.assertEqual(mock.sentinel.ret_val, ret_val)
mock_sleep.assert_has_calls([mock.call(sleep_time)
for sleep_time in [1, 2, 3, 4, 4]])
fake_func_side_effect.assert_has_calls(
[mock.call(mock.sentinel.arg, kwarg=mock.sentinel.kwarg)] *
(max_retry_count + 1))
self.assertEqual(max_retry_count + 1, mock_time.time.call_count)
mock_time.sleep.assert_has_calls(
[mock.call(sleep_time)
for sleep_time in [1, 2, 2, 2, 1]])
@mock.patch.object(_utils, 'time')
def _test_retry_decorator_exceeded(self, mock_time, expected_try_count,
mock_time_side_eff=None,
timeout=None, max_retry_count=None):
raised_exc = exceptions.Win32Exception(message='fake_exc')
mock_time.time.side_effect = mock_time_side_eff
(fake_func,
fake_func_side_effect) = self._get_fake_func_with_retry_decorator(
exceptions=exceptions.Win32Exception,
timeout=timeout,
side_effect=raised_exc)
self.assertRaises(exceptions.Win32Exception, fake_func)
fake_func_side_effect.assert_has_calls(
[mock.call()] * expected_try_count)
def test_retry_decorator_tries_exceeded(self):
self._test_retry_decorator_exceeded(
max_retry_count=2,
expected_try_count=3)
def test_retry_decorator_time_exceeded(self):
self._test_retry_decorator_exceeded(
mock_time_side_eff=[0, 1, 4],
timeout=3,
expected_try_count=1)
@mock.patch('time.sleep')
def _test_retry_decorator_no_retry(self, mock_sleep,

View File

@ -85,19 +85,27 @@ class JobUtilsTestCase(test_base.OsWinBaseTestCase):
job = self.jobutils._wait_for_job(self._FAKE_JOB_PATH)
self.assertEqual(mock_job, job)
def test_get_pending_jobs(self):
@ddt.data(True, False)
def test_get_pending_jobs(self, ignore_error_state):
mock_killed_job = mock.Mock(JobState=constants.JOB_STATE_KILLED)
mock_running_job = mock.Mock(JobState=constants.WMI_JOB_STATE_RUNNING)
mock_error_st_job = mock.Mock(JobState=constants.JOB_STATE_EXCEPTION)
mappings = [mock.Mock(AffectingElement=None),
mock.Mock(AffectingElement=mock_killed_job),
mock.Mock(AffectingElement=mock_running_job)]
mock.Mock(AffectingElement=mock_running_job),
mock.Mock(AffectingElement=mock_error_st_job)]
self.jobutils._conn.Msvm_AffectedJobElement.return_value = mappings
mock_affected_element = mock.Mock()
expected_pending_jobs = [mock_running_job]
if not ignore_error_state:
expected_pending_jobs.append(mock_error_st_job)
pending_jobs = self.jobutils._get_pending_jobs_affecting_element(
mock_affected_element)
self.assertEqual([mock_running_job], pending_jobs)
mock_affected_element,
ignore_error_state=ignore_error_state)
self.assertEqual(expected_pending_jobs, pending_jobs)
self.jobutils._conn.Msvm_AffectedJobElement.assert_called_once_with(
AffectedElement=mock_affected_element.path_.return_value)
@ -105,7 +113,7 @@ class JobUtilsTestCase(test_base.OsWinBaseTestCase):
@ddt.data(True, False)
@mock.patch.object(jobutils.JobUtils,
'_get_pending_jobs_affecting_element')
def test_stop_jobs(self, jobs_ended, mock_get_pending_jobs):
def test_stop_jobs_helper(self, jobs_ended, mock_get_pending_jobs):
mock_job1 = mock.Mock(Cancellable=True)
mock_job2 = mock.Mock(Cancellable=True)
mock_job3 = mock.Mock(Cancellable=False)
@ -121,20 +129,27 @@ class JobUtilsTestCase(test_base.OsWinBaseTestCase):
test_base.FakeWMIExc(hresult=mock.sentinel.hresult))
if jobs_ended:
self.jobutils.stop_jobs(mock.sentinel.vm)
self.jobutils._stop_jobs(mock.sentinel.vm)
else:
self.assertRaises(exceptions.JobTerminateFailed,
self.jobutils.stop_jobs,
self.jobutils._stop_jobs,
mock.sentinel.vm)
mock_get_pending_jobs.assert_has_calls(
[mock.call(mock.sentinel.vm)] * 2)
[mock.call(mock.sentinel.vm, ignore_error_state=False),
mock.call(mock.sentinel.vm)])
mock_job1.RequestStateChange.assert_called_once_with(
self.jobutils._KILL_JOB_STATE_CHANGE_REQUEST)
mock_job2.RequestStateChange.assert_called_once_with(
self.jobutils._KILL_JOB_STATE_CHANGE_REQUEST)
self.assertFalse(mock_job3.RequestStateChange.called)
self.assertFalse(mock_job3.RequestStateqqChange.called)
@mock.patch.object(jobutils.JobUtils, '_stop_jobs')
def test_stop_jobs(self, mock_stop_jobs_helper):
fake_timeout = 1
self.jobutils.stop_jobs(mock.sentinel.element, fake_timeout)
mock_stop_jobs_helper.assert_called_once_with(mock.sentinel.element)
def test_is_job_completed_true(self):
job = mock.MagicMock(JobState=constants.WMI_JOB_STATE_COMPLETED)

View File

@ -40,6 +40,7 @@ class JobUtils(baseutils.BaseUtilsVirt):
_CONCRETE_JOB_CLASS = "Msvm_ConcreteJob"
_DEFAULT_JOB_TERMINATE_TIMEOUT = 15 # seconds
_KILL_JOB_STATE_CHANGE_REQUEST = 5
_WBEM_E_NOT_FOUND = 0x80041002
@ -98,7 +99,8 @@ class JobUtils(baseutils.BaseUtilsVirt):
{'desc': desc, 'elap': elap})
return job
def _get_pending_jobs_affecting_element(self, element):
def _get_pending_jobs_affecting_element(self, element,
ignore_error_state=True):
# Msvm_AffectedJobElement is in fact an association between
# the affected element and the affecting job.
mappings = self._conn.Msvm_AffectedJobElement(
@ -107,17 +109,21 @@ class JobUtils(baseutils.BaseUtilsVirt):
mapping.AffectingElement
for mapping in mappings
if (mapping.AffectingElement and not
self._is_job_completed(mapping.AffectingElement))]
self._is_job_completed(mapping.AffectingElement,
ignore_error_state))]
return pending_jobs
def stop_jobs(self, element):
pending_jobs = self._get_pending_jobs_affecting_element(element)
def _stop_jobs(self, element):
pending_jobs = self._get_pending_jobs_affecting_element(
element, ignore_error_state=False)
for job in pending_jobs:
try:
if not job.Cancellable:
LOG.debug("Got request to terminate "
"non-cancelable job.")
continue
elif job.JobState == constants.JOB_STATE_EXCEPTION:
LOG.debug("Attempting to terminate exception state job.")
job.RequestStateChange(
self._KILL_JOB_STATE_CHANGE_REQUEST)
@ -138,8 +144,18 @@ class JobUtils(baseutils.BaseUtilsVirt):
pending_count=len(pending_jobs)))
raise exceptions.JobTerminateFailed()
def _is_job_completed(self, job):
return job.JobState in self._completed_job_states
def _is_job_completed(self, job, ignore_error_state=True):
return (job.JobState in self._completed_job_states or
(job.JobState == constants.JOB_STATE_EXCEPTION
and ignore_error_state))
def stop_jobs(self, element, timeout=_DEFAULT_JOB_TERMINATE_TIMEOUT):
@_utils.retry_decorator(exceptions=exceptions.JobTerminateFailed,
timeout=timeout, max_retry_count=None)
def _stop_jobs_with_timeout():
self._stop_jobs(element)
_stop_jobs_with_timeout()
@_utils.retry_decorator(exceptions=exceptions.HyperVException)
def add_virt_resource(self, virt_resource, parent):