import logging
import logging.config
import os
import subprocess
from contextlib import asynccontextmanager
import sentry_sdk
import toml
from fastapi import FastAPI
from fastapi.openapi.utils import get_openapi
from fastapi.responses import RedirectResponse
from prometheus_fastapi_instrumentator import Instrumentator
from starlette.middleware.cors import CORSMiddleware
from app.api.v1.routes import routers as v1_routers
from app.core.config import configs
from app.core.container import Container
from app.middlewares.error_handler import CatchUnhandledErrorsMiddleware
from app.util.class_object import singleton
def _configure_logging() -> None:
"""
Configure the root logger and uvicorn/gunicorn loggers process-wide.
Emits structured JSON logs in ``prod``/``stage`` environments and
human-readable plain text everywhere else. The log level is taken from
the ``LOG_LEVEL`` environment variable (defaulting to ``INFO``). Called
once at import time so handlers are in place before the app is built.
"""
use_json = configs.ENV in {"prod", "stage"}
level = os.getenv("LOG_LEVEL", "INFO").upper()
config = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"plain": {
"format": "%(asctime)s %(levelname)s %(name)s: %(message)s",
},
"json": {
"()": "pythonjsonlogger.jsonlogger.JsonFormatter",
"fmt": "%(asctime)s %(levelname)s %(name)s %(message)s",
"rename_fields": {
"asctime": "timestamp",
"levelname": "level",
"name": "logger",
},
},
},
"handlers": {
"stdout": {
"class": "logging.StreamHandler",
"stream": "ext://sys.stdout",
"formatter": "json" if use_json else "plain",
},
},
"root": {"level": level, "handlers": ["stdout"]},
"loggers": {
"uvicorn": {"handlers": ["stdout"], "level": level, "propagate": False},
"uvicorn.error": {
"handlers": ["stdout"],
"level": level,
"propagate": False,
},
"uvicorn.access": {
"handlers": ["stdout"],
"level": level,
"propagate": False,
},
"gunicorn": {"handlers": ["stdout"], "level": level, "propagate": False},
"gunicorn.error": {
"handlers": ["stdout"],
"level": level,
"propagate": False,
},
"gunicorn.access": {
"handlers": ["stdout"],
"level": level,
"propagate": False,
},
},
}
logging.config.dictConfig(config)
_configure_logging()
logger = logging.getLogger(__name__)
[docs]
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
FastAPI lifespan context manager handling graceful shutdown.
Yields immediately on startup (no startup work needed) and, on shutdown,
flushes the buffered DSL execution-log queue so a graceful stop does not
drop pending audit rows. The flush is best-effort: failures are logged
and swallowed so they never block shutdown.
Args:
app (FastAPI): The application instance whose ``state`` may hold the
``dsl_execution_observer`` to flush.
"""
yield
# Flush the DSL execution-log queue so a graceful
# shutdown doesn't drop buffered audit rows. ``aclose`` is
# idempotent and tolerant of an observer that never enqueued.
observer = getattr(app.state, "dsl_execution_observer", None)
if observer is not None:
try:
await observer.aclose()
except Exception: # pragma: no cover - shutdown best-effort
logger.warning(
"Failed to flush DSL execution-log queue on shutdown",
exc_info=True,
)
[docs]
def get_project_data():
"""
Retrieves project data from the pyproject.toml file.
Returns:
dict: Project data from the toml file.
"""
pyproject_path = "pyproject.toml"
with open(pyproject_path, "r") as pyproject_file:
pyproject_content = toml.load(pyproject_file)
return pyproject_content["tool"]["poetry"]
project_data = get_project_data()
[docs]
def get_swagger_oauth_config() -> dict:
"""
Builds Swagger UI OAuth init configuration from environment-based settings.
Returns:
dict: Swagger OAuth init configuration.
"""
oauth_config = {}
if configs.KEYCLOAK_CLIENT_ID:
oauth_config["clientId"] = configs.KEYCLOAK_CLIENT_ID
if configs.KEYCLOAK_CLIENT_SECRET:
oauth_config["clientSecret"] = configs.KEYCLOAK_CLIENT_SECRET
return oauth_config
[docs]
def custom_openapi():
"""
Customizes the OpenAPI schema for the FastAPI application.
Returns:
dict: The customized OpenAPI schema.
"""
servers = [{"url": configs.API_V1_STR, "description": "Local"}]
extra_server_url = configs.EXTRA_SERVER_URL
extra_server_description = configs.EXTRA_SERVER_DESCRIPTION
if extra_server_url:
servers.append(
{"url": extra_server_url, "description": extra_server_description}
)
if app.openapi_schema:
return app.openapi_schema
openapi_schema = get_openapi(
title=project_data["name"],
version=project_data["version"],
description=project_data["description"],
routes=app.routes,
servers=servers,
)
for path in list(openapi_schema["paths"].keys()):
if path.startswith("/api/v1"):
openapi_schema["paths"][path[7:]] = openapi_schema["paths"].pop(path)
app.openapi_schema = openapi_schema
return app.openapi_schema
[docs]
def get_git_commit_hash() -> str:
"""
Returns the current git commit hash, or "unknown" if not available.
Returns:
str: The current git commit hash, or "unknown" if not available.
"""
try:
commit_hash = (
subprocess.check_output(["git", "rev-parse", "HEAD"])
.decode("ascii")
.strip()
)
except Exception:
commit_hash = "unknown"
return commit_hash
@singleton
class AppCreator:
"""
Singleton class to create and configure the FastAPI application.
"""
def __init__(self):
if configs.SENTRY_DSN:
sentry_sdk.init(
dsn=configs.SENTRY_DSN,
environment=configs.SENTRY_ENVIRONMENT,
release=configs.SENTRY_RELEASE,
send_default_pii=True,
traces_sample_rate=1.0,
_experiments={
"continuous_profiling_auto_start": True,
},
)
self.app = FastAPI(
lifespan=lifespan,
root_path=configs.ROOT_PATH,
title=project_data["name"],
version=project_data["version"],
description=project_data["description"],
license_info={"name": project_data["license"]},
contact={
"name": project_data["authors"][0],
"email": project_data["authors"][0],
},
redoc_url="/redocs",
docs_url="/docs",
servers=[{"url": configs.API_V1_STR, "description": "Local"}],
swagger_ui_init_oauth=get_swagger_oauth_config(),
)
self.app.openapi = custom_openapi
self.container = Container()
self.db = self.container.db()
# Expose the singleton execution-log observer so the
# lifespan shutdown hook can flush its background queue.
self.app.state.dsl_execution_observer = self.container.dsl_execution_observer()
# Added before CORSMiddleware on purpose: add_middleware prepends, so
# the CORS layer added below stays the outermost user middleware and
# wraps this one. That lets unhandled 500s be rendered from inside the
# stack and still receive CORS headers (otherwise the browser blocks
# them and the dashboard shows a bare "Network Error").
self.app.add_middleware(CatchUnhandledErrorsMiddleware)
if configs.BACKEND_CORS_ORIGINS:
self.app.add_middleware(
CORSMiddleware,
allow_origins=[str(origin) for origin in configs.BACKEND_CORS_ORIGINS],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Expose Prometheus /metrics. Wired here
# (after CORS, before include_router) so the instrumentor sees
# every request middleware but does NOT itself sit behind any
# router-level auth dependency. The custom DSL counters in
# app/engine/dsl_metrics.py already live in the default
# prometheus_client registry, so Instrumentator.expose() emits
# them automatically without extra wiring.
if configs.METRICS_ENABLED:
Instrumentator(
should_group_status_codes=True,
should_ignore_untemplated=True,
excluded_handlers=["/metrics"],
).instrument(self.app).expose(
self.app,
endpoint="/metrics",
include_in_schema=False,
tags=["observability"],
)
@self.app.get("/", include_in_schema=False)
def read_root():
"""
Redirect to /docs
return:
RedirectResponse: Redirect to /docs
"""
return RedirectResponse(url="/docs")
self.app.include_router(v1_routers, prefix=configs.API_V1_STR)
app_creator = AppCreator()
app = app_creator.app
db = app_creator.db
container = app_creator.container