From 3ffb9874a3dfaadfff8df5215dd3f7b47d9c8dc0 Mon Sep 17 00:00:00 2001
From: Trevor McKay
Date: Mon, 5 May 2014 17:20:17 -0400
Subject: [PATCH] Add ".sahara" suffix automatically to swift URLs in workflows

Currently Swift URLs accessed from Hadoop must be of the form
"swift://container.service/object", where for Sahara the service name is
"sahara". Hadoop uses the service name to look up credentials for accessing
the data.

This mechanism puts different constraints on Swift URLs used for job binaries
(accessed from Sahara) and data sources (accessed from Hadoop), which may be
unexpected.

This CR automatically adds the ".sahara" suffix to Swift URLs when the
workflow is generated.

Change-Id: I9afc25b2354c008dc5357058bf980cb0f05da864
---
 .../edp/workflow_creator/workflow_factory.py  | 34 ++++++++++++++---
 sahara/service/validations/edp/data_source.py |  8 +++-
 sahara/swift/utils.py                         |  3 +-
 sahara/tests/integration/tests/edp.py         | 13 ++++---
 .../unit/service/edp/test_job_manager.py      | 38 ++++++++++++-------
 .../validation/edp/test_data_source.py        | 31 ++++++++++++---
 6 files changed, 93 insertions(+), 34 deletions(-)

diff --git a/sahara/service/edp/workflow_creator/workflow_factory.py b/sahara/service/edp/workflow_creator/workflow_factory.py
index 25f5776b2a..d31daa29c9 100644
--- a/sahara/service/edp/workflow_creator/workflow_factory.py
+++ b/sahara/service/edp/workflow_creator/workflow_factory.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import six
+import six.moves.urllib.parse as urlparse
 
 from sahara import conductor as c
 from sahara import context
@@ -22,15 +23,14 @@ from sahara.service.edp.workflow_creator import hive_workflow
 from sahara.service.edp.workflow_creator import java_workflow
 from sahara.service.edp.workflow_creator import mapreduce_workflow
 from sahara.service.edp.workflow_creator import pig_workflow
+from sahara.swift import swift_helper as sw
+from sahara.swift import utils as su
 from sahara.utils import edp
 from sahara.utils import xmlutils
 
 
 conductor = c.API
 
-swift_username = 'fs.swift.service.sahara.username'
-swift_password = 'fs.swift.service.sahara.password'
-
 
 class BaseFactory(object):
     def _separate_edp_configs(self, job_dict):
@@ -69,6 +69,14 @@ class BaseFactory(object):
                 new_vals = src.get(key, {})
                 value.update(new_vals)
 
+    def inject_swift_url_suffix(self, url):
+        if url.startswith("swift://"):
+            u = urlparse.urlparse(url)
+            if not u.netloc.endswith(su.SWIFT_URL_SUFFIX):
+                return url.replace(u.netloc,
+                                   u.netloc+"%s" % su.SWIFT_URL_SUFFIX, 1)
+        return url
+
     def update_job_dict(self, job_dict, exec_dict):
         pruned_exec_dict, edp_configs = self._prune_edp_configs(exec_dict)
         self._update_dict(job_dict, pruned_exec_dict)
@@ -79,14 +87,29 @@ class BaseFactory(object):
         # Args are listed, not named. Simply replace them.
         job_dict['args'] = pruned_exec_dict.get('args', [])
 
+        # Find all swift:// paths in args, configs, and params and
+        # add the .sahara suffix to the container if it is not there
+        # already
+        job_dict['args'] = [
+            # TODO(tmckay) args for Pig can actually be -param name=value
+            # and value could conceivably contain swift paths
+            self.inject_swift_url_suffix(arg) for arg in job_dict['args']]
+
+        for k, v in six.iteritems(job_dict.get('configs', {})):
+            job_dict['configs'][k] = self.inject_swift_url_suffix(v)
+
+        for k, v in six.iteritems(job_dict.get('params', {})):
+            job_dict['params'][k] = self.inject_swift_url_suffix(v)
+
     def get_configs(self, input_data, output_data):
         configs = {}
         for src in (input_data, output_data):
             if src.type == "swift" and hasattr(src, "credentials"):
                 if "user" in src.credentials:
