Merge "Integrated consolidation strategy with watcher"

This commit is contained in:
Jenkins 2016-03-29 15:36:28 +00:00 committed by Gerrit Code Review
commit 99ff6d3348
5 changed files with 1069 additions and 1 deletions

View File

@ -49,6 +49,7 @@ watcher_strategies =
dummy = watcher.decision_engine.strategy.strategies.dummy_strategy:DummyStrategy
basic = watcher.decision_engine.strategy.strategies.basic_consolidation:BasicConsolidation
outlet_temp_control = watcher.decision_engine.strategy.strategies.outlet_temp_control:OutletTempControl
vm_workload_consolidation = watcher.decision_engine.strategy.strategies.vm_workload_consolidation:VMWorkloadConsolidation
watcher_actions =
migrate = watcher.applier.actions.migration:Migrate

View File

@ -18,10 +18,14 @@
from watcher.decision_engine.strategy.strategies import basic_consolidation
from watcher.decision_engine.strategy.strategies import dummy_strategy
from watcher.decision_engine.strategy.strategies import outlet_temp_control
from watcher.decision_engine.strategy.strategies \
import vm_workload_consolidation
BasicConsolidation = basic_consolidation.BasicConsolidation
OutletTempControl = outlet_temp_control.OutletTempControl
DummyStrategy = dummy_strategy.DummyStrategy
VMWorkloadConsolidation = vm_workload_consolidation.VMWorkloadConsolidation
__all__ = (BasicConsolidation, OutletTempControl, DummyStrategy)
__all__ = (BasicConsolidation, OutletTempControl, DummyStrategy,
VMWorkloadConsolidation)

View File

