Switch requires from RelationBase to Endpoint

Switch requires relation from the old api
charms.reactive.relations.RelationBase to the replacement
charms.reactive.endpoints.Endpoint.

Change-Id: I6fe7cab654d5768f521ffa4218ae5c3974ef4ae8
This commit is contained in:
Liam Young 2020-08-17 08:16:55 +00:00
parent cd3626b047
commit fc168f4e7b
3 changed files with 187 additions and 131 deletions

View File

@ -14,7 +14,33 @@
from .lib import base_requires
from charms.reactive import (
when,
)
class CephClientRequires(base_requires.CephRequires):
pass
@when('endpoint.{endpoint_name}.joined')
def joined(self):
super().joined()
@when('endpoint.{endpoint_name}.changed')
def changed(self):
super().changed()
@when('endpoint.{endpoint_name}.departed')
def departed(self):
super().changed()
@when('endpoint.{endpoint_name}.broken')
def broken(self):
super().broken()
def initial_ceph_response(self):
data = {
'key': self.key(),
'auth': self.auth(),
'mon_hosts': self.mon_hosts()
}
return data

View File

@ -14,61 +14,63 @@
import json
from charms.reactive import hook
from charms.reactive import RelationBase
from charms.reactive import scopes
from charmhelpers.core import hookenv
import charms.reactive as reactive
from charmhelpers.core.hookenv import log
from charmhelpers.contrib.network.ip import format_ipv6_addr
from charmhelpers.contrib.storage.linux.ceph import (
CephBrokerRq,
is_request_complete,
send_request_if_needed,
is_request_sent,
)
class CephRequires(RelationBase):
scope = scopes.GLOBAL
class CephRequires(reactive.Endpoint):
auto_accessors = ['auth', 'key']
@hook('{requires:ceph-client}-relation-{joined}')
def joined(self):
self.set_state('{relation_name}.connected')
reactive.set_flag(self.expand_name('{endpoint_name}.connected'))
def key(self):
return self.all_joined_units.received.get('key')
def auth(self):
return self.all_joined_units.received.get('auth')
@property
def relation_name(self):
return self.expand_name('{endpoint_name}')
def initial_ceph_response(self):
raise NotImplementedError
@hook('{requires:ceph-client}-relation-{changed,departed}')
def changed(self):
data = {
'key': self.key(),
'auth': self.auth(),
'mon_hosts': self.mon_hosts()
}
data = self.initial_ceph_response()
if all(data.values()):
self.set_state('{relation_name}.available')
reactive.set_flag(self.expand_name('{endpoint_name}.available'))
json_rq = self.get_local(key='broker_req')
if json_rq:
rq = CephBrokerRq()
j = json.loads(json_rq)
rq.ops = j['ops']
rq = self.get_current_request()
if rq:
log("changed broker_req: {}".format(rq.ops))
if rq and is_request_complete(rq,
relation=self.relation_name):
if rq and is_request_complete(rq, relation=self.relation_name):
log("Setting ceph-client.pools.available")
self.set_state('{relation_name}.pools.available')
reactive.set_flag(
self.expand_name('{endpoint_name}.pools.available'))
else:
log("incomplete request. broker_req not found")
@hook('{requires:ceph-client}-relation-{broken}')
def broken(self):
self.remove_state('{relation_name}.available')
self.remove_state('{relation_name}.connected')
self.remove_state('{relation_name}.pools.available')
reactive.clear_flag(
self.expand_name('{endpoint_name}.available'))
reactive.clear_flag(
self.expand_name('{endpoint_name}.connected'))
reactive.clear_flag(
self.expand_name('{endpoint_name}.pools.available'))
def create_replicated_pool(self, name, replicas=3, weight=None,
pg_num=None, group=None, namespace=None):
pg_num=None, group=None, namespace=None,
app_name=None):
"""
Request pool setup
@ -82,17 +84,22 @@ class CephRequires(RelationBase):
@param group: Group to add pool to.
@param namespace: A group can optionally have a namespace defined that
will be used to further restrict pool access.
@param app_name: (Optional) Tag pool with application name. Note that
there is certain protocols emerging upstream with
regard to meaningful application names to use.
Examples are ``rbd`` and ``rgw``.
"""
rq = self.get_current_request()
rq = self.get_current_request() or CephBrokerRq()
rq.add_op_create_replicated_pool(name=name,
replica_count=replicas,
pg_num=pg_num,
weight=weight,
group=group,
namespace=namespace)
self.set_local(key='broker_req', value=rq.request)
send_request_if_needed(rq, relation=self.relation_name)
self.remove_state('{relation_name}.pools.available')
namespace=namespace,
app_name=app_name)
self.send_request_if_needed(rq)
reactive.clear_flag(
self.expand_name('{endpoint_name}.pools.available'))
def create_pool(self, name, replicas=3, weight=None, pg_num=None,
group=None, namespace=None):
@ -130,7 +137,7 @@ class CephRequires(RelationBase):
@param max_objects: Maximum object quota to apply
@param allow_ec_overwrites: Allow EC pools to be overwritten
"""
rq = self.get_current_request()
rq = self.get_current_request() or CephBrokerRq()
rq.add_op_create_erasure_pool(name=name,
erasure_profile=erasure_profile,
weight=weight,
@ -139,9 +146,9 @@ class CephRequires(RelationBase):
max_bytes=max_bytes,
max_objects=max_objects,
allow_ec_overwrites=allow_ec_overwrites)
self.set_local(key='broker_req', value=rq.request)
send_request_if_needed(rq, relation=self.relation_name)
self.remove_state('{relation_name}.pools.available')
self.send_request_if_needed(rq, relation=self.relation_name)
reactive.clear_flag(
self.expand_name('{endpoint_name}.pools.available'))
def create_erasure_profile(self, name,
erasure_type='jerasure',
@ -181,7 +188,7 @@ class CephRequires(RelationBase):
Type of crush bucket in which set of chunks
defined by lrc_locality will be stored.
"""
rq = self.get_current_request()
rq = self.get_current_request() or CephBrokerRq()
rq.add_op_create_erasure_profile(
name=name,
erasure_type=erasure_type,
@ -195,12 +202,13 @@ class CephRequires(RelationBase):
clay_scalar_mds=clay_scalar_mds,
lrc_crush_locality=lrc_crush_locality
)
self.set_local(key='broker_req', value=rq.request)
send_request_if_needed(rq, relation=self.relation_name)
self.remove_state('{relation_name}.pools.available')
self.send_request_if_needed(rq, relation=self.relation_name)
reactive.clear_flag(
self.expand_name('{endpoint_name}.pools.available'))
def request_access_to_group(self, name, namespace=None, permission=None,
key_name=None, object_prefix_permissions=None):
key_name=None,
object_prefix_permissions=None):
"""
Adds the requested permissions to service's Ceph key
@ -218,44 +226,49 @@ class CephRequires(RelationBase):
@param key_name: userid to grant permission to
@param object_prefix_permissions: Add object_prefix permissions.
"""
current_request = self.get_current_request()
current_request = self.get_current_request() or CephBrokerRq()
current_request.add_op_request_access_to_group(
name,
namespace=namespace,
permission=permission,
key_name=key_name,
object_prefix_permissions=object_prefix_permissions)
self.set_local(key='broker_req', value=current_request.request)
send_request_if_needed(current_request, relation=self.relation_name)
self.send_request_if_needed(current_request)
def send_request_if_needed(self, request):
"""Send broker request if an equivalent request has not been sent
@param request: A CephBrokerRq object
"""
if is_request_sent(request, relation=self.relation_name):
log('Request already sent but not complete, '
'not sending new request')
else:
for relation in self.relations:
relation.to_publish['broker_req'] = json.loads(
request.request)
def get_current_request(self):
"""Return the current broker request for the interface."""
# json.dumps of the CephBrokerRq()
rq = CephBrokerRq()
json_rq = self.get_local(key='broker_req')
if json_rq:
try:
j = json.loads(json_rq)
log("Json request: {}".format(json_rq))
rq.set_ops(j['ops'])
except ValueError as err:
log("Unable to decode broker_req: {}. Error {}".format(
json_rq, err))
return rq
broker_reqs = []
for relation in self.relations:
broker_req = relation.to_publish.get('broker_req', {})
if broker_req:
rq = CephBrokerRq()
rq.set_ops(broker_req['ops'])
broker_reqs.append(rq)
# Check that if there are multiple requests then they are the same.
assert all(x == broker_reqs[0] for x in broker_reqs)
if broker_reqs:
return broker_reqs[0]
def get_remote_all(self, key, default=None):
"""Return a list of all values presented by remote units for key"""
# TODO: might be a nicer way todo this - written a while back!
values = []
for conversation in self.conversations():
for relation_id in conversation.relation_ids:
for unit in hookenv.related_units(relation_id):
value = hookenv.relation_get(key,
unit,
relation_id) or default
if value:
values.append(value)
for relation in self.relations:
for unit in relation.units:
value = unit.received.get(key, default)
if value:
values.append(value)
return list(set(values))
def mon_hosts(self):

