diff --git a/releasenotes/notes/delete_messages_with_claim_ids-64bb8105de3768b1.yaml b/releasenotes/notes/delete_messages_with_claim_ids-64bb8105de3768b1.yaml new file mode 100644 index 000000000..200a4baf3 --- /dev/null +++ b/releasenotes/notes/delete_messages_with_claim_ids-64bb8105de3768b1.yaml @@ -0,0 +1,5 @@ +--- +features: + - Add an new option named 'message_delete_with_claim_id', when it is True, + delete messages must need claim_ids and message_ids both in request + parameters. This will improve the security of the message. diff --git a/zaqar/api/v1/request.py b/zaqar/api/v1/request.py index 0c8d35553..d2425f189 100644 --- a/zaqar/api/v1/request.py +++ b/zaqar/api/v1/request.py @@ -293,6 +293,7 @@ class RequestSchema(api.Api): 'properties': { 'queue_name': {'type': 'string'}, 'message_ids': {'type': 'array'}, + 'claim_ids': {'type': 'array'}, 'pop': {'type': 'integer'} }, 'required': ['queue_name'], diff --git a/zaqar/api/v2/endpoints.py b/zaqar/api/v2/endpoints.py index 888988a56..abd1a34f3 100644 --- a/zaqar/api/v2/endpoints.py +++ b/zaqar/api/v2/endpoints.py @@ -563,6 +563,9 @@ class Endpoints(object): project_id = req._headers.get('X-Project-ID') queue_name = req._body.get('queue_name') message_ids = req._body.get('message_ids') + claim_ids = None + if self._validate.get_limit_conf_value('message_delete_with_claim_id'): + claim_ids = req._body.get('claim_ids') pop_limit = req._body.get('pop') LOG.debug(u'Messages collection DELETE - queue: %(queue)s,' @@ -571,7 +574,7 @@ class Endpoints(object): 'message_ids': message_ids}) try: - self._validate.message_deletion(message_ids, pop_limit) + self._validate.message_deletion(message_ids, pop_limit, claim_ids) except validation.ValidationFailed as ex: LOG.debug(ex) @@ -580,14 +583,16 @@ class Endpoints(object): if message_ids: return self._delete_messages_by_id(req, queue_name, message_ids, - project_id) + project_id, claim_ids) elif pop_limit: return self._pop_messages(req, queue_name, project_id, pop_limit) @api_utils.on_exception_sends_500 - def _delete_messages_by_id(self, req, queue_name, ids, project_id): + def _delete_messages_by_id(self, req, queue_name, ids, project_id, + claim_ids=None): self._message_controller.bulk_delete(queue_name, message_ids=ids, - project=project_id) + project=project_id, + claim_ids=claim_ids) headers = {'status': 204} body = {} diff --git a/zaqar/conf/transport.py b/zaqar/conf/transport.py index 9201a2268..fe6de9b67 100644 --- a/zaqar/conf/transport.py +++ b/zaqar/conf/transport.py @@ -143,6 +143,13 @@ max_length_client_id = cfg.IntOpt( 'uuid restriction. Default is 36.') +message_delete_with_claim_id = cfg.BoolOpt( + 'message_delete_with_claim_id', default=False, + help='Enable delete messages must be with claim IDS. This will ' + 'improve the security of the message avoiding delete messages before' + ' they are claimed and handled.') + + GROUP_NAME = 'transport' ALL_OPTS = [ default_message_ttl, @@ -165,7 +172,8 @@ ALL_OPTS = [ max_pools_per_page, client_id_uuid_safe, min_length_client_id, - max_length_client_id + max_length_client_id, + message_delete_with_claim_id ] diff --git a/zaqar/storage/base.py b/zaqar/storage/base.py index cc4ac8513..82cceffc9 100644 --- a/zaqar/storage/base.py +++ b/zaqar/storage/base.py @@ -515,7 +515,7 @@ class Message(ControllerBase): raise NotImplementedError @abc.abstractmethod - def bulk_delete(self, queue, message_ids, project=None): + def bulk_delete(self, queue, message_ids, project=None, claim_ids=None): """Base method for deleting multiple messages. :param queue: Name of the queue to post @@ -523,6 +523,7 @@ class Message(ControllerBase): :param message_ids: A sequence of message IDs to be deleted. :param project: Project id + :param claim_ids: claim IDs passed in by the delete request """ raise NotImplementedError diff --git a/zaqar/storage/errors.py b/zaqar/storage/errors.py index 9fa32f0b7..9ac908224 100644 --- a/zaqar/storage/errors.py +++ b/zaqar/storage/errors.py @@ -108,6 +108,16 @@ class ClaimDoesNotExist(DoesNotExist): project=project) +class ClaimDoesNotMatch(ExceptionBase): + + msg_format = (u'Claim {cid} does not exist in the claim_ids parameter in' + u'queue {queue} for project {project}') + + def __init__(self, cid, queue, project): + super(ClaimDoesNotMatch, self).__init__(cid=cid, queue=queue, + project=project) + + class MessageIsClaimed(NotPermitted): msg_format = u'Message {mid} is claimed' diff --git a/zaqar/storage/mongodb/messages.py b/zaqar/storage/mongodb/messages.py index aac5e87f0..b75566a8f 100644 --- a/zaqar/storage/mongodb/messages.py +++ b/zaqar/storage/mongodb/messages.py @@ -724,14 +724,26 @@ class MessageController(storage.Message): @utils.raises_conn_error @utils.retries_on_autoreconnect - def bulk_delete(self, queue_name, message_ids, project=None): + def bulk_delete(self, queue_name, message_ids, project=None, + claim_ids=None): message_ids = [mid for mid in map(utils.to_oid, message_ids) if mid] + if claim_ids: + claim_ids = [cid for cid in map(utils.to_oid, claim_ids) if cid] query = { '_id': {'$in': message_ids}, PROJ_QUEUE: utils.scope_queue_name(queue_name, project), } collection = self._collection(queue_name, project) + if claim_ids: + message_claim_ids = [] + messages = collection.find(query).hint(ID_INDEX_FIELDS) + for message in messages: + message_claim_ids.append(message['c']['id']) + for cid in claim_ids: + if cid not in message_claim_ids: + raise errors.ClaimDoesNotExist(cid, queue_name, project) + collection.delete_many(query) @utils.raises_conn_error diff --git a/zaqar/storage/pooling.py b/zaqar/storage/pooling.py index 28434f0be..6992f7e95 100644 --- a/zaqar/storage/pooling.py +++ b/zaqar/storage/pooling.py @@ -284,11 +284,12 @@ class MessageController(storage.Message): message_id=message_id, claim=claim) return None - def bulk_delete(self, queue, message_ids, project=None): + def bulk_delete(self, queue, message_ids, project=None, claim_ids=None): control = self._get_controller(queue, project) if control: return control.bulk_delete(queue, project=project, - message_ids=message_ids) + message_ids=message_ids, + claim_ids=claim_ids) return None def pop(self, queue, limit, project=None): diff --git a/zaqar/storage/redis/messages.py b/zaqar/storage/redis/messages.py index 563dd44d3..9f35ad080 100644 --- a/zaqar/storage/redis/messages.py +++ b/zaqar/storage/redis/messages.py @@ -503,7 +503,7 @@ class MessageController(storage.Message, scripting.Mixin): @utils.raises_conn_error @utils.retries_on_connection_error - def bulk_delete(self, queue, message_ids, project=None): + def bulk_delete(self, queue, message_ids, project=None, claim_ids=None): claim_ctrl = self.driver.claim_controller if not self._queue_ctrl.exists(queue, project): return @@ -519,7 +519,14 @@ class MessageController(storage.Message, scripting.Mixin): pipe.zrem(msgset_key, mid) msg_claim = self._get_claim(mid) + + if claim_ids and msg_claim is None: + raise errors.MessageNotClaimed(mid) + if msg_claim is not None: + if claim_ids and (msg_claim['id'] not in claim_ids): + raise errors.ClaimDoesNotMatch(msg_claim['id'], queue, + project) claim_ctrl._del_message(queue, project, msg_claim['id'], mid, pipe) pipe.execute() diff --git a/zaqar/storage/swift/messages.py b/zaqar/storage/swift/messages.py index 69b9f136a..c8ddae74e 100644 --- a/zaqar/storage/swift/messages.py +++ b/zaqar/storage/swift/messages.py @@ -200,10 +200,17 @@ class MessageController(storage.Message): else: raise - def bulk_delete(self, queue, message_ids, project=None): - for id in message_ids: + def bulk_delete(self, queue, message_ids, project=None, claim_ids=None): + for message_id in message_ids: try: - self._delete(queue, id, project) + if claim_ids: + msg = self._get(queue, message_id, project) + if not msg['claim_id']: + raise errors.MessageNotClaimed(message_id) + if msg['claim_id'] not in claim_ids: + raise errors.ClaimDoesNotMatch(msg['claim_id'], + queue, project) + self._delete(queue, message_id, project) except errors.MessageDoesNotExist: pass diff --git a/zaqar/tests/faulty_storage.py b/zaqar/tests/faulty_storage.py index e09131b87..b9bbea7a5 100644 --- a/zaqar/tests/faulty_storage.py +++ b/zaqar/tests/faulty_storage.py @@ -142,5 +142,5 @@ class MessageController(storage.Message): def delete(self, queue, message_id, project=None, claim=None): raise NotImplementedError() - def bulk_delete(self, queue, message_ids, project=None): + def bulk_delete(self, queue, message_ids, project=None, claim_ids=None): raise NotImplementedError() diff --git a/zaqar/tests/unit/transport/wsgi/v2_0/test_messages.py b/zaqar/tests/unit/transport/wsgi/v2_0/test_messages.py index 2faf04422..8ae9c3d8e 100644 --- a/zaqar/tests/unit/transport/wsgi/v2_0/test_messages.py +++ b/zaqar/tests/unit/transport/wsgi/v2_0/test_messages.py @@ -410,6 +410,36 @@ class TestMessagesMongoDB(base.V2Base): self.simulate_delete(target, query_string=params, headers=self.headers) self.assertEqual(falcon.HTTP_204, self.srmock.status) + def test_bulk_delete_with_claim_ids(self): + self.conf.set_override('message_delete_with_claim_id', True, + 'transport') + path = self.queue_path + self._post_messages(path + '/messages', repeat=5) + [target, params] = self.srmock.headers_dict['location'].split('?') + + body = self.simulate_post(path + '/claims', + body='{"ttl": 100, "grace": 100}', + headers=self.headers) + self.assertEqual(falcon.HTTP_201, self.srmock.status) + claimed = jsonutils.loads(body[0])['messages'] + claime_ids = '&claim_ids=' + for claim in claimed: + claime_ids += claim['href'].split('claim_id=')[1] + ',' + + params = params + claime_ids + self.simulate_delete(target, query_string=params, headers=self.headers) + self.assertEqual(falcon.HTTP_204, self.srmock.status) + + def test_bulk_delete_without_claim_ids(self): + self.conf.set_override('message_delete_with_claim_id', True, + 'transport') + path = self.queue_path + self._post_messages(path + '/messages', repeat=5) + [target, params] = self.srmock.headers_dict['location'].split('?') + + self.simulate_delete(target, query_string=params, headers=self.headers) + self.assertEqual(falcon.HTTP_400, self.srmock.status) + def test_list(self): path = self.queue_path + '/messages' self._post_messages(path, repeat=10) diff --git a/zaqar/transport/validation.py b/zaqar/transport/validation.py index 650c9d70d..501f93610 100644 --- a/zaqar/transport/validation.py +++ b/zaqar/transport/validation.py @@ -445,11 +445,12 @@ class Validator(object): raise ValidationFailed( msg, self._limits_conf.max_messages_per_page) - def message_deletion(self, ids=None, pop=None): + def message_deletion(self, ids=None, pop=None, claim_ids=None): """Restrictions involving deletion of messages. :param ids: message ids passed in by the delete request :param pop: count of messages to be POPped + :param claim_ids: claim ids passed in by the delete request :raises ValidationFailed: if, pop AND id params are present together neither pop or id params are present @@ -468,6 +469,13 @@ class Validator(object): raise ValidationFailed(msg) + if self._limits_conf.message_delete_with_claim_id: + if (ids and claim_ids is None) or (ids is None and claim_ids): + msg = _(u'The request should have both "ids" and "claim_ids" ' + 'parameter in the request when ' + 'message_delete_with_claim_id is True.') + raise ValidationFailed(msg) + pop_uplimit = self._limits_conf.max_messages_per_claim_or_pop if pop is not None and not (0 < pop <= pop_uplimit): msg = _(u'Pop value must be at least 1 and may not ' diff --git a/zaqar/transport/wsgi/v2_0/messages.py b/zaqar/transport/wsgi/v2_0/messages.py index eb7e0ddac..ff5c3f297 100644 --- a/zaqar/transport/wsgi/v2_0/messages.py +++ b/zaqar/transport/wsgi/v2_0/messages.py @@ -280,9 +280,12 @@ class CollectionResource(object): @acl.enforce("messages:delete_all") def on_delete(self, req, resp, project_id, queue_name): ids = req.get_param_as_list('ids') + claim_ids = None + if self._validate.get_limit_conf_value('message_delete_with_claim_id'): + claim_ids = req.get_param_as_list('claim_ids') pop_limit = req.get_param_as_int('pop') try: - self._validate.message_deletion(ids, pop_limit) + self._validate.message_deletion(ids, pop_limit, claim_ids) except validation.ValidationFailed as ex: LOG.debug(ex) @@ -290,19 +293,21 @@ class CollectionResource(object): if ids: resp.status = self._delete_messages_by_id(queue_name, ids, - project_id) + project_id, claim_ids) elif pop_limit: resp.status, resp.body = self._pop_messages(queue_name, project_id, pop_limit) - def _delete_messages_by_id(self, queue_name, ids, project_id): + def _delete_messages_by_id(self, queue_name, ids, project_id, + claim_ids=None): try: self._message_controller.bulk_delete( queue_name, message_ids=ids, - project=project_id) + project=project_id, + claim_ids=claim_ids) except Exception as ex: LOG.exception(ex)