# survey-extraction/etl/surveyedData/surveryedData.py
# 2025-07-24 09:51:15 +00:00
#
# 1238 lines
# No EOL
# 48 KiB
# Python

from etl.fileReader.pdfReaderToText import pdfReaderToText
from etl.fileReader.xmlReader import xmlReader
from etl.fileReader.reportType import ReportType
import math
from xml.dom.minidom import parseString
from etl.models.preSiteNoteTypes import (
AssessorInfo, CompanyInfo,
PreSiteNotesSummaryInfo,
PreSiteNote,
PropertyDescription, Dimension, HeatingType, HeatingFromPreSiteNotes, HeatingSystemControls,
OtherDetails, WindTurbine, PhotovoltaicPanel, FlueGasHeatRecoverySystem, ShowerAndBaths,
SolarWaterHeating, HotWaterCylinder, WaterHeating, Lighting, VentilationAndCooling,
Door, Walls, Roofs, Floors, PropertyDetail, Windows
)
from pprint import pprint
from etl.models.conditionReport import (
AssessorDetails, InspectionAndProject, TheProperty, ElevationInfo,
MainElevation, Elevation,GeneralInformation,
PropertyAccess, ExternalElevation,
ExternalElevationFront, ExternalElevationRear, ExternalElevationGableOne,
ExternalElevationGableTwo, ConservatoryOrOutbuilding, AccessAndElevations,Hallway,
VentilationInfo,WindowsInfo,RoomInfo,LivingRoom, DiningRoom, Kitchen, Utility, WC,
Landing,LoftSpace,RoomInRoof, Bedroom,Bathroom, Rooms,
GeneralConditionHeatingSystem, MainHeatingOne, MainHeatingTwo,
SecondaryHeating, HeatingByRoom, Renewables, HeatingSystem, Occupant, EnergyUse,
HeatingFromConditionReport, ShowerAndBath, FridgeAndFreezers, Cooker, TumbleDryer,
OccupantAssessment, ConditionReportModel
)
from etl.models.topLevel import(
Buildings, Documents
)
import uuid
class surveyedDataProcessor():
def __init__(self, address, files):
    """Remember the inputs, clear every report slot, then classify ``files``.

    ``address`` is stored as given; ``identify_files`` replaces it with the
    address from the pre-site note when one is found among ``files``.
    """
    self.address = address
    self.files = files
    # Every slot starts out empty; identify_files() fills in the ones it
    # recognises from the supplied files.
    for slot in (
        "pre_site_note",
        "csr",
        "condition_report",
        "hubspot_deal_id",
        "epr_with_data",
        "epr_summary_information",
        "epr_summary_information_file_path",
        "full_sap_xml",
        "lig_sap_xml",
        "rd_sap_xml",
    ):
        setattr(self, slot, None)
    self.identify_files()
def identify_files(self):
    """Classify each path in ``self.files`` and keep the parsed result.

    ``.pdf`` paths go through ``pdfReaderToText`` and ``.xml`` paths through
    ``xmlReader`` (the extension match is case-insensitive).  Other paths,
    and files whose reader object is falsy, are ignored.  When several files
    of the same report type are supplied, the last one wins.  A pre-site note
    also overwrites ``self.address`` with the address it contains.
    """
    for path in self.files:
        if path.lower().endswith('.pdf'):
            parsed_pdf = pdfReaderToText(path)
            if not parsed_pdf:
                continue
            if parsed_pdf.type == ReportType.QUIDOS_PRESITE_NOTE:
                self.pre_site_note = parsed_pdf.get_reader()
                self.address = self.pre_site_note.survey_information.address
            elif parsed_pdf.type == ReportType.CHARTED_SURVEYOR_REPORT:
                self.csr = parsed_pdf.get_reader()
            elif (
                parsed_pdf.type == ReportType.WARM_HOMES_CONDITION_REPORT
                or parsed_pdf.type == ReportType.ECO_CONDITION_REPORT
            ):
                # Both condition-report flavours share one slot.
                self.condition_report = parsed_pdf.get_reader()
            elif parsed_pdf.type == ReportType.ENERGY_PERFORMANCE_REPORT_WITH_DATA:
                self.epr_with_data = parsed_pdf.get_reader()
            elif parsed_pdf.type == ReportType.ENERGY_PERFORMANCE_REPORT_SUMMARY_INFORMATION:
                self.epr_summary_information = parsed_pdf.get_reader()
                self.epr_summary_information_file_path = path
        elif path.lower().endswith('.xml'):
            parsed_xml = xmlReader(path)
            if not parsed_xml:
                continue
            if parsed_xml.type is ReportType.FULLSAP_XML:
                self.full_sap_xml = parsed_xml.xml_obj
            elif parsed_xml.type is ReportType.LIG_XML:
                self.lig_sap_xml = parsed_xml.xml_obj
            elif parsed_xml.type is ReportType.RDSAP_XML:
                self.rd_sap_xml = parsed_xml.xml_obj
def load_condition_report(self, db_session, landlord_id=None, domna_id=None):
    """Persist the whole condition report, then upsert the building row.

    The five sections (general information, access and elevations, rooms,
    heating system, occupancy assessment) are written first; the top-level
    ``ConditionReportModel`` row is then written with their ids.

    Args:
        db_session: database session handed to every loader.
        landlord_id: stored on the building row (optional).
        domna_id: stored on the building row (optional).

    Returns:
        The building row returned by ``create_buildings_table``.  Its
        address, postcode and UPRN are read from ``self.pre_site_note``, so
        a pre-site note must have been identified as well.
    """
    general_information = self.load_general_information_from_condition_report(db_session)
    access_and_elevations = self.load_access_and_elevations_from_condition_report(db_session)
    rooms = self.load_rooms_from_condition_report(db_session)
    heating_system = self.load_heating_system_from_condition_report(db_session)
    occupant_assessment = self.load_occupant_info_from_condition_report(db_session)
    # The top-level row links the five section rows together.
    self.get_attribute_and_load(
        self.condition_report,
        "master_obj",
        ConditionReportModel,
        db_session,
        additional_fields={
            "general_information_id": general_information.id,
            "access_and_elevations_id": access_and_elevations.id,
            "rooms_id": rooms.id,
            "heating_system_id": heating_system.id,
            "occupancy_assessment_id": occupant_assessment.id,
        },
    )
    # TODO: create the document table entry for the condition report.
    # landlord_id / domna_id are parameters here because the building upsert
    # needs them and nothing else in this method defines them.
    return self.create_buildings_table(db_session, landlord_id, domna_id)
