Merge "Fix AttributeError during startup of ovs agent in DVR mode"

This commit is contained in:
Jenkins 2014-12-19 02:49:58 +00:00 committed by Gerrit Code Review
commit 5375ebaf0f
3 changed files with 34 additions and 8 deletions

View File

@ -26,7 +26,7 @@ from neutron.openstack.common import log as logging
LOG = logging.getLogger(__name__)
def create_consumers(endpoints, prefix, topic_details):
def create_consumers(endpoints, prefix, topic_details, start_listening=True):
"""Create agent RPC consumers.
:param endpoints: The list of endpoints to process the incoming messages.
@ -34,6 +34,7 @@ def create_consumers(endpoints, prefix, topic_details):
:param topic_details: A list of topics. Each topic has a name, an
operation, and an optional host param keying the
subscription to topic.host for plugin calls.
:param start_listening: if True, it starts the processing loop
:returns: A common Connection.
"""
@ -50,7 +51,8 @@ def create_consumers(endpoints, prefix, topic_details):
connection.create_consumer(node_topic_name,
endpoints,
fanout=False)
connection.consume_in_threads()
if start_listening:
connection.consume_in_threads()
return connection

View File

@ -259,6 +259,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
self.iter_num = 0
self.run_daemon_loop = True
# The initialization is complete; we can start receiving messages
self.connection.consume_in_threads()
def _report_state(self):
# How many devices are likely used by a VM
self.agent_state.get('configurations')['devices'] = (
@ -296,7 +299,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
topics.UPDATE, cfg.CONF.host])
self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
consumers)
consumers,
start_listening=False)
def get_net_uuid(self, vif_id):
for network_id, vlan_mapping in self.local_vlan_map.iteritems():

View File

@ -118,7 +118,16 @@ class AgentPluginReportState(base.BaseTestCase):
class AgentRPCMethods(base.BaseTestCase):
def test_create_consumers(self):
def _test_create_consumers(
self, endpoints, method, expected, topics, listen):
call_to_patch = 'neutron.common.rpc.create_connection'
with mock.patch(call_to_patch) as create_connection:
rpc.create_consumers(
endpoints, method, topics, start_listening=listen)
create_connection.assert_has_calls(expected)
def test_create_consumers_start_listening(self):
endpoints = [mock.Mock()]
expected = [
mock.call(new=True),
@ -126,11 +135,22 @@ class AgentRPCMethods(base.BaseTestCase):
fanout=True),
mock.call().consume_in_threads()
]
method = 'foo'
topics = [('topic', 'op')]
self._test_create_consumers(
endpoints, method, expected, topics, True)
call_to_patch = 'neutron.common.rpc.create_connection'
with mock.patch(call_to_patch) as create_connection:
rpc.create_consumers(endpoints, 'foo', [('topic', 'op')])
create_connection.assert_has_calls(expected)
def test_create_consumers_do_not_listen(self):
endpoints = [mock.Mock()]
expected = [
mock.call(new=True),
mock.call().create_consumer('foo-topic-op', endpoints,
fanout=True),
]
method = 'foo'
topics = [('topic', 'op')]
self._test_create_consumers(
endpoints, method, expected, topics, False)
def test_create_consumers_with_node_name(self):
endpoints = [mock.Mock()]