Merge "Scenario test runner: support for S3 testing"
This commit is contained in:
commit
101b519fd3
|
@ -1,3 +1,3 @@
|
|||
[DEFAULT]
|
||||
test_path=sahara_tests/unit/scenario
|
||||
test_path=sahara_tests/unit
|
||||
group_regex=([^\.]+\.)+
|
||||
|
|
|
@ -229,6 +229,7 @@
|
|||
s-container: false
|
||||
s-object: false
|
||||
s-proxy: false
|
||||
sahara_enable_s3: True
|
||||
|
||||
- job:
|
||||
name: sahara-tests-scenario-multinode-spark
|
||||
|
|
|
@ -0,0 +1,6 @@
|
|||
credentials:
|
||||
s3_accesskey: ${s3_accesskey}
|
||||
s3_secretkey: ${s3_secretkey}
|
||||
s3_endpoint: ${s3_endpoint}
|
||||
s3_endpoint_ssl: ${s3_endpoint_ssl}
|
||||
s3_bucket_path: ${s3_bucket_path}
|
|
@ -0,0 +1,25 @@
|
|||
edp_jobs_flow:
|
||||
spark_wordcount_s3:
|
||||
- type: Spark
|
||||
input_datasource:
|
||||
type: s3
|
||||
source: edp-examples/edp-spark/sample_input.txt
|
||||
output_datasource:
|
||||
type: swift
|
||||
destination: edp-output
|
||||
main_lib:
|
||||
type: swift
|
||||
source: edp-examples/edp-spark/spark-wordcount.jar
|
||||
configs:
|
||||
edp.java.main_class: sahara.edp.spark.SparkWordCount
|
||||
edp.spark.adapt_for_swift: true
|
||||
fs.swift.service.sahara.username: ${os_username}
|
||||
fs.swift.service.sahara.password: ${os_password}
|
||||
fs.s3a.access.key: ${s3_accesskey}
|
||||
fs.s3a.secret.key: ${s3_secretkey}
|
||||
fs.s3a.endpoint: ${s3_endpoint}
|
||||
fs.s3a.connection.ssl.enabled: ${s3_endpoint_ssl}
|
||||
fs.s3a.path.style.access: ${s3_bucket_path}
|
||||
args:
|
||||
- '{input_datasource}'
|
||||
- '{output_datasource}'
|
|
@ -32,6 +32,7 @@ clusters:
|
|||
node_group: worker
|
||||
size: 1
|
||||
scenario:
|
||||
- run_jobs
|
||||
- scale
|
||||
edp_jobs_flow:
|
||||
- spark_pi
|
||||
|
|
|
@ -23,7 +23,10 @@ clusters:
|
|||
- operation: add
|
||||
node_group: cdh-worker
|
||||
size: 1
|
||||
edp_jobs_flow: test_flow
|
||||
edp_jobs_flow:
|
||||
- test_flow
|
||||
- name: test_manila
|
||||
features: manila
|
||||
|
||||
edp_jobs_flow:
|
||||
test_flow:
|
||||
|
@ -71,9 +74,8 @@ edp_jobs_flow:
|
|||
configs:
|
||||
edp.streaming.mapper: /bin/cat
|
||||
edp.streaming.reducer: /usr/bin/wc
|
||||
test_manila:
|
||||
- type: Pig
|
||||
features:
|
||||
- manila
|
||||
input_datasource:
|
||||
type: manila
|
||||
source: etc/edp-examples/edp-pig/top-todoers/data/input
|
||||
|
|
|
@ -0,0 +1,8 @@
|
|||
---
|
||||
features:
|
||||
- |
|
||||
sahara-scenario now supports testing the S3 API for job binaries
|
||||
and data sources, a feature introduced in Rocky.
|
||||
The code can be enabled using the "s3" feature and
|
||||
various templates now runs an S3-based job too when
|
||||
the feature is enabled from the command line.
|
|
@ -5,6 +5,7 @@
|
|||
pbr>=1.6 # Apache-2.0
|
||||
|
||||
Mako>=0.4.0 # MIT
|
||||
botocore>=1.5.1 # Apache-2.0
|
||||
fixtures>=3.0.0 # Apache-2.0/BSD
|
||||
jsonschema!=2.5.0,<3.0.0,>=2.0.0 # MIT
|
||||
keystoneauth1>=2.1.0 # Apache-2.0
|
||||
|
|
|
@ -4,3 +4,4 @@ sahara_cloud_demo: 'devstack-admin'
|
|||
sahara_scenario_conf: '{{ ansible_user_dir }}/template_vars.ini'
|
||||
sahara_scenario_test_template: 'fake.yaml.mako'
|
||||
sahara_scenario_tox_env: 'venv'
|
||||
sahara_enable_s3: False
|
||||
|
|
|
@ -4,11 +4,18 @@
|
|||
tox -e {{ sahara_scenario_tox_env }} --sitepackages -- sahara-scenario --verbose -V {{ sahara_scenario_conf }} \
|
||||
etc/scenario/gate/credentials.yaml.mako \
|
||||
etc/scenario/gate/edp.yaml.mako \
|
||||
{% if sahara_enable_s3 -%}
|
||||
etc/scenario/gate/credentials_s3.yaml.mako \
|
||||
etc/scenario/gate/edp_s3.yaml.mako \
|
||||
{% endif -%}
|
||||
etc/scenario/gate/{{ sahara_scenario_test_template }} \
|
||||
--os-cloud {{ sahara_cloud_demo }} \
|
||||
{% if sahara_scenario_use_api_v2|default(False) -%}
|
||||
--v2 \
|
||||
{% endif %}
|
||||
{% endif -%}
|
||||
{% if sahara_enable_s3 -%}
|
||||
--feature s3 \
|
||||
{% endif -%}
|
||||
| tee scenario.log
|
||||
if grep -qE '(FAILED|ERROR:)' scenario.log; then
|
||||
exit 1
|
||||
|
|
|
@ -7,6 +7,7 @@ sahara_image_name: 'xenial-server'
|
|||
sahara_image_user: 'ubuntu'
|
||||
sahara_image_format: 'qcow2'
|
||||
sahara_scenario_conf: '{{ ansible_user_dir }}/template_vars.ini'
|
||||
sahara_enable_s3: False
|
||||
sahara_network_type: 'neutron'
|
||||
private_network_name: 'private'
|
||||
public_network_name: 'public'
|
||||
|
|
|
@ -28,6 +28,10 @@
|
|||
openstack --os-cloud {{ sahara_cloud_demo }} dataprocessing image tags add {{ sahara_image_name }} --tags \
|
||||
{{ sahara_plugin_version }} {{ sahara_plugin }}
|
||||
|
||||
- name: S3 configuration
|
||||
import_tasks: setup_s3.yaml
|
||||
when: sahara_enable_s3
|
||||
|
||||
# we cannot use os_nova_flavor as well (see above)
|
||||
- name: create the required flavor(s)
|
||||
command: |
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
- name: create the S3 credentials
|
||||
shell: |
|
||||
ACCESS_KEY=$(openstack --os-cloud {{ sahara_cloud_demo }} ec2 credentials list -f value -c Access | head -n1)
|
||||
if [ -z "${ACCESS_KEY}" ]; then
|
||||
ACCESS_KEY=$(openstack --os-cloud {{ sahara_cloud_demo }} ec2 credentials create -f value -c access)
|
||||
fi
|
||||
SECRET_KEY=$(openstack --os-cloud {{ sahara_cloud_demo }} ec2 credentials list -f value -c Secret | head -n1)
|
||||
printf "${ACCESS_KEY}\n${SECRET_KEY}"
|
||||
register: sahara_s3_credentials_out
|
||||
|
||||
# This task should not be needed normally and the endpoint should be discovered by default
|
||||
- name: find the swift endpoint for S3
|
||||
shell: |
|
||||
ENDPOINT=$(openstack --os-cloud {{ sahara_cloud_admin }} endpoint list --service swift --interface public -c URL -f value)
|
||||
ENDPOINT_PREFIX=$(awk -F'//' '{print $1}')
|
||||
ENDPOINT_SSL="False"
|
||||
if [ "${ENDPOINT_PREFIX}" = "https" ]; then
|
||||
ENDPOINT_SSL="True"
|
||||
fi
|
||||
printf "${ENDPOINT}\n${ENDPOINT_SSL}"
|
||||
register: sahara_s3_endpoint_out
|
||||
|
||||
- name: save the S3 access data
|
||||
set_fact:
|
||||
sahara_s3_accesskey: "{{ sahara_s3_credentials_out.stdout_lines[0] }}"
|
||||
sahara_s3_secretkey: "{{ sahara_s3_credentials_out.stdout_lines[1] }}"
|
||||
sahara_s3_endpoint: "{{ sahara_s3_endpoint_out.stdout_lines[0] }}"
|
||||
sahara_s3_endpoint_ssl: "{{ sahara_s3_endpoint_out.stdout_lines[1] }}"
|
||||
sahara_s3_bucket_path: True
|
|
@ -7,3 +7,10 @@ ci_flavor_id: {{ sahara_flavor_small }}
|
|||
cluster_name: testc
|
||||
is_transient: {{ sahara_cluster_transient }}
|
||||
auto_security_group: {{ sahara_auto_security_group }}
|
||||
{% if sahara_enable_s3 -%}
|
||||
s3_accesskey: {{ sahara_s3_accesskey }}
|
||||
s3_secretkey: {{ sahara_s3_secretkey }}
|
||||
s3_endpoint: {{ sahara_s3_endpoint }}
|
||||
s3_endpoint_ssl: {{ sahara_s3_endpoint_ssl }}
|
||||
s3_bucket_path: {{ sahara_s3_bucket_path }}
|
||||
{% endif -%}
|
||||
|
|
|
@ -33,6 +33,7 @@ from sahara_tests.scenario import clients
|
|||
from sahara_tests.scenario import timeouts
|
||||
from sahara_tests.scenario import utils
|
||||
from sahara_tests.utils import crypto as ssh
|
||||
from sahara_tests.utils import url as utils_url
|
||||
|
||||
logger = logging.getLogger('swiftclient')
|
||||
logger.setLevel(logging.CRITICAL)
|
||||
|
@ -150,6 +151,13 @@ class BaseTestCase(base.BaseTestCase):
|
|||
cacert=self.credentials.get('ssl_cert'),
|
||||
tenant_name=tenant_name)
|
||||
self.glance = clients.GlanceClient(session=session)
|
||||
# boto is not an OpenStack client, but we can handle it as well
|
||||
self.boto = None
|
||||
if self.credentials.get("s3_endpoint", None):
|
||||
self.boto = clients.BotoClient(
|
||||
endpoint=self.credentials["s3_endpoint"],
|
||||
accesskey=self.credentials["s3_accesskey"],
|
||||
secretkey=self.credentials["s3_secretkey"])
|
||||
|
||||
def create_cluster(self):
|
||||
self.cluster_id = self.sahara.get_cluster_id(
|
||||
|
@ -239,17 +247,34 @@ class BaseTestCase(base.BaseTestCase):
|
|||
|
||||
def _create_datasources(self, job):
|
||||
def create(ds, name):
|
||||
credential_vars = {}
|
||||
source = ds.get('source', None)
|
||||
destination = None if source else utils.rand_name(
|
||||
ds['destination'])
|
||||
if ds['type'] == 'swift':
|
||||
url = self._create_swift_data(source, destination)
|
||||
if ds['type'] == 'hdfs':
|
||||
credential_vars = {
|
||||
'credential_user': self.credentials['os_username'],
|
||||
'credential_pass': self.credentials['os_password']
|
||||
}
|
||||
elif ds['type'] == 's3':
|
||||
url = self._create_s3_data(source, destination)
|
||||
credential_vars = {
|
||||
's3_credentials': {
|
||||
'accesskey': self.credentials['s3_accesskey'],
|
||||
'secretkey': self.credentials['s3_secretkey'],
|
||||
'endpoint': utils_url.url_schema_remover(
|
||||
self.credentials['s3_endpoint']),
|
||||
'ssl': self.credentials['s3_endpoint_ssl'],
|
||||
'bucket_in_path': self.credentials['s3_bucket_path']
|
||||
}
|
||||
}
|
||||
elif ds['type'] == 'hdfs':
|
||||
url = self._create_dfs_data(source, destination,
|
||||
self.testcase.get('hdfs_username',
|
||||
'hadoop'),
|
||||
ds['type'])
|
||||
if ds['type'] == 'maprfs':
|
||||
elif ds['type'] == 'maprfs':
|
||||
url = self._create_dfs_data(source, destination,
|
||||
ds.get('maprfs_username', 'mapr'),
|
||||
ds['type'])
|
||||
|
@ -257,8 +282,7 @@ class BaseTestCase(base.BaseTestCase):
|
|||
name=utils.rand_name(name),
|
||||
description='',
|
||||
data_source_type=ds['type'], url=url,
|
||||
credential_user=self.credentials['os_username'],
|
||||
credential_pass=self.credentials['os_password'])
|
||||
**credential_vars)
|
||||
|
||||
input_id, output_id = None, None
|
||||
if job.get('input_datasource'):
|
||||
|
@ -289,7 +313,12 @@ class BaseTestCase(base.BaseTestCase):
|
|||
url = self._create_swift_data(job_binary['source'])
|
||||
extra['user'] = self.credentials['os_username']
|
||||
extra['password'] = self.credentials['os_password']
|
||||
if job_binary['type'] == 'database':
|
||||
elif job_binary['type'] == 's3':
|
||||
url = self._create_s3_data(job_binary['source'])
|
||||
extra['accesskey'] = self.credentials['s3_accesskey']
|
||||
extra['secretkey'] = self.credentials['s3_secretkey']
|
||||
extra['endpoint'] = self.credentials['s3_endpoint']
|
||||
elif job_binary['type'] == 'database':
|
||||
url = self._create_internal_db_data(job_binary['source'])
|
||||
|
||||
job_binary_name = '%s-%s' % (
|
||||
|
@ -383,6 +412,15 @@ class BaseTestCase(base.BaseTestCase):
|
|||
|
||||
return 'swift://%s.sahara/%s' % (container, path)
|
||||
|
||||
def _create_s3_data(self, source=None, destination=None):
|
||||
bucket = self._get_s3_bucket()
|
||||
path = utils.rand_name(destination if destination else 'test')
|
||||
data = self._read_source_file(source)
|
||||
|
||||
self.__upload_to_bucket(bucket, path, data)
|
||||
|
||||
return 's3://%s/%s' % (bucket, path)
|
||||
|
||||
def _create_dfs_data(self, source, destination, hdfs_username, fs):
|
||||
|
||||
def to_hex_present(string):
|
||||
|
@ -430,6 +468,12 @@ class BaseTestCase(base.BaseTestCase):
|
|||
utils.rand_name('sahara-tests'))
|
||||
return self.__swift_container
|
||||
|
||||
def _get_s3_bucket(self):
|
||||
if not getattr(self, '__s3_bucket', None):
|
||||
self.__s3_bucket = self.__create_bucket(
|
||||
utils.rand_name('sahara-tests'))
|
||||
return self.__s3_bucket
|
||||
|
||||
@track_result("Cluster scaling", False)
|
||||
def check_scale(self):
|
||||
scale_ops = []
|
||||
|
@ -792,6 +836,19 @@ class BaseTestCase(base.BaseTestCase):
|
|||
self.addCleanup(self.swift.delete_object, container_name,
|
||||
object_name)
|
||||
|
||||
def __create_bucket(self, bucket_name):
|
||||
self.boto.create_bucket(bucket_name)
|
||||
if not self.testcase['retain_resources']:
|
||||
self.addCleanup(self.boto.delete_bucket, bucket_name)
|
||||
return bucket_name
|
||||
|
||||
def __upload_to_bucket(self, bucket_name, object_name, data=None):
|
||||
if data:
|
||||
self.boto.upload_data(bucket_name, object_name, data)
|
||||
if not self.testcase['retain_resources']:
|
||||
self.addCleanup(self.boto.delete_object, bucket_name,
|
||||
object_name)
|
||||
|
||||
def __create_keypair(self):
|
||||
key = utils.rand_name('scenario_key')
|
||||
self.nova.nova_client.keypairs.create(key,
|
||||
|
|
|
@ -17,6 +17,8 @@ from __future__ import print_function
|
|||
import time
|
||||
|
||||
import fixtures
|
||||
from botocore.exceptions import ClientError as BotoClientError
|
||||
from botocore import session as botocore_session
|
||||
from glanceclient import client as glance_client
|
||||
from keystoneauth1.identity import v3 as identity_v3
|
||||
from keystoneauth1 import session
|
||||
|
@ -363,6 +365,76 @@ class SwiftClient(Client):
|
|||
return False
|
||||
|
||||
|
||||
class BotoClient(Client):
|
||||
def __init__(self, *args, **kwargs):
|
||||
sess = botocore_session.get_session()
|
||||
self.boto_client = sess.create_client(
|
||||
's3',
|
||||
endpoint_url=kwargs['endpoint'],
|
||||
aws_access_key_id=kwargs['accesskey'],
|
||||
aws_secret_access_key=kwargs['secretkey']
|
||||
)
|
||||
|
||||
def create_bucket(self, bucket_name):
|
||||
return self.boto_client.create_bucket(Bucket=bucket_name)
|
||||
|
||||
def _delete_and_check_bucket(self, bucket_name):
|
||||
bucket_deleted = False
|
||||
operation_parameters = {'Bucket': bucket_name}
|
||||
try:
|
||||
# While list_objects_v2 is the suggested function, pagination
|
||||
# does not seems to work properly with RadosGW when it's used.
|
||||
paginator = self.boto_client.get_paginator('list_objects')
|
||||
page_iterator = paginator.paginate(**operation_parameters)
|
||||
for page in page_iterator:
|
||||
if 'Contents' not in page:
|
||||
continue
|
||||
for item in page['Contents']:
|
||||
self.boto_client.delete_object(Bucket=bucket_name,
|
||||
Key=item['Key'])
|
||||
self.boto_client.delete_bucket(Bucket=bucket_name)
|
||||
except BotoClientError as ex:
|
||||
error = ex.response.get('Error', {})
|
||||
# without the conversion the value is a tuple
|
||||
error_code = '%s' % (error.get('Code', ''))
|
||||
if error_code == 'NoSuchBucket':
|
||||
bucket_deleted = True
|
||||
return bucket_deleted
|
||||
|
||||
def delete_bucket(self, bucket_name):
|
||||
return self.delete_resource(
|
||||
self._delete_and_check_bucket, bucket_name)
|
||||
|
||||
def upload_data(self, bucket_name, object_name, data):
|
||||
return self.boto_client.put_object(
|
||||
Bucket=bucket_name,
|
||||
Key=object_name,
|
||||
Body=data)
|
||||
|
||||
def _delete_and_check_object(self, bucket_name, object_name):
|
||||
self.boto_client.delete_object(Bucket=bucket_name, Key=object_name)
|
||||
object_deleted = False
|
||||
try:
|
||||
self.boto_client.head_object(Bucket=bucket_name, Key=object_name)
|
||||
except BotoClientError as ex:
|
||||
error = ex.response.get('Error', {})
|
||||
# without the conversion the value is a tuple
|
||||
error_code = '%s' % (error.get('Code', ''))
|
||||
if error_code == '404':
|
||||
object_deleted = True
|
||||
return object_deleted
|
||||
|
||||
def delete_object(self, bucket_name, object_name):
|
||||
return self.delete_resource(
|
||||
self._delete_and_check_object,
|
||||
bucket_name, object_name)
|
||||
|
||||
def is_resource_deleted(self, method, *args, **kwargs):
|
||||
# Exceptions are handled directly inside the call to "method",
|
||||
# because they are not the same for objects and buckets.
|
||||
return method(*args, **kwargs)
|
||||
|
||||
|
||||
class GlanceClient(Client):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.glance_client = glance_client.Client('2', *args, **kwargs)
|
||||
|
|
|
@ -66,4 +66,7 @@ clusters:
|
|||
size: 1
|
||||
edp_jobs_flow:
|
||||
- java_job
|
||||
- name: mapreduce_job_s3
|
||||
features:
|
||||
- s3
|
||||
- spark_pi
|
||||
|
|
|
@ -66,4 +66,7 @@ clusters:
|
|||
size: 1
|
||||
edp_jobs_flow:
|
||||
- java_job
|
||||
- name: mapreduce_job_s3
|
||||
features:
|
||||
- s3
|
||||
- spark_pi
|
||||
|
|
|
@ -66,4 +66,7 @@ clusters:
|
|||
size: 1
|
||||
edp_jobs_flow:
|
||||
- java_job
|
||||
- name: mapreduce_job_s3
|
||||
features:
|
||||
- s3
|
||||
- spark_pi
|
||||
|
|
|
@ -89,6 +89,9 @@ clusters:
|
|||
edp_jobs_flow:
|
||||
- pig_job
|
||||
- mapreduce_job
|
||||
- name: mapreduce_job_s3
|
||||
features:
|
||||
- s3
|
||||
- mapreduce_streaming_job
|
||||
- java_job
|
||||
- spark_wordcount
|
||||
|
|
|
@ -89,6 +89,9 @@ clusters:
|
|||
edp_jobs_flow:
|
||||
- pig_job
|
||||
- mapreduce_job
|
||||
- name: mapreduce_job_s3
|
||||
features:
|
||||
- s3
|
||||
- mapreduce_streaming_job
|
||||
- java_job
|
||||
- spark_wordcount
|
||||
|
|
|
@ -89,6 +89,9 @@ clusters:
|
|||
edp_jobs_flow:
|
||||
- pig_job
|
||||
- mapreduce_job
|
||||
- name: mapreduce_job_s3
|
||||
features:
|
||||
- s3
|
||||
- mapreduce_streaming_job
|
||||
- java_job
|
||||
- spark_wordcount
|
||||
|
|
|
@ -0,0 +1,6 @@
|
|||
credentials:
|
||||
s3_accesskey: ${s3_accesskey}
|
||||
s3_secretkey: ${s3_secretkey}
|
||||
s3_endpoint: ${s3_endpoint}
|
||||
s3_endpoint_ssl: ${s3_endpoint_ssl}
|
||||
s3_bucket_path: ${s3_bucket_path}
|
|
@ -0,0 +1,49 @@
|
|||
edp_jobs_flow:
|
||||
mapreduce_job_s3:
|
||||
- type: MapReduce
|
||||
input_datasource:
|
||||
type: s3
|
||||
source: edp-examples/edp-pig/trim-spaces/data/input
|
||||
output_datasource:
|
||||
type: s3
|
||||
destination: /user/hadoop/edp-output
|
||||
additional_libs:
|
||||
- type: s3
|
||||
source: edp-examples/edp-mapreduce/edp-mapreduce.jar
|
||||
configs:
|
||||
mapred.map.class: org.apache.oozie.example.SampleMapper
|
||||
mapred.reduce.class: org.apache.oozie.example.SampleReducer
|
||||
mapreduce.framework.name: yarn
|
||||
fs.swift.service.sahara.username: ${os_username}
|
||||
fs.swift.service.sahara.password: ${os_password}
|
||||
fs.s3a.access.key: ${s3_accesskey}
|
||||
fs.s3a.secret.key: ${s3_secretkey}
|
||||
fs.s3a.endpoint: ${s3_endpoint}
|
||||
fs.s3a.connection.ssl.enabled: ${s3_endpoint_ssl}
|
||||
fs.s3a.path.style.access: ${s3_bucket_path}
|
||||
spark_wordcount_s3:
|
||||
- type: Spark
|
||||
input_datasource:
|
||||
type: s3
|
||||
source: edp-examples/edp-spark/sample_input.txt
|
||||
output_datasource:
|
||||
type: s3
|
||||
destination: edp-output
|
||||
main_lib:
|
||||
type: s3
|
||||
source: edp-examples/edp-spark/spark-wordcount.jar
|
||||
configs:
|
||||
edp.java.main_class: sahara.edp.spark.SparkWordCount
|
||||
edp.spark.adapt_for_swift: true
|
||||
fs.swift.service.sahara.username: ${os_username}
|
||||
fs.swift.service.sahara.password: ${os_password}
|
||||
fs.swift.service.sahara.username: ${os_username}
|
||||
fs.swift.service.sahara.password: ${os_password}
|
||||
fs.s3a.access.key: ${s3_accesskey}
|
||||
fs.s3a.secret.key: ${s3_secretkey}
|
||||
fs.s3a.endpoint: ${s3_endpoint}
|
||||
fs.s3a.connection.ssl.enabled: ${s3_endpoint_ssl}
|
||||
fs.s3a.path.style.access: ${s3_bucket_path}
|
||||
args:
|
||||
- '{input_datasource}'
|
||||
- '{output_datasource}'
|
|
@ -54,3 +54,6 @@ clusters:
|
|||
size: 1
|
||||
edp_jobs_flow:
|
||||
- mapr
|
||||
- name: mapreduce_job_s3
|
||||
features:
|
||||
- s3
|
||||
|
|
|
@ -66,4 +66,7 @@ clusters:
|
|||
size: 1
|
||||
edp_jobs_flow:
|
||||
- java_job
|
||||
- name: mapreduce_job_s3
|
||||
features:
|
||||
- s3
|
||||
- spark_pi
|
||||
|
|
|
@ -66,4 +66,7 @@ clusters:
|
|||
size: 1
|
||||
edp_jobs_flow:
|
||||
- java_job
|
||||
- name: mapreduce_job_s3
|
||||
features:
|
||||
- s3
|
||||
- spark_pi
|
||||
|
|
|
@ -66,4 +66,7 @@ clusters:
|
|||
size: 1
|
||||
edp_jobs_flow:
|
||||
- java_job
|
||||
- name: mapreduce_job_s3
|
||||
features:
|
||||
- s3
|
||||
- spark_pi
|
||||
|
|
|
@ -89,6 +89,9 @@ clusters:
|
|||
edp_jobs_flow:
|
||||
- pig_job
|
||||
- mapreduce_job
|
||||
- name: mapreduce_job_s3
|
||||
features:
|
||||
- s3
|
||||
- mapreduce_streaming_job
|
||||
- java_job
|
||||
- spark_wordcount
|
||||
|
|
|
@ -89,6 +89,9 @@ clusters:
|
|||
edp_jobs_flow:
|
||||
- pig_job
|
||||
- mapreduce_job
|
||||
- name: mapreduce_job_s3
|
||||
features:
|
||||
- s3
|
||||
- mapreduce_streaming_job
|
||||
- java_job
|
||||
- spark_wordcount
|
||||
|
|
|
@ -89,6 +89,9 @@ clusters:
|
|||
edp_jobs_flow:
|
||||
- pig_job
|
||||
- mapreduce_job
|
||||
- name: mapreduce_job_s3
|
||||
features:
|
||||
- s3
|
||||
- mapreduce_streaming_job
|
||||
- java_job
|
||||
- spark_wordcount
|
||||
|
|
|
@ -54,3 +54,6 @@ clusters:
|
|||
size: 1
|
||||
edp_jobs_flow:
|
||||
- mapr
|
||||
- name: mapreduce_job_s3
|
||||
features:
|
||||
- s3
|
||||
|
|
|
@ -125,7 +125,7 @@ def get_base_parser():
|
|||
parser.add_argument('--report', default=False, action='store_true',
|
||||
help='Write results of test to file')
|
||||
parser.add_argument('--feature', '-f', default=[],
|
||||
nargs='?', help='Set of features to enable')
|
||||
action='append', help='Set of features to enable')
|
||||
parser.add_argument('--count', default=1, nargs='?', type=valid_count,
|
||||
help='Specify count of runs current cases.')
|
||||
parser.add_argument('--v2', '-2', default=False, action='store_true',
|
||||
|
|
|
@ -60,6 +60,21 @@ SCHEMA = {
|
|||
"type": "string",
|
||||
"minLength": 1
|
||||
},
|
||||
"s3_accesskey": {
|
||||
"type": "string",
|
||||
},
|
||||
"s3_secretkey": {
|
||||
"type": "string",
|
||||
},
|
||||
"s3_endpoint": {
|
||||
"type": "string",
|
||||
},
|
||||
"s3_endpoint_ssl": {
|
||||
"type": "boolean",
|
||||
},
|
||||
"s3_bucket_path": {
|
||||
"type": "boolean",
|
||||
},
|
||||
},
|
||||
"additionalProperties": False
|
||||
},
|
||||
|
@ -370,7 +385,8 @@ SCHEMA = {
|
|||
"properties": {
|
||||
"type": {
|
||||
"type": "string",
|
||||
"enum": ["swift", "hdfs", "maprfs"]
|
||||
"enum": ["swift", "hdfs", "maprfs",
|
||||
"s3"]
|
||||
},
|
||||
"source": {
|
||||
"type": "string"
|
||||
|
@ -384,7 +400,8 @@ SCHEMA = {
|
|||
"properties": {
|
||||
"type": {
|
||||
"type": "string",
|
||||
"enum": ["swift", "hdfs", "maprfs"]
|
||||
"enum": ["swift", "hdfs", "maprfs",
|
||||
"s3"]
|
||||
},
|
||||
"destination": {
|
||||
"type": "string"
|
||||
|
@ -398,7 +415,7 @@ SCHEMA = {
|
|||
"properties": {
|
||||
"type": {
|
||||
"type": "string",
|
||||
"enum": ["swift", "database"]
|
||||
"enum": ["swift", "s3", "database"]
|
||||
},
|
||||
"source": {
|
||||
"type": "string"
|
||||
|
@ -414,7 +431,7 @@ SCHEMA = {
|
|||
"properties": {
|
||||
"type": {
|
||||
"type": "string",
|
||||
"enum": ["swift", "database"]
|
||||
"enum": ["swift", "s3", "database"]
|
||||
},
|
||||
"source": {
|
||||
"type": "string"
|
||||
|
|
|
@ -88,6 +88,9 @@ class TestBase(testtools.TestCase):
|
|||
'os_tenant': 'admin',
|
||||
'os_auth_url':
|
||||
'http://localhost:5000/v2.0',
|
||||
's3_accesskey': 'very_long_key',
|
||||
's3_secretkey': 'very_long_secret',
|
||||
's3_endpoint': 'https://localhost',
|
||||
'sahara_service_type':
|
||||
'data-processing-local',
|
||||
'sahara_url':
|
||||
|
@ -143,7 +146,7 @@ class TestBase(testtools.TestCase):
|
|||
"destination": "/user/hadoop/edp-output"
|
||||
},
|
||||
"main_lib": {
|
||||
"type": "swift",
|
||||
"type": "s3",
|
||||
"source": "sahara_tests/scenario/defaults/"
|
||||
"edp-examples/edp-pig/"
|
||||
"top-todoers/example.pig"
|
||||
|
@ -350,12 +353,18 @@ class TestBase(testtools.TestCase):
|
|||
|
||||
@mock.patch('saharaclient.api.base.ResourceManager._create',
|
||||
return_value=FakeResponse(set_id='id_for_job_binaries'))
|
||||
@mock.patch('sahara_tests.scenario.clients.BotoClient.upload_data',
|
||||
return_value={})
|
||||
@mock.patch('sahara_tests.scenario.clients.BotoClient.create_bucket',
|
||||
return_value={'Location': 'foo'})
|
||||
@mock.patch('swiftclient.client.Connection.put_object',
|
||||
return_value=None)
|
||||
@mock.patch('swiftclient.client.Connection.put_container',
|
||||
return_value=None)
|
||||
def test__create_create_job_binaries(self, mock_swiftcontainer,
|
||||
mock_swiftobject,
|
||||
mock_create_bucket,
|
||||
mock_upload_bucket_data,
|
||||
mock_sahara_create):
|
||||
self.base_scenario._init_clients()
|
||||
self.assertEqual((['id_for_job_binaries'], []),
|
||||
|
@ -364,6 +373,8 @@ class TestBase(testtools.TestCase):
|
|||
|
||||
@mock.patch('saharaclient.api.base.ResourceManager._create',
|
||||
return_value=FakeResponse(set_id='id_for_job_binary'))
|
||||
@mock.patch('sahara_tests.scenario.clients.BotoClient.create_bucket',
|
||||
return_value={'Location': 'foo'})
|
||||
@mock.patch('swiftclient.client.Connection.put_object',
|
||||
return_value=None)
|
||||
@mock.patch('swiftclient.client.Connection.put_container',
|
||||
|
@ -371,7 +382,7 @@ class TestBase(testtools.TestCase):
|
|||
@mock.patch('saharaclient.client.Client', return_value=FakeSaharaClient())
|
||||
def test__create_create_job_binary(self, mock_saharaclient,
|
||||
mock_swiftcontainer, mock_swiftobject,
|
||||
mock_sahara_create):
|
||||
mock_create_bucket, mock_sahara_create):
|
||||
self.base_scenario._init_clients()
|
||||
self.assertEqual('id_for_job_binary',
|
||||
self.base_scenario._create_job_binary(self.job.get(
|
||||
|
@ -431,7 +442,7 @@ class TestBase(testtools.TestCase):
|
|||
{
|
||||
"type": "Pig",
|
||||
"input_datasource": {
|
||||
"type": "swift",
|
||||
"type": "s3",
|
||||
"source": "sahara_tests/scenario/defaults/edp-examples/"
|
||||
"edp-pig/top-todoers/"
|
||||
"data/input"
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
# Copyright 2018 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import testtools
|
||||
|
||||
from sahara_tests.utils import url as utils_url
|
||||
|
||||
|
||||
class UrlUnitTest(testtools.TestCase):
|
||||
|
||||
def _check_clean_url(self, url, expected_clean_url):
|
||||
"""Check if the cleaned URL matches the expected one."""
|
||||
clean_url = utils_url.url_schema_remover(url)
|
||||
self.assertEqual(clean_url, expected_clean_url)
|
||||
|
||||
def test_clean_url_http(self):
|
||||
self._check_clean_url('https://s3.amazonaws.com',
|
||||
's3.amazonaws.com')
|
||||
|
||||
def test_clean_url_https_longer(self):
|
||||
self._check_clean_url('https://s3.amazonaws.com/foo',
|
||||
's3.amazonaws.com/foo')
|
||||
|
||||
def test_clean_url_file(self):
|
||||
self._check_clean_url('file:///s3.amazonaws.com/bar',
|
||||
'/s3.amazonaws.com/bar')
|
|
@ -0,0 +1,27 @@
|
|||
# Copyright 2018 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from six.moves.urllib import parse as urlparse
|
||||
|
||||
|
||||
def url_schema_remover(url):
|
||||
""" Return the same URL without the schema.
|
||||
Example: prefix://host/path -> host/path
|
||||
"""
|
||||
parsed = urlparse.urlsplit(url)
|
||||
cleaned = urlparse.urlunsplit((('',) + parsed[1:]))
|
||||
if cleaned.startswith('//'):
|
||||
cleaned = cleaned[2:]
|
||||
return cleaned
|
Loading…
Reference in New Issue