Use taskflow library for building and executing action plans

The aim of this patchset is to integrate taskflow in
the Watcher Applier. Taskflow will help us a lot to make
Action Plan execution easy, consistent, scalable and reliable.

DocImpact

Partially implements: blueprint use-taskflow

Change-Id: I903d6509d74a61ad64e1506b8a7156e6e91abcfb
Closes-Bug: #1535326
Closes-Bug: #1531912
This commit is contained in:
Jean-Emile DARTOIS 2016-01-06 12:44:25 +01:00
parent f675003076
commit 0e7bfe61bd
44 changed files with 1234 additions and 848 deletions

File diff suppressed because it is too large Load Diff

View File

@ -26,4 +26,5 @@ python-openstackclient>=1.5.0
six>=1.9.0
SQLAlchemy>=0.9.9,<1.1.0
stevedore>=1.5.0 # Apache-2.0
taskflow>=1.25.0 # Apache-2.0
WSME>=0.7

View File

@ -47,9 +47,13 @@ watcher_strategies =
outlet_temp_control = watcher.decision_engine.strategy.strategies.outlet_temp_control:OutletTempControl
watcher_actions =
migrate = watcher.applier.primitives.migration:Migrate
nop = watcher.applier.primitives.nop:Nop
change_nova_service_state = watcher.applier.primitives.change_nova_service_state:ChangeNovaServiceState
migrate = watcher.applier.actions.migration:Migrate
nop = watcher.applier.actions.nop:Nop
sleep = watcher.applier.actions.sleep:Sleep
change_nova_service_state = watcher.applier.actions.change_nova_service_state:ChangeNovaServiceState
watcher_workflow_engines =
taskflow = watcher.applier.workflow_engine.default:DefaultWorkFlowEngine
watcher_planners =
default = watcher.decision_engine.planner.default:DefaultPlanner

View File

@ -18,51 +18,51 @@
#
from oslo_log import log
from watcher.applier.action_plan.base import BaseActionPlanHandler
from watcher.applier.default import DefaultApplier
from watcher.applier.messaging.events import Events
from watcher.common.messaging.events.event import Event
from watcher.objects.action_plan import ActionPlan
from watcher.objects.action_plan import Status
from watcher.applier.action_plan import base
from watcher.applier import default
from watcher.applier.messaging import event_types
from watcher.common.messaging.events import event
from watcher import objects
LOG = log.getLogger(__name__)
class DefaultActionPlanHandler(BaseActionPlanHandler):
def __init__(self, context, manager_applier, action_plan_uuid):
class DefaultActionPlanHandler(base.BaseActionPlanHandler):
def __init__(self, context, applier_manager, action_plan_uuid):
super(DefaultActionPlanHandler, self).__init__()
self.ctx = context
self.action_plan_uuid = action_plan_uuid
self.manager_applier = manager_applier
self.applier_manager = applier_manager
def notify(self, uuid, event_type, state):
action_plan = ActionPlan.get_by_uuid(self.ctx, uuid)
action_plan = objects.ActionPlan.get_by_uuid(self.ctx, uuid)
action_plan.state = state
action_plan.save()
event = Event()
event.type = event_type
event.data = {}
ev = event.Event()
ev.type = event_type
ev.data = {}
payload = {'action_plan__uuid': uuid,
'action_plan_state': state}
self.manager_applier.topic_status.publish_event(event.type.name,
self.applier_manager.topic_status.publish_event(ev.type.name,
payload)
def execute(self):
try:
# update state
self.notify(self.action_plan_uuid,
Events.LAUNCH_ACTION_PLAN,
Status.ONGOING)
applier = DefaultApplier(self.manager_applier, self.ctx)
event_types.EventTypes.LAUNCH_ACTION_PLAN,
objects.action_plan.Status.ONGOING)
applier = default.DefaultApplier(self.applier_manager, self.ctx)
result = applier.execute(self.action_plan_uuid)
except Exception as e:
LOG.exception(e)
result = False
LOG.error("Launch Action Plan " + unicode(e))
finally:
if result is True:
status = Status.SUCCEEDED
status = objects.action_plan.Status.SUCCEEDED
else:
status = Status.FAILED
status = objects.action_plan.Status.FAILED
# update state
self.notify(self.action_plan_uuid, Events.LAUNCH_ACTION_PLAN,
self.notify(self.action_plan_uuid,
event_types.EventTypes.LAUNCH_ACTION_PLAN,
status)

View File

@ -32,16 +32,15 @@ the appropriate commands to Nova for this type of
"""
import abc
import six
from watcher.applier import promise
import six
@six.add_metaclass(abc.ABCMeta)
class BasePrimitive(object):
class BaseAction(object):
def __init__(self):
self._input_parameters = None
self._applies_to = None
self._input_parameters = {}
self._applies_to = ""
@property
def input_parameters(self):
@ -59,12 +58,18 @@ class BasePrimitive(object):
def applies_to(self, a):
self._applies_to = a
@promise.Promise
@abc.abstractmethod
def execute(self):
raise NotImplementedError()
@promise.Promise
@abc.abstractmethod
def undo(self):
def revert(self):
raise NotImplementedError()
@abc.abstractmethod
def precondition(self):
raise NotImplementedError()
@abc.abstractmethod
def postcondition(self):
raise NotImplementedError()

View File

@ -19,30 +19,23 @@
from watcher._i18n import _
from watcher.applier.primitives import base
from watcher.applier import promise
from watcher.applier.actions import base
from watcher.common import exception
from watcher.common import keystone as kclient
from watcher.common import nova as nclient
from watcher.decision_engine.model import hypervisor_state as hstate
class ChangeNovaServiceState(base.BasePrimitive):
def __init__(self):
"""This class allows us to change the state of nova-compute service."""
super(ChangeNovaServiceState, self).__init__()
self._host = self.applies_to
self._state = self.input_parameters.get('state')
class ChangeNovaServiceState(base.BaseAction):
@property
def host(self):
return self._host
return self.applies_to
@property
def state(self):
return self._state
return self.input_parameters.get('state')
@promise.Promise
def execute(self):
target_state = None
if self.state == hstate.HypervisorState.OFFLINE.value:
@ -51,8 +44,7 @@ class ChangeNovaServiceState(base.BasePrimitive):
target_state = True
return self.nova_manage_service(target_state)
@promise.Promise
def undo(self):
def revert(self):
target_state = None
if self.state == hstate.HypervisorState.OFFLINE.value:
target_state = True
@ -72,3 +64,9 @@ class ChangeNovaServiceState(base.BasePrimitive):
return wrapper.enable_service_nova_compute(self.host)
else:
return wrapper.disable_service_nova_compute(self.host)
def precondition(self):
pass
def postcondition(self):
pass

View File

@ -19,7 +19,7 @@ from __future__ import unicode_literals
from oslo_log import log
from watcher.applier.primitives.loading import default
from watcher.applier.actions.loading import default
LOG = log.getLogger(__name__)

View File

@ -19,11 +19,11 @@ from __future__ import unicode_literals
from oslo_log import log
from watcher.common.loader.default import DefaultLoader
from watcher.common.loader import default
LOG = log.getLogger(__name__)
class DefaultActionLoader(DefaultLoader):
class DefaultActionLoader(default.DefaultLoader):
def __init__(self):
super(DefaultActionLoader, self).__init__(namespace='watcher_actions')

View File

@ -17,27 +17,42 @@
# limitations under the License.
#
from oslo_log import log
from watcher.applier.primitives import base
from watcher.applier import promise
from watcher.applier.actions import base
from watcher.common import exception
from watcher.common import keystone as kclient
from watcher.common import nova as nclient
LOG = log.getLogger(__name__)
class Migrate(base.BasePrimitive):
def __init__(self):
super(Migrate, self).__init__()
self.instance_uuid = self.applies_to
self.migration_type = self.input_parameters.get('migration_type')
class Migrate(base.BaseAction):
@property
def instance_uuid(self):
return self.applies_to
@property
def migration_type(self):
return self.input_parameters.get('migration_type')
@property
def dst_hypervisor(self):
return self.input_parameters.get('dst_hypervisor')
@property
def src_hypervisor(self):
return self.input_parameters.get('src_hypervisor')
def migrate(self, destination):
keystone = kclient.KeystoneClient()
wrapper = nclient.NovaClient(keystone.get_credentials(),
session=keystone.get_session())
LOG.debug("Migrate instance %s to %s ", self.instance_uuid,
destination)
instance = wrapper.find_instance(self.instance_uuid)
if instance:
if self.migration_type is 'live':
if self.migration_type == 'live':
return wrapper.live_migrate_instance(
instance_id=self.instance_uuid, dest_hostname=destination)
else:
@ -45,10 +60,17 @@ class Migrate(base.BasePrimitive):
else:
raise exception.InstanceNotFound(name=self.instance_uuid)
@promise.Promise
def execute(self):
return self.migrate(self.input_parameters.get('dst_hypervisor_uuid'))
return self.migrate(destination=self.dst_hypervisor)
@promise.Promise
def undo(self):
return self.migrate(self.input_parameters.get('src_hypervisor_uuid'))
def revert(self):
return self.migrate(destination=self.src_hypervisor)
def precondition(self):
# todo(jed) check if the instance exist/ check if the instance is on
# the src_hypervisor
pass
def postcondition(self):
# todo(jed) we can image to check extra parameters (nework reponse,ect)
pass

View File

@ -19,23 +19,28 @@
from oslo_log import log
from watcher.applier.primitives import base
from watcher.applier import promise
from watcher.applier.actions import base
LOG = log.getLogger(__name__)
class Nop(base.BasePrimitive):
class Nop(base.BaseAction):
@property
def message(self):
return self.input_parameters.get('message')
@promise.Promise
def execute(self):
LOG.debug("executing action NOP message:%s ",
self.input_parameters.get('message'))
LOG.debug("executing action NOP message:%s ", self.message)
return True
@promise.Promise
def undo(self):
LOG.debug("undo action NOP")
def revert(self):
LOG.debug("revert action NOP")
return True
def precondition(self):
pass
def postcondition(self):
pass

View File

@ -0,0 +1,48 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
#
# Authors: Jean-Emile DARTOIS <jean-emile.dartois@b-com.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import time
from oslo_log import log
from watcher.applier.actions import base
LOG = log.getLogger(__name__)
class Sleep(base.BaseAction):
@property
def duration(self):
return int(self.input_parameters.get('duration'))
def execute(self):
LOG.debug("Starting action Sleep duration:%s ", self.duration)
time.sleep(self.duration)
return True
def revert(self):
LOG.debug("revert action Sleep")
return True
def precondition(self):
pass
def postcondition(self):
pass

View File

@ -16,24 +16,48 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from oslo_config import cfg
from oslo_log import log
from watcher.applier import base
from watcher.applier.execution import default
from watcher.applier.workflow_engine.loading import default
from watcher import objects
LOG = log.getLogger(__name__)
CONF = cfg.CONF
class DefaultApplier(base.BaseApplier):
def __init__(self, manager_applier, context):
def __init__(self, applier_manager, context):
super(DefaultApplier, self).__init__()
self.manager_applier = manager_applier
self.context = context
self.executor = default.DefaultActionPlanExecutor(manager_applier,
context)
self._applier_manager = applier_manager
self._loader = default.DefaultWorkFlowEngineLoader()
self._engine = None
self._context = context
@property
def context(self):
return self._context
@property
def applier_manager(self):
return self._applier_manager
@property
def engine(self):
if self._engine is None:
selected_workflow_engine = CONF.watcher_applier.workflow_engine
LOG.debug("Loading workflow engine %s ", selected_workflow_engine)
self._engine = self._loader.load(name=selected_workflow_engine)
self._engine.context = self.context
self._engine.applier_manager = self.applier_manager
return self._engine
def execute(self, action_plan_uuid):
LOG.debug("Executing action plan %s ", action_plan_uuid)
action_plan = objects.ActionPlan.get_by_uuid(self.context,
action_plan_uuid)
# todo(jed) remove direct access to dbapi need filter in object
filters = {'action_plan_id': action_plan.id}
actions = objects.Action.dbapi.get_action_list(self.context, filters)
return self.executor.execute(actions)
return self.engine.execute(actions)

View File

@ -1,57 +0,0 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
#
# Authors: Jean-Emile DARTOIS <jean-emile.dartois@b-com.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from oslo_log import log
from watcher._i18n import _LE
from watcher.applier.execution import base
from watcher.applier.execution import deploy_phase
from watcher.objects import action_plan
LOG = log.getLogger(__name__)
class DefaultActionPlanExecutor(base.BaseActionPlanExecutor):
def __init__(self, manager_applier, context):
super(DefaultActionPlanExecutor, self).__init__(manager_applier,
context)
self.deploy = deploy_phase.DeployPhase(self)
def execute(self, actions):
for action in actions:
try:
self.notify(action, action_plan.Status.ONGOING)
loaded_action = self.action_factory.make_action(action)
result = self.deploy.execute_primitive(loaded_action)
if result is False:
self.notify(action, action_plan.Status.FAILED)
self.deploy.rollback()
return False
else:
self.deploy.populate(loaded_action)
self.notify(action, action_plan.Status.SUCCEEDED)
except Exception as e:
LOG.expection(e)
LOG.debug('The ActionPlanExecutor failed to execute the action'
' %s ', action)
LOG.error(_LE("Trigger a rollback"))
self.notify(action, action_plan.Status.FAILED)
self.deploy.rollback()
return False
return True

View File

@ -1,56 +0,0 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
#
# Authors: Jean-Emile DARTOIS <jean-emile.dartois@b-com.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from oslo_log import log
LOG = log.getLogger(__name__)
class DeployPhase(object):
def __init__(self, executor):
# todo(jed) oslo_conf 10 secondes
self._max_timeout = 100000
self._actions = []
self._executor = executor
@property
def actions(self):
return self._actions
@property
def max_timeout(self):
return self._max_timeout
@max_timeout.setter
def max_timeout(self, m):
self._max_timeout = m
def populate(self, action):
self._actions.append(action)
def execute_primitive(self, primitive):
future = primitive.execute(primitive)
return future.result(self.max_timeout)
def rollback(self):
reverted = sorted(self.actions, reverse=True)
for primitive in reverted:
try:
self.execute_primitive(primitive)
except Exception as e:
LOG.error(e)

View File

@ -16,20 +16,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from concurrent.futures import ThreadPoolExecutor
from oslo_config import cfg
from oslo_log import log
from watcher.applier.messaging.trigger import TriggerActionPlan
from watcher.common.messaging.messaging_core import MessagingCore
from watcher.applier.messaging import trigger
from watcher.common.messaging import messaging_core
LOG = log.getLogger(__name__)
CONF = cfg.CONF
# Register options
APPLIER_MANAGER_OPTS = [
cfg.IntOpt('applier_worker', default='1', help='The number of worker'),
cfg.IntOpt('workers',
default='1',
min=1,
required=True,
help='Number of workers for applier, default value is 1.'),
cfg.StrOpt('topic_control',
default='watcher.applier.control',
help='The topic name used for'
@ -45,7 +49,11 @@ APPLIER_MANAGER_OPTS = [
cfg.StrOpt('publisher_id',
default='watcher.applier.api',
help='The identifier used by watcher '
'module on the message broker')
'module on the message broker'),
cfg.StrOpt('workflow_engine',
default='taskflow',
required=True,
help='Select the engine to use to execute the workflow')
]
opt_group = cfg.OptGroup(name='watcher_applier',
@ -55,7 +63,7 @@ CONF.register_group(opt_group)
CONF.register_opts(APPLIER_MANAGER_OPTS, opt_group)
class ApplierManager(MessagingCore):
class ApplierManager(messaging_core.MessagingCore):
def __init__(self):
super(ApplierManager, self).__init__(
CONF.watcher_applier.publisher_id,
@ -63,10 +71,7 @@ class ApplierManager(MessagingCore):
CONF.watcher_applier.topic_status,
api_version=self.API_VERSION,
)
# shared executor of the workflow
self.executor = ThreadPoolExecutor(max_workers=1)
# trigger action_plan
self.topic_control.add_endpoint(TriggerActionPlan(self))
self.topic_control.add_endpoint(trigger.TriggerActionPlan(self))
def join(self):
self.topic_control.join()

View File

@ -20,6 +20,6 @@
import enum
class Events(enum.Enum):
class EventTypes(enum.Enum):
LAUNCH_ACTION_PLAN = "launch_action_plan"
LAUNCH_ACTION = "launch_action"

View File

@ -16,30 +16,35 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from concurrent import futures
from oslo_config import cfg
from oslo_log import log
from watcher.applier.action_plan.default import DefaultActionPlanHandler
from watcher.applier.action_plan import default
LOG = log.getLogger(__name__)
CONF = cfg.CONF
class TriggerActionPlan(object):
def __init__(self, manager_applier):
self.manager_applier = manager_applier
def __init__(self, applier_manager):
self.applier_manager = applier_manager
workers = CONF.watcher_applier.workers
self.executor = futures.ThreadPoolExecutor(max_workers=workers)
def do_launch_action_plan(self, context, action_plan_uuid):
try:
cmd = DefaultActionPlanHandler(context,
self.manager_applier,
action_plan_uuid)
cmd = default.DefaultActionPlanHandler(context,
self.applier_manager,
action_plan_uuid)
cmd.execute()
except Exception as e:
LOG.exception(e)
def launch_action_plan(self, context, action_plan_uuid):
LOG.debug("Trigger ActionPlan %s" % action_plan_uuid)
LOG.debug("Trigger ActionPlan %s", action_plan_uuid)
# submit
self.manager_applier.executor.submit(self.do_launch_action_plan,
context,
action_plan_uuid)
self.executor.submit(self.do_launch_action_plan, context,
action_plan_uuid)
return action_plan_uuid

View File

@ -1,50 +0,0 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
#
# Authors: Jean-Emile DARTOIS <jean-emile.dartois@b-com.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from concurrent.futures import Future
from concurrent.futures import ThreadPoolExecutor
class Promise(object):
executor = ThreadPoolExecutor(
max_workers=10)
def __init__(self, func):
self.func = func
def resolve(self, *args, **kwargs):
resolved_args = []
resolved_kwargs = {}
for i, arg in enumerate(args):
if isinstance(arg, Future):
resolved_args.append(arg.result())
else:
resolved_args.append(arg)
for kw, arg in kwargs.items():
if isinstance(arg, Future):
resolved_kwargs[kw] = arg.result()
else:
resolved_kwargs[kw] = arg
return self.func(*resolved_args, **resolved_kwargs)
def __call__(self, *args, **kwargs):
return self.executor.submit(self.resolve, *args, **kwargs)

View File

@ -23,8 +23,8 @@ import oslo_messaging as om
from watcher.applier.manager import APPLIER_MANAGER_OPTS
from watcher.applier.manager import opt_group
from watcher.common import exception
from watcher.common.messaging.messaging_core import MessagingCore
from watcher.common.messaging.notification_handler import NotificationHandler
from watcher.common.messaging import messaging_core
from watcher.common.messaging import notification_handler as notification
from watcher.common import utils
@ -34,7 +34,7 @@ CONF.register_group(opt_group)
CONF.register_opts(APPLIER_MANAGER_OPTS, opt_group)
class ApplierAPI(MessagingCore):
class ApplierAPI(messaging_core.MessagingCore):
def __init__(self):
super(ApplierAPI, self).__init__(
@ -43,7 +43,7 @@ class ApplierAPI(MessagingCore):
CONF.watcher_applier.topic_status,
api_version=self.API_VERSION,
)
self.handler = NotificationHandler(self.publisher_id)
self.handler = notification.NotificationHandler(self.publisher_id)
self.handler.register_observer(self)
self.topic_status.add_endpoint(self.handler)
transport = om.get_transport(CONF)

View File

@ -20,26 +20,34 @@ import abc
import six
from watcher.applier.messaging import events
from watcher.applier.primitives import factory
from watcher.applier.actions import factory
from watcher.applier.messaging import event_types
from watcher.common.messaging.events import event
from watcher import objects
@six.add_metaclass(abc.ABCMeta)
class BaseActionPlanExecutor(object):
def __init__(self, manager_applier, context):
self._manager_applier = manager_applier
self._context = context
class BaseWorkFlowEngine(object):
def __init__(self):
self._applier_manager = None
self._context = None
self._action_factory = factory.ActionFactory()
@property
def context(self):
return self._context
@context.setter
def context(self, c):
self._context = c
@property
def manager_applier(self):
return self._manager_applier
def applier_manager(self):
return self._applier_manager
@applier_manager.setter
def applier_manager(self, a):
self._applier_manager = a
@property
def action_factory(self):
@ -50,11 +58,11 @@ class BaseActionPlanExecutor(object):
db_action.state = state
db_action.save()
ev = event.Event()
ev.type = events.Events.LAUNCH_ACTION
ev.type = event_types.EventTypes.LAUNCH_ACTION
ev.data = {}
payload = {'action_uuid': action.uuid,
'action_state': state}
self.manager_applier.topic_status.publish_event(ev.type.name,
self.applier_manager.topic_status.publish_event(ev.type.name,
payload)
@abc.abstractmethod

View File

@ -0,0 +1,159 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from oslo_log import log
from taskflow import engines
from taskflow.patterns import graph_flow as gf
from taskflow import task
from watcher._i18n import _LE, _LW, _LC
from watcher.applier.workflow_engine import base
from watcher.objects import action as obj_action
LOG = log.getLogger(__name__)
class DefaultWorkFlowEngine(base.BaseWorkFlowEngine):
def decider(self, history):
# FIXME(jed) not possible with the current Watcher Planner
#
# decider A callback function that will be expected to
# decide at runtime whether v should be allowed to execute
# (or whether the execution of v should be ignored,
# and therefore not executed). It is expected to take as single
# keyword argument history which will be the execution results of
# all u decideable links that have v as a target. It is expected
# to return a single boolean
# (True to allow v execution or False to not).
return True
def execute(self, actions):
try:
# NOTE(jed) We want to have a strong separation of concern
# between the Watcher planner and the Watcher Applier in order
# to us the possibility to support several workflow engine.
# We want to provide the 'taskflow' engine by
# default although we still want to leave the possibility for
# the users to change it.
# todo(jed) we need to change the way the actions are stored.
# The current implementation only use a linked list of actions.
# todo(jed) add olso conf for retry and name
flow = gf.Flow("watcher_flow")
previous = None
for a in actions:
task = TaskFlowActionContainer(a, self)
flow.add(task)
if previous is None:
previous = task
# we have only one Action in the Action Plan
if len(actions) == 1:
nop = TaskFlowNop()
flow.add(nop)
flow.link(previous, nop)
else:
# decider == guard (UML)
flow.link(previous, task, decider=self.decider)
previous = task
e = engines.load(flow)
e.run()
return True
except Exception as e:
LOG.exception(e)
return False
class TaskFlowActionContainer(task.Task):
def __init__(self, db_action, engine):
name = "action_type:{0} uuid:{1}".format(db_action.action_type,
db_action.uuid)
super(TaskFlowActionContainer, self).__init__(name=name)
self._db_action = db_action
self._engine = engine
self.loaded_action = None
@property
def action(self):
if self.loaded_action is None:
action = self.engine.action_factory.make_action(self._db_action)
self.loaded_action = action
return self.loaded_action
@property
def engine(self):
return self._engine
def pre_execute(self):
try:
self.engine.notify(self._db_action,
obj_action.Status.ONGOING)
LOG.debug("Precondition action %s", self.name)
self.action.precondition()
except Exception as e:
LOG.exception(e)
self.engine.notify(self._db_action,
obj_action.Status.FAILED)
raise
def execute(self, *args, **kwargs):
try:
LOG.debug("Running action %s", self.name)
# todo(jed) remove return (true or false) raise an Exception
result = self.action.execute()
if result is not True:
self.engine.notify(self._db_action,
obj_action.Status.FAILED)
else:
self.engine.notify(self._db_action,
obj_action.Status.SUCCEEDED)
except Exception as e:
LOG.exception(e)
LOG.error(_LE('The WorkFlow Engine has failed '
'to execute the action %s'), self.name)
self.engine.notify(self._db_action,
obj_action.Status.FAILED)
raise
def post_execute(self):
try:
LOG.debug("postcondition action %s", self.name)
self.action.postcondition()
except Exception as e:
LOG.exception(e)
self.engine.notify(self._db_action,
obj_action.Status.FAILED)
raise
def revert(self, *args, **kwargs):
LOG.warning(_LW("Revert action %s"), self.name)
try:
# todo(jed) do we need to update the states in case of failure ?
self.action.revert()
except Exception as e:
LOG.exception(e)
LOG.critical(_LC("Oops! We need disaster recover plan"))
class TaskFlowNop(task.Task):
"""This class is use in case of the workflow have only one Action.
We need at least two atoms to create a link
"""
def execute(self):
pass

View File

@ -0,0 +1,30 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import unicode_literals
from oslo_log import log
from watcher.common.loader import default
LOG = log.getLogger(__name__)
class DefaultWorkFlowEngineLoader(default.DefaultLoader):
def __init__(self):
super(DefaultWorkFlowEngineLoader, self).__init__(
namespace='watcher_workflow_engines')

View File

@ -328,7 +328,7 @@ class NovaClient(object):
return False
def live_migrate_instance(self, instance_id, dest_hostname,
block_migration=True, retry=120):
block_migration=False, retry=120):
"""This method does a live migration of a given instance
This method uses the Nova built-in live_migrate()

View File

@ -30,8 +30,9 @@ LOG = log.getLogger(__name__)
class DefaultPlanner(base.BasePlanner):
priorities = {
'nop': 0,
'migrate': 1,
'sleep': 1,
'change_nova_service_state': 2,
'migrate': 3,
}
def create_action(self,
@ -53,7 +54,7 @@ class DefaultPlanner(base.BasePlanner):
return action
def schedule(self, context, audit_id, solution):
LOG.debug('Create an action plan for the audit uuid')
LOG.debug('Create an action plan for the audit uuid: %s ', audit_id)
action_plan = self._create_action_plan(context, audit_id)
actions = list(solution.actions)
@ -76,18 +77,20 @@ class DefaultPlanner(base.BasePlanner):
action_plan.first_action_id = None
action_plan.save()
else:
# create the first action
parent_action = self._create_action(context,
scheduled[0][1],
None)
# remove first
scheduled.pop(0)
action_plan.first_action_id = parent_action.id
action_plan.save()
for s_action in scheduled:
action = self._create_action(context, s_action[1],
parent_action)
parent_action = action
current_action = self._create_action(context, s_action[1],
parent_action)
parent_action = current_action
return action_plan
@ -105,16 +108,19 @@ class DefaultPlanner(base.BasePlanner):
return new_action_plan
def _create_action(self, context, _action, parent_action):
action_description = str(_action)
LOG.debug("Create a action for the following resquest : %s"
% action_description)
try:
LOG.debug("Creating the %s in watcher db",
_action.get("action_type"))
new_action = objects.Action(context, **_action)
new_action.create(context)
new_action.save()
new_action = objects.Action(context, **_action)
new_action.create(context)
new_action.save()
if parent_action:
parent_action.next = new_action.id
parent_action.save()
if parent_action:
parent_action.next = new_action.id
parent_action.save()
return new_action
return new_action
except Exception as exc:
LOG.exception(exc)
raise

View File

@ -0,0 +1,27 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from watcher.decision_engine.strategy.strategies import basic_consolidation
from watcher.decision_engine.strategy.strategies import dummy_strategy
from watcher.decision_engine.strategy.strategies import outlet_temp_control
BasicConsolidation = basic_consolidation.BasicConsolidation
OutletTempControl = outlet_temp_control.OutletTempControl
DummyStrategy = dummy_strategy.DummyStrategy
__all__ = (BasicConsolidation, OutletTempControl, DummyStrategy)

View File

@ -336,11 +336,11 @@ class BasicConsolidation(BaseStrategy):
def add_migration(self,
applies_to,
migration_type,
src_hypervisor_uuid,
dst_hypervisor_uuid):
src_hypervisor,
dst_hypervisor):
parameters = {'migration_type': migration_type,
'src_hypervisor_uuid': src_hypervisor_uuid,
'dst_hypervisor_uuid': dst_hypervisor_uuid}
'src_hypervisor': src_hypervisor,
'dst_hypervisor': dst_hypervisor}
self.solution.add_action(action_type=self.MIGRATION,
applies_to=applies_to,
input_parameters=parameters)

