Add a dynamic loading of the Watcher Planner implementation

In watcher, an audit generates a set of actions which
aims at achieving a given goal (lower energy consumption, ...).
It is possible to configure different strategies in order to achieve
each goal. Each strategy is written as a Python class which produces
a set of actions. Today, the set of possible actions is fixed for a
given version of Watcher and enables optimization algorithms to
include actions such as instance migration, changing hypervisor state,
changing power state (ACPI level, ...).

The objective of this patchset is to give the ability to extend the
default set of planner algorithms currently available in Watcher
using Stevedore.

The doc need to explain how create a new planner.

DocImpact
Partially implements: blueprint watcher-add-actions-via-conf

Change-Id: I2fd73f8c4a457ee391d764a7a3f494deecd2634f
This commit is contained in:
Jean-Emile DARTOIS 2016-01-07 16:54:15 +01:00
parent c0306ea8f4
commit 47759202a8
11 changed files with 157 additions and 32 deletions

View File

@ -91,7 +91,7 @@
# List of logger=LEVEL pairs. This option is ignored if
# log_config_append is set. (list value)
#default_log_levels = amqp=WARN,amqplib=WARN,boto=WARN,qpid=WARN,sqlalchemy=WARN,suds=INFO,oslo.messaging=INFO,iso8601=WARN,requests.packages.urllib3.connectionpool=WARN,urllib3.connectionpool=WARN,websocket=WARN,requests.packages.urllib3.util.retry=WARN,urllib3.util.retry=WARN,keystonemiddleware=WARN,routes.middleware=WARN,stevedore=WARN,taskflow=WARN
#default_log_levels = amqp=WARN,amqplib=WARN,boto=WARN,qpid=WARN,sqlalchemy=WARN,suds=INFO,oslo.messaging=INFO,iso8601=WARN,requests.packages.urllib3.connectionpool=WARN,urllib3.connectionpool=WARN,websocket=WARN,requests.packages.urllib3.util.retry=WARN,urllib3.util.retry=WARN,keystonemiddleware=WARN,routes.middleware=WARN,stevedore=WARN,taskflow=WARN,keystoneauth=WARN
# Enables or disables publication of error events. (boolean value)
#publish_errors = false
@ -127,10 +127,6 @@
# MatchMaker driver. (string value)
#rpc_zmq_matchmaker = redis
# Use REQ/REP pattern for all methods CALL/CAST/FANOUT. (boolean
# value)
#rpc_zmq_all_req_rep = true
# Type of concurrency used. Either "native" or "eventlet" (string
# value)
#rpc_zmq_concurrency = eventlet
@ -157,8 +153,13 @@
# timeout exception when timeout expired. (integer value)
#rpc_poll_timeout = 1
# Configures zmq-messaging to use broker or not. (boolean value)
#zmq_use_broker = false
# Configures zmq-messaging to use proxy with non PUB/SUB patterns.
# (boolean value)
#direct_over_proxy = true
# Use PUB/SUB pattern for fanout methods. PUB/SUB always uses proxy.
# (boolean value)
#use_pub_sub = true
# Minimal port number for random ports range. (port value)
# Minimum value: 1
@ -633,17 +634,6 @@
# Deprecated group/name - [DEFAULT]/amqp_auto_delete
#amqp_auto_delete = false
# Send a single AMQP reply to call message. The current behaviour
# since oslo-incubator is to send two AMQP replies - first one with
# the payload, a second one to ensure the other have finish to send
# the payload. We are going to remove it in the N release, but we must
# keep backward compatible at the same time. This option provides such
# compatibility - it defaults to False in Liberty and can be turned on
# for early adopters with a new installations or for testing. Please
# note, that this option will be removed in the Mitaka release.
# (boolean value)
#send_single_reply = false
# SSL version to use (valid only if SSL enabled). Valid values are
# TLSv1 and SSLv23. SSLv2, SSLv3, TLSv1_1, and TLSv1_2 may be
# available on some distributions. (string value)
@ -672,7 +662,7 @@
# replies. This value should not be longer than rpc_response_timeout.
# (integer value)
# Deprecated group/name - [DEFAULT]/kombu_reconnect_timeout
#kombu_missing_consumer_retry_timeout = 5
#kombu_missing_consumer_retry_timeout = 60
# Determines how the next RabbitMQ node is chosen in case the one we
# are currently connected to becomes unavailable. Takes effect only if
@ -803,3 +793,13 @@
# strategy (for example: BASIC_CONSOLIDATION:basic,
# MY_GOAL:my_strategy_1) (dict value)
#goals = DUMMY:dummy
[watcher_planner]
#
# From watcher
#
# The selected planner used to schedule the actions (string value)
#planner = default

