Model/backend/onboarders/parity.py
2026-02-12 22:25:03 +00:00

371 lines
14 KiB
Python

import re
from tqdm import tqdm
import pandas as pd
from backend.onboarders.base import OnboarderBase
# Parity mappings
from backend.onboarders.mappings.parity.property_type import parity_map as property_map
from backend.onboarders.mappings.parity.age_band import parity_map as age_band_map
from backend.onboarders.mappings.parity.built_form import parity_map as built_form_map
from backend.onboarders.mappings.parity.walls import wall_map, wall_unknown_age_fallback, WALL_DESCRIPTION_EFFICIENCIES
from onboarders.mappings.parity.roof import roof_map, roof_unknown_age_fallback, resolve_roof_efficiency
from onboarders.mappings.parity.floor import floor_map
from onboarders.mappings.parity.heating import heating_map
from onboarders.mappings.parity.glazing import glazing_map
from backend.onboarders.mappings.parity.as_built_wall_classifiers import as_built_wall_classifiers
from backend.onboarders.mappings.parity.as_built_roof_classifiers import as_built_roof_classifiers
from backend.onboarders.mappings.parity.as_built_floor_classifiers import (
as_built_floor_classifiers, unknown_as_built_floor_classifiers
)
from datatypes.epc.roof import EpcRoofDescriptions
from datatypes.epc.floor import EpcFloorDescriptions
from datatypes.epc.construction_age_band import EpcConstructionAgeBand
from datatypes.epc.walls import EpcWallDescriptions
from datatypes.epc.efficiency import EpcEfficiency
# Register tqdm's `progress_apply` on pandas objects; the mapping methods below call it.
tqdm.pandas()
class ParityOnboarder(OnboarderBase):
def __init__(
    self,
    fileuri: str,
    file_format: str,
    **kwargs
):
    """
    Work out where the input file lives from its URI, prepare the output name and load the data.

    :param fileuri: location of the input file, expected in the format s3://bucket/key
    :param file_format: extension of the input file without the leading dot; it is stripped from
        the end of the key when building the output file name and forwarded to ``read_s3``
    :param kwargs: forwarded unchanged to ``self.read_s3``
    """
    # "s3://bucket/key".split("/") -> ["s3:", "", bucket, *key parts]
    uri_parts = fileuri.split("/")
    self.bucket_name = uri_parts[2]
    self.input_file_name = "/".join(uri_parts[3:])
    # Strip the extension only where it ends the key: str.replace would also remove any earlier
    # ".<file_format>" occurrence (e.g. inside a folder name) and corrupt the output path.
    self.output_file_name = self.input_file_name.removesuffix("." + file_format) + "_transformed.csv"
    self.read_s3(file_format=file_format, **kwargs)
def map_construction_age_band(self):
    """
    Translate the raw "Construction Years" column into the landlord construction age band column.

    After mapping, ``assert_nulls_only_from_source_nulls`` is called with the source and target columns.
    """
    source_column = "Construction Years"
    mapped_bands = self.data[source_column].map(age_band_map)
    target_column = self.landlord_construction_age_band
    self.data[target_column] = mapped_bands
    self.assert_nulls_only_from_source_nulls(self.data, source_column, target_column)
def map_property_type(self):
    """
    Translate the raw "Type" column into the landlord property type column.

    After mapping, ``assert_no_nulls`` is called on the target column.
    """
    target_column = self.landlord_property_type
    mapped_types = self.data["Type"].map(property_map)
    self.data[target_column] = mapped_types
    self.assert_no_nulls(self.data, target_column)
def map_built_form(self):
    """
    Translate the raw "Attachment" column into the landlord built form column.

    After mapping, ``assert_no_nulls`` is called on the target column.
    """
    target_column = self.landlord_built_form
    mapped_forms = self.data["Attachment"].map(built_form_map)
    self.data[target_column] = mapped_forms
    self.assert_no_nulls(self.data, target_column)
