diff --git a/savanna/context.py b/savanna/context.py index fbde97c046..7b2ad35fe9 100644 --- a/savanna/context.py +++ b/savanna/context.py @@ -119,12 +119,17 @@ def model_update(model, context=None, **kwargs): return model_save(model, context) -def spawn(func, *args, **kwargs): +def spawn(thread_description, func, *args, **kwargs): ctx = current().clone() def wrapper(ctx, func, *args, **kwargs): - set_ctx(ctx) - func(*args, **kwargs) + try: + set_ctx(ctx) + func(*args, **kwargs) + set_ctx(None) + except Exception as e: + LOG.exception("Thread '%s' fails with exception: '%s'" + % (thread_description, e)) eventlet.spawn(wrapper, ctx, func, *args, **kwargs) diff --git a/savanna/plugins/hdp/ambariplugin.py b/savanna/plugins/hdp/ambariplugin.py index 6d867897e1..c112975226 100644 --- a/savanna/plugins/hdp/ambariplugin.py +++ b/savanna/plugins/hdp/ambariplugin.py @@ -236,7 +236,9 @@ class AmbariPlugin(p.ProvisioningPluginBase): ambari_info.get_address())) for server in servers: - context.spawn(server.provision_ambari, ambari_info) + context.spawn("hdp-provision-instance-%s" % + server.instance.hostname, + server.provision_ambari, ambari_info) self._wait_for_host_registrations(len(servers), ambari_info) self._set_ambari_credentials(cluster_spec, ambari_info) @@ -680,7 +682,9 @@ class AmbariPlugin(p.ProvisioningPluginBase): self._update_ambari_info_credentials(cluster_spec, ambari_info) for server in servers: - context.spawn(server.provision_ambari, ambari_info) + context.spawn("hdp-scaling-instance-%s" % + server.instance.hostname, + server.provision_ambari, ambari_info) self._wait_for_host_registrations(self._get_num_hosts(cluster), ambari_info) diff --git a/savanna/service/api.py b/savanna/service/api.py index 145a192987..71a4090770 100644 --- a/savanna/service/api.py +++ b/savanna/service/api.py @@ -65,7 +65,8 @@ def scale_cluster(cluster_id, data): to_be_enlarged.update({add_n_g.name: additional[add_n_g]}) context.model_save(cluster) - context.spawn(_provision_nodes, cluster_id, to_be_enlarged) + context.spawn("cluster-scaling-%s" % cluster_id, + _provision_nodes, cluster_id, to_be_enlarged) return cluster @@ -84,7 +85,8 @@ def create_cluster(values): status_description=str(ex)) LOG.info(g.format_cluster_status(cluster)) - context.spawn(_provision_cluster, cluster.id) + context.spawn("cluster-creating-%s" % cluster.id, + _provision_cluster, cluster.id) return cluster