The headline for this release is metrics — you can now record numeric values from executions, and visualise the results in Studio. This release also adds worker-enforced execution timeouts, support for polling execution results, run-level defaults for memoisation, and experimental support for a Kubernetes launcher.
Metrics
You can now record metrics from within tasks. Metrics are streamed in real-time and can be used to track training progress, monitor performance, or surface any numeric signal from your code.
The API is straightforward — create a Metric and call record():
import coflux as cf
loss = cf.Metric("loss", group="training")
accuracy = cf.Metric("accuracy", group="training")
@cf.task()
def train(epochs):
    for epoch in range(epochs):
        loss_val, acc_val = run_epoch(epoch)
        loss.record(loss_val, at=epoch)
        accuracy.record(acc_val, at=epoch)
By default, values are associated with the time since the execution started, but you can specify an explicit x-value with at= — useful when you want to plot against epochs, steps, or any other dimension.
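To make the two x-value modes concrete, here is a toy model in plain Python. It is only an illustration of the behaviour described above: `ToyMetric` is a hypothetical stand-in, not Coflux's `Metric` class, and it treats the moment the object is created as the execution start.

```python
import time


class ToyMetric:
    """Stand-in for illustration only; not Coflux's Metric class."""

    def __init__(self, name, clock=time.monotonic):
        self.name = name
        self._clock = clock
        self._started = clock()  # assumption: creation time stands in for execution start
        self.points = []  # list of (x, value) pairs

    def record(self, value, at=None):
        # With no explicit x-value, fall back to seconds since the start.
        x = self._clock() - self._started if at is None else at
        self.points.append((x, value))


loss = ToyMetric("loss")
loss.record(0.9)        # x is the elapsed time
loss.record(0.5, at=1)  # x is the explicit value 1 (for example, an epoch)
```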
Metrics within the same group are displayed together on a shared chart in Studio. You can configure the axes further with MetricGroup and MetricScale:
group = cf.MetricGroup("training", units="epoch")
accuracy = cf.Metric("accuracy", group=group, scale=cf.MetricScale(units="%", lower=0, upper=100))
For tracking iteration progress, there's a progress() helper that wraps an iterable and automatically records a progress metric as items are consumed:
@cf.task()
def process_batch(items):
    for item in cf.progress(items):
        handle(item)
This renders as a progress bar in Studio, giving visibility into long-running operations.
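For intuition, a helper of this kind can be sketched as a generator that reports the fraction of items consumed. This is a guess at the general shape rather than Coflux's implementation: `toy_progress` and the `record` callback are hypothetical names, and the sketch assumes the input has a known length.

```python
def toy_progress(items, record):
    """Yield items from a sized collection, reporting the fraction consumed.

    `record` is a hypothetical callback standing in for a metric's record().
    """
    total = len(items)
    for done, item in enumerate(items, start=1):
        yield item
        # Runs when the consumer asks for the next item, i.e. after it
        # has finished with the current one.
        record(done / total)


fractions = []
handled = [item.upper() for item in toy_progress(["a", "b", "c", "d"], fractions.append)]
```

Recording after the `yield` means progress reflects completed items rather than items that have only been handed out.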
Currently only a built-in metric store is supported, but the intention is to support external stores in future.
Timeouts
Tasks can now specify a timeout. If an execution exceeds its timeout, the process is killed and child executions are cancelled. Any execution waiting on the result will receive an ExecutionTimeout exception:
@cf.task(timeout=30)
def call_external_api():
    ...
Timeouts compose with retries — a timed-out execution counts as a failure and can trigger a retry (if configured).
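The interaction between timeouts and retries can be pictured with a small plain-Python model. Everything here is hypothetical (`ToyTimeout`, `run_with_retries`, `flaky`); it only illustrates the idea that a timeout consumes a retry like any other failure, and says nothing about how Coflux schedules retries internally.

```python
class ToyTimeout(Exception):
    """Hypothetical stand-in for a timed-out attempt."""


def run_with_retries(attempt_fn, retries):
    """Call attempt_fn until it succeeds or the retry budget is spent.

    A timeout is handled like any other failure: it consumes one retry.
    Returns (result, number_of_attempts).
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return attempt_fn(attempts), attempts
        except (ToyTimeout, RuntimeError):
            if attempts > retries:
                raise  # budget exhausted: surface the last failure


def flaky(attempt):
    # The first two attempts "time out"; the third succeeds.
    if attempt < 3:
        raise ToyTimeout(f"attempt {attempt} exceeded its timeout")
    return "ok"


result, attempts = run_with_retries(flaky, retries=2)
```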
Polling
Previously, calling .result() on an execution would suspend the caller until the result was available (or trigger suspense, if configured). You can now use .poll() to check whether a result is ready without suspending:
execution = slow_task.submit()
while (result := execution.poll(timeout=1)) is None:
    print("waiting...")
poll() returns the result if it's available, or None (by default) if the execution is still running. The optional timeout parameter specifies how long to wait (in seconds) before returning None.
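The same contract can be tried out locally with the standard library. The sketch below is an analogy built on `concurrent.futures`, not Coflux's API: the `poll` function and its `default` parameter are assumptions made for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout


def poll(future, timeout=0, default=None):
    """Return the future's result if it arrives within `timeout` seconds, else `default`."""
    try:
        return future.result(timeout=timeout)
    except FutureTimeout:
        return default


def slow_task():
    time.sleep(0.3)
    return 42


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(slow_task)
    polls = 0
    while (result := poll(future, timeout=0.1)) is None:
        polls += 1  # the caller is free to do other work here
```

In a loop like this, a result that is legitimately `None` is indistinguishable from "not ready yet", which is where a distinct default value earns its keep.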
Run-level memoisation
Workflows can now opt into memoisation, setting an (overridable) default for steps in the run. Memoised tasks with matching arguments are reused within a run. This makes it possible to re-run specific steps and have them reuse results from their dependencies without re-running those dependencies, and it also functions as per-run caching.
@cf.workflow(memo=True)
def analyse():
    ...
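As a rough mental model of per-run memoisation, consider a cache that lives for one run and is keyed by task name and arguments. The sketch below is plain Python with hypothetical names (`memoised`, `run_cache`, `load_dataset`); it does not reflect how Coflux stores or matches memoised results.

```python
import functools


def memoised(run_cache):
    """Decorator that reuses results within one run, keyed by task name and arguments."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args):
            key = (fn.__name__, args)
            if key not in run_cache:
                run_cache[key] = fn(*args)
            return run_cache[key]
        return wrapper
    return decorator


run_cache = {}  # one cache per run; a new run would start with an empty dict
calls = []


@memoised(run_cache)
def load_dataset(name):
    calls.append(name)  # track real executions
    return f"rows for {name}"


load_dataset("sales")
load_dataset("sales")    # same arguments: served from the run's cache
load_dataset("returns")  # different arguments: executed
```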
Kubernetes launcher (experimental)
Coflux can now launch workers as Kubernetes jobs. Configure a pool with the kubernetes launcher type, and the server will create and manage pods on demand:
coflux pools create k8s-pool --type kubernetes \
  --set image=myorg/myapp:latest \
  --set namespace=coflux-workers \
  --modules myapp.workflows
When running inside a Kubernetes cluster, authentication is automatic. For external access, you can provide an API server URL and token (though note this token is stored in the orchestration database):
coflux pools create k8s-pool --type kubernetes \
  --set image=myorg/myapp:latest \
  --set apiServer=https://my-cluster:6443 \
  --set token="$(cat /path/to/token)" \
  --set serverHost=coflux.example.com:7777 \
  --modules myapp.workflows
Other notable changes
- Recurrent tasks: Recurrent targets now only recur when they return None. Returning a value completes the cycle, giving an explicit way to stop recurrence.
- Concurrent step prevention: Re-running a step will now cancel any in-progress execution of that step. Re-running a scheduled retry will expedite the retry.
- Run-level requires: A workflow's requires tags now apply to the entire run (and are merged with step-level tags), so child tasks are automatically routed to matching workers unless explicitly overridden.
- Submit overrides: The coflux submit command now supports flags for overriding workflow options at submission time: --requires, --memo/--no-memo, --delay, and --retries.
- Pool accepts tags: Pools can now define accepts tags (in addition to existing provides). When set, only executions whose requires tags match will be assigned to the pool, preventing general work from being scheduled to specialised workers.
- Pool management: Pools can now be disabled and re-enabled without deleting them. Disabled pools drain their workers and won't accept new executions.
- Pool export/import: Pool configurations can be exported and imported, making it easier to replicate setups across environments.
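The recurrence rule from the first item in the list above can be pictured as a small driver loop. This is a plain-Python illustration with hypothetical names (`drive`, `drain_queue`, `max_cycles`), not the way Coflux schedules recurrent targets.

```python
def drive(target, max_cycles=100):
    """Re-invoke `target` while it returns None; stop at the first other value.

    Returns (value, number_of_cycles).
    """
    for cycle in range(1, max_cycles + 1):
        outcome = target()
        if outcome is not None:
            return outcome, cycle
    raise RuntimeError("target never returned a value")


pending = ["job-1", "job-2", "job-3"]


def drain_queue():
    if pending:
        pending.pop(0)
        return None  # more work may remain: recur
    return "queue empty"  # returning a value ends the cycle


outcome, cycles = drive(drain_queue)
```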
Feedback welcome! (joe@coflux.com)