Merge "Check before applying plugin and perms changes"

This commit is contained in:
Zuul 2022-02-08 21:53:14 +00:00 committed by Gerrit Code Review
commit f44cccc505
2 changed files with 701 additions and 40 deletions

View File

@ -369,6 +369,159 @@ def list_users():
json_processor=_json_processor)
def list_user_tags(user):
"""Get list of tags for user.
:param user: Name of user to get tags for
:type user: str
:returns: List of tags associated with user.
:rtype: [str]
:raises: NotImplementedError
"""
all_tags = query_rabbit(['list_users'])
users = [
tag_dict['tags']
for tag_dict in all_tags
if tag_dict['user'] == user]
# The datastructure returned by rabbitctl is a list of dicts in the
# form [{'user': 'testuser1', 'tags': []}, ...] so it is possible
# for there to be multiple dicts which apply to the same user. Handle
# this unlikely event by merging the lists.
result = sum(users, [])
result = sorted(list(set(result)))
return result
def list_user_permissions(user):
"""Get list of user permissions.
:param user: Name of user to get permissions for
:type user: str
:returns: List of dictionaries e.g. [{'vhost': 'vhost-name',
'configure': '.*',
'write': '.*',
'read': '.*'},...]
:rtype: List[Dict[str,str]]
:raises: NotImplementedError
"""
return query_rabbit(['list_user_permissions', user])
def list_user_vhost_permissions(user, vhost):
"""Get list of user permissions for a vhost.
:param user: Name of user to get permissions for
:type user: str
:param vhost: Name of vhost to get permissions for
:type vhost: str
:returns: List of dictionaries e.g. [{'configure': '.*',
'write': '.*',
'read': '.*'},...]
:rtype: List[Dict[str,str]]
:raises: NotImplementedError
"""
perms = list_user_permissions(user)
vhost_perms = [
{
'configure': tag_dict['configure'],
'write': tag_dict['write'],
'read': tag_dict['read']}
for tag_dict in perms
if tag_dict['vhost'] == vhost]
return vhost_perms
def list_plugins():
"""List plugins.
Return a list of dictionaries relating to each plugin.
:returns: List of dictionaries e.g.
[
{
'enabled': str,
'name': str,
'running': bool,
'running_version': int,
'version': [int, int, ...]},
...
]
:rtype: List[Dict[str, Union[str, bool, int, List[int]]]]
"""
def _json_processor(output):
return output['plugins']
return query_rabbit(
['list'],
json_processor=_json_processor,
binary=get_plugin_manager())
def list_policies(vhost):
"""List policies for a vhost.
Return a list of dictionaries relating to each policy.
[
{
'apply-to': str,
'definition': str,
'name': str,
'pattern': str,
'priority': int,
'vhost': str}
...]
:param vhost: Name of vhost to get policies for
:type vhost: str
:returns: List of dictionaries e.g.
:rtype: List[Dict[str, Union[str, int]]]
"""
return query_rabbit(['list_policies', '-p', vhost])
def get_vhost_policy(vhost, policy_name):
"""Get a policy with a given name on a given vhost.
Return policy:
{
'apply-to': str,
'definition': str,
'name': str,
'pattern': str,
'priority': int,
'vhost': str}
:param vhost: Name of vhost to get policies for
:type vhost: str
:param policy_name: Name of policy
:type policy_name: str
:returns: Policy dict.
:rtype: Union[Dict[str, Union[str, int]], None]
"""
policies = [
p for p in list_policies(vhost)
if p['name'] == policy_name]
if len(policies) > 0:
return policies[0]
else:
return None
def list_enabled_plugins():
"""Get list of enabled plugins.
:returns: List of enabled plugins
:rtype: [str]
:raises: NotImplementedError
"""
enabled_plugins = [
pdict['name']
for pdict in list_plugins()
if pdict['enabled'] in ['enabled', 'implicit']]
log("Enabled plugins: {}".format(enabled_plugins))
return sorted(list(set(enabled_plugins)))
def vhost_queue_info(vhost):
return list_vhost_queue_info(vhost)
@ -388,6 +541,36 @@ def user_exists(user):
return user in list_users()
def apply_tags(user, tags):
"""Apply tags to user if needed.
Apply tags to user. Any existing tags will be removed.
:param user: Name of user to apply tags to.
:type user: str
:param tags: Tags to apply to user.
:type tags: List[str]
:raises: NotImplementedError
"""
log('Adding tags [{}] to user {}'.format(
', '.join(tags),
user
))
try:
existing_user_tags = list_user_tags(user)
if existing_user_tags == sorted(list(set(tags))):
log("User {} already has tags {}".format(user, tags))
else:
log("Existing user tags for {} are {} which do not match {}. "
"Updating tags.".format(user, existing_user_tags, tags))
rabbitmqctl('set_user_tags', user, ' '.join(tags))
except NotImplementedError:
# Cannot cheeck existing tags so apply them to be on thee
# safe side.
log("Cannot retrieve existing tags for {}".format(user))
rabbitmqctl('set_user_tags', user, ' '.join(tags))
def create_user(user, password, tags=[]):
exists = user_exists(user)
@ -398,11 +581,7 @@ def create_user(user, password, tags=[]):
if 'administrator' in tags:
log('Granting admin access to {}'.format(user))
log('Adding tags [{}] to user {}'.format(
', '.join(tags),
user
))
rabbitmqctl('set_user_tags', user, ' '.join(tags))
apply_tags(user, tags)
def grant_permissions(user, vhost):
@ -416,15 +595,163 @@ def grant_permissions(user, vhost):
log(
"Granting permissions for user {} on vhost {}".format(user, vhost),
level='DEBUG')
log("Granting permissions", level='DEBUG')
rabbitmqctl('set_permissions', '-p',
vhost, user, '.*', '.*', '.*')
try:
vhost_perms = list_user_vhost_permissions(user, vhost)
if len(vhost_perms) > 1:
# This will almost certainly never happen but handle it just in
# case.
log('Multiple permissions found for the same user and vhost. '
'Resetting permission.')
apply_perms = True
elif len(vhost_perms) == 1:
perms = vhost_perms[0]
apply_perms = False
for attr in ['configure', 'write', 'read']:
if perms[attr] != '.*':
log(
'Permissions for user {} on vhost {} to {} are {} '
'not {}'.format(user, vhost, attr, perms[attr], '.*'))
apply_perms = True
else:
apply_perms = True
except NotImplementedError:
apply_perms = True
if apply_perms:
rabbitmqctl('set_permissions', '-p',
vhost, user, '.*', '.*', '.*')
else:
log('No permissions update needed for user {} on vhost {}'.format(
user, vhost))
def set_policy(vhost, policy_name, match, value):
log("setting policy", level='DEBUG')
rabbitmqctl('set_policy', '-p', vhost,
policy_name, match, value)
def compare_policy(vhost, policy_name, pattern, definition, priority=None,
apply_to=None):
"""Check a policy matches the supplied attributes.
:param vhost: Name of vhost to find policy in
:type vhost: str
:param policy_name: Name of policy
:type policy_name: str
:param pattern: The regular expression, which when matches on a given
resources causes the policy to apply.
:type pattern: str
:param definition: The definition of the policy, as a JSON term.
:type definition: str
:param priority: The priority of the policy as an integer.
:type priority: int
:param apply_to: Which types of object this policy should apply to.
Possible values are: queues exchanges all
:type apply_to: str
:returns: Whether they match
:rtype: bool
"""
policy = get_vhost_policy(vhost, policy_name)
# Convert unset options to defaults
# https://www.rabbitmq.com/rabbitmqctl.8.html#set_policy
if priority is None:
priority = 0
if apply_to is None:
apply_to = 'all'
if not policy:
log("Old policy not found.")
return False
if policy.get('pattern') != pattern:
log("Policy pattern does not match, {} != {}".format(
policy.get('pattern'),
pattern))
return False
if policy.get('priority') != int(priority):
log("Policy priority does not match, {} != {}".format(
policy.get('priority'),
priority))
return False
if policy.get('apply-to') != apply_to:
log("Policy apply_to does not match, {} != {}".format(
policy.get('apply-to'),
apply_to))
return False
existing_definition = json.loads(policy['definition'])
new_definition = json.loads(definition)
if existing_definition != new_definition:
log("Policy definition does not match, {} != {}".format(
policy.get('existing_definition'),
new_definition))
return False
log("Policies match")
return True
def set_policy(vhost, policy_name, pattern, definition, priority=None,
apply_to=None):
"""Apply a policy
:param vhost: Name of vhost to apply policy to
:type vhost: str
:param policy_name: Name of policy
:type policy_name: str
:param pattern: The regular expression, which when matches on a given
resources causes the policy to apply.
:type pattern: str
:param definition: The definition of the policy, as a JSON term.
:type definition: str
:param priority: The priority of the policy as an integer.
:type priority: int
:param apply_to: Which types of object this policy should apply to.
Possible values are: queues exchanges all
:type apply_to: str
"""
try:
if compare_policy(vhost, policy_name, pattern, definition, priority,
apply_to):
log("{} on vhost {} matched proposed policy, no update "
"needed.".format(policy_name, vhost))
policy_update_needed = False
else:
log("{} on vhost {} did not match proposed policy, update "
"needed.".format(policy_name, vhost))
policy_update_needed = True
except NotImplementedError:
log("Could not query exiting policies. Assuming update of {} on vhost "
"{} is needed.".format(policy_name, vhost))
policy_update_needed = True
if policy_update_needed:
log("{} on vhost {} required update.".format(policy_name, vhost))
log("setting policy", level='DEBUG')
cmd = ['set_policy', '-p', vhost]
if priority:
cmd.extend(['--priority', priority])
if apply_to:
cmd.extend(['--apply-to', apply_to])
cmd.extend([policy_name, pattern, definition])
rabbitmqctl(*cmd)
def clear_policy(vhost, policy_name):
"""Remove a policy
:param vhost: Name of vhost to remove policy from
:type vhost: str
:param policy_name: Name of policy
:type policy_name: str
"""
try:
policy = get_vhost_policy(vhost, policy_name)
if policy:
policy_update_needed = True
else:
log("{} on vhost {} not found, do not need to clear it.".format(
policy_name,
vhost))
policy_update_needed = False
except NotImplementedError:
log("Policy lookup failed for {} on vhost {} not.".format(
policy_name,
vhost))
policy_update_needed = True
if policy_update_needed:
rabbitmqctl('clear_policy', '-p', vhost, policy_name)
def set_ha_mode(vhost, mode, params=None, sync_mode='automatic'):
@ -488,7 +815,7 @@ def clear_ha_mode(vhost, name='HA', force=False):
log("Clearing '%s' policy from vhost '%s'" % (name, vhost), level='INFO')
try:
rabbitmqctl('clear_policy', '-p', vhost, name)
clear_policy(vhost, name)
except subprocess.CalledProcessError as ex:
if not force:
raise ex
@ -507,9 +834,9 @@ def set_all_mirroring_queues(enable):
return
if enable:
status_set('active', 'Enabling queue mirroring')
status_set('active', 'Checking queue mirroring is enabled')
else:
status_set('active', 'Disabling queue mirroring')
status_set('active', 'Checking queue mirroring is disabled')
for vhost in list_vhosts():
if enable:
@ -552,12 +879,13 @@ def configure_notification_ttl(vhost, ttl=3600000):
until a more general service discovery mechanism exists so that
notifications can be enabled/disabled on each individual service.
'''
rabbitmqctl('set_policy',
'TTL', '^(versioned_)?notifications.*',
'{{"message-ttl":{ttl}}}'.format(ttl=ttl),
'--priority', '1',
'--apply-to', 'queues',
'-p', vhost)
set_policy(
vhost,
'TTL',
'^(versioned_)?notifications.*',
'{{"message-ttl":{ttl}}}'.format(ttl=ttl),
priority='1',
apply_to='queues')
def configure_ttl(vhost, ttlname, ttlreg, ttl):
@ -569,13 +897,13 @@ def configure_ttl(vhost, ttlname, ttlreg, ttl):
ttlname, ttlreg, ttl), INFO)
if not all([ttlname, ttlreg, ttl]):
return
rabbitmqctl('set_policy',
'{ttlname}'.format(ttlname=ttlname),
'"{ttlreg}"'.format(ttlreg=ttlreg),
'{{"expires":{ttl}}}'.format(ttl=ttl),
'--priority', '1',
'--apply-to', 'queues',
'-p', vhost)
set_policy(
vhost,
'{ttlname}'.format(ttlname=ttlname),
'"{ttlreg}"'.format(ttlreg=ttlreg),
'{{"expires":{ttl}}}'.format(ttl=ttl),
priority='1',
apply_to='queues')
def rabbitmqctl_normalized_output(*args):
@ -782,11 +1110,33 @@ def _manage_plugin(plugin, action):
def enable_plugin(plugin):
_manage_plugin(plugin, 'enable')
try:
enabled_plugins = list_enabled_plugins()
if plugin in enabled_plugins:
log("Plugin {} already in list of enabled plugins {}. Do not "
"need to enable it.".format(plugin, enabled_plugins))
else:
log("Enabling plugin {}".format(plugin))
_manage_plugin(plugin, 'enable')
except (NotImplementedError, subprocess.CalledProcessError):
log("Cannot get list of plugins, running enable just in "
"case.")
_manage_plugin(plugin, 'enable')
def disable_plugin(plugin):
_manage_plugin(plugin, 'disable')
try:
enabled_plugins = list_enabled_plugins()
if plugin in enabled_plugins:
log("Disabling plugin {}".format(plugin))
_manage_plugin(plugin, 'disable')
else:
log("Plugin {} not in list of enabled plugins {}. Do not need to "
"disable it.".format(plugin, enabled_plugins))
except (NotImplementedError, subprocess.CalledProcessError):
log("Cannot get list of plugins, running disable just in "
"case.")
_manage_plugin(plugin, 'disable')
def get_managment_port():

View File

@ -151,6 +151,47 @@ RABBITMQCTL_LIST_VHOSTS_382 = (b'[{"name": "/"},{"name": "landscape"},'
b'{"name": "openstack"}]')
RABBITMQCTL_USER_TAGS = b"""[
{"user":"testuser1","tags":[]}
,{"user":"nagios-rabbitmq-server-0","tags":["monitoring"]}
,{"user":"cinder","tags":[""]}
,{"user":"nagios-rabbitmq-server-2","tags":["monitoring"]}
,{"user":"nagios-rabbitmq-server-1","tags":["monitoring"]}
,{"user":"guest","tags":["monitoring"]}
,{"user":"guest","tags":["administrator"]}]"""
RABBITMQCTL_LIST_PERMS = b"""
[
{"vhost":"nagios-rabbitmq-server-0","configure":".*","write":".*","read":".*"}
]"""
RABBITMQCTL_LIST_PLUGINS = b"""
{"format":"normal","plugins":[
{"enabled":"not_enabled","name":"rabbitmq_amqp1_0","running":false,
"running_version":null,"version":[51,46,56,46,50]},
{"enabled":"enabled","name":"rabbitmq_event_exchange","running":false,
"running_version":null,"version":[51,46,56,46,50]},
{"enabled":"implicit","name":"rabbitmq_auth_backend_cache","running":false,
"running_version":null,"version":[51,46,56,46,50]}],"status":"node_down"}
"""
RABBITMQCTL_LIST_POLICIES = b"""[
{"vhost":"nagios-rabbitmq-server-0",
"name":"HA","pattern":"^(?!amq\\\\.).*",
"apply-to":"all",
"definition":"{\\"ha-mode\\":\\"all\\",\\"ha-sync-mode\\":\\"automatic\\"}",
"priority":0}
]"""
TEST_HA_POLICY = {
'apply-to': 'all',
'definition': '{"ha-mode":"all","ha-sync-mode":"automatic"}',
'name': 'HA',
'pattern': '^(?!amq\\.).*',
'priority': 0,
'vhost': 'nagios-rabbitmq-server-0'}
class UtilsTests(CharmTestCase):
def setUp(self):
super(UtilsTests, self).setUp(rabbit_utils,
@ -1000,31 +1041,37 @@ class UtilsTests(CharmTestCase):
rabbit_utils.cluster_wait()
mock_distributed_wait.assert_called_with(modulo=10, wait=60)
@mock.patch.object(rabbit_utils, 'get_vhost_policy')
@mock.patch.object(rabbit_utils, 'rabbitmqctl')
def test_configure_notification_ttl(self, rabbitmqctl):
def test_configure_notification_ttl(self, mock_rabbitmqctl,
mock_get_vhost_policy):
mock_get_vhost_policy.return_value = []
rabbit_utils.configure_notification_ttl('test',
23000)
rabbitmqctl.assert_called_once_with(
mock_rabbitmqctl.assert_called_once_with(
'set_policy',
'TTL', '^(versioned_)?notifications.*',
'{"message-ttl":23000}',
'-p', 'test',
'--priority', '1',
'--apply-to', 'queues',
'-p', 'test'
'TTL', '^(versioned_)?notifications.*',
'{"message-ttl":23000}'
)
@mock.patch.object(rabbit_utils, 'get_vhost_policy')
@mock.patch.object(rabbit_utils, 'rabbitmqctl')
def test_configure_ttl(self, rabbitmqctl):
def test_configure_ttl(self, mock_rabbitmqctl,
mock_get_vhost_policy):
mock_get_vhost_policy.return_value = []
rabbit_utils.configure_ttl('test', ttlname='heat_expiry',
ttlreg='heat-engine-listener|engine_worker',
ttl=23000)
rabbitmqctl.assert_called_once_with(
mock_rabbitmqctl.assert_called_once_with(
'set_policy',
'heat_expiry', '"heat-engine-listener|engine_worker"',
'{"expires":23000}',
'-p', 'test',
'--priority', '1',
'--apply-to', 'queues',
'-p', 'test'
'heat_expiry', '"heat-engine-listener|engine_worker"',
'{"expires":23000}'
)
@mock.patch.object(rabbit_utils, 'leader_get')
@ -1417,6 +1464,7 @@ class UtilsTests(CharmTestCase):
# default config value should show 10 minutes
max_age = rabbit_utils.get_max_stats_file_age()
self.assertEqual(600, max_age)
# changing to run every 15 minutes shows 30 minutes
self.test_config.set('stats_cron_schedule', '*/15 * * * *')
max_age = rabbit_utils.get_max_stats_file_age()
@ -1497,3 +1545,266 @@ class UtilsTests(CharmTestCase):
'DISTRIB_CODENAME': 'xenial'}
self.test_config.set('management_plugin', True)
self.assertFalse(rabbit_utils.management_plugin_enabled())
@mock.patch.object(rabbit_utils, 'subprocess')
@mock.patch.object(rabbit_utils, 'rabbit_supports_json')
def test_list_user_tags(self, mock_rabbit_supports_json, mock_subprocess):
mock_rabbit_supports_json.return_value = False
with self.assertRaises(NotImplementedError):
rabbit_utils.list_user_tags('usera')
mock_rabbit_supports_json.return_value = True
mock_subprocess.check_output.return_value = RABBITMQCTL_USER_TAGS
self.assertEqual(
rabbit_utils.list_user_tags('guest'),
['administrator', 'monitoring'])
@mock.patch.object(rabbit_utils, 'subprocess')
@mock.patch.object(rabbit_utils, 'rabbit_supports_json')
def test_list_user_permissions(self, mock_rabbit_supports_json,
mock_subprocess):
mock_rabbit_supports_json.return_value = False
with self.assertRaises(NotImplementedError):
rabbit_utils.list_user_permissions('usera')
mock_rabbit_supports_json.return_value = True
mock_subprocess.check_output.return_value = RABBITMQCTL_LIST_PERMS
self.assertEqual(
rabbit_utils.list_user_permissions('nagios-rabbitmq-server-0'),
[
{
'configure': '.*',
'read': '.*',
'vhost': 'nagios-rabbitmq-server-0',
'write': '.*'}])
@mock.patch.object(rabbit_utils, 'list_user_permissions')
@mock.patch.object(rabbit_utils, 'rabbit_supports_json')
def test_list_user_vhost_permissions(self, mock_rabbit_supports_json,
mock_list_user_permissions):
mock_rabbit_supports_json.return_value = True
mock_list_user_permissions.return_value = [
{
'vhost': 'nagios-rabbitmq-server-0',
'configure': '.*',
'write': '.*',
'read': '.*'}]
self.assertEqual(
rabbit_utils.list_user_vhost_permissions(
'nagios-rabbitmq-server-0',
'nagios-rabbitmq-server-0'),
[
{
'configure': '.*',
'read': '.*',
'write': '.*'}])
@mock.patch.object(rabbit_utils, 'get_plugin_manager')
@mock.patch.object(rabbit_utils, 'subprocess')
@mock.patch.object(rabbit_utils, 'rabbit_supports_json')
def test_list_enabled_plugins(self, mock_rabbit_supports_json,
mock_subprocess,
mock_get_plugin_manager):
mock_rabbit_supports_json.return_value = False
with self.assertRaises(NotImplementedError):
rabbit_utils.list_enabled_plugins()
mock_rabbit_supports_json.return_value = True
mock_subprocess.check_output.return_value = RABBITMQCTL_LIST_PLUGINS
self.assertEqual(
rabbit_utils.list_enabled_plugins(),
['rabbitmq_auth_backend_cache', 'rabbitmq_event_exchange'])
@mock.patch.object(rabbit_utils, 'rabbitmqctl')
@mock.patch.object(rabbit_utils, 'list_user_tags')
def test_apply_tags(self, mock_list_user_tags, mock_rabbitmqctl):
mock_list_user_tags.return_value = ['admin']
rabbit_utils.apply_tags('user1', ['admin'])
self.assertFalse(mock_rabbitmqctl.called)
rabbit_utils.apply_tags('user1', ['monitor'])
mock_rabbitmqctl.assert_called_once_with(
'set_user_tags', 'user1', 'monitor')
mock_rabbitmqctl.reset_mock()
mock_list_user_tags.side_effect = NotImplementedError
rabbit_utils.apply_tags('user1', ['monitor'])
mock_rabbitmqctl.assert_called_once_with(
'set_user_tags', 'user1', 'monitor')
@mock.patch.object(rabbit_utils, 'rabbitmqctl')
@mock.patch.object(rabbit_utils, 'list_user_vhost_permissions')
def test_grant_permissions(self, mock_list_user_vhost_permissions,
mock_rabbitmqctl):
# Check perms are set if multiple sets of perms are returned
mock_list_user_vhost_permissions.return_value = [
{
'configure': '.*',
'read': '.*',
'write': '.*'},
{
'configure': '.*',
'read': '.*',
'write': '.*'}]
rabbit_utils.grant_permissions('user1', 'vhost1')
mock_rabbitmqctl.assert_called_once_with(
'set_permissions', '-p', 'vhost1', 'user1', '.*', '.*', '.*')
# Check perms are not set if existing perms match
mock_rabbitmqctl.reset_mock()
mock_list_user_vhost_permissions.return_value = [
{
'configure': '.*',
'read': '.*',
'write': '.*'}]
rabbit_utils.grant_permissions('user1', 'vhost1')
self.assertFalse(mock_rabbitmqctl.called)
# Check perms are set if existing perms do not match
mock_rabbitmqctl.reset_mock()
mock_list_user_vhost_permissions.return_value = [
{
'configure': '.*',
'read': '.*',
'write': 'r'}]
rabbit_utils.grant_permissions('user1', 'vhost1')
mock_list_user_vhost_permissions.return_value = [
{
'configure': '.*',
'read': '.*',
'write': '.*'}]
# Check perms are set if no existing perms are found
mock_rabbitmqctl.reset_mock()
mock_list_user_vhost_permissions.return_value = []
rabbit_utils.grant_permissions('user1', 'vhost1')
mock_list_user_vhost_permissions.return_value = [
{
'configure': '.*',
'read': '.*',
'write': '.*'}]
# Check perms are set if get_user_vhost_permissions throws
# NotImplementedError
mock_rabbitmqctl.reset_mock()
mock_list_user_vhost_permissions.side_effect = NotImplementedError
rabbit_utils.grant_permissions('user1', 'vhost1')
mock_list_user_vhost_permissions.return_value = [
{
'configure': '.*',
'read': '.*',
'write': '.*'}]
@mock.patch.object(rabbit_utils, 'glob')
def test_get_plugin_manager(self, mock_glob):
mock_glob.glob.return_value = [
'/sbin/rabbitmq-plugins',
'/bin/rabbitmq-plugins']
self.assertEqual(
rabbit_utils.get_plugin_manager(),
'/sbin/rabbitmq-plugins')
@mock.patch.object(rabbit_utils, 'list_enabled_plugins')
@mock.patch.object(rabbit_utils, '_manage_plugin')
def test_enable_plugin(self, mock_manage_plugin,
mock_list_enabled_plugins):
mock_list_enabled_plugins.return_value = ['unicorn']
rabbit_utils.enable_plugin('unicorn')
self.assertFalse(mock_manage_plugin.called)
rabbit_utils.enable_plugin('grapefruit')
mock_manage_plugin.assert_called_once_with('grapefruit', 'enable')
@mock.patch.object(rabbit_utils, 'list_enabled_plugins')
@mock.patch.object(rabbit_utils, '_manage_plugin')
def test_disable_plugin(self, mock_manage_plugin,
mock_list_enabled_plugins):
mock_list_enabled_plugins.return_value = ['unicorn']
rabbit_utils.disable_plugin('grapefruit')
self.assertFalse(mock_manage_plugin.called)
rabbit_utils.disable_plugin('unicorn')
mock_manage_plugin.assert_called_once_with('unicorn', 'disable')
@mock.patch.object(rabbit_utils, 'rabbit_supports_json')
@mock.patch.object(rabbit_utils.subprocess, 'check_output')
def test_list_policies(self, mock_check_output, mock_rabbit_supports_json):
mock_rabbit_supports_json.return_value = True
mock_check_output.return_value = RABBITMQCTL_LIST_POLICIES
self.assertEqual(
rabbit_utils.list_policies('nagios-rabbitmq-server-0'),
[TEST_HA_POLICY])
@mock.patch.object(rabbit_utils, 'list_policies')
def test_get_vhost_policy(self, mock_list_policies):
mock_list_policies.return_value = [TEST_HA_POLICY]
self.assertEqual(
rabbit_utils.get_vhost_policy('nagios-rabbitmq-server-0', 'HA'),
TEST_HA_POLICY)
self.assertIsNone(
rabbit_utils.get_vhost_policy('nagios-rabbitmq-server-0', 'FAKE'))
mock_list_policies.return_value = []
self.assertIsNone(
rabbit_utils.get_vhost_policy('nagios-rabbitmq-server-0', 'HA'))
@mock.patch.object(rabbit_utils, 'get_vhost_policy')
def test_compare_policy(self, mock_get_vhost_policy):
mock_get_vhost_policy.return_value = TEST_HA_POLICY
self.assertTrue(
rabbit_utils.compare_policy(
'nagios-rabbitmq-server-0',
'HA',
'^(?!amq\\.).*',
'{"ha-mode":"all","ha-sync-mode":"automatic"}',
priority=0,
apply_to='all'))
# Check defaults
self.assertTrue(
rabbit_utils.compare_policy(
'nagios-rabbitmq-server-0',
'HA',
'^(?!amq\\.).*',
'{"ha-mode":"all","ha-sync-mode":"automatic"}'))
self.assertFalse(
rabbit_utils.compare_policy(
'nagios-rabbitmq-server-0',
'HA',
'^(?!foo\\.).*',
'{"ha-mode":"all","ha-sync-mode":"automatic"}'))
@mock.patch.object(rabbit_utils, 'rabbitmqctl')
@mock.patch.object(rabbit_utils, 'get_vhost_policy')
def test_set_policy(self, mock_get_vhost_policy, mock_rabbitmqctl):
mock_get_vhost_policy.return_value = TEST_HA_POLICY
rabbit_utils.set_policy(
'nagios-rabbitmq-server-0',
'HA',
'^(?!amq\\.).*',
'{"ha-mode":"all","ha-sync-mode":"automatic"}',
priority=0,
apply_to='all')
# set_policy not called for existing policy.
self.assertFalse(mock_rabbitmqctl.called)
rabbit_utils.set_policy(
'nagios-rabbitmq-server-0',
'HA',
'^(?!foo\\.).*',
'{"ha-mode":"all","ha-sync-mode":"automatic"}',
priority=0,
apply_to='all')
mock_rabbitmqctl.assert_called_once_with(
'set_policy',
'-p', 'nagios-rabbitmq-server-0',
'--apply-to', 'all',
'HA',
'^(?!foo\\.).*',
'{"ha-mode":"all","ha-sync-mode":"automatic"}')
@mock.patch.object(rabbit_utils, 'rabbitmqctl')
@mock.patch.object(rabbit_utils, 'get_vhost_policy')
def test_clear_policy(self, mock_get_vhost_policy, mock_rabbitmqctl):
mock_get_vhost_policy.return_value = None
rabbit_utils.clear_policy('nagios-rabbitmq-server-0', 'HA')
self.assertFalse(mock_rabbitmqctl.called)
mock_get_vhost_policy.return_value = TEST_HA_POLICY
rabbit_utils.clear_policy('nagios-rabbitmq-server-0', 'HA')
mock_rabbitmqctl.assert_called_once_with(
'clear_policy',
'-p',
'nagios-rabbitmq-server-0',
'HA')