Add update queue function in v2

This patch did:
1. Add two commands: "set/get metadata" in API v2.
2. As v1.1 and v2 don't contain the queue.exist() function,
and the queue exist check has been done in server side. We
should only use it in v1.0.
3. As v1.1 doesn't support PATCH in zaqar server side,
we should not allow set metadata in v1.1.

DocImpact

Closes-bug: #1554326
Change-Id: I01b523ece09e87689516ecccf0c2c7795db46bb7
This commit is contained in:
wangxiyuan 2016-03-18 10:37:32 +08:00
parent 1ef277f7a0
commit 2854847fb4
9 changed files with 244 additions and 12 deletions

View File

@ -73,6 +73,8 @@ openstack.messaging.v2 =
queue_create = zaqarclient.queues.v2.cli:CreateQueue
queue_delete = zaqarclient.queues.v2.cli:DeleteQueue
queue_stats = zaqarclient.queues.v2.cli:GetQueueStats
queue_set_metadata = zaqarclient.queues.v2.cli:SetQueueMetadata
queue_get_metadata = zaqarclient.queues.v2.cli:GetQueueMetadata
pool_create = zaqarclient.queues.v2.cli:CreatePool
pool_show = zaqarclient.queues.v2.cli:ShowPool
pool_update = zaqarclient.queues.v2.cli:UpdatePool

View File

