# Copyright (c) 2013 Red Hat, Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implements the MongoDB storage controller for queues.
Field Mappings:
In order to reduce the disk / memory space used,
field names will be, most of the time, the first
letter of their long name.
from oslo_log import log as logging
from oslo_utils import timeutils
from pymongo.collection import ReturnDocument
import pymongo.errors
from zaqar.common import decorators
from zaqar.i18n import _
from zaqar import storage
from import errors
from import utils
LOG = logging.getLogger(__name__)
# NOTE(kgriffs): E.g.: 'queuecontroller:exists:5083853/my-queue'
_QUEUE_CACHE_PREFIX = 'queuecontroller:'
# NOTE(kgriffs): This causes some race conditions, but they are
# harmless. If a queue was deleted, but we are still returning
# that it exists, some messages may get inserted without the
# client getting an error. In this case, those messages would
# be orphaned and expire eventually according to their TTL.
# What this means for the client is that they have a bug; they
# deleted a queue and then immediately tried to post messages
# to it. If they keep trying to use the queue, they will
# eventually start getting an error, once the cache entry
# expires, which should clue them in on what happened.
# TODO(kgriffs): Make dynamic?
def _queue_exists_key(queue, project=None):
# NOTE(kgriffs): Use string concatenation for performance,
# also put project first since it is guaranteed to be
# unique, which should reduce lookup time.
return _QUEUE_CACHE_PREFIX + 'exists:' + str(project) + '/' + queue
class QueueController(storage.Queue):
"""Implements queue resource operations using MongoDB.
Queues are scoped by project, which is prefixed to the
queue name.
Name Field
name -> p_q
msg counter -> c
metadata -> m
Message Counter:
Name Field
value -> v
modified ts -> t
def __init__(self, *args, **kwargs):
super(QueueController, self).__init__(*args, **kwargs)
self._cache = self.driver.cache
self._collection = self.driver.queues_database.queues
# NOTE(flaper87): This creates a unique index for
# project and name. Using project as the prefix
# allows for querying by project and project+name.
# This is also useful for retrieving the queues list for
# a specific project, for example. Order matters!
# NOTE(wanghao): pymongo has removed the ensure_index since 4.0.0.
# So we need to update ensure_index to create_index.
self._collection.create_index([('p_q', 1)], unique=True)
# ----------------------------------------------------------------------
# Helpers
# ----------------------------------------------------------------------
def _get_counter(self, name, project=None):
"""Retrieves the current message counter value for a given queue.
This helper is used to generate monotonic pagination
markers that are saved as part of the message
Note 1: Markers are scoped per-queue and so are *not*
globally unique or globally ordered.
Note 2: If two or more requests to this method are made
in parallel, this method will return the same counter
value. This is done intentionally so that the caller
can detect a parallel message post, allowing it to
mitigate race conditions between producer and
observer clients.
:param name: Name of the queue to which the counter is scoped
:param project: Queue's project
:returns: current message counter as an integer
doc = self._collection.find_one(_get_scoped_query(name, project),
projection={'c.v': 1, '_id': 0})
if doc is None:
raise errors.QueueDoesNotExist(name, project)
return doc['c']['v']
def _inc_counter(self, name, project=None, amount=1, window=None):
"""Increments the message counter and returns the new value.
:param name: Name of the queue to which the counter is scoped
:param project: Queue's project name
:param amount: (Default 1) Amount by which to increment the counter
:param window: (Default None) A time window, in seconds, that
must have elapsed since the counter was last updated, in
order to increment the counter.
:returns: Updated message counter value, or None if window
was specified, and the counter has already been updated
within the specified time period.
:raises QueueDoesNotExist: if not found
now = timeutils.utcnow_ts()
update = {'$inc': {'c.v': amount}, '$set': {'c.t': now}}
query = _get_scoped_query(name, project)
if window is not None:
threshold = now - window
query['c.t'] = {'$lt': threshold}
while True:
doc = self._collection.find_one_and_update(
query, update, return_document=ReturnDocument.AFTER,
projection={'c.v': 1, '_id': 0})
except pymongo.errors.AutoReconnect:
LOG.exception('Auto reconnect failure')
if doc is None:
if window is None:
# NOTE(kgriffs): Since we did not filter by a time window,
# the queue should have been found and updated. Perhaps
# the queue has been deleted?
message = _(u'Failed to increment the message '
u'counter for queue %(name)s and '
u'project %(project)s')
message %= dict(name=name, project=project)
raise errors.QueueDoesNotExist(name, project)
# NOTE(kgriffs): Assume the queue existed, but the counter
# was recently updated, causing the range query on 'c.t' to
# exclude the record.
return None
return doc['c']['v']
# ----------------------------------------------------------------------
# Interface
# ----------------------------------------------------------------------
def _get(self, name, project=None):
return self.get_metadata(name, project)
except errors.QueueDoesNotExist:
return {}
def _list(self, project=None, kfilter={}, marker=None,
limit=storage.DEFAULT_QUEUES_PER_PAGE, detailed=False,
query = utils.scoped_query(marker, project, name, kfilter)
projection = {'p_q': 1, '_id': 0}
if detailed:
projection['m'] = 1
cursor = self._collection.find(query, projection=projection)
cursor = cursor.limit(limit).sort('p_q')
marker_name = {}
ntotal = self._collection.count_documents(query, limit=limit)
def normalizer(record):
queue = {'name': utils.descope_queue_name(record['p_q'])}
marker_name['next'] = queue['name']
if detailed:
queue['metadata'] = record['m']
return queue
yield utils.HookedCursor(cursor, normalizer, ntotal=ntotal)
yield marker_name and marker_name['next']
def get_metadata(self, name, project=None):
queue = self._collection.find_one(_get_scoped_query(name, project),
projection={'m': 1, '_id': 0})
if queue is None:
raise errors.QueueDoesNotExist(name, project)
return queue.get('m', {})
# @utils.retries_on_autoreconnect
def _create(self, name, metadata=None, project=None):
# NOTE(flaper87): If the connection fails after it was called
# and we retry to insert the queue, we could end up returning
# `False` because of the `DuplicatedKeyError` although the
# queue was indeed created by this API call.
# TODO(kgriffs): Commented out `retries_on_autoreconnect` for
# now due to the above issue, since creating a queue is less
# important to make super HA.
# NOTE(kgriffs): Start counting at 1, and assume the first
# message ever posted will succeed and set t to a UNIX
# "modified at" timestamp.
counter = {'v': 1, 't': 0}
scoped_name = utils.scope_queue_name(name, project)
{'p_q': scoped_name, 'm': metadata or {},
'c': counter})
except pymongo.errors.DuplicateKeyError:
return False
return True
# NOTE(kgriffs): Only cache when it exists; if it doesn't exist, and
# someone creates it, we want it to be immediately visible.
@decorators.caches(_queue_exists_key, _QUEUE_CACHE_TTL, lambda v: v)
def _exists(self, name, project=None):
query = _get_scoped_query(name, project)
return self._collection.find_one(query) is not None
def set_metadata(self, name, metadata, project=None):
rst = self._collection.update_one(_get_scoped_query(name, project),
{'$set': {'m': metadata}})
if rst.matched_count == 0:
raise errors.QueueDoesNotExist(name, project)
def _delete(self, name, project=None):
self._collection.delete_one(_get_scoped_query(name, project))
def _stats(self, name, project=None):
def _calculate_resource_count(self, project=None):
query = utils.scoped_query(None, project, None, {})
return self._collection.count_documents(query)
def _get_scoped_query(name, project):
return {'p_q': utils.scope_queue_name(name, project)}