From 8070168715e210cb858bd5958afb63dd7414d159 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Mon, 9 Mar 2026 16:57:13 +0000 Subject: [PATCH] moved landlord remapping to epc record class --- backend/engine/engine.py | 75 +++++----- etl/epc/Record.py | 302 +++++++++++++++++++++++++-------------- 2 files changed, 230 insertions(+), 147 deletions(-) diff --git a/backend/engine/engine.py b/backend/engine/engine.py index 98db7b88..45a3f5e6 100644 --- a/backend/engine/engine.py +++ b/backend/engine/engine.py @@ -824,7 +824,9 @@ async def model_engine(body: PlanTriggerRequest): epc_records = patch_epc(patch, epc_records) - prepared_epc = EPCRecord(epc_records=epc_records, run_mode="newdata", cleaning_data=cleaning_data) + prepared_epc = EPCRecord( + epc_records=epc_records, run_mode="newdata", cleaning_data=cleaning_data, address_metadata=addr + ) # TODO: This is a temp function to handle a specific edge case with Peabody. We should # factor this into EPCRecord as part of the cleaning however we need some more testing @@ -891,9 +893,10 @@ async def model_engine(body: PlanTriggerRequest): ) logger.info("Reading in materials and cleaned datasets") + cleaned = get_cleaned() + with db_read_session() as session: materials = db_funcs.materials_functions.get_materials(session) - cleaned = get_cleaned() # Rebaselining # TODO: MUST happen before setting features @@ -903,55 +906,55 @@ async def model_engine(body: PlanTriggerRequest): # 2) Missing EPC # 3) Materially different information from landlord vs EPC # make the landlord remapping dictionary - addr = [a for a in addresses if a.uprn == p.uprn][0] + addr = next((a for a in addresses if a.uprn == p.uprn), None) + if addr is None: + raise ValueError("Could not find address for property with UPRN: %s", p.uprn) + landlord_remapping = { - "total-floor-area": addr.landlord_total_floor_area_m2, # 1m tolerance on floor area to perform remap - "property-type": addr.landlord_property_type, - "built-form": 
addr.landlord_built_form, + "total_floor_area": addr.landlord_total_floor_area_m2, # 1m tolerance on floor area to perform remap + "property_type": addr.landlord_property_type, + "built_form": addr.landlord_built_form, + # Components - "walls-description": addr.landlord_wall_construction, - "roof-description": addr.landlord_roof_construction, - "floor-description": addr.landlord_floor_construction, - "windows-description": addr.landlord_windows_type, - "main-fuel": addr.landlord_fuel_type, - "mainheat-description": addr.landlord_heating_system, - "mainheatcont-description": addr.landlord_heating_controls, - "hotwater-description": addr.landlord_hot_water_system, + "walls_description": addr.landlord_wall_construction, + "roof_description": addr.landlord_roof_construction, + "floor_description": addr.landlord_floor_construction, + "windows_description": addr.landlord_windows_type, + "main_fuel": addr.landlord_fuel_type, + "mainheat_description": addr.landlord_heating_system, + "mainheatcont_description": addr.landlord_heating_controls, + "hotwater_description": addr.landlord_hot_water_system, + # Efficiency - "walls-energy-eff": addr.landlord_wall_efficiency, - "roof-energy-eff": addr.landlord_roof_efficiency, - "windows-energy-eff": addr.landlord_windows_efficiency, - "mainheat-energy-eff": addr.landlord_heating_efficiency, - "mainheatc-energy-eff": addr.landlord_heating_controls_efficiency, - "hot-water-energy-eff": addr.landlord_hot_water_efficiency, - "multi-glaze-proportion": addr.landlord_multi_glaze_proportion * 100, # TODO: Fix this! 
- "construction-age-band": addr.landlord_construction_age_band, + "walls_energy_eff": addr.landlord_wall_efficiency, + "roof_energy_eff": addr.landlord_roof_efficiency, + "windows_energy_eff": addr.landlord_windows_efficiency, + "mainheat_energy_eff": addr.landlord_heating_efficiency, + "mainheatc_energy_eff": addr.landlord_heating_controls_efficiency, + "hot_water_energy_eff": addr.landlord_hot_water_efficiency, + + "multi_glaze_proportion": addr.landlord_multi_glaze_proportion * 100, # TODO: Fix this! + "construction_age_band": addr.landlord_construction_age_band, } # Find differences between EPC and landlord data differences = {} for k, v in landlord_remapping.items(): - if k == "total-floor-area": - if abs(p.data[k] - v) > 1: # 1m tolerance + if k == "total_floor_area": + if abs(p.epc_record.prepared_epc.get(k) - v) > 1: # 1m tolerance differences[k] = v else: - if v != p.data[k] and (not pd.isnull(v)) and (not pd.isnull(p.data[k])): + if v != p.epc_record.get(k) and (not pd.isnull(v)) and (not pd.isnull(p.epc_record.get(k))): differences[k] = v - needs_rebaselining = p.epc_is_expired | p.epc_is_estimated | len(differences) > 0 + needs_rebaselining = p.epc_is_expired | p.epc_is_estimated | (len(differences) > 0) + + p.epc_record.update(differences) # Need to adjust p.data and p.epc_record.df? 
if needs_rebaselining: if len(differences): - p.data.update(differences) - differences_underscored = {k.replace("-", "_"): v for k, v in differences.items()} - # Insert - for k, v in differences_underscored.items(): - if not hasattr(p.epc_record, k) and k not in ["property_type", "built_form"]: - # Sanity check - while we're implementing - raise ValueError("Property does not have an EPC record to update with differences") - # Hack but these aren't in the data class - if k not in ["property_type", "built_form"]: - setattr(p.epc_record, k, v) + # Insert into prepared_epc + for k, v in differences.items(): p.epc_record.prepared_epc[k] = v p.create_base_difference_epc_record(cleaned_lookup=cleaned) diff --git a/etl/epc/Record.py b/etl/epc/Record.py index bebddf9b..89e33cd8 100644 --- a/etl/epc/Record.py +++ b/etl/epc/Record.py @@ -1,4 +1,6 @@ +from warnings import deprecated from typing import Optional, get_origin, get_args, TypedDict, cast, TypeAlias +from backend.addresses.Address import Address from dataclasses import fields from datetime import datetime from dataclasses import dataclass @@ -240,11 +242,14 @@ class EPCRecord: # ------------------------------------------------------------------ epc_records: Optional[InputEpcRecords] = None + address_metadata: Optional[Address] = None # Raw EPC input (immutable) original_epc: Optional[RawEpcRow] = None # Working dictionary that gets cleaned - prepared_epc: Optional[PreparedEpcRow] = None + _prepared_epc: Optional[PreparedEpcRow] = None + # Record of differences applied by landlord data + landlord_differences: Optional[dict[str, PreparedEpcValue]] = None # Supporting full_sap_epc: Optional[RawEpcRow] = None @@ -280,7 +285,7 @@ class EPCRecord: self.original_epc = self.epc_records["original_epc"].copy() # Working copy that we will clean and manipulate - self.prepared_epc = self.epc_records["original_epc"].copy() + self._prepared_epc = self.epc_records["original_epc"].copy() self.full_sap_epc = 
self.epc_records["full_sap_epc"] self.old_data = self.epc_records["old_data"] @@ -290,11 +295,67 @@ class EPCRecord: self._clean_records_using_epc_records() self._clean_with_data_processor() + self._inject_address_metadata() self._expand_prepared_epc_to_attributes() self._identify_delta_between_prepared_and_original_records() return + def _inject_address_metadata(self): + """ + Overlays the landlord-supplied address metadata onto the prepared EPC record, after the cleaning steps have run. + Landlord values that differ from the prepared EPC (by more than 1 for total floor area; other fields only when + both values are non-null) are recorded in landlord_differences and written into the prepared record. + :return: + """ + + addr = self.address_metadata + if addr is None: + # We don't always have address metadata and so we don't inject if it's not there + return + + landlord_remapping = { + "total_floor_area": addr.landlord_total_floor_area_m2, # 1m tolerance on floor area to perform remap + "property_type": addr.landlord_property_type, + "built_form": addr.landlord_built_form, + + # Components + "walls_description": addr.landlord_wall_construction, + "roof_description": addr.landlord_roof_construction, + "floor_description": addr.landlord_floor_construction, + "windows_description": addr.landlord_windows_type, + "main_fuel": addr.landlord_fuel_type, + "mainheat_description": addr.landlord_heating_system, + "mainheatcont_description": addr.landlord_heating_controls, + "hotwater_description": addr.landlord_hot_water_system, + + # Efficiency + "walls_energy_eff": addr.landlord_wall_efficiency, + "roof_energy_eff": addr.landlord_roof_efficiency, + "windows_energy_eff": addr.landlord_windows_efficiency, + "mainheat_energy_eff": addr.landlord_heating_efficiency, + "mainheatc_energy_eff": addr.landlord_heating_controls_efficiency, + "hot_water_energy_eff": addr.landlord_hot_water_efficiency, + + "multi_glaze_proportion": addr.landlord_multi_glaze_proportion * 100, # TODO: Fix this! 
+ "construction_age_band": addr.landlord_construction_age_band, + } + + # Sanity check - ensure valid keys + if any(k for k in landlord_remapping.keys() if k not in self._prepared_epc): + raise ValueError("Landlord remapping contains keys that are not in the EPC record") + + self.landlord_differences = {} # Anything actually changed + for k, v in landlord_remapping.items(): + if k == "total_floor_area": + if abs(self._prepared_epc.get(k) - v) > 1: # 1m tolerance + self.landlord_differences[k] = v + else: + if v != self._prepared_epc.get(k) and (not pd.isnull(v)) and (not pd.isnull(self._prepared_epc.get(k))): + self.landlord_differences[k] = v + + self.prepared_epc.update(self.landlord_differences) + @staticmethod def _calculate_days_to(lodgement_date: Union[str, pd.Series]) -> Union[int, pd.Series]: if isinstance(lodgement_date, str): @@ -319,7 +380,7 @@ class EPCRecord: record = epc_data_processor.data.to_dict(orient="records")[0] - self.prepared_epc = cast(RawEpcRow, record) + self._prepared_epc = cast(RawEpcRow, record) @staticmethod def _cast_value(value: PreparedEpcValue, type_hint: Any) -> PreparedEpcValue: @@ -354,7 +415,7 @@ class EPCRecord: field_map = {f.name: f for f in fields(self)} - for key, value in self.prepared_epc.items(): + for key, value in self._prepared_epc.items(): # Enforce schema consistency if "-" in key: @@ -439,44 +500,44 @@ class EPCRecord: def _clean_floor_height(self) -> None: """Remaps anomalies in floor height to the average floor height for the property type""" floor_height_data = self.cleaning_data[ - (self.cleaning_data["property_type"] == self.prepared_epc["property-type"]) - & (self.cleaning_data["built_form"] == self.prepared_epc["built-form"]) + (self.cleaning_data["property_type"] == self._prepared_epc["property-type"]) + & (self.cleaning_data["built_form"] == self._prepared_epc["built-form"]) ] average = float(np.mean(floor_height_data["floor_height"])) sd = float(np.std(floor_height_data["floor_height"])) # If we're in 
the top 0.5 percentile of floor heights, we'll set it to the average - if self.prepared_epc["floor-height"] > average + 10 * sd: - self.prepared_epc["floor-height"] = average - if self.prepared_epc["floor-height"] <= 1.665: - self.prepared_epc["floor-height"] = average + if self._prepared_epc["floor-height"] > average + 10 * sd: + self._prepared_epc["floor-height"] = average + if self._prepared_epc["floor-height"] <= 1.665: + self._prepared_epc["floor-height"] = average def _clean_new_build_descriptions(self) -> None: for col in ["roof-description", "walls-description", "floor-description"]: - self.prepared_epc[col] = self.prepared_epc[col].replace("W/m²K", "W/m-¦K") + self._prepared_epc[col] = self._prepared_epc[col].replace("W/m²K", "W/m-¦K") def _clean_constituency(self) -> None: """ We handle the single case of finding a missing constituency by using the local authority """ - if pd.isnull(self.prepared_epc["constituency"]) or ( - self.prepared_epc["constituency"] == "" + if pd.isnull(self._prepared_epc["constituency"]) or ( + self._prepared_epc["constituency"] == "" ): - if self.prepared_epc["local-authority"] != "E06000044": + if self._prepared_epc["local-authority"] != "E06000044": raise NotImplementedError( "This function is only implemented for Portsmouth, in the single edgecase seen" ) - self.prepared_epc["constituency"] = "E14000883" + self._prepared_epc["constituency"] = "E14000883" def _clean_floor_level(self) -> None: """ This method will clean the floor level, if empty or invalid """ - if not self.prepared_epc: + if not self._prepared_epc: raise ValueError("EPC Recrod doesn not contain epc data") - self.prepared_epc["floor-level"] = ( - FLOOR_LEVEL_MAP[self.prepared_epc["floor-level"]] - if self.prepared_epc["floor-level"] not in DATA_ANOMALY_MATCHES + self._prepared_epc["floor-level"] = ( + FLOOR_LEVEL_MAP[self._prepared_epc["floor-level"]] + if self._prepared_epc["floor-level"] not in DATA_ANOMALY_MATCHES else None ) @@ -484,10 +545,10 @@ class 
EPCRecord: """ This method will clean the number of lighting outlets, if empty or invalid """ - if not self.prepared_epc: + if not self._prepared_epc: raise ValueError("EPC Recrod doesn not contain epc data") - if self.prepared_epc["fixed-lighting-outlets-count"] in DATA_ANOMALY_MATCHES: + if self._prepared_epc["fixed-lighting-outlets-count"] in DATA_ANOMALY_MATCHES: # We check old EPCs and the full SAP EPC lighting_data = [] @@ -508,7 +569,7 @@ class EPCRecord: ) if lighting_data: - self.prepared_epc["fixed-lighting-outlets-count"] = round( + self._prepared_epc["fixed-lighting-outlets-count"] = round( np.median(lighting_data) ) else: @@ -533,11 +594,12 @@ class EPCRecord: "LOCAL_AUTHORITY", ], ) - self.prepared_epc["fixed-lighting-outlets-count"] = round( + self._prepared_epc["fixed-lighting-outlets-count"] = round( cleaned_property_data["FIXED_LIGHTING_OUTLETS_COUNT"].values[0] ) else: - self.prepared_epc["fixed-lighting-outlets-count"] = float(self.prepared_epc["fixed-lighting-outlets-count"]) + self._prepared_epc["fixed-lighting-outlets-count"] = float( + self._prepared_epc["fixed-lighting-outlets-count"]) def _filter_property_dimensions(self, property_dimensions) -> pd.Series: """ @@ -547,7 +609,7 @@ class EPCRecord: """ result = property_dimensions[ - (property_dimensions["PROPERTY_TYPE"] == self.prepared_epc["property-type"]) + (property_dimensions["PROPERTY_TYPE"] == self._prepared_epc["property-type"]) ] if self.construction_age_band not in DATA_ANOMALY_MATCHES: @@ -556,10 +618,10 @@ class EPCRecord: ] if ( - self.prepared_epc["built-form"] not in DATA_ANOMALY_MATCHES - and self.prepared_epc["built-form"] in result["BUILT_FORM"] + self._prepared_epc["built-form"] not in DATA_ANOMALY_MATCHES + and self._prepared_epc["built-form"] in result["BUILT_FORM"] ): - result = result[(result["BUILT_FORM"] == self.prepared_epc["built-form"])] + result = result[(result["BUILT_FORM"] == self._prepared_epc["built-form"])] return result[ [ @@ -575,102 +637,102 @@ class 
EPCRecord: Cleans up the number of floors, number of habitable rooms, and the floor height """ - if not self.prepared_epc: + if not self._prepared_epc: raise ValueError("EPC Record doesn not contain epc data") if ( - (self.prepared_epc["number-habitable-rooms"] in DATA_ANOMALY_MATCHES) - or (self.prepared_epc["floor-height"] in DATA_ANOMALY_MATCHES) - or (self.prepared_epc["number-heated-rooms"] in DATA_ANOMALY_MATCHES) + (self._prepared_epc["number-habitable-rooms"] in DATA_ANOMALY_MATCHES) + or (self._prepared_epc["floor-height"] in DATA_ANOMALY_MATCHES) + or (self._prepared_epc["number-heated-rooms"] in DATA_ANOMALY_MATCHES) ): # TODO - this probably shouldn't live here - but we only need to use this for specific properties # when we meet this condition property_dimensions: pd.DataFrame = read_dataframe_from_s3_parquet( bucket_name=DATA_BUCKET, - file_key=f"property_dimensions/{self.prepared_epc['local-authority']}.parquet", + file_key=f"property_dimensions/{self._prepared_epc['local-authority']}.parquet", ) self.property_dimensions: pd.Series = self._filter_property_dimensions( property_dimensions ) - if self.prepared_epc["number-habitable-rooms"] in DATA_ANOMALY_MATCHES: - self.prepared_epc["number-habitable-rooms"] = float( + if self._prepared_epc["number-habitable-rooms"] in DATA_ANOMALY_MATCHES: + self._prepared_epc["number-habitable-rooms"] = float( self.property_dimensions["NUMBER_HABITABLE_ROOMS"].round() ) else: - self.prepared_epc["number-habitable-rooms"] = float( - self.prepared_epc["number-habitable-rooms"] + self._prepared_epc["number-habitable-rooms"] = float( + self._prepared_epc["number-habitable-rooms"] ) - if self.prepared_epc["number-heated-rooms"] in DATA_ANOMALY_MATCHES: - self.prepared_epc["number-heated-rooms"] = float( + if self._prepared_epc["number-heated-rooms"] in DATA_ANOMALY_MATCHES: + self._prepared_epc["number-heated-rooms"] = float( self.property_dimensions["NUMBER_HEATED_ROOMS"].round() ) else: - 
self.prepared_epc["number-heated-rooms"] = float( - self.prepared_epc["number-heated-rooms"] + self._prepared_epc["number-heated-rooms"] = float( + self._prepared_epc["number-heated-rooms"] ) self.number_of_floors = estimate_number_of_floors( - self.prepared_epc["property-type"] + self._prepared_epc["property-type"] ) if ( - self.prepared_epc["floor-height"] == "" - or self.prepared_epc["floor-height"] in DATA_ANOMALY_MATCHES + self._prepared_epc["floor-height"] == "" + or self._prepared_epc["floor-height"] in DATA_ANOMALY_MATCHES ): - self.prepared_epc["floor-height"] = float( + self._prepared_epc["floor-height"] = float( self.property_dimensions["FLOOR_HEIGHT"].round(2) ) else: - self.prepared_epc["floor-height"] = float(self.prepared_epc["floor-height"]) + self._prepared_epc["floor-height"] = float(self._prepared_epc["floor-height"]) def _clean_floor_area(self) -> None: """ This method will clean the floor area, if empty or invalid """ - if not self.prepared_epc: + if not self._prepared_epc: raise ValueError("EPC Recrod doesn not contain epc data") - if self.prepared_epc["total-floor-area"] is None: + if self._prepared_epc["total-floor-area"] is None: return - self.prepared_epc["total-floor-area"] = float( - self.prepared_epc["total-floor-area"] + self._prepared_epc["total-floor-area"] = float( + self._prepared_epc["total-floor-area"] ) # We handle the edge case of floor area being 0. 
We set it to zero and it is cleaned by # _clean_with_data_processor - if self.prepared_epc["total-floor-area"] == 0: + if self._prepared_epc["total-floor-area"] == 0: print( "Edge case of floor area being zero - will set to none and will be cleaned in " "_clean_with_data_processor" ) - self.prepared_epc["total-floor-area"] = None + self._prepared_epc["total-floor-area"] = None def _clean_mains_gas(self) -> None: """ This method will clean the mains gas, if empty or invalid """ - if not self.prepared_epc: + if not self._prepared_epc: raise ValueError("EPC Recrod doesn not contain epc data") mains_gas_map = {"Y": True, "N": False, True: True, False: False} - self.prepared_epc["mains-gas-flag"] = ( + self._prepared_epc["mains-gas-flag"] = ( None if ( - self.prepared_epc["mains-gas-flag"] == "" - or self.prepared_epc["mains-gas-flag"] in DATA_ANOMALY_MATCHES + self._prepared_epc["mains-gas-flag"] == "" + or self._prepared_epc["mains-gas-flag"] in DATA_ANOMALY_MATCHES ) - else mains_gas_map[self.prepared_epc["mains-gas-flag"]] + else mains_gas_map[self._prepared_epc["mains-gas-flag"]] ) def _clean_heat_loss_corridor(self) -> None: """ This method will clean the heat loss corridor, if empty or invalid """ - if not self.prepared_epc: + if not self._prepared_epc: raise ValueError("EPC Recrod doesn not contain epc data") valid_values = ["no corridor", "unheated corridor", "heated corridor"] @@ -681,30 +743,30 @@ class EPCRecord: "heated corridor": False, } - self.prepared_epc["heat-loss-corridor"] = ( + self._prepared_epc["heat-loss-corridor"] = ( "no corridor" - if self.prepared_epc["heat-loss-corridor"] in DATA_ANOMALY_MATCHES - else self.prepared_epc["heat-loss-corridor"] + if self._prepared_epc["heat-loss-corridor"] in DATA_ANOMALY_MATCHES + else self._prepared_epc["heat-loss-corridor"] ) - if self.prepared_epc["heat-loss-corridor"] not in valid_values: - self.prepared_epc["heat-loss-corridor"] = "no corridor" + if self._prepared_epc["heat-loss-corridor"] not in 
valid_values: + self._prepared_epc["heat-loss-corridor"] = "no corridor" - self.prepared_epc["unheated-corridor-length"] = ( - float(self.prepared_epc["unheated-corridor-length"]) - if self.prepared_epc["unheated-corridor-length"] not in ["", None] + self._prepared_epc["unheated-corridor-length"] = ( + float(self._prepared_epc["unheated-corridor-length"]) + if self._prepared_epc["unheated-corridor-length"] not in ["", None] else None ) # We create boolean versions of heat-loss-corridor self.heat_loss_corridor_bool = boolean_map[ - self.prepared_epc["heat-loss-corridor"] + self._prepared_epc["heat-loss-corridor"] ] def _clean_count_variables(self) -> None: """ This method will clean the count variables, if empty or invalid """ - if not self.prepared_epc: + if not self._prepared_epc: raise ValueError("EPC Recrod doesn not contain epc data") _fields = [ @@ -717,7 +779,7 @@ class EPCRecord: null_attributes = ["flat-storey-count", "number-habitable-rooms"] for attribute in _fields: - value = self.prepared_epc[attribute] + value = self._prepared_epc[attribute] if value in DATA_ANOMALY_MATCHES or pd.isnull(value): if attribute in null_attributes: value = None @@ -726,7 +788,7 @@ class EPCRecord: else: value = int(float(value)) - self.prepared_epc[attribute] = value + self._prepared_epc[attribute] = value def _clean_wind_turbine(self) -> None: """ @@ -745,7 +807,7 @@ class EPCRecord: """ This method will clean the solar hot water, if empty or invalid """ - if not self.prepared_epc: + if not self._prepared_epc: raise ValueError("EPC Recrod doesn not contain epc data") value_map = {"Y": "Y", "N": "N", "": "N", None: "N"} @@ -755,25 +817,25 @@ class EPCRecord: "N": False, } - self.prepared_epc["solar-water-heating-flag"] = value_map[ - self.prepared_epc["solar-water-heating-flag"] + self._prepared_epc["solar-water-heating-flag"] = value_map[ + self._prepared_epc["solar-water-heating-flag"] ] # Create a boolean version for storage in the database 
self.solar_water_heating_flag_bool = boolean_map[ - self.prepared_epc["solar-water-heating-flag"] + self._prepared_epc["solar-water-heating-flag"] ] def _clean_solar_pv(self) -> None: """ This method will clean the solar pv, if empty or invalid """ - if not self.prepared_epc: + if not self._prepared_epc: raise ValueError("EPC Recrod doesn not contain epc data") - self.prepared_epc["photo-supply"] = ( - float(self.prepared_epc["photo-supply"]) - if (self.prepared_epc["photo-supply"] not in DATA_ANOMALY_MATCHES) + self._prepared_epc["photo-supply"] = ( + float(self._prepared_epc["photo-supply"]) + if (self._prepared_epc["photo-supply"] not in DATA_ANOMALY_MATCHES) else None ) @@ -781,43 +843,43 @@ class EPCRecord: """ This method will clean the energy, if empty or invalid """ - if not self.prepared_epc: + if not self._prepared_epc: raise ValueError("EPC Recrod doesn not contain epc data") - self.prepared_epc["energy-consumption-current"] = float( - self.prepared_epc["energy-consumption-current"] + self._prepared_epc["energy-consumption-current"] = float( + self._prepared_epc["energy-consumption-current"] ) - self.prepared_epc["co2-emissions-current"] = float( - self.prepared_epc["co2-emissions-current"] + self._prepared_epc["co2-emissions-current"] = float( + self._prepared_epc["co2-emissions-current"] ) def _clean_built_form(self) -> None: """ This method will clean the build form, if empty or invalid """ - if not self.prepared_epc: + if not self._prepared_epc: raise ValueError("EPC Recrod doesn not contain epc data") - if self.prepared_epc["built-form"] in DATA_ANOMALY_MATCHES: - if self.prepared_epc["property-type"] in ["Flat", "Maisonette"]: - self.prepared_epc["built-form"] = "End-Terrace" + if self._prepared_epc["built-form"] in DATA_ANOMALY_MATCHES: + if self._prepared_epc["property-type"] in ["Flat", "Maisonette"]: + self._prepared_epc["built-form"] = "End-Terrace" else: - self.prepared_epc["built-form"] = "Semi-Detached" + self._prepared_epc["built-form"] = 
"Semi-Detached" def _clean_age_band(self) -> None: """ This method will clean the age band, if empty or invalid """ - if not self.prepared_epc: + if not self._prepared_epc: raise ValueError("EPC Recrod doesn not contain epc data") - self.prepared_epc["construction-age-band"] = ( + self._prepared_epc["construction-age-band"] = ( EPCDataProcessor.clean_construction_age_band( - self.prepared_epc["construction-age-band"] + self._prepared_epc["construction-age-band"] ) ) - if self.prepared_epc["construction-age-band"] in DATA_ANOMALY_MATCHES: + if self._prepared_epc["construction-age-band"] in DATA_ANOMALY_MATCHES: if self.old_data: # Take the most recent old_age_bands = [ @@ -835,26 +897,26 @@ class EPCRecord: if old_record["lodgement-datetime"] == max_datetime ] - self.prepared_epc["construction-age-band"] = ( + self._prepared_epc["construction-age-band"] = ( EPCDataProcessor.clean_construction_age_band( most_recent[0]["construction-age-band"] ) ) - self.construction_age_band = self.prepared_epc["construction-age-band"] + self.construction_age_band = self._prepared_epc["construction-age-band"] self.age_band = england_wales_age_band_lookup.get(self.construction_age_band) - if (self.prepared_epc["transaction-type"] == "new dwelling") and ( + if (self._prepared_epc["transaction-type"] == "new dwelling") and ( self.age_band is None ): self.age_band = "L" self.construction_age_band = "England and Wales: 2012 onwards" - self.prepared_epc["construction-age-band"] = self.construction_age_band + self._prepared_epc["construction-age-band"] = self.construction_age_band if self.age_band is None: self.age_band = "C" self.construction_age_band = "England and Wales: 1930-1949" - self.prepared_epc["construction-age-band"] = self.construction_age_band + self._prepared_epc["construction-age-band"] = self.construction_age_band def _clean_year_built(self) -> None: """ @@ -1044,27 +1106,45 @@ class EPCRecord: def get( self, - key: Union[str, List[str]], + key: str | list[str], 
return_asdict: bool = False, key_suffix: str | None = None, ) -> PreparedEpcValue | list[PreparedEpcValue] | dict[str, PreparedEpcValue]: + """ - This method will return the value of the key + Retrieves the value(s) for the specified key(s) from the prepared EPC data. + :param key: A single key (str) or a list of keys (list[str]) to retrieve values for. + :param return_asdict: If True and key is a list, returns a dictionary of key-value pairs instead of a list of + values. + :param key_suffix: An optional suffix to append to each key in the returned dictionary when return_asdict is + True. + :return: The value(s) corresponding to the specified key(s). Returns a single value if key is a string, + a list of values if key is a list and return_asdict is False, or a dictionary of key-value pairs if key is a + list and return_asdict is True. """ - if return_asdict: - output_dict = { - x: self.__dict__[x] if x in self.__dict__.keys() else None for x in key - } - if key_suffix is not None: - output_dict = {f"{x}{key_suffix}": y for x, y in output_dict.items()} - return output_dict + + source = self.prepared_epc if self.prepared_epc is not None else self.__dict__ + + if isinstance(key, str): + return source.get(key) if isinstance(key, list): - return [ - self.__dict__[x] if x in self.__dict__.keys() else None for x in key - ] - elif isinstance(key, str): - return self.__dict__[key] if key in self.__dict__.keys() else None + + if return_asdict: + result = {k: source.get(k) for k in key} + + if key_suffix: + result = {f"{k}{key_suffix}": v for k, v in result.items()} + + return result + + return [source.get(k) for k in key] + + raise TypeError(f"Key {key} is not a recognised type") + + @property + def prepared_epc(self): + return self._prepared_epc class EPCDifferenceRecord: