Expose performance tuning options via charm

Adds connection-backlog and erl-vm-io-thread-multiplier config
options which are required for tuning when deploying in
environments with large numbers of clients and server hosts
with high numbers of cores respectively. Also refactors
rabbitmq-env.conf rendering code so that it is properly
applied by the charm.

Change-Id: I8596eb9a0419d4e64782bfaf9d8c0f67e5de96b1
Closes-Bug: 1693561
This commit is contained in:
Edward Hope-Morley 2017-05-30 15:13:26 +01:00
parent 75a5d0f2f1
commit 96fa17720b
9 changed files with 339 additions and 91 deletions

View File

@ -89,6 +89,26 @@ options:
- ['/', 'queue1', 10, 20]
- ['/', 'queue2', 200, 300]
Wildcards '*' are accepted to monitor all vhosts and/or queues
connection-backlog:
type: int
default:
description: |
Overrides the size of the connection backlog maintained by the server.
Environments with large numbers of clients will want to set this value
higher than the default (default value varies with rabbtimq version, see
https://www.rabbitmq.com/networking.html for more info).
erl-vm-io-thread-multiplier:
type: int
default:
description: |
Multiplier used to calculate the number of threads used in the erl vm
worker thread pool using the number of CPU cores extant in
the host system. The upstream docs recommend that this multiplier be
> 12 per core - we use 24 as default so that we end up with roughly the
same as current rabbitmq package defaults and that is what is used
internally to the charm if no value is set here. Also, if this value is
left unset and this application is running inside a container, the number
of threads will be capped based on a maximum of 2 cores.
# SSL configuration
ssl:
type: string

View File

@ -26,6 +26,7 @@ from collections import OrderedDict
from rabbitmq_context import (
RabbitMQSSLContext,
RabbitMQClusterContext,
RabbitMQEnvContext,
)
from charmhelpers.core.templating import render
@ -120,7 +121,9 @@ CONFIG_FILES = OrderedDict([
'services': ['rabbitmq-server']
}),
(ENV_CONF, {
'hook_contexts': None,
'hook_contexts': [
RabbitMQEnvContext(),
],
'services': ['rabbitmq-server']
}),
(ENABLED_PLUGINS, {
@ -513,53 +516,6 @@ def leave_cluster():
raise
def update_rmq_env_conf(hostname=None, ipv6=False):
"""Update or append environment config.
rabbitmq.conf.d is not present on all releases, so use or create
rabbitmq-env.conf instead.
"""
keyvals = {}
if ipv6:
keyvals['RABBITMQ_SERVER_START_ARGS'] = "'-proto_dist inet6_tcp'"
if hostname:
keyvals['RABBITMQ_NODENAME'] = hostname
out = []
keys_found = []
if os.path.exists(ENV_CONF):
for line in open(ENV_CONF).readlines():
for key, val in keyvals.items():
if line.strip().startswith(key):
keys_found.append(key)
line = '%s=%s' % (key, val)
out.append(line)
for key, val in keyvals.items():
log('Updating %s, %s=%s' % (ENV_CONF, key, val))
if key not in keys_found:
out.append('%s=%s' % (key, val))
with open(ENV_CONF, 'wb') as conf:
conf.write('\n'.join(out))
# Ensure newline at EOF
conf.write('\n')
def get_node_name():
if not os.path.exists(ENV_CONF):
return None
env_conf = open(ENV_CONF, 'r').readlines()
node_name = None
for l in env_conf:
if l.startswith('RABBITMQ_NODENAME'):
node_name = l.split('=')[1].strip()
return node_name
def _manage_plugin(plugin, action):
os.environ['HOME'] = '/root'
_rabbitmq_plugins = \
@ -674,12 +630,6 @@ def get_rabbit_password(username, password=None, local=False):
return _password
def bind_ipv6_interface():
out = "RABBITMQ_SERVER_START_ARGS='-proto_dist inet6_tcp'\n"
with open(ENV_CONF, 'wb') as conf:
conf.write(out)
def update_hosts_file(map):
"""Rabbitmq does not currently like ipv6 addresses so we need to use dns
names instead. In order to make them resolvable we ensure they are in

View File

@ -12,28 +12,56 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from charmhelpers.contrib.ssl.service import ServiceCA
import base64
import grp
import os
import pwd
import re
import sys
import ssl_utils
from charmhelpers.contrib.ssl.service import ServiceCA
from charmhelpers.core.host import is_container
from charmhelpers.fetch import apt_install
from charmhelpers.core.hookenv import (
open_port,
close_port,
config,
log,
service_name,
relation_ids,
DEBUG,
WARNING,
ERROR,
)
import sys
import pwd
import grp
import os
import base64
# python-six in ensured by charmhelpers import so we put this here.
import six
try:
import psutil
except ImportError:
if six.PY2:
apt_install('python-psutil', fatal=True)
else:
apt_install('python3-psutil', fatal=True)
import psutil
import ssl_utils
ssl_key_file = "/etc/rabbitmq/rabbit-server-privkey.pem"
ssl_cert_file = "/etc/rabbitmq/rabbit-server-cert.pem"
ssl_ca_file = "/etc/rabbitmq/rabbit-server-ca.pem"
RABBITMQ_CTL = '/usr/sbin/rabbitmqctl'
ENV_CONF = '/etc/rabbitmq/rabbitmq-env.conf'
# Rabbimq docs recommend min. 12 threads per core (see LP: #1693561)
# NOTE(hopem): these defaults give us roughly the same as the default shipped
# with the version of rabbitmq in ubuntu xenial (3.5.7) - see
# https://tinyurl.com/rabbitmq-3-5-7 for exact value. Note that
# this default has increased with newer versions so we should
# track this and keep the charm up-to-date.
DEFAULT_MULTIPLIER = 24
MAX_DEFAULT_THREADS = DEFAULT_MULTIPLIER * 2
def convert_from_base64(v):
@ -136,6 +164,99 @@ class RabbitMQSSLContext(object):
class RabbitMQClusterContext(object):
def __call__(self):
return {
'cluster_partition_handling': config('cluster-partition-handling'),
}
ctxt = {'cluster_partition_handling':
config('cluster-partition-handling')}
if config('connection-backlog'):
ctxt['connection_backlog'] = config('connection-backlog')
return ctxt
class RabbitMQEnvContext(object):
def calculate_threads(self):
"""
Determine the number of erl vm threads in pool based in cpu resources
available.
Number of threads will be limited to MAX_DEFAULT_WORKERS in
container environments where no worker-multipler configuration
option been set.
@returns int: number of io threads to allocate
"""
try:
num_cpus = psutil.cpu_count()
except AttributeError:
num_cpus = psutil.NUM_CPUS
multiplier = (config('erl-vm-io-thread-multiplier') or
DEFAULT_MULTIPLIER)
log("Calculating erl vm io thread pool size based on num_cpus={} and "
"multiplier={}".format(num_cpus, multiplier), DEBUG)
count = int(num_cpus * multiplier)
if multiplier > 0 and count == 0:
count = 1
if config('erl-vm-io-thread-multiplier') is None and is_container():
# NOTE(hopem): Limit unconfigured erl-vm-io-thread-multiplier
# to MAX_DEFAULT_THREADS to avoid insane pool
# configuration in LXD containers on large servers.
count = min(count, MAX_DEFAULT_THREADS)
log("erl vm io thread pool size = {} (capped={})"
.format(count, is_container()), DEBUG)
return count
def __call__(self):
"""Write rabbitmq-env.conf according to charm config.
We never overwrite RABBITMQ_NODENAME to ensure that we don't break
clustered rabbitmq.
"""
blacklist = ['RABBITMQ_NODENAME']
context = {'settings': {}}
key = 'RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS'
context['settings'][key] = "'+A {}'".format(self.calculate_threads())
if config('prefer-ipv6'):
key = 'RABBITMQ_SERVER_START_ARGS'
context['settings'][key] = "'-proto_dist inet6_tcp'"
# TODO: this is legacy HA and should be removed since it is now
# deprecated.
if relation_ids('ha'):
if not config('ha-vip-only'):
# TODO: do we need to remove this setting if it already exists
# and the above is false?
context['settings']['RABBITMQ_NODENAME'] = \
'{}@localhost'.format(service_name())
if os.path.exists(ENV_CONF):
for line in open(ENV_CONF).readlines():
if re.search('^\s*#', line) or not line.strip('\n'):
# ignore commented or blank lines
continue
_line = line.partition("=")
key = _line[0].strip()
val = _line[2].strip()
if _line[1] != "=":
log("Unable to parse line '{}' from {}".format(line,
ENV_CONF),
WARNING)
continue
if key in blacklist:
# Keep original
log("Leaving {} setting untouched".format(key), DEBUG)
context['settings'][key] = val
return context

View File

@ -361,6 +361,8 @@ def update_cookie(leaders_cookie=None):
@hooks.hook('ha-relation-joined')
@rabbit.restart_on_change({rabbit.ENV_CONF:
rabbit.restart_map()[rabbit.ENV_CONF]})
def ha_joined():
corosync_bindiface = config('ha-bindiface')
corosync_mcastport = config('ha-mcastport')
@ -385,14 +387,8 @@ def ha_joined():
log('ha_joined: No ceph relation yet, deferring.')
return
name = '%s@localhost' % SERVICE_NAME
if rabbit.get_node_name() != name and vip_only is False:
log('Stopping rabbitmq-server.')
service_stop('rabbitmq-server')
rabbit.update_rmq_env_conf(hostname='%s@localhost' % SERVICE_NAME,
ipv6=config('prefer-ipv6'))
else:
log('Node name already set to %s.' % name)
ctxt = {rabbit.ENV_CONF: rabbit.CONFIG_FILES[rabbit.ENV_CONF]}
rabbit.ConfigRenderer(ctxt).write(rabbit.ENV_CONF)
relation_settings = {}
relation_settings['corosync_bindiface'] = corosync_bindiface

View File

@ -0,0 +1,9 @@
###############################################################################
# [ WARNING ]
# Configuration file maintained by Juju. Local changes may be overwritten.
###############################################################################
{% if settings -%}
{%- for key, value in settings.iteritems() %}
{{key}}={{value}}
{%- endfor %}
{% endif %}

View File

@ -1,36 +1,39 @@
[
{rabbit, [
{collect_statistics_interval, 30000},
{% if ssl_only %}
{%- if ssl_only %}
{tcp_listeners, []},
{% else %}
{tcp_listeners, [5672]},
{% endif %}
{% if ssl_port %}
{%- endif %}
{%- if connection_backlog %}
{tcp_listen_options, [{backlog, {{connection_backlog}}}]},
{%- endif %}
{%- if ssl_port %}
{ssl_listeners, [{{ ssl_port }}]},
{% endif %}
{% if ssl_mode == "on" or ssl_mode == "only" %}
{%- endif %}
{%- if ssl_mode == "on" or ssl_mode == "only" %}
{ssl_options, [
{verify, verify_peer},
{% if ssl_client %}
{%- if ssl_client %}
{fail_if_no_peer_cert, true},
{% else %}
{fail_if_no_peer_cert, false},
{% endif %}
{% if ssl_ca_file %}
{%- endif %}
{%- if ssl_ca_file %}
{cacertfile, "{{ ssl_ca_file }}"},
{% endif %}
{% if ssl_cert_file %}
{%- endif %}
{%- if ssl_cert_file %}
{certfile, "{{ ssl_cert_file }}"},
{% endif %}
{% if ssl_key_file %}
{%- endif %}
{%- if ssl_key_file %}
{keyfile, "{{ ssl_key_file }}"}
{% endif %}
{%- endif %}
]},
{% endif %}
{%- endif %}
{% if cluster_partition_handling %}
{%- if cluster_partition_handling %}
{cluster_partition_handling, {{ cluster_partition_handling }}}
{% endif %}
{%- endif %}
]}
].

View File

@ -86,7 +86,8 @@ class RmqBasicDeployment(OpenStackAmuletDeployment):
"""
this_service = {
'name': 'rabbitmq-server',
'units': 3
'units': 3,
'constraints': {'cpu-cores': 2},
}
other_services = [
{'name': 'cinder'},

View File

@ -689,3 +689,24 @@ class UtilsTests(CharmTestCase):
rabbit_utils.check_cluster_memberships()
mock_forget_cluster_node.assert_called_with(
'rabbit@juju-devel3-machine-42')
@mock.patch('rabbitmq_context.psutil.NUM_CPUS', 2)
@mock.patch('rabbitmq_context.relation_ids')
@mock.patch('rabbitmq_context.config')
def test_render_rabbitmq_env(self, mock_config, mock_relation_ids):
mock_relation_ids.return_value = []
mock_config.return_value = 3
with mock.patch('rabbit_utils.render') as mock_render:
ctxt = {rabbit_utils.ENV_CONF:
rabbit_utils.CONFIG_FILES[rabbit_utils.ENV_CONF]}
rabbit_utils.ConfigRenderer(ctxt).write(
rabbit_utils.ENV_CONF)
ctxt = {'settings': {'RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS':
"'+A 6'",
'RABBITMQ_SERVER_START_ARGS':
"'-proto_dist inet6_tcp'"}}
mock_render.assert_called_with('rabbitmq-env.conf',
'/etc/rabbitmq/rabbitmq-env.conf',
ctxt,
perms=420)

View File

@ -16,6 +16,7 @@ import rabbitmq_context
import mock
import unittest
import tempfile
class TestRabbitMQSSLContext(unittest.TestCase):
@ -89,7 +90,133 @@ class TestRabbitMQClusterContext(unittest.TestCase):
self.assertEqual(
rabbitmq_context.RabbitMQClusterContext().__call__(), {
'cluster_partition_handling': "ignore"
'cluster_partition_handling': "ignore",
'connection_backlog': "ignore"
})
config.assert_called_once_with("cluster-partition-handling")
config.assert_has_calls([mock.call("cluster-partition-handling"),
mock.call("connection-backlog")])
class TestRabbitMQEnvContext(unittest.TestCase):
@mock.patch.object(rabbitmq_context.psutil, 'NUM_CPUS', 2)
@mock.patch.object(rabbitmq_context, 'relation_ids', lambda *args: [])
@mock.patch.object(rabbitmq_context, 'service_name')
@mock.patch.object(rabbitmq_context, 'config')
def test_rabbitmqenv(self, mock_config, mock_service_name):
config = {}
def fake_config(key):
return config.get(key)
mock_service_name.return_value = 'svc_foo'
mock_config.side_effect = fake_config
with tempfile.NamedTemporaryFile() as tmpfile:
with mock.patch('rabbitmq_context.ENV_CONF', tmpfile.name):
config['prefer-ipv6'] = True
config['erl-vm-io-thread-multiplier'] = 36
ctxt = rabbitmq_context.RabbitMQEnvContext()()
self.assertEqual(ctxt['settings'],
{'RABBITMQ_SERVER_START_ARGS':
"'-proto_dist inet6_tcp'",
'RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS':
"'+A 72'"})
@mock.patch.object(rabbitmq_context.psutil, 'NUM_CPUS', 2)
@mock.patch.object(rabbitmq_context, 'relation_ids')
@mock.patch.object(rabbitmq_context, 'service_name')
@mock.patch.object(rabbitmq_context, 'config')
def test_rabbitmqenv_legacy_ha_support(self, mock_config,
mock_service_name,
mock_relation_ids):
config = {}
def fake_config(key):
return config.get(key)
def fake_relation_ids(key):
if 'ha':
return ['ha:1']
mock_relation_ids.side_effect = fake_relation_ids
mock_service_name.return_value = 'svc_foo'
mock_config.side_effect = fake_config
with tempfile.NamedTemporaryFile() as tmpfile:
with mock.patch('rabbitmq_context.ENV_CONF', tmpfile.name):
ctxt = rabbitmq_context.RabbitMQEnvContext()()
self.assertEqual(ctxt['settings'],
{'RABBITMQ_NODENAME': 'svc_foo@localhost',
'RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS':
"'+A 48'"})
mock_relation_ids.side_effect = lambda key: []
with tempfile.NamedTemporaryFile() as tmpfile:
with mock.patch('rabbitmq_context.ENV_CONF', tmpfile.name):
ctxt = rabbitmq_context.RabbitMQEnvContext()()
self.assertEqual(ctxt['settings'],
{'RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS':
"'+A 48'"})
@mock.patch.object(rabbitmq_context.psutil, 'NUM_CPUS', 2)
@mock.patch.object(rabbitmq_context, 'relation_ids')
@mock.patch.object(rabbitmq_context, 'service_name')
@mock.patch.object(rabbitmq_context, 'config')
def test_rabbitmqenv_existing_nodename(self, mock_config,
mock_service_name,
mock_relation_ids):
def fake_relation_ids(key):
if 'ha':
return ['ha:1']
mock_relation_ids.side_effect = fake_relation_ids
mock_service_name.return_value = 'svc_foo'
mock_config.return_value = None
with tempfile.NamedTemporaryFile() as tmpfile:
with mock.patch('rabbitmq_context.ENV_CONF', tmpfile.name):
with open(tmpfile.name, 'w') as fd:
fd.write("RABBITMQ_NODENAME = blah@localhost")
ctxt = rabbitmq_context.RabbitMQEnvContext()()
self.assertEqual(ctxt['settings'],
{'RABBITMQ_NODENAME': 'blah@localhost',
'RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS':
"'+A 48'"})
@mock.patch.object(rabbitmq_context, 'relation_ids', lambda *args: [])
@mock.patch.object(rabbitmq_context.psutil, 'NUM_CPUS', 128)
@mock.patch.object(rabbitmq_context, 'service_name')
@mock.patch.object(rabbitmq_context, 'config')
def test_rabbitmqenv_in_container(self, mock_config, mock_service_name):
mock_service_name.return_value = 'svc_foo'
config = {}
def fake_config(key):
return config.get(key)
mock_config.side_effect = fake_config
with mock.patch.object(rabbitmq_context, 'is_container') as \
mock_is_ctnr:
mock_is_ctnr.return_value = True
ctxt = rabbitmq_context.RabbitMQEnvContext()()
self.assertEqual(ctxt['settings'],
{'RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS':
"'+A 48'"})
config['erl-vm-io-thread-multiplier'] = 24
ctxt = rabbitmq_context.RabbitMQEnvContext()()
self.assertEqual(ctxt['settings'],
{'RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS':
"'+A 3072'"})
del config['erl-vm-io-thread-multiplier']
mock_is_ctnr.return_value = False
ctxt = rabbitmq_context.RabbitMQEnvContext()()
self.assertEqual(ctxt['settings'],
{'RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS':
"'+A 3072'"})