In Community Threads, we dig into some of the most frequent and insightful questions that appear in our Temporal Community.

The question

In today’s Thread, I will look at a question from a Temporal user who wants to know:

Is it possible to create event-based Workflows; and, if so, how do I make Activities within a Workflow wait for these events?

image-community-threads-1

The context

The full anatomy of a Workflow is out of the scope of this article, but as a durable, reliable, and scalable function execution, a Workflow Execution is the main unit of a Temporal application. Workflow Executions are instances of Workflow Definitions; think of a Workflow Definition as a series of expected steps or processes, and a Workflow Execution as a single “run” of that series of steps from start to finish.

Workflows use Signals to communicate between Workflow Execution instances, and Activities to communicate with the external environment.

The following diagrams shows the two ways Workflow Executions interact with the Temporal Platform: they can issue Commands, such as ScheduleActivityTask or StartTimer; and they can wait on Awaitables.

image-community-threads-2

A Workflow Execution may only ever block progress on an Awaitable that is provided through a Temporal SDK API. Awaitables are provided when using APIs for the following:

  • Awaiting: Progress can block using explicit “Await” APIs.
  • Requesting cancellation of another Workflow Execution: Progress can block on confirmation that the other Workflow Execution is canceled.
  • Sending a Signal: Progress can block on confirmation that the Signal sent.
  • Spawning a Child Workflow Execution: Progress can block on confirmation that the Child Workflow Execution started, and on the result of the Child Workflow Execution.
  • Spawning an Activity Execution: Progress can block on the result of the Activity Execution.
  • Starting a Timer: Progress can block until the Timer fires.

In the context of this user’s question, each of their Activities require waiting on either a handshake, an acknowledgment, or some other type of Awaitable before proceeding to the next Activity. As shown above, this event-based flow is in the DNA of Temporal Workflows.

The Answer

The question came from the Python SDK forum, so the example code for the response will most closely resemble a Python implementation.

The short answer is: yes! Because of the magic of Temporal and the Temporal-backed asyncio event loop in Python, you can combine variables and wait conditions to create async Activities within your Workflow Definitions. Executing Activities requires a start_to_close_timeout. The main purpose of this param is to detect when a Worker crashes after it has started executing an Activity Task.

You can then set up a signal handler within your Workflow, using set() for each event to send a signal to the Workflow to trigger the subsequent Activity.

Here is a sketch implementation:

@workflow.defn
class MyWorkflow:
    def __init__(self):
        self.stage = 0

    @workflow.run
    async def run(self) -> None:
        workflow.logger.info("Executing activity 1")
        await workflow.execute_activity(my_activity_1, start_to_close_timeout=timedelta(seconds=45))
        workflow.logger.info("Finished activity 1")
        await workflow.wait_condition(lambda: self.stage > 0)

        workflow.logger.info("Executing activity 2")
        await workflow.execute_activity(my_activity_2, start_to_close_timeout=timedelta(seconds=30))
        workflow.logger.info("Finished activity 2")
        await workflow.wait_condition(lambda: self.stage > 1)

        workflow.logger.info("Executing activity 3")
        await workflow.execute_activity(my_activity_3, start_to_close_timeout=timedelta(minutes=15))
        workflow.logger.info("Finished activity 3")
        await workflow.wait_condition(lambda: self.stage > 2)

        workflow.logger.info("Executing activity 4")
        await workflow.execute_activity(my_activity_4, start_to_close_timeout=timedelta(hours=2))
        workflow.logger.info("Finished activity 4")

    @workflow.signal
    def advance_stage(self, stage: int) –> None:
        self.stage = stage

This is a very normal Workflow and will work as expected even if it has to wait for weeks between events, the Worker crashes, etc. Thanks to Temporal’s replay functionality, to recover a Workflow you simply need to send a signal with the Activity number from a client anywhere, in any language as needed.

Join the Temporal user community in Slack

If you found this helpful, or if you have your own questions about Temporal that you’d like to see answered, you can join us in Slack to connect with other Temporal users and team members.