refactored the model recommendations dataset

This commit is contained in:
Michael Duong 2024-01-16 16:40:10 +00:00
parent 069b59f2ce
commit 61dd26f23e
9 changed files with 898 additions and 486 deletions

View file

@ -5,8 +5,9 @@ import os
import numpy as np
import pandas as pd
from etl.epc.DataProcessor import DataProcessor
from etl.epc.settings import POTENTIAL_COLUMNS, EFFICIENCY_FEATURES, BUILT_FORM_REMAP
from etl.epc.DataProcessor import EPCDataProcessor
from etl.epc.Dataset import TrainingDataset
from etl.epc.settings import LATEST_FIELD, MANDATORY_FIXED_FEATURES, POTENTIAL_COLUMNS, EFFICIENCY_FEATURES, BUILT_FORM_REMAP
from etl.epc_clean.epc_attributes.all_cleaners import all_cleaner_map
from utils.logger import setup_logger
from utils.s3 import read_dataframe_from_s3_parquet
@ -17,6 +18,7 @@ from recommendations.recommendation_utils import (
estimate_perimeter, get_wall_type, estimate_external_wall_area, esimtate_pitched_roof_area
)
ENVIRONMENT = os.environ.get('ENVIRONMENT', 'dev')
EPC_AUTH_TOKEN = os.environ.get('EPC_AUTH_TOKEN')
DATA_BUCKET = os.environ.get('DATA_BUCKET', 'retrofit-data-dev' if ENVIRONMENT == 'dev' else None)
@ -49,216 +51,201 @@ class Property(Definitions):
spatial = None
def __init__(self, id, postcode, address1, epc_client=None, data=None):
def __init__(self, id, postcode, address1, epc_record, data=None):
self.epc_record = epc_record
self.id = id
self.postcode = postcode
self.address1 = address1
self.data = data
self.old_data = None
self.data = {k.replace("_", "-"): v for k,v in epc_record.get("prepared_epc").items()}
self.old_data = epc_record.get("old_data")
self.property_dimensions = None
self.uprn = None
self.full_sap_epc = None
self.uprn = epc_record.get("uprn")
self.full_sap_epc = epc_record.get("full_sap_epc")
self.in_conservation_area, self.is_listed, self.is_heritage = None, None, None
self.restricted_measures = False
self.year_built = None
self.number_of_rooms = None
self.age_band = None
self.construction_age_band = None
self.number_of_floors = None
self.year_built = epc_record.get("year_built")
self.number_of_rooms = epc_record.prepared_epc.get("number_of_rooms")
self.age_band = epc_record.get("age_band")
self.construction_age_band = epc_record.get("construction_age_band")
self.number_of_floors = epc_record.get("number_of_floors")
self.perimeter = None
self.wall_type = None
self.floor_type = None
self.energy = None
self.ventilation = None
self.solar_pv = None
self.solar_hot_water = None
self.wind_turbine = None
self.number_of_open_fireplaces = None
self.number_of_extensions = None
self.number_of_storeys = None
self.heat_loss_corridor = None
self.mains_gas = None
self.floor_height = None
self.energy = {
"primary_energy_consumption": epc_record.get("energy_consumption_current"),
"co2_emissions": epc_record.get("co2_emissions_current"),
}
self.ventilation = {
"ventilation": epc_record.get("mechanical_ventilation"),
}
self.solar_pv = {
"solar_pv": epc_record.get("photo_supply"),
}
self.solar_hot_water = {
"solar_hot_water": epc_record.get("solar_water_heating_flag"),
}
self.wind_turbine = {
"wind_turbine": epc_record.prepared_epc.get("wind_turbine_count"),
}
self.number_of_open_fireplaces = {
"number_of_open_fireplaces": epc_record.prepared_epc.get("number_of_open_fireplaces"),
}
self.number_of_extensions = {
"number_of_extensions": epc_record.prepared_epc.get("number_of_extensions"),
}
self.number_of_storeys = {
"number_of_storeys": epc_record.prepared_epc.get("number_of_storeys"),
}
self.heat_loss_corridor = {
"heat_loss_corridor": epc_record.prepared_epc.get("heat_loss_corridor"),
"length": epc_record.prepared_epc.get("unheated_corridor_length"),
}
self.mains_gas = epc_record.prepared_epc.get('mains_gas_flag')
self.floor_height = epc_record.prepared_epc.get('floor_height')
self.insulation_wall_area = None
self.floor_area = None
self.floor_area = epc_record.prepared_epc.get('total_floor_area')
self.pitched_roof_area = None
self.insulation_floor_area = None
self.number_lighting_outlets = None
self.number_lighting_outlets = epc_record.prepared_epc.get("fixed_lighting_outlets_count")
self.floor_level = None
self.current_adjusted_energy = None
self.expected_adjusted_energy = None
if epc_client:
self.epc_client = epc_client
self.recommendations_scoring_data = []
def create_base_difference_epc_record(self, cleaned_lookup: dict):
    """
    Build the baseline difference record for this property and store it on
    ``self.base_difference_record``.

    The property's EPC record is subtracted from itself, so the starting and ending
    sides of the difference describe the same EPC (no expected EPC exists yet).
    The fixed features found in ``self.data`` are then attached and the result is
    wrapped in a ``TrainingDataset``.

    :param cleaned_lookup: passed straight through to ``TrainingDataset``
    :return: None
    """
    baseline_difference = self.epc_record - self.epc_record

    # TODO: change these lower and replace in the settings file
    raw_fixed_columns = MANDATORY_FIXED_FEATURES + LATEST_FIELD
    print("NEED TO CHANGE THE DASH TO LOWER CASE")

    # self.data is keyed with dashes, so the settings names are converted to match
    dashed_fixed_columns = []
    for column in raw_fixed_columns:
        dashed_fixed_columns.append(column.lower().replace("_", "-"))

    # Collect the fixed values, switching the keys back to underscores
    fixed_values = {}
    for key, value in self.data.items():
        if key in dashed_fixed_columns:
            fixed_values[key.replace("-", "_")] = value

    baseline_difference.append_fixed_data(fixed_values)

    self.base_difference_record = TrainingDataset(
        datasets=[baseline_difference],
        cleaned_lookup=cleaned_lookup,
    )

    # TODO: adjust the base difference record with the previously calculated u values + features
    # estimated_perimeter is different to the perimeter in the epc record
    # self.base_difference_record.df
def adjust_difference_record_with_recommendations(self, property_recommendations):
    """
    Create one scoring record per recommendation and append each of them to
    ``self.recommendations_scoring_data``.

    Every record is produced by ``create_recommendation_scoring_data`` and is given an
    ``id`` of the form ``"<property id>+<recommendation id>"``.

    :param property_recommendations: iterable of recommendation groups; each group is an
        iterable of recommendation dicts that carry a "recommendation_id" key
    :return: None
    """
    for recommendations_by_type in property_recommendations:
        for rec in recommendations_by_type:
            scoring_dict = self.create_recommendation_scoring_data(recommendation=rec)
            scoring_dict['id'] = "+".join([str(self.id), str(rec["recommendation_id"])])
            self.recommendations_scoring_data.append(scoring_dict)
