mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
77 lines
2.5 KiB
Python
77 lines
2.5 KiB
Python
"""
|
|
Postgres implementation of TaskRepository.
|
|
|
|
NOTE: this repository owns only the `tasks` table. Unlike the legacy
|
|
backend.app.db.functions.tasks.Tasks.TasksInterface.create_task, it does NOT
|
|
auto-create a child SubTask. Do not rewire existing Lambda callers to this
|
|
repo until the SubTask aggregate + TaskOrchestrator slice lands — they would
|
|
silently lose their initial SubTask row.
|
|
"""
|
|
|
|
from datetime import datetime, timezone
|
|
from uuid import UUID
|
|
|
|
from sqlmodel import Session
|
|
|
|
from domain.tasks.tasks import Task, TaskStatus
|
|
from infrastructure.postgres.task_table import TaskRow
|
|
from repositories.tasks.task_repository import TaskRepository
|
|
from utilities.private import private
|
|
|
|
|
|
class TaskPostgresRepository(TaskRepository):
|
|
def __init__(self, session: Session) -> None:
|
|
self._session = session
|
|
|
|
def create(self, task: Task) -> Task:
|
|
row = self._to_row(task)
|
|
self._session.add(row)
|
|
self._session.commit()
|
|
self._session.refresh(row)
|
|
return self._to_domain(row)
|
|
|
|
def get(self, task_id: UUID) -> Task:
|
|
row = self._session.get(TaskRow, task_id)
|
|
if row is None:
|
|
raise ValueError(f"Task {task_id} not found")
|
|
return self._to_domain(row)
|
|
|
|
def save(self, task: Task) -> None:
|
|
row = self._session.get(TaskRow, task.id)
|
|
if row is None:
|
|
raise ValueError(f"Task {task.id} not found")
|
|
row.status = task.status.value
|
|
row.job_started = task.job_started
|
|
row.job_completed = task.job_completed
|
|
row.service = task.service
|
|
row.source = task.source
|
|
row.source_id = task.source_id
|
|
row.updated_at = datetime.now(timezone.utc)
|
|
self._session.add(row)
|
|
self._session.commit()
|
|
|
|
@private
|
|
def _to_row(self, task: Task) -> TaskRow:
|
|
return TaskRow(
|
|
id=task.id,
|
|
task_source=task.task_source,
|
|
status=task.status.value,
|
|
service=task.service,
|
|
source=task.source,
|
|
source_id=task.source_id,
|
|
job_started=task.job_started,
|
|
job_completed=task.job_completed,
|
|
)
|
|
|
|
@private
|
|
def _to_domain(self, row: TaskRow) -> Task:
|
|
return Task(
|
|
id=row.id,
|
|
task_source=row.task_source,
|
|
status=TaskStatus(row.status.lower()),
|
|
service=row.service,
|
|
source=row.source,
|
|
source_id=row.source_id,
|
|
job_started=row.job_started,
|
|
job_completed=row.job_completed,
|
|
)
|