Fix and cleanup compute node stat tracking

Fix an issue where compute node stats are not reported correctly
if the instance's state was modified from outside of the compute
host. (e.g. nova-api server changing task_state prior to invoking
a rebuild_instance on the compute manager)

Also did some general cleanup on the stats code.

bug 1046591

Change-Id: I5d9fc78f22df6854777c5957cbc69c8dd5ec10d3
This commit is contained in:
Brian Elliott 2012-09-02 19:29:56 +00:00
parent 06dec8ec1e
commit 8dc34a0aba
5 changed files with 123 additions and 80 deletions

View File

@ -252,7 +252,7 @@ class ComputeManager(manager.SchedulerDependentManager):
(old_ref, instance_ref) = self.db.instance_update_and_get_original(
context, instance_uuid, kwargs)
self.resource_tracker.update_load_stats_for_instance(context, old_ref,
self.resource_tracker.update_load_stats_for_instance(context,
instance_ref)
notifications.send_update(context, old_ref, instance_ref)

View File

@ -362,14 +362,14 @@ class ResourceTracker(object):
LOG.info(_('Compute_service record updated for %s ') % self.host)
@utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
def update_load_stats_for_instance(self, context, old_ref, instance_ref):
def update_load_stats_for_instance(self, context, instance_ref):
"""Update workload stats for the local compute host."""
if self.disabled:
return
values = {}
self.stats.update_stats_for_instance(old_ref, instance_ref)
self.stats.update_stats_for_instance(instance_ref)
values['stats'] = self.stats
values['current_workload'] = self.stats.calculate_workload()
@ -414,7 +414,7 @@ class ResourceTracker(object):
instances = db.instance_get_all_by_filters(context, filters)
for instance in instances:
self.stats.add_stats_for_instance(instance)
self.stats.update_stats_for_instance(instance)
values['current_workload'] = self.stats.calculate_workload()
values['running_vms'] = self.stats.num_instances

View File

@ -15,31 +15,24 @@
from nova.compute import task_states
from nova.compute import vm_states
from nova import db
from nova.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class Stats(dict):
"""Handler for updates to compute node workload stats."""
def add_stats_for_instance(self, instance):
self._increment("num_task_%s" % instance['task_state'])
def __init__(self):
super(Stats, self).__init__()
self._increment("num_vm_%s" % instance['vm_state'])
self._increment("num_instances")
# Track instance states for compute node workload calculations:
self.states = {}
os_type = instance['os_type']
self._increment("num_os_type_%s" % os_type)
def clear(self):
super(Stats, self).clear()
proj_id = instance['project_id']
self._increment("num_proj_%s" % proj_id)
self.states.clear()
x = self.get("num_vcpus_used", 0)
self["num_vcpus_used"] = x + instance["vcpus"]
def calculate_io_workload(self):
@property
def io_workload(self):
"""Calculate an I/O based load by counting I/O heavy operations"""
def _get(state, state_type):
@ -82,32 +75,44 @@ class Stats(dict):
def num_vcpus_used(self):
return self.get("num_vcpus_used", 0)
def update_stats_for_instance(self, old_instance, instance):
def update_stats_for_instance(self, instance):
"""Update stats after an instance is changed."""
old_vm_state = old_instance['vm_state']
new_vm_state = instance['vm_state']
uuid = instance['uuid']
if old_vm_state != new_vm_state:
self._decrement("num_vm_%s" % old_vm_state)
self._increment("num_vm_%s" % new_vm_state)
if new_vm_state == vm_states.DELETED:
self._decrement("num_instances")
self._decrement("num_os_type_%s" % old_instance['os_type'])
self._decrement("num_proj_%s" % old_instance["project_id"])
# First, remove stats from the previous instance
# state:
if uuid in self.states:
old_state = self.states[uuid]
self._decrement("num_vm_%s" % old_state['vm_state'])
self._decrement("num_task_%s" % old_state['task_state'])
self._decrement("num_os_type_%s" % old_state['os_type'])
self._decrement("num_proj_%s" % old_state['project_id'])
x = self.get("num_vcpus_used", 0)
self["num_vcpus_used"] = x - old_instance['vcpus']
self["num_vcpus_used"] = x - old_state['vcpus']
else:
# new instance
self._increment("num_instances")
old_task_state = old_instance['task_state']
new_task_state = instance['task_state']
# Now update stats from the new instance state:
(vm_state, task_state, os_type, project_id, vcpus) = \
self._extract_state_from_instance(instance)
if old_task_state != new_task_state:
self._decrement("num_task_%s" % old_task_state)
self._increment("num_task_%s" % new_task_state)
if vm_state == vm_states.DELETED:
self._decrement("num_instances")
self.states.pop(uuid)
else:
self._increment("num_vm_%s" % vm_state)
self._increment("num_task_%s" % task_state)
self._increment("num_os_type_%s" % os_type)
self._increment("num_proj_%s" % project_id)
x = self.get("num_vcpus_used", 0)
self["num_vcpus_used"] = x + vcpus
# save updated I/O workload in stats:
self["io_workload"] = self.io_workload
def _decrement(self, key):
x = self.get(key, 0)
@ -116,3 +121,19 @@ class Stats(dict):
def _increment(self, key):
x = self.get(key, 0)
self[key] = x + 1
def _extract_state_from_instance(self, instance):
"""Save the useful bits of instance state for tracking purposes"""
uuid = instance['uuid']
vm_state = instance['vm_state']
task_state = instance['task_state']
os_type = instance['os_type']
project_id = instance['project_id']
vcpus = instance['vcpus']
self.states[uuid] = dict(vm_state=vm_state, task_state=task_state,
os_type=os_type, project_id=project_id,
vcpus=vcpus)
return (vm_state, task_state, os_type, project_id, vcpus)

View File

@ -86,6 +86,7 @@ class BaseTestCase(test.TestCase):
"os_type": "Linux",
"project_id": "1234",
"vcpus": 1,
"uuid": "12-34-56-78-90",
}
self.stubs.Set(db, 'instance_get_all_by_filters',
@ -476,17 +477,16 @@ class ResourceTestCase(BaseTestCase):
self.assertFalse(self.tracker.disabled)
self.assertEqual(0, self.tracker.compute_node['current_workload'])
old_ref = self.instance_ref
old_ref['task_state'] = task_states.SCHEDULING
with self.tracker.instance_resource_claim(self.context, old_ref):
self.instance_ref['task_state'] = task_states.SCHEDULING
with self.tracker.instance_resource_claim(self.context,
self.instance_ref):
pass
self.assertEqual(1, self.tracker.compute_node['current_workload'])
new_ref = copy.copy(old_ref)
new_ref['vm_state'] = vm_states.ACTIVE
new_ref['task_state'] = None
self.instance_ref['vm_state'] = vm_states.ACTIVE
self.instance_ref['task_state'] = None
self.tracker.update_load_stats_for_instance(self.context, old_ref,
new_ref)
self.tracker.update_load_stats_for_instance(self.context,
self.instance_ref)
self.assertEqual(0, self.tracker.compute_node['current_workload'])

View File

@ -37,12 +37,13 @@ class StatsTestCase(test.TestCase):
"task_state": None,
"vm_state": vm_states.BUILDING,
"vcpus": 1,
"uuid": "12-34-56-78-90",
}
if values:
instance.update(values)
return instance
def testOsTypeCount(self):
def test_os_type_count(self):
os_type = "Linux"
self.assertEqual(0, self.stats.num_os_type(os_type))
self.stats._increment("num_os_type_" + os_type)
@ -52,7 +53,7 @@ class StatsTestCase(test.TestCase):
self.stats["num_os_type_" + os_type] -= 1
self.assertEqual(1, self.stats.num_os_type(os_type))
def testUpdateProjectCount(self):
def test_update_project_count(self):
proj_id = "1234"
def _get():
@ -64,22 +65,23 @@ class StatsTestCase(test.TestCase):
self.stats["num_proj_" + proj_id] -= 1
self.assertEqual(0, _get())
def testInstanceCount(self):
def test_instance_count(self):
self.assertEqual(0, self.stats.num_instances)
for i in range(5):
self.stats._increment("num_instances")
self.stats["num_instances"] -= 1
self.assertEqual(4, self.stats.num_instances)
def testAddStatsForInstance(self):
def test_add_stats_for_instance(self):
instance = {
"os_type": "Linux",
"project_id": "1234",
"task_state": None,
"vm_state": vm_states.BUILDING,
"vcpus": 3,
"uuid": "12-34-56-78-90",
}
self.stats.add_stats_for_instance(instance)
self.stats.update_stats_for_instance(instance)
instance = {
"os_type": "FreeBSD",
@ -87,8 +89,9 @@ class StatsTestCase(test.TestCase):
"task_state": task_states.SCHEDULING,
"vm_state": None,
"vcpus": 1,
"uuid": "23-45-67-89-01",
}
self.stats.add_stats_for_instance(instance)
self.stats.update_stats_for_instance(instance)
instance = {
"os_type": "Linux",
@ -96,8 +99,9 @@ class StatsTestCase(test.TestCase):
"task_state": task_states.SCHEDULING,
"vm_state": vm_states.BUILDING,
"vcpus": 2,
"uuid": "34-56-78-90-12",
}
self.stats.add_stats_for_instance(instance)
self.stats.update_stats_for_instance(instance)
self.assertEqual(2, self.stats.num_os_type("Linux"))
self.assertEqual(1, self.stats.num_os_type("FreeBSD"))
@ -113,54 +117,54 @@ class StatsTestCase(test.TestCase):
self.assertEqual(6, self.stats.num_vcpus_used)
def testCalculateWorkload(self):
def test_calculate_workload(self):
self.stats._increment("num_task_None")
self.stats._increment("num_task_" + task_states.SCHEDULING)
self.stats._increment("num_task_" + task_states.SCHEDULING)
self.assertEqual(2, self.stats.calculate_workload())
def testUpdateStatsForInstanceNoChange(self):
old = self._create_instance()
self.stats.add_stats_for_instance(old)
def test_update_stats_for_instance_no_change(self):
instance = self._create_instance()
self.stats.update_stats_for_instance(instance)
self.stats.update_stats_for_instance(old, old) # no change
self.stats.update_stats_for_instance(instance) # no change
self.assertEqual(1, self.stats.num_instances)
self.assertEqual(1, self.stats.num_instances_for_project("1234"))
self.assertEqual(1, self.stats["num_os_type_Linux"])
self.assertEqual(1, self.stats["num_task_None"])
self.assertEqual(1, self.stats["num_vm_" + vm_states.BUILDING])
def testUpdateStatsForInstanceVmChange(self):
old = self._create_instance()
self.stats.add_stats_for_instance(old)
def test_update_stats_for_instance_vm_change(self):
instance = self._create_instance()
self.stats.update_stats_for_instance(instance)
new = self._create_instance({"vm_state": vm_states.PAUSED})
self.stats.update_stats_for_instance(old, new)
instance["vm_state"] = vm_states.PAUSED
self.stats.update_stats_for_instance(instance)
self.assertEqual(1, self.stats.num_instances)
self.assertEqual(1, self.stats.num_instances_for_project(1234))
self.assertEqual(1, self.stats.num_os_type("Linux"))
self.assertEqual(0, self.stats.num_vm_state(vm_states.BUILDING))
self.assertEqual(1, self.stats.num_vm_state(vm_states.PAUSED))
self.assertEqual(1, self.stats["num_os_type_Linux"])
self.assertEqual(0, self.stats["num_vm_%s" % vm_states.BUILDING])
self.assertEqual(1, self.stats["num_vm_%s" % vm_states.PAUSED])
def testUpdateStatsForInstanceVmChange(self):
old = self._create_instance()
self.stats.add_stats_for_instance(old)
def test_update_stats_for_instance_task_change(self):
instance = self._create_instance()
self.stats.update_stats_for_instance(instance)
new = self._create_instance({"task_state": task_states.REBUILDING})
self.stats.update_stats_for_instance(old, new)
instance["task_state"] = task_states.REBUILDING
self.stats.update_stats_for_instance(instance)
self.assertEqual(1, self.stats.num_instances)
self.assertEqual(1, self.stats.num_instances_for_project("1234"))
self.assertEqual(1, self.stats["num_os_type_Linux"])
self.assertEqual(0, self.stats["num_task_None"])
self.assertEqual(1, self.stats["num_task_" + task_states.REBUILDING])
self.assertEqual(1, self.stats["num_task_%s" % task_states.REBUILDING])
def testUpdateStatsForInstanceDeleted(self):
old = self._create_instance()
self.stats.add_stats_for_instance(old)
def test_update_stats_for_instance_deleted(self):
instance = self._create_instance()
self.stats.update_stats_for_instance(instance)
self.assertEqual(1, self.stats["num_proj_1234"])
new = self._create_instance({"vm_state": vm_states.DELETED})
self.stats.update_stats_for_instance(old, new)
instance["vm_state"] = vm_states.DELETED
self.stats.update_stats_for_instance(instance)
self.assertEqual(0, self.stats.num_instances)
self.assertEqual(0, self.stats.num_instances_for_project("1234"))
@ -168,7 +172,7 @@ class StatsTestCase(test.TestCase):
self.assertEqual(0, self.stats["num_vm_" + vm_states.BUILDING])
self.assertEqual(0, self.stats.num_vcpus_used)
def testIoWorkload(self):
def test_io_workload(self):
vms = [vm_states.ACTIVE, vm_states.BUILDING, vm_states.PAUSED]
tasks = [task_states.RESIZE_MIGRATING, task_states.REBUILDING,
task_states.RESIZE_PREP, task_states.IMAGE_SNAPSHOT,
@ -179,4 +183,22 @@ class StatsTestCase(test.TestCase):
for state in tasks:
self.stats._increment("num_task_" + state)
self.assertEqual(6, self.stats.calculate_io_workload())
self.assertEqual(6, self.stats.io_workload)
def test_io_workload_saved_to_stats(self):
values = {'task_state': task_states.RESIZE_MIGRATING}
instance = self._create_instance(values)
self.stats.update_stats_for_instance(instance)
self.assertEqual(2, self.stats["io_workload"])
def test_clear(self):
instance = self._create_instance()
self.stats.update_stats_for_instance(instance)
self.assertNotEqual(0, len(self.stats))
self.assertEqual(1, len(self.stats.states))
self.stats.clear()
self.assertEqual(0, len(self.stats))
self.assertEqual(0, len(self.stats.states))