Source code for app.services.user_points_service

from collections import Counter
from uuid import UUID

from app.core.config import configs
from app.core.exceptions import (InternalServerError, NotFoundError,
                                 PreconditionFailedError)
from app.repository.game_repository import GameRepository
from app.repository.task_repository import TaskRepository
from app.repository.user_game_config_repository import UserGameConfigRepository
from app.repository.user_points_repository import UserPointsRepository
from app.repository.user_repository import UserRepository
from app.repository.wallet_repository import WalletRepository
from app.repository.wallet_transaction_repository import WalletTransactionRepository
from app.schema.games_schema import ListTasksWithUsers
from app.schema.task_schema import (AssignedPointsToExternalUserId, BaseUserFirstAction,
                                    TasksWithUsers)
from app.schema.user_game_config_schema import CreateUserGameConfig
from app.schema.user_points_schema import (AllPointsByGame, GameDetail,
                                           PointsAssignedToUser,
                                           PointsAssignedToUserDetails,
                                           ResponseGetPointsByGame,
                                           ResponseGetPointsByTask,
                                           ResponsePointsByExternalUserId, TaskDetail,
                                           TaskPointsByGame, UserGamePoints,
                                           UserPointsAssign)
from app.schema.wallet_schema import CreateWallet
from app.schema.wallet_transaction_schema import BaseWalletTransaction
from app.services.base_service import BaseService
from app.services.strategy_service import StrategyService
from app.util.is_valid_slug import is_valid_slug


