Add a Message resource and a MessageIterator

This patch defines the Message resource and the MessageIterator. The
former represents a message unit and the possible operations that can be
executed from that resource. The later allows users to iterate over
Messages' pages and consume all available messages.

The patch also renames queue_id to queue._name which is more consistent
with the API terminology.

NOTE: Functional tests are missing and will be added in a separate
patch.

Change-Id: I8c871e326bd580964f15d4ffc16c6264f9825ba7
Partially-Implements: python-marconiclient-v1
This commit is contained in:
Flavio Percoco 2013-12-20 15:33:36 +01:00
parent 63256dfc48
commit 4fdbddac29
13 changed files with 244 additions and 119 deletions

View File

@ -31,9 +31,14 @@ def create_post_delete(queue_name, messages):
cli = client.Client(URL)
queue = cli.queue(queue_name)
queue.post(messages)
for msg in queue.messages(echo=True):
print(msg.body)
msg.delete()
queue.delete()
if __name__ == '__main__':
messages = [{'body': {'id': idx}, 'ttl': 60}
messages = [{'body': {'id': idx}, 'ttl': 360}
for idx in range(20)]
create_post_delete('my_queue', messages)

View File

@ -18,6 +18,8 @@ from marconiclient.transport import api
class V1(api.Api):
label = 'v1'
schema = {
'queue_list': {
'ref': 'queues',

View File

@ -88,3 +88,14 @@ class Client(object):
:rtype: `queues.Queue`
"""
return queues.Queue(self, ref, **kwargs)
def follow(self, ref):
"""Follows ref.
:params ref: The reference path.
:type ref: `six.text_type`
"""
req, trans = self._request_and_transport()
req.ref = ref
return trans.send(req).deserialized_content

View File

@ -1,4 +1,4 @@
# Copyright (c) 2013 Rackspace, Inc.
# Copyright (c) 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -14,63 +14,99 @@
# limitations under the License.
"""Implements a message controller that understands Marconi messages."""
def _args_from_dict(msg):
return {
'href': msg['href'],
'ttl': msg['ttl'],
'age': msg['age'],
'body': msg['body']
}
from marconiclient.queues.v1 import core
def from_dict(msg, connection=None):
"""from_dict(dict, Connection) => Message
:param msg: A dictionary created by decoding a Marconi message JSON reply
:param connection: A connection to a Marconi server.
:raises: KeyError If msg is missing fields
:raises: TypeError if msg is not a dict.
# NOTE(flaper87): Consider moving the
# iterator into a common package.
class _MessageIterator(object):
"""Message iterator
This iterator is not meant to be used outside
the scope of this package. The iterator gets
a dictionary as returned by the message listing
endpoint and iterates over the messages in the
`messages` key.
If there are no messages left to return, the iterator
will try to load more by following the `next` rel link
type.
The iterator raises a StopIteration exception if the server
doesn't return more messages after a `next-page` call.
:param client: The client instance used by the queue
:type client: `v1.Client`
:param messages: Response returned by the messages listing call
:type messages: Dict
"""
return Message(
connection=connection,
**_args_from_dict(msg)
)
def __init__(self, queue, messages):
self._queue = queue
# NOTE(flaper87): Simple hack to
# re-use the iterator for get_many_messages
# and message listing.
self._links = []
self._messages = messages
if isinstance(messages, dict):
self._links = messages['links']
self._messages = messages['messages']
def __iter__(self):
return self
def _next_page(self):
for link in self._links:
if link['rel'] == 'next':
# NOTE(flaper87): We already have the
# ref for the next set of messages, lets
# just follow it.
messages = self._queue.client.follow(link['href'])
# NOTE(flaper87): Since we're using
# `.follow`, the empty result will
# be None. Consider making the API
# return an empty dict for consistency.
if messages:
self._links = messages['links']
self._messages = messages['messages']
return
raise StopIteration
def __next__(self):
try:
msg = self._messages.pop(0)
except IndexError:
self._next_page()
return self.next()
return Message(self._queue, **msg)
# NOTE(flaper87): Py2K support
next = __next__
class Message(object):
"""A handler for Marconi server Message resources.
Attributes are only downloaded once - at creation time.
"""
def __init__(self, href, ttl, age, body, connection):
def __init__(self, queue, href, ttl, age, body):
self.queue = queue
self.href = href
self.ttl = ttl
self.age = age
self.body = body
self._connection = connection
self._deleted = False
# NOTE(flaper87): Is this really
# necessary? Should this be returned
# by Marconi?
self._id = href.split('/')[-1]
def __repr__(self):
return '<Message ttl:%s>' % (self.ttl,)
def _assert_not_deleted(self):
assert not self._deleted, 'Already deleted'
def reload(self):
"""Queries the server and updates all local attributes
with new values.
"""
self._assert_not_deleted()
msg = self._connection.get(self.href).json()
self.href = msg['href']
self.ttl = msg['ttl']
self.age = msg['age']
self.body = msg['body']
return '<Message id:%(id)s ttl:%(ttl)s>'.format(id=self._id,
ttl=self.ttl)
def delete(self):
"""Deletes this resource from the server, but leaves the local
object intact.
"""
self._assert_not_deleted()
self._connection.delete(self.href)
self._deleted = True
req, trans = self.queue.client._request_and_transport()
core.message_delete(trans, req, self.queue._name, self._id)

View File

@ -14,15 +14,16 @@
# limitations under the License.
from marconiclient.queues.v1 import core
from marconiclient.queues.v1 import message
class Queue(object):
def __init__(self, client, queue_id, auto_create=True):
def __init__(self, client, queue_name, auto_create=True):
self.client = client
# NOTE(flaper87) Queue Info
self._id = queue_id
self._name = queue_name
self._metadata = None
if auto_create:
@ -31,7 +32,7 @@ class Queue(object):
def exists(self):
"""Checks if the queue exists."""
req, trans = self.client._request_and_transport()
return core.queue_exists(trans, req, self._id)
return core.queue_exists(trans, req, self._name)
def ensure_exists(self):
"""Ensures a queue exists
@ -41,7 +42,7 @@ class Queue(object):
right after it was called.
"""
req, trans = self.client._request_and_transport()
core.queue_create(trans, req, self._id)
core.queue_create(trans, req, self._name)
def metadata(self, new_meta=None, force_reload=False):
"""Get metadata and return it
@ -61,19 +62,19 @@ class Queue(object):
req, trans = self.client._request_and_transport()
if new_meta:
core.queue_set_metadata(trans, req, self._id, new_meta)
core.queue_set_metadata(trans, req, self._name, new_meta)
self._metadata = new_meta
# TODO(flaper87): Cache with timeout
if self._metadata and not force_reload:
return self._metadata
self._metadata = core.queue_get_metadata(trans, req, self._id)
self._metadata = core.queue_get_metadata(trans, req, self._name)
return self._metadata
def delete(self):
req, trans = self.client._request_and_transport()
core.queue_delete(trans, req, self._id)
core.queue_delete(trans, req, self._name)
# Messages API
@ -93,7 +94,7 @@ class Queue(object):
# TODO(flaper87): Return a list of messages
return core.message_post(trans, req,
self._id, messages)
self._name, messages)
def message(self, message_id):
"""Gets a message by id
@ -105,8 +106,9 @@ class Queue(object):
:rtype: `dict`
"""
req, trans = self.client._request_and_transport()
return core.message_get(trans, req, self._id,
message_id)
msg = core.message_get(trans, req, self._name,
message_id)
return message.Message(self, **msg)
def messages(self, *messages, **params):
"""Gets a list of messages from the server
@ -135,12 +137,14 @@ class Queue(object):
# and messages deserialization.
if messages:
return core.message_get_many(trans, req,
self._id, messages)
msgs = core.message_get_many(trans, req,
self._name, messages)
else:
# NOTE(flaper87): It's safe to access messages
# directly. If something wrong happens, the core
# API will raise the right exceptions.
msgs = core.message_list(trans, req,
self._name,
**params)
# NOTE(flaper87): It's safe to access messages
# directly. If something wrong happens, the core
# API will raise the right exceptions.
return core.message_list(trans, req,
self._id,
**params)['messages']
return message._MessageIterator(self, msgs)

View File

@ -17,10 +17,15 @@ import mock
from marconiclient.queues import client
from marconiclient.tests import base
from marconiclient.tests.transport import dummy
class QueuesTestBase(base.TestBase):
transport_cls = dummy.DummyTransport
url = 'http://127.0.0.1:8888/v1'
version = 1
def setUp(self):
super(QueuesTestBase, self).setUp()
self.transport = self.transport_cls(self.conf)

View File

@ -0,0 +1,42 @@
# Copyright (c) 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import mock
from marconiclient.tests.queues import base
from marconiclient.transport import response
class QueuesV1MessageUnitTest(base.QueuesTestBase):
def test_message_delete(self):
returned = {
'href': '/v1/queues/fizbit/messages/50b68a50d6f5b8c8a7c62b01',
'ttl': 800,
'age': 790,
'body': {'event': 'ActivateAccount', 'mode': 'active'}
}
with mock.patch.object(self.transport, 'send',
autospec=True) as send_method:
resp = response.Response(None, json.dumps(returned))
send_method.return_value = resp
msg = self.queue.message('50b68a50d6f5b8c8a7c62b01')
send_method.return_value = None
self.assertIsNone(msg.delete())

View File

@ -16,6 +16,7 @@
import json
import mock
from marconiclient.queues.v1 import message
from marconiclient.tests.queues import base
from marconiclient.transport import response
@ -113,7 +114,8 @@ class QueuesV1QueueUnitTest(base.QueuesTestBase):
resp = response.Response(None, json.dumps(returned))
send_method.return_value = resp
self.queue.messages(limit=1)
msgs = self.queue.messages(limit=1)
self.assertIsInstance(msgs, message._MessageIterator)
# NOTE(flaper87): Nothing to assert here,
# just checking our way down to the transport
@ -133,8 +135,8 @@ class QueuesV1QueueUnitTest(base.QueuesTestBase):
resp = response.Response(None, json.dumps(returned))
send_method.return_value = resp
msg = self.queue.message('50b68a50d6f5b8c8a7c62b01')
self.assertTrue(isinstance(msg, dict))
msgs = self.queue.message('50b68a50d6f5b8c8a7c62b01')
self.assertIsInstance(msgs, message.Message)
# NOTE(flaper87): Nothing to assert here,
# just checking our way down to the transport
@ -161,7 +163,7 @@ class QueuesV1QueueUnitTest(base.QueuesTestBase):
msg = self.queue.messages('50b68a50d6f5b8c8a7c62b01',
'50b68a50d6f5b8c8a7c62b02')
self.assertTrue(isinstance(msg, list))
self.assertIsInstance(msg, message._MessageIterator)
# NOTE(flaper87): Nothing to assert here,
# just checking our way down to the transport

View File

@ -17,6 +17,8 @@ from marconiclient.transport import api
class FakeApi(api.Api):
label = 'v1'
schema = {
'test_operation': {
'ref': 'test/{name}',

View File

@ -22,6 +22,7 @@ from marconiclient import errors
class Api(object):
schema = {}
label = None
validators = {}
def get_schema(self, operation):

View File

@ -45,9 +45,19 @@ class HttpTransport(base.Transport):
# happen before any other operation here.
# request.validate()
schema = request.api.get_schema(request.operation)
ref = schema.get('ref', '')
schema = {}
ref_params = {}
ref = request.ref
if request.operation:
schema = request.api.get_schema(request.operation)
ref = ref or schema.get('ref', '')
# FIXME(flaper87): We expect the endpoint
# to have the API version label already,
# however in a follow-your-nose implementation
# it should be the other way around.
ref = ref.lstrip('/' + request.api.label)
for param in list(request.params.keys()):
if '{{{0}}}'.format(param) in ref:

View File

@ -77,7 +77,7 @@ class Request(object):
"""
def __init__(self, endpoint='', operation='',
content=None, params=None,
ref='', content=None, params=None,
headers=None, api=None):
self._api = None
@ -85,6 +85,7 @@ class Request(object):
self.endpoint = endpoint
self.operation = operation
self.ref = ref
self.content = content
self.params = params or {}
self.headers = headers or {}

View File

@ -12,61 +12,65 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import json
import mock
from marconiclient.queues.v1 import message
from marconiclient.tests.mock import message as mock_message
from marconiclient.tests.queues import base
from marconiclient.tests.queues import messages as test_message
from marconiclient.transport import http
from marconiclient.transport import response
HREF = '/v1/queue/dgq/messages/my_msg_is_chocolate'
AGE = 100
TTL = 120
class TestMessageIterator(base.QueuesTestBase):
def test_no_next_iteration(self):
messages = {'links': [],
'messages': [{
'href': '/v1/queues/mine/messages/123123423',
'ttl': 800,
'age': 790,
'body': {'event': 'ActivateAccount',
'mode': 'active'}
}]
}
iterator = message._MessageIterator(self.queue, messages)
iterated = [msg for msg in iterator]
self.assertEqual(len(iterated), 1)
def test_next_page(self):
messages = {'links': [],
'messages': [{
'href': '/v1/queues/mine/messages/123123423',
'ttl': 800,
'age': 790,
'body': {'event': 'ActivateAccount',
'mode': 'active'}
}]
}
with mock.patch.object(self.transport, 'send',
autospec=True) as send_method:
resp = response.Response(None, json.dumps(messages))
send_method.return_value = resp
# NOTE(flaper87): The first iteration will return 1 message
# and then call `_next_page` which will use the rel-next link
# to get a new set of messages.
link = {'rel': 'next',
'href': "/v1/queues/mine/messages?marker=6244-244224-783"}
messages['links'].append(link)
iterator = message._MessageIterator(self.queue, messages)
iterated = [msg for msg in iterator]
self.assertEqual(len(iterated), 2)
class TestSimpleMessage(unittest.TestCase):
def setUp(self):
msg_body = {
'href': HREF,
'ttl': TTL,
'age': AGE,
'body': {'name': 'chocolate'}
}
self.conn = mock.MagicMock()
self.msg = message.from_dict(msg_body, connection=self.conn)
class QueuesV1MessageHttpUnitTest(test_message.QueuesV1MessageUnitTest):
def _attr_check(self, xhref, xttl, xage, xbody):
self.assertEqual(self.msg.href, xhref)
self.assertEqual(self.msg.ttl, xttl)
self.assertEqual(self.msg.age, xage)
self.assertEqual(self.msg.body, xbody)
def test_attributes_match_expected(self):
self._attr_check(xhref=HREF, xttl=TTL, xage=AGE,
xbody={'name': 'chocolate'})
def test_repr_matches_expected(self):
self.assertEqual(repr(self.msg),
'<Message ttl:%s>' % (self.msg.ttl,))
def test_delete_works(self):
self.msg.delete()
def test_reload_works(self):
msg = mock_message.message(
href=HREF, ttl=TTL - 1, age=AGE + 1,
body={'name': 'vanilla'})
self.conn.get.return_value = mock.MagicMock()
self.conn.get.return_value.json.return_value = msg
self.msg.reload()
self._attr_check(xhref=HREF, xttl=TTL - 1, xage=AGE + 1,
xbody={'name': 'vanilla'})
def test_reload_after_delete_throws(self):
self.msg.delete()
self.assertRaises(AssertionError, self.msg.reload)
def test_delete_after_delete_throws(self):
self.msg.delete()
self.assertRaises(AssertionError, self.msg.delete)
transport_cls = http.HttpTransport
url = 'http://127.0.0.1:8888/v1'
version = 1