app.repository.strategy_execution_log_repository module

Repository for StrategyExecutionLog rows.

Writes are best-effort: the engine never blocks scoring on the audit log, so the service layer wraps the insert in its own try/except. The read methods power the runbook UI and the post-mortem command-line helpers.

Aggregation queries power the observability dashboard: counts grouped by status / error code / case name, a percentile-friendly sample of durations, and a time-bucketed timeseries. All of them are single round-trips so the dashboard renders in one fetch.

class app.repository.strategy_execution_log_repository.StrategyExecutionLogRepository(session_factory, model=<class 'app.model.strategy_execution_log.StrategyExecutionLog'>)[source]

Bases: BaseRepository

Parameters:

session_factory (Callable[[...], AbstractAsyncContextManager[AsyncSession]])

async insert_row(row)[source]

Insert a pre-built model instance directly. Bypasses BaseRepository.create because we don’t have a pydantic schema for these – the row is assembled by the observer from interpreter output and persisted as-is.

Parameters:

row (StrategyExecutionLog)

Return type:

None

async list_for_strategy(*, strategyId, status=None, limit=50)[source]

Most-recent-first listing for the runbook view.

status filter is the common “show me the failures” use case from the runbook. The result is capped at 200 so the JSON payload stays small even when the operator forgets to pass limit.

Parameters:
  • strategyId (str)

  • status (str | None)

  • limit (int)

Return type:

List[StrategyExecutionLog]

async count_by_status(*, strategyId, sinceDt=None, untilDt=None)[source]

Counts grouped by status (ok/error/timeout/limit).

Parameters:
  • strategyId (str)

  • sinceDt (datetime | None)

  • untilDt (datetime | None)

Return type:

Dict[str, int]

async count_by_error_code(*, strategyId, sinceDt=None, untilDt=None, limit=10)[source]

Top error codes by frequency. Most failures share one code.

Parameters:
  • strategyId (str)

  • sinceDt (datetime | None)

  • untilDt (datetime | None)

  • limit (int)

Return type:

List[Dict[str, Any]]

async count_by_case_name(*, strategyId, sinceDt=None, untilDt=None, limit=10)[source]

Which rule cases the strategy is firing - null caseName = no rule matched (fell back to defaultPoints).

Parameters:
  • strategyId (str)

  • sinceDt (datetime | None)

  • untilDt (datetime | None)

  • limit (int)

Return type:

List[Dict[str, Any]]

async duration_and_nodes_summary(*, strategyId, sinceDt=None, untilDt=None)[source]

Summary stats: count, avg/min/max/sum for duration and nodes, plus sum of points. Percentiles are computed in Python from a bounded sample because some backends (SQLite, used in tests) don’t support percentile_cont.

Parameters:
  • strategyId (str)

  • sinceDt (datetime | None)

  • untilDt (datetime | None)

Return type:

Dict[str, Any]

async sample_durations(*, strategyId, sinceDt=None, untilDt=None, limit=1000)[source]

Sample of raw durations used to compute p50/p95/p99 in the service layer. Bounded at 1000 rows so we don’t fetch the whole log when a busy strategy has millions of entries - that yields ±2% accuracy on p95 which is fine for the UI.

Parameters:
  • strategyId (str)

  • sinceDt (datetime | None)

  • untilDt (datetime | None)

  • limit (int)

Return type:

List[float]

async sample_points(*, strategyId, sinceDt=None, untilDt=None, limit=1000)[source]

Sample of awarded points. Powers the points-distribution histogram. points is null on failed runs so we filter those.

Parameters:
  • strategyId (str)

  • sinceDt (datetime | None)

  • untilDt (datetime | None)

  • limit (int)

Return type:

List[float]