mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
371 lines
14 KiB
Python
371 lines
14 KiB
Python
import re
|
|
from tqdm import tqdm
|
|
import pandas as pd
|
|
from backend.onboarders.base import OnboarderBase
|
|
# Parity mappings
|
|
from backend.onboarders.mappings.parity.property_type import parity_map as property_map
|
|
from backend.onboarders.mappings.parity.age_band import parity_map as age_band_map
|
|
from backend.onboarders.mappings.parity.built_form import parity_map as built_form_map
|
|
from backend.onboarders.mappings.parity.walls import wall_map, wall_unknown_age_fallback, WALL_DESCRIPTION_EFFICIENCIES
|
|
from onboarders.mappings.parity.roof import roof_map, roof_unknown_age_fallback, resolve_roof_efficiency
|
|
from onboarders.mappings.parity.floor import floor_map
|
|
from onboarders.mappings.parity.heating import heating_map
|
|
from onboarders.mappings.parity.glazing import glazing_map
|
|
from backend.onboarders.mappings.parity.as_built_wall_classifiers import as_built_wall_classifiers
|
|
from backend.onboarders.mappings.parity.as_built_roof_classifiers import as_built_roof_classifiers
|
|
from backend.onboarders.mappings.parity.as_built_floor_classifiers import (
|
|
as_built_floor_classifiers, unknown_as_built_floor_classifiers
|
|
)
|
|
from datatypes.epc.roof import EpcRoofDescriptions
|
|
from datatypes.epc.floor import EpcFloorDescriptions
|
|
from datatypes.epc.construction_age_band import EpcConstructionAgeBand
|
|
from datatypes.epc.walls import EpcWallDescriptions
|
|
from datatypes.epc.efficiency import EpcEfficiency
|
|
|
|
# Hook tqdm into pandas: the `progress_apply` calls used by the mapping methods in this module
# are only available after this registration.
tqdm.pandas()
|
|
|
|
|
|
class ParityOnboarder(OnboarderBase):
|
|
|
|
def __init__(
|
|
self,
|
|
fileuri: str,
|
|
file_format: str,
|
|
**kwargs
|
|
):
|
|
# Extract bucket, and filekey; Will be in the format s3://bucket/key
|
|
self.bucket_name = fileuri.split("/")[2]
|
|
self.input_file_name = "/".join(fileuri.split("/")[3:])
|
|
# Also prepare output file name
|
|
self.output_file_name = self.input_file_name.replace("." + file_format, "") + "_transformed.csv"
|
|
|
|
self.read_s3(file_format=file_format, **kwargs)
|
|
pass
|
|
|
|
def map_construction_age_band(self):
    """
    Fill the landlord construction age band column by looking up "Construction Years" in age_band_map,
    then run assert_nulls_only_from_source_nulls on the source / target column pair.
    """
    source_column = "Construction Years"
    target_column = self.landlord_construction_age_band

    self.data[target_column] = self.data[source_column].map(age_band_map)
    self.assert_nulls_only_from_source_nulls(self.data, source_column, target_column)
|
|
|
|
def map_property_type(self):
    """
    Fill the landlord property type column by looking up "Type" in property_map, then run
    assert_no_nulls on the result.
    """
    target_column = self.landlord_property_type
    mapped_types = self.data["Type"].map(property_map)

    self.data[target_column] = mapped_types
    self.assert_no_nulls(self.data, target_column)
|
|
|
|
def map_built_form(self):
    """
    Fill the landlord built form column by looking up "Attachment" in built_form_map, then run
    assert_no_nulls on the result.
    """
    target_column = self.landlord_built_form
    mapped_forms = self.data["Attachment"].map(built_form_map)

    self.data[target_column] = mapped_forms
    self.assert_no_nulls(self.data, target_column)
