Tasks¶
Tasks are the public scheduling unit in Asyncz. A task combines the callable, trigger, executor alias, arguments, and persistence metadata required to schedule and reschedule work.
Core task fields¶
id, name, fn, args, kwargs, trigger, executor, mistrigger_grace_time, coalesce, max_instances, next_run_time
Task state and inspection¶
Every task exposes a derived scheduling state:
- pending: the task is still in the scheduler's pending queue and has not been committed to a started store yet
- paused: the task exists but has no next_run_time
- scheduled: the task has an upcoming next_run_time
Useful task inspection helpers:
task.schedule_state, task.paused, task.snapshot()
Example:
task = scheduler.add_task(cleanup, "interval", minutes=5, id="cleanup")
info = task.snapshot()
print(info.callable_reference)
print(info.trigger_alias)
print(info.schedule_state.value)
snapshot() returns an immutable TaskInfo model that is safe to pass into:
- dashboards
- CLI formatting code
- structured logging
- tests and assertions
Creating a task directly¶
from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.triggers import CronTrigger
# Create a scheduler
scheduler = AsyncIOScheduler()
def check_status():
# Logic to check statuses
...
Task(
id="my-task",
fn=check_status,
name="my-func",
scheduler=scheduler,
trigger=CronTrigger(day_of_week="mon,tue,wed,thu,fri,sat,sun", hour=8, minute=1),
max_instances=3,
coalesce=True,
)
scheduler.start()
# or manually submit one with a changed trigger
t = Task(
id="my-task2",
fn=check_status,
max_instances=1,
coalesce=True,
)
scheduler.add_task(
t, trigger=CronTrigger(day_of_week="mon,tue,wed,thu,fri,sat,sun", hour=8, minute=1)
)
In most applications you will create tasks through scheduler.add_task(...), which returns a Task instance after applying scheduler defaults.
Updating a task¶
from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.triggers import CronTrigger
# Create a scheduler
scheduler = AsyncIOScheduler()
def check_status():
# Logic to check statuses
...
# Create a task
task = Task(
id="my-task",
fn=check_status,
name="my-func",
scheduler=scheduler,
trigger=CronTrigger(day_of_week="mon,tue,wed,thu,fri,sat,sun", hour=8, minute=1),
max_instances=3,
coalesce=True,
)
# Update the task
task.update(
name="my-new-task-id",
max_instances=5,
coalesce=False,
)
Important constraints:
- task ids are immutable
- changing next_run_time requires a scheduler so Asyncz can normalize the datetime
- task callables must remain serializable if you want to persist them in a store
Rescheduling a task¶
from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.triggers import CronTrigger
# Create a scheduler
scheduler = AsyncIOScheduler()
def check_status():
# Logic to check statuses
...
# Create a task
task = Task(
id="my-task",
fn=check_status,
name="my-func",
scheduler=scheduler,
trigger=CronTrigger(day_of_week="mon,tue,wed,thu,fri,sat,sun", hour=8, minute=1),
max_instances=3,
coalesce=True,
)
# Reschedule the task
task.reschedule("my-task", trigger="cron", hour=10, minute=5)
reschedule() changes the trigger and recomputes the next run time.
Decorator mode¶
When you call scheduler.add_task(...) without a callable, Asyncz returns a decorator-mode task. When the decorated function is applied, Asyncz creates a submitted copy of that task definition.
This is useful when you want task metadata to live next to the function definition but still be managed by the scheduler.
Lifecycle tasks¶
Asyncz supports lifecycle-style tasks implemented as generators or async generators.
from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.utils import make_function
# Create a scheduler
scheduler = AsyncIOScheduler()
def lifecycle_task():
# setup
...
# we have to wrap generator.send so it can be scheduled as a task
scheduler.add_task(make_function(generator.send), args=[False], trigger="shutdown")
running = yield
while running:
# do something
running = yield
# cleanup
# setup task
generator = lifecycle_task()
generator.send(None)
# Run every 5 minutes
scheduler.add_task(make_function(generator.send), args=[True], trigger="interval", minutes=5)
scheduler.start()
...
# Now the shutdown task is executed and the generator progresses in the cleanup
scheduler.stop()
Notes:
- lifecycle generators only work with MemoryStore
- generator-based tasks are not pickleable, so they cannot be persisted to Redis, MongoDB, SQLAlchemy, or FileStore
Lifecycle tasks in multi-process deployments¶
Asyncz can combine lifecycle tasks with file-based locks to coordinate setup, tick, and cleanup behavior across multiple worker processes.
Examples:
import tempfile
from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.utils import make_function
# Create a scheduler
scheduler = AsyncIOScheduler(
lock_path="/tmp/asyncz_{pgrp}_{store}.lock",
stores={
"default": {"type": "file", "directory": tempfile.mkdtemp(), "cleanup_directory": True},
"memory": {"type": "memory"},
},
)
def lifecycle_task():
# setup
...
# we have to wrap generator.send so it can be scheduled as a task
scheduler.add_task(
make_function(generator.send), args=[False], trigger="shutdown", store="memory"
)
running = yield
while running:
# do something
running = yield
# cleanup
# setup task
generator = lifecycle_task()
generator.send(None)
# must be a global referencable function
def lifecycle_tick():
generator.send(True)
# Run every 5 minutes
scheduler.add_task(lifecycle_tick, trigger="interval", minutes=5)
scheduler.start()
...
# Now the shutdown task is executed and the generator progresses in the cleanup
scheduler.stop()
import tempfile
from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.utils import make_function
from asyncz.locks import FileLockProtected
# Create a scheduler
scheduler = AsyncIOScheduler(
lock_path="/tmp/asyncz_{pgrp}_{store}.lock",
stores={
"default": {"type": "file", "directory": tempfile.mkdtemp(), "cleanup_directory": True},
"memory": {"type": "memory"},
},
)
def lifecycle_task(name: str):
# setup initial
...
# initialize a file lock (multi-processing safe)
file_lock = FileLockProtected(f"/tmp/asyncz_bg_{name}_{{pgrp}}.lock")
while True:
# don't block the generator
with file_lock.protected(False) as got_the_lock:
if not got_the_lock:
running = yield
if not running:
break
continue
# delayed setup phase. Only executed when the lock was grabbed. e.g. for creating db clients.
...
# we have to wrap generator.send so it can be scheduled as a task
scheduler.add_task(
make_function(generator.send), args=[False], trigger="shutdown", store="memory"
)
running = yield
while running:
# do something safe
try:
# do something risky
...
except Exception:
# log
...
running = yield
try:
# cleanup the loop setup
...
except Exception:
# log
...
# break the loop
break
# extra cleanup which is always executed unless an exception was raised
# setup task
generator = lifecycle_task("foo")
generator.send(None)
# must be a global referencable function
def lifecycle_tick():
generator.send(True)
# Run every 5 minutes
scheduler.add_task(lifecycle_tick, trigger="interval", minutes=5)
# should be better a context manager or lifespan wrapper (.asgi) to cleanup on unexpected errors
with scheduler:
...
# Now the shutdown task is executed and the generator progresses in the cleanup