First version of job manager

Change-Id: Iedd7141a7bd2f3f1fba4165ae8ebe5c4a0b86538
Nadya Privalova 2013-08-15 17:06:36 +04:00
parent d8739a6d07
commit 28f9bae334
15 changed files with 425 additions and 32 deletions

View File

@@ -14,6 +14,7 @@
# limitations under the License.
from savanna.openstack.common import log as logging
from savanna.service import api as c_api
from savanna.service.edp import api
from savanna.service import validation as v
from savanna.service.validations.edp import data_source as v_d_s
@@ -54,12 +55,18 @@ def job_delete(job_id):
return u.render()
@rest.post('/jobs/<job_id>/execute/<input_id>/<output_id>')
#TODO(nprivalova): path will be updated and data will contain
# params for the job. For this purpose we
# need strong validation. Will be done in the next commit
@rest.post('/jobs/<job_id>/execute/from/<input_id>/to/<output_id>/on/'
'<cluster_id>')
@v.check_exists(api.get_job, id='job_id')
@v.check_exists(api.get_data_source, id='input_id')
@v.check_exists(api.get_data_source, id='output_id')
def job_execute(job_id, input_id, output_id):
return u.render(jobs=api.execute_job(job_id, input_id, output_id))
@v.check_exists(c_api.get_cluster, 'cluster_id')
def job_execute(job_id, input_id, output_id, cluster_id, data):
return u.render(job_execution=api.execute_job(job_id, input_id, output_id,
cluster_id).to_dict())
@rest.get('/data-sources')
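
A minimal sketch of calling the reworked execute route from a client. Only the route shape comes from the decorators above; the endpoint URL, API version prefix, tenant id and auth token are assumptions made for illustration.

import json

import requests

# Hypothetical values -- only the route layout is taken from the decorator.
savanna_url = 'http://localhost:8386/v1.1/<tenant-id>'
headers = {'X-Auth-Token': '<keystone-token>',
           'Content-Type': 'application/json'}

resp = requests.post(
    '%s/jobs/%s/execute/from/%s/to/%s/on/%s'
    % (savanna_url, '<job-id>', '<input-id>', '<output-id>', '<cluster-id>'),
    headers=headers, data=json.dumps({}))
# The handler renders the created JobExecution under the 'job_execution' key.
print(resp.json()['job_execution'])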

View File

@@ -246,8 +246,9 @@ class LocalApi(object):
def job_execution_update(self, context, job_execution, values):
"""Update the JobExecution or raise if it does not exist."""
self._manager.job_execution_update(context, _get_id(job_execution),
values)
return self._manager.job_execution_update(context,
_get_id(job_execution),
values)
def job_execution_destroy(self, context, job_execution):
"""Destroy the JobExecution or raise if it does not exist."""
@@ -272,7 +273,8 @@ class LocalApi(object):
def job_origin_update(self, context, job_origin, values):
"""Update the JobOrigin or raise if it does not exist."""
self._manager.job_origin_update(context, _get_id(job_origin), values)
return self._manager.job_origin_update(context, _get_id(job_origin),
values)
def job_origin_destroy(self, context, job_origin):
"""Destroy the JobOrigin or raise if it does not exist."""

View File

@@ -282,6 +282,7 @@ class ConductorManager(db_base.Base):
def job_execution_create(self, context, values):
"""Create a JobExecution from the values dictionary."""
values['tenant_id'] = context.tenant_id
return self.db.job_execution_create(context, values)
def job_execution_update(self, context, job_execution, values):
@@ -310,7 +311,7 @@ class ConductorManager(db_base.Base):
def job_origin_update(self, context, job_origin, values):
"""Updates a JobOrigin from the values dictionary."""
self.db.job_origin_update(context, job_origin, values)
return self.db.job_origin_update(context, job_origin, values)
def job_origin_destroy(self, context, job_origin):
"""Destroy the JobOrigin or raise if it does not exist."""

View File

@@ -321,7 +321,7 @@ def job_origin_create(context, values):
def job_origin_update(context, job_origin, values):
"""Update a JobOrigin from the values dictionary."""
IMPL.job_origin_update(context, job_origin, values)
return IMPL.job_origin_update(context, job_origin, values)
def job_origin_destroy(context, job_origin):

View File

@@ -493,7 +493,9 @@ def job_execution_update(context, job_execution, values):
# raise not found error
raise RuntimeError("JobExecution not found!")
job_ex.update(values)
job_ex.save()
job_ex.save(session=session)
return _job_execution_get(context, job_execution)
def job_execution_destroy(context, job_execution_id):
@@ -547,6 +549,7 @@ def job_origin_update(context, job_origin, values):
raise RuntimeError("JobOrigin not found!")
job_origin.update(values)
job_origin.save()
return _job_origin_get(context, session, job_origin)
def job_origin_destroy(context, job_origin_id):

View File

@@ -18,6 +18,8 @@ from savanna import conductor as c
from savanna import context
from savanna.openstack.common import log as logging
from savanna.service.edp import job_manager as manager
conductor = c.API
LOG = logging.getLogger(__name__)
@@ -39,8 +41,11 @@ def delete_job(id):
conductor.job_destroy(context.ctx(), id)
def execute_job(job_id, input_id, output_id):
pass
def execute_job(job_id, input_id, output_id, cluster_id):
job_ex_dict = {'input_id': input_id, 'output_id': output_id,
'job_id': job_id, 'cluster_id': cluster_id}
job_execution = conductor.job_execution_create(context.ctx(), job_ex_dict)
return manager.run_job(context.ctx(), job_execution)
def get_data_sources():

View File

@@ -0,0 +1,33 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
def put_file_to_hdfs(r, file, file_name, path):
r.write_file_to('/tmp/%s' % file_name, file)
move_from_local(r, '/tmp/%s' % file_name, path + '/' + file_name)
def copy_from_local(r, source, target):
r.execute_command('sudo su - -c "hadoop dfs -copyFromLocal '
'%s %s" hadoop' % (source, target))
def move_from_local(r, source, target):
r.execute_command('sudo su - -c "hadoop dfs -moveFromLocal '
'%s %s" hadoop' % (source, target))
def create_dir(r, dir_name):
r.execute_command('sudo su - -c "hadoop dfs -mkdir %s" hadoop' % dir_name)

View File

@@ -0,0 +1,162 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
from oslo.config import cfg
from savanna import conductor as c
from savanna import context
from savanna.openstack.common import uuidutils
from savanna.plugins.general import utils as u
from savanna.service.edp import hdfs_helper as h
from savanna.service.edp import oozie as o
from savanna.service.edp.workflow_creator import mapreduce_workflow as mr_flow
from savanna.service.edp.workflow_creator import pig_workflow as pig_flow
from savanna.utils import remote
from savanna.utils import xmlutils as x
opts = [
cfg.StrOpt('job_workflow_postfix',
default='',
help='Postfix for storing jobs in hdfs. Will be '
'added to /user/hadoop/')
]
CONF = cfg.CONF
CONF.register_opts(opts)
conductor = c.API
main_res_names = {'Pig': 'script.pig',
'Jar': 'main.jar'}
def run_job(ctx, job_execution):
cluster = conductor.cluster_get(ctx, job_execution.cluster_id)
if cluster.status != 'Active':
return job_execution.status
job = conductor.job_get(ctx, job_execution.job_id)
job_origin = conductor.job_origin_get(context.ctx(), job.job_origin_id)
input_source = conductor.data_source_get(ctx, job_execution.input_id)
output_source = conductor.data_source_get(ctx, job_execution.output_id)
#TODO(nprivalova): should be removed after all features implemented
validate(input_source, output_source, job)
wf_dir = create_workflow_dir(u.get_jobtracker(cluster), job)
upload_job_file(u.get_jobtracker(cluster), wf_dir, job_origin, job)
wf_xml = build_workflow_for_job(job.type, input_source,
output_source)
path_to_workflow = upload_workflow_file(u.get_jobtracker(cluster),
wf_dir, wf_xml)
jt_path = '%s:8021' % u.get_jobtracker(cluster).hostname
nn_path = 'hdfs://%s:8020' % u.get_namenode(cluster).hostname
client = o.OozieClient(cluster['info']['JobFlow']['Oozie'] + "/oozie/")
job_parameters = {"jobTracker": jt_path,
"nameNode": nn_path,
"user.name": "hadoop",
"oozie.wf.application.path":
"%s%s" % (nn_path, path_to_workflow),
"oozie.use.system.libpath": "true"}
oozie_job_id = client.add_job(x.create_hadoop_xml(job_parameters))
client.run_job(oozie_job_id)
job_execution = conductor.job_execution_update(ctx, job_execution,
{'oozie_job_id':
oozie_job_id,
'start_time':
datetime.datetime.now()})
return job_execution
def upload_job_file(where, job_dir, job_origin, job):
main_binary = conductor.job_binary_get_raw_data(context.ctx(),
job_origin.url)
if job.type == 'Jar':
job_dir += '/lib'
with remote.get_remote(where) as r:
h.put_file_to_hdfs(r, main_binary, main_res_names[job.type], job_dir)
return "%s/%s" % (job_dir, main_res_names[job.type])
def upload_workflow_file(where, job_dir, wf_xml):
with remote.get_remote(where) as r:
h.put_file_to_hdfs(r, wf_xml, "workflow.xml", job_dir)
return "%s/workflow.xml" % job_dir
def create_workflow_dir(where, job):
constructed_dir = '/user/hadoop/'
constructed_dir = _add_postfix(constructed_dir)
constructed_dir += '%s/%s' % (job.name, uuidutils.generate_uuid())
with remote.get_remote(where) as r:
h.create_dir(r, constructed_dir)
return constructed_dir
def build_workflow_for_job(job_type, input_data, output_data, data=None):
configs = {'fs.swift.service.savanna.username':
input_data.credentials['user'],
'fs.swift.service.savanna.password':
input_data.credentials['password']}
if job_type == 'Pig':
creator = pig_flow.PigWorkflowCreator()
creator.build_workflow_xml(main_res_names['Pig'],
configuration=configs,
params={'INPUT': input_data.url,
'OUTPUT': output_data.url})
if job_type == 'Jar':
creator = mr_flow.MapReduceWorkFlowCreator()
if data and data.get('configs'):
for k, v in data['configs'].items():
configs[k] = v
configs['mapred.input.dir'] = input_data.url
configs['mapred.output.dir'] = output_data.url
creator.build_workflow_xml(configuration=configs)
return creator.get_built_workflow_xml()
def _add_postfix(constructed_dir):
constructed_dir = _append_slash_if_needed(constructed_dir)
if CONF.job_workflow_postfix:
constructed_dir = ''.join([str(constructed_dir),
str(CONF.job_workflow_postfix)])
return _append_slash_if_needed(constructed_dir)
def _append_slash_if_needed(path):
if path[-1] != '/':
path += '/'
return path
#TODO(nprivalova): this validation should be removed after implementing
# all features
def validate(input_data, output_data, job):
if input_data.type != 'swift' or output_data.type != 'swift':
raise RuntimeError
if job.type not in ['Pig', 'Jar']:
raise RuntimeError
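
To make the flow above easier to follow, here is a hedged sketch of the HDFS layout and the Oozie properties that run_job() assembles for a Pig job; the host names, postfix value and generated UUID are illustrative only.

# With CONF.job_workflow_postfix = 'edp', create_workflow_dir() produces
# a directory such as:
#   /user/hadoop/edp/<job.name>/<generated-uuid>/
#       script.pig    <- upload_job_file() ('lib/main.jar' for 'Jar' jobs)
#       workflow.xml  <- upload_workflow_file()
#
# run_job() then submits roughly these properties to the Oozie client:
job_parameters = {
    'jobTracker': 'jobtracker-host:8021',       # illustrative hostname
    'nameNode': 'hdfs://namenode-host:8020',    # illustrative hostname
    'user.name': 'hadoop',
    'oozie.wf.application.path':
        'hdfs://namenode-host:8020/user/hadoop/edp/<job.name>/<uuid>',
    'oozie.use.system.libpath': 'true',
}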

View File

@@ -1,8 +1,6 @@
<workflow-app xmlns="uri:oozie:workflow:0.2" name="job-wf">
<start to="job-node"/>
<action name="job-node">
<ok to="end"/>
<error to="fail"/>
</action>
<kill name="fail">
<message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>

View File

@@ -28,13 +28,18 @@ class OozieWorkflowCreator(object):
def __init__(self, name):
self.doc = x.load_xml_document("service/edp/resources/workflow.xml")
self.tag_name = name
x.add_child(self.doc, 'action', self.tag_name)
def _add_jobtracker_namenode_elements(self, job_tracker, name_node):
ok_elem = xml.parseString('<%s to="%s"/>' % ("ok", "end"))
x.add_element(self.doc, 'action', ok_elem.firstChild)
error_elem = xml.parseString('<%s to="%s"/>' % ("error", "fail"))
x.add_element(self.doc, 'action', error_elem.firstChild)
x.add_text_element_to_tag(self.doc, self.tag_name,
'job-tracker', job_tracker)
'job-tracker', "${jobTracker}")
x.add_text_element_to_tag(self.doc, self.tag_name,
'name-node', name_node)
'name-node', "${nameNode}")
def _add_to_prepare_element(self, element, paths):
if element not in ['delete', 'mkdir']:
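
With these changes the ok/error transitions are appended via add_element() rather than living in the static template, and the job tracker and name node become Oozie parameters. A rough sketch of the action element the creator now produces for a Pig job, written as a Python string in the style of the tests below; element ordering and the omitted children are approximate.

expected_action_sketch = """
<action name="job-node">
    <pig>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <!-- prepare / configuration / script elements go here -->
    </pig>
    <ok to="end"/>
    <error to="fail"/>
</action>"""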

View File

@@ -21,12 +21,10 @@ class MapReduceWorkFlowCreator(base_workflow.OozieWorkflowCreator):
def __init__(self):
super(MapReduceWorkFlowCreator, self).__init__('map-reduce')
def build_workflow_xml(self, job_tracker, name_node, prepare={},
def build_workflow_xml(self, prepare={},
job_xml=None, configuration=None,
files=[], archives=[]):
self._add_jobtracker_namenode_elements(job_tracker, name_node)
for k, v in prepare.items():
self._add_to_prepare_element(k, v)

View File

@@ -22,12 +22,10 @@ class PigWorkflowCreator(base_workflow.OozieWorkflowCreator):
def __init__(self):
super(PigWorkflowCreator, self).__init__('pig')
def build_workflow_xml(self, job_tracker, name_node, script, prepare={},
def build_workflow_xml(self, script, prepare={},
job_xml=None, configuration=None, params={},
arguments={}, files=[], archives=[]):
self._add_jobtracker_namenode_elements(job_tracker, name_node)
for k, v in prepare.items():
self._add_to_prepare_element(k, v)
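
A usage sketch for the trimmed-down signature above: the job tracker and name node are no longer passed in, since the workflow now carries ${jobTracker}/${nameNode} placeholders resolved from the submission properties. The Swift credentials and URLs are illustrative.

from savanna.service.edp.workflow_creator import pig_workflow as pig_flow

creator = pig_flow.PigWorkflowCreator()
creator.build_workflow_xml(
    'script.pig',
    configuration={'fs.swift.service.savanna.username': 'admin',
                   'fs.swift.service.savanna.password': 'admin1'},
    params={'INPUT': 'swift://container.savanna/i',
            'OUTPUT': 'swift://container.savanna/o'})
print(creator.get_built_workflow_xml())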

View File

@@ -0,0 +1,179 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from savanna import conductor as cond
from savanna.conductor import resource as r
from savanna.service.edp import job_manager
from savanna.tests.unit import base as models_test_base
from savanna.utils import patches as p
conductor = cond.API
def _resource_passthrough(*args, **kwargs):
return True
class TestJobManager(models_test_base.DbTestCase):
def setUp(self):
r.Resource._is_passthrough_type = _resource_passthrough
p.patch_minidom_writexml()
super(TestJobManager, self).setUp()
@mock.patch('savanna.utils.remote.get_remote')
@mock.patch('savanna.service.edp.hdfs_helper.create_dir')
@mock.patch('savanna.utils.remote.InstanceInteropHelper')
def test_create_job_dir(self, remote_class, helper, remote):
remote_class.__exit__.return_value = 'closed'
remote.return_value = remote_class
helper.return_value = 'ok'
job = _create_all_stack('Pig')[0]
res = job_manager.create_workflow_dir(mock.Mock(), job)
self.assertIn('/user/hadoop/special_name/', res)
remote.reset_mock()
remote_class.reset_mock()
helper.reset_mock()
@mock.patch('savanna.utils.remote.get_remote')
@mock.patch('savanna.service.edp.hdfs_helper.put_file_to_hdfs')
@mock.patch('savanna.utils.remote.InstanceInteropHelper')
@mock.patch('savanna.conductor.API.job_binary_get_raw_data')
def test_upload_job_file(self, conductor_raw_data, remote_class,
helper, remote):
remote_class.__exit__.return_value = 'closed'
remote.return_value = remote_class
helper.return_value = 'ok'
conductor_raw_data.return_value = 'ok'
job, job_origin = _create_all_stack('Pig')
res = job_manager.upload_job_file(mock.Mock(), 'job_prefix',
job_origin, job)
self.assertEqual('job_prefix/script.pig', res)
job, job_origin = _create_all_stack('Jar')
res = job_manager.upload_job_file(mock.Mock(), 'job_prefix',
job_origin, job)
self.assertEqual('job_prefix/lib/main.jar', res)
remote.reset_mock()
remote_class.reset_mock()
helper.reset_mock()
@mock.patch('oslo.config.cfg.CONF.job_workflow_postfix')
def test_add_postfix(self, conf):
conf.__str__.return_value = 'caba'
res = job_manager._add_postfix('aba')
self.assertEqual("aba/caba/", res)
conf.__str__.return_value = ''
res = job_manager._add_postfix('aba')
self.assertEqual("aba/", res)
conf.reset_mock()
def test_build_workflow_for_job_pig(self):
input_data = _create_data_source('swift://ex.savanna/i')
output_data = _create_data_source('swift://ex.savanna/o')
res = job_manager.build_workflow_for_job('Pig', input_data,
output_data)
self.assertIn("""
<param>INPUT=swift://ex.savanna/i</param>
<param>OUTPUT=swift://ex.savanna/o</param>""", res)
self.assertIn("""
<configuration>
<property>
<name>fs.swift.service.savanna.password</name>
<value>admin1</value>
</property>
<property>
<name>fs.swift.service.savanna.username</name>
<value>admin</value>
</property>
</configuration>""", res)
self.assertIn("<script>script.pig</script>", res)
def test_build_workflow_for_job_jar(self):
input_data = _create_data_source('swift://ex.savanna/i')
output_data = _create_data_source('swift://ex.savanna/o')
res = job_manager.build_workflow_for_job('Jar', input_data,
output_data)
self.assertIn("""
<configuration>
<property>
<name>mapred.output.dir</name>
<value>swift://ex.savanna/o</value>
</property>
<property>
<name>mapred.input.dir</name>
<value>swift://ex.savanna/i</value>
</property>
<property>
<name>fs.swift.service.savanna.password</name>
<value>admin1</value>
</property>
<property>
<name>fs.swift.service.savanna.username</name>
<value>admin</value>
</property>
</configuration>""", res)
def _create_all_stack(type):
b = _create_job_binary('1')
o = _create_job_origin('2', b.id)
j = _create_job('3', o.id, type)
return j, o
def _create_job(id, origin_id, type):
job = mock.Mock()
job.id = id
job.job_origin_id = origin_id
job.type = type
job.name = 'special_name'
return job
def _create_job_origin(id, binary_id):
origin = mock.Mock()
origin.id = id
origin.url = binary_id
return origin
def _create_job_binary(id):
binary = mock.Mock()
binary.id = id
return binary
def _create_data_source(url):
data_source = mock.Mock()
data_source.url = url
data_source.credentials = {'user': 'admin',
'password': 'admin1'}
return data_source

View File

@@ -24,8 +24,6 @@ class TestPigWorkflowCreator(unittest2.TestCase):
def setUp(self):
p.patch_minidom_writexml()
self.job_tracker = 'job-tracker-host:8021'
self.name_node = 'hdfs://name-node-host:8020'
self.prepare = {'delete': ['delete_dir_1', 'delete_dir_2'],
'mkdir': ['mkdir_1']}
self.job_xml = 'job_xml.xml'
@@ -36,14 +34,13 @@ class TestPigWorkflowCreator(unittest2.TestCase):
def test_create_mapreduce_workflow(self):
mr_workflow = mrw.MapReduceWorkFlowCreator()
mr_workflow.build_workflow_xml(self.job_tracker, self.name_node,
self.prepare, self.job_xml,
mr_workflow.build_workflow_xml(self.prepare, self.job_xml,
self.configuration, self.files,
self.archives)
res = mr_workflow.get_built_workflow_xml()
mr_action = """ <map-reduce>
<job-tracker>job-tracker-host:8021</job-tracker>
<name-node>hdfs://name-node-host:8020</name-node>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<mkdir path="mkdir_1"/>
<delete path="delete_dir_1"/>
@@ -74,15 +71,14 @@ class TestPigWorkflowCreator(unittest2.TestCase):
param_dict = {'param1': 'param_value1'}
arg_dict = {'arg1': 'arg_value1', 'arg2': 'arg_value2'}
pig_workflow.build_workflow_xml(self.job_tracker, self.name_node,
pig_script, self.prepare,
pig_workflow.build_workflow_xml(pig_script, self.prepare,
self.job_xml, self.configuration,
param_dict, arg_dict,
self.files, self.archives)
res = pig_workflow.get_built_workflow_xml()
pig_action = """ <pig>
<job-tracker>job-tracker-host:8021</job-tracker>
<name-node>hdfs://name-node-host:8020</name-node>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<mkdir path="mkdir_1"/>
<delete path="delete_dir_1"/>

View File

@@ -95,6 +95,12 @@ def add_child(doc, parent, tag_to_add):
return actions[0].lastChild
def add_element(doc, parent, element):
actions = doc.getElementsByTagName(parent)
actions[0].appendChild(element)
return actions[0].lastChild
def get_and_create_if_not_exist(doc, parent, element):
prop = doc.getElementsByTagName(element)
if len(prop) != 0: