mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
103 lines
2.9 KiB
Python
103 lines
2.9 KiB
Python
from datetime import datetime, timezone
|
|
from uuid import uuid4
|
|
|
|
import pytest
|
|
from sqlalchemy import create_engine
|
|
from sqlmodel import Session, SQLModel
|
|
|
|
from backend.app.db.models import bulk_address_uploads as module
|
|
from backend.app.db.models.bulk_address_uploads import (
|
|
BulkAddressUpload,
|
|
set_combined_output_s3_uri,
|
|
set_combining_status,
|
|
)
|
|
|
|
|
|
@pytest.fixture(scope="function")
def pg_engine(postgresql):
    """Yield a SQLAlchemy engine bound to the per-test PostgreSQL database.

    The connection URL is assembled from ``postgresql.info`` (user, password,
    host, port, dbname) with the ``postgresql+psycopg`` driver. Every table
    registered on ``SQLModel.metadata`` is created before the engine is
    yielded and dropped again once the test has finished.

    Args:
        postgresql: Connection object whose ``info`` attribute exposes the
            connection parameters (presumably the pytest-postgresql
            fixture -- confirm against the project's test configuration).

    Yields:
        The engine, with the schema already created.
    """
    info = postgresql.info
    connection_string = (
        f"postgresql+psycopg://"
        f"{info.user}:"
        f"{info.password}@"
        f"{info.host}:"
        f"{info.port}/"
        f"{info.dbname}"
    )
    engine = create_engine(connection_string)
    try:
        SQLModel.metadata.create_all(engine)
        yield engine
        SQLModel.metadata.drop_all(engine)
    finally:
        # Release pooled connections even when schema creation or teardown
        # raises; code after ``yield`` is not reached on a setup failure.
        engine.dispose()
|
|
|
|
|
|
@pytest.fixture
def patched_session(pg_engine, monkeypatch):
    """Replace ``module.get_db_session`` with a factory bound to ``pg_engine``.

    Each call to the patched factory opens a new ``Session`` on the test
    engine and records it, so that every session handed out during the test
    can be closed on teardown.

    Yields:
        The ``pg_engine`` engine itself, which the tests pass on to the
        ``_insert_row`` / ``_fetch`` helpers.
    """
    opened = []

    def _open_session():
        new_session = Session(pg_engine)
        opened.append(new_session)
        return new_session

    monkeypatch.setattr(module, "get_db_session", _open_session)
    yield pg_engine

    # Teardown: close every session the patched factory handed out.
    for handed_out in opened:
        handed_out.close()
|
|
|
|
|
|
def _insert_row(engine, task_id, status="processing"):
    """Insert and commit one ``BulkAddressUpload`` row for ``task_id``.

    All columns other than ``task_id`` and ``status`` are filled with fixed
    placeholder values; ``id`` is a fresh UUID and both timestamps are the
    current UTC time.

    Args:
        engine: Engine the short-lived session is opened on.
        task_id: Value stored in the row's ``task_id`` column.
        status: Value stored in the row's ``status`` column.
    """
    upload = BulkAddressUpload(
        id=uuid4(),
        portfolio_id="p1",
        user_id="u1",
        s3_bucket="b",
        s3_key="k",
        filename="f.csv",
        status=status,
        task_id=task_id,
        created_at=datetime.now(timezone.utc),
        updated_at=datetime.now(timezone.utc),
    )
    with Session(engine) as db:
        db.add(upload)
        db.commit()
|
|
|
|
|
|
def _fetch(engine, task_id):
    """Return the first ``BulkAddressUpload`` row whose ``task_id`` matches.

    A new session is opened on ``engine`` for the lookup and closed before
    returning; the result is whatever ``.first()`` produces for the query.
    """
    from sqlmodel import select

    query = select(BulkAddressUpload).where(BulkAddressUpload.task_id == task_id)
    with Session(engine) as db:
        matches = db.exec(query)
        return matches.first()
|
|
|
|
|
|
def test_set_combining_status_updates_row(patched_session):
    """A "processing" row reads back as "combining" after the call.

    Also checks that ``combined_output_s3_uri`` is still ``None`` afterwards.
    """
    engine = patched_session
    upload_task_id = uuid4()
    _insert_row(engine, upload_task_id, status="processing")

    set_combining_status(upload_task_id)

    stored = _fetch(engine, upload_task_id)
    assert stored.status == "combining"
    assert stored.combined_output_s3_uri is None
|
|
|
|
|
|
def test_set_combined_output_s3_uri_writes_uri_and_awaiting_review(patched_session):
    """A "combining" row gets the URI stored and status "awaiting_review"."""
    engine = patched_session
    upload_task_id = uuid4()
    combined_uri = "s3://bucket/bulk_final_outputs/abc/combined.csv"
    _insert_row(engine, upload_task_id, status="combining")

    set_combined_output_s3_uri(upload_task_id, combined_uri)

    stored = _fetch(engine, upload_task_id)
    assert stored.status == "awaiting_review"
    assert stored.combined_output_s3_uri == combined_uri
|
|
|
|
|
|
def test_set_combining_status_missing_row_raises(patched_session):
    """An unknown task id makes ``set_combining_status`` raise ``ValueError``."""
    unknown_task_id = uuid4()  # never inserted, so no row can match
    with pytest.raises(ValueError, match="No bulk_address_uploads row"):
        set_combining_status(unknown_task_id)
|
|
|
|
|
|
def test_set_combined_output_s3_uri_missing_row_raises(patched_session):
    """An unknown task id makes ``set_combined_output_s3_uri`` raise ``ValueError``."""
    unknown_task_id = uuid4()  # never inserted, so no row can match
    with pytest.raises(ValueError, match="No bulk_address_uploads row"):
        set_combined_output_s3_uri(unknown_task_id, "s3://x/y.csv")
|