def load_occupant_info_from_condition_report(self, db_session):
    """Persist the occupancy assessment and its child sections.

    Child sections are written first, in a fixed order, and their ids are
    attached to the ``OccupantAssessment`` row.  The appliances loader is
    still called although its result is not linked (it only prints a
    notice and returns ``None``).
    """
    occupant_row = self.load_occupant_information_from_condition_report(db_session)
    energy_use_row = self.load_energy_use_from_condition_report(db_session)
    heating_row = self.load_heating_from_condition_report(db_session)
    shower_and_bath_row = self.load_shower_and_bath_from_condition_report(db_session)
    # Appliances are not implemented yet; the call is kept for its notice.
    self.load_appliances_from_condition_report(db_session)
    fridge_and_freezers_row = self.load_fridge_and_freezer_from_condition_report(db_session)
    cooker_row = self.load_cooker_from_condition_report(db_session)
    tumble_dryer_row = self.load_tumble_dryer_from_condition_report(db_session)

    child_ids = {
        "occupant_id": occupant_row.id,
        "energy_use_id": energy_use_row.id,
        "heating_id": heating_row.id,
        "shower_and_bath_id": shower_and_bath_row.id,
        "fridge_and_freezers_id": fridge_and_freezers_row.id,
        "cooker_id": cooker_row.id,
        "tumble_dryer_id": tumble_dryer_row.id,
    }
    return self.get_attribute_and_load(
        self.condition_report.master_obj,
        "occupancy_assessment",
        OccupantAssessment,
        db_session,
        additional_fields=child_ids,
    )
def load_tumble_dryer_from_condition_report(self, db_session):
    """Persist the ``tumble_dryer`` section of the occupancy assessment."""
    assessment = self.condition_report.master_obj.occupancy_assessment
    return self.get_attribute_and_load(
        obj=assessment,
        attr_string="tumble_dryer",
        pydanticModel=TumbleDryer,
        db_session=db_session,
    )
def load_cooker_from_condition_report(self, db_session):
    """Persist the ``cooker`` section of the occupancy assessment."""
    assessment = self.condition_report.master_obj.occupancy_assessment
    return self.get_attribute_and_load(
        obj=assessment,
        attr_string="cooker",
        pydanticModel=Cooker,
        db_session=db_session,
    )
def load_fridge_and_freezer_from_condition_report(self, db_session):
    """Persist the ``fridge_and_freezers`` section of the occupancy assessment."""
    assessment = self.condition_report.master_obj.occupancy_assessment
    return self.get_attribute_and_load(
        obj=assessment,
        attr_string="fridge_and_freezers",
        pydanticModel=FridgeAndFreezers,
        db_session=db_session,
    )
def load_appliances_from_condition_report(self, db_session):
    """Placeholder: appliances are not loaded yet.

    Prints a notice and returns ``None``; ``db_session`` is unused.
    """
    notice = "Appliances was not done, please contact jun-te to complete this!"
    print(notice)
    return None
def load_shower_and_bath_from_condition_report(self, db_session):
    """Persist the ``shower_and_bath`` section of the occupancy assessment."""
    assessment = self.condition_report.master_obj.occupancy_assessment
    return self.get_attribute_and_load(
        obj=assessment,
        attr_string="shower_and_bath",
        pydanticModel=ShowerAndBath,
        db_session=db_session,
    )
def load_heating_from_condition_report(self, db_session):
    """Persist the ``heating`` section of the occupancy assessment."""
    assessment = self.condition_report.master_obj.occupancy_assessment
    return self.get_attribute_and_load(
        obj=assessment,
        attr_string="heating",
        pydanticModel=HeatingFromConditionReport,
        db_session=db_session,
    )
def load_energy_use_from_condition_report(self, db_session):
    """Persist the ``energy_use`` section of the occupancy assessment."""
    assessment = self.condition_report.master_obj.occupancy_assessment
    return self.get_attribute_and_load(
        obj=assessment,
        attr_string="energy_use",
        pydanticModel=EnergyUse,
        db_session=db_session,
    )
def load_occupant_information_from_condition_report(self, db_session):
    """Persist the ``occupant`` section of the occupancy assessment.

    The long "landlord has written confirmation ..." attribute of the parsed
    occupant is stored under a shorter column name.
    """
    assessment = self.condition_report.master_obj.occupancy_assessment
    written_confirmation = (
        assessment.occupant
        .landlord_has_written_confirmation_that_the_tenent_agrees_to_the_assessment_been_supplied
    )
    return self.get_attribute_and_load(
        obj=assessment,
        attr_string="occupant",
        pydanticModel=Occupant,
        db_session=db_session,
        additional_fields={
            "landlord_wrote_that_the_tenent_agrees_assessment_been_supplied": written_confirmation,
        },
    )
def load_heating_system_from_condition_report(self, db_session):
    """Persist the heating system and its six child sections.

    Each child loader's row id is attached to the ``HeatingSystem`` row.
    Note: every child is dereferenced with ``.id``, so a child loader that
    returns ``None`` makes this method raise ``AttributeError``.
    """
    general_condition = self.load_general_condition_of_heating_system_from_condition_report(db_session)
    main_heating_one = self.load_main_heating_one_from_condition_report(db_session)
    main_heating_two = self.load_main_heating_two_from_condition_report(db_session)
    secondary_heating = self.load_secondary_heating_from_condition_report(db_session)
    heating_by_room = self.load_heating_by_room_from_condition_report(db_session)
    renewables = self.load_renewables_from_condition_report(db_session)

    child_ids = {
        "general_condition_id": general_condition.id,
        "main_heating_one_id": main_heating_one.id,
        "main_heating_two_id": main_heating_two.id,
        "secondary_heating_id": secondary_heating.id,
        "heating_by_room_id": heating_by_room.id,
        "renewables_id": renewables.id,
    }
    return self.get_attribute_and_load(
        self.condition_report.master_obj,
        "heating_system",
        HeatingSystem,
        db_session,
        additional_fields=child_ids,
    )
