"""
Observability sink for DSL strategy executions.
This service does *two* things for every ``DslStrategy.calculate_points``
call (the wiring lives in :mod:`app.engine.dsl_strategy`):
1. Emits the Prometheus metrics defined in
:mod:`app.engine.dsl_metrics`. Always synchronous, always cheap;
never blocks scoring.
2. Persists a row to ``strategyexecutionlog`` when the sampler picks
the run or when the run failed. Errors are *always* kept so a
post-mortem can replay the input via ``/simulate`` even months
after the incident. OK runs are sampled at
:data:`_config_module.configs.DSL_EXECUTION_LOG_SAMPLE_RATE` (default 5 %).
Hot-path: ``record`` no longer ``await``\\s the DB write.
The metrics emit + sampling decision stay synchronous (microseconds),
and the chosen row is handed to a bounded in-process queue drained by a
background worker task. Scoring therefore pays only the enqueue, never a
DB round-trip. If the worker falls behind and the queue fills (a slow or
down database), rows are dropped and counted via
``dsl_execution_log_dropped_total`` rather than blocking the scoring
call - the audit log is best-effort by design. Call :meth:`aclose`
on shutdown to flush the queue and stop the worker cleanly.
The service is wired with ``random.Random`` so tests can pass a
seeded instance and assert on exact rows persisted. In production the
default ``random`` module instance is used.
Persistence is best-effort: any exception from the repository is
caught and logged, never re-raised. The engine must never fail a
scoring call because a metrics row couldn't be written.
"""
from __future__ import annotations
import asyncio
import logging
import random
from typing import Any, Dict, List, Optional
# Imported as a module rather than via ``from .. import configs`` so the
# observer always sees the live ``configs`` object even when a test
# reloads ``app.core.config`` (which rebinds the symbol at module level
# but leaves any direct imports pointing at the stale instance).
from app.core import config as _config_module
from app.engine import dsl_metrics
from app.model.strategy_execution_log import StrategyExecutionLog
from app.repository.strategy_execution_log_repository import \
StrategyExecutionLogRepository
logger = logging.getLogger(__name__)
[docs]
class DslExecutionObserver:
"""
Lives as a long-lived dependency (one per process is fine -- it is
stateless aside from the rng + repository pointer). Injected into
``DslStrategy`` via the container so tests can swap a no-op
implementation.
"""
def __init__(
self,
execution_log_repository: Optional[StrategyExecutionLogRepository] = None,
*,
rng: Optional[random.Random] = None,
queue_maxsize: Optional[int] = None,
) -> None:
self._repository = execution_log_repository
self._rng = rng or random
# Lazily-created bounded queue + drain worker. Both are
# created on the first enqueue (which always happens inside the
# running event loop), so constructing the observer outside a loop
# - as the DI container Singleton does at import time - is safe.
self._queue_maxsize = (
queue_maxsize
if queue_maxsize is not None
else _config_module.configs.DSL_EXECUTION_LOG_QUEUE_MAXSIZE
)
self._queue: Optional["asyncio.Queue[StrategyExecutionLog]"] = None
self._worker: Optional[asyncio.Task] = None
self._closed = False
[docs]
async def record(
self,
*,
strategyId: str,
strategyVersion: int,
strategyType: str,
realmId: Optional[str],
externalGameId: Optional[str],
externalTaskId: Optional[str],
externalUserId: Optional[str],
status: str,
errorCode: Optional[str],
points: Optional[float],
caseName: Optional[str],
durationMs: float,
nodesExecuted: int,
trace: Optional[List[Dict[str, Any]]],
parentStrategyId: Optional[str] = None,
) -> None:
"""
Single entry point called by ``DslStrategy._run_phase`` once
every execution finishes (success or failure).
``durationMs`` is the *wall-clock* time including precompute,
not just the interpreter walk, because that is what the SLO
speaks to ("a scoring event taking >250ms is user-visible").
"""
# metrics
try:
dsl_metrics.observe(
realm=realmId,
strategy_type=strategyType,
status=status,
duration_seconds=durationMs / 1000.0,
nodes_executed=nodesExecuted,
error_code=errorCode,
)
except Exception: # pragma: no cover - defensive
logger.exception(
"Failed to emit DSL metrics for strategyId=%s status=%s",
strategyId,
status,
)
# sampled persistence
if not _config_module.configs.DSL_EXECUTION_LOG_ENABLED:
return
if self._repository is None:
# Light call sites (legacy tests) don't wire the repo; that
# is fine -- metrics still flow, persistence simply skips.
return
is_error = status != "ok"
sample_rate = _config_module.configs.DSL_EXECUTION_LOG_SAMPLE_RATE
# Always keep errors; sample OK runs.
should_persist = is_error or (
sample_rate > 0 and self._rng.random() < sample_rate
)
if not should_persist:
return
truncated_trace = _truncate_trace(
trace, _config_module.configs.DSL_EXECUTION_LOG_TRACE_LIMIT
)
notes = None
if trace is not None and len(trace) > len(truncated_trace or []):
notes = (
f"trace truncated: {len(trace)} -> " f"{len(truncated_trace)} entries"
)
row = StrategyExecutionLog(
strategyId=strategyId,
strategyVersion=strategyVersion,
strategyType=strategyType,
realmId=realmId,
externalGameId=externalGameId,
externalTaskId=externalTaskId,
externalUserId=externalUserId,
status=status,
errorCode=errorCode,
points=points,
caseName=caseName,
durationMs=durationMs,
nodesExecuted=nodesExecuted,
trace=truncated_trace,
sampled=not is_error,
parentStrategyId=parentStrategyId,
notes=notes,
)
# Hand off to the background worker instead of awaiting
# the DB write here. ``_enqueue`` never blocks and never raises -
# a full queue drops the row (counted) so a slow database can't
# apply backpressure to the scoring hot-path.
self._enqueue(row, realmId=realmId, strategyType=strategyType)
# Background drain worker.
def _enqueue(
self,
row: StrategyExecutionLog,
*,
realmId: Optional[str],
strategyType: str,
) -> None:
"""Best-effort, non-blocking handoff to the drain worker."""
if self._closed:
return
self._ensure_worker()
try:
self._queue.put_nowait(row)
except asyncio.QueueFull:
# The worker is behind (DB slow/down). Drop rather than wait:
# scoring must never block on the audit log. Surface the drop
# so a saturated sink is visible instead of silent data loss.
dsl_metrics.observe_log_dropped(
realm=realmId,
strategy_type=strategyType,
)
logger.warning(
"DSL execution-log queue full (maxsize=%s); dropped a "
"row for realm=%s type=%s",
self._queue_maxsize,
realmId,
strategyType,
)
def _ensure_worker(self) -> None:
"""Create the queue + drain task on first use, inside the loop."""
if self._queue is None:
self._queue = asyncio.Queue(maxsize=max(self._queue_maxsize, 1))
if self._worker is None or self._worker.done():
self._worker = asyncio.ensure_future(self._drain_loop())
async def _drain_loop(self) -> None:
"""
Background worker that persists queued execution-log rows.
Runs forever, pulling rows off the queue and writing each to the
repository. Persistence failures are logged at WARNING and swallowed so
the worker keeps draining and a bad row never escapes the loop.
"""
assert self._queue is not None
while True:
row = await self._queue.get()
try:
await self._repository.insert_row(row)
except Exception:
# Persistence failure must never escape the worker.
# Logged at WARNING so it surfaces in dashboards but
# doesn't page; the next row keeps draining.
logger.warning(
"Failed to persist StrategyExecutionLog for "
"strategyId=%s status=%s",
getattr(row, "strategyId", None),
getattr(row, "status", None),
exc_info=True,
)
finally:
self._queue.task_done()
[docs]
async def drain(self) -> None:
"""Block until every queued row has been processed.
Mainly for tests and graceful shutdown - production scoring never
calls this. No-op if nothing has been enqueued yet.
"""
if self._queue is not None:
await self._queue.join()
[docs]
async def aclose(self) -> None:
"""Flush pending rows and stop the worker. Idempotent.
Wired into the FastAPI lifespan so an orderly shutdown doesn't
lose buffered execution logs.
"""
self._closed = True
await self.drain()
if self._worker is not None and not self._worker.done():
self._worker.cancel()
try:
await self._worker
except (asyncio.CancelledError, Exception):
pass
self._worker = None
def _truncate_trace(
trace: Optional[List[Dict[str, Any]]],
limit: int,
) -> Optional[List[Dict[str, Any]]]:
"""
Keep the head of the trace (where rule matching happens) and drop
the tail when the program is long. Limit <= 0 disables truncation
so tests can persist full traces deterministically.
"""
if trace is None:
return None
if limit <= 0 or len(trace) <= limit:
return list(trace)
return list(trace[:limit])
[docs]
class NoopDslExecutionObserver:
"""
Placeholder observer used when the engine is exercised outside
the DI container (e.g. raw unit tests that instantiate
``DslStrategy`` directly). Drops everything on the floor.
"""
[docs]
async def record(self, **_: Any) -> None:
"""No-op observer hook; discards all execution observations."""
return None