mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
1529 lines
54 KiB
Python
1529 lines
54 KiB
Python
import warnings
|
|
from typing import Optional, get_origin, get_args, TypedDict, cast, TypeAlias, Literal, Callable
|
|
from backend.addresses.Address import Address
|
|
from dataclasses import fields
|
|
from datetime import datetime
|
|
from dataclasses import dataclass
|
|
from etl.epc.ValidationConfiguration import (
|
|
EPCRecordValidationConfiguration,
|
|
EPCDifferenceRecordValidationConfiguration,
|
|
EPCDifferenceRecordFixedDataValidationConfiguration,
|
|
)
|
|
from etl.epc.DataProcessor import EPCDataProcessor
|
|
from recommendations.rdsap_tables import england_wales_age_band_lookup, FLOOR_LEVEL_MAP
|
|
from etl.epc.settings import DATA_ANOMALY_MATCHES
|
|
import re
|
|
import os
|
|
import numpy as np
|
|
import pandas as pd
|
|
from typing import Any, Union, List
|
|
from etl.epc.settings import (
|
|
RDSAP_RESPONSE,
|
|
HEAT_DEMAND_RESPONSE,
|
|
CARBON_RESPONSE,
|
|
COMPONENT_FEATURES,
|
|
EFFICIENCY_FEATURES,
|
|
ROOM_FEATURES,
|
|
POST_SAP10_FEATURE,
|
|
)
|
|
from recommendations.recommendation_utils import estimate_number_of_floors
|
|
from utils.s3 import read_dataframe_from_s3_parquet
|
|
from utils.logger import setup_logger
|
|
from etl.epc.settings import EARLIEST_EPC_DATE
|
|
|
|
# Shared logger used by the cleaning steps in this module.
logger = setup_logger()

# TODO: Change these in the settings file
# The names imported from etl.epc.settings are rebound to lower-cased copies,
# so the rest of this module sees the lower-case spellings.
RDSAP_RESPONSE = RDSAP_RESPONSE.lower()
HEAT_DEMAND_RESPONSE = HEAT_DEMAND_RESPONSE.lower()
CARBON_RESPONSE = CARBON_RESPONSE.lower()
COMPONENT_FEATURES = [feature.lower() for feature in COMPONENT_FEATURES]
EFFICIENCY_FEATURES = [feature.lower() for feature in EFFICIENCY_FEATURES]

# Deployment environment; falls back to "dev" when the variable is unset.
ENVIRONMENT = os.environ.get("ENVIRONMENT", "dev")
# Bucket name; only the dev environment has a built-in fallback, otherwise
# DATA_BUCKET must be supplied through the environment (else it is None).
DATA_BUCKET = os.environ.get(
    "DATA_BUCKET",
    "retrofit-data-dev" if ENVIRONMENT == "dev" else None,
)

pd.set_option("future.no_silent_downcasting", True)

# A raw EPC row: every value is a string or None.
RawEpcRow: TypeAlias = dict[str, str | None]
# A value after cleaning: may have been cast to a number or boolean.
PreparedEpcValue: TypeAlias = str | int | float | bool | None
# A cleaned EPC row.
PreparedEpcRow: TypeAlias = dict[str, PreparedEpcValue]
|
|
|
|
|
|
class InputEpcRecords(TypedDict):
    """Raw EPC rows supplied to an ``EPCRecord`` running in newdata mode."""

    # The row that is copied into the record and then cleaned.
    original_epc: RawEpcRow
    # Full SAP row; consulted when the lighting-outlet count is missing.
    full_sap_epc: RawEpcRow
    # Older rows; consulted when the lighting-outlet count is missing.
    old_data: list[RawEpcRow]
|
|
|
|
|
|
class CleaningRule(TypedDict, total=False):
    """Field-level cleaning rule; every key is optional (``total=False``)."""

    # Callable used to convert a non-anomalous, non-None value.
    cast: Callable[[Any], Any]
    # Direct value -> replacement lookup.
    map: dict[Any, Any]
    # Replacement for an anomalous value when no ``anomaly_to`` is given.
    default: Any
    # Replacement for an anomalous value.
    anomaly_to: Any
