diff --git a/backend/Property.py b/backend/Property.py index bb545248..e193ffbb 100644 --- a/backend/Property.py +++ b/backend/Property.py @@ -5,8 +5,9 @@ import os import numpy as np import pandas as pd -from etl.epc.DataProcessor import DataProcessor -from etl.epc.settings import POTENTIAL_COLUMNS, EFFICIENCY_FEATURES, BUILT_FORM_REMAP +from etl.epc.DataProcessor import EPCDataProcessor +from etl.epc.Dataset import TrainingDataset +from etl.epc.settings import LATEST_FIELD, MANDATORY_FIXED_FEATURES, POTENTIAL_COLUMNS, EFFICIENCY_FEATURES, BUILT_FORM_REMAP from etl.epc_clean.epc_attributes.all_cleaners import all_cleaner_map from utils.logger import setup_logger from utils.s3 import read_dataframe_from_s3_parquet @@ -17,6 +18,7 @@ from recommendations.recommendation_utils import ( estimate_perimeter, get_wall_type, estimate_external_wall_area, esimtate_pitched_roof_area ) + ENVIRONMENT = os.environ.get('ENVIRONMENT', 'dev') EPC_AUTH_TOKEN = os.environ.get('EPC_AUTH_TOKEN') DATA_BUCKET = os.environ.get('DATA_BUCKET', 'retrofit-data-dev' if ENVIRONMENT == 'dev' else None) @@ -49,216 +51,201 @@ class Property(Definitions): spatial = None - def __init__(self, id, postcode, address1, epc_client=None, data=None): + def __init__(self, id, postcode, address1, epc_record, data=None): + + self.epc_record = epc_record + self.id = id self.postcode = postcode self.address1 = address1 - self.data = data - self.old_data = None + self.data = {k.replace("_", "-"): v for k,v in epc_record.get("prepared_epc").items()} + self.old_data = epc_record.get("old_data") self.property_dimensions = None - self.uprn = None - self.full_sap_epc = None + self.uprn = epc_record.get("uprn") + self.full_sap_epc = epc_record.get("full_sap_epc") self.in_conservation_area, self.is_listed, self.is_heritage = None, None, None self.restricted_measures = False - self.year_built = None - self.number_of_rooms = None - self.age_band = None - self.construction_age_band = None - self.number_of_floors = None + self.year_built = epc_record.get("year_built") + self.number_of_rooms = epc_record.prepared_epc.get("number_of_rooms") + self.age_band = epc_record.get("age_band") + self.construction_age_band = epc_record.get("construction_age_band") + self.number_of_floors = epc_record.get("number_of_floors") self.perimeter = None self.wall_type = None self.floor_type = None - self.energy = None - self.ventilation = None - self.solar_pv = None - self.solar_hot_water = None - self.wind_turbine = None - self.number_of_open_fireplaces = None - self.number_of_extensions = None - self.number_of_storeys = None - self.heat_loss_corridor = None - self.mains_gas = None - self.floor_height = None + self.energy = { + "primary_energy_consumption": epc_record.get("energy_consumption_current"), + "co2_emissions": epc_record.get("co2_emissions_current"), + } + self.ventilation = { + "ventilation": epc_record.get("mechanical_ventilation"), + } + self.solar_pv = { + "solar_pv": epc_record.get("photo_supply"), + } + self.solar_hot_water = { + "solar_hot_water": epc_record.get("solar_water_heating_flag"), + } + self.wind_turbine = { + "wind_turbine": epc_record.prepared_epc.get("wind_turbine_count"), + } + self.number_of_open_fireplaces = { + "number_of_open_fireplaces": epc_record.prepared_epc.get("number_of_open_fireplaces"), + } + self.number_of_extensions = { + "number_of_extensions": epc_record.prepared_epc.get("number_of_extensions"), + } + self.number_of_storeys = { + "number_of_storeys": epc_record.prepared_epc.get("number_of_storeys"), + } + self.heat_loss_corridor = { + "heat_loss_corridor": epc_record.prepared_epc.get("heat_loss_corridor"), + "length": epc_record.prepared_epc.get("unheated_corridor_length"), + } + self.mains_gas = epc_record.prepared_epc.get('mains_gas_flag') + self.floor_height = epc_record.prepared_epc.get('floor_height') self.insulation_wall_area = None - self.floor_area = None + self.floor_area = epc_record.prepared_epc.get('total_floor_area') self.pitched_roof_area = None self.insulation_floor_area = None - self.number_lighting_outlets = None + self.number_lighting_outlets = epc_record.prepared_epc.get("fixed_lighting_outlets_count") self.floor_level = None self.current_adjusted_energy = None self.expected_adjusted_energy = None - if epc_client: - self.epc_client = epc_client + self.recommendations_scoring_data = [] + + def create_base_difference_epc_record(self, cleaned_lookup: dict): + """ + Creates a EPCDifferenceRecord object, which is used to store the difference between the current and + expected EPC + It will be the same starting and ending EPC, as we don't have the expected EPC yet + """ + + difference_record = self.epc_record - self.epc_record + + # TODO: change these lower and replace in the settings file + fixed_data_col_names = MANDATORY_FIXED_FEATURES + LATEST_FIELD + print("NEED TO CHANGE THE DASH TO LOWER CASE") + fixed_data_col_names = [x.lower().replace("_", "-") for x in fixed_data_col_names] + + fixed_data = {k.replace("-", "_"):v for k,v in self.data.items() if k in fixed_data_col_names} + + difference_record.append_fixed_data(fixed_data) + + self.base_difference_record = TrainingDataset(datasets=[difference_record], cleaned_lookup=cleaned_lookup) + + # TODO: adjust the base difference record with the previously calculated u values + features + # estimated_perimeter is different to the perimeter in the epc record + + # self.base_difference_record.df + + def adjust_difference_record_with_recommendations(self, property_recommendations): + """ + This method will adjust the difference record, based on the recommendations made for the property + :param recommendations: dictionary of recommendations for the property + :return: + """ + + for recommendations_by_type in property_recommendations: + for i, rec in enumerate(recommendations_by_type): + scoring_dict = self.create_recommendation_scoring_data( + recommendation=rec, + ) + scoring_dict['id'] = "+".join([str(self.id), str(rec["recommendation_id"])]) + + self.recommendations_scoring_data.append(scoring_dict) + + def create_recommendation_scoring_data(self, recommendation: dict): + + recommendation_record = self.base_difference_record.df.to_dict("records")[0].copy() + + for col in [ + "walls_insulation_thickness", "floor_insulation_thickness", "roof_insulation_thickness" + ]: + if recommendation_record[col] is None: + recommendation_record[col] = "none" + + # We update the description to indicate it's insulated + if recommendation["type"] in ["internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation"]: + # The upgrade made here is to the u-value of the walls and the description of the + # insulation thickness + recommendation_record["walls_thermal_transmittance_ending"] = recommendation["new_u_value"] + recommendation_record["walls_insulation_thickness_ending"] = "above average" + recommendation_record["walls_energy_eff_ending"] = "Good" else: - self.epc_client = EpcClient(auth_token=EPC_AUTH_TOKEN) + if recommendation_record["walls_thermal_transmittance_ending"] is None: + raise ValueError("We should not have a None value for the u value") - def search_address_epc(self): - """ - This method searches for an address in the EPC database and returns the first result - :return: property data - """ - if self.data: - return + if recommendation_record["walls_insulation_thickness_ending"] is None: + recommendation_record["walls_insulation_thickness_ending"] = "none" - # This will fail if a property does not have an EPC - this has been documented as a case to handle - response = self.epc_client.domestic.search(params={"address": self.address1, "postcode": self.postcode}) + # Update description to indicate it's insulate + if recommendation["type"] in ["solid_floor_insulation", "suspended_floor_insulation", "exposed_floor_insulation"]: + if len(recommendation["parts"]) > 1: + raise NotImplementedError("Have more than 1 floor insulation part - handle this case") - # Check if we have a full sap EPC - self.full_sap_epc = [r for r in response["rows"] if r["transaction-type"] == "new dwelling"] - self.full_sap_epc = self.full_sap_epc[0] if self.full_sap_epc else self.full_sap_epc + recommendation_record["floor_thermal_transmittance_ending"] = recommendation["new_u_value"] + # We don't really see above average for this in the training data + recommendation_record["floor_insulation_thickness_ending"] = "average" + recommendation_record["floor_energy_eff_ending"] = "Good" + else: + if recommendation_record["floor_thermal_transmittance_ending"] is None: + raise ValueError("We should not have a None value for the u value") - if len(response["rows"]) > 1: - newest_response = [ - r for r in response["rows"] if - r["lodgement-datetime"] == max([x["lodgement-datetime"] for x in response["rows"]]) + if recommendation_record["floor_insulation_thickness_ending"] is None: + recommendation_record["floor_insulation_thickness_ending"] = "none" + + if recommendation["type"] in ["loft_insulation", "room_roof_insulation", "flat_roof_insulation"]: + recommendation_record["roof_thermal_transmittance_ending"] = recommendation["new_u_value"] + + parts = recommendation["parts"] + if len(parts) != 1: + raise ValueError("More than one part for roof insulation - investiage me") + + # This is based on the values we have in the training data + valid_numeric_values = [ + 12, 25, 50, 75, 100, 150, 200, 250, 270, 300, 350, 400 ] - if len(newest_response) > 1: - raise Exception("More than one result found for this address - investigate me") - # We'll keep old EPCs in case it contains information, not present on the newest one - self.old_data = [epc for epc in response["rows"] if epc["lmk-key"] != newest_response[0]["lmk-key"]] + proposed_depth = int(parts[0]["depth"]) + if proposed_depth not in valid_numeric_values: + # Take the nearest value for scoring + proposed_depth = min(valid_numeric_values, key=lambda x: abs(x - proposed_depth)) - response["rows"] = newest_response - - self.data = response["rows"][0] - # For the moment, if we don't have a UPRN, we don't do anything about it, however we'll handle this in - # the future by using the Ordnance Survey places API - if not self.data["uprn"]: - logger.warning("We do not have a UPRN for this property") + recommendation_record["roof_insulation_thickness_ending"] = str(proposed_depth) + recommendation_record["roof_energy_eff_ending"] = "Very Good" else: - self.uprn = int(self.data["uprn"]) + # Fill missing roof u-values - this fill is not based on recommended upgrades + if recommendation_record["roof_thermal_transmittance_ending"] is None: + raise ValueError("We should not have a None value for the u value") - def set_energy(self): - """ - Extracts and formats data about the home's energy and co2 consumption - To being with, this is just formatting epc data + if recommendation_record["roof_insulation_thickness_ending"] is None: + recommendation_record["roof_insulation_thickness_ending"] = "none" - Data: - - primary_energy_consumption - This is based on the "energy-consumption-current" field in the EPC data. - Current estimated total energy consumption for the property in a 12 month period (kWh/m2). Displayed on EPC - as the current primary energy use per square metre of floor area. + if recommendation["type"] == "mechanical_ventilation": + recommendation_record["mechanical_ventilation_ending"] = 'mechanical, extract only' - - co2_emissions - This is based on the "co2-emissions-current" field in the EPC data. - CO₂ emissions per year in tonnes/year. - """ + if recommendation["type"] == "sealing_open_fireplace": + recommendation_record["number_open_fireplaces_ending"] = 0 - self.energy = { - "primary_energy_consumption": float(self.data["energy-consumption-current"]), - "co2_emissions": float(self.data["co2-emissions-current"]), - } + if recommendation["type"] == "low_energy_lighting": + recommendation_record["low_energy_lighting_ending"] = 100 + recommendation_record["lighting_energy_eff_starting"] = "Very Good" - def set_ventilation(self): - """ - Extracts and formats data about the home's ventilation - To being with, this is just formatting epc data + if recommendation["type"] not in [ + "mechanical_ventilation", "sealing_open_fireplace", "low_energy_lighting", + "internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation", + "loft_insulation", "room_roof_insulation", "flat_roof_insulation", + "solid_floor_insulation", "suspended_floor_insulation", "exposed_floor_insulation" + ]: + raise NotImplementedError("Implement me") - Data: - - ventilation - This is based on the "ventilation-type" field in the EPC data. - Ventilation type of the property. - """ + return recommendation_record - ventilation = self.data["mechanical-ventilation"] - # perform some simple cleaning - when checking 300k epc, the only unique values were - # {'', 'mechanical, supply and extract', 'NO DATA!', 'natural', 'mechanical, extract only'} - if ventilation in self.DATA_ANOMALY_MATCHES or ventilation in [""]: - ventilation = None - - self.ventilation = { - "ventilation": ventilation, - } - - def set_solar_pv(self): - """ - Extracts and formats data about the home's solar pv - To being with, this is just formatting epc data - - Data: - - solar_pv - This is based on the "photo-supply" field in the EPC data. - - When checking 100k epc, either the value was "" or a stringified number - """ - - solar_pv = self.data["photo-supply"] - if solar_pv == "": - solar_pv = None - else: - solar_pv = float(solar_pv) - - self.solar_pv = { - "solar_pv": solar_pv, - } - - def set_solar_hot_water(self): - """ - Extracts and formats data about the home's solar hot water - We are just formatting the solar-water-heating-flag in the epc data - :return: - """ - - value_map = { - "Y": True, - "N": False, - "": None, - } - - self.solar_hot_water = { - "solar_hot_water": value_map[self.data["solar-water-heating-flag"]], - } - - def set_wind_turbine(self): - """ - Extracts and formats data about the home's wind turbine - We are just formatting the wind-turbine-flag in the epc data - :return: - """ - - wind_turbine_count = self.data["wind-turbine-count"] - if wind_turbine_count == "": - wind_turbine_count = None - else: - wind_turbine_count = int(wind_turbine_count) - - self.wind_turbine = { - "wind_turbine": wind_turbine_count, - } - - def set_count_variables(self): - - """ - For EPC fields that are just counts, we'll set them here - These are fields that are integers but may contain additional values such as "" so we can't do a direct - conversion straight to an integer - :return: - """ - - fields = { - "number_of_open_fireplaces": "number-open-fireplaces", - "number_of_extensions": "extension-count", - "number_of_storeys": "flat-storey-count", - "number_of_rooms": "number-habitable-rooms", - } - - null_attributes = ["number_of_storeys", "number_of_rooms"] - - for attribute, epc_field in fields.items(): - value = self.data["extension-count"] - if value == "" or value in self.DATA_ANOMALY_MATCHES: - if attribute in null_attributes: - value = None - else: - value = 0 - else: - value = int(value) - - setattr(self, attribute, value) def get_components(self, cleaned): """ @@ -274,27 +261,9 @@ class Property(Definitions): if not self.data: raise ValueError("Property does not contain data") - # We need to implement an EPC cleaning process, which we run on the EPC data, immediately after we download - # it - self.data["built-form"] = BUILT_FORM_REMAP.get(self.data["built-form"], self.data["built-form"]) - if self.data["built-form"] in self.DATA_ANOMALY_MATCHES: - if self.data["property-type"] == "Flat": - self.data["built-form"] = "Semi-Detached" - - self.set_energy() - self.set_ventilation() - self.set_solar_pv() - self.set_solar_hot_water() - self.set_wind_turbine() - self.set_count_variables() - self.set_heat_loss_corridor() - self.set_mains_gas() - self.set_age_band() - self.set_basic_property_dimensions() for description, attribute in cleaned.items(): - if self.data[description] in self.DATA_ANOMALY_MATCHES: template = cleaned[description][0] fill_dict = dict(zip(template.keys(), [None] * len(template))) @@ -333,38 +302,6 @@ class Property(Definitions): self.set_floor_type() self.set_floor_level() - def set_age_band(self): - """ - Sets a cleaned version of the age band of the property given the EPC data - :return: - """ - - if not self.data: - raise ValueError("Property does not contain data") - - self.construction_age_band = DataProcessor.clean_construction_age_band(self.data["construction-age-band"]) - if self.construction_age_band in self.DATA_ANOMALY_MATCHES: - if self.old_data: - # Take the most recent - max_datetime = max( - [x["lodgement-datetime"] for x in self.old_data if - x["construction-age-band"] not in self.DATA_ANOMALY_MATCHES] - ) - most_recent = [x for x in self.old_data if x["lodgement-datetime"] == max_datetime] - - self.construction_age_band = DataProcessor.clean_construction_age_band( - most_recent[0]["construction-age-band"] - ) - - self.age_band = england_wales_age_band_lookup.get(self.construction_age_band) - - if (self.data["transaction-type"] == "new dwelling") and (self.age_band is None): - self.age_band = "L" - self.construction_age_band = 'England and Wales: 2012 onwards' - - if self.age_band is None: - raise ValueError("age_band is missing") - def set_spatial(self, spatial: pd.DataFrame): """ Sets whether the property is in a conservation area given the output of the ConservationAreaClient @@ -392,71 +329,6 @@ class Property(Definitions): "is_heritage_building": spatial_dict["is_heritage_building"], } - def set_year_built(self): - """ - Estimates when the property was built based on as much available data as possible. - - """ - - if self.full_sap_epc: - self.year_built = datetime.strptime(self.full_sap_epc["lodgement-date"], '%Y-%m-%d').year - - return - - if self.data["construction-age-band"] not in self.DATA_ANOMALY_MATCHES: - # Take the lower limit. If we're pessimistic about the age of the property, that at least means we have - # more options for recommendations if that age falls before the year that insulation in walls became - # common practice - band = [int(x) for x in re.findall(r'\b\d{4}\b', self.data["construction-age-band"])] - self.year_built = band[0] - return - - # We don't know when the property was built - self.year_built = None - - def set_heat_loss_corridor(self): - """ - cleans the heat-loss-corridor - :return: - """ - map = { - "no corridor": False, - "unheated corridor": True, - "heated corridor": False - } - - if self.data["heat-loss-corridor"] in self.DATA_ANOMALY_MATCHES: - has_heat_loss_corridor = False - else: - has_heat_loss_corridor = map[self.data["heat-loss-corridor"]] - - length = self.data["unheated-corridor-length"] - if length == "": - length = None - else: - length = float(length) - - self.heat_loss_corridor = { - "heat_loss_corridor": has_heat_loss_corridor, - "length": length - } - - def set_mains_gas(self): - """ - Sets whether the property has mains gas - :return: - """ - - map = { - "Y": True, - "N": False, - } - - if self.data["mains-gas-flag"] == "" or self.data["mains-gas-flag"] in self.DATA_ANOMALY_MATCHES: - self.mains_gas = None - else: - self.mains_gas = map[self.data["mains-gas-flag"]] - def _clean_upload_data(self, to_update): for k, v in to_update.items(): if v in self.DATA_ANOMALY_MATCHES: @@ -603,36 +475,6 @@ class Property(Definitions): :return: """ - self.floor_area = float(self.data["total-floor-area"]) - - if not self.data["number-habitable-rooms"] or ( - self.data["floor-height"] == "" or self.data["floor-height"] in self.DATA_ANOMALY_MATCHES - ): - if self.property_dimensions is None: - property_dimensions = read_dataframe_from_s3_parquet( - bucket_name=DATA_BUCKET, file_key=f"property_dimensions/{self.data['local-authority']}.parquet" - ) - self.property_dimensions = self._filter_property_dimensions(property_dimensions) - - if not self.data["number-habitable-rooms"]: - self.number_of_rooms = float(self.property_dimensions["NUMBER_HABITABLE_ROOMS"].round()) - else: - self.number_of_rooms = float(self.data["number-habitable-rooms"]) - - if self.data["property-type"] == "House": - self.number_of_floors = 2 - elif self.data["property-type"] in ["Flat", "Bungalow"]: - self.number_of_floors = 1 - elif self.data["property-type"] == "Maisonette": - self.number_of_floors = 2 - else: - raise NotImplementedError("Implement me") - - if self.data["floor-height"] == "" or self.data["floor-height"] in self.DATA_ANOMALY_MATCHES: - self.floor_height = float(self.property_dimensions["FLOOR_HEIGHT"].round(2)) - else: - self.floor_height = float(self.data["floor-height"]) - self.perimeter = estimate_perimeter( self.floor_area / self.number_of_floors, self.number_of_rooms / self.number_of_floors ) @@ -653,7 +495,8 @@ class Property(Definitions): def set_floor_level(self): self.floor_level = ( FLOOR_LEVEL_MAP[self.data["floor-level"]] if - self.data["floor-level"] not in self.DATA_ANOMALY_MATCHES else None + self.data["floor-level"] not in self.DATA_ANOMALY_MATCHES and self.data['floor-level'] is not None + else None ) if self.floor_level is None: @@ -798,7 +641,7 @@ class Property(Definitions): **hotwater, **windows, "SECONDHEAT_DESCRIPTION": second_heating, - "DAYS_TO": DataProcessor.calculate_days_to(self.data["lodgement-date"]), + "DAYS_TO": EPCDataProcessor.calculate_days_to(self.data["lodgement-date"]), "SAP": float(self.data["current-energy-efficiency"]), "CARBON": float(self.data["co2-emissions-current"]), "HEAT_DEMAND": float(self.data["energy-consumption-current"]), diff --git a/backend/app/db/models/materials.py b/backend/app/db/models/materials.py index f887fc25..2ac7ddf4 100644 --- a/backend/app/db/models/materials.py +++ b/backend/app/db/models/materials.py @@ -18,6 +18,8 @@ class MaterialType(enum.Enum): exposed_floor_insulation = "exposed_floor_insulation" flat_roof_insulation = "flat_roof_insulation" room_roof_insulation = "room_roof_insulation" + windows_glazing = "windows_glazing" + iwi_wall_demolition = "iwi_wall_demolition" iwi_vapour_barrier = "iwi_vapour_barrier" diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py index 996ddcfd..1704a42f 100644 --- a/backend/app/plan/router.py +++ b/backend/app/plan/router.py @@ -3,6 +3,7 @@ from datetime import datetime import numpy as np import pandas as pd from epc_api.client import EpcClient +from etl.epc.Record import EPCRecord from fastapi import APIRouter, Depends from sqlalchemy.exc import IntegrityError, OperationalError from sqlalchemy.orm import sessionmaker @@ -27,7 +28,7 @@ from backend.app.utils import epc_to_sap_lower_bound, read_csv_from_s3, read_par from backend.ml_models.api import ModelApi from backend.Property import Property -from etl.epc.DataProcessor import DataProcessor +from etl.epc.DataProcessor import EPCDataProcessor from etl.epc.settings import COLUMNS_TO_MERGE_ON from recommendations.optimiser.CostOptimiser import CostOptimiser from recommendations.optimiser.GainOptimiser import GainOptimiser @@ -42,6 +43,54 @@ logger = setup_logger() BATCH_SIZE = 5 +class DummyDownloader: + + def __init__(self, postcode, address1, id, epc_client): + self.id = id + self.postcode = postcode + self.address1 = address1 + + self.data = None + self.old_data = None + + self.epc_client = epc_client + + def search_address_epc(self): + """ + This method searches for an address in the EPC database and returns the first result + :return: property data + """ + if self.data: + return + + # This will fail if a property does not have an EPC - this has been documented as a case to handle + response = self.epc_client.domestic.search(params={"address": self.address1, "postcode": self.postcode}) + + # Check if we have a full sap EPC + self.full_sap_epc = [r for r in response["rows"] if r["transaction-type"] == "new dwelling"] + self.full_sap_epc = self.full_sap_epc[0] if self.full_sap_epc else self.full_sap_epc + + if len(response["rows"]) > 1: + newest_response = [ + r for r in response["rows"] if + r["lodgement-datetime"] == max([x["lodgement-datetime"] for x in response["rows"]]) + ] + if len(newest_response) > 1: + raise Exception("More than one result found for this address - investigate me") + + # We'll keep old EPCs in case it contains information, not present on the newest one + self.old_data = [epc for epc in response["rows"] if epc["lmk-key"] != newest_response[0]["lmk-key"]] + + response["rows"] = newest_response + + self.data = response["rows"][0] + # For the moment, if we don't have a UPRN, we don't do anything about it, however we'll handle this in + # the future by using the Ordnance Survey places API + if not self.data["uprn"]: + logger.warning("We do not have a UPRN for this property") + else: + self.uprn = int(self.data["uprn"]) + router = APIRouter( prefix="/plan", tags=["plan"], @@ -49,16 +98,19 @@ router = APIRouter( responses={404: {"description": "Not found"}} ) +# TODO: Need to install base.txt requirements into new env @router.post("/trigger") async def trigger_plan(body: PlanTriggerRequest): logger.info("Connecting to db") - session = sessionmaker(bind=db_engine)() + # session = sessionmaker(bind=db_engine)() created_at = datetime.now().isoformat() try: session.begin() logger.info("Getting the inputs") + Body = {'portfolio_id': '56', 'housing_type': 'Social', 'goal': 'Increase EPC', 'goal_value': 'A', 'trigger_file_path': '8/56/windows_portfolio_inputs.csv'} + body = PlanTriggerRequest(**Body) epc_client = EpcClient(auth_token=get_settings().EPC_AUTH_TOKEN) plan_input = read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path) uprn_filenames = read_dataframe_from_s3_parquet( @@ -69,6 +121,7 @@ async def trigger_plan(body: PlanTriggerRequest): ) input_properties = [] + for config in plan_input: # We validate each record in the file. If the record is NOT valid, we need to handle this accordingly # TODO: implment validation. We should also standardise postcode and address in some fashion as @@ -90,23 +143,35 @@ async def trigger_plan(body: PlanTriggerRequest): heat_demand_target=None ) - input_properties.append( - Property( - postcode=config['postcode'], + epc_downloader = DummyDownloader(id=0, epc_client=epc_client, postcode=config['postcode'], address1=config['address']) + epc_downloader.search_address_epc() + + epc_records ={ + 'original_epc': epc_downloader.data.copy(), + 'full_sap_epc': epc_downloader.full_sap_epc.copy() if epc_downloader.full_sap_epc else [], + 'old_data': epc_downloader.old_data.copy() if epc_downloader.old_data else [] + } + + prepared_epc = EPCRecord(epc_records=epc_records, run_mode="newdata", cleaning_data=cleaning_data) # This uses all the epc records to clean the data + + p = Property( + id=property_id, address1=config['address'], - epc_client=epc_client, - id=property_id + postcode=config['postcode'], + epc_record=prepared_epc, ) + + logger.info("Getting spatial data") + + p.get_spatial_data(uprn_filenames) + input_properties.append( + p ) - if not input_properties: - return Response(status_code=204) - logger.info("Getting EPC, and spatial data") - for p in input_properties: - p.search_address_epc() - p.set_year_built() - p.get_spatial_data(uprn_filenames) + if not input_properties: + + return Response(status_code=204) # The materials data could be cached or local so we don't need to make # consistent requests to the backend for @@ -129,14 +194,6 @@ async def trigger_plan(body: PlanTriggerRequest): # Property recommendations p.get_components(cleaned) - # This is temp - this should happen after scoring - cleaned_property_data = DataProcessor.apply_averages_cleaning( - data_to_clean=pd.DataFrame([dict(**p.get_model_data(), LOCAL_AUTHORITY=p.data["local-authority"])]), - cleaning_data=cleaning_data, - cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'], - ) - p.set_number_lighting_outlets(cleaned_property_data) - recommender = Recommendations(property_instance=p, materials=materials) property_recommendations = recommender.recommend() @@ -145,75 +202,17 @@ async def trigger_plan(body: PlanTriggerRequest): recommendations[p.id] = property_recommendations - from etl.epc.Pipeline import EPCPipeline - from etl.epc.DataProcessor import EPCDataProcessor - newdata_pipeline = EPCPipeline(epc_data_processor=EPCDataProcessor(data=p.get_model_data(), run_mode="newdata"), run_mode="newdata") + p.create_base_difference_epc_record(cleaned_lookup=cleaned) + p.adjust_difference_record_with_recommendations(property_recommendations) - # TODO: p.get_model_data() -> EPCRecord + recommendations_scoring_data.extend(p.recommendations_scoring_data) - # Finally, we'll prepare data for predicting the impact on SAP - # data_processor = DataProcessor(None, newdata=True) - # data_processor.insert_data(pd.DataFrame([p.get_model_data()])) - # # TODO: Temp - # if data_processor.data["UPRN"].values[0] == "": - # data_processor.data["UPRN"] = 0 - - # data_processor.pre_process() - - # starting_epc_data = data_processor.get_component_features(suffix="_STARTING") - # ending_epc_data = data_processor.get_component_features(suffix="_ENDING") - # fixed_data = data_processor.get_fixed_features() - - # # We update the ending record with the recommended updates and we set lodgement date to today - # ending_epc_data["DAYS_TO_ENDING"] = data_processor.calculate_days_to(created_at) - - # TODO: EPCRecord - AdjustedEPCRecord - - # property_scoring_data[p.id] = { - # "starting_epc_data": starting_epc_data, - # "ending_epc_data": ending_epc_data, - # "fixed_data": fixed_data - # } - - for recommendations_by_type in property_recommendations: - for i, rec in enumerate(recommendations_by_type): - scoring_dict = create_recommendation_scoring_data( - property=p, - recommendation=rec, - starting_epc_data=starting_epc_data, - ending_epc_data=ending_epc_data, - fixed_data=fixed_data, - ) - - recommendations_scoring_data.append(scoring_dict) - - # cleanup - # del data_processor logger.info("Preparing data for scoring in sap change api") recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data) - - # Perform the same cleaning as in the model - first clean number of room variables though - recommendations_scoring_data = DataProcessor.apply_averages_cleaning( - data_to_clean=recommendations_scoring_data, - cleaning_data=cleaning_data, - cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'], - colnames=["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"], - ) - - recommendations_scoring_data = DataProcessor.apply_averages_cleaning( - data_to_clean=recommendations_scoring_data, - cleaning_data=cleaning_data, - cols_to_merge_on=COLUMNS_TO_MERGE_ON + ["LOCAL_AUTHORITY"], - ).drop(columns=["LOCAL_AUTHORITY"]) - - recommendations_scoring_data = DataProcessor.clean_missings_after_description_process( - recommendations_scoring_data, - ignore_cols=[c for c in recommendations_scoring_data.columns if ("thermal_transmittance" in c) or ( - "insulation_thickness" in c) or ("ENERGY_EFF" in c)] - ) - - recommendations_scoring_data = DataProcessor.clean_efficiency_variables(recommendations_scoring_data) + recommendations_scoring_data = recommendations_scoring_data.drop( + columns=["rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending", "carbon_ending"] + ) model_api = ModelApi(portfolio_id=body.portfolio_id, timestamp=created_at) all_predictions = model_api.predict_all( diff --git a/etl/epc/DataProcessor.py b/etl/epc/DataProcessor.py index 48b62dc7..c9f937c0 100644 --- a/etl/epc/DataProcessor.py +++ b/etl/epc/DataProcessor.py @@ -27,6 +27,13 @@ from recommendations.rdsap_tables import FLOOR_LEVEL_MAP from typing import List +# TODO: change the setting columns to lower +STARTING_SUFFIX_COMPONENT_COLS = [x.lower() for x in STARTING_SUFFIX_COMPONENT_COLS] +NO_SUFFIX_COMPONENT_COLS = [x.lower() for x in NO_SUFFIX_COMPONENT_COLS] +ENDING_SUFFIX_COMPONENT_COLS = [x.lower() for x in ENDING_SUFFIX_COMPONENT_COLS] +POTENTIAL_COLUMNS = [x.lower() for x in POTENTIAL_COLUMNS] + + # These lookups are used to clean the construction age band construction_age_bounds_map = { "England and Wales: before 1900": {"l": 0, "u": 1899}, @@ -67,15 +74,18 @@ class EPCDataProcessor: Handle data loading and data preprocessing """ - def __init__(self, data: pd.DataFrame | None = None, run_mode: str = "training", violation_mode: bool = False) -> None: + def __init__(self, data: pd.DataFrame | None = None, cleaning_averages: pd.DataFrame | None = None, run_mode: str = "training", violation_mode: bool = False) -> None: """ :param filepath: If specified, is the physical location of the data :param is_newdata: Indicates if we are processing new, testing data. In this instance, there are some operations we do not want to perform, such as confine_data() """ - self.data : pd.DataFrame = data if data else pd.DataFrame() - self.cleaning_averages : pd.DataFrame = pd.DataFrame() + is_data_a_dataframe = isinstance(data, pd.DataFrame) + self.data : pd.DataFrame = data if is_data_a_dataframe else pd.DataFrame() + + is_cleaning_averages_a_dataframe = isinstance(cleaning_averages, pd.DataFrame) + self.cleaning_averages : pd.DataFrame = cleaning_averages if is_cleaning_averages_a_dataframe else pd.DataFrame() # FOR NOW IF VIOLATION MODE IS ON, WE USE RUN MODE AS NEWDATA self.violation_mode = violation_mode @@ -124,13 +134,22 @@ class EPCDataProcessor: self.fill_invalid_constituency_fields(ignore_step=ignore_step) - self.cleaning_averages = self.make_cleaning_averages(ignore_step=ignore_step) + self.make_cleaning_averages(ignore_step=ignore_step) + + # TODO: check if this has impact on training dataset + cleaned_data = self.apply_averages_cleaning( + data_to_clean=self.data, + cleaning_data=self.cleaning_averages, + cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'], + colnames=["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"], + ) + cleaned_data = self.apply_averages_cleaning( data_to_clean=self.data, cleaning_data=self.cleaning_averages, cols_to_merge_on=COLUMNS_TO_MERGE_ON, - ignore_step=ignore_step ) + self.data = self.data if cleaned_data is None else cleaned_data self.add_local_authority_to_cleaning_average(ignore_step=ignore_step) @@ -549,7 +568,7 @@ class EPCDataProcessor: # "FLOOR_HEIGHT" # ].fillna(FLOOR_HEIGHT_NATIONAL_AVERAGE) - return cleaning_averages_filled + self.cleaning_averages = cleaning_averages_filled def retain_multiple_epc_properties(self, epc_minimum_count: int = 1, ignore_step: bool = False) -> None: """ @@ -765,8 +784,8 @@ class EPCDataProcessor: :return: Pandas dataframe containing the subset of columns defined in COMPONENT_FEATURES """ - if suffix not in ["_STARTING", "_ENDING"]: - raise Exception("Suffix should be one of _STARTING or _ENDING") + if suffix not in ["_starting", "_ending"]: + raise Exception("Suffix should be one of _starting or _ending") if suffix == "_STARTING": starting_cols = self.data[STARTING_SUFFIX_COMPONENT_COLS + EFFICIENCY_FEATURES].copy().add_suffix(suffix) diff --git a/etl/epc/Dataset.py b/etl/epc/Dataset.py index 92efd0b3..4ca98fc6 100644 --- a/etl/epc/Dataset.py +++ b/etl/epc/Dataset.py @@ -2,12 +2,12 @@ import numpy as np import pandas as pd from typing import List from etl.epc.Record import EPCDifferenceRecord -from ValidationConfiguration import DatasetValidationConfiguration +from etl.epc.ValidationConfiguration import DatasetValidationConfiguration from etl.epc.settings import EARLIEST_EPC_DATE from recommendations.rdsap_tables import england_wales_age_band_lookup from recommendations.recommendation_utils import ( - get_wall_u_value, get_roof_u_value, get_floor_u_value, estimate_perimeter, + estimate_number_of_floors, get_wall_u_value, get_roof_u_value, get_floor_u_value, estimate_perimeter, get_wall_type ) @@ -204,9 +204,14 @@ class TrainingDataset(BaseDataset): # ~~~~~~~~~~~~~~~~~~ # Floor # ~~~~~~~~~~~~~~~~~~ - + + self.df['estimated_number_of_floors'] = self.df.apply( + lambda row: estimate_number_of_floors(row['property_type']), + axis=1 + ) + self.df['estimated_perimeter_starting'] = self.df.apply( - lambda row: estimate_perimeter(row["total_floor_area_starting"], row["number_habitable_rooms"]), + lambda row: estimate_perimeter(row["total_floor_area_starting"]/ row['estimated_number_of_floors'], row["number_habitable_rooms"]/ row['estimated_number_of_floors']), axis=1 ) self.df['estimated_perimeter_ending'] = self.df.apply( @@ -244,7 +249,7 @@ class TrainingDataset(BaseDataset): self.df[f"{component}_thermal_transmittance"] = self.df[f"{component}_thermal_transmittance"].fillna(eval(f"{component}_starting_uvalue")) self.df[f"{component}_thermal_transmittance_ending"] = self.df[f"{component}_thermal_transmittance_ending"].fillna(eval(f"{component}_ending_uvalue")) - self.df = self.df.drop(columns=["floor_type", "wall_type", "walls_clean_description", "walls_clean_description_ending"]) + self.df = self.df.drop(columns=["floor_type", "wall_type", "walls_clean_description", "walls_clean_description_ending", 'estimated_number_of_floors']) def _adjust_assumed_values_in_wall_descriptions(self): @@ -386,7 +391,7 @@ class TrainingDataset(BaseDataset): suffixes=("", "_ending") ) - # Drop inconsistent properties + # Drop properties where key material types have changed expanded_df = self._drop_inconsistent_properties(expanded_df, component) # Drop original cols and cols to drop diff --git a/etl/epc/Pipeline.py b/etl/epc/Pipeline.py index de827dee..ea484e56 100644 --- a/etl/epc/Pipeline.py +++ b/etl/epc/Pipeline.py @@ -3,6 +3,7 @@ import pandas as pd from typing import List from pathlib import Path +from tqdm import tqdm from etl.epc.DataProcessor import EPCDataProcessor from etl.epc.Record import EPCRecord, EPCDifferenceRecord @@ -52,7 +53,7 @@ def get_cleaned_description_mapping(): return cleaned - +clean_lookup = get_cleaned_description_mapping() class EPCPipeline: """ @@ -67,6 +68,7 @@ class EPCPipeline: def __init__( self, epc_data_processor: EPCDataProcessor, + api_epc_records: dict = None, directories: List[Path] | None = None, run_mode="training", epc_local_file="certificates.csv", @@ -91,6 +93,7 @@ class EPCPipeline: self.directories = directories self.epc_data_processor = epc_data_processor + self.api_epc_records = api_epc_records self.run_mode = run_mode self.epc_local_file = epc_local_file self.epc_bucket_name = epc_bucket_name @@ -113,9 +116,15 @@ class EPCPipeline: """ Main function to run the newdata pipeline """ + prepared_epc = EPCRecord(self.api_epc_records, run_mode="newdata") # This uses all the epc records to clean the data + + self.epc_data_processor.insert_data(prepared_epc) self.epc_data_processor.prepare_data() + data = self.epc_data_processor.data + + epc_records = [EPCRecord(**x, run_mode="newdata") for x in data.to_dict(orient='records')] def run_training_dataset_pipeline(self): @@ -125,7 +134,7 @@ class EPCPipeline: if self.directories is None: raise ValueError("Directories not specified - Unable to run Training pipeline") - for directory in self.directories: + for directory in tqdm(self.directories): self.process_directory(directory) # save_dataframe_to_s3_parquet( @@ -165,7 +174,7 @@ class EPCPipeline: if difference_records is not None: constituency_difference_records.extend(difference_records) - constituency_dataset = TrainingDataset(datasets=constituency_difference_records, cleaned_lookup=get_cleaned_description_mapping()) + constituency_dataset = TrainingDataset(datasets=constituency_difference_records, cleaned_lookup=clean_lookup) self.compiled_dataset = pd.concat([self.compiled_dataset, constituency_dataset.df]) diff --git a/etl/epc/Record.py b/etl/epc/Record.py index f2b3f519..85a8f0f2 100644 --- a/etl/epc/Record.py +++ b/etl/epc/Record.py @@ -1,10 +1,17 @@ - +from datetime import datetime from dataclasses import dataclass from etl.epc.ValidationConfiguration import ( EPCRecordValidationConfiguration, EPCDifferenceRecordValidationConfiguration, EPCDifferenceRecordFixedDataValidationConfiguration ) +from etl.epc.DataProcessor import EPCDataProcessor +from recommendations.rdsap_tables import england_wales_age_band_lookup, FLOOR_LEVEL_MAP +from etl.epc.settings import DATA_ANOMALY_MATCHES, BUILT_FORM_REMAP +import re +import os +import numpy as np +import pandas as pd from typing import Any, Union, List from etl.epc.settings import ( RDSAP_RESPONSE, @@ -13,6 +20,8 @@ from etl.epc.settings import ( COMPONENT_FEATURES, EFFICIENCY_FEATURES ) +from utils.s3 import read_dataframe_from_s3_parquet +from etl.epc.settings import EARLIEST_EPC_DATE # TODO: Change these in the settings file RDSAP_RESPONSE = RDSAP_RESPONSE.lower() @@ -21,78 +30,235 @@ CARBON_RESPONSE = CARBON_RESPONSE.lower() COMPONENT_FEATURES = [x.lower() for x in COMPONENT_FEATURES] EFFICIENCY_FEATURES = [x.lower() for x in EFFICIENCY_FEATURES] +ENVIRONMENT = os.environ.get('ENVIRONMENT', 'dev') +DATA_BUCKET = os.environ.get('DATA_BUCKET', 'retrofit-data-dev' if ENVIRONMENT == 'dev' else None) + @dataclass class EPCRecord: """ Base class for a EPC record """ - uprn: str - walls_description: str - floor_description : str - lighting_description : str - roof_description : str - mainheat_description : str - hotwater_description : str - main_fuel : str - mechanical_ventilation : str - secondheat_description : str - windows_description : str - glazed_type : str - multi_glaze_proportion : float - low_energy_lighting : float - number_open_fireplaces : float - mainheatcont_description : str - solar_water_heating_flag : str - photo_supply : float - transaction_type : str - energy_tariff : str - extension_count : float - total_floor_area : float - floor_height : float - hot_water_energy_eff : str - floor_energy_eff : str - windows_energy_eff : str - walls_energy_eff : str - sheating_energy_eff : str - roof_energy_eff : str - mainheat_energy_eff : str - mainheatc_energy_eff : str - lighting_energy_eff : str - potential_energy_efficiency : float - environment_impact_potential : float - energy_consumption_potential : float - co2_emissions_potential : float - lodgement_date : str - current_energy_efficiency : int - energy_consumption_current : int - co2_emissions_current : float + uprn: int = None + walls_description: str = None + floor_description : str = None + lighting_description : str = None + roof_description : str = None + mainheat_description : str = None + hotwater_description : str = None + main_fuel : str = None + mechanical_ventilation : str = None + secondheat_description : str = None + windows_description : str = None + glazed_type : str = None + multi_glaze_proportion : float = None + low_energy_lighting : float = None + number_open_fireplaces : float = None + mainheatcont_description : str = None + solar_water_heating_flag : str = None + photo_supply : float = None + transaction_type : str = None + energy_tariff : str = None + extension_count : float = None + total_floor_area : float = None + floor_height : float = None + hot_water_energy_eff : str = None + floor_energy_eff : str = None + windows_energy_eff : str = None + walls_energy_eff : str = None + sheating_energy_eff : str = None + roof_energy_eff : str = None + mainheat_energy_eff : str = None + mainheatc_energy_eff : str = None + lighting_energy_eff : str = None + potential_energy_efficiency : float = None + environment_impact_potential : float = None + energy_consumption_potential : float = None + co2_emissions_potential : float = None + lodgement_date : str = None + current_energy_efficiency : int = None + energy_consumption_current : int = None + co2_emissions_current : float = None - u_values_walls = None - u_values_roof = None - u_values_floor = None + # u_values_walls = None + # u_values_roof = None + # u_values_floor = None run_mode: str = "training" + # TODO: Make this a class so thet api_records is structured + epc_records: dict = None + full_sap_epc: dict = None + old_data: list[dict] = None + original_epc: dict = None + prepared_epc: dict = None + prepared_epc_delta_metadata: pd.DataFrame = None + cleaning_data: pd.DataFrame = None + + # Not used in training mod but used in newdata mode + age_band: str = None + construction_age_band: str = None + year_built: int = None + number_of_floors: int = None + number_of_open_fireplaces: int = None + def __post_init__(self): # We can have validation and cleaning steps for each of the fields # self.WALLS_DESCRIPTION = 'check' - # Could also have cleaning of records if needed - self.validation_configuration = EPCRecordValidationConfiguration + # Could also have cleaning of records if needed - # self._field_validation() - self._clean_records() - if self.run_mode == "newdata": - self._expand_description_to_features() - self._expand_description_to_uvalues() + if self.run_mode == "training": + self.validation_configuration = EPCRecordValidationConfiguration + # self._field_validation() + return + # We are running in newdata mode + if self.epc_records is None: + raise ValueError("Must provide epc records if running in newdata mode") + self.prepared_epc = self.epc_records['original_epc'] + self.original_epc = self.epc_records['original_epc'].copy() + + self.full_sap_epc = self.epc_records['full_sap_epc'] + self.old_data = self.epc_records['old_data'] + + if self.cleaning_data is None: + raise ValueError("Must provide cleaning data if running in newdata mode") + + self._clean_records_using_epc_records() + self._clean_with_data_processor() + self._temp_uprn_catch() + + self._expand_prepared_epc_to_attributes() + + self._identify_delta_between_prepared_and_original_records() + + # Process to create uvalues for the single epc record + + # selff.df = self.epc_record_as_dataframe('prepared_epc') + + # self._feature_generation() + # self._drop_features() + + return + + self._expand_description_to_features() + self._expand_description_to_uvalues() + # self._generate_uvalues() # self._validate_expanded_description() # self._validate_u_values() # etc pass + def _drop_features(self): + """ + Drop features that are not needed for modelling + """ + self.df = self.df.drop(columns=["lodgement_date_starting", "lodgement_date_ending"]) + + def _feature_generation(self): + """ + Generate features for modelling + """ + self.df["days_to_lodgement_date"] = self._calculate_days_to(self.prepared_epc["lodgement_date"]) + + @staticmethod + def _calculate_days_to(lodgement_date): + + if isinstance(lodgement_date, str): + return ( + pd.to_datetime(lodgement_date) - pd.to_datetime(EARLIEST_EPC_DATE) + ).days + + return ( + pd.to_datetime(lodgement_date) - pd.to_datetime(EARLIEST_EPC_DATE) + ).dt.days + + + def _temp_uprn_catch(self): + """ + Catch the case we do now have uprn + """ + if self.prepared_epc["uprn"] == "": + self.prepared_epc["uprn"] = 0 + + def _clean_with_data_processor(self): + """ + This method will clean the records using the data processor + """ + epc_data_processor = EPCDataProcessor( + data=self.epc_record_as_dataframe("prepared_epc"), + run_mode="newdata", + cleaning_averages=self.cleaning_data + ) + epc_data_processor.prepare_data() + + self.prepared_epc = epc_data_processor.data.to_dict(orient="records")[0] + + + def _expand_prepared_epc_to_attributes(self): + """ + This method will expand the prepared epc to attributes + """ + + # for key, value in self.prepared_epc.items(): + # setattr(self, key, value) + + self.uprn: int = int(self.prepared_epc["uprn"]) + self.walls_description: str = self.prepared_epc["walls_description"] + self.floor_description : str = self.prepared_epc["floor_description"] + self.lighting_description : str = self.prepared_epc["lighting_description"] + self.roof_description : str = self.prepared_epc["roof_description"] + self.mainheat_description : str = self.prepared_epc["mainheat_description"] + self.hotwater_description : str = self.prepared_epc["hotwater_description"] + self.main_fuel : str = self.prepared_epc["main_fuel"] + self.mechanical_ventilation : str = self.prepared_epc["mechanical_ventilation"] + self.secondheat_description : str = self.prepared_epc["secondheat_description"] + self.windows_description : str = self.prepared_epc["windows_description"] + self.glazed_type : str = self.prepared_epc["glazed_type"] + self.multi_glaze_proportion : float = float(self.prepared_epc["multi_glaze_proportion"]) + self.low_energy_lighting : float = float(self.prepared_epc["low_energy_lighting"]) + self.number_open_fireplaces : float = float(self.prepared_epc["number_open_fireplaces"]) + self.mainheatcont_description : str = self.prepared_epc["mainheatcont_description"] + self.solar_water_heating_flag : str = self.prepared_epc["solar_water_heating_flag"] + self.photo_supply : float = float(self.prepared_epc["photo_supply"]) + self.transaction_type : str = self.prepared_epc["transaction_type"] + self.energy_tariff : str = self.prepared_epc["energy_tariff"] + self.extension_count : float = float(self.prepared_epc["extension_count"]) + self.total_floor_area : float = float(self.prepared_epc["total_floor_area"]) + self.floor_height : float = float(self.prepared_epc["floor_height"]) + self.hot_water_energy_eff : str = self.prepared_epc["hot_water_energy_eff"] + self.floor_energy_eff : str = self.prepared_epc["floor_energy_eff"] + self.windows_energy_eff : str = self.prepared_epc["windows_energy_eff"] + self.walls_energy_eff : str = self.prepared_epc["walls_energy_eff"] + self.sheating_energy_eff : str = self.prepared_epc["sheating_energy_eff"] + self.roof_energy_eff : str = self.prepared_epc["roof_energy_eff"] + self.mainheat_energy_eff : str = self.prepared_epc["mainheat_energy_eff"] + self.mainheatc_energy_eff : str = self.prepared_epc["mainheatc_energy_eff"] + self.lighting_energy_eff : str = self.prepared_epc["lighting_energy_eff"] + self.potential_energy_efficiency : float = float(self.prepared_epc["potential_energy_efficiency"]) + self.environment_impact_potential : float = float(self.prepared_epc["environment_impact_potential"]) + self.energy_consumption_potential : float = float(self.prepared_epc["energy_consumption_potential"]) + self.co2_emissions_potential : float = float(self.prepared_epc["co2_emissions_potential"]) + self.lodgement_date : str = self.prepared_epc["lodgement_date"] + self.current_energy_efficiency : int = int(self.prepared_epc["current_energy_efficiency"]) + self.energy_consumption_current : int = int(self.prepared_epc["energy_consumption_current"]) + self.co2_emissions_current : float = float(self.prepared_epc["co2_emissions_current"]) + + def _identify_delta_between_prepared_and_original_records(self): + """ + This method will identify the delta between the prepared and original records + """ + prepared_epc_df = self.epc_record_as_dataframe("prepared_epc") + original_epc_df = self.epc_record_as_dataframe("original_epc") + + df = pd.concat([prepared_epc_df, original_epc_df], keys=["prepared_epc", "original_epc"], axis=0) + + same_index = df.apply(pd.Series.duplicated).any() + self.prepared_epc_delta_metadata = df[same_index[~same_index].index] + + def _expand_description_to_features(self): pass @@ -132,10 +298,28 @@ class EPCRecord: # ) - def _clean_records(self): + def _clean_records_using_epc_records(self): """ This method will clean the records """ + + # TODO: Move all the cleaning steps in the Property class into there + self._clean_built_form() + self._clean_energy() + self._clean_ventilation() + self._clean_solar_pv() + self._clean_solar_hot_water() + self._clean_wind_turbine() + self._clean_count_variables() + self._clean_heat_loss_corridor() + self._clean_mains_gas() + self._clean_age_band() + self._clean_year_built() + self._clean_floor_area() + self._clean_property_dimensions() + self._clean_number_lighting_outlets() + self._clean_floor_level() + # self._clean_potential_energy_efficiency() # self._clean_environment_impact_potential() # self._clean_energy_consumption_potential() @@ -144,6 +328,305 @@ class EPCRecord: # self._clean_energy_consumption_current() # self._clean_co2_emissions_current() + def epc_record_as_dataframe(self, epc_type: str = "prepared_epc", use_upper_columns: bool = True, replace_empty_string: bool = False): + """ + This method will return the dataframe representation of the epc record + """ + df = pd.DataFrame.from_dict(self.get(epc_type), orient="index").T + + if use_upper_columns: + df.columns = [x.upper().replace("-","_") for x in df.columns] + + if replace_empty_string: + df = df.replace("", np.nan) + + return df + + def _clean_floor_level(self): + """ + This method will clean the floor level, if empty or invalid + """ + if not self.prepared_epc: + raise ValueError("EPC Recrod doesn not contain epc data") + + self.prepared_epc["floor-level"] = ( + FLOOR_LEVEL_MAP[self.prepared_epc["floor-level"]] if + self.prepared_epc["floor-level"] not in DATA_ANOMALY_MATCHES else None + ) + + def _clean_number_lighting_outlets(self): + """ + This method will clean the number of lighting outlets, if empty or invalid + """ + if not self.prepared_epc: + raise ValueError("EPC Recrod doesn not contain epc data") + + if self.prepared_epc["fixed-lighting-outlets-count"] == "": + + # We check old EPCs and the full SAP EPC + + lighting_data = [] + + if len(self.old_data): + lighting_data.extend([ + int(old_record["fixed-lighting-outlets-count"]) for old_record in self.old_data if + old_record["fixed-lighting-outlets-count"] != "" + ]) + + if len(self.full_sap_epc): + if self.full_sap_epc["fixed-lighting-outlets-count"] != "": + lighting_data.append(int(self.full_sap_epc["fixed-lighting-outlets-count"])) + + if lighting_data: + self.prepared_epc["fixed-lighting-outlets-count"] = round(np.median(lighting_data)) + else: + # Use averages from the cleaning dataset, based on the property type, built form, construction age band and local authority + cleaned_property_data = EPCDataProcessor.apply_averages_cleaning( + data_to_clean=self.epc_record_as_dataframe("prepared_epc", replace_empty_string=True), + cleaning_data=self.cleaning_data, + cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'], + ) + self.prepared_epc["fixed-lighting-outlets-count"] = round(cleaned_property_data["FIXED_LIGHTING_OUTLETS_COUNT"].values[0]) + else: + self.prepared_epc["fixed-lighting-outlets-count"] = float(self.prepared_epc["fixed-lighting-outlets-count"]) + + def _filter_property_dimensions(self, property_dimensions): + """ + Will filter the property dimensions dataframe to only include the relevant rows for the property + :param property_dimensions: + :return: filtered property dimensions dataframe + """ + + result = property_dimensions[(property_dimensions["PROPERTY_TYPE"] == self.prepared_epc["property-type"])] + + if self.construction_age_band is not None and self.construction_age_band not in DATA_ANOMALY_MATCHES: + result = result[(result["CONSTRUCTION_AGE_BAND"] == self.construction_age_band)] + + if self.prepared_epc["built-form"] not in DATA_ANOMALY_MATCHES and self.prepared_epc["built-form"] in result["BUILT_FORM"]: + result = result[(result["BUILT_FORM"] == self.prepared_epc["built-form"])] + + return result[["NUMBER_HABITABLE_ROOMS", "TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]].mean() + + def _clean_property_dimensions(self): + """ + Cleans up the number of floors, number of habitable rooms, and the floor height + """ + + if not self.prepared_epc: + raise ValueError("EPC Recrod doesn not contain epc data") + + if not self.prepared_epc["number-habitable-rooms"] or ( + self.prepared_epc["floor-height"] == "" or self.prepared_epc["floor-height"] in DATA_ANOMALY_MATCHES + ): + property_dimensions = read_dataframe_from_s3_parquet( + bucket_name=DATA_BUCKET, file_key=f"property_dimensions/{self.prepared_epc['local-authority']}.parquet" + ) + self.property_dimensions = self._filter_property_dimensions(property_dimensions) + + if not self.prepared_epc["number-habitable-rooms"]: + self.prepared_epc["number-habitable-rooms"] = float(self.property_dimensions["NUMBER_HABITABLE_ROOMS"].round()) + else: + self.prepared_epc["number-habitable-rooms"] = float(self.prepared_epc["number-habitable-rooms"]) + + if self.prepared_epc["property-type"] == "House": + self.number_of_floors = 2 + elif self.prepared_epc["property-type"] in ["Flat", "Bungalow"]: + self.number_of_floors = 1 + elif self.prepared_epc["property-type"] == "Maisonette": + self.number_of_floors = 2 + else: + raise NotImplementedError("Implement me") + + if self.prepared_epc["floor-height"] == "" or self.prepared_epc["floor-height"] in DATA_ANOMALY_MATCHES: + self.prepared_epc["floor-height"] = float(self.property_dimensions["FLOOR_HEIGHT"].round(2)) + else: + self.prepared_epc["floor-height"] = float(self.prepared_epc["floor-height"]) + + def _clean_floor_area(self): + """ + This method will clean the floor area, if empty or invalid + """ + if not self.prepared_epc: + raise ValueError("EPC Recrod doesn not contain epc data") + + self.prepared_epc["total-floor-area"] = float(self.prepared_epc["total-floor-area"]) + + def _clean_mains_gas(self): + """ + This method will clean the mains gas, if empty or invalid + """ + if not self.prepared_epc: + raise ValueError("EPC Recrod doesn not contain epc data") + + map = { + "Y": True, + "N": False, + } + + self.prepared_epc["mains-gas-flag"] = None if ( + self.prepared_epc["mains-gas-flag"] == "" or self.prepared_epc["mains-gas-flag"] in DATA_ANOMALY_MATCHES + ) else map[self.prepared_epc["mains-gas-flag"]] + + def _clean_heat_loss_corridor(self): + """ + This method will clean the heat loss corridor, if empty or invalid + """ + if not self.prepared_epc: + raise ValueError("EPC Recrod doesn not contain epc data") + + map = { + "no corridor": False, + "unheated corridor": True, + "heated corridor": False + } + + self.prepared_epc["heat-loss-corridor"] = False if self.prepared_epc["heat-loss-corridor"] in DATA_ANOMALY_MATCHES else map[self.prepared_epc["heat-loss-corridor"]] + + self.prepared_epc["unheated-corridor-length"] = float(self.prepared_epc["unheated-corridor-length"]) if self.prepared_epc["unheated-corridor-length"] != "" else None + + + def _clean_count_variables(self): + """ + This method will clean the count variables, if empty or invalid + """ + if not self.prepared_epc: + raise ValueError("EPC Recrod doesn not contain epc data") + + fields = { + "number_of_open_fireplaces": "number-open-fireplaces", + "number_of_extensions": "extension-count", + "number_of_storeys": "flat-storey-count", + "number_of_rooms": "number-habitable-rooms", + } + + null_attributes = ["number_of_storeys", "number_of_rooms"] + + for attribute, epc_field in fields.items(): + # TODO: check this + # value = self.data["extension-count"] + value = self.prepared_epc[epc_field] + if value == "" or value in DATA_ANOMALY_MATCHES: + if attribute in null_attributes: + value = None + else: + value = 0 + else: + value = int(value) + + self.prepared_epc[attribute] = value + + def _clean_wind_turbine(self): + """ + This method will clean the wind turbine, if empty or invalid + """ + if not self.prepared_epc: + raise ValueError("EPC Recrod doesn not contain epc data") + + self.prepared_epc['wind-turbine-count'] = int(self.prepared_epc['wind-turbine-count']) if self.prepared_epc['wind-turbine-count'] != "" else None + + def _clean_solar_hot_water(self): + """ + This method will clean the solar hot water, if empty or invalid + """ + if not self.prepared_epc: + raise ValueError("EPC Recrod doesn not contain epc data") + + value_map = { + "Y": True, + "N": False, + "": None, + } + + self.prepared_epc['solar-water-heating-flag'] = value_map[self.prepared_epc['solar-water-heating-flag']] + + def _clean_solar_pv(self): + """ + This method will clean the solar pv, if empty or invalid + """ + if not self.prepared_epc: + raise ValueError("EPC Recrod doesn not contain epc data") + + self.prepared_epc['photo-supply'] = float(self.prepared_epc['photo-supply']) if self.prepared_epc['photo-supply'] != "" else None + + def _clean_energy(self): + """ + This method will clean the energy, if empty or invalid + """ + if not self.prepared_epc: + raise ValueError("EPC Recrod doesn not contain epc data") + + self.prepared_epc['energy-consumption-current'] = float(self.prepared_epc["energy-consumption-current"]) + self.prepared_epc['co2-emissions-current'] = float(self.prepared_epc["co2-emissions-current"]) + + + def _clean_built_form(self): + """ + This method will clean the build form, if empty or invalid + """ + if not self.prepared_epc: + raise ValueError("EPC Recrod doesn not contain epc data") + + self.prepared_epc['built-form'] = BUILT_FORM_REMAP.get(self.prepared_epc["built-form"], self.prepared_epc["built-form"]) + if self.prepared_epc["built-form"] in DATA_ANOMALY_MATCHES: + if self.prepared_epc["property-type"] == "Flat": + self.prepared_epc["built-form"] = "Semi-Detached" + + def _clean_age_band(self): + """ + This method will clean the age band, if empty or invalid + """ + if not self.prepared_epc: + raise ValueError("EPC Recrod doesn not contain epc data") + + self.construction_age_band = EPCDataProcessor.clean_construction_age_band(self.prepared_epc["construction-age-band"]) + if self.construction_age_band in DATA_ANOMALY_MATCHES: + if self.old_data: + # Take the most recent + max_datetime = max( + [old_record["lodgement-datetime"] for old_record in self.old_data if + old_record["construction-age-band"] not in DATA_ANOMALY_MATCHES] + ) + most_recent = [old_record for old_record in self.old_data if old_record["lodgement-datetime"] == max_datetime] + + self.construction_age_band = EPCDataProcessor.clean_construction_age_band( + most_recent[0]["construction-age-band"] + ) + + self.age_band = england_wales_age_band_lookup.get(self.construction_age_band) + + if (self.prepared_epc["transaction-type"] == "new dwelling") and (self.age_band is None): + self.age_band = "L" + self.construction_age_band = 'England and Wales: 2012 onwards' + + if self.age_band is None: + raise ValueError("age_band is missing") + + def _clean_year_built(self): + """ + This method will clean the year built, if empty or invalid + """ + if self.full_sap_epc: + self.year_built = datetime.strptime(self.full_sap_epc["lodgement-date"], '%Y-%m-%d').year + + return + + if self.construction_age_band not in DATA_ANOMALY_MATCHES: + # Take the lower limit. If we're pessimistic about the age of the property, that at least means we have + # more options for recommendations if that age falls before the year that insulation in walls became + # common practice + band = [int(x) for x in re.findall(r'\b\d{4}\b', self.prepared_epc["construction-age-band"])] + self.year_built = band[0] + return + + # We don't know when the property was built + self.year_built = None + + def _clean_ventilation(self): + """ + This method will clean the ventilation, if empty or invalid + """ + self.prepared_epc['mechanical-ventilation'] = None if (self.mechanical_ventilation == "" or self.mechanical_ventilation in DATA_ANOMALY_MATCHES) else self.mechanical_ventilation + def _field_validation(self): """ diff --git a/etl/epc/property_change_app.py b/etl/epc/property_change_app.py index bb24f431..b4befcd7 100644 --- a/etl/epc/property_change_app.py +++ b/etl/epc/property_change_app.py @@ -1,4 +1,4 @@ - +import pandas as pd from pathlib import Path from etl.epc.DataProcessor import EPCDataProcessor from etl.epc.Pipeline import EPCPipeline @@ -11,18 +11,17 @@ def main(): """ directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()] - directories = directories[0:2] + # directories = directories[125:128] epc_pipeline = EPCPipeline(directories=directories, epc_data_processor=EPCDataProcessor(run_mode="training")) epc_pipeline.run() - # For testing - epc_pipeline.compiled_dataset - epc_pipeline.compiled_all_equal_rows - import pandas as pd - pd.concat(epc_pipeline.compiled_cleaning_averages) + dataset_df = epc_pipeline.compiled_dataset + dataset_df.to_parquet("refactor_datasets/dataset.parquet") + pd.DataFrame(epc_pipeline.compiled_all_equal_rows).to_parquet("refactor_datasets/all_equal_rows.parquet") + pd.concat(epc_pipeline.compiled_cleaning_averages).to_parquet("refactor_datasets/cleaning_averages.parquet") if __name__ == "__main__": diff --git a/etl/epc/settings.py b/etl/epc/settings.py index eb8eb641..24c23ebc 100644 --- a/etl/epc/settings.py +++ b/etl/epc/settings.py @@ -2,6 +2,59 @@ # TODO: migrate to dynaconf from pathlib import Path +DATA_ANOMALY_MATCHES = { + # Invalid reports are where the value provided is out of bounds, e.g. a negative energy rating of -1199 or a + # non-integer, there is no valid energy band for this, so it is marked as INVALID! + "INVALID", + "INVALID!", + # When the energy certificate was first lodged on the register there was no requirement to lodge this data + # item, i.e. a non-mandatory item. + "NO DATA!", + "NODATA!", + # When the energy certificate was first lodged on the register there was no requirement to lodge this data item, + # i.e.a non - mandatory item. + "N/A", + # A value generated by the register to account for a data item that was not mandatory when the lodgement of + # the energy certificate occurred. When the data item became mandatory the register operator, for backwards + # compatibility purposes, populated the data field with a value of ‘not recorded’ to ensure that the energy + # certificate retrieval process is successfully completed. Mandatory data items cannot be applied + # retrospectively to energy certificates lodged before the date of the change. + "Not recorded", + # The data also contains DECs with an operational rating of ‘9999’ (a ‘default’ DEC). The production of a + # ‘default’ DEC value was allowed to enable building occupiers, with poor quality or no energy data, + # the opportunity to comply with the regulations. From April 2011 the ability to lodge a ‘default’ DEC was no + # longer allowed. + "9999", + # The Building Emission Rate (BER) data field for non-domestic buildings may contain a ‘blank’ value. The BER + # was only lodged on the register from 7 March 2010. + "Blank" + # There are currently just over 8,600 records where the local authority identifier is ‘null’. This is due to + # the Register Operator not being able to match the building address in the Markermap Ordinance Survey (GB) + # lookup tables or OS MasterMap Address Layer 2 data. The majority of these addresses have been requested + # manually by energy assessors for inclusion by the Register Operator in the registers (e.g. new builds, + # etc). These records are being published for completeness. An ongoing process to manage these manually added + # addresses will take time to develop to deal with these and future anomalies. + # + # There are several fields within the lodged data where it is possible to enter multiple entries to cater for + # different data_types of build within a single property, i.e. extensions. This results in multiple entries for + # the description fields for floor, roof and wall. For the purposes of this data release only the information + # contained within the first of these multiple entries is being provided. As there are no restrictions on the + # value in this first field it means that sometimes the first field in a multiple entry description field may + # contain a ‘null’ value. A resolution to correct these anomalies will be considered for future data releases. + "NULL", + # We sometimes see fields populated with just an empty string. + "" +} + +DATA_ANOMALY_SUBSTRINGS = { + # Where values in a ‘pick’ list that have been superseded by another value. For example, where a value for + # ‘pitched roof’ has been replaced by three sub-categories of pitched roof. The original value is retained + # but ‘for backward compatibility only’ it is appended to ensure that the energy certificate retrieval + # process can be successfully completed. Replacement data items cannot be applied retrospectively to energy + # certificates lodged on the register before the date of the change. + "for backward compatibility only" +} + METRIC_FILENAME = "metrics.csv" OPTIMISE_METRIC = "mean_absolute_error"