Source code for app.services.strategy_observability_service

"""
Observability aggregation service.

Reads from the same ``strategyexecutionlog`` table the engine writes
via :class:`DslExecutionObserver` and shapes it into the metrics view
the dashboard renders for a single strategy, plus the A/B comparison
endpoint that runs the same aggregation against two ids and computes
deltas.

The repository already does the heavy lifting (SUM/COUNT/GROUP BY in
the DB); this service is responsible for:

  * Combining 5 narrow queries into one response - the dashboard fetches
    once per page load.
  * Computing percentiles + histograms in Python from a bounded sample,
    because SQLite (used in tests) doesn't have ``percentile_cont`` and
    PostgreSQL's window-function variant is expensive on tables with
    millions of rows.
  * Tenant scoping: callers pass the strategy id, we resolve it via
    :class:`StrategyDefinitionService.get_strategy` which 404s on
    cross-realm probes, so this service is implicitly tenant-aware.
"""

from __future__ import annotations

from datetime import datetime
from typing import List, Optional

from app.repository.strategy_execution_log_repository import \
    StrategyExecutionLogRepository
from app.schema.strategy_definition_schema import StrategyDefinitionRead
from app.schema.strategy_observability_schema import (CaseCount, DurationPercentiles,
                                                      ErrorCount, HistogramBucket,
                                                      MetricsDelta, StatusBreakdown,
                                                      StrategyComparisonResponse,
                                                      StrategyMetricsResponse)
from app.services.strategy_definition_service import StrategyDefinitionService

# Bucket edges chosen to align with the Prometheus histogram in
# ``dsl_metrics.DSL_LATENCY_BUCKETS`` so the dashboard's view of a
# strategy is consistent with what the on-call alert sees. Last bucket
# is "5 s and beyond" because anything past that has already tripped a
# DSL_TIMEOUT, where time stops being meaningful.
_DURATION_BUCKETS_MS = (
    5.0,
    10.0,
    25.0,
    50.0,
    100.0,
    150.0,
    200.0,
    250.0,
    500.0,
    1000.0,
    2500.0,
    5000.0,
)


def _percentile(sorted_values: List[float], pct: float) -> float:
    """Linear-interpolation percentile over a pre-sorted list. Matches
    NumPy's default behaviour so test expectations line up with what
    ops would see in Grafana."""
    if not sorted_values:
        return 0.0
    if len(sorted_values) == 1:
        return sorted_values[0]
    k = (len(sorted_values) - 1) * pct
    f = int(k)
    c = min(f + 1, len(sorted_values) - 1)
    if f == c:
        return sorted_values[f]
    return sorted_values[f] + (sorted_values[c] - sorted_values[f]) * (k - f)


def _duration_histogram(samples: List[float]) -> List[HistogramBucket]:
    """Bucket a sample of durations onto :data:`_DURATION_BUCKETS_MS`."""
    counts = [0] * (len(_DURATION_BUCKETS_MS) + 1)
    for v in samples:
        placed = False
        for i, edge in enumerate(_DURATION_BUCKETS_MS):
            if v <= edge:
                counts[i] += 1
                placed = True
                break
        if not placed:
            counts[-1] += 1
    buckets: List[HistogramBucket] = []
    prev = 0.0
    for i, edge in enumerate(_DURATION_BUCKETS_MS):
        buckets.append(
            HistogramBucket(
                label=f"≤{int(edge) if edge >= 1 else edge}ms",
                upperBound=edge,
                count=counts[i],
            )
        )
        prev = edge
    buckets.append(
        HistogramBucket(
            label=f">{int(prev)}ms",
            upperBound=None,
            count=counts[-1],
        )
    )
    return buckets


def _points_histogram(samples: List[float]) -> List[HistogramBucket]:
    """
    Adaptive bucketing for the points distribution: we don't know the
    range a priori (some strategies emit fractions, others whole
    hundreds), so derive 10 evenly-spaced bins from min..max. Falls
    back to a single all-zeros bucket when the sample is empty.
    """
    if not samples:
        return [HistogramBucket(label="0", upperBound=0.0, count=0)]
    lo = min(samples)
    hi = max(samples)
    if hi <= lo:
        return [
            HistogramBucket(
                label=f"{lo:g}",
                upperBound=lo,
                count=len(samples),
            )
        ]
    n_buckets = 10
    width = (hi - lo) / n_buckets
    counts = [0] * n_buckets
    for v in samples:
        idx = int((v - lo) / width)
        if idx >= n_buckets:
            idx = n_buckets - 1
        counts[idx] += 1
    out: List[HistogramBucket] = []
    for i in range(n_buckets):
        upper = lo + width * (i + 1)
        out.append(
            HistogramBucket(
                label=f"{lo + width * i:g}{upper:g}",
                upperBound=upper,
                count=counts[i],
            )
        )
    return out


def _status_breakdown(raw: dict) -> StatusBreakdown:
    """Normalise the repo's free-form ``{status: count}`` map onto the
    enum the dashboard expects, lumping anything unrecognised into
    ``other`` so a new status code doesn't blow up the chart."""
    known = {"ok", "error", "timeout", "limit"}
    ok = int(raw.get("ok", 0))
    error = int(raw.get("error", 0))
    timeout = int(raw.get("timeout", 0))
    limit = int(raw.get("limit", 0))
    other = sum(int(v) for k, v in raw.items() if k not in known)
    total = ok + error + timeout + limit + other
    return StatusBreakdown(
        ok=ok,
        error=error,
        timeout=timeout,
        limit=limit,
        other=other,
        total=total,
    )


