Bring code up to speed.

* Use taskflow as a library
* Move requires to root
* Fix git path
* Update oslo

Change-Id: Iae8329a639e26881fbc3286479a429ae75149493
This commit is contained in:
Endre Karlson 2013-10-16 20:02:31 +02:00
parent e9d2aac34b
commit cd8f0b1e8e
56 changed files with 778 additions and 3252 deletions

View File

@ -5,7 +5,7 @@
# Author: Endre Karlson <endre.karlson@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# not use this file except in co68mpliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
@ -15,9 +15,10 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from taskflow.patterns import linear_flow
from billingstack import tasks
from billingstack.openstack.common import log
from billingstack.taskflow.patterns import linear_flow
ACTION = 'merchant:create'
@ -27,23 +28,16 @@ LOG = log.getLogger(__name__)
class EntryCreateTask(tasks.RootTask):
def __init__(self, storage, **kw):
super(EntryCreateTask, self).__init__(**kw)
self.requires.update(['merchant'])
self.provides.update(['merchant'])
self.storage = storage
def __call__(self, context, merchant):
values = self.storage.create_merchant(context, merchant)
return {'merchant': values}
def execute(self, ctxt, values):
return self.storage.create_merchant(ctxt, values)
def create_flow(storage, values):
def create_flow(storage):
flow = linear_flow.Flow(ACTION)
flow.add(tasks.ValuesInjectTask(
{'merchant': values},
prefix=ACTION + ':initial'))
entry_task = EntryCreateTask(storage, provides='merchant', prefix=ACTION)
flow.add(entry_task)
entry_task = EntryCreateTask(storage, prefix=ACTION)
entry_task_id = flow.add(entry_task)
return entry_task_id, tasks._attach_debug_listeners(flow)
return flow

View File

@ -16,6 +16,9 @@
import sys
from oslo.config import cfg
from taskflow.engines import run as run_flow
from billingstack.openstack.common import log as logging
from billingstack.openstack.common.rpc import service as rpc_service
from billingstack.openstack.common import service as os_service
@ -103,9 +106,10 @@ class Service(rpc_service.Service):
# Merchant
def create_merchant(self, ctxt, values):
id_, flow = merchant.create_flow(self.storage_conn, values)
flow.run(ctxt)
return flow.results[id_]['merchant']
flow = merchant.create_flow(self.storage_conn)
result = run_flow(flow, engine_conf="parallel",
store={'values': values, 'ctxt': ctxt})
return result['merchant']
def list_merchants(self, ctxt, **kw):
return self.storage_conn.list_merchants(ctxt, **kw)

View File

@ -15,12 +15,13 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from taskflow.patterns import linear_flow
from billingstack import exceptions
from billingstack import tasks
from billingstack.collector import states
from billingstack.openstack.common import log
from billingstack.payment_gateway import get_provider
from billingstack.taskflow.patterns import linear_flow, threaded_flow
ACTION = 'gateway_configuration:create'
@ -31,32 +32,11 @@ LOG = log.getLogger(__name__)
class EntryCreateTask(tasks.RootTask):
def __init__(self, storage, **kw):
super(EntryCreateTask, self).__init__(**kw)
self.requires.update(['gateway_config'])
self.provides.update(['gateway_config'])
self.storage = storage
def __call__(self, context, gateway_config):
gateway_config['state'] = states.VERIFYING
values = self.storage.create_pg_config(context, gateway_config)
return {'gateway_config': values}
class ThreadStartTask(tasks.RootTask):
"""
This is the end of the current flow, we'll fire off a new threaded flow
that does stuff towards the actual Gateway which may include blocking code.
"""
def __init__(self, storage, **kw):
super(ThreadStartTask, self).__init__(**kw)
self.requires.update(['gateway_config'])
self.storage = storage
def __call__(self, ctxt, gateway_config):
flow = threaded_flow.Flow(ACTION + ':backend')
flow.add(tasks.ValuesInjectTask({'gateway_config': gateway_config}))
flow.add(PrerequirementsTask(self.storage))
flow.add(BackendVerifyTask(self.storage))
flow.run(ctxt)
def execute(self, ctxt, values):
values['state'] = states.VERIFYING
return self.storage.create_pg_config(ctxt, values)
class PrerequirementsTask(tasks.RootTask):
@ -65,20 +45,11 @@ class PrerequirementsTask(tasks.RootTask):
"""
def __init__(self, storage, **kw):
super(PrerequirementsTask, self).__init__(**kw)
self.requires.update(['gateway_config'])
self.provides.update([
'gateway_config',
'gateway_provider'
])
self.storage = storage
def __call__(self, ctxt, gateway_config):
gateway_provider = self.storage.get_pg_provider(
gateway_config['providedr_id'])
return {
'gateway_config': gateway_config,
'gateway_provider': gateway_provider
}
def execute(self, ctxt, gateway_config):
return self.storage.get_pg_provider(
ctxt, gateway_config['provider_id'])
class BackendVerifyTask(tasks.RootTask):
@ -92,11 +63,10 @@ class BackendVerifyTask(tasks.RootTask):
"""
def __init__(self, storage, **kw):
super(BackendVerifyTask, self).__init__(**kw)
self.requires.update(['gateway_config', 'gateway_provider'])
self.storage = storage
def __call__(self, ctxt, gateway_config, gateway_provider):
gateway_provider_cls = get_provider[gateway_provider['name']]
def execute(self, ctxt, gateway_config, gateway_provider):
gateway_provider_cls = get_provider(gateway_provider['name'])
gateway_provider_obj = gateway_provider_cls(gateway_config)
try:
@ -109,14 +79,19 @@ class BackendVerifyTask(tasks.RootTask):
ctxt, gateway_config['id'], {'state': states.ACTIVE})
def create_flow(storage, values):
flow = linear_flow.Flow(ACTION)
def create_flow(storage):
flow = linear_flow.Flow(ACTION + ':initial')
flow.add(tasks.ValuesInjectTask(
{'gateway_config': values},
prefix=ACTION + ':initial'))
entry_task = EntryCreateTask(
storage, provides='gateway_config', prefix=ACTION)
flow.add(entry_task)
entry_task = EntryCreateTask(storage, prefix=ACTION)
entry_task_id = flow.add(entry_task)
backend_flow = linear_flow.Flow(ACTION + ':backend')
prereq_task = PrerequirementsTask(
storage, provides='gateway_provider', prefix=ACTION)
backend_flow.add(prereq_task)
backend_flow.add(BackendVerifyTask(storage, prefix=ACTION))
return entry_task_id, tasks._attach_debug_listeners(flow)
flow.add(backend_flow)
return flow

View File

@ -15,12 +15,13 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from taskflow.patterns import linear_flow
from billingstack import exceptions
from billingstack import tasks
from billingstack.collector import states
from billingstack.openstack.common import log
from billingstack.payment_gateway import get_provider
from billingstack.taskflow.patterns import linear_flow, threaded_flow
ACTION = 'payment_method:create'
@ -34,34 +35,11 @@ class EntryCreateTask(tasks.RootTask):
"""
def __init__(self, storage, **kw):
super(EntryCreateTask, self).__init__(**kw)
self.requires.update(['payment_method'])
self.provides.update(['payment_method'])
self.storage = storage
def __call__(self, ctxt, payment_method):
payment_method['state'] = states.PENDING
values = self.storage.create_payment_method(ctxt, payment_method)
return {'payment_method': values}
class ThreadStartTask(tasks.RootTask):
"""
This is the end of the current flow, we'll fire off a new threaded flow
that does stuff towards the actual Gateway which may include blocking code.
This fires off a new flow that is threaded / greenthreads?
"""
def __init__(self, storage, **kw):
super(ThreadStartTask, self).__init__(**kw)
self.requires.update(['payment_method'])
self.storage = storage
def __call__(self, ctxt, payment_method):
flow = threaded_flow.Flow(ACTION + ':backend')
flow.add(tasks.ValuesInjectTask({'payment_method': payment_method}))
flow.add(PrerequirementsTask(self.storage))
flow.add(BackendCreateTask(self.storage))
flow.run(ctxt)
def execute(self, ctxt, values):
values['state'] = states.PENDING
return self.storage.create_payment_method(ctxt, values)
class PrerequirementsTask(tasks.RootTask):
@ -70,33 +48,23 @@ class PrerequirementsTask(tasks.RootTask):
"""
def __init__(self, storage, **kw):
super(PrerequirementsTask, self).__init__(**kw)
self.requires.update(['payment_method'])
self.provides.update([
'payment_method',
'gateway_config',
'gateway_provider'])
self.storage = storage
def __call__(self, ctxt, **kw):
kw['gateway_config'] = self.storage.get_pg_config(
ctxt, kw['payment_method']['provider_config_id'])
kw['gateway_provider'] = self.storage.get_pg_provider(
ctxt, kw['gateway_config']['provider_id'])
return kw
def execute(self, ctxt, values):
data = {}
data['gateway_config'] = self.storage.get_pg_config(
ctxt, values['provider_config_id'])
data['gateway_provider'] = self.storage.get_pg_provider(
ctxt, data['gateway_config']['provider_id'])
return data
class BackendCreateTask(tasks.RootTask):
def __init__(self, storage, **kw):
super(BackendCreateTask, self).__init__(**kw)
self.requires.update([
'payment_method',
'gateway_config',
'gateway_provider'])
self.storage = storage
def __call__(self, ctxt, payment_method, gateway_config, gateway_provider):
def execute(self, ctxt, payment_method, gateway_config, gateway_provider):
gateway_provider_cls = get_provider(gateway_provider['name'])
gateway_provider_obj = gateway_provider_cls(gateway_config)
@ -110,19 +78,26 @@ class BackendCreateTask(tasks.RootTask):
raise
def create_flow(storage, payment_method):
def create_flow(storage):
"""
The flow for the service to start
"""
flow = linear_flow.Flow(ACTION + ':initial')
flow.add(tasks.ValuesInjectTask(
{'payment_method': payment_method},
prefix=ACTION))
entry_task = EntryCreateTask(storage, provides='payment_method',
prefix=ACTION)
flow.add(entry_task)
entry_task = EntryCreateTask(storage, prefix=ACTION)
entry_task_id = flow.add(entry_task)
backend_flow = linear_flow.Flow(ACTION + ':backend')
prereq_task = PrerequirementsTask(
storage,
provides=set([
'gateway_config',
'gateway_provider']),
prefix=ACTION)
backend_flow.add(prereq_task)
backend_flow.add(BackendCreateTask(storage, prefix=ACTION))
flow.add(ThreadStartTask(storage, prefix=ACTION))
flow.add(backend_flow)
return entry_task_id, tasks._attach_debug_listeners(flow)
return flow

View File

@ -20,6 +20,8 @@ A service that does calls towards the PGP web endpoint or so
import sys
from oslo.config import cfg
from taskflow.engines import run as run_flow
from billingstack.openstack.common import log as logging
from billingstack.openstack.common.rpc import service as rpc_service
from billingstack.openstack.common import service as os_service
@ -64,10 +66,9 @@ class Service(rpc_service.Service):
# PGC
def create_pg_config(self, ctxt, values):
id_, flow = gateway_configuration.create_flow(
self.storage_conn, values)
flow.run(ctxt)
return flow.results[id_]['gateway_config']
flow = gateway_configuration.create_flow(self.storage_conn)
results = run_flow(flow, store={'values': values, 'ctxt': ctxt})
return results['gateway_config']
def list_pg_configs(self, ctxt, **kw):
return self.storage_conn.list_pg_configs(ctxt, **kw)
@ -83,10 +84,9 @@ class Service(rpc_service.Service):
# PM
def create_payment_method(self, ctxt, values):
id_, flow = payment_method.create_flow(
self.storage_conn, values)
flow.run(ctxt)
return flow.results[id_]['payment_method']
flow = payment_method.create_flow(self.storage_conn)
results = run_flow(flow, store={'values': values, 'ctxt': ctxt})
return results['payment_method']
def list_payment_methods(self, ctxt, **kw):
return self.storage_conn.list_payment_methods(ctxt, **kw)

View File

@ -40,13 +40,15 @@ class RequestContext(object):
"""
def __init__(self, auth_token=None, user=None, tenant=None, is_admin=False,
read_only=False, show_deleted=False, request_id=None):
read_only=False, show_deleted=False, request_id=None,
instance_uuid=None):
self.auth_token = auth_token
self.user = user
self.tenant = tenant
self.is_admin = is_admin
self.read_only = read_only
self.show_deleted = show_deleted
self.instance_uuid = instance_uuid
if not request_id:
request_id = generate_request_id()
self.request_id = request_id
@ -58,7 +60,8 @@ class RequestContext(object):
'read_only': self.read_only,
'show_deleted': self.show_deleted,
'auth_token': self.auth_token,
'request_id': self.request_id}
'request_id': self.request_id,
'instance_uuid': self.instance_uuid}
def get_admin_context(show_deleted=False):

View File

@ -24,6 +24,8 @@ import sys
import time
import traceback
import six
from billingstack.openstack.common.gettextutils import _ # noqa
@ -65,7 +67,7 @@ class save_and_reraise_exception(object):
self.tb))
return False
if self.reraise:
raise self.type_, self.value, self.tb
six.reraise(self.type_, self.value, self.tb)
def forever_retry_uncaught_exceptions(infunc):
@ -77,7 +79,7 @@ def forever_retry_uncaught_exceptions(infunc):
try:
return infunc(*args, **kwargs)
except Exception as exc:
this_exc_message = unicode(exc)
this_exc_message = six.u(str(exc))
if this_exc_message == last_exc_message:
exc_count += 1
else:

View File

@ -19,6 +19,7 @@
import contextlib
import errno
import os
import tempfile
from billingstack.openstack.common import excutils
from billingstack.openstack.common.gettextutils import _ # noqa
@ -69,33 +70,34 @@ def read_cached_file(filename, force_reload=False):
return (reloaded, cache_info['data'])
def delete_if_exists(path):
def delete_if_exists(path, remove=os.unlink):
"""Delete a file, but ignore file not found error.
:param path: File to delete
:param remove: Optional function to remove passed path
"""
try:
os.unlink(path)
remove(path)
except OSError as e:
if e.errno == errno.ENOENT:
return
else:
if e.errno != errno.ENOENT:
raise
@contextlib.contextmanager
def remove_path_on_error(path):
def remove_path_on_error(path, remove=delete_if_exists):
"""Protect code that wants to operate on PATH atomically.
Any exception will cause PATH to be removed.
:param path: File to work with
:param remove: Optional function to remove passed path
"""
try:
yield
except Exception:
with excutils.save_and_reraise_exception():
delete_if_exists(path)
remove(path)
def file_open(*args, **kwargs):
@ -108,3 +110,30 @@ def file_open(*args, **kwargs):
state at all (for unit tests)
"""
return file(*args, **kwargs)
def write_to_tempfile(content, path=None, suffix='', prefix='tmp'):
"""Create temporary file or use existing file.
This util is needed for creating temporary file with
specified content, suffix and prefix. If path is not None,
it will be used for writing content. If the path doesn't
exist it'll be created.
:param content: content for temporary file.
:param path: same as parameter 'dir' for mkstemp
:param suffix: same as parameter 'suffix' for mkstemp
:param prefix: same as parameter 'prefix' for mkstemp
For example: it can be used in database tests for creating
configuration files.
"""
if path:
ensure_tree(path)
(fd, path) = tempfile.mkstemp(suffix=suffix, dir=path, prefix=prefix)
try:
os.write(fd, content)
finally:
os.close(fd)
return path

View File

