app.services.export_service module

ExportService builds streamed CSV/XLSX/JSON dumps for the four admin-facing datasets exposed by /v1/exports.

Design notes:
  • Queries use SQLAlchemy 2.0’s async streaming (session.stream) with a hard LIMIT, so memory stays bounded even at the 100k-row cap.

  • Row-level transformation lives in private _serialize_* helpers so the formatter layer (CSV/JSON/XLSX) only sees flat dicts.

  • The XLSX path imports openpyxl lazily; if the dependency is not installed the endpoint surfaces a clear 503 instead of a 500.
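
As a rough illustration of the second note, a serializer helper might flatten a nested record into a dict of scalar values before any formatter sees it. The helper name _serialize_user_point and every field name below are hypothetical; they are not taken from the module.

```python
from datetime import datetime, timezone
from typing import Any, Dict


def _serialize_user_point(row: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten a nested record into a flat dict of scalars (hypothetical fields)."""
    user = row.get("user") or {}
    created = row.get("created_at")
    return {
        "id": str(row["id"]),
        "points": row["points"],
        # Nested objects are reduced to prefixed scalar columns.
        "user_id": str(user.get("id", "")),
        "user_name": user.get("name", ""),
        # Datetimes become ISO 8601 strings so every formatter can emit text.
        "created_at": created.isoformat() if created else "",
    }


record = {
    "id": 7,
    "points": 120,
    "user": {"id": 3, "name": "ada"},
    "created_at": datetime(2026, 6, 9, 12, 0, tzinfo=timezone.utc),
}
flat = _serialize_user_point(record)
```

Because the result contains no nested dicts or lists, the same record can feed a CSV row, a JSON object or a spreadsheet row without format-specific branching.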

class app.services.export_service.ExportService(export_audit_log_repository)

Bases: BaseService

Streams large datasets out of Postgres without buffering them in memory.

Public surface is one async generator per dataset (iter_users etc.), plus audit-log helpers consumed by the endpoint layer.

Parameters:

export_audit_log_repository (ExportAuditLogRepository)

async audit_start(dataset_type, export_format, filters, *, api_key=None, oauth_user_id=None)

Persist the audit row at the moment the export request is accepted. The row is written before streaming starts, so interrupted downloads remain recorded with status="started".

Parameters:
  • dataset_type (str)

  • export_format (str)

  • filters (ExportFilters)

  • api_key (str | None)

  • oauth_user_id (str | None)

Return type:

ExportAuditLog

async audit_finish(audit_id, *, row_count, status)

Mark an export audit row finished with its final row count and status.

Parameters:
  • audit_id (str) – Id of the audit row started by audit_start.

  • row_count (int) – Number of rows actually streamed.

  • status (str) – Terminal status (e.g. completed/failed).

Return type:

None
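
The two audit helpers are meant to bracket the stream. The sketch below shows one way a caller could do that, using an in-memory stand-in instead of the real service. FakeAudit, run_export and the simplified audit_start signature (which returns a plain id string here) are assumptions made for illustration; only the "started" status and the completed/failed examples come from the text above.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List


class FakeAudit:
    """In-memory stand-in for the audit helpers (hypothetical)."""

    def __init__(self) -> None:
        self.rows: Dict[str, Dict[str, Any]] = {}

    async def audit_start(self, dataset_type: str, export_format: str) -> str:
        audit_id = f"audit-{len(self.rows) + 1}"
        self.rows[audit_id] = {"status": "started", "row_count": 0}
        return audit_id

    async def audit_finish(self, audit_id: str, *, row_count: int, status: str) -> None:
        self.rows[audit_id].update(row_count=row_count, status=status)


async def run_export(
    audit: FakeAudit, rows: AsyncIterator[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    # The audit row exists before the first data row is read.
    audit_id = await audit.audit_start("users", "csv")
    sent: List[Dict[str, Any]] = []
    status = "failed"
    try:
        async for row in rows:
            sent.append(row)
        status = "completed"
    finally:
        # Runs on success and on error; the row stays "started" only if the
        # process dies before reaching this point.
        await audit.audit_finish(audit_id, row_count=len(sent), status=status)
    return sent


async def two_rows() -> AsyncIterator[Dict[str, Any]]:
    yield {"id": 1}
    yield {"id": 2}


async def broken_rows() -> AsyncIterator[Dict[str, Any]]:
    yield {"id": 1}
    raise RuntimeError("connection lost")
```

Recording row_count from the rows actually sent, not from the query, keeps the audit trail accurate for partially delivered exports.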

async list_history(*, limit=50, oauth_user_id=None)

Return recent audit rows mapped onto the public entry schema. Internal fields are hidden: the raw apiKey and oauth_user_id are dropped, and only the requestedBy display string survives.

Parameters:
  • limit (int)

  • oauth_user_id (str | None)

Return type:

List[ExportAuditLogEntry]
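
One way to picture the mapping is an allow-list copy from the internal row onto a public shape that has no credential fields at all. The AuditRow and PublicEntry classes, their field names, and the idea that the display string is already stored on the row are all assumptions for this sketch.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class AuditRow:
    """Internal row shape (hypothetical field names)."""
    id: str
    dataset_type: str
    status: str
    requested_by: str
    api_key: Optional[str] = None
    oauth_user_id: Optional[str] = None


@dataclass
class PublicEntry:
    """Public shape: no credential or user-id fields exist on it."""
    id: str
    datasetType: str
    status: str
    requestedBy: str


def to_public(row: AuditRow) -> PublicEntry:
    # Copy only allow-listed fields; api_key and oauth_user_id are never read.
    return PublicEntry(
        id=row.id,
        datasetType=row.dataset_type,
        status=row.status,
        requestedBy=row.requested_by,
    )


entry = to_public(
    AuditRow("a1", "users", "completed", "ada@example.com", api_key="sk-secret")
)
```

Copying named fields, instead of deleting sensitive ones from a full dump, means a newly added internal column cannot leak by default.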

async iter_users(filters)

Stream user rows as plain dicts for export.

Parameters:

filters (ExportFilters) – Date range and row limit (game/task filters do not apply to users).

Yields:

Dict[str, Any] – One serializable user record per row.

Return type:

AsyncIterator[Dict[str, Any]]

async iter_user_points(filters)

Stream user-point rows (joined with user/task/game) as dicts.

Parameters:

filters (ExportFilters) – Date range, row limit and optional game/task filters.

Yields:

Dict[str, Any] – One serializable user-point record per row.

Return type:

AsyncIterator[Dict[str, Any]]

async iter_user_interactions(filters)

Stream user-action (interaction) rows as dicts.

Backed by the useractions table; game/task filters are ignored here because those ids only live inside the JSON data column.

Parameters:

filters (ExportFilters) – Date range and row limit.

Yields:

Dict[str, Any] – One serializable interaction record per row.

Return type:

AsyncIterator[Dict[str, Any]]

async iter_wallet_transactions(filters)

Stream wallet-transaction rows (joined with the owning user) as dicts.

Parameters:

filters (ExportFilters) – Date range and row limit (game/task filters do not apply).

Yields:

Dict[str, Any] – One serializable transaction record per row.

Return type:

AsyncIterator[Dict[str, Any]]

iter_dataset(dataset_type, filters)

Return the row iterator for the requested dataset. Raises a 500 error if the dataset name is unknown; this should never happen, because the endpoint layer has already validated the path.

Parameters:
  • dataset_type (str)

  • filters (ExportFilters)

Return type:

AsyncIterator[Dict[str, Any]]
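
The signature above is not marked async, which suggests the method only selects a generator and returns it. A table-driven dispatch along those lines could look like the sketch below. The dataset keys, the simplified generators (which take no filters) and UnknownDatasetError are hypothetical stand-ins, not the module's actual names.

```python
import asyncio
from typing import Any, AsyncIterator, Callable, Dict, List


class UnknownDatasetError(Exception):
    """Hypothetical stand-in for the service's internal-server-error exception."""
    status_code = 500


async def iter_users() -> AsyncIterator[Dict[str, Any]]:
    yield {"id": 1, "name": "ada"}


async def iter_wallet_transactions() -> AsyncIterator[Dict[str, Any]]:
    yield {"id": 10, "amount": "5.00"}


# Dataset name -> generator function. The keys are assumptions.
DATASETS: Dict[str, Callable[[], AsyncIterator[Dict[str, Any]]]] = {
    "users": iter_users,
    "wallet_transactions": iter_wallet_transactions,
}


def iter_dataset(dataset_type: str) -> AsyncIterator[Dict[str, Any]]:
    # A plain function: it picks the generator but does not start iterating.
    try:
        factory = DATASETS[dataset_type]
    except KeyError:
        raise UnknownDatasetError(f"unknown dataset: {dataset_type}") from None
    return factory()


async def collect(rows: AsyncIterator[Dict[str, Any]]) -> List[Dict[str, Any]]:
    return [row async for row in rows]
```

With this shape the unknown-name error is raised at call time, before any response bytes are sent, instead of partway through the stream.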

static format_as_csv(rows, columns)

Emit a UTF-8 CSV stream. Uses an in-memory StringIO buffer per row so we never accumulate the whole table.

Parameters:
  • rows (AsyncIterator[Dict[str, Any]])

  • columns (List[str])

Return type:

AsyncIterator[bytes]
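
The per-row buffer technique can be sketched with the standard csv module as follows. This is an approximation under stated assumptions: whether the real formatter emits a header row, and how it fills missing keys, is not documented here, so both choices below are guesses.

```python
import asyncio
import csv
import io
from typing import Any, AsyncIterator, Dict, List, Sequence


async def format_as_csv(
    rows: AsyncIterator[Dict[str, Any]], columns: List[str]
) -> AsyncIterator[bytes]:
    def encode(values: Sequence[Any]) -> bytes:
        # A fresh, short-lived buffer per row: nothing accumulates across rows.
        buf = io.StringIO()
        csv.writer(buf).writerow(values)
        return buf.getvalue().encode("utf-8")

    # Assumption: the header row is emitted first.
    yield encode(columns)
    async for row in rows:
        # Assumption: missing keys become empty cells.
        yield encode([row.get(col, "") for col in columns])
```

Going through csv.writer, instead of joining values with commas by hand, keeps quoting correct for values that contain commas, quotes or newlines.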

static format_as_json(rows)

Emit a JSON array streamed incrementally so the response body looks like a normal JSON array to the client. Uses default=str so any unexpected type (UUID, Decimal) doesn’t crash the dump.

Parameters:

rows (AsyncIterator[Dict[str, Any]])

Return type:

AsyncIterator[bytes]
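
A minimal sketch of incremental array streaming: emit the opening bracket, then each object with a comma before all but the first, then the closing bracket. The chunk boundaries chosen here are an assumption; only the use of default=str is taken from the description above.

```python
import asyncio
import json
from decimal import Decimal
from typing import Any, AsyncIterator, Dict
from uuid import UUID


async def format_as_json(rows: AsyncIterator[Dict[str, Any]]) -> AsyncIterator[bytes]:
    yield b"["
    first = True
    async for row in rows:
        # The separator goes before every element except the first, so the
        # concatenated chunks never contain a trailing comma.
        prefix = "" if first else ","
        first = False
        # default=str turns UUID, Decimal and similar values into strings.
        yield (prefix + json.dumps(row, default=str)).encode("utf-8")
    yield b"]"


async def sample_rows() -> AsyncIterator[Dict[str, Any]]:
    yield {"id": UUID(int=1), "amount": Decimal("1.50")}
    yield {"id": UUID(int=2), "amount": Decimal("2")}
```

Note that default=str makes the dump lossy: a Decimal arrives at the client as a JSON string, not a number, so consumers need to parse it back if they want arithmetic.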

static format_as_xlsx(rows, columns)

Build an XLSX file using openpyxl write-only mode. The library buffers rows internally as it writes them out to a zip stream; at the 100k-row cap this stays well below 200 MB resident.

openpyxl is imported lazily so the rest of the export pipeline keeps working if the dependency is missing.

Parameters:
  • rows (AsyncIterator[Dict[str, Any]])

  • columns (List[str])

Return type:

AsyncIterator[bytes]
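
The lazy import and write-only workbook could be combined roughly as below. DependencyUnavailable is a hypothetical exception that the endpoint layer would translate into a 503, and the sheet title, chunk_size parameter and final in-memory BytesIO are assumptions of this sketch, not details of the real implementation.

```python
import asyncio
import io
from typing import Any, AsyncIterator, Dict, List


class DependencyUnavailable(Exception):
    """Hypothetical error that the endpoint layer would map to HTTP 503."""
    status_code = 503


async def format_as_xlsx(
    rows: AsyncIterator[Dict[str, Any]],
    columns: List[str],
    chunk_size: int = 64 * 1024,
) -> AsyncIterator[bytes]:
    try:
        # Imported inside the function so CSV/JSON exports keep working on
        # installs that lack openpyxl.
        from openpyxl import Workbook
    except ImportError as exc:
        raise DependencyUnavailable("XLSX export requires openpyxl") from exc

    # Write-only mode appends rows without keeping a full cell grid in memory.
    workbook = Workbook(write_only=True)
    sheet = workbook.create_sheet(title="export")
    sheet.append(columns)
    async for row in rows:
        sheet.append([row.get(col) for col in columns])

    # The finished file is materialized once, then handed out in slices.
    buffer = io.BytesIO()
    workbook.save(buffer)
    data = buffer.getvalue()
    for start in range(0, len(data), chunk_size):
        yield data[start:start + chunk_size]
```

Unlike CSV and JSON, an XLSX file is a zip archive whose directory is written last, so the first byte cannot be sent until every row has been consumed.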

format_iterator(dataset_type, export_format, rows)

Wrap a row iterator in the byte-stream formatter for the chosen format.

Parameters:
  • dataset_type (str) – Dataset name, used to resolve column order.

  • export_format (str) – One of csv/json/xlsx.

  • rows (AsyncIterator[Dict[str, Any]]) – The dataset rows to encode.

Returns:

AsyncIterator[bytes] – An async iterator yielding encoded chunks.

Raises:

InternalServerError – If export_format is unknown.

Return type:

AsyncIterator[bytes]

static media_type_for(export_format)

Return the HTTP Content-Type for an export format.

Parameters:

export_format (str) – One of csv/json/xlsx.

Returns:

str – The matching media type.

Return type:

str
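
For reference, the registered media types for the three formats are shown in the lookup below. Whether the service appends parameters such as a charset, and how it treats an unknown format, is not stated here; this sketch lowercases the input and lets a KeyError propagate, both of which are assumptions.

```python
from typing import Dict

# Registered media types for the three export formats.
MEDIA_TYPES: Dict[str, str] = {
    "csv": "text/csv",
    "json": "application/json",
    "xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
}


def media_type_for(export_format: str) -> str:
    # Assumption: format names are matched case-insensitively.
    return MEDIA_TYPES[export_format.lower()]
```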

static filename_for(dataset_type, export_format)

Build a timestamped download filename for an export.

Parameters:
  • dataset_type (str) – Dataset name used as the filename stem.

  • export_format (str) – Format extension (csv/json/xlsx).

Returns:

str – A name like users_20260609T120000Z.csv.

Return type:

str
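
A name in that shape can be produced with a UTC timestamp and a strftime pattern, as in the sketch below. The optional now parameter is not part of the documented signature; it is added here only so the example is deterministic.

```python
from datetime import datetime, timezone
from typing import Optional


def filename_for(
    dataset_type: str, export_format: str, now: Optional[datetime] = None
) -> str:
    # Normalize to UTC so the trailing "Z" is truthful. A caller-supplied
    # `now` should be timezone-aware.
    moment = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    return f"{dataset_type}_{moment:%Y%m%dT%H%M%SZ}.{export_format}"


example = filename_for(
    "users", "csv", datetime(2026, 6, 9, 12, 0, 0, tzinfo=timezone.utc)
)
# example == "users_20260609T120000Z.csv"
```

The compact ISO 8601 basic format avoids colons, which are not allowed in filenames on some platforms, and sorts chronologically as plain text.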