# Source code for app.services.user_actions_service

from typing import Optional

from app.core.exceptions import GoneError, NotFoundError
from app.repository.game_repository import GameRepository
from app.repository.task_repository import TaskRepository
from app.repository.user_actions_repository import UserActionsRepository
from app.repository.user_repository import UserRepository
from app.schema.task_schema import (AddActionDidByUserInTask,
                                    ResponseAddActionDidByUserInTask)
from app.schema.user_actions_schema import (CreatedUserActions, CreateUserActions,
                                            CreateUserBodyActions)
from app.services.base_service import BaseService
from app.services.game_access import get_authorized_game


class UserActionsService(BaseService):
    """
    Service class for recording actions performed by users.

    Attributes:
        user_actions_repository (UserActionsRepository): Repository used to
            persist user actions.
        users_repository (UserRepository): Repository used to look up or
            create users by their external ID.
        game_repository (GameRepository): Repository handed to
            ``get_authorized_game`` when scope enforcement is requested.
        task_repository (TaskRepository): Repository used to look up tasks
            by game ID and external task ID.
    """

    def __init__(
        self,
        user_actions_repository: UserActionsRepository,
        users_repository: UserRepository,
        game_repository: GameRepository,
        task_repository: TaskRepository,
    ) -> None:
        """
        Initializes the UserActionsService with the provided repositories.

        Args:
            user_actions_repository: The user actions repository instance.
            users_repository: The user repository instance.
            game_repository: The game repository instance.
            task_repository: The task repository instance.
        """
        self.user_actions_repository = user_actions_repository
        self.users_repository = users_repository
        self.game_repository = game_repository
        self.task_repository = task_repository
        # The actions repository is the primary repository of this service.
        super().__init__(user_actions_repository)

    async def user_add_action_in_task(
        self,
        gameId: str,
        externalTaskId: str,
        action: AddActionDidByUserInTask,
        api_key: Optional[str] = None,
        *,
        oauth_user_id: Optional[str] = None,
        is_admin: bool = False,
        enforce_scope: bool = False,
    ) -> ResponseAddActionDidByUserInTask:
        """
        Record an action performed by a user in the context of a task.

        Args:
            gameId (str): The ID of the game the task belongs to.
            externalTaskId (str): The external ID of the task.
            action (AddActionDidByUserInTask): The action payload; its
                ``typeAction``, ``data``, ``description`` and
                ``externalUserId`` fields are read.
            api_key (str, optional): The API key used for the request. It is
                stored with the action and, when ``enforce_scope`` is set,
                passed to the game authorization check.
            oauth_user_id (str, optional): OAuth user ID passed to the game
                authorization check.
            is_admin (bool): Admin flag passed to the game authorization
                check.
            enforce_scope (bool): When True, ``get_authorized_game`` is
                awaited before anything else.

        Returns:
            ResponseAddActionDidByUserInTask: The created action together
            with the external user ID and a success message.

        Raises:
            NotFoundError: If no task matches the game ID and external
                task ID.
            GoneError: If the task status is not ``"open"``.
        """
        if enforce_scope:
            # Presumably raises when the caller may not access the game;
            # the return value is not needed here.
            await get_authorized_game(
                self.game_repository,
                gameId,
                api_key=api_key,
                oauth_user_id=oauth_user_id,
                is_admin=is_admin,
            )

        task = await self.task_repository.read_by_gameId_and_externalTaskId(
            gameId,
            externalTaskId,
        )
        if not task:
            raise NotFoundError(f"Task not found (externalTaskId) : {externalTaskId}")
        if task.status != "open":
            raise GoneError("Task is not active")

        user = await self.users_repository.get_or_create_by_externalUserId(
            externalUserId=action.externalUserId
        )

        # NOTE(review): the task is only used for validation above; the
        # stored action carries no task reference here - confirm intended.
        new_action = CreateUserActions(
            typeAction=action.typeAction,
            data=action.data,
            description=action.description,
            userId=str(user.id),
            apiKey_used=api_key,
        )
        created_action = await self.user_actions_repository.create(new_action)

        return ResponseAddActionDidByUserInTask(
            **created_action.model_dump(),
            externalUserId=str(action.externalUserId),
            message="Action added successfully",
        )

    async def user_add_action_default(
        self,
        externalUserId: str,
        schema: CreateUserBodyActions,
        api_key: Optional[str] = None,
    ) -> CreatedUserActions:
        """
        Record an action for a user, creating the user if it does not exist.

        Args:
            externalUserId (str): The external user ID.
            schema (CreateUserBodyActions): The action payload; all of its
                dumped fields are forwarded to ``CreateUserActions``.
            api_key (str, optional): The API key. Currently unused by this
                method.

        Returns:
            CreatedUserActions: The created action's type and description,
            the internal user ID, a flag telling whether the user was
            created by this call, and a success message.
        """
        # NOTE(review): api_key is accepted but never forwarded to the stored
        # action (unlike user_add_action_in_task) - confirm whether the
        # schema already carries it before wiring it in.
        user = await self.users_repository.read_by_column(
            "externalUserId",
            externalUserId,
            not_found_raise_exception=False,
        )
        user_created = False
        if user is None:
            user = await self.users_repository.create_user_by_externalUserId(
                externalUserId=externalUserId
            )
            user_created = True

        new_action = CreateUserActions(
            **schema.model_dump(),
            userId=str(user.id),
        )
        created_action = await self.user_actions_repository.create(new_action)

        return CreatedUserActions(
            typeAction=created_action.typeAction,
            description=created_action.description,
            userId=str(user.id),
            is_user_created=user_created,
            message="Action added successfully",
        )