Source code for app.services.apikey_cache_backend

"""
Pluggable cache backends for the API key header lookup.

The in-memory backend preserves the original per-process dict (zero
dependency, fast, but one cache per gunicorn worker -- revocations only
reach the worker that handled the request and other workers serve the
cached value until TTL expires). The Redis backend stores entries in a
single shared keyspace so every worker observes revocations on the next
request.

Selection is driven by ``configs.APIKEY_CACHE_BACKEND`` ("memory" or
"redis") and wired in ``app/core/container.py``.
"""

from __future__ import annotations

import asyncio
import json
import logging
from time import monotonic
from types import SimpleNamespace
from typing import Optional, Protocol, runtime_checkable

from app.services.rate_limit_counter_backend import build_redis_client_from_url

logger = logging.getLogger(__name__)


@runtime_checkable
class ApiKeyCacheBackend(Protocol):
    """
    Structural interface shared by every API key cache backend.

    A backend maps a cache key (the sha256 hash of the API key) to the
    resolved ``(apiKey, active)`` value. Because the protocol is
    ``runtime_checkable``, ``isinstance`` checks only verify that the four
    methods exist, not their signatures.
    """

    async def get(self, cache_key: str) -> Optional[SimpleNamespace]:
        """Look up ``cache_key``; ``None`` signals that nothing is cached."""
        ...

    async def set(
        self,
        cache_key: str,
        value: SimpleNamespace,
        ttl_seconds: int,
    ) -> None:
        """Cache ``value`` for ``cache_key`` for ``ttl_seconds`` seconds."""
        ...

    async def delete(self, cache_key: str) -> None:
        """Drop the entry for ``cache_key``; a missing entry is not an error."""
        ...

    async def clear(self) -> None:
        """Drop every entry held by the backend."""
        ...
class InMemoryApiKeyCacheBackend:
    """
    Process-local cache: a plain dict of ``(deadline, value)`` pairs.

    Deadlines come from ``time.monotonic``; an expired entry is only
    removed when a later ``get`` for the same key notices it (lazy
    eviction).

    The asyncio lock is created on first use rather than in ``__init__``
    and is discarded again by ``sync_clear``. That way tests which swap
    event loops between cases never reuse a lock "attached to a different
    loop". This mirrors how the cache behaved when it lived directly on
    ``ApiKeyService``.
    """

    def __init__(self) -> None:
        self._store: dict = {}
        self._lock: Optional[asyncio.Lock] = None

    def _get_lock(self) -> asyncio.Lock:
        """
        Hand out the instance lock, creating it if it does not exist yet.

        Returns:
            asyncio.Lock: The lazily-created per-instance lock.
        """
        current = self._lock
        if current is None:
            current = self._lock = asyncio.Lock()
        return current

    async def get(self, cache_key: str) -> Optional[SimpleNamespace]:
        """
        Fetch the live value for ``cache_key``, dropping it when expired.

        Args:
            cache_key (str): The lookup key (sha256 of the API key plaintext).

        Returns:
            Optional[SimpleNamespace]: The cached ``(apiKey, active)`` value,
            or ``None`` when nothing is stored or the TTL has elapsed.
        """
        # The clock is sampled before waiting on the lock, so expiry is
        # judged against the moment the lookup was requested.
        checked_at = monotonic()
        async with self._get_lock():
            record = self._store.get(cache_key)
            if record is not None:
                deadline, cached = record
                if deadline > checked_at:
                    return cached
                # Past its deadline: evict so the stale entry is not kept.
                self._store.pop(cache_key, None)
            return None

    async def set(
        self,
        cache_key: str,
        value: SimpleNamespace,
        ttl_seconds: int,
    ) -> None:
        """
        Remember ``value`` for ``cache_key`` until ``ttl_seconds`` elapse.

        Args:
            cache_key (str): The lookup key.
            value (SimpleNamespace): The ``(apiKey, active)`` value to store.
            ttl_seconds (int): Lifetime in seconds; a value ≤ 0 makes the
                call a no-op (any existing entry is left untouched).
        """
        if ttl_seconds > 0:
            deadline = monotonic() + ttl_seconds
            async with self._get_lock():
                self._store[cache_key] = (deadline, value)

    async def delete(self, cache_key: str) -> None:
        """
        Forget ``cache_key``; a key that is not cached is ignored.

        Args:
            cache_key (str): The lookup key to evict.
        """
        async with self._get_lock():
            self._store.pop(cache_key, None)

    async def clear(self) -> None:
        """Empty the cache; awaitable wrapper that delegates to ``sync_clear``."""
        self.sync_clear()

    def sync_clear(self) -> None:
        """Empty the cache without awaiting and discard the current lock."""
        # dict.clear() is atomic under the GIL, so no lock is taken here.
        self._store.clear()
        # Dropping the lock lets the next async caller create a fresh one
        # bound to whichever event loop is running at that point.
        self._lock = None