|
|
|
|
|
|
# Per-field rules consumed by EPCRecord._apply_cleaning_rules. Keys are the
# hyphenated field names of the raw EPC row. For each field the method tries
# the "map" lookup first, then the anomaly replacement ("anomaly_to", else
# "default"), and finally the "cast".
CLEANING_RULES: dict[str, CleaningRule] = {
    # -----------------------------
    # BOOLEAN FLAGS
    # -----------------------------
    "mains-gas-flag": {
        "map": {"Y": True, "N": False, True: True, False: False},
        "anomaly_to": None,
    },
    "solar-water-heating-flag": {
        "map": {"Y": "Y", "N": "N", "": "N", None: "N"},
    },
    # -----------------------------
    # NUMERIC CASTS
    # -----------------------------
    "photo-supply": {"cast": float, "anomaly_to": None},
    "energy-consumption-current": {"cast": float},
    "co2-emissions-current": {"cast": float},
    "wind-turbine-count": {"cast": int, "anomaly_to": None},
    "number-open-fireplaces": {"cast": int, "default": 0},
    "extension-count": {"cast": int, "default": 0},
    "flat-storey-count": {"cast": int, "anomaly_to": None},
    "number-habitable-rooms": {"cast": int, "anomaly_to": None},
    # -----------------------------
    # TO NONE
    # -----------------------------
    "mechanical-ventilation": {"anomaly_to": None},
}
|
|
|
|
|
|
@dataclass
|
|
class EPCRecord:
|
|
"""
|
|
Base class for a EPC record
|
|
"""
|
|
|
|
# ------------------------------------------------------------------
|
|
# IDENTIFIERS / METADATA
|
|
# ------------------------------------------------------------------
|
|
|
|
uprn: Optional[int] = None
|
|
lmk_key: Optional[str] = None
|
|
building_reference_number: Optional[str] = None
|
|
report_type: Optional[str] = None
|
|
transaction_type: Optional[str] = None
|
|
uprn_source: Optional[str] = None
|
|
|
|
lodgement_date: Optional[str] = None
|
|
lodgement_datetime: Optional[str] = None
|
|
inspection_date: Optional[str] = None
|
|
|
|
# ------------------------------------------------------------------
|
|
# ADDRESS / LOCATION DATA
|
|
# ------------------------------------------------------------------
|
|
|
|
address: Optional[str] = None
|
|
address1: Optional[str] = None
|
|
address2: Optional[str] = None
|
|
address3: Optional[str] = None
|
|
|
|
postcode: Optional[str] = None
|
|
posttown: Optional[str] = None
|
|
county: Optional[str] = None
|
|
|
|
local_authority: Optional[str] = None
|
|
local_authority_label: Optional[str] = None
|
|
constituency: Optional[str] = None
|
|
constituency_label: Optional[str] = None
|
|
|
|
# ------------------------------------------------------------------
|
|
# PROPERTY CHARACTERISTICS
|
|
# ------------------------------------------------------------------
|
|
|
|
property_type: Optional[str] = None
|
|
built_form: Optional[str] = None
|
|
tenure: Optional[str] = None
|
|
floor_level: Optional[str] = None
|
|
flat_top_storey: Optional[str] = None
|
|
flat_storey_count: Optional[int] = None
|
|
|
|
glazed_area: Optional[str] = None
|
|
heat_loss_corridor: Optional[str] = None
|
|
unheated_corridor_length: Optional[float] = None
|
|
|
|
mains_gas_flag: Optional[str] = None
|
|
|
|
# ------------------------------------------------------------------
|
|
# BUILDING FABRIC DESCRIPTIONS
|
|
# ------------------------------------------------------------------
|
|
|
|
walls_description: Optional[str] = None
|
|
floor_description: Optional[str] = None
|
|
roof_description: Optional[str] = None
|
|
windows_description: Optional[str] = None
|
|
|
|
walls_env_eff: Optional[str] = None
|
|
floor_env_eff: Optional[str] = None
|
|
roof_env_eff: Optional[str] = None
|
|
windows_env_eff: Optional[str] = None
|
|
mainheat_env_eff: Optional[str] = None
|
|
sheating_env_eff: Optional[str] = None
|
|
hot_water_env_eff: Optional[str] = None
|
|
mainheatc_env_eff: Optional[str] = None
|
|
|
|
walls_energy_eff: Optional[str] = None
|
|
floor_energy_eff: Optional[str] = None
|
|
roof_energy_eff: Optional[str] = None
|
|
windows_energy_eff: Optional[str] = None
|
|
hot_water_energy_eff: Optional[str] = None
|
|
sheating_energy_eff: Optional[str] = None
|
|
mainheat_energy_eff: Optional[str] = None
|
|
mainheatc_energy_eff: Optional[str] = None
|
|
|
|
# ------------------------------------------------------------------
|
|
# HEATING / HOT WATER / SYSTEMS
|
|
# ------------------------------------------------------------------
|
|
|
|
mainheat_description: Optional[str] = None
|
|
mainheatcont_description: Optional[str] = None
|
|
secondheat_description: Optional[str] = None
|
|
hotwater_description: Optional[str] = None
|
|
main_fuel: Optional[str] = None
|
|
main_heating_controls: Optional[str] = None
|
|
|
|
mechanical_ventilation: Optional[str] = None
|
|
|
|
solar_water_heating_flag: Optional[str] = None
|
|
wind_turbine_count: Optional[int] = None
|
|
photo_supply: Optional[float] = None
|
|
|
|
# ------------------------------------------------------------------
|
|
# LIGHTING
|
|
# ------------------------------------------------------------------
|
|
|
|
lighting_description: Optional[str] = None
|
|
lighting_env_eff: Optional[str] = None
|
|
lighting_energy_eff: Optional[str] = None
|
|
|
|
low_energy_lighting: Optional[float] = None
|
|
fixed_lighting_outlets_count: Optional[int] = None
|
|
low_energy_fixed_light_count: Optional[int] = None
|
|
|
|
# ------------------------------------------------------------------
|
|
# ENERGY RATINGS
|
|
# ------------------------------------------------------------------
|
|
|
|
current_energy_rating: Optional[str] = None
|
|
potential_energy_rating: Optional[str] = None
|
|
|
|
current_energy_efficiency: Optional[int] = None
|
|
potential_energy_efficiency: Optional[float] = None
|
|
|
|
# ------------------------------------------------------------------
|
|
# ENERGY / CARBON METRICS
|
|
# ------------------------------------------------------------------
|
|
|
|
energy_consumption_current: Optional[int] = None
|
|
energy_consumption_potential: Optional[float] = None
|
|
|
|
co2_emissions_current: Optional[float] = None
|
|
co2_emissions_potential: Optional[float] = None
|
|
|
|
co2_emiss_curr_per_floor_area: Optional[float] = None
|
|
|
|
environment_impact_current: Optional[int] = None
|
|
environment_impact_potential: Optional[float] = None
|
|
|
|
# ------------------------------------------------------------------
|
|
# COST METRICS
|
|
# ------------------------------------------------------------------
|
|
|
|
heating_cost_current: Optional[float] = None
|
|
lighting_cost_current: Optional[float] = None
|
|
hot_water_cost_current: Optional[float] = None
|
|
|
|
heating_cost_potential: Optional[float] = None
|
|
lighting_cost_potential: Optional[float] = None
|
|
hot_water_cost_potential: Optional[float] = None
|
|
|
|
energy_tariff: Optional[str] = None
|
|
|
|
# ------------------------------------------------------------------
|
|
# PROPERTY DIMENSIONS / COUNTS
|
|
# ------------------------------------------------------------------
|
|
|
|
total_floor_area: Optional[float] = None
|
|
floor_height: Optional[float] = None
|
|
|
|
number_habitable_rooms: Optional[float] = None
|
|
number_heated_rooms: Optional[float] = None
|
|
number_open_fireplaces: Optional[float] = None
|
|
|
|
extension_count: Optional[float] = None
|
|
|
|
# ------------------------------------------------------------------
|
|
# GLAZING
|
|
# ------------------------------------------------------------------
|
|
|
|
glazed_type: Optional[str] = None
|
|
multi_glaze_proportion: Optional[float] = None
|
|
|
|
# ------------------------------------------------------------------
|
|
# CLEANING FLAG
|
|
# ------------------------------------------------------------------
|
|
# Indicates if the EPC record has been predicted. By default, false
|
|
estimated: Optional[bool] = False
|
|
sap_05_overwritten: Optional[bool] = False
|
|
has_been_remodelled: Optional[bool] = False
|
|
|
|
# ------------------------------------------------------------------
|
|
# MODEL FLAGS
|
|
# ------------------------------------------------------------------
|
|
|
|
is_post_sap10: Optional[bool] = None
|
|
|
|
run_mode: str = "training"
|
|
|
|
# ------------------------------------------------------------------
|
|
# INPUT DATA STRUCTURES
|
|
# ------------------------------------------------------------------
|
|
|
|
epc_records: Optional[InputEpcRecords] = None
|
|
address_metadata: Optional[Address] = None
|
|
# Raw EPC input (immutable)
|
|
original_epc: Optional[RawEpcRow] = None
|
|
|
|
# Working dictionary that gets cleaned
|
|
_prepared_epc: Optional[PreparedEpcRow] = None
|
|
# Record of differences applied by landlord data
|
|
landlord_differences: Optional[dict[str, PreparedEpcValue]] = None
|
|
|
|
# Supporting
|
|
full_sap_epc: Optional[RawEpcRow] = None
|
|
old_data: Optional[list[RawEpcRow]] = None
|
|
|
|
# # Metadata generated during processing
|
|
prepared_epc_delta_metadata: pd.DataFrame = None
|
|
cleaning_data: pd.DataFrame = None
|
|
|
|
# Not used in training mod but used in newdata mode
|
|
age_band: Optional[str] = None
|
|
construction_age_band: Optional[str] = None
|
|
year_built: Optional[int] = None
|
|
number_of_floors: Optional[int] = None
|
|
number_of_open_fireplaces: Optional[int] = None
|
|
heat_loss_corridor_bool: Optional[bool] = None
|
|
solar_water_heating_flag_bool: Optional[bool] = None
|
|
|
|
def __post_init__(self) -> None:
|
|
# We can have validation and cleaning steps for each of the fields
|
|
# self.WALLS_DESCRIPTION = 'check'
|
|
# Could also have cleaning of records if needed
|
|
|
|
if self.run_mode == "training":
|
|
self.validation_configuration = EPCRecordValidationConfiguration
|
|
return
|
|
|
|
# We are running in newdata mode
|
|
if self.epc_records is None:
|
|
raise ValueError("Must provide epc records if running in newdata mode")
|
|
|
|
# Immutable copy; raw record
|
|
self.original_epc = self.epc_records["original_epc"].copy()
|
|
|
|
# Working copy that we will clean and manipulate
|
|
self._prepared_epc = self.epc_records["original_epc"].copy()
|
|
|
|
self.full_sap_epc = self.epc_records["full_sap_epc"]
|
|
self.old_data = self.epc_records["old_data"]
|
|
|
|
if self.cleaning_data is None:
|
|
raise ValueError("Must provide cleaning data if running in newdata mode")
|
|
|
|
invalid_rules = [k for k in CLEANING_RULES if k not in self._prepared_epc]
|
|
if invalid_rules:
|
|
logger.warning(f"Cleaning rules for unknown fields: {invalid_rules}")
|
|
|
|
self._clean_records_using_epc_records()
|
|
self._clean_with_data_processor()
|
|
self._inject_address_metadata()
|
|
self._expand_prepared_epc_to_attributes()
|
|
self._identify_delta_between_prepared_and_original_records()
|
|
|
|
return
|
|
|
|
def insert_new_performance_values(
|
|
self, new_sap: float, new_epc: float, new_carbon: float, new_heat_demand: float,
|
|
):
|
|
"""
|
|
Given re-modelling for this property, is used to insert the new values and also keep a record of the
|
|
fact that re-modelling has taken place
|
|
:param new_sap:
|
|
:param new_epc:
|
|
:param new_carbon:
|
|
:param new_heat_demand:
|
|
:return:
|
|
"""
|
|
|
|
self.has_been_remodelled = True
|
|
# Update prepared epc
|
|
update_data = {
|
|
"current_energy_efficiency": new_sap,
|
|
"current_energy_rating": new_epc,
|
|
"co2_emissions_current": new_carbon,
|
|
"energy_consumption_current": new_heat_demand,
|
|
}
|
|
# Validate we're updating correct fields
|
|
for k in update_data:
|
|
if k not in self._prepared_epc:
|
|
raise ValueError(f"Attempting to update unknown field '{k}' in prepared EPC")
|
|
self._prepared_epc.update(update_data)
|
|
# Update dataclass attributes
|
|
self._expand_prepared_epc_to_attributes()
|
|
|
|
def _apply_averages_cleaning(self) -> None:
|
|
"""
|
|
Fills missing property dimension values using medians from cleaning_data.
|
|
"""
|
|
|
|
if self._prepared_epc is None:
|
|
raise ValueError("Prepared EPC missing")
|
|
|
|
if self.cleaning_data is None:
|
|
raise ValueError("Cleaning data required for averages cleaning")
|
|
|
|
variables = [
|
|
"number-habitable-rooms",
|
|
"number-heated-rooms",
|
|
"floor-height",
|
|
]
|
|
|
|
if not any(pd.isnull(self._prepared_epc.get(v)) for v in variables):
|
|
return
|
|
|
|
cleaning_data: pd.DataFrame = self.cleaning_data
|
|
|
|
clean_with = cleaning_data[
|
|
(cleaning_data["property_type"] == self._prepared_epc["property-type"])
|
|
]
|
|
|
|
if self._prepared_epc["local-authority"] in clean_with["local_authority"].values:
|
|
clean_with = clean_with[
|
|
clean_with["local_authority"] == self._prepared_epc["local-authority"]
|
|
]
|
|
|
|
floor_area = self._prepared_epc.get("total-floor-area")
|
|
|
|
if floor_area is not None:
|
|
subset = clean_with[
|
|
(
|
|
(clean_with["total_floor_area"].astype(float) <= floor_area * 1.1) &
|
|
(clean_with["total_floor_area"].astype(float) >= floor_area * 0.9)
|
|
)
|
|
]
|
|
if not subset.empty:
|
|
clean_with = subset
|
|
|
|
medians = {
|
|
"number-habitable-rooms": int(round(clean_with["number_habitable_rooms"].median())),
|
|
"number-heated-rooms": int(round(clean_with["number_heated_rooms"].median())),
|
|
"floor-height": float(clean_with["floor_height"].median()),
|
|
}
|
|
|
|
# heated rooms should never exceed habitable
|
|
if medians["number-heated-rooms"] > medians["number-habitable-rooms"]:
|
|
medians["number-heated-rooms"] = medians["number-habitable-rooms"]
|
|
|
|
for key, value in medians.items():
|
|
if pd.isnull(self._prepared_epc.get(key)):
|
|
self._prepared_epc[key] = value
|
|
|
|
def _apply_cleaning_rules(self) -> None:
    """
    Run every entry of CLEANING_RULES against the prepared EPC, in place.

    For each field exactly one of the following applies, in this priority:

    1. the value is a key of the rule's ``map`` -> replace with the mapped value;
    2. the value is in DATA_ANOMALY_MATCHES -> replace with ``anomaly_to`` if the
       rule has one, else ``default`` if it has one, else leave it alone;
    3. the rule has a ``cast`` and the value is not None -> cast it (``int`` casts
       go through ``float`` first); a failed cast is logged and the value kept.

    Fields named by a rule but absent from the record are logged and skipped.

    :raises ValueError: if the prepared EPC is missing or empty
    """
    record = self._prepared_epc
    if not record:
        raise ValueError("EPCRecord does not contain prepared EPC data")

    for field, rule in CLEANING_RULES.items():
        if field not in record:
            logger.warning(f"Cleaning rule defined for missing field '{field}'")
            continue

        value = record[field]

        if "map" in rule and value in rule["map"]:
            # 1. Mapping rules (highest priority)
            record[field] = rule["map"][value]
        elif value in DATA_ANOMALY_MATCHES:
            # 2. Anomaly values never reach the cast step
            if "anomaly_to" in rule:
                record[field] = rule["anomaly_to"]
            elif "default" in rule:
                record[field] = rule["default"]
        elif "cast" in rule and value is not None:
            # 3. Casting rules
            caster = rule["cast"]
            try:
                record[field] = int(float(value)) if caster is int else caster(value)
            except Exception as e:
                logger.warning(
                    f"Failed casting field '{field}' value '{value}': {e}"
                )
