AsyncZ Dashboard Logs System¶
The AsyncZ Dashboard now ships with a built-in log viewer that lets you see all scheduler and task logs directly in the web interface: live, searchable, and filterable by task, log level, or message.
This guide explains how it works, how to configure it, and how to extend it for your own applications.
Overview¶
AsyncZ uses a modular, pluggable logging architecture designed for flexibility:
- LogStorage interface – defines how logs are stored and queried.
- MemoryLogStorage – the default in-memory ring buffer (fast, simple, discards the oldest entries when full).
- TaskLogHandler – a logging.Handler that routes log records into the storage backend.
- install_task_log_handler() – installs the handler globally.
- Dashboard integration – the /dashboard/logs/ UI renders all recent logs with filters and live updates.
- Custom storage backends – implement your own, such as file rotation, JSON persistence, Postgres, or Redis.
Default Behavior¶
When you create the dashboard admin interface via:
from asyncz.contrib.dashboard import AsyncZAdmin

admin = AsyncZAdmin(
    scheduler=scheduler,
)
AsyncZ automatically:
1. Creates or reuses a global MemoryLogStorage instance.
2. Installs a TaskLogHandler to the AsyncZ root logger (asyncz namespace).
3. Captures all logs emitted through AsyncZ's task runners and scheduler.
You can now view all captured logs at:
http://localhost:8000/dashboard/logs/
Passing Your Own Log Storage¶
AsyncZ allows you to define your own log storage backend.
You can pass a custom instance to AsyncZAdmin:
from asyncz.contrib.dashboard import AsyncZAdmin
from asyncz.contrib.dashboard.logs.storage import MemoryLogStorage

custom_storage = MemoryLogStorage(maxlen=20_000)

admin = AsyncZAdmin(
    scheduler=scheduler,
    log_storage=custom_storage,
)
This ensures the dashboard uses your chosen backend for all incoming logs.
Architecture Deep Dive¶
LogStorage Interface¶
All log storage backends implement this abstract base:
class LogStorage:
    def append(self, entry: LogEntry) -> None:
        """Store a new log entry."""
        raise NotImplementedError

    def query(
        self,
        *,
        task_id: str | None = None,
        level: str | None = None,
        q: str | None = None,
        limit: int = 200,
    ) -> Iterable[LogEntry]:
        """Return logs filtered by the given criteria."""
        raise NotImplementedError
You can import it via:
from asyncz.contrib.dashboard.logs.storage import LogStorage
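For reference, a minimal conforming backend might look like the sketch below; the class name is illustrative, and the newest-first ordering is an assumption about what the dashboard expects:

from typing import Iterable

from asyncz.contrib.dashboard.logs.storage import LogEntry, LogStorage


class ListLogStorage(LogStorage):
    """Illustrative backend: keeps entries in a plain Python list."""

    def __init__(self) -> None:
        self.entries: list[LogEntry] = []

    def append(self, entry: LogEntry) -> None:
        self.entries.append(entry)

    def query(
        self,
        *,
        task_id: str | None = None,
        level: str | None = None,
        q: str | None = None,
        limit: int = 200,
    ) -> Iterable[LogEntry]:
        matches = [
            entry
            for entry in self.entries
            if (task_id is None or entry.task_id == task_id)
            and (level is None or entry.level == level)
            and (q is None or q.lower() in entry.message.lower())
        ]
        # Assumed: the dashboard wants the most recent entries first.
        return list(reversed(matches))[:limit]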
LogEntry¶
Every captured log is represented as a dataclass:
@dataclass(frozen=True)
class LogEntry:
    ts: datetime
    level: str
    message: str
    logger: str
    task_id: str | None = None
    extra: dict[str, Any] | None = None
You can import it via:
from asyncz.contrib.dashboard.logs.storage import LogEntry
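For example, you can construct and store an entry by hand; the field values below are illustrative:

from datetime import datetime, timezone

from asyncz.contrib.dashboard.logs.storage import LogEntry, MemoryLogStorage

storage = MemoryLogStorage(maxlen=100)
storage.append(
    LogEntry(
        ts=datetime.now(timezone.utc),
        level="INFO",
        message="Nightly report finished",
        logger="asyncz.task",
        task_id="nightly-report",  # illustrative task id
    )
)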
Default In-Memory Storage¶
The default MemoryLogStorage keeps a ring buffer of the most recent logs:
from asyncz.contrib.dashboard.logs.storage import MemoryLogStorage
storage = MemoryLogStorage(maxlen=10_000)
When the buffer reaches its maximum (maxlen):
- The oldest logs are automatically discarded, ensuring constant memory usage.
- If the backend implements .stats(), you can inspect internal counters (current size, total appended, dropped).
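The eviction behavior is easy to see in isolation. A minimal sketch, assuming query() returns everything currently buffered when the limit is large enough:

from datetime import datetime, timezone

from asyncz.contrib.dashboard.logs.storage import LogEntry, MemoryLogStorage

storage = MemoryLogStorage(maxlen=3)
for i in range(4):
    storage.append(
        LogEntry(ts=datetime.now(timezone.utc), level="INFO",
                 message=f"tick {i}", logger="asyncz")
    )

# Only the three most recent entries survive; "tick 0" was evicted.
assert len(list(storage.query(limit=10))) == 3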
Integration with Python Logging and Structlog¶
Standard Logging¶
AsyncZ installs the handler like this:
from asyncz.contrib.dashboard.logs.handler import install_task_log_handler
install_task_log_handler(storage)
Internally, this attaches TaskLogHandler to the root logger, so all logs from AsyncZ and user tasks flow into the dashboard.
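Once the handler is installed, anything logged through the standard library becomes queryable. A minimal sketch, assuming the handler picks the task_id out of extra (as the Best Practices section below suggests):

import logging

from asyncz.contrib.dashboard.logs.handler import install_task_log_handler
from asyncz.contrib.dashboard.logs.storage import MemoryLogStorage

storage = MemoryLogStorage(maxlen=1_000)
install_task_log_handler(storage)

log = logging.getLogger("asyncz.task")
log.info("Report generated", extra={"task_id": "nightly-report"})  # illustrative task id

# The record is now available to the dashboard, and to you:
for entry in storage.query(task_id="nightly-report", limit=10):
    print(entry.ts, entry.level, entry.message)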
Structlog Support¶
If your application uses structlog, you don't need any special configuration,
as long as structlog is configured to emit via the stdlib logging system.
Example:
import structlog
from structlog.stdlib import LoggerFactory

structlog.configure(
    logger_factory=LoggerFactory(),
    processors=[
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt="iso", utc=True),
        structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
    ],
)
AsyncZ's TaskLogHandler will automatically capture these events, including any task_id bound to the record (e.g., logger.bind(task_id=...)).
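For example, with an illustrative logger name and task id:

import structlog

log = structlog.get_logger("asyncz.task")

# Bind the task id once; every subsequent event carries it.
task_log = log.bind(task_id="nightly-report")
task_log.info("task_started")
task_log.info("task_finished", rows=1234)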
Building Custom Log Storages¶
You can build persistent or specialized backends by subclassing LogStorage.
1. Rotating File Logs¶
import logging
from logging.handlers import RotatingFileHandler

from asyncz.contrib.dashboard.logs.storage import LogStorage, LogEntry


class RotatingFileStorage(LogStorage):
    def __init__(self, path="asyncz.log", max_bytes=1_000_000, backup_count=5):
        self.handler = RotatingFileHandler(path, maxBytes=max_bytes, backupCount=backup_count)
        self.handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))

    def append(self, entry: LogEntry) -> None:
        # Convert the LogEntry back into a stdlib record so the handler can format it.
        record = logging.LogRecord(
            name=entry.logger,
            level=getattr(logging, entry.level.upper(), logging.INFO),
            pathname="",
            lineno=0,
            msg=entry.message,
            args=None,
            exc_info=None,
        )
        self.handler.emit(record)

    def query(self, **kwargs):
        # Simple case: reading back is not supported for file-based rotation.
        return []
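Because query() returns nothing, this backend is effectively write-only: the dashboard log table stays empty while the rotated files keep a persistent archive. Wiring it in works like any other backend:

from asyncz.contrib.dashboard import AsyncZAdmin

admin = AsyncZAdmin(
    scheduler=scheduler,
    log_storage=RotatingFileStorage("logs/asyncz.log"),  # path is illustrative
)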
2. JSON File Storage¶
import json
from datetime import datetime
from pathlib import Path

from asyncz.contrib.dashboard.logs.storage import LogStorage, LogEntry


class JSONLogStorage(LogStorage):
    def __init__(self, path: str = "asyncz_logs.json"):
        self.path = Path(path)
        self.path.touch(exist_ok=True)

    def append(self, entry: LogEntry) -> None:
        with self.path.open("a", encoding="utf-8") as f:
            f.write(json.dumps(entry.__dict__, default=str) + "\n")

    def query(self, limit: int = 200, **_):
        with self.path.open("r", encoding="utf-8") as f:
            for line in f.readlines()[-limit:]:
                data = json.loads(line)
                data["ts"] = datetime.fromisoformat(data["ts"])  # restore the datetime
                yield LogEntry(**data)
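Each line is a self-contained JSON document (JSON Lines), so the file is easy to tail, grep, or ship to a log aggregator. Note that query() reads the whole file into memory via readlines(); for large files, combine this with rotation or a size cap.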
3. Postgres Storage¶
You can persist logs to a relational database:
import json

import asyncpg

from asyncz.contrib.dashboard.logs.storage import LogStorage, LogEntry


class PostgresLogStorage(LogStorage):
    def __init__(self, dsn: str):
        self.dsn = dsn

    async def append(self, entry: LogEntry) -> None:
        conn = await asyncpg.connect(self.dsn)
        try:
            await conn.execute(
                "INSERT INTO asyncz_logs (ts, level, message, logger, task_id, extra) "
                "VALUES ($1, $2, $3, $4, $5, $6)",
                entry.ts,
                entry.level,
                entry.message,
                entry.logger,
                entry.task_id,
                # asyncpg does not encode dicts by default; serialize for a jsonb column.
                json.dumps(entry.extra) if entry.extra is not None else None,
            )
        finally:
            await conn.close()

    async def query(self, **filters):
        # Implementation using SQL filters and LIMIT
        ...
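AsyncZ does not create the table for you. A one-time setup sketch matching the INSERT above; the column names come from the example, but the types are assumptions:

import asyncpg


async def create_logs_table(dsn: str) -> None:
    conn = await asyncpg.connect(dsn)
    try:
        await conn.execute(
            """
            CREATE TABLE IF NOT EXISTS asyncz_logs (
                id       bigserial PRIMARY KEY,
                ts       timestamptz NOT NULL,
                level    text NOT NULL,
                message  text NOT NULL,
                logger   text NOT NULL,
                task_id  text,
                extra    jsonb
            )
            """
        )
    finally:
        await conn.close()

In production you would typically hold a connection pool (asyncpg.create_pool()) instead of opening a new connection per append().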
You can later render them via the same dashboard controllers (e.g., LogsTablePartialController) by providing your custom storage instance to AsyncZAdmin().
4. Redis Storage¶
For distributed or multi-process schedulers, you can store logs in Redis:
import json
from datetime import datetime

import redis

from asyncz.contrib.dashboard.logs.storage import LogStorage, LogEntry


class RedisLogStorage(LogStorage):
    def __init__(self, url="redis://localhost:6379", key="asyncz:logs", maxlen=10_000):
        self.client = redis.from_url(url)
        self.key = key
        self.maxlen = maxlen

    def append(self, entry: LogEntry) -> None:
        # Stream field values must be scalars, so serialize the whole entry as JSON.
        payload = {"entry": json.dumps(entry.__dict__, default=str)}
        self.client.xadd(self.key, payload, maxlen=self.maxlen, approximate=True)

    def query(self, limit=200, **_):
        for _, fields in self.client.xrevrange(self.key, count=limit):
            data = json.loads(fields[b"entry"])
            data["ts"] = datetime.fromisoformat(data["ts"])  # restore the datetime
            yield LogEntry(**data)
This approach lets multiple AsyncZ nodes push logs to a shared, centralized stream.
Using AsyncZAdmin¶
If you're using the higher-level AsyncZAdmin, you can pass your storage directly:
from asyncz.contrib.dashboard import AsyncZAdmin
from myproject.logging import RedisLogStorage

admin = AsyncZAdmin(
    scheduler=scheduler,
    log_storage=RedisLogStorage(),
)
Internally, AsyncZAdmin wires your custom storage into the dashboard's setup and configuration automatically.
Dashboard UI Features¶
- Automatic refresh every 10 seconds.
- Filters: by task ID, log level, or search term.
- Limit: configurable via the UI (default 200).
- Reset: clears filters and reloads latest entries.
- Zero page reloads: everything updates dynamically via HTMX partials.
Summary¶
| Feature | Description |
|---|---|
| Default backend | MemoryLogStorage (in-memory, circular buffer). |
| Custom backends | Easily subclass LogStorage for file, JSON, DB, Redis, etc. |
| Live dashboard | /dashboard/logs/ auto-refreshes every 10s with filters. |
| Integration | Works seamlessly with Python logging and structlog. |
| Configurable | Pass log_storage to AsyncZAdmin. |
Best Practices¶
- Use MemoryLogStorage in development and testing.
- Use persistent storage (Redis/Postgres) in production if logs must survive restarts.
- Bind your task_id in tasks for clearer traceability:

import logging

log = logging.getLogger("asyncz.task")
log.info("Task started", extra={"task_id": job.id})

- For structlog, use logger.bind(task_id=job.id).