Add a Message resource and a MessageIterator

This patch defines the Message resource and the MessageIterator. The
former represents a message unit and the possible operations that can be
executed from that resource. The later allows users to iterate over
Messages' pages and consume all available messages.

The patch also renames queue_id to queue._name which is more consistent
with the API terminology.

NOTE: Functional tests are missing and will be added in a separate
patch.

Change-Id: I8c871e326bd580964f15d4ffc16c6264f9825ba7
Partially-Implements: python-marconiclient-v1
This commit is contained in:
Flavio Percoco 2013-12-20 15:33:36 +01:00
parent 63256dfc48
commit 4fdbddac29
13 changed files with 244 additions and 119 deletions

View File

@ -31,9 +31,14 @@ def create_post_delete(queue_name, messages):
cli = client.Client(URL) cli = client.Client(URL)
queue = cli.queue(queue_name) queue = cli.queue(queue_name)
queue.post(messages) queue.post(messages)
for msg in queue.messages(echo=True):
print(msg.body)
msg.delete()
queue.delete() queue.delete()
if __name__ == '__main__': if __name__ == '__main__':
messages = [{'body': {'id': idx}, 'ttl': 60} messages = [{'body': {'id': idx}, 'ttl': 360}
for idx in range(20)] for idx in range(20)]
create_post_delete('my_queue', messages) create_post_delete('my_queue', messages)

View File