def load_renewables_from_condition_report(self, db_session):
    """Persist the ``renewables`` section of the heating system."""
    heating_system = self.condition_report.master_obj.heating_system
    return self.get_attribute_and_load(
        obj=heating_system,
        attr_string="renewables",
        pydanticModel=Renewables,
        db_session=db_session,
    )
def load_heating_by_room_from_condition_report(self, db_session):
    """Persist the ``heating_by_room`` section of the heating system.

    ``exclude_list=False`` is passed so list-valued fields are kept in the
    stored data instead of being dropped.
    """
    heating_system = self.condition_report.master_obj.heating_system
    return self.get_attribute_and_load(
        obj=heating_system,
        attr_string="heating_by_room",
        pydanticModel=HeatingByRoom,
        db_session=db_session,
        exclude_list=False,
    )
def load_secondary_heating_from_condition_report(self, db_session):
    """Persist the ``secondary_heating`` section of the heating system."""
    heating_system = self.condition_report.master_obj.heating_system
    return self.get_attribute_and_load(
        obj=heating_system,
        attr_string="secondary_heating",
        pydanticModel=SecondaryHeating,
        db_session=db_session,
    )
def load_main_heating_two_from_condition_report(self, db_session):
    """Persist the ``main_heating_two`` section of the heating system."""
    heating_system = self.condition_report.master_obj.heating_system
    return self.get_attribute_and_load(
        obj=heating_system,
        attr_string="main_heating_two",
        pydanticModel=MainHeatingTwo,
        db_session=db_session,
    )
def load_main_heating_one_from_condition_report(self, db_session):
    """Persist the ``main_heating_one`` section of the heating system."""
    heating_system = self.condition_report.master_obj.heating_system
    return self.get_attribute_and_load(
        obj=heating_system,
        attr_string="main_heating_one",
        pydanticModel=MainHeatingOne,
        db_session=db_session,
    )
def load_general_condition_of_heating_system_from_condition_report(self, db_session):
    """Persist the ``general_condition`` section of the heating system."""
    heating_system = self.condition_report.master_obj.heating_system
    return self.get_attribute_and_load(
        obj=heating_system,
        attr_string="general_condition",
        pydanticModel=GeneralConditionHeatingSystem,
        db_session=db_session,
    )
def load_rooms_from_condition_report(self, db_session):
    """Persist every room section and return the ``Rooms`` row.

    The nine single-instance rooms are written first so their ids can be
    stored on the ``Rooms`` row.  Bedrooms and bathrooms are written
    afterwards because their rows point back at the ``Rooms`` row.
    """
    hallway = self.load_hallway_from_condition_report(db_session)
    living_room = self.load_living_room_from_condition_report(db_session)
    dining_room = self.load_dining_room_from_condition_report(db_session)
    kitchen = self.load_kitchen_from_condition_report(db_session)
    utility_room = self.load_utility_room_from_condition_report(db_session)
    wash_chamber = self.load_wash_chamber_from_condition_report(db_session)
    landing = self.load_landing_from_condition_report(db_session)
    loft_space = self.load_loft_space_from_condition_report(db_session)
    room_in_roof = self.load_room_in_roof_from_condition_report(db_session)

    room_ids = {
        "hallway_id": hallway.id,
        "living_room_id": living_room.id,
        "dining_room_id": dining_room.id,
        "kitchen_id": kitchen.id,
        "utility_id": utility_room.id,
        "wash_chamber_id": wash_chamber.id,
        "landing_id": landing.id,
        "loft_space_id": loft_space.id,
        "room_in_roof_id": room_in_roof.id,
    }
    rooms_row = self.upsert_record(
        model_class=Rooms,
        data_dict=self.condition_report.master_obj.rooms.model_dump(),
        db_session=db_session,
        additional_fields=room_ids,
    )
    # The returned id lists are not needed here; the calls are made for
    # their database writes.
    self.load_bedrooms_from_condition_report(db_session, rooms_row.id)
    self.load_bathrooms_from_condition_report(db_session, rooms_row.id)
    return rooms_row
def load_bathrooms_from_condition_report(self, db_session, rooms_id):
    """Persist every bathroom of the condition report.

    Each bathroom's room info is written first, then the ``Bathroom`` row
    linked to that room info and to ``rooms_id``.

    Returns:
        The ids of the bathroom rows, in report order.
    """
    created_ids = []
    for bathroom_data in self.condition_report.master_obj.rooms.bathrooms:
        info_row = self.load_room_info_from_condition_report(db_session, bathroom_data)
        bathroom_row = self.upsert_record(
            db_session=db_session,
            model_class=Bathroom,
            data_dict=bathroom_data.model_dump(),
            additional_fields={
                "room_info_id": info_row.id,
                "rooms_id": rooms_id,
            },
        )
        created_ids.append(bathroom_row.id)
    return created_ids
def load_bedrooms_from_condition_report(self, db_session, rooms_id):
    """Persist every bedroom of the condition report.

    Each bedroom's room info is written first, then the ``Bedroom`` row
    linked to that room info and to ``rooms_id``.

    Returns:
        The ids of the bedroom rows, in report order.
    """
    created_ids = []
    for bedroom_data in self.condition_report.master_obj.rooms.bedrooms:
        info_row = self.load_room_info_from_condition_report(db_session, bedroom_data)
        bedroom_row = self.upsert_record(
            db_session=db_session,
            model_class=Bedroom,
            data_dict=bedroom_data.model_dump(),
            additional_fields={
                "room_info_id": info_row.id,
                "rooms_id": rooms_id,
            },
        )
        created_ids.append(bedroom_row.id)
    return created_ids
def load_room_in_roof_from_condition_report(self, db_session):
    """Persist the ``room_in_roof`` section of the rooms.

    Room info is written only when the report says a room in roof exists;
    otherwise ``room_info_id`` is stored as ``None``.
    """
    rooms = self.condition_report.master_obj.rooms
    room_info_id = None
    if rooms.room_in_roof.is_there_a_room_in_roof:
        info_row = self.load_room_info_from_condition_report(db_session, rooms.room_in_roof)
        if info_row:
            room_info_id = info_row.id
    return self.get_attribute_and_load(
        rooms,
        "room_in_roof",
        RoomInRoof,
        db_session,
        additional_fields={"room_info_id": room_info_id},
    )
