app.repository.strategy_execution_log_repository module¶
Repository for StrategyExecutionLog rows.
Writes are best-effort: the engine never blocks scoring on the audit log, so the service layer wraps the insert in its own try/except. The read methods power the runbook UI and the post-mortem command-line helpers.
Aggregation queries power the observability dashboard: counts grouped by status / error code / case name, a percentile-friendly sample of durations, and a time-bucketed timeseries. All of them are single round-trips so the dashboard renders in one fetch.
- class app.repository.strategy_execution_log_repository.StrategyExecutionLogRepository(session_factory, model=<class 'app.model.strategy_execution_log.StrategyExecutionLog'>)[source]¶
Bases:
BaseRepository- Parameters:
session_factory (Callable[[...], AbstractAsyncContextManager[AsyncSession]])
- async insert_row(row)[source]¶
Insert a pre-built model instance directly. Bypasses
BaseRepository.createbecause we don’t have a pydantic schema for these – the row is assembled by the observer from interpreter output and persisted as-is.- Parameters:
row (StrategyExecutionLog)
- Return type:
None
- async list_for_strategy(*, strategyId, status=None, limit=50)[source]¶
Most-recent-first listing for the runbook view.
statusfilter is the common “show me the failures” use case from the runbook. The result is capped at 200 so the JSON payload stays small even when the operator forgets to passlimit.- Parameters:
strategyId (str)
status (str | None)
limit (int)
- Return type:
List[StrategyExecutionLog]
- async count_by_status(*, strategyId, sinceDt=None, untilDt=None)[source]¶
Counts grouped by
status(ok/error/timeout/limit).- Parameters:
strategyId (str)
sinceDt (datetime | None)
untilDt (datetime | None)
- Return type:
Dict[str, int]
- async count_by_error_code(*, strategyId, sinceDt=None, untilDt=None, limit=10)[source]¶
Top error codes by frequency. Most failures share one code.
- Parameters:
strategyId (str)
sinceDt (datetime | None)
untilDt (datetime | None)
limit (int)
- Return type:
List[Dict[str, Any]]
- async count_by_case_name(*, strategyId, sinceDt=None, untilDt=None, limit=10)[source]¶
Which rule cases the strategy is firing - null caseName = no rule matched (fell back to defaultPoints).
- Parameters:
strategyId (str)
sinceDt (datetime | None)
untilDt (datetime | None)
limit (int)
- Return type:
List[Dict[str, Any]]
- async duration_and_nodes_summary(*, strategyId, sinceDt=None, untilDt=None)[source]¶
Summary stats: count, avg/min/max/sum for duration and nodes, plus sum of points. Percentiles are computed in Python from a bounded sample because some backends (SQLite, used in tests) don’t support
percentile_cont.- Parameters:
strategyId (str)
sinceDt (datetime | None)
untilDt (datetime | None)
- Return type:
Dict[str, Any]
- async sample_durations(*, strategyId, sinceDt=None, untilDt=None, limit=1000)[source]¶
Sample of raw durations used to compute p50/p95/p99 in the service layer. Bounded at 1000 rows so we don’t fetch the whole log when a busy strategy has millions of entries - that yields ±2% accuracy on p95 which is fine for the UI.
- Parameters:
strategyId (str)
sinceDt (datetime | None)
untilDt (datetime | None)
limit (int)
- Return type:
List[float]
- async sample_points(*, strategyId, sinceDt=None, untilDt=None, limit=1000)[source]¶
Sample of awarded points. Powers the points-distribution histogram.
pointsis null on failed runs so we filter those.- Parameters:
strategyId (str)
sinceDt (datetime | None)
untilDt (datetime | None)
limit (int)
- Return type:
List[float]