adding Map and Reduce task and simplifying Lambda

- adding Map and Reduce tasks to mimic the map and reduce builtin
  functions

- modified Lambda task to no longer require a 'require' parameter,
  instead using the functor's argument list to specify required
  parameters to the Task

Change-Id: I048c5777028ffeb175b273ebc60396e65c614187
This commit is contained in:
Min Pae 2015-02-14 16:17:02 -08:00
parent 413d2c12be
commit 4aef254997
4 changed files with 132 additions and 11 deletions

View File

@ -16,3 +16,5 @@
from assert_task import Assert # noqa
from check_for import CheckFor # noqa
from lambda_task import Lambda # noqa
from map_task import Map # noqa
from reduce_task import Reduce # noqa

View File

@ -13,8 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import inspect
import oslo_utils.reflection as reflection
import taskflow.task as task
@ -24,9 +23,6 @@ class Lambda(task.Task):
This Task takes a function (lambda or otherwise) and
applies it to input parameters.
Input parameters must be specifically required by the task using the
requires argument in the task constructor.
>>> from pprint import pprint
>>> import taskflow.engines as engines
>>> from taskflow.patterns import linear_flow
@ -34,7 +30,7 @@ class Lambda(task.Task):
>>> l = lambda x, y: x + y
>>> flow = linear_flow.Flow("test lambda flow")
>>> flow = flow.add(Lambda(l, provides='z', requires=('x','y')))
>>> flow = flow.add(Lambda(l, provides='z'))
>>> input_store = { 'x': 2, 'y': 3 }
>>> result = engines.run(flow, store=input_store)
@ -42,14 +38,19 @@ class Lambda(task.Task):
>>> pprint(result)
{'x': 2, 'y': 3, 'z': 5}
"""
def __init__(self, functor, name=None, **kwargs):
super(Lambda, self).__init__(name=name, **kwargs)
def __init__(self, functor, name=None, requires=None, **kwargs):
self.f_args = reflection.get_callable_args(functor)
if requires and tuple(requires) != tuple(self.f_args):
raise ValueError("requires must be the same as the functor "
"argument list")
super(Lambda, self).__init__(name=name, requires=self.f_args, **kwargs)
self.functor = functor
self.argspec = inspect.getargspec(functor)
def execute(self, *args, **kwargs):
common_keys = set(self.argspec.args).intersection(kwargs.keys())
def execute(self, **kwargs):
common_keys = set(self.f_args).intersection(kwargs.keys())
functor_kwargs = {k: kwargs[k] for k in common_keys}
result = self.functor(**functor_kwargs)

View File

@ -0,0 +1,60 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import oslo_utils.reflection as reflection
import taskflow.task as task
class Map(task.Task):
"""General purpose Task to map a function to a list
This Task mimics the behavior of Python's built-in map function. The Task
takes a functor (lambda or otherwise) and a list. The list is specified
using the requires argument of the Task. When executed, this task calls
map with the functor and list as arguments. The resulting list from the
call to map is then returned after execution.
Each value of the returned list can be bound to individual names using
the provides argument, following taskflow standard behavior. Order is
preserved in the returned list.
>>> from pprint import pprint
>>> import taskflow.engines as engines
>>> from taskflow.patterns import linear_flow
>>> from os_tasklib.common import Map
>>> l = lambda x: x + x
>>> flow = linear_flow.Flow("test lambda flow")
>>> flow = flow.add(Map(l, requires=('x','y'), provides=('x2', 'y2')))
>>> input_store = { 'x': 2, 'y': 3 }
>>> result = engines.run(flow, store=input_store)
>>> pprint(result)
{'x': 2, 'x2': 4, 'y': 3, 'y2': 6}
"""
def __init__(self, functor, requires, name=None, **kwargs):
super(Map, self).__init__(name=name, requires=requires, **kwargs)
f_args = reflection.get_callable_args(functor)
if len(f_args) != 1:
raise ValueError("functor must take exactly 1 argument")
self.functor = functor
self._requires = requires
def execute(self, *args, **kwargs):
l = [kwargs[r] for r in self._requires]
return map(self.functor, l)

View File

@ -0,0 +1,58 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import oslo_utils.reflection as reflection
import taskflow.task as task
class Reduce(task.Task):
"""General purpose Task to reduce a list by applying a function
This Task mimics the behavior of Python's built-in reduce function. The
Task takes a functor (lambda or otherwise) and a list. The list is
specified using the requires argument of the Task. When executed, this
task calls reduce with the functor and list as arguments. The resulting
value from the call to reduce is then returned after execution.
>>> from pprint import pprint
>>> import taskflow.engines as engines
>>> from taskflow.patterns import linear_flow
>>> from os_tasklib.common import Reduce
>>> l = lambda x, y: x + y
>>> flow = linear_flow.Flow("test lambda flow")
>>> flow = flow.add(Reduce(l, provides='sum', requires=('x','y','z')))
>>> input_store = { 'x': 2, 'y': 3 , 'z': 4}
>>> result = engines.run(flow, store=input_store)
>>> pprint(result)
{'sum': 9, 'x': 2, 'y': 3, 'z': 4}
"""
def __init__(self, functor, requires, name=None, **kwargs):
super(Reduce, self).__init__(name=name, requires=requires, **kwargs)
if len(requires) < 2:
raise ValueError("Minimum of 2 arguments required")
f_args = reflection.get_callable_args(functor)
if len(f_args) != 2:
raise ValueError("functor must take exactly 2 arguments")
self.functor = functor
def execute(self, *args, **kwargs):
l = [kwargs[r] for r in self.requires]
return reduce(self.functor, l)