diff --git a/.stestr.conf b/.stestr.conf
index e8c88eaa..ed3a4ffb 100644
--- a/.stestr.conf
+++ b/.stestr.conf
@@ -1,3 +1,3 @@
 [DEFAULT]
-test_path=sahara_tests/unit/scenario
+test_path=sahara_tests/unit
 group_regex=([^\.]+\.)+
diff --git a/.zuul.yaml b/.zuul.yaml
index ddeae83c..39e97b62 100644
--- a/.zuul.yaml
+++ b/.zuul.yaml
@@ -229,6 +229,7 @@
         s-container: false
         s-object: false
         s-proxy: false
+      sahara_enable_s3: True

 - job:
     name: sahara-tests-scenario-multinode-spark
diff --git a/etc/scenario/gate/credentials_s3.yaml.mako b/etc/scenario/gate/credentials_s3.yaml.mako
new file mode 100644
index 00000000..48e01da7
--- /dev/null
+++ b/etc/scenario/gate/credentials_s3.yaml.mako
@@ -0,0 +1,6 @@
+credentials:
+  s3_accesskey: ${s3_accesskey}
+  s3_secretkey: ${s3_secretkey}
+  s3_endpoint: ${s3_endpoint}
+  s3_endpoint_ssl: ${s3_endpoint_ssl}
+  s3_bucket_path: ${s3_bucket_path}
diff --git a/etc/scenario/gate/edp_s3.yaml.mako b/etc/scenario/gate/edp_s3.yaml.mako
new file mode 100644
index 00000000..2a3aa11a
--- /dev/null
+++ b/etc/scenario/gate/edp_s3.yaml.mako
@@ -0,0 +1,25 @@
+edp_jobs_flow:
+  spark_wordcount_s3:
+    - type: Spark
+      input_datasource:
+        type: s3
+        source: edp-examples/edp-spark/sample_input.txt
+      output_datasource:
+        type: swift
+        destination: edp-output
+      main_lib:
+        type: swift
+        source: edp-examples/edp-spark/spark-wordcount.jar
+      configs:
+        edp.java.main_class: sahara.edp.spark.SparkWordCount
+        edp.spark.adapt_for_swift: true
+        fs.swift.service.sahara.username: ${os_username}
+        fs.swift.service.sahara.password: ${os_password}
+        fs.s3a.access.key: ${s3_accesskey}
+        fs.s3a.secret.key: ${s3_secretkey}
+        fs.s3a.endpoint: ${s3_endpoint}
+        fs.s3a.connection.ssl.enabled: ${s3_endpoint_ssl}
+        fs.s3a.path.style.access: ${s3_bucket_path}
+      args:
+        - '{input_datasource}'
+        - '{output_datasource}'
diff --git a/etc/scenario/gate/spark-1.6.0.yaml.mako b/etc/scenario/gate/spark-1.6.0.yaml.mako
index 81ca9330..29929223 100644
--- a/etc/scenario/gate/spark-1.6.0.yaml.mako
+++ b/etc/scenario/gate/spark-1.6.0.yaml.mako
@@ -32,6 +32,7 @@ clusters:
           node_group: worker
           size: 1
     scenario:
+      - run_jobs
       - scale
     edp_jobs_flow:
       - spark_pi
diff --git a/etc/scenario/simple-testcase.yaml b/etc/scenario/simple-testcase.yaml
index 5fb8d08e..50dde9cb 100644
--- a/etc/scenario/simple-testcase.yaml
+++ b/etc/scenario/simple-testcase.yaml
@@ -23,7 +23,10 @@ clusters:
       - operation: add
         node_group: cdh-worker
        size: 1
-    edp_jobs_flow: test_flow
+    edp_jobs_flow:
+      - test_flow
+      - name: test_manila
+        features: manila

 edp_jobs_flow:
   test_flow:
@@ -71,9 +74,8 @@
       configs:
         edp.streaming.mapper: /bin/cat
         edp.streaming.reducer: /usr/bin/wc
+  test_manila:
     - type: Pig
-      features:
-        - manila
       input_datasource:
         type: manila
         source: etc/edp-examples/edp-pig/top-todoers/data/input
diff --git a/releasenotes/notes/scenario-s3-20d41440d84af3a9.yaml b/releasenotes/notes/scenario-s3-20d41440d84af3a9.yaml
new file mode 100644
index 00000000..1b292a3c
--- /dev/null
+++ b/releasenotes/notes/scenario-s3-20d41440d84af3a9.yaml
@@ -0,0 +1,8 @@
+---
+features:
+  - |
+    sahara-scenario now supports testing the S3 API for job binaries
+    and data sources, a feature introduced in Rocky.
+    The code can be enabled using the "s3" feature, and
+    various templates now run an S3-based job as well when
+    the feature is enabled from the command line.
diff --git a/requirements.txt b/requirements.txt
index c0f8a5db..b5a77783 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,6 +5,7 @@
 pbr>=1.6 # Apache-2.0

 Mako>=0.4.0 # MIT
+botocore>=1.5.1 # Apache-2.0
 fixtures>=3.0.0 # Apache-2.0/BSD
 jsonschema!=2.5.0,<3.0.0,>=2.0.0 # MIT
 keystoneauth1>=2.1.0 # Apache-2.0
