diff --git a/ceph/ceph_broker.py b/ceph/ceph_broker.py index 33d0df8..f15b9bd 100644 --- a/ceph/ceph_broker.py +++ b/ceph/ceph_broker.py @@ -34,6 +34,8 @@ from charmhelpers.contrib.storage.linux.ceph import ( delete_pool, erasure_profile_exists, get_osds, + monitor_key_get, + monitor_key_set, pool_exists, pool_set, remove_pool_snapshot, @@ -49,7 +51,7 @@ from charmhelpers.contrib.storage.linux.ceph import ( # This comes from http://docs.ceph.com/docs/master/rados/operations/pools/ # This should do a decent job of preventing people from passing in bad values. # It will give a useful error message -from subprocess import check_output, CalledProcessError +from subprocess import check_call, check_output, CalledProcessError POOL_KEYS = { # "Ceph Key Name": [Python type, [Valid Range]] @@ -157,11 +159,169 @@ def handle_create_erasure_profile(request, service): data_chunks=k, coding_chunks=m, locality=l) +def handle_add_permissions_to_key(request, service): + """ + Groups are defined by the key cephx.groups.(namespace-)?-(name). This key + will contain a dict serialized to JSON with data about the group, including + pools and members. + + A group can optionally have a namespace defined that will be used to + further restrict pool access. + """ + service_name = request.get('name') + group_name = request.get('group') + group_namespace = request.get('group-namespace') + if group_namespace: + group_name = "{}-{}".format(group_namespace, group_name) + group = get_group(group_name=group_name) + service_obj = get_service_groups(service=service_name, + namespace=group_namespace) + format("Service object: {}".format(service_obj)) + permission = request.get('group-permission') or "rwx" + if service_name not in group['services']: + group['services'].append(service_name) + save_group(group=group, group_name=group_name) + if permission not in service_obj['group_names']: + service_obj['group_names'][permission] = [] + if group_name not in service_obj['group_names'][permission]: + service_obj['group_names'][permission].append(group_name) + save_service(service=service_obj, service_name=service_name) + service_obj['groups'][group_name] = group + update_service_permissions(service_name, service_obj, group_namespace) + + +def update_service_permissions(service, service_obj=None, namespace=None): + """Update the key permissions for the named client in Ceph""" + if not service_obj: + service_obj = get_service_groups(service=service, namespace=namespace) + permissions = pool_permission_list_for_service(service_obj) + call = ['ceph', 'auth', 'caps', 'client.{}'.format(service)] + permissions + try: + check_call(call) + except CalledProcessError as e: + log("Error updating key capabilities: {}".format(e)) + + +def add_pool_to_group(pool, group, namespace=None): + """Add a named pool to a named group""" + group_name = group + if namespace: + group_name = "{}-{}".format(namespace, group_name) + group = get_group(group_name=group_name) + group["pools"].append(pool) + save_group(group, group_name=group_name) + for service in group['services']: + update_service_permissions(service, namespace=namespace) + + +def pool_permission_list_for_service(service): + """Build the permission string for Ceph for a given service""" + permissions = "" + permission_types = {} + for permission, group in service["group_names"].items(): + if permission not in permission_types: + permission_types[permission] = [] + for item in group: + permission_types[permission].append(item) + for permission, groups in permission_types.items(): + permission = " allow {}".format(permission) + for group in groups: + for pool in service['groups'][group]['pools']: + permission = "{} pool={}".format(permission, pool) + permissions += permission + return ["mon", "allow r", "osd", permissions.strip()] + + +def get_service_groups(service, namespace=None): + """ + Services are objects stored with some metadata, they look like (for a + service named "nova"): + { + group_names: {'rwx': ['images']}, + groups: {} + } + After populating the group, it looks like: + { + group_names: {'rwx': ['images']}, + groups: { + 1 'images': { + pools: ['glance'], + services: ['nova'] + } + } + } + """ + service_json = monitor_key_get(service='admin', + key="cephx.services.{}".format(service)) + try: + service = json.loads(service_json) + except TypeError: + service = None + except ValueError: + service = None + if service: + for permission, groups in service['group_names'].items(): + for group in groups: + name = group + if namespace: + name = "{}-{}".format(namespace, name) + service['groups'][group] = get_group(group_name=name) + else: + service = {'group_names': {}, 'groups': {}} + return service + + +def get_group(group_name): + """ + A group is a structure to hold data about a named group, structured as: + { + pools: ['glance'], + services: ['nova'] + } + """ + group_key = get_group_key(group_name=group_name) + group_json = monitor_key_get(service='admin', key=group_key) + try: + group = json.loads(group_json) + except TypeError: + group = None + except ValueError: + group = None + if not group: + group = { + 'pools': [], + 'services': [] + } + return group + + +def save_service(service_name, service): + """Persist a service in the monitor cluster""" + service['groups'] = {} + return monitor_key_set(service='admin', + key="cephx.services.{}".format(service_name), + value=json.dumps(service)) + + +def save_group(group, group_name): + """Persist a group in the monitor cluster""" + group_key = get_group_key(group_name=group_name) + return monitor_key_set(service='admin', + key=group_key, + value=json.dumps(group)) + + +def get_group_key(group_name): + """Build group key""" + return 'cephx.groups.{}'.format(group_name) + + def handle_erasure_pool(request, service): pool_name = request.get('name') erasure_profile = request.get('erasure-profile') quota = request.get('max-bytes') weight = request.get('weight') + group_name = request.get('group') if erasure_profile is None: erasure_profile = "default-canonical" @@ -172,6 +332,13 @@ def handle_erasure_pool(request, service): log(msg, level=ERROR) return {'exit-code': 1, 'stderr': msg} + if group_name: + group_namespace = request.get('group-namespace') + # Add the pool to the group named "group_name" + add_pool_to_group(pool=pool_name, + group=group_name, + namespace=group_namespace) + # TODO: Default to 3/2 erasure coding. I believe this requires min 5 osds if not erasure_profile_exists(service=service, name=erasure_profile): # TODO: Fail and tell them to create the profile or default @@ -200,6 +367,7 @@ def handle_replicated_pool(request, service): replicas = request.get('replicas') quota = request.get('max-bytes') weight = request.get('weight') + group_name = request.get('group') # Optional params pg_num = request.get('pg_num') @@ -215,6 +383,13 @@ def handle_replicated_pool(request, service): log(msg, level=ERROR) return {'exit-code': 1, 'stderr': msg} + if group_name: + group_namespace = request.get('group-namespace') + # Add the pool to the group named "group_name" + add_pool_to_group(pool=pool_name, + group=group_name, + namespace=group_namespace) + kwargs = {} if pg_num: kwargs['pg_num'] = pg_num @@ -570,6 +745,8 @@ def process_requests_v1(reqs): ret = handle_rgw_create_user(request=req, service=svc) elif op == "move-osd-to-bucket": ret = handle_put_osd_in_bucket(request=req, service=svc) + elif op == "add-permissions-to-key": + ret = handle_add_permissions_to_key(request=req, service=svc) else: msg = "Unknown operation '%s'" % op log(msg, level=ERROR) diff --git a/unit_tests/test_ceph_broker.py b/unit_tests/test_ceph_broker.py index c154a7b..e3be093 100644 --- a/unit_tests/test_ceph_broker.py +++ b/unit_tests/test_ceph_broker.py @@ -25,6 +25,180 @@ class CephBrokerTestCase(unittest.TestCase): def setUp(self): super(CephBrokerTestCase, self).setUp() + @mock.patch('ceph_broker.check_call') + def test_update_service_permission(self, _check_call): + service_obj = { + 'group_names': {'rwx': ['images']}, + 'groups': {'images': {'pools': ['cinder'], 'services': ['nova']}} + } + ceph_broker.update_service_permissions(service='nova', + service_obj=service_obj) + _check_call.assert_called_with(['ceph', 'auth', 'caps', + 'client.nova', 'mon', 'allow r', 'osd', + 'allow rwx pool=cinder']) + + @mock.patch('ceph_broker.check_call') + @mock.patch('ceph_broker.get_service_groups') + @mock.patch('ceph_broker.monitor_key_set') + @mock.patch('ceph_broker.monitor_key_get') + def test_add_pool_to_existing_group_with_services(self, + _monitor_key_get, + _monitor_key_set, + _get_service_groups, + _check_call): + _monitor_key_get.return_value = '{"pools": ["glance"],'\ + ' "services": ["nova"]}' + service = { + 'group_names': {'rwx': ['images']}, + 'groups': {'images': {'pools': [ + 'glance', 'cinder' + ], 'services': ['nova']}} + } + _get_service_groups.return_value = service + ceph_broker.add_pool_to_group( + pool="cinder", + group="images" + ) + _monitor_key_set.assert_called_with( + key='cephx.groups.images', + service='admin', + value=json.dumps({"pools": ["glance", "cinder"], + "services": ["nova"]})) + _check_call.assert_called_with(['ceph', 'auth', 'caps', + 'client.nova', 'mon', 'allow r', 'osd', + 'allow rwx pool=glance pool=cinder']) + + @mock.patch('ceph_broker.monitor_key_set') + @mock.patch('ceph_broker.monitor_key_get') + def test_add_pool_to_existing_group(self, + _monitor_key_get, + _monitor_key_set): + _monitor_key_get.return_value = '{"pools": ["glance"], "services": []}' + ceph_broker.add_pool_to_group( + pool="cinder", + group="images" + ) + _monitor_key_set.assert_called_with( + key='cephx.groups.images', + service='admin', + value=json.dumps({"pools": ["glance", "cinder"], "services": []})) + + @mock.patch('ceph_broker.monitor_key_set') + @mock.patch('ceph_broker.monitor_key_get') + def test_add_pool_to_new_group(self, + _monitor_key_get, + _monitor_key_set): + _monitor_key_get.return_value = '{"pools": [], "services": []}' + ceph_broker.add_pool_to_group( + pool="glance", + group="images" + ) + _monitor_key_set.assert_called_with( + key='cephx.groups.images', + service='admin', + value=json.dumps({"pools": ["glance"], "services": []})) + + def test_pool_permission_list_for_service(self): + service = { + 'group_names': {'rwx': ['images']}, + 'groups': {'images': {'pools': ['glance'], 'services': ['nova']}} + } + result = ceph_broker.pool_permission_list_for_service(service) + self.assertEqual(result, ['mon', + 'allow r', + 'osd', + 'allow rwx pool=glance']) + + @mock.patch('ceph_broker.monitor_key_set') + def test_save_service(self, _monitor_key_set): + service = { + 'group_names': {'rwx': 'images'}, + 'groups': {'images': {'pools': ['glance'], 'services': ['nova']}} + } + ceph_broker.save_service(service=service, service_name='nova') + _monitor_key_set.assert_called_with( + value='{"groups": {}, "group_names": {"rwx": "images"}}', + key='cephx.services.nova', + service='admin') + + @mock.patch('ceph_broker.monitor_key_get') + def test_get_service_groups_empty(self, _monitor_key_get): + _monitor_key_get.return_value = None + service = ceph_broker.get_service_groups('nova') + _monitor_key_get.assert_called_with( + key='cephx.services.nova', + service='admin' + ) + self.assertEqual(service, {'group_names': {}, 'groups': {}}) + + @mock.patch('ceph_broker.monitor_key_get') + def test_get_service_groups_empty_str(self, _monitor_key_get): + _monitor_key_get.return_value = '' + service = ceph_broker.get_service_groups('nova') + _monitor_key_get.assert_called_with( + key='cephx.services.nova', + service='admin' + ) + self.assertEqual(service, {'group_names': {}, 'groups': {}}) + + @mock.patch('ceph_broker.get_group') + @mock.patch('ceph_broker.monitor_key_get') + def test_get_service_groups(self, _monitor_key_get, _get_group): + _monitor_key_get.return_value = '{"group_names": {"rwx": ["images"]}' \ + ',"groups": {}}' + _get_group.return_value = { + 'pools': ["glance"], + 'services': ['nova'] + } + service = ceph_broker.get_service_groups('nova') + _monitor_key_get.assert_called_with( + key='cephx.services.nova', + service='admin' + ) + self.assertEqual(service, { + 'group_names': {'rwx': ['images']}, + 'groups': {'images': {'pools': ['glance'], 'services': ['nova']}} + }) + + @mock.patch('ceph_broker.monitor_key_set') + def test_save_group(self, _monitor_key_set): + group = { + 'pools': ["glance"], + 'services': [] + } + ceph_broker.save_group(group=group, group_name='images') + _monitor_key_set.assert_called_with( + key='cephx.groups.images', + service='admin', + value=json.dumps(group)) + + @mock.patch('ceph_broker.monitor_key_get') + def test_get_group_empty_str(self, _monitor_key_get): + _monitor_key_get.return_value = '' + group = ceph_broker.get_group('images') + self.assertEqual(group, { + 'pools': [], + 'services': [] + }) + + @mock.patch('ceph_broker.monitor_key_get') + def test_get_group_empty(self, _monitor_key_get): + _monitor_key_get.return_value = None + group = ceph_broker.get_group('images') + self.assertEqual(group, { + 'pools': [], + 'services': [] + }) + + @mock.patch('ceph_broker.monitor_key_get') + def test_get_group(self, _monitor_key_get): + _monitor_key_get.return_value = '{"pools": ["glance"], "services": []}' + group = ceph_broker.get_group('images') + self.assertEqual(group, { + 'pools': ["glance"], + 'services': [] + }) + @mock.patch('ceph_broker.log') def test_process_requests_noop(self, mock_log): req = json.dumps({'api-version': 1, 'ops': []}) @@ -74,6 +248,29 @@ class CephBrokerTestCase(unittest.TestCase): replicas=3, pg_num=100) self.assertEqual(json.loads(rc), {'exit-code': 0}) + @mock.patch('ceph_broker.ReplicatedPool') + @mock.patch('ceph_broker.pool_exists') + @mock.patch('ceph_broker.log') + @mock.patch('ceph_broker.add_pool_to_group') + def test_process_requests_create_pool_w_group(self, add_pool_to_group, + mock_log, mock_pool_exists, + mock_replicated_pool): + mock_pool_exists.return_value = False + reqs = json.dumps({'api-version': 1, + 'ops': [{ + 'op': 'create-pool', + 'name': 'foo', + 'replicas': 3, + 'group': 'image'}]}) + rc = ceph_broker.process_requests(reqs) + add_pool_to_group.assert_called_with(group='image', + pool='foo', + namespace=None) + mock_pool_exists.assert_called_with(service='admin', name='foo') + mock_replicated_pool.assert_called_with(service='admin', name='foo', + replicas=3) + self.assertEqual(json.loads(rc), {'exit-code': 0}) + @mock.patch('ceph_broker.ReplicatedPool') @mock.patch('ceph_broker.pool_exists') @mock.patch('ceph_broker.log')