diff --git a/setup.cfg b/setup.cfg index 7213f85..982585a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -53,6 +53,7 @@ watcher_goals = thermal_optimization = watcher.decision_engine.goal.goals:ThermalOptimization workload_balancing = watcher.decision_engine.goal.goals:WorkloadBalancing airflow_optimization = watcher.decision_engine.goal.goals:AirflowOptimization + noisy_neighbor = watcher.decision_engine.goal.goals:NoisyNeighborOptimization watcher_scoring_engines = dummy_scorer = watcher.decision_engine.scoring.dummy_scorer:DummyScorer @@ -70,6 +71,7 @@ watcher_strategies = workload_stabilization = watcher.decision_engine.strategy.strategies.workload_stabilization:WorkloadStabilization workload_balance = watcher.decision_engine.strategy.strategies.workload_balance:WorkloadBalance uniform_airflow = watcher.decision_engine.strategy.strategies.uniform_airflow:UniformAirflow + noisy_neighbor = watcher.decision_engine.strategy.strategies.noisy_neighbor:NoisyNeighbor watcher_actions = migrate = watcher.applier.actions.migration:Migrate diff --git a/watcher/decision_engine/goal/__init__.py b/watcher/decision_engine/goal/__init__.py index 69c63d6..1607884 100644 --- a/watcher/decision_engine/goal/__init__.py +++ b/watcher/decision_engine/goal/__init__.py @@ -21,6 +21,8 @@ ServerConsolidation = goals.ServerConsolidation ThermalOptimization = goals.ThermalOptimization Unclassified = goals.Unclassified WorkloadBalancing = goals.WorkloadBalancing +NoisyNeighbor = goals.NoisyNeighborOptimization __all__ = ("Dummy", "ServerConsolidation", "ThermalOptimization", - "Unclassified", "WorkloadBalancing", ) + "Unclassified", "WorkloadBalancing", + "NoisyNeighborOptimization",) diff --git a/watcher/decision_engine/goal/goals.py b/watcher/decision_engine/goal/goals.py index 213fc10..e5be78f 100644 --- a/watcher/decision_engine/goal/goals.py +++ b/watcher/decision_engine/goal/goals.py @@ -166,3 +166,29 @@ class AirflowOptimization(base.Goal): def get_efficacy_specification(cls): """The efficacy spec for the current goal""" return specs.Unclassified() + + +class NoisyNeighborOptimization(base.Goal): + """NoisyNeighborOptimization + + This goal is used to identify and migrate a Noisy Neighbor - + a low priority VM that negatively affects peformance of a high priority VM + in terms of IPC by over utilizing Last Level Cache. + """ + + @classmethod + def get_name(cls): + return "noisy_neighbor" + + @classmethod + def get_display_name(cls): + return _("Noisy Neighbor") + + @classmethod + def get_translatable_display_name(cls): + return "Noisy Neighbor" + + @classmethod + def get_efficacy_specification(cls): + """The efficacy spec for the current goal""" + return specs.Unclassified() diff --git a/watcher/decision_engine/strategy/strategies/__init__.py b/watcher/decision_engine/strategy/strategies/__init__.py index 403d04b..c1a2821 100644 --- a/watcher/decision_engine/strategy/strategies/__init__.py +++ b/watcher/decision_engine/strategy/strategies/__init__.py @@ -17,6 +17,7 @@ from watcher.decision_engine.strategy.strategies import basic_consolidation from watcher.decision_engine.strategy.strategies import dummy_strategy from watcher.decision_engine.strategy.strategies import dummy_with_scorer +from watcher.decision_engine.strategy.strategies import noisy_neighbor from watcher.decision_engine.strategy.strategies import outlet_temp_control from watcher.decision_engine.strategy.strategies import uniform_airflow from watcher.decision_engine.strategy.strategies import \ @@ -32,7 +33,8 @@ VMWorkloadConsolidation = vm_workload_consolidation.VMWorkloadConsolidation WorkloadBalance = workload_balance.WorkloadBalance WorkloadStabilization = workload_stabilization.WorkloadStabilization UniformAirflow = uniform_airflow.UniformAirflow +NoisyNeighbor = noisy_neighbor.NoisyNeighbor __all__ = ("BasicConsolidation", "OutletTempControl", "DummyStrategy", "DummyWithScorer", "VMWorkloadConsolidation", "WorkloadBalance", - "WorkloadStabilization", "UniformAirflow") + "WorkloadStabilization", "UniformAirflow", "NoisyNeighbor") diff --git a/watcher/decision_engine/strategy/strategies/base.py b/watcher/decision_engine/strategy/strategies/base.py index fc8721e..1c014d1 100644 --- a/watcher/decision_engine/strategy/strategies/base.py +++ b/watcher/decision_engine/strategy/strategies/base.py @@ -328,3 +328,11 @@ class WorkloadStabilizationBaseStrategy(BaseStrategy): @classmethod def get_goal_name(cls): return "workload_balancing" + + +@six.add_metaclass(abc.ABCMeta) +class NoisyNeighborBaseStrategy(BaseStrategy): + + @classmethod + def get_goal_name(cls): + return "noisy_neighbor" diff --git a/watcher/decision_engine/strategy/strategies/noisy_neighbor.py b/watcher/decision_engine/strategy/strategies/noisy_neighbor.py new file mode 100644 index 0000000..d67b411 --- /dev/null +++ b/watcher/decision_engine/strategy/strategies/noisy_neighbor.py @@ -0,0 +1,304 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2017 Intel Corp +# +# Authors: Prudhvi Rao Shedimbi +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from oslo_log import log + +from watcher._i18n import _ +from watcher.common import exception as wexc +from watcher.datasource import ceilometer as ceil +from watcher.decision_engine.strategy.strategies import base + +LOG = log.getLogger(__name__) + + +class NoisyNeighbor(base.NoisyNeighborBaseStrategy): + + MIGRATION = "migrate" + # The meter to report L3 cache in ceilometer + METER_NAME_L3 = "cpu_l3_cache" + DEFAULT_WATCHER_PRIORITY = 5 + + def __init__(self, config, osc=None): + """Noisy Neighbor strategy using live migration + + :param config: A mapping containing the configuration of this strategy + :type config: dict + :param osc: an OpenStackClients object, defaults to None + :type osc: :py:class:`~.OpenStackClients` instance, optional + """ + + super(NoisyNeighbor, self).__init__(config, osc) + + self.meter_name = self.METER_NAME_L3 + self._ceilometer = None + + @property + def ceilometer(self): + if self._ceilometer is None: + self._ceilometer = ceil.CeilometerHelper(osc=self.osc) + return self._ceilometer + + @ceilometer.setter + def ceilometer(self, c): + self._ceilometer = c + + @classmethod + def get_name(cls): + return "noisy_neighbor" + + @classmethod + def get_display_name(cls): + return _("Noisy Neighbor") + + @classmethod + def get_translatable_display_name(cls): + return "Noisy Neighbor" + + @classmethod + def get_schema(cls): + # Mandatory default setting for each element + return { + "properties": { + "cache_threshold": { + "description": "Performance drop in L3_cache threshold " + "for migration", + "type": "number", + "default": 35.0 + }, + "period": { + "description": "Aggregate time period of ceilometer", + "type": "number", + "default": 100.0 + }, + }, + } + + def get_current_and_previous_cache(self, instance): + + try: + current_cache = self.ceilometer.statistic_aggregation( + resource_id=instance.uuid, + meter_name=self.meter_name, period=self.period, + aggregate='avg') + + previous_cache = 2 * ( + self.ceilometer.statistic_aggregation( + resource_id=instance.uuid, + meter_name=self.meter_name, + period=2*self.period, aggregate='avg')) - current_cache + + except Exception as exc: + LOG.exception(exc) + return None + + return current_cache, previous_cache + + def find_priority_instance(self, instance): + + current_cache, previous_cache = \ + self.get_current_and_previous_cache(instance) + + if None in (current_cache, previous_cache): + LOG.warning("Ceilometer unable to pick L3 Cache " + "values. Skipping the instance") + return None + + if (current_cache < (1 - (self.cache_threshold / 100.0)) * + previous_cache): + return instance + else: + return None + + def find_noisy_instance(self, instance): + + noisy_current_cache, noisy_previous_cache = \ + self.get_current_and_previous_cache(instance) + + if None in (noisy_current_cache, noisy_previous_cache): + LOG.warning("Ceilometer unable to pick " + "L3 Cache. Skipping the instance") + return None + + if (noisy_current_cache > (1 + (self.cache_threshold / 100.0)) * + noisy_previous_cache): + return instance + else: + return None + + def group_hosts(self): + + nodes = self.compute_model.get_all_compute_nodes() + size_cluster = len(nodes) + if size_cluster == 0: + raise wexc.ClusterEmpty() + + hosts_need_release = {} + hosts_target = [] + + for node in nodes.values(): + instances_of_node = self.compute_model.get_node_instances(node) + node_instance_count = len(instances_of_node) + + # Flag that tells us whether to skip the node or not. If True, + # the node is skipped. Will be true if we find a noisy instance or + # when potential priority instance will be same as potential noisy + # instance + loop_break_flag = False + + if node_instance_count > 1: + + instance_priority_list = [] + + for instance in instances_of_node: + instance_priority_list.append(instance) + + # If there is no metadata regarding watcher-priority, it takes + # DEFAULT_WATCHER_PRIORITY as priority. + instance_priority_list.sort(key=lambda a: ( + a.get('metadata').get('watcher-priority'), + self.DEFAULT_WATCHER_PRIORITY)) + + instance_priority_list_reverse = list(instance_priority_list) + instance_priority_list_reverse.reverse() + + for potential_priority_instance in instance_priority_list: + + priority_instance = self.find_priority_instance( + potential_priority_instance) + + if (priority_instance is not None): + + for potential_noisy_instance in ( + instance_priority_list_reverse): + if(potential_noisy_instance == + potential_priority_instance): + loop_break_flag = True + break + + noisy_instance = self.find_noisy_instance( + potential_noisy_instance) + + if noisy_instance is not None: + hosts_need_release[node.uuid] = { + 'priority_vm': potential_priority_instance, + 'noisy_vm': potential_noisy_instance} + LOG.debug("Priority VM found: %s" % ( + potential_priority_instance.uuid)) + LOG.debug("Noisy VM found: %s" % ( + potential_noisy_instance.uuid)) + loop_break_flag = True + break + + # No need to check other instances in the node + if loop_break_flag is True: + break + + if node.uuid not in hosts_need_release: + hosts_target.append(node) + + return hosts_need_release, hosts_target + + def calc_used_resource(self, node): + """Calculate the used vcpus, memory and disk based on VM flavors""" + instances = self.compute_model.get_node_instances(node) + vcpus_used = 0 + memory_mb_used = 0 + disk_gb_used = 0 + for instance in instances: + vcpus_used += instance.vcpus + memory_mb_used += instance.memory + disk_gb_used += instance.disk + + return vcpus_used, memory_mb_used, disk_gb_used + + def filter_dest_servers(self, hosts, instance_to_migrate): + required_cores = instance_to_migrate.vcpus + required_disk = instance_to_migrate.disk + required_memory = instance_to_migrate.memory + + dest_servers = [] + for host in hosts: + cores_used, mem_used, disk_used = self.calc_used_resource(host) + cores_available = host.vcpus - cores_used + disk_available = host.disk - disk_used + mem_available = host.memory - mem_used + if (cores_available >= required_cores and disk_available >= + required_disk and mem_available >= required_memory): + dest_servers.append(host) + + return dest_servers + + def pre_execute(self): + LOG.debug("Initializing Noisy Neighbor strategy") + + if not self.compute_model: + raise wexc.ClusterStateNotDefined() + + if self.compute_model.stale: + raise wexc.ClusterStateStale() + + LOG.debug(self.compute_model.to_string()) + + def do_execute(self): + self.cache_threshold = self.input_parameters.cache_threshold + self.period = self.input_parameters.period + + hosts_need_release, hosts_target = self.group_hosts() + + if len(hosts_need_release) == 0: + LOG.debug("No hosts require optimization") + return + + if len(hosts_target) == 0: + LOG.debug("No hosts available to migrate") + return + + mig_source_node_name = max(hosts_need_release.keys(), key=lambda a: + hosts_need_release[a]['priority_vm']) + instance_to_migrate = hosts_need_release[mig_source_node_name][ + 'noisy_vm'] + + if instance_to_migrate is None: + return + + dest_servers = self.filter_dest_servers(hosts_target, + instance_to_migrate) + + if len(dest_servers) == 0: + LOG.info("No proper target host could be found") + return + + # Destination node will be the first available node in the list. + mig_destination_node = dest_servers[0] + mig_source_node = self.compute_model.get_node_by_uuid( + mig_source_node_name) + + if self.compute_model.migrate_instance(instance_to_migrate, + mig_source_node, + mig_destination_node): + parameters = {'migration_type': 'live', + 'source_node': mig_source_node.uuid, + 'destination_node': mig_destination_node.uuid} + self.solution.add_action(action_type=self.MIGRATION, + resource_id=instance_to_migrate.uuid, + input_parameters=parameters) + + def post_execute(self): + self.solution.model = self.compute_model + + LOG.debug(self.compute_model.to_string()) diff --git a/watcher/tests/decision_engine/model/ceilometer_metrics.py b/watcher/tests/decision_engine/model/ceilometer_metrics.py index c8834ea..9c5d336 100644 --- a/watcher/tests/decision_engine/model/ceilometer_metrics.py +++ b/watcher/tests/decision_engine/model/ceilometer_metrics.py @@ -56,6 +56,41 @@ class FakeCeilometerMetrics(object): result = self.get_average_usage_instance_cpu_wb(resource_id) return result + def mock_get_statistics_nn(self, resource_id, meter_name, period, + aggregate='avg'): + result = 0.0 + if meter_name == "cpu_l3_cache" and period == 100: + result = self.get_average_l3_cache_current(resource_id) + if meter_name == "cpu_l3_cache" and period == 200: + result = self.get_average_l3_cache_previous(resource_id) + return result + + @staticmethod + def get_average_l3_cache_current(uuid): + """The average l3 cache used by instance""" + mock = {} + mock['73b09e16-35b7-4922-804e-e8f5d9b740fc'] = 35 * oslo_utils.units.Ki + mock['cae81432-1631-4d4e-b29c-6f3acdcde906'] = 30 * oslo_utils.units.Ki + mock['INSTANCE_3'] = 40 * oslo_utils.units.Ki + mock['INSTANCE_4'] = 35 * oslo_utils.units.Ki + if uuid not in mock.keys(): + mock[uuid] = 25 * oslo_utils.units.Ki + return mock[str(uuid)] + + @staticmethod + def get_average_l3_cache_previous(uuid): + """The average l3 cache used by instance""" + mock = {} + mock['73b09e16-35b7-4922-804e-e8f5d9b740fc'] = 34.5 * ( + oslo_utils.units.Ki) + mock['cae81432-1631-4d4e-b29c-6f3acdcde906'] = 30.5 * ( + oslo_utils.units.Ki) + mock['INSTANCE_3'] = 60 * oslo_utils.units.Ki + mock['INSTANCE_4'] = 22.5 * oslo_utils.units.Ki + if uuid not in mock.keys(): + mock[uuid] = 25 * oslo_utils.units.Ki + return mock[str(uuid)] + @staticmethod def get_average_outlet_temperature(uuid): """The average outlet temperature for host""" diff --git a/watcher/tests/decision_engine/model/data/scenario_7_with_2_nodes.xml b/watcher/tests/decision_engine/model/data/scenario_7_with_2_nodes.xml index 99ee529..cf86c00 100644 --- a/watcher/tests/decision_engine/model/data/scenario_7_with_2_nodes.xml +++ b/watcher/tests/decision_engine/model/data/scenario_7_with_2_nodes.xml @@ -1,10 +1,10 @@ - - + + - - + + diff --git a/watcher/tests/decision_engine/strategy/strategies/test_noisy_neighbor.py b/watcher/tests/decision_engine/strategy/strategies/test_noisy_neighbor.py new file mode 100644 index 0000000..349ce1b --- /dev/null +++ b/watcher/tests/decision_engine/strategy/strategies/test_noisy_neighbor.py @@ -0,0 +1,179 @@ +# -*- encoding: utf-8 -*- +# Copyright (c) 2017 Intel Corp +# +# Authors: Prudhvi Rao Shedimbi +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +import mock + +from watcher.applier.loading import default +from watcher.common import exception +from watcher.common import utils +from watcher.decision_engine.model import model_root +from watcher.decision_engine.strategy import strategies +from watcher.tests import base +from watcher.tests.decision_engine.model import ceilometer_metrics +from watcher.tests.decision_engine.model import faker_cluster_state + + +class TestNoisyNeighbor(base.TestCase): + + def setUp(self): + super(TestNoisyNeighbor, self).setUp() + # fake metrics + self.fake_metrics = ceilometer_metrics.FakeCeilometerMetrics() + # fake cluster + self.fake_cluster = faker_cluster_state.FakerModelCollector() + + p_model = mock.patch.object( + strategies.NoisyNeighbor, "compute_model", + new_callable=mock.PropertyMock) + self.m_model = p_model.start() + self.addCleanup(p_model.stop) + + p_ceilometer = mock.patch.object( + strategies.NoisyNeighbor, "ceilometer", + new_callable=mock.PropertyMock) + self.m_ceilometer = p_ceilometer.start() + self.addCleanup(p_ceilometer.stop) + + p_audit_scope = mock.patch.object( + strategies.NoisyNeighbor, "audit_scope", + new_callable=mock.PropertyMock + ) + self.m_audit_scope = p_audit_scope.start() + self.addCleanup(p_audit_scope.stop) + + self.m_audit_scope.return_value = mock.Mock() + + self.m_model.return_value = model_root.ModelRoot() + self.m_ceilometer.return_value = mock.Mock( + statistic_aggregation=self.fake_metrics.mock_get_statistics_nn) + self.strategy = strategies.NoisyNeighbor(config=mock.Mock()) + + self.strategy.input_parameters = utils.Struct() + self.strategy.input_parameters.update({'cache_threshold': 35}) + self.strategy.threshold = 35 + self.strategy.input_parameters.update({'period': 100}) + self.strategy.threshold = 100 + + def test_calc_used_resource(self): + model = self.fake_cluster.generate_scenario_3_with_2_nodes() + self.m_model.return_value = model + node = model.get_node_by_uuid('Node_0') + cores_used, mem_used, disk_used = self.strategy.calc_used_resource( + node) + + self.assertEqual((10, 2, 20), (cores_used, mem_used, disk_used)) + + def test_group_hosts(self): + self.strategy.cache_threshold = 35 + self.strategy.period = 100 + model = self.fake_cluster.generate_scenario_7_with_2_nodes() + self.m_model.return_value = model + node_uuid = 'Node_1' + n1, n2 = self.strategy.group_hosts() + self.assertTrue(node_uuid in n1) + self.assertEqual(n1[node_uuid]['priority_vm'].uuid, 'INSTANCE_3') + self.assertEqual(n1[node_uuid]['noisy_vm'].uuid, 'INSTANCE_4') + self.assertEqual('Node_0', n2[0].uuid) + + def test_find_priority_instance(self): + self.strategy.cache_threshold = 35 + self.strategy.period = 100 + model = self.fake_cluster.generate_scenario_7_with_2_nodes() + self.m_model.return_value = model + potential_prio_inst = model.get_instance_by_uuid('INSTANCE_3') + inst_res = self.strategy.find_priority_instance(potential_prio_inst) + self.assertEqual('INSTANCE_3', inst_res.uuid) + + def test_find_noisy_instance(self): + self.strategy.cache_threshold = 35 + self.strategy.period = 100 + model = self.fake_cluster.generate_scenario_7_with_2_nodes() + self.m_model.return_value = model + potential_noisy_inst = model.get_instance_by_uuid('INSTANCE_4') + inst_res = self.strategy.find_noisy_instance(potential_noisy_inst) + self.assertEqual('INSTANCE_4', inst_res.uuid) + + def test_filter_destination_hosts(self): + model = self.fake_cluster.generate_scenario_7_with_2_nodes() + self.m_model.return_value = model + self.strategy.cache_threshold = 35 + self.strategy.period = 100 + n1, n2 = self.strategy.group_hosts() + mig_source_node = max(n1.keys(), key=lambda a: + n1[a]['priority_vm']) + instance_to_mig = n1[mig_source_node]['noisy_vm'] + dest_hosts = self.strategy.filter_dest_servers( + n2, instance_to_mig) + + self.assertEqual(1, len(dest_hosts)) + self.assertEqual('Node_0', dest_hosts[0].uuid) + + def test_exception_model(self): + self.m_model.return_value = None + self.assertRaises( + exception.ClusterStateNotDefined, self.strategy.execute) + + def test_exception_cluster_empty(self): + model = model_root.ModelRoot() + self.m_model.return_value = model + self.assertRaises(exception.ClusterEmpty, self.strategy.execute) + + def test_exception_stale_cdm(self): + self.fake_cluster.set_cluster_data_model_as_stale() + self.m_model.return_value = self.fake_cluster.cluster_data_model + + self.assertRaises( + exception.ClusterStateNotDefined, + self.strategy.execute) + + def test_execute_cluster_empty(self): + model = model_root.ModelRoot() + self.m_model.return_value = model + self.assertRaises(exception.ClusterEmpty, self.strategy.execute) + + def test_execute_no_workload(self): + self.strategy.cache_threshold = 35 + self.strategy.period = 100 + model = self.fake_cluster.generate_scenario_4_with_1_node_no_instance() + self.m_model.return_value = model + + solution = self.strategy.execute() + self.assertEqual([], solution.actions) + + def test_execute(self): + self.strategy.cache_threshold = 35 + self.strategy.period = 100 + model = self.fake_cluster.generate_scenario_7_with_2_nodes() + self.m_model.return_value = model + solution = self.strategy.execute() + actions_counter = collections.Counter( + [action.get('action_type') for action in solution.actions]) + + num_migrations = actions_counter.get("migrate", 0) + self.assertEqual(1, num_migrations) + + def test_check_parameters(self): + model = self.fake_cluster.generate_scenario_3_with_2_nodes() + self.m_model.return_value = model + solution = self.strategy.execute() + loader = default.DefaultActionLoader() + for action in solution.actions: + loaded_action = loader.load(action['action_type']) + loaded_action.input_parameters = action['input_parameters'] + loaded_action.validate_parameters()