Switch to taskflow for collector

Change-Id: I9a7abed07e99f6061d88782f6ea8bd1c3a0c9a93
This commit is contained in:
Endre Karlson 2013-08-21 00:07:30 +02:00
parent 2b757687e3
commit 117c68ee2d
24 changed files with 2555 additions and 6 deletions

View File

@ -0,0 +1,17 @@
# -*- encoding: utf-8 -*-
#
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,115 @@
# -*- encoding: utf-8 -*-
#
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from billingstack import exceptions
from billingstack import tasks
from billingstack.openstack.common import log
from billingstack.payment_gateway import get_provider
from billingstack.taskflow.patterns import linear_flow, threaded_flow
ACTION = 'gateway_configuration:create'
LOG = log.getLogger(__name__)
class EntryCreateTask(tasks.RootTask):
def __init__(self, storage, **kw):
super(EntryCreateTask, self).__init__(**kw)
self.requires.update(['gateway_config'])
self.provides.update(['gateway_config'])
self.storage = storage
def __call__(self, context, gateway_config):
values = self.storage.create_pg_config(context, gateway_config)
return {'gateway_config': values}
class ThreadStartTask(tasks.RootTask):
"""
This is the end of the current flow, we'll fire off a new threaded flow
that does stuff towards the actual Gateway which may include blocking code.
"""
def __init__(self, storage, **kw):
super(ThreadStartTask, self).__init__(**kw)
self.requires.update(['gateway_config'])
self.storage = storage
def __call__(self, ctxt, gateway_config):
flow = threaded_flow.Flow(ACTION + ':backend')
flow.add(tasks.ValuesInjectTask({'gateway_config': gateway_config}))
flow.add(PrerequirementsTask(self.storage))
flow.add(BackendVerifyTask(self.storage))
flow.run(ctxt)
class PrerequirementsTask(tasks.RootTask):
"""
Fetch provider information for use in the next task.
"""
def __init__(self, storage, **kw):
super(PrerequirementsTask, self).__init__(**kw)
self.requires.update(['gateway_config'])
self.provides.update([
'gateway_config',
'gateway_provider'
])
self.storage = storage
def __call__(self, ctxt, gateway_config):
gateway_provider = self.storage.get_pg_provider(
gateway_config['providedr_id'])
return {
'gateway_config': gateway_config,
'gateway_provider': gateway_provider
}
class BackendVerifyTask(tasks.RootTask):
"""
This is the verification task that runs in a threaded flow.
1. Load the Provider Plugin via entrypoints
2. Instantiate the Plugin with the Config
3. Execute verify_config call
4. Update storage accordingly
"""
def __init__(self, storage, **kw):
super(BackendVerifyTask, self).__init__(**kw)
self.requires.update(['gateway_config', 'gateway_provider'])
self.storage = storage
def __call__(self, ctxt, gateway_config, gateway_provider):
gateway_provider_cls = get_provider[gateway_provider['name']]
gateway_provider_obj = gateway_provider_cls(gateway_config)
try:
gateway_provider_obj.verify_config()
except exceptions.ConfigurationError:
raise
def create_flow(storage, values):
flow = linear_flow.Flow(ACTION)
flow.add(tasks.ValuesInjectTask(
{'gateway_config': values},
prefix=ACTION + ':initial'))
entry_task = EntryCreateTask(storage, prefix=ACTION)
entry_task_id = flow.add(entry_task)
return entry_task_id, tasks._attach_debug_listeners(flow)

View File

@ -0,0 +1,120 @@
# -*- encoding: utf-8 -*-
#
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from billingstack import tasks
from billingstack.openstack.common import log
from billingstack.payment_gateway import get_provider
from billingstack.taskflow.patterns import linear_flow, threaded_flow
ACTION = 'payment_method:create'
LOG = log.getLogger(__name__)
class EntryCreateTask(tasks.RootTask):
"""
Create the initial entry in the database
"""
def __init__(self, storage, **kw):
super(EntryCreateTask, self).__init__(**kw)
self.requires.update(['payment_method'])
self.provides.update(['payment_method'])
self.storage = storage
def __call__(self, ctxt, payment_method):
values = self.storage.create_payment_method(ctxt, payment_method)
return {'payment_method': values}
class ThreadStartTask(tasks.RootTask):
"""
This is the end of the current flow, we'll fire off a new threaded flow
that does stuff towards the actual Gateway which may include blocking code.
This fires off a new flow that is threaded / greenthreads?
"""
def __init__(self, storage, **kw):
super(ThreadStartTask, self).__init__(**kw)
self.requires.update(['payment_method'])
self.storage = storage
def __call__(self, ctxt, payment_method):
flow = threaded_flow.Flow(ACTION + ':backend')
flow.add(tasks.ValuesInjectTask({'payment_method': payment_method}))
flow.add(PrerequirementsTask(self.storage))
flow.add(BackendCreateTask(self.storage))
flow.run(ctxt)
class PrerequirementsTask(tasks.RootTask):
"""
Task to get the config and the provider from the catalog / database.
"""
def __init__(self, storage, **kw):
super(PrerequirementsTask, self).__init__(**kw)
self.requires.update(['payment_method'])
self.provides.update([
'payment_method',
'gateway_config',
'gateway_provider'])
self.storage = storage
def __call__(self, ctxt, **kw):
kw['gateway_config'] = self.storage.get_pg_config(
ctxt, kw['payment_method']['provider_config_id'])
kw['gateway_provider'] = self.storage.get_pg_provider(
ctxt, kw['gateway_config']['provider_id'])
return kw
class BackendCreateTask(tasks.RootTask):
def __init__(self, storage, **kw):
super(BackendCreateTask, self).__init__(**kw)
self.requires.update([
'payment_method',
'gateway_config',
'gateway_provider'])
self.storage = storage
def __call__(self, ctxt, payment_method, gateway_config, gateway_provider):
gateway_provider_cls = get_provider(gateway_provider['name'])
gateway_provider_obj = gateway_provider_cls(gateway_config)
gateway_provider_obj.create_payment_method(
payment_method['customer_id'],
payment_method)
def create_flow(storage, payment_method):
"""
The flow for the service to start
"""
flow = linear_flow.Flow(ACTION + ':initial')
flow.add(tasks.ValuesInjectTask(
{'payment_method': payment_method},
prefix=ACTION))
entry_task = EntryCreateTask(storage, prefix=ACTION)
entry_task_id = flow.add(entry_task)
flow.add(ThreadStartTask(storage, prefix=ACTION))
return entry_task_id, tasks._attach_debug_listeners(flow)

View File

@ -26,6 +26,8 @@ from billingstack.openstack.common import service as os_service
from billingstack.storage.utils import get_connection
from billingstack.central.rpcapi import CentralAPI
from billingstack import service as bs_service
from billingstack.collector.flows import (
gateway_configuration, payment_method)
cfg.CONF.import_opt('host', 'billingstack.netconf')
@ -62,7 +64,10 @@ class Service(rpc_service.Service):
# PGC
def create_pg_config(self, ctxt, values):
return self.storage_conn.create_pg_config(ctxt, values)
id_, flow = gateway_configuration.create_flow(
self.storage_conn, values)
flow.run(ctxt)
return flow.results[id_]['gateway_config']
def list_pg_configs(self, ctxt, **kw):
return self.storage_conn.list_pg_configs(ctxt, **kw)
@ -78,7 +83,10 @@ class Service(rpc_service.Service):
# PM
def create_payment_method(self, ctxt, values):
return self.storage_conn.create_payment_method(ctxt, values)
id_, flow = payment_method.create_flow(
self.storage_conn, values)
flow.run(ctxt)
return flow.results[id_]['payment_method']
def list_payment_methods(self, ctxt, **kw):
return self.storage_conn.list_payment_methods(ctxt, **kw)

View File

@ -0,0 +1,20 @@
# -*- encoding: utf-8 -*-
#
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
PENDING = u'PENDING'
CREATED = u'CREATED'
INVALID = u'INVALID'

View File

