Merged next charm in

This commit is contained in:
Liam Young 2015-01-09 10:36:01 +00:00
commit 67a0d7599b
45 changed files with 2561 additions and 801 deletions

View File

@ -14,8 +14,8 @@ This charm was developed to support deploying multiple version of Swift on
Ubuntu Precise 12.04, as they relate to the release series of OpenStack. That
is, OpenStack Essex corresponds to Swift 1.4.8 while OpenStack Folsom shipped
1.7.4. This charm can be used to deploy either (and future) versions of Swift
onto an Ubuntu Precise 12.04, making use of the Ubuntu Cloud Archive when
needed.
onto an Ubuntu Precise 12.04 or Trusty 14.04 making use of the Ubuntu Cloud
Archive when needed.
Usage
-----
@ -130,3 +130,4 @@ required for any integration with other OpenStack components.
Swift may be used to as a storage backend for the Glance image service. To do
so, simply add a relation between swift-proxy and an existing Glance service
deployed using the cs:precise/glance charm.

View File

@ -11,4 +11,5 @@ include:
- payload.execd
- contrib.network.ip
- contrib.peerstorage
- contrib.python.packages
- contrib.charmsupport

View File

@ -30,9 +30,23 @@ options:
type: int
description: Minimum replicas.
min-hours:
default: 1
default: 0
type: int
description: Minimum hours between balances
description: |
This is the Swift ring builder min_part_hours parameter. This
setting represents the amount of time in hours that Swift will wait
between subsequent ring re-balances in order to avoid large i/o loads as
data is re-balanced when new devices are added to the cluster. Once your
cluster has been built, you can set this to a higher value e.g. 1
(upstream default). Note that changing this value will result in an
attempt to re-balance and if successful, rings will be redistributed.
disable-ring-balance:
type: boolean
default: False
description: |
This provides similar support to min-hours but without having to modify
the builders. If True, any changes to the builders will not result in a
ring re-balance and sync until this value is set back to False.
zone-assignment:
default: "manual"
type: string
@ -71,7 +85,8 @@ options:
workers:
default: 0
type: int
description: Number of TCP workers to launch (0 for the number of system cores)
description: |
Number of TCP workers to launch (0 for the number of system cores).
operator-roles:
default: "Member,Admin"
type: string
@ -87,7 +102,9 @@ options:
node-timeout:
default: 60
type: int
description: How long the proxy server will wait on responses from the a/c/o servers.
description: |
How long the proxy server will wait on responses from the
account/container/object servers.
recoverable-node-timeout:
default: 30
type: int

View File

@ -0,0 +1,22 @@
# Bootstrap charm-helpers, installing its dependencies if necessary using
# only standard libraries.
import subprocess
import sys
try:
import six # flake8: noqa
except ImportError:
if sys.version_info.major == 2:
subprocess.check_call(['apt-get', 'install', '-y', 'python-six'])
else:
subprocess.check_call(['apt-get', 'install', '-y', 'python3-six'])
import six # flake8: noqa
try:
import yaml # flake8: noqa
except ImportError:
if sys.version_info.major == 2:
subprocess.check_call(['apt-get', 'install', '-y', 'python-yaml'])
else:
subprocess.check_call(['apt-get', 'install', '-y', 'python3-yaml'])
import yaml # flake8: noqa

View File

