Merge "Adds TaskStub class"

This commit is contained in:
Jenkins 2014-04-08 20:41:48 +00:00 committed by Gerrit Code Review
commit 0741c4004a
13 changed files with 227 additions and 64 deletions

View File

@ -429,8 +429,8 @@ class TaskRepoProxy(glance.domain.proxy.TaskRepo):
self.context = context
super(TaskRepoProxy, self).__init__(task_repo)
def get_task_and_details(self, task_id):
task, task_details = self.task_repo.get_task_and_details(task_id)
def get_task_stub_and_details(self, task_id):
task, task_details = self.task_repo.get_task_stub_and_details(task_id)
return proxy_task(self.context, task), proxy_task_details(self.context,
task,
task_details)

View File

@ -380,9 +380,9 @@ class TaskRepoProxy(glance.domain.proxy.TaskRepo):
task_details_proxy_class=TaskDetailsProxy,
task_details_proxy_kwargs=proxy_kwargs)
def get_task_and_details(self, task_id):
def get_task_stub_and_details(self, task_id):
self.policy.enforce(self.context, 'get_task', {})
return super(TaskRepoProxy, self).get_task_and_details(task_id)
return super(TaskRepoProxy, self).get_task_stub_and_details(task_id)
def list_tasks(self, *args, **kwargs):
self.policy.enforce(self.context, 'get_tasks', {})

View File

@ -56,9 +56,9 @@ class TasksController(object):
task_repo = self.gateway.get_task_repo(req.context)
live_time = CONF.task.task_time_to_live
try:
new_task = task_factory.new_task(task_type=task['type'],
owner=req.context.owner,
task_time_to_live=live_time)
new_task = task_factory.new_task_stub(task_type=task['type'],
owner=req.context.owner,
task_time_to_live=live_time)
new_task_details = task_factory.new_task_details(new_task.task_id,
task['input'])
task_repo.add(new_task, new_task_details)
@ -104,7 +104,7 @@ class TasksController(object):
def get(self, req, task_id):
try:
task_repo = self.gateway.get_task_repo(req.context)
task, task_details = task_repo.get_task_and_details(task_id)
task, task_details = task_repo.get_task_stub_and_details(task_id)
except exception.NotFound as e:
msg = (_("Failed to find task %(task_id)s. Reason: %(reason)s") %
{'task_id': task_id, 'reason': unicode(e)})

View File