|
|
|
|
@staticmethod
def _fill_wall_as_built(row: pd.Series) -> EpcWallDescriptions | None:
    """
    Utility function, used by map_wall_construction in parity transformation module.

    Keeps the wall description already present on the row when there is one; otherwise derives the
    as built description from the "Wall Construction" type and the construction age band.

    :param row: row of input sustainability data, being transformed
    :return: EpcWallDescriptions, the as built wall description for the input row, based on the wall
        construction type and age band. None when the wall type has no entry in
        wall_unknown_age_fallback (age band missing) or in as_built_wall_classifiers.
    """
    # Already resolved via direct mapping. Series.map leaves NaN (not None) for keys that are missing
    # from the mapping, so test with pd.isnull - as _fill_roof_as_built does - instead of
    # `is not None`, which would hand the NaN straight back and skip the as built logic.
    resolved = row.landlord_wall_construction
    if not pd.isnull(resolved):
        return resolved

    wall_type = row["Wall Construction"]
    age_band = row.landlord_construction_age_band

    # Missing construction age → conservative fallback
    if pd.isnull(age_band):
        return wall_unknown_age_fallback.get(wall_type)

    classifier = as_built_wall_classifiers.get(wall_type)
    if classifier is None:
        return None

    return classifier(age_band)
|
|
|
|
@staticmethod
def _resolve_wall_efficiency(
    description: EpcWallDescriptions,
    age_band: EpcConstructionAgeBand | None,
) -> EpcEfficiency:
    """
    Resolve the efficiency rating for a wall description.

    WALL_DESCRIPTION_EFFICIENCIES holds, per description, either a fixed EpcEfficiency or a callable
    that is given the age band.

    :param description: wall description to rate
    :param age_band: construction age band of the property, may be missing
    :return: the resolved EpcEfficiency; EpcEfficiency.NA when the description contains
        "unknown insulation", has no rule, or its rule needs an age band that is missing
    """
    not_available = EpcEfficiency.NA

    # Unknown / holding descriptions → efficiency unknown
    if "unknown insulation" in description.value.lower():
        return not_available

    rule = WALL_DESCRIPTION_EFFICIENCIES.get(description)

    # Fixed rating, no age band required
    if isinstance(rule, EpcEfficiency):
        return rule

    # Either no rule at all, or the rule needs an age band we don't have
    if rule is None or age_band is None or pd.isnull(age_band):
        return not_available

    return rule(age_band)
|
|
|
|
def map_wall_construction(self):
    """
    Populate the landlord wall construction and wall efficiency columns.

    The ("Wall Construction", "Wall Insulation") pair is looked up in wall_map first, then every row
    is passed through _fill_wall_as_built. The efficiency comes from _resolve_wall_efficiency.
    assert_no_nulls is run on each of the two resulting columns.
    """
    construction_column = self.landlord_wall_construction
    efficiency_column = self.landlord_wall_efficiency

    # Direct lookup on the (construction, insulation) pair
    wall_keys = self.data[["Wall Construction", "Wall Insulation"]].apply(tuple, axis=1)
    self.data[construction_column] = wall_keys.map(wall_map)

    # Row-wise pass that fills in the as built description
    self.data[construction_column] = self.data.progress_apply(self._fill_wall_as_built, axis=1)

    # Sanity check
    self.assert_no_nulls(self.data, construction_column)

    def efficiency_for(row):
        return self._resolve_wall_efficiency(
            row.landlord_wall_construction,
            row.landlord_construction_age_band,
        )

    self.data[efficiency_column] = self.data.progress_apply(efficiency_for, axis=1)

    # Additional sanity check
    self.assert_no_nulls(self.data, efficiency_column)
