From ad790b98d71b814d220bcecc37d5d44327fb749e Mon Sep 17 00:00:00 2001 From: Marshall Margenau Date: Wed, 18 Jul 2018 15:37:54 -0500 Subject: [PATCH] Wait for jobs to complete - Wait for jobs to show as completed, instead of relying on pods associated with the job to show healthy, as the pods can go healthy or be removed while the job is still processing. Armada would continue forward as soon as all pods in current scope show as healthy. - Refactor delete pod action a bit, including removing unused code. - Fixed bug in waiting for pods to delete (in tiller handler L274). Bug caused a hung state while deleting pods as a pre-update hook, by passing timeout value in the incorrect position. Change-Id: I2a942f0a6290e8337fd7a43c3e8c9b4c9e350a10 --- armada/handlers/k8s.py | 191 ++++++++++++++++++++++++++++---------- armada/handlers/tiller.py | 11 ++- 2 files changed, 152 insertions(+), 50 deletions(-) diff --git a/armada/handlers/k8s.py b/armada/handlers/k8s.py index c81209e0..d4de220a 100644 --- a/armada/handlers/k8s.py +++ b/armada/handlers/k8s.py @@ -55,15 +55,17 @@ class K8s(object): propagation_policy='Foreground', timeout=DEFAULT_K8S_TIMEOUT): ''' + Delete a job from a namespace (see _delete_item_action). + :param name: name of job - :param namespace: namespace of job + :param namespace: namespace :param propagation_policy: The Kubernetes propagation_policy to apply - to the delete. Default 'Foreground' means that child pods to the - job will be deleted before the job is marked as deleted. + to the delete. + :param timeout: The timeout to wait for the delete to complete ''' - self._delete_job_action(self.batch_api.list_namespaced_job, - self.batch_api.delete_namespaced_job, "job", - name, namespace, propagation_policy, timeout) + self._delete_item_action(self.batch_api.list_namespaced_job, + self.batch_api.delete_namespaced_job, "job", + name, namespace, propagation_policy, timeout) def delete_cron_job_action(self, name, @@ -71,30 +73,69 @@ class K8s(object): propagation_policy='Foreground', timeout=DEFAULT_K8S_TIMEOUT): ''' + Delete a cron job from a namespace (see _delete_item_action). + :param name: name of cron job - :param namespace: namespace of cron job + :param namespace: namespace :param propagation_policy: The Kubernetes propagation_policy to apply - to the delete. Default 'Foreground' means that child pods of the - cron job will be deleted before the cron job is marked as deleted. + to the delete. + :param timeout: The timeout to wait for the delete to complete ''' - self._delete_job_action( + self._delete_item_action( self.batch_v1beta1_api.list_namespaced_cron_job, self.batch_v1beta1_api.delete_namespaced_cron_job, "cron job", name, namespace, propagation_policy, timeout) - def _delete_job_action(self, - list_func, - delete_func, - job_type_description, - name, - namespace="default", - propagation_policy='Foreground', - timeout=DEFAULT_K8S_TIMEOUT): + def delete_pod_action(self, + name, + namespace="default", + propagation_policy='Foreground', + timeout=DEFAULT_K8S_TIMEOUT): + ''' + Delete a pod from a namespace (see _delete_item_action). + + :param name: name of pod + :param namespace: namespace + :param propagation_policy: The Kubernetes propagation_policy to apply + to the delete. + :param timeout: The timeout to wait for the delete to complete + ''' + self._delete_item_action(self.client.list_namespaced_pod, + self.client.delete_namespaced_pod, "pod", + name, namespace, propagation_policy, timeout) + + def _delete_item_action(self, + list_func, + delete_func, + object_type_description, + name, + namespace="default", + propagation_policy='Foreground', + timeout=DEFAULT_K8S_TIMEOUT): + ''' + This function takes the action to delete an object (job, cronjob, pod) + from kubernetes. It will wait for the object to be fully deleted before + returning to processing or timing out. + + :param list_func: The callback function to list the specified object + type + :param delete_func: The callback function to delete the specified + object type + :param object_type_description: The types of objects to delete, + in `job`, `cronjob`, or `pod` + :param name: The name of the object to delete + :param namespace: The namespace of the object + :param propagation_policy: The Kubernetes propagation_policy to apply + to the delete. Default 'Foreground' means that child objects + will be deleted before the given object is marked as deleted. + See: https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/#controlling-how-the-garbage-collector-deletes-dependents # noqa + :param timeout: The timeout to wait for the delete to complete + ''' try: timeout = self._check_timeout(timeout) LOG.debug('Watching to delete %s %s, Wait timeout=%s', - job_type_description, name, timeout) + object_type_description, name, timeout) body = client.V1DeleteOptions() w = watch.Watch() issue_delete = True @@ -110,29 +151,29 @@ class K8s(object): issue_delete = False event_type = event['type'].upper() - job_name = event['object'].metadata.name - LOG.debug('Watch event %s on %s', event_type, job_name) + item_name = event['object'].metadata.name + LOG.debug('Watch event %s on %s', event_type, item_name) - if job_name == name: + if item_name == name: found_events = True if event_type == 'DELETED': LOG.info('Successfully deleted %s %s', - job_type_description, job_name) + object_type_description, item_name) return if not found_events: LOG.warn('Saw no delete events for %s %s in namespace=%s', - job_type_description, name, namespace) + object_type_description, name, namespace) err_msg = ('Reached timeout while waiting to delete %s: ' - 'name=%s, namespace=%s' % (job_type_description, name, - namespace)) + 'name=%s, namespace=%s' % (object_type_description, + name, namespace)) LOG.error(err_msg) raise exceptions.KubernetesWatchTimeoutException(err_msg) except ApiException as e: LOG.exception("Exception when deleting %s: name=%s, namespace=%s", - job_type_description, name, namespace) + object_type_description, name, namespace) raise e def get_namespace_job(self, namespace="default", label_selector=''): @@ -223,19 +264,6 @@ class K8s(object): return self.extension_api.delete_namespaced_daemon_set( name, namespace, body) - def delete_namespace_pod(self, name, namespace="default", body=None): - ''' - :param name: name of the Pod - :param namespace: namespace of the Pod - :param body: V1DeleteOptions - - Deletes pod by name and returns V1Status object - ''' - if body is None: - body = client.V1DeleteOptions() - - return self.client.delete_namespaced_pod(name, namespace, body) - def wait_for_pod_redeployment(self, old_pod_name, namespace): ''' :param old_pod_name: name of pods @@ -338,8 +366,16 @@ class K8s(object): LOG.warn('"label_selector" not specified, waiting with no labels ' 'may cause unintended consequences.') + # Track the overall deadline for timing out during waits deadline = time.time() + timeout + # First, we should watch for jobs before checking pods, as a job can + # still be running even after its current pods look healthy or have + # been removed and are pending reschedule + found_jobs = self.get_namespace_job(namespace, label_selector) + if len(found_jobs.items): + self._watch_job_completion(namespace, label_selector, timeout) + # NOTE(mark-burnett): Attempt to wait multiple times without # modification, in case new pods appear after our watch exits. @@ -347,9 +383,13 @@ class K8s(object): while successes < wait_attempts: deadline_remaining = int(round(deadline - time.time())) if deadline_remaining <= 0: - return False + LOG.info('Timed out while waiting for pods.') + raise exceptions.KubernetesWatchTimeoutException( + 'Timed out while waiting on namespace=(%s) labels=(%s)' % + (namespace, label_selector)) + timed_out, modified_pods, unready_pods, found_events = ( - self._wait_one_time( + self._watch_pod_completions( namespace=namespace, label_selector=label_selector, timeout=deadline_remaining)) @@ -357,8 +397,8 @@ class K8s(object): if not found_events: LOG.warn( 'Saw no install/update events for release=%s, ' - 'namespace=%s, labels=(%s)', release, namespace, - label_selector) + 'namespace=%s, labels=(%s). Are the labels correct?', + release, namespace, label_selector) if timed_out: LOG.info('Timed out waiting for pods: %s', @@ -366,7 +406,6 @@ class K8s(object): raise exceptions.KubernetesWatchTimeoutException( 'Timed out while waiting on namespace=(%s) labels=(%s)' % (namespace, label_selector)) - return False if modified_pods: successes = 0 @@ -381,9 +420,14 @@ class K8s(object): return True - def _wait_one_time(self, namespace, label_selector, timeout=100): + def _watch_pod_completions(self, namespace, label_selector, timeout=100): + ''' + Watch and wait for pod completions. + Returns lists of pods in various conditions for the calling function + to handle. + ''' LOG.debug( - 'Starting to wait: namespace=%s, label_selector=(%s), ' + 'Starting to wait on pods: namespace=%s, label_selector=(%s), ' 'timeout=%s', namespace, label_selector, timeout) ready_pods = {} modified_pods = set() @@ -476,3 +520,56 @@ class K8s(object): 'using default %ss.', DEFAULT_K8S_TIMEOUT) timeout = DEFAULT_K8S_TIMEOUT return timeout + + def _watch_job_completion(self, namespace, label_selector, timeout): + ''' + Watch and wait for job completion. + Returns when conditions are met, or raises a timeout exception. + ''' + try: + timeout = self._check_timeout(timeout) + + ready_jobs = {} + w = watch.Watch() + for event in w.stream( + self.batch_api.list_namespaced_job, + namespace=namespace, + label_selector=label_selector, + timeout_seconds=timeout): + + job_name = event['object'].metadata.name + LOG.debug('Watch event %s on job %s', event['type'].upper(), + job_name) + + # Track the expected and actual number of completed pods + # See: https://kubernetes.io/docs/concepts/workloads/controllers/jobs-run-to-completion/ # noqa + expected = event['object'].spec.completions + completed = event['object'].status.succeeded + + if expected != completed: + ready_jobs[job_name] = False + else: + ready_jobs[job_name] = True + LOG.debug( + 'Job %s complete (spec.completions=%s, ' + 'status.succeeded=%s)', job_name, expected, completed) + + if all(ready_jobs.values()): + return True + + except ApiException as e: + LOG.exception( + "Exception when watching jobs: namespace=%s, labels=(%s)", + namespace, label_selector) + raise e + + if not ready_jobs: + LOG.warn( + 'Saw no job events for namespace=%s, labels=(%s). ' + 'Are the labels correct?', namespace, label_selector) + return False + + err_msg = ('Reached timeout while waiting for job completions: ' + 'namespace=%s, labels=(%s)' % (namespace, label_selector)) + LOG.error(err_msg) + raise exceptions.KubernetesWatchTimeoutException(err_msg) diff --git a/armada/handlers/tiller.py b/armada/handlers/tiller.py index aa4d4d6d..7ae9e653 100644 --- a/armada/handlers/tiller.py +++ b/armada/handlers/tiller.py @@ -265,8 +265,13 @@ class Tiller(object): action_type = action.get('type') labels = action.get('labels', None) - self.delete_resources(release_name, name, action_type, labels, - namespace, timeout) + self.delete_resources( + release_name, + name, + action_type, + labels, + namespace, + timeout=timeout) except Exception: LOG.warn("PRE: Could not delete anything, please check yaml") raise ex.PreUpdateJobDeleteException(name, namespace) @@ -660,7 +665,7 @@ class Tiller(object): LOG.info("Deleting pod %s in namespace: %s", pod_name, namespace) - self.k8s.delete_namespace_pod(pod_name, namespace) + self.k8s.delete_pod_action(pod_name, namespace) if wait: self.k8s.wait_for_pod_redeployment(pod_name, namespace) handled = True