@ -26,10 +26,13 @@ Usual usage in an openstack.common module:
import copy
import gettext
import logging.handlers
import logging
import os
import re
import UserString
try:
import UserString as _userString
except ImportError:
import collections as _userString
from babel import localedata
import six
@ -37,7 +40,7 @@ import six
_localedir = os.environ.get('billingstack'.upper() + '_LOCALEDIR')
_t = gettext.translation('billingstack', localedir=_localedir, fallback=True)
_AVAILABLE_LANGUAGES = []
_AVAILABLE_LANGUAGES = {}
USE_LAZY = False
@ -57,6 +60,8 @@ def _(msg):
if USE_LAZY:
return Message(msg, 'billingstack')
else:
if six.PY3:
return _t.gettext(msg)
return _t.ugettext(msg)
@ -102,24 +107,28 @@ def install(domain, lazy=False):
"""
return Message(msg, domain)
import __builtin__
__builtin__.__dict__['_'] = _lazy_gettext
from six import moves
moves.builtins.__dict__['_'] = _lazy_gettext
else:
localedir = '%s_LOCALEDIR' % domain.upper()
gettext.install(domain,
localedir=os.environ.get(localedir),
unicode=True)
if six.PY3:
gettext.install(domain,
localedir=os.environ.get(localedir))
else:
gettext.install(domain,
localedir=os.environ.get(localedir),
unicode=True)
class Message(UserString.UserString, object):
class Message(_userString.UserString, object):
"""Class used to encapsulate translatable messages."""
def __init__(self, msg, domain):
# _msg is the gettext msgid and should never change
self._msg = msg
self._left_extra_msg = ''
self._right_extra_msg = ''
self._locale = None
self.params = None
self.locale = None
self.domain = domain
@property
@ -139,8 +148,13 @@ class Message(UserString.UserString, object):
localedir=localedir,
fallback=True)
if six.PY3:
ugettext = lang.gettext
else:
ugettext = lang.ugettext
full_msg = (self._left_extra_msg +
lang.ugettext(self._msg) +
ugettext(self._msg) +
self._right_extra_msg)
if self.params is not None:
@ -148,6 +162,33 @@ class Message(UserString.UserString, object):
return six.text_type(full_msg)
@property
def locale(self):
return self._locale
@locale.setter
def locale(self, value):
self._locale = value
if not self.params:
return
# This Message object may have been constructed with one or more
# Message objects as substitution parameters, given as a single
# Message, or a tuple or Map containing some, so when setting the
# locale for this Message we need to set it for those Messages too.
if isinstance(self.params, Message):
self.params.locale = value
return
if isinstance(self.params, tuple):
for param in self.params:
if isinstance(param, Message):
param.locale = value
return
if isinstance(self.params, dict):
for param in self.params.values():
if isinstance(param, Message):
param.locale = value
def _save_dictionary_parameter(self, dict_param):
full_msg = self.data
# look for %(blah) fields in string;
@ -166,7 +207,7 @@ class Message(UserString.UserString, object):
params[key] = copy.deepcopy(dict_param[key])
except TypeError:
# cast uncopyable thing to unicode string
params[key] = unicode(dict_param[key])
params[key] = six.text_type(dict_param[key])
return params
@ -185,7 +226,7 @@ class Message(UserString.UserString, object):
try:
self.params = copy.deepcopy(other)
except TypeError:
self.params = unicode(other)
self.params = six.text_type(other)
return self
@ -194,11 +235,13 @@ class Message(UserString.UserString, object):
return self.data
def __str__(self):
if six.PY3:
return self.__unicode__()
return self.data.encode('utf-8')
def __getstate__(self):
to_copy = ['_msg', '_right_extra_msg', '_left_extra_msg',
'domain', 'params', 'locale']
'domain', 'params', '_locale']
new_dict = self.__dict__.fromkeys(to_copy)
for attr in to_copy:
new_dict[attr] = copy.deepcopy(self.__dict__[attr])
@ -252,7 +295,7 @@ class Message(UserString.UserString, object):
if name in ops:
return getattr(self.data, name)
else:
return UserString.UserString.__getattribute__(self, name)
return _userString.UserString.__getattribute__(self, name)
def get_available_languages(domain):
@ -260,8 +303,8 @@ def get_available_languages(domain):
:param domain: the domain to get languages for
"""
if _AVAILABLE_LANGUAGES:
return _AVAILABLE_LANGUAGES
if domain in _AVAILABLE_LANGUAGES:
return copy.copy(_AVAILABLE_LANGUAGES[domain])
localedir = '%s_LOCALEDIR' % domain.upper()
find = lambda x: gettext.find(domain,
@ -270,28 +313,37 @@ def get_available_languages(domain):
# NOTE(mrodden): en_US should always be available (and first in case
# order matters) since our in-line message strings are en_US
_AVAILABLE_LANGUAGES.append('en_US')
language_list = ['en_US']
# NOTE(luisg): Babel <1.0 used a function called list(), which was
# renamed to locale_identifiers() in >=1.0, the requirements master list
# requires >=0.9.6, uncapped, so defensively work with both. We can remove
# this check when the master list updates to >=1.0, and all projects udpate
# this check when the master list updates to >=1.0, and update all projects
list_identifiers = (getattr(localedata, 'list', None) or
getattr(localedata, 'locale_identifiers'))
locale_identifiers = list_identifiers()
for i in locale_identifiers:
if find(i) is not None:
_AVAILABLE_LANGUAGES.append(i)
return _AVAILABLE_LANGUAGES
language_list.append(i)
_AVAILABLE_LANGUAGES[domain] = language_list
return copy.copy(language_list)
def get_localized_message(message, user_locale):
"""Gets a localized version of the given message in the given locale."""
if (isinstance(message, Message)):
if user_locale:
message.locale = user_locale
return unicode(message)
else:
return message
"""Gets a localized version of the given message in the given locale.
If the message is not a Message object the message is returned as-is.
If the locale is None the message is translated to the default locale.
:returns: the translated message in unicode, or the original message if
it could not be translated
"""
translated = message
if isinstance(message, Message):
original_locale = message.locale
message.locale = user_locale
translated = six.text_type(message)
message.locale = original_locale
return translated
class LocaleHandler(logging.Handler):

View File

@ -38,14 +38,19 @@ import functools
import inspect
import itertools
import json
import types
import xmlrpclib
try:
import xmlrpclib
except ImportError:
# NOTE(jd): xmlrpclib is not shipped with Python 3
xmlrpclib = None
import netaddr
import six
from billingstack.openstack.common import gettextutils
from billingstack.openstack.common import importutils
from billingstack.openstack.common import timeutils
netaddr = importutils.try_import("netaddr")
_nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod,
inspect.isfunction, inspect.isgeneratorfunction,
@ -53,7 +58,8 @@ _nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod,
inspect.iscode, inspect.isbuiltin, inspect.isroutine,
inspect.isabstract]
_simple_types = (types.NoneType, int, basestring, bool, float, long)
_simple_types = (six.string_types + six.integer_types
+ (type(None), bool, float))
def to_primitive(value, convert_instances=False, convert_datetime=True,
@ -125,11 +131,13 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
# It's not clear why xmlrpclib created their own DateTime type, but
# for our purposes, make it a datetime type which is explicitly
# handled
if isinstance(value, xmlrpclib.DateTime):
if xmlrpclib and isinstance(value, xmlrpclib.DateTime):
value = datetime.datetime(*tuple(value.timetuple())[:6])
if convert_datetime and isinstance(value, datetime.datetime):
return timeutils.strtime(value)
elif isinstance(value, gettextutils.Message):
return value.data
elif hasattr(value, 'iteritems'):
return recursive(dict(value.iteritems()), level=level + 1)
elif hasattr(value, '__iter__'):
@ -138,7 +146,7 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
# Likely an instance of something. Watch for cycles.
# Ignore class member vars.
return recursive(value.__dict__, level=level + 1)
elif isinstance(value, netaddr.IPAddress):
elif netaddr and isinstance(value, netaddr.IPAddress):
return six.text_type(value)
else:
if any(test(value) for test in _nasty_type_tests):

View File

@ -20,10 +20,14 @@ import contextlib
import errno
import functools
import os
import shutil
import subprocess
import sys
import tempfile
import threading
import time
import weakref
from eventlet import semaphore
from oslo.config import cfg
from billingstack.openstack.common import fileutils
@ -39,6 +43,7 @@ util_opts = [
cfg.BoolOpt('disable_process_locking', default=False,
help='Whether to disable inter-process locks'),
cfg.StrOpt('lock_path',
default=os.environ.get("BILLINGSTACK_LOCK_PATH"),
help=('Directory to use for lock files.'))
]
@ -131,13 +136,15 @@ else:
InterProcessLock = _PosixLock
_semaphores = weakref.WeakValueDictionary()
_semaphores_lock = threading.Lock()
@contextlib.contextmanager
def lock(name, lock_file_prefix=None, external=False, lock_path=None):
"""Context based lock
This function yields a `semaphore.Semaphore` instance unless external is
This function yields a `threading.Semaphore` instance (if we don't use
eventlet.monkey_patch(), else `semaphore.Semaphore`) unless external is
True, in which case, it'll yield an InterProcessLock instance.
:param lock_file_prefix: The lock_file_prefix argument is used to provide
@ -152,15 +159,12 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None):
special location for external lock files to live. If nothing is set, then
CONF.lock_path is used as a default.
"""
# NOTE(soren): If we ever go natively threaded, this will be racy.
# See http://stackoverflow.com/questions/5390569/dyn
# amically-allocating-and-destroying-mutexes
sem = _semaphores.get(name, semaphore.Semaphore())
if name not in _semaphores:
# this check is not racy - we're already holding ref locally
# so GC won't remove the item and there was no IO switch
# (only valid in greenthreads)
_semaphores[name] = sem
with _semaphores_lock:
try:
sem = _semaphores[name]
except KeyError:
sem = threading.Semaphore()
_semaphores[name] = sem
with sem:
LOG.debug(_('Got semaphore "%(lock)s"'), {'lock': name})
@ -240,13 +244,14 @@ def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
def wrap(f):
@functools.wraps(f)
def inner(*args, **kwargs):
with lock(name, lock_file_prefix, external, lock_path):
LOG.debug(_('Got semaphore / lock "%(function)s"'),
try:
with lock(name, lock_file_prefix, external, lock_path):
LOG.debug(_('Got semaphore / lock "%(function)s"'),
{'function': f.__name__})
return f(*args, **kwargs)
finally:
LOG.debug(_('Semaphore / lock released "%(function)s"'),
{'function': f.__name__})
return f(*args, **kwargs)
LOG.debug(_('Semaphore / lock released "%(function)s"'),
{'function': f.__name__})
return inner
return wrap
@ -274,3 +279,27 @@ def synchronized_with_prefix(lock_file_prefix):
"""
return functools.partial(synchronized, lock_file_prefix=lock_file_prefix)
def main(argv):
"""Create a dir for locks and pass it to command from arguments
If you run this:
python -m openstack.common.lockutils python setup.py testr <etc>
a temporary directory will be created for all your locks and passed to all
your tests in an environment variable. The temporary dir will be deleted
afterwards and the return value will be preserved.
"""
lock_dir = tempfile.mkdtemp()
os.environ["BILLINGSTACK_LOCK_PATH"] = lock_dir
try:
ret_val = subprocess.call(argv[1:])
finally:
shutil.rmtree(lock_dir, ignore_errors=True)
return ret_val
if __name__ == '__main__':
sys.exit(main(sys.argv))

View File

@ -35,10 +35,12 @@ import logging
import logging.config
import logging.handlers
import os
import re
import sys
import traceback
from oslo.config import cfg
import six
from six import moves
from billingstack.openstack.common.gettextutils import _ # noqa
@ -49,6 +51,24 @@ from billingstack.openstack.common import local
_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
_SANITIZE_KEYS = ['adminPass', 'admin_pass', 'password']
# NOTE(ldbragst): Let's build a list of regex objects using the list of
# _SANITIZE_KEYS we already have. This way, we only have to add the new key
# to the list of _SANITIZE_KEYS and we can generate regular expressions
# for XML and JSON automatically.
_SANITIZE_PATTERNS = []
_FORMAT_PATTERNS = [r'(%(key)s\s*[=]\s*[\"\']).*?([\"\'])',
r'(<%(key)s>).*?(</%(key)s>)',
r'([\"\']%(key)s[\"\']\s*:\s*[\"\']).*?([\"\'])',
r'([\'"].*?%(key)s[\'"]\s*:\s*u?[\'"]).*?([\'"])']
for key in _SANITIZE_KEYS:
for pattern in _FORMAT_PATTERNS:
reg_ex = re.compile(pattern % {'key': key}, re.DOTALL)
_SANITIZE_PATTERNS.append(reg_ex)
common_cli_opts = [
cfg.BoolOpt('debug',
short='d',
@ -63,11 +83,13 @@ common_cli_opts = [
]
logging_cli_opts = [
cfg.StrOpt('log-config',
cfg.StrOpt('log-config-append',
metavar='PATH',
help='If this option is specified, the logging configuration '
'file specified is used and overrides any other logging '
'options specified. Please see the Python logging module '
deprecated_name='log-config',
help='The name of logging configuration file. It does not '
'disable existing loggers, but just appends specified '
'logging configuration to any other existing logging '
'options. Please see the Python logging module '
'documentation for details on logging configuration '
'files.'),
cfg.StrOpt('log-format',
@ -126,12 +148,14 @@ log_opts = [
help='prefix each line of exception output with this format'),
cfg.ListOpt('default_log_levels',
default=[
'amqp=WARN',
'amqplib=WARN',
'sqlalchemy=WARN',
'boto=WARN',
'suds=INFO',
'keystone=INFO',
'eventlet.wsgi.server=WARN'
'qpid=WARN',
'sqlalchemy=WARN',
'suds=INFO',
'iso8601=WARN',
],
help='list of logger=LEVEL pairs'),
cfg.BoolOpt('publish_errors',
@ -207,6 +231,41 @@ def _get_log_file_path(binary=None):
binary = binary or _get_binary_name()
return '%s.log' % (os.path.join(logdir, binary),)
return None
def mask_password(message, secret="***"):
"""Replace password with 'secret' in message.
:param message: The string which includes security information.
:param secret: value with which to replace passwords, defaults to "***".
:returns: The unicode value of message with the password fields masked.
For example:
>>> mask_password("'adminPass' : 'aaaaa'")
"'adminPass' : '***'"
>>> mask_password("'admin_pass' : 'aaaaa'")
"'admin_pass' : '***'"
>>> mask_password('"password" : "aaaaa"')
'"password" : "***"'
>>> mask_password("'original_password' : 'aaaaa'")
"'original_password' : '***'"
>>> mask_password("u'original_password' : u'aaaaa'")
"u'original_password' : u'***'"
"""
message = six.text_type(message)
# NOTE(ldbragst): Check to see if anything in message contains any key
# specified in _SANITIZE_KEYS, if not then just return the message since
# we don't have to mask any passwords.
if not any(key in message for key in _SANITIZE_KEYS):
return message
secret = r'\g<1>' + secret + r'\g<2>'
for pattern in _SANITIZE_PATTERNS:
message = re.sub(pattern, secret, message)
return message
class BaseLoggerAdapter(logging.LoggerAdapter):
@ -249,6 +308,13 @@ class ContextAdapter(BaseLoggerAdapter):
self.warn(stdmsg, *args, **kwargs)
def process(self, msg, kwargs):
# NOTE(mrodden): catch any Message/other object and
# coerce to unicode before they can get
# to the python logging and possibly
# cause string encoding trouble
if not isinstance(msg, six.string_types):
msg = six.text_type(msg)
if 'extra' not in kwargs:
kwargs['extra'] = {}
extra = kwargs['extra']
@ -260,14 +326,14 @@ class ContextAdapter(BaseLoggerAdapter):
extra.update(_dictify_context(context))
instance = kwargs.pop('instance', None)
instance_uuid = (extra.get('instance_uuid', None) or
kwargs.pop('instance_uuid', None))
instance_extra = ''
if instance:
instance_extra = CONF.instance_format % instance
else:
instance_uuid = kwargs.pop('instance_uuid', None)
if instance_uuid:
instance_extra = (CONF.instance_uuid_format
% {'uuid': instance_uuid})
elif instance_uuid:
instance_extra = (CONF.instance_uuid_format
% {'uuid': instance_uuid})
extra.update({'instance': instance_extra})
extra.update({"project": self.project})
@ -344,17 +410,18 @@ class LogConfigError(Exception):
err_msg=self.err_msg)
def _load_log_config(log_config):
def _load_log_config(log_config_append):
try:
logging.config.fileConfig(log_config)
logging.config.fileConfig(log_config_append,
disable_existing_loggers=False)
except moves.configparser.Error as exc:
raise LogConfigError(log_config, str(exc))
raise LogConfigError(log_config_append, str(exc))
def setup(product_name):
"""Setup logging."""
if CONF.log_config:
_load_log_config(CONF.log_config)
if CONF.log_config_append:
_load_log_config(CONF.log_config_append)
else:
_setup_logging_from_conf()
sys.excepthook = _create_logging_excepthook(product_name)

View File

@ -43,4 +43,5 @@ def notify(context, message):
rpc.notify(context, topic, message)
except Exception:
LOG.exception(_("Could not send notification to %(topic)s. "
"Payload=%(message)s"), locals())
"Payload=%(message)s"),
{"topic": topic, "message": message})

View File

@ -49,4 +49,5 @@ def notify(context, message):
rpc.notify(context, topic, message, envelope=True)
except Exception:
LOG.exception(_("Could not send notification to %(topic)s. "
"Payload=%(message)s"), locals())
"Payload=%(message)s"),
{"topic": topic, "message": message})

View File

@ -132,7 +132,7 @@ def execute(*cmd, **kwargs):
raise UnknownArgumentError(_('Got unknown keyword args '
'to utils.execute: %r') % kwargs)
if run_as_root and os.geteuid() != 0:
if run_as_root and hasattr(os, 'geteuid') and os.geteuid() != 0:
if not root_helper:
raise NoRootWrapSpecified(
message=('Command requested root, but did not specify a root '
@ -168,14 +168,13 @@ def execute(*cmd, **kwargs):
result = obj.communicate()
obj.stdin.close() # pylint: disable=E1101
_returncode = obj.returncode # pylint: disable=E1101
if _returncode:
LOG.log(loglevel, _('Result was %s') % _returncode)
if not ignore_exit_code and _returncode not in check_exit_code:
(stdout, stderr) = result
raise ProcessExecutionError(exit_code=_returncode,
stdout=stdout,
stderr=stderr,
cmd=' '.join(cmd))
LOG.log(loglevel, _('Result was %s') % _returncode)
if not ignore_exit_code and _returncode not in check_exit_code:
(stdout, stderr) = result
raise ProcessExecutionError(exit_code=_returncode,
stdout=stdout,
stderr=stderr,
cmd=' '.join(cmd))
return result
except ProcessExecutionError:
if not attempts:

View File

@ -61,7 +61,7 @@ rpc_opts = [
'exceptions',
],
help='Modules of exceptions that are permitted to be recreated'
'upon receiving exception data from an rpc call.'),
' upon receiving exception data from an rpc call.'),
cfg.BoolOpt('fake_rabbit',
default=False,
help='If passed, use a fake RabbitMQ provider'),
@ -227,7 +227,7 @@ def notify(context, topic, msg, envelope=False):
def cleanup():
"""Clean up resoruces in use by implementation.
"""Clean up resources in use by implementation.
Clean up any resources that have been allocated by the RPC implementation.
This is typically open connections to a messaging service. This function

View File

@ -20,9 +20,9 @@
"""
Shared code between AMQP based openstack.common.rpc implementations.
The code in this module is shared between the rpc implemenations based on AMQP.
Specifically, this includes impl_kombu and impl_qpid. impl_carrot also uses
AMQP, but is deprecated and predates this code.
The code in this module is shared between the rpc implementations based on
AMQP. Specifically, this includes impl_kombu and impl_qpid. impl_carrot also
uses AMQP, but is deprecated and predates this code.
"""
import collections
@ -189,7 +189,7 @@ class ReplyProxy(ConnectionContext):
def __init__(self, conf, connection_pool):
self._call_waiters = {}
self._num_call_waiters = 0
self._num_call_waiters_wrn_threshhold = 10
self._num_call_waiters_wrn_threshold = 10
self._reply_q = 'reply_' + uuid.uuid4().hex
super(ReplyProxy, self).__init__(conf, connection_pool, pooled=False)
self.declare_direct_consumer(self._reply_q, self._process_data)
@ -208,11 +208,11 @@ class ReplyProxy(ConnectionContext):
def add_call_waiter(self, waiter, msg_id):
self._num_call_waiters += 1
if self._num_call_waiters > self._num_call_waiters_wrn_threshhold:
if self._num_call_waiters > self._num_call_waiters_wrn_threshold:
LOG.warn(_('Number of call waiters is greater than warning '
'threshhold: %d. There could be a MulticallProxyWaiter '
'leak.') % self._num_call_waiters_wrn_threshhold)
self._num_call_waiters_wrn_threshhold *= 2
'threshold: %d. There could be a MulticallProxyWaiter '
'leak.') % self._num_call_waiters_wrn_threshold)
self._num_call_waiters_wrn_threshold *= 2
self._call_waiters[msg_id] = waiter
def del_call_waiter(self, msg_id):
@ -241,7 +241,7 @@ def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
_add_unique_id(msg)
# If a reply_q exists, add the msg_id to the reply and pass the
# reply_q to direct_send() to use it as the response queue.
# Otherwise use the msg_id for backward compatibilty.
# Otherwise use the msg_id for backward compatibility.
if reply_q:
msg['_msg_id'] = msg_id
conn.direct_send(reply_q, rpc_common.serialize_msg(msg))
@ -364,22 +364,43 @@ class CallbackWrapper(_ThreadPoolWithWait):
Allows it to be invoked in a green thread.
"""
def __init__(self, conf, callback, connection_pool):
def __init__(self, conf, callback, connection_pool,
wait_for_consumers=False):
"""Initiates CallbackWrapper object.
:param conf: cfg.CONF instance
:param callback: a callable (probably a function)
:param connection_pool: connection pool as returned by
get_connection_pool()
:param wait_for_consumers: wait for all green threads to
complete and raise the last
caught exception, if any.
"""
super(CallbackWrapper, self).__init__(
conf=conf,
connection_pool=connection_pool,
)
self.callback = callback
self.wait_for_consumers = wait_for_consumers
self.exc_info = None
def _wrap(self, message_data, **kwargs):
"""Wrap the callback invocation to catch exceptions.
"""
try:
self.callback(message_data, **kwargs)
except Exception:
self.exc_info = sys.exc_info()
def __call__(self, message_data):
self.pool.spawn_n(self.callback, message_data)
self.exc_info = None
self.pool.spawn_n(self._wrap, message_data)
if self.wait_for_consumers:
self.pool.waitall()
if self.exc_info:
raise self.exc_info[1], None, self.exc_info[2]
class ProxyCallback(_ThreadPoolWithWait):

View File

@ -29,6 +29,7 @@ from billingstack.openstack.common import importutils
from billingstack.openstack.common import jsonutils
from billingstack.openstack.common import local
from billingstack.openstack.common import log as logging
from billingstack.openstack.common import versionutils
CONF = cfg.CONF
@ -441,19 +442,15 @@ def client_exceptions(*exceptions):
return outer
# TODO(sirp): we should deprecate this in favor of
# using `versionutils.is_compatible` directly
def version_is_compatible(imp_version, version):
"""Determine whether versions are compatible.
:param imp_version: The version implemented
:param version: The version requested by an incoming message.
"""
version_parts = version.split('.')
imp_version_parts = imp_version.split('.')
if int(version_parts[0]) != int(imp_version_parts[0]): # Major
return False
if int(version_parts[1]) > int(imp_version_parts[1]): # Minor
return False
return True
return versionutils.is_compatible(version, imp_version)
def serialize_msg(raw_msg):

View File

@ -146,7 +146,7 @@ def multicall(conf, context, topic, msg, timeout=None):
try:
consumer = CONSUMERS[topic][0]
except (KeyError, IndexError):
return iter([None])
raise rpc_common.Timeout("No consumers available")
else:
return consumer.call(context, version, method, namespace, args,
timeout)

View File

@ -146,29 +146,23 @@ class ConsumerBase(object):
Messages that are processed without exception are ack'ed.
If the message processing generates an exception, it will be
ack'ed if ack_on_error=True. Otherwise it will be .reject()'ed.
Rejection is better than waiting for the message to timeout.
Rejected messages are immediately requeued.
ack'ed if ack_on_error=True. Otherwise it will be .requeue()'ed.
"""
ack_msg = False
try:
msg = rpc_common.deserialize_msg(message.payload)
callback(msg)
ack_msg = True
except Exception:
if self.ack_on_error:
ack_msg = True
LOG.exception(_("Failed to process message"
" ... skipping it."))
message.ack()
else:
LOG.exception(_("Failed to process message"
" ... will requeue."))
finally:
if ack_msg:
message.ack()
else:
message.reject()
message.requeue()
else:
message.ack()
def consume(self, *args, **kwargs):
"""Actually declare the consumer on the amqp channel. This will
@ -789,6 +783,7 @@ class Connection(object):
callback=callback,
connection_pool=rpc_amqp.get_connection_pool(self.conf,
Connection),
wait_for_consumers=not ack_on_error
)
self.proxy_callbacks.append(callback_wrapper)
self.declare_topic_consumer(

View File

@ -67,6 +67,17 @@ qpid_opts = [
cfg.BoolOpt('qpid_tcp_nodelay',
default=True,
help='Disable Nagle algorithm'),
# NOTE(russellb) If any additional versions are added (beyond 1 and 2),
# this file could probably use some additional refactoring so that the
# differences between each version are split into different classes.
cfg.IntOpt('qpid_topology_version',
default=1,
help="The qpid topology version to use. Version 1 is what "
"was originally used by impl_qpid. Version 2 includes "
"some backwards-incompatible changes that allow broker "
"federation to work. Users should update to version 2 "
"when they are able to take everything down, as it "
"requires a clean break."),
]
cfg.CONF.register_opts(qpid_opts)
@ -74,10 +85,17 @@ cfg.CONF.register_opts(qpid_opts)
JSON_CONTENT_TYPE = 'application/json; charset=utf8'
def raise_invalid_topology_version(conf):
msg = (_("Invalid value for qpid_topology_version: %d") %
conf.qpid_topology_version)
LOG.error(msg)
raise Exception(msg)
class ConsumerBase(object):
"""Consumer base class."""
def __init__(self, session, callback, node_name, node_opts,
def __init__(self, conf, session, callback, node_name, node_opts,
link_name, link_opts):
"""Declare a queue on an amqp session.
@ -95,26 +113,38 @@ class ConsumerBase(object):
self.receiver = None
self.session = None
addr_opts = {
"create": "always",
"node": {
"type": "topic",
"x-declare": {
if conf.qpid_topology_version == 1:
addr_opts = {
"create": "always",
"node": {
"type": "topic",
"x-declare": {
"durable": True,
"auto-delete": True,
},
},
"link": {
"name": link_name,
"durable": True,
"auto-delete": True,
"x-declare": {
"durable": False,
"auto-delete": True,
"exclusive": False,
},
},
},
"link": {
"name": link_name,
"durable": True,
"x-declare": {
"durable": False,
"auto-delete": True,
"exclusive": False,
}
addr_opts["node"]["x-declare"].update(node_opts)
elif conf.qpid_topology_version == 2:
addr_opts = {
"link": {
"x-declare": {
"auto-delete": True,
},
},
},
}
addr_opts["node"]["x-declare"].update(node_opts)
}
else:
raise_invalid_topology_version()
addr_opts["link"]["x-declare"].update(link_opts)
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
@ -122,7 +152,7 @@ class ConsumerBase(object):
self.connect(session)
def connect(self, session):
"""Declare the reciever on connect."""
"""Declare the receiver on connect."""
self._declare_receiver(session)
def reconnect(self, session):
@ -181,16 +211,24 @@ class DirectConsumer(ConsumerBase):
'callback' is the callback to call when messages are received
"""
super(DirectConsumer, self).__init__(
session, callback,
"%s/%s" % (msg_id, msg_id),
{"type": "direct"},
msg_id,
{
"auto-delete": conf.amqp_auto_delete,
"exclusive": True,
"durable": conf.amqp_durable_queues,
})
link_opts = {
"auto-delete": conf.amqp_auto_delete,
"exclusive": True,
"durable": conf.amqp_durable_queues,
}
if conf.qpid_topology_version == 1:
node_name = "%s/%s" % (msg_id, msg_id)
node_opts = {"type": "direct"}
elif conf.qpid_topology_version == 2:
node_name = "amq.direct/%s" % msg_id
node_opts = {}
else:
raise_invalid_topology_version()
super(DirectConsumer, self).__init__(conf, session, callback,
node_name, node_opts, msg_id,
link_opts)
class TopicConsumer(ConsumerBase):
@ -208,14 +246,20 @@ class TopicConsumer(ConsumerBase):
"""
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
super(TopicConsumer, self).__init__(
session, callback,
"%s/%s" % (exchange_name, topic),
{}, name or topic,
{
"auto-delete": conf.amqp_auto_delete,
"durable": conf.amqp_durable_queues,
})
link_opts = {
"auto-delete": conf.amqp_auto_delete,
"durable": conf.amqp_durable_queues,
}
if conf.qpid_topology_version == 1:
node_name = "%s/%s" % (exchange_name, topic)
elif conf.qpid_topology_version == 2:
node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
else:
raise_invalid_topology_version()
super(TopicConsumer, self).__init__(conf, session, callback, node_name,
{}, name or topic, link_opts)
class FanoutConsumer(ConsumerBase):
@ -230,12 +274,22 @@ class FanoutConsumer(ConsumerBase):
"""
self.conf = conf
super(FanoutConsumer, self).__init__(
session, callback,
"%s_fanout" % topic,
{"durable": False, "type": "fanout"},
"%s_fanout_%s" % (topic, uuid.uuid4().hex),
{"exclusive": True})
link_opts = {"exclusive": True}
if conf.qpid_topology_version == 1:
node_name = "%s_fanout" % topic
node_opts = {"durable": False, "type": "fanout"}
link_name = "%s_fanout_%s" % (topic, uuid.uuid4().hex)
elif conf.qpid_topology_version == 2:
node_name = "amq.topic/fanout/%s" % topic
node_opts = {}
link_name = ""
else:
raise_invalid_topology_version()
super(FanoutConsumer, self).__init__(conf, session, callback,
node_name, node_opts, link_name,
link_opts)
def reconnect(self, session):
topic = self.get_node_name().rpartition('_fanout')[0]
@ -253,29 +307,34 @@ class FanoutConsumer(ConsumerBase):
class Publisher(object):
"""Base Publisher class."""
def __init__(self, session, node_name, node_opts=None):
def __init__(self, conf, session, node_name, node_opts=None):
"""Init the Publisher class with the exchange_name, routing_key,
and other options
"""
self.sender = None
self.session = session
addr_opts = {
"create": "always",
"node": {
"type": "topic",
"x-declare": {
"durable": False,
# auto-delete isn't implemented for exchanges in qpid,
# but put in here anyway
"auto-delete": True,
if conf.qpid_topology_version == 1:
addr_opts = {
"create": "always",
"node": {
"type": "topic",
"x-declare": {
"durable": False,
# auto-delete isn't implemented for exchanges in qpid,
# but put in here anyway
"auto-delete": True,
},
},
},
}
if node_opts:
addr_opts["node"]["x-declare"].update(node_opts)
}
if node_opts:
addr_opts["node"]["x-declare"].update(node_opts)
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
elif conf.qpid_topology_version == 2:
self.address = node_name
else:
raise_invalid_topology_version()
self.reconnect(session)
@ -319,39 +378,73 @@ class DirectPublisher(Publisher):
"""Publisher class for 'direct'."""
def __init__(self, conf, session, msg_id):
"""Init a 'direct' publisher."""
super(DirectPublisher, self).__init__(session, msg_id,
{"type": "direct"})
if conf.qpid_topology_version == 1:
node_name = msg_id
node_opts = {"type": "direct"}
elif conf.qpid_topology_version == 2:
node_name = "amq.direct/%s" % msg_id
node_opts = {}
else:
raise_invalid_topology_version()
super(DirectPublisher, self).__init__(conf, session, node_name,
node_opts)
class TopicPublisher(Publisher):
"""Publisher class for 'topic'."""
def __init__(self, conf, session, topic):
"""init a 'topic' publisher.
"""Init a 'topic' publisher.
"""
exchange_name = rpc_amqp.get_control_exchange(conf)
super(TopicPublisher, self).__init__(session,
"%s/%s" % (exchange_name, topic))
if conf.qpid_topology_version == 1:
node_name = "%s/%s" % (exchange_name, topic)
elif conf.qpid_topology_version == 2:
node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
else:
raise_invalid_topology_version()
super(TopicPublisher, self).__init__(conf, session, node_name)
class FanoutPublisher(Publisher):
"""Publisher class for 'fanout'."""
def __init__(self, conf, session, topic):
"""init a 'fanout' publisher.
"""Init a 'fanout' publisher.
"""
super(FanoutPublisher, self).__init__(
session,
"%s_fanout" % topic, {"type": "fanout"})
if conf.qpid_topology_version == 1:
node_name = "%s_fanout" % topic
node_opts = {"type": "fanout"}
elif conf.qpid_topology_version == 2:
node_name = "amq.topic/fanout/%s" % topic
node_opts = {}
else:
raise_invalid_topology_version()
super(FanoutPublisher, self).__init__(conf, session, node_name,
node_opts)
class NotifyPublisher(Publisher):
"""Publisher class for notifications."""
def __init__(self, conf, session, topic):
"""init a 'topic' publisher.
"""Init a 'topic' publisher.
"""
exchange_name = rpc_amqp.get_control_exchange(conf)
super(NotifyPublisher, self).__init__(session,
"%s/%s" % (exchange_name, topic),
{"durable": True})
node_opts = {"durable": True}
if conf.qpid_topology_version == 1:
node_name = "%s/%s" % (exchange_name, topic)
elif conf.qpid_topology_version == 2:
node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
else:
raise_invalid_topology_version()
super(NotifyPublisher, self).__init__(conf, session, node_name,
node_opts)
class Connection(object):
@ -665,6 +758,7 @@ class Connection(object):
callback=callback,
connection_pool=rpc_amqp.get_connection_pool(self.conf,
Connection),
wait_for_consumers=not ack_on_error
)
self.proxy_callbacks.append(callback_wrapper)

View File

@ -192,7 +192,7 @@ class ZmqSocket(object):
# it would be much worse if some of the code calling this
# were to fail. For now, lets log, and later evaluate
# if we can safely raise here.
LOG.error("ZeroMQ socket could not be closed.")
LOG.error(_("ZeroMQ socket could not be closed."))
self.sock = None
def recv(self, **kwargs):

View File

@ -21,7 +21,6 @@ For more information about rpc API version numbers, see:
rpc/dispatcher.py
"""
from billingstack.openstack.common import rpc
from billingstack.openstack.common.rpc import common as rpc_common
from billingstack.openstack.common.rpc import serializer as rpc_serializer
@ -36,7 +35,7 @@ class RpcProxy(object):
rpc API.
"""
# The default namespace, which can be overriden in a subclass.
# The default namespace, which can be overridden in a subclass.
RPC_API_NAMESPACE = None
def __init__(self, topic, default_version, version_cap=None,

View File

@ -16,10 +16,12 @@
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class Serializer(object):
"""Generic (de-)serialization definition base class."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def serialize_entity(self, context, entity):

View File

@ -20,6 +20,7 @@
"""Generic Node base class for all workers that run on hosts."""
import errno
import logging as std_logging
import os
import random
import signal
@ -28,7 +29,6 @@ import time
import eventlet
from eventlet import event
import logging as std_logging
from oslo.config import cfg
from billingstack.openstack.common import eventlet_backdoor
@ -43,6 +43,29 @@ CONF = cfg.CONF
LOG = logging.getLogger(__name__)
def _sighup_supported():
return hasattr(signal, 'SIGHUP')
def _is_sighup(signo):
return _sighup_supported() and signo == signal.SIGHUP
def _signo_to_signame(signo):
signals = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT'}
if _sighup_supported():
signals[signal.SIGHUP] = 'SIGHUP'
return signals[signo]
def _set_signals_handler(handler):
signal.signal(signal.SIGTERM, handler)
signal.signal(signal.SIGINT, handler)
if _sighup_supported():
signal.signal(signal.SIGHUP, handler)
class Launcher(object):
"""Launch one or more services and wait for them to complete."""
@ -100,18 +123,13 @@ class SignalExit(SystemExit):
class ServiceLauncher(Launcher):
def _handle_signal(self, signo, frame):
# Allow the process to be killed again and die from natural causes
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGHUP, signal.SIG_DFL)
_set_signals_handler(signal.SIG_DFL)
raise SignalExit(signo)
def handle_signal(self):
signal.signal(signal.SIGTERM, self._handle_signal)
signal.signal(signal.SIGINT, self._handle_signal)
signal.signal(signal.SIGHUP, self._handle_signal)
_set_signals_handler(self._handle_signal)
def _wait_for_exit_or_signal(self):
def _wait_for_exit_or_signal(self, ready_callback=None):
status = None
signo = 0
@ -119,11 +137,11 @@ class ServiceLauncher(Launcher):
CONF.log_opt_values(LOG, std_logging.DEBUG)
try:
if ready_callback:
ready_callback()
super(ServiceLauncher, self).wait()
except SignalExit as exc:
signame = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT',
signal.SIGHUP: 'SIGHUP'}[exc.signo]
signame = _signo_to_signame(exc.signo)
LOG.info(_('Caught %s, exiting'), signame)
status = exc.code
signo = exc.signo
@ -140,11 +158,11 @@ class ServiceLauncher(Launcher):
return status, signo
def wait(self):
def wait(self, ready_callback=None):
while True:
self.handle_signal()
status, signo = self._wait_for_exit_or_signal()
if signo != signal.SIGHUP:
status, signo = self._wait_for_exit_or_signal(ready_callback)
if not _is_sighup(signo):
return status
self.restart()
@ -167,18 +185,14 @@ class ProcessLauncher(object):
self.handle_signal()
def handle_signal(self):
signal.signal(signal.SIGTERM, self._handle_signal)
signal.signal(signal.SIGINT, self._handle_signal)
signal.signal(signal.SIGHUP, self._handle_signal)
_set_signals_handler(self._handle_signal)
def _handle_signal(self, signo, frame):
self.sigcaught = signo
self.running = False
# Allow the process to be killed again and die from natural causes
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGHUP, signal.SIG_DFL)
_set_signals_handler(signal.SIG_DFL)
def _pipe_watcher(self):
# This will block until the write end is closed when the parent
@ -200,7 +214,8 @@ class ProcessLauncher(object):
raise SignalExit(signal.SIGHUP)
signal.signal(signal.SIGTERM, _sigterm)
signal.signal(signal.SIGHUP, _sighup)
if _sighup_supported():
signal.signal(signal.SIGHUP, _sighup)
# Block SIGINT and let the parent send us a SIGTERM
signal.signal(signal.SIGINT, signal.SIG_IGN)
@ -208,12 +223,13 @@ class ProcessLauncher(object):
status = None
signo = 0
# NOTE(johannes): All exceptions are caught to ensure this
# doesn't fallback into the loop spawning children. It would
# be bad for a child to spawn more children.
try:
launcher.wait()
except SignalExit as exc:
signame = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT',
signal.SIGHUP: 'SIGHUP'}[exc.signo]
signame = _signo_to_signame(exc.signo)
LOG.info(_('Caught %s, exiting'), signame)
status = exc.code
signo = exc.signo
@ -262,14 +278,11 @@ class ProcessLauncher(object):
pid = os.fork()
if pid == 0:
# NOTE(johannes): All exceptions are caught to ensure this
# doesn't fallback into the loop spawning children. It would
# be bad for a child to spawn more children.
launcher = self._child_process(wrap.service)
while True:
self._child_process_handle_signal()
status, signo = self._child_wait_for_exit_or_signal(launcher)
if signo != signal.SIGHUP:
if not _is_sighup(signo):
break
launcher.restart()
@ -339,11 +352,9 @@ class ProcessLauncher(object):
self.handle_signal()
self._respawn_children()
if self.sigcaught:
signame = {signal.SIGTERM: 'SIGTERM',
signal.SIGINT: 'SIGINT',
signal.SIGHUP: 'SIGHUP'}[self.sigcaught]
signame = _signo_to_signame(self.sigcaught)
LOG.info(_('Caught %s, stopping children'), signame)
if self.sigcaught != signal.SIGHUP:
if not _is_sighup(self.sigcaught):
break
for pid in self.children:

