Refactor of ring management code

Make the ring sync code clearer and improve logic around leader
switching during or after a sync. Also add more debug logs to
make it easier to debug when things go wrong.

Closes-Bug: 1448884
Change-Id: I10d51c74001710b6b7a2502e14370996b15ffb40
This commit is contained in:
Edward Hope-Morley 2016-03-02 11:34:58 +00:00
parent 74b951b517
commit ccaeae4706
5 changed files with 412 additions and 133 deletions

View File

@ -0,0 +1,29 @@
== Swift Charm Ring Management ==
Swift uses rings and builders to manage data distribution across storage devices
in the cluster. More information on this can be found in the upstream
documentation [0].
In order to function correctly, the rings and builders need to be the same
across all swift proxy units and the rings need to be distributed to storage
units. The swift proxy charm achieves this by electing a leader unit and having
that unit manage ring and builder files by updating them when new nodes or
devices are added to the cluster and distributing those files to other nodes in
the cluster.
Since over time the leader may change, ring syncs use acknowledgements from
peer units to determine whether a synchronisation has completed. This way, if
the leader switches to another unit, we are able to know that the unit has
up-to-date ring and builder files.
When new devices are added to storage units, the leader proxy unit is notified
and adds them to the ring. Once complete, the leader unit will broadcast a
notification that rings and builders are ready to be synced across the cluster
(only proxy units get builders) and each unit in the cluster should then begin
syncing from the leader and acknowledge receipt.
During synchronisation, swift-proxy services are stopped in order to avoid
having requests being handled while rings are being updated.
[0] http://docs.openstack.org/developer/swift/overview_ring.html

View File

