app.services.dsl_observability_service module

Observability sink for DSL strategy executions.

This service does two things for every DslStrategy.calculate_points call (the wiring lives in app.engine.dsl_strategy):

  1. Emits the Prometheus metrics defined in app.engine.dsl_metrics. Always synchronous, always cheap; never blocks scoring.

  2. Persists a row to strategyexecutionlog when the run fails or when the sampler selects it. Failed runs are always kept, so a post-mortem can replay the input via /simulate even months after the incident. OK runs are sampled at _config_module.configs.DSL_EXECUTION_LOG_SAMPLE_RATE (default 5%).
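The keep/drop rule above fits in a small predicate. This is a minimal sketch, not the service's code. The names should_persist and STATUS_OK, and the "ok" status value, are hypothetical. In the real service the rate comes from DSL_EXECUTION_LOG_SAMPLE_RATE.

```python
import random

# Assumption: "ok" is the status string for a successful run; the real
# status values come from DslStrategy and are not documented here.
STATUS_OK = "ok"
DEFAULT_SAMPLE_RATE = 0.05  # the documented 5% default


def should_persist(status: str, rng: random.Random,
                   sample_rate: float = DEFAULT_SAMPLE_RATE) -> bool:
    """Return True if this execution should get a strategyexecutionlog row."""
    if status != STATUS_OK:
        # Failures are always kept so they can be replayed via /simulate.
        return True
    # OK runs: rng.random() is uniform on [0.0, 1.0), so roughly
    # sample_rate of them are kept.
    return rng.random() < sample_rate
```

With a seeded rng such as random.Random(1234), the sequence of keep/drop decisions is reproducible. That reproducibility is what lets a test assert on the exact rows persisted.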

Hot path: record does not await the DB write. The metrics emit and the sampling decision stay synchronous (microseconds). A selected row is handed to a bounded in-process queue that a background worker task drains, so scoring pays only for the enqueue, never for a DB round-trip. If the worker falls behind and the queue fills (for example, because the database is slow or down), rows are dropped and counted in dsl_execution_log_dropped_total instead of blocking the scoring call. The audit log is best-effort by design. Call aclose() on shutdown to flush the queue and stop the worker cleanly.
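Here is a minimal sketch of the non-blocking enqueue. It assumes the bounded queue is an asyncio.Queue; the actual container type is not documented here. BoundedRowQueue, offer, and the default capacity of 1000 are hypothetical. The plain-int dropped counter stands in for the dsl_execution_log_dropped_total Prometheus counter.

```python
import asyncio
from typing import Any, Dict


class BoundedRowQueue:
    """Hypothetical enqueue side of the write path: never awaits, drops when full."""

    def __init__(self, maxsize: int = 1000) -> None:
        # maxsize must be > 0: asyncio treats 0 as "unbounded".
        # On Python < 3.10, construct this inside a running event loop.
        self.queue: "asyncio.Queue[Dict[str, Any]]" = asyncio.Queue(maxsize=maxsize)
        # Plain-int stand-in for the dsl_execution_log_dropped_total counter.
        self.dropped = 0

    def offer(self, row: Dict[str, Any]) -> bool:
        """Try to enqueue without blocking; return False if the row was dropped."""
        try:
            self.queue.put_nowait(row)  # no await, no DB round-trip
            return True
        except asyncio.QueueFull:
            # Slow or unavailable DB: count the drop instead of blocking scoring.
            self.dropped += 1
            return False
```

offer is a plain synchronous method, so the scoring path can call it without yielding to the event loop.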

The service takes a random.Random instance (the rng parameter), so tests can pass a seeded instance and assert on the exact rows persisted. In production, the random module's default instance is used.

Persistence is best-effort: any exception from the repository is caught and logged, never re-raised. The engine must never fail a scoring call because an execution-log row could not be written.
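On the consumer side, the best-effort rule becomes a worker loop that logs repository failures and swallows them. In this sketch, persist_worker and the write_row callable are hypothetical stand-ins for the background task and the repository call. task_done() runs in a finally block, so waits on queue.join() stay accurate even when a write fails.

```python
import asyncio
import logging
from typing import Any, Awaitable, Callable, Dict

logger = logging.getLogger("dsl_observability_sketch")


async def persist_worker(
    queue: "asyncio.Queue[Dict[str, Any]]",
    write_row: Callable[[Dict[str, Any]], Awaitable[None]],
) -> None:
    """Hypothetical background worker: persist rows forever, never re-raise."""
    while True:
        row = await queue.get()
        try:
            await write_row(row)
        except Exception:
            # Best-effort: log with traceback and move on to the next row.
            # CancelledError is a BaseException, so cancellation still works.
            logger.exception("failed to persist DSL execution log row")
        finally:
            queue.task_done()
```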

class app.services.dsl_observability_service.DslExecutionObserver(execution_log_repository=None, *, rng=None, queue_maxsize=None)[source]

Bases: object

Lives as a long-lived dependency (one per process is fine: beyond the rng and the repository pointer, its only state is the bounded write queue and its worker task). It is injected into DslStrategy via the container so tests can swap in a no-op implementation.

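Parameters:
  • execution_log_repository - repository used to persist strategyexecutionlog rows

  • rng (random.Random | None) - seeded instance for tests; the random module's default instance is used when omitted

  • queue_maxsize (int | None) - capacity of the bounded in-process write queue
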
async record(*, strategyId, strategyVersion, strategyType, realmId, externalGameId, externalTaskId, externalUserId, status, errorCode, points, caseName, durationMs, nodesExecuted, trace, parentStrategyId=None)[source]

Single entry point, called by DslStrategy._run_phase once each execution finishes (success or failure).

durationMs is the wall-clock time including precompute, not just the interpreter walk, because that is what the SLO speaks to (“a scoring event taking >250ms is user-visible”).

Parameters:
  • strategyId (str)

  • strategyVersion (int)

  • strategyType (str)

  • realmId (str | None)

  • externalGameId (str | None)

  • externalTaskId (str | None)

  • externalUserId (str | None)

  • status (str)

  • errorCode (str | None)

  • points (float | None)

  • caseName (str | None)

  • durationMs (float)

  • nodesExecuted (int)

  • trace (List[Dict[str, Any]] | None)

  • parentStrategyId (str | None)

Return type:

None
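The durationMs rule amounts to a timer that starts before precompute, not before the interpreter walk. In this sketch, run_timed, breaches_slo, and the precompute/walk callables are hypothetical. It uses time.perf_counter because that clock measures elapsed real time and is monotonic. The 250 ms threshold is the SLO quoted in the description of durationMs.

```python
import time
from typing import Callable, Tuple, TypeVar

C = TypeVar("C")
R = TypeVar("R")

SLO_MS = 250.0  # "a scoring event taking >250ms is user-visible"


def run_timed(precompute: Callable[[], C],
              walk: Callable[[C], R]) -> Tuple[R, float]:
    """Run precompute plus the interpreter walk; return (result, durationMs)."""
    start = time.perf_counter()  # monotonic, high-resolution clock
    ctx = precompute()           # deliberately inside the measured window
    result = walk(ctx)
    duration_ms = (time.perf_counter() - start) * 1000.0
    return result, duration_ms


def breaches_slo(duration_ms: float) -> bool:
    """True when the execution would be user-visible under the SLO."""
    return duration_ms > SLO_MS
```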

async drain()[source]

Block until every queued row has been processed.

Mainly for tests and graceful shutdown; production scoring never calls this. No-op if nothing has been enqueued yet.

Return type:

None
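Here is one plausible shape for drain(). It assumes the queue and worker are created lazily on the first enqueue, which would explain the "no-op if nothing has been enqueued yet" behavior. DrainableSink and its methods are hypothetical. The blocking part is asyncio.Queue.join(), which returns only after task_done() has been called for every item taken off the queue.

```python
import asyncio
from typing import Any, Dict, List, Optional


class DrainableSink:
    """Hypothetical sink whose queue and worker are created on first enqueue."""

    def __init__(self) -> None:
        self._queue: Optional["asyncio.Queue[Dict[str, Any]]"] = None
        self._worker: Optional["asyncio.Task[None]"] = None
        self.persisted: List[Dict[str, Any]] = []

    def enqueue(self, row: Dict[str, Any]) -> None:
        # Must be called from inside a running event loop (create_task needs one).
        if self._queue is None:
            self._queue = asyncio.Queue(maxsize=100)
            self._worker = asyncio.create_task(self._run(self._queue))
        self._queue.put_nowait(row)

    async def _run(self, queue: "asyncio.Queue[Dict[str, Any]]") -> None:
        while True:
            row = await queue.get()
            try:
                self.persisted.append(row)  # stand-in for the repository write
            finally:
                queue.task_done()  # join() waits until every get() is matched

    async def drain(self) -> None:
        if self._queue is None:
            return  # nothing has been enqueued yet
        await self._queue.join()
```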

async aclose()[source]

Flush pending rows and stop the worker. Idempotent.

Wired into the FastAPI lifespan so an orderly shutdown doesn’t lose buffered execution logs.

Return type:

None
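The next sketch shows an idempotent aclose() and the lifespan wiring. It assumes a single asyncio.Queue drained by one worker task. ClosableSink and make_lifespan are hypothetical names. The flush step is queue.join(), after which the worker is cancelled and awaited.

```python
import asyncio
import contextlib
from typing import Any, AsyncIterator, Callable, List, Optional


class ClosableSink:
    """Hypothetical owner of a queue plus worker with an idempotent aclose()."""

    def __init__(self) -> None:
        # On Python < 3.10, construct this inside a running event loop.
        self.queue: "asyncio.Queue[Any]" = asyncio.Queue()
        self.worker: Optional["asyncio.Task[None]"] = None
        self.flushed: List[Any] = []
        self._closed = False

    def start(self) -> None:
        self.worker = asyncio.create_task(self._run())

    async def _run(self) -> None:
        while True:
            item = await self.queue.get()
            try:
                self.flushed.append(item)  # stand-in for the repository write
            finally:
                self.queue.task_done()

    async def aclose(self) -> None:
        if self._closed:
            return  # idempotent: later calls do nothing
        self._closed = True
        if self.worker is None:
            return  # never started, so there is no worker to flush through
        await self.queue.join()  # 1. flush buffered rows
        self.worker.cancel()     # 2. stop the worker
        with contextlib.suppress(asyncio.CancelledError):
            await self.worker


def make_lifespan(sink: ClosableSink) -> Callable[[Any], Any]:
    @contextlib.asynccontextmanager
    async def lifespan(app: Any) -> AsyncIterator[None]:
        sink.start()             # startup
        try:
            yield
        finally:
            await sink.aclose()  # orderly shutdown flushes buffered rows
    return lifespan
```

With FastAPI, the factory's result would be passed as FastAPI(lifespan=make_lifespan(sink)). FastAPI calls it with the app instance, runs the code before yield at startup, and runs the finally block at shutdown.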

class app.services.dsl_observability_service.NoopDslExecutionObserver[source]

Bases: object

Placeholder observer used when the engine is exercised outside the DI container (e.g. raw unit tests that instantiate DslStrategy directly). Drops everything on the floor.

async record(**_)[source]

No-op observer hook; discards all execution observations.

Parameters:

_ (Any)

Return type:

None
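The next sketch assumes DslStrategy only awaits observer.record(...) with keyword arguments. Under that assumption, any object with a compatible async record method can be injected in place of the real observer. ToyStrategy and RecordingObserver are hypothetical classes that show the swap. NoopObserver mirrors the record(**_) signature of NoopDslExecutionObserver.

```python
import asyncio
from typing import Any, Dict, List, Optional


class NoopObserver:
    """Mirrors NoopDslExecutionObserver: accepts any keywords, drops them."""

    async def record(self, **_: Any) -> None:
        return None


class RecordingObserver:
    """Hypothetical test double that keeps every call for assertions."""

    def __init__(self) -> None:
        self.calls: List[Dict[str, Any]] = []

    async def record(self, **kwargs: Any) -> None:
        self.calls.append(kwargs)


class ToyStrategy:
    """Hypothetical stand-in for DslStrategy that receives its observer by injection."""

    def __init__(self, observer: Optional[Any] = None) -> None:
        # Fall back to the no-op observer when used outside a DI container.
        self._observer = observer if observer is not None else NoopObserver()

    async def calculate_points(self, strategy_id: str) -> float:
        points = 10.0  # placeholder scoring result
        await self._observer.record(strategyId=strategy_id, status="ok", points=points)
        return points
```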