Add "mains" and "libs" fields to JobOrigins

These fields allow many-to-many relationships between JobOrigin
and JobBinary objects.

Co-Authored-By: Nadya Privalova <nprivalova@mirantis.com>

Implements: blueprint edp-resources-for-job

Change-Id: I8e01445d2e3f10704ae281fd1b1d67dfeadaf427
This commit is contained in:
Trevor McKay 2013-09-04 17:47:36 -04:00
parent fe5b782bd3
commit a1714e041a
23 changed files with 813 additions and 179 deletions

View File

@ -19,6 +19,7 @@ from savanna.service import validation as v
from savanna.service.validations.edp import data_source as v_d_s
from savanna.service.validations.edp import job as v_j
from savanna.service.validations.edp import job_binary as v_j_b
from savanna.service.validations.edp import job_binary_internal as v_j_b_i
from savanna.service.validations.edp import job_executor as v_j_e
from savanna.service.validations.edp import job_origin as v_j_o
import savanna.utils.api as u
@ -134,7 +135,7 @@ def job_origin_list():
@rest.post('/job-origins')
@v.validate(v_j_o.JOB_ORIGIN_SCHEMA)
@v.validate(v_j_o.JOB_ORIGIN_SCHEMA, v_j_o.check_mains_libs)
def job_origin_create(data):
return u.render(api.create_job_origin(data).to_wrapped_dict())
@ -152,10 +153,10 @@ def job_origin_delete(job_origin_id):
return u.render()
@rest.put_file('/job-binaries/<name>')
@v.validate(None, v_j_b.check_data_type_length)
def job_binary_create(**values):
return u.render(api.create_job_binary(values).to_wrapped_dict())
@rest.post('/job-binaries')
@v.validate(v_j_b.JOB_BINARY_SCHEMA, v_j_b.check_job_binary)
def job_binary_create(data):
return u.render(api.create_job_binary(data).to_wrapped_dict())
@rest.get('/job-binaries')
@ -176,7 +177,33 @@ def job_binary_delete(job_binary_id):
return u.render()
@rest.get('/job-binaries/<job_binary_id>/data')
@v.check_exists(api.get_job_binary, id='job_binary_id')
def job_binary_data(job_binary_id):
return api.get_job_binary_data(job_binary_id)
@rest.put_file('/job-binary-internals/<name>')
@v.validate(None, v_j_b_i.check_data_type_length)
def job_binary_internal_create(**values):
return u.render(api.create_job_binary_internal(values).to_wrapped_dict())
@rest.get('/job-binary-internals')
def job_binary_internal_list():
return u.render(binaries=[j.to_dict() for j in
api.get_job_binary_internals()])
@rest.get('/job-binary-internals/<job_binary_internal_id>')
@v.check_exists(api.get_job_binary_internal, 'job_binary_internal_id')
def job_binary_internal_get(job_binary_internal_id):
return u.render(api.get_job_binary_internal(job_binary_internal_id
).to_wrapped_dict())
@rest.delete('/job-binary-internals/<job_binary_internal_id>')
@v.check_exists(api.get_job_binary_internal, 'job_binary_internal_id')
def job_binary_internal_delete(job_binary_internal_id):
api.delete_job_binary_internal(job_binary_internal_id)
return u.render()
@rest.get('/job-binary-internals/<job_binary_internal_id>/data')
@v.check_exists(api.get_job_binary_internal, 'job_binary_internal_id')
def job_binary_internal_data(job_binary_internal_id):
return api.get_job_binary_internal_data(job_binary_internal_id)

View File

@ -318,9 +318,37 @@ class LocalApi(object):
"""Destroy the JobBinary or raise if it does not exist."""
self._manager.job_binary_destroy(context, _get_id(job_binary))
def job_binary_get_raw_data(self, context, job_binary_id):
"""Return the binary data field from the specified JobBinary."""
return self._manager.job_binary_get_raw_data(context, job_binary_id)
## JobBinaryInternal ops
@r.wrap(r.JobBinaryInternal)
def job_binary_internal_get_all(self, context):
"""Get all JobBinaryInternals."""
return self._manager.job_binary_internal_get_all(context)
@r.wrap(r.JobBinaryInternal)
def job_binary_internal_get(self, context, job_binary_internal):
"""Return the JobBinaryInternal or None if it does not exist."""
return self._manager.job_binary_internal_get(
context,
_get_id(job_binary_internal))
@r.wrap(r.JobBinaryInternal)
def job_binary_internal_create(self, context, values):
"""Create a JobBinaryInternal from the values dictionary."""
return self._manager.job_binary_internal_create(context, values)
def job_binary_internal_destroy(self, context, job_binary_internal_id):
"""Destroy the JobBinaryInternal or raise if it does not exist."""
self._manager.job_binary_internal_destroy(
context,
_get_id(job_binary_internal_id))
def job_binary_internal_get_raw_data(self, context,
job_binary_internal_id):
"""Return the binary data field from a JobBinaryInternal."""
return self._manager.job_binary_internal_get_raw_data(
context,
job_binary_internal_id)
class RemoteApi(LocalApi):

View File