@ -33,6 +33,10 @@ from lib.swift_utils import (
get_first_available_value, get_first_available_value,
all_responses_equal, all_responses_equal,
ensure_www_dir_permissions, ensure_www_dir_permissions,
sync_builders_and_rings_if_changed,
cluster_sync_rings,
is_most_recent_timestamp,
timestamps_available,
is_paused, is_paused,
pause_aware_restart_on_change, pause_aware_restart_on_change,
assess_status, assess_status,
@ -55,7 +59,6 @@ from charmhelpers.core.hookenv import (
DEBUG, DEBUG,
INFO, INFO,
WARNING, WARNING,
ERROR,
Hooks, UnregisteredHookError, Hooks, UnregisteredHookError,
open_port, open_port,
status_set, status_set,
@ -146,7 +149,7 @@ def config_changed():
do_openstack_upgrade(CONFIGS) do_openstack_upgrade(CONFIGS)
status_set('maintenance', 'Running openstack upgrade') status_set('maintenance', 'Running openstack upgrade')
status_set('maintenance', 'Updating and balancing rings') status_set('maintenance', 'Updating and (maybe) balancing rings')
update_rings(min_part_hours=config('min-hours')) update_rings(min_part_hours=config('min-hours'))
if not config('disable-ring-balance') and is_elected_leader(SWIFT_HA_RES): if not config('disable-ring-balance') and is_elected_leader(SWIFT_HA_RES):
@ -242,11 +245,11 @@ def storage_changed():
leader we ignore this event and wait for a resync request from the leader. leader we ignore this event and wait for a resync request from the leader.
""" """
if not is_elected_leader(SWIFT_HA_RES): if not is_elected_leader(SWIFT_HA_RES):
log("Not the leader - ignoring storage relation until leader ready.", log("Not the leader - deferring storage relation change to leader "
level=DEBUG) "unit.", level=DEBUG)
return return
log("Leader established, updating ring builders", level=INFO) log("Storage relation changed -processing", level=DEBUG)
host_ip = get_host_ip() host_ip = get_host_ip()
if not host_ip: if not host_ip:
log("No host ip found in storage relation - deferring storage " log("No host ip found in storage relation - deferring storage "
@ -325,7 +328,7 @@ def cluster_joined(relation_id=None):
private_addr = unit_get('private-address') private_addr = unit_get('private-address')
def all_peers_stopped(responses): def is_all_peers_stopped(responses):
"""Establish whether all peers have stopped their proxy services. """Establish whether all peers have stopped their proxy services.
Each peer unit will set stop-proxy-service-ack to rq value to indicate that Each peer unit will set stop-proxy-service-ack to rq value to indicate that
@ -339,11 +342,15 @@ def all_peers_stopped(responses):
ack_key = SwiftProxyClusterRPC.KEY_STOP_PROXY_SVC_ACK ack_key = SwiftProxyClusterRPC.KEY_STOP_PROXY_SVC_ACK
token = relation_get(attribute=rq_key, unit=local_unit()) token = relation_get(attribute=rq_key, unit=local_unit())
if not token or token != responses[0].get(ack_key): if not token or token != responses[0].get(ack_key):
log("Unmatched token in ack (expected=%s, got=%s)" % log("Token mismatch, rq and ack tokens differ (expected ack=%s, "
"got=%s)" %
(token, responses[0].get(ack_key)), level=DEBUG) (token, responses[0].get(ack_key)), level=DEBUG)
return False return False
if not all_responses_equal(responses, ack_key): if not all_responses_equal(responses, ack_key):
log("Not all ack responses are equal. Either we are still waiting "
"for responses or we were not the request originator.",
level=DEBUG)
return False return False
return True return True
@ -357,20 +364,44 @@ def cluster_leader_actions():
log("Cluster changed by unit=%s (local is leader)" % (remote_unit()), log("Cluster changed by unit=%s (local is leader)" % (remote_unit()),
level=DEBUG) level=DEBUG)
# If we have received an ack, check other units rx_settings = relation_get() or {}
settings = relation_get() or {} tx_settings = relation_get(unit=local_unit()) or {}
ack_key = SwiftProxyClusterRPC.KEY_STOP_PROXY_SVC_ACK
# Protect against leader changing mid-sync rx_rq_token = rx_settings.get(SwiftProxyClusterRPC.KEY_STOP_PROXY_SVC)
if settings.get(SwiftProxyClusterRPC.KEY_STOP_PROXY_SVC): rx_ack_token = rx_settings.get(SwiftProxyClusterRPC.KEY_STOP_PROXY_SVC_ACK)
log("Sync request received yet this is leader unit. This would "
"indicate that the leader has changed mid-sync - stopping proxy " tx_rq_token = tx_settings.get(SwiftProxyClusterRPC.KEY_STOP_PROXY_SVC)
"and notifying peers", level=ERROR) tx_ack_token = tx_settings.get(SwiftProxyClusterRPC.KEY_STOP_PROXY_SVC_ACK)
service_stop('swift-proxy')
SwiftProxyClusterRPC().notify_leader_changed() rx_leader_changed = \
rx_settings.get(SwiftProxyClusterRPC.KEY_NOTIFY_LEADER_CHANGED)
if rx_leader_changed:
log("Leader change notification received and this is leader so "
"retrying sync.", level=INFO)
# FIXME: check that we were previously part of a successful sync to
# ensure we have good rings.
cluster_sync_rings(peers_only=tx_settings.get('peers-only', False),
token=rx_leader_changed)
return return
elif ack_key in settings:
token = settings[ack_key] rx_resync_request = \
rx_settings.get(SwiftProxyClusterRPC.KEY_REQUEST_RESYNC)
resync_request_ack_key = SwiftProxyClusterRPC.KEY_REQUEST_RESYNC_ACK
tx_resync_request_ack = tx_settings.get(resync_request_ack_key)
if rx_resync_request and tx_resync_request_ack != rx_resync_request:
log("Unit '%s' has requested a resync" % (remote_unit()),
level=INFO)
cluster_sync_rings(peers_only=True)
relation_set(**{resync_request_ack_key: rx_resync_request})
return
# If we have received an ack token ensure it is not associated with a
# request we received from another peer. If it is, this would indicate
# a leadership change during a sync and this unit will abort the sync or
# attempt to restore the original leader so to be able to complete the
# sync.
if rx_ack_token and rx_ack_token == tx_rq_token:
# Find out if all peer units have been stopped. # Find out if all peer units have been stopped.
responses = [] responses = []
for rid in relation_ids('cluster'): for rid in relation_ids('cluster'):
@ -378,21 +409,43 @@ def cluster_leader_actions():
responses.append(relation_get(rid=rid, unit=unit)) responses.append(relation_get(rid=rid, unit=unit))
# Ensure all peers stopped before starting sync # Ensure all peers stopped before starting sync
if all_peers_stopped(responses): if is_all_peers_stopped(responses):
key = 'peers-only' key = 'peers-only'
if not all_responses_equal(responses, key, must_exist=False): if not all_responses_equal(responses, key, must_exist=False):
msg = ("Did not get equal response from every peer unit for " msg = ("Did not get equal response from every peer unit for "
"'%s'" % (key)) "'%s'" % (key))
raise SwiftProxyCharmException(msg) raise SwiftProxyCharmException(msg)
peers_only = int(get_first_available_value(responses, key, peers_only = bool(get_first_available_value(responses, key,
default=0)) default=0))
log("Syncing rings and builders (peers-only=%s)" % (peers_only), log("Syncing rings and builders (peers-only=%s)" % (peers_only),
level=DEBUG) level=DEBUG)
broadcast_rings_available(token, storage=not peers_only) broadcast_rings_available(broker_token=rx_ack_token,
storage=not peers_only)
else: else:
key = SwiftProxyClusterRPC.KEY_STOP_PROXY_SVC_ACK
acks = ', '.join([rsp[key] for rsp in responses if key in rsp])
log("Not all peer apis stopped - skipping sync until all peers " log("Not all peer apis stopped - skipping sync until all peers "
"ready (got %s)" % (responses), level=INFO) "ready (current='%s', token='%s')" % (acks, tx_ack_token),
level=INFO)
elif ((rx_ack_token and (rx_ack_token == tx_ack_token)) or
(rx_rq_token and (rx_rq_token == rx_ack_token))):
log("It appears that the cluster leader has changed mid-sync - "
"stopping proxy service", level=WARNING)
service_stop('swift-proxy')
broker = rx_settings.get('builder-broker')
if broker:
# If we get here, manual intervention will be required in order
# to restore the cluster.
msg = ("Failed to restore previous broker '%s' as leader" %
(broker))
raise SwiftProxyCharmException(msg)
else:
msg = ("No builder-broker on rx_settings relation from '%s' - "
"unable to attempt leader restore" % (remote_unit()))
raise SwiftProxyCharmException(msg)
else:
log("Not taking any sync actions", level=DEBUG)
CONFIGS.write_all() CONFIGS.write_all()
@ -404,31 +457,74 @@ def cluster_non_leader_actions():
""" """
log("Cluster changed by unit=%s (local is non-leader)" % (remote_unit()), log("Cluster changed by unit=%s (local is non-leader)" % (remote_unit()),
level=DEBUG) level=DEBUG)
settings = relation_get() or {} rx_settings = relation_get() or {}
tx_settings = relation_get(unit=local_unit()) or {}
token = rx_settings.get(SwiftProxyClusterRPC.KEY_NOTIFY_LEADER_CHANGED)
if token:
log("Leader-changed notification received from peer unit. Since "
"this most likely occurred during a ring sync proxies will "
"be disabled until the leader is restored and a fresh sync "
"request is set out", level=WARNING)
service_stop("swift-proxy")
return
rx_rq_token = rx_settings.get(SwiftProxyClusterRPC.KEY_STOP_PROXY_SVC)
# Check whether we have been requested to stop proxy service # Check whether we have been requested to stop proxy service
rq_key = SwiftProxyClusterRPC.KEY_STOP_PROXY_SVC if rx_rq_token:
token = settings.get(rq_key, None)
if token:
log("Peer request to stop proxy service received (%s) - sending ack" % log("Peer request to stop proxy service received (%s) - sending ack" %
(token), level=INFO) (rx_rq_token), level=INFO)
service_stop('swift-proxy') service_stop('swift-proxy')
peers_only = settings.get('peers-only', None) peers_only = rx_settings.get('peers-only', None)
rq = SwiftProxyClusterRPC().stop_proxy_ack(echo_token=token, rq = SwiftProxyClusterRPC().stop_proxy_ack(echo_token=rx_rq_token,
echo_peers_only=peers_only) echo_peers_only=peers_only)
relation_set(relation_settings=rq) relation_set(relation_settings=rq)
return return
# Check if there are any builder files we can sync from the leader. # Check if there are any builder files we can sync from the leader.
log("Non-leader peer - checking if updated rings available", level=DEBUG) broker = rx_settings.get('builder-broker', None)
broker = settings.get('builder-broker', None) broker_token = rx_settings.get('broker-token', None)
broker_timestamp = rx_settings.get('broker-timestamp', None)
tx_ack_token = tx_settings.get(SwiftProxyClusterRPC.KEY_STOP_PROXY_SVC_ACK)
if not broker: if not broker:
log("No update available", level=DEBUG) log("No ring/builder update available", level=DEBUG)
if not is_paused(): if not is_paused():
service_start('swift-proxy') service_start('swift-proxy')
return
elif broker_token:
if tx_ack_token:
if broker_token == tx_ack_token:
log("Broker and ACK tokens match (%s)" % (broker_token),
level=DEBUG)
else:
log("Received ring/builder update notification but tokens do "
"not match (broker-token=%s/ack-token=%s)" %
(broker_token, tx_ack_token), level=WARNING)
return
else:
log("Broker token available without handshake, assuming we just "
"joined and rings won't change", level=DEBUG)
else:
log("Not taking any sync actions", level=DEBUG)
return return
builders_only = int(settings.get('sync-only-builders', 0)) # If we upgrade from cluster that did not use timestamps, the new peer will
# need to request a re-sync from the leader
if not is_most_recent_timestamp(broker_timestamp):
if not timestamps_available(excluded_unit=remote_unit()):
log("Requesting resync")
rq = SwiftProxyClusterRPC().request_resync(broker_token)
relation_set(relation_settings=rq)
else:
log("Did not receive most recent broker timestamp but timestamps "
"are available - waiting for next timestamp", level=INFO)
return
log("Ring/builder update available", level=DEBUG)
builders_only = int(rx_settings.get('sync-only-builders', 0))
path = os.path.basename(get_www_dir()) path = os.path.basename(get_www_dir())
try: try:
sync_proxy_rings('http://%s/%s' % (broker, path), sync_proxy_rings('http://%s/%s' % (broker, path),
@ -436,7 +532,7 @@ def cluster_non_leader_actions():
except CalledProcessError: except CalledProcessError:
log("Ring builder sync failed, builders not yet available - " log("Ring builder sync failed, builders not yet available - "
"leader not ready?", level=WARNING) "leader not ready?", level=WARNING)
return None return
# Re-enable the proxy once all builders and rings are synced # Re-enable the proxy once all builders and rings are synced
if fully_synced(): if fully_synced():
@ -452,16 +548,6 @@ def cluster_non_leader_actions():
@hooks.hook('cluster-relation-changed') @hooks.hook('cluster-relation-changed')
@pause_aware_restart_on_change(restart_map()) @pause_aware_restart_on_change(restart_map())
def cluster_changed(): def cluster_changed():
key = SwiftProxyClusterRPC.KEY_NOTIFY_LEADER_CHANGED
leader_changed = relation_get(attribute=key)
if leader_changed:
log("Leader changed notification received from peer unit. Since this "
"most likely occurred during a ring sync proxies will be "
"disabled until the leader is restored and a fresh sync request "
"is set out", level=WARNING)
service_stop("swift-proxy")
return
if is_elected_leader(SWIFT_HA_RES): if is_elected_leader(SWIFT_HA_RES):
cluster_leader_actions() cluster_leader_actions()
else: else:
@ -469,6 +555,7 @@ def cluster_changed():
@hooks.hook('ha-relation-changed') @hooks.hook('ha-relation-changed')
@sync_builders_and_rings_if_changed
def ha_relation_changed(): def ha_relation_changed():
clustered = relation_get('clustered') clustered = relation_get('clustered')
if clustered: if clustered:

View File

@ -7,6 +7,7 @@ import shutil
import subprocess import subprocess
import tempfile import tempfile
import threading import threading
import time
import uuid import uuid
from collections import OrderedDict from collections import OrderedDict
@ -39,6 +40,7 @@ from charmhelpers.core.hookenv import (
INFO, INFO,
WARNING, WARNING,
config, config,
local_unit,
relation_get, relation_get,
unit_get, unit_get,
relation_set, relation_set,
@ -180,40 +182,69 @@ class SwiftProxyClusterRPC(object):
KEY_STOP_PROXY_SVC = 'stop-proxy-service' KEY_STOP_PROXY_SVC = 'stop-proxy-service'
KEY_STOP_PROXY_SVC_ACK = 'stop-proxy-service-ack' KEY_STOP_PROXY_SVC_ACK = 'stop-proxy-service-ack'
KEY_NOTIFY_LEADER_CHANGED = 'leader-changed-notification' KEY_NOTIFY_LEADER_CHANGED = 'leader-changed-notification'
KEY_REQUEST_RESYNC = 'resync-request'
KEY_REQUEST_RESYNC_ACK = 'resync-request-ack'
def __init__(self, version=1): def __init__(self, version=1):
self._version = version self._version = version
@property
def _hostname(self):
hostname = get_hostaddr()
return format_ipv6_addr(hostname) or hostname
def template(self): def template(self):
# Everything must be None by default so it gets dropped from the # Everything must be None by default so it gets dropped from the
# relation unless we want it to be set. # relation unless we want it to be set.
templates = {1: {'trigger': None, templates = {1: {'trigger': None,
'broker-token': None, 'broker-token': None,
'builder-broker': None, 'builder-broker': None,
'broker-timestamp': None,
self.KEY_STOP_PROXY_SVC: None, self.KEY_STOP_PROXY_SVC: None,
self.KEY_STOP_PROXY_SVC_ACK: None, self.KEY_STOP_PROXY_SVC_ACK: None,
self.KEY_NOTIFY_LEADER_CHANGED: None, self.KEY_NOTIFY_LEADER_CHANGED: None,
self.KEY_REQUEST_RESYNC: None,
'peers-only': None, 'peers-only': None,
'sync-only-builders': None}} 'sync-only-builders': None}}
return copy.deepcopy(templates[self._version]) return copy.deepcopy(templates[self._version])
def stop_proxy_request(self, peers_only=False): def stop_proxy_request(self, peers_only=False, token=None):
"""Request to stop peer proxy service. """Request to stop peer proxy service.
NOTE: leader action A token can optionally be supplied in case we want to restart a
previously triggered sync e.g. following a leader change notification.
NOTE: this action must only be performed by the cluster leader.
:param peers_only: If True, indicates that we only want peer
(i.e. proxy not storage) units to be notified.
:param token: optional request token expected to be echoed in ACK from
peer. If token not provided, a new one is generated.
""" """
if not is_elected_leader(SWIFT_HA_RES):
errmsg = "Leader function called by non-leader"
raise SwiftProxyCharmException(errmsg)
rq = self.template() rq = self.template()
rq['trigger'] = str(uuid.uuid4()) if not token:
token = str(uuid.uuid4())
rq['trigger'] = token
rq[self.KEY_STOP_PROXY_SVC] = rq['trigger'] rq[self.KEY_STOP_PROXY_SVC] = rq['trigger']
if peers_only: if peers_only:
rq['peers-only'] = 1 rq['peers-only'] = 1
rq['builder-broker'] = self._hostname
return rq return rq
def stop_proxy_ack(self, echo_token, echo_peers_only): def stop_proxy_ack(self, echo_token, echo_peers_only):
"""Ack that peer proxy service is stopped. """Ack that peer proxy service is stopped.
NOTE: non-leader action NOTE: this action must NOT be performed by the cluster leader.
:param echo_peers_only: peers_only value from leader that we echo back.
:param echo_token: request token received from leader that we must now
echo as an ACK.
""" """
rq = self.template() rq = self.template()
rq['trigger'] = str(uuid.uuid4()) rq['trigger'] = str(uuid.uuid4())
@ -222,12 +253,19 @@ class SwiftProxyClusterRPC(object):
rq['peers-only'] = echo_peers_only rq['peers-only'] = echo_peers_only
return rq return rq
def sync_rings_request(self, broker_host, broker_token, def sync_rings_request(self, broker_token, builders_only=False):
builders_only=False): """Request for peers to sync rings from leader.
"""Request for peer to sync rings.
NOTE: leader action NOTE: this action must only be performed by the cluster leader.
:param broker_token: token to identify sync request.
:param builders_only: if False, tell peers to sync builders only (not
rings).
""" """
if not is_elected_leader(SWIFT_HA_RES):
errmsg = "Leader function called by non-leader"
raise SwiftProxyCharmException(errmsg)
rq = self.template() rq = self.template()
rq['trigger'] = str(uuid.uuid4()) rq['trigger'] = str(uuid.uuid4())
@ -235,17 +273,38 @@ class SwiftProxyClusterRPC(object):
rq['sync-only-builders'] = 1 rq['sync-only-builders'] = 1
rq['broker-token'] = broker_token rq['broker-token'] = broker_token
rq['builder-broker'] = broker_host rq['broker-timestamp'] = "%f" % time.time()
rq['builder-broker'] = self._hostname
return rq return rq
def notify_leader_changed(self): def notify_leader_changed(self, token):
"""Notify peers that leader has changed. """Notify peers that leader has changed.
NOTE: leader action The token passed in must be that associated with the sync we claim to
have been interrupted. It will be re-used by the restored leader once
it receives this notification.
NOTE: this action must only be performed by the cluster leader that
has relinquished it's leader status as part of the current hook
context.
"""
if not is_elected_leader(SWIFT_HA_RES):
errmsg = "Leader function called by non-leader"
raise SwiftProxyCharmException(errmsg)
rq = self.template()
rq['trigger'] = str(uuid.uuid4())
rq[self.KEY_NOTIFY_LEADER_CHANGED] = token
return rq
def request_resync(self, token):
"""Request re-sync from leader.
NOTE: this action must not be performed by the cluster leader.
""" """
rq = self.template() rq = self.template()
rq['trigger'] = str(uuid.uuid4()) rq['trigger'] = str(uuid.uuid4())
rq[self.KEY_NOTIFY_LEADER_CHANGED] = rq['trigger'] rq[self.KEY_REQUEST_RESYNC] = token
return rq return rq
@ -704,29 +763,27 @@ def get_builders_checksum():
return sha.hexdigest() return sha.hexdigest()
def get_broker_token(): def non_null_unique(data):
"""Get ack token from peers to be used as broker token. """Return True if data is a list containing all non-null values that are
all equal.
Must be equal across all peers.
Returns token or None if not found.
""" """
responses = [] return (all(data) and len(set(data)) > 1)
ack_key = SwiftProxyClusterRPC.KEY_STOP_PROXY_SVC_ACK
def previously_synced():
"""If a full sync is not known to have been performed from this unit, False
is returned."""
broker = None
for rid in relation_ids('cluster'): for rid in relation_ids('cluster'):
for unit in related_units(rid): broker = relation_get(attribute='builder-broker', rid=rid,
responses.append(relation_get(attribute=ack_key, rid=rid, unit=local_unit())
unit=unit)) only_builders_synced = relation_get(attribute='sync-only-builders',
rid=rid, unit=local_unit())
# If no acks exist we have probably never done a sync so make up a token if broker and not only_builders_synced:
if len(responses) == 0: return True
return str(uuid.uuid4())
if not all(responses) or len(set(responses)) != 1: return False
log("Not all ack tokens equal - %s" % (responses), level=DEBUG)
return None
return responses[0]
def sync_builders_and_rings_if_changed(f): def sync_builders_and_rings_if_changed(f):
@ -757,7 +814,8 @@ def sync_builders_and_rings_if_changed(f):
rings_path = os.path.join(SWIFT_CONF_DIR, '*.%s' % rings_path = os.path.join(SWIFT_CONF_DIR, '*.%s' %
(SWIFT_RING_EXT)) (SWIFT_RING_EXT))
rings_ready = len(glob.glob(rings_path)) == len(SWIFT_RINGS) rings_ready = len(glob.glob(rings_path)) == len(SWIFT_RINGS)
rings_changed = rings_after != rings_before rings_changed = ((rings_after != rings_before) or
not previously_synced())
builders_changed = builders_after != builders_before builders_changed = builders_after != builders_before
if rings_changed or builders_changed: if rings_changed or builders_changed:
# Copy builders and rings (if available) to the server dir. # Copy builders and rings (if available) to the server dir.
@ -766,9 +824,9 @@ def sync_builders_and_rings_if_changed(f):
# Trigger sync # Trigger sync
cluster_sync_rings(peers_only=not rings_changed) cluster_sync_rings(peers_only=not rings_changed)
else: else:
cluster_sync_rings(peers_only=True, builders_only=True)
log("Rings not ready for sync - syncing builders", log("Rings not ready for sync - syncing builders",
level=DEBUG) level=DEBUG)
cluster_sync_rings(peers_only=True, builders_only=True)
else: else:
log("Rings/builders unchanged - skipping sync", level=DEBUG) log("Rings/builders unchanged - skipping sync", level=DEBUG)
@ -835,6 +893,7 @@ def balance_rings():
return return
rebalanced = False rebalanced = False
log("Rebalancing rings", level=INFO)
for path in SWIFT_RINGS.itervalues(): for path in SWIFT_RINGS.itervalues():
if balance_ring(path): if balance_ring(path):
log('Balanced ring %s' % path, level=DEBUG) log('Balanced ring %s' % path, level=DEBUG)
@ -869,20 +928,31 @@ def notify_peers_builders_available(broker_token, builders_only=False):
"skipping", level=WARNING) "skipping", level=WARNING)
return return
hostname = get_hostaddr() if not broker_token:
hostname = format_ipv6_addr(hostname) or hostname log("No broker token - aborting sync", level=WARNING)
return
cluster_rids = relation_ids('cluster')
if not cluster_rids:
log("Cluster relation not yet available - skipping sync", level=DEBUG)
return
if builders_only:
type = "builders"
else:
type = "builders & rings"
# Notify peers that builders are available # Notify peers that builders are available
log("Notifying peer(s) that rings are ready for sync.", level=INFO) log("Notifying peer(s) that %s are ready for sync." % type, level=INFO)
rq = SwiftProxyClusterRPC().sync_rings_request(hostname, rq = SwiftProxyClusterRPC().sync_rings_request(broker_token,
broker_token,
builders_only=builders_only) builders_only=builders_only)
for rid in relation_ids('cluster'): for rid in cluster_rids:
log("Notifying rid=%s (%s)" % (rid, rq), level=DEBUG) log("Notifying rid=%s (%s)" % (rid, rq), level=DEBUG)
relation_set(relation_id=rid, relation_settings=rq) relation_set(relation_id=rid, relation_settings=rq)
def broadcast_rings_available(broker_token, peers=True, storage=True, def broadcast_rings_available(storage=True, builders_only=False,
builders_only=False): broker_token=None):
"""Notify storage relations and cluster (peer) relations that rings and """Notify storage relations and cluster (peer) relations that rings and
builders are availble for sync. builders are availble for sync.
@ -895,14 +965,13 @@ def broadcast_rings_available(broker_token, peers=True, storage=True,
else: else:
log("Skipping notify storage relations", level=DEBUG) log("Skipping notify storage relations", level=DEBUG)
if peers: # Always set peer info even if not clustered so that info is present when
notify_peers_builders_available(broker_token, # units join
builders_only=builders_only) notify_peers_builders_available(broker_token,
else: builders_only=builders_only)
log("Skipping notify peer relations", level=DEBUG)
def cluster_sync_rings(peers_only=False, builders_only=False): def cluster_sync_rings(peers_only=False, builders_only=False, token=None):
"""Notify peer relations that they should stop their proxy services. """Notify peer relations that they should stop their proxy services.
Peer units will then be expected to do a relation_set with Peer units will then be expected to do a relation_set with
@ -923,24 +992,21 @@ def cluster_sync_rings(peers_only=False, builders_only=False):
# If we have no peer units just go ahead and broadcast to storage # If we have no peer units just go ahead and broadcast to storage
# relations. If we have been instructed to only broadcast to peers this # relations. If we have been instructed to only broadcast to peers this
# should do nothing. # should do nothing.
broker_token = get_broker_token() broadcast_rings_available(broker_token=str(uuid.uuid4()),
broadcast_rings_available(broker_token, peers=False,
storage=not peers_only) storage=not peers_only)
return return
elif builders_only: elif builders_only:
if not token:
token = str(uuid.uuid4())
# No need to stop proxies if only syncing builders between peers. # No need to stop proxies if only syncing builders between peers.
broker_token = get_broker_token() broadcast_rings_available(storage=False, builders_only=True,
broadcast_rings_available(broker_token, storage=False, broker_token=token)
builders_only=builders_only)
return return
rel_ids = relation_ids('cluster') log("Sending stop proxy service request to all peers", level=INFO)
trigger = str(uuid.uuid4()) rq = SwiftProxyClusterRPC().stop_proxy_request(peers_only, token=token)
for rid in relation_ids('cluster'):
log("Sending request to stop proxy service to all peers (%s)" % (trigger),
level=INFO)
rq = SwiftProxyClusterRPC().stop_proxy_request(peers_only)
for rid in rel_ids:
relation_set(relation_id=rid, relation_settings=rq) relation_set(relation_id=rid, relation_settings=rq)
@ -961,7 +1027,8 @@ def notify_storage_rings_available():
rings_url = 'http://%s/%s' % (hostname, path) rings_url = 'http://%s/%s' % (hostname, path)
trigger = uuid.uuid4() trigger = uuid.uuid4()
# Notify storage nodes that there is a new ring to fetch. # Notify storage nodes that there is a new ring to fetch.
log("Notifying storage nodes that new ring is ready for sync.", level=INFO) log("Notifying storage nodes that new rings are ready for sync.",
level=INFO)
for relid in relation_ids('swift-storage'): for relid in relation_ids('swift-storage'):
relation_set(relation_id=relid, swift_hash=get_swift_hash(), relation_set(relation_id=relid, swift_hash=get_swift_hash(),
rings_url=rings_url, trigger=trigger) rings_url=rings_url, trigger=trigger)
@ -996,6 +1063,34 @@ def get_hostaddr():
return unit_get('private-address') return unit_get('private-address')
def is_most_recent_timestamp(timestamp):
ts = []
for rid in relation_ids('cluster'):
for unit in related_units(rid):
settings = relation_get(rid=rid, unit=unit)
t = settings.get('broker-timestamp')
if t:
ts.append(t)
if not ts or not timestamp:
return False
return timestamp >= max(ts)
def timestamps_available(excluded_unit):
for rid in relation_ids('cluster'):
for unit in related_units(rid):
if unit == excluded_unit:
continue
settings = relation_get(rid=rid, unit=unit)
if settings.get('broker-timestamp'):
return True
return False
def is_paused(status_get=status_get): def is_paused(status_get=status_get):
"""Is the unit paused?""" """Is the unit paused?"""
with HookData()(): with HookData()():

View File

@ -28,32 +28,32 @@ class SwiftHooksTestCase(unittest.TestCase):
@patch("swift_hooks.relation_get") @patch("swift_hooks.relation_get")
@patch("swift_hooks.local_unit") @patch("swift_hooks.local_unit")
def test_all_peers_stopped(self, mock_local_unit, mock_relation_get): def test_is_all_peers_stopped(self, mock_local_unit, mock_relation_get):
token1 = str(uuid.uuid4()) token1 = str(uuid.uuid4())
token2 = str(uuid.uuid4()) token2 = str(uuid.uuid4())
mock_relation_get.return_value = token1 mock_relation_get.return_value = token1
responses = [{'some-other-key': token1}] responses = [{'some-other-key': token1}]
self.assertFalse(swift_hooks.all_peers_stopped(responses)) self.assertFalse(swift_hooks.is_all_peers_stopped(responses))
responses = [{'stop-proxy-service-ack': token1}, responses = [{'stop-proxy-service-ack': token1},
{'stop-proxy-service-ack': token2}] {'stop-proxy-service-ack': token2}]
self.assertFalse(swift_hooks.all_peers_stopped(responses)) self.assertFalse(swift_hooks.is_all_peers_stopped(responses))
responses = [{'stop-proxy-service-ack': token1}, responses = [{'stop-proxy-service-ack': token1},
{'stop-proxy-service-ack': token1}, {'stop-proxy-service-ack': token1},
{'some-other-key': token1}] {'some-other-key': token1}]
self.assertFalse(swift_hooks.all_peers_stopped(responses)) self.assertFalse(swift_hooks.is_all_peers_stopped(responses))
responses = [{'stop-proxy-service-ack': token1}, responses = [{'stop-proxy-service-ack': token1},
{'stop-proxy-service-ack': token1}] {'stop-proxy-service-ack': token1}]
self.assertTrue(swift_hooks.all_peers_stopped(responses)) self.assertTrue(swift_hooks.is_all_peers_stopped(responses))
mock_relation_get.return_value = token2 mock_relation_get.return_value = token2
responses = [{'stop-proxy-service-ack': token1}, responses = [{'stop-proxy-service-ack': token1},
{'stop-proxy-service-ack': token1}] {'stop-proxy-service-ack': token1}]
self.assertFalse(swift_hooks.all_peers_stopped(responses)) self.assertFalse(swift_hooks.is_all_peers_stopped(responses))
@patch.object(swift_hooks, 'config') @patch.object(swift_hooks, 'config')
@patch('charmhelpers.contrib.openstack.ip.config') @patch('charmhelpers.contrib.openstack.ip.config')

View File

@ -22,7 +22,7 @@ def init_ring_paths(tmpdir):
class SwiftUtilsTestCase(unittest.TestCase): class SwiftUtilsTestCase(unittest.TestCase):
@mock.patch('lib.swift_utils.get_broker_token') @mock.patch.object(swift_utils, 'previously_synced')
@mock.patch('lib.swift_utils.update_www_rings') @mock.patch('lib.swift_utils.update_www_rings')
@mock.patch('lib.swift_utils.get_builders_checksum') @mock.patch('lib.swift_utils.get_builders_checksum')
@mock.patch('lib.swift_utils.get_rings_checksum') @mock.patch('lib.swift_utils.get_rings_checksum')
@ -38,12 +38,12 @@ class SwiftUtilsTestCase(unittest.TestCase):
mock_log, mock_balance_rings, mock_log, mock_balance_rings,
mock_get_rings_checksum, mock_get_rings_checksum,
mock_get_builders_checksum, mock_update_www_rings, mock_get_builders_checksum, mock_update_www_rings,
mock_get_broker_token): mock_previously_synced):
mock_get_broker_token.return_value = "token1"
# Make sure same is returned for both so that we don't try to sync # Make sure same is returned for both so that we don't try to sync
mock_get_rings_checksum.return_value = None mock_get_rings_checksum.return_value = None
mock_get_builders_checksum.return_value = None mock_get_builders_checksum.return_value = None
mock_previously_synced.return_value = True
# Test blocker 1 # Test blocker 1
mock_is_elected_leader.return_value = False mock_is_elected_leader.return_value = False
@ -77,25 +77,23 @@ class SwiftUtilsTestCase(unittest.TestCase):
self.assertTrue(mock_set_min_hours.called) self.assertTrue(mock_set_min_hours.called)
self.assertTrue(mock_balance_rings.called) self.assertTrue(mock_balance_rings.called)
@mock.patch('lib.swift_utils.previously_synced')
@mock.patch('lib.swift_utils._load_builder') @mock.patch('lib.swift_utils._load_builder')
@mock.patch('lib.swift_utils.initialize_ring') @mock.patch('lib.swift_utils.initialize_ring')
@mock.patch('lib.swift_utils.get_broker_token')
@mock.patch('lib.swift_utils.update_www_rings') @mock.patch('lib.swift_utils.update_www_rings')
@mock.patch('lib.swift_utils.get_builders_checksum') @mock.patch('lib.swift_utils.get_builders_checksum')
@mock.patch('lib.swift_utils.get_rings_checksum') @mock.patch('lib.swift_utils.get_rings_checksum')
@mock.patch('lib.swift_utils.balance_rings') @mock.patch('lib.swift_utils.balance_rings')
@mock.patch('lib.swift_utils.log') @mock.patch('lib.swift_utils.log')
@mock.patch('lib.swift_utils.is_elected_leader') @mock.patch('lib.swift_utils.is_elected_leader')
def test_update_rings_multiple_devs(self, def test_update_rings_multiple_devs(self, mock_is_elected_leader,
mock_is_elected_leader,
mock_log, mock_balance_rings, mock_log, mock_balance_rings,
mock_get_rings_checksum, mock_get_rings_checksum,
mock_get_builders_checksum, mock_get_builders_checksum,
mock_update_www_rings, mock_update_www_rings,
mock_get_broker_token,
mock_initialize_ring, mock_initialize_ring,
mock_load_builder, mock_load_builder,
): mock_previously_synced):
# To avoid the need for swift.common.ring library, mock a basic # To avoid the need for swift.common.ring library, mock a basic
# rings dictionary, keyed by path. # rings dictionary, keyed by path.
# Each ring has enough logic to hold a dictionary with a single 'devs' # Each ring has enough logic to hold a dictionary with a single 'devs'
@ -115,11 +113,13 @@ class SwiftUtilsTestCase(unittest.TestCase):
def add_dev(self, dev): def add_dev(self, dev):
mock_rings[self.path]['devs'].append(dev) mock_rings[self.path]['devs'].append(dev)
return mock_ring(path) return mock_ring(path)
def mock_initialize_ring_fn(path, *args): def mock_initialize_ring_fn(path, *args):
mock_rings.setdefault(path, {'devs': []}) mock_rings.setdefault(path, {'devs': []})
mock_is_elected_leader.return_value = True
mock_load_builder.side_effect = mock_load_builder_fn mock_load_builder.side_effect = mock_load_builder_fn
mock_initialize_ring.side_effect = mock_initialize_ring_fn mock_initialize_ring.side_effect = mock_initialize_ring_fn
@ -153,7 +153,6 @@ class SwiftUtilsTestCase(unittest.TestCase):
swift_utils.update_rings(nodes) swift_utils.update_rings(nodes)
self.assertFalse(mock_add_to_ring.called) self.assertFalse(mock_add_to_ring.called)
@mock.patch('lib.swift_utils.get_broker_token')
@mock.patch('lib.swift_utils.balance_rings') @mock.patch('lib.swift_utils.balance_rings')
@mock.patch('lib.swift_utils.log') @mock.patch('lib.swift_utils.log')
@mock.patch('lib.swift_utils.is_elected_leader') @mock.patch('lib.swift_utils.is_elected_leader')
@ -165,9 +164,7 @@ class SwiftUtilsTestCase(unittest.TestCase):
mock_config, mock_config,
mock_is_elected_leader, mock_is_elected_leader,
mock_log, mock_log,
mock_balance_rings, mock_balance_rings):
mock_get_broker_token):
mock_get_broker_token.return_value = "token1"
@swift_utils.sync_builders_and_rings_if_changed @swift_utils.sync_builders_and_rings_if_changed
def mock_balance(): def mock_balance():
@ -200,6 +197,8 @@ class SwiftUtilsTestCase(unittest.TestCase):
finally: finally:
shutil.rmtree(tmpdir) shutil.rmtree(tmpdir)
@mock.patch('lib.swift_utils.is_elected_leader', lambda arg: True)
@mock.patch.object(swift_utils, 'get_hostaddr', lambda *args: '1.2.3.4')
@mock.patch('lib.swift_utils.uuid') @mock.patch('lib.swift_utils.uuid')
def test_cluster_rpc_stop_proxy_request(self, mock_uuid): def test_cluster_rpc_stop_proxy_request(self, mock_uuid):
mock_uuid.uuid4.return_value = 'test-uuid' mock_uuid.uuid4.return_value = 'test-uuid'
@ -207,9 +206,11 @@ class SwiftUtilsTestCase(unittest.TestCase):
rq = rpc.stop_proxy_request(peers_only=True) rq = rpc.stop_proxy_request(peers_only=True)
self.assertEqual({'trigger': 'test-uuid', self.assertEqual({'trigger': 'test-uuid',
'broker-token': None, 'broker-token': None,
'builder-broker': None, 'broker-timestamp': None,
'peers-only': True, 'builder-broker': '1.2.3.4',
'peers-only': 1,
'leader-changed-notification': None, 'leader-changed-notification': None,
'resync-request': None,
'stop-proxy-service': 'test-uuid', 'stop-proxy-service': 'test-uuid',
'stop-proxy-service-ack': None, 'stop-proxy-service-ack': None,
'sync-only-builders': None}, rq) 'sync-only-builders': None}, rq)
@ -217,13 +218,18 @@ class SwiftUtilsTestCase(unittest.TestCase):
rq = rpc.stop_proxy_request() rq = rpc.stop_proxy_request()
self.assertEqual({'trigger': 'test-uuid', self.assertEqual({'trigger': 'test-uuid',
'broker-token': None, 'broker-token': None,
'builder-broker': None, 'broker-timestamp': None,
'builder-broker': '1.2.3.4',
'peers-only': None, 'peers-only': None,
'leader-changed-notification': None, 'leader-changed-notification': None,
'resync-request': None,
'stop-proxy-service': 'test-uuid', 'stop-proxy-service': 'test-uuid',
'stop-proxy-service-ack': None, 'stop-proxy-service-ack': None,
'sync-only-builders': None}, rq) 'sync-only-builders': None}, rq)
template_keys = set(rpc.template())
self.assertTrue(set(rq.keys()).issubset(template_keys))
@mock.patch('lib.swift_utils.uuid') @mock.patch('lib.swift_utils.uuid')
def test_cluster_rpc_stop_proxy_ack(self, mock_uuid): def test_cluster_rpc_stop_proxy_ack(self, mock_uuid):
mock_uuid.uuid4.return_value = 'token2' mock_uuid.uuid4.return_value = 'token2'
@ -232,40 +238,60 @@ class SwiftUtilsTestCase(unittest.TestCase):
self.assertEqual({'trigger': 'token2', self.assertEqual({'trigger': 'token2',
'broker-token': None, 'broker-token': None,
'builder-broker': None, 'builder-broker': None,
'broker-timestamp': None,
'peers-only': '1', 'peers-only': '1',
'leader-changed-notification': None, 'leader-changed-notification': None,
'resync-request': None,
'stop-proxy-service': None, 'stop-proxy-service': None,
'stop-proxy-service-ack': 'token1', 'stop-proxy-service-ack': 'token1',
'sync-only-builders': None}, rq) 'sync-only-builders': None}, rq)
template_keys = set(rpc.template())
self.assertTrue(set(rq.keys()).issubset(template_keys))
@mock.patch('lib.swift_utils.is_elected_leader', lambda arg: True)
@mock.patch.object(swift_utils, 'get_hostaddr', lambda *args: '1.2.3.4')
@mock.patch.object(swift_utils, 'time')
@mock.patch('lib.swift_utils.uuid') @mock.patch('lib.swift_utils.uuid')
def test_cluster_rpc_sync_request(self, mock_uuid): def test_cluster_rpc_sync_request(self, mock_uuid, mock_time):
mock_time.time = mock.Mock(return_value=float(1.234))
mock_uuid.uuid4.return_value = 'token2' mock_uuid.uuid4.return_value = 'token2'
rpc = swift_utils.SwiftProxyClusterRPC() rpc = swift_utils.SwiftProxyClusterRPC()
rq = rpc.sync_rings_request('HostA', 'token1') rq = rpc.sync_rings_request('token1')
self.assertEqual({'trigger': 'token2', self.assertEqual({'trigger': 'token2',
'broker-token': 'token1', 'broker-token': 'token1',
'builder-broker': 'HostA', 'broker-timestamp': '1.234000',
'builder-broker': '1.2.3.4',
'peers-only': None, 'peers-only': None,
'leader-changed-notification': None, 'leader-changed-notification': None,
'resync-request': None,
'stop-proxy-service': None, 'stop-proxy-service': None,
'stop-proxy-service-ack': None, 'stop-proxy-service-ack': None,
'sync-only-builders': None}, rq) 'sync-only-builders': None}, rq)
template_keys = set(rpc.template())
self.assertTrue(set(rq.keys()).issubset(template_keys))
@mock.patch('lib.swift_utils.is_elected_leader', lambda arg: True)
@mock.patch('lib.swift_utils.uuid') @mock.patch('lib.swift_utils.uuid')
def test_cluster_rpc_notify_leader_changed(self, mock_uuid): def test_cluster_rpc_notify_leader_changed(self, mock_uuid):
mock_uuid.uuid4.return_value = 'token1' mock_uuid.uuid4.return_value = 'e4b67426-6cc0-4aa3-829d-227999cd0a75'
rpc = swift_utils.SwiftProxyClusterRPC() rpc = swift_utils.SwiftProxyClusterRPC()
rq = rpc.notify_leader_changed() rq = rpc.notify_leader_changed('token1')
self.assertEqual({'trigger': 'token1', self.assertEqual({'trigger': 'e4b67426-6cc0-4aa3-829d-227999cd0a75',
'broker-token': None, 'broker-token': None,
'builder-broker': None, 'builder-broker': None,
'broker-timestamp': None,
'peers-only': None, 'peers-only': None,
'leader-changed-notification': 'token1', 'leader-changed-notification': 'token1',
'stop-proxy-service': None, 'stop-proxy-service': None,
'stop-proxy-service-ack': None, 'stop-proxy-service-ack': None,
'resync-request': None,
'sync-only-builders': None}, rq) 'sync-only-builders': None}, rq)
template_keys = set(rpc.template().keys())
self.assertTrue(set(rq.keys()).issubset(template_keys))
def test_all_responses_equal(self): def test_all_responses_equal(self):
responses = [{'a': 1, 'c': 3}] responses = [{'a': 1, 'c': 3}]
self.assertTrue(swift_utils.all_responses_equal(responses, 'b', self.assertTrue(swift_utils.all_responses_equal(responses, 'b',
@ -301,6 +327,48 @@ class SwiftUtilsTestCase(unittest.TestCase):
rsps = [] rsps = []
self.assertIsNone(swift_utils.get_first_available_value(rsps, 'key3')) self.assertIsNone(swift_utils.get_first_available_value(rsps, 'key3'))
@mock.patch.object(swift_utils, 'relation_get')
@mock.patch.object(swift_utils, 'related_units', lambda arg: ['proxy/1'])
@mock.patch.object(swift_utils, 'relation_ids', lambda arg: ['cluster:1'])
def test_is_most_recent_timestamp(self, mock_rel_get):
mock_rel_get.return_value = {'broker-timestamp': '1111'}
self.assertTrue(swift_utils.is_most_recent_timestamp('1234'))
mock_rel_get.return_value = {'broker-timestamp': '2234'}
self.assertFalse(swift_utils.is_most_recent_timestamp('1234'))
mock_rel_get.return_value = {}
self.assertFalse(swift_utils.is_most_recent_timestamp('1234'))
mock_rel_get.return_value = {'broker-timestamp': '2234'}
self.assertFalse(swift_utils.is_most_recent_timestamp(None))
@mock.patch.object(swift_utils, 'relation_get')
@mock.patch.object(swift_utils, 'related_units', lambda arg: ['proxy/1'])
@mock.patch.object(swift_utils, 'relation_ids', lambda arg: ['cluster:1'])
def test_timestamps_available(self, mock_rel_get):
mock_rel_get.return_value = {}
self.assertFalse(swift_utils.timestamps_available('proxy/1'))
mock_rel_get.return_value = {'broker-timestamp': '1234'}
self.assertFalse(swift_utils.timestamps_available('proxy/1'))
mock_rel_get.return_value = {'broker-timestamp': '1234'}
self.assertTrue(swift_utils.timestamps_available('proxy/2'))
def _test_is_paused_unknown(self):
def fake_status_get():
return ("unknown", "")
self.assertFalse(swift_utils.is_paused(status_get=fake_status_get))
def _test_is_paused_paused(self):
def fake_status_get():
return ("maintenance", "Paused")
self.assertTrue(swift_utils.is_paused(status_get=fake_status_get))
def _test_is_paused_other_maintenance(self):
def fake_status_get():
return ("maintenance", "Hook")
self.assertFalse(swift_utils.is_paused(status_get=fake_status_get))
@mock.patch('lib.swift_utils.is_paused') @mock.patch('lib.swift_utils.is_paused')
@mock.patch('lib.swift_utils.config') @mock.patch('lib.swift_utils.config')
@mock.patch('lib.swift_utils.set_os_workload_status') @mock.patch('lib.swift_utils.set_os_workload_status')