|
|
|
|
@staticmethod
def _fill_roof_as_built(row: pd.Series) -> EpcRoofDescriptions | None:
    """
    Return the roof description for a row, deriving the as built description when the row has none.

    :param row: row of input data being transformed
    :return: the description already on the row if it is not null; otherwise the
        roof_unknown_age_fallback entry for the roof type when the age band is missing (None if
        there is no entry), or the result of the as built classifier for the roof type
    :raises NotImplementedError: if the roof type has no classifier, or the classifier returns None
    """
    # Already resolved
    current = row.landlord_roof_construction
    if not pd.isnull(current):
        return current

    roof_type = row["Roof Construction"]

    classify = as_built_roof_classifiers.get(roof_type)
    if classify is None:
        raise NotImplementedError(f"No roof classifier for roof type '{roof_type}'")

    age_band = row.landlord_construction_age_band
    if pd.isnull(age_band):
        return roof_unknown_age_fallback.get(roof_type)

    as_built = classify(age_band)
    if as_built is not None:
        return as_built

    raise NotImplementedError(
        f"Roof classification returned None for roof type '{roof_type}'"
    )
|
|
|
|
@staticmethod
|
|
def _extract_insulation_thickness(value: str | None) -> int | None:
|
|
"""
|
|
Extract insulation thickness in mm from a string like 'mm150'.
|
|
Returns None if not present or not parseable.
|
|
"""
|
|
if value is None or pd.isnull(value):
|
|
return None
|
|
|
|
match = re.search(r"(\d+)", str(value))
|
|
if not match:
|
|
return None
|
|
|
|
return int(match.group(1))
|
|
|
|
def map_roof_construction(self):
    """
    Populate the landlord roof construction, roof efficiency and sloping ceiling columns.

    The ("Roof Construction", "Roof Insulation") pair is looked up in roof_map first, then every row
    is passed through _fill_roof_as_built. The insulation thickness parsed from "Roof Insulation" is
    stored in "roof_insulation_thickness_mm" and fed to resolve_roof_efficiency. assert_no_nulls is
    run on the construction and efficiency columns.
    """
    construction_column = self.landlord_roof_construction
    efficiency_column = self.landlord_roof_efficiency

    # Direct lookup on the (construction, insulation) pair
    roof_keys = self.data[["Roof Construction", "Roof Insulation"]].progress_apply(tuple, axis=1)
    self.data[construction_column] = roof_keys.map(roof_map)

    # Row-wise pass that fills in the as built description
    self.data[construction_column] = self.data.progress_apply(self._fill_roof_as_built, axis=1)

    # sanity check
    self.assert_no_nulls(self.data, construction_column)

    roof_insulation = self.data["Roof Insulation"]
    self.data["roof_insulation_thickness_mm"] = roof_insulation.apply(self._extract_insulation_thickness)

    def efficiency_for(row):
        return resolve_roof_efficiency(
            description=row.landlord_roof_construction,
            age_band=row.landlord_construction_age_band,
            insulation_thickness=row.roof_insulation_thickness_mm,
        )

    self.data[efficiency_column] = self.data.progress_apply(efficiency_for, axis=1)

    # sanity check
    self.assert_no_nulls(self.data, efficiency_column)

    # Flag sloping ceiling
    def is_sloping_ceiling(roof_type):
        return roof_type == "PitchedWithSlopingCeiling"

    self.data[self.landlord_has_sloping_ceiling] = self.data["Roof Construction"].apply(is_sloping_ceiling)
|
|
|
|
@staticmethod
def _fill_floor_as_built(row: pd.Series):
    """
    Return the floor description for a row, deriving the as built description when the row has none.

    :param row: row of input data being transformed
    :return: the description already on the row if it is not null; otherwise the result of the as
        built classifier for the floor type (or, for floor type "Unknown", for the floor insulation),
        or EpcFloorDescriptions.unknown when the age band is missing or the floor type is unrecognised
    :raises KeyError: if the floor type is "Unknown" and the "Floor Insulation" value has no entry in
        unknown_as_built_floor_classifiers
    """
    # 1. Already resolved. Series.map leaves NaN (not None) for keys that are missing from the
    #    mapping, so test with pd.isnull - as _fill_roof_as_built does - instead of `is not None`;
    #    otherwise unmapped rows are returned as NaN and step 5 can never be reached for them.
    resolved = row.landlord_floor_construction
    if not pd.isnull(resolved):
        return resolved

    age_band = row.landlord_construction_age_band
    floor_type = row["Floor Construction"]
    insulation = row["Floor Insulation"]

    # 2. Missing age band → conservative fallback
    if pd.isnull(age_band):
        return EpcFloorDescriptions.unknown

    # 3. Known floor types
    if floor_type in ("Solid", "SuspendedTimber", "SuspendedNotTimber"):
        return as_built_floor_classifiers[floor_type](age_band)

    # 4. Unknown floor type: the classifier is chosen by the insulation value instead
    if floor_type == "Unknown":
        return unknown_as_built_floor_classifiers[insulation](age_band)

    # 5. Truly missing / garbage input
    return EpcFloorDescriptions.unknown