@ -78,6 +78,11 @@ _CLIENTS = {1: cv1.Client,
def Client(url=None, version=None, conf=None):
# NOTE: Please don't mix use the Client object with different version at
# the same time. Because the cache mechanism of queue's metadata will lead
# to unexpected response value.
# Please see zaqarclient.queues.v1.queues.Queue.metadata and
# zaqarclient.queues.v2.queues.Queue.metadata for more detail.
try:
return _CLIENTS[version](url, version, conf)
except KeyError:

View File

@ -144,9 +144,8 @@ class SetQueueMetadata(command.Command):
client = _get_client(self, parsed_args)
queue_name = parsed_args.queue_name
queue_metadata = parsed_args.queue_metadata
queue_exists = client.queue(queue_name, auto_create=False).exists()
if not queue_exists:
if (client.api_version == 1 and
not client.queue(queue_name, auto_create=False).exists()):
raise RuntimeError("Queue(%s) does not exist." % queue_name)
try:
@ -177,7 +176,7 @@ class GetQueueMetadata(show.ShowOne):
queue_name = parsed_args.queue_name
queue = client.queue(queue_name, auto_create=False)
if not queue.exists():
if client.api_version == 1 and not queue.exists():
raise RuntimeError("Queue(%s) does not exist." % queue_name)
columns = ("Metadata",)

View File

@ -97,13 +97,10 @@ class Queue(object):
# NOTE(jeffrey4l): Ensure that metadata is cleared when the new_meta
# is a empty dict.
if new_meta is not None:
if self.client.api_version < 1.1:
core.queue_set_metadata(trans, req, self._name, new_meta)
elif not len(new_meta):
# if metadata is empty dict, clear existing metadata
core.queue_create(trans, req, self._name, metadata=new_meta)
else:
core.queue_update(trans, req, self._name, metadata=new_meta)
if self.client.api_version == 1.1:
raise RuntimeError("V1.1 doesn't support to set the queue's "
"metadata. Please use V1.0 or V2.")
core.queue_set_metadata(trans, req, self._name, new_meta)
self._metadata = new_meta
# TODO(flaper87): Cache with timeout

View File

@ -48,6 +48,16 @@ class GetQueueStats(cli.GetQueueStats):
pass
class SetQueueMetadata(cli.SetQueueMetadata):
"""Set queue metadata"""
pass
class GetQueueMetadata(cli.GetQueueMetadata):
"""Get queue metadata"""
pass
class CreatePool(cli.CreatePool):
"""Create a pool"""
pass

View File

@ -63,6 +63,31 @@ claim_update = core.claim_update
claim_delete = core.claim_delete
def queue_update(transport, request, name, metadata, callback=None):
"""Updates a queue's metadata using PATCH for API v2
:param transport: Transport instance to use
:type transport: `transport.base.Transport`
:param request: Request instance ready to be sent.
:type request: `transport.request.Request`
:param name: Queue reference name.
:type name: `six.text_type`
:param metadata: Queue's metadata object.
:type metadata: `list`
:param callback: Optional callable to use as callback.
If specified, this request will be sent asynchronously.
(IGNORED UNTIL ASYNC SUPPORT IS COMPLETE)
:type callback: Callable object.
"""
request.operation = 'queue_update'
request.params['queue_name'] = name
request.content = json.dumps(metadata)
resp = transport.send(request)
return resp.deserialized_content
def signed_url_create(transport, request, queue_name, paths=None,
ttl_seconds=None, project_id=None, methods=None):
"""Creates a signed URL given a queue name

View File

@ -33,6 +33,56 @@ class Queue(queues.Queue):
marker=marker,
limit=limit)
def metadata(self, new_meta=None, force_reload=False):
"""Get metadata and return it
:param new_meta: A dictionary containing
an updated metadata object. If present
the queue metadata will be updated in
remote server. If the new_meta is empty,
the metadata object will be cleared.
:type new_meta: `dict`
:param force_reload: Whether to ignored the
cached metadata and reload it from the
server.
:type force_reload: `bool`
:returns: The queue metadata.
"""
req, trans = self.client._request_and_transport()
# TODO(flaper87): Cache with timeout
if new_meta is None and self._metadata and not force_reload:
return self._metadata
else:
self._metadata = core.queue_get(trans, req, self._name)
if new_meta is not None:
temp_metadata = self._metadata.copy()
changes = []
for key, value in new_meta.items():
# If key exists, replace it's value.
if self._metadata.get(key, None):
changes.append({'op': 'replace',
'path': '/metadata/%s' % key,
'value': value})
temp_metadata.pop(key)
# If not, add the new key.
else:
changes.append({'op': 'add',
'path': '/metadata/%s' % key,
'value': value})
# For the keys which are not included in the new metadata, remove
# them.
for key, value in temp_metadata.items():
changes.append({'op': 'remove',
'path': '/metadata/%s' % key})
self._metadata = core.queue_update(trans, req, self._name,
metadata=changes)
return self._metadata
def create_object(parent):
return lambda args: Queue(parent, args["name"], auto_create=False)

View File

@ -422,6 +422,20 @@ class QueuesV1_1QueueUnitTest(QueuesV1QueueUnitTest):
# just checking our way down to the transport
# doesn't crash.
def test_queue_metadata(self):
test_metadata = {'type': 'Bank Accounts'}
with mock.patch.object(self.transport, 'send',
autospec=True) as send_method:
resp = response.Response(None, json.dumps(test_metadata))
send_method.return_value = resp
self.assertRaises(RuntimeError, self.queue.metadata, test_metadata)
def test_queue_metadata_update(self):
# v1.1 doesn't support set queue metadata
pass
class QueuesV1_1QueueFunctionalTest(QueuesV1QueueFunctionalTest):
@ -464,6 +478,14 @@ class QueuesV1_1QueueFunctionalTest(QueuesV1QueueFunctionalTest):
remaining = queue.messages(echo=True)
self.assertEqual(1, len(list(remaining)))
def test_queue_metadata_functional(self):
# v1.1 doesn't support set queue metadata
pass
def test_queue_metadata_reload_functional(self):
# v1.1 doesn't support set queue metadata
pass
class QueuesV2QueueUnitTest(QueuesV1_1QueueUnitTest):
@ -497,6 +519,45 @@ class QueuesV2QueueUnitTest(QueuesV1_1QueueUnitTest):
self.assertIn('http://trigger.me', subscriber_list)
self.assertIn('http://trigger.you', subscriber_list)
def test_queue_metadata(self):
# checked in "test_queue_metadata_update"
pass
def test_queue_metadata_update(self):
test_metadata = {'type': 'Bank Accounts', 'name': 'test1'}
with mock.patch.object(self.transport, 'send',
autospec=True) as send_method:
resp = response.Response(None, json.dumps(test_metadata))
send_method.return_value = resp
# add 'test_metadata'
metadata = self.queue.metadata(new_meta=test_metadata)
self.assertEqual(test_metadata, metadata)
new_metadata_replace = {'type': 'test', 'name': 'test1'}
with mock.patch.object(self.transport, 'send',
autospec=True) as send_method:
resp = response.Response(None, json.dumps(new_metadata_replace))
send_method.return_value = resp
# repalce 'type'
metadata = self.queue.metadata(
new_meta=new_metadata_replace)
expect_metadata = {'type': 'test', "name": 'test1'}
self.assertEqual(expect_metadata, metadata)
remove_metadata = {'name': 'test1'}
with mock.patch.object(self.transport, 'send',
autospec=True) as send_method:
resp = response.Response(None, json.dumps(remove_metadata))
send_method.return_value = resp
# remove 'type'
metadata = self.queue.metadata(new_meta=remove_metadata)
expect_metadata = {"name": 'test1'}
self.assertEqual(expect_metadata, metadata)
class QueuesV2QueueFunctionalTest(QueuesV1_1QueueFunctionalTest):
@ -535,3 +596,80 @@ class QueuesV2QueueFunctionalTest(QueuesV1_1QueueFunctionalTest):
get_subscriptions = queue.subscriptions()
self.assertTrue(isinstance(get_subscriptions, iterator._Iterator))
self.assertEqual(2, len(list(get_subscriptions)))
def test_queue_metadata_reload_functional(self):
test_metadata = {'type': 'Bank Accounts', 'name': 'test1'}
queue = self.client.queue("meta-test", force_create=True)
self.addCleanup(queue.delete)
queue.metadata(new_meta=test_metadata)
# NOTE(flaper87): Overwrite the cached value
# but don't clear it.
queue._metadata = 'test'
expect_metadata = {'type': 'Bank Accounts', 'name': 'test1',
'_max_messages_post_size': 262144,
'_default_message_ttl': 3600}
metadata = queue.metadata(force_reload=True)
self.assertEqual(expect_metadata, metadata)
def test_queue_metadata_functional(self):
queue = self.client.queue("meta-test", force_create=True)
self.addCleanup(queue.delete)
# add two metadatas
test_metadata = {'type': 'Bank Accounts', 'name': 'test1'}
queue.metadata(new_meta=test_metadata)
# NOTE(flaper87): Clear metadata's cache
queue._metadata = None
metadata = queue.metadata()
expect_metadata = {'type': 'Bank Accounts', 'name': 'test1',
'_max_messages_post_size': 262144,
'_default_message_ttl': 3600}
self.assertEqual(expect_metadata, metadata)
# replace 'type', '_default_message_ttl' and add a new one 'age'
replace_add_metadata = {'type': 'test', 'name': 'test1', 'age': 13,
'_default_message_ttl': 1000}
queue.metadata(new_meta=replace_add_metadata)
queue._metadata = None
metadata = queue.metadata()
expect_metadata = {'type': 'test', 'name': 'test1', 'age': 13,
'_max_messages_post_size': 262144,
'_default_message_ttl': 1000}
self.assertEqual(expect_metadata, metadata)
# replace 'name', remove 'type', '_default_message_ttl' and add a new
# one 'fake'.
replace_remove_add_metadata = {'name': 'test2',
'age': 13,
'fake': 'test_fake',
}
queue.metadata(new_meta=replace_remove_add_metadata)
queue._metadata = None
metadata = queue.metadata()
expect_metadata = {'name': 'test2', 'age': 13, 'fake': 'test_fake',
'_max_messages_post_size': 262144,
'_default_message_ttl': 3600}
self.assertEqual(expect_metadata, metadata)
# replace 'name' to empty string and add a new empty dict 'empty_dict'.
replace_add_metadata = {'name': '',
'age': 13,
'fake': 'test_fake',
'empty_dict': {}
}
queue.metadata(new_meta=replace_add_metadata)
queue._metadata = None
metadata = queue.metadata()
expect_metadata = {'name': '', 'age': 13, 'fake': 'test_fake',
'_max_messages_post_size': 262144,
'_default_message_ttl': 3600, 'empty_dict': {}}
self.assertEqual(expect_metadata, metadata)
# Delete all metadata.
remove_all = {}
queue.metadata(new_meta=remove_all)
queue._metadata = None
metadata = queue.metadata()
expect_metadata = {'_max_messages_post_size': 262144,
'_default_message_ttl': 3600}
self.assertEqual(expect_metadata, metadata)

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from distutils.version import LooseVersion
import json
from zaqarclient.common import http
@ -88,7 +89,12 @@ class HttpTransport(base.Transport):
# NOTE(flape87): Do not modify
# request's headers directly.
headers = request.headers.copy()
headers['content-type'] = 'application/json'
if (request.operation == 'queue_update' and
LooseVersion(request.api.label) >= LooseVersion('v2')):
headers['content-type'] = \
'application/openstack-messaging-v2.0-json-patch'
else:
headers['content-type'] = 'application/json'
resp = self.client.request(method,
url=url,