Merge "Support binding_deactivate in CacheBackedPluginApi"

This commit is contained in:
Zuul 2018-07-20 19:01:45 +00:00 committed by Gerrit Code Review
commit 2dd5e21310
2 changed files with 147 additions and 4 deletions

View File

@ -18,8 +18,10 @@ import itertools
import netaddr
from neutron_lib.agent import topics
from neutron_lib.api.definitions import portbindings_extended as pb_ext
from neutron_lib.callbacks import events as callback_events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources as callback_resources
from neutron_lib import constants
from oslo_log import log as logging
import oslo_messaging
@ -199,16 +201,54 @@ class CacheBackedPluginApi(PluginApi):
the payloads the handlers are expecting (an ID).
"""
rtype = rtype.lower() # all legacy handlers don't camelcase
is_delete = event == callback_events.AFTER_DELETE
suffix = 'delete' if is_delete else 'update'
method = "%s_%s" % (rtype, suffix)
method, host = self._get_method_host(rtype, event, **kwargs)
if not hasattr(self._legacy_interface, method):
# TODO(kevinbenton): once these notifications are stable, emit
# a deprecation warning for legacy handlers
return
payload = {rtype: {'id': resource_id}, '%s_id' % rtype: resource_id}
payload = {rtype: {'id': resource_id}, '%s_id' % rtype: resource_id,
'host': host}
getattr(self._legacy_interface, method)(context, **payload)
def _get_method_host(self, rtype, event, **kwargs):
"""Constructs the name of method to be called in the legacy interface.
If the event received is a port update that contains a binding
activation where a previous binding is deactivated, the method name
is 'binding_deactivate' and the host where the binding has to be
deactivated is returned. Otherwise, the method name is constructed from
rtype and the event received and the host is None.
"""
is_delete = event == callback_events.AFTER_DELETE
suffix = 'delete' if is_delete else 'update'
method = "%s_%s" % (rtype, suffix)
host = None
if is_delete or rtype != callback_resources.PORT:
return method, host
# A port update was received. Find out if it is a binding activation
# where a previous binding was deactivated
BINDINGS = pb_ext.COLLECTION_NAME
if BINDINGS in kwargs.get('changed_fields', set()):
existing_active_binding = (
utils.get_port_binding_by_status_and_host(
getattr(kwargs['existing'], 'bindings', []),
constants.ACTIVE))
updated_active_binding = (
utils.get_port_binding_by_status_and_host(
getattr(kwargs['updated'], 'bindings', []),
constants.ACTIVE))
if (existing_active_binding and updated_active_binding and
existing_active_binding.host !=
updated_active_binding.host):
if (utils.get_port_binding_by_status_and_host(
getattr(kwargs['updated'], 'bindings', []),
constants.INACTIVE,
host=existing_active_binding.host)):
method = 'binding_deactivate'
host = existing_active_binding.host
return method, host
def get_devices_details_list_and_failed_devices(self, context, devices,
agent_id, host=None):
result = {'devices': [], 'failed_devices': []}

View File

@ -16,9 +16,15 @@
import datetime
import mock
from neutron_lib.agent import topics as lib_topics
from neutron_lib.callbacks import events
from neutron_lib.callbacks import resources
from neutron_lib import constants
from oslo_context import context as oslo_context
from oslo_utils import uuidutils
from neutron.agent import rpc
from neutron.objects import ports
from neutron.tests import base
@ -162,3 +168,100 @@ class AgentRPCMethods(base.BaseTestCase):
with mock.patch(call_to_patch) as create_connection:
rpc.create_consumers(endpoints, 'foo', [('topic', 'op', 'node1')])
create_connection.assert_has_calls(expected)
class TestCacheBackedPluginApi(base.BaseTestCase):
def setUp(self):
super(TestCacheBackedPluginApi, self).setUp()
self._api = rpc.CacheBackedPluginApi(lib_topics.PLUGIN)
self._api._legacy_interface = mock.Mock()
self._port_id = uuidutils.generate_uuid()
self._port = ports.Port(
id=self._port_id,
bindings=[ports.PortBinding(port_id=self._port_id,
host='host1',
status=constants.ACTIVE)])
def test__legacy_notifier_resource_delete(self):
self._api._legacy_notifier(resources.PORT, events.AFTER_DELETE, self,
mock.ANY, resource_id=self._port_id,
existing=self._port)
self._api._legacy_interface.port_update.assert_not_called()
self._api._legacy_interface.port_delete.assert_called_once_with(
mock.ANY, port={'id': self._port_id}, port_id=self._port_id,
host=None)
self._api._legacy_interface.binding_deactivate.assert_not_called()
def test__legacy_notifier_resource_update(self):
updated_port = ports.Port(id=self._port_id, name='updated_port')
self._api._legacy_notifier(resources.PORT, events.AFTER_UPDATE, self,
mock.ANY, changed_fields=set(['name']),
resource_id=self._port_id,
existing=self._port, updated=updated_port)
self._api._legacy_interface.port_delete.assert_not_called()
self._api._legacy_interface.port_update.assert_called_once_with(
mock.ANY, port={'id': self._port_id}, port_id=self._port_id,
host=None)
self._api._legacy_interface.binding_deactivate.assert_not_called()
def _test__legacy_notifier_binding_activated(self):
updated_port = ports.Port(
id=self._port_id, name='updated_port',
bindings=[ports.PortBinding(port_id=self._port_id,
host='host2',
status=constants.ACTIVE),
ports.PortBinding(port_id=self._port_id,
host='host1',
status=constants.INACTIVE)])
self._api._legacy_notifier(resources.PORT, events.AFTER_UPDATE, self,
mock.ANY,
changed_fields=set(['name', 'bindings']),
resource_id=self._port_id,
existing=self._port, updated=updated_port)
self._api._legacy_interface.port_update.assert_not_called()
self._api._legacy_interface.port_delete.assert_not_called()
def test__legacy_notifier_new_binding_activated(self):
self._test__legacy_notifier_binding_activated()
self._api._legacy_interface.binding_deactivate.assert_called_once_with(
mock.ANY, port={'id': self._port_id}, port_id=self._port_id,
host='host1')
def test__legacy_notifier_no_new_binding_activated(self):
updated_port = ports.Port(
id=self._port_id, name='updated_port',
bindings=[ports.PortBinding(port_id=self._port_id,
host='host2',
status=constants.ACTIVE)])
self._api._legacy_notifier(resources.PORT, events.AFTER_UPDATE, self,
mock.ANY,
changed_fields=set(['name', 'bindings']),
resource_id=self._port_id,
existing=self._port, updated=updated_port)
self._api._legacy_interface.port_update.assert_called_once_with(
mock.ANY, port={'id': self._port_id}, port_id=self._port_id,
host=None)
self._api._legacy_interface.port_delete.assert_not_called()
self._api._legacy_interface.binding_deactivate.assert_not_called()
def test__legacy_notifier_existing_or_updated_is_none(self):
self._api._legacy_notifier(resources.PORT, events.AFTER_UPDATE,
self, mock.ANY,
changed_fields=set(['name', 'bindings']),
resource_id=self._port_id,
existing=None, updated=None)
self._api._legacy_notifier(resources.PORT, events.AFTER_UPDATE, self,
mock.ANY,
changed_fields=set(['name', 'bindings']),
resource_id=self._port_id,
existing=self._port, updated=None)
call = mock.call(mock.ANY, port={'id': self._port_id},
port_id=self._port_id, host=None)
self._api._legacy_interface.port_update.assert_has_calls([call, call])
self._api._legacy_interface.port_delete.assert_not_called()
self._api._legacy_interface.binding_deactivate.assert_not_called()
def test__legacy_notifier_binding_activated_not_supported(self):
del self._api._legacy_interface.binding_deactivate
self._test__legacy_notifier_binding_activated()