[AIM] Add RPC endpoint unit tests

The code between oslo_messaging and the apic_aim mechanism driver's
RPC handling methods was not previously covered by unit tests, so they
are added.

Change-Id: Ib8a4d9dfa3674744f365ceb9a6f14b0ddf70d7ef
This commit is contained in:
Robert Kukura 2019-03-13 19:55:16 -04:00
parent 19a3d0af56
commit 22711dc069
2 changed files with 148 additions and 4 deletions

View File

@ -180,15 +180,18 @@ class ApicRpcHandlerMixin(object):
# Opflex RPC handler.
if self.enable_new_rpc:
self._opflex_endpoint = o_rpc.GBPServerRpcCallback(
self, self.notifier)
conn.create_consumer(
o_rpc.TOPIC_OPFLEX,
[o_rpc.GBPServerRpcCallback(self, self.notifier)],
[self._opflex_endpoint],
fanout=False)
# Topology RPC hander.
self._topology_endpoint = TopologyRpcEndpoint(self)
conn.create_consumer(
oa_rpc.TOPIC_APIC_SERVICE,
[TopologyRpcEndpoint(self)],
[self._topology_endpoint],
fanout=False)
# Start listeners and return list of servers.
@ -257,13 +260,13 @@ class ApicRpcHandlerMixin(object):
LOG.exception(e)
return {'device': device}
def request_vrf_details(self, context, kwargs):
def request_vrf_details(self, context, **kwargs):
LOG.debug("APIC AIM MD handling request_vrf_details for: %s", kwargs)
# REVISIT: This RPC is not currently invoked by the Opflex
# agent, but that may be planned. Once it is, move the handler
# implementation from get_vrf_details() to this method.
return self.get_vrf_details(context, kwargs)
return self.get_vrf_details(context, **kwargs)
def ip_address_owner_update(self, context, **kwargs):
LOG.debug("APIC AIM MD handling ip_address_owner_update for: %s",

View File

@ -499,6 +499,147 @@ class TestRpcListeners(ApicAimTestCase):
self.driver.enable_new_rpc = False
self._test_start_rpc_listeners(False)
def test_opflex_endpoint(self):
self.plugin.start_rpc_listeners()
endpoint = self.driver._opflex_endpoint
self.assertIsNotNone(endpoint)
context = n_context.get_admin_context()
# Test RPCs handled by _request_endpoint_details.
with mock.patch.object(
self.driver, '_request_endpoint_details', autospec=True,
return_value={'gbp_details': 'test details'}) as handler:
# Test get_gbp_details RPC normal case.
response = endpoint.get_gbp_details(
context, device='test device', host='test host')
handler.assert_called_once_with(
context, {'device': 'test device'}, 'test host')
self.assertEqual('test details', response)
# Test get_gbp_details RPC exception case.
handler.reset_mock()
handler.side_effect = Exception()
response = endpoint.get_gbp_details(
context, device='test device', host='test host')
handler.assert_called_once_with(
context, {'device': 'test device'}, 'test host')
self.assertEqual({'device': 'test device'}, response)
# Test request_endpoint_details async RPC.
with mock.patch.object(
self.driver.notifier, 'opflex_endpoint_update',
autospec=True) as notifier:
# Test normal case.
handler.reset_mock()
handler.side_effect = None
endpoint.request_endpoint_details(
context, request={'device': 'test device'},
host='test host')
handler.assert_called_once_with(
context, {'device': 'test device'}, 'test host')
notifier.assert_called_once_with(
context, [{'gbp_details': 'test details'}],
host='test host')
# Test exception case.
handler.reset_mock()
notifier.reset_mock()
handler.side_effect = Exception()
endpoint.request_endpoint_details(
context, request={'device': 'test device'},
host='test host')
handler.assert_called_once_with(
context, {'device': 'test device'}, 'test host')
notifier.assert_called_once_with(
context, [{'device': 'test device'}], host='test host')
# Test RPCs handled by _get_vrf_details.
with mock.patch.object(
self.driver, '_get_vrf_details', autospec=True,
return_value='test details') as handler:
# Test get_vrf_details RPC normal case.
response = endpoint.get_vrf_details(
context, vrf_id='test id', host='test host')
handler.assert_called_once_with(context, 'test id')
self.assertEqual('test details', response)
# Test get_vrf_details RPC exception case.
handler.reset_mock()
handler.side_effect = Exception()
response = endpoint.get_vrf_details(
context, vrf_id='test id', host='test host')
handler.assert_called_once_with(context, 'test id')
self.assertEqual({'l3_policy_id': 'test id'}, response)
# Test request_vrf_details async RPC.
#
# REVISIT: These tests are disabled because
# opflexagent.rpc.GBPServerRpcCallback.request_vrf_details()
# passes the keyword parameter host to
# opflexagent.rpc.AgentNotifierApi.opflex_vrf_update(),
# which does not expect any keyword parameters, resulting
# in an exception. Fortunately, this RPC is not currently
# invoked by the Opflex agent.
#
# with mock.patch.object(
# self.driver.notifier, 'opflex_vrf_update',
# autospec=True) as notifier:
#
# # Test normal case.
# handler.reset_mock()
# handler.side_effect = None
# endpoint.request_vrf_details(
# context, vrf_id='test id', host='test host')
# handler.assert_called_once_with(context, 'test id')
# notifier.assert_called_once_with(
# context, ['test details'], host='test host')
#
# # Test exception case.
# handler.reset_mock()
# notifier.reset_mock()
# handler.side_effect = Exception()
# endpoint.request_vrf_details(
# context, vrf_id='test id', host='test host')
# handler.assert_called_once_with(context, 'test id')
# notifier.assert_called_once_with(
# context, [{'l3_policy_id': 'test id'}], host='test host')
# Test ip_address_owner_update RPC.
with mock.patch.object(
self.driver, 'ip_address_owner_update',
autospec=True) as handler:
endpoint.ip_address_owner_update(
context, ip_owner_info='test info')
handler.assert_called_once_with(
context, ip_owner_info='test info')
def test_topology_endpoint(self):
self.plugin.start_rpc_listeners()
endpoint = self.driver._topology_endpoint
self.assertIsNotNone(endpoint)
context = n_context.get_admin_context()
# Test update_link RPC.
with mock.patch.object(
self.driver, 'update_link',
autospec=True) as handler:
endpoint.update_link(
context, 'test host', 'test intf', 'test mac', 'test switch',
'test module', 'test port', 'test pod', 'test port desc')
handler.assert_called_once_with(
context, 'test host', 'test intf', 'test mac', 'test switch',
'test module', 'test port', 'test pod', 'test port desc')
# Test delete_link RPC, which currently does not call the handler.
with mock.patch.object(
self.driver, 'delete_link',
autospec=True) as handler:
endpoint.delete_link(context, 'test something')
self.assertFalse(handler.called)
class TestAimMapping(ApicAimTestCase):
def setUp(self):