View File

@ -28,6 +28,7 @@ class DummyStrategy(BaseStrategy):
DEFAULT_DESCRIPTION = "Dummy Strategy"
NOP = "nop"
SLEEP = "sleep"
def __init__(self, name=DEFAULT_NAME, description=DEFAULT_DESCRIPTION):
super(DummyStrategy, self).__init__(name, description)
@ -38,6 +39,12 @@ class DummyStrategy(BaseStrategy):
applies_to="",
input_parameters=parameters)
# todo(jed) add a new action to test the flow
# with two differents actions
parameters = {'message': 'Welcome'}
self.solution.add_action(action_type=self.NOP,
applies_to="",
input_parameters=parameters)
self.solution.add_action(action_type=self.SLEEP,
applies_to="",
input_parameters={'duration': '5'})
return self.solution

View File

@ -237,8 +237,8 @@ class OutletTempControl(BaseStrategy):
mig_src_hypervisor,
mig_dst_hypervisor):
parameters = {'migration_type': 'live',
'src_hypervisor_uuid': mig_src_hypervisor,
'dst_hypervisor_uuid': mig_dst_hypervisor}
'src_hypervisor': mig_src_hypervisor,
'dst_hypervisor': mig_dst_hypervisor}
self.solution.add_action(action_type=self.MIGRATION,
applies_to=vm_src,
input_parameters=parameters)

