Add image.task resource

This change implements a Task resource of the image service with a 
wait_for_task method to wait for task to reach certain (normally 
'success') status

Change-Id: Ib74bb59bb06b6753720fc5047af1168c3cd66898
This commit is contained in:
Artem Goncharov 2019-02-19 18:34:44 +01:00
parent a185a7fd2d
commit 4417a7e646
8 changed files with 227 additions and 0 deletions

View File

@ -40,3 +40,13 @@ Member Operations
.. automethod:: openstack.image.v2._proxy.Proxy.get_member
.. automethod:: openstack.image.v2._proxy.Proxy.find_member
.. automethod:: openstack.image.v2._proxy.Proxy.members
Task Operations
^^^^^^^^^^^^^^^
.. autoclass:: openstack.image.v2._proxy.Proxy
.. automethod:: openstack.image.v2._proxy.Proxy.tasks
.. automethod:: openstack.image.v2._proxy.Proxy.create_task
.. automethod:: openstack.image.v2._proxy.Proxy.get_task
.. automethod:: openstack.image.v2._proxy.Proxy.wait_for_task

View File

@ -14,3 +14,4 @@ Image v2 Resources
v2/image
v2/member
v2/task

View File

@ -0,0 +1,12 @@
openstack.image.v2.task
=======================
.. automodule:: openstack.image.v2.task
The Task Class
--------------
The ``Task`` class inherits from :class:`~openstack.resource.Resource`.
.. autoclass:: openstack.image.v2.task.Task
:members:

View File

@ -14,6 +14,7 @@ from openstack import exceptions
from openstack.image.v2 import image as _image
from openstack.image.v2 import member as _member
from openstack.image.v2 import schema as _schema
from openstack.image.v2 import task as _task
from openstack import proxy
from openstack import resource
@ -348,3 +349,64 @@ class Proxy(proxy.Proxy):
"""
return self._get(_schema.Schema, requires_id=False,
base_path='/schemas/member')
def tasks(self, **query):
"""Return a generator of tasks
:param kwargs query: Optional query parameters to be sent to limit
the resources being returned.
:returns: A generator of task objects
:rtype: :class:`~openstack.image.v2.task.Task`
"""
return self._list(_task.Task, **query)
def get_task(self, task):
"""Get task details
:param task: The value can be the ID of a task or a
:class:`~openstack.image.v2.task.Task` instance.
:returns: One :class:`~openstack.image.v2.task.Task`
:raises: :class:`~openstack.exceptions.ResourceNotFound`
when no resource can be found.
"""
return self._get(_task.Task, task)
def create_task(self, **attrs):
"""Create a new task from attributes
:param dict attrs: Keyword arguments which will be used to create
a :class:`~openstack.image.v2.task.Task`,
comprised of the properties on the Task class.
:returns: The results of task creation
:rtype: :class:`~openstack.image.v2.task.Task`
"""
return self._create(_task.Task, **attrs)
def wait_for_task(self, task, status='success', failures=None,
interval=2, wait=120):
"""Wait for a task to be in a particular status.
:param task: The resource to wait on to reach the specified status.
The resource must have a ``status`` attribute.
:type resource: A :class:`~openstack.resource.Resource` object.
:param status: Desired status.
:param failures: Statuses that would be interpreted as failures.
:type failures: :py:class:`list`
:param interval: Number of seconds to wait before to consecutive
checks. Default to 2.
:param wait: Maximum number of seconds to wait before the change.
Default to 120.
:returns: The resource is returned on success.
:raises: :class:`~openstack.exceptions.ResourceTimeout` if transition
to the desired status failed to occur in specified seconds.
:raises: :class:`~openstack.exceptions.ResourceFailure` if the resource
has transited to one of the failure statuses.
:raises: :class:`~AttributeError` if the resource does not have a
``status`` attribute.
"""
failures = ['failure'] if failures is None else failures
return resource.wait_for_status(
self, task, status, failures, interval, wait)

View File

@ -0,0 +1,50 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack import resource
class Task(resource.Resource):
resources_key = 'tasks'
base_path = '/tasks'
# capabilities
allow_create = True
allow_fetch = True
allow_list = True
_query_mapping = resource.QueryParameters(
'type', 'status', 'sort_dir', 'sort_key'
)
#: The date and time when the task was created.
created_at = resource.Body('created_at')
#: The date and time when the task is subject to removal.
expires_at = resource.Body('expires_at')
#: A JSON object specifying the input parameters to the task.
input = resource.Body('input')
#: Human-readable text, possibly an empty string, usually displayed
#: in an error situation to provide more information about what
#: has occurred.
message = resource.Body('message')
#: The ID of the owner, or project, of the task.
owner_id = resource.Body('owner')
#: A JSON object specifying the outcome of the task.
result = resource.Body('result')
#: The URL for schema of the task.
schema = resource.Body('schema')
#: The status of the task.
status = resource.Body('status')
#: The type of task represented by this content.
type = resource.Body('type')
#: The date and time when the task was updated.
updated_at = resource.Body('updated_at')

View File

@ -61,3 +61,7 @@ class TestImage(base.BaseFunctionalTest):
def test_get_member_schema(self):
schema = self.conn.image.get_member_schema()
self.assertIsNotNone(schema)
def test_list_tasks(self):
tasks = self.conn.image.tasks()
self.assertIsNotNone(tasks)

View File

@ -17,6 +17,7 @@ from openstack.image.v2 import _proxy
from openstack.image.v2 import image
from openstack.image.v2 import member
from openstack.image.v2 import schema
from openstack.image.v2 import task
from openstack.tests.unit.image.v2 import test_image as fake_image
from openstack.tests.unit import test_proxy_base
@ -183,3 +184,19 @@ class TestImageProxy(test_proxy_base.TestProxyBase):
expected_args=[schema.Schema],
expected_kwargs={'base_path': '/schemas/member',
'requires_id': False})
def test_task_get(self):
self.verify_get(self.proxy.get_task, task.Task)
def test_tasks(self):
self.verify_list(self.proxy.tasks, task.Task)
def test_task_create(self):
self.verify_create(self.proxy.create_task, task.Task)
def test_task_wait_for(self):
value = task.Task(id='1234')
self.verify_wait_for_status(
self.proxy.wait_for_task,
method_args=[value],
expected_args=[value, 'success', ['failure'], 2, 120])

View File

@ -0,0 +1,71 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack.tests.unit import base
from openstack.image.v2 import task
IDENTIFIER = 'IDENTIFIER'
EXAMPLE = {
'created_at': '2016-06-24T14:40:19Z',
'id': IDENTIFIER,
'input': {
'image_properties': {
'container_format': 'ovf',
'disk_format': 'vhd'
},
'import_from': 'http://example.com',
'import_from_format': 'qcow2'
},
'message': 'message',
'owner': 'fa6c8c1600f4444281658a23ee6da8e8',
'result': 'some result',
'schema': '/v2/schemas/task',
'status': 'processing',
'type': 'import',
'updated_at': '2016-06-24T14:40:20Z'
}
class TestTask(base.TestCase):
def test_basic(self):
sot = task.Task()
self.assertIsNone(sot.resource_key)
self.assertEqual('tasks', sot.resources_key)
self.assertEqual('/tasks', sot.base_path)
self.assertTrue(sot.allow_create)
self.assertTrue(sot.allow_fetch)
self.assertFalse(sot.allow_commit)
self.assertFalse(sot.allow_delete)
self.assertTrue(sot.allow_list)
self.assertDictEqual({'limit': 'limit',
'marker': 'marker',
'sort_dir': 'sort_dir',
'sort_key': 'sort_key',
'status': 'status',
'type': 'type',
},
sot._query_mapping._mapping)
def test_make_it(self):
sot = task.Task(**EXAMPLE)
self.assertEqual(IDENTIFIER, sot.id)
self.assertEqual(EXAMPLE['created_at'], sot.created_at)
self.assertEqual(EXAMPLE['input'], sot.input)
self.assertEqual(EXAMPLE['message'], sot.message)
self.assertEqual(EXAMPLE['owner'], sot.owner_id)
self.assertEqual(EXAMPLE['result'], sot.result)
self.assertEqual(EXAMPLE['schema'], sot.schema)
self.assertEqual(EXAMPLE['status'], sot.status)
self.assertEqual(EXAMPLE['type'], sot.type)
self.assertEqual(EXAMPLE['updated_at'], sot.updated_at)