Fix messaging layer for newer oslo.messaging changes

Instead of spawning our own threads and relying on the oslo.messaging
executor blocking, lets just use oslo.messaging's internal threading
capabilities.  This converts rpc.Connection to rpc.MessagingService,
which is an oslo.service-based service and sets up messaging to managed
there instead.

Closes-bug: #1583330

Change-Id: I9f5a15f1c5dff7e90761887c519a15888096636b
This commit is contained in:
Adam Gandelman 2016-05-20 13:19:51 -07:00
parent 6fd701be79
commit 6a934060f0
4 changed files with 52 additions and 54 deletions

View File

@ -14,11 +14,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import threading
from six.moves.urllib import parse as urlparse
from oslo_log import log as logging
from oslo_config import cfg
from oslo_service import service
import oslo_messaging
from astara.common.i18n import _LW
@ -94,19 +94,19 @@ def get_rpc_notifier(topic='notifications'):
)
class Connection(object):
class MessagingService(service.Service):
"""Used to create objects that can manage multiple RPC connections"""
def __init__(self):
super(Connection, self).__init__()
self._server_threads = {}
super(MessagingService, self).__init__()
self._servers = set()
def _add_server_thread(self, server):
self._server_threads[server] = threading.Thread(target=server.start)
def _add_server(self, server):
self._servers.add(server)
def create_rpc_consumer(self, topic, endpoints):
"""Creates an RPC server for this host that will execute RPCs requested
by clients. Adds the resulting consumer to the pool of RPC server
threads.
by clients. Adds the resulting consumer to the pool of messaging
servers.
:param topic: Topic on which to listen for RPC requests
:param endpoints: List of endpoint objects that define methods that
@ -115,13 +115,13 @@ class Connection(object):
target = get_target(topic=topic, fanout=True, server=cfg.CONF.host)
server = get_server(target, endpoints)
LOG.debug('Created RPC server on topic %s', topic)
self._add_server_thread(server)
self._add_server(server)
def create_notification_listener(self, endpoints, exchange=None,
topic='notifications'):
"""Creates an oslo.messaging notification listener associated with
provided endpoints. Adds the resulting listener to the pool of RPC
server threads.
provided endpoints. Adds the resulting listener to the pool of
messaging servers.
:param endpoints: list of endpoint objects that define methods for
processing prioritized notifications
@ -134,19 +134,20 @@ class Connection(object):
exchange=exchange)
pool = 'astara.' + topic + '.' + cfg.CONF.host
server = oslo_messaging.get_notification_listener(
transport, [target], endpoints, pool=pool)
transport, [target], endpoints, pool=pool, executor='threading')
LOG.debug(
'Created RPC notification listener on topic:%s/exchange:%s.',
topic, exchange)
self._add_server_thread(server)
self._add_server(server)
def consume_in_threads(self):
"""Start all RPC consumers in threads"""
for server, thread in self._server_threads.items():
LOG.debug('Started RPC connection thread:%s/server:%s',
thread, server)
thread.start()
def start(self):
LOG.info('Astara notification listener service starting...')
super(MessagingService, self).start()
[s.start() for s in self._servers]
LOG.info('Astara notification listener service started.')
def close(self):
for server, thread in self._server_threads.items():
thread.join()
def stop(self):
LOG.info('Astara notification listener service stopping...')
super(MessagingService, self).stop()
[s.wait() for s in self._servers]
LOG.info('Astara notification listener service stopped.')

View File

