Skip to content

Schedulers

Schedulers coordinate every other Asyncz component. They own the configured stores and executors, calculate due run times, submit work, dispatch events, and expose the public task-management API.

Available schedulers

AsyncIOScheduler

The default scheduler for most applications. It can reuse an existing event loop or create an isolated loop when needed.

from asyncz.schedulers import AsyncIOScheduler

NativeAsyncIOScheduler

Use this variant when the surrounding application already owns the running event loop and you want start() / shutdown() to be awaitable.

from asyncz.schedulers import NativeAsyncIOScheduler

Defaults

When you instantiate a scheduler with no explicit configuration, Asyncz will lazily add:

  • the memory store as default
  • the asyncio executor as default

Configuring a scheduler

Asyncz accepts direct Python objects, plugin aliases, or configuration dictionaries.

Python-first configuration

from datetime import timezone as tz

from asyncz.executors import AsyncIOExecutor, ThreadPoolExecutor
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.stores.mongo import MongoDBStore
from asyncz.stores.redis import RedisStore

# Define the stores
# Override the default MemoryStore to become RedisStore where the db is 0
stores = {"mongo": MongoDBStore(), "default": RedisStore(database=0)}

# Define the executors
# Override the default ot be the AsyncIOExecutor
executors = {
    "default": AsyncIOExecutor(),
    "threadpool": ThreadPoolExecutor(max_workers=20),
}

# Set the defaults
task_defaults = {"coalesce": False, "max_instances": 4}

# Create the scheduler
scheduler = AsyncIOScheduler(
    stores=stores, executors=executors, task_defaults=task_defaults, timezone=tz.utc
)

# Start the scheduler
with scheduler:
    # note: you can also use start() and shutdown() manually
    # Nesting is also not a problem (start and shutdown are refcounted and only the outermost scope does start and shutdown the scheduler)

    scheduler.start()

    scheduler.stop()

# manually you have more control like:
scheduler.start(paused=True)
scheduler.resume()

# noop because not outermost scope
with scheduler:
    ...
scheduler.shutdown(wait=False)

Configuration dictionary at construction time

from asyncz.schedulers.asyncio import AsyncIOScheduler

# Create the scheduler
scheduler = AsyncIOScheduler(
    global_config={
        "asyncz.stores.mongo": {"type": "mongodb"},
        "asyncz.stores.default": {"type": "redis", "database": "0"},
        "asyncz.executors.pool": {
            "max_workers": "20",
            "class": "asyncz.executors.pool:ThreadPoolExecutor",
        },
        "asyncz.executors.default": {"class": "asyncz.executors.asyncio:AsyncIOExecutor"},
        "asyncz.task_defaults.coalesce": "false",
        "asyncz.task_defaults.max_instances": "3",
        "asyncz.task_defaults.timezone": "UTC",
    },
)

# Start the scheduler
with scheduler:
    ...
    # your code

Reconfiguring with setup()

from datetime import timezone as tz

from asyncz.executors import AsyncIOExecutor, ThreadPoolExecutor
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.stores import MongoDBStore, RedisStore

# Define the stores
# Override the default MemoryStore to become RedisStore where the db is 0
stores = {"mongo": MongoDBStore(), "default": RedisStore(database=0)}

# Define the executors
# Override the default ot be the AsyncIOExecutor
executors = {
    "default": AsyncIOExecutor(),
    "threadpool": ThreadPoolExecutor(max_workers=20),
}

# Set the defaults
task_defaults = {"coalesce": False, "max_instances": 4}

# Create the scheduler
scheduler = AsyncIOScheduler()

## Add some tasks here or anything else (for instance 3 tasks)
scheduler.add_task(...)
scheduler.add_task(...)
scheduler.add_task(...)

scheduler.setup(stores=stores, executors=executors, task_defaults=task_defaults, timezone=tz.utc)

# Start the scheduler
with scheduler:
    ...
    # your code

Logging

Asyncz uses the standard logging module.

The default scheduler logger namespace is:

asyncz.schedulers

If you pass logger_name="worker-a", the scheduler logger becomes:

asyncz.schedulers.worker-a

Stores and executors follow the same convention:

  • asyncz.stores.<alias>
  • asyncz.executors.<alias>

If you need custom logger creation, provide a custom loggers_class. The built-in implementation is asyncz.schedulers.base.ClassicLogging.

Starting and stopping

from asyncz.schedulers.asyncio import AsyncIOScheduler

scheduler = AsyncIOScheduler()

# Start the scheduler
scheduler.start()
from asyncz.schedulers.asyncio import AsyncIOScheduler

scheduler = AsyncIOScheduler()