@ -0,0 +1,522 @@
# -*- encoding: utf-8 -*-
#
# Authors: Vojtech CIMA <cima@zhaw.ch>
# Bruno GRAZIOLI <gaea@zhaw.ch>
# Sean MURPHY <murp@zhaw.ch>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from copy import deepcopy
from oslo_log import log
from watcher._i18n import _LE, _LI
from watcher.common import exception
from watcher.decision_engine.model import hypervisor_state as hyper_state
from watcher.decision_engine.model import resource
from watcher.decision_engine.model import vm_state
from watcher.decision_engine.strategy.strategies import base
from watcher.metrics_engine.cluster_history import ceilometer \
as ceilometer_cluster_history
LOG = log.getLogger(__name__)
class VMWorkloadConsolidation(base.BaseStrategy):
"""VM Workload Consolidation Strategy.
A load consolidation strategy based on heuristic first-fit
algorithm which focuses on measured CPU utilization and tries to
minimize hosts which have too much or too little load respecting
resource capacity constraints.
This strategy produces a solution resulting in more efficient
utilization of cluster resources using following four phases:
* Offload phase - handling over-utilized resources
* Consolidation phase - handling under-utilized resources
* Solution optimization - reducing number of migrations
* Deactivation of unused hypervisors
A capacity coefficients (cc) might be used to adjust optimization
thresholds. Different resources may require different coefficient
values as well as setting up different coefficient values in both
phases may lead to to more efficient consolidation in the end.
If the cc equals 1 the full resource capacity may be used, cc
values lower than 1 will lead to resource under utilization and
values higher than 1 will lead to resource overbooking.
e.g. If targeted utilization is 80% of hypervisor capacity,
the coefficient in the consolidation phase will be 0.8, but
may any lower value in the offloading phase. The lower it gets
the cluster will appear more released (distributed) for the
following consolidation phase.
As this strategy laverages VM live migration to move the load
from one hypervisor to another, this feature needs to be set up
correctly on all hypervisors within the cluster.
This strategy assumes it is possible to live migrate any VM from
an active hypervisor to any other active hypervisor.
"""
DEFAULT_NAME = 'vm_workload_consolidation'
DEFAULT_DESCRIPTION = 'VM Workload Consolidation Strategy'
def __init__(self, name=DEFAULT_NAME, description=DEFAULT_DESCRIPTION,
osc=None):
super(VMWorkloadConsolidation, self).__init__(name, description, osc)
self._ceilometer = None
self.number_of_migrations = 0
self.number_of_released_hypervisors = 0
self.ceilometer_vm_data_cache = dict()
@property
def ceilometer(self):
if self._ceilometer is None:
self._ceilometer = (ceilometer_cluster_history.
CeilometerClusterHistory(osc=self.osc))
return self._ceilometer
@ceilometer.setter
def ceilometer(self, ceilometer):
self._ceilometer = ceilometer
def get_state_str(self, state):
"""Get resource state in string format.
:param state: resource state of unknown type
"""
if type(state) == str:
return state
elif (type(state) == hyper_state.HypervisorState or
type(state) == vm_state.VMState):
return state.value
else:
LOG.error(_LE('Unexpexted resource state type, '
'state=%(state)s, state_type=%(st)s.'),
state=state,
st=type(state))
raise exception.WatcherException
def add_action_activate_hypervisor(self, hypervisor):
"""Add an action for hypervisor activation into the solution.
:param hypervisor: hypervisor object
:return: None
"""
params = {'state': hyper_state.HypervisorState.ONLINE.value}
self.solution.add_action(
action_type='change_nova_service_state',
resource_id=hypervisor.uuid,
input_parameters=params)
self.number_of_released_hypervisors -= 1
def add_action_deactivate_hypervisor(self, hypervisor):
"""Add an action for hypervisor deactivation into the solution.
:param hypervisor: hypervisor object
:return: None
"""
params = {'state': hyper_state.HypervisorState.OFFLINE.value}
self.solution.add_action(
action_type='change_nova_service_state',
resource_id=hypervisor.uuid,
input_parameters=params)
self.number_of_released_hypervisors += 1
def add_migration(self, vm_uuid, src_hypervisor,
dst_hypervisor, model):
"""Add an action for VM migration into the solution.
:param vm_uuid: vm uuid
:param src_hypervisor: hypervisor object
:param dst_hypervisor: hypervisor object
:param model: model_root object
:return: None
"""
vm = model.get_vm_from_id(vm_uuid)
vm_state_str = self.get_state_str(vm.state)
if vm_state_str != vm_state.VMState.ACTIVE.value:
'''
Watcher curently only supports live VM migration and block live
VM migration which both requires migrated VM to be active.
When supported, the cold migration may be used as a fallback
migration mechanism to move non active VMs.
'''
LOG.error(_LE('Cannot live migrate: vm_uuid=%(vm_uuid)s, '
'state=%(vm_state)s.'),
vm_uuid=vm_uuid,
vm_state=vm_state_str)
raise exception.WatcherException
migration_type = 'live'
dst_hyper_state_str = self.get_state_str(dst_hypervisor.state)
if dst_hyper_state_str == hyper_state.HypervisorState.OFFLINE.value:
self.add_action_activate_hypervisor(dst_hypervisor)
model.get_mapping().unmap(src_hypervisor, vm)
model.get_mapping().map(dst_hypervisor, vm)
params = {'migration_type': migration_type,
'src_hypervisor': src_hypervisor.uuid,
'dst_hypervisor': dst_hypervisor.uuid}
self.solution.add_action(action_type='migrate',
resource_id=vm.uuid,
input_parameters=params)
self.number_of_migrations += 1
def deactivate_unused_hypervisors(self, model):
"""Generate actions for deactivation of unused hypervisors.
:param model: model_root object
:return: None
"""
for hypervisor in model.get_all_hypervisors().values():
if len(model.get_mapping().get_node_vms(hypervisor)) == 0:
self.add_action_deactivate_hypervisor(hypervisor)
def get_prediction_model(self, model):
"""Return a deepcopy of a model representing current cluster state.
:param model: model_root object
:return: model_root object
"""
return deepcopy(model)
def get_vm_utilization(self, vm_uuid, model, period=3600, aggr='avg'):
"""Collect cpu, ram and disk utilization statistics of a VM.
:param vm_uuid: vm object
:param model: model_root object
:param period: seconds
:param aggr: string
:return: dict(cpu(number of vcpus used), ram(MB used), disk(B used))
"""
if vm_uuid in self.ceilometer_vm_data_cache.keys():
return self.ceilometer_vm_data_cache.get(vm_uuid)
cpu_util_metric = 'cpu_util'
ram_util_metric = 'memory.usage'
ram_alloc_metric = 'memory'
disk_alloc_metric = 'disk.root.size'
vm_cpu_util = self.ceilometer.statistic_aggregation(
resource_id=vm_uuid, meter_name=cpu_util_metric,
period=period, aggregate=aggr)
vm_cpu_cores = model.get_resource_from_id(
resource.ResourceType.cpu_cores).get_capacity(
model.get_vm_from_id(vm_uuid))
if vm_cpu_util:
total_cpu_utilization = vm_cpu_cores * (vm_cpu_util / 100.0)
else:
total_cpu_utilization = vm_cpu_cores
vm_ram_util = self.ceilometer.statistic_aggregation(
resource_id=vm_uuid, meter_name=ram_util_metric,
period=period, aggregate=aggr)
if not vm_ram_util:
vm_ram_util = self.ceilometer.statistic_aggregation(
resource_id=vm_uuid, meter_name=ram_alloc_metric,
period=period, aggregate=aggr)
vm_disk_util = self.ceilometer.statistic_aggregation(
resource_id=vm_uuid, meter_name=disk_alloc_metric,
period=period, aggregate=aggr)
if not vm_ram_util or not vm_disk_util:
LOG.error(
_LE('No values returned by %(resource_id)s '
'for memory.usage or disk.root.size'),
resource_id=vm_uuid
)
raise exception.NoDataFound
self.ceilometer_vm_data_cache[vm_uuid] = dict(
cpu=total_cpu_utilization, ram=vm_ram_util, disk=vm_disk_util)
return self.ceilometer_vm_data_cache.get(vm_uuid)
def get_hypervisor_utilization(self, hypervisor, model, period=3600,
aggr='avg'):
"""Collect cpu, ram and disk utilization statistics of a hypervisor.
:param hypervisor: hypervisor object
:param model: model_root object
:param period: seconds
:param aggr: string
:return: dict(cpu(number of cores used), ram(MB used), disk(B used))
"""
hypervisor_vms = model.get_mapping().get_node_vms_from_id(
hypervisor.uuid)
hypervisor_ram_util = 0
hypervisor_disk_util = 0
hypervisor_cpu_util = 0
for vm_uuid in hypervisor_vms:
vm_util = self.get_vm_utilization(vm_uuid, model, period, aggr)
hypervisor_cpu_util += vm_util['cpu']
hypervisor_ram_util += vm_util['ram']
hypervisor_disk_util += vm_util['disk']
return dict(cpu=hypervisor_cpu_util, ram=hypervisor_ram_util,
disk=hypervisor_disk_util)
def get_hypervisor_capacity(self, hypervisor, model):
"""Collect cpu, ram and disk capacity of a hypervisor.
:param hypervisor: hypervisor object
:param model: model_root object
:return: dict(cpu(cores), ram(MB), disk(B))
"""
hypervisor_cpu_capacity = model.get_resource_from_id(
resource.ResourceType.cpu_cores).get_capacity(hypervisor)
hypervisor_disk_capacity = model.get_resource_from_id(
resource.ResourceType.disk_capacity).get_capacity(hypervisor)
hypervisor_ram_capacity = model.get_resource_from_id(
resource.ResourceType.memory).get_capacity(hypervisor)
return dict(cpu=hypervisor_cpu_capacity, ram=hypervisor_ram_capacity,
disk=hypervisor_disk_capacity)
def get_relative_hypervisor_utilization(self, hypervisor, model):
"""Return relative hypervisor utilization (rhu).
:param hypervisor: hypervisor object
:param model: model_root object
:return: {'cpu': <0,1>, 'ram': <0,1>, 'disk': <0,1>}
"""
rhu = {}
util = self.get_hypervisor_utilization(hypervisor, model)
cap = self.get_hypervisor_capacity(hypervisor, model)
for k in util.keys():
rhu[k] = float(util[k]) / float(cap[k])
return rhu
def get_relative_cluster_utilization(self, model):
"""Calculate relative cluster utilization (rcu).
RCU is an average of relative utilizations (rhu) of active hypervisors.
:param model: model_root object
:return: {'cpu': <0,1>, 'ram': <0,1>, 'disk': <0,1>}
"""
hypervisors = model.get_all_hypervisors().values()
rcu = {}
counters = {}
for hypervisor in hypervisors:
hyper_state_str = self.get_state_str(hypervisor.state)
if hyper_state_str == hyper_state.HypervisorState.ONLINE.value:
rhu = self.get_relative_hypervisor_utilization(
hypervisor, model)
for k in rhu.keys():
if k not in rcu:
rcu[k] = 0
if k not in counters:
counters[k] = 0
rcu[k] += rhu[k]
counters[k] += 1
for k in rcu.keys():
rcu[k] /= counters[k]
return rcu
def is_overloaded(self, hypervisor, model, cc):
"""Indicate whether a hypervisor is overloaded.
This considers provided resource capacity coefficients (cc).
:param hypervisor: hypervisor object
:param model: model_root object
:param cc: dictionary containing resource capacity coefficients
:return: [True, False]
"""
hypervisor_capacity = self.get_hypervisor_capacity(hypervisor, model)
hypervisor_utilization = self.get_hypervisor_utilization(
hypervisor, model)
metrics = ['cpu']
for m in metrics:
if hypervisor_utilization[m] > hypervisor_capacity[m] * cc[m]:
return True
return False
def vm_fits(self, vm_uuid, hypervisor, model, cc):
"""Indicate whether is a hypervisor able to accomodate a VM.
This considers provided resource capacity coefficients (cc).
:param vm_uuid: string
:param hypervisor: hypervisor object
:param model: model_root object
:param cc: dictionary containing resource capacity coefficients
:return: [True, False]
"""
hypervisor_capacity = self.get_hypervisor_capacity(hypervisor, model)
hypervisor_utilization = self.get_hypervisor_utilization(
hypervisor, model)
vm_utilization = self.get_vm_utilization(vm_uuid, model)
metrics = ['cpu', 'ram', 'disk']
for m in metrics:
if (vm_utilization[m] + hypervisor_utilization[m] >
hypervisor_capacity[m] * cc[m]):
return False
return True
def optimize_solution(self, model):
"""Optimize solution.
This is done by eliminating unnecessary or circular set of migrations
which can be replaced by a more efficient solution.
e.g.:
* A->B, B->C => replace migrations A->B, B->C with
a single migration A->C as both solution result in
VM running on hypervisor C which can be achieved with
one migration instead of two.
* A->B, B->A => remove A->B and B->A as they do not result
in a new VM placement.
:param model: model_root object
"""
migrate_actions = (
a for a in self.solution.actions if a[
'action_type'] == 'migrate')
vm_to_be_migrated = (a['input_parameters']['resource_id']
for a in migrate_actions)
vm_uuids = list(set(vm_to_be_migrated))
for vm_uuid in vm_uuids:
actions = list(
a for a in self.solution.actions if a[
'input_parameters'][
'resource_id'] == vm_uuid)
if len(actions) > 1:
src = actions[0]['input_parameters']['src_hypervisor']
dst = actions[-1]['input_parameters']['dst_hypervisor']
for a in actions:
self.solution.actions.remove(a)
self.number_of_migrations -= 1
if src != dst:
self.add_migration(vm_uuid, src, dst, model)
def offload_phase(self, model, cc):
"""Perform offloading phase.
This considers provided resource capacity coefficients.
Offload phase performing first-fit based bin packing to offload
overloaded hypervisors. This is done in a fashion of moving
the least CPU utilized VM first as live migration these
generaly causes less troubles. This phase results in a cluster
with no overloaded hypervisors.
* This phase is be able to activate turned off hypervisors (if needed
and any available) in the case of the resource capacity provided by
active hypervisors is not able to accomodate all the load.
As the offload phase is later followed by the consolidation phase,
the hypervisor activation in this phase doesn't necessarily results
in more activated hypervisors in the final solution.
:param model: model_root object
:param cc: dictionary containing resource capacity coefficients
"""
sorted_hypervisors = sorted(
model.get_all_hypervisors().values(),
key=lambda x: self.get_hypervisor_utilization(x, model)['cpu'])
for hypervisor in reversed(sorted_hypervisors):
if self.is_overloaded(hypervisor, model, cc):
for vm in sorted(model.get_mapping().get_node_vms(hypervisor),
key=lambda x: self.get_vm_utilization(
x, model)['cpu']):
for dst_hypervisor in reversed(sorted_hypervisors):
if self.vm_fits(vm, dst_hypervisor, model, cc):
self.add_migration(vm, hypervisor,
dst_hypervisor, model)
break
if not self.is_overloaded(hypervisor, model, cc):
break
def consolidation_phase(self, model, cc):
"""Perform consolidation phase.
This considers provided resource capacity coefficients.
Consolidation phase performing first-fit based bin packing.
First, hypervisors with the lowest cpu utilization are consolidated
by moving their load to hypervisors with the highest cpu utilization
which can accomodate the load. In this phase the most cpu utilizied
VMs are prioritizied as their load is more difficult to accomodate
in the system than less cpu utilizied VMs which can be later used
to fill smaller CPU capacity gaps.
:param model: model_root object
:param cc: dictionary containing resource capacity coefficients
"""
sorted_hypervisors = sorted(
model.get_all_hypervisors().values(),
key=lambda x: self.get_hypervisor_utilization(x, model)['cpu'])
asc = 0
for hypervisor in sorted_hypervisors:
vms = sorted(model.get_mapping().get_node_vms(hypervisor),
key=lambda x: self.get_vm_utilization(x,
model)['cpu'])
for vm in reversed(vms):
dsc = len(sorted_hypervisors) - 1
for dst_hypervisor in reversed(sorted_hypervisors):
if asc >= dsc:
break
if self.vm_fits(vm, dst_hypervisor, model, cc):
self.add_migration(vm, hypervisor,
dst_hypervisor, model)
break
dsc -= 1
asc += 1
def execute(self, original_model):
"""Execute strategy.
This strategy produces a solution resulting in more
efficient utilization of cluster resources using following
four phases:
* Offload phase - handling over-utilized resources
* Consolidation phase - handling under-utilized resources
* Solution optimization - reducing number of migrations
* Deactivation of unused hypervisors
:param original_model: root_model object
"""
LOG.info(_LI('Executing Smart Strategy'))
model = self.get_prediction_model(original_model)
rcu = self.get_relative_cluster_utilization(model)
self.ceilometer_vm_data_cache = dict()
cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
# Offloading phase
self.offload_phase(model, cc)
# Consolidation phase
self.consolidation_phase(model, cc)
# Optimize solution
self.optimize_solution(model)
# Deactivate unused hypervisors
self.deactivate_unused_hypervisors(model)
rcu_after = self.get_relative_cluster_utilization(model)
info = {
'number_of_migrations': self.number_of_migrations,
'number_of_released_hypervisors':
self.number_of_released_hypervisors,
'relative_cluster_utilization_before': str(rcu),
'relative_cluster_utilization_after': str(rcu_after)
}
LOG.debug(info)
self.solution.model = model
self.solution.efficacy = rcu_after['cpu']
return self.solution

