Placement reporting service plugin

This service plugin synchronizes ML2 mechanism driver agents' resource
information to Placement. To use this service, an agent must add
'resource_provider_bandwidths' to the 'configurations' field of its
RPC heartbeat. It may also add 'resource_provider_inventory_defaults'
to fine-tune the Placement inventory parameters. To use this service, a
mechanism driver must also implement get_standard_device_mappings() and
allocate a UUID as the mechanism driver property
'resource_provider_uuid5_namespace'.
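As an illustration, here is a minimal, standalone sketch of these hooks
and of the relevant part of such an agent's heartbeat. All names and
values below are made up for the example; the real hooks live on the
mechanism driver class (cf. the TestMechanismDriver changes in this
patch) and the real values come from the deployment.

import uuid


class ExampleMechanismDriverHooks(object):
    """Placement-reporting hooks a mechanism driver is expected to have."""

    # Allocate a new, fixed UUID per driver; it is used as a uuid5
    # namespace to derive stable resource provider UUIDs.
    resource_provider_uuid5_namespace = uuid.UUID(
        '12345678-1234-5678-1234-567812345678')

    agent_type = 'example agent type'
    supported_vnic_types = ('normal',)

    def get_standard_device_mappings(self, agent):
        # Mapping of physnet name to a list of devices, derived from
        # what the agent reports, e.g. {'physnet0': ['eth0']}.
        return {'physnet0': ['eth0']}


# The relevant keys of such an agent's RPC heartbeat 'configurations':
example_configurations = {
    'resource_provider_bandwidths': {
        # per-device available bandwidth per direction (illustrative)
        'eth0': {'egress': 10000, 'ingress': 10000},
    },
    # optional fine-tuning of the Placement inventory parameters
    'resource_provider_inventory_defaults': {
        'allocation_ratio': 1.0,
        'min_unit': 1,
    },
}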

The synchronization is triggered by:
* any new agent object in the DB
* restart of an agent (via 'start_flag' in the RPC heartbeat)
* if an agent's 'resources_synced' attribute is not True (None/False)

The last condition should auto-heal transient errors of the
synchronization process. That is, if a sync attempt fails, we store
resources_synced=False, which triggers a sync retry on each new
heartbeat message until a sync attempt finally succeeds and we can set
resources_synced=True.

Since this code functionally depends on ML2, we could also consider
making it part of ML2, but at the moment it is a service plugin for
better decoupling. Even if you load the service plugin, the logic
gracefully degrades for heartbeat messages that do not contain resource
provider info.

If needed, the sync can be forced in multiple ways. First, if you
restart an agent, the RPs (resource providers) belonging to that agent
will be re-synced. You may also delete the agent with 'openstack
network agent delete' and let the next heartbeat message re-create the
agent object. On re-creation the RPs belonging to that agent will be
re-synced. On the other hand, a neutron-server restart does not trigger
a re-sync in any way. Depending on the trade-off between the admin's
need to force re-syncs and the cost of (not strictly necessary)
Placement updates, the re-sync conditions may be fine-tuned further.
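For example, to force a re-sync of a single agent's RPs (the ID below
is a placeholder; the deleted agent object is re-created by the agent's
next heartbeat):

openstack network agent list
openstack network agent delete <agent-id>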

Example config for neutron-server:

neutron.conf:
[DEFAULT]
service_plugins = placement

Change-Id: Ia1ff6f7559ab77913ddb9c3b134420a401b8eb43
Co-Authored-By: Lajos Katona <lajos.katona@ericsson.com>
Depends-On: https://review.openstack.org/586567
Partial-Bug: #1578989
See-Also: https://review.openstack.org/502306 (nova spec)
See-Also: https://review.openstack.org/508149 (neutron spec)

@@ -0,0 +1,227 @@
# Copyright 2018 Ericsson
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from keystoneauth1 import exceptions as ks_exc
from neutron_lib.agent import constants as agent_const
from neutron_lib.api.definitions import agent_resources_synced
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib.placement import client as place_client
from neutron_lib.plugins import directory
from neutron_lib.services import base as service_base
from oslo_config import cfg
from oslo_log import log as logging
from neutron.agent.common import placement_report
from neutron.notifiers import batch_notifier

LOG = logging.getLogger(__name__)

PLUGIN_TYPE = "placement_report"


@registry.has_registry_receivers
class PlacementReportPlugin(service_base.ServicePluginBase):

    supported_extension_aliases = []

    # A service plugin without claiming support for filter validation would
    # disable filter validation for all other plugins, so we report support
    # although this plugin doesn't have filters.
    __filter_validation_support = True

    @classmethod
    def get_plugin_type(cls):
        return PLUGIN_TYPE

    def get_plugin_description(self):
        return "Sync placement info from agent to server to placement."

    def __init__(self):
        self._core_plugin = directory.get_plugin()
        # NOTE(bence romsics): The following bug and fix may be relevant here.
        # https://bugs.launchpad.net/nova/+bug/1697825
        # https://review.openstack.org/493536
        self._placement_client = place_client.PlacementAPIClient(cfg.CONF)
        self._agents = PlacementReporterAgents(self._core_plugin)
        self._batch_notifier = batch_notifier.BatchNotifier(
            cfg.CONF.send_events_interval, self._execute_deferred)

    def _execute_deferred(self, deferred_batch):
        for deferred in deferred_batch:
            deferred()

    def _get_rp_by_name(self, name):
        rps = self._placement_client.list_resource_providers(
            name=name)['resource_providers']
        # RP names are unique, therefore we can get 0 or 1. But not many.
        if len(rps) != 1:
            # NOTE(bence romsics): While we could raise() here and thereby
            # detect an error a bit earlier, we want the error to surface in
            # the sync batch below so it is going to be properly caught and
            # is going to influence the agent's resources_synced attribute.
            LOG.warning(
                'placement client: no such resource provider: %s', name)
            return {'uuid': None}
        return rps[0]
    def _sync_placement_state(self, agent, agent_db):
        configurations = agent['configurations']
        mech_driver = self._agents.mechanism_driver_by_agent_type(
            agent['agent_type'])
        uuid_ns = mech_driver.resource_provider_uuid5_namespace
        supported_vnic_types = mech_driver.supported_vnic_types
        device_mappings = mech_driver.get_standard_device_mappings(agent)

        try:
            agent_host_rp_uuid = self._get_rp_by_name(
                name=agent['host'])['uuid']
        except ks_exc.HttpError:
            # Delay the error for the same reason as in _get_rp_by_name().
            agent_host_rp_uuid = None

        state = placement_report.PlacementState(
            rp_bandwidths=configurations[
                'resource_provider_bandwidths'],
            rp_inventory_defaults=configurations[
                'resource_provider_inventory_defaults'],
            driver_uuid_namespace=uuid_ns,
            agent_type=agent['agent_type'],
            agent_host=agent['host'],
            agent_host_rp_uuid=agent_host_rp_uuid,
            device_mappings=device_mappings,
            supported_vnic_types=supported_vnic_types,
            client=self._placement_client)

        deferred_batch = state.deferred_sync()

        # NOTE(bence romsics): Some client calls depend on earlier
        # ones, but not all. There are calls in a batch that can succeed
        # independently of earlier calls. Therefore even if a call fails
        # we have to suppress its failure so the later independent calls
        # have a chance to succeed. If we queue up the deferred client
        # calls one by one then we cannot handle errors at the end of
        # a batch. So instead we should wrap the deferred client calls
        # in a single deferred batch which executes the client calls,
        # continuing to the next client call even if there was an error
        # but remembering if an error happened. Then at the end of the
        # batch (also having access to the agent object) set the agent's
        # resources_synced attribute according to the success/failure
        # of the batch. Since each client call does monkey patched I/O
        # we'll yield to other eventlet threads in each call therefore
        # the performance should not be affected by the wrapping.
        def batch():
            errors = False

            for deferred in deferred_batch:
                try:
                    LOG.debug('placement client: {}'.format(deferred))
                    deferred.execute()
                except Exception:
                    errors = True
                    LOG.exception(
                        'placement client call failed: %s',
                        str(deferred))

            resources_synced = not errors
            agent_db.resources_synced = resources_synced
            agent_db.update()

            LOG.debug(
                'Synchronization of resources'
                ' of agent type %(type)s'
                ' at host %(host)s'
                ' to placement %(result)s.',
                {'type': agent['agent_type'],
                 'host': agent['host'],
                 'result': 'succeeded' if resources_synced else 'failed'})

        self._batch_notifier.queue_event(batch)
    @registry.receives(resources.AGENT,
                       [events.AFTER_CREATE, events.AFTER_UPDATE])
    def handle_placement_config(self, resource, event, trigger, payload):
        # NOTE(bence romsics): This method gets called a lot, keep it quick.
        agent = payload.desired_state
        status = payload.metadata.get('status')
        context = payload.context
        if agent['agent_type'] not in self._agents.supported_agent_types:
            return
        if 'resource_provider_bandwidths' not in agent['configurations']:
            LOG.warning(
                "The mechanism driver claims agent type supports "
                "placement reports, but the agent does not report "
                "'resource_provider_bandwidths' in its configurations. "
                "host: %(host)s, type: %(type)s",
                {'host': agent['host'],
                 'type': agent['agent_type']})
            return

        # We need to get the same agent as in
        # neutron.db.agents_db.AgentDbMixin.create_or_update_agent()
        agent_db = self._core_plugin._get_agent_by_type_and_host(
            context, agent['agent_type'], agent['host'])

        # sync the state known by us to placement
        if (
                # agent object in API (re-)created
                status == agent_const.AGENT_NEW or
                # agent (re-)started (even without config change)
                'start_flag' in agent or
                # never tried to sync yet or last sync failed
                not agent_db[agent_resources_synced.RESOURCES_SYNCED]):
            LOG.debug(
                'placement: syncing state for agent type %s on host %s',
                agent['agent_type'], agent['host'])
            self._sync_placement_state(agent, agent_db)
        else:
            LOG.debug(
                'placement: nothing to sync for agent type %s on host %s',
                agent['agent_type'], agent['host'])


class PlacementReporterAgents(object):

    # Yep, this is meant to depend on ML2.
    def __init__(self, ml2_plugin):
        self._mechanism_drivers = ml2_plugin.mechanism_manager.\
            ordered_mech_drivers
        self._supported_agent_types = []
        self._agent_type_to_mech_driver = {}

    @property
    def supported_agent_types(self):
        if not self._supported_agent_types:
            # NOTE(bence romsics): We treat the presence of the
            # RP uuid namespace as a proxy for supporting placement reports
            # from the driver's agent type. But we could introduce a
            # property/logic explicitly describing the agent types supporting
            # placement reports any time if this proved to be insufficient.
            self._supported_agent_types = [
                driver.obj.agent_type
                for driver in self._mechanism_drivers
                if driver.obj.resource_provider_uuid5_namespace is not None]
            LOG.debug('agent types supporting placement reports: %s',
                      ', '.join(self._supported_agent_types))
        return self._supported_agent_types

    def mechanism_driver_by_agent_type(self, agent_type):
        if agent_type not in self._agent_type_to_mech_driver:
            for driver in self._mechanism_drivers:
                if (hasattr(driver.obj, 'agent_type') and
                        agent_type == driver.obj.agent_type):
                    self._agent_type_to_mech_driver[agent_type] = driver.obj
                    break
        return self._agent_type_to_mech_driver[agent_type]

@@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.

import uuid

from neutron_lib.api.definitions import portbindings
from neutron_lib import constants as const
from neutron_lib.plugins.ml2 import api
@@ -243,3 +245,18 @@ class TestMechanismDriver(api.MechanismDriver):
    def filter_hosts_with_segment_access(
            self, context, segments, candidate_hosts, agent_getter):
        return set()

    @property
    def resource_provider_uuid5_namespace(self):
        return uuid.UUID('7f0ce65c-1f13-11e9-8921-3c6aa7b21d17')

    @property
    def agent_type(self):
        return 'test_mechanism_driver_agent'

    @property
    def supported_vnic_types(self):
        return ('test_mechanism_driver_vnic_type',)

    def get_standard_device_mappings(self, agent):
        return {}

@@ -0,0 +1,198 @@
# Copyright 2019 Ericsson
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from neutron_lib.agent import constants as agent_const
from oslo_log import log as logging
from neutron.services.placement_report import plugin
from neutron.tests.unit.plugins.ml2.drivers import mechanism_test
from neutron.tests.unit.plugins.ml2 import test_plugin
LOG = logging.getLogger(__name__)


class PlacementReportPluginTestCases(test_plugin.Ml2PluginV2TestCase):

    def setUp(self):
        super(PlacementReportPluginTestCases, self).setUp()
        self.service_plugin = plugin.PlacementReportPlugin()

    def test__get_rp_by_name_found(self):
        with mock.patch.object(
                self.service_plugin._placement_client,
                'list_resource_providers',
                return_value={'resource_providers': ['fake_rp']}):
            rp = self.service_plugin._get_rp_by_name('whatever')
            self.assertEqual('fake_rp', rp)

    def test__get_rp_by_name_not_found(self):
        with mock.patch.object(
                self.service_plugin._placement_client,
                'list_resource_providers',
                return_value={'resource_providers': []}), \
                mock.patch.object(plugin.LOG, 'warning') as log_mock:
            rp = self.service_plugin._get_rp_by_name('whatever')
            self.assertEqual(1, log_mock.call_count)
            self.assertEqual({'uuid': None}, rp)

    def test_no_sync_for_unsupported_agent_type(self):
        payload = mock.Mock(
            # looking all good, but agent type not supported
            desired_state={
                'agent_type': 'unsupported agent type',
                'configurations': {'resource_provider_bandwidths': {}},
                'host': 'fake host',
            })

        with mock.patch.object(self.service_plugin._core_plugin,
                '_get_agent_by_type_and_host') as mock_get_agent, \
                mock.patch.object(self.service_plugin,
                '_sync_placement_state') as mock_sync:
            self.service_plugin.handle_placement_config(
                mock.ANY, mock.ANY, mock.ANY, payload)
            mock_get_agent.assert_not_called()
            mock_sync.assert_not_called()

    def test_no_sync_without_resource_info(self):
        payload = mock.Mock(
            # looking all good, but 'configurations' has no
            # 'resource_provider_bandwidths'
            desired_state={
                'agent_type': 'test_mechanism_driver_agent',
                'configurations': {},
                'host': 'fake host',
            })

        with mock.patch.object(self.service_plugin._core_plugin,
                '_get_agent_by_type_and_host') as mock_get_agent, \
                mock.patch.object(self.service_plugin,
                '_sync_placement_state') as mock_sync:
            self.service_plugin.handle_placement_config(
                mock.ANY, mock.ANY, mock.ANY, payload)
            mock_get_agent.assert_not_called()
            mock_sync.assert_not_called()

    def test_sync_if_agent_is_new(self):
        payload = mock.Mock(
            desired_state={
                'agent_type': 'test_mechanism_driver_agent',
                'configurations': {'resource_provider_bandwidths': {}},
                'host': 'fake host',
            },
            metadata={
                'status': agent_const.AGENT_NEW,
            },
        )

        with mock.patch.object(self.service_plugin._core_plugin,
                '_get_agent_by_type_and_host') as mock_get_agent, \
                mock.patch.object(self.service_plugin,
                '_sync_placement_state') as mock_sync:
            self.service_plugin.handle_placement_config(
                mock.ANY, mock.ANY, mock.ANY, payload)
            self.assertEqual(1, mock_get_agent.call_count)
            self.assertEqual(1, mock_sync.call_count)

    def test_sync_if_agent_is_restarted(self):
        payload = mock.Mock(
            desired_state={
                'agent_type': 'test_mechanism_driver_agent',
                'configurations': {'resource_provider_bandwidths': {}},
                'host': 'fake host',
                'start_flag': True,
            },
        )

        with mock.patch.object(self.service_plugin._core_plugin,
                '_get_agent_by_type_and_host') as mock_get_agent, \
                mock.patch.object(self.service_plugin,
                '_sync_placement_state') as mock_sync:
            self.service_plugin.handle_placement_config(
                mock.ANY, mock.ANY, mock.ANY, payload)
            self.assertEqual(1, mock_get_agent.call_count)
            self.assertEqual(1, mock_sync.call_count)

    def test_sync_after_transient_error(self):
        payload = mock.Mock(
            desired_state={
                'agent_type': 'test_mechanism_driver_agent',
                'configurations': {'resource_provider_bandwidths': {}},
                'host': 'fake host',
            },
        )

        with mock.patch.object(self.service_plugin._core_plugin,
                '_get_agent_by_type_and_host',
                return_value={'resources_synced': False}) as mock_get_agent, \
                mock.patch.object(self.service_plugin,
                '_sync_placement_state') as mock_sync:
            self.service_plugin.handle_placement_config(
                mock.ANY, mock.ANY, mock.ANY, payload)
            self.assertEqual(1, mock_get_agent.call_count)
            self.assertEqual(1, mock_sync.call_count)

    def test__sync_placement_state(self):
        agent = {
            'agent_type': 'test_mechanism_driver_agent',
            'configurations': {
                'resource_provider_bandwidths': {},
                'resource_provider_inventory_defaults': {},
            },
            'host': 'fake host',
        }
        agent_db = mock.Mock()

        with mock.patch.object(self.service_plugin._batch_notifier,
                'queue_event') as mock_queue_event, \
                mock.patch.object(self.service_plugin._placement_client,
                'list_resource_providers',
                return_value={'resource_providers': [{'uuid': 'fake uuid'}]}):
            self.service_plugin._sync_placement_state(agent, agent_db)
            self.assertEqual(1, mock_queue_event.call_count)


class PlacementReporterAgentsTestCases(test_plugin.Ml2PluginV2TestCase):

    def test_supported_agent_types(self):
        self.agents = plugin.PlacementReporterAgents(ml2_plugin=self.plugin)
        self.assertEqual(
            ['test_mechanism_driver_agent'],
            self.agents.supported_agent_types)

    def test_mechanism_driver_by_agent_type_found(self):
        self.agents = plugin.PlacementReporterAgents(ml2_plugin=self.plugin)
        mech_driver = self.agents.mechanism_driver_by_agent_type(
            'test_mechanism_driver_agent')
        self.assertIsInstance(mech_driver, mechanism_test.TestMechanismDriver)

    def test_mechanism_driver_by_agent_type_not_found(self):
        self.agents = plugin.PlacementReporterAgents(ml2_plugin=self.plugin)
        self.assertRaises(
            Exception,  # noqa
            self.agents.mechanism_driver_by_agent_type,
            'agent_not_belonging_to_any_mechanism_driver')

@@ -74,6 +74,7 @@ neutron.service_plugins =
    loki = neutron.services.loki.loki_plugin:LokiPlugin
    log = neutron.services.logapi.logging_plugin:LoggingPlugin
    port_forwarding = neutron.services.portforwarding.pf_plugin:PortForwardingPlugin
    placement = neutron.services.placement_report.plugin:PlacementReportPlugin
neutron.ml2.type_drivers =
    flat = neutron.plugins.ml2.drivers.type_flat:FlatTypeDriver
    local = neutron.plugins.ml2.drivers.type_local:LocalTypeDriver