From 3dfeff7a19e16b166c302a8896b39e8357eeb6f7 Mon Sep 17 00:00:00 2001 From: Chris MacNaughton Date: Fri, 10 Feb 2017 07:54:14 -0500 Subject: [PATCH] Sync in charms.ceph This brings in the new broker change to restrict key access by groups Change-Id: I19ad0142b4227ba555a0794e8b938372d9fdb84c Partial-Bug: 1424771 --- lib/ceph/__init__.py | 86 ++++++++++++++----- lib/ceph/ceph_broker.py | 179 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 244 insertions(+), 21 deletions(-) diff --git a/lib/ceph/__init__.py b/lib/ceph/__init__.py index 7f80b2c..e87aef9 100644 --- a/lib/ceph/__init__.py +++ b/lib/ceph/__init__.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from _ctypes import POINTER, byref import ctypes import collections import json @@ -309,22 +310,52 @@ def set_hdd_read_ahead(dev_name, read_ahead_sectors=256): def get_block_uuid(block_dev): """ - This queries blkid to get the uuid for a block device. + This queries blkid to get the uuid for a block device. Note: This function + needs to be called with root priv. It will raise an error otherwise. :param block_dev: Name of the block device to query. - :return: The UUID of the device or None on Error. + :return: The UUID of the device or None on Error. Raises OSError """ try: - block_info = subprocess.check_output( - ['blkid', '-o', 'export', block_dev]) - for tag in block_info.split('\n'): - parts = tag.split('=') - if parts[0] == 'UUID': - return parts[1] - return None - except subprocess.CalledProcessError as err: - log('get_block_uuid failed with error: {}'.format(err.output), + blkid = ctypes.cdll.LoadLibrary("libblkid.so") + # Header signature + # extern int blkid_probe_lookup_value(blkid_probe pr, const char *name, + # const char **data, size_t *len); + blkid.blkid_new_probe_from_filename.argtypes = [ctypes.c_char_p] + blkid.blkid_probe_lookup_value.argtypes = [ctypes.c_void_p, + ctypes.c_char_p, + POINTER(ctypes.c_char_p), + POINTER(ctypes.c_ulong)] + except OSError as err: + log('get_block_uuid loading libblkid.so failed with error: {}'.format( + os.strerror(err.errno)), level=ERROR) + raise err + if not os.path.exists(block_dev): return None + probe = blkid.blkid_new_probe_from_filename(ctypes.c_char_p(block_dev)) + if probe < 0: + log('get_block_uuid new_probe_from_filename failed: {}'.format( + os.strerror(probe)), + level=ERROR) + raise OSError(probe, os.strerror(probe)) + result = blkid.blkid_do_probe(probe) + if result != 0: + log('get_block_uuid do_probe failed with error: {}'.format( + os.strerror(result)), + level=ERROR) + raise OSError(result, os.strerror(result)) + uuid = ctypes.c_char_p() + result = blkid.blkid_probe_lookup_value(probe, + ctypes.c_char_p( + 'UUID'.encode('ascii')), + byref(uuid), None) + if result < 0: + log('get_block_uuid lookup_value failed with error: {}'.format( + os.strerror(result)), + level=ERROR) + raise OSError(result, os.strerror(result)) + blkid.blkid_free_probe(probe) + return ctypes.string_at(uuid).decode('ascii') def check_max_sectors(save_settings_dict, @@ -390,6 +421,7 @@ def tune_dev(block_dev): if uuid is None: log('block device {} uuid is None. Unable to save to ' 'hdparm.conf'.format(block_dev), level=DEBUG) + return save_settings_dict = {} log('Tuning device {}'.format(block_dev)) status_set('maintenance', 'Tuning device {}'.format(block_dev)) @@ -1430,10 +1462,17 @@ def upgrade_monitor(new_version): service_stop('ceph-mon-all') apt_install(packages=PACKAGES, fatal=True) - # Ensure the ownership of Ceph's directories is correct - chownr(path=os.path.join(os.sep, "var", "lib", "ceph"), - owner=ceph_user(), - group=ceph_user()) + # Ensure the files and directories under /var/lib/ceph is chowned + # properly as part of the move to the Jewel release, which moved the + # ceph daemons to running as ceph:ceph instead of root:root. + if new_version == 'jewel': + # Ensure the ownership of Ceph's directories is correct + owner = ceph_user() + chownr(path=os.path.join(os.sep, "var", "lib", "ceph"), + owner=owner, + group=owner, + follow_links=True) + if systemd(): for mon_id in get_local_mon_ids(): service_start('ceph-mon@{}'.format(mon_id)) @@ -1608,10 +1647,18 @@ def upgrade_osd(new_version): service_stop('ceph-osd-all') apt_install(packages=PACKAGES, fatal=True) - # Ensure the ownership of Ceph's directories is correct - chownr(path=os.path.join(os.sep, "var", "lib", "ceph"), - owner=ceph_user(), - group=ceph_user()) + # Ensure the files and directories under /var/lib/ceph is chowned + # properly as part of the move to the Jewel release, which moved the + # ceph daemons to running as ceph:ceph instead of root:root. Only do + # it when necessary as this is an expensive operation to run. + if new_version == 'jewel': + owner = ceph_user() + status_set('maintenance', 'Updating file ownership for OSDs') + chownr(path=os.path.join(os.sep, "var", "lib", "ceph"), + owner=owner, + group=owner, + follow_links=True) + if systemd(): for osd_id in get_local_osd_ids(): service_start('ceph-osd@{}'.format(osd_id)) @@ -1642,7 +1689,6 @@ def list_pools(service): log("rados lspools failed with error: {}".format(err.output)) raise - # A dict of valid ceph upgrade paths. Mapping is old -> new UPGRADE_PATHS = { 'firefly': 'hammer', diff --git a/lib/ceph/ceph_broker.py b/lib/ceph/ceph_broker.py index 33d0df8..f15b9bd 100644 --- a/lib/ceph/ceph_broker.py +++ b/lib/ceph/ceph_broker.py @@ -34,6 +34,8 @@ from charmhelpers.contrib.storage.linux.ceph import ( delete_pool, erasure_profile_exists, get_osds, + monitor_key_get, + monitor_key_set, pool_exists, pool_set, remove_pool_snapshot, @@ -49,7 +51,7 @@ from charmhelpers.contrib.storage.linux.ceph import ( # This comes from http://docs.ceph.com/docs/master/rados/operations/pools/ # This should do a decent job of preventing people from passing in bad values. # It will give a useful error message -from subprocess import check_output, CalledProcessError +from subprocess import check_call, check_output, CalledProcessError POOL_KEYS = { # "Ceph Key Name": [Python type, [Valid Range]] @@ -157,11 +159,169 @@ def handle_create_erasure_profile(request, service): data_chunks=k, coding_chunks=m, locality=l) +def handle_add_permissions_to_key(request, service): + """ + Groups are defined by the key cephx.groups.(namespace-)?-(name). This key + will contain a dict serialized to JSON with data about the group, including + pools and members. + + A group can optionally have a namespace defined that will be used to + further restrict pool access. + """ + service_name = request.get('name') + group_name = request.get('group') + group_namespace = request.get('group-namespace') + if group_namespace: + group_name = "{}-{}".format(group_namespace, group_name) + group = get_group(group_name=group_name) + service_obj = get_service_groups(service=service_name, + namespace=group_namespace) + format("Service object: {}".format(service_obj)) + permission = request.get('group-permission') or "rwx" + if service_name not in group['services']: + group['services'].append(service_name) + save_group(group=group, group_name=group_name) + if permission not in service_obj['group_names']: + service_obj['group_names'][permission] = [] + if group_name not in service_obj['group_names'][permission]: + service_obj['group_names'][permission].append(group_name) + save_service(service=service_obj, service_name=service_name) + service_obj['groups'][group_name] = group + update_service_permissions(service_name, service_obj, group_namespace) + + +def update_service_permissions(service, service_obj=None, namespace=None): + """Update the key permissions for the named client in Ceph""" + if not service_obj: + service_obj = get_service_groups(service=service, namespace=namespace) + permissions = pool_permission_list_for_service(service_obj) + call = ['ceph', 'auth', 'caps', 'client.{}'.format(service)] + permissions + try: + check_call(call) + except CalledProcessError as e: + log("Error updating key capabilities: {}".format(e)) + + +def add_pool_to_group(pool, group, namespace=None): + """Add a named pool to a named group""" + group_name = group + if namespace: + group_name = "{}-{}".format(namespace, group_name) + group = get_group(group_name=group_name) + group["pools"].append(pool) + save_group(group, group_name=group_name) + for service in group['services']: + update_service_permissions(service, namespace=namespace) + + +def pool_permission_list_for_service(service): + """Build the permission string for Ceph for a given service""" + permissions = "" + permission_types = {} + for permission, group in service["group_names"].items(): + if permission not in permission_types: + permission_types[permission] = [] + for item in group: + permission_types[permission].append(item) + for permission, groups in permission_types.items(): + permission = " allow {}".format(permission) + for group in groups: + for pool in service['groups'][group]['pools']: + permission = "{} pool={}".format(permission, pool) + permissions += permission + return ["mon", "allow r", "osd", permissions.strip()] + + +def get_service_groups(service, namespace=None): + """ + Services are objects stored with some metadata, they look like (for a + service named "nova"): + { + group_names: {'rwx': ['images']}, + groups: {} + } + After populating the group, it looks like: + { + group_names: {'rwx': ['images']}, + groups: { + 1 'images': { + pools: ['glance'], + services: ['nova'] + } + } + } + """ + service_json = monitor_key_get(service='admin', + key="cephx.services.{}".format(service)) + try: + service = json.loads(service_json) + except TypeError: + service = None + except ValueError: + service = None + if service: + for permission, groups in service['group_names'].items(): + for group in groups: + name = group + if namespace: + name = "{}-{}".format(namespace, name) + service['groups'][group] = get_group(group_name=name) + else: + service = {'group_names': {}, 'groups': {}} + return service + + +def get_group(group_name): + """ + A group is a structure to hold data about a named group, structured as: + { + pools: ['glance'], + services: ['nova'] + } + """ + group_key = get_group_key(group_name=group_name) + group_json = monitor_key_get(service='admin', key=group_key) + try: + group = json.loads(group_json) + except TypeError: + group = None + except ValueError: + group = None + if not group: + group = { + 'pools': [], + 'services': [] + } + return group + + +def save_service(service_name, service): + """Persist a service in the monitor cluster""" + service['groups'] = {} + return monitor_key_set(service='admin', + key="cephx.services.{}".format(service_name), + value=json.dumps(service)) + + +def save_group(group, group_name): + """Persist a group in the monitor cluster""" + group_key = get_group_key(group_name=group_name) + return monitor_key_set(service='admin', + key=group_key, + value=json.dumps(group)) + + +def get_group_key(group_name): + """Build group key""" + return 'cephx.groups.{}'.format(group_name) + + def handle_erasure_pool(request, service): pool_name = request.get('name') erasure_profile = request.get('erasure-profile') quota = request.get('max-bytes') weight = request.get('weight') + group_name = request.get('group') if erasure_profile is None: erasure_profile = "default-canonical" @@ -172,6 +332,13 @@ def handle_erasure_pool(request, service): log(msg, level=ERROR) return {'exit-code': 1, 'stderr': msg} + if group_name: + group_namespace = request.get('group-namespace') + # Add the pool to the group named "group_name" + add_pool_to_group(pool=pool_name, + group=group_name, + namespace=group_namespace) + # TODO: Default to 3/2 erasure coding. I believe this requires min 5 osds if not erasure_profile_exists(service=service, name=erasure_profile): # TODO: Fail and tell them to create the profile or default @@ -200,6 +367,7 @@ def handle_replicated_pool(request, service): replicas = request.get('replicas') quota = request.get('max-bytes') weight = request.get('weight') + group_name = request.get('group') # Optional params pg_num = request.get('pg_num') @@ -215,6 +383,13 @@ def handle_replicated_pool(request, service): log(msg, level=ERROR) return {'exit-code': 1, 'stderr': msg} + if group_name: + group_namespace = request.get('group-namespace') + # Add the pool to the group named "group_name" + add_pool_to_group(pool=pool_name, + group=group_name, + namespace=group_namespace) + kwargs = {} if pg_num: kwargs['pg_num'] = pg_num @@ -570,6 +745,8 @@ def process_requests_v1(reqs): ret = handle_rgw_create_user(request=req, service=svc) elif op == "move-osd-to-bucket": ret = handle_put_osd_in_bucket(request=req, service=svc) + elif op == "add-permissions-to-key": + ret = handle_add_permissions_to_key(request=req, service=svc) else: msg = "Unknown operation '%s'" % op log(msg, level=ERROR)