Skip to content

Tasks

Tasks are the public scheduling unit in Asyncz. A task combines the callable, trigger, executor alias, arguments, and persistence metadata required to schedule and reschedule work.

from asyncz.tasks import Task

Core task fields

  • id
  • name
  • fn
  • args
  • kwargs
  • trigger
  • executor
  • mistrigger_grace_time
  • coalesce
  • max_instances
  • next_run_time

Task state and inspection

Every task exposes a derived scheduling state:

  • pending: the task is still in the scheduler's pending queue and has not been committed to a started store yet
  • paused: the task exists but has no next_run_time
  • scheduled: the task has an upcoming next_run_time

Useful task inspection helpers:

  • task.schedule_state
  • task.paused
  • task.snapshot()

Example:

task = scheduler.add_task(cleanup, "interval", minutes=5, id="cleanup")
info = task.snapshot()

print(info.callable_reference)
print(info.trigger_alias)
print(info.schedule_state.value)

snapshot() returns an immutable TaskInfo model that is safe to pass into:

  • dashboards
  • CLI formatting code
  • structured logging
  • tests and assertions

Creating a task directly

from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.triggers import CronTrigger

# Create a scheduler
scheduler = AsyncIOScheduler()


def check_status():
    # Logic to check statuses
    ...


Task(
    id="my-task",
    fn=check_status,
    name="my-func",
    scheduler=scheduler,
    trigger=CronTrigger(day_of_week="mon,tue,wed,thu,fri,sat,sun", hour=8, minute=1),
    max_instances=3,
    coalesce=True,
)

scheduler.start()

# or manually submit one with a changed trigger

t = Task(
    id="my-task2",
    fn=check_status,
    max_instances=1,
    coalesce=True,
)

scheduler.add_task(
    t, trigger=CronTrigger(day_of_week="mon,tue,wed,thu,fri,sat,sun", hour=8, minute=1)
)

In most applications you will create tasks through scheduler.add_task(...), which returns a Task instance after applying scheduler defaults.

Updating a task

from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.triggers import CronTrigger

# Create a scheduler
scheduler = AsyncIOScheduler()


def check_status():
    # Logic to check statuses
    ...


# Create a task
task = Task(
    id="my-task",
    fn=check_status,
    name="my-func",
    scheduler=scheduler,
    trigger=CronTrigger(day_of_week="mon,tue,wed,thu,fri,sat,sun", hour=8, minute=1),
    max_instances=3,
    coalesce=True,
)

# Update the task
task.update(
    name="my-new-task-id",
    max_instances=5,
    coalesce=False,
)

Important constraints:

  • task ids are immutable
  • changing next_run_time requires a scheduler so Asyncz can normalize the datetime
  • task callables must remain serializable if you want to persist them in a store

Rescheduling a task

from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.triggers import CronTrigger

# Create a scheduler
scheduler = AsyncIOScheduler()


def check_status():
    # Logic to check statuses
    ...


# Create a task
task = Task(
    id="my-task",
    fn=check_status,
    name="my-func",
    scheduler=scheduler,
    trigger=CronTrigger(day_of_week="mon,tue,wed,thu,fri,sat,sun", hour=8, minute=1),
    max_instances=3,
    coalesce=True,
)

# Reschedule the task
task.reschedule("my-task", trigger="cron", hour=10, minute=5)

reschedule_task() changes the trigger and recomputes the next run time.

Decorator mode

When you call scheduler.add_task(...) without a callable, Asyncz returns a decorator-mode task. When the decorated function is applied, Asyncz creates a submitted copy of that task definition.

This is useful when you want task metadata to live next to the function definition but still be managed by the scheduler.

Lifecycle tasks

Asyncz supports lifecycle-style tasks implemented as generators or async generators.

from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.utils import make_function

# Create a scheduler
scheduler = AsyncIOScheduler()


def lifecycle_task():
    # setup
    ...
    # we have to mask generator send so it could be set to a task
    scheduler.add_task(make_function(generator.send), args=[False], trigger="shutdown")
    running = yield
    while running:
        # do something
        running = yield
    # cleanup


# setup task
generator = lifecycle_task()
generator.send(None)

# Run every 5 minutes
scheduler.add_task(make_function(generator.send), args=[True], trigger="interval", minutes=5)

scheduler.start()
...
# Now the shutdown task is executed and the generator progresses in the cleanup
scheduler.stop()

Notes:

  • lifecycle generators only work with MemoryStore
  • generator-based tasks are not pickleable, so they cannot be persisted to Redis, MongoDB, SQLAlchemy, or FileStore

Lifecycle tasks in multi-process deployments

Asyncz can combine lifecycle tasks with file-based locks to coordinate setup, tick, and cleanup behavior across multiple worker processes.

Examples:

import tempfile
from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.utils import make_function

# Create a scheduler
scheduler = AsyncIOScheduler(
    lock_path="/tmp/asyncz_{pgrp}_{store}.lock",
    stores={
        "default": {"type": "file", "directory": tempfile.mkdtemp(), "cleanup_directory": True},
        "memory": {"type": "memory"},
    },
)


def lifecycle_task():
    # setup
    ...
    # we have to mask generator send so it could be set to a task
    scheduler.add_task(
        make_function(generator.send), args=[False], trigger="shutdown", store="memory"
    )
    running = yield
    while running:
        # do something
        running = yield
    # cleanup


# setup task
generator = lifecycle_task()
generator.send(None)


# must be a global referencable function
def lifecycle_tick():
    generator.send(True)


# Run every 5 minutes
scheduler.add_task(lifecycle_tick, trigger="interval", minutes=5)

scheduler.start()
...
# Now the shutdown task is executed and the generator progresses in the cleanup
scheduler.stop()
import tempfile
from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.utils import make_function
from asyncz.locks import FileLockProtected

# Create a scheduler
scheduler = AsyncIOScheduler(
    lock_path="/tmp/asyncz_{pgrp}_{store}.lock",
    stores={
        "default": {"type": "file", "directory": tempfile.mkdtemp(), "cleanup_directory": True},
        "memory": {"type": "memory"},
    },
)


def lifecycle_task(name: str):
    # setup initial
    ...
    # intialize a file lock (multi-processing safe)
    file_lock = FileLockProtected(f"/tmp/asyncz_bg_{name}_{{pgrp}}.lock")
    while True:
        # don't block the generator
        with file_lock.protected(False) as got_the_lock:
            if not got_the_lock:
                running = yield
                if not running:
                    break
                continue
            # delayed setup phase. Only executed when the lock was grabbed. e.g. for  creating db clients.
            ...
            # we have to mask generator send so it could be set to a task
            scheduler.add_task(
                make_function(generator.send), args=[False], trigger="shutdown", store="memory"
            )
            running = yield
            while running:
                # do something safe
                try:
                    # do something risky
                    ...
                except Exception:
                    # log
                    ...
                running = yield
            try:
                # cleanup the loop setup
                ...
            except Exception:
                # log
                ...
            # break the loop
            break
    # extra cleanup which is always executed except an exception was raised


# setup task
generator = lifecycle_task("foo")
generator.send(None)


# must be a global referencable function
def lifecycle_tick():
    generator.send(True)


# Run every 5 minutes
scheduler.add_task(lifecycle_tick, trigger="interval", minutes=5)

# should be better a context manager or lifespan wrapper (.asgi) to cleanup on unexpected errors
with scheduler:
    ...
    # Now the shutdown task is executed and the generator progresses in the cleanup