|
|
|
|
def _inject_address_metadata(self):
|
|
"""
|
|
Given metadata about an address, provided by the landlord on input, this method will inject it into the prepared
|
|
EPC record, to allow it to be used in cleaning and processing steps. This is particularly useful for cleaning
|
|
missing or anomalous location data, by using other location data provided by the landlord.
|
|
:return:
|
|
"""
|
|
|
|
addr = self.address_metadata
|
|
if addr is None:
|
|
# We don't always have address metadata and so we don't inject if it's not there
|
|
return
|
|
|
|
landlord_remapping = {
|
|
"total_floor_area": addr.landlord_total_floor_area_m2, # 1m tolerance on floor area to perform remap
|
|
"property_type": addr.landlord_property_type,
|
|
"built_form": addr.landlord_built_form,
|
|
|
|
# Components
|
|
"walls_description": addr.landlord_wall_construction,
|
|
"roof_description": addr.landlord_roof_construction,
|
|
"floor_description": addr.landlord_floor_construction,
|
|
"windows_description": addr.landlord_windows_type,
|
|
"main_fuel": addr.landlord_fuel_type,
|
|
"mainheat_description": addr.landlord_heating_system,
|
|
"mainheatcont_description": addr.landlord_heating_controls,
|
|
"hotwater_description": addr.landlord_hot_water_system,
|
|
|
|
# Efficiency
|
|
"walls_energy_eff": addr.landlord_wall_efficiency,
|
|
"roof_energy_eff": addr.landlord_roof_efficiency,
|
|
"windows_energy_eff": addr.landlord_windows_efficiency,
|
|
"mainheat_energy_eff": addr.landlord_heating_efficiency,
|
|
"mainheatc_energy_eff": addr.landlord_heating_controls_efficiency,
|
|
"hot_water_energy_eff": addr.landlord_hot_water_efficiency,
|
|
|
|
"multi_glaze_proportion": addr.landlord_multi_glaze_proportion * 100, # TODO: Fix this!
|
|
"construction_age_band": addr.landlord_construction_age_band,
|
|
}
|
|
|
|
# Sanity check - ensure valid keys
|
|
if any(k not in self._prepared_epc for k in landlord_remapping):
|
|
raise ValueError("Landlord remapping contains keys that are not in the EPC record")
|
|
|
|
self.landlord_differences = {} # Anything actaully changed
|
|
for k, v in landlord_remapping.items():
|
|
if k == "total_floor_area":
|
|
existing = self._prepared_epc.get(k)
|
|
if existing is not None and v is not None and abs(existing - v) > 1: # 1m tolerance
|
|
self.landlord_differences[k] = v
|
|
else:
|
|
if v != self._prepared_epc.get(k) and (not pd.isnull(v)) and (not pd.isnull(self._prepared_epc.get(k))):
|
|
self.landlord_differences[k] = v
|
|
|
|
self._prepared_epc.update(self.landlord_differences)
|
|
|
|
@staticmethod
def _calculate_days_to(lodgement_date: Union[str, pd.Series]) -> Union[int, pd.Series]:
    """
    Number of whole days between EARLIEST_EPC_DATE and the lodgement date(s).

    :param lodgement_date: a single date string, or a Series of dates
    :return: an int for a string input, otherwise a Series of day counts
    """
    elapsed = pd.to_datetime(lodgement_date) - pd.to_datetime(EARLIEST_EPC_DATE)

    # A scalar difference exposes ``.days`` directly; a Series needs ``.dt``
    if isinstance(lodgement_date, str):
        return elapsed.days
    return elapsed.dt.days