def create_recommendation_scoring_data(self, recommendation: dict):
recommendation_record = self.base_difference_record.df.to_dict("records")[0].copy()
for col in [
"walls_insulation_thickness", "floor_insulation_thickness", "roof_insulation_thickness"
]:
if recommendation_record[col] is None:
recommendation_record[col] = "none"
# We update the description to indicate it's insulated
if recommendation["type"] in ["internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation"]:
# The upgrade made here is to the u-value of the walls and the description of the
# insulation thickness
recommendation_record["walls_thermal_transmittance_ending"] = recommendation["new_u_value"]
recommendation_record["walls_insulation_thickness_ending"] = "above average"
recommendation_record["walls_energy_eff_ending"] = "Good"
else:
self.epc_client = EpcClient(auth_token=EPC_AUTH_TOKEN)
if recommendation_record["walls_thermal_transmittance_ending"] is None:
raise ValueError("We should not have a None value for the u value")
def search_address_epc(self):
"""
This method searches for an address in the EPC database and returns the first result
:return: property data
"""
if self.data:
return
if recommendation_record["walls_insulation_thickness_ending"] is None:
recommendation_record["walls_insulation_thickness_ending"] = "none"
# This will fail if a property does not have an EPC - this has been documented as a case to handle
response = self.epc_client.domestic.search(params={"address": self.address1, "postcode": self.postcode})
# Update description to indicate it's insulate
if recommendation["type"] in ["solid_floor_insulation", "suspended_floor_insulation", "exposed_floor_insulation"]:
if len(recommendation["parts"]) > 1:
raise NotImplementedError("Have more than 1 floor insulation part - handle this case")
# Check if we have a full sap EPC
self.full_sap_epc = [r for r in response["rows"] if r["transaction-type"] == "new dwelling"]
self.full_sap_epc = self.full_sap_epc[0] if self.full_sap_epc else self.full_sap_epc
recommendation_record["floor_thermal_transmittance_ending"] = recommendation["new_u_value"]
# We don't really see above average for this in the training data
recommendation_record["floor_insulation_thickness_ending"] = "average"
recommendation_record["floor_energy_eff_ending"] = "Good"
else:
if recommendation_record["floor_thermal_transmittance_ending"] is None:
raise ValueError("We should not have a None value for the u value")
if len(response["rows"]) > 1:
newest_response = [
r for r in response["rows"] if
r["lodgement-datetime"] == max([x["lodgement-datetime"] for x in response["rows"]])
if recommendation_record["floor_insulation_thickness_ending"] is None:
recommendation_record["floor_insulation_thickness_ending"] = "none"
if recommendation["type"] in ["loft_insulation", "room_roof_insulation", "flat_roof_insulation"]:
recommendation_record["roof_thermal_transmittance_ending"] = recommendation["new_u_value"]
parts = recommendation["parts"]
if len(parts) != 1:
raise ValueError("More than one part for roof insulation - investiage me")
# This is based on the values we have in the training data
valid_numeric_values = [
12, 25, 50, 75, 100, 150, 200, 250, 270, 300, 350, 400
]
if len(newest_response) > 1:
raise Exception("More than one result found for this address - investigate me")
# We'll keep old EPCs in case it contains information, not present on the newest one
self.old_data = [epc for epc in response["rows"] if epc["lmk-key"] != newest_response[0]["lmk-key"]]
proposed_depth = int(parts[0]["depth"])
if proposed_depth not in valid_numeric_values:
# Take the nearest value for scoring
proposed_depth = min(valid_numeric_values, key=lambda x: abs(x - proposed_depth))
response["rows"] = newest_response
self.data = response["rows"][0]
# For the moment, if we don't have a UPRN, we don't do anything about it, however we'll handle this in
# the future by using the Ordnance Survey places API
if not self.data["uprn"]:
logger.warning("We do not have a UPRN for this property")
recommendation_record["roof_insulation_thickness_ending"] = str(proposed_depth)
recommendation_record["roof_energy_eff_ending"] = "Very Good"
else:
self.uprn = int(self.data["uprn"])
# Fill missing roof u-values - this fill is not based on recommended upgrades
if recommendation_record["roof_thermal_transmittance_ending"] is None:
raise ValueError("We should not have a None value for the u value")
def set_energy(self):
"""
Extracts and formats data about the home's energy and co2 consumption
To being with, this is just formatting epc data
if recommendation_record["roof_insulation_thickness_ending"] is None:
recommendation_record["roof_insulation_thickness_ending"] = "none"
Data:
- primary_energy_consumption
This is based on the "energy-consumption-current" field in the EPC data.
Current estimated total energy consumption for the property in a 12 month period (kWh/m2). Displayed on EPC
as the current primary energy use per square metre of floor area.
if recommendation["type"] == "mechanical_ventilation":
recommendation_record["mechanical_ventilation_ending"] = 'mechanical, extract only'
- co2_emissions
This is based on the "co2-emissions-current" field in the EPC data.
CO₂ emissions per year in tonnes/year.
"""
if recommendation["type"] == "sealing_open_fireplace":
recommendation_record["number_open_fireplaces_ending"] = 0
self.energy = {
"primary_energy_consumption": float(self.data["energy-consumption-current"]),
"co2_emissions": float(self.data["co2-emissions-current"]),
}
if recommendation["type"] == "low_energy_lighting":
recommendation_record["low_energy_lighting_ending"] = 100
recommendation_record["lighting_energy_eff_starting"] = "Very Good"
def set_ventilation(self):
"""
Extracts and formats data about the home's ventilation
To being with, this is just formatting epc data
if recommendation["type"] not in [
"mechanical_ventilation", "sealing_open_fireplace", "low_energy_lighting",
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation",
"loft_insulation", "room_roof_insulation", "flat_roof_insulation",
"solid_floor_insulation", "suspended_floor_insulation", "exposed_floor_insulation"
]:
raise NotImplementedError("Implement me")
Data:
- ventilation
This is based on the "ventilation-type" field in the EPC data.
Ventilation type of the property.
"""
return recommendation_record
ventilation = self.data["mechanical-ventilation"]
# perform some simple cleaning - when checking 300k epc, the only unique values were
# {'', 'mechanical, supply and extract', 'NO DATA!', 'natural', 'mechanical, extract only'}
if ventilation in self.DATA_ANOMALY_MATCHES or ventilation in [""]:
ventilation = None
self.ventilation = {
"ventilation": ventilation,
}
def set_solar_pv(self):
    """
    Store the property's solar PV value on ``self.solar_pv``.

    The value is read from the "photo-supply" field of the EPC data. An empty string
    becomes ``None``; anything else is converted with ``float``.

    :return: None
    """
    raw_supply = self.data["photo-supply"]
    self.solar_pv = {
        "solar_pv": None if raw_supply == "" else float(raw_supply),
    }
def set_solar_hot_water(self):
    """
    Store the property's solar hot water flag on ``self.solar_hot_water``.

    The "solar-water-heating-flag" EPC field is translated as "Y" -> True,
    "N" -> False and "" -> None. Any other value raises ``KeyError``.

    :return: None
    """
    flag_to_value = {"Y": True, "N": False, "": None}
    raw_flag = self.data["solar-water-heating-flag"]
    self.solar_hot_water = {"solar_hot_water": flag_to_value[raw_flag]}
def set_wind_turbine(self):
    """
    Store the property's wind turbine count on ``self.wind_turbine``.

    The value is read from the "wind-turbine-count" field of the EPC data. An empty
    string becomes ``None``; anything else is converted with ``int``.

    :return: None
    """
    raw_count = self.data["wind-turbine-count"]
    self.wind_turbine = {
        "wind_turbine": int(raw_count) if raw_count != "" else None,
    }
def set_count_variables(self):
    """
    Set the attributes that are plain counts in the EPC data.

    These fields are integers but may also hold values such as "" (or one of
    ``self.DATA_ANOMALY_MATCHES``), so they cannot be converted straight to ``int``.
    A missing value becomes ``None`` for number_of_storeys and number_of_rooms, and
    ``0`` for the other counts.

    :return: None
    """
    # attribute name on this object -> EPC field it is read from
    fields = {
        "number_of_open_fireplaces": "number-open-fireplaces",
        "number_of_extensions": "extension-count",
        "number_of_storeys": "flat-storey-count",
        "number_of_rooms": "number-habitable-rooms",
    }
    # Attributes where "missing" must stay unknown rather than default to zero
    null_attributes = ["number_of_storeys", "number_of_rooms"]

    for attribute, epc_field in fields.items():
        # Read each attribute from its own EPC field (previously every attribute was
        # read from "extension-count")
        value = self.data[epc_field]
        if value == "" or value in self.DATA_ANOMALY_MATCHES:
            value = None if attribute in null_attributes else 0
        else:
            value = int(value)

        setattr(self, attribute, value)
def get_components(self, cleaned):
"""
@ -274,27 +261,9 @@ class Property(Definitions):
if not self.data:
raise ValueError("Property does not contain data")
# We need to implement an EPC cleaning process, which we run on the EPC data, immediately after we download
# it
self.data["built-form"] = BUILT_FORM_REMAP.get(self.data["built-form"], self.data["built-form"])
if self.data["built-form"] in self.DATA_ANOMALY_MATCHES:
if self.data["property-type"] == "Flat":
self.data["built-form"] = "Semi-Detached"
self.set_energy()
self.set_ventilation()
self.set_solar_pv()
self.set_solar_hot_water()
self.set_wind_turbine()
self.set_count_variables()
self.set_heat_loss_corridor()
self.set_mains_gas()
self.set_age_band()
self.set_basic_property_dimensions()
for description, attribute in cleaned.items():
if self.data[description] in self.DATA_ANOMALY_MATCHES:
template = cleaned[description][0]
fill_dict = dict(zip(template.keys(), [None] * len(template)))
@ -333,38 +302,6 @@ class Property(Definitions):
self.set_floor_type()
self.set_floor_level()
def set_age_band(self):
    """
    Set a cleaned construction age band and its age band letter from the EPC data.

    If the newest EPC has no usable construction age band, the most recently lodged
    older EPC that does have one is used instead. When no older EPC has a usable band,
    the lookup is skipped so that the new-dwelling fallback, or the explicit
    "age_band is missing" error below, applies.

    :return: None
    :raises ValueError: if the property has no data, or no age band can be determined
    """
    if not self.data:
        raise ValueError("Property does not contain data")

    self.construction_age_band = DataProcessor.clean_construction_age_band(self.data["construction-age-band"])

    if self.construction_age_band in self.DATA_ANOMALY_MATCHES and self.old_data:
        # Only older EPCs with a usable age band are candidates; filtering first also
        # stops a same-timestamp record with an anomalous band from being picked
        candidates = [
            epc for epc in self.old_data
            if epc["construction-age-band"] not in self.DATA_ANOMALY_MATCHES
        ]
        if candidates:
            # Take the most recent
            most_recent = max(candidates, key=lambda epc: epc["lodgement-datetime"])
            self.construction_age_band = DataProcessor.clean_construction_age_band(
                most_recent["construction-age-band"]
            )

    self.age_band = england_wales_age_band_lookup.get(self.construction_age_band)

    # A new dwelling without a recognised band is assigned the newest band
    if (self.data["transaction-type"] == "new dwelling") and (self.age_band is None):
        self.age_band = "L"
        self.construction_age_band = 'England and Wales: 2012 onwards'

    if self.age_band is None:
        raise ValueError("age_band is missing")
def set_spatial(self, spatial: pd.DataFrame):
"""
Sets whether the property is in a conservation area given the output of the ConservationAreaClient
@ -392,71 +329,6 @@ class Property(Definitions):
"is_heritage_building": spatial_dict["is_heritage_building"],
}
def set_year_built(self):
    """
    Estimate when the property was built, using as much available data as possible.

    Order of preference:
    1. the lodgement year of the full SAP EPC, when one exists;
    2. the first four-digit year found in the construction age band;
    3. ``None`` when neither is available.

    :return: None
    """
    if self.full_sap_epc:
        self.year_built = datetime.strptime(self.full_sap_epc["lodgement-date"], '%Y-%m-%d').year
        return

    age_band = self.data["construction-age-band"]
    if age_band not in self.DATA_ANOMALY_MATCHES:
        # Take the lower limit. If we're pessimistic about the age of the property, that at least means we have
        # more options for recommendations if that age falls before the year that insulation in walls became
        # common practice
        years = [int(x) for x in re.findall(r'\b\d{4}\b', age_band)]
        if years:
            self.year_built = years[0]
            return

    # We don't know when the property was built (this also covers an age band that
    # contains no four-digit year)
    self.year_built = None
