Open-source workflow engine

Orchestrate and observe computational workflows defined in plain Python. Suitable for data pipelines, background tasks, and chat bots.

Getting started

import coflux as cf
import requests

@cf.task(retries=2)
def fetch_splines(url):
    return requests.get(url).json()

@cf.task()
def reticulate(splines):
    return list(reversed(splines))

@cf.workflow()
def my_workflow(url):
    reticulate(fetch_splines(url))

More examples on GitHub...

Monitor

Real-time observability

Watch workflows execute in real time in the graph-based web UI. Debug issues within a workflow by re-running problematic steps.

Screenshot of the Coflux UI

Build

Developer friendly

Optimised for rapid local development. Agents can detect code changes and automatically reload. Workflows are defined in plain Python, with no static DAGs or magical DSLs.

$ pip install coflux
$ coflux server
Server started. Running on port 7777.
$ coflux agent --dev my_example.repo
Connecting (localhost:7777, pSTtDMx, production)...
Connected.

Deploy

Agent-based architecture

Maintain control of deploying agents in your own infrastructure. Coflux takes care of orchestrating execution of tasks.

Architecture

Debug

Turbo-charged triage

Inspect arguments, results and errors in the web UI. Re-run failing production steps in your local environment. Memoise side-effecting (or slow) tasks so that you can re-run a workflow without re-running those tasks.

@cf.task(memo=True)
def send_notification(...):
    mailer.send(...)
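
To illustrate what memoisation buys you on a re-run, here is a minimal pure-Python sketch of the idea. It does not use Coflux and is not how Coflux implements the feature: `Run`, `memoised` and `sent` are hypothetical names invented for this illustration. The assumption is that a memoised step's result is stored against the step name and its arguments within a run, so re-executing that run reuses the stored result instead of repeating the side effect.

```python
import functools


class Run:
    """Hypothetical stand-in for a single workflow run and its memo store."""

    def __init__(self):
        self.memo = {}


def memoised(fn):
    """Reuse a step's result when the same run calls it again with the same arguments."""

    @functools.wraps(fn)
    def wrapper(run, *args):
        key = (fn.__name__, args)
        if key not in run.memo:
            run.memo[key] = fn(run, *args)
        return run.memo[key]

    return wrapper


sent = []  # records every time the side effect really happens


@memoised
def send_notification(run, recipient):
    sent.append(recipient)  # stands in for a real side effect such as sending an email
    return f"notified {recipient}"


def my_workflow(run):
    return send_notification(run, "alice@example.com")


run = Run()
first = my_workflow(run)   # performs the side effect
second = my_workflow(run)  # re-executing the same run reuses the memoised result
```

In this sketch `sent` holds a single entry after both calls, because the second execution of the same run never reaches the body of `send_notification`. A fresh `Run()` starts with an empty memo store, so the side effect happens again there.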

Use cases

  • Data pipelines

  • Background tasks

  • Scientific models

  • Simulation and control

  • Bioinformatics

  • Machine learning

Features

Batteries included

Enable per-step features as needed and focus on business logic, with sensible defaults that can be customised.

  • Distributed execution

    Start up as many agents as needed, and Coflux will distribute the workload between them.

  • Low-latency

    Scheduled tasks are run in isolated processes, and start in milliseconds.

  • Caching

    Enable caching for a task to have the result be re-used across runs.

  • Sensors

    Build sensors to react to events in your system in real time, and trigger workflows.

  • Debouncing

    Defer execution of a task - e.g., to send a user notification - until tasks have stopped being scheduled.

  • Automatic retries

    Automatically retry failing tasks (with exponential backoff).

  • Memoisation

    Memoise a task within a run so that the run can be re-executed without the memoised task being re-executed.

  • Scheduling (coming soon)

    Specify cron schedules for regularly executing tasks.
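
As an illustration of the automatic retries feature listed above, the following is a minimal pure-Python sketch of retrying with exponential backoff. It does not use Coflux, and the base delay and multiplier are assumptions chosen for the example, not Coflux's actual defaults. `backoff_delays`, `call_with_retries` and `flaky` are hypothetical names.

```python
import time


def backoff_delays(retries, base=1.0, factor=2.0):
    """Delay in seconds before each retry: base, base*factor, base*factor**2, ..."""
    return [base * factor ** attempt for attempt in range(retries)]


def call_with_retries(fn, retries=2, base=1.0, factor=2.0, sleep=time.sleep):
    """Call fn, retrying up to `retries` times and waiting longer after each failure."""
    delays = backoff_delays(retries, base, factor)
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == retries:
                raise  # out of retries: surface the last error
            sleep(delays[attempt])


# Usage: a function that fails twice and then succeeds.
attempts = []
waits = []  # collects the delays instead of actually sleeping


def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("temporary failure")
    return "ok"


result = call_with_retries(flaky, retries=2, base=0.5, sleep=waits.append)
```

Passing `sleep=waits.append` keeps the example fast and lets you inspect the delays that would have been applied. With the default `sleep`, the call would pause between attempts for real.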