def load_landing_from_condition_report(self, db_session):
    """Persist the ``landing`` section of the rooms.

    Room info is written only when the report says a landing exists;
    otherwise ``room_info_id`` is stored as ``None``.
    """
    rooms = self.condition_report.master_obj.rooms
    room_info_id = None
    if rooms.landing.is_there_a_landing:
        info_row = self.load_room_info_from_condition_report(db_session, rooms.landing)
        if info_row:
            room_info_id = info_row.id
    return self.get_attribute_and_load(
        rooms,
        "landing",
        Landing,
        db_session,
        additional_fields={"room_info_id": room_info_id},
    )
def load_loft_space_from_condition_report(self, db_session):
    """Persist the ``loft_space`` section of the rooms."""
    rooms = self.condition_report.master_obj.rooms
    return self.get_attribute_and_load(
        obj=rooms,
        attr_string="loft_space",
        pydanticModel=LoftSpace,
        db_session=db_session,
    )
def load_wash_chamber_from_condition_report(self, db_session):
    """Persist the ``wash_chamber`` (separate WC) section of the rooms.

    Room info is written only when the report says a separate WC exists;
    otherwise ``room_info_id`` is stored as ``None``.
    """
    rooms = self.condition_report.master_obj.rooms
    room_info_id = None
    if rooms.wash_chamber.is_there_a_seperated_wc:
        info_row = self.load_room_info_from_condition_report(db_session, rooms.wash_chamber)
        if info_row:
            room_info_id = info_row.id
    return self.get_attribute_and_load(
        rooms,
        "wash_chamber",
        WC,
        db_session,
        additional_fields={"room_info_id": room_info_id},
    )
def load_utility_room_from_condition_report(self, db_session):
    """Persist the ``utility`` section of the rooms.

    Room info is written only when the report says a utility room exists;
    otherwise ``room_info_id`` is stored as ``None``.
    """
    rooms = self.condition_report.master_obj.rooms
    room_info_id = None
    if rooms.utility.is_there_a_utility_room:
        info_row = self.load_room_info_from_condition_report(db_session, rooms.utility)
        if info_row:
            room_info_id = info_row.id
    return self.get_attribute_and_load(
        rooms,
        "utility",
        Utility,
        db_session,
        additional_fields={"room_info_id": room_info_id},
    )
def load_kitchen_from_condition_report(self, db_session):
    """Persist the ``kitchen`` section of the rooms.

    Unlike the optional rooms, the kitchen's room info is always loaded.
    """
    rooms = self.condition_report.master_obj.rooms
    info_row = self.load_room_info_from_condition_report(db_session, rooms.kitchen)
    room_info_id = info_row.id if info_row else None
    return self.get_attribute_and_load(
        rooms,
        "kitchen",
        pydanticModel=Kitchen,
        db_session=db_session,
        additional_fields={"room_info_id": room_info_id},
    )
def load_dining_room_from_condition_report(self, db_session):
    """Persist the ``dining_room`` section of the rooms.

    Room info is written only when the report says a dining room exists;
    otherwise ``room_info_id`` is stored as ``None``.
    """
    rooms = self.condition_report.master_obj.rooms
    room_info_id = None
    if rooms.dining_room.is_there_a_dining_room:
        info_row = self.load_room_info_from_condition_report(db_session, rooms.dining_room)
        if info_row:
            room_info_id = info_row.id
    return self.get_attribute_and_load(
        rooms,
        "dining_room",
        pydanticModel=DiningRoom,
        db_session=db_session,
        additional_fields={"room_info_id": room_info_id},
    )
def load_living_room_from_condition_report(self, db_session):
    """Persist the ``living_room`` section of the rooms.

    Unlike the optional rooms, the living room's room info is always loaded.
    """
    rooms = self.condition_report.master_obj.rooms
    info_row = self.load_room_info_from_condition_report(db_session, rooms.living_room)
    room_info_id = info_row.id if info_row else None
    return self.get_attribute_and_load(
        rooms,
        "living_room",
        pydanticModel=LivingRoom,
        db_session=db_session,
        additional_fields={"room_info_id": room_info_id},
    )
def load_hallway_from_condition_report(self, db_session):
    """Persist the ``hallway`` section of the rooms.

    Room info is written only when the report says a hallway exists;
    otherwise ``room_info_id`` is stored as ``None``.  The
    ``is_there_a_hallway`` flag is passed explicitly as an extra field.
    """
    rooms = self.condition_report.master_obj.rooms
    room_info_id = None
    if rooms.hallway.is_there_a_hallway:
        info_row = self.load_room_info_from_condition_report(db_session, rooms.hallway)
        if info_row:
            room_info_id = info_row.id
    return self.get_attribute_and_load(
        rooms,
        "hallway",
        pydanticModel=Hallway,
        db_session=db_session,
        additional_fields={
            "is_there_a_hallway": rooms.hallway.is_there_a_hallway,
            "room_info_id": room_info_id,
        },
    )
def load_room_info_from_condition_report(self, db_session, obj):
    """Persist ``obj.room_info`` together with its ventilation and windows.

    Ventilation and windows rows are written first; the ``RoomInfo`` row is
    then written with their ids.  The long damp/mould attribute of the
    parsed ventilation info is stored under a shorter column name.

    Args:
        db_session: database session handed to the loaders.
        obj: a parsed room object exposing a ``room_info`` attribute.

    Returns:
        The ``RoomInfo`` row returned by ``get_attribute_and_load``.
    """
    parsed_info = obj.room_info
    damp_or_mould = (
        parsed_info.ventilation_info
        .are_there_any_visible_or_reported_signs_of_damp_mould_or_excessive_condensation_within_the_room
    )
    ventilation_row = self.get_attribute_and_load(
        parsed_info,
        "ventilation_info",
        VentilationInfo,
        db_session,
        additional_fields={
            "any_damp_mould_or_excessive_condensation_within_the_room": damp_or_mould,
        },
    )
    windows_row = self.get_attribute_and_load(
        parsed_info,
        "windows_info",
        WindowsInfo,
        db_session,
    )
    return self.get_attribute_and_load(
        obj,
        "room_info",
        RoomInfo,
        db_session,
        additional_fields={
            "windows_info_id": windows_row.id,
            "ventilation_info_id": ventilation_row.id,
        },
    )
