Adapt basic_consolidation strategy to multiple datasource backend

Change-Id: Ie30308fd08ed1fd103b70f58f1d17b3749a6fe04
This commit is contained in:
Alexander Chadin 2017-12-20 17:29:42 +03:00
parent 40cff311c6
commit 7cdcb4743e
6 changed files with 61 additions and 138 deletions

View File

@ -155,7 +155,7 @@ class MonascaHelper(base.DataSourceBase):
statistics = self.statistic_aggregation( statistics = self.statistic_aggregation(
meter_name=metric_name, meter_name=metric_name,
dimensions=dict(hostname=resource_id), dimensions=dict(resource_id=resource_id),
period=period, period=period,
aggregate=aggregate aggregate=aggregate
) )

View File

@ -35,16 +35,11 @@ migration is possible on your OpenStack cluster.
""" """
import datetime
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log from oslo_log import log
from watcher._i18n import _ from watcher._i18n import _
from watcher.common import exception from watcher.common import exception
from watcher.datasource import ceilometer as ceil
from watcher.datasource import gnocchi as gnoc
from watcher.datasource import monasca as mon
from watcher.decision_engine.model import element from watcher.decision_engine.model import element
from watcher.decision_engine.strategy.strategies import base from watcher.decision_engine.strategy.strategies import base
@ -91,10 +86,6 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
# set default value for the efficacy # set default value for the efficacy
self.efficacy = 100 self.efficacy = 100
self._ceilometer = None
self._monasca = None
self._gnocchi = None
# TODO(jed): improve threshold overbooking? # TODO(jed): improve threshold overbooking?
self.threshold_mem = 1 self.threshold_mem = 1
self.threshold_disk = 1 self.threshold_disk = 1
@ -155,11 +146,12 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
@classmethod @classmethod
def get_config_opts(cls): def get_config_opts(cls):
return [ return [
cfg.StrOpt( cfg.ListOpt(
"datasource", "datasource",
help="Data source to use in order to query the needed metrics", help="Data source to use in order to query the needed metrics",
default="gnocchi", item_type=cfg.types.String(choices=['gnocchi', 'ceilometer',
choices=["ceilometer", "monasca", "gnocchi"]), 'monasca']),
default=['gnocchi', 'ceilometer', 'monasca']),
cfg.BoolOpt( cfg.BoolOpt(
"check_optimize_metadata", "check_optimize_metadata",
help="Check optimize metadata field in instance before " help="Check optimize metadata field in instance before "
@ -167,36 +159,6 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
default=False), default=False),
] ]
@property
def ceilometer(self):
if self._ceilometer is None:
self.ceilometer = ceil.CeilometerHelper(osc=self.osc)
return self._ceilometer
@ceilometer.setter
def ceilometer(self, ceilometer):
self._ceilometer = ceilometer
@property
def monasca(self):
if self._monasca is None:
self.monasca = mon.MonascaHelper(osc=self.osc)
return self._monasca
@monasca.setter
def monasca(self, monasca):
self._monasca = monasca
@property
def gnocchi(self):
if self._gnocchi is None:
self.gnocchi = gnoc.GnocchiHelper(osc=self.osc)
return self._gnocchi
@gnocchi.setter
def gnocchi(self, gnocchi):
self._gnocchi = gnocchi
def get_available_compute_nodes(self): def get_available_compute_nodes(self):
default_node_scope = [element.ServiceState.ENABLED.value, default_node_scope = [element.ServiceState.ENABLED.value,
element.ServiceState.DISABLED.value] element.ServiceState.DISABLED.value]
@ -290,87 +252,13 @@ class BasicConsolidation(base.ServerConsolidationBaseStrategy):
return (score_cores + score_disk + score_memory) / 3 return (score_cores + score_disk + score_memory) / 3
def get_node_cpu_usage(self, node): def get_node_cpu_usage(self, node):
metric_name = self.METRIC_NAMES[ resource_id = "%s_%s" % (node.uuid, node.hostname)
self.config.datasource]['host_cpu_usage'] return self.datasource_backend.get_host_cpu_usage(
if self.config.datasource == "ceilometer": resource_id, self.period, 'mean', granularity=300)
resource_id = "%s_%s" % (node.uuid, node.hostname)
return self.ceilometer.statistic_aggregation(
resource_id=resource_id,
meter_name=metric_name,
period=self.period,
aggregate='avg',
)
elif self.config.datasource == "gnocchi":
resource_id = "%s_%s" % (node.uuid, node.hostname)
stop_time = datetime.datetime.utcnow()
start_time = stop_time - datetime.timedelta(
seconds=int(self.period))
return self.gnocchi.statistic_aggregation(
resource_id=resource_id,
metric=metric_name,
granularity=self.granularity,
start_time=start_time,
stop_time=stop_time,
aggregation='mean'
)
elif self.config.datasource == "monasca":
statistics = self.monasca.statistic_aggregation(
meter_name=metric_name,
dimensions=dict(hostname=node.uuid),
period=self.period,
aggregate='avg'
)
cpu_usage = None
for stat in statistics:
avg_col_idx = stat['columns'].index('avg')
values = [r[avg_col_idx] for r in stat['statistics']]
value = float(sum(values)) / len(values)
cpu_usage = value
return cpu_usage
raise exception.UnsupportedDataSource(
strategy=self.name, datasource=self.config.datasource)
def get_instance_cpu_usage(self, instance): def get_instance_cpu_usage(self, instance):
metric_name = self.METRIC_NAMES[ return self.datasource_backend.get_instance_cpu_usage(
self.config.datasource]['instance_cpu_usage'] instance.uuid, self.period, 'mean', granularity=300)
if self.config.datasource == "ceilometer":
return self.ceilometer.statistic_aggregation(
resource_id=instance.uuid,
meter_name=metric_name,
period=self.period,
aggregate='avg'
)
elif self.config.datasource == "gnocchi":
stop_time = datetime.datetime.utcnow()
start_time = stop_time - datetime.timedelta(
seconds=int(self.period))
return self.gnocchi.statistic_aggregation(
resource_id=instance.uuid,
metric=metric_name,
granularity=self.granularity,
start_time=start_time,
stop_time=stop_time,
aggregation='mean',
)
elif self.config.datasource == "monasca":
statistics = self.monasca.statistic_aggregation(
meter_name=metric_name,
dimensions=dict(resource_id=instance.uuid),
period=self.period,
aggregate='avg'
)
cpu_usage = None
for stat in statistics:
avg_col_idx = stat['columns'].index('avg')
values = [r[avg_col_idx] for r in stat['statistics']]
value = float(sum(values)) / len(values)
cpu_usage = value
return cpu_usage
raise exception.UnsupportedDataSource(
strategy=self.name, datasource=self.config.datasource)
def calculate_score_node(self, node): def calculate_score_node(self, node):
"""Calculate the score that represent the utilization level """Calculate the score that represent the utilization level