@staticmethod
def _fill_wall_as_built(row: pd.Series) -> EpcWallDescriptions | None:
"""
Utility function, used by map_wall_construction in parity transformation module
:param row: row of input sustainability data, being transformed
:return: EpcWallDescriptions, the as built wall description for the input row, based on the wall construction
type and age band
"""
# Already resolved via direct mapping
if row.landlord_wall_construction is not None:
return row.landlord_wall_construction
wall_type = row["Wall Construction"]
# Missing construction age → conservative fallback
if pd.isnull(row.landlord_construction_age_band):
return wall_unknown_age_fallback.get(wall_type)
classifier = as_built_wall_classifiers.get(wall_type)
if classifier is None:
return None
return classifier(row.landlord_construction_age_band)
@staticmethod
def _resolve_wall_efficiency(
        description: EpcWallDescriptions,
        age_band: EpcConstructionAgeBand | None,
) -> EpcEfficiency:
    """
    Look up the efficiency rating for a wall description.

    :param description: wall description whose ``value`` text and efficiency rule are inspected
    :param age_band: construction age band, passed to callable rules; may be None / null
    :return: the fixed efficiency registered for the description, the result of its age-band rule, or
        ``EpcEfficiency.NA`` when the description mentions "unknown insulation", has no rule, or the
        rule needs an age band that is missing
    """
    if "unknown insulation" not in description.value.lower():
        rule = WALL_DESCRIPTION_EFFICIENCIES.get(description)
        # A rule is either a ready-made efficiency or a callable taking the age band
        if isinstance(rule, EpcEfficiency):
            return rule
        has_age_band = age_band is not None and not pd.isnull(age_band)
        if rule is not None and has_age_band:
            return rule(age_band)
    # Unknown / holding descriptions, missing rules and missing age bands all end up here
    return EpcEfficiency.NA
def map_wall_construction(self):
    """
    Populate the landlord wall construction and wall efficiency columns.

    The ("Wall Construction", "Wall Insulation") pair of each row is looked up in ``wall_map``; every row
    is then passed through ``_fill_wall_as_built`` and the efficiency is derived per row with
    ``_resolve_wall_efficiency``. ``assert_no_nulls`` is called after each of the two columns is written.
    """
    wall_inputs = self.data[["Wall Construction", "Wall Insulation"]]
    lookup_keys = wall_inputs.apply(tuple, axis=1)
    self.data[self.landlord_wall_construction] = lookup_keys.map(wall_map)

    filled = self.data.progress_apply(self._fill_wall_as_built, axis=1)
    self.data[self.landlord_wall_construction] = filled
    # Sanity check
    self.assert_no_nulls(self.data, self.landlord_wall_construction)

    def efficiency_for(row):
        return self._resolve_wall_efficiency(
            row.landlord_wall_construction,
            row.landlord_construction_age_band,
        )

    self.data[self.landlord_wall_efficiency] = self.data.progress_apply(efficiency_for, axis=1)
    # Additional sanity check
    self.assert_no_nulls(self.data, self.landlord_wall_efficiency)
@staticmethod
def _fill_roof_as_built(row: pd.Series) -> EpcRoofDescriptions | None:
# Already resolved
if not pd.isnull(row.landlord_roof_construction):
return row.landlord_roof_construction
roof_type = row["Roof Construction"]
classifier = as_built_roof_classifiers.get(roof_type)
if classifier is None:
raise NotImplementedError(f"No roof classifier for roof type '{roof_type}'")
if pd.isnull(row.landlord_construction_age_band):
return roof_unknown_age_fallback.get(roof_type)
output = classifier(row.landlord_construction_age_band)
if output is None:
raise NotImplementedError(
f"Roof classification returned None for roof type '{roof_type}'"
)
return output
@staticmethod
def _extract_insulation_thickness(value: str | None) -> int | None:
"""
Extract insulation thickness in mm from a string like 'mm150'.
Returns None if not present or not parseable.
"""
if value is None or pd.isnull(value):
return None
match = re.search(r"(\d+)", str(value))
if not match:
return None
return int(match.group(1))
def map_roof_construction(self):
    """
    Populate the landlord roof construction, roof efficiency and sloping-ceiling columns.

    The ("Roof Construction", "Roof Insulation") pair of each row is looked up in ``roof_map`` and every
    row is then passed through ``_fill_roof_as_built``. A helper column ``roof_insulation_thickness_mm`` is
    extracted from "Roof Insulation" and fed, with the description and age band, to
    ``resolve_roof_efficiency``. ``assert_no_nulls`` is called after the construction and efficiency
    columns are written.
    """
    roof_inputs = self.data[["Roof Construction", "Roof Insulation"]]
    lookup_keys = roof_inputs.progress_apply(tuple, axis=1)
    self.data[self.landlord_roof_construction] = lookup_keys.map(roof_map)

    filled = self.data.progress_apply(self._fill_roof_as_built, axis=1)
    self.data[self.landlord_roof_construction] = filled
    # sanity check
    self.assert_no_nulls(self.data, self.landlord_roof_construction)

    thickness = self.data["Roof Insulation"].apply(self._extract_insulation_thickness)
    self.data["roof_insulation_thickness_mm"] = thickness

    def efficiency_for(row):
        return resolve_roof_efficiency(
            description=row.landlord_roof_construction,
            age_band=row.landlord_construction_age_band,
            insulation_thickness=row.roof_insulation_thickness_mm,
        )

    self.data[self.landlord_roof_efficiency] = self.data.progress_apply(efficiency_for, axis=1)
    # sanity check
    self.assert_no_nulls(self.data, self.landlord_roof_efficiency)

    # Flag sloping ceiling
    sloping_flags = self.data["Roof Construction"].apply(
        lambda roof_type: roof_type == "PitchedWithSlopingCeiling"
    )
    self.data[self.landlord_has_sloping_ceiling] = sloping_flags
