from typing import Any, Optional
from uuid import UUID
from app.core.exceptions import BadRequestError, ConflictError, NotFoundError
from app.engine.all_engine_strategies import all_engine_strategies
from app.model.strategy_definition import StrategyDefinitionStatus
from app.repository.game_params_repository import GameParamsRepository
from app.repository.game_repository import GameRepository
from app.repository.task_params_repository import TaskParamsRepository
from app.repository.task_repository import TaskRepository
from app.repository.user_points_repository import UserPointsRepository
from app.repository.user_repository import UserRepository
from app.schema.task_schema import (CreateTask, CreateTaskPost,
CreateTaskPostSuccesfullyCreated, FindTask,
PatchTask, ResponseDeleteTask, ResponsePatchTask)
from app.schema.tasks_params_schema import (CreateTaskParams, InsertTaskParams,
UpdateTaskParams)
from app.services.base_service import BaseService
from app.services.game_access import get_authorized_game
from app.services.strategy_definition_service import StrategyDefinitionService
from app.services.strategy_service import (StrategyService, is_custom_strategy_id,
parse_custom_strategy_id, resolve_realm_id)
[docs]
class TaskService(BaseService):
"""
Service class for managing tasks.
Attributes:
strategy_service (StrategyService): Service instance for strategies.
task_repository (TaskRepository): Repository instance for tasks.
game_repository (GameRepository): Repository instance for games.
user_repository (UserRepository): Repository instance for users.
user_points_repository (UserPointsRepository): Repository instance for
user points.
game_params_repository (GameParamsRepository): Repository instance for
game parameters.
task_params_repository (TaskParamsRepository): Repository instance for
task parameters.
"""
[docs]
def __init__(
self,
strategy_service: StrategyService,
task_repository: TaskRepository,
game_repository: GameRepository,
user_repository: UserRepository,
user_points_repository: UserPointsRepository,
game_params_repository: GameParamsRepository,
task_params_repository: TaskParamsRepository,
strategy_definition_service: Optional[StrategyDefinitionService] = None,
) -> None:
"""
Initializes the TaskService with the provided repositories and
services.
``strategy_definition_service`` is optional so existing call sites
and tests that don't exercise ``custom:`` strategyIds keep
working. Required for :meth:`patch_task_by_id` to validate
``custom:`` ids against the persistent registry.
"""
self.strategy_service = strategy_service
self.task_repository = task_repository
self.game_repository = game_repository
self.user_repository = user_repository
self.user_points_repository = user_points_repository
self.game_params_repository = game_params_repository
self.task_params_repository = task_params_repository
self.strategy_definition_service = strategy_definition_service
super().__init__(task_repository)
async def _validate_strategy_assignment(
self,
strategy_id: str,
*,
api_key: Optional[str] = None,
oauth_user_id: Optional[str] = None,
) -> None:
"""
Mirror of ``GameService._validate_strategy_assignment``. Kept here
as a sibling rather than a shared helper to avoid a third
cross-service import path; the body is small enough that
duplication is cheaper than the extra abstraction.
"""
if not is_custom_strategy_id(strategy_id):
strategies = all_engine_strategies()
strategy = next((s for s in strategies if s.id == strategy_id), None)
if not strategy:
raise NotFoundError(detail=f"Strategy with id: {strategy_id} not found")
return
if self.strategy_definition_service is None:
raise BadRequestError(
detail=(
"Custom strategy assignment is unavailable: "
"StrategyDefinitionService not wired."
)
)
realmId = resolve_realm_id(api_key=api_key, oauth_user_id=oauth_user_id)
uuid_part = parse_custom_strategy_id(strategy_id)
definition = await self.strategy_definition_service.get_strategy(
id=uuid_part, realmId=realmId
)
if definition.status != StrategyDefinitionStatus.PUBLISHED.value:
raise BadRequestError(
detail=(
f"Only PUBLISHED custom strategies can be assigned. "
f"Strategy '{definition.name}' v{definition.version} "
f"is {definition.status}."
)
)
[docs]
async def patch_task_by_id(
self,
gameId: UUID,
taskId: UUID,
schema: PatchTask,
*,
api_key: Optional[str] = None,
oauth_user_id: Optional[str] = None,
is_admin: bool = False,
enforce_scope: bool = False,
) -> ResponsePatchTask:
"""
Partially update a task identified by ``gameId`` + ``taskId``.
Mutable fields: ``strategyId``, ``status`` and ``params``. When
``params`` is provided it is treated as the desired full set and
synced via :meth:`_sync_task_params` (update existing rows by id,
create rows without an id, delete rows omitted from the list);
omit ``params`` to leave them untouched.
``strategyId`` accepts both built-ins and ``custom:<uuid>`` -
validated by :meth:`_validate_strategy_assignment` with the
caller's tenant boundary.
"""
if enforce_scope:
game = await get_authorized_game(
self.game_repository,
gameId,
api_key=api_key,
oauth_user_id=oauth_user_id,
is_admin=is_admin,
)
else:
game = await self.game_repository.read_by_id(
gameId, not_found_raise_exception=False
)
if not game:
raise NotFoundError(detail=f"Game not found by gameId: {gameId}")
task = await self.task_repository.read_by_id(
taskId, not_found_raise_exception=False
)
if not task:
raise NotFoundError(detail=f"Task not found by taskId: {taskId}")
if str(task.gameId) != str(gameId):
# Mismatched parent - 404 instead of 400 so we don't leak
# which other game the task actually belongs to.
raise NotFoundError(
detail=(f"Task {taskId} does not belong to game {gameId}.")
)
# Build the scalar update payload only with non-None fields the
# caller actually wants to change. ``params`` is handled separately
# (synced below) so it doesn't go through ``patch_by_id``.
update_fields = {}
if schema.strategyId is not None:
await self._validate_strategy_assignment(
schema.strategyId,
api_key=api_key,
oauth_user_id=oauth_user_id,
)
update_fields["strategyId"] = schema.strategyId
if schema.status is not None:
update_fields["status"] = schema.status
params_provided = schema.params is not None
# An empty patch (no scalar fields and no params block) is rejected
# to keep the audit trail meaningful.
if not update_fields and not params_provided:
raise ConflictError(detail="Empty patch: no fields provided.")
# Only the scalar fields that actually differ get written; this also
# lets us reject a pure no-op field patch (when params aren't touched)
# without blocking a params-only change.
changed_fields = {
k: v for k, v in update_fields.items() if getattr(task, k) != v
}
if not changed_fields and not params_provided:
raise ConflictError(
detail=(
"It is not possible to update the task with the same data"
)
)
if changed_fields:
task = await self.task_repository.patch_by_id(taskId, changed_fields)
updated_params = None
if params_provided:
updated_params = await self._sync_task_params(
taskId, schema.params, api_key=api_key
)
return ResponsePatchTask(
taskId=task.id,
gameId=gameId,
externalTaskId=task.externalTaskId,
strategyId=task.strategyId,
status=task.status,
taskParams=updated_params,
message=(f"Task with taskId: {taskId} updated successfully"),
)
async def _sync_task_params(
self,
taskId: UUID,
params: list[UpdateTaskParams],
*,
api_key: Optional[str] = None,
) -> list:
"""
Reconcile a task's params to the desired set in ``params``.
For each entry: a present ``id`` matching an existing param updates
it in place; otherwise a new param is created. Existing params whose
id is absent from ``params`` are deleted. Values are persisted as
strings, mirroring task creation. Returns the surviving param rows.
"""
existing = (
await self.task_params_repository.read_by_column(
"taskId",
taskId,
not_found_raise_exception=False,
only_one=False,
)
or []
)
existing_by_id = {str(p.id): p for p in existing}
kept_ids = set()
result = []
for param in params:
param_id = getattr(param, "id", None)
id_str = str(param_id) if param_id is not None else None
if id_str and id_str in existing_by_id:
kept_ids.add(id_str)
patched = await self.task_params_repository.patch_task_params_by_id(
param_id,
CreateTaskParams(key=param.key, value=str(param.value)),
)
result.append(patched)
else:
to_insert = InsertTaskParams(
key=param.key,
value=str(param.value),
taskId=str(taskId),
apiKey_used=api_key,
)
created = await self.task_params_repository.create(to_insert)
result.append(created)
for id_str, row in existing_by_id.items():
if id_str not in kept_ids:
await self.task_params_repository.delete_by_id(row.id)
return result
async def _get_task_in_game(
self,
gameId: UUID,
taskId: UUID,
*,
api_key: Optional[str] = None,
oauth_user_id: Optional[str] = None,
is_admin: bool = False,
enforce_scope: bool = False,
):
"""
Resolve a task that must belong to ``gameId``, honouring scope.
Shared guard for the ``delete``/``duplicate`` task flows
(and a sibling of the inline checks in :meth:`patch_task_by_id`):
the game has to exist (and be in the caller's scope when
``enforce_scope`` is set), the task has to exist, and it has to be
a child of that game. A mismatched parent is reported as 404 so we
don't leak which other game the task actually belongs to.
Returns the resolved task row.
"""
if enforce_scope:
game = await get_authorized_game(
self.game_repository,
gameId,
api_key=api_key,
oauth_user_id=oauth_user_id,
is_admin=is_admin,
)
else:
game = await self.game_repository.read_by_id(
gameId, not_found_raise_exception=False
)
if not game:
raise NotFoundError(detail=f"Game not found by gameId: {gameId}")
task = await self.task_repository.read_by_id(
taskId, not_found_raise_exception=False
)
if not task:
raise NotFoundError(detail=f"Task not found by taskId: {taskId}")
if str(task.gameId) != str(gameId):
raise NotFoundError(
detail=(f"Task {taskId} does not belong to game {gameId}.")
)
return task
[docs]
async def delete_task_by_id(
self,
gameId: UUID,
taskId: UUID,
*,
api_key: Optional[str] = None,
oauth_user_id: Optional[str] = None,
is_admin: bool = False,
enforce_scope: bool = False,
) -> ResponseDeleteTask:
"""
Delete a task (and its params/points) identified by
``gameId`` + ``taskId``.
Full-stack addition mirroring
:meth:`GameService.delete_game_by_id`: the cascade lives in
:meth:`TaskRepository.delete_task_by_id` so task params and the
user-points rows that reference the task go before the task row.
"""
task = await self._get_task_in_game(
gameId,
taskId,
api_key=api_key,
oauth_user_id=oauth_user_id,
is_admin=is_admin,
enforce_scope=enforce_scope,
)
await self.task_repository.delete_task_by_id(taskId)
return ResponseDeleteTask(
taskId=task.id,
gameId=gameId,
externalTaskId=task.externalTaskId,
message=f"Task with taskId: {taskId} deleted successfully",
)
[docs]
async def duplicate_task(
self,
gameId: UUID,
taskId: UUID,
externalTaskId: str,
*,
api_key: Optional[str] = None,
oauth_user_id: Optional[str] = None,
is_admin: bool = False,
enforce_scope: bool = False,
) -> CreateTaskPostSuccesfullyCreated:
"""
Duplicate a task within the same game under a new
``externalTaskId``.
Deep-copies the source task's strategy and params onto the new
task. Reuses :meth:`create_task_by_game_id` for the heavy lifting
(uniqueness check on the new ``externalTaskId``, row + param
insertion, response shaping) so the duplicate path can't drift
from normal task creation.
"""
source = await self._get_task_in_game(
gameId,
taskId,
api_key=api_key,
oauth_user_id=oauth_user_id,
is_admin=is_admin,
enforce_scope=enforce_scope,
)
source_params = await self.task_params_repository.read_by_column(
"taskId", source.id, not_found_raise_exception=False, only_one=False
)
copied_params = [
CreateTaskParams(key=param.key, value=param.value)
for param in (source_params or [])
]
create_query = CreateTaskPost(
externalTaskId=externalTaskId,
strategyId=source.strategyId,
params=copied_params or None,
)
return await self.create_task_by_game_id(
gameId,
create_query,
api_key,
oauth_user_id=oauth_user_id,
is_admin=is_admin,
enforce_scope=enforce_scope,
)
[docs]
async def get_tasks_list_by_externalGameId(
self, externalGameId, find_query
) -> dict[str, Any]:
"""
Retrieves a list of tasks associated with a game by its external game
ID.
Args:
externalGameId (str): The external game ID.
find_query: The query for finding tasks.
Returns:
list: A list of tasks associated with the game.
"""
game = await self.game_repository.read_by_column(
"externalGameId",
externalGameId,
not_found_message=f"Game not found with externalGameId: "
f"{externalGameId}",
)
if not game:
raise NotFoundError(f"Game not found with externalGameId: {externalGameId}")
return await self.get_tasks_list_by_gameId(game.id, find_query)
[docs]
async def get_tasks_list_by_gameId(
self,
gameId,
find_query,
*,
api_key: str = None,
oauth_user_id: str = None,
is_admin: bool = False,
enforce_scope: bool = False,
) -> dict[str, Any]:
"""
Retrieves a list of tasks associated with a game by its game ID.
Args:
gameId (UUID): The game ID.
find_query: The query for finding tasks.
Returns:
list: A list of tasks associated with the game.
"""
if enforce_scope:
game = await get_authorized_game(
self.game_repository,
gameId,
api_key=api_key,
oauth_user_id=oauth_user_id,
is_admin=is_admin,
)
else:
game = await self.game_repository.read_by_id(
gameId, not_found_raise_exception=False
)
if not game:
raise NotFoundError(f"Game not found with gameId: {gameId}")
find_task_query = FindTask(
gameId=game.id, **find_query.model_dump(exclude_none=True)
)
all_tasks = await self.task_repository.read_by_gameId(find_task_query)
strategy_data = self.strategy_service.get_strategy_by_id(game.strategyId)
game_params = await self.game_params_repository.read_by_column(
"gameId", game.id, not_found_raise_exception=False, only_one=False
)
if game_params:
for param in game_params:
if param.key in strategy_data["variables"]:
try:
param.value = int(param.value)
except ValueError:
try:
param.value = float(param.value)
except ValueError:
pass
type_param = type(param.value)
type_strategy_variable = type(strategy_data["variables"][param.key])
if type_param == type_strategy_variable:
strategy_data["variables"][param.key] = param.value
cleaned_tasks = []
for task in all_tasks["items"]:
strategy_data = self.strategy_service.get_strategy_by_id(task.strategyId)
task_params = await self.task_params_repository.read_by_column(
"taskId", task.id, not_found_raise_exception=False, only_one=False
)
if game_params:
for param in game_params:
if param.key in strategy_data["variables"]:
try:
param.value = int(param.value)
except ValueError:
try:
param.value = float(param.value)
except ValueError:
pass
type_param = type(param.value)
type_strategy_variable = type(
strategy_data["variables"][param.key]
)
if type_param == type_strategy_variable:
strategy_data["variables"][param.key] = param.value
if task_params:
for param in task_params:
if param.key in strategy_data["variables"]:
try:
param.value = int(param.value)
except ValueError:
try:
param.value = float(param.value)
except ValueError:
pass
type_param = type(param.value)
type_strategy_variable = type(
strategy_data["variables"][param.key]
)
if type_param == type_strategy_variable:
strategy_data["variables"][param.key] = param.value
task_params = task_params if task_params else []
task_cleaned = task.model_dump()
task_cleaned["strategy"] = strategy_data
task_cleaned["gameParams"] = game_params
task_cleaned["taskParams"] = task_params
cleaned_tasks.append(task_cleaned)
all_tasks["items"] = cleaned_tasks
return all_tasks
[docs]
async def get_task_by_gameId_externalTaskId(
self,
gameId,
externalTaskId,
*,
api_key: str = None,
oauth_user_id: str = None,
is_admin: bool = False,
enforce_scope: bool = False,
) -> CreateTaskPostSuccesfullyCreated:
"""
Retrieves a task by its game ID and external task ID.
Args:
gameId (UUID): The game ID.
externalTaskId (str): The external task ID.
Returns:
CreateTaskPostSuccesfullyCreated: The task details.
"""
if enforce_scope:
game = await get_authorized_game(
self.game_repository,
gameId,
api_key=api_key,
oauth_user_id=oauth_user_id,
is_admin=is_admin,
)
else:
game = await self.game_repository.read_by_id(
gameId, not_found_raise_exception=False
)
if not game:
raise NotFoundError(f"Game not found with gameId: {gameId}")
task = await self.task_repository.read_by_gameId_and_externalTaskId(
game.id, externalTaskId
)
if not task:
raise NotFoundError(
f"Task not found with externalTaskId: {externalTaskId} for "
f"gameId: {gameId}"
)
strategy_data = self.strategy_service.get_strategy_by_id(task.strategyId)
game_params = await self.game_params_repository.read_by_column(
"gameId", game.id, not_found_raise_exception=False, only_one=False
)
if game_params:
for param in game_params:
if param.key in strategy_data["variables"]:
try:
param.value = int(param.value)
except ValueError:
try:
param.value = float(param.value)
except ValueError:
pass
type_param = type(param.value)
type_strategy_variable = type(strategy_data["variables"][param.key])
if type_param == type_strategy_variable:
strategy_data["variables"][param.key] = param.value
task_params = await self.task_params_repository.read_by_column(
"taskId", task.id, not_found_raise_exception=False, only_one=False
)
if task_params:
for param in task_params:
if param.key in strategy_data["variables"]:
try:
param.value = int(param.value)
except ValueError:
try:
param.value = float(param.value)
except ValueError:
pass
type_param = type(param.value)
type_strategy_variable = type(strategy_data["variables"][param.key])
if type_param == type_strategy_variable:
strategy_data["variables"][param.key] = param.value
response = CreateTaskPostSuccesfullyCreated(
externalTaskId=task.externalTaskId,
externalGameId=game.externalGameId,
gameParams=game_params,
taskParams=task_params,
strategy=strategy_data,
message=f"Task found successfully with externalTaskId: "
f"{task.externalTaskId} for gameId: {gameId}",
)
return response
[docs]
async def get_task_by_externalGameId_externalTaskId(
self,
gameId,
externalTaskId,
*,
api_key: str = None,
oauth_user_id: str = None,
is_admin: bool = False,
enforce_scope: bool = False,
) -> CreateTaskPostSuccesfullyCreated:
"""
Retrieves a task by its game ID and external task ID.
Args:
gameId (UUID): The game ID.
externalTaskId (str): The external task ID.
Returns:
CreateTaskPostSuccesfullyCreated: The task details.
"""
game = await self.game_repository.read_by_column(
"id",
gameId,
not_found_message=f"Game not found with id: " f"{gameId}",
only_one=True,
)
return await self.get_task_by_gameId_externalTaskId(
game.id,
externalTaskId,
api_key=api_key,
oauth_user_id=oauth_user_id,
is_admin=is_admin,
enforce_scope=enforce_scope,
)
[docs]
async def create_task_by_externalGameId(
self, externalGameId, create_query
) -> CreateTaskPostSuccesfullyCreated:
"""
Creates a task for a game by its external game ID.
Args:
externalGameId (str): The external game ID.
create_query: The query for creating the task.
Returns:
CreateTaskPostSuccesfullyCreated: The created task details.
"""
game = await self.game_repository.read_by_column(
"externalGameId",
externalGameId,
not_found_message=f"Game not found with externalGameId: "
f"{externalGameId}",
only_one=True,
)
return await self.create_task_by_game_id(game.id, externalGameId, create_query)
[docs]
async def create_task_by_game_id(
self,
gameId,
create_query,
api_key: str = None,
*,
oauth_user_id: str = None,
is_admin: bool = False,
enforce_scope: bool = False,
) -> CreateTaskPostSuccesfullyCreated:
"""
Creates a task for a game by its game ID.
Args:
gameId (UUID): The game ID.
create_query: The query for creating the task.
Returns:
CreateTaskPostSuccesfullyCreated: The created task details.
"""
if enforce_scope:
game_data = await get_authorized_game(
self.game_repository,
gameId,
api_key=api_key,
oauth_user_id=oauth_user_id,
is_admin=is_admin,
)
else:
game_data = await self.game_repository.read_by_id(
gameId, not_found_raise_exception=False
)
if not game_data:
raise NotFoundError(f"Game not found with gameId: {gameId}")
task = await self.task_repository.read_by_gameId_and_externalTaskId(
gameId, create_query.externalTaskId
)
if task:
raise ConflictError(
f"Task already exists with externalTaskId: "
f"{create_query.externalTaskId} for gameId: {gameId}"
)
strategy_id = create_query.strategyId
strategy_data = None
if strategy_id:
strategy_data = self.strategy_service.get_strategy_by_id(strategy_id)
if not strategy_data:
raise NotFoundError(
f"Strategy not found with strategyId: {strategy_id}"
)
if not strategy_id:
strategy_id = "default"
strategy_data = self.strategy_service.get_strategy_by_id(strategy_id)
strategy_id = str(strategy_id)
new_task_dict = create_query.model_dump()
new_task_dict["gameId"] = str(gameId)
if strategy_id:
new_task_dict["strategyId"] = str(strategy_id)
if api_key:
new_task_dict["apiKey_used"] = api_key
new_task = CreateTask(**new_task_dict)
created_task = await self.task_repository.create(new_task)
created_params = []
params = create_query.params
if params:
del create_query.params
for param in params:
params_dict = param.model_dump()
params_dict["taskId"] = str(created_task.id)
params_dict["value"] = str(params_dict["value"])
if api_key:
params_dict["apiKey_used"] = api_key
params_to_insert = InsertTaskParams(**params_dict)
created_param = await self.task_params_repository.create(
params_to_insert
)
created_params.append(created_param)
game_params = await self.game_params_repository.read_by_column(
"gameId", gameId, not_found_raise_exception=False, only_one=False
)
if game_params:
for param in game_params:
if param.key in strategy_data["variables"]:
try:
param.value = int(param.value)
except ValueError:
try:
param.value = float(param.value)
except ValueError:
pass
type_param = type(param.value)
type_strategy_variable = type(strategy_data["variables"][param.key])
if type_param == type_strategy_variable:
strategy_data["variables"][param.key] = param.value
if created_params:
for param in created_params:
if param.key in strategy_data["variables"]:
try:
param.value = int(param.value)
except ValueError:
try:
param.value = float(param.value)
except ValueError:
pass
type_param = type(param.value)
type_strategy_variable = type(strategy_data["variables"][param.key])
if type_param == type_strategy_variable:
strategy_data["variables"][param.key] = param.value
externalGameId = game_data.externalGameId
response = CreateTaskPostSuccesfullyCreated(
externalTaskId=created_task.externalTaskId,
externalGameId=externalGameId,
gameParams=game_params,
taskParams=created_params,
strategy=strategy_data,
message=f"Task created successfully with externalTaskId: "
f"{created_task.externalTaskId} for gameId: {gameId}",
)
return response
[docs]
async def get_task_detail_by_id(self, schema) -> dict[str, Any]:
"""
Retrieves task details by its ID.
Args:
schema: The schema containing the task ID.
Returns:
dict: The task and strategy details.
"""
taskId = schema.taskId
task = await self.task_repository.read_by_id(
taskId, not_found_message="Task not found by id: {taskId}"
)
strategyId = task.strategyId
strategy = None
if strategyId:
strategy = await self.strategy_repository.read_by_id(
strategyId,
not_found_message="Strategy not found by id: " f"{strategyId}",
)
return {"task": task, "strategy": strategy}
[docs]
async def get_points_by_task_id(
self,
gameId,
externalTaskId,
*,
api_key: str = None,
oauth_user_id: str = None,
is_admin: bool = False,
enforce_scope: bool = False,
) -> Any:
"""
Retrieves points by task ID.
Args:
gameId (UUID): The game ID.
externalTaskId (str): The external task ID.
Returns:
list: A list of points associated with the task.
"""
if enforce_scope:
game = await get_authorized_game(
self.game_repository,
gameId,
api_key=api_key,
oauth_user_id=oauth_user_id,
is_admin=is_admin,
)
else:
game = await self.game_repository.read_by_column(
"id",
gameId,
not_found_message=f"Game not found with gameId: {gameId}",
only_one=True,
)
task = await self.task_repository.read_by_gameId_and_externalTaskId(
game.id, externalTaskId
)
if not task:
raise NotFoundError(
f"Task not found with externalTaskId: {externalTaskId} for "
f"gameId: {gameId}"
)
task_id = task.id
user_points = await self.user_points_repository.get_all_UserPoints_by_taskId(
task_id
)
return user_points
[docs]
async def get_points_of_user_by_task_id(
self,
gameId,
externalTaskId,
externalUserId,
*,
api_key: str = None,
oauth_user_id: str = None,
is_admin: bool = False,
enforce_scope: bool = False,
) -> Any:
"""
Retrieves points of a user by task ID.
Args:
gameId (UUID): The game ID.
externalTaskId (str): The external task ID.
externalUserId (str): The external user ID.
Returns:
dict: The user's points details.
"""
points_task = await self.get_points_by_task_id_with_details(
gameId,
externalTaskId,
api_key=api_key,
oauth_user_id=oauth_user_id,
is_admin=is_admin,
enforce_scope=enforce_scope,
)
user_points = list(
filter(lambda x: x.externalUserId == externalUserId, points_task)
)
if not user_points:
raise NotFoundError(
f"User not found with externalUserId: {externalUserId} for "
f"externalTaskId: {externalTaskId} for gameId: {gameId}"
)
return user_points[0]
[docs]
async def get_points_by_task_id_with_details(
self,
gameId,
externalTaskId,
*,
api_key: str = None,
oauth_user_id: str = None,
is_admin: bool = False,
enforce_scope: bool = False,
) -> Any:
"""
Retrieves points by task ID with details.
Args:
gameId (UUID): The game ID.
externalTaskId (str): The external task ID.
Returns:
list: A list of points associated with the task.
"""
if enforce_scope:
await get_authorized_game(
self.game_repository,
gameId,
api_key=api_key,
oauth_user_id=oauth_user_id,
is_admin=is_admin,
)
task = await self.task_repository.read_by_gameId_and_externalTaskId(
gameId, externalTaskId
)
if not task:
raise NotFoundError(
f"Task not found with externalTaskId: {externalTaskId} for "
f"gameId: {gameId}"
)
task_id = task.id
user_points = await self.user_points_repository.get_all_UserPoints_by_taskId_with_details( # noqa: E501
task_id
)
return user_points
[docs]
async def get_task_params_by_externalTaskId(self, externalTaskId) -> Any:
"""
Retrieves task parameters by external task ID.
Args:
externalTaskId (str): The external task ID.
Returns:
list: A list of task parameters associated with the task.
"""
task = await self.task_repository.read_by_column(
"externalTaskId",
externalTaskId,
not_found_message=f"Task not found with externalTaskId: "
f"{externalTaskId}",
only_one=True,
)
task_id = task.id
task_params = await self.task_params_repository.read_by_column(
"taskId",
task_id,
not_found_raise_exception=False,
only_one=False,
)
return task_params