diff --git a/neutron/services/placement_report/__init__.py b/neutron/services/placement_report/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/neutron/services/placement_report/plugin.py b/neutron/services/placement_report/plugin.py new file mode 100644 index 00000000000..8a6e3f737e4 --- /dev/null +++ b/neutron/services/placement_report/plugin.py @@ -0,0 +1,227 @@ +# Copyright 2018 Ericsson +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from keystoneauth1 import exceptions as ks_exc +from neutron_lib.agent import constants as agent_const +from neutron_lib.api.definitions import agent_resources_synced +from neutron_lib.callbacks import events +from neutron_lib.callbacks import registry +from neutron_lib.callbacks import resources +from neutron_lib.placement import client as place_client +from neutron_lib.plugins import directory +from neutron_lib.services import base as service_base +from oslo_config import cfg +from oslo_log import log as logging + +from neutron.agent.common import placement_report +from neutron.notifiers import batch_notifier + +LOG = logging.getLogger(__name__) + +PLUGIN_TYPE = "placement_report" + + +@registry.has_registry_receivers +class PlacementReportPlugin(service_base.ServicePluginBase): + + supported_extension_aliases = [] + + # A service plugin without claiming support for filter validation would + # disable filter validation for all other plugins, so we report support + # although this plugin doesn't have filters. + __filter_validation_support = True + + @classmethod + def get_plugin_type(cls): + return PLUGIN_TYPE + + def get_plugin_description(self): + return "Sync placement info from agent to server to placement." + + def __init__(self): + self._core_plugin = directory.get_plugin() + # NOTE(bence romsics): The following bug and fix may be relevant here. + # https://bugs.launchpad.net/nova/+bug/1697825 + # https://review.openstack.org/493536 + self._placement_client = place_client.PlacementAPIClient(cfg.CONF) + self._agents = PlacementReporterAgents(self._core_plugin) + self._batch_notifier = batch_notifier.BatchNotifier( + cfg.CONF.send_events_interval, self._execute_deferred) + + def _execute_deferred(self, deferred_batch): + for deferred in deferred_batch: + deferred() + + def _get_rp_by_name(self, name): + rps = self._placement_client.list_resource_providers( + name=name)['resource_providers'] + # RP names are unique, therefore we can get 0 or 1. But not many. + if len(rps) != 1: + # NOTE(bence romsics): While we could raise() here and by detect + # an error a bit earlier, we want the error to surface in the + # sync batch below so it is going to be properly caught and is + # going to influence the agent's resources_synced attribute. + LOG.warning( + 'placement client: no such resource provider: %s', name) + return {'uuid': None} + return rps[0] + + def _sync_placement_state(self, agent, agent_db): + configurations = agent['configurations'] + mech_driver = self._agents.mechanism_driver_by_agent_type( + agent['agent_type']) + uuid_ns = mech_driver.resource_provider_uuid5_namespace + supported_vnic_types = mech_driver.supported_vnic_types + device_mappings = mech_driver.get_standard_device_mappings(agent) + + try: + agent_host_rp_uuid = self._get_rp_by_name( + name=agent['host'])['uuid'] + except ks_exc.HttpError: + # Delay the error for the same reason as in _get_rp_by_name(). + agent_host_rp_uuid = None + + state = placement_report.PlacementState( + rp_bandwidths=configurations[ + 'resource_provider_bandwidths'], + rp_inventory_defaults=configurations[ + 'resource_provider_inventory_defaults'], + driver_uuid_namespace=uuid_ns, + agent_type=agent['agent_type'], + agent_host=agent['host'], + agent_host_rp_uuid=agent_host_rp_uuid, + device_mappings=device_mappings, + supported_vnic_types=supported_vnic_types, + client=self._placement_client) + + deferred_batch = state.deferred_sync() + + # NOTE(bence romsics): Some client calls depend on earlier + # ones, but not all. There are calls in a batch that can succeed + # independently of earlier calls. Therefore even if a call fails + # we have to suppress its failure so the later independent calls + # have a chance to succeed. If we queue up the deferred client + # calls one by one then we cannot handle errors at the end of + # a batch. So instead we should wrap the deferred client calls + # in a single deferred batch which executes the client calls, + # continuing to the next client call even if there was an error + # but remembering if an error happened. Then at the end of the + # batch (also having access to the agent object) set the agent's + # resources_synced attribute according to the success/failure + # of the batch. Since each client call does monkey patched I/O + # we'll yield to other eventlet threads in each call therefore + # the performance should not be affected by the wrapping. + def batch(): + errors = False + + for deferred in deferred_batch: + try: + LOG.debug('placement client: {}'.format(deferred)) + deferred.execute() + except Exception: + errors = True + LOG.exception( + 'placement client call failed: %s', + str(deferred)) + + resources_synced = not errors + agent_db.resources_synced = resources_synced + agent_db.update() + + LOG.debug( + 'Synchronization of resources' + ' of agent type %(type)s' + ' at host %(host)s' + ' to placement %(result)s.', + {'type': agent['agent_type'], + 'host': agent['host'], + 'result': 'succeeded' if resources_synced else 'failed'}) + + self._batch_notifier.queue_event(batch) + + @registry.receives(resources.AGENT, + [events.AFTER_CREATE, events.AFTER_UPDATE]) + def handle_placement_config(self, resource, event, trigger, payload): + # NOTE(bence romsics): This method gets called a lot, keep it quick. + agent = payload.desired_state + status = payload.metadata.get('status') + context = payload.context + if agent['agent_type'] not in self._agents.supported_agent_types: + return + if 'resource_provider_bandwidths' not in agent['configurations']: + LOG.warning( + "The mechanism driver claims agent type supports " + "placement reports, but the agent does not report " + "'resoure_provider_bandwidths' in its configurations. " + "host: %(host)s, type: %(type)s", + {'host': agent['agent_type'], + 'type': agent['host']}) + return + + # We need to get the same agent as in + # neutron.db.agents_db.AgentDbMixin.create_or_update_agent() + agent_db = self._core_plugin._get_agent_by_type_and_host( + context, agent['agent_type'], agent['host']) + + # sync the state known by us to placement + if ( + # agent object in API (re-)created + status == agent_const.AGENT_NEW or + # agent (re-)started (even without config change) + 'start_flag' in agent or + # never tried to sync yet or last sync failed + not agent_db[agent_resources_synced.RESOURCES_SYNCED]): + LOG.debug( + 'placement: syncing state for agent type %s on host %s', + agent['agent_type'], agent['host']) + self._sync_placement_state(agent, agent_db) + else: + LOG.debug( + 'placement: nothing to sync for agent type %s on host %s', + agent['agent_type'], agent['host']) + + +class PlacementReporterAgents(object): + + # Yep, this is meant to depend on ML2. + def __init__(self, ml2_plugin): + self._mechanism_drivers = ml2_plugin.mechanism_manager.\ + ordered_mech_drivers + self._supported_agent_types = [] + self._agent_type_to_mech_driver = {} + + @property + def supported_agent_types(self): + if not self._supported_agent_types: + # NOTE(bence romsics): We treat the presence of the + # RP uuid namespace a proxy for supporting placement reports from + # the driver's agent type. But we could introduce a property/logic + # explicitly describing the agent types supporting placement + # reports any time if this proved to be insufficient. + self._supported_agent_types = [ + driver.obj.agent_type + for driver in self._mechanism_drivers + if driver.obj.resource_provider_uuid5_namespace is not None] + LOG.debug('agent types supporting placement reports: %s', + ', '.join(self._supported_agent_types)) + return self._supported_agent_types + + def mechanism_driver_by_agent_type(self, agent_type): + if agent_type not in self._agent_type_to_mech_driver: + for driver in self._mechanism_drivers: + if (hasattr(driver.obj, 'agent_type') and + agent_type == driver.obj.agent_type): + self._agent_type_to_mech_driver[agent_type] = driver.obj + break + return self._agent_type_to_mech_driver[agent_type] diff --git a/neutron/tests/unit/plugins/ml2/drivers/mechanism_test.py b/neutron/tests/unit/plugins/ml2/drivers/mechanism_test.py index ac632b665ce..3e6ebe2ae0b 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/mechanism_test.py +++ b/neutron/tests/unit/plugins/ml2/drivers/mechanism_test.py @@ -13,6 +13,8 @@ # License for the specific language governing permissions and limitations # under the License. +import uuid + from neutron_lib.api.definitions import portbindings from neutron_lib import constants as const from neutron_lib.plugins.ml2 import api @@ -243,3 +245,18 @@ class TestMechanismDriver(api.MechanismDriver): def filter_hosts_with_segment_access( self, context, segments, candidate_hosts, agent_getter): return set() + + @property + def resource_provider_uuid5_namespace(self): + return uuid.UUID('7f0ce65c-1f13-11e9-8921-3c6aa7b21d17') + + @property + def agent_type(self): + return 'test_mechanism_driver_agent' + + @property + def supported_vnic_types(self): + return ('test_mechanism_driver_vnic_type',) + + def get_standard_device_mappings(self, agent): + return {} diff --git a/neutron/tests/unit/services/placement_report/__init__.py b/neutron/tests/unit/services/placement_report/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/neutron/tests/unit/services/placement_report/test_plugin.py b/neutron/tests/unit/services/placement_report/test_plugin.py new file mode 100644 index 00000000000..2eae9445d8a --- /dev/null +++ b/neutron/tests/unit/services/placement_report/test_plugin.py @@ -0,0 +1,198 @@ +# Copyright 2019 Ericsson +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock + +from neutron_lib.agent import constants as agent_const +from oslo_log import log as logging + +from neutron.services.placement_report import plugin +from neutron.tests.unit.plugins.ml2.drivers import mechanism_test +from neutron.tests.unit.plugins.ml2 import test_plugin + +LOG = logging.getLogger(__name__) + + +class PlacementReportPluginTestCases(test_plugin.Ml2PluginV2TestCase): + + def setUp(self): + super(PlacementReportPluginTestCases, self).setUp() + self.service_plugin = plugin.PlacementReportPlugin() + + def test__get_rp_by_name_found(self): + with mock.patch.object( + self.service_plugin._placement_client, + 'list_resource_providers', + return_value={'resource_providers': ['fake_rp']}): + rp = self.service_plugin._get_rp_by_name('whatever') + self.assertEqual('fake_rp', rp) + + def test__get_rp_by_name_not_found(self): + with mock.patch.object( + self.service_plugin._placement_client, + 'list_resource_providers', + return_value={'resource_providers': []}), \ + mock.patch.object(plugin.LOG, 'warning') as log_mock: + rp = self.service_plugin._get_rp_by_name('whatever') + self.assertEqual(1, log_mock.call_count) + self.assertEqual({'uuid': None}, rp) + + def test_no_sync_for_unsupported_agent_type(self): + payload = mock.Mock( + # looking all good, but agent type not supported + desired_state={ + 'agent_type': 'unsupported agent type', + 'configurations': {'resource_provider_bandwidths': {}}, + 'host': 'fake host', + }) + + with mock.patch.object(self.service_plugin._core_plugin, + '_get_agent_by_type_and_host') as mock_get_agent, \ + mock.patch.object(self.service_plugin, + '_sync_placement_state') as mock_sync: + + self.service_plugin.handle_placement_config( + mock.ANY, mock.ANY, mock.ANY, payload) + + mock_get_agent.assert_not_called() + mock_sync.assert_not_called() + + def test_no_sync_without_resource_info(self): + payload = mock.Mock( + # looking all good, but 'configurations' has no + # 'resource_provider_bandwidths' + desired_state={ + 'agent_type': 'test_mechanism_driver_agent', + 'configurations': {}, + 'host': 'fake host', + }) + + with mock.patch.object(self.service_plugin._core_plugin, + '_get_agent_by_type_and_host') as mock_get_agent, \ + mock.patch.object(self.service_plugin, + '_sync_placement_state') as mock_sync: + + self.service_plugin.handle_placement_config( + mock.ANY, mock.ANY, mock.ANY, payload) + + mock_get_agent.assert_not_called() + mock_sync.assert_not_called() + + def test_sync_if_agent_is_new(self): + payload = mock.Mock( + desired_state={ + 'agent_type': 'test_mechanism_driver_agent', + 'configurations': {'resource_provider_bandwidths': {}}, + 'host': 'fake host', + }, + metadata={ + 'status': agent_const.AGENT_NEW, + }, + ) + + with mock.patch.object(self.service_plugin._core_plugin, + '_get_agent_by_type_and_host') as mock_get_agent, \ + mock.patch.object(self.service_plugin, + '_sync_placement_state') as mock_sync: + + self.service_plugin.handle_placement_config( + mock.ANY, mock.ANY, mock.ANY, payload) + + self.assertEqual(1, mock_get_agent.call_count) + self.assertEqual(1, mock_sync.call_count) + + def test_sync_if_agent_is_restarted(self): + payload = mock.Mock( + desired_state={ + 'agent_type': 'test_mechanism_driver_agent', + 'configurations': {'resource_provider_bandwidths': {}}, + 'host': 'fake host', + 'start_flag': True, + }, + ) + + with mock.patch.object(self.service_plugin._core_plugin, + '_get_agent_by_type_and_host') as mock_get_agent, \ + mock.patch.object(self.service_plugin, + '_sync_placement_state') as mock_sync: + + self.service_plugin.handle_placement_config( + mock.ANY, mock.ANY, mock.ANY, payload) + + self.assertEqual(1, mock_get_agent.call_count) + self.assertEqual(1, mock_sync.call_count) + + def test_sync_after_transient_error(self): + payload = mock.Mock( + desired_state={ + 'agent_type': 'test_mechanism_driver_agent', + 'configurations': {'resource_provider_bandwidths': {}}, + 'host': 'fake host', + }, + ) + + with mock.patch.object(self.service_plugin._core_plugin, + '_get_agent_by_type_and_host', + return_value={'resources_synced': False}) as mock_get_agent, \ + mock.patch.object(self.service_plugin, + '_sync_placement_state') as mock_sync: + + self.service_plugin.handle_placement_config( + mock.ANY, mock.ANY, mock.ANY, payload) + + self.assertEqual(1, mock_get_agent.call_count) + self.assertEqual(1, mock_sync.call_count) + + def test__sync_placement_state(self): + agent = { + 'agent_type': 'test_mechanism_driver_agent', + 'configurations': { + 'resource_provider_bandwidths': {}, + 'resource_provider_inventory_defaults': {}, + }, + 'host': 'fake host', + } + agent_db = mock.Mock() + + with mock.patch.object(self.service_plugin._batch_notifier, + 'queue_event') as mock_queue_event, \ + mock.patch.object(self.service_plugin._placement_client, + 'list_resource_providers', + return_value={'resource_providers': [{'uuid': 'fake uuid'}]}): + + self.service_plugin._sync_placement_state(agent, agent_db) + + self.assertEqual(1, mock_queue_event.call_count) + + +class PlacementReporterAgentsTestCases(test_plugin.Ml2PluginV2TestCase): + + def test_supported_agent_types(self): + self.agents = plugin.PlacementReporterAgents(ml2_plugin=self.plugin) + self.assertEqual( + ['test_mechanism_driver_agent'], + self.agents.supported_agent_types) + + def test_mechanism_driver_by_agent_type_found(self): + self.agents = plugin.PlacementReporterAgents(ml2_plugin=self.plugin) + mech_driver = self.agents.mechanism_driver_by_agent_type( + 'test_mechanism_driver_agent') + self.assertTrue(mech_driver, mechanism_test.TestMechanismDriver) + + def test_mechanism_driver_by_agent_type_not_found(self): + self.agents = plugin.PlacementReporterAgents(ml2_plugin=self.plugin) + self.assertRaises( + Exception, # noqa + self.agents.mechanism_driver_by_agent_type, + 'agent_not_belonging_to_any_mechanism_driver') diff --git a/setup.cfg b/setup.cfg index 7f3da2a1e3d..46118d8b4cd 100644 --- a/setup.cfg +++ b/setup.cfg @@ -74,6 +74,7 @@ neutron.service_plugins = loki = neutron.services.loki.loki_plugin:LokiPlugin log = neutron.services.logapi.logging_plugin:LoggingPlugin port_forwarding = neutron.services.portforwarding.pf_plugin:PortForwardingPlugin + placement = neutron.services.placement_report.plugin:PlacementReportPlugin neutron.ml2.type_drivers = flat = neutron.plugins.ml2.drivers.type_flat:FlatTypeDriver local = neutron.plugins.ml2.drivers.type_local:LocalTypeDriver