def set_heat_loss_corridor(self):
    """
    Clean the heat-loss-corridor fields and store them on ``self.heat_loss_corridor``.

    Only "unheated corridor" counts as a heat loss corridor; a value found in
    ``self.DATA_ANOMALY_MATCHES`` is treated as no corridor. The corridor length is
    ``None`` for an empty string and a ``float`` otherwise.

    :return: None
    """
    corridor_lookup = {
        "no corridor": False,
        "unheated corridor": True,
        "heated corridor": False
    }

    corridor = self.data["heat-loss-corridor"]
    has_corridor = False if corridor in self.DATA_ANOMALY_MATCHES else corridor_lookup[corridor]

    raw_length = self.data["unheated-corridor-length"]

    self.heat_loss_corridor = {
        "heat_loss_corridor": has_corridor,
        "length": float(raw_length) if raw_length != "" else None,
    }
def set_mains_gas(self):
    """
    Store on ``self.mains_gas`` whether the property has mains gas.

    The "mains-gas-flag" EPC field maps "Y" -> True and "N" -> False. An empty string
    or a value found in ``self.DATA_ANOMALY_MATCHES`` gives ``None``.

    :return: None
    """
    flag = self.data["mains-gas-flag"]

    if flag != "" and flag not in self.DATA_ANOMALY_MATCHES:
        self.mains_gas = {"Y": True, "N": False}[flag]
        return

    self.mains_gas = None
