Skip to content

wt-invokers

wt-invokers provides the execution layer for the wt ecosystem. It defines an abstract AbstractInvoker interface with two concrete implementations for running compiled workflows.

Modules: abstract · local · cloud_batch

Public API:

from wt_invokers import (
    AbstractInvoker,
    LocalSubprocessInvoker,
    CloudBatchInvoker,
    InvokerError,
    InvocationTimeoutError,
    InstallationError,
)
Invoker Execution Target Supports Waiting
LocalSubprocessInvoker Local machine via pixi Yes
CloudBatchInvoker Google Cloud Batch containers No

AbstractInvoker

Abstract base class that all workflow invokers must implement.

Attributes

Attribute Type Description
matchspec MatchSpec Rattler MatchSpec identifying the workflow package
results_env_var str Name of the environment variable for results URL

Abstract Methods

Async API

All abstract methods are async. Callers must await them.

Method Signature Description
is_installed() async -> bool Check whether the workflow is installed
install() async -> None Install the workflow
run() see below Launch the workflow with configuration
wait() async -> int Wait for completion and return exit code

run() signature:

async def run(
    self,
    workflow_run_id: str,
    config_text: str,
    results_url: str,
    execution_mode: str,
    mock_io: bool,
    otel_exporter: str | None = None,
    otel_console_exporter_dst: str | None = None,
    extra_env: dict[str, str] | None = None,
    lithops_config_text: str | None = None,
    **kwargs: Any,
) -> None

Properties

Property Type Description
is_waitable bool Whether this invoker supports waiting for completion

Concrete Methods

Method Description
check_output() Run a one-off command in the workflow environment

LocalSubprocessInvoker

Executes workflows as local subprocesses using pixi environments. Suitable for development, testing, and small-scale deployments.

Attributes

Attribute Type Description
matchspec MatchSpec Workflow MatchSpec
results_env_var str Environment variable for results URL
cwd Path Working directory for subprocess execution

Properties

Property Value Description
entrypoint str Pixi command (e.g. pixi run -e default my-workflow)
is_waitable True Always supports waiting

Methods

Method Description
is_installed() Check if workflow is available in the pixi environment
install() Raises NotImplementedError
run(**kwargs) Launch workflow in a subprocess
wait() Block until subprocess completes, return exit code
check_output() Run a one-off command and capture stdout

CloudBatchInvoker

Submits workflows to Google Cloud Batch for containerized execution. Jobs are submitted asynchronously — does not wait for completion.

Requires GCP extras: pip install wt-invokers[gcp]

Environment Variables

Variable Required Default Description
GOOGLE_CLOUD_PROJECT yes GCP project ID
CLOUD_RUN_REGION no us-central1 GCP region
BATCH_SERVICE_ACCOUNT no Service account email

Properties

Property Value Description
entrypoint str Pixi command
is_waitable False Does not support waiting

Methods

Method Description
is_installed() Always returns True
install() Raises NotImplementedError
run(**kwargs) Submit workflow to Cloud Batch
wait() No-op, always returns 0

The Cloud Batch job is structured as a single TaskGroup with one Task containing one Runnable (the container).


Exceptions

Exception Description
InvokerError Base exception for invoker errors
InvocationTimeoutError Workflow execution timed out
InstallationError Workflow installation failed