Add image task client and image tests task APIs.

This patch add task APIs as in the doc following and tests for task APIs.

[doc]https://docs.openstack.org/api-ref/image/v2/#tasks

Change-Id: I237e0467a0e8edde0b2858551d8bc01e8ba410e0
This commit is contained in:
msava 2023-07-18 11:27:04 +03:00
parent 274878649f
commit 88660d436b
7 changed files with 371 additions and 1 deletions

View File

@ -0,0 +1,54 @@
---
features:
- |
The following ``tasks_client`` tempest client for glance v2 image
task API is implemented in this release.

View File

@ -0,0 +1,139 @@
# Copyright 2023 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.image import base
from tempest.common import waiters
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
CONF = config.CONF
class ImageTaskCreate(base.BaseV2ImageAdminTest):
"""Test image task operations"""
@classmethod
def skip_checks(cls):
# TODO(msava): Add additional skipcheck with task conversion_format and
# glance ceph backend then will be available
# in tempest image service config options.
super(ImageTaskCreate, cls).skip_checks()
if not CONF.image.http_image:
skip_msg = ("%s skipped as http_image is not available " %
cls.__name__)
raise cls.skipException(skip_msg)
@classmethod
def resource_setup(cls):
super(ImageTaskCreate, cls).resource_setup()
@staticmethod
def _prepare_image_tasks_param(type="import",
disk_format=['qcow2'],
image_from_format=['qcow2'],
image_location=CONF.image.http_image):
# TODO(msava): Need to add additional disk formats then
# task conversion_format and glance Ceph backend will be
# available in image service options
"""Prepare image task params.
By default, will create task type 'import'
The same index is used for both params and creates a task
:param type Type of the task.
:param disk_format: Each format in the list is a different task.
:param image_from_format: Each format in the list is a different task.
:param image_location Location to import image from.
:return: A list with all task.
"""
i = 0
tasks = list()
while i < len(disk_format):
image_name = data_utils.rand_name("task_image")
image_property = {"container_format": "bare",
"disk_format": disk_format[0],
"visibility": "public",
"name": image_name
}
task = {
"type": type,
"input": {
"image_properties": image_property,
"import_from_format": image_from_format[0],
"import_from": image_location
}
}
tasks.append(task)
i += 1
return tasks
def _verify_disk_format(self, task_body):
expected_disk_format = \
task_body['input']['image_properties']['disk_format']
image_id = task_body['result']['image_id']
observed_disk_format = self.admin_client.show_image(
image_id)['disk_format']
# If glance backend storage is Ceph glance will convert
# image to raw format.
# TODO(msava): Need to change next lines once task conversion_format
# and glance ceph backend will be available in image service options
if observed_disk_format == 'raw':
return
self.assertEqual(observed_disk_format, expected_disk_format,
message="Expected disk format not match ")
@decorators.idempotent_id('669d5387-0340-4abf-b62d-7cc89f539c8c')
def test_image_tasks_create(self):
"""Test task type 'import' image """
# Prepare params for task type 'import'
tasks = self._prepare_image_tasks_param()
# Create task type 'import'
body = self.os_admin.tasks_client.create_task(**tasks[0])
task_id = body['id']
task_body = waiters.wait_for_tasks_status(self.os_admin.tasks_client,
task_id, 'success')
self.addCleanup(self.admin_client.delete_image,
task_body['result']['image_id'])
task_image_id = task_body['result']['image_id']
waiters.wait_for_image_status(self.client, task_image_id, 'active')
self._verify_disk_format(task_body)
# Verify disk format
image_body = self.client.show_image(task_image_id)
task_disk_format = \
task_body['input']['image_properties']['disk_format']
image_disk_format = image_body['disk_format']
self.assertEqual(
image_disk_format, task_disk_format,
message="Image Disc format %s not match to expected %s"
% (image_disk_format, task_disk_format))
@decorators.idempotent_id("ad6450c6-7060-4ee7-a2d1-41c2604b446c")
@decorators.attr(type=['negative'])
def test_task_create_fake_image_location(self):
http_fake_url = ''.join(
["http://", data_utils.rand_name('dummy-img-file'), ".qcow2"])
task = self._prepare_image_tasks_param(
image_from_format=['qcow2'],
disk_format=['qcow2'],
image_location=http_fake_url)
body = self.os_admin.tasks_client.create_task(**task[0])
task_observed = \
waiters.wait_for_tasks_status(self.os_admin.tasks_client,
body['id'], 'failure')
task_observed = task_observed['status']
self.assertEqual(task_observed, 'failure')

View File

@ -97,6 +97,7 @@ class Manager(clients.ServiceClients):
self.image_v2.NamespacePropertiesClient()
self.namespace_tags_client = self.image_v2.NamespaceTagsClient()
self.image_versions_client = self.image_v2.VersionsClient()
self.tasks_client = self.image_v2.TaskClient()
# NOTE(danms): If no alternate endpoint is configured,
# this client will work the same as the base self.images_client.
# If your test needs to know if these are different, check the

View File

