Scenario test runner: support for S3 testing
- Use botocore to support operations on S3 entities (objects and buckets).
- Enable the S3 API in the RadosGW job and prepare the job code to set up
  the settings required for S3 testing (even if it may not be possible to
  use them on -infra right now).
- Add the default templates for S3 settings, so that they can be used when
  the "s3" feature is requested.
- Add an S3 job to the templates for all plugins and versions that support
  S3 in Rocky and master, tagged with the "s3" feature so they are not
  executed by default.

Story: 2004701
Task: 28725
Change-Id: Ie3da4d5ad604115e90b41fab9856107684b3a9d0
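For readers skimming the diff: the S3 support below is a thin wrapper around
botocore. A minimal sketch of the calls the new BotoClient relies on — the
endpoint, bucket name, and key values here are placeholders, not part of the
change:

    from botocore import session as botocore_session

    # botocore session, pointed at a RadosGW/S3-compatible endpoint
    sess = botocore_session.get_session()
    s3 = sess.create_client(
        's3',
        endpoint_url='http://radosgw.example.org:8080',  # placeholder endpoint
        aws_access_key_id='ACCESS',       # placeholder EC2-style access key
        aws_secret_access_key='SECRET')   # placeholder secret key

    # The scenario runner only needs these four operations:
    s3.create_bucket(Bucket='sahara-tests-demo')
    s3.put_object(Bucket='sahara-tests-demo', Key='sample.txt', Body=b'data')
    s3.delete_object(Bucket='sahara-tests-demo', Key='sample.txt')
    s3.delete_bucket(Bucket='sahara-tests-demo')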
parent 7e39b7b05d
commit 5e33889cee
@@ -1,3 +1,3 @@
 [DEFAULT]
-test_path=sahara_tests/unit/scenario
+test_path=sahara_tests/unit
 group_regex=([^\.]+\.)+
@@ -238,6 +238,7 @@
         s-container: false
         s-object: false
         s-proxy: false
+      sahara_enable_s3: True
 
 - job:
     name: sahara-tests-scenario-multinode-spark
@@ -0,0 +1,6 @@
+credentials:
+  s3_accesskey: ${s3_accesskey}
+  s3_secretkey: ${s3_secretkey}
+  s3_endpoint: ${s3_endpoint}
+  s3_endpoint_ssl: ${s3_endpoint_ssl}
+  s3_bucket_path: ${s3_bucket_path}
@@ -0,0 +1,25 @@
+edp_jobs_flow:
+  spark_wordcount_s3:
+    - type: Spark
+      input_datasource:
+        type: s3
+        source: edp-examples/edp-spark/sample_input.txt
+      output_datasource:
+        type: swift
+        destination: edp-output
+      main_lib:
+        type: swift
+        source: edp-examples/edp-spark/spark-wordcount.jar
+      configs:
+        edp.java.main_class: sahara.edp.spark.SparkWordCount
+        edp.spark.adapt_for_swift: true
+        fs.swift.service.sahara.username: ${os_username}
+        fs.swift.service.sahara.password: ${os_password}
+        fs.s3a.access.key: ${s3_accesskey}
+        fs.s3a.secret.key: ${s3_secretkey}
+        fs.s3a.endpoint: ${s3_endpoint}
+        fs.s3a.connection.ssl.enabled: ${s3_endpoint_ssl}
+        fs.s3a.path.style.access: ${s3_bucket_path}
+      args:
+        - '{input_datasource}'
+        - '{output_datasource}'
@@ -32,6 +32,7 @@ clusters:
        node_group: worker
        size: 1
    scenario:
      - run_jobs
      - scale
    edp_jobs_flow:
      - spark_pi
@@ -23,7 +23,10 @@ clusters:
       - operation: add
         node_group: cdh-worker
         size: 1
-    edp_jobs_flow: test_flow
+    edp_jobs_flow:
+      - test_flow
+      - name: test_manila
+        features: manila
 
 edp_jobs_flow:
   test_flow:
@@ -71,9 +74,8 @@ edp_jobs_flow:
       configs:
         edp.streaming.mapper: /bin/cat
         edp.streaming.reducer: /usr/bin/wc
+  test_manila:
     - type: Pig
-      features:
-        - manila
       input_datasource:
         type: manila
         source: etc/edp-examples/edp-pig/top-todoers/data/input
@@ -0,0 +1,8 @@
+---
+features:
+  - |
+    sahara-scenario now supports testing the S3 API for job binaries
+    and data sources, a feature introduced in Rocky.
+    The code can be enabled using the "s3" feature, and
+    various templates now run an S3-based job too when
+    the feature is enabled from the command line.
@@ -5,6 +5,7 @@
 pbr>=1.6 # Apache-2.0
 
 Mako>=0.4.0 # MIT
+botocore>=1.5.1 # Apache-2.0
 fixtures>=3.0.0 # Apache-2.0/BSD
 jsonschema!=2.5.0,<3.0.0,>=2.0.0 # MIT
 keystoneauth1>=2.1.0 # Apache-2.0
@@ -4,3 +4,4 @@ sahara_cloud_demo: 'devstack-admin'
 sahara_scenario_conf: '{{ ansible_user_dir }}/template_vars.ini'
 sahara_scenario_test_template: 'fake.yaml.mako'
 sahara_scenario_tox_env: 'venv'
+sahara_enable_s3: False
@@ -4,11 +4,18 @@
     tox -e {{ sahara_scenario_tox_env }} --sitepackages -- sahara-scenario --verbose -V {{ sahara_scenario_conf }} \
       etc/scenario/gate/credentials.yaml.mako \
       etc/scenario/gate/edp.yaml.mako \
+      {% if sahara_enable_s3 -%}
+      etc/scenario/gate/credentials_s3.yaml.mako \
+      etc/scenario/gate/edp_s3.yaml.mako \
+      {% endif -%}
       etc/scenario/gate/{{ sahara_scenario_test_template }} \
       --os-cloud {{ sahara_cloud_demo }} \
       {% if sahara_scenario_use_api_v2|default(False) -%}
       --v2 \
-      {% endif %}
+      {% endif -%}
+      {% if sahara_enable_s3 -%}
+      --feature s3 \
+      {% endif -%}
       | tee scenario.log
     if grep -qE '(FAILED|ERROR:)' scenario.log; then
       exit 1
@@ -7,6 +7,7 @@ sahara_image_name: 'xenial-server'
 sahara_image_user: 'ubuntu'
 sahara_image_format: 'qcow2'
 sahara_scenario_conf: '{{ ansible_user_dir }}/template_vars.ini'
+sahara_enable_s3: False
 sahara_network_type: 'neutron'
 private_network_name: 'private'
 public_network_name: 'public'
@@ -28,6 +28,10 @@
     openstack --os-cloud {{ sahara_cloud_demo }} dataprocessing image tags add {{ sahara_image_name }} --tags \
       {{ sahara_plugin_version }} {{ sahara_plugin }}
 
+- name: S3 configuration
+  import_tasks: setup_s3.yaml
+  when: sahara_enable_s3
+
 # we cannot use os_nova_flavor as well (see above)
 - name: create the required flavor(s)
   command: |
@@ -0,0 +1,29 @@
+- name: create the S3 credentials
+  shell: |
+    ACCESS_KEY=$(openstack --os-cloud {{ sahara_cloud_demo }} ec2 credentials list -f value -c Access | head -n1)
+    if [ -z "${ACCESS_KEY}" ]; then
+        ACCESS_KEY=$(openstack --os-cloud {{ sahara_cloud_demo }} ec2 credentials create -f value -c access)
+    fi
+    SECRET_KEY=$(openstack --os-cloud {{ sahara_cloud_demo }} ec2 credentials list -f value -c Secret | head -n1)
+    printf "${ACCESS_KEY}\n${SECRET_KEY}"
+  register: sahara_s3_credentials_out
+
+# This task should not be needed normally and the endpoint should be discovered by default
+- name: find the swift endpoint for S3
+  shell: |
+    ENDPOINT=$(openstack --os-cloud {{ sahara_cloud_admin }} endpoint list --service swift --interface public -c URL -f value)
+    ENDPOINT_PREFIX=$(echo "${ENDPOINT}" | awk -F'://' '{print $1}')
+    ENDPOINT_SSL="False"
+    if [ "${ENDPOINT_PREFIX}" = "https" ]; then
+        ENDPOINT_SSL="True"
+    fi
+    printf "${ENDPOINT}\n${ENDPOINT_SSL}"
+  register: sahara_s3_endpoint_out
+
+- name: save the S3 access data
+  set_fact:
+    sahara_s3_accesskey: "{{ sahara_s3_credentials_out.stdout_lines[0] }}"
+    sahara_s3_secretkey: "{{ sahara_s3_credentials_out.stdout_lines[1] }}"
+    sahara_s3_endpoint: "{{ sahara_s3_endpoint_out.stdout_lines[0] }}"
+    sahara_s3_endpoint_ssl: "{{ sahara_s3_endpoint_out.stdout_lines[1] }}"
+    sahara_s3_bucket_path: True
@@ -7,3 +7,10 @@ ci_flavor_id: {{ sahara_flavor_small }}
 cluster_name: testc
 is_transient: {{ sahara_cluster_transient }}
 auto_security_group: {{ sahara_auto_security_group }}
+{% if sahara_enable_s3 -%}
+s3_accesskey: {{ sahara_s3_accesskey }}
+s3_secretkey: {{ sahara_s3_secretkey }}
+s3_endpoint: {{ sahara_s3_endpoint }}
+s3_endpoint_ssl: {{ sahara_s3_endpoint_ssl }}
+s3_bucket_path: {{ sahara_s3_bucket_path }}
+{% endif -%}
@@ -33,6 +33,7 @@ from sahara_tests.scenario import clients
 from sahara_tests.scenario import timeouts
 from sahara_tests.scenario import utils
 from sahara_tests.utils import crypto as ssh
+from sahara_tests.utils import url as utils_url
 
 logger = logging.getLogger('swiftclient')
 logger.setLevel(logging.CRITICAL)
@@ -150,6 +151,13 @@ class BaseTestCase(base.BaseTestCase):
             cacert=self.credentials.get('ssl_cert'),
             tenant_name=tenant_name)
         self.glance = clients.GlanceClient(session=session)
+        # boto is not an OpenStack client, but we can handle it as well
+        self.boto = None
+        if self.credentials.get("s3_endpoint", None):
+            self.boto = clients.BotoClient(
+                endpoint=self.credentials["s3_endpoint"],
+                accesskey=self.credentials["s3_accesskey"],
+                secretkey=self.credentials["s3_secretkey"])
 
     def create_cluster(self):
         self.cluster_id = self.sahara.get_cluster_id(
@@ -239,17 +247,34 @@ class BaseTestCase(base.BaseTestCase):
 
     def _create_datasources(self, job):
         def create(ds, name):
+            credential_vars = {}
             source = ds.get('source', None)
            destination = None if source else utils.rand_name(
                 ds['destination'])
             if ds['type'] == 'swift':
                 url = self._create_swift_data(source, destination)
-            if ds['type'] == 'hdfs':
+                credential_vars = {
+                    'credential_user': self.credentials['os_username'],
+                    'credential_pass': self.credentials['os_password']
+                }
+            elif ds['type'] == 's3':
+                url = self._create_s3_data(source, destination)
+                credential_vars = {
+                    's3_credentials': {
+                        'accesskey': self.credentials['s3_accesskey'],
+                        'secretkey': self.credentials['s3_secretkey'],
+                        'endpoint': utils_url.url_schema_remover(
+                            self.credentials['s3_endpoint']),
+                        'ssl': self.credentials['s3_endpoint_ssl'],
+                        'bucket_in_path': self.credentials['s3_bucket_path']
+                    }
+                }
+            elif ds['type'] == 'hdfs':
                 url = self._create_dfs_data(source, destination,
                                             self.testcase.get('hdfs_username',
                                                               'hadoop'),
                                             ds['type'])
-            if ds['type'] == 'maprfs':
+            elif ds['type'] == 'maprfs':
                 url = self._create_dfs_data(source, destination,
                                             ds.get('maprfs_username', 'mapr'),
                                             ds['type'])
@@ -257,8 +282,7 @@ class BaseTestCase(base.BaseTestCase):
                 name=utils.rand_name(name),
                 description='',
                 data_source_type=ds['type'], url=url,
-                credential_user=self.credentials['os_username'],
-                credential_pass=self.credentials['os_password'])
+                **credential_vars)
 
         input_id, output_id = None, None
         if job.get('input_datasource'):
@@ -289,7 +313,12 @@ class BaseTestCase(base.BaseTestCase):
             url = self._create_swift_data(job_binary['source'])
             extra['user'] = self.credentials['os_username']
             extra['password'] = self.credentials['os_password']
-        if job_binary['type'] == 'database':
+        elif job_binary['type'] == 's3':
+            url = self._create_s3_data(job_binary['source'])
+            extra['accesskey'] = self.credentials['s3_accesskey']
+            extra['secretkey'] = self.credentials['s3_secretkey']
+            extra['endpoint'] = self.credentials['s3_endpoint']
+        elif job_binary['type'] == 'database':
             url = self._create_internal_db_data(job_binary['source'])
 
         job_binary_name = '%s-%s' % (
@@ -383,6 +412,15 @@ class BaseTestCase(base.BaseTestCase):
 
         return 'swift://%s.sahara/%s' % (container, path)
 
+    def _create_s3_data(self, source=None, destination=None):
+        bucket = self._get_s3_bucket()
+        path = utils.rand_name(destination if destination else 'test')
+        data = self._read_source_file(source)
+
+        self.__upload_to_bucket(bucket, path, data)
+
+        return 's3://%s/%s' % (bucket, path)
+
     def _create_dfs_data(self, source, destination, hdfs_username, fs):
 
         def to_hex_present(string):
@@ -430,6 +468,12 @@ class BaseTestCase(base.BaseTestCase):
                 utils.rand_name('sahara-tests'))
         return self.__swift_container
 
+    def _get_s3_bucket(self):
+        if not getattr(self, '__s3_bucket', None):
+            self.__s3_bucket = self.__create_bucket(
+                utils.rand_name('sahara-tests'))
+        return self.__s3_bucket
+
     @track_result("Cluster scaling", False)
     def check_scale(self):
         scale_ops = []
@@ -792,6 +836,19 @@ class BaseTestCase(base.BaseTestCase):
             self.addCleanup(self.swift.delete_object, container_name,
                             object_name)
 
+    def __create_bucket(self, bucket_name):
+        self.boto.create_bucket(bucket_name)
+        if not self.testcase['retain_resources']:
+            self.addCleanup(self.boto.delete_bucket, bucket_name)
+        return bucket_name
+
+    def __upload_to_bucket(self, bucket_name, object_name, data=None):
+        if data:
+            self.boto.upload_data(bucket_name, object_name, data)
+        if not self.testcase['retain_resources']:
+            self.addCleanup(self.boto.delete_object, bucket_name,
+                            object_name)
+
     def __create_keypair(self):
         key = utils.rand_name('scenario_key')
         self.nova.nova_client.keypairs.create(key,
@@ -17,6 +17,8 @@ from __future__ import print_function
 import time
 
 import fixtures
+from botocore.exceptions import ClientError as BotoClientError
+from botocore import session as botocore_session
 from glanceclient import client as glance_client
 from keystoneauth1.identity import v3 as identity_v3
 from keystoneauth1 import session
@@ -363,6 +365,76 @@ class SwiftClient(Client):
         return False
 
 
+class BotoClient(Client):
+    def __init__(self, *args, **kwargs):
+        sess = botocore_session.get_session()
+        self.boto_client = sess.create_client(
+            's3',
+            endpoint_url=kwargs['endpoint'],
+            aws_access_key_id=kwargs['accesskey'],
+            aws_secret_access_key=kwargs['secretkey']
+        )
+
+    def create_bucket(self, bucket_name):
+        return self.boto_client.create_bucket(Bucket=bucket_name)
+
+    def _delete_and_check_bucket(self, bucket_name):
+        bucket_deleted = False
+        operation_parameters = {'Bucket': bucket_name}
+        try:
+            # While list_objects_v2 is the suggested function, pagination
+            # does not seem to work properly with RadosGW when it's used.
+            paginator = self.boto_client.get_paginator('list_objects')
+            page_iterator = paginator.paginate(**operation_parameters)
+            for page in page_iterator:
+                if 'Contents' not in page:
+                    continue
+                for item in page['Contents']:
+                    self.boto_client.delete_object(Bucket=bucket_name,
+                                                   Key=item['Key'])
+            self.boto_client.delete_bucket(Bucket=bucket_name)
+        except BotoClientError as ex:
+            error = ex.response.get('Error', {})
+            # without the conversion the value is a tuple
+            error_code = '%s' % (error.get('Code', ''))
+            if error_code == 'NoSuchBucket':
+                bucket_deleted = True
+        return bucket_deleted
+
+    def delete_bucket(self, bucket_name):
+        return self.delete_resource(
+            self._delete_and_check_bucket, bucket_name)
+
+    def upload_data(self, bucket_name, object_name, data):
+        return self.boto_client.put_object(
+            Bucket=bucket_name,
+            Key=object_name,
+            Body=data)
+
+    def _delete_and_check_object(self, bucket_name, object_name):
+        self.boto_client.delete_object(Bucket=bucket_name, Key=object_name)
+        object_deleted = False
+        try:
+            self.boto_client.head_object(Bucket=bucket_name, Key=object_name)
+        except BotoClientError as ex:
+            error = ex.response.get('Error', {})
+            # without the conversion the value is a tuple
+            error_code = '%s' % (error.get('Code', ''))
+            if error_code == '404':
+                object_deleted = True
+        return object_deleted
+
+    def delete_object(self, bucket_name, object_name):
+        return self.delete_resource(
+            self._delete_and_check_object,
+            bucket_name, object_name)
+
+    def is_resource_deleted(self, method, *args, **kwargs):
+        # Exceptions are handled directly inside the call to "method",
+        # because they are not the same for objects and buckets.
+        return method(*args, **kwargs)
+
+
 class GlanceClient(Client):
     def __init__(self, *args, **kwargs):
         self.glance_client = glance_client.Client('2', *args, **kwargs)
@@ -66,4 +66,7 @@ clusters:
         size: 1
     edp_jobs_flow:
       - java_job
+      - name: mapreduce_job_s3
+        features:
+          - s3
       - spark_pi
@@ -66,4 +66,7 @@ clusters:
         size: 1
     edp_jobs_flow:
       - java_job
+      - name: mapreduce_job_s3
+        features:
+          - s3
       - spark_pi
@@ -66,4 +66,7 @@ clusters:
         size: 1
     edp_jobs_flow:
       - java_job
+      - name: mapreduce_job_s3
+        features:
+          - s3
       - spark_pi
@@ -89,6 +89,9 @@ clusters:
     edp_jobs_flow:
       - pig_job
       - mapreduce_job
+      - name: mapreduce_job_s3
+        features:
+          - s3
       - mapreduce_streaming_job
       - java_job
       - spark_wordcount
@@ -89,6 +89,9 @@ clusters:
     edp_jobs_flow:
       - pig_job
       - mapreduce_job
+      - name: mapreduce_job_s3
+        features:
+          - s3
       - mapreduce_streaming_job
       - java_job
       - spark_wordcount
@@ -89,6 +89,9 @@ clusters:
     edp_jobs_flow:
       - pig_job
       - mapreduce_job
+      - name: mapreduce_job_s3
+        features:
+          - s3
       - mapreduce_streaming_job
       - java_job
       - spark_wordcount
@@ -0,0 +1,6 @@
+credentials:
+  s3_accesskey: ${s3_accesskey}
+  s3_secretkey: ${s3_secretkey}
+  s3_endpoint: ${s3_endpoint}
+  s3_endpoint_ssl: ${s3_endpoint_ssl}
+  s3_bucket_path: ${s3_bucket_path}
@@ -0,0 +1,49 @@
+edp_jobs_flow:
+  mapreduce_job_s3:
+    - type: MapReduce
+      input_datasource:
+        type: s3
+        source: edp-examples/edp-pig/trim-spaces/data/input
+      output_datasource:
+        type: s3
+        destination: /user/hadoop/edp-output
+      additional_libs:
+        - type: s3
+          source: edp-examples/edp-mapreduce/edp-mapreduce.jar
+      configs:
+        mapred.map.class: org.apache.oozie.example.SampleMapper
+        mapred.reduce.class: org.apache.oozie.example.SampleReducer
+        mapreduce.framework.name: yarn
+        fs.swift.service.sahara.username: ${os_username}
+        fs.swift.service.sahara.password: ${os_password}
+        fs.s3a.access.key: ${s3_accesskey}
+        fs.s3a.secret.key: ${s3_secretkey}
+        fs.s3a.endpoint: ${s3_endpoint}
+        fs.s3a.connection.ssl.enabled: ${s3_endpoint_ssl}
+        fs.s3a.path.style.access: ${s3_bucket_path}
+  spark_wordcount_s3:
+    - type: Spark
+      input_datasource:
+        type: s3
+        source: edp-examples/edp-spark/sample_input.txt
+      output_datasource:
+        type: s3
+        destination: edp-output
+      main_lib:
+        type: s3
+        source: edp-examples/edp-spark/spark-wordcount.jar
+      configs:
+        edp.java.main_class: sahara.edp.spark.SparkWordCount
+        edp.spark.adapt_for_swift: true
+        fs.swift.service.sahara.username: ${os_username}
+        fs.swift.service.sahara.password: ${os_password}
+        fs.swift.service.sahara.username: ${os_username}
+        fs.swift.service.sahara.password: ${os_password}
+        fs.s3a.access.key: ${s3_accesskey}
+        fs.s3a.secret.key: ${s3_secretkey}
+        fs.s3a.endpoint: ${s3_endpoint}
+        fs.s3a.connection.ssl.enabled: ${s3_endpoint_ssl}
+        fs.s3a.path.style.access: ${s3_bucket_path}
+      args:
+        - '{input_datasource}'
+        - '{output_datasource}'
@@ -54,3 +54,6 @@ clusters:
         size: 1
     edp_jobs_flow:
       - mapr
+      - name: mapreduce_job_s3
+        features:
+          - s3
@@ -66,4 +66,7 @@ clusters:
         size: 1
     edp_jobs_flow:
       - java_job
+      - name: mapreduce_job_s3
+        features:
+          - s3
       - spark_pi
@@ -66,4 +66,7 @@ clusters:
         size: 1
     edp_jobs_flow:
       - java_job
+      - name: mapreduce_job_s3
+        features:
+          - s3
       - spark_pi
@@ -66,4 +66,7 @@ clusters:
         size: 1
     edp_jobs_flow:
       - java_job
+      - name: mapreduce_job_s3
+        features:
+          - s3
       - spark_pi
@@ -89,6 +89,9 @@ clusters:
     edp_jobs_flow:
       - pig_job
       - mapreduce_job
+      - name: mapreduce_job_s3
+        features:
+          - s3
       - mapreduce_streaming_job
       - java_job
       - spark_wordcount
@@ -89,6 +89,9 @@ clusters:
     edp_jobs_flow:
       - pig_job
       - mapreduce_job
+      - name: mapreduce_job_s3
+        features:
+          - s3
       - mapreduce_streaming_job
       - java_job
       - spark_wordcount
@@ -89,6 +89,9 @@ clusters:
     edp_jobs_flow:
       - pig_job
       - mapreduce_job
+      - name: mapreduce_job_s3
+        features:
+          - s3
       - mapreduce_streaming_job
       - java_job
       - spark_wordcount
@@ -54,3 +54,6 @@ clusters:
         size: 1
     edp_jobs_flow:
       - mapr
+      - name: mapreduce_job_s3
+        features:
+          - s3
@@ -125,7 +125,7 @@ def get_base_parser():
     parser.add_argument('--report', default=False, action='store_true',
                         help='Write results of test to file')
     parser.add_argument('--feature', '-f', default=[],
-                        nargs='?', help='Set of features to enable')
+                        action='append', help='Set of features to enable')
     parser.add_argument('--count', default=1, nargs='?', type=valid_count,
                         help='Specify count of runs current cases.')
     parser.add_argument('--v2', '-2', default=False, action='store_true',
@@ -60,6 +60,21 @@ SCHEMA = {
                 "type": "string",
                 "minLength": 1
             },
+            "s3_accesskey": {
+                "type": "string",
+            },
+            "s3_secretkey": {
+                "type": "string",
+            },
+            "s3_endpoint": {
+                "type": "string",
+            },
+            "s3_endpoint_ssl": {
+                "type": "boolean",
+            },
+            "s3_bucket_path": {
+                "type": "boolean",
+            },
         },
         "additionalProperties": False
     },
@@ -370,7 +385,8 @@ SCHEMA = {
                 "properties": {
                     "type": {
                         "type": "string",
-                        "enum": ["swift", "hdfs", "maprfs"]
+                        "enum": ["swift", "hdfs", "maprfs",
+                                 "s3"]
                     },
                     "source": {
                         "type": "string"
@@ -384,7 +400,8 @@ SCHEMA = {
                 "properties": {
                     "type": {
                         "type": "string",
-                        "enum": ["swift", "hdfs", "maprfs"]
+                        "enum": ["swift", "hdfs", "maprfs",
+                                 "s3"]
                     },
                     "destination": {
                         "type": "string"
@@ -398,7 +415,7 @@ SCHEMA = {
                 "properties": {
                     "type": {
                         "type": "string",
-                        "enum": ["swift", "database"]
+                        "enum": ["swift", "s3", "database"]
                     },
                     "source": {
                         "type": "string"
@@ -414,7 +431,7 @@ SCHEMA = {
                 "properties": {
                     "type": {
                         "type": "string",
-                        "enum": ["swift", "database"]
+                        "enum": ["swift", "s3", "database"]
                     },
                     "source": {
                         "type": "string"
@@ -88,6 +88,9 @@ class TestBase(testtools.TestCase):
                              'os_tenant': 'admin',
                              'os_auth_url':
                                  'http://localhost:5000/v2.0',
+                             's3_accesskey': 'very_long_key',
+                             's3_secretkey': 'very_long_secret',
+                             's3_endpoint': 'https://localhost',
                              'sahara_service_type':
                                  'data-processing-local',
                              'sahara_url':
@@ -143,7 +146,7 @@ class TestBase(testtools.TestCase):
                     "destination": "/user/hadoop/edp-output"
                 },
                 "main_lib": {
-                    "type": "swift",
+                    "type": "s3",
                     "source": "sahara_tests/scenario/defaults/"
                               "edp-examples/edp-pig/"
                               "top-todoers/example.pig"
@@ -350,12 +353,18 @@ class TestBase(testtools.TestCase):
 
     @mock.patch('saharaclient.api.base.ResourceManager._create',
                 return_value=FakeResponse(set_id='id_for_job_binaries'))
+    @mock.patch('sahara_tests.scenario.clients.BotoClient.upload_data',
+                return_value={})
+    @mock.patch('sahara_tests.scenario.clients.BotoClient.create_bucket',
+                return_value={'Location': 'foo'})
     @mock.patch('swiftclient.client.Connection.put_object',
                 return_value=None)
     @mock.patch('swiftclient.client.Connection.put_container',
                 return_value=None)
     def test__create_create_job_binaries(self, mock_swiftcontainer,
                                          mock_swiftobject,
+                                         mock_create_bucket,
+                                         mock_upload_bucket_data,
                                          mock_sahara_create):
         self.base_scenario._init_clients()
         self.assertEqual((['id_for_job_binaries'], []),
@@ -364,6 +373,8 @@ class TestBase(testtools.TestCase):
 
     @mock.patch('saharaclient.api.base.ResourceManager._create',
                 return_value=FakeResponse(set_id='id_for_job_binary'))
+    @mock.patch('sahara_tests.scenario.clients.BotoClient.create_bucket',
+                return_value={'Location': 'foo'})
     @mock.patch('swiftclient.client.Connection.put_object',
                 return_value=None)
     @mock.patch('swiftclient.client.Connection.put_container',
@@ -371,7 +382,7 @@ class TestBase(testtools.TestCase):
     @mock.patch('saharaclient.client.Client', return_value=FakeSaharaClient())
     def test__create_create_job_binary(self, mock_saharaclient,
                                        mock_swiftcontainer, mock_swiftobject,
-                                       mock_sahara_create):
+                                       mock_create_bucket, mock_sahara_create):
         self.base_scenario._init_clients()
         self.assertEqual('id_for_job_binary',
                          self.base_scenario._create_job_binary(self.job.get(
@@ -431,7 +442,7 @@ class TestBase(testtools.TestCase):
                 {
                     "type": "Pig",
                     "input_datasource": {
-                        "type": "swift",
+                        "type": "s3",
                         "source": "sahara_tests/scenario/defaults/edp-examples/"
                                   "edp-pig/top-todoers/"
                                   "data/input"
@@ -0,0 +1,38 @@
+# Copyright 2018 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import testtools
+
+from sahara_tests.utils import url as utils_url
+
+
+class UrlUnitTest(testtools.TestCase):
+
+    def _check_clean_url(self, url, expected_clean_url):
+        """Check if the cleaned URL matches the expected one."""
+        clean_url = utils_url.url_schema_remover(url)
+        self.assertEqual(clean_url, expected_clean_url)
+
+    def test_clean_url_http(self):
+        self._check_clean_url('https://s3.amazonaws.com',
+                              's3.amazonaws.com')
+
+    def test_clean_url_https_longer(self):
+        self._check_clean_url('https://s3.amazonaws.com/foo',
+                              's3.amazonaws.com/foo')
+
+    def test_clean_url_file(self):
+        self._check_clean_url('file:///s3.amazonaws.com/bar',
+                              '/s3.amazonaws.com/bar')
@@ -0,0 +1,27 @@
+# Copyright 2018 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from six.moves.urllib import parse as urlparse
+
+
+def url_schema_remover(url):
+    """ Return the same URL without the schema.
+    Example: prefix://host/path -> host/path
+    """
+    parsed = urlparse.urlsplit(url)
+    cleaned = urlparse.urlunsplit((('',) + parsed[1:]))
+    if cleaned.startswith('//'):
+        cleaned = cleaned[2:]
+    return cleaned