from typing import Optional

from app.core.exceptions import GoneError, NotFoundError
from app.repository.game_repository import GameRepository
from app.repository.task_repository import TaskRepository
from app.repository.user_actions_repository import UserActionsRepository
from app.repository.user_repository import UserRepository
from app.schema.task_schema import (AddActionDidByUserInTask,
                                    ResponseAddActionDidByUserInTask)
from app.schema.user_actions_schema import (CreatedUserActions, CreateUserActions,
                                            CreateUserBodyActions)
from app.services.base_service import BaseService
from app.services.game_access import get_authorized_game
class UserActionsService(BaseService):
    """
    Service class for recording actions performed by users.

    Attributes:
        user_actions_repository (UserActionsRepository): Repository instance
            for user actions.
        users_repository (UserRepository): Repository instance for users.
        game_repository (GameRepository): Repository instance for games.
        task_repository (TaskRepository): Repository instance for tasks.
    """

    def __init__(
        self,
        user_actions_repository: UserActionsRepository,
        users_repository: UserRepository,
        game_repository: GameRepository,
        task_repository: TaskRepository,
    ) -> None:
        """
        Initializes the UserActionsService with the provided repositories.

        Args:
            user_actions_repository: The user actions repository instance.
                It is also handed to ``BaseService`` as the primary
                repository.
            users_repository: The user repository instance.
            game_repository: The game repository instance.
            task_repository: The task repository instance.
        """
        self.user_actions_repository = user_actions_repository
        self.users_repository = users_repository
        self.game_repository = game_repository
        self.task_repository = task_repository
        super().__init__(user_actions_repository)

    async def user_add_action_in_task(
        self,
        gameId: str,
        externalTaskId: str,
        action: AddActionDidByUserInTask,
        api_key: Optional[str] = None,
        *,
        oauth_user_id: Optional[str] = None,
        is_admin: bool = False,
        enforce_scope: bool = False,
    ) -> ResponseAddActionDidByUserInTask:
        """
        Record an action performed by a user in the context of a task.

        Args:
            gameId (str): Identifier of the game the task belongs to.
            externalTaskId (str): External identifier of the task.
            action (AddActionDidByUserInTask): The action payload (a JSON
                object); ``typeAction``, ``data``, ``description`` and
                ``externalUserId`` are read from it.
            api_key (Optional[str]): The API key used for the request. It is
                stored with the action as ``apiKey_used``.
            oauth_user_id (Optional[str]): OAuth user identifier forwarded to
                the game authorization check.
            is_admin (bool): Admin flag forwarded to the game authorization
                check.
            enforce_scope (bool): When True, ``get_authorized_game`` is
                awaited before anything else is done.

        Returns:
            ResponseAddActionDidByUserInTask: The dumped fields of the created
            action plus ``externalUserId`` and a success ``message``.

        Raises:
            NotFoundError: If no task matches ``gameId`` and
                ``externalTaskId``.
            GoneError: If the task status is not ``"open"``.
        """
        if enforce_scope:
            # NOTE(review): presumably raises when the caller may not access
            # the game; its return value is not needed here.
            await get_authorized_game(
                self.game_repository,
                gameId,
                api_key=api_key,
                oauth_user_id=oauth_user_id,
                is_admin=is_admin,
            )

        task = await self.task_repository.read_by_gameId_and_externalTaskId(
            gameId,
            externalTaskId,
        )
        if not task:
            raise NotFoundError(f"Task not found (externalTaskId) : {externalTaskId}")
        if task.status != "open":
            raise GoneError("Task is not active")

        user = await self.users_repository.get_or_create_by_externalUserId(
            externalUserId=action.externalUserId
        )

        # NOTE(review): the task is only validated above; no task identifier
        # is stored on the created action - confirm this is intended.
        new_action = CreateUserActions(
            typeAction=action.typeAction,
            data=action.data,
            description=action.description,
            userId=str(user.id),
            apiKey_used=api_key,
        )
        created_action = await self.user_actions_repository.create(new_action)

        return ResponseAddActionDidByUserInTask(
            **created_action.model_dump(),
            externalUserId=str(action.externalUserId),
            message="Action added successfully",
        )

    async def user_add_action_default(
        self,
        externalUserId: str,
        schema: CreateUserBodyActions,
        api_key: Optional[str] = None,
    ) -> CreatedUserActions:
        """
        Record an action for a user outside of any task.

        The user is looked up by ``externalUserId`` and created when no
        matching user exists.

        Args:
            externalUserId (str): The external user ID.
            schema (CreateUserBodyActions): The action payload.
            api_key (Optional[str]): The API key used for the request. When
                provided it is stored with the action as ``apiKey_used``,
                unless the payload already carries that key.

        Returns:
            CreatedUserActions: Summary of the created action, including
            whether the user had to be created (``is_user_created``).
        """
        user = await self.users_repository.read_by_column(
            "externalUserId",
            externalUserId,
            not_found_raise_exception=False,
        )
        user_created = user is None
        if user_created:
            user = await self.users_repository.create_user_by_externalUserId(
                externalUserId=externalUserId
            )

        payload = schema.model_dump()
        payload["userId"] = str(user.id)
        if api_key is not None:
            # Record the key the same way the in-task path does; never
            # override a value the payload already provides.
            payload.setdefault("apiKey_used", api_key)
        new_action = CreateUserActions(**payload)
        created_action = await self.user_actions_repository.create(new_action)

        return CreatedUserActions(
            typeAction=created_action.typeAction,
            description=created_action.description,
            userId=str(user.id),
            is_user_created=user_created,
            message="Action added successfully",
        )