[docs] class UserPointsService(BaseService): def __init__( self, user_points_repository: UserPointsRepository, users_repository: UserRepository, users_game_config_repository: UserGameConfigRepository, game_repository: GameRepository, task_repository: TaskRepository, wallet_repository: WalletRepository, wallet_transaction_repository: WalletTransactionRepository, ): self.user_points_repository = user_points_repository self.users_repository = users_repository self.users_game_config_repository = users_game_config_repository self.game_repository = game_repository self.task_repository = task_repository self.wallet_repository = wallet_repository self.wallet_transaction_repository = wallet_transaction_repository self.strategy_service = StrategyService() super().__init__(user_points_repository)
[docs] def query_user_points(self, schema): return self.user_points_repository.read_by_options(schema)
[docs] def get_users_by_gameId(self, gameId): game = self.game_repository.read_by_column( "id", gameId, not_found_raise_exception=False ) if not game: raise NotFoundError(detail=f"Game not found by gameId: {game}") tasks = self.task_repository.read_by_column( "gameId", game.id, not_found_raise_exception=False, only_one=False ) if not tasks: raise NotFoundError(detail=f"Tasks not found by gameId: {game.id}") response = [] all_tasks = [] for task in tasks: all_externalUserId = [] points = self.user_points_repository.get_points_and_users_by_taskId( task.id) externalTaskId = task.externalTaskId if points: for point in points: externalUserId = point.externalUserId user = self.users_repository.read_by_column( "externalUserId", externalUserId, not_found_raise_exception=True ) if not user: raise NotFoundError( detail=f"User not found by userId: {point.userId}. Please try again later or contact support" # noqa ) first_user_point = self.user_points_repository.get_first_user_points_in_external_task_id_by_user_id( externalTaskId, externalUserId ) all_externalUserId.append( BaseUserFirstAction( externalUserId=user.externalUserId, created_at=str(user.created_at), firstAction=str(first_user_point.created_at), ) ) all_tasks = {"externalTaskId": externalTaskId, "users": all_externalUserId} response.append(TasksWithUsers(**all_tasks)) return ListTasksWithUsers(gameId=gameId, tasks=response)
[docs] def get_points_by_user_list(self, users_list): response = [] for user in users_list: user_points = self.get_all_points_by_externalUserId(user) response.append(user_points) return response # pass} return True
[docs] def get_points_by_externalUserId(self, externalUserId): user = self.users_repository.read_by_column( "externalUserId", externalUserId, not_found_raise_exception=True ) if not user: raise NotFoundError( detail=f"User not found by externalUserId: {externalUserId}" ) tasks_of_users = self.user_points_repository.get_task_by_externalUserId( externalUserId ) response = [] for task in tasks_of_users: game = self.game_repository.read_by_column( "id", task.gameId, not_found_raise_exception=True ) response.append(self.get_points_by_gameId_with_details(game.id)) new_response = [] for game in response: for task in game.task: for point in task.points: if point.externalUserId == externalUserId: new_response.append( AllPointsByGame( externalGameId=game.externalGameId, created_at=game.created_at, task=[ TaskPointsByGame( externalTaskId=task.externalTaskId, points=[ PointsAssignedToUser( externalUserId=point.externalUserId, points=point.points, timesAwarded=point.timesAwarded, pointsData=point.pointsData, ) ], ) ], ) ) return new_response
[docs] def get_points_by_gameId(self, gameId): game = self.game_repository.read_by_column( "id", gameId, not_found_message=f"Game with gameId: {gameId} not found" ) tasks = self.task_repository.read_by_column( "gameId", game.id, not_found_raise_exception=False, only_one=False ) if not tasks: raise NotFoundError(detail=f"Tasks not found by gameId: {game.id}") game_points = [] for task in tasks: user_points = [] points = self.user_points_repository.get_points_and_users_by_taskId( task.id) if points: for point in points: points_of_user = PointsAssignedToUser( externalUserId=point.externalUserId, points=point.points, timesAwarded=point.timesAwarded, ) user_points.append(points_of_user) task_points = TaskPointsByGame( externalTaskId=task.externalTaskId, points=user_points ) game_points.append(task_points) response = AllPointsByGame( externalGameId=game.externalGameId, created_at=str(game.created_at), task=game_points, ) return response
[docs] def get_points_by_gameId_with_details(self, gameId: UUID): game = self.game_repository.read_by_column( "id", gameId, not_found_message=f"Game with gameId: {gameId} not found" ) tasks = self.task_repository.read_by_column( "gameId", game.id, not_found_raise_exception=False, only_one=False ) if not tasks: raise NotFoundError(detail=f"Tasks not found by gameId: {game.id}") game_points = [] for task in tasks: user_points = [] points = self.user_points_repository.get_points_and_users_by_taskId( task.id) if points: for point in points: points_of_user = PointsAssignedToUserDetails( externalUserId=point.externalUserId, points=point.points, timesAwarded=point.timesAwarded, pointsData=point.pointsData, ) user_points.append(points_of_user) task_points = TaskPointsByGame( externalTaskId=task.externalTaskId, points=user_points ) game_points.append(task_points) response = AllPointsByGame( externalGameId=game.externalGameId, created_at=str(game.created_at), task=game_points, ) return response
[docs] def get_points_of_user_in_game(self, gameId, externalUserId): game = self.game_repository.read_by_column( "id", gameId, not_found_raise_exception=False ) if not game: raise NotFoundError(detail=f"Game not found by gameId: {gameId}") user = self.users_repository.read_by_column( "externalUserId", externalUserId, not_found_raise_exception=False ) if not user: raise NotFoundError( detail=f"User not found by externalUserId: {externalUserId}" ) tasks = self.task_repository.read_by_column( "gameId", game.id, not_found_raise_exception=False, only_one=False ) if not tasks: raise NotFoundError(detail=f"Tasks not found by gameId: {game.id}") response = [] for task in tasks: points = self.user_points_repository.get_points_and_users_by_taskId( task.id) if points: for point in points: if point.externalUserId == externalUserId: response.append( PointsAssignedToUser( externalUserId=point.externalUserId, points=point.points, timesAwarded=point.timesAwarded, ) ) return response
[docs] async def assign_points_to_user( self, gameId, externalTaskId, schema, isSimulated: bool = False, api_key: str = None, ): """ Assign points to a user. Args: gameId (UUID): The game ID. externalTaskId (str): The external task ID. schema (PostAssignPointsToUser): The schema with the data to assign points. api_key (str): The API key used. Returns: AssignedPointsToExternalUserId: The response with the points assigned. """ externalUserId = schema.externalUserId is_a_created_user = False game = self.game_repository.read_by_column( column="id", value=gameId, not_found_message=(f"Game with gameId {gameId} not found"), only_one=True, ) externalGameId = game.externalGameId task = self.task_repository.read_by_gameId_and_externalTaskId( game.id, externalTaskId ) if not task: raise NotFoundError( f"Task not found with externalTaskId: {externalTaskId}") strategyId = task.strategyId strategy = self.strategy_service.get_strategy_by_id(strategyId) if not strategy: raise NotFoundError( f"Strategy not found with id: {strategyId} for task with externalTaskId: {externalTaskId}" # noqa ) user = self.users_repository.read_by_column( "externalUserId", externalUserId, not_found_raise_exception=False ) if not user: is_valid_externalUserId = is_valid_slug(externalUserId) if not is_valid_externalUserId: raise PreconditionFailedError( detail=( f"Invalid externalUserId: {externalUserId}. externalUserId should be a valid (Should have only alphanumeric characters and Underscore . Length should be between 3 and 50)" # noqa ) ) user = self.users_repository.create_user_by_externalUserId( externalUserId=externalUserId ) is_a_created_user = True strategy_instance = self.strategy_service.get_Class_by_id(strategyId) data_to_add = schema.data try: if data_to_add is None: data_to_add = {} print('-1') data_to_add["externalGameId"] = externalGameId print('-2') data_to_add["externalTaskId"] = externalTaskId print('-3') result_calculated_points = await strategy_instance.calculate_points( externalGameId=externalGameId, externalTaskId=externalTaskId, externalUserId=externalUserId, data=data_to_add, ) print('-4') points, case_name, callbackData = ( result_calculated_points + (None,))[:3] print('-5') print( 'points', points, ' | ', 'case_name', ' | ', 'callbackData', callbackData ) if callbackData is not None: print('-6') data_to_add["callbackData"] = callbackData print('-7') except Exception as e: print("----------------- ERROR -----------------") print(e) print("----------------- ERROR -----------------") raise InternalServerError( detail=( f"Error in calculate points for task with externalTaskId: {externalTaskId} and user with externalUserId: {externalUserId}. Please try again later or contact support" # noqa ) ) if points == -1: raise PreconditionFailedError(detail=(case_name)) print( 'points', points, ' | ', 'case_name', ' | ', 'data_to_add', data_to_add ) if points is None or not case_name: raise InternalServerError( detail=( f"Points not calculated for task with externalTaskId: {externalTaskId} and user with externalUserId: {externalUserId}. Beacuse the strategy don't have condition to calculate it or the strategy don't have a case name" # noqa ) ) user_points_schema = UserPointsAssign( userId=str(user.id), taskId=str(task.id), points=points, caseName=case_name, data=data_to_add, description="Points assigned by GAME", apiKey_used=api_key, ) user_points = await self.user_points_repository.create(user_points_schema) wallet = self.wallet_repository.read_by_column( "userId", user.id, not_found_raise_exception=False ) if wallet: wallet.pointsBalance += points self.wallet_repository.update(wallet.id, wallet) if not wallet: new_wallet = CreateWallet( userId=str(user.id), points=points, coinsBalance=0, pointsBalance=points, conversionRate=configs.DEFAULT_CONVERTION_RATE_POINTS_TO_COIN, apiKey_used=api_key, ) wallet = await self.wallet_repository.create(new_wallet) wallet_transaction = BaseWalletTransaction( transactionType="AssignPoints", points=points, coins=0, data=data_to_add, appliedConversionRate=0, walletId=str(wallet.id), apiKey_used=api_key, ) wallet_transaction_repository = await self.wallet_transaction_repository.create( wallet_transaction ) if not wallet_transaction_repository: raise InternalServerError( detail=( f"Wallet transaction not created for user with externalUserId: {externalUserId} and task with externalTaskId: {externalTaskId}. Please try again later or contact support" # noqa ) ) response = AssignedPointsToExternalUserId( points=points, externalUserId=externalUserId, isACreatedUser=is_a_created_user, gameId=gameId, externalTaskId=externalTaskId, caseName=case_name, created_at=str(user_points.created_at), ) return response
[docs] async def get_points_simulated_of_user_in_game( self, gameId, externalUserId, oauth_user_id: str = None, assign_control_group: bool = False, ): """ Simulates the assignment of points for a user without persisting the changes. Args: gameId (UUID): The ID of the game. externalTaskId (str): The external task ID. schema: The schema containing user and action data. oauth_user_id (str): The OAuth user ID. Returns: dict: Simulation result with calculated points and case name. """ game = self.game_repository.read_by_column( column="id", value=gameId, not_found_message=(f"Game with gameId {gameId} not found"), only_one=True, ) all_tasks = self.task_repository.read_by_column( "gameId", game.id, not_found_raise_exception=False, only_one=False ) if not all_tasks: raise NotFoundError(detail=f"Tasks not found by gameId: {game.id}") # First: Check if all strategies exist strategy = None for task in all_tasks: strategyId = task.strategyId strategy = self.strategy_service.get_strategy_by_id(strategyId) if not strategy: raise NotFoundError( f"One of the strategies not found with id: {strategyId} for task with externalTaskId: {task.externalTaskId}" # noqa ) user = self.users_repository.read_by_column( "externalUserId", externalUserId, not_found_raise_exception=False ) if not user: is_valid_externalUserId = is_valid_slug(externalUserId) if not is_valid_externalUserId: raise PreconditionFailedError( detail=( f"Invalid externalUserId: {externalUserId}. externalUserId should be a valid (Should have only alphanumeric characters and Underscore . Length should be between 3 and 50)" # noqa ) ) user = self.users_repository.create_user_by_externalUserId( externalUserId=externalUserId, oauth_user_id=oauth_user_id, ) userGroup = None if assign_control_group: user_config = self.users_game_config_repository.read_by_columns( {"userId": user.id, "gameId": game.id}, only_one=True, not_found_raise_exception=False, ) if user_config: userGroup = user_config.experimentGroup if not userGroup: group_control = ["random_range", "average_score", "dynamic_calculation"] all_users = self.users_game_config_repository.get_all_users_by_gameId( game.id ) group_counts = Counter( user_config.experimentGroup for user_config in all_users ) min_group = min( group_control, key=lambda g: group_counts.get(g, 0)) userGroup = min_group new_user_config = CreateUserGameConfig( userId=str(user.id), gameId=str(game.id), experimentGroup=userGroup, configData={}, ) user_config = await self.users_game_config_repository.create( new_user_config ) grouped_by_strategyId = {} for task in all_tasks: strategy_id_applied = task.strategyId if strategy_id_applied not in grouped_by_strategyId: grouped_by_strategyId[strategy_id_applied] = [] grouped_by_strategyId[strategy_id_applied].append(task) response = [] user_last_task = self.user_points_repository.get_last_task_by_userId( user.id) externalUserId = user.externalUserId for strategy_id_applied, tasks in grouped_by_strategyId.items(): strategy_instance = self.strategy_service.get_Class_by_id( strategy_id_applied ) # check if strategy_instance have simulate_strategy if not hasattr(strategy_instance, "simulate_strategy"): raise NotFoundError( f"Strategy with id: {strategy_id_applied} don't have simulate_strategy method" ) for task in tasks: data_to_simulate = { "task": task, "allTasks": tasks, "externalUserId": externalUserId, } task_simulation = strategy_instance.simulate_strategy( data_to_simulate=data_to_simulate, userGroup=userGroup, user_last_task=user_last_task ) response.append(task_simulation) externalGameId = game.externalGameId return response, externalGameId
[docs] def get_users_points_by_externalGameId(self, externalGameId): game = self.game_repository.read_by_column( column="externalGameId", value=externalGameId, not_found_message=( f"Game with externalGameId {externalGameId} not found"), ) tasks = self.task_repository.read_by_column( "gameId", game.id, only_one=False, not_found_raise_exception=False ) if tasks: tasks = [task.id for task in tasks] if not tasks: raise NotFoundError( f"The game with externalGameId {externalGameId} has no tasks" ) response = [] for task in tasks: points = self.user_points_repository.get_points_and_users_by_taskId( task) response_by_task = [] if points: for point in points: response_by_task.append( ResponseGetPointsByTask( externalUserId=point.externalUserId, points=point.points ) ) if response_by_task: response.append( ResponseGetPointsByGame( externalTaskId=point.externalTaskId, points=response_by_task ) ) return response
[docs] def get_users_points_by_externalTaskId(self, externalTaskId): task = self.task_repository.read_by_column( column="externalTaskId", value=externalTaskId, not_found_message=( f"Task with externalTaskId {externalTaskId} not found"), ) points_by_task = self.user_points_repository.get_points_and_users_by_taskId( task.id ) cleaned_points_by_task = [] if points_by_task: for point in points_by_task: cleaned_points_by_task.append( ResponseGetPointsByTask( externalUserId=point.externalUserId, points=point.points ) ) return cleaned_points_by_task
[docs] def get_users_points_by_externalTaskId_and_externalUserId( self, externalTaskId, externalUserId ): task = self.task_repository.read_by_column( column="externalTaskId", value=externalTaskId, not_found_message=( f"Task with externalTaskId {externalTaskId} not found"), ) user = self.users_repository.read_by_column( column="externalUserId", value=externalUserId, not_found_message=( f"User with externalUserId {externalUserId} not found"), ) points = self.user_points_repository.read_by_columns( {"taskId": task.id, "userId": user.id} ) return points
[docs] def get_all_points_by_externalUserId(self, externalUserId): user_data = self.users_repository.read_by_column( column="externalUserId", value=externalUserId, not_found_message=( f"User with externalUserId {externalUserId} not found"), not_found_raise_exception=False, ) if not user_data: return UserGamePoints( externalUserId=externalUserId, points=0, timesAwarded=0, games=[], userExists=False, ) tasks = self.user_points_repository.get_task_by_externalUserId( externalUserId) response = [] for task in tasks: game = self.game_repository.read_by_column( "id", task.gameId, not_found_raise_exception=True ) response.append(self.get_points_by_gameId_with_details(game.id)) for game in response: # UserGamePoints points = 0 times_awarded = 0 games = [] for task in game.task: # GameDetail task_points = 0 task_times_awarded = 0 tasks = [] for point in task.points: if point.externalUserId == externalUserId: task_points += point.points task_times_awarded += point.timesAwarded if point.points > 0: tasks.append( TaskDetail( externalTaskId=task.externalTaskId, pointsData=point.pointsData, ) ) points += task_points times_awarded += task_times_awarded if points > 0 and len(tasks) > 0: games.append( GameDetail( externalGameId=game.externalGameId, points=task_points, timesAwarded=task_times_awarded, tasks=tasks, ) ) return UserGamePoints( externalUserId=externalUserId, points=points, timesAwarded=times_awarded, games=games, ) return None
[docs] def get_points_of_user(self, externalUserId): user = self.users_repository.read_by_column( column="externalUserId", value=externalUserId, not_found_message=( f"User with externalUserId {externalUserId} not found"), ) points = self.user_points_repository.get_task_and_sum_points_by_userId( user.id) total_points = 0 for point in points: total_points += point.points response = ResponsePointsByExternalUserId( externalUserId=externalUserId, points=total_points, points_by_task=points, # noqa ) return response
[docs] def count_measurements_by_external_task_id(self, external_task_id): return self.user_points_repository.count_measurements_by_external_task_id( external_task_id ) # noqa
[docs] def get_user_task_measurements_count(self, externalTaskId, externalUserId): return self.user_points_repository.get_user_task_measurements_count( externalTaskId, externalUserId )
[docs] def get_user_task_measurements_count_the_last_seconds( self, externalTaskId, externalUserId, seconds ): return self.user_points_repository.get_user_task_measurements_count_the_last_seconds( externalTaskId, externalUserId, seconds )
[docs] def get_avg_time_between_tasks_by_user_and_game_task( self, externalGameId, externalTaskId, externalUserId ): return self.user_points_repository.get_avg_time_between_tasks_by_user_and_game_task( # noqa externalGameId, externalTaskId, externalUserId )
[docs] def get_avg_time_between_tasks_for_all_users(self, externalGameId, externalTaskId): return self.user_points_repository.get_avg_time_between_tasks_for_all_users( # noqa externalGameId, externalTaskId )
[docs] def get_last_window_time_diff(self, externalTaskId, externalUserId): return self.user_points_repository.get_last_window_time_diff( externalTaskId, externalUserId )
[docs] def get_new_last_window_time_diff( self, externalTaskId, externalUserId, externalGameId ): return self.user_points_repository.get_new_last_window_time_diff( externalTaskId, externalUserId, externalGameId )
[docs] def get_user_task_measurements(self, externalTaskId, externalUserId): return self.user_points_repository.get_user_task_measurements( externalTaskId, externalUserId )
# get count personal points in the game, should have "minutes" field in the data
[docs] def count_personal_records_by_external_game_id( self, external_game_id, externalUserId ): """ Count the number of records for a user in a game. Args: external_game_id (str): The external game id. externalUserId (str): The external user id. Returns: int: The number of records. """ return self.user_points_repository.count_personal_records_by_external_game_id( external_game_id, externalUserId )
[docs] def user_has_record_before_in_externalTaskId_last_min( self, externalTaskId, externalUserId, minutes ): """ Check if a user has a record before in the task in the last minute. Args: externalTaskId (str): The external task id. externalUserId (str): The external user id. minutes (int): The number of minutes. Returns: bool: True if the user has a record before in the task, False otherwise """ return self.user_points_repository.user_has_record_before_in_externalTaskId_last_min( externalTaskId, externalUserId, minutes )
[docs] def get_global_avg_by_external_game_id(self, external_game_id): """ Get the global average time rewarded. It does not take into account the time with 0 value (minutes) Args: external_game_id (str): The external game id. Returns: float: The global average. """ return self.user_points_repository.get_global_avg_by_external_game_id( external_game_id )
[docs] def get_personal_avg_by_external_game_id(self, external_game_id, externalUserId): """ Get the personal average time rewarded. It does not take into account the time with 0 value (minutes) Args: external_game_id (str): The external game id. externalUserId (str): The external user id. Returns: float: The personal average. """ return self.user_points_repository.get_personal_avg_by_external_game_id( external_game_id, externalUserId )
[docs] def get_points_of_simulated_task(self, externalTaskId, simulationHash): return self.user_points_repository.get_points_of_simulated_task( externalTaskId, simulationHash )
[docs] def get_all_point_of_tasks_list(self, list_ids_tasks, withData=False): return self.user_points_repository.get_all_point_of_tasks_list( list_ids_tasks, withData )