added backend funciton to functions

This commit is contained in:
Jun-te Kim 2026-04-22 14:21:39 +00:00
parent f220c792e4
commit a6849b28b3
4 changed files with 42 additions and 37 deletions

View file

@ -0,0 +1,36 @@
from uuid import UUID
from datetime import datetime, timezone
from sqlmodel import select
from backend.app.db.connection import get_db_session
from backend.app.db.models.bulk_address_uploads import BulkAddressUpload
def set_combining_status(task_id: UUID) -> None:
now = datetime.now(timezone.utc)
with get_db_session() as session:
row = session.exec(
select(BulkAddressUpload).where(BulkAddressUpload.task_id == task_id)
).first()
if not row:
raise ValueError(f"No bulk_address_uploads row for task_id {task_id}")
row.status = "combining"
row.updated_at = now
session.add(row)
session.commit()
def set_combined_output_s3_uri(task_id: UUID, s3_uri: str) -> None:
now = datetime.now(timezone.utc)
with get_db_session() as session:
row = session.exec(
select(BulkAddressUpload).where(BulkAddressUpload.task_id == task_id)
).first()
if not row:
raise ValueError(f"No bulk_address_uploads row for task_id {task_id}")
row.combined_output_s3_uri = s3_uri
row.status = "awaiting_review"
row.updated_at = now
session.add(row)
session.commit()

View file

@ -1,12 +1,10 @@
from typing import Optional
from uuid import UUID, uuid4
from datetime import datetime, timezone
from datetime import datetime
from sqlalchemy import Column
from sqlalchemy.dialects.postgresql import JSONB
from sqlmodel import SQLModel, Field, select
from backend.app.db.connection import get_db_session
from sqlmodel import SQLModel, Field
class BulkAddressUpload(SQLModel, table=True):
@ -24,32 +22,3 @@ class BulkAddressUpload(SQLModel, table=True):
combined_output_s3_uri: Optional[str] = Field(default=None)
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
def set_combining_status(task_id: UUID) -> None:
now = datetime.now(timezone.utc)
with get_db_session() as session:
row = session.exec(
select(BulkAddressUpload).where(BulkAddressUpload.task_id == task_id)
).first()
if not row:
raise ValueError(f"No bulk_address_uploads row for task_id {task_id}")
row.status = "combining"
row.updated_at = now
session.add(row)
session.commit()
def set_combined_output_s3_uri(task_id: UUID, s3_uri: str) -> None:
now = datetime.now(timezone.utc)
with get_db_session() as session:
row = session.exec(
select(BulkAddressUpload).where(BulkAddressUpload.task_id == task_id)
).first()
if not row:
raise ValueError(f"No bulk_address_uploads row for task_id {task_id}")
row.combined_output_s3_uri = s3_uri
row.status = "awaiting_review"
row.updated_at = now
session.add(row)
session.commit()

View file

@ -8,7 +8,7 @@ from datetime import datetime, timezone
from utils.logger import setup_logger
from backend.utils.subtasks import subtask_handler
from backend.app.db.models.bulk_address_uploads import (
from backend.app.db.functions.bulk_address_uploads_functions import (
set_combined_output_s3_uri,
set_combining_status,
)

View file

@ -5,12 +5,12 @@ import pytest
from sqlalchemy import create_engine
from sqlmodel import Session, SQLModel
from backend.app.db.models import bulk_address_uploads as module
from backend.app.db.models.bulk_address_uploads import (
BulkAddressUpload,
from backend.app.db.functions import bulk_address_uploads_functions as module
from backend.app.db.functions.bulk_address_uploads_functions import (
set_combined_output_s3_uri,
set_combining_status,
)
from backend.app.db.models.bulk_address_uploads import BulkAddressUpload
@pytest.fixture(scope="function")