Moved more broker code into ceph_broker
Added support for versioning api Added unit tests
This commit is contained in:
parent
eb436cd35f
commit
e328ba4054
|
@ -2,6 +2,8 @@
|
|||
#
|
||||
# Copyright 2014 Canonical Ltd.
|
||||
#
|
||||
import json
|
||||
|
||||
from charmhelpers.core.hookenv import (
|
||||
log,
|
||||
INFO,
|
||||
|
@ -13,8 +15,37 @@ from charmhelpers.contrib.storage.linux.ceph import (
|
|||
)
|
||||
|
||||
|
||||
def decode(f):
|
||||
def decode_inner(req):
|
||||
return json.dumps(f(json.loads(req)))
|
||||
|
||||
return decode_inner
|
||||
|
||||
|
||||
@decode
|
||||
def process_requests(reqs):
|
||||
"""Process a Ceph broker request from a ceph client."""
|
||||
"""Process a Ceph broker request from a ceph client.
|
||||
|
||||
This is a versioned api. We choose the api version based on provided
|
||||
version from client.
|
||||
"""
|
||||
version = reqs.get('version')
|
||||
if version == 1:
|
||||
return process_requests_v1(reqs['ops'])
|
||||
|
||||
msg = ("Missing or invalid api version (%s)" % (version))
|
||||
return {'exit_code': 1, 'stderr': msg}
|
||||
|
||||
|
||||
def process_requests_v1(reqs):
|
||||
"""Process a v1 requests from a ceph client.
|
||||
|
||||
Takes a list of requests (dicts) and processes each one until it hits an
|
||||
error.
|
||||
|
||||
Upon completion of all ops or if an error is found, a response dict is
|
||||
returned containing exit code and any extra info.
|
||||
"""
|
||||
log("Processing %s ceph broker requests" % (len(reqs)), level=INFO)
|
||||
for req in reqs:
|
||||
op = req.get('op')
|
||||
|
@ -26,11 +57,11 @@ def process_requests(reqs):
|
|||
params = {'pool': req.get('name'),
|
||||
'replicas': req.get('replicas')}
|
||||
if not all(params.iteritems()):
|
||||
log("Missing parameter(s): %s" %
|
||||
(' '.join([k for k in params.iterkeys()
|
||||
if not params[k]])),
|
||||
level=ERROR)
|
||||
return 1
|
||||
msg = ("Missing parameter(s): %s" %
|
||||
(' '.join([k for k in params.iterkeys()
|
||||
if not params[k]])))
|
||||
log(msg, level=ERROR)
|
||||
return {'exit_code': 1, 'stderr': msg}
|
||||
|
||||
pool = params['pool']
|
||||
replicas = params['replicas']
|
||||
|
@ -42,7 +73,8 @@ def process_requests(reqs):
|
|||
log("Pool '%s' already exists - skipping create" % (pool),
|
||||
level=INFO)
|
||||
else:
|
||||
log("Unknown operation '%s'" % (op))
|
||||
return 1
|
||||
msg = "Unknown operation '%s'" % (op)
|
||||
log(msg, level=ERROR)
|
||||
return {'exit_code': 1, 'stderr': msg}
|
||||
|
||||
return 0
|
||||
return {'exit_code': 0}
|
||||
|
|
|
@ -9,14 +9,15 @@
|
|||
#
|
||||
|
||||
import glob
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
import sys
|
||||
|
||||
import ceph
|
||||
from charmhelpers.core.hookenv import (
|
||||
log, ERROR,
|
||||
log,
|
||||
INFO,
|
||||
ERROR,
|
||||
config,
|
||||
relation_ids,
|
||||
related_units,
|
||||
|
@ -26,7 +27,6 @@ from charmhelpers.core.hookenv import (
|
|||
Hooks, UnregisteredHookError,
|
||||
service_name
|
||||
)
|
||||
|
||||
from charmhelpers.core.host import (
|
||||
service_restart,
|
||||
umount,
|
||||
|
@ -45,7 +45,6 @@ from charmhelpers.contrib.network.ip import (
|
|||
get_ipv6_addr,
|
||||
format_ipv6_addr
|
||||
)
|
||||
|
||||
from utils import (
|
||||
render_template,
|
||||
get_public_addr,
|
||||
|
@ -297,17 +296,17 @@ def client_relation_joined(relid=None):
|
|||
@hooks.hook('client-relation-changed')
|
||||
def client_relation_changed(relid=None):
|
||||
"""Process broker requests from ceph client relations."""
|
||||
if ceph.is_quorum() and ceph.is_leader():
|
||||
if ceph.is_quorum():
|
||||
settings = relation_get(rid=relid)
|
||||
if 'broker_req' in settings:
|
||||
req = settings['broker_req']
|
||||
log("Broker request received from ceph client")
|
||||
exit_code = process_requests(json.loads(req))
|
||||
# Construct JSON response dict allowing other data to be added as
|
||||
# and when we need it.
|
||||
resp = json.dumps({'exit_code': exit_code})
|
||||
relation_set(relation_id=relid,
|
||||
relation_settings={'broker_rsp': resp})
|
||||
if not ceph.is_leader():
|
||||
log("Not leader - ignoring broker request", level=INFO)
|
||||
else:
|
||||
req = settings['broker_req']
|
||||
log("Broker request received from ceph client")
|
||||
rsp = process_requests(req)
|
||||
relation_set(relation_id=relid,
|
||||
relation_settings={'broker_rsp': rsp})
|
||||
else:
|
||||
log('mon cluster not in quorum')
|
||||
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
import json
|
||||
import mock
|
||||
import unittest
|
||||
|
||||
|
@ -11,13 +12,34 @@ class CephBrokerTestCase(unittest.TestCase):
|
|||
|
||||
@mock.patch('ceph_broker.log')
|
||||
def test_process_requests_noop(self, mock_log):
|
||||
rc = ceph_broker.process_requests([{}])
|
||||
self.assertEqual(rc, 1)
|
||||
req = json.dumps({'version': 1, 'ops': []})
|
||||
rc = ceph_broker.process_requests(req)
|
||||
self.assertEqual(json.loads(rc), {'exit_code': 0})
|
||||
|
||||
@mock.patch('ceph_broker.log')
|
||||
def test_process_requests_missing_api_version(self, mock_log):
|
||||
req = json.dumps({'ops': []})
|
||||
rc = ceph_broker.process_requests(req)
|
||||
self.assertEqual(json.loads(rc), {'exit_code': 1,
|
||||
'stderr':
|
||||
('Missing or invalid api version '
|
||||
'(None)')})
|
||||
|
||||
@mock.patch('ceph_broker.log')
|
||||
def test_process_requests_invalid_api_version(self, mock_log):
|
||||
req = json.dumps({'version': 2, 'ops': []})
|
||||
rc = ceph_broker.process_requests(req)
|
||||
self.assertEqual(json.loads(rc),
|
||||
{'exit_code': 1,
|
||||
'stderr': 'Missing or invalid api version (2)'})
|
||||
|
||||
@mock.patch('ceph_broker.log')
|
||||
def test_process_requests_invalid(self, mock_log):
|
||||
rc = ceph_broker.process_requests([{'op': 'invalid_op'}])
|
||||
self.assertEqual(rc, 1)
|
||||
reqs = json.dumps({'version': 1, 'ops': [{'op': 'invalid_op'}]})
|
||||
rc = ceph_broker.process_requests(reqs)
|
||||
self.assertEqual(json.loads(rc),
|
||||
{'exit_code': 1,
|
||||
'stderr': "Unknown operation 'invalid_op'"})
|
||||
|
||||
@mock.patch('ceph_broker.create_pool')
|
||||
@mock.patch('ceph_broker.pool_exists')
|
||||
|
@ -25,12 +47,14 @@ class CephBrokerTestCase(unittest.TestCase):
|
|||
def test_process_requests_create_pool(self, mock_log, mock_pool_exists,
|
||||
mock_create_pool):
|
||||
mock_pool_exists.return_value = False
|
||||
rc = ceph_broker.process_requests([{'op': 'create_pool', 'name': 'foo',
|
||||
'replicas': 3}])
|
||||
reqs = json.dumps({'version': 1,
|
||||
'ops': [{'op': 'create_pool', 'name':
|
||||
'foo', 'replicas': 3}]})
|
||||
rc = ceph_broker.process_requests(reqs)
|
||||
mock_pool_exists.assert_called_with(service='admin', name='foo')
|
||||
mock_create_pool.assert_called_with(service='admin', name='foo',
|
||||
replicas=3)
|
||||
self.assertEqual(rc, 0)
|
||||
self.assertEqual(json.loads(rc), {'exit_code': 0})
|
||||
|
||||
@mock.patch('ceph_broker.create_pool')
|
||||
@mock.patch('ceph_broker.pool_exists')
|
||||
|
@ -39,8 +63,10 @@ class CephBrokerTestCase(unittest.TestCase):
|
|||
mock_pool_exists,
|
||||
mock_create_pool):
|
||||
mock_pool_exists.return_value = True
|
||||
rc = ceph_broker.process_requests([{'op': 'create_pool', 'name': 'foo',
|
||||
'replicas': 3}])
|
||||
reqs = json.dumps({'version': 1,
|
||||
'ops': [{'op': 'create_pool', 'name': 'foo',
|
||||
'replicas': 3}]})
|
||||
rc = ceph_broker.process_requests(reqs)
|
||||
mock_pool_exists.assert_called_with(service='admin', name='foo')
|
||||
self.assertFalse(mock_create_pool.called)
|
||||
self.assertEqual(rc, 0)
|
||||
self.assertEqual(json.loads(rc), {'exit_code': 0})
|
||||
|
|
Loading…
Reference in New Issue