@staticmethod
def _fill_floor_as_built(row: pd.Series):
# 1. Already resolved
if row.landlord_floor_construction is not None:
return row.landlord_floor_construction
age_band = row.landlord_construction_age_band
floor_type = row["Floor Construction"]
insulation = row["Floor Insulation"]
# 2. Missing age band → conservative fallback
if pd.isnull(age_band):
return EpcFloorDescriptions.unknown
# 3. Known floor types
if floor_type in ["Solid", "SuspendedTimber", "SuspendedNotTimber"]:
classifier = as_built_floor_classifiers[floor_type]
return classifier(age_band)
# 4. Unknown floor type
if floor_type == "Unknown":
classifier = unknown_as_built_floor_classifiers[insulation]
return classifier(age_band)
# 5. Truly missing / garbage input
return EpcFloorDescriptions.unknown
def map_floor_construction(self):
    """
    Populate the landlord floor construction column.

    The ("Floor Construction", "Floor Insulation") pair of each row is looked up in ``floor_map``; every
    row is then passed through ``_fill_floor_as_built`` and ``assert_no_nulls`` is called on the result.
    """
    floor_inputs = self.data[["Floor Construction", "Floor Insulation"]]
    lookup_keys = floor_inputs.progress_apply(tuple, axis=1)
    self.data[self.landlord_floor_construction] = lookup_keys.map(floor_map)

    filled = self.data.progress_apply(self._fill_floor_as_built, axis=1)
    self.data[self.landlord_floor_construction] = filled
    self.assert_no_nulls(self.data, self.landlord_floor_construction)
def map_glazing(self):
    """
    Populate the five landlord glazing columns from the raw "Glazing" column.

    Each "Glazing" value is looked up in ``glazing_map`` and the mapped value is expanded with
    ``pd.Series`` into the target columns (presumably one item per column - confirm against glazing_map).
    """
    # TODO: probably doesn't make sense to store multi glazed proportion, glazed type or glazed area.
    #  There is maybe an argument for landlord_multi_glaze_proportion as this could be variable.
    mapped_glazing = self.data["Glazing"].map(glazing_map)
    expanded = mapped_glazing.progress_apply(pd.Series)
    target_columns = [
        self.landlord_windows_type,
        self.landlord_windows_efficiency,
        self.landlord_multi_glaze_proportion,
        self.landlord_glazed_type,
        self.landlord_glazed_area,
    ]
    self.data[target_columns] = expanded
def map_heating(self):
    """
    Populate the landlord heating, fuel, heating controls and hot water columns.

    The ("Heating", "Boiler Efficiency", "Main Fuel", "Controls Adequacy") tuple of each row is looked up
    in ``heating_map`` and the mapped value is expanded with ``pd.Series`` into the seven target columns
    (presumably one item per column - confirm against heating_map).
    """
    # TODO - when mapping heating controls, we should check the existing heating controls and the
    #  efficiency rating. For sub optimal heating controls, we're going to make an assumption as to what
    #  the heating controls are, and the energy efficiency rating we prescribe here may not be accurate.
    #  We therefore use this as an upper limit as opposed to a guaranteed efficiency rating. To stress,
    #  this is only relevant for sub optimal heating controls. E.g. it may be programmer and room thermostat
    source_columns = [
        "Heating",
        "Boiler Efficiency",
        "Main Fuel",
        "Controls Adequacy",
    ]
    lookup_keys = self.data[source_columns].progress_apply(tuple, axis=1)
    expanded = lookup_keys.map(heating_map).progress_apply(pd.Series)
    target_columns = [
        self.landlord_heating_system,
        self.landlord_heating_efficiency,
        self.landlord_fuel_type,
        self.landlord_heating_controls,
        self.landlord_heating_controls_efficiency,
        self.landlord_hot_water_system,
        self.landlord_hot_water_efficiency,
    ]
    self.data[target_columns] = expanded
