Merge "APIv2 support for all Tempest-based tests"

This commit is contained in:
Zuul 2019-02-17 23:05:11 +00:00 committed by Gerrit Code Review
commit 66df22936d
23 changed files with 674 additions and 78 deletions

View File

@ -13,6 +13,7 @@
voting: false
- sahara-tests-scenario-runner-py3
- sahara-tests-tempest
- sahara-tests-tempest-v2
- sahara-tests-scenario-rocky
- sahara-tests-scenario-queens
- sahara-tests-scenario-pike
@ -70,6 +71,22 @@
- ^releasenotes/.*$
- ^sahara_tests/.*$
- job:
name: sahara-tests-tempest-v2
description: |
Run Tempest tests from the Sahara plugin against Sahara APIv2.
parent: sahara-tests-tempest
required-projects:
- openstack/python-saharaclient
branches: master
vars:
devstack_local_conf:
test-config:
$TEMPEST_CONFIG:
data-processing:
api_version_saharaclient: '2'
use_api_v2: 'True'
# variant for pre-Rocky branches (no S3)
- job:
name: sahara-tests-tempest

View File

@ -0,0 +1,10 @@
---
prelude: >
Tempest tests now support APIv2.
features:
- |
The Tempest plugin now provides an APIv2 DataProcessing client, and
the Tempest tests can be executed against APIv2 as well.
The API version used is driven by tempest.conf configuration keys
(data_processing.use_api_v2 for API tests,
data_processing.api_version_saharaclient for client and CLI tests).
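
For reference, a minimal tempest.conf sketch matching the options above (the values mirror the sahara-tests-tempest-v2 job definition earlier in this change):

    [data-processing]
    use_api_v2 = True
    api_version_saharaclient = 2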

View File

@ -271,7 +271,8 @@ def get_default_version(plugin):
def get_node_group_template(nodegroup='worker1',
default_version=None,
floating_ip_pool=None):
floating_ip_pool=None,
api_version='1.1'):
"""Returns a node group template for the default plugin."""
try:
flavor = CONF.compute.flavor_ref
@ -283,19 +284,23 @@ def get_node_group_template(nodegroup='worker1',
node_group_template = {
'description': 'Test node group template',
'plugin_name': default_plugin_name,
'hadoop_version': default_version,
'node_processes': nodegroup_data['node_processes'],
'flavor_id': flavor,
'floating_ip_pool': floating_ip_pool,
'node_configs': nodegroup_data.get('node_configs', {})
}
if api_version == '1.1':
node_group_template['hadoop_version'] = default_version
else:
node_group_template['plugin_version'] = default_version
return node_group_template
except (IndexError, KeyError):
return None
def get_cluster_template(node_group_template_ids=None,
default_version=None):
default_version=None,
api_version='1.1'):
"""Returns a cluster template for the default plugin.
node_group_template_ids contains the type and ID of pre-defined
@ -334,10 +339,13 @@ def get_cluster_template(node_group_template_ids=None,
cluster_template = {
'description': 'Test cluster template',
'plugin_name': default_plugin_name,
'hadoop_version': default_version,
'cluster_configs': plugin_data.get('cluster_configs', {}),
'node_groups': all_node_groups,
}
if api_version == '1.1':
cluster_template['hadoop_version'] = default_version
else:
cluster_template['plugin_version'] = default_version
return cluster_template
except (IndexError, KeyError):
return None
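
A hypothetical illustration of the branch above (the version string '2.7.1' is invented for the example; only the key carrying the version changes):

    # sketch, calling the helper from the hunk above
    # API v1.1 keeps the legacy key name:
    get_node_group_template('worker1', '2.7.1', None, api_version='1.1')
    # -> {..., 'hadoop_version': '2.7.1', ...}
    # any other api_version switches to the v2 key:
    get_node_group_template('worker1', '2.7.1', None, api_version='2.0')
    # -> {..., 'plugin_version': '2.7.1', ...}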

View File

@ -51,6 +51,9 @@ DataProcessingAdditionalGroup = [
cfg.StrOpt('test_ssh_user',
default='ubuntu',
help='username used to access the test image.'),
cfg.BoolOpt('use_api_v2',
default=False,
help='Run API tests against APIv2 instead of 1.1'),
cfg.StrOpt('api_version_saharaclient',
default='1.1',
help='Version of Sahara API used by saharaclient',

View File

@ -62,4 +62,12 @@ class SaharaTempestPlugin(plugins.TempestPlugin):
'client_names': ['DataProcessingClient']
}
params.update(data_processing_config)
return [params]
params_v2 = {
'name': 'data_processing_v2',
'service_version': 'data_processing.v2',
'module_path':
'sahara_tempest_plugin.services.data_processing.v2',
'client_names': ['DataProcessingClient']
}
params_v2.update(data_processing_config)
return [params, params_v2]
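
With both entries registered, each 'name' becomes an attribute on the Tempest clients manager, so a test can pick either client (as the base API test class later in this change does):

    # sketch, assuming an initialized os_primary clients manager:
    v11_client = os_primary.data_processing.DataProcessingClient()
    v2_client = os_primary.data_processing_v2.DataProcessingClient()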

View File

@ -0,0 +1,50 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_serialization import jsonutils as json
from tempest.lib.common import rest_client
class BaseDataProcessingClient(rest_client.RestClient):
def _request_and_check_resp(self, request_func, uri, resp_status):
"""Make a request and check response status code.
It returns a ResponseBody.
"""
resp, body = request_func(uri)
self.expected_success(resp_status, resp.status)
return rest_client.ResponseBody(resp, body)
def _request_and_check_resp_data(self, request_func, uri, resp_status):
"""Make a request and check response status code.
It returns a pair: resp and response data.
"""
resp, body = request_func(uri)
self.expected_success(resp_status, resp.status)
return resp, body
def _request_check_and_parse_resp(self, request_func, uri,
resp_status, *args, **kwargs):
"""Make a request, check response status code and parse response body.
It returns a ResponseBody.
"""
headers = {'Content-Type': 'application/json'}
resp, body = request_func(uri, headers=headers, *args, **kwargs)
self.expected_success(resp_status, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)

View File

@ -14,43 +14,13 @@
from oslo_serialization import jsonutils as json
from tempest.lib.common import rest_client
from sahara_tempest_plugin.services.data_processing import base_client
class DataProcessingClient(rest_client.RestClient):
class DataProcessingClient(base_client.BaseDataProcessingClient):
api_version = "v1.1"
def _request_and_check_resp(self, request_func, uri, resp_status):
"""Make a request and check response status code.
It returns a ResponseBody.
"""
resp, body = request_func(uri)
self.expected_success(resp_status, resp.status)
return rest_client.ResponseBody(resp, body)
def _request_and_check_resp_data(self, request_func, uri, resp_status):
"""Make a request and check response status code.
It returns pair: resp and response data.
"""
resp, body = request_func(uri)
self.expected_success(resp_status, resp.status)
return resp, body
def _request_check_and_parse_resp(self, request_func, uri,
resp_status, *args, **kwargs):
"""Make a request, check response status code and parse response body.
It returns a ResponseBody.
"""
headers = {'Content-Type': 'application/json'}
resp, body = request_func(uri, headers=headers, *args, **kwargs)
self.expected_success(resp_status, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def list_node_group_templates(self):
"""List all node group templates for a user."""

View File

@ -0,0 +1,18 @@
# Copyright (c) 2016 Hewlett-Packard Enterprise Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
from sahara_tempest_plugin.services.data_processing.v2.data_processing_client import \
DataProcessingClient
__all__ = ['DataProcessingClient']

View File

@ -0,0 +1,241 @@
# Copyright (c) 2013 Mirantis Inc.
# Copyright (c) 2018 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_serialization import jsonutils as json
from sahara_tempest_plugin.services.data_processing import base_client
class DataProcessingClient(base_client.BaseDataProcessingClient):
api_version = "v2"
def list_node_group_templates(self):
"""List all node group templates for a user."""
uri = 'node-group-templates'
return self._request_check_and_parse_resp(self.get, uri, 200)
def get_node_group_template(self, tmpl_id):
"""Returns the details of a single node group template."""
uri = 'node-group-templates/%s' % tmpl_id
return self._request_check_and_parse_resp(self.get, uri, 200)
def create_node_group_template(self, name, plugin_name, plugin_version,
node_processes, flavor_id,
node_configs=None, **kwargs):
"""Creates node group template with specified params.
It supports passing additional params using kwargs and returns created
object.
"""
uri = 'node-group-templates'
body = kwargs.copy()
body.update({
'name': name,
'plugin_name': plugin_name,
'plugin_version': plugin_version,
'node_processes': node_processes,
'flavor_id': flavor_id,
'node_configs': node_configs or dict(),
})
return self._request_check_and_parse_resp(self.post, uri, 202,
body=json.dumps(body))
def delete_node_group_template(self, tmpl_id):
"""Deletes the specified node group template by id."""
uri = 'node-group-templates/%s' % tmpl_id
return self._request_and_check_resp(self.delete, uri, 204)
def update_node_group_template(self, tmpl_id, **kwargs):
"""Updates the details of a single node group template."""
uri = 'node-group-templates/%s' % tmpl_id
return self._request_check_and_parse_resp(self.patch, uri, 202,
body=json.dumps(kwargs))
def list_plugins(self):
"""List all enabled plugins."""
uri = 'plugins'
return self._request_check_and_parse_resp(self.get, uri, 200)
def get_plugin(self, plugin_name, plugin_version=None):
"""Returns the details of a single plugin."""
uri = 'plugins/%s' % plugin_name
if plugin_version:
uri += '/%s' % plugin_version
return self._request_check_and_parse_resp(self.get, uri, 200)
def list_cluster_templates(self):
"""List all cluster templates for a user."""
uri = 'cluster-templates'
return self._request_check_and_parse_resp(self.get, uri, 200)
def get_cluster_template(self, tmpl_id):
"""Returns the details of a single cluster template."""
uri = 'cluster-templates/%s' % tmpl_id
return self._request_check_and_parse_resp(self.get, uri, 200)
def create_cluster_template(self, name, plugin_name, plugin_version,
node_groups, cluster_configs=None,
**kwargs):
"""Creates cluster template with specified params.
It supports passing additional params using kwargs and returns created
object.
"""
uri = 'cluster-templates'
body = kwargs.copy()
body.update({
'name': name,
'plugin_name': plugin_name,
'plugin_version': plugin_version,
'node_groups': node_groups,
'cluster_configs': cluster_configs or dict(),
})
return self._request_check_and_parse_resp(self.post, uri, 202,
body=json.dumps(body))
def delete_cluster_template(self, tmpl_id):
"""Deletes the specified cluster template by id."""
uri = 'cluster-templates/%s' % tmpl_id
return self._request_and_check_resp(self.delete, uri, 204)
def update_cluster_template(self, tmpl_id, **kwargs):
"""Updates the specificed cluster template."""
uri = 'cluster-templates/%s' % tmpl_id
return self._request_check_and_parse_resp(self.patch, uri, 202,
body=json.dumps(kwargs))
def list_data_sources(self):
"""List all data sources for a user."""
uri = 'data-sources'
return self._request_check_and_parse_resp(self.get, uri, 200)
def get_data_source(self, source_id):
"""Returns the details of a single data source."""
uri = 'data-sources/%s' % source_id
return self._request_check_and_parse_resp(self.get, uri, 200)
def create_data_source(self, name, data_source_type, url, **kwargs):
"""Creates data source with specified params.
It supports passing additional params using kwargs and returns created
object.
"""
uri = 'data-sources'
body = kwargs.copy()
body.update({
'name': name,
'type': data_source_type,
'url': url
})
return self._request_check_and_parse_resp(self.post, uri,
202, body=json.dumps(body))
def delete_data_source(self, source_id):
"""Deletes the specified data source by id."""
uri = 'data-sources/%s' % source_id
return self._request_and_check_resp(self.delete, uri, 204)
def update_data_source(self, source_id, **kwargs):
"""Updates a data source"""
uri = 'data-sources/%s' % source_id
return self._request_check_and_parse_resp(self.patch, uri, 202,
body=json.dumps(kwargs))
def list_job_binaries(self):
"""List all job binaries for a user."""
uri = 'job-binaries'
return self._request_check_and_parse_resp(self.get, uri, 200)
def get_job_binary(self, job_binary_id):
"""Returns the details of a single job binary."""
uri = 'job-binaries/%s' % job_binary_id
return self._request_check_and_parse_resp(self.get, uri, 200)
def create_job_binary(self, name, url, extra=None, **kwargs):
"""Creates job binary with specified params.
It supports passing additional params using kwargs and returns created
object.
"""
uri = 'job-binaries'
body = kwargs.copy()
body.update({
'name': name,
'url': url,
'extra': extra or dict(),
})
return self._request_check_and_parse_resp(self.post, uri,
202, body=json.dumps(body))
def delete_job_binary(self, job_binary_id):
"""Deletes the specified job binary by id."""
uri = 'job-binaries/%s' % job_binary_id
return self._request_and_check_resp(self.delete, uri, 204)
def get_job_binary_data(self, job_binary_id):
"""Returns data of a single job binary."""
uri = 'job-binaries/%s/data' % job_binary_id
return self._request_and_check_resp_data(self.get, uri, 200)
def list_job_templates(self):
"""List all jobs templates for a user."""
uri = 'job-templates'
return self._request_check_and_parse_resp(self.get, uri, 200)
def get_job_template(self, job_id):
"""Returns the details of a single job template."""
uri = 'job-templates/%s' % job_id
return self._request_check_and_parse_resp(self.get, uri, 200)
def create_job_template(self, name, job_type, mains, libs=None, **kwargs):
"""Creates job with specified params.
It supports passing additional params using kwargs and returns created
object.
"""
uri = 'job-templates'
body = kwargs.copy()
body.update({
'name': name,
'type': job_type,
'mains': mains,
'libs': libs or list(),
})
return self._request_check_and_parse_resp(self.post, uri,
202, body=json.dumps(body))
def delete_job_template(self, job_id):
"""Deletes the specified job by id."""
uri = 'job-templates/%s' % job_id
return self._request_and_check_resp(self.delete, uri, 204)
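
A hedged usage sketch of the new client (the plugin name, version and flavor are placeholders, and the response key is assumed to mirror the v1.1 client rather than taken from this change):

    client = os_primary.data_processing_v2.DataProcessingClient()
    resp = client.create_node_group_template(
        name='worker-ngt', plugin_name='fake', plugin_version='0.1',
        node_processes=['datanode'], flavor_id='42')
    # the parsed body is expected to carry the created template:
    client.delete_node_group_template(resp['node_group_template']['id'])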

View File

@ -43,7 +43,13 @@ class BaseDataProcessingTest(tempest.test.BaseTestCase):
@classmethod
def setup_clients(cls):
super(BaseDataProcessingTest, cls).setup_clients()
cls.client = cls.os_primary.data_processing.DataProcessingClient()
if not CONF.data_processing.use_api_v2:
cls.api_version = '1.1'
cls.client = cls.os_primary.data_processing.DataProcessingClient()
else:
cls.api_version = '2.0'
cls.client = \
cls.os_primary.data_processing_v2.DataProcessingClient()
@classmethod
def resource_setup(cls):
@ -72,11 +78,17 @@ class BaseDataProcessingTest(tempest.test.BaseTestCase):
cls.client.delete_cluster_template)
cls.cleanup_resources(getattr(cls, '_node_group_templates', []),
cls.client.delete_node_group_template)
cls.cleanup_resources(getattr(cls, '_jobs', []), cls.client.delete_job)
if cls.api_version == '1.1':
cls.cleanup_resources(getattr(cls, '_jobs', []),
cls.client.delete_job)
else:
cls.cleanup_resources(getattr(cls, '_jobs', []),
cls.client.delete_job_template)
cls.cleanup_resources(getattr(cls, '_job_binaries', []),
cls.client.delete_job_binary)
cls.cleanup_resources(getattr(cls, '_job_binary_internals', []),
cls.client.delete_job_binary_internal)
if cls.api_version == '1.1':
cls.cleanup_resources(getattr(cls, '_job_binary_internals', []),
cls.client.delete_job_binary_internal)
cls.cleanup_resources(getattr(cls, '_data_sources', []),
cls.client.delete_data_source)
super(BaseDataProcessingTest, cls).resource_cleanup()
@ -178,7 +190,7 @@ class BaseDataProcessingTest(tempest.test.BaseTestCase):
@classmethod
def create_job(cls, name, job_type, mains, libs=None, **kwargs):
"""Creates watched job with specified params.
"""Creates watched job (v1) with specified params.
It supports passing additional params using kwargs and returns created
object. All resources created in this method will be automatically
@ -192,11 +204,29 @@ class BaseDataProcessingTest(tempest.test.BaseTestCase):
return resp_body
@classmethod
def create_job_template(cls, name, job_type, mains, libs=None, **kwargs):
"""Creates watched job template (v2) with specified params.
It supports passing additional params using kwargs and returns created
object. All resources created in this method will be automatically
removed in tearDownClass method.
"""
resp_body = cls.client.create_job_template(name, job_type, mains,
libs, **kwargs)
resp_body = resp_body['job_template']
# store id of created job
cls._jobs.append(resp_body['id'])
return resp_body
@classmethod
def get_node_group_template(cls, nodegroup='worker1'):
"""Returns a node group template for the default plugin."""
return plugin_utils.get_node_group_template(nodegroup,
cls.default_version)
cls.default_version,
None,
cls.api_version)
@classmethod
def get_cluster_template(cls, node_group_template_ids=None):
@ -207,7 +237,8 @@ class BaseDataProcessingTest(tempest.test.BaseTestCase):
(instead of dynamically defining them with 'node_processes').
"""
return plugin_utils.get_cluster_template(node_group_template_ids,
cls.default_version)
cls.default_version,
cls.api_version)
@classmethod
def wait_for_resource_deletion(cls, resource_id, get_resource):

View File

@ -42,6 +42,14 @@ class ClusterTemplateTest(dp_base.BaseDataProcessingTest):
node_group_template_w['name'] = data_utils.rand_name(
'sahara-ng-template')
# hack the arguments: keep compatibility with the signature
# of self.create_node_group_template
if 'plugin_version' in node_group_template_w:
plugin_version_value = node_group_template_w['plugin_version']
del node_group_template_w['plugin_version']
node_group_template_w['hadoop_version'] = plugin_version_value
resp_body = cls.create_node_group_template(**node_group_template_w)
node_group_template_id = resp_body['id']
configured_node_group_templates = {'worker1': node_group_template_id}
@ -79,9 +87,17 @@ class ClusterTemplateTest(dp_base.BaseDataProcessingTest):
# generate random name if it's not specified
template_name = data_utils.rand_name('sahara-cluster-template')
# hack the arguments: keep compatibility with the signature
# of self.create_cluster_template
full_cluster_template_w = self.full_cluster_template.copy()
if 'plugin_version' in full_cluster_template_w:
plugin_version_value = full_cluster_template_w['plugin_version']
del full_cluster_template_w['plugin_version']
full_cluster_template_w['hadoop_version'] = plugin_version_value
# create cluster template
resp_body = self.create_cluster_template(template_name,
**self.full_cluster_template)
**full_cluster_template_w)
# ensure that template created successfully
self.assertEqual(template_name, resp_body['name'])

View File

@ -59,12 +59,13 @@ class JobBinaryTest(dp_base.BaseDataProcessingTest):
name = data_utils.rand_name('sahara-internal-job-binary')
cls.job_binary_data = 'Some script may be data'
job_binary_internal = (
cls.create_job_binary_internal(name, cls.job_binary_data))
cls.internal_db_job_binary = {
'url': 'internal-db://%s' % job_binary_internal['id'],
'description': 'Test job binary',
}
if not CONF.data_processing.use_api_v2:
job_binary_internal = (
cls.create_job_binary_internal(name, cls.job_binary_data))
cls.internal_db_job_binary = {
'url': 'internal-db://%s' % job_binary_internal['id'],
'description': 'Test job binary',
}
def _create_job_binary(self, binary_body, binary_name=None):
"""Creates Job Binary with optional name specified.
@ -167,11 +168,17 @@ class JobBinaryTest(dp_base.BaseDataProcessingTest):
@tc.attr('smoke')
@decorators.idempotent_id('63662f6d-8291-407e-a6fc-f654522ebab6')
@testtools.skipIf(CONF.data_processing.api_version_saharaclient != '1.1',
'Job binaries stored on the internal db are available '
'only with API v1.1')
def test_internal_db_job_binary_create(self):
self._create_job_binary(self.internal_db_job_binary)
@tc.attr('smoke')
@decorators.idempotent_id('38731e7b-6d9d-4ffa-8fd1-193c453e88b1')
@testtools.skipIf(CONF.data_processing.api_version_saharaclient != '1.1',
'Job binaries stored on the internal db are available '
'only with API v1.1')
def test_internal_db_job_binary_list(self):
binary_info = self._create_job_binary(self.internal_db_job_binary)
@ -182,6 +189,9 @@ class JobBinaryTest(dp_base.BaseDataProcessingTest):
@tc.attr('smoke')
@decorators.idempotent_id('1b32199b-c3f5-43e1-a37a-3797e57b7066')
@testtools.skipIf(CONF.data_processing.api_version_saharaclient != '1.1',
'Job binaries stored on the internal db are available '
'only with API v1.1')
def test_internal_db_job_binary_get(self):
binary_id, binary_name = (
self._create_job_binary(self.internal_db_job_binary))
@ -193,6 +203,9 @@ class JobBinaryTest(dp_base.BaseDataProcessingTest):
@tc.attr('smoke')
@decorators.idempotent_id('3c42b0c3-3e03-46a5-adf0-df0650271a4e')
@testtools.skipIf(CONF.data_processing.api_version_saharaclient != '1.1',
'Job binaries stored on the internal db are available '
'only with API v1.1')
def test_internal_db_job_binary_delete(self):
binary_id, _ = self._create_job_binary(self.internal_db_job_binary)
@ -206,6 +219,9 @@ class JobBinaryTest(dp_base.BaseDataProcessingTest):
@tc.attr('smoke')
@decorators.idempotent_id('d5d47659-7e2c-4ea7-b292-5b3e559e8587')
@testtools.skipIf(CONF.data_processing.api_version_saharaclient != '1.1',
'Job binaries stored on the internal db are available '
'only with API v1.1')
def test_job_binary_get_data(self):
binary_id, _ = self._create_job_binary(self.internal_db_job_binary)

View File

@ -14,16 +14,27 @@
from testtools import testcase as tc
from tempest import config
from tempest.lib import decorators
from tempest.lib.common.utils import data_utils
from sahara_tempest_plugin.tests.api import base as dp_base
CONF = config.CONF
class JobBinaryInternalTest(dp_base.BaseDataProcessingTest):
# Link to the API documentation is https://developer.openstack.org/
# api-ref/data-processing/#job-binary-internals
@classmethod
def skip_checks(cls):
super(JobBinaryInternalTest, cls).skip_checks()
if CONF.data_processing.use_api_v2:
raise cls.skipException('Job binaries stored on the internal db '
'are available only with API v1.1')
@classmethod
def resource_setup(cls):
super(JobBinaryInternalTest, cls).resource_setup()

View File

@ -0,0 +1,113 @@
# Copyright (c) 2014 Mirantis Inc.
# Copyright (c) 2018 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from testtools import testcase as tc
from tempest import config
from tempest.lib import decorators
from tempest.lib.common.utils import data_utils
from sahara_tempest_plugin.tests.api import base as dp_base
CONF = config.CONF
class JobTemplatesTest(dp_base.BaseDataProcessingTest):
# NOTE: Link to the API documentation: https://developer.openstack.org/
# api-ref/data-processing/v2/#job-templates
@classmethod
def skip_checks(cls):
super(JobTemplatesTest, cls).skip_checks()
if not CONF.data_processing.use_api_v2:
raise cls.skipException('These tests require API v2')
@classmethod
def resource_setup(cls):
super(JobTemplatesTest, cls).resource_setup()
# create job binary
job_binary = {
'name': data_utils.rand_name('sahara-job-binary'),
'url': 'swift://sahara-container.sahara/example.jar',
'description': 'Test job binary',
'extra': {
'user': cls.os_primary.credentials.username,
'password': cls.os_primary.credentials.password
}
}
resp_body = cls.create_job_binary(**job_binary)
job_binary_id = resp_body['id']
cls.job = {
'job_type': 'Pig',
'mains': [job_binary_id]
}
def _create_job_template(self, job_name=None):
"""Creates Job with optional name specified.
It creates job and ensures job name. Returns id and name of created
job.
"""
if not job_name:
# generate random name if it's not specified
job_name = data_utils.rand_name('sahara-job')
# create job
resp_body = self.create_job_template(job_name, **self.job)
# ensure that job created successfully
self.assertEqual(job_name, resp_body['name'])
return resp_body['id'], job_name
@decorators.idempotent_id('26e39bc9-df9c-422f-9401-0d2cf5c87c63')
@tc.attr('smoke')
def test_job_template_create(self):
self._create_job_template()
@decorators.idempotent_id('6d3ce0da-cd37-4ac1-abfa-e53835bd2c08')
@tc.attr('smoke')
def test_job_template_list(self):
job_info = self._create_job_template()
# check for job in list
jobs = self.client.list_job_templates()['job_templates']
jobs_info = [(job['id'], job['name']) for job in jobs]
self.assertIn(job_info, jobs_info)
@decorators.idempotent_id('4396453f-f4b2-415c-916c-6929a51ba89f')
@tc.attr('smoke')
def test_job_template_get(self):
job_id, job_name = self._create_job_template()
# check job fetch by id
job_t = self.client.get_job_template(job_id)['job_template']
self.assertEqual(job_name, job_t['name'])
@decorators.idempotent_id('2d816b08-b20c-438f-8580-3e40fd741eb4')
@tc.attr('smoke')
def test_job_template_delete(self):
job_id, _ = self._create_job_template()
# delete the job by id
self.client.delete_job_template(job_id)
self.wait_for_resource_deletion(job_id, self.client.get_job_template)
jobs = self.client.list_job_templates()['job_templates']
jobs_ids = [job['id'] for job in jobs]
self.assertNotIn(job_id, jobs_ids)

View File

@ -14,15 +14,25 @@
from testtools import testcase as tc
from tempest import config
from tempest.lib import decorators
from tempest.lib.common.utils import data_utils
from sahara_tempest_plugin.tests.api import base as dp_base
CONF = config.CONF
class JobTest(dp_base.BaseDataProcessingTest):
# NOTE: Link to the API documentation: https://developer.openstack.org/
# api-ref/data-processing/#jobs
# api-ref/data-processing/v1.1/#jobs
@classmethod
def skip_checks(cls):
super(JobTest, cls).skip_checks()
if CONF.data_processing.use_api_v2:
raise cls.skipException('These tests require API v1.1')
@classmethod
def resource_setup(cls):

View File

@ -46,9 +46,17 @@ class NodeGroupTemplateTest(dp_base.BaseDataProcessingTest):
# generate random name if it's not specified
template_name = data_utils.rand_name('sahara-ng-template')
# hack the arguments: keep compatibility with the signature
# of self.create_node_group_template
node_group_template_w = self.node_group_template.copy()
if 'plugin_version' in node_group_template_w:
plugin_version_value = node_group_template_w['plugin_version']
del node_group_template_w['plugin_version']
node_group_template_w['hadoop_version'] = plugin_version_value
# create node group template
resp_body = self.create_node_group_template(template_name,
**self.node_group_template)
**node_group_template_w)
# ensure that template created successfully
self.assertEqual(template_name, resp_body['name'])

View File

@ -29,21 +29,27 @@ class SaharaJobBinaryCLITest(base.ClientTestBase):
'Url'
])
def openstack_job_binary_create(self):
fd, script_name = tempfile.mkstemp()
with fdopen(fd, 'w+') as jb:
jb.write('test-script')
def openstack_job_binary_create(self, job_internal=True):
job_binary_name = data_utils.rand_name('job-fake')
flag = ("%(jb_name)s %(data)s "
% {'jb_name': ('--name %s' % job_binary_name),
'data': ' --data %s' % script_name})
if job_internal:
fd, script_name = tempfile.mkstemp()
with fdopen(fd, 'w+') as jb:
jb.write('test-script')
flag = ("%(jb_name)s %(data)s "
% {'jb_name': ('--name %s' % job_binary_name),
'data': ' --data %s' % script_name})
else:
flag = ("%(jb_name)s --url swift://mybucket.sahara/foo "
"--username foo --password bar"
% {'jb_name': ('--name %s' % job_binary_name)})
self.assertTableStruct(
self.listing_result('job binary create %s' % flag),
[
'Field',
'Value'
])
remove(script_name)
if job_internal:
remove(script_name)
return job_binary_name
def openstack_job_binary_download(self, job_binary_name):

View File

@ -10,6 +10,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from tempest import config
from tempest.lib import decorators
@ -154,6 +156,8 @@ class Scenario(images.SaharaImageCLITest,
self.openstack_image_unregister(image_name)
self.negative_unregister_not_existing_image(image_name)
@testtools.skipIf(TEMPEST_CONF.data_processing.api_version_saharaclient !=
'1.1', "Full job binaries testing requires API v1.1")
def test_job_binary_cli(self):
job_binary_name = self.openstack_job_binary_create()
self.addCleanup(self.delete_resource, 'job binary', job_binary_name)
@ -169,7 +173,7 @@ class Scenario(images.SaharaImageCLITest,
self.negative_delete_removed_job_binary(job_binary_name)
def test_job_template_cli(self):
job_binary_name = self.openstack_job_binary_create()
job_binary_name = self.openstack_job_binary_create(job_internal=False)
self.addCleanup(self.delete_resource, 'job binary', job_binary_name)
job_template_name = self.openstack_job_template_create(job_binary_name)

View File

@ -65,6 +65,11 @@ class BaseDataProcessingTest(tempest.test.BaseTestCase):
service_type=catalog_type,
endpoint_type=endpoint_type)
if TEMPEST_CONF.data_processing.api_version_saharaclient == '1.1':
sahara_api_version = '1.1'
else:
sahara_api_version = '2.0'
if TEMPEST_CONF.service_available.glance:
# Check if glance v1 is available to determine which client to use.
if TEMPEST_CONF.image_feature_enabled.api_v1:
@ -98,16 +103,19 @@ class BaseDataProcessingTest(tempest.test.BaseTestCase):
cls.worker_template = (
plugin_utils.get_node_group_template('worker1',
default_version,
cls.floating_ip_pool))
cls.floating_ip_pool,
sahara_api_version))
cls.master_template = (
plugin_utils.get_node_group_template('master1',
default_version,
cls.floating_ip_pool))
cls.floating_ip_pool,
sahara_api_version))
cls.cluster_template = (
plugin_utils.get_cluster_template(
default_version=default_version))
default_version=default_version,
api_version=sahara_api_version))
cls.swift_data_source_with_creds = {
'url': 'swift://sahara-container/input-source',
@ -230,14 +238,17 @@ class BaseDataProcessingTest(tempest.test.BaseTestCase):
return resp_body
def create_job(self, name, job_type, mains, libs=None, description=None):
if TEMPEST_CONF.data_processing.api_version_saharaclient == '1.1':
base_client_class = self.client.jobs
else:
base_client_class = self.client.job_templates
libs = libs or ()
description = description or ''
resp_body = self.client.jobs.create(
resp_body = base_client_class.create(
name, job_type, mains, libs, description)
self.addCleanup(self.delete_resource, self.client.jobs, resp_body.id)
self.addCleanup(self.delete_resource, base_client_class, resp_body.id)
return resp_body
@ -267,11 +278,14 @@ class BaseDataProcessingTest(tempest.test.BaseTestCase):
% (CLUSTER_STATUS_ACTIVE, timeout))
def create_job_execution(self, **kwargs):
if TEMPEST_CONF.data_processing.api_version_saharaclient == '1.1':
base_client_class = self.client.job_executions
else:
base_client_class = self.client.jobs
resp_body = self.client.job_executions.create(**kwargs)
resp_body = base_client_class.create(**kwargs)
self.addCleanup(self.delete_resource, self.client.job_executions,
resp_body.id)
self.addCleanup(self.delete_resource, base_client_class, resp_body.id)
return resp_body
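
The saharaclient manager names shift between API versions, which the two branches above encode; schematically:

    # v1.1 manager              v2 manager              purpose
    # client.jobs            -> client.job_templates   (job definitions)
    # client.job_executions  -> client.jobs            (submitted/running jobs)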

View File

@ -12,11 +12,17 @@
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from tempest import config
from tempest.lib.common.utils import data_utils
from sahara_tempest_plugin.tests.clients import base
CONF = config.CONF
class JobBinariesTest(base.BaseDataProcessingTest):
def _check_job_binary_create(self, binary_body):
binary_name = data_utils.rand_name('sahara-job-binary')
@ -123,6 +129,9 @@ class JobBinariesTest(base.BaseDataProcessingTest):
self._check_swift_job_binary_update(binary_id)
self._check_job_binary_delete(binary_id)
@testtools.skipIf(CONF.data_processing.api_version_saharaclient != '1.1',
'Job binaries stored on the internal db are available '
'only with API v1.1')
def test_internal_job_binaries(self):
binary_id, binary_name = self._check_internal_db_job_binary_create()
self._check_job_binary_list(binary_id, binary_name)

View File

@ -12,12 +12,23 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest import config
from tempest.lib.common.utils import data_utils
from sahara_tempest_plugin.tests.clients import base
CONF = config.CONF
class JobBinaryInternalsTest(base.BaseDataProcessingTest):
@classmethod
def skip_checks(cls):
super(JobBinaryInternalsTest, cls).skip_checks()
if CONF.data_processing.api_version_saharaclient != '1.1':
raise cls.skipException('Job binaries stored on the internal db '
'are available only with API v1.1')
def _check_job_binary_internal_create(self):
name = data_utils.rand_name('sahara-internal-job-binary')
self.job_binary_data = 'Some data'

View File

@ -104,10 +104,13 @@ class JobExecutionTest(base.BaseDataProcessingTest):
self.cluster_info = {
'name': cluster_name,
'plugin_name': 'fake',
'hadoop_version': '0.1',
'cluster_template_id': cluster_template.id,
'default_image_id': self.test_image_id
}
plugin_version_option = 'plugin_version'
if CONF.data_processing.api_version_saharaclient == '1.1':
plugin_version_option = 'hadoop_version'
self.cluster_info[plugin_version_option] = '0.1'
# create cluster
cluster = self.create_cluster(**self.cluster_info)

View File

@ -12,12 +12,30 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest import config
from tempest.lib.common.utils import data_utils
from sahara_tempest_plugin.tests.clients import base
CONF = config.CONF
class JobTest(base.BaseDataProcessingTest):
def _get_job_template_client(self):
if CONF.data_processing.api_version_saharaclient == '1.1':
job_template_client_class = self.client.jobs
else:
job_template_client_class = self.client.job_templates
return job_template_client_class
def _get_job_template_items(self, job_template_object):
if CONF.data_processing.api_version_saharaclient == '1.1':
return job_template_object.job
else:
return job_template_object.job_template
def _check_create_job(self):
job_binary = {
'name': data_utils.rand_name('sahara-job-binary'),
@ -45,13 +63,13 @@ class JobTest(base.BaseDataProcessingTest):
def _check_job_list(self, job_id, job_name):
# check for job in list
job_list = self.client.jobs.list()
job_list = self._get_job_template_client().list()
jobs_info = [(job.id, job.name) for job in job_list]
self.assertIn((job_id, job_name), jobs_info)
def _check_get_job(self, job_id, job_name):
# check job fetch by id
job = self.client.jobs.get(job_id)
job = self._get_job_template_client().get(job_id)
self.assertEqual(job_name, job.name)
def _check_job_update(self, job_id):
@ -61,14 +79,15 @@ class JobTest(base.BaseDataProcessingTest):
'description': 'description'
}
job = self.client.jobs.update(job_id, **values)
self.assertDictContainsSubset(values, job.job)
job = self._get_job_template_client().update(job_id, **values)
self.assertDictContainsSubset(values,
self._get_job_template_items(job))
def _check_delete_job(self, job_id):
# delete job by id
self.client.jobs.delete(job_id)
self._get_job_template_client().delete(job_id)
# check that job really deleted
job_list = self.client.jobs.list()
job_list = self._get_job_template_client().list()
self.assertNotIn(job_id, [job.id for job in job_list])
def test_job(self):