Add lambda task and complete implementation of assert task

Add lambda task and complete implementation on assert task, both of
which take functions as constructor arguments, to be run at task
execute time against arguments supplied during execute time.  Lambda
task is for applying functions to generate new, transformed, and/or
consolidated outputs/data.  Assert task is for applying a
function/predicate on input data and assert the truth value of the
function/predicate's result.

Change-Id: Ib8a0a956011f00899e8bf8eea957655e9cffce97
This commit is contained in:
Min Pae 2015-02-12 10:24:37 -08:00
parent 1f2ccd8882
commit 13c78bbbd9
3 changed files with 105 additions and 10 deletions

View File

@ -15,3 +15,4 @@
from assert_task import Assert # noqa
from check_for import CheckFor # noqa
from lambda_task import Lambda # noqa

View File

@ -14,20 +14,57 @@
# under the License.
import inspect
import time
import taskflow.task as task
class Assert(task.Task):
"""General purpose Task to assert a condition
This Task takes a function (lambda or otherwise) as a condition and
applies it to input parameters. The result is then asserted to be
True.
Input parameters must be specifically required by the task using the
requires argument in the task constructor.
>>> from pprint import pprint
>>> import taskflow.engines as engines
>>> from taskflow.patterns import linear_flow
>>> from os_tasklib.common import Assert
>>> l = lambda x, y: x == y
>>> flow = linear_flow.Flow("test assert flow")
>>> flow = flow.add(Assert(l, requires=('x','y')))
>>> good_input = { 'x': 3, 'y': 3 }
>>> result = engines.run(flow, store=good_input)
>>> pprint(result)
{'x': 3, 'y': 3}
As shown below, a failed assertion will thrown an AssertionError. In
taskfow, this will also result in tasks that precede the failed Assert Task
to be reverted.
>>> bad_input = { 'x': 2, 'y': 3 }
>>> result = engines.run(flow, store=bad_input)
Traceback (most recent call last):
...
AssertionError
"""
def __init__(self,
predicate,
arglist=None,
condition,
timeout_seconds=None,
timeout_ms=None,
name=None,
**kwargs):
self.predicate = predicate
super(Assert, self).__init__(name=name, **kwargs)
self.condition = condition
self.argspec = inspect.getargspec(condition)
self.sleep_time = 0
if timeout_seconds:
@ -36,15 +73,16 @@ class Assert(task.Task):
if timeout_ms:
self.sleep_time += timeout_ms / 1000.0
super(Assert, self).__init__(name=name, **kwargs)
def execute(self, *args, **kwargs):
print("execute")
assert(self.predicate(*args, **kwargs))
return (args, kwargs)
common_keys = set(self.argspec.args).intersection(kwargs.keys())
condition_kwargs = {k: kwargs[k] for k in common_keys}
assert(self.condition(**condition_kwargs))
def revert(self, *args, **kwargs):
print("revert")
print("%s %s" % (args, kwargs))
if self.sleep_time != 0:
time.sleep(self.sleep_time)
if __name__ == "__main__":
import doctest
doctest.testmod()

View File

@ -0,0 +1,56 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import inspect
import taskflow.task as task
class Lambda(task.Task):
"""General purpose Task to apply transformations to data
This Task takes a function (lambda or otherwise) and
applies it to input parameters.
Input parameters must be specifically required by the task using the
requires argument in the task constructor.
>>> from pprint import pprint
>>> import taskflow.engines as engines
>>> from taskflow.patterns import linear_flow
>>> from os_tasklib.common import Lambda
>>> l = lambda x, y: x + y
>>> flow = linear_flow.Flow("test lambda flow")
>>> flow = flow.add(Lambda(l, provides='z', requires=('x','y')))
>>> input_store = { 'x': 2, 'y': 3 }
>>> result = engines.run(flow, store=input_store)
>>> pprint(result)
{'x': 2, 'y': 3, 'z': 5}
"""
def __init__(self, functor, name=None, **kwargs):
super(Lambda, self).__init__(name=name, **kwargs)
self.functor = functor
self.argspec = inspect.getargspec(functor)
def execute(self, *args, **kwargs):
common_keys = set(self.argspec.args).intersection(kwargs.keys())
functor_kwargs = {k: kwargs[k] for k in common_keys}
result = self.functor(**functor_kwargs)
return result