|
|
|
|
def _clean_with_data_processor(self) -> None:
    """
    Pass the prepared EPC through ``EPCDataProcessor`` (newdata mode) and replace
    ``_prepared_epc`` with the first row of the processor's output.
    """
    single_row_frame = self.epc_record_as_dataframe("_prepared_epc").copy()

    processor = EPCDataProcessor(
        data=single_row_frame,
        run_mode="newdata",
        cleaning_averages=self.cleaning_data,
    )
    processor.prepare_data()

    # The processor works on a frame; take its first record back as a dict
    processed_rows = processor.data.to_dict(orient="records")
    self._prepared_epc = cast(PreparedEpcRow, processed_rows[0])
|
|
|
|
@staticmethod
|
|
def _cast_value(value: PreparedEpcValue, type_hint: Any) -> PreparedEpcValue:
|
|
|
|
origin = get_origin(type_hint)
|
|
args = get_args(type_hint)
|
|
|
|
# Handle Optional[T] / Union[T, None]
|
|
if origin is Union:
|
|
args = [a for a in get_args(type_hint) if a is not type(None)]
|
|
if len(args) == 1:
|
|
type_hint = args[0]
|
|
|
|
if type_hint is int:
|
|
return int(value)
|
|
|
|
if type_hint is float:
|
|
return float(value)
|
|
|
|
if type_hint is bool:
|
|
if isinstance(value, bool):
|
|
return value
|
|
return str(value).lower() in ["true", "1", "y", "yes"]
|
|
|
|
if type_hint is str:
|
|
return str(value)
|
|
|
|
return value
|
|
|
|
def _expand_prepared_epc_to_attributes(self):
|
|
"""
|
|
Expand prepared_epc dictionary into dataclass attributes.
|
|
Assumes prepared_epc keys are snake_case.
|
|
"""
|
|
|
|
field_map = {f.name: f for f in fields(self)}
|
|
|
|
for key, value in self._prepared_epc.items():
|
|
|
|
# Enforce schema consistency
|
|
if "-" in key:
|
|
raise ValueError(f"Invalid EPC key format (expected snake_case): {key}")
|
|
|
|
if key not in field_map:
|
|
# Ignore keys that are not part of the dataclass schema
|
|
continue
|
|
|
|
if value is None:
|
|
setattr(self, key, None)
|
|
continue
|
|
|
|
try:
|
|
cast_value = self._cast_value(value, field_map[key].type)
|
|
setattr(self, key, cast_value)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed casting field '{key}' with value '{value}': {e}")
|
|
setattr(self, key, value)
|
|
|
|
def _identify_delta_between_prepared_and_original_records(self):
|
|
"""
|
|
This method will identify the delta between the prepared and original records
|
|
"""
|
|
prepared_epc_df = self.epc_record_as_dataframe("_prepared_epc")
|
|
original_epc_df = self.epc_record_as_dataframe("original_epc")
|
|
|
|
df = pd.concat(
|
|
[prepared_epc_df, original_epc_df],
|
|
keys=["prepared_epc", "original_epc"],
|
|
axis=0,
|
|
)
|
|
|
|
same_index = df.apply(pd.Series.duplicated).any()
|
|
self.prepared_epc_delta_metadata = df[same_index[~same_index].index]
|
|
|
|
def _clean_records_using_epc_records(self) -> None:
    """
    Run the record-level cleaning steps on the prepared EPC, in a fixed order:
    the generic CLEANING_RULES first, then each field-specific cleaner.
    """
    # Names are resolved one at a time, immediately before each call
    cleaning_steps = (
        "_apply_cleaning_rules",
        "_clean_built_form",
        "_clean_solar_hot_water",
        "_clean_heat_loss_corridor",
        "_clean_age_band",
        "_clean_year_built",
        "_clean_floor_area",
        "_clean_property_dimensions",
        "_clean_number_lighting_outlets",
        "_clean_floor_level",
        "_clean_floor_height",
        "_clean_constituency",
        "_clean_new_build_descriptions",
    )
    for step_name in cleaning_steps:
        getattr(self, step_name)()
|
|
|
|
def epc_record_as_dataframe(
|
|
self,
|
|
epc_type: Literal["_prepared_epc", "original_epc"] = "_prepared_epc",
|
|
use_upper_columns: bool = True,
|
|
replace_empty_string: bool = False,
|
|
) -> pd.DataFrame:
|
|
"""
|
|
This method will return the dataframe representation of the epc record
|
|
"""
|
|
|
|
if epc_type not in ("_prepared_epc", "original_epc"):
|
|
raise ValueError(f"Invalid epc_type: {epc_type}")
|
|
|
|
source = getattr(self, epc_type)
|
|
if source is None:
|
|
raise ValueError(f"{epc_type} is None")
|
|
|
|
df = pd.DataFrame.from_dict(source, orient="index").T
|
|
|
|
if use_upper_columns:
|
|
df.columns = [x.upper().replace("-", "_") for x in df.columns]
|
|
|
|
if replace_empty_string:
|
|
df = df.replace("", np.nan).infer_objects(copy=False)
|
|
|
|
return df
|
|
|
|
def _clean_floor_height(self) -> None:
|
|
"""Remaps anomalies in floor height to the average floor height for the property type"""
|
|
floor_height_data = self.cleaning_data[
|
|
(self.cleaning_data["property_type"] == self._prepared_epc["property-type"])
|
|
& (self.cleaning_data["built_form"] == self._prepared_epc["built-form"])
|
|
]
|
|
average = float(np.mean(floor_height_data["floor_height"]))
|
|
sd = float(np.std(floor_height_data["floor_height"]))
|
|
# If we're in the top 0.5 percentile of floor heights, we'll set it to the average
|
|
if self._prepared_epc["floor-height"] > average + 10 * sd:
|
|
self._prepared_epc["floor-height"] = average
|
|
if self._prepared_epc["floor-height"] <= 1.665:
|
|
self._prepared_epc["floor-height"] = average
|
|
|
|
def _clean_new_build_descriptions(self) -> None:
|
|
for col in ["roof-description", "walls-description", "floor-description"]:
|
|
self._prepared_epc[col] = self._prepared_epc[col].replace("W/m²K", "W/m-¦K")
|
|
|
|
def _clean_constituency(self) -> None:
|
|
"""
|
|
We handle the single case of finding a missing constituency by using the local authority
|
|
"""
|
|
if pd.isnull(self._prepared_epc["constituency"]) or (
|
|
self._prepared_epc["constituency"] == ""
|
|
):
|
|
if self._prepared_epc["local-authority"] != "E06000044":
|
|
raise NotImplementedError(
|
|
"This function is only implemented for Portsmouth, in the single edgecase seen"
|
|
)
|
|
self._prepared_epc["constituency"] = "E14000883"
|
|
|
|
def _clean_floor_level(self) -> None:
    """
    Translate the floor level through FLOOR_LEVEL_MAP, or set it to None when the
    raw value is one of DATA_ANOMALY_MATCHES.

    :raises ValueError: if the prepared EPC is missing or empty
    """
    if not self._prepared_epc:
        raise ValueError("EPC Recrod doesn not contain epc data")

    raw_level = self._prepared_epc["floor-level"]

    if raw_level in DATA_ANOMALY_MATCHES:
        cleaned_level = None
    else:
        # Direct lookup: a level missing from the map is an error, not a None
        cleaned_level = FLOOR_LEVEL_MAP[raw_level]

    self._prepared_epc["floor-level"] = cleaned_level
