Add pool creation, fix individual key naming

Adds a pool creation function that works with Endpoint based interfaces.
This commit is contained in:
Frode Nordahl 2019-02-25 00:28:10 +03:00
parent b2ab9c6bcf
commit b54d497d91
3 changed files with 143 additions and 38 deletions

View File

@ -13,6 +13,7 @@
# limitations under the License.
import ipaddress
import json
import socket
# the reactive framework unfortunately does not grok `import as` in conjunction
@ -20,46 +21,131 @@ import socket
# imports
from charms.reactive import (
Endpoint,
all_flags_set,
clear_flag,
set_flag,
when,
when_all,
when_not,
)
import charmhelpers.contrib.storage.linux.ceph as ch_ceph
class CephRBDMirrorRequires(Endpoint):
def __init__(self, endpoint_name, relation_ids=None, unique_id=None):
"""Initialize unique ID.
This is used when requesting a key from Ceph.
The constructor exists mainly for testing purposes.
"""
if unique_id:
self.unique_id = unique_id
else:
self.unique_id = socket.gethostname()
self.key_name = '{}_key'.format(self.unique_id)
super().__init__(endpoint_name, relation_ids=relation_ids)
@when('endpoint.{endpoint_name}.joined')
def joined(self):
set_flag(self.expand_name('{endpoint_name}.connected'))
@when_all('endpoint.{endpoint_name}.changed.auth',
'endpoint.{endpoint_name}.changed.key',
'endpoint.{endpoint_name}.changed.ceph-public-address')
@when('endpoint.{endpoint_name}.changed')
def changed(self):
for key in ('auth', 'key', 'ceph-public-address'):
clear_flag(
self.expand_name(
'endpoint.{endpoint_name}.changed.' + key))
set_flag(self.expand_name('{endpoint_name}.available'))
flags = (
self.expand_name(
'endpoint.{endpoint_name}.changed.auth'),
self.expand_name(
'endpoint.{endpoint_name}.changed.' + self.key_name),
self.expand_name(
'endpoint.{endpoint_name}.changed.ceph-public-address'),
)
if all_flags_set(*flags):
for flag in (flags):
clear_flag(flag)
set_flag(self.expand_name('{endpoint_name}.available'))
@when_not('endpoint.{endpoint_name}.joined')
def broken(self):
clear_flag(self.expand_name('{endpoint_name}.available'))
clear_flag(self.expand_name('{endpoint_name}.connected'))
def request_key(self, unique_id=None):
if not unique_id:
unique_id = socket.gethostname()
def request_key(self):
for relation in self.relations:
relation.to_publish['unique_id'] = unique_id
relation.to_publish['unique_id'] = self.unique_id
def get_current_request(self):
"""
Retrieve the current Ceph broker request.
If no request has been created yet then create a new one.
"""
json_rq = self.all_joined_units.received['broker_req']
current_request = ch_ceph.CephBrokerRq()
if json_rq:
try:
j = json.loads(json_rq)
current_request.set_ops(j['ops'])
except (KeyError, json.decoder.JSONDecodeError):
raise
return current_request
def create_pool(self, name, replicas=3, weight=None, pg_num=None,
group=None, namespace=None, app_name=None):
"""
Request pool setup
:param name: Name of pool to create
:type name: str
:param replicas: Number of replicas for supporting pools
:type replicas: int
:param weight: The percentage of data the pool makes up
:type weight: int
:param pg_num: If not provided, this value will be calculated by the
broker based on how many OSDs are in the cluster at the
time of creation. Note that, if provided, this value
will be capped at the current available maximum.
:type pg_num: int
:param group: Group to add pool to.
:type group: str
:param namespace: A group can optionally have a namespace defined that
will be used to further restrict pool access.
:type namespace: str
:param app_name: Name of application using the pool (e.g. ``cephfs``,
``rbd``, ``rgw``)
:type app_name: str
"""
current_request = self.get_current_request()
current_request.add_op_create_pool(
name="{}".format(name),
replica_count=replicas,
pg_num=pg_num,
weight=weight,
group=group,
namespace=namespace,
app_name=app_name)
ch_ceph.send_request_if_needed(current_request,
relation=self.endpoint_name)
@property
def auth(self):
return self.all_joined_units.received['auth']
@property
def key(self):
return self.all_joined_units.received[self.key_name]
@property
def mon_hosts(self):
for relation in self.relations:
for unit in relation.units:
addr = ipaddress.ip_address(
unit.received['ceph-public-address'])
try:
addr = ipaddress.ip_address(
unit.received.get('ceph-public-address', ''))
except ValueError:
continue
port = 6789
if isinstance(addr, ipaddress.IPv6Address):
yield '[{}]:{}'.format(addr, port)
@ -67,5 +153,13 @@ class CephRBDMirrorRequires(Endpoint):
yield '{}:{}'.format(addr, port)
@property
def key(self):
return self.all_units.received['key']
def public_network(self):
pass
@property
def cluster_network(self):
pass
@property
def pools(self):
return self.all_joined_units.received['pools']