diff --git a/roles/run-sahara-scenario/defaults/main.yaml b/roles/run-sahara-scenario/defaults/main.yaml
index d7d7875f..217c1108 100644
--- a/roles/run-sahara-scenario/defaults/main.yaml
+++ b/roles/run-sahara-scenario/defaults/main.yaml
@@ -4,3 +4,4 @@ sahara_cloud_demo: 'devstack-admin'
 sahara_scenario_conf: '{{ ansible_user_dir }}/template_vars.ini'
 sahara_scenario_test_template: 'fake.yaml.mako'
 sahara_scenario_tox_env: 'venv'
+sahara_enable_s3: False
diff --git a/roles/run-sahara-scenario/tasks/main.yaml b/roles/run-sahara-scenario/tasks/main.yaml
index 73849582..9a1b7ecf 100644
--- a/roles/run-sahara-scenario/tasks/main.yaml
+++ b/roles/run-sahara-scenario/tasks/main.yaml
@@ -4,11 +4,18 @@
     tox -e {{ sahara_scenario_tox_env }} --sitepackages -- sahara-scenario --verbose -V {{ sahara_scenario_conf }} \
         etc/scenario/gate/credentials.yaml.mako \
         etc/scenario/gate/edp.yaml.mako \
+        {% if sahara_enable_s3 -%}
+        etc/scenario/gate/credentials_s3.yaml.mako \
+        etc/scenario/gate/edp_s3.yaml.mako \
+        {% endif -%}
         etc/scenario/gate/{{ sahara_scenario_test_template }} \
         --os-cloud {{ sahara_cloud_demo }} \
         {% if sahara_scenario_use_api_v2|default(False) -%}
         --v2 \
-        {% endif %}
+        {% endif -%}
+        {% if sahara_enable_s3 -%}
+        --feature s3 \
+        {% endif -%}
         | tee scenario.log
     if grep -qE '(FAILED|ERROR:)' scenario.log; then
         exit 1
diff --git a/roles/setup-sahara-scenario-env/defaults/main.yaml b/roles/setup-sahara-scenario-env/defaults/main.yaml
index d6883e31..6ca4669f 100644
--- a/roles/setup-sahara-scenario-env/defaults/main.yaml
+++ b/roles/setup-sahara-scenario-env/defaults/main.yaml
@@ -7,6 +7,7 @@ sahara_image_name: 'xenial-server'
 sahara_image_user: 'ubuntu'
 sahara_image_format: 'qcow2'
 sahara_scenario_conf: '{{ ansible_user_dir }}/template_vars.ini'
+sahara_enable_s3: False
 sahara_network_type: 'neutron'
 private_network_name: 'private'
 public_network_name: 'public'
diff --git a/roles/setup-sahara-scenario-env/tasks/main.yaml b/roles/setup-sahara-scenario-env/tasks/main.yaml
index 2cee0de6..7152c3d6 100644
--- a/roles/setup-sahara-scenario-env/tasks/main.yaml
+++ b/roles/setup-sahara-scenario-env/tasks/main.yaml
@@ -28,6 +28,10 @@
     openstack --os-cloud {{ sahara_cloud_demo }} dataprocessing image tags add {{ sahara_image_name }} --tags \
     {{ sahara_plugin_version }} {{ sahara_plugin }}

+- name: S3 configuration
+  import_tasks: setup_s3.yaml
+  when: sahara_enable_s3
+
 # we cannot use os_nova_flavor as well (see above)
 - name: create the required flavor(s)
   command: |
diff --git a/roles/setup-sahara-scenario-env/tasks/setup_s3.yaml b/roles/setup-sahara-scenario-env/tasks/setup_s3.yaml
new file mode 100644
index 00000000..9d581b98
--- /dev/null
+++ b/roles/setup-sahara-scenario-env/tasks/setup_s3.yaml
@@ -0,0 +1,29 @@
+- name: create the S3 credentials
+  shell: |
+    ACCESS_KEY=$(openstack --os-cloud {{ sahara_cloud_demo }} ec2 credentials list -f value -c Access | head -n1)
+    if [ -z "${ACCESS_KEY}" ]; then
+        ACCESS_KEY=$(openstack --os-cloud {{ sahara_cloud_demo }} ec2 credentials create -f value -c access)
+    fi
+    SECRET_KEY=$(openstack --os-cloud {{ sahara_cloud_demo }} ec2 credentials list -f value -c Secret | head -n1)
+    printf "${ACCESS_KEY}\n${SECRET_KEY}"
+  register: sahara_s3_credentials_out
+
+# This task should not be needed normally and the endpoint should be discovered by default
+- name: find the swift endpoint for S3
+  shell: |
+    ENDPOINT=$(openstack --os-cloud {{ sahara_cloud_admin }} endpoint list --service swift --interface public -c URL -f value)
+    ENDPOINT_PREFIX=$(echo "${ENDPOINT}" | awk -F'://' '{print $1}')
+    ENDPOINT_SSL="False"
+    if [ "${ENDPOINT_PREFIX}" = "https" ]; then
+        ENDPOINT_SSL="True"
+    fi
+    printf "${ENDPOINT}\n${ENDPOINT_SSL}"
+  register: sahara_s3_endpoint_out
+
+- name: save the S3 access data
+  set_fact:
+    sahara_s3_accesskey: "{{ sahara_s3_credentials_out.stdout_lines[0] }}"
+    sahara_s3_secretkey: "{{ sahara_s3_credentials_out.stdout_lines[1] }}"
+    sahara_s3_endpoint: "{{ sahara_s3_endpoint_out.stdout_lines[0] }}"
+    sahara_s3_endpoint_ssl: "{{ sahara_s3_endpoint_out.stdout_lines[1] }}"
+    sahara_s3_bucket_path: True
diff --git a/roles/setup-sahara-scenario-env/templates/sahara_scenario_conf.ini.j2 b/roles/setup-sahara-scenario-env/templates/sahara_scenario_conf.ini.j2
index a932ea78..40dd4dab 100644
--- a/roles/setup-sahara-scenario-env/templates/sahara_scenario_conf.ini.j2
+++ b/roles/setup-sahara-scenario-env/templates/sahara_scenario_conf.ini.j2
@@ -7,3 +7,10 @@ ci_flavor_id: {{ sahara_flavor_small }}
 cluster_name: testc
 is_transient: {{ sahara_cluster_transient }}
 auto_security_group: {{ sahara_auto_security_group }}