-                    configs[swift_username] = src.credentials['user']
+                    configs[sw.HADOOP_SWIFT_USERNAME] = src.credentials['user']
                 if "password" in src.credentials:
-                    configs[swift_password] = src.credentials['password']
+                    configs[
+                        sw.HADOOP_SWIFT_PASSWORD] = src.credentials['password']
                 break
         return configs
 
@@ -175,6 +198,7 @@ class JavaFactory(BaseFactory):
 
         job_dict = {'configs': {}, 'args': []}
         self.update_job_dict(job_dict, execution.job_configs)
+        main_class, java_opts = self._get_java_configs(job_dict)
 
         creator = java_workflow.JavaWorkflowCreator()
         creator.build_workflow_xml(main_class,
diff --git a/sahara/service/validations/edp/data_source.py b/sahara/service/validations/edp/data_source.py
index 8d7b456948..a4d124aed4 100644
--- a/sahara/service/validations/edp/data_source.py
+++ b/sahara/service/validations/edp/data_source.py
@@ -66,8 +66,12 @@ def _check_swift_data_source_create(data):
     if url.scheme != "swift":
         raise ex.InvalidException("URL scheme must be 'swift'")
 
-    # We must have the suffix, and the path must be more than '/'
-    if not url.netloc.endswith(su.SWIFT_URL_SUFFIX) or len(url.path) <= 1:
+    # The swift url suffix does not have to be included in the netloc.
+    # However, if the swift suffix indicator is part of the netloc then
+    # we require the right suffix.
+    # Additionally, the path must be more than '/'
+    if (su.SWIFT_URL_SUFFIX_START in url.netloc and not url.netloc.endswith(
+            su.SWIFT_URL_SUFFIX)) or len(url.path) <= 1:
         raise ex.InvalidException(
             "URL must be of the form swift://container%s/object"
             % su.SWIFT_URL_SUFFIX)
diff --git a/sahara/swift/utils.py b/sahara/swift/utils.py
index 581c00178a..2a19ff878b 100644
--- a/sahara/swift/utils.py
+++ b/sahara/swift/utils.py
@@ -26,7 +26,8 @@ CONF = cfg.CONF
 SWIFT_INTERNAL_PREFIX = "swift://"
 # TODO(mattf): remove support for OLD_SWIFT_INTERNAL_PREFIX
 OLD_SWIFT_INTERNAL_PREFIX = "swift-internal://"
-SWIFT_URL_SUFFIX = '.sahara'
+SWIFT_URL_SUFFIX_START = '.'
+SWIFT_URL_SUFFIX = SWIFT_URL_SUFFIX_START + 'sahara'
 
 
 def _get_service_address(service_type):
diff --git a/sahara/tests/integration/tests/edp.py b/sahara/tests/integration/tests/edp.py
index 74aa59eb82..fdb93a0f4c 100644
--- a/sahara/tests/integration/tests/edp.py
+++ b/sahara/tests/integration/tests/edp.py
@@ -19,6 +19,7 @@ import time
 import uuid
 
 from sahara.openstack.common import excutils
+from sahara.swift import swift_helper as sw
 from sahara.tests.integration.tests import base
 from sahara.utils import edp
 
@@ -107,16 +108,16 @@ class EDPTest(base.ITestCase):
         self.sahara.data_sources.delete(output_id)
 
     def _add_swift_configs(self, configs):
-        swift_user = "fs.swift.service.sahara.username"
-        swift_passw = "fs.swift.service.sahara.password"
 
         if "configs" not in configs:
             configs["configs"] = {}
 
-        if swift_user not in configs["configs"]:
-            configs["configs"][swift_user] = self.common_config.OS_USERNAME
-        if swift_passw not in configs["configs"]:
-            configs["configs"][swift_passw] = self.common_config.OS_PASSWORD
+        if sw.HADOOP_SWIFT_USERNAME not in configs["configs"]:
+            configs["configs"][
+                sw.HADOOP_SWIFT_USERNAME] = self.common_config.OS_USERNAME
+        if sw.HADOOP_SWIFT_PASSWORD not in configs["configs"]:
+            configs["configs"][
+                sw.HADOOP_SWIFT_PASSWORD] = self.common_config.OS_PASSWORD
 
 
     @base.skip_test('SKIP_EDP_TEST', 'Test for EDP was skipped.')
diff --git a/sahara/tests/unit/service/edp/test_job_manager.py b/sahara/tests/unit/service/edp/test_job_manager.py
index 8c0d6a9c8f..90bba2bb27 100644
--- a/sahara/tests/unit/service/edp/test_job_manager.py
+++ b/sahara/tests/unit/service/edp/test_job_manager.py
@@ -21,6 +21,7 @@ from sahara import conductor as cond
 from sahara.plugins import base as pb
 from sahara.service.edp import job_manager
 from sahara.service.edp.workflow_creator import workflow_factory
+from sahara.swift import swift_helper as sw
 from sahara.tests.unit import base
 from sahara.utils import edp
 from sahara.utils import patches as p
@@ -96,8 +97,8 @@ class TestJobManager(base.SaharaWithDbTestCase):
         job, job_exec = _create_all_stack(edp.JOB_TYPE_PIG)
         job_binary.return_value = {"name": "script.pig"}
 
-        input_data = _create_data_source('swift://ex.sahara/i')
-        output_data = _create_data_source('swift://ex.sahara/o')
+        input_data = _create_data_source('swift://ex/i')
+        output_data = _create_data_source('swift://ex/o')
 
         creator = workflow_factory.get_creator(job)
 
@@ -129,7 +130,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
         job, job_exec = _create_all_stack(edp.JOB_TYPE_PIG)
         job_binary.return_value = {"name": "script.pig"}
 
-        input_data = _create_data_source('swift://ex.sahara/i')
+        input_data = _create_data_source('swift://ex/i')
         output_data = _create_data_source('hdfs://user/hadoop/out')
 
         creator = workflow_factory.get_creator(job)
@@ -149,7 +150,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
         """, res)
 
         input_data = _create_data_source('hdfs://user/hadoop/in')
-        output_data = _create_data_source('swift://ex.sahara/o')
+        output_data = _create_data_source('swift://ex/o')
 
         creator = workflow_factory.get_creator(job)
 
@@ -196,8 +197,8 @@ class TestJobManager(base.SaharaWithDbTestCase):
 
         job, job_exec = _create_all_stack(job_type, configs)
 
-        input_data = _create_data_source('swift://ex.sahara/i')
-        output_data = _create_data_source('swift://ex.sahara/o')
+        input_data = _create_data_source('swift://ex/i')
+        output_data = _create_data_source('swift://ex/o')
 
         creator = workflow_factory.get_creator(job)
 
@@ -243,12 +244,12 @@ class TestJobManager(base.SaharaWithDbTestCase):
         # If args include swift paths, user and password values
         # will have to be supplied via configs instead of being
         # lifted from input or output data sources
-        configs = {workflow_factory.swift_username: 'admin',
-                   workflow_factory.swift_password: 'admin1'}
+        configs = {sw.HADOOP_SWIFT_USERNAME: 'admin',
+                   sw.HADOOP_SWIFT_PASSWORD: 'admin1'}
 
         configs = {
             'configs': configs,
-            'args': ['input_path',
+            'args': ['swift://ex/i',
                      'output_path']
         }
 
@@ -269,7 +270,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
       </configuration>
       <main-class>%s</main-class>
      <java-opts>%s</java-opts>
-      <arg>input_path</arg>
+      <arg>swift://ex.sahara/i</arg>
       <arg>output_path</arg>""" % (_java_main_class, _java_opts), res)
 
     @mock.patch('sahara.conductor.API.job_binary_get')
@@ -278,8 +279,8 @@ class TestJobManager(base.SaharaWithDbTestCase):
     def test_build_workflow_for_job_hive(self, job_binary):
         job, job_exec = _create_all_stack(edp.JOB_TYPE_HIVE)
         job_binary.return_value = {"name": "script.q"}
-        input_data = _create_data_source('swift://ex.sahara/i')
-        output_data = _create_data_source('swift://ex.sahara/o')
+        input_data = _create_data_source('swift://ex/i')
+        output_data = _create_data_source('swift://ex/o')
 
         creator = workflow_factory.get_creator(job)
 
@@ -305,8 +306,8 @@ class TestJobManager(base.SaharaWithDbTestCase):
     def _build_workflow_with_conf_common(self, job_type):
         job, _ = _create_all_stack(job_type)
 
-        input_data = _create_data_source('swift://ex.sahara/i')
-        output_data = _create_data_source('swift://ex.sahara/o')
+        input_data = _create_data_source('swift://ex/i')
+        output_data = _create_data_source('swift://ex/o')
 
         job_exec = _create_job_exec(job.id,
                                     job_type, configs={"configs": {'c': 'f'}})
@@ -369,6 +370,15 @@ class TestJobManager(base.SaharaWithDbTestCase):
 
         self.assertEqual(orig_exec_job_dict, exec_job_dict)
 