@ -21,6 +21,7 @@ from sqlalchemy import Unicode
from sqlalchemy.orm import exc, relationship
from sqlalchemy.ext.declarative import declarative_base
from billingstack.collector import states
from billingstack.collector.storage import Connection, StorageEngine
from billingstack.openstack.common import log as logging
from billingstack.sqlalchemy.types import JSON, UUID
@ -116,6 +117,8 @@ class PGConfig(BASE, model_base.BaseMixin):
onupdate='CASCADE'),
nullable=False)
state = Column(Unicode(20), default=states.PENDING)
class PaymentMethod(BASE, model_base.BaseMixin):
name = Column(Unicode(255), nullable=False)
@ -132,6 +135,8 @@ class PaymentMethod(BASE, model_base.BaseMixin):
provider_config_id = Column(UUID, ForeignKey('pg_config.id',
onupdate='CASCADE'), nullable=False)
state = Column(Unicode(20), default=states.PENDING)
class SQLAlchemyEngine(StorageEngine):
__plugin_name__ = 'sqlalchemy'

View File

@ -17,6 +17,9 @@ from billingstack.plugin import Plugin
class Provider(Plugin):
"""
Base API for Gateway Plugins.
"""
__plugin_ns__ = 'billingstack.payment_gateway'
__plugin_type__ = 'payment_gateway'
@ -44,6 +47,9 @@ class Provider(Plugin):
@classmethod
def values(cls):
"""
The values for this provider, used when registering in the catalog.
"""
return dict(
name=cls.get_plugin_name(),
title=cls.__title__,
@ -56,7 +62,14 @@ class Provider(Plugin):
"""
raise NotImplementedError
@classmethod
def verify_config(self):
"""
Verify a configuration.
Raise ConfigurationError if invalid config.
"""
raise NotImplementedError
def create_account(self, values):
"""
Create a new Account

View File

@ -16,6 +16,11 @@
from billingstack.payment_gateway.base import Provider
class DummyClient(object):
def __init__(self):
pass
class DummyProvider(Provider):
"""
A Stupid Provider that does nothing
@ -32,3 +37,12 @@ class DummyProvider(Provider):
@classmethod
def properties(cls):
return {"enabled": 0}
def get_client(self):
return DummyClient()
def create_payment_method(self, account_id, values):
return True
def verify_config(self):
return True

View File

@ -1,7 +1,7 @@
[
{
"name" : "braintree",
"title" : "BrainTree Payment Gateway",
"description" : "www.braintree.com integration provider"
"name" : "dummy",
"title" : "Dummy Provider",
"description" : "Dummy integration provider"
}
]

View File

@ -0,0 +1 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4

View File

@ -0,0 +1,97 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
from billingstack.taskflow import functor_task
from billingstack.taskflow import utils
def wraps(fn):
"""This will not be needed in python 3.2 or greater which already has this
built-in to its functools.wraps method.
"""
def wrapper(f):
f = functools.wraps(fn)(f)
f.__wrapped__ = getattr(fn, '__wrapped__', fn)
return f
return wrapper
def locked(*args, **kwargs):
def decorator(f):
attr_name = kwargs.get('lock', '_lock')
@wraps(f)
def wrapper(*args, **kwargs):
lock = getattr(args[0], attr_name)
with lock:
return f(*args, **kwargs)
return wrapper
# This is needed to handle when the decorator has args or the decorator
# doesn't have args, python is rather weird here...
if kwargs or not args:
return decorator
else:
if len(args) == 1:
return decorator(args[0])
else:
return decorator
def _original_function(fun):
"""Get original function from static or class method"""
if isinstance(fun, staticmethod):
return fun.__get__(object())
elif isinstance(fun, classmethod):
return fun.__get__(object()).im_func
return fun
def task(*args, **kwargs):
"""Decorates a given function so that it can be used as a task"""
def decorator(f):
def task_factory(execute_with, **factory_kwargs):
merged = kwargs.copy()
merged.update(factory_kwargs)
# NOTE(imelnikov): we can't capture f here because for
# bound methods and bound class methods the object it
# is bound to is yet unknown at the moment
return functor_task.FunctorTask(execute_with, **merged)
w_f = _original_function(f)
setattr(w_f, utils.TASK_FACTORY_ATTRIBUTE, task_factory)
return f
# This is needed to handle when the decorator has args or the decorator
# doesn't have args, python is rather weird here...
if kwargs:
if args:
raise TypeError('task decorator takes 0 positional arguments,'
'%s given' % len(args))
return decorator
else:
if len(args) == 1:
return decorator(args[0])
else:
return decorator

View File

@ -0,0 +1,77 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class TaskFlowException(Exception):
"""Base class for exceptions emitted from this library."""
pass
class Duplicate(TaskFlowException):
"""Raised when a duplicate entry is found."""
pass
class StorageError(TaskFlowException):
"""Raised when logbook can not be read/saved/deleted."""
def __init__(self, message, cause=None):
super(StorageError, self).__init__(message)
self.cause = cause
class NotFound(TaskFlowException):
"""Raised when some entry in some object doesn't exist."""
pass
class AlreadyExists(TaskFlowException):
"""Raised when some entry in some object already exists."""
pass
class ClosedException(TaskFlowException):
"""Raised when an access on a closed object occurs."""
pass
class InvalidStateException(TaskFlowException):
"""Raised when a task/job/workflow is in an invalid state when an
operation is attempting to apply to said task/job/workflow.
"""
pass
class UnclaimableJobException(TaskFlowException):
"""Raised when a job can not be claimed."""
pass
class JobNotFound(TaskFlowException):
"""Raised when a job entry can not be found."""
pass
class MissingDependencies(InvalidStateException):
"""Raised when a entity has dependencies that can not be satisified."""
message = ("%(who)s requires %(requirements)s but no other entity produces"
" said requirements")
def __init__(self, who, requirements):
message = self.message % {'who': who, 'requirements': requirements}
super(MissingDependencies, self).__init__(message)

View File

@ -0,0 +1,216 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import threading
from billingstack.openstack.common import uuidutils
from billingstack.taskflow import exceptions as exc
from billingstack.taskflow import states
from billingstack.taskflow import utils
class Flow(object):
"""The base abstract class of all flow implementations.
It provides a set of parents to flows that have a concept of parent flows
as well as a state and state utility functions to the deriving classes. It
also provides a name and an identifier (uuid or other) to the flow so that
it can be uniquely identifed among many flows.
Flows are expected to provide (if desired) the following methods:
- add
- add_many
- interrupt
- reset
- rollback
- run
- soft_reset
"""
__metaclass__ = abc.ABCMeta
# Common states that certain actions can be performed in. If the flow
# is not in these sets of states then it is likely that the flow operation
# can not succeed.
RESETTABLE_STATES = set([
states.INTERRUPTED,
states.SUCCESS,
states.PENDING,
states.FAILURE,
])
SOFT_RESETTABLE_STATES = set([
states.INTERRUPTED,
])
UNINTERRUPTIBLE_STATES = set([
states.FAILURE,
states.SUCCESS,
states.PENDING,
])
RUNNABLE_STATES = set([
states.PENDING,
])
def __init__(self, name, parents=None, uuid=None):
self._name = str(name)
# The state of this flow.
self._state = states.PENDING
# If this flow has a parent flow/s which need to be reverted if
# this flow fails then please include them here to allow this child
# to call the parents...
if parents:
self.parents = tuple(parents)
else:
self.parents = tuple([])
# Any objects that want to listen when a wf/task starts/stops/completes
# or errors should be registered here. This can be used to monitor
# progress and record tasks finishing (so that it becomes possible to
# store the result of a task in some persistent or semi-persistent
# storage backend).
self.notifier = utils.TransitionNotifier()
self.task_notifier = utils.TransitionNotifier()
# Assign this flow a unique identifer.
if uuid:
self._id = str(uuid)
else:
self._id = uuidutils.generate_uuid()
# Ensure we can not change the state at the same time in 2 different
# threads.
self._state_lock = threading.RLock()
@property
def name(self):
"""A non-unique name for this flow (human readable)"""
return self._name
@property
def uuid(self):
return self._id
@property
def state(self):
"""Provides a read-only view of the flow state."""
return self._state
def _change_state(self, context, new_state, check_func=None, notify=True):
old_state = None
changed = False
with self._state_lock:
if self.state != new_state:
if (not check_func or
(check_func and check_func(self.state))):
changed = True
old_state = self.state
self._state = new_state
# Don't notify while holding the lock so that the reciever of said
# notifications can actually perform operations on the given flow
# without getting into deadlock.
if notify and changed:
self.notifier.notify(self.state, details={
'context': context,
'flow': self,
'old_state': old_state,
})
return changed
def __str__(self):
lines = ["Flow: %s" % (self.name)]
lines.append("%s" % (self.uuid))
lines.append("%s" % (len(self.parents)))
lines.append("%s" % (self.state))
return "; ".join(lines)
@abc.abstractmethod
def add(self, task):
"""Adds a given task to this flow.
Returns the uuid that is associated with the task for later operations
before and after it is ran.
"""
raise NotImplementedError()
def add_many(self, tasks):
"""Adds many tasks to this flow.
Returns a list of uuids (one for each task added).
"""
uuids = []
for t in tasks:
uuids.append(self.add(t))
return uuids
def interrupt(self):
"""Attempts to interrupt the current flow and any tasks that are
currently not running in the flow.
Returns how many tasks were interrupted (if any).
"""
def check():
if self.state in self.UNINTERRUPTIBLE_STATES:
raise exc.InvalidStateException(("Can not interrupt when"
" in state %s") % self.state)
check()
with self._state_lock:
check()
self._change_state(None, states.INTERRUPTED)
return 0
def reset(self):
"""Fully resets the internal state of this flow, allowing for the flow
to be ran again.
Note: Listeners are also reset.
"""
def check():
if self.state not in self.RESETTABLE_STATES:
raise exc.InvalidStateException(("Can not reset when"
" in state %s") % self.state)
check()
with self._state_lock:
check()
self.notifier.reset()
self.task_notifier.reset()
self._change_state(None, states.PENDING)
def soft_reset(self):
"""Partially resets the internal state of this flow, allowing for the
flow to be ran again from an interrupted state.
"""
def check():
if self.state not in self.SOFT_RESETTABLE_STATES:
raise exc.InvalidStateException(("Can not soft reset when"
" in state %s") % self.state)
check()
with self._state_lock:
check()
self._change_state(None, states.PENDING)
@abc.abstractmethod
def run(self, context, *args, **kwargs):
"""Executes the workflow."""
raise NotImplementedError()
def rollback(self, context, cause):
"""Performs rollback of this workflow and any attached parent workflows
if present.
"""
pass

View File

@ -0,0 +1,95 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 AT&T Labs Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import inspect
from billingstack.taskflow import task as base
# These arguments are ones that we will skip when parsing for requirements
# for a function to operate (when used as a task).
AUTO_ARGS = ('self', 'context', 'cls')
def _filter_arg(arg):
if arg in AUTO_ARGS:
return False
# In certain decorator cases it seems like we get the function to be
# decorated as an argument, we don't want to take that as a real argument.
if not isinstance(arg, basestring):
return False
return True
class FunctorTask(base.Task):
"""Adaptor to make task from a callable
Take any callable and make a task from it.
"""
@staticmethod
def _get_callable_name(execute_with):
"""Generate a name from callable"""
im_class = getattr(execute_with, 'im_class', None)
if im_class is not None:
parts = (im_class.__module__, im_class.__name__,
execute_with.__name__)
else:
parts = (execute_with.__module__, execute_with.__name__)
return '.'.join(parts)
def __init__(self, execute_with, **kwargs):
"""Initialize FunctorTask instance with given callable and kwargs
:param execute_with: the callable
:param kwargs: reserved keywords (all optional) are
name: name of the task, default None (auto generate)
task_id: id of the task, default None (auto generate)
revert_with: the callable to revert, default None
version: version of the task, default Task's version 1.0
optionals: optionals of the task, default ()
provides: provides of the task, default ()
requires: requires of the task, default ()
auto_extract: auto extract execute_with's args and put it into
requires, default True
"""
name = kwargs.pop('name', None)
task_id = kwargs.pop('task_id', None)
if name is None:
name = self._get_callable_name(execute_with)
super(FunctorTask, self).__init__(name, task_id)
self._execute_with = execute_with
self._revert_with = kwargs.pop('revert_with', None)
self.version = kwargs.pop('version', self.version)
self.optional.update(kwargs.pop('optional', ()))
self.provides.update(kwargs.pop('provides', ()))
self.requires.update(kwargs.pop('requires', ()))
if kwargs.pop('auto_extract', True):
f_args = inspect.getargspec(execute_with).args
self.requires.update([arg for arg in f_args if _filter_arg(arg)])
if kwargs:
raise TypeError('__init__() got an unexpected keyword argument %r'
% kwargs.keys[0])
def __call__(self, *args, **kwargs):
return self._execute_with(*args, **kwargs)
def revert(self, *args, **kwargs):
if self._revert_with:
return self._revert_with(*args, **kwargs)
else:
return None

View File

@ -0,0 +1,80 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from billingstack.taskflow import exceptions as exc
LOG = logging.getLogger(__name__)
def connect(graph, infer_key='infer', auto_reason='auto', discard_func=None):
"""Connects a graphs runners to other runners in the graph which provide
outputs for each runners requirements.
"""
if len(graph) == 0:
return
if discard_func:
for (u, v, e_data) in graph.edges(data=True):
if discard_func(u, v, e_data):
graph.remove_edge(u, v)
for (r, r_data) in graph.nodes_iter(data=True):
requires = set(r.requires)
# Find the ones that have already been attached manually.
manual_providers = {}
if requires:
incoming = [e[0] for e in graph.in_edges_iter([r])]
for r2 in incoming:
fulfills = requires & r2.provides
if fulfills:
LOG.debug("%s is a manual provider of %s for %s",
r2, fulfills, r)
for k in fulfills:
manual_providers[k] = r2
requires.remove(k)
# Anything leftover that we must find providers for??
auto_providers = {}
if requires and r_data.get(infer_key):
for r2 in graph.nodes_iter():
if r is r2:
continue
fulfills = requires & r2.provides
if fulfills:
graph.add_edge(r2, r, reason=auto_reason)
LOG.debug("Connecting %s as a automatic provider for"
" %s for %s", r2, fulfills, r)
for k in fulfills:
auto_providers[k] = r2
requires.remove(k)
if not requires:
break
# Anything still leftover??
if requires:
# Ensure its in string format, since join will puke on
# things that are not strings.
missing = ", ".join(sorted([str(s) for s in requires]))
raise exc.MissingDependencies(r, missing)
else:
r.providers = {}
r.providers.update(auto_providers)
r.providers.update(manual_providers)

View File

@ -0,0 +1 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4

View File

@ -0,0 +1,286 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import functools
import logging
import threading
from billingstack.openstack.common import excutils
from billingstack.taskflow import decorators
from billingstack.taskflow import exceptions as exc
from billingstack.taskflow import states
from billingstack.taskflow import utils
from billingstack.taskflow import flow
LOG = logging.getLogger(__name__)
class Flow(flow.Flow):
""""A linear chain of tasks that can be applied in order as one unit and
rolled back as one unit using the reverse order that the tasks have
been applied in.
Note(harlowja): Each task in the chain must have requirements
which are satisfied by the previous task/s in the chain.
"""
def __init__(self, name, parents=None, uuid=None):
super(Flow, self).__init__(name, parents, uuid)
# The tasks which have been applied will be collected here so that they
# can be reverted in the correct order on failure.
self._accumulator = utils.RollbackAccumulator()
# Tasks results are stored here. Lookup is by the uuid that was
# returned from the add function.
self.results = {}
# The previously left off iterator that can be used to resume from
# the last task (if interrupted and soft-reset).
self._leftoff_at = None
# All runners to run are collected here.
self._runners = []
self._connected = False
self._lock = threading.RLock()
# The resumption strategy to use.
self.resumer = None
@decorators.locked
def add(self, task):
"""Adds a given task to this flow."""
assert isinstance(task, collections.Callable)
r = utils.AOTRunner(task)
r.runs_before = list(reversed(self._runners))
self._runners.append(r)
self._reset_internals()
return r.uuid
def _reset_internals(self):
self._connected = False
self._leftoff_at = None
def _associate_providers(self, runner):
# Ensure that some previous task provides this input.
who_provides = {}
task_requires = runner.requires
for r in task_requires:
provider = None
for before_me in runner.runs_before:
if r in before_me.provides:
provider = before_me
break
if provider:
who_provides[r] = provider
# Ensure that the last task provides all the needed input for this
# task to run correctly.
missing_requires = task_requires - set(who_provides.keys())
if missing_requires:
raise exc.MissingDependencies(runner, sorted(missing_requires))
runner.providers.update(who_provides)
def __str__(self):
lines = ["LinearFlow: %s" % (self.name)]
lines.append("%s" % (self.uuid))
lines.append("%s" % (len(self._runners)))
lines.append("%s" % (len(self.parents)))
lines.append("%s" % (self.state))
return "; ".join(lines)
@decorators.locked
def remove(self, uuid):
index_removed = -1
for (i, r) in enumerate(self._runners):
if r.uuid == uuid:
index_removed = i
break
if index_removed == -1:
raise ValueError("No runner found with uuid %s" % (uuid))
else:
removed = self._runners.pop(index_removed)
self._reset_internals()
# Go and remove it from any runner after the removed runner since
# those runners may have had an attachment to it.
for r in self._runners[index_removed:]:
try:
r.runs_before.remove(removed)
except (IndexError, ValueError):
pass
def __len__(self):
return len(self._runners)
def _connect(self):
if self._connected:
return self._runners
for r in self._runners:
r.providers = {}
for r in reversed(self._runners):
self._associate_providers(r)
self._connected = True
return self._runners
def _ordering(self):
return iter(self._connect())
@decorators.locked
def run(self, context, *args, **kwargs):
def abort_if(current_state, ok_states):
if current_state not in ok_states:
return False
return True
def resume_it():
if self._leftoff_at is not None:
return ([], self._leftoff_at)
if self.resumer:
(finished, leftover) = self.resumer(self, self._ordering())
else:
finished = []
leftover = self._ordering()
return (finished, leftover)
start_check_functor = functools.partial(abort_if,
ok_states=self.RUNNABLE_STATES)
if not self._change_state(context, states.STARTED,
check_func=start_check_functor):
return
try:
those_finished, leftover = resume_it()
except Exception:
with excutils.save_and_reraise_exception():
self._change_state(context, states.FAILURE)
def run_it(runner, failed=False, result=None, simulate_run=False):
try:
# Add the task to be rolled back *immediately* so that even if
# the task fails while producing results it will be given a
# chance to rollback.
rb = utils.RollbackTask(context, runner.task, result=None)
self._accumulator.add(rb)
self.task_notifier.notify(states.STARTED, details={
'context': context,
'flow': self,
'runner': runner,
})
if not simulate_run:
result = runner(context, *args, **kwargs)
else:
if failed:
# TODO(harlowja): make this configurable??
# If we previously failed, we want to fail again at
# the same place.
if not result:
# If no exception or exception message was provided
# or captured from the previous run then we need to
# form one for this task.
result = "%s failed running." % (runner.task)
if isinstance(result, basestring):
result = exc.InvalidStateException(result)
if not isinstance(result, Exception):
LOG.warn("Can not raise a non-exception"
" object: %s", result)
result = exc.InvalidStateException()
raise result
# Adjust the task result in the accumulator before
# notifying others that the task has finished to
# avoid the case where a listener might throw an
# exception.
rb.result = result
runner.result = result
self.results[runner.uuid] = result
self.task_notifier.notify(states.SUCCESS, details={
'context': context,
'flow': self,
'runner': runner,
})
except Exception as e:
runner.result = e
cause = utils.FlowFailure(runner, self, e)
with excutils.save_and_reraise_exception():
# Notify any listeners that the task has errored.
self.task_notifier.notify(states.FAILURE, details={
'context': context,
'flow': self,
'runner': runner,
})
self.rollback(context, cause)
run_check_functor = functools.partial(abort_if,
ok_states=[states.STARTED,
states.RESUMING])
if len(those_finished):
if not self._change_state(context, states.RESUMING,
check_func=run_check_functor):
return
for (r, details) in those_finished:
# Fake running the task so that we trigger the same
# notifications and state changes (and rollback that
# would have happened in a normal flow).
failed = states.FAILURE in details.get('states', [])
result = details.get('result')
run_it(r, failed=failed, result=result, simulate_run=True)
self._leftoff_at = leftover
if not self._change_state(context, states.RUNNING,
check_func=run_check_functor):
return
was_interrupted = False
for r in leftover:
r.reset()
run_it(r)
if self.state == states.INTERRUPTED:
was_interrupted = True
break
if not was_interrupted:
# Only gets here if everything went successfully.
self._change_state(context, states.SUCCESS)
self._leftoff_at = None
@decorators.locked
def reset(self):
super(Flow, self).reset()
self.results = {}
self.resumer = None
self._accumulator.reset()
self._reset_internals()
@decorators.locked
def rollback(self, context, cause):
# Performs basic task by task rollback by going through the reverse
# order that tasks have finished and asking said task to undo whatever
# it has done. If this flow has any parent flows then they will
# also be called to rollback any tasks said parents contain.
#
# Note(harlowja): if a flow can more simply revert a whole set of
# tasks via a simpler command then it can override this method to
# accomplish that.
#
# For example, if each task was creating a file in a directory, then
# it's easier to just remove the directory than to ask each task to
# delete its file individually.
self._change_state(context, states.REVERTING)
try:
self._accumulator.rollback(cause)
finally:
self._change_state(context, states.FAILURE)
# Rollback any parents flows if they exist...
for p in self.parents:
p.rollback(context, cause)

View File

@ -0,0 +1,636 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from billingstack.taskflow import exceptions as exc
from billingstack.taskflow import flow
from billingstack.taskflow import graph_utils
from billingstack.taskflow import states
from billingstack.taskflow import utils
import collections
import functools
import logging
import sys
import threading
import weakref
from networkx.algorithms import cycles
from networkx.classes import digraph
LOG = logging.getLogger(__name__)
class DependencyTimeout(exc.InvalidStateException):
"""When running in parallel a task has the ability to timeout waiting for
its dependent tasks to finish, this will be raised when that occurs.
"""
pass
class Flow(flow.Flow):
"""This flow pattern establishes tasks into a graph where each task is a
node in the graph and dependencies between tasks are edges in the graph.
When running (in parallel) each task will only be activated when its
dependencies have been satisified. When a graph is split into two or more
segments, both of those segments will be ran in parallel.
For example lets take this small little *somewhat complicated* graph:
X--Y--C--D
| |
A--B-- --G--
| | |--Z(end)
E--F-- --H--
In this flow the following will be ran in parallel at start:
1. X--Y
2. A--B
3. E--F
Note the C--D nodes will not be able to run until [Y,B,F] has completed.
After C--D completes the following will be ran in parallel:
1. G
2. H
Then finally Z will run (after [G,H] complete) and the flow will then have
finished executing.
"""
MUTABLE_STATES = set([states.PENDING, states.FAILURE, states.SUCCESS])
REVERTABLE_STATES = set([states.FAILURE, states.INCOMPLETE])
CANCELLABLE_STATES = set([states.PENDING, states.RUNNING])
def __init__(self, name):
super(Flow, self).__init__(name)
self._graph = digraph.DiGraph(name=name)
self._run_lock = threading.RLock()
self._cancel_lock = threading.RLock()
self._mutate_lock = threading.RLock()
# NOTE(harlowja) The locking order in this list actually matters since
# we need to make sure that users of this list do not get deadlocked
# by out of order lock access.
self._core_locks = [
self._run_lock,
self._mutate_lock,
self._cancel_lock,
]
self._run_locks = [
self._run_lock,
self._mutate_lock,
]
self._cancel_locks = [
self._cancel_lock,
]
self.results = {}
self.resumer = None
def __str__(self):
lines = ["ParallelFlow: %s" % (self.name)]
lines.append("%s" % (self._graph.number_of_nodes()))
lines.append("%s" % (self.state))
return "; ".join(lines)
def soft_reset(self):
# The way this flow works does not allow (at the current moment) for
# you to suspend the threads and then resume them at a later time,
# instead it only supports interruption (which will cancel the threads)
# and then a full reset.
raise NotImplementedError("Threaded flow does not currently support"
" soft resetting, please try using"
" reset() instead")
def interrupt(self):
"""Currently we can not pause threads and then resume them later, not
really thinking that we should likely ever do this.
"""
raise NotImplementedError("Threaded flow does not currently support"
" interruption, please try using"
" cancel() instead")
def reset(self):
# All locks are used so that resets can not happen while running or
# cancelling or modifying.
with utils.MultiLock(self._core_locks):
super(Flow, self).reset()
self.results = {}
self.resumer = None
def cancel(self):
def check():
if self.state not in self.CANCELLABLE_STATES:
raise exc.InvalidStateException("Can not attempt cancellation"
" when in state %s" %
self.state)
check()
cancelled = 0
was_empty = False
# We don't lock the other locks so that the flow can be cancelled while
# running. Further state management logic is then used while running
# to verify that the flow should still be running when it has been
# cancelled.
with utils.MultiLock(self._cancel_locks):
check()
if len(self._graph) == 0:
was_empty = True
else:
for r in self._graph.nodes_iter():
try:
if r.cancel(blocking=False):
cancelled += 1
except exc.InvalidStateException:
pass
if cancelled or was_empty:
self._change_state(None, states.CANCELLED)
return cancelled
def _find_uuid(self, uuid):
# Finds the runner for the given uuid (or returns none)
for r in self._graph.nodes_iter():
if r.uuid == uuid:
return r
return None
def add(self, task, timeout=None, infer=True):
"""Adds a task to the given flow using the given timeout which will be
used a the timeout to wait for dependencies (if any) to be
fulfilled.
"""
def check():
if self.state not in self.MUTABLE_STATES:
raise exc.InvalidStateException("Flow is currently in a"
" non-mutable %s state" %
(self.state))
# Ensure that we do a quick check to see if we can even perform this
# addition before we go about actually acquiring the lock to perform
# the actual addition.
check()
# All locks must be acquired so that modifications can not be made
# while running, cancelling or performing a simultaneous mutation.
with utils.MultiLock(self._core_locks):
check()
runner = ThreadRunner(task, self, timeout)
self._graph.add_node(runner, infer=infer)
return runner.uuid
def _connect(self):
"""Infers and connects the edges of the given tasks by examining the
associated tasks provides and requires attributes and connecting tasks
that require items to tasks that produce said items.
"""
# Disconnect all edges not manually created before we attempt to infer
# them so that we don't retain edges that are invalid.
def disconnect_non_user(u, v, e_data):
if e_data and e_data.get('reason') != 'manual':
return True
return False
# Link providers to requirers.
graph_utils.connect(self._graph,
discard_func=disconnect_non_user)
# Connect the successors & predecessors and related siblings
for r in self._graph.nodes_iter():
r._predecessors = []
r._successors = []
for (r2, _me) in self._graph.in_edges_iter([r]):
r._predecessors.append(r2)
for (_me, r2) in self._graph.out_edges_iter([r]):
r._successors.append(r2)
r.siblings = []
for r2 in self._graph.nodes_iter():
if r2 is r or r2 in r._predecessors or r2 in r._successors:
continue
r._siblings.append(r2)
def add_many(self, tasks):
"""Adds a list of tasks to the flow."""
def check():
if self.state not in self.MUTABLE_STATES:
raise exc.InvalidStateException("Flow is currently in a"
" non-mutable state %s"
% (self.state))
# Ensure that we do a quick check to see if we can even perform this
# addition before we go about actually acquiring the lock.
check()
# All locks must be acquired so that modifications can not be made
# while running, cancelling or performing a simultaneous mutation.
with utils.MultiLock(self._core_locks):
check()
added = []
for t in tasks:
added.append(self.add(t))
return added
def add_dependency(self, provider_uuid, consumer_uuid):
"""Manually adds a dependency between a provider and a consumer."""
def check_and_fetch():
if self.state not in self.MUTABLE_STATES:
raise exc.InvalidStateException("Flow is currently in a"
" non-mutable state %s"
% (self.state))
provider = self._find_uuid(provider_uuid)
if not provider or not self._graph.has_node(provider):
raise exc.InvalidStateException("Can not add a dependency "
"from unknown uuid %s" %
(provider_uuid))
consumer = self._find_uuid(consumer_uuid)
if not consumer or not self._graph.has_node(consumer):
raise exc.InvalidStateException("Can not add a dependency "
"to unknown uuid %s"
% (consumer_uuid))
if provider is consumer:
raise exc.InvalidStateException("Can not add a dependency "
"to loop via uuid %s"
% (consumer_uuid))
return (provider, consumer)
check_and_fetch()
# All locks must be acquired so that modifications can not be made
# while running, cancelling or performing a simultaneous mutation.
with utils.MultiLock(self._core_locks):
(provider, consumer) = check_and_fetch()
self._graph.add_edge(provider, consumer, reason='manual')
LOG.debug("Connecting %s as a manual provider for %s",
provider, consumer)
def run(self, context, *args, **kwargs):
"""Executes the given flow using the given context and args/kwargs."""
def abort_if(current_state, ok_states):
if current_state in (states.CANCELLED,):
return False
if current_state not in ok_states:
return False
return True
def check():
if self.state not in self.RUNNABLE_STATES:
raise exc.InvalidStateException("Flow is currently unable "
"to be ran in state %s"
% (self.state))
def connect_and_verify():
"""Do basic sanity tests on the graph structure."""
if len(self._graph) == 0:
return
self._connect()
degrees = [g[1] for g in self._graph.in_degree_iter()]
zero_degrees = [d for d in degrees if d == 0]
if not zero_degrees:
# If every task depends on something else to produce its input
# then we will be in a deadlock situation.
raise exc.InvalidStateException("No task has an in-degree"
" of zero")
self_loops = self._graph.nodes_with_selfloops()
if self_loops:
# A task that has a dependency on itself will never be able
# to run.
raise exc.InvalidStateException("%s tasks have been detected"
" with dependencies on"
" themselves" %
len(self_loops))
simple_cycles = len(cycles.recursive_simple_cycles(self._graph))
if simple_cycles:
# A task loop will never be able to run, unless it somehow
# breaks that loop.
raise exc.InvalidStateException("%s tasks have been detected"
" with dependency loops" %
simple_cycles)
def run_it(result_cb, args, kwargs):
check_runnable = functools.partial(abort_if,
ok_states=self.RUNNABLE_STATES)
if self._change_state(context, states.RUNNING,
check_func=check_runnable):
self.results = {}
if len(self._graph) == 0:
return
for r in self._graph.nodes_iter():
r.reset()
r._result_cb = result_cb
executor = utils.ThreadGroupExecutor()
for r in self._graph.nodes_iter():
executor.submit(r, *args, **kwargs)
executor.await_termination()
def trigger_rollback(failures):
if not failures:
return
causes = []
for r in failures:
causes.append(utils.FlowFailure(r, self,
r.exc, r.exc_info))
try:
self.rollback(context, causes)
except exc.InvalidStateException:
pass
finally:
# TODO(harlowja): re-raise a combined exception when
# there are more than one failures??
for f in failures:
if all(f.exc_info):
raise f.exc_info[0], f.exc_info[1], f.exc_info[2]
def handle_results():
# Isolate each runner state into groups so that we can easily tell
# which ones failed, cancelled, completed...
groups = collections.defaultdict(list)
for r in self._graph.nodes_iter():
groups[r.state].append(r)
for r in self._graph.nodes_iter():
if r not in groups.get(states.FAILURE, []) and r.has_ran():
self.results[r.uuid] = r.result
if groups[states.FAILURE]:
self._change_state(context, states.FAILURE)
trigger_rollback(groups[states.FAILURE])
elif (groups[states.CANCELLED] or groups[states.PENDING]
or groups[states.TIMED_OUT] or groups[states.STARTED]):
self._change_state(context, states.INCOMPLETE)
else:
check_ran = functools.partial(abort_if,
ok_states=[states.RUNNING])
self._change_state(context, states.SUCCESS,
check_func=check_ran)
def get_resumer_cb():
if not self.resumer:
return None
(ran, _others) = self.resumer(self, self._graph.nodes_iter())
def fetch_results(runner):
for (r, metadata) in ran:
if r is runner:
return (True, metadata.get('result'))
return (False, None)
result_cb = fetch_results
return result_cb
args = [context] + list(args)
check()
# Only acquire the run lock (but use further state checking) and the
# mutation lock to stop simultaneous running and simultaneous mutating
# which are not allowed on a running flow. Allow simultaneous cancel
# by performing repeated state checking while running.
with utils.MultiLock(self._run_locks):
check()
connect_and_verify()
try:
run_it(get_resumer_cb(), args, kwargs)
finally:
handle_results()
def rollback(self, context, cause):
"""Rolls back all tasks that are *not* still pending or cancelled."""
def check():
if self.state not in self.REVERTABLE_STATES:
raise exc.InvalidStateException("Flow is currently unable "
"to be rolled back in "
"state %s" % (self.state))
check()
# All locks must be acquired so that modifications can not be made
# while another entity is running, rolling-back, cancelling or
# performing a mutation operation.
with utils.MultiLock(self._core_locks):
check()
accum = utils.RollbackAccumulator()
for r in self._graph.nodes_iter():
if r.has_ran():
accum.add(utils.RollbackTask(context, r.task, r.result))
try:
self._change_state(context, states.REVERTING)
accum.rollback(cause)
finally:
self._change_state(context, states.FAILURE)
class ThreadRunner(utils.Runner):
"""A helper class that will use a countdown latch to avoid calling its
callable object until said countdown latch has emptied. After it has
been emptied the predecessor tasks will be examined for dependent results
and said results will then be provided to call the runners callable
object.
TODO(harlowja): this could be a 'future' like object in the future since it
is starting to have the same purpose and usage (in a way). Likely switch
this over to the task details object or a subclass of it???
"""
RESETTABLE_STATES = set([states.PENDING, states.SUCCESS, states.FAILURE,
states.CANCELLED])
RUNNABLE_STATES = set([states.PENDING])
CANCELABLE_STATES = set([states.PENDING])
SUCCESS_STATES = set([states.SUCCESS])
CANCEL_SUCCESSORS_WHEN = set([states.FAILURE, states.CANCELLED,
states.TIMED_OUT])
NO_RAN_STATES = set([states.CANCELLED, states.PENDING, states.TIMED_OUT,
states.RUNNING])
def __init__(self, task, flow, timeout):
super(ThreadRunner, self).__init__(task)
# Use weak references to give the GC a break.
self._flow = weakref.proxy(flow)
self._notifier = flow.task_notifier
self._timeout = timeout
self._state = states.PENDING
self._run_lock = threading.RLock()
# Use the flows state lock so that state notifications are not sent
# simultaneously for a given flow.
self._state_lock = flow._state_lock
self._cancel_lock = threading.RLock()
self._latch = utils.CountDownLatch()
# Any related family.
self._predecessors = []
self._successors = []
self._siblings = []
# Ensure we capture any exceptions that may have been triggered.
self.exc = None
self.exc_info = (None, None, None)
# This callback will be called before the underlying task is actually
# returned and it should either return a tuple of (has_result, result)
self._result_cb = None
@property
def state(self):
return self._state
def has_ran(self):
if self.state in self.NO_RAN_STATES:
return False
return True
def _change_state(self, context, new_state):
old_state = None
changed = False
with self._state_lock:
if self.state != new_state:
old_state = self.state
self._state = new_state
changed = True
# Don't notify while holding the lock so that the reciever of said
# notifications can actually perform operations on the given runner
# without getting into deadlock.
if changed and self._notifier:
self._notifier.notify(self.state, details={
'context': context,
'flow': self._flow,
'old_state': old_state,
'runner': self,
})
def cancel(self, blocking=True):
def check():
if self.state not in self.CANCELABLE_STATES:
raise exc.InvalidStateException("Runner not in a cancelable"
" state: %s" % (self.state))
# Check before as a quick way out of attempting to acquire the more
# heavy-weight lock. Then acquire the lock (which should not be
# possible if we are currently running) and set the state (if still
# applicable).
check()
acquired = False
cancelled = False
try:
acquired = self._cancel_lock.acquire(blocking=blocking)
if acquired:
check()
cancelled = True
self._change_state(None, states.CANCELLED)
finally:
if acquired:
self._cancel_lock.release()
return cancelled
def reset(self):
def check():
if self.state not in self.RESETTABLE_STATES:
raise exc.InvalidStateException("Runner not in a resettable"
" state: %s" % (self.state))
def do_reset():
self._latch.count = len(self._predecessors)
self.exc = None
self.exc_info = (None, None, None)
self.result = None
self._change_state(None, states.PENDING)
# We need to acquire both locks here so that we can not be running
# or being cancelled at the same time we are resetting.
check()
with self._run_lock:
check()
with self._cancel_lock:
check()
do_reset()
@property
def runs_before(self):
# NOTE(harlowja): this list may change, depending on which other
# runners have completed (or are currently actively running), so
# this is why this is a property instead of a semi-static defined list
# like in the AOT class. The list should only get bigger and not
# smaller so it should be fine to filter on runners that have completed
# successfully.
finished_ok = []
for r in self._siblings:
if r.has_ran() and r.state in self.SUCCESS_STATES:
finished_ok.append(r)
return finished_ok
def __call__(self, context, *args, **kwargs):
def is_runnable():
if self.state not in self.RUNNABLE_STATES:
return False
return True
def run(*args, **kwargs):
try:
self._change_state(context, states.RUNNING)
has_result = False
if self._result_cb:
has_result, self.result = self._result_cb(self)
if not has_result:
super(ThreadRunner, self).__call__(*args, **kwargs)
self._change_state(context, states.SUCCESS)
except Exception as e:
self._change_state(context, states.FAILURE)
self.exc = e
self.exc_info = sys.exc_info()
def signal():
if not self._successors:
return
if self.state in self.CANCEL_SUCCESSORS_WHEN:
for r in self._successors:
try:
r.cancel(blocking=False)
except exc.InvalidStateException:
pass
for r in self._successors:
try:
r._latch.countDown()
except Exception:
LOG.exception("Failed decrementing %s latch", r)
# We check before to avoid attempting to acquire the lock when we are
# known to be in a non-runnable state.
if not is_runnable():
return
args = [context] + list(args)
with self._run_lock:
# We check after we now own the run lock since a previous thread
# could have exited and released that lock and set the state to
# not runnable.
if not is_runnable():
return
may_proceed = self._latch.await(self._timeout)
# We now acquire the cancel lock so that we can be assured that
# we have not been cancelled by another entity.
with self._cancel_lock:
try:
# If we have been cancelled after awaiting and timing out
# ensure that we alter the state to show timed out (but
# not if we have been cancelled, since our state should
# be cancelled instead). This is done after acquiring the
# cancel lock so that we will not try to overwrite another
# entity trying to set the runner to the cancel state.
if not may_proceed and self.state != states.CANCELLED:
self._change_state(context, states.TIMED_OUT)
# We at this point should only have been able to time out
# or be cancelled, no other state transitions should have
# been possible.
if self.state not in (states.CANCELLED, states.TIMED_OUT):
run(*args, **kwargs)
finally:
signal()

View File

@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Job states.
CLAIMED = 'CLAIMED'
FAILURE = 'FAILURE'
PENDING = 'PENDING'
RUNNING = 'RUNNING'
SUCCESS = 'SUCCESS'
UNCLAIMED = 'UNCLAIMED'
# Flow states.
FAILURE = FAILURE
INTERRUPTED = 'INTERRUPTED'
PENDING = 'PENDING'
RESUMING = 'RESUMING'
REVERTING = 'REVERTING'
RUNNING = RUNNING
STARTED = 'STARTED'
SUCCESS = SUCCESS
CANCELLED = 'CANCELLED'
INCOMPLETE = 'INCOMPLETE'
# Task states.
FAILURE = FAILURE
STARTED = STARTED
SUCCESS = SUCCESS
TIMED_OUT = 'TIMED_OUT'
CANCELLED = CANCELLED

View File

@ -0,0 +1,77 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
from billingstack.openstack.common import uuidutils
from billingstack.taskflow import utils
class Task(object):
"""An abstraction that defines a potential piece of work that can be
applied and can be reverted to undo the work as a single unit.
"""
__metaclass__ = abc.ABCMeta
def __init__(self, name, task_id=None):
if task_id:
self._uuid = task_id
else:
self._uuid = uuidutils.generate_uuid()
self._name = name
# An *immutable* input 'resource' name set this task depends
# on existing before this task can be applied.
self.requires = set()
# An *immutable* input 'resource' name set this task would like to
# depends on existing before this task can be applied (but does not
# strongly depend on existing).
self.optional = set()
# An *immutable* output 'resource' name set this task
# produces that other tasks may depend on this task providing.
self.provides = set()
# This identifies the version of the task to be ran which
# can be useful in resuming older versions of tasks. Standard
# major, minor version semantics apply.
self.version = (1, 0)
@property
def uuid(self):
return self._uuid
@property
def name(self):
return self._name
def __str__(self):
return "%s==%s" % (self.name, utils.get_task_version(self))
@abc.abstractmethod
def __call__(self, context, *args, **kwargs):
"""Activate a given task which will perform some operation and return.
This method can be used to apply some given context and given set
of args and kwargs to accomplish some goal. Note that the result
that is returned needs to be serializable so that it can be passed
back into this task if reverting is triggered.
"""
def revert(self, context, result, cause):
"""Revert this task using the given context, result that the apply
provided as well as any information which may have caused
said reversion.
"""

View File

@ -0,0 +1,532 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import contextlib
import copy
import logging
import re
import sys
import threading
import threading2
import time
from billingstack.openstack.common import uuidutils
TASK_FACTORY_ATTRIBUTE = '_TaskFlow_task_factory'
LOG = logging.getLogger(__name__)
def await(check_functor, timeout=None):
if timeout is not None:
end_time = time.time() + max(0, timeout)
else:
end_time = None
# Use the same/similar scheme that the python condition class uses.
delay = 0.0005
while not check_functor():
time.sleep(delay)
if end_time is not None:
remaining = end_time - time.time()
if remaining <= 0:
return False
delay = min(delay * 2, remaining, 0.05)
else:
delay = min(delay * 2, 0.05)
return True
def get_task_version(task):
"""Gets a tasks *string* version, whether it is a task object/function."""
task_version = getattr(task, 'version')
if isinstance(task_version, (list, tuple)):
task_version = '.'.join(str(item) for item in task_version)
if task_version is not None and not isinstance(task_version, basestring):
task_version = str(task_version)
return task_version
def is_version_compatible(version_1, version_2):
"""Checks for major version compatibility of two *string" versions."""
if version_1 == version_2:
# Equivalent exactly, so skip the rest.
return True
def _convert_to_pieces(version):
try:
pieces = []
for p in version.split("."):
p = p.strip()
if not len(p):
pieces.append(0)
continue
# Clean off things like 1alpha, or 2b and just select the
# digit that starts that entry instead.
p_match = re.match(r"(\d+)([A-Za-z]*)(.*)", p)
if p_match:
p = p_match.group(1)
pieces.append(int(p))
except (AttributeError, TypeError, ValueError):
pieces = []
return pieces
version_1_pieces = _convert_to_pieces(version_1)
version_2_pieces = _convert_to_pieces(version_2)
if len(version_1_pieces) == 0 or len(version_2_pieces) == 0:
return False
# Ensure major version compatibility to start.
major1 = version_1_pieces[0]
major2 = version_2_pieces[0]
if major1 != major2:
return False
return True
class MultiLock(object):
"""A class which can attempt to obtain many locks at once and release
said locks when exiting.
Useful as a context manager around many locks (instead of having to nest
said individual context managers).
"""
def __init__(self, locks):
assert len(locks) > 0, "Zero locks requested"
self._locks = locks
self._locked = [False] * len(locks)
def __enter__(self):
def is_locked(lock):
# NOTE(harlowja): the threading2 lock doesn't seem to have this
# attribute, so thats why we are checking it existing first.
if hasattr(lock, 'locked'):
return lock.locked()
return False
for i in xrange(0, len(self._locked)):
if self._locked[i] or is_locked(self._locks[i]):
raise threading.ThreadError("Lock %s not previously released"
% (i + 1))
self._locked[i] = False
for (i, lock) in enumerate(self._locks):
self._locked[i] = lock.acquire()
def __exit__(self, type, value, traceback):
for (i, locked) in enumerate(self._locked):
try:
if locked:
self._locks[i].release()
self._locked[i] = False
except threading.ThreadError:
LOG.exception("Unable to release lock %s", i + 1)
class CountDownLatch(object):
"""Similar in concept to the java count down latch."""
def __init__(self, count=0):
self.count = count
self.lock = threading.Condition()
def countDown(self):
with self.lock:
self.count -= 1
if self.count <= 0:
self.lock.notifyAll()
def await(self, timeout=None):
end_time = None
if timeout is not None:
timeout = max(0, timeout)
end_time = time.time() + timeout
time_up = False
with self.lock:
while True:
# Stop waiting on these 2 conditions.
if time_up or self.count <= 0:
break
# Was this a spurious wakeup or did we really end??
self.lock.wait(timeout=timeout)
if end_time is not None:
if time.time() >= end_time:
time_up = True
else:
# Reduce the timeout so that we don't wait extra time
# over what we initially were requested to.
timeout = end_time - time.time()
return self.count <= 0
class LastFedIter(object):
"""An iterator which yields back the first item and then yields back
results from the provided iterator.
"""
def __init__(self, first, rest_itr):
self.first = first
self.rest_itr = rest_itr
def __iter__(self):
yield self.first
for i in self.rest_itr:
yield i
class ThreadGroupExecutor(object):
"""A simple thread executor that spins up new threads (or greenthreads) for
each task to be completed (no pool limit is enforced).
TODO(harlowja): Likely if we use the more advanced executors that come with
the concurrent.futures library we can just get rid of this.
"""
def __init__(self, daemonize=True):
self._threads = []
self._group = threading2.ThreadGroup()
self._daemonize = daemonize
def submit(self, fn, *args, **kwargs):
t = threading2.Thread(target=fn, group=self._group,
args=args, kwargs=kwargs)
t.daemon = self._daemonize
self._threads.append(t)
t.start()
def await_termination(self, timeout=None):
if not self._threads:
return
return self._group.join(timeout)
class FlowFailure(object):
"""When a task failure occurs the following object will be given to revert
and can be used to interrogate what caused the failure.
"""
def __init__(self, runner, flow, exc, exc_info=None):
self.runner = runner
self.flow = flow
self.exc = exc
if not exc_info:
self.exc_info = sys.exc_info()
else:
self.exc_info = exc_info
class RollbackTask(object):
"""A helper task that on being called will call the underlying callable
tasks revert method (if said method exists).
"""
def __init__(self, context, task, result):
self.task = task
self.result = result
self.context = context
def __str__(self):
return str(self.task)
def __call__(self, cause):
if ((hasattr(self.task, "revert") and
isinstance(self.task.revert, collections.Callable))):
self.task.revert(self.context, self.result, cause)
class Runner(object):
"""A helper class that wraps a task and can find the needed inputs for
the task to run, as well as providing a uuid and other useful functionality
for users of the task.
TODO(harlowja): replace with the task details object or a subclass of
that???
"""
def __init__(self, task, uuid=None):
assert isinstance(task, collections.Callable)
task_factory = getattr(task, TASK_FACTORY_ATTRIBUTE, None)
if task_factory:
self.task = task_factory(task)
else:
self.task = task
self.providers = {}
self.result = None
if not uuid:
self._id = uuidutils.generate_uuid()
else:
self._id = str(uuid)
@property
def uuid(self):
return str(self._id)
@property
def requires(self):
return self.task.requires
@property
def provides(self):
return self.task.provides
@property
def optional(self):
return self.task.optional
@property
def runs_before(self):
return []
@property
def version(self):
return get_task_version(self.task)
@property
def name(self):
return self.task.name
def reset(self):
self.result = None
def __str__(self):
lines = ["Runner: %s" % (self.name)]
lines.append("%s" % (self.uuid))
lines.append("%s" % (self.version))
return "; ".join(lines)
def __call__(self, *args, **kwargs):
# Find all of our inputs first.
kwargs = dict(kwargs)
for (k, who_made) in self.providers.iteritems():
if k in kwargs:
continue
try:
kwargs[k] = who_made.result[k]
except (TypeError, KeyError):
pass
optional_keys = self.optional
optional_keys = optional_keys - set(kwargs.keys())
for k in optional_keys:
for who_ran in self.runs_before:
matched = False
if k in who_ran.provides:
try:
kwargs[k] = who_ran.result[k]
matched = True
except (TypeError, KeyError):
pass
if matched:
break
# Ensure all required keys are either existent or set to none.
for k in self.requires:
if k not in kwargs:
kwargs[k] = None
# And now finally run.
self.result = self.task(*args, **kwargs)
return self.result
class AOTRunner(Runner):
"""A runner that knows who runs before this runner ahead of time from a
known list of previous runners.
"""
def __init__(self, task):
super(AOTRunner, self).__init__(task)
self._runs_before = []
@property
def runs_before(self):
return self._runs_before
@runs_before.setter
def runs_before(self, runs_before):
self._runs_before = list(runs_before)
class TransitionNotifier(object):
"""A utility helper class that can be used to subscribe to
notifications of events occuring as well as allow a entity to post said
notifications to subscribers.
"""
RESERVED_KEYS = ('details',)
ANY = '*'
def __init__(self):
self._listeners = collections.defaultdict(list)
def reset(self):
self._listeners = collections.defaultdict(list)
def notify(self, state, details):
listeners = list(self._listeners.get(self.ANY, []))
for i in self._listeners[state]:
if i not in listeners:
listeners.append(i)
if not listeners:
return
for (callback, args, kwargs) in listeners:
if args is None:
args = []
if kwargs is None:
kwargs = {}
kwargs['details'] = details
try:
callback(state, *args, **kwargs)
except Exception:
LOG.exception(("Failure calling callback %s to notify about"
" state transition %s"), callback, state)
def register(self, state, callback, args=None, kwargs=None):
assert isinstance(callback, collections.Callable)
for i, (cb, args, kwargs) in enumerate(self._listeners.get(state, [])):
if cb is callback:
raise ValueError("Callback %s already registered" % (callback))
if kwargs:
for k in self.RESERVED_KEYS:
if k in kwargs:
raise KeyError(("Reserved key '%s' not allowed in "
"kwargs") % k)
kwargs = copy.copy(kwargs)
if args:
args = copy.copy(args)
self._listeners[state].append((callback, args, kwargs))
def deregister(self, state, callback):
if state not in self._listeners:
return
for i, (cb, args, kwargs) in enumerate(self._listeners[state]):
if cb is callback:
self._listeners[state].pop(i)
break
class RollbackAccumulator(object):
"""A utility class that can help in organizing 'undo' like code
so that said code be rolled back on failure (automatically or manually)
by activating rollback callables that were inserted during said codes
progression.
"""
def __init__(self):
self._rollbacks = []
def add(self, *callables):
self._rollbacks.extend(callables)
def reset(self):
self._rollbacks = []
def __len__(self):
return len(self._rollbacks)
def __enter__(self):
return self
def rollback(self, cause):
LOG.warn("Activating %s rollbacks due to %s.", len(self), cause)
for (i, f) in enumerate(reversed(self._rollbacks)):
LOG.debug("Calling rollback %s: %s", i + 1, f)
try:
f(cause)
except Exception:
LOG.exception(("Failed rolling back %s: %s due "
"to inner exception."), i + 1, f)
def __exit__(self, type, value, tb):
if any((value, type, tb)):
self.rollback(value)
class ReaderWriterLock(object):
"""A simple reader-writer lock.
Several readers can hold the lock simultaneously, and only one writer.
Write locks have priority over reads to prevent write starvation.
Public domain @ http://majid.info/blog/a-reader-writer-lock-for-python/
"""
def __init__(self):
self.rwlock = 0
self.writers_waiting = 0
self.monitor = threading.Lock()
self.readers_ok = threading.Condition(self.monitor)
self.writers_ok = threading.Condition(self.monitor)
@contextlib.contextmanager
def acquire(self, read=True):
"""Acquire a read or write lock in a context manager."""
try:
if read:
self.acquire_read()
else:
self.acquire_write()
yield self
finally:
self.release()
def acquire_read(self):
"""Acquire a read lock.
Several threads can hold this typeof lock.
It is exclusive with write locks.
"""
self.monitor.acquire()
while self.rwlock < 0 or self.writers_waiting:
self.readers_ok.wait()
self.rwlock += 1
self.monitor.release()
def acquire_write(self):
"""Acquire a write lock.
Only one thread can hold this lock, and only when no read locks
are also held.
"""
self.monitor.acquire()
while self.rwlock != 0:
self.writers_waiting += 1
self.writers_ok.wait()
self.writers_waiting -= 1
self.rwlock = -1
self.monitor.release()
def release(self):
"""Release a lock, whether read or write."""
self.monitor.acquire()
if self.rwlock < 0:
self.rwlock = 0
else:
self.rwlock -= 1
wake_writers = self.writers_waiting and self.rwlock == 0
wake_readers = self.writers_waiting == 0
self.monitor.release()
if wake_writers:
self.writers_ok.acquire()
self.writers_ok.notify()
self.writers_ok.release()
elif wake_readers:
self.readers_ok.acquire()
self.readers_ok.notifyAll()
self.readers_ok.release()

85
billingstack/tasks.py Normal file
View File

@ -0,0 +1,85 @@
# -*- encoding: utf-8 -*-
#
# Author: Endre Karlson <endre.karlson@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from billingstack.openstack.common import log
from billingstack.openstack.common.gettextutils import _
from billingstack.taskflow import task
LOG = log.getLogger(__name__)
def _make_task_name(cls, prefix="default", addons=None):
components = [cls.__module__, cls.__name__]
if addons:
for a in addons:
components.append(str(a))
return "%s:%s" % (prefix, ".".join(components))
def _attach_debug_listeners(flow):
"""Sets up a nice set of debug listeners for the flow.
These listeners will log when tasks/flows are transitioning from state to
state so that said states can be seen in the debug log output which is very
useful for figuring out where problems are occuring.
"""
def flow_log_change(state, details):
LOG.debug(_("%(flow)s has moved into state %(state)s from state"
" %(old_state)s") % {'state': state,
'old_state': details.get('old_state'),
'flow': details['flow']})
def task_log_change(state, details):
LOG.debug(_("%(flow)s has moved %(runner)s into state %(state)s with"
" result: %(result)s") % {'state': state,
'flow': details['flow'],
'runner': details['runner'],
'result': details.get('result')})
# Register * for all state changes (and not selective state changes to be
# called upon) since all the changes is more useful.
flow.notifier.register('*', flow_log_change)
flow.task_notifier.register('*', task_log_change)
return flow
class RootTask(task.Task):
def __init__(self, name=None, **kw):
name = name or _make_task_name(self.__class__, **kw)
super(RootTask, self).__init__(name)
class ValuesInjectTask(RootTask):
"""
This injects a dict into the flow.
This injection is done so that the keys (and values) provided can be
dependended on by tasks further down the line. Since taskflow is dependency
based this can be considered the bootstrapping task that provides an
initial set of values for other tasks to get started with. If this did not
exist then tasks would fail locating there dependent tasks and the values
said dependent tasks produce.
Reversion strategy: N/A
"""
def __init__(self, values, **kw):
super(ValuesInjectTask, self).__init__(**kw)
self.provides.update(values.keys())
self._values = values
def __call__(self, context):
return dict(self._values)

7
taskflow.conf Normal file
View File

@ -0,0 +1,7 @@
[DEFAULT]
# The list of primitives to copy from taskflow
primitives=flow.threaded_flow,flow.linear_flow,task
# The base module to hold the copy of taskflow
base=billingstack

View File

@ -16,3 +16,6 @@ Routes>=1.12.3
stevedore>=0.10
WebOb>=1.2.3,<1.3
https://github.com/stackforge/wsme/archive/master.zip#egg=WSME
# Taskflow
threading2
networkx