Fix running EDP job on transient cluster
Change-Id: Iea182c10f958a4a0034c472a77e58711b4d4fa73
Closes-bug: #1307960
(cherry picked from commit 0eda1b3b53
)
This commit is contained in:
parent
d7aef2377b
commit
a61a061603
|
@ -25,6 +25,10 @@ def get_node_groups(cluster, node_process=None):
|
|||
node_process in [n.lower() for n in ng.node_processes])]
|
||||
|
||||
|
||||
def get_instances_count(cluster, node_process=None):
|
||||
return sum([ng.count for ng in get_node_groups(cluster, node_process)])
|
||||
|
||||
|
||||
def get_instances(cluster, node_process=None):
|
||||
nodes = get_node_groups(cluster, node_process)
|
||||
return reduce(lambda a, b: a + b.instances, nodes, [])
|
||||
|
|
|
@ -142,6 +142,11 @@ class AmbariPlugin(p.ProvisioningPluginBase):
|
|||
def get_oozie_server(self, cluster):
|
||||
return u.get_instance(cluster, "oozie_server")
|
||||
|
||||
def validate_edp(self, cluster):
|
||||
oo_count = u.get_instances_count(cluster, 'oozie_server')
|
||||
if oo_count != 1:
|
||||
raise ex.InvalidComponentCountException('oozie', '1', oo_count)
|
||||
|
||||
def get_resource_manager_uri(self, cluster):
|
||||
version_handler = (
|
||||
self.version_factory.get_version_handler(cluster.hadoop_version))
|
||||
|
|
|
@ -66,6 +66,10 @@ class ProvisioningPluginBase(plugins_base.PluginInterface):
|
|||
def get_oozie_server(self, cluster):
|
||||
pass
|
||||
|
||||
@plugins_base.optional
|
||||
def validate_edp(self, cluster):
|
||||
pass
|
||||
|
||||
@plugins_base.optional
|
||||
def get_resource_manager_uri(self, cluster):
|
||||
pass
|
||||
|
|
|
@ -13,6 +13,8 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from sahara.plugins.general import exceptions as ex
|
||||
from sahara.plugins.general import utils as u
|
||||
from sahara.plugins import provisioning as p
|
||||
from sahara.plugins.vanilla import versionfactory as vhf
|
||||
|
||||
|
@ -77,3 +79,8 @@ class VanillaProvider(p.ProvisioningPluginBase):
|
|||
def get_oozie_server(self, cluster):
|
||||
return self._get_version_handler(
|
||||
cluster.hadoop_version).get_oozie_server(cluster)
|
||||
|
||||
def validate_edp(self, cluster):
|
||||
oo_count = u.get_instances_count(cluster, 'oozie')
|
||||
if oo_count != 1:
|
||||
raise ex.InvalidComponentCountException('oozie', '1', oo_count)
|
||||
|
|
|
@ -321,9 +321,6 @@ def check_required_image_tags(plugin_name, hadoop_version, image_id):
|
|||
## EDP
|
||||
|
||||
|
||||
def check_cluster_contains_oozie(cluster_id):
|
||||
def check_edp_job_support(cluster_id):
|
||||
cluster = api.get_cluster(cluster_id)
|
||||
if not plugin_base.PLUGINS.get_plugin(
|
||||
cluster.plugin_name).get_oozie_server(cluster):
|
||||
raise ex.InvalidDataException('MapReduce job could not be run, '
|
||||
'Oozie service is not found.')
|
||||
plugin_base.PLUGINS.get_plugin(cluster.plugin_name).validate_edp(cluster)
|
||||
|
|
|
@ -63,7 +63,7 @@ def check_job_executor(data, job_id):
|
|||
job_type, subtype = edp.split_job_type(job.type)
|
||||
|
||||
# Check if cluster contains Oozie service to run job
|
||||
main_base.check_cluster_contains_oozie(data['cluster_id'])
|
||||
main_base.check_edp_job_support(data['cluster_id'])
|
||||
|
||||
# All types except Java require input and output objects
|
||||
if job_type == 'Java':
|
||||
|
|
|
@ -45,7 +45,7 @@ class TestJobExecValidation(u.ValidationTestCase):
|
|||
'check_data_sources_are_different', lambda x, y: None)
|
||||
@mock.patch('sahara.service.validations.base.check_cluster_exists',
|
||||
lambda x: None)
|
||||
@mock.patch('sahara.service.validations.base.check_cluster_contains_oozie')
|
||||
@mock.patch('sahara.service.validations.base.check_edp_job_support')
|
||||
@mock.patch('sahara.service.validations'
|
||||
'.edp.base.check_data_source_exists')
|
||||
@mock.patch('sahara.service.edp.api.get_job')
|
||||
|
@ -83,7 +83,7 @@ class TestJobExecValidation(u.ValidationTestCase):
|
|||
|
||||
@mock.patch('sahara.service.validations.base.check_cluster_exists',
|
||||
lambda x: None)
|
||||
@mock.patch('sahara.service.validations.base.check_cluster_contains_oozie',
|
||||
@mock.patch('sahara.service.validations.base.check_edp_job_support',
|
||||
lambda x: None)
|
||||
@mock.patch('sahara.service.edp.api.get_data_source')
|
||||
@mock.patch('sahara.service.edp.api.get_job')
|
||||
|
@ -134,7 +134,7 @@ class TestJobExecValidation(u.ValidationTestCase):
|
|||
|
||||
@mock.patch('sahara.service.api.get_cluster')
|
||||
@mock.patch('sahara.service.edp.api.get_job')
|
||||
def test_check_oozie(self, get_job, get_cluster):
|
||||
def test_check_edp_job_support(self, get_job, get_cluster):
|
||||
get_job.return_value = FakeJob()
|
||||
self._assert_create_object_validation(
|
||||
data={
|
||||
|
@ -142,11 +142,12 @@ class TestJobExecValidation(u.ValidationTestCase):
|
|||
"input_id": six.text_type(uuid.uuid4()),
|
||||
"output_id": six.text_type(uuid.uuid4())
|
||||
},
|
||||
bad_req_i=(1, "INVALID_DATA", "MapReduce job could not be run, "
|
||||
"Oozie service is not found."))
|
||||
bad_req_i=(1, "INVALID_COMPONENT_COUNT",
|
||||
"Hadoop cluster should contain 1 oozie components. "
|
||||
"Actual oozie count is 0"))
|
||||
|
||||
ng = tu.make_ng_dict('master', 42, ['oozie'], 1,
|
||||
instances=[tu.make_inst_dict('id', 'name')])
|
||||
get_cluster.return_value = tu.create_cluster("cluster", "tenant1",
|
||||
"vanilla", "1.2.1", [ng])
|
||||
validation_base.check_cluster_contains_oozie('some_id')
|
||||
validation_base.check_edp_job_support('some_id')
|
||||
|
|
|
@ -37,7 +37,7 @@ class TestJobExecValidation(u.ValidationTestCase):
|
|||
self._create_object_fun = wrap_it
|
||||
self.scheme = je.JOB_EXEC_SCHEMA
|
||||
|
||||
@mock.patch('sahara.service.validations.base.check_cluster_contains_oozie')
|
||||
@mock.patch('sahara.service.validations.base.check_edp_job_support')
|
||||
@mock.patch('sahara.service.validations.base.check_cluster_exists')
|
||||
@mock.patch('sahara.service.edp.api.get_job')
|
||||
def test_java(self, get_job, check_cluster, check_oozie):
|
||||
|
|
Loading…
Reference in New Issue