Source code for app.services.user_points.assignment

"""Points assignment (write path).

Resolves the strategy for a task, runs scoring (or takes the points directly)
and delegates the atomic write to
:meth:`PointsPersistenceMixin._persist_points_wallet_and_transaction`.
"""

import logging

from app.core.exceptions import (InternalServerError, NotFoundError,
                                 PreconditionFailedError)
from app.schema.task_schema import AssignedPointsToExternalUserId
from app.services.game_access import get_authorized_game
from app.services.strategy_service import resolve_realm_id
from app.services.user_points.persistence import PointsPersistenceMixin
from app.util.is_valid_slug import is_valid_slug

logger = logging.getLogger(__name__)


[docs] class PointsAssignmentMixin(PointsPersistenceMixin):
[docs] async def assign_points_to_user_directly( self, gameId, externalTaskId, schema, api_key: str = None, *, oauth_user_id: str = None, is_admin: bool = False, enforce_scope: bool = False, ) -> AssignedPointsToExternalUserId: """ Assign points to a user directly (non-simulated), using a predefined strategy. Args: gameId (UUID): ID of the game. externalTaskId (str): External task identifier. schema (PostAssignPointsToUser): Input data schema. api_key (str, optional): API key used to register the operation. Returns: AssignedPointsToExternalUserId: Information about the assigned points. """ externalUserId = schema.externalUserId is_a_created_user = False if enforce_scope: game = await get_authorized_game( self.game_repository, gameId, api_key=api_key, oauth_user_id=oauth_user_id, is_admin=is_admin, ) else: game = await self.game_repository.read_by_column( column="id", value=gameId, not_found_message=f"Game with gameId {gameId} not found", only_one=True, ) externalGameId = game.externalGameId task = await self.task_repository.read_by_gameId_and_externalTaskId( game.id, externalTaskId ) if not task: raise NotFoundError(f"Task not found with externalTaskId: {externalTaskId}") strategyId = task.strategyId # Same async resolver used by ``assign_points_to_user``. This # endpoint doesn't actually invoke ``calculate_points`` - it just # verifies the strategy exists for the realm - but using the # same resolver keeps custom and built-in ids consistent. realm_id = resolve_realm_id(api_key=api_key, oauth_user_id=oauth_user_id) strategy_instance = await self.strategy_service.get_strategy_instance( strategyId, realmId=realm_id ) if not strategy_instance: raise NotFoundError(f"Strategy not found with id: {strategyId}") user = await self.users_repository.read_by_column( "externalUserId", externalUserId, not_found_raise_exception=False ) if not user: if not is_valid_slug(externalUserId): raise PreconditionFailedError( detail=f"Invalid externalUserId: {externalUserId}. Must be alphanumeric/underscore and 3–50 characters." ) user = await self.users_repository.create_user_by_externalUserId( externalUserId ) is_a_created_user = True data_to_add = schema.data or {} data_to_add["externalGameId"] = externalGameId data_to_add["externalTaskId"] = externalTaskId points = self._extract_points(data_to_add) if points is None: raise PreconditionFailedError( detail="Points cannot be None. Please provide a valid value." ) idempotency_key = self._extract_idempotency_key(data_to_add) direct_case_name = "External_points_assigned" user_points, _, _ = await self._persist_points_wallet_and_transaction( user_id=user.id, task_id=task.id, points=points, case_name=direct_case_name, data_to_add=data_to_add, description="Points assigned directly to GAME", api_key=api_key, external_user_id=externalUserId, external_task_id=externalTaskId, idempotency_key=idempotency_key, ) return AssignedPointsToExternalUserId( points=points, externalUserId=externalUserId, isACreatedUser=is_a_created_user, gameId=gameId, externalTaskId=externalTaskId, caseName=direct_case_name, created_at=str(user_points.created_at), )
[docs] async def assign_points_to_user( self, gameId, externalTaskId, schema, isSimulated: bool = False, api_key: str = None, *, oauth_user_id: str = None, is_admin: bool = False, enforce_scope: bool = False, ) -> AssignedPointsToExternalUserId: """ Assign points to a user. Args: gameId (UUID): The game ID. externalTaskId (str): The external task ID. schema (PostAssignPointsToUser): The schema with the data to assign points. api_key (str): The API key used. Returns: AssignedPointsToExternalUserId: The response with the points assigned. """ externalUserId = schema.externalUserId is_a_created_user = False if enforce_scope: game = await get_authorized_game( self.game_repository, gameId, api_key=api_key, oauth_user_id=oauth_user_id, is_admin=is_admin, ) else: game = await self.game_repository.read_by_column( column="id", value=gameId, not_found_message=(f"Game with gameId {gameId} not found"), only_one=True, ) externalGameId = game.externalGameId task = await self.task_repository.read_by_gameId_and_externalTaskId( game.id, externalTaskId ) if not task: raise NotFoundError(f"Task not found with externalTaskId: {externalTaskId}") strategyId = task.strategyId # A single async resolver handles both built-in # registry ids and ``custom:<uuid>`` DSL strategies (scoped by # realmId so a tenant can't invoke another tenant's strategy). # The resolver raises NotFoundError itself when the strategy is # missing, so the previous separate ``get_strategy_by_id`` guard # is redundant and has been removed. realm_id = resolve_realm_id(api_key=api_key, oauth_user_id=oauth_user_id) user = await self.users_repository.read_by_column( "externalUserId", externalUserId, not_found_raise_exception=False ) if not user: is_valid_externalUserId = is_valid_slug(externalUserId) if not is_valid_externalUserId: raise PreconditionFailedError( detail=( f"Invalid externalUserId: {externalUserId}. externalUserId should be a valid (Should have only alphanumeric characters and Underscore . Length should be between 3 and 50)" # noqa ) ) user = await self.users_repository.create_user_by_externalUserId( externalUserId=externalUserId ) is_a_created_user = True strategy_instance = await self.strategy_service.get_strategy_instance( strategyId, realmId=realm_id ) data_to_add = schema.data try: if data_to_add is None: data_to_add = {} result_calculated_points = await strategy_instance.calculate_points( externalGameId=externalGameId, externalTaskId=externalTaskId, externalUserId=externalUserId, data=data_to_add, ) points, case_name, callbackData = (result_calculated_points + (None,))[:3] logger.debug( "Calculated points result: points=%s case_name=%s callbackData_present=%s", points, case_name, callbackData is not None, ) if callbackData is not None: data_to_add["callbackData"] = callbackData except (KeyError, TypeError, ValueError) as exc: logger.warning( "Invalid scoring payload for externalTaskId=%s externalUserId=%s: %s", externalTaskId, externalUserId, str(exc), exc_info=True, ) raise PreconditionFailedError( detail=( "Invalid scoring payload for strategy execution. " "Verify required fields and data types." ) ) except Exception: logger.exception( "Error calculating points for externalTaskId=%s externalUserId=%s", externalTaskId, externalUserId, ) raise InternalServerError( detail=( f"Error in calculate points for task with externalTaskId: {externalTaskId} and user with externalUserId: {externalUserId}. Please try again later or contact support" # noqa ) ) if points == -1: raise PreconditionFailedError(detail=(case_name)) if points is None: raise InternalServerError( detail=( f"Points not calculated for task with externalTaskId: {externalTaskId} and user with externalUserId: {externalUserId}. Beacuse the strategy don't have condition to calculate it or the strategy don't have a case name" # noqa ) ) if not case_name: case_name = getattr(schema, "caseName", None) if not case_name: raise InternalServerError( detail=( f"Case name not resolved for task with externalTaskId: {externalTaskId} and user with externalUserId: {externalUserId}" # noqa ) ) idempotency_key = self._extract_idempotency_key(data_to_add) user_points, _, _ = await self._persist_points_wallet_and_transaction( user_id=user.id, task_id=task.id, points=points, case_name=case_name, data_to_add=data_to_add, description="Points assigned by GAME", api_key=api_key, external_user_id=externalUserId, external_task_id=externalTaskId, idempotency_key=idempotency_key, ) response = AssignedPointsToExternalUserId( points=points, externalUserId=externalUserId, isACreatedUser=is_a_created_user, gameId=gameId, externalTaskId=externalTaskId, caseName=case_name, created_at=str(user_points.created_at), ) return response