@ -18,6 +18,8 @@ from marconiclient.transport import api
class V1(api.Api): class V1(api.Api):
label = 'v1'
schema = { schema = {
'queue_list': { 'queue_list': {
'ref': 'queues', 'ref': 'queues',

View File

@ -88,3 +88,14 @@ class Client(object):
:rtype: `queues.Queue` :rtype: `queues.Queue`
""" """
return queues.Queue(self, ref, **kwargs) return queues.Queue(self, ref, **kwargs)
def follow(self, ref):
"""Follows ref.
:params ref: The reference path.
:type ref: `six.text_type`
"""
req, trans = self._request_and_transport()
req.ref = ref
return trans.send(req).deserialized_content

View File

@ -1,4 +1,4 @@
# Copyright (c) 2013 Rackspace, Inc. # Copyright (c) 2013 Red Hat, Inc.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -14,63 +14,99 @@
# limitations under the License. # limitations under the License.
"""Implements a message controller that understands Marconi messages.""" """Implements a message controller that understands Marconi messages."""
from marconiclient.queues.v1 import core
def _args_from_dict(msg):
return {
'href': msg['href'],
'ttl': msg['ttl'],
'age': msg['age'],
'body': msg['body']
}
def from_dict(msg, connection=None): # NOTE(flaper87): Consider moving the
"""from_dict(dict, Connection) => Message # iterator into a common package.
:param msg: A dictionary created by decoding a Marconi message JSON reply class _MessageIterator(object):
:param connection: A connection to a Marconi server. """Message iterator
:raises: KeyError If msg is missing fields
:raises: TypeError if msg is not a dict. This iterator is not meant to be used outside
the scope of this package. The iterator gets
a dictionary as returned by the message listing
endpoint and iterates over the messages in the
`messages` key.
If there are no messages left to return, the iterator
will try to load more by following the `next` rel link
type.
The iterator raises a StopIteration exception if the server
doesn't return more messages after a `next-page` call.
:param client: The client instance used by the queue
:type client: `v1.Client`
:param messages: Response returned by the messages listing call
:type messages: Dict
""" """
return Message(
connection=connection, def __init__(self, queue, messages):
**_args_from_dict(msg) self._queue = queue
)
# NOTE(flaper87): Simple hack to
# re-use the iterator for get_many_messages
# and message listing.
self._links = []
self._messages = messages
if isinstance(messages, dict):
self._links = messages['links']
self._messages = messages['messages']
def __iter__(self):
return self
def _next_page(self):
for link in self._links:
if link['rel'] == 'next':
# NOTE(flaper87): We already have the
# ref for the next set of messages, lets
# just follow it.
messages = self._queue.client.follow(link['href'])
# NOTE(flaper87): Since we're using
# `.follow`, the empty result will
# be None. Consider making the API
# return an empty dict for consistency.
if messages:
self._links = messages['links']
self._messages = messages['messages']
return
raise StopIteration
def __next__(self):
try:
msg = self._messages.pop(0)
except IndexError:
self._next_page()
return self.next()
return Message(self._queue, **msg)
# NOTE(flaper87): Py2K support
next = __next__
class Message(object): class Message(object):
"""A handler for Marconi server Message resources. """A handler for Marconi server Message resources.
Attributes are only downloaded once - at creation time. Attributes are only downloaded once - at creation time.
""" """
def __init__(self, href, ttl, age, body, connection): def __init__(self, queue, href, ttl, age, body):
self.queue = queue
self.href = href self.href = href
self.ttl = ttl self.ttl = ttl
self.age = age self.age = age
self.body = body self.body = body
self._connection = connection
self._deleted = False # NOTE(flaper87): Is this really
# necessary? Should this be returned
# by Marconi?
self._id = href.split('/')[-1]
def __repr__(self): def __repr__(self):
return '<Message ttl:%s>' % (self.ttl,) return '<Message id:%(id)s ttl:%(ttl)s>'.format(id=self._id,
ttl=self.ttl)
def _assert_not_deleted(self):
assert not self._deleted, 'Already deleted'
def reload(self):
"""Queries the server and updates all local attributes
with new values.
"""
self._assert_not_deleted()
msg = self._connection.get(self.href).json()
self.href = msg['href']
self.ttl = msg['ttl']
self.age = msg['age']
self.body = msg['body']
def delete(self): def delete(self):
"""Deletes this resource from the server, but leaves the local req, trans = self.queue.client._request_and_transport()
object intact. core.message_delete(trans, req, self.queue._name, self._id)
"""
self._assert_not_deleted()
self._connection.delete(self.href)
self._deleted = True

View File

@ -14,15 +14,16 @@
# limitations under the License. # limitations under the License.
from marconiclient.queues.v1 import core from marconiclient.queues.v1 import core
from marconiclient.queues.v1 import message
class Queue(object): class Queue(object):
def __init__(self, client, queue_id, auto_create=True): def __init__(self, client, queue_name, auto_create=True):
self.client = client self.client = client
# NOTE(flaper87) Queue Info # NOTE(flaper87) Queue Info
self._id = queue_id self._name = queue_name
self._metadata = None self._metadata = None
if auto_create: if auto_create:
@ -31,7 +32,7 @@ class Queue(object):
def exists(self): def exists(self):
"""Checks if the queue exists.""" """Checks if the queue exists."""
req, trans = self.client._request_and_transport() req, trans = self.client._request_and_transport()
return core.queue_exists(trans, req, self._id) return core.queue_exists(trans, req, self._name)
def ensure_exists(self): def ensure_exists(self):
"""Ensures a queue exists """Ensures a queue exists
@ -41,7 +42,7 @@ class Queue(object):
right after it was called. right after it was called.
""" """
req, trans = self.client._request_and_transport() req, trans = self.client._request_and_transport()
core.queue_create(trans, req, self._id) core.queue_create(trans, req, self._name)
def metadata(self, new_meta=None, force_reload=False): def metadata(self, new_meta=None, force_reload=False):
"""Get metadata and return it """Get metadata and return it
@ -61,19 +62,19 @@ class Queue(object):
req, trans = self.client._request_and_transport() req, trans = self.client._request_and_transport()
if new_meta: if new_meta:
core.queue_set_metadata(trans, req, self._id, new_meta) core.queue_set_metadata(trans, req, self._name, new_meta)
self._metadata = new_meta self._metadata = new_meta
# TODO(flaper87): Cache with timeout # TODO(flaper87): Cache with timeout
if self._metadata and not force_reload: if self._metadata and not force_reload:
return self._metadata return self._metadata
self._metadata = core.queue_get_metadata(trans, req, self._id) self._metadata = core.queue_get_metadata(trans, req, self._name)
return self._metadata return self._metadata
def delete(self): def delete(self):
req, trans = self.client._request_and_transport() req, trans = self.client._request_and_transport()
core.queue_delete(trans, req, self._id) core.queue_delete(trans, req, self._name)
# Messages API # Messages API
@ -93,7 +94,7 @@ class Queue(object):
# TODO(flaper87): Return a list of messages # TODO(flaper87): Return a list of messages
return core.message_post(trans, req, return core.message_post(trans, req,
self._id, messages) self._name, messages)
def message(self, message_id): def message(self, message_id):
"""Gets a message by id """Gets a message by id
@ -105,8 +106,9 @@ class Queue(object):
:rtype: `dict` :rtype: `dict`
""" """
req, trans = self.client._request_and_transport() req, trans = self.client._request_and_transport()
return core.message_get(trans, req, self._id, msg = core.message_get(trans, req, self._name,
message_id) message_id)
return message.Message(self, **msg)
def messages(self, *messages, **params): def messages(self, *messages, **params):
"""Gets a list of messages from the server """Gets a list of messages from the server
@ -135,12 +137,14 @@ class Queue(object):
# and messages deserialization. # and messages deserialization.
if messages: if messages:
return core.message_get_many(trans, req, msgs = core.message_get_many(trans, req,
self._id, messages) self._name, messages)
else:
# NOTE(flaper87): It's safe to access messages
# directly. If something wrong happens, the core
# API will raise the right exceptions.
msgs = core.message_list(trans, req,
self._name,
**params)
# NOTE(flaper87): It's safe to access messages return message._MessageIterator(self, msgs)
# directly. If something wrong happens, the core
# API will raise the right exceptions.
return core.message_list(trans, req,
self._id,
**params)['messages']

View File

@ -17,10 +17,15 @@ import mock
from marconiclient.queues import client from marconiclient.queues import client
from marconiclient.tests import base from marconiclient.tests import base
from marconiclient.tests.transport import dummy
class QueuesTestBase(base.TestBase): class QueuesTestBase(base.TestBase):
transport_cls = dummy.DummyTransport
url = 'http://127.0.0.1:8888/v1'
version = 1
def setUp(self): def setUp(self):
super(QueuesTestBase, self).setUp() super(QueuesTestBase, self).setUp()
self.transport = self.transport_cls(self.conf) self.transport = self.transport_cls(self.conf)

View File

@ -0,0 +1,42 @@
# Copyright (c) 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import mock
from marconiclient.tests.queues import base
from marconiclient.transport import response
class QueuesV1MessageUnitTest(base.QueuesTestBase):
def test_message_delete(self):
returned = {
'href': '/v1/queues/fizbit/messages/50b68a50d6f5b8c8a7c62b01',
'ttl': 800,
'age': 790,
'body': {'event': 'ActivateAccount', 'mode': 'active'}
}
with mock.patch.object(self.transport, 'send',
autospec=True) as send_method:
resp = response.Response(None, json.dumps(returned))
send_method.return_value = resp
msg = self.queue.message('50b68a50d6f5b8c8a7c62b01')
send_method.return_value = None
self.assertIsNone(msg.delete())

View File

@ -16,6 +16,7 @@
import json import json
import mock import mock
from marconiclient.queues.v1 import message
from marconiclient.tests.queues import base from marconiclient.tests.queues import base
from marconiclient.transport import response from marconiclient.transport import response
@ -113,7 +114,8 @@ class QueuesV1QueueUnitTest(base.QueuesTestBase):
resp = response.Response(None, json.dumps(returned)) resp = response.Response(None, json.dumps(returned))
send_method.return_value = resp send_method.return_value = resp
self.queue.messages(limit=1) msgs = self.queue.messages(limit=1)
self.assertIsInstance(msgs, message._MessageIterator)
# NOTE(flaper87): Nothing to assert here, # NOTE(flaper87): Nothing to assert here,
# just checking our way down to the transport # just checking our way down to the transport
@ -133,8 +135,8 @@ class QueuesV1QueueUnitTest(base.QueuesTestBase):
resp = response.Response(None, json.dumps(returned)) resp = response.Response(None, json.dumps(returned))
send_method.return_value = resp send_method.return_value = resp
msg = self.queue.message('50b68a50d6f5b8c8a7c62b01') msgs = self.queue.message('50b68a50d6f5b8c8a7c62b01')
self.assertTrue(isinstance(msg, dict)) self.assertIsInstance(msgs, message.Message)
# NOTE(flaper87): Nothing to assert here, # NOTE(flaper87): Nothing to assert here,
# just checking our way down to the transport # just checking our way down to the transport
@ -161,7 +163,7 @@ class QueuesV1QueueUnitTest(base.QueuesTestBase):
msg = self.queue.messages('50b68a50d6f5b8c8a7c62b01', msg = self.queue.messages('50b68a50d6f5b8c8a7c62b01',
'50b68a50d6f5b8c8a7c62b02') '50b68a50d6f5b8c8a7c62b02')
self.assertTrue(isinstance(msg, list)) self.assertIsInstance(msg, message._MessageIterator)
# NOTE(flaper87): Nothing to assert here, # NOTE(flaper87): Nothing to assert here,
# just checking our way down to the transport # just checking our way down to the transport

View File

@ -17,6 +17,8 @@ from marconiclient.transport import api
class FakeApi(api.Api): class FakeApi(api.Api):
label = 'v1'
schema = { schema = {
'test_operation': { 'test_operation': {
'ref': 'test/{name}', 'ref': 'test/{name}',

View File

@ -22,6 +22,7 @@ from marconiclient import errors
class Api(object): class Api(object):
schema = {} schema = {}
label = None
validators = {} validators = {}
def get_schema(self, operation): def get_schema(self, operation):

View File

@ -45,9 +45,19 @@ class HttpTransport(base.Transport):
# happen before any other operation here. # happen before any other operation here.
# request.validate() # request.validate()
schema = request.api.get_schema(request.operation) schema = {}
ref = schema.get('ref', '')
ref_params = {} ref_params = {}
ref = request.ref
if request.operation:
schema = request.api.get_schema(request.operation)
ref = ref or schema.get('ref', '')
# FIXME(flaper87): We expect the endpoint
# to have the API version label already,
# however in a follow-your-nose implementation
# it should be the other way around.
ref = ref.lstrip('/' + request.api.label)
for param in list(request.params.keys()): for param in list(request.params.keys()):
if '{{{0}}}'.format(param) in ref: if '{{{0}}}'.format(param) in ref:

View File

@ -77,7 +77,7 @@ class Request(object):
""" """
def __init__(self, endpoint='', operation='', def __init__(self, endpoint='', operation='',
content=None, params=None, ref='', content=None, params=None,
headers=None, api=None): headers=None, api=None):
self._api = None self._api = None
@ -85,6 +85,7 @@ class Request(object):
self.endpoint = endpoint self.endpoint = endpoint
self.operation = operation self.operation = operation
self.ref = ref
self.content = content self.content = content
self.params = params or {} self.params = params or {}
self.headers = headers or {} self.headers = headers or {}

View File

@ -12,61 +12,65 @@
# implied. # implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import unittest
import json
import mock import mock
from marconiclient.queues.v1 import message from marconiclient.queues.v1 import message
from marconiclient.tests.mock import message as mock_message from marconiclient.tests.queues import base
from marconiclient.tests.queues import messages as test_message
from marconiclient.transport import http
from marconiclient.transport import response
HREF = '/v1/queue/dgq/messages/my_msg_is_chocolate' class TestMessageIterator(base.QueuesTestBase):
AGE = 100
TTL = 120 def test_no_next_iteration(self):
messages = {'links': [],
'messages': [{
'href': '/v1/queues/mine/messages/123123423',
'ttl': 800,
'age': 790,
'body': {'event': 'ActivateAccount',
'mode': 'active'}
}]
}
iterator = message._MessageIterator(self.queue, messages)
iterated = [msg for msg in iterator]
self.assertEqual(len(iterated), 1)
def test_next_page(self):
messages = {'links': [],
'messages': [{
'href': '/v1/queues/mine/messages/123123423',
'ttl': 800,
'age': 790,
'body': {'event': 'ActivateAccount',
'mode': 'active'}
}]
}
with mock.patch.object(self.transport, 'send',
autospec=True) as send_method:
resp = response.Response(None, json.dumps(messages))
send_method.return_value = resp
# NOTE(flaper87): The first iteration will return 1 message
# and then call `_next_page` which will use the rel-next link
# to get a new set of messages.
link = {'rel': 'next',
'href': "/v1/queues/mine/messages?marker=6244-244224-783"}
messages['links'].append(link)
iterator = message._MessageIterator(self.queue, messages)
iterated = [msg for msg in iterator]
self.assertEqual(len(iterated), 2)
class TestSimpleMessage(unittest.TestCase): class QueuesV1MessageHttpUnitTest(test_message.QueuesV1MessageUnitTest):
def setUp(self):
msg_body = {
'href': HREF,
'ttl': TTL,
'age': AGE,
'body': {'name': 'chocolate'}
}
self.conn = mock.MagicMock()
self.msg = message.from_dict(msg_body, connection=self.conn)
def _attr_check(self, xhref, xttl, xage, xbody): transport_cls = http.HttpTransport
self.assertEqual(self.msg.href, xhref) url = 'http://127.0.0.1:8888/v1'
self.assertEqual(self.msg.ttl, xttl) version = 1
self.assertEqual(self.msg.age, xage)
self.assertEqual(self.msg.body, xbody)
def test_attributes_match_expected(self):
self._attr_check(xhref=HREF, xttl=TTL, xage=AGE,
xbody={'name': 'chocolate'})
def test_repr_matches_expected(self):
self.assertEqual(repr(self.msg),
'<Message ttl:%s>' % (self.msg.ttl,))
def test_delete_works(self):
self.msg.delete()
def test_reload_works(self):
msg = mock_message.message(
href=HREF, ttl=TTL - 1, age=AGE + 1,
body={'name': 'vanilla'})
self.conn.get.return_value = mock.MagicMock()
self.conn.get.return_value.json.return_value = msg
self.msg.reload()
self._attr_check(xhref=HREF, xttl=TTL - 1, xage=AGE + 1,
xbody={'name': 'vanilla'})
def test_reload_after_delete_throws(self):
self.msg.delete()
self.assertRaises(AssertionError, self.msg.reload)
def test_delete_after_delete_throws(self):
self.msg.delete()
self.assertRaises(AssertionError, self.msg.delete)