View File

@ -0,0 +1,264 @@
# -*- encoding: utf-8 -*-
#
# Authors: Vojtech CIMA <cima@zhaw.ch>
# Bruno GRAZIOLI <gaea@zhaw.ch>
# Sean MURPHY <murp@zhaw.ch>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from watcher.decision_engine.model import hypervisor
from watcher.decision_engine.model import model_root as modelroot
from watcher.decision_engine.model import resource
from watcher.decision_engine.model import vm as modelvm
from watcher.decision_engine.model import vm_state
from watcher.metrics_engine.cluster_model_collector import base
class FakerModelCollector(base.BaseClusterModelCollector):
def __init__(self):
pass
def get_latest_cluster_data_model(self):
return self.generate_scenario_1()
def generate_scenario_1(self):
"""Simulates cluster with 2 hypervisors and 2 VMs using 1:1 mapping"""
current_state_cluster = modelroot.ModelRoot()
count_node = 2
count_vm = 2
mem = resource.Resource(resource.ResourceType.memory)
num_cores = resource.Resource(resource.ResourceType.cpu_cores)
disk = resource.Resource(resource.ResourceType.disk)
disk_capacity =\
resource.Resource(resource.ResourceType.disk_capacity)
current_state_cluster.create_resource(mem)
current_state_cluster.create_resource(num_cores)
current_state_cluster.create_resource(disk)
current_state_cluster.create_resource(disk_capacity)
for i in range(0, count_node):
node_uuid = "Node_{0}".format(i)
node = hypervisor.Hypervisor()
node.uuid = node_uuid
node.hostname = "hostname_{0}".format(i)
node.state = 'up'
mem.set_capacity(node, 64)
disk_capacity.set_capacity(node, 250)
num_cores.set_capacity(node, 40)
current_state_cluster.add_hypervisor(node)
for i in range(0, count_vm):
vm_uuid = "VM_{0}".format(i)
vm = modelvm.VM()
vm.uuid = vm_uuid
vm.state = vm_state.VMState.ACTIVE
mem.set_capacity(vm, 2)
disk.set_capacity(vm, 20)
num_cores.set_capacity(vm, 10)
current_state_cluster.add_vm(vm)
current_state_cluster.get_mapping().map(
current_state_cluster.get_hypervisor_from_id("Node_0"),
current_state_cluster.get_vm_from_id("VM_0"))
current_state_cluster.get_mapping().map(
current_state_cluster.get_hypervisor_from_id("Node_1"),
current_state_cluster.get_vm_from_id("VM_1"))
return current_state_cluster
def generate_scenario_2(self):
"""Simulates a cluster
With 4 hypervisors and 6 VMs all mapped to one hypervisor
"""
current_state_cluster = modelroot.ModelRoot()
count_node = 4
count_vm = 6
mem = resource.Resource(resource.ResourceType.memory)
num_cores = resource.Resource(resource.ResourceType.cpu_cores)
disk = resource.Resource(resource.ResourceType.disk)
disk_capacity =\
resource.Resource(resource.ResourceType.disk_capacity)
current_state_cluster.create_resource(mem)
current_state_cluster.create_resource(num_cores)
current_state_cluster.create_resource(disk)
current_state_cluster.create_resource(disk_capacity)
for i in range(0, count_node):
node_uuid = "Node_{0}".format(i)
node = hypervisor.Hypervisor()
node.uuid = node_uuid
node.hostname = "hostname_{0}".format(i)
node.state = 'up'
mem.set_capacity(node, 64)
disk_capacity.set_capacity(node, 250)
num_cores.set_capacity(node, 16)
current_state_cluster.add_hypervisor(node)
for i in range(0, count_vm):
vm_uuid = "VM_{0}".format(i)
vm = modelvm.VM()
vm.uuid = vm_uuid
vm.state = vm_state.VMState.ACTIVE
mem.set_capacity(vm, 2)
disk.set_capacity(vm, 20)
num_cores.set_capacity(vm, 10)
current_state_cluster.add_vm(vm)
current_state_cluster.get_mapping().map(
current_state_cluster.get_hypervisor_from_id("Node_0"),
current_state_cluster.get_vm_from_id("VM_%s" % str(i)))
return current_state_cluster
def generate_scenario_3(self):
"""Simulates a cluster
With 4 hypervisors and 6 VMs all mapped to one hypervisor
"""
current_state_cluster = modelroot.ModelRoot()
count_node = 2
count_vm = 4
mem = resource.Resource(resource.ResourceType.memory)
num_cores = resource.Resource(resource.ResourceType.cpu_cores)
disk = resource.Resource(resource.ResourceType.disk)
disk_capacity =\
resource.Resource(resource.ResourceType.disk_capacity)
current_state_cluster.create_resource(mem)
current_state_cluster.create_resource(num_cores)
current_state_cluster.create_resource(disk)
current_state_cluster.create_resource(disk_capacity)
for i in range(0, count_node):
node_uuid = "Node_{0}".format(i)
node = hypervisor.Hypervisor()
node.uuid = node_uuid
node.hostname = "hostname_{0}".format(i)
node.state = 'up'
mem.set_capacity(node, 64)
disk_capacity.set_capacity(node, 250)
num_cores.set_capacity(node, 10)
current_state_cluster.add_hypervisor(node)
for i in range(6, 6 + count_vm):
vm_uuid = "VM_{0}".format(i)
vm = modelvm.VM()
vm.uuid = vm_uuid
vm.state = vm_state.VMState.ACTIVE
mem.set_capacity(vm, 2)
disk.set_capacity(vm, 20)
num_cores.set_capacity(vm, 2 ** (i-6))
current_state_cluster.add_vm(vm)
current_state_cluster.get_mapping().map(
current_state_cluster.get_hypervisor_from_id("Node_0"),
current_state_cluster.get_vm_from_id("VM_%s" % str(i)))
return current_state_cluster
class FakeCeilometerMetrics(object):
def __init__(self, model):
self.model = model
def mock_get_statistics(self, resource_id, meter_name, period=3600,
aggregate='avg'):
if meter_name == "compute.node.cpu.percent":
return self.get_hypervisor_cpu_util(resource_id)
elif meter_name == "cpu_util":
return self.get_vm_cpu_util(resource_id)
elif meter_name == "memory.usage":
return self.get_vm_ram_util(resource_id)
elif meter_name == "disk.root.size":
return self.get_vm_disk_root_size(resource_id)
def get_hypervisor_cpu_util(self, r_id):
"""Calculates hypervisor utilization dynamicaly.
Hypervisor CPU utilization should consider
and corelate with actual VM-hypervisor mappings
provided within a cluster model.
Returns relative hypervisor CPU utilization <0, 100>.
:param r_id: resource id
"""
id = '%s_%s' % (r_id.split('_')[0], r_id.split('_')[1])
vms = self.model.get_mapping().get_node_vms_from_id(id)
util_sum = 0.0
hypervisor_cpu_cores = self.model.get_resource_from_id(
resource.ResourceType.cpu_cores).get_capacity_from_id(id)
for vm_uuid in vms:
vm_cpu_cores = self.model.get_resource_from_id(
resource.ResourceType.cpu_cores).\
get_capacity(self.model.get_vm_from_id(vm_uuid))
total_cpu_util = vm_cpu_cores * self.get_vm_cpu_util(vm_uuid)
util_sum += total_cpu_util / 100.0
util_sum /= hypervisor_cpu_cores
return util_sum * 100.0
def get_vm_cpu_util(self, r_id):
vm_cpu_util = dict()
vm_cpu_util['VM_0'] = 10
vm_cpu_util['VM_1'] = 30
vm_cpu_util['VM_2'] = 60
vm_cpu_util['VM_3'] = 20
vm_cpu_util['VM_4'] = 40
vm_cpu_util['VM_5'] = 50
vm_cpu_util['VM_6'] = 100
vm_cpu_util['VM_7'] = 100
vm_cpu_util['VM_8'] = 100
vm_cpu_util['VM_9'] = 100
return vm_cpu_util[str(r_id)]
def get_vm_ram_util(self, r_id):
vm_ram_util = dict()
vm_ram_util['VM_0'] = 1
vm_ram_util['VM_1'] = 2
vm_ram_util['VM_2'] = 4
vm_ram_util['VM_3'] = 8
vm_ram_util['VM_4'] = 3
vm_ram_util['VM_5'] = 2
vm_ram_util['VM_6'] = 1
vm_ram_util['VM_7'] = 2
vm_ram_util['VM_8'] = 4
vm_ram_util['VM_9'] = 8
return vm_ram_util[str(r_id)]
def get_vm_disk_root_size(self, r_id):
vm_disk_util = dict()
vm_disk_util['VM_0'] = 10
vm_disk_util['VM_1'] = 15
vm_disk_util['VM_2'] = 30
vm_disk_util['VM_3'] = 35
vm_disk_util['VM_4'] = 20
vm_disk_util['VM_5'] = 25
vm_disk_util['VM_6'] = 25
vm_disk_util['VM_7'] = 25
vm_disk_util['VM_8'] = 25
vm_disk_util['VM_9'] = 25
return vm_disk_util[str(r_id)]

