Source code for app.services.apikey_service
from types import SimpleNamespace
from typing import Any, Optional
from fastapi import Security
from fastapi.security.api_key import APIKeyHeader
from app.core.config import configs
from app.core.exceptions import ForbiddenError, NotFoundError
from app.repository.apikey_repository import ApiKeyRepository
from app.services.apikey_cache_backend import (ApiKeyCacheBackend,
InMemoryApiKeyCacheBackend)
from app.services.base_service import BaseService
from app.util.generate_api_key import GeneratedApiKey, generate_api_key, hash_api_key
from app.util.response import Response
API_KEY_NAME = "X-API-Key"
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)
[docs]
class ApiKeyService(BaseService):
"""
Service class for API keys.
Plaintext keys are surfaced to the caller exactly once at creation and
never persisted. Authentication compares the sha256 hash of the value
presented in the ``X-API-Key`` header against the canonical
``apiKeyHash`` column.
Revocation consistency depends on ``configs.APIKEY_CACHE_BACKEND``:
- ``redis`` (recommended for multi-worker deployments): cache entries
live in a shared Redis keyspace, so a revoke on any worker is
observed by every other worker on its next request.
- ``memory`` (default, single process): each gunicorn worker keeps its
own dict; revocations only invalidate the local worker's entry and
remote workers continue serving the cached value until the TTL
(``API_KEY_HEADER_CACHE_TTL_SECONDS``, default 5s) expires.
Attributes:
apikey_repository (ApiKeyRepository): Repository instance for API
keys.
cache_backend (ApiKeyCacheBackend): Backing store for the resolved
``(apiKey, active)`` tuple keyed by sha256 hash.
"""
[docs]
def __init__(
self,
apikey_repository: ApiKeyRepository,
cache_backend: Optional[ApiKeyCacheBackend] = None,
) -> None:
"""
Initializes the ApiKeyService with the provided repository and cache.
Args:
apikey_repository: The repository instance.
cache_backend: The cache backend. Defaults to a fresh in-process
instance so direct instantiation (mostly tests) keeps working
without wiring through the DI container.
"""
self.apikey_repository = apikey_repository
self.cache_backend = cache_backend or InMemoryApiKeyCacheBackend()
super().__init__(apikey_repository)
[docs]
async def generate_api_key_service(self) -> GeneratedApiKey:
"""
Generate a cryptographically-secure API key whose prefix and hash do
not collide with any existing record.
"""
while True:
generated = generate_api_key()
hash_collision = await self.apikey_repository.read_by_column(
"apiKeyHash",
generated.key_hash,
not_found_raise_exception=False,
)
if hash_collision is not None:
continue
prefix_collision = await self.apikey_repository.read_by_column(
"apiKey",
generated.prefix,
not_found_raise_exception=False,
)
if prefix_collision is None:
return generated
[docs]
async def create_api_key(self, apikeyPostBody) -> Any:
"""
Persist a new API-key record.
Args:
apikeyPostBody: Schema describing the API key to store (including
its prefix and hash).
Returns:
Any: The created API-key entity.
"""
return await self.apikey_repository.create(apikeyPostBody)
[docs]
async def get_all_api_keys(self) -> Any:
"""
Return every API-key record.
Returns:
Any: All stored API-key entities.
"""
return await self.apikey_repository.read_all()
[docs]
async def revoke_api_key_by_prefix(self, prefix: str) -> Any:
"""
Revoke an API key identified by its public prefix and drop the
matching cache entry. The deactivated row's ``apiKeyHash`` is the
exact cache key used by ``get_api_key_header`` (both are
``hash_api_key(plaintext)``), so a precise ``delete`` is enough --
no need to nuke unrelated entries.
"""
row = await self.apikey_repository.read_by_column(
"apiKey", prefix, not_found_raise_exception=False
)
if row is None:
raise NotFoundError(detail=f"API key not found: {prefix}")
updated = await self.apikey_repository.update_attr(row.id, "active", False)
cache_key = getattr(row, "apiKeyHash", None)
if cache_key:
await self.cache_backend.delete(cache_key)
return updated