Added job execution after cluster start and operations for job executions

Implements: blueprint edp-start-job-after-cluster-creation

Change-Id: I37bb3409dd30bd144ca35ac663398726b22b9317
Alexander Kuznetsov 2013-08-23 17:40:48 +04:00
parent 6699fe9943
commit 231996fd42
14 changed files with 146 additions and 13 deletions

View File

@@ -65,8 +65,43 @@ def job_delete(job_id):
@v.check_exists(api.get_data_source, id='output_id')
@v.check_exists(c_api.get_cluster, 'cluster_id')
def job_execute(job_id, input_id, output_id, cluster_id, data):
-    return u.render(job_execution=api.execute_job(job_id, input_id, output_id,
-                                                  cluster_id).to_dict())
+    job_execution = api.execute_job(job_id, input_id, output_id,
+                                    cluster_id, data)
+    return u.render(job_execution.to_wrapped_dict())


@rest.get('/job-executions')
def job_executions_list():
    job_executions = [je.to_dict() for je in api.job_execution_list()]
    return u.render(job_executions=job_executions)


@rest.get('/job-executions/<job_execution_id>')
@v.check_exists(api.get_job_execution, id='job_execution_id')
def job_executions(job_execution_id):
    job_execution = api.get_job_execution(job_execution_id)
    return u.render(job_execution.to_wrapped_dict())


@rest.get('/job-executions/<job_execution_id>/refresh-status')
@v.check_exists(api.get_job_execution, id='job_execution_id')
def job_executions_status(job_execution_id):
    job_execution = api.get_job_execution_status(job_execution_id)
    return u.render(job_execution.to_wrapped_dict())


@rest.get('/job-executions/<job_execution_id>/cancel')
@v.check_exists(api.get_job_execution, id='job_execution_id')
def job_executions_cancel(job_execution_id):
    job_execution = api.cancel_job_execution(job_execution_id)
    return u.render(job_execution.to_wrapped_dict())


@rest.delete('/job-executions/<job_execution_id>')
@v.check_exists(api.get_job_execution, id='job_execution_id')
def job_executions_delete(job_execution_id):
    api.delete_job_execution(job_execution_id)
    return u.render()


@rest.get('/data-sources')
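
Taken together, the new routes give a client the full lifecycle for a job execution: list, inspect, re-poll Oozie, cancel, delete. A minimal client sketch follows; the host, port 8386, tenant id, and token are placeholders for this illustration, not values from the commit:

import requests

BASE = "http://savanna-host:8386/v1.1/<tenant_id>"   # hypothetical endpoint
HEADERS = {"X-Auth-Token": "<keystone_token>"}       # hypothetical token

# list every execution, then drill into one of them
executions = requests.get(BASE + "/job-executions", headers=HEADERS).json()
je_id = executions["job_executions"][0]["id"]
requests.get(BASE + "/job-executions/%s" % je_id, headers=HEADERS)

# ask the server to re-poll Oozie, then cancel and delete the execution
requests.get(BASE + "/job-executions/%s/refresh-status" % je_id, headers=HEADERS)
requests.get(BASE + "/job-executions/%s/cancel" % je_id, headers=HEADERS)
requests.delete(BASE + "/job-executions/%s" % je_id, headers=HEADERS)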

View File

@@ -234,6 +234,12 @@ class LocalApi(object):
        return self._manager.job_execution_get(context,
                                               _get_id(job_execution))

    @r.wrap(r.JobExecution)
    def job_execution_get_by_cluster(self, context, cluster):
        """Return all JobExecutions for the specified cluster."""
        return self._manager.job_execution_get_by_cluster(context,
                                                          _get_id(cluster))

    @r.wrap(r.JobExecution)
    def job_execution_get_all(self, context):
        """Get all JobExecutions."""
@@ -244,6 +250,7 @@ class LocalApi(object):
        """Create a JobExecution from the values dictionary."""
        return self._manager.job_execution_create(context, values)

    @r.wrap(r.JobExecution)
    def job_execution_update(self, context, job_execution, values):
        """Update the JobExecution or raise if it does not exist."""
        return self._manager.job_execution_update(context,

View File

@@ -276,6 +276,10 @@ class ConductorManager(db_base.Base):
        """Return the JobExecution or None if it does not exist."""
        return self.db.job_execution_get(context, job_execution)

    def job_execution_get_by_cluster(self, context, cluster):
        """Return all JobExecutions for the specified cluster."""
        return self.db.job_execution_get_by_cluster(context, cluster)

    def job_execution_get_all(self, context):
        """Get all JobExecutions."""
        return self.db.job_execution_get_all(context)

View File

@@ -200,16 +200,17 @@ class JobExecution(object):
    """An object representing JobExecution

    id
    tenant_id
    job_id
    input_id
    output_id
    start_time
    end_time
    cluster_id
    info
    progress
    logs
    oozie_job_id
    return_code
    map_tasks - list of map_tasks
    reduce_tasks - list of reduce_tasks
    """

View File

@@ -23,6 +23,7 @@ Resource class might provide back references to parent objects
and helper methods.
"""
import datetime

import six

from savanna.conductor import objects
@@ -33,6 +34,7 @@ def wrap(resource_class):
    """A decorator which wraps dict returned by a given function into
    a Resource.
    """

    def decorator(func):
        def handle(*args, **kwargs):
            ret = func(*args, **kwargs)
@@ -44,6 +46,7 @@ def wrap(resource_class):
                return None

        return handle

    return decorator
@@ -130,7 +133,8 @@ class Resource(types.FrozenDict):
    def _is_passthrough_type(self, entity):
        return (entity is None or
                isinstance(entity,
-                          (six.integer_types, float, six.string_types)))
+                          (six.integer_types, float,
+                           datetime.datetime, six.string_types)))

    # Conversion to dict
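
The datetime addition matters because JobExecution now carries real timestamps. A self-contained sketch of the passthrough check, mirroring the logic above but pulled out of its class for brevity:

import datetime
import six

def _is_passthrough_type(entity):
    # values of these types are stored on the Resource as-is,
    # instead of being wrapped into nested Resource objects
    return (entity is None or
            isinstance(entity, (six.integer_types, float,
                                datetime.datetime, six.string_types)))

print(_is_passthrough_type(datetime.datetime.now()))  # True: start_time survives wrapping
print(_is_passthrough_type({"status": "Pending"}))    # False: dicts become nested Resources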

View File

@@ -277,6 +277,12 @@ def job_execution_get(context, job_execution):
    return IMPL.job_execution_get(context, job_execution)


@to_dict
def job_execution_get_by_cluster(context, cluster_id):
    """Return all JobExecutions for the specified cluster."""
    return IMPL.job_execution_get_by_cluster(context, cluster_id)


@to_dict
def job_execution_get_all(context):
    """Get all JobExecutions."""
@@ -289,6 +295,7 @@ def job_execution_create(context, values):
    return IMPL.job_execution_create(context, values)


@to_dict
def job_execution_update(context, job_execution, values):
    """Update the JobExecution or raise if it does not exist."""
    return IMPL.job_execution_update(context, job_execution, values)

View File

@@ -464,6 +464,11 @@ def job_execution_get(context, job_execution_id):
    return _job_execution_get(context, job_execution_id)


def job_execution_get_by_cluster(context, cluster_id):
    query = model_query(m.JobExecution, context, get_session())
    return query.filter_by(cluster_id=cluster_id).all()


def job_execution_get_all(context):
    query = model_query(m.JobExecution, context)
    return query.all()
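
The new helper is a plain equality filter. An equivalent standalone sketch using a trimmed stand-in model (the real model_query additionally applies context scoping, which is omitted here):

import sqlalchemy as sa
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class JobExecution(Base):                 # trimmed stand-in for m.JobExecution
    __tablename__ = 'job_executions'
    id = sa.Column(sa.String(36), primary_key=True)
    cluster_id = sa.Column(sa.String(36))

engine = sa.create_engine('sqlite://')
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()
session.add_all([JobExecution(id='je-1', cluster_id='c1'),
                 JobExecution(id='je-2', cluster_id='c2')])

# same shape as job_execution_get_by_cluster: filter executions by cluster id
print([je.id for je in
       session.query(JobExecution).filter_by(cluster_id='c1').all()])  # ['je-1']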

View File

@@ -265,10 +265,11 @@ class JobExecution(mb.SavannaBase):
                         sa.ForeignKey('data_sources.id'))
    output_id = sa.Column(sa.String(36),
                          sa.ForeignKey('data_sources.id'))
-    start_time = sa.Column(sa.Date())
-    end_time = sa.Column(sa.Date())
+    start_time = sa.Column(sa.DateTime())
+    end_time = sa.Column(sa.DateTime())
    cluster_id = sa.Column(sa.String(36),
                           sa.ForeignKey('clusters.id'))
    info = sa.Column(st.JsonDictType())
    progress = sa.Column(sa.Float)
    oozie_job_id = sa.Column(sa.String(100))
    return_code = sa.Column(sa.String(80))
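
The Date to DateTime switch is needed because cancel_job records datetime.datetime.now() as end_time, and sa.Date would truncate that to a calendar day. A quick check of the Python types the two column types map to:

import datetime
import sqlalchemy as sa

print(sa.Date().python_type)      # <class 'datetime.date'> - no time-of-day
print(sa.DateTime().python_type)  # <class 'datetime.datetime'> - full timestamp
print(datetime.datetime(2013, 8, 23, 17, 40, 48).date())  # what Date would keep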

View File

@@ -19,6 +19,7 @@ from savanna.openstack.common import excutils
from savanna.openstack.common import log as logging
from savanna.plugins import base as plugin_base
from savanna.plugins import provisioning
from savanna.service.edp import job_manager as jm
from savanna.service import instances as i
from savanna.utils import general as g
from savanna.utils.openstack import nova
@@ -149,6 +150,10 @@ def _provision_cluster(cluster_id):
    cluster = conductor.cluster_update(ctx, cluster, {"status": "Active"})
    LOG.info(g.format_cluster_status(cluster))

    # schedule pending job executions for this cluster
    for je in conductor.job_execution_get_by_cluster(ctx, cluster.id):
        jm.run_job(ctx, je)


def terminate_cluster(id):
    ctx = context.ctx()
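
The loop added to _provision_cluster above is the heart of the blueprint: executions created while the cluster is still provisioning stay 'Pending' and are started once the cluster goes 'Active'. A self-contained toy model of that hand-off; FakeConductor and the 'Starting'/'Running' statuses are illustrative stand-ins:

class FakeConductor(object):
    def __init__(self):
        self.executions = []

    def job_execution_create(self, values):
        self.executions.append(dict(values))
        return self.executions[-1]

    def job_execution_get_by_cluster(self, cluster_id):
        return [je for je in self.executions
                if je['cluster_id'] == cluster_id]

def run_job(cluster_status, je):
    if cluster_status != 'Active':
        return je                        # keep it Pending until the cluster is up
    je['info'] = {'status': 'Running'}
    return je

conductor = FakeConductor()
je = conductor.job_execution_create({'cluster_id': 'c1',
                                     'info': {'status': 'Pending'}})
run_job('Starting', je)                  # no-op: cluster still provisioning
for je in conductor.job_execution_get_by_cluster('c1'):
    run_job('Active', je)                # what _provision_cluster now does
print(je['info'])                        # {'status': 'Running'}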

View File

@@ -41,13 +41,34 @@ def delete_job(id):
    conductor.job_destroy(context.ctx(), id)


-def execute_job(job_id, input_id, output_id, cluster_id):
+def execute_job(job_id, input_id, output_id, cluster_id, data):
    job_ex_dict = {'input_id': input_id, 'output_id': output_id,
-                  'job_id': job_id, 'cluster_id': cluster_id}
+                  'job_id': job_id, 'cluster_id': cluster_id,
+                  'info': {'status': 'Pending'}}
    job_execution = conductor.job_execution_create(context.ctx(), job_ex_dict)
    return manager.run_job(context.ctx(), job_execution)


def get_job_execution_status(id):
    return manager.get_job_status(id)


def job_execution_list():
    return conductor.job_execution_get_all(context.ctx())


def get_job_execution(id):
    return conductor.job_execution_get(context.ctx(), id)


def cancel_job_execution(id):
    return manager.cancel_job(id)


def delete_job_execution(id):
    conductor.job_execution_destroy(context.ctx(), id)


def get_data_sources():
    return conductor.data_source_get_all(context.ctx())

View File

@@ -44,10 +44,42 @@ main_res_names = {'Pig': 'script.pig',
                  'Jar': 'main.jar'}


def get_job_status(job_execution_id):
    ctx = context.ctx()
    job_execution = conductor.job_execution_get(ctx, job_execution_id)
    cluster = conductor.cluster_get(ctx, job_execution.cluster_id)
    if cluster.status != 'Active':
        return job_execution.status

    client = o.OozieClient(cluster['info']['JobFlow']['Oozie'] + "/oozie/")
    job_info = client.get_job_status(job_execution.oozie_job_id)
    job_execution = conductor.job_execution_update(ctx, job_execution,
                                                   {"info": job_info})
    return job_execution


def cancel_job(job_execution_id):
    ctx = context.ctx()
    job_execution = conductor.job_execution_get(ctx, job_execution_id)
    cluster = conductor.cluster_get(ctx, job_execution.cluster_id)
    client = o.OozieClient(cluster['info']['JobFlow']['Oozie'] + "/oozie/")
    client.kill_job(job_execution.oozie_job_id)
    job_info = client.get_job_status(job_execution.oozie_job_id)
    update = {"info": job_info,
              "end_time": datetime.datetime.now()}
    job_execution = conductor.job_execution_update(ctx, job_execution,
                                                   update)
    return job_execution


def run_job(ctx, job_execution):
    cluster = conductor.cluster_get(ctx, job_execution.cluster_id)
    if cluster.status != 'Active':
-        return job_execution.status
+        return job_execution

    job = conductor.job_get(ctx, job_execution.job_id)
    job_origin = conductor.job_origin_get(context.ctx(), job.job_origin_id)
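
cancel_job's contract with Oozie is kill-then-repoll: kill the workflow, fetch its final info dict, and stamp an end_time. A hedged sketch of that round trip with a stub client; StubOozieClient and the workflow id are stand-ins, and only get_job_status and kill_job come from the code above:

import datetime

class StubOozieClient(object):
    """Stand-in exposing the two calls the manager uses."""
    def kill_job(self, oozie_job_id):
        pass

    def get_job_status(self, oozie_job_id):
        return {'status': 'KILLED'}     # Oozie-style info dict

client = StubOozieClient()
oozie_id = '0000001-130823-oozie-W'     # hypothetical Oozie workflow id
client.kill_job(oozie_id)
job_info = client.get_job_status(oozie_id)
update = {"info": job_info,
          "end_time": datetime.datetime.now()}
print(update)   # what cancel_job hands to conductor.job_execution_update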

View File

@@ -22,7 +22,7 @@ class PigWorkflowCreator(base_workflow.OozieWorkflowCreator):
    def __init__(self):
        super(PigWorkflowCreator, self).__init__('pig')

-    def build_workflow_xml(self, script, prepare={},
+    def build_workflow_xml(self, script_name, prepare={},
                           job_xml=None, configuration=None, params={},
                           arguments={}, files=[], archives=[]):
@@ -33,7 +33,7 @@ class PigWorkflowCreator(base_workflow.OozieWorkflowCreator):
        self._add_configuration_elements(configuration)

        x.add_text_element_to_tag(self.doc, self.tag_name,
-                                 'script', script)
+                                 'script', script_name)
        x.add_equal_separated_dict(self.doc, self.tag_name, 'param', params)
        x.add_equal_separated_dict(self.doc, self.tag_name, 'argument',

View File

@@ -364,6 +364,12 @@ def _rollback_cluster_scaling(cluster, instances, ex):
        _shutdown_instance(i)


def _clean_job_executions(cluster):
    ctx = context.ctx()
    for je in conductor.job_execution_get_by_cluster(ctx, cluster.id):
        conductor.job_execution_update(ctx, je, {"cluster_id": None})


def _shutdown_instances(cluster):
    for node_group in cluster.node_groups:
        for instance in node_group.instances:
@@ -387,6 +393,7 @@ def shutdown_cluster(cluster):
        volumes.detach(cluster)
    finally:
        _shutdown_instances(cluster)
        _clean_job_executions(cluster)


def clean_cluster_from_empty_ng(cluster):
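
Detaching rather than deleting keeps execution history after a cluster is torn down; the effect in miniature, with made-up values:

# illustrative: the execution record outlives the cluster, minus the link to it
je = {'id': 'je-1', 'cluster_id': 'c1', 'info': {'status': 'SUCCEEDED'}}
je.update({'cluster_id': None})          # what _clean_job_executions requests
print(je)  # {'id': 'je-1', 'cluster_id': None, 'info': {'status': 'SUCCEEDED'}}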

View File

@@ -14,6 +14,7 @@
# limitations under the License.

import copy
import datetime

from savanna import context
import savanna.tests.unit.conductor.base as test_base
@@ -58,6 +59,7 @@ SAMPLE_JOB_EXECUTION = {
    "job_id": "undefined",
    "input_id": "undefined",
    "output_id": "undefined",
    "start_time": datetime.datetime.now(),
    "cluster_id": None
}
@@ -194,6 +196,8 @@ class JobExecutionTest(test_base.ConductorManagerTestCase):
        self.api.job_execution_update(ctx, job_ex_id, {'progress': '0.2'})

        updated_job = self.api.job_execution_get(ctx, job_ex_id)
        self.assertEqual(updated_job['progress'], 0.2)
        self.assertEqual(updated_job['start_time'],
                         SAMPLE_JOB_EXECUTION['start_time'])

        self.api.job_execution_destroy(ctx, job_ex_id)