# Shutdown the scheduler
scheduler.shutdown()

Adding tasks

scheduler.add_task(...) is the main entry point for registering work.

  • Pass a trigger instance directly.
  • Or pass a trigger alias plus trigger-specific keyword arguments.
  • The return value is an asyncz.tasks.Task.
from datetime import timezone as tz

from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger, IntervalTrigger
from asyncz.tasks import Task

# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)


def send_email_newsletter():
    # Add logic to send emails here
    ...


def collect_www_info():
    # Add logic to collect information from the internet
    ...


def check_status():
    # Logic to check a given status of whatever needed
    ...


# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
    send_email_newsletter,
    trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)

# Run every 2 minutes
scheduler.add_task(
    collect_www_info,
    trigger=IntervalTrigger(minutes=2),
    max_instances=1,
    replace_existing=True,
    coalesce=True,
)

# Run every 10 minutes
scheduler.add_task(
    check_status,
    trigger=IntervalTrigger(minutes=10),
    max_instances=1,
    replace_existing=True,
    coalesce=False,
)

# Run every 10 minutes collect_www_info before sending the newsletter
feed_data = collect_www_info()

scheduler.add_task(
    fn_or_task=send_email_newsletter,
    args=[feed_data],
    trigger=IntervalTrigger(minutes=10),
    max_instances=1,
    replace_existing=True,
    coalesce=False,
)

# Add Task object

task = Task(
    fn=send_email_newsletter,
    args=[feed_data],
    trigger=IntervalTrigger(minutes=10),
    max_instances=1,
    replace_existing=True,
    coalesce=False,
)
# you can update most attributes here. Note: a task can be only submitted once
scheduler.add_task(task)

# Use Task as decorator (leave fn empty)
decorator = scheduler.add_task(
    args=[feed_data],
    trigger=IntervalTrigger(minutes=10),
    max_instances=1,
    replace_existing=True,
    coalesce=False,
)
decorator(send_email_newsletter)


# Start the scheduler
scheduler.start()

# Add paused Task
scheduler.add_task(
    send_email_newsletter,
    args=[feed_data],
    trigger=IntervalTrigger(minutes=10),
    max_instances=1,
    replace_existing=True,
    coalesce=False,
    # this pauses the task on submit
    next_run_time=None,
)

Decorator mode

If you omit the callable, add_task() returns a decorator-style task definition.

from datetime import timezone as tz

from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger, IntervalTrigger

# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)


# Run every Monday, Wednesday and Friday
@scheduler.add_task(
    trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5")
)
def send_email_newsletter():
    # Add logic to send emails here
    ...


# Run every 2 minutes
@scheduler.add_task(
    trigger=IntervalTrigger(minutes=2),
    max_instances=1,
    coalesce=True,
)
def collect_www_info():
    # Add logic to collect information from the internet
    ...


@scheduler.add_task(
    trigger=IntervalTrigger(minutes=10),
    max_instances=1,
    coalesce=False,
)
def check_status():
    # Logic to check a given status of whatever needed
    ...


# has now asyncz_tasks containing the copy
hasattr(check_status, "asyncz_tasks")

# Start the scheduler
scheduler.start()

Task management operations

Asyncz also supports:

  • pause() / resume() for the whole scheduler
  • run_task() for administrative "run now" flows
  • pause_task() / resume_task()
  • update_task()
  • reschedule_task()
  • delete_task() / remove_all_tasks()

Examples:

from datetime import timezone as tz

from asyncz.schedulers.asyncio import AsyncIOScheduler

# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)


def send_email_newsletter():
    # Add logic to send emails here
    ...


def check_status():
    # Logic to check a given status of whatever needed
    ...


# Create the tasks
# Run every Monday, Wednesday and Friday
task = scheduler.add_task(send_email_newsletter, "cron", hour="0-23", minute="1")

# Pause task
task.pause()

# Run every hour and minute 1
task = scheduler.add_task(check_status, "cron", hour="0-23", minute="1")

# Pause task
task.pause()
from datetime import timezone as tz

from asyncz.schedulers.asyncio import AsyncIOScheduler

# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)


def send_email_newsletter():
    # Add logic to send emails here
    ...


def check_status():
    # Logic to check a given status of whatever needed
    ...


# Create the tasks
# Run every Monday, Wednesday and Friday
task = scheduler.add_task(send_email_newsletter, "cron", hour="0-23", minute="1")

# resume task
task.resume()

# Run every hour and minute 1
task = scheduler.add_task(check_status, "cron", hour="0-23", minute="1")

# Resume task
task.resume()
from datetime import timezone as tz

from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger, IntervalTrigger

# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)


