luminous: ceph-volume switch

Switch to using ceph-volume + LVM for managing block devices
for Luminous and later; this is the upstream preferred approach
to managing OSD devices, allowing for more flexibility in terms
of use of crypto and logical volumes.

Change-Id: I30c4d29e6f568ac2e30a45b1a7bc0e68685c3707
Depends-On: I1675b67d364ae6042129a8a717d4bdffff5bde92
This commit is contained in:
James Page 2018-03-23 13:23:27 +00:00
parent e179d36802
commit b6dca11a1b
13 changed files with 743 additions and 131 deletions

View File

@ -3,7 +3,10 @@
<pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python 2.7</pydev_property>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">Default</pydev_property>
<pydev_pathproperty name="org.python.pydev.PROJECT_SOURCE_PATH">
<path>/ceph-osd/hooks</path>
<path>/ceph-osd/unit_tests</path>
<path>/${PROJECT_DIR_NAME}/lib</path>
<path>/${PROJECT_DIR_NAME}/unit_tests</path>
<path>/${PROJECT_DIR_NAME}/tests</path>
<path>/${PROJECT_DIR_NAME}/hooks</path>
<path>/${PROJECT_DIR_NAME}/actions</path>
</pydev_pathproperty>
</pydev_project>

View File

@ -65,7 +65,8 @@ def get_ca_cert():
if ca_cert is None:
log("Inspecting identity-service relations for CA SSL certificate.",
level=INFO)
for r_id in relation_ids('identity-service'):
for r_id in (relation_ids('identity-service') +
relation_ids('identity-credentials')):
for unit in relation_list(r_id):
if ca_cert is None:
ca_cert = relation_get('ca_cert',

View File

@ -384,6 +384,7 @@ class IdentityServiceContext(OSContextGenerator):
# so a missing value just indicates keystone needs
# upgrading
ctxt['admin_tenant_id'] = rdata.get('service_tenant_id')
ctxt['admin_domain_id'] = rdata.get('service_domain_id')
return ctxt
return {}

View File

@ -182,7 +182,7 @@ SWIFT_CODENAMES = OrderedDict([
('pike',
['2.13.0', '2.15.0']),
('queens',
['2.16.0']),
['2.16.0', '2.17.0']),
])
# >= Liberty version->codename mapping

View File