+{% if sahara_enable_s3 -%}
+s3_accesskey: {{ sahara_s3_accesskey }}
+s3_secretkey: {{ sahara_s3_secretkey }}
+s3_endpoint: {{ sahara_s3_endpoint }}
+s3_endpoint_ssl: {{ sahara_s3_endpoint_ssl }}
+s3_bucket_path: {{ sahara_s3_bucket_path }}
+{% endif -%}
diff --git a/sahara_tests/scenario/base.py b/sahara_tests/scenario/base.py
index 1b2f78ac..0832a165 100644
--- a/sahara_tests/scenario/base.py
+++ b/sahara_tests/scenario/base.py
@@ -33,6 +33,7 @@
 from sahara_tests.scenario import clients
 from sahara_tests.scenario import timeouts
 from sahara_tests.scenario import utils
 from sahara_tests.utils import crypto as ssh
+from sahara_tests.utils import url as utils_url

 logger = logging.getLogger('swiftclient')
 logger.setLevel(logging.CRITICAL)
@@ -150,6 +151,13 @@ class BaseTestCase(base.BaseTestCase):
             cacert=self.credentials.get('ssl_cert'),
             tenant_name=tenant_name)
         self.glance = clients.GlanceClient(session=session)
+        # boto is not an OpenStack client, but we can handle it as well
+        self.boto = None
+        if self.credentials.get("s3_endpoint", None):
+            self.boto = clients.BotoClient(
+                endpoint=self.credentials["s3_endpoint"],
+                accesskey=self.credentials["s3_accesskey"],
+                secretkey=self.credentials["s3_secretkey"])

     def create_cluster(self):
         self.cluster_id = self.sahara.get_cluster_id(
@@ -239,17 +247,34 @@ class BaseTestCase(base.BaseTestCase):

     def _create_datasources(self, job):
         def create(ds, name):
+            credential_vars = {}
             source = ds.get('source', None)
             destination = None if source else utils.rand_name(
                 ds['destination'])
             if ds['type'] == 'swift':
                 url = self._create_swift_data(source, destination)
-            if ds['type'] == 'hdfs':
+                credential_vars = {
+                    'credential_user': self.credentials['os_username'],
+                    'credential_pass': self.credentials['os_password']
+                }
+            elif ds['type'] == 's3':
+                url = self._create_s3_data(source, destination)
+                credential_vars = {
+                    's3_credentials': {
+                        'accesskey': self.credentials['s3_accesskey'],
+                        'secretkey': self.credentials['s3_secretkey'],
+                        'endpoint': utils_url.url_schema_remover(
+                            self.credentials['s3_endpoint']),
+                        'ssl': self.credentials['s3_endpoint_ssl'],
+                        'bucket_in_path': self.credentials['s3_bucket_path']
+                    }
+                }
+            elif ds['type'] == 'hdfs':
                 url = self._create_dfs_data(source, destination,
                                             self.testcase.get('hdfs_username',
                                                               'hadoop'),
                                             ds['type'])
-            if ds['type'] == 'maprfs':
+            elif ds['type'] == 'maprfs':
                 url = self._create_dfs_data(source, destination,
                                             ds.get('maprfs_username', 'mapr'),
                                             ds['type'])
@@ -257,8 +282,7 @@ class BaseTestCase(base.BaseTestCase):
                 name=utils.rand_name(name),
                 description='', data_source_type=ds['type'], url=url,
-                credential_user=self.credentials['os_username'],
-                credential_pass=self.credentials['os_password'])
+                **credential_vars)

         input_id, output_id = None, None
         if job.get('input_datasource'):
@@ -289,7 +313,12 @@ class BaseTestCase(base.BaseTestCase):
             url = self._create_swift_data(job_binary['source'])
             extra['user'] = self.credentials['os_username']
             extra['password'] = self.credentials['os_password']