View File

@ -0,0 +1,54 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010-2011 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Common utilities used in testing"""
import os
import fixtures
import testtools
class BaseTestCase(testtools.TestCase):
def setUp(self):
super(BaseTestCase, self).setUp()
self._set_timeout()
self._fake_output()
self.useFixture(fixtures.FakeLogger('billingstack.openstack.common'))
self.useFixture(fixtures.NestedTempfile())
self.useFixture(fixtures.TempHomeDir())
def _set_timeout(self):
test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
try:
test_timeout = int(test_timeout)
except ValueError:
# If timeout value is invalid do not set a timeout.
test_timeout = 0
if test_timeout > 0:
self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
def _fake_output(self):
if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
os.environ.get('OS_STDOUT_CAPTURE') == '1'):
stdout = self.useFixture(fixtures.StringStream('stdout')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or
os.environ.get('OS_STDERR_CAPTURE') == '1'):
stderr = self.useFixture(fixtures.StringStream('stderr')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))

View File

@ -48,6 +48,9 @@ class Thread(object):
def wait(self):
return self.thread.wait()
def link(self, func, *args, **kwargs):
self.thread.link(func, *args, **kwargs)
class ThreadGroup(object):
"""The point of the ThreadGroup classis to:
@ -79,6 +82,7 @@ class ThreadGroup(object):
gt = self.pool.spawn(callback, *args, **kwargs)
th = Thread(gt, self)
self.threads.append(th)
return th
def thread_done(self, thread):
self.threads.remove(thread)

