Schedulers¶
Schedulers are the thing that makes all magic and binds everything together. You can see it as a glue.
Usually the developer does not deal/handle the stores, executors or even triggers manually, instead that is managed by the scheduler that acts as an interface amongst them all.
Asyncz being dedicated to ASGI and asyncio brings the AsyncIOScheduler out of the box and only supports this one natively but like everything in Asyncz, you can also create your own custom scheduler that does not necessarily need to be for async. You can build your own scheduler for blocking/background applications.
In fact, Asyncz is used by Esmerald as internal scheduling system and uses the supported scheduler from Asyncz to perform its tasks.
Parameters¶
All schedulers contain at least:
-
global_config - A python dictionary containing configurations for the schedulers. See the examples of how to configure a scheduler.
Default:
None
-
kwargs - Any keyword parameters being passed to the scheduler up instantiation.
Default:
None
Configuring the scheduler¶
Due its simplificy, Asyncz provides some ways of configuring the scheduler for you.
from asyncz.schedulers.asyncio import AsyncIOScheduler
scheduler = AsyncIOScheduler()
from asyncz.schedulers import AsyncIOScheduler
scheduler = AsyncIOScheduler()
What is happening here?:
When you create the scheduler like the examples above, it is creating an AsyncIOScheduler
with a MemoryStore named default
and starting the
asyncio event loop.
Example configuration¶
Let us assume you now need a very custom configuration with more than one store, executors and custom settings.
- Two stores - A mongo and a redis.
- Two executors - An asyncio and a thread pool.
- Coalesce turned off for new tasks by default.
- Maximum instance limiting to 4 for new tasks.
First option¶
The first way of doing the configuration is in a simple pythonic fashion.
from datetime import timezone as tz
from asyncz.executors import AsyncIOExecutor, ThreadPoolExecutor
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.stores.mongo import MongoDBStore
from asyncz.stores.redis import RedisStore
# Define the stores
# Override the default MemoryStore to become RedisStore where the db is 0
stores = {"mongo": MongoDBStore(), "default": RedisStore(database=0)}
# Define the executors
# Override the default ot be the AsyncIOExecutor
executors = {
"default": AsyncIOExecutor(),
"threadpool": ThreadPoolExecutor(max_workers=20),
}
# Set the defaults
task_defaults = {"coalesce": False, "max_instances": 4}
# Create the scheduler
scheduler = AsyncIOScheduler(
stores=stores, executors=executors, task_defaults=task_defaults, timezone=tz.utc
)
# Start the scheduler
with scheduler:
# note: you can also use start() and shutdown() manually
# Nesting is also not a problem (start and shutdown are refcounted and only the outermost scope does start and shutdown the scheduler)
scheduler.start()
scheduler.stop()
# manually you have more control like:
scheduler.start(paused=True)
scheduler.resume()
# noop because not outermost scope
with scheduler:
...
scheduler.shutdown(wait=False)
Second option¶
The second option is by starting the scheduler and injecting a dictionary directly upon instantiation.
from asyncz.schedulers.asyncio import AsyncIOScheduler
# Create the scheduler
scheduler = AsyncIOScheduler(
global_config={
"asyncz.stores.mongo": {"type": "mongodb"},
"asyncz.stores.default": {"type": "redis", "database": "0"},
"asyncz.executors.pool": {
"max_workers": "20",
"class": "asyncz.executors.pool:ThreadPoolExecutor",
},
"asyncz.executors.default": {"class": "asyncz.executors.asyncio:AsyncIOExecutor"},
"asyncz.task_defaults.coalesce": "false",
"asyncz.task_defaults.max_instances": "3",
"asyncz.task_defaults.timezone": "UTC",
},
)
# Start the scheduler
with scheduler:
...
# your code
Third option¶
The third option is by starting the scheduler and use the setup
method.
from datetime import timezone as tz
from asyncz.executors import AsyncIOExecutor, ThreadPoolExecutor
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.stores import MongoDBStore, RedisStore
# Define the stores
# Override the default MemoryStore to become RedisStore where the db is 0
stores = {"mongo": MongoDBStore(), "default": RedisStore(database=0)}
# Define the executors
# Override the default ot be the AsyncIOExecutor
executors = {
"default": AsyncIOExecutor(),
"threadpool": ThreadPoolExecutor(max_workers=20),
}
# Set the defaults
task_defaults = {"coalesce": False, "max_instances": 4}
# Create the scheduler
scheduler = AsyncIOScheduler()
## Add some tasks here or anything else (for instance 3 tasks)
scheduler.add_task(...)
scheduler.add_task(...)
scheduler.add_task(...)
scheduler.setup(stores=stores, executors=executors, task_defaults=task_defaults, timezone=tz.utc)
# Start the scheduler
with scheduler:
...
# your code
Multi-Proccessing mode¶
Asyncz schedulers have an optional multiprocessing mode. It can be activated by setting the
lock_path
option to e.g. "/tmp/asyncz_{store}_{pgrp}.lock"
This defines a per-store process lock via a file.
Parameters:
{store}
(no format string) - Set the store name. Should be provided.ppid
- Replaced by the ppid of the process. Formatting possible.pgrp
- Replaced by the pgrp of the process. Formatting possible.
from asyncz.schedulers.asyncio import AsyncIOScheduler
# Create the scheduler
scheduler = AsyncIOScheduler(
global_config={
"asyncz.lock_path": "/tmp/asynzc_super_project_{store}_store.pid",
"asyncz.startup_delay": 2,
"asyncz.stores.mongo": {"type": "mongodb"},
"asyncz.stores.default": {"type": "redis", "database": "0"},
"asyncz.executors.pool": {
"max_workers": "20",
"class": "asyncz.executors.pool:ThreadPoolExecutor",
},
"asyncz.executors.default": {"class": "asyncz.executors.asyncio:AsyncIOExecutor"},
"asyncz.task_defaults.coalesce": "false",
"asyncz.task_defaults.max_instances": "3",
"asyncz.task_defaults.timezone": "UTC",
},
)
# Start the scheduler
with scheduler:
...
# your code
Note
You may want to set an explicit startup_delay
in case of the initial started tasks are behaving spurious.
By default a startup_delay
of 1 second is used in case of lock_path not empty. Otherwise the default is 0.
Changing logger name and class¶
asyncz
uses a custom way of logging: it builds up a dictionary store with loggers of the standard logger interface.
They are retrieved from schedulers via their alias name plus prefix.
e.g. asyncz.schedulers
, asyncz.stores.default
, asyncz.executors.default
Scheduler has an optional parameter named logger_name
. If set the the schedulers logger becomes:
asyncz.schedulers.<name specified>
By default asyncz
uses loguru as logger (when available) and falls back to classical logging.
If this is not wished there are some methods:
- setting either via global config or direct the value of
loggers_class
toasyncz.schedulers.base:ClassicLogging
(or the class itself instead of the string) when creating a scheduler object - setting
asyncz.schedulers.base.default_loggers_class
to ClassicLogging (same file, only class is possible here)
Starting and stopping the scheduler¶
Every scheduler inherits from the BaseScheduler and therefore implement the
mandatory functions such as start
and shutdown
.
To start the scheduler simply run:
from asyncz.schedulers.asyncio import AsyncIOScheduler
scheduler = AsyncIOScheduler()
# Start the scheduler
scheduler.start()
To stop the scheduler simply run:
from asyncz.schedulers.asyncio import AsyncIOScheduler
scheduler = AsyncIOScheduler()
# Shutdown the scheduler
scheduler.shutdown()
Adding tasks¶
A scheduler to work needs tasks, of course and Asyncz offers some ways of adding tasks into the scheduler.
There is also a third option but that is related with the integration with ASGI frameworks, for instance esmerald which it should not be used in this agnostic context.
Add tasks¶
Adding a task via add_task
is the most common.
The add_task
returns an instance of Task.
So, how can you add a task?
from datetime import timezone as tz
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger, IntervalTrigger
from asyncz.tasks import Task
# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)
def send_email_newsletter():
# Add logic to send emails here
...
def collect_www_info():
# Add logic to collect information from the internet
...
def check_status():
# Logic to check a given status of whatever needed
...
# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
send_email_newsletter,
trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)
# Run every 2 minutes
scheduler.add_task(
collect_www_info,
trigger=IntervalTrigger(minutes=2),
max_instances=1,
replace_existing=True,
coalesce=True,
)
# Run every 10 minutes
scheduler.add_task(
check_status,
trigger=IntervalTrigger(minutes=10),
max_instances=1,
replace_existing=True,
coalesce=False,
)
# Run every 10 minutes collect_www_info before sending the newsletter
feed_data = collect_www_info()
scheduler.add_task(
fn_or_task=send_email_newsletter,
args=[feed_data],
trigger=IntervalTrigger(minutes=10),
max_instances=1,
replace_existing=True,
coalesce=False,
)
# Add Task object
task = Task(
fn=send_email_newsletter,
args=[feed_data],
trigger=IntervalTrigger(minutes=10),
max_instances=1,
replace_existing=True,
coalesce=False,
)
# you can update most attributes here. Note: a task can be only submitted once
scheduler.add_task(task)
# Use Task as decorator (leave fn empty)
decorator = scheduler.add_task(
args=[feed_data],
trigger=IntervalTrigger(minutes=10),
max_instances=1,
replace_existing=True,
coalesce=False,
)
decorator(send_email_newsletter)
# Start the scheduler
scheduler.start()
# Add paused Task
scheduler.add_task(
send_email_newsletter,
args=[feed_data],
trigger=IntervalTrigger(minutes=10),
max_instances=1,
replace_existing=True,
coalesce=False,
# this pauses the task on submit
next_run_time=None,
)
What happen here is actually very simple. We created an instance of the AsyncIOScheduler
and
added the functions send_email_newsletter
, collect_www_info
, check_status
to the scheduler
and started it.
Why then passing CronTrigger
and IntervalTrigger
instances instead of simply passing cron
or interval
?
Well, we want to pass some attributes to the object and this way makes it cleaner and simpler.
When adding tasks there is not a specific order. You can add tasks at any given time. If the scheduler is not yet running, once it does it will add the tasks to it.
Parameters¶
- fn_or_task - (positional or via this name). The callable function to execute or the task to submit.
- id - The unique identifier of this task. Leave empty to autogenerate an id or switch to the
- name - The description of this task.
- args - Positional arguments to the callable.
- kwargs - Keyword arguments to the callable.
- coalesce - Whether to only run the task once when several run times are due.
- trigger - The trigger object that controls the schedule of this task.
- executor - The name of the executor that will run this task.
- mistrigger_grace_time - The time (in seconds) how much this task's execution is allowed to be late (None means "allow the task to run no matter how late it is").
- max_instances - The maximum number of concurrently executing instances allowed for this task.
- next_run_time - The next scheduled run time of this task. If set to None, the task start paused.
- replace_existing - The submitted task replaces an existing task with the same id. Otherwise a
ConflictId
error is thrown.
Note
add_task
has a special pause mode: next_run_time
can be set to None for starting a Task paused. This works also with Task objects.
Tip
When submitting a Task object, most attributes can be changed by providing arguments for e.g. trigger, name and other kwargs. However the task is updated in-place. No copy is made. This has interesting effects: for example a decorator mode Task can be turned in a normal one by providing an id and is submitted in-place.
Add tasks as decorator¶
When leaving out the fn
parameter, you get back a decorator mode Task.
It has two submodes:
- with provided id
- without provided id
Both modes share, that a copy of the task is created and submitted (with the function applied on it).
With provided id¶
The Task copy is submitted to the scheduler with replacing_existing=True
. Other tasks with the same id are replaced.
For getting the copy the task.id can be used to retrieve it.
Without provided id¶
The Task copy is submitted to the scheduler with replacing_existing=False
and has an autogenerated id
.
Also the function decorated has now an attribute: asyncz_tasks
containing the copy. If multiple decorator mode tasks are applied all of the copies are now saved in asyncz_tasks
attribute of the function.
If this behavior is unwanted, scheduler.add_task
can be used in a partial.
Examples¶
from datetime import timezone as tz
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger, IntervalTrigger
# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)
# Run every Monday, Wednesday and Friday
@scheduler.add_task(
trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5")
)
def send_email_newsletter():
# Add logic to send emails here
...
# Run every 2 minutes
@scheduler.add_task(
trigger=IntervalTrigger(minutes=2),
max_instances=1,
coalesce=True,
)
def collect_www_info():
# Add logic to collect information from the internet
...
@scheduler.add_task(
trigger=IntervalTrigger(minutes=10),
max_instances=1,
coalesce=False,
)
def check_status():
# Logic to check a given status of whatever needed
...
# has now asyncz_tasks containing the copy
hasattr(check_status, "asyncz_tasks")
# Start the scheduler
scheduler.start()
Deleting tasks¶
In the same way you can add tasks you can also remove them with the same ease and there are also different ways of removing them.
Delete task¶
This is probably the most common way of removing tasks from the scheduler using the task id and the store alias.
from datetime import timezone as tz
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger
# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)
def send_email_newsletter():
# Add logic to send emails here
...
def collect_www_info():
# Add logic to collect information from the internet
...
def check_status():
# Logic to check a given status of whatever needed
...
# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
id="send_newsletter",
fn=send_email_newsletter,
trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)
# Run every hour and minute 1
status = scheduler.add_task(
id="status",
fn=check_status,
trigger=CronTrigger(hour="0-23", minute="1"),
)
# Remove the tasks by ID or task object
scheduler.delete_task("send_newsletter")
scheduler.delete_task(status)
Delete¶
The delete
function is probably more convenient but it requires that you store the Task
somewhere ocne the instance is received and for tasks scheduled by thetask decorator
this method does not work, instead only the delete task will work.
from datetime import timezone as tz
from asyncz.schedulers.asyncio import AsyncIOScheduler
# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)
def send_email_newsletter():
# Add logic to send emails here
...
def check_status():
# Logic to check a given status of whatever needed
...
# Create the tasks
# Run every Monday, Wednesday and Friday
task = scheduler.add_task(send_email_newsletter, "cron", hour="0-23", minute="1")
# delete task
task.delete()
# Run every hour and minute 1
task = scheduler.add_task(check_status, "cron", hour="0-23", minute="1")
# delete task
task.delete()
Pause and resume task¶
As shown above, you can add and remove tasks but you can pause and resume tasks as well. When a task is paused, there is no next time to run since the action is no longer being validate. That can be again reactivated by resuming that same Task.
Like the previous examples, there are also multiple ways of achieving that.
Pause task¶
Like delete_task, you can pause a task using the id.
from datetime import timezone as tz
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger
# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)
def send_email_newsletter():
# Add logic to send emails here
...
def check_status():
# Logic to check a given status of whatever needed
...
# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
id="send_newsletter",
fn=send_email_newsletter,
trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)
# Run every hour and minute 1
status = scheduler.add_task(
id="status",
fn=check_status,
trigger=CronTrigger(hour="0-23", minute="1"),
)
# Pause the tasks by ID and store alias
scheduler.pause_task("send_newsletter")
scheduler.pause_task(status)
Pause¶
The same is applied to the simple pause where you can do it directly via task instance.
from datetime import timezone as tz
from asyncz.schedulers.asyncio import AsyncIOScheduler
# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)
def send_email_newsletter():
# Add logic to send emails here
...
def check_status():
# Logic to check a given status of whatever needed
...
# Create the tasks
# Run every Monday, Wednesday and Friday
task = scheduler.add_task(send_email_newsletter, "cron", hour="0-23", minute="1")
# Pause task
task.pause()
# Run every hour and minute 1
task = scheduler.add_task(check_status, "cron", hour="0-23", minute="1")
# Pause task
task.pause()
Resume task¶
Resuming a task is as simple as again, passing a task id.
from datetime import timezone as tz
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger
# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)
def send_email_newsletter():
# Add logic to send emails here
...
def check_status():
# Logic to check a given status of whatever needed
...
# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
id="send_newsletter",
fn=send_email_newsletter,
trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)
# Run every hour and minute 1
status = scheduler.add_task(
id="status",
fn=check_status,
trigger=CronTrigger(hour="0-23", minute="1"),
)
# Resume the tasks by ID or task object
scheduler.resume_task("send_newsletter")
scheduler.resume_task(status)
Resume¶
Same for the resume. You can resume a task directly from the instance.
from datetime import timezone as tz
from asyncz.schedulers.asyncio import AsyncIOScheduler
# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)
def send_email_newsletter():
# Add logic to send emails here
...
def check_status():
# Logic to check a given status of whatever needed
...
# Create the tasks
# Run every Monday, Wednesday and Friday
task = scheduler.add_task(send_email_newsletter, "cron", hour="0-23", minute="1")
# resume task
task.resume()
# Run every hour and minute 1
task = scheduler.add_task(check_status, "cron", hour="0-23", minute="1")
# Resume task
task.resume()
Check
add_task, delete_task, pause_task and resume_task expect a mandatory task_id parameter as well an optional store name. Why the store name? Because you might want to store the tasks in different places and this points it out the right place.
Update task¶
As mentioned in the tasks section, internally the scheduler updates the information given to the task and then executes it.
You can update any attribute of the task by calling:
- asyncz.tasks.Task.update() - The update method from a task instance.
- update_task - The function from the scheduler.
From a task instance¶
from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.triggers import CronTrigger
# Create a scheduler
scheduler = AsyncIOScheduler()
def check_status():
# Logic to check statuses
...
# Create a task
task = Task(
id="my-task",
fn=check_status,
name="my-func",
scheduler=scheduler,
trigger=CronTrigger(day_of_week="mon,tue,wed,thu,fri,sat,sun", hour=8, minute=1),
max_instances=3,
coalesce=True,
)
# Update the task
task.update(
name="my-new-task-id",
max_instances=5,
coalesce=False,
)
From the scheduler¶
from datetime import timezone as tz
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger, IntervalTrigger
# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)
def send_email_newsletter():
# Add logic to send emails here
...
def check_status():
# Logic to check a given status of whatever needed
...
# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
id="send_email_newsletter",
fn=send_email_newsletter,
trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)
# Run every 10 minutes
check_status = scheduler.add_task(
id="check_status",
fn=check_status,
trigger=IntervalTrigger(minutes=10),
max_instances=1,
replace_existing=True,
coalesce=False,
)
# Update the task by id or object
scheduler.update_task("send_email_newsletter", coalesce=False, max_instances=4)
scheduler.update_task(check_status, coalesce=True, max_instances=3)
# Start the scheduler
scheduler.start()
Important note¶
All attributes can be updated but the id as this is immutable.
Reschedule tasks¶
You can also reschedule a task if you want/need but by change what it means is changing only the trigger by using:
- asyncz.tasks.Taskk.reschedule() - The reschedule task from the Task instance. The trigger must be the alias of the trigger object.
- reschedule_task - The function from the scheduler instance to reschedule the task.
Reschedule the task instance¶
from asyncz.schedulers import AsyncIOScheduler
from asyncz.tasks import Task
from asyncz.triggers import CronTrigger
# Create a scheduler
scheduler = AsyncIOScheduler()
def check_status():
# Logic to check statuses
...
# Create a task
task = Task(
id="my-task",
fn=check_status,
name="my-func",
scheduler=scheduler,
trigger=CronTrigger(day_of_week="mon,tue,wed,thu,fri,sat,sun", hour=8, minute=1),
max_instances=3,
coalesce=True,
)
# Reschedule the task
task.reschedule("my-task", trigger="cron", hour=10, minute=5)
Reschedule from the scheduler¶
from datetime import timezone as tz
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger, IntervalTrigger
# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)
def send_email_newsletter():
# Add logic to send emails here
...
def check_status():
# Logic to check a given status of whatever needed
...
# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
id="send_email_newsletter",
fn=send_email_newsletter,
trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)
# Run every 10 minutes
check_status = scheduler.add_task(
id="check_status",
fn=check_status,
trigger=IntervalTrigger(minutes=10),
max_instances=1,
replace_existing=True,
coalesce=False,
)
# Reschedule the tasks
scheduler.reschedule_task("send_email_newsletter", trigger="cron", day_of_week="mon", hour="1")
scheduler.reschedule_task(check_status, trigger="interval", minutes=20)
# Start the scheduler
scheduler.start()
Resume and pause the tasks¶
Resuming and pausing task processing (all tasks) is also allowed with simple instructions.
Pausing all tasks¶
from datetime import timezone as tz
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger
# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)
def send_email_newsletter():
# Add logic to send emails here
...
def check_status():
# Logic to check a given status of whatever needed
...
# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
id="send_newsletter",
fn=send_email_newsletter,
trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)
# Run every hour and minute 1
scheduler.add_task(
id="status",
fn=check_status,
trigger=CronTrigger(hour="0-23", minute="1"),
)
# Pause all tasks
scheduler.pause()
Resuming all tasks¶
from datetime import timezone as tz
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger
# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)
def send_email_newsletter():
# Add logic to send emails here
...
def check_status():
# Logic to check a given status of whatever needed
...
# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
id="send_newsletter",
fn=send_email_newsletter,
trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)
# Run every hour and minute 1
scheduler.add_task(
id="status",
fn=check_status,
trigger=CronTrigger(hour="0-23", minute="1"),
)
# Pause all tasks
scheduler.pause()
# Resume all tasks
scheduler.resume()
Start the scheduler in the paused state¶
Starting the scheduler without the paused state means without the first wakeup call.
from datetime import timezone as tz
from asyncz.schedulers.asyncio import AsyncIOScheduler
from asyncz.triggers import CronTrigger
# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)
def send_email_newsletter():
# Add logic to send emails here
...
def check_status():
# Logic to check a given status of whatever needed
...
# Create the tasks
# Run every Monday, Wednesday and Friday
scheduler.add_task(
id="send_newsletter",
fn=send_email_newsletter,
trigger=CronTrigger(day_of_week="mon,wed,fri", hour="8", minute="1", second="5"),
)
# Run every hour and minute 1
scheduler.add_task(
id="status",
fn=check_status,
trigger=CronTrigger(hour="0-23", minute="1"),
)
# Pause all tasks
scheduler.start(paused=True)
BaseScheduler¶
The base of all available schedulers provided by Asyncz and it should be the base of any custom scheduler.
The parameters are the same as the ones described before.
from asyncz.schedulers.base import BaseScheduler
AsyncIOScheduler¶
This scheduler has a mostly synchronous interface. It is handy for an synchronous environment and supports asynchronous functions. Because of the synchronous interface it has a slight delay when shutting down.
from asyncz.schedulers import AsyncIOScheduler
This scheduler has besides the normal parameters of the scheduler some additional ones.
-
event_loop - An optional. async event_loop to be used. If nothing is provided, it will use the
asyncio.get_event_loop()
(global) ifisolated_event_loop
isFalse
.Default:
None
-
isolated_event_loop - Instead of using an existing event_loop a new one is used.
Default:
False
-
timeout - A timeout used for start and stop the scheduler.
Default:
None
NativeAsyncIOScheduler¶
This scheduler uses an async start/shutdown interface and is very handy for asynchronous environments because it hasn't a shutdown delay and has less sync/async changes.
from asyncz.schedulers import AsyncIOScheduler
This scheduler has besides the normal parameters of the scheduler some additional ones.
-
isolated_event_loop - Instead of using an existing event_loop a new one is used.
Default:
False
-
timeout - A timeout used for start and stop the scheduler.
Default:
None
Note: in contrast to AsyncIOScheduler it is not possible to provide an event_loop (except via isolated_event_loop).
Custom Scheduler¶
As mentioned before, Asyncz and the nature of its existence is to be more focused on ASGI and asyncio applications but it is not limited to it.
You can create your own scheduler for any other use case, for example a blocking or background scheduler.
Usually when creating a custom scheduler you must override at least 3 functions.
- start() - Function used to start/wakeup the scheduler for the first time.
- shutdown() - Function used to stop the scheduler and release the resources created up
start()
. - wakeup() - Manage the timer to notify the scheduler of the changes in the store.
There are also some optional functionalities you can override if you want.
- create_default_executor - Override this function if you want a different default executor.
from asyncz.schedulers.base import BaseScheduler
from asyncz.typing import Any
class MyCustomScheduler(BaseScheduler):
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
def start(self, paused: bool = False):
# logic for the start
...
def shutdown(self, wait: bool = True):
# logic for the shutdown
...
def wakeup(self):
# logic for the wakeup
...
def create_default_executor(self):
# logic for your default executor
...
Limit the number of currently executing instances¶
By default, only one instance of each Task is allowed to run at the same time.
To change that when creating a task you can set the max_instances
to the number you desire and
this will let the scheduler know how many should run concurrently.
Events¶
It is also possible to attach event listeners to the schedule. The events are triggered on specific occasions and may carry some additional information with them regarding detauls of that specific event. Check the events section to see the available events.
from datetime import timezone as tz
from loguru import logger
from asyncz.events.constants import TASK_ADDED, TASK_REMOVED
from asyncz.schedulers.asyncio import AsyncIOScheduler
# Create the scheduler
scheduler = AsyncIOScheduler(timezone=tz.utc)
def my_custom_listener(event):
if not event.exception:
logger.info("All good")
else:
logger.exception("Problem with the task")
# Add event listener
scheduler.add_listener(my_custom_listener, TASK_ADDED | TASK_REMOVED)
Final thoughts¶
Asyncz since it is a revamp, simplified and rewritten version of APScheduler, you will find very common ground and similarities to it and that is intentional as you shouldn't be unfamiliar with a lot of concepts if you are already familiar with APScheduler.