Wait for jobs to complete

- Wait for jobs to report as completed instead of relying on the pods
  associated with a job to show healthy, since those pods can appear
  healthy or be removed while the job is still processing. Previously,
  Armada would continue as soon as all pods in the current scope showed
  as healthy. (A minimal sketch of the completion check follows this
  list.)
- Refactor delete pod action a bit, including removing unused code.
- Fix a bug in waiting for pods to delete (in tiller handler L274).
  The bug caused a hang while deleting pods as a pre-update hook
  because the timeout value was passed in the wrong positional
  argument. (A sketch of the failure mode also follows this list.)
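
A minimal sketch of the completion check this implies, using the official
kubernetes Python client. The standalone setup and the helper name
`job_is_complete` are illustrative only, not Armada's actual API:

    from kubernetes import client, config

    config.load_kube_config()  # or config.load_incluster_config() in-cluster
    batch_api = client.BatchV1Api()

    def job_is_complete(name, namespace='default'):
        """Return True only when the Job itself reports completion."""
        job = batch_api.read_namespaced_job(name, namespace)
        # spec.completions defaults to 1 when unset; status.succeeded stays
        # None until at least one pod has finished successfully.
        expected = job.spec.completions or 1
        succeeded = job.status.succeeded or 0
        # Pod health alone is not enough: a Job's pods can look Ready or be
        # garbage collected while the Job is still running or retrying.
        return succeeded >= expected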
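
For the pre-update-hook hang, a minimal sketch of the failure mode. The
signature below is illustrative rather than Tiller's exact one, but it shows
how a positional timeout can silently bind to a neighbouring optional
parameter and leave the real timeout at its default:

    # Illustrative signature only; Armada's Tiller handler defines its own.
    def delete_resources(release, name, action_type, labels,
                         namespace='default', wait=False, timeout=300):
        # Both optional parameters follow 'namespace', so a sixth positional
        # argument binds to 'wait', not 'timeout'.
        print('wait=%r timeout=%r' % (wait, timeout))

    # Buggy call: 600 was meant as a timeout but lands in 'wait', while
    # 'timeout' silently keeps its default, so the wait can stall or run
    # far longer than intended.
    delete_resources('rel', 'pre-update-job', 'job', {'app': 'demo'},
                     'default', 600)

    # Fixed call (the approach taken in this change): bind it by keyword.
    delete_resources('rel', 'pre-update-job', 'job', {'app': 'demo'},
                     'default', timeout=600)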

Change-Id: I2a942f0a6290e8337fd7a43c3e8c9b4c9e350a10
Author: Marshall Margenau, 2018-07-18 15:37:54 -05:00
parent cd0242780e
commit ad790b98d7
2 changed files with 152 additions and 50 deletions


@@ -55,15 +55,17 @@ class K8s(object):
propagation_policy='Foreground',
timeout=DEFAULT_K8S_TIMEOUT):
'''
Delete a job from a namespace (see _delete_item_action).
:param name: name of job
:param namespace: namespace of job
:param namespace: namespace
:param propagation_policy: The Kubernetes propagation_policy to apply
to the delete. Default 'Foreground' means that child pods to the
job will be deleted before the job is marked as deleted.
to the delete.
:param timeout: The timeout to wait for the delete to complete
'''
self._delete_job_action(self.batch_api.list_namespaced_job,
self.batch_api.delete_namespaced_job, "job",
name, namespace, propagation_policy, timeout)
self._delete_item_action(self.batch_api.list_namespaced_job,
self.batch_api.delete_namespaced_job, "job",
name, namespace, propagation_policy, timeout)
def delete_cron_job_action(self,
name,
@@ -71,30 +73,69 @@ class K8s(object):
propagation_policy='Foreground',
timeout=DEFAULT_K8S_TIMEOUT):
'''
Delete a cron job from a namespace (see _delete_item_action).
:param name: name of cron job
:param namespace: namespace of cron job
:param namespace: namespace
:param propagation_policy: The Kubernetes propagation_policy to apply
to the delete. Default 'Foreground' means that child pods of the
cron job will be deleted before the cron job is marked as deleted.
to the delete.
:param timeout: The timeout to wait for the delete to complete
'''
self._delete_job_action(
self._delete_item_action(
self.batch_v1beta1_api.list_namespaced_cron_job,
self.batch_v1beta1_api.delete_namespaced_cron_job, "cron job",
name, namespace, propagation_policy, timeout)
def _delete_job_action(self,
list_func,
delete_func,
job_type_description,
name,
namespace="default",
propagation_policy='Foreground',
timeout=DEFAULT_K8S_TIMEOUT):
def delete_pod_action(self,
name,
namespace="default",
propagation_policy='Foreground',
timeout=DEFAULT_K8S_TIMEOUT):
'''
Delete a pod from a namespace (see _delete_item_action).
:param name: name of pod
:param namespace: namespace
:param propagation_policy: The Kubernetes propagation_policy to apply
to the delete.
:param timeout: The timeout to wait for the delete to complete
'''
self._delete_item_action(self.client.list_namespaced_pod,
self.client.delete_namespaced_pod, "pod",
name, namespace, propagation_policy, timeout)
def _delete_item_action(self,
list_func,
delete_func,
object_type_description,
name,
namespace="default",
propagation_policy='Foreground',
timeout=DEFAULT_K8S_TIMEOUT):
'''
This function takes the action to delete an object (job, cronjob, pod)
from kubernetes. It will wait for the object to be fully deleted before
returning to processing or timing out.
:param list_func: The callback function to list the specified object
type
:param delete_func: The callback function to delete the specified
object type
:param object_type_description: The types of objects to delete,
in `job`, `cronjob`, or `pod`
:param name: The name of the object to delete
:param namespace: The namespace of the object
:param propagation_policy: The Kubernetes propagation_policy to apply
to the delete. Default 'Foreground' means that child objects
will be deleted before the given object is marked as deleted.
See: https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/#controlling-how-the-garbage-collector-deletes-dependents # noqa
:param timeout: The timeout to wait for the delete to complete
'''
try:
timeout = self._check_timeout(timeout)
LOG.debug('Watching to delete %s %s, Wait timeout=%s',
job_type_description, name, timeout)
object_type_description, name, timeout)
body = client.V1DeleteOptions()
w = watch.Watch()
issue_delete = True
@@ -110,29 +151,29 @@ class K8s(object):
issue_delete = False
event_type = event['type'].upper()
job_name = event['object'].metadata.name
LOG.debug('Watch event %s on %s', event_type, job_name)
item_name = event['object'].metadata.name
LOG.debug('Watch event %s on %s', event_type, item_name)
if job_name == name:
if item_name == name:
found_events = True
if event_type == 'DELETED':
LOG.info('Successfully deleted %s %s',
job_type_description, job_name)
object_type_description, item_name)
return
if not found_events:
LOG.warn('Saw no delete events for %s %s in namespace=%s',
job_type_description, name, namespace)
object_type_description, name, namespace)
err_msg = ('Reached timeout while waiting to delete %s: '
'name=%s, namespace=%s' % (job_type_description, name,
namespace))
'name=%s, namespace=%s' % (object_type_description,
name, namespace))
LOG.error(err_msg)
raise exceptions.KubernetesWatchTimeoutException(err_msg)
except ApiException as e:
LOG.exception("Exception when deleting %s: name=%s, namespace=%s",
job_type_description, name, namespace)
object_type_description, name, namespace)
raise e
def get_namespace_job(self, namespace="default", label_selector=''):
@@ -223,19 +264,6 @@ class K8s(object):
return self.extension_api.delete_namespaced_daemon_set(
name, namespace, body)
def delete_namespace_pod(self, name, namespace="default", body=None):
'''
:param name: name of the Pod
:param namespace: namespace of the Pod
:param body: V1DeleteOptions
Deletes pod by name and returns V1Status object
'''
if body is None:
body = client.V1DeleteOptions()
return self.client.delete_namespaced_pod(name, namespace, body)
def wait_for_pod_redeployment(self, old_pod_name, namespace):
'''
:param old_pod_name: name of pods
@@ -338,8 +366,16 @@ class K8s(object):
LOG.warn('"label_selector" not specified, waiting with no labels '
'may cause unintended consequences.')
# Track the overall deadline for timing out during waits
deadline = time.time() + timeout
# First, we should watch for jobs before checking pods, as a job can
# still be running even after its current pods look healthy or have
# been removed and are pending reschedule
found_jobs = self.get_namespace_job(namespace, label_selector)
if len(found_jobs.items):
self._watch_job_completion(namespace, label_selector, timeout)
# NOTE(mark-burnett): Attempt to wait multiple times without
# modification, in case new pods appear after our watch exits.
@@ -347,9 +383,13 @@ class K8s(object):
while successes < wait_attempts:
deadline_remaining = int(round(deadline - time.time()))
if deadline_remaining <= 0:
return False
LOG.info('Timed out while waiting for pods.')
raise exceptions.KubernetesWatchTimeoutException(
'Timed out while waiting on namespace=(%s) labels=(%s)' %
(namespace, label_selector))
timed_out, modified_pods, unready_pods, found_events = (
self._wait_one_time(
self._watch_pod_completions(
namespace=namespace,
label_selector=label_selector,
timeout=deadline_remaining))
@@ -357,8 +397,8 @@ class K8s(object):
if not found_events:
LOG.warn(
'Saw no install/update events for release=%s, '
'namespace=%s, labels=(%s)', release, namespace,
label_selector)
'namespace=%s, labels=(%s). Are the labels correct?',
release, namespace, label_selector)
if timed_out:
LOG.info('Timed out waiting for pods: %s',
@@ -366,7 +406,6 @@ class K8s(object):
raise exceptions.KubernetesWatchTimeoutException(
'Timed out while waiting on namespace=(%s) labels=(%s)' %
(namespace, label_selector))
return False
if modified_pods:
successes = 0
@@ -381,9 +420,14 @@ class K8s(object):
return True
def _wait_one_time(self, namespace, label_selector, timeout=100):
def _watch_pod_completions(self, namespace, label_selector, timeout=100):
'''
Watch and wait for pod completions.
Returns lists of pods in various conditions for the calling function
to handle.
'''
LOG.debug(
'Starting to wait: namespace=%s, label_selector=(%s), '
'Starting to wait on pods: namespace=%s, label_selector=(%s), '
'timeout=%s', namespace, label_selector, timeout)
ready_pods = {}
modified_pods = set()
@@ -476,3 +520,56 @@ class K8s(object):
'using default %ss.', DEFAULT_K8S_TIMEOUT)
timeout = DEFAULT_K8S_TIMEOUT
return timeout
def _watch_job_completion(self, namespace, label_selector, timeout):
'''
Watch and wait for job completion.
Returns when conditions are met, or raises a timeout exception.
'''
try:
timeout = self._check_timeout(timeout)
ready_jobs = {}
w = watch.Watch()
for event in w.stream(
self.batch_api.list_namespaced_job,
namespace=namespace,
label_selector=label_selector,
timeout_seconds=timeout):
job_name = event['object'].metadata.name
LOG.debug('Watch event %s on job %s', event['type'].upper(),
job_name)
# Track the expected and actual number of completed pods
# See: https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-completion/ # noqa
expected = event['object'].spec.completions
completed = event['object'].status.succeeded
if expected != completed:
ready_jobs[job_name] = False
else:
ready_jobs[job_name] = True
LOG.debug(
'Job %s complete (spec.completions=%s, '
'status.succeeded=%s)', job_name, expected, completed)
if all(ready_jobs.values()):
return True
except ApiException as e:
LOG.exception(
"Exception when watching jobs: namespace=%s, labels=(%s)",
namespace, label_selector)
raise e
if not ready_jobs:
LOG.warn(
'Saw no job events for namespace=%s, labels=(%s). '
'Are the labels correct?', namespace, label_selector)
return False
err_msg = ('Reached timeout while waiting for job completions: '
'namespace=%s, labels=(%s)' % (namespace, label_selector))
LOG.error(err_msg)
raise exceptions.KubernetesWatchTimeoutException(err_msg)


@@ -265,8 +265,13 @@ class Tiller(object):
action_type = action.get('type')
labels = action.get('labels', None)
self.delete_resources(release_name, name, action_type, labels,
namespace, timeout)
self.delete_resources(
release_name,
name,
action_type,
labels,
namespace,
timeout=timeout)
except Exception:
LOG.warn("PRE: Could not delete anything, please check yaml")
raise ex.PreUpdateJobDeleteException(name, namespace)
@@ -660,7 +665,7 @@ class Tiller(object):
LOG.info("Deleting pod %s in namespace: %s", pod_name,
namespace)
self.k8s.delete_namespace_pod(pod_name, namespace)
self.k8s.delete_pod_action(pod_name, namespace)
if wait:
self.k8s.wait_for_pod_redeployment(pod_name, namespace)
handled = True