@ -344,34 +344,58 @@ class ConductorManager(db_base.Base):
## JobBinary ops
def job_binary_get_all(self, context):
"""Get all JobBinarys
The JobBinarys returned do not contain a data field.
"""
"""Get all JobBinaries."""
return self.db.job_binary_get_all(context)
def job_binary_get(self, context, job_binary_id):
"""Return the JobBinary or None if it does not exist
The JobBinary returned does not contain a data field.
"""
"""Return the JobBinary or None if it does not exist."""
return self.db.job_binary_get(context, job_binary_id)
def job_binary_create(self, context, values):
"""Create a JobBinary from the values dictionary."""
values = copy.deepcopy(values)
values['tenant_id'] = context.tenant_id
return self.db.job_binary_create(context, values)
def job_binary_destroy(self, context, job_binary):
"""Destroy the JobBinary or raise if it does not exist."""
self.db.job_binary_destroy(context, job_binary)
## JobBinaryInternal ops
def job_binary_internal_get_all(self, context):
"""Get all JobBinaryInternals
The JobBinaryInternals returned do not contain a data field.
"""
return self.db.job_binary_internal_get_all(context)
def job_binary_internal_get(self, context, job_binary_internal_id):
"""Return the JobBinaryInternal or None if it does not exist
The JobBinaryInternal returned does not contain a data field.
"""
return self.db.job_binary_internal_get(context, job_binary_internal_id)
def job_binary_internal_create(self, context, values):
"""Create a JobBinaryInternal from the values dictionary."""
# Since values["data"] is (should be) encoded as a string
# here the deepcopy of values only incs a reference count on data.
# This is nice, since data could be big...
values = copy.deepcopy(values)
values['tenant_id'] = context.tenant_id
values['datasize'] = len(values["data"])
return self.db.job_binary_create(context, values)
return self.db.job_binary_internal_create(context, values)
def job_binary_destroy(self, context, job_binary):
"""Destroy the JobBinary or raise if it does not exist."""
self.db.job_binary_destroy(context, job_binary)
def job_binary_internal_destroy(self, context, job_binary_internal):
"""Destroy the JobBinaryInternal or raise if it does not exist."""
self.db.job_binary_internal_destroy(context, job_binary_internal)
def job_binary_get_raw_data(self, context, job_binary_id):
"""Return the binary data field from the specified JobBinary."""
return self.db.job_binary_get_raw_data(context, job_binary_id)
def job_binary_internal_get_raw_data(self,
context, job_binary_internal_id):
"""Return the binary data field from a JobBinaryInternal."""
return self.db.job_binary_internal_get_raw_data(
context,
job_binary_internal_id)

View File

@ -222,15 +222,27 @@ class JobOrigin(object):
tenant_id
name
description
storage_type
url
credentials
mains
libs
"""
class JobBinary(object):
"""An object representing JobBinary
id
tenant_id
name
description
url - URLs may be the following: savanna-db://URL, internal-swift://,
external-swift://
extra - extra may contain not only user-password but e.g. auth-token
"""
class JobBinaryInternal(object):
"""An object representing JobBinaryInternal
Note that the 'data' field is not returned. It uses deferred
loading and must be requested explicitly with the
job_binary_internal_get_raw_data() conductor method.

View File

@ -223,3 +223,7 @@ class JobOrigin(Resource, objects.JobOrigin):
class JobBinary(Resource, objects.JobBinary):
pass
class JobBinaryInternal(Resource, objects.JobBinaryInternal):
pass

View File

@ -371,6 +371,30 @@ def job_binary_destroy(context, job_binary):
IMPL.job_binary_destroy(context, job_binary)
def job_binary_get_raw_data(context, job_binary_id):
"""Return the binary data field from the specified JobBinary."""
return IMPL.job_binary_get_raw_data(context, job_binary_id)
@to_dict
def job_binary_internal_get_all(context):
"""Get all JobBinaryInternals."""
return IMPL.job_binary_internal_get_all(context)
@to_dict
def job_binary_internal_get(context, job_binary_internal):
"""Return the JobBinaryInternal or None if it does not exist."""
return IMPL.job_binary_internal_get(context, job_binary_internal)
@to_dict
def job_binary_internal_create(context, values):
"""Create a JobBinaryInternal from the values dictionary."""
return IMPL.job_binary_internal_create(context, values)
def job_binary_internal_destroy(context, job_binary_internal):
"""Destroy the JobBinaryInternal or raise if it does not exist."""
IMPL.job_binary_internal_destroy(context, job_binary_internal)
def job_binary_internal_get_raw_data(context, job_binary_internal_id):
"""Return the binary data field from the specified JobBinaryInternal."""
return IMPL.job_binary_internal_get_raw_data(context,
job_binary_internal_id)

View File

@ -55,17 +55,6 @@ def model_query(model, context, session=None, project_only=None):
return query
def column_query(context, *columns, **kwargs):
    """Build a SQLAlchemy query over arbitrary columns.

    An existing session may be supplied via the ``session`` keyword;
    otherwise a new one is opened. A truthy ``project_only`` keyword
    restricts rows to the context's tenant.
    """
    session = kwargs.get("session") or get_session()
    query = session.query(*columns)
    if not kwargs.get("project_only"):
        return query
    return query.filter_by(tenant_id=context.tenant_id)
