Signals¶
Warning
This feature is in beta and subject to change. Any feedback is appreciated.
Signals are handled via two methods: Workflow.signal and Workflow.wait_signal.
They are currently only implemented with SWF.
Signaling a workflow¶
The Workflow.signal method sends a signal to one or several workflows.
def run(self):
    # Send to self, parent and children
    future = self.submit(self.signal("signal_name", *args, **kwargs))
    # Send to specific workflow
    future = self.submit(self.signal("signal_name", workflow_id, run_id, *args, **kwargs))
The future finishes as soon as at least one workflow has been signaled (including the sender itself); its result is the *args and **kwargs that were sent.
Waiting for a signal¶
The Workflow.wait_signal method returns a Future whose result is the signal input.
def run(self):
    future = self.submit(self.wait_signal("signal_name"))
    result = future.result
Naturally, one isn’t forced to wait on the future result:
def run(self):
    my_signal = self.submit(self.wait_signal("signal_name"))
    if my_signal.finished:
        # Something happened
        self.process(my_signal.result)
A workflow can choose to have signals not propagated to its parent by defining
propagate_signals_to_parent = False.
Limitations¶
- signals cannot be reset; they can be overwritten though (only the latest one counts)
- open question: derive from futures.Future to add a timestamp or counter and better names? This would bypass the "reset" issue too
One way to handle recurrent signals is to use event IDs (available with Workflow.get_event_details). For
instance, when receiving a signal, check whether a marker with the same name either does not exist or is older than
the signal (lower event ID); if so, the signal is new, so process it and create a marker.
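The marker check described above can be sketched in plain Python. This is only an illustration under stated assumptions, not simpleflow code: `is_new_signal`, `process_if_new` and the `markers` dictionary are hypothetical names. In a real workflow the event IDs would come from Workflow.get_event_details, and the marker would be recorded in the workflow history rather than in a dictionary.

```python
from typing import Dict, List, Optional


def is_new_signal(signal_event_id: int, marker_event_id: Optional[int]) -> bool:
    """Return True if the signal has not been processed yet.

    The signal is new when no marker with the same name exists, or when the
    marker is older than the signal (lower event ID).
    """
    return marker_event_id is None or marker_event_id < signal_event_id


def process_if_new(
    name: str,
    signal_event_id: int,
    markers: Dict[str, int],   # hypothetical stand-in for markers in the history
    marker_event_id: int,      # event ID the new marker would receive
    processed: List[int],      # collects the event IDs of processed signals
) -> bool:
    """Process the signal once, then record a marker under the same name."""
    if not is_new_signal(signal_event_id, markers.get(name)):
        return False  # a more recent marker exists: already handled
    processed.append(signal_event_id)
    markers[name] = marker_event_id
    return True
```

Because each new marker gets a higher event ID than the signal it acknowledges, replaying the same signal is a no-op, while a later signal with the same name is processed again.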
Implementation¶
The swf.executor.signal method returns a swf.SignalTask instance. Its schedule method
returns an ExternalWorkflowExecutionDecision containing the given signal, sent either to the running workflow or
the specified one.
This decision results in a SignalExternalWorkflowExecutionInitiated event followed (if all’s well) by an
ExternalWorkflowExecutionSignaled event in the sender’s history; from these events, we create first a running,
then a completed future. (It can also fail, for instance if the workflow doesn’t exist.)
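As a rough illustration of how the sender's history could be folded into a future state, here is a minimal sketch. It is not the executor's actual code: `FutureState`, `EVENT_TO_STATE` and `signal_future_state` are hypothetical names, and only the SWF history event types are real.

```python
from enum import Enum
from typing import Iterable


class FutureState(Enum):
    PENDING = "pending"
    RUNNING = "running"
    FINISHED = "finished"
    FAILED = "failed"


# SWF history event types recorded in the sender's history for a signal decision.
EVENT_TO_STATE = {
    "SignalExternalWorkflowExecutionInitiated": FutureState.RUNNING,
    "ExternalWorkflowExecutionSignaled": FutureState.FINISHED,
    "SignalExternalWorkflowExecutionFailed": FutureState.FAILED,
}


def signal_future_state(event_types: Iterable[str]) -> FutureState:
    """Replay event types in history order and return the resulting state."""
    state = FutureState.PENDING
    for event_type in event_types:
        # Events unrelated to the signal leave the state unchanged.
        state = EVENT_TO_STATE.get(event_type, state)
    return state
```

Replaying the history this way keeps the result deterministic across decision tasks: the same events always yield the same state.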
The receiver gets a WorkflowExecutionSignaled with the signal name, input and external (i.e. sender) information.
We may want every known workflow to be signaled too: if propagate=True is passed to Workflow.signal, the
signal is propagated to the parent and children of the workflow.
Since we propagate using the SignalWorkflowExecution API action rather than a decision, the target doesn’t receive the
externalWorkflowExecution information; so we pass the sender’s identity as __workflow_id and __run_id in the signal input.
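The following sketch shows one way the sender's identity could travel inside the signal input. Only the `__workflow_id` and `__run_id` key names come from the text above; the JSON encoding, the `args`/`kwargs` layout and both helper functions are assumptions made for illustration.

```python
import json
from typing import Any, Dict, Optional, Sequence, Tuple


def build_propagated_input(
    args: Sequence[Any],
    kwargs: Dict[str, Any],
    workflow_id: str,
    run_id: str,
) -> str:
    """Serialize the signal input together with the sender's identity."""
    payload = {
        "args": list(args),            # assumed layout, not confirmed by the docs
        "kwargs": kwargs,              # assumed layout, not confirmed by the docs
        "__workflow_id": workflow_id,  # key name taken from the text above
        "__run_id": run_id,            # key name taken from the text above
    }
    return json.dumps(payload)


def extract_sender(raw_input: str) -> Tuple[Optional[str], Optional[str]]:
    """Recover the sender from a signal input; (None, None) if absent."""
    payload = json.loads(raw_input)
    return payload.get("__workflow_id"), payload.get("__run_id")
```

On the receiving side, a missing pair would suggest the signal did not come through propagation, in which case the externalWorkflowExecution information of the event is the place to look.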