replication network aware helper for container-updater in proxy
Change-Id: I303a1ff97325654478c2a47af056deda37696b7b
This commit is contained in:
parent
6f890d2ba9
commit
e375ce5bf0
|
@ -448,14 +448,15 @@ class BaseObjectController(Controller):
|
||||||
headers = [self.generate_request_headers(req, additional=req.headers)
|
headers = [self.generate_request_headers(req, additional=req.headers)
|
||||||
for _junk in range(n_outgoing)]
|
for _junk in range(n_outgoing)]
|
||||||
|
|
||||||
def set_container_update(index, container):
|
def set_container_update(index, container_node):
|
||||||
|
ip, port = get_ip_port(container_node, headers[index])
|
||||||
headers[index]['X-Container-Partition'] = container_partition
|
headers[index]['X-Container-Partition'] = container_partition
|
||||||
headers[index]['X-Container-Host'] = csv_append(
|
headers[index]['X-Container-Host'] = csv_append(
|
||||||
headers[index].get('X-Container-Host'),
|
headers[index].get('X-Container-Host'),
|
||||||
'%(ip)s:%(port)s' % container)
|
'%(ip)s:%(port)s' % {'ip': ip, 'port': port})
|
||||||
headers[index]['X-Container-Device'] = csv_append(
|
headers[index]['X-Container-Device'] = csv_append(
|
||||||
headers[index].get('X-Container-Device'),
|
headers[index].get('X-Container-Device'),
|
||||||
container['device'])
|
container_node['device'])
|
||||||
if container_path:
|
if container_path:
|
||||||
headers[index]['X-Backend-Quoted-Container-Path'] = quote(
|
headers[index]['X-Backend-Quoted-Container-Path'] = quote(
|
||||||
container_path)
|
container_path)
|
||||||
|
@ -468,11 +469,12 @@ class BaseObjectController(Controller):
|
||||||
# will eat the update and move it as a misplaced object.
|
# will eat the update and move it as a misplaced object.
|
||||||
|
|
||||||
def set_delete_at_headers(index, delete_at_node):
|
def set_delete_at_headers(index, delete_at_node):
|
||||||
|
ip, port = get_ip_port(delete_at_node, headers[index])
|
||||||
headers[index]['X-Delete-At-Container'] = delete_at_container
|
headers[index]['X-Delete-At-Container'] = delete_at_container
|
||||||
headers[index]['X-Delete-At-Partition'] = delete_at_partition
|
headers[index]['X-Delete-At-Partition'] = delete_at_partition
|
||||||
headers[index]['X-Delete-At-Host'] = csv_append(
|
headers[index]['X-Delete-At-Host'] = csv_append(
|
||||||
headers[index].get('X-Delete-At-Host'),
|
headers[index].get('X-Delete-At-Host'),
|
||||||
'%(ip)s:%(port)s' % delete_at_node)
|
'%(ip)s:%(port)s' % {'ip': ip, 'port': port})
|
||||||
headers[index]['X-Delete-At-Device'] = csv_append(
|
headers[index]['X-Delete-At-Device'] = csv_append(
|
||||||
headers[index].get('X-Delete-At-Device'),
|
headers[index].get('X-Delete-At-Device'),
|
||||||
delete_at_node['device'])
|
delete_at_node['device'])
|
||||||
|
|
|
@ -434,6 +434,67 @@ class CommonObjectControllerMixin(BaseObjectControllerMixin):
|
||||||
resp = req.get_response(self.app)
|
resp = req.get_response(self.app)
|
||||||
self.assertEqual(resp.status_int, 204)
|
self.assertEqual(resp.status_int, 204)
|
||||||
|
|
||||||
|
def test_object_DELETE_backend_update_container_ip_default(self):
|
||||||
|
self.policy.object_ring = FakeRing(separate_replication=True)
|
||||||
|
self.app.container_ring = FakeRing(separate_replication=True)
|
||||||
|
# sanity, devs have different ip & replication_ip
|
||||||
|
for ring in (self.policy.object_ring, self.app.container_ring):
|
||||||
|
for dev in ring.devs:
|
||||||
|
self.assertNotEqual(dev['ip'], dev['replication_ip'])
|
||||||
|
req = swift.common.swob.Request.blank('/v1/a/c/o', method='DELETE')
|
||||||
|
codes = [204] * self.replicas()
|
||||||
|
with mocked_http_conn(*codes) as log:
|
||||||
|
resp = req.get_response(self.app)
|
||||||
|
self.assertEqual(resp.status_int, 204)
|
||||||
|
|
||||||
|
# sanity, object hosts use node ip/port
|
||||||
|
object_hosts = {'%(ip)s:%(port)s' % req for req in log.requests}
|
||||||
|
object_ring = self.app.get_object_ring(int(self.policy))
|
||||||
|
part, object_nodes = object_ring.get_nodes('a', 'c', 'o')
|
||||||
|
expected_object_hosts = {'%(ip)s:%(port)s' % n for n in object_nodes}
|
||||||
|
self.assertEqual(object_hosts, expected_object_hosts)
|
||||||
|
|
||||||
|
# container hosts use node ip/port
|
||||||
|
container_hosts = {req['headers']['x-container-host']
|
||||||
|
for req in log.requests}
|
||||||
|
part, container_nodes = self.app.container_ring.get_nodes('a', 'c')
|
||||||
|
expected_container_hosts = {'%(ip)s:%(port)s' % n
|
||||||
|
for n in container_nodes}
|
||||||
|
self.assertEqual(container_hosts, expected_container_hosts)
|
||||||
|
|
||||||
|
def test_repl_object_DELETE_backend_update_container_repl_ip(self):
|
||||||
|
self.policy.object_ring = FakeRing(separate_replication=True)
|
||||||
|
self.app.container_ring = FakeRing(separate_replication=True)
|
||||||
|
# sanity, devs have different ip & replication_ip
|
||||||
|
for ring in (self.policy.object_ring, self.app.container_ring):
|
||||||
|
for dev in ring.devs:
|
||||||
|
self.assertNotEqual(dev['ip'], dev['replication_ip'])
|
||||||
|
req = swift.common.swob.Request.blank(
|
||||||
|
'/v1/a/c/o', method='DELETE', headers={
|
||||||
|
'x-backend-use-replication-network': 'true'})
|
||||||
|
codes = [204] * self.replicas()
|
||||||
|
with mocked_http_conn(*codes) as log:
|
||||||
|
resp = req.get_response(self.app)
|
||||||
|
self.assertEqual(resp.status_int, 204)
|
||||||
|
|
||||||
|
# sanity, object hosts use node replication ip/port
|
||||||
|
object_hosts = {'%(ip)s:%(port)s' % req for req in log.requests}
|
||||||
|
object_ring = self.app.get_object_ring(int(self.policy))
|
||||||
|
part, object_nodes = object_ring.get_nodes('a', 'c', 'o')
|
||||||
|
expected_object_hosts = {
|
||||||
|
'%(replication_ip)s:%(replication_port)s' % n
|
||||||
|
for n in object_nodes}
|
||||||
|
self.assertEqual(object_hosts, expected_object_hosts)
|
||||||
|
|
||||||
|
# container hosts use node replication ip/port
|
||||||
|
container_hosts = {req['headers']['x-container-host']
|
||||||
|
for req in log.requests}
|
||||||
|
part, container_nodes = self.app.container_ring.get_nodes('a', 'c')
|
||||||
|
expected_container_hosts = {
|
||||||
|
'%(replication_ip)s:%(replication_port)s' % n
|
||||||
|
for n in container_nodes}
|
||||||
|
self.assertEqual(container_hosts, expected_container_hosts)
|
||||||
|
|
||||||
def test_DELETE_missing_one(self):
|
def test_DELETE_missing_one(self):
|
||||||
# Obviously this test doesn't work if we're testing 1 replica.
|
# Obviously this test doesn't work if we're testing 1 replica.
|
||||||
# In that case, we don't have any failovers to check.
|
# In that case, we don't have any failovers to check.
|
||||||
|
|
|
@ -4176,20 +4176,32 @@ class TestReplicatedObjectController(
|
||||||
StoragePolicy(1, 'one', object_ring=FakeRing()),
|
StoragePolicy(1, 'one', object_ring=FakeRing()),
|
||||||
])
|
])
|
||||||
def test_POST_backend_headers(self):
|
def test_POST_backend_headers(self):
|
||||||
|
self.app = proxy_server.Application(
|
||||||
|
{},
|
||||||
|
account_ring=FakeRing(separate_replication=True),
|
||||||
|
container_ring=FakeRing(separate_replication=True))
|
||||||
|
|
||||||
|
part = self.app.container_ring.get_part('a', 'c')
|
||||||
|
nodes = self.app.container_ring.get_part_nodes(part)
|
||||||
|
self.assertNotEqual(nodes[0]['ip'], nodes[0]['replication_ip'])
|
||||||
|
|
||||||
# reset the router post patch_policies
|
# reset the router post patch_policies
|
||||||
self.app.obj_controller_router = proxy_server.ObjectControllerRouter()
|
self.app.obj_controller_router = proxy_server.ObjectControllerRouter()
|
||||||
self.app.sort_nodes = lambda nodes, *args, **kwargs: nodes
|
self.app.sort_nodes = lambda nodes, *args, **kwargs: nodes
|
||||||
|
|
||||||
def do_test(resp_headers):
|
def do_test(resp_headers, use_replication=False):
|
||||||
backend_requests = []
|
backend_requests = []
|
||||||
|
|
||||||
def capture_requests(ip, port, method, path, headers, *args,
|
def capture_requests(ip, port, method, path, headers, *args,
|
||||||
**kwargs):
|
**kwargs):
|
||||||
backend_requests.append((method, path, headers))
|
backend_requests.append((method, path, headers))
|
||||||
|
|
||||||
|
replication_aware = 'true' if use_replication else 'false'
|
||||||
req = Request.blank('/v1/a/c/o', {}, method='POST',
|
req = Request.blank('/v1/a/c/o', {}, method='POST',
|
||||||
headers={'X-Object-Meta-Color': 'Blue',
|
headers={'X-Object-Meta-Color': 'Blue',
|
||||||
'Content-Type': 'text/plain'})
|
'Content-Type': 'text/plain',
|
||||||
|
'x-backend-use-replication-network':
|
||||||
|
replication_aware})
|
||||||
|
|
||||||
# we want the container_info response to says a policy index of 1
|
# we want the container_info response to says a policy index of 1
|
||||||
with mocked_http_conn(
|
with mocked_http_conn(
|
||||||
|
@ -4244,7 +4256,10 @@ class TestReplicatedObjectController(
|
||||||
|
|
||||||
expected = {}
|
expected = {}
|
||||||
for i, device in enumerate(['sda', 'sdb', 'sdc']):
|
for i, device in enumerate(['sda', 'sdb', 'sdc']):
|
||||||
expected[device] = '10.0.0.%d:100%d' % (i, i)
|
if use_replication:
|
||||||
|
expected[device] = '10.0.1.%d:110%d' % (i, i)
|
||||||
|
else:
|
||||||
|
expected[device] = '10.0.0.%d:100%d' % (i, i)
|
||||||
self.assertEqual(container_headers, expected)
|
self.assertEqual(container_headers, expected)
|
||||||
|
|
||||||
# and again with policy override
|
# and again with policy override
|
||||||
|
@ -4277,6 +4292,7 @@ class TestReplicatedObjectController(
|
||||||
do_test(resp_headers)
|
do_test(resp_headers)
|
||||||
resp_headers['X-Backend-Sharding-State'] = 'unsharded'
|
resp_headers['X-Backend-Sharding-State'] = 'unsharded'
|
||||||
do_test(resp_headers)
|
do_test(resp_headers)
|
||||||
|
do_test(resp_headers, use_replication=True)
|
||||||
|
|
||||||
def _check_request(self, req, method, path, headers=None, params=None):
|
def _check_request(self, req, method, path, headers=None, params=None):
|
||||||
self.assertEqual(method, req['method'])
|
self.assertEqual(method, req['method'])
|
||||||
|
|
Loading…
Reference in New Issue