View File

@ -8,7 +8,7 @@ msgid ""
msgstr ""
"Project-Id-Version: python-watcher 0.21.1.dev32\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2016-01-15 10:25+0100\n"
"POT-Creation-Date: 2016-01-19 17:54+0100\n"
"PO-Revision-Date: 2015-12-11 15:42+0100\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language: fr\n"
@ -71,14 +71,24 @@ msgstr ""
msgid "Error parsing HTTP response: %s"
msgstr ""
#: watcher/applier/execution/default.py:52
msgid "Trigger a rollback"
msgstr ""
#: watcher/applier/primitives/change_nova_service_state.py:66
#: watcher/applier/actions/change_nova_service_state.py:58
msgid "The target state is not defined"
msgstr ""
#: watcher/applier/workflow_engine/default.py:69
#, python-format
msgid "The WorkFlow Engine has failed to execute the action %s"
msgstr "Le moteur de workflow a echoué lors de l'éxécution de l'action %s"
#: watcher/applier/workflow_engine/default.py:77
#, python-format
msgid "Revert action %s"
msgstr "Annulation de l'action %s"
#: watcher/applier/workflow_engine/default.py:83
msgid "Oops! We need disaster recover plan"
msgstr "Oops! Nous avons besoin d'un plan de reprise d'activité"
#: watcher/cmd/api.py:46 watcher/cmd/applier.py:39
#: watcher/cmd/decisionengine.py:40
#, python-format
@ -353,7 +363,7 @@ msgstr ""
msgid "'obj' argument type is not valid"
msgstr ""
#: watcher/decision_engine/planner/default.py:75
#: watcher/decision_engine/planner/default.py:76
msgid "The action plan is empty"
msgstr ""
@ -547,3 +557,9 @@ msgstr ""
#~ msgid "The hypervisor could not be found"
#~ msgstr ""
#~ msgid "Trigger a rollback"
#~ msgstr ""
#~ msgid "The WorkFlow Engine has failedto execute the action %s"
#~ msgstr ""