def map_floor_area(self):
    """Rename the "Total Floor Area (m2)" column to the landlord total floor area column name."""
    # This is just a rename
    column_renames = {"Total Floor Area (m2)": self.landlord_total_floor_area_m2}
    self.data = self.data.rename(columns=column_renames)
def select_columns(self):
    """
    Reduce ``self.data`` to the identifier, address and landlord columns and rename the raw headers.

    "Org Ref" becomes ``landlord_property_id``, the address and postcode columns are renamed to their
    lower-case names, and "UPRN" keeps its source name. Columns not listed here are dropped.
    """
    source_columns = [
        "Org Ref",
        "UPRN",
        "Address 1",
        "Address 2",
        "Address 3",
        "Postcode",
    ]
    landlord_columns = [
        self.landlord_total_floor_area_m2,
        self.landlord_construction_age_band,
        self.landlord_property_type,
        self.landlord_built_form,
        self.landlord_wall_construction,
        self.landlord_wall_efficiency,
        self.landlord_roof_construction,
        self.landlord_roof_efficiency,
        self.landlord_has_sloping_ceiling,
        self.landlord_floor_construction,
        self.landlord_windows_type,
        self.landlord_windows_efficiency,
        self.landlord_multi_glaze_proportion,
        self.landlord_glazed_type,
        self.landlord_glazed_area,
        self.landlord_heating_system,
        self.landlord_heating_efficiency,
        self.landlord_fuel_type,
        self.landlord_heating_controls,
        self.landlord_heating_controls_efficiency,
        self.landlord_hot_water_system,
        self.landlord_hot_water_efficiency,
    ]
    # The rename keys must match the selected headers exactly (including the space in "Address N"):
    # DataFrame.rename silently ignores keys that are not present, so "Address1" would rename nothing.
    self.data = self.data[source_columns + landlord_columns].rename(
        columns={
            "Org Ref": "landlord_property_id",
            "Address 1": "address1",
            "Address 2": "address2",
            "Address 3": "address3",
            "Postcode": "postcode",
        }
    )
def extract_values(self):
    """
    Replace cells that expose a ``value`` attribute with that value in the listed landlord columns.

    Cells without a ``value`` attribute are left unchanged.
    """
    def unwrap(cell):
        return cell.value if hasattr(cell, "value") else cell

    value_columns = (
        self.landlord_construction_age_band,
        self.landlord_property_type,
        self.landlord_built_form,
        self.landlord_wall_construction,
        self.landlord_wall_efficiency,
        self.landlord_roof_construction,
        self.landlord_roof_efficiency,
        self.landlord_floor_construction,
        self.landlord_windows_type,
        self.landlord_windows_efficiency,
        self.landlord_heating_system,
        self.landlord_heating_efficiency,
        self.landlord_fuel_type,
        self.landlord_heating_controls,
        self.landlord_heating_controls_efficiency,
        self.landlord_hot_water_system,
        self.landlord_hot_water_efficiency,
    )
    for column in value_columns:
        self.data[column] = self.data[column].progress_apply(unwrap)
def transform(self):
    """Run every mapping step in order, then select the output columns and extract the cell values."""
    steps = (
        self.map_construction_age_band,  # construction_age_band
        self.map_property_type,          # property_type
        self.map_built_form,             # built_form
        self.map_wall_construction,      # Wall Construction
        self.map_roof_construction,      # Roof Construction
        self.map_floor_construction,     # Floor Construction
        self.map_glazing,                # Glazing
        self.map_heating,                # Heating, fuel, controls & hot water
        self.map_floor_area,             # Floor Area
        self.select_columns,             # Formatting
        self.extract_values,             # Formatting
    )
    for step in steps:
        step()