View File

@ -21,6 +21,7 @@ Time related utilities and helper functions.
import calendar
import datetime
import time
import iso8601
import six
@ -49,9 +50,9 @@ def parse_isotime(timestr):
try:
return iso8601.parse_date(timestr)
except iso8601.ParseError as e:
raise ValueError(unicode(e))
raise ValueError(six.text_type(e))
except TypeError as e:
raise ValueError(unicode(e))
raise ValueError(six.text_type(e))
def strtime(at=None, fmt=PERFECT_TIME_FORMAT):
@ -90,6 +91,11 @@ def is_newer_than(after, seconds):
def utcnow_ts():
"""Timestamp version of our utcnow function."""
if utcnow.override_time is None:
# NOTE(kgriffs): This is several times faster
# than going through calendar.timegm(...)
return int(time.time())
return calendar.timegm(utcnow().timetuple())
@ -111,12 +117,15 @@ def iso8601_from_timestamp(timestamp):
utcnow.override_time = None
def set_time_override(override_time=datetime.datetime.utcnow()):
def set_time_override(override_time=None):
"""Overrides utils.utcnow.
Make it return a constant time or a list thereof, one at a time.
:param override_time: datetime instance or list thereof. If not
given, defaults to the current UTC time.
"""
utcnow.override_time = override_time
utcnow.override_time = override_time or datetime.datetime.utcnow()
def advance_time_delta(timedelta):

View File

@ -0,0 +1,45 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Helpers for comparing version strings.
"""
import pkg_resources
def is_compatible(requested_version, current_version, same_major=True):
"""Determine whether `requested_version` is satisfied by
`current_version`; in other words, `current_version` is >=
`requested_version`.
:param requested_version: version to check for compatibility
:param current_version: version to check against
:param same_major: if True, the major version must be identical between
`requested_version` and `current_version`. This is used when a
major-version difference indicates incompatibility between the two
versions. Since this is the common-case in practice, the default is
True.
:returns: True if compatible, False if not
"""
requested_parts = pkg_resources.parse_version(requested_version)
current_parts = pkg_resources.parse_version(current_version)
if same_major and (requested_parts[0] != current_parts[0]):
return False
return current_parts >= requested_parts

View File

@ -1 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4

View File

@ -1,97 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
from billingstack.taskflow import functor_task
from billingstack.taskflow import utils
def wraps(fn):
"""This will not be needed in python 3.2 or greater which already has this
built-in to its functools.wraps method.
"""
def wrapper(f):
f = functools.wraps(fn)(f)
f.__wrapped__ = getattr(fn, '__wrapped__', fn)
return f
return wrapper
def locked(*args, **kwargs):
def decorator(f):
attr_name = kwargs.get('lock', '_lock')
@wraps(f)
def wrapper(*args, **kwargs):
lock = getattr(args[0], attr_name)
with lock:
return f(*args, **kwargs)
return wrapper
# This is needed to handle when the decorator has args or the decorator
# doesn't have args, python is rather weird here...
if kwargs or not args:
return decorator
else:
if len(args) == 1:
return decorator(args[0])
else:
return decorator
def _original_function(fun):
"""Get original function from static or class method"""
if isinstance(fun, staticmethod):
return fun.__get__(object())
elif isinstance(fun, classmethod):
return fun.__get__(object()).im_func
return fun
def task(*args, **kwargs):
"""Decorates a given function so that it can be used as a task"""
def decorator(f):
def task_factory(execute_with, **factory_kwargs):
merged = kwargs.copy()
merged.update(factory_kwargs)
# NOTE(imelnikov): we can't capture f here because for
# bound methods and bound class methods the object it
# is bound to is yet unknown at the moment
return functor_task.FunctorTask(execute_with, **merged)
w_f = _original_function(f)
setattr(w_f, utils.TASK_FACTORY_ATTRIBUTE, task_factory)
return f
# This is needed to handle when the decorator has args or the decorator
# doesn't have args, python is rather weird here...
if kwargs:
if args:
raise TypeError('task decorator takes 0 positional arguments,'
'%s given' % len(args))
return decorator
else:
if len(args) == 1:
return decorator(args[0])
else:
return decorator

View File

@ -1,77 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class TaskFlowException(Exception):
"""Base class for exceptions emitted from this library."""
pass
class Duplicate(TaskFlowException):
"""Raised when a duplicate entry is found."""
pass
class StorageError(TaskFlowException):
"""Raised when logbook can not be read/saved/deleted."""
def __init__(self, message, cause=None):
super(StorageError, self).__init__(message)
self.cause = cause
class NotFound(TaskFlowException):
"""Raised when some entry in some object doesn't exist."""
pass
class AlreadyExists(TaskFlowException):
"""Raised when some entry in some object already exists."""
pass
class ClosedException(TaskFlowException):
"""Raised when an access on a closed object occurs."""
pass
class InvalidStateException(TaskFlowException):
"""Raised when a task/job/workflow is in an invalid state when an
operation is attempting to apply to said task/job/workflow.
"""
pass
class UnclaimableJobException(TaskFlowException):
"""Raised when a job can not be claimed."""
pass
class JobNotFound(TaskFlowException):
"""Raised when a job entry can not be found."""
pass
class MissingDependencies(InvalidStateException):
"""Raised when a entity has dependencies that can not be satisified."""
message = ("%(who)s requires %(requirements)s but no other entity produces"
" said requirements")
def __init__(self, who, requirements):
message = self.message % {'who': who, 'requirements': requirements}
super(MissingDependencies, self).__init__(message)

View File

@ -1,216 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import threading
from billingstack.openstack.common import uuidutils
from billingstack.taskflow import exceptions as exc
from billingstack.taskflow import states
from billingstack.taskflow import utils
class Flow(object):
"""The base abstract class of all flow implementations.
It provides a set of parents to flows that have a concept of parent flows
as well as a state and state utility functions to the deriving classes. It
also provides a name and an identifier (uuid or other) to the flow so that
it can be uniquely identifed among many flows.
Flows are expected to provide (if desired) the following methods:
- add
- add_many
- interrupt
- reset
- rollback
- run
- soft_reset
"""
__metaclass__ = abc.ABCMeta
# Common states that certain actions can be performed in. If the flow
# is not in these sets of states then it is likely that the flow operation
# can not succeed.
RESETTABLE_STATES = set([
states.INTERRUPTED,
states.SUCCESS,
states.PENDING,
states.FAILURE,
])
SOFT_RESETTABLE_STATES = set([
states.INTERRUPTED,
])
UNINTERRUPTIBLE_STATES = set([
states.FAILURE,
states.SUCCESS,
states.PENDING,
])
RUNNABLE_STATES = set([
states.PENDING,
])
def __init__(self, name, parents=None, uuid=None):
self._name = str(name)
# The state of this flow.
self._state = states.PENDING
# If this flow has a parent flow/s which need to be reverted if
# this flow fails then please include them here to allow this child
# to call the parents...
if parents:
self.parents = tuple(parents)
else:
self.parents = tuple([])
# Any objects that want to listen when a wf/task starts/stops/completes
# or errors should be registered here. This can be used to monitor
# progress and record tasks finishing (so that it becomes possible to
# store the result of a task in some persistent or semi-persistent
# storage backend).
self.notifier = utils.TransitionNotifier()
self.task_notifier = utils.TransitionNotifier()
# Assign this flow a unique identifer.
if uuid:
self._id = str(uuid)
else:
self._id = uuidutils.generate_uuid()
# Ensure we can not change the state at the same time in 2 different
# threads.
self._state_lock = threading.RLock()
@property
def name(self):
"""A non-unique name for this flow (human readable)"""
return self._name
@property
def uuid(self):
return self._id
@property
def state(self):
"""Provides a read-only view of the flow state."""
return self._state
def _change_state(self, context, new_state, check_func=None, notify=True):
old_state = None
changed = False
with self._state_lock:
if self.state != new_state:
if (not check_func or
(check_func and check_func(self.state))):
changed = True
old_state = self.state
self._state = new_state
# Don't notify while holding the lock so that the reciever of said
# notifications can actually perform operations on the given flow
# without getting into deadlock.
if notify and changed:
self.notifier.notify(self.state, details={
'context': context,
'flow': self,
'old_state': old_state,
})
return changed
def __str__(self):
lines = ["Flow: %s" % (self.name)]
lines.append("%s" % (self.uuid))
lines.append("%s" % (len(self.parents)))
lines.append("%s" % (self.state))
return "; ".join(lines)
@abc.abstractmethod
def add(self, task):
"""Adds a given task to this flow.
Returns the uuid that is associated with the task for later operations
before and after it is ran.
"""
raise NotImplementedError()
def add_many(self, tasks):
"""Adds many tasks to this flow.
Returns a list of uuids (one for each task added).
"""
uuids = []
for t in tasks:
uuids.append(self.add(t))
return uuids
def interrupt(self):
"""Attempts to interrupt the current flow and any tasks that are
currently not running in the flow.
Returns how many tasks were interrupted (if any).
"""
def check():
if self.state in self.UNINTERRUPTIBLE_STATES:
raise exc.InvalidStateException(("Can not interrupt when"
" in state %s") % self.state)
check()
with self._state_lock:
check()
self._change_state(None, states.INTERRUPTED)
return 0
def reset(self):
"""Fully resets the internal state of this flow, allowing for the flow
to be ran again.
Note: Listeners are also reset.
"""
def check():
if self.state not in self.RESETTABLE_STATES:
raise exc.InvalidStateException(("Can not reset when"
" in state %s") % self.state)
check()
with self._state_lock:
check()
self.notifier.reset()
self.task_notifier.reset()
self._change_state(None, states.PENDING)
def soft_reset(self):
"""Partially resets the internal state of this flow, allowing for the
flow to be ran again from an interrupted state.
"""
def check():
if self.state not in self.SOFT_RESETTABLE_STATES:
raise exc.InvalidStateException(("Can not soft reset when"
" in state %s") % self.state)
check()
with self._state_lock:
check()
self._change_state(None, states.PENDING)
@abc.abstractmethod
def run(self, context, *args, **kwargs):
"""Executes the workflow."""
raise NotImplementedError()
def rollback(self, context, cause):
"""Performs rollback of this workflow and any attached parent workflows
if present.
"""
pass