@ -16,6 +16,8 @@ import os
from socket import gethostname as get_unit_hostname
import six
from charmhelpers.core.hookenv import (
log,
relation_ids,
@ -27,12 +29,19 @@ from charmhelpers.core.hookenv import (
WARNING,
unit_get,
)
from charmhelpers.core.decorators import (
retry_on_exception,
)
class HAIncompleteConfig(Exception):
pass
class CRMResourceNotFound(Exception):
pass
def is_elected_leader(resource):
"""
Returns True if the charm executing this is the elected cluster leader.
@ -67,24 +76,30 @@ def is_clustered():
return False
def is_crm_leader(resource):
@retry_on_exception(5, base_delay=2, exc_type=CRMResourceNotFound)
def is_crm_leader(resource, retry=False):
"""
Returns True if the charm calling this is the elected corosync leader,
as returned by calling the external "crm" command.
We allow this operation to be retried to avoid the possibility of getting a
false negative. See LP #1396246 for more info.
"""
cmd = [
"crm", "resource",
"show", resource
]
cmd = ['crm', 'resource', 'show', resource]
try:
status = subprocess.check_output(cmd)
status = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
if not isinstance(status, six.text_type):
status = six.text_type(status, "utf-8")
except subprocess.CalledProcessError:
return False
else:
if get_unit_hostname() in status:
return True
else:
return False
status = None
if status and get_unit_hostname() in status:
return True
if status and "resource %s is NOT running" % (resource) in status:
raise CRMResourceNotFound("CRM resource %s not found" % (resource))
return False
def is_leader(resource):
@ -150,34 +165,42 @@ def https():
return False
def determine_api_port(public_port):
def determine_api_port(public_port, singlenode_mode=False):
'''
Determine correct API server listening port based on
existence of HTTPS reverse proxy and/or haproxy.
public_port: int: standard public port for given service
singlenode_mode: boolean: Shuffle ports when only a single unit is present
returns: int: the correct listening port for the API service
'''
i = 0
if len(peer_units()) > 0 or is_clustered():
if singlenode_mode:
i += 1
elif len(peer_units()) > 0 or is_clustered():
i += 1
if https():
i += 1
return public_port - (i * 10)
def determine_apache_port(public_port):
def determine_apache_port(public_port, singlenode_mode=False):
'''
Description: Determine correct apache listening port based on public IP +
state of the cluster.
public_port: int: standard public port for given service
singlenode_mode: boolean: Shuffle ports when only a single unit is present
returns: int: the correct listening port for the HAProxy service
'''
i = 0
if len(peer_units()) > 0 or is_clustered():
if singlenode_mode:
i += 1
elif len(peer_units()) > 0 or is_clustered():
i += 1
return public_port - (i * 10)
@ -197,7 +220,7 @@ def get_hacluster_config():
for setting in settings:
conf[setting] = config_get(setting)
missing = []
[missing.append(s) for s, v in conf.iteritems() if v is None]
[missing.append(s) for s, v in six.iteritems(conf) if v is None]
if missing:
log('Insufficient config data to configure hacluster.', level=ERROR)
raise HAIncompleteConfig

View File

@ -1,15 +1,12 @@
import glob
import re
import subprocess
import sys
from functools import partial
from charmhelpers.core.hookenv import unit_get
from charmhelpers.fetch import apt_install
from charmhelpers.core.hookenv import (
WARNING,
ERROR,
log
)
@ -34,31 +31,28 @@ def _validate_cidr(network):
network)
def no_ip_found_error_out(network):
errmsg = ("No IP address found in network: %s" % network)
raise ValueError(errmsg)
def get_address_in_network(network, fallback=None, fatal=False):
"""
Get an IPv4 or IPv6 address within the network from the host.
"""Get an IPv4 or IPv6 address within the network from the host.
:param network (str): CIDR presentation format. For example,
'192.168.1.0/24'.
:param fallback (str): If no address is found, return fallback.
:param fatal (boolean): If no address is found, fallback is not
set and fatal is True then exit(1).
"""
def not_found_error_out():
log("No IP address found in network: %s" % network,
level=ERROR)
sys.exit(1)
if network is None:
if fallback is not None:
return fallback
if fatal:
no_ip_found_error_out(network)
else:
if fatal:
not_found_error_out()
else:
return None
return None
_validate_cidr(network)
network = netaddr.IPNetwork(network)
@ -70,6 +64,7 @@ def get_address_in_network(network, fallback=None, fatal=False):
cidr = netaddr.IPNetwork("%s/%s" % (addr, netmask))
if cidr in network:
return str(cidr.ip)
if network.version == 6 and netifaces.AF_INET6 in addresses:
for addr in addresses[netifaces.AF_INET6]:
if not addr['addr'].startswith('fe80'):
@ -82,20 +77,20 @@ def get_address_in_network(network, fallback=None, fatal=False):
return fallback
if fatal:
not_found_error_out()
no_ip_found_error_out(network)
return None
def is_ipv6(address):
'''Determine whether provided address is IPv6 or not'''
"""Determine whether provided address is IPv6 or not."""
try:
address = netaddr.IPAddress(address)
except netaddr.AddrFormatError:
# probably a hostname - so not an address at all!
return False
else:
return address.version == 6
return address.version == 6
def is_address_in_network(network, address):
@ -113,11 +108,13 @@ def is_address_in_network(network, address):
except (netaddr.core.AddrFormatError, ValueError):
raise ValueError("Network (%s) is not in CIDR presentation format" %
network)
try:
address = netaddr.IPAddress(address)
except (netaddr.core.AddrFormatError, ValueError):
raise ValueError("Address (%s) is not in correct presentation format" %
address)
if address in network:
return True
else:
@ -147,6 +144,7 @@ def _get_for_address(address, key):
return iface
else:
return addresses[netifaces.AF_INET][0][key]
if address.version == 6 and netifaces.AF_INET6 in addresses:
for addr in addresses[netifaces.AF_INET6]:
if not addr['addr'].startswith('fe80'):
@ -160,41 +158,42 @@ def _get_for_address(address, key):
return str(cidr).split('/')[1]
else:
return addr[key]
return None
get_iface_for_address = partial(_get_for_address, key='iface')
get_netmask_for_address = partial(_get_for_address, key='netmask')
def format_ipv6_addr(address):
"""
IPv6 needs to be wrapped with [] in url link to parse correctly.
"""If address is IPv6, wrap it in '[]' otherwise return None.
This is required by most configuration files when specifying IPv6
addresses.
"""
if is_ipv6(address):
address = "[%s]" % address
else:
log("Not a valid ipv6 address: %s" % address, level=WARNING)
address = None
return "[%s]" % address
return address
return None
def get_iface_addr(iface='eth0', inet_type='AF_INET', inc_aliases=False,
fatal=True, exc_list=None):
"""
Return the assigned IP address for a given interface, if any, or [].
"""
"""Return the assigned IP address for a given interface, if any."""
# Extract nic if passed /dev/ethX
if '/' in iface:
iface = iface.split('/')[-1]
if not exc_list:
exc_list = []
try:
inet_num = getattr(netifaces, inet_type)
except AttributeError:
raise Exception('Unknown inet type ' + str(inet_type))
raise Exception("Unknown inet type '%s'" % str(inet_type))
interfaces = netifaces.interfaces()
if inc_aliases:
@ -202,15 +201,18 @@ def get_iface_addr(iface='eth0', inet_type='AF_INET', inc_aliases=False,
for _iface in interfaces:
if iface == _iface or _iface.split(':')[0] == iface:
ifaces.append(_iface)
if fatal and not ifaces:
raise Exception("Invalid interface '%s'" % iface)
ifaces.sort()
else:
if iface not in interfaces:
if fatal:
raise Exception("%s not found " % (iface))
raise Exception("Interface '%s' not found " % (iface))
else:
return []
else:
ifaces = [iface]
@ -221,10 +223,13 @@ def get_iface_addr(iface='eth0', inet_type='AF_INET', inc_aliases=False,
for entry in net_info[inet_num]:
if 'addr' in entry and entry['addr'] not in exc_list:
addresses.append(entry['addr'])
if fatal and not addresses:
raise Exception("Interface '%s' doesn't have any %s addresses." %
(iface, inet_type))
return addresses
return sorted(addresses)
get_ipv4_addr = partial(get_iface_addr, inet_type='AF_INET')
@ -241,6 +246,7 @@ def get_iface_from_addr(addr):
raw = re.match(ll_key, _addr)
if raw:
_addr = raw.group(1)
if _addr == addr:
log("Address '%s' is configured on iface '%s'" %
(addr, iface))
@ -251,8 +257,9 @@ def get_iface_from_addr(addr):
def sniff_iface(f):
"""If no iface provided, inject net iface inferred from unit private
address.
"""Ensure decorated function is called with a value for iface.
If no iface provided, inject net iface inferred from unit private address.
"""
def iface_sniffer(*args, **kwargs):
if not kwargs.get('iface', None):
@ -295,7 +302,7 @@ def get_ipv6_addr(iface=None, inc_aliases=False, fatal=True, exc_list=None,
if global_addrs:
# Make sure any found global addresses are not temporary
cmd = ['ip', 'addr', 'show', iface]
out = subprocess.check_output(cmd)
out = subprocess.check_output(cmd).decode('UTF-8')
if dynamic_only:
key = re.compile("inet6 (.+)/[0-9]+ scope global dynamic.*")
else:
@ -317,33 +324,28 @@ def get_ipv6_addr(iface=None, inc_aliases=False, fatal=True, exc_list=None,
return addrs
if fatal:
raise Exception("Interface '%s' doesn't have a scope global "
raise Exception("Interface '%s' does not have a scope global "
"non-temporary ipv6 address." % iface)
return []
def get_bridges(vnic_dir='/sys/devices/virtual/net'):
"""
Return a list of bridges on the system or []
"""
b_rgex = vnic_dir + '/*/bridge'
return [x.replace(vnic_dir, '').split('/')[1] for x in glob.glob(b_rgex)]
"""Return a list of bridges on the system."""
b_regex = "%s/*/bridge" % vnic_dir
return [x.replace(vnic_dir, '').split('/')[1] for x in glob.glob(b_regex)]
def get_bridge_nics(bridge, vnic_dir='/sys/devices/virtual/net'):
"""
Return a list of nics comprising a given bridge on the system or []
"""
brif_rgex = "%s/%s/brif/*" % (vnic_dir, bridge)
return [x.split('/')[-1] for x in glob.glob(brif_rgex)]
"""Return a list of nics comprising a given bridge on the system."""
brif_regex = "%s/%s/brif/*" % (vnic_dir, bridge)
return [x.split('/')[-1] for x in glob.glob(brif_regex)]
def is_bridge_member(nic):
"""
Check if a given nic is a member of a bridge
"""
"""Check if a given nic is a member of a bridge."""
for bridge in get_bridges():
if nic in get_bridge_nics(bridge):
return True
return False

View File

@ -1,3 +1,4 @@
import six
from charmhelpers.contrib.amulet.deployment import (
AmuletDeployment
)
@ -69,7 +70,7 @@ class OpenStackAmuletDeployment(AmuletDeployment):
def _configure_services(self, configs):
"""Configure all of the services."""
for service, config in configs.iteritems():
for service, config in six.iteritems(configs):
self.d.configure(service, config)
def _get_openstack_release(self):

View File

@ -7,6 +7,8 @@ import glanceclient.v1.client as glance_client
import keystoneclient.v2_0 as keystone_client
import novaclient.v1_1.client as nova_client
import six
from charmhelpers.contrib.amulet.utils import (
AmuletUtils
)
@ -60,7 +62,7 @@ class OpenStackAmuletUtils(AmuletUtils):
expected service catalog endpoints.
"""
self.log.debug('actual: {}'.format(repr(actual)))
for k, v in expected.iteritems():
for k, v in six.iteritems(expected):
if k in actual:
ret = self._validate_dict_data(expected[k][0], actual[k][0])
if ret:

File diff suppressed because it is too large Load Diff

View File

@ -2,21 +2,19 @@ from charmhelpers.core.hookenv import (
config,
unit_get,
)
from charmhelpers.contrib.network.ip import (
get_address_in_network,
is_address_in_network,
is_ipv6,
get_ipv6_addr,
)
from charmhelpers.contrib.hahelpers.cluster import is_clustered
PUBLIC = 'public'
INTERNAL = 'int'
ADMIN = 'admin'
_address_map = {
ADDRESS_MAP = {
PUBLIC: {
'config': 'os-public-network',
'fallback': 'public-address'
@ -33,16 +31,14 @@ _address_map = {
def canonical_url(configs, endpoint_type=PUBLIC):
'''
Returns the correct HTTP URL to this host given the state of HTTPS
"""Returns the correct HTTP URL to this host given the state of HTTPS
configuration, hacluster and charm configuration.
:configs OSTemplateRenderer: A config tempating object to inspect for
a complete https context.
:endpoint_type str: The endpoint type to resolve.
:returns str: Base URL for services on the current service unit.
'''
:param configs: OSTemplateRenderer config templating object to inspect
for a complete https context.
:param endpoint_type: str endpoint type to resolve.
:param returns: str base URL for services on the current service unit.
"""
scheme = 'http'
if 'https' in configs.complete_contexts():
scheme = 'https'
@ -53,27 +49,45 @@ def canonical_url(configs, endpoint_type=PUBLIC):
def resolve_address(endpoint_type=PUBLIC):
"""Return unit address depending on net config.
If unit is clustered with vip(s) and has net splits defined, return vip on
correct network. If clustered with no nets defined, return primary vip.
If not clustered, return unit address ensuring address is on configured net
split if one is configured.
:param endpoint_type: Network endpoing type
"""
resolved_address = None
if is_clustered():
if config(_address_map[endpoint_type]['config']) is None:
# Assume vip is simple and pass back directly
resolved_address = config('vip')
vips = config('vip')
if vips:
vips = vips.split()
net_type = ADDRESS_MAP[endpoint_type]['config']
net_addr = config(net_type)
net_fallback = ADDRESS_MAP[endpoint_type]['fallback']
clustered = is_clustered()
if clustered:
if not net_addr:
# If no net-splits defined, we expect a single vip
resolved_address = vips[0]
else:
for vip in config('vip').split():
if is_address_in_network(
config(_address_map[endpoint_type]['config']),
vip):
for vip in vips:
if is_address_in_network(net_addr, vip):
resolved_address = vip
break
else:
if config('prefer-ipv6'):
fallback_addr = get_ipv6_addr(exc_list=[config('vip')])[0]
fallback_addr = get_ipv6_addr(exc_list=vips)[0]
else:
fallback_addr = unit_get(_address_map[endpoint_type]['fallback'])
resolved_address = get_address_in_network(
config(_address_map[endpoint_type]['config']), fallback_addr)
fallback_addr = unit_get(net_fallback)
resolved_address = get_address_in_network(net_addr, fallback_addr)
if resolved_address is None:
raise ValueError('Unable to resolve a suitable IP address'
' based on charm state and configuration')
else:
return resolved_address
raise ValueError("Unable to resolve a suitable IP address based on "
"charm state and configuration. (net_type=%s, "
"clustered=%s)" % (net_type, clustered))
return resolved_address

View File

@ -14,7 +14,7 @@ from charmhelpers.contrib.openstack.utils import os_release
def headers_package():
"""Ensures correct linux-headers for running kernel are installed,
for building DKMS package"""
kver = check_output(['uname', '-r']).strip()
kver = check_output(['uname', '-r']).decode('UTF-8').strip()
return 'linux-headers-%s' % kver
QUANTUM_CONF_DIR = '/etc/quantum'
@ -22,7 +22,7 @@ QUANTUM_CONF_DIR = '/etc/quantum'
def kernel_version():
""" Retrieve the current major kernel version as a tuple e.g. (3, 13) """
kver = check_output(['uname', '-r']).strip()
kver = check_output(['uname', '-r']).decode('UTF-8').strip()
kver = kver.split('.')
return (int(kver[0]), int(kver[1]))
@ -138,10 +138,31 @@ def neutron_plugins():
relation_prefix='neutron',
ssl_dir=NEUTRON_CONF_DIR)],
'services': [],
'packages': [['neutron-plugin-cisco']],
'packages': [[headers_package()] + determine_dkms_package(),
['neutron-plugin-cisco']],
'server_packages': ['neutron-server',
'neutron-plugin-cisco'],
'server_services': ['neutron-server']
},
'Calico': {
'config': '/etc/neutron/plugins/ml2/ml2_conf.ini',
'driver': 'neutron.plugins.ml2.plugin.Ml2Plugin',
'contexts': [
context.SharedDBContext(user=config('neutron-database-user'),
database=config('neutron-database'),
relation_prefix='neutron',
ssl_dir=NEUTRON_CONF_DIR)],
'services': ['calico-felix',
'bird',
'neutron-dhcp-agent',
'nova-api-metadata'],
'packages': [[headers_package()] + determine_dkms_package(),
['calico-compute',
'bird',
'neutron-dhcp-agent',
'nova-api-metadata']],
'server_packages': ['neutron-server', 'calico-control'],
'server_services': ['neutron-server']
}
}
if release >= 'icehouse':
@ -162,7 +183,8 @@ def neutron_plugin_attribute(plugin, attr, net_manager=None):
elif manager == 'neutron':
plugins = neutron_plugins()
else:
log('Error: Network manager does not support plugins.')
log("Network manager '%s' does not support plugins." % (manager),
level=ERROR)
raise Exception
try:

View File

@ -35,10 +35,12 @@ listen stats {{ stat_port }}
stats auth admin:password
{% if frontends -%}
{% for service, ports in service_ports.iteritems() -%}
{% for service, ports in service_ports.items() -%}
frontend tcp-in_{{ service }}
bind *:{{ ports[0] }}
{% if ipv6 -%}
bind :::{{ ports[0] }}
{% endif -%}
{% for frontend in frontends -%}
acl net_{{ frontend }} dst {{ frontends[frontend]['network'] }}
use_backend {{ service }}_{{ frontend }} if net_{{ frontend }}
@ -46,7 +48,7 @@ frontend tcp-in_{{ service }}
{% for frontend in frontends -%}
backend {{ service }}_{{ frontend }}
balance leastconn
{% for unit, address in frontends[frontend]['backends'].iteritems() -%}
{% for unit, address in frontends[frontend]['backends'].items() -%}
server {{ unit }} {{ address }}:{{ ports[1] }} check
{% endfor %}
{% endfor -%}

View File

@ -1,13 +1,13 @@
import os
from charmhelpers.fetch import apt_install
import six
from charmhelpers.fetch import apt_install
from charmhelpers.core.hookenv import (
log,
ERROR,
INFO
)
from charmhelpers.contrib.openstack.utils import OPENSTACK_CODENAMES
try:
@ -43,7 +43,7 @@ def get_loader(templates_dir, os_release):
order by OpenStack release.
"""
tmpl_dirs = [(rel, os.path.join(templates_dir, rel))
for rel in OPENSTACK_CODENAMES.itervalues()]
for rel in six.itervalues(OPENSTACK_CODENAMES)]
if not os.path.isdir(templates_dir):
log('Templates directory not found @ %s.' % templates_dir,
@ -258,7 +258,7 @@ class OSConfigRenderer(object):
"""
Write out all registered config files.
"""
[self.write(k) for k in self.templates.iterkeys()]
[self.write(k) for k in six.iterkeys(self.templates)]
def set_release(self, openstack_release):
"""
@ -275,5 +275,5 @@ class OSConfigRenderer(object):
'''
interfaces = []
[interfaces.extend(i.complete_contexts())
for i in self.templates.itervalues()]
for i in six.itervalues(self.templates)]
return interfaces

View File

@ -2,6 +2,7 @@
# Common python helper functions used for OpenStack charms.
from collections import OrderedDict
from functools import wraps
import subprocess
import json
@ -9,11 +10,13 @@ import os
import socket
import sys
import six
import yaml
from charmhelpers.core.hookenv import (
config,
log as juju_log,
charm_dir,
ERROR,
INFO,
relation_ids,
relation_set
@ -30,7 +33,8 @@ from charmhelpers.contrib.network.ip import (
)
from charmhelpers.core.host import lsb_release, mounts, umount
from charmhelpers.fetch import apt_install, apt_cache
from charmhelpers.fetch import apt_install, apt_cache, install_remote
from charmhelpers.contrib.python.packages import pip_install
from charmhelpers.contrib.storage.linux.utils import is_block_device, zap_disk
from charmhelpers.contrib.storage.linux.loopback import ensure_loopback_device
@ -112,7 +116,7 @@ def get_os_codename_install_source(src):
# Best guess match based on deb string provided
if src.startswith('deb') or src.startswith('ppa'):
for k, v in OPENSTACK_CODENAMES.iteritems():
for k, v in six.iteritems(OPENSTACK_CODENAMES):
if v in src:
return v
@ -133,7 +137,7 @@ def get_os_codename_version(vers):
def get_os_version_codename(codename):
'''Determine OpenStack version number from codename.'''
for k, v in OPENSTACK_CODENAMES.iteritems():
for k, v in six.iteritems(OPENSTACK_CODENAMES):
if v == codename:
return k
e = 'Could not derive OpenStack version for '\
@ -193,7 +197,7 @@ def get_os_version_package(pkg, fatal=True):
else:
vers_map = OPENSTACK_CODENAMES
for version, cname in vers_map.iteritems():
for version, cname in six.iteritems(vers_map):
if cname == codename:
return version
# e = "Could not determine OpenStack version for package: %s" % pkg
@ -317,7 +321,7 @@ def save_script_rc(script_path="scripts/scriptrc", **env_vars):
rc_script.write(
"#!/bin/bash\n")
[rc_script.write('export %s=%s\n' % (u, p))
for u, p in env_vars.iteritems() if u != "script_path"]
for u, p in six.iteritems(env_vars) if u != "script_path"]
def openstack_upgrade_available(package):
@ -350,8 +354,8 @@ def ensure_block_device(block_device):
'''
_none = ['None', 'none', None]
if (block_device in _none):
error_out('prepare_storage(): Missing required input: '
'block_device=%s.' % block_device, level=ERROR)
error_out('prepare_storage(): Missing required input: block_device=%s.'
% block_device)
if block_device.startswith('/dev/'):
bdev = block_device
@ -367,8 +371,7 @@ def ensure_block_device(block_device):
bdev = '/dev/%s' % block_device
if not is_block_device(bdev):
error_out('Failed to locate valid block device at %s' % bdev,
level=ERROR)
error_out('Failed to locate valid block device at %s' % bdev)
return bdev
@ -417,7 +420,7 @@ def ns_query(address):
if isinstance(address, dns.name.Name):
rtype = 'PTR'
elif isinstance(address, basestring):
elif isinstance(address, six.string_types):
rtype = 'A'
else:
return None
@ -468,6 +471,14 @@ def get_hostname(address, fqdn=True):
return result.split('.')[0]
def get_matchmaker_map(mm_file='/etc/oslo/matchmaker_ring.json'):
mm_map = {}
if os.path.isfile(mm_file):
with open(mm_file, 'r') as f:
mm_map = json.load(f)
return mm_map
def sync_db_with_multi_ipv6_addresses(database, database_user,
relation_prefix=None):
hosts = get_ipv6_addr(dynamic_only=False)
@ -477,10 +488,132 @@ def sync_db_with_multi_ipv6_addresses(database, database_user,
'hostname': json.dumps(hosts)}
if relation_prefix:
keys = kwargs.keys()
for key in keys:
for key in list(kwargs.keys()):
kwargs["%s_%s" % (relation_prefix, key)] = kwargs[key]
del kwargs[key]
for rid in relation_ids('shared-db'):
relation_set(relation_id=rid, **kwargs)
def os_requires_version(ostack_release, pkg):
"""
Decorator for hook to specify minimum supported release
"""
def wrap(f):
@wraps(f)
def wrapped_f(*args):
if os_release(pkg) < ostack_release:
raise Exception("This hook is not supported on releases"
" before %s" % ostack_release)
f(*args)
return wrapped_f
return wrap
def git_install_requested():
"""Returns true if openstack-origin-git is specified."""
return config('openstack-origin-git') != "None"
requirements_dir = None
def git_clone_and_install(file_name, core_project):
"""Clone/install all OpenStack repos specified in yaml config file."""
global requirements_dir
if file_name == "None":
return
yaml_file = os.path.join(charm_dir(), file_name)
# clone/install the requirements project first
installed = _git_clone_and_install_subset(yaml_file,
whitelist=['requirements'])
if 'requirements' not in installed:
error_out('requirements git repository must be specified')
# clone/install all other projects except requirements and the core project
blacklist = ['requirements', core_project]
_git_clone_and_install_subset(yaml_file, blacklist=blacklist,
update_requirements=True)
# clone/install the core project
whitelist = [core_project]
installed = _git_clone_and_install_subset(yaml_file, whitelist=whitelist,
update_requirements=True)
if core_project not in installed:
error_out('{} git repository must be specified'.format(core_project))
def _git_clone_and_install_subset(yaml_file, whitelist=[], blacklist=[],
update_requirements=False):
"""Clone/install subset of OpenStack repos specified in yaml config file."""
global requirements_dir
installed = []
with open(yaml_file, 'r') as fd:
projects = yaml.load(fd)
for proj, val in projects.items():
# The project subset is chosen based on the following 3 rules:
# 1) If project is in blacklist, we don't clone/install it, period.
# 2) If whitelist is empty, we clone/install everything else.
# 3) If whitelist is not empty, we clone/install everything in the
# whitelist.
if proj in blacklist:
continue
if whitelist and proj not in whitelist:
continue
repo = val['repository']
branch = val['branch']
repo_dir = _git_clone_and_install_single(repo, branch,
update_requirements)
if proj == 'requirements':
requirements_dir = repo_dir
installed.append(proj)
return installed
def _git_clone_and_install_single(repo, branch, update_requirements=False):
"""Clone and install a single git repository."""
dest_parent_dir = "/mnt/openstack-git/"
dest_dir = os.path.join(dest_parent_dir, os.path.basename(repo))
if not os.path.exists(dest_parent_dir):
juju_log('Host dir not mounted at {}. '
'Creating directory there instead.'.format(dest_parent_dir))
os.mkdir(dest_parent_dir)
if not os.path.exists(dest_dir):
juju_log('Cloning git repo: {}, branch: {}'.format(repo, branch))
repo_dir = install_remote(repo, dest=dest_parent_dir, branch=branch)
else:
repo_dir = dest_dir
if update_requirements:
if not requirements_dir:
error_out('requirements repo must be cloned before '
'updating from global requirements.')
_git_update_requirements(repo_dir, requirements_dir)
juju_log('Installing git repo from dir: {}'.format(repo_dir))
pip_install(repo_dir)
return repo_dir
def _git_update_requirements(package_dir, reqs_dir):
"""Update from global requirements.
Update an OpenStack git directory's requirements.txt and
test-requirements.txt from global-requirements.txt."""
orig_dir = os.getcwd()
os.chdir(reqs_dir)
cmd = "python update.py {}".format(package_dir)
try:
subprocess.check_call(cmd.split(' '))
except subprocess.CalledProcessError:
package = os.path.basename(package_dir)
error_out("Error updating {} from global-requirements.txt".format(package))
os.chdir(orig_dir)

View File

@ -1,3 +1,4 @@
import six
from charmhelpers.core.hookenv import relation_id as current_relation_id
from charmhelpers.core.hookenv import (
is_relation_made,
@ -93,7 +94,7 @@ def peer_echo(includes=None):
if ex in echo_data:
echo_data.pop(ex)
else:
for attribute, value in rdata.iteritems():
for attribute, value in six.iteritems(rdata):
for include in includes:
if include in attribute:
echo_data[attribute] = value
@ -119,8 +120,8 @@ def peer_store_and_set(relation_id=None, peer_relation_name='cluster',
relation_settings=relation_settings,
**kwargs)
if is_relation_made(peer_relation_name):
for key, value in dict(kwargs.items() +
relation_settings.items()).iteritems():
for key, value in six.iteritems(dict(list(kwargs.items()) +
list(relation_settings.items()))):
key_prefix = relation_id or current_relation_id()
peer_store(key_prefix + delimiter + key,
value,

View File

@ -0,0 +1,77 @@
#!/usr/bin/env python
# coding: utf-8
__author__ = "Jorge Niedbalski <jorge.niedbalski@canonical.com>"
from charmhelpers.fetch import apt_install, apt_update
from charmhelpers.core.hookenv import log
try:
from pip import main as pip_execute
except ImportError:
apt_update()
apt_install('python-pip')
from pip import main as pip_execute
def parse_options(given, available):
"""Given a set of options, check if available"""
for key, value in sorted(given.items()):
if key in available:
yield "--{0}={1}".format(key, value)
def pip_install_requirements(requirements, **options):
"""Install a requirements file """
command = ["install"]
available_options = ('proxy', 'src', 'log', )
for option in parse_options(options, available_options):
command.append(option)
command.append("-r {0}".format(requirements))
log("Installing from file: {} with options: {}".format(requirements,
command))
pip_execute(command)
def pip_install(package, fatal=False, **options):
"""Install a python package"""
command = ["install"]
available_options = ('proxy', 'src', 'log', "index-url", )
for option in parse_options(options, available_options):
command.append(option)
if isinstance(package, list):
command.extend(package)
else:
command.append(package)
log("Installing {} package with options: {}".format(package,
command))
pip_execute(command)
def pip_uninstall(package, **options):
"""Uninstall a python package"""
command = ["uninstall", "-q", "-y"]
available_options = ('proxy', 'log', )
for option in parse_options(options, available_options):
command.append(option)
if isinstance(package, list):
command.extend(package)
else:
command.append(package)
log("Uninstalling {} package with options: {}".format(package,
command))
pip_execute(command)
def pip_list():
"""Returns the list of current python installed packages
"""
return pip_execute(["list"])

View File

@ -16,19 +16,18 @@ import time
from subprocess import (
check_call,
check_output,
CalledProcessError
CalledProcessError,
)
from charmhelpers.core.hookenv import (
relation_get,
relation_ids,
related_units,
log,
DEBUG,
INFO,
WARNING,
ERROR
ERROR,
)
from charmhelpers.core.host import (
mount,
mounts,
@ -37,7 +36,6 @@ from charmhelpers.core.host import (
service_running,
umount,
)
from charmhelpers.fetch import (
apt_install,
)
@ -56,99 +54,85 @@ CEPH_CONF = """[global]
def install():
''' Basic Ceph client installation '''
"""Basic Ceph client installation."""
ceph_dir = "/etc/ceph"
if not os.path.exists(ceph_dir):
os.mkdir(ceph_dir)
apt_install('ceph-common', fatal=True)
def rbd_exists(service, pool, rbd_img):
''' Check to see if a RADOS block device exists '''
"""Check to see if a RADOS block device exists."""
try:
out = check_output(['rbd', 'list', '--id', service,
'--pool', pool])
out = check_output(['rbd', 'list', '--id',
service, '--pool', pool]).decode('UTF-8')
except CalledProcessError:
return False
else:
return rbd_img in out
return rbd_img in out
def create_rbd_image(service, pool, image, sizemb):
''' Create a new RADOS block device '''
cmd = [
'rbd',
'create',
image,
'--size',
str(sizemb),
'--id',
service,
'--pool',
pool
]
"""Create a new RADOS block device."""
cmd = ['rbd', 'create', image, '--size', str(sizemb), '--id', service,
'--pool', pool]
check_call(cmd)
def pool_exists(service, name):
''' Check to see if a RADOS pool already exists '''
"""Check to see if a RADOS pool already exists."""
try:
out = check_output(['rados', '--id', service, 'lspools'])
out = check_output(['rados', '--id', service,
'lspools']).decode('UTF-8')
except CalledProcessError:
return False
else:
return name in out
return name in out
def get_osds(service):
'''
Return a list of all Ceph Object Storage Daemons
currently in the cluster
'''
"""Return a list of all Ceph Object Storage Daemons currently in the
cluster.
"""
version = ceph_version()
if version and version >= '0.56':
return json.loads(check_output(['ceph', '--id', service,
'osd', 'ls', '--format=json']))
else:
return None
'osd', 'ls',
'--format=json']).decode('UTF-8'))
return None
def create_pool(service, name, replicas=2):
''' Create a new RADOS pool '''
def create_pool(service, name, replicas=3):
"""Create a new RADOS pool."""
if pool_exists(service, name):
log("Ceph pool {} already exists, skipping creation".format(name),
level=WARNING)
return
# Calculate the number of placement groups based
# on upstream recommended best practices.
osds = get_osds(service)
if osds:
pgnum = (len(osds) * 100 / replicas)
pgnum = (len(osds) * 100 // replicas)
else:
# NOTE(james-page): Default to 200 for older ceph versions
# which don't support OSD query from cli
pgnum = 200
cmd = [
'ceph', '--id', service,
'osd', 'pool', 'create',
name, str(pgnum)
]
cmd = ['ceph', '--id', service, 'osd', 'pool', 'create', name, str(pgnum)]
check_call(cmd)
cmd = [
'ceph', '--id', service,
'osd', 'pool', 'set', name,
'size', str(replicas)
]
cmd = ['ceph', '--id', service, 'osd', 'pool', 'set', name, 'size',
str(replicas)]
check_call(cmd)
def delete_pool(service, name):
''' Delete a RADOS pool from ceph '''
cmd = [
'ceph', '--id', service,
'osd', 'pool', 'delete',
name, '--yes-i-really-really-mean-it'
]
"""Delete a RADOS pool from ceph."""
cmd = ['ceph', '--id', service, 'osd', 'pool', 'delete', name,
'--yes-i-really-really-mean-it']
check_call(cmd)
@ -161,44 +145,43 @@ def _keyring_path(service):
def create_keyring(service, key):
''' Create a new Ceph keyring containing key'''
"""Create a new Ceph keyring containing key."""
keyring = _keyring_path(service)
if os.path.exists(keyring):
log('ceph: Keyring exists at %s.' % keyring, level=WARNING)
log('Ceph keyring exists at %s.' % keyring, level=WARNING)
return
cmd = [
'ceph-authtool',
keyring,
'--create-keyring',
'--name=client.{}'.format(service),
'--add-key={}'.format(key)
]
cmd = ['ceph-authtool', keyring, '--create-keyring',
'--name=client.{}'.format(service), '--add-key={}'.format(key)]
check_call(cmd)
log('ceph: Created new ring at %s.' % keyring, level=INFO)
log('Created new ceph keyring at %s.' % keyring, level=DEBUG)
def create_key_file(service, key):
''' Create a file containing key '''
"""Create a file containing key."""
keyfile = _keyfile_path(service)
if os.path.exists(keyfile):
log('ceph: Keyfile exists at %s.' % keyfile, level=WARNING)
log('Keyfile exists at %s.' % keyfile, level=WARNING)
return
with open(keyfile, 'w') as fd:
fd.write(key)
log('ceph: Created new keyfile at %s.' % keyfile, level=INFO)
log('Created new keyfile at %s.' % keyfile, level=INFO)
def get_ceph_nodes():
''' Query named relation 'ceph' to detemine current nodes '''
"""Query named relation 'ceph' to determine current nodes."""
hosts = []
for r_id in relation_ids('ceph'):
for unit in related_units(r_id):
hosts.append(relation_get('private-address', unit=unit, rid=r_id))
return hosts
def configure(service, key, auth, use_syslog):
''' Perform basic configuration of Ceph '''
"""Perform basic configuration of Ceph."""
create_keyring(service, key)
create_key_file(service, key)
hosts = get_ceph_nodes()
@ -211,17 +194,17 @@ def configure(service, key, auth, use_syslog):
def image_mapped(name):
''' Determine whether a RADOS block device is mapped locally '''
"""Determine whether a RADOS block device is mapped locally."""
try:
out = check_output(['rbd', 'showmapped'])
out = check_output(['rbd', 'showmapped']).decode('UTF-8')
except CalledProcessError:
return False
else:
return name in out
return name in out
def map_block_storage(service, pool, image):
''' Map a RADOS block device for local use '''
"""Map a RADOS block device for local use."""
cmd = [
'rbd',
'map',
@ -235,31 +218,32 @@ def map_block_storage(service, pool, image):
def filesystem_mounted(fs):
''' Determine whether a filesytems is already mounted '''
"""Determine whether a filesytems is already mounted."""
return fs in [f for f, m in mounts()]
def make_filesystem(blk_device, fstype='ext4', timeout=10):
''' Make a new filesystem on the specified block device '''
"""Make a new filesystem on the specified block device."""
count = 0
e_noent = os.errno.ENOENT
while not os.path.exists(blk_device):
if count >= timeout:
log('ceph: gave up waiting on block device %s' % blk_device,
log('Gave up waiting on block device %s' % blk_device,
level=ERROR)
raise IOError(e_noent, os.strerror(e_noent), blk_device)
log('ceph: waiting for block device %s to appear' % blk_device,
level=INFO)
log('Waiting for block device %s to appear' % blk_device,
level=DEBUG)
count += 1
time.sleep(1)
else:
log('ceph: Formatting block device %s as filesystem %s.' %
log('Formatting block device %s as filesystem %s.' %
(blk_device, fstype), level=INFO)
check_call(['mkfs', '-t', fstype, blk_device])
def place_data_on_block_device(blk_device, data_src_dst):
''' Migrate data in data_src_dst to blk_device and then remount '''
"""Migrate data in data_src_dst to blk_device and then remount."""
# mount block device into /mnt
mount(blk_device, '/mnt')
# copy data to /mnt
@ -279,8 +263,8 @@ def place_data_on_block_device(blk_device, data_src_dst):
# TODO: re-use
def modprobe(module):
''' Load a kernel module and configure for auto-load on reboot '''
log('ceph: Loading kernel module', level=INFO)
"""Load a kernel module and configure for auto-load on reboot."""
log('Loading kernel module', level=INFO)
cmd = ['modprobe', module]
check_call(cmd)
with open('/etc/modules', 'r+') as modules:
@ -289,7 +273,7 @@ def modprobe(module):
def copy_files(src, dst, symlinks=False, ignore=None):
''' Copy files from src to dst '''
"""Copy files from src to dst."""
for item in os.listdir(src):
s = os.path.join(src, item)
d = os.path.join(dst, item)
@ -300,9 +284,9 @@ def copy_files(src, dst, symlinks=False, ignore=None):
def ensure_ceph_storage(service, pool, rbd_img, sizemb, mount_point,
blk_device, fstype, system_services=[]):
"""
NOTE: This function must only be called from a single service unit for
blk_device, fstype, system_services=[],
replicas=3):
"""NOTE: This function must only be called from a single service unit for
the same rbd_img otherwise data loss will occur.
Ensures given pool and RBD image exists, is mapped to a block device,
@ -316,15 +300,16 @@ def ensure_ceph_storage(service, pool, rbd_img, sizemb, mount_point,
"""
# Ensure pool, RBD image, RBD mappings are in place.
if not pool_exists(service, pool):
log('ceph: Creating new pool {}.'.format(pool))
create_pool(service, pool)
log('Creating new pool {}.'.format(pool), level=INFO)
create_pool(service, pool, replicas=replicas)
if not rbd_exists(service, pool, rbd_img):
log('ceph: Creating RBD image ({}).'.format(rbd_img))
log('Creating RBD image ({}).'.format(rbd_img), level=INFO)
create_rbd_image(service, pool, rbd_img, sizemb)
if not image_mapped(rbd_img):
log('ceph: Mapping RBD Image {} as a Block Device.'.format(rbd_img))
log('Mapping RBD Image {} as a Block Device.'.format(rbd_img),
level=INFO)
map_block_storage(service, pool, rbd_img)
# make file system
@ -339,45 +324,47 @@ def ensure_ceph_storage(service, pool, rbd_img, sizemb, mount_point,
for svc in system_services:
if service_running(svc):
log('ceph: Stopping services {} prior to migrating data.'
.format(svc))
log('Stopping services {} prior to migrating data.'
.format(svc), level=DEBUG)
service_stop(svc)
place_data_on_block_device(blk_device, mount_point)
for svc in system_services:
log('ceph: Starting service {} after migrating data.'
.format(svc))
log('Starting service {} after migrating data.'
.format(svc), level=DEBUG)
service_start(svc)
def ensure_ceph_keyring(service, user=None, group=None):
'''
Ensures a ceph keyring is created for a named service
and optionally ensures user and group ownership.
"""Ensures a ceph keyring is created for a named service and optionally
ensures user and group ownership.
Returns False if no ceph key is available in relation state.
'''
"""
key = None
for rid in relation_ids('ceph'):
for unit in related_units(rid):
key = relation_get('key', rid=rid, unit=unit)
if key:
break
if not key:
return False
create_keyring(service=service, key=key)
keyring = _keyring_path(service)
if user and group:
check_call(['chown', '%s.%s' % (user, group), keyring])
return True
def ceph_version():
''' Retrieve the local version of ceph '''
"""Retrieve the local version of ceph."""
if os.path.exists('/usr/bin/ceph'):
cmd = ['ceph', '-v']
output = check_output(cmd)
output = check_output(cmd).decode('US-ASCII')
output = output.split()
if len(output) > 3:
return output[2]
@ -385,3 +372,46 @@ def ceph_version():
return None
else:
return None
class CephBrokerRq(object):
"""Ceph broker request.
Multiple operations can be added to a request and sent to the Ceph broker
to be executed.
Request is json-encoded for sending over the wire.
The API is versioned and defaults to version 1.
"""
def __init__(self, api_version=1):
self.api_version = api_version
self.ops = []
def add_op_create_pool(self, name, replica_count=3):
self.ops.append({'op': 'create-pool', 'name': name,
'replicas': replica_count})
@property
def request(self):
return json.dumps({'api-version': self.api_version, 'ops': self.ops})
class CephBrokerRsp(object):
"""Ceph broker response.
Response is json-decoded and contents provided as methods/properties.
The API is versioned and defaults to version 1.
"""
def __init__(self, encoded_rsp):
self.api_version = None
self.rsp = json.loads(encoded_rsp)
@property
def exit_code(self):
return self.rsp.get('exit-code')
@property
def exit_msg(self):
return self.rsp.get('stderr')

View File

@ -1,12 +1,12 @@
import os
import re
from subprocess import (
check_call,
check_output,
)
import six
##################################################
# loopback device helpers.
@ -37,7 +37,7 @@ def create_loopback(file_path):
'''
file_path = os.path.abspath(file_path)
check_call(['losetup', '--find', file_path])
for d, f in loopback_devices().iteritems():
for d, f in six.iteritems(loopback_devices()):
if f == file_path:
return d
@ -51,7 +51,7 @@ def ensure_loopback_device(path, size):
:returns: str: Full path to the ensured loopback device (eg, /dev/loop0)
'''
for d, f in loopback_devices().iteritems():
for d, f in six.iteritems(loopback_devices()):
if f == path:
return d

View File

@ -61,6 +61,7 @@ def list_lvm_volume_group(block_device):
vg = None
pvd = check_output(['pvdisplay', block_device]).splitlines()
for l in pvd:
l = l.decode('UTF-8')
if l.strip().startswith('VG Name'):
vg = ' '.join(l.strip().split()[2:])
return vg

View File

@ -30,7 +30,8 @@ def zap_disk(block_device):
# sometimes sgdisk exits non-zero; this is OK, dd will clean up
call(['sgdisk', '--zap-all', '--mbrtogpt',
'--clear', block_device])
dev_end = check_output(['blockdev', '--getsz', block_device])
dev_end = check_output(['blockdev', '--getsz',
block_device]).decode('UTF-8')
gpt_end = int(dev_end.split()[0]) - 100
check_call(['dd', 'if=/dev/zero', 'of=%s' % (block_device),
'bs=1M', 'count=1'])
@ -47,7 +48,7 @@ def is_device_mounted(device):
it doesn't.
'''
is_partition = bool(re.search(r".*[0-9]+\b", device))
out = check_output(['mount'])
out = check_output(['mount']).decode('UTF-8')
if is_partition:
return bool(re.search(device + r"\b", out))
return bool(re.search(device + r"[0-9]+\b", out))

View File

@ -0,0 +1,41 @@
#
# Copyright 2014 Canonical Ltd.
#
# Authors:
# Edward Hope-Morley <opentastic@gmail.com>
#
import time
from charmhelpers.core.hookenv import (
log,
INFO,
)
def retry_on_exception(num_retries, base_delay=0, exc_type=Exception):
"""If the decorated function raises exception exc_type, allow num_retries
retry attempts before raise the exception.
"""
def _retry_on_exception_inner_1(f):
def _retry_on_exception_inner_2(*args, **kwargs):
retries = num_retries
multiplier = 1
while True:
try:
return f(*args, **kwargs)
except exc_type:
if not retries:
raise
delay = base_delay * multiplier
multiplier += 1
log("Retrying '%s' %d more times (delay=%s)" %
(f.__name__, retries, delay), level=INFO)
retries -= 1
if delay:
time.sleep(delay)
return _retry_on_exception_inner_2
return _retry_on_exception_inner_1

View File

@ -3,10 +3,11 @@
__author__ = 'Jorge Niedbalski R. <jorge.niedbalski@canonical.com>'
import io
import os
class Fstab(file):
class Fstab(io.FileIO):
"""This class extends file in order to implement a file reader/writer
for file `/etc/fstab`
"""
@ -24,8 +25,8 @@ class Fstab(file):
options = "defaults"
self.options = options
self.d = d
self.p = p
self.d = int(d)
self.p = int(p)
def __eq__(self, o):
return str(self) == str(o)
@ -45,7 +46,7 @@ class Fstab(file):
self._path = path
else:
self._path = self.DEFAULT_PATH
file.__init__(self, self._path, 'r+')
super(Fstab, self).__init__(self._path, 'rb+')
def _hydrate_entry(self, line):
# NOTE: use split with no arguments to split on any
@ -58,8 +59,9 @@ class Fstab(file):
def entries(self):
self.seek(0)
for line in self.readlines():
line = line.decode('us-ascii')
try:
if not line.startswith("#"):
if line.strip() and not line.startswith("#"):
yield self._hydrate_entry(line)
except ValueError:
pass
@ -75,14 +77,14 @@ class Fstab(file):
if self.get_entry_by_attr('device', entry.device):
return False
self.write(str(entry) + '\n')
self.write((str(entry) + '\n').encode('us-ascii'))
self.truncate()
return entry
def remove_entry(self, entry):
self.seek(0)
lines = self.readlines()
lines = [l.decode('us-ascii') for l in self.readlines()]
found = False
for index, line in enumerate(lines):
@ -97,7 +99,7 @@ class Fstab(file):
lines.remove(line)
self.seek(0)
self.write(''.join(lines))
self.write(''.join(lines).encode('us-ascii'))
self.truncate()
return True

View File

@ -9,9 +9,14 @@ import json
import yaml
import subprocess
import sys
import UserDict
from subprocess import CalledProcessError
import six
if not six.PY3:
from UserDict import UserDict
else:
from collections import UserDict
CRITICAL = "CRITICAL"
ERROR = "ERROR"
WARNING = "WARNING"
@ -63,16 +68,18 @@ def log(message, level=None):
command = ['juju-log']
if level:
command += ['-l', level]
if not isinstance(message, six.string_types):
message = repr(message)
command += [message]
subprocess.call(command)
class Serializable(UserDict.IterableUserDict):
class Serializable(UserDict):
"""Wrapper, an object that can be serialized to yaml or json"""
def __init__(self, obj):
# wrap the object
UserDict.IterableUserDict.__init__(self)
UserDict.__init__(self)
self.data = obj
def __getattr__(self, attr):
@ -214,6 +221,12 @@ class Config(dict):
except KeyError:
return (self._prev_dict or {})[key]
def keys(self):
prev_keys = []
if self._prev_dict is not None:
prev_keys = self._prev_dict.keys()
return list(set(prev_keys + list(dict.keys(self))))
def load_previous(self, path=None):
"""Load previous copy of config from disk.
@ -263,7 +276,7 @@ class Config(dict):
"""
if self._prev_dict:
for k, v in self._prev_dict.iteritems():
for k, v in six.iteritems(self._prev_dict):
if k not in self:
self[k] = v
with open(self.path, 'w') as f:
@ -278,7 +291,8 @@ def config(scope=None):
config_cmd_line.append(scope)
config_cmd_line.append('--format=json')
try:
config_data = json.loads(subprocess.check_output(config_cmd_line))
config_data = json.loads(
subprocess.check_output(config_cmd_line).decode('UTF-8'))
if scope is not None:
return config_data
return Config(config_data)
@ -297,10 +311,10 @@ def relation_get(attribute=None, unit=None, rid=None):
if unit:
_args.append(unit)
try:
return json.loads(subprocess.check_output(_args))
return json.loads(subprocess.check_output(_args).decode('UTF-8'))
except ValueError:
return None
except CalledProcessError, e:
except CalledProcessError as e:
if e.returncode == 2:
return None
raise
@ -312,7 +326,7 @@ def relation_set(relation_id=None, relation_settings=None, **kwargs):
relation_cmd_line = ['relation-set']
if relation_id is not None:
relation_cmd_line.extend(('-r', relation_id))
for k, v in (relation_settings.items() + kwargs.items()):
for k, v in (list(relation_settings.items()) + list(kwargs.items())):
if v is None:
relation_cmd_line.append('{}='.format(k))
else:
@ -329,7 +343,8 @@ def relation_ids(reltype=None):
relid_cmd_line = ['relation-ids', '--format=json']
if reltype is not None:
relid_cmd_line.append(reltype)
return json.loads(subprocess.check_output(relid_cmd_line)) or []
return json.loads(
subprocess.check_output(relid_cmd_line).decode('UTF-8')) or []
return []
@ -340,7 +355,8 @@ def related_units(relid=None):
units_cmd_line = ['relation-list', '--format=json']
if relid is not None:
units_cmd_line.extend(('-r', relid))
return json.loads(subprocess.check_output(units_cmd_line)) or []
return json.loads(
subprocess.check_output(units_cmd_line).decode('UTF-8')) or []
@cached
@ -379,21 +395,31 @@ def relations_of_type(reltype=None):
return relation_data
@cached
def metadata():
"""Get the current charm metadata.yaml contents as a python object"""
with open(os.path.join(charm_dir(), 'metadata.yaml')) as md:
return yaml.safe_load(md)
@cached
def relation_types():
"""Get a list of relation types supported by this charm"""
charmdir = os.environ.get('CHARM_DIR', '')
mdf = open(os.path.join(charmdir, 'metadata.yaml'))
md = yaml.safe_load(mdf)
rel_types = []
md = metadata()
for key in ('provides', 'requires', 'peers'):
section = md.get(key)
if section:
rel_types.extend(section.keys())
mdf.close()
return rel_types
@cached
def charm_name():
"""Get the name of the current charm as is specified on metadata.yaml"""
return metadata().get('name')
@cached
def relations():
"""Get a nested dictionary of relation data for all related units"""
@ -449,7 +475,7 @@ def unit_get(attribute):
"""Get the unit ID for the remote unit"""
_args = ['unit-get', '--format=json', attribute]
try:
return json.loads(subprocess.check_output(_args))
return json.loads(subprocess.check_output(_args).decode('UTF-8'))
except ValueError:
return None

View File

@ -6,19 +6,20 @@
# Matthew Wedgwood <matthew.wedgwood@canonical.com>
import os
import re
import pwd
import grp
import random
import string
import subprocess
import hashlib
import shutil
from contextlib import contextmanager
from collections import OrderedDict
from hookenv import log
from fstab import Fstab
import six
from .hookenv import log
from .fstab import Fstab
def service_start(service_name):
@ -54,7 +55,9 @@ def service(action, service_name):
def service_running(service):
"""Determine whether a system service is running"""
try:
output = subprocess.check_output(['service', service, 'status'], stderr=subprocess.STDOUT)
output = subprocess.check_output(
['service', service, 'status'],
stderr=subprocess.STDOUT).decode('UTF-8')
except subprocess.CalledProcessError:
return False
else:
@ -67,7 +70,9 @@ def service_running(service):
def service_available(service_name):
"""Determine whether a system service is available"""
try:
subprocess.check_output(['service', service_name, 'status'], stderr=subprocess.STDOUT)
subprocess.check_output(
['service', service_name, 'status'],
stderr=subprocess.STDOUT).decode('UTF-8')
except subprocess.CalledProcessError as e:
return 'unrecognized service' not in e.output
else:
@ -96,6 +101,26 @@ def adduser(username, password=None, shell='/bin/bash', system_user=False):
return user_info
def add_group(group_name, system_group=False):
"""Add a group to the system"""
try:
group_info = grp.getgrnam(group_name)
log('group {0} already exists!'.format(group_name))
except KeyError:
log('creating group {0}'.format(group_name))
cmd = ['addgroup']
if system_group:
cmd.append('--system')
else:
cmd.extend([
'--group',
])
cmd.append(group_name)
subprocess.check_call(cmd)
group_info = grp.getgrnam(group_name)
return group_info
def add_user_to_group(username, group):
"""Add a user to a group"""
cmd = [
@ -115,7 +140,7 @@ def rsync(from_path, to_path, flags='-r', options=None):
cmd.append(from_path)
cmd.append(to_path)
log(" ".join(cmd))
return subprocess.check_output(cmd).strip()
return subprocess.check_output(cmd).decode('UTF-8').strip()
def symlink(source, destination):
@ -130,23 +155,26 @@ def symlink(source, destination):
subprocess.check_call(cmd)
def mkdir(path, owner='root', group='root', perms=0555, force=False):
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
"""Create a directory"""
log("Making dir {} {}:{} {:o}".format(path, owner, group,
perms))
uid = pwd.getpwnam(owner).pw_uid
gid = grp.getgrnam(group).gr_gid
realpath = os.path.abspath(path)
if os.path.exists(realpath):
if force and not os.path.isdir(realpath):
path_exists = os.path.exists(realpath)
if path_exists and force:
if not os.path.isdir(realpath):
log("Removing non-directory file {} prior to mkdir()".format(path))
os.unlink(realpath)
else:
os.makedirs(realpath, perms)
os.chown(realpath, uid, gid)
elif not path_exists:
os.makedirs(realpath, perms)
os.chown(realpath, uid, gid)
os.chown(realpath, uid, gid)
def write_file(path, content, owner='root', group='root', perms=0444):
def write_file(path, content, owner='root', group='root', perms=0o444):
"""Create or overwrite a file with the contents of a string"""
log("Writing file {} {}:{} {:o}".format(path, owner, group, perms))
uid = pwd.getpwnam(owner).pw_uid
@ -177,7 +205,7 @@ def mount(device, mountpoint, options=None, persist=False, filesystem="ext3"):
cmd_args.extend([device, mountpoint])
try:
subprocess.check_output(cmd_args)
except subprocess.CalledProcessError, e:
except subprocess.CalledProcessError as e:
log('Error mounting {} at {}\n{}'.format(device, mountpoint, e.output))
return False
@ -191,7 +219,7 @@ def umount(mountpoint, persist=False):
cmd_args = ['umount', mountpoint]
try:
subprocess.check_output(cmd_args)
except subprocess.CalledProcessError, e:
except subprocess.CalledProcessError as e:
log('Error unmounting {}\n{}'.format(mountpoint, e.output))
return False
@ -218,8 +246,8 @@ def file_hash(path, hash_type='md5'):
"""
if os.path.exists(path):
h = getattr(hashlib, hash_type)()
with open(path, 'r') as source:
h.update(source.read()) # IGNORE:E1101 - it does have update
with open(path, 'rb') as source:
h.update(source.read())
return h.hexdigest()
else:
return None
@ -297,7 +325,7 @@ def pwgen(length=None):
if length is None:
length = random.choice(range(35, 45))
alphanumeric_chars = [
l for l in (string.letters + string.digits)
l for l in (string.ascii_letters + string.digits)
if l not in 'l0QD1vAEIOUaeiou']
random_chars = [
random.choice(alphanumeric_chars) for _ in range(length)]
@ -306,18 +334,24 @@ def pwgen(length=None):
def list_nics(nic_type):
'''Return a list of nics of given type(s)'''
if isinstance(nic_type, basestring):
if isinstance(nic_type, six.string_types):
int_types = [nic_type]
else:
int_types = nic_type
interfaces = []
for int_type in int_types:
cmd = ['ip', 'addr', 'show', 'label', int_type + '*']
ip_output = subprocess.check_output(cmd).split('\n')
ip_output = subprocess.check_output(cmd).decode('UTF-8').split('\n')
ip_output = (line for line in ip_output if line)
for line in ip_output:
if line.split()[1].startswith(int_type):
interfaces.append(line.split()[1].replace(":", ""))
matched = re.search('.*: (bond[0-9]+\.[0-9]+)@.*', line)
if matched:
interface = matched.groups()[0]
else:
interface = line.split()[1].replace(":", "")
interfaces.append(interface)
return interfaces
@ -329,7 +363,7 @@ def set_nic_mtu(nic, mtu):
def get_nic_mtu(nic):
cmd = ['ip', 'addr', 'show', nic]
ip_output = subprocess.check_output(cmd).split('\n')
ip_output = subprocess.check_output(cmd).decode('UTF-8').split('\n')
mtu = ""
for line in ip_output:
words = line.split()
@ -340,7 +374,7 @@ def get_nic_mtu(nic):
def get_nic_hwaddr(nic):
cmd = ['ip', '-o', '-0', 'addr', 'show', nic]
ip_output = subprocess.check_output(cmd)
ip_output = subprocess.check_output(cmd).decode('UTF-8')
hwaddr = ""
words = ip_output.split()
if 'link/ether' in words:
@ -357,8 +391,8 @@ def cmp_pkgrevno(package, revno, pkgcache=None):
'''
import apt_pkg
from charmhelpers.fetch import apt_cache
if not pkgcache:
from charmhelpers.fetch import apt_cache
pkgcache = apt_cache()
pkg = pkgcache[package]
return apt_pkg.version_compare(pkg.current_ver.ver_str, revno)

View File

@ -1,2 +1,2 @@
from .base import *
from .helpers import *
from .base import * # NOQA
from .helpers import * # NOQA

View File

@ -196,7 +196,7 @@ class StoredContext(dict):
if not os.path.isabs(file_name):
file_name = os.path.join(hookenv.charm_dir(), file_name)
with open(file_name, 'w') as file_stream:
os.fchmod(file_stream.fileno(), 0600)
os.fchmod(file_stream.fileno(), 0o600)
yaml.dump(config_data, file_stream)
def read_context(self, file_name):
@ -211,15 +211,19 @@ class StoredContext(dict):
class TemplateCallback(ManagerCallback):
"""
Callback class that will render a Jinja2 template, for use as a ready action.
Callback class that will render a Jinja2 template, for use as a ready
action.
:param str source: The template source file, relative to
`$CHARM_DIR/templates`
:param str source: The template source file, relative to `$CHARM_DIR/templates`
:param str target: The target to write the rendered template to
:param str owner: The owner of the rendered file
:param str group: The group of the rendered file
:param int perms: The permissions of the rendered file
"""
def __init__(self, source, target, owner='root', group='root', perms=0444):
def __init__(self, source, target,
owner='root', group='root', perms=0o444):
self.source = source
self.target = target
self.owner = owner

View File

@ -4,7 +4,8 @@ from charmhelpers.core import host
from charmhelpers.core import hookenv
def render(source, target, context, owner='root', group='root', perms=0444, templates_dir=None):
def render(source, target, context, owner='root', group='root',
perms=0o444, templates_dir=None):
"""
Render a template.
@ -47,5 +48,5 @@ def render(source, target, context, owner='root', group='root', perms=0444, temp
level=hookenv.ERROR)
raise e
content = template.render(context)
host.mkdir(os.path.dirname(target))
host.mkdir(os.path.dirname(target), owner, group)
host.write_file(target, content, owner, group, perms)

View File

@ -5,10 +5,6 @@ from yaml import safe_load
from charmhelpers.core.host import (
lsb_release
)
from urlparse import (
urlparse,
urlunparse,
)
import subprocess
from charmhelpers.core.hookenv import (
config,
@ -16,6 +12,12 @@ from charmhelpers.core.hookenv import (
)
import os
import six
if six.PY3:
from urllib.parse import urlparse, urlunparse
else:
from urlparse import urlparse, urlunparse
CLOUD_ARCHIVE = """# Ubuntu Cloud Archive
deb http://ubuntu-cloud.archive.canonical.com/ubuntu {} main
@ -72,6 +74,7 @@ CLOUD_ARCHIVE_POCKETS = {
FETCH_HANDLERS = (
'charmhelpers.fetch.archiveurl.ArchiveUrlFetchHandler',
'charmhelpers.fetch.bzrurl.BzrUrlFetchHandler',
'charmhelpers.fetch.giturl.GitUrlFetchHandler',
)
APT_NO_LOCK = 100 # The return code for "couldn't acquire lock" in APT.
@ -148,7 +151,7 @@ def apt_install(packages, options=None, fatal=False):
cmd = ['apt-get', '--assume-yes']
cmd.extend(options)
cmd.append('install')
if isinstance(packages, basestring):
if isinstance(packages, six.string_types):
cmd.append(packages)
else:
cmd.extend(packages)
@ -181,7 +184,7 @@ def apt_update(fatal=False):
def apt_purge(packages, fatal=False):
"""Purge one or more packages"""
cmd = ['apt-get', '--assume-yes', 'purge']
if isinstance(packages, basestring):
if isinstance(packages, six.string_types):
cmd.append(packages)
else:
cmd.extend(packages)
@ -192,7 +195,7 @@ def apt_purge(packages, fatal=False):
def apt_hold(packages, fatal=False):
"""Hold one or more packages"""
cmd = ['apt-mark', 'hold']
if isinstance(packages, basestring):
if isinstance(packages, six.string_types):
cmd.append(packages)
else:
cmd.extend(packages)
@ -218,6 +221,7 @@ def add_source(source, key=None):
pocket for the release.
'cloud:' may be used to activate official cloud archive pockets,
such as 'cloud:icehouse'
'distro' may be used as a noop
@param key: A key to be added to the system's APT keyring and used
to verify the signatures on packages. Ideally, this should be an
@ -251,12 +255,14 @@ def add_source(source, key=None):
release = lsb_release()['DISTRIB_CODENAME']
with open('/etc/apt/sources.list.d/proposed.list', 'w') as apt:
apt.write(PROPOSED_POCKET.format(release))
elif source == 'distro':
pass
else:
raise SourceConfigError("Unknown source: {!r}".format(source))
log("Unknown source: {!r}".format(source))
if key:
if '-----BEGIN PGP PUBLIC KEY BLOCK-----' in key:
with NamedTemporaryFile() as key_file:
with NamedTemporaryFile('w+') as key_file:
key_file.write(key)
key_file.flush()
key_file.seek(0)
@ -293,14 +299,14 @@ def configure_sources(update=False,
sources = safe_load((config(sources_var) or '').strip()) or []
keys = safe_load((config(keys_var) or '').strip()) or None
if isinstance(sources, basestring):
if isinstance(sources, six.string_types):
sources = [sources]
if keys is None:
for source in sources:
add_source(source, None)
else:
if isinstance(keys, basestring):
if isinstance(keys, six.string_types):
keys = [keys]
if len(sources) != len(keys):
@ -397,7 +403,7 @@ def _run_apt_command(cmd, fatal=False):
while result is None or result == APT_NO_LOCK:
try:
result = subprocess.check_call(cmd, env=env)
except subprocess.CalledProcessError, e:
except subprocess.CalledProcessError as e:
retry_count = retry_count + 1
if retry_count > APT_NO_LOCK_RETRY_COUNT:
raise

View File

@ -1,8 +1,23 @@
import os
import urllib2
from urllib import urlretrieve
import urlparse
import hashlib
import re
import six
if six.PY3:
from urllib.request import (
build_opener, install_opener, urlopen, urlretrieve,
HTTPPasswordMgrWithDefaultRealm, HTTPBasicAuthHandler,
)
from urllib.parse import urlparse, urlunparse, parse_qs
from urllib.error import URLError
else:
from urllib import urlretrieve
from urllib2 import (
build_opener, install_opener, urlopen,
HTTPPasswordMgrWithDefaultRealm, HTTPBasicAuthHandler,
URLError
)
from urlparse import urlparse, urlunparse, parse_qs
from charmhelpers.fetch import (
BaseFetchHandler,
@ -15,6 +30,24 @@ from charmhelpers.payload.archive import (
from charmhelpers.core.host import mkdir, check_hash
def splituser(host):
'''urllib.splituser(), but six's support of this seems broken'''
_userprog = re.compile('^(.*)@(.*)$')
match = _userprog.match(host)
if match:
return match.group(1, 2)
return None, host
def splitpasswd(user):
'''urllib.splitpasswd(), but six's support of this is missing'''
_passwdprog = re.compile('^([^:]*):(.*)$', re.S)
match = _passwdprog.match(user)
if match:
return match.group(1, 2)
return user, None
class ArchiveUrlFetchHandler(BaseFetchHandler):
"""
Handler to download archive files from arbitrary URLs.
@ -42,20 +75,20 @@ class ArchiveUrlFetchHandler(BaseFetchHandler):
"""
# propogate all exceptions
# URLError, OSError, etc
proto, netloc, path, params, query, fragment = urlparse.urlparse(source)
proto, netloc, path, params, query, fragment = urlparse(source)
if proto in ('http', 'https'):
auth, barehost = urllib2.splituser(netloc)
auth, barehost = splituser(netloc)
if auth is not None:
source = urlparse.urlunparse((proto, barehost, path, params, query, fragment))
username, password = urllib2.splitpasswd(auth)
passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
source = urlunparse((proto, barehost, path, params, query, fragment))
username, password = splitpasswd(auth)
passman = HTTPPasswordMgrWithDefaultRealm()
# Realm is set to None in add_password to force the username and password
# to be used whatever the realm
passman.add_password(None, source, username, password)
authhandler = urllib2.HTTPBasicAuthHandler(passman)
opener = urllib2.build_opener(authhandler)
urllib2.install_opener(opener)
response = urllib2.urlopen(source)
authhandler = HTTPBasicAuthHandler(passman)
opener = build_opener(authhandler)
install_opener(opener)
response = urlopen(source)
try:
with open(dest, 'w') as dest_file:
dest_file.write(response.read())
@ -91,17 +124,21 @@ class ArchiveUrlFetchHandler(BaseFetchHandler):
url_parts = self.parse_url(source)
dest_dir = os.path.join(os.environ.get('CHARM_DIR'), 'fetched')
if not os.path.exists(dest_dir):
mkdir(dest_dir, perms=0755)
mkdir(dest_dir, perms=0o755)
dld_file = os.path.join(dest_dir, os.path.basename(url_parts.path))
try:
self.download(source, dld_file)
except urllib2.URLError as e:
except URLError as e:
raise UnhandledSource(e.reason)
except OSError as e:
raise UnhandledSource(e.strerror)
options = urlparse.parse_qs(url_parts.fragment)
options = parse_qs(url_parts.fragment)
for key, value in options.items():
if key in hashlib.algorithms:
if not six.PY3:
algorithms = hashlib.algorithms
else:
algorithms = hashlib.algorithms_available
if key in algorithms:
check_hash(dld_file, value, key)
if checksum:
check_hash(dld_file, checksum, hash_type)

View File

@ -5,6 +5,10 @@ from charmhelpers.fetch import (
)
from charmhelpers.core.host import mkdir
import six
if six.PY3:
raise ImportError('bzrlib does not support Python3')
try:
from bzrlib.branch import Branch
except ImportError:
@ -42,7 +46,7 @@ class BzrUrlFetchHandler(BaseFetchHandler):
dest_dir = os.path.join(os.environ.get('CHARM_DIR'), "fetched",
branch_name)
if not os.path.exists(dest_dir):
mkdir(dest_dir, perms=0755)
mkdir(dest_dir, perms=0o755)
try:
self.branch(source, dest_dir)
except OSError as e:

View File

@ -0,0 +1,51 @@
import os
from charmhelpers.fetch import (
BaseFetchHandler,
UnhandledSource
)
from charmhelpers.core.host import mkdir
import six
if six.PY3:
raise ImportError('GitPython does not support Python 3')
try:
from git import Repo
except ImportError:
from charmhelpers.fetch import apt_install
apt_install("python-git")
from git import Repo
class GitUrlFetchHandler(BaseFetchHandler):
"""Handler for git branches via generic and github URLs"""
def can_handle(self, source):
url_parts = self.parse_url(source)
# TODO (mattyw) no support for ssh git@ yet
if url_parts.scheme not in ('http', 'https', 'git'):
return False
else:
return True
def clone(self, source, dest, branch):
if not self.can_handle(source):
raise UnhandledSource("Cannot handle {}".format(source))
repo = Repo.clone_from(source, dest)
repo.git.checkout(branch)
def install(self, source, branch="master", dest=None):
url_parts = self.parse_url(source)
branch_name = url_parts.path.strip("/").split("/")[-1]
if dest:
dest_dir = os.path.join(dest, branch_name)
else:
dest_dir = os.path.join(os.environ.get('CHARM_DIR'), "fetched",
branch_name)
if not os.path.exists(dest_dir):
mkdir(dest_dir, perms=0o755)
try:
self.clone(source, dest_dir, branch)
except OSError as e:
raise UnhandledSource(e.strerror)
return dest_dir

View File

@ -0,0 +1 @@
swift_hooks.py

View File

@ -1,3 +1,6 @@
import os
import uuid
from charmhelpers.core.hookenv import (
config,
log,
@ -5,40 +8,38 @@ from charmhelpers.core.hookenv import (
related_units,
relation_get,
unit_get,
service_name
service_name,
)
from charmhelpers.contrib.openstack.context import (
OSContextGenerator,
ApacheSSLContext as SSLContext,
context_complete,
)
from charmhelpers.contrib.hahelpers.cluster import (
determine_api_port,
determine_apache_port,
)
from charmhelpers.contrib.network.ip import (
get_ipv6_addr
)
from charmhelpers.contrib.openstack.utils import get_host_ip
import os
import uuid
SWIFT_HASH_FILE = '/var/lib/juju/swift-hash-path.conf'
WWW_DIR = '/var/www/swift-rings'
class HAProxyContext(OSContextGenerator):
interfaces = ['cluster']
def __call__(self):
'''
Extends the main charmhelpers HAProxyContext with a port mapping
"""Extends the main charmhelpers HAProxyContext with a port mapping
specific to this charm.
Also used to extend cinder.conf context with correct api_listening_port
'''
"""
haproxy_port = config('bind-port')
api_port = determine_apache_port(config('bind-port'))
api_port = determine_apache_port(config('bind-port'),
singlenode_mode=True)
ctxt = {
'service_ports': {'swift_api': [haproxy_port, api_port]},
@ -46,9 +47,6 @@ class HAProxyContext(OSContextGenerator):
return ctxt
WWW_DIR = '/var/www/swift-rings'
class ApacheSSLContext(SSLContext):
interfaces = ['https']
external_ports = [config('bind-port')]
@ -66,6 +64,7 @@ class SwiftRingContext(OSContextGenerator):
host_ip = get_ipv6_addr(exc_list=[config('vip')])[0]
else:
host_ip = get_host_ip(host)
allowed_hosts.append(host_ip)
ctxt = {
@ -90,10 +89,11 @@ class SwiftIdentityContext(OSContextGenerator):
else:
proxy_ip = get_host_ip(unit_get('private-address'))
memcached_ip = get_host_ip(unit_get('private-address'))
ctxt = {
'proxy_ip': proxy_ip,
'memcached_ip': memcached_ip,
'bind_port': determine_api_port(bind_port),
'bind_port': determine_api_port(bind_port, singlenode_mode=True),
'workers': workers,
'operator_roles': config('operator-roles'),
'delay_auth_decision': config('delay-auth-decision'),
@ -147,6 +147,7 @@ class SwiftIdentityContext(OSContextGenerator):
}
if context_complete(ks_auth):
ctxt.update(ks_auth)
return ctxt
@ -158,9 +159,8 @@ class MemcachedContext(OSContextGenerator):
ctxt['memcached_ip'] = 'ip6-localhost'
else:
ctxt['memcached_ip'] = get_host_ip(unit_get('private-address'))
return ctxt
SWIFT_HASH_FILE = '/var/lib/juju/swift-hash-path.conf'
return ctxt
def get_swift_hash():
@ -176,6 +176,7 @@ def get_swift_hash():
service_name()))
with open(SWIFT_HASH_FILE, 'w') as hashfile:
hashfile.write(swift_hash)
return swift_hash

View File

@ -2,69 +2,82 @@
import os
import sys
import shutil
import uuid
import subprocess
import charmhelpers.contrib.openstack.utils as openstack
import charmhelpers.contrib.hahelpers.cluster as cluster
from swift_utils import (
SwiftProxyCharmException,
register_configs,
restart_map,
services,
determine_packages,
ensure_swift_dir,
SWIFT_RINGS, get_www_dir,
SWIFT_RINGS,
get_www_dir,
initialize_ring,
swift_user,
SWIFT_HA_RES,
balance_ring,
SWIFT_CONF_DIR,
get_zone,
exists_in_ring,
add_to_ring,
should_balance,
do_openstack_upgrade,
write_rc_script,
setup_ipv6
setup_ipv6,
update_rings,
balance_rings,
fully_synced,
sync_proxy_rings,
broadcast_rings_available,
mark_www_rings_deleted,
SwiftProxyClusterRPC,
get_first_available_value,
all_responses_equal,
ensure_www_dir_permissions,
)
from swift_context import get_swift_hash
import charmhelpers.contrib.openstack.utils as openstack
from charmhelpers.contrib.hahelpers.cluster import (
is_elected_leader,
is_crm_leader
)
from charmhelpers.core.hookenv import (
config,
local_unit,
remote_unit,
unit_get,
relation_set,
relation_ids,
relation_get,
related_units,
relations_of_type,
local_unit,
log, ERROR,
log,
DEBUG,
INFO,
WARNING,
ERROR,
Hooks, UnregisteredHookError,
open_port
open_port,
)
from charmhelpers.core.host import (
service_restart,
restart_on_change
service_stop,
service_start,
restart_on_change,
)
from charmhelpers.fetch import (
apt_install,
apt_update
apt_update,
)
from charmhelpers.payload.execd import execd_preinstall
from charmhelpers.contrib.openstack.ip import (
canonical_url,
PUBLIC, INTERNAL, ADMIN
PUBLIC,
INTERNAL,
ADMIN,
)
from charmhelpers.contrib.network.ip import (
get_iface_for_address,
get_netmask_for_address,
get_address_in_network,
get_ipv6_addr,
is_ipv6,
format_ipv6_addr,
is_ipv6
)
from charmhelpers.contrib.openstack.context import ADDRESS_TYPES
from charmhelpers.contrib.charmsupport.nrpe import NRPE
@ -74,9 +87,7 @@ extra_pkgs = [
"python-jinja2"
]
hooks = Hooks()
CONFIGS = register_configs()
@ -86,32 +97,60 @@ def install():
src = config('openstack-origin')
if src != 'distro':
openstack.configure_installation_source(src)
apt_update(fatal=True)
rel = openstack.get_os_codename_install_source(src)
pkgs = determine_packages(rel)
apt_install(pkgs, fatal=True)
apt_install(extra_pkgs, fatal=True)
ensure_swift_dir()
# initialize new storage rings.
for ring in SWIFT_RINGS.iteritems():
initialize_ring(ring[1],
config('partition-power'),
config('replicas'),
config('min-hours'))
if is_elected_leader(SWIFT_HA_RES):
log("Leader established, generating ring builders", level=INFO)
# initialize new storage rings.
for path in SWIFT_RINGS.itervalues():
initialize_ring(path,
config('partition-power'),
config('replicas'),
config('min-hours'))
# configure a directory on webserver for distributing rings.
www_dir = get_www_dir()
if not os.path.isdir(www_dir):
os.mkdir(www_dir, 0o755)
uid, gid = swift_user()
os.chown(www_dir, uid, gid)
ensure_www_dir_permissions(get_www_dir())
@hooks.hook('config-changed')
@restart_on_change(restart_map())
def config_changed():
if config('prefer-ipv6'):
setup_ipv6()
configure_https()
open_port(config('bind-port'))
ensure_swift_directories()
setup_rsync()
# Determine whether or not we should do an upgrade.
if openstack.openstack_upgrade_available('python-swift'):
do_openstack_upgrade(CONFIGS)
update_rings(min_part_hours=config('min-hours'))
if not config('disable-ring-balance') and is_elected_leader(SWIFT_HA_RES):
# Try ring balance. If rings are balanced, no sync will occur.
balance_rings()
for r_id in relation_ids('identity-service'):
keystone_joined(relid=r_id)
if relations_of_type('nrpe-external-master'):
update_nrpe_config()
@hooks.hook('identity-service-relation-joined')
def keystone_joined(relid=None):
if not cluster.eligible_leader(SWIFT_HA_RES):
if not is_elected_leader(SWIFT_HA_RES):
return
port = config('bind-port')
admin_url = '%s:%s' % (canonical_url(CONFIGS, ADMIN), port)
internal_url = '%s:%s/v1/AUTH_$(tenant_id)s' % \
@ -133,53 +172,43 @@ def keystone_changed():
configure_https()
def balance_rings():
'''handle doing ring balancing and distribution.'''
new_ring = False
for ring in SWIFT_RINGS.itervalues():
if balance_ring(ring):
log('Balanced ring %s' % ring)
new_ring = True
if not new_ring:
return
@hooks.hook('swift-storage-relation-joined')
def storage_joined():
if not is_elected_leader(SWIFT_HA_RES):
log("New storage relation joined - stopping proxy until ring builder "
"synced", level=INFO)
service_stop('swift-proxy')
www_dir = get_www_dir()
for ring in SWIFT_RINGS.keys():
f = '%s.ring.gz' % ring
shutil.copyfile(os.path.join(SWIFT_CONF_DIR, f),
os.path.join(www_dir, f))
if cluster.eligible_leader(SWIFT_HA_RES):
msg = 'Broadcasting notification to all storage nodes that new '\
'ring is ready for consumption.'
log(msg)
path = os.path.basename(www_dir)
trigger = uuid.uuid4()
if cluster.is_clustered():
hostname = config('vip')
elif config('prefer-ipv6'):
hostname = get_ipv6_addr(exc_list=[config('vip')])[0]
else:
hostname = unit_get('private-address')
hostname = format_ipv6_addr(hostname) or hostname
rings_url = 'http://%s/%s' % (hostname, path)
# notify storage nodes that there is a new ring to fetch.
for relid in relation_ids('swift-storage'):
relation_set(relation_id=relid, swift_hash=get_swift_hash(),
rings_url=rings_url, trigger=trigger)
service_restart('swift-proxy')
# This unit is not currently responsible for distributing rings but
# may become so at some time in the future so we do this to avoid the
# possibility of storage nodes getting out-of-date rings by deprecating
# any existing ones from the www dir.
mark_www_rings_deleted()
@hooks.hook('swift-storage-relation-changed')
@restart_on_change(restart_map())
def storage_changed():
"""Storage relation.
Only the leader unit can update and distribute rings so if we are not the
leader we ignore this event and wait for a resync request from the leader.
"""
if not is_elected_leader(SWIFT_HA_RES):
log("Not the leader - ignoring storage relation until leader ready.",
level=DEBUG)
return
log("Leader established, updating ring builders", level=INFO)
addr = relation_get('private-address')
if config('prefer-ipv6'):
host_ip = '[%s]' % relation_get('private-address')
host_ip = format_ipv6_addr(addr)
if not host_ip:
errmsg = ("Did not get IPv6 address from storage relation "
"(got=%s)" % (addr))
raise SwiftProxyCharmException(errmsg)
else:
host_ip = openstack.get_host_ip(relation_get('private-address'))
host_ip = openstack.get_host_ip(addr)
zone = get_zone(config('zone-assignment'))
node_settings = {
@ -189,8 +218,11 @@ def storage_changed():
'object_port': relation_get('object_port'),
'container_port': relation_get('container_port'),
}
if None in node_settings.itervalues():
log('storage_changed: Relation not ready.')
missing = [k for k, v in node_settings.iteritems() if v is None]
log("Relation not ready - some required values not provided by "
"relation (missing=%s)" % (', '.join(missing)), level=INFO)
return None
for k in ['zone', 'account_port', 'object_port', 'container_port']:
@ -198,16 +230,16 @@ def storage_changed():
CONFIGS.write_all()
# allow for multiple devs per unit, passed along as a : separated list
devs = relation_get('device').split(':')
for dev in devs:
node_settings['device'] = dev
for ring in SWIFT_RINGS.itervalues():
if not exists_in_ring(ring, node_settings):
add_to_ring(ring, node_settings)
# Allow for multiple devs per unit, passed along as a : separated list
# Update and balance rings.
devs = relation_get('device')
if devs:
node_settings['devices'] = devs.split(':')
if should_balance([r for r in SWIFT_RINGS.itervalues()]):
balance_rings()
update_rings(node_settings)
# Restart proxy here in case no config changes made (so
# restart_on_change() ineffective).
service_restart('swift-proxy')
@hooks.hook('swift-storage-relation-broken')
@ -216,54 +248,171 @@ def storage_broken():
CONFIGS.write_all()
@hooks.hook('config-changed')
@restart_on_change(restart_map())
def config_changed():
if config('prefer-ipv6'):
setup_ipv6()
configure_https()
open_port(config('bind-port'))
update_nrpe_config()
# Determine whether or not we should do an upgrade, based on the
# the version offered in keyston-release.
if (openstack.openstack_upgrade_available('python-swift')):
do_openstack_upgrade(CONFIGS)
for r_id in relation_ids('identity-service'):
keystone_joined(relid=r_id)
[cluster_joined(rid) for rid in relation_ids('cluster')]
@hooks.hook('cluster-relation-joined')
def cluster_joined(relation_id=None):
for addr_type in ADDRESS_TYPES:
address = get_address_in_network(
config('os-{}-network'.format(addr_type))
)
netaddr_cfg = 'os-{}-network'.format(addr_type)
address = get_address_in_network(config(netaddr_cfg))
if address:
relation_set(
relation_id=relation_id,
relation_settings={'{}-address'.format(addr_type): address}
)
settings = {'{}-address'.format(addr_type): address}
relation_set(relation_id=relation_id, relation_settings=settings)
if config('prefer-ipv6'):
private_addr = get_ipv6_addr(exc_list=[config('vip')])[0]
relation_set(relation_id=relation_id,
relation_settings={'private-address': private_addr})
else:
private_addr = unit_get('private-address')
def all_peers_stopped(responses):
"""Establish whether all peers have stopped their proxy services.
Each peer unit will set stop-proxy-service-ack to rq value to indicate that
it has stopped its proxy service. We wait for all units to be stopped
before triggering a sync. Peer services will be restarted once their rings
are synced with the leader.
To be safe, default expectation is that api is still running.
"""
rq_key = SwiftProxyClusterRPC.KEY_STOP_PROXY_SVC
ack_key = SwiftProxyClusterRPC.KEY_STOP_PROXY_SVC_ACK
token = relation_get(attribute=rq_key, unit=local_unit())
if not token or token != responses[0].get(ack_key):
log("Unmatched token in ack (expected=%s, got=%s)" %
(token, responses[0].get(ack_key)), level=DEBUG)
return False
if not all_responses_equal(responses, ack_key):
return False
return True
def cluster_leader_actions():
"""Cluster relation hook actions to be performed by leader units.
NOTE: must be called by leader from cluster relation hook.
"""
log("Cluster changed by unit=%s (local is leader)" % (remote_unit()),
level=DEBUG)
# If we have received an ack, check other units
settings = relation_get() or {}
ack_key = SwiftProxyClusterRPC.KEY_STOP_PROXY_SVC_ACK
# Protect against leader changing mid-sync
if settings.get(SwiftProxyClusterRPC.KEY_STOP_PROXY_SVC):
log("Sync request received yet this is leader unit. This would "
"indicate that the leader has changed mid-sync - stopping proxy "
"and notifying peers", level=ERROR)
service_stop('swift-proxy')
SwiftProxyClusterRPC().notify_leader_changed()
return
elif ack_key in settings:
token = settings[ack_key]
# Find out if all peer units have been stopped.
responses = []
for rid in relation_ids('cluster'):
for unit in related_units(rid):
responses.append(relation_get(rid=rid, unit=unit))
# Ensure all peers stopped before starting sync
if all_peers_stopped(responses):
key = 'peers-only'
if not all_responses_equal(responses, key, must_exist=False):
msg = ("Did not get equal response from every peer unit for "
"'%s'" % (key))
raise SwiftProxyCharmException(msg)
peers_only = int(get_first_available_value(responses, key,
default=0))
log("Syncing rings and builders (peers-only=%s)" % (peers_only),
level=DEBUG)
broadcast_rings_available(token, storage=not peers_only)
else:
log("Not all peer apis stopped - skipping sync until all peers "
"ready (got %s)" % (responses), level=INFO)
CONFIGS.write_all()
def cluster_non_leader_actions():
"""Cluster relation hook actions to be performed by non-leader units.
NOTE: must be called by non-leader from cluster relation hook.
"""
log("Cluster changed by unit=%s (local is non-leader)" % (remote_unit()),
level=DEBUG)
settings = relation_get() or {}
# Check whether we have been requested to stop proxy service
rq_key = SwiftProxyClusterRPC.KEY_STOP_PROXY_SVC
token = settings.get(rq_key, None)
if token:
log("Peer request to stop proxy service received (%s) - sending ack" %
(token), level=INFO)
service_stop('swift-proxy')
peers_only = settings.get('peers-only', None)
rq = SwiftProxyClusterRPC().stop_proxy_ack(echo_token=token,
echo_peers_only=peers_only)
relation_set(relation_settings=rq)
return
# Check if there are any builder files we can sync from the leader.
log("Non-leader peer - checking if updated rings available", level=DEBUG)
broker = settings.get('builder-broker', None)
if not broker:
log("No update available", level=DEBUG)
service_start('swift-proxy')
return
builders_only = int(settings.get('sync-only-builders', 0))
path = os.path.basename(get_www_dir())
try:
sync_proxy_rings('http://%s/%s' % (broker, path),
rings=not builders_only)
except subprocess.CalledProcessError:
log("Ring builder sync failed, builders not yet available - "
"leader not ready?", level=WARNING)
return None
# Re-enable the proxy once all builders and rings are synced
if fully_synced():
log("Ring builders synced - starting proxy", level=INFO)
CONFIGS.write_all()
service_start('swift-proxy')
else:
log("Not all builders and rings synced yet - waiting for peer sync "
"before starting proxy", level=INFO)
@hooks.hook('cluster-relation-changed',
'cluster-relation-departed')
@restart_on_change(restart_map())
def cluster_changed():
CONFIGS.write_all()
key = SwiftProxyClusterRPC.KEY_NOTIFY_LEADER_CHANGED
leader_changed = relation_get(attribute=key)
if leader_changed:
log("Leader changed notification received from peer unit. Since this "
"most likely occurred during a ring sync proxies will be "
"disabled until the leader is restored and a fresh sync request "
"is set out", level=WARNING)
service_stop("swift-proxy")
return
if is_elected_leader(SWIFT_HA_RES):
cluster_leader_actions()
else:
cluster_non_leader_actions()
@hooks.hook('ha-relation-changed')
def ha_relation_changed():
clustered = relation_get('clustered')
if clustered and cluster.is_leader(SWIFT_HA_RES):
log('Cluster configured, notifying other services and'
'updating keystone endpoint configuration')
if clustered and is_crm_leader(SWIFT_HA_RES):
log("Cluster configured, notifying other services and updating "
"keystone endpoint configuration", level=INFO)
# Tell all related services to start using
# the VIP instead
for r_id in relation_ids('identity-service'):
@ -278,17 +427,12 @@ def ha_relation_joined():
corosync_mcastport = config('ha-mcastport')
vip = config('vip')
if not vip:
log('Unable to configure hacluster as vip not provided',
level=ERROR)
sys.exit(1)
msg = 'Unable to configure hacluster as vip not provided'
raise SwiftProxyCharmException(msg)
# Obtain resources
resources = {
'res_swift_haproxy': 'lsb:haproxy'
}
resource_params = {
'res_swift_haproxy': 'op monitor interval="5s"'
}
resources = {'res_swift_haproxy': 'lsb:haproxy'}
resource_params = {'res_swift_haproxy': 'op monitor interval="5s"'}
vip_group = []
for vip in vip.split():
@ -315,12 +459,8 @@ def ha_relation_joined():
if len(vip_group) >= 1:
relation_set(groups={'grp_swift_vips': ' '.join(vip_group)})
init_services = {
'res_swift_haproxy': 'haproxy'
}
clones = {
'cl_swift_haproxy': 'res_swift_haproxy'
}
init_services = {'res_swift_haproxy': 'haproxy'}
clones = {'cl_swift_haproxy': 'res_swift_haproxy'}
relation_set(init_services=init_services,
corosync_bindiface=corosync_bindiface,
@ -331,10 +471,9 @@ def ha_relation_joined():
def configure_https():
'''
Enables SSL API Apache config if appropriate and kicks identity-service
"""Enables SSL API Apache config if appropriate and kicks identity-service
with any required api updates.
'''
"""
# need to write all to ensure changes to the entire request pipeline
# propagate (c-api, haprxy, apache)
CONFIGS.write_all()
@ -352,7 +491,10 @@ def configure_https():
for rid in relation_ids('identity-service'):
keystone_joined(relid=rid)
write_rc_script()
env_vars = {'OPENSTACK_SERVICE_SWIFT': 'proxy-server',
'OPENSTACK_PORT_API': config('bind-port'),
'OPENSTACK_PORT_MEMCACHED': 11211}
openstack.save_script_rc(**env_vars)
@hooks.hook('nrpe-external-master-relation-joined',
@ -410,7 +552,7 @@ def main():
try:
hooks.execute(sys.argv)
except UnregisteredHookError as e:
log('Unknown hook {} - skipping.'.format(e))
log('Unknown hook {} - skipping.'.format(e), level=DEBUG)
if __name__ == '__main__':

View File

@ -1,14 +1,48 @@
import copy
import glob
import hashlib
import os
import pwd
import shutil
import subprocess
import charmhelpers.contrib.openstack.utils as openstack
import sys
from collections import OrderedDict
import tempfile
import threading
import uuid
from collections import OrderedDict
from swift_context import (
get_swift_hash,
SwiftHashContext,
SwiftIdentityContext,
HAProxyContext,
SwiftRingContext,
ApacheSSLContext,
MemcachedContext,
)
import charmhelpers.contrib.openstack.context as context
import charmhelpers.contrib.openstack.templating as templating
from charmhelpers.contrib.openstack.utils import (
get_os_codename_package,
get_os_codename_install_source,
configure_installation_source
)
from charmhelpers.contrib.hahelpers.cluster import (
is_elected_leader,
is_clustered,
peer_units,
)
from charmhelpers.core.hookenv import (
log, ERROR,
log,
DEBUG,
INFO,
WARNING,
config,
relation_get,
unit_get,
relation_set,
relation_ids,
related_units,
)
from charmhelpers.fetch import (
apt_update,
@ -16,31 +50,38 @@ from charmhelpers.fetch import (
apt_install,
add_source
)
from charmhelpers.core.host import (
lsb_release
)
import charmhelpers.contrib.openstack.context as context
import charmhelpers.contrib.openstack.templating as templating
import swift_context
from charmhelpers.contrib.network.ip import (
format_ipv6_addr,
get_ipv6_addr,
)
from charmhelpers.core.decorators import (
retry_on_exception,
)
# Various config files that are managed via templating.
SWIFT_CONF = '/etc/swift/swift.conf'
SWIFT_PROXY_CONF = '/etc/swift/proxy-server.conf'
SWIFT_CONF_DIR = '/etc/swift'
SWIFT_RING_EXT = 'ring.gz'
SWIFT_CONF = os.path.join(SWIFT_CONF_DIR, 'swift.conf')
SWIFT_PROXY_CONF = os.path.join(SWIFT_CONF_DIR, 'proxy-server.conf')
SWIFT_CONF_DIR = os.path.dirname(SWIFT_CONF)
MEMCACHED_CONF = '/etc/memcached.conf'
SWIFT_RINGS_CONF = '/etc/apache2/conf.d/swift-rings'
SWIFT_RINGS_24_CONF = '/etc/apache2/conf-available/swift-rings.conf'
HAPROXY_CONF = '/etc/haproxy/haproxy.cfg'
APACHE_SITE_CONF = '/etc/apache2/sites-available/openstack_https_frontend'
APACHE_SITE_24_CONF = '/etc/apache2/sites-available/' \
'openstack_https_frontend.conf'
APACHE_SITES_AVAILABLE = '/etc/apache2/sites-available'
APACHE_SITE_CONF = os.path.join(APACHE_SITES_AVAILABLE,
'openstack_https_frontend')
APACHE_SITE_24_CONF = os.path.join(APACHE_SITES_AVAILABLE,
'openstack_https_frontend.conf')
WWW_DIR = '/var/www/swift-rings'
ALTERNATE_WWW_DIR = '/var/www/html/swift-rings'
RING_SYNC_SEMAPHORE = threading.Semaphore()
def get_www_dir():
if os.path.isdir(os.path.dirname(ALTERNATE_WWW_DIR)):
@ -50,13 +91,13 @@ def get_www_dir():
SWIFT_RINGS = {
'account': '/etc/swift/account.builder',
'container': '/etc/swift/container.builder',
'object': '/etc/swift/object.builder'
'account': os.path.join(SWIFT_CONF_DIR, 'account.builder'),
'container': os.path.join(SWIFT_CONF_DIR, 'container.builder'),
'object': os.path.join(SWIFT_CONF_DIR, 'object.builder')
}
SSL_CERT = '/etc/swift/cert.crt'
SSL_KEY = '/etc/swift/cert.key'
SSL_CERT = os.path.join(SWIFT_CONF_DIR, 'cert.crt')
SSL_KEY = os.path.join(SWIFT_CONF_DIR, 'cert.key')
# Essex packages
BASE_PACKAGES = [
@ -70,59 +111,179 @@ BASE_PACKAGES = [
FOLSOM_PACKAGES = BASE_PACKAGES + ['swift-plugin-s3']
SWIFT_HA_RES = 'grp_swift_vips'
TEMPLATES = 'templates/'
# Map config files to hook contexts and services that will be associated
# with file in restart_on_changes()'s service map.
CONFIG_FILES = OrderedDict([
(SWIFT_CONF, {
'hook_contexts': [swift_context.SwiftHashContext()],
'hook_contexts': [SwiftHashContext()],
'services': ['swift-proxy'],
}),
(SWIFT_PROXY_CONF, {
'hook_contexts': [swift_context.SwiftIdentityContext(),
'hook_contexts': [SwiftIdentityContext(),
context.BindHostContext()],
'services': ['swift-proxy'],
}),
(HAPROXY_CONF, {
'hook_contexts': [context.HAProxyContext(),
swift_context.HAProxyContext()],
'hook_contexts': [context.HAProxyContext(singlenode_mode=True),
HAProxyContext()],
'services': ['haproxy'],
}),
(SWIFT_RINGS_CONF, {
'hook_contexts': [swift_context.SwiftRingContext()],
'hook_contexts': [SwiftRingContext()],
'services': ['apache2'],
}),
(SWIFT_RINGS_24_CONF, {
'hook_contexts': [swift_context.SwiftRingContext()],
'hook_contexts': [SwiftRingContext()],
'services': ['apache2'],
}),
(APACHE_SITE_CONF, {
'hook_contexts': [swift_context.ApacheSSLContext()],
'hook_contexts': [ApacheSSLContext()],
'services': ['apache2'],
}),
(APACHE_SITE_24_CONF, {
'hook_contexts': [swift_context.ApacheSSLContext()],
'hook_contexts': [ApacheSSLContext()],
'services': ['apache2'],
}),
(MEMCACHED_CONF, {
'hook_contexts': [swift_context.MemcachedContext()],
'hook_contexts': [MemcachedContext()],
'services': ['memcached'],
}),
])
def register_configs():
class SwiftProxyCharmException(Exception):
pass
class SwiftProxyClusterRPC(object):
"""Provides cluster relation rpc dicts.
Crucially, this ensures that any settings we don't use in any given call
are set to None, therefore removing them from the relation so they don't
get accidentally interpreted by the receiver as part of the request.
NOTE: these are only intended to be used from cluster peer relations.
"""
Register config files with their respective contexts.
Regstration of some configs may not be required depending on
KEY_STOP_PROXY_SVC = 'stop-proxy-service'
KEY_STOP_PROXY_SVC_ACK = 'stop-proxy-service-ack'
KEY_NOTIFY_LEADER_CHANGED = 'leader-changed-notification'
def __init__(self, version=1):
self._version = version
def template(self):
# Everything must be None by default so it gets dropped from the
# relation unless we want it to be set.
templates = {1: {'trigger': None,
'broker-token': None,
'builder-broker': None,
self.KEY_STOP_PROXY_SVC: None,
self.KEY_STOP_PROXY_SVC_ACK: None,
self.KEY_NOTIFY_LEADER_CHANGED: None,
'peers-only': None,
'sync-only-builders': None}}
return copy.deepcopy(templates[self._version])
def stop_proxy_request(self, peers_only=False):
"""Request to stop peer proxy service.
NOTE: leader action
"""
rq = self.template()
rq['trigger'] = str(uuid.uuid4())
rq[self.KEY_STOP_PROXY_SVC] = rq['trigger']
if peers_only:
rq['peers-only'] = 1
return rq
def stop_proxy_ack(self, echo_token, echo_peers_only):
"""Ack that peer proxy service is stopped.
NOTE: non-leader action
"""
rq = self.template()
rq['trigger'] = str(uuid.uuid4())
# These echo values should match those received in the request
rq[self.KEY_STOP_PROXY_SVC_ACK] = echo_token
rq['peers-only'] = echo_peers_only
return rq
def sync_rings_request(self, broker_host, broker_token,
builders_only=False):
"""Request for peer to sync rings.
NOTE: leader action
"""
rq = self.template()
rq['trigger'] = str(uuid.uuid4())
if builders_only:
rq['sync-only-builders'] = 1
rq['broker-token'] = broker_token
rq['builder-broker'] = broker_host
return rq
def notify_leader_changed(self):
"""Notify peers that leader has changed.
NOTE: leader action
"""
rq = self.template()
rq['trigger'] = str(uuid.uuid4())
rq[self.KEY_NOTIFY_LEADER_CHANGED] = rq['trigger']
return rq
def get_first_available_value(responses, key, default=None):
for r in responses:
if key in r:
return r[key]
return default
def all_responses_equal(responses, key, must_exist=True):
"""If key exists in responses, all values for it must be equal.
If all equal return True. If key does not exist and must_exist is True
return False otherwise True.
"""
sentinel = object()
val = None
all_equal = True
for r in responses:
_val = r.get(key, sentinel)
if val is not None and val != _val:
all_equal = False
break
elif _val != sentinel:
val = _val
if must_exist and val is None:
all_equal = False
if all_equal:
return True
log("Responses not all equal for key '%s'" % (key), level=DEBUG)
return False
def register_configs():
"""Register config files with their respective contexts.
Registration of some configs may not be required depending on
existing of certain relations.
"""
# if called without anything installed (eg during install hook)
# just default to earliest supported release. configs dont get touched
# till post-install, anyway.
release = openstack.get_os_codename_package('swift-proxy', fatal=False) \
release = get_os_codename_package('swift-proxy', fatal=False) \
or 'essex'
configs = templating.OSConfigRenderer(templates_dir=TEMPLATES,
openstack_release=release)
@ -149,13 +310,12 @@ def register_configs():
def restart_map():
'''
Determine the correct resource map to be passed to
"""Determine the correct resource map to be passed to
charmhelpers.core.restart_on_change() based on the services configured.
:returns: dict: A dictionary mapping config file to lists of services
:returns dict: A dictionary mapping config file to lists of services
that should be restarted when file changes.
'''
"""
_map = []
for f, ctxt in CONFIG_FILES.iteritems():
svcs = []
@ -163,6 +323,7 @@ def restart_map():
svcs.append(svc)
if svcs:
_map.append((f, svcs))
return OrderedDict(_map)
@ -182,12 +343,13 @@ def swift_user(username='swift'):
def ensure_swift_dir(conf_dir=os.path.dirname(SWIFT_CONF)):
if not os.path.isdir(conf_dir):
os.mkdir(conf_dir, 0o750)
uid, gid = swift_user()
os.chown(conf_dir, uid, gid)
def determine_packages(release):
'''determine what packages are needed for a given OpenStack release'''
"""Determine what packages are needed for a given OpenStack release."""
if release == 'essex':
return BASE_PACKAGES
elif release == 'folsom':
@ -198,13 +360,6 @@ def determine_packages(release):
return FOLSOM_PACKAGES
def write_rc_script():
env_vars = {'OPENSTACK_SERVICE_SWIFT': 'proxy-server',
'OPENSTACK_PORT_API': config('bind-port'),
'OPENSTACK_PORT_MEMCACHED': 11211}
openstack.save_script_rc(**env_vars)
def _load_builder(path):
# lifted straight from /usr/bin/swift-ring-builder
from swift.common.ring import RingBuilder
@ -221,6 +376,7 @@ def _load_builder(path):
for dev in builder.devs:
if dev and 'meta' not in dev:
dev['meta'] = ''
return builder
@ -230,14 +386,14 @@ def _write_ring(ring, ring_path):
def ring_port(ring_path, node):
'''determine correct port from relation settings for a given ring file.'''
"""Determine correct port from relation settings for a given ring file."""
for name in ['account', 'object', 'container']:
if name in ring_path:
return node[('%s_port' % name)]
def initialize_ring(path, part_power, replicas, min_hours):
'''Initialize a new swift ring with given parameters.'''
"""Initialize a new swift ring with given parameters."""
from swift.common.ring import RingBuilder
ring = RingBuilder(part_power, replicas, min_hours)
_write_ring(ring, path)
@ -252,14 +408,13 @@ def exists_in_ring(ring_path, node):
n = [(i, node[i]) for i in node if i in dev and i != 'zone']
if sorted(d) == sorted(n):
msg = 'Node already exists in ring (%s).' % ring_path
log(msg)
log('Node already exists in ring (%s).' % ring_path, level=INFO)
return True
return False
def add_to_ring(ring_path, node):
def add_to_ring(ring_path, node, device):
ring = _load_builder(ring_path)
port = ring_port(ring_path, node)
@ -273,16 +428,14 @@ def add_to_ring(ring_path, node):
'zone': node['zone'],
'ip': node['ip'],
'port': port,
'device': node['device'],
'device': device,
'weight': 100,
'meta': '',
}
ring.add_dev(new_dev)
_write_ring(ring, ring_path)
msg = 'Added new device to ring %s: %s' %\
(ring_path,
[k for k in new_dev.iteritems()])
log(msg)
msg = 'Added new device to ring %s: %s' % (ring_path, new_dev)
log(msg, level=INFO)
def _get_zone(ring_builder):
@ -310,19 +463,33 @@ def _get_zone(ring_builder):
return sorted(zone_distrib, key=zone_distrib.get).pop(0)
def get_min_part_hours(ring):
builder = _load_builder(ring)
return builder.min_part_hours
def set_min_part_hours(path, value):
cmd = ['swift-ring-builder', path, 'set_min_part_hours', str(value)]
p = subprocess.Popen(cmd)
p.communicate()
rc = p.returncode
if rc != 0:
msg = ("Failed to set min_part_hours=%s on %s" % (value, path))
raise SwiftProxyCharmException(msg)
def get_zone(assignment_policy):
''' Determine the appropriate zone depending on configured assignment
policy.
"""Determine appropriate zone based on configured assignment policy.
Manual assignment relies on each storage zone being deployed as a
separate service unit with its desired zone set as a configuration
option.
Manual assignment relies on each storage zone being deployed as a
separate service unit with its desired zone set as a configuration
option.
Auto assignment distributes swift-storage machine units across a number
of zones equal to the configured minimum replicas. This allows for a
single swift-storage service unit, with each 'add-unit'd machine unit
being assigned to a different zone.
'''
Auto assignment distributes swift-storage machine units across a number
of zones equal to the configured minimum replicas. This allows for a
single swift-storage service unit, with each 'add-unit'd machine unit
being assigned to a different zone.
"""
if assignment_policy == 'manual':
return relation_get('zone')
elif assignment_policy == 'auto':
@ -332,13 +499,15 @@ def get_zone(assignment_policy):
potential_zones.append(_get_zone(builder))
return set(potential_zones).pop()
else:
log('Invalid zone assignment policy: %s' % assignment_policy,
level=ERROR)
sys.exit(1)
msg = ('Invalid zone assignment policy: %s' % assignment_policy)
raise SwiftProxyCharmException(msg)
def balance_ring(ring_path):
'''balance a ring. return True if it needs redistribution'''
"""Balance a ring.
Returns True if it needs redistribution.
"""
# shell out to swift-ring-builder instead, since the balancing code there
# does a bunch of un-importable validation.'''
cmd = ['swift-ring-builder', ring_path, 'rebalance']
@ -347,34 +516,53 @@ def balance_ring(ring_path):
rc = p.returncode
if rc == 0:
return True
elif rc == 1:
# swift-ring-builder returns 1 on WARNING (ring didn't require balance)
if rc == 1:
# Ring builder exit-code=1 is supposed to indicate warning but I have
# noticed that it can also return 1 with the following sort of message:
#
# NOTE: Balance of 166.67 indicates you should push this ring, wait
# at least 0 hours, and rebalance/repush.
#
# This indicates that a balance has occurred and a resync would be
# required so not sure why 1 is returned in this case.
return False
else:
log('balance_ring: %s returned %s' % (cmd, rc), level=ERROR)
sys.exit(1)
msg = ('balance_ring: %s returned %s' % (cmd, rc))
raise SwiftProxyCharmException(msg)
def should_balance(rings):
'''Based on zones vs min. replicas, determine whether or not the rings
should be balanaced during initial configuration.'''
do_rebalance = True
"""Determine whether or not a re-balance is required and allowed.
Ring balance can be disabled/postponed using the disable-ring-balance
config option.
Otherwise, using zones vs min. replicas, determine whether or not the rings
should be balanced.
"""
if config('disable-ring-balance'):
return False
for ring in rings:
zones = []
r = _load_builder(ring).to_dict()
replicas = r['replicas']
zones = [d['zone'] for d in r['devs']]
if len(set(zones)) < replicas:
do_rebalance = False
return do_rebalance
builder = _load_builder(ring).to_dict()
replicas = builder['replicas']
zones = [dev['zone'] for dev in builder['devs']]
num_zones = len(set(zones))
if num_zones < replicas:
log("Not enough zones (%d) defined to allow ring balance "
"(need >= %d)" % (num_zones, replicas), level=INFO)
return False
return True
def do_openstack_upgrade(configs):
new_src = config('openstack-origin')
new_os_rel = openstack.get_os_codename_install_source(new_src)
new_os_rel = get_os_codename_install_source(new_src)
log('Performing OpenStack upgrade to %s.' % (new_os_rel))
openstack.configure_installation_source(new_src)
log('Performing OpenStack upgrade to %s.' % (new_os_rel), level=DEBUG)
configure_installation_source(new_src)
dpkg_opts = [
'--option', 'Dpkg::Options::=--force-confnew',
'--option', 'Dpkg::Options::=--force-confdef',
@ -386,10 +574,16 @@ def do_openstack_upgrade(configs):
def setup_ipv6():
"""Validate that we can support IPv6 mode.
This should be called if prefer-ipv6 is True to ensure that we are running
in an environment that supports ipv6.
"""
ubuntu_rel = lsb_release()['DISTRIB_CODENAME'].lower()
if ubuntu_rel < "trusty":
raise Exception("IPv6 is not supported in the charms for Ubuntu "
"versions less than Trusty 14.04")
msg = ("IPv6 is not supported in the charms for Ubuntu versions less "
"than Trusty 14.04")
raise SwiftProxyCharmException(msg)
# NOTE(xianghui): Need to install haproxy(1.5.3) from trusty-backports
# to support ipv6 address, so check is required to make sure not
@ -399,3 +593,413 @@ def setup_ipv6():
' main')
apt_update()
apt_install('haproxy/trusty-backports', fatal=True)
@retry_on_exception(3, base_delay=2, exc_type=subprocess.CalledProcessError)
def sync_proxy_rings(broker_url, builders=True, rings=True):
"""The leader proxy is responsible for intialising, updating and
rebalancing the ring. Once the leader is ready the rings must then be
synced into each other proxy unit.
Note that we sync the ring builder and .gz files since the builder itself
is linked to the underlying .gz ring.
"""
log('Fetching swift rings & builders from proxy @ %s.' % broker_url,
level=DEBUG)
target = SWIFT_CONF_DIR
synced = []
tmpdir = tempfile.mkdtemp(prefix='swiftrings')
try:
for server in ['account', 'object', 'container']:
if builders:
url = '%s/%s.builder' % (broker_url, server)
log('Fetching %s.' % url, level=DEBUG)
builder = "%s.builder" % (server)
cmd = ['wget', url, '--retry-connrefused', '-t', '10', '-O',
os.path.join(tmpdir, builder)]
subprocess.check_call(cmd)
synced.append(builder)
if rings:
url = '%s/%s.%s' % (broker_url, server, SWIFT_RING_EXT)
log('Fetching %s.' % url, level=DEBUG)
ring = '%s.%s' % (server, SWIFT_RING_EXT)
cmd = ['wget', url, '--retry-connrefused', '-t', '10', '-O',
os.path.join(tmpdir, ring)]
subprocess.check_call(cmd)
synced.append(ring)
# Once all have been successfully downloaded, move them to actual
# location.
for f in synced:
os.rename(os.path.join(tmpdir, f), os.path.join(target, f))
finally:
shutil.rmtree(tmpdir)
def ensure_www_dir_permissions(www_dir):
if not os.path.isdir(www_dir):
os.mkdir(www_dir, 0o755)
else:
os.chmod(www_dir, 0o755)
uid, gid = swift_user()
os.chown(www_dir, uid, gid)
def update_www_rings(rings=True, builders=True):
"""Copy rings to apache www dir.
Try to do this as atomically as possible to avoid races with storage nodes
syncing rings.
"""
if not (rings or builders):
return
tmp_dir = tempfile.mkdtemp(prefix='swift-rings-www-tmp')
for ring, builder_path in SWIFT_RINGS.iteritems():
if rings:
ringfile = '%s.%s' % (ring, SWIFT_RING_EXT)
src = os.path.join(SWIFT_CONF_DIR, ringfile)
dst = os.path.join(tmp_dir, ringfile)
shutil.copyfile(src, dst)
if builders:
src = builder_path
dst = os.path.join(tmp_dir, os.path.basename(builder_path))
shutil.copyfile(src, dst)
www_dir = get_www_dir()
deleted = "%s.deleted" % (www_dir)
ensure_www_dir_permissions(tmp_dir)
os.rename(www_dir, deleted)
os.rename(tmp_dir, www_dir)
shutil.rmtree(deleted)
def get_rings_checksum():
"""Returns sha256 checksum for rings in /etc/swift."""
sha = hashlib.sha256()
for ring in SWIFT_RINGS.iterkeys():
path = os.path.join(SWIFT_CONF_DIR, '%s.%s' % (ring, SWIFT_RING_EXT))
if not os.path.isfile(path):
continue
with open(path, 'rb') as fd:
sha.update(fd.read())
return sha.hexdigest()
def get_builders_checksum():
"""Returns sha256 checksum for builders in /etc/swift."""
sha = hashlib.sha256()
for builder in SWIFT_RINGS.itervalues():
if not os.path.exists(builder):
continue
with open(builder, 'rb') as fd:
sha.update(fd.read())
return sha.hexdigest()
def get_broker_token():
"""Get ack token from peers to be used as broker token.
Must be equal across all peers.
Returns token or None if not found.
"""
responses = []
ack_key = SwiftProxyClusterRPC.KEY_STOP_PROXY_SVC_ACK
for rid in relation_ids('cluster'):
for unit in related_units(rid):
responses.append(relation_get(attribute=ack_key, rid=rid,
unit=unit))
# If no acks exist we have probably never done a sync so make up a token
if len(responses) == 0:
return str(uuid.uuid4())
if not all(responses) or len(set(responses)) != 1:
log("Not all ack tokens equal - %s" % (responses), level=DEBUG)
return None
return responses[0]
def sync_builders_and_rings_if_changed(f):
"""Only trigger a ring or builder sync if they have changed as a result of
the decorated operation.
"""
def _inner_sync_builders_and_rings_if_changed(*args, **kwargs):
if not is_elected_leader(SWIFT_HA_RES):
log("Sync rings called by non-leader - skipping", level=WARNING)
return
try:
# Ensure we don't do a double sync if we are nested.
do_sync = False
if RING_SYNC_SEMAPHORE.acquire(blocking=0):
do_sync = True
rings_before = get_rings_checksum()
builders_before = get_builders_checksum()
ret = f(*args, **kwargs)
if not do_sync:
return ret
rings_after = get_rings_checksum()
builders_after = get_builders_checksum()
rings_path = os.path.join(SWIFT_CONF_DIR, '*.%s' %
(SWIFT_RING_EXT))
rings_ready = len(glob.glob(rings_path)) == len(SWIFT_RINGS)
rings_changed = rings_after != rings_before
builders_changed = builders_after != builders_before
if rings_changed or builders_changed:
# Copy builders and rings (if available) to the server dir.
update_www_rings(rings=rings_ready)
if rings_changed and rings_ready:
# Trigger sync
cluster_sync_rings(peers_only=not rings_changed)
else:
cluster_sync_rings(peers_only=True, builders_only=True)
log("Rings not ready for sync - syncing builders",
level=DEBUG)
else:
log("Rings/builders unchanged - skipping sync", level=DEBUG)
return ret
finally:
RING_SYNC_SEMAPHORE.release()
return _inner_sync_builders_and_rings_if_changed
@sync_builders_and_rings_if_changed
def update_rings(node_settings=None, min_part_hours=None):
"""Update builder with node settings and balance rings if necessary.
Also update min_part_hours if provided.
"""
if not is_elected_leader(SWIFT_HA_RES):
log("Update rings called by non-leader - skipping", level=WARNING)
return
balance_required = False
if min_part_hours:
# NOTE: no need to stop the proxy since we are not changing the rings,
# only the builder.
# Only update if all exist
if all([os.path.exists(p) for p in SWIFT_RINGS.itervalues()]):
for ring, path in SWIFT_RINGS.iteritems():
current_min_part_hours = get_min_part_hours(path)
if min_part_hours != current_min_part_hours:
log("Setting ring %s min_part_hours to %s" %
(ring, min_part_hours), level=INFO)
try:
set_min_part_hours(path, min_part_hours)
except SwiftProxyCharmException as exc:
# TODO: ignore for now since this should not be
# critical but in the future we should support a
# rollback.
log(str(exc), level=WARNING)
else:
balance_required = True
if node_settings:
for dev in node_settings.get('devices', []):
for ring in SWIFT_RINGS.itervalues():
if not exists_in_ring(ring, node_settings):
add_to_ring(ring, node_settings, dev)
balance_required = True
if balance_required:
balance_rings()
@sync_builders_and_rings_if_changed
def balance_rings():
"""Rebalance each ring and notify peers that new rings are available."""
if not is_elected_leader(SWIFT_HA_RES):
log("Balance rings called by non-leader - skipping", level=WARNING)
return
if not should_balance([r for r in SWIFT_RINGS.itervalues()]):
log("Not yet ready to balance rings - insufficient replicas?",
level=INFO)
return
rebalanced = False
for path in SWIFT_RINGS.itervalues():
if balance_ring(path):
log('Balanced ring %s' % path, level=DEBUG)
rebalanced = True
else:
log('Ring %s not rebalanced' % path, level=DEBUG)
if not rebalanced:
log("Rings unchanged by rebalance", level=DEBUG)
# NOTE: checksum will tell for sure
def mark_www_rings_deleted():
"""Mark any rings from the apache server directory as deleted so that
storage units won't see them.
"""
www_dir = get_www_dir()
for ring, _ in SWIFT_RINGS.iteritems():
path = os.path.join(www_dir, '%s.ring.gz' % ring)
if os.path.exists(path):
os.rename(path, "%s.deleted" % (path))
def notify_peers_builders_available(broker_token, builders_only=False):
"""Notify peer swift-proxy units that they should synchronise ring and
builder files.
Note that this should only be called from the leader unit.
"""
if not is_elected_leader(SWIFT_HA_RES):
log("Ring availability peer broadcast requested by non-leader - "
"skipping", level=WARNING)
return
if is_clustered():
hostname = config('vip')
else:
hostname = get_hostaddr()
hostname = format_ipv6_addr(hostname) or hostname
# Notify peers that builders are available
log("Notifying peer(s) that rings are ready for sync.", level=INFO)
rq = SwiftProxyClusterRPC().sync_rings_request(hostname,
broker_token,
builders_only=builders_only)
for rid in relation_ids('cluster'):
log("Notifying rid=%s (%s)" % (rid, rq), level=DEBUG)
relation_set(relation_id=rid, relation_settings=rq)
def broadcast_rings_available(broker_token, peers=True, storage=True,
builders_only=False):
"""Notify storage relations and cluster (peer) relations that rings and
builders are availble for sync.
We can opt to only notify peer or storage relations if needs be.
"""
if storage:
# TODO: get ack from storage units that they are synced before
# syncing proxies.
notify_storage_rings_available()
else:
log("Skipping notify storage relations", level=DEBUG)
if peers:
notify_peers_builders_available(broker_token,
builders_only=builders_only)
else:
log("Skipping notify peer relations", level=DEBUG)
def cluster_sync_rings(peers_only=False, builders_only=False):
"""Notify peer relations that they should stop their proxy services.
Peer units will then be expected to do a relation_set with
stop-proxy-service-ack set rq value. Once all peers have responded, the
leader will send out notification to all relations that rings are available
for sync.
If peers_only is True, only peer units will be synced. This is typically
used when only builder files have been changed.
This should only be called by the leader unit.
"""
if not is_elected_leader(SWIFT_HA_RES):
# Only the leader can do this.
return
if not peer_units():
# If we have no peer units just go ahead and broadcast to storage
# relations. If we have been instructed to only broadcast to peers this
# should do nothing.
broker_token = get_broker_token()
broadcast_rings_available(broker_token, peers=False,
storage=not peers_only)
return
elif builders_only:
# No need to stop proxies if only syncing builders between peers.
broker_token = get_broker_token()
broadcast_rings_available(broker_token, storage=False,
builders_only=builders_only)
return
rel_ids = relation_ids('cluster')
trigger = str(uuid.uuid4())
log("Sending request to stop proxy service to all peers (%s)" % (trigger),
level=INFO)
rq = SwiftProxyClusterRPC().stop_proxy_request(peers_only)
for rid in rel_ids:
relation_set(relation_id=rid, relation_settings=rq)
def notify_storage_rings_available():
"""Notify peer swift-storage relations that they should synchronise ring
and builder files.
Note that this should only be called from the leader unit.
"""
if not is_elected_leader(SWIFT_HA_RES):
log("Ring availability storage-relation broadcast requested by "
"non-leader - skipping", level=WARNING)
return
if is_clustered():
hostname = config('vip')
else:
hostname = get_hostaddr()
hostname = format_ipv6_addr(hostname) or hostname
path = os.path.basename(get_www_dir())
rings_url = 'http://%s/%s' % (hostname, path)
trigger = uuid.uuid4()
# Notify storage nodes that there is a new ring to fetch.
log("Notifying storage nodes that new ring is ready for sync.", level=INFO)
for relid in relation_ids('swift-storage'):
relation_set(relation_id=relid, swift_hash=get_swift_hash(),
rings_url=rings_url, trigger=trigger)
def fully_synced():
"""Check that we have all the rings and builders synced from the leader.
Returns True if we have all rings and builders.
"""
not_synced = []
for ring, builder in SWIFT_RINGS.iteritems():
if not os.path.exists(builder):
not_synced.append(builder)
ringfile = os.path.join(SWIFT_CONF_DIR,
'%s.%s' % (ring, SWIFT_RING_EXT))
if not os.path.exists(ringfile):
not_synced.append(ringfile)
if not_synced:
log("Not yet synced: %s" % ', '.join(not_synced), level=INFO)
return False
return True
def get_hostaddr():
if config('prefer-ipv6'):
return get_ipv6_addr(exc_list=[config('vip')])[0]
return unit_get('private-address')

View File

@ -368,7 +368,7 @@ class SwiftProxyBasicDeployment(OpenStackAmuletDeployment):
expected = {
'DEFAULT': {
'bind_port': '8080',
'bind_port': '8070',
'workers': '0',
'user': 'swift'
},
@ -461,7 +461,7 @@ class SwiftProxyBasicDeployment(OpenStackAmuletDeployment):
expected = {
'DEFAULT': {
'bind_port': '8080',
'bind_port': '8070',
'workers': '0',
'user': 'swift'
},
@ -546,7 +546,7 @@ class SwiftProxyBasicDeployment(OpenStackAmuletDeployment):
expected = {
'DEFAULT': {
'bind_port': '8080',
'bind_port': '8070',
'workers': '0',
'user': 'swift'
},
@ -631,7 +631,7 @@ class SwiftProxyBasicDeployment(OpenStackAmuletDeployment):
expected = {
'DEFAULT': {
'bind_port': '8080',
'bind_port': '8070',
'workers': '0',
'user': 'swift'
},
@ -712,7 +712,7 @@ class SwiftProxyBasicDeployment(OpenStackAmuletDeployment):
expected = {
'DEFAULT': {
'bind_port': '8080',
'bind_port': '8070',
'workers': '0',
'user': 'swift'
},

View File

@ -0,0 +1,22 @@
# Bootstrap charm-helpers, installing its dependencies if necessary using
# only standard libraries.
import subprocess
import sys
try:
import six # flake8: noqa
except ImportError:
if sys.version_info.major == 2:
subprocess.check_call(['apt-get', 'install', '-y', 'python-six'])
else:
subprocess.check_call(['apt-get', 'install', '-y', 'python3-six'])
import six # flake8: noqa
try:
import yaml # flake8: noqa
except ImportError:
if sys.version_info.major == 2:
subprocess.check_call(['apt-get', 'install', '-y', 'python-yaml'])
else:
subprocess.check_call(['apt-get', 'install', '-y', 'python3-yaml'])
import yaml # flake8: noqa

View File

@ -1,6 +1,6 @@
import amulet
import os
import six
class AmuletDeployment(object):
@ -52,12 +52,12 @@ class AmuletDeployment(object):
def _add_relations(self, relations):
"""Add all of the relations for the services."""
for k, v in relations.iteritems():
for k, v in six.iteritems(relations):
self.d.relate(k, v)
def _configure_services(self, configs):
"""Configure all of the services."""
for service, config in configs.iteritems():
for service, config in six.iteritems(configs):
self.d.configure(service, config)
def _deploy(self):

View File

@ -5,6 +5,8 @@ import re
import sys
import time
import six
class AmuletUtils(object):
"""Amulet utilities.
@ -58,7 +60,7 @@ class AmuletUtils(object):
Verify the specified services are running on the corresponding
service units.
"""
for k, v in commands.iteritems():
for k, v in six.iteritems(commands):
for cmd in v:
output, code = k.run(cmd)
if code != 0:
@ -100,11 +102,11 @@ class AmuletUtils(object):
longs, or can be a function that evaluate a variable and returns a
bool.
"""
for k, v in expected.iteritems():
for k, v in six.iteritems(expected):
if k in actual:
if (isinstance(v, basestring) or
if (isinstance(v, six.string_types) or
isinstance(v, bool) or
isinstance(v, (int, long))):
isinstance(v, six.integer_types)):
if v != actual[k]:
return "{}:{}".format(k, actual[k])
elif not v(actual[k]):

View File

@ -1,3 +1,4 @@
import six
from charmhelpers.contrib.amulet.deployment import (
AmuletDeployment
)
@ -69,7 +70,7 @@ class OpenStackAmuletDeployment(AmuletDeployment):
def _configure_services(self, configs):
"""Configure all of the services."""
for service, config in configs.iteritems():
for service, config in six.iteritems(configs):
self.d.configure(service, config)
def _get_openstack_release(self):

View File

@ -7,6 +7,8 @@ import glanceclient.v1.client as glance_client
import keystoneclient.v2_0 as keystone_client
import novaclient.v1_1.client as nova_client
import six
from charmhelpers.contrib.amulet.utils import (
AmuletUtils
)
@ -60,7 +62,7 @@ class OpenStackAmuletUtils(AmuletUtils):
expected service catalog endpoints.
"""
self.log.debug('actual: {}'.format(repr(actual)))
for k, v in expected.iteritems():
for k, v in six.iteritems(expected):
if k in actual:
ret = self._validate_dict_data(expected[k][0], actual[k][0])
if ret:

View File

@ -0,0 +1,56 @@
import mock
import os
import tempfile
import unittest
import uuid
with mock.patch('charmhelpers.core.hookenv.config'):
import swift_context
class SwiftContextTestCase(unittest.TestCase):
@mock.patch('swift_context.config')
def test_get_swift_hash_file(self, mock_config):
expected = '##FILEHASH##'
with tempfile.NamedTemporaryFile() as tmpfile:
swift_context.SWIFT_HASH_FILE = tmpfile.name
tmpfile.write(expected)
tmpfile.seek(0)
os.fsync(tmpfile)
hash = swift_context.get_swift_hash()
self.assertFalse(mock_config.called)
self.assertEqual(expected, hash)
@mock.patch('swift_context.config')
def test_get_swift_hash_config(self, mock_config):
expected = '##CFGHASH##'
mock_config.return_value = expected
tmpfile = tempfile.mktemp()
swift_context.SWIFT_HASH_FILE = tmpfile
hash = swift_context.get_swift_hash()
with open(tmpfile, 'r') as fd:
self.assertEqual(expected, fd.read())
self.assertTrue(mock_config.called)
self.assertEqual(expected, hash)
@mock.patch('swift_context.service_name')
@mock.patch('swift_context.config')
def test_get_swift_hash_env(self, mock_config, mock_service_name):
mock_config.return_value = None
mock_service_name.return_value = "testsvc"
tmpfile = tempfile.mktemp()
swift_context.SWIFT_HASH_FILE = tmpfile
with mock.patch('swift_context.os.environ.get') as mock_env_get:
mock_env_get.return_value = str(uuid.uuid4())
hash = swift_context.get_swift_hash()
mock_env_get.assert_called_with('JUJU_ENV_UUID')
with open(tmpfile, 'r') as fd:
self.assertEqual(hash, fd.read())
self.assertTrue(mock_config.called)

View File

@ -0,0 +1,39 @@
import mock
import unittest
import uuid
with mock.patch('charmhelpers.core.hookenv.log'):
import swift_hooks
class SwiftHooksTestCase(unittest.TestCase):
@mock.patch("swift_hooks.relation_get")
@mock.patch("swift_hooks.local_unit")
def test_all_peers_stopped(self, mock_local_unit, mock_relation_get):
token1 = str(uuid.uuid4())
token2 = str(uuid.uuid4())
mock_relation_get.return_value = token1
responses = [{'some-other-key': token1}]
self.assertFalse(swift_hooks.all_peers_stopped(responses))
responses = [{'stop-proxy-service-ack': token1},
{'stop-proxy-service-ack': token2}]
self.assertFalse(swift_hooks.all_peers_stopped(responses))
responses = [{'stop-proxy-service-ack': token1},
{'stop-proxy-service-ack': token1},
{'some-other-key': token1}]
self.assertFalse(swift_hooks.all_peers_stopped(responses))
responses = [{'stop-proxy-service-ack': token1},
{'stop-proxy-service-ack': token1}]
self.assertTrue(swift_hooks.all_peers_stopped(responses))
mock_relation_get.return_value = token2
responses = [{'stop-proxy-service-ack': token1},
{'stop-proxy-service-ack': token1}]
self.assertFalse(swift_hooks.all_peers_stopped(responses))

View File

@ -0,0 +1,225 @@
import mock
import os
import shutil
import tempfile
import uuid
import unittest
with mock.patch('charmhelpers.core.hookenv.config'):
import swift_utils
def init_ring_paths(tmpdir):
swift_utils.SWIFT_CONF_DIR = tmpdir
for ring in swift_utils.SWIFT_RINGS.iterkeys():
path = os.path.join(tmpdir, "%s.builder" % ring)
swift_utils.SWIFT_RINGS[ring] = path
with open(path, 'w') as fd:
fd.write("0\n")
class SwiftUtilsTestCase(unittest.TestCase):
@mock.patch('swift_utils.get_broker_token')
@mock.patch('swift_utils.update_www_rings')
@mock.patch('swift_utils.get_builders_checksum')
@mock.patch('swift_utils.get_rings_checksum')
@mock.patch('swift_utils.balance_rings')
@mock.patch('swift_utils.log')
@mock.patch('swift_utils.os.path.exists')
@mock.patch('swift_utils.is_elected_leader')
@mock.patch('swift_utils.get_min_part_hours')
@mock.patch('swift_utils.set_min_part_hours')
def test_update_rings(self, mock_set_min_hours,
mock_get_min_hours,
mock_is_elected_leader, mock_path_exists,
mock_log, mock_balance_rings,
mock_get_rings_checksum,
mock_get_builders_checksum, mock_update_www_rings,
mock_get_broker_token):
mock_get_broker_token.return_value = "token1"
# Make sure same is returned for both so that we don't try to sync
mock_get_rings_checksum.return_value = None
mock_get_builders_checksum.return_value = None
# Test blocker 1
mock_is_elected_leader.return_value = False
swift_utils.update_rings()
self.assertFalse(mock_balance_rings.called)
# Test blocker 2
mock_path_exists.return_value = False
mock_is_elected_leader.return_value = True
swift_utils.update_rings()
self.assertFalse(mock_get_min_hours.called)
self.assertFalse(mock_balance_rings.called)
# Test blocker 3
mock_path_exists.return_value = True
mock_is_elected_leader.return_value = True
mock_get_min_hours.return_value = 10
swift_utils.update_rings(min_part_hours=10)
self.assertTrue(mock_get_min_hours.called)
self.assertFalse(mock_set_min_hours.called)
self.assertFalse(mock_balance_rings.called)
mock_get_min_hours.reset_mock()
# Test go through
mock_path_exists.return_value = True
mock_is_elected_leader.return_value = True
mock_get_min_hours.return_value = 0
swift_utils.update_rings(min_part_hours=10)
self.assertTrue(mock_get_min_hours.called)
self.assertTrue(mock_set_min_hours.called)
self.assertTrue(mock_balance_rings.called)
@mock.patch('swift_utils.get_broker_token')
@mock.patch('swift_utils.balance_rings')
@mock.patch('swift_utils.log')
@mock.patch('swift_utils.is_elected_leader')
@mock.patch('swift_utils.config')
@mock.patch('swift_utils.update_www_rings')
@mock.patch('swift_utils.cluster_sync_rings')
def test_sync_builders_and_rings_if_changed(self, mock_cluster_sync_rings,
mock_update_www_rings,
mock_config,
mock_is_elected_leader,
mock_log,
mock_balance_rings,
mock_get_broker_token):
mock_get_broker_token.return_value = "token1"
@swift_utils.sync_builders_and_rings_if_changed
def mock_balance():
for ring, builder in swift_utils.SWIFT_RINGS.iteritems():
ring = os.path.join(swift_utils.SWIFT_CONF_DIR,
'%s.ring.gz' % ring)
with open(ring, 'w') as fd:
fd.write(str(uuid.uuid4()))
with open(builder, 'w') as fd:
fd.write(str(uuid.uuid4()))
mock_balance_rings.side_effect = mock_balance
init_ring_paths(tempfile.mkdtemp())
try:
swift_utils.balance_rings()
finally:
shutil.rmtree(swift_utils.SWIFT_CONF_DIR)
self.assertTrue(mock_update_www_rings.called)
self.assertTrue(mock_cluster_sync_rings.called)
@mock.patch('swift_utils.get_www_dir')
def test_mark_www_rings_deleted(self, mock_get_www_dir):
try:
tmpdir = tempfile.mkdtemp()
mock_get_www_dir.return_value = tmpdir
swift_utils.mark_www_rings_deleted()
finally:
shutil.rmtree(tmpdir)
@mock.patch('swift_utils.uuid')
def test_cluster_rpc_stop_proxy_request(self, mock_uuid):
mock_uuid.uuid4.return_value = 'test-uuid'
rpc = swift_utils.SwiftProxyClusterRPC()
rq = rpc.stop_proxy_request(peers_only=True)
self.assertEqual({'trigger': 'test-uuid',
'broker-token': None,
'builder-broker': None,
'peers-only': True,
'leader-changed-notification': None,
'stop-proxy-service': 'test-uuid',
'stop-proxy-service-ack': None,
'sync-only-builders': None}, rq)
rq = rpc.stop_proxy_request()
self.assertEqual({'trigger': 'test-uuid',
'broker-token': None,
'builder-broker': None,
'peers-only': None,
'leader-changed-notification': None,
'stop-proxy-service': 'test-uuid',
'stop-proxy-service-ack': None,
'sync-only-builders': None}, rq)
@mock.patch('swift_utils.uuid')
def test_cluster_rpc_stop_proxy_ack(self, mock_uuid):
mock_uuid.uuid4.return_value = 'token2'
rpc = swift_utils.SwiftProxyClusterRPC()
rq = rpc.stop_proxy_ack(echo_token='token1', echo_peers_only='1')
self.assertEqual({'trigger': 'token2',
'broker-token': None,
'builder-broker': None,
'peers-only': '1',
'leader-changed-notification': None,
'stop-proxy-service': None,
'stop-proxy-service-ack': 'token1',
'sync-only-builders': None}, rq)
@mock.patch('swift_utils.uuid')
def test_cluster_rpc_sync_request(self, mock_uuid):
mock_uuid.uuid4.return_value = 'token2'
rpc = swift_utils.SwiftProxyClusterRPC()
rq = rpc.sync_rings_request('HostA', 'token1')
self.assertEqual({'trigger': 'token2',
'broker-token': 'token1',
'builder-broker': 'HostA',
'peers-only': None,
'leader-changed-notification': None,
'stop-proxy-service': None,
'stop-proxy-service-ack': None,
'sync-only-builders': None}, rq)
@mock.patch('swift_utils.uuid')
def test_cluster_rpc_notify_leader_changed(self, mock_uuid):
mock_uuid.uuid4.return_value = 'token1'
rpc = swift_utils.SwiftProxyClusterRPC()
rq = rpc.notify_leader_changed()
self.assertEqual({'trigger': 'token1',
'broker-token': None,
'builder-broker': None,
'peers-only': None,
'leader-changed-notification': 'token1',
'stop-proxy-service': None,
'stop-proxy-service-ack': None,
'sync-only-builders': None}, rq)
def test_all_responses_equal(self):
responses = [{'a': 1, 'c': 3}]
self.assertTrue(swift_utils.all_responses_equal(responses, 'b',
must_exist=False))
responses = [{'a': 1, 'c': 3}]
self.assertFalse(swift_utils.all_responses_equal(responses, 'b'))
responses = [{'a': 1, 'b': 2, 'c': 3}]
self.assertTrue(swift_utils.all_responses_equal(responses, 'b'))
responses = [{'a': 1, 'b': 2, 'c': 3}, {'a': 1, 'b': 2, 'c': 3}]
self.assertTrue(swift_utils.all_responses_equal(responses, 'b'))
responses = [{'a': 1, 'b': 2, 'c': 3}, {'a': 2, 'b': 2, 'c': 3}]
self.assertTrue(swift_utils.all_responses_equal(responses, 'b'))
responses = [{'a': 1, 'b': 2, 'c': 3}, {'a': 1, 'b': 3, 'c': 3}]
self.assertFalse(swift_utils.all_responses_equal(responses, 'b'))
def test_get_first_available_value(self):
rsps = [{'key1': 'A'}, {'key1': 'B'}]
self.assertEqual('A',
swift_utils.get_first_available_value(rsps, 'key1'))
rsps = [{'key2': 'A'}, {'key1': 'B'}]
self.assertEqual('B',
swift_utils.get_first_available_value(rsps, 'key1'))
rsps = [{'key2': 'A'}, {'key1': 'B'}]
self.assertIsNone(swift_utils.get_first_available_value(rsps, 'key3'))
rsps = []
self.assertIsNone(swift_utils.get_first_available_value(rsps, 'key3'))