Add group policy configuration

Allow configuration of a zonegroup default sync policy. This is useful
in scenarios where we want selective bucket sync, and it is especially
valuable with the new `cloud-sync` relation.

This is based on Ceph multisite sync policy:
https://docs.ceph.com/en/latest/radosgw/multisite-sync-policy/

Additionally, three more Juju actions are added to selectively enable,
disable, or reset bucket sync:
* `enable-buckets-sync`
* `disable-buckets-sync`
* `reset-buckets-sync`

These new actions are meant to be used in conjunction with a default
zonegroup sync policy that permits syncing but leaves it disabled by
default (the 'allowed' state).
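
For example, with `sync-policy-state=allowed` configured on the primary, the
zonegroup-wide policy the charm sets up amounts to roughly the following (a
simplified sketch in terms of the new hooks/multisite.py helpers; the zone
names are illustrative):

    import multisite  # hooks/multisite.py, added in this change

    # Zonegroup-wide 'default' policy: sync permitted, but not enabled.
    multisite.create_sync_group(group_id='default',
                                status=multisite.SYNC_POLICY_ALLOWED)
    multisite.create_sync_group_flow(group_id='default',
                                     flow_id='zone_a-zone_b',
                                     flow_type=multisite.SYNC_FLOW_SYMMETRICAL,
                                     source_zone='zone_a',
                                     dest_zone='zone_b')
    multisite.create_sync_group_pipe(group_id='default',
                                     pipe_id='zone_a-zone_b',
                                     source_zones=['zone_a', 'zone_b'],
                                     dest_zones=['zone_a', 'zone_b'])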

Change-Id: I4a8076192269aaeaca50668ebcebc0a52c6d2c84
func-test-pr: https://github.com/openstack-charmers/zaza-openstack-tests/pull/1193
Signed-off-by: Ionut Balutoiu <ibalutoiu@cloudbasesolutions.com>
Ionut Balutoiu 2023-10-19 13:16:04 +03:00
parent 940be7fdfc
commit dda2b0b83f
19 changed files with 1198 additions and 3 deletions

View File

@ -243,6 +243,9 @@ not deployed then see file `actions.yaml`.
* `readwrite`
* `resume`
* `tidydefaults`
* `enable-buckets-sync`
* `disable-buckets-sync`
* `reset-buckets-sync`
# Documentation

View File

@ -19,3 +19,29 @@ force-enable-multisite:
zonegroup:
type: string
description: Existing Zonegroup to be reconfigured as the 'zonegroup' config value.
enable-buckets-sync:
description: |
Enable syncing for the given buckets in multi-site replication. This is
meant to be used only when the default zonegroup sync policy is set to
'allowed' (sync permitted, but not enabled).
params:
buckets:
type: string
description: Comma-separated list of bucket names to enable syncing for.
disable-buckets-sync:
description: |
Forbid syncing for the given buckets in multi-site replication. This is
useful when you want to disable syncing for some buckets while keeping
all the other buckets in sync.
params:
buckets:
type: string
description: Comma-separated list of bucket names to disable syncing for.
reset-buckets-sync:
description: |
Reset the sync policy for the given buckets. After this is executed, the
buckets are synced according to the default zonegroup sync policy.
params:
buckets:
type: string
description: Comma-separated list of bucket names whose sync policy is reset.
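
As a sketch of what the three actions above do for every existing bucket named
in the `buckets` parameter (expressed with the helpers added in
hooks/multisite.py; the bucket name is illustrative):

    import multisite  # hooks/multisite.py

    bucket = 'mybucket'
    # enable-buckets-sync / disable-buckets-sync: create a bucket-level
    # 'default' sync group with a wildcard pipe, in the 'enabled' or
    # 'forbidden' state respectively.
    multisite.create_sync_group(bucket=bucket, group_id='default',
                                status=multisite.SYNC_POLICY_ENABLED)
    multisite.create_sync_group_pipe(bucket=bucket, group_id='default',
                                     pipe_id='default',
                                     source_zones=['*'], dest_zones=['*'])
    # reset-buckets-sync: remove the bucket-level group, so the bucket falls
    # back to the zonegroup default sync policy.
    multisite.remove_sync_group(bucket=bucket, group_id='default')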

View File

@ -49,6 +49,8 @@ from charmhelpers.core.host import (
service_restart,
)
DEFAULT_SYNC_POLICY_ID = 'default'
def pause(args):
"""Pause the Ceilometer services.
@ -227,6 +229,137 @@ def force_enable_multisite(args):
action_fail(message + " : {}".format(cpe.output))
def is_multisite_sync_policy_action_allowed():
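"""Check whether the buckets sync policy actions may run on this unit.
The actions are only allowed on the leader unit of the primary RGW
application, with the 'realm', 'zonegroup' and 'zone' options set and
multisite already configured. On failure, action_fail is called with an
appropriate message.
:return: Whether the sync policy actions may run on this unit.
:rtype: Boolean
"""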
if not is_leader():
action_fail("This action can only be executed on leader unit.")
return False
realm = config('realm')
zone = config('zone')
zonegroup = config('zonegroup')
if not all((realm, zonegroup, zone)):
action_fail("Missing required charm configurations realm({}), "
"zonegroup({}) and zone({}).".format(
realm, zonegroup, zone
))
return False
if not multisite.is_multisite_configured(zone=zone, zonegroup=zonegroup):
action_fail("Multisite is not configured")
return False
zonegroup_info = multisite.get_zonegroup_info(zonegroup)
if zonegroup_info is None:
action_fail("Failed to fetch zonegroup ({}) info".format(zonegroup))
return False
zone_info = multisite.get_zone_info(zone)
if zone_info is None:
action_fail("Failed to fetch zone ({}) info".format(zone))
return False
if zonegroup_info['master_zone'] != zone_info['id']:
action_fail('This action can only be executed on primary RGW '
'application units.')
return False
return True
def update_buckets_sync_policy(buckets, sync_policy_state):
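"""Update the sync policy state for all the given buckets.
Buckets that do not exist in the local zone are skipped. For each existing
bucket, a bucket-level 'default' sync group with a wildcard pipe is created
using the given sync policy state.
:param buckets: List of bucket names.
:type buckets: list
:param sync_policy_state: The sync policy state to set for the buckets.
:type sync_policy_state: str
"""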
zone = config('zone')
zonegroup = config('zonegroup')
existing_buckets = multisite.list_buckets(zonegroup=zonegroup, zone=zone)
messages = []
for bucket in buckets:
if bucket in existing_buckets:
multisite.create_sync_group(
bucket=bucket,
group_id=DEFAULT_SYNC_POLICY_ID,
status=sync_policy_state)
multisite.create_sync_group_pipe(
bucket=bucket,
group_id=DEFAULT_SYNC_POLICY_ID,
pipe_id=DEFAULT_SYNC_POLICY_ID,
source_zones=['*'],
dest_zones=['*'])
message = 'Updated "{}" bucket sync policy to "{}"'.format(
bucket, sync_policy_state)
else:
message = ('Bucket "{}" does not exist in the zonegroup "{}" and '
'zone "{}"'.format(bucket, zonegroup, zone))
log(message)
messages.append(message)
action_set(
values={
'message': '\n'.join(messages)
}
)
def reset_buckets_sync_policy(buckets):
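"""Reset the sync policy for all the given buckets.
Buckets that do not exist in the local zone are skipped. For each existing
bucket, the bucket-level 'default' sync group is removed, so the bucket
falls back to the zonegroup default sync policy.
:param buckets: List of bucket names.
:type buckets: list
"""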
zone = config('zone')
zonegroup = config('zonegroup')
existing_buckets = multisite.list_buckets(zonegroup=zonegroup, zone=zone)
messages = []
for bucket in buckets:
if bucket in existing_buckets:
multisite.remove_sync_group(
bucket=bucket,
group_id=DEFAULT_SYNC_POLICY_ID)
message = 'Reset "{}" bucket sync policy'.format(bucket)
else:
message = ('Bucket "{}" does not exist in the zonegroup "{}" and '
'zone "{}"'.format(bucket, zonegroup, zone))
log(message)
messages.append(message)
action_set(
values={
'message': '\n'.join(messages)
}
)
def enable_buckets_sync(args):
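"""Enable sync for the buckets given in the 'buckets' action parameter."""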
if not is_multisite_sync_policy_action_allowed():
return
try:
update_buckets_sync_policy(
buckets=action_get('buckets').split(','),
sync_policy_state=multisite.SYNC_POLICY_ENABLED,
)
except subprocess.CalledProcessError as cpe:
message = "Failed to enable sync for the given buckets"
log(message, level=ERROR)
action_fail(message + " : {}".format(cpe.output))
def disable_buckets_sync(args):
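"""Forbid sync for the buckets given in the 'buckets' action parameter."""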
if not is_multisite_sync_policy_action_allowed():
return
try:
update_buckets_sync_policy(
buckets=action_get('buckets').split(','),
sync_policy_state=multisite.SYNC_POLICY_FORBIDDEN,
)
except subprocess.CalledProcessError as cpe:
message = "Failed to disable sync for the given buckets"
log(message, level=ERROR)
action_fail(message + " : {}".format(cpe.output))
def reset_buckets_sync(args):
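"""Reset the sync policy of the buckets given in the 'buckets' action
parameter.
"""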
if not is_multisite_sync_policy_action_allowed():
return
try:
reset_buckets_sync_policy(buckets=action_get('buckets').split(','))
except subprocess.CalledProcessError as cpe:
message = "Failed to reset sync for the given buckets"
log(message, level=ERROR)
action_fail(message + " : {}".format(cpe.output))
# A dictionary of all the defined actions to callables (which take
# parsed arguments).
ACTIONS = {
@ -237,6 +370,9 @@ ACTIONS = {
"readwrite": readwrite,
"tidydefaults": tidydefaults,
"force-enable-multisite": force_enable_multisite,
"enable-buckets-sync": enable_buckets_sync,
"disable-buckets-sync": disable_buckets_sync,
"reset-buckets-sync": reset_buckets_sync,
}

actions/disable-buckets-sync Symbolic link
View File

@ -0,0 +1 @@
actions.py

actions/enable-buckets-sync Symbolic link
View File

@ -0,0 +1 @@
actions.py

actions/reset-buckets-sync Symbolic link
View File

@ -0,0 +1 @@
actions.py

View File

@ -429,6 +429,34 @@ options:
description: |
Name of RADOS Gateway Zone to create for multi-site replication. This
option must be specific to the local site e.g. us-west or us-east.
sync-policy-state:
type: string
default:
description: |
This setting is used by the primary ceph-radosgw in multi-site
replication.
By default, all the buckets are synced from the primary RGW zone to the
secondary zone. This option enables selective bucket sync: if set, it is
used as the default sync policy state for all the buckets in the
zonegroup.
Valid values are:
* enabled - sync is allowed and enabled.
* allowed - sync is permitted, but not enabled by default; individual
buckets can opt in (e.g. via the 'enable-buckets-sync' action).
* forbidden - sync is not allowed.
sync-policy-flow-type:
type: string
default: symmetrical
description: |
This setting is used by the secondary ceph-radosgw in multi-site
replication, and it's effective only when 'sync-policy-state' config is
set on the primary ceph-radosgw.
Valid values are:
* directional - data is only synced in one direction, from primary to
secondary.
* symmetrical - data is synced in both directions.
namespace-tenants:
type: boolean
default: False
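
For the 'directional' value of 'sync-policy-flow-type' above (which is also
forced whenever the secondary zone uses the 'cloud' tier, regardless of this
option), the data flow and pipe only go from the primary zone to the
secondary zone. A sketch using the hooks/multisite.py helpers, with
illustrative zone names:

    import multisite  # hooks/multisite.py

    multisite.create_sync_group_flow(group_id='default',
                                     flow_id='zone_a-zone_b',
                                     flow_type=multisite.SYNC_FLOW_DIRECTIONAL,
                                     source_zone='zone_a',
                                     dest_zone='zone_b')
    multisite.create_sync_group_pipe(group_id='default',
                                     pipe_id='zone_a-zone_b',
                                     source_zones=['zone_a'],
                                     dest_zones=['zone_b'])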

View File

@ -43,6 +43,7 @@ from charmhelpers.core.hookenv import (
relation_set,
log,
DEBUG,
WARNING,
Hooks, UnregisteredHookError,
status_set,
is_leader,
@ -134,6 +135,7 @@ APACHE_PACKAGES = [
]
MULTISITE_SYSTEM_USER = 'multisite-sync'
MULTISITE_DEFAULT_SYNC_GROUP_ID = 'default'
def upgrade_available():
@ -845,6 +847,123 @@ def primary_relation_joined(relation_id=None):
secret=secret)
@hooks.hook('primary-relation-changed')
def primary_relation_changed(relation_id=None, unit=None):
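"""Configure the zonegroup default sync policy on the primary RGW.
This only runs on the leader unit, and only when the 'sync-policy-state'
option is set. Using the zone and sync policy flow type published by the
secondary, it creates (or updates) the zonegroup-level 'default' sync
group, data flow and pipe. A 'cloud' tier secondary zone always forces a
directional flow. If anything changed, the period is updated, configs are
rewritten and the RGW service is restarted.
"""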
if not is_leader():
log('Cannot setup multisite configuration, this unit is not the '
'leader')
return
if not ready_for_service(legacy=False):
log('unit not ready, deferring multisite configuration')
return
sync_policy_state = config('sync-policy-state')
if not sync_policy_state:
log("The config sync-policy-state is not set. Skipping zone group "
"default sync policy configuration")
return
secondary_data = relation_get(rid=relation_id, unit=unit)
if not all((secondary_data.get('zone'),
secondary_data.get('sync_policy_flow_type'))):
log("Defer processing until secondary RGW has provided required data")
return
zonegroup = config('zonegroup')
primary_zone = config('zone')
secondary_zone = secondary_data['zone']
sync_flow_type = secondary_data['sync_policy_flow_type']
if (secondary_data.get('zone_tier_type') == 'cloud' and
sync_flow_type != multisite.SYNC_FLOW_DIRECTIONAL):
log("The secondary zone is set with cloud tier type. Ignoring "
"configured {} sync policy flow, and using {}.".format(
sync_flow_type,
multisite.SYNC_FLOW_DIRECTIONAL),
level=WARNING)
sync_flow_type = multisite.SYNC_FLOW_DIRECTIONAL
mutation = False
flow_id = '{}-{}'.format(primary_zone, secondary_zone)
pipe_id = '{}-{}'.format(primary_zone, secondary_zone)
if multisite.sync_group_exists(MULTISITE_DEFAULT_SYNC_GROUP_ID):
# check if sync group changed
group = multisite.get_sync_group(MULTISITE_DEFAULT_SYNC_GROUP_ID)
if group.get('status') != sync_policy_state:
log('Default sync policy state changed to {}'.format(
sync_policy_state))
mutation = True
# check if data flow needs to be created or updated
symmetrical_flows = [
flow['id']
for flow in group['data_flow'].get('symmetrical', [])
]
directional_flows = [
"{}-{}".format(flow['source_zone'], flow['dest_zone'])
for flow in group['data_flow'].get('directional', [])
]
data_flows = symmetrical_flows + directional_flows
if flow_id not in data_flows:
log('Data flow {} not found, creating now'.format(flow_id))
else:
if (sync_flow_type == multisite.SYNC_FLOW_SYMMETRICAL and
flow_id in directional_flows):
multisite.remove_sync_group_flow(
group_id=MULTISITE_DEFAULT_SYNC_GROUP_ID, flow_id=flow_id,
flow_type=multisite.SYNC_FLOW_DIRECTIONAL,
source_zone=primary_zone, dest_zone=secondary_zone)
mutation = True
if (sync_flow_type == multisite.SYNC_FLOW_DIRECTIONAL and
flow_id in symmetrical_flows):
multisite.remove_sync_group_flow(
group_id=MULTISITE_DEFAULT_SYNC_GROUP_ID, flow_id=flow_id,
flow_type=multisite.SYNC_FLOW_SYMMETRICAL,
source_zone=primary_zone, dest_zone=secondary_zone)
mutation = True
if mutation:
log('Data flow {} type changed to {}'.format(flow_id,
sync_flow_type))
# check if pipe needs to be created
pipes = [pipe['id'] for pipe in group['pipes']]
if pipe_id not in pipes:
log('Pipe {} not found, creating now'.format(pipe_id))
mutation = True
else:
log('Default sync policy not configured, configuring it now')
mutation = True
if mutation:
multisite.create_sync_group(
group_id=MULTISITE_DEFAULT_SYNC_GROUP_ID,
status=sync_policy_state)
multisite.create_sync_group_flow(
group_id=MULTISITE_DEFAULT_SYNC_GROUP_ID,
flow_id=flow_id,
flow_type=sync_flow_type,
source_zone=primary_zone,
dest_zone=secondary_zone)
source_zones = [primary_zone, secondary_zone]
dest_zones = [primary_zone, secondary_zone]
if sync_flow_type == multisite.SYNC_FLOW_DIRECTIONAL:
source_zones = [primary_zone]
dest_zones = [secondary_zone]
multisite.create_sync_group_pipe(
group_id=MULTISITE_DEFAULT_SYNC_GROUP_ID,
pipe_id=pipe_id,
source_zones=source_zones,
dest_zones=dest_zones)
log(
'Mutation detected. Restarting {}.'.format(service_name()),
'INFO')
multisite.update_period(zonegroup=zonegroup, zone=primary_zone)
CONFIGS.write_all()
service_restart(service_name())
leader_set(restart_nonce=str(uuid.uuid4()))
else:
log('No mutation detected.', 'INFO')
@hooks.hook('primary-relation-departed')
@hooks.hook('secondary-relation-departed')
def multisite_relation_departed():
@ -979,6 +1098,10 @@ def secondary_relation_changed(relation_id=None, unit=None):
else:
log('No mutation detected.', 'INFO')
relation_set(relation_id=relation_id,
zone=zone,
sync_policy_flow_type=config('sync-policy-flow-type'))
@hooks.hook('master-relation-departed')
@hooks.hook('slave-relation-departed')
@ -1016,6 +1139,8 @@ def leader_settings_changed():
# Primary/Secondary relation
for r_id in relation_ids('primary'):
primary_relation_joined(r_id)
for unit in related_units(r_id):
primary_relation_changed(r_id, unit)
for r_id in relation_ids('radosgw-user'):
radosgw_user_changed(r_id)
@ -1031,6 +1156,8 @@ def process_multisite_relations():
# Primary/Secondary relation
for r_id in relation_ids('primary'):
primary_relation_joined(r_id)
for unit in related_units(r_id):
primary_relation_changed(r_id, unit)
for r_id in relation_ids('secondary'):
for unit in related_units(r_id):
secondary_relation_changed(r_id, unit)

View File

@ -24,6 +24,31 @@ import charmhelpers.core.decorators as decorators
RGW_ADMIN = 'radosgw-admin'
SYNC_POLICY_ENABLED = 'enabled'
SYNC_POLICY_ALLOWED = 'allowed'
SYNC_POLICY_FORBIDDEN = 'forbidden'
SYNC_POLICY_STATES = [
SYNC_POLICY_ENABLED,
SYNC_POLICY_ALLOWED,
SYNC_POLICY_FORBIDDEN
]
SYNC_FLOW_DIRECTIONAL = 'directional'
SYNC_FLOW_SYMMETRICAL = 'symmetrical'
SYNC_FLOW_TYPES = [
SYNC_FLOW_DIRECTIONAL,
SYNC_FLOW_SYMMETRICAL,
]
class UnknownSyncPolicyState(Exception):
"""Raised when an unknown sync policy state is encountered"""
pass
class UnknownSyncFlowType(Exception):
"""Raised when an unknown sync flow type is encountered"""
pass
@decorators.retry_on_exception(num_retries=10, base_delay=5,
exc_type=subprocess.CalledProcessError)
@ -370,6 +395,28 @@ def modify_zone(name, endpoints=None, default=False, master=False,
return None
def get_zone_info(name, zonegroup=None):
"""Fetch detailed info for the provided zone
:param name: zone name
:type name: str
:param zonegroup: parent zonegroup name
:type zonegroup: str
:rtype: dict
"""
cmd = [
RGW_ADMIN, '--id={}'.format(_key_name()),
'zone', 'get',
'--rgw-zone={}'.format(name),
]
if zonegroup:
cmd.append('--rgw-zonegroup={}'.format(zonegroup))
try:
return json.loads(_check_output(cmd))
except TypeError:
return None
def remove_zone_from_zonegroup(zone, zonegroup):
"""Remove RADOS Gateway zone from provided parent zonegroup
@ -888,3 +935,245 @@ def check_cluster_has_buckets():
if check_zonegroup_has_buckets(zonegroup):
return True
return False
def list_sync_groups(bucket=None):
"""List sync policy groups.
:param bucket: Bucket name. If this is given, the bucket level group
policies are listed.
:type bucket: str
:return: List of sync policy groups.
:rtype: list
"""
cmd = [
RGW_ADMIN, '--id={}'.format(_key_name()),
'sync', 'group', 'get',
]
if bucket:
cmd.append('--bucket={}'.format(bucket))
try:
return json.loads(_check_output(cmd))
except TypeError:
return []
def sync_group_exists(group_id, bucket=None):
"""Check if the sync policy group exists.
:param group_id: Sync policy group id.
:type group_id: str
:param bucket: Bucket name. If this is given, the bucket level group
policy is checked.
:type bucket: str
:rtype: Boolean
"""
for group in list_sync_groups(bucket=bucket):
if group['key'] == group_id:
return True
return False
def get_sync_group(group_id, bucket=None):
"""Get the sync policy group configuration.
:param group_id: Sync policy group id.
:type group_id: str
:param bucket: Bucket name. If this is given, the bucket level group
policy is returned.
:type bucket: str
:return: Sync policy group configuration.
:rtype: dict
"""
cmd = [
RGW_ADMIN, '--id={}'.format(_key_name()),
'sync', 'group', 'get',
'--group-id={}'.format(group_id),
]
if bucket:
cmd.append('--bucket={}'.format(bucket))
try:
return json.loads(_check_output(cmd))
except TypeError:
return None
def create_sync_group(group_id, status, bucket=None):
"""Create a sync policy group.
:param group_id: ID of the sync policy group to be created.
:type group_id: str
:param status: Status of the sync policy group to be created. Must be one
of the following: 'enabled', 'allowed', 'forbidden'.
:type status: str
:param bucket: Bucket name. If this is given, the bucket level group
policy is created.
:type bucket: str
:raises UnknownSyncPolicyState: if the provided status is not one of the
allowed values.
:return: Sync policy group configuration.
:rtype: dict
"""
if status not in SYNC_POLICY_STATES:
raise UnknownSyncPolicyState(
'Unknown sync policy state: {}'.format(status))
cmd = [
RGW_ADMIN, '--id={}'.format(_key_name()),
'sync', 'group', 'create',
'--group-id={}'.format(group_id),
'--status={}'.format(status),
]
if bucket:
cmd.append('--bucket={}'.format(bucket))
try:
return json.loads(_check_output(cmd))
except TypeError:
return None
def remove_sync_group(group_id, bucket=None):
"""Remove a sync group with the given group ID and optional bucket.
:param group_id: The ID of the sync group to remove.
:type group_id: str
:param bucket: Bucket name. If this is given, the bucket level group
policy is removed.
:type bucket: str
:return: The output of the command as a dict.
:rtype: dict
"""
cmd = [
RGW_ADMIN, '--id={}'.format(_key_name()),
'sync', 'group', 'remove',
'--group-id={}'.format(group_id),
]
if bucket:
cmd.append('--bucket={}'.format(bucket))
try:
return json.loads(_check_output(cmd))
except TypeError:
return None
def create_sync_group_flow(group_id, flow_id, flow_type, source_zone,
dest_zone):
"""Create a new sync group data flow with the given parameters.
:param group_id: The ID of the sync group to create the data flow for.
:type group_id: str
:param flow_id: The ID of the new data flow.
:type flow_id: str
:param flow_type: The type of the new data flow.
:type flow_type: str
:param source_zone: The source zone for the new data flow.
:type source_zone: str
:param dest_zone: The destination zone for the new data flow.
:type dest_zone: str
:raises UnknownSyncFlowType: If an unknown sync flow type is provided.
:return: Sync group data flow configuration.
:rtype: dict
"""
cmd = [
RGW_ADMIN, '--id={}'.format(_key_name()),
'sync', 'group', 'flow', 'create',
'--group-id={}'.format(group_id),
'--flow-id={}'.format(flow_id),
'--flow-type={}'.format(flow_type),
]
if flow_type == SYNC_FLOW_SYMMETRICAL:
cmd.append('--zones={},{}'.format(source_zone, dest_zone))
elif flow_type == SYNC_FLOW_DIRECTIONAL:
cmd.append('--source-zone={}'.format(source_zone))
cmd.append('--dest-zone={}'.format(dest_zone))
else:
raise UnknownSyncFlowType(
'Unknown sync flow type {}'.format(flow_type))
try:
return json.loads(_check_output(cmd))
except TypeError:
return None
def remove_sync_group_flow(group_id, flow_id, flow_type, source_zone=None,
dest_zone=None):
"""Remove a sync group data flow.
:param group_id: The ID of the sync group.
:type group_id: str
:param flow_id: The ID of the flow to remove.
:type flow_id: str
:param flow_type: The type of the flow to remove.
:type flow_type: str
:param source_zone: The source zone of the flow to remove (only for
directional flows).
:type source_zone: str
:param dest_zone: The destination zone of the flow to remove (only for
directional flows).
:type dest_zone: str
:return: The output of the command as a dict.
:rtype: dict
"""
cmd = [
RGW_ADMIN, '--id={}'.format(_key_name()),
'sync', 'group', 'flow', 'remove',
'--group-id={}'.format(group_id),
'--flow-id={}'.format(flow_id),
'--flow-type={}'.format(flow_type),
]
if flow_type == SYNC_FLOW_DIRECTIONAL:
cmd.append('--source-zone={}'.format(source_zone))
cmd.append('--dest-zone={}'.format(dest_zone))
try:
return json.loads(_check_output(cmd))
except TypeError:
return None
def create_sync_group_pipe(group_id, pipe_id, source_zones, dest_zones,
source_bucket='*', dest_bucket='*', bucket=None):
"""Create a sync group pipe between source and destination zones.
:param group_id: The ID of the sync group.
:type group_id: str
:param pipe_id: The ID of the sync group pipe.
:type pipe_id: str
:param source_zones: A list of source zones.
:type source_zones: list
:param dest_zones: A list of destination zones.
:type dest_zones: list
:param source_bucket: The source bucket name. Default is '*'.
:type source_bucket: str
:param dest_bucket: The destination bucket name. Default is '*'.
:type dest_bucket: str
:param bucket: The bucket name. If specified, the sync group pipe will be
created for this bucket only.
:type bucket: str
:return: Sync group pipe configuration.
:rtype: dict
"""
cmd = [
RGW_ADMIN, '--id={}'.format(_key_name()),
'sync', 'group', 'pipe', 'create',
'--group-id={}'.format(group_id),
'--pipe-id={}'.format(pipe_id),
'--source-zones={}'.format(','.join(source_zones)),
'--source-bucket={}'.format(source_bucket),
'--dest-zones={}'.format(','.join(dest_zones)),
'--dest-bucket={}'.format(dest_bucket),
]
if bucket:
cmd.append('--bucket={}'.format(bucket))
try:
return json.loads(_check_output(cmd))
except TypeError:
return None

hooks/primary-relation-changed Symbolic link
View File

@ -0,0 +1 @@
hooks.py

View File

@ -82,6 +82,7 @@ class MultisiteActionsTestCase(CharmTestCase):
TO_PATCH = [
'action_fail',
'action_get',
'action_set',
'multisite',
'config',
@ -89,6 +90,7 @@ class MultisiteActionsTestCase(CharmTestCase):
'leader_set',
'service_name',
'service_restart',
'log',
]
def setUp(self):
@ -154,3 +156,176 @@ class MultisiteActionsTestCase(CharmTestCase):
self.test_config.set('zone', None)
actions.tidydefaults([])
self.action_fail.assert_called_once()
def test_enable_buckets_sync(self):
self.multisite.is_multisite_configured.return_value = True
self.multisite.get_zonegroup_info.return_value = {
'master_zone': 'test-zone-id',
}
self.multisite.get_zone_info.return_value = {
'id': 'test-zone-id',
}
self.is_leader.return_value = True
self.action_get.return_value = 'testbucket1,testbucket2,non-existent'
self.test_config.set('zone', 'testzone')
self.test_config.set('zonegroup', 'testzonegroup')
self.test_config.set('realm', 'testrealm')
self.multisite.list_buckets.return_value = ['testbucket1',
'testbucket2']
actions.enable_buckets_sync([])
self.multisite.is_multisite_configured.assert_called_once()
self.multisite.get_zonegroup_info.assert_called_once_with(
'testzonegroup',
)
self.multisite.get_zone_info.assert_called_once_with(
'testzone',
)
self.action_get.assert_called_once_with('buckets')
self.multisite.list_buckets.assert_called_once_with(
zonegroup='testzonegroup', zone='testzone',
)
self.assertEqual(self.multisite.create_sync_group.call_count, 2)
self.multisite.create_sync_group.assert_has_calls([
mock.call(bucket='testbucket1',
group_id='default',
status=self.multisite.SYNC_POLICY_ENABLED),
mock.call(bucket='testbucket2',
group_id='default',
status=self.multisite.SYNC_POLICY_ENABLED),
])
self.assertEqual(self.multisite.create_sync_group_pipe.call_count, 2)
self.multisite.create_sync_group_pipe.assert_has_calls([
mock.call(bucket='testbucket1',
group_id='default',
pipe_id='default',
source_zones=['*'],
dest_zones=['*']),
mock.call(bucket='testbucket2',
group_id='default',
pipe_id='default',
source_zones=['*'],
dest_zones=['*']),
])
expected_messages = [
'Updated "testbucket1" bucket sync policy to "{}"'.format(
self.multisite.SYNC_POLICY_ENABLED),
'Updated "testbucket2" bucket sync policy to "{}"'.format(
self.multisite.SYNC_POLICY_ENABLED),
('Bucket "non-existent" does not exist in the zonegroup '
'"testzonegroup" and zone "testzone"'),
]
self.assertEqual(self.log.call_count, 3)
self.log.assert_has_calls([
mock.call(expected_messages[0]),
mock.call(expected_messages[1]),
mock.call(expected_messages[2]),
])
self.action_set.assert_called_once_with(
values={
'message': '\n'.join(expected_messages),
})
def test_disable_buckets_sync(self):
self.multisite.is_multisite_configured.return_value = True
self.multisite.get_zonegroup_info.return_value = {
'master_zone': 'test-zone-id',
}
self.multisite.get_zone_info.return_value = {
'id': 'test-zone-id',
}
self.is_leader.return_value = True
self.action_get.return_value = 'testbucket1,non-existent'
self.test_config.set('zone', 'testzone')
self.test_config.set('zonegroup', 'testzonegroup')
self.test_config.set('realm', 'testrealm')
self.multisite.list_buckets.return_value = ['testbucket1']
actions.disable_buckets_sync([])
self.multisite.is_multisite_configured.assert_called_once()
self.multisite.get_zonegroup_info.assert_called_once_with(
'testzonegroup',
)
self.multisite.get_zone_info.assert_called_once_with(
'testzone',
)
self.action_get.assert_called_once_with('buckets')
self.multisite.list_buckets.assert_called_once_with(
zonegroup='testzonegroup', zone='testzone',
)
self.multisite.create_sync_group.assert_called_once_with(
bucket='testbucket1',
group_id='default',
status=self.multisite.SYNC_POLICY_FORBIDDEN,
)
self.multisite.create_sync_group_pipe.assert_called_once_with(
bucket='testbucket1',
group_id='default',
pipe_id='default',
source_zones=['*'],
dest_zones=['*'],
)
expected_messages = [
'Updated "testbucket1" bucket sync policy to "{}"'.format(
self.multisite.SYNC_POLICY_FORBIDDEN),
('Bucket "non-existent" does not exist in the zonegroup '
'"testzonegroup" and zone "testzone"'),
]
self.assertEqual(self.log.call_count, 2)
self.log.assert_has_calls([
mock.call(expected_messages[0]),
mock.call(expected_messages[1]),
])
self.action_set.assert_called_once_with(
values={
'message': '\n'.join(expected_messages),
})
def test_reset_buckets_sync(self):
self.multisite.is_multisite_configured.return_value = True
self.multisite.get_zonegroup_info.return_value = {
'master_zone': 'test-zone-id',
}
self.multisite.get_zone_info.return_value = {
'id': 'test-zone-id',
}
self.is_leader.return_value = True
self.action_get.return_value = 'testbucket1,non-existent'
self.test_config.set('zone', 'testzone')
self.test_config.set('zonegroup', 'testzonegroup')
self.test_config.set('realm', 'testrealm')
self.multisite.list_buckets.return_value = ['testbucket1']
actions.reset_buckets_sync([])
self.multisite.is_multisite_configured.assert_called_once()
self.multisite.get_zonegroup_info.assert_called_once_with(
'testzonegroup',
)
self.multisite.get_zone_info.assert_called_once_with(
'testzone',
)
self.action_get.assert_called_once_with('buckets')
self.multisite.list_buckets.assert_called_once_with(
zonegroup='testzonegroup', zone='testzone',
)
self.multisite.remove_sync_group.assert_called_once_with(
bucket='testbucket1',
group_id='default',
)
expected_messages = [
'Reset "testbucket1" bucket sync policy',
('Bucket "non-existent" does not exist in the zonegroup '
'"testzonegroup" and zone "testzone"'),
]
self.assertEqual(self.log.call_count, 2)
self.log.assert_has_calls([
mock.call(expected_messages[0]),
mock.call(expected_messages[1]),
])
self.action_set.assert_called_once_with(
values={
'message': '\n'.join(expected_messages),
})

View File

@ -287,6 +287,7 @@ class MonContextTest(CharmTestCase):
self.assertEqual(expect, mon_ctxt())
self.assertTrue(mock_ensure_rsv_v6.called)
@patch.object(context, 'format_ipv6_addr', lambda *_: None)
@patch('ceph_radosgw_context.https')
@patch('charmhelpers.contrib.hahelpers.cluster.relation_ids')
@patch('charmhelpers.contrib.hahelpers.cluster.config_get')

View File

@ -305,11 +305,11 @@ class CephRadosGWUtilTests(CharmTestCase):
def test_listen_port(self):
self.https.return_value = False
self.assertEquals(80, utils.listen_port())
self.assertEqual(80, utils.listen_port())
self.https.return_value = True
self.assertEquals(443, utils.listen_port())
self.assertEqual(443, utils.listen_port())
self.test_config.set('port', 42)
self.assertEquals(42, utils.listen_port())
self.assertEqual(42, utils.listen_port())
def test_set_s3_app(self):
self.leader_get.return_value = None

View File

@ -13,6 +13,7 @@
# limitations under the License.
import base64
import json
import os
from unittest.mock import (
patch, call, MagicMock, ANY
)
@ -338,6 +339,8 @@ class CephRadosGWTests(CharmTestCase):
@patch.object(ceph_hooks, 'leader_get')
@patch('charmhelpers.contrib.openstack.ip.service_name',
lambda *args: 'ceph-radosgw')
@patch('charmhelpers.contrib.openstack.ip.resolve_address',
lambda *args: 'myserv')
@patch('charmhelpers.contrib.openstack.ip.config')
def test_identity_joined_early_version(self, _config, _leader_get):
self.cmp_pkgrevno.return_value = -1
@ -687,6 +690,7 @@ class MiscMultisiteTests(CharmTestCase):
'leader_get',
'is_leader',
'primary_relation_joined',
'primary_relation_changed',
'secondary_relation_changed',
'service_restart',
'service_name',
@ -724,6 +728,12 @@ class MiscMultisiteTests(CharmTestCase):
def test_process_multisite_relations(self):
ceph_hooks.process_multisite_relations()
self.primary_relation_joined.assert_called_once_with('primary:1')
self.assertEqual(self.primary_relation_changed.call_count, 2)
self.primary_relation_changed.assert_has_calls([
call('primary:1', 'rgw/0'),
call('primary:1', 'rgw/1'),
])
self.assertEqual(self.secondary_relation_changed.call_count, 2)
self.secondary_relation_changed.assert_has_calls([
call('secondary:1', 'rgw-s/0'),
call('secondary:1', 'rgw-s/1'),
@ -889,6 +899,85 @@ class PrimaryMultisiteTests(CephRadosMultisiteTests):
)
self.multisite.list_realms.assert_not_called()
def test_primary_relation_changed_sync_policy_state_unset(self):
self.is_leader.return_value = True
ceph_hooks.primary_relation_changed('primary:1')
self.is_leader.assert_called_once()
self.ready_for_service.assert_called_once_with(legacy=False)
self.config.assert_called_once_with('sync-policy-state')
def test_primary_relation_changed_sync_rel_data_incomplete(self):
self.is_leader.return_value = True
self.test_config.set('sync-policy-state', 'allowed')
self.relation_get.return_value = {'zone': 'secondary'}
ceph_hooks.primary_relation_changed('primary:1', 'rgw/0')
self.is_leader.assert_called_once()
self.ready_for_service.assert_called_once_with(legacy=False)
self.config.assert_called_once_with('sync-policy-state')
self.relation_get.assert_called_once_with(rid='primary:1',
unit='rgw/0')
def test_primary_relation_changed(self):
self.is_leader.return_value = True
configs = {
'sync-policy-state': 'allowed',
'zonegroup': 'testzonegroup',
'zone': 'zone_a',
}
for k, v in configs.items():
self.test_config.set(k, v)
self.relation_get.return_value = {
'zone': 'zone_b',
'sync_policy_flow_type': 'symmetrical',
# this should force flow type to directional, and ignore the value
# from the relation data.
'zone_tier_type': 'cloud',
}
self.multisite.sync_group_exists.return_value = True
group_test_data_file = os.path.join(
os.path.dirname(__file__), 'testdata', 'test_get_sync_group.json')
with open(group_test_data_file, 'r') as f:
self.multisite.get_sync_group.return_value = json.loads(f.read())
ceph_hooks.primary_relation_changed('primary:1', 'rgw/0')
self.is_leader.assert_called_once()
self.ready_for_service.assert_called_once_with(legacy=False)
self.config.assert_has_calls([
call('sync-policy-state'),
call('zonegroup'),
call('zone'),
])
self.relation_get.assert_called_once_with(rid='primary:1',
unit='rgw/0')
self.multisite.sync_group_exists.assert_called_once_with(
ceph_hooks.MULTISITE_DEFAULT_SYNC_GROUP_ID)
self.multisite.remove_sync_group_flow.assert_called_once_with(
group_id=ceph_hooks.MULTISITE_DEFAULT_SYNC_GROUP_ID,
flow_id='zone_a-zone_b',
flow_type=self.multisite.SYNC_FLOW_SYMMETRICAL,
source_zone='zone_a', dest_zone='zone_b')
self.multisite.create_sync_group.assert_called_once_with(
group_id=ceph_hooks.MULTISITE_DEFAULT_SYNC_GROUP_ID,
status='allowed')
self.multisite.create_sync_group_flow.assert_called_once_with(
group_id=ceph_hooks.MULTISITE_DEFAULT_SYNC_GROUP_ID,
flow_id='zone_a-zone_b',
flow_type=self.multisite.SYNC_FLOW_DIRECTIONAL,
source_zone='zone_a', dest_zone='zone_b')
self.multisite.create_sync_group_pipe.assert_called_once_with(
group_id=ceph_hooks.MULTISITE_DEFAULT_SYNC_GROUP_ID,
pipe_id='zone_a-zone_b',
source_zones=['zone_a'], dest_zones=['zone_b'])
self.multisite.update_period.assert_called_once_with(
zonegroup='testzonegroup', zone='zone_a')
self.service_restart.assert_called_once_with('rgw@hostname')
self.leader_set.assert_called_once_with(restart_nonce=ANY)
@patch.object(json, 'loads')
def test_multisite_relation_departed(self, json_loads):
for k, v in self._complete_config.items():
@ -916,6 +1005,7 @@ class SecondaryMultisiteTests(CephRadosMultisiteTests):
'realm': 'testrealm',
'zonegroup': 'testzonegroup',
'zone': 'testzone2',
'sync-policy-flow-type': 'symmetrical',
}
_test_relation = {
@ -978,6 +1068,11 @@ class SecondaryMultisiteTests(CephRadosMultisiteTests):
])
self.service_restart.assert_called_once()
self.leader_set.assert_called_once_with(restart_nonce=ANY)
self.relation_set.assert_called_once_with(
relation_id='secondary:1',
zone='testzone2',
sync_policy_flow_type='symmetrical',
)
def test_secondary_relation_changed_incomplete_relation(self):
for k, v in self._complete_config.items():
@ -986,6 +1081,7 @@ class SecondaryMultisiteTests(CephRadosMultisiteTests):
self.relation_get.return_value = {}
ceph_hooks.secondary_relation_changed('secondary:1', 'rgw/0')
self.config.assert_not_called()
self.relation_set.assert_not_called()
def test_secondary_relation_changed_mismatching_config(self):
for k, v in self._complete_config.items():
@ -999,11 +1095,13 @@ class SecondaryMultisiteTests(CephRadosMultisiteTests):
call('zone'),
])
self.multisite.list_realms.assert_not_called()
self.relation_set.assert_not_called()
def test_secondary_relation_changed_not_leader(self):
self.is_leader.return_value = False
ceph_hooks.secondary_relation_changed('secondary:1', 'rgw/0')
self.relation_get.assert_not_called()
self.relation_set.assert_not_called()
@patch.object(ceph_hooks, 'apt_install')
@patch.object(ceph_hooks, 'services')

View File

@ -484,3 +484,181 @@ class TestMultisiteHelpers(CharmTestCase):
multisite.check_cluster_has_buckets(),
True
)
def test_get_zone_info(self):
multisite.get_zone_info('test_zone', 'test_zonegroup')
self.subprocess.check_output.assert_called_with([
'radosgw-admin', '--id=rgw.testhost',
'zone', 'get',
'--rgw-zone=test_zone', '--rgw-zonegroup=test_zonegroup',
])
def test_sync_group_exists(self):
groups = [
{'key': 'group1'},
{'key': 'group2'},
]
self.subprocess.check_output.return_value = json.dumps(groups).encode()
self.assertTrue(multisite.sync_group_exists('group1'))
self.subprocess.check_output.assert_called_with([
'radosgw-admin', '--id=rgw.testhost',
'sync', 'group', 'get',
])
def test_bucket_sync_group_exists(self):
with open(self._testdata('test_list_sync_groups'), 'rb') as f:
self.subprocess.check_output.return_value = f.read()
self.assertTrue(multisite.sync_group_exists('default',
bucket='test'))
self.subprocess.check_output.assert_called_with([
'radosgw-admin', '--id=rgw.testhost',
'sync', 'group', 'get',
'--bucket=test',
])
def test_sync_group_does_not_exists(self):
with open(self._testdata('test_list_sync_groups'), 'rb') as f:
self.subprocess.check_output.return_value = f.read()
self.assertFalse(multisite.sync_group_exists('group-non-existent'))
self.subprocess.check_output.assert_called_with([
'radosgw-admin', '--id=rgw.testhost',
'sync', 'group', 'get',
])
def test_get_sync_group(self):
with open(self._testdata(whoami()), 'rb') as f:
self.subprocess.check_output.return_value = f.read()
result = multisite.get_sync_group('default')
self.assertEqual(result['id'], 'default')
self.subprocess.check_output.assert_called_with([
'radosgw-admin', '--id=rgw.testhost',
'sync', 'group', 'get',
'--group-id=default',
])
def test_create_sync_group(self):
test_group_json = json.dumps({"id": "default"}).encode()
self.subprocess.check_output.return_value = test_group_json
result = multisite.create_sync_group(
group_id='default',
status=multisite.SYNC_POLICY_ENABLED,
)
self.assertEqual(result['id'], 'default')
self.subprocess.check_output.assert_called_with([
'radosgw-admin', '--id=rgw.testhost',
'sync', 'group', 'create',
'--group-id=default',
'--status={}'.format(multisite.SYNC_POLICY_ENABLED),
])
def test_create_sync_group_wrong_status(self):
self.assertRaises(
multisite.UnknownSyncPolicyState,
multisite.create_sync_group, "default", "wrong_status",
)
def test_remove_sync_group(self):
multisite.remove_sync_group('default')
self.subprocess.check_output.assert_called_with([
'radosgw-admin', '--id=rgw.testhost',
'sync', 'group', 'remove',
'--group-id=default',
])
def test_create_sync_group_flow_symmetrical(self):
with open(self._testdata('test_create_sync_group_flow'), 'rb') as f:
self.subprocess.check_output.return_value = f.read()
result = multisite.create_sync_group_flow(
group_id='default',
flow_id='flow_id',
flow_type=multisite.SYNC_FLOW_SYMMETRICAL,
source_zone='zone_a',
dest_zone='zone_b',
)
self.assertEqual(result['groups'][0]['id'], 'default')
self.subprocess.check_output.assert_called_with([
'radosgw-admin', '--id=rgw.testhost',
'sync', 'group', 'flow', 'create',
'--group-id=default',
'--flow-id=flow_id',
'--flow-type=symmetrical',
'--zones=zone_a,zone_b',
])
def test_create_sync_group_flow_directional(self):
with open(self._testdata('test_create_sync_group_flow'), 'rb') as f:
self.subprocess.check_output.return_value = f.read()
result = multisite.create_sync_group_flow(
group_id='default',
flow_id='flow_id',
flow_type=multisite.SYNC_FLOW_DIRECTIONAL,
source_zone='zone_a',
dest_zone='zone_b',
)
self.assertEqual(result['groups'][0]['id'], 'default')
self.subprocess.check_output.assert_called_with([
'radosgw-admin', '--id=rgw.testhost',
'sync', 'group', 'flow', 'create',
'--group-id=default',
'--flow-id=flow_id',
'--flow-type=directional',
'--source-zone=zone_a', '--dest-zone=zone_b',
])
def test_create_sync_group_flow_wrong_type(self):
self.assertRaises(
multisite.UnknownSyncFlowType,
multisite.create_sync_group_flow,
group_id='default', flow_id='flow_id', flow_type='wrong_type',
source_zone='zone_a', dest_zone='zone_b',
)
def test_remove_sync_group_flow_symmetrical(self):
multisite.remove_sync_group_flow(
group_id='default',
flow_id='flow_id',
flow_type=multisite.SYNC_FLOW_SYMMETRICAL,
)
self.subprocess.check_output.assert_called_with([
'radosgw-admin', '--id=rgw.testhost',
'sync', 'group', 'flow', 'remove',
'--group-id=default',
'--flow-id=flow_id',
'--flow-type=symmetrical',
])
def test_remove_sync_group_flow_directional(self):
multisite.remove_sync_group_flow(
group_id='default',
flow_id='flow_id',
flow_type=multisite.SYNC_FLOW_DIRECTIONAL,
source_zone='zone_a',
dest_zone='zone_b',
)
self.subprocess.check_output.assert_called_with([
'radosgw-admin', '--id=rgw.testhost',
'sync', 'group', 'flow', 'remove',
'--group-id=default',
'--flow-id=flow_id',
'--flow-type=directional',
'--source-zone=zone_a', '--dest-zone=zone_b',
])
def test_create_sync_group_pipe(self):
with open(self._testdata(whoami()), 'rb') as f:
self.subprocess.check_output.return_value = f.read()
result = multisite.create_sync_group_pipe(
group_id='default',
pipe_id='pipe_id',
source_zones=['zone_a', 'zone_b'],
dest_zones=['zone_c', 'zone_d'],
)
self.assertEqual(result['groups'][0]['id'], 'default')
self.subprocess.check_output.assert_called_with([
'radosgw-admin', '--id=rgw.testhost',
'sync', 'group', 'pipe', 'create',
'--group-id=default',
'--pipe-id=pipe_id',
'--source-zones=zone_a,zone_b', '--source-bucket=*',
'--dest-zones=zone_c,zone_d', '--dest-bucket=*',
])

View File

@ -0,0 +1,20 @@
{
"groups": [
{
"id": "default",
"data_flow": {
"symmetrical": [
{
"id": "zone_a-zone_b",
"zones": [
"zone_a",
"zone_b"
]
}
]
},
"pipes": [],
"status": "allowed"
}
]
}

View File

@ -0,0 +1,49 @@
{
"groups": [
{
"id": "default",
"data_flow": {
"symmetrical": [
{
"id": "zone_a-zone_b",
"zones": [
"zone_a",
"zone_b"
]
}
]
},
"pipes": [
{
"id": "zone_a-zone_b",
"source": {
"bucket": "*",
"zones": [
"zone_a",
"zone_b"
]
},
"dest": {
"bucket": "*",
"zones": [
"zone_a",
"zone_b"
]
},
"params": {
"source": {
"filter": {
"tags": []
}
},
"dest": {},
"priority": 0,
"mode": "system",
"user": ""
}
}
],
"status": "allowed"
}
]
}

View File

@ -0,0 +1,16 @@
{
"id": "default",
"data_flow": {
"symmetrical": [
{
"id": "zone_a-zone_b",
"zones": [
"zone_a",
"zone_b"
]
}
]
},
"pipes": [],
"status": "allowed"
}

View File

@ -0,0 +1,45 @@
[
{
"key": "default",
"val": {
"id": "default",
"data_flow": {
"directional": [
{
"source_zone": "zone_a",
"dest_zone": "zone_b"
}
]
},
"pipes": [
{
"id": "zone_a-zone_b",
"source": {
"bucket": "*",
"zones": [
"zone_a"
]
},
"dest": {
"bucket": "*",
"zones": [
"zone_b"
]
},
"params": {
"source": {
"filter": {
"tags": []
}
},
"dest": {},
"priority": 0,
"mode": "system",
"user": ""
}
}
],
"status": "allowed"
}
}
]