Merge "Charms can request access to groups of pools"

This commit is contained in:
Jenkins 2017-02-09 22:08:13 +00:00 committed by Gerrit Code Review
commit bb53ae545b
2 changed files with 375 additions and 1 deletions

View File

@ -34,6 +34,8 @@ from charmhelpers.contrib.storage.linux.ceph import (
delete_pool,
erasure_profile_exists,
get_osds,
monitor_key_get,
monitor_key_set,
pool_exists,
pool_set,
remove_pool_snapshot,
@ -49,7 +51,7 @@ from charmhelpers.contrib.storage.linux.ceph import (
# This comes from http://docs.ceph.com/docs/master/rados/operations/pools/
# This should do a decent job of preventing people from passing in bad values.
# It will give a useful error message
from subprocess import check_output, CalledProcessError
from subprocess import check_call, check_output, CalledProcessError
POOL_KEYS = {
# "Ceph Key Name": [Python type, [Valid Range]]
@ -157,11 +159,169 @@ def handle_create_erasure_profile(request, service):
data_chunks=k, coding_chunks=m, locality=l)
def handle_add_permissions_to_key(request, service):
"""
Groups are defined by the key cephx.groups.(namespace-)?-(name). This key
will contain a dict serialized to JSON with data about the group, including
pools and members.
A group can optionally have a namespace defined that will be used to
further restrict pool access.
"""
service_name = request.get('name')
group_name = request.get('group')
group_namespace = request.get('group-namespace')
if group_namespace:
group_name = "{}-{}".format(group_namespace, group_name)
group = get_group(group_name=group_name)
service_obj = get_service_groups(service=service_name,
namespace=group_namespace)
format("Service object: {}".format(service_obj))
permission = request.get('group-permission') or "rwx"
if service_name not in group['services']:
group['services'].append(service_name)
save_group(group=group, group_name=group_name)
if permission not in service_obj['group_names']:
service_obj['group_names'][permission] = []
if group_name not in service_obj['group_names'][permission]:
service_obj['group_names'][permission].append(group_name)
save_service(service=service_obj, service_name=service_name)
service_obj['groups'][group_name] = group
update_service_permissions(service_name, service_obj, group_namespace)
def update_service_permissions(service, service_obj=None, namespace=None):
"""Update the key permissions for the named client in Ceph"""
if not service_obj:
service_obj = get_service_groups(service=service, namespace=namespace)
permissions = pool_permission_list_for_service(service_obj)
call = ['ceph', 'auth', 'caps', 'client.{}'.format(service)] + permissions
try:
check_call(call)
except CalledProcessError as e:
log("Error updating key capabilities: {}".format(e))
def add_pool_to_group(pool, group, namespace=None):
"""Add a named pool to a named group"""
group_name = group
if namespace:
group_name = "{}-{}".format(namespace, group_name)
group = get_group(group_name=group_name)
group["pools"].append(pool)
save_group(group, group_name=group_name)
for service in group['services']:
update_service_permissions(service, namespace=namespace)
def pool_permission_list_for_service(service):
"""Build the permission string for Ceph for a given service"""
permissions = ""
permission_types = {}
for permission, group in service["group_names"].items():
if permission not in permission_types:
permission_types[permission] = []
for item in group:
permission_types[permission].append(item)
for permission, groups in permission_types.items():
permission = " allow {}".format(permission)
for group in groups:
for pool in service['groups'][group]['pools']:
permission = "{} pool={}".format(permission, pool)
permissions += permission
return ["mon", "allow r", "osd", permissions.strip()]
def get_service_groups(service, namespace=None):
"""
Services are objects stored with some metadata, they look like (for a
service named "nova"):
{
group_names: {'rwx': ['images']},
groups: {}
}
After populating the group, it looks like:
{
group_names: {'rwx': ['images']},
groups: {
1 'images': {
pools: ['glance'],
services: ['nova']
}
}
}
"""
service_json = monitor_key_get(service='admin',
key="cephx.services.{}".format(service))
try:
service = json.loads(service_json)
except TypeError:
service = None
except ValueError:
service = None
if service:
for permission, groups in service['group_names'].items():
for group in groups:
name = group
if namespace:
name = "{}-{}".format(namespace, name)
service['groups'][group] = get_group(group_name=name)
else:
service = {'group_names': {}, 'groups': {}}
return service
def get_group(group_name):
"""
A group is a structure to hold data about a named group, structured as:
{
pools: ['glance'],
services: ['nova']
}
"""
group_key = get_group_key(group_name=group_name)
group_json = monitor_key_get(service='admin', key=group_key)
try:
group = json.loads(group_json)
except TypeError:
group = None
except ValueError:
group = None
if not group:
group = {
'pools': [],
'services': []
}
return group
def save_service(service_name, service):
"""Persist a service in the monitor cluster"""
service['groups'] = {}
return monitor_key_set(service='admin',
key="cephx.services.{}".format(service_name),
value=json.dumps(service))
def save_group(group, group_name):
"""Persist a group in the monitor cluster"""
group_key = get_group_key(group_name=group_name)
return monitor_key_set(service='admin',
key=group_key,
value=json.dumps(group))
def get_group_key(group_name):
"""Build group key"""
return 'cephx.groups.{}'.format(group_name)
def handle_erasure_pool(request, service):
pool_name = request.get('name')
erasure_profile = request.get('erasure-profile')
quota = request.get('max-bytes')
weight = request.get('weight')
group_name = request.get('group')
if erasure_profile is None:
erasure_profile = "default-canonical"
@ -172,6 +332,13 @@ def handle_erasure_pool(request, service):
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
if group_name:
group_namespace = request.get('group-namespace')
# Add the pool to the group named "group_name"
add_pool_to_group(pool=pool_name,
group=group_name,
namespace=group_namespace)
# TODO: Default to 3/2 erasure coding. I believe this requires min 5 osds
if not erasure_profile_exists(service=service, name=erasure_profile):
# TODO: Fail and tell them to create the profile or default
@ -200,6 +367,7 @@ def handle_replicated_pool(request, service):
replicas = request.get('replicas')
quota = request.get('max-bytes')
weight = request.get('weight')
group_name = request.get('group')
# Optional params
pg_num = request.get('pg_num')
@ -215,6 +383,13 @@ def handle_replicated_pool(request, service):
log(msg, level=ERROR)
return {'exit-code': 1, 'stderr': msg}
if group_name:
group_namespace = request.get('group-namespace')
# Add the pool to the group named "group_name"
add_pool_to_group(pool=pool_name,
group=group_name,
namespace=group_namespace)
kwargs = {}
if pg_num:
kwargs['pg_num'] = pg_num
@ -570,6 +745,8 @@ def process_requests_v1(reqs):
ret = handle_rgw_create_user(request=req, service=svc)
elif op == "move-osd-to-bucket":
ret = handle_put_osd_in_bucket(request=req, service=svc)
elif op == "add-permissions-to-key":
ret = handle_add_permissions_to_key(request=req, service=svc)
else:
msg = "Unknown operation '%s'" % op
log(msg, level=ERROR)

View File

@ -25,6 +25,180 @@ class CephBrokerTestCase(unittest.TestCase):
def setUp(self):
super(CephBrokerTestCase, self).setUp()
@mock.patch('ceph_broker.check_call')
def test_update_service_permission(self, _check_call):
service_obj = {
'group_names': {'rwx': ['images']},
'groups': {'images': {'pools': ['cinder'], 'services': ['nova']}}
}
ceph_broker.update_service_permissions(service='nova',
service_obj=service_obj)
_check_call.assert_called_with(['ceph', 'auth', 'caps',
'client.nova', 'mon', 'allow r', 'osd',
'allow rwx pool=cinder'])
@mock.patch('ceph_broker.check_call')
@mock.patch('ceph_broker.get_service_groups')
@mock.patch('ceph_broker.monitor_key_set')
@mock.patch('ceph_broker.monitor_key_get')
def test_add_pool_to_existing_group_with_services(self,
_monitor_key_get,
_monitor_key_set,
_get_service_groups,
_check_call):
_monitor_key_get.return_value = '{"pools": ["glance"],'\
' "services": ["nova"]}'
service = {
'group_names': {'rwx': ['images']},
'groups': {'images': {'pools': [
'glance', 'cinder'
], 'services': ['nova']}}
}
_get_service_groups.return_value = service
ceph_broker.add_pool_to_group(
pool="cinder",
group="images"
)
_monitor_key_set.assert_called_with(
key='cephx.groups.images',
service='admin',
value=json.dumps({"pools": ["glance", "cinder"],
"services": ["nova"]}))
_check_call.assert_called_with(['ceph', 'auth', 'caps',
'client.nova', 'mon', 'allow r', 'osd',
'allow rwx pool=glance pool=cinder'])
@mock.patch('ceph_broker.monitor_key_set')
@mock.patch('ceph_broker.monitor_key_get')
def test_add_pool_to_existing_group(self,
_monitor_key_get,
_monitor_key_set):
_monitor_key_get.return_value = '{"pools": ["glance"], "services": []}'
ceph_broker.add_pool_to_group(
pool="cinder",
group="images"
)
_monitor_key_set.assert_called_with(
key='cephx.groups.images',
service='admin',
value=json.dumps({"pools": ["glance", "cinder"], "services": []}))
@mock.patch('ceph_broker.monitor_key_set')
@mock.patch('ceph_broker.monitor_key_get')
def test_add_pool_to_new_group(self,
_monitor_key_get,
_monitor_key_set):
_monitor_key_get.return_value = '{"pools": [], "services": []}'
ceph_broker.add_pool_to_group(
pool="glance",
group="images"
)
_monitor_key_set.assert_called_with(
key='cephx.groups.images',
service='admin',
value=json.dumps({"pools": ["glance"], "services": []}))
def test_pool_permission_list_for_service(self):
service = {
'group_names': {'rwx': ['images']},
'groups': {'images': {'pools': ['glance'], 'services': ['nova']}}
}
result = ceph_broker.pool_permission_list_for_service(service)
self.assertEqual(result, ['mon',
'allow r',
'osd',
'allow rwx pool=glance'])
@mock.patch('ceph_broker.monitor_key_set')
def test_save_service(self, _monitor_key_set):
service = {
'group_names': {'rwx': 'images'},
'groups': {'images': {'pools': ['glance'], 'services': ['nova']}}
}
ceph_broker.save_service(service=service, service_name='nova')
_monitor_key_set.assert_called_with(
value='{"groups": {}, "group_names": {"rwx": "images"}}',
key='cephx.services.nova',
service='admin')
@mock.patch('ceph_broker.monitor_key_get')
def test_get_service_groups_empty(self, _monitor_key_get):
_monitor_key_get.return_value = None
service = ceph_broker.get_service_groups('nova')
_monitor_key_get.assert_called_with(
key='cephx.services.nova',
service='admin'
)
self.assertEqual(service, {'group_names': {}, 'groups': {}})
@mock.patch('ceph_broker.monitor_key_get')
def test_get_service_groups_empty_str(self, _monitor_key_get):
_monitor_key_get.return_value = ''
service = ceph_broker.get_service_groups('nova')
_monitor_key_get.assert_called_with(
key='cephx.services.nova',
service='admin'
)
self.assertEqual(service, {'group_names': {}, 'groups': {}})
@mock.patch('ceph_broker.get_group')
@mock.patch('ceph_broker.monitor_key_get')
def test_get_service_groups(self, _monitor_key_get, _get_group):
_monitor_key_get.return_value = '{"group_names": {"rwx": ["images"]}' \
',"groups": {}}'
_get_group.return_value = {
'pools': ["glance"],
'services': ['nova']
}
service = ceph_broker.get_service_groups('nova')
_monitor_key_get.assert_called_with(
key='cephx.services.nova',
service='admin'
)
self.assertEqual(service, {
'group_names': {'rwx': ['images']},
'groups': {'images': {'pools': ['glance'], 'services': ['nova']}}
})
@mock.patch('ceph_broker.monitor_key_set')
def test_save_group(self, _monitor_key_set):
group = {
'pools': ["glance"],
'services': []
}
ceph_broker.save_group(group=group, group_name='images')
_monitor_key_set.assert_called_with(
key='cephx.groups.images',
service='admin',
value=json.dumps(group))
@mock.patch('ceph_broker.monitor_key_get')
def test_get_group_empty_str(self, _monitor_key_get):
_monitor_key_get.return_value = ''
group = ceph_broker.get_group('images')
self.assertEqual(group, {
'pools': [],
'services': []
})
@mock.patch('ceph_broker.monitor_key_get')
def test_get_group_empty(self, _monitor_key_get):
_monitor_key_get.return_value = None
group = ceph_broker.get_group('images')
self.assertEqual(group, {
'pools': [],
'services': []
})
@mock.patch('ceph_broker.monitor_key_get')
def test_get_group(self, _monitor_key_get):
_monitor_key_get.return_value = '{"pools": ["glance"], "services": []}'
group = ceph_broker.get_group('images')
self.assertEqual(group, {
'pools': ["glance"],
'services': []
})
@mock.patch('ceph_broker.log')
def test_process_requests_noop(self, mock_log):
req = json.dumps({'api-version': 1, 'ops': []})
@ -74,6 +248,29 @@ class CephBrokerTestCase(unittest.TestCase):
replicas=3, pg_num=100)
self.assertEqual(json.loads(rc), {'exit-code': 0})
@mock.patch('ceph_broker.ReplicatedPool')
@mock.patch('ceph_broker.pool_exists')
@mock.patch('ceph_broker.log')
@mock.patch('ceph_broker.add_pool_to_group')
def test_process_requests_create_pool_w_group(self, add_pool_to_group,
mock_log, mock_pool_exists,
mock_replicated_pool):
mock_pool_exists.return_value = False
reqs = json.dumps({'api-version': 1,
'ops': [{
'op': 'create-pool',
'name': 'foo',
'replicas': 3,
'group': 'image'}]})
rc = ceph_broker.process_requests(reqs)
add_pool_to_group.assert_called_with(group='image',
pool='foo',
namespace=None)
mock_pool_exists.assert_called_with(service='admin', name='foo')
mock_replicated_pool.assert_called_with(service='admin', name='foo',
replicas=3)
self.assertEqual(json.loads(rc), {'exit-code': 0})
@mock.patch('ceph_broker.ReplicatedPool')
@mock.patch('ceph_broker.pool_exists')
@mock.patch('ceph_broker.log')