from contextlib import AbstractAsyncContextManager
from typing import Callable
from sqlalchemy import and_
from sqlalchemy import delete as sa_delete
from sqlalchemy import func, or_, select
from sqlalchemy import update as sa_update
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import configs
from app.core.exceptions import DuplicatedError, NotFoundError
from app.model.game_params import GamesParams
from app.model.games import Games
from app.model.task_params import TasksParams
from app.model.tasks import Tasks
from app.model.user_points import UserPoints
from app.repository.base_repository import BaseRepository
from app.schema.games_schema import BaseGameResult, FindGameResult
from app.util.query_builder import dict_to_sqlalchemy_filter_options
[docs]
class GameRepository(BaseRepository):
"""
Repository class for games.
"""
def __init__(
self,
session_factory: Callable[..., AbstractAsyncContextManager[AsyncSession]],
model=Games,
model_tasks=Tasks,
model_game_params=GamesParams,
model_tasks_params=TasksParams,
model_user_points=UserPoints,
) -> None:
self.model_tasks = model_tasks
self.model_game_params = model_game_params
self.model_tasks_params = model_tasks_params
self.model_user_points = model_user_points
super().__init__(session_factory, model)
[docs]
async def get_all_games(
self,
schema,
api_key=None,
oauth_user_id=None,
is_admin: bool = False,
):
"""
List games visible to the caller, filtered, ordered and paginated.
Non-null fields on ``schema`` become filters and drive
ordering/pagination. Unless ``is_admin`` is ``True``, results are
restricted to games owned by the supplied ``api_key`` /
``oauth_user_id``.
Args:
schema: Search schema with optional filters and
ordering/page/page_size.
api_key: API key of the caller, used for ownership scoping.
oauth_user_id: OAuth subject of the caller, used for scoping.
is_admin (bool): When ``True``, bypass ownership scoping and list
all games.
Returns:
A paginated result with the matching games and search metadata.
"""
async with self.session_factory() as session:
schema_as_dict = schema.model_dump(exclude_none=True)
ordering = schema_as_dict.get("ordering", configs.ORDERING)
order_query = (
getattr(self.model, ordering[1:]).desc()
if ordering.startswith("-")
else getattr(self.model, ordering).asc()
)
page = schema_as_dict.get("page", configs.PAGE)
page_size = schema_as_dict.get("page_size", configs.PAGE_SIZE)
filter_options = dict_to_sqlalchemy_filter_options(
self.model, schema_as_dict
)
# Build the column/scope predicate once and reuse it for both
# the COUNT and the page query so the two never disagree.
where_clause = filter_options
if not is_admin:
scope_filters = []
if api_key:
scope_filters.append(Games.apiKey_used == api_key)
if oauth_user_id:
scope_filters.append(Games.oauth_user_id == oauth_user_id)
if not scope_filters:
return FindGameResult(
items=[],
search_options={
"page": page,
"page_size": page_size,
"ordering": ordering,
"total_count": 0,
},
)
where_clause = and_(filter_options, or_(*scope_filters))
# total_count is the number of matching *games*, independent of
# the current page - the dashboard needs it to render page
# controls. Counting here (not len(items)) is what makes real
# pagination possible.
total_count = int(
(
await session.execute(
select(func.count()).select_from(Games).filter(where_clause)
)
).scalar()
or 0
)
# Paginate distinct games first. The previous implementation
# applied LIMIT/OFFSET to a Games⋈GamesParams join, so a game
# with N params consumed N rows of the page budget and the page
# held fewer than page_size games. Selecting games on their own
# keeps the page sized in games; params are fetched separately.
games_stmt = select(Games).filter(where_clause).order_by(order_query)
if page_size != "all":
games_stmt = games_stmt.limit(page_size).offset((page - 1) * page_size)
game_rows = (await session.execute(games_stmt)).scalars().all()
params_by_game = {}
game_ids = [g.id for g in game_rows]
if game_ids:
params_rows = (
(
await session.execute(
select(GamesParams).filter(GamesParams.gameId.in_(game_ids))
)
)
.scalars()
.all()
)
for param in params_rows:
params_by_game.setdefault(param.gameId, []).append(
{
"id": param.id,
"key": param.key,
"value": param.value,
}
)
items = [
BaseGameResult(
gameId=game.id,
updated_at=game.updated_at,
strategyId=game.strategyId,
created_at=game.created_at,
externalGameId=game.externalGameId,
platform=game.platform,
params=params_by_game.get(game.id, []),
)
for game in game_rows
]
return FindGameResult(
items=items,
search_options={
"page": page,
"page_size": page_size,
"ordering": ordering,
"total_count": total_count,
},
)
[docs]
async def get_game_by_id(self, id: str):
"""
Fetch a game together with its configured parameters.
Args:
id (str): Internal game identifier.
Returns:
BaseGameResult: The game and its ``params`` (id/key/value).
Raises:
NotFoundError: If no game has the given id.
"""
async with self.session_factory() as session:
game = (
(await session.execute(select(self.model).filter(self.model.id == id)))
.scalars()
.first()
)
if not game:
raise NotFoundError(detail=f"Not found id : {id}")
params = (
(
await session.execute(
select(self.model_game_params).filter(
self.model_game_params.gameId == id
)
)
)
.scalars()
.all()
)
game_params = [{"id": p.id, "key": p.key, "value": p.value} for p in params]
return BaseGameResult(
gameId=game.id,
created_at=game.created_at,
updated_at=game.updated_at,
externalGameId=game.externalGameId,
platform=game.platform,
params=game_params,
)
[docs]
async def patch_game_by_id(self, gameId: str, schema):
"""
Partially update a game's columns from ``schema``.
Only non-null fields on ``schema`` are written to the row.
Args:
gameId (str): Internal game identifier.
schema: Patch schema whose non-null fields are applied.
Returns:
The updated game.
Raises:
NotFoundError: If no game has the given id.
"""
async with self.session_factory() as session:
game = (
(
await session.execute(
select(self.model).filter(self.model.id == gameId)
)
)
.scalars()
.first()
)
if not game:
raise NotFoundError(detail=f"Not found id : {gameId}")
for key, value in schema.model_dump(exclude_none=True).items():
setattr(game, key, value)
try:
await session.commit()
await session.refresh(game)
except IntegrityError as e:
raise DuplicatedError(detail=str(e.orig))
return await self.get_game_by_id(gameId)
[docs]
async def list_by_strategy_id(self, strategy_id: str):
"""
Return all games whose ``strategyId`` matches the given value.
Used by the rollback cascade to know which games will be
reassigned to the rolled-back version (the actual UPDATE goes
through :meth:`bulk_update_strategy_id`; this helper is exposed
for audit logging and tests).
"""
async with self.session_factory() as session:
stmt = select(self.model).filter(self.model.strategyId == strategy_id)
return list((await session.execute(stmt)).scalars().all())
[docs]
async def list_external_ids(self, ids) -> dict:
"""
Map internal game ``id`` → ``externalGameId`` for the given ids.
Used by the strategy-usage view to render the parent game
of a task-level assignment by its human-readable external id
instead of a raw UUID, in a single query rather than N reads.
"""
unique_ids = list({i for i in ids if i is not None})
if not unique_ids:
return {}
async with self.session_factory() as session:
stmt = select(self.model.id, self.model.externalGameId).filter(
self.model.id.in_(unique_ids)
)
return {
row.id: row.externalGameId
for row in (await session.execute(stmt)).all()
}
[docs]
async def bulk_update_strategy_id(
self, *, old_strategy_id: str, new_strategy_id: str
) -> int:
"""
Rewrite every game's ``strategyId`` from ``old_strategy_id`` to
``new_strategy_id`` in a single UPDATE. Returns the row count so
the caller can log/audit the cascade.
Used by the rollback flow: when a published custom
strategy is rolled back, the games that pointed at the previous
UUID get reassigned to the target UUID in one trip so no game is
left referring to an ARCHIVED row.
"""
async with self.session_factory() as session:
result = await session.execute(
sa_update(self.model)
.where(self.model.strategyId == old_strategy_id)
.values(strategyId=new_strategy_id)
)
await session.commit()
return int(result.rowcount or 0)
[docs]
async def delete_game_by_id(self, game_id: str):
"""
Delete a game by its internal identifier.
Args:
game_id (str): Internal game identifier.
Raises:
NotFoundError: If no game has the given id.
"""
try:
async with self.session_factory() as session:
game = (
(
await session.execute(
select(self.model).filter(self.model.id == game_id)
)
)
.scalars()
.first()
)
if not game:
raise NotFoundError(detail=f"Not found id : {game_id}")
await session.execute(
sa_delete(self.model_game_params).where(
self.model_game_params.gameId == game_id
)
)
tasks = (
(
await session.execute(
select(self.model_tasks).filter(
self.model_tasks.gameId == game_id
)
)
)
.scalars()
.all()
)
for task in tasks:
await session.execute(
sa_delete(self.model_tasks_params).where(
self.model_tasks_params.taskId == task.id
)
)
await session.execute(
sa_delete(self.model_user_points).where(
self.model_user_points.taskId == task.id
)
)
await session.execute(
sa_delete(self.model_tasks).where(
self.model_tasks.gameId == game_id
)
)
await session.delete(game)
await session.commit()
return True
except IntegrityError as e:
raise DuplicatedError(detail=str(e.orig))
except NotFoundError:
raise
except Exception as e:
raise NotFoundError(detail=str(e))