Merge "Enable per policy proxy config options"

This commit is contained in:
Jenkins 2017-05-25 06:34:48 +00:00 committed by Gerrit Code Review
commit 263dc8a3f3
11 changed files with 1150 additions and 117 deletions

View File

@ -1751,7 +1751,9 @@ sorting_method shuffle Storage nodes can be chosen at
control. In both the timing and
affinity cases, equally-sorting nodes
are still randomly chosen to spread
load.
load. This option may be overridden
in a per-policy configuration
section.
timing_expiry 300 If the "timing" sorting_method is
used, the timings will only be valid
for the number of seconds configured
@ -1809,14 +1811,18 @@ read_affinity None Specifies which backend servers t
be given to the selection; lower
numbers are higher priority.
Default is empty, meaning no
preference.
preference. This option may be
overridden in a per-policy
configuration section.
write_affinity None Specifies which backend servers to
prefer on writes. Format is a comma
separated list of affinity
descriptors of the form r<N> for
region N or r<N>z<M> for region N,
zone M. Default is empty, meaning no
preference.
preference. This option may be
overridden in a per-policy
configuration section.
write_affinity_node_count 2 * replicas The number of local (as governed by
the write_affinity setting) nodes to
attempt to contact first on writes,
@ -1825,9 +1831,99 @@ write_affinity_node_count 2 * replicas The number of local (as governed
'* replicas' at the end to have it
use the number given times the number
of replicas for the ring being used
for the request.
for the request. This option may be
overridden in a per-policy
configuration section.
============================ =============== =====================================
Per policy configuration
^^^^^^^^^^^^^^^^^^^^^^^^
Some proxy-server configuration options may be overridden on a per-policy
basis by including per-policy config section(s). These options are:
- sorting_method
- read_affinity
- write_affinity
- write_affinity_node_count
The per-policy config section name must be of the form::
[proxy-server:policy:<policy index>]
.. note::
The per-policy config section name should refer to the policy index, not
the policy name.
.. note::
The first part of proxy-server config section name must match the name of
the proxy-server config section. This is typically ``proxy-server`` as
shown above, but if different then the names of any per-policy config
sections must be changed accordingly.
The value of an option specified in a per-policy section will override any
value given in the proxy-server section for that policy only. Otherwise the
value of these options will be that specified in the proxy-server section.
For example, the following section provides policy-specific options for a
policy with index 3::
[proxy-server:policy:3]
sorting_method = affinity
read_affinity = r2=1
write_affinity = r2
write_affinity_node_count = 1 * replicas
.. note::
It is recommended that per-policy config options are *not* included in the
``[DEFAULT]`` section. If they are then the following behavior applies.
Per-policy config sections will inherit options in the DEFAULT section of
the config file, and any such inheritance will take precedence over
inheriting options from the proxy-server config section.
Per-policy config section options will override options in the
``[DEFAULT]`` section. Unlike the behavior described under `General Server
Configuration`_ for paste-deploy ``filter`` and ``app`` sections, the
``set`` keyword is not required for options to override in per-policy
config sections.
For example, given the following settings in a config file::
[DEFAULT]
sorting_method = affinity
read_affinity = r0=100
write_affinity = r0
[app:proxy-server]
use = egg:swift#proxy
# use of set keyword here overrides [DEFAULT] option
set read_affinity = r1=100
# without set keyword, [DEFAULT] option overrides in a paste-deploy section
write_affinity = r1
[proxy-server:policy:0]
sorting_method = affinity
# set keyword not required here to override [DEFAULT] option
write_affinity = r1
would result in policy with index ``0`` having settings:
* ``read_affinity = r0=100`` (inherited from the ``[DEFAULT]`` section)
* ``write_affinity = r1`` (specified in the policy 0 section)
and any other policy would have the default settings of:
* ``read_affinity = r1=100`` (set in the proxy-server section)
* ``write_affinity = r0`` (inherited from the ``[DEFAULT]`` section)
Tempauth
^^^^^^^^
[tempauth]
===================== =============================== =======================

View File

@ -174,6 +174,7 @@ use = egg:swift#proxy
# affinity cases, equally-sorting nodes are still randomly chosen to
# spread load.
# The valid values for sorting_method are "affinity", "shuffle", or "timing".
# This option may be overridden in a per-policy configuration section.
# sorting_method = shuffle
#
# If the "timing" sorting_method is used, the timings will only be valid for
@ -211,6 +212,7 @@ use = egg:swift#proxy
# anything in region 2, then everything else:
# read_affinity = r1z1=100, r1z2=200, r2=300
# Default is empty, meaning no preference.
# This option may be overridden in a per-policy configuration section.
# read_affinity =
#
# Specifies which backend servers to prefer on writes. Format is a comma
@ -223,6 +225,7 @@ use = egg:swift#proxy
# nodes:
# write_affinity = r1, r2
# Default is empty, meaning no preference.
# This option may be overridden in a per-policy configuration section.
# write_affinity =
#
# The number of local (as governed by the write_affinity setting) nodes to
@ -230,6 +233,7 @@ use = egg:swift#proxy
# should be an integer number, or use '* replicas' at the end to have it use
# the number given times the number of replicas for the ring being used for the
# request.
# This option may be overridden in a per-policy configuration section.
# write_affinity_node_count = 2 * replicas
#
# These are the headers whose values will only be shown to swift_owners. The
@ -249,6 +253,18 @@ use = egg:swift#proxy
# ionice_class =
# ionice_priority =
# Some proxy-server configuration options may be overridden on a per-policy
# basis by including per-policy config section(s). The value of any option
# specified a per-policy section will override any value given in the
# proxy-server section for that policy only. Otherwise the value of these
# options will be that specified in the proxy-server section.
# The section name should refer to the policy index, not the policy name.
# [proxy-server:policy:<policy index>]
# sorting_method =
# read_affinity =
# write_affinity =
# write_affinity_node_count =
[filter:tempauth]
use = egg:swift#tempauth
# You can override the default log routing for this filter here:

View File

@ -2443,6 +2443,8 @@ def readconf(conf_path, section_name=None, log_name=None, defaults=None,
else:
c = ConfigParser(defaults)
if hasattr(conf_path, 'readline'):
if hasattr(conf_path, 'seek'):
conf_path.seek(0)
c.readfp(conf_path)
else:
if os.path.isdir(conf_path):

View File

@ -65,6 +65,7 @@ class NamedConfigLoader(loadwsgi.ConfigLoader):
context = super(NamedConfigLoader, self).get_context(
object_type, name=name, global_conf=global_conf)
context.name = name
context.local_conf['__name__'] = name
return context
@ -114,7 +115,7 @@ class ConfigString(NamedConfigLoader):
self.filename = "string"
defaults = {
'here': "string",
'__file__': "string",
'__file__': StringIO(dedent(config_string)),
}
self.parser = loadwsgi.NicerConfigParser("string", defaults=defaults)
self.parser.optionxform = str # Don't lower-case keys

View File

@ -1306,9 +1306,11 @@ class NodeIter(object):
:param partition: ring partition to yield nodes for
:param node_iter: optional iterable of nodes to try. Useful if you
want to filter or reorder the nodes.
:param policy: an instance of :class:`BaseStoragePolicy`. This should be
None for an account or container ring.
"""
def __init__(self, app, ring, partition, node_iter=None):
def __init__(self, app, ring, partition, node_iter=None, policy=None):
self.app = app
self.ring = ring
self.partition = partition
@ -1324,7 +1326,8 @@ class NodeIter(object):
# Use of list() here forcibly yanks the first N nodes (the primary
# nodes) from node_iter, so the rest of its values are handoffs.
self.primary_nodes = self.app.sort_nodes(
list(itertools.islice(node_iter, num_primary_nodes)))
list(itertools.islice(node_iter, num_primary_nodes)),
policy=policy)
self.handoff_iter = node_iter
self._node_provider = None

View File

@ -129,7 +129,7 @@ class BaseObjectController(Controller):
self.container_name = unquote(container_name)
self.object_name = unquote(object_name)
def iter_nodes_local_first(self, ring, partition):
def iter_nodes_local_first(self, ring, partition, policy=None):
"""
Yields nodes for a ring partition.
@ -143,13 +143,13 @@ class BaseObjectController(Controller):
:param ring: ring to get nodes from
:param partition: ring partition to yield nodes for
"""
is_local = self.app.write_affinity_is_local_fn
policy_conf = self.app.get_policy_options(policy)
is_local = policy_conf.write_affinity_is_local_fn
if is_local is None:
return self.app.iter_nodes(ring, partition)
return self.app.iter_nodes(ring, partition, policy=policy)
primary_nodes = ring.get_part_nodes(partition)
num_locals = self.app.write_affinity_node_count(len(primary_nodes))
num_locals = policy_conf.write_affinity_node_count(len(primary_nodes))
all_nodes = itertools.chain(primary_nodes,
ring.get_more_nodes(partition))
@ -165,7 +165,7 @@ class BaseObjectController(Controller):
all_nodes))
return self.app.iter_nodes(
ring, partition, node_iter=local_first_node_iter)
ring, partition, node_iter=local_first_node_iter, policy=policy)
def GETorHEAD(self, req):
"""Handle HTTP GET or HEAD requests."""
@ -184,7 +184,7 @@ class BaseObjectController(Controller):
return aresp
partition = obj_ring.get_part(
self.account_name, self.container_name, self.object_name)
node_iter = self.app.iter_nodes(obj_ring, partition)
node_iter = self.app.iter_nodes(obj_ring, partition, policy=policy)
resp = self._get_or_head_response(req, node_iter, partition, policy)
@ -541,7 +541,7 @@ class BaseObjectController(Controller):
"""
obj_ring = policy.object_ring
node_iter = GreenthreadSafeIterator(
self.iter_nodes_local_first(obj_ring, partition))
self.iter_nodes_local_first(obj_ring, partition, policy=policy))
pile = GreenPile(len(nodes))
for nheaders in outgoing_headers:

View File

@ -16,6 +16,9 @@
import mimetypes
import os
import socket
from collections import defaultdict
from swift import gettext_ as _
from random import shuffle
from time import time
@ -32,7 +35,7 @@ from swift.common.ring import Ring
from swift.common.utils import cache_from_env, get_logger, \
get_remote_client, split_path, config_true_value, generate_trans_id, \
affinity_key_function, affinity_locality_predicate, list_from_csv, \
register_swift_info
register_swift_info, readconf
from swift.common.constraints import check_utf8, valid_api_version
from swift.proxy.controllers import AccountController, ContainerController, \
ObjectControllerRouter, InfoController
@ -76,6 +79,67 @@ required_filters = [
'catch_errors', 'gatekeeper', 'proxy_logging']}]
def _label_for_policy(policy):
if policy is not None:
return 'policy %s (%s)' % (policy.idx, policy.name)
return '(default)'
class OverrideConf(object):
"""
Encapsulates proxy server properties that may be overridden e.g. for
policy specific configurations.
:param conf: the proxy-server config dict.
:param override_conf: a dict of overriding configuration options.
"""
def __init__(self, base_conf, override_conf):
self.conf = base_conf
self.override_conf = override_conf
self.sorting_method = self._get('sorting_method', 'shuffle').lower()
self.read_affinity = self._get('read_affinity', '')
try:
self.read_affinity_sort_key = affinity_key_function(
self.read_affinity)
except ValueError as err:
# make the message a little more useful
raise ValueError("Invalid read_affinity value: %r (%s)" %
(self.read_affinity, err.message))
self.write_affinity = self._get('write_affinity', '')
try:
self.write_affinity_is_local_fn \
= affinity_locality_predicate(self.write_affinity)
except ValueError as err:
# make the message a little more useful
raise ValueError("Invalid write_affinity value: %r (%s)" %
(self.write_affinity, err.message))
self.write_affinity_node_value = self._get(
'write_affinity_node_count', '2 * replicas').lower()
value = self.write_affinity_node_value.split()
if len(value) == 1:
wanc_value = int(value[0])
self.write_affinity_node_count = lambda replicas: wanc_value
elif len(value) == 3 and value[1] == '*' and value[2] == 'replicas':
wanc_value = int(value[0])
self.write_affinity_node_count = \
lambda replicas: wanc_value * replicas
else:
raise ValueError(
'Invalid write_affinity_node_count value: %r' %
(' '.join(value)))
def __repr__(self):
return ('sorting_method: %s, read_affinity: %s, write_affinity: %s, '
'write_affinity_node_count: %s' %
(self.sorting_method, self.read_affinity, self.write_affinity,
self.write_affinity_node_value))
def _get(self, key, default):
return self.override_conf.get(key, self.conf.get(key, default))
class Application(object):
"""WSGI application for the proxy server."""
@ -87,6 +151,9 @@ class Application(object):
self.logger = get_logger(conf, log_route='proxy-server')
else:
self.logger = logger
self._override_confs = self._load_per_policy_config(conf)
self.sorts_by_timing = any(pc.sorting_method == 'timing'
for pc in self._override_confs.values())
self._error_limiting = {}
@ -155,7 +222,6 @@ class Application(object):
conf.get('strict_cors_mode', 't'))
self.node_timings = {}
self.timing_expiry = int(conf.get('timing_expiry', 300))
self.sorting_method = conf.get('sorting_method', 'shuffle').lower()
self.concurrent_gets = \
config_true_value(conf.get('concurrent_gets'))
self.concurrency_timeout = float(conf.get('concurrency_timeout',
@ -170,33 +236,6 @@ class Application(object):
else:
raise ValueError(
'Invalid request_node_count value: %r' % ''.join(value))
try:
self._read_affinity = read_affinity = conf.get('read_affinity', '')
self.read_affinity_sort_key = affinity_key_function(read_affinity)
except ValueError as err:
# make the message a little more useful
raise ValueError("Invalid read_affinity value: %r (%s)" %
(read_affinity, err.message))
try:
write_affinity = conf.get('write_affinity', '')
self.write_affinity_is_local_fn \
= affinity_locality_predicate(write_affinity)
except ValueError as err:
# make the message a little more useful
raise ValueError("Invalid write_affinity value: %r (%s)" %
(write_affinity, err.message))
value = conf.get('write_affinity_node_count',
'2 * replicas').lower().split()
if len(value) == 1:
wanc_value = int(value[0])
self.write_affinity_node_count = lambda replicas: wanc_value
elif len(value) == 3 and value[1] == '*' and value[2] == 'replicas':
wanc_value = int(value[0])
self.write_affinity_node_count = \
lambda replicas: wanc_value * replicas
else:
raise ValueError(
'Invalid write_affinity_node_count value: %r' % ''.join(value))
# swift_owner_headers are stripped by the account and container
# controllers; we should extend header stripping to object controller
# when a privileged object header is implemented.
@ -235,15 +274,68 @@ class Application(object):
account_autocreate=self.account_autocreate,
**constraints.EFFECTIVE_CONSTRAINTS)
def _make_policy_override(self, policy, conf, override_conf):
label_for_policy = _label_for_policy(policy)
try:
override = OverrideConf(conf, override_conf)
self.logger.debug("Loaded override config for %s: %r" %
(label_for_policy, override))
return override
except ValueError as err:
raise ValueError(err.message + ' for %s' % label_for_policy)
def _load_per_policy_config(self, conf):
"""
Loads per-policy config override values from proxy server conf file.
:param conf: the proxy server local conf dict
:return: a dict mapping :class:`BaseStoragePolicy` to an instance of
:class:`OverrideConf` that has policy specific config attributes
"""
# the default conf will be used when looking up a policy that had no
# override conf
default_conf = self._make_policy_override(None, conf, {})
override_confs = defaultdict(lambda: default_conf)
# force None key to be set in the defaultdict so that it is found when
# iterating over items in check_config
override_confs[None] = default_conf
for index, override_conf in conf.get('policy_config', {}).items():
try:
index = int(index)
except ValueError:
# require policies to be referenced by index; using index *or*
# name isn't possible because names such as "3" are allowed
raise ValueError(
'Override config must refer to policy index: %r' % index)
try:
policy = POLICIES[index]
except KeyError:
raise ValueError(
"No policy found for override config, index: %s" % index)
override = self._make_policy_override(policy, conf, override_conf)
override_confs[policy] = override
return override_confs
def get_policy_options(self, policy):
"""
Return policy specific options.
:param policy: an instance of :class:`BaseStoragePolicy`
:return: an instance of :class:`OverrideConf`
"""
return self._override_confs[policy]
def check_config(self):
"""
Check the configuration for possible errors
"""
if self._read_affinity and self.sorting_method != 'affinity':
self.logger.warning(
_("sorting_method is set to '%s', not 'affinity'; "
"read_affinity setting will have no effect."),
self.sorting_method)
for policy, conf in self._override_confs.items():
if conf.read_affinity and conf.sorting_method != 'affinity':
self.logger.warning(
_("sorting_method is set to '%(method)s', not 'affinity'; "
"%(label)s read_affinity setting will have no effect."),
{'label': _label_for_policy(policy),
'method': conf.sorting_method})
def get_object_ring(self, policy_idx):
"""
@ -425,30 +517,34 @@ class Application(object):
self.logger.exception(_('ERROR Unhandled exception in request'))
return HTTPServerError(request=req)
def sort_nodes(self, nodes):
'''
def sort_nodes(self, nodes, policy=None):
"""
Sorts nodes in-place (and returns the sorted list) according to
the configured strategy. The default "sorting" is to randomly
shuffle the nodes. If the "timing" strategy is chosen, the nodes
are sorted according to the stored timing data.
'''
:param nodes: a list of nodes
:param policy: an instance of :class:`BaseStoragePolicy`
"""
# In the case of timing sorting, shuffling ensures that close timings
# (ie within the rounding resolution) won't prefer one over another.
# Python's sort is stable (http://wiki.python.org/moin/HowTo/Sorting/)
shuffle(nodes)
if self.sorting_method == 'timing':
policy_conf = self.get_policy_options(policy)
if policy_conf.sorting_method == 'timing':
now = time()
def key_func(node):
timing, expires = self.node_timings.get(node['ip'], (-1.0, 0))
return timing if expires > now else -1.0
nodes.sort(key=key_func)
elif self.sorting_method == 'affinity':
nodes.sort(key=self.read_affinity_sort_key)
elif policy_conf.sorting_method == 'affinity':
nodes.sort(key=policy_conf.read_affinity_sort_key)
return nodes
def set_node_timing(self, node, timing):
if self.sorting_method != 'timing':
if not self.sorts_by_timing:
return
now = time()
timing = round(timing, 3) # sort timings to the millisecond
@ -516,8 +612,9 @@ class Application(object):
{'msg': msg.decode('utf-8'), 'ip': node['ip'],
'port': node['port'], 'device': node['device']})
def iter_nodes(self, ring, partition, node_iter=None):
return NodeIter(self, ring, partition, node_iter=node_iter)
def iter_nodes(self, ring, partition, node_iter=None, policy=None):
return NodeIter(self, ring, partition, node_iter=node_iter,
policy=policy)
def exception_occurred(self, node, typ, additional_info,
**kwargs):
@ -575,10 +672,42 @@ class Application(object):
self.logger.debug(_("Pipeline is \"%s\""), pipe)
def parse_per_policy_config(conf):
"""
Search the config file for any per-policy config sections and load those
sections to a dict mapping policy reference (name or index) to policy
options.
:param conf: the proxy server conf dict
:return: a dict mapping policy reference -> dict of policy options
:raises ValueError: if a policy config section has an invalid name
"""
policy_config = {}
try:
all_conf = readconf(conf['__file__'])
except KeyError:
get_logger(conf).warning(
"Unable to load policy specific configuration options: "
"cannot access proxy server conf file")
return policy_config
policy_section_prefix = conf['__name__'] + ':policy:'
for section, options in all_conf.items():
if not section.startswith(policy_section_prefix):
continue
policy_ref = section[len(policy_section_prefix):]
policy_config[policy_ref] = options
return policy_config
def app_factory(global_conf, **local_conf):
"""paste.deploy app factory for creating WSGI proxy apps."""
conf = global_conf.copy()
conf.update(local_conf)
# Do this here so that the use of conf['__file__'] and conf['__name__'] is
# isolated from the Application. This also simplifies tests that construct
# an Application instance directly.
conf['policy_config'] = parse_per_policy_config(conf)
app = Application(conf)
app.check_config()
return app

View File

@ -169,6 +169,7 @@ class TestWSGI(unittest.TestCase):
'here': os.path.dirname(conf_file),
'conn_timeout': '0.2',
'swift_dir': t,
'__name__': 'proxy-server'
}
self.assertEqual(expected, conf)
# logger works
@ -234,6 +235,7 @@ class TestWSGI(unittest.TestCase):
'here': conf_dir,
'conn_timeout': '0.2',
'swift_dir': conf_root,
'__name__': 'proxy-server'
}
self.assertEqual(expected, conf)
# logger works
@ -571,7 +573,7 @@ class TestWSGI(unittest.TestCase):
expected = {
'__file__': os.path.join(path, 'server.conf.d'),
'here': os.path.join(path, 'server.conf.d'),
'port': '8080',
'port': '8080', '__name__': 'main'
}
self.assertEqual(conf, expected)

View File

@ -180,7 +180,7 @@ class TestContainerController(TestRingBase):
self.assertNotEqual(context['headers']['x-timestamp'], '1.0')
def test_node_errors(self):
self.app.sort_nodes = lambda n: n
self.app.sort_nodes = lambda n, *args, **kwargs: n
for method in ('PUT', 'DELETE', 'POST'):
def test_status_map(statuses, expected):

View File

@ -195,11 +195,12 @@ class BaseObjectControllerMixin(object):
def test_iter_nodes_local_first_noops_when_no_affinity(self):
# this test needs a stable node order - most don't
self.app.sort_nodes = lambda l: l
self.app.sort_nodes = lambda l, *args, **kwargs: l
controller = self.controller_cls(
self.app, 'a', 'c', 'o')
self.app.write_affinity_is_local_fn = None
object_ring = self.policy.object_ring
policy = self.policy
self.app.get_policy_options(policy).write_affinity_is_local_fn = None
object_ring = policy.object_ring
all_nodes = object_ring.get_part_nodes(1)
all_nodes.extend(object_ring.get_more_nodes(1))
@ -213,10 +214,11 @@ class BaseObjectControllerMixin(object):
def test_iter_nodes_local_first_moves_locals_first(self):
controller = self.controller_cls(
self.app, 'a', 'c', 'o')
self.app.write_affinity_is_local_fn = (
policy_conf = self.app.get_policy_options(self.policy)
policy_conf.write_affinity_is_local_fn = (
lambda node: node['region'] == 1)
# we'll write to one more than replica count local nodes
self.app.write_affinity_node_count = lambda r: r + 1
policy_conf.write_affinity_node_count = lambda r: r + 1
object_ring = self.policy.object_ring
# make our fake ring have plenty of nodes, and not get limited
@ -234,7 +236,7 @@ class BaseObjectControllerMixin(object):
# make sure we have enough local nodes (sanity)
all_local_nodes = [n for n in all_nodes if
self.app.write_affinity_is_local_fn(n)]
policy_conf.write_affinity_is_local_fn(n)]
self.assertGreaterEqual(len(all_local_nodes), self.replicas() + 1)
# finally, create the local_first_nodes iter and flatten it out
@ -252,7 +254,8 @@ class BaseObjectControllerMixin(object):
def test_iter_nodes_local_first_best_effort(self):
controller = self.controller_cls(
self.app, 'a', 'c', 'o')
self.app.write_affinity_is_local_fn = (
policy_conf = self.app.get_policy_options(self.policy)
policy_conf.write_affinity_is_local_fn = (
lambda node: node['region'] == 1)
object_ring = self.policy.object_ring
@ -266,7 +269,7 @@ class BaseObjectControllerMixin(object):
self.assertEqual(len(all_nodes), self.replicas() +
POLICIES.default.object_ring.max_more_nodes)
all_local_nodes = [n for n in all_nodes if
self.app.write_affinity_is_local_fn(n)]
policy_conf.write_affinity_is_local_fn(n)]
self.assertEqual(len(all_local_nodes), self.replicas())
# but the local nodes we do have are at the front of the local iter
first_n_local_first_nodes = local_first_nodes[:len(all_local_nodes)]
@ -575,6 +578,80 @@ class BaseObjectControllerMixin(object):
self.assertEqual(container_updates, expected)
def _check_write_affinity(
self, conf, policy_conf, policy, affinity_regions, affinity_count):
conf['policy_config'] = policy_conf
app = PatchedObjControllerApp(
conf, FakeMemcache(), account_ring=FakeRing(),
container_ring=FakeRing(), logger=self.logger)
controller = self.controller_cls(app, 'a', 'c', 'o')
object_ring = app.get_object_ring(int(policy))
# make our fake ring have plenty of nodes, and not get limited
# artificially by the proxy max request node count
object_ring.max_more_nodes = 100
all_nodes = object_ring.get_part_nodes(1)
all_nodes.extend(object_ring.get_more_nodes(1))
# make sure we have enough local nodes (sanity)
all_local_nodes = [n for n in all_nodes if
n['region'] in affinity_regions]
self.assertGreaterEqual(len(all_local_nodes), affinity_count)
# finally, create the local_first_nodes iter and flatten it out
local_first_nodes = list(controller.iter_nodes_local_first(
object_ring, 1, policy))
# check that the required number of local nodes were moved up the order
node_regions = [node['region'] for node in local_first_nodes]
self.assertTrue(
all(r in affinity_regions for r in node_regions[:affinity_count]),
'Unexpected region found in local nodes, expected %s but got %s' %
(affinity_regions, node_regions))
return app
def test_write_affinity_not_configured(self):
# default is no write affinity so expect both regions 0 and 1
self._check_write_affinity({}, {}, POLICIES[0], [0, 1],
2 * self.replicas(POLICIES[0]))
self._check_write_affinity({}, {}, POLICIES[1], [0, 1],
2 * self.replicas(POLICIES[1]))
def test_write_affinity_proxy_server_config(self):
# without overrides policies use proxy-server config section options
conf = {'write_affinity_node_count': '1 * replicas',
'write_affinity': 'r0'}
self._check_write_affinity(conf, {}, POLICIES[0], [0],
self.replicas(POLICIES[0]))
self._check_write_affinity(conf, {}, POLICIES[1], [0],
self.replicas(POLICIES[1]))
def test_write_affinity_per_policy_config(self):
# check only per-policy configuration is sufficient
conf = {}
policy_conf = {'0': {'write_affinity_node_count': '1 * replicas',
'write_affinity': 'r1'},
'1': {'write_affinity_node_count': '5',
'write_affinity': 'r0'}}
self._check_write_affinity(conf, policy_conf, POLICIES[0], [1],
self.replicas(POLICIES[0]))
self._check_write_affinity(conf, policy_conf, POLICIES[1], [0], 5)
def test_write_affinity_per_policy_config_overrides_and_inherits(self):
# check per-policy config is preferred over proxy-server section config
conf = {'write_affinity_node_count': '1 * replicas',
'write_affinity': 'r0'}
policy_conf = {'0': {'write_affinity': 'r1'},
'1': {'write_affinity_node_count': '3 * replicas'}}
# policy 0 inherits default node count, override affinity to r1
self._check_write_affinity(conf, policy_conf, POLICIES[0], [1],
self.replicas(POLICIES[0]))
# policy 1 inherits default affinity to r0, overrides node count
self._check_write_affinity(conf, policy_conf, POLICIES[1], [0],
3 * self.replicas(POLICIES[1]))
# end of BaseObjectControllerMixin
@ -843,7 +920,7 @@ class TestReplicatedObjController(BaseObjectControllerMixin,
def test_PUT_connect_exceptions(self):
object_ring = self.app.get_object_ring(None)
self.app.sort_nodes = lambda n: n # disable shuffle
self.app.sort_nodes = lambda n, *args, **kwargs: n # disable shuffle
def test_status_map(statuses, expected):
self.app._error_limiting = {}

View File

@ -40,6 +40,7 @@ import re
import random
from collections import defaultdict
import uuid
from copy import deepcopy
import mock
from eventlet import sleep, spawn, wsgi, Timeout, debug
@ -67,7 +68,7 @@ from swift.common import utils, constraints
from swift.common.utils import hash_path, storage_directory, \
parse_content_type, parse_mime_headers, \
iter_multipart_mime_documents, public, mkdirs, NullLogger
from swift.common.wsgi import monkey_patch_mimetools, loadapp
from swift.common.wsgi import monkey_patch_mimetools, loadapp, ConfigString
from swift.proxy.controllers import base as proxy_base
from swift.proxy.controllers.base import get_cache_key, cors_validation, \
get_account_info, get_container_info
@ -748,20 +749,156 @@ class TestProxyServer(unittest.TestCase):
{'ip': '127.0.0.1'}]
self.assertEqual(res, exp_sorting)
def test_node_affinity(self):
baseapp = proxy_server.Application({'sorting_method': 'affinity',
'read_affinity': 'r1=1'},
def _do_sort_nodes(self, conf, policy_conf, nodes, policy,
node_timings=None):
# Note with shuffling mocked out, sort_nodes will by default return
# nodes in the order they are given
nodes = deepcopy(nodes)
conf = deepcopy(conf)
conf['policy_config'] = deepcopy(policy_conf)
baseapp = proxy_server.Application(conf,
FakeMemcache(),
logger=FakeLogger(),
container_ring=FakeRing(),
account_ring=FakeRing())
nodes = [{'region': 2, 'zone': 1, 'ip': '127.0.0.1'},
{'region': 1, 'zone': 2, 'ip': '127.0.0.2'}]
if node_timings:
for i, n in enumerate(nodes):
baseapp.set_node_timing(n, node_timings[i])
with mock.patch('swift.proxy.server.shuffle', lambda x: x):
app_sorted = baseapp.sort_nodes(nodes)
exp_sorted = [{'region': 1, 'zone': 2, 'ip': '127.0.0.2'},
{'region': 2, 'zone': 1, 'ip': '127.0.0.1'}]
self.assertEqual(exp_sorted, app_sorted)
app_sorted = baseapp.sort_nodes(nodes, policy)
self.assertFalse(baseapp.logger.get_lines_for_level('warning'))
return baseapp, app_sorted
def test_sort_nodes_default(self):
nodes = [{'region': 0, 'zone': 1, 'ip': '127.0.0.3'},
{'region': 1, 'zone': 1, 'ip': '127.0.0.1'},
{'region': 2, 'zone': 2, 'ip': '127.0.0.2'}]
# sanity check - no affinity conf results in node order unchanged
app, actual = self._do_sort_nodes({}, {}, nodes, None)
self.assertEqual(nodes, actual)
def test_sort_nodes_by_affinity_proxy_server_config(self):
nodes = [{'region': 0, 'zone': 1, 'ip': '127.0.0.3'},
{'region': 1, 'zone': 1, 'ip': '127.0.0.1'},
{'region': 2, 'zone': 2, 'ip': '127.0.0.2'}]
# proxy-server affinity conf is to prefer r2
conf = {'sorting_method': 'affinity', 'read_affinity': 'r2=1'}
app, actual = self._do_sort_nodes(conf, {}, nodes, None)
self.assertEqual([nodes[2], nodes[0], nodes[1]], actual)
app, actual = self._do_sort_nodes(conf, {}, nodes, POLICIES[0])
self.assertEqual([nodes[2], nodes[0], nodes[1]], actual)
# check that node timings are not collected if sorting_method != timing
self.assertFalse(app.sorts_by_timing) # sanity check
self.assertFalse(app.node_timings) # sanity check
# proxy-server affinity conf is to prefer region 1
conf = {'sorting_method': 'affinity', 'read_affinity': 'r1=1'}
app, actual = self._do_sort_nodes(conf, {}, nodes, None)
self.assertEqual([nodes[1], nodes[0], nodes[2]], actual)
app, actual = self._do_sort_nodes(conf, {}, nodes, POLICIES[0])
self.assertEqual([nodes[1], nodes[0], nodes[2]], actual)
@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing()),
StoragePolicy(1, 'one', False, object_ring=FakeRing())])
def test_sort_nodes_by_affinity_per_policy(self):
nodes = [{'region': 0, 'zone': 1, 'ip': '127.0.0.4'},
{'region': 1, 'zone': 0, 'ip': '127.0.0.3'},
{'region': 2, 'zone': 1, 'ip': '127.0.0.1'},
{'region': 3, 'zone': 0, 'ip': '127.0.0.2'}]
conf = {'sorting_method': 'affinity', 'read_affinity': 'r3=1'}
per_policy = {'0': {'sorting_method': 'affinity',
'read_affinity': 'r1=1'},
'1': {'sorting_method': 'affinity',
'read_affinity': 'r2=1'}}
# policy 0 affinity prefers r1
app, actual = self._do_sort_nodes(conf, per_policy, nodes, POLICIES[0])
self.assertEqual([nodes[1], nodes[0], nodes[2], nodes[3]], actual)
# policy 1 affinity prefers r2
app, actual = self._do_sort_nodes(conf, per_policy, nodes, POLICIES[1])
self.assertEqual([nodes[2], nodes[0], nodes[1], nodes[3]], actual)
# default affinity prefers r3
app, actual = self._do_sort_nodes(conf, per_policy, nodes, None)
self.assertEqual([nodes[3], nodes[0], nodes[1], nodes[2]], actual)
def test_sort_nodes_by_affinity_per_policy_with_no_default(self):
# no proxy-server setting but policy 0 prefers r0
nodes = [{'region': 1, 'zone': 1, 'ip': '127.0.0.1'},
{'region': 0, 'zone': 2, 'ip': '127.0.0.2'}]
conf = {}
per_policy = {'0': {'sorting_method': 'affinity',
'read_affinity': 'r0=0'}}
# policy 0 uses affinity sorting
app, actual = self._do_sort_nodes(conf, per_policy, nodes, POLICIES[0])
self.assertEqual([nodes[1], nodes[0]], actual)
# any other policy will use default sorting
app, actual = self._do_sort_nodes(conf, per_policy, nodes, None)
self.assertEqual(nodes, actual)
def test_sort_nodes_by_affinity_per_policy_inherits(self):
# policy 0 has read_affinity but no sorting_method override,
nodes = [{'region': 1, 'zone': 1, 'ip': '127.0.0.1'},
{'region': 0, 'zone': 2, 'ip': '127.0.0.2'}]
conf = {}
per_policy = {'0': {'read_affinity': 'r0=0'}}
# policy 0 uses the default sorting method instead of affinity sorting
app, actual = self._do_sort_nodes(conf, per_policy, nodes, POLICIES[0])
self.assertEqual(nodes, actual)
# but if proxy-server sorting_method is affinity then policy 0 inherits
conf = {'sorting_method': 'affinity'}
app, actual = self._do_sort_nodes(conf, per_policy, nodes, POLICIES[0])
self.assertEqual([nodes[1], nodes[0]], actual)
def test_sort_nodes_by_affinity_per_policy_overrides(self):
# default setting is to sort by timing but policy 0 uses read affinity
nodes = [{'region': 0, 'zone': 1, 'ip': '127.0.0.3'},
{'region': 1, 'zone': 1, 'ip': '127.0.0.1'},
{'region': 2, 'zone': 2, 'ip': '127.0.0.2'}]
node_timings = [10, 1, 100]
conf = {'sorting_method': 'timing'}
per_policy = {'0': {'sorting_method': 'affinity',
'read_affinity': 'r1=1,r2=2'}}
app, actual = self._do_sort_nodes(conf, per_policy, nodes, POLICIES[0],
node_timings=node_timings)
self.assertEqual([nodes[1], nodes[2], nodes[0]], actual)
# check that timings are collected despite one policy using affinity
self.assertTrue(app.sorts_by_timing)
self.assertEqual(3, len(app.node_timings))
# check app defaults to sorting by timing when no policy specified
app, actual = self._do_sort_nodes(conf, per_policy, nodes, None,
node_timings=node_timings)
self.assertEqual([nodes[1], nodes[0], nodes[2]], actual)
@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing()),
StoragePolicy(1, 'one', False, object_ring=FakeRing())])
def test_sort_nodes_by_timing_per_policy(self):
# default setting is to sort by affinity but policy 0 uses timing
nodes = [{'region': 0, 'zone': 1, 'ip': '127.0.0.3'},
{'region': 1, 'zone': 1, 'ip': '127.0.0.1'},
{'region': 2, 'zone': 2, 'ip': '127.0.0.2'}]
node_timings = [10, 1, 100]
conf = {'sorting_method': 'affinity', 'read_affinity': 'r1=1,r2=2'}
per_policy = {'0': {'sorting_method': 'timing',
'read_affinity': 'r1=1,r2=2'}, # should be ignored
'1': {'read_affinity': 'r2=1'}}
# policy 0 uses timing
app, actual = self._do_sort_nodes(conf, per_policy, nodes, POLICIES[0],
node_timings=node_timings)
self.assertEqual([nodes[1], nodes[0], nodes[2]], actual)
self.assertTrue(app.sorts_by_timing)
self.assertEqual(3, len(app.node_timings))
# policy 1 uses policy specific read affinity
app, actual = self._do_sort_nodes(conf, per_policy, nodes, POLICIES[1],
node_timings=node_timings)
self.assertEqual([nodes[2], nodes[0], nodes[1]], actual)
# check that with no policy specified the default read affinity is used
app, actual = self._do_sort_nodes(conf, per_policy, nodes, None,
node_timings=node_timings)
self.assertEqual([nodes[1], nodes[2], nodes[0]], actual)
def test_node_concurrency(self):
nodes = [{'region': 1, 'zone': 1, 'ip': '127.0.0.1', 'port': 6010,
@ -1141,6 +1278,468 @@ class TestProxyServerLoading(unittest.TestCase):
self.assertTrue(policy.object_ring)
@patch_policies()
class TestProxyServerConfigLoading(unittest.TestCase):
def setUp(self):
self.tempdir = mkdtemp()
account_ring_path = os.path.join(self.tempdir, 'account.ring.gz')
write_fake_ring(account_ring_path)
container_ring_path = os.path.join(self.tempdir, 'container.ring.gz')
write_fake_ring(container_ring_path)
def tearDown(self):
rmtree(self.tempdir)
def _write_conf(self, conf_body):
# this is broken out to a method so that subclasses can override
conf_path = os.path.join(self.tempdir, 'proxy-server.conf')
with open(conf_path, 'w') as f:
f.write(dedent(conf_body))
return conf_path
def _write_conf_and_load_app(self, conf_sections):
# write proxy-server.conf file, load app
conf_body = """
[DEFAULT]
swift_dir = %s
[pipeline:main]
pipeline = proxy-server
%s
""" % (self.tempdir, conf_sections)
conf_path = self._write_conf(conf_body)
with mock.patch('swift.proxy.server.get_logger',
return_value=FakeLogger()):
app = loadapp(conf_path, allow_modify_pipeline=False)
return app
def _check_policy_conf(self, app, exp_conf, exp_is_local):
# verify expected config
for policy, options in exp_conf.items():
for k, v in options.items():
actual = getattr(app.get_policy_options(policy), k)
if k == "write_affinity_node_count":
if policy: # this check only applies when using a policy
actual = actual(policy.object_ring.replica_count)
self.assertEqual(v, actual)
continue
self.assertEqual(v, actual,
"Expected %s=%s but got %s=%s for policy %s" %
(k, v, k, actual, policy))
for policy, nodes in exp_is_local.items():
fn = app.get_policy_options(policy).write_affinity_is_local_fn
if nodes is None:
self.assertIsNone(fn)
continue
for node, expected_result in nodes:
actual = fn(node)
self.assertIs(expected_result, actual,
"Expected %s but got %s for %s, policy %s" %
(expected_result, actual, node, policy))
return app
def test_per_policy_conf_none_configured(self):
conf_sections = """
[app:proxy-server]
use = egg:swift#proxy
"""
expected_default = {"read_affinity": "",
"sorting_method": "shuffle",
"write_affinity_node_count": 6}
exp_conf = {None: expected_default,
POLICIES[0]: expected_default,
POLICIES[1]: expected_default}
exp_is_local = {POLICIES[0]: None,
POLICIES[1]: None}
app = self._write_conf_and_load_app(conf_sections)
self._check_policy_conf(app, exp_conf, exp_is_local)
def test_per_policy_conf_one_configured(self):
conf_sections = """
[app:proxy-server]
use = egg:swift#proxy
[proxy-server:policy:0]
sorting_method = affinity
read_affinity = r1=100
write_affinity = r1
write_affinity_node_count = 1 * replicas
"""
expected_default = {"read_affinity": "",
"sorting_method": "shuffle",
"write_affinity_node_count": 6}
exp_conf = {None: expected_default,
POLICIES[0]: {"read_affinity": "r1=100",
"sorting_method": "affinity",
"write_affinity_node_count": 3},
POLICIES[1]: expected_default}
exp_is_local = {POLICIES[0]: [({'region': 1, 'zone': 2}, True),
({'region': 2, 'zone': 1}, False)],
POLICIES[1]: None}
app = self._write_conf_and_load_app(conf_sections)
self._check_policy_conf(app, exp_conf, exp_is_local)
default_conf = app.get_policy_options(None)
self.assertEqual(
('sorting_method: shuffle, read_affinity: , write_affinity: , '
'write_affinity_node_count: 2 * replicas'),
repr(default_conf))
policy_0_conf = app.get_policy_options(POLICIES[0])
self.assertEqual(
('sorting_method: affinity, read_affinity: r1=100, '
'write_affinity: r1, write_affinity_node_count: 1 * replicas'),
repr(policy_0_conf))
policy_1_conf = app.get_policy_options(POLICIES[1])
self.assertIs(default_conf, policy_1_conf)
def test_per_policy_conf_inherits_defaults(self):
conf_sections = """
[app:proxy-server]
use = egg:swift#proxy
sorting_method = affinity
write_affinity_node_count = 1 * replicas
[proxy-server:policy:0]
read_affinity = r1=100
write_affinity = r1
"""
expected_default = {"read_affinity": "",
"sorting_method": "affinity",
"write_affinity_node_count": 3}
exp_conf = {None: expected_default,
POLICIES[0]: {"read_affinity": "r1=100",
"sorting_method": "affinity",
"write_affinity_node_count": 3},
POLICIES[1]: expected_default}
exp_is_local = {POLICIES[0]: [({'region': 1, 'zone': 2}, True),
({'region': 2, 'zone': 1}, False)],
POLICIES[1]: None}
app = self._write_conf_and_load_app(conf_sections)
self._check_policy_conf(app, exp_conf, exp_is_local)
def test_per_policy_conf_overrides_default_affinity(self):
conf_sections = """
[app:proxy-server]
use = egg:swift#proxy
sorting_method = affinity
read_affinity = r2=10
write_affinity_node_count = 1 * replicas
write_affinity = r2
[proxy-server:policy:0]
read_affinity = r1=100
write_affinity = r1
write_affinity_node_count = 5
[proxy-server:policy:1]
read_affinity = r1=1
write_affinity = r3
write_affinity_node_count = 4
"""
exp_conf = {None: {"read_affinity": "r2=10",
"sorting_method": "affinity",
"write_affinity_node_count": 3},
POLICIES[0]: {"read_affinity": "r1=100",
"sorting_method": "affinity",
"write_affinity_node_count": 5},
POLICIES[1]: {"read_affinity": "r1=1",
"sorting_method": "affinity",
"write_affinity_node_count": 4}}
exp_is_local = {POLICIES[0]: [({'region': 1, 'zone': 2}, True),
({'region': 2, 'zone': 1}, False)],
POLICIES[1]: [({'region': 3, 'zone': 2}, True),
({'region': 1, 'zone': 1}, False),
({'region': 2, 'zone': 1}, False)]}
app = self._write_conf_and_load_app(conf_sections)
self._check_policy_conf(app, exp_conf, exp_is_local)
def test_per_policy_conf_overrides_default_sorting_method(self):
conf_sections = """
[app:proxy-server]
use = egg:swift#proxy
sorting_method = timing
[proxy-server:policy:0]
sorting_method = affinity
read_affinity = r1=100
[proxy-server:policy:1]
sorting_method = affinity
read_affinity = r1=1
"""
exp_conf = {None: {"read_affinity": "",
"sorting_method": "timing"},
POLICIES[0]: {"read_affinity": "r1=100",
"sorting_method": "affinity"},
POLICIES[1]: {"read_affinity": "r1=1",
"sorting_method": "affinity"}}
app = self._write_conf_and_load_app(conf_sections)
self._check_policy_conf(app, exp_conf, {})
def test_per_policy_conf_with_DEFAULT_options(self):
conf_body = """
[DEFAULT]
write_affinity = r0
read_affinity = r0=100
swift_dir = %s
[pipeline:main]
pipeline = proxy-server
[app:proxy-server]
use = egg:swift#proxy
# in a paste-deploy section, DEFAULT section value overrides
write_affinity = r2
# ...but the use of 'set' overrides the DEFAULT section value
set read_affinity = r1=100
[proxy-server:policy:0]
# not a paste-deploy section so any value here overrides DEFAULT
sorting_method = affinity
write_affinity = r2
read_affinity = r2=100
[proxy-server:policy:1]
sorting_method = affinity
""" % self.tempdir
conf_path = self._write_conf(conf_body)
with mock.patch('swift.proxy.server.get_logger',
return_value=FakeLogger()):
app = loadapp(conf_path, allow_modify_pipeline=False)
exp_conf = {
# default read_affinity is r1, set in proxy-server section
None: {"read_affinity": "r1=100",
"sorting_method": "shuffle",
"write_affinity_node_count": 6},
# policy 0 read affinity is r2, dictated by policy 0 section
POLICIES[0]: {"read_affinity": "r2=100",
"sorting_method": "affinity",
"write_affinity_node_count": 6},
# policy 1 read_affinity is r0, dictated by DEFAULT section,
# overrides proxy server section
POLICIES[1]: {"read_affinity": "r0=100",
"sorting_method": "affinity",
"write_affinity_node_count": 6}}
exp_is_local = {
# default write_affinity is r0, dictated by DEFAULT section
None: [({'region': 0, 'zone': 2}, True),
({'region': 1, 'zone': 1}, False)],
# policy 0 write_affinity is r2, dictated by policy 0 section
POLICIES[0]: [({'region': 0, 'zone': 2}, False),
({'region': 2, 'zone': 1}, True)],
# policy 1 write_affinity is r0, inherited from default
POLICIES[1]: [({'region': 0, 'zone': 2}, True),
({'region': 1, 'zone': 1}, False)]}
self._check_policy_conf(app, exp_conf, exp_is_local)
def test_per_policy_conf_warns_about_sorting_method_mismatch(self):
# verify that policy specific warnings are emitted when read_affinity
# is set but sorting_method is not affinity
conf_sections = """
[app:proxy-server]
use = egg:swift#proxy
read_affinity = r2=10
sorting_method = timing
[proxy-server:policy:0]
read_affinity = r1=100
[proxy-server:policy:1]
sorting_method = affinity
read_affinity = r1=1
"""
exp_conf = {None: {"read_affinity": "r2=10",
"sorting_method": "timing"},
POLICIES[0]: {"read_affinity": "r1=100",
"sorting_method": "timing"},
POLICIES[1]: {"read_affinity": "r1=1",
"sorting_method": "affinity"}}
app = self._write_conf_and_load_app(conf_sections)
self._check_policy_conf(app, exp_conf, {})
lines = app.logger.get_lines_for_level('warning')
scopes = {'default', 'policy 0 (nulo)'}
for line in lines[:2]:
self.assertIn(
"sorting_method is set to 'timing', not 'affinity'", line)
for scope in scopes:
if scope in line:
scopes.remove(scope)
break
else:
self.fail("None of %s found in warning: %r" % (scopes, line))
self.assertFalse(scopes)
def test_per_policy_conf_with_unknown_policy(self):
# verify that unknown policy section is warned about but doesn't break
# other policy configs
conf_sections = """
[app:proxy-server]
use = egg:swift#proxy
read_affinity = r2=10
sorting_method = affinity
[proxy-server:policy:999]
read_affinity = r2z1=1
"""
with self.assertRaises(ValueError) as cm:
self._write_conf_and_load_app(conf_sections)
self.assertIn('No policy found for override config, index: 999',
cm.exception.message)
def test_per_policy_conf_sets_timing_sorting_method(self):
conf_sections = """
[app:proxy-server]
use = egg:swift#proxy
sorting_method = affinity
[proxy-server:policy:0]
sorting_method = timing
[proxy-server:policy:1]
read_affinity = r1=1
"""
exp_conf = {None: {"read_affinity": "",
"sorting_method": "affinity"},
POLICIES[0]: {"read_affinity": "",
"sorting_method": "timing"},
POLICIES[1]: {"read_affinity": "r1=1",
"sorting_method": "affinity"}}
app = self._write_conf_and_load_app(conf_sections)
self._check_policy_conf(app, exp_conf, {})
def test_per_policy_conf_invalid_read_affinity_value(self):
def do_test(conf_sections, scope):
with self.assertRaises(ValueError) as cm:
self._write_conf_and_load_app(conf_sections)
self.assertIn('broken', cm.exception.message)
self.assertIn(
'Invalid read_affinity value:', cm.exception.message)
self.assertIn(scope, cm.exception.message)
conf_sections = """
[app:proxy-server]
use = egg:swift#proxy
sorting_method = affinity
read_affinity = r1=1
[proxy-server:policy:0]
sorting_method = affinity
read_affinity = broken
"""
do_test(conf_sections, 'policy 0 (nulo)')
conf_sections = """
[app:proxy-server]
use = egg:swift#proxy
sorting_method = affinity
read_affinity = broken
[proxy-server:policy:0]
sorting_method = affinity
read_affinity = r1=1
"""
do_test(conf_sections, '(default)')
def test_per_policy_conf_invalid_write_affinity_value(self):
def do_test(conf_sections, scope):
with self.assertRaises(ValueError) as cm:
self._write_conf_and_load_app(conf_sections)
self.assertIn('broken', cm.exception.message)
self.assertIn(
'Invalid write_affinity value:', cm.exception.message)
self.assertIn(scope, cm.exception.message)
conf_sections = """
[app:proxy-server]
use = egg:swift#proxy
write_affinity = r1
[proxy-server:policy:0]
sorting_method = affinity
write_affinity = broken
"""
do_test(conf_sections, 'policy 0 (nulo)')
conf_sections = """
[app:proxy-server]
use = egg:swift#proxy
write_affinity = broken
[proxy-server:policy:0]
write_affinity = r1
"""
do_test(conf_sections, '(default)')
def test_per_policy_conf_invalid_write_affinity_node_count_value(self):
def do_test(conf_sections, scope):
with self.assertRaises(ValueError) as cm:
self._write_conf_and_load_app(conf_sections)
self.assertIn('2* replicas', cm.exception.message)
self.assertIn('Invalid write_affinity_node_count value:',
cm.exception.message)
self.assertIn(scope, cm.exception.message)
conf_sections = """
[app:proxy-server]
use = egg:swift#proxy
write_affinity_node_count = 2 * replicas
[proxy-server:policy:0]
sorting_method = affinity
write_affinity_node_count = 2* replicas
"""
do_test(conf_sections, 'policy 0 (nulo)')
conf_sections = """
[app:proxy-server]
use = egg:swift#proxy
write_affinity_node_count = 2* replicas
[proxy-server:policy:0]
write_affinity_node_count = 2 * replicas
"""
do_test(conf_sections, '(default)')
def test_per_policy_conf_bad_section_name(self):
conf_sections = """
[app:proxy-server]
use = egg:swift#proxy
[proxy-server:policy:]
"""
with self.assertRaises(ValueError) as cm:
self._write_conf_and_load_app(conf_sections)
self.assertIn("Override config must refer to policy index: ''",
cm.exception.message)
def test_per_policy_conf_section_name_not_index(self):
conf_sections = """
[app:proxy-server]
use = egg:swift#proxy
[proxy-server:policy:uno]
"""
with self.assertRaises(ValueError) as cm:
self._write_conf_and_load_app(conf_sections)
self.assertIn("Override config must refer to policy index: 'uno'",
cm.exception.message)
class TestProxyServerConfigStringLoading(TestProxyServerConfigLoading):
# The proxy may be loaded from a conf string rather than a conf file, for
# example when ContainerSync creates an InternalClient from a default
# config string. So repeat super-class tests using a string loader.
def _write_conf(self, conf_body):
# this is broken out to a method so that subclasses can override
return ConfigString(conf_body)
class BaseTestObjectController(object):
"""
A root of TestObjController that implements helper methods for child
@ -1953,7 +2552,8 @@ class TestReplicatedObjectController(
self.assertEqual(test_errors, [])
self.assertTrue(res.status.startswith('201 '))
def test_PUT_respects_write_affinity(self):
def _check_PUT_respects_write_affinity(self, conf, policy,
expected_region):
written_to = []
def test_connect(ipaddr, port, device, partition, method, path,
@ -1961,33 +2561,65 @@ class TestReplicatedObjectController(
if path == '/a/c/o.jpg':
written_to.append((ipaddr, port, device))
with save_globals():
def is_r0(node):
return node['region'] == 0
object_ring = self.app.get_object_ring(None)
object_ring.max_more_nodes = 100
self.app.write_affinity_is_local_fn = is_r0
self.app.write_affinity_node_count = lambda r: 3
controller = \
ReplicatedObjectController(
self.app, 'a', 'c', 'o.jpg')
set_http_connect(200, 200, 201, 201, 201,
give_connect=test_connect)
req = Request.blank('/v1/a/c/o.jpg', {})
req.content_length = 1
req.body = 'a'
self.app.memcache.store = {}
res = controller.PUT(req)
self.assertTrue(res.status.startswith('201 '))
# mock shuffle to be a no-op to ensure that the only way nodes would
# not be used in ring order is if affinity is respected.
with mock.patch('swift.proxy.server.shuffle', lambda x: x):
app = proxy_server.Application(
conf, FakeMemcache(),
logger=debug_logger('proxy-ut'),
account_ring=FakeRing(),
container_ring=FakeRing())
with save_globals():
object_ring = app.get_object_ring(policy)
object_ring.max_more_nodes = 100
controller = \
ReplicatedObjectController(
app, 'a', 'c', 'o.jpg')
# requests go to acc, con, obj, obj, obj
set_http_connect(200, 200, 201, 201, 201,
give_connect=test_connect)
req = Request.blank(
'/v1/a/c/o.jpg', method='PUT', body='a',
headers={'X-Backend-Storage-Policy-Index': str(policy)})
app.memcache.store = {}
res = controller.PUT(req)
self.assertTrue(res.status.startswith('201 '))
self.assertEqual(3, len(written_to))
for ip, port, device in written_to:
# this is kind of a hokey test, but in FakeRing, the port is even
# when the region is 0, and odd when the region is 1, so this test
# asserts that we only wrote to nodes in region 0.
self.assertEqual(0, port % 2)
self.assertEqual(expected_region, port % 2)
@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing()),
StoragePolicy(1, 'one', False, object_ring=FakeRing())])
def test_PUT_respects_write_affinity(self):
# nodes in fake ring order have r0z0, r1z1, r0z2
# Check default conf via proxy server conf
conf = {'write_affinity': 'r0'}
self._check_PUT_respects_write_affinity(conf, 0, 0)
# policy 0 and policy 1 have conf via per policy conf section
conf = {
'write_affinity': '',
'policy_config': {
'0': {'write_affinity': 'r0'},
'1': {'write_affinity': 'r1'}
}
}
self._check_PUT_respects_write_affinity(conf, 0, 0)
self._check_PUT_respects_write_affinity(conf, 1, 1)
# policy 0 conf via per policy conf section override proxy server conf,
# policy 1 uses default
conf = {
'write_affinity': 'r0',
'policy_config': {
'0': {'write_affinity': 'r1'}
}
}
self._check_PUT_respects_write_affinity(conf, 0, 1)
self._check_PUT_respects_write_affinity(conf, 1, 0)
def test_PUT_respects_write_affinity_with_507s(self):
written_to = []
@ -2001,10 +2633,11 @@ class TestReplicatedObjectController(
def is_r0(node):
return node['region'] == 0
object_ring = self.app.get_object_ring(None)
object_ring = self.app.get_object_ring(0)
object_ring.max_more_nodes = 100
self.app.write_affinity_is_local_fn = is_r0
self.app.write_affinity_node_count = lambda r: 3
policy_conf = self.app.get_policy_options(POLICIES[0])
policy_conf.write_affinity_is_local_fn = is_r0
policy_conf.write_affinity_node_count = lambda r: 3
controller = \
ReplicatedObjectController(
@ -2500,7 +3133,7 @@ class TestReplicatedObjectController(
# reset the router post patch_policies
self.app.obj_controller_router = proxy_server.ObjectControllerRouter()
self.app.object_post_as_copy = False
self.app.sort_nodes = lambda nodes: nodes
self.app.sort_nodes = lambda nodes, *args, **kwargs: nodes
backend_requests = []
def capture_requests(ip, port, method, path, headers, *args,
@ -3194,10 +3827,11 @@ class TestReplicatedObjectController(
for node in self.app.iter_nodes(object_ring, 0):
pass
sort_nodes.assert_called_once_with(
object_ring.get_part_nodes(0))
object_ring.get_part_nodes(0), policy=None)
def test_iter_nodes_skips_error_limited(self):
with mock.patch.object(self.app, 'sort_nodes', lambda n: n):
with mock.patch.object(self.app, 'sort_nodes',
lambda n, *args, **kwargs: n):
object_ring = self.app.get_object_ring(None)
first_nodes = list(self.app.iter_nodes(object_ring, 0))
second_nodes = list(self.app.iter_nodes(object_ring, 0))
@ -3209,7 +3843,8 @@ class TestReplicatedObjectController(
def test_iter_nodes_gives_extra_if_error_limited_inline(self):
object_ring = self.app.get_object_ring(None)
with mock.patch.object(self.app, 'sort_nodes', lambda n: n), \
with mock.patch.object(self.app, 'sort_nodes',
lambda n, *args, **kwargs: n), \
mock.patch.object(self.app, 'request_node_count',
lambda r: 6), \
mock.patch.object(object_ring, 'max_more_nodes', 99):
@ -3226,14 +3861,14 @@ class TestReplicatedObjectController(
object_ring = self.app.get_object_ring(None)
node_list = [dict(id=n, ip='1.2.3.4', port=n, device='D')
for n in range(10)]
with mock.patch.object(self.app, 'sort_nodes', lambda n: n), \
with mock.patch.object(self.app, 'sort_nodes', lambda n, *args, **kwargs: n), \
mock.patch.object(self.app, 'request_node_count',
lambda r: 3):
got_nodes = list(self.app.iter_nodes(object_ring, 0,
node_iter=iter(node_list)))
self.assertEqual(node_list[:3], got_nodes)
with mock.patch.object(self.app, 'sort_nodes', lambda n: n), \
with mock.patch.object(self.app, 'sort_nodes', lambda n, *args, **kwargs: n), \
mock.patch.object(self.app, 'request_node_count',
lambda r: 1000000):
got_nodes = list(self.app.iter_nodes(object_ring, 0,
@ -3300,7 +3935,7 @@ class TestReplicatedObjectController(
with save_globals():
controller = ReplicatedObjectController(
self.app, 'account', 'container', 'object')
controller.app.sort_nodes = lambda l: l
controller.app.sort_nodes = lambda l, *args, **kwargs: l
object_ring = controller.app.get_object_ring(None)
self.assert_status_map(controller.HEAD, (200, 200, 503, 200, 200),
200)
@ -3339,7 +3974,7 @@ class TestReplicatedObjectController(
with save_globals():
controller = ReplicatedObjectController(
self.app, 'account', 'container', 'object')
controller.app.sort_nodes = lambda l: l
controller.app.sort_nodes = lambda l, *args, **kwargs: l
object_ring = controller.app.get_object_ring(None)
self.assert_status_map(controller.HEAD, (200, 200, 503, 200, 200),
200)
@ -3368,7 +4003,7 @@ class TestReplicatedObjectController(
with save_globals():
controller = ReplicatedObjectController(
self.app, 'account', 'container', 'object')
controller.app.sort_nodes = lambda l: l
controller.app.sort_nodes = lambda l, *args, **kwargs: l
object_ring = controller.app.get_object_ring(None)
# acc con obj obj obj
self.assert_status_map(controller.PUT, (200, 200, 503, 200, 200),
@ -3388,7 +4023,7 @@ class TestReplicatedObjectController(
with save_globals():
controller = ReplicatedObjectController(
self.app, 'account', 'container', 'object')
controller.app.sort_nodes = lambda l: l
controller.app.sort_nodes = lambda l, *args, **kwargs: l
object_ring = controller.app.get_object_ring(None)
# acc con obj obj obj
self.assert_status_map(controller.PUT, (200, 200, 200, 200, 503),
@ -4021,6 +4656,78 @@ class TestReplicatedObjectController(
controller.GET(req)
self.assertTrue(called[0])
def _check_GET_respects_read_affinity(self, conf, policy, expected_nodes):
actual_nodes = []
def test_connect(ipaddr, port, device, partition, method, path,
headers=None, query_string=None):
if path == '/a/c/o.jpg':
actual_nodes.append(ipaddr)
# mock shuffle to be a no-op to ensure that the only way nodes would
# not be used in ring order is if affinity is respected.
with mock.patch('swift.proxy.server.shuffle', lambda x: x):
app = proxy_server.Application(
conf, FakeMemcache(),
logger=debug_logger('proxy-ut'),
account_ring=FakeRing(),
container_ring=FakeRing())
with save_globals():
object_ring = app.get_object_ring(policy)
object_ring.max_more_nodes = 100
controller = \
ReplicatedObjectController(
app, 'a', 'c', 'o.jpg')
# requests go to acc, con, obj, obj, obj
set_http_connect(200, 200, 404, 404, 200,
give_connect=test_connect)
req = Request.blank(
'/v1/a/c/o.jpg',
headers={'X-Backend-Storage-Policy-Index': str(policy)})
app.memcache.store = {}
res = controller.GET(req)
self.assertTrue(res.status.startswith('200 '))
self.assertEqual(3, len(actual_nodes))
self.assertEqual(expected_nodes, actual_nodes)
@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing()),
StoragePolicy(1, 'one', False, object_ring=FakeRing())])
def test_GET_respects_read_affinity(self):
# nodes in fake ring order have r0z0, r1z1, r0z2
# Check default conf via proxy server conf
conf = {'read_affinity': 'r0z2=1, r1=2',
'sorting_method': 'affinity'}
expected_nodes = ['10.0.0.2', '10.0.0.1', '10.0.0.0']
self._check_GET_respects_read_affinity(conf, 0, expected_nodes)
# policy 0 and policy 1 have conf via per policy conf section
conf = {
'read_affinity': '',
'sorting_method': 'shuffle',
'policy_config': {
'0': {'read_affinity': 'r1z1=1, r0z2=2',
'sorting_method': 'affinity'},
'1': {'read_affinity': 'r0z2=1, r0z0=2',
'sorting_method': 'affinity'}
}
}
expected_nodes = ['10.0.0.1', '10.0.0.2', '10.0.0.0']
self._check_GET_respects_read_affinity(conf, 0, expected_nodes)
expected_nodes = ['10.0.0.2', '10.0.0.0', '10.0.0.1']
self._check_GET_respects_read_affinity(conf, 1, expected_nodes)
# policy 0 conf via per policy conf section overrides proxy server conf
conf = {
'read_affinity': 'r1z1=1, r0z2=2',
'sorting_method': 'affinity',
'policy_config': {
'0': {'read_affinity': 'r0z2=1, r0=2',
'sorting_method': 'affinity'}
}
}
expected_nodes = ['10.0.0.2', '10.0.0.0', '10.0.0.1']
self._check_GET_respects_read_affinity(conf, 0, expected_nodes)
def test_HEAD_calls_authorize(self):
called = [False]
@ -7182,7 +7889,7 @@ class TestContainerController(unittest.TestCase):
controller = proxy_server.ContainerController(self.app, 'account',
'container')
container_ring = controller.app.container_ring
controller.app.sort_nodes = lambda l: l
controller.app.sort_nodes = lambda l, *args, **kwargs: l
self.assert_status_map(controller.HEAD, (200, 503, 200, 200), 200,
missing_container=False)