View File

@ -1,95 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 AT&T Labs Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import inspect
from billingstack.taskflow import task as base
# These arguments are ones that we will skip when parsing for requirements
# for a function to operate (when used as a task).
AUTO_ARGS = ('self', 'context', 'cls')
def _filter_arg(arg):
if arg in AUTO_ARGS:
return False
# In certain decorator cases it seems like we get the function to be
# decorated as an argument, we don't want to take that as a real argument.
if not isinstance(arg, basestring):
return False
return True
class FunctorTask(base.Task):
"""Adaptor to make task from a callable
Take any callable and make a task from it.
"""
@staticmethod
def _get_callable_name(execute_with):
"""Generate a name from callable"""
im_class = getattr(execute_with, 'im_class', None)
if im_class is not None:
parts = (im_class.__module__, im_class.__name__,
execute_with.__name__)
else:
parts = (execute_with.__module__, execute_with.__name__)
return '.'.join(parts)
def __init__(self, execute_with, **kwargs):
"""Initialize FunctorTask instance with given callable and kwargs
:param execute_with: the callable
:param kwargs: reserved keywords (all optional) are
name: name of the task, default None (auto generate)
task_id: id of the task, default None (auto generate)
revert_with: the callable to revert, default None
version: version of the task, default Task's version 1.0
optionals: optionals of the task, default ()
provides: provides of the task, default ()
requires: requires of the task, default ()
auto_extract: auto extract execute_with's args and put it into
requires, default True
"""
name = kwargs.pop('name', None)
task_id = kwargs.pop('task_id', None)
if name is None:
name = self._get_callable_name(execute_with)
super(FunctorTask, self).__init__(name, task_id)
self._execute_with = execute_with
self._revert_with = kwargs.pop('revert_with', None)
self.version = kwargs.pop('version', self.version)
self.optional.update(kwargs.pop('optional', ()))
self.provides.update(kwargs.pop('provides', ()))
self.requires.update(kwargs.pop('requires', ()))
if kwargs.pop('auto_extract', True):
f_args = inspect.getargspec(execute_with).args
self.requires.update([arg for arg in f_args if _filter_arg(arg)])
if kwargs:
raise TypeError('__init__() got an unexpected keyword argument %r'
% kwargs.keys[0])
def __call__(self, *args, **kwargs):
return self._execute_with(*args, **kwargs)
def revert(self, *args, **kwargs):
if self._revert_with:
return self._revert_with(*args, **kwargs)
else:
return None

View File

@ -1,80 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from billingstack.taskflow import exceptions as exc
LOG = logging.getLogger(__name__)
def connect(graph, infer_key='infer', auto_reason='auto', discard_func=None):
"""Connects a graphs runners to other runners in the graph which provide
outputs for each runners requirements.
"""
if len(graph) == 0:
return
if discard_func:
for (u, v, e_data) in graph.edges(data=True):
if discard_func(u, v, e_data):
graph.remove_edge(u, v)
for (r, r_data) in graph.nodes_iter(data=True):
requires = set(r.requires)
# Find the ones that have already been attached manually.
manual_providers = {}
if requires:
incoming = [e[0] for e in graph.in_edges_iter([r])]
for r2 in incoming:
fulfills = requires & r2.provides
if fulfills:
LOG.debug("%s is a manual provider of %s for %s",
r2, fulfills, r)
for k in fulfills:
manual_providers[k] = r2
requires.remove(k)
# Anything leftover that we must find providers for??
auto_providers = {}
if requires and r_data.get(infer_key):
for r2 in graph.nodes_iter():
if r is r2:
continue
fulfills = requires & r2.provides
if fulfills:
graph.add_edge(r2, r, reason=auto_reason)
LOG.debug("Connecting %s as a automatic provider for"
" %s for %s", r2, fulfills, r)
for k in fulfills:
auto_providers[k] = r2
requires.remove(k)
if not requires:
break
# Anything still leftover??
if requires:
# Ensure its in string format, since join will puke on
# things that are not strings.
missing = ", ".join(sorted([str(s) for s in requires]))
raise exc.MissingDependencies(r, missing)
else:
r.providers = {}
r.providers.update(auto_providers)
r.providers.update(manual_providers)

View File

@ -1 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4

View File

@ -1,286 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import functools
import logging
import threading
from billingstack.openstack.common import excutils
from billingstack.taskflow import decorators
from billingstack.taskflow import exceptions as exc
from billingstack.taskflow import states
from billingstack.taskflow import utils
from billingstack.taskflow import flow
LOG = logging.getLogger(__name__)
class Flow(flow.Flow):
""""A linear chain of tasks that can be applied in order as one unit and
rolled back as one unit using the reverse order that the tasks have
been applied in.
Note(harlowja): Each task in the chain must have requirements
which are satisfied by the previous task/s in the chain.
"""
def __init__(self, name, parents=None, uuid=None):
super(Flow, self).__init__(name, parents, uuid)
# The tasks which have been applied will be collected here so that they
# can be reverted in the correct order on failure.
self._accumulator = utils.RollbackAccumulator()
# Tasks results are stored here. Lookup is by the uuid that was
# returned from the add function.
self.results = {}
# The previously left off iterator that can be used to resume from
# the last task (if interrupted and soft-reset).
self._leftoff_at = None
# All runners to run are collected here.
self._runners = []
self._connected = False
self._lock = threading.RLock()
# The resumption strategy to use.
self.resumer = None
@decorators.locked
def add(self, task):
"""Adds a given task to this flow."""
assert isinstance(task, collections.Callable)
r = utils.AOTRunner(task)
r.runs_before = list(reversed(self._runners))
self._runners.append(r)
self._reset_internals()
return r.uuid
def _reset_internals(self):
self._connected = False
self._leftoff_at = None
def _associate_providers(self, runner):
# Ensure that some previous task provides this input.
who_provides = {}
task_requires = runner.requires
for r in task_requires:
provider = None
for before_me in runner.runs_before:
if r in before_me.provides:
provider = before_me
break
if provider:
who_provides[r] = provider
# Ensure that the last task provides all the needed input for this
# task to run correctly.
missing_requires = task_requires - set(who_provides.keys())
if missing_requires:
raise exc.MissingDependencies(runner, sorted(missing_requires))
runner.providers.update(who_provides)
def __str__(self):
lines = ["LinearFlow: %s" % (self.name)]
lines.append("%s" % (self.uuid))
lines.append("%s" % (len(self._runners)))
lines.append("%s" % (len(self.parents)))
lines.append("%s" % (self.state))
return "; ".join(lines)
@decorators.locked
def remove(self, uuid):
index_removed = -1
for (i, r) in enumerate(self._runners):
if r.uuid == uuid:
index_removed = i
break
if index_removed == -1:
raise ValueError("No runner found with uuid %s" % (uuid))
else:
removed = self._runners.pop(index_removed)
self._reset_internals()
# Go and remove it from any runner after the removed runner since
# those runners may have had an attachment to it.
for r in self._runners[index_removed:]:
try:
r.runs_before.remove(removed)
except (IndexError, ValueError):
pass
def __len__(self):
return len(self._runners)
def _connect(self):
if self._connected:
return self._runners
for r in self._runners:
r.providers = {}
for r in reversed(self._runners):
self._associate_providers(r)
self._connected = True
return self._runners
def _ordering(self):
return iter(self._connect())
@decorators.locked
def run(self, context, *args, **kwargs):
def abort_if(current_state, ok_states):
if current_state not in ok_states:
return False
return True
def resume_it():
if self._leftoff_at is not None:
return ([], self._leftoff_at)
if self.resumer:
(finished, leftover) = self.resumer(self, self._ordering())
else:
finished = []
leftover = self._ordering()
return (finished, leftover)
start_check_functor = functools.partial(abort_if,
ok_states=self.RUNNABLE_STATES)
if not self._change_state(context, states.STARTED,
check_func=start_check_functor):
return
try:
those_finished, leftover = resume_it()
except Exception:
with excutils.save_and_reraise_exception():
self._change_state(context, states.FAILURE)
def run_it(runner, failed=False, result=None, simulate_run=False):
try:
# Add the task to be rolled back *immediately* so that even if
# the task fails while producing results it will be given a
# chance to rollback.
rb = utils.RollbackTask(context, runner.task, result=None)
self._accumulator.add(rb)
self.task_notifier.notify(states.STARTED, details={
'context': context,
'flow': self,
'runner': runner,
})
if not simulate_run:
result = runner(context, *args, **kwargs)
else:
if failed:
# TODO(harlowja): make this configurable??
# If we previously failed, we want to fail again at
# the same place.
if not result:
# If no exception or exception message was provided
# or captured from the previous run then we need to
# form one for this task.
result = "%s failed running." % (runner.task)
if isinstance(result, basestring):
result = exc.InvalidStateException(result)
if not isinstance(result, Exception):
LOG.warn("Can not raise a non-exception"
" object: %s", result)
result = exc.InvalidStateException()
raise result
# Adjust the task result in the accumulator before
# notifying others that the task has finished to
# avoid the case where a listener might throw an
# exception.
rb.result = result
runner.result = result
self.results[runner.uuid] = result
self.task_notifier.notify(states.SUCCESS, details={
'context': context,
'flow': self,
'runner': runner,
})
except Exception as e:
runner.result = e
cause = utils.FlowFailure(runner, self, e)
with excutils.save_and_reraise_exception():
# Notify any listeners that the task has errored.
self.task_notifier.notify(states.FAILURE, details={
'context': context,
'flow': self,
'runner': runner,
})
self.rollback(context, cause)
run_check_functor = functools.partial(abort_if,
ok_states=[states.STARTED,
states.RESUMING])
if len(those_finished):
if not self._change_state(context, states.RESUMING,
check_func=run_check_functor):
return
for (r, details) in those_finished:
# Fake running the task so that we trigger the same
# notifications and state changes (and rollback that
# would have happened in a normal flow).
failed = states.FAILURE in details.get('states', [])
result = details.get('result')
run_it(r, failed=failed, result=result, simulate_run=True)
self._leftoff_at = leftover
if not self._change_state(context, states.RUNNING,
check_func=run_check_functor):
return
was_interrupted = False
for r in leftover:
r.reset()
run_it(r)
if self.state == states.INTERRUPTED:
was_interrupted = True
break
if not was_interrupted:
# Only gets here if everything went successfully.
self._change_state(context, states.SUCCESS)
self._leftoff_at = None
@decorators.locked
def reset(self):
super(Flow, self).reset()
self.results = {}
self.resumer = None
self._accumulator.reset()
self._reset_internals()
@decorators.locked
def rollback(self, context, cause):
# Performs basic task by task rollback by going through the reverse
# order that tasks have finished and asking said task to undo whatever
# it has done. If this flow has any parent flows then they will
# also be called to rollback any tasks said parents contain.
#
# Note(harlowja): if a flow can more simply revert a whole set of
# tasks via a simpler command then it can override this method to
# accomplish that.
#
# For example, if each task was creating a file in a directory, then
# it's easier to just remove the directory than to ask each task to
# delete its file individually.
self._change_state(context, states.REVERTING)
try:
self._accumulator.rollback(cause)
finally:
self._change_state(context, states.FAILURE)
# Rollback any parents flows if they exist...
for p in self.parents:
p.rollback(context, cause)

View File