View File

@ -20,3 +20,11 @@ sys.path.append('src/lib')
# Mock out charmhelpers so that we can test without it.
import charms_openstack.test_mocks # noqa
charms_openstack.test_mocks.mock_charmhelpers()
charmhelpers = charms_openstack.test_mocks.charmhelpers
sys.modules['charmhelpers.contrib.storage'] = (
charmhelpers.contrib.storage)
sys.modules['charmhelpers.contrib.storage.linux'] = (
charmhelpers.contrib.storage.linux)
sys.modules['charmhelpers.contrib.storage.linux.ceph'] = (
charmhelpers.contrib.storage.linux.ceph)

View File

@ -29,19 +29,14 @@ class TestRegisteredHooks(test_utils.TestRegisteredHooks):
'when': {
'joined': (
'endpoint.{endpoint_name}.joined',),
},
'when_all': {
'changed': (
'endpoint.{endpoint_name}.changed.auth',
'endpoint.{endpoint_name}.changed.key',
'endpoint.{endpoint_name}.changed.ceph-public-address',),
'endpoint.{endpoint_name}.changed',),
},
'when_not': {
'broken': ('endpoint.{endpoint_name}.joined',),
},
}
# test that the hooks were registered via the
# reactive.barbican_handlers
# test that the hooks were registered
self.registered_hooks_test_helper(requires, hook_set, defaults)
@ -50,7 +45,7 @@ class TestCephRBDMirrorRequires(test_utils.PatchHelper):
def setUp(self):
super().setUp()
self.requires_class = requires.CephRBDMirrorRequires(
'some-relation', [])
'some-endpoint', [], unique_id='some-hostname')
self._patches = {}
self._patches_start = {}
@ -81,44 +76,52 @@ class TestCephRBDMirrorRequires(test_utils.PatchHelper):
def test_joined(self):
self.patch_object(requires, 'set_flag')
self.requires_class.joined()
self.set_flag.assert_called_once_with('some-relation.connected')
self.set_flag.assert_called_once_with('some-endpoint.connected')
def test_changed(self):
self.patch_object(requires, 'all_flags_set')
self.patch_object(requires, 'clear_flag')
self.patch_object(requires, 'set_flag')
self.all_flags_set.return_value = True
self.requires_class.changed()
self.all_flags_set.assert_called_with(
'endpoint.some-endpoint.changed.auth',
'endpoint.some-endpoint.changed.some-hostname_key',
'endpoint.some-endpoint.changed.ceph-public-address',
)
self.clear_flag.assert_has_calls([
mock.call('endpoint.some-relation.changed.auth'),
mock.call('endpoint.some-relation.changed.key'),
mock.call('endpoint.some-relation.changed.ceph-public-address'),
mock.call('endpoint.some-endpoint.changed.auth'),
mock.call('endpoint.some-endpoint.changed.some-hostname_key'),
mock.call('endpoint.some-endpoint.changed.ceph-public-address'),
])
self.set_flag.assert_called_once_with('some-relation.available')
self.set_flag.assert_called_once_with('some-endpoint.available')
def test_broken(self):
self.patch_object(requires, 'clear_flag')
self.requires_class.broken()
self.clear_flag.assert_has_calls([
mock.call('some-relation.available'),
mock.call('some-relation.connected'),
mock.call('some-endpoint.available'),
mock.call('some-endpoint.connected'),
])
def test_request_key(self):
self.patch_object(requires, 'socket')
self.socket.gethostname.return_value = 'somehostname'
to_publish = self.patch_topublish()
self.requires_class.request_key()
to_publish.__setitem__.assert_called_with('unique_id', 'somehostname')
self.requires_class.request_key(unique_id='unicorn')
to_publish.__setitem__.assert_called_with('unique_id', 'unicorn')
to_publish.__setitem__.assert_called_with('unique_id', 'some-hostname')
def test_mon_hosts(self):
self.patch_requires_class('_relations')
relation = mock.MagicMock()
unit_incomplete = mock.MagicMock()
unit_incomplete.received = {}
unit_invalid = mock.MagicMock()
unit_invalid.received = {'ceph-public-address': None}
unitv6 = mock.MagicMock()
unitv6.received = {'ceph-public-address': '2001:db8:42::1'}
unitv4 = mock.MagicMock()
unitv4.received = {'ceph-public-address': '192.0.2.1'}
relation.units.__iter__.return_value = [unitv6, unitv4]
relation.units.__iter__.return_value = [unit_incomplete, unitv6,
unit_invalid, unitv4]
self._relations.__iter__.return_value = [relation]
self.assertEqual(list(self.requires_class.mon_hosts),
['[2001:db8:42::1]:6789', '192.0.2.1:6789'])