|
|
|
|
def _clean_number_lighting_outlets(self) -> None:
    """
    Clean the number of fixed lighting outlets, if empty or invalid.

    A usable value is simply converted to float. An anomalous value is replaced,
    in order of preference, by:

    1. the rounded median of the counts found on the old EPCs and the full SAP EPC;
    2. the value produced by ``EPCDataProcessor.apply_averages_cleaning`` over the
       cleaning data, merged on property type, built form, construction age band
       and local authority.

    Historic counts that are None, empty or anomalous are ignored, and numeric
    strings such as "10.0" are accepted.

    :raises ValueError: if the prepared EPC is missing or empty
    """
    if not self._prepared_epc:
        raise ValueError("EPC Recrod doesn not contain epc data")

    key = "fixed-lighting-outlets-count"

    if self._prepared_epc[key] not in DATA_ANOMALY_MATCHES:
        self._prepared_epc[key] = float(self._prepared_epc[key])
        return

    # We check old EPCs and the full SAP EPC
    history = list(self.old_data or [])
    if self.full_sap_epc:
        history.append(self.full_sap_epc)

    lighting_data = [
        int(float(raw))
        for raw in (row[key] for row in history)
        if raw is not None and raw != "" and raw not in DATA_ANOMALY_MATCHES
    ]

    if lighting_data:
        self._prepared_epc[key] = round(np.median(lighting_data))
        return

    # Use averages from the cleaning dataset, based on the property type, built form, construction age
    # band and local authority
    cleaning_data = self.cleaning_data.copy()
    # When running in new-data mode, the columns will have been coerced to lower case so we push them
    # back to upper case
    if self.run_mode == "newdata":
        cleaning_data.columns = [x.upper() for x in cleaning_data.columns]

    cleaned_property_data = EPCDataProcessor.apply_averages_cleaning(
        data_to_clean=self.epc_record_as_dataframe(
            "_prepared_epc", replace_empty_string=True
        ),
        cleaning_data=cleaning_data,
        cols_to_merge_on=[
            "PROPERTY_TYPE",
            "BUILT_FORM",
            "CONSTRUCTION_AGE_BAND",
            "LOCAL_AUTHORITY",
        ],
    )
    self._prepared_epc[key] = round(
        cleaned_property_data["FIXED_LIGHTING_OUTLETS_COUNT"].values[0]
    )
|
|
|
|
def _filter_property_dimensions(self, property_dimensions) -> pd.Series:
    """
    Average the dimension columns over the rows most comparable to this property.

    Rows are always restricted to the record's property type. They are then
    narrowed by construction age band, and after that by built form, but each
    narrowing is only applied when the record's value is not anomalous and at
    least one remaining row carries it.

    :param property_dimensions: dataframe with upper-case column names
    :return: column means of NUMBER_HABITABLE_ROOMS, NUMBER_HEATED_ROOMS,
        TOTAL_FLOOR_AREA and FLOOR_HEIGHT over the selected rows
    """
    candidates = property_dimensions[
        property_dimensions["PROPERTY_TYPE"] == self._prepared_epc["property-type"]
    ]

    age_band = self.construction_age_band
    age_band_usable = (
        age_band not in DATA_ANOMALY_MATCHES
        and age_band in candidates["CONSTRUCTION_AGE_BAND"].values
    )
    if age_band_usable:
        candidates = candidates[candidates["CONSTRUCTION_AGE_BAND"] == age_band]

    built_form = self._prepared_epc["built-form"]
    built_form_usable = (
        built_form not in DATA_ANOMALY_MATCHES
        and built_form in candidates["BUILT_FORM"].values
    )
    if built_form_usable:
        candidates = candidates[candidates["BUILT_FORM"] == built_form]

    dimension_columns = [
        "NUMBER_HABITABLE_ROOMS",
        "NUMBER_HEATED_ROOMS",
        "TOTAL_FLOOR_AREA",
        "FLOOR_HEIGHT",
    ]
    return candidates[dimension_columns].mean()
|
|
|
|
def _clean_property_dimensions(self) -> None:
    """
    Clean the number of habitable rooms, the number of heated rooms and the floor
    height, and estimate the number of floors from the property type.

    Values that are usable are converted to float. Values that are anomalous (or,
    for floor height, an empty string) are replaced by the averages of comparable
    properties, read from the ``property_dimensions/<local-authority>.parquet``
    file in DATA_BUCKET. That file is only read when at least one value needs
    replacing.

    :raises ValueError: if the prepared EPC is missing or empty
    """
    if not self._prepared_epc:
        raise ValueError("EPC Record doesn not contain epc data")

    record = self._prepared_epc

    habitable_missing = record["number-habitable-rooms"] in DATA_ANOMALY_MATCHES
    heated_missing = record["number-heated-rooms"] in DATA_ANOMALY_MATCHES
    # Same test that decides the floor-height replacement below, so the averages
    # are guaranteed to be loaded whenever that replacement is taken.
    height_missing = (
        record["floor-height"] == ""
        or record["floor-height"] in DATA_ANOMALY_MATCHES
    )

    if habitable_missing or heated_missing or height_missing:
        # TODO - this probably shouldn't live here - but we only need to use this for specific properties
        # when we meet this condition
        property_dimensions: pd.DataFrame = read_dataframe_from_s3_parquet(
            bucket_name=DATA_BUCKET,
            file_key=f"property_dimensions/{self._prepared_epc['local-authority']}.parquet",
        )
        self.property_dimensions: pd.Series = self._filter_property_dimensions(
            property_dimensions
        )

    if habitable_missing:
        record["number-habitable-rooms"] = float(
            self.property_dimensions["NUMBER_HABITABLE_ROOMS"].round()
        )
    else:
        record["number-habitable-rooms"] = float(record["number-habitable-rooms"])

    if heated_missing:
        record["number-heated-rooms"] = float(
            self.property_dimensions["NUMBER_HEATED_ROOMS"].round()
        )
    else:
        record["number-heated-rooms"] = float(record["number-heated-rooms"])

    self.number_of_floors = estimate_number_of_floors(record["property-type"])

    if height_missing:
        record["floor-height"] = float(
            self.property_dimensions["FLOOR_HEIGHT"].round(2)
        )
    else:
        record["floor-height"] = float(record["floor-height"])
|
|
|
|
def _clean_floor_area(self) -> None:
|
|
"""
|
|
This method will clean the floor area, if empty or invalid
|
|
"""
|
|
if not self._prepared_epc:
|
|
raise ValueError("EPC Recrod doesn not contain epc data")
|
|
|
|
if self._prepared_epc["total-floor-area"] is None:
|
|
return
|
|
|
|
self._prepared_epc["total-floor-area"] = float(
|
|
self._prepared_epc["total-floor-area"]
|
|
)
|
|
|
|
# We handle the edge case of floor area being 0. We set it to zero and it is cleaned by
|
|
# _clean_with_data_processor
|
|
if self._prepared_epc["total-floor-area"] == 0:
|
|
print(
|
|
"Edge case of floor area being zero - will set to none and will be cleaned in "
|
|
"_clean_with_data_processor"
|
|
)
|
|
self._prepared_epc["total-floor-area"] = None
|
|
|
|
def _clean_heat_loss_corridor(self) -> None:
    """
    Normalise the corridor fields of the prepared EPC.

    ``heat-loss-corridor`` is forced to one of "no corridor", "unheated corridor" or
    "heated corridor"; anomalous or unrecognised values fall back to "no corridor".
    ``unheated-corridor-length`` becomes a float, or ``None`` when it is an anomaly value.
    The boolean form of the corridor value is stored on ``self.heat_loss_corridor_bool``
    (only "unheated corridor" maps to True).

    :raises ValueError: if the prepared EPC is missing or empty.
    """
    if not self._prepared_epc:
        raise ValueError("EPC Recrod doesn not contain epc data")

    epc = self._prepared_epc
    fallback = "no corridor"
    accepted = ["no corridor", "unheated corridor", "heated corridor"]
    corridor_to_bool = {
        "no corridor": False,
        "unheated corridor": True,
        "heated corridor": False,
    }

    corridor = epc["heat-loss-corridor"]
    if corridor in DATA_ANOMALY_MATCHES or corridor not in accepted:
        corridor = fallback
    epc["heat-loss-corridor"] = corridor

    corridor_length = epc["unheated-corridor-length"]
    if corridor_length in DATA_ANOMALY_MATCHES:
        epc["unheated-corridor-length"] = None
    else:
        epc["unheated-corridor-length"] = float(corridor_length)

    # Boolean version of heat-loss-corridor
    self.heat_loss_corridor_bool = corridor_to_bool[epc["heat-loss-corridor"]]