|
|
|
|
def map_floor_construction(self):
    """
    Populate the landlord floor construction column.

    The ("Floor Construction", "Floor Insulation") pair is looked up in floor_map first, then every
    row is passed through _fill_floor_as_built. assert_no_nulls is run on the result.
    """
    construction_column = self.landlord_floor_construction

    # Direct lookup on the (construction, insulation) pair
    floor_keys = self.data[["Floor Construction", "Floor Insulation"]].progress_apply(tuple, axis=1)
    self.data[construction_column] = floor_keys.map(floor_map)

    # Row-wise pass that fills in the as built description
    self.data[construction_column] = self.data.progress_apply(self._fill_floor_as_built, axis=1)

    self.assert_no_nulls(self.data, construction_column)
|
|
|
|
def map_glazing(self):
    """
    Populate the five landlord glazing columns from the "Glazing" column.

    Each "Glazing" value is looked up in glazing_map and the looked-up value is expanded with
    pd.Series into one column per element (presumably a 5-item sequence per entry - confirm against
    glazing_map).
    """
    # TODO: probably doesn't make sense to store multi glazed proportion, glazed type or glazed area.
    #  There is maybe an argument for landlord_multi_glaze_proportion as this could be variable.
    glazing_columns = [
        self.landlord_windows_type,
        self.landlord_windows_efficiency,
        self.landlord_multi_glaze_proportion,
        self.landlord_glazed_type,
        self.landlord_glazed_area,
    ]

    glazing_values = self.data["Glazing"].map(glazing_map)
    self.data[glazing_columns] = glazing_values.progress_apply(pd.Series)
|
|
|
|
def map_heating(self):
    """
    Populate the landlord heating, fuel, heating controls and hot water columns.

    The ("Heating", "Boiler Efficiency", "Main Fuel", "Controls Adequacy") tuple of each row is
    looked up in heating_map and the looked-up value is expanded with pd.Series into one column per
    element (presumably a 7-item sequence per entry - confirm against heating_map).
    """
    # TODO - when mapping heating controls, we should check the existing heating controls and the
    #  efficiency rating. For sub optimal heating controls, we're going to make an assumption as to
    #  what the heating controls are and the energy efficiency rating we prescribe here may not be
    #  accurate. We therefore use this as an upper limit as opposed to a guaranteed efficiency rating.
    #  To stress, this is only relevant for sub optimal heating controls. E.g. it may be programmer
    #  and room thermostat
    heating_targets = [
        self.landlord_heating_construction,
        self.landlord_heating_efficiency,
        self.landlord_fuel_type,
        self.landlord_heating_controls,
        self.landlord_heating_controls_efficiency,
        self.landlord_hot_water_system,
        self.landlord_hot_water_efficiency,
    ]
    heating_inputs = ["Heating", "Boiler Efficiency", "Main Fuel", "Controls Adequacy"]

    heating_keys = self.data[heating_inputs].progress_apply(tuple, axis=1)
    heating_values = heating_keys.map(heating_map)
    self.data[heating_targets] = heating_values.progress_apply(pd.Series)
