Load surveys to database as part of processing

This commit is contained in:
Daniel Roth 2026-01-29 12:15:52 +00:00
parent 691762216c
commit 32555640c9
4 changed files with 61 additions and 9 deletions

View file

@ -44,7 +44,7 @@ class PropertyConditionSurveyModel(Base):
class ElementModel(Base):
__tablename__ = "element"
__tablename__ = "element" # TODO: rename to survey_element?
id = Column(BigInteger, primary_key=True, autoincrement=True)
@ -70,7 +70,7 @@ class ElementModel(Base):
class AspectConditionModel(Base):
__tablename__ = "aspect_condition"
__tablename__ = "aspect_condition" # TODO: rename to survey_aspect?
id = Column(BigInteger, primary_key=True, autoincrement=True)

View file

@ -20,7 +20,8 @@ def main() -> None:
/ "peabody"
/ "2026_01_06 - Peabody - Stock Condition Data - Survey Records - D Lower.xlsx"
)
filepaths = [lbwf_path, peabody_path]
# filepaths = [lbwf_path, peabody_path]
filepaths = [lbwf_path]
for fp in filepaths:
with fp.open("rb") as f:

View file

@ -1,17 +1,47 @@
from typing import List
import time
from typing import List, Optional
from sqlmodel import Session
from utils.logger import setup_logger
from backend.app.db.models.condition import (
AspectConditionModel,
ElementModel,
PropertyConditionSurveyModel,
)
from backend.condition.domain.property_condition_survey import PropertyConditionSurvey
from backend.app.db.connection import db_session
logger = setup_logger()
class ConditionPostgres:
def insert_surveys(surveys: List[PropertyConditionSurvey]) -> None:
raise NotImplementedError
@staticmethod
def bulk_insert_surveys(
    surveys: List[PropertyConditionSurvey], batch_size: Optional[int] = 100
) -> None:
    """Map domain surveys to database models and insert them in batches.

    Each survey is converted with ``ConditionPostgres.map_survey_to_model``.
    The resulting models are then written inside a single ``db_session()``
    context, one slice at a time, via
    ``ConditionPostgres._insert_surveys_batch``. The wall-clock time of each
    batch is logged.

    Declared as a static method (like the other helpers on this class) so
    that it works both as ``ConditionPostgres.bulk_insert_surveys(...)`` and
    when called on an instance; without the decorator an instance call would
    bind the instance itself to ``surveys``.

    Args:
        surveys: Domain survey objects to persist.
        batch_size: Maximum number of surveys per batch. ``None`` writes all
            surveys in one batch.

    Raises:
        ValueError: If ``batch_size`` is given and is smaller than 1.
    """
    # Fail fast: a zero step makes range() raise, and a negative step makes
    # it empty, which would silently insert nothing.
    if batch_size is not None and batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

    logger.info(
        f"Preparing to load {len(surveys)} property surveys to Postgres. Mapping to SQLModel objects..."
    )
    survey_models: List[PropertyConditionSurveyModel] = [
        ConditionPostgres.map_survey_to_model(s) for s in surveys
    ]
    total: int = len(survey_models)

    # None means "everything in one batch"; max(..., 1) keeps the range()
    # step valid when there are no surveys at all.
    step: int = max(total, 1) if batch_size is None else batch_size
    logger.info(
        f"Finished mapping {total} surveys. Writing to database in batches of {step}..."
    )

    with db_session() as session:
        for start in range(0, total, step):
            end = min(start + step, total)
            batch = survey_models[start:end]
            t0: float = time.perf_counter()
            ConditionPostgres._insert_surveys_batch(batch, session)
            elapsed: float = time.perf_counter() - t0
            logger.info(
                f"Inserted batch {start} {end} ({len(batch)} surveys) in {elapsed} seconds",
            )
@staticmethod
def map_survey_to_model(
@ -47,3 +77,10 @@ class ConditionPostgres:
survey_model.elements.append(element_model)
return survey_model
@staticmethod
def _insert_surveys_batch(
    surveys: List[PropertyConditionSurveyModel], session: Session
) -> None:
    """Add one batch of survey models to the session and commit it.

    Args:
        surveys: Survey models to persist in this batch.
        session: Open session the batch is written through; it is committed
            here, so each batch is its own transaction.

    Raises:
        Exception: Whatever ``add_all`` or ``commit`` raised, re-raised
            after the session has been rolled back.
    """
    try:
        session.add_all(surveys)
        session.commit()
    except Exception:
        # The session is shared across batches by the caller; roll back so
        # it is not left in a failed transaction state, then let the caller
        # see the original error.
        session.rollback()
        raise

View file

@ -1,25 +1,33 @@
from typing import Any, BinaryIO, List
from datetime import datetime
from utils.logger import setup_logger
from backend.condition.domain.mapping.mapper import Mapper
from backend.condition.domain.property_condition_survey import PropertyConditionSurvey
from backend.condition.parsing.parser import Parser
from utils.logger import setup_logger
from backend.condition.persistence.condition_postgres import ConditionPostgres
from backend.condition.file_type import FileType, detect_file_type
from backend.condition.parsing.factory import select_parser, select_mapper
logger = setup_logger()
def process_file(file_stream: BinaryIO, source_key: str) -> None:
print(f"[processor] Received file: {source_key}")
logger.info(f"[processor] Received file: {source_key}")
# Instantiation
file_type: FileType = detect_file_type(source_key)
parser: Parser = select_parser(file_type)
mapper: Mapper = select_mapper(file_type)
persistence = ConditionPostgres()
# Orchestration
raw_properties: List[Any] = parser.parse(file_stream)
logger.info(
f"[processor] Finished loading customer survey data for {len(raw_properties)} properties. Mapping..."
)
survey_year = datetime.now().year # TODO: get this from filepath or elsewhere
property_condition_surveys: List[PropertyConditionSurvey] = []
@ -29,4 +37,10 @@ def process_file(file_stream: BinaryIO, source_key: str) -> None:
mapper.map_asset_conditions_for_property(p, survey_year)
)
print("done") # temp
logger.info(
f"[processor] Finished mapping {len(property_condition_surveys)} properties. Writing to database..."
)
persistence.bulk_insert_surveys(property_condition_surveys)
logger.info(f"[processor] Finished loading surveys to database")