Add pool creation, fix individual key naming
Adds a pool creation function that works with Endpoint based interfaces.
This commit is contained in:
parent
b2ab9c6bcf
commit
b54d497d91
128
requires.py
128
requires.py
|
@ -13,6 +13,7 @@
|
|||
# limitations under the License.
|
||||
|
||||
import ipaddress
|
||||
import json
|
||||
import socket
|
||||
|
||||
# the reactive framework unfortunately does not grok `import as` in conjunction
|
||||
|
@ -20,46 +21,131 @@ import socket
|
|||
# imports
|
||||
from charms.reactive import (
|
||||
Endpoint,
|
||||
all_flags_set,
|
||||
clear_flag,
|
||||
set_flag,
|
||||
when,
|
||||
when_all,
|
||||
when_not,
|
||||
)
|
||||
|
||||
import charmhelpers.contrib.storage.linux.ceph as ch_ceph
|
||||
|
||||
|
||||
class CephRBDMirrorRequires(Endpoint):
|
||||
|
||||
def __init__(self, endpoint_name, relation_ids=None, unique_id=None):
|
||||
"""Initialize unique ID.
|
||||
|
||||
This is used when requesting a key from Ceph.
|
||||
|
||||
The constructor exists mainly for testing purposes.
|
||||
"""
|
||||
if unique_id:
|
||||
self.unique_id = unique_id
|
||||
else:
|
||||
self.unique_id = socket.gethostname()
|
||||
self.key_name = '{}_key'.format(self.unique_id)
|
||||
super().__init__(endpoint_name, relation_ids=relation_ids)
|
||||
|
||||
@when('endpoint.{endpoint_name}.joined')
|
||||
def joined(self):
|
||||
set_flag(self.expand_name('{endpoint_name}.connected'))
|
||||
|
||||
@when_all('endpoint.{endpoint_name}.changed.auth',
|
||||
'endpoint.{endpoint_name}.changed.key',
|
||||
'endpoint.{endpoint_name}.changed.ceph-public-address')
|
||||
@when('endpoint.{endpoint_name}.changed')
|
||||
def changed(self):
|
||||
for key in ('auth', 'key', 'ceph-public-address'):
|
||||
clear_flag(
|
||||
self.expand_name(
|
||||
'endpoint.{endpoint_name}.changed.' + key))
|
||||
set_flag(self.expand_name('{endpoint_name}.available'))
|
||||
flags = (
|
||||
self.expand_name(
|
||||
'endpoint.{endpoint_name}.changed.auth'),
|
||||
self.expand_name(
|
||||
'endpoint.{endpoint_name}.changed.' + self.key_name),
|
||||
self.expand_name(
|
||||
'endpoint.{endpoint_name}.changed.ceph-public-address'),
|
||||
)
|
||||
if all_flags_set(*flags):
|
||||
for flag in (flags):
|
||||
clear_flag(flag)
|
||||
set_flag(self.expand_name('{endpoint_name}.available'))
|
||||
|
||||
@when_not('endpoint.{endpoint_name}.joined')
|
||||
def broken(self):
|
||||
clear_flag(self.expand_name('{endpoint_name}.available'))
|
||||
clear_flag(self.expand_name('{endpoint_name}.connected'))
|
||||
|
||||
def request_key(self, unique_id=None):
|
||||
if not unique_id:
|
||||
unique_id = socket.gethostname()
|
||||
def request_key(self):
|
||||
for relation in self.relations:
|
||||
relation.to_publish['unique_id'] = unique_id
|
||||
relation.to_publish['unique_id'] = self.unique_id
|
||||
|
||||
def get_current_request(self):
|
||||
"""
|
||||
Retrieve the current Ceph broker request.
|
||||
|
||||
If no request has been created yet then create a new one.
|
||||
"""
|
||||
json_rq = self.all_joined_units.received['broker_req']
|
||||
current_request = ch_ceph.CephBrokerRq()
|
||||
if json_rq:
|
||||
try:
|
||||
j = json.loads(json_rq)
|
||||
current_request.set_ops(j['ops'])
|
||||
except (KeyError, json.decoder.JSONDecodeError):
|
||||
raise
|
||||
return current_request
|
||||
|
||||
def create_pool(self, name, replicas=3, weight=None, pg_num=None,
|
||||
group=None, namespace=None, app_name=None):
|
||||
"""
|
||||
Request pool setup
|
||||
|
||||
:param name: Name of pool to create
|
||||
:type name: str
|
||||
:param replicas: Number of replicas for supporting pools
|
||||
:type replicas: int
|
||||
:param weight: The percentage of data the pool makes up
|
||||
:type weight: int
|
||||
:param pg_num: If not provided, this value will be calculated by the
|
||||
broker based on how many OSDs are in the cluster at the
|
||||
time of creation. Note that, if provided, this value
|
||||
will be capped at the current available maximum.
|
||||
:type pg_num: int
|
||||
:param group: Group to add pool to.
|
||||
:type group: str
|
||||
:param namespace: A group can optionally have a namespace defined that
|
||||
will be used to further restrict pool access.
|
||||
:type namespace: str
|
||||
:param app_name: Name of application using the pool (e.g. ``cephfs``,
|
||||
``rbd``, ``rgw``)
|
||||
:type app_name: str
|
||||
"""
|
||||
|
||||
current_request = self.get_current_request()
|
||||
current_request.add_op_create_pool(
|
||||
name="{}".format(name),
|
||||
replica_count=replicas,
|
||||
pg_num=pg_num,
|
||||
weight=weight,
|
||||
group=group,
|
||||
namespace=namespace,
|
||||
app_name=app_name)
|
||||
ch_ceph.send_request_if_needed(current_request,
|
||||
relation=self.endpoint_name)
|
||||
|
||||
@property
|
||||
def auth(self):
|
||||
return self.all_joined_units.received['auth']
|
||||
|
||||
@property
|
||||
def key(self):
|
||||
return self.all_joined_units.received[self.key_name]
|
||||
|
||||
@property
|
||||
def mon_hosts(self):
|
||||
for relation in self.relations:
|
||||
for unit in relation.units:
|
||||
addr = ipaddress.ip_address(
|
||||
unit.received['ceph-public-address'])
|
||||
try:
|
||||
addr = ipaddress.ip_address(
|
||||
unit.received.get('ceph-public-address', ''))
|
||||
except ValueError:
|
||||
continue
|
||||
port = 6789
|
||||
if isinstance(addr, ipaddress.IPv6Address):
|
||||
yield '[{}]:{}'.format(addr, port)
|
||||
|
@ -67,5 +153,13 @@ class CephRBDMirrorRequires(Endpoint):
|
|||
yield '{}:{}'.format(addr, port)
|
||||
|
||||
@property
|
||||
def key(self):
|
||||
return self.all_units.received['key']
|
||||
def public_network(self):
|
||||
pass
|
||||
|
||||
@property
|
||||
def cluster_network(self):
|
||||
pass
|
||||
|
||||
@property
|
||||
def pools(self):
|
||||
return self.all_joined_units.received['pools']
|
||||
|
|
|
@ -20,3 +20,11 @@ sys.path.append('src/lib')
|
|||
# Mock out charmhelpers so that we can test without it.
|
||||
import charms_openstack.test_mocks # noqa
|
||||
charms_openstack.test_mocks.mock_charmhelpers()
|
||||
|
||||
charmhelpers = charms_openstack.test_mocks.charmhelpers
|
||||
sys.modules['charmhelpers.contrib.storage'] = (
|
||||
charmhelpers.contrib.storage)
|
||||
sys.modules['charmhelpers.contrib.storage.linux'] = (
|
||||
charmhelpers.contrib.storage.linux)
|
||||
sys.modules['charmhelpers.contrib.storage.linux.ceph'] = (
|
||||
charmhelpers.contrib.storage.linux.ceph)
|
||||
|
|
|
@ -29,19 +29,14 @@ class TestRegisteredHooks(test_utils.TestRegisteredHooks):
|
|||
'when': {
|
||||
'joined': (
|
||||
'endpoint.{endpoint_name}.joined',),
|
||||
},
|
||||
'when_all': {
|
||||
'changed': (
|
||||
'endpoint.{endpoint_name}.changed.auth',
|
||||
'endpoint.{endpoint_name}.changed.key',
|
||||
'endpoint.{endpoint_name}.changed.ceph-public-address',),
|
||||
'endpoint.{endpoint_name}.changed',),
|
||||
},
|
||||
'when_not': {
|
||||
'broken': ('endpoint.{endpoint_name}.joined',),
|
||||
},
|
||||
}
|
||||
# test that the hooks were registered via the
|
||||
# reactive.barbican_handlers
|
||||
# test that the hooks were registered
|
||||
self.registered_hooks_test_helper(requires, hook_set, defaults)
|
||||
|
||||
|
||||
|
@ -50,7 +45,7 @@ class TestCephRBDMirrorRequires(test_utils.PatchHelper):
|
|||
def setUp(self):
|
||||
super().setUp()
|
||||
self.requires_class = requires.CephRBDMirrorRequires(
|
||||
'some-relation', [])
|
||||
'some-endpoint', [], unique_id='some-hostname')
|
||||
self._patches = {}
|
||||
self._patches_start = {}
|
||||
|
||||
|
@ -81,44 +76,52 @@ class TestCephRBDMirrorRequires(test_utils.PatchHelper):
|
|||
def test_joined(self):
|
||||
self.patch_object(requires, 'set_flag')
|
||||
self.requires_class.joined()
|
||||
self.set_flag.assert_called_once_with('some-relation.connected')
|
||||
self.set_flag.assert_called_once_with('some-endpoint.connected')
|
||||
|
||||
def test_changed(self):
|
||||
self.patch_object(requires, 'all_flags_set')
|
||||
self.patch_object(requires, 'clear_flag')
|
||||
self.patch_object(requires, 'set_flag')
|
||||
self.all_flags_set.return_value = True
|
||||
self.requires_class.changed()
|
||||
self.all_flags_set.assert_called_with(
|
||||
'endpoint.some-endpoint.changed.auth',
|
||||
'endpoint.some-endpoint.changed.some-hostname_key',
|
||||
'endpoint.some-endpoint.changed.ceph-public-address',
|
||||
)
|
||||
self.clear_flag.assert_has_calls([
|
||||
mock.call('endpoint.some-relation.changed.auth'),
|
||||
mock.call('endpoint.some-relation.changed.key'),
|
||||
mock.call('endpoint.some-relation.changed.ceph-public-address'),
|
||||
mock.call('endpoint.some-endpoint.changed.auth'),
|
||||
mock.call('endpoint.some-endpoint.changed.some-hostname_key'),
|
||||
mock.call('endpoint.some-endpoint.changed.ceph-public-address'),
|
||||
])
|
||||
self.set_flag.assert_called_once_with('some-relation.available')
|
||||
self.set_flag.assert_called_once_with('some-endpoint.available')
|
||||
|
||||
def test_broken(self):
|
||||
self.patch_object(requires, 'clear_flag')
|
||||
self.requires_class.broken()
|
||||
self.clear_flag.assert_has_calls([
|
||||
mock.call('some-relation.available'),
|
||||
mock.call('some-relation.connected'),
|
||||
mock.call('some-endpoint.available'),
|
||||
mock.call('some-endpoint.connected'),
|
||||
])
|
||||
|
||||
def test_request_key(self):
|
||||
self.patch_object(requires, 'socket')
|
||||
self.socket.gethostname.return_value = 'somehostname'
|
||||
to_publish = self.patch_topublish()
|
||||
self.requires_class.request_key()
|
||||
to_publish.__setitem__.assert_called_with('unique_id', 'somehostname')
|
||||
self.requires_class.request_key(unique_id='unicorn')
|
||||
to_publish.__setitem__.assert_called_with('unique_id', 'unicorn')
|
||||
to_publish.__setitem__.assert_called_with('unique_id', 'some-hostname')
|
||||
|
||||
def test_mon_hosts(self):
|
||||
self.patch_requires_class('_relations')
|
||||
relation = mock.MagicMock()
|
||||
unit_incomplete = mock.MagicMock()
|
||||
unit_incomplete.received = {}
|
||||
unit_invalid = mock.MagicMock()
|
||||
unit_invalid.received = {'ceph-public-address': None}
|
||||
unitv6 = mock.MagicMock()
|
||||
unitv6.received = {'ceph-public-address': '2001:db8:42::1'}
|
||||
unitv4 = mock.MagicMock()
|
||||
unitv4.received = {'ceph-public-address': '192.0.2.1'}
|
||||
relation.units.__iter__.return_value = [unitv6, unitv4]
|
||||
relation.units.__iter__.return_value = [unit_incomplete, unitv6,
|
||||
unit_invalid, unitv4]
|
||||
self._relations.__iter__.return_value = [relation]
|
||||
self.assertEqual(list(self.requires_class.mon_hosts),
|
||||
['[2001:db8:42::1]:6789', '192.0.2.1:6789'])
|
||||
|
|
Loading…
Reference in New Issue