Merge "More conservative inventory updates"

Jenkins 2016-09-13 20:46:24 +00:00 committed by Gerrit Code Review
commit 11ff0ded4e
2 changed files with 262 additions and 197 deletions
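At a high level, the change makes inventory reporting to the placement service more conservative: each update attempt first GETs the current inventory, compares it against the locally built payload (ignoring generations), skips the PUT entirely when nothing changed, and treats a 409 conflict as "refresh the cached resource provider and let the caller retry". A top-level wrapper then retries the whole attempt up to three times with a one-second pause. The following is only a rough standalone sketch of that flow; the names client.get, client.put, client.refresh_provider and client.generation are placeholders, not the real SchedulerReportClient API:

import copy
import time


def _strip_generations(inventories):
    # Placement's GET response repeats resource_provider_generation inside
    # every resource class; drop it so payloads compare on real values only.
    trimmed = {}
    for rclass, rdict in inventories.items():
        trimmed[rclass] = copy.copy(rdict)
        trimmed[rclass].pop('resource_provider_generation', None)
    return trimmed


def update_inventory_attempt(client, provider_uuid, desired):
    """One conservative attempt; True means placement is already in sync."""
    url = '/resource_providers/%s/inventories' % provider_uuid
    current = client.get(url).json().get('inventories', {})
    if desired == _strip_generations(current):
        return True  # nothing changed, so no PUT at all
    payload = {'inventories': desired,
               'resource_provider_generation': client.generation}
    resp = client.put(url, payload)
    if resp.status_code == 409:
        # Someone else bumped the generation; refresh our cached view and
        # report failure so the caller can retry the attempt from scratch.
        client.refresh_provider(provider_uuid)
        return False
    return resp.status_code == 200


def update_inventory(client, provider_uuid, desired, attempts=3):
    for _ in range(attempts):
        if update_inventory_attempt(client, provider_uuid, desired):
            return True
        time.sleep(1)
    return False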

nova/scheduler/client/report.py

@@ -13,7 +13,9 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
import functools
import time
from keystoneauth1 import exceptions as ks_exc
from keystoneauth1 import loading as keystone
@@ -24,9 +26,13 @@ from nova.compute import utils as compute_utils
import nova.conf
from nova.i18n import _LE, _LI, _LW
from nova import objects
from nova.objects import fields
CONF = nova.conf.CONF
LOG = logging.getLogger(__name__)
VCPU = fields.ResourceClass.VCPU
MEMORY_MB = fields.ResourceClass.MEMORY_MB
DISK_GB = fields.ResourceClass.DISK_GB
def safe_connect(f):
@@ -233,34 +239,73 @@ class SchedulerReportClient(object):
'allocation_ratio': compute_node.disk_allocation_ratio,
},
}
generation = self._resource_providers[compute_node.uuid].generation
data = {
'resource_provider_generation': generation,
'inventories': inventories,
}
return data
@safe_connect
def _get_inventory(self, compute_node):
url = '/resource_providers/%s/inventories' % compute_node.uuid
result = self.get(url)
if not result:
return {'inventories': {}}
return result.json()
@staticmethod
def _compare_inventory(local, remote):
"""This is needed because the placement service is not restful
and does not take and return a consistent object.
"""
# Snip out the generation from each of the resource class dicts
trimmed = {}
for rclass, rdict in remote.items():
trimmed[rclass] = copy.copy(rdict)
trimmed[rclass].pop('resource_provider_generation')
return local == trimmed
def _update_inventory_attempt(self, compute_node):
"""Update the inventory for this compute node if needed.
:param compute_node: The objects.ComputeNode for the operation
:returns: True if the inventory was updated (or did not need to be),
False otherwise.
"""
url = '/resource_providers/%s/inventories' % compute_node.uuid
data = self._compute_node_inventory(compute_node)
curr = self._get_inventory(compute_node)
# Update our generation immediately, if possible. We always report
# VCPU inventory, so use that.
if curr.get('inventories'):
server_gen = (curr['inventories'][VCPU]
['resource_provider_generation'])
my_rp = self._resource_providers[compute_node.uuid]
if server_gen != my_rp.generation:
LOG.debug('Updating our resource provider generation '
'from %(old)i to %(new)i',
{'old': my_rp.generation,
'new': server_gen})
my_rp.generation = server_gen
# Check to see if we need to update placement's view
if self._compare_inventory(data['inventories'],
curr.get('inventories', {})):
return True
data['resource_provider_generation'] = (
self._resource_providers[compute_node.uuid].generation)
result = self.put(url, data)
        if result.status_code == 409:
            LOG.info(_LI('Inventory update conflict for %s'),
                     compute_node.uuid)
            # Invalidate our cache and re-fetch the resource provider
            # to be sure to get the latest generation.
            del self._resource_providers[compute_node.uuid]
            self._ensure_resource_provider(compute_node.uuid,
                                           compute_node.hypervisor_hostname)
            return False
elif not result:
LOG.warning(_LW('Failed to update inventory for '
'%(uuid)s: %(status)i %(text)s'),
@@ -269,27 +314,31 @@ class SchedulerReportClient(object):
'text': result.text})
return False
        if result.status_code != 200:
            LOG.info(
                _LI('Received unexpected response code %(code)i while '
                    'trying to update inventory for compute node %(uuid)s'
                    ': %(text)s'),
                {'uuid': compute_node.uuid,
                 'code': result.status_code,
                 'text': result.text})
            return False
        # Update our view of the generation for next time
        updated_inventories = result.json()['inventories']
        new_gen = updated_inventories[VCPU]['resource_provider_generation']
        self._resource_providers[compute_node.uuid].generation = new_gen
        LOG.debug('Updated inventory for %s at generation %i' % (
            compute_node.uuid, new_gen))
        return True
@safe_connect
def _update_inventory(self, compute_node):
for attempt in (1, 2, 3):
if self._update_inventory_attempt(compute_node):
return True
time.sleep(1)
return False
def update_resource_stats(self, compute_node):
@@ -311,9 +360,9 @@ class SchedulerReportClient(object):
instance.flavor.swap +
instance.flavor.ephemeral_gb)
return {
            MEMORY_MB: instance.flavor.memory_mb,
            VCPU: instance.flavor.vcpus,
            DISK_GB: disk,
}
@safe_connect
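To make the comparison step concrete: the payload built locally by _compute_node_inventory keys each resource class (VCPU, MEMORY_MB, DISK_GB) by its values only, while the GET response from placement folds a resource_provider_generation into every resource class, so a naive equality test would always see a difference. An illustrative example with made-up numbers (the field values are assumptions, not taken from the commit):

import copy

# Inventory as built locally for the PUT payload (illustrative values).
local = {
    'VCPU': {'total': 2, 'allocation_ratio': 16.0},
    'MEMORY_MB': {'total': 1024, 'allocation_ratio': 1.5},
}

# The same inventory as returned by GET /resource_providers/<uuid>/inventories:
# identical values, plus the provider generation inside each resource class.
remote = {
    'VCPU': {'total': 2, 'allocation_ratio': 16.0,
             'resource_provider_generation': 43},
    'MEMORY_MB': {'total': 1024, 'allocation_ratio': 1.5,
                  'resource_provider_generation': 43},
}

# The trimming _compare_inventory performs before comparing.
trimmed = {rc: copy.copy(rd) for rc, rd in remote.items()}
for rd in trimmed.values():
    rd.pop('resource_provider_generation')

assert local != remote    # a naive comparison always differs
assert local == trimmed   # after trimming, no PUT is needed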

nova/tests/unit/scheduler/client/test_report.py

@@ -398,195 +398,211 @@ class SchedulerReportClientTestCase(test.NoDBTestCase):
},
}
expected = {
'resource_provider_generation': rp.generation,
'inventories': expected_inventories,
}
self.assertEqual(expected, result)
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'get')
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'put')
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'_compute_node_inventory')
def test_update_inventory(self, mock_inv, mock_put, mock_get):
# Ensure _update_inventory() returns a list of Inventories objects
# after creating or updating the existing values
uuid = uuids.compute_node
        compute_node = objects.ComputeNode(uuid=uuid,
hypervisor_hostname='foo')
rp = objects.ResourceProvider(uuid=uuid, name='foo', generation=42)
# Make sure the ResourceProvider exists for preventing to call the API
self.client._resource_providers[uuid] = rp
mock_inv.return_value = {'inventories': []}
mock_get.return_value.json.return_value = {
'inventories': {
'VCPU': {'resource_provider_generation': 43},
}}
mock_put.return_value.status_code = 200
mock_put.return_value.json.return_value = {
'inventories': {
'VCPU': {
'resource_provider_generation': 44,
}}}
result = self.client._update_inventory_attempt(compute_node)
self.assertTrue(result)
exp_url = '/resource_providers/%s/inventories' % uuid
mock_get.assert_called_once_with(exp_url)
# Called with the newly-found generation from the existing inventory
self.assertEqual(43,
mock_inv.return_value['resource_provider_generation'])
# Updated with the new inventory from the PUT call
self.assertEqual(44, rp.generation)
mock_put.assert_called_once_with(exp_url, mock_inv.return_value)
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'get')
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'put')
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'_compute_node_inventory')
def test_update_inventory_no_update(self, mock_inv, mock_put, mock_get):
uuid = uuids.compute_node
compute_node = objects.ComputeNode(uuid=uuid,
hypervisor_hostname='foo')
rp = objects.ResourceProvider(uuid=uuid, name='foo', generation=42)
self.client._resource_providers[uuid] = rp
mock_inv.return_value = {'inventories': {
'VCPU': {'total': 8},
}}
mock_get.return_value.json.return_value = {'inventories': {
'VCPU': {'total': 8,
'resource_provider_generation': 43}
}}
result = self.client._update_inventory_attempt(compute_node)
self.assertTrue(result)
exp_url = '/resource_providers/%s/inventories' % uuid
mock_get.assert_called_once_with(exp_url)
# No update so put should not be called
self.assertFalse(mock_put.called)
# Make sure we updated the generation from the inventory records
self.assertEqual(43, rp.generation)
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'_get_inventory')
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'put')
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'_compute_node_inventory')
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'_ensure_resource_provider')
def test_update_inventory_conflicts(self, mock_ensure, mock_inv,
mock_put, mock_get):
# Ensure _update_inventory() returns a list of Inventories objects
# after creating or updating the existing values
uuid = uuids.compute_node
compute_node = objects.ComputeNode(uuid=uuid,
hypervisor_hostname='foo')
rp = objects.ResourceProvider(uuid=uuid, name='foo', generation=42)
# Make sure the ResourceProvider exists for preventing to call the API
self.client._resource_providers[uuid] = rp
mock_inv.return_value = {'inventories': [{'resource_class': 'VCPU'}]}
mock_get.return_value = {}
mock_put.return_value.status_code = 409
result = self.client._update_inventory_attempt(compute_node)
self.assertFalse(result)
# Invalidated the cache
self.assertNotIn(uuid, self.client._resource_providers)
# Refreshed our resource provider
mock_ensure.assert_called_once_with(uuid, 'foo')
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'_get_inventory')
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'put')
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'_compute_node_inventory')
def test_update_inventory_unknown_response(self, mock_inv,
mock_put, mock_get):
# Ensure _update_inventory() returns a list of Inventories objects
# after creating or updating the existing values
uuid = uuids.compute_node
compute_node = objects.ComputeNode(uuid=uuid,
hypervisor_hostname='foo')
rp = objects.ResourceProvider(uuid=uuid, name='foo', generation=42)
# Make sure the ResourceProvider exists for preventing to call the API
self.client._resource_providers[uuid] = rp
mock_inv.return_value = {'inventories': [{'resource_class': 'VCPU'}]}
mock_get.return_value = {}
mock_put.return_value.status_code = 234
result = self.client._update_inventory_attempt(compute_node)
self.assertFalse(result)
# No cache invalidation
self.assertIn(uuid, self.client._resource_providers)
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'_get_inventory')
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'put')
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'_compute_node_inventory')
def test_update_inventory_failed(self, mock_inv,
mock_put, mock_get):
# Ensure _update_inventory() returns a list of Inventories objects
# after creating or updating the existing values
uuid = uuids.compute_node
compute_node = objects.ComputeNode(uuid=uuid,
hypervisor_hostname='foo')
rp = objects.ResourceProvider(uuid=uuid, name='foo', generation=42)
# Make sure the ResourceProvider exists for preventing to call the API
self.client._resource_providers[uuid] = rp
mock_inv.return_value = {'inventories': [{'resource_class': 'VCPU'}]}
mock_get.return_value = {}
try:
mock_put.return_value.__nonzero__.return_value = False
except AttributeError:
# Thanks py3
mock_put.return_value.__bool__.return_value = False
result = self.client._update_inventory_attempt(compute_node)
self.assertFalse(result)
# No cache invalidation
self.assertIn(uuid, self.client._resource_providers)
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'_ensure_resource_provider')
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'_update_inventory_attempt')
@mock.patch('time.sleep')
def test_update_inventory_fails_and_then_succeeds(self, mock_sleep,
mock_update,
mock_ensure):
# Ensure _update_inventory() fails if we have a conflict when updating
# but retries correctly.
        cn = mock.MagicMock()
        mock_update.side_effect = (False, True)
        result = self.client._update_inventory(cn)
        self.assertTrue(result)
        # Only slept once
        mock_sleep.assert_called_once_with(1)
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'_ensure_resource_provider')
def test_update_inventory_conflicts_and_then_fails(self, ensure_mock):
# Ensure _update_inventory() fails if we have a conflict when updating
# but fails again.
uuid = uuids.compute_node
name = 'computehost'
compute_node = objects.ComputeNode(uuid=uuid,
hypervisor_hostname=name,
vcpus=2,
cpu_allocation_ratio=16.0,
memory_mb=1024,
ram_allocation_ratio=1.5,
local_gb=10,
disk_allocation_ratio=1.0)
rp = objects.ResourceProvider(uuid=uuid, name=name, generation=42)
# Make sure the ResourceProvider exists for preventing to call the API
def fake_ensure_rp(uuid, name=None):
self.client._resource_providers[uuid] = rp
ensure_mock.side_effect = fake_ensure_rp
self.client._resource_providers[uuid] = rp
expected_payload = self.client._compute_node_inventory(compute_node)
conflict_mock = mock.Mock(status_code=409)
fail_mock = mock.Mock(status_code=400)
self.ks_sess_mock.put.side_effect = (conflict_mock, fail_mock)
result = self.client._update_inventory(compute_node)
expected_url = '/resource_providers/' + uuid + '/inventories'
self.ks_sess_mock.put.assert_has_calls(
[
mock.call(expected_url,
endpoint_filter=mock.ANY,
json=expected_payload,
raise_exc=False),
mock.call(expected_url,
endpoint_filter=mock.ANY,
json=expected_payload,
raise_exc=False),
])
self.assertFalse(result)
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'_ensure_resource_provider')
def test_update_inventory_conflicts_and_then_conflicts(self, ensure_mock):
# Ensure _update_inventory() fails if we have a conflict when updating
# but fails again.
uuid = uuids.compute_node
name = 'computehost'
compute_node = objects.ComputeNode(uuid=uuid,
hypervisor_hostname=name,
vcpus=2,
cpu_allocation_ratio=16.0,
memory_mb=1024,
ram_allocation_ratio=1.5,
local_gb=10,
disk_allocation_ratio=1.0)
rp = objects.ResourceProvider(uuid=uuid, name=name, generation=42)
# Make sure the ResourceProvider exists for preventing to call the API
def fake_ensure_rp(uuid, name=None):
self.client._resource_providers[uuid] = rp
ensure_mock.side_effect = fake_ensure_rp
self.client._resource_providers[uuid] = rp
expected_payload = self.client._compute_node_inventory(compute_node)
conflict_mock = mock.Mock(status_code=409)
self.ks_sess_mock.put.return_value = conflict_mock
result = self.client._update_inventory(compute_node)
expected_url = '/resource_providers/' + uuid + '/inventories'
self.ks_sess_mock.put.assert_has_calls(
[
mock.call(expected_url,
endpoint_filter=mock.ANY,
json=expected_payload,
raise_exc=False),
mock.call(expected_url,
endpoint_filter=mock.ANY,
json=expected_payload,
raise_exc=False),
])
    @mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
                '_ensure_resource_provider')
    @mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
                '_update_inventory_attempt')
@mock.patch('time.sleep')
def test_update_inventory_never_succeeds(self, mock_sleep,
mock_update,
mock_ensure):
        # Ensure _update_inventory() keeps retrying failed attempts but
        # eventually gives up and reports failure.
cn = mock.MagicMock()
mock_update.side_effect = (False, False, False)
result = self.client._update_inventory(cn)
self.assertFalse(result)
# Three attempts to update
mock_update.assert_has_calls([mock.call(cn), mock.call(cn),
mock.call(cn)])
# Slept three times
mock_sleep.assert_has_calls([mock.call(1), mock.call(1), mock.call(1)])
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'
'_ensure_resource_provider')
@mock.patch('nova.scheduler.client.report.SchedulerReportClient.'