def setup_db():
try:
engine = db_session.get_engine(sqlite_fk=True)
@ -554,14 +543,37 @@ def job_origin_get_all(context):
def job_origin_create(context, values):
job_origin = m.JobOrigin()
job_origin.update(values)
mains = values.pop("mains", [])
libs = values.pop("libs", [])
try:
job_origin.save()
except db_exc.DBDuplicateEntry as e:
# raise exception about duplicated columns (e.columns)
raise RuntimeError("DBDuplicateEntry: %s" % e.columns)
session = get_session()
with session.begin():
job_origin = m.JobOrigin()
job_origin.update(values)
# libs and mains are 'lazy' objects. The initialization below
# is needed here because it provides libs and mains to be initialized
# within a session even if the lists are empty
job_origin.mains = []
job_origin.libs = []
try:
for main in mains:
query = model_query(m.JobBinary,
context, session).filter_by(id=main)
job_binary = query.first()
if job_binary is not None:
job_origin.mains.append(job_binary)
for lib in libs:
query = model_query(m.JobBinary,
context, session).filter_by(id=lib)
job_binary = query.first()
if job_binary is not None:
job_origin.libs.append(job_binary)
job_origin.save(session=session)
except db_exc.DBDuplicateEntry as e:
# raise exception about duplicated columns (e.columns)
raise RuntimeError("DBDuplicateEntry: %s" % e.columns)
return job_origin
@ -611,15 +623,6 @@ def job_binary_get(context, job_binary_id):
return query.first()
def job_binary_get_raw_data(context, job_binary_id):
"""Returns only the data field for the specified JobBinary."""
query = model_query(m.JobBinary, context).options(sa.orm.undefer("data"))
res = query.filter_by(id=job_binary_id).first()
if res is not None:
res = res.data
return res
def job_binary_create(context, values):
"""Returns a JobBinary that does not contain a data field
@ -637,15 +640,91 @@ def job_binary_create(context, values):
return job_binary_get(context, job_binary.id)
def _check_job_binary_referenced(session, id):
    """Return True if any JobOrigin references this binary as a main or lib."""
    filters = {"JobBinary_id": id}
    for association in (m.mains_association, m.libs_association):
        row = model_query(association, None, session).filter_by(
            **filters).first()
        if row is not None:
            return True
    return False
def job_binary_destroy(context, job_binary_id):
session = get_session()
with session.begin():
job_binary = model_query(m.JobBinary,
context).filter_by(id=job_binary_id).first()
context,
session).filter_by(id=job_binary_id).first()
if not job_binary:
raise ex.NotFoundException(job_binary_id,
"JobBinary id '%s' not found!")
if _check_job_binary_referenced(session, job_binary.id):
raise RuntimeError("JobBinary is referenced and cannot be deleted")
session.delete(job_binary)
## JobBinaryInternal ops
def job_binary_internal_get_all(context):
    """Return every JobBinaryInternal, each without its data field.

    The data column uses deferred loading.
    """
    return model_query(m.JobBinaryInternal, context).all()
def job_binary_internal_get(context, job_binary_internal_id):
    """Return one JobBinaryInternal (without data), or None if absent.

    The data column uses deferred loading, so it is not present on the
    returned object.
    """
    return model_query(m.JobBinaryInternal, context).filter_by(
        id=job_binary_internal_id).first()
def job_binary_internal_get_raw_data(context, job_binary_internal_id):
    """Return only the deferred data field of one JobBinaryInternal.

    Returns None when no matching record exists.
    """
    query = model_query(m.JobBinaryInternal, context)
    record = query.options(sa.orm.undefer("data")).filter_by(
        id=job_binary_internal_id).first()
    if record is None:
        return None
    return record.data
def job_binary_internal_create(context, values):
    """Create a JobBinaryInternal and return it without its data field.

    The data column uses deferred loading, so the record is re-fetched
    via job_binary_internal_get() rather than returned directly.
    """
    record = m.JobBinaryInternal()
    record.update(values)
    try:
        record.save()
    except db_exc.DBDuplicateEntry as e:
        # raise exception about duplicated columns (e.columns)
        raise RuntimeError("DBDuplicateEntry: %s" % e.columns)

    return job_binary_internal_get(context, record.id)
def job_binary_internal_destroy(context, job_binary_internal_id):
    """Delete the JobBinaryInternal row or raise if it does not exist.

    Raises:
        NotFoundException: when no row matches job_binary_internal_id.
    """
    session = get_session()
    with session.begin():
        # Query within the transaction's session so the lookup and the
        # delete share one unit of work, mirroring job_binary_destroy
        # (the original queried with an implicit separate session).
        b_intrnl = model_query(m.JobBinaryInternal,
                               context,
                               session
                               ).filter_by(id=job_binary_internal_id).first()
        if not b_intrnl:
            raise ex.NotFoundException(job_binary_internal_id,
                                       "JobBinaryInternal id '%s' not found!")
        session.delete(b_intrnl)

View File

@ -279,6 +279,28 @@ class JobExecution(mb.SavannaBase):
job_configs = sa.Column(st.JsonDictType())
mains_association = sa.Table("mains_association",
mb.SavannaBase.metadata,
sa.Column("JobOrigin_id",
sa.String(36),
sa.ForeignKey("job_origins.id")),
sa.Column("JobBinary_id",
sa.String(36),
sa.ForeignKey("job_binaries.id"))
)
libs_association = sa.Table("libs_association",
mb.SavannaBase.metadata,
sa.Column("JobOrigin_id",
sa.String(36),
sa.ForeignKey("job_origins.id")),
sa.Column("JobBinary_id",
sa.String(36),
sa.ForeignKey("job_binaries.id"))
)
class JobOrigin(mb.SavannaBase):
"""JobOrigin - description and location of a job binary
"""
@ -293,9 +315,35 @@ class JobOrigin(mb.SavannaBase):
tenant_id = sa.Column(sa.String(36))
name = sa.Column(sa.String(80), nullable=False)
description = sa.Column(sa.Text())
storage_type = sa.Column(sa.String(16))
url = sa.Column(sa.String())
credentials = sa.Column(st.JsonDictType())
mains = relationship("JobBinary",
secondary=mains_association, lazy="joined")
libs = relationship("JobBinary",
secondary=libs_association, lazy="joined")
def to_dict(self):
d = super(JobOrigin, self).to_dict()
d['mains'] = [jb.to_dict() for jb in self.mains]
d['libs'] = [jb.to_dict() for jb in self.libs]
return d
class JobBinaryInternal(mb.SavannaBase):
"""JobBinaryInternal - raw binary storage for executable jobs
"""
__tablename__ = 'job_binary_internal'
__table_args__ = (
sa.UniqueConstraint('name', 'tenant_id'),
)
id = _id_column()
tenant_id = sa.Column(sa.String(36))
name = sa.Column(sa.String(80), nullable=False)
data = sa.orm.deferred(sa.Column(sa.LargeBinary))
datasize = sa.Column(sa.BIGINT)
class JobBinary(mb.SavannaBase):
@ -311,5 +359,6 @@ class JobBinary(mb.SavannaBase):
id = _id_column()
tenant_id = sa.Column(sa.String(36))
name = sa.Column(sa.String(80), nullable=False)
data = sa.orm.deferred(sa.Column(sa.LargeBinary))
datasize = sa.Column(sa.BIGINT)
description = sa.Column(sa.Text())
url = sa.Column(sa.String(), nullable=False)
extra = sa.Column(st.JsonDictType())

View File

@ -88,3 +88,38 @@ class RemoteCommandException(SavannaException):
if stdout:
self.message += '\nSTDOUT:\n' + stdout
class InvalidDataException(SavannaException):
    """General exception to use for invalid data

    A more useful message should be passed to __init__ which
    tells the user more about why the data is invalid.
    """
    # Defaults; __init__ may replace message with a more specific one.
    message = "Data is invalid"
    code = "INVALID_DATA"

    def __init__(self, message=None):
        if message:
            self.message = message
        super(InvalidDataException, self).__init__(self.message, self.code)
class BadJobBinaryInternalException(SavannaException):
    """Raised when uploaded job binary data is not a non-empty string."""
    message = "Job binary internal data must be a string of length " \
              "greater than zero"

    def __init__(self):
        self.code = "BAD_JOB_BINARY"
        super(BadJobBinaryInternalException, self
              ).__init__(self.message, self.code)
class BadJobBinaryException(SavannaException):
    """Raised when a swift-backed JobBinary lacks credentials in extra."""
    message = "To work with JobBinary located in internal swift add 'user'" \
              " and 'password' to extra"

    def __init__(self):
        self.code = "BAD_JOB_BINARY"
        super(BadJobBinaryException, self).__init__(self.message, self.code)

View File

@ -117,5 +117,21 @@ def delete_job_binary(id):
conductor.job_binary_destroy(context.ctx(), id)
def get_job_binary_data(id):
return conductor.job_binary_get_raw_data(context.ctx(), id)
def create_job_binary_internal(values):
return conductor.job_binary_internal_create(context.ctx(), values)
def get_job_binary_internals():
return conductor.job_binary_internal_get_all(context.ctx())
def get_job_binary_internal(id):
return conductor.job_binary_internal_get(context.ctx(), id)
def delete_job_binary_internal(id):
conductor.job_binary_internal_destroy(context.ctx(), id)
def get_job_binary_internal_data(id):
return conductor.job_binary_internal_get_raw_data(context.ctx(), id)

View File

@ -0,0 +1,26 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from savanna import context
from savanna.service.edp.binary_retrievers import internal_swift as i_swift
from savanna.service.edp.binary_retrievers import savanna_db as db
def get_raw_binary(job_binary):
    """Dispatch on the binary's URL scheme and return its raw data.

    Unrecognized schemes fall through and yield None.
    """
    location = job_binary.url
    if location.startswith("savanna-db://"):
        return db.get_raw_data(context.ctx(), job_binary)
    if location.startswith("internal-swift://"):
        return i_swift.get_raw_data(context.ctx(), job_binary)

View File

@ -0,0 +1,18 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
def get_raw_data(context, job_binary):
    # Stub: fetching binary data from internal swift is not implemented
    # yet, so this always returns None.
    pass

View File

@ -0,0 +1,25 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from savanna import conductor as c
conductor = c.API
def get_raw_data(context, job_binary):
    """Fetch raw bytes for the JobBinaryInternal referenced by *job_binary*.

    The url has the form 'savanna-db://<JobBinaryInternal id>'; everything
    after the scheme prefix is the internal record's id.
    """
    prefix = "savanna-db://"
    binary_internal_id = job_binary.url[len(prefix):]
    return conductor.job_binary_internal_get_raw_data(context,
                                                      binary_internal_id)

View File

@ -27,6 +27,9 @@ from savanna.service.edp import oozie as o
from savanna.service.edp.workflow_creator import hive_workflow as hive_flow
from savanna.service.edp.workflow_creator import mapreduce_workflow as mr_flow
from savanna.service.edp.workflow_creator import pig_workflow as pig_flow
from savanna.service.edp.binary_retrievers import dispatch
from savanna.utils import remote
from savanna.utils import xmlutils as x
@ -42,10 +45,6 @@ CONF.register_opts(opts)
conductor = c.API
main_res_names = {'Pig': 'script.pig',
'Jar': 'main.jar',
'Hive': 'script.q'}
def get_job_status(job_execution_id):
ctx = context.ctx()
@ -92,13 +91,13 @@ def run_job(ctx, job_execution):
validate(input_source, output_source, job)
wf_dir = create_workflow_dir(u.get_jobtracker(cluster), job)
upload_job_file(u.get_jobtracker(cluster), wf_dir, job_origin, job)
upload_job_files(u.get_jobtracker(cluster), wf_dir, job_origin)
if job.type == 'Hive':
upload_hive_site(cluster, wf_dir)
wf_xml = build_workflow_for_job(job.type, job_execution, input_source,
output_source)
wf_xml = build_workflow_for_job(job.type, job_execution, job_origin,
input_source, output_source)
path_to_workflow = upload_workflow_file(u.get_jobtracker(cluster),
wf_dir, wf_xml)
@ -124,15 +123,22 @@ def run_job(ctx, job_execution):
return job_execution
def upload_job_file(where, job_dir, job_origin, job):
main_binary = conductor.job_binary_get_raw_data(context.ctx(),
job_origin.url)
if job.type == 'Jar':
job_dir += '/lib'
with remote.get_remote(where) as r:
h.put_file_to_hdfs(r, main_binary, main_res_names[job.type], job_dir)
def upload_job_files(where, job_dir, job_origin):
return "%s/%s" % (job_dir, main_res_names[job.type])
mains = job_origin.mains or []
libs = job_origin.libs or []
uploaded_paths = []
with remote.get_remote(where) as r:
for main in mains:
raw_data = dispatch.get_raw_binary(main)
h.put_file_to_hdfs(r, raw_data, main.name, job_dir)
uploaded_paths.append(job_dir + '/' + main.name)
for lib in libs:
raw_data = dispatch.get_raw_binary(lib)
h.put_file_to_hdfs(r, raw_data, lib.name, job_dir + "/lib")
uploaded_paths.append(job_dir + '/lib/' + lib.name)
return uploaded_paths
def upload_workflow_file(where, job_dir, wf_xml):
@ -159,8 +165,9 @@ def create_workflow_dir(where, job):
return constructed_dir
def build_workflow_for_job(job_type, job_execution, input_data, output_data):
def build_workflow_for_job(job_type, job_execution, job_origin,
input_data, output_data):
ctx = context.ctx()
configs = {'fs.swift.service.savanna.username':
input_data.credentials['user'],
'fs.swift.service.savanna.password':
@ -172,13 +179,17 @@ def build_workflow_for_job(job_type, job_execution, input_data, output_data):
if job_type == 'Pig':
creator = pig_flow.PigWorkflowCreator()
creator.build_workflow_xml(main_res_names['Pig'],
# In case of Pig there should be one element in mains
script = conductor.job_binary_get(ctx, job_origin.mains[0])
creator.build_workflow_xml(script["name"],
configuration=configs,
params={'INPUT': input_data.url,
'OUTPUT': output_data.url})
if job_type == 'Hive':
creator = hive_flow.HiveWorkflowCreator()
creator.build_workflow_xml(main_res_names['Hive'],
# In case of Hive there should be one element in mains
script = conductor.job_binary_get(ctx, job_origin.mains[0])
creator.build_workflow_xml(script["name"],
job_xml="hive-site.xml",
configuration=configs,
params={'INPUT': input_data.url,

View File

@ -76,3 +76,9 @@ def check_job_unique_name(name):
if name in [j.name for j in api.get_jobs()]:
raise ex.NameAlreadyExistsException("Job with name '%s' "
"already exists" % name)
def check_job_binary_internal_exists(jbi_id):
    """Raise InvalidException unless a JobBinaryInternal with jbi_id exists."""
    if api.get_job_binary_internal(jbi_id):
        return
    raise ex.InvalidException("JobBinaryInternal with id '%s'"
                              " doesn't exist" % jbi_id)

View File

@ -14,16 +14,43 @@
# limitations under the License.
import savanna.exceptions as e
import savanna.service.validations.edp.base as b
# JSON schema for JobBinary create requests.
JOB_BINARY_SCHEMA = {
    "type": "object",
    "properties": {
        "name": {
            "type": "string",
            "minLength": 1,
            "maxLength": 50
        },
        "description": {
            "type": "string"
        },
        "url": {
            "type": "string",
            "format": "valid_job_location"
        },
        # extra is simple_config for now because we may need not only
        # user-password in the case of external storage
        "extra": {
            "type": "simple_config",
        }
    },
    "additionalProperties": False,
    "required": [
        "name",
        "url"
    ]
}
class BadJobBinaryException(e.SavannaException):
message = "Job binary data must be a string of length greater than zero"
def __init__(self):
self.code = "BAD_JOB_BINARY"
super(BadJobBinaryException, self).__init__(self.message, self.code)
def check_data_type_length(data, **kwargs):
if not (type(data) is str and len(data) > 0):
raise BadJobBinaryException()
def check_job_binary(data, **kwargs):
    """Validate a JobBinary create request beyond the JSON schema.

    Swift-backed binaries must carry 'user' and 'password' in extra;
    db-backed binaries must reference an existing JobBinaryInternal.

    Raises:
        BadJobBinaryException: swift url without credentials.
        (via check_job_binary_internal_exists) when the referenced
        internal binary does not exist.
    """
    job_binary_location_type = data["url"]
    extra = data.get("extra", {})
    # Bug fix: the scheme used elsewhere (dispatcher and JobBinary
    # docs) is "internal-swift://"; the original tested
    # "swift-internal", which never matches, so the credential
    # check could never fire.
    if job_binary_location_type.startswith("internal-swift"):
        if not extra.get("user") or not extra.get("password"):
            raise e.BadJobBinaryException()
    if job_binary_location_type.startswith("savanna-db"):
        internal_uid = job_binary_location_type[len("savanna-db://"):]
        b.check_job_binary_internal_exists(internal_uid)

View File

@ -0,0 +1,21 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import savanna.exceptions as e
def check_data_type_length(data, **kwargs):
    """Reject uploads whose payload is not a non-empty raw string."""
    is_valid = type(data) is str and len(data) > 0
    if not is_valid:
        raise e.BadJobBinaryInternalException()

View File

@ -13,7 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import savanna.service.validations.edp.base as b
import savanna.exceptions as e
from savanna.service.edp import api
JOB_ORIGIN_SCHEMA = {
"type": "object",
@ -26,22 +27,50 @@ JOB_ORIGIN_SCHEMA = {
"description": {
"type": "string"
},
"storage_type": b.job_storage,
"url": {
"type": "string"
"mains": {
"type": "array",
"minItems": 1,
"uniqueItems": True,
"items": {
"type": "string",
"minLength": 1,
}
},
"libs": {
"type": "array",
"uniqueItems": True,
"items": {
"type": "string",
"minLength": 1,
}
},
# Allow credentials schemas to be
# added in the future
"credentials": {
"oneOf": [
b.user_pass
]
}
},
"additionalProperties": False,
"required": [
"name",
"storage_type",
"url"
]
}
def _check_binaries(values):
for job_binary in values:
if not api.get_job_binary(job_binary):
raise e.NotFoundException(job_binary,
"Job binary '%s' does not exist")
def check_mains_libs(data, **kwargs):
    """Cross-field validation for a JobOrigin's mains/libs lists.

    At least one of the two lists must be non-empty, they may not share
    ids, and every referenced JobBinary must exist.
    """
    mains = data.get("mains", [])
    libs = data.get("libs", [])

    # As a basic check, mains or libs has to be non-empty
    if not mains and not libs:
        raise e.InvalidDataException("'mains' or 'libs' must be non-empty")

    # Check for overlap
    if set(mains) & set(libs):
        raise e.InvalidDataException("'mains' and 'libs' overlap")

    # Make sure that all referenced binaries exist
    for group in (mains, libs):
        _check_binaries(group)

View File

@ -58,12 +58,7 @@ SAMPLE_JOB_ORIGIN = {
"tenant_id": "test_tenant",
"name": "job_origin_test",
"description": "test_desc",
"storage_type": "swift",
"url": "localhost:1080",
"credentials": {
"user": "test",
"password": "123"
}
"mains": []
}
SAMPLE_JOB_EXECUTION = {
@ -93,13 +88,21 @@ SAMPLE_CONF_JOB_EXECUTION = {
BINARY_DATA = "vU}\x97\x1c\xdf\xa686\x08\xf2\tf\x0b\xb1}"
SAMPLE_JOB_BINARY = {
SAMPLE_JOB_BINARY_INTERNAL = {
"tenant_id": "test_tenant",
"name": "job_origin_test",
"data": BINARY_DATA
}
SAMPLE_JOB_BINARY = {
"tenant_id": "test_tenant",
"name": "job_binary_test",
"description": "test_dec",
"url": "savanna-db://test_binary",
}
class DataSourceTest(test_base.ConductorManagerTestCase):
def __init__(self, *args, **kwargs):
super(DataSourceTest, self).__init__(
@ -288,6 +291,84 @@ class JobOriginTest(test_base.ConductorManagerTestCase):
with self.assertRaises(ex.NotFoundException):
self.api.job_origin_destroy(ctx, jo)
def test_job_origin_fields(self):
ctx = context.ctx()
ctx.tenant_id = SAMPLE_JOB_ORIGIN['tenant_id']
job_origin_id = self.api.job_origin_create(ctx,
SAMPLE_JOB_ORIGIN)['id']
job_origin = self.api.job_origin_get(ctx, job_origin_id)
self.assertIsInstance(job_origin, dict)
for key, val in SAMPLE_JOB_ORIGIN.items():
self.assertEqual(val, job_origin.get(key),
"Key not found %s" % key)
class JobBinaryInternalTest(test_base.ConductorManagerTestCase):
def __init__(self, *args, **kwargs):
super(JobBinaryInternalTest, self).__init__(
checks=[
lambda: SAMPLE_JOB_BINARY_INTERNAL
], *args, **kwargs)
def test_crud_operation_create_list_delete(self):
ctx = context.ctx()
self.api.job_binary_internal_create(ctx, SAMPLE_JOB_BINARY_INTERNAL)
lst = self.api.job_binary_internal_get_all(ctx)
self.assertEqual(len(lst), 1)
job_bin_int_id = lst[0]['id']
self.api.job_binary_internal_destroy(ctx, job_bin_int_id)
lst = self.api.job_binary_internal_get_all(ctx)
self.assertEqual(len(lst), 0)
with self.assertRaises(ex.NotFoundException):
self.api.job_binary_internal_destroy(ctx, job_bin_int_id)
def test_duplicate_job_binary_internal_create(self):
ctx = context.ctx()
self.api.job_binary_internal_create(ctx, SAMPLE_JOB_BINARY_INTERNAL)
with self.assertRaises(RuntimeError):
self.api.job_binary_internal_create(ctx,
SAMPLE_JOB_BINARY_INTERNAL)
def test_job_binary_internal_get_raw(self):
ctx = context.ctx()
id = self.api.job_binary_internal_create(ctx,
SAMPLE_JOB_BINARY_INTERNAL
)['id']
data = self.api.job_binary_internal_get_raw_data(ctx, id)
self.assertEqual(data, SAMPLE_JOB_BINARY_INTERNAL["data"])
self.api.job_binary_internal_destroy(ctx, id)
data = self.api.job_binary_internal_get_raw_data(ctx, id)
self.assertEqual(data, None)
def test_job_binary_internal_fields(self):
ctx = context.ctx()
ctx.tenant_id = SAMPLE_JOB_BINARY_INTERNAL['tenant_id']
id = self.api.job_binary_internal_create(
ctx, SAMPLE_JOB_BINARY_INTERNAL)['id']
internal = self.api.job_binary_internal_get(ctx, id)
self.assertIsInstance(internal, dict)
with self.assertRaises(KeyError):
internal["data"]
internal["data"] = self.api.job_binary_internal_get_raw_data(ctx, id)
for key, val in SAMPLE_JOB_BINARY_INTERNAL.items():
if key == "datasize":
self.assertEqual(len(BINARY_DATA), internal["datasize"])
else:
self.assertEqual(val, internal.get(key),
"Key not found %s" % key)
class JobBinaryTest(test_base.ConductorManagerTestCase):
def __init__(self, *args, **kwargs):
@ -313,40 +394,48 @@ class JobBinaryTest(test_base.ConductorManagerTestCase):
with self.assertRaises(ex.NotFoundException):
self.api.job_binary_destroy(ctx, job_binary_id)
def test_job_binary_fields(self):
ctx = context.ctx()
ctx.tenant_id = SAMPLE_JOB_BINARY['tenant_id']
job_binary_id = self.api.job_binary_create(ctx,
SAMPLE_JOB_BINARY)['id']
job_binary = self.api.job_binary_get(ctx, job_binary_id)
self.assertIsInstance(job_binary, dict)
for key, val in SAMPLE_JOB_BINARY.items():
self.assertEqual(val, job_binary.get(key),
"Key not found %s" % key)
def _test_job_binary_referenced(self, reference):
ctx = context.ctx()
job_binary_id = self.api.job_binary_create(ctx,
SAMPLE_JOB_BINARY)['id']
job_origin_values = copy.copy(SAMPLE_JOB_ORIGIN)
job_origin_values[reference] = [job_binary_id]
job_origin_id = self.api.job_origin_create(ctx,
job_origin_values)['id']
# Delete while referenced, fails
with self.assertRaises(RuntimeError):
self.api.job_binary_destroy(ctx, job_binary_id)
# Delete while not referenced
self.api.job_origin_destroy(ctx, job_origin_id)
self.api.job_binary_destroy(ctx, job_binary_id)
lst = self.api.job_binary_get_all(ctx)
self.assertEqual(len(lst), 0)
def test_job_binary_referenced_mains(self):
    """A binary listed in an origin's "mains" cannot be deleted in place."""
    self._test_job_binary_referenced("mains")