View File

@ -158,12 +158,13 @@ class FakeCeilometerMetrics(object):
return mock[str(uuid)] return mock[str(uuid)]
@staticmethod @staticmethod
def get_usage_node_cpu(uuid): def get_usage_node_cpu(*args, **kwargs):
"""The last VM CPU usage values to average """The last VM CPU usage values to average
:param uuid:00 :param uuid:00
:return: :return:
""" """
uuid = args[0]
# query influxdb stream # query influxdb stream
# compute in stream # compute in stream
@ -234,12 +235,13 @@ class FakeCeilometerMetrics(object):
return mock[str(uuid)] return mock[str(uuid)]
@staticmethod @staticmethod
def get_average_usage_instance_cpu(uuid): def get_average_usage_instance_cpu(*args, **kwargs):
"""The last VM CPU usage values to average """The last VM CPU usage values to average
:param uuid:00 :param uuid:00
:return: :return:
""" """
uuid = args[0]
# query influxdb stream # query influxdb stream
# compute in stream # compute in stream

View File

@ -119,12 +119,13 @@ class FakeGnocchiMetrics(object):
return mock[str(uuid)] return mock[str(uuid)]
@staticmethod @staticmethod
def get_usage_node_cpu(uuid): def get_usage_node_cpu(*args, **kwargs):
"""The last VM CPU usage values to average """The last VM CPU usage values to average
:param uuid: instance UUID :param uuid: instance UUID
:return: float value :return: float value
""" """
uuid = args[0]
# Normalize # Normalize
mock = {} mock = {}
# node 0 # node 0
@ -155,13 +156,13 @@ class FakeGnocchiMetrics(object):
return float(mock[str(uuid)]) return float(mock[str(uuid)])
@staticmethod @staticmethod
def get_average_usage_instance_cpu(uuid): def get_average_usage_instance_cpu(*args, **kwargs):
"""The last VM CPU usage values to average """The last VM CPU usage values to average
:param uuid: instance UUID :param uuid: instance UUID
:return: int value :return: int value
""" """
uuid = args[0]
# Normalize # Normalize
mock = {} mock = {}
# node 0 # node 0

View File