View File

@ -0,0 +1,277 @@
# -*- encoding: utf-8 -*-
#
# Authors: Vojtech CIMA <cima@zhaw.ch>
# Bruno GRAZIOLI <gaea@zhaw.ch>
# Sean MURPHY <murp@zhaw.ch>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import mock
from watcher.decision_engine.strategy import strategies
from watcher.tests import base
from watcher.tests.decision_engine.strategy.strategies \
import faker_cluster_and_metrics
class TestSmartConsolidation(base.BaseTestCase):
fake_cluster = faker_cluster_and_metrics.FakerModelCollector()
def test_get_vm_utilization(self):
cluster = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(cluster)
strategy = strategies.VMWorkloadConsolidation()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
vm_0 = cluster.get_vm_from_id("VM_0")
vm_util = dict(cpu=1.0, ram=1, disk=10)
self.assertEqual(vm_util,
strategy.get_vm_utilization(vm_0.uuid, cluster))
def test_get_hypervisor_utilization(self):
cluster = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(cluster)
strategy = strategies.VMWorkloadConsolidation()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
node_0 = cluster.get_hypervisor_from_id("Node_0")
node_util = dict(cpu=1.0, ram=1, disk=10)
self.assertEqual(node_util,
strategy.get_hypervisor_utilization(node_0, cluster))
def test_get_hypervisor_capacity(self):
cluster = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(cluster)
strategy = strategies.VMWorkloadConsolidation()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
node_0 = cluster.get_hypervisor_from_id("Node_0")
node_util = dict(cpu=40, ram=64, disk=250)
self.assertEqual(node_util,
strategy.get_hypervisor_capacity(node_0, cluster))
def test_get_relative_hypervisor_utilization(self):
model = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
strategy = strategies.VMWorkloadConsolidation()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
hypervisor = model.get_hypervisor_from_id('Node_0')
rhu = strategy.get_relative_hypervisor_utilization(hypervisor, model)
expected_rhu = {'disk': 0.04, 'ram': 0.015625, 'cpu': 0.025}
self.assertEqual(expected_rhu, rhu)
def test_get_relative_cluster_utilization(self):
model = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
strategy = strategies.VMWorkloadConsolidation()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
cru = strategy.get_relative_cluster_utilization(model)
expected_cru = {'cpu': 0.05, 'disk': 0.05, 'ram': 0.0234375}
self.assertEqual(expected_cru, cru)
def test_add_migration(self):
model = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
strategy = strategies.VMWorkloadConsolidation()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
h1 = model.get_hypervisor_from_id('Node_0')
h2 = model.get_hypervisor_from_id('Node_1')
vm_uuid = 'VM_0'
strategy.add_migration(vm_uuid, h1, h2, model)
self.assertEqual(1, len(strategy.solution.actions))
expected = {'action_type': 'migrate',
'input_parameters': {'dst_hypervisor': h2.uuid,
'src_hypervisor': h1.uuid,
'migration_type': 'live',
'resource_id': vm_uuid}}
self.assertEqual(expected, strategy.solution.actions[0])
def test_is_overloaded(self):
strategy = strategies.VMWorkloadConsolidation()
model = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
h1 = model.get_hypervisor_from_id('Node_0')
cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
res = strategy.is_overloaded(h1, model, cc)
self.assertEqual(False, res)
cc = {'cpu': 0.025, 'ram': 1.0, 'disk': 1.0}
res = strategy.is_overloaded(h1, model, cc)
self.assertEqual(False, res)
cc = {'cpu': 0.024, 'ram': 1.0, 'disk': 1.0}
res = strategy.is_overloaded(h1, model, cc)
self.assertEqual(True, res)
def test_vm_fits(self):
model = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
strategy = strategies.VMWorkloadConsolidation()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
h = model.get_hypervisor_from_id('Node_1')
vm_uuid = 'VM_0'
cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
res = strategy.vm_fits(vm_uuid, h, model, cc)
self.assertEqual(True, res)
cc = {'cpu': 0.025, 'ram': 1.0, 'disk': 1.0}
res = strategy.vm_fits(vm_uuid, h, model, cc)
self.assertEqual(False, res)
def test_add_action_activate_hypervisor(self):
model = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
strategy = strategies.VMWorkloadConsolidation()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
h = model.get_hypervisor_from_id('Node_0')
strategy.add_action_activate_hypervisor(h)
expected = [{'action_type': 'change_nova_service_state',
'input_parameters': {'state': 'up',
'resource_id': 'Node_0'}}]
self.assertEqual(expected, strategy.solution.actions)
def test_add_action_deactivate_hypervisor(self):
model = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
strategy = strategies.VMWorkloadConsolidation()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
h = model.get_hypervisor_from_id('Node_0')
strategy.add_action_deactivate_hypervisor(h)
expected = [{'action_type': 'change_nova_service_state',
'input_parameters': {'state': 'down',
'resource_id': 'Node_0'}}]
self.assertEqual(expected, strategy.solution.actions)
def test_deactivate_unused_hypervisors(self):
model = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
strategy = strategies.VMWorkloadConsolidation()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
h1 = model.get_hypervisor_from_id('Node_0')
h2 = model.get_hypervisor_from_id('Node_1')
vm_uuid = 'VM_0'
strategy.deactivate_unused_hypervisors(model)
self.assertEqual(0, len(strategy.solution.actions))
# Migrate VM to free the hypervisor
strategy.add_migration(vm_uuid, h1, h2, model)
strategy.deactivate_unused_hypervisors(model)
expected = {'action_type': 'change_nova_service_state',
'input_parameters': {'state': 'down',
'resource_id': 'Node_0'}}
self.assertEqual(2, len(strategy.solution.actions))
self.assertEqual(expected, strategy.solution.actions[1])
def test_offload_phase(self):
model = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
strategy = strategies.VMWorkloadConsolidation()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
strategy.offload_phase(model, cc)
expected = []
self.assertEqual(expected, strategy.solution.actions)
def test_consolidation_phase(self):
model = self.fake_cluster.generate_scenario_1()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
strategy = strategies.VMWorkloadConsolidation()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
h1 = model.get_hypervisor_from_id('Node_0')
h2 = model.get_hypervisor_from_id('Node_1')
vm_uuid = 'VM_0'
cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
strategy.consolidation_phase(model, cc)
expected = [{'action_type': 'migrate',
'input_parameters': {'dst_hypervisor': h2.uuid,
'src_hypervisor': h1.uuid,
'migration_type': 'live',
'resource_id': vm_uuid}}]
self.assertEqual(expected, strategy.solution.actions)
def test_strategy(self):
model = self.fake_cluster.generate_scenario_2()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
strategy = strategies.VMWorkloadConsolidation()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
h1 = model.get_hypervisor_from_id('Node_0')
cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
strategy.offload_phase(model, cc)
strategy.consolidation_phase(model, cc)
strategy.optimize_solution(model)
h2 = strategy.solution.actions[0][
'input_parameters']['dst_hypervisor']
expected = [{'action_type': 'migrate',
'input_parameters': {'dst_hypervisor': h2,
'src_hypervisor': h1.uuid,
'migration_type': 'live',
'resource_id': 'VM_3'}},
{'action_type': 'migrate',
'input_parameters': {'dst_hypervisor': h2,
'src_hypervisor': h1.uuid,
'migration_type': 'live',
'resource_id': 'VM_1'}}]
self.assertEqual(expected, strategy.solution.actions)
def test_strategy2(self):
model = self.fake_cluster.generate_scenario_3()
fake_metrics = faker_cluster_and_metrics.FakeCeilometerMetrics(model)
strategy = strategies.VMWorkloadConsolidation()
strategy.ceilometer = mock.MagicMock(
statistic_aggregation=fake_metrics.mock_get_statistics)
h1 = model.get_hypervisor_from_id('Node_0')
h2 = model.get_hypervisor_from_id('Node_1')
cc = {'cpu': 1.0, 'ram': 1.0, 'disk': 1.0}
strategy.offload_phase(model, cc)
expected = [{'action_type': 'migrate',
'input_parameters': {'dst_hypervisor': h2.uuid,
'migration_type': 'live',
'resource_id': 'VM_6',
'src_hypervisor': h1.uuid}},
{'action_type': 'migrate',
'input_parameters': {'dst_hypervisor': h2.uuid,
'migration_type': 'live',
'resource_id': 'VM_7',
'src_hypervisor': h1.uuid}},
{'action_type': 'migrate',
'input_parameters': {'dst_hypervisor': h2.uuid,
'migration_type': 'live',
'resource_id': 'VM_8',
'src_hypervisor': h1.uuid}}]
self.assertEqual(expected, strategy.solution.actions)
strategy.consolidation_phase(model, cc)
expected.append({'action_type': 'migrate',
'input_parameters': {'dst_hypervisor': h1.uuid,
'migration_type': 'live',
'resource_id': 'VM_7',
'src_hypervisor': h2.uuid}})
self.assertEqual(expected, strategy.solution.actions)
strategy.optimize_solution(model)
del expected[3]
del expected[1]
self.assertEqual(expected, strategy.solution.actions)