port_forwarding: extend support for OVN usage (cont.)

This is a follow up from commit 102c442bcf

The call to resources_rpc.ResourcesPushRpcApi() will fail in situations
where RPC is not available. That is the case for neutron_ovn_db_sync_util.

The changes here will only set push_api if any of the configured service
plugins need it. This is a behavior similar to what is done for the QOS
plugin.

Related-Bug: #1877447
Change-Id: I3f2e18fabf4556cd708c6e544b5aaf37f72b44df
This commit is contained in:
Flavio Fernandes 2020-06-08 18:02:06 -04:00
parent cb5f42584e
commit eb46081150
2 changed files with 57 additions and 33 deletions

View File

@ -56,15 +56,22 @@ PORT_FORWARDING_FLOATINGIP_KEY = '_pf_floatingips'
def _required_service_plugins():
supported_svc_plugins = [l3.ROUTER, 'ovn-router']
SvcPlugin = collections.namedtuple('SvcPlugin', 'plugin uses_rpc')
l3_router = SvcPlugin(l3.ROUTER, True)
supported_svc_plugins = [l3_router, SvcPlugin('ovn-router', False)]
plugins = []
rpc_required = False
try:
plugins = [svc for svc in supported_svc_plugins if
svc in cfg.CONF.service_plugins]
for svc in supported_svc_plugins:
if svc.plugin in cfg.CONF.service_plugins:
plugins.append(svc.plugin)
rpc_required |= svc.uses_rpc
except cfg.NoSuchOptError:
plugins = None
pass
# Use l3.ROUTER as required service plugin if no other was provided.
return plugins or [l3.ROUTER]
if plugins:
return plugins, rpc_required
# Use l3_router as required service plugin if no other was provided.
return [l3_router.plugin], l3_router.uses_rpc
@resource_extend.has_resource_extenders
@ -75,7 +82,8 @@ class PortForwardingPlugin(fip_pf.PortForwardingPluginBase):
This class implements a Port Forwarding plugin.
"""
required_service_plugins = _required_service_plugins()
required_service_plugins, _rpc_notifications_required = \
_required_service_plugins()
supported_extension_aliases = [apidef.ALIAS,
expose_port_forwarding_in_fip.ALIAS,
@ -87,7 +95,8 @@ class PortForwardingPlugin(fip_pf.PortForwardingPluginBase):
def __init__(self):
super(PortForwardingPlugin, self).__init__()
self.push_api = resources_rpc.ResourcesPushRpcApi()
self.push_api = resources_rpc.ResourcesPushRpcApi() \
if self._rpc_notifications_required else None
self.l3_plugin = directory.get_plugin(constants.L3)
self.core_plugin = directory.get_plugin()
registry.publish(pf_consts.PORT_FORWARDING_PLUGIN, events.AFTER_INIT,
@ -241,8 +250,9 @@ class PortForwardingPlugin(fip_pf.PortForwardingPluginBase):
pf_resource.delete()
remove_port_forwarding_list.append(pf_resource)
self.push_api.push(context, remove_port_forwarding_list,
rpc_events.DELETED)
if self._rpc_notifications_required:
self.push_api.push(context, remove_port_forwarding_list,
rpc_events.DELETED)
registry_notify_payload = [
callbacks.PortForwardingPayload(context, original_pf=pf_obj) for
pf_obj in remove_port_forwarding_list]
@ -369,7 +379,8 @@ class PortForwardingPlugin(fip_pf.PortForwardingPluginBase):
raise lib_exc.BadRequest(resource=apidef.RESOURCE_NAME,
msg=message)
self.push_api.push(context, [pf_obj], rpc_events.CREATED)
if self._rpc_notifications_required:
self.push_api.push(context, [pf_obj], rpc_events.CREATED)
registry.notify(pf_consts.PORT_FORWARDING, events.AFTER_CREATE,
self,
payload=[callbacks.PortForwardingPayload(context,
@ -423,7 +434,8 @@ class PortForwardingPlugin(fip_pf.PortForwardingPluginBase):
"are %s") % conflict_params
raise lib_exc.BadRequest(resource=apidef.RESOURCE_NAME,
msg=message)
self.push_api.push(context, [pf_obj], rpc_events.UPDATED)
if self._rpc_notifications_required:
self.push_api.push(context, [pf_obj], rpc_events.UPDATED)
registry.notify(pf_consts.PORT_FORWARDING, events.AFTER_UPDATE, self,
payload=[callbacks.PortForwardingPayload(context,
current_pf=pf_obj, original_pf=original_pf_obj)])
@ -524,7 +536,8 @@ class PortForwardingPlugin(fip_pf.PortForwardingPluginBase):
fip_obj.update_fields({'router_id': None})
fip_obj.update()
pf_obj.delete()
self.push_api.push(context, [pf_obj], rpc_events.DELETED)
if self._rpc_notifications_required:
self.push_api.push(context, [pf_obj], rpc_events.DELETED)
registry.notify(pf_consts.PORT_FORWARDING, events.AFTER_DELETE, self,
payload=[callbacks.PortForwardingPayload(
context, original_pf=pf_obj)])

View File

@ -395,37 +395,48 @@ class TestPortForwardingPlugin(testlib_api.SqlTestCase):
self.ctxt, 'fake-pf-id', 'fip_id_2', **pf_input)
def test_service_plugins_values(self):
exp_default = ['router']
exp_default_plugins = ['router']
supported_plugins = ['router', 'ovn-router']
same_as_input = 'same_as_input'
TC = namedtuple('TC', 'input expected description')
TC = namedtuple('TC', 'input exp_plugins exp_uses_rpc description')
test_cases = [
TC([], exp_default, "default from empty cfg"),
TC(['foo'], exp_default, "default from unexpected cfg"),
TC(['foo', 123], exp_default, "default from unexpected cfg"),
TC(['foo', 'router'], exp_default, "default from valid cfg"),
TC(['router'], same_as_input, "valid cfg 1"),
TC(['router'], same_as_input, "valid cfg 1"),
TC(['ovn-router'], same_as_input, "valid cfg 2"),
TC(['ovn-router', 'router'], supported_plugins, "valid cfg 3"),
TC(['router', 'ovn-router'], supported_plugins, "valid cfg 4"),
TC(['bar', 'router', 'foo'], ['router'], "valid cfg 5"),
TC(['bar', 'ovn-router', 'foo'], ['ovn-router'], "valid cfg 6"),
TC([], exp_default_plugins, True, "default from empty cfg"),
TC(['foo'], exp_default_plugins, True,
"default from unexpected cfg"),
TC(['foo', 123], exp_default_plugins, True,
"default from unexpected cfg"),
TC(['foo', 'router'], exp_default_plugins, True,
"default from valid cfg"),
TC(['router'], same_as_input, True, "valid cfg 1"),
TC(['router'], same_as_input, True, "valid cfg 1"),
TC(['ovn-router'], same_as_input, False, "valid cfg 2"),
TC(['ovn-router', 'router'], supported_plugins, True,
"valid cfg 3"),
TC(['router', 'ovn-router'], supported_plugins, True,
"valid cfg 4"),
TC(['bar', 'router', 'foo'], ['router'], True, "valid cfg 5"),
TC(['bar', 'ovn-router', 'foo'], ['ovn-router'], False,
"valid cfg 6"),
TC(['bar', 'router', 123, 'ovn-router', 'foo', 'kitchen', 'sink'],
supported_plugins, "valid cfg 7"),
supported_plugins, True, "valid cfg 7"),
]
for tc in test_cases:
cfg.CONF.set_override("service_plugins", tc.input)
result = pf_plugin._required_service_plugins()
if tc.expected == same_as_input:
self.assertEqual(tc.input, result, tc.description)
plugins, rpc_required = pf_plugin._required_service_plugins()
if tc.exp_plugins == same_as_input:
self.assertEqual(
(tc.input, tc.exp_uses_rpc), (plugins, rpc_required),
tc.description)
else:
self.assertEqual(tc.expected, result, tc.description)
self.assertEqual(
(tc.exp_plugins, tc.exp_uses_rpc), (plugins, rpc_required),
tc.description)
@mock.patch.object(cfg.ConfigOpts, '__getattr__')
def test_service_plugins_no_such_opt(self, mock_config_opts_get):
description = "test cfg.NoSuchOptError exception"
mock_config_opts_get.side_effect = cfg.NoSuchOptError('test_svc_plug')
result = pf_plugin._required_service_plugins()
plugins, rpc_required = pf_plugin._required_service_plugins()
mock_config_opts_get.assert_called_once()
self.assertEqual(['router'], result, description)
self.assertEqual(
(['router'], True), (plugins, rpc_required), description)