Support message queue clusters in inter-cell communication

Since cells use oslo.messaging to specify and store the message queue URL,
multiple hosts can be specified by manually modifying that URL in the database.
However, there is no way to specify multiple hosts during cell creation phase.
This patch adds a --broker_hosts option to `nova-manage cell create` command,
which is analogous to the rabbit_hosts option in nova.conf and can be used to
specify multiple message queue servers as a comma separated list. Each server
is specified using hostname:port with both being mandatory. The existing
--hostname and --port options continue to remain but are only considered if no
--broker_hosts is specified.
Internally, each host is converted to a oslo.messaging.TransportHost
and added to the generated TransportURL.
This patch also adds unit tests for creation of the TransportHosts from
user given input.

Change-Id: I14de860b1d12f3e2c0169b58651d580792d6ce0e
Closes-Bug: 1178541
This commit is contained in:
Dheeraj Gupta 2014-08-12 15:12:52 +00:00
parent 073c3c8635
commit cc17991d1a
2 changed files with 171 additions and 8 deletions

View File

@ -1059,6 +1059,46 @@ class GetLogCommands(object):
class CellCommands(object):
"""Commands for managing cells."""
def _create_transport_hosts(self, username, password,
broker_hosts=None, hostname=None, port=None):
"""Returns a list of oslo.messaging.TransportHost objects."""
transport_hosts = []
# Either broker-hosts or hostname should be set
if broker_hosts:
hosts = broker_hosts.split(',')
for host in hosts:
host = host.strip()
broker_hostname, broker_port = utils.parse_server_string(host)
if not broker_port:
msg = _('Invalid broker_hosts value: %s. It should be'
' in hostname:port format') % host
raise ValueError(msg)
try:
broker_port = int(broker_port)
except ValueError:
msg = _('Invalid port value: %s. It should be '
'an integer') % broker_port
raise ValueError(msg)
transport_hosts.append(
messaging.TransportHost(
hostname=broker_hostname,
port=broker_port,
username=username,
password=password))
else:
try:
port = int(port)
except ValueError:
msg = _("Invalid port value: %s. Should be an integer") % port
raise ValueError(msg)
transport_hosts.append(
messaging.TransportHost(
hostname=hostname,
port=port,
username=username,
password=password))
return transport_hosts
@args('--name', metavar='<name>', help='Name for the new cell')
@args('--cell_type', metavar='<parent|child>',
help='Whether the cell is a parent or child')
@ -1066,6 +1106,11 @@ class CellCommands(object):
help='Username for the message broker in this cell')
@args('--password', metavar='<password>',
help='Password for the message broker in this cell')
@args('--broker_hosts', metavar='<broker_hosts>',
help='Comma separated list of message brokers in this cell. '
'Each Broker is specified as hostname:port with both '
'mandatory. This option overrides the --hostname '
'and --port options (if provided). ')
@args('--hostname', metavar='<hostname>',
help='Address of the message broker in this cell')
@args('--port', metavar='<number>',
@ -1074,8 +1119,8 @@ class CellCommands(object):
help='The virtual host of the message broker in this cell')
@args('--woffset', metavar='<float>')
@args('--wscale', metavar='<float>')
def create(self, name, cell_type='child', username=None, password=None,
hostname=None, port=None, virtual_host=None,
def create(self, name, cell_type='child', username=None, broker_hosts=None,
password=None, hostname=None, port=None, virtual_host=None,
woffset=None, wscale=None):
if cell_type not in ['parent', 'child']:
@ -1083,13 +1128,12 @@ class CellCommands(object):
return(2)
# Set up the transport URL
transport_host = messaging.TransportHost(hostname=hostname,
port=int(port),
username=username,
password=password)
transport_hosts = self._create_transport_hosts(
username, password,
broker_hosts, hostname,
port)
transport_url = rpc.get_transport_url()
transport_url.hosts.append(transport_host)
transport_url.hosts.extend(transport_hosts)
transport_url.virtual_host = virtual_host
is_parent = cell_type == 'parent'

View File

@ -17,6 +17,7 @@ import StringIO
import sys
import fixtures
import mock
from nova.cmd import manage
from nova import context
@ -346,3 +347,121 @@ class ServiceCommandsTestCase(test.TestCase):
def test_service_disable_invalid_params(self):
self.assertEqual(2, self.commands.disable('nohost', 'noservice'))
class CellCommandsTestCase(test.TestCase):
def setUp(self):
super(CellCommandsTestCase, self).setUp()
self.commands = manage.CellCommands()
def test_create_transport_hosts_multiple(self):
"""Test the _create_transport_hosts method
when broker_hosts is set.
"""
brokers = "127.0.0.1:5672,127.0.0.2:5671"
thosts = self.commands._create_transport_hosts(
'guest', 'devstack',
broker_hosts=brokers)
self.assertEqual(2, len(thosts))
self.assertEqual('127.0.0.1', thosts[0].hostname)
self.assertEqual(5672, thosts[0].port)
self.assertEqual('127.0.0.2', thosts[1].hostname)
self.assertEqual(5671, thosts[1].port)
def test_create_transport_hosts_single(self):
"""Test the _create_transport_hosts method when hostname is passed."""
thosts = self.commands._create_transport_hosts('guest', 'devstack',
hostname='127.0.0.1',
port=80)
self.assertEqual(1, len(thosts))
self.assertEqual('127.0.0.1', thosts[0].hostname)
self.assertEqual(80, thosts[0].port)
def test_create_transport_hosts_single_broker(self):
"""Test the _create_transport_hosts method for single broker_hosts."""
thosts = self.commands._create_transport_hosts(
'guest', 'devstack',
broker_hosts='127.0.0.1:5672')
self.assertEqual(1, len(thosts))
self.assertEqual('127.0.0.1', thosts[0].hostname)
self.assertEqual(5672, thosts[0].port)
def test_create_transport_hosts_both(self):
"""Test the _create_transport_hosts method when both broker_hosts
and hostname/port are passed.
"""
thosts = self.commands._create_transport_hosts(
'guest', 'devstack',
broker_hosts='127.0.0.1:5672',
hostname='127.0.0.2', port=80)
self.assertEqual(1, len(thosts))
self.assertEqual('127.0.0.1', thosts[0].hostname)
self.assertEqual(5672, thosts[0].port)
def test_create_transport_hosts_wrong_val(self):
"""Test the _create_transport_hosts method when broker_hosts
is wrongly sepcified
"""
self.assertRaises(ValueError,
self.commands._create_transport_hosts,
'guest', 'devstack',
broker_hosts='127.0.0.1:5672,127.0.0.1')
def test_create_transport_hosts_wrong_port_val(self):
"""Test the _create_transport_hosts method when port in
broker_hosts is wrongly sepcified
"""
self.assertRaises(ValueError,
self.commands._create_transport_hosts,
'guest', 'devstack',
broker_hosts='127.0.0.1:')
def test_create_transport_hosts_wrong_port_arg(self):
"""Test the _create_transport_hosts method when port
argument is wrongly sepcified
"""
self.assertRaises(ValueError,
self.commands._create_transport_hosts,
'guest', 'devstack',
hostname='127.0.0.1', port='ab')
@mock.patch.object(context, 'get_admin_context')
@mock.patch.object(db, 'cell_create')
def test_create_broker_hosts(self, mock_db_cell_create, mock_ctxt):
"""Test the create function when broker_hosts is
passed
"""
cell_tp_url = "fake://guest:devstack@127.0.0.1:5432"
cell_tp_url += ",guest:devstack@127.0.0.2:9999/"
ctxt = mock.sentinel
mock_ctxt.return_value = mock.sentinel
self.commands.create("test",
broker_hosts='127.0.0.1:5432,127.0.0.2:9999',
woffset=0, wscale=0,
username="guest", password="devstack")
exp_values = {'name': "test",
'is_parent': False,
'transport_url': cell_tp_url,
'weight_offset': 0.0,
'weight_scale': 0.0}
mock_db_cell_create.assert_called_once_with(ctxt, exp_values)
@mock.patch.object(context, 'get_admin_context')
@mock.patch.object(db, 'cell_create')
def test_create_hostname(self, mock_db_cell_create, mock_ctxt):
"""Test the create function when hostname and port is
passed
"""
cell_tp_url = "fake://guest:devstack@127.0.0.1:9999/"
ctxt = mock.sentinel
mock_ctxt.return_value = mock.sentinel
self.commands.create("test",
hostname='127.0.0.1', port="9999",
woffset=0, wscale=0,
username="guest", password="devstack")
exp_values = {'name': "test",
'is_parent': False,
'transport_url': cell_tp_url,
'weight_offset': 0.0,
'weight_scale': 0.0}
mock_db_cell_create.assert_called_once_with(ctxt, exp_values)