View File

@ -10,10 +10,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import unittest
from unittest import mock
from charmhelpers.contrib.storage.linux.ceph import (
CephBrokerRq,
)
with mock.patch('charmhelpers.core.hookenv.metadata') as _meta:
_meta.return_Value = 'ss'
@ -27,7 +30,6 @@ _hook_args = {}
TO_PATCH = []
TO_PATCH_BASE_REQUIRES = [
'is_request_complete',
'send_request_if_needed',
]
@ -41,6 +43,14 @@ def mock_hook(*args, **kwargs):
return inner
class DummyRequest(CephBrokerRq):
def __init__(self, req_json=None, request_id=12):
super().__init__(request_id=request_id)
if req_json:
self.set_ops(json.loads(req_json)['ops'])
class TestCephClientRequires(unittest.TestCase):
@classmethod
@ -84,6 +94,8 @@ class TestCephClientRequires(unittest.TestCase):
setattr(self, method, self.patch(method))
for method in TO_PATCH_BASE_REQUIRES:
setattr(self, method, self.patch(method, requires.base_requires))
self.patch_object(requires.base_requires.reactive, 'set_flag')
self.patch_object(requires.base_requires.reactive, 'clear_flag')
def tearDown(self):
self.cr = None
@ -101,6 +113,26 @@ class TestCephClientRequires(unittest.TestCase):
self._patches_start[attr] = started
setattr(self, attr, started)
def patch_object(self, obj, attr, name=None, **kwargs):
"""Patch a patchable thing. Uses mock.patch.object() to do the work.
Automatically unpatches at the end of the test.
The mock gets added to the test object (self) using 'name' or the attr
passed in the arguments.
:param obj: an object that needs to have an attribute patched.
:param attr: <string> that represents the attribute being patched.
:param name: optional <string> name to call the mock.
:param **kwargs: any other args to pass to mock.patch()
"""
mocked = mock.patch.object(obj, attr, **kwargs)
if name is None:
name = attr
started = mocked.start()
self._patches[name] = mocked
self._patches_start[name] = started
setattr(self, name, started)
def test_registered_hooks(self):
# test that the decorators actually registered the relation
# expressions that are meaningful for this interface: this is to
@ -109,78 +141,59 @@ class TestCephClientRequires(unittest.TestCase):
hook_patterns = {
'data_changed': ('endpoint.{endpoint_name}.changed', ),
'joined': ('endpoint.{endpoint_name}.joined', ),
'broken': ('endpoint.{endpoint_name}.joined', ),
'broken': ('endpoint.{endpoint_name}.broken', ),
'changed': ('endpoint.{endpoint_name}.changed', ),
'departed': ('endpoint.{endpoint_name}.departed', ),
}
for k, v in _hook_args.items():
self.assertEqual(hook_patterns[k], v['args'])
def test_date_changed(self):
def test_data_changed(self):
self.patch_kr('key', 'key1')
self.patch_kr('auth', 'auth1')
self.patch_kr('mon_hosts', 'host1')
self.patch_kr('get_local', None)
self.patch_kr('set_state')
self.cr.changed()
self.set_state.assert_called_once_with('{relation_name}.available')
self.set_flag.assert_called_once_with('some-relation.available')
def test_date_changed_incomplete(self):
def test_data_changed_incomplete(self):
self.patch_kr('key', 'key1')
self.patch_kr('auth', None)
self.patch_kr('mon_hosts', 'host1')
self.patch_kr('get_local', None)
self.patch_kr('set_state')
self.cr.changed()
self.assertFalse(self.set_state.called)
self.assertFalse(self.set_flag.called)
def test_date_changed_existing_broker_rq(self):
broker_req = (
'{"api-version": 1, '
'"request-id": "4f7e247d-f953-11e8-a4f3-fa163e55565e",'
'"ops": [{"group": "volumes", "name": "cinder-ceph", '
'"weight": 40, "replicas": 3, "pg_num": null, '
'"group-namespace": null, "op": "create-pool"}]}')
def test_data_changed_existing_broker_rq(self):
self.patch_kr('key', 'key1')
self.patch_kr('auth', 'auth1')
self.patch_kr('mon_hosts', 'host1')
self.patch_kr('get_local', broker_req)
self.patch_kr('set_state')
self.patch_kr('get_current_request', DummyRequest())
self.is_request_complete.return_value = True
self.cr.changed()
self.set_state.assert_has_calls([
mock.call('{relation_name}.available'),
mock.call('{relation_name}.pools.available')])
self.set_flag.assert_has_calls([
mock.call('some-relation.available'),
mock.call('some-relation.pools.available')])
def test_date_changed_existing_broker_rq_incomplete(self):
broker_req = (
'{"api-version": 1, '
'"request-id": "4f7e247d-f953-11e8-a4f3-fa163e55565e",'
'"ops": [{"group": "volumes", "name": "cinder-ceph", '
'"weight": 40, "replicas": 3, "pg_num": null, '
'"group-namespace": null, "op": "create-pool"}]}')
self.patch_kr('key', 'key1')
self.patch_kr('auth', 'auth1')
self.patch_kr('mon_hosts', 'host1')
self.patch_kr('get_local', broker_req)
self.patch_kr('set_state')
self.is_request_complete.return_value = False
self.cr.changed()
# Side effect of asserting pools.available was not set.
self.set_state.assert_called_once_with('{relation_name}.available')
self.set_flag.assert_called_once_with('some-relation.available')
def test_broken(self):
self.patch_kr('remove_state')
self.cr.broken()
self.remove_state.assert_has_calls([
mock.call('{relation_name}.available'),
mock.call('{relation_name}.connected'),
mock.call('{relation_name}.pools.available')])
self.clear_flag.assert_has_calls([
mock.call('some-relation.available'),
mock.call('some-relation.connected'),
mock.call('some-relation.pools.available')])
@mock.patch.object(charmhelpers.contrib.storage.linux.ceph.uuid, 'uuid1')
def test_create_pool_new_request(self, _uuid1):
self.patch_kr('remove_state')
self.patch_kr('get_current_request', None)
self.patch_kr('send_request_if_needed')
_uuid1.return_value = '9e34123e-fa0c-11e8-ad9c-fa163ed1cc55'
self.patch_kr('get_local', None)
self.patch_kr('set_local')
self.cr.create_pool('bob')
ceph_broker_rq = self.send_request_if_needed.mock_calls[0][1][0]
self.assertEqual(
@ -199,7 +212,7 @@ class TestCephClientRequires(unittest.TestCase):
@mock.patch.object(charmhelpers.contrib.storage.linux.ceph.uuid, 'uuid1')
def test_create_pool_existing_request(self, _uuid1):
self.patch_kr('remove_state')
self.patch_kr('send_request_if_needed')
_uuid1.return_value = '9e34123e-fa0c-11e8-ad9c-fa163ed1cc55'
req = (
'{"api-version": 1, '
@ -208,7 +221,8 @@ class TestCephClientRequires(unittest.TestCase):
'"group-namespace": null, "app-name": null, "max-bytes": null, '
'"max-objects": null}], '
'"request-id": "9e34123e-fa0c-11e8-ad9c-fa163ed1cc55"}')
self.patch_kr('get_local', req)
existing_request = DummyRequest(req_json=req)
self.patch_kr('get_current_request', existing_request)
self.cr.create_pool('bob')
ceph_broker_rq = self.send_request_if_needed.mock_calls[0][1][0]
self.assertEqual(
@ -226,8 +240,7 @@ class TestCephClientRequires(unittest.TestCase):
'weight': None}])
def test_request_access_to_group_new_request(self):
self.patch_kr('get_local', '{"ops": []}')
self.patch_kr('set_local')
self.patch_kr('send_request_if_needed')
self.cr.request_access_to_group(
'volumes',
key_name='cinder',
@ -245,21 +258,22 @@ class TestCephClientRequires(unittest.TestCase):
'op': 'add-permissions-to-key'}])
def test_request_access_to_group_existing_request(self):
self.patch_kr('send_request_if_needed')
req = (
'{"api-version": 1, '
'"ops": [{"op": "create-pool", "name": "volumes", "replicas": 3, '
'"pg_num": null, "weight": null, "group": null, '
'"group-namespace": null}], '
'"request-id": "9e34123e-fa0c-11e8-ad9c-fa163ed1cc55"}')
self.patch_kr('get_local', req)
existing_request = DummyRequest(req_json=req)
self.patch_kr('get_current_request', existing_request)
self.cr.request_access_to_group(
'volumes',
key_name='cinder',
object_prefix_permissions={'class-read': ['rbd_children']},
permission='rwx')
ceph_broker_rq = self.send_request_if_needed.mock_calls[0][1][0]
self.assertEqual(
ceph_broker_rq.ops,
existing_request.ops,
[
{
'op': 'create-pool',
@ -278,9 +292,7 @@ class TestCephClientRequires(unittest.TestCase):
'class-read': ['rbd_children']},
'op': 'add-permissions-to-key'}])
@mock.patch.object(requires.base_requires.hookenv, 'related_units')
@mock.patch.object(requires.base_requires.hookenv, 'relation_get')
def test_get_remote_all(self, relation_get, related_units):
def test_get_remote_all(self):
unit_data = {
'rid:1': {
'app1/0': {
@ -294,16 +306,21 @@ class TestCephClientRequires(unittest.TestCase):
'key1': 'value1',
'key2': 'value3'}},
'rid:3': {}}
unit0_r1_mock = mock.MagicMock()
unit0_r1_mock.received = unit_data['rid:1']['app1/0']
unit1_r1_mock = mock.MagicMock()
unit1_r1_mock.received = unit_data['rid:1']['app1/1']
unit0_r2_mock = mock.MagicMock()
unit0_r2_mock.received = unit_data['rid:2']['app2/0']
rel1 = mock.MagicMock()
rel1.units = [unit0_r1_mock, unit1_r1_mock]
rel2 = mock.MagicMock()
rel2.units = [unit0_r2_mock]
rel3 = mock.MagicMock()
rel3.units = []
def get_unit_data(key, unit, relation_id):
return unit_data[relation_id].get(unit, {}).get(key, {})
conv1 = mock.MagicMock()
conv1.relation_ids = ['rid:1', 'rid:2']
conv2 = mock.MagicMock()
conv2.relation_ids = ['rid:3']
self.patch_kr('conversations', [conv1, conv2])
related_units.side_effect = lambda x: unit_data[x].keys()
relation_get.side_effect = get_unit_data
self.patch_kr('_relations')
self._relations.__iter__.return_value = [rel1, rel2, rel3]
# Check de-duplication:
self.assertEqual(
self.cr.get_remote_all('key1'),