@ -1,636 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from billingstack.taskflow import exceptions as exc
from billingstack.taskflow import flow
from billingstack.taskflow import graph_utils
from billingstack.taskflow import states
from billingstack.taskflow import utils
import collections
import functools
import logging
import sys
import threading
import weakref
from networkx.algorithms import cycles
from networkx.classes import digraph
LOG = logging.getLogger(__name__)
class DependencyTimeout(exc.InvalidStateException):
"""When running in parallel a task has the ability to timeout waiting for
its dependent tasks to finish, this will be raised when that occurs.
"""
pass
class Flow(flow.Flow):
"""This flow pattern establishes tasks into a graph where each task is a
node in the graph and dependencies between tasks are edges in the graph.
When running (in parallel) each task will only be activated when its
dependencies have been satisified. When a graph is split into two or more
segments, both of those segments will be ran in parallel.
For example lets take this small little *somewhat complicated* graph:
X--Y--C--D
| |
A--B-- --G--
| | |--Z(end)
E--F-- --H--
In this flow the following will be ran in parallel at start:
1. X--Y
2. A--B
3. E--F
Note the C--D nodes will not be able to run until [Y,B,F] has completed.
After C--D completes the following will be ran in parallel:
1. G
2. H
Then finally Z will run (after [G,H] complete) and the flow will then have
finished executing.
"""
MUTABLE_STATES = set([states.PENDING, states.FAILURE, states.SUCCESS])
REVERTABLE_STATES = set([states.FAILURE, states.INCOMPLETE])
CANCELLABLE_STATES = set([states.PENDING, states.RUNNING])
def __init__(self, name):
super(Flow, self).__init__(name)
self._graph = digraph.DiGraph(name=name)
self._run_lock = threading.RLock()
self._cancel_lock = threading.RLock()
self._mutate_lock = threading.RLock()
# NOTE(harlowja) The locking order in this list actually matters since
# we need to make sure that users of this list do not get deadlocked
# by out of order lock access.
self._core_locks = [
self._run_lock,
self._mutate_lock,
self._cancel_lock,
]
self._run_locks = [
self._run_lock,
self._mutate_lock,
]
self._cancel_locks = [
self._cancel_lock,
]
self.results = {}
self.resumer = None
def __str__(self):
lines = ["ParallelFlow: %s" % (self.name)]
lines.append("%s" % (self._graph.number_of_nodes()))
lines.append("%s" % (self.state))
return "; ".join(lines)
def soft_reset(self):
# The way this flow works does not allow (at the current moment) for
# you to suspend the threads and then resume them at a later time,
# instead it only supports interruption (which will cancel the threads)
# and then a full reset.
raise NotImplementedError("Threaded flow does not currently support"
" soft resetting, please try using"
" reset() instead")
def interrupt(self):
"""Currently we can not pause threads and then resume them later, not
really thinking that we should likely ever do this.
"""
raise NotImplementedError("Threaded flow does not currently support"
" interruption, please try using"
" cancel() instead")
def reset(self):
# All locks are used so that resets can not happen while running or
# cancelling or modifying.
with utils.MultiLock(self._core_locks):
super(Flow, self).reset()
self.results = {}
self.resumer = None
def cancel(self):
def check():
if self.state not in self.CANCELLABLE_STATES:
raise exc.InvalidStateException("Can not attempt cancellation"
" when in state %s" %
self.state)
check()
cancelled = 0
was_empty = False
# We don't lock the other locks so that the flow can be cancelled while
# running. Further state management logic is then used while running
# to verify that the flow should still be running when it has been
# cancelled.
with utils.MultiLock(self._cancel_locks):
check()
if len(self._graph) == 0:
was_empty = True
else:
for r in self._graph.nodes_iter():
try:
if r.cancel(blocking=False):
cancelled += 1
except exc.InvalidStateException:
pass
if cancelled or was_empty:
self._change_state(None, states.CANCELLED)
return cancelled
def _find_uuid(self, uuid):
# Finds the runner for the given uuid (or returns none)
for r in self._graph.nodes_iter():
if r.uuid == uuid:
return r
return None
def add(self, task, timeout=None, infer=True):
"""Adds a task to the given flow using the given timeout which will be
used a the timeout to wait for dependencies (if any) to be
fulfilled.
"""
def check():
if self.state not in self.MUTABLE_STATES:
raise exc.InvalidStateException("Flow is currently in a"
" non-mutable %s state" %
(self.state))
# Ensure that we do a quick check to see if we can even perform this
# addition before we go about actually acquiring the lock to perform
# the actual addition.
check()
# All locks must be acquired so that modifications can not be made
# while running, cancelling or performing a simultaneous mutation.
with utils.MultiLock(self._core_locks):
check()
runner = ThreadRunner(task, self, timeout)
self._graph.add_node(runner, infer=infer)
return runner.uuid
def _connect(self):
"""Infers and connects the edges of the given tasks by examining the
associated tasks provides and requires attributes and connecting tasks
that require items to tasks that produce said items.
"""
# Disconnect all edges not manually created before we attempt to infer
# them so that we don't retain edges that are invalid.
def disconnect_non_user(u, v, e_data):
if e_data and e_data.get('reason') != 'manual':
return True
return False
# Link providers to requirers.
graph_utils.connect(self._graph,
discard_func=disconnect_non_user)
# Connect the successors & predecessors and related siblings
for r in self._graph.nodes_iter():
r._predecessors = []
r._successors = []
for (r2, _me) in self._graph.in_edges_iter([r]):
r._predecessors.append(r2)
for (_me, r2) in self._graph.out_edges_iter([r]):
r._successors.append(r2)
r.siblings = []
for r2 in self._graph.nodes_iter():
if r2 is r or r2 in r._predecessors or r2 in r._successors:
continue
r._siblings.append(r2)
def add_many(self, tasks):
"""Adds a list of tasks to the flow."""
def check():
if self.state not in self.MUTABLE_STATES:
raise exc.InvalidStateException("Flow is currently in a"
" non-mutable state %s"
% (self.state))
# Ensure that we do a quick check to see if we can even perform this
# addition before we go about actually acquiring the lock.
check()
# All locks must be acquired so that modifications can not be made
# while running, cancelling or performing a simultaneous mutation.
with utils.MultiLock(self._core_locks):
check()
added = []
for t in tasks:
added.append(self.add(t))
return added
def add_dependency(self, provider_uuid, consumer_uuid):
"""Manually adds a dependency between a provider and a consumer."""
def check_and_fetch():
if self.state not in self.MUTABLE_STATES:
raise exc.InvalidStateException("Flow is currently in a"
" non-mutable state %s"
% (self.state))
provider = self._find_uuid(provider_uuid)
if not provider or not self._graph.has_node(provider):
raise exc.InvalidStateException("Can not add a dependency "
"from unknown uuid %s" %
(provider_uuid))
consumer = self._find_uuid(consumer_uuid)
if not consumer or not self._graph.has_node(consumer):
raise exc.InvalidStateException("Can not add a dependency "
"to unknown uuid %s"
% (consumer_uuid))
if provider is consumer:
raise exc.InvalidStateException("Can not add a dependency "
"to loop via uuid %s"
% (consumer_uuid))
return (provider, consumer)
check_and_fetch()
# All locks must be acquired so that modifications can not be made
# while running, cancelling or performing a simultaneous mutation.
with utils.MultiLock(self._core_locks):
(provider, consumer) = check_and_fetch()
self._graph.add_edge(provider, consumer, reason='manual')
LOG.debug("Connecting %s as a manual provider for %s",
provider, consumer)
def run(self, context, *args, **kwargs):
"""Executes the given flow using the given context and args/kwargs."""
def abort_if(current_state, ok_states):
if current_state in (states.CANCELLED,):
return False
if current_state not in ok_states:
return False
return True
def check():
if self.state not in self.RUNNABLE_STATES:
raise exc.InvalidStateException("Flow is currently unable "
"to be ran in state %s"
% (self.state))
def connect_and_verify():
"""Do basic sanity tests on the graph structure."""
if len(self._graph) == 0:
return
self._connect()
degrees = [g[1] for g in self._graph.in_degree_iter()]
zero_degrees = [d for d in degrees if d == 0]
if not zero_degrees:
# If every task depends on something else to produce its input
# then we will be in a deadlock situation.
raise exc.InvalidStateException("No task has an in-degree"
" of zero")
self_loops = self._graph.nodes_with_selfloops()
if self_loops:
# A task that has a dependency on itself will never be able
# to run.
raise exc.InvalidStateException("%s tasks have been detected"
" with dependencies on"
" themselves" %
len(self_loops))
simple_cycles = len(cycles.recursive_simple_cycles(self._graph))
if simple_cycles:
# A task loop will never be able to run, unless it somehow
# breaks that loop.
raise exc.InvalidStateException("%s tasks have been detected"
" with dependency loops" %
simple_cycles)
def run_it(result_cb, args, kwargs):
check_runnable = functools.partial(abort_if,
ok_states=self.RUNNABLE_STATES)
if self._change_state(context, states.RUNNING,
check_func=check_runnable):
self.results = {}
if len(self._graph) == 0:
return
for r in self._graph.nodes_iter():
r.reset()
r._result_cb = result_cb
executor = utils.ThreadGroupExecutor()
for r in self._graph.nodes_iter():
executor.submit(r, *args, **kwargs)
executor.await_termination()
def trigger_rollback(failures):
if not failures:
return
causes = []
for r in failures:
causes.append(utils.FlowFailure(r, self,
r.exc, r.exc_info))
try:
self.rollback(context, causes)
except exc.InvalidStateException:
pass
finally:
# TODO(harlowja): re-raise a combined exception when
# there are more than one failures??
for f in failures:
if all(f.exc_info):
raise f.exc_info[0], f.exc_info[1], f.exc_info[2]
def handle_results():
# Isolate each runner state into groups so that we can easily tell
# which ones failed, cancelled, completed...
groups = collections.defaultdict(list)
for r in self._graph.nodes_iter():
groups[r.state].append(r)
for r in self._graph.nodes_iter():
if r not in groups.get(states.FAILURE, []) and r.has_ran():
self.results[r.uuid] = r.result
if groups[states.FAILURE]:
self._change_state(context, states.FAILURE)
trigger_rollback(groups[states.FAILURE])
elif (groups[states.CANCELLED] or groups[states.PENDING]
or groups[states.TIMED_OUT] or groups[states.STARTED]):
self._change_state(context, states.INCOMPLETE)
else:
check_ran = functools.partial(abort_if,
ok_states=[states.RUNNING])
self._change_state(context, states.SUCCESS,
check_func=check_ran)
def get_resumer_cb():
if not self.resumer:
return None
(ran, _others) = self.resumer(self, self._graph.nodes_iter())
def fetch_results(runner):
for (r, metadata) in ran:
if r is runner:
return (True, metadata.get('result'))
return (False, None)
result_cb = fetch_results
return result_cb
args = [context] + list(args)
check()
# Only acquire the run lock (but use further state checking) and the
# mutation lock to stop simultaneous running and simultaneous mutating
# which are not allowed on a running flow. Allow simultaneous cancel
# by performing repeated state checking while running.
with utils.MultiLock(self._run_locks):
check()
connect_and_verify()
try:
run_it(get_resumer_cb(), args, kwargs)
finally:
handle_results()
def rollback(self, context, cause):
"""Rolls back all tasks that are *not* still pending or cancelled."""
def check():
if self.state not in self.REVERTABLE_STATES:
raise exc.InvalidStateException("Flow is currently unable "
"to be rolled back in "
"state %s" % (self.state))
check()
# All locks must be acquired so that modifications can not be made
# while another entity is running, rolling-back, cancelling or
# performing a mutation operation.
with utils.MultiLock(self._core_locks):
check()
accum = utils.RollbackAccumulator()
for r in self._graph.nodes_iter():
if r.has_ran():
accum.add(utils.RollbackTask(context, r.task, r.result))
try:
self._change_state(context, states.REVERTING)
accum.rollback(cause)
finally:
self._change_state(context, states.FAILURE)
class ThreadRunner(utils.Runner):
"""A helper class that will use a countdown latch to avoid calling its
callable object until said countdown latch has emptied. After it has
been emptied the predecessor tasks will be examined for dependent results
and said results will then be provided to call the runners callable
object.
TODO(harlowja): this could be a 'future' like object in the future since it
is starting to have the same purpose and usage (in a way). Likely switch
this over to the task details object or a subclass of it???
"""
RESETTABLE_STATES = set([states.PENDING, states.SUCCESS, states.FAILURE,
states.CANCELLED])
RUNNABLE_STATES = set([states.PENDING])
CANCELABLE_STATES = set([states.PENDING])
SUCCESS_STATES = set([states.SUCCESS])
CANCEL_SUCCESSORS_WHEN = set([states.FAILURE, states.CANCELLED,
states.TIMED_OUT])
NO_RAN_STATES = set([states.CANCELLED, states.PENDING, states.TIMED_OUT,
states.RUNNING])
def __init__(self, task, flow, timeout):
super(ThreadRunner, self).__init__(task)
# Use weak references to give the GC a break.
self._flow = weakref.proxy(flow)
self._notifier = flow.task_notifier
self._timeout = timeout
self._state = states.PENDING
self._run_lock = threading.RLock()
# Use the flows state lock so that state notifications are not sent
# simultaneously for a given flow.
self._state_lock = flow._state_lock
self._cancel_lock = threading.RLock()
self._latch = utils.CountDownLatch()
# Any related family.
self._predecessors = []
self._successors = []
self._siblings = []
# Ensure we capture any exceptions that may have been triggered.
self.exc = None
self.exc_info = (None, None, None)
# This callback will be called before the underlying task is actually
# returned and it should either return a tuple of (has_result, result)
self._result_cb = None
@property
def state(self):
return self._state
def has_ran(self):
if self.state in self.NO_RAN_STATES:
return False
return True
def _change_state(self, context, new_state):
old_state = None
changed = False
with self._state_lock:
if self.state != new_state:
old_state = self.state
self._state = new_state
changed = True
# Don't notify while holding the lock so that the reciever of said
# notifications can actually perform operations on the given runner
# without getting into deadlock.
if changed and self._notifier:
self._notifier.notify(self.state, details={
'context': context,
'flow': self._flow,
'old_state': old_state,
'runner': self,
})
def cancel(self, blocking=True):
def check():
if self.state not in self.CANCELABLE_STATES:
raise exc.InvalidStateException("Runner not in a cancelable"
" state: %s" % (self.state))
# Check before as a quick way out of attempting to acquire the more
# heavy-weight lock. Then acquire the lock (which should not be
# possible if we are currently running) and set the state (if still
# applicable).
check()
acquired = False
cancelled = False
try:
acquired = self._cancel_lock.acquire(blocking=blocking)
if acquired:
check()
cancelled = True
self._change_state(None, states.CANCELLED)
finally:
if acquired:
self._cancel_lock.release()
return cancelled
def reset(self):
def check():
if self.state not in self.RESETTABLE_STATES:
raise exc.InvalidStateException("Runner not in a resettable"
" state: %s" % (self.state))
def do_reset():
self._latch.count = len(self._predecessors)
self.exc = None
self.exc_info = (None, None, None)
self.result = None
self._change_state(None, states.PENDING)
# We need to acquire both locks here so that we can not be running
# or being cancelled at the same time we are resetting.
check()
with self._run_lock:
check()
with self._cancel_lock:
check()
do_reset()
@property
def runs_before(self):
# NOTE(harlowja): this list may change, depending on which other
# runners have completed (or are currently actively running), so
# this is why this is a property instead of a semi-static defined list
# like in the AOT class. The list should only get bigger and not
# smaller so it should be fine to filter on runners that have completed
# successfully.
finished_ok = []
for r in self._siblings:
if r.has_ran() and r.state in self.SUCCESS_STATES:
finished_ok.append(r)
return finished_ok
def __call__(self, context, *args, **kwargs):
def is_runnable():
if self.state not in self.RUNNABLE_STATES:
return False
return True
def run(*args, **kwargs):
try:
self._change_state(context, states.RUNNING)
has_result = False
if self._result_cb:
has_result, self.result = self._result_cb(self)
if not has_result:
super(ThreadRunner, self).__call__(*args, **kwargs)
self._change_state(context, states.SUCCESS)
except Exception as e:
self._change_state(context, states.FAILURE)
self.exc = e
self.exc_info = sys.exc_info()
def signal():
if not self._successors:
return
if self.state in self.CANCEL_SUCCESSORS_WHEN:
for r in self._successors:
try:
r.cancel(blocking=False)
except exc.InvalidStateException:
pass
for r in self._successors:
try:
r._latch.countDown()
except Exception:
LOG.exception("Failed decrementing %s latch", r)
# We check before to avoid attempting to acquire the lock when we are
# known to be in a non-runnable state.
if not is_runnable():
return
args = [context] + list(args)
with self._run_lock:
# We check after we now own the run lock since a previous thread
# could have exited and released that lock and set the state to
# not runnable.
if not is_runnable():
return
may_proceed = self._latch.await(self._timeout)
# We now acquire the cancel lock so that we can be assured that
# we have not been cancelled by another entity.
with self._cancel_lock:
try:
# If we have been cancelled after awaiting and timing out
# ensure that we alter the state to show timed out (but
# not if we have been cancelled, since our state should
# be cancelled instead). This is done after acquiring the
# cancel lock so that we will not try to overwrite another
# entity trying to set the runner to the cancel state.
if not may_proceed and self.state != states.CANCELLED:
self._change_state(context, states.TIMED_OUT)
# We at this point should only have been able to time out
# or be cancelled, no other state transitions should have
# been possible.
if self.state not in (states.CANCELLED, states.TIMED_OUT):
run(*args, **kwargs)
finally:
signal()

View File

@ -1,44 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Job states.
CLAIMED = 'CLAIMED'
FAILURE = 'FAILURE'
PENDING = 'PENDING'
RUNNING = 'RUNNING'
SUCCESS = 'SUCCESS'
UNCLAIMED = 'UNCLAIMED'
# Flow states.
FAILURE = FAILURE
INTERRUPTED = 'INTERRUPTED'
PENDING = 'PENDING'
RESUMING = 'RESUMING'
REVERTING = 'REVERTING'
RUNNING = RUNNING
STARTED = 'STARTED'
SUCCESS = SUCCESS
CANCELLED = 'CANCELLED'
INCOMPLETE = 'INCOMPLETE'
# Task states.
FAILURE = FAILURE
STARTED = STARTED
SUCCESS = SUCCESS
TIMED_OUT = 'TIMED_OUT'
CANCELLED = CANCELLED

View File

@ -1,77 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
from billingstack.openstack.common import uuidutils
from billingstack.taskflow import utils
class Task(object):
"""An abstraction that defines a potential piece of work that can be
applied and can be reverted to undo the work as a single unit.
"""
__metaclass__ = abc.ABCMeta
def __init__(self, name, task_id=None):
if task_id:
self._uuid = task_id
else:
self._uuid = uuidutils.generate_uuid()
self._name = name
# An *immutable* input 'resource' name set this task depends
# on existing before this task can be applied.
self.requires = set()
# An *immutable* input 'resource' name set this task would like to
# depends on existing before this task can be applied (but does not
# strongly depend on existing).
self.optional = set()
# An *immutable* output 'resource' name set this task
# produces that other tasks may depend on this task providing.
self.provides = set()
# This identifies the version of the task to be ran which
# can be useful in resuming older versions of tasks. Standard
# major, minor version semantics apply.
self.version = (1, 0)
@property
def uuid(self):
return self._uuid
@property
def name(self):
return self._name
def __str__(self):
return "%s==%s" % (self.name, utils.get_task_version(self))
@abc.abstractmethod
def __call__(self, context, *args, **kwargs):
"""Activate a given task which will perform some operation and return.
This method can be used to apply some given context and given set
of args and kwargs to accomplish some goal. Note that the result
that is returned needs to be serializable so that it can be passed
back into this task if reverting is triggered.
"""
def revert(self, context, result, cause):
"""Revert this task using the given context, result that the apply
provided as well as any information which may have caused
said reversion.
"""

View File