-        if job_binary['type'] == 'database':
+        elif job_binary['type'] == 's3':
+            url = self._create_s3_data(job_binary['source'])
+            extra['accesskey'] = self.credentials['s3_accesskey']
+            extra['secretkey'] = self.credentials['s3_secretkey']
+            extra['endpoint'] = self.credentials['s3_endpoint']
+        elif job_binary['type'] == 'database':
             url = self._create_internal_db_data(job_binary['source'])

         job_binary_name = '%s-%s' % (
@@ -383,6 +412,15 @@ class BaseTestCase(base.BaseTestCase):

         return 'swift://%s.sahara/%s' % (container, path)

+    def _create_s3_data(self, source=None, destination=None):
+        bucket = self._get_s3_bucket()
+        path = utils.rand_name(destination if destination else 'test')
+        data = self._read_source_file(source)
+
+        self.__upload_to_bucket(bucket, path, data)
+
+        return 's3://%s/%s' % (bucket, path)
+
     def _create_dfs_data(self, source, destination, hdfs_username, fs):

         def to_hex_present(string):
@@ -430,6 +468,12 @@ class BaseTestCase(base.BaseTestCase):
                 utils.rand_name('sahara-tests'))
         return self.__swift_container

+    def _get_s3_bucket(self):
+        if not getattr(self, '__s3_bucket', None):
+            self.__s3_bucket = self.__create_bucket(
+                utils.rand_name('sahara-tests'))
+        return self.__s3_bucket
+
     @track_result("Cluster scaling", False)
     def check_scale(self):
         scale_ops = []
@@ -792,6 +836,19 @@ class BaseTestCase(base.BaseTestCase):
             self.addCleanup(self.swift.delete_object, container_name,
                             object_name)