def load_ventilation_info_from_condition_report(self, db_session, obj):
    """Persist the ``ventilation_info`` section found on ``obj``.

    Mirrors the ventilation step of ``load_room_info_from_condition_report``:
    the long damp/mould attribute is stored under the shorter column name.

    Args:
        db_session: database session handed to ``get_attribute_and_load``.
        obj: object exposing ``ventilation_info``.
            NOTE(review): presumably a parsed ``room_info`` object -- confirm
            against the intended caller (none is visible here).

    Returns:
        The ``VentilationInfo`` row, or ``None`` when ``obj`` has no
        ventilation info.
    """
    ventilation_info = getattr(obj, "ventilation_info", None)
    if ventilation_info is None:
        return None
    damp_or_mould = (
        ventilation_info
        .are_there_any_visible_or_reported_signs_of_damp_mould_or_excessive_condensation_within_the_room
    )
    # get_attribute_and_load needs the attribute name, model and session;
    # passing only ``obj`` raises TypeError.
    return self.get_attribute_and_load(
        obj,
        "ventilation_info",
        VentilationInfo,
        db_session,
        additional_fields={
            "any_damp_mould_or_excessive_condensation_within_the_room": damp_or_mould,
        },
    )
def load_access_and_elevations_from_condition_report(self, db_session):
    """Persist property access, the four external elevations and outbuildings.

    Write order: property access, front elevation, rear, gable one, gable
    two, conservatory/outbuilding, and finally the ``AccessAndElevations``
    row that links them all.

    For the rear and gable walls an ``ExternalElevation`` row is written
    only when the wall's flag attribute is exactly ``False`` and the wall
    carries ``external_elevation`` data; otherwise only the flag is stored.

    Returns:
        The ``AccessAndElevations`` row returned by ``upsert_record``.
    """
    access = self.condition_report.master_obj.access_and_elevations

    def load_external_elevation(side):
        # Shared by the front wall and the side walls; the long water
        # penetration attribute is stored under a shorter column name.
        return self.get_attribute_and_load(
            side,
            "external_elevation",
            ExternalElevation,
            db_session=db_session,
            additional_fields={
                "any_signs_of_water_penetration_caused_by_failed_rainwater_goods_or_pipework": side.external_elevation.are_there_any_signs_of_water_penetration_caused_by_failed_rainwater_goods_or_pipework,
            },
        )

    def load_side(side, model_class, flag_name="do_all_answers_for_the_front_elevation_apply_to_this_wall"):
        flag = getattr(side, flag_name)
        side_data = {flag_name: flag}
        if flag is False and getattr(side, "external_elevation") is not None:
            elevation_row = load_external_elevation(side)
            side_data["external_elevation_id"] = elevation_row.id
        return self.upsert_record(
            db_session=db_session,
            model_class=model_class,
            data_dict=side_data,
        )

    property_access = self.get_attribute_and_load(
        access,
        "property_access",
        PropertyAccess,
        db_session=db_session,
        additional_fields={
            "more_than_1_5_meters_in_width_to_fence_or__along_the_full_gable_elevation": access.property_access.is_there_more_than_1_point_5_meters_in_width_to_fence_or_neighbouring_boundary_along_the_full_gable_elevation
        },
    )

    # Front: always carries its own elevation data.
    front_elevation = load_external_elevation(access.external_elevation_front)
    front = self.upsert_record(
        db_session=db_session,
        model_class=ExternalElevationFront,
        data_dict={"external_elevation_id": front_elevation.id},
        lookup_field=None,
    )

    back = load_side(access.external_elevation_back, ExternalElevationRear)
    gable_one = load_side(access.external_elevation_gable_one, ExternalElevationGableOne)
    # NOTE(review): with this flag, elevation data is only loaded when
    # ``is_there_a_fourth_external_elevation`` is False -- that looks
    # inverted compared with the other walls; confirm the intended condition.
    gable_two = load_side(
        access.external_elevation_gable_two,
        ExternalElevationGableTwo,
        flag_name="is_there_a_fourth_external_elevation",
    )

    outbuilding = self.get_attribute_and_load(
        access,
        "conservatory_or_out_building",
        ConservatoryOrOutbuilding,
        db_session=db_session,
    )
    return self.upsert_record(
        db_session=db_session,
        model_class=AccessAndElevations,
        data_dict={
            "property_access_id": property_access.id,
            "external_elevation_front_id": front.id,
            "external_elevation_back_id": back.id,
            "external_elevation_gable_one_id": gable_one.id,
            "external_elevation_gable_two_id": gable_two.id,
            "conservatory_or_out_building_id": outbuilding.id,
        },
    )
