Remove random wait; use distributed_wait
Random waits were used to avoid restart collisions. This change uses distributed_wait, based on modulo_distribution, to calculate the wait time from the modulo-nodes setting (or peer relations) and known-wait. Includes a charm-helpers sync for the modulo distribution helpers. Depends-On: I02c648cccc72d816beeec5546b6c7914d57c607a Change-Id: Ic796c23e1bc560d461c674cbfadf8589380eb649
This commit is contained in:
parent
11166e8ae1
commit
a68b912cf5
23
config.yaml
23
config.yaml
|
@ -254,3 +254,26 @@ options:
|
|||
rbd pool has been created, changing this value will not have any effect
|
||||
(although it can be changed in ceph by manually configuring your ceph
|
||||
cluster).
|
||||
modulo-nodes:
|
||||
type: int
|
||||
default:
|
||||
description: |
|
||||
This config option is rarely required but is provided for fine tuning, it
|
||||
is safe to leave unset. Modulo nodes is used to help avoid restart
|
||||
collisions as well as distribute load on the cloud at larger scale.
|
||||
During restarts and cluster joins rabbitmq needs to execute these
|
||||
operations serially. By setting modulo-nodes to the size of the cluster
|
||||
and known-wait to a reasonable value, the charm will distribute the
|
||||
operations serially. If this value is unset, the charm will check
|
||||
min-cluster-size or else finally default to the size of the cluster
|
||||
based on peer relations. Setting this value to 0 will execute operations
|
||||
with no wait time. Setting this value to less than the cluster size will
|
||||
distribute load but may lead to restart collisions.
|
||||
known-wait:
|
||||
type: int
|
||||
default: 30
|
||||
description: |
|
||||
Known wait along with modulo nodes is used to help avoid restart
|
||||
collisions. Known wait is the amount of time between one node executing
|
||||
an operation and another. On slower hardware this value may need to be
|
||||
larger than the default of 30 seconds.
|
||||
|
|
|
@ -27,6 +27,7 @@ clustering-related helpers.
|
|||
|
||||
import subprocess
|
||||
import os
|
||||
import time
|
||||
|
||||
from socket import gethostname as get_unit_hostname
|
||||
|
||||
|
@ -45,6 +46,9 @@ from charmhelpers.core.hookenv import (
|
|||
is_leader as juju_is_leader,
|
||||
status_set,
|
||||
)
|
||||
from charmhelpers.core.host import (
|
||||
modulo_distribution,
|
||||
)
|
||||
from charmhelpers.core.decorators import (
|
||||
retry_on_exception,
|
||||
)
|
||||
|
@ -361,3 +365,29 @@ def canonical_url(configs, vip_setting='vip'):
|
|||
else:
|
||||
addr = unit_get('private-address')
|
||||
return '%s://%s' % (scheme, addr)
|
||||
|
||||
|
||||
def distributed_wait(modulo=None, wait=None, operation_name='operation'):
|
||||
''' Distribute operations by waiting based on modulo_distribution
|
||||
|
||||
If modulo and or wait are not set, check config_get for those values.
|
||||
|
||||
:param modulo: int The modulo number creates the group distribution
|
||||
:param wait: int The constant time wait value
|
||||
:param operation_name: string Operation name for status message
|
||||
i.e. 'restart'
|
||||
:side effect: Calls config_get()
|
||||
:side effect: Calls log()
|
||||
:side effect: Calls status_set()
|
||||
:side effect: Calls time.sleep()
|
||||
'''
|
||||
if modulo is None:
|
||||
modulo = config_get('modulo-nodes')
|
||||
if wait is None:
|
||||
wait = config_get('known-wait')
|
||||
calculated_wait = modulo_distribution(modulo=modulo, wait=wait)
|
||||
msg = "Waiting {} seconds for {} ...".format(calculated_wait,
|
||||
operation_name)
|
||||
log(msg, DEBUG)
|
||||
status_set('maintenance', msg)
|
||||
time.sleep(calculated_wait)
|
||||
|
|
|
@ -70,12 +70,12 @@ class DisabledModuleAudit(BaseAudit):
|
|||
"""Returns the modules which are enabled in Apache."""
|
||||
output = subprocess.check_output(['apache2ctl', '-M'])
|
||||
modules = []
|
||||
for line in output.strip().split():
|
||||
for line in output.splitlines():
|
||||
# Each line of the enabled module output looks like:
|
||||
# module_name (static|shared)
|
||||
# Plus a header line at the top of the output which is stripped
|
||||
# out by the regex.
|
||||
matcher = re.search(r'^ (\S*)', line)
|
||||
matcher = re.search(r'^ (\S*)_module (\S*)', line)
|
||||
if matcher:
|
||||
modules.append(matcher.group(1))
|
||||
return modules
|
||||
|
|
|
@ -29,3 +29,16 @@ def install_alternative(name, target, source, priority=50):
|
|||
target, name, source, str(priority)
|
||||
]
|
||||
subprocess.check_call(cmd)
|
||||
|
||||
|
||||
def remove_alternative(name, source):
|
||||
"""Remove an installed alternative configuration file
|
||||
|
||||
:param name: string name of the alternative to remove
|
||||
:param source: string full path to alternative to remove
|
||||
"""
|
||||
cmd = [
|
||||
'update-alternatives', '--remove',
|
||||
name, source
|
||||
]
|
||||
subprocess.check_call(cmd)
|
||||
|
|
|
@ -802,8 +802,9 @@ class ApacheSSLContext(OSContextGenerator):
|
|||
else:
|
||||
# Expect cert/key provided in config (currently assumed that ca
|
||||
# uses ip for cn)
|
||||
cn = resolve_address(endpoint_type=INTERNAL)
|
||||
self.configure_cert(cn)
|
||||
for net_type in (INTERNAL, ADMIN, PUBLIC):
|
||||
cn = resolve_address(endpoint_type=net_type)
|
||||
self.configure_cert(cn)
|
||||
|
||||
addresses = self.get_network_addresses()
|
||||
for address, endpoint in addresses:
|
||||
|
|
|
@ -9,7 +9,7 @@
|
|||
CRITICAL=0
|
||||
NOTACTIVE=''
|
||||
LOGFILE=/var/log/nagios/check_haproxy.log
|
||||
AUTH=$(grep -r "stats auth" /etc/haproxy | awk 'NR=1{print $4}')
|
||||
AUTH=$(grep -r "stats auth" /etc/haproxy/haproxy.cfg | awk 'NR=1{print $4}')
|
||||
|
||||
typeset -i N_INSTANCES=0
|
||||
for appserver in $(awk '/^\s+server/{print $2}' /etc/haproxy/haproxy.cfg)
|
||||
|
|
|
@ -59,18 +59,13 @@ def determine_dkms_package():
|
|||
|
||||
|
||||
def quantum_plugins():
|
||||
from charmhelpers.contrib.openstack import context
|
||||
return {
|
||||
'ovs': {
|
||||
'config': '/etc/quantum/plugins/openvswitch/'
|
||||
'ovs_quantum_plugin.ini',
|
||||
'driver': 'quantum.plugins.openvswitch.ovs_quantum_plugin.'
|
||||
'OVSQuantumPluginV2',
|
||||
'contexts': [
|
||||
context.SharedDBContext(user=config('neutron-database-user'),
|
||||
database=config('neutron-database'),
|
||||
relation_prefix='neutron',
|
||||
ssl_dir=QUANTUM_CONF_DIR)],
|
||||
'contexts': [],
|
||||
'services': ['quantum-plugin-openvswitch-agent'],
|
||||
'packages': [determine_dkms_package(),
|
||||
['quantum-plugin-openvswitch-agent']],
|
||||
|
@ -82,11 +77,7 @@ def quantum_plugins():
|
|||
'config': '/etc/quantum/plugins/nicira/nvp.ini',
|
||||
'driver': 'quantum.plugins.nicira.nicira_nvp_plugin.'
|
||||
'QuantumPlugin.NvpPluginV2',
|
||||
'contexts': [
|
||||
context.SharedDBContext(user=config('neutron-database-user'),
|
||||
database=config('neutron-database'),
|
||||
relation_prefix='neutron',
|
||||
ssl_dir=QUANTUM_CONF_DIR)],
|
||||
'contexts': [],
|
||||
'services': [],
|
||||
'packages': [],
|
||||
'server_packages': ['quantum-server',
|
||||
|
@ -100,7 +91,6 @@ NEUTRON_CONF_DIR = '/etc/neutron'
|
|||
|
||||
|
||||
def neutron_plugins():
|
||||
from charmhelpers.contrib.openstack import context
|
||||
release = os_release('nova-common')
|
||||
plugins = {
|
||||
'ovs': {
|
||||
|
@ -108,11 +98,7 @@ def neutron_plugins():
|
|||
'ovs_neutron_plugin.ini',
|
||||
'driver': 'neutron.plugins.openvswitch.ovs_neutron_plugin.'
|
||||
'OVSNeutronPluginV2',
|
||||
'contexts': [
|
||||
context.SharedDBContext(user=config('neutron-database-user'),
|
||||
database=config('neutron-database'),
|
||||
relation_prefix='neutron',
|
||||
ssl_dir=NEUTRON_CONF_DIR)],
|
||||
'contexts': [],
|
||||
'services': ['neutron-plugin-openvswitch-agent'],
|
||||
'packages': [determine_dkms_package(),
|
||||
['neutron-plugin-openvswitch-agent']],
|
||||
|
@ -124,11 +110,7 @@ def neutron_plugins():
|
|||
'config': '/etc/neutron/plugins/nicira/nvp.ini',
|
||||
'driver': 'neutron.plugins.nicira.nicira_nvp_plugin.'
|
||||
'NeutronPlugin.NvpPluginV2',
|
||||
'contexts': [
|
||||
context.SharedDBContext(user=config('neutron-database-user'),
|
||||
database=config('neutron-database'),
|
||||
relation_prefix='neutron',
|
||||
ssl_dir=NEUTRON_CONF_DIR)],
|
||||
'contexts': [],
|
||||
'services': [],
|
||||
'packages': [],
|
||||
'server_packages': ['neutron-server',
|
||||
|
@ -138,11 +120,7 @@ def neutron_plugins():
|
|||
'nsx': {
|
||||
'config': '/etc/neutron/plugins/vmware/nsx.ini',
|
||||
'driver': 'vmware',
|
||||
'contexts': [
|
||||
context.SharedDBContext(user=config('neutron-database-user'),
|
||||
database=config('neutron-database'),
|
||||
relation_prefix='neutron',
|
||||
ssl_dir=NEUTRON_CONF_DIR)],
|
||||
'contexts': [],
|
||||
'services': [],
|
||||
'packages': [],
|
||||
'server_packages': ['neutron-server',
|
||||
|
@ -152,11 +130,7 @@ def neutron_plugins():
|
|||
'n1kv': {
|
||||
'config': '/etc/neutron/plugins/cisco/cisco_plugins.ini',
|
||||
'driver': 'neutron.plugins.cisco.network_plugin.PluginV2',
|
||||
'contexts': [
|
||||
context.SharedDBContext(user=config('neutron-database-user'),
|
||||
database=config('neutron-database'),
|
||||
relation_prefix='neutron',
|
||||
ssl_dir=NEUTRON_CONF_DIR)],
|
||||
'contexts': [],
|
||||
'services': [],
|
||||
'packages': [determine_dkms_package(),
|
||||
['neutron-plugin-cisco']],
|
||||
|
@ -167,11 +141,7 @@ def neutron_plugins():
|
|||
'Calico': {
|
||||
'config': '/etc/neutron/plugins/ml2/ml2_conf.ini',
|
||||
'driver': 'neutron.plugins.ml2.plugin.Ml2Plugin',
|
||||
'contexts': [
|
||||
context.SharedDBContext(user=config('neutron-database-user'),
|
||||
database=config('neutron-database'),
|
||||
relation_prefix='neutron',
|
||||
ssl_dir=NEUTRON_CONF_DIR)],
|
||||
'contexts': [],
|
||||
'services': ['calico-felix',
|
||||
'bird',
|
||||
'neutron-dhcp-agent',
|
||||
|
@ -189,11 +159,7 @@ def neutron_plugins():
|
|||
'vsp': {
|
||||
'config': '/etc/neutron/plugins/nuage/nuage_plugin.ini',
|
||||
'driver': 'neutron.plugins.nuage.plugin.NuagePlugin',
|
||||
'contexts': [
|
||||
context.SharedDBContext(user=config('neutron-database-user'),
|
||||
database=config('neutron-database'),
|
||||
relation_prefix='neutron',
|
||||
ssl_dir=NEUTRON_CONF_DIR)],
|
||||
'contexts': [],
|
||||
'services': [],
|
||||
'packages': [],
|
||||
'server_packages': ['neutron-server', 'neutron-plugin-nuage'],
|
||||
|
@ -203,10 +169,7 @@ def neutron_plugins():
|
|||
'config': '/etc/neutron/plugins/plumgrid/plumgrid.ini',
|
||||
'driver': ('neutron.plugins.plumgrid.plumgrid_plugin'
|
||||
'.plumgrid_plugin.NeutronPluginPLUMgridV2'),
|
||||
'contexts': [
|
||||
context.SharedDBContext(user=config('database-user'),
|
||||
database=config('database'),
|
||||
ssl_dir=NEUTRON_CONF_DIR)],
|
||||
'contexts': [],
|
||||
'services': [],
|
||||
'packages': ['plumgrid-lxc',
|
||||
'iovisor-dkms'],
|
||||
|
@ -217,11 +180,7 @@ def neutron_plugins():
|
|||
'midonet': {
|
||||
'config': '/etc/neutron/plugins/midonet/midonet.ini',
|
||||
'driver': 'midonet.neutron.plugin.MidonetPluginV2',
|
||||
'contexts': [
|
||||
context.SharedDBContext(user=config('neutron-database-user'),
|
||||
database=config('neutron-database'),
|
||||
relation_prefix='neutron',
|
||||
ssl_dir=NEUTRON_CONF_DIR)],
|
||||
'contexts': [],
|
||||
'services': [],
|
||||
'packages': [determine_dkms_package()],
|
||||
'server_packages': ['neutron-server',
|
||||
|
|
|
@ -95,7 +95,7 @@ from charmhelpers.fetch import (
|
|||
from charmhelpers.fetch.snap import (
|
||||
snap_install,
|
||||
snap_refresh,
|
||||
SNAP_CHANNELS,
|
||||
valid_snap_channel,
|
||||
)
|
||||
|
||||
from charmhelpers.contrib.storage.linux.utils import is_block_device, zap_disk
|
||||
|
@ -579,6 +579,9 @@ def configure_installation_source(source_plus_key):
|
|||
Note that the behaviour on error is to log the error to the juju log and
|
||||
then call sys.exit(1).
|
||||
"""
|
||||
if source_plus_key.startswith('snap'):
|
||||
# Do nothing for snap installs
|
||||
return
|
||||
# extract the key if there is one, denoted by a '|' in the rel
|
||||
source, key = get_source_and_pgp_key(source_plus_key)
|
||||
|
||||
|
@ -2048,7 +2051,7 @@ def update_json_file(filename, items):
|
|||
def snap_install_requested():
|
||||
""" Determine if installing from snaps
|
||||
|
||||
If openstack-origin is of the form snap:channel-series-release
|
||||
If openstack-origin is of the form snap:track/channel
|
||||
and channel is in SNAPS_CHANNELS return True.
|
||||
"""
|
||||
origin = config('openstack-origin') or ""
|
||||
|
@ -2056,10 +2059,12 @@ def snap_install_requested():
|
|||
return False
|
||||
|
||||
_src = origin[5:]
|
||||
channel, series, release = _src.split('-')
|
||||
if channel.lower() in SNAP_CHANNELS:
|
||||
return True
|
||||
return False
|
||||
if '/' in _src:
|
||||
_track, channel = _src.split('/')
|
||||
else:
|
||||
# Handle snap:track with no channel
|
||||
channel = 'stable'
|
||||
return valid_snap_channel(channel)
|
||||
|
||||
|
||||
def get_snaps_install_info_from_origin(snaps, src, mode='classic'):
|
||||
|
@ -2067,7 +2072,7 @@ def get_snaps_install_info_from_origin(snaps, src, mode='classic'):
|
|||
|
||||
@param snaps: List of snaps
|
||||
@param src: String of openstack-origin or source of the form
|
||||
snap:channel-series-track
|
||||
snap:track/channel
|
||||
@param mode: String classic, devmode or jailmode
|
||||
@returns: Dictionary of snaps with channels and modes
|
||||
"""
|
||||
|
@ -2077,8 +2082,7 @@ def get_snaps_install_info_from_origin(snaps, src, mode='classic'):
|
|||
return {}
|
||||
|
||||
_src = src[5:]
|
||||
_channel, _series, _release = _src.split('-')
|
||||
channel = '--channel={}/{}'.format(_release, _channel)
|
||||
channel = '--channel={}'.format(_src)
|
||||
|
||||
return {snap: {'channel': channel, 'mode': mode}
|
||||
for snap in snaps}
|
||||
|
@ -2090,8 +2094,8 @@ def install_os_snaps(snaps, refresh=False):
|
|||
@param snaps: Dictionary of snaps with channels and modes of the form:
|
||||
{'snap_name': {'channel': 'snap_channel',
|
||||
'mode': 'snap_mode'}}
|
||||
Where channel a snapstore channel and mode is --classic, --devmode or
|
||||
--jailmode.
|
||||
Where channel is a snapstore channel and mode is --classic, --devmode
|
||||
or --jailmode.
|
||||
@param post_snap_install: Callback function to run after snaps have been
|
||||
installed
|
||||
"""
|
||||
|
|
|
@ -218,6 +218,8 @@ def principal_unit():
|
|||
for rid in relation_ids(reltype):
|
||||
for unit in related_units(rid):
|
||||
md = _metadata_unit(unit)
|
||||
if not md:
|
||||
continue
|
||||
subordinate = md.pop('subordinate', None)
|
||||
if not subordinate:
|
||||
return unit
|
||||
|
@ -511,7 +513,10 @@ def _metadata_unit(unit):
|
|||
"""
|
||||
basedir = os.sep.join(charm_dir().split(os.sep)[:-2])
|
||||
unitdir = 'unit-{}'.format(unit.replace(os.sep, '-'))
|
||||
with open(os.path.join(basedir, unitdir, 'charm', 'metadata.yaml')) as md:
|
||||
joineddir = os.path.join(basedir, unitdir, 'charm', 'metadata.yaml')
|
||||
if not os.path.exists(joineddir):
|
||||
return None
|
||||
with open(joineddir) as md:
|
||||
return yaml.safe_load(md)
|
||||
|
||||
|
||||
|
@ -667,6 +672,17 @@ def close_ports(start, end, protocol="TCP"):
|
|||
subprocess.check_call(_args)
|
||||
|
||||
|
||||
def opened_ports():
|
||||
"""Get the opened ports
|
||||
|
||||
*Note that this will only show ports opened in a previous hook*
|
||||
|
||||
:returns: Opened ports as a list of strings: ``['8080/tcp', '8081-8083/tcp']``
|
||||
"""
|
||||
_args = ['opened-ports', '--format=json']
|
||||
return json.loads(subprocess.check_output(_args).decode('UTF-8'))
|
||||
|
||||
|
||||
@cached
|
||||
def unit_get(attribute):
|
||||
"""Get the unit ID for the remote unit"""
|
||||
|
|
|
@ -34,7 +34,7 @@ import six
|
|||
|
||||
from contextlib import contextmanager
|
||||
from collections import OrderedDict
|
||||
from .hookenv import log, DEBUG
|
||||
from .hookenv import log, DEBUG, local_unit
|
||||
from .fstab import Fstab
|
||||
from charmhelpers.osplatform import get_platform
|
||||
|
||||
|
@ -946,3 +946,31 @@ def updatedb(updatedb_text, new_path):
|
|||
lines[i] = 'PRUNEPATHS="{}"'.format(' '.join(paths))
|
||||
output = "\n".join(lines)
|
||||
return output
|
||||
|
||||
|
||||
def modulo_distribution(modulo=3, wait=30):
|
||||
""" Modulo distribution
|
||||
|
||||
This helper uses the unit number, a modulo value and a constant wait time
|
||||
to produce a calculated wait time distribution. This is useful in large
|
||||
scale deployments to distribute load during an expensive operation such as
|
||||
service restarts.
|
||||
|
||||
If you have 1000 nodes that need to restart 100 at a time 1 minute at a
|
||||
time:
|
||||
|
||||
time.wait(modulo_distribution(modulo=100, wait=60))
|
||||
restart()
|
||||
|
||||
If you need restarts to happen serially set modulo to the exact number of
|
||||
nodes and set a high constant wait time:
|
||||
|
||||
time.wait(modulo_distribution(modulo=10, wait=120))
|
||||
restart()
|
||||
|
||||
@param modulo: int The modulo number creates the group distribution
|
||||
@param wait: int The constant time wait value
|
||||
@return: int Calculated time to wait for unit operation
|
||||
"""
|
||||
unit_number = int(local_unit().split('/')[1])
|
||||
return (unit_number % modulo) * wait
|
||||
|
|
|
@ -41,6 +41,10 @@ class CouldNotAcquireLockException(Exception):
|
|||
pass
|
||||
|
||||
|
||||
class InvalidSnapChannel(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def _snap_exec(commands):
|
||||
"""
|
||||
Execute snap commands.
|
||||
|
@ -132,3 +136,15 @@ def snap_refresh(packages, *flags):
|
|||
|
||||
log(message, level='INFO')
|
||||
return _snap_exec(['refresh'] + flags + packages)
|
||||
|
||||
|
||||
def valid_snap_channel(channel):
|
||||
""" Validate snap channel exists
|
||||
|
||||
:raises InvalidSnapChannel: When channel does not exist
|
||||
:return: Boolean
|
||||
"""
|
||||
if channel.lower() in SNAP_CHANNELS:
|
||||
return True
|
||||
else:
|
||||
raise InvalidSnapChannel("Invalid Snap Channel: {}".format(channel))
|
||||
|
|
|
@ -18,7 +18,6 @@ import sys
|
|||
import subprocess
|
||||
import glob
|
||||
import tempfile
|
||||
import random
|
||||
import time
|
||||
import socket
|
||||
from collections import OrderedDict
|
||||
|
@ -40,6 +39,10 @@ from charmhelpers.contrib.openstack.utils import (
|
|||
is_unit_paused_set,
|
||||
)
|
||||
|
||||
from charmhelpers.contrib.hahelpers.cluster import (
|
||||
distributed_wait,
|
||||
)
|
||||
|
||||
from charmhelpers.contrib.network.ip import (
|
||||
get_ipv6_addr,
|
||||
get_address_in_network,
|
||||
|
@ -401,6 +404,37 @@ def wait_app():
|
|||
raise ex
|
||||
|
||||
|
||||
def cluster_wait():
|
||||
''' Wait for operations based on modulo distribution
|
||||
|
||||
Use the distributed_wait function to determine how long to wait before
|
||||
running an operation like restart or cluster join. By setting modulo to
|
||||
the exact number of nodes in the cluster we get serial operations.
|
||||
|
||||
Check for explicit configuration parameters for modulo distribution.
|
||||
The config setting modulo-nodes has first priority. If modulo-nodes is not
|
||||
set, check min-cluster-size. Finally, if neither value is set, determine
|
||||
how many peers there are from the cluster relation.
|
||||
|
||||
@side_effect: distributed_wait is called which calls time.sleep()
|
||||
@return: None
|
||||
'''
|
||||
wait = config('known-wait')
|
||||
if config('modulo-nodes') is not None:
|
||||
# modulo-nodes has first priority
|
||||
num_nodes = config('modulo-nodes')
|
||||
elif config('min-cluster-size'):
|
||||
# min-cluster-size is consulted next
|
||||
num_nodes = config('min-cluster-size')
|
||||
else:
|
||||
# If nothing explicit is configured, determine cluster size based on
|
||||
# peer relations
|
||||
num_nodes = 1
|
||||
for rid in relation_ids('cluster'):
|
||||
num_nodes += len(related_units(rid))
|
||||
distributed_wait(modulo=num_nodes, wait=wait)
|
||||
|
||||
|
||||
def start_app():
|
||||
''' Start the rabbitmq app and wait until it is fully started '''
|
||||
status_set('maintenance', 'Starting rabbitmq application')
|
||||
|
@ -450,11 +484,8 @@ def cluster_with():
|
|||
# NOTE: The primary problem rabbitmq has clustering is when
|
||||
# more than one node attempts to cluster at the same time.
|
||||
# The asynchronous nature of hook firing nearly guarantees
|
||||
# this. Using random time wait is a hack until we can
|
||||
# implement charmhelpers.coordinator.
|
||||
status_set('maintenance',
|
||||
'Random wait for join_cluster to avoid collisions')
|
||||
time.sleep(random.random() * 100)
|
||||
# this. Using cluster_wait based on modulo_distribution
|
||||
cluster_wait()
|
||||
try:
|
||||
join_cluster(node)
|
||||
# NOTE: toggle the cluster relation to ensure that any peers
|
||||
|
@ -826,9 +857,7 @@ def restart_on_change(restart_map, stopstart=False):
|
|||
if path_hash(path) != checksums[path]:
|
||||
restarts += restart_map[path]
|
||||
services_list = list(OrderedDict.fromkeys(restarts))
|
||||
status_set('maintenance',
|
||||
'Random wait for restart to avoid collisions')
|
||||
time.sleep(random.random() * 100)
|
||||
cluster_wait()
|
||||
if not stopstart:
|
||||
for svc_name in services_list:
|
||||
system_service('restart', svc_name)
|
||||
|
|
|
@ -95,7 +95,7 @@ from charmhelpers.fetch import (
|
|||
from charmhelpers.fetch.snap import (
|
||||
snap_install,
|
||||
snap_refresh,
|
||||
SNAP_CHANNELS,
|
||||
valid_snap_channel,
|
||||
)
|
||||
|
||||
from charmhelpers.contrib.storage.linux.utils import is_block_device, zap_disk
|
||||
|
@ -579,6 +579,9 @@ def configure_installation_source(source_plus_key):
|
|||
Note that the behaviour on error is to log the error to the juju log and
|
||||
then call sys.exit(1).
|
||||
"""
|
||||
if source_plus_key.startswith('snap'):
|
||||
# Do nothing for snap installs
|
||||
return
|
||||
# extract the key if there is one, denoted by a '|' in the rel
|
||||
source, key = get_source_and_pgp_key(source_plus_key)
|
||||
|
||||
|
@ -2048,7 +2051,7 @@ def update_json_file(filename, items):
|
|||
def snap_install_requested():
|
||||
""" Determine if installing from snaps
|
||||
|
||||
If openstack-origin is of the form snap:channel-series-release
|
||||
If openstack-origin is of the form snap:track/channel
|
||||
and channel is in SNAPS_CHANNELS return True.
|
||||
"""
|
||||
origin = config('openstack-origin') or ""
|
||||
|
@ -2056,10 +2059,12 @@ def snap_install_requested():
|
|||
return False
|
||||
|
||||
_src = origin[5:]
|
||||
channel, series, release = _src.split('-')
|
||||
if channel.lower() in SNAP_CHANNELS:
|
||||
return True
|
||||
return False
|
||||
if '/' in _src:
|
||||
_track, channel = _src.split('/')
|
||||
else:
|
||||
# Handle snap:track with no channel
|
||||
channel = 'stable'
|
||||
return valid_snap_channel(channel)
|
||||
|
||||
|
||||
def get_snaps_install_info_from_origin(snaps, src, mode='classic'):
|
||||
|
@ -2067,7 +2072,7 @@ def get_snaps_install_info_from_origin(snaps, src, mode='classic'):
|
|||
|
||||
@param snaps: List of snaps
|
||||
@param src: String of openstack-origin or source of the form
|
||||
snap:channel-series-track
|
||||
snap:track/channel
|
||||
@param mode: String classic, devmode or jailmode
|
||||
@returns: Dictionary of snaps with channels and modes
|
||||
"""
|
||||
|
@ -2077,8 +2082,7 @@ def get_snaps_install_info_from_origin(snaps, src, mode='classic'):
|
|||
return {}
|
||||
|
||||
_src = src[5:]
|
||||
_channel, _series, _release = _src.split('-')
|
||||
channel = '--channel={}/{}'.format(_release, _channel)
|
||||
channel = '--channel={}'.format(_src)
|
||||
|
||||
return {snap: {'channel': channel, 'mode': mode}
|
||||
for snap in snaps}
|
||||
|
@ -2090,8 +2094,8 @@ def install_os_snaps(snaps, refresh=False):
|
|||
@param snaps: Dictionary of snaps with channels and modes of the form:
|
||||
{'snap_name': {'channel': 'snap_channel',
|
||||
'mode': 'snap_mode'}}
|
||||
Where channel a snapstore channel and mode is --classic, --devmode or
|
||||
--jailmode.
|
||||
Where channel is a snapstore channel and mode is --classic, --devmode
|
||||
or --jailmode.
|
||||
@param post_snap_install: Callback function to run after snaps have been
|
||||
installed
|
||||
"""
|
||||
|
|
|
@ -218,6 +218,8 @@ def principal_unit():
|
|||
for rid in relation_ids(reltype):
|
||||
for unit in related_units(rid):
|
||||
md = _metadata_unit(unit)
|
||||
if not md:
|
||||
continue
|
||||
subordinate = md.pop('subordinate', None)
|
||||
if not subordinate:
|
||||
return unit
|
||||
|
@ -511,7 +513,10 @@ def _metadata_unit(unit):
|
|||
"""
|
||||
basedir = os.sep.join(charm_dir().split(os.sep)[:-2])
|
||||
unitdir = 'unit-{}'.format(unit.replace(os.sep, '-'))
|
||||
with open(os.path.join(basedir, unitdir, 'charm', 'metadata.yaml')) as md:
|
||||
joineddir = os.path.join(basedir, unitdir, 'charm', 'metadata.yaml')
|
||||
if not os.path.exists(joineddir):
|
||||
return None
|
||||
with open(joineddir) as md:
|
||||
return yaml.safe_load(md)
|
||||
|
||||
|
||||
|
@ -667,6 +672,17 @@ def close_ports(start, end, protocol="TCP"):
|
|||
subprocess.check_call(_args)
|
||||
|
||||
|
||||
def opened_ports():
|
||||
"""Get the opened ports
|
||||
|
||||
*Note that this will only show ports opened in a previous hook*
|
||||
|
||||
:returns: Opened ports as a list of strings: ``['8080/tcp', '8081-8083/tcp']``
|
||||
"""
|
||||
_args = ['opened-ports', '--format=json']
|
||||
return json.loads(subprocess.check_output(_args).decode('UTF-8'))
|
||||
|
||||
|
||||
@cached
|
||||
def unit_get(attribute):
|
||||
"""Get the unit ID for the remote unit"""
|
||||
|
|
|
@ -34,7 +34,7 @@ import six
|
|||
|
||||
from contextlib import contextmanager
|
||||
from collections import OrderedDict
|
||||
from .hookenv import log, DEBUG
|
||||
from .hookenv import log, DEBUG, local_unit
|
||||
from .fstab import Fstab
|
||||
from charmhelpers.osplatform import get_platform
|
||||
|
||||
|
@ -946,3 +946,31 @@ def updatedb(updatedb_text, new_path):
|
|||
lines[i] = 'PRUNEPATHS="{}"'.format(' '.join(paths))
|
||||
output = "\n".join(lines)
|
||||
return output
|
||||
|
||||
|
||||
def modulo_distribution(modulo=3, wait=30):
|
||||
""" Modulo distribution
|
||||
|
||||
This helper uses the unit number, a modulo value and a constant wait time
|
||||
to produce a calculated wait time distribution. This is useful in large
|
||||
scale deployments to distribute load during an expensive operation such as
|
||||
service restarts.
|
||||
|
||||
If you have 1000 nodes that need to restart 100 at a time 1 minute at a
|
||||
time:
|
||||
|
||||
time.wait(modulo_distribution(modulo=100, wait=60))
|
||||
restart()
|
||||
|
||||
If you need restarts to happen serially set modulo to the exact number of
|
||||
nodes and set a high constant wait time:
|
||||
|
||||
time.wait(modulo_distribution(modulo=10, wait=120))
|
||||
restart()
|
||||
|
||||
@param modulo: int The modulo number creates the group distribution
|
||||
@param wait: int The constant time wait value
|
||||
@return: int Calculated time to wait for unit operation
|
||||
"""
|
||||
unit_number = int(local_unit().split('/')[1])
|
||||
return (unit_number % modulo) * wait
|
||||
|
|
|
@ -41,6 +41,10 @@ class CouldNotAcquireLockException(Exception):
|
|||
pass
|
||||
|
||||
|
||||
class InvalidSnapChannel(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def _snap_exec(commands):
|
||||
"""
|
||||
Execute snap commands.
|
||||
|
@ -132,3 +136,15 @@ def snap_refresh(packages, *flags):
|
|||
|
||||
log(message, level='INFO')
|
||||
return _snap_exec(['refresh'] + flags + packages)
|
||||
|
||||
|
||||
def valid_snap_channel(channel):
|
||||
""" Validate snap channel exists
|
||||
|
||||
:raises InvalidSnapChannel: When channel does not exist
|
||||
:return: Boolean
|
||||
"""
|
||||
if channel.lower() in SNAP_CHANNELS:
|
||||
return True
|
||||
else:
|
||||
raise InvalidSnapChannel("Invalid Snap Channel: {}".format(channel))
|
||||
|
|
|
@ -228,6 +228,7 @@ class UtilsTests(CharmTestCase):
|
|||
self.assertEqual(rabbit_utils.leader_node(),
|
||||
'rabbit@juju-devel3-machine-15')
|
||||
|
||||
@mock.patch('rabbit_utils.cluster_wait')
|
||||
@mock.patch('rabbit_utils.relation_set')
|
||||
@mock.patch('rabbit_utils.wait_app')
|
||||
@mock.patch('rabbit_utils.subprocess.check_call')
|
||||
|
@ -242,12 +243,13 @@ class UtilsTests(CharmTestCase):
|
|||
mock_running_nodes, mock_time,
|
||||
mock_check_output, mock_check_call,
|
||||
mock_wait_app,
|
||||
mock_relation_set):
|
||||
mock_relation_set, mock_cluster_wait):
|
||||
mock_cmp_pkgrevno.return_value = True
|
||||
mock_clustered.return_value = False
|
||||
mock_leader_node.return_value = 'rabbit@juju-devel7-machine-11'
|
||||
mock_running_nodes.return_value = ['rabbit@juju-devel5-machine-19']
|
||||
rabbit_utils.cluster_with()
|
||||
mock_cluster_wait.assert_called_once_with()
|
||||
mock_check_output.assert_called_with([rabbit_utils.RABBITMQ_CTL,
|
||||
'join_cluster',
|
||||
'rabbit@juju-devel7-machine-11'],
|
||||
|
@ -720,3 +722,31 @@ class UtilsTests(CharmTestCase):
|
|||
self.test_config.set("source", "same")
|
||||
self.test_config.set_previous("source", "same")
|
||||
self.assertFalse(rabbit_utils.archive_upgrade_available())
|
||||
|
||||
@mock.patch.object(rabbit_utils, 'distributed_wait')
|
||||
def test_cluster_wait(self, mock_distributed_wait):
|
||||
self.relation_ids.return_value = ['amqp:27']
|
||||
self.related_units.return_value = ['unit/1', 'unit/2', 'unit/3']
|
||||
# Default check peer relation
|
||||
_config = {'known-wait': 30}
|
||||
self.config.side_effect = lambda key: _config.get(key)
|
||||
rabbit_utils.cluster_wait()
|
||||
mock_distributed_wait.assert_called_with(modulo=4, wait=30)
|
||||
|
||||
# Use Min Cluster Size
|
||||
_config = {'min-cluster-size': 5, 'known-wait': 30}
|
||||
self.config.side_effect = lambda key: _config.get(key)
|
||||
rabbit_utils.cluster_wait()
|
||||
mock_distributed_wait.assert_called_with(modulo=5, wait=30)
|
||||
|
||||
# Override with modulo-nodes
|
||||
_config = {'min-cluster-size': 5, 'modulo-nodes': 10, 'known-wait': 60}
|
||||
self.config.side_effect = lambda key: _config.get(key)
|
||||
rabbit_utils.cluster_wait()
|
||||
mock_distributed_wait.assert_called_with(modulo=10, wait=60)
|
||||
|
||||
# Just modulo-nodes
|
||||
_config = {'modulo-nodes': 10, 'known-wait': 60}
|
||||
self.config.side_effect = lambda key: _config.get(key)
|
||||
rabbit_utils.cluster_wait()
|
||||
mock_distributed_wait.assert_called_with(modulo=10, wait=60)
|
||||
|
|
Loading…
Reference in New Issue