Avoid deleting transient cluster before job is started

In order to run job on a transient cluster client needs to execute
two commands:
 * create cluster with is_transient=true
 * run job on it in a regular manner

We terminate unneeded transient clusters in a periodic job, which
terminates cluster if cluster is transient and no job is running on it
at a time. We also do not terminate cluster if its lifetime is smaller
then config parameter min_transient_cluster_active_time. For some
reason the parameter is set to 0 by default, which could cause
premature cluster termination if periodic task runs between
cluster creation and job execution.

Also added unit tests to verify min_transient_cluster_active_time.

Change-Id: I5330a969ea7cd81ec2759a7fe32bab4a5de3fb4c
(cherry picked from commit 400d90da07)
This commit is contained in:
Dmitry Mescheryakov 2014-04-15 18:22:50 +04:00 committed by Sergey Lukjanov
parent 21601312c9
commit 9cd9013a49
3 changed files with 52 additions and 15 deletions

View File

@ -258,7 +258,7 @@
# Minimal "lifetime" in seconds for a transient cluster.
# Cluster is guarantied to be "alive" within this time period.
# (integer value)
#min_transient_cluster_active_time=0
#min_transient_cluster_active_time=30
#

View File

@ -44,7 +44,7 @@ periodic_opts = [
help='Max interval size between periodic tasks execution in '
'seconds'),
cfg.IntOpt('min_transient_cluster_active_time',
default=0,
default=30,
help='Minimal "lifetime" in seconds for a transient cluster. '
'Cluster is guarantied to be "alive" within this time '
'period.'),

View File

@ -51,23 +51,20 @@ class TestPeriodicBack(base.SaharaWithDbTestCase):
get_job_status.assert_has_calls([mock.call(u'2'),
mock.call(u'3')])
@mock.patch('sahara.service.edp.job_manager.get_job_status')
@mock.patch('sahara.openstack.common.timeutils.utcnow')
@mock.patch('sahara.service.api.terminate_cluster')
def test_cluster_terminate(self, terminate_cluster, get_job_status):
self.override_config("use_identity_api_v3", True)
def test_cluster_terminate(self, terminate_cluster, utcnow):
utcnow.return_value = datetime.datetime(2005, 2, 1, 0, 0)
ctx = context.ctx()
job = self.api.job_create(ctx, te.SAMPLE_JOB)
ds = self.api.data_source_create(ctx, te.SAMPLE_DATA_SOURCE)
c = tc.SAMPLE_CLUSTER.copy()
c["status"] = "Active"
c["id"] = "1"
c["name"] = "1"
c['updated_at'] = timeutils.utcnow()
self.api.cluster_create(ctx, c)
c["id"] = "2"
c["name"] = "2"
self.api.cluster_create(ctx, c)
self._create_job_execution({"end_time": datetime.datetime.now(),
self._make_cluster('1')
self._make_cluster('2')
self._create_job_execution({"end_time": timeutils.utcnow(),
"id": 1,
"cluster_id": "1"},
job, ds, ds)
@ -79,10 +76,50 @@ class TestPeriodicBack(base.SaharaWithDbTestCase):
"id": 3,
"cluster_id": "2"},
job, ds, ds)
utcnow.return_value = datetime.datetime(2005, 2, 1, 0, 1)
p.SaharaPeriodicTasks().terminate_unneeded_clusters(None)
self.assertEqual(terminate_cluster.call_count, 1)
terminate_cluster.assert_has_calls([mock.call(u'1')])
@mock.patch('sahara.openstack.common.timeutils.utcnow')
@mock.patch('sahara.service.api.terminate_cluster')
def test_cluster_not_killed_too_early(self, terminate_cluster, utcnow):
utcnow.return_value = datetime.datetime(2005, 2, 1, second=0)
self._make_cluster('1')
utcnow.return_value = datetime.datetime(2005, 2, 1, second=20)
p.SaharaPeriodicTasks().terminate_unneeded_clusters(None)
self.assertEqual(terminate_cluster.call_count, 0)
@mock.patch('sahara.openstack.common.timeutils.utcnow')
@mock.patch('sahara.service.api.terminate_cluster')
def test_cluster_killed_in_time(self, terminate_cluster, utcnow):
utcnow.return_value = datetime.datetime(2005, 2, 1, second=0)
self._make_cluster('1')
utcnow.return_value = datetime.datetime(2005, 2, 1, second=40)
p.SaharaPeriodicTasks().terminate_unneeded_clusters(None)
self.assertEqual(terminate_cluster.call_count, 1)
terminate_cluster.assert_has_calls([mock.call(u'1')])
def _make_cluster(self, id_name):
ctx = context.ctx()
c = tc.SAMPLE_CLUSTER.copy()
c["status"] = "Active"
c["id"] = id_name
c["name"] = id_name
c['updated_at'] = timeutils.utcnow()
self.api.cluster_create(ctx, c)
def _create_job_execution(self, values, job, input, output):
values.update({"job_id": job['id'],
"input_id": input['id'],