@ -151,3 +151,32 @@ def extend_logical_volume_by_device(lv_name, block_device):
'''
cmd = ['lvextend', lv_name, block_device]
check_call(cmd)
def create_logical_volume(lv_name, volume_group, size=None):
'''
Create a new logical volume in an existing volume group
:param lv_name: str: name of logical volume to be created.
:param volume_group: str: Name of volume group to use for the new volume.
:param size: str: Size of logical volume to create (100% if not supplied)
:raises subprocess.CalledProcessError: in the event that the lvcreate fails.
'''
if size:
check_call([
'lvcreate',
'--yes',
'-L',
'{}'.format(size),
'-n', lv_name, volume_group
])
# create the lv with all the space available, this is needed because the
# system call is different for LVM
else:
check_call([
'lvcreate',
'--yes',
'-l',
'100%FREE',
'-n', lv_name, volume_group
])

View File

@ -27,6 +27,7 @@ import glob
import os
import json
import yaml
import re
import subprocess
import sys
import errno
@ -67,7 +68,7 @@ def cached(func):
@wraps(func)
def wrapper(*args, **kwargs):
global cache
key = str((func, args, kwargs))
key = json.dumps((func, args, kwargs), sort_keys=True, default=str)
try:
return cache[key]
except KeyError:
@ -1043,7 +1044,6 @@ def juju_version():
universal_newlines=True).strip()
@cached
def has_juju_version(minimum_version):
"""Return True if the Juju version is at least the provided version"""
return LooseVersion(juju_version()) >= LooseVersion(minimum_version)
@ -1103,6 +1103,8 @@ def _run_atexit():
@translate_exc(from_exc=OSError, to_exc=NotImplementedError)
def network_get_primary_address(binding):
'''
Deprecated since Juju 2.3; use network_get()
Retrieve the primary network address for a named binding
:param binding: string. The name of a relation of extra-binding
@ -1123,7 +1125,6 @@ def network_get_primary_address(binding):
return response
@translate_exc(from_exc=OSError, to_exc=NotImplementedError)
def network_get(endpoint, relation_id=None):
"""
Retrieve the network details for a relation endpoint
@ -1131,24 +1132,20 @@ def network_get(endpoint, relation_id=None):
:param endpoint: string. The name of a relation endpoint
:param relation_id: int. The ID of the relation for the current context.
:return: dict. The loaded YAML output of the network-get query.
:raise: NotImplementedError if run on Juju < 2.1
:raise: NotImplementedError if request not supported by the Juju version.
"""
if not has_juju_version('2.2'):
raise NotImplementedError(juju_version()) # earlier versions require --primary-address
if relation_id and not has_juju_version('2.3'):
raise NotImplementedError # 2.3 added the -r option
cmd = ['network-get', endpoint, '--format', 'yaml']
if relation_id:
cmd.append('-r')
cmd.append(relation_id)
try:
response = subprocess.check_output(
cmd,
stderr=subprocess.STDOUT).decode('UTF-8').strip()
except CalledProcessError as e:
# Early versions of Juju 2.0.x required the --primary-address argument.
# We catch that condition here and raise NotImplementedError since
# the requested semantics are not available - the caller can then
# use the network_get_primary_address() method instead.
if '--primary-address is currently required' in e.output.decode('UTF-8'):
raise NotImplementedError
raise
response = subprocess.check_output(
cmd,
stderr=subprocess.STDOUT).decode('UTF-8').strip()
return yaml.safe_load(response)
@ -1204,9 +1201,23 @@ def iter_units_for_relation_name(relation_name):
def ingress_address(rid=None, unit=None):
"""
Retrieve the ingress-address from a relation when available. Otherwise,
return the private-address. This function is to be used on the consuming
side of the relation.
Retrieve the ingress-address from a relation when available.
Otherwise, return the private-address.
When used on the consuming side of the relation (unit is a remote
unit), the ingress-address is the IP address that this unit needs
to use to reach the provided service on the remote unit.
When used on the providing side of the relation (unit == local_unit()),
the ingress-address is the IP address that is advertised to remote
units on this relation. Remote units need to use this address to
reach the local provided service on this unit.
Note that charms may document some other method to use in
preference to the ingress_address(), such as an address provided
on a different relation attribute or a service discovery mechanism.
This allows charms to redirect inbound connections to their peers
or different applications such as load balancers.
Usage:
addresses = [ingress_address(rid=u.rid, unit=u.unit)
@ -1220,3 +1231,40 @@ def ingress_address(rid=None, unit=None):
settings = relation_get(rid=rid, unit=unit)
return (settings.get('ingress-address') or
settings.get('private-address'))
def egress_subnets(rid=None, unit=None):
"""
Retrieve the egress-subnets from a relation.
This function is to be used on the providing side of the
relation, and provides the ranges of addresses that client
connections may come from. The result is uninteresting on
the consuming side of a relation (unit == local_unit()).
Returns a stable list of subnets in CIDR format.
eg. ['192.168.1.0/24', '2001::F00F/128']
If egress-subnets is not available, falls back to using the published
ingress-address, or finally private-address.
:param rid: string relation id
:param unit: string unit name
:side effect: calls relation_get
:return: list of subnets in CIDR format. eg. ['192.168.1.0/24', '2001::F00F/128']
"""
def _to_range(addr):
if re.search(r'^(?:\d{1,3}\.){3}\d{1,3}$', addr) is not None:
addr += '/32'
elif ':' in addr and '/' not in addr: # IPv6
addr += '/128'
return addr
settings = relation_get(rid=rid, unit=unit)
if 'egress-subnets' in settings:
return [n.strip() for n in settings['egress-subnets'].split(',') if n.strip()]
if 'ingress-address' in settings:
return [_to_range(settings['ingress-address'])]
if 'private-address' in settings:
return [_to_range(settings['private-address'])]
return [] # Should never happen

View File

@ -313,17 +313,26 @@ class PortManagerCallback(ManagerCallback):
with open(port_file) as fp:
old_ports = fp.read().split(',')
for old_port in old_ports:
if bool(old_port):
old_port = int(old_port)
if old_port not in new_ports:
hookenv.close_port(old_port)
if bool(old_port) and not self.ports_contains(old_port, new_ports):
hookenv.close_port(old_port)
with open(port_file, 'w') as fp:
fp.write(','.join(str(port) for port in new_ports))
for port in new_ports:
# A port is either a number or 'ICMP'
protocol = 'TCP'
if str(port).upper() == 'ICMP':
protocol = 'ICMP'
if event_name == 'start':
hookenv.open_port(port)
hookenv.open_port(port, protocol)
elif event_name == 'stop':
hookenv.close_port(port)
hookenv.close_port(port, protocol)
def ports_contains(self, port, ports):
if not bool(port):
return False
if str(port).upper() != 'ICMP':
port = int(port)
return port in ports
def service_stop(service_name):

View File

@ -15,6 +15,7 @@
import collections
import ctypes
import errno
import glob
import json
import os
import pyudev
@ -25,6 +26,7 @@ import subprocess
import sys
import time
import shutil
import uuid
from datetime import datetime
@ -73,6 +75,7 @@ from charmhelpers.contrib.storage.linux.utils import (
from charmhelpers.contrib.openstack.utils import (
get_os_codename_install_source,
)
from charmhelpers.contrib.storage.linux import lvm
CEPH_BASE_DIR = os.path.join(os.sep, 'var', 'lib', 'ceph')
OSD_BASE_DIR = os.path.join(CEPH_BASE_DIR, 'osd')
@ -83,7 +86,8 @@ PEON = 'peon'
QUORUM = [LEADER, PEON]
PACKAGES = ['ceph', 'gdisk', 'ntp', 'btrfs-tools', 'python-ceph',
'radosgw', 'xfsprogs', 'python-pyudev']
'radosgw', 'xfsprogs', 'python-pyudev',
'lvm2', 'parted']
LinkSpeed = {
"BASE_10": 10,
@ -1358,10 +1362,17 @@ def add_keyring_to_ceph(keyring, secret, hostname, path, done, init_marker):
# admin keys for the cluster; this command
# will wait for quorum in the cluster before
# returning.
cmd = ['ceph-create-keys', '--id', hostname]
# NOTE(fnordahl): The default timeout in ceph-create-keys of 600
# seconds is not adequate for all situations.
# LP#1719436
cmd = ['ceph-create-keys', '--id', hostname, '--timeout', '1800']
subprocess.check_call(cmd)
osstat = os.stat("/etc/ceph/ceph.client.admin.keyring")
_client_admin_keyring = '/etc/ceph/ceph.client.admin.keyring'
osstat = os.stat(_client_admin_keyring)
if not osstat.st_size:
# NOTE(fnordahl): Retry will fail as long as this file exists.
# LP#1719436
os.remove(_client_admin_keyring)
raise Exception
@ -1399,17 +1410,36 @@ def get_partitions(dev):
return []
def find_least_used_utility_device(utility_devices):
def get_lvs(dev):
"""
List logical volumes for the provided block device
:param: dev: Full path to block device.
:raises subprocess.CalledProcessError: in the event that any supporting
operation failed.
:returns: list: List of logical volumes provided by the block device
"""
pv_dev = _partition_name(dev)
if not lvm.is_lvm_physical_volume(pv_dev):
return []
vg_name = lvm.list_lvm_volume_group(pv_dev)
return lvm.list_logical_volumes('vg_name={}'.format(vg_name))
def find_least_used_utility_device(utility_devices, lvs=False):
"""
Find a utility device which has the smallest number of partitions
among other devices in the supplied list.
:utility_devices: A list of devices to be used for filestore journal
or bluestore wal or db.
:lvs: flag to indicate whether inspection should be based on LVM LV's
:return: string device name
"""
usages = map(lambda a: (len(get_partitions(a)), a), utility_devices)
if lvs:
usages = map(lambda a: (len(get_lvs(a)), a), utility_devices)
else:
usages = map(lambda a: (len(get_partitions(a)), a), utility_devices)
least = min(usages, key=lambda t: t[0])
return least[1]
@ -1460,49 +1490,28 @@ def osdize_dev(dev, osd_format, osd_journal, reformat_osd=False,
log('Looks like {} is in use, skipping.'.format(dev))
return
status_set('maintenance', 'Initializing device {}'.format(dev))
cmd = ['ceph-disk', 'prepare']
# Later versions of ceph support more options
if cmp_pkgrevno('ceph', '0.60') >= 0:
if encrypt:
cmd.append('--dmcrypt')
if cmp_pkgrevno('ceph', '0.48.3') >= 0:
if osd_format and not bluestore:
cmd.append('--fs-type')
cmd.append(osd_format)
if is_active_bluestore_device(dev):
log('{} is in use as an active bluestore block device,'
' skipping.'.format(dev))
return
if reformat_osd:
cmd.append('--zap-disk')
if reformat_osd:
zap_disk(dev)
# NOTE(jamespage): enable experimental bluestore support
if cmp_pkgrevno('ceph', '10.2.0') >= 0 and bluestore:
cmd.append('--bluestore')
wal = get_devices('bluestore-wal')
if wal:
cmd.append('--block.wal')
least_used_wal = find_least_used_utility_device(wal)
cmd.append(least_used_wal)
db = get_devices('bluestore-db')
if db:
cmd.append('--block.db')
least_used_db = find_least_used_utility_device(db)
cmd.append(least_used_db)
elif cmp_pkgrevno('ceph', '12.1.0') >= 0 and not bluestore:
cmd.append('--filestore')
cmd.append(dev)
if osd_journal:
least_used = find_least_used_utility_device(osd_journal)
cmd.append(least_used)
if cmp_pkgrevno('ceph', '12.2.4') >= 0:
cmd = _ceph_volume(dev,
osd_journal,
encrypt,
bluestore)
else:
# Just provide the device - no other options
# for older versions of ceph
cmd.append(dev)
if reformat_osd:
zap_disk(dev)
cmd = _ceph_disk(dev,
osd_format,
osd_journal,
encrypt,
bluestore)
try:
status_set('maintenance', 'Initializing device {}'.format(dev))
log("osdize cmd: {}".format(cmd))
subprocess.check_call(cmd)
except subprocess.CalledProcessError:
@ -1513,6 +1522,289 @@ def osdize_dev(dev, osd_format, osd_journal, reformat_osd=False,
raise
def _ceph_disk(dev, osd_format, osd_journal, encrypt=False, bluestore=False):
"""
Prepare a device for usage as a Ceph OSD using ceph-disk
:param: dev: Full path to use for OSD block device setup
:param: osd_journal: List of block devices to use for OSD journals
:param: encrypt: Use block device encryption (unsupported)
:param: bluestore: Use bluestore storage for OSD
:returns: list. 'ceph-disk' command and required parameters for
execution by check_call
"""
cmd = ['ceph-disk', 'prepare']
if encrypt:
cmd.append('--dmcrypt')
if osd_format and not bluestore:
cmd.append('--fs-type')
cmd.append(osd_format)
# NOTE(jamespage): enable experimental bluestore support
if cmp_pkgrevno('ceph', '10.2.0') >= 0 and bluestore:
cmd.append('--bluestore')
wal = get_devices('bluestore-wal')
if wal:
cmd.append('--block.wal')
least_used_wal = find_least_used_utility_device(wal)
cmd.append(least_used_wal)
db = get_devices('bluestore-db')
if db:
cmd.append('--block.db')
least_used_db = find_least_used_utility_device(db)
cmd.append(least_used_db)
elif cmp_pkgrevno('ceph', '12.1.0') >= 0 and not bluestore:
cmd.append('--filestore')
cmd.append(dev)
if osd_journal:
least_used = find_least_used_utility_device(osd_journal)
cmd.append(least_used)
return cmd
def _ceph_volume(dev, osd_journal, encrypt=False, bluestore=False):
"""
Prepare and activate a device for usage as a Ceph OSD using ceph-volume.
This also includes creation of all PV's, VG's and LV's required to
support the initialization of the OSD.
:param: dev: Full path to use for OSD block device setup
:param: osd_journal: List of block devices to use for OSD journals
:param: encrypt: Use block device encryption
:param: bluestore: Use bluestore storage for OSD
:raises subprocess.CalledProcessError: in the event that any supporting
LVM operation failed.
:returns: list. 'ceph-volume' command and required parameters for
execution by check_call
"""
cmd = ['ceph-volume', 'lvm', 'create']
osd_fsid = str(uuid.uuid4())
cmd.append('--osd-fsid')
cmd.append(osd_fsid)
if bluestore:
cmd.append('--bluestore')
main_device_type = 'block'
else:
cmd.append('--filestore')
main_device_type = 'data'
if encrypt:
cmd.append('--dmcrypt')
# On-disk journal volume creation
if not osd_journal and not bluestore:
journal_lv_type = 'journal'
cmd.append('--journal')
cmd.append(_allocate_logical_volume(
dev,
journal_lv_type,
osd_fsid,
size='{}M'.format(calculate_volume_size('journal')))
)
cmd.append('--data')
cmd.append(_allocate_logical_volume(dev,
main_device_type,
osd_fsid))
if bluestore:
for extra_volume in ('wal', 'db'):
devices = get_devices('bluestore-{}'.format(extra_volume))
if devices:
cmd.append('--block.{}'.format(extra_volume))
least_used = find_least_used_utility_device(devices,
lvs=True)
cmd.append(_allocate_logical_volume(
least_used,
extra_volume,
osd_fsid,
size='{}M'.format(calculate_volume_size(extra_volume)),
shared=True)
)
elif osd_journal:
cmd.append('--journal')
least_used = find_least_used_utility_device(osd_journal,
lvs=True)
cmd.append(_allocate_logical_volume(
least_used,
'journal',
osd_fsid,
size='{}M'.format(calculate_volume_size('journal')),
shared=True)
)
return cmd
def _partition_name(dev):
"""
Derive the first partition name for a block device
:param: dev: Full path to block device.
:returns: str: Full path to first partition on block device.
"""
if dev[-1].isdigit():
return '{}p1'.format(dev)
else:
return '{}1'.format(dev)
# TODO(jamespage): Deal with lockbox encrypted bluestore devices.
def is_active_bluestore_device(dev):
"""
Determine whether provided device is part of an active
bluestore based OSD (as its block component).
:param: dev: Full path to block device to check for Bluestore usage.
:returns: boolean: indicating whether device is in active use.
"""
pv_dev = _partition_name(dev)
if not lvm.is_lvm_physical_volume(pv_dev):
return False
vg_name = lvm.list_lvm_volume_group(pv_dev)
lv_name = lvm.list_logical_volumes('vg_name={}'.format(vg_name))[0]
block_symlinks = glob.glob('/var/lib/ceph/osd/ceph-*/block')
for block_candidate in block_symlinks:
if os.path.islink(block_candidate):
target = os.readlink(block_candidate)
if target.endswith(lv_name):
return True
return False
def get_conf(variable):
"""
Get the value of the given configuration variable from the
cluster.
:param variable: ceph configuration variable
:returns: str. configured value for provided variable
"""
return subprocess.check_output([
'ceph-osd',
'--show-config-value={}'.format(variable),
]).strip()
def calculate_volume_size(lv_type):
"""
Determine the configured size for Bluestore DB/WAL or
Filestore Journal devices
:param lv_type: volume type (db, wal or journal)
:raises KeyError: if invalid lv_type is supplied
:returns: int. Configured size in megabytes for volume type
"""
# lv_type -> ceph configuration option
_config_map = {
'db': 'bluestore_block_db_size',
'wal': 'bluestore_block_wal_size',
'journal': 'osd_journal_size',
}
# default sizes in MB
_default_size = {
'db': 1024,
'wal': 576,
'journal': 1024,
}
# conversion of ceph config units to MB
_units = {
'db': 1048576, # Bytes -> MB
'wal': 1048576, # Bytes -> MB
'journal': 1, # Already in MB
}
configured_size = get_conf(_config_map[lv_type])
if configured_size is None or int(configured_size) == 0:
return _default_size[lv_type]
else:
return int(configured_size) / _units[lv_type]
def _initialize_disk(dev):
"""
Initialize a raw block device with a single paritition
consuming 100% of the avaliable disk space.
Function assumes that block device has already been wiped.
:param: dev: path to block device to initialize
:raises: subprocess.CalledProcessError: if any parted calls fail
:returns: str: Full path to new partition.
"""
partition = _partition_name(dev)
if not os.path.exists(partition):
subprocess.check_call([
'parted', '--script',
dev,
'mklabel',
'gpt',
])
subprocess.check_call([
'parted', '--script',
dev,
'mkpart',
'primary', '1', '100%',
])
return partition
def _allocate_logical_volume(dev, lv_type, osd_fsid,
size=None, shared=False):
"""
Allocate a logical volume from a block device, ensuring any
required initialization and setup of PV's and VG's to support
the LV.
:param: dev: path to block device to allocate from.
:param: lv_type: logical volume type to create
(data, block, journal, wal, db)
:param: osd_fsid: UUID of the OSD associate with the LV
:param: size: Size in LVM format for the device;
if unset 100% of VG
:param: shared: Shared volume group (journal, wal, db)
:raises subprocess.CalledProcessError: in the event that any supporting
LVM or parted operation fails.
:returns: str: String in the format 'vg_name/lv_name'.
"""
lv_name = "osd-{}-{}".format(lv_type, osd_fsid)
current_volumes = lvm.list_logical_volumes()
pv_dev = _initialize_disk(dev)
vg_name = None
if not lvm.is_lvm_physical_volume(pv_dev):
lvm.create_lvm_physical_volume(pv_dev)
if shared:
vg_name = 'ceph-{}-{}'.format(lv_type,
str(uuid.uuid4()))
else:
vg_name = 'ceph-{}'.format(osd_fsid)
lvm.create_lvm_volume_group(vg_name, pv_dev)
else:
vg_name = lvm.list_lvm_volume_group(pv_dev)
if lv_name not in current_volumes:
lvm.create_logical_volume(lv_name, vg_name, size)
return "{}/{}".format(vg_name, lv_name)
def osdize_dir(path, encrypt=False, bluestore=False):
"""Ask ceph-disk to prepare a directory to become an osd.

