"""Read-only points queries.
Aggregations and lookups over ``user_points`` for games, tasks and users.
These methods never mutate state; the write path lives in
:mod:`app.services.user_points.assignment`.
"""
import asyncio
from typing import Any
from uuid import UUID
from app.core.exceptions import NotFoundError
from app.schema.games_schema import ListTasksWithUsers
from app.schema.task_schema import BaseUserFirstAction, TasksWithUsers
from app.schema.user_points_schema import (AllPointsByGame, GameDetail,
PointsAssignedToUser,
PointsAssignedToUserDetails,
ResponseGetPointsByGame,
ResponseGetPointsByTask,
ResponsePointsByExternalUserId, TaskDetail,
TaskPointsByGame, UserGamePoints)
from app.services.game_access import get_authorized_game, get_authorized_user
from app.services.user_points._base import FANOUT_LIMIT, UserPointsContext
[docs]
class PointsQueryMixin(UserPointsContext):
[docs]
async def query_user_points(self, schema) -> Any:
"""
Run a filtered, paginated query over the ``user_points`` table.
Args:
schema: Search schema with filters and ordering/pagination.
Returns:
Any: Items plus search metadata, as returned by the repository.
"""
return await self.user_points_repository.read_by_options(schema)
[docs]
async def get_users_by_gameId(
self,
gameId,
*,
api_key: str = None,
oauth_user_id: str = None,
is_admin: bool = False,
enforce_scope: bool = False,
) -> ListTasksWithUsers:
"""
List a game's tasks and, per task, the users who earned points.
For each user the response includes their first action timestamp on
that task.
Args:
gameId: Internal game identifier.
api_key (str): Caller's API key, used when ``enforce_scope``.
oauth_user_id (str): Caller's OAuth subject, used when scoping.
is_admin (bool): Whether the caller has the admin role.
enforce_scope (bool): When ``True``, verify the caller may access
the game before returning data.
Returns:
ListTasksWithUsers: Tasks each paired with their participating
users.
Raises:
NotFoundError: If the game or its tasks do not exist.
"""
if enforce_scope:
game = await get_authorized_game(
self.game_repository,
gameId,
api_key=api_key,
oauth_user_id=oauth_user_id,
is_admin=is_admin,
)
else:
game = await self.game_repository.read_by_column(
"id", gameId, not_found_raise_exception=False
)
if not game:
raise NotFoundError(detail=f"Game not found by gameId: {game}")
tasks = await self.task_repository.read_by_column(
"gameId", game.id, not_found_raise_exception=False, only_one=False
)
if not tasks:
raise NotFoundError(detail=f"Tasks not found by gameId: {game.id}")
response = []
all_tasks = []
for task in tasks:
all_externalUserId = []
points = await self.user_points_repository.get_points_and_users_by_taskId(
task.id
)
externalTaskId = task.externalTaskId
if points:
for point in points:
externalUserId = point.externalUserId
user = await self.users_repository.read_by_column(
"externalUserId", externalUserId, not_found_raise_exception=True
)
if not user:
raise NotFoundError(
detail=f"User not found by userId: {point.userId}. Please try again later or contact support" # noqa
)
first_user_point = await self.user_points_repository.get_first_user_points_in_external_task_id_by_user_id(
externalTaskId, externalUserId
)
all_externalUserId.append(
BaseUserFirstAction(
externalUserId=user.externalUserId,
created_at=str(user.created_at),
firstAction=str(first_user_point.created_at),
)
)
all_tasks = {"externalTaskId": externalTaskId, "users": all_externalUserId}
response.append(TasksWithUsers(**all_tasks))
return ListTasksWithUsers(gameId=gameId, tasks=response)
[docs]
async def get_points_by_user_list(
self,
users_list,
*,
api_key: str = None,
oauth_user_id: str = None,
is_admin: bool = False,
enforce_scope: bool = False,
) -> list[UserGamePoints]:
"""
Fetch per-game point totals for a list of users, concurrently.
Fans out one :meth:`get_all_points_by_externalUserId` call per user,
bounded by a concurrency semaphore.
Args:
users_list: External user identifiers to look up.
api_key (str): Caller's API key, used when ``enforce_scope``.
oauth_user_id (str): Caller's OAuth subject, used when scoping.
is_admin (bool): Whether the caller has the admin role.
enforce_scope (bool): When ``True``, enforce per-user access checks.
Returns:
list[UserGamePoints]: One aggregate result per requested user.
"""
semaphore = asyncio.Semaphore(FANOUT_LIMIT)
async def _fetch(user) -> UserGamePoints:
"""Fetch one user's totals under the shared concurrency semaphore."""
async with semaphore:
return await self.get_all_points_by_externalUserId(
user,
api_key=api_key,
oauth_user_id=oauth_user_id,
is_admin=is_admin,
enforce_scope=enforce_scope,
)
return list(await asyncio.gather(*[_fetch(u) for u in users_list]))
[docs]
async def get_points_by_externalUserId(
self,
externalUserId,
*,
api_key: str = None,
oauth_user_id: str = None,
is_admin: bool = False,
enforce_scope: bool = False,
) -> list[AllPointsByGame]:
"""
Return a user's points across every game they participate in.
Resolves the user, finds the games behind their points rows, and
aggregates each game's detailed points concurrently.
Args:
externalUserId: External identifier of the user.
api_key (str): Caller's API key, used when ``enforce_scope``.
oauth_user_id (str): Caller's OAuth subject, used when scoping.
is_admin (bool): Whether the caller has the admin role.
enforce_scope (bool): When ``True``, verify access to the user.
Returns:
list[AllPointsByGame]: Detailed points grouped per game.
Raises:
NotFoundError: If the user does not exist.
"""
if enforce_scope:
user = await get_authorized_user(
self.users_repository,
externalUserId,
api_key=api_key,
oauth_user_id=oauth_user_id,
is_admin=is_admin,
)
else:
user = await self.users_repository.read_by_column(
"externalUserId", externalUserId, not_found_raise_exception=True
)
if not user:
raise NotFoundError(
detail=f"User not found by externalUserId: {externalUserId}"
)
tasks_of_users = await self.user_points_repository.get_task_by_externalUserId(
externalUserId
)
semaphore = asyncio.Semaphore(FANOUT_LIMIT)
async def _fetch(task) -> AllPointsByGame:
"""Resolve a task's game and fetch its detailed points, bounded."""
async with semaphore:
game = await self.game_repository.read_by_column(
"id", task.gameId, not_found_raise_exception=True
)
return await self.get_points_by_gameId_with_details(
game.id,
api_key=api_key,
oauth_user_id=oauth_user_id,
is_admin=is_admin,
enforce_scope=enforce_scope,
)
response = list(
await asyncio.gather(*[_fetch(task) for task in tasks_of_users])
)
new_response = []
for game in response:
for task in game.task:
for point in task.points:
if point.externalUserId == externalUserId:
new_response.append(
AllPointsByGame(
externalGameId=game.externalGameId,
created_at=game.created_at,
task=[
TaskPointsByGame(
externalTaskId=task.externalTaskId,
points=[
PointsAssignedToUser(
externalUserId=point.externalUserId,
points=point.points,
timesAwarded=point.timesAwarded,
pointsData=point.pointsData,
)
],
)
],
)
)
return new_response
[docs]
async def get_points_by_gameId(
self,
gameId,
*,
api_key: str = None,
oauth_user_id: str = None,
is_admin: bool = False,
enforce_scope: bool = False,
) -> AllPointsByGame:
"""
Aggregate all points awarded within a game.
Args:
gameId: Internal game identifier.
api_key (str): Caller's API key, used when ``enforce_scope``.
oauth_user_id (str): Caller's OAuth subject, used when scoping.
is_admin (bool): Whether the caller has the admin role.
enforce_scope (bool): When ``True``, verify access to the game.
Returns:
AllPointsByGame: The game's aggregated points.
Raises:
NotFoundError: If the game does not exist.
"""
if enforce_scope:
game = await get_authorized_game(
self.game_repository,
gameId,
api_key=api_key,
oauth_user_id=oauth_user_id,
is_admin=is_admin,
)
else:
game = await self.game_repository.read_by_column(
"id", gameId, not_found_message=f"Game with gameId: {gameId} not found"
)
tasks = await self.task_repository.read_by_column(
"gameId", game.id, not_found_raise_exception=False, only_one=False
)
if not tasks:
raise NotFoundError(detail=f"Tasks not found by gameId: {game.id}")
game_points = []
for task in tasks:
user_points = []
points = await self.user_points_repository.get_points_and_users_by_taskId(
task.id
)
if points:
for point in points:
points_of_user = PointsAssignedToUser(
externalUserId=point.externalUserId,
points=point.points,
timesAwarded=point.timesAwarded,
)
user_points.append(points_of_user)
task_points = TaskPointsByGame(
externalTaskId=task.externalTaskId, points=user_points
)
game_points.append(task_points)
response = AllPointsByGame(
externalGameId=game.externalGameId,
created_at=str(game.created_at),
task=game_points,
)
return response
[docs]
async def get_points_by_gameId_with_details(
self,
gameId: UUID,
*,
api_key: str = None,
oauth_user_id: str = None,
is_admin: bool = False,
enforce_scope: bool = False,
) -> AllPointsByGame:
"""
Aggregate a game's points including per-award detail.
Like :meth:`get_points_by_gameId` but the result carries the detailed
per-award breakdown for each task/user.
Args:
gameId (UUID): Internal game identifier.
api_key (str): Caller's API key, used when ``enforce_scope``.
oauth_user_id (str): Caller's OAuth subject, used when scoping.
is_admin (bool): Whether the caller has the admin role.
enforce_scope (bool): When ``True``, verify access to the game.
Returns:
AllPointsByGame: The game's aggregated points with detail.
Raises:
NotFoundError: If the game does not exist.
"""
if enforce_scope:
game = await get_authorized_game(
self.game_repository,
gameId,
api_key=api_key,
oauth_user_id=oauth_user_id,
is_admin=is_admin,
)
else:
game = await self.game_repository.read_by_column(
"id", gameId, not_found_message=f"Game with gameId: {gameId} not found"
)
tasks = await self.task_repository.read_by_column(
"gameId", game.id, not_found_raise_exception=False, only_one=False
)
if not tasks:
raise NotFoundError(detail=f"Tasks not found by gameId: {game.id}")
game_points = []
for task in tasks:
user_points = []
points = await self.user_points_repository.get_points_and_users_by_taskId(
task.id
)
if points:
for point in points:
points_of_user = PointsAssignedToUserDetails(
externalUserId=point.externalUserId,
points=point.points,
timesAwarded=point.timesAwarded,
pointsData=point.pointsData,
)
user_points.append(points_of_user)
task_points = TaskPointsByGame(
externalTaskId=task.externalTaskId, points=user_points
)
game_points.append(task_points)
response = AllPointsByGame(
externalGameId=game.externalGameId,
created_at=str(game.created_at),
task=game_points,
)
return response
[docs]
async def get_points_of_user_in_game(
self,
gameId,
externalUserId,
*,
api_key: str = None,
oauth_user_id: str = None,
is_admin: bool = False,
enforce_scope: bool = False,
) -> list[PointsAssignedToUser]:
"""
Return one user's point awards within a single game.
Args:
gameId: Internal game identifier.
externalUserId: External identifier of the user.
api_key (str): Caller's API key, used when ``enforce_scope``.
oauth_user_id (str): Caller's OAuth subject, used when scoping.
is_admin (bool): Whether the caller has the admin role.
enforce_scope (bool): When ``True``, verify access to the game.
Returns:
list[PointsAssignedToUser]: The user's awards in the game.
Raises:
NotFoundError: If the game or user does not exist.
"""
if enforce_scope:
game = await get_authorized_game(
self.game_repository,
gameId,
api_key=api_key,
oauth_user_id=oauth_user_id,
is_admin=is_admin,
)
else:
game = await self.game_repository.read_by_column(
"id", gameId, not_found_raise_exception=False
)
if not game:
raise NotFoundError(detail=f"Game not found by gameId: {gameId}")
user = await self.users_repository.read_by_column(
"externalUserId", externalUserId, not_found_raise_exception=False
)
if not user:
raise NotFoundError(
detail=f"User not found by externalUserId: {externalUserId}"
)
tasks = await self.task_repository.read_by_column(
"gameId", game.id, not_found_raise_exception=False, only_one=False
)
if not tasks:
raise NotFoundError(detail=f"Tasks not found by gameId: {game.id}")
response = []
for task in tasks:
points = await self.user_points_repository.get_points_and_users_by_taskId(
task.id
)
if points:
for point in points:
if point.externalUserId == externalUserId:
response.append(
PointsAssignedToUser(
externalUserId=point.externalUserId,
points=point.points,
timesAwarded=point.timesAwarded,
)
)
return response
[docs]
async def get_users_points_by_externalGameId(
self, externalGameId
) -> list[ResponseGetPointsByGame]:
"""
Return per-user point totals for a game identified by external id.
Args:
externalGameId: External identifier of the game.
Returns:
list[ResponseGetPointsByGame]: Aggregated points per user/task in
the game.
Raises:
NotFoundError: If the game or its tasks do not exist.
"""
game = await self.game_repository.read_by_column(
column="externalGameId",
value=externalGameId,
not_found_message=(f"Game with externalGameId {externalGameId} not found"),
)
tasks = await self.task_repository.read_by_column(
"gameId", game.id, only_one=False, not_found_raise_exception=False
)
if not tasks:
raise NotFoundError(
f"The game with externalGameId {externalGameId} has no tasks"
)
response = []
for task in tasks:
points = await self.user_points_repository.get_points_and_users_by_taskId(
task.id
)
response_by_task = []
if points:
for point in points:
response_by_task.append(
ResponseGetPointsByTask(
externalUserId=point.externalUserId, points=point.points
)
)
if response_by_task:
response.append(
ResponseGetPointsByGame(
externalTaskId=task.externalTaskId, points=response_by_task
)
)
return response
[docs]
async def get_users_points_by_externalTaskId(
self, externalTaskId
) -> list[ResponseGetPointsByTask]:
"""
Return per-user point totals for a task identified by external id.
Args:
externalTaskId: External identifier of the task.
Returns:
list[ResponseGetPointsByTask]: Aggregated points per user on the
task.
Raises:
NotFoundError: If the task does not exist.
"""
task = await self.task_repository.read_by_column(
column="externalTaskId",
value=externalTaskId,
not_found_message=(f"Task with externalTaskId {externalTaskId} not found"),
)
points_by_task = (
await self.user_points_repository.get_points_and_users_by_taskId(task.id)
)
cleaned_points_by_task = []
if points_by_task:
for point in points_by_task:
cleaned_points_by_task.append(
ResponseGetPointsByTask(
externalUserId=point.externalUserId, points=point.points
)
)
return cleaned_points_by_task
[docs]
async def get_users_points_by_externalTaskId_and_externalUserId(
self, externalTaskId, externalUserId
) -> Any:
"""
Return a single user's points on a single task (both by external id).
Args:
externalTaskId: External identifier of the task.
externalUserId: External identifier of the user.
Returns:
Any: The user's points for that task.
Raises:
NotFoundError: If the task or user does not exist.
"""
task = await self.task_repository.read_by_column(
column="externalTaskId",
value=externalTaskId,
not_found_message=(f"Task with externalTaskId {externalTaskId} not found"),
)
user = await self.users_repository.read_by_column(
column="externalUserId",
value=externalUserId,
not_found_message=(f"User with externalUserId {externalUserId} not found"),
)
points = await self.user_points_repository.read_by_columns(
{"taskId": task.id, "userId": user.id}
)
return points
[docs]
async def get_all_points_by_externalUserId(
self,
externalUserId,
*,
api_key: str = None,
oauth_user_id: str = None,
is_admin: bool = False,
enforce_scope: bool = False,
) -> UserGamePoints:
"""
Return one user's points aggregated across all their games.
Args:
externalUserId: External identifier of the user.
api_key (str): Caller's API key, used when ``enforce_scope``.
oauth_user_id (str): Caller's OAuth subject, used when scoping.
is_admin (bool): Whether the caller has the admin role.
enforce_scope (bool): When ``True``, verify access to the user.
Returns:
UserGamePoints: The user's points grouped by game.
Raises:
NotFoundError: If the user does not exist.
"""
if enforce_scope:
user_data = await get_authorized_user(
self.users_repository,
externalUserId,
api_key=api_key,
oauth_user_id=oauth_user_id,
is_admin=is_admin,
)
else:
user_data = await self.users_repository.read_by_column(
column="externalUserId",
value=externalUserId,
not_found_message=(
f"User with externalUserId {externalUserId} not found"
),
not_found_raise_exception=False,
)
if not user_data:
return UserGamePoints(
externalUserId=externalUserId,
points=0,
timesAwarded=0,
games=[],
userExists=False,
)
tasks = await self.user_points_repository.get_task_by_externalUserId(
externalUserId
)
response = []
for task in tasks:
game = await self.game_repository.read_by_column(
"id", task.gameId, not_found_raise_exception=True
)
response.append(
await self.get_points_by_gameId_with_details(
game.id,
api_key=api_key,
oauth_user_id=oauth_user_id,
is_admin=is_admin,
enforce_scope=enforce_scope,
)
)
total_points = 0
total_times_awarded = 0
games = []
for game in response:
for task in game.task:
task_points = 0
task_times_awarded = 0
task_details = []
for point in task.points:
if point.externalUserId == externalUserId:
task_points += point.points
task_times_awarded += point.timesAwarded
if point.points > 0:
task_details.append(
TaskDetail(
externalTaskId=task.externalTaskId,
pointsData=point.pointsData,
)
)
total_points += task_points
total_times_awarded += task_times_awarded
if task_points > 0 and len(task_details) > 0:
games.append(
GameDetail(
externalGameId=game.externalGameId,
points=task_points,
timesAwarded=task_times_awarded,
tasks=task_details,
)
)
return UserGamePoints(
externalUserId=externalUserId,
points=total_points,
timesAwarded=total_times_awarded,
games=games,
)
[docs]
async def get_points_of_user(
self, externalUserId
) -> ResponsePointsByExternalUserId:
"""
Return a user's total points plus a per-task breakdown.
Args:
externalUserId: External identifier of the user.
Returns:
ResponsePointsByExternalUserId: The summed total and the per-task
points list.
Raises:
NotFoundError: If the user does not exist.
"""
user = await self.users_repository.read_by_column(
column="externalUserId",
value=externalUserId,
not_found_message=(f"User with externalUserId {externalUserId} not found"),
)
points = await self.user_points_repository.get_task_and_sum_points_by_userId(
user.id
)
total_points = 0
for point in points:
total_points += point.points
response = ResponsePointsByExternalUserId(
externalUserId=externalUserId,
points=total_points,
points_by_task=points, # noqa
)
return response
[docs]
async def get_points_of_simulated_task(self, externalTaskId, simulationHash) -> Any:
"""
Return points rows produced by a specific simulation run of a task.
Args:
externalTaskId: External identifier of the task.
simulationHash: Hash identifying the simulation run.
Returns:
Any: The points rows belonging to that simulation.
"""
return await self.user_points_repository.get_points_of_simulated_task(
externalTaskId, simulationHash
)
[docs]
async def get_all_point_of_tasks_list(self, list_ids_tasks, withData=False) -> Any:
"""
Return all points rows for a list of task ids.
Args:
list_ids_tasks: Internal task ids to fetch points for.
withData (bool): When ``True``, include the full JSON ``data``
column; otherwise return a lighter projection.
Returns:
Any: The matching points rows.
"""
return await self.user_points_repository.get_all_point_of_tasks_list(
list_ids_tasks, withData
)