Fix running EDP job on transient cluster

Change-Id: Iea182c10f958a4a0034c472a77e58711b4d4fa73
Closes-bug: #1307960
(cherry picked from commit 0eda1b3b53)
This commit is contained in:
Sergey Reshetnyak 2014-04-15 16:23:06 +04:00 committed by Sergey Lukjanov
parent d7aef2377b
commit a61a061603
8 changed files with 31 additions and 13 deletions

View File

@ -25,6 +25,10 @@ def get_node_groups(cluster, node_process=None):
node_process in [n.lower() for n in ng.node_processes])]
def get_instances_count(cluster, node_process=None):
return sum([ng.count for ng in get_node_groups(cluster, node_process)])
def get_instances(cluster, node_process=None):
nodes = get_node_groups(cluster, node_process)
return reduce(lambda a, b: a + b.instances, nodes, [])

View File

@ -142,6 +142,11 @@ class AmbariPlugin(p.ProvisioningPluginBase):
def get_oozie_server(self, cluster):
return u.get_instance(cluster, "oozie_server")
def validate_edp(self, cluster):
oo_count = u.get_instances_count(cluster, 'oozie_server')
if oo_count != 1:
raise ex.InvalidComponentCountException('oozie', '1', oo_count)
def get_resource_manager_uri(self, cluster):
version_handler = (
self.version_factory.get_version_handler(cluster.hadoop_version))

View File

@ -66,6 +66,10 @@ class ProvisioningPluginBase(plugins_base.PluginInterface):
def get_oozie_server(self, cluster):
pass
@plugins_base.optional
def validate_edp(self, cluster):
pass
@plugins_base.optional
def get_resource_manager_uri(self, cluster):
pass

View File

@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara.plugins.general import exceptions as ex
from sahara.plugins.general import utils as u
from sahara.plugins import provisioning as p
from sahara.plugins.vanilla import versionfactory as vhf
@ -77,3 +79,8 @@ class VanillaProvider(p.ProvisioningPluginBase):
def get_oozie_server(self, cluster):
return self._get_version_handler(
cluster.hadoop_version).get_oozie_server(cluster)
def validate_edp(self, cluster):
oo_count = u.get_instances_count(cluster, 'oozie')
if oo_count != 1:
raise ex.InvalidComponentCountException('oozie', '1', oo_count)

View File

@ -321,9 +321,6 @@ def check_required_image_tags(plugin_name, hadoop_version, image_id):
## EDP
def check_cluster_contains_oozie(cluster_id):
def check_edp_job_support(cluster_id):
cluster = api.get_cluster(cluster_id)
if not plugin_base.PLUGINS.get_plugin(
cluster.plugin_name).get_oozie_server(cluster):
raise ex.InvalidDataException('MapReduce job could not be run, '
'Oozie service is not found.')
plugin_base.PLUGINS.get_plugin(cluster.plugin_name).validate_edp(cluster)

View File

@ -63,7 +63,7 @@ def check_job_executor(data, job_id):
job_type, subtype = edp.split_job_type(job.type)
# Check if cluster contains Oozie service to run job
main_base.check_cluster_contains_oozie(data['cluster_id'])
main_base.check_edp_job_support(data['cluster_id'])
# All types except Java require input and output objects
if job_type == 'Java':

View File

@ -45,7 +45,7 @@ class TestJobExecValidation(u.ValidationTestCase):
'check_data_sources_are_different', lambda x, y: None)
@mock.patch('sahara.service.validations.base.check_cluster_exists',
lambda x: None)
@mock.patch('sahara.service.validations.base.check_cluster_contains_oozie')
@mock.patch('sahara.service.validations.base.check_edp_job_support')
@mock.patch('sahara.service.validations'
'.edp.base.check_data_source_exists')
@mock.patch('sahara.service.edp.api.get_job')
@ -83,7 +83,7 @@ class TestJobExecValidation(u.ValidationTestCase):
@mock.patch('sahara.service.validations.base.check_cluster_exists',
lambda x: None)
@mock.patch('sahara.service.validations.base.check_cluster_contains_oozie',
@mock.patch('sahara.service.validations.base.check_edp_job_support',
lambda x: None)
@mock.patch('sahara.service.edp.api.get_data_source')
@mock.patch('sahara.service.edp.api.get_job')
@ -134,7 +134,7 @@ class TestJobExecValidation(u.ValidationTestCase):
@mock.patch('sahara.service.api.get_cluster')
@mock.patch('sahara.service.edp.api.get_job')
def test_check_oozie(self, get_job, get_cluster):
def test_check_edp_job_support(self, get_job, get_cluster):
get_job.return_value = FakeJob()
self._assert_create_object_validation(
data={
@ -142,11 +142,12 @@ class TestJobExecValidation(u.ValidationTestCase):
"input_id": six.text_type(uuid.uuid4()),
"output_id": six.text_type(uuid.uuid4())
},
bad_req_i=(1, "INVALID_DATA", "MapReduce job could not be run, "
"Oozie service is not found."))
bad_req_i=(1, "INVALID_COMPONENT_COUNT",
"Hadoop cluster should contain 1 oozie components. "
"Actual oozie count is 0"))
ng = tu.make_ng_dict('master', 42, ['oozie'], 1,
instances=[tu.make_inst_dict('id', 'name')])
get_cluster.return_value = tu.create_cluster("cluster", "tenant1",
"vanilla", "1.2.1", [ng])
validation_base.check_cluster_contains_oozie('some_id')
validation_base.check_edp_job_support('some_id')

View File

@ -37,7 +37,7 @@ class TestJobExecValidation(u.ValidationTestCase):
self._create_object_fun = wrap_it
self.scheme = je.JOB_EXEC_SCHEMA
@mock.patch('sahara.service.validations.base.check_cluster_contains_oozie')
@mock.patch('sahara.service.validations.base.check_edp_job_support')
@mock.patch('sahara.service.validations.base.check_cluster_exists')
@mock.patch('sahara.service.edp.api.get_job')
def test_java(self, get_job, check_cluster, check_oozie):