def load_general_information_from_condition_report(self, db_session):
    """Persist the general-information section of the condition report.

    Write order: assessor details (looked up by ``elmhurst_id``), inspection
    and project, the property, the main elevation (info row then link row),
    the ``Elevation`` row, one ``ElevationInfo`` row per entry in
    ``elevations.info``, and finally the ``GeneralInformation`` row that
    links them.

    Returns:
        The ``GeneralInformation`` row returned by ``upsert_record``.
    """
    general = self.condition_report.master_obj.general_information

    def store(model_class, data, lookup_field=None):
        return self.upsert_record(
            db_session=db_session,
            model_class=model_class,
            data_dict=data,
            lookup_field=lookup_field,
        )

    assessor_row = store(
        AssessorDetails,
        general.assessor_details.model_dump(),
        lookup_field="elmhurst_id",
    )
    inspection_row = store(InspectionAndProject, general.inspection_and_project.model_dump())
    property_row = store(TheProperty, general.the_property.model_dump())

    # Main elevation: the info row first, then the row that points at it.
    main_info_row = store(ElevationInfo, general.main_elevation.elevation_info.model_dump())
    main_elevation_row = store(MainElevation, {"elevation_info_id": main_info_row.id})

    # Remaining elevations: the shared definition first, then one info row
    # per elevation pointing back at it.
    dumped_elevations = general.elevations.model_dump()
    elevations_row = store(
        Elevation,
        {
            "protected_conservatory_or_aonb": dumped_elevations.get("protected_conservatory_or_aonb"),
            "material_type": dumped_elevations.get("material_type"),
            "visible_signs_of_existing_wall_insulation": dumped_elevations.get("are_there_any_visible_signs_of_existing_wall_insulation"),
            "ground_level_bridge_the_dpc": dumped_elevations.get("does_the_existing_ground_level_on_any_elevation_bridge_the_dpc"),
        },
    )
    for elevation in general.elevations.info:
        info_data = elevation.model_dump()
        info_data["elevation_id"] = elevations_row.id
        store(ElevationInfo, info_data)

    return store(
        GeneralInformation,
        {
            "assessor_detail_id": assessor_row.id,
            "inspection_and_project_id": inspection_row.id,
            "the_property_id": property_row.id,
            "main_elevation_id": main_elevation_row.id,
            "elevations_id": elevations_row.id,
        },
    )
def load_pre_site_notes_summary_table(self, db_session):
    """Upsert the pre-site note's survey information, keyed on ``reference_number``."""
    survey_information = self.pre_site_note.survey_information
    summary_row = self.upsert_record(
        db_session=db_session,
        model_class=PreSiteNotesSummaryInfo,
        data_dict=survey_information.model_dump(),
        lookup_field="reference_number",
    )
    return summary_row
def get_attribute_and_load(self, obj, attr_string, pydanticModel, db_session, lookup_field=None, additional_fields=None, exclude_list=True):
    """Read ``obj.<attr_string>`` and persist it through ``upsert_record``.

    Args:
        obj: object to read the attribute from.
        attr_string: name of the attribute holding the parsed section.
        pydanticModel: class passed to ``upsert_record`` as ``model_class``.
        db_session: database session passed to ``upsert_record``.
        lookup_field: forwarded to ``upsert_record``.
        additional_fields: extra column values merged into the stored data;
            ``None`` (the default) means no extras.
        exclude_list: forwarded to ``upsert_record``.

    Returns:
        The row returned by ``upsert_record``, or ``None`` when the attribute
        is missing or falsy, or its ``model_dump()`` is empty.
    """
    found = getattr(obj, attr_string, None)
    if not found:
        return None
    # Dump once and reuse for both the diagnostics and the upsert.
    dumped = found.model_dump()
    print(f"Uploading to data base {found}")
    print(f"Uploaded to database with this dict {dumped}")
    print(f"Pydantic model {pydanticModel}")
    if not dumped:
        return None
    # A fresh dict per call: a ``{}`` default would be one object shared by
    # every call, and the caller's dict should never be handed on directly.
    extras = dict(additional_fields) if additional_fields else {}
    return self.upsert_record(
        db_session=db_session,
        model_class=pydanticModel,
        data_dict=dumped,
        lookup_field=lookup_field,
        additional_fields=extras,
        exclude_list=exclude_list,
    )
def load_property_description(self, db_session):
    """Persist the pre-site note's property description and all linked rows.

    Child rows are written first: secondary heating, main heating 2 and main
    heating (each with its controls), the single-record sections, then the
    main property and up to four extensions with their dimensions and
    windows.  The ``PropertyDescription`` row is written last and carries the
    id of every child (``None`` for children that are absent).

    Returns:
        The ``PropertyDescription`` row returned by ``upsert_record``.
    """
    description = self.pre_site_note.property_description

    def id_or_none(record):
        return record.id if record else None

    def load_heating(attr_name):
        # Load one main-heating system; returns (heating_row, controls_row).
        heating = getattr(description, attr_name, None)
        if not heating:
            return None, None
        controls_row = self.get_attribute_and_load(
            heating, "controls", HeatingSystemControls, db_session
        )
        heating_row = None
        heating_data = heating.model_dump()
        if heating_data:
            heating_row = self.upsert_record(
                db_session=db_session,
                model_class=HeatingFromPreSiteNotes,
                data_dict=heating_data,
                lookup_field=None,
                # Controls can be missing, so guard the ``.id`` access
                # instead of dereferencing ``None``.
                additional_fields={"controls_id": id_or_none(controls_row)},
            )
        return heating_row, controls_row

    def load_property_part(part_name):
        # Load walls/roof/floor, the PropertyDetail row, then its dimensions
        # and windows; returns the PropertyDetail row or None if absent.
        part = getattr(description, part_name, None)
        if not part:
            return None
        wall = self.get_attribute_and_load(part, "wall", Walls, db_session)
        roof = self.get_attribute_and_load(part, "roof", Roofs, db_session)
        floor = self.get_attribute_and_load(part, "floor", Floors, db_session)
        detail_row = self.upsert_record(
            db_session=db_session,
            model_class=PropertyDetail,
            data_dict={
                "age_band": part.age_band,
                "floor_id": id_or_none(floor),
                "roof_id": id_or_none(roof),
                "wall_id": id_or_none(wall),
            },
            lookup_field=None,
        )
        for child_attr, child_model in (("dimensions", Dimension), ("windows", Windows)):
            for child in getattr(part, child_attr, None) or ():
                self.upsert_record(
                    db_session=db_session,
                    model_class=child_model,
                    data_dict=child.model_dump(),
                    lookup_field=None,
                    additional_fields={"property_detail_id": detail_row.id},
                )
        return detail_row

    secondary_heating = self.get_attribute_and_load(
        description, "secondaryHeatingType", HeatingType, db_session
    )
    main_heating2, main_heating2_controls = load_heating("mainHeating2")
    main_heating, main_heating_controls = load_heating("mainHeating")

    linked_ids = {
        "main_heating_id": id_or_none(main_heating),
        "main_heating_controls_id": id_or_none(main_heating_controls),
        "main_heating2_id": id_or_none(main_heating2),
        "main_heating2_controls_id": id_or_none(main_heating2_controls),
        "secondary_heating_type_id": id_or_none(secondary_heating),
    }

    # (foreign-key column, attribute on the description, model class)
    single_sections = (
        ("other_details_id", "otherDetails", OtherDetails),
        ("wind_turbine_id", "windTurbine", WindTurbine),
        ("photovoltaic_panel_id", "photovoltaicPanel", PhotovoltaicPanel),
        ("flue_gas_heat_recovery_system_id", "flueGasHeatRecoverySystem", FlueGasHeatRecoverySystem),
        ("shower_and_baths_id", "showerAndBaths", ShowerAndBaths),
        ("solar_water_heating_id", "solarWaterHeating", SolarWaterHeating),
        ("hot_water_cylinder_id", "hotWaterCylinder", HotWaterCylinder),
        ("water_heating_id", "waterHeating", WaterHeating),
        ("lighting_id", "lighting", Lighting),
        ("ventilation_and_cooling_id", "ventilationAndCooling", VentilationAndCooling),
        ("door_id", "door", Door),
    )
    for fk_name, attr_name, model_class in single_sections:
        section_row = self.get_attribute_and_load(description, attr_name, model_class, db_session)
        linked_ids[fk_name] = id_or_none(section_row)

    for part_name in ("main_property", "ex1_property", "ex2_property", "ex3_property", "ex4_property"):
        linked_ids[f"{part_name}_id"] = id_or_none(load_property_part(part_name))

    return self.upsert_record(
        db_session=db_session,
        model_class=PropertyDescription,
        # upsert_record drops nested dict/list values itself (exclude_list
        # defaults to True), so the dump is passed as-is.
        data_dict=description.model_dump(),
        lookup_field=None,
        additional_fields=linked_ids,
    )
