DAG Workflows
The Python Workflows SDK supports DAG workflows declaratively, using the `step.do` decorator with the `depends` parameter to define dependencies (other steps that must complete before this step can run).
```python
from workers import Response, WorkflowEntrypoint

class PythonWorkflowStarter(WorkflowEntrypoint):
    async def run(self, event, step):
        # Helper that awaits a step callable and logs a TypeError if one is raised.
        async def await_step(fn):
            try:
                return await fn()
            except TypeError as e:
                print(f"Successfully caught {type(e).__name__}: {e}")

        step.sleep('demo sleep', '10 seconds')

        @step.do('dependency1')
        async def dep_1():
            # does stuff
            print('executing dep1')

        @step.do('dependency2')
        async def dep_2():
            # does stuff
            print('executing dep2')

        @step.do('demo do', depends=[dep_1, dep_2], concurrent=True)
        async def final_step(res1, res2):
            # does stuff
            print('something')

        await await_step(final_step)


async def on_fetch(request, env):
    await env.MY_WORKFLOW.create()
    return Response("Hello world!")
```

In this example, dep_1 and dep_2 run concurrently before final_step executes, since final_step depends on both of them; their return values are passed to final_step as res1 and res2.
Setting `concurrent=True` allows the dependencies to be resolved concurrently. If one of the callables passed to `depends` has already completed, it is skipped and its return value is reused.
This pattern is useful for diamond-shaped workflows, where a step depends on two or more other steps that can run concurrently.
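As a concrete illustration of that diamond shape, here is a minimal sketch. The class and step names (`DiamondWorkflow`, `fetch_data`, and so on) are hypothetical, and it assumes the same `step.do`/`depends`/`concurrent` behaviour shown above: dependencies chain transitively, and their results are passed positionally to the dependent step's parameters.

```python
from workers import WorkflowEntrypoint

class DiamondWorkflow(WorkflowEntrypoint):
    async def run(self, event, step):
        @step.do('fetch data')
        async def fetch_data():
            # Top of the diamond: produces the data both branches consume.
            return {'values': [1, 2, 3]}

        @step.do('sum values', depends=[fetch_data])
        async def sum_values(data):
            return sum(data['values'])

        @step.do('count values', depends=[fetch_data])
        async def count_values(data):
            return len(data['values'])

        # Bottom of the diamond: waits for both branches, which in turn share
        # fetch_data. Because fetch_data completes once, its return value is
        # reused rather than the step running a second time.
        @step.do('report', depends=[sum_values, count_values], concurrent=True)
        async def report(total, count):
            print(f'sum={total}, count={count}')
            return {'sum': total, 'count': count}

        await report()
```

Awaiting report() drives the whole graph: sum_values and count_values are resolved concurrently, and each reuses the single cached result of fetch_data.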