def _clean_upload_data(self, to_update):
for k, v in to_update.items():
if v in self.DATA_ANOMALY_MATCHES:
@ -603,36 +475,6 @@ class Property(Definitions):
:return:
"""
self.floor_area = float(self.data["total-floor-area"])
if not self.data["number-habitable-rooms"] or (
self.data["floor-height"] == "" or self.data["floor-height"] in self.DATA_ANOMALY_MATCHES
):
if self.property_dimensions is None:
property_dimensions = read_dataframe_from_s3_parquet(
bucket_name=DATA_BUCKET, file_key=f"property_dimensions/{self.data['local-authority']}.parquet"
)
self.property_dimensions = self._filter_property_dimensions(property_dimensions)
if not self.data["number-habitable-rooms"]:
self.number_of_rooms = float(self.property_dimensions["NUMBER_HABITABLE_ROOMS"].round())
else:
self.number_of_rooms = float(self.data["number-habitable-rooms"])
if self.data["property-type"] == "House":
self.number_of_floors = 2
elif self.data["property-type"] in ["Flat", "Bungalow"]:
self.number_of_floors = 1
elif self.data["property-type"] == "Maisonette":
self.number_of_floors = 2
else:
raise NotImplementedError("Implement me")
if self.data["floor-height"] == "" or self.data["floor-height"] in self.DATA_ANOMALY_MATCHES:
self.floor_height = float(self.property_dimensions["FLOOR_HEIGHT"].round(2))
else:
self.floor_height = float(self.data["floor-height"])
self.perimeter = estimate_perimeter(
self.floor_area / self.number_of_floors, self.number_of_rooms / self.number_of_floors
)
@ -653,7 +495,8 @@ class Property(Definitions):
def set_floor_level(self):
self.floor_level = (
FLOOR_LEVEL_MAP[self.data["floor-level"]] if
self.data["floor-level"] not in self.DATA_ANOMALY_MATCHES else None
self.data["floor-level"] not in self.DATA_ANOMALY_MATCHES and self.data['floor-level'] is not None
else None
)
if self.floor_level is None:
@ -798,7 +641,7 @@ class Property(Definitions):
**hotwater,
**windows,
"SECONDHEAT_DESCRIPTION": second_heating,
"DAYS_TO": DataProcessor.calculate_days_to(self.data["lodgement-date"]),
"DAYS_TO": EPCDataProcessor.calculate_days_to(self.data["lodgement-date"]),
"SAP": float(self.data["current-energy-efficiency"]),
"CARBON": float(self.data["co2-emissions-current"]),
"HEAT_DEMAND": float(self.data["energy-consumption-current"]),

View file

@ -18,6 +18,8 @@ class MaterialType(enum.Enum):
exposed_floor_insulation = "exposed_floor_insulation"
flat_roof_insulation = "flat_roof_insulation"
room_roof_insulation = "room_roof_insulation"
windows_glazing = "windows_glazing"
iwi_wall_demolition = "iwi_wall_demolition"
iwi_vapour_barrier = "iwi_vapour_barrier"

View file

@ -3,6 +3,7 @@ from datetime import datetime
import numpy as np
import pandas as pd
from epc_api.client import EpcClient
from etl.epc.Record import EPCRecord
from fastapi import APIRouter, Depends
from sqlalchemy.exc import IntegrityError, OperationalError
from sqlalchemy.orm import sessionmaker
@ -27,7 +28,7 @@ from backend.app.utils import epc_to_sap_lower_bound, read_csv_from_s3, read_par
from backend.ml_models.api import ModelApi
from backend.Property import Property
from etl.epc.DataProcessor import DataProcessor
from etl.epc.DataProcessor import EPCDataProcessor
from etl.epc.settings import COLUMNS_TO_MERGE_ON
from recommendations.optimiser.CostOptimiser import CostOptimiser
from recommendations.optimiser.GainOptimiser import GainOptimiser
@ -42,6 +43,54 @@ logger = setup_logger()
BATCH_SIZE = 5
class DummyDownloader:
    """
    Looks up the EPC rows for a single address through an EPC client.

    After ``search_address_epc`` has run, the newest EPC is available on ``data``, the
    remaining EPCs on ``old_data``, the first "new dwelling" EPC (if any) on
    ``full_sap_epc`` and the integer UPRN (if present) on ``uprn``.
    """

    def __init__(self, postcode, address1, id, epc_client):
        """
        :param postcode: postcode used in the EPC search
        :param address1: address used in the EPC search
        :param id: identifier of the property
        :param epc_client: client exposing ``domestic.search(params=...)``
        """
        self.id = id
        self.postcode = postcode
        self.address1 = address1
        self.data = None
        self.old_data = None
        # Initialised here so they can always be read, including when the search
        # returns early or when the EPC has no UPRN
        self.full_sap_epc = None
        self.uprn = None
        self.epc_client = epc_client

    def search_address_epc(self):
        """
        This method searches for an address in the EPC database and keeps the newest result

        :return: None
        :raises Exception: if more than one EPC shares the newest lodgement datetime
        """
        if self.data:
            return

        # This will fail if a property does not have an EPC - this has been documented as a case to handle
        response = self.epc_client.domestic.search(params={"address": self.address1, "postcode": self.postcode})
        rows = response["rows"]

        # Check if we have a full sap EPC (left as an empty list when there is none)
        new_dwelling_rows = [r for r in rows if r["transaction-type"] == "new dwelling"]
        self.full_sap_epc = new_dwelling_rows[0] if new_dwelling_rows else new_dwelling_rows

        if len(rows) > 1:
            # Work out the newest lodgement datetime once instead of once per row
            latest_lodgement = max(r["lodgement-datetime"] for r in rows)
            newest_response = [r for r in rows if r["lodgement-datetime"] == latest_lodgement]
            if len(newest_response) > 1:
                raise Exception("More than one result found for this address - investigate me")

            # We'll keep old EPCs in case it contains information, not present on the newest one
            self.old_data = [epc for epc in rows if epc["lmk-key"] != newest_response[0]["lmk-key"]]

            response["rows"] = newest_response

        self.data = response["rows"][0]
        # For the moment, if we don't have a UPRN, we don't do anything about it, however we'll handle this in
        # the future by using the Ordnance Survey places API
        if not self.data["uprn"]:
            logger.warning("We do not have a UPRN for this property")
        else:
            self.uprn = int(self.data["uprn"])
router = APIRouter(
prefix="/plan",
tags=["plan"],
@ -49,16 +98,19 @@ router = APIRouter(
responses={404: {"description": "Not found"}}
)
# TODO: Need to install base.txt requirements into new env
@router.post("/trigger")
async def trigger_plan(body: PlanTriggerRequest):
logger.info("Connecting to db")
session = sessionmaker(bind=db_engine)()
# session = sessionmaker(bind=db_engine)()
created_at = datetime.now().isoformat()
try:
session.begin()
logger.info("Getting the inputs")
Body = {'portfolio_id': '56', 'housing_type': 'Social', 'goal': 'Increase EPC', 'goal_value': 'A', 'trigger_file_path': '8/56/windows_portfolio_inputs.csv'}
body = PlanTriggerRequest(**Body)
epc_client = EpcClient(auth_token=get_settings().EPC_AUTH_TOKEN)
plan_input = read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path)
uprn_filenames = read_dataframe_from_s3_parquet(
@ -69,6 +121,7 @@ async def trigger_plan(body: PlanTriggerRequest):
)
input_properties = []
for config in plan_input:
# We validate each record in the file. If the record is NOT valid, we need to handle this accordingly
# TODO: implment validation. We should also standardise postcode and address in some fashion as
@ -90,23 +143,35 @@ async def trigger_plan(body: PlanTriggerRequest):
heat_demand_target=None
)
input_properties.append(
Property(
postcode=config['postcode'],
epc_downloader = DummyDownloader(id=0, epc_client=epc_client, postcode=config['postcode'], address1=config['address'])
epc_downloader.search_address_epc()
epc_records ={
'original_epc': epc_downloader.data.copy(),
'full_sap_epc': epc_downloader.full_sap_epc.copy() if epc_downloader.full_sap_epc else [],
'old_data': epc_downloader.old_data.copy() if epc_downloader.old_data else []
}
prepared_epc = EPCRecord(epc_records=epc_records, run_mode="newdata", cleaning_data=cleaning_data) # This uses all the epc records to clean the data
p = Property(
id=property_id,
address1=config['address'],
epc_client=epc_client,
id=property_id
postcode=config['postcode'],
epc_record=prepared_epc,
)
logger.info("Getting spatial data")
p.get_spatial_data(uprn_filenames)
input_properties.append(
p
)
if not input_properties:
return Response(status_code=204)
logger.info("Getting EPC, and spatial data")
for p in input_properties:
p.search_address_epc()
p.set_year_built()
p.get_spatial_data(uprn_filenames)
if not input_properties:
return Response(status_code=204)
# The materials data could be cached or local so we don't need to make
# consistent requests to the backend for
@ -129,14 +194,6 @@ async def trigger_plan(body: PlanTriggerRequest):
# Property recommendations
p.get_components(cleaned)
# This is temp - this should happen after scoring
cleaned_property_data = DataProcessor.apply_averages_cleaning(
data_to_clean=pd.DataFrame([dict(**p.get_model_data(), LOCAL_AUTHORITY=p.data["local-authority"])]),
cleaning_data=cleaning_data,
cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'],
)
p.set_number_lighting_outlets(cleaned_property_data)
recommender = Recommendations(property_instance=p, materials=materials)
property_recommendations = recommender.recommend()
@ -145,75 +202,17 @@ async def trigger_plan(body: PlanTriggerRequest):
recommendations[p.id] = property_recommendations
from etl.epc.Pipeline import EPCPipeline
from etl.epc.DataProcessor import EPCDataProcessor
newdata_pipeline = EPCPipeline(epc_data_processor=EPCDataProcessor(data=p.get_model_data(), run_mode="newdata"), run_mode="newdata")
p.create_base_difference_epc_record(cleaned_lookup=cleaned)
p.adjust_difference_record_with_recommendations(property_recommendations)
# TODO: p.get_model_data() -> EPCRecord
recommendations_scoring_data.extend(p.recommendations_scoring_data)
# Finally, we'll prepare data for predicting the impact on SAP
# data_processor = DataProcessor(None, newdata=True)
# data_processor.insert_data(pd.DataFrame([p.get_model_data()]))
# # TODO: Temp
# if data_processor.data["UPRN"].values[0] == "":
# data_processor.data["UPRN"] = 0
# data_processor.pre_process()
# starting_epc_data = data_processor.get_component_features(suffix="_STARTING")
# ending_epc_data = data_processor.get_component_features(suffix="_ENDING")
# fixed_data = data_processor.get_fixed_features()
# # We update the ending record with the recommended updates and we set lodgement date to today
# ending_epc_data["DAYS_TO_ENDING"] = data_processor.calculate_days_to(created_at)
# TODO: EPCRecord - AdjustedEPCRecord
# property_scoring_data[p.id] = {
# "starting_epc_data": starting_epc_data,
# "ending_epc_data": ending_epc_data,
# "fixed_data": fixed_data
# }
for recommendations_by_type in property_recommendations:
for i, rec in enumerate(recommendations_by_type):
scoring_dict = create_recommendation_scoring_data(
property=p,
recommendation=rec,
starting_epc_data=starting_epc_data,
ending_epc_data=ending_epc_data,
fixed_data=fixed_data,
)
recommendations_scoring_data.append(scoring_dict)
# cleanup
# del data_processor
logger.info("Preparing data for scoring in sap change api")
recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data)
# Perform the same cleaning as in the model - first clean number of room variables though
recommendations_scoring_data = DataProcessor.apply_averages_cleaning(
data_to_clean=recommendations_scoring_data,
cleaning_data=cleaning_data,
cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'],
colnames=["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"],
)
recommendations_scoring_data = DataProcessor.apply_averages_cleaning(
data_to_clean=recommendations_scoring_data,
cleaning_data=cleaning_data,
cols_to_merge_on=COLUMNS_TO_MERGE_ON + ["LOCAL_AUTHORITY"],
).drop(columns=["LOCAL_AUTHORITY"])
recommendations_scoring_data = DataProcessor.clean_missings_after_description_process(
recommendations_scoring_data,
ignore_cols=[c for c in recommendations_scoring_data.columns if ("thermal_transmittance" in c) or (
"insulation_thickness" in c) or ("ENERGY_EFF" in c)]
)
recommendations_scoring_data = DataProcessor.clean_efficiency_variables(recommendations_scoring_data)
recommendations_scoring_data = recommendations_scoring_data.drop(
columns=["rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending", "carbon_ending"]
)
model_api = ModelApi(portfolio_id=body.portfolio_id, timestamp=created_at)
all_predictions = model_api.predict_all(

View file

@ -27,6 +27,13 @@ from recommendations.rdsap_tables import FLOOR_LEVEL_MAP
from typing import List
# TODO: change the setting columns to lower
# Each column-name list below is rebound to a lower-cased copy of itself, so the rest of
# this module works with lower-case column names.
# NOTE(review): these lists are presumably imported from the settings module - confirm;
# once they are stored in lower case there, these four lines can be removed.
STARTING_SUFFIX_COMPONENT_COLS = [x.lower() for x in STARTING_SUFFIX_COMPONENT_COLS]
NO_SUFFIX_COMPONENT_COLS = [x.lower() for x in NO_SUFFIX_COMPONENT_COLS]
ENDING_SUFFIX_COMPONENT_COLS = [x.lower() for x in ENDING_SUFFIX_COMPONENT_COLS]
POTENTIAL_COLUMNS = [x.lower() for x in POTENTIAL_COLUMNS]
# These lookups are used to clean the construction age band
construction_age_bounds_map = {
"England and Wales: before 1900": {"l": 0, "u": 1899},
@ -67,15 +74,18 @@ class EPCDataProcessor:
Handle data loading and data preprocessing
"""
def __init__(self, data: pd.DataFrame | None = None, run_mode: str = "training", violation_mode: bool = False) -> None:
def __init__(self, data: pd.DataFrame | None = None, cleaning_averages: pd.DataFrame | None = None, run_mode: str = "training", violation_mode: bool = False) -> None:
"""
:param filepath: If specified, is the physical location of the data
:param is_newdata: Indicates if we are processing new, testing data.
In this instance, there are some operations we do not
want to perform, such as confine_data()
"""
self.data : pd.DataFrame = data if data else pd.DataFrame()
self.cleaning_averages : pd.DataFrame = pd.DataFrame()
is_data_a_dataframe = isinstance(data, pd.DataFrame)
self.data : pd.DataFrame = data if is_data_a_dataframe else pd.DataFrame()
is_cleaning_averages_a_dataframe = isinstance(cleaning_averages, pd.DataFrame)
self.cleaning_averages : pd.DataFrame = cleaning_averages if is_cleaning_averages_a_dataframe else pd.DataFrame()
# FOR NOW IF VIOLATION MODE IS ON, WE USE RUN MODE AS NEWDATA
self.violation_mode = violation_mode
@ -124,13 +134,22 @@ class EPCDataProcessor:
self.fill_invalid_constituency_fields(ignore_step=ignore_step)
self.cleaning_averages = self.make_cleaning_averages(ignore_step=ignore_step)
self.make_cleaning_averages(ignore_step=ignore_step)
# TODO: check if this has impact on training dataset
cleaned_data = self.apply_averages_cleaning(
data_to_clean=self.data,
cleaning_data=self.cleaning_averages,
cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'],
colnames=["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"],
)
cleaned_data = self.apply_averages_cleaning(
data_to_clean=self.data,
cleaning_data=self.cleaning_averages,
cols_to_merge_on=COLUMNS_TO_MERGE_ON,
ignore_step=ignore_step
)
self.data = self.data if cleaned_data is None else cleaned_data
self.add_local_authority_to_cleaning_average(ignore_step=ignore_step)
@ -549,7 +568,7 @@ class EPCDataProcessor:
# "FLOOR_HEIGHT"
# ].fillna(FLOOR_HEIGHT_NATIONAL_AVERAGE)
return cleaning_averages_filled
self.cleaning_averages = cleaning_averages_filled
def retain_multiple_epc_properties(self, epc_minimum_count: int = 1, ignore_step: bool = False) -> None:
"""
@ -765,8 +784,8 @@ class EPCDataProcessor:
:return: Pandas dataframe containing the subset of columns defined in COMPONENT_FEATURES
"""
if suffix not in ["_STARTING", "_ENDING"]:
raise Exception("Suffix should be one of _STARTING or _ENDING")
if suffix not in ["_starting", "_ending"]:
raise Exception("Suffix should be one of _starting or _ending")
if suffix == "_STARTING":
starting_cols = self.data[STARTING_SUFFIX_COMPONENT_COLS + EFFICIENCY_FEATURES].copy().add_suffix(suffix)

