Version 0.2 has been released! A couple of notable features are worth mentioning...
Assets
The main feature of this release is support for assets. Assets can be 'persisted' from one execution and then 'restored' in another.
Refer to the documentation for details, but as a quick example, assets are persisted by passing a path to the persist_asset function:
import coflux as cf

@cf.task()
def load_photo():
    # ...
    # persist the file at `path` and get back a reference to it
    asset = cf.persist_asset(path)
    # ...
The asset can now be returned, or passed as an argument to another task.
It is then restored using the restore_asset function:
import coflux as cf

@cf.task()
def detect_subject(asset):
    # restore the referenced asset and get a local path to it
    path = cf.restore_asset(asset)
    # ...
The asset variable above is an Asset object, which is simply a reference to the asset. Coflux tracks which assets are restored by a task, and uses this to indicate the flow of data in the web UI.
The web UI also supports previewing of some asset types.
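To make the idea of an asset as "simply a reference" more concrete, here is a toy model using only the Python standard library. It is not how Coflux is implemented, and it does not use the coflux package: ToyAsset, toy_persist_asset, toy_restore_asset and STORE are hypothetical names made up for this sketch, and the content-hash key is an assumption chosen for simplicity.

```python
import hashlib
import shutil
import tempfile
from dataclasses import dataclass
from pathlib import Path

# Hypothetical stand-in for whatever storage sits behind assets; not part of coflux.
STORE = Path(tempfile.mkdtemp(prefix="toy-asset-store-"))


@dataclass(frozen=True)
class ToyAsset:
    """A lightweight reference to persisted content (hypothetical)."""

    key: str
    name: str


def toy_persist_asset(path: Path) -> ToyAsset:
    """Copy the file into the store and return a reference to it."""
    key = hashlib.sha256(path.read_bytes()).hexdigest()
    shutil.copyfile(path, STORE / key)
    return ToyAsset(key=key, name=path.name)


def toy_restore_asset(asset: ToyAsset, target_dir: Path) -> Path:
    """Copy the referenced content into target_dir and return the local path."""
    target = target_dir / asset.name
    shutil.copyfile(STORE / asset.key, target)
    return target


def demo() -> bytes:
    # "First execution": write a file and persist it.
    producer_dir = Path(tempfile.mkdtemp())
    photo = producer_dir / "photo.txt"
    photo.write_bytes(b"pretend image bytes")
    asset = toy_persist_asset(photo)

    # "Second execution": only the small reference is passed along,
    # and the content is restored to a fresh local path when needed.
    consumer_dir = Path(tempfile.mkdtemp())
    restored = toy_restore_asset(asset, consumer_dir)
    return restored.read_bytes()
```

The point of the sketch is that the value passed between tasks stays small regardless of the file size; the content itself only moves when a task restores it.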
Explicit waiting
Another notable feature in this release is an option to tell the orchestrator to wait for other executions to complete before assigning and starting a task. This works in conjunction with asynchronous execution (.submit()), and avoids having a task running that is only waiting on another task to complete.
Again, see the documentation, but to provide an example:
import coflux as cf

@cf.task(wait_for={"user_execution", "product_execution"})
def create_order(user_execution, product_execution):
    # the task is only started once both executions have completed
    user = user_execution.result()
    product = product_execution.result()
    # ...
In the timeline in the web UI, the gray region shows that the final task has been scheduled, but not yet assigned to an agent and started.
Although this has a marginal overhead on the total (wall clock) time, it can significantly reduce the number of concurrently running executions, and the time that tasks spend idle.
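As a rough illustration of the idle time being saved, here is a toy calculation in plain Python. It does not use Coflux, the task durations are made-up numbers, and it ignores the marginal scheduling overhead mentioned above; ToyTask, finish_time and occupied_time are hypothetical names for this sketch only.

```python
from dataclasses import dataclass, field


@dataclass
class ToyTask:
    name: str
    work: float  # seconds of actual work (made-up numbers)
    wait_for: list["ToyTask"] = field(default_factory=list)


def finish_time(task: ToyTask) -> float:
    """When the task's result is ready; its work needs its dependencies first."""
    ready = max((finish_time(dep) for dep in task.wait_for), default=0.0)
    return ready + task.work


def occupied_time(task: ToyTask, explicit_wait: bool) -> float:
    """How long the task occupies an agent, from start to finish."""
    deps_ready = max((finish_time(dep) for dep in task.wait_for), default=0.0)
    # Without explicit waiting, the task starts at t=0 and blocks on its
    # dependencies; with it, the task only starts once they have finished.
    start = deps_ready if explicit_wait else 0.0
    return finish_time(task) - start


load_user = ToyTask("load_user", work=2.0)
load_product = ToyTask("load_product", work=3.0)
create_order = ToyTask("create_order", work=1.0, wait_for=[load_user, load_product])

blocking = occupied_time(create_order, explicit_wait=False)  # 4.0: 3.0 idle + 1.0 working
deferred = occupied_time(create_order, explicit_wait=True)   # 1.0: working only
```

In both cases the order is ready at the same moment in this model; the difference is how long the final task holds an agent while doing nothing.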
Feedback welcome! (joe@coflux.com)