|
|
|
|
def _clean_solar_hot_water(self) -> None:
    """
    Normalise ``solar-water-heating-flag`` to "Y" or "N" and derive its boolean form.

    A string that equals "Y" once surrounding whitespace is removed and case is ignored is kept
    as "Y". Everything else ("N", empty string, ``None`` and any unrecognised value) becomes
    "N", so an unexpected value is cleaned rather than aborting with a ``KeyError``.
    The boolean version is stored on ``self.solar_water_heating_flag_bool``.

    :raises ValueError: if the prepared EPC is missing or empty.
    """
    if not self._prepared_epc:
        raise ValueError("EPC Recrod doesn not contain epc data")

    raw_flag = self._prepared_epc["solar-water-heating-flag"]

    # Only an explicit "Y" counts as having solar water heating; missing or invalid data
    # defaults to "N".
    has_solar_water_heating = isinstance(raw_flag, str) and raw_flag.strip().upper() == "Y"

    self._prepared_epc["solar-water-heating-flag"] = "Y" if has_solar_water_heating else "N"

    # Create a boolean version for storage in the database
    self.solar_water_heating_flag_bool = has_solar_water_heating
|
|
|
|
def _clean_built_form(self) -> None:
    """
    Fill in ``built-form`` when the prepared EPC holds an anomaly value for it.

    Flats and maisonettes default to "End-Terrace"; every other property type defaults to
    "Semi-Detached". A non-anomalous built form is left unchanged.

    :raises ValueError: if the prepared EPC is missing or empty.
    """
    if not self._prepared_epc:
        raise ValueError("EPC Recrod doesn not contain epc data")

    epc = self._prepared_epc

    if epc["built-form"] not in DATA_ANOMALY_MATCHES:
        return

    is_flat_like = epc["property-type"] in ["Flat", "Maisonette"]
    epc["built-form"] = "End-Terrace" if is_flat_like else "Semi-Detached"
|
|
|
|
def _clean_age_band(self) -> None:
    """
    Clean ``construction-age-band`` and derive ``self.construction_age_band`` / ``self.age_band``.

    Steps:
    1. Clean the current value with ``EPCDataProcessor.clean_construction_age_band``.
    2. If the result is an anomaly value, fall back to the most recently lodged old record
       whose own age band is not an anomaly (if any).
    3. Look the band up in ``england_wales_age_band_lookup``. When there is no match, new
       dwellings default to band "L" (2012 onwards) and everything else to band "C"
       (1930-1949); the prepared EPC is updated to match.

    :raises ValueError: if the prepared EPC is missing or empty.
    """
    if not self._prepared_epc:
        raise ValueError("EPC Recrod doesn not contain epc data")

    epc = self._prepared_epc

    epc["construction-age-band"] = EPCDataProcessor.clean_construction_age_band(
        epc["construction-age-band"]
    )

    if epc["construction-age-band"] in DATA_ANOMALY_MATCHES and self.old_data:
        # Only old records with a usable age band are candidates. Selecting from this filtered
        # list guarantees we never pick an anomalous record that merely shares the latest
        # lodgement datetime with a usable one.
        usable_records = [
            old_record
            for old_record in self.old_data
            if old_record["construction-age-band"] not in DATA_ANOMALY_MATCHES
        ]

        if usable_records:
            # Take the most recent; max() returns the first record on ties
            most_recent = max(
                usable_records, key=lambda old_record: old_record["lodgement-datetime"]
            )
            epc["construction-age-band"] = EPCDataProcessor.clean_construction_age_band(
                most_recent["construction-age-band"]
            )

    self.construction_age_band = epc["construction-age-band"]
    self.age_band = england_wales_age_band_lookup.get(self.construction_age_band)

    if (epc["transaction-type"] == "new dwelling") and (self.age_band is None):
        # A new dwelling with no recognisable band is assumed to be in the newest band
        self.age_band = "L"
        self.construction_age_band = "England and Wales: 2012 onwards"
        epc["construction-age-band"] = self.construction_age_band

    if self.age_band is None:
        # Last-resort default when nothing else identifies the band
        self.age_band = "C"
        self.construction_age_band = "England and Wales: 1930-1949"
        epc["construction-age-band"] = self.construction_age_band
|
|
|
|
def _clean_year_built(self) -> None:
    """
    Derive ``self.year_built``.

    - When a full SAP EPC is available, the year of its ``lodgement-date`` (format
      ``YYYY-MM-DD``) is used.
    - Otherwise, when the construction age band is not an anomaly value, the first four-digit
      year found in the band text is used.
    - Otherwise (including a band that contains no four-digit year) ``year_built`` is ``None``.

    :raises ValueError: if the full SAP EPC has no ``lodgement-date``, or it is not in
        ``YYYY-MM-DD`` format.
    """
    if self.full_sap_epc:
        lodgement_date = self.full_sap_epc["lodgement-date"]

        if lodgement_date is None:
            raise ValueError("full_sap_epc lodgement-date is missing")

        self.year_built = datetime.strptime(str(lodgement_date), "%Y-%m-%d").year
        return

    if self.construction_age_band not in DATA_ANOMALY_MATCHES:
        # Take the lower limit. If we're pessimistic about the age of the property, that at least means we have
        # more options for recommendations if that age falls before the year that insulation in walls became
        # common practice
        years = re.findall(r"\b\d{4}\b", self._prepared_epc["construction-age-band"])

        # A band without any four-digit year gives us nothing to go on; fall through to the
        # "unknown" case instead of raising IndexError.
        if years:
            self.year_built = int(years[0])
            return

    # We don't know when the property was built
    self.year_built = None
|
|
|
|
def _field_validation(self) -> None:
    """
    Validate every field listed in ``self.validation_configuration``.

    Each key names an instance attribute; its configuration's ``"type"`` selects the validator
    ("string" or "float").

    :raises KeyError: if a configured field is not an instance attribute.
    :raises ValueError: if a configuration uses an unsupported type, or a validator rejects
        the value.
    """
    attributes = vars(self)

    for record_key, validation_config in self.validation_configuration.items():
        # The field to validate is the instance attribute named after the config key
        field_value = attributes[record_key]
        expected_type = validation_config["type"]

        if expected_type == "string":
            validator = self._validate_string
        elif expected_type == "float":
            validator = self._validate_float
        else:
            raise ValueError(
                f"Validation type {validation_config['type']} not supported"
            )

        validator(record_key, field_value, validation_config)
