diff --git a/releasenotes/notes/update-mongo-driver-with-new-version-of-pymongo-ebd82e428bb57ebd.yaml b/releasenotes/notes/update-mongo-driver-with-new-version-of-pymongo-ebd82e428bb57ebd.yaml new file mode 100644 index 000000000..f0c76156b --- /dev/null +++ b/releasenotes/notes/update-mongo-driver-with-new-version-of-pymongo-ebd82e428bb57ebd.yaml @@ -0,0 +1,11 @@ +--- +upgrade: + - | + Upgrade one of storage drivers, mongo driver with new version of pymongo. + Pymongo has been updated to 4.0.0, there are some changes which are not + supported in new version: + 1. Collection.count and Cursor.count is removed. + 2. Collection.ensure_index is removed. + 3. Collection.__bool__ raises NotImplementedError. + 4. Should use Binary.from_uuid to handle the UUID object. + Those changes need to upgrade the mongo driver's code to work well. diff --git a/zaqar/common/auth.py b/zaqar/common/auth.py index dff605b64..8189b21da 100644 --- a/zaqar/common/auth.py +++ b/zaqar/common/auth.py @@ -54,7 +54,7 @@ def _get_admin_session(conf_group): def _get_user_client(auth_plugin): sess = loading.load_session_from_conf_options( cfg.CONF, TRUSTEE_CONF_GROUP, auth=auth_plugin) - return client.Client(session=sess) + return client.Client(session=sess, interface='public') def create_trust_id(auth_plugin, trustor_user_id, trustor_project_id, roles, diff --git a/zaqar/storage/mongodb/catalogue.py b/zaqar/storage/mongodb/catalogue.py index 9df8faf65..7b52788a2 100644 --- a/zaqar/storage/mongodb/catalogue.py +++ b/zaqar/storage/mongodb/catalogue.py @@ -43,7 +43,7 @@ class CatalogueController(base.CatalogueBase): super(CatalogueController, self).__init__(*args, **kwargs) self._col = self.driver.database.catalogue - self._col.ensure_index(CATALOGUE_INDEX, unique=True) + self._col.create_index(CATALOGUE_INDEX, unique=True) @utils.raises_conn_error def _insert(self, project, queue, pool, upsert): @@ -56,8 +56,9 @@ class CatalogueController(base.CatalogueBase): fields = {'_id': 0} query = utils.scoped_query(None, project) + ntotal = self._col.count_documents(query) return utils.HookedCursor(self._col.find(query, fields), - _normalize) + _normalize, ntotal=ntotal) @utils.raises_conn_error def get(self, project, queue): @@ -95,7 +96,7 @@ class CatalogueController(base.CatalogueBase): @utils.raises_conn_error def drop_all(self): self._col.drop() - self._col.ensure_index(CATALOGUE_INDEX, unique=True) + self._col.create_index(CATALOGUE_INDEX, unique=True) def _normalize(entry): diff --git a/zaqar/storage/mongodb/claims.py b/zaqar/storage/mongodb/claims.py index 42151cb29..9ec996db1 100644 --- a/zaqar/storage/mongodb/claims.py +++ b/zaqar/storage/mongodb/claims.py @@ -247,7 +247,7 @@ class ClaimController(storage.Claim): {'$set': new_msg}, **kwargs) dlq_collection = msg_ctrl._collection(dlq_name, project) - if not dlq_collection: + if dlq_collection is None: LOG.warning(u"Failed to find the message collection " u"for queue %(dlq_name)s", {"dlq_name": dlq_name}) diff --git a/zaqar/storage/mongodb/driver.py b/zaqar/storage/mongodb/driver.py index f1a677734..bb89ac08b 100644 --- a/zaqar/storage/mongodb/driver.py +++ b/zaqar/storage/mongodb/driver.py @@ -142,10 +142,11 @@ class DataDriver(storage.DataDriverBase): message_volume = {'free': 0, 'claimed': 0, 'total': 0} for msg_col in [db.messages for db in self.message_databases]: - msg_count_claimed = msg_col.find({'c.id': {'$ne': None}}).count() + msg_count_claimed = msg_col.count_documents({'c.id': + {'$ne': None}}) message_volume['claimed'] += msg_count_claimed - msg_count_total = msg_col.find().count() + msg_count_total = msg_col.count_documents({}) message_volume['total'] += msg_count_total message_volume['free'] = (message_volume['total'] - diff --git a/zaqar/storage/mongodb/flavors.py b/zaqar/storage/mongodb/flavors.py index bd77b7788..52984ab9c 100644 --- a/zaqar/storage/mongodb/flavors.py +++ b/zaqar/storage/mongodb/flavors.py @@ -48,11 +48,11 @@ class FlavorsController(base.FlavorsBase): super(FlavorsController, self).__init__(*args, **kwargs) self._col = self.driver.database.flavors - self._col.ensure_index(FLAVORS_INDEX, + self._col.create_index(FLAVORS_INDEX, background=True, name='flavors_name', unique=True) - self._col.ensure_index(FLAVORS_STORAGE_POOL_INDEX, + self._col.create_index(FLAVORS_STORAGE_POOL_INDEX, background=True, name='flavors_storage_pool_group_name') @@ -66,13 +66,14 @@ class FlavorsController(base.FlavorsBase): cursor = self._col.find(query, projection=_field_spec(detailed), limit=limit).sort('n', 1) + ntotal = self._col.count_documents(query) marker_name = {} def normalizer(flavor): marker_name['next'] = flavor['n'] return _normalize(flavor, detailed=detailed) - yield utils.HookedCursor(cursor, normalizer) + yield utils.HookedCursor(cursor, normalizer, ntotal=ntotal) yield marker_name and marker_name['next'] @utils.raises_conn_error @@ -127,7 +128,7 @@ class FlavorsController(base.FlavorsBase): @utils.raises_conn_error def drop_all(self): self._col.drop() - self._col.ensure_index(FLAVORS_INDEX, unique=True) + self._col.create_index(FLAVORS_INDEX, unique=True) def _normalize(flavor, detailed=False): diff --git a/zaqar/storage/mongodb/messages.py b/zaqar/storage/mongodb/messages.py index daa068e52..2084ea4be 100644 --- a/zaqar/storage/mongodb/messages.py +++ b/zaqar/storage/mongodb/messages.py @@ -23,7 +23,9 @@ Field Mappings: import datetime import time +import uuid +from bson import binary from bson import objectid from oslo_log import log as logging from oslo_utils import timeutils @@ -169,28 +171,28 @@ class MessageController(storage.Message): def _ensure_indexes(self, collection): """Ensures that all indexes are created.""" - collection.ensure_index(TTL_INDEX_FIELDS, + collection.create_index(TTL_INDEX_FIELDS, name='ttl', expireAfterSeconds=0, background=True) - collection.ensure_index(ACTIVE_INDEX_FIELDS, + collection.create_index(ACTIVE_INDEX_FIELDS, name='active', background=True) - collection.ensure_index(CLAIMED_INDEX_FIELDS, + collection.create_index(CLAIMED_INDEX_FIELDS, name='claimed', background=True) - collection.ensure_index(COUNTING_INDEX_FIELDS, + collection.create_index(COUNTING_INDEX_FIELDS, name='counting', background=True) - collection.ensure_index(MARKER_INDEX_FIELDS, + collection.create_index(MARKER_INDEX_FIELDS, name='queue_marker', background=True) - collection.ensure_index(TRANSACTION_INDEX_FIELDS, + collection.create_index(TRANSACTION_INDEX_FIELDS, name='transaction', background=True) @@ -234,7 +236,7 @@ class MessageController(storage.Message): def _list(self, queue_name, project=None, marker=None, echo=False, client_uuid=None, projection=None, include_claimed=False, include_delayed=False, - sort=1, limit=None): + sort=1, limit=None, count=False): """Message document listing helper. :param queue_name: Name of the queue to list @@ -261,6 +263,7 @@ class MessageController(storage.Message): to list. The results may include fewer messages than the requested `limit` if not enough are available. If limit is not specified + :param count: (Default False) If return the collection's count :returns: Generator yielding up to `limit` messages. """ @@ -284,6 +287,12 @@ class MessageController(storage.Message): } if not echo: + if (client_uuid is not None) and not isinstance(client_uuid, + uuid.UUID): + client_uuid = uuid.UUID(client_uuid) + client_uuid = binary.Binary.from_uuid(client_uuid) + elif isinstance(client_uuid, uuid.UUID): + client_uuid = binary.Binary.from_uuid(client_uuid) query['u'] = {'$ne': client_uuid} if marker is not None: @@ -308,12 +317,19 @@ class MessageController(storage.Message): cursor = collection.find(query, projection=projection, sort=[('k', sort)]) + ntotal = None + if count: + ntotal = collection.count_documents(query) if limit is not None: cursor.limit(limit) + if count: + ntotal = collection.count_documents(query, limit=limit) # NOTE(flaper87): Suggest the index to use for this query to # ensure the most performant one is chosen. + if count: + return cursor.hint(ACTIVE_INDEX_FIELDS), ntotal return cursor.hint(ACTIVE_INDEX_FIELDS) # ---------------------------------------------------------------------- @@ -348,7 +364,8 @@ class MessageController(storage.Message): query['c.e'] = {'$lte': timeutils.utcnow_ts()} collection = self._collection(queue_name, project) - return collection.count(filter=query, hint=COUNTING_INDEX_FIELDS) + return collection.count_documents(filter=query, + hint=COUNTING_INDEX_FIELDS) def _active(self, queue_name, marker=None, echo=False, client_uuid=None, projection=None, project=None, @@ -383,8 +400,10 @@ class MessageController(storage.Message): msgs = collection.find(query, sort=[('k', 1)], **kwargs).hint( CLAIMED_INDEX_FIELDS) + ntotal = collection.count_documents(query) if limit is not None: msgs = msgs.limit(limit) + ntotal = collection.count_documents(query, limit=limit) now = timeutils.utcnow_ts() @@ -394,7 +413,7 @@ class MessageController(storage.Message): return doc - return utils.HookedCursor(msgs, denormalizer) + return utils.HookedCursor(msgs, denormalizer, ntotal=ntotal) def _unclaim(self, queue_name, claim_id, project=None): cid = utils.to_oid(claim_id) @@ -540,10 +559,13 @@ class MessageController(storage.Message): except ValueError: yield iter([]) - messages = self._list(queue_name, project=project, marker=marker, - client_uuid=client_uuid, echo=echo, - include_claimed=include_claimed, - include_delayed=include_delayed, limit=limit) + messages, ntotal = self._list(queue_name, project=project, + marker=marker, + client_uuid=client_uuid, echo=echo, + include_claimed=include_claimed, + include_delayed=include_delayed, + limit=limit, + count=True) marker_id = {} @@ -556,7 +578,7 @@ class MessageController(storage.Message): return _basic_message(msg, now) - yield utils.HookedCursor(messages, denormalizer) + yield utils.HookedCursor(messages, denormalizer, ntotal=ntotal) yield str(marker_id['next']) @utils.raises_conn_error @@ -617,11 +639,12 @@ class MessageController(storage.Message): # NOTE(flaper87): Should this query # be sorted? messages = collection.find(query).hint(ID_INDEX_FIELDS) + ntotal = collection.count_documents(query) def denormalizer(msg): return _basic_message(msg, now) - return utils.HookedCursor(messages, denormalizer) + return utils.HookedCursor(messages, denormalizer, ntotal=ntotal) @utils.raises_conn_error @utils.retries_on_autoreconnect @@ -634,6 +657,13 @@ class MessageController(storage.Message): if not self._queue_ctrl.exists(queue_name, project): raise errors.QueueDoesNotExist(queue_name, project) + if (client_uuid is not None) and not isinstance(client_uuid, + uuid.UUID): + client_uuid = uuid.UUID(client_uuid) + client_uuid = binary.Binary.from_uuid(client_uuid) + elif isinstance(client_uuid, uuid.UUID): + client_uuid = binary.Binary.from_uuid(client_uuid) + # NOTE(flaper87): Make sure the counter exists. This method # is an upsert. self._get_counter(queue_name, project) @@ -776,20 +806,20 @@ class FIFOMessageController(MessageController): def _ensure_indexes(self, collection): """Ensures that all indexes are created.""" - collection.ensure_index(TTL_INDEX_FIELDS, + collection.create_index(TTL_INDEX_FIELDS, name='ttl', expireAfterSeconds=0, background=True) - collection.ensure_index(ACTIVE_INDEX_FIELDS, + collection.create_index(ACTIVE_INDEX_FIELDS, name='active', background=True) - collection.ensure_index(CLAIMED_INDEX_FIELDS, + collection.create_index(CLAIMED_INDEX_FIELDS, name='claimed', background=True) - collection.ensure_index(COUNTING_INDEX_FIELDS, + collection.create_index(COUNTING_INDEX_FIELDS, name='counting', background=True) @@ -800,12 +830,12 @@ class FIFOMessageController(MessageController): # to miss a message when there is more than one # producer posting messages to the same queue, in # parallel. - collection.ensure_index(MARKER_INDEX_FIELDS, + collection.create_index(MARKER_INDEX_FIELDS, name='queue_marker', unique=True, background=True) - collection.ensure_index(TRANSACTION_INDEX_FIELDS, + collection.create_index(TRANSACTION_INDEX_FIELDS, name='transaction', background=True) @@ -840,6 +870,13 @@ class FIFOMessageController(MessageController): # Unique transaction ID to facilitate atomic batch inserts transaction = objectid.ObjectId() + if (client_uuid is not None) and not isinstance(client_uuid, + uuid.UUID): + client_uuid = uuid.UUID(client_uuid) + client_uuid = binary.Binary.from_uuid(client_uuid) + elif isinstance(client_uuid, uuid.UUID): + client_uuid = binary.Binary.from_uuid(client_uuid) + prepared_messages = [] for index, message in enumerate(messages): msg = { diff --git a/zaqar/storage/mongodb/pools.py b/zaqar/storage/mongodb/pools.py index 80bd06316..e26e3edb0 100644 --- a/zaqar/storage/mongodb/pools.py +++ b/zaqar/storage/mongodb/pools.py @@ -57,12 +57,12 @@ class PoolsController(base.PoolsBase): super(PoolsController, self).__init__(*args, **kwargs) self._col = self.driver.database.pools - self._col.ensure_index(POOLS_INDEX, + self._col.create_index(POOLS_INDEX, background=True, name='pools_name', unique=True) - self._col.ensure_index(URI_INDEX, + self._col.create_index(URI_INDEX, background=True, name='pools_uri', unique=True) @@ -95,15 +95,16 @@ class PoolsController(base.PoolsBase): @utils.raises_conn_error def _get_pools_by_flavor(self, flavor=None, detailed=False): - query = None + query = {} if flavor is None: query = {'f': None} elif flavor.get('name') is not None: query = {'f': flavor.get('name')} cursor = self._col.find(query, projection=_field_spec(detailed)) + ntotal = self._col.count_documents(query) normalizer = functools.partial(_normalize, detailed=detailed) - return utils.HookedCursor(cursor, normalizer) + return utils.HookedCursor(cursor, normalizer, ntotal=ntotal) @utils.raises_conn_error def _create(self, name, weight, uri, flavor=None, @@ -169,7 +170,7 @@ class PoolsController(base.PoolsBase): @utils.raises_conn_error def _drop_all(self): self._col.drop() - self._col.ensure_index(POOLS_INDEX, unique=True) + self._col.create_index(POOLS_INDEX, unique=True) def _normalize(pool, detailed=False): diff --git a/zaqar/storage/mongodb/queues.py b/zaqar/storage/mongodb/queues.py index b13ef8778..9a4be8572 100644 --- a/zaqar/storage/mongodb/queues.py +++ b/zaqar/storage/mongodb/queues.py @@ -95,7 +95,9 @@ class QueueController(storage.Queue): # allows for querying by project and project+name. # This is also useful for retrieving the queues list for # a specific project, for example. Order matters! - self._collection.ensure_index([('p_q', 1)], unique=True) + # NOTE(wanghao): pymongo has removed the ensure_index since 4.0.0. + # So we need to update ensure_index to create_index. + self._collection.create_index([('p_q', 1)], unique=True) # ---------------------------------------------------------------------- # Helpers @@ -209,6 +211,7 @@ class QueueController(storage.Queue): cursor = self._collection.find(query, projection=projection) cursor = cursor.limit(limit).sort('p_q') marker_name = {} + ntotal = self._collection.count_documents(query, limit=limit) def normalizer(record): queue = {'name': utils.descope_queue_name(record['p_q'])} @@ -217,7 +220,7 @@ class QueueController(storage.Queue): queue['metadata'] = record['m'] return queue - yield utils.HookedCursor(cursor, normalizer) + yield utils.HookedCursor(cursor, normalizer, ntotal=ntotal) yield marker_name and marker_name['next'] @utils.raises_conn_error @@ -291,8 +294,7 @@ class QueueController(storage.Queue): @utils.retries_on_autoreconnect def _calculate_resource_count(self, project=None): query = utils.scoped_query(None, project, None, {}) - projection = {'p_q': 1, '_id': 0} - return self._collection.find(query, projection=projection).count() + return self._collection.count_documents(query) def _get_scoped_query(name, project): diff --git a/zaqar/storage/mongodb/subscriptions.py b/zaqar/storage/mongodb/subscriptions.py index 03937356d..ee345d7a7 100644 --- a/zaqar/storage/mongodb/subscriptions.py +++ b/zaqar/storage/mongodb/subscriptions.py @@ -54,13 +54,13 @@ class SubscriptionController(base.Subscription): def __init__(self, *args, **kwargs): super(SubscriptionController, self).__init__(*args, **kwargs) self._collection = self.driver.subscriptions_database.subscriptions - self._collection.ensure_index(SUBSCRIPTIONS_INDEX, unique=True) + self._collection.create_index(SUBSCRIPTIONS_INDEX, unique=True) # NOTE(flwang): MongoDB will automatically delete the subscription # from the subscriptions collection when the subscription's 'e' value # is older than the number of seconds specified in expireAfterSeconds, # i.e. 0 seconds older in this case. As such, the data expires at the # specified 'e' value. - self._collection.ensure_index(TTL_INDEX_FIELDS, name='ttl', + self._collection.create_index(TTL_INDEX_FIELDS, name='ttl', expireAfterSeconds=0, background=True) @@ -76,6 +76,7 @@ class SubscriptionController(base.Subscription): cursor = self._collection.find(query, projection=projection) cursor = cursor.limit(limit).sort('_id') marker_name = {} + ntotal = self._collection.count_documents(query, limit=limit) now = timeutils.utcnow_ts() @@ -84,7 +85,7 @@ class SubscriptionController(base.Subscription): return _basic_subscription(record, now) - yield utils.HookedCursor(cursor, normalizer) + yield utils.HookedCursor(cursor, normalizer, ntotal=ntotal) yield marker_name and marker_name['next'] @utils.raises_conn_error diff --git a/zaqar/storage/mongodb/topic_messages.py b/zaqar/storage/mongodb/topic_messages.py index 425c526be..af9023010 100644 --- a/zaqar/storage/mongodb/topic_messages.py +++ b/zaqar/storage/mongodb/topic_messages.py @@ -23,7 +23,9 @@ Field Mappings: import datetime import time +import uuid +from bson import binary from bson import objectid from oslo_log import log as logging from oslo_utils import timeutils @@ -144,24 +146,24 @@ class MessageController(storage.Message): def _ensure_indexes(self, collection): """Ensures that all indexes are created.""" - collection.ensure_index(TTL_INDEX_FIELDS, + collection.create_index(TTL_INDEX_FIELDS, name='ttl', expireAfterSeconds=0, background=True) - collection.ensure_index(ACTIVE_INDEX_FIELDS, + collection.create_index(ACTIVE_INDEX_FIELDS, name='active', background=True) - collection.ensure_index(COUNTING_INDEX_FIELDS, + collection.create_index(COUNTING_INDEX_FIELDS, name='counting', background=True) - collection.ensure_index(MARKER_INDEX_FIELDS, + collection.create_index(MARKER_INDEX_FIELDS, name='queue_marker', background=True) - collection.ensure_index(TRANSACTION_INDEX_FIELDS, + collection.create_index(TRANSACTION_INDEX_FIELDS, name='transaction', background=True) @@ -205,7 +207,7 @@ class MessageController(storage.Message): def _list(self, topic_name, project=None, marker=None, echo=False, client_uuid=None, projection=None, include_claimed=False, include_delayed=False, - sort=1, limit=None): + sort=1, limit=None, count=False): """Message document listing helper. :param topic_name: Name of the topic to list @@ -232,6 +234,7 @@ class MessageController(storage.Message): to list. The results may include fewer messages than the requested `limit` if not enough are available. If limit is not specified + :param count: (Default False) If return the count number of cursor :returns: Generator yielding up to `limit` messages. """ @@ -255,6 +258,12 @@ class MessageController(storage.Message): } if not echo: + if (client_uuid is not None) and not isinstance(client_uuid, + uuid.UUID): + client_uuid = uuid.UUID(client_uuid) + client_uuid = binary.Binary.from_uuid(client_uuid) + elif isinstance(client_uuid, uuid.UUID): + client_uuid = binary.Binary.from_uuid(client_uuid) query['u'] = {'$ne': client_uuid} if marker is not None: @@ -274,12 +283,19 @@ class MessageController(storage.Message): cursor = collection.find(query, projection=projection, sort=[('k', sort)]) + ntotal = None + if count: + ntotal = collection.count_documents(query) if limit is not None: cursor.limit(limit) + if count: + ntotal = collection.count_documents(query, limit=limit) # NOTE(flaper87): Suggest the index to use for this query to # ensure the most performant one is chosen. + if count: + return cursor.hint(ACTIVE_INDEX_FIELDS), ntotal return cursor.hint(ACTIVE_INDEX_FIELDS) # ---------------------------------------------------------------------- @@ -310,7 +326,8 @@ class MessageController(storage.Message): } collection = self._collection(topic_name, project) - return collection.count(filter=query, hint=COUNTING_INDEX_FIELDS) + return collection.count_documents(filter=query, + hint=COUNTING_INDEX_FIELDS) def _active(self, topic_name, marker=None, echo=False, client_uuid=None, projection=None, project=None, @@ -447,10 +464,12 @@ class MessageController(storage.Message): except ValueError: yield iter([]) - messages = self._list(topic_name, project=project, marker=marker, - client_uuid=client_uuid, echo=echo, - include_claimed=include_claimed, - include_delayed=include_delayed, limit=limit) + messages, ntotal = self._list(topic_name, project=project, + marker=marker, + client_uuid=client_uuid, echo=echo, + include_claimed=include_claimed, + include_delayed=include_delayed, + limit=limit, count=True) marker_id = {} @@ -463,7 +482,7 @@ class MessageController(storage.Message): return _basic_message(msg, now) - yield utils.HookedCursor(messages, denormalizer) + yield utils.HookedCursor(messages, denormalizer, ntotal=ntotal) yield str(marker_id['next']) @utils.raises_conn_error @@ -524,11 +543,12 @@ class MessageController(storage.Message): # NOTE(flaper87): Should this query # be sorted? messages = collection.find(query).hint(ID_INDEX_FIELDS) + ntotal = collection.count_documents(query) def denormalizer(msg): return _basic_message(msg, now) - return utils.HookedCursor(messages, denormalizer) + return utils.HookedCursor(messages, denormalizer, ntotal=ntotal) @utils.raises_conn_error @utils.retries_on_autoreconnect @@ -554,6 +574,13 @@ class MessageController(storage.Message): project, amount=msgs_n) - msgs_n + if (client_uuid is not None) and not isinstance(client_uuid, + uuid.UUID): + client_uuid = uuid.UUID(client_uuid) + client_uuid = binary.Binary.from_uuid(client_uuid) + elif isinstance(client_uuid, uuid.UUID): + client_uuid = binary.Binary.from_uuid(client_uuid) + prepared_messages = [] for index, message in enumerate(messages): msg = { @@ -682,16 +709,16 @@ class FIFOMessageController(MessageController): def _ensure_indexes(self, collection): """Ensures that all indexes are created.""" - collection.ensure_index(TTL_INDEX_FIELDS, + collection.create_index(TTL_INDEX_FIELDS, name='ttl', expireAfterSeconds=0, background=True) - collection.ensure_index(ACTIVE_INDEX_FIELDS, + collection.create_index(ACTIVE_INDEX_FIELDS, name='active', background=True) - collection.ensure_index(COUNTING_INDEX_FIELDS, + collection.create_index(COUNTING_INDEX_FIELDS, name='counting', background=True) @@ -702,12 +729,12 @@ class FIFOMessageController(MessageController): # to miss a message when there is more than one # producer posting messages to the same queue, in # parallel. - collection.ensure_index(MARKER_INDEX_FIELDS, + collection.create_index(MARKER_INDEX_FIELDS, name='queue_marker', unique=True, background=True) - collection.ensure_index(TRANSACTION_INDEX_FIELDS, + collection.create_index(TRANSACTION_INDEX_FIELDS, name='transaction', background=True) @@ -742,6 +769,13 @@ class FIFOMessageController(MessageController): # Unique transaction ID to facilitate atomic batch inserts transaction = objectid.ObjectId() + if (client_uuid is not None) and not isinstance(client_uuid, + uuid.UUID): + client_uuid = uuid.UUID(client_uuid) + client_uuid = binary.Binary.from_uuid(client_uuid) + elif isinstance(client_uuid, uuid.UUID): + client_uuid = binary.Binary.from_uuid(client_uuid) + prepared_messages = [] for index, message in enumerate(messages): msg = { diff --git a/zaqar/storage/mongodb/topics.py b/zaqar/storage/mongodb/topics.py index 6eddd0303..2d45cbcf7 100644 --- a/zaqar/storage/mongodb/topics.py +++ b/zaqar/storage/mongodb/topics.py @@ -81,7 +81,7 @@ class TopicController(storage.Topic): # allows for querying by project and project+name. # This is also useful for retrieving the queues list for # a specific project, for example. Order matters! - self._collection.ensure_index([('p_t', 1)], unique=True) + self._collection.create_index([('p_t', 1)], unique=True) # ---------------------------------------------------------------------- # Helpers @@ -196,6 +196,7 @@ class TopicController(storage.Topic): cursor = self._collection.find(query, projection=projection) cursor = cursor.limit(limit).sort('p_t') marker_name = {} + ntotal = self._collection.count_documents(query, limit=limit) def normalizer(record): topic = {'name': utils.descope_queue_name(record['p_t'])} @@ -204,7 +205,7 @@ class TopicController(storage.Topic): topic['metadata'] = record['m'] return topic - yield utils.HookedCursor(cursor, normalizer) + yield utils.HookedCursor(cursor, normalizer, ntotal=ntotal) yield marker_name and marker_name['next'] @utils.raises_conn_error diff --git a/zaqar/storage/mongodb/utils.py b/zaqar/storage/mongodb/utils.py index 914bf8167..b006756e7 100644 --- a/zaqar/storage/mongodb/utils.py +++ b/zaqar/storage/mongodb/utils.py @@ -317,9 +317,10 @@ def retries_on_autoreconnect(func): class HookedCursor(object): - def __init__(self, cursor, denormalizer): + def __init__(self, cursor, denormalizer, ntotal=None): self.cursor = cursor self.denormalizer = denormalizer + self.ntotal = ntotal def __getattr__(self, attr): return getattr(self.cursor, attr) @@ -328,11 +329,14 @@ class HookedCursor(object): return self def __len__(self): - return self.cursor.count(True) + return self.ntotal @raises_conn_error def next(self): - item = next(self.cursor) + try: + item = next(self.cursor) + except errors.InvalidOperation: + raise StopIteration() return self.denormalizer(item) def __next__(self): diff --git a/zaqar/tests/unit/storage/test_impl_mongodb.py b/zaqar/tests/unit/storage/test_impl_mongodb.py index a78618141..c3e2a943e 100644 --- a/zaqar/tests/unit/storage/test_impl_mongodb.py +++ b/zaqar/tests/unit/storage/test_impl_mongodb.py @@ -313,10 +313,10 @@ class MongodbMessageTests(MongodbSetupMixin, base.MessageControllerTest): seed_marker1 = self.controller._get_counter(queue_name, self.project) self.assertEqual(0, seed_marker1, 'First marker is 0') - + uuid = '97b64000-2526-11e3-b088-d85c1300734c' for i in range(iterations): - self.controller.post(queue_name, [{'ttl': 60}], - 'uuid', project=self.project) + self.controller.post(queue_name, [{'ttl': 60}], uuid, + project=self.project) marker1 = self.controller._get_counter(queue_name, self.project)