author    Zuul <zuul@review.openstack.org>  2018-07-11 23:02:24 +0000
committer Gerrit Code Review <review@openstack.org>  2018-07-11 23:02:24 +0000
commit    1b9cd66f860c4cc3c4df3a46e6fa4b7c3192e018 (patch)
tree      57fcdc0c6546d22ff8d71d107bbc2f716dbf1d8a
parent    927c7c215ce130bca240f5ed077bd6d2771cf6e4 (diff)
parent    a449558ac08a59cef0dc0913fbf499912b0606d2 (diff)
Merge "S3 data source"
-rw-r--r--  doc/source/user/edp.rst                                         |  23
-rw-r--r--  releasenotes/notes/support-s3-data-source-a912e2cdf4cd51fb.yaml |   3
-rw-r--r--  sahara/conductor/manager.py                                     |  34
-rw-r--r--  sahara/conductor/resource.py                                    |   6
-rw-r--r--  sahara/service/edp/data_sources/opts.py                         |   2
-rw-r--r--  sahara/service/edp/data_sources/s3/__init__.py                  |   0
-rw-r--r--  sahara/service/edp/data_sources/s3/implementation.py            |  82
-rw-r--r--  sahara/service/edp/oozie/workflow_creator/workflow_factory.py   |  20
-rw-r--r--  sahara/service/edp/s3_common.py                                 |  10
-rw-r--r--  sahara/service/edp/spark/engine.py                              |  18
-rw-r--r--  sahara/service/validations/edp/base.py                          |   2
-rw-r--r--  sahara/tests/unit/service/edp/data_sources/s3/__init__.py      |   0
-rw-r--r--  sahara/tests/unit/service/edp/data_sources/s3/test_s3_type.py  | 113
-rw-r--r--  setup.cfg                                                       |   1
14 files changed, 301 insertions, 13 deletions
diff --git a/doc/source/user/edp.rst b/doc/source/user/edp.rst
index 863d617..1052654 100644
--- a/doc/source/user/edp.rst
+++ b/doc/source/user/edp.rst
@@ -19,6 +19,7 @@ of jobs on clusters created from sahara. EDP supports:
 + HDFS for all job types
 + swift for all types excluding Hive
 + manila (NFS shares only) for all types excluding Pig
++ Any S3-like object store
 
 * configuration of jobs at submission time
 * execution of jobs on existing clusters or transient clusters
@@ -69,7 +70,8 @@ running in the same OpenStack installation referenced by sahara.
 Sahara requires the following credentials/configs to access files stored in an
 S3-like object store: ``accesskey``, ``secretkey``, ``endpoint``.
 These credentials are specified through the `extra` in the body of the request
-when creating a job binary or data source referencing S3.
+when creating a job binary referencing S3. The value of ``endpoint`` should
+include a protocol: *http* or *https*.
 
 To reference a binary file stored in manila, create the job binary with the
 URL ``manila://{share_id}/{path}``. This assumes that you have already stored
@@ -132,6 +134,19 @@ share as a data source, create the data source with the URL
 share will be automatically mounted to your cluster's nodes as needed to
 access the data source.
 
+Finally, Sahara supports data sources referring to S3-like object stores. The
+URL should be of the form ``s3a://{bucket}/{path}``. Also, the following
+credentials/configs are understood: ``accesskey``, ``secretkey``,
+``endpoint``, ``bucket_in_path``, and ``ssl``. These credentials are specified
+through the ``credentials`` attribute of the body of the request when creating
+a data source referencing S3. The value of ``endpoint`` should **NOT** include
+a protocol (*http* or *https*), unlike when referencing an S3 job binary. It
+can also be noted that Sahara clusters can interact with S3-like stores even
+when not using EDP, i.e. when manually operating the cluster instead. Consult
+the `hadoop-aws documentation <https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html>`_
+for more information. Also, be advised that hadoop-aws will only write a job's
+output into a bucket which already exists: it does not create new buckets.
+
 Some job types require the use of data source objects to specify input and
 output when a job is launched. For example, when running a Pig job the UI will
 prompt the user for input and output data source objects.
@@ -572,7 +587,8 @@ Spark jobs use some special configuration values:
 * ``edp.spark.adapt_for_swift`` (optional) If set to **True**, instructs
   sahara to modify the job's Hadoop configuration so that swift paths may be
   accessed. Without this configuration value, swift paths will not be
-  accessible to Spark jobs. The default is **False**.
+  accessible to Spark jobs. The default is **False**. Despite the name, the
+  same principle applies to jobs which reference paths in S3-like stores.
 
 * ``edp.spark.driver.classpath`` (optional) If set to empty string sahara
   will use default classpath for the cluster during job execution.
@@ -620,6 +636,9 @@ For job binaries only, S3 urls take the form:
 
 ``s3://bucket/path/to/object``
 
+For data sources, S3 urls take the standard Hadoop form:
+
+``s3a://bucket/path/to/object``
 
 EDP Requirements
 ================
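
A hedged illustration of the request body documented above (the field names
come from the docs; the endpoint and key values here are invented):

    # S3 data source creation payload, per the documentation above.
    # Note: the endpoint carries no protocol, unlike an S3 job binary.
    s3_data_source = {
        "name": "s3-output",
        "type": "s3",
        "url": "s3a://mybucket/output",
        "credentials": {
            "accesskey": "AKIAEXAMPLE",
            "secretkey": "ultra-secret",
            "endpoint": "ceph.example.org:8080",
            "bucket_in_path": False,
            "ssl": True,
        },
    }
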
diff --git a/releasenotes/notes/support-s3-data-source-a912e2cdf4cd51fb.yaml b/releasenotes/notes/support-s3-data-source-a912e2cdf4cd51fb.yaml
new file mode 100644
index 0000000..255c65d
--- /dev/null
+++ b/releasenotes/notes/support-s3-data-source-a912e2cdf4cd51fb.yaml
@@ -0,0 +1,3 @@
+---
+features:
+  - An EDP data source may reference a file stored in an S3-like object store.
diff --git a/sahara/conductor/manager.py b/sahara/conductor/manager.py
index 5405cb4..329f999 100644
--- a/sahara/conductor/manager.py
+++ b/sahara/conductor/manager.py
@@ -436,6 +436,10 @@ class ConductorManager(db_base.Base):
                 values['credentials'].get('password')):
             values['credentials']['password'] = key_manager.store_secret(
                 values['credentials']['password'], context)
+        if (values.get('credentials') and
+                values['credentials'].get('secretkey')):
+            values['credentials']['secretkey'] = key_manager.store_secret(
+                values['credentials']['secretkey'], context)
         return self.db.data_source_create(context, values)
 
     def data_source_destroy(self, context, data_source):
@@ -451,6 +455,11 @@ class ConductorManager(db_base.Base):
                     ds_record['credentials'].get('password')):
                 key_manager.delete_secret(
                     ds_record['credentials']['password'], context)
+        if CONF.use_barbican_key_manager:
+            if (ds_record.get('credentials') and
+                    ds_record['credentials'].get('secretkey')):
+                key_manager.delete_secret(
+                    ds_record['credentials']['secretkey'], context)
         return self.db.data_source_destroy(context, data_source)
 
     def data_source_update(self, context, id, values):
@@ -468,20 +477,31 @@ class ConductorManager(db_base.Base):
         # it should be noted that the jsonschema validation ensures that
         # if the proxy domain is not in use then credentials must be
         # sent with this record.
-        if (CONF.use_barbican_key_manager and not
-                CONF.use_domain_for_proxy_users):
-            # first we retrieve the original record to get the old key
-            # uuid, and delete it.
+
+        # first we retrieve the original record to get the old key
+        # uuid, and delete it.
+        # next we create the new key.
+
+        if CONF.use_barbican_key_manager:
             ds_record = self.data_source_get(context, id)
             if (ds_record.get('credentials') and
-                    ds_record['credentials'].get('password')):
+                    ds_record['credentials'].get('password') and
+                    not CONF.use_domain_for_proxy_users):
                 key_manager.delete_secret(
                     ds_record['credentials']['password'], context)
-            # next we create the new key.
             if (values.get('credentials') and
-                    values['credentials'].get('password')):
+                    values['credentials'].get('password') and
+                    not CONF.use_domain_for_proxy_users):
                 values['credentials']['password'] = key_manager.store_secret(
                     values['credentials']['password'], context)
+            if (ds_record.get('credentials') and
+                    ds_record['credentials'].get('secretkey')):
+                key_manager.delete_secret(
+                    ds_record['credentials']['secretkey'], context)
+            if (values.get('credentials') and
+                    values['credentials'].get('secretkey')):
+                values['credentials']['secretkey'] = key_manager.store_secret(
+                    values['credentials']['secretkey'], context)
         return self.db.data_source_update(context, values)
 
     # JobExecution ops
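
A minimal sketch of the swap-on-write pattern above, with the key manager
stubbed out (the functions below are stand-ins, not sahara's key_manager
API): plaintext secrets are replaced by references before reaching the
database, and stale references are deleted on update or destroy.

    import uuid

    _store = {}

    def store_secret(value, context=None):   # stand-in for store_secret
        ref = str(uuid.uuid4())
        _store[ref] = value
        return ref

    def delete_secret(ref, context=None):    # stand-in for delete_secret
        _store.pop(ref, None)

    values = {"credentials": {"accesskey": "A", "secretkey": "plaintext"}}
    ref = store_secret(values["credentials"]["secretkey"])
    values["credentials"]["secretkey"] = ref  # DB row now holds a reference
    delete_secret(ref)                        # cleanup on update/destroy
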
diff --git a/sahara/conductor/resource.py b/sahara/conductor/resource.py
index 3cd9d1f..9271ee1 100644
--- a/sahara/conductor/resource.py
+++ b/sahara/conductor/resource.py
@@ -30,6 +30,7 @@ import six
 from sahara.conductor import objects
 from sahara import exceptions as ex
 from sahara.i18n import _
+from sahara.service.edp import s3_common
 from sahara.swift import swift_helper
 from sahara.utils import types
 
@@ -277,6 +278,11 @@ class JobExecution(Resource, objects.JobExecution):
             configs[swift_helper.HADOOP_SWIFT_USERNAME] = ""
         if swift_helper.HADOOP_SWIFT_PASSWORD in configs:
             configs[swift_helper.HADOOP_SWIFT_PASSWORD] = ""
+        if s3_common.S3_ACCESS_KEY_CONFIG in configs:
+            configs[s3_common.S3_ACCESS_KEY_CONFIG] = ""
+        if s3_common.S3_SECRET_KEY_CONFIG in configs:
+            configs[s3_common.S3_SECRET_KEY_CONFIG] = ""
+
         if 'trusts' in job_configs:
             del job_configs['trusts']
         if 'proxy_configs' in job_configs:
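
The sanitization above can be pictured with a tiny self-contained sketch
(the property names are assumed from swift_helper and s3_common; the helper
itself is illustrative, not sahara's own):

    SENSITIVE = ("fs.s3a.access.key", "fs.s3a.secret.key",
                 "fs.swift.service.sahara.username",
                 "fs.swift.service.sahara.password")

    def sanitize(configs):
        # Blank credential values before exposing job info to users.
        return {k: ("" if k in SENSITIVE else v) for k, v in configs.items()}

    print(sanitize({"fs.s3a.secret.key": "s3kr1t",
                    "mapred.reduce.tasks": "2"}))
    # {'fs.s3a.secret.key': '', 'mapred.reduce.tasks': '2'}
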
diff --git a/sahara/service/edp/data_sources/opts.py b/sahara/service/edp/data_sources/opts.py
index 7fd5cc3..d63dfc7 100644
--- a/sahara/service/edp/data_sources/opts.py
+++ b/sahara/service/edp/data_sources/opts.py
@@ -19,7 +19,7 @@ from oslo_config import cfg
 
 opts = [
     cfg.ListOpt('data_source_types',
-                default=['swift', 'hdfs', 'maprfs', 'manila'],
+                default=['swift', 'hdfs', 'maprfs', 'manila', 's3'],
                 help='List of data sources types to be loaded. Sahara '
                      'preserves the order of the list when returning it.'),
 ]
diff --git a/sahara/service/edp/data_sources/s3/__init__.py b/sahara/service/edp/data_sources/s3/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/sahara/service/edp/data_sources/s3/__init__.py
diff --git a/sahara/service/edp/data_sources/s3/implementation.py b/sahara/service/edp/data_sources/s3/implementation.py
new file mode 100644
index 0000000..2a74000
--- /dev/null
+++ b/sahara/service/edp/data_sources/s3/implementation.py
@@ -0,0 +1,82 @@
+# Copyright (c) 2018 OpenStack Contributors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import six.moves.urllib.parse as urlparse
+
+from sahara import exceptions as ex
+from sahara.i18n import _
+from sahara.service.edp.data_sources.base import DataSourceType
+from sahara.service.edp import s3_common
+from sahara.utils import types
+
+
+class S3Type(DataSourceType):
+    configs_map = {"accesskey": s3_common.S3_ACCESS_KEY_CONFIG,
+                   "secretkey": s3_common.S3_SECRET_KEY_CONFIG,
+                   "endpoint": s3_common.S3_ENDPOINT_CONFIG,
+                   "bucket_in_path": s3_common.S3_BUCKET_IN_PATH_CONFIG,
+                   "ssl": s3_common.S3_SSL_CONFIG}
+
+    bool_keys = ["bucket_in_path",
+                 "ssl"]
+
+    def validate(self, data):
+        self._validate_url(data['url'])
+
+        # Do validation loosely, and don't require much... the user might have
+        # (by their own preference) set some or all configs manually
+
+        if "credentials" not in data:
+            return
+
+        for key in data["credentials"].keys():
+            if key not in self.configs_map.keys():
+                raise ex.InvalidDataException(
+                    _("Unknown config '%s' for S3 data source") % key)
+            if key in self.bool_keys:
+                if not isinstance(data["credentials"][key], bool):
+                    raise ex.InvalidDataException(
+                        _("Config '%s' must be boolean") % key)
+
+    def _validate_url(self, url):
+        if len(url) == 0:
+            raise ex.InvalidDataException(_("S3 url must not be empty"))
+
+        url = urlparse.urlparse(url)
+        if url.scheme != "s3a":
+            raise ex.InvalidDataException(_("URL scheme must be 's3a'"))
+
+        if not url.hostname:
+            raise ex.InvalidDataException(_("Bucket name must be present"))
+
+        if not url.path:
+            raise ex.InvalidDataException(_("Object name must be present"))
+
+    def prepare_cluster(self, data_source, cluster, **kwargs):
+        if hasattr(data_source, "credentials"):
+            job_configs = kwargs.pop('job_configs')
+
+            if isinstance(job_configs, types.FrozenDict):
+                return
+            if job_configs.get('configs', None) is None:
+                return
+
+            creds = data_source.credentials
+            job_conf = job_configs['configs']
+
+            for config_name, s3a_cfg_name in self.configs_map.items():
+                if job_conf.get(s3a_cfg_name, None) is None:  # no overwrite
+                    if creds.get(config_name, None) is not None:
+                        job_conf[s3a_cfg_name] = creds[config_name]
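
A short sketch of this class in action (it mirrors the unit test further
below and assumes a sahara source tree on the path):

    from unittest import mock

    from sahara.service.edp.data_sources.s3.implementation import S3Type

    s3_type = S3Type()
    s3_type.validate({"name": "ds", "type": "s3",
                      "url": "s3a://mybucket/input",
                      "credentials": {"accesskey": "A", "secretkey": "S",
                                      "endpoint": "ceph.example.org",
                                      "ssl": True}})

    ds = mock.Mock()
    ds.credentials = {"accesskey": "A", "endpoint": "ceph.example.org"}
    job_configs = {"configs": {}}
    s3_type.prepare_cluster(ds, mock.Mock(), job_configs=job_configs)
    print(job_configs["configs"])
    # {'fs.s3a.access.key': 'A', 'fs.s3a.endpoint': 'ceph.example.org'}
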
diff --git a/sahara/service/edp/oozie/workflow_creator/workflow_factory.py b/sahara/service/edp/oozie/workflow_creator/workflow_factory.py
index c3c4f63..ae97fac 100644
--- a/sahara/service/edp/oozie/workflow_creator/workflow_factory.py
+++ b/sahara/service/edp/oozie/workflow_creator/workflow_factory.py
@@ -24,6 +24,7 @@ from sahara.service.edp.oozie.workflow_creator import java_workflow
 from sahara.service.edp.oozie.workflow_creator import mapreduce_workflow
 from sahara.service.edp.oozie.workflow_creator import pig_workflow
 from sahara.service.edp.oozie.workflow_creator import shell_workflow
+from sahara.service.edp import s3_common
 from sahara.swift import swift_helper as sw
 from sahara.swift import utils as su
 from sahara.utils import edp
@@ -117,6 +118,24 @@ class BaseFactory(object):
                     configs[sw.HADOOP_SWIFT_PASSWORD] = (
                         key_manager.get_secret(src.credentials['password']))
                     break
+        for src in (input_data, output_data):
+            if src.type == "s3" and hasattr(src, "credentials"):
+                if "accesskey" in src.credentials:
+                    configs[s3_common.S3_ACCESS_KEY_CONFIG] = (
+                        src.credentials['accesskey'])
+                if "secretkey" in src.credentials:
+                    configs[s3_common.S3_SECRET_KEY_CONFIG] = (
+                        key_manager.get_secret(src.credentials['secretkey']))
+                if "endpoint" in src.credentials:
+                    configs[s3_common.S3_ENDPOINT_CONFIG] = (
+                        src.credentials['endpoint'])
+                if "bucket_in_path" in src.credentials:
+                    configs[s3_common.S3_BUCKET_IN_PATH_CONFIG] = (
+                        src.credentials['bucket_in_path'])
+                if "ssl" in src.credentials:
+                    configs[s3_common.S3_SSL_CONFIG] = (
+                        src.credentials['ssl'])
+                break
         return configs
 
     def get_params(self, input_data, output_data, data_source_urls):
@@ -220,6 +239,7 @@ class JavaFactory(BaseFactory):
         return main_class, java_opts, args
 
     def get_configs(self, proxy_configs=None):
+        # TODO(jfreud): allow s3 and non-proxy swift configs here?
         configs = {}
 
         if proxy_configs:
diff --git a/sahara/service/edp/s3_common.py b/sahara/service/edp/s3_common.py
index 4b6e532..dbfa899 100644
--- a/sahara/service/edp/s3_common.py
+++ b/sahara/service/edp/s3_common.py
@@ -22,6 +22,16 @@ from sahara.i18n import _
 from sahara.service.castellan import utils as key_manager
 
 S3_JB_PREFIX = "s3://"
+S3_ACCESS_KEY_CONFIG = "fs.s3a.access.key"
+S3_SECRET_KEY_CONFIG = "fs.s3a.secret.key"
+S3_ENDPOINT_CONFIG = "fs.s3a.endpoint"
+S3_BUCKET_IN_PATH_CONFIG = "fs.s3a.path.style.access"
+S3_SSL_CONFIG = "fs.s3a.connection.ssl.enabled"
+S3_DS_CONFIGS = [S3_ACCESS_KEY_CONFIG,
+                 S3_SECRET_KEY_CONFIG,
+                 S3_ENDPOINT_CONFIG,
+                 S3_BUCKET_IN_PATH_CONFIG,
+                 S3_SSL_CONFIG]
 CONF = cfg.CONF
 
 
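
To picture where these fs.s3a.* names end up, here is a self-contained
sketch that renders such a dict as Hadoop-style configuration XML (the
rendering helper is illustrative, not sahara's xmlutils):

    from xml.sax.saxutils import escape

    def to_hadoop_xml(configs):
        # Render {name: value} pairs as Hadoop <property> elements.
        props = "".join(
            "  <property>\n    <name>%s</name>\n    <value>%s</value>\n"
            "  </property>\n" % (escape(name), escape(str(value)))
            for name, value in sorted(configs.items()))
        return "<configuration>\n%s</configuration>\n" % props

    print(to_hadoop_xml({"fs.s3a.endpoint": "ceph.example.org",
                         "fs.s3a.connection.ssl.enabled": "true"}))
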
diff --git a/sahara/service/edp/spark/engine.py b/sahara/service/edp/spark/engine.py
index fb774c1..83a35ff 100644
--- a/sahara/service/edp/spark/engine.py
+++ b/sahara/service/edp/spark/engine.py
@@ -27,6 +27,7 @@ from sahara.service.castellan import utils as key_manager
 from sahara.service.edp import base_engine
 from sahara.service.edp.job_binaries import manager as jb_manager
 from sahara.service.edp import job_utils
+from sahara.service.edp import s3_common
 from sahara.service.validations.edp import job_execution as j
 from sahara.swift import swift_helper as sw
 from sahara.swift import utils as su
@@ -115,6 +116,7 @@ class SparkJobEngine(base_engine.JobEngine):
         xml_name = 'spark.xml'
         proxy_configs = job_configs.get('proxy_configs')
         configs = {}
+        cfgs = job_configs.get('configs', {})
         if proxy_configs:
             configs[sw.HADOOP_SWIFT_USERNAME] = proxy_configs.get(
                 'proxy_username')
@@ -124,9 +126,21 @@ class SparkJobEngine(base_engine.JobEngine):
                 'proxy_trust_id')
             configs[sw.HADOOP_SWIFT_DOMAIN_NAME] = CONF.proxy_user_domain_name
         else:
-            cfgs = job_configs.get('configs', {})
-            targets = [sw.HADOOP_SWIFT_USERNAME, sw.HADOOP_SWIFT_PASSWORD]
+            targets = [sw.HADOOP_SWIFT_USERNAME]
             configs = {k: cfgs[k] for k in targets if k in cfgs}
+            if sw.HADOOP_SWIFT_PASSWORD in cfgs:
+                configs[sw.HADOOP_SWIFT_PASSWORD] = (
+                    key_manager.get_secret(cfgs[sw.HADOOP_SWIFT_PASSWORD])
+                )
+
+        for s3_cfg_key in s3_common.S3_DS_CONFIGS:
+            if s3_cfg_key in cfgs:
+                if s3_cfg_key == s3_common.S3_SECRET_KEY_CONFIG:
+                    configs[s3_cfg_key] = (
+                        key_manager.get_secret(cfgs[s3_cfg_key])
+                    )
+                else:
+                    configs[s3_cfg_key] = cfgs[s3_cfg_key]
 
         content = xmlutils.create_hadoop_xml(configs)
         with remote.get_remote(where) as r:
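
The filtering above, sketched with the key manager stubbed out (stand-in
names; only fs.s3a.secret.key is stored as a reference and must be resolved
back to plaintext before landing in spark.xml):

    S3_DS_CONFIGS = ["fs.s3a.access.key", "fs.s3a.secret.key",
                     "fs.s3a.endpoint", "fs.s3a.path.style.access",
                     "fs.s3a.connection.ssl.enabled"]
    SECRET = "fs.s3a.secret.key"

    def get_secret(ref):  # stand-in for key_manager.get_secret
        return {"ref-123": "actual-secret"}[ref]

    cfgs = {"fs.s3a.access.key": "AKIAEXAMPLE",
            "fs.s3a.secret.key": "ref-123",
            "edp.spark.adapt_for_swift": True}  # not an S3 config; dropped
    configs = {}
    for key in S3_DS_CONFIGS:
        if key in cfgs:
            configs[key] = get_secret(cfgs[key]) if key == SECRET else cfgs[key]
    print(configs)
    # {'fs.s3a.access.key': 'AKIAEXAMPLE', 'fs.s3a.secret.key': 'actual-secret'}
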
diff --git a/sahara/service/validations/edp/base.py b/sahara/service/validations/edp/base.py
index 9ed04b4..31caec3 100644
--- a/sahara/service/validations/edp/base.py
+++ b/sahara/service/validations/edp/base.py
@@ -24,7 +24,7 @@ conductor = c.API
 
 data_source_type = {
     "type": "string",
-    "enum": ["swift", "hdfs", "maprfs", "manila"]
+    "enum": ["swift", "hdfs", "maprfs", "manila", "s3"]
 }
 
 job_configs = {
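
A quick check of the widened enum with the jsonschema library (assumes
jsonschema is installed; "s3" now validates, unknown types are rejected):

    import jsonschema

    data_source_type = {
        "type": "string",
        "enum": ["swift", "hdfs", "maprfs", "manila", "s3"],
    }

    jsonschema.validate("s3", data_source_type)      # passes silently
    try:
        jsonschema.validate("gcs", data_source_type)
    except jsonschema.ValidationError as e:
        print("rejected:", e.message)
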
diff --git a/sahara/tests/unit/service/edp/data_sources/s3/__init__.py b/sahara/tests/unit/service/edp/data_sources/s3/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/sahara/tests/unit/service/edp/data_sources/s3/__init__.py
diff --git a/sahara/tests/unit/service/edp/data_sources/s3/test_s3_type.py b/sahara/tests/unit/service/edp/data_sources/s3/test_s3_type.py
new file mode 100644
index 0000000..2da7a12
--- /dev/null
+++ b/sahara/tests/unit/service/edp/data_sources/s3/test_s3_type.py
@@ -0,0 +1,113 @@
+# Copyright (c) 2018 OpenStack Contributors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import mock
+import testtools
+
+import sahara.exceptions as ex
+from sahara.service.edp.data_sources.s3.implementation import S3Type
+from sahara.tests.unit import base
+from sahara.utils.types import FrozenDict
+
+
+class TestS3Type(base.SaharaTestCase):
+    def setUp(self):
+        super(TestS3Type, self).setUp()
+        self.s_type = S3Type()
+
+    def test_validate(self):
+        data = {
+            "name": "test_data_data_source",
+            "type": "s3",
+            "url": "s3a://mybucket/myobject",
+        }
+        self.s_type.validate(data)
+
+        creds = {}
+        data["credentials"] = creds
+        self.s_type.validate(data)
+
+        creds["accesskey"] = "key"
+        creds["secretkey"] = "key2"
+        self.s_type.validate(data)
+
+        creds["bucket_in_path"] = True
+        creds["ssl"] = True
+        creds["endpoint"] = "blah.org"
+        self.s_type.validate(data)
+
+        creds["cool_key"] = "wow"
+        with testtools.ExpectedException(ex.InvalidDataException):
+            self.s_type.validate(data)
+
+        creds.pop("cool_key")
+
+        creds["ssl"] = "yeah"
+        with testtools.ExpectedException(ex.InvalidDataException):
+            self.s_type.validate(data)
+
+        creds["ssl"] = True
+        creds["bucket_in_path"] = "yeah"
+        with testtools.ExpectedException(ex.InvalidDataException):
+            self.s_type.validate(data)
+
+    def test_validate_url(self):
+        url = ""
+        with testtools.ExpectedException(ex.InvalidDataException):
+            self.s_type._validate_url(url)
+        url = "s3a://"
+        with testtools.ExpectedException(ex.InvalidDataException):
+            self.s_type._validate_url(url)
+        url = "s3a:///"
+        with testtools.ExpectedException(ex.InvalidDataException):
+            self.s_type._validate_url(url)
+        url = "s3a://bucket"
+        with testtools.ExpectedException(ex.InvalidDataException):
+            self.s_type._validate_url(url)
+        url = "s3b://bucket/obj"
+        with testtools.ExpectedException(ex.InvalidDataException):
+            self.s_type._validate_url(url)
+        url = "s3a://bucket/obj"
+        self.s_type._validate_url(url)
+        url = "s3a://bucket/fold/obj"
+        self.s_type._validate_url(url)
+        url = "s3a://bucket/obj/"
+        self.s_type._validate_url(url)
+
+    def test_prepare_cluster(self):
+        ds = mock.Mock()
+        cluster = mock.Mock()
+        ds.credentials = {}
+        job_configs = {}
+
+        self.s_type.prepare_cluster(ds, cluster, job_configs=job_configs)
+        self.assertEqual(job_configs, {})
+
+        job_configs['configs'] = {}
+        ds.credentials['accesskey'] = 'key'
+        self.s_type.prepare_cluster(ds, cluster, job_configs=job_configs)
+        self.assertEqual(job_configs['configs'], {'fs.s3a.access.key': 'key'})
+
+        job_configs['configs'] = {'fs.s3a.access.key': 'key2'}
+        self.s_type.prepare_cluster(ds, cluster, job_configs=job_configs)
+        self.assertEqual(job_configs['configs'], {'fs.s3a.access.key': 'key2'})
+
+        job_configs = FrozenDict({'configs': {}})
+        self.s_type.prepare_cluster(ds, cluster, job_configs=job_configs)
+        self.assertNotIn('fs.s3a.access.key', job_configs['configs'])
+
+        job_configs = {}
+        self.s_type.prepare_cluster(ds, cluster, job_configs=job_configs)
+        self.assertEqual(job_configs, {})
diff --git a/setup.cfg b/setup.cfg
index 38f2b84..48de0d7 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -59,6 +59,7 @@ sahara.data_source.types =
     manila = sahara.service.edp.data_sources.manila.implementation:ManilaType
     maprfs = sahara.service.edp.data_sources.maprfs.implementation:MapRFSType
     swift = sahara.service.edp.data_sources.swift.implementation:SwiftType
+    s3 = sahara.service.edp.data_sources.s3.implementation:S3Type
 
 sahara.job_binary.types =
     internal-db = sahara.service.edp.job_binaries.internal_db.implementation:InternalDBType
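
The new entry point is resolved through stevedore at runtime; a rough
equivalent of what sahara's data source manager does (assumes sahara is
installed in the environment):

    from stevedore import driver

    mgr = driver.DriverManager(
        namespace='sahara.data_source.types',
        name='s3',
        invoke_on_load=True)
    print(type(mgr.driver).__name__)  # S3Type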