Schedulers¶
Schedulers coordinate every other Asyncz component. They own the configured stores and executors, calculate due run times, submit work, dispatch events, and expose the public task-management API.
Available schedulers¶
AsyncIOScheduler¶
The default scheduler for most applications. It can reuse an existing event loop or create an isolated loop when needed.
NativeAsyncIOScheduler¶
Use this variant when the surrounding application already owns the running event loop and you want start() / shutdown() to be awaitable.
Defaults¶
When you instantiate a scheduler with no explicit configuration, Asyncz will lazily add:
- the
memorystore asdefault - the
asyncioexecutor asdefault
Configuring a scheduler¶
Asyncz accepts direct Python objects, plugin aliases, or configuration dictionaries.
Python-first configuration¶
from datetime import timezone as tz
from asyncz.executors import AsyncIOExecutor, ThreadPoolExecutor
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.stores.mongo import MongoDBStore
from asyncz.stores.redis import RedisStore
# Define the stores
# Override the default MemoryStore to become RedisStore where the db is 0
stores = {"mongo": MongoDBStore(), "default": RedisStore(database=0)}
# Define the executors
# Override the default ot be the AsyncIOExecutor
executors = {
"default": AsyncIOExecutor(),
"threadpool": ThreadPoolExecutor(max_workers=20),
}
# Set the defaults
task_defaults = {"coalesce": False, "max_instances": 4}
# Create the scheduler
scheduler = AsyncIOScheduler(
stores=stores, executors=executors, task_defaults=task_defaults, timezone=tz.utc
)
# Start the scheduler
with scheduler:
# note: you can also use start() and shutdown() manually
# Nesting is also not a problem (start and shutdown are refcounted and only the outermost scope does start and shutdown the scheduler)
scheduler.start()
scheduler.stop()
# manually you have more control like:
scheduler.start(paused=True)
scheduler.resume()
# noop because not outermost scope
with scheduler:
...
scheduler.shutdown(wait=False)
Configuration dictionary at construction time¶
from asyncz.schedulers.asyncio import AsyncIOScheduler
# Create the scheduler
scheduler = AsyncIOScheduler(
global_config={
"asyncz.stores.mongo": {"type": "mongodb"},
"asyncz.stores.default": {"type": "redis", "database": "0"},
"asyncz.executors.pool": {
"max_workers": "20",
"class": "asyncz.executors.pool:ThreadPoolExecutor",
},
"asyncz.executors.default": {"class": "asyncz.executors.asyncio:AsyncIOExecutor"},
"asyncz.task_defaults.coalesce": "false",
"asyncz.task_defaults.max_instances": "3",
"asyncz.task_defaults.timezone": "UTC",
},
)
# Start the scheduler
with scheduler:
...
# your code
Reconfiguring with setup()¶
from datetime import timezone as tz
from asyncz.executors import AsyncIOExecutor, ThreadPoolExecutor
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.stores import MongoDBStore, RedisStore
# Define the stores
# Override the default MemoryStore to become RedisStore where the db is 0
stores = {"mongo": MongoDBStore(), "default": RedisStore(database=0)}
# Define the executors
# Override the default ot be the AsyncIOExecutor
executors = {
"default": AsyncIOExecutor(),
"threadpool": ThreadPoolExecutor(max_workers=20),
}
# Set the defaults
task_defaults = {"coalesce": False, "max_instances": 4}
# Create the scheduler
scheduler = AsyncIOScheduler()
## Add some tasks here or anything else (for instance 3 tasks)
scheduler.add_task(...)
scheduler.add_task(...)
scheduler.add_task(...)
scheduler.setup(stores=stores, executors=executors, task_defaults=task_defaults, timezone=tz.utc)
# Start the scheduler
with scheduler:
...
# your code
Logging¶
Asyncz uses the standard logging module.
The default scheduler logger namespace is:
If you pass logger_name="worker-a", the scheduler logger becomes:
Stores and executors follow the same convention:
asyncz.stores.<alias>asyncz.executors.<alias>
If you need custom logger creation, provide a custom loggers_class. The built-in implementation is asyncz.schedulers.base.ClassicLogging.
Starting and stopping¶
from asyncz.schedulers.asyncio import AsyncIOScheduler
scheduler = AsyncIOScheduler()
# Start the scheduler
scheduler.start()
from asyncz.schedulers.asyncio import AsyncIOScheduler
scheduler = AsyncIOScheduler()
# Shutdown the scheduler
scheduler.shutdown()
Adding tasks¶
scheduler.add_task(...) is the main entry point for registering work.
- Pass a trigger instance directly.
- Or pass a trigger alias plus trigger-specific keyword arguments.
- The return value is an
asyncz.tasks.Task.
from datetime import timezone as tz
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger, IntervalTrigger
from asyncz.tasks import Task
# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)
def send_email_newsletter():
# Add logic to send emails here
...
def collect_www_info():
# Add logic to collect information from the internet
...
def check_status():
# Logic to check a given status of whatever needed
...
# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
send_email_newsletter,
trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)
# Run every 2 minutes
scheduler.add_task(
collect_www_info,
trigger=IntervalTrigger(minutes=2),
max_instances=1,
replace_existing=True,
coalesce=True,
)
# Run every 10 minutes
scheduler.add_task(
check_status,
trigger=IntervalTrigger(minutes=10),
max_instances=1,
replace_existing=True,
coalesce=False,
)
# Run every 10 minutes collect_www_info before sending the newsletter
feed_data = collect_www_info()
scheduler.add_task(
fn_or_task=send_email_newsletter,
args=[feed_data],
trigger=IntervalTrigger(minutes=10),
max_instances=1,
replace_existing=True,
coalesce=False,
)
# Add Task object
task = Task(
fn=send_email_newsletter,
args=[feed_data],
trigger=IntervalTrigger(minutes=10),
max_instances=1,
replace_existing=True,
coalesce=False,
)
# you can update most attributes here. Note: a task can be only submitted once
scheduler.add_task(task)
# Use Task as decorator (leave fn empty)
decorator = scheduler.add_task(
args=[feed_data],
trigger=IntervalTrigger(minutes=10),
max_instances=1,
replace_existing=True,
coalesce=False,
)
decorator(send_email_newsletter)
# Start the scheduler
scheduler.start()
# Add paused Task
scheduler.add_task(
send_email_newsletter,
args=[feed_data],
trigger=IntervalTrigger(minutes=10),
max_instances=1,
replace_existing=True,
coalesce=False,
# this pauses the task on submit
next_run_time=None,
)
Decorator mode¶
If you omit the callable, add_task() returns a decorator-style task definition.
from datetime import timezone as tz
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger, IntervalTrigger
# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)
# Run every Monday, Wednesday and Friday
@scheduler.add_task(
trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5")
)
def send_email_newsletter():
# Add logic to send emails here
...
# Run every 2 minutes
@scheduler.add_task(
trigger=IntervalTrigger(minutes=2),
max_instances=1,
coalesce=True,
)
def collect_www_info():
# Add logic to collect information from the internet
...
@scheduler.add_task(
trigger=IntervalTrigger(minutes=10),
max_instances=1,
coalesce=False,
)
def check_status():
# Logic to check a given status of whatever needed
...
# has now asyncz_tasks containing the copy
hasattr(check_status, "asyncz_tasks")
# Start the scheduler
scheduler.start()
Task management operations¶
Asyncz also supports:
pause()/resume()for the whole schedulerrun_task()for administrative "run now" flowspause_task()/resume_task()update_task()reschedule_task()delete_task()/remove_all_tasks()
Examples:
from datetime import timezone as tz
from asyncz.schedulers.asyncio import AsyncIOScheduler
# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)
def send_email_newsletter():
# Add logic to send emails here
...
def check_status():
# Logic to check a given status of whatever needed
...
# Create the tasks
# Run every Monday, Wednesday and Friday
task = scheduler.add_task(send_email_newsletter, "cron", hour="0-23", minute="1")
# Pause task
task.pause()
# Run every hour and minute 1
task = scheduler.add_task(check_status, "cron", hour="0-23", minute="1")
# Pause task
task.pause()
from datetime import timezone as tz
from asyncz.schedulers.asyncio import AsyncIOScheduler
# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)
def send_email_newsletter():
# Add logic to send emails here
...
def check_status():
# Logic to check a given status of whatever needed
...
# Create the tasks
# Run every Monday, Wednesday and Friday
task = scheduler.add_task(send_email_newsletter, "cron", hour="0-23", minute="1")
# resume task
task.resume()
# Run every hour and minute 1
task = scheduler.add_task(check_status, "cron", hour="0-23", minute="1")
# Resume task
task.resume()
from datetime import timezone as tz
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger, IntervalTrigger
# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)
def send_email_newsletter():
# Add logic to send emails here
...
def check_status():
# Logic to check a given status of whatever needed
...
# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
id="send_email_newsletter",
fn=send_email_newsletter,
trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)
# Run every 10 minutes
check_status = scheduler.add_task(
id="check_status",
fn=check_status,
trigger=IntervalTrigger(minutes=10),
max_instances=1,
replace_existing=True,
coalesce=False,
)
# Update the task by id or object
scheduler.update_task("send_email_newsletter", coalesce=False, max_instances=4)
scheduler.update_task(check_status, coalesce=True, max_instances=3)
# Start the scheduler
scheduler.start()
from datetime import timezone as tz
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger, IntervalTrigger
# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)
def send_email_newsletter():
# Add logic to send emails here
...
def check_status():
# Logic to check a given status of whatever needed
...
# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
id="send_email_newsletter",
fn=send_email_newsletter,
trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)
# Run every 10 minutes
check_status = scheduler.add_task(
id="check_status",
fn=check_status,
trigger=IntervalTrigger(minutes=10),
max_instances=1,
replace_existing=True,
coalesce=False,
)
# Reschedule the tasks
scheduler.reschedule_task("send_email_newsletter", trigger="cron", day_of_week="mon", hour="1")
scheduler.reschedule_task(check_status, trigger="interval", minutes=20)
# Start the scheduler
scheduler.start()
Running a task immediately¶
Use scheduler.run_task(...) when you need an explicit operator-driven execution outside the normal trigger cadence.
from asyncz.schedulers import AsyncIOScheduler
scheduler = AsyncIOScheduler()
task = scheduler.add_task(my_cleanup, "interval", minutes=30, id="cleanup")
scheduler.start(paused=True)
# Force an immediate execution, then keep the task paused if the trigger has no
# future run time left (useful for one-off/date tasks surfaced in admin tools).
scheduler.run_task("cleanup", remove_finished=False)
Important behavior:
run_task()uses the task's configured executor- it dispatches
TASK_SUBMITTED(orTASK_MAX_INSTANCES) just like the main scheduler loop - it recomputes and persists the next run time after submission
- if the trigger is exhausted,
remove_finished=Trueremoves the task whileFalsekeeps it paused
Task inspection and querying¶
Schedulers expose immutable inspection snapshots through get_task_info() and get_task_infos().
info = scheduler.get_task_info("cleanup")
assert info is not None
print(info.schedule_state.value, info.trigger_alias, info.next_run_time)
scheduled = scheduler.get_task_infos(
schedule_state="scheduled",
trigger="interval",
q="cleanup",
sort_by="next_run_time",
)
get_task_infos() is intended for operational surfaces such as:
- admin dashboards
- CLI list commands
- health checks and diagnostics
- custom support tooling
Supported filters:
schedule_state:pending,paused, orscheduledexecutortriggerq: case-insensitive free-text search across identifiers, names, callable metadata, trigger metadata, executor, store, and state
Supported sort keys:
idnamenext_run_timeschedule_stateexecutorstoretrigger
Multi-process locking¶
Schedulers support file-based inter-process coordination through lock_path.
Example:
from asyncz.schedulers.asyncio import AsyncIOScheduler
# Create the scheduler
scheduler = AsyncIOScheduler(
global_config={
"asyncz.lock_path": "/tmp/asynzc_super_project_{store}_store.pid",
"asyncz.startup_delay": 2,
"asyncz.stores.mongo": {"type": "mongodb"},
"asyncz.stores.default": {"type": "redis", "database": "0"},
"asyncz.executors.pool": {
"max_workers": "20",
"class": "asyncz.executors.pool:ThreadPoolExecutor",
},
"asyncz.executors.default": {"class": "asyncz.executors.asyncio:AsyncIOExecutor"},
"asyncz.task_defaults.coalesce": "false",
"asyncz.task_defaults.max_instances": "3",
"asyncz.task_defaults.timezone": "UTC",
},
)
# Start the scheduler
with scheduler:
...
# your code
Available placeholders:
{store}: the store alias{ppid}: the parent process id{pgrp}: the process group id
When lock_path is set, Asyncz also defaults startup_delay to 1 second so multiple workers do not immediately stampede the same persisted tasks on startup.
ASGI integration and context managers¶
Schedulers can be used as:
- ASGI wrappers via
scheduler.asgi(...) - synchronous context managers
- asynchronous context managers
See ASGI and Context Managers for the full lifecycle patterns.