View File

@ -152,10 +152,7 @@ class CephOsdBasicDeployment(OpenStackAmuletDeployment):
tenant='admin')
# Authenticate admin with cinder endpoint
self.cinder = u.authenticate_cinder_admin(self.keystone_sentry,
username='admin',
password='openstack',
tenant='admin')
self.cinder = u.authenticate_cinder_admin(self.keystone)
# Authenticate admin with glance endpoint
self.glance = u.authenticate_glance_admin(self.keystone)

View File

@ -21,6 +21,9 @@ from collections import OrderedDict
from charmhelpers.contrib.amulet.deployment import (
AmuletDeployment
)
from charmhelpers.contrib.openstack.amulet.utils import (
OPENSTACK_RELEASES_PAIRS
)
DEBUG = logging.DEBUG
ERROR = logging.ERROR
@ -271,11 +274,8 @@ class OpenStackAmuletDeployment(AmuletDeployment):
release.
"""
# Must be ordered by OpenStack release (not by Ubuntu release):
(self.trusty_icehouse, self.trusty_kilo, self.trusty_liberty,
self.trusty_mitaka, self.xenial_mitaka, self.xenial_newton,
self.yakkety_newton, self.xenial_ocata, self.zesty_ocata,
self.xenial_pike, self.artful_pike, self.xenial_queens,
self.bionic_queens,) = range(13)
for i, os_pair in enumerate(OPENSTACK_RELEASES_PAIRS):
setattr(self, os_pair, i)
releases = {
('trusty', None): self.trusty_icehouse,

View File

@ -50,6 +50,13 @@ ERROR = logging.ERROR
NOVA_CLIENT_VERSION = "2"
OPENSTACK_RELEASES_PAIRS = [
'trusty_icehouse', 'trusty_kilo', 'trusty_liberty',
'trusty_mitaka', 'xenial_mitaka', 'xenial_newton',
'yakkety_newton', 'xenial_ocata', 'zesty_ocata',
'xenial_pike', 'artful_pike', 'xenial_queens',
'bionic_queens']
class OpenStackAmuletUtils(AmuletUtils):
"""OpenStack amulet utilities.
@ -63,7 +70,34 @@ class OpenStackAmuletUtils(AmuletUtils):
super(OpenStackAmuletUtils, self).__init__(log_level)
def validate_endpoint_data(self, endpoints, admin_port, internal_port,
public_port, expected):
public_port, expected, openstack_release=None):
"""Validate endpoint data. Pick the correct validator based on
OpenStack release. Expected data should be in the v2 format:
{
'id': id,
'region': region,
'adminurl': adminurl,
'internalurl': internalurl,
'publicurl': publicurl,
'service_id': service_id}
"""
validation_function = self.validate_v2_endpoint_data
xenial_queens = OPENSTACK_RELEASES_PAIRS.index('xenial_queens')
if openstack_release and openstack_release >= xenial_queens:
validation_function = self.validate_v3_endpoint_data
expected = {
'id': expected['id'],
'region': expected['region'],
'region_id': 'RegionOne',
'url': self.valid_url,
'interface': self.not_null,
'service_id': expected['service_id']}
return validation_function(endpoints, admin_port, internal_port,
public_port, expected)
def validate_v2_endpoint_data(self, endpoints, admin_port, internal_port,
public_port, expected):
"""Validate endpoint data.
Validate actual endpoint data vs expected endpoint data. The ports
@ -141,7 +175,86 @@ class OpenStackAmuletUtils(AmuletUtils):
if len(found) != expected_num_eps:
return 'Unexpected number of endpoints found'
def validate_svc_catalog_endpoint_data(self, expected, actual):
def convert_svc_catalog_endpoint_data_to_v3(self, ep_data):
"""Convert v2 endpoint data into v3.
{
'service_name1': [
{
'adminURL': adminURL,
'id': id,
'region': region.
'publicURL': publicURL,
'internalURL': internalURL
}],
'service_name2': [
{
'adminURL': adminURL,
'id': id,
'region': region.
'publicURL': publicURL,
'internalURL': internalURL
}],
}
"""
self.log.warn("Endpoint ID and Region ID validation is limited to not "
"null checks after v2 to v3 conversion")
for svc in ep_data.keys():
assert len(ep_data[svc]) == 1, "Unknown data format"
svc_ep_data = ep_data[svc][0]
ep_data[svc] = [
{
'url': svc_ep_data['adminURL'],
'interface': 'admin',
'region': svc_ep_data['region'],
'region_id': self.not_null,
'id': self.not_null},
{
'url': svc_ep_data['publicURL'],
'interface': 'public',
'region': svc_ep_data['region'],
'region_id': self.not_null,
'id': self.not_null},
{
'url': svc_ep_data['internalURL'],
'interface': 'internal',
'region': svc_ep_data['region'],
'region_id': self.not_null,
'id': self.not_null}]
return ep_data
def validate_svc_catalog_endpoint_data(self, expected, actual,
openstack_release=None):
"""Validate service catalog endpoint data. Pick the correct validator
for the OpenStack version. Expected data should be in the v2 format:
{
'service_name1': [
{
'adminURL': adminURL,
'id': id,
'region': region.
'publicURL': publicURL,
'internalURL': internalURL
}],
'service_name2': [
{
'adminURL': adminURL,
'id': id,
'region': region.
'publicURL': publicURL,
'internalURL': internalURL
}],
}
"""
validation_function = self.validate_v2_svc_catalog_endpoint_data
xenial_queens = OPENSTACK_RELEASES_PAIRS.index('xenial_queens')
if openstack_release and openstack_release >= xenial_queens:
validation_function = self.validate_v3_svc_catalog_endpoint_data
expected = self.convert_svc_catalog_endpoint_data_to_v3(expected)
return validation_function(expected, actual)
def validate_v2_svc_catalog_endpoint_data(self, expected, actual):
"""Validate service catalog endpoint data.
Validate a list of actual service catalog endpoints vs a list of
@ -350,16 +463,13 @@ class OpenStackAmuletUtils(AmuletUtils):
deployment._auto_wait_for_status()
self.keystone_wait_for_propagation(sentry_relation_pairs, api_version)
def authenticate_cinder_admin(self, keystone_sentry, username,
password, tenant, api_version=2):
def authenticate_cinder_admin(self, keystone, api_version=2):
"""Authenticates admin user with cinder."""
# NOTE(beisner): cinder python client doesn't accept tokens.
keystone_ip = keystone_sentry.info['public-address']
ept = "http://{}:5000/v2.0".format(keystone_ip.strip().decode('utf-8'))
self.log.debug('Authenticating cinder admin...')
_clients = {
1: cinder_client.Client,
2: cinder_clientv2.Client}
return _clients[api_version](username, password, tenant, ept)
return _clients[api_version](session=keystone.session)
def authenticate_keystone(self, keystone_ip, username, password,
api_version=False, admin_port=False,
@ -367,13 +477,36 @@ class OpenStackAmuletUtils(AmuletUtils):
project_domain_name=None, project_name=None):
"""Authenticate with Keystone"""
self.log.debug('Authenticating with keystone...')
port = 5000
if admin_port:
port = 35357
base_ep = "http://{}:{}".format(keystone_ip.strip().decode('utf-8'),
port)
if not api_version or api_version == 2:
ep = base_ep + "/v2.0"
if not api_version:
api_version = 2
sess, auth = self.get_keystone_session(
keystone_ip=keystone_ip,
username=username,
password=password,
api_version=api_version,
admin_port=admin_port,
user_domain_name=user_domain_name,
domain_name=domain_name,
project_domain_name=project_domain_name,
project_name=project_name
)
if api_version == 2:
client = keystone_client.Client(session=sess)
else:
client = keystone_client_v3.Client(session=sess)
# This populates the client.service_catalog
client.auth_ref = auth.get_access(sess)
return client
def get_keystone_session(self, keystone_ip, username, password,
api_version=False, admin_port=False,
user_domain_name=None, domain_name=None,
project_domain_name=None, project_name=None):
"""Return a keystone session object"""
ep = self.get_keystone_endpoint(keystone_ip,
api_version=api_version,
admin_port=admin_port)
if api_version == 2:
auth = v2.Password(
username=username,
password=password,
@ -381,12 +514,7 @@ class OpenStackAmuletUtils(AmuletUtils):
auth_url=ep
)
sess = keystone_session.Session(auth=auth)
client = keystone_client.Client(session=sess)
# This populates the client.service_catalog
client.auth_ref = auth.get_access(sess)
return client
else:
ep = base_ep + "/v3"
auth = v3.Password(
user_domain_name=user_domain_name,
username=username,
@ -397,10 +525,57 @@ class OpenStackAmuletUtils(AmuletUtils):
auth_url=ep
)
sess = keystone_session.Session(auth=auth)
client = keystone_client_v3.Client(session=sess)
# This populates the client.service_catalog
client.auth_ref = auth.get_access(sess)
return client
return (sess, auth)
def get_keystone_endpoint(self, keystone_ip, api_version=None,
admin_port=False):
"""Return keystone endpoint"""
port = 5000
if admin_port:
port = 35357
base_ep = "http://{}:{}".format(keystone_ip.strip().decode('utf-8'),
port)
if api_version == 2:
ep = base_ep + "/v2.0"
else:
ep = base_ep + "/v3"
return ep
def get_default_keystone_session(self, keystone_sentry,
openstack_release=None):
"""Return a keystone session object and client object assuming standard
default settings
Example call in amulet tests:
self.keystone_session, self.keystone = u.get_default_keystone_session(
self.keystone_sentry,
openstack_release=self._get_openstack_release())
The session can then be used to auth other clients:
neutronclient.Client(session=session)
aodh_client.Client(session=session)
eyc
"""
self.log.debug('Authenticating keystone admin...')
api_version = 2
client_class = keystone_client.Client
# 11 => xenial_queens
if openstack_release and openstack_release >= 11:
api_version = 3
client_class = keystone_client_v3.Client
keystone_ip = keystone_sentry.info['public-address']
session, auth = self.get_keystone_session(
keystone_ip,
api_version=api_version,
username='admin',
password='openstack',
project_name='admin',
user_domain_name='admin_domain',
project_domain_name='admin_domain')
client = client_class(session=session)
# This populates the client.service_catalog
client.auth_ref = auth.get_access(session)
return session, client
def authenticate_keystone_admin(self, keystone_sentry, user, password,
tenant=None, api_version=None,

View File

@ -27,6 +27,7 @@ import glob
import os
import json
import yaml
import re
import subprocess
import sys
import errno
@ -67,7 +68,7 @@ def cached(func):
@wraps(func)
def wrapper(*args, **kwargs):
global cache
key = str((func, args, kwargs))
key = json.dumps((func, args, kwargs), sort_keys=True, default=str)
try:
return cache[key]
except KeyError:
@ -1043,7 +1044,6 @@ def juju_version():
universal_newlines=True).strip()
@cached
def has_juju_version(minimum_version):
"""Return True if the Juju version is at least the provided version"""
return LooseVersion(juju_version()) >= LooseVersion(minimum_version)
@ -1103,6 +1103,8 @@ def _run_atexit():
@translate_exc(from_exc=OSError, to_exc=NotImplementedError)
def network_get_primary_address(binding):
'''
Deprecated since Juju 2.3; use network_get()
Retrieve the primary network address for a named binding
:param binding: string. The name of a relation of extra-binding
@ -1123,7 +1125,6 @@ def network_get_primary_address(binding):
return response
@translate_exc(from_exc=OSError, to_exc=NotImplementedError)
def network_get(endpoint, relation_id=None):
"""
Retrieve the network details for a relation endpoint
@ -1131,24 +1132,20 @@ def network_get(endpoint, relation_id=None):
:param endpoint: string. The name of a relation endpoint
:param relation_id: int. The ID of the relation for the current context.
:return: dict. The loaded YAML output of the network-get query.
:raise: NotImplementedError if run on Juju < 2.1
:raise: NotImplementedError if request not supported by the Juju version.
"""
if not has_juju_version('2.2'):
raise NotImplementedError(juju_version()) # earlier versions require --primary-address
if relation_id and not has_juju_version('2.3'):
raise NotImplementedError # 2.3 added the -r option
cmd = ['network-get', endpoint, '--format', 'yaml']
if relation_id:
cmd.append('-r')
cmd.append(relation_id)
try:
response = subprocess.check_output(
cmd,
stderr=subprocess.STDOUT).decode('UTF-8').strip()
except CalledProcessError as e:
# Early versions of Juju 2.0.x required the --primary-address argument.
# We catch that condition here and raise NotImplementedError since
# the requested semantics are not available - the caller can then
# use the network_get_primary_address() method instead.
if '--primary-address is currently required' in e.output.decode('UTF-8'):
raise NotImplementedError
raise
response = subprocess.check_output(
cmd,
stderr=subprocess.STDOUT).decode('UTF-8').strip()
return yaml.safe_load(response)
@ -1204,9 +1201,23 @@ def iter_units_for_relation_name(relation_name):
def ingress_address(rid=None, unit=None):
"""
Retrieve the ingress-address from a relation when available. Otherwise,
return the private-address. This function is to be used on the consuming
side of the relation.
Retrieve the ingress-address from a relation when available.
Otherwise, return the private-address.
When used on the consuming side of the relation (unit is a remote
unit), the ingress-address is the IP address that this unit needs
to use to reach the provided service on the remote unit.
When used on the providing side of the relation (unit == local_unit()),
the ingress-address is the IP address that is advertised to remote
units on this relation. Remote units need to use this address to
reach the local provided service on this unit.
Note that charms may document some other method to use in
preference to the ingress_address(), such as an address provided
on a different relation attribute or a service discovery mechanism.
This allows charms to redirect inbound connections to their peers
or different applications such as load balancers.
Usage:
addresses = [ingress_address(rid=u.rid, unit=u.unit)
@ -1220,3 +1231,40 @@ def ingress_address(rid=None, unit=None):
settings = relation_get(rid=rid, unit=unit)
return (settings.get('ingress-address') or
settings.get('private-address'))
def egress_subnets(rid=None, unit=None):
"""
Retrieve the egress-subnets from a relation.
This function is to be used on the providing side of the
relation, and provides the ranges of addresses that client
connections may come from. The result is uninteresting on
the consuming side of a relation (unit == local_unit()).
Returns a stable list of subnets in CIDR format.
eg. ['192.168.1.0/24', '2001::F00F/128']
If egress-subnets is not available, falls back to using the published
ingress-address, or finally private-address.
:param rid: string relation id
:param unit: string unit name
:side effect: calls relation_get
:return: list of subnets in CIDR format. eg. ['192.168.1.0/24', '2001::F00F/128']
"""
def _to_range(addr):
if re.search(r'^(?:\d{1,3}\.){3}\d{1,3}$', addr) is not None:
addr += '/32'
elif ':' in addr and '/' not in addr: # IPv6
addr += '/128'
return addr
settings = relation_get(rid=rid, unit=unit)
if 'egress-subnets' in settings:
return [n.strip() for n in settings['egress-subnets'].split(',') if n.strip()]
if 'ingress-address' in settings:
return [_to_range(settings['ingress-address'])]
if 'private-address' in settings:
return [_to_range(settings['private-address'])]
return [] # Should never happen

View File

@ -313,17 +313,26 @@ class PortManagerCallback(ManagerCallback):
with open(port_file) as fp:
old_ports = fp.read().split(',')
for old_port in old_ports:
if bool(old_port):
old_port = int(old_port)
if old_port not in new_ports:
hookenv.close_port(old_port)
if bool(old_port) and not self.ports_contains(old_port, new_ports):
hookenv.close_port(old_port)
with open(port_file, 'w') as fp:
fp.write(','.join(str(port) for port in new_ports))
for port in new_ports:
# A port is either a number or 'ICMP'
protocol = 'TCP'
if str(port).upper() == 'ICMP':
protocol = 'ICMP'
if event_name == 'start':
hookenv.open_port(port)
hookenv.open_port(port, protocol)
elif event_name == 'stop':
hookenv.close_port(port)
hookenv.close_port(port, protocol)
def ports_contains(self, port, ports):
if not bool(port):
return False
if str(port).upper() != 'ICMP':
port = int(port)
return port in ports
def service_stop(service_name):