Model/etl/epc/Record.py
2026-03-06 13:25:39 +00:00

1272 lines
45 KiB
Python

from typing import Optional, get_origin, get_args, TypedDict, Dict
from dataclasses import fields
from datetime import datetime
from dataclasses import dataclass
from etl.epc.ValidationConfiguration import (
EPCRecordValidationConfiguration,
EPCDifferenceRecordValidationConfiguration,
EPCDifferenceRecordFixedDataValidationConfiguration,
)
from etl.epc.DataProcessor import EPCDataProcessor
from recommendations.rdsap_tables import england_wales_age_band_lookup, FLOOR_LEVEL_MAP
from etl.epc.settings import DATA_ANOMALY_MATCHES
import re
import os
import numpy as np
import pandas as pd
from typing import Any, Union, List
from etl.epc.settings import (
RDSAP_RESPONSE,
HEAT_DEMAND_RESPONSE,
CARBON_RESPONSE,
COMPONENT_FEATURES,
EFFICIENCY_FEATURES,
ROOM_FEATURES,
POST_SAP10_FEATURE,
)
from recommendations.recommendation_utils import estimate_number_of_floors
from utils.s3 import read_dataframe_from_s3_parquet
from utils.logger import setup_logger
from etl.epc.settings import EARLIEST_EPC_DATE
logger = setup_logger()
# TODO: Change these in the settings file
RDSAP_RESPONSE = RDSAP_RESPONSE.lower()
HEAT_DEMAND_RESPONSE = HEAT_DEMAND_RESPONSE.lower()
CARBON_RESPONSE = CARBON_RESPONSE.lower()
COMPONENT_FEATURES = [x.lower() for x in COMPONENT_FEATURES]
EFFICIENCY_FEATURES = [x.lower() for x in EFFICIENCY_FEATURES]
ENVIRONMENT = os.environ.get("ENVIRONMENT", "dev")
DATA_BUCKET = os.environ.get(
"DATA_BUCKET", "retrofit-data-dev" if ENVIRONMENT == "dev" else None
)
pd.set_option("future.no_silent_downcasting", True)
class InputEpcRecords(TypedDict):
original_epc: Dict[str, Any]
full_sap_epc: Dict[str, Any]
old_data: List[Dict[str, Any]]
@dataclass
class EPCRecord:
"""
Base class for a EPC record
"""
# ------------------------------------------------------------------
# IDENTIFIERS / METADATA
# ------------------------------------------------------------------
uprn: Optional[int] = None
lmk_key: Optional[str] = None
building_reference_number: Optional[str] = None
report_type: Optional[str] = None
transaction_type: Optional[str] = None
uprn_source: Optional[str] = None
lodgement_date: Optional[str] = None
lodgement_datetime: Optional[str] = None
inspection_date: Optional[str] = None
# ------------------------------------------------------------------
# ADDRESS / LOCATION DATA
# ------------------------------------------------------------------
address: Optional[str] = None
address1: Optional[str] = None
address2: Optional[str] = None
address3: Optional[str] = None
postcode: Optional[str] = None
posttown: Optional[str] = None
county: Optional[str] = None
local_authority: Optional[str] = None
local_authority_label: Optional[str] = None
constituency: Optional[str] = None
constituency_label: Optional[str] = None
# ------------------------------------------------------------------
# PROPERTY CHARACTERISTICS
# ------------------------------------------------------------------
property_type: Optional[str] = None
built_form: Optional[str] = None
tenure: Optional[str] = None
floor_level: Optional[str] = None
flat_top_storey: Optional[str] = None
flat_storey_count: Optional[int] = None
glazed_area: Optional[str] = None
heat_loss_corridor: Optional[str] = None
unheated_corridor_length: Optional[float] = None
mains_gas_flag: Optional[str] = None
# ------------------------------------------------------------------
# BUILDING FABRIC DESCRIPTIONS
# ------------------------------------------------------------------
walls_description: Optional[str] = None
floor_description: Optional[str] = None
roof_description: Optional[str] = None
windows_description: Optional[str] = None
walls_env_eff: Optional[str] = None
floor_env_eff: Optional[str] = None
roof_env_eff: Optional[str] = None
windows_env_eff: Optional[str] = None
mainheat_env_eff: Optional[str] = None
sheating_env_eff: Optional[str] = None
hot_water_env_eff: Optional[str] = None
mainheatc_env_eff: Optional[str] = None
walls_energy_eff: Optional[str] = None
floor_energy_eff: Optional[str] = None
roof_energy_eff: Optional[str] = None
windows_energy_eff: Optional[str] = None
hot_water_energy_eff: Optional[str] = None
sheating_energy_eff: Optional[str] = None
mainheat_energy_eff: Optional[str] = None
mainheatc_energy_eff: Optional[str] = None
# ------------------------------------------------------------------
# HEATING / HOT WATER / SYSTEMS
# ------------------------------------------------------------------
mainheat_description: Optional[str] = None
mainheatcont_description: Optional[str] = None
secondheat_description: Optional[str] = None
hotwater_description: Optional[str] = None
main_fuel: Optional[str] = None
main_heating_controls: Optional[str] = None
mechanical_ventilation: Optional[str] = None
solar_water_heating_flag: Optional[str] = None
wind_turbine_count: Optional[int] = None
photo_supply: Optional[float] = None
# ------------------------------------------------------------------
# LIGHTING
# ------------------------------------------------------------------
lighting_description: Optional[str] = None
lighting_env_eff: Optional[str] = None
lighting_energy_eff: Optional[str] = None
low_energy_lighting: Optional[float] = None
fixed_lighting_outlets_count: Optional[int] = None
low_energy_fixed_light_count: Optional[int] = None
# ------------------------------------------------------------------
# ENERGY RATINGS
# ------------------------------------------------------------------
current_energy_rating: Optional[str] = None
potential_energy_rating: Optional[str] = None
current_energy_efficiency: Optional[int] = None
potential_energy_efficiency: Optional[float] = None
# ------------------------------------------------------------------
# ENERGY / CARBON METRICS
# ------------------------------------------------------------------
energy_consumption_current: Optional[int] = None
energy_consumption_potential: Optional[float] = None
co2_emissions_current: Optional[float] = None
co2_emissions_potential: Optional[float] = None
co2_emiss_curr_per_floor_area: Optional[float] = None
environment_impact_current: Optional[int] = None
environment_impact_potential: Optional[float] = None
# ------------------------------------------------------------------
# COST METRICS
# ------------------------------------------------------------------
heating_cost_current: Optional[float] = None
lighting_cost_current: Optional[float] = None
hot_water_cost_current: Optional[float] = None
heating_cost_potential: Optional[float] = None
lighting_cost_potential: Optional[float] = None
hot_water_cost_potential: Optional[float] = None
energy_tariff: Optional[str] = None
# ------------------------------------------------------------------
# PROPERTY DIMENSIONS / COUNTS
# ------------------------------------------------------------------
total_floor_area: Optional[float] = None
floor_height: Optional[float] = None
number_habitable_rooms: Optional[float] = None
number_heated_rooms: Optional[float] = None
number_open_fireplaces: Optional[float] = None
extension_count: Optional[float] = None
# ------------------------------------------------------------------
# GLAZING
# ------------------------------------------------------------------
glazed_type: Optional[str] = None
multi_glaze_proportion: Optional[float] = None
# ------------------------------------------------------------------
# MODEL FLAGS
# ------------------------------------------------------------------
is_post_sap10: Optional[bool] = None
run_mode: str = "training"
epc_records: Optional[InputEpcRecords] = None
full_sap_epc: Optional[dict] = None
old_data: list[dict] = None
original_epc: Optional[dict] = None
prepared_epc: Optional[dict] = None
prepared_epc_delta_metadata: pd.DataFrame = None
cleaning_data: pd.DataFrame = None
# Not used in training mod but used in newdata mode
age_band: str = None
construction_age_band: str = None
year_built: int = None
number_of_floors: int = None
number_of_open_fireplaces: int = None
heat_loss_corridor_bool: bool = None
solar_water_heating_flag_bool: bool = None
def __post_init__(self):
# We can have validation and cleaning steps for each of the fields
# self.WALLS_DESCRIPTION = 'check'
# Could also have cleaning of records if needed
if self.run_mode == "training":
self.validation_configuration = EPCRecordValidationConfiguration
# self._field_validation()
return
# We are running in newdata mode
if self.epc_records is None:
raise ValueError("Must provide epc records if running in newdata mode")
self.prepared_epc = self.epc_records["original_epc"]
self.original_epc = self.epc_records["original_epc"].copy()
self.full_sap_epc = self.epc_records["full_sap_epc"]
self.old_data = self.epc_records["old_data"]
if self.cleaning_data is None:
raise ValueError("Must provide cleaning data if running in newdata mode")
self._clean_records_using_epc_records()
self._clean_with_data_processor()
self._expand_prepared_epc_to_attributes()
self._identify_delta_between_prepared_and_original_records()
return
@staticmethod
def _calculate_days_to(lodgement_date):
if isinstance(lodgement_date, str):
return (
pd.to_datetime(lodgement_date) - pd.to_datetime(EARLIEST_EPC_DATE)
).days
return (
pd.to_datetime(lodgement_date) - pd.to_datetime(EARLIEST_EPC_DATE)
).dt.days
def _clean_with_data_processor(self):
"""
This method will clean the records using the data processor
"""
epc_data_processor = EPCDataProcessor(
data=self.epc_record_as_dataframe("prepared_epc").copy(),
run_mode="newdata",
cleaning_averages=self.cleaning_data,
)
epc_data_processor.prepare_data()
self.prepared_epc = epc_data_processor.data.to_dict(orient="records")[0]
def _cast_value(self, value, type_hint):
origin = get_origin(type_hint)
args = get_args(type_hint)
if origin is Union:
type_hint = [a for a in args if a is not type(None)][0]
if type_hint is int:
return int(value)
if type_hint is float:
return float(value)
if type_hint is bool:
if isinstance(value, bool):
return value
return str(value).lower() in ["true", "1", "y", "yes"]
if type_hint is str:
return str(value)
return value
def _expand_prepared_epc_to_attributes(self):
"""
Expand prepared_epc dictionary into dataclass attributes.
Assumes prepared_epc keys are snake_case.
"""
field_map = {f.name: f for f in fields(self)}
for key, value in self.prepared_epc.items():
# Enforce schema consistency
if "-" in key:
raise ValueError(f"Invalid EPC key format (expected snake_case): {key}")
if key not in field_map:
# Ignore keys that are not part of the dataclass schema
continue
if value in ("", None):
setattr(self, key, None)
continue
try:
cast_value = self._cast_value(value, field_map[key].type)
setattr(self, key, cast_value)
except Exception as e:
logger.error(f"Failed casting field '{key}' with value '{value}': {e}")
setattr(self, key, value)
def _identify_delta_between_prepared_and_original_records(self):
"""
This method will identify the delta between the prepared and original records
"""
prepared_epc_df = self.epc_record_as_dataframe("prepared_epc")
original_epc_df = self.epc_record_as_dataframe("original_epc")
df = pd.concat(
[prepared_epc_df, original_epc_df],
keys=["prepared_epc", "original_epc"],
axis=0,
)
same_index = df.apply(pd.Series.duplicated).any()
self.prepared_epc_delta_metadata = df[same_index[~same_index].index]
def _clean_records_using_epc_records(self):
"""
This method will clean the records
"""
# TODO: Move all the cleaning steps in the Property class into there
self._clean_built_form()
self._clean_energy()
self._clean_ventilation()
self._clean_solar_pv()
self._clean_solar_hot_water()
self._clean_wind_turbine()
self._clean_count_variables()
self._clean_heat_loss_corridor()
self._clean_mains_gas()
self._clean_age_band()
self._clean_year_built()
self._clean_floor_area()
self._clean_property_dimensions()
self._clean_number_lighting_outlets()
self._clean_floor_level()
self._clean_floor_height()
self._clean_constituency()
self._clean_new_build_descriptions()
# self._clean_potential_energy_efficiency()
# self._clean_environment_impact_potential()
# self._clean_energy_consumption_potential()
# self._clean_co2_emissions_potential()
# self._clean_current_energy_efficiency()
# self._clean_energy_consumption_current()
# self._clean_co2_emissions_current()
def epc_record_as_dataframe(
self,
epc_type: str = "prepared_epc",
use_upper_columns: bool = True,
replace_empty_string: bool = False,
):
"""
This method will return the dataframe representation of the epc record
"""
df = pd.DataFrame.from_dict(self.get(epc_type), orient="index").T
if use_upper_columns:
df.columns = [x.upper().replace("-", "_") for x in df.columns]
if replace_empty_string:
df = df.replace("", np.nan).infer_objects(copy=False)
return df
def _clean_floor_height(self):
"""Remaps anomalies in floor height to the average floor height for the property type"""
floor_height_data = self.cleaning_data[
(self.cleaning_data["property_type"] == self.prepared_epc["property-type"])
& (self.cleaning_data["built_form"] == self.prepared_epc["built-form"])
]
average = floor_height_data["floor_height"].mean()
sd = floor_height_data["floor_height"].std()
# If we're in the top 0.5 percentile of floor heights, we'll set it to the average
if self.prepared_epc["floor-height"] > average + 10 * sd:
self.prepared_epc["floor-height"] = average
if self.prepared_epc["floor-height"] <= 1.665:
self.prepared_epc["floor-height"] = average
def _clean_new_build_descriptions(self):
for col in ["roof-description", "walls-description", "floor-description"]:
self.prepared_epc[col] = self.prepared_epc[col].replace("W/m²K", "W/m-¦K")
def _clean_constituency(self):
"""
We handle the single case of finding a missing constituency by using the local authority
"""
if pd.isnull(self.prepared_epc["constituency"]) or (
self.prepared_epc["constituency"] == ""
):
if self.prepared_epc["local-authority"] != "E06000044":
raise NotImplementedError(
"This function is only implemented for Portsmouth, in the single edgecase seen"
)
self.prepared_epc["constituency"] = "E14000883"
def _clean_floor_level(self):
"""
This method will clean the floor level, if empty or invalid
"""
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
self.prepared_epc["floor-level"] = (
FLOOR_LEVEL_MAP[self.prepared_epc["floor-level"]]
if self.prepared_epc["floor-level"] not in DATA_ANOMALY_MATCHES
else None
)
def _clean_number_lighting_outlets(self):
"""
This method will clean the number of lighting outlets, if empty or invalid
"""
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
if self.prepared_epc["fixed-lighting-outlets-count"] in DATA_ANOMALY_MATCHES:
# We check old EPCs and the full SAP EPC
lighting_data = []
if len(self.old_data):
lighting_data.extend(
[
int(old_record["fixed-lighting-outlets-count"])
for old_record in self.old_data
if old_record["fixed-lighting-outlets-count"] != ""
]
)
if len(self.full_sap_epc):
if self.full_sap_epc["fixed-lighting-outlets-count"] != "":
lighting_data.append(
int(self.full_sap_epc["fixed-lighting-outlets-count"])
)
if lighting_data:
self.prepared_epc["fixed-lighting-outlets-count"] = round(
np.median(lighting_data)
)
else:
# Use averages from the cleaning dataset, based on the property type, built form, construction age
# band and local authority
cleaning_data = self.cleaning_data.copy()
# When running in new-data more, the columns will have been coerced to lower case so we push them
# back to upper case
if self.run_mode == "newdata":
cleaning_data.columns = [x.upper() for x in cleaning_data.columns]
cleaned_property_data = EPCDataProcessor.apply_averages_cleaning(
data_to_clean=self.epc_record_as_dataframe(
"prepared_epc", replace_empty_string=True
),
cleaning_data=cleaning_data,
cols_to_merge_on=[
"PROPERTY_TYPE",
"BUILT_FORM",
"CONSTRUCTION_AGE_BAND",
"LOCAL_AUTHORITY",
],
)
self.prepared_epc["fixed-lighting-outlets-count"] = round(
cleaned_property_data["FIXED_LIGHTING_OUTLETS_COUNT"].values[0]
)
else:
self.prepared_epc["fixed-lighting-outlets-count"] = float(
self.prepared_epc["fixed-lighting-outlets-count"]
)
def _filter_property_dimensions(self, property_dimensions):
"""
Will filter the property dimensions dataframe to only include the relevant rows for the property
:param property_dimensions:
:return: filtered property dimensions dataframe
"""
result = property_dimensions[
(property_dimensions["PROPERTY_TYPE"] == self.prepared_epc["property-type"])
]
if self.construction_age_band not in DATA_ANOMALY_MATCHES:
result = result[
(result["CONSTRUCTION_AGE_BAND"] == self.construction_age_band)
]
if (
self.prepared_epc["built-form"] not in DATA_ANOMALY_MATCHES
and self.prepared_epc["built-form"] in result["BUILT_FORM"]
):
result = result[(result["BUILT_FORM"] == self.prepared_epc["built-form"])]
return result[
[
"NUMBER_HABITABLE_ROOMS",
"NUMBER_HEATED_ROOMS",
"TOTAL_FLOOR_AREA",
"FLOOR_HEIGHT",
]
].mean()
def _clean_property_dimensions(self):
"""
Cleans up the number of floors, number of habitable rooms, and the floor height
"""
if not self.prepared_epc:
raise ValueError("EPC Record doesn not contain epc data")
if (
(self.prepared_epc["number-habitable-rooms"] in DATA_ANOMALY_MATCHES)
or (self.prepared_epc["floor-height"] in DATA_ANOMALY_MATCHES)
or (self.prepared_epc["number-heated-rooms"] in DATA_ANOMALY_MATCHES)
):
# TODO - this probably shouldn't live here - but we only need to use this for specific properties
# when we meet this condition
property_dimensions = read_dataframe_from_s3_parquet(
bucket_name=DATA_BUCKET,
file_key=f"property_dimensions/{self.prepared_epc['local-authority']}.parquet",
)
self.property_dimensions = self._filter_property_dimensions(
property_dimensions
)
if self.prepared_epc["number-habitable-rooms"] in DATA_ANOMALY_MATCHES:
self.prepared_epc["number-habitable-rooms"] = float(
self.property_dimensions["NUMBER_HABITABLE_ROOMS"].round()
)
else:
self.prepared_epc["number-habitable-rooms"] = float(
self.prepared_epc["number-habitable-rooms"]
)
if self.prepared_epc["number-heated-rooms"] in DATA_ANOMALY_MATCHES:
self.prepared_epc["number-heated-rooms"] = float(
self.property_dimensions["NUMBER_HEATED_ROOMS"].round()
)
else:
self.prepared_epc["number-heated-rooms"] = float(
self.prepared_epc["number-heated-rooms"]
)
self.number_of_floors = estimate_number_of_floors(
self.prepared_epc["property-type"]
)
# if self.prepared_epc["property-type"] == "House":
# self.number_of_floors = 2
# elif self.prepared_epc["property-type"] in ["Flat", "Bungalow"]:
# self.number_of_floors = 1
# elif self.prepared_epc["property-type"] == "Maisonette":
# self.number_of_floors = 2
# else:
# raise NotImplementedError("Implement me")
if (
self.prepared_epc["floor-height"] == ""
or self.prepared_epc["floor-height"] in DATA_ANOMALY_MATCHES
):
self.prepared_epc["floor-height"] = float(
self.property_dimensions["FLOOR_HEIGHT"].round(2)
)
else:
self.prepared_epc["floor-height"] = float(self.prepared_epc["floor-height"])
def _clean_floor_area(self):
"""
This method will clean the floor area, if empty or invalid
"""
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
if self.prepared_epc["total-floor-area"] is None:
return
self.prepared_epc["total-floor-area"] = float(
self.prepared_epc["total-floor-area"]
)
# We handle the edge case of floor area being 0. We set it to zero and it is cleaned by
# _clean_with_data_processor
if self.prepared_epc["total-floor-area"] == 0:
print(
"Edge case of floor area being zero - will set to none and will be cleaned in "
"_clean_with_data_processor"
)
self.prepared_epc["total-floor-area"] = None
def _clean_mains_gas(self):
"""
This method will clean the mains gas, if empty or invalid
"""
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
mains_gas_map = {"Y": True, "N": False, True: True, False: False}
self.prepared_epc["mains-gas-flag"] = (
None
if (
self.prepared_epc["mains-gas-flag"] == ""
or self.prepared_epc["mains-gas-flag"] in DATA_ANOMALY_MATCHES
)
else mains_gas_map[self.prepared_epc["mains-gas-flag"]]
)
def _clean_heat_loss_corridor(self):
"""
This method will clean the heat loss corridor, if empty or invalid
"""
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
valid_values = ["no corridor", "unheated corridor", "heated corridor"]
boolean_map = {
"no corridor": False,
"unheated corridor": True,
"heated corridor": False,
}
self.prepared_epc["heat-loss-corridor"] = (
"no corridor"
if self.prepared_epc["heat-loss-corridor"] in DATA_ANOMALY_MATCHES
else self.prepared_epc["heat-loss-corridor"]
)
if self.prepared_epc["heat-loss-corridor"] not in valid_values:
self.prepared_epc["heat-loss-corridor"] = "no corridor"
self.prepared_epc["unheated-corridor-length"] = (
float(self.prepared_epc["unheated-corridor-length"])
if self.prepared_epc["unheated-corridor-length"] not in ["", None]
else None
)
# We create boolean versions of heat-loss-corridor
self.heat_loss_corridor_bool = boolean_map[
self.prepared_epc["heat-loss-corridor"]
]
def _clean_count_variables(self):
"""
This method will clean the count variables, if empty or invalid
"""
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
fields = [
"number-open-fireplaces",
"extension-count",
"flat-storey-count",
"number-habitable-rooms",
]
null_attributes = ["flat-storey-count", "number-habitable-rooms"]
for attribute in fields:
value = self.prepared_epc[attribute]
if value in DATA_ANOMALY_MATCHES or pd.isnull(value):
if attribute in null_attributes:
value = None
else:
value = 0
else:
value = int(float(value))
self.prepared_epc[attribute] = value
def _clean_wind_turbine(self):
"""
This method will clean the wind turbine, if empty or invalid
"""
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
self.prepared_epc["wind-turbine-count"] = (
int(self.prepared_epc["wind-turbine-count"])
if self.prepared_epc["wind-turbine-count"] not in DATA_ANOMALY_MATCHES
else None
)
def _clean_solar_hot_water(self):
"""
This method will clean the solar hot water, if empty or invalid
"""
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
value_map = {"Y": "Y", "N": "N", "": "N", None: "N"}
boolean_map = {
"Y": True,
"N": False,
}
self.prepared_epc["solar-water-heating-flag"] = value_map[
self.prepared_epc["solar-water-heating-flag"]
]
# Create a boolean version for storage in the database
self.solar_water_heating_flag_bool = boolean_map[
self.prepared_epc["solar-water-heating-flag"]
]
def _clean_solar_pv(self):
"""
This method will clean the solar pv, if empty or invalid
"""
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
self.prepared_epc["photo-supply"] = (
float(self.prepared_epc["photo-supply"])
if (self.prepared_epc["photo-supply"] not in DATA_ANOMALY_MATCHES)
else None
)
def _clean_energy(self):
"""
This method will clean the energy, if empty or invalid
"""
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
self.prepared_epc["energy-consumption-current"] = float(
self.prepared_epc["energy-consumption-current"]
)
self.prepared_epc["co2-emissions-current"] = float(
self.prepared_epc["co2-emissions-current"]
)
def _clean_built_form(self):
"""
This method will clean the build form, if empty or invalid
"""
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
if self.prepared_epc["built-form"] in DATA_ANOMALY_MATCHES:
if self.prepared_epc["property-type"] in ["Flat", "Maisonette"]:
self.prepared_epc["built-form"] = "End-Terrace"
else:
self.prepared_epc["built-form"] = "Semi-Detached"
def _clean_age_band(self):
"""
This method will clean the age band, if empty or invalid
"""
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
self.prepared_epc["construction-age-band"] = (
EPCDataProcessor.clean_construction_age_band(
self.prepared_epc["construction-age-band"]
)
)
if self.prepared_epc["construction-age-band"] in DATA_ANOMALY_MATCHES:
if self.old_data:
# Take the most recent
old_age_bands = [
old_record["lodgement-datetime"]
for old_record in self.old_data
if old_record["construction-age-band"] not in DATA_ANOMALY_MATCHES
]
if old_age_bands:
max_datetime = max(old_age_bands)
most_recent = [
old_record
for old_record in self.old_data
if old_record["lodgement-datetime"] == max_datetime
]
self.prepared_epc["construction-age-band"] = (
EPCDataProcessor.clean_construction_age_band(
most_recent[0]["construction-age-band"]
)
)
self.construction_age_band = self.prepared_epc["construction-age-band"]
self.age_band = england_wales_age_band_lookup.get(self.construction_age_band)
if (self.prepared_epc["transaction-type"] == "new dwelling") and (
self.age_band is None
):
self.age_band = "L"
self.construction_age_band = "England and Wales: 2012 onwards"
self.prepared_epc["construction-age-band"] = self.construction_age_band
if self.age_band is None:
self.age_band = "C"
self.construction_age_band = "England and Wales: 1930-1949"
self.prepared_epc["construction-age-band"] = self.construction_age_band
def _clean_year_built(self):
"""
This method will clean the year built, if empty or invalid
"""
if self.full_sap_epc:
self.year_built = datetime.strptime(
self.full_sap_epc["lodgement-date"], "%Y-%m-%d"
).year
return
if self.construction_age_band not in DATA_ANOMALY_MATCHES:
# Take the lower limit. If we're pessimistic about the age of the property, that at least means we have
# more options for recommendations if that age falls before the year that insulation in walls became
# common practice
band = [
int(x)
for x in re.findall(
r"\b\d{4}\b", self.prepared_epc["construction-age-band"]
)
]
self.year_built = band[0]
return
# We don't know when the property was built
self.year_built = None
def _clean_ventilation(self):
"""
This method will clean the ventilation, if empty or invalid
"""
self.prepared_epc["mechanical-ventilation"] = (
None
if (self.prepared_epc["mechanical-ventilation"] in DATA_ANOMALY_MATCHES)
else (self.prepared_epc["mechanical-ventilation"])
)
def _field_validation(self):
"""
This method will validate each of the fields in the EPC record
"""
for record_key, validation_config in self.validation_configuration.items():
# Get the variable named record key from self
field_value = self.__dict__[record_key]
if validation_config["type"] == "string":
self._validate_string(record_key, field_value, validation_config)
elif validation_config["type"] == "float":
self._validate_float(record_key, field_value, validation_config)
else:
raise ValueError(
f"Validation type {validation_config['type']} not supported"
)
def _validate_string(
self, record_key: str, field_value: Union[str, float], validation_config: dict
):
"""
Validate a string field
"""
if not isinstance(field_value, str):
raise ValueError(
f"Field {record_key} has value {field_value} which is not a string"
)
if "function" in validation_config:
try:
validation_config["function"](field_value)
except:
raise ValueError(
f"Field {record_key} has value {field_value} which does not pass the validation function "
f"{validation_config['function']}"
)
if validation_config["acceptable_values"] is not None:
if field_value not in validation_config["acceptable_values"]:
raise ValueError(
f"Field {record_key} has value {field_value} which is not in the acceptable values of "
f"{validation_config['acceptable_values']}"
)
@staticmethod
def _validate_float(
record_key: str, field_value: Union[str, float], validation_config: dict
):
"""
Validate a float field
"""
if not isinstance(field_value, float):
raise ValueError(
f"Field {record_key} has value {field_value} which is not a float"
)
if "function" in validation_config:
try:
validation_config["function"](field_value)
except:
raise ValueError(
f"Field {record_key} has value {field_value} which does not pass the validation function "
f"{validation_config['function']}"
)
if validation_config["range"] is not None:
if (
field_value < validation_config["range"][0]
or field_value > validation_config["range"][1]
):
raise ValueError(
f"Field {record_key} has value {field_value} which is not in the acceptable range of "
f"{validation_config['range']}"
)
def create_EPCDifferenceRecord(self, other, fixed_data, auto_sort: bool = True):
"""
This method will create the difference record between the two records
"""
if not isinstance(other, EPCRecord):
raise ValueError("Can only subtract EPCRecord from EPCRecord")
difference_record = EPCDifferenceRecord(
record1=self, record2=other, auto_sort=auto_sort
)
difference_record.append_fixed_data(fixed_data)
return difference_record
def __sub__(self, other):
"""
This method will return the difference between two EPC records
"""
if not isinstance(other, EPCRecord):
raise ValueError("Can only subtract EPCRecord from EPCRecord")
print("Deprecated method, use create_EPCDifferenceRecord instead")
difference_record = EPCDifferenceRecord(
record1=self, record2=other, auto_sort=True
)
return difference_record
def __gt__(self, other):
"""
This method will return True if the EPC record is greater than or equal to the other
"""
if not isinstance(other, EPCRecord):
raise ValueError("Can only compare EPCRecord to EPCRecord")
return self.__dict__[RDSAP_RESPONSE] > other.__dict__[RDSAP_RESPONSE]
def __ge__(self, other):
"""
This method will return True if the EPC record is greater than or equal to the other
"""
if not isinstance(other, EPCRecord):
raise ValueError("Can only compare EPCRecord to EPCRecord")
return self.__dict__[RDSAP_RESPONSE] >= other.__dict__[RDSAP_RESPONSE]
def __lt__(self, other):
"""
This method will return True if the EPC record is greater than or equal to the other
"""
if not isinstance(other, EPCRecord):
raise ValueError("Can only compare EPCRecord to EPCRecord")
return self.__dict__[RDSAP_RESPONSE] < other.__dict__[RDSAP_RESPONSE]
def __le__(self, other):
"""
This method will return True if the EPC record is greater than or equal to the other
"""
if not isinstance(other, EPCRecord):
raise ValueError("Can only compare EPCRecord to EPCRecord")
return self.__dict__[RDSAP_RESPONSE] <= other.__dict__[RDSAP_RESPONSE]
def get(
self,
key: Union[str, List[str]],
return_asdict: bool = False,
key_suffix: str | None = None,
) -> Any:
"""
This method will return the value of the key
"""
if return_asdict:
output_dict = {
x: self.__dict__[x] if x in self.__dict__.keys() else None for x in key
}
if key_suffix is not None:
output_dict = {f"{x}{key_suffix}": y for x, y in output_dict.items()}
return output_dict
if isinstance(key, list):
return [
self.__dict__[x] if x in self.__dict__.keys() else None for x in key
]
elif isinstance(key, str):
return self.__dict__[key] if key in self.__dict__.keys() else None
class EPCDifferenceRecord:
"""
Base class for the difference between two EPC records
"""
def __init__(self, record1: EPCRecord, record2: EPCRecord, auto_sort: bool = False):
"""
This method will initialise the EPCDifferenceRecord
Defaults usage is with record2 to have the higher RDSAP score
"""
self.record1 = record1
self.record2 = record2
self.earliest_record = (
record1 if record1.lodgement_date < record2.lodgement_date else record2
)
self.flag_fabric_consistency = False
self.difference_record = {}
self.difference_validation_configuration = (
EPCDifferenceRecordValidationConfiguration
)
self.fixed_data_validation_configuration = (
EPCDifferenceRecordFixedDataValidationConfiguration
)
if auto_sort and (self.record2 <= self.record1):
self.record1, self.record2 = self.record2, self.record1
self._construct_difference_record()
self._validate_difference_record()
# self._detect_fabric_consistency()
def _construct_difference_record(self):
"""
This method will construct the difference record between the two records
"""
rdsap_change = self.record2.get(RDSAP_RESPONSE) - self.record1.get(
RDSAP_RESPONSE
)
heat_demand_change = self.record2.get(HEAT_DEMAND_RESPONSE) - self.record1.get(
HEAT_DEMAND_RESPONSE
)
carbon_change = self.record2.get(CARBON_RESPONSE) - self.record1.get(
CARBON_RESPONSE
)
component_variables = (
COMPONENT_FEATURES
+ EFFICIENCY_FEATURES
+ ROOM_FEATURES
+ POST_SAP10_FEATURE
)
ending_record = self.record2.get(
component_variables + ["lodgement_date"],
return_asdict=True,
key_suffix="_ending",
)
starting_record = self.record1.get(
component_variables + ["lodgement_date"],
return_asdict=True,
key_suffix="_starting",
)
self.difference_record = {
"uprn": self.record1.get("uprn"),
"rdsap_change": rdsap_change,
"heat_demand_change": heat_demand_change,
"carbon_change": carbon_change,
"sap_starting": self.record1.get(RDSAP_RESPONSE),
"sap_ending": self.record2.get(RDSAP_RESPONSE),
"heat_demand_starting": self.record1.get(HEAT_DEMAND_RESPONSE),
"heat_demand_ending": self.record2.get(HEAT_DEMAND_RESPONSE),
"carbon_starting": self.record1.get(CARBON_RESPONSE),
"carbon_ending": self.record2.get(CARBON_RESPONSE),
"lighting_cost_starting": self.record1.get("lighting_cost_current"),
"lighting_cost_ending": self.record2.get("lighting_cost_current"),
"heating_cost_starting": self.record1.get("heating_cost_current"),
"heating_cost_ending": self.record2.get("heating_cost_current"),
"hot_water_cost_starting": self.record1.get("hot_water_cost_current"),
"hot_water_cost_ending": self.record2.get("hot_water_cost_current"),
"potential_energy_efficiency": self.earliest_record.get(
"potential_energy_efficiency"
),
"environment_impact_potential": self.earliest_record.get(
"environment_impact_potential"
),
"energy_consumption_potential": self.earliest_record.get(
"energy_consumption_potential"
),
"co2_emissions_potential": self.earliest_record.get(
"co2_emissions_potential"
),
**ending_record,
**starting_record,
}
def _validate_difference_record(self):
"""
This method will validate the difference record
"""
# for key, value in self.difference_record.items():
# if key == "LODGEMENT_DATE":
# continue
# if isinstance(value, str):
# continue
# if value < 0:
# raise ValueError(f"Difference record has negative value for {key}")
pass
def compare_fields_in_records(self, fields: List[str]):
"""
This method will compare the records, for specific fields
"""
all_equal = True
for field in fields:
if self.record1.get(field) != self.record2.get(field):
return False
if all_equal:
return True
def get(self, key: str):
"""
This method will return the value of the key
"""
return (
self.difference_record[key]
if key in self.difference_record.keys()
else None
)
def append_fixed_data(self, fixed_data: dict):
"""
This method will append fixed data to the difference record
"""
self._validate_fixed_data(fixed_data)
self.difference_record.update(fixed_data)
def _validate_fixed_data(self, fixed_data: dict):
"""
This method will validate the fixed data
"""
# Can have more sophisticated checks here
# self.fixed_data_validataion_configuration
pass
def ensure_adequate_data(self) -> bool:
"""
This method will ensure that the difference record has adequate data, to keep record, even if rdsap change is
zero
Can move into the initiation of the difference record
"""
wall_check = self.record1.walls_description == self.record2.walls_description
floor_check = self.record1.floor_description == self.record2.floor_description
roof_check = self.record1.roof_description == self.record2.roof_description
mainheat_check = (
self.record1.mainheat_description == self.record2.mainheat_description
)
windows_check = (
self.record1.windows_description == self.record2.windows_description
)
solar_water_heating_flag_check = (
self.record1.solar_water_heating_flag
== self.record2.solar_water_heating_flag
)
solar_pv_check = self.record1.photo_supply == self.record2.photo_supply
heating_control_check = (
self.record1.mainheatcont_description
== self.record2.mainheatcont_description
)
extension_count_check = (
self.record1.extension_count == self.record2.extension_count
)
floor_height_check = (
abs(1 - (self.record1.floor_height / self.record2.floor_height)) < 0.05
)
total_floor_area_check = (
abs(1 - (self.record1.total_floor_area / self.record2.total_floor_area))
< 0.05
)
if all(
[
wall_check,
floor_check,
roof_check,
mainheat_check,
windows_check,
solar_water_heating_flag_check,
extension_count_check,
floor_height_check,
total_floor_area_check,
solar_pv_check,
heating_control_check,
]
):
return True
else:
return False