Add ".sahara" suffix automatically to swift URLs in workflows

Currently Swift URLs accessed from Hadoop must be of the form
"swift://container.service/object", where for Sahara the service
name is "sahara".  Hadoop uses the service name to look up credentials
for accessing the data.

This mechanism puts different constraints on Swift URLs used for
job binaries (accessed from Sahara) and data sources (accessed from
Hadoop), which may be unexpected.

This change automatically adds the ".sahara" suffix to Swift URLs when
the workflow is generated.

Change-Id: I9afc25b2354c008dc5357058bf980cb0f05da864
Trevor McKay 2014-05-05 17:20:17 -04:00
parent c3fa215e64
commit 3ffb9874a3
6 changed files with 93 additions and 34 deletions
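
For illustration, a minimal standalone sketch of the suffix injection this change introduces (add_suffix is a hypothetical helper name; the real logic lives in BaseFactory.inject_swift_url_suffix below):

import six.moves.urllib.parse as urlparse

SWIFT_URL_SUFFIX = '.sahara'

def add_suffix(url):
    # Only swift:// URLs are touched; other schemes pass through unchanged.
    if url.startswith("swift://"):
        u = urlparse.urlparse(url)
        if not u.netloc.endswith(SWIFT_URL_SUFFIX):
            # Replace only the first occurrence of the netloc so an
            # identical string later in the URL is left alone.
            return url.replace(u.netloc, u.netloc + SWIFT_URL_SUFFIX, 1)
    return url

assert add_suffix("swift://ex/o") == "swift://ex.sahara/o"
assert add_suffix("swift://ex.sahara/o") == "swift://ex.sahara/o"
assert add_suffix("hdfs://my/path") == "hdfs://my/path"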

View File

@@ -14,6 +14,7 @@
# limitations under the License.
import six
import six.moves.urllib.parse as urlparse
from sahara import conductor as c
from sahara import context
@@ -22,15 +23,14 @@ from sahara.service.edp.workflow_creator import hive_workflow
from sahara.service.edp.workflow_creator import java_workflow
from sahara.service.edp.workflow_creator import mapreduce_workflow
from sahara.service.edp.workflow_creator import pig_workflow
from sahara.swift import swift_helper as sw
from sahara.swift import utils as su
from sahara.utils import edp
from sahara.utils import xmlutils
conductor = c.API
swift_username = 'fs.swift.service.sahara.username'
swift_password = 'fs.swift.service.sahara.password'
class BaseFactory(object):
def _separate_edp_configs(self, job_dict):
@@ -69,6 +69,14 @@ class BaseFactory(object):
new_vals = src.get(key, {})
value.update(new_vals)
def inject_swift_url_suffix(self, url):
if url.startswith("swift://"):
u = urlparse.urlparse(url)
if not u.netloc.endswith(su.SWIFT_URL_SUFFIX):
return url.replace(u.netloc,
u.netloc+"%s" % su.SWIFT_URL_SUFFIX, 1)
return url
def update_job_dict(self, job_dict, exec_dict):
pruned_exec_dict, edp_configs = self._prune_edp_configs(exec_dict)
self._update_dict(job_dict, pruned_exec_dict)
@@ -79,14 +87,29 @@ class BaseFactory(object):
# Args are listed, not named. Simply replace them.
job_dict['args'] = pruned_exec_dict.get('args', [])
# Find all swift:// paths in args, configs, and params and
# add the .sahara suffix to the container if it is not there
# already
job_dict['args'] = [
# TODO(tmckay) args for Pig can actually be -param name=value
# and value could conceivably contain swift paths
self.inject_swift_url_suffix(arg) for arg in job_dict['args']]
for k, v in six.iteritems(job_dict.get('configs', {})):
job_dict['configs'][k] = self.inject_swift_url_suffix(v)
for k, v in six.iteritems(job_dict.get('params', {})):
job_dict['params'][k] = self.inject_swift_url_suffix(v)
def get_configs(self, input_data, output_data):
configs = {}
for src in (input_data, output_data):
if src.type == "swift" and hasattr(src, "credentials"):
if "user" in src.credentials:
configs[swift_username] = src.credentials['user']
configs[sw.HADOOP_SWIFT_USERNAME] = src.credentials['user']
if "password" in src.credentials:
configs[swift_password] = src.credentials['password']
configs[
sw.HADOOP_SWIFT_PASSWORD] = src.credentials['password']
break
return configs
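
With the literal keys replaced by constants from swift_helper, credential lifting can be exercised directly; a hedged sketch, assuming BaseFactory needs no constructor arguments (FakeSource is a hypothetical stand-in; the key values match the 'fs.swift.service.sahara.username'/'fs.swift.service.sahara.password' literals removed above):

from sahara.service.edp.workflow_creator import workflow_factory

class FakeSource(object):
    # Minimal stand-in for a swift data source carrying credentials.
    type = "swift"
    credentials = {"user": "hadoop", "password": "secret"}

configs = workflow_factory.BaseFactory().get_configs(FakeSource(), FakeSource())
# {'fs.swift.service.sahara.username': 'hadoop',
#  'fs.swift.service.sahara.password': 'secret'}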
@@ -175,6 +198,7 @@ class JavaFactory(BaseFactory):
job_dict = {'configs': {},
'args': []}
self.update_job_dict(job_dict, execution.job_configs)
main_class, java_opts = self._get_java_configs(job_dict)
creator = java_workflow.JavaWorkflowCreator()
creator.build_workflow_xml(main_class,

View File

@@ -66,8 +66,12 @@ def _check_swift_data_source_create(data):
if url.scheme != "swift":
raise ex.InvalidException("URL scheme must be 'swift'")
# We must have the suffix, and the path must be more than '/'
if not url.netloc.endswith(su.SWIFT_URL_SUFFIX) or len(url.path) <= 1:
# The swift url suffix does not have to be included in the netloc.
# However, if the swift suffix indicator is part of the netloc then
# we require the right suffix.
# Additionally, the path must be more than '/'
if (su.SWIFT_URL_SUFFIX_START in url.netloc and not url.netloc.endswith(
su.SWIFT_URL_SUFFIX)) or len(url.path) <= 1:
raise ex.InvalidException(
"URL must be of the form swift://container%s/object"
% su.SWIFT_URL_SUFFIX)
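
In other words, a bare container name now validates, an explicit ".sahara" suffix still validates, and a dotted netloc with any other suffix is rejected; a minimal sketch of the new condition (netloc_ok is a hypothetical name for the check above):

SWIFT_URL_SUFFIX_START = '.'
SWIFT_URL_SUFFIX = SWIFT_URL_SUFFIX_START + 'sahara'

def netloc_ok(netloc):
    # A dotted container name must end in '.sahara'; bare names pass.
    return (SWIFT_URL_SUFFIX_START not in netloc
            or netloc.endswith(SWIFT_URL_SUFFIX))

assert netloc_ok("1234")             # no suffix: accepted
assert netloc_ok("1234.sahara")      # explicit suffix: accepted
assert not netloc_ok("1234.suffix")  # wrong suffix: rejected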

View File

@@ -26,7 +26,8 @@ CONF = cfg.CONF
SWIFT_INTERNAL_PREFIX = "swift://"
# TODO(mattf): remove support for OLD_SWIFT_INTERNAL_PREFIX
OLD_SWIFT_INTERNAL_PREFIX = "swift-internal://"
SWIFT_URL_SUFFIX = '.sahara'
SWIFT_URL_SUFFIX_START = '.'
SWIFT_URL_SUFFIX = SWIFT_URL_SUFFIX_START + 'sahara'
def _get_service_address(service_type):

View File

@@ -19,6 +19,7 @@ import time
import uuid
from sahara.openstack.common import excutils
from sahara.swift import swift_helper as sw
from sahara.tests.integration.tests import base
from sahara.utils import edp
@@ -107,16 +108,16 @@ class EDPTest(base.ITestCase):
self.sahara.data_sources.delete(output_id)
def _add_swift_configs(self, configs):
swift_user = "fs.swift.service.sahara.username"
swift_passw = "fs.swift.service.sahara.password"
if "configs" not in configs:
configs["configs"] = {}
if swift_user not in configs["configs"]:
configs["configs"][swift_user] = self.common_config.OS_USERNAME
if swift_passw not in configs["configs"]:
configs["configs"][swift_passw] = self.common_config.OS_PASSWORD
if sw.HADOOP_SWIFT_USERNAME not in configs["configs"]:
configs["configs"][
sw.HADOOP_SWIFT_USERNAME] = self.common_config.OS_USERNAME
if sw.HADOOP_SWIFT_PASSWORD not in configs["configs"]:
configs["configs"][
sw.HADOOP_SWIFT_PASSWORD] = self.common_config.OS_PASSWORD
@base.skip_test('SKIP_EDP_TEST',
'Test for EDP was skipped.')

View File

@@ -21,6 +21,7 @@ from sahara import conductor as cond
from sahara.plugins import base as pb
from sahara.service.edp import job_manager
from sahara.service.edp.workflow_creator import workflow_factory
from sahara.swift import swift_helper as sw
from sahara.tests.unit import base
from sahara.utils import edp
from sahara.utils import patches as p
@@ -96,8 +97,8 @@ class TestJobManager(base.SaharaWithDbTestCase):
job, job_exec = _create_all_stack(edp.JOB_TYPE_PIG)
job_binary.return_value = {"name": "script.pig"}
input_data = _create_data_source('swift://ex.sahara/i')
output_data = _create_data_source('swift://ex.sahara/o')
input_data = _create_data_source('swift://ex/i')
output_data = _create_data_source('swift://ex/o')
creator = workflow_factory.get_creator(job)
@@ -129,7 +130,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
job, job_exec = _create_all_stack(edp.JOB_TYPE_PIG)
job_binary.return_value = {"name": "script.pig"}
input_data = _create_data_source('swift://ex.sahara/i')
input_data = _create_data_source('swift://ex/i')
output_data = _create_data_source('hdfs://user/hadoop/out')
creator = workflow_factory.get_creator(job)
@@ -149,7 +150,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
</configuration>""", res)
input_data = _create_data_source('hdfs://user/hadoop/in')
output_data = _create_data_source('swift://ex.sahara/o')
output_data = _create_data_source('swift://ex/o')
creator = workflow_factory.get_creator(job)
@@ -196,8 +197,8 @@ class TestJobManager(base.SaharaWithDbTestCase):
job, job_exec = _create_all_stack(job_type, configs)
input_data = _create_data_source('swift://ex.sahara/i')
output_data = _create_data_source('swift://ex.sahara/o')
input_data = _create_data_source('swift://ex/i')
output_data = _create_data_source('swift://ex/o')
creator = workflow_factory.get_creator(job)
@@ -243,12 +244,12 @@ class TestJobManager(base.SaharaWithDbTestCase):
# If args include swift paths, user and password values
# will have to be supplied via configs instead of being
# lifted from input or output data sources
configs = {workflow_factory.swift_username: 'admin',
workflow_factory.swift_password: 'admin1'}
configs = {sw.HADOOP_SWIFT_USERNAME: 'admin',
sw.HADOOP_SWIFT_PASSWORD: 'admin1'}
configs = {
'configs': configs,
'args': ['input_path',
'args': ['swift://ex/i',
'output_path']
}
@@ -269,7 +270,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
</configuration>
<main-class>%s</main-class>
<java-opts>%s</java-opts>
<arg>input_path</arg>
<arg>swift://ex.sahara/i</arg>
<arg>output_path</arg>""" % (_java_main_class, _java_opts), res)
@mock.patch('sahara.conductor.API.job_binary_get')
@@ -278,8 +279,8 @@ class TestJobManager(base.SaharaWithDbTestCase):
job, job_exec = _create_all_stack(edp.JOB_TYPE_HIVE)
job_binary.return_value = {"name": "script.q"}
input_data = _create_data_source('swift://ex.sahara/i')
output_data = _create_data_source('swift://ex.sahara/o')
input_data = _create_data_source('swift://ex/i')
output_data = _create_data_source('swift://ex/o')
creator = workflow_factory.get_creator(job)
@@ -305,8 +306,8 @@ class TestJobManager(base.SaharaWithDbTestCase):
def _build_workflow_with_conf_common(self, job_type):
job, _ = _create_all_stack(job_type)
input_data = _create_data_source('swift://ex.sahara/i')
output_data = _create_data_source('swift://ex.sahara/o')
input_data = _create_data_source('swift://ex/i')
output_data = _create_data_source('swift://ex/o')
job_exec = _create_job_exec(job.id,
job_type, configs={"configs": {'c': 'f'}})
@@ -369,6 +370,15 @@ class TestJobManager(base.SaharaWithDbTestCase):
self.assertEqual(orig_exec_job_dict, exec_job_dict)
def test_inject_swift_url_suffix(self):
w = workflow_factory.BaseFactory()
self.assertEqual(w.inject_swift_url_suffix("swift://ex/o"),
"swift://ex.sahara/o")
self.assertEqual(w.inject_swift_url_suffix("swift://ex.sahara/o"),
"swift://ex.sahara/o")
self.assertEqual(w.inject_swift_url_suffix("hdfs://my/path"),
"hdfs://my/path")
def _create_all_stack(type, configs=None):
b = _create_job_binary('1', type)

View File

@@ -21,7 +21,8 @@ from sahara.service.validations.edp import data_source as ds
from sahara.swift import utils as su
from sahara.tests.unit.service.validation import utils as u
SAMPLE_SWIFT_URL = "swift://1234%s/object" % su.SWIFT_URL_SUFFIX
SAMPLE_SWIFT_URL = "swift://1234/object"
SAMPLE_SWIFT_URL_WITH_SUFFIX = "swift://1234%s/object" % su.SWIFT_URL_SUFFIX
class TestDataSourceValidation(u.ValidationTestCase):
@@ -103,7 +104,7 @@ class TestDataSourceValidation(u.ValidationTestCase):
data = {
"name": "test_data_data_source",
"url": "swif://1234%s/object" % su.SWIFT_URL_SUFFIX,
"url": "swif://1234/object",
"type": "swift",
"description": "incorrect url schema"
}
@@ -112,13 +113,31 @@ class TestDataSourceValidation(u.ValidationTestCase):
@mock.patch("sahara.service.validations."
"edp.base.check_data_source_unique_name")
def test_swift_creation_missing_suffix(self,
check_data_source_unique_name):
def test_swift_creation_explicit_suffix(self,
check_data_source_unique_name):
check_data_source_unique_name.return_value = True
data = {
"name": "test_data_data_source",
"url": "swift://1234/object",
"url": SAMPLE_SWIFT_URL_WITH_SUFFIX,
"type": "swift",
"description": "incorrect url schema",
"credentials": {
"user": "user",
"password": "password"
}
}
self._assert_types(data)
@mock.patch("sahara.service.validations."
"edp.base.check_data_source_unique_name")
def test_swift_creation_wrong_suffix(self,
check_data_source_unique_name):
check_data_source_unique_name.return_value = True
data = {
"name": "test_data_data_source",
"url": "swift://1234.suffix/object",
"type": "swift",
"description": "incorrect url schema"
}
@@ -133,7 +152,7 @@ class TestDataSourceValidation(u.ValidationTestCase):
data = {
"name": "test_data_data_source",
"url": "swift://1234%s/" % su.SWIFT_URL_SUFFIX,
"url": "swift://1234/",
"type": "swift",
"description": "incorrect url schema"
}