Fix queues actions for pooling

Based on current design, Zaqar is storing queues in management database
instead of message database. As a result, it's not necessary now to look
up pools to get the correct queue controller. This patch fixes the pooling
issue and the test.

Closes-Bug: #1679888

Depends-On: I5195911d22d5935aa3de105fe735ac159276a025

Co-Authored-By: wangxiyuan<wangxiyuan@huawei.com>
Change-Id: Ie20e0c611dc0756818e4612f33ebc2ab2ecd2990
This commit is contained in:
Fei Long Wang 2017-04-05 14:57:50 +12:00 committed by wangxiyuan
parent c9bbd0b1bc
commit fe4f1e59ae
11 changed files with 71 additions and 37 deletions

View File

@ -159,7 +159,7 @@ class DataDriver(storage.DataDriverBase):
class QueueController(storage.Queue):
"""Routes operations to a queue controller in the appropriate pool.
"""Routes operations to get the appropriate queue controller.
:param pool_catalog: a catalog of available pools
:type pool_catalog: queues.pooling.base.Catalog
@ -168,16 +168,14 @@ class QueueController(storage.Queue):
def __init__(self, pool_catalog):
super(QueueController, self).__init__(None)
self._pool_catalog = pool_catalog
self._mgt_queue_ctrl = self._pool_catalog.control.queue_controller
self._get_controller = self._pool_catalog.get_queue_controller
def _list(self, project=None, marker=None,
limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False):
def all_pages():
pool = self._pool_catalog.get_default_pool()
if pool is None:
raise errors.NoPoolFound()
yield next(pool.queue_controller.list(
yield next(self._mgt_queue_ctrl.list(
project=project,
marker=marker,
limit=limit,
@ -218,17 +216,15 @@ class QueueController(storage.Queue):
# however. If between the time we register a queue and go to
# look it up, the queue is deleted, then this assertion will
# fail.
control = self._get_controller(name, project)
if not control:
pool = self._pool_catalog.lookup(name, project)
if not pool:
raise RuntimeError('Failed to register queue')
return control.create(name, metadata=metadata, project=project)
return self._mgt_queue_ctrl.create(name, metadata=metadata,
project=project)
def _delete(self, name, project=None):
# NOTE(cpp-cabrera): If we fail to find a project/queue in the
# catalogue for a delete, just ignore it.
control = self._get_controller(name, project)
if control:
mqHandler = self._get_controller(name, project)
if mqHandler:
# NOTE(cpp-cabrera): delete from the catalogue first. If
# zaqar crashes in the middle of these two operations,
# it is desirable that the entry be missing from the
@ -239,34 +235,24 @@ class QueueController(storage.Queue):
# latter case is more difficult to reason about, and may
# yield 500s in some operations.
self._pool_catalog.deregister(name, project)
ret = control.delete(name, project)
return ret
mqHandler.delete(name, project)
return None
return self._mgt_queue_ctrl.delete(name, project)
def _exists(self, name, project=None):
control = self._get_controller(name, project)
if control:
return control.exists(name, project=project)
return False
return self._mgt_queue_ctrl.exists(name, project=project)
def get_metadata(self, name, project=None):
control = self._get_controller(name, project)
if control:
return control.get_metadata(name, project=project)
raise errors.QueueDoesNotExist(name, project)
return self._mgt_queue_ctrl.get_metadata(name, project=project)
def set_metadata(self, name, metadata, project=None):
control = self._get_controller(name, project)
if control:
return control.set_metadata(name, metadata=metadata,
project=project)
raise errors.QueueDoesNotExist(name, project)
return self._mgt_queue_ctrl.set_metadata(name, metadata=metadata,
project=project)
def _stats(self, name, project=None):
control = self._get_controller(name, project)
if control:
return control.stats(name, project=project)
mqHandler = self._get_controller(name, project)
if mqHandler:
return mqHandler.stats(name, project=project)
raise errors.QueueDoesNotExist(name, project)

View File

@ -145,6 +145,9 @@ class ClaimController(storage.Claim):
return claim_id, claimed
def update(self, queue, claim_id, metadata, project=None):
if not self._queue_ctrl.exists(queue, project):
raise errors.QueueDoesNotExist(queue, project)
container = utils._claim_container(queue, project)
try:
headers, obj = self._client.get_object(container, claim_id)

View File

@ -312,7 +312,13 @@ class MessageQueueHandler(object):
total = 0
claimed = 0
container = utils._message_container(name, project)
_, objects = self._client.get_container(container)
try:
_, objects = self._client.get_container(container)
except swiftclient.ClientException as exc:
if exc.http_status == 404:
raise errors.QueueIsEmpty(name, project)
newest = None
oldest = None
now = timeutils.utcnow_ts(True)

View File

@ -15,3 +15,6 @@ database = zaqar_test_pooled
[drivers:management_store:mongodb]
uri = mongodb://127.0.0.1:27017
database = zaqar_test
[pooling:catalog]
enable_virtual_pool = True

View File

@ -0,0 +1,20 @@
[DEFAULT]
pooling = True
admin_mode = True
unreliable = True
enable_deprecated_api_versions = 1,1.1
[drivers]
transport = wsgi
message_store = mongodb
[drivers:message_store:mongodb]
uri = mongodb://127.0.0.1:27017
database = zaqar_test_pooled
[drivers:management_store:mongodb]
uri = mongodb://127.0.0.1:27017
database = zaqar_test
[pooling:catalog]
enable_virtual_pool = False

View File

@ -14,4 +14,7 @@ reconnect_sleep = 1
[drivers:management_store:redis]
uri = redis://127.0.0.1:6379
max_reconnect_attempts = 3
reconnect_sleep = 1
reconnect_sleep = 1
[pooling:catalog]
enable_virtual_pool = True

View File

@ -12,3 +12,5 @@ bind = 0.0.0.0
port = 8888
workers = 20
[pooling:catalog]
enable_virtual_pool = True

View File

@ -29,7 +29,7 @@ from zaqar import tests as testing
@testing.requires_mongodb
class PoolCatalogTest(testing.TestBase):
config_file = 'wsgi_mongodb_pooled.conf'
config_file = 'wsgi_mongodb_pooled_disable_virtual_pool.conf'
def setUp(self):
super(PoolCatalogTest, self).setUp()

View File

@ -54,6 +54,15 @@ class Resource(object):
resp.body = utils.to_json(resp_dict)
# status defaults to 200
except storage_errors.QueueIsEmpty as ex:
resp_dict = {
'messages': {
'claimed': 0,
'free': 0,
'total': 0
}
}
resp.body = utils.to_json(resp_dict)
except storage_errors.DoesNotExist as ex:
LOG.debug(ex)
raise wsgi_errors.HTTPNotFound(six.text_type(ex))

View File

@ -53,7 +53,8 @@ class Resource(object):
resp.body = utils.to_json(resp_dict)
# status defaults to 200
except storage_errors.QueueDoesNotExist as ex:
except (storage_errors.QueueDoesNotExist,
storage_errors.QueueIsEmpty) as ex:
resp_dict = {
'messages': {
'claimed': 0,

View File

@ -57,7 +57,8 @@ class Resource(object):
resp.body = utils.to_json(resp_dict)
# status defaults to 200
except storage_errors.QueueDoesNotExist as ex:
except (storage_errors.QueueDoesNotExist,
storage_errors.QueueIsEmpty) as ex:
resp_dict = {
'messages': {
'claimed': 0,