added unit tests

This commit is contained in:
Edward Hope-Morley 2014-11-08 20:15:17 +00:00
parent a4988b051b
commit c0c2512df6
5 changed files with 57 additions and 14 deletions

0
hooks/__init__.py Normal file
View File

View File

@ -23,16 +23,21 @@ def process_requests(reqs):
# setup to use them for these operations.
svc = 'admin'
if op == "create_pool":
pool = req.get('pool')
pool = req.get('name')
replicas = req.get('replicas')
if not all([pool, replicas]):
log("Missing parameter(s)", level=ERROR)
return 1
if not pool_exists(service=svc, name=pool):
log("Creating pool '%s'" % (pool), level=INFO)
log("Creating pool '%s' (replicas=%s)" % (pool, replicas),
level=INFO)
create_pool(service=svc, name=pool, replicas=replicas)
else:
log("Pool '%s' already exists" % (pool), level=INFO)
log("Pool '%s' already exists - skipping create" % (pool),
level=INFO)
else:
log("Unknown operation '%s'" % (op))
return 1
return 0

View File

@ -296,17 +296,18 @@ def client_relation_joined(relid=None):
@hooks.hook('client-relation-changed')
def client_relation_changed(relid=None):
if ceph.is_quorum():
resp = None
"""Process broker requests from ceph client relations."""
if ceph.is_quorum() and ceph.is_leader():
settings = relation_get(rid=relid)
if 'broker_req' in settings:
req = settings['broker_req']
log("Broker request received")
resp = process_requests(json.loads(req))
if resp is not None:
relation_set(relation_id=relid,
relation_settings={'broker_rsp': resp})
log("Broker request received from ceph client")
exit_code = process_requests(json.loads(req))
# Construct JSON response dict allowing other data to be added as
# and when we need it.
resp = json.dumps({'exit_code': exit_code})
relation_set(relation_id=relid,
relation_settings={'broker_rsp': resp})
else:
log('mon cluster not in quorum')

2
unit_tests/__init__.py Normal file
View File

@ -0,0 +1,2 @@
import sys
sys.path.append('hooks')

View File

@ -1,11 +1,46 @@
#import mock
import mock
import unittest
import ceph_broker
class CephBrokerTestCase(unittest.TestCase):
def setUp(self):
super(CephBrokerTestCase, self).setUp()
def test_process_requests(self):
pass
@mock.patch('ceph_broker.log')
def test_process_requests_noop(self, mock_log):
rc = ceph_broker.process_requests([{}])
self.assertEqual(rc, 1)
@mock.patch('ceph_broker.log')
def test_process_requests_invalid(self, mock_log):
rc = ceph_broker.process_requests([{'op': 'invalid_op'}])
self.assertEqual(rc, 1)
@mock.patch('ceph_broker.create_pool')
@mock.patch('ceph_broker.pool_exists')
@mock.patch('ceph_broker.log')
def test_process_requests_create_pool(self, mock_log, mock_pool_exists,
mock_create_pool):
mock_pool_exists.return_value = False
rc = ceph_broker.process_requests([{'op': 'create_pool', 'name': 'foo',
'replicas': 3}])
mock_pool_exists.assert_called_with(service='admin', name='foo')
mock_create_pool.assert_called_with(service='admin', name='foo',
replicas=3)
self.assertEqual(rc, 0)
@mock.patch('ceph_broker.create_pool')
@mock.patch('ceph_broker.pool_exists')
@mock.patch('ceph_broker.log')
def test_process_requests_create_pool_exists(self, mock_log,
mock_pool_exists,
mock_create_pool):
mock_pool_exists.return_value = True
rc = ceph_broker.process_requests([{'op': 'create_pool', 'name': 'foo',
'replicas': 3}])
mock_pool_exists.assert_called_with(service='admin', name='foo')
self.assertFalse(mock_create_pool.called)
self.assertEqual(rc, 0)