|
|
|
|
def map_floor_area(self):
    """
    Rename the "Total Floor Area (m2)" column to the landlord total floor area column name.
    The values themselves are left untouched.
    """
    renames = {"Total Floor Area (m2)": self.landlord_total_floor_area_m2}
    self.data = self.data.rename(columns=renames)
|
|
|
|
def select_columns(self):
    """
    Reduce self.data to the identifier / address columns plus the landlord columns produced by the
    map_* methods, and rename the identifier / address columns to their output names.

    "UPRN" is kept under its original name.

    :raises KeyError: if any of the selected columns is missing from self.data
    """
    output_columns = [
        "Org Ref",
        "UPRN",
        "Address 1",
        "Address 2",
        "Address 3",
        "Postcode",
        self.landlord_total_floor_area_m2,
        self.landlord_construction_age_band,
        self.landlord_property_type,
        self.landlord_built_form,
        self.landlord_wall_construction,
        self.landlord_wall_efficiency,
        self.landlord_roof_construction,
        self.landlord_roof_efficiency,
        self.landlord_has_sloping_ceiling,
        self.landlord_floor_construction,
        self.landlord_windows_type,
        self.landlord_windows_efficiency,
        self.landlord_multi_glaze_proportion,
        self.landlord_glazed_type,
        self.landlord_glazed_area,
        self.landlord_heating_construction,
        self.landlord_heating_efficiency,
        self.landlord_fuel_type,
        self.landlord_heating_controls,
        self.landlord_heating_controls_efficiency,
        self.landlord_hot_water_system,
        self.landlord_hot_water_efficiency,
    ]

    # The keys must match the selected source columns exactly ("Address 1", with a space):
    # DataFrame.rename silently ignores keys that are not present, so the previous
    # "Address1" / "Address2" / "Address3" keys left the address columns un-renamed.
    output_names = {
        "Org Ref": "landlord_property_id",
        "Address 1": "address1",
        "Address 2": "address2",
        "Address 3": "address3",
        "Postcode": "postcode",
    }

    self.data = self.data[output_columns].rename(columns=output_names)
|
|
|
|
def extract_values(self):
    """
    Replace, in the listed landlord columns, every cell that has a ``value`` attribute (e.g. an enum
    member) by that ``value``; all other cells are left as they are.
    """
    value_columns = [
        self.landlord_construction_age_band,
        self.landlord_property_type,
        self.landlord_built_form,
        self.landlord_wall_construction,
        self.landlord_wall_efficiency,
        self.landlord_roof_construction,
        self.landlord_roof_efficiency,
        self.landlord_floor_construction,
        self.landlord_windows_type,
        self.landlord_windows_efficiency,
        self.landlord_heating_construction,
        self.landlord_heating_efficiency,
        self.landlord_fuel_type,
        self.landlord_heating_controls,
        self.landlord_heating_controls_efficiency,
        self.landlord_hot_water_system,
        self.landlord_hot_water_efficiency,
    ]

    def unwrap(cell):
        # Cells without a `value` attribute (plain strings, NaN, ...) pass through unchanged
        return getattr(cell, "value", cell)

    for column in value_columns:
        self.data[column] = self.data[column].progress_apply(unwrap)
|
|
|
|
def transform(self):
    """
    Run the full transformation: every mapping step in turn, then the output formatting.

    The construction age band is mapped first because the wall, roof and floor steps read it;
    select_columns and extract_values run last, once every landlord column exists.
    """
    pipeline = (
        self.map_construction_age_band,  # construction_age_band
        self.map_property_type,          # property_type
        self.map_built_form,             # built_form
        self.map_wall_construction,      # Wall Construction
        self.map_roof_construction,      # Roof Construction
        self.map_floor_construction,     # Floor Construction
        self.map_glazing,                # Glazing
        self.map_heating,                # Heating, fuel, controls & hot water
        self.map_floor_area,             # Floor Area
        self.select_columns,             # Formatting
        self.extract_values,             # Formatting
    )
    for step in pipeline:
        step()
|