View file

@ -2,12 +2,12 @@ import numpy as np
import pandas as pd
from typing import List
from etl.epc.Record import EPCDifferenceRecord
from ValidationConfiguration import DatasetValidationConfiguration
from etl.epc.ValidationConfiguration import DatasetValidationConfiguration
from etl.epc.settings import EARLIEST_EPC_DATE
from recommendations.rdsap_tables import england_wales_age_band_lookup
from recommendations.recommendation_utils import (
get_wall_u_value, get_roof_u_value, get_floor_u_value, estimate_perimeter,
estimate_number_of_floors, get_wall_u_value, get_roof_u_value, get_floor_u_value, estimate_perimeter,
get_wall_type
)
@ -204,9 +204,14 @@ class TrainingDataset(BaseDataset):
# ~~~~~~~~~~~~~~~~~~
# Floor
# ~~~~~~~~~~~~~~~~~~
self.df['estimated_number_of_floors'] = self.df.apply(
lambda row: estimate_number_of_floors(row['property_type']),
axis=1
)
self.df['estimated_perimeter_starting'] = self.df.apply(
lambda row: estimate_perimeter(row["total_floor_area_starting"], row["number_habitable_rooms"]),
lambda row: estimate_perimeter(row["total_floor_area_starting"]/ row['estimated_number_of_floors'], row["number_habitable_rooms"]/ row['estimated_number_of_floors']),
axis=1
)
self.df['estimated_perimeter_ending'] = self.df.apply(
@ -244,7 +249,7 @@ class TrainingDataset(BaseDataset):
self.df[f"{component}_thermal_transmittance"] = self.df[f"{component}_thermal_transmittance"].fillna(eval(f"{component}_starting_uvalue"))
self.df[f"{component}_thermal_transmittance_ending"] = self.df[f"{component}_thermal_transmittance_ending"].fillna(eval(f"{component}_ending_uvalue"))
self.df = self.df.drop(columns=["floor_type", "wall_type", "walls_clean_description", "walls_clean_description_ending"])
self.df = self.df.drop(columns=["floor_type", "wall_type", "walls_clean_description", "walls_clean_description_ending", 'estimated_number_of_floors'])
def _adjust_assumed_values_in_wall_descriptions(self):
@ -386,7 +391,7 @@ class TrainingDataset(BaseDataset):
suffixes=("", "_ending")
)
# Drop inconsistent properties
# Drop properties where key material types have changed
expanded_df = self._drop_inconsistent_properties(expanded_df, component)
# Drop original cols and cols to drop

View file

