Skip to content

wt-task

wt-task provides the @task decorator that wraps plain Python functions with a chainable, composable execution API. Used in generated DAG code to execute registered functions with validation, error handling, and tracing.

Modules: decorator · base · sync_task · async_task · exceptions · skip · executors · testing · tracing

Public API:

from wt_task import task, SyncTask, AsyncTask

@task Decorator

def task(
    func: Callable[P, R] | None = None,
    *,
    description: str | None = None,
    tags: list[str] | None = None,
) -> SyncTask[P, R, K, V] | Callable[[Callable[P, R]], SyncTask[P, R, K, V]]

Works as both a decorator and a wrapper function. Creates a SyncTask instance.

Usage forms

# Bare decorator
@task
def my_func(x: int) -> int:
    return x * 2

# With arguments
@task(description="...", tags=["io"])
def my_func(x: int) -> int:
    return x * 2

# Wrapper function (used in generated DAG code)
result = task(registered_func).partial(x=5).call()

SyncTask fields

Field Type Description
func Callable The wrapped function
tags list[str] Categorization tags
description str \| None Task description
task_instance_id str \| None Unique identifier for this instance
executor Executor \| None Custom executor backend

Execution Methods

.call(*args, **kwargs) -> R

Execute the task directly with the given arguments.

.map(argnames, argvalues) -> Sequence[R]

Map the task over a sequence. argnames is the parameter name (or list of names) to bind each element to; argvalues is the iterable.

results = task(process_item).map("item", items)
# Equivalent to: [process_item(item=x) for x in items]

.mapvalues(argnames, argvalues) -> Sequence[tuple[K, R]]

Map over key-value pairs, preserving the keys. Input must be a sequence of (key, value) tuples.

results = task(transform).mapvalues("data", grouped_data)
# Input:  [("group_a", data_a), ("group_b", data_b)]
# Output: [("group_a", result_a), ("group_b", result_b)]

Transformation Methods

All transformation methods return a new task instance (immutable — the original is unchanged), enabling method chaining:

result = (
    task(my_func)
    .set_task_instance_id("step_one")
    .partial(x=5)
    .validate()
    .handle_errors()
    .call()
)
Method Signature Description
.partial(**kwargs) -> Self Bind keyword arguments
.validate() -> Self Enable Pydantic validation (coerces string inputs to correct types)
.set_task_instance_id(id) -> Self Assign a unique identifier for error reporting
.handle_errors() -> Self Wrap exceptions as TaskInstanceError with instance ID
.skipif(conditions, unpack_depth=1) -> Self Conditionally skip based on boolean condition functions
.with_tracing() -> Self Enable OpenTelemetry tracing
.set_executor(name_or_executor) -> SyncTask \| AsyncTask Switch executor backend

Exceptions and Skip

Name Description
TaskInstanceError Wraps an exception with the task instance ID for debugging
SkipSentinel Returned when a task is skipped via .skipif()
SKIP_SENTINEL Singleton instance of SkipSentinel

Testing

from wt_task.testing import create_func_magicmock

create_func_magicmock creates a MagicMock with the same signature as the original function, useful for testing DAG code without executing real tasks.