Merge "Update RT aggregate map less frequently"
This commit is contained in:
commit
49ffcf3a8d
|
@ -41,6 +41,8 @@ _RE_INV_IN_USE = re.compile("Inventory for (.+) on resource provider "
|
|||
"(.+) in use")
|
||||
WARN_EVERY = 10
|
||||
PLACEMENT_CLIENT_SEMAPHORE = 'placement_client'
|
||||
# Number of seconds between attempts to update the aggregate map
|
||||
AGGREGATE_REFRESH = 300
|
||||
|
||||
|
||||
def warn_limit(self, msg):
|
||||
|
@ -239,6 +241,8 @@ class SchedulerReportClient(object):
|
|||
# A dict, keyed by resource provider UUID, of sets of aggregate UUIDs
|
||||
# the provider is associated with
|
||||
self._provider_aggregate_map = {}
|
||||
# Track the last time we updated the aggregate map.
|
||||
self.aggregate_refresh_time = {}
|
||||
self._client = self._create_client()
|
||||
# NOTE(danms): Keep track of how naggy we've been
|
||||
self._warn_count = 0
|
||||
|
@ -253,6 +257,7 @@ class SchedulerReportClient(object):
|
|||
# clean slate.
|
||||
self._resource_providers = {}
|
||||
self._provider_aggregate_map = {}
|
||||
self.aggregate_refresh_time = {}
|
||||
auth_plugin = keystone.load_auth_from_conf_options(
|
||||
CONF, 'placement')
|
||||
return keystone.load_session_from_conf_options(
|
||||
|
@ -502,17 +507,7 @@ class SchedulerReportClient(object):
|
|||
value
|
||||
"""
|
||||
if uuid in self._resource_providers:
|
||||
# NOTE(jaypipes): This isn't optimal to check if aggregate
|
||||
# associations have changed each time we call
|
||||
# _ensure_resource_provider() and get a hit on the local cache of
|
||||
# provider objects, however the alternative is to force operators
|
||||
# to restart all their nova-compute workers every time they add or
|
||||
# change an aggregate. We might optionally want to add some sort of
|
||||
# cache refresh delay or interval as an optimization?
|
||||
msg = "Refreshing aggregate associations for resource provider %s"
|
||||
LOG.debug(msg, uuid)
|
||||
aggs = self._get_provider_aggregates(uuid)
|
||||
self._provider_aggregate_map[uuid] = aggs
|
||||
self._refresh_aggregate_map(uuid)
|
||||
return self._resource_providers[uuid]
|
||||
|
||||
rp = self._get_resource_provider(uuid)
|
||||
|
@ -521,11 +516,10 @@ class SchedulerReportClient(object):
|
|||
rp = self._create_resource_provider(uuid, name)
|
||||
if rp is None:
|
||||
return
|
||||
msg = "Grabbing aggregate associations for resource provider %s"
|
||||
LOG.debug(msg, uuid)
|
||||
aggs = self._get_provider_aggregates(uuid)
|
||||
self._resource_providers[uuid] = rp
|
||||
self._provider_aggregate_map[uuid] = aggs
|
||||
# If there had been no resource provider record, force refreshing
|
||||
# the aggregate map.
|
||||
self._refresh_aggregate_map(uuid, force=True)
|
||||
return rp
|
||||
|
||||
def _get_inventory(self, rp_uuid):
|
||||
|
@ -557,6 +551,36 @@ class SchedulerReportClient(object):
|
|||
my_rp['generation'] = server_gen
|
||||
return curr
|
||||
|
||||
def _refresh_aggregate_map(self, rp_uuid, force=False):
|
||||
"""Refresh the aggregate map for the provided resource provider uuid.
|
||||
|
||||
Only refresh if there has been no refresh during the lifetime of
|
||||
this process, AGGREGATE_REFRESH seconds have passed, or the force arg
|
||||
has been set to True.
|
||||
|
||||
:param uuid: UUID of the resource provider to check for fresh
|
||||
aggregates
|
||||
:param force: If True, force the refresh
|
||||
"""
|
||||
if force or self._aggregate_map_stale(rp_uuid):
|
||||
aggs = self._get_provider_aggregates(rp_uuid)
|
||||
|
||||
msg = ("Refreshing aggregate associations for resource "
|
||||
"provider %s, aggregates: %s")
|
||||
LOG.debug(msg, rp_uuid, ','.join(aggs or ['None']))
|
||||
|
||||
self._provider_aggregate_map[rp_uuid] = aggs
|
||||
self.aggregate_refresh_time[rp_uuid] = time.time()
|
||||
|
||||
def _aggregate_map_stale(self, uuid):
|
||||
"""Respond True if the _provider_aggregate_map is old.
|
||||
|
||||
It is old if aggregate_refresh_time for this uuid is not set
|
||||
or more than AGGREGATE_REFRESH seconds ago.
|
||||
"""
|
||||
refresh_time = self.aggregate_refresh_time.get(uuid, 0)
|
||||
return (time.time() - refresh_time) > AGGREGATE_REFRESH
|
||||
|
||||
def _update_inventory_attempt(self, rp_uuid, inv_data):
|
||||
"""Update the inventory for this resource provider if needed.
|
||||
|
||||
|
|
|
@ -11,6 +11,7 @@
|
|||
# under the License.
|
||||
|
||||
import copy
|
||||
import time
|
||||
|
||||
from keystoneauth1 import exceptions as ks_exc
|
||||
import mock
|
||||
|
@ -1368,6 +1369,64 @@ class TestAggregates(SchedulerReportClientTestCase):
|
|||
log_mock.call_args[0][1]['placement_req_id'])
|
||||
self.assertIsNone(result)
|
||||
|
||||
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
|
||||
'_get_provider_aggregates')
|
||||
def test_refresh_aggregate_map_no_last(self, mock_get):
|
||||
"""Test that refresh aggregate map is called when the map is
|
||||
stale.
|
||||
"""
|
||||
uuid = uuids.compute_node
|
||||
self.client._refresh_aggregate_map(uuid)
|
||||
mock_get.assert_called_once_with(uuid)
|
||||
self.assertIn(uuid, self.client.aggregate_refresh_time)
|
||||
|
||||
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
|
||||
'_get_provider_aggregates')
|
||||
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
|
||||
'_aggregate_map_stale')
|
||||
def test_refresh_aggregate_map_not_stale(self, mock_stale, mock_get):
|
||||
"""Test that refresh aggregate map is not called when the map is
|
||||
not stale.
|
||||
"""
|
||||
mock_stale.return_value = False
|
||||
uuid = uuids.compute_node
|
||||
self.client._refresh_aggregate_map(uuid)
|
||||
mock_get.assert_not_called()
|
||||
self.assertFalse(self.client.aggregate_refresh_time)
|
||||
|
||||
@mock.patch.object(report.LOG, 'debug')
|
||||
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
|
||||
'_get_provider_aggregates')
|
||||
def test_refresh_aggregate_map_time(self, mock_get, log_mock):
|
||||
"""Test that refresh aggregate map is called when the map is
|
||||
stale.
|
||||
"""
|
||||
uuid = uuids.compute_node
|
||||
mock_get.return_value = set([])
|
||||
|
||||
# Called a first time because aggregate_refresh_time is empty.
|
||||
now = time.time()
|
||||
self.client._refresh_aggregate_map(uuid)
|
||||
mock_get.assert_called_once_with(uuid)
|
||||
log_mock.assert_called_once_with(
|
||||
'Refreshing aggregate associations for resource '
|
||||
'provider %s, aggregates: %s', uuid, 'None')
|
||||
self.assertIn(uuid, self.client.aggregate_refresh_time)
|
||||
|
||||
# Clear call count.
|
||||
mock_get.reset_mock()
|
||||
|
||||
with mock.patch('time.time') as mock_future:
|
||||
# Not called a second time because not enough time has passed.
|
||||
mock_future.return_value = now + report.AGGREGATE_REFRESH / 2
|
||||
self.client._refresh_aggregate_map(uuid)
|
||||
mock_get.assert_not_called()
|
||||
|
||||
# Called because time has passed.
|
||||
mock_future.return_value = now + report.AGGREGATE_REFRESH + 1
|
||||
self.client._refresh_aggregate_map(uuid)
|
||||
mock_get.assert_called_once_with(uuid)
|
||||
|
||||
|
||||
class TestComputeNodeToInventoryDict(test.NoDBTestCase):
|
||||
def test_compute_node_inventory(self):
|
||||
|
|
Loading…
Reference in New Issue