def test_job_binary_referenced_libs(self):
    """A binary listed in an origin's "libs" cannot be deleted in place."""
    self._test_job_binary_referenced("libs")
def test_duplicate_job_binary_create(self):
    """Creating a second job binary with identical sample data is
    rejected with a RuntimeError."""
    ctx = context.ctx()
    self.api.job_binary_create(ctx, SAMPLE_JOB_BINARY)
    self.assertRaises(RuntimeError, self.api.job_binary_create,
                      ctx, SAMPLE_JOB_BINARY)
def test_job_binary_get_raw(self):
    """Raw data is retrievable while the binary exists and becomes
    None once the binary has been destroyed."""
    ctx = context.ctx()
    binary_id = self.api.job_binary_create(ctx,
                                           SAMPLE_JOB_BINARY)['id']
    self.assertEqual(self.api.job_binary_get_raw_data(ctx, binary_id),
                     SAMPLE_JOB_BINARY["data"])
    self.api.job_binary_destroy(ctx, binary_id)
    self.assertEqual(self.api.job_binary_get_raw_data(ctx, binary_id),
                     None)
def test_job_binary_fields(self):
    """A fetched job binary matches the sample, except that the payload
    is excluded from the dict and must come from the raw-data call."""
    ctx = context.ctx()
    ctx.tenant_id = SAMPLE_JOB_BINARY['tenant_id']
    binary_id = self.api.job_binary_create(ctx,
                                           SAMPLE_JOB_BINARY)['id']

    job_binary = self.api.job_binary_get(ctx, binary_id)
    self.assertIsInstance(job_binary, dict)

    # The blob is not embedded in the metadata dict.
    with self.assertRaises(KeyError):
        job_binary["data"]
    job_binary["data"] = self.api.job_binary_get_raw_data(ctx, binary_id)

    for key, val in SAMPLE_JOB_BINARY.items():
        if key != "datasize":
            self.assertEqual(val, job_binary.get(key),
                             "Key not found %s" % key)
        else:
            self.assertEqual(len(BINARY_DATA), job_binary["datasize"])