def send_email_newsletter():
    # Add logic to send emails here
    ...


def check_status():
    # Logic to check a given status of whatever needed
    ...


# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
    id="send_email_newsletter",
    fn=send_email_newsletter,
    trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)

# Run every 10 minutes
check_status = scheduler.add_task(
    id="check_status",
    fn=check_status,
    trigger=IntervalTrigger(minutes=10),
    max_instances=1,
    replace_existing=True,
    coalesce=False,
)

# Update the task by id or object
scheduler.update_task("send_email_newsletter", coalesce=False, max_instances=4)
scheduler.update_task(check_status, coalesce=True, max_instances=3)

# Start the scheduler
scheduler.start()
from datetime import timezone as tz

from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger, IntervalTrigger

# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)


def send_email_newsletter():
    # Add logic to send emails here
    ...


def check_status():
    # Logic to check a given status of whatever needed
    ...


# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
    id="send_email_newsletter",
    fn=send_email_newsletter,
    trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)

# Run every 10 minutes
check_status = scheduler.add_task(
    id="check_status",
    fn=check_status,
    trigger=IntervalTrigger(minutes=10),
    max_instances=1,
    replace_existing=True,
    coalesce=False,
)

# Reschedule the tasks
scheduler.reschedule_task("send_email_newsletter", trigger="cron", day_of_week="mon", hour="1")
scheduler.reschedule_task(check_status, trigger="interval", minutes=20)

# Start the scheduler
scheduler.start()

Running a task immediately

Use scheduler.run_task(...) when you need an explicit operator-driven execution outside the normal trigger cadence.

from asyncz.schedulers import AsyncIOScheduler

scheduler = AsyncIOScheduler()

task = scheduler.add_task(my_cleanup, "interval", minutes=30, id="cleanup")
scheduler.start(paused=True)

# Force an immediate execution, then keep the task paused if the trigger has no
# future run time left (useful for one-off/date tasks surfaced in admin tools).
scheduler.run_task("cleanup", remove_finished=False)

Important behavior:

  • run_task() uses the task's configured executor
  • it dispatches TASK_SUBMITTED (or TASK_MAX_INSTANCES) just like the main scheduler loop
  • it recomputes and persists the next run time after submission
  • if the trigger is exhausted, remove_finished=True removes the task while False keeps it paused

Task inspection and querying

Schedulers expose immutable inspection snapshots through get_task_info() and get_task_infos().

info = scheduler.get_task_info("cleanup")
assert info is not None
print(info.schedule_state.value, info.trigger_alias, info.next_run_time)

scheduled = scheduler.get_task_infos(
    schedule_state="scheduled",
    trigger="interval",
    q="cleanup",
    sort_by="next_run_time",
)

get_task_infos() is intended for operational surfaces such as:

  • admin dashboards
  • CLI list commands
  • health checks and diagnostics
  • custom support tooling

Supported filters:

  • schedule_state: pending, paused, or scheduled
  • executor
  • trigger
  • q: case-insensitive free-text search across identifiers, names, callable metadata, trigger metadata, executor, store, and state

Supported sort keys:

  • id
  • name
  • next_run_time
  • schedule_state
  • executor
  • store
  • trigger

Multi-process locking

Schedulers support file-based inter-process coordination through lock_path.

Example:

from asyncz.schedulers.asyncio import AsyncIOScheduler

# Create the scheduler
scheduler = AsyncIOScheduler(
    global_config={
        "asyncz.lock_path": "/tmp/asynzc_super_project_{store}_store.pid",
        "asyncz.startup_delay": 2,
        "asyncz.stores.mongo": {"type": "mongodb"},
        "asyncz.stores.default": {"type": "redis", "database": "0"},
        "asyncz.executors.pool": {
            "max_workers": "20",
            "class": "asyncz.executors.pool:ThreadPoolExecutor",
        },
        "asyncz.executors.default": {"class": "asyncz.executors.asyncio:AsyncIOExecutor"},
        "asyncz.task_defaults.coalesce": "false",
        "asyncz.task_defaults.max_instances": "3",
        "asyncz.task_defaults.timezone": "UTC",
    },
)

# Start the scheduler
with scheduler:
    ...
    # your code

Available placeholders:

  • {store}: the store alias
  • {ppid}: the parent process id
  • {pgrp}: the process group id

When lock_path is set, Asyncz also defaults startup_delay to 1 second so multiple workers do not immediately stampede the same persisted tasks on startup.

ASGI integration and context managers

Schedulers can be used as:

  • ASGI wrappers via scheduler.asgi(...)
  • synchronous context managers
  • asynchronous context managers

See ASGI and Context Managers for the full lifecycle patterns.