@ -3,6 +3,7 @@ import pandas as pd
from typing import List
from pathlib import Path
from tqdm import tqdm
from etl.epc.DataProcessor import EPCDataProcessor
from etl.epc.Record import EPCRecord, EPCDifferenceRecord
@ -52,7 +53,7 @@ def get_cleaned_description_mapping():
return cleaned
clean_lookup = get_cleaned_description_mapping()
class EPCPipeline:
"""
@ -67,6 +68,7 @@ class EPCPipeline:
def __init__(
self,
epc_data_processor: EPCDataProcessor,
api_epc_records: dict = None,
directories: List[Path] | None = None,
run_mode="training",
epc_local_file="certificates.csv",
@ -91,6 +93,7 @@ class EPCPipeline:
self.directories = directories
self.epc_data_processor = epc_data_processor
self.api_epc_records = api_epc_records
self.run_mode = run_mode
self.epc_local_file = epc_local_file
self.epc_bucket_name = epc_bucket_name
@ -113,9 +116,15 @@ class EPCPipeline:
"""
Main function to run the newdata pipeline
"""
prepared_epc = EPCRecord(self.api_epc_records, run_mode="newdata") # This uses all the epc records to clean the data
self.epc_data_processor.insert_data(prepared_epc)
self.epc_data_processor.prepare_data()
data = self.epc_data_processor.data
epc_records = [EPCRecord(**x, run_mode="newdata") for x in data.to_dict(orient='records')]
def run_training_dataset_pipeline(self):
@ -125,7 +134,7 @@ class EPCPipeline:
if self.directories is None:
raise ValueError("Directories not specified - Unable to run Training pipeline")
for directory in self.directories:
for directory in tqdm(self.directories):
self.process_directory(directory)
# save_dataframe_to_s3_parquet(
@ -165,7 +174,7 @@ class EPCPipeline:
if difference_records is not None:
constituency_difference_records.extend(difference_records)
constituency_dataset = TrainingDataset(datasets=constituency_difference_records, cleaned_lookup=get_cleaned_description_mapping())
constituency_dataset = TrainingDataset(datasets=constituency_difference_records, cleaned_lookup=clean_lookup)
self.compiled_dataset = pd.concat([self.compiled_dataset, constituency_dataset.df])

View file

@ -1,10 +1,17 @@
from datetime import datetime
from dataclasses import dataclass
from etl.epc.ValidationConfiguration import (
EPCRecordValidationConfiguration,
EPCDifferenceRecordValidationConfiguration,
EPCDifferenceRecordFixedDataValidationConfiguration
)
from etl.epc.DataProcessor import EPCDataProcessor
from recommendations.rdsap_tables import england_wales_age_band_lookup, FLOOR_LEVEL_MAP
from etl.epc.settings import DATA_ANOMALY_MATCHES, BUILT_FORM_REMAP
import re
import os
import numpy as np
import pandas as pd
from typing import Any, Union, List
from etl.epc.settings import (
RDSAP_RESPONSE,
@ -13,6 +20,8 @@ from etl.epc.settings import (
COMPONENT_FEATURES,
EFFICIENCY_FEATURES
)
from utils.s3 import read_dataframe_from_s3_parquet
from etl.epc.settings import EARLIEST_EPC_DATE
# TODO: Change these in the settings file
RDSAP_RESPONSE = RDSAP_RESPONSE.lower()
@ -21,78 +30,235 @@ CARBON_RESPONSE = CARBON_RESPONSE.lower()
COMPONENT_FEATURES = [x.lower() for x in COMPONENT_FEATURES]
EFFICIENCY_FEATURES = [x.lower() for x in EFFICIENCY_FEATURES]
ENVIRONMENT = os.environ.get('ENVIRONMENT', 'dev')
DATA_BUCKET = os.environ.get('DATA_BUCKET', 'retrofit-data-dev' if ENVIRONMENT == 'dev' else None)
@dataclass
class EPCRecord:
"""
Base class for a EPC record
"""
uprn: str
walls_description: str
floor_description : str
lighting_description : str
roof_description : str
mainheat_description : str
hotwater_description : str
main_fuel : str
mechanical_ventilation : str
secondheat_description : str
windows_description : str
glazed_type : str
multi_glaze_proportion : float
low_energy_lighting : float
number_open_fireplaces : float
mainheatcont_description : str
solar_water_heating_flag : str
photo_supply : float
transaction_type : str
energy_tariff : str
extension_count : float
total_floor_area : float
floor_height : float
hot_water_energy_eff : str
floor_energy_eff : str
windows_energy_eff : str
walls_energy_eff : str
sheating_energy_eff : str
roof_energy_eff : str
mainheat_energy_eff : str
mainheatc_energy_eff : str
lighting_energy_eff : str
potential_energy_efficiency : float
environment_impact_potential : float
energy_consumption_potential : float
co2_emissions_potential : float
lodgement_date : str
current_energy_efficiency : int
energy_consumption_current : int
co2_emissions_current : float
uprn: int = None
walls_description: str = None
floor_description : str = None
lighting_description : str = None
roof_description : str = None
mainheat_description : str = None
hotwater_description : str = None
main_fuel : str = None
mechanical_ventilation : str = None
secondheat_description : str = None
windows_description : str = None
glazed_type : str = None
multi_glaze_proportion : float = None
low_energy_lighting : float = None
number_open_fireplaces : float = None
mainheatcont_description : str = None
solar_water_heating_flag : str = None
photo_supply : float = None
transaction_type : str = None
energy_tariff : str = None
extension_count : float = None
total_floor_area : float = None
floor_height : float = None
hot_water_energy_eff : str = None
floor_energy_eff : str = None
windows_energy_eff : str = None
walls_energy_eff : str = None
sheating_energy_eff : str = None
roof_energy_eff : str = None
mainheat_energy_eff : str = None
mainheatc_energy_eff : str = None
lighting_energy_eff : str = None
potential_energy_efficiency : float = None
environment_impact_potential : float = None
energy_consumption_potential : float = None
co2_emissions_potential : float = None
lodgement_date : str = None
current_energy_efficiency : int = None
energy_consumption_current : int = None
co2_emissions_current : float = None
u_values_walls = None
u_values_roof = None
u_values_floor = None
# u_values_walls = None
# u_values_roof = None
# u_values_floor = None
run_mode: str = "training"
# TODO: Make this a class so that api_records is structured
epc_records: dict = None
full_sap_epc: dict = None
old_data: list[dict] = None
original_epc: dict = None
prepared_epc: dict = None
prepared_epc_delta_metadata: pd.DataFrame = None
cleaning_data: pd.DataFrame = None
# Not used in training mode but used in newdata mode
age_band: str = None
construction_age_band: str = None
year_built: int = None
number_of_floors: int = None
number_of_open_fireplaces: int = None
def __post_init__(self):
# We can have validation and cleaning steps for each of the fields
# self.WALLS_DESCRIPTION = 'check'
# Could also have cleaning of records if needed
self.validation_configuration = EPCRecordValidationConfiguration
# Could also have cleaning of records if needed
# self._field_validation()
self._clean_records()
if self.run_mode == "newdata":
self._expand_description_to_features()
self._expand_description_to_uvalues()
if self.run_mode == "training":
self.validation_configuration = EPCRecordValidationConfiguration
# self._field_validation()
return
# We are running in newdata mode
if self.epc_records is None:
raise ValueError("Must provide epc records if running in newdata mode")
self.prepared_epc = self.epc_records['original_epc']
self.original_epc = self.epc_records['original_epc'].copy()
self.full_sap_epc = self.epc_records['full_sap_epc']
self.old_data = self.epc_records['old_data']
if self.cleaning_data is None:
raise ValueError("Must provide cleaning data if running in newdata mode")
self._clean_records_using_epc_records()
self._clean_with_data_processor()
self._temp_uprn_catch()
self._expand_prepared_epc_to_attributes()
self._identify_delta_between_prepared_and_original_records()
# Process to create uvalues for the single epc record
# selff.df = self.epc_record_as_dataframe('prepared_epc')
# self._feature_generation()
# self._drop_features()
return
self._expand_description_to_features()
self._expand_description_to_uvalues()
# self._generate_uvalues()
# self._validate_expanded_description()
# self._validate_u_values()
# etc
pass
def _drop_features(self):
    """
    Remove the columns that are not needed for modelling.

    Reassigns ``self.df`` to the frame returned by ``drop`` with the
    ``lodgement_date_starting`` and ``lodgement_date_ending`` columns removed.
    """
    unused_columns = ["lodgement_date_starting", "lodgement_date_ending"]
    self.df = self.df.drop(labels=unused_columns, axis=1)
def _feature_generation(self):
    """
    Generate the derived features used for modelling.

    Adds a ``days_to_lodgement_date`` column to ``self.df``, computed by
    ``self._calculate_days_to`` from the prepared EPC's ``lodgement_date``.
    """
    lodged_on = self.prepared_epc["lodgement_date"]
    self.df["days_to_lodgement_date"] = self._calculate_days_to(lodged_on)
@staticmethod
def _calculate_days_to(lodgement_date):
    """
    Return the number of whole days between EARLIEST_EPC_DATE and a lodgement date.

    :param lodgement_date: a single date (string, datetime or Timestamp) or a
        pandas Series / list-like of dates; anything ``pd.to_datetime`` accepts.
    :return: an int for a scalar input, a Series of ints for a Series input,
        or an Index of ints for any other list-like input.
    """
    elapsed = pd.to_datetime(lodgement_date) - pd.to_datetime(EARLIEST_EPC_DATE)
    # Only a Series exposes timedelta components through the `.dt` accessor; a
    # scalar Timedelta and a TimedeltaIndex carry `.days` directly. Branching on
    # the result type (instead of "is the input a str?") keeps datetime /
    # Timestamp scalars from hitting `.dt` and raising AttributeError.
    if isinstance(elapsed, pd.Series):
        return elapsed.dt.days
    return elapsed.days
def _temp_uprn_catch(self):
    """
    Catch the case where we do not have a uprn.

    An empty-string ``uprn`` in the prepared EPC is replaced with 0; any other
    value is left untouched.
    """
    record = self.prepared_epc
    if record["uprn"] == "":
        record["uprn"] = 0
def _clean_with_data_processor(self):
    """
    Clean the prepared EPC by passing it through an EPCDataProcessor.

    The prepared EPC is turned into a single-row dataframe, processed in
    "newdata" mode with ``self.cleaning_data`` as the cleaning averages, and the
    first processed row is stored back on ``self.prepared_epc`` as a dict.
    """
    single_row = self.epc_record_as_dataframe("prepared_epc")
    processor = EPCDataProcessor(
        data=single_row,
        cleaning_averages=self.cleaning_data,
        run_mode="newdata",
    )
    processor.prepare_data()
    processed_rows = processor.data.to_dict(orient="records")
    self.prepared_epc = processed_rows[0]
def _expand_prepared_epc_to_attributes(self):
    """
    Copy the prepared EPC values onto the record's attributes.

    Fields are processed in a fixed order; integer and float fields are cast
    with ``int`` / ``float`` and every other field is copied unchanged. A
    missing key or a failed cast propagates as KeyError / ValueError /
    TypeError, leaving the earlier fields already assigned.
    """
    ordered_fields = (
        "uprn",
        "walls_description",
        "floor_description",
        "lighting_description",
        "roof_description",
        "mainheat_description",
        "hotwater_description",
        "main_fuel",
        "mechanical_ventilation",
        "secondheat_description",
        "windows_description",
        "glazed_type",
        "multi_glaze_proportion",
        "low_energy_lighting",
        "number_open_fireplaces",
        "mainheatcont_description",
        "solar_water_heating_flag",
        "photo_supply",
        "transaction_type",
        "energy_tariff",
        "extension_count",
        "total_floor_area",
        "floor_height",
        "hot_water_energy_eff",
        "floor_energy_eff",
        "windows_energy_eff",
        "walls_energy_eff",
        "sheating_energy_eff",
        "roof_energy_eff",
        "mainheat_energy_eff",
        "mainheatc_energy_eff",
        "lighting_energy_eff",
        "potential_energy_efficiency",
        "environment_impact_potential",
        "energy_consumption_potential",
        "co2_emissions_potential",
        "lodgement_date",
        "current_energy_efficiency",
        "energy_consumption_current",
        "co2_emissions_current",
    )
    integer_fields = {
        "uprn",
        "current_energy_efficiency",
        "energy_consumption_current",
    }
    float_fields = {
        "multi_glaze_proportion",
        "low_energy_lighting",
        "number_open_fireplaces",
        "photo_supply",
        "extension_count",
        "total_floor_area",
        "floor_height",
        "potential_energy_efficiency",
        "environment_impact_potential",
        "energy_consumption_potential",
        "co2_emissions_potential",
        "co2_emissions_current",
    }
    for field_name in ordered_fields:
        raw_value = self.prepared_epc[field_name]
        if field_name in integer_fields:
            converted = int(raw_value)
        elif field_name in float_fields:
            converted = float(raw_value)
        else:
            converted = raw_value
        setattr(self, field_name, converted)
def _identify_delta_between_prepared_and_original_records(self):
    """
    Record which columns differ between the prepared and the original EPC.

    Both records are stacked into one frame (keyed "prepared_epc" /
    "original_epc"); the columns in which no value is repeated down the
    stacked rows are stored on ``self.prepared_epc_delta_metadata``.
    """
    record_names = ["prepared_epc", "original_epc"]
    frames = [self.epc_record_as_dataframe(name) for name in record_names]
    stacked = pd.concat(frames, keys=record_names, axis=0)
    # True for a column when some value in it occurs more than once, i.e. the
    # prepared and original values agree.
    has_repeated_value = stacked.apply(pd.Series.duplicated).any()
    differing_columns = has_repeated_value.loc[~has_repeated_value].index
    self.prepared_epc_delta_metadata = stacked[differing_columns]
def _expand_description_to_features(self):
pass
@ -132,10 +298,28 @@ class EPCRecord:
# )
def _clean_records(self):
def _clean_records_using_epc_records(self):
"""
This method will clean the records
"""
# TODO: Move all the cleaning steps in the Property class into there
self._clean_built_form()
self._clean_energy()
self._clean_ventilation()
self._clean_solar_pv()
self._clean_solar_hot_water()
self._clean_wind_turbine()
self._clean_count_variables()
self._clean_heat_loss_corridor()
self._clean_mains_gas()
self._clean_age_band()
self._clean_year_built()
self._clean_floor_area()
self._clean_property_dimensions()
self._clean_number_lighting_outlets()
self._clean_floor_level()
# self._clean_potential_energy_efficiency()
# self._clean_environment_impact_potential()
# self._clean_energy_consumption_potential()
@ -144,6 +328,305 @@ class EPCRecord:
# self._clean_energy_consumption_current()
# self._clean_co2_emissions_current()
def epc_record_as_dataframe(self, epc_type: str = "prepared_epc", use_upper_columns: bool = True, replace_empty_string: bool = False):
    """
    Return a single-row dataframe representation of one of the record's EPC dicts.

    :param epc_type: name of the EPC dict to convert, looked up with ``self.get``
    :param use_upper_columns: upper-case the column names and turn "-" into "_"
    :param replace_empty_string: replace empty-string cells with NaN
    :return: dataframe with one column per key of the chosen EPC dict
    """
    # NOTE(review): relies on a `get` accessor on the record that is not visible
    # in this part of the file - confirm it exists.
    record = self.get(epc_type)
    frame = pd.DataFrame.from_dict(record, orient="index").T
    if use_upper_columns:
        frame.columns = [name.upper().replace("-", "_") for name in frame.columns]
    if not replace_empty_string:
        return frame
    return frame.replace("", np.nan)
def _clean_floor_level(self):
    """
    Clean the floor level of the prepared EPC.

    Values listed in DATA_ANOMALY_MATCHES become None; any other value is
    translated through FLOOR_LEVEL_MAP (a value missing from the map raises
    KeyError).

    :raises ValueError: if the record holds no prepared EPC data
    """
    if not self.prepared_epc:
        raise ValueError("EPC Recrod doesn not contain epc data")
    raw_level = self.prepared_epc["floor-level"]
    if raw_level in DATA_ANOMALY_MATCHES:
        cleaned_level = None
    else:
        cleaned_level = FLOOR_LEVEL_MAP[raw_level]
    self.prepared_epc["floor-level"] = cleaned_level
def _clean_number_lighting_outlets(self):
    """
    Clean the number of fixed lighting outlets, filling it in when it is empty.

    A non-empty value is simply cast to float. An empty value is estimated, in
    order of preference, from:

    1. the rounded median of the counts found in the old EPCs and the full
       SAP EPC, or
    2. the averages in ``self.cleaning_data`` for the same property type,
       built form, construction age band and local authority.

    :raises ValueError: if the record holds no prepared EPC data
    """
    if not self.prepared_epc:
        raise ValueError("EPC Recrod doesn not contain epc data")

    field = "fixed-lighting-outlets-count"
    if self.prepared_epc[field] != "":
        self.prepared_epc[field] = float(self.prepared_epc[field])
        return

    # We check old EPCs and the full SAP EPC. Truthiness is used rather than
    # len() because both attributes default to None, which len() cannot handle;
    # this also matches how _clean_age_band tests self.old_data.
    lighting_data = [
        int(old_record[field])
        for old_record in (self.old_data or [])
        if old_record[field] != ""
    ]
    if self.full_sap_epc and self.full_sap_epc[field] != "":
        lighting_data.append(int(self.full_sap_epc[field]))

    if lighting_data:
        self.prepared_epc[field] = round(np.median(lighting_data))
        return

    # Use averages from the cleaning dataset, based on the property type, built form,
    # construction age band and local authority
    cleaned_property_data = EPCDataProcessor.apply_averages_cleaning(
        data_to_clean=self.epc_record_as_dataframe("prepared_epc", replace_empty_string=True),
        cleaning_data=self.cleaning_data,
        cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'],
    )
    self.prepared_epc[field] = round(cleaned_property_data["FIXED_LIGHTING_OUTLETS_COUNT"].values[0])
def _filter_property_dimensions(self, property_dimensions):
    """
    Will filter the property dimensions dataframe to only include the relevant rows for the property

    Rows are always restricted to the property's type. They are further
    restricted to its construction age band when that is known and valid, and to
    its built form when that is valid and still present among the remaining rows.

    :param property_dimensions: dataframe with PROPERTY_TYPE, CONSTRUCTION_AGE_BAND, BUILT_FORM,
        NUMBER_HABITABLE_ROOMS, TOTAL_FLOOR_AREA and FLOOR_HEIGHT columns
    :return: Series holding the mean NUMBER_HABITABLE_ROOMS, TOTAL_FLOOR_AREA and FLOOR_HEIGHT
        of the matching rows
    """
    result = property_dimensions[(property_dimensions["PROPERTY_TYPE"] == self.prepared_epc["property-type"])]

    if self.construction_age_band is not None and self.construction_age_band not in DATA_ANOMALY_MATCHES:
        result = result[(result["CONSTRUCTION_AGE_BAND"] == self.construction_age_band)]

    built_form = self.prepared_epc["built-form"]
    # `x in series` tests the Series *index*, not its values, so the membership
    # check has to be made against `.values` for the built-form filter to apply.
    if built_form not in DATA_ANOMALY_MATCHES and built_form in result["BUILT_FORM"].values:
        result = result[(result["BUILT_FORM"] == built_form)]

    return result[["NUMBER_HABITABLE_ROOMS", "TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]].mean()
def _clean_property_dimensions(self):
    """
    Cleans up the number of habitable rooms, the number of floors, and the floor height

    When the habitable room count or the floor height is missing, the property
    dimension averages for the record's local authority are read from S3 and
    filtered down to this property; the missing values are taken from those
    averages. The number of floors is derived from the property type alone.

    :raises ValueError: if the record holds no prepared EPC data
    :raises NotImplementedError: for a property type other than House, Flat,
        Bungalow or Maisonette
    """
    if not self.prepared_epc:
        raise ValueError("EPC Recrod doesn not contain epc data")

    record = self.prepared_epc

    def floor_height_missing():
        height = record["floor-height"]
        return height == "" or height in DATA_ANOMALY_MATCHES

    if not record["number-habitable-rooms"] or floor_height_missing():
        averages = read_dataframe_from_s3_parquet(
            bucket_name=DATA_BUCKET, file_key=f"property_dimensions/{self.prepared_epc['local-authority']}.parquet"
        )
        self.property_dimensions = self._filter_property_dimensions(averages)

    if record["number-habitable-rooms"]:
        record["number-habitable-rooms"] = float(record["number-habitable-rooms"])
    else:
        record["number-habitable-rooms"] = float(self.property_dimensions["NUMBER_HABITABLE_ROOMS"].round())

    property_type = record["property-type"]
    if property_type == "House" or property_type == "Maisonette":
        self.number_of_floors = 2
    elif property_type in ["Flat", "Bungalow"]:
        self.number_of_floors = 1
    else:
        raise NotImplementedError("Implement me")

    if floor_height_missing():
        record["floor-height"] = float(self.property_dimensions["FLOOR_HEIGHT"].round(2))
    else:
        record["floor-height"] = float(record["floor-height"])
def _clean_floor_area(self):
    """
    Clean the total floor area by casting it to a float.

    :raises ValueError: if the record holds no prepared EPC data
    """
    if not self.prepared_epc:
        raise ValueError("EPC Recrod doesn not contain epc data")
    raw_area = self.prepared_epc["total-floor-area"]
    self.prepared_epc["total-floor-area"] = float(raw_area)
def _clean_mains_gas(self):
    """
    Clean the mains gas flag.

    "Y" / "N" become True / False; an empty string or a value listed in
    DATA_ANOMALY_MATCHES becomes None. Any other value raises KeyError.

    :raises ValueError: if the record holds no prepared EPC data
    """
    if not self.prepared_epc:
        raise ValueError("EPC Recrod doesn not contain epc data")

    flag_lookup = {"Y": True, "N": False}
    raw_flag = self.prepared_epc["mains-gas-flag"]
    if raw_flag == "" or raw_flag in DATA_ANOMALY_MATCHES:
        cleaned_flag = None
    else:
        cleaned_flag = flag_lookup[raw_flag]
    self.prepared_epc["mains-gas-flag"] = cleaned_flag
def _clean_heat_loss_corridor(self):
    """
    Clean the heat loss corridor flag and the unheated corridor length.

    The corridor description becomes a boolean that is True only for
    "unheated corridor"; values listed in DATA_ANOMALY_MATCHES become False and
    any other unknown description raises KeyError. The corridor length is cast
    to float, with an empty string becoming None.

    :raises ValueError: if the record holds no prepared EPC data
    """
    if not self.prepared_epc:
        raise ValueError("EPC Recrod doesn not contain epc data")

    loses_heat = {
        "no corridor": False,
        "unheated corridor": True,
        "heated corridor": False
    }
    corridor = self.prepared_epc["heat-loss-corridor"]
    if corridor in DATA_ANOMALY_MATCHES:
        self.prepared_epc["heat-loss-corridor"] = False
    else:
        self.prepared_epc["heat-loss-corridor"] = loses_heat[corridor]

    corridor_length = self.prepared_epc["unheated-corridor-length"]
    if corridor_length != "":
        self.prepared_epc["unheated-corridor-length"] = float(corridor_length)
    else:
        self.prepared_epc["unheated-corridor-length"] = None
def _clean_count_variables(self):
    """
    Clean the count fields and store them under descriptive keys.

    Each source EPC field is cast to int and written back to the prepared EPC
    under a new key. An empty or anomalous value becomes None for the storey
    and room counts, and 0 for the fireplace and extension counts.

    :raises ValueError: if the record holds no prepared EPC data
    """
    if not self.prepared_epc:
        raise ValueError("EPC Recrod doesn not contain epc data")

    # (target key, source EPC field, value used when the source is missing)
    count_fields = (
        ("number_of_open_fireplaces", "number-open-fireplaces", 0),
        ("number_of_extensions", "extension-count", 0),
        ("number_of_storeys", "flat-storey-count", None),
        ("number_of_rooms", "number-habitable-rooms", None),
    )
    for target_key, epc_field, fallback in count_fields:
        raw_count = self.prepared_epc[epc_field]
        is_missing = raw_count == "" or raw_count in DATA_ANOMALY_MATCHES
        self.prepared_epc[target_key] = fallback if is_missing else int(raw_count)
def _clean_wind_turbine(self):
    """
    Clean the wind turbine count: cast it to int, or None when it is empty.

    :raises ValueError: if the record holds no prepared EPC data
    """
    if not self.prepared_epc:
        raise ValueError("EPC Recrod doesn not contain epc data")
    turbine_count = self.prepared_epc['wind-turbine-count']
    if turbine_count != "":
        self.prepared_epc['wind-turbine-count'] = int(turbine_count)
    else:
        self.prepared_epc['wind-turbine-count'] = None
def _clean_solar_hot_water(self):
    """
    Clean the solar water heating flag.

    "Y" / "N" become True / False and an empty string becomes None. Any other
    value raises KeyError.

    :raises ValueError: if the record holds no prepared EPC data
    """
    if not self.prepared_epc:
        raise ValueError("EPC Recrod doesn not contain epc data")
    flag_lookup = dict(Y=True, N=False)
    flag_lookup[""] = None
    raw_flag = self.prepared_epc['solar-water-heating-flag']
    self.prepared_epc['solar-water-heating-flag'] = flag_lookup[raw_flag]
def _clean_solar_pv(self):
    """
    Clean the solar pv supply: cast it to float, or None when it is empty.

    :raises ValueError: if the record holds no prepared EPC data
    """
    if not self.prepared_epc:
        raise ValueError("EPC Recrod doesn not contain epc data")
    supply = self.prepared_epc['photo-supply']
    if supply != "":
        self.prepared_epc['photo-supply'] = float(supply)
    else:
        self.prepared_epc['photo-supply'] = None
def _clean_energy(self):
    """
    Clean the current energy consumption and CO2 emissions by casting both to float.

    :raises ValueError: if the record holds no prepared EPC data
    """
    if not self.prepared_epc:
        raise ValueError("EPC Recrod doesn not contain epc data")
    for numeric_field in ("energy-consumption-current", "co2-emissions-current"):
        self.prepared_epc[numeric_field] = float(self.prepared_epc[numeric_field])
def _clean_built_form(self):
    """
    Clean the built form.

    The value is first translated through BUILT_FORM_REMAP (unmapped values are
    kept as they are). If the result is listed in DATA_ANOMALY_MATCHES and the
    property is a Flat, the built form is set to "Semi-Detached".

    :raises ValueError: if the record holds no prepared EPC data
    """
    if not self.prepared_epc:
        raise ValueError("EPC Recrod doesn not contain epc data")

    raw_form = self.prepared_epc["built-form"]
    remapped_form = BUILT_FORM_REMAP.get(raw_form, raw_form)
    self.prepared_epc['built-form'] = remapped_form

    if remapped_form in DATA_ANOMALY_MATCHES and self.prepared_epc["property-type"] == "Flat":
        self.prepared_epc["built-form"] = "Semi-Detached"
def _clean_age_band(self):
    """
    Clean the construction age band and derive the age band from it.

    When the cleaned band of the current EPC is anomalous, the band of the most
    recently lodged old EPC that has a valid band is used instead. A "new
    dwelling" transaction with no resolvable age band is assigned age band "L"
    ('England and Wales: 2012 onwards').

    :raises ValueError: if the record holds no prepared EPC data, or if no age
        band could be determined
    """
    if not self.prepared_epc:
        raise ValueError("EPC Recrod doesn not contain epc data")

    self.construction_age_band = EPCDataProcessor.clean_construction_age_band(self.prepared_epc["construction-age-band"])

    if self.construction_age_band in DATA_ANOMALY_MATCHES and self.old_data:
        # Only old records with a usable age band are candidates. Filtering first
        # avoids max() on an empty sequence when none qualifies, and guarantees
        # that the record picked is itself one with a valid band.
        usable_records = [
            old_record for old_record in self.old_data
            if old_record["construction-age-band"] not in DATA_ANOMALY_MATCHES
        ]
        if usable_records:
            # Take the most recent
            most_recent = max(usable_records, key=lambda old_record: old_record["lodgement-datetime"])
            self.construction_age_band = EPCDataProcessor.clean_construction_age_band(
                most_recent["construction-age-band"]
            )

    self.age_band = england_wales_age_band_lookup.get(self.construction_age_band)

    if (self.prepared_epc["transaction-type"] == "new dwelling") and (self.age_band is None):
        self.age_band = "L"
        self.construction_age_band = 'England and Wales: 2012 onwards'

    if self.age_band is None:
        raise ValueError("age_band is missing")
def _clean_year_built(self):
    """
    Work out the year the property was built.

    If a full SAP EPC exists, the year of its lodgement date is used. Otherwise,
    when the construction age band is not anomalous, the first four-digit year
    found in the age band is used. ``year_built`` is None when neither source
    yields a year.
    """
    if self.full_sap_epc:
        self.year_built = datetime.strptime(self.full_sap_epc["lodgement-date"], '%Y-%m-%d').year
        return

    if self.construction_age_band not in DATA_ANOMALY_MATCHES:
        # Take the lower limit. If we're pessimistic about the age of the property, that at least means we have
        # more options for recommendations if that age falls before the year that insulation in walls became
        # common practice.
        # The raw EPC band is tried first; if it holds no year (e.g. the band was
        # recovered from an older EPC) the cleaned band is used, rather than
        # failing with IndexError on an empty match list.
        for candidate in (self.prepared_epc["construction-age-band"], self.construction_age_band):
            if not isinstance(candidate, str):
                continue
            years = re.findall(r'\b\d{4}\b', candidate)
            if years:
                self.year_built = int(years[0])
                return

    # We don't know when the property was built
    self.year_built = None
def _clean_ventilation(self):
    """
    Clean the mechanical ventilation value of the prepared EPC.

    An empty string or a value listed in DATA_ANOMALY_MATCHES becomes None; any
    other value is kept as it is.
    """
    # Read the value from the prepared EPC itself. The `mechanical_ventilation`
    # attribute is only populated later by _expand_prepared_epc_to_attributes,
    # so reading it here would overwrite every EPC value with the default None.
    ventilation = self.prepared_epc.get('mechanical-ventilation')
    is_missing = ventilation == "" or ventilation in DATA_ANOMALY_MATCHES
    self.prepared_epc['mechanical-ventilation'] = None if is_missing else ventilation
def _field_validation(self):
"""