View File

@ -7,9 +7,9 @@
#, fuzzy
msgid ""
msgstr ""
"Project-Id-Version: python-watcher 0.22.1.dev19\n"
"Project-Id-Version: python-watcher 0.22.1.dev28\n"
"Report-Msgid-Bugs-To: EMAIL@ADDRESS\n"
"POT-Creation-Date: 2016-01-15 10:25+0100\n"
"POT-Creation-Date: 2016-01-19 17:54+0100\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
@ -70,12 +70,22 @@ msgstr ""
msgid "Error parsing HTTP response: %s"
msgstr ""
#: watcher/applier/execution/default.py:52
msgid "Trigger a rollback"
#: watcher/applier/actions/change_nova_service_state.py:58
msgid "The target state is not defined"
msgstr ""
#: watcher/applier/primitives/change_nova_service_state.py:66
msgid "The target state is not defined"
#: watcher/applier/workflow_engine/default.py:69
#, python-format
msgid "The WorkFlow Engine has failed to execute the action %s"
msgstr ""
#: watcher/applier/workflow_engine/default.py:77
#, python-format
msgid "Revert action %s"
msgstr ""
#: watcher/applier/workflow_engine/default.py:83
msgid "Oops! We need disaster recover plan"
msgstr ""
#: watcher/cmd/api.py:46 watcher/cmd/applier.py:39
@ -351,7 +361,7 @@ msgstr ""
msgid "'obj' argument type is not valid"
msgstr ""
#: watcher/decision_engine/planner/default.py:75
#: watcher/decision_engine/planner/default.py:76
msgid "The action plan is empty"
msgstr ""