self.api.job_binary_create(ctx,
SAMPLE_JOB_BINARY)

View File

@ -54,23 +54,23 @@ class TestJobManager(models_test_base.DbTestCase):
@mock.patch('savanna.utils.remote.get_remote')
@mock.patch('savanna.service.edp.hdfs_helper.put_file_to_hdfs')
@mock.patch('savanna.utils.remote.InstanceInteropHelper')
@mock.patch('savanna.conductor.API.job_binary_get_raw_data')
def test_upload_job_file(self, conductor_raw_data, remote_class,
helper, remote):
@mock.patch('savanna.conductor.API.job_binary_internal_get_raw_data')
def test_upload_job_files(self, conductor_raw_data, remote_class,
helper, remote):
remote_class.__exit__.return_value = 'closed'
remote.return_value = remote_class
helper.return_value = 'ok'
conductor_raw_data.return_value = 'ok'
job, job_origin = _create_all_stack('Pig')
res = job_manager.upload_job_file(mock.Mock(), 'job_prefix',
job_origin, job)
self.assertEqual('job_prefix/script.pig', res)
res = job_manager.upload_job_files(mock.Mock(), 'job_prefix',
job_origin)
self.assertEqual(['job_prefix/script.pig'], res)
job, job_origin = _create_all_stack('Jar')
res = job_manager.upload_job_file(mock.Mock(), 'job_prefix',
job_origin, job)
self.assertEqual('job_prefix/lib/main.jar', res)
res = job_manager.upload_job_files(mock.Mock(), 'job_prefix',
job_origin)
self.assertEqual(['job_prefix/lib/main.jar'], res)
remote.reset_mock()
remote_class.reset_mock()
@ -88,16 +88,18 @@ class TestJobManager(models_test_base.DbTestCase):
conf.reset_mock()
def test_build_workflow_for_job_pig(self):
@mock.patch('savanna.conductor.API.job_binary_get')
def test_build_workflow_for_job_pig(self, job_binary):
job = _create_all_stack('Pig')[0]
job, origin = _create_all_stack('Pig')
job_exec = _create_job_exec(job.id)
job_binary.return_value = {"name": "script.pig"}
input_data = _create_data_source('swift://ex.savanna/i')
output_data = _create_data_source('swift://ex.savanna/o')
res = job_manager.build_workflow_for_job('Pig', job_exec, input_data,
output_data)
res = job_manager.build_workflow_for_job('Pig', job_exec, origin,
input_data, output_data)
self.assertIn("""
<param>INPUT=swift://ex.savanna/i</param>
@ -117,14 +119,14 @@ class TestJobManager(models_test_base.DbTestCase):
def test_build_workflow_for_job_jar(self):
job = _create_all_stack('Jar')[0]
job, origin = _create_all_stack('Jar')
job_exec = _create_job_exec(job.id)
input_data = _create_data_source('swift://ex.savanna/i')
output_data = _create_data_source('swift://ex.savanna/o')
res = job_manager.build_workflow_for_job('Jar', job_exec, input_data,
output_data)
res = job_manager.build_workflow_for_job('Jar', job_exec, origin,
input_data, output_data)
self.assertIn("""
<configuration>
<property>
@ -145,16 +147,18 @@ class TestJobManager(models_test_base.DbTestCase):
</property>
</configuration>""", res)
def test_build_workflow_for_job_hive(self):
@mock.patch('savanna.conductor.API.job_binary_get')
def test_build_workflow_for_job_hive(self, job_binary):
job = _create_all_stack('Hive')[0]
job, origin = _create_all_stack('Hive')
job_exec = _create_job_exec(job.id)
job_binary.return_value = {"name": "script.q"}
input_data = _create_data_source('swift://ex.savanna/i')
output_data = _create_data_source('swift://ex.savanna/o')
res = job_manager.build_workflow_for_job('Hive', job_exec, input_data,
output_data)
res = job_manager.build_workflow_for_job('Hive', job_exec, origin,
input_data, output_data)
self.assertIn("""
<job-xml>hive-site.xml</job-xml>
@ -173,14 +177,14 @@ class TestJobManager(models_test_base.DbTestCase):
<param>OUTPUT=swift://ex.savanna/o</param>""", res)
def test_build_workflow_for_job_jar_with_conf(self):
job = _create_all_stack('Jar')[0]
job, origin = _create_all_stack('Jar')
input_data = _create_data_source('swift://ex.savanna/i')
output_data = _create_data_source('swift://ex.savanna/o')
job_exec = _create_job_exec(job.id, configs={'c': 'f'})
res = job_manager.build_workflow_for_job('Jar', job_exec, input_data,
output_data)
res = job_manager.build_workflow_for_job('Jar', job_exec, origin,
input_data, output_data)
self.assertIn("""
<property>
<name>c</name>
@ -193,8 +197,8 @@ class TestJobManager(models_test_base.DbTestCase):
def _create_all_stack(type, configs=None):
    """Build a mocked binary/origin/job trio for the given job *type*.

    Returns a (job, origin) tuple; the binary is reachable through the
    origin's mains/libs list.
    """
    # NOTE: the old single-argument helper calls (_create_job_binary('1'),
    # _create_job_origin('2', b.id)) were stale merge residue; the helpers
    # now require the job type, so only the new-style calls remain.
    binary = _create_job_binary('1', type)
    origin = _create_job_origin('2', binary, type)
    job = _create_job('3', origin.id, type)
    job.configs = configs
    return job, origin
@ -209,16 +213,28 @@ def _create_job(id, origin_id, type):
return job
def _create_job_origin(id, binary_id):
def _create_job_origin(id, job_binary, type):
origin = mock.Mock()
origin.id = id
origin.url = binary_id
if type == 'Pig' or type == 'Hive':
origin.mains = [job_binary]
origin.libs = None
if type == 'Jar':
origin.libs = [job_binary]
origin.mains = None
return origin
def _create_job_binary(id):
def _create_job_binary(id, type):
binary = mock.Mock()
binary.id = id
binary.url = "savanna-db://42"
if type == "Pig":
binary.name = "script.pig"
if type == "Jar":
binary.name = "main.jar"
if type == "Hive":
binary.name = "script.q"
return binary

View File

@ -0,0 +1,57 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from savanna.service import api
from savanna.service.validations.edp import job_binary as b
from savanna.tests.unit.service.validation import utils as u
class TestJobBinaryValidation(u.ValidationTestCase):
    """Validation checks for job binary creation requests."""

    def setUp(self):
        # Wire the generic validation harness to the job binary checker.
        self._create_object_fun = b.check_job_binary
        self.scheme = b.JOB_BINARY_SCHEMA
        api.plugin_base.setup_plugins()

    def test_creation(self):
        """A fully populated internal-db job binary passes type checks."""
        request = {
            "name": "main.jar",
            "url": "savanna-db://3e4651a5-1f08-4880-94c4-596372b37c64",
            "extra": {
                "user": "user",
                "password": "password"
            },
            "description": "long description"
        }
        self._assert_types(request)

    def test_job_binary_create_swift(self):
        """Internal-swift URLs require credentials in 'extra'."""
        request = {
            "name": "j_o_w",
            "url": "swift-internal://o.savanna/k"
        }
        expected_error = (1, "BAD_JOB_BINARY",
                          "To work with JobBinary located in internal "
                          "swift add 'user' and 'password' to extra")
        self._assert_create_object_validation(data=request,
                                              bad_req_i=expected_error)

    def test_job_binary_create_internal(self):
        """A savanna-db URL must carry a UUID to be a valid location."""
        request = {
            "name": "main.jar",
            "url": "savanna-db://abacaba",
        }
        expected_error = (1, "VALIDATION_ERROR",
                          "'savanna-db://abacaba' is not a "
                          "'valid_job_location'")
        self._assert_create_object_validation(data=request,
                                              bad_req_i=expected_error)

View File

@ -29,6 +29,17 @@ def validate_name_format(entry):
return res is not None
@jsonschema.FormatChecker.cls_checks('valid_job_location')
def validate_job_location_format(entry):
    """jsonschema format checker for the 'valid_job_location' format.

    Accepts savanna-db URLs whose tail is UUID-like, and any
    swift-internal/swift-external URL; everything else is rejected.
    """
    db_prefix = 'savanna-db://'
    if entry.startswith(db_prefix):
        return uuidutils.is_uuid_like(entry[len(db_prefix):])
    # TODO(nprivalova): add hostname validation for swift locations
    return entry.startswith(('swift-internal://', 'swift-external://'))
@jsonschema.FormatChecker.cls_checks('valid_tag')
def validate_valid_tag_format(entry):
res = re.match(r"^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-_]"