Merge "S3 data source"

Zuul 2018-07-11 23:02:24 +00:00 committed by Gerrit Code Review
commit 1b9cd66f86
14 changed files with 301 additions and 13 deletions

View File

@ -19,6 +19,7 @@ of jobs on clusters created from sahara. EDP supports:
+ HDFS for all job types
+ swift for all types excluding Hive
+ manila (NFS shares only) for all types excluding Pig
+ Any S3-like object store
* configuration of jobs at submission time
* execution of jobs on existing clusters or transient clusters
@ -69,7 +70,8 @@ running in the same OpenStack installation referenced by sahara.
Sahara requires the following credentials/configs to access files stored in an
S3-like object store: ``accesskey``, ``secretkey``, ``endpoint``.
These credentials are specified through the ``extra`` field in the body of the
request when creating a job binary referencing S3. The value of ``endpoint``
should include a protocol: *http* or *https*.
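A minimal sketch of such a request body follows (shown as a Python dict); the
bucket, object, endpoint, and key values are hypothetical placeholders::

    # Hypothetical job binary create request body; only the keys under
    # "extra" follow the description above.
    job_binary = {
        "name": "wordcount-jar",
        "url": "s3://mybucket/wordcount.jar",
        "extra": {
            "accesskey": "MY_ACCESS_KEY",
            "secretkey": "MY_SECRET_KEY",
            "endpoint": "https://s3.example.com",  # protocol required here
        },
    }
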
To reference a binary file stored in manila, create the job binary with the
URL ``manila://{share_id}/{path}``. This assumes that you have already stored
@ -132,6 +134,19 @@ share as a data source, create the data source with the URL
share will be automatically mounted to your cluster's nodes as needed to
access the data source.
Finally, Sahara supports data sources referring to S3-like object stores. The
URL should be of the form ``s3a://{bucket}/{path}``. Also, the following
credentials/configs are understood: ``accesskey``, ``secretkey``,
``endpoint``, ``bucket_in_path``, and ``ssl``. These credentials are specified
through the ``credentials`` attribute of the body of the request when creating
a data source referencing S3. The value of ``endpoint`` should **NOT** include
a protocol (*http* or *https*), unlike when referencing an S3 job binary. Note
that Sahara clusters can also interact with S3-like stores outside of EDP,
i.e. when operating the cluster manually. Consult
the `hadoop-aws documentation <https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html>`_
for more information. Also, be advised that hadoop-aws will only write a job's
output into a bucket which already exists: it does not create new buckets.
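A minimal sketch of a data source create request follows; every value is a
hypothetical placeholder, and each key under ``credentials`` is optional::

    # Hypothetical data source create request body (Python dict).
    data_source = {
        "name": "job-output",
        "type": "s3",
        "url": "s3a://mybucket/output",
        "credentials": {
            "accesskey": "MY_ACCESS_KEY",
            "secretkey": "MY_SECRET_KEY",
            "endpoint": "s3.example.com",  # no http:// or https:// prefix
            "ssl": True,
            "bucket_in_path": False,
        },
    }
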
Some job types require the use of data source objects to specify input and
output when a job is launched. For example, when running a Pig job the UI will
prompt the user for input and output data source objects.
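As a rough sketch (the field names and UUIDs below are illustrative
placeholders, not a definitive API reference), a job launch request carrying
such data source objects might look like::

    # Hypothetical job execute request body; input_id and output_id refer
    # to previously created data source objects.
    job_execution = {
        "cluster_id": "<cluster-uuid>",
        "input_id": "<input-data-source-uuid>",
        "output_id": "<output-data-source-uuid>",
        "job_configs": {"configs": {}, "args": [], "params": {}},
    }
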
@ -572,7 +587,8 @@ Spark jobs use some special configuration values:
* ``edp.spark.adapt_for_swift`` (optional) If set to **True**, instructs
sahara to modify the job's Hadoop configuration so that swift paths may be
accessed. Without this configuration value, swift paths will not be
accessible to Spark jobs. The default is **False**. Despite the name, the
same principle applies to jobs which reference paths in S3-like stores (see
the sketch after this list).
* ``edp.spark.driver.classpath`` (optional) If set to empty string sahara
will use default classpath for the cluster during job execution.
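The sketch below shows how ``edp.spark.adapt_for_swift`` might be passed at
submission time; the surrounding request fields are omitted and everything
other than the flag name is hypothetical::

    # Hypothetical job_configs fragment for a Spark job execute request.
    job_configs = {
        "configs": {
            # Per the note above, this also covers s3a:// paths.
            "edp.spark.adapt_for_swift": True,
        },
    }
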
@ -620,6 +636,9 @@ For job binaries only, S3 urls take the form:
``s3://bucket/path/to/object``
For data sources, S3 urls take the standard Hadoop form:
``s3a://bucket/path/to/object``
EDP Requirements
================

View File

@ -0,0 +1,3 @@
---
features:
- An EDP data source may reference a file stored in an S3-like object store.

View File

@ -436,6 +436,10 @@ class ConductorManager(db_base.Base):
values['credentials'].get('password')):
values['credentials']['password'] = key_manager.store_secret(
values['credentials']['password'], context)
if (values.get('credentials') and
values['credentials'].get('secretkey')):
values['credentials']['secretkey'] = key_manager.store_secret(
values['credentials']['secretkey'], context)
return self.db.data_source_create(context, values)
def data_source_destroy(self, context, data_source):
@ -451,6 +455,11 @@ class ConductorManager(db_base.Base):
ds_record['credentials'].get('password')):
key_manager.delete_secret(
ds_record['credentials']['password'], context)
if CONF.use_barbican_key_manager:
if (ds_record.get('credentials') and
ds_record['credentials'].get('secretkey')):
key_manager.delete_secret(
ds_record['credentials']['secretkey'], context)
return self.db.data_source_destroy(context, data_source)
def data_source_update(self, context, id, values):
@ -468,20 +477,31 @@ class ConductorManager(db_base.Base):
# it should be noted that the jsonschema validation ensures that
# if the proxy domain is not in use then credentials must be
# sent with this record.
# first we retrieve the original record to get the old key
# uuid, and delete it.
# next we create the new key.
if CONF.use_barbican_key_manager:
ds_record = self.data_source_get(context, id)
if (ds_record.get('credentials') and
ds_record['credentials'].get('password') and
not CONF.use_domain_for_proxy_users):
key_manager.delete_secret(
ds_record['credentials']['password'], context)
if (values.get('credentials') and
values['credentials'].get('password') and
not CONF.use_domain_for_proxy_users):
values['credentials']['password'] = key_manager.store_secret(
values['credentials']['password'], context)
if (ds_record.get('credentials') and
ds_record['credentials'].get('secretkey')):
key_manager.delete_secret(
ds_record['credentials']['secretkey'], context)
if (values.get('credentials') and
values['credentials'].get('secretkey')):
values['credentials']['secretkey'] = key_manager.store_secret(
values['credentials']['secretkey'], context)
return self.db.data_source_update(context, values)
# JobExecution ops

View File

@ -30,6 +30,7 @@ import six
from sahara.conductor import objects
from sahara import exceptions as ex
from sahara.i18n import _
from sahara.service.edp import s3_common
from sahara.swift import swift_helper
from sahara.utils import types
@ -277,6 +278,11 @@ class JobExecution(Resource, objects.JobExecution):
configs[swift_helper.HADOOP_SWIFT_USERNAME] = ""
if swift_helper.HADOOP_SWIFT_PASSWORD in configs:
configs[swift_helper.HADOOP_SWIFT_PASSWORD] = ""
if s3_common.S3_ACCESS_KEY_CONFIG in configs:
configs[s3_common.S3_ACCESS_KEY_CONFIG] = ""
if s3_common.S3_SECRET_KEY_CONFIG in configs:
configs[s3_common.S3_SECRET_KEY_CONFIG] = ""
if 'trusts' in job_configs:
del job_configs['trusts']
if 'proxy_configs' in job_configs:

View File

@ -19,7 +19,7 @@ from oslo_config import cfg
opts = [
cfg.ListOpt('data_source_types',
default=['swift', 'hdfs', 'maprfs', 'manila', 's3'],
help='List of data source types to be loaded. Sahara '
'preserves the order of the list when returning it.'),
]

View File

@ -0,0 +1,82 @@
# Copyright (c) 2018 OpenStack Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import six.moves.urllib.parse as urlparse
from sahara import exceptions as ex
from sahara.i18n import _
from sahara.service.edp.data_sources.base import DataSourceType
from sahara.service.edp import s3_common
from sahara.utils import types
class S3Type(DataSourceType):
configs_map = {"accesskey": s3_common.S3_ACCESS_KEY_CONFIG,
"secretkey": s3_common.S3_SECRET_KEY_CONFIG,
"endpoint": s3_common.S3_ENDPOINT_CONFIG,
"bucket_in_path": s3_common.S3_BUCKET_IN_PATH_CONFIG,
"ssl": s3_common.S3_SSL_CONFIG}
bool_keys = ["bucket_in_path",
"ssl"]
def validate(self, data):
self._validate_url(data['url'])
# Do validation loosely, and don't require much... the user might have
# (by their own preference) set some or all configs manually
if "credentials" not in data:
return
for key in data["credentials"].keys():
if key not in self.configs_map.keys():
raise ex.InvalidDataException(
_("Unknown config '%s' for S3 data source") % key)
if key in self.bool_keys:
if not isinstance(data["credentials"][key], bool):
raise ex.InvalidDataException(
_("Config '%s' must be boolean") % key)
def _validate_url(self, url):
if len(url) == 0:
raise ex.InvalidDataException(_("S3 url must not be empty"))
url = urlparse.urlparse(url)
if url.scheme != "s3a":
raise ex.InvalidDataException(_("URL scheme must be 's3a'"))
if not url.hostname:
raise ex.InvalidDataException(_("Bucket name must be present"))
if not url.path:
raise ex.InvalidDataException(_("Object name must be present"))
def prepare_cluster(self, data_source, cluster, **kwargs):
if hasattr(data_source, "credentials"):
job_configs = kwargs.pop('job_configs')
if isinstance(job_configs, types.FrozenDict):
return
if job_configs.get('configs', None) is None:
return
creds = data_source.credentials
job_conf = job_configs['configs']
for config_name, s3a_cfg_name in self.configs_map.items():
if job_conf.get(s3a_cfg_name, None) is None: # no overwrite
if creds.get(config_name, None) is not None:
job_conf[s3a_cfg_name] = creds[config_name]

View File

@ -24,6 +24,7 @@ from sahara.service.edp.oozie.workflow_creator import java_workflow
from sahara.service.edp.oozie.workflow_creator import mapreduce_workflow
from sahara.service.edp.oozie.workflow_creator import pig_workflow
from sahara.service.edp.oozie.workflow_creator import shell_workflow
from sahara.service.edp import s3_common
from sahara.swift import swift_helper as sw
from sahara.swift import utils as su
from sahara.utils import edp
@ -117,6 +118,24 @@ class BaseFactory(object):
configs[sw.HADOOP_SWIFT_PASSWORD] = (
key_manager.get_secret(src.credentials['password']))
break
for src in (input_data, output_data):
if src.type == "s3" and hasattr(src, "credentials"):
if "accesskey" in src.credentials:
configs[s3_common.S3_ACCESS_KEY_CONFIG] = (
src.credentials['accesskey'])
if "secretkey" in src.credentials:
configs[s3_common.S3_SECRET_KEY_CONFIG] = (
key_manager.get_secret(src.credentials['secretkey']))
if "endpoint" in src.credentials:
configs[s3_common.S3_ENDPOINT_CONFIG] = (
src.credentials['endpoint'])
if "bucket_in_path" in src.credentials:
configs[s3_common.S3_BUCKET_IN_PATH_CONFIG] = (
src.credentials['bucket_in_path'])
if "ssl" in src.credentials:
configs[s3_common.S3_SSL_CONFIG] = (
src.credentials['ssl'])
break
return configs
def get_params(self, input_data, output_data, data_source_urls):
@ -220,6 +239,7 @@ class JavaFactory(BaseFactory):
return main_class, java_opts, args
def get_configs(self, proxy_configs=None):
# TODO(jfreud): allow s3 and non-proxy swift configs here?
configs = {}
if proxy_configs:

View File

@ -22,6 +22,16 @@ from sahara.i18n import _
from sahara.service.castellan import utils as key_manager
S3_JB_PREFIX = "s3://"
S3_ACCESS_KEY_CONFIG = "fs.s3a.access.key"
S3_SECRET_KEY_CONFIG = "fs.s3a.secret.key"
S3_ENDPOINT_CONFIG = "fs.s3a.endpoint"
S3_BUCKET_IN_PATH_CONFIG = "fs.s3a.path.style.access"
S3_SSL_CONFIG = "fs.s3a.connection.ssl.enabled"
S3_DS_CONFIGS = [S3_ACCESS_KEY_CONFIG,
S3_SECRET_KEY_CONFIG,
S3_ENDPOINT_CONFIG,
S3_BUCKET_IN_PATH_CONFIG,
S3_SSL_CONFIG]
CONF = cfg.CONF

View File

@ -27,6 +27,7 @@ from sahara.service.castellan import utils as key_manager
from sahara.service.edp import base_engine
from sahara.service.edp.job_binaries import manager as jb_manager
from sahara.service.edp import job_utils
from sahara.service.edp import s3_common
from sahara.service.validations.edp import job_execution as j
from sahara.swift import swift_helper as sw
from sahara.swift import utils as su
@ -115,6 +116,7 @@ class SparkJobEngine(base_engine.JobEngine):
xml_name = 'spark.xml'
proxy_configs = job_configs.get('proxy_configs')
configs = {}
cfgs = job_configs.get('configs', {})
if proxy_configs:
configs[sw.HADOOP_SWIFT_USERNAME] = proxy_configs.get(
'proxy_username')
@ -124,9 +126,21 @@ class SparkJobEngine(base_engine.JobEngine):
'proxy_trust_id')
configs[sw.HADOOP_SWIFT_DOMAIN_NAME] = CONF.proxy_user_domain_name
else:
targets = [sw.HADOOP_SWIFT_USERNAME]
configs = {k: cfgs[k] for k in targets if k in cfgs}
if sw.HADOOP_SWIFT_PASSWORD in cfgs:
configs[sw.HADOOP_SWIFT_PASSWORD] = (
key_manager.get_secret(cfgs[sw.HADOOP_SWIFT_PASSWORD])
)
for s3_cfg_key in s3_common.S3_DS_CONFIGS:
if s3_cfg_key in cfgs:
if s3_cfg_key == s3_common.S3_SECRET_KEY_CONFIG:
configs[s3_cfg_key] = (
key_manager.get_secret(cfgs[s3_cfg_key])
)
else:
configs[s3_cfg_key] = cfgs[s3_cfg_key]
content = xmlutils.create_hadoop_xml(configs)
with remote.get_remote(where) as r:

View File

@ -24,7 +24,7 @@ conductor = c.API
data_source_type = {
"type": "string",
"enum": ["swift", "hdfs", "maprfs", "manila", "s3"]
}
job_configs = {

View File

@ -0,0 +1,113 @@
# Copyright (c) 2018 OpenStack Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import testtools
import sahara.exceptions as ex
from sahara.service.edp.data_sources.s3.implementation import S3Type
from sahara.tests.unit import base
from sahara.utils.types import FrozenDict
class TestS3Type(base.SaharaTestCase):
def setUp(self):
super(TestS3Type, self).setUp()
self.s_type = S3Type()
def test_validate(self):
data = {
"name": "test_data_data_source",
"type": "s3",
"url": "s3a://mybucket/myobject",
}
self.s_type.validate(data)
creds = {}
data["credentials"] = creds
self.s_type.validate(data)
creds["accesskey"] = "key"
creds["secretkey"] = "key2"
self.s_type.validate(data)
creds["bucket_in_path"] = True
creds["ssl"] = True
creds["endpoint"] = "blah.org"
self.s_type.validate(data)
creds["cool_key"] = "wow"
with testtools.ExpectedException(ex.InvalidDataException):
self.s_type.validate(data)
creds.pop("cool_key")
creds["ssl"] = "yeah"
with testtools.ExpectedException(ex.InvalidDataException):
self.s_type.validate(data)
creds["ssl"] = True
creds["bucket_in_path"] = "yeah"
with testtools.ExpectedException(ex.InvalidDataException):
self.s_type.validate(data)
def test_validate_url(self):
url = ""
with testtools.ExpectedException(ex.InvalidDataException):
self.s_type._validate_url(url)
url = "s3a://"
with testtools.ExpectedException(ex.InvalidDataException):
self.s_type._validate_url(url)
url = "s3a:///"
with testtools.ExpectedException(ex.InvalidDataException):
self.s_type._validate_url(url)
url = "s3a://bucket"
with testtools.ExpectedException(ex.InvalidDataException):
self.s_type._validate_url(url)
url = "s3b://bucket/obj"
with testtools.ExpectedException(ex.InvalidDataException):
self.s_type._validate_url(url)
url = "s3a://bucket/obj"
self.s_type._validate_url(url)
url = "s3a://bucket/fold/obj"
self.s_type._validate_url(url)
url = "s3a://bucket/obj/"
self.s_type._validate_url(url)
def test_prepare_cluster(self):
ds = mock.Mock()
cluster = mock.Mock()
ds.credentials = {}
job_configs = {}
self.s_type.prepare_cluster(ds, cluster, job_configs=job_configs)
self.assertEqual(job_configs, {})
job_configs['configs'] = {}
ds.credentials['accesskey'] = 'key'
self.s_type.prepare_cluster(ds, cluster, job_configs=job_configs)
self.assertEqual(job_configs['configs'], {'fs.s3a.access.key': 'key'})
job_configs['configs'] = {'fs.s3a.access.key': 'key2'}
self.s_type.prepare_cluster(ds, cluster, job_configs=job_configs)
self.assertEqual(job_configs['configs'], {'fs.s3a.access.key': 'key2'})
job_configs = FrozenDict({'configs': {}})
self.s_type.prepare_cluster(ds, cluster, job_configs=job_configs)
self.assertNotIn('fs.s3a.access.key', job_configs['configs'])
job_configs = {}
self.s_type.prepare_cluster(ds, cluster, job_configs=job_configs)
self.assertEqual(job_configs, {})

View File

@ -59,6 +59,7 @@ sahara.data_source.types =
manila = sahara.service.edp.data_sources.manila.implementation:ManilaType
maprfs = sahara.service.edp.data_sources.maprfs.implementation:MapRFSType
swift = sahara.service.edp.data_sources.swift.implementation:SwiftType
s3 = sahara.service.edp.data_sources.s3.implementation:S3Type
sahara.job_binary.types =
internal-db = sahara.service.edp.job_binaries.internal_db.implementation:InternalDBType