diff --git a/nova/compute/manager.py b/nova/compute/manager.py index 2f977b26c522..5adfcb2334b8 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -253,8 +253,7 @@ class ComputeManager(manager.SchedulerDependentManager): (old_ref, instance_ref) = self.db.instance_update_and_get_original( context, instance_uuid, kwargs) - self.resource_tracker.update_load_stats_for_instance(context, - instance_ref) + self.resource_tracker.update_usage(context, instance_ref) notifications.send_update(context, old_ref, instance_ref) return instance_ref @@ -482,10 +481,14 @@ class ComputeManager(manager.SchedulerDependentManager): network_info = self._allocate_network(context, instance, requested_networks) try: - memory_mb_limit = filter_properties.get('memory_mb_limit', - None) - with self.resource_tracker.instance_resource_claim(context, - instance, memory_mb_limit=memory_mb_limit): + limits = filter_properties.get('limits', {}) + with self.resource_tracker.resource_claim(context, instance, + limits): + # Resources are available to build this instance here, + # mark it as belonging to this host: + self._instance_update(context, instance['uuid'], + host=self.host, launched_on=self.host) + block_device_info = self._prep_block_device(context, instance) instance = self._spawn(context, instance, image_meta, @@ -686,7 +689,6 @@ class ComputeManager(manager.SchedulerDependentManager): LOG.audit(_('Starting instance...'), context=context, instance=instance) self._instance_update(context, instance['uuid'], - host=self.host, launched_on=self.host, vm_state=vm_states.BUILDING, task_state=None, expected_task_state=(task_states.SCHEDULING, @@ -891,8 +893,6 @@ class ComputeManager(manager.SchedulerDependentManager): self.db.instance_destroy(context, instance_uuid) system_meta = self.db.instance_system_metadata_get(context, instance_uuid) - # mark resources free - self.resource_tracker.free_resources(context) self._notify_about_instance_usage(context, instance, "delete.end", system_metadata=system_meta) diff --git a/nova/compute/resource_tracker.py b/nova/compute/resource_tracker.py index cc088655b966..eb0d302b0fb2 100644 --- a/nova/compute/resource_tracker.py +++ b/nova/compute/resource_tracker.py @@ -19,17 +19,22 @@ scheduler with useful information about availability through the ComputeNode model. """ -from nova import context +from nova.compute import vm_states from nova import db from nova import exception from nova import flags from nova.openstack.common import cfg from nova.openstack.common import importutils +from nova.openstack.common import jsonutils from nova.openstack.common import log as logging from nova.openstack.common import timeutils from nova import utils resource_tracker_opts = [ + cfg.IntOpt('reserved_host_disk_mb', default=0, + help='Amount of disk in MB to reserve for the host'), + cfg.IntOpt('reserved_host_memory_mb', default=512, + help='Amount of memory in MB to reserve for the host'), cfg.IntOpt('claim_timeout_seconds', default=600, help='How long, in seconds, before a resource claim times out'), cfg.StrOpt('compute_stats_class', @@ -52,46 +57,36 @@ class Claim(object): correct decisions with respect to host selection. 
""" - def __init__(self, claim_id, memory_mb, disk_gb, timeout, *args, **kwargs): - self.claim_id = claim_id - self.memory_mb = memory_mb - self.disk_gb = disk_gb + def __init__(self, instance, timeout): + self.instance = jsonutils.to_primitive(instance) self.timeout = timeout self.expire_ts = timeutils.utcnow_ts() + timeout - def apply_claim(self, resources): - """Adjust the resources required from available resources. - - :param resources: Should be a dictionary-like object that - has fields like a compute node - """ - return self._apply(resources) - - def undo_claim(self, resources): - return self._apply(resources, sign=-1) - def is_expired(self): """Determine if this adjustment is old enough that we can assume it's no longer needed. """ return timeutils.utcnow_ts() > self.expire_ts - def _apply(self, resources, sign=1): - values = {} - values['memory_mb_used'] = (resources['memory_mb_used'] + sign * - self.memory_mb) - values['free_ram_mb'] = (resources['free_ram_mb'] - sign * - self.memory_mb) - values['local_gb_used'] = (resources['local_gb_used'] + sign * - self.disk_gb) - values['free_disk_gb'] = (resources['free_disk_gb'] - sign * - self.disk_gb) + @property + def claim_id(self): + return self.instance['uuid'] - return values + @property + def disk_gb(self): + return self.instance['root_gb'] + self.instance['ephemeral_gb'] + + @property + def memory_mb(self): + return self.instance['memory_mb'] + + @property + def vcpus(self): + return self.instance['vcpus'] def __str__(self): - return "[Claim %d: %d MB memory, %d GB disk]" % (self.claim_id, - self.memory_mb, self.disk_gb) + return "[Claim %s: %d MB memory, %d GB disk, %d VCPUS]" % \ + (self.claim_id, self.memory_mb, self.disk_gb, self.vcpus) class ResourceContextManager(object): @@ -127,46 +122,25 @@ class ResourceTracker(object): self.next_claim_id = 1 self.claims = {} self.stats = importutils.import_object(FLAGS.compute_stats_class) + self.tracked_instances = {} - def resource_claim(self, context, *args, **kwargs): - claim = self.begin_resource_claim(context, *args, **kwargs) - return ResourceContextManager(context, claim, self) - - def instance_resource_claim(self, context, instance_ref, *args, **kwargs): - claim = self.begin_instance_resource_claim(context, instance_ref, - *args, **kwargs) + def resource_claim(self, context, instance_ref, limits=None): + claim = self.begin_resource_claim(context, instance_ref, limits) return ResourceContextManager(context, claim, self) @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE) - def begin_instance_resource_claim(self, context, instance_ref, *args, - **kwargs): - """Method to begin a resource claim for a new instance.""" - memory_mb = instance_ref['memory_mb'] - disk_gb = instance_ref['root_gb'] + instance_ref['ephemeral_gb'] - - claim = self._do_begin_resource_claim(context, memory_mb, disk_gb, - *args, **kwargs) - if claim: - # also update load stats related to new instances firing up - - values = self._create_load_stats(context, instance_ref) - self.compute_node = self._update(context, values) - return claim - - @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE) - def begin_resource_claim(self, context, memory_mb, disk_gb, - memory_mb_limit=None, timeout=None, *args, **kwargs): + def begin_resource_claim(self, context, instance_ref, limits=None, + timeout=None): """Indicate that some resources are needed for an upcoming compute - host operation. + instance build operation. - This should be called any time the compute node is about to perform - an operation that will consume resources. 
+ This should be called before the compute node is about to perform + an instance build operation that will consume additional resources. - :param memory_mb: security context - :param memory_mb: Memory in MB to be claimed - :param root_gb: Disk in GB to be claimed - :param memory_mb_limit: Memory in MB that is the maximum to allocate on - this node. May exceed installed physical memory if - oversubscription is the desired behavior. + :param context: security context + :param instance_ref: instance to reserve resources for + :param limits: Dict of oversubscription limits for memory, disk, + and CPUs. :param timeout: How long, in seconds, the operation that requires these resources should take to actually allocate what it needs from the hypervisor. If the timeout is @@ -177,71 +151,142 @@ class ResourceTracker(object): compute operation is finished. Returns None if the claim failed. """ - - return self._do_begin_resource_claim(context, memory_mb, disk_gb, - memory_mb_limit, timeout, *args, **kwargs) - - def _do_begin_resource_claim(self, context, memory_mb, disk_gb, - memory_mb_limit=None, timeout=None, *args, **kwargs): - if self.disabled: return + if not limits: + limits = {} + if not timeout: timeout = FLAGS.claim_timeout_seconds - memory_mb = abs(memory_mb) - disk_gb = abs(disk_gb) + # If an individual limit is None, the resource will be considered + # unlimited: + memory_mb_limit = limits.get('memory_mb') + disk_gb_limit = limits.get('disk_gb') + vcpu_limit = limits.get('vcpu') + + memory_mb = instance_ref['memory_mb'] + disk_gb = instance_ref['root_gb'] + instance_ref['ephemeral_gb'] + vcpus = instance_ref['vcpus'] msg = _("Attempting claim: memory %(memory_mb)d MB, disk %(disk_gb)d " - "GB, mem limit %(memory_mb_limit)s") % locals() + "GB, VCPUs %(vcpus)d") % locals() LOG.audit(msg) - if not memory_mb_limit: - # default to total memory: - memory_mb_limit = self.compute_node['memory_mb'] + # Test for resources: + if not self._can_claim_memory(memory_mb, memory_mb_limit): + return - free_ram_mb = memory_mb_limit - self.compute_node['memory_mb_used'] + if not self._can_claim_disk(disk_gb, disk_gb_limit): + return + if not self._can_claim_cpu(vcpus, vcpu_limit): + return + + # keep track of this claim until we know whether the compute operation + # was successful/completed: + claim = Claim(instance_ref, timeout) + self.claims[claim.claim_id] = claim + + # Mark resources in-use and update stats + self._update_usage_from_instance(self.compute_node, instance_ref) + + # persist changes to the compute node: + self._update(context, self.compute_node) + return claim + + def _can_claim_memory(self, memory_mb, memory_mb_limit): + """Test if memory needed for a claim can be safely allocated""" # Installed memory and usage info: msg = _("Total memory: %(total_mem)d MB, used: %(used_mem)d MB, free: " - "%(free_mem)d") % dict( + "%(free_mem)d MB") % dict( total_mem=self.compute_node['memory_mb'], used_mem=self.compute_node['memory_mb_used'], free_mem=self.compute_node['local_gb_used']) LOG.audit(msg) + if memory_mb_limit is None: + # treat memory as unlimited: + LOG.audit(_("Memory limit not specified, defaulting to unlimited")) + return True + + free_ram_mb = memory_mb_limit - self.compute_node['memory_mb_used'] + # Oversubscribed memory policy info: - msg = _("Limit: %(memory_mb_limit)d MB, free: %(free_ram_mb)d") % \ - locals() + msg = _("Memory limit: %(memory_mb_limit)d MB, free: " + "%(free_ram_mb)d MB") % locals() LOG.audit(msg) - if memory_mb > free_ram_mb: + can_claim_mem = memory_mb <= 
free_ram_mb + + if not can_claim_mem: msg = _("Unable to claim resources. Free memory %(free_ram_mb)d " "MB < requested memory %(memory_mb)d MB") % locals() LOG.info(msg) - return None - if disk_gb > self.compute_node['free_disk_gb']: + return can_claim_mem + + def _can_claim_disk(self, disk_gb, disk_gb_limit): + """Test if disk space needed can be safely allocated""" + # Installed disk and usage info: + msg = _("Total disk: %(total_disk)d GB, used: %(used_disk)d GB, free: " + "%(free_disk)d GB") % dict( + total_disk=self.compute_node['local_gb'], + used_disk=self.compute_node['local_gb_used'], + free_disk=self.compute_node['free_disk_gb']) + LOG.audit(msg) + + if disk_gb_limit is None: + # treat disk as unlimited: + LOG.audit(_("Disk limit not specified, defaulting to unlimited")) + return True + + free_disk_gb = disk_gb_limit - self.compute_node['local_gb_used'] + + # Oversubscribed disk policy info: + msg = _("Disk limit: %(disk_gb_limit)d GB, free: " + "%(free_disk_gb)d GB") % locals() + LOG.audit(msg) + + can_claim_disk = disk_gb <= free_disk_gb + if not can_claim_disk: msg = _("Unable to claim resources. Free disk %(free_disk_gb)d GB" " < requested disk %(disk_gb)d GB") % dict( free_disk_gb=self.compute_node['free_disk_gb'], disk_gb=disk_gb) LOG.info(msg) - return None - claim_id = self._get_next_id() - c = Claim(claim_id, memory_mb, disk_gb, timeout, *args, **kwargs) + return can_claim_disk - # adjust compute node usage values and save so scheduler will see it: - values = c.apply_claim(self.compute_node) - self.compute_node = self._update(context, values) + def _can_claim_cpu(self, vcpus, vcpu_limit): + """Test if CPUs can be safely allocated according to given policy.""" - # keep track of this claim until we know whether the compute operation - # was successful/completed: - self.claims[claim_id] = c - return c + msg = _("Total VCPUs: %(total_vcpus)d, used: %(used_vcpus)d") \ + % dict(total_vcpus=self.compute_node['vcpus'], + used_vcpus=self.compute_node['vcpus_used']) + LOG.audit(msg) + if vcpu_limit is None: + # treat cpu as unlimited: + LOG.audit(_("VCPU limit not specified, defaulting to unlimited")) + return True + + # Oversubscribed CPU policy info: + msg = _("CPU limit: %(vcpu_limit)d") % locals() + LOG.audit(msg) + + free_vcpus = vcpu_limit - self.compute_node['vcpus_used'] + can_claim_cpu = vcpus <= free_vcpus + + if not can_claim_cpu: + msg = _("Unable to claim resources. Free CPU %(free_vcpus)d < " + "requested CPU %(vcpus)d") % locals() + LOG.info(msg) + + return can_claim_cpu + + @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE) def finish_resource_claim(self, claim): """Indicate that the compute operation that previously claimed the resources identified by 'claim' has now completed and the resources @@ -260,7 +305,7 @@ class ResourceTracker(object): if self.claims.pop(claim.claim_id, None): LOG.info(_("Finishing claim: %s") % claim) else: - LOG.info(_("Can't find claim %d. It may have been 'finished' " + LOG.info(_("Can't find claim %s. 
It may have been 'finished' " "twice, or it has already timed out."), claim.claim_id) @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE) @@ -278,27 +323,37 @@ class ResourceTracker(object): # un-claim the resources: if self.claims.pop(claim.claim_id, None): LOG.info(_("Aborting claim: %s") % claim) - values = claim.undo_claim(self.compute_node) - self.compute_node = self._update(context, values) + # flag the instance as deleted to revert the resource usage + # and associated stats: + claim.instance['vm_state'] = vm_states.DELETED + self._update_usage_from_instance(self.compute_node, claim.instance) + self._update(context, self.compute_node) + else: # can't find the claim. this may mean the claim already timed # out or it was already explicitly finished/aborted. - LOG.info(_("Claim %d not found. It either timed out or was " + LOG.audit(_("Claim %s not found. It either timed out or was " "already explicitly finished/aborted"), claim.claim_id) + @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE) + def update_usage(self, context, instance): + """Update the resource usage and stats after a change in an + instance + """ + if self.disabled: + return + + # don't update usage for this instance unless it submitted a resource + # claim first: + uuid = instance['uuid'] + if uuid in self.tracked_instances: + self._update_usage_from_instance(self.compute_node, instance) + self._update(context.elevated(), self.compute_node) + @property def disabled(self): return self.compute_node is None - def free_resources(self, context): - """A compute operation finished freeing up resources. Update compute - model to reflect updated resource usage. - - (The hypervisor may not immediately 'GC' all resources, so ask directly - to see what's available to update the compute node model.) - """ - self.update_available_resource(context.elevated()) - @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE) def update_available_resource(self, context): """Override in-memory calculations of compute node resource usage based @@ -308,36 +363,33 @@ class ResourceTracker(object): declared a need for resources, but not necessarily retrieved them from the hypervisor layer yet. """ - # ask hypervisor for its view of resource availability & - # usage: resources = self.driver.get_available_resource() if not resources: # The virt driver does not support this function - LOG.warn(_("Virt driver does not support " + LOG.audit(_("Virt driver does not support " "'get_available_resource' Compute tracking is disabled.")) self.compute_node = None self.claims = {} return - # Confirm resources dictionary contains expected keys: self._verify_resources(resources) - resources['free_ram_mb'] = resources['memory_mb'] - \ - resources['memory_mb_used'] - resources['free_disk_gb'] = resources['local_gb'] - \ - resources['local_gb_used'] + self._report_hypervisor_resource_view(resources) - LOG.audit(_("free_ram_mb: %s") % resources['free_ram_mb']) - LOG.audit(_("free_disk_gb: %s") % resources['free_disk_gb']) - # Apply resource claims representing in-progress operations to - # 'resources'. This may over-estimate the amount of resources in use, - # at least until the next time 'update_available_resource' runs. 
- self._apply_claims(resources) + self._purge_expired_claims() - # also generate all load stats: - values = self._create_load_stats(context) - resources.update(values) + # Grab all instances assigned to this host: + filters = {'host': self.host, 'deleted': False} + instances = db.instance_get_all_by_filters(context, filters) + # Now calculate usage based on instance utilization: + self._update_usage_from_instances(resources, instances) + self._report_final_resource_view(resources) + + self._sync_compute_node(context, resources) + + def _sync_compute_node(self, context, resources): + """Create or update the compute node DB record""" if not self.compute_node: # we need a copy of the ComputeNode record: service = self._get_service(context) @@ -352,80 +404,30 @@ class ResourceTracker(object): if not self.compute_node: # Need to create the ComputeNode record: resources['service_id'] = service['id'] - self.compute_node = self._create(context, resources) + self._create(context, resources) LOG.info(_('Compute_service record created for %s ') % self.host) else: # just update the record: - self.compute_node = self._update(context, resources, - prune_stats=True) + self._update(context, resources, prune_stats=True) LOG.info(_('Compute_service record updated for %s ') % self.host) - @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE) - def update_load_stats_for_instance(self, context, instance_ref): - """Update workload stats for the local compute host.""" - - if self.disabled: - return - - values = {} - self.stats.update_stats_for_instance(instance_ref) - values['stats'] = self.stats - - values['current_workload'] = self.stats.calculate_workload() - values['running_vms'] = self.stats.num_instances - values['vcpus_used'] = self.stats.num_vcpus_used - - self.compute_node = self._update(context.elevated(), values) - - def _apply_claims(self, resources): - """Apply in-progress resource claims to the 'resources' dict from the - virt layer - """ + def _purge_expired_claims(self): + """Purge expired resource claims""" for claim_id in self.claims.keys(): c = self.claims[claim_id] if c.is_expired(): - # if this claim is expired, just expunge it - LOG.info(_("Expiring resource claim %d"), claim_id) + # if this claim is expired, just expunge it. + # it is assumed that the instance will eventually get built + # successfully. + LOG.audit(_("Expiring resource claim %s"), claim_id) self.claims.pop(claim_id) - else: - values = c.apply_claim(resources) - resources.update(values) def _create(self, context, values): """Create the compute node in the DB""" # initialize load stats from existing instances: compute_node = db.compute_node_create(context, values) - return compute_node - - def _create_load_stats(self, context, instance=None): - """For each existing instance generate load stats for the compute - node record. 
- """ - values = {} - - if instance: - instances = [instance] - else: - self.stats.clear() # re-generating all, so clear old stats - - # grab all instances that are not yet DELETED - filters = {'host': self.host, 'deleted': False} - instances = db.instance_get_all_by_filters(context, filters) - - for instance in instances: - self.stats.update_stats_for_instance(instance) - - values['current_workload'] = self.stats.calculate_workload() - values['running_vms'] = self.stats.num_instances - values['vcpus_used'] = self.stats.num_vcpus_used - values['stats'] = self.stats - return values - - def _get_next_id(self): - next_id = self.next_claim_id - self.next_claim_id += 1 - return next_id + self.compute_node = dict(compute_node) def _get_service(self, context): try: @@ -434,10 +436,105 @@ class ResourceTracker(object): except exception.NotFound: LOG.warn(_("No service record for host %s"), self.host) + def _report_hypervisor_resource_view(self, resources): + """Log the hypervisor's view of free memory in and free disk. + This is just a snapshot of resource usage recorded by the + virt driver. + """ + free_ram_mb = resources['memory_mb'] - resources['memory_mb_used'] + free_disk_gb = resources['local_gb'] - resources['local_gb_used'] + + LOG.debug(_("Hypervisor: free ram (MB): %s") % free_ram_mb) + LOG.debug(_("Hypervisor: free disk (GB): %s") % free_disk_gb) + + vcpus = resources['vcpus'] + if vcpus: + free_vcpus = vcpus - resources['vcpus_used'] + LOG.debug(_("Hypervisor: free VCPUs: %s") % free_vcpus) + else: + LOG.debug(_("Hypervisor: VCPU information unavailable")) + + def _report_final_resource_view(self, resources): + """Report final calculate of free memory and free disk including + instance calculations and in-progress resource claims. These + values will be exposed via the compute node table to the scheduler. 
+ """ + LOG.audit(_("Free ram (MB): %s") % resources['free_ram_mb']) + LOG.audit(_("Free disk (GB): %s") % resources['free_disk_gb']) + + vcpus = resources['vcpus'] + if vcpus: + free_vcpus = vcpus - resources['vcpus_used'] + LOG.audit(_("Free VCPUS: %s") % free_vcpus) + else: + LOG.audit(_("Free VCPU information unavailable")) + def _update(self, context, values, prune_stats=False): """Persist the compute node updates to the DB""" - return db.compute_node_update(context, self.compute_node['id'], - values, prune_stats) + compute_node = db.compute_node_update(context, + self.compute_node['id'], values, prune_stats) + self.compute_node = dict(compute_node) + + def _update_usage_from_instance(self, resources, instance): + """Update usage for a single instance.""" + + uuid = instance['uuid'] + is_new_instance = uuid not in self.tracked_instances + is_deleted_instance = instance['vm_state'] == vm_states.DELETED + + if is_new_instance: + self.tracked_instances[uuid] = 1 + sign = 1 + + if instance['vm_state'] == vm_states.DELETED: + self.tracked_instances.pop(uuid) + sign = -1 + + self.stats.update_stats_for_instance(instance) + + # if it's a new or deleted instance: + if is_new_instance or is_deleted_instance: + # new instance, update compute node resource usage: + resources['memory_mb_used'] += sign * instance['memory_mb'] + resources['local_gb_used'] += sign * instance['root_gb'] + resources['local_gb_used'] += sign * instance['ephemeral_gb'] + + # free ram and disk may be negative, depending on policy: + resources['free_ram_mb'] = (resources['memory_mb'] - + resources['memory_mb_used']) + resources['free_disk_gb'] = (resources['local_gb'] - + resources['local_gb_used']) + + resources['running_vms'] = self.stats.num_instances + resources['vcpus_used'] = self.stats.num_vcpus_used + + resources['current_workload'] = self.stats.calculate_workload() + resources['stats'] = self.stats + + def _update_usage_from_instances(self, resources, instances): + """Calculate resource usage based on instance utilization. This is + different than the hypervisor's view as it will account for all + instances assigned to the local compute host, even if they are not + currently powered on. 
+ """ + self.tracked_instances.clear() + + # purge old stats + self.stats.clear() + + # set some intiial values, reserve room for host/hypervisor: + resources['local_gb_used'] = FLAGS.reserved_host_disk_mb / 1024 + resources['memory_mb_used'] = FLAGS.reserved_host_memory_mb + resources['vcpus_used'] = 0 + resources['free_ram_mb'] = (resources['memory_mb'] - + resources['memory_mb_used']) + resources['free_disk_gb'] = (resources['local_gb'] - + resources['local_gb_used']) + resources['current_workload'] = 0 + resources['running_vms'] = 0 + + for instance in instances: + self._update_usage_from_instance(resources, instance) def _verify_resources(self, resources): resource_keys = ["vcpus", "memory_mb", "local_gb", "cpu_info", diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py index 1c79c124d891..3798cade8e54 100644 --- a/nova/db/sqlalchemy/api.py +++ b/nova/db/sqlalchemy/api.py @@ -47,8 +47,6 @@ from sqlalchemy.sql.expression import literal_column from sqlalchemy.sql import func FLAGS = flags.FLAGS -flags.DECLARE('reserved_host_disk_mb', 'nova.scheduler.host_manager') -flags.DECLARE('reserved_host_memory_mb', 'nova.scheduler.host_manager') LOG = logging.getLogger(__name__) diff --git a/nova/scheduler/filter_scheduler.py b/nova/scheduler/filter_scheduler.py index 371aebf5377a..b2928177df7a 100644 --- a/nova/scheduler/filter_scheduler.py +++ b/nova/scheduler/filter_scheduler.py @@ -132,6 +132,9 @@ class FilterScheduler(driver.Scheduler): # Add a retry entry for the selected compute host: self._add_retry_host(filter_properties, weighted_host.host_state.host) + self._add_oversubscription_policy(filter_properties, + weighted_host.host_state) + payload = dict(request_spec=request_spec, weighted_host=weighted_host.to_dict(), instance_id=instance_uuid) @@ -160,6 +163,9 @@ class FilterScheduler(driver.Scheduler): hosts = retry['hosts'] hosts.append(host) + def _add_oversubscription_policy(self, filter_properties, host_state): + filter_properties['limits'] = host_state.limits + def _get_configuration_options(self): """Fetch options dictionary. Broken out for testing.""" return self.options.get_configuration() diff --git a/nova/scheduler/filters/core_filter.py b/nova/scheduler/filters/core_filter.py index 5af68bc9fac8..98b0930d983e 100644 --- a/nova/scheduler/filters/core_filter.py +++ b/nova/scheduler/filters/core_filter.py @@ -47,4 +47,10 @@ class CoreFilter(filters.BaseHostFilter): instance_vcpus = instance_type['vcpus'] vcpus_total = host_state.vcpus_total * FLAGS.cpu_allocation_ratio + + # Only provide a VCPU limit to compute if the virt driver is reporting + # an accurate count of installed VCPUs. (XenServer driver does not) + if vcpus_total > 0: + host_state.limits['vcpu'] = vcpus_total + return (vcpus_total - host_state.vcpus_used) >= instance_vcpus diff --git a/nova/scheduler/filters/disk_filter.py b/nova/scheduler/filters/disk_filter.py new file mode 100644 index 000000000000..88b8c3377e20 --- /dev/null +++ b/nova/scheduler/filters/disk_filter.py @@ -0,0 +1,54 @@ +# Copyright (c) 2012 OpenStack, LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the +# License for the specific language governing permissions and limitations +# under the License. + +from nova import flags +from nova.openstack.common import cfg +from nova.openstack.common import log as logging +from nova.scheduler import filters + +LOG = logging.getLogger(__name__) + +disk_allocation_ratio_opt = cfg.FloatOpt("disk_allocation_ratio", default=1.0, + help="virtual disk to physical disk allocation ratio") + +FLAGS = flags.FLAGS +FLAGS.register_opt(disk_allocation_ratio_opt) + + +class DiskFilter(filters.BaseHostFilter): + """Disk Filter with over subscription flag""" + + def host_passes(self, host_state, filter_properties): + """Filter based on disk usage""" + instance_type = filter_properties.get('instance_type') + requested_disk = 1024 * (instance_type['root_gb'] + + instance_type['ephemeral_gb']) + + free_disk_mb = host_state.free_disk_mb + total_usable_disk_mb = host_state.total_usable_disk_gb * 1024 + + disk_mb_limit = total_usable_disk_mb * FLAGS.disk_allocation_ratio + used_disk_mb = total_usable_disk_mb - free_disk_mb + usable_disk_mb = disk_mb_limit - used_disk_mb + + if not usable_disk_mb >= requested_disk: + LOG.debug(_("%(host_state)s does not have %(requested_disk)s MB " + "usable disk, it only has %(usable_disk_mb)s MB usable " + "disk."), locals()) + return False + + disk_gb_limit = disk_mb_limit / 1024 + host_state.limits['disk_gb'] = disk_gb_limit + return True diff --git a/nova/scheduler/filters/ram_filter.py b/nova/scheduler/filters/ram_filter.py index 8fb89bf5b4ae..22ba0252ca85 100644 --- a/nova/scheduler/filters/ram_filter.py +++ b/nova/scheduler/filters/ram_filter.py @@ -39,17 +39,15 @@ class RamFilter(filters.BaseHostFilter): free_ram_mb = host_state.free_ram_mb total_usable_ram_mb = host_state.total_usable_ram_mb - oversubscribed_ram_limit_mb = (total_usable_ram_mb * - FLAGS.ram_allocation_ratio) + memory_mb_limit = total_usable_ram_mb * FLAGS.ram_allocation_ratio used_ram_mb = total_usable_ram_mb - free_ram_mb - usable_ram = oversubscribed_ram_limit_mb - used_ram_mb + usable_ram = memory_mb_limit - used_ram_mb if not usable_ram >= requested_ram: LOG.debug(_("%(host_state)s does not have %(requested_ram)s MB " "usable ram, it only has %(usable_ram)s MB usable ram."), locals()) return False - # save oversubscribe ram limit so the compute host can verify - # memory availability on builds: - filter_properties['memory_mb_limit'] = oversubscribed_ram_limit_mb + # save oversubscription limit for compute node to test against: + host_state.limits['memory_mb'] = memory_mb_limit return True diff --git a/nova/scheduler/host_manager.py b/nova/scheduler/host_manager.py index 695189881d54..7b6192c01c92 100644 --- a/nova/scheduler/host_manager.py +++ b/nova/scheduler/host_manager.py @@ -27,14 +27,7 @@ from nova.openstack.common import log as logging from nova.openstack.common import timeutils from nova.scheduler import filters - host_manager_opts = [ - cfg.IntOpt('reserved_host_disk_mb', - default=0, - help='Amount of disk in MB to reserve for host/dom0'), - cfg.IntOpt('reserved_host_memory_mb', - default=512, - help='Amount of memory in MB to reserve for host/dom0'), cfg.MultiStrOpt('scheduler_available_filters', default=['nova.scheduler.filters.standard_filters'], help='Filter classes available to the scheduler which may ' @@ -112,32 +105,31 @@ class HostState(object): self.service = ReadOnlyDict(service) # Mutable available resources. # These will change as resources are virtually "consumed". 
+ self.total_usable_disk_gb = 0 + self.disk_mb_used = 0 self.free_ram_mb = 0 self.free_disk_mb = 0 self.vcpus_total = 0 self.vcpus_used = 0 + # Resource oversubscription values for the compute host: + self.limits = {} + def update_from_compute_node(self, compute): """Update information about a host from its compute_node info.""" - all_disk_mb = compute['local_gb'] * 1024 all_ram_mb = compute['memory_mb'] # Assume virtual size is all consumed by instances if use qcow2 disk. least = compute.get('disk_available_least') free_disk_mb = least if least is not None else compute['free_disk_gb'] free_disk_mb *= 1024 - free_ram_mb = compute['free_ram_mb'] - if FLAGS.reserved_host_disk_mb > 0: - all_disk_mb -= FLAGS.reserved_host_disk_mb - free_disk_mb -= FLAGS.reserved_host_disk_mb - if FLAGS.reserved_host_memory_mb > 0: - all_ram_mb -= FLAGS.reserved_host_memory_mb - free_ram_mb -= FLAGS.reserved_host_memory_mb + self.disk_mb_used = compute['local_gb_used'] * 1024 #NOTE(jogo) free_ram_mb can be negative - self.free_ram_mb = free_ram_mb + self.free_ram_mb = compute['free_ram_mb'] self.total_usable_ram_mb = all_ram_mb + self.total_usable_disk_gb = compute['local_gb'] self.free_disk_mb = free_disk_mb self.vcpus_total = compute['vcpus'] self.vcpus_used = compute['vcpus_used'] diff --git a/nova/tests/compute/fake_resource_tracker.py b/nova/tests/compute/fake_resource_tracker.py index faba13bab38a..9c404fbc0529 100644 --- a/nova/tests/compute/fake_resource_tracker.py +++ b/nova/tests/compute/fake_resource_tracker.py @@ -22,11 +22,10 @@ class FakeResourceTracker(resource_tracker.ResourceTracker): """Version without a DB requirement""" def _create(self, context, values): - return values + self.compute_node = values def _update(self, context, values, prune_stats=False): self.compute_node.update(values) - return self.compute_node def _get_service(self, context): return { diff --git a/nova/tests/compute/test_compute.py b/nova/tests/compute/test_compute.py index 14edaa7a0b25..105ee9f44844 100644 --- a/nova/tests/compute/test_compute.py +++ b/nova/tests/compute/test_compute.py @@ -275,23 +275,36 @@ class ComputeTestCase(BaseTestCase): finally: db.instance_destroy(self.context, instance['uuid']) - def test_create_instance_insufficient_memory(self): + def test_create_instance_unlimited_memory(self): + """Default of memory limit=None is unlimited""" + self.flags(reserved_host_disk_mb=0, reserved_host_memory_mb=0) + self.compute.resource_tracker.update_available_resource(self.context) params = {"memory_mb": 999999999999} + filter_properties = {'limits': {'memory_mb': None}} instance = self._create_fake_instance(params) - self.assertRaises(exception.ComputeResourcesUnavailable, - self.compute.run_instance, self.context, instance=instance) + self.compute.run_instance(self.context, instance=instance, + filter_properties=filter_properties) + self.assertEqual(999999999999, + self.compute.resource_tracker.compute_node['memory_mb_used']) - def test_create_instance_insufficient_disk(self): + def test_create_instance_unlimited_disk(self): + self.flags(reserved_host_disk_mb=0, reserved_host_memory_mb=0) + self.compute.resource_tracker.update_available_resource(self.context) params = {"root_gb": 999999999999, "ephemeral_gb": 99999999999} + filter_properties = {'limits': {'disk_gb': None}} instance = self._create_fake_instance(params) - self.assertRaises(exception.ComputeResourcesUnavailable, - self.compute.run_instance, self.context, instance=instance) + self.compute.run_instance(self.context, instance=instance, + 
filter_properties=filter_properties) def test_create_multiple_instances_then_starve(self): + self.flags(reserved_host_disk_mb=0, reserved_host_memory_mb=0) + self.compute.resource_tracker.update_available_resource(self.context) + filter_properties = {'limits': {'memory_mb': 4096, 'disk_gb': 1000}} params = {"memory_mb": 1024, "root_gb": 128, "ephemeral_gb": 128} instance = self._create_fake_instance(params) - self.compute.run_instance(self.context, instance=instance) + self.compute.run_instance(self.context, instance=instance, + filter_properties=filter_properties) self.assertEquals(1024, self.compute.resource_tracker.compute_node['memory_mb_used']) self.assertEquals(256, @@ -299,7 +312,8 @@ class ComputeTestCase(BaseTestCase): params = {"memory_mb": 2048, "root_gb": 256, "ephemeral_gb": 256} instance = self._create_fake_instance(params) - self.compute.run_instance(self.context, instance=instance) + self.compute.run_instance(self.context, instance=instance, + filter_properties=filter_properties) self.assertEquals(3072, self.compute.resource_tracker.compute_node['memory_mb_used']) self.assertEquals(768, @@ -308,11 +322,15 @@ class ComputeTestCase(BaseTestCase): params = {"memory_mb": 8192, "root_gb": 8192, "ephemeral_gb": 8192} instance = self._create_fake_instance(params) self.assertRaises(exception.ComputeResourcesUnavailable, - self.compute.run_instance, self.context, instance=instance) + self.compute.run_instance, self.context, instance=instance, + filter_properties=filter_properties) def test_create_instance_with_oversubscribed_ram(self): """Test passing of oversubscribed ram policy from the scheduler.""" + self.flags(reserved_host_disk_mb=0, reserved_host_memory_mb=0) + self.compute.resource_tracker.update_available_resource(self.context) + # get total memory as reported by virt driver: resources = self.compute.driver.get_available_resource() total_mem_mb = resources['memory_mb'] @@ -326,7 +344,8 @@ class ComputeTestCase(BaseTestCase): "ephemeral_gb": 128} instance = self._create_fake_instance(params) - filter_properties = dict(memory_mb_limit=oversub_limit_mb) + limits = {'memory_mb': oversub_limit_mb} + filter_properties = {'limits': limits} self.compute.run_instance(self.context, instance=instance, filter_properties=filter_properties) @@ -337,6 +356,9 @@ class ComputeTestCase(BaseTestCase): """Test passing of oversubscribed ram policy from the scheduler, but with insufficient memory. 
""" + self.flags(reserved_host_disk_mb=0, reserved_host_memory_mb=0) + self.compute.resource_tracker.update_available_resource(self.context) + # get total memory as reported by virt driver: resources = self.compute.driver.get_available_resource() total_mem_mb = resources['memory_mb'] @@ -350,12 +372,115 @@ class ComputeTestCase(BaseTestCase): "ephemeral_gb": 128} instance = self._create_fake_instance(params) - filter_properties = dict(memory_mb_limit=oversub_limit_mb) + filter_properties = {'limits': {'memory_mb': oversub_limit_mb}} self.assertRaises(exception.ComputeResourcesUnavailable, self.compute.run_instance, self.context, instance=instance, filter_properties=filter_properties) + def test_create_instance_with_oversubscribed_cpu(self): + """Test passing of oversubscribed cpu policy from the scheduler.""" + + self.flags(reserved_host_disk_mb=0, reserved_host_memory_mb=0) + self.compute.resource_tracker.update_available_resource(self.context) + limits = {'vcpu': 3} + filter_properties = {'limits': limits} + + # get total memory as reported by virt driver: + resources = self.compute.driver.get_available_resource() + self.assertEqual(1, resources['vcpus']) + + # build an instance, specifying an amount of memory that exceeds + # total_mem_mb, but is less than the oversubscribed limit: + params = {"memory_mb": 10, "root_gb": 1, + "ephemeral_gb": 1, "vcpus": 2} + instance = self._create_fake_instance(params) + self.compute.run_instance(self.context, instance=instance, + filter_properties=filter_properties) + + self.assertEqual(2, + self.compute.resource_tracker.compute_node['vcpus_used']) + + # create one more instance: + params = {"memory_mb": 10, "root_gb": 1, + "ephemeral_gb": 1, "vcpus": 1} + instance = self._create_fake_instance(params) + self.compute.run_instance(self.context, instance=instance, + filter_properties=filter_properties) + + self.assertEqual(3, + self.compute.resource_tracker.compute_node['vcpus_used']) + + # delete the instance: + instance['vm_state'] = vm_states.DELETED + self.compute.resource_tracker.update_usage(self.context, + instance=instance) + + self.assertEqual(2, + self.compute.resource_tracker.compute_node['vcpus_used']) + + # now oversubscribe vcpus and fail: + params = {"memory_mb": 10, "root_gb": 1, + "ephemeral_gb": 1, "vcpus": 2} + instance = self._create_fake_instance(params) + + limits = {'vcpu': 3} + filter_properties = {'limits': limits} + self.assertRaises(exception.ComputeResourcesUnavailable, + self.compute.run_instance, self.context, instance=instance, + filter_properties=filter_properties) + + def test_create_instance_with_oversubscribed_disk(self): + """Test passing of oversubscribed disk policy from the scheduler.""" + + self.flags(reserved_host_disk_mb=0, reserved_host_memory_mb=0) + self.compute.resource_tracker.update_available_resource(self.context) + + # get total memory as reported by virt driver: + resources = self.compute.driver.get_available_resource() + total_disk_gb = resources['local_gb'] + + oversub_limit_gb = total_disk_gb * 1.5 + instance_gb = int(total_disk_gb * 1.45) + + # build an instance, specifying an amount of disk that exceeds + # total_disk_gb, but is less than the oversubscribed limit: + params = {"root_gb": instance_gb, "memory_mb": 10} + instance = self._create_fake_instance(params) + + limits = {'disk_gb': oversub_limit_gb} + filter_properties = {'limits': limits} + self.compute.run_instance(self.context, instance=instance, + filter_properties=filter_properties) + + self.assertEqual(instance_gb, + 
self.compute.resource_tracker.compute_node['local_gb_used']) + + def test_create_instance_with_oversubscribed_disk_fail(self): + """Test passing of oversubscribed disk policy from the scheduler, but + with insufficient disk. + """ + self.flags(reserved_host_disk_mb=0, reserved_host_memory_mb=0) + self.compute.resource_tracker.update_available_resource(self.context) + + # get total memory as reported by virt driver: + resources = self.compute.driver.get_available_resource() + total_disk_gb = resources['local_gb'] + + oversub_limit_gb = total_disk_gb * 1.5 + instance_gb = int(total_disk_gb * 1.55) + + # build an instance, specifying an amount of disk that exceeds + # total_disk_gb, but is less than the oversubscribed limit: + params = {"root_gb": instance_gb, "memory_mb": 10} + instance = self._create_fake_instance(params) + + limits = {'disk_gb': oversub_limit_gb} + filter_properties = {'limits': limits} + self.assertRaises(exception.ComputeResourcesUnavailable, + self.compute.run_instance, self.context, instance=instance, + filter_properties=filter_properties) + def test_default_access_ip(self): self.flags(default_access_ip_network_name='test1') fake_network.unset_stub_network_methods(self.stubs) diff --git a/nova/tests/compute/test_resource_tracker.py b/nova/tests/compute/test_resource_tracker.py index 092029fa66a0..0af0e42842d4 100644 --- a/nova/tests/compute/test_resource_tracker.py +++ b/nova/tests/compute/test_resource_tracker.py @@ -17,17 +17,20 @@ """Tests for compute resource tracking""" -import copy +import uuid from nova.compute import resource_tracker from nova.compute import task_states from nova.compute import vm_states from nova import db from nova import exception +from nova.openstack.common import log as logging from nova.openstack.common import timeutils from nova import test from nova.virt import driver +LOG = logging.getLogger(__name__) + class FakeContext(object): def __init__(self, is_admin=False): @@ -75,20 +78,12 @@ class BaseTestCase(test.TestCase): def setUp(self): super(BaseTestCase, self).setUp() + self.flags(reserved_host_disk_mb=0, + reserved_host_memory_mb=0) + self.context = FakeContext() - self.instance_ref = { - "memory_mb": 1, - "root_gb": 1, - "ephemeral_gb": 1, - "vm_state": vm_states.BUILDING, - "task_state": None, - "os_type": "Linux", - "project_id": "1234", - "vcpus": 1, - "uuid": "12-34-56-78-90", - } - + self._instances = [] self.stubs.Set(db, 'instance_get_all_by_filters', self._fake_instance_get_all_by_filters) @@ -126,8 +121,26 @@ class BaseTestCase(test.TestCase): } return service + def _fake_instance(self, *args, **kwargs): + instance = { + 'uuid': str(uuid.uuid1()), + 'vm_state': vm_states.BUILDING, + 'task_state': None, + 'memory_mb': 2, + 'root_gb': 3, + 'ephemeral_gb': 1, + 'os_type': 'Linux', + 'project_id': '123456', + 'vcpus': 1, + 'host': None, + } + instance.update(kwargs) + + self._instances.append(instance) + return instance + def _fake_instance_get_all_by_filters(self, ctx, filters, **kwargs): - return [] + return self._instances def _tracker(self, unsupported=False): host = "fakehost" @@ -161,22 +174,16 @@ class UnsupportedDriverTestCase(BaseTestCase): claim = self.tracker.begin_resource_claim(self.context, 1, 1) self.assertEqual(None, claim) - def testDisabledContextClaim(self): - # basic context manager variation: - with self.tracker.resource_claim(self.context, 1, 1): - pass - self.assertEqual(0, len(self.tracker.claims)) - def testDisabledInstanceClaim(self): # instance variation: - claim = 
self.tracker.begin_instance_resource_claim(self.context, - self.instance_ref) + instance = self._fake_instance() + claim = self.tracker.begin_resource_claim(self.context, instance) self.assertEqual(None, claim) def testDisabledInstanceContextClaim(self): # instance context manager variation: - with self.tracker.instance_resource_claim(self.context, - self.instance_ref): + instance = self._fake_instance() + with self.tracker.resource_claim(self.context, instance): pass self.assertEqual(0, len(self.tracker.claims)) @@ -187,10 +194,10 @@ class UnsupportedDriverTestCase(BaseTestCase): self.assertEqual(None, self.tracker.abort_resource_claim(self.context, None)) - def testDisabledFreeResources(self): - self.tracker.free_resources(self.context) - self.assertTrue(self.tracker.disabled) - self.assertEqual(None, self.tracker.compute_node) + def testDisabledUpdateUsage(self): + instance = self._fake_instance(host='fakehost', memory_mb=5, + root_gb=10) + self.tracker.update_usage(self.context, instance) class MissingServiceTestCase(BaseTestCase): @@ -253,9 +260,35 @@ class ResourceTestCase(BaseTestCase): prune_stats=False): self.updated = True values['stats'] = [{"key": "num_instances", "value": "1"}] + self.compute.update(values) return self.compute + def testUpdateUseOnlyForTracked(self): + """Only update usage if a previous claim has added the instance to + the list of tracked instances. + """ + instance = self._fake_instance(memory_mb=3, root_gb=1, ephemeral_gb=1, + task_state=None) + self.tracker.update_usage(self.context, instance) + + self.assertEqual(0, self.tracker.compute_node['memory_mb_used']) + self.assertEqual(0, self.tracker.compute_node['local_gb_used']) + self.assertEqual(0, self.tracker.compute_node['current_workload']) + + claim = self.tracker.begin_resource_claim(self.context, instance) + self.assertNotEqual(None, claim) + self.assertEqual(3, self.tracker.compute_node['memory_mb_used']) + self.assertEqual(2, self.tracker.compute_node['local_gb_used']) + + # now update should actually take effect + instance['task_state'] = task_states.SCHEDULING + self.tracker.update_usage(self.context, instance) + + self.assertEqual(3, self.tracker.compute_node['memory_mb_used']) + self.assertEqual(2, self.tracker.compute_node['local_gb_used']) + self.assertEqual(1, self.tracker.compute_node['current_workload']) + def testFreeRamResourceValue(self): driver = FakeVirtDriver() mem_free = driver.memory_mb - driver.memory_mb_used @@ -270,42 +303,109 @@ class ResourceTestCase(BaseTestCase): self.assertFalse(self.tracker.disabled) self.assertTrue(self.updated) - def testInsufficientMemoryClaim(self): - """Exceed memory limit of 5MB""" - claim = self.tracker.begin_resource_claim(self.context, memory_mb=2, - disk_gb=0) + def testCpuUnlimited(self): + """Test default of unlimited CPU""" + self.assertEqual(0, self.tracker.compute_node['vcpus_used']) + instance = self._fake_instance(memory_mb=1, root_gb=1, ephemeral_gb=1, + vcpus=100000) + claim = self.tracker.begin_resource_claim(self.context, instance) self.assertNotEqual(None, claim) + self.assertEqual(100000, self.tracker.compute_node['vcpus_used']) - claim = self.tracker.begin_resource_claim(self.context, memory_mb=3, - disk_gb=0) + def testCpuOversubscription(self): + """Test client-supplied oversubscription of CPU""" + self.assertEqual(1, self.tracker.compute_node['vcpus']) + + instance = self._fake_instance(memory_mb=1, root_gb=1, ephemeral_gb=1, + vcpus=3) + limits = {'vcpu': 5} + claim = self.tracker.begin_resource_claim(self.context, instance, + limits) 
self.assertNotEqual(None, claim) + self.assertEqual(3, self.tracker.compute_node['vcpus_used']) - claim = self.tracker.begin_resource_claim(self.context, memory_mb=1, - disk_gb=0) - self.assertEqual(None, claim) + def testMemoryOversubscription(self): + """Test client-supplied oversubscription of memory""" + instance = self._fake_instance(memory_mb=8, root_gb=1, ephemeral_gb=1) + limits = {'memory_mb': 8} + claim = self.tracker.begin_resource_claim(self.context, instance, + limits) + self.assertNotEqual(None, claim) + self.assertEqual(8, self.tracker.compute_node['memory_mb_used']) + self.assertEqual(2, self.tracker.compute_node['local_gb_used']) + + def testDiskOversubscription(self): + """Test client-supplied oversubscription of disk space""" + instance = self._fake_instance(memory_mb=1, root_gb=10, ephemeral_gb=1) + limits = {'disk_gb': 12} + claim = self.tracker.begin_resource_claim(self.context, instance, + limits) + self.assertNotEqual(None, claim) + self.assertEqual(1, self.tracker.compute_node['memory_mb_used']) + self.assertEqual(11, self.tracker.compute_node['local_gb_used']) + + def testUnlimitedMemoryClaim(self): + """Test default of unlimited memory""" + instance = self._fake_instance(memory_mb=200000000000, root_gb=1, + ephemeral_gb=1) + claim = self.tracker.begin_resource_claim(self.context, instance) + self.assertNotEqual(None, claim) + self.assertEqual(200000000000, + self.tracker.compute_node['memory_mb_used']) def testInsufficientMemoryClaimWithOversubscription(self): """Exceed oversubscribed memory limit of 10MB""" - claim = self.tracker.begin_resource_claim(self.context, memory_mb=10, - disk_gb=0, memory_mb_limit=10) + instance = self._fake_instance(memory_mb=10, root_gb=0, + ephemeral_gb=0) + limits = {'memory_mb': 10} + claim = self.tracker.begin_resource_claim(self.context, instance, + limits) self.assertNotEqual(None, claim) - claim = self.tracker.begin_resource_claim(self.context, memory_mb=1, - disk_gb=0, memory_mb_limit=10) + instance = self._fake_instance(memory_mb=1, root_gb=0, + ephemeral_gb=0) + limits = {'memory_mb': 10} + claim = self.tracker.begin_resource_claim(self.context, instance, + limits) self.assertEqual(None, claim) - def testInsufficientDiskClaim(self): - """Exceed disk limit of 5GB""" - claim = self.tracker.begin_resource_claim(self.context, memory_mb=0, - disk_gb=2) + def testUnlimitDiskClaim(self): + """Test default of unlimited disk space""" + instance = self._fake_instance(memory_mb=0, root_gb=200000000, + ephemeral_gb=0) + claim = self.tracker.begin_resource_claim(self.context, instance) + self.assertNotEqual(None, claim) + self.assertEqual(200000000, self.tracker.compute_node['local_gb_used']) + + def testInsufficientDiskClaimWithOversubscription(self): + """Exceed oversubscribed disk limit of 10GB""" + instance = self._fake_instance(memory_mb=1, root_gb=4, + ephemeral_gb=5) # 9 GB + limits = {'disk_gb': 10} + claim = self.tracker.begin_resource_claim(self.context, instance, + limits) self.assertNotEqual(None, claim) - claim = self.tracker.begin_resource_claim(self.context, memory_mb=0, - disk_gb=3) - self.assertNotEqual(None, claim) + instance = self._fake_instance(memory_mb=1, root_gb=1, + ephemeral_gb=1) # 2 GB + limits = {'disk_gb': 10} + claim = self.tracker.begin_resource_claim(self.context, instance, + limits) + self.assertEqual(None, claim) - claim = self.tracker.begin_resource_claim(self.context, memory_mb=0, - disk_gb=5) + def testInsufficientCpuClaim(self): + instance = self._fake_instance(memory_mb=0, root_gb=0, + 
ephemeral_gb=0, vcpus=1) + claim = self.tracker.begin_resource_claim(self.context, instance) + self.assertNotEqual(None, claim) + self.assertEqual(1, self.tracker.compute_node['vcpus_used']) + + instance = self._fake_instance(memory_mb=0, root_gb=0, + ephemeral_gb=0, vcpus=1) + + limits = {'vcpu': 1} + claim = self.tracker.begin_resource_claim(self.context, instance, + limits) self.assertEqual(None, claim) def testClaimAndFinish(self): @@ -317,8 +417,9 @@ class ResourceTestCase(BaseTestCase): claim_mem = 3 claim_disk = 2 - claim = self.tracker.begin_resource_claim(self.context, claim_mem, - claim_disk) + instance = self._fake_instance(memory_mb=claim_mem, root_gb=claim_disk, + ephemeral_gb=0) + claim = self.tracker.begin_resource_claim(self.context, instance) self.assertEqual(5, self.compute["memory_mb"]) self.assertEqual(claim_mem, self.compute["memory_mb_used"]) @@ -334,35 +435,26 @@ class ResourceTestCase(BaseTestCase): driver.memory_mb_used = claim_mem driver.local_gb_used = claim_disk - # 2nd update compute node from the virt layer. because the claim is - # in-progress (unfinished), the audit will actually mark the resources - # as unsubscribed: self.tracker.update_available_resource(self.context) - self.assertEqual(2 * claim_mem, - self.compute['memory_mb_used']) - self.assertEqual(5 - (2 * claim_mem), - self.compute['free_ram_mb']) + # confirm that resource usage is derived from instance usages, + # not virt layer: + self.assertEqual(claim_mem, self.compute['memory_mb_used']) + self.assertEqual(5 - claim_mem, self.compute['free_ram_mb']) - self.assertEqual(2 * claim_disk, - self.compute['local_gb_used']) - self.assertEqual(6 - (2 * claim_disk), - self.compute['free_disk_gb']) + self.assertEqual(claim_disk, self.compute['local_gb_used']) + self.assertEqual(6 - claim_disk, self.compute['free_disk_gb']) # Finally, finish the claimm and update from the virt layer again. # Resource usage will be consistent again: self.tracker.finish_resource_claim(claim) self.tracker.update_available_resource(self.context) - self.assertEqual(claim_mem, - self.compute['memory_mb_used']) - self.assertEqual(5 - claim_mem, - self.compute['free_ram_mb']) + self.assertEqual(claim_mem, self.compute['memory_mb_used']) + self.assertEqual(5 - claim_mem, self.compute['free_ram_mb']) - self.assertEqual(claim_disk, - self.compute['local_gb_used']) - self.assertEqual(6 - claim_disk, - self.compute['free_disk_gb']) + self.assertEqual(claim_disk, self.compute['local_gb_used']) + self.assertEqual(6 - claim_disk, self.compute['free_disk_gb']) def testClaimAndAbort(self): self.assertEqual(5, self.tracker.compute_node['memory_mb']) @@ -373,8 +465,10 @@ class ResourceTestCase(BaseTestCase): claim_mem = 3 claim_disk = 2 - claim = self.tracker.begin_resource_claim(self.context, claim_mem, - claim_disk) + instance = self._fake_instance(memory_mb=claim_mem, + root_gb=claim_disk, ephemeral_gb=0) + claim = self.tracker.begin_resource_claim(self.context, instance) + self.assertNotEqual(None, claim) self.assertEqual(5, self.compute["memory_mb"]) self.assertEqual(claim_mem, self.compute["memory_mb_used"]) @@ -398,14 +492,14 @@ class ResourceTestCase(BaseTestCase): """Test that old claims get cleaned up automatically if not finished or aborted explicitly. 
""" - claim = self.tracker.begin_resource_claim(self.context, memory_mb=2, - disk_gb=2) + instance = self._fake_instance(memory_mb=2, root_gb=2, ephemeral_gb=0) + claim = self.tracker.begin_resource_claim(self.context, instance) claim.expire_ts = timeutils.utcnow_ts() - 1 self.assertTrue(claim.is_expired()) # and an unexpired claim - claim2 = self.tracker.begin_resource_claim(self.context, memory_mb=1, - disk_gb=1) + instance2 = self._fake_instance(memory_mb=1, root_gb=1, ephemeral_gb=0) + claim2 = self.tracker.begin_resource_claim(self.context, instance2) self.assertEqual(2, len(self.tracker.claims)) self.assertEqual(2 + 1, self.tracker.compute_node['memory_mb_used']) @@ -415,34 +509,30 @@ class ResourceTestCase(BaseTestCase): self.tracker.update_available_resource(self.context) self.assertEqual(1, len(self.tracker.claims)) - self.assertEqual(1, self.tracker.compute_node['memory_mb_used']) - self.assertEqual(1, self.tracker.compute_node['local_gb_used']) + self.assertEqual(2, len(self.tracker.tracked_instances)) - # and just call finish & abort to ensure expired claims do not cause - # any other explosions: + # the expired claim's instance is assumed to still exist, so the + # resources should be counted: + self.assertEqual(2 + 1, self.tracker.compute_node['memory_mb_used']) + self.assertEqual(2 + 1, self.tracker.compute_node['local_gb_used']) + + # this abort should do nothing because the claim was purged due to + # expiration: self.tracker.abort_resource_claim(self.context, claim) - self.tracker.finish_resource_claim(claim) + + # call finish on claim2: + self.tracker.finish_resource_claim(claim2) + + # should have usage from both instances: + self.assertEqual(1 + 2, self.tracker.compute_node['memory_mb_used']) + self.assertEqual(1 + 2, self.tracker.compute_node['local_gb_used']) def testInstanceClaim(self): - self.tracker.begin_instance_resource_claim(self.context, - self.instance_ref) + instance = self._fake_instance(memory_mb=1, root_gb=0, ephemeral_gb=2) + self.tracker.begin_resource_claim(self.context, instance) self.assertEqual(1, self.tracker.compute_node['memory_mb_used']) self.assertEqual(2, self.tracker.compute_node['local_gb_used']) - def testContextClaim(self): - with self.tracker.resource_claim(self.context, memory_mb=1, disk_gb=1): - # - self.assertEqual(1, self.tracker.compute_node['memory_mb_used']) - self.assertEqual(1, self.tracker.compute_node['local_gb_used']) - self.assertEqual(1, self.compute['memory_mb_used']) - self.assertEqual(1, self.compute['local_gb_used']) - - self.tracker.update_available_resource(self.context) - self.assertEqual(0, self.tracker.compute_node['memory_mb_used']) - self.assertEqual(0, self.tracker.compute_node['local_gb_used']) - self.assertEqual(0, self.compute['memory_mb_used']) - self.assertEqual(0, self.compute['local_gb_used']) - def testContextClaimWithException(self): try: with self.tracker.resource_claim(self.context, memory_mb=1, @@ -459,34 +549,63 @@ class ResourceTestCase(BaseTestCase): self.assertEqual(0, self.compute['local_gb_used']) def testInstanceContextClaim(self): - with self.tracker.instance_resource_claim(self.context, - self.instance_ref): + instance = self._fake_instance(memory_mb=1, root_gb=1, ephemeral_gb=1) + with self.tracker.resource_claim(self.context, instance): # self.assertEqual(1, self.tracker.compute_node['memory_mb_used']) self.assertEqual(2, self.tracker.compute_node['local_gb_used']) self.assertEqual(1, self.compute['memory_mb_used']) self.assertEqual(2, self.compute['local_gb_used']) + # after exiting claim 
context, build is marked as finished. usage + # totals should be same: self.tracker.update_available_resource(self.context) - self.assertEqual(0, self.tracker.compute_node['memory_mb_used']) - self.assertEqual(0, self.tracker.compute_node['local_gb_used']) - self.assertEqual(0, self.compute['memory_mb_used']) - self.assertEqual(0, self.compute['local_gb_used']) + self.assertEqual(1, self.tracker.compute_node['memory_mb_used']) + self.assertEqual(2, self.tracker.compute_node['local_gb_used']) + self.assertEqual(1, self.compute['memory_mb_used']) + self.assertEqual(2, self.compute['local_gb_used']) def testUpdateLoadStatsForInstance(self): self.assertFalse(self.tracker.disabled) self.assertEqual(0, self.tracker.compute_node['current_workload']) - self.instance_ref['task_state'] = task_states.SCHEDULING - with self.tracker.instance_resource_claim(self.context, - self.instance_ref): + instance = self._fake_instance(task_state=task_states.SCHEDULING) + with self.tracker.resource_claim(self.context, instance): pass self.assertEqual(1, self.tracker.compute_node['current_workload']) - self.instance_ref['vm_state'] = vm_states.ACTIVE - self.instance_ref['task_state'] = None + instance['vm_state'] = vm_states.ACTIVE + instance['task_state'] = None + instance['host'] = 'fakehost' - self.tracker.update_load_stats_for_instance(self.context, - self.instance_ref) + self.tracker.update_usage(self.context, instance) self.assertEqual(0, self.tracker.compute_node['current_workload']) + + def testCpuStats(self): + limits = {'disk_gb': 100, 'memory_mb': 100} + self.assertEqual(0, self.tracker.compute_node['vcpus_used']) + + instance = self._fake_instance(vcpus=1) + + # should not do anything until a claim is made: + self.tracker.update_usage(self.context, instance) + self.assertEqual(0, self.tracker.compute_node['vcpus_used']) + + with self.tracker.resource_claim(self.context, instance, limits): + pass + self.assertEqual(1, self.tracker.compute_node['vcpus_used']) + + # instance state can change without modifying vcpus in use: + instance['task_state'] = task_states.SCHEDULING + self.tracker.update_usage(self.context, instance) + self.assertEqual(1, self.tracker.compute_node['vcpus_used']) + + instance = self._fake_instance(vcpus=10) + with self.tracker.resource_claim(self.context, instance, limits): + pass + self.assertEqual(11, self.tracker.compute_node['vcpus_used']) + + instance['vm_state'] = vm_states.DELETED + self.tracker.update_usage(self.context, instance) + self.assertEqual(1, self.tracker.compute_node['vcpus_used']) diff --git a/nova/tests/scheduler/fakes.py b/nova/tests/scheduler/fakes.py index 27341a62c93e..29466fbe16bc 100644 --- a/nova/tests/scheduler/fakes.py +++ b/nova/tests/scheduler/fakes.py @@ -28,16 +28,20 @@ from nova.scheduler import host_manager COMPUTE_NODES = [ dict(id=1, local_gb=1024, memory_mb=1024, vcpus=1, disk_available_least=512, free_ram_mb=512, vcpus_used=1, - free_disk_mb=512, service=dict(host='host1', disabled=False)), + free_disk_mb=512, local_gb_used=0, + service=dict(host='host1', disabled=False)), dict(id=2, local_gb=2048, memory_mb=2048, vcpus=2, disk_available_least=1024, free_ram_mb=1024, vcpus_used=2, - free_disk_mb=1024, service=dict(host='host2', disabled=True)), + free_disk_mb=1024, local_gb_used=0, + service=dict(host='host2', disabled=True)), dict(id=3, local_gb=4096, memory_mb=4096, vcpus=4, disk_available_least=3072, free_ram_mb=3072, vcpus_used=1, - free_disk_mb=3072, service=dict(host='host3', disabled=False)), + free_disk_mb=3072, local_gb_used=0, + 
service=dict(host='host3', disabled=False)), dict(id=4, local_gb=8192, memory_mb=8192, vcpus=8, disk_available_least=8192, free_ram_mb=8192, vcpus_used=0, - free_disk_mb=8192, service=dict(host='host4', disabled=False)), + free_disk_mb=8192, local_gb_used=0, + service=dict(host='host4', disabled=False)), # Broken entry dict(id=5, local_gb=1024, memory_mb=1024, vcpus=1, service=None), ] diff --git a/nova/tests/scheduler/test_filter_scheduler.py b/nova/tests/scheduler/test_filter_scheduler.py index 130251a0f34a..9e78f3a50733 100644 --- a/nova/tests/scheduler/test_filter_scheduler.py +++ b/nova/tests/scheduler/test_filter_scheduler.py @@ -216,7 +216,6 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase): self.assertEqual(info['called'], 0) def test_get_cost_functions(self): - self.flags(reserved_host_memory_mb=128) fixture = fakes.FakeFilterScheduler() fns = fixture.get_cost_functions() self.assertEquals(len(fns), 1) @@ -225,8 +224,9 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase): hostinfo = host_manager.HostState('host', 'compute') hostinfo.update_from_compute_node(dict(memory_mb=1000, local_gb=0, vcpus=1, disk_available_least=1000, - free_disk_mb=1000, free_ram_mb=1000, vcpus_used=0)) - self.assertEquals(1000 - 128, fn(hostinfo, {})) + free_disk_mb=1000, free_ram_mb=872, vcpus_used=0, + local_gb_used=0)) + self.assertEquals(872, fn(hostinfo, {})) def test_max_attempts(self): self.flags(scheduler_max_attempts=4) diff --git a/nova/tests/scheduler/test_host_filters.py b/nova/tests/scheduler/test_host_filters.py index 19b008212e78..42d43a197f99 100644 --- a/nova/tests/scheduler/test_host_filters.py +++ b/nova/tests/scheduler/test_host_filters.py @@ -514,6 +514,18 @@ class HostFiltersTestCase(test.TestCase): 'capabilities': capabilities, 'service': service}) self.assertFalse(filt_cls.host_passes(host, filter_properties)) + def test_ram_filter_passes(self): + self._stub_service_is_up(True) + filt_cls = self.class_map['RamFilter']() + self.flags(ram_allocation_ratio=1.0) + filter_properties = {'instance_type': {'memory_mb': 1024}} + capabilities = {'enabled': True} + service = {'disabled': False} + host = fakes.FakeHostState('host1', 'compute', + {'free_ram_mb': 1024, 'total_usable_ram_mb': 1024, + 'capabilities': capabilities, 'service': service}) + self.assertTrue(filt_cls.host_passes(host, filter_properties)) + def test_ram_filter_oversubscribe(self): self._stub_service_is_up(True) filt_cls = self.class_map['RamFilter']() @@ -525,24 +537,62 @@ class HostFiltersTestCase(test.TestCase): {'free_ram_mb': -1024, 'total_usable_ram_mb': 2048, 'capabilities': capabilities, 'service': service}) self.assertTrue(filt_cls.host_passes(host, filter_properties)) + self.assertEqual(2048 * 2.0, host.limits['memory_mb']) - def test_ram_filter_sets_memory_limit(self): - """Test that ram filter sets a filter_property denoting the memory - ceiling. 
- """ + def test_disk_filter_passes(self): self._stub_service_is_up(True) - filt_cls = self.class_map['RamFilter']() - self.flags(ram_allocation_ratio=2.0) - filter_properties = {'instance_type': {'memory_mb': 1024}} + filt_cls = self.class_map['DiskFilter']() + self.flags(disk_allocation_ratio=1.0) + filter_properties = {'instance_type': {'root_gb': 1, + 'ephemeral_gb': 1}} capabilities = {'enabled': True} service = {'disabled': False} host = fakes.FakeHostState('host1', 'compute', - {'free_ram_mb': -1024, 'total_usable_ram_mb': 2048, + {'free_disk_mb': 11 * 1024, 'total_usable_disk_gb': 13, 'capabilities': capabilities, 'service': service}) - filt_cls.host_passes(host, filter_properties) + self.assertTrue(filt_cls.host_passes(host, filter_properties)) - self.assertEqual(host.total_usable_ram_mb * 2.0, - filter_properties['memory_mb_limit']) + def test_disk_filter_fails(self): + self._stub_service_is_up(True) + filt_cls = self.class_map['DiskFilter']() + self.flags(disk_allocation_ratio=1.0) + filter_properties = {'instance_type': {'root_gb': 2, + 'ephemeral_gb': 1}} + capabilities = {'enabled': True} + service = {'disabled': False} + host = fakes.FakeHostState('host1', 'compute', + {'free_disk_mb': 11 * 1024, 'total_usable_disk_gb': 13, + 'capabilities': capabilities, 'service': service}) + self.assertTrue(filt_cls.host_passes(host, filter_properties)) + + def test_disk_filter_oversubscribe(self): + self._stub_service_is_up(True) + filt_cls = self.class_map['DiskFilter']() + self.flags(disk_allocation_ratio=10.0) + filter_properties = {'instance_type': {'root_gb': 100, + 'ephemeral_gb': 19}} + capabilities = {'enabled': True} + service = {'disabled': False} + # 1GB used... so 119GB allowed... + host = fakes.FakeHostState('host1', 'compute', + {'free_disk_mb': 11 * 1024, 'total_usable_disk_gb': 12, + 'capabilities': capabilities, 'service': service}) + self.assertTrue(filt_cls.host_passes(host, filter_properties)) + self.assertEqual(12 * 10.0, host.limits['disk_gb']) + + def test_disk_filter_oversubscribe_fail(self): + self._stub_service_is_up(True) + filt_cls = self.class_map['DiskFilter']() + self.flags(disk_allocation_ratio=10.0) + filter_properties = {'instance_type': {'root_gb': 100, + 'ephemeral_gb': 20}} + capabilities = {'enabled': True} + service = {'disabled': False} + # 1GB used... so 119GB allowed... 
+ host = fakes.FakeHostState('host1', 'compute',
+ {'free_disk_mb': 11 * 1024, 'total_usable_disk_gb': 12,
+ 'capabilities': capabilities, 'service': service})
+ self.assertFalse(filt_cls.host_passes(host, filter_properties))

 def test_compute_filter_fails_on_service_disabled(self):
 self._stub_service_is_up(True)
diff --git a/nova/tests/scheduler/test_host_manager.py b/nova/tests/scheduler/test_host_manager.py
index a2f9fc425ec8..2ca8f3ad9cd9 100644
--- a/nova/tests/scheduler/test_host_manager.py
+++ b/nova/tests/scheduler/test_host_manager.py
@@ -122,8 +122,6 @@ class HostManagerTestCase(test.TestCase):
 self.assertDictMatch(service_states, expected)

 def test_get_all_host_states(self):
- self.flags(reserved_host_memory_mb=512,
- reserved_host_disk_mb=1024)
 context = 'fake_context'
 topic = 'compute'
@@ -145,18 +143,18 @@
 host = compute_node['service']['host']
 self.assertEqual(host_states[host].service,
 compute_node['service'])
- self.assertEqual(host_states['host1'].free_ram_mb, 0)
+ self.assertEqual(host_states['host1'].free_ram_mb, 512)
- # 511GB
- self.assertEqual(host_states['host1'].free_disk_mb, 523264)
- self.assertEqual(host_states['host2'].free_ram_mb, 512)
+ # 512GB
+ self.assertEqual(host_states['host1'].free_disk_mb, 524288)
+ self.assertEqual(host_states['host2'].free_ram_mb, 1024)
- # 1023GB
- self.assertEqual(host_states['host2'].free_disk_mb, 1047552)
- self.assertEqual(host_states['host3'].free_ram_mb, 2560)
+ # 1024GB
+ self.assertEqual(host_states['host2'].free_disk_mb, 1048576)
+ self.assertEqual(host_states['host3'].free_ram_mb, 3072)
- # 3071GB
- self.assertEqual(host_states['host3'].free_disk_mb, 3144704)
- self.assertEqual(host_states['host4'].free_ram_mb, 7680)
+ # 3072GB
+ self.assertEqual(host_states['host3'].free_disk_mb, 3145728)
+ self.assertEqual(host_states['host4'].free_ram_mb, 8192)
- # 8191GB
- self.assertEqual(host_states['host4'].free_disk_mb, 8387584)
+ # 8192GB
+ self.assertEqual(host_states['host4'].free_disk_mb, 8388608)


class HostStateTestCase(test.TestCase):
diff --git a/nova/virt/fake.py b/nova/virt/fake.py
index 48ee67d5b61b..a6476f9d9a5a 100644
--- a/nova/virt/fake.py
+++ b/nova/virt/fake.py
@@ -231,7 +231,7 @@ class FakeDriver(driver.ComputeDriver):
 """
 dic = {'vcpus': 1,
- 'memory_mb': 4096,
+ 'memory_mb': 8192,
 'local_gb': 1028,
 'vcpus_used': 0,
 'memory_mb_used': 0,