View File

@ -20,7 +20,7 @@ from mock import call
from mock import MagicMock
from watcher.applier.action_plan.default import DefaultActionPlanHandler
from watcher.applier.messaging.events import Events
from watcher.applier.messaging.event_types import EventTypes
from watcher.objects.action_plan import Status
from watcher.objects import ActionPlan
from watcher.tests.db.base import DbTestCase
@ -33,17 +33,7 @@ class TestDefaultActionPlanHandler(DbTestCase):
self.action_plan = obj_utils.create_test_action_plan(
self.context)
def test_launch_action_plan_wihout_errors(self):
try:
command = DefaultActionPlanHandler(self.context, MagicMock(),
self.action_plan.uuid)
command.execute()
except Exception as e:
self.fail(
"The ActionPlan should be trigged wihtour error" + unicode(e))
def test_launch_action_plan_state_failed(self):
def test_launch_action_plan(self):
command = DefaultActionPlanHandler(self.context, MagicMock(),
self.action_plan.uuid)
command.execute()
@ -57,10 +47,10 @@ class TestDefaultActionPlanHandler(DbTestCase):
self.action_plan.uuid)
command.execute()
call_on_going = call(Events.LAUNCH_ACTION_PLAN.name, {
call_on_going = call(EventTypes.LAUNCH_ACTION_PLAN.name, {
'action_plan_status': Status.ONGOING,
'action_plan__uuid': self.action_plan.uuid})
call_succeeded = call(Events.LAUNCH_ACTION_PLAN.name, {
call_succeeded = call(EventTypes.LAUNCH_ACTION_PLAN.name, {
'action_plan_status': Status.SUCCEEDED,
'action_plan__uuid': self.action_plan.uuid})

View File

@ -0,0 +1,32 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import unicode_literals
from watcher.applier.actions import base as abase
from watcher.applier.actions.loading import default
from watcher.tests import base
class TestDefaultActionLoader(base.TestCase):
def setUp(self):
super(TestDefaultActionLoader, self).setUp()
self.loader = default.DefaultActionLoader()
def test_endpoints(self):
for endpoint in self.loader.list_available():
loaded = self.loader.load(endpoint)
self.assertIsNotNone(loaded)
self.assertIsInstance(loaded, abase.BaseAction)

View File

@ -1,56 +0,0 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
#
# Authors: Jean-Emile DARTOIS <jean-emile.dartois@b-com.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import mock
from watcher.applier.execution import default
from watcher.common import utils
from watcher import objects
from watcher.tests.db import base
class TestDefaultActionPlanExecutor(base.DbTestCase):
def setUp(self):
super(TestDefaultActionPlanExecutor, self).setUp()
self.executor = default.DefaultActionPlanExecutor(mock.MagicMock(),
self.context)
def test_execute(self):
actions = mock.MagicMock()
result = self.executor.execute(actions)
self.assertEqual(result, True)
def test_execute_with_actions(self):
actions = []
action = {
'uuid': utils.generate_uuid(),
'action_plan_id': 0,
'action_type': "nop",
'applies_to': '',
'input_parameters': {'state': 'OFFLINE'},
'state': objects.action.Status.PENDING,
'alarm': None,
'next': None,
}
new_action = objects.Action(self.context, **action)
new_action.create(self.context)
new_action.save()
actions.append(objects.Action.get_by_uuid(self.context,
action['uuid']))
result = self.executor.execute(actions)
self.assertEqual(result, True)

View File

@ -18,8 +18,9 @@
#
from mock import MagicMock
from watcher.applier.messaging.trigger import TriggerActionPlan
import mock
from watcher.applier.messaging import trigger
from watcher.common import utils
from watcher.tests import base
@ -27,8 +28,8 @@ from watcher.tests import base
class TestTriggerActionPlan(base.TestCase):
def __init__(self, *args, **kwds):
super(TestTriggerActionPlan, self).__init__(*args, **kwds)
self.applier = MagicMock()
self.endpoint = TriggerActionPlan(self.applier)
self.applier = mock.MagicMock()
self.endpoint = trigger.TriggerActionPlan(self.applier)
def setUp(self):
super(TestTriggerActionPlan, self).setUp()

View File

@ -0,0 +1,32 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2016 b<>com
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import unicode_literals
from watcher.applier.workflow_engine import base as wbase
from watcher.applier.workflow_engine.loading import default
from watcher.tests import base
class TestDefaultActionLoader(base.TestCase):
def setUp(self):
super(TestDefaultActionLoader, self).setUp()
self.loader = default.DefaultWorkFlowEngineLoader()
def test_endpoints(self):
for endpoint in self.loader.list_available():
loaded = self.loader.load(endpoint)
self.assertIsNotNone(loaded)
self.assertIsInstance(loaded, wbase.BaseWorkFlowEngine)

View File

@ -0,0 +1,164 @@
# -*- encoding: utf-8 -*-
# Copyright (c) 2015 b<>com
#
# Authors: Jean-Emile DARTOIS <jean-emile.dartois@b-com.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import abc
import mock
import six
from stevedore import driver
from stevedore import extension
from watcher.applier.actions import base as abase
from watcher.applier.workflow_engine import default as tflow
from watcher.common import utils
from watcher import objects
from watcher.tests.db import base
@six.add_metaclass(abc.ABCMeta)
class FakeAction(abase.BaseAction):
def precondition(self):
pass
def revert(self):
pass
def execute(self):
raise Exception()
@classmethod
def namespace(cls):
return "TESTING"
@classmethod
def get_name(cls):
return 'fake_action'
class TestDefaultWorkFlowEngine(base.DbTestCase):
def setUp(self):
super(TestDefaultWorkFlowEngine, self).setUp()
self.engine = tflow.DefaultWorkFlowEngine()
self.engine.context = self.context
self.engine.applier_manager = mock.MagicMock()
def test_execute(self):
actions = mock.MagicMock()
result = self.engine.execute(actions)
self.assertEqual(result, True)
def create_action(self, action_type, applies_to, parameters, next):
action = {
'uuid': utils.generate_uuid(),
'action_plan_id': 0,
'action_type': action_type,
'applies_to': applies_to,
'input_parameters': parameters,
'state': objects.action.Status.PENDING,
'alarm': None,
'next': next,
}
new_action = objects.Action(self.context, **action)
new_action.create(self.context)
new_action.save()
return new_action
def check_action_state(self, action, expected_state):
to_check = objects.Action.get_by_uuid(self.context, action.uuid)
self.assertEqual(to_check.state, expected_state)
def check_actions_state(self, actions, expected_state):
for a in actions:
self.check_action_state(a, expected_state)
def test_execute_with_no_actions(self):
actions = []
result = self.engine.execute(actions)
self.assertEqual(result, True)
def test_execute_with_one_action(self):
actions = [self.create_action("nop", "", {'message': 'test'}, None)]
result = self.engine.execute(actions)
self.assertEqual(result, True)
self.check_actions_state(actions, objects.action.Status.SUCCEEDED)
def test_execute_with_two_actions(self):
actions = []
next = self.create_action("sleep", "", {'duration': '0'}, None)
first = self.create_action("nop", "", {'message': 'test'}, next.id)
actions.append(first)
actions.append(next)
result = self.engine.execute(actions)
self.assertEqual(result, True)
self.check_actions_state(actions, objects.action.Status.SUCCEEDED)
def test_execute_with_three_actions(self):
actions = []
next2 = self.create_action("nop", "vm1", {'message': 'next'}, None)
next = self.create_action("sleep", "vm1", {'duration': '0'}, next2.id)
first = self.create_action("nop", "vm1", {'message': 'hello'}, next.id)
self.check_action_state(first, objects.action.Status.PENDING)
self.check_action_state(next, objects.action.Status.PENDING)
self.check_action_state(next2, objects.action.Status.PENDING)
actions.append(first)
actions.append(next)
actions.append(next2)
result = self.engine.execute(actions)
self.assertEqual(result, True)
self.check_actions_state(actions, objects.action.Status.SUCCEEDED)
def test_execute_with_exception(self):
actions = []
next2 = self.create_action("no_exist",
"vm1", {'message': 'next'}, None)
next = self.create_action("sleep", "vm1",
{'duration': '0'}, next2.id)
first = self.create_action("nop", "vm1",
{'message': 'hello'}, next.id)
self.check_action_state(first, objects.action.Status.PENDING)
self.check_action_state(next, objects.action.Status.PENDING)
self.check_action_state(next2, objects.action.Status.PENDING)
actions.append(first)
actions.append(next)
actions.append(next2)
result = self.engine.execute(actions)
self.assertEqual(result, False)
self.check_action_state(first, objects.action.Status.SUCCEEDED)
self.check_action_state(next, objects.action.Status.SUCCEEDED)
self.check_action_state(next2, objects.action.Status.FAILED)
@mock.patch("watcher.common.loader.default.DriverManager")
def test_execute_with_action_exception(self, m_driver):
m_driver.return_value = driver.DriverManager.make_test_instance(
extension=extension.Extension(name=FakeAction.get_name(),
entry_point="%s:%s" % (
FakeAction.__module__,
FakeAction.__name__),
plugin=FakeAction,
obj=None),
namespace=FakeAction.namespace())
actions = [self.create_action("dontcare", "vm1", {}, None)]
result = self.engine.execute(actions)
self.assertEqual(result, False)
self.check_action_state(actions[0], objects.action.Status.FAILED)

View File

@ -18,25 +18,26 @@ import mock
from watcher.common import utils
from watcher.db import api as db_api
from watcher.decision_engine.planner.default import DefaultPlanner
from watcher.decision_engine.solution.default import DefaultSolution
from watcher.decision_engine.strategy.strategies.basic_consolidation import \
BasicConsolidation
from watcher.decision_engine.planner import default as pbase
from watcher.decision_engine.solution import default as dsol
from watcher.decision_engine.strategy import strategies
from watcher import objects
from watcher.tests.db import base
from watcher.tests.db import utils as db_utils
from watcher.tests.decision_engine.strategy.strategies.faker_cluster_state \
import FakerModelCollector
from watcher.tests.decision_engine.strategy.strategies.faker_metrics_collector \
import FakerMetricsCollector
from watcher.tests.decision_engine.strategy.strategies \
import faker_cluster_state
from watcher.tests.decision_engine.strategy.strategies \
import faker_metrics_collector as fake
from watcher.tests.objects import utils as obj_utils
class SolutionFaker(object):
@staticmethod
def build():
metrics = FakerMetricsCollector()
current_state_cluster = FakerModelCollector()
sercon = BasicConsolidation("basic", "Basic offline consolidation")
metrics = fake.FakerMetricsCollector()
current_state_cluster = faker_cluster_state.FakerModelCollector()
sercon = strategies.BasicConsolidation("basic",
"Basic offline consolidation")
sercon.ceilometer = mock.\
MagicMock(get_statistics=metrics.mock_get_statistics)
return sercon.execute(current_state_cluster.generate_scenario_1())
@ -45,9 +46,10 @@ class SolutionFaker(object):
class SolutionFakerSingleHyp(object):
@staticmethod
def build():
metrics = FakerMetricsCollector()
current_state_cluster = FakerModelCollector()
sercon = BasicConsolidation("basic", "Basic offline consolidation")
metrics = fake.FakerMetricsCollector()
current_state_cluster = faker_cluster_state.FakerModelCollector()
sercon = strategies.BasicConsolidation("basic",
"Basic offline consolidation")
sercon.ceilometer = \
mock.MagicMock(get_statistics=metrics.mock_get_statistics)
@ -57,9 +59,9 @@ class SolutionFakerSingleHyp(object):
class TestActionScheduling(base.DbTestCase):
def test_schedule_actions(self):
default_planner = DefaultPlanner()
default_planner = pbase.DefaultPlanner()
audit = db_utils.create_test_audit(uuid=utils.generate_uuid())
solution = DefaultSolution()
solution = dsol.DefaultSolution()
parameters = {
"src_uuid_hypervisor": "server1",
@ -70,7 +72,7 @@ class TestActionScheduling(base.DbTestCase):
input_parameters=parameters)
with mock.patch.object(
DefaultPlanner, "create_action",
pbase.DefaultPlanner, "create_action",
wraps=default_planner.create_action) as m_create_action:
action_plan = default_planner.schedule(
self.context, audit.id, solution
@ -78,12 +80,46 @@ class TestActionScheduling(base.DbTestCase):
self.assertIsNotNone(action_plan.uuid)
self.assertEqual(m_create_action.call_count, 1)
filters = {'action_plan_id': action_plan.id}
actions = objects.Action.dbapi.get_action_list(self.context, filters)
self.assertEqual(actions[0].action_type, "migrate")
def test_schedule_two_actions(self):
default_planner = pbase.DefaultPlanner()
audit = db_utils.create_test_audit(uuid=utils.generate_uuid())
solution = dsol.DefaultSolution()
parameters = {
"src_uuid_hypervisor": "server1",
"dst_uuid_hypervisor": "server2",
}
solution.add_action(action_type="migrate",
applies_to="b199db0c-1408-4d52-b5a5-5ca14de0ff36",
input_parameters=parameters)
solution.add_action(action_type="nop",
applies_to="",
input_parameters={})
with mock.patch.object(
pbase.DefaultPlanner, "create_action",
wraps=default_planner.create_action) as m_create_action:
action_plan = default_planner.schedule(
self.context, audit.id, solution
)
self.assertIsNotNone(action_plan.uuid)
self.assertEqual(m_create_action.call_count, 2)
# check order
filters = {'action_plan_id': action_plan.id}
actions = objects.Action.dbapi.get_action_list(self.context, filters)
self.assertEqual(actions[0].action_type, "nop")
self.assertEqual(actions[1].action_type, "migrate")
class TestDefaultPlanner(base.DbTestCase):
def setUp(self):
super(TestDefaultPlanner, self).setUp()
self.default_planner = DefaultPlanner()
self.default_planner = pbase.DefaultPlanner()
obj_utils.create_test_audit_template(self.context)
p = mock.patch.object(db_api.BaseConnection, 'create_action_plan')