Allow loading conductors via entrypoints
Add an easy to use helper function that is exposed to fetch a conductor in a way that matches how engines and persistence backends and jobboards are loaded. This also adjusts the single threaded conductor to be now named blocking conductor and exposes an entrypoint backend fetch() function to load current and future conductors. Change-Id: Id146d847c329d3e8510a8f24c3ec8b918680ddb5
This commit is contained in:
parent
28bece7c7c
commit
eaad725508
|
@ -67,14 +67,15 @@ Interfaces
|
|||
Implementations
|
||||
===============
|
||||
|
||||
.. automodule:: taskflow.conductors.single_threaded
|
||||
.. automodule:: taskflow.conductors.backends
|
||||
.. automodule:: taskflow.conductors.backends.impl_blocking
|
||||
|
||||
Hierarchy
|
||||
=========
|
||||
|
||||
.. inheritance-diagram::
|
||||
taskflow.conductors.base
|
||||
taskflow.conductors.single_threaded
|
||||
taskflow.conductors.backends.impl_blocking
|
||||
:parts: 1
|
||||
|
||||
.. _railroad conductors: http://en.wikipedia.org/wiki/Conductor_%28transportation%29
|
||||
|
|
|
@ -37,6 +37,9 @@ packages =
|
|||
taskflow.jobboards =
|
||||
zookeeper = taskflow.jobs.backends.impl_zookeeper:ZookeeperJobBoard
|
||||
|
||||
taskflow.conductors =
|
||||
blocking = taskflow.conductors.backends.impl_blocking:BlockingConductor
|
||||
|
||||
taskflow.persistence =
|
||||
dir = taskflow.persistence.backends.impl_dir:DirBackend
|
||||
file = taskflow.persistence.backends.impl_dir:DirBackend
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
|
||||
import stevedore.driver
|
||||
|
||||
from taskflow import exceptions as exc
|
||||
|
||||
# NOTE(harlowja): this is the entrypoint namespace, not the module namespace.
|
||||
CONDUCTOR_NAMESPACE = 'taskflow.conductors'
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def fetch(kind, name, jobboard, namespace=CONDUCTOR_NAMESPACE, **kwargs):
|
||||
"""Fetch a conductor backend with the given options.
|
||||
|
||||
This fetch method will look for the entrypoint 'kind' in the entrypoint
|
||||
namespace, and then attempt to instantiate that entrypoint using the
|
||||
provided name, jobboard and any board specific kwargs.
|
||||
"""
|
||||
LOG.debug('Looking for %r conductor driver in %r', kind, namespace)
|
||||
try:
|
||||
mgr = stevedore.driver.DriverManager(
|
||||
namespace, kind,
|
||||
invoke_on_load=True,
|
||||
invoke_args=(name, jobboard),
|
||||
invoke_kwds=kwargs)
|
||||
return mgr.driver
|
||||
except RuntimeError as e:
|
||||
raise exc.NotFound("Could not find conductor %s" % (kind), e)
|
|
@ -0,0 +1,175 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import six
|
||||
|
||||
from taskflow.conductors import base
|
||||
from taskflow import exceptions as excp
|
||||
from taskflow.listeners import logging as logging_listener
|
||||
from taskflow import logging
|
||||
from taskflow.types import timing as tt
|
||||
from taskflow.utils import async_utils
|
||||
from taskflow.utils import deprecation
|
||||
from taskflow.utils import threading_utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
WAIT_TIMEOUT = 0.5
|
||||
NO_CONSUME_EXCEPTIONS = tuple([
|
||||
excp.ExecutionFailure,
|
||||
excp.StorageFailure,
|
||||
])
|
||||
|
||||
|
||||
class BlockingConductor(base.Conductor):
|
||||
"""A conductor that runs jobs in its own dispatching loop.
|
||||
|
||||
This conductor iterates over jobs in the provided jobboard (waiting for
|
||||
the given timeout if no jobs exist) and attempts to claim them, work on
|
||||
those jobs in its local thread (blocking further work from being claimed
|
||||
and consumed) and then consume those work units after completetion. This
|
||||
process will repeat until the conductor has been stopped or other critical
|
||||
error occurs.
|
||||
|
||||
NOTE(harlowja): consumption occurs even if a engine fails to run due to
|
||||
a task failure. This is only skipped when an execution failure or
|
||||
a storage failure occurs which are *usually* correctable by re-running on
|
||||
a different conductor (storage failures and execution failures may be
|
||||
transient issues that can be worked around by later execution). If a job
|
||||
after completing can not be consumed or abandoned the conductor relies
|
||||
upon the jobboard capabilities to automatically abandon these jobs.
|
||||
"""
|
||||
|
||||
def __init__(self, name, jobboard,
|
||||
persistence=None, engine=None,
|
||||
engine_options=None, wait_timeout=None):
|
||||
super(BlockingConductor, self).__init__(
|
||||
name, jobboard, persistence=persistence,
|
||||
engine=engine, engine_options=engine_options)
|
||||
if wait_timeout is None:
|
||||
wait_timeout = WAIT_TIMEOUT
|
||||
if isinstance(wait_timeout, (int, float) + six.string_types):
|
||||
self._wait_timeout = tt.Timeout(float(wait_timeout))
|
||||
elif isinstance(wait_timeout, tt.Timeout):
|
||||
self._wait_timeout = wait_timeout
|
||||
else:
|
||||
raise ValueError("Invalid timeout literal: %s" % (wait_timeout))
|
||||
self._dead = threading_utils.Event()
|
||||
|
||||
@deprecation.removed_kwarg('timeout',
|
||||
version="0.8", removal_version="?")
|
||||
def stop(self, timeout=None):
|
||||
"""Requests the conductor to stop dispatching.
|
||||
|
||||
This method can be used to request that a conductor stop its
|
||||
consumption & dispatching loop.
|
||||
|
||||
The method returns immediately regardless of whether the conductor has
|
||||
been stopped.
|
||||
|
||||
:param timeout: This parameter is **deprecated** and is present for
|
||||
backward compatibility **only**. In order to wait for
|
||||
the conductor to gracefully shut down, :meth:`wait`
|
||||
should be used instead.
|
||||
"""
|
||||
self._wait_timeout.interrupt()
|
||||
|
||||
@property
|
||||
def dispatching(self):
|
||||
return not self._dead.is_set()
|
||||
|
||||
def _dispatch_job(self, job):
|
||||
engine = self._engine_from_job(job)
|
||||
consume = True
|
||||
with logging_listener.LoggingListener(engine, log=LOG):
|
||||
LOG.debug("Dispatching engine %s for job: %s", engine, job)
|
||||
try:
|
||||
engine.run()
|
||||
except excp.WrappedFailure as e:
|
||||
if all((f.check(*NO_CONSUME_EXCEPTIONS) for f in e)):
|
||||
consume = False
|
||||
if LOG.isEnabledFor(logging.WARNING):
|
||||
if consume:
|
||||
LOG.warn("Job execution failed (consumption being"
|
||||
" skipped): %s [%s failures]", job, len(e))
|
||||
else:
|
||||
LOG.warn("Job execution failed (consumption"
|
||||
" proceeding): %s [%s failures]", job, len(e))
|
||||
# Show the failure/s + traceback (if possible)...
|
||||
for i, f in enumerate(e):
|
||||
LOG.warn("%s. %s", i + 1, f.pformat(traceback=True))
|
||||
except NO_CONSUME_EXCEPTIONS:
|
||||
LOG.warn("Job execution failed (consumption being"
|
||||
" skipped): %s", job, exc_info=True)
|
||||
consume = False
|
||||
except Exception:
|
||||
LOG.warn("Job execution failed (consumption proceeding): %s",
|
||||
job, exc_info=True)
|
||||
else:
|
||||
LOG.info("Job completed successfully: %s", job)
|
||||
return async_utils.make_completed_future(consume)
|
||||
|
||||
def run(self):
|
||||
self._dead.clear()
|
||||
try:
|
||||
while True:
|
||||
if self._wait_timeout.is_stopped():
|
||||
break
|
||||
dispatched = 0
|
||||
for job in self._jobboard.iterjobs():
|
||||
if self._wait_timeout.is_stopped():
|
||||
break
|
||||
LOG.debug("Trying to claim job: %s", job)
|
||||
try:
|
||||
self._jobboard.claim(job, self._name)
|
||||
except (excp.UnclaimableJob, excp.NotFound):
|
||||
LOG.debug("Job already claimed or consumed: %s", job)
|
||||
continue
|
||||
consume = False
|
||||
try:
|
||||
f = self._dispatch_job(job)
|
||||
except Exception:
|
||||
LOG.warn("Job dispatching failed: %s", job,
|
||||
exc_info=True)
|
||||
else:
|
||||
dispatched += 1
|
||||
consume = f.result()
|
||||
try:
|
||||
if consume:
|
||||
self._jobboard.consume(job, self._name)
|
||||
else:
|
||||
self._jobboard.abandon(job, self._name)
|
||||
except (excp.JobFailure, excp.NotFound):
|
||||
if consume:
|
||||
LOG.warn("Failed job consumption: %s", job,
|
||||
exc_info=True)
|
||||
else:
|
||||
LOG.warn("Failed job abandonment: %s", job,
|
||||
exc_info=True)
|
||||
if dispatched == 0 and not self._wait_timeout.is_stopped():
|
||||
self._wait_timeout.wait()
|
||||
finally:
|
||||
self._dead.set()
|
||||
|
||||
def wait(self, timeout=None):
|
||||
"""Waits for the conductor to gracefully exit.
|
||||
|
||||
This method waits for the conductor to gracefully exit. An optional
|
||||
timeout can be provided, which will cause the method to return
|
||||
within the specified timeout. If the timeout is reached, the returned
|
||||
value will be False.
|
||||
|
||||
:param timeout: Maximum number of seconds that the :meth:`wait` method
|
||||
should block for.
|
||||
"""
|
||||
return self._dead.wait(timeout)
|
|
@ -24,7 +24,7 @@ from taskflow.utils import lock_utils
|
|||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class Conductor(object):
|
||||
"""Conductors conduct jobs & assist in associated runtime interactions.
|
||||
"""Base for all conductor implementations.
|
||||
|
||||
Conductors act as entities which extract jobs from a jobboard, assign
|
||||
there work to some engine (using some desired configuration) and then wait
|
||||
|
@ -34,8 +34,8 @@ class Conductor(object):
|
|||
period of time will finish up the prior failed conductors work.
|
||||
"""
|
||||
|
||||
def __init__(self, name, jobboard, persistence,
|
||||
engine=None, engine_options=None):
|
||||
def __init__(self, name, jobboard,
|
||||
persistence=None, engine=None, engine_options=None):
|
||||
self._name = name
|
||||
self._jobboard = jobboard
|
||||
self._engine = engine
|
||||
|
@ -101,7 +101,7 @@ class Conductor(object):
|
|||
|
||||
@lock_utils.locked
|
||||
def close(self):
|
||||
"""Closes the jobboard, disallowing further use."""
|
||||
"""Closes the contained jobboard, disallowing further use."""
|
||||
self._jobboard.close()
|
||||
|
||||
@abc.abstractmethod
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
|
@ -12,163 +14,15 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import six
|
||||
|
||||
from taskflow.conductors import base
|
||||
from taskflow import exceptions as excp
|
||||
from taskflow.listeners import logging as logging_listener
|
||||
from taskflow import logging
|
||||
from taskflow.types import timing as tt
|
||||
from taskflow.utils import async_utils
|
||||
from taskflow.conductors.backends import impl_blocking
|
||||
from taskflow.utils import deprecation
|
||||
from taskflow.utils import threading_utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
WAIT_TIMEOUT = 0.5
|
||||
NO_CONSUME_EXCEPTIONS = tuple([
|
||||
excp.ExecutionFailure,
|
||||
excp.StorageFailure,
|
||||
])
|
||||
# TODO(harlowja): remove this module soon...
|
||||
deprecation.removed_module(__name__,
|
||||
replacement_name="the conductor entrypoints",
|
||||
version="0.8", removal_version="?")
|
||||
|
||||
|
||||
class SingleThreadedConductor(base.Conductor):
|
||||
"""A conductor that runs jobs in its own dispatching loop.
|
||||
|
||||
This conductor iterates over jobs in the provided jobboard (waiting for
|
||||
the given timeout if no jobs exist) and attempts to claim them, work on
|
||||
those jobs in its local thread (blocking further work from being claimed
|
||||
and consumed) and then consume those work units after completetion. This
|
||||
process will repeat until the conductor has been stopped or other critical
|
||||
error occurs.
|
||||
|
||||
NOTE(harlowja): consumption occurs even if a engine fails to run due to
|
||||
a task failure. This is only skipped when an execution failure or
|
||||
a storage failure occurs which are *usually* correctable by re-running on
|
||||
a different conductor (storage failures and execution failures may be
|
||||
transient issues that can be worked around by later execution). If a job
|
||||
after completing can not be consumed or abandoned the conductor relies
|
||||
upon the jobboard capabilities to automatically abandon these jobs.
|
||||
"""
|
||||
|
||||
def __init__(self, name, jobboard, persistence,
|
||||
engine=None, engine_options=None, wait_timeout=None):
|
||||
super(SingleThreadedConductor, self).__init__(
|
||||
name, jobboard, persistence,
|
||||
engine=engine, engine_options=engine_options)
|
||||
if wait_timeout is None:
|
||||
wait_timeout = WAIT_TIMEOUT
|
||||
if isinstance(wait_timeout, (int, float) + six.string_types):
|
||||
self._wait_timeout = tt.Timeout(float(wait_timeout))
|
||||
elif isinstance(wait_timeout, tt.Timeout):
|
||||
self._wait_timeout = wait_timeout
|
||||
else:
|
||||
raise ValueError("Invalid timeout literal: %s" % (wait_timeout))
|
||||
self._dead = threading_utils.Event()
|
||||
|
||||
@deprecation.removed_kwarg('timeout',
|
||||
version="0.8", removal_version="?")
|
||||
def stop(self, timeout=None):
|
||||
"""Requests the conductor to stop dispatching.
|
||||
|
||||
This method can be used to request that a conductor stop its
|
||||
consumption & dispatching loop.
|
||||
|
||||
The method returns immediately regardless of whether the conductor has
|
||||
been stopped.
|
||||
|
||||
:param timeout: This parameter is **deprecated** and is present for
|
||||
backward compatibility **only**. In order to wait for
|
||||
the conductor to gracefully shut down, :meth:`wait`
|
||||
should be used instead.
|
||||
"""
|
||||
self._wait_timeout.interrupt()
|
||||
|
||||
@property
|
||||
def dispatching(self):
|
||||
return not self._dead.is_set()
|
||||
|
||||
def _dispatch_job(self, job):
|
||||
engine = self._engine_from_job(job)
|
||||
consume = True
|
||||
with logging_listener.LoggingListener(engine, log=LOG):
|
||||
LOG.debug("Dispatching engine %s for job: %s", engine, job)
|
||||
try:
|
||||
engine.run()
|
||||
except excp.WrappedFailure as e:
|
||||
if all((f.check(*NO_CONSUME_EXCEPTIONS) for f in e)):
|
||||
consume = False
|
||||
if LOG.isEnabledFor(logging.WARNING):
|
||||
if consume:
|
||||
LOG.warn("Job execution failed (consumption being"
|
||||
" skipped): %s [%s failures]", job, len(e))
|
||||
else:
|
||||
LOG.warn("Job execution failed (consumption"
|
||||
" proceeding): %s [%s failures]", job, len(e))
|
||||
# Show the failure/s + traceback (if possible)...
|
||||
for i, f in enumerate(e):
|
||||
LOG.warn("%s. %s", i + 1, f.pformat(traceback=True))
|
||||
except NO_CONSUME_EXCEPTIONS:
|
||||
LOG.warn("Job execution failed (consumption being"
|
||||
" skipped): %s", job, exc_info=True)
|
||||
consume = False
|
||||
except Exception:
|
||||
LOG.warn("Job execution failed (consumption proceeding): %s",
|
||||
job, exc_info=True)
|
||||
else:
|
||||
LOG.info("Job completed successfully: %s", job)
|
||||
return async_utils.make_completed_future(consume)
|
||||
|
||||
def run(self):
|
||||
self._dead.clear()
|
||||
try:
|
||||
while True:
|
||||
if self._wait_timeout.is_stopped():
|
||||
break
|
||||
dispatched = 0
|
||||
for job in self._jobboard.iterjobs():
|
||||
if self._wait_timeout.is_stopped():
|
||||
break
|
||||
LOG.debug("Trying to claim job: %s", job)
|
||||
try:
|
||||
self._jobboard.claim(job, self._name)
|
||||
except (excp.UnclaimableJob, excp.NotFound):
|
||||
LOG.debug("Job already claimed or consumed: %s", job)
|
||||
continue
|
||||
consume = False
|
||||
try:
|
||||
f = self._dispatch_job(job)
|
||||
except Exception:
|
||||
LOG.warn("Job dispatching failed: %s", job,
|
||||
exc_info=True)
|
||||
else:
|
||||
dispatched += 1
|
||||
consume = f.result()
|
||||
try:
|
||||
if consume:
|
||||
self._jobboard.consume(job, self._name)
|
||||
else:
|
||||
self._jobboard.abandon(job, self._name)
|
||||
except (excp.JobFailure, excp.NotFound):
|
||||
if consume:
|
||||
LOG.warn("Failed job consumption: %s", job,
|
||||
exc_info=True)
|
||||
else:
|
||||
LOG.warn("Failed job abandonment: %s", job,
|
||||
exc_info=True)
|
||||
if dispatched == 0 and not self._wait_timeout.is_stopped():
|
||||
self._wait_timeout.wait()
|
||||
finally:
|
||||
self._dead.set()
|
||||
|
||||
def wait(self, timeout=None):
|
||||
"""Waits for the conductor to gracefully exit.
|
||||
|
||||
This method waits for the conductor to gracefully exit. An optional
|
||||
timeout can be provided, which will cause the method to return
|
||||
within the specified timeout. If the timeout is reached, the returned
|
||||
value will be False.
|
||||
|
||||
:param timeout: Maximum number of seconds that the :meth:`wait` method
|
||||
should block for.
|
||||
"""
|
||||
return self._dead.wait(timeout)
|
||||
# TODO(harlowja): remove this proxy/legacy class soon...
|
||||
SingleThreadedConductor = deprecation.moved_inheritable_class(
|
||||
impl_blocking.BlockingConductor, 'SingleThreadedConductor',
|
||||
__name__, version="0.8", removal_version="?")
|
||||
|
|
|
@ -19,7 +19,7 @@ import contextlib
|
|||
|
||||
from zake import fake_client
|
||||
|
||||
from taskflow.conductors import single_threaded as stc
|
||||
from taskflow.conductors import backends
|
||||
from taskflow import engines
|
||||
from taskflow.jobs.backends import impl_zookeeper
|
||||
from taskflow.jobs import base
|
||||
|
@ -50,10 +50,13 @@ def test_factory(blowup):
|
|||
return f
|
||||
|
||||
|
||||
class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase):
|
||||
ComponentBundle = collections.namedtuple('ComponentBundle',
|
||||
['board', 'client',
|
||||
'persistence', 'conductor'])
|
||||
ComponentBundle = collections.namedtuple('ComponentBundle',
|
||||
['board', 'client',
|
||||
'persistence', 'conductor'])
|
||||
|
||||
|
||||
class BlockingConductorTest(test_utils.EngineTestBase, test.TestCase):
|
||||
KIND = 'blocking'
|
||||
|
||||
def make_components(self, name='testing', wait_timeout=0.1):
|
||||
client = fake_client.FakeClient()
|
||||
|
@ -61,9 +64,10 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase):
|
|||
board = impl_zookeeper.ZookeeperJobBoard(name, {},
|
||||
client=client,
|
||||
persistence=persistence)
|
||||
conductor = stc.SingleThreadedConductor(name, board, persistence,
|
||||
wait_timeout=wait_timeout)
|
||||
return self.ComponentBundle(board, client, persistence, conductor)
|
||||
conductor = backends.fetch(self.KIND, name, board,
|
||||
persistence=persistence,
|
||||
wait_timeout=wait_timeout)
|
||||
return ComponentBundle(board, client, persistence, conductor)
|
||||
|
||||
def test_connection(self):
|
||||
components = self.make_components()
|
|
@ -118,8 +118,8 @@ class MovedClassProxy(object):
|
|||
type(self).__name__, id(self), wrapped, id(wrapped))
|
||||
|
||||
|
||||
def _generate_moved_message(prefix, postfix=None, message=None,
|
||||
version=None, removal_version=None):
|
||||
def _generate_message(prefix, postfix=None, message=None,
|
||||
version=None, removal_version=None):
|
||||
message_components = [prefix]
|
||||
if version:
|
||||
message_components.append(" in version '%s'" % version)
|
||||
|
@ -143,9 +143,9 @@ def renamed_kwarg(old_name, new_name, message=None,
|
|||
|
||||
prefix = _KWARG_MOVED_PREFIX_TPL % old_name
|
||||
postfix = _KWARG_MOVED_POSTFIX_TPL % new_name
|
||||
out_message = _generate_moved_message(prefix, postfix=postfix,
|
||||
message=message, version=version,
|
||||
removal_version=removal_version)
|
||||
out_message = _generate_message(prefix, postfix=postfix,
|
||||
message=message, version=version,
|
||||
removal_version=removal_version)
|
||||
|
||||
def decorator(f):
|
||||
|
||||
|
@ -165,9 +165,9 @@ def removed_kwarg(old_name, message=None,
|
|||
"""Decorates a kwarg accepting function to deprecate a removed kwarg."""
|
||||
|
||||
prefix = _KWARG_MOVED_PREFIX_TPL % old_name
|
||||
out_message = _generate_moved_message(prefix, postfix=None,
|
||||
message=message, version=version,
|
||||
removal_version=removal_version)
|
||||
out_message = _generate_message(prefix, postfix=None,
|
||||
message=message, version=version,
|
||||
removal_version=removal_version)
|
||||
|
||||
def decorator(f):
|
||||
|
||||
|
@ -204,7 +204,7 @@ def _moved_decorator(kind, new_attribute_name, message=None,
|
|||
old_name = ".".join((base_name, old_attribute_name))
|
||||
new_name = ".".join((base_name, new_attribute_name))
|
||||
prefix = _KIND_MOVED_PREFIX_TPL % (kind, old_name, new_name)
|
||||
out_message = _generate_moved_message(
|
||||
out_message = _generate_message(
|
||||
prefix, message=message,
|
||||
version=version, removal_version=removal_version)
|
||||
deprecation(out_message, stacklevel=stacklevel)
|
||||
|
@ -215,6 +215,20 @@ def _moved_decorator(kind, new_attribute_name, message=None,
|
|||
return decorator
|
||||
|
||||
|
||||
def removed_module(module_name, replacement_name=None, message=None,
|
||||
version=None, removal_version=None, stacklevel=4):
|
||||
prefix = "The '%s' module usage is deprecated" % module_name
|
||||
if replacement_name:
|
||||
postfix = ", please use %s instead" % replacement_name
|
||||
else:
|
||||
postfix = None
|
||||
out_message = _generate_message(prefix,
|
||||
postfix=postfix, message=message,
|
||||
version=version,
|
||||
removal_version=removal_version)
|
||||
deprecation(out_message, stacklevel=stacklevel)
|
||||
|
||||
|
||||
def moved_property(new_attribute_name, message=None,
|
||||
version=None, removal_version=None, stacklevel=3):
|
||||
"""Decorates a *instance* property that was moved to another location."""
|
||||
|
@ -240,9 +254,9 @@ def moved_inheritable_class(new_class, old_class_name, old_module_name,
|
|||
old_name = ".".join((old_module_name, old_class_name))
|
||||
new_name = reflection.get_class_name(new_class)
|
||||
prefix = _CLASS_MOVED_PREFIX_TPL % (old_name, new_name)
|
||||
out_message = _generate_moved_message(prefix,
|
||||
message=message, version=version,
|
||||
removal_version=removal_version)
|
||||
out_message = _generate_message(prefix,
|
||||
message=message, version=version,
|
||||
removal_version=removal_version)
|
||||
|
||||
def decorator(f):
|
||||
|
||||
|
@ -273,7 +287,7 @@ def moved_class(new_class, old_class_name, old_module_name, message=None,
|
|||
old_name = ".".join((old_module_name, old_class_name))
|
||||
new_name = reflection.get_class_name(new_class)
|
||||
prefix = _CLASS_MOVED_PREFIX_TPL % (old_name, new_name)
|
||||
out_message = _generate_moved_message(prefix,
|
||||
message=message, version=version,
|
||||
removal_version=removal_version)
|
||||
out_message = _generate_message(prefix,
|
||||
message=message, version=version,
|
||||
removal_version=removal_version)
|
||||
return MovedClassProxy(new_class, out_message, stacklevel=stacklevel)
|
||||
|
|
Loading…
Reference in New Issue