diff --git a/doc/source/user/edp.rst b/doc/source/user/edp.rst
index 863d6177b1..1052654ecc 100644
--- a/doc/source/user/edp.rst
+++ b/doc/source/user/edp.rst
@@ -19,6 +19,7 @@ of jobs on clusters created from sahara. EDP supports:
   + HDFS for all job types
   + swift for all types excluding Hive
   + manila (NFS shares only) for all types excluding Pig
+  + Any S3-like object store
 
 * configuration of jobs at submission time
 * execution of jobs on existing clusters or transient clusters
@@ -69,7 +70,8 @@ running in the same OpenStack installation referenced by sahara.
 Sahara requires the following credentials/configs to access files stored in
 an S3-like object store: ``accesskey``, ``secretkey``, ``endpoint``. These
 credentials are specified through the `extra` in the body of the request
-when creating a job binary or data source referencing S3.
+when creating a job binary referencing S3. The value of ``endpoint`` should
+include a protocol: *http* or *https*.
 
 To reference a binary file stored in manila, create the job binary with the
 URL ``manila://{share_id}/{path}``. This assumes that you have already stored
@@ -132,6 +134,19 @@ share as a data source, create the data source with the URL
 share will be automatically mounted to your cluster's nodes as needed to
 access the data source.
 
+Finally, Sahara supports data sources referring to S3-like object stores. The
+URL should be of the form ``s3a://{bucket}/{path}``. Also, the following
+credentials/configs are understood: ``accesskey``, ``secretkey``,
+``endpoint``, ``bucket_in_path``, and ``ssl``. These credentials are specified
+through the ``credentials`` attribute of the body of the request when creating
+a data source referencing S3. The value of ``endpoint`` should **NOT** include
+a protocol (*http* or *https*), unlike when referencing an S3 job binary. It
+can also be noted that Sahara clusters can interact with S3-like stores even
+when not using EDP, i.e. when manually operating the cluster instead. Consult
+the `hadoop-aws documentation `_
+for more information. Also, be advised that hadoop-aws will only write a job's
+output into a bucket which already exists: it does not create new buckets.
+
 Some job types require the use of data source objects to specify input and
 output when a job is launched. For example, when running a Pig job the UI will
 prompt the user for input and output data source objects.
@@ -572,7 +587,8 @@ Spark jobs use some special configuration values:
 * ``edp.spark.adapt_for_swift`` (optional) If set to **True**, instructs
   sahara to modify the job's Hadoop configuration so that swift paths may be
   accessed. Without this configuration value, swift paths will not be
-  accessible to Spark jobs. The default is **False**.
+  accessible to Spark jobs. The default is **False**. Despite the name, the
+  same principle applies to jobs which reference paths in S3-like stores.
 
 * ``edp.spark.driver.classpath`` (optional) If set to empty string sahara
   will use default classpath for the cluster during job execution.
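To make the credential keys documented above concrete, here is a minimal illustrative sketch (not part of the patch itself) of an S3 data source description. All values are hypothetical; only the key names, the boolean requirement, and the protocol-less ``endpoint`` rule come from the documentation and validation code in this change.

# Minimal sketch of an S3 data source description using the credential keys
# documented above; all values here are hypothetical.
s3_data_source = {
    "name": "s3-input",
    "type": "s3",
    "url": "s3a://mybucket/input/part-00000",
    "credentials": {
        "accesskey": "my-access-key",
        "secretkey": "my-secret-key",
        "endpoint": "s3.example.org",  # no http:// or https:// prefix here
        "ssl": True,                   # must be booleans, not strings
        "bucket_in_path": False,
    },
}

Per the S3Type.validate logic added later in this patch, any key outside this set is rejected and ``ssl``/``bucket_in_path`` must be actual booleans.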
@@ -620,6 +636,9 @@ For job binaries only, S3 urls take the form:
 
 ``s3://bucket/path/to/object``
 
+For data sources, S3 urls take the standard Hadoop form:
+
+``s3a://bucket/path/to/object``
 
 EDP Requirements
 ================
diff --git a/releasenotes/notes/support-s3-data-source-a912e2cdf4cd51fb.yaml b/releasenotes/notes/support-s3-data-source-a912e2cdf4cd51fb.yaml
new file mode 100644
index 0000000000..255c65d598
--- /dev/null
+++ b/releasenotes/notes/support-s3-data-source-a912e2cdf4cd51fb.yaml
@@ -0,0 +1,3 @@
+---
+features:
+  - An EDP data source may reference a file stored in an S3-like object store.
diff --git a/sahara/conductor/manager.py b/sahara/conductor/manager.py
index 5405cb4ee0..329f999abb 100644
--- a/sahara/conductor/manager.py
+++ b/sahara/conductor/manager.py
@@ -436,6 +436,10 @@ class ConductorManager(db_base.Base):
                 values['credentials'].get('password')):
             values['credentials']['password'] = key_manager.store_secret(
                 values['credentials']['password'], context)
+        if (values.get('credentials') and
+                values['credentials'].get('secretkey')):
+            values['credentials']['secretkey'] = key_manager.store_secret(
+                values['credentials']['secretkey'], context)
         return self.db.data_source_create(context, values)
 
     def data_source_destroy(self, context, data_source):
@@ -451,6 +455,11 @@ class ConductorManager(db_base.Base):
                     ds_record['credentials'].get('password')):
                 key_manager.delete_secret(
                     ds_record['credentials']['password'], context)
+        if CONF.use_barbican_key_manager:
+            if (ds_record.get('credentials') and
+                    ds_record['credentials'].get('secretkey')):
+                key_manager.delete_secret(
+                    ds_record['credentials']['secretkey'], context)
         return self.db.data_source_destroy(context, data_source)
 
     def data_source_update(self, context, id, values):
@@ -468,20 +477,31 @@ class ConductorManager(db_base.Base):
         # it should be noted that the jsonschema validation ensures that
         # if the proxy domain is not in use then credentials must be
         # sent with this record.
-        if (CONF.use_barbican_key_manager and not
-                CONF.use_domain_for_proxy_users):
-            # first we retrieve the original record to get the old key
-            # uuid, and delete it.
+
+        # first we retrieve the original record to get the old key
+        # uuid, and delete it.
+        # next we create the new key.
+
+        if CONF.use_barbican_key_manager:
             ds_record = self.data_source_get(context, id)
             if (ds_record.get('credentials') and
-                    ds_record['credentials'].get('password')):
+                    ds_record['credentials'].get('password') and
+                    not CONF.use_domain_for_proxy_users):
                 key_manager.delete_secret(
                     ds_record['credentials']['password'], context)
-            # next we create the new key.
             if (values.get('credentials') and
-                    values['credentials'].get('password')):
+                    values['credentials'].get('password') and
+                    not CONF.use_domain_for_proxy_users):
                 values['credentials']['password'] = key_manager.store_secret(
                     values['credentials']['password'], context)
+            if (ds_record.get('credentials') and
+                    ds_record['credentials'].get('secretkey')):
+                key_manager.delete_secret(
+                    ds_record['credentials']['secretkey'], context)
+            if (values.get('credentials') and
+                    values['credentials'].get('secretkey')):
+                values['credentials']['secretkey'] = key_manager.store_secret(
+                    values['credentials']['secretkey'], context)
         return self.db.data_source_update(context, values)
 
     # JobExecution ops
diff --git a/sahara/conductor/resource.py b/sahara/conductor/resource.py
index 3cd9d1f009..9271ee1aee 100644
--- a/sahara/conductor/resource.py
+++ b/sahara/conductor/resource.py
@@ -30,6 +30,7 @@ import six
 from sahara.conductor import objects
 from sahara import exceptions as ex
 from sahara.i18n import _
+from sahara.service.edp import s3_common
 from sahara.swift import swift_helper
 from sahara.utils import types
 
@@ -277,6 +278,11 @@ class JobExecution(Resource, objects.JobExecution):
                 configs[swift_helper.HADOOP_SWIFT_USERNAME] = ""
             if swift_helper.HADOOP_SWIFT_PASSWORD in configs:
                 configs[swift_helper.HADOOP_SWIFT_PASSWORD] = ""
+            if s3_common.S3_ACCESS_KEY_CONFIG in configs:
+                configs[s3_common.S3_ACCESS_KEY_CONFIG] = ""
+            if s3_common.S3_SECRET_KEY_CONFIG in configs:
+                configs[s3_common.S3_SECRET_KEY_CONFIG] = ""
+
         if 'trusts' in job_configs:
             del job_configs['trusts']
         if 'proxy_configs' in job_configs:
diff --git a/sahara/service/edp/data_sources/opts.py b/sahara/service/edp/data_sources/opts.py
index 7fd5cc3372..d63dfc7ff5 100644
--- a/sahara/service/edp/data_sources/opts.py
+++ b/sahara/service/edp/data_sources/opts.py
@@ -19,7 +19,7 @@ from oslo_config import cfg
 
 opts = [
     cfg.ListOpt('data_source_types',
-                default=['swift', 'hdfs', 'maprfs', 'manila'],
+                default=['swift', 'hdfs', 'maprfs', 'manila', 's3'],
                 help='List of data sources types to be loaded. Sahara '
                      'preserves the order of the list when returning it.'),
 ]
diff --git a/sahara/service/edp/data_sources/s3/__init__.py b/sahara/service/edp/data_sources/s3/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/sahara/service/edp/data_sources/s3/implementation.py b/sahara/service/edp/data_sources/s3/implementation.py
new file mode 100644
index 0000000000..2a74000185
--- /dev/null
+++ b/sahara/service/edp/data_sources/s3/implementation.py
@@ -0,0 +1,82 @@
+# Copyright (c) 2018 OpenStack Contributors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import six.moves.urllib.parse as urlparse
+
+from sahara import exceptions as ex
+from sahara.i18n import _
+from sahara.service.edp.data_sources.base import DataSourceType
+from sahara.service.edp import s3_common
+from sahara.utils import types
+
+
+class S3Type(DataSourceType):
+    configs_map = {"accesskey": s3_common.S3_ACCESS_KEY_CONFIG,
+                   "secretkey": s3_common.S3_SECRET_KEY_CONFIG,
+                   "endpoint": s3_common.S3_ENDPOINT_CONFIG,
+                   "bucket_in_path": s3_common.S3_BUCKET_IN_PATH_CONFIG,
+                   "ssl": s3_common.S3_SSL_CONFIG}
+
+    bool_keys = ["bucket_in_path",
+                 "ssl"]
+
+    def validate(self, data):
+        self._validate_url(data['url'])
+
+        # Do validation loosely, and don't require much... the user might have
+        # (by their own preference) set some or all configs manually
+
+        if "credentials" not in data:
+            return
+
+        for key in data["credentials"].keys():
+            if key not in self.configs_map.keys():
+                raise ex.InvalidDataException(
+                    _("Unknown config '%s' for S3 data source") % key)
+            if key in self.bool_keys:
+                if not isinstance(data["credentials"][key], bool):
+                    raise ex.InvalidDataException(
+                        _("Config '%s' must be boolean") % key)
+
+    def _validate_url(self, url):
+        if len(url) == 0:
+            raise ex.InvalidDataException(_("S3 url must not be empty"))
+
+        url = urlparse.urlparse(url)
+        if url.scheme != "s3a":
+            raise ex.InvalidDataException(_("URL scheme must be 's3a'"))
+
+        if not url.hostname:
+            raise ex.InvalidDataException(_("Bucket name must be present"))
+
+        if not url.path:
+            raise ex.InvalidDataException(_("Object name must be present"))
+
+    def prepare_cluster(self, data_source, cluster, **kwargs):
+        if hasattr(data_source, "credentials"):
+            job_configs = kwargs.pop('job_configs')
+
+            if isinstance(job_configs, types.FrozenDict):
+                return
+            if job_configs.get('configs', None) is None:
+                return
+
+            creds = data_source.credentials
+            job_conf = job_configs['configs']
+
+            for config_name, s3a_cfg_name in self.configs_map.items():
+                if job_conf.get(s3a_cfg_name, None) is None:  # no overwrite
+                    if creds.get(config_name, None) is not None:
+                        job_conf[s3a_cfg_name] = creds[config_name]
diff --git a/sahara/service/edp/oozie/workflow_creator/workflow_factory.py b/sahara/service/edp/oozie/workflow_creator/workflow_factory.py
index c3c4f633a1..ae97fac6b0 100644
--- a/sahara/service/edp/oozie/workflow_creator/workflow_factory.py
+++ b/sahara/service/edp/oozie/workflow_creator/workflow_factory.py
@@ -24,6 +24,7 @@ from sahara.service.edp.oozie.workflow_creator import java_workflow
 from sahara.service.edp.oozie.workflow_creator import mapreduce_workflow
 from sahara.service.edp.oozie.workflow_creator import pig_workflow
 from sahara.service.edp.oozie.workflow_creator import shell_workflow
+from sahara.service.edp import s3_common
 from sahara.swift import swift_helper as sw
 from sahara.swift import utils as su
 from sahara.utils import edp
@@ -117,6 +118,24 @@ class BaseFactory(object):
                     configs[sw.HADOOP_SWIFT_PASSWORD] = (
                         key_manager.get_secret(src.credentials['password']))
                     break
+        for src in (input_data, output_data):
+            if src.type == "s3" and hasattr(src, "credentials"):
+                if "accesskey" in src.credentials:
+                    configs[s3_common.S3_ACCESS_KEY_CONFIG] = (
+                        src.credentials['accesskey'])
+                if "secretkey" in src.credentials:
+                    configs[s3_common.S3_SECRET_KEY_CONFIG] = (
+                        key_manager.get_secret(src.credentials['secretkey']))
+                if "endpoint" in src.credentials:
+                    configs[s3_common.S3_ENDPOINT_CONFIG] = (
+                        src.credentials['endpoint'])
+                if "bucket_in_path" in src.credentials:
+                    configs[s3_common.S3_BUCKET_IN_PATH_CONFIG] = (
+                        src.credentials['bucket_in_path'])
+                if "ssl" in src.credentials:
+                    configs[s3_common.S3_SSL_CONFIG] = (
+                        src.credentials['ssl'])
+                break
         return configs
 
     def get_params(self, input_data, output_data, data_source_urls):
@@ -220,6 +239,7 @@ class JavaFactory(BaseFactory):
         return main_class, java_opts, args
 
     def get_configs(self, proxy_configs=None):
+        # TODO(jfreud): allow s3 and non-proxy swift configs here?
         configs = {}
 
         if proxy_configs:
diff --git a/sahara/service/edp/s3_common.py b/sahara/service/edp/s3_common.py
index 4b6e532bdf..dbfa899075 100644
--- a/sahara/service/edp/s3_common.py
+++ b/sahara/service/edp/s3_common.py
@@ -22,6 +22,16 @@ from sahara.i18n import _
 from sahara.service.castellan import utils as key_manager
 
 S3_JB_PREFIX = "s3://"
+S3_ACCESS_KEY_CONFIG = "fs.s3a.access.key"
+S3_SECRET_KEY_CONFIG = "fs.s3a.secret.key"
+S3_ENDPOINT_CONFIG = "fs.s3a.endpoint"
+S3_BUCKET_IN_PATH_CONFIG = "fs.s3a.path.style.access"
+S3_SSL_CONFIG = "fs.s3a.connection.ssl.enabled"
+S3_DS_CONFIGS = [S3_ACCESS_KEY_CONFIG,
+                 S3_SECRET_KEY_CONFIG,
+                 S3_ENDPOINT_CONFIG,
+                 S3_BUCKET_IN_PATH_CONFIG,
+                 S3_SSL_CONFIG]
 
 CONF = cfg.CONF
 
diff --git a/sahara/service/edp/spark/engine.py b/sahara/service/edp/spark/engine.py
index fb774c1666..83a35ff3d4 100644
--- a/sahara/service/edp/spark/engine.py
+++ b/sahara/service/edp/spark/engine.py
@@ -27,6 +27,7 @@ from sahara.service.castellan import utils as key_manager
 from sahara.service.edp import base_engine
 from sahara.service.edp.job_binaries import manager as jb_manager
 from sahara.service.edp import job_utils
+from sahara.service.edp import s3_common
 from sahara.service.validations.edp import job_execution as j
 from sahara.swift import swift_helper as sw
 from sahara.swift import utils as su
@@ -115,6 +116,7 @@ class SparkJobEngine(base_engine.JobEngine):
         xml_name = 'spark.xml'
         proxy_configs = job_configs.get('proxy_configs')
         configs = {}
+        cfgs = job_configs.get('configs', {})
         if proxy_configs:
             configs[sw.HADOOP_SWIFT_USERNAME] = proxy_configs.get(
                 'proxy_username')
@@ -124,9 +126,21 @@ class SparkJobEngine(base_engine.JobEngine):
                 'proxy_trust_id')
             configs[sw.HADOOP_SWIFT_DOMAIN_NAME] = CONF.proxy_user_domain_name
         else:
-            cfgs = job_configs.get('configs', {})
-            targets = [sw.HADOOP_SWIFT_USERNAME, sw.HADOOP_SWIFT_PASSWORD]
+            targets = [sw.HADOOP_SWIFT_USERNAME]
             configs = {k: cfgs[k] for k in targets if k in cfgs}
+            if sw.HADOOP_SWIFT_PASSWORD in cfgs:
+                configs[sw.HADOOP_SWIFT_PASSWORD] = (
+                    key_manager.get_secret(cfgs[sw.HADOOP_SWIFT_PASSWORD])
+                )
+
+        for s3_cfg_key in s3_common.S3_DS_CONFIGS:
+            if s3_cfg_key in cfgs:
+                if s3_cfg_key == s3_common.S3_SECRET_KEY_CONFIG:
+                    configs[s3_cfg_key] = (
+                        key_manager.get_secret(cfgs[s3_cfg_key])
+                    )
+                else:
+                    configs[s3_cfg_key] = cfgs[s3_cfg_key]
 
         content = xmlutils.create_hadoop_xml(configs)
         with remote.get_remote(where) as r:
diff --git a/sahara/service/validations/edp/base.py b/sahara/service/validations/edp/base.py
index 9ed04b49eb..31caec3168 100644
--- a/sahara/service/validations/edp/base.py
+++ b/sahara/service/validations/edp/base.py
@@ -24,7 +24,7 @@ conductor = c.API
 
 data_source_type = {
     "type": "string",
-    "enum": ["swift", "hdfs", "maprfs", "manila"]
+    "enum": ["swift", "hdfs", "maprfs", "manila", "s3"]
 }
 
 job_configs = {
diff --git a/sahara/tests/unit/service/edp/data_sources/s3/__init__.py b/sahara/tests/unit/service/edp/data_sources/s3/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/sahara/tests/unit/service/edp/data_sources/s3/test_s3_type.py b/sahara/tests/unit/service/edp/data_sources/s3/test_s3_type.py
new file mode 100644
index 0000000000..2da7a129c2
--- /dev/null
+++ b/sahara/tests/unit/service/edp/data_sources/s3/test_s3_type.py
@@ -0,0 +1,113 @@
+# Copyright (c) 2018 OpenStack Contributors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import mock
+import testtools
+
+import sahara.exceptions as ex
+from sahara.service.edp.data_sources.s3.implementation import S3Type
+from sahara.tests.unit import base
+from sahara.utils.types import FrozenDict
+
+
+class TestS3Type(base.SaharaTestCase):
+    def setUp(self):
+        super(TestS3Type, self).setUp()
+        self.s_type = S3Type()
+
+    def test_validate(self):
+        data = {
+            "name": "test_data_data_source",
+            "type": "s3",
+            "url": "s3a://mybucket/myobject",
+        }
+        self.s_type.validate(data)
+
+        creds = {}
+        data["credentials"] = creds
+        self.s_type.validate(data)
+
+        creds["accesskey"] = "key"
+        creds["secretkey"] = "key2"
+        self.s_type.validate(data)
+
+        creds["bucket_in_path"] = True
+        creds["ssl"] = True
+        creds["endpoint"] = "blah.org"
+        self.s_type.validate(data)
+
+        creds["cool_key"] = "wow"
+        with testtools.ExpectedException(ex.InvalidDataException):
+            self.s_type.validate(data)
+
+        creds.pop("cool_key")
+
+        creds["ssl"] = "yeah"
+        with testtools.ExpectedException(ex.InvalidDataException):
+            self.s_type.validate(data)
+
+        creds["ssl"] = True
+        creds["bucket_in_path"] = "yeah"
+        with testtools.ExpectedException(ex.InvalidDataException):
+            self.s_type.validate(data)
+
+    def test_validate_url(self):
+        url = ""
+        with testtools.ExpectedException(ex.InvalidDataException):
+            self.s_type._validate_url(url)
+        url = "s3a://"
+        with testtools.ExpectedException(ex.InvalidDataException):
+            self.s_type._validate_url(url)
+        url = "s3a:///"
+        with testtools.ExpectedException(ex.InvalidDataException):
+            self.s_type._validate_url(url)
+        url = "s3a://bucket"
+        with testtools.ExpectedException(ex.InvalidDataException):
+            self.s_type._validate_url(url)
+        url = "s3b://bucket/obj"
+        with testtools.ExpectedException(ex.InvalidDataException):
+            self.s_type._validate_url(url)
+        url = "s3a://bucket/obj"
+        self.s_type._validate_url(url)
+        url = "s3a://bucket/fold/obj"
+        self.s_type._validate_url(url)
+        url = "s3a://bucket/obj/"
+        self.s_type._validate_url(url)
+
+    def test_prepare_cluster(self):
+        ds = mock.Mock()
+        cluster = mock.Mock()
+        ds.credentials = {}
+        job_configs = {}
+
+        self.s_type.prepare_cluster(ds, cluster, job_configs=job_configs)
+        self.assertEqual(job_configs, {})
+
+        job_configs['configs'] = {}
+        ds.credentials['accesskey'] = 'key'
+        self.s_type.prepare_cluster(ds, cluster, job_configs=job_configs)
+        self.assertEqual(job_configs['configs'], {'fs.s3a.access.key': 'key'})
+
+        job_configs['configs'] = {'fs.s3a.access.key': 'key2'}
+        self.s_type.prepare_cluster(ds, cluster, job_configs=job_configs)
+        self.assertEqual(job_configs['configs'], {'fs.s3a.access.key': 'key2'})
+
+        job_configs = FrozenDict({'configs': {}})
+        self.s_type.prepare_cluster(ds, cluster, job_configs=job_configs)
+        self.assertNotIn('fs.s3a.access.key', job_configs['configs'])
+
+        job_configs = {}
+        self.s_type.prepare_cluster(ds, cluster, job_configs=job_configs)
+        self.assertEqual(job_configs, {})
diff --git a/setup.cfg b/setup.cfg
index 38f2b84d3b..48de0d796f 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -59,6 +59,7 @@ sahara.data_source.types =
     manila = sahara.service.edp.data_sources.manila.implementation:ManilaType
     maprfs = sahara.service.edp.data_sources.maprfs.implementation:MapRFSType
     swift = sahara.service.edp.data_sources.swift.implementation:SwiftType
+    s3 = sahara.service.edp.data_sources.s3.implementation:S3Type
 
 sahara.job_binary.types =
     internal-db = sahara.service.edp.job_binaries.internal_db.implementation:InternalDBType
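For reference, a minimal sketch of how the data source credentials translate into the Hadoop ``fs.s3a.*`` settings defined in s3_common.py above. The constant names and values are taken from the patch itself; the helper function below is only an approximation of S3Type.prepare_cluster for illustration and is not part of the change.

# Sketch: the credential -> fs.s3a.* mapping introduced by this change.
# Constant values are taken from sahara/service/edp/s3_common.py above;
# the helper mirrors the "no overwrite" behaviour of S3Type.prepare_cluster.
CONFIGS_MAP = {
    "accesskey": "fs.s3a.access.key",
    "secretkey": "fs.s3a.secret.key",
    "endpoint": "fs.s3a.endpoint",
    "bucket_in_path": "fs.s3a.path.style.access",
    "ssl": "fs.s3a.connection.ssl.enabled",
}


def apply_s3_credentials(credentials, job_configs):
    # Only fill in values the user has not already set explicitly.
    configs = job_configs.get('configs')
    if configs is None:
        return job_configs
    for cred_key, s3a_key in CONFIGS_MAP.items():
        if (credentials.get(cred_key) is not None
                and configs.get(s3a_key) is None):
            configs[s3a_key] = credentials[cred_key]
    return job_configs


# Example usage (hypothetical values):
creds = {"accesskey": "ak", "secretkey": "sk", "endpoint": "s3.example.org"}
print(apply_s3_credentials(creds, {"configs": {}}))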