Skip to content

Stores

In simple terms, the stores are places where, as the name suggests, stores the scheduled tasks. Asyncz has a default store wich is a MemoryStore but also brings an integration with three major ones, Redis, SQLAlchemy and MongoDB.

Besides the default memory store, the tasks are not store in memory but in the corresponding desired store.

You are also capable of building your own store if you which, for instance to integrate SQLAlchemy or Tortoise ORM or anything else. Check out the custom store section for more information.

All the states are serialized and reversed using pydantic objects, so keep that in mind when creating a custom store.

Store Encryption

You should use store encryption for security reasons.

All standard stores except MemoryStore support the environment variable ASYNCZ_STORE_ENCRYPTION_KEY. If set and non-empty the hash of the value is used for AESGCM encrypting the elements before sending them to the store. This way store entries are encrypted and authentificated so there is no security hole. This is highly recommended! Because if someone can inject store entries he can execute code.

MemoryStore

This is the default store when using Asyncz and probably the one you will be using when running stateless tasks or tasks that do not require some sort of cache.

Store Alias - memory

from asyncz.stores.memory import MemoryStore

FileStore

This is also a basic store which is always available. It supports the task synchronization between multiple processes via task files in a directory. It is not fast and its security relies solely on file permissions and encryption.

Warning

People who can inject a file in the directory, will be able to inject code. Except if you use encryption.

Store Alias - file

Parameters

  • directory - The directory to use. Should be well protected.
  • suffix - The suffix of task files. Files with other suffixes are ignored.

    Default: ".task"

  • mode - The mode of the directory.

    Default: 0o700

  • cleanup_directory - Shall the directory be deleted after shutdown? This will cleanup old tasks.

    Default: False

  • pickle_protocol- Pickle protocol level to use (for serialization), defaults to the highest available.

    Default: pickle.HIGHEST_PROTOCOL

RedisStore

Store Alias - redis

from asyncz.schedulers import AsyncIOScheduler
from asyncz.stores.redis import RedisStore
# assuming redis runs on localhost
scheduler = AsyncIOScheduler(stores={"default": RedisStore()})
# or
scheduler = AsyncIOScheduler(stores={"default": {"type": "redis"}})

Parameters

  • database - The database number to store tasks in.

    Default: 0

  • tasks_key - The key to store tasks in.

    Default: asyncz.tasks

  • run_times_key - The key to store the tasks run times in.

    Default: asyncz.run_times

  • pickle_protocol - Pickle protocol level to use (for serialization), defaults to the highest available.

    Default: pickle.HIGHEST_PROTOCOL

  • host - Host to connect.

    Default: localhost

  • port - Port to connect.

    Default: 6379

MongoDBStore

Store Alias - mongo

from asyncz.schedulers import AsyncIOScheduler
from asyncz.stores.mongo import MongoDBStore
# assuming mongo db runs on localhost
scheduler = AsyncIOScheduler(stores={"default": MongoDBStore()})
# or
scheduler = AsyncIOScheduler(stores={"default": {"type": "mongo"}})

Parameters

  • database - The database to store the tasks.

    Default: asyncz

  • collection - The collection to store the tasks.

    Default: tasks

  • client - A pymongo.mongo_client.MongoClient instance.

    Default: None

  • pickle_protocol - Pickle protocol level to use (for serialization), defaults to the highest available.

    Default: pickle.HIGHEST_PROTOCOL

  • host - Host to connect.

    Default: localhost

  • port - Port to connect.

    Default: 27017

SQLAlchemyStore

Store Alias - sqlalchemy

from asyncz.schedulers import AsyncIOScheduler
from asyncz.stores.sqlalchemy import SQLAlchemyStore

scheduler = AsyncIOScheduler(stores={"default": SQLAlchemyStore(database="sqlite:///./test_suite.sqlite3")})
# or
scheduler = AsyncIOScheduler(stores={"default": {"type": "sqlalchemy", "database": "sqlite:///./test_suite.sqlite3"}})

Parameters

  • database - The database to store the tasks. Can be string url or engine.

  • tablename - The tablename.

    Default: asyncz_store

  • pickle_protocol - Pickle protocol level to use (for serialization), defaults to the highest available.

    Default: pickle.HIGHEST_PROTOCOL

Other kwargs are passed to sqlalchemy.create_engine.

Custom store

from asyncz.stores.base import BaseStore

You might want to use a different store besides the ones provided and that is also possible. Foor example using a SQLAlchemy or a Tortoise ORM.

To create a custom store you must subclass the BaseStore and it should implement some other pieces of functionality as well.

from datetime import datetime
from typing import List, Union, Optional

from asyncz.schedulers import AsyncIOScheduler
from asyncz.stores.base import BaseStore
from asyncz.tasks.types import TaskType


class CustomStore(BaseStore):
    """
    A new custom store.
    """

    def get_due_tasks(self, now: datetime) -> List["TaskType"]: ...

    def lookup_task(self, task_id: str) -> "TaskType": ...

    def delete_task(self, task_id: str): ...

    def remove_all_tasks(self): ...

    def get_next_run_time(self) -> Optional[datetime]: ...

    def get_all_tasks(self) -> List["TaskType"]: ...

    def add_task(self, task: "TaskType"): ...

    def update_task(self, task: "TaskType"): ...


scheduler = AsyncIOScheduler()

# Add custom store
scheduler.add_store(CustomStore, "custom")