Refactoring job execution flow

Done:
* Job is removed
* Job.type moved to JobOrigin
* JobOrigin renamed to Job

Change-Id: I398e1cfad0ff691e3df8858992540a9de9e34927
Nadya Privalova 2013-09-24 13:23:52 +04:00
parent 9a928730ae
commit 363feae05b
16 changed files with 231 additions and 595 deletions
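In effect, the old two-step flow (create a JobOrigin carrying the binaries, then a Job referencing it through job_origin_id) collapses into a single Job resource. A rough before/after sketch of the REST request bodies, with invented values and secondary fields elided:

# Before this commit: two resources (illustrative payloads only).
job_origin_body = {
    "name": "wordcount-bins",             # invented
    "mains": ["<job-binary-uuid>"],       # binaries lived on the JobOrigin
}
job_body = {
    "name": "wordcount",                  # invented
    "type": "Pig",
    "job_origin_id": "<job-origin-uuid>",
}

# After this commit: one resource; POST /jobs carries the binaries itself.
job_body = {
    "name": "wordcount",
    "type": "Pig",                        # type moved onto the (renamed) Job
    "mains": ["<job-binary-uuid>"],
    "libs": [],
}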

View File

@ -21,7 +21,6 @@ from savanna.service.validations.edp import job as v_j
from savanna.service.validations.edp import job_binary as v_j_b
from savanna.service.validations.edp import job_binary_internal as v_j_b_i
from savanna.service.validations.edp import job_executor as v_j_e
from savanna.service.validations.edp import job_origin as v_j_o
import savanna.utils.api as u
@ -32,31 +31,6 @@ rest = u.Rest('v11', __name__)
## EDP ops
@rest.get('/jobs')
def jobs_list():
return u.render(
jobs=[j.to_dict() for j in api.get_jobs()])
@rest.post('/jobs')
@v.validate(v_j.JOB_SCHEMA, v_j.check_job_create)
def job_create(data):
return u.render(api.create_job(data).to_wrapped_dict())
@rest.get('/jobs/<job_id>')
@v.check_exists(api.get_job, id='job_id')
def job_get(job_id):
return u.render(api.get_job(job_id).to_wrapped_dict())
@rest.delete('/jobs/<job_id>')
@v.check_exists(api.get_job, id='job_id')
def job_delete(job_id):
api.delete_job(job_id)
return u.render()
@rest.post('/jobs/<job_id>/execute')
@v.check_exists(api.get_job, id='job_id')
@v.validate(v_j_e.JOB_EXEC_SCHEMA, v_j_e.check_job_executor)
@ -129,27 +103,27 @@ def data_source_delete(data_source_id):
return u.render()
@rest.get('/job-origins')
def job_origin_list():
return u.render(job_origins=[j.to_dict() for j in api.get_job_origins()])
@rest.get('/jobs')
def job_list():
return u.render(jobs=[j.to_dict() for j in api.get_jobs()])
@rest.post('/job-origins')
@v.validate(v_j_o.JOB_ORIGIN_SCHEMA, v_j_o.check_mains_libs)
def job_origin_create(data):
return u.render(api.create_job_origin(data).to_wrapped_dict())
@rest.post('/jobs')
@v.validate(v_j.JOB_SCHEMA, v_j.check_mains_libs)
def job_create(data):
return u.render(api.create_job(data).to_wrapped_dict())
@rest.get('/job-origins/<job_origin_id>')
@v.check_exists(api.get_job_origin, id='job_origin_id')
def job_origin_get(job_origin_id):
return u.render(api.get_job_origin(job_origin_id).to_wrapped_dict())
@rest.get('/jobs/<job_id>')
@v.check_exists(api.get_job, id='job_id')
def job_get(job_id):
return u.render(api.get_job(job_id).to_wrapped_dict())
@rest.delete('/job-origins/<job_origin_id>')
@v.check_exists(api.get_job_origin, id='job_origin_id')
def job_origin_delete(job_origin_id):
api.delete_job_origin(job_origin_id)
@rest.delete('/jobs/<job_id>')
@v.check_exists(api.get_job, id='job_id')
def job_delete(job_id):
api.delete_job(job_id)
return u.render()

View File

@ -217,27 +217,6 @@ class LocalApi(object):
"""Destroy the Data Source or raise if it does not exist."""
self._manager.data_source_destroy(context, _get_id(data_source))
## Job ops
@r.wrap(r.Job)
def job_get(self, context, job):
"""Return the Job or None if it does not exist."""
return self._manager.job_get(context, _get_id(job))
@r.wrap(r.Job)
def job_get_all(self, context):
"""Get all Jobs."""
return self._manager.job_get_all(context)
@r.wrap(r.Job)
def job_create(self, context, values):
"""Create a Job from the values dictionary."""
return self._manager.job_create(context, values)
def job_destroy(self, context, job):
"""Destroy the Job or raise if it does not exist."""
self._manager.job_destroy(context, _get_id(job))
## JobExecution ops
@r.wrap(r.JobExecution)
@ -275,43 +254,43 @@ class LocalApi(object):
"""Destroy the JobExecution or raise if it does not exist."""
self._manager.job_execution_destroy(context, _get_id(job_execution))
## JobOrigin ops
## Job ops
@r.wrap(r.JobOrigin)
def job_origin_get(self, context, job_origin):
"""Return the JobOrigin or None if it does not exist."""
return self._manager.job_origin_get(context, _get_id(job_origin))
@r.wrap(r.Job)
def job_get(self, context, job):
"""Return the Job or None if it does not exist."""
return self._manager.job_get(context, _get_id(job))
@r.wrap(r.JobOrigin)
def job_origin_get_all(self, context):
"""Get all JobOrigins."""
return self._manager.job_origin_get_all(context)
@r.wrap(r.Job)
def job_get_all(self, context):
"""Get all Jobs."""
return self._manager.job_get_all(context)
@r.wrap(r.JobOrigin)
def job_origin_create(self, context, values):
"""Create a JobOrigin from the values dictionary."""
return self._manager.job_origin_create(context, values)
@r.wrap(r.Job)
def job_create(self, context, values):
"""Create a Job from the values dictionary."""
return self._manager.job_create(context, values)
def job_origin_update(self, context, job_origin, values):
"""Update the JobOrigin or raise if it does not exist."""
return self._manager.job_origin_update(context, _get_id(job_origin),
values)
def job_update(self, context, job, values):
"""Update the Job or raise if it does not exist."""
return self._manager.job_update(context, _get_id(job),
values)
def job_origin_destroy(self, context, job_origin):
"""Destroy the JobOrigin or raise if it does not exist."""
self._manager.job_origin_destroy(context, _get_id(job_origin))
def job_destroy(self, context, job):
"""Destroy the Job or raise if it does not exist."""
self._manager.job_destroy(context, _get_id(job))
def job_origin_main_name(self, context, job_origin):
def job_main_name(self, context, job):
"""Return the name of the first main JobBinary or None
At present the 'mains' element is expected to contain a single element.
In the future if 'mains' contains more than one element we will need
a scheme or convention for retrieving a name from the list of binaries.
:param job_origin: This is expected to be a JobOrigin object
:param job: This is expected to be a Job object
"""
if job_origin.mains:
binary = self.job_binary_get(context, job_origin.mains[0])
if job.mains:
binary = self.job_binary_get(context, job.mains[0])
if binary is not None:
return binary["name"]
return None
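job_main_name is the renamed job_origin_main_name; the Pig and Hive workflow factories further down call it through the conductor API to recover the script name. A minimal call-site sketch (the None handling is invented; the factories themselves assume a name comes back):

# Mirrors PigFactory/HiveFactory.get_script_name later in this diff.
name = conductor.job_main_name(context.ctx(), job)
if name is None:
    # job.mains is empty or the referenced JobBinary row is gone
    raise RuntimeError("job has no main script binary")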

View File

@ -267,26 +267,6 @@ class ConductorManager(db_base.Base):
"""Destroy the Data Source or raise if it does not exist."""
return self.db.data_source_destroy(context, data_source)
##Job ops
def job_get(self, context, job):
"""Return the Job or None if it does not exist."""
return self.db.job_get(context, job)
def job_get_all(self, context):
"""Get all Jobs."""
return self.db.job_get_all(context)
def job_create(self, context, values):
"""Create a Job from the values dictionary."""
values = copy.deepcopy(values)
values['tenant_id'] = context.tenant_id
return self.db.job_create(context, values)
def job_destroy(self, context, job):
"""Destroy the Job or raise if it does not exist."""
return self.db.job_destroy(context, job)
##JobExecution ops
def job_execution_get(self, context, job_execution):
@ -309,11 +289,6 @@ class ConductorManager(db_base.Base):
"""Create a JobExecution from the values dictionary."""
values = copy.deepcopy(values)
values['tenant_id'] = context.tenant_id
job = self.job_get(context, values['job_id'])
configs = job['job_configs']
if configs:
configs.update(values.get('job_configs', {}))
values['job_configs'] = configs
return self.db.job_execution_create(context, values)
def job_execution_update(self, context, job_execution, values):
@ -324,29 +299,29 @@ class ConductorManager(db_base.Base):
"""Destroy the JobExecution or raise if it does not exist."""
return self.db.job_execution_destroy(context, job_execution)
## JobOrigin ops
## Job ops
def job_origin_get(self, context, job_origin):
"""Return the JobOrigin or None if it does not exist."""
return self.db.job_origin_get(context, job_origin)
def job_get(self, context, job):
"""Return the Job or None if it does not exist."""
return self.db.job_get(context, job)
def job_origin_get_all(self, context):
"""Get all JobOrigins."""
return self.db.job_origin_get_all(context)
def job_get_all(self, context):
"""Get all Jobs."""
return self.db.job_get_all(context)
def job_origin_create(self, context, values):
"""Create a JobOrigin from the values dictionary."""
def job_create(self, context, values):
"""Create a Job from the values dictionary."""
values = copy.deepcopy(values)
values['tenant_id'] = context.tenant_id
return self.db.job_origin_create(context, values)
return self.db.job_create(context, values)
def job_origin_update(self, context, job_origin, values):
"""Updates a JobOrigin from the values dictionary."""
return self.db.job_origin_update(context, job_origin, values)
def job_update(self, context, job, values):
"""Updates a Job from the values dictionary."""
return self.db.job_update(context, job, values)
def job_origin_destroy(self, context, job_origin):
"""Destroy the JobOrigin or raise if it does not exist."""
self.db.job_origin_destroy(context, job_origin)
def job_destroy(self, context, job):
"""Destroy the Job or raise if it does not exist."""
self.db.job_destroy(context, job)
## JobBinary ops

View File

@ -187,20 +187,6 @@ class DataSource(object):
"""
class Job(object):
"""An object representing Job
id
tenant_id
name
description
type
job_origin_id
input_type
output_type
"""
class JobExecution(object):
"""An object representing JobExecution
@ -219,13 +205,14 @@ class JobExecution(object):
"""
class JobOrigin(object):
"""An object representing JobOrigin
class Job(object):
"""An object representing Job
id
tenant_id
name
description
type
mains
libs
"""

View File

@ -209,16 +209,12 @@ class DataSource(Resource, objects.DataSource):
_resource_name = "data_source"
class Job(Resource, objects.Job):
_resource_name = "job"
class JobExecution(Resource, objects.JobExecution):
_resource_name = "job_execution"
class JobOrigin(Resource, objects.JobOrigin):
_resource_name = "job_origin"
class Job(Resource, objects.Job):
_resource_name = "job"
class JobBinary(Resource, objects.JobBinary):

View File

@ -258,31 +258,6 @@ def data_source_destroy(context, data_source):
IMPL.data_source_destroy(context, data_source)
## Jobs ops
@to_dict
def job_get(context, job):
"""Return the Job or None if it does not exist."""
return IMPL.job_get(context, job)
@to_dict
def job_get_all(context):
"""Get all Jobs."""
return IMPL.job_get_all(context)
@to_dict
def job_create(context, values):
"""Create a Job from the values dictionary."""
return IMPL.job_create(context, values)
def job_destroy(context, job):
"""Destroy the Job or raise if it does not exist."""
IMPL.job_destroy(context, job)
## JobExecutions ops
@to_dict
@ -323,34 +298,34 @@ def job_execution_destroy(context, job_execution):
IMPL.job_execution_destroy(context, job_execution)
## JobOrigin ops
## Job ops
@to_dict
def job_origin_get(context, job_origin):
"""Return the JobOrigin or None if it does not exist."""
return IMPL.job_origin_get(context, job_origin)
def job_get(context, job):
"""Return the Job or None if it does not exist."""
return IMPL.job_get(context, job)
@to_dict
def job_origin_get_all(context):
"""Get all JobOrigins."""
return IMPL.job_origin_get_all(context)
def job_get_all(context):
"""Get all Jobs."""
return IMPL.job_get_all(context)
@to_dict
def job_origin_create(context, values):
"""Create a JobOrigin from the values dictionary."""
return IMPL.job_origin_create(context, values)
def job_create(context, values):
"""Create a Job from the values dictionary."""
return IMPL.job_create(context, values)
def job_origin_update(context, job_origin, values):
"""Update a JobOrigin from the values dictionary."""
return IMPL.job_origin_update(context, job_origin, values)
def job_update(context, job, values):
"""Update a Job from the values dictionary."""
return IMPL.job_update(context, job, values)
def job_origin_destroy(context, job_origin):
"""Destroy the JobOrigin or raise if it does not exist."""
IMPL.job_origin_destroy(context, job_origin)
def job_destroy(context, job):
"""Destroy the Job or raise if it does not exist."""
IMPL.job_destroy(context, job)
@to_dict

View File

@ -437,45 +437,6 @@ def data_source_destroy(context, data_source_id):
session.delete(data_source)
## Job ops
def _job_get(context, session, job_id):
query = model_query(m.Job, context, session)
return query.filter_by(id=job_id).first()
def job_get(context, job_id):
return _job_get(context, get_session(), job_id)
def job_get_all(context):
query = model_query(m.Job, context)
return query.all()
def job_create(context, values):
job = m.Job()
job.update(values)
try:
job.save()
except db_exc.DBDuplicateEntry as e:
raise ex.DBDuplicateEntry("Duplicate entry for Job: %s" % e.columns)
return job
def job_destroy(context, job_id):
session = get_session()
with session.begin():
job = _job_get(context, session, job_id)
if not job:
raise ex.NotFoundException(job_id, "Job id '%s' not found!")
session.delete(job)
## JobExecution ops
def _job_execution_get(context, session, job_execution_id):
@ -537,81 +498,81 @@ def job_execution_destroy(context, job_execution_id):
session.delete(job_ex)
## JobOrigin ops
## Job ops
def _job_origin_get(context, session, job_origin_id):
query = model_query(m.JobOrigin, context, session)
return query.filter_by(id=job_origin_id).first()
def _job_get(context, session, job_id):
query = model_query(m.Job, context, session)
return query.filter_by(id=job_id).first()
def job_origin_get(context, job_origin_id):
return _job_origin_get(context, get_session(), job_origin_id)
def job_get(context, job_id):
return _job_get(context, get_session(), job_id)
def job_origin_get_all(context):
query = model_query(m.JobOrigin, context)
def job_get_all(context):
query = model_query(m.Job, context)
return query.all()
def job_origin_create(context, values):
def job_create(context, values):
mains = values.pop("mains", [])
libs = values.pop("libs", [])
session = get_session()
with session.begin():
job_origin = m.JobOrigin()
job_origin.update(values)
job = m.Job()
job.update(values)
# libs and mains are 'lazy' objects. The initialization below
# is needed because it ensures libs and mains are initialized
# within a session even if the lists are empty
job_origin.mains = []
job_origin.libs = []
job.mains = []
job.libs = []
try:
for main in mains:
query = model_query(m.JobBinary,
context, session).filter_by(id=main)
job_binary = query.first()
if job_binary is not None:
job_origin.mains.append(job_binary)
job.mains.append(job_binary)
for lib in libs:
query = model_query(m.JobBinary,
context, session).filter_by(id=lib)
job_binary = query.first()
if job_binary is not None:
job_origin.libs.append(job_binary)
job.libs.append(job_binary)
job_origin.save(session=session)
job.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise ex.DBDuplicateEntry("Duplicate entry for JobOrigin: %s"
raise ex.DBDuplicateEntry("Duplicate entry for Job: %s"
% e.columns)
return job_origin
return job
def job_origin_update(context, job_origin_id, values):
def job_update(context, job_id, values):
session = get_session()
with session.begin():
job_origin = _job_origin_get(context, session, job_origin_id)
if not job_origin:
raise ex.NotFoundException(job_origin_id,
"JobOrigin id '%s' not found!")
job_origin.update(values)
job = _job_get(context, session, job_id)
if not job:
raise ex.NotFoundException(job_id,
"Job id '%s' not found!")
job.update(values)
return job_origin
return job
def job_origin_destroy(context, job_origin_id):
def job_destroy(context, job_id):
session = get_session()
with session.begin():
job_origin = _job_origin_get(context, session, job_origin_id)
job = _job_get(context, session, job_id)
if not job_origin:
raise ex.NotFoundException(job_origin_id,
"JobOrigin id '%s' not found!")
if not job:
raise ex.NotFoundException(job_id,
"Job id '%s' not found!")
session.delete(job_origin)
session.delete(job)
## JobBinary ops

View File

@ -214,7 +214,7 @@ class TemplatesRelation(mb.SavannaBase):
floating_ip_pool = sa.Column(sa.String(36))
## EDP objects: DataSource, JobOrigin, Job, Job Execution, JobBinary
## EDP objects: DataSource, Job, Job Execution, JobBinary
class DataSource(mb.SavannaBase):
"""DataSource - represent a diffident types of data source,
@ -236,29 +236,6 @@ class DataSource(mb.SavannaBase):
credentials = sa.Column(st.JsonDictType())
class Job(mb.SavannaBase):
"""Job - represent a job object, to start job
user should provide a valid data input and output.
"""
__tablename__ = 'jobs'
__table_args__ = (
sa.UniqueConstraint('name', 'tenant_id'),
)
id = _id_column()
tenant_id = sa.Column(sa.String(36))
name = sa.Column(sa.String(80), nullable=False)
description = sa.Column(sa.Text())
type = sa.Column(sa.String(80), nullable=False)
job_origin_id = sa.Column(sa.String(36),
sa.ForeignKey('job_origins.id'))
input_type = sa.Column(sa.String(80), nullable=False)
output_type = sa.Column(sa.String(80), nullable=False)
job_configs = sa.Column(st.JsonDictType())
class JobExecution(mb.SavannaBase):
"""JobExecution - represent a job execution of specific cluster
"""
@ -285,9 +262,9 @@ class JobExecution(mb.SavannaBase):
mains_association = sa.Table("mains_association",
mb.SavannaBase.metadata,
sa.Column("JobOrigin_id",
sa.Column("Job_id",
sa.String(36),
sa.ForeignKey("job_origins.id")),
sa.ForeignKey("jobs.id")),
sa.Column("JobBinary_id",
sa.String(36),
sa.ForeignKey("job_binaries.id"))
@ -296,20 +273,20 @@ mains_association = sa.Table("mains_association",
libs_association = sa.Table("libs_association",
mb.SavannaBase.metadata,
sa.Column("JobOrigin_id",
sa.Column("Job_id",
sa.String(36),
sa.ForeignKey("job_origins.id")),
sa.ForeignKey("jobs.id")),
sa.Column("JobBinary_id",
sa.String(36),
sa.ForeignKey("job_binaries.id"))
)
class JobOrigin(mb.SavannaBase):
"""JobOrigin - description and location of a job binary
class Job(mb.SavannaBase):
"""Job - description and location of a job binary
"""
__tablename__ = 'job_origins'
__tablename__ = 'jobs'
__table_args__ = (
sa.UniqueConstraint('name', 'tenant_id'),
@ -319,6 +296,7 @@ class JobOrigin(mb.SavannaBase):
tenant_id = sa.Column(sa.String(36))
name = sa.Column(sa.String(80), nullable=False)
description = sa.Column(sa.Text())
type = sa.Column(sa.String(80), nullable=False)
mains = relationship("JobBinary",
secondary=mains_association, lazy="joined")
@ -327,7 +305,7 @@ class JobOrigin(mb.SavannaBase):
secondary=libs_association, lazy="joined")
def to_dict(self):
d = super(JobOrigin, self).to_dict()
d = super(Job, self).to_dict()
d['mains'] = [jb.to_dict() for jb in self.mains]
d['libs'] = [jb.to_dict() for jb in self.libs]
return d
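With mains_association and libs_association repointed at jobs.id, to_dict() inlines the joined binaries. A sketch of the resulting dictionary shape under the columns above (values invented, JobBinary fields abbreviated, not taken from the diff):

# Illustrative Job.to_dict() output.
{
    "id": "<job-uuid>",
    "tenant_id": "<tenant>",
    "name": "wordcount",
    "description": "",
    "type": "Pig",
    "mains": [{"id": "<binary-uuid>", "name": "script.pig"}],
    "libs": [],
}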

View File

@ -25,22 +25,6 @@ conductor = c.API
LOG = logging.getLogger(__name__)
def get_jobs():
return conductor.job_get_all(context.ctx())
def get_job(id):
return conductor.job_get(context.ctx(), id)
def create_job(values):
return conductor.job_create(context.ctx(), values)
def delete_job(id):
conductor.job_destroy(context.ctx(), id)
def execute_job(job_id, input_id, output_id, cluster_id, configs):
job_ex_dict = {'input_id': input_id, 'output_id': output_id,
'job_id': job_id, 'cluster_id': cluster_id,
@ -85,20 +69,20 @@ def register_data_source(values):
return conductor.data_source_create(context.ctx(), values)
def get_job_origins():
return conductor.job_origin_get_all(context.ctx())
def get_jobs():
return conductor.job_get_all(context.ctx())
def get_job_origin(id):
return conductor.job_origin_get(context.ctx(), id)
def get_job(id):
return conductor.job_get(context.ctx(), id)
def create_job_origin(values):
return conductor.job_origin_create(context.ctx(), values)
def create_job(values):
return conductor.job_create(context.ctx(), values)
def delete_job_origin(job_origin_id):
return conductor.job_origin_destroy(context.ctx(), job_origin_id)
def delete_job(job_id):
return conductor.job_destroy(context.ctx(), job_id)
def create_job_binary(values):

View File

@ -99,17 +99,16 @@ def run_job(ctx, job_execution):
if cluster.status != 'Active':
return job_execution
job = conductor.job_get(ctx, job_execution.job_id)
job_origin = conductor.job_origin_get(context.ctx(), job.job_origin_id)
job = conductor.job_get(context.ctx(), job_execution.job_id)
input_source = conductor.data_source_get(ctx, job_execution.input_id)
output_source = conductor.data_source_get(ctx, job_execution.output_id)
#TODO(nprivalova): should be removed after all features implemented
validate(input_source, output_source, job)
wf_dir = create_workflow_dir(u.get_jobtracker(cluster), job)
upload_job_files(u.get_jobtracker(cluster), wf_dir, job_origin)
upload_job_files(u.get_jobtracker(cluster), wf_dir, job)
creator = workflow_factory.get_creator(job.type, job_origin)
creator = workflow_factory.get_creator(job)
# Do other job type specific setup here, for example
# uploading hive configuration
@ -143,10 +142,10 @@ def run_job(ctx, job_execution):
return job_execution
def upload_job_files(where, job_dir, job_origin):
def upload_job_files(where, job_dir, job):
mains = job_origin.mains or []
libs = job_origin.libs or []
mains = job.mains or []
libs = job.libs or []
uploaded_paths = []
with remote.get_remote(where) as r:

View File

@ -59,13 +59,13 @@ class BaseFactory(object):
class PigFactory(BaseFactory):
def __init__(self, job_origin):
def __init__(self, job):
super(PigFactory, self).__init__()
self.name = self.get_script_name(job_origin)
self.name = self.get_script_name(job)
def get_script_name(self, job_origin):
return conductor.job_origin_main_name(context.ctx(), job_origin)
def get_script_name(self, job):
return conductor.job_main_name(context.ctx(), job)
def get_workflow_xml(self, execution_configs, input_data, output_data):
configs = {'configs': self.get_configs(input_data, output_data),
@ -81,14 +81,14 @@ class PigFactory(BaseFactory):
class HiveFactory(BaseFactory):
def __init__(self, job_origin):
def __init__(self, job):
super(HiveFactory, self).__init__()
self.name = self.get_script_name(job_origin)
self.name = self.get_script_name(job)
self.job_xml = "hive-site.xml"
def get_script_name(self, job_origin):
return conductor.job_origin_main_name(context.ctx(), job_origin)
def get_script_name(self, job):
return conductor.job_main_name(context.ctx(), job)
def get_workflow_xml(self, execution_configs, input_data, output_data):
configs = {'configs': self.get_configs(input_data, output_data),
@ -125,16 +125,16 @@ class MapReduceFactory(BaseFactory):
return creator.get_built_workflow_xml()
def get_creator(job_type, job_origin):
def get_creator(job):
def make_PigFactory():
return PigFactory(job_origin)
return PigFactory(job)
def make_HiveFactory():
return HiveFactory(job_origin)
return HiveFactory(job)
type_map = {"Pig": make_PigFactory,
"Hive": make_HiveFactory,
"Jar": MapReduceFactory}
return type_map[job_type]()
return type_map[job.type]()
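Since type now travels with the Job, callers pass only the job object. Usage as exercised by the job manager above and the tests below:

creator = workflow_factory.get_creator(job)   # job.type selects the factory
xml = creator.get_workflow_xml(job_exec.job_configs,
                               input_data, output_data)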

View File

@ -13,7 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import savanna.service.validations.edp.base as b
import savanna.exceptions as e
from savanna.service.edp import api
JOB_SCHEMA = {
"type": "object",
@ -31,27 +32,54 @@ JOB_SCHEMA = {
"enum": [
"Pig",
"Hive",
"Oozie",
"Jar",
"StreamingAPI"
],
},
"job_origin_id": {
"type": "string",
"format": "uuid",
"mains": {
"type": "array",
"minItems": 1,
"uniqueItems": True,
"items": {
"type": "string",
"minLength": 1,
}
},
"libs": {
"type": "array",
"uniqueItems": True,
"items": {
"type": "string",
"minLength": 1,
}
},
"input_type": b.data_source_type,
"output_type": b.data_source_type,
"job_configs": b.job_configs,
},
"additionalProperties": False,
"required": [
"name",
"type",
"job_origin_id"
]
}
def check_job_create(data, **kwargs):
b.check_job_unique_name(data['name'])
def _check_binaries(values):
for job_binary in values:
if not api.get_job_binary(job_binary):
raise e.NotFoundException(job_binary,
"Job binary '%s' does not exist")
def check_mains_libs(data, **kwargs):
mains = data.get("mains", [])
libs = data.get("libs", [])
# As a basic check, mains or libs has to be non-empty
if not (mains or libs):
raise e.InvalidDataException("'mains' or 'libs' must be non-empty")
# Check for overlap
if set(mains).intersection(set(libs)):
raise e.InvalidDataException("'mains' and 'libs' overlap")
# Make sure that all referenced binaries exist
_check_binaries(mains)
_check_binaries(libs)
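check_mains_libs, moved here from the deleted job_origin validator below, enforces three rules: at least one of mains/libs must be non-empty, the two lists must not overlap, and every referenced binary must exist. Illustrative calls with invented ids (assume 'bin-1' exists and 'bin-404' does not):

check_mains_libs({"name": "j", "type": "Pig", "mains": ["bin-1"]})  # passes

check_mains_libs({"name": "j", "type": "Pig"})
# InvalidDataException: 'mains' or 'libs' must be non-empty

check_mains_libs({"name": "j", "type": "Pig",
                  "mains": ["bin-1"], "libs": ["bin-1"]})
# InvalidDataException: 'mains' and 'libs' overlap

check_mains_libs({"name": "j", "type": "Pig", "mains": ["bin-404"]})
# NotFoundException: Job binary 'bin-404' does not exist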

View File

@ -1,76 +0,0 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import savanna.exceptions as e
from savanna.service.edp import api
JOB_ORIGIN_SCHEMA = {
"type": "object",
"properties": {
"name": {
"type": "string",
"minLength": 1,
"maxLength": 50
},
"description": {
"type": "string"
},
"mains": {
"type": "array",
"minItems": 1,
"uniqueItems": True,
"items": {
"type": "string",
"minLength": 1,
}
},
"libs": {
"type": "array",
"uniqueItems": True,
"items": {
"type": "string",
"minLength": 1,
}
},
},
"additionalProperties": False,
"required": [
"name",
]
}
def _check_binaries(values):
for job_binary in values:
if not api.get_job_binary(job_binary):
raise e.NotFoundException(job_binary,
"Job binary '%s' does not exist")
def check_mains_libs(data, **kwargs):
mains = data.get("mains", [])
libs = data.get("libs", [])
# As a basic check, mains or libs has to be non-empty
if not (mains or libs):
raise e.InvalidDataException("'mains' or 'libs' must be non-empty")
# Check for overlap
if set(mains).intersection(set(libs)):
raise e.InvalidDataException("'mains' and 'libs' overlap")
# Make sure that all referenced binaries exist
_check_binaries(mains)
_check_binaries(libs)

View File

@ -34,30 +34,9 @@ SAMPLE_DATA_SOURCE = {
SAMPLE_JOB = {
"tenant_id": "test_tenant",
"name": "ngt_test",
"description": "test_desc",
"type": "db",
"input_type": "swift",
"output_type": "swift"
}
SAMPLE_CONFIGURED_JOB = {
"tenant_id": "test_tenant",
"name": "ngt_test",
"description": "test_desc",
"type": "db",
"input_type": "swift",
"output_type": "swift",
"job_configs": {
"conf1": "value_j",
"conf2": "value_j"
}
}
SAMPLE_JOB_ORIGIN = {
"tenant_id": "test_tenant",
"name": "job_origin_test",
"name": "job_test",
"description": "test_desc",
"type": "Pig",
"mains": []
}
@ -90,7 +69,7 @@ BINARY_DATA = "vU}\x97\x1c\xdf\xa686\x08\xf2\tf\x0b\xb1}"
SAMPLE_JOB_BINARY_INTERNAL = {
"tenant_id": "test_tenant",
"name": "job_origin_test",
"name": "job_test",
"data": BINARY_DATA
}
@ -153,56 +132,6 @@ class DataSourceTest(test_base.ConductorManagerTestCase):
self.api.data_source_destroy(ctx, _id)
class JobTest(test_base.ConductorManagerTestCase):
def __init__(self, *args, **kwargs):
super(JobTest, self).__init__(
checks=[
lambda: SAMPLE_JOB
], *args, **kwargs)
def test_crud_operation_create_list_delete(self):
ctx = context.ctx()
self.api.job_create(ctx, SAMPLE_JOB)
lst = self.api.job_get_all(ctx)
self.assertEqual(len(lst), 1)
job_id = lst[0]['id']
self.api.job_destroy(ctx, job_id)
lst = self.api.job_get_all(ctx)
self.assertEqual(len(lst), 0)
def test_duplicate_job_create(self):
ctx = context.ctx()
self.api.job_create(ctx, SAMPLE_JOB)
with self.assertRaises(ex.DBDuplicateEntry):
self.api.job_create(ctx, SAMPLE_JOB)
def test_job_fields(self):
ctx = context.ctx()
ctx.tenant_id = SAMPLE_JOB['tenant_id']
job_db_obj_id = self.api.job_create(ctx, SAMPLE_JOB)['id']
job_db_obj = self.api.job_get(ctx, job_db_obj_id)
self.assertIsInstance(job_db_obj, dict)
for key, val in SAMPLE_JOB.items():
self.assertEqual(val, job_db_obj.get(key),
"Key not found %s" % key)
def test_job_delete(self):
ctx = context.ctx()
job_db_obj = self.api.job_create(ctx, SAMPLE_JOB)
_id = job_db_obj['id']
self.api.job_destroy(ctx, _id)
with self.assertRaises(ex.NotFoundException):
self.api.job_destroy(ctx, _id)
class JobExecutionTest(test_base.ConductorManagerTestCase):
def test_crud_operation_create_list_delete_update(self):
ctx = context.ctx()
@ -246,7 +175,7 @@ class JobExecutionTest(test_base.ConductorManagerTestCase):
def test_crud_operation_on_configured_jobs(self):
ctx = context.ctx()
job = self.api.job_create(ctx, SAMPLE_CONFIGURED_JOB)
job = self.api.job_create(ctx, SAMPLE_JOB)
ds_input = self.api.data_source_create(ctx, SAMPLE_DATA_SOURCE)
SAMPLE_DATA_OUTPUT = copy.copy(SAMPLE_DATA_SOURCE)
SAMPLE_DATA_OUTPUT['name'] = 'output'
@ -263,53 +192,51 @@ class JobExecutionTest(test_base.ConductorManagerTestCase):
job_ex = lst[0]
configs = {
'conf1': 'value_j',
'conf2': 'value_je',
'conf3': 'value_je'
}
self.assertEqual(configs, job_ex['job_configs'])
class JobOriginTest(test_base.ConductorManagerTestCase):
class JobTest(test_base.ConductorManagerTestCase):
def __init__(self, *args, **kwargs):
super(JobOriginTest, self).__init__(
super(JobTest, self).__init__(
checks=[
lambda: SAMPLE_JOB_ORIGIN
lambda: SAMPLE_JOB
], *args, **kwargs)
def test_crud_operation_create_list_delete_update(self):
ctx = context.ctx()
self.api.job_origin_create(ctx, SAMPLE_JOB_ORIGIN)
self.api.job_create(ctx, SAMPLE_JOB)
lst = self.api.job_origin_get_all(ctx)
lst = self.api.job_get_all(ctx)
self.assertEqual(len(lst), 1)
jo_id = lst[0]['id']
update_jo = self.api.job_origin_update(ctx, jo_id,
{'description': 'update'})
update_jo = self.api.job_update(ctx, jo_id,
{'description': 'update'})
self.assertEqual(update_jo['description'], 'update')
self.api.job_origin_destroy(ctx, jo_id)
self.api.job_destroy(ctx, jo_id)
lst = self.api.job_origin_get_all(ctx)
lst = self.api.job_get_all(ctx)
self.assertEqual(len(lst), 0)
with self.assertRaises(ex.NotFoundException):
self.api.job_origin_destroy(ctx, jo_id)
self.api.job_destroy(ctx, jo_id)
def test_job_origin_fields(self):
def test_job_fields(self):
ctx = context.ctx()
ctx.tenant_id = SAMPLE_JOB_ORIGIN['tenant_id']
job_origin_id = self.api.job_origin_create(ctx,
SAMPLE_JOB_ORIGIN)['id']
ctx.tenant_id = SAMPLE_JOB['tenant_id']
job_id = self.api.job_create(ctx, SAMPLE_JOB)['id']
job_origin = self.api.job_origin_get(ctx, job_origin_id)
self.assertIsInstance(job_origin, dict)
job = self.api.job_get(ctx, job_id)
self.assertIsInstance(job, dict)
for key, val in SAMPLE_JOB_ORIGIN.items():
self.assertEqual(val, job_origin.get(key),
for key, val in SAMPLE_JOB.items():
self.assertEqual(val, job.get(key),
"Key not found %s" % key)
@ -420,17 +347,16 @@ class JobBinaryTest(test_base.ConductorManagerTestCase):
job_binary_id = self.api.job_binary_create(ctx,
SAMPLE_JOB_BINARY)['id']
job_origin_values = copy.copy(SAMPLE_JOB_ORIGIN)
job_origin_values[reference] = [job_binary_id]
job_origin_id = self.api.job_origin_create(ctx,
job_origin_values)['id']
job_values = copy.copy(SAMPLE_JOB)
job_values[reference] = [job_binary_id]
job_id = self.api.job_create(ctx, job_values)['id']
# Delete while referenced, fails
with self.assertRaises(ex.DeletionFailed):
self.api.job_binary_destroy(ctx, job_binary_id)
# Delete while not referenced
self.api.job_origin_destroy(ctx, job_origin_id)
self.api.job_destroy(ctx, job_id)
self.api.job_binary_destroy(ctx, job_binary_id)
lst = self.api.job_binary_get_all(ctx)
self.assertEqual(len(lst), 0)

View File

@ -44,7 +44,7 @@ class TestJobManager(models_test_base.DbTestCase):
remote.return_value = remote_class
helper.return_value = 'ok'
job = _create_all_stack('Pig')[0]
job, _ = _create_all_stack('Pig')
res = job_manager.create_workflow_dir(mock.Mock(), job)
self.assertIn('/user/hadoop/special_name/', res)
@ -63,14 +63,14 @@ class TestJobManager(models_test_base.DbTestCase):
helper.return_value = 'ok'
conductor_raw_data.return_value = 'ok'
job, job_origin = _create_all_stack('Pig')
job, _ = _create_all_stack('Pig')
res = job_manager.upload_job_files(mock.Mock(), 'job_prefix',
job_origin)
job)
self.assertEqual(['job_prefix/script.pig'], res)
job, job_origin = _create_all_stack('Jar')
job, _ = _create_all_stack('Jar')
res = job_manager.upload_job_files(mock.Mock(), 'job_prefix',
job_origin)
job)
self.assertEqual(['job_prefix/lib/main.jar'], res)
remote.reset_mock()
@ -92,14 +92,13 @@ class TestJobManager(models_test_base.DbTestCase):
@mock.patch('savanna.conductor.API.job_binary_get')
def test_build_workflow_for_job_pig(self, job_binary):
job, origin = _create_all_stack('Pig')
job_exec = _create_job_exec(job.id)
job, job_exec = _create_all_stack('Pig')
job_binary.return_value = {"name": "script.pig"}
input_data = _create_data_source('swift://ex.savanna/i')
output_data = _create_data_source('swift://ex.savanna/o')
creator = workflow_factory.get_creator('Pig', origin)
creator = workflow_factory.get_creator(job)
res = creator.get_workflow_xml(job_exec.job_configs,
input_data, output_data)
@ -124,13 +123,12 @@ class TestJobManager(models_test_base.DbTestCase):
def test_build_workflow_for_job_jar(self):
job, origin = _create_all_stack('Jar')
job_exec = _create_job_exec(job.id)
job, job_exec = _create_all_stack('Jar')
input_data = _create_data_source('swift://ex.savanna/i')
output_data = _create_data_source('swift://ex.savanna/o')
creator = workflow_factory.get_creator('Jar', origin)
creator = workflow_factory.get_creator(job)
res = creator.get_workflow_xml(job_exec.job_configs,
input_data, output_data)
@ -162,14 +160,13 @@ class TestJobManager(models_test_base.DbTestCase):
@mock.patch('savanna.conductor.API.job_binary_get')
def test_build_workflow_for_job_hive(self, job_binary):
job, origin = _create_all_stack('Hive')
job_exec = _create_job_exec(job.id)
job, job_exec = _create_all_stack('Hive')
job_binary.return_value = {"name": "script.q"}
input_data = _create_data_source('swift://ex.savanna/i')
output_data = _create_data_source('swift://ex.savanna/o')
creator = workflow_factory.get_creator('Hive', origin)
creator = workflow_factory.get_creator(job)
res = creator.get_workflow_xml(job_exec.job_configs,
input_data, output_data)
@ -191,14 +188,14 @@ class TestJobManager(models_test_base.DbTestCase):
<param>OUTPUT=swift://ex.savanna/o</param>""", res)
def test_build_workflow_for_job_jar_with_conf(self):
job, origin = _create_all_stack('Jar')
job, _ = _create_all_stack('Jar')
input_data = _create_data_source('swift://ex.savanna/i')
output_data = _create_data_source('swift://ex.savanna/o')
job_exec = _create_job_exec(job.id, configs={"configs": {'c': 'f'}})
creator = workflow_factory.get_creator('Jar', origin)
creator = workflow_factory.get_creator(job)
res = creator.get_workflow_xml(job_exec.job_configs,
input_data, output_data)
@ -222,33 +219,25 @@ class TestJobManager(models_test_base.DbTestCase):
</property>""", res)
def _create_all_stack(type, configs=None):
def _create_all_stack(type):
b = _create_job_binary('1', type)
o = _create_job_origin('2', b, type)
j = _create_job('3', o.id, type)
j.configs = configs
return j, o
j = _create_job('2', b, type)
e = _create_job_exec(j.id)
return j, e
def _create_job(id, origin_id, type):
def _create_job(id, job_binary, type):
job = mock.Mock()
job.id = id
job.job_origin_id = origin_id
job.type = type
job.name = 'special_name'
return job
def _create_job_origin(id, job_binary, type):
origin = mock.Mock()
origin.id = id
if type == 'Pig' or type == 'Hive':
origin.mains = [job_binary]
origin.libs = None
job.mains = [job_binary]
job.libs = None
if type == 'Jar':
origin.libs = [job_binary]
origin.mains = None
return origin
job.libs = [job_binary]
job.mains = None
return job
def _create_job_binary(id, type):

View File

@ -1,39 +0,0 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from savanna.service import api
from savanna.service.validations.edp import job_executor as j_exec
from savanna.tests.unit.service.validation import utils as u
class TestJobValidation(u.ValidationTestCase):
def setUp(self):
self._create_object_fun = j_exec.check_job_executor
self.scheme = j_exec.JOB_EXEC_SCHEMA
api.plugin_base.setup_plugins()
def test_job_execution_validation(self):
data = {
"input_id": "9830d572-e242-4f2a-962f-5a850c787e09",
"output_id": "9830d572-e242-4f2a-962f-5a850c787e09",
"cluster_id": "9830d572-e242-4f2a-962f-5a850c787e09",
"job_configs": {
"a": "True",
"b": 2,
"c": True
}
}
self._assert_types(data)