Source code for app.repository.task_repository
from contextlib import AbstractContextManager
from typing import Callable
from sqlalchemy.orm import Session, joinedload
from app.core.config import configs
from app.core.exceptions import NotFoundError
from app.model.tasks import Tasks
from app.repository.base_repository import BaseRepository
from app.util.query_builder import dict_to_sqlalchemy_filter_options
[docs]
class TaskRepository(BaseRepository):
    """
    Repository class for tasks.

    Attributes:
        session_factory (Callable[..., AbstractContextManager[Session]]):
            Factory for creating SQLAlchemy sessions.
        model: SQLAlchemy model class for tasks.
    """

    def __init__(
        self,
        session_factory: Callable[..., AbstractContextManager[Session]],
        model=Tasks,
    ) -> None:
        """
        Initializes the TaskRepository with the provided session factory and
        model.

        Args:
            session_factory (Callable[..., AbstractContextManager[Session]]):
                The session factory.
            model: The SQLAlchemy model class for tasks.
        """
        super().__init__(session_factory, model)

    def read_by_gameId(self, schema, eager: bool = False) -> dict:
        """
        Reads a filtered, ordered and paginated list of tasks.

        The schema is converted to a dict (``None`` values excluded) and
        provides the optional ``ordering``, ``page`` and ``page_size`` keys;
        missing keys fall back to ``configs.ORDERING``, ``configs.PAGE`` and
        ``configs.PAGE_SIZE``. The same dict is handed to
        ``dict_to_sqlalchemy_filter_options`` to build the filter expression.

        Args:
            schema: The schema containing query options. Must expose
                ``dict(exclude_none=True)``.
            eager (bool): Whether to join-load every relationship named in
                the model's ``eagers`` attribute (if it has one).

        Returns:
            dict: ``{"items": [...], "search_options": {...}}`` where
            ``search_options`` echoes ``page``, ``page_size`` and
            ``ordering`` and carries ``total_count``, the number of rows
            matching the filter before pagination is applied.
        """
        with self.session_factory() as session:
            schema_as_dict = schema.dict(exclude_none=True)

            # A leading "-" requests descending order on the named column.
            ordering = schema_as_dict.get("ordering", configs.ORDERING)
            if ordering.startswith("-"):
                order_query = getattr(self.model, ordering[1:]).desc()
            else:
                order_query = getattr(self.model, ordering).asc()

            page = schema_as_dict.get("page", configs.PAGE)
            page_size = schema_as_dict.get("page_size", configs.PAGE_SIZE)

            # Reuse the dict built above instead of serialising the schema a
            # second time; everything read from it has already been read.
            filter_options = dict_to_sqlalchemy_filter_options(
                self.model, schema_as_dict
            )

            query = session.query(self.model)
            if eager:
                # Distinct loop name so the boolean ``eager`` parameter is
                # not overwritten by a relationship name.
                for relationship_name in getattr(self.model, "eagers", []):
                    query = query.options(
                        joinedload(getattr(self.model, relationship_name))
                    )

            filtered_query = query.filter(filter_options)
            ordered_query = filtered_query.order_by(order_query)

            # The literal page size "all" disables pagination.
            if page_size == "all":
                items = ordered_query.all()
            else:
                items = (
                    ordered_query.limit(page_size)
                    .offset((page - 1) * page_size)
                    .all()
                )

            # Counted on the un-paginated query so it reflects every match.
            total_count = filtered_query.count()

            return {
                "items": items,
                "search_options": {
                    "page": page,
                    "page_size": page_size,
                    "ordering": ordering,
                    "total_count": total_count,
                },
            }

    def read_by_gameId_and_externalTaskId(self, gameId: int, externalTaskId: str):
        """
        Reads a task by game ID and external task ID.

        Args:
            gameId (int): The game ID.
            externalTaskId (str): The external task ID.

        Returns:
            object: The first matching task, or None when no row matches
            both values.
        """
        with self.session_factory() as session:
            return (
                session.query(self.model)
                .filter(
                    self.model.gameId == gameId,
                    self.model.externalTaskId == externalTaskId,
                )
                .first()
            )

    def get_points_and_users_by_taskId(self, taskId: int):
        """
        Retrieves the task with the given ID.

        NOTE(review): the method name suggests callers read points and users
        from the returned task (presumably via relationships on the model);
        this method itself only loads the task row - confirm against callers.

        Args:
            taskId (int): The task ID.

        Returns:
            object: The task with the given ID.

        Raises:
            NotFoundError: If no task exists with the given ID.
        """
        with self.session_factory() as session:
            task = session.query(self.model).filter(self.model.id == taskId).first()
            # ``first()`` signals "no row" with None; test for that exactly
            # rather than relying on the truthiness of a model instance.
            if task is None:
                raise NotFoundError(detail=f"Task not found by id : {taskId}")
            return task