From 85a0a6a28e166bc076cf8786de2b46248d8786a2 Mon Sep 17 00:00:00 2001 From: Eran Rom Date: Sun, 26 Jul 2015 13:31:17 +0300 Subject: [PATCH] Container-Sync to iterate only over synced containers This change introduces a sync_store which holds only containers that are enabled for sync. The store is implemented using a directory structure that resembles that of the containers directory, but has entries only for containers enabled for sync. The store is maintained in two ways: 1. Preemptively by the container server when processing PUT/POST/DELETE operations targeted at containers with x-container-sync-key / x-container-sync-to 2. In the background using the containers replicator whenever it processes a container set up for sync The change updates [1] [1] http://docs.openstack.org/developer/swift/overview_container_sync.html Change-Id: I9ae4d4c7ff6336611df4122b7c753cc4fa46c0ff Closes-Bug: #1476623 --- doc/source/overview_container_sync.rst | 71 +++-- swift/container/replicator.py | 18 ++ swift/container/server.py | 15 + swift/container/sync.py | 40 ++- swift/container/sync_store.py | 177 ++++++++++++ test/probe/test_container_sync.py | 78 +++++- test/unit/container/test_replicator.py | 133 ++++++++- test/unit/container/test_server.py | 69 +++++ test/unit/container/test_sync.py | 182 +++++++----- test/unit/container/test_sync_store.py | 367 +++++++++++++++++++++++++ 10 files changed, 1034 insertions(+), 116 deletions(-) create mode 100644 swift/container/sync_store.py create mode 100644 test/unit/container/test_sync_store.py diff --git a/doc/source/overview_container_sync.rst b/doc/source/overview_container_sync.rst index 8f03bf8174..c1255acaff 100644 --- a/doc/source/overview_container_sync.rst +++ b/doc/source/overview_container_sync.rst @@ -29,7 +29,7 @@ synchronization key. Configuring Container Sync -------------------------- -Create a container-sync-realms.conf file specifying the allowable clusters +Create a ``container-sync-realms.conf`` file specifying the allowable clusters and their information:: [realm1] @@ -50,18 +50,18 @@ clusters that have agreed to allow container syncing with each other. Realm names will be considered case insensitive. The key is the overall cluster-to-cluster key used in combination with the -external users' key that they set on their containers' X-Container-Sync-Key -metadata header values. These keys will be used to sign each request the -container sync daemon makes and used to validate each incoming container sync -request. +external users' key that they set on their containers' +``X-Container-Sync-Key`` metadata header values. These keys will be used to +sign each request the container sync daemon makes and used to validate each +incoming container sync request. The key2 is optional and is an additional key incoming requests will be checked against. This is so you can rotate keys if you wish; you move the existing key to key2 and make a new key value. -Any values in the realm section whose names begin with cluster\_ will indicate -the name and endpoint of a cluster and will be used by external users in -their containers' X-Container-Sync-To metadata header values with the format +Any values in the realm section whose names begin with ``cluster_`` will +indicate the name and endpoint of a cluster and will be used by external users in +their containers' ``X-Container-Sync-To`` metadata header values with the format "//realm_name/cluster_name/account_name/container_name". Realm and cluster names are considered case insensitive. @@ -71,7 +71,7 @@ container servers, since that is where the container sync daemon runs. Note that the endpoint ends with /v1/ and that the container sync daemon will then add the account/container/obj name after that. -Distribute this container-sync-realms.conf file to all your proxy servers +Distribute this ``container-sync-realms.conf`` file to all your proxy servers and container servers. You also need to add the container_sync middleware to your proxy pipeline. It @@ -95,7 +95,7 @@ section, Configuring Container Sync, for the new-style. With the old-style, the Swift cluster operator must allow synchronization with a set of hosts before the user can enable container synchronization. First, the backend container server needs to be given this list of hosts in the -container-server.conf file:: +``container-server.conf`` file:: [DEFAULT] # This is a comma separated list of hosts allowed in the @@ -170,8 +170,8 @@ we'll make next:: The ``-t`` indicates the cluster to sync to, which is the realm name of the section from container-sync-realms.conf, followed by the cluster name from -that section (without the cluster\_ prefix), followed by the account and container names we want to sync to. -The ``-k`` specifies the secret key the two containers will share for +that section (without the cluster\_ prefix), followed by the account and container +names we want to sync to. The ``-k`` specifies the secret key the two containers will share for synchronization; this is the user key, the cluster key in container-sync-realms.conf will also be used behind the scenes. @@ -195,8 +195,18 @@ as it gets synchronized over to the second:: list container2 [Nothing there yet, so we wait a bit...] - [If you're an operator running SAIO and just testing, you may need to - run 'swift-init container-sync once' to perform a sync scan.] + +.. note:: + + If you're an operator running SAIO and just testing, each time you + configure a container for synchronization and place objects in the + source container you will need to ensure that container-sync runs + before attempting to retrieve objects from the target container. + That is, you need to run:: + + swift-init container-sync once + +Now expect to see objects copied from the first container to the second:: $ swift -A http://cluster2/auth/v1.0 -U test2:tester2 -K testing2 \ list container2 @@ -340,13 +350,34 @@ synchronize to the second, we could have used this curl command:: What's going on behind the scenes, in the cluster? -------------------------------------------------- -The swift-container-sync does the job of sending updates to the remote -container. +Container ring devices have a directory called ``containers``, where container +databases reside. In addition to ``containers``, each container ring device +also has a directory called ``sync-containers``. ``sync-containers`` holds +symlinks to container databases that were configured for container sync using +``x-container-sync-to`` and ``x-container-sync-key`` metadata keys. -This is done by scanning the local devices for container databases and -checking for x-container-sync-to and x-container-sync-key metadata values. -If they exist, newer rows since the last sync will trigger PUTs or DELETEs -to the other container. +The swift-container-sync process does the job of sending updates to the remote +container. This is done by scanning ``sync-containers`` for container +databases. For each container db found, newer rows since the last sync will +trigger PUTs or DELETEs to the other container. + +``sync-containers`` is maintained as follows: +Whenever the container-server processes a PUT or a POST request that carries +``x-container-sync-to`` and ``x-container-sync-key`` metadata keys the server +creates a symlink to the container database in ``sync-containers``. Whenever +the container server deletes a synced container, the appropriate symlink +is deleted from ``sync-containers``. + +In addition to the container-server, the container-replicator process does the +job of identifying containers that should be synchronized. This is done by +scanning the local devices for container databases and checking for +x-container-sync-to and x-container-sync-key metadata values. If they exist +then a symlink to the container database is created in a sync-containers +sub-directory on the same device. + +Similarly, when the container sync metadata keys are deleted, the container +server and container-replicator would take care of deleting the symlinks +from ``sync-containers``. .. note:: diff --git a/swift/container/replicator.py b/swift/container/replicator.py index 810c7db600..b428086bdd 100644 --- a/swift/container/replicator.py +++ b/swift/container/replicator.py @@ -20,6 +20,7 @@ import time from collections import defaultdict from eventlet import Timeout +from swift.container.sync_store import ContainerSyncStore from swift.container.backend import ContainerBroker, DATADIR from swift.container.reconciler import ( MISPLACED_OBJECTS_ACCOUNT, incorrect_policy_index, @@ -189,6 +190,13 @@ class ContainerReplicator(db_replicator.Replicator): def _post_replicate_hook(self, broker, info, responses): if info['account'] == MISPLACED_OBJECTS_ACCOUNT: return + + try: + self.sync_store.update_sync_store(broker) + except Exception: + self.logger.exception('Failed to update sync_store %s' % + broker.db_file) + point = broker.get_reconciler_sync() if not broker.has_multiple_policies() and info['max_row'] != point: broker.update_reconciler_sync(info['max_row']) @@ -210,6 +218,13 @@ class ContainerReplicator(db_replicator.Replicator): # this container shouldn't be here, make sure it's cleaned up self.reconciler_cleanups[broker.container] = broker return + try: + # DB is going to get deleted. Be preemptive about it + self.sync_store.remove_synced_container(broker) + except Exception: + self.logger.exception('Failed to remove sync_store entry %s' % + broker.db_file) + return super(ContainerReplicator, self).delete_db(broker) def replicate_reconcilers(self): @@ -237,6 +252,9 @@ class ContainerReplicator(db_replicator.Replicator): def run_once(self, *args, **kwargs): self.reconciler_containers = {} self.reconciler_cleanups = {} + self.sync_store = ContainerSyncStore(self.root, + self.logger, + self.mount_check) rv = super(ContainerReplicator, self).run_once(*args, **kwargs) if any([self.reconciler_containers, self.reconciler_cleanups]): self.replicate_reconcilers() diff --git a/swift/container/server.py b/swift/container/server.py index 5f571ef9f2..0a09f57615 100644 --- a/swift/container/server.py +++ b/swift/container/server.py @@ -23,6 +23,7 @@ from xml.etree.cElementTree import Element, SubElement, tostring from eventlet import Timeout import swift.common.db +from swift.container.sync_store import ContainerSyncStore from swift.container.backend import ContainerBroker, DATADIR from swift.container.replicator import ContainerReplicatorRpc from swift.common.db import DatabaseAlreadyExists @@ -110,6 +111,9 @@ class ContainerController(BaseStorageServer): self.save_headers.append('x-versions-location') swift.common.db.DB_PREALLOCATION = \ config_true_value(conf.get('db_preallocation', 'f')) + self.sync_store = ContainerSyncStore(self.root, + self.logger, + self.mount_check) def _get_container_broker(self, drive, part, account, container, **kwargs): """ @@ -242,6 +246,13 @@ class ContainerController(BaseStorageServer): else: return None + def _update_sync_store(self, broker, method): + try: + self.sync_store.update_sync_store(broker) + except Exception: + self.logger.exception('Failed to update sync_store %s during %s' % + broker.db_file, method) + @public @timing_stats() def DELETE(self, req): @@ -276,6 +287,7 @@ class ContainerController(BaseStorageServer): broker.delete_db(req_timestamp.internal) if not broker.is_deleted(): return HTTPConflict(request=req) + self._update_sync_store(broker, 'DELETE') resp = self.account_update(req, account, container, broker) if resp: return resp @@ -381,6 +393,8 @@ class ContainerController(BaseStorageServer): broker.metadata['X-Container-Sync-To'][0]: broker.set_x_container_sync_points(-1, -1) broker.update_metadata(metadata, validate_metadata=True) + if metadata: + self._update_sync_store(broker, 'PUT') resp = self.account_update(req, account, container, broker) if resp: return resp @@ -564,6 +578,7 @@ class ContainerController(BaseStorageServer): broker.metadata['X-Container-Sync-To'][0]: broker.set_x_container_sync_points(-1, -1) broker.update_metadata(metadata, validate_metadata=True) + self._update_sync_store(broker, 'POST') return HTTPNoContent(request=req) def __call__(self, env, start_response): diff --git a/swift/container/sync.py b/swift/container/sync.py index 089c9a7481..ef9543883a 100644 --- a/swift/container/sync.py +++ b/swift/container/sync.py @@ -24,7 +24,9 @@ from struct import unpack_from from eventlet import sleep, Timeout import swift.common.db -from swift.container.backend import ContainerBroker, DATADIR +from swift.common.db import DatabaseConnectionError +from swift.container.backend import ContainerBroker +from swift.container.sync_store import ContainerSyncStore from swift.common.container_sync_realms import ContainerSyncRealms from swift.common.internal_client import ( delete_object, put_object, InternalClient, UnexpectedResponse) @@ -32,7 +34,7 @@ from swift.common.exceptions import ClientException from swift.common.ring import Ring from swift.common.ring.utils import is_local_device from swift.common.utils import ( - audit_location_generator, clean_content_type, config_true_value, + clean_content_type, config_true_value, FileLikeIter, get_logger, hash_path, quote, urlparse, validate_sync_to, whataremyips, Timestamp) from swift.common.daemon import Daemon @@ -187,6 +189,10 @@ class ContainerSync(Daemon): a.strip() for a in conf.get('sync_proxy', '').split(',') if a.strip()] + #: ContainerSyncStore instance for iterating over synced containers + self.sync_store = ContainerSyncStore(self.devices, + self.logger, + self.mount_check) #: Number of containers with sync turned on that were successfully #: synced. self.container_syncs = 0 @@ -194,7 +200,8 @@ class ContainerSync(Daemon): self.container_deletes = 0 #: Number of successful PUTs triggered. self.container_puts = 0 - #: Number of containers that didn't have sync turned on. + #: Number of containers whose sync has been turned off, but + #: are not yet cleared from the sync store. self.container_skips = 0 #: Number of containers that had a failure of some type. self.container_failures = 0 @@ -247,10 +254,7 @@ class ContainerSync(Daemon): sleep(random() * self.interval) while True: begin = time() - all_locs = audit_location_generator(self.devices, DATADIR, '.db', - mount_check=self.mount_check, - logger=self.logger) - for path, device, partition in all_locs: + for path in self.sync_store.synced_containers_generator(): self.container_sync(path) if time() - self.reported >= 3600: # once an hour self.report() @@ -264,10 +268,7 @@ class ContainerSync(Daemon): """ self.logger.info(_('Begin container sync "once" mode')) begin = time() - all_locs = audit_location_generator(self.devices, DATADIR, '.db', - mount_check=self.mount_check, - logger=self.logger) - for path, device, partition in all_locs: + for path in self.sync_store.synced_containers_generator(): self.container_sync(path) if time() - self.reported >= 3600: # once an hour self.report() @@ -308,7 +309,20 @@ class ContainerSync(Daemon): broker = None try: broker = ContainerBroker(path) - info = broker.get_info() + # The path we pass to the ContainerBroker is a real path of + # a container DB. If we get here, however, it means that this + # path is linked from the sync_containers dir. In rare cases + # of race or processes failures the link can be stale and + # the get_info below will raise a DB doesn't exist exception + # In this case we remove the stale link and raise an error + # since in most cases the db should be there. + try: + info = broker.get_info() + except DatabaseConnectionError as db_err: + if str(db_err).endswith("DB doesn't exist"): + self.sync_store.remove_synced_container(broker) + raise + x, nodes = self.container_ring.get_nodes(info['account'], info['container']) for ordinal, node in enumerate(nodes): @@ -388,7 +402,7 @@ class ContainerSync(Daemon): broker.set_x_container_sync_points(sync_point1, None) self.container_syncs += 1 self.logger.increment('syncs') - except (Exception, Timeout) as err: + except (Exception, Timeout): self.container_failures += 1 self.logger.increment('failures') self.logger.exception(_('ERROR Syncing %s'), diff --git a/swift/container/sync_store.py b/swift/container/sync_store.py new file mode 100644 index 0000000000..729eaee093 --- /dev/null +++ b/swift/container/sync_store.py @@ -0,0 +1,177 @@ +# Copyright (c) 2010-2016 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import errno + +from swift.common.utils import audit_location_generator, mkdirs +from swift.container.backend import DATADIR + +SYNC_DATADIR = 'sync_containers' + + +class ContainerSyncStore(object): + """ + Filesystem based store for local containers that needs to be synced. + + The store holds a list of containers that need to be synced by the + container sync daemon. The store is local to the container server node, + that is, only containers whose databases are kept locally on the node are + listed. + """ + def __init__(self, devices, logger, mount_check): + self.devices = os.path.normpath(os.path.join('/', devices)) + '/' + self.logger = logger + self.mount_check = mount_check + + def _container_to_synced_container_path(self, path): + # path is assumed to be of the form: + # /srv/node/sdb/containers/part/.../*.db + # or more generally: + # devices/device/containers/part/.../*.db + # Below we split the path to the following parts: + # devices, device, rest + devices = self.devices + path = os.path.normpath(path) + device = path[len(devices):path.rfind(DATADIR)] + rest = path[path.rfind(DATADIR) + len(DATADIR) + 1:] + + return os.path.join(devices, device, SYNC_DATADIR, rest) + + def _synced_container_to_container_path(self, path): + # synced path is assumed to be of the form: + # /srv/node/sdb/sync_containers/part/.../*.db + # or more generally: + # devices/device/sync_containers/part/.../*.db + # Below we split the path to the following parts: + # devices, device, rest + devices = self.devices + path = os.path.normpath(path) + device = path[len(devices):path.rfind(SYNC_DATADIR)] + rest = path[path.rfind(SYNC_DATADIR) + len(SYNC_DATADIR) + 1:] + + return os.path.join(devices, device, DATADIR, rest) + + def add_synced_container(self, broker): + """ + Adds the container db represented by broker to the list of synced + containers. + + :param broker: An instance of ContainerBroker representing the + container to add. + """ + sync_file = self._container_to_synced_container_path(broker.db_file) + stat = None + try: + stat = os.stat(sync_file) + except OSError as oserr: + if oserr.errno != errno.ENOENT: + raise oserr + + if stat is not None: + return + + sync_path = os.path.dirname(sync_file) + mkdirs(sync_path) + + try: + os.symlink(broker.db_file, sync_file) + except OSError as oserr: + if (oserr.errno != errno.EEXIST or + not os.path.islink(sync_file)): + raise oserr + + def remove_synced_container(self, broker): + """ + Removes the container db represented by broker from the list of synced + containers. + + :param broker: An instance of ContainerBroker representing the + container to remove. + """ + sync_file = broker.db_file + sync_file = self._container_to_synced_container_path(sync_file) + try: + os.unlink(sync_file) + os.removedirs(os.path.dirname(sync_file)) + except OSError as oserr: + if oserr.errno != errno.ENOENT: + raise oserr + + def update_sync_store(self, broker): + """ + Add or remove a symlink to/from the sync-containers directory + according to the broker's metadata. + + Decide according to the broker x-container-sync-to and + x-container-sync-key whether a symlink needs to be added or + removed. + + We mention that if both metadata items do not appear + at all, the container has never been set for sync in reclaim_age + in which case we do nothing. This is important as this method is + called for ALL containers from the container replicator. + + Once we realize that we do need to do something, we check if + the container is marked for delete, in which case we want to + remove the symlink + + For adding a symlink we notice that both x-container-sync-to and + x-container-sync-key exist and are valid, that is, are not empty. + + At this point we know we need to do something, the container + is not marked for delete and the condition to add a symlink + is not met. conclusion need to remove the symlink. + + :param broker: An instance of ContainerBroker + """ + # If the broker metadata does not have both x-container-sync-to + # and x-container-sync-key it has *never* been set. Make sure + # we do nothing in this case + if ('X-Container-Sync-To' not in broker.metadata and + 'X-Container-Sync-Key' not in broker.metadata): + return + + if broker.is_deleted(): + self.remove_synced_container(broker) + return + + # If both x-container-sync-to and x-container-sync-key + # exist and valid, add the symlink + sync_to = sync_key = None + if 'X-Container-Sync-To' in broker.metadata: + sync_to = broker.metadata['X-Container-Sync-To'][0] + if 'X-Container-Sync-Key' in broker.metadata: + sync_key = broker.metadata['X-Container-Sync-Key'][0] + if sync_to and sync_key: + self.add_synced_container(broker) + return + + self.remove_synced_container(broker) + + def synced_containers_generator(self): + """ + Iterates over the list of synced containers + yielding the path of the container db + """ + all_locs = audit_location_generator(self.devices, SYNC_DATADIR, '.db', + mount_check=self.mount_check, + logger=self.logger) + for path, device, partition in all_locs: + # What we want to yield is the real path as its being used for + # initiating a container broker. The broker would break if not + # given the db real path, as it e.g. assumes the existence of + # .pending in the same path + yield self._synced_container_to_container_path(path) diff --git a/test/probe/test_container_sync.py b/test/probe/test_container_sync.py index 4288dd4644..763b2d3159 100644 --- a/test/probe/test_container_sync.py +++ b/test/probe/test_container_sync.py @@ -18,8 +18,9 @@ from nose import SkipTest import unittest from six.moves.urllib.parse import urlparse -from swiftclient import client +from swiftclient import client, ClientException +from swift.common.http import HTTP_NOT_FOUND from swift.common.manager import Manager from test.probe.common import ReplProbeTest, ENABLED_POLICIES @@ -49,25 +50,27 @@ class TestContainerSync(ReplProbeTest): super(TestContainerSync, self).setUp() self.realm, self.cluster = get_current_realm_cluster(self.url) - def test_sync(self): - base_headers = {'X-Container-Sync-Key': 'secret'} - + def _setup_synced_containers(self, skey='secret', dkey='secret'): # setup dest container dest_container = 'dest-container-%s' % uuid.uuid4() - dest_headers = base_headers.copy() + dest_headers = {} dest_policy = None if len(ENABLED_POLICIES) > 1: dest_policy = random.choice(ENABLED_POLICIES) dest_headers['X-Storage-Policy'] = dest_policy.name + if dkey is not None: + dest_headers['X-Container-Sync-Key'] = dkey client.put_container(self.url, self.token, dest_container, headers=dest_headers) # setup source container source_container = 'source-container-%s' % uuid.uuid4() - source_headers = base_headers.copy() + source_headers = {} sync_to = '//%s/%s/%s/%s' % (self.realm, self.cluster, self.account, dest_container) source_headers['X-Container-Sync-To'] = sync_to + if skey is not None: + source_headers['X-Container-Sync-Key'] = skey if dest_policy: source_policy = random.choice([p for p in ENABLED_POLICIES if p is not dest_policy]) @@ -75,6 +78,11 @@ class TestContainerSync(ReplProbeTest): client.put_container(self.url, self.token, source_container, headers=source_headers) + return source_container, dest_container + + def test_sync(self): + source_container, dest_container = self._setup_synced_containers() + # upload to source object_name = 'object-%s' % uuid.uuid4() client.put_object(self.url, self.token, source_container, object_name, @@ -83,11 +91,63 @@ class TestContainerSync(ReplProbeTest): # cycle container-sync Manager(['container-sync']).once() - # retrieve from sync'd container - headers, body = client.get_object(self.url, self.token, - dest_container, object_name) + _junk, body = client.get_object(self.url, self.token, + dest_container, object_name) self.assertEqual(body, 'test-body') + def test_sync_lazy_skey(self): + # Create synced containers, but with no key at source + source_container, dest_container =\ + self._setup_synced_containers(None, 'secret') + + # upload to source + object_name = 'object-%s' % uuid.uuid4() + client.put_object(self.url, self.token, source_container, object_name, + 'test-body') + + # cycle container-sync, nothing should happen + Manager(['container-sync']).once() + with self.assertRaises(ClientException) as err: + _junk, body = client.get_object(self.url, self.token, + dest_container, object_name) + self.assertEqual(err.exception.http_status, HTTP_NOT_FOUND) + + # amend source key + source_headers = {'X-Container-Sync-Key': 'secret'} + client.put_container(self.url, self.token, source_container, + headers=source_headers) + # cycle container-sync, should replicate + Manager(['container-sync']).once() + _junk, body = client.get_object(self.url, self.token, + dest_container, object_name) + self.assertEqual(body, 'test-body') + + def test_sync_lazy_dkey(self): + # Create synced containers, but with no key at dest + source_container, dest_container =\ + self._setup_synced_containers('secret', None) + + # upload to source + object_name = 'object-%s' % uuid.uuid4() + client.put_object(self.url, self.token, source_container, object_name, + 'test-body') + + # cycle container-sync, nothing should happen + Manager(['container-sync']).once() + with self.assertRaises(ClientException) as err: + _junk, body = client.get_object(self.url, self.token, + dest_container, object_name) + self.assertEqual(err.exception.http_status, HTTP_NOT_FOUND) + + # amend dest key + dest_headers = {'X-Container-Sync-Key': 'secret'} + client.put_container(self.url, self.token, dest_container, + headers=dest_headers) + # cycle container-sync, should replicate + Manager(['container-sync']).once() + _junk, body = client.get_object(self.url, self.token, + dest_container, object_name) + self.assertEqual(body, 'test-body') if __name__ == "__main__": unittest.main() diff --git a/test/unit/container/test_replicator.py b/test/unit/container/test_replicator.py index 9216fed5ed..bdfe481d15 100644 --- a/test/unit/container/test_replicator.py +++ b/test/unit/container/test_replicator.py @@ -23,14 +23,14 @@ import random import sqlite3 from swift.common import db_replicator -from swift.container import replicator, backend, server +from swift.container import replicator, backend, server, sync_store from swift.container.reconciler import ( MISPLACED_OBJECTS_ACCOUNT, get_reconciler_container_name) from swift.common.utils import Timestamp from swift.common.storage_policy import POLICIES from test.unit.common import test_db_replicator -from test.unit import patch_policies, make_timestamp_iter +from test.unit import patch_policies, make_timestamp_iter, FakeLogger from contextlib import contextmanager @@ -998,6 +998,135 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): daemon._post_replicate_hook(broker, info, []) self.assertEqual(0, len(calls)) + def test_update_sync_store_exception(self): + class FakeContainerSyncStore(object): + def update_sync_store(self, broker): + raise OSError(1, '1') + + logger = FakeLogger() + daemon = replicator.ContainerReplicator({}, logger) + daemon.sync_store = FakeContainerSyncStore() + ts_iter = make_timestamp_iter() + broker = self._get_broker('a', 'c', node_index=0) + timestamp = next(ts_iter) + broker.initialize(timestamp.internal, POLICIES.default.idx) + info = broker.get_replication_info() + daemon._post_replicate_hook(broker, info, []) + log_lines = logger.get_lines_for_level('error') + self.assertEqual(1, len(log_lines)) + self.assertIn('Failed to update sync_store', log_lines[0]) + + def test_update_sync_store(self): + klass = 'swift.container.sync_store.ContainerSyncStore' + daemon = replicator.ContainerReplicator({}) + daemon.sync_store = sync_store.ContainerSyncStore( + daemon.root, daemon.logger, daemon.mount_check) + ts_iter = make_timestamp_iter() + broker = self._get_broker('a', 'c', node_index=0) + timestamp = next(ts_iter) + broker.initialize(timestamp.internal, POLICIES.default.idx) + info = broker.get_replication_info() + with mock.patch(klass + '.remove_synced_container') as mock_remove: + with mock.patch(klass + '.add_synced_container') as mock_add: + daemon._post_replicate_hook(broker, info, []) + self.assertEqual(0, mock_remove.call_count) + self.assertEqual(0, mock_add.call_count) + + timestamp = next(ts_iter) + # sync-to and sync-key empty - remove from store + broker.update_metadata( + {'X-Container-Sync-To': ('', timestamp.internal), + 'X-Container-Sync-Key': ('', timestamp.internal)}) + with mock.patch(klass + '.remove_synced_container') as mock_remove: + with mock.patch(klass + '.add_synced_container') as mock_add: + daemon._post_replicate_hook(broker, info, []) + self.assertEqual(0, mock_add.call_count) + mock_remove.assert_called_once_with(broker) + + timestamp = next(ts_iter) + # sync-to is not empty sync-key is empty - remove from store + broker.update_metadata( + {'X-Container-Sync-To': ('a', timestamp.internal)}) + with mock.patch(klass + '.remove_synced_container') as mock_remove: + with mock.patch(klass + '.add_synced_container') as mock_add: + daemon._post_replicate_hook(broker, info, []) + self.assertEqual(0, mock_add.call_count) + mock_remove.assert_called_once_with(broker) + + timestamp = next(ts_iter) + # sync-to is empty sync-key is not empty - remove from store + broker.update_metadata( + {'X-Container-Sync-To': ('', timestamp.internal), + 'X-Container-Sync-Key': ('secret', timestamp.internal)}) + with mock.patch(klass + '.remove_synced_container') as mock_remove: + with mock.patch(klass + '.add_synced_container') as mock_add: + daemon._post_replicate_hook(broker, info, []) + self.assertEqual(0, mock_add.call_count) + mock_remove.assert_called_once_with(broker) + + timestamp = next(ts_iter) + # sync-to, sync-key both not empty - add to store + broker.update_metadata( + {'X-Container-Sync-To': ('a', timestamp.internal), + 'X-Container-Sync-Key': ('secret', timestamp.internal)}) + with mock.patch(klass + '.remove_synced_container') as mock_remove: + with mock.patch(klass + '.add_synced_container') as mock_add: + daemon._post_replicate_hook(broker, info, []) + mock_add.assert_called_once_with(broker) + self.assertEqual(0, mock_remove.call_count) + + timestamp = next(ts_iter) + # container is removed - need to remove from store + broker.delete_db(timestamp.internal) + broker.update_metadata( + {'X-Container-Sync-To': ('a', timestamp.internal), + 'X-Container-Sync-Key': ('secret', timestamp.internal)}) + with mock.patch(klass + '.remove_synced_container') as mock_remove: + with mock.patch(klass + '.add_synced_container') as mock_add: + daemon._post_replicate_hook(broker, info, []) + self.assertEqual(0, mock_add.call_count) + mock_remove.assert_called_once_with(broker) + + def test_sync_triggers_sync_store_update(self): + klass = 'swift.container.sync_store.ContainerSyncStore' + ts_iter = make_timestamp_iter() + # Create two containers as follows: + # broker_1 which is not set for sync + # broker_2 which is set for sync and then unset + # test that while replicating both we see no activity + # for broker_1, and the anticipated activity for broker_2 + broker_1 = self._get_broker('a', 'c', node_index=0) + broker_1.initialize(next(ts_iter).internal, POLICIES.default.idx) + broker_2 = self._get_broker('b', 'd', node_index=0) + broker_2.initialize(next(ts_iter).internal, POLICIES.default.idx) + broker_2.update_metadata( + {'X-Container-Sync-To': ('a', next(ts_iter).internal), + 'X-Container-Sync-Key': ('secret', next(ts_iter).internal)}) + + # replicate once according to broker_1 + # relying on the fact that FakeRing would place both + # in the same partition. + part, node = self._get_broker_part_node(broker_1) + with mock.patch(klass + '.remove_synced_container') as mock_remove: + with mock.patch(klass + '.add_synced_container') as mock_add: + self._run_once(node) + self.assertEqual(1, mock_add.call_count) + self.assertEqual(broker_2.db_file, mock_add.call_args[0][0].db_file) + self.assertEqual(0, mock_remove.call_count) + + broker_2.update_metadata( + {'X-Container-Sync-To': ('', next(ts_iter).internal)}) + # replicate once this time according to broker_2 + # relying on the fact that FakeRing would place both + # in the same partition. + part, node = self._get_broker_part_node(broker_2) + with mock.patch(klass + '.remove_synced_container') as mock_remove: + with mock.patch(klass + '.add_synced_container') as mock_add: + self._run_once(node) + self.assertEqual(0, mock_add.call_count) + self.assertEqual(1, mock_remove.call_count) + self.assertEqual(broker_2.db_file, mock_remove.call_args[0][0].db_file) + if __name__ == '__main__': unittest.main() diff --git a/test/unit/container/test_server.py b/test/unit/container/test_server.py index fb414207d5..22e0f00c41 100644 --- a/test/unit/container/test_server.py +++ b/test/unit/container/test_server.py @@ -1153,6 +1153,75 @@ class TestContainerController(unittest.TestCase): self.assertEqual(info['x_container_sync_point1'], -1) self.assertEqual(info['x_container_sync_point2'], -1) + def test_update_sync_store_on_PUT(self): + # Create a synced container and validate a link is created + self._create_synced_container_and_validate_sync_store('PUT') + # remove the sync using PUT and validate the link is deleted + self._remove_sync_and_validate_sync_store('PUT') + + def test_update_sync_store_on_POST(self): + # Create a container and validate a link is not created + self._create_container_and_validate_sync_store() + # Update the container to be synced and validate a link is created + self._create_synced_container_and_validate_sync_store('POST') + # remove the sync using POST and validate the link is deleted + self._remove_sync_and_validate_sync_store('POST') + + def test_update_sync_store_on_DELETE(self): + # Create a synced container and validate a link is created + self._create_synced_container_and_validate_sync_store('PUT') + # Remove the container and validate the link is deleted + self._remove_sync_and_validate_sync_store('DELETE') + + def _create_container_and_validate_sync_store(self): + req = Request.blank( + '/sda1/p/a/c', environ={'REQUEST_METHOD': 'PUT'}, + headers={'x-timestamp': '0'}) + req.get_response(self.controller) + db = self.controller._get_container_broker('sda1', 'p', 'a', 'c') + sync_store = self.controller.sync_store + db_path = db.db_file + db_link = sync_store._container_to_synced_container_path(db_path) + self.assertFalse(os.path.exists(db_link)) + sync_containers = [c for c in sync_store.synced_containers_generator()] + self.assertFalse(sync_containers) + + def _create_synced_container_and_validate_sync_store(self, method): + req = Request.blank( + '/sda1/p/a/c', environ={'REQUEST_METHOD': method}, + headers={'x-timestamp': '1', + 'x-container-sync-to': 'http://127.0.0.1:12345/v1/a/c', + 'x-container-sync-key': '1234'}) + req.get_response(self.controller) + db = self.controller._get_container_broker('sda1', 'p', 'a', 'c') + sync_store = self.controller.sync_store + db_path = db.db_file + db_link = sync_store._container_to_synced_container_path(db_path) + self.assertTrue(os.path.exists(db_link)) + sync_containers = [c for c in sync_store.synced_containers_generator()] + self.assertEqual(1, len(sync_containers)) + self.assertEqual(db_path, sync_containers[0]) + + def _remove_sync_and_validate_sync_store(self, method): + if method == 'DELETE': + headers = {'x-timestamp': '2'} + else: + headers = {'x-timestamp': '2', + 'x-container-sync-to': '', + 'x-container-sync-key': '1234'} + + req = Request.blank( + '/sda1/p/a/c', environ={'REQUEST_METHOD': method}, + headers=headers) + req.get_response(self.controller) + db = self.controller._get_container_broker('sda1', 'p', 'a', 'c') + sync_store = self.controller.sync_store + db_path = db.db_file + db_link = sync_store._container_to_synced_container_path(db_path) + self.assertFalse(os.path.exists(db_link)) + sync_containers = [c for c in sync_store.synced_containers_generator()] + self.assertFalse(sync_containers) + def test_REPLICATE_insufficient_storage(self): conf = {'devices': self.testdir, 'mount_check': 'true'} self.container_controller = container_server.ContainerController( diff --git a/test/unit/container/test_sync.py b/test/unit/container/test_sync.py index bc69fcffa5..60d606110d 100644 --- a/test/unit/container/test_sync.py +++ b/test/unit/container/test_sync.py @@ -19,8 +19,10 @@ import unittest from textwrap import dedent import mock +import errno from test.unit import debug_logger from swift.container import sync +from swift.common.db import DatabaseConnectionError from swift.common import utils from swift.common.wsgi import ConfigString from swift.common.exceptions import ClientException @@ -47,6 +49,7 @@ class FakeContainerBroker(object): def __init__(self, path, metadata=None, info=None, deleted=False, items_since=None): self.db_file = path + self.db_dir = os.path.dirname(path) self.metadata = metadata if metadata else {} self.info = info if info else {} self.deleted = deleted @@ -157,7 +160,6 @@ class TestContainerSync(unittest.TestCase): # interval sleep. time_calls = [0] sleep_calls = [] - audit_location_generator_calls = [0] def fake_time(): time_calls[0] += 1 @@ -176,48 +178,36 @@ class TestContainerSync(unittest.TestCase): def fake_sleep(amount): sleep_calls.append(amount) - def fake_audit_location_generator(*args, **kwargs): - audit_location_generator_calls[0] += 1 - # Makes .container_sync() short-circuit - yield 'container.db', 'device', 'partition' - return + gen_func = ('swift.container.sync_store.' + 'ContainerSyncStore.synced_containers_generator') + with mock.patch('swift.container.sync.InternalClient'), \ + mock.patch('swift.container.sync.time', fake_time), \ + mock.patch('swift.container.sync.sleep', fake_sleep), \ + mock.patch(gen_func) as fake_generator, \ + mock.patch('swift.container.sync.ContainerBroker', + lambda p: FakeContainerBroker(p, info={ + 'account': 'a', 'container': 'c', + 'storage_policy_index': 0})): + fake_generator.side_effect = [iter(['container.db']), + iter(['container.db'])] + cs = sync.ContainerSync({}, container_ring=FakeRing()) + try: + cs.run_forever() + except Exception as err: + if str(err) != 'we are now done': + raise - orig_time = sync.time - orig_sleep = sync.sleep - orig_ContainerBroker = sync.ContainerBroker - orig_audit_location_generator = sync.audit_location_generator - try: - sync.ContainerBroker = lambda p: FakeContainerBroker( - p, info={'account': 'a', 'container': 'c', - 'storage_policy_index': 0}) - sync.time = fake_time - sync.sleep = fake_sleep - - with mock.patch('swift.container.sync.InternalClient'): - cs = sync.ContainerSync({}, container_ring=FakeRing()) - sync.audit_location_generator = fake_audit_location_generator - cs.run_forever(1, 2, a=3, b=4, verbose=True) - except Exception as err: - if str(err) != 'we are now done': - raise - finally: - sync.time = orig_time - sync.sleep = orig_sleep - sync.audit_location_generator = orig_audit_location_generator - sync.ContainerBroker = orig_ContainerBroker - - self.assertEqual(time_calls, [9]) - self.assertEqual(len(sleep_calls), 2) - self.assertTrue(sleep_calls[0] <= cs.interval) - self.assertTrue(sleep_calls[1] == cs.interval - 1) - self.assertEqual(audit_location_generator_calls, [2]) - self.assertEqual(cs.reported, 3602) + self.assertEqual(time_calls, [9]) + self.assertEqual(len(sleep_calls), 2) + self.assertLessEqual(sleep_calls[0], cs.interval) + self.assertEqual(cs.interval - 1, sleep_calls[1]) + self.assertEqual(2, fake_generator.call_count) + self.assertEqual(cs.reported, 3602) def test_run_once(self): # This runs runs_once with fakes twice, the first causing an interim # report, the second with no interim report. time_calls = [0] - audit_location_generator_calls = [0] def fake_time(): time_calls[0] += 1 @@ -235,40 +225,31 @@ class TestContainerSync(unittest.TestCase): raise Exception('we are now done') return returns[time_calls[0] - 1] - def fake_audit_location_generator(*args, **kwargs): - audit_location_generator_calls[0] += 1 - # Makes .container_sync() short-circuit - yield 'container.db', 'device', 'partition' - return + gen_func = ('swift.container.sync_store.' + 'ContainerSyncStore.synced_containers_generator') + with mock.patch('swift.container.sync.InternalClient'), \ + mock.patch('swift.container.sync.time', fake_time), \ + mock.patch(gen_func) as fake_generator, \ + mock.patch('swift.container.sync.ContainerBroker', + lambda p: FakeContainerBroker(p, info={ + 'account': 'a', 'container': 'c', + 'storage_policy_index': 0})): + fake_generator.side_effect = [iter(['container.db']), + iter(['container.db'])] + cs = sync.ContainerSync({}, container_ring=FakeRing()) + try: + cs.run_once() + self.assertEqual(time_calls, [6]) + self.assertEqual(1, fake_generator.call_count) + self.assertEqual(cs.reported, 3602) + cs.run_once() + except Exception as err: + if str(err) != 'we are now done': + raise - orig_time = sync.time - orig_audit_location_generator = sync.audit_location_generator - orig_ContainerBroker = sync.ContainerBroker - try: - sync.ContainerBroker = lambda p: FakeContainerBroker( - p, info={'account': 'a', 'container': 'c', - 'storage_policy_index': 0}) - sync.time = fake_time - - with mock.patch('swift.container.sync.InternalClient'): - cs = sync.ContainerSync({}, container_ring=FakeRing()) - sync.audit_location_generator = fake_audit_location_generator - cs.run_once(1, 2, a=3, b=4, verbose=True) - self.assertEqual(time_calls, [6]) - self.assertEqual(audit_location_generator_calls, [1]) - self.assertEqual(cs.reported, 3602) - cs.run_once() - except Exception as err: - if str(err) != 'we are now done': - raise - finally: - sync.time = orig_time - sync.audit_location_generator = orig_audit_location_generator - sync.ContainerBroker = orig_ContainerBroker - - self.assertEqual(time_calls, [10]) - self.assertEqual(audit_location_generator_calls, [2]) - self.assertEqual(cs.reported, 3604) + self.assertEqual(time_calls, [10]) + self.assertEqual(2, fake_generator.call_count) + self.assertEqual(cs.reported, 3604) def test_container_sync_not_db(self): cring = FakeRing() @@ -280,8 +261,65 @@ class TestContainerSync(unittest.TestCase): cring = FakeRing() with mock.patch('swift.container.sync.InternalClient'): cs = sync.ContainerSync({}, container_ring=cring) - cs.container_sync('isa.db') - self.assertEqual(cs.container_failures, 1) + + broker = 'swift.container.backend.ContainerBroker' + store = 'swift.container.sync_store.ContainerSyncStore' + + # In this test we call the container_sync instance several + # times with a missing db in various combinations. + # Since we use the same ContainerSync instance for all tests + # its failures counter increases by one with each call. + + # Test the case where get_info returns DatabaseConnectionError + # with DB does not exist, and we succeed in deleting it. + with mock.patch(broker + '.get_info') as fake_get_info: + with mock.patch(store + '.remove_synced_container') as fake_remove: + fake_get_info.side_effect = DatabaseConnectionError( + 'a', + "DB doesn't exist") + cs.container_sync('isa.db') + self.assertEqual(cs.container_failures, 1) + self.assertEqual(cs.container_skips, 0) + self.assertEqual(1, fake_remove.call_count) + self.assertEqual('isa.db', fake_remove.call_args[0][0].db_file) + + # Test the case where get_info returns DatabaseConnectionError + # with DB does not exist, and we fail to delete it. + with mock.patch(broker + '.get_info') as fake_get_info: + with mock.patch(store + '.remove_synced_container') as fake_remove: + fake_get_info.side_effect = DatabaseConnectionError( + 'a', + "DB doesn't exist") + fake_remove.side_effect = OSError('1') + cs.container_sync('isa.db') + self.assertEqual(cs.container_failures, 2) + self.assertEqual(cs.container_skips, 0) + self.assertEqual(1, fake_remove.call_count) + self.assertEqual('isa.db', fake_remove.call_args[0][0].db_file) + + # Test the case where get_info returns DatabaseConnectionError + # with DB does not exist, and it returns an error != ENOENT. + with mock.patch(broker + '.get_info') as fake_get_info: + with mock.patch(store + '.remove_synced_container') as fake_remove: + fake_get_info.side_effect = DatabaseConnectionError( + 'a', + "DB doesn't exist") + fake_remove.side_effect = OSError(errno.EPERM, 'a') + cs.container_sync('isa.db') + self.assertEqual(cs.container_failures, 3) + self.assertEqual(cs.container_skips, 0) + self.assertEqual(1, fake_remove.call_count) + self.assertEqual('isa.db', fake_remove.call_args[0][0].db_file) + + # Test the case where get_info returns DatabaseConnectionError + # error different than DB does not exist + with mock.patch(broker + '.get_info') as fake_get_info: + with mock.patch(store + '.remove_synced_container') as fake_remove: + fake_get_info.side_effect = DatabaseConnectionError('a', 'a') + cs.container_sync('isa.db') + self.assertEqual(cs.container_failures, 4) + self.assertEqual(cs.container_skips, 0) + self.assertEqual(0, fake_remove.call_count) def test_container_sync_not_my_db(self): # Db could be there due to handoff replication so test that we ignore diff --git a/test/unit/container/test_sync_store.py b/test/unit/container/test_sync_store.py new file mode 100644 index 0000000000..d38d4bc3e5 --- /dev/null +++ b/test/unit/container/test_sync_store.py @@ -0,0 +1,367 @@ +# Copyright (c) 2010-2016 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import errno +import mock +import random +import logging +import unittest +import tempfile +from shutil import rmtree +from test.unit import debug_logger + +from swift.container.backend import DATADIR +from swift.container import sync_store + + +class FakeContainerBroker(object): + + def __init__(self, path): + self.db_file = path + self.db_dir = os.path.dirname(path) + self.metadata = dict() + self._is_deleted = False + + def is_deleted(self): + return self._is_deleted + + +class TestContainerSyncStore(unittest.TestCase): + + def setUp(self): + self.logger = debug_logger('test-container-sync-store') + self.logger.level = logging.DEBUG + self.test_dir_prefix = tempfile.mkdtemp() + self.devices_dir = os.path.join(self.test_dir_prefix, 'srv/node/') + os.makedirs(self.devices_dir) + # Create dummy container dbs + self.devices = ['sdax', 'sdb', 'sdc'] + self.partitions = ['21765', '38965', '13234'] + self.suffixes = ['312', '435'] + self.hashes = ['f19ed', '53ef', '0ab5', '9c3a'] + for device in self.devices: + data_dir_path = os.path.join(self.devices_dir, + device, + DATADIR) + os.makedirs(data_dir_path) + for part in self.partitions: + for suffix in self.suffixes: + for hsh in self.hashes: + db_dir = os.path.join(data_dir_path, + part, + suffix, + hsh) + os.makedirs(db_dir) + db_file = os.path.join(db_dir, '%s.db' % hsh) + with open(db_file, 'w') as outfile: + outfile.write('%s' % db_file) + + def teardown(self): + rmtree(self.test_dir_prefix) + + def pick_dbfile(self): + hsh = random.choice(self.hashes) + return os.path.join(self.devices_dir, + random.choice(self.devices), + DATADIR, + random.choice(self.partitions), + random.choice(self.suffixes), + hsh, + '%s.db' % hsh) + + # Path conversion tests + # container path is of the form: + # /srv/node/sdb/containers/part/.../*.db + # or more generally: + # devices/device/DATADIR/part/.../*.db + # synced container path is assumed to be of the form: + # /srv/node/sdb/sync_containers/part/.../*.db + # or more generally: + # devices/device/SYNC_DATADIR/part/.../*.db + # Indeed the ONLY DIFFERENCE is DATADIR <-> SYNC_DATADIR + # Since, however, the strings represented by the constants + # DATADIR or SYNC_DATADIR + # can appear in the devices or the device part, the conversion + # function between the two is a bit more subtle then a mere replacement. + + # This function tests the conversion between a container path + # and a synced container path + def test_container_to_synced_container_path_conversion(self): + # The conversion functions are oblivious to the suffix + # so we just pick up a constant one. + db_path_suffix = self._db_path_suffix() + + # We build various container path putting in both + # DATADIR and SYNC_DATADIR strings in the + # device and devices parts. + for devices, device in self._container_path_elements_generator(): + path = os.path.join(devices, device, DATADIR, db_path_suffix) + # Call the conversion function + sds = sync_store.ContainerSyncStore(devices, self.logger, False) + path = sds._container_to_synced_container_path(path) + # Validate that ONLY the DATADIR part was replaced with + # sync_store.SYNC_DATADIR + self._validate_container_path_parts(path, devices, device, + sync_store.SYNC_DATADIR, + db_path_suffix) + + # This function tests the conversion between a synced container path + # and a container path + def test_synced_container_to_container_path_conversion(self): + # The conversion functions are oblivious to the suffix + # so we just pick up a constant one. + db_path_suffix = ('133791/625/82a7f5a2c43281b0eab3597e35bb9625/' + '82a7f5a2c43281b0eab3597e35bb9625.db') + + # We build various synced container path putting in both + # DATADIR and SYNC_DATADIR strings in the + # device and devices parts. + for devices, device in self._container_path_elements_generator(): + path = os.path.join(devices, device, + sync_store.SYNC_DATADIR, db_path_suffix) + # Call the conversion function + sds = sync_store.ContainerSyncStore(devices, self.logger, False) + path = sds._synced_container_to_container_path(path) + # Validate that ONLY the SYNC_DATADIR part was replaced with + # DATADIR + self._validate_container_path_parts(path, devices, device, + DATADIR, + db_path_suffix) + + # Constructs a db path suffix of the form: + # 133791/625/82...25/82...25.db + def _db_path_suffix(self): + def random_hexa_string(length): + '%0xlength' % random.randrange(16 ** length) + + db = random_hexa_string(32) + return '%s/%s/%s/%s.db' % (random_hexa_string(5), + random_hexa_string(3), + db, db) + + def _container_path_elements_generator(self): + # We build various container path elements putting in both + # DATADIR and SYNC_DATADIR strings in the + # device and devices parts. + for devices in ['/srv/node', '/srv/node/', + '/srv/node/dev', + '/srv/node/%s' % DATADIR, + '/srv/node/%s' % sync_store.SYNC_DATADIR]: + for device in ['sdf1', 'sdf1/sdf2', + 'sdf1/%s' % DATADIR, + 'sdf1/%s' % sync_store.SYNC_DATADIR, + '%s/sda' % DATADIR, + '%s/sda' % sync_store.SYNC_DATADIR]: + yield devices, device + + def _validate_container_path_parts(self, path, devices, + device, target, suffix): + # Recall that the path is of the form: + # devices/device/target/suffix + # where each of the sub path elements (e.g. devices) + # has a path structure containing path elements separated by '/' + # We thus validate by splitting the path according to '/' + # traversing all of its path elements making sure that the + # first elements are those of devices, + # the second are those of device + # etc. + spath = path.split('/') + spath.reverse() + self.assertEqual(spath.pop(), '') + # Validate path against 'devices' + for p in [p for p in devices.split('/') if p]: + self.assertEqual(spath.pop(), p) + # Validate path against 'device' + for p in [p for p in device.split('/') if p]: + self.assertEqual(spath.pop(), p) + # Validate path against target + self.assertEqual(spath.pop(), target) + # Validate path against suffix + for p in [p for p in suffix.split('/') if p]: + self.assertEqual(spath.pop(), p) + + def test_add_synced_container(self): + # Add non-existing and existing synced containers + sds = sync_store.ContainerSyncStore(self.devices_dir, + self.logger, + False) + cfile = self.pick_dbfile() + broker = FakeContainerBroker(cfile) + for i in range(2): + sds.add_synced_container(broker) + scpath = sds._container_to_synced_container_path(cfile) + with open(scpath, 'r') as infile: + self.assertEqual(infile.read(), cfile) + + iterated_synced_containers = list() + for db_path in sds.synced_containers_generator(): + iterated_synced_containers.append(db_path) + + self.assertEqual(len(iterated_synced_containers), 1) + + def test_remove_synced_container(self): + # Add a synced container to remove + sds = sync_store.ContainerSyncStore(self.devices_dir, + self.logger, + False) + cfile = self.pick_dbfile() + # We keep here the link file so as to validate its deletion later + lfile = sds._container_to_synced_container_path(cfile) + broker = FakeContainerBroker(cfile) + sds.add_synced_container(broker) + + # Remove existing and non-existing synced containers + for i in range(2): + sds.remove_synced_container(broker) + + iterated_synced_containers = list() + for db_path in sds.synced_containers_generator(): + iterated_synced_containers.append(db_path) + + self.assertEqual(len(iterated_synced_containers), 0) + + # Make sure the whole link path gets deleted + # recall that the path has the following suffix: + # // + # /.db + # and we expect the .db as well as all path elements + # to get deleted + self.assertFalse(os.path.exists(lfile)) + lfile = os.path.dirname(lfile) + for i in range(3): + self.assertFalse(os.path.exists(os.path.dirname(lfile))) + lfile = os.path.dirname(lfile) + + def test_iterate_synced_containers(self): + # populate sync container db + sds = sync_store.ContainerSyncStore(self.devices_dir, + self.logger, + False) + containers = list() + for i in range(10): + cfile = self.pick_dbfile() + broker = FakeContainerBroker(cfile) + sds.add_synced_container(broker) + containers.append(cfile) + + iterated_synced_containers = list() + for db_path in sds.synced_containers_generator(): + iterated_synced_containers.append(db_path) + + self.assertEqual( + set(containers), set(iterated_synced_containers)) + + def test_unhandled_exceptions_in_add_remove(self): + sds = sync_store.ContainerSyncStore(self.devices_dir, + self.logger, + False) + cfile = self.pick_dbfile() + broker = FakeContainerBroker(cfile) + + with mock.patch( + 'swift.container.sync_store.os.stat', + side_effect=OSError(errno.EPERM, 'permission denied')): + with self.assertRaises(OSError) as cm: + sds.add_synced_container(broker) + self.assertEqual(errno.EPERM, cm.exception.errno) + + with mock.patch( + 'swift.container.sync_store.os.makedirs', + side_effect=OSError(errno.EPERM, 'permission denied')): + with self.assertRaises(OSError) as cm: + sds.add_synced_container(broker) + self.assertEqual(errno.EPERM, cm.exception.errno) + + with mock.patch( + 'swift.container.sync_store.os.symlink', + side_effect=OSError(errno.EPERM, 'permission denied')): + with self.assertRaises(OSError) as cm: + sds.add_synced_container(broker) + self.assertEqual(errno.EPERM, cm.exception.errno) + + with mock.patch( + 'swift.container.sync_store.os.unlink', + side_effect=OSError(errno.EPERM, 'permission denied')): + with self.assertRaises(OSError) as cm: + sds.remove_synced_container(broker) + self.assertEqual(errno.EPERM, cm.exception.errno) + + def test_update_sync_store_according_to_metadata_and_deleted(self): + # This function tests the update_sync_store 'logics' + # with respect to various combinations of the + # sync-to and sync-key metadata items and whether + # the database is marked for delete. + # The table below summarizes the expected result + # for the various combinations, e.g.: + # If metadata items exist and the database + # is not marked for delete then add should be called. + + results_list = [ + [False, 'a', 'b', 'add'], + [False, 'a', '', 'remove'], + [False, 'a', None, 'remove'], + [False, '', 'b', 'remove'], + [False, '', '', 'remove'], + [False, '', None, 'remove'], + [False, None, 'b', 'remove'], + [False, None, '', 'remove'], + [False, None, None, 'none'], + [True, 'a', 'b', 'remove'], + [True, 'a', '', 'remove'], + [True, 'a', None, 'remove'], + [True, '', 'b', 'remove'], + [True, '', '', 'remove'], + [True, '', None, 'remove'], + [True, None, 'b', 'remove'], + [True, None, '', 'remove'], + [True, None, None, 'none'], + ] + + store = 'swift.container.sync_store.ContainerSyncStore' + with mock.patch(store + '.add_synced_container') as add_container: + with mock.patch( + store + '.remove_synced_container') as remove_container: + sds = sync_store.ContainerSyncStore(self.devices_dir, + self.logger, + False) + add_calls = 0 + remove_calls = 0 + # We now iterate over the list of combinations + # Validating that add and removed are called as + # expected + for deleted, sync_to, sync_key, expected_op in results_list: + cfile = self.pick_dbfile() + broker = FakeContainerBroker(cfile) + broker._is_deleted = deleted + if sync_to is not None: + broker.metadata['X-Container-Sync-To'] = [ + sync_to, 1] + if sync_key is not None: + broker.metadata['X-Container-Sync-Key'] = [ + sync_key, 1] + sds.update_sync_store(broker) + if expected_op == 'add': + add_calls += 1 + if expected_op == 'remove': + remove_calls += 1 + self.assertEqual(add_container.call_count, + add_calls) + self.assertEqual(remove_container.call_count, + remove_calls) + + +if __name__ == '__main__': + unittest.main()