Merge "S3 data source"

commit 1b9cd66f86
@@ -19,6 +19,7 @@ of jobs on clusters created from sahara. EDP supports:
  + HDFS for all job types
  + swift for all types excluding Hive
  + manila (NFS shares only) for all types excluding Pig
  + Any S3-like object store

* configuration of jobs at submission time
* execution of jobs on existing clusters or transient clusters
@@ -69,7 +70,8 @@ running in the same OpenStack installation referenced by sahara.
Sahara requires the following credentials/configs to access files stored in an
S3-like object store: ``accesskey``, ``secretkey``, ``endpoint``.
These credentials are specified through the `extra` in the body of the request
when creating a job binary or data source referencing S3.
when creating a job binary referencing S3. The value of ``endpoint`` should
include a protocol: *http* or *https*.

To reference a binary file stored in manila, create the job binary with the
URL ``manila://{share_id}/{path}``. This assumes that you have already stored
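For illustration, the S3 job binary credentials described above are passed under ``extra``; a sketch of such a request body follows (the bucket, object path, endpoint, and key values are hypothetical):

    # Hypothetical body for creating a job binary that references S3.
    # For job binaries the endpoint value includes a protocol (http or https).
    s3_job_binary = {
        "name": "my-job.jar",
        "url": "s3://my-bucket/path/to/my-job.jar",
        "extra": {
            "accesskey": "S3_ACCESS_KEY",
            "secretkey": "S3_SECRET_KEY",
            "endpoint": "https://object-store.example.com",
        },
    }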
@@ -132,6 +134,19 @@ share as a data source, create the data source with the URL
share will be automatically mounted to your cluster's nodes as needed to
access the data source.

Finally, Sahara supports data sources referring to S3-like object stores. The
URL should be of the form ``s3a://{bucket}/{path}``. Also, the following
credentials/configs are understood: ``accesskey``, ``secretkey``,
``endpoint``, ``bucket_in_path``, and ``ssl``. These credentials are specified
through the ``credentials`` attribute of the body of the request when creating
a data source referencing S3. The value of ``endpoint`` should **NOT** include
a protocol (*http* or *https*), unlike when referencing an S3 job binary. It
can also be noted that Sahara clusters can interact with S3-like stores even
when not using EDP, i.e. when manually operating the cluster instead. Consult
the `hadoop-aws documentation <https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html>`_
for more information. Also, be advised that hadoop-aws will only write a job's
output into a bucket which already exists: it does not create new buckets.

Some job types require the use of data source objects to specify input and
output when a job is launched. For example, when running a Pig job the UI will
prompt the user for input and output data source objects.
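By way of comparison, a sketch of a data source request body using the ``credentials`` attribute described above (bucket, path, endpoint, and key values are hypothetical; note that the endpoint carries no protocol, unlike the job binary case):

    # Hypothetical body for creating a data source that references S3.
    s3_data_source = {
        "name": "my-input",
        "type": "s3",
        "url": "s3a://my-bucket/input/path",
        "credentials": {
            "accesskey": "S3_ACCESS_KEY",
            "secretkey": "S3_SECRET_KEY",
            "endpoint": "object-store.example.com",
            "bucket_in_path": True,
            "ssl": True,
        },
    }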
@@ -572,7 +587,8 @@ Spark jobs use some special configuration values:
* ``edp.spark.adapt_for_swift`` (optional) If set to **True**, instructs
  sahara to modify the job's Hadoop configuration so that swift paths may be
  accessed. Without this configuration value, swift paths will not be
  accessible to Spark jobs. The default is **False**.
  accessible to Spark jobs. The default is **False**. Despite the name, the
  same principle applies to jobs which reference paths in S3-like stores.

* ``edp.spark.driver.classpath`` (optional) If set to empty string sahara
  will use default classpath for the cluster during job execution.
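As a sketch of how this applies to S3 paths (key and endpoint values are hypothetical), a Spark job execution that reads or writes ``s3a://`` URLs could carry something like the following in its ``configs``; the ``fs.s3a.*`` keys are normally filled in from the data source's credentials, but may also be set explicitly:

    # Hypothetical "configs" portion of a Spark job execution's job_configs.
    spark_job_configs = {
        "configs": {
            "edp.spark.adapt_for_swift": True,
            "fs.s3a.access.key": "S3_ACCESS_KEY",
            "fs.s3a.secret.key": "S3_SECRET_KEY",
            "fs.s3a.endpoint": "object-store.example.com",
        },
    }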
@@ -620,6 +636,9 @@ For job binaries only, S3 urls take the form:

``s3://bucket/path/to/object``

For data sources, S3 urls take the standard Hadoop form:

``s3a://bucket/path/to/object``

EDP Requirements
================
@@ -0,0 +1,3 @@
---
features:
  - An EDP data source may reference a file stored in an S3-like object store.
@@ -436,6 +436,10 @@ class ConductorManager(db_base.Base):
                    values['credentials'].get('password')):
                values['credentials']['password'] = key_manager.store_secret(
                    values['credentials']['password'], context)
            if (values.get('credentials') and
                    values['credentials'].get('secretkey')):
                values['credentials']['secretkey'] = key_manager.store_secret(
                    values['credentials']['secretkey'], context)
        return self.db.data_source_create(context, values)

    def data_source_destroy(self, context, data_source):
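A minimal sketch of the secret-handling round trip used here, assuming the castellan-backed ``key_manager`` helpers imported above (the literal secret value is hypothetical):

    # store_secret swaps the plaintext for a reference; the data source
    # record persists only the reference.
    ref = key_manager.store_secret("S3_SECRET_KEY", context)
    values['credentials']['secretkey'] = ref

    # At job launch the plaintext is recovered from the reference, and on
    # data source deletion the stored secret is removed.
    plaintext = key_manager.get_secret(ref)
    key_manager.delete_secret(ref, context)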
@@ -451,6 +455,11 @@ class ConductorManager(db_base.Base):
                    ds_record['credentials'].get('password')):
                key_manager.delete_secret(
                    ds_record['credentials']['password'], context)
        if CONF.use_barbican_key_manager:
            if (ds_record.get('credentials') and
                    ds_record['credentials'].get('secretkey')):
                key_manager.delete_secret(
                    ds_record['credentials']['secretkey'], context)
        return self.db.data_source_destroy(context, data_source)

    def data_source_update(self, context, id, values):
@@ -468,20 +477,31 @@ class ConductorManager(db_base.Base):
        # it should be noted that the jsonschema validation ensures that
        # if the proxy domain is not in use then credentials must be
        # sent with this record.
        if (CONF.use_barbican_key_manager and not
                CONF.use_domain_for_proxy_users):
            # first we retrieve the original record to get the old key
            # uuid, and delete it.

        # first we retrieve the original record to get the old key
        # uuid, and delete it.
        # next we create the new key.

        if CONF.use_barbican_key_manager:
            ds_record = self.data_source_get(context, id)
            if (ds_record.get('credentials') and
                    ds_record['credentials'].get('password')):
                    ds_record['credentials'].get('password') and
                    not CONF.use_domain_for_proxy_users):
                key_manager.delete_secret(
                    ds_record['credentials']['password'], context)
            # next we create the new key.
            if (values.get('credentials') and
                    values['credentials'].get('password')):
                    values['credentials'].get('password') and
                    not CONF.use_domain_for_proxy_users):
                values['credentials']['password'] = key_manager.store_secret(
                    values['credentials']['password'], context)
            if (ds_record.get('credentials') and
                    ds_record['credentials'].get('secretkey')):
                key_manager.delete_secret(
                    ds_record['credentials']['secretkey'], context)
            if (values.get('credentials') and
                    values['credentials'].get('secretkey')):
                values['credentials']['secretkey'] = key_manager.store_secret(
                    values['credentials']['secretkey'], context)
        return self.db.data_source_update(context, values)

    # JobExecution ops
@@ -30,6 +30,7 @@ import six
from sahara.conductor import objects
from sahara import exceptions as ex
from sahara.i18n import _
from sahara.service.edp import s3_common
from sahara.swift import swift_helper
from sahara.utils import types

@@ -277,6 +278,11 @@ class JobExecution(Resource, objects.JobExecution):
                configs[swift_helper.HADOOP_SWIFT_USERNAME] = ""
            if swift_helper.HADOOP_SWIFT_PASSWORD in configs:
                configs[swift_helper.HADOOP_SWIFT_PASSWORD] = ""
            if s3_common.S3_ACCESS_KEY_CONFIG in configs:
                configs[s3_common.S3_ACCESS_KEY_CONFIG] = ""
            if s3_common.S3_SECRET_KEY_CONFIG in configs:
                configs[s3_common.S3_SECRET_KEY_CONFIG] = ""

        if 'trusts' in job_configs:
            del job_configs['trusts']
        if 'proxy_configs' in job_configs:
@@ -19,7 +19,7 @@ from oslo_config import cfg

opts = [
    cfg.ListOpt('data_source_types',
                default=['swift', 'hdfs', 'maprfs', 'manila'],
                default=['swift', 'hdfs', 'maprfs', 'manila', 's3'],
                help='List of data sources types to be loaded. Sahara '
                     'preserves the order of the list when returning it.'),
]
@@ -0,0 +1,82 @@
# Copyright (c) 2018 OpenStack Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import six.moves.urllib.parse as urlparse

from sahara import exceptions as ex
from sahara.i18n import _
from sahara.service.edp.data_sources.base import DataSourceType
from sahara.service.edp import s3_common
from sahara.utils import types


class S3Type(DataSourceType):
    configs_map = {"accesskey": s3_common.S3_ACCESS_KEY_CONFIG,
                   "secretkey": s3_common.S3_SECRET_KEY_CONFIG,
                   "endpoint": s3_common.S3_ENDPOINT_CONFIG,
                   "bucket_in_path": s3_common.S3_BUCKET_IN_PATH_CONFIG,
                   "ssl": s3_common.S3_SSL_CONFIG}

    bool_keys = ["bucket_in_path",
                 "ssl"]

    def validate(self, data):
        self._validate_url(data['url'])

        # Do validation loosely, and don't require much... the user might have
        # (by their own preference) set some or all configs manually

        if "credentials" not in data:
            return

        for key in data["credentials"].keys():
            if key not in self.configs_map.keys():
                raise ex.InvalidDataException(
                    _("Unknown config '%s' for S3 data source") % key)
            if key in self.bool_keys:
                if not isinstance(data["credentials"][key], bool):
                    raise ex.InvalidDataException(
                        _("Config '%s' must be boolean") % key)

    def _validate_url(self, url):
        if len(url) == 0:
            raise ex.InvalidDataException(_("S3 url must not be empty"))

        url = urlparse.urlparse(url)
        if url.scheme != "s3a":
            raise ex.InvalidDataException(_("URL scheme must be 's3a'"))

        if not url.hostname:
            raise ex.InvalidDataException(_("Bucket name must be present"))

        if not url.path:
            raise ex.InvalidDataException(_("Object name must be present"))

    def prepare_cluster(self, data_source, cluster, **kwargs):
        if hasattr(data_source, "credentials"):
            job_configs = kwargs.pop('job_configs')

            if isinstance(job_configs, types.FrozenDict):
                return
            if job_configs.get('configs', None) is None:
                return

            creds = data_source.credentials
            job_conf = job_configs['configs']

            for config_name, s3a_cfg_name in self.configs_map.items():
                if job_conf.get(s3a_cfg_name, None) is None:  # no overwrite
                    if creds.get(config_name, None) is not None:
                        job_conf[s3a_cfg_name] = creds[config_name]
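A brief usage sketch of the class above (names and values here are hypothetical): ``validate()`` checks the URL and the credential keys, while ``prepare_cluster()`` copies credentials into the job's configs under the ``fs.s3a.*`` keys without overwriting values already set:

    from types import SimpleNamespace

    from sahara.service.edp.data_sources.s3.implementation import S3Type

    ds_type = S3Type()
    ds_type.validate({"url": "s3a://my-bucket/input",
                      "credentials": {"accesskey": "S3_ACCESS_KEY",
                                      "ssl": True}})

    # Any object carrying a "credentials" attribute works for this sketch.
    data_source = SimpleNamespace(
        credentials={"accesskey": "S3_ACCESS_KEY",
                     "endpoint": "object-store.example.com"})
    job_configs = {"configs": {}}
    ds_type.prepare_cluster(data_source, None, job_configs=job_configs)
    # job_configs["configs"] now holds fs.s3a.access.key and fs.s3a.endpoint.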
@@ -24,6 +24,7 @@ from sahara.service.edp.oozie.workflow_creator import java_workflow
from sahara.service.edp.oozie.workflow_creator import mapreduce_workflow
from sahara.service.edp.oozie.workflow_creator import pig_workflow
from sahara.service.edp.oozie.workflow_creator import shell_workflow
from sahara.service.edp import s3_common
from sahara.swift import swift_helper as sw
from sahara.swift import utils as su
from sahara.utils import edp
@@ -117,6 +118,24 @@ class BaseFactory(object):
                        configs[sw.HADOOP_SWIFT_PASSWORD] = (
                            key_manager.get_secret(src.credentials['password']))
                    break
        for src in (input_data, output_data):
            if src.type == "s3" and hasattr(src, "credentials"):
                if "accesskey" in src.credentials:
                    configs[s3_common.S3_ACCESS_KEY_CONFIG] = (
                        src.credentials['accesskey'])
                if "secretkey" in src.credentials:
                    configs[s3_common.S3_SECRET_KEY_CONFIG] = (
                        key_manager.get_secret(src.credentials['secretkey']))
                if "endpoint" in src.credentials:
                    configs[s3_common.S3_ENDPOINT_CONFIG] = (
                        src.credentials['endpoint'])
                if "bucket_in_path" in src.credentials:
                    configs[s3_common.S3_BUCKET_IN_PATH_CONFIG] = (
                        src.credentials['bucket_in_path'])
                if "ssl" in src.credentials:
                    configs[s3_common.S3_SSL_CONFIG] = (
                        src.credentials['ssl'])
                break
        return configs

    def get_params(self, input_data, output_data, data_source_urls):
@@ -220,6 +239,7 @@ class JavaFactory(BaseFactory):
        return main_class, java_opts, args

    def get_configs(self, proxy_configs=None):
        # TODO(jfreud): allow s3 and non-proxy swift configs here?
        configs = {}

        if proxy_configs:
@@ -22,6 +22,16 @@ from sahara.i18n import _
from sahara.service.castellan import utils as key_manager

S3_JB_PREFIX = "s3://"
S3_ACCESS_KEY_CONFIG = "fs.s3a.access.key"
S3_SECRET_KEY_CONFIG = "fs.s3a.secret.key"
S3_ENDPOINT_CONFIG = "fs.s3a.endpoint"
S3_BUCKET_IN_PATH_CONFIG = "fs.s3a.path.style.access"
S3_SSL_CONFIG = "fs.s3a.connection.ssl.enabled"
S3_DS_CONFIGS = [S3_ACCESS_KEY_CONFIG,
                 S3_SECRET_KEY_CONFIG,
                 S3_ENDPOINT_CONFIG,
                 S3_BUCKET_IN_PATH_CONFIG,
                 S3_SSL_CONFIG]
CONF = cfg.CONF

@@ -27,6 +27,7 @@ from sahara.service.castellan import utils as key_manager
from sahara.service.edp import base_engine
from sahara.service.edp.job_binaries import manager as jb_manager
from sahara.service.edp import job_utils
from sahara.service.edp import s3_common
from sahara.service.validations.edp import job_execution as j
from sahara.swift import swift_helper as sw
from sahara.swift import utils as su
@@ -115,6 +116,7 @@ class SparkJobEngine(base_engine.JobEngine):
        xml_name = 'spark.xml'
        proxy_configs = job_configs.get('proxy_configs')
        configs = {}
        cfgs = job_configs.get('configs', {})
        if proxy_configs:
            configs[sw.HADOOP_SWIFT_USERNAME] = proxy_configs.get(
                'proxy_username')
@@ -124,9 +126,21 @@ class SparkJobEngine(base_engine.JobEngine):
                'proxy_trust_id')
            configs[sw.HADOOP_SWIFT_DOMAIN_NAME] = CONF.proxy_user_domain_name
        else:
            cfgs = job_configs.get('configs', {})
            targets = [sw.HADOOP_SWIFT_USERNAME, sw.HADOOP_SWIFT_PASSWORD]
            targets = [sw.HADOOP_SWIFT_USERNAME]
            configs = {k: cfgs[k] for k in targets if k in cfgs}
            if sw.HADOOP_SWIFT_PASSWORD in cfgs:
                configs[sw.HADOOP_SWIFT_PASSWORD] = (
                    key_manager.get_secret(cfgs[sw.HADOOP_SWIFT_PASSWORD])
                )

        for s3_cfg_key in s3_common.S3_DS_CONFIGS:
            if s3_cfg_key in cfgs:
                if s3_cfg_key == s3_common.S3_SECRET_KEY_CONFIG:
                    configs[s3_cfg_key] = (
                        key_manager.get_secret(cfgs[s3_cfg_key])
                    )
                else:
                    configs[s3_cfg_key] = cfgs[s3_cfg_key]

        content = xmlutils.create_hadoop_xml(configs)
        with remote.get_remote(where) as r:
@@ -24,7 +24,7 @@ conductor = c.API

data_source_type = {
    "type": "string",
    "enum": ["swift", "hdfs", "maprfs", "manila"]
    "enum": ["swift", "hdfs", "maprfs", "manila", "s3"]
}

job_configs = {
@@ -0,0 +1,113 @@
# Copyright (c) 2018 OpenStack Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import mock
import testtools

import sahara.exceptions as ex
from sahara.service.edp.data_sources.s3.implementation import S3Type
from sahara.tests.unit import base
from sahara.utils.types import FrozenDict


class TestSwiftType(base.SaharaTestCase):
    def setUp(self):
        super(TestSwiftType, self).setUp()
        self.s_type = S3Type()

    def test_validate(self):
        data = {
            "name": "test_data_data_source",
            "type": "s3",
            "url": "s3a://mybucket/myobject",
        }
        self.s_type.validate(data)

        creds = {}
        data["credentials"] = creds
        self.s_type.validate(data)

        creds["accesskey"] = "key"
        creds["secretkey"] = "key2"
        self.s_type.validate(data)

        creds["bucket_in_path"] = True
        creds["ssl"] = True
        creds["endpoint"] = "blah.org"
        self.s_type.validate(data)

        creds["cool_key"] = "wow"
        with testtools.ExpectedException(ex.InvalidDataException):
            self.s_type.validate(data)

        creds.pop("cool_key")

        creds["ssl"] = "yeah"
        with testtools.ExpectedException(ex.InvalidDataException):
            self.s_type.validate(data)

        creds["ssl"] = True
        creds["bucket_in_path"] = "yeah"
        with testtools.ExpectedException(ex.InvalidDataException):
            self.s_type.validate(data)

    def test_validate_url(self):
        url = ""
        with testtools.ExpectedException(ex.InvalidDataException):
            self.s_type._validate_url(url)
        url = "s3a://"
        with testtools.ExpectedException(ex.InvalidDataException):
            self.s_type._validate_url(url)
        url = "s3a:///"
        with testtools.ExpectedException(ex.InvalidDataException):
            self.s_type._validate_url(url)
        url = "s3a://bucket"
        with testtools.ExpectedException(ex.InvalidDataException):
            self.s_type._validate_url(url)
        url = "s3b://bucket/obj"
        with testtools.ExpectedException(ex.InvalidDataException):
            self.s_type._validate_url(url)
        url = "s3a://bucket/obj"
        self.s_type._validate_url(url)
        url = "s3a://bucket/fold/obj"
        self.s_type._validate_url(url)
        url = "s3a://bucket/obj/"
        self.s_type._validate_url(url)

    def test_prepare_cluster(self):
        ds = mock.Mock()
        cluster = mock.Mock()
        ds.credentials = {}
        job_configs = {}

        self.s_type.prepare_cluster(ds, cluster, job_configs=job_configs)
        self.assertEqual(job_configs, {})

        job_configs['configs'] = {}
        ds.credentials['accesskey'] = 'key'
        self.s_type.prepare_cluster(ds, cluster, job_configs=job_configs)
        self.assertEqual(job_configs['configs'], {'fs.s3a.access.key': 'key'})

        job_configs['configs'] = {'fs.s3a.access.key': 'key2'}
        self.s_type.prepare_cluster(ds, cluster, job_configs=job_configs)
        self.assertEqual(job_configs['configs'], {'fs.s3a.access.key': 'key2'})

        job_configs = FrozenDict({'configs': {}})
        self.s_type.prepare_cluster(ds, cluster, job_configs=job_configs)
        self.assertNotIn(job_configs['configs'], 'accesskey')

        job_configs = {}
        self.s_type.prepare_cluster(ds, cluster, job_configs=job_configs)
        self.assertEqual(job_configs, {})
@@ -59,6 +59,7 @@ sahara.data_source.types =
    manila = sahara.service.edp.data_sources.manila.implementation:ManilaType
    maprfs = sahara.service.edp.data_sources.maprfs.implementation:MapRFSType
    swift = sahara.service.edp.data_sources.swift.implementation:SwiftType
    s3 = sahara.service.edp.data_sources.s3.implementation:S3Type

sahara.job_binary.types =
    internal-db = sahara.service.edp.job_binaries.internal_db.implementation:InternalDBType