View File

@ -46,6 +46,9 @@ watcher_strategies =
basic = watcher.decision_engine.strategy.strategies.basic_consolidation:BasicConsolidation
outlet_temp_control = watcher.decision_engine.strategy.strategies.outlet_temp_control:OutletTempControl
watcher_planners =
default = watcher.decision_engine.planner.default:DefaultPlanner
[build_sphinx]
source-dir = doc/source
build-dir = doc/build

View File

@ -19,7 +19,7 @@ from watcher.common.messaging.events.event import Event
from watcher.decision_engine.audit.base import \
BaseAuditHandler
from watcher.decision_engine.messaging.events import Events
from watcher.decision_engine.planner.default import DefaultPlanner
from watcher.decision_engine.planner.manager import PlannerManager
from watcher.decision_engine.strategy.context.default import \
DefaultStrategyContext
from watcher.objects.audit import Audit
@ -34,6 +34,14 @@ class DefaultAuditHandler(BaseAuditHandler):
super(DefaultAuditHandler, self).__init__()
self._messaging = messaging
self._strategy_context = DefaultStrategyContext()
self._planner_manager = PlannerManager()
self._planner = None
@property
def planner(self):
if self._planner is None:
self._planner = self._planner_manager.load()
return self._planner
@property
def messaging(self):
@ -72,9 +80,7 @@ class DefaultAuditHandler(BaseAuditHandler):
solution = self.strategy_context.execute_strategy(audit_uuid,
request_context)
# schedule the actions and create in the watcher db the ActionPlan
planner = DefaultPlanner()
planner.schedule(request_context, audit.id, solution)
self.planner.schedule(request_context, audit.id, solution)
# change state of the audit to SUCCEEDED
self.update_audit_state(request_context, audit_uuid,

View File

@ -16,9 +16,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from oslo_log import log
from enum import Enum
import enum
from oslo_log import log
from watcher._i18n import _LW
from watcher.common import exception
@ -37,7 +37,7 @@ LOG = log.getLogger(__name__)
# https://wiki.openstack.org/wiki/NovaOrchestration/WorkflowEngines
class Primitives(Enum):
class Primitives(enum.Enum):
LIVE_MIGRATE = 'MIGRATE'
COLD_MIGRATE = 'MIGRATE'
POWER_STATE = 'POWERSTATE'

View File

@ -0,0 +1,53 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from oslo_config import cfg
from oslo_log import log
from watcher.decision_engine.planner.loading import default as loader
LOG = log.getLogger(__name__)
CONF = cfg.CONF
default_planner = 'default'
WATCHER_PLANNER_OPTS = {
cfg.StrOpt('planner',
default=default_planner,
required=True,
help='The selected planner used to schedule the actions')
}
planner_opt_group = cfg.OptGroup(name='watcher_planner',
title='Defines the parameters of '
'the planner')
CONF.register_group(planner_opt_group)
CONF.register_opts(WATCHER_PLANNER_OPTS, planner_opt_group)
class PlannerManager(object):
def __init__(self):
self._loader = loader.DefaultPlannerLoader()
@property
def loader(self):
return self._loader
def load(self):
selected_planner = CONF.watcher_planner.planner
LOG.debug("Loading {0}".format(selected_planner))
return self.loader.load(name=selected_planner)

View File

@ -18,6 +18,7 @@
import watcher.api.app
from watcher.applier import manager as applier_manager
from watcher.decision_engine import manager as decision_engine_manger
from watcher.decision_engine.planner import manager as planner_manager
from watcher.decision_engine.strategy.selection import default \
as strategy_selector
@ -29,5 +30,6 @@ def list_opts():
('watcher_decision_engine',
decision_engine_manger.WATCHER_DECISION_ENGINE_OPTS),
('watcher_applier',
applier_manager.APPLIER_MANAGER_OPTS)
applier_manager.APPLIER_MANAGER_OPTS),
('watcher_planner', planner_manager.WATCHER_PLANNER_OPTS)
]

