app.services.dsl_observability_service module
Observability sink for DSL strategy executions.
This service does two things for every DslStrategy.calculate_points
call (the wiring lives in app.engine.dsl_strategy):
- Emits the Prometheus metrics defined in app.engine.dsl_metrics. Always synchronous, always cheap; never blocks scoring.
- Persists a row to strategyexecutionlog when the sampler picks the run or when the run failed. Errors are always kept so a post-mortem can replay the input via /simulate even months after the incident. OK runs are sampled at _config_module.configs.DSL_EXECUTION_LOG_SAMPLE_RATE (default 5%).
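The keep-or-skip rule described above can be sketched as follows. This is a minimal illustration, not the service's actual code: the function name should_persist and the literal status value "ok" are assumptions made for the example, and the rate is passed in directly rather than read from configuration.

```python
import random


def should_persist(status: str, rng: random.Random, sample_rate: float = 0.05) -> bool:
    """Decide whether a finished run gets a row in the execution log.

    Hypothetical helper: the name and the "ok" status literal are
    assumptions for this sketch, not taken from the real module.
    """
    # Failed runs are always kept so the input can be replayed later.
    if status != "ok":
        return True
    # Successful runs are kept with probability `sample_rate`.
    # rng.random() returns a float in [0.0, 1.0).
    return rng.random() < sample_rate
```

With a rate of 0.0 no successful run is ever persisted, and with 1.0 every one is, which makes the two extremes convenient for tests.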
Hot-path: record no longer awaits the DB write.
The metrics emit + sampling decision stay synchronous (microseconds),
and the chosen row is handed to a bounded in-process queue drained by a
background worker task. Scoring therefore pays only the enqueue, never a
DB round-trip. If the worker falls behind and the queue fills (a slow or
down database), rows are dropped and counted via
dsl_execution_log_dropped_total rather than blocking the scoring
call - the audit log is best-effort by design. Call aclose()
on shutdown to flush the queue and stop the worker cleanly.
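The enqueue-or-drop behaviour can be illustrated with asyncio.Queue. The sketch below is written under assumptions: the class name BestEffortWriter, the write_row callable and the plain integer dropped counter (standing in for dsl_execution_log_dropped_total) are all hypothetical and only show the shape of the technique.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Optional

Row = Dict[str, Any]


class BestEffortWriter:
    """Hypothetical bounded queue with a background drain task."""

    def __init__(self, write_row: Callable[[Row], Awaitable[None]], maxsize: int = 1000) -> None:
        self._write_row = write_row
        self._queue: "asyncio.Queue[Row]" = asyncio.Queue(maxsize=maxsize)
        self._worker: Optional["asyncio.Task[None]"] = None
        self.dropped = 0  # stand-in for dsl_execution_log_dropped_total

    def enqueue(self, row: Row) -> bool:
        # Start the worker lazily so it is created inside a running loop.
        if self._worker is None:
            self._worker = asyncio.get_running_loop().create_task(self._drain())
        try:
            # put_nowait never waits: it raises QueueFull instead.
            self._queue.put_nowait(row)
            return True
        except asyncio.QueueFull:
            # Drop and count rather than block the caller.
            self.dropped += 1
            return False

    async def _drain(self) -> None:
        while True:
            row = await self._queue.get()
            try:
                await self._write_row(row)
            except Exception:
                pass  # best-effort: a failed write must not kill the worker
            finally:
                self._queue.task_done()

    async def aclose(self) -> None:
        if self._worker is None:
            return
        # Wait until every queued row has been processed, then stop the worker.
        await self._queue.join()
        self._worker.cancel()
        try:
            await self._worker
        except asyncio.CancelledError:
            pass
        self._worker = None
```

Because enqueue is a plain synchronous method, the caller pays only for put_nowait. In this sketch the instance should be created inside a running event loop, and aclose() should be awaited before the loop shuts down.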
The service is wired with random.Random so tests can pass a
seeded instance and assert on exact rows persisted. In production the
default random module instance is used.
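Why a seeded random.Random makes the persisted rows assertable can be shown with a small stand-alone sketch. The helper sampled_indices is hypothetical and only mimics a per-run sampling draw; it does not call the real service.

```python
import random
from typing import List


def sampled_indices(rng: random.Random, n_runs: int, sample_rate: float) -> List[int]:
    """Return the indices of the runs a sampler would pick (illustrative only)."""
    return [i for i in range(n_runs) if rng.random() < sample_rate]


# Two generators built from the same seed produce the same sequence of
# draws, so the set of sampled runs is identical across test executions.
first = sampled_indices(random.Random(42), 1000, 0.05)
second = sampled_indices(random.Random(42), 1000, 0.05)
assert first == second
```

A test can therefore construct the observer with rng=random.Random(<seed>) and compare the persisted rows against a fixed expectation.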
Persistence is best-effort: any exception from the repository is caught and logged, never re-raised. The engine must never fail a scoring call because a metrics row couldn’t be written.
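The catch-and-log pattern this describes might look like the sketch below. The names persist_best_effort and write_row are assumptions for illustration; the real repository interface is not shown in this page.

```python
import asyncio
import logging
from typing import Any, Awaitable, Callable, Dict

logger = logging.getLogger("dsl_observability_sketch")


async def persist_best_effort(
    write_row: Callable[[Dict[str, Any]], Awaitable[None]],
    row: Dict[str, Any],
) -> bool:
    """Try to write one row; log and swallow any failure (hypothetical helper)."""
    try:
        await write_row(row)
        return True
    except Exception:
        # Log with traceback, but never re-raise into the scoring path.
        logger.exception("could not persist execution log row")
        return False
```

Catching Exception rather than BaseException leaves task cancellation (asyncio.CancelledError on Python 3.8+) free to propagate.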
- class app.services.dsl_observability_service.DslExecutionObserver(execution_log_repository=None, *, rng=None, queue_maxsize=None)
Bases: object
Lives as a long-lived dependency (one per process is fine – it is stateless aside from the rng + repository pointer). Injected into DslStrategy via the container so tests can swap in a no-op implementation.
- Parameters:
execution_log_repository (Optional[StrategyExecutionLogRepository])
rng (Optional[random.Random])
queue_maxsize (Optional[int])
- async record(*, strategyId, strategyVersion, strategyType, realmId, externalGameId, externalTaskId, externalUserId, status, errorCode, points, caseName, durationMs, nodesExecuted, trace, parentStrategyId=None)
Single entry point called by DslStrategy._run_phase once every execution finishes (success or failure). durationMs is the wall-clock time including precompute, not just the interpreter walk, because that is what the SLO speaks to (“a scoring event taking >250ms is user-visible”).
- Parameters:
strategyId (str)
strategyVersion (int)
strategyType (str)
realmId (str | None)
externalGameId (str | None)
externalTaskId (str | None)
externalUserId (str | None)
status (str)
errorCode (str | None)
points (float | None)
caseName (str | None)
durationMs (float)
nodesExecuted (int)
trace (List[Dict[str, Any]] | None)
parentStrategyId (str | None)
- Return type:
None
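A no-op stand-in of the kind tests might inject in place of the observer could be sketched as follows. The class name NoOpObserver and its calls counter are hypothetical; only the keyword-only record signature is taken from the entry above.

```python
import asyncio
from typing import Any, Dict, List, Optional


class NoOpObserver:
    """Hypothetical test double: accepts the same keywords, writes nothing."""

    def __init__(self) -> None:
        self.calls = 0  # lets a test assert that record() was reached

    async def record(
        self,
        *,
        strategyId: str,
        strategyVersion: int,
        strategyType: str,
        realmId: Optional[str],
        externalGameId: Optional[str],
        externalTaskId: Optional[str],
        externalUserId: Optional[str],
        status: str,
        errorCode: Optional[str],
        points: Optional[float],
        caseName: Optional[str],
        durationMs: float,
        nodesExecuted: int,
        trace: Optional[List[Dict[str, Any]]],
        parentStrategyId: Optional[str] = None,
    ) -> None:
        # No metrics, no sampling, no queue: only count the call.
        self.calls += 1
```

Mirroring the keyword-only signature means a call site that omits or misspells an argument fails in tests the same way it would against the real observer.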