UploadedFilePostgresRepository returns latest uploaded file by deal ID and type 🟥

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Daniel Roth 2026-06-09 11:53:25 +00:00
parent 5178cd02c5
commit 53f0da8666
4 changed files with 100 additions and 0 deletions

View file

View file

@ -0,0 +1,20 @@
from __future__ import annotations
from typing import Optional
from sqlmodel import Session
from infrastructure.postgres.uploaded_file_table import FileTypeEnum, UploadedFile
class UploadedFilePostgresRepository:
def __init__(self, session: Session) -> None:
self._session = session
def get_latest_by_hubspot_deal_id(
self, hubspot_deal_id: str, file_type: FileTypeEnum
) -> Optional[UploadedFile]:
raise NotImplementedError
def insert(self, uploaded_file: UploadedFile) -> None:
raise NotImplementedError

View file

@ -0,0 +1,80 @@
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from sqlalchemy import Engine
from sqlmodel import Session
from infrastructure.postgres.uploaded_file_table import FileTypeEnum, UploadedFile
from repositories.uploaded_file.uploaded_file_postgres_repository import (
UploadedFilePostgresRepository,
)
_DEAL_ID = "deal-abc-123"
_BUCKET = "test-bucket"
def _make_uploaded_file(
hubspot_deal_id: str = _DEAL_ID,
file_type: FileTypeEnum = FileTypeEnum.MAGIC_PLAN_JSON,
offset_seconds: int = 0,
) -> UploadedFile:
return UploadedFile(
s3_file_bucket=_BUCKET,
s3_file_key=f"documents/{hubspot_deal_id}/plan.json",
s3_upload_timestamp=datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
+ timedelta(seconds=offset_seconds),
hubspot_deal_id=hubspot_deal_id,
file_type=file_type.value,
)
def test_returns_most_recent_row_by_timestamp(db_engine: Engine) -> None:
# Arrange — two rows for the same deal/type; older first, newer second
older = _make_uploaded_file(offset_seconds=0)
newer = _make_uploaded_file(offset_seconds=60)
with Session(db_engine) as session:
session.add(older)
session.add(newer)
session.commit()
newer_id = newer.id
# Act
with Session(db_engine) as session:
result = UploadedFilePostgresRepository(session).get_latest_by_hubspot_deal_id(
_DEAL_ID, FileTypeEnum.MAGIC_PLAN_JSON
)
# Assert
assert result is not None
assert result.id == newer_id
def test_returns_none_when_no_matching_row(db_engine: Engine) -> None:
# Arrange — empty database
# Act
with Session(db_engine) as session:
result = UploadedFilePostgresRepository(session).get_latest_by_hubspot_deal_id(
"nonexistent-deal", FileTypeEnum.MAGIC_PLAN_JSON
)
# Assert
assert result is None
def test_does_not_return_row_with_different_file_type(db_engine: Engine) -> None:
# Arrange — row exists but for a different file_type
row = _make_uploaded_file(file_type=FileTypeEnum.OTHER)
with Session(db_engine) as session:
session.add(row)
session.commit()
# Act
with Session(db_engine) as session:
result = UploadedFilePostgresRepository(session).get_latest_by_hubspot_deal_id(
_DEAL_ID, FileTypeEnum.MAGIC_PLAN_JSON
)
# Assert
assert result is None