View File

@ -15,9 +15,8 @@
# limitations under the License.
import mock
from mock import MagicMock
from watcher.common.exception import ActionNotFound
from watcher.common import exception
from watcher.common import utils
from watcher.db import api as db_api
from watcher.decision_engine.actions.base import BaseAction
@ -40,7 +39,7 @@ class SolutionFaker(object):
metrics = FakerMetricsCollector()
current_state_cluster = FakerModelCollector()
sercon = BasicConsolidation("basic", "Basic offline consolidation")
sercon.ceilometer = MagicMock(
sercon.ceilometer = mock.MagicMock(
get_statistics=metrics.mock_get_statistics)
return sercon.execute(current_state_cluster.generate_scenario_1())
@ -51,7 +50,7 @@ class SolutionFakerSingleHyp(object):
metrics = FakerMetricsCollector()
current_state_cluster = FakerModelCollector()
sercon = BasicConsolidation("basic", "Basic offline consolidation")
sercon.ceilometer = MagicMock(
sercon.ceilometer = mock.MagicMock(
get_statistics=metrics.mock_get_statistics)
return sercon.execute(
@ -117,7 +116,8 @@ class TestDefaultPlanner(base.DbTestCase):
audit = db_utils.create_test_audit(uuid=utils.generate_uuid())
fake_solution = SolutionFaker.build()
fake_solution.actions[0] = "valeur_qcq"
self.assertRaises(ActionNotFound, self.default_planner.schedule,
self.assertRaises(exception.ActionNotFound,
self.default_planner.schedule,
self.context, audit.id, fake_solution)
def test_schedule_scheduled_empty(self):

View File

@ -0,0 +1,29 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from watcher.decision_engine.planner import base as planner
from watcher.decision_engine.planner.loading import default
from watcher.tests import base
class TestDefaultPlannerLoader(base.TestCase):
loader = default.DefaultPlannerLoader()
def test_endpoints(self):
for endpoint in self.loader.list_available():
loaded = self.loader.load(endpoint)
self.assertIsNotNone(loaded)
self.assertIsInstance(loaded, planner.BasePlanner)

View File

@ -0,0 +1,28 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from watcher.decision_engine.planner.default import DefaultPlanner
from watcher.decision_engine.planner.manager import PlannerManager
from watcher.tests import base
class TestPlannerManager(base.TestCase):
def test_load(self):
cfg.CONF.set_override('planner', "default", group='watcher_planner')
manager = PlannerManager()
self.assertIsInstance(manager.load(), DefaultPlanner)

View File

@ -70,3 +70,7 @@ class TestDefaultStrategyLoader(TestCase):
strategy_loader = DefaultStrategyLoader()
loaded_strategy = strategy_loader.load("dummy")
self.assertIsInstance(loaded_strategy, DummyStrategy)
def test_endpoints(self):
for endpoint in self.strategy_loader.list_available():
self.assertIsNotNone(self.strategy_loader.load(endpoint))