@ -32,6 +32,7 @@ from oslo_log import log as logging
from astara.common.i18n import _LE
from oslo_service import service
NOTIFICATIONS_OPTS = [
cfg.StrOpt('amqp-url',
@ -157,8 +158,8 @@ class NotificationsEndpoint(object):
def listen(notification_queue):
connection = rpc.Connection()
# listen for neutron notifications
"""Create and launch the messaging service"""
connection = rpc.MessagingService()
connection.create_notification_listener(
endpoints=[NotificationsEndpoint(notification_queue)],
exchange=cfg.CONF.neutron_control_exchange,
@ -167,10 +168,9 @@ def listen(notification_queue):
topic=L3_AGENT_TOPIC,
endpoints=[L3RPCEndpoint(notification_queue)]
)
# NOTE(adam_g): We previously consumed dhcp_agent messages as well
# as agent messgaes with hostname appended, do we need them still?
connection.consume_in_threads()
connection.close()
launcher = service.ServiceLauncher(cfg.CONF)
launcher.launch_service(service=connection, workers=1)
launcher.wait()
class Sender(object):

View File

@ -112,28 +112,28 @@ class TestRPC(testtools.TestCase):
mock.MagicMock(return_value='fake_server'))
@mock.patch.object(rpc, 'get_target',
mock.MagicMock(return_value='fake_target'))
class TestConnection(testtools.TestCase):
class TestMessagingService(testtools.TestCase):
def setUp(self):
super(TestConnection, self).setUp()
self.connection = rpc.Connection()
super(TestMessagingService, self).setUp()
self.connection = rpc.MessagingService()
self.config = self.useFixture(config_fixture.Config(cfg.CONF)).config
self.config(host='test_host')
def test_create_rpc_consumer(self):
endpoints = []
self.connection._add_server_thread = mock.MagicMock()
self.connection._add_server = mock.MagicMock()
self.connection.create_rpc_consumer(
topic='foo_topic', endpoints=endpoints)
rpc.get_target.return_value = 'fake_target'
rpc.get_target.assert_called_with(
topic='foo_topic', fanout=True, server='test_host')
rpc.get_server.assert_called_with('fake_target', endpoints)
self.connection._add_server_thread.assert_called_with('fake_server')
self.connection._add_server.assert_called_with('fake_server')
@mock.patch.object(oslo_messaging, 'get_notification_listener')
def test_create_notification_listener(self, fake_get_listener):
endpoints = []
self.connection._add_server_thread = mock.MagicMock()
self.connection._add_server = mock.MagicMock()
fake_get_listener.return_value = 'fake_listener_server'
self.connection.create_notification_listener(
endpoints=[], exchange='foo_exchange', topic='foo_topic')
@ -142,34 +142,30 @@ class TestConnection(testtools.TestCase):
topic='foo_topic', fanout=False, exchange='foo_exchange')
fake_get_listener.assert_called_with(
'fake_transport', ['fake_target'], endpoints,
pool='astara.foo_topic.test_host')
self.connection._add_server_thread.assert_called_with(
pool='astara.foo_topic.test_host', executor='threading')
self.connection._add_server.assert_called_with(
'fake_listener_server')
@mock.patch('threading.Thread')
def test__add_server_thread(self, fake_thread):
fake_thread.return_value = 'fake_server_thread'
def test__add_server(self):
fake_server = mock.MagicMock(
start=mock.MagicMock()
)
self.connection._add_server_thread(fake_server)
self.assertEqual(
self.connection._server_threads[fake_server],
'fake_server_thread')
fake_thread.assert_called_with(target=fake_server.start)
start=mock.MagicMock())
self.connection._add_server(fake_server)
self.assertIn(
fake_server,
self.connection._servers)
def test_consume_in_threads(self):
def test_start(self):
fake_server = mock.MagicMock(
start=mock.MagicMock()
)
self.connection._server_threads['foo'] = fake_server
self.connection.consume_in_threads()
self.connection._add_server(fake_server)
self.connection.start()
self.assertTrue(fake_server.start.called)
def test_close(self):
def test_stop(self):
fake_server = mock.MagicMock(
join=mock.MagicMock()
stop=mock.MagicMock()
)
self.connection._server_threads['foo'] = fake_server
self.connection.close()
self.assertTrue(fake_server.join.called)
self.connection._add_server(fake_server)
self.connection.stop()
self.assertTrue(fake_server.wait.called)

View File

@ -16,6 +16,7 @@ oslo.messaging>=4.5.0 # Apache-2.0
oslo.serialization>=1.10.0 # Apache-2.0
oslo.utils>=3.5.0 # Apache-2.0
oslo.rootwrap>=2.0.0 # Apache-2.0
oslo.service>=1.10.0 # Apache-2.0
WebOb>=1.2.3 # MIT
python-novaclient!=2.33.0,>=2.29.0 # Apache-2.0
cliff!=1.16.0,!=1.17.0,>=1.15.0 # Apache-2.0