diff --git a/examples/simple.py b/examples/simple.py index e1216a4c..0b6a90b2 100644 --- a/examples/simple.py +++ b/examples/simple.py @@ -31,9 +31,14 @@ def create_post_delete(queue_name, messages): cli = client.Client(URL) queue = cli.queue(queue_name) queue.post(messages) + + for msg in queue.messages(echo=True): + print(msg.body) + msg.delete() + queue.delete() if __name__ == '__main__': - messages = [{'body': {'id': idx}, 'ttl': 60} + messages = [{'body': {'id': idx}, 'ttl': 360} for idx in range(20)] create_post_delete('my_queue', messages) diff --git a/marconiclient/queues/v1/api.py b/marconiclient/queues/v1/api.py index e0c8e3e5..62c4ea80 100644 --- a/marconiclient/queues/v1/api.py +++ b/marconiclient/queues/v1/api.py @@ -18,6 +18,8 @@ from marconiclient.transport import api class V1(api.Api): + label = 'v1' + schema = { 'queue_list': { 'ref': 'queues', diff --git a/marconiclient/queues/v1/client.py b/marconiclient/queues/v1/client.py index c0fcff51..b7cb6f8d 100644 --- a/marconiclient/queues/v1/client.py +++ b/marconiclient/queues/v1/client.py @@ -88,3 +88,14 @@ class Client(object): :rtype: `queues.Queue` """ return queues.Queue(self, ref, **kwargs) + + def follow(self, ref): + """Follows ref. + + :params ref: The reference path. + :type ref: `six.text_type` + """ + req, trans = self._request_and_transport() + req.ref = ref + + return trans.send(req).deserialized_content diff --git a/marconiclient/queues/v1/message.py b/marconiclient/queues/v1/message.py index f810b939..9d6ac3e6 100644 --- a/marconiclient/queues/v1/message.py +++ b/marconiclient/queues/v1/message.py @@ -1,4 +1,4 @@ -# Copyright (c) 2013 Rackspace, Inc. +# Copyright (c) 2013 Red Hat, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,63 +14,99 @@ # limitations under the License. """Implements a message controller that understands Marconi messages.""" - -def _args_from_dict(msg): - return { - 'href': msg['href'], - 'ttl': msg['ttl'], - 'age': msg['age'], - 'body': msg['body'] - } +from marconiclient.queues.v1 import core -def from_dict(msg, connection=None): - """from_dict(dict, Connection) => Message - :param msg: A dictionary created by decoding a Marconi message JSON reply - :param connection: A connection to a Marconi server. - :raises: KeyError If msg is missing fields - :raises: TypeError if msg is not a dict. +# NOTE(flaper87): Consider moving the +# iterator into a common package. +class _MessageIterator(object): + """Message iterator + + This iterator is not meant to be used outside + the scope of this package. The iterator gets + a dictionary as returned by the message listing + endpoint and iterates over the messages in the + `messages` key. + + If there are no messages left to return, the iterator + will try to load more by following the `next` rel link + type. + + The iterator raises a StopIteration exception if the server + doesn't return more messages after a `next-page` call. + + :param client: The client instance used by the queue + :type client: `v1.Client` + :param messages: Response returned by the messages listing call + :type messages: Dict """ - return Message( - connection=connection, - **_args_from_dict(msg) - ) + + def __init__(self, queue, messages): + self._queue = queue + + # NOTE(flaper87): Simple hack to + # re-use the iterator for get_many_messages + # and message listing. + self._links = [] + self._messages = messages + + if isinstance(messages, dict): + self._links = messages['links'] + self._messages = messages['messages'] + + def __iter__(self): + return self + + def _next_page(self): + for link in self._links: + if link['rel'] == 'next': + # NOTE(flaper87): We already have the + # ref for the next set of messages, lets + # just follow it. + messages = self._queue.client.follow(link['href']) + + # NOTE(flaper87): Since we're using + # `.follow`, the empty result will + # be None. Consider making the API + # return an empty dict for consistency. + if messages: + self._links = messages['links'] + self._messages = messages['messages'] + return + raise StopIteration + + def __next__(self): + try: + msg = self._messages.pop(0) + except IndexError: + self._next_page() + return self.next() + return Message(self._queue, **msg) + + # NOTE(flaper87): Py2K support + next = __next__ class Message(object): """A handler for Marconi server Message resources. Attributes are only downloaded once - at creation time. """ - def __init__(self, href, ttl, age, body, connection): + def __init__(self, queue, href, ttl, age, body): + self.queue = queue self.href = href self.ttl = ttl self.age = age self.body = body - self._connection = connection - self._deleted = False + + # NOTE(flaper87): Is this really + # necessary? Should this be returned + # by Marconi? + self._id = href.split('/')[-1] def __repr__(self): - return '' % (self.ttl,) - - def _assert_not_deleted(self): - assert not self._deleted, 'Already deleted' - - def reload(self): - """Queries the server and updates all local attributes - with new values. - """ - self._assert_not_deleted() - msg = self._connection.get(self.href).json() - - self.href = msg['href'] - self.ttl = msg['ttl'] - self.age = msg['age'] - self.body = msg['body'] + return ''.format(id=self._id, + ttl=self.ttl) def delete(self): - """Deletes this resource from the server, but leaves the local - object intact. - """ - self._assert_not_deleted() - self._connection.delete(self.href) - self._deleted = True + req, trans = self.queue.client._request_and_transport() + core.message_delete(trans, req, self.queue._name, self._id) diff --git a/marconiclient/queues/v1/queues.py b/marconiclient/queues/v1/queues.py index 376f6985..62fbef80 100644 --- a/marconiclient/queues/v1/queues.py +++ b/marconiclient/queues/v1/queues.py @@ -14,15 +14,16 @@ # limitations under the License. from marconiclient.queues.v1 import core +from marconiclient.queues.v1 import message class Queue(object): - def __init__(self, client, queue_id, auto_create=True): + def __init__(self, client, queue_name, auto_create=True): self.client = client # NOTE(flaper87) Queue Info - self._id = queue_id + self._name = queue_name self._metadata = None if auto_create: @@ -31,7 +32,7 @@ class Queue(object): def exists(self): """Checks if the queue exists.""" req, trans = self.client._request_and_transport() - return core.queue_exists(trans, req, self._id) + return core.queue_exists(trans, req, self._name) def ensure_exists(self): """Ensures a queue exists @@ -41,7 +42,7 @@ class Queue(object): right after it was called. """ req, trans = self.client._request_and_transport() - core.queue_create(trans, req, self._id) + core.queue_create(trans, req, self._name) def metadata(self, new_meta=None, force_reload=False): """Get metadata and return it @@ -61,19 +62,19 @@ class Queue(object): req, trans = self.client._request_and_transport() if new_meta: - core.queue_set_metadata(trans, req, self._id, new_meta) + core.queue_set_metadata(trans, req, self._name, new_meta) self._metadata = new_meta # TODO(flaper87): Cache with timeout if self._metadata and not force_reload: return self._metadata - self._metadata = core.queue_get_metadata(trans, req, self._id) + self._metadata = core.queue_get_metadata(trans, req, self._name) return self._metadata def delete(self): req, trans = self.client._request_and_transport() - core.queue_delete(trans, req, self._id) + core.queue_delete(trans, req, self._name) # Messages API @@ -93,7 +94,7 @@ class Queue(object): # TODO(flaper87): Return a list of messages return core.message_post(trans, req, - self._id, messages) + self._name, messages) def message(self, message_id): """Gets a message by id @@ -105,8 +106,9 @@ class Queue(object): :rtype: `dict` """ req, trans = self.client._request_and_transport() - return core.message_get(trans, req, self._id, - message_id) + msg = core.message_get(trans, req, self._name, + message_id) + return message.Message(self, **msg) def messages(self, *messages, **params): """Gets a list of messages from the server @@ -135,12 +137,14 @@ class Queue(object): # and messages deserialization. if messages: - return core.message_get_many(trans, req, - self._id, messages) + msgs = core.message_get_many(trans, req, + self._name, messages) + else: + # NOTE(flaper87): It's safe to access messages + # directly. If something wrong happens, the core + # API will raise the right exceptions. + msgs = core.message_list(trans, req, + self._name, + **params) - # NOTE(flaper87): It's safe to access messages - # directly. If something wrong happens, the core - # API will raise the right exceptions. - return core.message_list(trans, req, - self._id, - **params)['messages'] + return message._MessageIterator(self, msgs) diff --git a/marconiclient/tests/queues/base.py b/marconiclient/tests/queues/base.py index d529264d..f6abdbee 100644 --- a/marconiclient/tests/queues/base.py +++ b/marconiclient/tests/queues/base.py @@ -17,10 +17,15 @@ import mock from marconiclient.queues import client from marconiclient.tests import base +from marconiclient.tests.transport import dummy class QueuesTestBase(base.TestBase): + transport_cls = dummy.DummyTransport + url = 'http://127.0.0.1:8888/v1' + version = 1 + def setUp(self): super(QueuesTestBase, self).setUp() self.transport = self.transport_cls(self.conf) diff --git a/marconiclient/tests/queues/messages.py b/marconiclient/tests/queues/messages.py new file mode 100644 index 00000000..d2d4664c --- /dev/null +++ b/marconiclient/tests/queues/messages.py @@ -0,0 +1,42 @@ +# Copyright (c) 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import mock + +from marconiclient.tests.queues import base +from marconiclient.transport import response + + +class QueuesV1MessageUnitTest(base.QueuesTestBase): + + def test_message_delete(self): + returned = { + 'href': '/v1/queues/fizbit/messages/50b68a50d6f5b8c8a7c62b01', + 'ttl': 800, + 'age': 790, + 'body': {'event': 'ActivateAccount', 'mode': 'active'} + } + + with mock.patch.object(self.transport, 'send', + autospec=True) as send_method: + + resp = response.Response(None, json.dumps(returned)) + send_method.return_value = resp + + msg = self.queue.message('50b68a50d6f5b8c8a7c62b01') + + send_method.return_value = None + self.assertIsNone(msg.delete()) diff --git a/marconiclient/tests/queues/queues.py b/marconiclient/tests/queues/queues.py index f3c19baa..d820f991 100644 --- a/marconiclient/tests/queues/queues.py +++ b/marconiclient/tests/queues/queues.py @@ -16,6 +16,7 @@ import json import mock +from marconiclient.queues.v1 import message from marconiclient.tests.queues import base from marconiclient.transport import response @@ -113,7 +114,8 @@ class QueuesV1QueueUnitTest(base.QueuesTestBase): resp = response.Response(None, json.dumps(returned)) send_method.return_value = resp - self.queue.messages(limit=1) + msgs = self.queue.messages(limit=1) + self.assertIsInstance(msgs, message._MessageIterator) # NOTE(flaper87): Nothing to assert here, # just checking our way down to the transport @@ -133,8 +135,8 @@ class QueuesV1QueueUnitTest(base.QueuesTestBase): resp = response.Response(None, json.dumps(returned)) send_method.return_value = resp - msg = self.queue.message('50b68a50d6f5b8c8a7c62b01') - self.assertTrue(isinstance(msg, dict)) + msgs = self.queue.message('50b68a50d6f5b8c8a7c62b01') + self.assertIsInstance(msgs, message.Message) # NOTE(flaper87): Nothing to assert here, # just checking our way down to the transport @@ -161,7 +163,7 @@ class QueuesV1QueueUnitTest(base.QueuesTestBase): msg = self.queue.messages('50b68a50d6f5b8c8a7c62b01', '50b68a50d6f5b8c8a7c62b02') - self.assertTrue(isinstance(msg, list)) + self.assertIsInstance(msg, message._MessageIterator) # NOTE(flaper87): Nothing to assert here, # just checking our way down to the transport diff --git a/marconiclient/tests/transport/api.py b/marconiclient/tests/transport/api.py index 926ed4e0..66ee4234 100644 --- a/marconiclient/tests/transport/api.py +++ b/marconiclient/tests/transport/api.py @@ -17,6 +17,8 @@ from marconiclient.transport import api class FakeApi(api.Api): + label = 'v1' + schema = { 'test_operation': { 'ref': 'test/{name}', diff --git a/marconiclient/transport/api.py b/marconiclient/transport/api.py index 7cefc13b..4c6f9a19 100644 --- a/marconiclient/transport/api.py +++ b/marconiclient/transport/api.py @@ -22,6 +22,7 @@ from marconiclient import errors class Api(object): schema = {} + label = None validators = {} def get_schema(self, operation): diff --git a/marconiclient/transport/http.py b/marconiclient/transport/http.py index 835e858d..20525dc3 100644 --- a/marconiclient/transport/http.py +++ b/marconiclient/transport/http.py @@ -45,9 +45,19 @@ class HttpTransport(base.Transport): # happen before any other operation here. # request.validate() - schema = request.api.get_schema(request.operation) - ref = schema.get('ref', '') + schema = {} ref_params = {} + ref = request.ref + + if request.operation: + schema = request.api.get_schema(request.operation) + ref = ref or schema.get('ref', '') + + # FIXME(flaper87): We expect the endpoint + # to have the API version label already, + # however in a follow-your-nose implementation + # it should be the other way around. + ref = ref.lstrip('/' + request.api.label) for param in list(request.params.keys()): if '{{{0}}}'.format(param) in ref: diff --git a/marconiclient/transport/request.py b/marconiclient/transport/request.py index 4edb70e0..62a4d4d6 100644 --- a/marconiclient/transport/request.py +++ b/marconiclient/transport/request.py @@ -77,7 +77,7 @@ class Request(object): """ def __init__(self, endpoint='', operation='', - content=None, params=None, + ref='', content=None, params=None, headers=None, api=None): self._api = None @@ -85,6 +85,7 @@ class Request(object): self.endpoint = endpoint self.operation = operation + self.ref = ref self.content = content self.params = params or {} self.headers = headers or {} diff --git a/tests/unit/queues/v1/test_message.py b/tests/unit/queues/v1/test_message.py index ee67f38e..3fc2b365 100644 --- a/tests/unit/queues/v1/test_message.py +++ b/tests/unit/queues/v1/test_message.py @@ -12,61 +12,65 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. -import unittest +import json import mock from marconiclient.queues.v1 import message -from marconiclient.tests.mock import message as mock_message +from marconiclient.tests.queues import base +from marconiclient.tests.queues import messages as test_message +from marconiclient.transport import http +from marconiclient.transport import response -HREF = '/v1/queue/dgq/messages/my_msg_is_chocolate' -AGE = 100 -TTL = 120 +class TestMessageIterator(base.QueuesTestBase): + + def test_no_next_iteration(self): + messages = {'links': [], + 'messages': [{ + 'href': '/v1/queues/mine/messages/123123423', + 'ttl': 800, + 'age': 790, + 'body': {'event': 'ActivateAccount', + 'mode': 'active'} + }] + } + + iterator = message._MessageIterator(self.queue, messages) + iterated = [msg for msg in iterator] + self.assertEqual(len(iterated), 1) + + def test_next_page(self): + messages = {'links': [], + 'messages': [{ + 'href': '/v1/queues/mine/messages/123123423', + 'ttl': 800, + 'age': 790, + 'body': {'event': 'ActivateAccount', + 'mode': 'active'} + }] + } + + with mock.patch.object(self.transport, 'send', + autospec=True) as send_method: + + resp = response.Response(None, json.dumps(messages)) + send_method.return_value = resp + + # NOTE(flaper87): The first iteration will return 1 message + # and then call `_next_page` which will use the rel-next link + # to get a new set of messages. + link = {'rel': 'next', + 'href': "/v1/queues/mine/messages?marker=6244-244224-783"} + messages['links'].append(link) + + iterator = message._MessageIterator(self.queue, messages) + iterated = [msg for msg in iterator] + self.assertEqual(len(iterated), 2) -class TestSimpleMessage(unittest.TestCase): - def setUp(self): - msg_body = { - 'href': HREF, - 'ttl': TTL, - 'age': AGE, - 'body': {'name': 'chocolate'} - } - self.conn = mock.MagicMock() - self.msg = message.from_dict(msg_body, connection=self.conn) +class QueuesV1MessageHttpUnitTest(test_message.QueuesV1MessageUnitTest): - def _attr_check(self, xhref, xttl, xage, xbody): - self.assertEqual(self.msg.href, xhref) - self.assertEqual(self.msg.ttl, xttl) - self.assertEqual(self.msg.age, xage) - self.assertEqual(self.msg.body, xbody) - - def test_attributes_match_expected(self): - self._attr_check(xhref=HREF, xttl=TTL, xage=AGE, - xbody={'name': 'chocolate'}) - - def test_repr_matches_expected(self): - self.assertEqual(repr(self.msg), - '' % (self.msg.ttl,)) - - def test_delete_works(self): - self.msg.delete() - - def test_reload_works(self): - msg = mock_message.message( - href=HREF, ttl=TTL - 1, age=AGE + 1, - body={'name': 'vanilla'}) - self.conn.get.return_value = mock.MagicMock() - self.conn.get.return_value.json.return_value = msg - self.msg.reload() - self._attr_check(xhref=HREF, xttl=TTL - 1, xage=AGE + 1, - xbody={'name': 'vanilla'}) - - def test_reload_after_delete_throws(self): - self.msg.delete() - self.assertRaises(AssertionError, self.msg.reload) - - def test_delete_after_delete_throws(self): - self.msg.delete() - self.assertRaises(AssertionError, self.msg.delete) + transport_cls = http.HttpTransport + url = 'http://127.0.0.1:8888/v1' + version = 1