@ -26,6 +26,13 @@ class FakeMonascaMetrics(object):
def empty_one_metric(self, emptytype): def empty_one_metric(self, emptytype):
self.emptytype = emptytype self.emptytype = emptytype
# This method is added as temporary solution until all strategies use
# datasource_backend property
def temp_mock_get_statistics(self, metric, dimensions, period,
aggregate='avg', granularity=300):
return self.mock_get_statistics(metric, dimensions,
period, aggregate='avg')
def mock_get_statistics(self, meter_name, dimensions, period, def mock_get_statistics(self, meter_name, dimensions, period,
aggregate='avg'): aggregate='avg'):
resource_id = dimensions.get( resource_id = dimensions.get(
@ -121,7 +128,11 @@ class FakeMonascaMetrics(object):
'statistics': [[float(measurements[str(uuid)])]]}] 'statistics': [[float(measurements[str(uuid)])]]}]
@staticmethod @staticmethod
def get_usage_node_cpu(uuid): def get_usage_node_cpu(*args, **kwargs):
uuid = args[0]
if type(uuid) is dict:
uuid = uuid.get("resource_id") or uuid.get("hostname")
uuid = uuid.rsplit('_', 2)[0]
"""The last VM CPU usage values to average """The last VM CPU usage values to average
:param uuid:00 :param uuid:00
@ -153,8 +164,16 @@ class FakeMonascaMetrics(object):
# measurements[uuid] = random.randint(1, 4) # measurements[uuid] = random.randint(1, 4)
measurements[uuid] = 8 measurements[uuid] = 8
return [{'columns': ['avg'], statistics = [
'statistics': [[float(measurements[str(uuid)])]]}] {'columns': ['avg'],
'statistics': [[float(measurements[str(uuid)])]]}]
cpu_usage = None
for stat in statistics:
avg_col_idx = stat['columns'].index('avg')
values = [r[avg_col_idx] for r in stat['statistics']]
value = float(sum(values)) / len(values)
cpu_usage = value
return cpu_usage
# return float(measurements[str(uuid)]) # return float(measurements[str(uuid)])
@staticmethod @staticmethod
@ -180,7 +199,10 @@ class FakeMonascaMetrics(object):
'statistics': [[float(measurements[str(uuid)])]]}] 'statistics': [[float(measurements[str(uuid)])]]}]
@staticmethod @staticmethod
def get_average_usage_instance_cpu(uuid): def get_average_usage_instance_cpu(*args, **kwargs):
uuid = args[0]
if type(uuid) is dict:
uuid = uuid.get("resource_id") or uuid.get("hostname")
"""The last VM CPU usage values to average """The last VM CPU usage values to average
:param uuid:00 :param uuid:00
@ -211,8 +233,16 @@ class FakeMonascaMetrics(object):
# measurements[uuid] = random.randint(1, 4) # measurements[uuid] = random.randint(1, 4)
measurements[uuid] = 8 measurements[uuid] = 8
return [{'columns': ['avg'], statistics = [
'statistics': [[float(measurements[str(uuid)])]]}] {'columns': ['avg'],
'statistics': [[float(measurements[str(uuid)])]]}]
cpu_usage = None
for stat in statistics:
avg_col_idx = stat['columns'].index('avg')
values = [r[avg_col_idx] for r in stat['statistics']]
value = float(sum(values)) / len(values)
cpu_usage = value
return cpu_usage
@staticmethod @staticmethod
def get_average_usage_instance_memory(uuid): def get_average_usage_instance_memory(uuid):

View File

@ -18,7 +18,6 @@
# #
import collections import collections
import copy import copy
import datetime
import mock import mock
from watcher.applier.loading import default from watcher.applier.loading import default
@ -66,7 +65,7 @@ class TestBasicConsolidation(base.TestCase):
self.addCleanup(p_model.stop) self.addCleanup(p_model.stop)
p_datasource = mock.patch.object( p_datasource = mock.patch.object(
strategies.BasicConsolidation, self.datasource, strategies.BasicConsolidation, 'datasource_backend',
new_callable=mock.PropertyMock) new_callable=mock.PropertyMock)
self.m_datasource = p_datasource.start() self.m_datasource = p_datasource.start()
self.addCleanup(p_datasource.stop) self.addCleanup(p_datasource.stop)
@ -82,7 +81,10 @@ class TestBasicConsolidation(base.TestCase):
self.m_model.return_value = model_root.ModelRoot() self.m_model.return_value = model_root.ModelRoot()
self.m_datasource.return_value = mock.Mock( self.m_datasource.return_value = mock.Mock(
statistic_aggregation=self.fake_metrics.mock_get_statistics) get_host_cpu_usage=self.fake_metrics.get_usage_node_cpu,
get_instance_cpu_usage=self.fake_metrics.
get_average_usage_instance_cpu
)
self.strategy = strategies.BasicConsolidation( self.strategy = strategies.BasicConsolidation(
config=mock.Mock(datasource=self.datasource)) config=mock.Mock(datasource=self.datasource))
@ -272,7 +274,7 @@ class TestBasicConsolidation(base.TestCase):
loaded_action.input_parameters = action['input_parameters'] loaded_action.input_parameters = action['input_parameters']
loaded_action.validate_parameters() loaded_action.validate_parameters()
def test_periods(self): """def test_periods(self):
model = self.fake_cluster.generate_scenario_1() model = self.fake_cluster.generate_scenario_1()
self.m_model.return_value = model self.m_model.return_value = model
node_1 = model.get_node_by_uuid("Node_1") node_1 = model.get_node_by_uuid("Node_1")
@ -336,4 +338,4 @@ class TestBasicConsolidation(base.TestCase):
m_gnocchi.statistic_aggregation.assert_called_with( m_gnocchi.statistic_aggregation.assert_called_with(
resource_id=resource_id, metric='compute.node.cpu.percent', resource_id=resource_id, metric='compute.node.cpu.percent',
granularity=300, start_time=start_time, stop_time=stop_time, granularity=300, start_time=start_time, stop_time=stop_time,
aggregation='mean') aggregation='mean')"""