diff --git a/backend/app/db/models/condition.py b/backend/app/db/models/condition.py index 65e85058..cecf1fde 100644 --- a/backend/app/db/models/condition.py +++ b/backend/app/db/models/condition.py @@ -44,7 +44,7 @@ class PropertyConditionSurveyModel(Base): class ElementModel(Base): - __tablename__ = "element" + __tablename__ = "element" # TODO: rename to survey_element? id = Column(BigInteger, primary_key=True, autoincrement=True) @@ -70,7 +70,7 @@ class ElementModel(Base): class AspectConditionModel(Base): - __tablename__ = "aspect_condition" + __tablename__ = "aspect_condition" # TODO: rename to survey_aspect? id = Column(BigInteger, primary_key=True, autoincrement=True) diff --git a/backend/condition/local_runner.py b/backend/condition/local_runner.py index 404f64d4..f7580139 100644 --- a/backend/condition/local_runner.py +++ b/backend/condition/local_runner.py @@ -20,7 +20,8 @@ def main() -> None: / "peabody" / "2026_01_06 - Peabody - Stock Condition Data - Survey Records - D Lower.xlsx" ) - filepaths = [lbwf_path, peabody_path] + # filepaths = [lbwf_path, peabody_path] + filepaths = [lbwf_path] for fp in filepaths: with fp.open("rb") as f: diff --git a/backend/condition/persistence/condition_postgres.py b/backend/condition/persistence/condition_postgres.py index b0ee018b..14b513cd 100644 --- a/backend/condition/persistence/condition_postgres.py +++ b/backend/condition/persistence/condition_postgres.py @@ -1,17 +1,47 @@ -from typing import List +import time +from typing import List, Optional +from sqlmodel import Session +from utils.logger import setup_logger from backend.app.db.models.condition import ( AspectConditionModel, ElementModel, PropertyConditionSurveyModel, ) from backend.condition.domain.property_condition_survey import PropertyConditionSurvey +from backend.app.db.connection import db_session + +logger = setup_logger() class ConditionPostgres: - def insert_surveys(surveys: List[PropertyConditionSurvey]) -> None: - raise NotImplementedError + def bulk_insert_surveys( + surveys: List[PropertyConditionSurvey], batch_size: Optional[int] = 100 + ) -> None: + logger.info( + f"Preparing to load {len(surveys)} property surveys to Postgres. Mapping to SQLModel objects..." + ) + survey_models: List[PropertyConditionSurveyModel] = [ + ConditionPostgres.map_survey_to_model(s) for s in surveys + ] + total: int = len(survey_models) + logger.info( + f"Finished mapping {total} surveys. Writing to database in batches of {batch_size}..." + ) + + with db_session() as session: + for start in range(0, total, batch_size): + end = min(start + batch_size, total) + batch = survey_models[start:end] + + t0: float = time.perf_counter() + ConditionPostgres._insert_surveys_batch(batch, session) + elapsed: float = time.perf_counter() - t0 + + logger.info( + f"Inserted batch {start} – {end} ({len(batch)} surveys) in {elapsed} seconds", + ) @staticmethod def map_survey_to_model( @@ -47,3 +77,10 @@ class ConditionPostgres: survey_model.elements.append(element_model) return survey_model + + @staticmethod + def _insert_surveys_batch( + surveys: List[PropertyConditionSurveyModel], session: Session + ) -> None: + session.add_all(surveys) + session.commit() diff --git a/backend/condition/processor.py b/backend/condition/processor.py index 3cbff498..4d8f16cf 100644 --- a/backend/condition/processor.py +++ b/backend/condition/processor.py @@ -1,25 +1,33 @@ from typing import Any, BinaryIO, List from datetime import datetime +from utils.logger import setup_logger from backend.condition.domain.mapping.mapper import Mapper from backend.condition.domain.property_condition_survey import PropertyConditionSurvey from backend.condition.parsing.parser import Parser -from utils.logger import setup_logger +from backend.condition.persistence.condition_postgres import ConditionPostgres from backend.condition.file_type import FileType, detect_file_type from backend.condition.parsing.factory import select_parser, select_mapper +logger = setup_logger() + def process_file(file_stream: BinaryIO, source_key: str) -> None: - print(f"[processor] Received file: {source_key}") + logger.info(f"[processor] Received file: {source_key}") # Instantiation file_type: FileType = detect_file_type(source_key) parser: Parser = select_parser(file_type) mapper: Mapper = select_mapper(file_type) + persistence = ConditionPostgres() # Orchestration raw_properties: List[Any] = parser.parse(file_stream) + logger.info( + f"[processor] Finished loading customer survey data for {len(raw_properties)} properties. Mapping..." + ) + survey_year = datetime.now().year # TODO: get this from filepath or elsewhere property_condition_surveys: List[PropertyConditionSurvey] = [] @@ -29,4 +37,10 @@ def process_file(file_stream: BinaryIO, source_key: str) -> None: mapper.map_asset_conditions_for_property(p, survey_year) ) - print("done") # temp + logger.info( + f"[processor] Finished mapping {len(property_condition_surveys)} properties. Writing to database..." + ) + + persistence.bulk_insert_surveys(property_condition_surveys) + + logger.info(f"[processor] Finished loading surveys to database")