app.services.export_service module
ExportService: builds streamed CSV/XLSX/JSON dumps for the 4 admin-facing datasets exposed by /v1/exports.
- Design notes:
Queries use SQLAlchemy 2.0's async streaming (session.stream) with a hard LIMIT, so memory stays bounded even at the 100k-row cap.
Row-level transformation lives in private _serialize_* helpers, so the formatter layer (CSV/JSON/XLSX) only sees flat dicts.
The XLSX path imports openpyxl lazily; if the dependency is not installed, the endpoint surfaces a clear 503 instead of a 500.
- class app.services.export_service.ExportService(export_audit_log_repository)
Bases: BaseService
Streams large datasets out of Postgres without buffering them in memory.
Public surface is one async generator per dataset (iter_users etc.), plus audit-log helpers consumed by the endpoint layer.
- Parameters:
export_audit_log_repository (ExportAuditLogRepository)
- async audit_start(dataset_type, export_format, filters, *, api_key=None, oauth_user_id=None)
Persist the audit row at the moment the export request is accepted. Written before streaming starts so interrupted downloads remain recorded with status="started".
- Parameters:
dataset_type (str)
export_format (str)
filters (ExportFilters)
api_key (str | None)
oauth_user_id (str | None)
- Return type:
- async audit_finish(audit_id, *, row_count, status)
Mark an export audit row finished with its final row count and status.
- Parameters:
audit_id (str) – Id of the audit row started by audit_start.
row_count (int) – Number of rows actually streamed.
status (str) – Terminal status (e.g. completed/failed).
- Return type:
None
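The audit lifecycle described above (row written first, finalized after streaming) could be wired up roughly as follows. This is a minimal sketch, not the service's actual code: FakeAuditService is an in-memory stand-in, stream_with_audit is a hypothetical wrapper, and the assumption that audit_start returns the audit id as a string is inferred only from the audit_id parameter of audit_finish.

```python
import asyncio
from typing import Any, AsyncIterator, Dict


class FakeAuditService:
    """In-memory stand-in that mimics only the documented audit calls."""

    def __init__(self) -> None:
        self.rows: Dict[str, Dict[str, Any]] = {}

    async def audit_start(self, dataset_type, export_format, filters, *,
                          api_key=None, oauth_user_id=None) -> str:
        # Assumption: the real method returns the id of the new audit row.
        audit_id = str(len(self.rows) + 1)
        self.rows[audit_id] = {"status": "started", "row_count": 0}
        return audit_id

    async def audit_finish(self, audit_id, *, row_count, status) -> None:
        self.rows[audit_id].update(status=status, row_count=row_count)


async def stream_with_audit(
    service: FakeAuditService,
    rows: AsyncIterator[Dict[str, Any]],
    audit_id: str,
) -> AsyncIterator[Dict[str, Any]]:
    """Pass rows through, counting them, and close the audit row at the end."""
    count = 0
    try:
        async for row in rows:
            count += 1
            yield row
    except Exception:
        # A failure while reading rows is recorded with the partial count.
        await service.audit_finish(audit_id, row_count=count, status="failed")
        raise
    else:
        await service.audit_finish(audit_id, row_count=count, status="completed")
```

If the client disconnects and the generator is simply closed, neither branch runs in this sketch, so the row keeps status="started", which matches the behavior documented for audit_start.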
- async list_history(*, limit=50, oauth_user_id=None)
Return recent audit rows mapped onto the public entry schema, hiding internal fields (raw apiKey/oauth_user_id are dropped; only the requestedBy display string survives).
- Parameters:
limit (int)
oauth_user_id (str | None)
- Return type:
List[ExportAuditLogEntry]
- async iter_users(filters)
Stream user rows as plain dicts for export.
- Parameters:
filters (ExportFilters) – Date range and row limit (game/task filters do not apply to users).
- Yields:
Dict[str, Any] – One serializable user record per row.
- Return type:
AsyncIterator[Dict[str, Any]]
- async iter_user_points(filters)
Stream user-point rows (joined with user/task/game) as dicts.
- Parameters:
filters (ExportFilters) – Date range, row limit and optional game/task filters.
- Yields:
Dict[str, Any] – One serializable user-point record per row.
- Return type:
AsyncIterator[Dict[str, Any]]
- async iter_user_interactions(filters)
Stream user-action (interaction) rows as dicts.
Backed by the useractions table; game/task filters are ignored here because those ids only live inside the JSON data column.
- Parameters:
filters (ExportFilters) – Date range and row limit.
- Yields:
Dict[str, Any] – One serializable interaction record per row.
- Return type:
AsyncIterator[Dict[str, Any]]
- async iter_wallet_transactions(filters)
Stream wallet-transaction rows (joined with the owning user) as dicts.
- Parameters:
filters (ExportFilters) – Date range and row limit (game/task filters do not apply).
- Yields:
Dict[str, Any] – One serializable transaction record per row.
- Return type:
AsyncIterator[Dict[str, Any]]
- iter_dataset(dataset_type, filters)
Return the row iterator for the requested dataset. Raises a 500 if the dataset name is unknown; this should never happen because the endpoint layer has already validated the path.
- Parameters:
dataset_type (str)
filters (ExportFilters)
- Return type:
AsyncIterator[Dict[str, Any]]
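One plausible way to implement this kind of dispatch is a name-to-generator lookup table. The sketch below is illustrative only: the dataset keys, the stub generators and the InternalServerError class are assumptions, not taken from the module. Note that the dispatcher itself is a plain function that returns an async iterator, consistent with the non-async signature above.

```python
import asyncio
from typing import Any, AsyncIterator, Callable, Dict


class InternalServerError(Exception):
    """Hypothetical stand-in for the project's HTTP 500 error type."""
    status_code = 500


async def iter_users(filters: Any) -> AsyncIterator[Dict[str, Any]]:
    # Stub generator; the real one streams rows from the database.
    yield {"id": 1}


async def iter_wallet_transactions(filters: Any) -> AsyncIterator[Dict[str, Any]]:
    yield {"id": 10, "userId": 1}


# Assumed dataset keys; the real names are defined by the endpoint layer.
ITERATORS: Dict[str, Callable[[Any], AsyncIterator[Dict[str, Any]]]] = {
    "users": iter_users,
    "wallet_transactions": iter_wallet_transactions,
}


def iter_dataset_sketch(dataset_type: str, filters: Any) -> AsyncIterator[Dict[str, Any]]:
    """Look up the generator for a dataset and return it without awaiting."""
    try:
        factory = ITERATORS[dataset_type]
    except KeyError:
        raise InternalServerError(f"unknown dataset: {dataset_type}") from None
    return factory(filters)
```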
- static format_as_csv(rows, columns)
Emit a UTF-8 CSV stream. Uses an in-memory StringIO buffer per row so we never accumulate the whole table.
- Parameters:
rows (AsyncIterator[Dict[str, Any]])
columns (List[str])
- Return type:
AsyncIterator[bytes]
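The per-row buffer approach can be sketched as below. This is an assumption-laden illustration rather than the module's code: the function name format_as_csv_sketch, the header row, and the empty-string default for missing columns are all choices made for the example.

```python
import asyncio
import csv
import io
from typing import Any, AsyncIterator, Dict, List


async def format_as_csv_sketch(
    rows: AsyncIterator[Dict[str, Any]], columns: List[str]
) -> AsyncIterator[bytes]:
    """Yield one UTF-8 encoded CSV line per chunk."""
    # Header line first (assumed; the real formatter may differ).
    buffer = io.StringIO()
    csv.writer(buffer).writerow(columns)
    yield buffer.getvalue().encode("utf-8")

    async for row in rows:
        # A fresh buffer per row means only one line is held at a time.
        buffer = io.StringIO()
        csv.writer(buffer).writerow([row.get(col, "") for col in columns])
        yield buffer.getvalue().encode("utf-8")
```

Because the csv module handles quoting, values containing commas or quotes are escaped correctly even though each row is written independently.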
- static format_as_json(rows)
Emit a JSON array streamed incrementally so the response body looks like a normal JSON array to the client. Uses default=str so any unexpected type (UUID, Decimal) doesn’t crash the dump.
- Parameters:
rows (AsyncIterator[Dict[str, Any]])
- Return type:
AsyncIterator[bytes]
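A streamed JSON array can be produced by emitting the brackets and separators by hand and serializing one row at a time. The following is a minimal sketch under that assumption; format_as_json_sketch is a hypothetical name and the real chunking may differ.

```python
import asyncio
import json
from typing import Any, AsyncIterator, Dict


async def format_as_json_sketch(
    rows: AsyncIterator[Dict[str, Any]],
) -> AsyncIterator[bytes]:
    """Yield chunks that concatenate into a single valid JSON array."""
    yield b"["
    first = True
    async for row in rows:
        # Every row after the first is preceded by a comma separator.
        prefix = "" if first else ","
        first = False
        # default=str turns otherwise unserializable values (UUID, Decimal)
        # into their string form instead of raising TypeError.
        yield (prefix + json.dumps(row, default=str)).encode("utf-8")
    yield b"]"
```

An empty dataset yields just the two brackets, so the client still receives a valid empty array.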
- static format_as_xlsx(rows, columns)
Build an XLSX file using openpyxl write-only mode. The library buffers rows internally as it writes them out to a zip stream; at the 100k-row cap this stays well below 200 MB resident.
openpyxl is imported lazily so the rest of the export pipeline keeps working if the dependency is missing.
- Parameters:
rows (AsyncIterator[Dict[str, Any]])
columns (List[str])
- Return type:
AsyncIterator[bytes]
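The lazy-import pattern mentioned here can be sketched as follows. The helper name load_optional and the ServiceUnavailable class are hypothetical; the sketch only illustrates deferring the import until the XLSX path is hit and converting a missing dependency into a 503-style error.

```python
import importlib
from types import ModuleType


class ServiceUnavailable(Exception):
    """Hypothetical stand-in for the project's HTTP 503 error type."""
    status_code = 503


def load_optional(module_name: str) -> ModuleType:
    """Import a module on first use, mapping a missing package to a 503."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise ServiceUnavailable(
            f"this export format requires the optional '{module_name}' package"
        ) from exc
```

Calling load_optional("openpyxl") inside the XLSX formatter, rather than importing at module top level, means the CSV and JSON paths never touch the dependency.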
- format_iterator(dataset_type, export_format, rows)
Wrap a row iterator in the byte-stream formatter for the chosen format.
- Parameters:
dataset_type (str) – Dataset name, used to resolve column order.
export_format (str) – One of csv/json/xlsx.
rows (AsyncIterator[Dict[str, Any]]) – The dataset rows to encode.
- Returns:
AsyncIterator[bytes] – An async iterator yielding encoded chunks.
- Raises:
InternalServerError – If export_format is unknown.
- Return type:
AsyncIterator[bytes]
- static media_type_for(export_format)
Return the HTTP Content-Type for an export format.
- Parameters:
export_format (str) – One of csv/json/xlsx.
- Returns:
str – The matching media type.
- Return type:
str
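For reference, the standard media types for the three formats are shown in the sketch below. The lookup function, its name, and the fallback for unknown formats are assumptions; the real method may add a charset parameter or raise instead.

```python
MEDIA_TYPES = {
    "csv": "text/csv",
    "json": "application/json",
    "xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
}


def media_type_for_sketch(export_format: str) -> str:
    """Map an export format to its registered media type."""
    # Falling back to a generic binary type is an assumption of this sketch.
    return MEDIA_TYPES.get(export_format, "application/octet-stream")
```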
- static filename_for(dataset_type, export_format)
Build a timestamped download filename for an export.
- Parameters:
dataset_type (str) – Dataset name used as the filename stem.
export_format (str) – Format extension (csv/json/xlsx).
- Returns:
str – A name like users_20260609T120000Z.csv.
- Return type:
str
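A filename of the shape shown above can be built from a UTC timestamp in compact ISO 8601 form. This is a minimal sketch: the optional now parameter is added here only to make the example deterministic and is not part of the documented signature.

```python
from datetime import datetime, timezone
from typing import Optional


def filename_for_sketch(
    dataset_type: str, export_format: str, now: Optional[datetime] = None
) -> str:
    """Build '<dataset>_<UTC timestamp>.<extension>'."""
    # Normalize to UTC so the trailing 'Z' in the name is accurate.
    moment = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    return f"{dataset_type}_{moment.strftime('%Y%m%dT%H%M%SZ')}.{export_format}"
```

Called with the dataset "users", the format "csv" and a clock reading of 2026-06-09 12:00:00 UTC, this produces users_20260609T120000Z.csv.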