moved landlord remapping to epc record class

This commit is contained in:
Khalim Conn-Kowlessar 2026-03-09 16:57:13 +00:00
parent 0753584655
commit 8070168715
2 changed files with 230 additions and 147 deletions

View file

@ -824,7 +824,9 @@ async def model_engine(body: PlanTriggerRequest):
epc_records = patch_epc(patch, epc_records)
prepared_epc = EPCRecord(epc_records=epc_records, run_mode="newdata", cleaning_data=cleaning_data)
prepared_epc = EPCRecord(
epc_records=epc_records, run_mode="newdata", cleaning_data=cleaning_data, address_metadata=addr
)
# TODO: This is a temp function to handle a specific edge case with Peabody. We should
# factor this into EPCRecord as part of the cleaning however we need some more testing
@ -891,9 +893,10 @@ async def model_engine(body: PlanTriggerRequest):
)
logger.info("Reading in materials and cleaned datasets")
cleaned = get_cleaned()
with db_read_session() as session:
materials = db_funcs.materials_functions.get_materials(session)
cleaned = get_cleaned()
# Rebaselining
# TODO: MUST happen before setting features
@ -903,55 +906,55 @@ async def model_engine(body: PlanTriggerRequest):
# 2) Missing EPC
# 3) Materially different information from landlord vs EPC
# make the landlord remapping dictionary
addr = [a for a in addresses if a.uprn == p.uprn][0]
addr = next((a for a in addresses if a.uprn == p.uprn), None)
if addr is None:
raise ValueError("Could not find address for property with UPRN: %s", p.uprn)
landlord_remapping = {
"total-floor-area": addr.landlord_total_floor_area_m2, # 1m tolerance on floor area to perform remap
"property-type": addr.landlord_property_type,
"built-form": addr.landlord_built_form,
"total_floor_area": addr.landlord_total_floor_area_m2, # 1m tolerance on floor area to perform remap
"property_type": addr.landlord_property_type,
"built_form": addr.landlord_built_form,
# Components
"walls-description": addr.landlord_wall_construction,
"roof-description": addr.landlord_roof_construction,
"floor-description": addr.landlord_floor_construction,
"windows-description": addr.landlord_windows_type,
"main-fuel": addr.landlord_fuel_type,
"mainheat-description": addr.landlord_heating_system,
"mainheatcont-description": addr.landlord_heating_controls,
"hotwater-description": addr.landlord_hot_water_system,
"walls_description": addr.landlord_wall_construction,
"roof_description": addr.landlord_roof_construction,
"floor_description": addr.landlord_floor_construction,
"windows_description": addr.landlord_windows_type,
"main_fuel": addr.landlord_fuel_type,
"mainheat_description": addr.landlord_heating_system,
"mainheatcont_description": addr.landlord_heating_controls,
"hotwater_description": addr.landlord_hot_water_system,
# Efficiency
"walls-energy-eff": addr.landlord_wall_efficiency,
"roof-energy-eff": addr.landlord_roof_efficiency,
"windows-energy-eff": addr.landlord_windows_efficiency,
"mainheat-energy-eff": addr.landlord_heating_efficiency,
"mainheatc-energy-eff": addr.landlord_heating_controls_efficiency,
"hot-water-energy-eff": addr.landlord_hot_water_efficiency,
"multi-glaze-proportion": addr.landlord_multi_glaze_proportion * 100, # TODO: Fix this!
"construction-age-band": addr.landlord_construction_age_band,
"walls_energy_eff": addr.landlord_wall_efficiency,
"roof_energy_eff": addr.landlord_roof_efficiency,
"windows_energy_eff": addr.landlord_windows_efficiency,
"mainheat_energy_eff": addr.landlord_heating_efficiency,
"mainheatc_energy_eff": addr.landlord_heating_controls_efficiency,
"hot_water_energy_eff": addr.landlord_hot_water_efficiency,
"multi_glaze_proportion": addr.landlord_multi_glaze_proportion * 100, # TODO: Fix this!
"construction_age_band": addr.landlord_construction_age_band,
}
# Find differences between EPC and landlord data
differences = {}
for k, v in landlord_remapping.items():
if k == "total-floor-area":
if abs(p.data[k] - v) > 1: # 1m tolerance
if k == "total_floor_area":
if abs(p.epc_record.prepared_epc.get(k) - v) > 1: # 1m tolerance
differences[k] = v
else:
if v != p.data[k] and (not pd.isnull(v)) and (not pd.isnull(p.data[k])):
if v != p.epc_record.get(k) and (not pd.isnull(v)) and (not pd.isnull(p.epc_record.get(k))):
differences[k] = v
needs_rebaselining = p.epc_is_expired | p.epc_is_estimated | len(differences) > 0
needs_rebaselining = p.epc_is_expired | p.epc_is_estimated | (len(differences) > 0)
p.epc_record.update(differences)
# Need to adjust p.data and p.epc_record.df?
if needs_rebaselining:
if len(differences):
p.data.update(differences)
differences_underscored = {k.replace("-", "_"): v for k, v in differences.items()}
# Insert
for k, v in differences_underscored.items():
if not hasattr(p.epc_record, k) and k not in ["property_type", "built_form"]:
# Sanity check - while we're implementing
raise ValueError("Property does not have an EPC record to update with differences")
# Hack but these aren't in the data class
if k not in ["property_type", "built_form"]:
setattr(p.epc_record, k, v)
# Insert into prepared_epc
for k, v in differences.items():
p.epc_record.prepared_epc[k] = v
p.create_base_difference_epc_record(cleaned_lookup=cleaned)