def load_company_table(self, db_session):
    """Upsert the pre-site note's company information, keyed on ``trading_name``."""
    company_information = self.pre_site_note.company_information
    company_row = self.upsert_record(
        db_session=db_session,
        model_class=CompanyInfo,
        data_dict=company_information.model_dump(),
        lookup_field="trading_name",
    )
    return company_row
def create_document_table_via_pre_site_note(self, db_session, pre_site_note, assessor, building):
    """Insert a ``Documents`` row describing a stored pre-site note.

    Args:
        db_session: database session passed to ``upsert_record``.
        pre_site_note: stored pre-site note row; its ``id`` becomes ``target_id``.
        assessor: stored assessor row; its ``id`` becomes ``assessor_id``.
        building: stored building row; its ``id`` becomes ``building_id``.

    Returns:
        The ``Documents`` row returned by ``upsert_record``.
    """
    inspection_date = self.pre_site_note.survey_information.inspection_date
    document_data = dict(
        assessor_id=assessor.id,
        created_at=inspection_date,
        document_type=ReportType.QUIDOS_PRESITE_NOTE,
        building_id=building.id,
        target_table="pre_site_note",
        target_id=pre_site_note.id,
    )
    return self.upsert_record(
        db_session=db_session,
        model_class=Documents,
        data_dict=document_data,
        lookup_field=None,
    )
def create_buildings_table(
    self,
    db_session,
    landlord_id,
    domna_id,
):
    """Upsert the ``Buildings`` row for the surveyed property, keyed on ``UPRN``.

    Address, postcode and UPRN come from the pre-site note's survey
    information; ``landlord_id`` and ``domna_id`` are stored as given.
    """
    survey_information = self.pre_site_note.survey_information
    building_data = dict(
        address=survey_information.address,
        postcode=survey_information.postcode,
        UPRN=survey_information.uprn,
        landlord_id=landlord_id,
        domna_id=domna_id,
    )
    return self.upsert_record(
        db_session=db_session,
        model_class=Buildings,
        data_dict=building_data,
        lookup_field="UPRN",
    )
def create_pre_site_note_table(
    self,
    db_session,
    assessor,
    summary_info,
    pre_site_note_description,
):
    """Insert and commit a ``PreSiteNote`` row linking its three parts.

    The row stores the ids of ``summary_info``, ``assessor`` and
    ``pre_site_note_description``; it is added to ``db_session`` and
    committed straight away.  Returns the new row.
    """
    link_ids = {
        "summary_info_id": summary_info.id,
        "assessor_id": assessor.id,
        "pre_site_note_description_id": pre_site_note_description.id,
    }
    note_row = PreSiteNote(**link_ids)
    db_session.add(note_row)
    db_session.commit()
    return note_row
def upsert_record(
    self,
    db_session,
    model_class,
    data_dict,
    lookup_field=None,
    update_if_exists: bool = False,
    additional_fields: dict = None,
    exclude_list=True
):
    """Insert a ``model_class`` row, or reuse/update an existing one.

    Args:
        db_session: session exposing ``query``/``add``/``commit``/``rollback``.
        model_class: mapped class to query and instantiate.
        data_dict: column values; nested dicts are always dropped, lists too
            unless ``exclude_list`` is False.
        lookup_field: when given, an existing row whose ``lookup_field``
            equals the value in the data is returned instead of inserting.
        update_if_exists: when True, the existing row is updated with the
            new values before being returned.
        additional_fields: extra values merged over ``data_dict``.
        exclude_list: drop list values from ``data_dict`` (default True).

    Returns:
        The existing or newly created row.

    Raises:
        ValueError: ``lookup_field`` is given but its value is missing or
            falsy in the data.
    """
    def commit_or_rollback():
        # A failed commit leaves the session unusable until it is rolled
        # back; roll back so later upserts on the same session still work.
        try:
            db_session.commit()
        except Exception:
            db_session.rollback()
            raise

    nested_types = (dict, list) if exclude_list else (dict,)
    clean_data = {
        key: value
        for key, value in data_dict.items()
        if not isinstance(value, nested_types)
    }
    # Merge additional fields if provided
    if additional_fields:
        clean_data.update(additional_fields)

    lookup_value = None
    if lookup_field is not None:
        lookup_value = clean_data.get(lookup_field)
        if not lookup_value:
            raise ValueError(f"Missing lookup field '{lookup_field}' in data.")

    # Keep only keys the model actually has.  This is done before the update
    # branch as well, so an existing row never receives stray attributes.
    valid_fields = [field for field in clean_data if hasattr(model_class, field)]
    model_data = {field: clean_data[field] for field in valid_fields}

    if lookup_field is not None:
        existing_record = db_session.query(model_class).filter(
            getattr(model_class, lookup_field) == lookup_value
        ).first()
        if existing_record:
            if update_if_exists:
                for key, value in model_data.items():
                    setattr(existing_record, key, value)
                commit_or_rollback()
            return existing_record

    print(f"valid fields {valid_fields}")
    print(f'clean data is {model_data}')
    new_record = model_class(**model_data)
    db_session.add(new_record)
    commit_or_rollback()
    return new_record
