outlet Temperature based migration strategy

It implements one of the algorithms of the Intel thermal PoC.
It extends the BaseStrategy class, gets the outlet temperature
metrics of servers via Ceilometer, and generates solutions
when the outlet temperature reaches or exceeds the threshold.
The threshold is currently hard-coded; it will be made configurable
in a later patch.

Implements: blueprint outlet-temperature-based-strategy

Change-Id: I248147329d34eddf408652205a077895be572010
Co-Authored-By: Zhenzan Zhou <zhenzan.zhou@intel.com>
This commit is contained in:
junjie huang 2015-12-14 00:09:57 +00:00
parent 660c782626
commit 8ebc898924
8 changed files with 440 additions and 27 deletions

View File

@ -44,6 +44,7 @@ watcher.database.migration_backend =
watcher_strategies =
dummy = watcher.decision_engine.strategy.strategies.dummy_strategy:DummyStrategy
basic = watcher.decision_engine.strategy.strategies.basic_consolidation:BasicConsolidation
outlet_temp_control = watcher.decision_engine.strategy.strategies.outlet_temp_control:OutletTempControl
[build_sphinx]
source-dir = doc/source

View File

@ -53,7 +53,7 @@ class KeystoneClient(object):
self._token = None
def get_endpoint(self, **kwargs):
kc = self._get_ksclient()
kc = self.get_ksclient()
if not kc.has_service_catalog():
raise exception.KeystoneFailure(
_('No Keystone service catalog loaded')
@ -63,7 +63,7 @@ class KeystoneClient(object):
if kwargs.get('region_name'):
attr = 'region'
filter_value = kwargs.get('region_name')
return self._get_ksclient().service_catalog.url_for(
return kc.service_catalog.url_for(
service_type=kwargs.get('service_type') or 'metering',
attr=attr,
filter_value=filter_value,
@ -81,15 +81,17 @@ class KeystoneClient(object):
# fails to override the version in the URL
return urljoin(auth_url.rstrip('/'), api_version)
def _get_ksclient(self):
def get_ksclient(self, creds=None):
"""Get an endpoint and auth token from Keystone."""
ks_args = self.get_credentials()
auth_version = CONF.keystone_authtoken.auth_version
auth_url = CONF.keystone_authtoken.auth_uri
api_version = self._is_apiv3(auth_url, auth_version)
api_v3 = self._is_apiv3(auth_url, auth_version)
if creds is None:
ks_args = self._get_credentials(api_v3)
else:
ks_args = creds
if api_version:
if api_v3:
from keystoneclient.v3 import client
else:
from keystoneclient.v2_0 import client
@ -99,17 +101,29 @@ class KeystoneClient(object):
return client.Client(**ks_args)
def get_credentials(self):
creds = \
{'auth_url': CONF.keystone_authtoken.auth_uri,
'username': CONF.keystone_authtoken.admin_user,
'password': CONF.keystone_authtoken.admin_password,
'project_name': CONF.keystone_authtoken.admin_tenant_name,
'user_domain_name': "default",
'project_domain_name': "default"}
def _get_credentials(self, api_v3):
    """Build the Keystone client credentials from the configuration.

    :param api_v3: truthy to build Keystone v3 style credentials
                   (project and domain names), falsy to build v2.0
                   style credentials (tenant name)
    :returns: a dict of keyword arguments for the Keystone client
    """
    # Keys shared by both API versions
    creds = {
        'auth_url': CONF.keystone_authtoken.auth_uri,
        'username': CONF.keystone_authtoken.admin_user,
        'password': CONF.keystone_authtoken.admin_password,
    }
    if api_v3:
        creds['project_name'] = CONF.keystone_authtoken.admin_tenant_name
        creds['user_domain_name'] = "default"
        creds['project_domain_name'] = "default"
    else:
        creds['tenant_name'] = CONF.keystone_authtoken.admin_tenant_name
    # Log a masked copy: the admin password must never reach the logs,
    # while the returned dict still carries the real value.
    LOG.debug(dict(creds, password='***'))
    return creds
def get_credentials(self):
    """Return the Keystone credentials for the configured API version.

    The API version is detected from the ``keystone_authtoken`` options
    ``auth_uri`` and ``auth_version``.

    :returns: the dict built by ``_get_credentials``
    """
    auth_uri = CONF.keystone_authtoken.auth_uri
    auth_version = CONF.keystone_authtoken.auth_version
    use_v3 = self._is_apiv3(auth_uri, auth_version)
    return self._get_credentials(use_v3)
def get_session(self):
creds = self.get_credentials()
self._auth = generic.Password(**creds)

View File

@ -25,11 +25,12 @@ from oslo_log import log
import cinderclient.exceptions as ciexceptions
import cinderclient.v2.client as ciclient
import glanceclient.v2.client as glclient
import keystoneclient.v3.client as ksclient
import neutronclient.neutron.client as netclient
import novaclient.client as nvclient
import novaclient.exceptions as nvexceptions
from watcher.common import keystone
LOG = log.getLogger(__name__)
@ -43,7 +44,7 @@ class NovaClient(object):
self.cinder = None
self.nova = nvclient.Client(self.NOVA_CLIENT_API_VERSION,
session=session)
self.keystone = ksclient.Client(**creds)
self.keystone = keystone.KeystoneClient().get_ksclient(creds)
self.glance = None
def get_hypervisors_list(self):

View File

@ -0,0 +1,251 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 Intel Corp
#
# Authors: Junjie-Huang <junjie.huang@intel.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from oslo_log import log
from watcher._i18n import _LE
from watcher.common import exception as wexc
from watcher.decision_engine.actions.migration import Migrate
from watcher.decision_engine.actions.migration import MigrationType
from watcher.decision_engine.model.resource import ResourceType
from watcher.decision_engine.model.vm_state import VMState
from watcher.decision_engine.strategy.common.level import StrategyLevel
from watcher.decision_engine.strategy.strategies.base import BaseStrategy
from watcher.metrics_engine.cluster_history.ceilometer import \
CeilometerClusterHistory
LOG = log.getLogger(__name__)
class OutletTempControl(BaseStrategy):
    """[PoC] Outlet temperature control using live migration.

    It is a migration strategy based on the outlet temperature of physical
    servers. It generates solutions to move a workload whenever a server's
    outlet temperature is higher than the specified threshold. As of now,
    we cannot forecast how many instances should be migrated. This is the
    reason why we simply plan a single virtual machine migration.
    So it's better to use this algorithm with CONTINUOUS audits.

    Requirements:

    * Hardware: compute nodes should support IPMI and PTAS technology
    * Software: Ceilometer component ceilometer-agent-ipmi running
      in each compute node, and the Ceilometer API can report the telemetry
      "hardware.ipmi.node.outlet_temperature" successfully.
    * You must have at least 2 physical compute nodes to run this strategy.

    Good Strategy:

    Towards software defined infrastructure, power and thermal
    intelligence is being adopted to optimize workload, which can help
    improve efficiency, reduce power, as well as improve datacenter PUE
    and lower the operation cost of a data center.
    Outlet (Exhaust Air) temperature is one of the important thermal
    telemetries to measure the thermal/workload status of a server.
    """

    DEFAULT_NAME = "outlet_temp_control"
    DEFAULT_DESCRIPTION = "outlet temperature based migration strategy"
    # The meter to report outlet temperature in ceilometer
    METER_NAME = "hardware.ipmi.node.outlet_temperature"
    # Unit: degree C
    THRESHOLD = 35.0

    def __init__(self, name=DEFAULT_NAME, description=DEFAULT_DESCRIPTION):
        """Initialize the strategy with the default threshold and meter.

        :param name: the name of the strategy
        :param description: a description of the strategy
        """
        super(OutletTempControl, self).__init__(name, description)
        # the migration plan will be triggered when the outlet temperature
        # reaches threshold
        # TODO(zhenzanz): Threshold should be configurable for each audit
        self.threshold = self.THRESHOLD
        self._meter = self.METER_NAME
        self._ceilometer = None

    @property
    def ceilometer(self):
        """Metrics backend, created on first access unless one was set."""
        if self._ceilometer is None:
            self._ceilometer = CeilometerClusterHistory()
        return self._ceilometer

    @ceilometer.setter
    def ceilometer(self, c):
        self._ceilometer = c

    def calc_used_res(self, model, hypervisor, cap_cores, cap_mem, cap_disk):
        """Calculate the used vcpus, memory and disk based on VM flavors.

        :param model: the cluster model holding the hypervisor/VM mapping
        :param hypervisor: the hypervisor whose VMs are summed up
        :param cap_cores: resource used to look up the cores of a VM
        :param cap_mem: resource used to look up the memory of a VM
        :param cap_disk: resource used to look up the disk of a VM
        :returns: a ``(vcpus_used, memory_mb_used, disk_gb_used)`` tuple,
                  all zero when the hypervisor hosts no VM
        """
        vcpus_used = 0
        memory_mb_used = 0
        disk_gb_used = 0
        for vm_id in model.get_mapping().get_node_vms(hypervisor):
            vm = model.get_vm_from_id(vm_id)
            vcpus_used += cap_cores.get_capacity(vm)
            memory_mb_used += cap_mem.get_capacity(vm)
            disk_gb_used += cap_disk.get_capacity(vm)
        return vcpus_used, memory_mb_used, disk_gb_used

    def group_hosts_by_outlet_temp(self, model):
        """Group hosts based on outlet temp meters.

        :param model: the cluster model to inspect
        :returns: a ``(hosts_need_release, hosts_target)`` tuple of lists.
                  Each entry is a dict with the keys ``'hv'`` (the
                  hypervisor) and ``'outlet_temp'`` (its averaged meter
                  value). Hosts at or above the threshold go to the first
                  list, the others to the second one.
        :raises: ClusterEmpty if the model contains no hypervisor
        """
        hypervisors = model.get_all_hypervisors()
        if not hypervisors:
            raise wexc.ClusterEmpty()

        hosts_need_release = []
        hosts_target = []
        for hypervisor_id in hypervisors:
            hypervisor = model.get_hypervisor_from_id(hypervisor_id)
            resource_id = hypervisor.uuid
            outlet_temp = self.ceilometer.statistic_aggregation(
                resource_id=resource_id,
                meter_name=self._meter,
                period="30",
                aggregate='avg')
            # some hosts may not have outlet temp meters, remove from target
            if outlet_temp is None:
                LOG.warning(_LE("%s: no outlet temp data"), resource_id)
                continue

            LOG.debug("%s: outlet temperature %f", resource_id, outlet_temp)
            hvmap = {'hv': hypervisor, 'outlet_temp': outlet_temp}
            if outlet_temp >= self.threshold:
                # mark the hypervisor to release resources
                hosts_need_release.append(hvmap)
            else:
                hosts_target.append(hvmap)
        return hosts_need_release, hosts_target

    def choose_vm_to_migrate(self, model, hosts):
        """Pick up an active vm instance to migrate from provided hosts.

        Hosts are visited in the given order and the first active VM found
        is selected.

        :param model: the cluster model holding the hypervisor/VM mapping
        :param hosts: list of dicts with a ``'hv'`` key, as returned by
                      ``group_hosts_by_outlet_temp``
        :returns: a ``(source_hypervisor, vm)`` tuple, or None when no
                  active VM could be found on any of the hosts
        """
        for hvmap in hosts:
            mig_src_hypervisor = hvmap['hv']
            vms_of_src = model.get_mapping().get_node_vms(mig_src_hypervisor)
            for vm_id in vms_of_src:
                try:
                    vm = model.get_vm_from_id(vm_id)
                except wexc.VMNotFound as e:
                    # Log the exception itself: ``e.message`` is not
                    # guaranteed to exist on Python 3 exceptions.
                    LOG.info("VM not found Error: %s", e)
                    continue
                # select the first active VM to migrate
                if vm.state != VMState.ACTIVE.value:
                    LOG.info(_LE("VM not active, skipped: %s"), vm.uuid)
                    continue
                return (mig_src_hypervisor, vm)
        return None

    def filter_dest_servers(self, model, hosts, vm_to_migrate):
        """Only return hosts with sufficient available resources.

        :param model: the cluster model providing the resource capacities
        :param hosts: candidate hosts, a list of dicts with a ``'hv'`` key
        :param vm_to_migrate: the VM that has to fit on the destination
        :returns: the entries of ``hosts`` whose free cores, disk and
                  memory all cover the needs of ``vm_to_migrate``, in
                  their original order
        """
        cap_cores = model.get_resource_from_id(ResourceType.cpu_cores)
        cap_disk = model.get_resource_from_id(ResourceType.disk)
        cap_mem = model.get_resource_from_id(ResourceType.memory)

        required_cores = cap_cores.get_capacity(vm_to_migrate)
        required_disk = cap_disk.get_capacity(vm_to_migrate)
        required_mem = cap_mem.get_capacity(vm_to_migrate)

        # filter hypervisors without enough resource
        dest_servers = []
        for hvmap in hosts:
            host = hvmap['hv']
            cores_used, mem_used, disk_used = self.calc_used_res(
                model, host, cap_cores, cap_mem, cap_disk)
            # Each capacity is reduced by the usage of the *same* resource
            # (memory and disk must not be mixed up here).
            cores_available = cap_cores.get_capacity(host) - cores_used
            disk_available = cap_disk.get_capacity(host) - disk_used
            mem_available = cap_mem.get_capacity(host) - mem_used
            if (cores_available >= required_cores and
                    disk_available >= required_disk and
                    mem_available >= required_mem):
                dest_servers.append(hvmap)
        return dest_servers

    def execute(self, orign_model):
        """Plan at most one VM migration away from the hottest host.

        :param orign_model: the cluster model to optimize
        :returns: the strategy solution; it contains a single ``Migrate``
                  action when a migration could be planned and is returned
                  without any new action otherwise
        :raises: ClusterStateNotDefined if ``orign_model`` is None
        :raises: ClusterEmpty if the model contains no hypervisor
        """
        LOG.debug("Initializing Outlet temperature strategy")

        if orign_model is None:
            raise wexc.ClusterStateNotDefined()

        current_model = orign_model
        hosts_need_release, hosts_target = self.group_hosts_by_outlet_temp(
            current_model)

        if not hosts_need_release:
            # TODO(zhenzanz): return something right if there's no hot servers
            LOG.debug("No hosts require optimization")
            return self.solution

        if not hosts_target:
            LOG.warning(_LE("No hosts under outlet temp threshold found"))
            return self.solution

        # choose the server with highest outlet temperature first
        hosts_need_release = sorted(hosts_need_release,
                                    reverse=True,
                                    key=lambda x: x["outlet_temp"])

        vm_to_migrate = self.choose_vm_to_migrate(current_model,
                                                  hosts_need_release)
        if vm_to_migrate is None:
            return self.solution

        mig_src_hypervisor, vm_src = vm_to_migrate
        # keep only the hosts able to hold the vm's cores, memory and disk
        dest_servers = self.filter_dest_servers(current_model,
                                                hosts_target,
                                                vm_src)
        if not dest_servers:
            # TODO(zhenzanz): maybe to warn that there's no resource
            # for instance.
            LOG.info(_LE("No proper target host could be found"))
            return self.solution

        # always use the host with the lowest outlet temperature; like a
        # stable sort, min() keeps the first candidate on ties
        coolest = min(dest_servers, key=lambda x: x["outlet_temp"])
        mig_dst_hypervisor = coolest['hv']

        # generate solution to migrate the vm to the dest server
        if current_model.get_mapping().migrate_vm(vm_src,
                                                  mig_src_hypervisor,
                                                  mig_dst_hypervisor):
            live_migrate = Migrate(vm_src,
                                   mig_src_hypervisor,
                                   mig_dst_hypervisor)
            live_migrate.migration_type = MigrationType.pre_copy
            live_migrate.level = StrategyLevel.conservative
            self.solution.add_change_request(live_migrate)

        self.solution.model = current_model
        return self.solution

View File

@ -46,13 +46,17 @@ class TestKeystone(BaseTestCase):
self.assertEqual(ep, expected_endpoint)
def test_get_session(self):
@mock.patch('watcher.common.keystone.KeystoneClient._is_apiv3')
def test_get_session(self, mock_apiv3):
mock_apiv3.return_value = True
k = KeystoneClient()
session = k.get_session()
self.assertIsInstance(session.auth, Password)
self.assertIsInstance(session, Session)
def test_get_credentials(self):
@mock.patch('watcher.common.keystone.KeystoneClient._is_apiv3')
def test_get_credentials(self, mock_apiv3):
mock_apiv3.return_value = True
expected_creds = {'auth_url': None,
'password': None,
'project_domain_name': 'default',

View File

@ -20,10 +20,10 @@
import time
import glanceclient.v2.client as glclient
import keystoneclient.v3.client as ksclient
import mock
import novaclient.client as nvclient
from watcher.common import keystone
from watcher.common.nova import NovaClient
from watcher.common import utils
from watcher.tests import base
@ -40,7 +40,7 @@ class TestNovaClient(base.TestCase):
self.creds = mock.MagicMock()
self.session = mock.MagicMock()
@mock.patch.object(ksclient, "Client", mock.Mock())
@mock.patch.object(keystone, 'KeystoneClient', mock.Mock())
@mock.patch.object(nvclient, "Client", mock.Mock())
def test_stop_instance(self):
nova_client = NovaClient(creds=self.creds, session=self.session)
@ -55,7 +55,7 @@ class TestNovaClient(base.TestCase):
result = nova_client.stop_instance(instance_id)
self.assertEqual(result, True)
@mock.patch.object(ksclient, "Client", mock.Mock())
@mock.patch.object(keystone, 'KeystoneClient', mock.Mock())
@mock.patch.object(nvclient, "Client", mock.Mock())
def test_set_host_offline(self):
nova_client = NovaClient(creds=self.creds, session=self.session)
@ -66,7 +66,7 @@ class TestNovaClient(base.TestCase):
self.assertEqual(result, True)
@mock.patch.object(time, 'sleep', mock.Mock())
@mock.patch.object(ksclient, "Client", mock.Mock())
@mock.patch.object(keystone, 'KeystoneClient', mock.Mock())
@mock.patch.object(nvclient, "Client", mock.Mock())
def test_live_migrate_instance(self):
nova_client = NovaClient(creds=self.creds, session=self.session)
@ -79,7 +79,7 @@ class TestNovaClient(base.TestCase):
)
self.assertIsNotNone(instance)
@mock.patch.object(ksclient, "Client", mock.Mock())
@mock.patch.object(keystone, 'KeystoneClient', mock.Mock())
@mock.patch.object(nvclient, "Client", mock.Mock())
def test_watcher_non_live_migrate_instance_not_found(self):
nova_client = NovaClient(creds=self.creds, session=self.session)
@ -93,7 +93,7 @@ class TestNovaClient(base.TestCase):
self.assertEqual(is_success, False)
@mock.patch.object(time, 'sleep', mock.Mock())
@mock.patch.object(ksclient, "Client", mock.Mock())
@mock.patch.object(keystone, 'KeystoneClient', mock.Mock())
@mock.patch.object(nvclient, "Client", mock.Mock())
def test_watcher_non_live_migrate_instance_volume(self):
nova_client = NovaClient(creds=self.creds, session=self.session)
@ -107,7 +107,7 @@ class TestNovaClient(base.TestCase):
self.assertIsNotNone(instance)
@mock.patch.object(time, 'sleep', mock.Mock())
@mock.patch.object(ksclient, "Client", mock.Mock())
@mock.patch.object(keystone, 'KeystoneClient', mock.Mock())
@mock.patch.object(nvclient, "Client", mock.Mock())
def test_watcher_non_live_migrate_keep_image(self):
nova_client = NovaClient(creds=self.creds, session=self.session)
@ -130,7 +130,7 @@ class TestNovaClient(base.TestCase):
self.assertIsNotNone(instance)
@mock.patch.object(time, 'sleep', mock.Mock())
@mock.patch.object(ksclient, "Client", mock.Mock())
@mock.patch.object(keystone, 'KeystoneClient', mock.Mock())
@mock.patch.object(nvclient, "Client", mock.Mock())
@mock.patch.object(glclient, "Client")
def test_create_image_from_instance(self, m_glance_cls):

View File

@ -34,8 +34,21 @@ class FakerMetricsCollector(object):
result = self.get_usage_node_cpu(resource_id)
elif meter_name == "cpu_util":
result = self.get_average_usage_vm_cpu(resource_id)
elif meter_name == "hardware.ipmi.node.outlet_temperature":
result = self.get_average_outlet_temperature(resource_id)
return result
def get_average_outlet_temperature(self, uuid):
    """Return the fake average outlet temperature of a host.

    :param uuid: identifier of the host; it is looked up by its string
                 form
    :returns: 30 for 'Node_0', and 100 for 'Node_1' or any other host
    """
    # use a big value to make sure it exceeds threshold
    hot_temperature = 100
    temperatures = {
        'Node_0': 30,
        'Node_1': hot_temperature,
    }
    # Look up and default on the same (string) key, so that an unknown
    # non-string identifier falls back to the default instead of failing.
    return temperatures.get(str(uuid), hot_temperature)
def get_usage_node_cpu(self, uuid):
"""The last VM CPU usage values to average

View File

@ -0,0 +1,129 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 Intel Corp
#
# Authors: Zhenzan Zhou <zhenzan.zhou@intel.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from collections import Counter
from mock import MagicMock
from watcher.common import exception
from watcher.decision_engine.actions.migration import Migrate
from watcher.decision_engine.model.model_root import ModelRoot
from watcher.decision_engine.model.resource import ResourceType
from watcher.decision_engine.strategy.strategies.outlet_temp_control import \
OutletTempControl
from watcher.tests import base
from watcher.tests.decision_engine.strategy.strategies.faker_cluster_state \
import FakerModelCollector
from watcher.tests.decision_engine.strategy.strategies.faker_metrics_collector\
import FakerMetricsCollector
class TestOutletTempControl(base.BaseTestCase):
    """Unit tests of the outlet temperature based migration strategy."""

    # fake metrics
    fake_metrics = FakerMetricsCollector()

    # fake cluster
    fake_cluster = FakerModelCollector()

    def _strategy_with_fake_metrics(self):
        """Return a strategy whose ceilometer answers with fake metrics."""
        strategy = OutletTempControl()
        strategy.ceilometer = MagicMock(
            statistic_aggregation=self.fake_metrics.mock_get_statistics)
        return strategy

    def test_calc_used_res(self):
        model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
        strategy = OutletTempControl()
        hypervisor = model.get_hypervisor_from_id('Node_0')
        cap_cores = model.get_resource_from_id(ResourceType.cpu_cores)
        cap_mem = model.get_resource_from_id(ResourceType.memory)
        cap_disk = model.get_resource_from_id(ResourceType.disk)

        used = strategy.calc_used_res(
            model, hypervisor, cap_cores, cap_mem, cap_disk)
        cores_used, mem_used, disk_used = used

        self.assertEqual((cores_used, mem_used, disk_used), (10, 2, 20))

    def test_group_hosts_by_outlet_temp(self):
        model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
        strategy = self._strategy_with_fake_metrics()

        hot_hosts, cool_hosts = strategy.group_hosts_by_outlet_temp(model)

        self.assertEqual(hot_hosts[0]['hv'].uuid, 'Node_1')
        self.assertEqual(cool_hosts[0]['hv'].uuid, 'Node_0')

    def test_choose_vm_to_migrate(self):
        model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
        strategy = self._strategy_with_fake_metrics()
        hot_hosts, _cool_hosts = strategy.group_hosts_by_outlet_temp(model)

        source, vm = strategy.choose_vm_to_migrate(model, hot_hosts)

        self.assertEqual(source.uuid, 'Node_1')
        self.assertEqual(vm.uuid, 'VM_1')

    def test_filter_dest_servers(self):
        model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()
        strategy = self._strategy_with_fake_metrics()
        hot_hosts, cool_hosts = strategy.group_hosts_by_outlet_temp(model)
        vm_to_mig = strategy.choose_vm_to_migrate(model, hot_hosts)

        dest_hosts = strategy.filter_dest_servers(
            model, cool_hosts, vm_to_mig[1])

        self.assertEqual(len(dest_hosts), 1)
        self.assertEqual(dest_hosts[0]['hv'].uuid, 'Node_0')

    def test_exception_model(self):
        strategy = OutletTempControl()
        self.assertRaises(
            exception.ClusterStateNotDefined, strategy.execute, None)

    def test_exception_cluster_empty(self):
        strategy = OutletTempControl()
        empty_model = ModelRoot()
        self.assertRaises(
            exception.ClusterEmpty, strategy.execute, empty_model)

    def test_execute_cluster_empty(self):
        current_state_cluster = FakerModelCollector()
        strategy = self._strategy_with_fake_metrics()
        empty_model = current_state_cluster.generate_random(0, 0)
        self.assertRaises(
            exception.ClusterEmpty, strategy.execute, empty_model)

    def test_execute_no_workload(self):
        strategy = self._strategy_with_fake_metrics()
        current_state_cluster = FakerModelCollector()
        generate = \
            current_state_cluster.generate_scenario_4_with_1_hypervisor_no_vm
        model = generate()

        solution = strategy.execute(model)

        self.assertEqual(solution.actions, [])

    def test_execute(self):
        strategy = self._strategy_with_fake_metrics()
        model = self.fake_cluster.generate_scenario_3_with_2_hypervisors()

        solution = strategy.execute(model)

        # exactly one migration is expected to be planned
        action_types = [type(action) for action in solution.actions]
        num_migrations = Counter(action_types).get(Migrate, 0)
        self.assertEqual(num_migrations, 1)