View file

@ -1,4 +1,4 @@
import pandas as pd
from pathlib import Path
from etl.epc.DataProcessor import EPCDataProcessor
from etl.epc.Pipeline import EPCPipeline
@ -11,18 +11,17 @@ def main():
"""
directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()]
directories = directories[0:2]
# directories = directories[125:128]
epc_pipeline = EPCPipeline(directories=directories, epc_data_processor=EPCDataProcessor(run_mode="training"))
epc_pipeline.run()
# For testing
epc_pipeline.compiled_dataset
epc_pipeline.compiled_all_equal_rows
import pandas as pd
pd.concat(epc_pipeline.compiled_cleaning_averages)
dataset_df = epc_pipeline.compiled_dataset
dataset_df.to_parquet("refactor_datasets/dataset.parquet")
pd.DataFrame(epc_pipeline.compiled_all_equal_rows).to_parquet("refactor_datasets/all_equal_rows.parquet")
pd.concat(epc_pipeline.compiled_cleaning_averages).to_parquet("refactor_datasets/cleaning_averages.parquet")
if __name__ == "__main__":

View file

@ -2,6 +2,59 @@
# TODO: migrate to dynaconf
from pathlib import Path
DATA_ANOMALY_MATCHES = {
# Invalid reports are where the value provided is out of bounds, e.g. a negative energy rating of -1199 or a
# non-integer, there is no valid energy band for this, so it is marked as INVALID!
"INVALID",
"INVALID!",
# When the energy certificate was first lodged on the register there was no requirement to lodge this data
# item, i.e. a non-mandatory item.
"NO DATA!",
"NODATA!",
# When the energy certificate was first lodged on the register there was no requirement to lodge this data item,
# i.e.a non - mandatory item.
"N/A",
# A value generated by the register to account for a data item that was not mandatory when the lodgement of
# the energy certificate occurred. When the data item became mandatory the register operator, for backwards
# compatibility purposes, populated the data field with a value of not recorded to ensure that the energy
# certificate retrieval process is successfully completed. Mandatory data items cannot be applied
# retrospectively to energy certificates lodged before the date of the change.
"Not recorded",
# The data also contains DECs with an operational rating of 9999 (a default DEC). The production of a
# default DEC value was allowed to enable building occupiers, with poor quality or no energy data,
# the opportunity to comply with the regulations. From April 2011 the ability to lodge a default DEC was no
# longer allowed.
"9999",
# The Building Emission Rate (BER) data field for non-domestic buildings may contain a blank value. The BER
# was only lodged on the register from 7 March 2010.
"Blank"
# There are currently just over 8,600 records where the local authority identifier is null. This is due to
# the Register Operator not being able to match the building address in the Markermap Ordinance Survey (GB)
# lookup tables or OS MasterMap Address Layer 2 data. The majority of these addresses have been requested
# manually by energy assessors for inclusion by the Register Operator in the registers (e.g. new builds,
# etc). These records are being published for completeness. An ongoing process to manage these manually added
# addresses will take time to develop to deal with these and future anomalies.
#
# There are several fields within the lodged data where it is possible to enter multiple entries to cater for
# different data_types of build within a single property, i.e. extensions. This results in multiple entries for
# the description fields for floor, roof and wall. For the purposes of this data release only the information
# contained within the first of these multiple entries is being provided. As there are no restrictions on the
# value in this first field it means that sometimes the first field in a multiple entry description field may
# contain a null value. A resolution to correct these anomalies will be considered for future data releases.
"NULL",
# We sometimes see fields populated with just an empty string.
""
}
# Marker text for superseded pick-list values.
#
# Some pick-list values have been replaced by newer ones (for example, a single "pitched roof" value
# that was later split into three sub-categories). The register keeps the original value so that older
# certificates can still be retrieved, and appends this text to it. Replacement values cannot be applied
# retrospectively to certificates lodged before the change.
DATA_ANOMALY_SUBSTRINGS = {
    "for backward compatibility only",
}
# File name of the metrics CSV.
METRIC_FILENAME = "metrics.csv"
# NOTE(review): presumably the name of the metric that model tuning/selection optimises - confirm against usage.
OPTIMISE_METRIC = "mean_absolute_error"