+    def __create_bucket(self, bucket_name):
+        self.boto.create_bucket(bucket_name)
+        if not self.testcase['retain_resources']:
+            self.addCleanup(self.boto.delete_bucket, bucket_name)
+        return bucket_name
+
+    def __upload_to_bucket(self, bucket_name, object_name, data=None):
+        if data:
+            self.boto.upload_data(bucket_name, object_name, data)
+        if not self.testcase['retain_resources']:
+            self.addCleanup(self.boto.delete_object, bucket_name,
+                            object_name)
+
     def __create_keypair(self):
         key = utils.rand_name('scenario_key')
         self.nova.nova_client.keypairs.create(key,
diff --git a/sahara_tests/scenario/clients.py b/sahara_tests/scenario/clients.py
index 42a4fe1b..eb19a655 100644
--- a/sahara_tests/scenario/clients.py
+++ b/sahara_tests/scenario/clients.py
@@ -17,6 +17,8 @@ from __future__ import print_function
 import time

 import fixtures
+from botocore.exceptions import ClientError as BotoClientError
+from botocore import session as botocore_session
 from glanceclient import client as glance_client
 from keystoneauth1.identity import v3 as identity_v3
 from keystoneauth1 import session
@@ -363,6 +365,76 @@
         return False


+class BotoClient(Client):
+    def __init__(self, *args, **kwargs):
+        sess = botocore_session.get_session()
+        self.boto_client = sess.create_client(
+            's3',
+            endpoint_url=kwargs['endpoint'],
+            aws_access_key_id=kwargs['accesskey'],
+            aws_secret_access_key=kwargs['secretkey']
+        )
+
+    def create_bucket(self, bucket_name):
+        return self.boto_client.create_bucket(Bucket=bucket_name)
+
+    def _delete_and_check_bucket(self, bucket_name):
+        bucket_deleted = False
+        operation_parameters = {'Bucket': bucket_name}
+        try:
+            # While list_objects_v2 is the suggested function, pagination
+            # does not seem to work properly with RadosGW when it's used.
+            paginator = self.boto_client.get_paginator('list_objects')
+            page_iterator = paginator.paginate(**operation_parameters)
+            for page in page_iterator:
+                if 'Contents' not in page:
+                    continue
+                for item in page['Contents']:
+                    self.boto_client.delete_object(Bucket=bucket_name,
+                                                   Key=item['Key'])
+            self.boto_client.delete_bucket(Bucket=bucket_name)
+        except BotoClientError as ex:
+            error = ex.response.get('Error', {})
+            # without the conversion the value is a tuple
+            error_code = '%s' % (error.get('Code', ''))
+            if error_code == 'NoSuchBucket':
+                bucket_deleted = True
+        return bucket_deleted
+
+    def delete_bucket(self, bucket_name):
+        return self.delete_resource(
+            self._delete_and_check_bucket, bucket_name)
+
+    def upload_data(self, bucket_name, object_name, data):
+        return self.boto_client.put_object(
+            Bucket=bucket_name,
+            Key=object_name,
+            Body=data)
+
+    def _delete_and_check_object(self, bucket_name, object_name):
+        self.boto_client.delete_object(Bucket=bucket_name, Key=object_name)
+        object_deleted = False
+        try:
+            self.boto_client.head_object(Bucket=bucket_name, Key=object_name)
+        except BotoClientError as ex:
+            error = ex.response.get('Error', {})
+            # without the conversion the value is a tuple
+            error_code = '%s' % (error.get('Code', ''))
+            if error_code == '404':
+                object_deleted = True
+        return object_deleted
+
+    def delete_object(self, bucket_name, object_name):
+        return self.delete_resource(
+            self._delete_and_check_object,
+            bucket_name, object_name)
+
+    def is_resource_deleted(self, method, *args, **kwargs):
+        # Exceptions are handled directly inside the call to "method",
+        # because they are not the same for objects and buckets.
+        return method(*args, **kwargs)
+
+
 class GlanceClient(Client):
     def __init__(self, *args, **kwargs):
         self.glance_client = glance_client.Client('2', *args, **kwargs)
diff --git a/sahara_tests/scenario/defaults/ambari-2.4.yaml.mako b/sahara_tests/scenario/defaults/ambari-2.4.yaml.mako
index 08e04c7c..a95e88e4 100644
--- a/sahara_tests/scenario/defaults/ambari-2.4.yaml.mako
+++ b/sahara_tests/scenario/defaults/ambari-2.4.yaml.mako
@@ -66,4 +66,7 @@ clusters:
         size: 1
     edp_jobs_flow:
       - java_job
+      - name: mapreduce_job_s3
+        features:
+          - s3
       - spark_pi
diff --git a/sahara_tests/scenario/defaults/ambari-2.5.yaml.mako b/sahara_tests/scenario/defaults/ambari-2.5.yaml.mako
index 52de39fc..fab951cb 100644
--- a/sahara_tests/scenario/defaults/ambari-2.5.yaml.mako
+++ b/sahara_tests/scenario/defaults/ambari-2.5.yaml.mako
@@ -66,4 +66,7 @@ clusters:
         size: 1
     edp_jobs_flow:
       - java_job
+      - name: mapreduce_job_s3
+        features:
+          - s3
       - spark_pi
diff --git a/sahara_tests/scenario/defaults/ambari-2.6.yaml.mako b/sahara_tests/scenario/defaults/ambari-2.6.yaml.mako
index ddbbe66e..a06aac88 100644
--- a/sahara_tests/scenario/defaults/ambari-2.6.yaml.mako
+++ b/sahara_tests/scenario/defaults/ambari-2.6.yaml.mako
@@ -66,4 +66,7 @@ clusters:
         size: 1
     edp_jobs_flow:
       - java_job
+      - name: mapreduce_job_s3
+        features:
+          - s3
       - spark_pi
diff --git a/sahara_tests/scenario/defaults/cdh-5.11.0.yaml.mako b/sahara_tests/scenario/defaults/cdh-5.11.0.yaml.mako
index 85e393d4..6e647659 100644
--- a/sahara_tests/scenario/defaults/cdh-5.11.0.yaml.mako
+++ b/sahara_tests/scenario/defaults/cdh-5.11.0.yaml.mako
@@ -89,6 +89,9 @@ clusters:
     edp_jobs_flow:
       - pig_job
       - mapreduce_job
+      - name: mapreduce_job_s3
+        features:
+          - s3
       - mapreduce_streaming_job
       - java_job
       - spark_wordcount
diff --git a/sahara_tests/scenario/defaults/cdh-5.13.0.yaml.mako b/sahara_tests/scenario/defaults/cdh-5.13.0.yaml.mako
index 986654ab..9631ba72 100644
--- a/sahara_tests/scenario/defaults/cdh-5.13.0.yaml.mako
+++ b/sahara_tests/scenario/defaults/cdh-5.13.0.yaml.mako
@@ -89,6 +89,9 @@ clusters:
     edp_jobs_flow:
       - pig_job
       - mapreduce_job
+      - name: mapreduce_job_s3
+        features:
+          - s3
       - mapreduce_streaming_job
       - java_job
       - spark_wordcount
diff --git a/sahara_tests/scenario/defaults/cdh-5.9.0.yaml.mako b/sahara_tests/scenario/defaults/cdh-5.9.0.yaml.mako
index e5c6c92c..22c40d8e 100644
--- a/sahara_tests/scenario/defaults/cdh-5.9.0.yaml.mako
+++ b/sahara_tests/scenario/defaults/cdh-5.9.0.yaml.mako
@@ -89,6 +89,9 @@ clusters:
     edp_jobs_flow:
       - pig_job
       - mapreduce_job
+      - name: mapreduce_job_s3
+        features:
+          - s3
       - mapreduce_streaming_job
       - java_job
       - spark_wordcount
diff --git a/sahara_tests/scenario/defaults/credentials_s3.yaml.mako b/sahara_tests/scenario/defaults/credentials_s3.yaml.mako
new file mode 100644
index 00000000..48e01da7
--- /dev/null
+++ b/sahara_tests/scenario/defaults/credentials_s3.yaml.mako
@@ -0,0 +1,6 @@
+credentials:
+  s3_accesskey: ${s3_accesskey}
+  s3_secretkey: ${s3_secretkey}
+  s3_endpoint: ${s3_endpoint}
+  s3_endpoint_ssl: ${s3_endpoint_ssl}
+  s3_bucket_path: ${s3_bucket_path}
diff --git a/sahara_tests/scenario/defaults/edp_s3.yaml.mako b/sahara_tests/scenario/defaults/edp_s3.yaml.mako
new file mode 100644
index 00000000..a7c14622
--- /dev/null
+++ b/sahara_tests/scenario/defaults/edp_s3.yaml.mako
@@ -0,0 +1,47 @@
+edp_jobs_flow:
+  mapreduce_job_s3:
+    - type: MapReduce
+      input_datasource:
+        type: s3
+        source: edp-examples/edp-pig/trim-spaces/data/input
+      output_datasource:
+        type: s3
+        destination: /user/hadoop/edp-output
+      additional_libs:
+        - type: s3
+          source: edp-examples/edp-mapreduce/edp-mapreduce.jar
+      configs:
+        mapred.map.class: org.apache.oozie.example.SampleMapper
+        mapred.reduce.class: org.apache.oozie.example.SampleReducer
+        mapreduce.framework.name: yarn
+        fs.swift.service.sahara.username: ${os_username}
+        fs.swift.service.sahara.password: ${os_password}
+        fs.s3a.access.key: ${s3_accesskey}
+        fs.s3a.secret.key: ${s3_secretkey}
+        fs.s3a.endpoint: ${s3_endpoint}
+        fs.s3a.connection.ssl.enabled: ${s3_endpoint_ssl}
+        fs.s3a.path.style.access: ${s3_bucket_path}
+  spark_wordcount_s3:
+    - type: Spark
+      input_datasource:
+        type: s3
+        source: edp-examples/edp-spark/sample_input.txt
+      output_datasource:
+        type: s3
+        destination: edp-output
+      main_lib:
+        type: s3
+        source: edp-examples/edp-spark/spark-wordcount.jar
+      configs:
+        edp.java.main_class: sahara.edp.spark.SparkWordCount
+        edp.spark.adapt_for_swift: true
+        fs.swift.service.sahara.username: ${os_username}
+        fs.swift.service.sahara.password: ${os_password}
+        fs.s3a.access.key: ${s3_accesskey}
+        fs.s3a.secret.key: ${s3_secretkey}
+        fs.s3a.endpoint: ${s3_endpoint}
+        fs.s3a.connection.ssl.enabled: ${s3_endpoint_ssl}
+        fs.s3a.path.style.access: ${s3_bucket_path}
+      args:
+        - '{input_datasource}'
+        - '{output_datasource}'
diff --git a/sahara_tests/scenario/defaults/mapr-5.2.0.mrv2.yaml.mako b/sahara_tests/scenario/defaults/mapr-5.2.0.mrv2.yaml.mako
index 777a73ab..b1064d4f 100644
--- a/sahara_tests/scenario/defaults/mapr-5.2.0.mrv2.yaml.mako
+++ b/sahara_tests/scenario/defaults/mapr-5.2.0.mrv2.yaml.mako
@@ -54,3 +54,6 @@ clusters:
         size: 1
     edp_jobs_flow:
       - mapr
+      - name: mapreduce_job_s3
+        features:
+          - s3
diff --git a/sahara_tests/scenario/defaults/rocky/ambari-2.4.yaml.mako b/sahara_tests/scenario/defaults/rocky/ambari-2.4.yaml.mako
index 08e04c7c..a95e88e4 100644
--- a/sahara_tests/scenario/defaults/rocky/ambari-2.4.yaml.mako
+++ b/sahara_tests/scenario/defaults/rocky/ambari-2.4.yaml.mako
@@ -66,4 +66,7 @@ clusters:
         size: 1
     edp_jobs_flow:
       - java_job
+      - name: mapreduce_job_s3
+        features:
+          - s3
       - spark_pi
diff --git a/sahara_tests/scenario/defaults/rocky/ambari-2.5.yaml.mako b/sahara_tests/scenario/defaults/rocky/ambari-2.5.yaml.mako
index 52de39fc..fab951cb 100644
--- a/sahara_tests/scenario/defaults/rocky/ambari-2.5.yaml.mako
+++ b/sahara_tests/scenario/defaults/rocky/ambari-2.5.yaml.mako
@@ -66,4 +66,7 @@ clusters:
         size: 1
     edp_jobs_flow:
       - java_job
+      - name: mapreduce_job_s3
+        features:
+          - s3
       - spark_pi
diff --git a/sahara_tests/scenario/defaults/rocky/ambari-2.6.yaml.mako b/sahara_tests/scenario/defaults/rocky/ambari-2.6.yaml.mako
index ddbbe66e..a06aac88 100644
--- a/sahara_tests/scenario/defaults/rocky/ambari-2.6.yaml.mako
+++ b/sahara_tests/scenario/defaults/rocky/ambari-2.6.yaml.mako
@@ -66,4 +66,7 @@ clusters:
         size: 1
     edp_jobs_flow:
       - java_job
+      - name: mapreduce_job_s3
+        features:
+          - s3
       - spark_pi
diff --git a/sahara_tests/scenario/defaults/rocky/cdh-5.11.0.yaml.mako b/sahara_tests/scenario/defaults/rocky/cdh-5.11.0.yaml.mako
index 85e393d4..6e647659 100644
--- a/sahara_tests/scenario/defaults/rocky/cdh-5.11.0.yaml.mako
+++ b/sahara_tests/scenario/defaults/rocky/cdh-5.11.0.yaml.mako
@@ -89,6 +89,9 @@ clusters:
     edp_jobs_flow:
       - pig_job
       - mapreduce_job
+      - name: mapreduce_job_s3
+        features:
+          - s3
       - mapreduce_streaming_job
       - java_job
       - spark_wordcount
diff --git a/sahara_tests/scenario/defaults/rocky/cdh-5.13.0.yaml.mako b/sahara_tests/scenario/defaults/rocky/cdh-5.13.0.yaml.mako
index 986654ab..9631ba72 100644
--- a/sahara_tests/scenario/defaults/rocky/cdh-5.13.0.yaml.mako
+++ b/sahara_tests/scenario/defaults/rocky/cdh-5.13.0.yaml.mako
@@ -89,6 +89,9 @@ clusters:
     edp_jobs_flow:
       - pig_job
       - mapreduce_job
+      - name: mapreduce_job_s3
+        features:
+          - s3
       - mapreduce_streaming_job
       - java_job
       - spark_wordcount
diff --git a/sahara_tests/scenario/defaults/rocky/cdh-5.9.0.yaml.mako b/sahara_tests/scenario/defaults/rocky/cdh-5.9.0.yaml.mako
index e5c6c92c..22c40d8e 100644
--- a/sahara_tests/scenario/defaults/rocky/cdh-5.9.0.yaml.mako
+++ b/sahara_tests/scenario/defaults/rocky/cdh-5.9.0.yaml.mako
@@ -89,6 +89,9 @@ clusters:
     edp_jobs_flow:
       - pig_job
       - mapreduce_job
+      - name: mapreduce_job_s3
+        features:
+          - s3
       - mapreduce_streaming_job
       - java_job
       - spark_wordcount
diff --git a/sahara_tests/scenario/defaults/rocky/mapr-5.2.0.mrv2.yaml.mako b/sahara_tests/scenario/defaults/rocky/mapr-5.2.0.mrv2.yaml.mako
index 777a73ab..b1064d4f 100644
--- a/sahara_tests/scenario/defaults/rocky/mapr-5.2.0.mrv2.yaml.mako
+++ b/sahara_tests/scenario/defaults/rocky/mapr-5.2.0.mrv2.yaml.mako
@@ -54,3 +54,6 @@ clusters:
         size: 1
     edp_jobs_flow:
       - mapr
+      - name: mapreduce_job_s3
+        features:
+          - s3
diff --git a/sahara_tests/scenario/runner.py b/sahara_tests/scenario/runner.py
index 25854ba3..72377bb5 100755
--- a/sahara_tests/scenario/runner.py
+++ b/sahara_tests/scenario/runner.py
@@ -125,7 +125,7 @@ def get_base_parser():
     parser.add_argument('--report', default=False, action='store_true',
                         help='Write results of test to file')
     parser.add_argument('--feature', '-f', default=[],
-                        nargs='?', help='Set of features to enable')
+                        action='append', help='Set of features to enable')
     parser.add_argument('--count', default=1, nargs='?', type=valid_count,
                         help='Specify count of runs current cases.')
     parser.add_argument('--v2', '-2', default=False, action='store_true',
diff --git a/sahara_tests/scenario/validation.py b/sahara_tests/scenario/validation.py
index 0a7c2982..9beee192 100644
--- a/sahara_tests/scenario/validation.py
+++ b/sahara_tests/scenario/validation.py
@@ -60,6 +60,21 @@
                     "type": "string",
                     "minLength": 1
                 },
