From 2a6dfae2f3d50750de2bdb5f31ea52070588e895 Mon Sep 17 00:00:00 2001 From: Tim Burke Date: Mon, 15 Jun 2020 17:09:15 -0700 Subject: [PATCH] Allow direct and internal clients to use the replication network A new header `X-Backend-Use-Replication-Network` is added; if true, use the replication network instead of the client-data-path network. Several background daemons are updated to use the replication network: * account-reaper * container-reconciler * container-sharder * container-sync * object-expirer Note that if container-sync is being used to sync data within the same cluster, the replication network will only be used when communicating with the "source" container; the "destination" traffic will continue to use the configured realm endpoint. The direct and internal client APIs still default to using the client-data-path network; this maintains backwards compatibility for external tools written against them. UpgradeImpact ============= Until recently, servers configured with replication_server = true would only handle REPLICATE (and, in the case of object servers, SSYNC) requests, and would respond 405 Method Not Allowed to other requests. When upgrading from Swift 2.25.0 or earlier, remove the config option and restart services prior to upgrade to avoid a flood of background daemon errors in logs. Note that some background daemons find work by querying Swift rather than walking local drives that should be available on the replication network: * container-reconciler * object-expirer Previosuly these may have been configured without access to the replication network; ensure they have access before upgrading. Closes-Bug: #1883302 Related-Bug: #1446873 Related-Change: Ica2b41a52d11cb10c94fa8ad780a201318c4fc87 Change-Id: Ieef534bf5d5fb53602e875b51c15ef565882fbff --- swift/account/reaper.py | 10 +++- swift/common/direct_client.py | 33 ++++++++--- swift/common/internal_client.py | 7 ++- swift/common/request_helpers.py | 17 +++++- swift/container/reconciler.py | 19 ++++-- swift/container/sharder.py | 5 +- swift/container/sync.py | 3 +- swift/obj/expirer.py | 3 +- swift/proxy/controllers/base.py | 13 +++-- swift/proxy/controllers/obj.py | 5 +- test/unit/__init__.py | 30 +++++++--- test/unit/account/test_reaper.py | 6 +- test/unit/common/test_direct_client.py | 55 ++++++++++++++++++ test/unit/common/test_internal_client.py | 73 +++++++++++++++++++++++- test/unit/common/test_request_helpers.py | 13 +++++ test/unit/container/test_reconciler.py | 1 + test/unit/container/test_sharder.py | 22 +++++-- test/unit/proxy/test_server.py | 2 +- 18 files changed, 271 insertions(+), 46 deletions(-) diff --git a/swift/account/reaper.py b/swift/account/reaper.py index 41b4485d1b..7b62df03fc 100644 --- a/swift/account/reaper.py +++ b/swift/account/reaper.py @@ -31,6 +31,7 @@ from swift.common.constraints import check_drive from swift.common.direct_client import direct_delete_container, \ direct_delete_object, direct_get_container from swift.common.exceptions import ClientException +from swift.common.request_helpers import USE_REPLICATION_NETWORK_HEADER from swift.common.ring import Ring from swift.common.ring.utils import is_local_device from swift.common.utils import get_logger, whataremyips, config_true_value, \ @@ -370,7 +371,8 @@ class AccountReaper(Daemon): node, part, account, container, marker=marker, conn_timeout=self.conn_timeout, - response_timeout=self.node_timeout) + response_timeout=self.node_timeout, + headers={USE_REPLICATION_NETWORK_HEADER: 'true'}) self.stats_return_codes[2] = \ self.stats_return_codes.get(2, 0) + 1 self.logger.increment('return_codes.2') @@ -418,7 +420,8 @@ class AccountReaper(Daemon): 'X-Account-Partition': str(account_partition), 'X-Account-Device': anode['device'], 'X-Account-Override-Deleted': 'yes', - 'X-Timestamp': timestamp.internal}) + 'X-Timestamp': timestamp.internal, + USE_REPLICATION_NETWORK_HEADER: 'true'}) successes += 1 self.stats_return_codes[2] = \ self.stats_return_codes.get(2, 0) + 1 @@ -494,7 +497,8 @@ class AccountReaper(Daemon): 'X-Container-Partition': str(container_partition), 'X-Container-Device': cnode['device'], 'X-Backend-Storage-Policy-Index': policy_index, - 'X-Timestamp': timestamp.internal}) + 'X-Timestamp': timestamp.internal, + USE_REPLICATION_NETWORK_HEADER: 'true'}) successes += 1 self.stats_return_codes[2] = \ self.stats_return_codes.get(2, 0) + 1 diff --git a/swift/common/direct_client.py b/swift/common/direct_client.py index 86a644d8b0..a68159f207 100644 --- a/swift/common/direct_client.py +++ b/swift/common/direct_client.py @@ -29,6 +29,8 @@ from six.moves.http_client import HTTPException from swift.common.bufferedhttp import http_connect from swift.common.exceptions import ClientException +from swift.common.request_helpers import USE_REPLICATION_NETWORK_HEADER, \ + get_ip_port from swift.common.swob import normalize_etag from swift.common.utils import Timestamp, FileLikeIter, quote from swift.common.http import HTTP_NO_CONTENT, HTTP_INSUFFICIENT_STORAGE, \ @@ -100,9 +102,10 @@ def _make_req(node, part, method, path, headers, stype, if content_length is None: headers['Transfer-Encoding'] = 'chunked' + ip, port = get_ip_port(node, headers) headers.setdefault('X-Backend-Allow-Reserved-Names', 'true') with Timeout(conn_timeout): - conn = http_connect(node['ip'], node['port'], node['device'], part, + conn = http_connect(ip, port, node['device'], part, method, path, headers=headers) if contents is not None: @@ -145,6 +148,9 @@ def _get_direct_account_container(path, stype, node, part, Do not use directly use the get_direct_account or get_direct_container instead. """ + if headers is None: + headers = {} + params = ['format=json'] if marker: params.append('marker=%s' % quote(marker)) @@ -159,8 +165,10 @@ def _get_direct_account_container(path, stype, node, part, if reverse: params.append('reverse=%s' % quote(reverse)) qs = '&'.join(params) + + ip, port = get_ip_port(node, headers) with Timeout(conn_timeout): - conn = http_connect(node['ip'], node['port'], node['device'], part, + conn = http_connect(ip, port, node['device'], part, 'GET', path, query_string=qs, headers=gen_headers(hdrs_in=headers)) with Timeout(response_timeout): @@ -200,7 +208,8 @@ def gen_headers(hdrs_in=None, add_ts=True): def direct_get_account(node, part, account, marker=None, limit=None, prefix=None, delimiter=None, conn_timeout=5, - response_timeout=15, end_marker=None, reverse=None): + response_timeout=15, end_marker=None, reverse=None, + headers=None): """ Get listings directly from the account server. @@ -220,6 +229,7 @@ def direct_get_account(node, part, account, marker=None, limit=None, """ path = _make_path(account) return _get_direct_account_container(path, "Account", node, part, + headers=headers, marker=marker, limit=limit, prefix=prefix, delimiter=delimiter, @@ -240,7 +250,7 @@ def direct_delete_account(node, part, account, conn_timeout=5, def direct_head_container(node, part, account, container, conn_timeout=5, - response_timeout=15): + response_timeout=15, headers=None): """ Request container information directly from the container server. @@ -253,8 +263,11 @@ def direct_head_container(node, part, account, container, conn_timeout=5, :returns: a dict containing the response's headers in a HeaderKeyDict :raises ClientException: HTTP HEAD request failed """ + if headers is None: + headers = {} + path = _make_path(account, container) - resp = _make_req(node, part, 'HEAD', path, gen_headers(), + resp = _make_req(node, part, 'HEAD', path, gen_headers(headers), 'Container', conn_timeout, response_timeout) resp_headers = HeaderKeyDict() @@ -431,9 +444,10 @@ def direct_get_object(node, part, account, container, obj, conn_timeout=5, if headers is None: headers = {} + ip, port = get_ip_port(node, headers) path = _make_path(account, container, obj) with Timeout(conn_timeout): - conn = http_connect(node['ip'], node['port'], node['device'], part, + conn = http_connect(ip, port, node['device'], part, 'GET', path, headers=gen_headers(headers)) with Timeout(response_timeout): resp = conn.getresponse() @@ -551,6 +565,9 @@ def direct_get_suffix_hashes(node, part, suffixes, conn_timeout=5, """ Get suffix hashes directly from the object server. + Note that unlike other ``direct_client`` functions, this one defaults + to using the replication network to make requests. + :param node: node dictionary from the ring :param part: partition the container is on :param conn_timeout: timeout in seconds for establishing the connection @@ -562,9 +579,11 @@ def direct_get_suffix_hashes(node, part, suffixes, conn_timeout=5, if headers is None: headers = {} + headers.setdefault(USE_REPLICATION_NETWORK_HEADER, 'true') + ip, port = get_ip_port(node, headers) path = '/%s' % '-'.join(suffixes) with Timeout(conn_timeout): - conn = http_connect(node['replication_ip'], node['replication_port'], + conn = http_connect(ip, port, node['device'], part, 'REPLICATE', path, headers=gen_headers(headers)) with Timeout(response_timeout): diff --git a/swift/common/internal_client.py b/swift/common/internal_client.py index c165909e71..58b06d1cb5 100644 --- a/swift/common/internal_client.py +++ b/swift/common/internal_client.py @@ -29,6 +29,7 @@ from swift.common.constraints import AUTO_CREATE_ACCOUNT_PREFIX from swift.common.exceptions import ClientException from swift.common.http import (HTTP_NOT_FOUND, HTTP_MULTIPLE_CHOICES, is_client_error, is_server_error) +from swift.common.request_helpers import USE_REPLICATION_NETWORK_HEADER from swift.common.swob import Request, bytes_to_wsgi from swift.common.utils import quote, closing_if_possible from swift.common.wsgi import loadapp, pipeline_property @@ -147,13 +148,14 @@ class InternalClient(object): """ def __init__(self, conf_path, user_agent, request_tries, - allow_modify_pipeline=False): + allow_modify_pipeline=False, use_replication_network=False): if request_tries < 1: raise ValueError('request_tries must be positive') self.app = loadapp(conf_path, allow_modify_pipeline=allow_modify_pipeline) self.user_agent = user_agent self.request_tries = request_tries + self.use_replication_network = use_replication_network get_object_ring = pipeline_property('get_object_ring') container_ring = pipeline_property('container_ring') @@ -186,6 +188,9 @@ class InternalClient(object): headers = dict(headers) headers['user-agent'] = self.user_agent headers.setdefault('x-backend-allow-reserved-names', 'true') + if self.use_replication_network: + headers.setdefault(USE_REPLICATION_NETWORK_HEADER, 'true') + for attempt in range(self.request_tries): resp = exc_type = exc_value = exc_traceback = None req = Request.blank( diff --git a/swift/common/request_helpers.py b/swift/common/request_helpers.py index c120ac2778..c660da74b2 100644 --- a/swift/common/request_helpers.py +++ b/swift/common/request_helpers.py @@ -40,14 +40,15 @@ from swift.common.utils import split_path, validate_device_partition, \ close_if_possible, maybe_multipart_byteranges_to_document_iters, \ multipart_byteranges_to_document_iters, parse_content_type, \ parse_content_range, csv_append, list_from_csv, Spliterator, quote, \ - RESERVED + RESERVED, config_true_value from swift.common.wsgi import make_subrequest -from swift.container.reconciler import MISPLACED_OBJECTS_ACCOUNT OBJECT_TRANSIENT_SYSMETA_PREFIX = 'x-object-transient-sysmeta-' OBJECT_SYSMETA_CONTAINER_UPDATE_OVERRIDE_PREFIX = \ 'x-object-sysmeta-container-update-override-' +USE_REPLICATION_NETWORK_HEADER = 'x-backend-use-replication-network' +MISPLACED_OBJECTS_ACCOUNT = '.misplaced_objects' if six.PY2: @@ -849,3 +850,15 @@ def update_ignore_range_header(req, name): raise ValueError('Header name must not contain commas') hdr = 'X-Backend-Ignore-Range-If-Metadata-Present' req.headers[hdr] = csv_append(req.headers.get(hdr), name) + + +def get_ip_port(node, headers): + use_replication_network = False + for h, v in headers.items(): + if h.lower() == USE_REPLICATION_NETWORK_HEADER: + use_replication_network = config_true_value(v) + break + if use_replication_network: + return node['replication_ip'], node['replication_port'] + else: + return node['ip'], node['port'] diff --git a/swift/container/reconciler.py b/swift/container/reconciler.py index 7a8b5fd1e9..9bf225eb64 100644 --- a/swift/container/reconciler.py +++ b/swift/container/reconciler.py @@ -27,11 +27,12 @@ from swift.common.direct_client import ( direct_head_container, direct_delete_container_object, direct_put_container_object, ClientException) from swift.common.internal_client import InternalClient, UnexpectedResponse +from swift.common.request_helpers import MISPLACED_OBJECTS_ACCOUNT, \ + USE_REPLICATION_NETWORK_HEADER from swift.common.utils import get_logger, split_path, majority_size, \ FileLikeIter, Timestamp, last_modified_date_to_timestamp, \ LRUCache, decode_timestamps -MISPLACED_OBJECTS_ACCOUNT = '.misplaced_objects' MISPLACED_OBJECTS_CONTAINER_DIVISOR = 3600 # 1 hour CONTAINER_POLICY_TTL = 30 @@ -224,6 +225,7 @@ def add_to_reconciler_queue(container_ring, account, container, obj, 'X-Etag': obj_timestamp, 'X-Timestamp': x_timestamp, 'X-Content-Type': q_op_type, + USE_REPLICATION_NETWORK_HEADER: 'true', } def _check_success(*args, **kwargs): @@ -307,7 +309,8 @@ def direct_get_container_policy_index(container_ring, account_name, """ def _eat_client_exception(*args): try: - return direct_head_container(*args) + return direct_head_container(*args, headers={ + USE_REPLICATION_NETWORK_HEADER: 'true'}) except ClientException as err: if err.http_status == 404: return err.http_headers @@ -333,6 +336,10 @@ def direct_delete_container_entry(container_ring, account_name, container_name, object listing. Does not talk to object servers; use this only when a container entry does not actually have a corresponding object. """ + if headers is None: + headers = {} + headers[USE_REPLICATION_NETWORK_HEADER] = 'true' + pool = GreenPool() part, nodes = container_ring.get_nodes(account_name, container_name) for node in nodes: @@ -360,9 +367,11 @@ class ContainerReconciler(Daemon): '/etc/swift/container-reconciler.conf' self.logger = get_logger(conf, log_route='container-reconciler') request_tries = int(conf.get('request_tries') or 3) - self.swift = InternalClient(conf_path, - 'Swift Container Reconciler', - request_tries) + self.swift = InternalClient( + conf_path, + 'Swift Container Reconciler', + request_tries, + use_replication_network=True) self.stats = defaultdict(int) self.last_stat_time = time.time() diff --git a/swift/container/sharder.py b/swift/container/sharder.py index dd33043ae4..2c70a1b27f 100644 --- a/swift/container/sharder.py +++ b/swift/container/sharder.py @@ -29,6 +29,7 @@ from swift.common.constraints import check_drive, AUTO_CREATE_ACCOUNT_PREFIX from swift.common.direct_client import (direct_put_container, DirectClientException) from swift.common.exceptions import DeviceUnavailable +from swift.common.request_helpers import USE_REPLICATION_NETWORK_HEADER from swift.common.ring.utils import is_local_device from swift.common.swob import str_to_wsgi from swift.common.utils import get_logger, config_true_value, \ @@ -409,7 +410,8 @@ class ContainerSharder(ContainerReplicator): internal_client_conf_path, 'Swift Container Sharder', request_tries, - allow_modify_pipeline=False) + allow_modify_pipeline=False, + use_replication_network=True) except (OSError, IOError) as err: if err.errno != errno.ENOENT and \ not str(err).endswith(' not found'): @@ -623,6 +625,7 @@ class ContainerSharder(ContainerReplicator): part, nodes = self.ring.get_nodes(account, container) headers = headers or {} headers.update({'X-Backend-Record-Type': RECORD_TYPE_SHARD, + USE_REPLICATION_NETWORK_HEADER: 'True', 'User-Agent': 'container-sharder %s' % os.getpid(), 'X-Timestamp': Timestamp.now().normal, 'Content-Length': len(body), diff --git a/swift/container/sync.py b/swift/container/sync.py index 529f85a448..34b300fb03 100644 --- a/swift/container/sync.py +++ b/swift/container/sync.py @@ -241,7 +241,8 @@ class ContainerSync(Daemon): internal_client_conf = internal_client_conf_path try: self.swift = InternalClient( - internal_client_conf, 'Swift Container Sync', request_tries) + internal_client_conf, 'Swift Container Sync', request_tries, + use_replication_network=True) except (OSError, IOError) as err: if err.errno != errno.ENOENT and \ not str(err).endswith(' not found'): diff --git a/swift/obj/expirer.py b/swift/obj/expirer.py index 1aef5fc2ff..b041f99c0a 100644 --- a/swift/obj/expirer.py +++ b/swift/obj/expirer.py @@ -134,7 +134,8 @@ class ObjectExpirer(Daemon): request_tries = int(self.conf.get('request_tries') or 3) self.swift = swift or InternalClient( - self.ic_conf_path, 'Swift Object Expirer', request_tries) + self.ic_conf_path, 'Swift Object Expirer', request_tries, + use_replication_network=True) self.processes = int(self.conf.get('processes', 0)) self.process = int(self.conf.get('process', 0)) diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index 95ff8e7653..2912da3ea6 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -63,7 +63,7 @@ from swift.common.swob import Request, Response, Range, \ from swift.common.request_helpers import strip_sys_meta_prefix, \ strip_user_meta_prefix, is_user_meta, is_sys_meta, is_sys_or_user_meta, \ http_response_to_document_iters, is_object_transient_sysmeta, \ - strip_object_transient_sysmeta_prefix + strip_object_transient_sysmeta_prefix, get_ip_port from swift.common.storage_policy import POLICIES @@ -1264,11 +1264,13 @@ class ResumingGetter(object): # a request may be specialised with specific backend headers if self.header_provider: req_headers.update(self.header_provider()) + + ip, port = get_ip_port(node, req_headers) start_node_timing = time.time() try: with ConnectionTimeout(self.app.conn_timeout): conn = http_connect( - node['ip'], node['port'], node['device'], + ip, port, node['device'], self.partition, self.req_method, self.path, headers=req_headers, query_string=self.req_query_string) @@ -1766,11 +1768,12 @@ class Controller(object): headers['Content-Length'] = str(len(body)) for node in nodes: try: + ip, port = get_ip_port(node, headers) start_node_timing = time.time() with ConnectionTimeout(self.app.conn_timeout): - conn = http_connect(node['ip'], node['port'], - node['device'], part, method, path, - headers=headers, query_string=query) + conn = http_connect( + ip, port, node['device'], part, method, path, + headers=headers, query_string=query) conn.node = node self.app.set_node_timing(node, time.time() - start_node_timing) if body: diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index 058c445f10..ceab334463 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -73,7 +73,7 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \ HTTPRequestedRangeNotSatisfiable, Range, HTTPInternalServerError, \ normalize_etag from swift.common.request_helpers import update_etag_is_at_header, \ - resolve_etag_is_at_header, validate_internal_obj + resolve_etag_is_at_header, validate_internal_obj, get_ip_port def check_content_type(req): @@ -1683,9 +1683,10 @@ class Putter(object): @classmethod def _make_connection(cls, node, part, path, headers, conn_timeout, node_timeout): + ip, port = get_ip_port(node, headers) start_time = time.time() with ConnectionTimeout(conn_timeout): - conn = http_connect(node['ip'], node['port'], node['device'], + conn = http_connect(ip, port, node['device'], part, 'PUT', path, headers) connect_duration = time.time() - start_time diff --git a/test/unit/__init__.py b/test/unit/__init__.py index e019e3058a..adb6663903 100644 --- a/test/unit/__init__.py +++ b/test/unit/__init__.py @@ -215,12 +215,13 @@ class PatchPolicies(object): class FakeRing(Ring): def __init__(self, replicas=3, max_more_nodes=0, part_power=0, - base_port=1000): + base_port=1000, separate_replication=False): self.serialized_path = '/foo/bar/object.ring.gz' self._base_port = base_port self.max_more_nodes = max_more_nodes self._part_shift = 32 - part_power self._init_device_char() + self.separate_replication = separate_replication # 9 total nodes (6 more past the initial 3) is the cap, no matter if # this is set higher, or R^2 for R replicas self.set_replicas(replicas) @@ -256,11 +257,16 @@ class FakeRing(Ring): for x in range(self.replicas): ip = '10.0.0.%s' % x port = self._base_port + x + if self.separate_replication: + repl_ip = '10.0.1.%s' % x + repl_port = port + 100 + else: + repl_ip, repl_port = ip, port dev = { 'ip': ip, - 'replication_ip': ip, + 'replication_ip': repl_ip, 'port': port, - 'replication_port': port, + 'replication_port': repl_port, 'device': self.device_char, 'zone': x % 3, 'region': x % 2, @@ -278,10 +284,17 @@ class FakeRing(Ring): def get_more_nodes(self, part): index_counter = itertools.count() for x in range(self.replicas, (self.replicas + self.max_more_nodes)): - yield {'ip': '10.0.0.%s' % x, - 'replication_ip': '10.0.0.%s' % x, - 'port': self._base_port + x, - 'replication_port': self._base_port + x, + ip = '10.0.0.%s' % x + port = self._base_port + x + if self.separate_replication: + repl_ip = '10.0.1.%s' % x + repl_port = port + 100 + else: + repl_ip, repl_port = ip, port + yield {'ip': ip, + 'replication_ip': repl_ip, + 'port': port, + 'replication_port': repl_port, 'device': 'sda', 'zone': x % 3, 'region': x % 2, @@ -1265,12 +1278,11 @@ def fake_ec_node_response(node_frags, policy): call_count = {} # maps node index to get_response call count for node def _build_node_map(req, policy): - node_key = lambda n: (n['ip'], n['port']) part = utils.split_path(req['path'], 5, 5, True)[1] all_nodes.extend(policy.object_ring.get_part_nodes(part)) all_nodes.extend(policy.object_ring.get_more_nodes(part)) for i, node in enumerate(all_nodes): - node_map[node_key(node)] = i + node_map[(node['ip'], node['port'])] = i call_count[i] = 0 # normalize node_frags to a list of fragments for each node even diff --git a/test/unit/account/test_reaper.py b/test/unit/account/test_reaper.py index d8b76eb9b2..60d0d28b2f 100644 --- a/test/unit/account/test_reaper.py +++ b/test/unit/account/test_reaper.py @@ -322,7 +322,8 @@ class TestReaper(unittest.TestCase): 'X-Container-Partition': 'partition', 'X-Container-Device': device, 'X-Backend-Storage-Policy-Index': policy.idx, - 'X-Timestamp': '1429117638.86767' + 'X-Timestamp': '1429117638.86767', + 'x-backend-use-replication-network': 'true', } ring = r.get_object_ring(policy.idx) expected = call(dict(ring.devs[i], index=i), 0, @@ -442,7 +443,8 @@ class TestReaper(unittest.TestCase): 'X-Account-Partition': 'partition', 'X-Account-Device': device, 'X-Account-Override-Deleted': 'yes', - 'X-Timestamp': '1429117639.67676' + 'X-Timestamp': '1429117639.67676', + 'x-backend-use-replication-network': 'true', } ring = r.get_object_ring(policy.idx) expected = call(dict(ring.devs[i], index=i), 0, 'a', 'c', diff --git a/test/unit/common/test_direct_client.py b/test/unit/common/test_direct_client.py index f15aab2883..faf2887cf6 100644 --- a/test/unit/common/test_direct_client.py +++ b/test/unit/common/test_direct_client.py @@ -316,6 +316,31 @@ class TestDirectClient(unittest.TestCase): self.assertIn('X-Timestamp', headers) self.assertIn('User-Agent', headers) + def test_direct_delete_account_replication_net(self): + part = '0' + account = 'a' + + mock_path = 'swift.common.bufferedhttp.http_connect_raw' + with mock.patch(mock_path) as fake_connect: + fake_connect.return_value.getresponse.return_value.status = 200 + direct_client.direct_delete_account( + self.node, part, account, + headers={'X-Backend-Use-Replication-Network': 't'}) + args, kwargs = fake_connect.call_args + ip = args[0] + self.assertEqual(self.node['replication_ip'], ip) + self.assertNotEqual(self.node['ip'], ip) + port = args[1] + self.assertEqual(self.node['replication_port'], port) + self.assertNotEqual(self.node['port'], port) + method = args[2] + self.assertEqual('DELETE', method) + path = args[3] + self.assertEqual('/sda/0/a', path) + headers = args[4] + self.assertIn('X-Timestamp', headers) + self.assertIn('User-Agent', headers) + def test_direct_delete_account_failure(self): part = '0' account = 'a' @@ -346,6 +371,24 @@ class TestDirectClient(unittest.TestCase): self.user_agent) self.assertEqual(headers, resp) + def test_direct_head_container_replication_net(self): + headers = HeaderKeyDict(key='value') + + with mocked_http_conn(200, headers) as conn: + resp = direct_client.direct_head_container( + self.node, self.part, self.account, self.container, + headers={'X-Backend-Use-Replication-Network': 'on'}) + self.assertEqual(conn.host, self.node['replication_ip']) + self.assertEqual(conn.port, self.node['replication_port']) + self.assertNotEqual(conn.host, self.node['ip']) + self.assertNotEqual(conn.port, self.node['port']) + self.assertEqual(conn.method, 'HEAD') + self.assertEqual(conn.path, self.container_path) + + self.assertEqual(conn.req_headers['user-agent'], + self.user_agent) + self.assertEqual(headers, resp) + def test_direct_head_container_error(self): headers = HeaderKeyDict(key='value') @@ -441,6 +484,18 @@ class TestDirectClient(unittest.TestCase): self.assertEqual(conn.method, 'DELETE') self.assertEqual(conn.path, self.container_path) + def test_direct_delete_container_replication_net(self): + with mocked_http_conn(200) as conn: + direct_client.direct_delete_container( + self.node, self.part, self.account, self.container, + headers={'X-Backend-Use-Replication-Network': '1'}) + self.assertEqual(conn.host, self.node['replication_ip']) + self.assertEqual(conn.port, self.node['replication_port']) + self.assertNotEqual(conn.host, self.node['ip']) + self.assertNotEqual(conn.port, self.node['port']) + self.assertEqual(conn.method, 'DELETE') + self.assertEqual(conn.path, self.container_path) + def test_direct_delete_container_with_timestamp(self): # ensure timestamp is different from any that might be auto-generated timestamp = Timestamp(time.time() - 100) diff --git a/test/unit/common/test_internal_client.py b/test/unit/common/test_internal_client.py index da055d7653..a57eae04a7 100644 --- a/test/unit/common/test_internal_client.py +++ b/test/unit/common/test_internal_client.py @@ -25,7 +25,7 @@ from textwrap import dedent import six from six.moves import range, zip_longest from six.moves.urllib.parse import quote, parse_qsl -from swift.common import exceptions, internal_client, swob +from swift.common import exceptions, internal_client, request_helpers, swob from swift.common.header_key_dict import HeaderKeyDict from swift.common.storage_policy import StoragePolicy from swift.common.middleware.proxy_logging import ProxyLoggingMiddleware @@ -303,7 +303,7 @@ class TestInternalClient(unittest.TestCase): with mock.patch.object(internal_client, 'loadapp', app.load), \ self.assertRaises(ValueError): # First try with a bad arg - client = internal_client.InternalClient( + internal_client.InternalClient( conf_path, user_agent, request_tries=0) self.assertEqual(0, app.load_called) @@ -315,6 +315,18 @@ class TestInternalClient(unittest.TestCase): self.assertEqual(app, client.app) self.assertEqual(user_agent, client.user_agent) self.assertEqual(request_tries, client.request_tries) + self.assertFalse(client.use_replication_network) + + with mock.patch.object(internal_client, 'loadapp', app.load): + client = internal_client.InternalClient( + conf_path, user_agent, request_tries, + use_replication_network=True) + + self.assertEqual(2, app.load_called) + self.assertEqual(app, client.app) + self.assertEqual(user_agent, client.user_agent) + self.assertEqual(request_tries, client.request_tries) + self.assertTrue(client.use_replication_network) def test_make_request_sets_user_agent(self): class InternalClient(internal_client.InternalClient): @@ -323,8 +335,11 @@ class TestInternalClient(unittest.TestCase): self.app = self.fake_app self.user_agent = 'some_agent' self.request_tries = 1 + self.use_replication_network = False def fake_app(self, env, start_response): + self.test.assertNotIn( + 'HTTP_X_BACKEND_USE_REPLICATION_NETWORK', env) self.test.assertEqual(self.user_agent, env['HTTP_USER_AGENT']) start_response('200 Ok', [('Content-Length', '0')]) return [] @@ -332,6 +347,47 @@ class TestInternalClient(unittest.TestCase): client = InternalClient(self) client.make_request('GET', '/', {}, (200,)) + def test_make_request_defaults_replication_network_header(self): + class InternalClient(internal_client.InternalClient): + def __init__(self, test): + self.test = test + self.app = self.fake_app + self.user_agent = 'some_agent' + self.request_tries = 1 + self.use_replication_network = False + self.expected_header_value = None + + def fake_app(self, env, start_response): + if self.expected_header_value is None: + self.test.assertNotIn( + 'HTTP_X_BACKEND_USE_REPLICATION_NETWORK', env) + else: + hdr_val = env['HTTP_X_BACKEND_USE_REPLICATION_NETWORK'] + self.test.assertEqual(self.expected_header_value, hdr_val) + start_response('200 Ok', [('Content-Length', '0')]) + return [] + + client = InternalClient(self) + client.make_request('GET', '/', {}, (200,)) + # Caller can still override + client.expected_header_value = 'false' + client.make_request('GET', '/', { + request_helpers.USE_REPLICATION_NETWORK_HEADER: 'false'}, (200,)) + client.expected_header_value = 'true' + client.make_request('GET', '/', { + request_helpers.USE_REPLICATION_NETWORK_HEADER: 'true'}, (200,)) + + # Switch default behavior + client.use_replication_network = True + + client.make_request('GET', '/', {}, (200,)) + client.expected_header_value = 'false' + client.make_request('GET', '/', { + request_helpers.USE_REPLICATION_NETWORK_HEADER: 'false'}, (200,)) + client.expected_header_value = 'on' + client.make_request('GET', '/', { + request_helpers.USE_REPLICATION_NETWORK_HEADER: 'on'}, (200,)) + def test_make_request_sets_query_string(self): captured_envs = [] @@ -341,6 +397,7 @@ class TestInternalClient(unittest.TestCase): self.app = self.fake_app self.user_agent = 'some_agent' self.request_tries = 1 + self.use_replication_network = False def fake_app(self, env, start_response): captured_envs.append(env) @@ -362,6 +419,7 @@ class TestInternalClient(unittest.TestCase): self.app = self.fake_app self.user_agent = 'some_agent' self.request_tries = 4 + self.use_replication_network = False self.tries = 0 self.sleep_called = 0 @@ -441,6 +499,7 @@ class TestInternalClient(unittest.TestCase): self.app = self.fake_app self.user_agent = 'some_agent' self.request_tries = 3 + self.use_replication_network = False self.env = None def fake_app(self, env, start_response): @@ -468,6 +527,7 @@ class TestInternalClient(unittest.TestCase): self.fake_app, {}, self.logger) self.user_agent = 'some_agent' self.request_tries = 3 + self.use_replication_network = False def fake_app(self, env, start_response): body = b'fake error response' @@ -499,6 +559,7 @@ class TestInternalClient(unittest.TestCase): self.user_agent = 'some_agent' self.resp_status = resp_status self.request_tries = 3 + self.use_replication_network = False self.closed_paths = [] self.fully_read_paths = [] @@ -557,6 +618,7 @@ class TestInternalClient(unittest.TestCase): self.app = self.fake_app self.user_agent = 'some_agent' self.request_tries = 3 + self.use_replication_network = False def fake_app(self, env, start_response): start_response('200 Ok', [('Content-Length', '0')]) @@ -607,6 +669,7 @@ class TestInternalClient(unittest.TestCase): self.app = self.fake_app self.user_agent = 'some_agent' self.request_tries = 3 + self.use_replication_network = False self.status = status self.call_count = 0 @@ -698,6 +761,7 @@ class TestInternalClient(unittest.TestCase): def __init__(self): self.user_agent = 'test' self.request_tries = 1 + self.use_replication_network = False self.app = self.fake_app def fake_app(self, environ, start_response): @@ -1217,6 +1281,7 @@ class TestInternalClient(unittest.TestCase): self.app = self.fake_app self.user_agent = 'some_agent' self.request_tries = 3 + self.use_replication_network = False def fake_app(self, env, start_response): self.req_env = env @@ -1261,6 +1326,7 @@ class TestInternalClient(unittest.TestCase): self.app = self.fake_app self.user_agent = 'some_agent' self.request_tries = 3 + self.use_replication_network = False def fake_app(self, env, start_response): start_response('200 Ok', [('Content-Length', '0')]) @@ -1280,6 +1346,7 @@ class TestInternalClient(unittest.TestCase): self.app = self.fake_app self.user_agent = 'some_agent' self.request_tries = 3 + self.use_replication_network = False def fake_app(self, env, start_response): start_response('200 Ok', [('Content-Length', '0')]) @@ -1300,6 +1367,7 @@ class TestInternalClient(unittest.TestCase): self.app = self.fake_app self.user_agent = 'some_agent' self.request_tries = 3 + self.use_replication_network = False def fake_app(self, env, start_response): start_response('404 Not Found', []) @@ -1330,6 +1398,7 @@ class TestInternalClient(unittest.TestCase): class InternalClient(internal_client.InternalClient): def __init__(self, test, path, headers, fobj): self.test = test + self.use_replication_network = False self.path = path self.headers = headers self.fobj = fobj diff --git a/test/unit/common/test_request_helpers.py b/test/unit/common/test_request_helpers.py index 70e75d53a0..14bdd04d2a 100644 --- a/test/unit/common/test_request_helpers.py +++ b/test/unit/common/test_request_helpers.py @@ -124,6 +124,19 @@ class TestRequestHelpers(unittest.TestCase): self.assertFalse('c' in to_req.headers) self.assertFalse('C' in to_req.headers) + def test_get_ip_port(self): + node = { + 'ip': '1.2.3.4', + 'port': 6000, + 'replication_ip': '5.6.7.8', + 'replication_port': 7000, + } + self.assertEqual(('1.2.3.4', 6000), rh.get_ip_port(node, {})) + self.assertEqual(('5.6.7.8', 7000), rh.get_ip_port(node, { + rh.USE_REPLICATION_NETWORK_HEADER: 'true'})) + self.assertEqual(('1.2.3.4', 6000), rh.get_ip_port(node, { + rh.USE_REPLICATION_NETWORK_HEADER: 'false'})) + @patch_policies(with_ec_default=True) def test_get_name_and_placement_object_req(self): path = '/device/part/account/container/object' diff --git a/test/unit/container/test_reconciler.py b/test/unit/container/test_reconciler.py index fc227b4b87..5fc26edeff 100644 --- a/test/unit/container/test_reconciler.py +++ b/test/unit/container/test_reconciler.py @@ -95,6 +95,7 @@ class FakeInternalClient(reconciler.InternalClient): self.app = FakeStoragePolicySwift() self.user_agent = 'fake-internal-client' self.request_tries = 1 + self.use_replication_network = True self.parse(listings) def parse(self, listings): diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py index a54ddb652b..36e0795234 100644 --- a/test/unit/container/test_sharder.py +++ b/test/unit/container/test_sharder.py @@ -171,7 +171,8 @@ class TestSharder(BaseTestSharder): 'container-sharder', sharder.logger.logger.name) mock_ic.assert_called_once_with( '/etc/swift/internal-client.conf', 'Swift Container Sharder', 3, - allow_modify_pipeline=False) + allow_modify_pipeline=False, + use_replication_network=True) conf = { 'mount_check': False, 'bind_ip': '10.11.12.13', 'bind_port': 62010, @@ -221,7 +222,8 @@ class TestSharder(BaseTestSharder): sharder, mock_ic = do_test(conf, expected) mock_ic.assert_called_once_with( '/etc/swift/my-sharder-ic.conf', 'Swift Container Sharder', 2, - allow_modify_pipeline=False) + allow_modify_pipeline=False, + use_replication_network=True) self.assertEqual(self.logger.get_lines_for_level('warning'), [ 'Option auto_create_account_prefix is deprecated. ' 'Configure auto_create_account_prefix under the ' @@ -731,11 +733,12 @@ class TestSharder(BaseTestSharder): self.logger.clear() conf = conf or {} conf['devices'] = self.tempdir + fake_ring = FakeRing(replicas=replicas, separate_replication=True) with mock.patch( 'swift.container.sharder.internal_client.InternalClient'): with mock.patch( 'swift.common.db_replicator.ring.Ring', - lambda *args, **kwargs: FakeRing(replicas=replicas)): + return_value=fake_ring): sharder = ContainerSharder(conf, logger=self.logger) sharder._local_device_ids = {0, 1, 2} sharder._replicate_object = mock.MagicMock( @@ -4185,20 +4188,31 @@ class TestSharder(BaseTestSharder): def check_shard_ranges_sent(self, broker, expected_sent): bodies = [] + servers = [] def capture_send(conn, data): bodies.append(data) + def capture_connect(host, port, *a, **kw): + servers.append((host, port)) + self.assertFalse(broker.get_own_shard_range().reported) # sanity with self._mock_sharder() as sharder: with mocked_http_conn(204, 204, 204, - give_send=capture_send) as mock_conn: + give_send=capture_send, + give_connect=capture_connect) as mock_conn: sharder._update_root_container(broker) for req in mock_conn.requests: self.assertEqual('PUT', req['method']) self.assertEqual([expected_sent] * 3, [json.loads(b) for b in bodies]) + self.assertEqual(servers, [ + # NB: replication interfaces + ('10.0.1.0', 1100), + ('10.0.1.1', 1101), + ('10.0.1.2', 1102), + ]) self.assertTrue(broker.get_own_shard_range().reported) def test_update_root_container_own_range(self): diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index 8ccdfbdffc..e9cebfe930 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -318,7 +318,7 @@ class TestController(unittest.TestCase): self.controller.account_info(self.account, self.request) set_http_connect(201, raise_timeout_exc=True) self.controller._make_request( - nodes, partition, 'POST', '/', '', '', None, + nodes, partition, 'POST', '/', {}, '', None, self.controller.app.logger.thread_locals) # tests if 200 is cached and used