@ -1,532 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import contextlib
import copy
import logging
import re
import sys
import threading
import threading2
import time
from billingstack.openstack.common import uuidutils
TASK_FACTORY_ATTRIBUTE = '_TaskFlow_task_factory'
LOG = logging.getLogger(__name__)
def await(check_functor, timeout=None):
if timeout is not None:
end_time = time.time() + max(0, timeout)
else:
end_time = None
# Use the same/similar scheme that the python condition class uses.
delay = 0.0005
while not check_functor():
time.sleep(delay)
if end_time is not None:
remaining = end_time - time.time()
if remaining <= 0:
return False
delay = min(delay * 2, remaining, 0.05)
else:
delay = min(delay * 2, 0.05)
return True
def get_task_version(task):
"""Gets a tasks *string* version, whether it is a task object/function."""
task_version = getattr(task, 'version')
if isinstance(task_version, (list, tuple)):
task_version = '.'.join(str(item) for item in task_version)
if task_version is not None and not isinstance(task_version, basestring):
task_version = str(task_version)
return task_version
def is_version_compatible(version_1, version_2):
"""Checks for major version compatibility of two *string" versions."""
if version_1 == version_2:
# Equivalent exactly, so skip the rest.
return True
def _convert_to_pieces(version):
try:
pieces = []
for p in version.split("."):
p = p.strip()
if not len(p):
pieces.append(0)
continue
# Clean off things like 1alpha, or 2b and just select the
# digit that starts that entry instead.
p_match = re.match(r"(\d+)([A-Za-z]*)(.*)", p)
if p_match:
p = p_match.group(1)
pieces.append(int(p))
except (AttributeError, TypeError, ValueError):
pieces = []
return pieces
version_1_pieces = _convert_to_pieces(version_1)
version_2_pieces = _convert_to_pieces(version_2)
if len(version_1_pieces) == 0 or len(version_2_pieces) == 0:
return False
# Ensure major version compatibility to start.
major1 = version_1_pieces[0]
major2 = version_2_pieces[0]
if major1 != major2:
return False
return True
class MultiLock(object):
"""A class which can attempt to obtain many locks at once and release
said locks when exiting.
Useful as a context manager around many locks (instead of having to nest
said individual context managers).
"""
def __init__(self, locks):
assert len(locks) > 0, "Zero locks requested"
self._locks = locks
self._locked = [False] * len(locks)
def __enter__(self):
def is_locked(lock):
# NOTE(harlowja): the threading2 lock doesn't seem to have this
# attribute, so thats why we are checking it existing first.
if hasattr(lock, 'locked'):
return lock.locked()
return False
for i in xrange(0, len(self._locked)):
if self._locked[i] or is_locked(self._locks[i]):
raise threading.ThreadError("Lock %s not previously released"
% (i + 1))
self._locked[i] = False
for (i, lock) in enumerate(self._locks):
self._locked[i] = lock.acquire()
def __exit__(self, type, value, traceback):
for (i, locked) in enumerate(self._locked):
try:
if locked:
self._locks[i].release()
self._locked[i] = False
except threading.ThreadError:
LOG.exception("Unable to release lock %s", i + 1)
class CountDownLatch(object):
"""Similar in concept to the java count down latch."""
def __init__(self, count=0):
self.count = count
self.lock = threading.Condition()
def countDown(self):
with self.lock:
self.count -= 1
if self.count <= 0:
self.lock.notifyAll()
def await(self, timeout=None):
end_time = None
if timeout is not None:
timeout = max(0, timeout)
end_time = time.time() + timeout
time_up = False
with self.lock:
while True:
# Stop waiting on these 2 conditions.
if time_up or self.count <= 0:
break
# Was this a spurious wakeup or did we really end??
self.lock.wait(timeout=timeout)
if end_time is not None:
if time.time() >= end_time:
time_up = True
else:
# Reduce the timeout so that we don't wait extra time
# over what we initially were requested to.
timeout = end_time - time.time()
return self.count <= 0
class LastFedIter(object):
"""An iterator which yields back the first item and then yields back
results from the provided iterator.
"""
def __init__(self, first, rest_itr):
self.first = first
self.rest_itr = rest_itr
def __iter__(self):
yield self.first
for i in self.rest_itr:
yield i
class ThreadGroupExecutor(object):
"""A simple thread executor that spins up new threads (or greenthreads) for
each task to be completed (no pool limit is enforced).
TODO(harlowja): Likely if we use the more advanced executors that come with
the concurrent.futures library we can just get rid of this.
"""
def __init__(self, daemonize=True):
self._threads = []
self._group = threading2.ThreadGroup()
self._daemonize = daemonize
def submit(self, fn, *args, **kwargs):
t = threading2.Thread(target=fn, group=self._group,
args=args, kwargs=kwargs)
t.daemon = self._daemonize
self._threads.append(t)
t.start()
def await_termination(self, timeout=None):
if not self._threads:
return
return self._group.join(timeout)
class FlowFailure(object):
"""When a task failure occurs the following object will be given to revert
and can be used to interrogate what caused the failure.
"""
def __init__(self, runner, flow, exc, exc_info=None):
self.runner = runner
self.flow = flow
self.exc = exc
if not exc_info:
self.exc_info = sys.exc_info()
else:
self.exc_info = exc_info
class RollbackTask(object):
"""A helper task that on being called will call the underlying callable
tasks revert method (if said method exists).
"""
def __init__(self, context, task, result):
self.task = task
self.result = result
self.context = context
def __str__(self):
return str(self.task)
def __call__(self, cause):
if ((hasattr(self.task, "revert") and
isinstance(self.task.revert, collections.Callable))):
self.task.revert(self.context, self.result, cause)
class Runner(object):
"""A helper class that wraps a task and can find the needed inputs for
the task to run, as well as providing a uuid and other useful functionality
for users of the task.
TODO(harlowja): replace with the task details object or a subclass of
that???
"""
def __init__(self, task, uuid=None):
assert isinstance(task, collections.Callable)
task_factory = getattr(task, TASK_FACTORY_ATTRIBUTE, None)
if task_factory:
self.task = task_factory(task)
else:
self.task = task
self.providers = {}
self.result = None
if not uuid:
self._id = uuidutils.generate_uuid()
else:
self._id = str(uuid)
@property
def uuid(self):
return str(self._id)
@property
def requires(self):
return self.task.requires
@property
def provides(self):
return self.task.provides
@property
def optional(self):
return self.task.optional
@property
def runs_before(self):
return []
@property
def version(self):
return get_task_version(self.task)
@property
def name(self):
return self.task.name
def reset(self):
self.result = None
def __str__(self):
lines = ["Runner: %s" % (self.name)]
lines.append("%s" % (self.uuid))
lines.append("%s" % (self.version))
return "; ".join(lines)
def __call__(self, *args, **kwargs):
# Find all of our inputs first.
kwargs = dict(kwargs)
for (k, who_made) in self.providers.iteritems():
if k in kwargs:
continue
try:
kwargs[k] = who_made.result[k]
except (TypeError, KeyError):
pass
optional_keys = self.optional
optional_keys = optional_keys - set(kwargs.keys())
for k in optional_keys:
for who_ran in self.runs_before:
matched = False
if k in who_ran.provides:
try:
kwargs[k] = who_ran.result[k]
matched = True
except (TypeError, KeyError):
pass
if matched:
break
# Ensure all required keys are either existent or set to none.
for k in self.requires:
if k not in kwargs:
kwargs[k] = None
# And now finally run.
self.result = self.task(*args, **kwargs)
return self.result
class AOTRunner(Runner):
"""A runner that knows who runs before this runner ahead of time from a
known list of previous runners.
"""
def __init__(self, task):
super(AOTRunner, self).__init__(task)
self._runs_before = []
@property
def runs_before(self):
return self._runs_before
@runs_before.setter
def runs_before(self, runs_before):
self._runs_before = list(runs_before)
class TransitionNotifier(object):
"""A utility helper class that can be used to subscribe to
notifications of events occuring as well as allow a entity to post said
notifications to subscribers.
"""
RESERVED_KEYS = ('details',)
ANY = '*'
def __init__(self):
self._listeners = collections.defaultdict(list)
def reset(self):
self._listeners = collections.defaultdict(list)
def notify(self, state, details):
listeners = list(self._listeners.get(self.ANY, []))
for i in self._listeners[state]:
if i not in listeners:
listeners.append(i)
if not listeners:
return
for (callback, args, kwargs) in listeners:
if args is None:
args = []
if kwargs is None:
kwargs = {}
kwargs['details'] = details
try:
callback(state, *args, **kwargs)
except Exception:
LOG.exception(("Failure calling callback %s to notify about"
" state transition %s"), callback, state)
def register(self, state, callback, args=None, kwargs=None):
assert isinstance(callback, collections.Callable)
for i, (cb, args, kwargs) in enumerate(self._listeners.get(state, [])):
if cb is callback:
raise ValueError("Callback %s already registered" % (callback))
if kwargs:
for k in self.RESERVED_KEYS:
if k in kwargs:
raise KeyError(("Reserved key '%s' not allowed in "
"kwargs") % k)
kwargs = copy.copy(kwargs)
if args:
args = copy.copy(args)
self._listeners[state].append((callback, args, kwargs))
def deregister(self, state, callback):
if state not in self._listeners:
return
for i, (cb, args, kwargs) in enumerate(self._listeners[state]):
if cb is callback:
self._listeners[state].pop(i)
break
class RollbackAccumulator(object):
"""A utility class that can help in organizing 'undo' like code
so that said code be rolled back on failure (automatically or manually)
by activating rollback callables that were inserted during said codes
progression.
"""
def __init__(self):
self._rollbacks = []
def add(self, *callables):
self._rollbacks.extend(callables)
def reset(self):
self._rollbacks = []
def __len__(self):
return len(self._rollbacks)
def __enter__(self):
return self
def rollback(self, cause):
LOG.warn("Activating %s rollbacks due to %s.", len(self), cause)
for (i, f) in enumerate(reversed(self._rollbacks)):
LOG.debug("Calling rollback %s: %s", i + 1, f)
try:
f(cause)
except Exception:
LOG.exception(("Failed rolling back %s: %s due "
"to inner exception."), i + 1, f)
def __exit__(self, type, value, tb):
if any((value, type, tb)):
self.rollback(value)
class ReaderWriterLock(object):
"""A simple reader-writer lock.
Several readers can hold the lock simultaneously, and only one writer.
Write locks have priority over reads to prevent write starvation.
Public domain @ http://majid.info/blog/a-reader-writer-lock-for-python/
"""
def __init__(self):
self.rwlock = 0
self.writers_waiting = 0
self.monitor = threading.Lock()
self.readers_ok = threading.Condition(self.monitor)
self.writers_ok = threading.Condition(self.monitor)
@contextlib.contextmanager
def acquire(self, read=True):
"""Acquire a read or write lock in a context manager."""
try:
if read:
self.acquire_read()
else:
self.acquire_write()
yield self
finally:
self.release()
def acquire_read(self):
"""Acquire a read lock.
Several threads can hold this typeof lock.
It is exclusive with write locks.
"""
self.monitor.acquire()
while self.rwlock < 0 or self.writers_waiting:
self.readers_ok.wait()
self.rwlock += 1
self.monitor.release()
def acquire_write(self):
"""Acquire a write lock.
Only one thread can hold this lock, and only when no read locks
are also held.
"""
self.monitor.acquire()
while self.rwlock != 0:
self.writers_waiting += 1
self.writers_ok.wait()
self.writers_waiting -= 1
self.rwlock = -1
self.monitor.release()
def release(self):
"""Release a lock, whether read or write."""
self.monitor.acquire()
if self.rwlock < 0:
self.rwlock = 0
else:
self.rwlock -= 1
wake_writers = self.writers_waiting and self.rwlock == 0
wake_readers = self.writers_waiting == 0
self.monitor.release()
if wake_writers:
self.writers_ok.acquire()
self.writers_ok.notify()
self.writers_ok.release()
elif wake_readers:
self.readers_ok.acquire()
self.readers_ok.notifyAll()
self.readers_ok.release()

View File

@ -13,15 +13,17 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from taskflow import task
from billingstack.openstack.common import log
from billingstack.openstack.common.gettextutils import _
from billingstack.taskflow import task
LOG = log.getLogger(__name__)
def _make_task_name(cls, prefix="default", addons=None):
def _make_task_name(cls, prefix=None, addons=None):
prefix = prefix or 'default'
components = [cls.__module__, cls.__name__]
if addons:
for a in addons:
@ -58,28 +60,7 @@ def _attach_debug_listeners(flow):
class RootTask(task.Task):
def __init__(self, name=None, **kw):
name = name or _make_task_name(self.__class__, **kw)
super(RootTask, self).__init__(name)
class ValuesInjectTask(RootTask):
"""
This injects a dict into the flow.
This injection is done so that the keys (and values) provided can be
dependended on by tasks further down the line. Since taskflow is dependency
based this can be considered the bootstrapping task that provides an
initial set of values for other tasks to get started with. If this did not
exist then tasks would fail locating there dependent tasks and the values
said dependent tasks produce.
Reversion strategy: N/A
"""
def __init__(self, values, **kw):
super(ValuesInjectTask, self).__init__(**kw)
self.provides.update(values.keys())
self._values = values
def __call__(self, context):
return dict(self._values)
def __init__(self, name=None, prefix=None, addons=None, **kw):
name = name or _make_task_name(self.__class__, prefix=prefix,
addons=addons)
super(RootTask, self).__init__(name, **kw)

View File

@ -39,7 +39,7 @@ Common Steps
::
$ git clone https://github.com/billingstack/billingstack.git
$ git clone https://github.com/stackforge/billingstack.git
$ cd billingstack
3. Setup virtualenv and Install BillingStack and it's dependencies
@ -131,4 +131,4 @@ Installing the API
...
2013-06-09 03:52:31 INFO [eventlet.wsgi] (2223) wsgi starting up on http://0.0.0.0:9091/
2013-06-09 03:52:31 INFO [eventlet.wsgi] (2223) wsgi starting up on http://0.0.0.0:9091/

28
requirements.txt Normal file
View File

@ -0,0 +1,28 @@
Babel>=1.3
pbr>=0.5.21,<1.0
# This file is managed by openstack-depends
argparse
cliff>=1.4.3
eventlet>=0.13.0
extras
pecan>=0.2.0
iso8601>=0.1.8
netaddr>=0.7.6
oslo.config>=1.2.0
Paste
PasteDeploy>=1.5.0
Routes>=1.12.3
stevedore>=0.10
WebOb>=1.2.3,<1.3
WSME>=0.5b6
# Optional Stuff that is used by default
alembic>=0.4.1
SQLAlchemy>=0.7.8,<=0.7.99
kombu>=2.4.8
# Identity
python-memcached>=1.48
passlib
pycountry
taskflow

View File

@ -18,5 +18,5 @@
import setuptools
setuptools.setup(
setup_requires=['pbr>=0.5.21,<1.0'],
setup_requires=['pbr'],
pbr=True)

View File

@ -1,7 +0,0 @@
[DEFAULT]
# The list of primitives to copy from taskflow
primitives=flow.threaded_flow,flow.linear_flow,task
# The base module to hold the copy of taskflow
base=billingstack

View File

