Add instance job terminate timeout
This change lets the caller set a timeout for the job terminate
operation. If the timeout is None or 0, the operation will wait
indefinitely for the jobs to be terminated (while attempting to
stop them).

Also, jobs that are in an error state are now handled properly;
previously they were not considered finished.
Change-Id: I47beec8b36cecd9db3960c2deff09e342418046a
(cherry picked from commit b5d60890e2)
parent cb18c29f81
commit 07a68ebd8b
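For context, a hedged sketch of how a consumer is expected to use the new
knob; the import path and the vm placeholder are illustrative assumptions,
not part of this change:

    # Illustrative only: assumes JobUtils keeps its usual module location.
    from os_win.utils.compute import jobutils

    job_utils = jobutils.JobUtils()
    vm = ...  # placeholder for the WMI element whose pending jobs should stop

    # Give the pending jobs at most 30 seconds to terminate.
    job_utils.stop_jobs(vm, timeout=30)

    # With 0 (or None) the call keeps trying to stop the jobs indefinitely.
    job_utils.stop_jobs(vm, timeout=0)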
@@ -89,8 +89,26 @@ def get_wrapped_function(function):
     return _get_wrapped_function(function)


-def retry_decorator(max_retry_count=5, inc_sleep_time=1,
+def retry_decorator(max_retry_count=5, timeout=None, inc_sleep_time=1,
                     max_sleep_time=1, exceptions=(), error_codes=()):
     """Retries invoking the decorated method in case of expected exceptions.
+
+    :param max_retry_count: The maximum number of retries performed. If 0, no
+                            retry is performed. If None, there will be no limit
+                            on the number of retries.
+    :param timeout: The maximum time for which we'll retry invoking the method.
+                    If 0 or None, there will be no time limit.
+    :param inc_sleep_time: The time sleep increment used between retries.
+    :param max_sleep_time: The maximum time to wait between retries.
+    :param exceptions: A list of expected exceptions for which retries will be
+                       performed.
+    :param error_codes: A list of expected error codes. The error code is
+                        retrieved from the 'error_code' exception attribute,
+                        for example in case of Win32Exception. If this argument
+                        is not passed, retries will be performed for any of the
+                        expected exceptions.
+    """

     if isinstance(error_codes, six.integer_types):
         error_codes = (error_codes, )
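A hedged usage sketch of the extended decorator; the decorated function and
its arguments are made up for illustration, only the decorator parameters
come from the change above:

    from os_win import _utils
    from os_win import exceptions


    # Hypothetical helper: retried on Win32Exception for at most 30 seconds
    # and at most 10 attempts, whichever limit is reached first.
    @_utils.retry_decorator(exceptions=exceptions.Win32Exception,
                            max_retry_count=10, timeout=30,
                            inc_sleep_time=1, max_sleep_time=5)
    def fetch_vm_state(vm_name):
        ...  # body omitted; any call that may raise Win32Exception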
@@ -98,6 +116,7 @@ def retry_decorator(max_retry_count=5, inc_sleep_time=1,
         def inner(*args, **kwargs):
             try_count = 0
             sleep_time = 0
+            time_start = time.time()

             while True:
                 try:
@@ -107,25 +126,42 @@ def retry_decorator(max_retry_count=5, inc_sleep_time=1,
                         err_code = getattr(exc, 'error_code', None)
                         expected_err_code = (err_code in error_codes
                                              or not error_codes)
-                        should_retry = (expected_err_code
-                                        and try_count < max_retry_count)
+
+                        time_elapsed = time.time() - time_start
+                        time_left = (timeout - time_elapsed
+                                     if timeout else 'undefined')
+                        tries_left = (max_retry_count - try_count
+                                      if max_retry_count is not None
+                                      else 'undefined')
+
+                        should_retry = (
+                            expected_err_code
+                            and tries_left
+                            and (time_left == 'undefined'
+                                 or time_left > 0))
                         ctxt.reraise = not should_retry

                         if should_retry:
                             try_count += 1
                             func_name = reflection.get_callable_name(f)
-                            LOG.debug("Got expected exception %(exc)s while "
-                                      "calling function %(func_name)s. "
-                                      "Retries left: %(retries_left)d. "
-                                      "Retrying in %(sleep_time)s seconds.",
-                                      dict(exc=exc,
-                                           func_name=func_name,
-                                           retries_left=(
-                                               max_retry_count - try_count),
-                                           sleep_time=sleep_time))

                             sleep_time = min(sleep_time + inc_sleep_time,
                                              max_sleep_time)
+                            if timeout:
+                                sleep_time = min(sleep_time, time_left)
+
+                            LOG.debug("Got expected exception %(exc)s while "
+                                      "calling function %(func_name)s. "
+                                      "Retries left: %(retries_left)s. "
+                                      "Time left: %(time_left)s. "
+                                      "Time elapsed: %(time_elapsed)s "
+                                      "Retrying in %(sleep_time)s seconds.",
+                                      dict(exc=exc,
+                                           func_name=func_name,
+                                           retries_left=tries_left,
+                                           time_left=time_left,
+                                           time_elapsed=time_elapsed,
+                                           sleep_time=sleep_time))
                             time.sleep(sleep_time)
         return inner
     return wrapper
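The new bookkeeping is plain arithmetic: the sleep interval still grows by
inc_sleep_time up to max_sleep_time, and is additionally clamped to the
remaining time budget. A minimal standalone illustration with made-up
numbers (not os-win code):

    inc_sleep_time = 1
    max_sleep_time = 2
    timeout = 6

    sleep_time = 2      # value reached after a few earlier retries
    time_elapsed = 5.5  # pretend 5.5 seconds have passed since time_start
    time_left = timeout - time_elapsed

    # Grow the sleep by the increment, but never past max_sleep_time...
    sleep_time = min(sleep_time + inc_sleep_time, max_sleep_time)  # -> 2
    # ...and never past the remaining budget when a timeout is set.
    if timeout:
        sleep_time = min(sleep_time, time_left)                    # -> 0.5

    print(sleep_time)  # 0.5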
@@ -102,6 +102,7 @@ VM_GEN_2 = 2
 JOB_STATE_COMPLETED = 7
 JOB_STATE_TERMINATED = 8
 JOB_STATE_KILLED = 9
+JOB_STATE_EXCEPTION = 10
 JOB_STATE_COMPLETED_WITH_WARNINGS = 32768

 # Special vlan_id value in ovs_vlan_allocations table indicating flat network
@@ -68,28 +68,66 @@ class UtilsTestCase(base.BaseTestCase):

         return fake_func, func_side_effect

-    @mock.patch('time.sleep')
-    def test_retry_decorator(self, mock_sleep):
+    @mock.patch.object(_utils, 'time')
+    def test_retry_decorator(self, mock_time):
         err_code = 1
         max_retry_count = 5
-        max_sleep_time = 4
+        max_sleep_time = 2
+        timeout = max_retry_count + 1
+        mock_time.time.side_effect = range(timeout)

         raised_exc = exceptions.Win32Exception(message='fake_exc',
                                                error_code=err_code)
         side_effect = [raised_exc] * max_retry_count
         side_effect.append(mock.sentinel.ret_val)

-        fake_func = self._get_fake_func_with_retry_decorator(
+        (fake_func,
+         fake_func_side_effect) = self._get_fake_func_with_retry_decorator(
             error_codes=err_code,
             exceptions=exceptions.Win32Exception,
             max_retry_count=max_retry_count,
             max_sleep_time=max_sleep_time,
-            side_effect=side_effect)[0]
+            timeout=timeout,
+            side_effect=side_effect)

-        ret_val = fake_func()
+        ret_val = fake_func(mock.sentinel.arg,
+                            kwarg=mock.sentinel.kwarg)
         self.assertEqual(mock.sentinel.ret_val, ret_val)
-        mock_sleep.assert_has_calls([mock.call(sleep_time)
-                                     for sleep_time in [1, 2, 3, 4, 4]])
+        fake_func_side_effect.assert_has_calls(
+            [mock.call(mock.sentinel.arg, kwarg=mock.sentinel.kwarg)] *
+            (max_retry_count + 1))
+        self.assertEqual(max_retry_count + 1, mock_time.time.call_count)
+        mock_time.sleep.assert_has_calls(
+            [mock.call(sleep_time)
+             for sleep_time in [1, 2, 2, 2, 1]])
+
+    @mock.patch.object(_utils, 'time')
+    def _test_retry_decorator_exceeded(self, mock_time, expected_try_count,
+                                       mock_time_side_eff=None,
+                                       timeout=None, max_retry_count=None):
+        raised_exc = exceptions.Win32Exception(message='fake_exc')
+        mock_time.time.side_effect = mock_time_side_eff
+
+        (fake_func,
+         fake_func_side_effect) = self._get_fake_func_with_retry_decorator(
+            exceptions=exceptions.Win32Exception,
+            timeout=timeout,
+            side_effect=raised_exc)
+
+        self.assertRaises(exceptions.Win32Exception, fake_func)
+        fake_func_side_effect.assert_has_calls(
+            [mock.call()] * expected_try_count)
+
+    def test_retry_decorator_tries_exceeded(self):
+        self._test_retry_decorator_exceeded(
+            max_retry_count=2,
+            expected_try_count=3)
+
+    def test_retry_decorator_time_exceeded(self):
+        self._test_retry_decorator_exceeded(
+            mock_time_side_eff=[0, 1, 4],
+            timeout=3,
+            expected_try_count=1)

     @mock.patch('time.sleep')
     def _test_retry_decorator_no_retry(self, mock_sleep,
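A short worked trace of why the updated test expects the sleep sequence
[1, 2, 2, 2, 1]: with the mocked clock returning 0, 1, 2, 3, 4, 5 and
timeout=6, the last retry is clamped by the remaining budget. This snippet
only re-derives the numbers and is not part of the change:

    # max_retry_count=5, inc_sleep_time=1, max_sleep_time=2, timeout=6,
    # time.time() mocked to return 0, 1, 2, 3, 4, 5 on successive calls.
    timeout, inc, max_sleep = 6, 1, 2
    clock = iter(range(timeout))
    time_start = next(clock)      # 0

    sleep_time, sleeps = 0, []
    for _ in range(5):            # five failing attempts before success
        time_elapsed = next(clock) - time_start
        time_left = timeout - time_elapsed
        sleep_time = min(sleep_time + inc, max_sleep)
        sleep_time = min(sleep_time, time_left)
        sleeps.append(sleep_time)

    print(sleeps)  # [1, 2, 2, 2, 1]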
@@ -85,19 +85,27 @@ class JobUtilsTestCase(test_base.OsWinBaseTestCase):
         job = self.jobutils._wait_for_job(self._FAKE_JOB_PATH)
         self.assertEqual(mock_job, job)

-    def test_get_pending_jobs(self):
+    @ddt.data(True, False)
+    def test_get_pending_jobs(self, ignore_error_state):
         mock_killed_job = mock.Mock(JobState=constants.JOB_STATE_KILLED)
         mock_running_job = mock.Mock(JobState=constants.WMI_JOB_STATE_RUNNING)
+        mock_error_st_job = mock.Mock(JobState=constants.JOB_STATE_EXCEPTION)
         mappings = [mock.Mock(AffectingElement=None),
                     mock.Mock(AffectingElement=mock_killed_job),
-                    mock.Mock(AffectingElement=mock_running_job)]
+                    mock.Mock(AffectingElement=mock_running_job),
+                    mock.Mock(AffectingElement=mock_error_st_job)]
         self.jobutils._conn.Msvm_AffectedJobElement.return_value = mappings

         mock_affected_element = mock.Mock()

+        expected_pending_jobs = [mock_running_job]
+        if not ignore_error_state:
+            expected_pending_jobs.append(mock_error_st_job)
+
         pending_jobs = self.jobutils._get_pending_jobs_affecting_element(
-            mock_affected_element)
-        self.assertEqual([mock_running_job], pending_jobs)
+            mock_affected_element,
+            ignore_error_state=ignore_error_state)
+        self.assertEqual(expected_pending_jobs, pending_jobs)

         self.jobutils._conn.Msvm_AffectedJobElement.assert_called_once_with(
             AffectedElement=mock_affected_element.path_.return_value)
@@ -105,7 +113,7 @@ class JobUtilsTestCase(test_base.OsWinBaseTestCase):
     @ddt.data(True, False)
     @mock.patch.object(jobutils.JobUtils,
                        '_get_pending_jobs_affecting_element')
-    def test_stop_jobs(self, jobs_ended, mock_get_pending_jobs):
+    def test_stop_jobs_helper(self, jobs_ended, mock_get_pending_jobs):
         mock_job1 = mock.Mock(Cancellable=True)
         mock_job2 = mock.Mock(Cancellable=True)
         mock_job3 = mock.Mock(Cancellable=False)
@@ -121,20 +129,27 @@ class JobUtilsTestCase(test_base.OsWinBaseTestCase):
            test_base.FakeWMIExc(hresult=mock.sentinel.hresult))

         if jobs_ended:
-            self.jobutils.stop_jobs(mock.sentinel.vm)
+            self.jobutils._stop_jobs(mock.sentinel.vm)
         else:
             self.assertRaises(exceptions.JobTerminateFailed,
-                              self.jobutils.stop_jobs,
+                              self.jobutils._stop_jobs,
                               mock.sentinel.vm)

         mock_get_pending_jobs.assert_has_calls(
-            [mock.call(mock.sentinel.vm)] * 2)
+            [mock.call(mock.sentinel.vm, ignore_error_state=False),
+             mock.call(mock.sentinel.vm)])

         mock_job1.RequestStateChange.assert_called_once_with(
             self.jobutils._KILL_JOB_STATE_CHANGE_REQUEST)
         mock_job2.RequestStateChange.assert_called_once_with(
             self.jobutils._KILL_JOB_STATE_CHANGE_REQUEST)
         self.assertFalse(mock_job3.RequestStateChange.called)

+    @mock.patch.object(jobutils.JobUtils, '_stop_jobs')
+    def test_stop_jobs(self, mock_stop_jobs_helper):
+        fake_timeout = 1
+        self.jobutils.stop_jobs(mock.sentinel.element, fake_timeout)
+        mock_stop_jobs_helper.assert_called_once_with(mock.sentinel.element)
+
     def test_is_job_completed_true(self):
         job = mock.MagicMock(JobState=constants.WMI_JOB_STATE_COMPLETED)
@@ -40,6 +40,7 @@ class JobUtils(baseutils.BaseUtilsVirt):

     _CONCRETE_JOB_CLASS = "Msvm_ConcreteJob"

+    _DEFAULT_JOB_TERMINATE_TIMEOUT = 15  # seconds
     _KILL_JOB_STATE_CHANGE_REQUEST = 5
     _WBEM_E_NOT_FOUND = 0x80041002
@@ -98,7 +99,8 @@ class JobUtils(baseutils.BaseUtilsVirt):
                      {'desc': desc, 'elap': elap})
         return job

-    def _get_pending_jobs_affecting_element(self, element):
+    def _get_pending_jobs_affecting_element(self, element,
+                                            ignore_error_state=True):
         # Msvm_AffectedJobElement is in fact an association between
         # the affected element and the affecting job.
         mappings = self._conn.Msvm_AffectedJobElement(
@@ -107,17 +109,21 @@ class JobUtils(baseutils.BaseUtilsVirt):
             mapping.AffectingElement
             for mapping in mappings
             if (mapping.AffectingElement and not
-                self._is_job_completed(mapping.AffectingElement))]
+                self._is_job_completed(mapping.AffectingElement,
+                                       ignore_error_state))]
         return pending_jobs

-    def stop_jobs(self, element):
-        pending_jobs = self._get_pending_jobs_affecting_element(element)
+    def _stop_jobs(self, element):
+        pending_jobs = self._get_pending_jobs_affecting_element(
+            element, ignore_error_state=False)
         for job in pending_jobs:
             try:
                 if not job.Cancellable:
                     LOG.debug("Got request to terminate "
                               "non-cancelable job.")
                     continue
+                elif job.JobState == constants.JOB_STATE_EXCEPTION:
+                    LOG.debug("Attempting to terminate exception state job.")

                 job.RequestStateChange(
                     self._KILL_JOB_STATE_CHANGE_REQUEST)
@@ -138,8 +144,18 @@ class JobUtils(baseutils.BaseUtilsVirt):
                      pending_count=len(pending_jobs)))
             raise exceptions.JobTerminateFailed()

-    def _is_job_completed(self, job):
-        return job.JobState in self._completed_job_states
+    def _is_job_completed(self, job, ignore_error_state=True):
+        return (job.JobState in self._completed_job_states or
+                (job.JobState == constants.JOB_STATE_EXCEPTION
+                 and ignore_error_state))
+
+    def stop_jobs(self, element, timeout=_DEFAULT_JOB_TERMINATE_TIMEOUT):
+        @_utils.retry_decorator(exceptions=exceptions.JobTerminateFailed,
+                                timeout=timeout, max_retry_count=None)
+        def _stop_jobs_with_timeout():
+            self._stop_jobs(element)
+
+        _stop_jobs_with_timeout()

     @_utils.retry_decorator(exceptions=exceptions.HyperVException)
     def add_virt_resource(self, virt_resource, parent):
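Caller-facing effect of the new public stop_jobs: it keeps re-invoking
_stop_jobs until no pending jobs remain or the timeout expires, at which
point JobTerminateFailed reaches the caller. A hedged sketch, with an
assumed import path and a placeholder element:

    from os_win import exceptions
    from os_win.utils.compute import jobutils  # assumed module path

    job_utils = jobutils.JobUtils()
    vm = ...  # placeholder for the affected WMI element

    try:
        # Retries _stop_jobs for up to 15 seconds (the new default timeout).
        job_utils.stop_jobs(vm)
    except exceptions.JobTerminateFailed:
        # Some jobs were still pending when the time ran out; the caller
        # decides whether to retry with a longer timeout, log, or give up.
        pass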