+                "s3_accesskey": {
+                    "type": "string",
+                },
+                "s3_secretkey": {
+                    "type": "string",
+                },
+                "s3_endpoint": {
+                    "type": "string",
+                },
+                "s3_endpoint_ssl": {
+                    "type": "boolean",
+                },
+                "s3_bucket_path": {
+                    "type": "boolean",
+                },
             },
             "additionalProperties": False
         },
@@ -370,7 +385,8 @@
                             "properties": {
                                 "type": {
                                     "type": "string",
-                                    "enum": ["swift", "hdfs", "maprfs"]
+                                    "enum": ["swift", "hdfs", "maprfs",
+                                             "s3"]
                                 },
                                 "source": {
                                     "type": "string"
@@ -384,7 +400,8 @@
                             "properties": {
                                 "type": {
                                     "type": "string",
-                                    "enum": ["swift", "hdfs", "maprfs"]
+                                    "enum": ["swift", "hdfs", "maprfs",
+                                             "s3"]
                                 },
                                 "destination": {
                                     "type": "string"
@@ -398,7 +415,7 @@
                             "properties": {
                                 "type": {
                                     "type": "string",
-                                    "enum": ["swift", "database"]
+                                    "enum": ["swift", "s3", "database"]
                                 },
                                 "source": {
                                     "type": "string"
@@ -414,7 +431,7 @@
                             "properties": {
                                 "type": {
                                     "type": "string",
-                                    "enum": ["swift", "database"]
+                                    "enum": ["swift", "s3", "database"]
                                 },
                                 "source": {
                                     "type": "string"
diff --git a/sahara_tests/unit/scenario/test_base.py b/sahara_tests/unit/scenario/test_base.py
index 1d3f4c3e..a78ee43f 100644
--- a/sahara_tests/unit/scenario/test_base.py
+++ b/sahara_tests/unit/scenario/test_base.py
@@ -88,6 +88,9 @@
              'os_tenant': 'admin',
              'os_auth_url':
                  'http://localhost:5000/v2.0',
+             's3_accesskey': 'very_long_key',
+             's3_secretkey': 'very_long_secret',
+             's3_endpoint': 'https://localhost',
              'sahara_service_type':
                  'data-processing-local',
              'sahara_url':
@@ -143,7 +146,7 @@
                     "destination": "/user/hadoop/edp-output"
                 },
                 "main_lib": {
-                    "type": "swift",
+                    "type": "s3",
                     "source": "sahara_tests/scenario/defaults/"
                               "edp-examples/edp-pig/"
                               "top-todoers/example.pig"
@@ -350,12 +353,18 @@

     @mock.patch('saharaclient.api.base.ResourceManager._create',
                 return_value=FakeResponse(set_id='id_for_job_binaries'))
+    @mock.patch('sahara_tests.scenario.clients.BotoClient.upload_data',
+                return_value={})
+    @mock.patch('sahara_tests.scenario.clients.BotoClient.create_bucket',
+                return_value={'Location': 'foo'})
     @mock.patch('swiftclient.client.Connection.put_object',
                 return_value=None)
     @mock.patch('swiftclient.client.Connection.put_container',
                 return_value=None)
     def test__create_create_job_binaries(self, mock_swiftcontainer,
                                          mock_swiftobject,
+                                         mock_create_bucket,
+                                         mock_upload_bucket_data,
                                          mock_sahara_create):
         self.base_scenario._init_clients()
         self.assertEqual((['id_for_job_binaries'], []),
@@ -364,6 +373,8 @@

     @mock.patch('saharaclient.api.base.ResourceManager._create',
                 return_value=FakeResponse(set_id='id_for_job_binary'))
+    @mock.patch('sahara_tests.scenario.clients.BotoClient.create_bucket',
+                return_value={'Location': 'foo'})
     @mock.patch('swiftclient.client.Connection.put_object',
                 return_value=None)
     @mock.patch('swiftclient.client.Connection.put_container',
@@ -371,7 +382,7 @@
                 return_value=None)
     @mock.patch('saharaclient.client.Client', return_value=FakeSaharaClient())
     def test__create_create_job_binary(self, mock_saharaclient,
                                        mock_swiftcontainer, mock_swiftobject,
-                                       mock_sahara_create):
+                                       mock_create_bucket, mock_sahara_create):
         self.base_scenario._init_clients()
         self.assertEqual('id_for_job_binary',
                          self.base_scenario._create_job_binary(self.job.get(
@@ -431,7 +442,7 @@
         {
             "type": "Pig",
             "input_datasource": {
-                "type": "swift",
+                "type": "s3",
                 "source": "sahara_tests/scenario/defaults/edp-examples/"
                           "edp-pig/top-todoers/"
                           "data/input"
diff --git a/sahara_tests/unit/utils/__init__.py b/sahara_tests/unit/utils/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/sahara_tests/unit/utils/test_url.py b/sahara_tests/unit/utils/test_url.py
new file mode 100644
index 00000000..840f7ca2
--- /dev/null
+++ b/sahara_tests/unit/utils/test_url.py
@@ -0,0 +1,38 @@
+# Copyright 2018 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import testtools
+
+from sahara_tests.utils import url as utils_url
+
+
+class UrlUnitTest(testtools.TestCase):
+
+    def _check_clean_url(self, url, expected_clean_url):
+        """Check if the cleaned URL matches the expected one."""
+        clean_url = utils_url.url_schema_remover(url)
+        self.assertEqual(clean_url, expected_clean_url)
+
+    def test_clean_url_http(self):
+        self._check_clean_url('http://s3.amazonaws.com',
+                              's3.amazonaws.com')
+
+    def test_clean_url_https_longer(self):
+        self._check_clean_url('https://s3.amazonaws.com/foo',
+                              's3.amazonaws.com/foo')
+
+    def test_clean_url_file(self):
+        self._check_clean_url('file:///s3.amazonaws.com/bar',
+                              '/s3.amazonaws.com/bar')
diff --git a/sahara_tests/utils/url.py b/sahara_tests/utils/url.py
new file mode 100644
index 00000000..dbda2533
--- /dev/null
+++ b/sahara_tests/utils/url.py
@@ -0,0 +1,27 @@
+# Copyright 2018 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from six.moves.urllib import parse as urlparse
+
+
+def url_schema_remover(url):
+    """Return the same URL without the schema.
+    Example: prefix://host/path -> host/path
+    """
+    parsed = urlparse.urlsplit(url)
+    cleaned = urlparse.urlunsplit(('',) + parsed[1:])
+    if cleaned.startswith('//'):
+        cleaned = cleaned[2:]
+    return cleaned