@ -1,333 +0,0 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2013, Nebula, Inc.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# Colorizer Code is borrowed from Twisted:
# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Display a subunit stream through a colorized unittest test runner."""
import heapq
import subunit
import sys
import unittest
import testtools
class _AnsiColorizer(object):
"""
A colorizer is an object that loosely wraps around a stream, allowing
callers to write text to the stream in a particular color.
Colorizer classes must implement C{supported()} and C{write(text, color)}.
"""
_colors = dict(black=30, red=31, green=32, yellow=33,
blue=34, magenta=35, cyan=36, white=37)
def __init__(self, stream):
self.stream = stream
def supported(cls, stream=sys.stdout):
"""
A class method that returns True if the current platform supports
coloring terminal output using this method. Returns False otherwise.
"""
if not stream.isatty():
return False # auto color only on TTYs
try:
import curses
except ImportError:
return False
else:
try:
try:
return curses.tigetnum("colors") > 2
except curses.error:
curses.setupterm()
return curses.tigetnum("colors") > 2
except Exception:
# guess false in case of error
return False
supported = classmethod(supported)
def write(self, text, color):
"""
Write the given text to the stream in the given color.
@param text: Text to be written to the stream.
@param color: A string label for a color. e.g. 'red', 'white'.
"""
color = self._colors[color]
self.stream.write('\x1b[%s;1m%s\x1b[0m' % (color, text))
class _Win32Colorizer(object):
"""
See _AnsiColorizer docstring.
"""
def __init__(self, stream):
import win32console
red, green, blue, bold = (win32console.FOREGROUND_RED,
win32console.FOREGROUND_GREEN,
win32console.FOREGROUND_BLUE,
win32console.FOREGROUND_INTENSITY)
self.stream = stream
self.screenBuffer = win32console.GetStdHandle(
win32console.STD_OUT_HANDLE)
self._colors = {
'normal': red | green | blue,
'red': red | bold,
'green': green | bold,
'blue': blue | bold,
'yellow': red | green | bold,
'magenta': red | blue | bold,
'cyan': green | blue | bold,
'white': red | green | blue | bold
}
def supported(cls, stream=sys.stdout):
try:
import win32console
screenBuffer = win32console.GetStdHandle(
win32console.STD_OUT_HANDLE)
except ImportError:
return False
import pywintypes
try:
screenBuffer.SetConsoleTextAttribute(
win32console.FOREGROUND_RED |
win32console.FOREGROUND_GREEN |
win32console.FOREGROUND_BLUE)
except pywintypes.error:
return False
else:
return True
supported = classmethod(supported)
def write(self, text, color):
color = self._colors[color]
self.screenBuffer.SetConsoleTextAttribute(color)
self.stream.write(text)
self.screenBuffer.SetConsoleTextAttribute(self._colors['normal'])
class _NullColorizer(object):
"""
See _AnsiColorizer docstring.
"""
def __init__(self, stream):
self.stream = stream
def supported(cls, stream=sys.stdout):
return True
supported = classmethod(supported)
def write(self, text, color):
self.stream.write(text)
def get_elapsed_time_color(elapsed_time):
if elapsed_time > 1.0:
return 'red'
elif elapsed_time > 0.25:
return 'yellow'
else:
return 'green'
class NovaTestResult(testtools.TestResult):
def __init__(self, stream, descriptions, verbosity):
super(NovaTestResult, self).__init__()
self.stream = stream
self.showAll = verbosity > 1
self.num_slow_tests = 10
self.slow_tests = [] # this is a fixed-sized heap
self.colorizer = None
# NOTE(vish): reset stdout for the terminal check
stdout = sys.stdout
sys.stdout = sys.__stdout__
for colorizer in [_Win32Colorizer, _AnsiColorizer, _NullColorizer]:
if colorizer.supported():
self.colorizer = colorizer(self.stream)
break
sys.stdout = stdout
self.start_time = None
self.last_time = {}
self.results = {}
self.last_written = None
def _writeElapsedTime(self, elapsed):
color = get_elapsed_time_color(elapsed)
self.colorizer.write(" %.2f" % elapsed, color)
def _addResult(self, test, *args):
try:
name = test.id()
except AttributeError:
name = 'Unknown.unknown'
test_class, test_name = name.rsplit('.', 1)
elapsed = (self._now() - self.start_time).total_seconds()
item = (elapsed, test_class, test_name)
if len(self.slow_tests) >= self.num_slow_tests:
heapq.heappushpop(self.slow_tests, item)
else:
heapq.heappush(self.slow_tests, item)
self.results.setdefault(test_class, [])
self.results[test_class].append((test_name, elapsed) + args)
self.last_time[test_class] = self._now()
self.writeTests()
def _writeResult(self, test_name, elapsed, long_result, color,
short_result, success):
if self.showAll:
self.stream.write(' %s' % str(test_name).ljust(66))
self.colorizer.write(long_result, color)
if success:
self._writeElapsedTime(elapsed)
self.stream.writeln()
else:
self.colorizer.write(short_result, color)
def addSuccess(self, test):
super(NovaTestResult, self).addSuccess(test)
self._addResult(test, 'OK', 'green', '.', True)
def addFailure(self, test, err):
super(NovaTestResult, self).addFailure(test, err)
self._addResult(test, 'FAIL', 'red', 'F', False)
def addError(self, test, err):
super(NovaTestResult, self).addFailure(test, err)
self._addResult(test, 'ERROR', 'red', 'E', False)
def addSkip(self, test, reason=None, details=None):
super(NovaTestResult, self).addSkip(test, reason, details)
self._addResult(test, 'SKIP', 'blue', 'S', True)
def startTest(self, test):
self.start_time = self._now()
super(NovaTestResult, self).startTest(test)
def writeTestCase(self, cls):
if not self.results.get(cls):
return
if cls != self.last_written:
self.colorizer.write(cls, 'white')
self.stream.writeln()
for result in self.results[cls]:
self._writeResult(*result)
del self.results[cls]
self.stream.flush()
self.last_written = cls
def writeTests(self):
time = self.last_time.get(self.last_written, self._now())
if not self.last_written or (self._now() - time).total_seconds() > 2.0:
diff = 3.0
while diff > 2.0:
classes = self.results.keys()
oldest = min(classes, key=lambda x: self.last_time[x])
diff = (self._now() - self.last_time[oldest]).total_seconds()
self.writeTestCase(oldest)
else:
self.writeTestCase(self.last_written)
def done(self):
self.stopTestRun()
def stopTestRun(self):
for cls in list(self.results.iterkeys()):
self.writeTestCase(cls)
self.stream.writeln()
self.writeSlowTests()
def writeSlowTests(self):
# Pare out 'fast' tests
slow_tests = [item for item in self.slow_tests
if get_elapsed_time_color(item[0]) != 'green']
if slow_tests:
slow_total_time = sum(item[0] for item in slow_tests)
slow = ("Slowest %i tests took %.2f secs:"
% (len(slow_tests), slow_total_time))
self.colorizer.write(slow, 'yellow')
self.stream.writeln()
last_cls = None
# sort by name
for elapsed, cls, name in sorted(slow_tests,
key=lambda x: x[1] + x[2]):
if cls != last_cls:
self.colorizer.write(cls, 'white')
self.stream.writeln()
last_cls = cls
self.stream.write(' %s' % str(name).ljust(68))
self._writeElapsedTime(elapsed)
self.stream.writeln()
def printErrors(self):
if self.showAll:
self.stream.writeln()
self.printErrorList('ERROR', self.errors)
self.printErrorList('FAIL', self.failures)
def printErrorList(self, flavor, errors):
for test, err in errors:
self.colorizer.write("=" * 70, 'red')
self.stream.writeln()
self.colorizer.write(flavor, 'red')
self.stream.writeln(": %s" % test.id())
self.colorizer.write("-" * 70, 'red')
self.stream.writeln()
self.stream.writeln("%s" % err)
test = subunit.ProtocolTestCase(sys.stdin, passthrough=None)
if sys.version_info[0:2] <= (2, 6):
runner = unittest.TextTestRunner(verbosity=2)
else:
runner = unittest.TextTestRunner(verbosity=2, resultclass=NovaTestResult)
if runner.run(test).wasSuccessful():
exit_code = 0
else:
exit_code = 1
sys.exit(exit_code)

View File

@ -1,76 +0,0 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Copyright 2010 OpenStack Foundation.
# Copyright 2013 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Installation script for BillingStack's development virtualenv
"""
import os
import subprocess
import sys
import install_venv_common as install_venv
def print_help():
help = """
BillingStack development environment setup is complete.
BillingStack development uses virtualenv to track and manage Python dependencies
while in development and testing.
To activate the BillingStack virtualenv for the extent of your current shell
session you can run:
$ source .venv/bin/activate
Or, if you prefer, you can run commands in the virtualenv on a case by case
basis by running:
$ tools/with_venv.sh <your command>
Also, make test will automatically use the virtualenv.
"""
print help
def main(argv):
root = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
venv = os.path.join(root, '.venv')
pip_requires = os.path.join(root, 'tools', 'pip-requires')
pip_options = os.path.join(root, 'tools', 'pip-options')
test_requires = os.path.join(root, 'tools', 'test-requires')
py_version = "python%s.%s" % (sys.version_info[0], sys.version_info[1])
project = 'quantum'
install = install_venv.InstallVenv(root, venv, pip_requires, pip_options, test_requires,
py_version, project)
options = install.parse_args(argv)
install.check_python_version()
install.check_dependencies()
install.create_virtualenv(no_site_packages=options.no_site_packages)
install.install_dependencies()
install.post_process()
print_help()
if __name__ == '__main__':
main(sys.argv)

View File

@ -1,224 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack Foundation
# Copyright 2013 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Provides methods needed by installation script for OpenStack development
virtual environments.
Synced in from openstack-common
"""
import argparse
import os
import subprocess
import sys
class InstallVenv(object):
def __init__(self, root, venv, pip_requires, pip_options, test_requires,
py_version, project):
self.root = root
self.venv = venv
self.pip_requires = pip_requires
self.pip_options = pip_options
self.test_requires = test_requires
self.py_version = py_version
self.project = project
def die(self, message, *args):
print >> sys.stderr, message % args
sys.exit(1)
def check_python_version(self):
if sys.version_info < (2, 6):
self.die("Need Python Version >= 2.6")
def run_command_with_code(self, cmd, redirect_output=True,
check_exit_code=True):
"""Runs a command in an out-of-process shell.
Returns the output of that command. Working directory is self.root.
"""
if redirect_output:
stdout = subprocess.PIPE
else:
stdout = None
proc = subprocess.Popen(cmd, cwd=self.root, stdout=stdout)
output = proc.communicate()[0]
if check_exit_code and proc.returncode != 0:
self.die('Command "%s" failed.\n%s', ' '.join(cmd), output)
return (output, proc.returncode)
def run_command(self, cmd, redirect_output=True, check_exit_code=True):
return self.run_command_with_code(cmd, redirect_output,
check_exit_code)[0]
def get_distro(self):
if (os.path.exists('/etc/fedora-release') or
os.path.exists('/etc/redhat-release')):
return Fedora(self.root, self.venv, self.pip_requires,
self.pip_options, self.test_requires,
self.py_version, self.project)
else:
return Distro(self.root, self.venv, self.pip_requires,
self.pip_options, self.test_requires,
self.py_version, self.project)
def check_dependencies(self):
self.get_distro().install_virtualenv()
def create_virtualenv(self, no_site_packages=True):
"""Creates the virtual environment and installs PIP.
Creates the virtual environment and installs PIP only into the
virtual environment.
"""
if not os.path.isdir(self.venv):
print 'Creating venv...',
if no_site_packages:
self.run_command(['virtualenv', '-q', '--no-site-packages',
self.venv])
else:
self.run_command(['virtualenv', '-q', self.venv])
print 'done.'
print 'Installing pip in venv...',
if not self.run_command(['tools/with_venv.sh', 'easy_install',
'pip>1.0']).strip():
self.die("Failed to install pip.")
print 'done.'
else:
print "venv already exists..."
pass
def pip_install(self, *args):
self.run_command(['tools/with_venv.sh',
'pip', 'install', '--upgrade'] + list(args),
redirect_output=False)
def install_dependencies(self):
print 'Installing dependencies with pip (this can take a while)...'
# First things first, make sure our venv has the latest pip and
# distribute.
# NOTE: we keep pip at version 1.1 since the most recent version causes
# the .venv creation to fail. See:
# https://bugs.launchpad.net/nova/+bug/1047120
self.pip_install('pip==1.1')
self.pip_install('distribute')
# Install greenlet by hand - just listing it in the requires file does
# not
# get it installed in the right order
self.pip_install('greenlet')
self.pip_install('-r', self.pip_requires)
self.pip_install('-r', self.pip_options)
self.pip_install('-r', self.test_requires)
def post_process(self):
self.get_distro().post_process()
def parse_args(self, argv):
"""Parses command-line arguments."""
parser = argparse.ArgumentParser()
parser.add_argument('-n', '--no-site-packages',
action='store_true',
help="Do not inherit packages from global Python "
"install")
return parser.parse_args(argv[1:])
class Distro(InstallVenv):
def check_cmd(self, cmd):
return bool(self.run_command(['which', cmd],
check_exit_code=False).strip())
def install_virtualenv(self):
if self.check_cmd('virtualenv'):
return
if self.check_cmd('easy_install'):
print 'Installing virtualenv via easy_install...',
if self.run_command(['easy_install', 'virtualenv']):
print 'Succeeded'
return
else:
print 'Failed'
self.die('ERROR: virtualenv not found.\n\n%s development'
' requires virtualenv, please install it using your'
' favorite package management tool' % self.project)
def post_process(self):
"""Any distribution-specific post-processing gets done here.
In particular, this is useful for applying patches to code inside
the venv.
"""
pass
class Fedora(Distro):
"""This covers all Fedora-based distributions.
Includes: Fedora, RHEL, CentOS, Scientific Linux
"""
def check_pkg(self, pkg):
return self.run_command_with_code(['rpm', '-q', pkg],
check_exit_code=False)[1] == 0
def yum_install(self, pkg, **kwargs):
print "Attempting to install '%s' via yum" % pkg
self.run_command(['sudo', 'yum', 'install', '-y', pkg], **kwargs)
def apply_patch(self, originalfile, patchfile):
self.run_command(['patch', '-N', originalfile, patchfile],
check_exit_code=False)
def install_virtualenv(self):
if self.check_cmd('virtualenv'):
return
if not self.check_pkg('python-virtualenv'):
self.yum_install('python-virtualenv', check_exit_code=False)
super(Fedora, self).install_virtualenv()
def post_process(self):
"""Workaround for a bug in eventlet.
This currently affects RHEL6.1, but the fix can safely be
applied to all RHEL and Fedora distributions.
This can be removed when the fix is applied upstream.
Nova: https://bugs.launchpad.net/nova/+bug/884915
Upstream: https://bitbucket.org/which_linden/eventlet/issue/89
"""
# Install "patch" program if it's not there
if not self.check_pkg('patch'):
self.yum_install('patch')
# Apply the eventlet patch
self.apply_patch(os.path.join(self.venv, 'lib', self.py_version,
'site-packages',
'eventlet/green/subprocess.py'),
'contrib/redhat-eventlet.patch')

View File

@ -1,39 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import sys
import install_venv_common as install_venv
def main(argv):
root = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
venv = os.environ['VIRTUAL_ENV']
pip_requires = os.path.join(root, 'tools', 'pip-requires')
pip_options = os.path.join(root, 'tools', 'pip-options')
test_requires = os.path.join(root, 'tools', 'test-requires')
py_version = "python%s.%s" % (sys.version_info[0], sys.version_info[1])
project = 'Quantum'
install = install_venv.InstallVenv(root, venv, pip_requires, pip_options,
test_requires, py_version, project)
#NOTE(dprince): For Tox we only run post_process (which patches files, etc)
install.post_process()
if __name__ == '__main__':
main(sys.argv)

View File

@ -1,8 +0,0 @@
# Optional Stuff that is used by default
alembic
SQLAlchemy>=0.7.8,<=0.7.9
kombu
# Identity
python-memcached
passlib

View File

@ -1,21 +0,0 @@
Babel>=0.9.6
pbr>=0.5.21,<1.0
# This file is managed by openstack-depends
argparse
cliff>=1.4
eventlet>=0.13.0
extras
pecan>=0.2.0
iso8601>=0.1.4
netaddr
oslo.config>=1.1.0
Paste
PasteDeploy>=1.5.0
pycountry
Routes>=1.12.3
stevedore>=0.10
WebOb>=1.2.3,<1.3
https://github.com/stackforge/wsme/archive/master.zip#egg=WSME
# Taskflow
threading2
networkx

View File

View File

@ -18,4 +18,4 @@
TOOLS=`dirname $0`
VENV=$TOOLS/../.venv
source $VENV/bin/activate && $@
source $VENV/bin/activate && "$@"

17
tox.ini
View File

@ -2,26 +2,24 @@
envlist = py26,py27,pep8
[testenv]
#usedevelop = True
install_command = pip install {opts} {packages}
setenv = VIRTUAL_ENV={envdir}
deps = -r{toxinidir}/tools/pip-requires
-r{toxinidir}/tools/pip-options
-r{toxinidir}/tools/test-requires
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
setuptools_git>=0.4
commands =
python tools/patch_tox_venv.py
python setup.py testr --slowest --testr-args='{posargs}'
commands = python setup.py testr --slowest --testr-args='{posargs}'
[tox:jenkins]
sitepackages = True
downloadcache = ~/cache/pip
[testenv:pep8]
deps = flake8
commands =
flake8
[testenv:cover]
commands =
python tools/patch_tox_venv.py
python setup.py testr --coverage --testr-args='{posargs}'
[testenv:venv]
@ -38,5 +36,4 @@ commands = {posargs}
# TODO(markmcclain) H202 assertRaises Exception too broad
ignore = E711,E712,E125,H301,H302,H404,H901,H902,H202
show-source = true
builtins = _
exclude=.venv,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*egg,tools
exclude = .venv,.tox,dist,doc,*openstack/common*,*lib/python*,*egg,tests,build