+    def test_inject_swift_url_suffix(self):
+        w = workflow_factory.BaseFactory()
+        self.assertEqual(w.inject_swift_url_suffix("swift://ex/o"),
+                         "swift://ex.sahara/o")
+        self.assertEqual(w.inject_swift_url_suffix("swift://ex.sahara/o"),
+                         "swift://ex.sahara/o")
+        self.assertEqual(w.inject_swift_url_suffix("hdfs://my/path"),
+                         "hdfs://my/path")
+
 
 def _create_all_stack(type, configs=None):
     b = _create_job_binary('1', type)
diff --git a/sahara/tests/unit/service/validation/edp/test_data_source.py b/sahara/tests/unit/service/validation/edp/test_data_source.py
index f68c3083b9..ca813592f4 100644
--- a/sahara/tests/unit/service/validation/edp/test_data_source.py
+++ b/sahara/tests/unit/service/validation/edp/test_data_source.py
@@ -21,7 +21,8 @@ from sahara.service.validations.edp import data_source as ds
 from sahara.swift import utils as su
 from sahara.tests.unit.service.validation import utils as u
 
-SAMPLE_SWIFT_URL = "swift://1234%s/object" % su.SWIFT_URL_SUFFIX
+SAMPLE_SWIFT_URL = "swift://1234/object"
+SAMPLE_SWIFT_URL_WITH_SUFFIX = "swift://1234%s/object" % su.SWIFT_URL_SUFFIX
 
 
 class TestDataSourceValidation(u.ValidationTestCase):
@@ -103,7 +104,7 @@ class TestDataSourceValidation(u.ValidationTestCase):
 
         data = {
             "name": "test_data_data_source",
-            "url": "swif://1234%s/object" % su.SWIFT_URL_SUFFIX,
+            "url": "swif://1234/object",
             "type": "swift",
             "description": "incorrect url schema"
         }
@@ -112,13 +113,31 @@ class TestDataSourceValidation(u.ValidationTestCase):
 
     @mock.patch("sahara.service.validations."
"edp.base.check_data_source_unique_name") - def test_swift_creation_missing_suffix(self, - check_data_source_unique_name): + def test_swift_creation_explicit_suffix(self, + check_data_source_unique_name): check_data_source_unique_name.return_value = True data = { "name": "test_data_data_source", - "url": "swift://1234/object", + "url": SAMPLE_SWIFT_URL_WITH_SUFFIX, + "type": "swift", + "description": "incorrect url schema", + "credentials": { + "user": "user", + "password": "password" + } + } + self._assert_types(data) + + @mock.patch("sahara.service.validations." + "edp.base.check_data_source_unique_name") + def test_swift_creation_wrong_suffix(self, + check_data_source_unique_name): + check_data_source_unique_name.return_value = True + + data = { + "name": "test_data_data_source", + "url": "swift://1234.suffix/object", "type": "swift", "description": "incorrect url schema" } @@ -133,7 +152,7 @@ class TestDataSourceValidation(u.ValidationTestCase): data = { "name": "test_data_data_source", - "url": "swift://1234%s/" % su.SWIFT_URL_SUFFIX, + "url": "swift://1234/", "type": "swift", "description": "incorrect url schema" }