Merge pull request #271 from Hestia-Homes/new-etl-tests

New etl tests
This commit is contained in:
KhalimCK 2024-01-16 19:16:17 +00:00 committed by GitHub
commit 255bfc182d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 371 additions and 616 deletions

View file

@ -7,7 +7,8 @@ import pandas as pd
from etl.epc.DataProcessor import EPCDataProcessor
from etl.epc.Dataset import TrainingDataset
from etl.epc.settings import LATEST_FIELD, MANDATORY_FIXED_FEATURES, POTENTIAL_COLUMNS, EFFICIENCY_FEATURES, BUILT_FORM_REMAP
from etl.epc.settings import LATEST_FIELD, MANDATORY_FIXED_FEATURES, POTENTIAL_COLUMNS, EFFICIENCY_FEATURES, \
BUILT_FORM_REMAP
from etl.epc_clean.epc_attributes.all_cleaners import all_cleaner_map
from etl.solar.SolarPhotoSupply import SolarPhotoSupply
from utils.logger import setup_logger
@ -18,7 +19,6 @@ from recommendations.recommendation_utils import (
estimate_perimeter, get_wall_type, estimate_external_wall_area, esimtate_pitched_roof_area, estimate_windows
)
ENVIRONMENT = os.environ.get('ENVIRONMENT', 'dev')
DATA_BUCKET = os.environ.get('DATA_BUCKET', 'retrofit-data-dev' if ENVIRONMENT == 'dev' else None)
@ -49,8 +49,9 @@ class Property(Definitions):
lighting = None
spatial = None
base_difference_record = None
def __init__(self, id, postcode, address, epc_record, data=None):
def __init__(self, id, postcode, address, epc_record):
self.epc_record = epc_record
@ -58,7 +59,7 @@ class Property(Definitions):
self.address = address
self.postcode = postcode
self.data = {k.replace("_", "-"): v for k,v in epc_record.get("prepared_epc").items()}
self.data = {k.replace("_", "-"): v for k, v in epc_record.get("prepared_epc").items()}
self.old_data = epc_record.get("old_data")
self.property_dimensions = None
@ -135,7 +136,7 @@ class Property(Definitions):
print("NEED TO CHANGE THE DASH TO LOWER CASE")
fixed_data_col_names = [x.lower().replace("_", "-") for x in fixed_data_col_names]
fixed_data = {k.replace("-", "_"):v for k,v in self.data.items() if k in fixed_data_col_names}
fixed_data = {k.replace("-", "_"): v for k, v in self.data.items() if k in fixed_data_col_names}
difference_record.append_fixed_data(fixed_data)
@ -143,28 +144,28 @@ class Property(Definitions):
# TODO: adjust the base difference record with the previously calculated u values + features
# estimated_perimeter is different to the perimeter in the epc record
# self.base_difference_record.df
def adjust_difference_record_with_recommendations(self, property_recommendations):
"""
This method will adjust the difference record, based on the recommendations made for the property
:param recommendations: dictionary of recommendations for the property
:return:
:param property_recommendations: dictionary of recommendations for the property
"""
self.recommendations_scoring_data = []
for recommendations_by_type in property_recommendations:
for i, rec in enumerate(recommendations_by_type):
recommendation_record = self.base_difference_record.df.to_dict("records")[0].copy()
scoring_dict = self.create_recommendation_scoring_data(
recommendation=rec,
property_id=self.id, recommendation_record=recommendation_record, recommendation=rec,
)
scoring_dict['id'] = "+".join([str(self.id), str(rec["recommendation_id"])])
self.recommendations_scoring_data.append(scoring_dict)
def create_recommendation_scoring_data(self, recommendation: dict):
recommendation_record = self.base_difference_record.df.to_dict("records")[0].copy()
@staticmethod
def create_recommendation_scoring_data(property_id, recommendation_record, recommendation: dict):
for col in [
"walls_insulation_thickness", "floor_insulation_thickness", "roof_insulation_thickness"
@ -180,41 +181,100 @@ class Property(Definitions):
recommendation_record["walls_insulation_thickness_ending"] = "above average"
recommendation_record["walls_energy_eff_ending"] = "Good"
else:
wind_turbine_count = int(wind_turbine_count)
if recommendation_record["walls_thermal_transmittance_ending"] is None:
raise ValueError("We should not have a None value for the u value")
self.wind_turbine = {
"wind_turbine": wind_turbine_count,
}
if recommendation_record["walls_insulation_thickness_ending"] is None:
recommendation_record["walls_insulation_thickness_ending"] = "none"
def set_count_variables(self):
# Update description to indicate it's insulated
if recommendation["type"] in ["solid_floor_insulation", "suspended_floor_insulation",
"exposed_floor_insulation"]:
if len(recommendation["parts"]) > 1:
raise NotImplementedError("Have more than 1 floor insulation part - handle this case")
"""
For EPC fields that are just counts, we'll set them here
These are fields that are integers but may contain additional values such as "" so we can't do a direct
conversion straight to an integer
:return:
"""
recommendation_record["floor_thermal_transmittance_ending"] = recommendation["new_u_value"]
# We don't really see above average for this in the training data
recommendation_record["floor_insulation_thickness_ending"] = "average"
recommendation_record["floor_energy_eff_ending"] = "Good"
else:
if recommendation_record["floor_thermal_transmittance_ending"] is None:
raise ValueError("We should not have a None value for the u value")
fields = {
"number_of_open_fireplaces": "number-open-fireplaces",
"number_of_extensions": "extension-count",
"number_of_storeys": "flat-storey-count",
"number_of_rooms": "number-habitable-rooms",
}
if recommendation_record["floor_insulation_thickness_ending"] is None:
recommendation_record["floor_insulation_thickness_ending"] = "none"
null_attributes = ["number_of_storeys", "number_of_rooms"]
if recommendation["type"] in ["loft_insulation", "room_roof_insulation", "flat_roof_insulation"]:
recommendation_record["roof_thermal_transmittance_ending"] = recommendation["new_u_value"]
for attribute, epc_field in fields.items():
value = self.data["extension-count"]
if value == "" or value in self.DATA_ANOMALY_MATCHES:
if attribute in null_attributes:
value = None
else:
value = 0
parts = recommendation["parts"]
if len(parts) != 1:
raise ValueError("More than one part for roof insulation - investiage me")
# This is based on the values we have in the training data
valid_numeric_values = [
12, 25, 50, 75, 100, 150, 200, 250, 270, 300, 350, 400
]
proposed_depth = int(parts[0]["depth"])
if proposed_depth not in valid_numeric_values:
# Take the nearest value for scoring
proposed_depth = min(valid_numeric_values, key=lambda x: abs(x - proposed_depth))
recommendation_record["roof_insulation_thickness_ending"] = str(proposed_depth)
recommendation_record["roof_energy_eff_ending"] = "Very Good"
else:
# Fill missing roof u-values - this fill is not based on recommended upgrades
if recommendation_record["roof_thermal_transmittance_ending"] is None:
raise ValueError("We should not have a None value for the u value")
if recommendation_record["roof_insulation_thickness_ending"] is None:
recommendation_record["roof_insulation_thickness_ending"] = "none"
if recommendation["type"] == "mechanical_ventilation":
recommendation_record["mechanical_ventilation_ending"] = 'mechanical, extract only'
if recommendation["type"] == "sealing_open_fireplace":
recommendation_record["number_open_fireplaces_ending"] = 0
if recommendation["type"] == "low_energy_lighting":
recommendation_record["low_energy_lighting_ending"] = 100
recommendation_record["lighting_energy_eff_starting"] = "Very Good"
if recommendation["type"] == "windows_glazing":
recommendation_record["multi_glaze_proportion_ending"] = 100
recommendation_record["windows_energy_eff_ending"] = "Average"
is_secondary_glazing = recommendation["is_secondary_glazing"]
if recommendation_record["glazing_type_ending"] == "multiple":
pass
elif recommendation_record["glazing_type_ending"] == "single":
recommendation_record["glazing_type_ending"] = "secondary" if is_secondary_glazing else "double"
elif recommendation_record["glazing_type_ending"] == "double":
recommendation_record["glazing_type_ending"] = "multiple" if is_secondary_glazing else "double"
elif recommendation_record["glazing_type_ending"] == "secondary":
recommendation_record["glazing_type_ending"] = "secondary" if is_secondary_glazing else "multiple"
elif recommendation_record["glazing_type_ending"] in ["triple", "high performance"]:
recommendation_record["glazing_type_ending"] = "multiple"
else:
value = int(value)
raise ValueError("Invalid glazing type - implement me")
setattr(self, attribute, value)
if recommendation["type"] == "solar_pv":
recommendation_record["photo_supply_ending"] = recommendation["photo_supply"]
if recommendation["type"] not in [
"mechanical_ventilation", "sealing_open_fireplace", "low_energy_lighting",
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation",
"loft_insulation", "room_roof_insulation", "flat_roof_insulation",
"solid_floor_insulation", "suspended_floor_insulation", "exposed_floor_insulation",
"windows_glazing", "solar_pv"
]:
raise NotImplementedError("Implement me")
recommendation_record['id'] = "+".join([str(property_id), str(recommendation["recommendation_id"])])
return recommendation_record
def get_components(self, cleaned, photo_supply_lookup, floor_area_decile_thresholds):
"""
@ -378,9 +438,9 @@ class Property(Definitions):
"floor_height": self.floor_height,
"heat_loss_corridor": self.heat_loss_corridor["heat_loss_corridor"],
"unheated_corridor_length": self.heat_loss_corridor["length"],
"number_of_open_fireplaces": self.number_of_open_fireplaces,
"number_of_extensions": self.number_of_extensions,
"number_of_storeys": self.number_of_storeys,
"number_of_open_fireplaces": self.number_of_open_fireplaces["number_of_open_fireplaces"],
"number_of_extensions": self.number_of_extensions["number_of_extensions"],
"number_of_storeys": self.number_of_storeys["number_of_storeys"],
"mains_gas": self.mains_gas,
"energy_tariff": self.data["energy-tariff"],
"primary_energy_consumption": self.energy["primary_energy_consumption"],
@ -453,6 +513,9 @@ class Property(Definitions):
:return:
"""
# TODO: These functions should work on an EPCRecord object, so that the format is more standardised.
# They could also be added as attributes to the EPC Record
self.perimeter = estimate_perimeter(
self.floor_area / self.number_of_floors, self.number_of_rooms / self.number_of_floors
)
@ -473,7 +536,7 @@ class Property(Definitions):
def set_floor_level(self):
self.floor_level = (
FLOOR_LEVEL_MAP[self.data["floor-level"]] if
self.data["floor-level"] not in self.DATA_ANOMALY_MATCHES and self.data['floor-level'] is not None
self.data["floor-level"] not in self.DATA_ANOMALY_MATCHES and self.data['floor-level'] is not None
else None
)
@ -545,126 +608,6 @@ class Property(Definitions):
return component_data
def get_model_data(self):
    """
    Build the flat dictionary of cleaned property features that is scored by our machine learning models.

    The result merges the cleaned components produced by ``_extract_component`` (walls, roof, floor, fuel,
    heating, heating controls, hot water, windows), a handful of values already stored on the property and the
    raw EPC fields the models expect.

    For future iterations we probably want a single method in DataProcessor that can be shared between the etl
    code and this class.

    :return: dictionary of model data to be scored in the model
    :raises NotImplementedError: if the built form is an anomaly value and the property type has no fallback
    """
    description_cols = ["original_description", "clean_description"]
    insulation_only_cols = ["thermal_transmittance_unit", "is_assumed", "is_valid"]
    insulation_cols_to_rename = ["thermal_transmittance", "insulation_thickness"]

    # A fresh drop list is built for every call so no list object is shared between components
    walls_features = self._extract_component(
        self.walls, insulation_cols_to_rename, insulation_only_cols + description_cols, "walls"
    )
    roof_features = self._extract_component(
        self.roof, insulation_cols_to_rename, insulation_only_cols + description_cols, "roof"
    )
    floor_features = self._extract_component(
        self.floor, insulation_cols_to_rename, insulation_only_cols + description_cols, "floor"
    )
    windows_features = self._extract_component(self.windows, [], description_cols + ["no_data"])
    fuel_features = self._extract_component(
        self.main_fuel, ["tariff_type"], description_cols + ["tariff_type"], "main-fuel"
    )
    heating_features = self._extract_component(self.main_heating, [], description_cols + ["has_assumed"])
    heating_controls_features = self._extract_component(self.main_heating_controls, [], description_cols)
    hotwater_features = self._extract_component(
        self.hotwater, ["tariff_type"], description_cols + ['assumed'], "hotwater"
    )

    # We'll need to clean second heating
    secondheat_description = self.data["secondheat-description"]

    raw_columns = POTENTIAL_COLUMNS + EFFICIENCY_FEATURES + [
        'TRANSACTION_TYPE',
        'ENERGY_TARIFF',
        'PROPERTY_TYPE',
        'UPRN',
        'NUMBER_OPEN_FIREPLACES',
        'MULTI_GLAZE_PROPORTION',
        'MECHANICAL_VENTILATION',
        'PHOTO_SUPPLY',
        'LOW_ENERGY_LIGHTING',
        'SOLAR_WATER_HEATING_FLAG',
        'GLAZED_TYPE',
        'CONSTITUENCY',
        'NUMBER_HEATED_ROOMS',
        'EXTENSION_COUNT',
    ]
    # self.data is keyed by the lower-case, dash-separated version of the model column name
    raw_values = {}
    for column in raw_columns:
        raw_values[column] = self.data[column.lower().replace("_", "-")]

    fallback_built_form = {
        "Flat": "Mid-Terrace",
        "House": "Semi-Detached",
        "Bungalow": "Detached",
        "Maisonette": "Mid-Terrace"
    }
    resolved_built_form = self.data["built-form"]
    if resolved_built_form in self.DATA_ANOMALY_MATCHES:
        # TODO: If built form isn't captured, we use the most common value for that property type - we shall
        # improve this methodology
        resolved_built_form = fallback_built_form.get(self.data["property-type"])
        if not resolved_built_form:
            raise NotImplementedError("Not handled this property type when cleaning built form")

    # Assemble in the same order as before: components, derived values, raw EPC fields, then the overrides
    model_data = {}
    for component_features in (
            walls_features, roof_features, floor_features, fuel_features,
            heating_features, heating_controls_features, hotwater_features, windows_features,
    ):
        model_data.update(component_features)

    model_data["SECONDHEAT_DESCRIPTION"] = secondheat_description
    model_data["DAYS_TO"] = EPCDataProcessor.calculate_days_to(self.data["lodgement-date"])
    model_data["SAP"] = float(self.data["current-energy-efficiency"])
    model_data["CARBON"] = float(self.data["co2-emissions-current"])
    model_data["HEAT_DEMAND"] = float(self.data["energy-consumption-current"])
    model_data["estimated_perimeter"] = self.perimeter
    model_data["CONSTRUCTION_AGE_BAND"] = self.construction_age_band
    model_data["FLOOR_HEIGHT"] = self.floor_height
    model_data["NUMBER_HABITABLE_ROOMS"] = self.number_of_rooms
    model_data["TOTAL_FLOOR_AREA"] = self.floor_area
    model_data["FIXED_LIGHTING_OUTLETS_COUNT"] = self.number_lighting_outlets
    model_data.update(raw_values)
    model_data["BUILT_FORM"] = resolved_built_form
    model_data["POSTCODE"] = self.data["postcode"]

    return model_data
def set_number_lighting_outlets(self, cleaned_property_data):
    """
    Extracts and cleans the estimated number of lighting outlets and stores it on
    ``self.number_lighting_outlets``.

    If the current EPC holds a count it is used directly (as a float). Otherwise the median of the counts found
    in the older EPCs and the full SAP EPC is used, and if none of those hold a count either, the first value of
    ``cleaned_property_data["FIXED_LIGHTING_OUTLETS_COUNT"]`` is rounded and used.

    ``None`` and ``""`` are treated as "missing" for every EPC record, not just the current one, so a ``None``
    count in an older record no longer raises.

    :param cleaned_property_data: object whose "FIXED_LIGHTING_OUTLETS_COUNT" entry exposes ``.values``; it is
        only read when no EPC record holds a count
    :return:
    """
    field = "fixed-lighting-outlets-count"
    missing_values = (None, "")

    current_count = self.data[field]
    if current_count not in missing_values:
        self.number_lighting_outlets = float(current_count)
        return

    # We check old EPCs and the full SAP EPC
    candidate_records = list(self.old_data) if self.old_data is not None else []
    if self.full_sap_epc is not None and len(self.full_sap_epc):
        candidate_records.append(self.full_sap_epc)

    # int(float(...)) keeps the integer result for values such as "12" while also accepting "12.0"
    lighting_data = [
        int(float(record[field])) for record in candidate_records
        if record.get(field) not in missing_values
    ]

    if lighting_data:
        self.number_lighting_outlets = round(np.median(lighting_data))
    else:
        self.number_lighting_outlets = round(cleaned_property_data["FIXED_LIGHTING_OUTLETS_COUNT"].values[0])
def set_adjusted_energy(self, current_adjusted_energy, expected_adjusted_energy):
"""
Stores these values for usage later

View file

@ -2,7 +2,6 @@ from datetime import datetime
import numpy as np
import pandas as pd
from epc_api.client import EpcClient
from etl.epc.Record import EPCRecord
from backend.SearchEpc import SearchEpc
from fastapi import APIRouter, Depends
@ -24,8 +23,8 @@ from backend.app.db.functions.recommendations_functions import (
from backend.app.db.models.portfolio import rating_lookup
from backend.app.dependencies import validate_token
from backend.app.plan.schemas import PlanTriggerRequest
from backend.app.plan.utils import create_recommendation_scoring_data, get_cleaned
from backend.app.utils import epc_to_sap_lower_bound, read_csv_from_s3, read_parquet_from_s3, sap_to_epc
from backend.app.plan.utils import get_cleaned
from backend.app.utils import epc_to_sap_lower_bound, read_csv_from_s3, sap_to_epc
from backend.ml_models.api import ModelApi
from backend.Property import Property
@ -53,7 +52,6 @@ router = APIRouter(
responses={404: {"description": "Not found"}}
)
# TODO: Need to install base.txt requirements into new env
@router.post("/trigger")
async def trigger_plan(body: PlanTriggerRequest):
@ -64,8 +62,10 @@ async def trigger_plan(body: PlanTriggerRequest):
try:
session.begin()
logger.info("Getting the inputs")
epc_client = EpcClient(auth_token=get_settings().EPC_AUTH_TOKEN)
plan_input = read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path)
cleaning_data = read_dataframe_from_s3_parquet(
bucket_name=get_settings().DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet",
)
input_properties = []
@ -95,26 +95,25 @@ async def trigger_plan(body: PlanTriggerRequest):
heat_demand_target=None
)
epc_records ={
epc_records = {
'original_epc': epc_searcher.newest_epc,
'full_sap_epc': epc_searcher.full_sap_epc,
'old_data': epc_searcher.old_data,
'old_data': epc_searcher.older_epcs,
}
prepared_epc = EPCRecord(epc_records=epc_records, run_mode="newdata", cleaning_data=cleaning_data) # This uses all the epc records to clean the data
prepared_epc = EPCRecord(epc_records=epc_records, run_mode="newdata",
cleaning_data=cleaning_data) # This uses all the epc records to clean the data
input_properties.append(
Property(
id=property_id,
address1=config['address'],
postcode=config['postcode'],
address=epc_searcher.address_clean,
postcode=epc_searcher.postcode_clean,
epc_record=prepared_epc,
)
)
if not input_properties:
if not input_properties:
return Response(status_code=204)
# The materials data could be cached or local so we don't need to make
@ -127,9 +126,6 @@ async def trigger_plan(body: PlanTriggerRequest):
uprn_filenames = read_dataframe_from_s3_parquet(
bucket_name=get_settings().DATA_BUCKET, file_key="spatial/filename_meta.parquet"
)
cleaning_data = read_dataframe_from_s3_parquet(
bucket_name=get_settings().DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet",
)
photo_supply_lookup, floor_area_decile_thresholds = SolarPhotoSupply.load(bucket=get_settings().DATA_BUCKET)
logger.info("Getting spatial data")
@ -140,7 +136,6 @@ async def trigger_plan(body: PlanTriggerRequest):
recommendations = {}
recommendations_scoring_data = []
property_scoring_data = {}
for p in input_properties:
@ -160,14 +155,15 @@ async def trigger_plan(body: PlanTriggerRequest):
recommendations_scoring_data.extend(p.recommendations_scoring_data)
logger.info("Preparing data for scoring in sap change api")
recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data)
recommendations_scoring_data = recommendations_scoring_data.drop(
columns=["rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending", "carbon_ending"]
)
columns=["rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending",
"carbon_ending"]
)
model_api = ModelApi(portfolio_id=body.portfolio_id, timestamp=created_at)
all_predictions = model_api.predict_all(
df=recommendations_scoring_data,
bucket=get_settings().DATA_BUCKET,
@ -177,6 +173,8 @@ async def trigger_plan(body: PlanTriggerRequest):
"carbon_change_predictions": get_settings().CARBON_PREDICTIONS_BUCKET
}
)
# all_predictions["heat_demand_predictions"]= all_predictions["sap_change_predictions"].copy()
# all_predictions["carbon_change_predictions"] = all_predictions["sap_change_predictions"].copy()
# Insert the predictions into the recommendations and run the optimiser
logger.info("Optimising recommendations")
@ -282,58 +280,26 @@ async def trigger_plan(body: PlanTriggerRequest):
property_instance = [p for p in input_properties if p.id == property_id][0]
property_scoring_datasets = property_scoring_data[property_id]
starting_epc_data = property_scoring_datasets["starting_epc_data"].copy()
ending_epc_data = property_scoring_datasets["ending_epc_data"].copy()
fixed_data = property_scoring_datasets["fixed_data"].copy()
recommendation_record = property_instance.base_difference_record.df.to_dict("records")[0].copy()
scoring_dict = {}
for rec in default_recommendations:
scoring_dict = create_recommendation_scoring_data(
property=property_instance,
recommendation=rec,
starting_epc_data=starting_epc_data,
ending_epc_data=ending_epc_data,
fixed_data=fixed_data,
scoring_dict = Property.create_recommendation_scoring_data(
property_id=property_instance.id,
recommendation_record=recommendation_record,
recommendation=rec
)
# At each iteration, we want to update the ending_epc_data, so in the end, ending_epc_data contains
# all of the updates
# At each iteration, we update the recommendation record with the changes reflected in the
# scoring_dict
for k in scoring_dict.keys():
if k in ending_epc_data.columns:
ending_epc_data[k] = scoring_dict[k]
if k in recommendation_record.keys():
recommendation_record[k] = scoring_dict[k]
combined_recommendations_scoring_data.append(scoring_dict)
# PERFORM SAME STEPS AGAIN - TODO: TO BE REMOVED
combined_recommendations_scoring_data = pd.DataFrame(combined_recommendations_scoring_data)
# Perform the same cleaning as in the model - first clean number of room variables though
combined_recommendations_scoring_data = DataProcessor.apply_averages_cleaning(
data_to_clean=combined_recommendations_scoring_data,
cleaning_data=cleaning_data,
cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'],
colnames=["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"],
)
combined_recommendations_scoring_data = DataProcessor.apply_averages_cleaning(
data_to_clean=combined_recommendations_scoring_data,
cleaning_data=cleaning_data,
cols_to_merge_on=COLUMNS_TO_MERGE_ON + ["LOCAL_AUTHORITY"],
).drop(columns=["LOCAL_AUTHORITY"])
combined_recommendations_scoring_data = DataProcessor.clean_missings_after_description_process(
combined_recommendations_scoring_data,
ignore_cols=[
c for c in combined_recommendations_scoring_data.columns if ("thermal_transmittance" in c) or (
"insulation_thickness" in c) or ("ENERGY_EFF" in c)
]
)
combined_recommendations_scoring_data = DataProcessor.clean_efficiency_variables(
combined_recommendations_scoring_data
)
model_api = ModelApi(portfolio_id=body.portfolio_id, timestamp=created_at)
all_combined_predictions = model_api.predict_all(
df=combined_recommendations_scoring_data,
bucket=get_settings().DATA_BUCKET,
@ -344,6 +310,10 @@ async def trigger_plan(body: PlanTriggerRequest):
}
)
# all_combined_predictions["heat_demand_predictions"]= all_combined_predictions["sap_change_predictions"].copy()
# all_combined_predictions["carbon_change_predictions"] = all_combined_predictions[
# "sap_change_predictions"].copy()
# We update the carbon and heat demand predictions
for property_id, property_recommendations in recommendations.items():
combined_heat_demand = all_combined_predictions["heat_demand_predictions"]
@ -472,11 +442,6 @@ async def trigger_plan(body: PlanTriggerRequest):
update_or_create_property_spatial_details(session, p.uprn, p.spatial)
# TODO: TEMP
if p.data["uprn"] == "":
print("Get rid of me!")
p.data["uprn"] = 0
property_data = p.get_full_property_data()
update_property_data(
session, property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data

View file

@ -25,185 +25,3 @@ def get_cleaned():
cleaned = msgpack.unpackb(cleaned, raw=False)
return cleaned
_WALL_INSULATION_TYPES = ("internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation")
_FLOOR_INSULATION_TYPES = ("solid_floor_insulation", "suspended_floor_insulation", "exposed_floor_insulation")
_ROOF_INSULATION_TYPES = ("loft_insulation", "room_roof_insulation", "flat_roof_insulation")
_SUPPORTED_RECOMMENDATION_TYPES = (
    "mechanical_ventilation", "sealing_open_fireplace", "low_energy_lighting",
    *_WALL_INSULATION_TYPES,
    *_ROOF_INSULATION_TYPES,
    *_FLOOR_INSULATION_TYPES,
    "windows_glazing", "solar_pv",
)

# This is based on the values we have in the training data
_VALID_ROOF_INSULATION_DEPTHS = (12, 25, 50, 75, 100, 150, 200, 250, 270, 300, 350, 400)

# current glazing type -> (type after a standard glazing upgrade, type after a secondary glazing upgrade)
_GLAZING_UPGRADES = {
    "multiple": ("multiple", "multiple"),
    "single": ("double", "secondary"),
    "double": ("double", "multiple"),
    "secondary": ("multiple", "secondary"),
    "triple": ("multiple", "multiple"),
    "high performance": ("multiple", "multiple"),
}


def _estimate_wall_u_value(property):
    """Estimate the wall u-value from the property's cleaned wall attributes and age band."""
    return get_wall_u_value(
        clean_description=property.walls["clean_description"],
        age_band=property.age_band,
        is_granite_or_whinstone=property.walls["is_granite_or_whinstone"],
        is_sandstone_or_limestone=property.walls["is_sandstone_or_limestone"]
    )


def _estimate_floor_u_value(property):
    """Estimate the floor u-value from the property's floor type, dimensions, wall type and age band."""
    return get_floor_u_value(
        floor_type=property.floor_type,
        area=property.floor_area,
        perimeter=property.perimeter,
        wall_type=property.wall_type,
        insulation_thickness=property.floor["insulation_thickness"],
        age_band=property.age_band,
    )


def _estimate_roof_u_value(property):
    """Estimate the roof u-value from the property's cleaned roof attributes and age band."""
    return get_roof_u_value(
        insulation_thickness=property.roof["insulation_thickness"],
        has_dwelling_above=property.roof["has_dwelling_above"],
        is_loft=property.roof["is_loft"],
        is_roof_room=property.roof["is_roof_room"],
        is_thatched=property.roof["is_thatched"],
        age_band=property.age_band,
        is_flat=property.roof["is_flat"],
        is_pitched=property.roof["is_pitched"],
        is_at_rafters=property.roof["is_at_rafters"],
    )


def create_recommendation_scoring_data(
        property: "Property",
        recommendation: dict,
        starting_epc_data: pd.DataFrame,
        ending_epc_data: pd.DataFrame,
        fixed_data: pd.DataFrame,
):
    """
    This wrapper function prepares data to be passed to the sap model api

    The first record of the starting, ending and fixed datasets is merged into a single dictionary, missing
    u-values and insulation thicknesses are filled in, and the "ENDING" fields touched by the recommendation are
    overwritten to reflect the upgrade.

    :param property: the property being scored; ``id``, ``data`` and (only when a u-value is missing) the
        cleaned walls/floor/roof attributes are read from it
    :param recommendation: recommendation dictionary; "type" and "recommendation_id" are always read, the other
        keys ("new_u_value", "parts", "is_secondary_glazing", "photo_supply") depend on the type
    :param starting_epc_data: dataset whose first record holds the starting EPC features
    :param ending_epc_data: dataset whose first record holds the ending EPC features
    :param fixed_data: dataset whose first record holds the features that do not change
    :return: dictionary of features for this property/recommendation pair
    :raises NotImplementedError: for an unsupported recommendation type, or a floor insulation with more than
        one part
    :raises ValueError: for a roof insulation without exactly one part, or an unknown current glazing type
    """
    recommendation_type = recommendation["type"]
    # Fail fast: there is no point preparing a record for a recommendation we cannot represent
    if recommendation_type not in _SUPPORTED_RECOMMENDATION_TYPES:
        raise NotImplementedError("Implement me")

    scoring_dict = {
        "UPRN": property.data["uprn"],
        "id": "+".join([str(property.id), str(recommendation["recommendation_id"])]),
        "LOCAL_AUTHORITY": property.data["local-authority"],
        **starting_epc_data.to_dict("records")[0],
        **ending_epc_data.to_dict("records")[0],
        **fixed_data.to_dict("records")[0]
    }

    u_value_estimators = {
        "walls": _estimate_wall_u_value,
        "floor": _estimate_floor_u_value,
        "roof": _estimate_roof_u_value,
    }

    # Set starting u-values and insulation thicknesses if we don't have them
    for component, estimate_u_value in u_value_estimators.items():
        if scoring_dict[f"{component}_thermal_transmittance"] is None:
            scoring_dict[f"{component}_thermal_transmittance"] = estimate_u_value(property)
    for component in u_value_estimators:
        if scoring_dict[f"{component}_insulation_thickness"] is None:
            scoring_dict[f"{component}_insulation_thickness"] = "none"

    # Walls: the upgrade is to the u-value of the walls and the description of the insulation thickness
    if recommendation_type in _WALL_INSULATION_TYPES:
        scoring_dict["walls_thermal_transmittance_ENDING"] = recommendation["new_u_value"]
        scoring_dict["walls_insulation_thickness_ENDING"] = "above average"
        scoring_dict["WALLS_ENERGY_EFF_ENDING"] = "Good"
    else:
        if scoring_dict["walls_thermal_transmittance_ENDING"] is None:
            scoring_dict["walls_thermal_transmittance_ENDING"] = _estimate_wall_u_value(property)
        if scoring_dict["walls_insulation_thickness_ENDING"] is None:
            scoring_dict["walls_insulation_thickness_ENDING"] = "none"

    # Floor
    if recommendation_type in _FLOOR_INSULATION_TYPES:
        if len(recommendation["parts"]) > 1:
            raise NotImplementedError("Have more than 1 floor insulation part - handle this case")
        scoring_dict["floor_thermal_transmittance_ENDING"] = recommendation["new_u_value"]
        # We don't really see above average for this in the training data
        scoring_dict["floor_insulation_thickness_ENDING"] = "average"
        scoring_dict["FLOOR_ENERGY_EFF_ENDING"] = "Good"
    else:
        if scoring_dict["floor_thermal_transmittance_ENDING"] is None:
            scoring_dict["floor_thermal_transmittance_ENDING"] = _estimate_floor_u_value(property)
        if scoring_dict["floor_insulation_thickness_ENDING"] is None:
            scoring_dict["floor_insulation_thickness_ENDING"] = "none"

    # Roof
    if recommendation_type in _ROOF_INSULATION_TYPES:
        scoring_dict["roof_thermal_transmittance_ENDING"] = recommendation["new_u_value"]
        parts = recommendation["parts"]
        if len(parts) != 1:
            raise ValueError("More than one part for roof insulation - investiage me")
        proposed_depth = int(parts[0]["depth"])
        if proposed_depth not in _VALID_ROOF_INSULATION_DEPTHS:
            # Take the nearest value for scoring
            proposed_depth = min(_VALID_ROOF_INSULATION_DEPTHS, key=lambda depth: abs(depth - proposed_depth))
        scoring_dict["roof_insulation_thickness_ENDING"] = str(proposed_depth)
        scoring_dict["ROOF_ENERGY_EFF_ENDING"] = "Very Good"
    else:
        # Fill missing roof u-values - this fill is not based on recommended upgrades
        if scoring_dict["roof_thermal_transmittance_ENDING"] is None:
            scoring_dict["roof_thermal_transmittance_ENDING"] = _estimate_roof_u_value(property)
        if scoring_dict["roof_insulation_thickness_ENDING"] is None:
            scoring_dict["roof_insulation_thickness_ENDING"] = "none"

    if recommendation_type == "mechanical_ventilation":
        scoring_dict["MECHANICAL_VENTILATION_ENDING"] = 'mechanical, extract only'
    elif recommendation_type == "sealing_open_fireplace":
        scoring_dict["NUMBER_OPEN_FIREPLACES_ENDING"] = 0
    elif recommendation_type == "low_energy_lighting":
        scoring_dict["LOW_ENERGY_LIGHTING_ENDING"] = 100
        # NOTE(review): every other upgrade writes an *_ENDING field; this one writes *_STARTING. Kept as-is
        # because the scoring model may rely on it - confirm whether LIGHTING_ENERGY_EFF_ENDING was intended.
        scoring_dict["LIGHTING_ENERGY_EFF_STARTING"] = "Very Good"
    elif recommendation_type == "windows_glazing":
        scoring_dict["MULTI_GLAZE_PROPORTION_ENDING"] = 100
        scoring_dict["WINDOWS_ENERGY_EFF_ENDING"] = "Average"
        is_secondary_glazing = recommendation["is_secondary_glazing"]
        current_glazing = scoring_dict["glazing_type_ENDING"]
        if current_glazing not in _GLAZING_UPGRADES:
            raise ValueError("Invalid glazing type - implement me")
        standard_result, secondary_result = _GLAZING_UPGRADES[current_glazing]
        scoring_dict["glazing_type_ENDING"] = secondary_result if is_secondary_glazing else standard_result
    elif recommendation_type == "solar_pv":
        scoring_dict["PHOTO_SUPPLY_ENDING"] = recommendation["photo_supply"]

    return scoring_dict

View file

@ -4,7 +4,6 @@ from io import StringIO
import string
import secrets
import logging
import pandas as pd
from io import BytesIO

View file

@ -34,7 +34,8 @@ class BaseDataset:
# raise ValueError(f"Pipeline type {pipeline_type} not found")
# return self.pipeline_steps[pipeline_type]
class TrainingDataset(BaseDataset):
"""
A collection of EPCDifferenceRecords can be combined into a TrainingDataset.
@ -45,7 +46,7 @@ class TrainingDataset(BaseDataset):
# self.pipeline_steps = self.pipeline_factory("training")
self.datasets = datasets
self.df = pd.DataFrame([dataset.difference_record for dataset in datasets])
self._feature_generation()
self._drop_features()
self._clean_efficiency_variables()
@ -91,7 +92,7 @@ class TrainingDataset(BaseDataset):
if row["has_dwelling_above"]:
if row["roof_thermal_transmittance"] != 0:
raise ValueError("Should have 0 u-value for roof")
if row["roof_thermal_transmittance_ending"] != 0:
raise ValueError("Should have 0 u-value for roof")
@ -105,15 +106,16 @@ class TrainingDataset(BaseDataset):
is_pitched=row["is_pitched"],
is_at_rafters=row["is_at_rafters"],
age_band=england_wales_age_band_lookup[row["construction_age_band"]]
)
)
@staticmethod
def _lambda_function_to_generate_wall_uvalue(row, is_end=False):
"""
Using the apply method, use the get_wall_u_value method to generate the u-value
"""
description_col_name = "walls_clean_description" if not is_end else "walls_clean_description_ending"
thermal_transistance_col_name = "walls_thermal_transmittance" if not is_end else "walls_thermal_transmittance_ending"
thermal_transistance_col_name = "walls_thermal_transmittance" if not is_end else \
"walls_thermal_transmittance_ending"
if pd.isnull(row[thermal_transistance_col_name]):
output = get_wall_u_value(
@ -126,7 +128,7 @@ class TrainingDataset(BaseDataset):
output = row[thermal_transistance_col_name]
return output
@staticmethod
def _lambda_function_to_generate_floor_uvalue(row, is_end=False):
"""
@ -146,20 +148,19 @@ class TrainingDataset(BaseDataset):
uvalue = row[floor_thermal_col_name]
if pd.isnull(uvalue):
insulation_col_name = "floor_insulation_thickness" if not is_end else "floor_insulation_thickness_ending"
floor_area_col_name = "estimated_perimeter_starting" if not is_end else "estimated_perimeter_ending"
perimeter_col_name = "total_floor_area_starting" if not is_end else "total_floor_area_ending"
perimeter_col_name = "estimated_perimeter_starting" if not is_end else "estimated_perimeter_ending"
floor_area_col_name = "ground_floor_area_starting" if not is_end else "ground_floor_area_ending"
uvalue = get_floor_u_value(
floor_type=row["floor_type"],
perimeter=row[floor_area_col_name],
area=row[perimeter_col_name],
insulation_thickness=row[insulation_col_name],
wall_type=row["wall_type"],
age_band=england_wales_age_band_lookup[row["construction_age_band"]]
)
floor_type=row["floor_type"],
perimeter=row[perimeter_col_name],
area=row[floor_area_col_name],
insulation_thickness=row[insulation_col_name],
wall_type=row["wall_type"],
age_band=england_wales_age_band_lookup[row["construction_age_band"]]
)
return uvalue
def _generate_u_values_from_features(self):
@ -181,13 +182,15 @@ class TrainingDataset(BaseDataset):
)
walls_starting_uvalue = self.df['walls_thermal_transmittance'].fillna(walls_starting_uvalue)
walls_starting_equals_ending_flag = self.df['walls_clean_description'] == self.df["walls_clean_description_ending"]
walls_ending_uvalue[walls_starting_equals_ending_flag] = walls_starting_uvalue[walls_starting_equals_ending_flag]
walls_starting_equals_ending_flag = self.df['walls_clean_description'] == self.df[
"walls_clean_description_ending"]
walls_ending_uvalue[walls_starting_equals_ending_flag] = walls_starting_uvalue[
walls_starting_equals_ending_flag]
# ~~~~~~~~~~~~~~~~~~
# Roof
# ~~~~~~~~~~~~~~~~~~
roof_starting_uvalue = self.df.apply(
lambda row: self._lambda_function_to_generate_roof_uvalue(row),
axis=1
@ -200,7 +203,6 @@ class TrainingDataset(BaseDataset):
roof_starting_uvalue = self.df['roof_thermal_transmittance'].fillna(roof_starting_uvalue)
roof_ending_uvalue = self.df['roof_thermal_transmittance_ending'].fillna(roof_ending_uvalue)
# ~~~~~~~~~~~~~~~~~~
# Floor
# ~~~~~~~~~~~~~~~~~~
@ -210,29 +212,40 @@ class TrainingDataset(BaseDataset):
axis=1
)
self.df["ground_floor_area_starting"] = (
self.df["total_floor_area_starting"] / self.df['estimated_number_of_floors']
)
self.df["ground_floor_area_ending"] = (
self.df["total_floor_area_ending"] / self.df['estimated_number_of_floors']
)
self.df['estimated_perimeter_starting'] = self.df.apply(
lambda row: estimate_perimeter(row["total_floor_area_starting"]/ row['estimated_number_of_floors'], row["number_habitable_rooms"]/ row['estimated_number_of_floors']),
lambda row: estimate_perimeter(
row["ground_floor_area_starting"], row["number_habitable_rooms"] / row['estimated_number_of_floors']
),
axis=1
)
# The ending perimeter must be estimated from the *ending* ground floor area. The previous
# statement reused "ground_floor_area_starting" (copied from the starting-perimeter statement),
# which left the "ground_floor_area_ending" column computed above unused for this estimate.
self.df['estimated_perimeter_ending'] = self.df.apply(
    lambda row: estimate_perimeter(
        row["ground_floor_area_ending"], row["number_habitable_rooms"] / row['estimated_number_of_floors']
    ),
    axis=1
)
self.df["floor_type"] = self.df["is_suspended"].replace({True: "suspended", False: "solid"})
self.df["wall_type"] = self.df.apply(
lambda row: get_wall_type(
is_cavity_wall=row["is_cavity_wall"],
is_solid_brick=row["is_solid_brick"],
is_timber_frame=row["is_timber_frame"],
is_granite_or_whinstone=row["is_granite_or_whinstone"],
is_cob=row["is_cob"],
is_cavity_wall=row["is_cavity_wall"],
is_solid_brick=row["is_solid_brick"],
is_timber_frame=row["is_timber_frame"],
is_granite_or_whinstone=row["is_granite_or_whinstone"],
is_cob=row["is_cob"],
is_sandstone_or_limestone=row["is_sandstone_or_limestone"],
is_system_built=row["is_system_built"],
is_park_home=row["is_park_home"]
),
),
axis=1
)
floor_starting_uvalue = self.df.apply(
lambda row: self._lambda_function_to_generate_floor_uvalue(row),
axis=1
@ -246,19 +259,21 @@ class TrainingDataset(BaseDataset):
floor_ending_uvalue = self.df['floor_thermal_transmittance_ending'].fillna(floor_ending_uvalue)
for component in ["walls", "roof", "floor"]:
self.df[f"{component}_thermal_transmittance"] = self.df[f"{component}_thermal_transmittance"].fillna(eval(f"{component}_starting_uvalue"))
self.df[f"{component}_thermal_transmittance_ending"] = self.df[f"{component}_thermal_transmittance_ending"].fillna(eval(f"{component}_ending_uvalue"))
self.df[f"{component}_thermal_transmittance"] = self.df[f"{component}_thermal_transmittance"].fillna(
eval(f"{component}_starting_uvalue"))
self.df[f"{component}_thermal_transmittance_ending"] = self.df[
f"{component}_thermal_transmittance_ending"].fillna(eval(f"{component}_ending_uvalue"))
self.df = self.df.drop(columns=["floor_type", "wall_type", "walls_clean_description", "walls_clean_description_ending", 'estimated_number_of_floors'])
self.df = self.df.drop(
columns=["floor_type", "wall_type", "walls_clean_description", "walls_clean_description_ending",
'estimated_number_of_floors', "ground_floor_area_starting", "ground_floor_area_ending"])
def _adjust_assumed_values_in_wall_descriptions(self):
"""
Strip out assumed values for all wall descriptions
"""
for col in ["walls_clean_description", "walls_clean_description_ending"]:
self.df[col] = self.df[col].str.replace("(assumed)", "").str.rstrip()
self.df[col] = self.df[col].str.replace("(assumed)", "", regex=False).str.rstrip()
def _drop_inconsistent_properties(self, expanded_df: pd.DataFrame, component: str):
"""
@ -292,9 +307,8 @@ class TrainingDataset(BaseDataset):
(expanded_df["is_at_rafters"] == expanded_df["is_at_rafters_ending"]) &
(expanded_df["has_dwelling_above"] == expanded_df["has_dwelling_above_ending"])
]
return expanded_df
def _expand_description_to_features(self, cleaned_lookup: dict):
"""
@ -306,7 +320,7 @@ class TrainingDataset(BaseDataset):
# remove this record, as it indicates that the quality of the EPC conducted in the first instance
# is low
# We also replace descriptions with their cleaned variants
"""
"""
cols_to_drop = {
"walls": [
@ -361,9 +375,9 @@ class TrainingDataset(BaseDataset):
}
components_to_expand = cols_to_drop.keys()
for component in components_to_expand:
# TODO: change cleaned dataframe to have underscores instead of dashes
if component == "main-fuel":
cleaned_key = "main-fuel"
@ -377,7 +391,7 @@ class TrainingDataset(BaseDataset):
original_cols = [f"{component}_description_starting", f"{component}_description_ending"]
cleaned_lookup_df_for_key = pd.DataFrame(cleaned_lookup[cleaned_key])
expanded_df = self.df.merge(
cleaned_lookup_df_for_key,
how="left",
@ -393,7 +407,7 @@ class TrainingDataset(BaseDataset):
# Drop properties where key material types have changed
expanded_df = self._drop_inconsistent_properties(expanded_df, component)
# Drop original cols and cols to drop
expanded_df = expanded_df.drop(columns=cols_to_drop[component] + original_cols)
@ -411,11 +425,10 @@ class TrainingDataset(BaseDataset):
}
)
self.df = expanded_df
# We don't need any lighting specific cleaning, we just drop the original description as we use
# LOW_ENERGY_LIGHTING_STARTING, LOW_ENERGY_LIGHTING_ENDING
self.df = self.df.drop(columns=["lighting_description_starting", "lighting_description_ending"])
def _clean_missing_values(self, ignore_cols=None):
missings = pd.isnull(self.df).sum()
@ -433,7 +446,6 @@ class TrainingDataset(BaseDataset):
else:
self.df[col] = self.df[col].fillna("Unknown")
def _null_validation(self, information: str):
print(f"Null validation after {information}")
if pd.isnull(self.df).sum().sum():
@ -445,7 +457,6 @@ class TrainingDataset(BaseDataset):
"""
self.df = self.df.drop(columns=["lodgement_date_starting", "lodgement_date_ending"])
def _feature_generation(self):
"""
Generate features for modelling
@ -469,16 +480,15 @@ class TrainingDataset(BaseDataset):
missings = missings[missings >= 1]
if len(missings) == 0:
return
return
# Make sure they are all efficiency columns
# Make sure they are all efficiency columns
if any(~missings.index.str.contains("energy_eff")):
raise ValueError("Non efficiency columns are missing")
for m in missings.index:
self.df[m] = self.df[m].fillna("NO_RATING")
@staticmethod
def _calculate_days_to(lodgement_date):
@ -495,7 +505,7 @@ class TrainingDataset(BaseDataset):
# if not isinstance(other, TrainingDataset):
# raise TypeError("Addition can only be performed with another instance of TrainingDataset")
# return TrainingDataset(self.datasets + other.datasets)
# def __radd__(self, other):
# """
# Required for sum() to work
@ -505,6 +515,7 @@ class TrainingDataset(BaseDataset):
# else:
# return self.__add__(other)
class NewDataset(BaseDataset):
"""
A collection of EPCDifferenceRecords can be combined into a ScoringDataset.
@ -518,7 +529,7 @@ class NewDataset(BaseDataset):
if not isinstance(other, NewDataset):
raise TypeError("Addition can only be performed with another instance of ScoringDataset")
return NewDataset(self.datasets + other.datasets)
def __radd__(self, other):
"""
Required for sum() to work
@ -526,4 +537,4 @@ class NewDataset(BaseDataset):
if isinstance(other, int):
return self
else:
return self.__add__(other)
return self.__add__(other)

View file

@ -1,8 +1,8 @@
from datetime import datetime
from dataclasses import dataclass
from etl.epc.ValidationConfiguration import (
EPCRecordValidationConfiguration,
EPCDifferenceRecordValidationConfiguration,
EPCRecordValidationConfiguration,
EPCDifferenceRecordValidationConfiguration,
EPCDifferenceRecordFixedDataValidationConfiguration
)
from etl.epc.DataProcessor import EPCDataProcessor
@ -33,6 +33,7 @@ EFFICIENCY_FEATURES = [x.lower() for x in EFFICIENCY_FEATURES]
ENVIRONMENT = os.environ.get('ENVIRONMENT', 'dev')
DATA_BUCKET = os.environ.get('DATA_BUCKET', 'retrofit-data-dev' if ENVIRONMENT == 'dev' else None)
@dataclass
class EPCRecord:
"""
@ -41,44 +42,44 @@ class EPCRecord:
uprn: int = None
walls_description: str = None
floor_description : str = None
lighting_description : str = None
roof_description : str = None
mainheat_description : str = None
hotwater_description : str = None
main_fuel : str = None
mechanical_ventilation : str = None
secondheat_description : str = None
windows_description : str = None
glazed_type : str = None
multi_glaze_proportion : float = None
low_energy_lighting : float = None
number_open_fireplaces : float = None
mainheatcont_description : str = None
solar_water_heating_flag : str = None
photo_supply : float = None
transaction_type : str = None
energy_tariff : str = None
extension_count : float = None
total_floor_area : float = None
floor_height : float = None
hot_water_energy_eff : str = None
floor_energy_eff : str = None
windows_energy_eff : str = None
walls_energy_eff : str = None
sheating_energy_eff : str = None
roof_energy_eff : str = None
mainheat_energy_eff : str = None
mainheatc_energy_eff : str = None
lighting_energy_eff : str = None
potential_energy_efficiency : float = None
environment_impact_potential : float = None
energy_consumption_potential : float = None
co2_emissions_potential : float = None
lodgement_date : str = None
current_energy_efficiency : int = None
energy_consumption_current : int = None
co2_emissions_current : float = None
floor_description: str = None
lighting_description: str = None
roof_description: str = None
mainheat_description: str = None
hotwater_description: str = None
main_fuel: str = None
mechanical_ventilation: str = None
secondheat_description: str = None
windows_description: str = None
glazed_type: str = None
multi_glaze_proportion: float = None
low_energy_lighting: float = None
number_open_fireplaces: float = None
mainheatcont_description: str = None
solar_water_heating_flag: str = None
photo_supply: float = None
transaction_type: str = None
energy_tariff: str = None
extension_count: float = None
total_floor_area: float = None
floor_height: float = None
hot_water_energy_eff: str = None
floor_energy_eff: str = None
windows_energy_eff: str = None
walls_energy_eff: str = None
sheating_energy_eff: str = None
roof_energy_eff: str = None
mainheat_energy_eff: str = None
mainheatc_energy_eff: str = None
lighting_energy_eff: str = None
potential_energy_efficiency: float = None
environment_impact_potential: float = None
energy_consumption_potential: float = None
co2_emissions_potential: float = None
lodgement_date: str = None
current_energy_efficiency: int = None
energy_consumption_current: int = None
co2_emissions_current: float = None
# u_values_walls = None
# u_values_roof = None
@ -107,7 +108,7 @@ class EPCRecord:
# self.WALLS_DESCRIPTION = 'check'
# Could also have cleaning of records if needed
if self.run_mode == "training":
if self.run_mode == "training":
self.validation_configuration = EPCRecordValidationConfiguration
# self._field_validation()
return
@ -115,7 +116,7 @@ class EPCRecord:
# We are running in newdata mode
if self.epc_records is None:
raise ValueError("Must provide epc records if running in newdata mode")
self.prepared_epc = self.epc_records['original_epc']
self.original_epc = self.epc_records['original_epc'].copy()
@ -123,8 +124,8 @@ class EPCRecord:
self.old_data = self.epc_records['old_data']
if self.cleaning_data is None:
raise ValueError("Must provide cleaning data if running in newdata mode")
raise ValueError("Must provide cleaning data if running in newdata mode")
self._clean_records_using_epc_records()
self._clean_with_data_processor()
self._temp_uprn_catch()
@ -175,7 +176,6 @@ class EPCRecord:
pd.to_datetime(lodgement_date) - pd.to_datetime(EARLIEST_EPC_DATE)
).dt.days
def _temp_uprn_catch(self):
"""
Catch the case we do now have uprn
@ -188,15 +188,14 @@ class EPCRecord:
This method will clean the records using the data processor
"""
epc_data_processor = EPCDataProcessor(
data=self.epc_record_as_dataframe("prepared_epc"),
run_mode="newdata",
data=self.epc_record_as_dataframe("prepared_epc"),
run_mode="newdata",
cleaning_averages=self.cleaning_data
)
epc_data_processor.prepare_data()
self.prepared_epc = epc_data_processor.data.to_dict(orient="records")[0]
def _expand_prepared_epc_to_attributes(self):
"""
This method will expand the prepared epc to attributes
@ -207,44 +206,44 @@ class EPCRecord:
self.uprn: int = int(self.prepared_epc["uprn"])
self.walls_description: str = self.prepared_epc["walls_description"]
self.floor_description : str = self.prepared_epc["floor_description"]
self.lighting_description : str = self.prepared_epc["lighting_description"]
self.roof_description : str = self.prepared_epc["roof_description"]
self.mainheat_description : str = self.prepared_epc["mainheat_description"]
self.hotwater_description : str = self.prepared_epc["hotwater_description"]
self.main_fuel : str = self.prepared_epc["main_fuel"]
self.mechanical_ventilation : str = self.prepared_epc["mechanical_ventilation"]
self.secondheat_description : str = self.prepared_epc["secondheat_description"]
self.windows_description : str = self.prepared_epc["windows_description"]
self.glazed_type : str = self.prepared_epc["glazed_type"]
self.multi_glaze_proportion : float = float(self.prepared_epc["multi_glaze_proportion"])
self.low_energy_lighting : float = float(self.prepared_epc["low_energy_lighting"])
self.number_open_fireplaces : float = float(self.prepared_epc["number_open_fireplaces"])
self.mainheatcont_description : str = self.prepared_epc["mainheatcont_description"]
self.solar_water_heating_flag : str = self.prepared_epc["solar_water_heating_flag"]
self.photo_supply : float = float(self.prepared_epc["photo_supply"])
self.transaction_type : str = self.prepared_epc["transaction_type"]
self.energy_tariff : str = self.prepared_epc["energy_tariff"]
self.extension_count : float = float(self.prepared_epc["extension_count"])
self.total_floor_area : float = float(self.prepared_epc["total_floor_area"])
self.floor_height : float = float(self.prepared_epc["floor_height"])
self.hot_water_energy_eff : str = self.prepared_epc["hot_water_energy_eff"]
self.floor_energy_eff : str = self.prepared_epc["floor_energy_eff"]
self.windows_energy_eff : str = self.prepared_epc["windows_energy_eff"]
self.walls_energy_eff : str = self.prepared_epc["walls_energy_eff"]
self.sheating_energy_eff : str = self.prepared_epc["sheating_energy_eff"]
self.roof_energy_eff : str = self.prepared_epc["roof_energy_eff"]
self.mainheat_energy_eff : str = self.prepared_epc["mainheat_energy_eff"]
self.mainheatc_energy_eff : str = self.prepared_epc["mainheatc_energy_eff"]
self.lighting_energy_eff : str = self.prepared_epc["lighting_energy_eff"]
self.potential_energy_efficiency : float = float(self.prepared_epc["potential_energy_efficiency"])
self.environment_impact_potential : float = float(self.prepared_epc["environment_impact_potential"])
self.energy_consumption_potential : float = float(self.prepared_epc["energy_consumption_potential"])
self.co2_emissions_potential : float = float(self.prepared_epc["co2_emissions_potential"])
self.lodgement_date : str = self.prepared_epc["lodgement_date"]
self.current_energy_efficiency : int = int(self.prepared_epc["current_energy_efficiency"])
self.energy_consumption_current : int = int(self.prepared_epc["energy_consumption_current"])
self.co2_emissions_current : float = float(self.prepared_epc["co2_emissions_current"])
self.floor_description: str = self.prepared_epc["floor_description"]
self.lighting_description: str = self.prepared_epc["lighting_description"]
self.roof_description: str = self.prepared_epc["roof_description"]
self.mainheat_description: str = self.prepared_epc["mainheat_description"]
self.hotwater_description: str = self.prepared_epc["hotwater_description"]
self.main_fuel: str = self.prepared_epc["main_fuel"]
self.mechanical_ventilation: str = self.prepared_epc["mechanical_ventilation"]
self.secondheat_description: str = self.prepared_epc["secondheat_description"]
self.windows_description: str = self.prepared_epc["windows_description"]
self.glazed_type: str = self.prepared_epc["glazed_type"]
self.multi_glaze_proportion: float = float(self.prepared_epc["multi_glaze_proportion"])
self.low_energy_lighting: float = float(self.prepared_epc["low_energy_lighting"])
self.number_open_fireplaces: float = float(self.prepared_epc["number_open_fireplaces"])
self.mainheatcont_description: str = self.prepared_epc["mainheatcont_description"]
self.solar_water_heating_flag: str = self.prepared_epc["solar_water_heating_flag"]
self.photo_supply: float = float(self.prepared_epc["photo_supply"])
self.transaction_type: str = self.prepared_epc["transaction_type"]
self.energy_tariff: str = self.prepared_epc["energy_tariff"]
self.extension_count: float = float(self.prepared_epc["extension_count"])
self.total_floor_area: float = float(self.prepared_epc["total_floor_area"])
self.floor_height: float = float(self.prepared_epc["floor_height"])
self.hot_water_energy_eff: str = self.prepared_epc["hot_water_energy_eff"]
self.floor_energy_eff: str = self.prepared_epc["floor_energy_eff"]
self.windows_energy_eff: str = self.prepared_epc["windows_energy_eff"]
self.walls_energy_eff: str = self.prepared_epc["walls_energy_eff"]
self.sheating_energy_eff: str = self.prepared_epc["sheating_energy_eff"]
self.roof_energy_eff: str = self.prepared_epc["roof_energy_eff"]
self.mainheat_energy_eff: str = self.prepared_epc["mainheat_energy_eff"]
self.mainheatc_energy_eff: str = self.prepared_epc["mainheatc_energy_eff"]
self.lighting_energy_eff: str = self.prepared_epc["lighting_energy_eff"]
self.potential_energy_efficiency: float = float(self.prepared_epc["potential_energy_efficiency"])
self.environment_impact_potential: float = float(self.prepared_epc["environment_impact_potential"])
self.energy_consumption_potential: float = float(self.prepared_epc["energy_consumption_potential"])
self.co2_emissions_potential: float = float(self.prepared_epc["co2_emissions_potential"])
self.lodgement_date: str = self.prepared_epc["lodgement_date"]
self.current_energy_efficiency: int = int(self.prepared_epc["current_energy_efficiency"])
self.energy_consumption_current: int = int(self.prepared_epc["energy_consumption_current"])
self.co2_emissions_current: float = float(self.prepared_epc["co2_emissions_current"])
def _identify_delta_between_prepared_and_original_records(self):
"""
@ -258,14 +257,13 @@ class EPCRecord:
same_index = df.apply(pd.Series.duplicated).any()
self.prepared_epc_delta_metadata = df[same_index[~same_index].index]
def _expand_description_to_features(self):
pass
def _expand_description_to_uvalues(self):
# TODO: can be loop over all the descriptions, or done in one
pass
# def _process_and_prune(self, cleaned_lookup: dict):
# """
# This method will merge on the cleaned lookup table and ensure that the building fabric in the
@ -283,7 +281,7 @@ class EPCRecord:
# left_on_ending = (
# f"{component}_ending" if component == "main-fuel" else f"{component}_description_ending"
# )
# self.df2 = self.df.merge(
# pd.DataFrame(cleaned_lookup[cleaned_key]),
# how="left",
@ -296,7 +294,6 @@ class EPCRecord:
# right_on="original_description",
# suffixes=("", "_ending")
# )
def _clean_records_using_epc_records(self):
"""
@ -328,20 +325,21 @@ class EPCRecord:
# self._clean_energy_consumption_current()
# self._clean_co2_emissions_current()
def epc_record_as_dataframe(self, epc_type: str = "prepared_epc", use_upper_columns: bool = True, replace_empty_string: bool = False):
def epc_record_as_dataframe(self, epc_type: str = "prepared_epc", use_upper_columns: bool = True,
replace_empty_string: bool = False):
"""
This method will return the dataframe representation of the epc record
"""
df = pd.DataFrame.from_dict(self.get(epc_type), orient="index").T
if use_upper_columns:
df.columns = [x.upper().replace("-","_") for x in df.columns]
df.columns = [x.upper().replace("-", "_") for x in df.columns]
if replace_empty_string:
df = df.replace("", np.nan)
return df
def _clean_floor_level(self):
"""
This method will clean the floor level, if empty or invalid
@ -360,7 +358,7 @@ class EPCRecord:
"""
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
if self.prepared_epc["fixed-lighting-outlets-count"] == "":
# We check old EPCs and the full SAP EPC
@ -380,13 +378,15 @@ class EPCRecord:
if lighting_data:
self.prepared_epc["fixed-lighting-outlets-count"] = round(np.median(lighting_data))
else:
# Use averages from the cleaning dataset, based on the property type, built form, construction age band and local authority
# Use averages from the cleaning dataset, based on the property type, built form, construction age
# band and local authority
cleaned_property_data = EPCDataProcessor.apply_averages_cleaning(
data_to_clean=self.epc_record_as_dataframe("prepared_epc", replace_empty_string=True),
cleaning_data=self.cleaning_data,
cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'],
)
self.prepared_epc["fixed-lighting-outlets-count"] = round(cleaned_property_data["FIXED_LIGHTING_OUTLETS_COUNT"].values[0])
self.prepared_epc["fixed-lighting-outlets-count"] = round(
cleaned_property_data["FIXED_LIGHTING_OUTLETS_COUNT"].values[0])
else:
self.prepared_epc["fixed-lighting-outlets-count"] = float(self.prepared_epc["fixed-lighting-outlets-count"])
@ -402,7 +402,8 @@ class EPCRecord:
if self.construction_age_band is not None and self.construction_age_band not in DATA_ANOMALY_MATCHES:
result = result[(result["CONSTRUCTION_AGE_BAND"] == self.construction_age_band)]
built_form = self.prepared_epc["built-form"]
# `x in Series` tests the Series *index labels*, not its values, so the previous check almost
# never matched and the built-form filter was silently skipped. Compare against the values.
if built_form not in DATA_ANOMALY_MATCHES and built_form in result["BUILT_FORM"].values:
    result = result[(result["BUILT_FORM"] == built_form)]
return result[["NUMBER_HABITABLE_ROOMS", "TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]].mean()
@ -424,7 +425,8 @@ class EPCRecord:
self.property_dimensions = self._filter_property_dimensions(property_dimensions)
if not self.prepared_epc["number-habitable-rooms"]:
self.prepared_epc["number-habitable-rooms"] = float(self.property_dimensions["NUMBER_HABITABLE_ROOMS"].round())
self.prepared_epc["number-habitable-rooms"] = float(
self.property_dimensions["NUMBER_HABITABLE_ROOMS"].round())
else:
self.prepared_epc["number-habitable-rooms"] = float(self.prepared_epc["number-habitable-rooms"])
@ -451,7 +453,7 @@ class EPCRecord:
self.prepared_epc["total-floor-area"] = float(self.prepared_epc["total-floor-area"])
def _clean_mains_gas(self):
def _clean_mains_gas(self):
"""
This method will clean the mains gas, if empty or invalid
"""
@ -465,7 +467,7 @@ class EPCRecord:
self.prepared_epc["mains-gas-flag"] = None if (
self.prepared_epc["mains-gas-flag"] == "" or self.prepared_epc["mains-gas-flag"] in DATA_ANOMALY_MATCHES
) else map[self.prepared_epc["mains-gas-flag"]]
) else map[self.prepared_epc["mains-gas-flag"]]
def _clean_heat_loss_corridor(self):
"""
@ -480,17 +482,21 @@ class EPCRecord:
"heated corridor": False
}
self.prepared_epc["heat-loss-corridor"] = False if self.prepared_epc["heat-loss-corridor"] in DATA_ANOMALY_MATCHES else map[self.prepared_epc["heat-loss-corridor"]]
self.prepared_epc["unheated-corridor-length"] = float(self.prepared_epc["unheated-corridor-length"]) if self.prepared_epc["unheated-corridor-length"] != "" else None
self.prepared_epc["heat-loss-corridor"] = False if self.prepared_epc[
"heat-loss-corridor"] in DATA_ANOMALY_MATCHES else map[
self.prepared_epc["heat-loss-corridor"]]
self.prepared_epc["unheated-corridor-length"] = (
float(self.prepared_epc["unheated-corridor-length"]) if
self.prepared_epc["unheated-corridor-length"] != "" else None
)
def _clean_count_variables(self):
"""
This method will clean the count variables, if empty or invalid
"""
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
raise ValueError("EPC Record doesn not contain epc data")
fields = {
"number_of_open_fireplaces": "number-open-fireplaces",
@ -502,8 +508,6 @@ class EPCRecord:
null_attributes = ["number_of_storeys", "number_of_rooms"]
for attribute, epc_field in fields.items():
# TODO: check this
# value = self.data["extension-count"]
value = self.prepared_epc[epc_field]
if value == "" or value in DATA_ANOMALY_MATCHES:
if attribute in null_attributes:
@ -522,7 +526,8 @@ class EPCRecord:
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
self.prepared_epc['wind-turbine-count'] = int(self.prepared_epc['wind-turbine-count']) if self.prepared_epc['wind-turbine-count'] != "" else None
self.prepared_epc['wind-turbine-count'] = int(self.prepared_epc['wind-turbine-count']) if self.prepared_epc[
'wind-turbine-count'] != "" else None
def _clean_solar_hot_water(self):
"""
@ -530,7 +535,7 @@ class EPCRecord:
"""
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
value_map = {
"Y": True,
"N": False,
@ -546,7 +551,9 @@ class EPCRecord:
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
self.prepared_epc['photo-supply'] = float(self.prepared_epc['photo-supply']) if self.prepared_epc['photo-supply'] != "" else None
self.prepared_epc['photo-supply'] = float(self.prepared_epc['photo-supply']) if self.prepared_epc[
'photo-supply'] != "" \
else None
def _clean_energy(self):
"""
@ -558,7 +565,6 @@ class EPCRecord:
self.prepared_epc['energy-consumption-current'] = float(self.prepared_epc["energy-consumption-current"])
self.prepared_epc['co2-emissions-current'] = float(self.prepared_epc["co2-emissions-current"])
def _clean_built_form(self):
"""
This method will clean the build form, if empty or invalid
@ -566,7 +572,8 @@ class EPCRecord:
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
self.prepared_epc['built-form'] = BUILT_FORM_REMAP.get(self.prepared_epc["built-form"], self.prepared_epc["built-form"])
self.prepared_epc['built-form'] = BUILT_FORM_REMAP.get(self.prepared_epc["built-form"],
self.prepared_epc["built-form"])
if self.prepared_epc["built-form"] in DATA_ANOMALY_MATCHES:
if self.prepared_epc["property-type"] == "Flat":
self.prepared_epc["built-form"] = "Semi-Detached"
@ -578,7 +585,8 @@ class EPCRecord:
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
self.construction_age_band = EPCDataProcessor.clean_construction_age_band(self.prepared_epc["construction-age-band"])
self.construction_age_band = EPCDataProcessor.clean_construction_age_band(
self.prepared_epc["construction-age-band"])
if self.construction_age_band in DATA_ANOMALY_MATCHES:
if self.old_data:
# Take the most recent
@ -586,7 +594,8 @@ class EPCRecord:
[old_record["lodgement-datetime"] for old_record in self.old_data if
old_record["construction-age-band"] not in DATA_ANOMALY_MATCHES]
)
most_recent = [old_record for old_record in self.old_data if old_record["lodgement-datetime"] == max_datetime]
most_recent = [old_record for old_record in self.old_data if
old_record["lodgement-datetime"] == max_datetime]
self.construction_age_band = EPCDataProcessor.clean_construction_age_band(
most_recent[0]["construction-age-band"]
@ -625,14 +634,15 @@ class EPCRecord:
"""
This method will clean the ventilation, if empty or invalid
"""
self.prepared_epc['mechanical-ventilation'] = None if (self.mechanical_ventilation == "" or self.mechanical_ventilation in DATA_ANOMALY_MATCHES) else self.mechanical_ventilation
self.prepared_epc['mechanical-ventilation'] = None if (
self.mechanical_ventilation == "" or self.mechanical_ventilation in DATA_ANOMALY_MATCHES) else (
self.mechanical_ventilation)
def _field_validation(self):
"""
This method will validate each of the fields in the EPC record
"""
for record_key, validation_config in self.validation_configuration.items():
# Get the variable named record key from self
field_value = self.__dict__[record_key]
@ -650,81 +660,89 @@ class EPCRecord:
"""
if not isinstance(field_value, str):
raise ValueError(f"Field {record_key} has value {field_value} which is not a string")
if 'function' in validation_config:
try:
validation_config['function'](field_value)
except:
raise ValueError(f"Field {record_key} has value {field_value} which does not pass the validation function {validation_config['function']}")
raise ValueError(
f"Field {record_key} has value {field_value} which does not pass the validation function "
f"{validation_config['function']}")
if validation_config['acceptable_values'] is not None:
if field_value not in validation_config['acceptable_values']:
raise ValueError(f"Field {record_key} has value {field_value} which is not in the acceptable values of {validation_config['acceptable_values']}")
raise ValueError(
f"Field {record_key} has value {field_value} which is not in the acceptable values of "
f"{validation_config['acceptable_values']}")
def _validate_float(self, record_key: str, field_value: Union[str, float], validation_config: dict):
"""
Validate a float field
"""
if not isinstance(field_value, float):
raise ValueError(f"Field {record_key} has value {field_value} which is not a float")
if 'function' in validation_config:
try:
validation_config['function'](field_value)
except:
raise ValueError(f"Field {record_key} has value {field_value} which does not pass the validation function {validation_config['function']}")
raise ValueError(
f"Field {record_key} has value {field_value} which does not pass the validation function "
f"{validation_config['function']}")
if validation_config['range'] is not None:
if field_value < validation_config['range'][0] or field_value > validation_config['range'][1]:
raise ValueError(f"Field {record_key} has value {field_value} which is not in the acceptable range of {validation_config['range']}")
raise ValueError(
f"Field {record_key} has value {field_value} which is not in the acceptable range of "
f"{validation_config['range']}")
def __sub__(self, other):
    """
    Build the difference record between this EPC record and another one

    ``self`` is handed over as ``record1`` and ``other`` as ``record2``, with ``auto_sort``
    switched on. Raises ValueError when ``other`` is not an EPCRecord.
    """
    if isinstance(other, EPCRecord):
        return EPCDifferenceRecord(record1=self, record2=other, auto_sort=True)
    raise ValueError("Can only subtract EPCRecord from EPCRecord")
def __gt__(self, other):
    """
    Return True if this record's RDSAP_RESPONSE value is strictly greater than the other record's

    Raises ValueError when ``other`` is not an EPCRecord.
    """
    if isinstance(other, EPCRecord):
        own_value = self.__dict__[RDSAP_RESPONSE]
        other_value = other.__dict__[RDSAP_RESPONSE]
        return own_value > other_value
    raise ValueError("Can only compare EPCRecord to EPCRecord")
def __ge__(self, other):
    """
    Return True if this record's RDSAP_RESPONSE value is greater than or equal to the other record's

    Raises ValueError when ``other`` is not an EPCRecord.
    """
    if isinstance(other, EPCRecord):
        own_value = self.__dict__[RDSAP_RESPONSE]
        other_value = other.__dict__[RDSAP_RESPONSE]
        return own_value >= other_value
    raise ValueError("Can only compare EPCRecord to EPCRecord")
def __lt__(self, other):
    """
    Return True if this record's RDSAP_RESPONSE value is strictly less than the other record's

    Raises ValueError when ``other`` is not an EPCRecord.
    """
    if isinstance(other, EPCRecord):
        own_value = self.__dict__[RDSAP_RESPONSE]
        other_value = other.__dict__[RDSAP_RESPONSE]
        return own_value < other_value
    raise ValueError("Can only compare EPCRecord to EPCRecord")
def __le__(self, other):
    """
    Return True if this record's RDSAP_RESPONSE value is less than or equal to the other record's

    Raises ValueError when ``other`` is not an EPCRecord.
    """
    if isinstance(other, EPCRecord):
        own_value = self.__dict__[RDSAP_RESPONSE]
        other_value = other.__dict__[RDSAP_RESPONSE]
        return own_value <= other_value
    raise ValueError("Can only compare EPCRecord to EPCRecord")
def get(self, key: Union[str, List[str]], return_asdict: bool = False, key_suffix: str | None = None) -> Any:
"""
This method will return the value of the key
@ -738,8 +756,8 @@ class EPCRecord:
if isinstance(key, list):
return [self.__dict__[x] if x in self.__dict__.keys() else None for x in key]
elif isinstance(key, str):
return self.__dict__[key] if key in self.__dict__.keys() else None
return self.__dict__[key] if key in self.__dict__.keys() else None
class EPCDifferenceRecord:
"""
@ -767,7 +785,6 @@ class EPCDifferenceRecord:
self._validate_difference_record()
# self._detect_fabric_consistency()
def _construct_difference_record(self):
"""
This method will construct the difference record between the two records
@ -778,8 +795,10 @@ class EPCDifferenceRecord:
carbon_change = self.record2.get(CARBON_RESPONSE) - self.record1.get(CARBON_RESPONSE)
component_variables = COMPONENT_FEATURES + EFFICIENCY_FEATURES
ending_record = self.record2.get(component_variables + ["lodgement_date"], return_asdict=True, key_suffix="_ending")
starting_record = self.record1.get(component_variables + ["lodgement_date"], return_asdict=True, key_suffix="_starting")
ending_record = self.record2.get(component_variables + ["lodgement_date"], return_asdict=True,
key_suffix="_ending")
starting_record = self.record1.get(component_variables + ["lodgement_date"], return_asdict=True,
key_suffix="_starting")
self.difference_record = {
"uprn": self.record1.get("uprn"),
@ -812,30 +831,30 @@ class EPCDifferenceRecord:
# if value < 0:
# raise ValueError(f"Difference record has negative value for {key}")
pass
def compare_fields_in_records(self, fields: List[str]):
    """
    This method will compare the two records, for specific fields

    Each field is looked up with ``get`` on ``record1`` and ``record2``. Returns False as soon
    as one field differs, and True when every listed field matches (including when ``fields``
    is empty).
    """
    # `any` stops at the first mismatch, just like the early return it replaces. The `!=`
    # comparison is kept as-is so values with unusual equality semantics behave as before.
    return not any(self.record1.get(field) != self.record2.get(field) for field in fields)
def get(self, key: str):
    """
    This method will return the value stored under ``key`` in the difference record

    Returns None when the key is not present.
    """
    # dict.get already yields None for a missing key, so no explicit membership test is needed.
    return self.difference_record.get(key)
def append_fixed_data(self, fixed_data: dict):
"""
This method will append fixed data to the difference record
"""
"""
self._validate_fixed_data(fixed_data)
self.difference_record.update(fixed_data)