diff --git a/examples/claims.py b/examples/claims.py new file mode 100644 index 00000000..43977cf0 --- /dev/null +++ b/examples/claims.py @@ -0,0 +1,52 @@ +# Copyright (c) 2014 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time + +from marconiclient.queues.v1 import client + +URL = 'http://localhost:8888/v1' + +cli = client.Client(URL) +queue = cli.queue('worker-jobs') + + +def send_jobs(): + jobs = [ + {'name': 'fluffy'}, + {'name': 'scout'}, + {'name': 'jo'} + ] + queue.post([{'body': j, + 'ttl': 360} + for j in jobs]) + + +def process_jobs(): + while True: + claim1 = queue.claim(ttl=500, grace=900, limit=2) + for msg in claim1: + claim_id = msg.claim_id + print('{claim_id} =? {id}'.format(claim_id=claim_id, id=claim1.id)) + print('processing job %s' % (msg)) + msg.delete() + claim2 = queue.claim(id=claim_id) + print(claim2) + print('nothing to do but have a sleep :)') + time.sleep(1) + +if __name__ == '__main__': + send_jobs() + process_jobs() diff --git a/marconiclient/queues/v1/api.py b/marconiclient/queues/v1/api.py index 3b15cc2b..3acc771d 100644 --- a/marconiclient/queues/v1/api.py +++ b/marconiclient/queues/v1/api.py @@ -162,6 +162,48 @@ class V1(api.Api): } }, + 'claim_create': { + 'ref': 'queues/{queue_name}/claims', + 'method': 'POST', + 'required': ['queue_name'], + 'properties': { + 'queue_name': {'type': 'string'}, + 'limit': {'type': 'integer'}, + 'grace': {'type': 'integer'} + } + }, + + 'claim_get': { + 'ref': 'queues/{queue_name}/claims/{claim_id}', + 'method': 'GET', + 'required': ['queue_name', 'claim_id'], + 'properties': { + 'queue_name': {'type': 'string'}, + 'claim_id': {'type': 'string'} + } + }, + + 'claim_update': { + 'ref': 'queues/{queue_name}/claims/{claim_id}', + 'method': 'PATCH', + 'required': ['queue_name', 'claim_id'], + 'properties': { + 'queue_name': {'type': 'string'}, + 'claim_id': {'type': 'string'} + } + }, + + 'claim_delete': { + 'ref': 'queues/{queue_name}/claims/{claim_id}', + 'method': 'DELETE', + 'required': ['queue_name', 'claim_id'], + 'properties': { + 'queue_name': {'type': 'string'}, + 'claim_id': {'type': 'string'} + } + }, + + 'health': { 'admin': True, 'ref': 'health', diff --git a/marconiclient/queues/v1/claim.py b/marconiclient/queues/v1/claim.py new file mode 100644 index 00000000..6c1dc84f --- /dev/null +++ b/marconiclient/queues/v1/claim.py @@ -0,0 +1,97 @@ +# Copyright (c) 2014 Rackspace, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from marconiclient.queues.v1 import core +from marconiclient.queues.v1 import message + + +class Claim(object): + def __init__(self, queue, id=None, + ttl=None, grace=None, limit=None): + self._queue = queue + self.id = id + self._ttl = ttl + self._grace = grace + self._age = None + self._limit = limit + self._message_iter = None + if id is None: + self._create() + + def __repr__(self): + return ''.format(id=self.id, + ttl=self.ttl, + age=self.age) + + def _get(self): + req, trans = self._queue.client._request_and_transport() + + claim_res = core.claim_get(trans, req, self._queue._name, + self.id) + self._age = claim_res['age'] + self._ttl = claim_res['ttl'] + self._grace = claim_res.get('grace') + msgs = claim_res.get('messages', []) + self._message_iter = message._MessageIterator(self._queue, + msgs) + + def _create(self): + req, trans = self._queue.client._request_and_transport() + msgs = core.claim_create(trans, req, + self._queue._name, + ttl=self._ttl, + grace=self._grace, + limit=self._limit) + # extract the id from the first message + if msgs is not None: + self.id = msgs[0]['href'].split('=')[-1] + self._message_iter = message._MessageIterator(self._queue, msgs or []) + + def __iter__(self): + if self._message_iter is None: + self._get() + return self._message_iter + + @property + def age(self): + if self._age is None: + self._get() + return self._age + + @property + def ttl(self): + if self._ttl is None: + self._get() + return self._ttl + + def delete(self): + req, trans = self._queue.client._request_and_transport() + core.claim_delete(trans, req, self._queue._name, self.id) + + def update(self, ttl=None, grace=None): + req, trans = self._queue.client._request_and_transport() + kwargs = {} + if ttl is not None: + kwargs['ttl'] = ttl + if grace is not None: + kwargs['grace'] = grace + res = core.claim_update(trans, req, self._queue._name, self.id, + **kwargs) + # if the update succeeds, update our attributes. + if ttl is not None: + self._ttl = ttl + if grace is not None: + self._grace = grace + return res diff --git a/marconiclient/queues/v1/core.py b/marconiclient/queues/v1/core.py index 054ef663..9435eb6d 100644 --- a/marconiclient/queues/v1/core.py +++ b/marconiclient/queues/v1/core.py @@ -267,6 +267,73 @@ def message_delete(transport, request, queue_name, message_id, callback=None): transport.send(request) +def claim_create(transport, request, queue_name, **kwargs): + """Creates a Claim `claim_id` on the queue `queue_name` + + :param transport: Transport instance to use + :type transport: `transport.base.Transport` + :param request: Request instance ready to be sent. + :type request: `transport.request.Request` + """ + + request.operation = 'claim_create' + request.params['queue_name'] = queue_name + request.content = json.dumps(kwargs) + + resp = transport.send(request) + return resp.deserialized_content + + +def claim_get(transport, request, queue_name, claim_id): + """Gets a Claim `claim_id` + + :param transport: Transport instance to use + :type transport: `transport.base.Transport` + :param request: Request instance ready to be sent. + :type request: `transport.request.Request` + """ + + request.operation = 'claim_get' + request.params['queue_name'] = queue_name + request.params['claim_id'] = claim_id + + resp = transport.send(request) + return resp.deserialized_content + + +def claim_update(transport, request, queue_name, claim_id, **kwargs): + """Updates a Claim `claim_id` + + :param transport: Transport instance to use + :type transport: `transport.base.Transport` + :param request: Request instance ready to be sent. + :type request: `transport.request.Request` + """ + + request.operation = 'claim_update' + request.params['queue_name'] = queue_name + request.params['claim_id'] = claim_id + request.content = json.dumps(kwargs) + + resp = transport.send(request) + return resp.deserialized_content + + +def claim_delete(transport, request, queue_name, claim_id): + """Deletes a Claim `claim_id` + + :param transport: Transport instance to use + :type transport: `transport.base.Transport` + :param request: Request instance ready to be sent. + :type request: `transport.request.Request` + """ + request.operation = 'claim_delete' + request.params['queue_name'] = queue_name + request.params['claim_id'] = claim_id + + transport.send(request) + + def shard_create(transport, request, shard_name, shard_data): """Creates a shard called `shard_name` diff --git a/marconiclient/queues/v1/message.py b/marconiclient/queues/v1/message.py index fb845291..cdb6325a 100644 --- a/marconiclient/queues/v1/message.py +++ b/marconiclient/queues/v1/message.py @@ -101,12 +101,23 @@ class Message(object): # NOTE(flaper87): Is this really # necessary? Should this be returned # by Marconi? + # The url has two forms depending on if it has been claimed. + # /v1/queues/worker-jobs/messages/5c6939a8?claim_id=63c9a592 + # or + # /v1/queues/worker-jobs/messages/5c6939a8 self._id = href.split('/')[-1] + if '?' in self._id: + self._id = self._id.split('?')[0] def __repr__(self): return ''.format(id=self._id, ttl=self.ttl) + @property + def claim_id(self): + if '=' in self.href: + return self.href.split('=')[-1] + def delete(self): req, trans = self.queue.client._request_and_transport() core.message_delete(trans, req, self.queue._name, self._id) diff --git a/marconiclient/queues/v1/queues.py b/marconiclient/queues/v1/queues.py index 5c831c02..c83e3941 100644 --- a/marconiclient/queues/v1/queues.py +++ b/marconiclient/queues/v1/queues.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from marconiclient.queues.v1 import claim as claim_api from marconiclient.queues.v1 import core from marconiclient.queues.v1 import message @@ -208,3 +209,7 @@ class Queue(object): **params) return message._MessageIterator(self, msgs) + + def claim(self, id=None, ttl=None, grace=None, + limit=None): + return claim_api.Claim(self, id=id, ttl=ttl, grace=grace, limit=limit) diff --git a/marconiclient/tests/queues/claims.py b/marconiclient/tests/queues/claims.py new file mode 100644 index 00000000..fe193af4 --- /dev/null +++ b/marconiclient/tests/queues/claims.py @@ -0,0 +1,139 @@ +# Copyright (c) 2014 Rackspace Hosting. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import mock + +from marconiclient.queues.v1 import claim +from marconiclient.tests.queues import base +from marconiclient.transport import errors +from marconiclient.transport import response + + +class QueueV1ClaimUnitTest(base.QueuesTestBase): + + def test_claim(self): + result = [{ + 'href': '/v1/queues/fizbit/messages/50b68a50d6f5b8c8a7c62b01', + 'ttl': 800, + 'age': 790, + 'body': {'event': 'ActivateAccount', 'mode': 'active'} + }, { + 'href': '/v1/queues/fizbit/messages/50b68a50d6f5b8c8a7c62b02', + 'ttl': 800, + 'age': 790, + 'body': {'event': 'ActivateAccount', 'mode': 'active'} + }] + + with mock.patch.object(self.transport, 'send', + autospec=True) as send_method: + + resp = response.Response(None, json.dumps(result)) + send_method.return_value = resp + + claimed = self.queue.claim(ttl=60, grace=60) + # messages doesn't support len() + num_tested = 0 + for num, msg in enumerate(claimed): + num_tested += 1 + self.assertEqual(result[num]['href'], msg.href) + self.assertEqual(len(result), num_tested) + + def test_claim_get_by_id(self): + result = { + 'href': '/v1/queues/fizbit/messages/50b68a50d6cb01?claim_id=4524', + 'age': 790, + 'ttl': 800, + 'messages': [{ + 'href': '/v1/queues/fizbit/messages/50b68a50d6f5b8c8a7c62b01', + 'ttl': 800, + 'age': 790, + 'body': {'event': 'ActivateAccount', 'mode': 'active'} + }]} + + with mock.patch.object(self.transport, 'send', + autospec=True) as send_method: + + resp = response.Response(None, json.dumps(result)) + send_method.return_value = resp + + cl = self.queue.claim(id='5245432') + # messages doesn't support len() + num_tested = 0 + for num, msg in enumerate(cl): + num_tested += 1 + self.assertEqual(result['messages'][num]['href'], msg.href) + self.assertEqual(len(result['messages']), num_tested) + + def test_claim_update(self): + with mock.patch.object(self.transport, 'send', + autospec=True) as send_method: + + resp = response.Response(None, None) + send_method.return_value = resp + + self.queue.claim(id='5245432').update(ttl=444, grace=987) + + # NOTE(asalkeld): Nothing to assert here, + # just checking our way down to the transport + # doesn't crash. + + def test_claim_delete(self): + with mock.patch.object(self.transport, 'send', + autospec=True) as send_method: + + resp = response.Response(None, None) + send_method.return_value = resp + + self.queue.claim(id='4225').delete() + + # NOTE(asalkeld): Nothing to assert here, + # just checking our way down to the transport + # doesn't crash. + + +class QueuesV1ClaimFunctionalTest(base.QueuesTestBase): + + def test_message_claim_functional(self): + queue = self.client.queue("test_queue") + queue._get_transport = mock.Mock(return_value=self.transport) + + messages = [{'ttl': 60, 'body': 'Post It 1!'}] + queue.post(messages) + + messages = queue.claim(ttl=120, grace=120) + self.assertTrue(isinstance(messages, claim.Claim)) + self.assertGreaterEqual(len(list(messages)), 0) + + def test_claim_get_functional(self): + queue = self.client.queue("test_queue") + queue._get_transport = mock.Mock(return_value=self.transport) + + res = queue.claim(ttl=100, grace=100) + claim_id = res.id + cl = queue.claim(id=claim_id) + self.assertEqual(cl.id, claim_id) + + def test_claim_create_delete_functional(self): + queue = self.client.queue("test_queue") + queue._get_transport = mock.Mock(return_value=self.transport) + + messages = [{'ttl': 60, 'body': 'Post It 1!'}] + queue.post(messages) + + cl = queue.claim(ttl=120, grace=120) + claim_id = cl.id + cl.delete() + self.assertRaises(errors.ResourceNotFound, queue.claim, id=claim_id) diff --git a/tests/functional/queues/v1/test_claims.py b/tests/functional/queues/v1/test_claims.py new file mode 100644 index 00000000..cffe9ca0 --- /dev/null +++ b/tests/functional/queues/v1/test_claims.py @@ -0,0 +1,26 @@ +# Copyright (c) 2014 Rackspace Hosting. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from marconiclient.tests.queues import claims +from marconiclient.transport import http + + +class QueuesV1ClaimHttpFunctionalTest(claims.QueuesV1ClaimFunctionalTest): + + is_functional = True + transport_cls = http.HttpTransport + url = 'http://127.0.0.1:8888/v1' + version = 1 diff --git a/tests/unit/queues/v1/test_claims.py b/tests/unit/queues/v1/test_claims.py new file mode 100644 index 00000000..f1f35de9 --- /dev/null +++ b/tests/unit/queues/v1/test_claims.py @@ -0,0 +1,25 @@ +# Copyright (c) Rackspace Hosting. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from marconiclient.tests.queues import claims +from marconiclient.transport import http + + +class QueuesV1ClaimsHttpUnitTest(claims.QueueV1ClaimUnitTest): + + transport_cls = http.HttpTransport + url = 'http://127.0.0.1:8888/v1' + version = 1