class RedisApiKeyCacheBackend:
    """
    Redis-backed cache shared across workers.

    Values are stored as compact JSON
    (``{"apiKey": <prefix>, "active": <bool>}``) with the TTL passed via the
    ``EX`` argument on ``SET`` so Redis handles expiration server-side.

    A stored value that cannot be decoded into a JSON object is treated as
    a cache miss instead of raising, so one corrupt entry cannot turn API
    key lookups into errors.
    """

    def __init__(self, client, key_prefix: str = "game:apikey:") -> None:
        # ``client`` must expose awaitable ``get``/``set``/``delete`` and an
        # async-iterable ``scan_iter`` (as used by the methods below).
        self._client = client
        self._key_prefix = key_prefix

    def _build_key(self, cache_key: str) -> str:
        """
        Namespace a cache key with the configured Redis prefix.

        Args:
            cache_key (str): The bare lookup key.

        Returns:
            str: The prefixed Redis key.
        """
        return f"{self._key_prefix}{cache_key}"

    async def get(self, cache_key: str) -> Optional[SimpleNamespace]:
        """
        Read and deserialize a cached value from Redis.

        Args:
            cache_key (str): The lookup key.

        Returns:
            Optional[SimpleNamespace]: The decoded ``(apiKey, active)`` value,
            or ``None`` if the key is absent or its stored payload is not a
            valid JSON object.
        """
        raw = await self._client.get(self._build_key(cache_key))
        if raw is None:
            return None
        try:
            payload = json.loads(raw)
        except (ValueError, TypeError):
            # ValueError covers JSONDecodeError and UnicodeDecodeError;
            # TypeError covers a client returning a non str/bytes value.
            payload = None
        if not isinstance(payload, dict):
            # Report a miss rather than raising, so the caller can resolve
            # the key from its source of truth and overwrite this entry.
            logging.getLogger(__name__).warning(
                "Ignoring malformed API key cache entry in Redis; "
                "treating it as a cache miss."
            )
            return None
        return SimpleNamespace(
            apiKey=payload.get("apiKey"), active=payload.get("active")
        )

    async def set(
        self, cache_key: str, value: SimpleNamespace, ttl_seconds: int
    ) -> None:
        """
        Store ``value`` as JSON in Redis with a server-side TTL (no-op if ≤ 0).

        Args:
            cache_key (str): The lookup key.
            value (SimpleNamespace): The ``(apiKey, active)`` value to store.
            ttl_seconds (int): Expiry passed to Redis ``SET ... EX``.
        """
        if ttl_seconds <= 0:
            return
        payload = json.dumps({"apiKey": value.apiKey, "active": value.active})
        # Clamp to at least 1 so a positive fractional TTL never truncates
        # to ``ex=0``.
        await self._client.set(
            self._build_key(cache_key), payload, ex=max(1, int(ttl_seconds))
        )

    async def delete(self, cache_key: str) -> None:
        """
        Delete a single cached entry from Redis.

        Args:
            cache_key (str): The lookup key to evict.
        """
        await self._client.delete(self._build_key(cache_key))

    async def clear(self) -> None:
        """Delete every entry under the key prefix via a bounded SCAN + DEL."""
        # SCAN + DEL is bounded and non-blocking for other clients; only
        # called from admin/maintenance paths, never the request hot path.
        pattern = f"{self._key_prefix}*"
        async for key in self._client.scan_iter(match=pattern, count=500):
            await self._client.delete(key)
def build_apikey_cache_backend(
    backend_name: str,
    redis_url: Optional[str],
    redis_key_prefix: str,
) -> ApiKeyCacheBackend:
    """
    Select the configured backend.

    Falls back to the in-memory backend with a warning when Redis is
    requested but ``REDIS_URL`` is missing -- so a misconfigured deploy still
    authenticates (just with the per-process consistency caveat) instead of
    failing requests outright. An unrecognized backend name falls back the
    same way and is also logged, so a typo in the setting does not silently
    disable the shared cache.

    Args:
        backend_name (str): ``"memory"`` or ``"redis"``; matched
            case-insensitively after stripping whitespace. A falsy value is
            treated as ``"memory"``.
        redis_url (Optional[str]): URL handed to
            ``build_redis_client_from_url`` when the Redis backend is chosen.
        redis_key_prefix (str): Key prefix passed to the Redis backend.

    Returns:
        ApiKeyCacheBackend: A ``RedisApiKeyCacheBackend`` when Redis is
        selected and a URL is available, otherwise an
        ``InMemoryApiKeyCacheBackend``.
    """
    normalized = (backend_name or "memory").strip().lower()
    if normalized == "redis":
        if not redis_url:
            logger.warning(
                "APIKEY_CACHE_BACKEND=redis but REDIS_URL is empty; "
                "falling back to the in-process API key cache."
            )
            return InMemoryApiKeyCacheBackend()
        client = build_redis_client_from_url(redis_url)
        return RedisApiKeyCacheBackend(client, key_prefix=redis_key_prefix)
    if normalized not in ("", "memory"):
        # Same fallback as before, but make the misconfiguration visible:
        # the in-process cache delays revocations on other workers.
        logger.warning(
            "Unrecognized APIKEY_CACHE_BACKEND=%r; "
            "falling back to the in-process API key cache.",
            backend_name,
        )
    return InMemoryApiKeyCacheBackend()