Canvas Features
Simpleflow handles task grouping above the SWF level, using canvas concepts from Celery: Chains and Groups. It also allows delayed function execution with FuncGroups.
Chains
A Chain runs tasks sequentially: each task starts once the previous one has finished.
```python
from simpleflow import activity, Workflow
from simpleflow.canvas import Chain


@activity.with_attributes(task_list="quickstart", version="example")
def task_a():
    return "Something"


@activity.with_attributes(task_list="quickstart", version="example")
def task_b(x):
    return "Something Else", x


class AWorkflow(Workflow):
    # ...

    def run(self, *args, **kwargs):
        futures = self.submit(Chain(task_a, (task_b, 42)))
        print(f"Results: {futures.result}")
```
Once all tasks have finished, the future’s result is the list of the chained tasks’ results, in order.
The Chain.__init__ method takes a series of submittable objects or
(submittable, *args) tuples.
Tasks can also be added after the chain is created, using its append
method:
```python
from simpleflow import Workflow
from simpleflow.canvas import Chain

# task_a and task_b are the activities defined in the previous example.


class AWorkflow(Workflow):
    # ...

    def run(self, *args, **kwargs):
        chain = Chain()
        chain.append(task_a)
        chain.append(task_b, x=42)  # keyword arguments are accepted here
        futures = self.submit(chain)
        print(f"Results: {futures.result}")
```
Unlike the tuple form, append accepts keyword arguments. Appended tasks can
also be ActivityTask or WorkflowTask instances, for maximum flexibility.
Warning
There are admittedly too many ways to specify tasks.
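To make the constructor and append forms concrete, here is a minimal plain-Python model of a chain. It does not use simpleflow or SWF: ToyChain and its run method are hypothetical names, and tasks are ordinary functions executed immediately instead of scheduled activities.

```python
class ToyChain:
    """Hypothetical model of a chain (not simpleflow code): tasks are plain
    functions run immediately, one after another."""

    def __init__(self, *entries):
        self.tasks = []
        for entry in entries:
            # An entry is a callable or a (callable, *args) tuple.
            if isinstance(entry, tuple):
                func, *args = entry
                self.append(func, *args)
            else:
                self.append(entry)

    def append(self, func, *args, **kwargs):
        # Unlike the tuple form, append() also accepts keyword arguments.
        self.tasks.append((func, args, kwargs))

    def run(self):
        # The result is the list of each task's result, in order.
        return [func(*args, **kwargs) for func, args, kwargs in self.tasks]


def task_a():
    return "Something"


def task_b(x):
    return "Something Else", x


print(ToyChain(task_a, (task_b, 42)).run())  # ['Something', ('Something Else', 42)]

chain = ToyChain()
chain.append(task_a)
chain.append(task_b, x=42)
print(chain.run())  # ['Something', ('Something Else', 42)]
```

Both construction styles produce the same list of results, which mirrors what the chain’s future holds once every task has finished.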
Sending Results
Each task in a chain can send its result to the next one when the chain is
created with the send_result=True argument. The result is then added to the
next task’s *args.
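The following sketch models result passing in plain Python; run_chain is a hypothetical helper, not part of simpleflow. It appends the previous result after any explicitly given arguments; the exact position simpleflow uses is an assumption here, so check it before relying on argument order.

```python
def run_chain(tasks, send_result=False):
    """Hypothetical model of a chain (not simpleflow code).

    tasks is a list of (callable, *args) tuples, run in order.
    """
    results = []
    for func, *args in tasks:
        if send_result and results:
            # Pass the previous task's result as an extra positional argument.
            args.append(results[-1])
        results.append(func(*args))
    return results


def list_items(n):
    return list(range(n))


def total(values):
    return sum(values)


print(run_chain([(list_items, 4), (total,)], send_result=True))  # [[0, 1, 2, 3], 6]
```

Without send_result=True, total would be called with no arguments and fail, which is why the second task only declares the parameter it expects to receive.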
Error Handling
By default, an error in a task will break the chain and bubble the exception up. This can be controlled with several arguments:
* raises_on_failure (default: True): bubble the exception up on failure;
* break_on_failure (default: True): break the chain on failure;
* bubbles_exception_on_failure (default: False): in a sub-chain, report a failure that was not raised to the upper chain.
See examples.canvas.CanvasWorkflow for how each of these cases
behaves.
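The interplay of raises_on_failure and break_on_failure can be sketched with a small plain-Python model. Everything here is hypothetical (ChainFailed, run_chain, and recording a failed task as None are choices of the model, not simpleflow behavior), and bubbles_exception_on_failure is left out for brevity.

```python
class ChainFailed(Exception):
    """Raised by the model when a failure bubbles up."""


def run_chain(tasks, raises_on_failure=True, break_on_failure=True):
    """Hypothetical model of a chain's failure handling (not simpleflow code).

    A failed task contributes None to the result list; that is a choice made
    for this model only.
    """
    results = []
    for func in tasks:
        try:
            results.append(func())
        except Exception as exc:
            if raises_on_failure:
                # Bubble the failure up to whoever submitted the chain.
                raise ChainFailed(f"{func.__name__} failed") from exc
            results.append(None)
            if break_on_failure:
                # Break the chain: the remaining tasks never run.
                break
    return results


def ok():
    return "ok"


def boom():
    raise ValueError("boom")


print(run_chain([ok, boom, ok], raises_on_failure=False))  # ['ok', None]
print(run_chain([ok, boom, ok], raises_on_failure=False, break_on_failure=False))  # ['ok', None, 'ok']
```

With the defaults, run_chain([ok, boom, ok]) raises ChainFailed, so break_on_failure only makes a visible difference once raises_on_failure is False.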
When set, raises_on_failure is propagated to the chain’s (or group’s) content
as tasks are appended. That is:
* chain = Chain(raises_on_failure=False); chain.append(some_activity)
  propagates raises_on_failure=False to some_activity;
* chain = Chain(); chain.append(some_activity); chain.raises_on_failure = False
  does not.
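The propagation rule above boils down to copying the container’s setting at append time. This sketch models it with hypothetical ToyTask and ToyChain classes; using None for "not set" and a per-task default of True are assumptions of the model.

```python
class ToyTask:
    def __init__(self, name):
        self.name = name
        self.raises_on_failure = True  # per-task default in this model


class ToyChain:
    """Hypothetical model: the chain's setting is copied only at append time."""

    def __init__(self, raises_on_failure=None):
        self.raises_on_failure = raises_on_failure  # None means "not set"
        self.tasks = []

    def append(self, task):
        if self.raises_on_failure is not None:
            # Propagate the chain-level setting to the newly appended task.
            task.raises_on_failure = self.raises_on_failure
        self.tasks.append(task)


first = ToyTask("first")
chain = ToyChain(raises_on_failure=False)
chain.append(first)
print(first.raises_on_failure)  # False: propagated on append

second = ToyTask("second")
chain = ToyChain()
chain.append(second)
chain.raises_on_failure = False
print(second.raises_on_failure)  # True: set after append, so not propagated
```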
The break_on_failure=False and send_result=True options are
currently incompatible.
Groups
A Group represents independent tasks that are scheduled in parallel.
```python
from simpleflow import activity, Workflow
from simpleflow.canvas import Group


@activity.with_attributes(task_list="quickstart", version="example")
def task_a():
    return "Something"


@activity.with_attributes(task_list="quickstart", version="example")
def task_b(x):
    return "Something Else", x


class AWorkflow(Workflow):
    # ...

    def run(self, *args, **kwargs):
        futures = self.submit(Group(task_a, (task_b, 42)))
        print(f"Results: {futures.result}")
```
A Group is defined the same way as a Chain and takes the same
raises_on_failure and bubbles_exception_on_failure arguments. It also
accepts max_parallel, which limits how many of its tasks can be scheduled
at the same time.
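The effect of max_parallel can be illustrated with a thread pool whose size is the parallelism limit. This is only an analogy, assuming nothing about how simpleflow schedules SWF activities: run_group and square are hypothetical, and the peak counter records how many tasks were ever running at once.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_group(tasks, max_parallel=None):
    """Hypothetical model of a group (not simpleflow code).

    tasks is a list of (callable, *args) tuples. At most max_parallel of them
    run at once; results are returned in submission order.
    """
    workers = max_parallel or max(len(tasks), 1)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(func, *args) for func, *args in tasks]
        return [future.result() for future in futures]


lock = threading.Lock()
running = 0
peak = 0  # highest number of tasks observed running at the same time


def square(x):
    global running, peak
    with lock:
        running += 1
        peak = max(peak, running)
    time.sleep(0.05)  # stand-in for a slow activity
    with lock:
        running -= 1
    return x * x


print(run_group([(square, n) for n in range(6)], max_parallel=2))  # [0, 1, 4, 9, 16, 25]
print(peak)  # never more than 2
```

As soon as one task finishes, the pool starts the next waiting one, so the limit caps concurrency without forcing the tasks into fixed batches.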
FuncGroup
A FuncGroup wraps a function that is called by the executor and returns
a Chain or Group to execute. It acts as a barrier: in a Chain, the
function is called only after the previous task has finished.
Warning
Untested code
```python
from simpleflow import activity, Workflow
from simpleflow.canvas import Chain, FuncGroup, Group


@activity.with_attributes(task_list="quickstart", version="example")
def partition_data(data_location):
    # Partition a list of things to do into parallelizable sub-parts
    return []  # placeholder: return the list of sub-parts


@activity.with_attributes(task_list="quickstart", version="example")
def execute_on_sub_part(sub_part):
    pass


class AWorkflow(Workflow):
    # ...

    def run(self, *args, **kwargs):
        chain = Chain(send_result=True)
        chain.append(partition_data, data_location="s3://my_bucket/foo")
        # The lambda receives partition_data's result: the list of sub-parts.
        chain.append(FuncGroup(
            lambda parts: Group(*[(execute_on_sub_part, part) for part in parts])
        ))
        futures = self.submit(chain)
        print(f"Results: {futures.result}")
```
Here, partition_data’s result is passed to the FuncGroup lambda,
which returns a Group running execute_on_sub_part on each sub-part in parallel.
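The barrier behavior can be reproduced without SWF in a short plain-Python model. FuncGroupModel and run_chain are hypothetical stand-ins, the "group" is a list of (func, *args) tuples run one after another rather than in parallel, and partition_data always returns three made-up sub-parts.

```python
class FuncGroupModel:
    """Hypothetical stand-in for FuncGroup: wraps a function returning tasks."""

    def __init__(self, func):
        self.func = func


def run_chain(steps):
    """Model of a chain with send_result=True (not simpleflow code).

    Each step is a (step, *args) tuple. From the second step on, the previous
    result is appended to the step's arguments. Returns every step's result.
    """
    results = []
    for index, (step, *args) in enumerate(steps):
        if index > 0:
            args.append(results[-1])
        if isinstance(step, FuncGroupModel):
            # The wrapped function is only called here, after the previous
            # step has finished: this is what makes a FuncGroup a barrier.
            group = step.func(*args)
            # A real Group would schedule these in parallel; the model runs them in order.
            results.append([func(*fargs) for func, *fargs in group])
        else:
            results.append(step(*args))
    return results


def partition_data(data_location):
    # Hypothetical partitioning: always three sub-parts.
    return [f"{data_location}/part-{i}" for i in range(3)]


def execute_on_sub_part(sub_part):
    return f"processed {sub_part}"


result = run_chain([
    (partition_data, "s3://my_bucket/foo"),
    (FuncGroupModel(lambda parts: [(execute_on_sub_part, p) for p in parts]),),
])
print(result[-1])
```

The number of tasks in the second step is only known once partition_data has returned, which is exactly the situation a FuncGroup is meant for.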
Advanced Uses
No-result FuncGroup
The function wrapped in a FuncGroup must return a Chain or Group. This is
inconvenient when its only job is to update the workflow state, since it must
then return an empty Group. Because this requirement is long-standing, it
remains the default; the new _allow_none argument relaxes it.
Warning
This is a new experimental option: a better one might be to enforce that nothing is returned.
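The relaxed check can be modeled in a few lines. FuncGroupModel and its allow_none keyword are hypothetical stand-ins for FuncGroup and _allow_none, a plain list plays the role of a Group, and raising TypeError when None is returned without the option is an assumption of this model rather than documented simpleflow behavior.

```python
class FuncGroupModel:
    """Hypothetical model of FuncGroup's return-value check (not simpleflow code)."""

    def __init__(self, func, allow_none=False):
        self.func = func
        self.allow_none = allow_none  # plays the role of _allow_none

    def instantiate(self, *args):
        tasks = self.func(*args)
        if tasks is None and self.allow_none:
            # Treat "nothing returned" as an empty group.
            return []
        if not isinstance(tasks, list):
            raise TypeError("the wrapped function must return a group of tasks")
        return tasks


state = {}


def record_only(value):
    # Only updates the (simulated) workflow state; schedules nothing.
    state["seen"] = value


print(FuncGroupModel(record_only, allow_none=True).instantiate(42))  # []
print(state)  # {'seen': 42}
```

Without the option, FuncGroupModel(record_only).instantiate(42) raises TypeError in this model, so the function would have to return an empty group explicitly.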