View file

@ -1,4 +1,6 @@
from warnings import deprecated
from typing import Optional, get_origin, get_args, TypedDict, cast, TypeAlias
from backend.addresses.Address import Address
from dataclasses import fields
from datetime import datetime
from dataclasses import dataclass
@ -240,11 +242,14 @@ class EPCRecord:
# ------------------------------------------------------------------
epc_records: Optional[InputEpcRecords] = None
address_metadata: Optional[Address] = None
# Raw EPC input (immutable)
original_epc: Optional[RawEpcRow] = None
# Working dictionary that gets cleaned
prepared_epc: Optional[PreparedEpcRow] = None
_prepared_epc: Optional[PreparedEpcRow] = None
# Record of differences applied by landlord data
landlord_differences: Optional[dict[str, PreparedEpcValue]] = None
# Supporting
full_sap_epc: Optional[RawEpcRow] = None
@ -280,7 +285,7 @@ class EPCRecord:
self.original_epc = self.epc_records["original_epc"].copy()
# Working copy that we will clean and manipulate
self.prepared_epc = self.epc_records["original_epc"].copy()
self._prepared_epc = self.epc_records["original_epc"].copy()
self.full_sap_epc = self.epc_records["full_sap_epc"]
self.old_data = self.epc_records["old_data"]
@ -290,11 +295,67 @@ class EPCRecord:
self._clean_records_using_epc_records()
self._clean_with_data_processor()
self._inject_address_metadata()
self._expand_prepared_epc_to_attributes()
self._identify_delta_between_prepared_and_original_records()
return
def _inject_address_metadata(self):
"""
Given metadata about an address, provided by the landlord on input, this method will inject it into the prepared
EPC record, to allow it to be used in cleaning and processing steps. This is particularly useful for cleaning
missing or anomalous location data, by using other location data provided by the landlord.
:return:
"""
addr = self.address_metadata
if addr is None:
# We don't always have address metadata and so we don't inject if it's not there
return
landlord_remapping = {
"total_floor_area": addr.landlord_total_floor_area_m2, # 1m tolerance on floor area to perform remap
"property_type": addr.landlord_property_type,
"built_form": addr.landlord_built_form,
# Components
"walls_description": addr.landlord_wall_construction,
"roof_description": addr.landlord_roof_construction,
"floor_description": addr.landlord_floor_construction,
"windows_description": addr.landlord_windows_type,
"main_fuel": addr.landlord_fuel_type,
"mainheat_description": addr.landlord_heating_system,
"mainheatcont_description": addr.landlord_heating_controls,
"hotwater_description": addr.landlord_hot_water_system,
# Efficiency
"walls_energy_eff": addr.landlord_wall_efficiency,
"roof_energy_eff": addr.landlord_roof_efficiency,
"windows_energy_eff": addr.landlord_windows_efficiency,
"mainheat_energy_eff": addr.landlord_heating_efficiency,
"mainheatc_energy_eff": addr.landlord_heating_controls_efficiency,
"hot_water_energy_eff": addr.landlord_hot_water_efficiency,
"multi_glaze_proportion": addr.landlord_multi_glaze_proportion * 100, # TODO: Fix this!
"construction_age_band": addr.landlord_construction_age_band,
}
# Saniry check - ensure valid keys
if any(k for k in landlord_remapping.keys() if k not in self._prepared_epc):
raise ValueError("Landlord remapping contains keys that are not in the EPC record")
self.landlord_differences = {} # Anything actaully changed
for k, v in landlord_remapping.items():
if k == "total_floor_area":
if abs(self._prepared_epc.get(k) - v) > 1: # 1m tolerance
self.landlord_differences[k] = v
else:
if v != self._prepared_epc.get(k) and (not pd.isnull(v)) and (not pd.isnull(self._prepared_epc.get(k))):
self.landlord_differences[k] = v
self.prepared_epc.update(self.landlord_differences)
@staticmethod
def _calculate_days_to(lodgement_date: Union[str, pd.Series]) -> Union[int, pd.Series]:
if isinstance(lodgement_date, str):
@ -319,7 +380,7 @@ class EPCRecord:
record = epc_data_processor.data.to_dict(orient="records")[0]
self.prepared_epc = cast(RawEpcRow, record)
self._prepared_epc = cast(RawEpcRow, record)
@staticmethod
def _cast_value(value: PreparedEpcValue, type_hint: Any) -> PreparedEpcValue:
@ -354,7 +415,7 @@ class EPCRecord:
field_map = {f.name: f for f in fields(self)}
for key, value in self.prepared_epc.items():
for key, value in self._prepared_epc.items():
# Enforce schema consistency
if "-" in key:
@ -439,44 +500,44 @@ class EPCRecord:
def _clean_floor_height(self) -> None:
"""Remaps anomalies in floor height to the average floor height for the property type"""
floor_height_data = self.cleaning_data[
(self.cleaning_data["property_type"] == self.prepared_epc["property-type"])
& (self.cleaning_data["built_form"] == self.prepared_epc["built-form"])
(self.cleaning_data["property_type"] == self._prepared_epc["property-type"])
& (self.cleaning_data["built_form"] == self._prepared_epc["built-form"])
]
average = float(np.mean(floor_height_data["floor_height"]))
sd = float(np.std(floor_height_data["floor_height"]))
# If we're in the top 0.5 percentile of floor heights, we'll set it to the average
if self.prepared_epc["floor-height"] > average + 10 * sd:
self.prepared_epc["floor-height"] = average
if self.prepared_epc["floor-height"] <= 1.665:
self.prepared_epc["floor-height"] = average
if self._prepared_epc["floor-height"] > average + 10 * sd:
self._prepared_epc["floor-height"] = average
if self._prepared_epc["floor-height"] <= 1.665:
self._prepared_epc["floor-height"] = average
def _clean_new_build_descriptions(self) -> None:
for col in ["roof-description", "walls-description", "floor-description"]:
self.prepared_epc[col] = self.prepared_epc[col].replace("W/m²K", "W/m-¦K")
self._prepared_epc[col] = self._prepared_epc[col].replace("W/m²K", "W/m-¦K")
def _clean_constituency(self) -> None:
"""
We handle the single case of finding a missing constituency by using the local authority
"""
if pd.isnull(self.prepared_epc["constituency"]) or (
self.prepared_epc["constituency"] == ""
if pd.isnull(self._prepared_epc["constituency"]) or (
self._prepared_epc["constituency"] == ""
):
if self.prepared_epc["local-authority"] != "E06000044":
if self._prepared_epc["local-authority"] != "E06000044":
raise NotImplementedError(
"This function is only implemented for Portsmouth, in the single edgecase seen"
)
self.prepared_epc["constituency"] = "E14000883"
self._prepared_epc["constituency"] = "E14000883"
def _clean_floor_level(self) -> None:
"""
This method will clean the floor level, if empty or invalid
"""
if not self.prepared_epc:
if not self._prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
self.prepared_epc["floor-level"] = (
FLOOR_LEVEL_MAP[self.prepared_epc["floor-level"]]
if self.prepared_epc["floor-level"] not in DATA_ANOMALY_MATCHES
self._prepared_epc["floor-level"] = (
FLOOR_LEVEL_MAP[self._prepared_epc["floor-level"]]
if self._prepared_epc["floor-level"] not in DATA_ANOMALY_MATCHES
else None
)
@ -484,10 +545,10 @@ class EPCRecord:
"""
This method will clean the number of lighting outlets, if empty or invalid
"""
if not self.prepared_epc:
if not self._prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
if self.prepared_epc["fixed-lighting-outlets-count"] in DATA_ANOMALY_MATCHES:
if self._prepared_epc["fixed-lighting-outlets-count"] in DATA_ANOMALY_MATCHES:
# We check old EPCs and the full SAP EPC
lighting_data = []
@ -508,7 +569,7 @@ class EPCRecord:
)
if lighting_data:
self.prepared_epc["fixed-lighting-outlets-count"] = round(
self._prepared_epc["fixed-lighting-outlets-count"] = round(
np.median(lighting_data)
)
else:
@ -533,11 +594,12 @@ class EPCRecord:
"LOCAL_AUTHORITY",
],
)
self.prepared_epc["fixed-lighting-outlets-count"] = round(
self._prepared_epc["fixed-lighting-outlets-count"] = round(
cleaned_property_data["FIXED_LIGHTING_OUTLETS_COUNT"].values[0]
)
else:
self.prepared_epc["fixed-lighting-outlets-count"] = float(self.prepared_epc["fixed-lighting-outlets-count"])
self._prepared_epc["fixed-lighting-outlets-count"] = float(
self._prepared_epc["fixed-lighting-outlets-count"])
def _filter_property_dimensions(self, property_dimensions) -> pd.Series:
"""
@ -547,7 +609,7 @@ class EPCRecord:
"""
result = property_dimensions[
(property_dimensions["PROPERTY_TYPE"] == self.prepared_epc["property-type"])
(property_dimensions["PROPERTY_TYPE"] == self._prepared_epc["property-type"])
]
if self.construction_age_band not in DATA_ANOMALY_MATCHES:
@ -556,10 +618,10 @@ class EPCRecord:
]
if (
self.prepared_epc["built-form"] not in DATA_ANOMALY_MATCHES
and self.prepared_epc["built-form"] in result["BUILT_FORM"]
self._prepared_epc["built-form"] not in DATA_ANOMALY_MATCHES
and self._prepared_epc["built-form"] in result["BUILT_FORM"]
):
result = result[(result["BUILT_FORM"] == self.prepared_epc["built-form"])]
result = result[(result["BUILT_FORM"] == self._prepared_epc["built-form"])]
return result[
[
@ -575,102 +637,102 @@ class EPCRecord:
Cleans up the number of floors, number of habitable rooms, and the floor height
"""
if not self.prepared_epc:
if not self._prepared_epc:
raise ValueError("EPC Record doesn not contain epc data")
if (
(self.prepared_epc["number-habitable-rooms"] in DATA_ANOMALY_MATCHES)
or (self.prepared_epc["floor-height"] in DATA_ANOMALY_MATCHES)
or (self.prepared_epc["number-heated-rooms"] in DATA_ANOMALY_MATCHES)
(self._prepared_epc["number-habitable-rooms"] in DATA_ANOMALY_MATCHES)
or (self._prepared_epc["floor-height"] in DATA_ANOMALY_MATCHES)
or (self._prepared_epc["number-heated-rooms"] in DATA_ANOMALY_MATCHES)
):
# TODO - this probably shouldn't live here - but we only need to use this for specific properties
# when we meet this condition
property_dimensions: pd.DataFrame = read_dataframe_from_s3_parquet(
bucket_name=DATA_BUCKET,
file_key=f"property_dimensions/{self.prepared_epc['local-authority']}.parquet",
file_key=f"property_dimensions/{self._prepared_epc['local-authority']}.parquet",
)
self.property_dimensions: pd.Series = self._filter_property_dimensions(
property_dimensions
)
if self.prepared_epc["number-habitable-rooms"] in DATA_ANOMALY_MATCHES:
self.prepared_epc["number-habitable-rooms"] = float(
if self._prepared_epc["number-habitable-rooms"] in DATA_ANOMALY_MATCHES:
self._prepared_epc["number-habitable-rooms"] = float(
self.property_dimensions["NUMBER_HABITABLE_ROOMS"].round()
)
else:
self.prepared_epc["number-habitable-rooms"] = float(
self.prepared_epc["number-habitable-rooms"]
self._prepared_epc["number-habitable-rooms"] = float(
self._prepared_epc["number-habitable-rooms"]
)
if self.prepared_epc["number-heated-rooms"] in DATA_ANOMALY_MATCHES:
self.prepared_epc["number-heated-rooms"] = float(
if self._prepared_epc["number-heated-rooms"] in DATA_ANOMALY_MATCHES:
self._prepared_epc["number-heated-rooms"] = float(
self.property_dimensions["NUMBER_HEATED_ROOMS"].round()
)
else:
self.prepared_epc["number-heated-rooms"] = float(
self.prepared_epc["number-heated-rooms"]
self._prepared_epc["number-heated-rooms"] = float(
self._prepared_epc["number-heated-rooms"]
)
self.number_of_floors = estimate_number_of_floors(
self.prepared_epc["property-type"]
self._prepared_epc["property-type"]
)
if (
self.prepared_epc["floor-height"] == ""
or self.prepared_epc["floor-height"] in DATA_ANOMALY_MATCHES
self._prepared_epc["floor-height"] == ""
or self._prepared_epc["floor-height"] in DATA_ANOMALY_MATCHES
):
self.prepared_epc["floor-height"] = float(
self._prepared_epc["floor-height"] = float(
self.property_dimensions["FLOOR_HEIGHT"].round(2)
)
else:
self.prepared_epc["floor-height"] = float(self.prepared_epc["floor-height"])
self._prepared_epc["floor-height"] = float(self._prepared_epc["floor-height"])
def _clean_floor_area(self) -> None:
"""
This method will clean the floor area, if empty or invalid
"""
if not self.prepared_epc:
if not self._prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
if self.prepared_epc["total-floor-area"] is None:
if self._prepared_epc["total-floor-area"] is None:
return
self.prepared_epc["total-floor-area"] = float(
self.prepared_epc["total-floor-area"]
self._prepared_epc["total-floor-area"] = float(
self._prepared_epc["total-floor-area"]
)
# We handle the edge case of floor area being 0. We set it to zero and it is cleaned by
# _clean_with_data_processor
if self.prepared_epc["total-floor-area"] == 0:
if self._prepared_epc["total-floor-area"] == 0:
print(
"Edge case of floor area being zero - will set to none and will be cleaned in "
"_clean_with_data_processor"
)
self.prepared_epc["total-floor-area"] = None
self._prepared_epc["total-floor-area"] = None
def _clean_mains_gas(self) -> None:
"""
This method will clean the mains gas, if empty or invalid
"""
if not self.prepared_epc:
if not self._prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
mains_gas_map = {"Y": True, "N": False, True: True, False: False}
self.prepared_epc["mains-gas-flag"] = (
self._prepared_epc["mains-gas-flag"] = (
None
if (
self.prepared_epc["mains-gas-flag"] == ""
or self.prepared_epc["mains-gas-flag"] in DATA_ANOMALY_MATCHES
self._prepared_epc["mains-gas-flag"] == ""
or self._prepared_epc["mains-gas-flag"] in DATA_ANOMALY_MATCHES
)
else mains_gas_map[self.prepared_epc["mains-gas-flag"]]
else mains_gas_map[self._prepared_epc["mains-gas-flag"]]
)
def _clean_heat_loss_corridor(self) -> None:
"""
This method will clean the heat loss corridor, if empty or invalid
"""
if not self.prepared_epc:
if not self._prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
valid_values = ["no corridor", "unheated corridor", "heated corridor"]
@ -681,30 +743,30 @@ class EPCRecord:
"heated corridor": False,
}
self.prepared_epc["heat-loss-corridor"] = (
self._prepared_epc["heat-loss-corridor"] = (
"no corridor"
if self.prepared_epc["heat-loss-corridor"] in DATA_ANOMALY_MATCHES
else self.prepared_epc["heat-loss-corridor"]
if self._prepared_epc["heat-loss-corridor"] in DATA_ANOMALY_MATCHES
else self._prepared_epc["heat-loss-corridor"]
)
if self.prepared_epc["heat-loss-corridor"] not in valid_values:
self.prepared_epc["heat-loss-corridor"] = "no corridor"
if self._prepared_epc["heat-loss-corridor"] not in valid_values:
self._prepared_epc["heat-loss-corridor"] = "no corridor"
self.prepared_epc["unheated-corridor-length"] = (
float(self.prepared_epc["unheated-corridor-length"])
if self.prepared_epc["unheated-corridor-length"] not in ["", None]
self._prepared_epc["unheated-corridor-length"] = (
float(self._prepared_epc["unheated-corridor-length"])
if self._prepared_epc["unheated-corridor-length"] not in ["", None]
else None
)
# We create boolean versions of heat-loss-corridor
self.heat_loss_corridor_bool = boolean_map[
self.prepared_epc["heat-loss-corridor"]
self._prepared_epc["heat-loss-corridor"]
]
def _clean_count_variables(self) -> None:
"""
This method will clean the count variables, if empty or invalid
"""
if not self.prepared_epc:
if not self._prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
_fields = [
@ -717,7 +779,7 @@ class EPCRecord:
null_attributes = ["flat-storey-count", "number-habitable-rooms"]
for attribute in _fields:
value = self.prepared_epc[attribute]
value = self._prepared_epc[attribute]
if value in DATA_ANOMALY_MATCHES or pd.isnull(value):
if attribute in null_attributes:
value = None
@ -726,7 +788,7 @@ class EPCRecord:
else:
value = int(float(value))
self.prepared_epc[attribute] = value
self._prepared_epc[attribute] = value
def _clean_wind_turbine(self) -> None:
"""
@ -745,7 +807,7 @@ class EPCRecord:
"""
This method will clean the solar hot water, if empty or invalid
"""
if not self.prepared_epc:
if not self._prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
value_map = {"Y": "Y", "N": "N", "": "N", None: "N"}
@ -755,25 +817,25 @@ class EPCRecord:
"N": False,
}
self.prepared_epc["solar-water-heating-flag"] = value_map[
self.prepared_epc["solar-water-heating-flag"]
self._prepared_epc["solar-water-heating-flag"] = value_map[
self._prepared_epc["solar-water-heating-flag"]
]
# Create a boolean version for storage in the database
self.solar_water_heating_flag_bool = boolean_map[
self.prepared_epc["solar-water-heating-flag"]
self._prepared_epc["solar-water-heating-flag"]
]
def _clean_solar_pv(self) -> None:
"""
This method will clean the solar pv, if empty or invalid
"""
if not self.prepared_epc:
if not self._prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
self.prepared_epc["photo-supply"] = (
float(self.prepared_epc["photo-supply"])
if (self.prepared_epc["photo-supply"] not in DATA_ANOMALY_MATCHES)
self._prepared_epc["photo-supply"] = (
float(self._prepared_epc["photo-supply"])
if (self._prepared_epc["photo-supply"] not in DATA_ANOMALY_MATCHES)
else None
)
@ -781,43 +843,43 @@ class EPCRecord:
"""
This method will clean the energy, if empty or invalid
"""
if not self.prepared_epc:
if not self._prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
self.prepared_epc["energy-consumption-current"] = float(
self.prepared_epc["energy-consumption-current"]
self._prepared_epc["energy-consumption-current"] = float(
self._prepared_epc["energy-consumption-current"]
)
self.prepared_epc["co2-emissions-current"] = float(
self.prepared_epc["co2-emissions-current"]
self._prepared_epc["co2-emissions-current"] = float(
self._prepared_epc["co2-emissions-current"]
)
def _clean_built_form(self) -> None:
"""
This method will clean the build form, if empty or invalid
"""
if not self.prepared_epc:
if not self._prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
if self.prepared_epc["built-form"] in DATA_ANOMALY_MATCHES:
if self.prepared_epc["property-type"] in ["Flat", "Maisonette"]:
self.prepared_epc["built-form"] = "End-Terrace"
if self._prepared_epc["built-form"] in DATA_ANOMALY_MATCHES:
if self._prepared_epc["property-type"] in ["Flat", "Maisonette"]:
self._prepared_epc["built-form"] = "End-Terrace"
else:
self.prepared_epc["built-form"] = "Semi-Detached"
self._prepared_epc["built-form"] = "Semi-Detached"
def _clean_age_band(self) -> None:
"""
This method will clean the age band, if empty or invalid
"""
if not self.prepared_epc:
if not self._prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
self.prepared_epc["construction-age-band"] = (
self._prepared_epc["construction-age-band"] = (
EPCDataProcessor.clean_construction_age_band(
self.prepared_epc["construction-age-band"]
self._prepared_epc["construction-age-band"]
)
)
if self.prepared_epc["construction-age-band"] in DATA_ANOMALY_MATCHES:
if self._prepared_epc["construction-age-band"] in DATA_ANOMALY_MATCHES:
if self.old_data:
# Take the most recent
old_age_bands = [
@ -835,26 +897,26 @@ class EPCRecord:
if old_record["lodgement-datetime"] == max_datetime
]
self.prepared_epc["construction-age-band"] = (
self._prepared_epc["construction-age-band"] = (
EPCDataProcessor.clean_construction_age_band(
most_recent[0]["construction-age-band"]
)
)
self.construction_age_band = self.prepared_epc["construction-age-band"]
self.construction_age_band = self._prepared_epc["construction-age-band"]
self.age_band = england_wales_age_band_lookup.get(self.construction_age_band)
if (self.prepared_epc["transaction-type"] == "new dwelling") and (
if (self._prepared_epc["transaction-type"] == "new dwelling") and (
self.age_band is None
):
self.age_band = "L"
self.construction_age_band = "England and Wales: 2012 onwards"
self.prepared_epc["construction-age-band"] = self.construction_age_band
self._prepared_epc["construction-age-band"] = self.construction_age_band
if self.age_band is None:
self.age_band = "C"
self.construction_age_band = "England and Wales: 1930-1949"
self.prepared_epc["construction-age-band"] = self.construction_age_band
self._prepared_epc["construction-age-band"] = self.construction_age_band
def _clean_year_built(self) -> None:
"""
@ -1044,27 +1106,45 @@ class EPCRecord:
def get(
self,
key: Union[str, List[str]],
key: str | list[str],
return_asdict: bool = False,
key_suffix: str | None = None,
) -> PreparedEpcValue | list[PreparedEpcValue] | dict[str, PreparedEpcValue]:
"""
This method will return the value of the key
Retrieves the value(s) for the specified key(s) from the prepared EPC data.
:param key: A single key (str) or a list of keys (list[str]) to retrieve values for.
:param return_asdict: If True and key is a list, returns a dictionary of key-value pairs instead of a list of
values.
:param key_suffix: An optional suffix to append to each key in the returned dictionary when return_asdict is
True.
:return: The value(s) corresponding to the specified key(s). Returns a single value if key is a string,
a list of values if key is a list and return_asdict is False, or a dictionary of key-value pairs if key is a
list and return_asdict is True.
"""
if return_asdict:
output_dict = {
x: self.__dict__[x] if x in self.__dict__.keys() else None for x in key
}
if key_suffix is not None:
output_dict = {f"{x}{key_suffix}": y for x, y in output_dict.items()}
return output_dict
source = self.prepared_epc if self.prepared_epc is not None else self.__dict__
if isinstance(key, str):
return source.get(key)
if isinstance(key, list):
return [
self.__dict__[x] if x in self.__dict__.keys() else None for x in key
]
elif isinstance(key, str):
return self.__dict__[key] if key in self.__dict__.keys() else None
if return_asdict:
result = {k: source.get(k) for k in key}
if key_suffix:
result = {f"{k}{key_suffix}": v for k, v in result.items()}
return result
return [source.get(k) for k in key]
raise TypeError(f"Key {key} is not a recognised type")
# Accessor exposing the working EPC dictionary stored in `_prepared_epc`; it returns the stored object itself,
# not a copy, so in-place changes made by callers are visible to the record.
# NOTE(review): no setter is visible in this view - confirm nothing still assigns `self.prepared_epc = ...`.
@property
def prepared_epc(self):
return self._prepared_epc
class EPCDifferenceRecord: