import enum
import math
import re
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict, Optional, Type, TypeVar

import pytz
from sqlalchemy.orm import Session

from backend.app.db.models.inspections import (
    InspectionModel,
    InspectionArchetype,
    InspectionArchetype2,
    InspectionsWallConstruction,
    InspectionsWallInsulation,
    InspectionsInsulationMaterial,
    InspectionBorescoped,
    InspectionsRoofOrientation,
    InspectionsTileHung,
    InspectionsRendered,
    InspectionsCladding,
    InspectionsAccessIssues,
)

NON_INTRUSIVE_PREFIX = "non-intrusives:"

# Typographic quotes are mapped to their ASCII equivalents before matching.
_QUOTE_TABLE = str.maketrans({"“": '"', "”": '"', "’": "'"})
_WHITESPACE_RE = re.compile(r"\s+")


@dataclass
class InspectionData:
    """Inspection values extracted from one config row."""

    archetype: Optional[InspectionArchetype] = None
    archetype_2: Optional[InspectionArchetype2] = None
    wall_construction: Optional[InspectionsWallConstruction] = None
    insulation: Optional[InspectionsWallInsulation] = None
    insulation_material: Optional[InspectionsInsulationMaterial] = None
    borescoped: Optional[InspectionBorescoped] = None
    roof_orientation: Optional[InspectionsRoofOrientation] = None
    tile_hung: Optional[InspectionsTileHung] = None
    rendered: Optional[InspectionsRendered] = None
    cladding: Optional[InspectionsCladding] = None
    access_issues: Optional[InspectionsAccessIssues] = None
    # Reflects the date when the survey was actually conducted
    date: Optional[datetime] = None
    notes: Optional[str] = None
    surveyor_name: Optional[str] = None


# (InspectionData field, lower-cased config key, enum class) for every
# enum-valued column read by extract_inspection_data.
_ENUM_COLUMNS = (
    ("archetype", "non-intrusives: archetype", InspectionArchetype),
    ("archetype_2", "non-intrusives: archetype 2", InspectionArchetype2),
    (
        "wall_construction",
        "non-intrusives: construction",
        InspectionsWallConstruction,
    ),
    ("insulation", "non-intrusives: insulated", InspectionsWallInsulation),
    (
        "insulation_material",
        "non-intrusives: material",
        InspectionsInsulationMaterial,
    ),
    ("borescoped", "non-intrusives: boroscoped?", InspectionBorescoped),
    (
        "roof_orientation",
        "non-intrusives: roof orientation",
        InspectionsRoofOrientation,
    ),
    ("tile_hung", "non-intrusives: tile hung", InspectionsTileHung),
    ("rendered", "non-intrusives: rendered", InspectionsRendered),
    ("cladding", "non-intrusives: cladding", InspectionsCladding),
    ("access_issues", "non-intrusives: access issues", InspectionsAccessIssues),
)


def _clean_string(value: Any) -> Optional[str]:
    """Normalize strings for enum matching, tolerant of NaN/None.

    Returns the value stripped, lower-cased, with typographic quotes replaced
    by ASCII quotes and whitespace runs collapsed to one space. Anything that
    is not a ``str`` (including ``None`` and NaN) yields ``None``.
    """
    if not isinstance(value, str):
        return None
    normalized = value.strip().lower().translate(_QUOTE_TABLE)
    return _WHITESPACE_RE.sub(" ", normalized)


E = TypeVar("E", bound=Enum)


def _match_enum(value: Any, enum_cls: Type[E]) -> Optional[E]:
    """Case-insensitive fuzzy matching for enums, tolerant of NaN/None.

    Both the input and every enum value go through ``_clean_string`` so they
    are compared in the same normalized form. An exact match wins; otherwise
    the first member (in declaration order) whose value contains the input,
    or is contained in it, is returned. Members whose value is not a
    non-empty string are ignored.

    Returns ``None`` when the input is empty/unusable or nothing matches.
    """
    needle = _clean_string(value)
    if not needle:
        return None

    # Skip empty labels: "" is a substring of everything and would match any
    # input in the fuzzy pass.
    candidates = []
    for member in enum_cls:
        label = _clean_string(member.value)
        if label:
            candidates.append((member, label))

    for member, label in candidates:
        if needle == label:
            return member

    # NOTE(review): substring matching is order-dependent; if one enum value
    # is contained in another (e.g. "x" / "un-x"), the member declared first
    # wins. Confirm the enum declarations are ordered accordingly.
    for member, label in candidates:
        if needle in label or label in needle:
            return member
    return None


def _lower_key_dict(d: dict) -> dict:
    """Convert all keys to lowercase for case-insensitive lookup.

    Entries whose key is not a string are dropped.
    """
    return {key.lower(): val for key, val in d.items() if isinstance(key, str)}


def extract_inspection_data(config: Dict[str, Any]) -> Optional[InspectionData]:
    """Extract and map inspection data from a config row.

    Keys are matched case-insensitively. Returns ``None`` when the row has no
    key starting with ``NON_INTRUSIVE_PREFIX``; otherwise returns an
    ``InspectionData`` whose enum fields are resolved with ``_match_enum``
    (unmatched values become ``None``).
    """
    config_lower = _lower_key_dict(config)
    if not any(key.startswith(NON_INTRUSIVE_PREFIX) for key in config_lower):
        return None

    data = InspectionData(
        **{
            field_name: _match_enum(config_lower.get(column), enum_cls)
            for field_name, column, enum_cls in _ENUM_COLUMNS
        }
    )
    data.date = config_lower.get("non-intrusives: date")
    data.notes = config_lower.get("non-intrusives: further surveyor notes")

    # Convert surveyor name to title case if present. Only real strings are
    # accepted: a NaN cell is truthy but has no .title().
    surveyor = config_lower.get("non-intrusives: name of surveyor")
    if isinstance(surveyor, str) and surveyor.strip():
        data.surveyor_name = surveyor.strip().title()
    else:
        data.surveyor_name = None
    return data


def _coerce_survey_date(value: Any) -> Optional[datetime]:
    """Return *value* as a timezone-aware datetime, or None if unusable.

    Accepts ``datetime`` objects, objects exposing ``to_pydatetime()`` and
    ISO-8601 strings. Naive datetimes are tagged as UTC; aware datetimes are
    returned unchanged.
    """
    if value is None:
        return None
    if hasattr(value, "to_pydatetime"):
        value = value.to_pydatetime()
    elif isinstance(value, str):
        try:
            value = datetime.fromisoformat(value.strip())
        except ValueError:
            return None
    # Rejects NaN floats and other non-datetime values; the self-inequality
    # test rejects NaT-like "missing" datetimes that compare unequal to
    # themselves.
    if not isinstance(value, datetime) or value != value:
        return None
    if value.tzinfo is None:
        value = value.replace(tzinfo=pytz.utc)
    return value


def _normalize_column_value(value: Any) -> Any:
    """Map enum members to their ``.value`` and NaN floats to None."""
    if isinstance(value, enum.Enum):
        return value.value
    if isinstance(value, float) and math.isnan(value):
        return None
    return value


def bulk_upsert_inspections_pg(session: Session, inspections_map):
    """
    Bulk insert/update inspection records:
    - 'created_at' = actual survey date
    - 'uploaded_at' = time of upload or update
    - If an inspection exists for the same property on the same date → overwrite
    - Otherwise → insert a new record

    ``inspections_map`` maps a property id to a dataclass instance (it is
    passed through ``dataclasses.asdict``). The survey date is read from a
    ``survey_date`` attribute/key when present and otherwise from ``date``,
    the field ``InspectionData`` actually defines. Entries without a usable
    survey date are skipped. The session is flushed once at the end; nothing
    is committed here.
    """
    if not inspections_map:
        return

    now = datetime.now(pytz.utc)

    for property_id, data in inspections_map.items():
        record = asdict(data)

        raw_date = (
            getattr(data, "survey_date", None)
            or record.get("survey_date")
            or record.get("date")
        )
        survey_date = _coerce_survey_date(raw_date)
        if survey_date is None:
            continue  # skip if no usable survey date available

        record["property_id"] = property_id
        record["created_at"] = survey_date
        record["uploaded_at"] = now

        # Normalize enums and NaNs.
        # NOTE(review): every dataclass field (including `date`) is passed to
        # InspectionModel as in the original — confirm the model defines a
        # matching column for each.
        record = {
            column: _normalize_column_value(val) for column, val in record.items()
        }

        # Find existing inspection *for same property on same day*
        start_of_day = survey_date.replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        end_of_day = start_of_day + timedelta(days=1)

        existing_inspection = (
            session.query(InspectionModel)
            .filter(
                InspectionModel.property_id == property_id,
                InspectionModel.created_at >= start_of_day,
                InspectionModel.created_at < end_of_day,
            )
            .first()
        )

        if existing_inspection:
            # Overwrite existing record (same survey day); `uploaded_at` is
            # already part of `record`, so it is refreshed here too.
            for column, val in record.items():
                setattr(existing_inspection, column, val)
        else:
            # Create new inspection for new day
            session.add(InspectionModel(**record))

    session.flush()