Merge "Adding option for Java jobs to be adapted to Oozie"

This commit is contained in:
Jenkins 2015-09-03 11:38:55 +00:00 committed by Gerrit Code Review
commit 51647a585a
4 changed files with 71 additions and 2 deletions

View File

@@ -169,6 +169,7 @@
}
if (job_type != "Java") {
$("[name=hbase_common_lib]").closest(".form-group").hide();
$("[name=adapt_oozie]").closest(".form-group").hide();
}
}

View File

@@ -83,3 +83,54 @@ class DataProcessingJobTests(test.TestCase):
self.assertNoFormErrors(res)
self.assertRedirectsNoFollow(res, INDEX_URL)
self.assertMessageCount(success=1)
# Stub out every sahara API call the launch-job workflow makes so the
# POST below can run without a real sahara backend.
@test.create_stubs({api.sahara: ('job_execution_create',
                                 'job_get',
                                 'job_get_configs',
                                 'job_list',
                                 'cluster_list',
                                 'data_source_list')})
def test_launch(self):
    """POST the launch-job form and check it submits with no form errors.

    Records the expected sahara API calls with mox (the record order
    here must match the replay order triggered by the view), then posts
    a Pig-job form -- including the new 'adapt_oozie' flag -- to the
    launch-job URL.
    """
    # Fixture objects used both to record stub return values and to
    # fill in the form data below.
    job = self.jobs.first()
    job_execution = self.job_executions.first()
    cluster = self.clusters.first()
    # Input and output both come from the first fixture entry; they may
    # be the same data-source object, which is fine for this form test.
    input_ds = self.data_sources.first()
    output_ds = self.data_sources.first()
    api.sahara.job_get(IsA(http.HttpRequest), IsA(unicode)) \
        .AndReturn(job)
    api.sahara.job_get_configs(IsA(http.HttpRequest), job.type) \
        .AndReturn(job)
    api.sahara.cluster_list(IsA(http.HttpRequest)) \
        .AndReturn(self.clusters.list())
    # data_source_list is called more than once while the workflow
    # renders its choice fields, hence MultipleTimes().
    api.sahara.data_source_list(IsA(http.HttpRequest)) \
        .MultipleTimes().AndReturn(self.data_sources.list())
    api.sahara.job_list(IsA(http.HttpRequest)) \
        .AndReturn(self.jobs.list())
    # The final create call: job id, cluster id, input id, output id,
    # then the assembled job-config dict.
    api.sahara.job_execution_create(IsA(http.HttpRequest),
                                    IsA(unicode),
                                    IsA(unicode),
                                    IsA(unicode),
                                    IsA(unicode),
                                    IsA(dict)).AndReturn(job_execution)
    self.mox.ReplayAll()
    url = reverse('horizon:project:data_processing.jobs:launch-job')
    # Form payload mirroring what the launch-job workflow submits;
    # 'adapt_oozie' is the option added by this change.
    form_data = {
        'job': self.jobs.first().id,
        'cluster': cluster.id,
        'job_input': input_ds.id,
        'job_output': output_ds.id,
        'config': {},
        'adapt_oozie': 'on',
        'hbase_common_lib': 'on',
        'java_opts': '',
        'job_args_array': [[], []],
        'job_configs': [{}, {}],
        'job_params': [{}, {}],
        'job_type': 'Pig',
        'streaming_mapper': '',
        'streaming_reducer': ''
    }
    res = self.client.post(url, form_data)
    self.assertNoFormErrors(res)

View File

@@ -134,6 +134,7 @@ class JobConfigAction(workflows.Action):
EDP_REDUCER = "edp.streaming.reducer"
EDP_PREFIX = "edp."
EDP_HBASE_COMMON_LIB = "edp.hbase_common_lib"
EDP_ADAPT_FOR_OOZIE = "edp.java.adapt_for_oozie"
property_name = forms.ChoiceField(
required=False,
@@ -170,6 +171,13 @@ class JobConfigAction(workflows.Action):
help_text=_("Run HBase EDP Jobs with common HBase library on HDFS"),
required=False, initial=True)
adapt_oozie = forms.BooleanField(
label=_("Adapt For Oozie"),
help_text=_("Automatically modify the Hadoop configuration"
" so that job config values are set and so that"
" Oozie will handle exit codes correctly."),
required=False, initial=True)
def __init__(self, request, *args, **kwargs):
super(JobConfigAction, self).__init__(request, *args, **kwargs)
job_ex_id = request.REQUEST.get("job_execution_id")
@@ -209,6 +217,9 @@ class JobConfigAction(workflows.Action):
if self.EDP_HBASE_COMMON_LIB in edp_configs:
self.fields['hbase_common_lib'].initial = (
edp_configs[self.EDP_HBASE_COMMON_LIB])
if self.EDP_ADAPT_FOR_OOZIE in edp_configs:
self.fields['adapt_oozie'].initial = (
edp_configs[self.EDP_ADAPT_FOR_OOZIE])
def clean(self):
cleaned_data = super(workflows.Action, self).clean()
@@ -243,7 +254,8 @@ class JobConfigAction(workflows.Action):
self.EDP_MAPPER,
self.EDP_REDUCER,
self.MAIN_CLASS,
self.JAVA_OPTS]:
self.JAVA_OPTS,
self.EDP_ADAPT_FOR_OOZIE, ]:
del configs[rmkey]
return (configs, edp_configs)
@@ -304,6 +316,10 @@ class JobConfig(workflows.Step):
context["job_config"]["configs"][
JobConfigAction.EDP_HBASE_COMMON_LIB] = (
data.get("hbase_common_lib", True))
if job_type == "Java":
context["job_config"]["configs"][
JobConfigAction.EDP_ADAPT_FOR_OOZIE] = (
data.get("adapt_oozie", True))
elif job_type == "MapReduce.Streaming":
context["job_config"]["configs"][JobConfigAction.EDP_MAPPER] = (
data.get("streaming_mapper", ""))

View File

@@ -455,7 +455,8 @@ def data(TEST):
"name": "pigjob",
"tenant_id": "429ad8447c2d47bc8e0382d244e1d1df",
"type": "Pig",
"updated_at": None
"updated_at": None,
"job_config": {"configs": {}}
}
job1 = jobs.Job(jobs.JobsManager(None), job1_dict)