|
|
|
|
@staticmethod
def _validate_string(
    record_key: str, field_value: Union[str, float], validation_config: dict
) -> None:
    """
    Validate a string field

    :param record_key: Name of the field, used in error messages.
    :param field_value: Value to validate; must be a ``str``.
    :param validation_config: Must contain ``"acceptable_values"`` (a collection of allowed
        values, or ``None`` to skip that check). May contain ``"function"``, a callable that
        raises when the value is invalid.
    :raises ValueError: if the value is not a string, the validation function raises, or the
        value is not one of the acceptable values.
    """
    if not isinstance(field_value, str):
        raise ValueError(
            f"Field {record_key} has value {field_value} which is not a string"
        )

    if "function" in validation_config:
        try:
            validation_config["function"](field_value)
        except Exception as err:
            # Only genuine errors are translated; KeyboardInterrupt/SystemExit must propagate.
            # The original exception is chained so the root cause stays visible.
            raise ValueError(
                f"Field {record_key} has value {field_value} which does not pass the validation function "
                f"{validation_config['function']}"
            ) from err

    acceptable_values = validation_config["acceptable_values"]
    if acceptable_values is not None and field_value not in acceptable_values:
        raise ValueError(
            f"Field {record_key} has value {field_value} which is not in the acceptable values of "
            f"{validation_config['acceptable_values']}"
        )
|
|
|
|
@staticmethod
def _validate_float(
    record_key: str, field_value: Union[str, float], validation_config: dict
) -> None:
    """
    Validate a float field

    :param record_key: Name of the field, used in error messages.
    :param field_value: Value to validate; must be a ``float`` (an ``int`` is rejected).
    :param validation_config: Must contain ``"range"`` (an inclusive ``(low, high)`` pair, or
        ``None`` to skip that check). May contain ``"function"``, a callable that raises when
        the value is invalid.
    :raises ValueError: if the value is not a float, the validation function raises, or the
        value lies outside the range.
    """
    if not isinstance(field_value, float):
        raise ValueError(
            f"Field {record_key} has value {field_value} which is not a float"
        )

    if "function" in validation_config:
        try:
            validation_config["function"](field_value)
        except Exception as err:
            # Only genuine errors are translated; KeyboardInterrupt/SystemExit must propagate.
            # The original exception is chained so the root cause stays visible.
            raise ValueError(
                f"Field {record_key} has value {field_value} which does not pass the validation function "
                f"{validation_config['function']}"
            ) from err

    value_range = validation_config["range"]
    if value_range is not None:
        lower_bound, upper_bound = value_range[0], value_range[1]

        # NOTE(review): NaN passes this check because both comparisons evaluate to False -
        # confirm whether that is intended.
        if field_value < lower_bound or field_value > upper_bound:
            raise ValueError(
                f"Field {record_key} has value {field_value} which is not in the acceptable range of "
                f"{validation_config['range']}"
            )
|
|
|
|
def create_epc_difference_record(self, other, fixed_data, auto_sort: bool = True):
    """
    Build the EPCDifferenceRecord between this record and ``other``.

    :param other: The EPCRecord to compare against.
    :param fixed_data: Extra data appended to the difference record after construction.
    :param auto_sort: Passed through to EPCDifferenceRecord.
    :return: The difference record, with ``fixed_data`` appended.
    :raises ValueError: if ``other`` is not an EPCRecord.
    """
    if isinstance(other, EPCRecord):
        result = EPCDifferenceRecord(record1=self, record2=other, auto_sort=auto_sort)
        result.append_fixed_data(fixed_data)
        return result

    raise ValueError("Can only subtract EPCRecord from EPCRecord")
|
|
|
|
def _require_prepared_epc(self) -> None:
    """
    Guard that the prepared EPC data has been set.

    :raises ValueError: if ``self._prepared_epc`` is ``None``.
    """
    if self._prepared_epc is not None:
        return

    raise ValueError("EPCRecord does not contain prepared EPC data")
|
|
|
|
def __sub__(self, other):
    """
    Return the EPCDifferenceRecord between this record and ``other`` (always auto-sorted).

    Deprecated: use ``create_epc_difference_record`` instead. A ``DeprecationWarning`` is
    emitted on every call.

    :raises ValueError: if ``other`` is not an EPCRecord.
    """
    if not isinstance(other, EPCRecord):
        raise ValueError("Can only subtract EPCRecord from EPCRecord")

    # stacklevel=2 attributes the warning to the code performing the subtraction
    warnings.warn(
        "Deprecated method, use create_epc_difference_record instead",
        DeprecationWarning,
        stacklevel=2,
    )

    return EPCDifferenceRecord(record1=self, record2=other, auto_sort=True)
|
|
|
|
def __gt__(self, other):
    """
    Return True when this record's RDSAP_RESPONSE value is strictly greater than ``other``'s.

    :raises ValueError: if ``other`` is not an EPCRecord.
    """
    if isinstance(other, EPCRecord):
        return vars(self)[RDSAP_RESPONSE] > vars(other)[RDSAP_RESPONSE]

    raise ValueError("Can only compare EPCRecord to EPCRecord")
|
|
|
|
def __ge__(self, other):
    """
    Return True when this record's RDSAP_RESPONSE value is greater than or equal to ``other``'s.

    :raises ValueError: if ``other`` is not an EPCRecord.
    """
    if isinstance(other, EPCRecord):
        return vars(self)[RDSAP_RESPONSE] >= vars(other)[RDSAP_RESPONSE]

    raise ValueError("Can only compare EPCRecord to EPCRecord")
|
|
|
|
def __lt__(self, other):
    """
    Return True when this record's RDSAP_RESPONSE value is strictly less than ``other``'s.

    :raises ValueError: if ``other`` is not an EPCRecord.
    """
    if isinstance(other, EPCRecord):
        return vars(self)[RDSAP_RESPONSE] < vars(other)[RDSAP_RESPONSE]

    raise ValueError("Can only compare EPCRecord to EPCRecord")
|
|
|
|
def __le__(self, other):
    """
    Return True when this record's RDSAP_RESPONSE value is less than or equal to ``other``'s.

    :raises ValueError: if ``other`` is not an EPCRecord.
    """
    if isinstance(other, EPCRecord):
        return vars(self)[RDSAP_RESPONSE] <= vars(other)[RDSAP_RESPONSE]

    raise ValueError("Can only compare EPCRecord to EPCRecord")
|
|
|
|
def to_dict(
    self,
    case: Literal["snake", "kebab"] = "kebab",
    source: Literal["prepared", "attributes"] = "prepared",
) -> dict[str, Any]:
    """
    Export the record as a dictionary.

    :param case: "snake" replaces ``-`` with ``_`` in every key; "kebab" replaces ``_`` with
        ``-``. Any other value returns the keys unchanged.
    :param source: "prepared" exports a shallow copy of the prepared EPC; "attributes" exports
        the instance attributes whose names do not start with an underscore.
    :return: A new dictionary with the re-cased keys.
    :raises ValueError: if ``source`` is unknown, or "prepared" is requested while the
        prepared EPC is ``None``.
    """
    if source == "prepared":
        if self._prepared_epc is None:
            raise ValueError("Prepared EPC not available")
        data = self._prepared_epc.copy()
    elif source == "attributes":
        data = {
            name: value
            for name, value in vars(self).items()
            if not name.startswith("_")
        }
    else:
        raise ValueError(f"Unknown source: {source}")

    # Pick the separator swap for the requested key style
    if case == "snake":
        old_sep, new_sep = "-", "_"
    elif case == "kebab":
        old_sep, new_sep = "_", "-"
    else:
        return data

    return {name.replace(old_sep, new_sep): value for name, value in data.items()}
|
|
|
|
def get(
    self,
    key: str | list[str],
    return_asdict: bool = False,
    key_suffix: str | None = None,
) -> PreparedEpcValue | list[PreparedEpcValue] | dict[str, PreparedEpcValue]:
    """
    Look up one or more instance attributes by name.

    Values are read from the instance's attribute dictionary; a name that is not set yields
    ``None`` rather than raising.

    :param key: A single attribute name (str) or a list of attribute names.
    :param return_asdict: If True and key is a list, return a name -> value dictionary instead
        of a list of values.
    :param key_suffix: Optional suffix appended to every key of the returned dictionary. Only
        used when return_asdict is True.
    :return: A single value if key is a string; a list of values if key is a list and
        return_asdict is False; a dictionary if key is a list and return_asdict is True.
    :raises TypeError: if key is neither a string nor a list.
    """
    attributes = vars(self)

    if isinstance(key, str):
        return attributes.get(key)

    if not isinstance(key, list):
        raise TypeError(f"Key {key} is not a recognised type")

    if not return_asdict:
        return [attributes.get(name) for name in key]

    found = {name: attributes.get(name) for name in key}

    if key_suffix:
        found = {f"{name}{key_suffix}": value for name, value in found.items()}

    return found