@ -283,8 +283,12 @@ class ImageMemberRepo(object):
class TaskRepo(object):
def _format_task_from_db(self, db_task):
return glance.domain.Task(
def __init__(self, context, db_api):
self.context = context
self.db_api = db_api
def _format_task_stub_from_db(self, db_task):
return glance.domain.TaskStub(
task_id=db_task['id'],
task_type=db_task['type'],
status=db_task['status'],
@ -302,7 +306,7 @@ class TaskRepo(object):
message=db_task['message'],
)
def _format_task_to_db(self, task, task_details=None):
def _format_task_stub_and_details_to_db(self, task, task_details=None):
task = {'id': task.task_id,
'type': task.type,
'status': task.status,
@ -323,17 +327,13 @@ class TaskRepo(object):
return task
def __init__(self, context, db_api):
self.context = context
self.db_api = db_api
def get_task_and_details(self, task_id):
def get_task_stub_and_details(self, task_id):
try:
db_api_task = self.db_api.task_get(self.context, task_id)
except (exception.NotFound, exception.Forbidden):
msg = _('Could not find task %s') % task_id
raise exception.NotFound(msg)
return (self._format_task_from_db(db_api_task),
return (self._format_task_stub_from_db(db_api_task),
self._format_task_details_from_db(db_api_task))
def list_tasks(self,
@ -348,10 +348,11 @@ class TaskRepo(object):
limit=limit,
sort_key=sort_key,
sort_dir=sort_dir)
return [self._format_task_from_db(task) for task in db_api_tasks]
return [self._format_task_stub_from_db(task) for task in db_api_tasks]
def save(self, task, task_details=None):
task_values = self._format_task_to_db(task, task_details)
task_values = self._format_task_stub_and_details_to_db(task,
task_details)
try:
updated_values = self.db_api.task_update(self.context,
task.task_id,
@ -362,13 +363,14 @@ class TaskRepo(object):
task.updated_at = updated_values['updated_at']
def add(self, task, task_details=None):
task_values = self._format_task_to_db(task, task_details)
task_values = self._format_task_stub_and_details_to_db(task,
task_details)
updated_values = self.db_api.task_create(self.context, task_values)
task.created_at = updated_values['created_at']
task.updated_at = updated_values['updated_at']
def remove(self, task):
task_values = self._format_task_to_db(task)
task_values = self._format_task_stub_and_details_to_db(task)
try:
self.db_api.task_update(self.context, task.task_id, task_values)
updated_values = self.db_api.task_delete(self.context,

View File

@ -311,7 +311,8 @@ class Task(object):
_supported_task_status = ('pending', 'processing', 'success', 'failure')
def __init__(self, task_id, task_type, status, owner,
expires_at, created_at, updated_at, task_time_to_live=48):
expires_at, created_at, updated_at,
task_input, result, message, task_time_to_live=48):
if task_type not in self._supported_task_type:
raise exception.InvalidTaskType(task_type)
@ -329,14 +330,14 @@ class Task(object):
self._time_to_live = datetime.timedelta(hours=task_time_to_live)
self.created_at = created_at
self.updated_at = updated_at
self.task_input = task_input
self.result = result
self.message = message
@property
def status(self):
return self._status
def run(self, executor):
pass
def _validate_task_status_transition(self, cur_status, new_status):
valid_transitions = {
'pending': ['processing', 'failure'],
@ -384,6 +385,27 @@ class Task(object):
self.expires_at = timeutils.utcnow() + self._time_to_live
class TaskStub(object):
def __init__(self, task_id, task_type, status, owner,
expires_at, created_at, updated_at, task_time_to_live=48):
self.task_id = task_id
self._status = status
self.type = task_type
self.owner = owner
self.expires_at = expires_at
self._time_to_live = datetime.timedelta(hours=task_time_to_live)
self.created_at = created_at
self.updated_at = updated_at
@property
def status(self):
return self._status
def run(self, executor):
pass
class TaskDetails(object):
def __init__(self, task_id, task_input, message, result):
@ -407,6 +429,28 @@ class TaskFactory(object):
created_at = timeutils.utcnow()
updated_at = created_at
return Task(
task_id,
task_type,
status,
owner,
expires_at,
created_at,
updated_at,
None, # input
None, # result
None, # message
task_time_to_live
)
def new_task_stub(self, task_type, owner, task_time_to_live=48):
task_id = str(uuid.uuid4())
status = 'pending'
# Note(nikhil): expires_at would be set on the task, only when it
# succeeds or fails.
expires_at = None
created_at = timeutils.utcnow()
updated_at = created_at
return TaskStub(
task_id,
task_type,
status,

View File

@ -55,8 +55,8 @@ class TaskRepo(object):
self.task_details_proxy_helper = Helper(task_details_proxy_class,
task_details_proxy_kwargs)
def get_task_and_details(self, task_id):
task, task_details = self.base.get_task_and_details(task_id)
def get_task_stub_and_details(self, task_id):
task, task_details = self.base.get_task_stub_and_details(task_id)
return (self.task_proxy_helper.proxy(task),
self.task_details_proxy_helper.proxy(task_details))
@ -178,9 +178,9 @@ class Task(object):
expires_at = _proxy('base', 'expires_at')
created_at = _proxy('base', 'created_at')
updated_at = _proxy('base', 'updated_at')
def run(self, executor):
self.base.run(executor)
task_input = _proxy('base', 'task_input')
result = _proxy('base', 'result')
message = _proxy('base', 'message')
def begin_processing(self):
self.base.begin_processing()
@ -192,6 +192,22 @@ class Task(object):
self.base.fail(message)
class TaskStub(object):
def __init__(self, base):
self.base = base
task_id = _proxy('base', 'task_id')
type = _proxy('base', 'type')
status = _proxy('base', 'status')
owner = _proxy('base', 'owner')
expires_at = _proxy('base', 'expires_at')
created_at = _proxy('base', 'created_at')
updated_at = _proxy('base', 'updated_at')
def run(self, executor):
self.base.run(executor)
class TaskDetails(object):
def __init__(self, base):
self.base = base
@ -218,6 +234,10 @@ class TaskFactory(object):
t = self.base.new_task(**kwargs)
return self.task_helper.proxy(t)
def new_task_stub(self, **kwargs):
t = self.base.new_task_stub(**kwargs)
return self.task_helper.proxy(t)
def new_task_details(self, task_id, task_input, message=None, result=None):
td = self.base.new_task_details(task_id, task_input, message, result)
return self.task_details_helper.proxy(td)

View File

@ -347,11 +347,6 @@ class TaskProxy(glance.domain.proxy.Task):
self.notifier = notifier
super(TaskProxy, self).__init__(task)
def run(self, executor):
self.notifier.info('task.run',
format_task_notification(self.task))
return super(TaskProxy, self).run(executor)
def begin_processing(self):
self.notifier.info(
'task.processing',
@ -370,6 +365,20 @@ class TaskProxy(glance.domain.proxy.Task):
return super(TaskProxy, self).fail(message)
class TaskStubProxy(glance.domain.proxy.TaskStub):
def __init__(self, task, context, notifier):
self.task = task
self.context = context
self.notifier = notifier
super(TaskStubProxy, self).__init__(task)
def run(self, executor):
self.notifier.info('task.run',
format_task_notification(self.task))
return super(TaskStubProxy, self).run(executor)
class TaskDetailsProxy(glance.domain.proxy.TaskDetails):
def __init__(self, task_details, context, notifier):

View File

@ -977,7 +977,7 @@ class TestTaskRepoProxy(utils.BaseTestCase):
def __init__(self, fixtures):
self.fixtures = fixtures
def get_task_and_details(self, task_id):
def get_task_stub_and_details(self, task_id):
for f in self.fixtures:
if f.task_id == task_id:
return f, None
@ -1005,12 +1005,13 @@ class TestTaskRepoProxy(utils.BaseTestCase):
)
def test_get_mutable_task(self):
task, _ = self.task_repo.get_task_and_details(self.fixtures[0].task_id)
task, _ = self.task_repo.get_task_stub_and_details(
self.fixtures[0].task_id)
self.assertEqual(task.task_id, self.fixtures[0].task_id)
def test_get_immutable_task(self):
task_id = self.fixtures[1].task_id
task, task_details = self.task_repo.get_task_and_details(task_id)
task, task_details = self.task_repo.get_task_stub_and_details(task_id)
self.assertRaises(exception.Forbidden,
setattr,
task_details,

View File

@ -562,7 +562,7 @@ class TestTaskRepo(test_utils.BaseTestCase):
[self.db.task_create(None, task) for task in self.tasks]
def test_get(self):
task, task_details = self.task_repo.get_task_and_details(UUID1)
task, task_details = self.task_repo.get_task_stub_and_details(UUID1)
self.assertEqual(task.task_id, UUID1)
self.assertEqual(task.type, 'import')
self.assertEqual(task.status, 'pending')
@ -573,12 +573,12 @@ class TestTaskRepo(test_utils.BaseTestCase):
def test_get_not_found(self):
self.assertRaises(exception.NotFound,
self.task_repo.get_task_and_details,
self.task_repo.get_task_stub_and_details,
str(uuid.uuid4()))
def test_get_forbidden(self):
self.assertRaises(exception.NotFound,
self.task_repo.get_task_and_details,
self.task_repo.get_task_stub_and_details,
UUID4)
def test_list(self):
@ -635,24 +635,24 @@ class TestTaskRepo(test_utils.BaseTestCase):
self.task_repo.add(task, task_details)
retrieved_task, retrieved_task_details = \
self.task_repo.get_task_and_details(task.task_id)
self.task_repo.get_task_stub_and_details(task.task_id)
self.assertEqual(retrieved_task.updated_at, task.updated_at)
self.assertEqual(retrieved_task_details.task_id,
retrieved_task.task_id)
self.assertEqual(retrieved_task_details.input, task_details.input)
def test_save_task(self):
task, task_details = self.task_repo.get_task_and_details(UUID1)
task, task_details = self.task_repo.get_task_stub_and_details(UUID1)
original_update_time = task.updated_at
self.task_repo.save(task)
current_update_time = task.updated_at
self.assertTrue(current_update_time > original_update_time)
task, task_details = self.task_repo.get_task_and_details(UUID1)
task, task_details = self.task_repo.get_task_stub_and_details(UUID1)
self.assertEqual(task.updated_at, current_update_time)
def test_remove_task(self):
task, task_details = self.task_repo.get_task_and_details(UUID1)
task, task_details = self.task_repo.get_task_stub_and_details(UUID1)
self.task_repo.remove(task)
self.assertRaises(exception.NotFound,
self.task_repo.get_task_and_details,
self.task_repo.get_task_stub_and_details,
task.task_id)

View File

@ -23,7 +23,6 @@ from oslo.config import cfg
from glance.common import exception
from glance import domain
from glance.openstack.common import timeutils
import glance.tests.unit.utils as unittest_utils
import glance.tests.utils as test_utils
@ -308,11 +307,28 @@ class TestTaskFactory(test_utils.BaseTestCase):
task_type = 'import'
owner = TENANT1
task = self.task_factory.new_task(task_type, owner)
self.assertTrue(task.task_id is not None)
self.assertTrue(task.created_at is not None)
self.assertIsNotNone(task.task_id)
self.assertEqual('pending', task.status)
self.assertEqual(task_type, task.type)
self.assertEqual(owner, task.owner)
self.assertIsNone(task.expires_at)
self.assertIsNotNone(task.created_at)
self.assertEqual(task.created_at, task.updated_at)
self.assertIsNone(task.task_input)
self.assertIsNone(task.result)
self.assertIsNone(task.message)
def test_new_task_stub(self):
task_type = 'import'
owner = TENANT1
task = self.task_factory.new_task_stub(task_type, owner)
self.assertIsNotNone(task.task_id)
self.assertEqual('pending', task.status)
self.assertEqual(task_type, task.type)
self.assertEqual(owner, task.owner)
self.assertIsNone(task.expires_at)
self.assertIsNotNone(task.created_at)
self.assertEqual(task.created_at, task.updated_at)
self.assertEqual(task.status, 'pending')
self.assertEqual(task.owner, TENANT1)
def test_new_task_invalid_type(self):
task_type = 'blah'
@ -347,7 +363,6 @@ class TestTask(test_utils.BaseTestCase):
task_type = 'import'
owner = TENANT1
task_ttl = CONF.task.task_time_to_live
self.gateway = unittest_utils.FakeGateway()
self.task = self.task_factory.new_task(task_type,
owner,
task_time_to_live=task_ttl)
@ -364,7 +379,10 @@ class TestTask(test_utils.BaseTestCase):
owner=None,
expires_at=None,
created_at=timeutils.utcnow(),
updated_at=timeutils.utcnow()
updated_at=timeutils.utcnow(),
task_input=None,
result=None,
message=None
)
def test_validate_status_transition_from_pending(self):
@ -429,6 +447,8 @@ class TestTask(test_utils.BaseTestCase):
self.task.begin_processing()
self.task.succeed('{"location": "file://home"}')
self.assertEqual(self.task.status, 'success')
self.assertEqual(self.task.result, '{"location": "file://home"}')
self.assertEqual(self.task.message, None)
expected = (timeutils.utcnow() +
datetime.timedelta(hours=CONF.task.task_time_to_live))
self.assertEqual(
@ -442,6 +462,8 @@ class TestTask(test_utils.BaseTestCase):
self.task.begin_processing()
self.task.fail('{"message": "connection failed"}')
self.assertEqual(self.task.status, 'failure')
self.assertEqual(self.task.message, '{"message": "connection failed"}')
self.assertEqual(self.task.result, None)
expected = (timeutils.utcnow() +
datetime.timedelta(hours=CONF.task.task_time_to_live))
self.assertEqual(
@ -450,6 +472,49 @@ class TestTask(test_utils.BaseTestCase):
)
class TestTaskStub(test_utils.BaseTestCase):
def setUp(self):
super(TestTaskStub, self).setUp()
self.task_id = str(uuid.uuid4())
self.task_type = 'import'
self.owner = TENANT1
self.task_ttl = CONF.task.task_time_to_live
def test_task_stub_init(self):
self.task_factory = domain.TaskFactory()
task = domain.TaskStub(
self.task_id,
self.task_type,
'status',
self.owner,
'expires_at',
'created_at',
'updated_at',
task_time_to_live=self.task_ttl
)
self.assertEqual(self.task_id, task.task_id)
self.assertEqual(self.task_type, task.type)
self.assertEqual(self.owner, task.owner)
self.assertEqual('status', task.status)
self.assertEqual('expires_at', task.expires_at)
self.assertEqual('created_at', task.created_at)
self.assertEqual('updated_at', task.updated_at)
def test_task_stub_get_status(self):
status = 'pending'
task = domain.TaskStub(
self.task_id,
self.task_type,
status,
self.owner,
'expires_at',
'created_at',
'updated_at',
task_time_to_live=self.task_ttl
)
self.assertEqual(status, task.status)
class TestTaskDetails(test_utils.BaseTestCase):
def setUp(self):
super(TestTaskDetails, self).setUp()

View File

@ -63,10 +63,12 @@ class ImageRepoStub(object):
return ['images_from_list']
class TaskStub(glance.domain.Task):
class TaskStub(glance.domain.TaskStub):
def run(self, executor):
pass
class Task(glance.domain.Task):
def succeed(self, result):
pass
@ -402,17 +404,31 @@ class TestTaskNotifications(utils.BaseTestCase):
def setUp(self):
super(TestTaskNotifications, self).setUp()
self.task = TaskStub(
task_input = {"loc": "fake"}
self.task_stub = TaskStub(
task_id='aaa',
task_type='import',
status='pending',
owner=TENANT2,
expires_at=None,
created_at=DATETIME,
updated_at=DATETIME
updated_at=DATETIME,
)
self.task = Task(
task_id='aaa',
task_type='import',
status='pending',
owner=TENANT2,
expires_at=None,
created_at=DATETIME,
updated_at=DATETIME,
task_input=task_input,
result='res',
message='blah'
)
self.task_details = domain.TaskDetails(task_id=self.task.task_id,
task_input={"loc": "fake"},
task_input=task_input,
result='',
message='')
self.context = glance.context.RequestContext(
@ -431,6 +447,11 @@ class TestTaskNotifications(utils.BaseTestCase):
self.context,
self.notifier
)
self.task_stub_proxy = glance.notifier.TaskStubProxy(
self.task_stub,
self.context,
self.notifier
)
self.task_details_proxy = notifier.TaskDetailsProxy(self.task_details,
self.context,
self.notifier)
@ -443,7 +464,8 @@ class TestTaskNotifications(utils.BaseTestCase):
self.patcher.stop()
def test_task_create_notification(self):
self.task_repo_proxy.add(self.task_proxy, self.task_details_proxy)
self.task_repo_proxy.add(self.task_stub_proxy,
self.task_details_proxy)
output_logs = self.notifier.get_logs()
self.assertEqual(len(output_logs), 1)
output_log = output_logs[0]
@ -463,7 +485,7 @@ class TestTaskNotifications(utils.BaseTestCase):
def test_task_delete_notification(self):
now = timeutils.isotime()
self.task_repo_proxy.remove(self.task_proxy)
self.task_repo_proxy.remove(self.task_stub_proxy)
output_logs = self.notifier.get_logs()
self.assertEqual(len(output_logs), 1)
output_log = output_logs[0]
@ -486,7 +508,7 @@ class TestTaskNotifications(utils.BaseTestCase):
self.fail('Notification contained location field.')
def test_task_run_notification(self):
self.task_proxy.run(executor=None)
self.task_stub_proxy.run(executor=None)
output_logs = self.notifier.get_logs()
self.assertEqual(len(output_logs), 1)
output_log = output_logs[0]

View File

@ -85,7 +85,7 @@ class ImageMembershipStub(object):
class TaskRepoStub(object):
def get_task_and_details(self, *args, **kwargs):
def get_task_stub_and_details(self, *args, **kwargs):
return 'task_from_get', 'task_details_from_get'
def add(self, *args, **kwargs):
@ -386,7 +386,7 @@ class TestTaskPolicy(test_utils.BaseTestCase):
self.policy
)
self.assertRaises(exception.Forbidden,
task_repo.get_task_and_details,
task_repo.get_task_stub_and_details,
UUID1)
def test_get_task_allowed(self):
@ -397,7 +397,7 @@ class TestTaskPolicy(test_utils.BaseTestCase):
{},
self.policy
)
task, task_details = task_repo.get_task_and_details(UUID1)
task, task_details = task_repo.get_task_stub_and_details(UUID1)
self.assertIsInstance(task, glance.api.policy.TaskProxy)
self.assertEqual(task.task, 'task_from_get')

View File

@ -72,7 +72,7 @@ def _domain_fixture(task_id, **kwargs):
'created_at': kwargs.get('created_at', default_datetime),
'updated_at': kwargs.get('updated_at', default_datetime),
}
task = glance.domain.Task(**task_properties)
task = glance.domain.TaskStub(**task_properties)
task_details = glance.domain.TaskDetails(task_id,
kwargs.get('input', {}),
kwargs.get('message', None),