Signals¶
Warning
This feature is in beta mode and subject to changes. Any feedback is appreciated.
Signals are handled via two methods: Workflow.signal
and Workflow.wait_signal
.
They are currently only implemented with SWF.
Signaling a workflow¶
The Workflow.signal
method sends a signal to one or several workflows.
def run(self):
# Send to self, parent and children
future = self.submit(self.signal("signal_name", *args, **kwargs))
# Send to specific workflow
future = self.submit(self.signal("signal_name", workflow_id, run_id, *args, **kwargs))
The future will be finished, its result being *args and **kwargs, as soon as at least one workflow has been signaled (including oneself).
Waiting for a signal¶
The Workflow.wait_signal
returns a Future
which result is the signal input.
def run(self):
future = self.submit(self.wait_signal("signal_name"))
result = future.result
Naturally, one isn’t forced to wait on the future result:
def run(self):
my_signal = self.submit(self.wait_signal("signal_name"))
if my_signal.finished:
# Something happened
self.process(my_signal.result)
A workflow can choose to have signals not propagated to its parent by defining
propagate_signals_to_parent = False
.
Limitations¶
- signals cannot be reset; they can be overwritten though (only the latest one count)
- derive from futures.Future to add the timestamp or counter and better names? This would bypass the "reset" issue too
One way to handle recurrent signals is by using event_id
’s (available with Workflow.get_event_details
). For
instance, when receiving a signal, check that a marker with the same name does not exist or is in the past (lower
event ID); if so, the signal is new, so process it and create a marker.
Implementation¶
The swf.executor.signal
method returns a swf.SignalTask
instance. Its schedule
method
returns an ExternalWorkflowExecutionDecision
containing the given signal, sent either to the running workflow or
the specified one.
This decision results in a SignalExternalWorkflowExecutionInitiated
followed (if all’s well) by a
SignalExternalWorkflowExecutionInitiated
in the sender’s history; from these events, we create first a running,
then a completed future. (It can also fail, for instance if the workflow doesn’t exist.)
The receiver gets a WorkflowExecutionSignaled
with the signal name, input and external (i.e. sender) information.
We may want every known workflow to be signaled too: if propagate=True
is passed to Workflow.signal
, the
signal is propagated to the parent and children of the workflow.
Since we propagate using SignalWorkflowExecution
, not a decision, the target doesn’t have the
externalWorkflowExecution
information; so we pass __workflow_id
and __run_id
in the input.