[docs] class StrategyObservabilityService: """Single entrypoint for the dashboard view + A/B view.""" def __init__( self, execution_log_repository: StrategyExecutionLogRepository, strategy_definition_service: StrategyDefinitionService, ) -> None: self._repo = execution_log_repository self._strategy_definition_service = strategy_definition_service
[docs] async def get_metrics( self, *, id: str, realmId: Optional[str], sinceDt: Optional[datetime] = None, untilDt: Optional[datetime] = None, ) -> StrategyMetricsResponse: """Build the per-strategy metrics card. Resolves the strategy first so the response includes name + version + status - and so cross-realm probes 404 before we spend a query on the execution log. """ strategy = await self._strategy_definition_service.get_strategy( id=id, realmId=realmId ) return await self._build_response( strategy=strategy, sinceDt=sinceDt, untilDt=untilDt, )
[docs] async def compare( self, *, idA: str, idB: str, realmId: Optional[str], sinceDt: Optional[datetime] = None, untilDt: Optional[datetime] = None, ) -> StrategyComparisonResponse: """Run the metrics aggregation twice in parallel-ish (awaited sequentially since each takes ~5 narrow queries) and compute deltas server-side. Both ids are validated through the same get_strategy path so neither can leak from a foreign realm.""" a = await self.get_metrics( id=idA, realmId=realmId, sinceDt=sinceDt, untilDt=untilDt ) b = await self.get_metrics( id=idB, realmId=realmId, sinceDt=sinceDt, untilDt=untilDt ) # Per-run averages. Sum of points is meaningless for direct # comparison when run counts differ; the per-event average is # the apples-to-apples number. avg_points_a = ( a.pointsSum / a.statusBreakdown.total if a.statusBreakdown.total else 0.0 ) avg_points_b = ( b.pointsSum / b.statusBreakdown.total if b.statusBreakdown.total else 0.0 ) delta = MetricsDelta( successRate=b.successRate - a.successRate, errorRate=b.errorRate - a.errorRate, p95Ms=b.duration.p95Ms - a.duration.p95Ms, avgMs=b.duration.avgMs - a.duration.avgMs, pointsAvg=avg_points_b - avg_points_a, ) return StrategyComparisonResponse(a=a, b=b, delta=delta)
async def _build_response( self, *, strategy: StrategyDefinitionRead, sinceDt: Optional[datetime], untilDt: Optional[datetime], ) -> StrategyMetricsResponse: """ Assemble the metrics response for one strategy over a time window. Aggregates execution counts, status/error breakdowns and latency percentiles for the strategy within the optional date range. Args: strategy (StrategyDefinitionRead): The strategy to report on. sinceDt (Optional[datetime]): Inclusive lower time bound. untilDt (Optional[datetime]): Inclusive upper time bound. Returns: StrategyMetricsResponse: The aggregated metrics. """ # Strategy executions persist with the bare definition id # (StrategyDefinitionLog.strategyId - see how the observer is # called from DslStrategy._run_phase). We pass that id verbatim # rather than the assignable ``custom:<uuid>`` form. strat_id = strategy.id status_raw = await self._repo.count_by_status( strategyId=strat_id, sinceDt=sinceDt, untilDt=untilDt ) top_errors = await self._repo.count_by_error_code( strategyId=strat_id, sinceDt=sinceDt, untilDt=untilDt ) top_cases = await self._repo.count_by_case_name( strategyId=strat_id, sinceDt=sinceDt, untilDt=untilDt ) summary = await self._repo.duration_and_nodes_summary( strategyId=strat_id, sinceDt=sinceDt, untilDt=untilDt ) duration_samples = await self._repo.sample_durations( strategyId=strat_id, sinceDt=sinceDt, untilDt=untilDt ) points_samples = await self._repo.sample_points( strategyId=strat_id, sinceDt=sinceDt, untilDt=untilDt ) breakdown = _status_breakdown(status_raw) if breakdown.total: success_rate = breakdown.ok / breakdown.total error_rate = ( breakdown.error + breakdown.timeout + breakdown.limit ) / breakdown.total else: success_rate = 0.0 error_rate = 0.0 sorted_durations = sorted(duration_samples) percentiles = DurationPercentiles( avgMs=summary["durationAvgMs"], p50Ms=_percentile(sorted_durations, 0.50), p95Ms=_percentile(sorted_durations, 0.95), p99Ms=_percentile(sorted_durations, 0.99), maxMs=summary["durationMaxMs"], sampleSize=len(sorted_durations), ) return StrategyMetricsResponse( strategyId=strat_id, name=strategy.name, version=strategy.version, status=strategy.status, windowFrom=sinceDt, windowUntil=untilDt, statusBreakdown=breakdown, successRate=success_rate, errorRate=error_rate, duration=percentiles, durationHistogram=_duration_histogram(duration_samples), topErrors=[ ErrorCount(code=e["code"], count=e["count"]) for e in top_errors ], topCases=[ CaseCount(caseName=c["caseName"], count=c["count"]) for c in top_cases ], pointsHistogram=_points_histogram(points_samples), pointsSum=summary["pointsSum"], nodesAvg=summary["nodesAvg"], nodesMax=summary["nodesMax"], )