mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
49 lines
1.7 KiB
Python
49 lines
1.7 KiB
Python
from typing import Any, BinaryIO, List
|
|
from datetime import datetime
|
|
|
|
from backend.condition.lookups.uprn_lookup import UprnLookup
|
|
from utils.logger import setup_logger
|
|
from backend.condition.domain.mapping.mapper import Mapper
|
|
from backend.condition.domain.property_condition_survey import PropertyConditionSurvey
|
|
from backend.condition.parsing.parser import Parser
|
|
from backend.condition.persistence.condition_postgres import ConditionPostgres
|
|
from backend.condition.file_type import FileType, detect_file_type
|
|
from backend.condition.parsing.factory import select_parser, select_mapper
|
|
|
|
# Module-level logger used by the processing functions in this file.
logger = setup_logger()
|
|
|
|
|
|
def process_file(
    file_stream: BinaryIO, source_key: str, uprn_lookup: UprnLookup
) -> None:
    """Parse a condition-survey file and map each property to a survey object.

    The file type is detected from ``source_key`` and used to select the
    matching parser and mapper. Every raw property returned by the parser is
    mapped to a ``PropertyConditionSurvey`` using the current calendar year
    as the survey year.

    NOTE(review): the database write at the end is commented out, so the
    mapped surveys are discarded when this function returns even though the
    log messages still report a database load. Confirm whether that is
    intentional before relying on this function for persistence.

    Args:
        file_stream: Binary stream of the survey file; handed to the parser.
        source_key: Key of the received file; logged and used to detect the
            file type.
        uprn_lookup: Lookup object passed to the parser factory.
    """
    logger.info(f"[processor] Received file: {source_key}")

    # Instantiation
    file_type: FileType = detect_file_type(source_key)
    parser: Parser = select_parser(file_type, uprn_lookup)
    mapper: Mapper = select_mapper(file_type)
    # Still constructed although the write below is disabled; removing it
    # could drop constructor side effects (TODO confirm there are none).
    persistence = ConditionPostgres()

    # Orchestration
    raw_properties: List[Any] = parser.parse(file_stream)

    logger.info(
        f"[processor] Finished loading customer survey data for {len(raw_properties)} properties. Mapping..."
    )

    survey_year = datetime.now().year  # TODO: get this from filepath or elsewhere

    property_condition_surveys: List[PropertyConditionSurvey] = [
        mapper.map_asset_conditions_for_property(raw_property, survey_year)
        for raw_property in raw_properties
    ]

    logger.info(
        f"[processor] Finished mapping {len(property_condition_surveys)} properties. Writing to database..."
    )

    # NOTE(review): write is disabled; nothing is persisted at the moment.
    # persistence.bulk_insert_surveys(property_condition_surveys)

    logger.info("[processor] Finished loading surveys to database")
|