Create a PeriodicWorker for DbQuotaNoLockDriver clean up

The "DbQuotaNoLockDriver" quota driver "Reservation" registers clean up
is done now in a "PeriodicWorker" spawned by ML2Plugin during the
initialization. The "Reservation" registers are no longer deleted
synchronously during the API calls.

That will prevent from possible database deadlocks when concurrent
delete operations clash (as seen in very busy systems, with more
then 500 parallel requests). Although those database deadlocks were
recoverable, this new implementation will avoid this by allowing
onle single thread to execute this command periodically.

Related-Bug: #1954662
Change-Id: I50bab57830ce4c1d123b2cbd9d9832690bd4c8f9
This commit is contained in:
Rodolfo Alonso Hernandez 2022-01-16 16:32:18 +00:00 committed by Rodolfo Alonso
parent 24c802711a
commit 6ea6fdd874
5 changed files with 47 additions and 181 deletions

View File

@ -12,11 +12,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import abc
import collections
import datetime
from neutron_lib.db import api as db_api
from neutron_lib.db import quota_api as nlib_quota_api
from oslo_db import exception as db_exc
from neutron.common import utils
@ -251,162 +251,7 @@ def remove_expired_reservations(context, project_id=None, timeout=None):
project_id)
class QuotaDriverAPI(object, metaclass=abc.ABCMeta):
@staticmethod
@abc.abstractmethod
def get_default_quotas(context, resources, project_id):
"""Given a list of resources, retrieve the default quotas set for
a project.
:param context: The request context, for access checks.
:param resources: A dictionary of the registered resource keys.
:param project_id: The ID of the project to return default quotas for.
:return: dict from resource name to dict of name and limit
"""
@staticmethod
@abc.abstractmethod
def get_project_quotas(context, resources, project_id):
"""Retrieve the quotas for the given list of resources and project
:param context: The request context, for access checks.
:param resources: A dictionary of the registered resource keys.
:param project_id: The ID of the project to return quotas for.
:return: dict from resource name to dict of name and limit
"""
@staticmethod
@abc.abstractmethod
def get_detailed_project_quotas(context, resources, project_id):
"""Retrieve detailed quotas for the given list of resources and project
:param context: The request context, for access checks.
:param resources: A dictionary of the registered resource keys.
:param project_id: The ID of the project to return quotas for.
:return dict: mapping resource name in dict to its corresponding limit
used and reserved. Reserved currently returns default
value of 0
"""
@staticmethod
@abc.abstractmethod
def delete_project_quota(context, project_id):
"""Delete the quota entries for a given project_id.
After deletion, this project will use default quota values in conf.
Raise a "not found" error if the quota for the given project was
never defined.
:param context: The request context, for access checks.
:param project_id: The ID of the project to return quotas for.
"""
@staticmethod
@abc.abstractmethod
def get_all_quotas(context, resources):
"""Given a list of resources, retrieve the quotas for the all tenants.
:param context: The request context, for access checks.
:param resources: A dictionary of the registered resource keys.
:return: quotas list of dict of project_id:, resourcekey1:
resourcekey2: ...
"""
@staticmethod
@abc.abstractmethod
def update_quota_limit(context, project_id, resource, limit):
"""Update the quota limit for a resource in a project
:param context: The request context, for access checks.
:param project_id: The ID of the project to update the quota.
:param resource: the resource to update the quota.
:param limit: new resource quota limit.
"""
@staticmethod
@abc.abstractmethod
def make_reservation(context, project_id, resources, deltas, plugin):
"""Make multiple resource reservations for a given project
:param context: The request context, for access checks.
:param resources: A dictionary of the registered resource keys.
:param project_id: The ID of the project to make the reservations for.
:return: ``ReservationInfo`` object.
"""
@staticmethod
@abc.abstractmethod
def commit_reservation(context, reservation_id):
"""Commit a reservation register
:param context: The request context, for access checks.
:param reservation_id: ID of the reservation register to commit.
"""
@staticmethod
@abc.abstractmethod
def cancel_reservation(context, reservation_id):
"""Cancel a reservation register
:param context: The request context, for access checks.
:param reservation_id: ID of the reservation register to cancel.
"""
@staticmethod
@abc.abstractmethod
def limit_check(context, project_id, resources, values):
"""Check simple quota limits.
For limits--those quotas for which there is no usage
synchronization function--this method checks that a set of
proposed values are permitted by the limit restriction.
If any of the proposed values is over the defined quota, an
OverQuota exception will be raised with the sorted list of the
resources which are too high. Otherwise, the method returns
nothing.
:param context: The request context, for access checks.
:param project_id: The ID of the project to make the reservations for.
:param resources: A dictionary of the registered resource.
:param values: A dictionary of the values to check against the
quota.
"""
@staticmethod
@abc.abstractmethod
def get_resource_usage(context, project_id, resources, resource_name):
"""Return the resource current usage
:param context: The request context, for access checks.
:param project_id: The ID of the project to make the reservations for.
:param resources: A dictionary of the registered resources.
:param resource_name: The name of the resource to retrieve the usage.
:return: The current resource usage.
"""
@staticmethod
@abc.abstractmethod
def quota_limit_check(context, project_id, resources, deltas):
"""Check the current resource usage against a set of deltas.
This method will check if the provided resource deltas could be
assigned depending on the current resource usage and the quota limits.
If the resource deltas plus the resource usage fit under the quota
limit, the method will pass. If not, a ``OverQuota`` will be raised.
:param context: The request context, for access checks.
:param project_id: The ID of the project to make the reservations for.
:param resources: A dictionary of the registered resource.
:param deltas: A dictionary of the values to check against the
quota limits.
:return: None if passed; ``OverQuota`` if quota limits are exceeded,
``InvalidQuotaValue`` if delta values are invalid.
"""
class NullQuotaDriver(QuotaDriverAPI):
class NullQuotaDriver(nlib_quota_api.QuotaDriverAPI):
@staticmethod
def get_default_quotas(context, resources, project_id):
@ -455,3 +300,7 @@ class NullQuotaDriver(QuotaDriverAPI):
@staticmethod
def quota_limit_check(context, project_id, resources, deltas):
pass
@staticmethod
def get_workers():
return []

