From 96fa17720b7fb16eadd5531c88d20005a0425d31 Mon Sep 17 00:00:00 2001 From: Edward Hope-Morley Date: Tue, 30 May 2017 15:13:26 +0100 Subject: [PATCH] Expose performance tuning options via charm Adds connection-backlog and erl-vm-io-thread-multiplier config options which are required for tuning when deploying in environments with large numbers of clients and server hosts with high numbers of cores respectively. Also refactors rabbitmq-env.conf rendering code so that it is properly applied by the charm. Change-Id: I8596eb9a0419d4e64782bfaf9d8c0f67e5de96b1 Closes-Bug: 1693561 --- config.yaml | 20 ++++ hooks/rabbit_utils.py | 58 +----------- hooks/rabbitmq_context.py | 141 ++++++++++++++++++++++++++-- hooks/rabbitmq_server_relations.py | 12 +-- templates/rabbitmq-env.conf | 9 ++ templates/rabbitmq.config | 35 +++---- tests/basic_deployment.py | 3 +- unit_tests/test_rabbit_utils.py | 21 +++++ unit_tests/test_rabbitmq_context.py | 131 +++++++++++++++++++++++++- 9 files changed, 339 insertions(+), 91 deletions(-) create mode 100644 templates/rabbitmq-env.conf diff --git a/config.yaml b/config.yaml index ebc6d458..97190a47 100644 --- a/config.yaml +++ b/config.yaml @@ -89,6 +89,26 @@ options: - ['/', 'queue1', 10, 20] - ['/', 'queue2', 200, 300] Wildcards '*' are accepted to monitor all vhosts and/or queues + connection-backlog: + type: int + default: + description: | + Overrides the size of the connection backlog maintained by the server. + Environments with large numbers of clients will want to set this value + higher than the default (default value varies with rabbtimq version, see + https://www.rabbitmq.com/networking.html for more info). + erl-vm-io-thread-multiplier: + type: int + default: + description: | + Multiplier used to calculate the number of threads used in the erl vm + worker thread pool using the number of CPU cores extant in + the host system. The upstream docs recommend that this multiplier be + > 12 per core - we use 24 as default so that we end up with roughly the + same as current rabbitmq package defaults and that is what is used + internally to the charm if no value is set here. Also, if this value is + left unset and this application is running inside a container, the number + of threads will be capped based on a maximum of 2 cores. # SSL configuration ssl: type: string diff --git a/hooks/rabbit_utils.py b/hooks/rabbit_utils.py index a94985db..4de33014 100644 --- a/hooks/rabbit_utils.py +++ b/hooks/rabbit_utils.py @@ -26,6 +26,7 @@ from collections import OrderedDict from rabbitmq_context import ( RabbitMQSSLContext, RabbitMQClusterContext, + RabbitMQEnvContext, ) from charmhelpers.core.templating import render @@ -120,7 +121,9 @@ CONFIG_FILES = OrderedDict([ 'services': ['rabbitmq-server'] }), (ENV_CONF, { - 'hook_contexts': None, + 'hook_contexts': [ + RabbitMQEnvContext(), + ], 'services': ['rabbitmq-server'] }), (ENABLED_PLUGINS, { @@ -513,53 +516,6 @@ def leave_cluster(): raise -def update_rmq_env_conf(hostname=None, ipv6=False): - """Update or append environment config. - - rabbitmq.conf.d is not present on all releases, so use or create - rabbitmq-env.conf instead. - """ - - keyvals = {} - if ipv6: - keyvals['RABBITMQ_SERVER_START_ARGS'] = "'-proto_dist inet6_tcp'" - - if hostname: - keyvals['RABBITMQ_NODENAME'] = hostname - - out = [] - keys_found = [] - if os.path.exists(ENV_CONF): - for line in open(ENV_CONF).readlines(): - for key, val in keyvals.items(): - if line.strip().startswith(key): - keys_found.append(key) - line = '%s=%s' % (key, val) - - out.append(line) - - for key, val in keyvals.items(): - log('Updating %s, %s=%s' % (ENV_CONF, key, val)) - if key not in keys_found: - out.append('%s=%s' % (key, val)) - - with open(ENV_CONF, 'wb') as conf: - conf.write('\n'.join(out)) - # Ensure newline at EOF - conf.write('\n') - - -def get_node_name(): - if not os.path.exists(ENV_CONF): - return None - env_conf = open(ENV_CONF, 'r').readlines() - node_name = None - for l in env_conf: - if l.startswith('RABBITMQ_NODENAME'): - node_name = l.split('=')[1].strip() - return node_name - - def _manage_plugin(plugin, action): os.environ['HOME'] = '/root' _rabbitmq_plugins = \ @@ -674,12 +630,6 @@ def get_rabbit_password(username, password=None, local=False): return _password -def bind_ipv6_interface(): - out = "RABBITMQ_SERVER_START_ARGS='-proto_dist inet6_tcp'\n" - with open(ENV_CONF, 'wb') as conf: - conf.write(out) - - def update_hosts_file(map): """Rabbitmq does not currently like ipv6 addresses so we need to use dns names instead. In order to make them resolvable we ensure they are in diff --git a/hooks/rabbitmq_context.py b/hooks/rabbitmq_context.py index 9b8d1021..858e9c97 100644 --- a/hooks/rabbitmq_context.py +++ b/hooks/rabbitmq_context.py @@ -12,28 +12,56 @@ # See the License for the specific language governing permissions and # limitations under the License. -from charmhelpers.contrib.ssl.service import ServiceCA +import base64 +import grp +import os +import pwd +import re +import sys +import ssl_utils + +from charmhelpers.contrib.ssl.service import ServiceCA +from charmhelpers.core.host import is_container +from charmhelpers.fetch import apt_install from charmhelpers.core.hookenv import ( open_port, close_port, config, log, + service_name, + relation_ids, + DEBUG, + WARNING, ERROR, ) -import sys -import pwd -import grp -import os -import base64 +# python-six in ensured by charmhelpers import so we put this here. +import six +try: + import psutil +except ImportError: + if six.PY2: + apt_install('python-psutil', fatal=True) + else: + apt_install('python3-psutil', fatal=True) + import psutil -import ssl_utils ssl_key_file = "/etc/rabbitmq/rabbit-server-privkey.pem" ssl_cert_file = "/etc/rabbitmq/rabbit-server-cert.pem" ssl_ca_file = "/etc/rabbitmq/rabbit-server-ca.pem" RABBITMQ_CTL = '/usr/sbin/rabbitmqctl' +ENV_CONF = '/etc/rabbitmq/rabbitmq-env.conf' + +# Rabbimq docs recommend min. 12 threads per core (see LP: #1693561) +# NOTE(hopem): these defaults give us roughly the same as the default shipped +# with the version of rabbitmq in ubuntu xenial (3.5.7) - see +# https://tinyurl.com/rabbitmq-3-5-7 for exact value. Note that +# this default has increased with newer versions so we should +# track this and keep the charm up-to-date. +DEFAULT_MULTIPLIER = 24 +MAX_DEFAULT_THREADS = DEFAULT_MULTIPLIER * 2 def convert_from_base64(v): @@ -136,6 +164,99 @@ class RabbitMQSSLContext(object): class RabbitMQClusterContext(object): def __call__(self): - return { - 'cluster_partition_handling': config('cluster-partition-handling'), - } + ctxt = {'cluster_partition_handling': + config('cluster-partition-handling')} + + if config('connection-backlog'): + ctxt['connection_backlog'] = config('connection-backlog') + + return ctxt + + +class RabbitMQEnvContext(object): + + def calculate_threads(self): + """ + Determine the number of erl vm threads in pool based in cpu resources + available. + + Number of threads will be limited to MAX_DEFAULT_WORKERS in + container environments where no worker-multipler configuration + option been set. + + @returns int: number of io threads to allocate + """ + + try: + num_cpus = psutil.cpu_count() + except AttributeError: + num_cpus = psutil.NUM_CPUS + + multiplier = (config('erl-vm-io-thread-multiplier') or + DEFAULT_MULTIPLIER) + + log("Calculating erl vm io thread pool size based on num_cpus={} and " + "multiplier={}".format(num_cpus, multiplier), DEBUG) + + count = int(num_cpus * multiplier) + if multiplier > 0 and count == 0: + count = 1 + + if config('erl-vm-io-thread-multiplier') is None and is_container(): + # NOTE(hopem): Limit unconfigured erl-vm-io-thread-multiplier + # to MAX_DEFAULT_THREADS to avoid insane pool + # configuration in LXD containers on large servers. + count = min(count, MAX_DEFAULT_THREADS) + + log("erl vm io thread pool size = {} (capped={})" + .format(count, is_container()), DEBUG) + + return count + + def __call__(self): + """Write rabbitmq-env.conf according to charm config. + + We never overwrite RABBITMQ_NODENAME to ensure that we don't break + clustered rabbitmq. + """ + blacklist = ['RABBITMQ_NODENAME'] + + context = {'settings': {}} + key = 'RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS' + context['settings'][key] = "'+A {}'".format(self.calculate_threads()) + + if config('prefer-ipv6'): + key = 'RABBITMQ_SERVER_START_ARGS' + context['settings'][key] = "'-proto_dist inet6_tcp'" + + # TODO: this is legacy HA and should be removed since it is now + # deprecated. + if relation_ids('ha'): + if not config('ha-vip-only'): + # TODO: do we need to remove this setting if it already exists + # and the above is false? + context['settings']['RABBITMQ_NODENAME'] = \ + '{}@localhost'.format(service_name()) + + if os.path.exists(ENV_CONF): + for line in open(ENV_CONF).readlines(): + if re.search('^\s*#', line) or not line.strip('\n'): + # ignore commented or blank lines + continue + + _line = line.partition("=") + key = _line[0].strip() + val = _line[2].strip() + + if _line[1] != "=": + log("Unable to parse line '{}' from {}".format(line, + ENV_CONF), + WARNING) + continue + + if key in blacklist: + # Keep original + log("Leaving {} setting untouched".format(key), DEBUG) + context['settings'][key] = val + + return context diff --git a/hooks/rabbitmq_server_relations.py b/hooks/rabbitmq_server_relations.py index ce456c74..22e6a923 100755 --- a/hooks/rabbitmq_server_relations.py +++ b/hooks/rabbitmq_server_relations.py @@ -361,6 +361,8 @@ def update_cookie(leaders_cookie=None): @hooks.hook('ha-relation-joined') +@rabbit.restart_on_change({rabbit.ENV_CONF: + rabbit.restart_map()[rabbit.ENV_CONF]}) def ha_joined(): corosync_bindiface = config('ha-bindiface') corosync_mcastport = config('ha-mcastport') @@ -385,14 +387,8 @@ def ha_joined(): log('ha_joined: No ceph relation yet, deferring.') return - name = '%s@localhost' % SERVICE_NAME - if rabbit.get_node_name() != name and vip_only is False: - log('Stopping rabbitmq-server.') - service_stop('rabbitmq-server') - rabbit.update_rmq_env_conf(hostname='%s@localhost' % SERVICE_NAME, - ipv6=config('prefer-ipv6')) - else: - log('Node name already set to %s.' % name) + ctxt = {rabbit.ENV_CONF: rabbit.CONFIG_FILES[rabbit.ENV_CONF]} + rabbit.ConfigRenderer(ctxt).write(rabbit.ENV_CONF) relation_settings = {} relation_settings['corosync_bindiface'] = corosync_bindiface diff --git a/templates/rabbitmq-env.conf b/templates/rabbitmq-env.conf new file mode 100644 index 00000000..6595f7b9 --- /dev/null +++ b/templates/rabbitmq-env.conf @@ -0,0 +1,9 @@ +############################################################################### +# [ WARNING ] +# Configuration file maintained by Juju. Local changes may be overwritten. +############################################################################### +{% if settings -%} +{%- for key, value in settings.iteritems() %} +{{key}}={{value}} +{%- endfor %} +{% endif %} diff --git a/templates/rabbitmq.config b/templates/rabbitmq.config index 72d99914..0d747810 100644 --- a/templates/rabbitmq.config +++ b/templates/rabbitmq.config @@ -1,36 +1,39 @@ [ {rabbit, [ {collect_statistics_interval, 30000}, -{% if ssl_only %} +{%- if ssl_only %} {tcp_listeners, []}, {% else %} {tcp_listeners, [5672]}, -{% endif %} -{% if ssl_port %} +{%- endif %} +{%- if connection_backlog %} + {tcp_listen_options, [{backlog, {{connection_backlog}}}]}, +{%- endif %} +{%- if ssl_port %} {ssl_listeners, [{{ ssl_port }}]}, -{% endif %} -{% if ssl_mode == "on" or ssl_mode == "only" %} +{%- endif %} +{%- if ssl_mode == "on" or ssl_mode == "only" %} {ssl_options, [ {verify, verify_peer}, -{% if ssl_client %} +{%- if ssl_client %} {fail_if_no_peer_cert, true}, {% else %} {fail_if_no_peer_cert, false}, -{% endif %} - {% if ssl_ca_file %} +{%- endif %} +{%- if ssl_ca_file %} {cacertfile, "{{ ssl_ca_file }}"}, - {% endif %} - {% if ssl_cert_file %} +{%- endif %} +{%- if ssl_cert_file %} {certfile, "{{ ssl_cert_file }}"}, - {% endif %} - {% if ssl_key_file %} +{%- endif %} +{%- if ssl_key_file %} {keyfile, "{{ ssl_key_file }}"} - {% endif %} +{%- endif %} ]}, -{% endif %} +{%- endif %} - {% if cluster_partition_handling %} +{%- if cluster_partition_handling %} {cluster_partition_handling, {{ cluster_partition_handling }}} - {% endif %} +{%- endif %} ]} ]. diff --git a/tests/basic_deployment.py b/tests/basic_deployment.py index 57e95563..a94c9c60 100644 --- a/tests/basic_deployment.py +++ b/tests/basic_deployment.py @@ -86,7 +86,8 @@ class RmqBasicDeployment(OpenStackAmuletDeployment): """ this_service = { 'name': 'rabbitmq-server', - 'units': 3 + 'units': 3, + 'constraints': {'cpu-cores': 2}, } other_services = [ {'name': 'cinder'}, diff --git a/unit_tests/test_rabbit_utils.py b/unit_tests/test_rabbit_utils.py index db4f6b19..44d596c8 100644 --- a/unit_tests/test_rabbit_utils.py +++ b/unit_tests/test_rabbit_utils.py @@ -689,3 +689,24 @@ class UtilsTests(CharmTestCase): rabbit_utils.check_cluster_memberships() mock_forget_cluster_node.assert_called_with( 'rabbit@juju-devel3-machine-42') + + @mock.patch('rabbitmq_context.psutil.NUM_CPUS', 2) + @mock.patch('rabbitmq_context.relation_ids') + @mock.patch('rabbitmq_context.config') + def test_render_rabbitmq_env(self, mock_config, mock_relation_ids): + mock_relation_ids.return_value = [] + mock_config.return_value = 3 + with mock.patch('rabbit_utils.render') as mock_render: + ctxt = {rabbit_utils.ENV_CONF: + rabbit_utils.CONFIG_FILES[rabbit_utils.ENV_CONF]} + rabbit_utils.ConfigRenderer(ctxt).write( + rabbit_utils.ENV_CONF) + + ctxt = {'settings': {'RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS': + "'+A 6'", + 'RABBITMQ_SERVER_START_ARGS': + "'-proto_dist inet6_tcp'"}} + mock_render.assert_called_with('rabbitmq-env.conf', + '/etc/rabbitmq/rabbitmq-env.conf', + ctxt, + perms=420) diff --git a/unit_tests/test_rabbitmq_context.py b/unit_tests/test_rabbitmq_context.py index dbc18d9d..2aecf286 100644 --- a/unit_tests/test_rabbitmq_context.py +++ b/unit_tests/test_rabbitmq_context.py @@ -16,6 +16,7 @@ import rabbitmq_context import mock import unittest +import tempfile class TestRabbitMQSSLContext(unittest.TestCase): @@ -89,7 +90,133 @@ class TestRabbitMQClusterContext(unittest.TestCase): self.assertEqual( rabbitmq_context.RabbitMQClusterContext().__call__(), { - 'cluster_partition_handling': "ignore" + 'cluster_partition_handling': "ignore", + 'connection_backlog': "ignore" }) - config.assert_called_once_with("cluster-partition-handling") + config.assert_has_calls([mock.call("cluster-partition-handling"), + mock.call("connection-backlog")]) + + +class TestRabbitMQEnvContext(unittest.TestCase): + + @mock.patch.object(rabbitmq_context.psutil, 'NUM_CPUS', 2) + @mock.patch.object(rabbitmq_context, 'relation_ids', lambda *args: []) + @mock.patch.object(rabbitmq_context, 'service_name') + @mock.patch.object(rabbitmq_context, 'config') + def test_rabbitmqenv(self, mock_config, mock_service_name): + config = {} + + def fake_config(key): + return config.get(key) + + mock_service_name.return_value = 'svc_foo' + mock_config.side_effect = fake_config + + with tempfile.NamedTemporaryFile() as tmpfile: + with mock.patch('rabbitmq_context.ENV_CONF', tmpfile.name): + config['prefer-ipv6'] = True + config['erl-vm-io-thread-multiplier'] = 36 + ctxt = rabbitmq_context.RabbitMQEnvContext()() + self.assertEqual(ctxt['settings'], + {'RABBITMQ_SERVER_START_ARGS': + "'-proto_dist inet6_tcp'", + 'RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS': + "'+A 72'"}) + + @mock.patch.object(rabbitmq_context.psutil, 'NUM_CPUS', 2) + @mock.patch.object(rabbitmq_context, 'relation_ids') + @mock.patch.object(rabbitmq_context, 'service_name') + @mock.patch.object(rabbitmq_context, 'config') + def test_rabbitmqenv_legacy_ha_support(self, mock_config, + mock_service_name, + mock_relation_ids): + config = {} + + def fake_config(key): + return config.get(key) + + def fake_relation_ids(key): + if 'ha': + return ['ha:1'] + + mock_relation_ids.side_effect = fake_relation_ids + mock_service_name.return_value = 'svc_foo' + mock_config.side_effect = fake_config + + with tempfile.NamedTemporaryFile() as tmpfile: + with mock.patch('rabbitmq_context.ENV_CONF', tmpfile.name): + ctxt = rabbitmq_context.RabbitMQEnvContext()() + self.assertEqual(ctxt['settings'], + {'RABBITMQ_NODENAME': 'svc_foo@localhost', + 'RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS': + "'+A 48'"}) + + mock_relation_ids.side_effect = lambda key: [] + with tempfile.NamedTemporaryFile() as tmpfile: + with mock.patch('rabbitmq_context.ENV_CONF', tmpfile.name): + ctxt = rabbitmq_context.RabbitMQEnvContext()() + self.assertEqual(ctxt['settings'], + {'RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS': + "'+A 48'"}) + + @mock.patch.object(rabbitmq_context.psutil, 'NUM_CPUS', 2) + @mock.patch.object(rabbitmq_context, 'relation_ids') + @mock.patch.object(rabbitmq_context, 'service_name') + @mock.patch.object(rabbitmq_context, 'config') + def test_rabbitmqenv_existing_nodename(self, mock_config, + mock_service_name, + mock_relation_ids): + def fake_relation_ids(key): + if 'ha': + return ['ha:1'] + + mock_relation_ids.side_effect = fake_relation_ids + mock_service_name.return_value = 'svc_foo' + mock_config.return_value = None + + with tempfile.NamedTemporaryFile() as tmpfile: + with mock.patch('rabbitmq_context.ENV_CONF', tmpfile.name): + with open(tmpfile.name, 'w') as fd: + fd.write("RABBITMQ_NODENAME = blah@localhost") + + ctxt = rabbitmq_context.RabbitMQEnvContext()() + self.assertEqual(ctxt['settings'], + {'RABBITMQ_NODENAME': 'blah@localhost', + 'RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS': + "'+A 48'"}) + + @mock.patch.object(rabbitmq_context, 'relation_ids', lambda *args: []) + @mock.patch.object(rabbitmq_context.psutil, 'NUM_CPUS', 128) + @mock.patch.object(rabbitmq_context, 'service_name') + @mock.patch.object(rabbitmq_context, 'config') + def test_rabbitmqenv_in_container(self, mock_config, mock_service_name): + mock_service_name.return_value = 'svc_foo' + + config = {} + + def fake_config(key): + return config.get(key) + + mock_config.side_effect = fake_config + + with mock.patch.object(rabbitmq_context, 'is_container') as \ + mock_is_ctnr: + mock_is_ctnr.return_value = True + ctxt = rabbitmq_context.RabbitMQEnvContext()() + self.assertEqual(ctxt['settings'], + {'RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS': + "'+A 48'"}) + + config['erl-vm-io-thread-multiplier'] = 24 + ctxt = rabbitmq_context.RabbitMQEnvContext()() + self.assertEqual(ctxt['settings'], + {'RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS': + "'+A 3072'"}) + + del config['erl-vm-io-thread-multiplier'] + mock_is_ctnr.return_value = False + ctxt = rabbitmq_context.RabbitMQEnvContext()() + self.assertEqual(ctxt['settings'], + {'RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS': + "'+A 3072'"})