Model/repositories/tasks/task_postgres_repository.py
2026-05-19 16:35:09 +00:00

77 lines
2.5 KiB
Python

"""
Postgres implementation of TaskRepository.
NOTE: this repository owns only the `tasks` table. Unlike the legacy
backend.app.db.functions.tasks.Tasks.TasksInterface.create_task, it does NOT
auto-create a child SubTask. Do not rewire existing Lambda callers to this
repo until the SubTask aggregate + TaskOrchestrator slice lands — they would
silently lose their initial SubTask row.
"""
from datetime import datetime, timezone
from uuid import UUID
from sqlmodel import Session
from domain.tasks.tasks import Task, TaskStatus
from infrastructure.postgres.task_table import TaskRow
from repositories.tasks.task_repository import TaskRepository
from utilities.private import private
class TaskPostgresRepository(TaskRepository):
def __init__(self, session: Session) -> None:
self._session = session
def create(self, task: Task) -> Task:
row = self._to_row(task)
self._session.add(row)
self._session.commit()
self._session.refresh(row)
return self._to_domain(row)
def get(self, task_id: UUID) -> Task:
row = self._session.get(TaskRow, task_id)
if row is None:
raise ValueError(f"Task {task_id} not found")
return self._to_domain(row)
def save(self, task: Task) -> None:
row = self._session.get(TaskRow, task.id)
if row is None:
raise ValueError(f"Task {task.id} not found")
row.status = task.status.value
row.job_started = task.job_started
row.job_completed = task.job_completed
row.service = task.service
row.source = task.source
row.source_id = task.source_id
row.updated_at = datetime.now(timezone.utc)
self._session.add(row)
self._session.commit()
@private
def _to_row(self, task: Task) -> TaskRow:
return TaskRow(
id=task.id,
task_source=task.task_source,
status=task.status.value,
service=task.service,
source=task.source,
source_id=task.source_id,
job_started=task.job_started,
job_completed=task.job_completed,
)
@private
def _to_domain(self, row: TaskRow) -> Task:
return Task(
id=row.id,
task_source=row.task_source,
status=TaskStatus(row.status.lower()),
service=row.service,
source=row.source,
source_id=row.source_id,
job_started=row.job_started,
job_completed=row.job_completed,
)