Scenario test runner: support for S3 testing

- use botocore to support operations on S3 entities
  (objects and buckets); a minimal botocore sketch
  follows this commit message.
- enable the S3 API in the RadosGW job and prepare
  the job code to set up the settings required for S3
  testing (even if it may not be possible to use them
  on -infra right now).
- add the default templates for S3 settings, so that
  they can be used when the "s3" feature is requested.
- add an S3 job to the templates for all plugins and
  versions that support S3 in rocky and master.
  Tag them with the "s3" feature, so they are not
  executed by default.

Story: 2004701
Task: 28725
Change-Id: Ie3da4d5ad604115e90b41fab9856107684b3a9d0
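
For readers who do not know botocore: the new BotoClient wrapper added to sahara_tests/scenario/clients.py further down in this diff builds on calls of roughly the following shape. This is only a sketch; the endpoint and keys are placeholder values, not taken from the change.

from botocore import session as botocore_session

sess = botocore_session.get_session()
s3 = sess.create_client(
    's3',
    endpoint_url='https://radosgw.example.org',  # placeholder endpoint
    aws_access_key_id='ACCESS_KEY',              # placeholder credential
    aws_secret_access_key='SECRET_KEY')          # placeholder credential

# create a bucket, upload an object, then clean both up -- the same
# operations the scenario runner later registers via addCleanup()
s3.create_bucket(Bucket='sahara-tests-demo')
s3.put_object(Bucket='sahara-tests-demo', Key='sample.txt', Body=b'data')
s3.delete_object(Bucket='sahara-tests-demo', Key='sample.txt')
s3.delete_bucket(Bucket='sahara-tests-demo')
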
Luigi Toscano 2019-01-18 17:38:13 +01:00
parent 7e39b7b05d
commit 5e33889cee
38 changed files with 430 additions and 18 deletions

View File

@ -1,3 +1,3 @@
[DEFAULT]
test_path=sahara_tests/unit/scenario
test_path=sahara_tests/unit
group_regex=([^\.]+\.)+

View File

@ -238,6 +238,7 @@
s-container: false
s-object: false
s-proxy: false
sahara_enable_s3: True
- job:
name: sahara-tests-scenario-multinode-spark

View File

@ -0,0 +1,6 @@
credentials:
s3_accesskey: ${s3_accesskey}
s3_secretkey: ${s3_secretkey}
s3_endpoint: ${s3_endpoint}
s3_endpoint_ssl: ${s3_endpoint_ssl}
s3_bucket_path: ${s3_bucket_path}

View File

@ -0,0 +1,25 @@
edp_jobs_flow:
spark_wordcount_s3:
- type: Spark
input_datasource:
type: s3
source: edp-examples/edp-spark/sample_input.txt
output_datasource:
type: swift
destination: edp-output
main_lib:
type: swift
source: edp-examples/edp-spark/spark-wordcount.jar
configs:
edp.java.main_class: sahara.edp.spark.SparkWordCount
edp.spark.adapt_for_swift: true
fs.swift.service.sahara.username: ${os_username}
fs.swift.service.sahara.password: ${os_password}
fs.s3a.access.key: ${s3_accesskey}
fs.s3a.secret.key: ${s3_secretkey}
fs.s3a.endpoint: ${s3_endpoint}
fs.s3a.connection.ssl.enabled: ${s3_endpoint_ssl}
fs.s3a.path.style.access: ${s3_bucket_path}
args:
- '{input_datasource}'
- '{output_datasource}'

View File

@ -32,6 +32,7 @@ clusters:
node_group: worker
size: 1
scenario:
- run_jobs
- scale
edp_jobs_flow:
- spark_pi

View File

@ -23,7 +23,10 @@ clusters:
- operation: add
node_group: cdh-worker
size: 1
edp_jobs_flow: test_flow
edp_jobs_flow:
- test_flow
- name: test_manila
features: manila
edp_jobs_flow:
test_flow:
@ -71,9 +74,8 @@ edp_jobs_flow:
configs:
edp.streaming.mapper: /bin/cat
edp.streaming.reducer: /usr/bin/wc
test_manila:
- type: Pig
features:
- manila
input_datasource:
type: manila
source: etc/edp-examples/edp-pig/top-todoers/data/input

View File

@ -0,0 +1,8 @@
---
features:
- |
sahara-scenario now supports testing the S3 API for job binaries
and data sources, a feature introduced in Rocky.
The support can be enabled using the "s3" feature and
various templates now run an S3-based job too when
the feature is enabled from the command line.
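
As an illustration of how the "s3" tag gates execution: a job entry carrying a features list is skipped unless every listed feature was enabled on the command line (e.g. by passing --feature s3 along with the S3 credential and EDP templates). The snippet below only mimics that behaviour and is not the runner's actual filtering code:

enabled_features = ['s3']    # collected from repeated --feature flags
job = {'name': 'mapreduce_job_s3', 'features': ['s3']}
# a tagged job is kept only when all of its features are enabled
run_it = all(f in enabled_features for f in job.get('features', []))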

View File

@ -5,6 +5,7 @@
pbr>=1.6 # Apache-2.0
Mako>=0.4.0 # MIT
botocore>=1.5.1 # Apache-2.0
fixtures>=3.0.0 # Apache-2.0/BSD
jsonschema!=2.5.0,<3.0.0,>=2.0.0 # MIT
keystoneauth1>=2.1.0 # Apache-2.0

View File

@ -4,3 +4,4 @@ sahara_cloud_demo: 'devstack-admin'
sahara_scenario_conf: '{{ ansible_user_dir }}/template_vars.ini'
sahara_scenario_test_template: 'fake.yaml.mako'
sahara_scenario_tox_env: 'venv'
sahara_enable_s3: False

View File

@ -4,11 +4,18 @@
tox -e {{ sahara_scenario_tox_env }} --sitepackages -- sahara-scenario --verbose -V {{ sahara_scenario_conf }} \
etc/scenario/gate/credentials.yaml.mako \
etc/scenario/gate/edp.yaml.mako \
{% if sahara_enable_s3 -%}
etc/scenario/gate/credentials_s3.yaml.mako \
etc/scenario/gate/edp_s3.yaml.mako \
{% endif -%}
etc/scenario/gate/{{ sahara_scenario_test_template }} \
--os-cloud {{ sahara_cloud_demo }} \
{% if sahara_scenario_use_api_v2|default(False) -%}
--v2 \
{% endif %}
{% endif -%}
{% if sahara_enable_s3 -%}
--feature s3 \
{% endif -%}
| tee scenario.log
if grep -qE '(FAILED|ERROR:)' scenario.log; then
exit 1

View File

@ -7,6 +7,7 @@ sahara_image_name: 'xenial-server'
sahara_image_user: 'ubuntu'
sahara_image_format: 'qcow2'
sahara_scenario_conf: '{{ ansible_user_dir }}/template_vars.ini'
sahara_enable_s3: False
sahara_network_type: 'neutron'
private_network_name: 'private'
public_network_name: 'public'

View File

@ -28,6 +28,10 @@
openstack --os-cloud {{ sahara_cloud_demo }} dataprocessing image tags add {{ sahara_image_name }} --tags \
{{ sahara_plugin_version }} {{ sahara_plugin }}
- name: S3 configuration
import_tasks: setup_s3.yaml
when: sahara_enable_s3
# we cannot use os_nova_flavor either (see above)
- name: create the required flavor(s)
command: |

View File

@ -0,0 +1,29 @@
- name: create the S3 credentials
shell: |
ACCESS_KEY=$(openstack --os-cloud {{ sahara_cloud_demo }} ec2 credentials list -f value -c Access | head -n1)
if [ -z "${ACCESS_KEY}" ]; then
ACCESS_KEY=$(openstack --os-cloud {{ sahara_cloud_demo }} ec2 credentials create -f value -c access)
fi
SECRET_KEY=$(openstack --os-cloud {{ sahara_cloud_demo }} ec2 credentials list -f value -c Secret | head -n1)
printf "${ACCESS_KEY}\n${SECRET_KEY}"
register: sahara_s3_credentials_out
# This task should not be needed normally and the endpoint should be discovered by default
- name: find the swift endpoint for S3
shell: |
ENDPOINT=$(openstack --os-cloud {{ sahara_cloud_admin }} endpoint list --service swift --interface public -c URL -f value)
ENDPOINT_PREFIX=$(echo "${ENDPOINT}" | awk -F'://' '{print $1}')
ENDPOINT_SSL="False"
if [ "${ENDPOINT_PREFIX}" = "https" ]; then
ENDPOINT_SSL="True"
fi
printf "${ENDPOINT}\n${ENDPOINT_SSL}"
register: sahara_s3_endpoint_out
- name: save the S3 access data
set_fact:
sahara_s3_accesskey: "{{ sahara_s3_credentials_out.stdout_lines[0] }}"
sahara_s3_secretkey: "{{ sahara_s3_credentials_out.stdout_lines[1] }}"
sahara_s3_endpoint: "{{ sahara_s3_endpoint_out.stdout_lines[0] }}"
sahara_s3_endpoint_ssl: "{{ sahara_s3_endpoint_out.stdout_lines[1] }}"
sahara_s3_bucket_path: True

View File

@ -7,3 +7,10 @@ ci_flavor_id: {{ sahara_flavor_small }}
cluster_name: testc
is_transient: {{ sahara_cluster_transient }}
auto_security_group: {{ sahara_auto_security_group }}
{% if sahara_enable_s3 -%}
s3_accesskey: {{ sahara_s3_accesskey }}
s3_secretkey: {{ sahara_s3_secretkey }}
s3_endpoint: {{ sahara_s3_endpoint }}
s3_endpoint_ssl: {{ sahara_s3_endpoint_ssl }}
s3_bucket_path: {{ sahara_s3_bucket_path }}
{% endif -%}

View File

@ -33,6 +33,7 @@ from sahara_tests.scenario import clients
from sahara_tests.scenario import timeouts
from sahara_tests.scenario import utils
from sahara_tests.utils import crypto as ssh
from sahara_tests.utils import url as utils_url
logger = logging.getLogger('swiftclient')
logger.setLevel(logging.CRITICAL)
@ -150,6 +151,13 @@ class BaseTestCase(base.BaseTestCase):
cacert=self.credentials.get('ssl_cert'),
tenant_name=tenant_name)
self.glance = clients.GlanceClient(session=session)
# boto is not an OpenStack client, but we can handle it as well
self.boto = None
if self.credentials.get("s3_endpoint", None):
self.boto = clients.BotoClient(
endpoint=self.credentials["s3_endpoint"],
accesskey=self.credentials["s3_accesskey"],
secretkey=self.credentials["s3_secretkey"])
def create_cluster(self):
self.cluster_id = self.sahara.get_cluster_id(
@ -239,17 +247,34 @@ class BaseTestCase(base.BaseTestCase):
def _create_datasources(self, job):
def create(ds, name):
credential_vars = {}
source = ds.get('source', None)
destination = None if source else utils.rand_name(
ds['destination'])
if ds['type'] == 'swift':
url = self._create_swift_data(source, destination)
if ds['type'] == 'hdfs':
credential_vars = {
'credential_user': self.credentials['os_username'],
'credential_pass': self.credentials['os_password']
}
elif ds['type'] == 's3':
url = self._create_s3_data(source, destination)
credential_vars = {
's3_credentials': {
'accesskey': self.credentials['s3_accesskey'],
'secretkey': self.credentials['s3_secretkey'],
'endpoint': utils_url.url_schema_remover(
self.credentials['s3_endpoint']),
'ssl': self.credentials['s3_endpoint_ssl'],
'bucket_in_path': self.credentials['s3_bucket_path']
}
}
elif ds['type'] == 'hdfs':
url = self._create_dfs_data(source, destination,
self.testcase.get('hdfs_username',
'hadoop'),
ds['type'])
if ds['type'] == 'maprfs':
elif ds['type'] == 'maprfs':
url = self._create_dfs_data(source, destination,
ds.get('maprfs_username', 'mapr'),
ds['type'])
@ -257,8 +282,7 @@ class BaseTestCase(base.BaseTestCase):
name=utils.rand_name(name),
description='',
data_source_type=ds['type'], url=url,
credential_user=self.credentials['os_username'],
credential_pass=self.credentials['os_password'])
**credential_vars)
input_id, output_id = None, None
if job.get('input_datasource'):
@ -289,7 +313,12 @@ class BaseTestCase(base.BaseTestCase):
url = self._create_swift_data(job_binary['source'])
extra['user'] = self.credentials['os_username']
extra['password'] = self.credentials['os_password']
if job_binary['type'] == 'database':
elif job_binary['type'] == 's3':
url = self._create_s3_data(job_binary['source'])
extra['accesskey'] = self.credentials['s3_accesskey']
extra['secretkey'] = self.credentials['s3_secretkey']
extra['endpoint'] = self.credentials['s3_endpoint']
elif job_binary['type'] == 'database':
url = self._create_internal_db_data(job_binary['source'])
job_binary_name = '%s-%s' % (
@ -383,6 +412,15 @@ class BaseTestCase(base.BaseTestCase):
return 'swift://%s.sahara/%s' % (container, path)
def _create_s3_data(self, source=None, destination=None):
bucket = self._get_s3_bucket()
path = utils.rand_name(destination if destination else 'test')
data = self._read_source_file(source)
self.__upload_to_bucket(bucket, path, data)
return 's3://%s/%s' % (bucket, path)
def _create_dfs_data(self, source, destination, hdfs_username, fs):
def to_hex_present(string):
@ -430,6 +468,12 @@ class BaseTestCase(base.BaseTestCase):
utils.rand_name('sahara-tests'))
return self.__swift_container
def _get_s3_bucket(self):
if not getattr(self, '__s3_bucket', None):
self.__s3_bucket = self.__create_bucket(
utils.rand_name('sahara-tests'))
return self.__s3_bucket
@track_result("Cluster scaling", False)
def check_scale(self):
scale_ops = []
@ -792,6 +836,19 @@ class BaseTestCase(base.BaseTestCase):
self.addCleanup(self.swift.delete_object, container_name,
object_name)
def __create_bucket(self, bucket_name):
self.boto.create_bucket(bucket_name)
if not self.testcase['retain_resources']:
self.addCleanup(self.boto.delete_bucket, bucket_name)
return bucket_name
def __upload_to_bucket(self, bucket_name, object_name, data=None):
if data:
self.boto.upload_data(bucket_name, object_name, data)
if not self.testcase['retain_resources']:
self.addCleanup(self.boto.delete_object, bucket_name,
object_name)
def __create_keypair(self):
key = utils.rand_name('scenario_key')
self.nova.nova_client.keypairs.create(key,

View File

@ -17,6 +17,8 @@ from __future__ import print_function
import time
import fixtures
from botocore.exceptions import ClientError as BotoClientError
from botocore import session as botocore_session
from glanceclient import client as glance_client
from keystoneauth1.identity import v3 as identity_v3
from keystoneauth1 import session
@ -363,6 +365,76 @@ class SwiftClient(Client):
return False
class BotoClient(Client):
def __init__(self, *args, **kwargs):
sess = botocore_session.get_session()
self.boto_client = sess.create_client(
's3',
endpoint_url=kwargs['endpoint'],
aws_access_key_id=kwargs['accesskey'],
aws_secret_access_key=kwargs['secretkey']
)
def create_bucket(self, bucket_name):
return self.boto_client.create_bucket(Bucket=bucket_name)
def _delete_and_check_bucket(self, bucket_name):
bucket_deleted = False
operation_parameters = {'Bucket': bucket_name}
try:
# While list_objects_v2 is the suggested function, pagination
# does not seem to work properly with RadosGW when it's used.
paginator = self.boto_client.get_paginator('list_objects')
page_iterator = paginator.paginate(**operation_parameters)
for page in page_iterator:
if 'Contents' not in page:
continue
for item in page['Contents']:
self.boto_client.delete_object(Bucket=bucket_name,
Key=item['Key'])
self.boto_client.delete_bucket(Bucket=bucket_name)
except BotoClientError as ex:
error = ex.response.get('Error', {})
# without the conversion the value is a tuple
error_code = '%s' % (error.get('Code', ''))
if error_code == 'NoSuchBucket':
bucket_deleted = True
return bucket_deleted
def delete_bucket(self, bucket_name):
return self.delete_resource(
self._delete_and_check_bucket, bucket_name)
def upload_data(self, bucket_name, object_name, data):
return self.boto_client.put_object(
Bucket=bucket_name,
Key=object_name,
Body=data)
def _delete_and_check_object(self, bucket_name, object_name):
self.boto_client.delete_object(Bucket=bucket_name, Key=object_name)
object_deleted = False
try:
self.boto_client.head_object(Bucket=bucket_name, Key=object_name)
except BotoClientError as ex:
error = ex.response.get('Error', {})
# without the conversion the value is a tuple
error_code = '%s' % (error.get('Code', ''))
if error_code == '404':
object_deleted = True
return object_deleted
def delete_object(self, bucket_name, object_name):
return self.delete_resource(
self._delete_and_check_object,
bucket_name, object_name)
def is_resource_deleted(self, method, *args, **kwargs):
# Exceptions are handled directly inside the call to "method",
# because they are not the same for objects and buckets.
return method(*args, **kwargs)
class GlanceClient(Client):
def __init__(self, *args, **kwargs):
self.glance_client = glance_client.Client('2', *args, **kwargs)

View File

@ -66,4 +66,7 @@ clusters:
size: 1
edp_jobs_flow:
- java_job
- name: mapreduce_job_s3
features:
- s3
- spark_pi

View File

@ -66,4 +66,7 @@ clusters:
size: 1
edp_jobs_flow:
- java_job
- name: mapreduce_job_s3
features:
- s3
- spark_pi

View File

@ -66,4 +66,7 @@ clusters:
size: 1
edp_jobs_flow:
- java_job
- name: mapreduce_job_s3
features:
- s3
- spark_pi

View File

@ -89,6 +89,9 @@ clusters:
edp_jobs_flow:
- pig_job
- mapreduce_job
- name: mapreduce_job_s3
features:
- s3
- mapreduce_streaming_job
- java_job
- spark_wordcount

View File

@ -89,6 +89,9 @@ clusters:
edp_jobs_flow:
- pig_job
- mapreduce_job
- name: mapreduce_job_s3
features:
- s3
- mapreduce_streaming_job
- java_job
- spark_wordcount

View File

@ -89,6 +89,9 @@ clusters:
edp_jobs_flow:
- pig_job
- mapreduce_job
- name: mapreduce_job_s3
features:
- s3
- mapreduce_streaming_job
- java_job
- spark_wordcount

View File

@ -0,0 +1,6 @@
credentials:
s3_accesskey: ${s3_accesskey}
s3_secretkey: ${s3_secretkey}
s3_endpoint: ${s3_endpoint}
s3_endpoint_ssl: ${s3_endpoint_ssl}
s3_bucket_path: ${s3_bucket_path}

View File

@ -0,0 +1,49 @@
edp_jobs_flow:
mapreduce_job_s3:
- type: MapReduce
input_datasource:
type: s3
source: edp-examples/edp-pig/trim-spaces/data/input
output_datasource:
type: s3
destination: /user/hadoop/edp-output
additional_libs:
- type: s3
source: edp-examples/edp-mapreduce/edp-mapreduce.jar
configs:
mapred.map.class: org.apache.oozie.example.SampleMapper
mapred.reduce.class: org.apache.oozie.example.SampleReducer
mapreduce.framework.name: yarn
fs.swift.service.sahara.username: ${os_username}
fs.swift.service.sahara.password: ${os_password}
fs.s3a.access.key: ${s3_accesskey}
fs.s3a.secret.key: ${s3_secretkey}
fs.s3a.endpoint: ${s3_endpoint}
fs.s3a.connection.ssl.enabled: ${s3_endpoint_ssl}
fs.s3a.path.style.access: ${s3_bucket_path}
spark_wordcount_s3:
- type: Spark
input_datasource:
type: s3
source: edp-examples/edp-spark/sample_input.txt
output_datasource:
type: s3
destination: edp-output
main_lib:
type: s3
source: edp-examples/edp-spark/spark-wordcount.jar
configs:
edp.java.main_class: sahara.edp.spark.SparkWordCount
edp.spark.adapt_for_swift: true
fs.swift.service.sahara.username: ${os_username}
fs.swift.service.sahara.password: ${os_password}
fs.s3a.access.key: ${s3_accesskey}
fs.s3a.secret.key: ${s3_secretkey}
fs.s3a.endpoint: ${s3_endpoint}
fs.s3a.connection.ssl.enabled: ${s3_endpoint_ssl}
fs.s3a.path.style.access: ${s3_bucket_path}
args:
- '{input_datasource}'
- '{output_datasource}'

View File

@ -54,3 +54,6 @@ clusters:
size: 1
edp_jobs_flow:
- mapr
- name: mapreduce_job_s3
features:
- s3

View File

@ -66,4 +66,7 @@ clusters:
size: 1
edp_jobs_flow:
- java_job
- name: mapreduce_job_s3
features:
- s3
- spark_pi

View File

@ -66,4 +66,7 @@ clusters:
size: 1
edp_jobs_flow:
- java_job
- name: mapreduce_job_s3
features:
- s3
- spark_pi

View File

@ -66,4 +66,7 @@ clusters:
size: 1
edp_jobs_flow:
- java_job
- name: mapreduce_job_s3
features:
- s3
- spark_pi

View File

@ -89,6 +89,9 @@ clusters:
edp_jobs_flow:
- pig_job
- mapreduce_job
- name: mapreduce_job_s3
features:
- s3
- mapreduce_streaming_job
- java_job
- spark_wordcount

View File

@ -89,6 +89,9 @@ clusters:
edp_jobs_flow:
- pig_job
- mapreduce_job
- name: mapreduce_job_s3
features:
- s3
- mapreduce_streaming_job
- java_job
- spark_wordcount

View File

@ -89,6 +89,9 @@ clusters:
edp_jobs_flow:
- pig_job
- mapreduce_job
- name: mapreduce_job_s3
features:
- s3
- mapreduce_streaming_job
- java_job
- spark_wordcount

View File

@ -54,3 +54,6 @@ clusters:
size: 1
edp_jobs_flow:
- mapr
- name: mapreduce_job_s3
features:
- s3

View File

@ -125,7 +125,7 @@ def get_base_parser():
parser.add_argument('--report', default=False, action='store_true',
help='Write results of test to file')
parser.add_argument('--feature', '-f', default=[],
nargs='?', help='Set of features to enable')
action='append', help='Set of features to enable')
parser.add_argument('--count', default=1, nargs='?', type=valid_count,
help='Specify count of runs current cases.')
parser.add_argument('--v2', '-2', default=False, action='store_true',
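
For context on the change above: with action='append', argparse collects every --feature occurrence into a list instead of keeping a single optional value, so several features can be enabled in one run. A minimal standalone illustration, not part of the commit:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--feature', '-f', default=[], action='append',
                    help='Set of features to enable')
# each --feature occurrence appends to the list
args = parser.parse_args(['--feature', 's3', '--feature', 'manila'])
print(args.feature)  # ['s3', 'manila']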

View File

@ -60,6 +60,21 @@ SCHEMA = {
"type": "string",
"minLength": 1
},
"s3_accesskey": {
"type": "string",
},
"s3_secretkey": {
"type": "string",
},
"s3_endpoint": {
"type": "string",
},
"s3_endpoint_ssl": {
"type": "boolean",
},
"s3_bucket_path": {
"type": "boolean",
},
},
"additionalProperties": False
},
@ -370,7 +385,8 @@ SCHEMA = {
"properties": {
"type": {
"type": "string",
"enum": ["swift", "hdfs", "maprfs"]
"enum": ["swift", "hdfs", "maprfs",
"s3"]
},
"source": {
"type": "string"
@ -384,7 +400,8 @@ SCHEMA = {
"properties": {
"type": {
"type": "string",
"enum": ["swift", "hdfs", "maprfs"]
"enum": ["swift", "hdfs", "maprfs",
"s3"]
},
"destination": {
"type": "string"
@ -398,7 +415,7 @@ SCHEMA = {
"properties": {
"type": {
"type": "string",
"enum": ["swift", "database"]
"enum": ["swift", "s3", "database"]
},
"source": {
"type": "string"
@ -414,7 +431,7 @@ SCHEMA = {
"properties": {
"type": {
"type": "string",
"enum": ["swift", "database"]
"enum": ["swift", "s3", "database"]
},
"source": {
"type": "string"

View File

@ -88,6 +88,9 @@ class TestBase(testtools.TestCase):
'os_tenant': 'admin',
'os_auth_url':
'http://localhost:5000/v2.0',
's3_accesskey': 'very_long_key',
's3_secretkey': 'very_long_secret',
's3_endpoint': 'https://localhost',
'sahara_service_type':
'data-processing-local',
'sahara_url':
@ -143,7 +146,7 @@ class TestBase(testtools.TestCase):
"destination": "/user/hadoop/edp-output"
},
"main_lib": {
"type": "swift",
"type": "s3",
"source": "sahara_tests/scenario/defaults/"
"edp-examples/edp-pig/"
"top-todoers/example.pig"
@ -350,12 +353,18 @@ class TestBase(testtools.TestCase):
@mock.patch('saharaclient.api.base.ResourceManager._create',
return_value=FakeResponse(set_id='id_for_job_binaries'))
@mock.patch('sahara_tests.scenario.clients.BotoClient.upload_data',
return_value={})
@mock.patch('sahara_tests.scenario.clients.BotoClient.create_bucket',
return_value={'Location': 'foo'})
@mock.patch('swiftclient.client.Connection.put_object',
return_value=None)
@mock.patch('swiftclient.client.Connection.put_container',
return_value=None)
def test__create_create_job_binaries(self, mock_swiftcontainer,
mock_swiftobject,
mock_create_bucket,
mock_upload_bucket_data,
mock_sahara_create):
self.base_scenario._init_clients()
self.assertEqual((['id_for_job_binaries'], []),
@ -364,6 +373,8 @@ class TestBase(testtools.TestCase):
@mock.patch('saharaclient.api.base.ResourceManager._create',
return_value=FakeResponse(set_id='id_for_job_binary'))
@mock.patch('sahara_tests.scenario.clients.BotoClient.create_bucket',
return_value={'Location': 'foo'})
@mock.patch('swiftclient.client.Connection.put_object',
return_value=None)
@mock.patch('swiftclient.client.Connection.put_container',
@ -371,7 +382,7 @@ class TestBase(testtools.TestCase):
@mock.patch('saharaclient.client.Client', return_value=FakeSaharaClient())
def test__create_create_job_binary(self, mock_saharaclient,
mock_swiftcontainer, mock_swiftobject,
mock_sahara_create):
mock_create_bucket, mock_sahara_create):
self.base_scenario._init_clients()
self.assertEqual('id_for_job_binary',
self.base_scenario._create_job_binary(self.job.get(
@ -431,7 +442,7 @@ class TestBase(testtools.TestCase):
{
"type": "Pig",
"input_datasource": {
"type": "swift",
"type": "s3",
"source": "sahara_tests/scenario/defaults/edp-examples/"
"edp-pig/top-todoers/"
"data/input"

View File

@ -0,0 +1,38 @@
# Copyright 2018 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import testtools
from sahara_tests.utils import url as utils_url
class UrlUnitTest(testtools.TestCase):
def _check_clean_url(self, url, expected_clean_url):
"""Check if the cleaned URL matches the expected one."""
clean_url = utils_url.url_schema_remover(url)
self.assertEqual(clean_url, expected_clean_url)
def test_clean_url_http(self):
self._check_clean_url('https://s3.amazonaws.com',
's3.amazonaws.com')
def test_clean_url_https_longer(self):
self._check_clean_url('https://s3.amazonaws.com/foo',
's3.amazonaws.com/foo')
def test_clean_url_file(self):
self._check_clean_url('file:///s3.amazonaws.com/bar',
'/s3.amazonaws.com/bar')

sahara_tests/utils/url.py
View File

@ -0,0 +1,27 @@
# Copyright 2018 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from six.moves.urllib import parse as urlparse
def url_schema_remover(url):
""" Return the same URL without the schema.
Example: prefix://host/path -> host/path
"""
parsed = urlparse.urlsplit(url)
cleaned = urlparse.urlunsplit((('',) + parsed[1:]))
if cleaned.startswith('//'):
cleaned = cleaned[2:]
return cleaned