View File

@ -15,6 +15,7 @@
from neutron_lib.api import attributes
from neutron_lib.db import api as db_api
from neutron_lib.db import quota_api as nlib_quota_api
from neutron_lib import exceptions
from neutron_lib.plugins import constants
from neutron_lib.plugins import directory
@ -27,7 +28,7 @@ from neutron.quota import resource as res
LOG = log.getLogger(__name__)
class DbQuotaDriver(quota_api.QuotaDriverAPI):
class DbQuotaDriver(nlib_quota_api.QuotaDriverAPI):
"""Driver to perform necessary checks to enforce quotas and obtain quota
information.
@ -342,3 +343,7 @@ class DbQuotaDriver(quota_api.QuotaDriverAPI):
if overs:
raise exceptions.OverQuota(overs=sorted(overs))
@staticmethod
def get_workers():
return []

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib import context as n_context
from neutron_lib.db import api as db_api
from neutron_lib import exceptions
from oslo_db import exception as db_exc
@ -21,6 +22,7 @@ from oslo_log import log
from neutron.common import utils
from neutron.db.quota import api as quota_api
from neutron.db.quota import driver as quota_driver
from neutron import worker as neutron_worker
LOG = log.getLogger(__name__)
@ -42,26 +44,21 @@ class DbQuotaNoLockDriver(quota_driver.DbQuotaDriver):
method is to be fast enough to avoid the concurrency when counting the
resources while not blocking concurrent API operations.
"""
@staticmethod
@utils.skip_exceptions(db_exc.DBError)
def _remove_expired_reservations(self, context, project_id, timeout):
"""Remove expired reservations
def _remove_expired_reservations():
"""Remove expired reservations from all projects
Any DB exception will be catch and dismissed. This operation can have
been successfully executed by another concurrent request. There is no
need to fail or retry it.
"""
quota_api.remove_expired_reservations(context, project_id=project_id,
timeout=timeout)
context = n_context.get_admin_context()
quota_api.remove_expired_reservations(
context, timeout=quota_api.RESERVATION_EXPIRATION_TIMEOUT)
@db_api.retry_if_session_inactive()
def make_reservation(self, context, project_id, resources, deltas, plugin):
# Delete expired reservations before counting valid ones. This
# operation is fast and by calling it before making any
# reservation, we ensure the freshness of the reservations.
self._remove_expired_reservations(
context, project_id=project_id,
timeout=quota_api.RESERVATION_EXPIRATION_TIMEOUT)
resources_over_limit = []
with db_api.CONTEXT_WRITER.using(context):
# Filter out unlimited resources.
@ -94,3 +91,9 @@ class DbQuotaNoLockDriver(quota_driver.DbQuotaDriver):
return
return tracked_resource.count(context, None, project_id,
count_db_registers=True)
@staticmethod
def get_workers():
interval = quota_api.RESERVATION_EXPIRATION_TIMEOUT
method = DbQuotaNoLockDriver._remove_expired_reservations
return [neutron_worker.PeriodicWorker(method, interval, interval)]

View File

@ -115,7 +115,6 @@ from neutron.db import extradhcpopt_db
from neutron.db.models import securitygroup as sg_models
from neutron.db import models_v2
from neutron.db import provisioning_blocks
from neutron.db.quota import driver # noqa
from neutron.db import securitygroups_rpc_base as sg_db_rpc
from neutron.db import segments_db
from neutron.db import subnet_service_type_mixin
@ -137,6 +136,7 @@ from neutron.plugins.ml2 import managers
from neutron.plugins.ml2 import models
from neutron.plugins.ml2 import ovo_rpc
from neutron.plugins.ml2 import rpc
from neutron import quota
from neutron.quota import resource_registry
from neutron.services.segments import plugin as segments_plugin
@ -279,6 +279,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
self.add_agent_status_check_worker(self.agent_health_check)
self.add_workers(self.mechanism_manager.get_workers())
self._verify_service_plugins_requirements()
self._quota_workers()
LOG.info("Modular L2 Plugin initialization complete")
def _setup_rpc(self):
@ -404,6 +405,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
fanout=False)
return self.conn_reports.consume_in_threads()
def _quota_workers(self):
workers = quota.QUOTAS.get_driver().get_workers()
if workers:
self.add_workers(workers)
def _filter_nets_provider(self, context, networks, filters):
return [network
for network in networks

View File

@ -29,6 +29,10 @@ class TestDbQuotaDriverNoLock(test_driver.TestDbQuotaDriver):
super(TestDbQuotaDriverNoLock, self).setUp()
self.quota_driver = driver_nolock.DbQuotaNoLockDriver()
@staticmethod
def _cleanup_timeout(previous_value):
quota_api.RESERVATION_EXPIRATION_TIMEOUT = previous_value
def test__remove_expired_reservations(self):
for project, resource in itertools.product(self.projects,
self.resources):
@ -46,14 +50,13 @@ class TestDbQuotaDriverNoLock(test_driver.TestDbQuotaDriver):
self.assertIn(delta.resource, self.resources)
# Delete the expired reservations and check.
for project in self.projects:
# NOTE(ralonsoh): the timeout is set to -121 to force the deletion
# of all created reservations, including those ones created in this
# test. The value of 121 overcomes the 120 seconds of default
# expiration time a reservation has.
time_delta = quota_api.RESERVATION_EXPIRATION_TIMEOUT + 1
self.quota_driver._remove_expired_reservations(
self.context, project, -time_delta)
res = quota_obj.Reservation.get_objects(self.context,
project_id=project)
self.assertEqual([], res)
# NOTE(ralonsoh): the timeout is set to -121 to force the deletion
# of all created reservations, including those ones created in this
# test. The value of 121 overcomes the 120 seconds of default
# expiration time a reservation has.
timeout = quota_api.RESERVATION_EXPIRATION_TIMEOUT
quota_api.RESERVATION_EXPIRATION_TIMEOUT = -(timeout + 1)
self.addCleanup(self._cleanup_timeout, timeout)
self.quota_driver._remove_expired_reservations()
res = quota_obj.Reservation.get_objects(self.context)
self.assertEqual([], res)