Source code for app.services.game_service

from typing import Any, Optional
from uuid import UUID

from app.core.exceptions import BadRequestError, ConflictError, NotFoundError
from app.engine.all_engine_strategies import all_engine_strategies
from app.model.strategy_definition import StrategyDefinitionStatus
from app.repository.game_params_repository import GameParamsRepository
from app.repository.game_repository import GameRepository
from app.repository.task_params_repository import TaskParamsRepository
from app.repository.task_repository import TaskRepository
from app.repository.user_points_repository import UserPointsRepository
from app.schema.games_params_schema import CreateGameParams, InsertGameParams
from app.schema.games_schema import (BaseGameResult, FindGameResult, GameCreated,
                                     PatchGame, PostCreateGame, ResponsePatchGame)
from app.schema.task_schema import CreateTask
from app.schema.tasks_params_schema import InsertTaskParams
from app.services.base_service import BaseService
from app.services.game_access import get_authorized_game
from app.services.strategy_definition_service import StrategyDefinitionService
from app.services.strategy_service import (StrategyService, is_custom_strategy_id,
                                           parse_custom_strategy_id, resolve_realm_id)
from app.util.are_variables_matching import are_variables_matching
from app.util.is_valid_slug import is_valid_slug


[docs] class GameService(BaseService): """ Service class for managing games. Attributes: game_repository (GameRepository): Repository instance for games. game_params_repository (GameParamsRepository): Repository instance for game parameters. task_repository (TaskRepository): Repository instance for tasks. strategy_service (StrategyService): Service instance for strategies. """
[docs] def __init__( self, game_repository: GameRepository, game_params_repository: GameParamsRepository, task_repository: TaskRepository, user_points_repository: UserPointsRepository, strategy_service: StrategyService, strategy_definition_service: Optional[StrategyDefinitionService] = None, task_params_repository: Optional[TaskParamsRepository] = None, ) -> None: """ Initializes the GameService with the provided repositories and services. ``strategy_definition_service`` is optional so existing call sites and tests that don't exercise ``custom:`` strategyIds keep working; when omitted, attempting to PATCH a game with a ``custom:`` id raises a clear error instead of silently accepting it. ``task_params_repository`` is optional for the same backward-compat reason; it is only needed by :meth:`duplicate_game` to deep-copy each task's params. When omitted, duplication raises a clear error instead of silently dropping params. """ self.game_repository = game_repository self.game_params_repository = game_params_repository self.task_repository = task_repository self.task_params_repository = task_params_repository self.strategy_service = strategy_service self.strategy_definition_service = strategy_definition_service super().__init__(game_repository)
[docs] async def get_by_gameId( self, gameId: UUID, *, api_key: str = None, oauth_user_id: str = None, is_admin: bool = False, enforce_scope: bool = False, ) -> BaseGameResult: """ Retrieves a game by its game ID. Args: gameId (UUID): The game ID. Returns: BaseGameResult: The game details. """ if enforce_scope: response = await get_authorized_game( self.game_repository, gameId, api_key=api_key, oauth_user_id=oauth_user_id, is_admin=is_admin, ) else: response = await self.game_repository.read_by_column( "id", gameId, not_found_raise_exception=True, only_one=True, not_found_message=f"Game not found by gameId: {gameId}", ) params = await self.game_params_repository.read_by_column( "gameId", response.id, not_found_raise_exception=False, only_one=False ) response_dict = response.model_dump() response_dict["params"] = params response = BaseGameResult(**response_dict, gameId=gameId) return response
[docs] async def delete_game_by_id( self, gameId: UUID, *, api_key: str = None, oauth_user_id: str = None, is_admin: bool = False, enforce_scope: bool = False, ) -> BaseGameResult | dict[str, str]: """ Deletes a game by its game ID. Args: gameId (UUID): The game ID. Raises: NotFoundError: If the game is not found. """ if enforce_scope: game = await get_authorized_game( self.game_repository, gameId, api_key=api_key, oauth_user_id=oauth_user_id, is_admin=is_admin, ) else: game = await self.game_repository.read_by_id( gameId, not_found_raise_exception=False ) if not game: raise NotFoundError(detail=f"Game not found by gameId: {gameId}") if await self.game_repository.delete_game_by_id(gameId): response = BaseGameResult( externalGameId=game.externalGameId, strategyId=game.strategyId, platform=game.platform, gameId=gameId, created_at=game.created_at, updated_at=game.updated_at, params=[], message=f"Game with gameId: {gameId} deleted successfully", ) return response return {"message": f"Game with gameId: {gameId} not deleted"}
[docs] async def get_all_games( self, schema, api_key=None, oauth_user_id=None, is_admin: bool = False, ) -> FindGameResult: """ Retrieves all games based on the provided schema. Args: schema: The schema for filtering the games. api_key (str): The API key. Returns: list: A list of all games matching the schema. """ return await self.game_repository.get_all_games( schema, api_key=api_key, oauth_user_id=oauth_user_id, is_admin=is_admin, )
[docs] async def get_by_externalId(self, externalGameId: str) -> Any: """ Retrieves a game by its external game ID. Args: externalGameId (str): The external game ID. Returns: object: The game details. """ return await self.game_repository.read_by_column( "externalGameId", externalGameId )
[docs] async def create( self, schema: PostCreateGame, api_key: str = None, oauth_user_id=None ) -> GameCreated: """ Creates a new game using the provided schema. Args: schema (PostCreateGame): The schema representing the game to be created. api_key (str): The API key. oauth_user_id (str): The OAuth user ID. Returns: GameCreated: The created game details. """ params = schema.params externalGameId = schema.externalGameId externalGameId_exist = await self.game_repository.read_by_column( "externalGameId", externalGameId, not_found_raise_exception=False ) is_valid_externalGameId = is_valid_slug(externalGameId) if not is_valid_externalGameId: raise ConflictError( detail=( f"Invalid externalGameId: {externalGameId}. externalGameId" f" should be a valid slug (Should have only alphanumeric" f" characters and Underscore. Length should be between 3" f" and 60)" ) ) if externalGameId_exist: raise ConflictError( detail={ "message": f"Game already exists with externalGameId: " f"{externalGameId}", "gameId": str(externalGameId_exist.id), } ) created_params = [] default_strategyId = schema.strategyId if default_strategyId is None: default_strategyId = "default" strategies = all_engine_strategies() strategy = next( (strategy for strategy in strategies if strategy.id == default_strategyId), None, ) if not strategy: raise NotFoundError( detail=f"Strategy with id: {default_strategyId} not found" ) if api_key: schema.apiKey_used = api_key if oauth_user_id: schema.oauth_user_id = oauth_user_id game = await self.game_repository.create(schema) if params: del schema.params for param in params: params_dict = param.model_dump() params_dict["gameId"] = str(game.id) if api_key: params_dict["apiKey_used"] = api_key if oauth_user_id: params_dict["oauth_user_id"] = oauth_user_id params_to_insert = InsertGameParams(**params_dict) created_param = await self.game_params_repository.create( params_to_insert ) created_params.append(created_param) response = GameCreated( **game.model_dump(), params=created_params, gameId=game.id, message=f"Game with gameId: {game.id} created successfully", ) return response
[docs] async def duplicate_game( self, gameId: UUID, externalGameId: str, *, api_key: str = None, oauth_user_id: str = None, is_admin: bool = False, enforce_scope: bool = False, ) -> GameCreated: """ Deep-copy a game into a brand new one under ``externalGameId``. Copies the source game's platform, strategy and params, then every task with its own strategy and params. The new game's params are recreated via :meth:`create` so all the creation guards (slug validation, externalGameId uniqueness, strategy existence) run against the copy exactly as they would for a fresh game. Tasks are recreated directly through the repositories - the duplicate just needs the rows, not the elaborate per-task response shaping. Duplicated tasks start in the default ``open`` status: a copy is a fresh task, not a resumption of the original's lifecycle. """ if self.task_params_repository is None: raise BadRequestError( detail=( "Game duplication is unavailable: " "TaskParamsRepository not wired." ) ) if enforce_scope: source = await get_authorized_game( self.game_repository, gameId, api_key=api_key, oauth_user_id=oauth_user_id, is_admin=is_admin, ) else: source = await self.game_repository.read_by_id( gameId, not_found_raise_exception=False ) if not source: raise NotFoundError(detail=f"Game not found by gameId: {gameId}") source_params = await self.game_params_repository.read_by_column( "gameId", source.id, not_found_raise_exception=False, only_one=False ) copied_params = [ CreateGameParams(key=param.key, value=param.value) for param in (source_params or []) ] # ``create`` runs the slug + uniqueness + strategy guards for us and # raises ConflictError if ``externalGameId`` is already taken. new_game_schema = PostCreateGame( externalGameId=externalGameId, platform=source.platform, strategyId=source.strategyId, params=copied_params or None, ) created = await self.create(new_game_schema, api_key, oauth_user_id) new_game_id = created.gameId source_tasks = await self.task_repository.read_by_column( "gameId", source.id, not_found_raise_exception=False, only_one=False ) for task in source_tasks or []: new_task = CreateTask( externalTaskId=task.externalTaskId, gameId=str(new_game_id), strategyId=task.strategyId, apiKey_used=api_key, ) created_task = await self.task_repository.create(new_task) task_params = await self.task_params_repository.read_by_column( "taskId", task.id, not_found_raise_exception=False, only_one=False ) for param in task_params or []: params_to_insert = InsertTaskParams( taskId=str(created_task.id), key=param.key, value=str(param.value), apiKey_used=api_key, ) await self.task_params_repository.create(params_to_insert) created.message = ( f"Game with gameId: {new_game_id} duplicated successfully " f"from gameId: {gameId}" ) return created
async def _validate_strategy_assignment( self, strategy_id: str, *, api_key: Optional[str] = None, oauth_user_id: Optional[str] = None, ) -> None: """ Validate a ``strategyId`` before persisting it onto a Game or Task. Two paths: * Built-in id (no ``custom:`` prefix): must resolve in the in-process registry (legacy behaviour). * ``custom:<uuid>``: must resolve in the DB-backed ``strategydefinition`` table, scoped to the caller's tenant, and must be PUBLISHED. We refuse DRAFT/ARCHIVED to avoid assigning a strategy that the resolver can't execute (DRAFT never runs; ARCHIVED would only run by accident if a prior rollback left dangling references - which the cascade is precisely designed to prevent). Raises ``NotFoundError`` for unknown ids, ``BadRequestError`` for not-yet-published customs. """ if not is_custom_strategy_id(strategy_id): strategies = all_engine_strategies() strategy = next((s for s in strategies if s.id == strategy_id), None) if not strategy: raise NotFoundError(detail=f"Strategy with id: {strategy_id} not found") return if self.strategy_definition_service is None: raise BadRequestError( detail=( "Custom strategy assignment is unavailable: " "StrategyDefinitionService not wired." ) ) realmId = resolve_realm_id(api_key=api_key, oauth_user_id=oauth_user_id) uuid_part = parse_custom_strategy_id(strategy_id) definition = await self.strategy_definition_service.get_strategy( id=uuid_part, realmId=realmId ) if definition.status != StrategyDefinitionStatus.PUBLISHED.value: raise BadRequestError( detail=( f"Only PUBLISHED custom strategies can be assigned. " f"Strategy '{definition.name}' v{definition.version} " f"is {definition.status}." ) )
[docs] async def patch_game_by_externalGameId( self, externalGameId: str, schema: PatchGame ) -> ResponsePatchGame: """ Updates a game by its external game ID using the provided schema. Args: externalGameId (str): The external game ID. schema (PatchGame): The schema representing the updated data. Returns: ResponsePatchGame: The updated game details. """ game = await self.game_repository.read_by_column( "externalGameId", externalGameId, not_found_raise_exception=False ) if not game: raise NotFoundError( detail=f"Game not found by externalGameId: {externalGameId}" ) return await self.patch_game_by_id(game.id, schema)
[docs] async def patch_game_by_id( self, gameId: UUID, schema: PatchGame, *, api_key: str = None, oauth_user_id: str = None, is_admin: bool = False, enforce_scope: bool = False, ) -> ResponsePatchGame: """ Updates a game by its game ID using the provided schema. Args: gameId (UUID): The game ID. schema (PatchGame): The schema representing the updated data. Returns: ResponsePatchGame: The updated game details. """ if enforce_scope: game = await get_authorized_game( self.game_repository, gameId, api_key=api_key, oauth_user_id=oauth_user_id, is_admin=is_admin, ) else: game = await self.game_repository.read_by_id( gameId, not_found_raise_exception=False ) if not game: raise NotFoundError(detail=f"Game not found by gameId: {gameId}") if schema.externalGameId and schema.externalGameId != game.externalGameId: externalGameId_exist = await self.game_repository.read_by_column( "externalGameId", schema.externalGameId, not_found_raise_exception=False ) if externalGameId_exist: raise ConflictError( detail=f"Game already exists with externalGameId: " f"{schema.externalGameId} . Cannot update externalGameId" ) is_matching = are_variables_matching(schema.model_dump(), game.model_dump()) params_schema = schema.model_dump().get("params", None) params_game = game.model_dump().get("params", None) params_is_matching = False if params_schema and params_game: params_is_matching = are_variables_matching(params_schema, params_game) if is_matching and params_is_matching: raise ConflictError( detail=("It is not possible to update the game with the same data") ) if schema.model_dump() == game.model_dump(): raise ConflictError(detail="No difference between schema and game") strategyId = schema.strategyId if strategyId: await self._validate_strategy_assignment( strategyId, api_key=api_key, oauth_user_id=oauth_user_id, ) if not strategyId: strategyId = game.strategyId if not strategyId: strategyId = "default" schema.strategyId = strategyId params = schema.params del schema.params updated_params = [] if params: for param in params: await self.game_params_repository.patch_game_params_by_id( param.id, param ) updated_params.append(param) game = await self.game_repository.patch_game_by_id(gameId, schema) game_dict = game.model_dump() response = ResponsePatchGame( externalGameId=game_dict["externalGameId"], strategyId=strategyId, platform=game_dict["platform"], params=updated_params, gameId=gameId, message=f"Game with gameId: {gameId} updated successfully", ) return response
[docs] async def get_strategy_by_externalGameId( self, externalGameId: str ) -> dict[str, Any]: """ Retrieves the strategy associated with a game by its external game ID. Args: externalGameId (str): The external game ID. Returns: dict: The strategy details. """ game = await self.game_repository.read_by_column( "externalGameId", externalGameId, not_found_raise_exception=True ) if not game: raise NotFoundError( detail=f"Game not found by externalGameId: {externalGameId}" ) return await self.get_strategy_by_gameId(game.id)
[docs] async def get_strategy_by_gameId( self, gameId: UUID, *, api_key: str = None, oauth_user_id: str = None, is_admin: bool = False, enforce_scope: bool = False, ) -> dict[str, Any]: """ Retrieves the strategy associated with a game by its game ID. Args: gameId (UUID): The game ID. Returns: dict: The strategy details. """ if enforce_scope: game = await get_authorized_game( self.game_repository, gameId, api_key=api_key, oauth_user_id=oauth_user_id, is_admin=is_admin, ) else: game = await self.game_repository.read_by_id( gameId, not_found_raise_exception=False ) if not game: raise NotFoundError(detail=f"Game not found by gameId: {gameId}") if not game.strategyId: raise ConflictError( detail=f"Game with gameId: {gameId} does not have a strategyId" ) strategy = self.strategy_service.get_strategy_by_id(game.strategyId) game_params = await self.game_params_repository.read_by_column( "gameId", game.id, not_found_raise_exception=False, only_one=False ) if game_params: for param in game_params: if param.key in strategy["variables"]: try: param.value = int(param.value) except ValueError: try: param.value = float(param.value) except ValueError: pass type_param = type(param.value) type_strategy_variable = type(strategy["variables"][param.key]) if type_param == type_strategy_variable: strategy["variables"][param.key] = param.value strategy["game_params"] = game_params return strategy
[docs] async def get_tasks_by_gameId( self, gameId: UUID, *, api_key: str = None, oauth_user_id: str = None, is_admin: bool = False, enforce_scope: bool = False, ) -> dict[str, Any]: """ Retrieves the tasks associated with a game by its game ID. Args: gameId (UUID): The game ID. Returns: dict: The game details including tasks. """ if enforce_scope: game = await get_authorized_game( self.game_repository, gameId, api_key=api_key, oauth_user_id=oauth_user_id, is_admin=is_admin, ) else: game = await self.game_repository.read_by_id(gameId) if not game: raise NotFoundError(detail=f"Game not found by id: {gameId}") tasks = await self.task_repository.read_by_column( "gameId", gameId, not_found_raise_exception=False, only_one=False ) tasks_list = [] if tasks: for task in tasks: tasks_list.append(task.model_dump()) game_dict = game.model_dump() game_dict["tasks"] = tasks_list return game_dict
[docs] async def get_game_by_external_id( self, externalGameId: str, api_key: str = None, oauth_user_id=None ) -> Any: """ Retrieves a game by its external game ID. Args: externalGameId (str): The external game ID. api_key (str): The API key. oauth_user_id (str): The OAuth user ID. Returns: dict: The game details. """ game = await self.game_repository.read_by_column( "externalGameId", externalGameId, not_found_raise_exception=False ) if not game: raise NotFoundError( detail=f"Game not found by externalGameId: {externalGameId}" ) if api_key: game.apiKey_used = api_key if oauth_user_id: game.oauth_user_id = oauth_user_id return game