@ -222,6 +222,24 @@ def wait_for_image_tasks_status(client, image_id, status):
raise lib_exc.TimeoutException(message)
def wait_for_tasks_status(client, task_id, status):
start = int(time.time())
while int(time.time()) - start < client.build_timeout:
task = client.show_tasks(task_id)
if task['status'] == status:
return task
time.sleep(client.build_interval)
message = ('Task %(task_id)s tasks: '
'failed to reach %(status)s state within the required '
'time (%(timeout)s s).' % {'task_id': task_id,
'status': status,
'timeout': client.build_timeout})
caller = test_utils.find_test_caller()
if caller:
message = '(%s) %s' % (caller, message)
raise lib_exc.TimeoutException(message)
def wait_for_image_imported_to_stores(client, image_id, stores=None):
"""Waits for an image to be imported to all requested stores.

View File

@ -27,9 +27,11 @@ from tempest.lib.services.image.v2.namespaces_client import NamespacesClient
from tempest.lib.services.image.v2.resource_types_client import \
ResourceTypesClient
from tempest.lib.services.image.v2.schemas_client import SchemasClient
from tempest.lib.services.image.v2.tasks_client import TaskClient
from tempest.lib.services.image.v2.versions_client import VersionsClient
__all__ = ['ImageMembersClient', 'ImagesClient', 'ImageCacheClient',
'NamespaceObjectsClient', 'NamespacePropertiesClient',
'NamespaceTagsClient', 'NamespacesClient', 'ResourceTypesClient',
'SchemasClient', 'VersionsClient']
'SchemasClient', 'TaskClient', 'VersionsClient']

View File

@ -0,0 +1,70 @@
# Copyright 2023 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from urllib import parse as urllib
from oslo_serialization import jsonutils as json
from tempest.lib.common import rest_client
CHUNKSIZE = 1024 * 64 # 64kB
class TaskClient(rest_client.RestClient):
api_version = "v2"
def create_task(self, **kwargs):
"""Create a task.
For a full list of available parameters, please refer to the official
API reference:
https://developer.openstack.org/api-ref/image/v2/#create-task
"""
data = json.dumps(kwargs)
resp, body = self.post('tasks', data)
self.expected_success(201, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def list_tasks(self, **kwargs):
"""List tasks.
For a full list of available parameters, please refer to the official
API reference:
https://developer.openstack.org/api-ref/image/v2/#list-tasks
"""
url = 'tasks'
if kwargs:
url += '?%s' % urllib.urlencode(kwargs)
resp, body = self.get(url)
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
def show_tasks(self, task_id):
"""Show task details.
For a full list of available parameters, please refer to the official
API reference:
https://docs.openstack.org/api-ref/image/v2/#show-task-details
"""
url = 'tasks/%s' % task_id
resp, body = self.get(url)
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)

View File

@ -0,0 +1,86 @@
# Copyright 2023 Red Hat, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.lib.services.image.v2 import tasks_client
from tempest.tests.lib import fake_auth_provider
from tempest.tests.lib.services import base
class TestImageTaskClient(base.BaseServiceTest):
def setUp(self):
super(TestImageTaskClient, self).setUp()
fake_auth = fake_auth_provider.FakeAuthProvider()
self.client = tasks_client.TaskClient(
fake_auth, 'image', 'regionOne')
def test_list_task(self):
fake_result = {
"first": "/v2/tasks",
"schema": "/v2/schemas/tasks",
"tasks": [
{
"id": "08b7e1c8-3821-4f54-b3b8-d6655d178cdf",
"owner": "fa6c8c1600f4444281658a23ee6da8e8",
"schema": "/v2/schemas/task",
"self": "/v2/tasks/08b7e1c8-3821-4f54-b3b8-d6655d178cdf",
"status": "processing",
"type": "import"
},
{
"id": "231c311d-3557-4e23-afc4-6d98af1419e7",
"owner": "fa6c8c1600f4444281658a23ee6da8e8",
"schema": "/v2/schemas/task",
"self": "/v2/tasks/231c311d-3557-4e23-afc4-6d98af1419e7",
"status": "processing",
"type": "import"
}
]
}
self.check_service_client_function(
self.client.list_tasks,
'tempest.lib.common.rest_client.RestClient.get',
fake_result,
mock_args=['tasks'])
def test_create_task(self):
fake_result = {
"type": "import",
"input": {
"import_from":
"http://download.cirros-cloud.net/0.6.1/ \
cirros-0.6.1-x86_64-disk.img",
"import_from_format": "qcow2",
"image_properties": {
"disk_format": "qcow2",
"container_format": "bare"
}
}
}
self.check_service_client_function(
self.client.create_task,
'tempest.lib.common.rest_client.RestClient.post',
fake_result,
status=201)
def test_show_task(self):
fake_result = {
"task_id": "08b7e1c8-3821-4f54-b3b8-d6655d178cdf"
}
self.check_service_client_function(
self.client.show_tasks,
'tempest.lib.common.rest_client.RestClient.get',
fake_result,
status=200,
task_id="e485aab9-0907-4973-921c-bb6da8a8fcf8")