|
|
|
|
@property
def prepared_epc(self):
    """
    Deprecated accessor for the prepared EPC data.

    Emits a ``DeprecationWarning`` on every access and returns ``self._prepared_epc``.
    """
    deprecation_message = (
        "Accessing prepared_epc directly is deprecated, use get method instead"
    )
    # stacklevel=2 attributes the warning to the code reading the property
    warnings.warn(deprecation_message, DeprecationWarning, stacklevel=2)

    return self._prepared_epc
|
|
|
|
|
|
class EPCDifferenceRecord:
    """
    Base class for the difference between two EPC records

    ``record1`` is treated as the starting record and ``record2`` as the ending record. The
    computed values are stored in ``self.difference_record`` (a plain dictionary).
    """

    def __init__(self, record1: "EPCRecord", record2: "EPCRecord", auto_sort: bool = False) -> None:
        """
        This method will initialise the EPCDifferenceRecord

        Default usage is for record2 to have the higher RDSAP score.

        :param record1: Starting record.
        :param record2: Ending record.
        :param auto_sort: When True, the two records are swapped if ``record2 <= record1``, so
            that record2 ends up being the greater record. Note the swap also happens when
            the two compare equal.
        """
        self.record1 = record1
        self.record2 = record2
        # Chosen by lodgement date, independently of any auto_sort swap below
        self.earliest_record = (
            record1 if record1.lodgement_date < record2.lodgement_date else record2
        )
        self.flag_fabric_consistency = False
        self.difference_record = {}

        self.difference_validation_configuration = (
            EPCDifferenceRecordValidationConfiguration
        )
        self.fixed_data_validation_configuration = (
            EPCDifferenceRecordFixedDataValidationConfiguration
        )

        if auto_sort and (self.record2 <= self.record1):
            self.record1, self.record2 = self.record2, self.record1

        self._construct_difference_record()
        self._validate_difference_record()
        # self._detect_fabric_consistency()

    def _construct_difference_record(self) -> None:
        """
        This method will construct the difference record between the two records

        The result holds the changes (record2 minus record1) in the RdSAP, heat demand and
        carbon responses, the starting/ending values of those responses and of the cost
        fields, the "potential" fields of the earliest record, and every component variable
        (plus ``lodgement_date``) of both records under ``_starting`` / ``_ending`` suffixes.
        """
        starting, ending = self.record1, self.record2

        rdsap_change = ending.get(RDSAP_RESPONSE) - starting.get(RDSAP_RESPONSE)
        heat_demand_change = ending.get(HEAT_DEMAND_RESPONSE) - starting.get(
            HEAT_DEMAND_RESPONSE
        )
        carbon_change = ending.get(CARBON_RESPONSE) - starting.get(CARBON_RESPONSE)

        component_variables = (
            COMPONENT_FEATURES
            + EFFICIENCY_FEATURES
            + ROOM_FEATURES
            + POST_SAP10_FEATURE
        )
        ending_record = ending.get(
            component_variables + ["lodgement_date"],
            return_asdict=True,
            key_suffix="_ending",
        )
        starting_record = starting.get(
            component_variables + ["lodgement_date"],
            return_asdict=True,
            key_suffix="_starting",
        )

        self.difference_record = {
            "uprn": starting.get("uprn"),
            "rdsap_change": rdsap_change,
            "heat_demand_change": heat_demand_change,
            "carbon_change": carbon_change,
            "sap_starting": starting.get(RDSAP_RESPONSE),
            "sap_ending": ending.get(RDSAP_RESPONSE),
            "heat_demand_starting": starting.get(HEAT_DEMAND_RESPONSE),
            "heat_demand_ending": ending.get(HEAT_DEMAND_RESPONSE),
            "carbon_starting": starting.get(CARBON_RESPONSE),
            "carbon_ending": ending.get(CARBON_RESPONSE),
            "lighting_cost_starting": starting.get("lighting_cost_current"),
            "lighting_cost_ending": ending.get("lighting_cost_current"),
            "heating_cost_starting": starting.get("heating_cost_current"),
            "heating_cost_ending": ending.get("heating_cost_current"),
            "hot_water_cost_starting": starting.get("hot_water_cost_current"),
            "hot_water_cost_ending": ending.get("hot_water_cost_current"),
            "potential_energy_efficiency": self.earliest_record.get(
                "potential_energy_efficiency"
            ),
            "environment_impact_potential": self.earliest_record.get(
                "environment_impact_potential"
            ),
            "energy_consumption_potential": self.earliest_record.get(
                "energy_consumption_potential"
            ),
            "co2_emissions_potential": self.earliest_record.get(
                "co2_emissions_potential"
            ),
            **ending_record,
            **starting_record,
        }

    def _validate_difference_record(self):
        """
        This method will validate the difference record

        Currently a placeholder: no validation is performed.
        """
        pass

    def compare_fields_in_records(self, fields: List[str]):
        """
        This method will compare the records, for specific fields

        :param fields: Names of the fields to compare via each record's ``get``.
        :return: True when every listed field is equal in both records (also for an empty
            list), False as soon as one differs.
        """
        for field in fields:
            if self.record1.get(field) != self.record2.get(field):
                return False

        return True

    def get(self, key: str) -> "PreparedEpcValue":
        """
        This method will return the value of the key

        :param key: Key of the difference record.
        :return: The stored value, or ``None`` when the key is absent.
        """
        return self.difference_record.get(key)

    def append_fixed_data(self, fixed_data: dict) -> None:
        """
        This method will append fixed data to the difference record

        Keys already present in the difference record are overwritten.
        """
        self._validate_fixed_data(fixed_data)
        self.difference_record.update(fixed_data)

    def _validate_fixed_data(self, fixed_data: dict) -> None:
        """
        This method will validate the fixed data

        Currently a placeholder: no validation is performed.
        """

        # Can have more sophisticated checks here
        # self.fixed_data_validation_configuration

        pass

    @staticmethod
    def _is_within_tolerance(value1, value2, tolerance: float = 0.05) -> bool:
        """
        Return True when value1 deviates from value2 by less than ``tolerance``, relative to value2.
        """
        if value1 is None or value2 is None:
            # Missing on both sides counts as unchanged, mirroring the == checks on other fields
            return value1 is value2

        if value2 == 0:
            # A relative deviation against zero is undefined; only another zero counts as unchanged
            return bool(value1 == 0)

        return bool(abs(1 - (value1 / value2)) < tolerance)

    def ensure_adequate_data(self) -> bool:
        """
        This method will ensure that the difference record has adequate data, to keep record, even if rdsap change is
        zero
        Can move into the initiation of the difference record

        :return: True when the walls, floor, roof, main heating, windows, solar water heating,
            solar PV, heating controls and extension count are identical in both records AND
            the floor height and total floor area of record1 are each within 5% of record2's.
            A missing or zero measurement never raises; it simply fails the check unless both
            records agree.
        """
        unchanged_attributes = (
            "walls_description",
            "floor_description",
            "roof_description",
            "mainheat_description",
            "windows_description",
            "solar_water_heating_flag",
            "photo_supply",
            "mainheatcont_description",
            "extension_count",
        )
        # Numeric measurements are allowed a small relative deviation between surveys
        tolerant_attributes = ("floor_height", "total_floor_area")

        checks = [
            getattr(self.record1, name) == getattr(self.record2, name)
            for name in unchanged_attributes
        ]
        checks.extend(
            self._is_within_tolerance(
                getattr(self.record1, name), getattr(self.record2, name)
            )
            for name in tolerant_attributes
        )

        return all(checks)
|