def load_assessor_table(self, db_session):
    """Look up or insert the AssessorInfo row, linked to its company.

    The company row is resolved first through ``load_company_table``; its
    ``id`` is merged into the assessor data as ``company_id``. The assessor
    is matched on ``accreditation_number``.
    """
    company_row = self.load_company_table(db_session)
    assessor_fields = self.pre_site_note.assessor_information.model_dump()
    company_link = {"company_id": company_row.id}
    return self.upsert_record(
        db_session=db_session,
        model_class=AssessorInfo,
        data_dict=assessor_fields,
        lookup_field="accreditation_number",
        additional_fields=company_link,
    )
def get_insulation_info(self):
    """Return the CSR insulation type in upper case, or ``None``.

    ``None`` is returned when ``self.csr`` or its ``insulation_info`` is
    missing or falsy.
    """
    report = self.csr
    if not report or not report.insulation_info:
        return None
    return report.insulation_info.type.upper()
@staticmethod
def get_band(sap_score_number):
    """Map a SAP score to its half-band label, e.g. ``"HIGH C"`` or ``"LOW D"``.

    Every band covers the half-open interval ``[lower, upper)``. A score that
    falls in no interval (anything below 1, for instance) gives ``None``.
    """
    intervals = (
        (96, float("inf"), "HIGH A"),
        (92, 96, "LOW A"),
        (86, 92, "HIGH B"),
        (81, 86, "LOW B"),
        (74.5, 81, "HIGH C"),
        (69, 74.5, "LOW C"),
        (61.5, 69, "HIGH D"),
        (55, 61.5, "LOW D"),
        (46.5, 55, "HIGH E"),
        (39, 46.5, "LOW E"),
        (29.5, 39, "HIGH F"),
        (21, 29.5, "LOW F"),
        (10.5, 21, "HIGH G"),
        (1, 10.5, "LOW G"),
    )
    matches = (
        label
        for floor, ceiling, label in intervals
        if floor <= sap_score_number < ceiling
    )
    return next(matches, None)
@staticmethod
def gbis_or_eco4_scheme(presap_letter, postsap_letter):
    """Name the funding scheme matching a pre/post SAP band movement.

    ECO4 (minimum movement D to C, E to C, F to D, G to D):
        G -> A, B, C, D
        F -> A, B, C, D
        E -> A, B, C
        D -> A, B, C

    GBIS (cavity wall insulation only):
        D -> D
        E -> D, E
        F -> E, F
        G -> E, F, G

    Both letters are compared case-insensitively. ECO4 is checked before
    GBIS. Returns ``"ECO4"``, ``"GBIS"``, or ``None`` when the movement
    matches neither table.
    """
    scheme_tables = (
        ("ECO4", {
            "G": ('A', 'B', 'C', 'D'),
            "F": ('A', 'B', 'C', 'D'),
            "E": ('A', 'B', 'C'),
            "D": ('A', 'B', 'C'),
        }),
        ("GBIS", {
            'D': ('D',),
            'E': ('E', 'D'),
            'F': ('E', 'F'),
            'G': ('E', 'F', 'G'),
        }),
    )
    for scheme_name, allowed_moves in scheme_tables:
        targets = allowed_moves.get(presap_letter.upper())
        # The post letter is only inspected once the pre letter has an entry.
        if targets is not None and postsap_letter.upper() in targets:
            return scheme_name
    return None
def work_out_total_floor_area(self):
    """Sum the surveyed floor areas and classify the total into a size band.

    The main property and each of the four extensions contribute the sum of
    ``floor_area_m2`` over their ``dimensions``, but only when the matching
    ``no_of_*`` count on the property description is greater than zero.
    The total is rounded half-up to a whole number before banding.

    Returns:
        tuple: ``(band_label, floor_area)`` where ``band_label`` is one of
        ``'0-72m'``, ``'73-97m'``, ``'98-199m'`` or ``'over 200m'`` and
        ``floor_area`` is the rounded total.

    Raises:
        NotImplementedError: if the rounded total is negative.
    """
    description = self.pre_site_note.property_description
    # (count attribute, building-part attribute) pairs. Extension 4 was
    # previously read through a misspelled attribute name, which raised
    # AttributeError as soon as a fourth extension was present.
    building_parts = (
        ("no_of_main_property", "main_property"),
        ("no_of_extension_1", "ex1_property"),
        ("no_of_extension_2", "ex2_property"),
        ("no_of_extension_3", "ex3_property"),
        ("no_of_extension_4", "ex4_property"),
    )
    total = 0
    for count_attr, part_attr in building_parts:
        if getattr(description, count_attr) > 0:
            part = getattr(description, part_attr)
            total += sum(floor.floor_area_m2 for floor in part.dimensions)

    # Round half up: a fractional part of .5 or more goes to the next integer.
    floor_area = math.ceil(total) if total % 1 >= 0.5 else math.floor(total)

    if floor_area < 0:
        # The address lives on survey_information, as everywhere else in
        # this class, so the error message itself can be built.
        raise NotImplementedError(
            f"unknown floor area {floor_area} {self.pre_site_note.survey_information.address}"
        )
    if floor_area <= 72:
        return '0-72m', floor_area
    if floor_area <= 97:
        return '73-97m', floor_area
    if floor_area <= 199:
        return '98-199m', floor_area
    return 'over 200m', floor_area
def get_current_sap_score(self):
    """Return the numeric part of the survey's ``current_sap`` value.

    ``current_sap`` is split on single spaces and the second token is
    converted with ``int``.
    """
    current_sap = self.pre_site_note.survey_information.current_sap
    tokens = current_sap.split(" ")
    return int(tokens[1])