Model/etl/epc/Record.py

887 lines
36 KiB
Python

from datetime import datetime
from dataclasses import dataclass
from etl.epc.ValidationConfiguration import (
EPCRecordValidationConfiguration,
EPCDifferenceRecordValidationConfiguration,
EPCDifferenceRecordFixedDataValidationConfiguration
)
from etl.epc.DataProcessor import EPCDataProcessor
from recommendations.rdsap_tables import england_wales_age_band_lookup, FLOOR_LEVEL_MAP
from etl.epc.settings import DATA_ANOMALY_MATCHES, BUILT_FORM_REMAP
import re
import os
import numpy as np
import pandas as pd
from typing import Any, Union, List
from etl.epc.settings import (
RDSAP_RESPONSE,
HEAT_DEMAND_RESPONSE,
CARBON_RESPONSE,
COMPONENT_FEATURES,
EFFICIENCY_FEATURES
)
from utils.s3 import read_dataframe_from_s3_parquet
from etl.epc.settings import EARLIEST_EPC_DATE
# TODO: Change these in the settings file
RDSAP_RESPONSE = RDSAP_RESPONSE.lower()
HEAT_DEMAND_RESPONSE = HEAT_DEMAND_RESPONSE.lower()
CARBON_RESPONSE = CARBON_RESPONSE.lower()
COMPONENT_FEATURES = [x.lower() for x in COMPONENT_FEATURES]
EFFICIENCY_FEATURES = [x.lower() for x in EFFICIENCY_FEATURES]
ENVIRONMENT = os.environ.get('ENVIRONMENT', 'dev')
DATA_BUCKET = os.environ.get('DATA_BUCKET', 'retrofit-data-dev' if ENVIRONMENT == 'dev' else None)
@dataclass
class EPCRecord:
"""
Base class for a EPC record
"""
uprn: int = None
walls_description: str = None
floor_description: str = None
lighting_description: str = None
roof_description: str = None
mainheat_description: str = None
hotwater_description: str = None
main_fuel: str = None
mechanical_ventilation: str = None
secondheat_description: str = None
windows_description: str = None
glazed_type: str = None
multi_glaze_proportion: float = None
low_energy_lighting: float = None
number_open_fireplaces: float = None
mainheatcont_description: str = None
solar_water_heating_flag: str = None
photo_supply: float = None
transaction_type: str = None
energy_tariff: str = None
extension_count: float = None
total_floor_area: float = None
floor_height: float = None
hot_water_energy_eff: str = None
floor_energy_eff: str = None
windows_energy_eff: str = None
walls_energy_eff: str = None
sheating_energy_eff: str = None
roof_energy_eff: str = None
mainheat_energy_eff: str = None
mainheatc_energy_eff: str = None
lighting_energy_eff: str = None
potential_energy_efficiency: float = None
environment_impact_potential: float = None
energy_consumption_potential: float = None
co2_emissions_potential: float = None
lodgement_date: str = None
current_energy_efficiency: int = None
energy_consumption_current: int = None
co2_emissions_current: float = None
# u_values_walls = None
# u_values_roof = None
# u_values_floor = None
run_mode: str = "training"
# TODO: Make this a class so thet api_records is structured
epc_records: dict = None
full_sap_epc: dict = None
old_data: list[dict] = None
original_epc: dict = None
prepared_epc: dict = None
prepared_epc_delta_metadata: pd.DataFrame = None
cleaning_data: pd.DataFrame = None
# Not used in training mod but used in newdata mode
age_band: str = None
construction_age_band: str = None
year_built: int = None
number_of_floors: int = None
number_of_open_fireplaces: int = None
def __post_init__(self):
# We can have validation and cleaning steps for each of the fields
# self.WALLS_DESCRIPTION = 'check'
# Could also have cleaning of records if needed
if self.run_mode == "training":
self.validation_configuration = EPCRecordValidationConfiguration
# self._field_validation()
return
# We are running in newdata mode
if self.epc_records is None:
raise ValueError("Must provide epc records if running in newdata mode")
self.prepared_epc = self.epc_records['original_epc']
self.original_epc = self.epc_records['original_epc'].copy()
self.full_sap_epc = self.epc_records['full_sap_epc']
self.old_data = self.epc_records['old_data']
if self.cleaning_data is None:
raise ValueError("Must provide cleaning data if running in newdata mode")
self._clean_records_using_epc_records()
self._clean_with_data_processor()
self._temp_uprn_catch()
self._expand_prepared_epc_to_attributes()
self._identify_delta_between_prepared_and_original_records()
# Process to create uvalues for the single epc record
# selff.df = self.epc_record_as_dataframe('prepared_epc')
# self._feature_generation()
# self._drop_features()
return
self._expand_description_to_features()
self._expand_description_to_uvalues()
# self._generate_uvalues()
# self._validate_expanded_description()
# self._validate_u_values()
# etc
pass
def _drop_features(self):
"""
Drop features that are not needed for modelling
"""
self.df = self.df.drop(columns=["lodgement_date_starting", "lodgement_date_ending"])
def _feature_generation(self):
"""
Generate features for modelling
"""
self.df["days_to_lodgement_date"] = self._calculate_days_to(self.prepared_epc["lodgement_date"])
@staticmethod
def _calculate_days_to(lodgement_date):
if isinstance(lodgement_date, str):
return (
pd.to_datetime(lodgement_date) - pd.to_datetime(EARLIEST_EPC_DATE)
).days
return (
pd.to_datetime(lodgement_date) - pd.to_datetime(EARLIEST_EPC_DATE)
).dt.days
def _temp_uprn_catch(self):
"""
Catch the case we do now have uprn
"""
if self.prepared_epc["uprn"] == "":
self.prepared_epc["uprn"] = 0
def _clean_with_data_processor(self):
"""
This method will clean the records using the data processor
"""
epc_data_processor = EPCDataProcessor(
data=self.epc_record_as_dataframe("prepared_epc"),
run_mode="newdata",
cleaning_averages=self.cleaning_data
)
epc_data_processor.prepare_data()
self.prepared_epc = epc_data_processor.data.to_dict(orient="records")[0]
def _expand_prepared_epc_to_attributes(self):
"""
This method will expand the prepared epc to attributes
"""
# for key, value in self.prepared_epc.items():
# setattr(self, key, value)
self.uprn: int = int(self.prepared_epc["uprn"])
self.walls_description: str = self.prepared_epc["walls_description"]
self.floor_description: str = self.prepared_epc["floor_description"]
self.lighting_description: str = self.prepared_epc["lighting_description"]
self.roof_description: str = self.prepared_epc["roof_description"]
self.mainheat_description: str = self.prepared_epc["mainheat_description"]
self.hotwater_description: str = self.prepared_epc["hotwater_description"]
self.main_fuel: str = self.prepared_epc["main_fuel"]
self.mechanical_ventilation: str = self.prepared_epc["mechanical_ventilation"]
self.secondheat_description: str = self.prepared_epc["secondheat_description"]
self.windows_description: str = self.prepared_epc["windows_description"]
self.glazed_type: str = self.prepared_epc["glazed_type"]
self.multi_glaze_proportion: float = float(self.prepared_epc["multi_glaze_proportion"])
self.low_energy_lighting: float = float(self.prepared_epc["low_energy_lighting"])
self.number_open_fireplaces: float = float(self.prepared_epc["number_open_fireplaces"])
self.mainheatcont_description: str = self.prepared_epc["mainheatcont_description"]
self.solar_water_heating_flag: str = self.prepared_epc["solar_water_heating_flag"]
self.photo_supply: float = float(self.prepared_epc["photo_supply"])
self.transaction_type: str = self.prepared_epc["transaction_type"]
self.energy_tariff: str = self.prepared_epc["energy_tariff"]
self.extension_count: float = float(self.prepared_epc["extension_count"])
self.total_floor_area: float = float(self.prepared_epc["total_floor_area"])
self.floor_height: float = float(self.prepared_epc["floor_height"])
self.hot_water_energy_eff: str = self.prepared_epc["hot_water_energy_eff"]
self.floor_energy_eff: str = self.prepared_epc["floor_energy_eff"]
self.windows_energy_eff: str = self.prepared_epc["windows_energy_eff"]
self.walls_energy_eff: str = self.prepared_epc["walls_energy_eff"]
self.sheating_energy_eff: str = self.prepared_epc["sheating_energy_eff"]
self.roof_energy_eff: str = self.prepared_epc["roof_energy_eff"]
self.mainheat_energy_eff: str = self.prepared_epc["mainheat_energy_eff"]
self.mainheatc_energy_eff: str = self.prepared_epc["mainheatc_energy_eff"]
self.lighting_energy_eff: str = self.prepared_epc["lighting_energy_eff"]
self.potential_energy_efficiency: float = float(self.prepared_epc["potential_energy_efficiency"])
self.environment_impact_potential: float = float(self.prepared_epc["environment_impact_potential"])
self.energy_consumption_potential: float = float(self.prepared_epc["energy_consumption_potential"])
self.co2_emissions_potential: float = float(self.prepared_epc["co2_emissions_potential"])
self.lodgement_date: str = self.prepared_epc["lodgement_date"]
self.current_energy_efficiency: int = int(self.prepared_epc["current_energy_efficiency"])
self.energy_consumption_current: int = int(self.prepared_epc["energy_consumption_current"])
self.co2_emissions_current: float = float(self.prepared_epc["co2_emissions_current"])
def _identify_delta_between_prepared_and_original_records(self):
"""
This method will identify the delta between the prepared and original records
"""
prepared_epc_df = self.epc_record_as_dataframe("prepared_epc")
original_epc_df = self.epc_record_as_dataframe("original_epc")
df = pd.concat([prepared_epc_df, original_epc_df], keys=["prepared_epc", "original_epc"], axis=0)
same_index = df.apply(pd.Series.duplicated).any()
self.prepared_epc_delta_metadata = df[same_index[~same_index].index]
def _expand_description_to_features(self):
pass
def _expand_description_to_uvalues(self):
# TODO: can be loop over all the descriptions, or done in one
pass
# def _process_and_prune(self, cleaned_lookup: dict):
# """
# This method will merge on the cleaned lookup table and ensure that the building fabric in the
# starting and ending EPC is consistent, so ensure that we are performing our modelling on the cleanest
# possible dataset.
# """
# for component in ["walls", "floor", "roof", "hotwater", "mainheat", "mainheatcont", "windows", "main-fuel"]:
# if component == "main-fuel":
# component = component.replace("-", "_")
# cleaned_key = "main-fuel" if component == "main-fuel" else f"{component}-description"
# left_on_starting = (
# f"{component}_starting" if component == "main-fuel" else f"{component}_description_starting"
# )
# left_on_ending = (
# f"{component}_ending" if component == "main-fuel" else f"{component}_description_ending"
# )
# self.df2 = self.df.merge(
# pd.DataFrame(cleaned_lookup[cleaned_key]),
# how="left",
# left_on=left_on_starting,
# right_on="original_description",
# ).merge(
# pd.DataFrame(cleaned_lookup[cleaned_key]),
# how="left",
# left_on=left_on_ending,
# right_on="original_description",
# suffixes=("", "_ending")
# )
def _clean_records_using_epc_records(self):
"""
This method will clean the records
"""
# TODO: Move all the cleaning steps in the Property class into there
self._clean_built_form()
self._clean_energy()
self._clean_ventilation()
self._clean_solar_pv()
self._clean_solar_hot_water()
self._clean_wind_turbine()
self._clean_count_variables()
self._clean_heat_loss_corridor()
self._clean_mains_gas()
self._clean_age_band()
self._clean_year_built()
self._clean_floor_area()
self._clean_property_dimensions()
self._clean_number_lighting_outlets()
self._clean_floor_level()
# self._clean_potential_energy_efficiency()
# self._clean_environment_impact_potential()
# self._clean_energy_consumption_potential()
# self._clean_co2_emissions_potential()
# self._clean_current_energy_efficiency()
# self._clean_energy_consumption_current()
# self._clean_co2_emissions_current()
def epc_record_as_dataframe(self, epc_type: str = "prepared_epc", use_upper_columns: bool = True,
replace_empty_string: bool = False):
"""
This method will return the dataframe representation of the epc record
"""
df = pd.DataFrame.from_dict(self.get(epc_type), orient="index").T
if use_upper_columns:
df.columns = [x.upper().replace("-", "_") for x in df.columns]
if replace_empty_string:
df = df.replace("", np.nan)
return df
def _clean_floor_level(self):
"""
This method will clean the floor level, if empty or invalid
"""
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
self.prepared_epc["floor-level"] = (
FLOOR_LEVEL_MAP[self.prepared_epc["floor-level"]] if
self.prepared_epc["floor-level"] not in list(DATA_ANOMALY_MATCHES) + ["", None] else None
)
def _clean_number_lighting_outlets(self):
"""
This method will clean the number of lighting outlets, if empty or invalid
"""
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
if self.prepared_epc["fixed-lighting-outlets-count"] == "":
# We check old EPCs and the full SAP EPC
lighting_data = []
if len(self.old_data):
lighting_data.extend([
int(old_record["fixed-lighting-outlets-count"]) for old_record in self.old_data if
old_record["fixed-lighting-outlets-count"] != ""
])
if len(self.full_sap_epc):
if self.full_sap_epc["fixed-lighting-outlets-count"] != "":
lighting_data.append(int(self.full_sap_epc["fixed-lighting-outlets-count"]))
if lighting_data:
self.prepared_epc["fixed-lighting-outlets-count"] = round(np.median(lighting_data))
else:
# Use averages from the cleaning dataset, based on the property type, built form, construction age
# band and local authority
cleaning_data = self.cleaning_data.copy()
# When running in new-data more, the columns will have been coerced to lower case so we push them
# back to upper case
if self.run_mode == "newdata":
cleaning_data.columns = [x.upper() for x in cleaning_data.columns]
cleaned_property_data = EPCDataProcessor.apply_averages_cleaning(
data_to_clean=self.epc_record_as_dataframe("prepared_epc", replace_empty_string=True),
cleaning_data=cleaning_data,
cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'],
)
self.prepared_epc["fixed-lighting-outlets-count"] = round(
cleaned_property_data["FIXED_LIGHTING_OUTLETS_COUNT"].values[0]
)
else:
self.prepared_epc["fixed-lighting-outlets-count"] = float(self.prepared_epc["fixed-lighting-outlets-count"])
def _filter_property_dimensions(self, property_dimensions):
"""
Will filter the property dimensions dataframe to only include the relevant rows for the property
:param property_dimensions:
:return: filtered property dimensions dataframe
"""
result = property_dimensions[(property_dimensions["PROPERTY_TYPE"] == self.prepared_epc["property-type"])]
if self.construction_age_band is not None and self.construction_age_band not in DATA_ANOMALY_MATCHES:
result = result[(result["CONSTRUCTION_AGE_BAND"] == self.construction_age_band)]
if self.prepared_epc["built-form"] not in DATA_ANOMALY_MATCHES and self.prepared_epc["built-form"] in result[
"BUILT_FORM"]:
result = result[(result["BUILT_FORM"] == self.prepared_epc["built-form"])]
return result[["NUMBER_HABITABLE_ROOMS", "TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]].mean()
def _clean_property_dimensions(self):
"""
Cleans up the number of floors, number of habitable rooms, and the floor height
"""
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
if not self.prepared_epc["number-habitable-rooms"] or (
self.prepared_epc["floor-height"] == "" or self.prepared_epc["floor-height"] in DATA_ANOMALY_MATCHES
):
property_dimensions = read_dataframe_from_s3_parquet(
bucket_name=DATA_BUCKET, file_key=f"property_dimensions/{self.prepared_epc['local-authority']}.parquet"
)
self.property_dimensions = self._filter_property_dimensions(property_dimensions)
if not self.prepared_epc["number-habitable-rooms"]:
self.prepared_epc["number-habitable-rooms"] = float(
self.property_dimensions["NUMBER_HABITABLE_ROOMS"].round())
else:
self.prepared_epc["number-habitable-rooms"] = float(self.prepared_epc["number-habitable-rooms"])
if self.prepared_epc["property-type"] == "House":
self.number_of_floors = 2
elif self.prepared_epc["property-type"] in ["Flat", "Bungalow"]:
self.number_of_floors = 1
elif self.prepared_epc["property-type"] == "Maisonette":
self.number_of_floors = 2
else:
raise NotImplementedError("Implement me")
if self.prepared_epc["floor-height"] == "" or self.prepared_epc["floor-height"] in DATA_ANOMALY_MATCHES:
self.prepared_epc["floor-height"] = float(self.property_dimensions["FLOOR_HEIGHT"].round(2))
else:
self.prepared_epc["floor-height"] = float(self.prepared_epc["floor-height"])
def _clean_floor_area(self):
"""
This method will clean the floor area, if empty or invalid
"""
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
self.prepared_epc["total-floor-area"] = float(self.prepared_epc["total-floor-area"])
def _clean_mains_gas(self):
"""
This method will clean the mains gas, if empty or invalid
"""
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
mains_gas_map = {
"Y": True,
"N": False,
}
self.prepared_epc["mains-gas-flag"] = None if (
self.prepared_epc["mains-gas-flag"] == "" or self.prepared_epc["mains-gas-flag"] in DATA_ANOMALY_MATCHES
) else mains_gas_map[self.prepared_epc["mains-gas-flag"]]
def _clean_heat_loss_corridor(self):
"""
This method will clean the heat loss corridor, if empty or invalid
"""
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
valid_values = [
"no corridor",
"unheated corridor",
"heated corridor"
]
self.prepared_epc["heat-loss-corridor"] = (
"no corridor" if self.prepared_epc["heat-loss-corridor"] in DATA_ANOMALY_MATCHES else
self.prepared_epc["heat-loss-corridor"]
)
if self.prepared_epc["heat-loss-corridor"] not in valid_values:
self.prepared_epc["heat-loss-corridor"] = "no corridor"
self.prepared_epc["unheated-corridor-length"] = (
float(self.prepared_epc["unheated-corridor-length"]) if
self.prepared_epc["unheated-corridor-length"] not in ["", None] else None
)
def _clean_count_variables(self):
"""
This method will clean the count variables, if empty or invalid
"""
if not self.prepared_epc:
raise ValueError("EPC Record doesn not contain epc data")
fields = [
"number-open-fireplaces",
"extension-count",
"flat-storey-count",
"number-habitable-rooms"
]
null_attributes = ["flat-storey-count", "number-habitable-rooms"]
for attribute in fields:
value = self.prepared_epc[attribute]
if value in ["", None] or value in DATA_ANOMALY_MATCHES:
if attribute in null_attributes:
value = None
else:
value = 0
else:
value = int(value)
self.prepared_epc[attribute] = value
def _clean_wind_turbine(self):
"""
This method will clean the wind turbine, if empty or invalid
"""
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
self.prepared_epc['wind-turbine-count'] = int(
self.prepared_epc['wind-turbine-count']
) if self.prepared_epc['wind-turbine-count'] not in ["", None] else None
def _clean_solar_hot_water(self):
"""
This method will clean the solar hot water, if empty or invalid
"""
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
value_map = {
"Y": "Y",
"N": "N",
"": "N",
None: "N"
}
self.prepared_epc['solar-water-heating-flag'] = value_map[self.prepared_epc['solar-water-heating-flag']]
def _clean_solar_pv(self):
"""
This method will clean the solar pv, if empty or invalid
"""
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
self.prepared_epc['photo-supply'] = float(self.prepared_epc['photo-supply']) if self.prepared_epc[
'photo-supply'] != "" \
else None
def _clean_energy(self):
"""
This method will clean the energy, if empty or invalid
"""
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
self.prepared_epc['energy-consumption-current'] = float(self.prepared_epc["energy-consumption-current"])
self.prepared_epc['co2-emissions-current'] = float(self.prepared_epc["co2-emissions-current"])
def _clean_built_form(self):
"""
This method will clean the build form, if empty or invalid
"""
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
self.prepared_epc['built-form'] = BUILT_FORM_REMAP.get(
self.prepared_epc["built-form"], self.prepared_epc["built-form"]
)
if self.prepared_epc["built-form"] in DATA_ANOMALY_MATCHES:
if self.prepared_epc["property-type"] in ["Flat", "Maisonette"]:
self.prepared_epc["built-form"] = "End-Terrace"
def _clean_age_band(self):
"""
This method will clean the age band, if empty or invalid
"""
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
self.construction_age_band = EPCDataProcessor.clean_construction_age_band(
self.prepared_epc["construction-age-band"])
if self.construction_age_band in DATA_ANOMALY_MATCHES:
if self.old_data:
# Take the most recent
max_datetime = max(
[old_record["lodgement-datetime"] for old_record in self.old_data if
old_record["construction-age-band"] not in DATA_ANOMALY_MATCHES]
)
most_recent = [old_record for old_record in self.old_data if
old_record["lodgement-datetime"] == max_datetime]
self.construction_age_band = EPCDataProcessor.clean_construction_age_band(
most_recent[0]["construction-age-band"]
)
self.age_band = england_wales_age_band_lookup.get(self.construction_age_band)
if (self.prepared_epc["transaction-type"] == "new dwelling") and (self.age_band is None):
self.age_band = "L"
self.construction_age_band = 'England and Wales: 2012 onwards'
if self.age_band is None:
raise ValueError("age_band is missing")
def _clean_year_built(self):
"""
This method will clean the year built, if empty or invalid
"""
if self.full_sap_epc:
self.year_built = datetime.strptime(self.full_sap_epc["lodgement-date"], '%Y-%m-%d').year
return
if self.construction_age_band not in DATA_ANOMALY_MATCHES:
# Take the lower limit. If we're pessimistic about the age of the property, that at least means we have
# more options for recommendations if that age falls before the year that insulation in walls became
# common practice
band = [int(x) for x in re.findall(r'\b\d{4}\b', self.prepared_epc["construction-age-band"])]
self.year_built = band[0]
return
# We don't know when the property was built
self.year_built = None
def _clean_ventilation(self):
"""
This method will clean the ventilation, if empty or invalid
"""
self.prepared_epc['mechanical-ventilation'] = None if (
(self.prepared_epc['mechanical-ventilation'] == "") or
(self.prepared_epc['mechanical-ventilation'] in DATA_ANOMALY_MATCHES)
) else (
self.prepared_epc['mechanical-ventilation']
)
def _field_validation(self):
"""
This method will validate each of the fields in the EPC record
"""
for record_key, validation_config in self.validation_configuration.items():
# Get the variable named record key from self
field_value = self.__dict__[record_key]
if validation_config['type'] == "string":
self._validate_string(record_key, field_value, validation_config)
elif validation_config['type'] == "float":
self._validate_float(record_key, field_value, validation_config)
else:
raise ValueError(f"Validation type {validation_config['type']} not supported")
def _validate_string(self, record_key: str, field_value: Union[str, float], validation_config: dict):
"""
Validate a string field
"""
if not isinstance(field_value, str):
raise ValueError(f"Field {record_key} has value {field_value} which is not a string")
if 'function' in validation_config:
try:
validation_config['function'](field_value)
except:
raise ValueError(
f"Field {record_key} has value {field_value} which does not pass the validation function "
f"{validation_config['function']}")
if validation_config['acceptable_values'] is not None:
if field_value not in validation_config['acceptable_values']:
raise ValueError(
f"Field {record_key} has value {field_value} which is not in the acceptable values of "
f"{validation_config['acceptable_values']}")
def _validate_float(self, record_key: str, field_value: Union[str, float], validation_config: dict):
"""
Validate a float field
"""
if not isinstance(field_value, float):
raise ValueError(f"Field {record_key} has value {field_value} which is not a float")
if 'function' in validation_config:
try:
validation_config['function'](field_value)
except:
raise ValueError(
f"Field {record_key} has value {field_value} which does not pass the validation function "
f"{validation_config['function']}")
if validation_config['range'] is not None:
if field_value < validation_config['range'][0] or field_value > validation_config['range'][1]:
raise ValueError(
f"Field {record_key} has value {field_value} which is not in the acceptable range of "
f"{validation_config['range']}")
def __sub__(self, other):
"""
This method will return the difference between two EPC records
"""
if not isinstance(other, EPCRecord):
raise ValueError("Can only subtract EPCRecord from EPCRecord")
difference_record = EPCDifferenceRecord(record1=self, record2=other, auto_sort=True)
return difference_record
def __gt__(self, other):
"""
This method will return True if the EPC record is greater than or equal to the other
"""
if not isinstance(other, EPCRecord):
raise ValueError("Can only compare EPCRecord to EPCRecord")
return self.__dict__[RDSAP_RESPONSE] > other.__dict__[RDSAP_RESPONSE]
def __ge__(self, other):
"""
This method will return True if the EPC record is greater than or equal to the other
"""
if not isinstance(other, EPCRecord):
raise ValueError("Can only compare EPCRecord to EPCRecord")
return self.__dict__[RDSAP_RESPONSE] >= other.__dict__[RDSAP_RESPONSE]
def __lt__(self, other):
"""
This method will return True if the EPC record is greater than or equal to the other
"""
if not isinstance(other, EPCRecord):
raise ValueError("Can only compare EPCRecord to EPCRecord")
return self.__dict__[RDSAP_RESPONSE] < other.__dict__[RDSAP_RESPONSE]
def __le__(self, other):
"""
This method will return True if the EPC record is greater than or equal to the other
"""
if not isinstance(other, EPCRecord):
raise ValueError("Can only compare EPCRecord to EPCRecord")
return self.__dict__[RDSAP_RESPONSE] <= other.__dict__[RDSAP_RESPONSE]
def get(self, key: Union[str, List[str]], return_asdict: bool = False, key_suffix: str | None = None) -> Any:
"""
This method will return the value of the key
"""
if return_asdict:
output_dict = {x: self.__dict__[x] if x in self.__dict__.keys() else None for x in key}
if key_suffix is not None:
output_dict = {f"{x}{key_suffix}": y for x, y in output_dict.items()}
return output_dict
if isinstance(key, list):
return [self.__dict__[x] if x in self.__dict__.keys() else None for x in key]
elif isinstance(key, str):
return self.__dict__[key] if key in self.__dict__.keys() else None
class EPCDifferenceRecord:
"""
Base class for the difference between two EPC records
"""
def __init__(self, record1: EPCRecord, record2: EPCRecord, auto_sort: bool = False):
"""
This method will initialise the EPCDifferenceRecord
Defaults usage is with record2 to have the higher RDSAP score
"""
self.record1 = record1
self.record2 = record2
self.earliest_record = record1 if record1.lodgement_date < record2.lodgement_date else record2
self.flag_fabric_consistency = False
self.difference_record = {}
self.difference_validation_configuration = EPCDifferenceRecordValidationConfiguration
self.fixed_data_validation_configuration = EPCDifferenceRecordFixedDataValidationConfiguration
if auto_sort and (self.record2 <= self.record1):
self.record1, self.record2 = self.record2, self.record1
self._construct_difference_record()
self._validate_difference_record()
# self._detect_fabric_consistency()
def _construct_difference_record(self):
"""
This method will construct the difference record between the two records
"""
rdsap_change = self.record2.get(RDSAP_RESPONSE) - self.record1.get(RDSAP_RESPONSE)
heat_demand_change = self.record2.get(HEAT_DEMAND_RESPONSE) - self.record1.get(HEAT_DEMAND_RESPONSE)
carbon_change = self.record2.get(CARBON_RESPONSE) - self.record1.get(CARBON_RESPONSE)
component_variables = COMPONENT_FEATURES + EFFICIENCY_FEATURES
ending_record = self.record2.get(component_variables + ["lodgement_date"], return_asdict=True,
key_suffix="_ending")
starting_record = self.record1.get(component_variables + ["lodgement_date"], return_asdict=True,
key_suffix="_starting")
self.difference_record = {
"uprn": self.record1.get("uprn"),
"rdsap_change": rdsap_change,
"heat_demand_change": heat_demand_change,
"carbon_change": carbon_change,
"sap_starting": self.record1.get(RDSAP_RESPONSE),
"sap_ending": self.record2.get(RDSAP_RESPONSE),
"heat_demand_starting": self.record1.get(HEAT_DEMAND_RESPONSE),
"heat_demand_ending": self.record2.get(HEAT_DEMAND_RESPONSE),
"carbon_starting": self.record1.get(CARBON_RESPONSE),
"carbon_ending": self.record2.get(CARBON_RESPONSE),
"potential_energy_efficiency": self.earliest_record.get("potential_energy_efficiency"),
"environment_impact_potential": self.earliest_record.get("environment_impact_potential"),
"energy_consumption_potential": self.earliest_record.get("energy_consumption_potential"),
"co2_emissions_potential": self.earliest_record.get("co2_emissions_potential"),
**ending_record,
**starting_record
}
def _validate_difference_record(self):
"""
This method will validate the difference record
"""
# for key, value in self.difference_record.items():
# if key == "LODGEMENT_DATE":
# continue
# if isinstance(value, str):
# continue
# if value < 0:
# raise ValueError(f"Difference record has negative value for {key}")
pass
def compare_fields_in_records(self, fields: List[str]):
"""
This method will compare the records, for specific fields
"""
all_equal = True
for field in fields:
if self.record1.get(field) != self.record2.get(field):
return False
if all_equal:
return True
def get(self, key: str):
"""
This method will return the value of the key
"""
return self.difference_record[key] if key in self.difference_record.keys() else None
def append_fixed_data(self, fixed_data: dict):
"""
This method will append fixed data to the difference record
"""
self._validate_fixed_data(fixed_data)
self.difference_record.update(fixed_data)
def _validate_fixed_data(self, fixed_data: dict):
"""
This method will validate the fixed data
"""
# Can have more sophisticated checks here
# self.fixed_data_validataion_configuration
pass