Coflux

April 18, 2026

Release 0.11 - Inputs

Joe Freeman

The headline for this release is inputs - workflows can now request structured input from a user (an approval, a single value, or a group of values), and resume when the response is provided. This release also introduces a new select operation for waiting on the first of several executions/inputs to resolve, and adds a configurable drain window for workers.

Inputs

A lot of useful workflows aren't fully autonomous. Deployments need approving. Flagged content needs reviewing. A model's output needs sanity-checking before it ships.

With this release, that interaction can become part of the workflow itself. You define a Prompt, submit it, and the execution waits for a response in the same way it waits for the result of an execution:

import coflux as cf

approve = cf.Prompt("Approve deployment of {service}?", title="Deployment")

@cf.workflow()
def deploy(service: str):
    approve(service=service)  # blocks; raises InputDismissed if dismissed
    do_deploy(service)

A prompt without a model (as above) operates as a simple approve/reject gate. To collect structured data, pass a Pydantic model - the schema is used to render a form in Studio, and the response is validated. For example, a triage step that needs a human to categorise an incoming alert:

from typing import Literal
from pydantic import BaseModel

class Triage(BaseModel):
    severity: Literal["low", "medium", "high"]
    component: str
    assignee: str | None = None

triage = cf.Prompt("Triage incident:\n\n{summary}", model=Triage)

@cf.workflow()
def handle_incident(summary: str):
    decision = triage(summary=summary)  # decision is a Triage
    if decision.severity == "high":
        send_page(decision.assignee, summary)
    open_ticket(decision.component, summary)

Calling a prompt directly, as above, blocks the execution until a response is provided. Alternatively, use prompt.submit(...) to get an Input handle, which behaves similarly to an Execution:

# `review` is a cf.Prompt defined elsewhere
handle = review.submit(item="order-123")  # returns an Input handle
# ...do other work...
response = handle.result()  # waits for the response

Or use suspense to have the execution suspend and restart once a response has been provided.

See the documentation for details.

Select

To run several executions in parallel and wait for all of them, call .submit() on each and then .result() on each. But sometimes you want to process results as they become available, rather than in submission order.

cf.select waits for the first of multiple handles to resolve:

handles = [query_replica.submit(replica, q) for replica in replicas]
winner, remaining = cf.select(handles)
return winner.result()
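To process every result as it becomes available, one option is to call select in a loop, feeding the remaining handles back in each time. The sketch below assumes only the (winner, remaining) return shape shown above. So that it runs without Coflux, it uses a stand-in select built on Python's concurrent.futures; the stand-in, slow_square and results_in_completion_order are hypothetical names for illustration, not part of Coflux.

```python
import time
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait


def select(handles: list[Future]) -> tuple[Future, list[Future]]:
    """Stand-in for cf.select (an assumption, not Coflux's implementation).

    Blocks until at least one handle has resolved, then returns
    (winner, remaining) in the same shape as the example above.
    """
    done, _ = wait(handles, return_when=FIRST_COMPLETED)
    winner = next(h for h in handles if h in done)
    remaining = [h for h in handles if h is not winner]
    return winner, remaining


def slow_square(n: int, delay: float) -> int:
    # Hypothetical task: sleeps to simulate work, then returns n squared.
    time.sleep(delay)
    return n * n


def results_in_completion_order(handles: list[Future]) -> list[int]:
    results = []
    remaining = list(handles)
    while remaining:
        # Each pass removes one resolved handle from the pool.
        winner, remaining = select(remaining)
        results.append(winner.result())
    return results


with ThreadPoolExecutor() as pool:
    handles = [
        pool.submit(slow_square, 2, 0.5),  # submitted first, finishes last
        pool.submit(slow_square, 3, 0.1),
        pool.submit(slow_square, 4, 0.3),
    ]
    ordered = results_in_completion_order(handles)
```

With Coflux itself, the loop body would presumably stay the same, with cf.select in place of the stand-in and .submit() calls on tasks in place of the thread pool.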

If the losers are no longer useful once a winner emerges, pass cancel_remaining=True to cancel them automatically:

winner, _ = cf.select(
    [strategy_a.submit(input), strategy_b.submit(input), strategy_c.submit(input)],
    cancel_remaining=True,
)
return winner.result()

Input handles can also be passed to select (and mixed with executions) - for example, race a slow automated lookup against a human prompt and take whichever arrives first.

select honours an enclosing cf.suspense(timeout=...) scope - if the wait expires, it raises TimeoutError.
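The race and the timeout behaviour can be pictured with a small stand-in, again built on concurrent.futures rather than Coflux. This is a sketch of the semantics described above, not of how Coflux implements them: the timeout is passed as an argument here (in Coflux it comes from the enclosing cf.suspense scope), a bare Future plays the role of an Input handle, and automated_lookup is a hypothetical slow task.

```python
import threading
import time
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait


def select(handles, timeout=None):
    """Stand-in for cf.select with a timeout (an assumption, not Coflux's code)."""
    done, _ = wait(handles, timeout=timeout, return_when=FIRST_COMPLETED)
    if not done:
        # Nothing resolved in time: mirror the TimeoutError described above.
        raise TimeoutError("no handle resolved before the timeout")
    winner = next(h for h in handles if h in done)
    return winner, [h for h in handles if h is not winner]


def automated_lookup() -> str:
    # Hypothetical slow automated path.
    time.sleep(0.5)
    return "automated"


with ThreadPoolExecutor() as pool:
    lookup = pool.submit(automated_lookup)
    human = Future()  # stands in for an Input handle
    # Simulate a person responding after 0.1 seconds.
    threading.Timer(0.1, human.set_result, args=["human"]).start()
    winner, remaining = select([lookup, human], timeout=2)
    first = winner.result()

# A handle that never resolves makes the wait expire.
try:
    select([Future()], timeout=0.05)
    timed_out = False
except TimeoutError:
    timed_out = True
```

Here the simulated human response arrives first, so it wins the race and the lookup is left in remaining.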

See the documentation for details.

Worker drain timeout

As of this release, when workers are interrupted (with SIGINT/SIGTERM), they stop accepting new work and drain the in-flight executions - giving them a configurable window to finish before being cancelled. The same drain applies to reloads in --watch/--dev mode, so editing a file in a dev loop no longer kills work that was about to complete.

Configure the window with --drain-timeout (defaults to 2m; 0 waits indefinitely):

coflux worker --drain-timeout 5m myapp.workflows

Signal handling is phased so you can still bail out: a second SIGINT aborts the drain early; a third forces an exit.
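One way to picture the phases is as a small state machine. The sketch below is not Coflux's implementation: DrainController, the phase names, and the on_signal/on_tick methods are hypothetical, and the clock is passed in explicitly so that the transitions are easy to follow.

```python
from enum import Enum, auto
from typing import Optional


class Phase(Enum):
    RUNNING = auto()     # accepting new work
    DRAINING = auto()    # no new work; in-flight executions may finish
    CANCELLING = auto()  # drain over; in-flight executions are cancelled
    EXITED = auto()      # worker has stopped


class DrainController:
    """Hypothetical model of the drain behaviour described above."""

    def __init__(self, drain_timeout: float) -> None:
        self.drain_timeout = drain_timeout  # seconds; 0 means wait indefinitely
        self.phase = Phase.RUNNING
        self.deadline: Optional[float] = None

    def on_signal(self, now: float) -> Phase:
        if self.phase is Phase.RUNNING:
            # First interrupt: start draining.
            self.phase = Phase.DRAINING
            if self.drain_timeout > 0:
                self.deadline = now + self.drain_timeout
        elif self.phase is Phase.DRAINING:
            # Second interrupt: abort the drain early.
            self.phase = Phase.CANCELLING
        else:
            # Third interrupt: force an exit.
            self.phase = Phase.EXITED
        return self.phase

    def on_tick(self, now: float, in_flight: int) -> Phase:
        if self.phase is Phase.DRAINING:
            if in_flight == 0:
                self.phase = Phase.EXITED  # everything finished in time
            elif self.deadline is not None and now >= self.deadline:
                self.phase = Phase.CANCELLING  # window expired
        elif self.phase is Phase.CANCELLING and in_flight == 0:
            self.phase = Phase.EXITED
        return self.phase
```

With a 120-second window, for example, a worker interrupted at t=0 with two executions in flight stays in DRAINING until either both finish or t=120 is reached, whichever comes first.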

Other notable changes

  • Async tasks and workflows - @cf.task() and @cf.workflow() now work with async def functions.
  • Per-call-site overrides - Target now has a fluent interface for overriding decorator-level options at a single call site. Chain them as needed: my_task.with_retries(3).with_timeout(30).submit(x).
  • Cancellation - cf.cancel([handle, ...]) cancels multiple handles (executions/inputs) atomically.
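To illustrate the per-call-site override pattern, here is a minimal sketch of a fluent interface in which each with_* call returns a modified copy, leaving the decorator-level options untouched. This is an assumption about how such an interface could be built, not Coflux's Target class; the field names and defaults are hypothetical.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class Target:
    """Hypothetical stand-in for a task target; not Coflux's Target."""

    name: str
    retries: int = 0
    timeout: Optional[float] = None

    def with_retries(self, retries: int) -> "Target":
        # Return a copy with one option changed; the original is unchanged.
        return replace(self, retries=retries)

    def with_timeout(self, timeout: float) -> "Target":
        return replace(self, timeout=timeout)


my_task = Target("my_task")                        # decorator-level defaults
tuned = my_task.with_retries(3).with_timeout(30)   # overrides for one call site
```

Because each call returns a new object, the overrides chain in any order and apply only where they are written.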

Feedback welcome! (joe@coflux.com)
