Merge pull request #270 from Hestia-Homes/etl-michael

Etl michael
This commit is contained in:
KhalimCK 2024-01-16 16:58:57 +00:00 committed by GitHub
commit 6e8c83c228
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 12347 additions and 1045 deletions

8
.gitignore vendored
View file

@ -241,6 +241,7 @@ fabric.properties
# Locally stored data
local_data/*
/local_data/*
etl/epc/local_data/*
*.DS_Store
infrastructure/terraform/.terraform*
@ -255,7 +256,7 @@ open_uprn/.idea/
conservation_areas/.idea/
model_data/.idea/
model_data/simulation_system/.idea/
model_data/simulation_system/
model_data/simulation_system/data*
model_data/simulation_system/model_directory/
model_data/simulation_system/predictions/
@ -264,4 +265,7 @@ model_data/simulation_system/predictions/
.idea/misc.iml
adhoc
adhoc/*
adhoc/*
etl-router-venv/
refactor_datasets/

View file

@ -5,8 +5,9 @@ import os
import numpy as np
import pandas as pd
from etl.epc.DataProcessor import DataProcessor
from etl.epc.settings import POTENTIAL_COLUMNS, EFFICIENCY_FEATURES, BUILT_FORM_REMAP
from etl.epc.DataProcessor import EPCDataProcessor
from etl.epc.Dataset import TrainingDataset
from etl.epc.settings import LATEST_FIELD, MANDATORY_FIXED_FEATURES, POTENTIAL_COLUMNS, EFFICIENCY_FEATURES, BUILT_FORM_REMAP
from etl.epc_clean.epc_attributes.all_cleaners import all_cleaner_map
from etl.solar.SolarPhotoSupply import SolarPhotoSupply
from utils.logger import setup_logger
@ -17,6 +18,7 @@ from recommendations.recommendation_utils import (
estimate_perimeter, get_wall_type, estimate_external_wall_area, esimtate_pitched_roof_area, estimate_windows
)
ENVIRONMENT = os.environ.get('ENVIRONMENT', 'dev')
DATA_BUCKET = os.environ.get('DATA_BUCKET', 'retrofit-data-dev' if ENVIRONMENT == 'dev' else None)
@ -48,45 +50,67 @@ class Property(Definitions):
spatial = None
def __init__(self, id, address, postcode, data=None, old_data=None, full_sap_epc=None):
def __init__(self, id, postcode, address, epc_record, data=None):
self.epc_record = epc_record
self.id = id
self.address = address
self.postcode = postcode
self.data = data
self.old_data = old_data
self.full_sap_epc = full_sap_epc
self.data = {k.replace("_", "-"): v for k,v in epc_record.get("prepared_epc").items()}
self.old_data = epc_record.get("old_data")
self.property_dimensions = None
self.uprn = None if data is None else int(data["uprn"])
self.uprn = epc_record.get("uprn")
self.full_sap_epc = epc_record.get("full_sap_epc")
self.in_conservation_area, self.is_listed, self.is_heritage = None, None, None
self.restricted_measures = False
self.year_built = None
self.number_of_rooms = None
self.age_band = None
self.construction_age_band = None
self.number_of_floors = None
self.year_built = epc_record.get("year_built")
self.number_of_rooms = epc_record.prepared_epc.get("number_of_rooms")
self.age_band = epc_record.get("age_band")
self.construction_age_band = epc_record.get("construction_age_band")
self.number_of_floors = epc_record.get("number_of_floors")
self.perimeter = None
self.wall_type = None
self.floor_type = None
self.energy = None
self.ventilation = None
self.solar_pv = None
self.solar_hot_water = None
self.wind_turbine = None
self.number_of_open_fireplaces = None
self.number_of_extensions = None
self.number_of_storeys = None
self.heat_loss_corridor = None
self.mains_gas = None
self.floor_height = None
self.energy = {
"primary_energy_consumption": epc_record.get("energy_consumption_current"),
"co2_emissions": epc_record.get("co2_emissions_current"),
}
self.ventilation = {
"ventilation": epc_record.get("mechanical_ventilation"),
}
self.solar_pv = {
"solar_pv": epc_record.get("photo_supply"),
}
self.solar_hot_water = {
"solar_hot_water": epc_record.get("solar_water_heating_flag"),
}
self.wind_turbine = {
"wind_turbine": epc_record.prepared_epc.get("wind_turbine_count"),
}
self.number_of_open_fireplaces = {
"number_of_open_fireplaces": epc_record.prepared_epc.get("number_of_open_fireplaces"),
}
self.number_of_extensions = {
"number_of_extensions": epc_record.prepared_epc.get("number_of_extensions"),
}
self.number_of_storeys = {
"number_of_storeys": epc_record.prepared_epc.get("number_of_storeys"),
}
self.heat_loss_corridor = {
"heat_loss_corridor": epc_record.prepared_epc.get("heat_loss_corridor"),
"length": epc_record.prepared_epc.get("unheated_corridor_length"),
}
self.mains_gas = epc_record.prepared_epc.get('mains_gas_flag')
self.floor_height = epc_record.prepared_epc.get('floor_height')
self.insulation_wall_area = None
self.floor_area = None
self.floor_area = epc_record.prepared_epc.get('total_floor_area')
self.pitched_roof_area = None
self.insulation_floor_area = None
self.number_lighting_outlets = None
self.number_lighting_outlets = epc_record.prepared_epc.get("fixed_lighting_outlets_count")
self.floor_level = None
self.number_of_windows = None
self.solar_pv_roof_area = None
@ -95,98 +119,66 @@ class Property(Definitions):
self.current_adjusted_energy = None
self.expected_adjusted_energy = None
def set_energy(self):
self.recommendations_scoring_data = []
def create_base_difference_epc_record(self, cleaned_lookup: dict):
"""
Extracts and formats data about the home's energy and co2 consumption
To being with, this is just formatting epc data
Data:
- primary_energy_consumption
This is based on the "energy-consumption-current" field in the EPC data.
Current estimated total energy consumption for the property in a 12 month period (kWh/m2). Displayed on EPC
as the current primary energy use per square metre of floor area.
- co2_emissions
This is based on the "co2-emissions-current" field in the EPC data.
CO₂ emissions per year in tonnes/year.
Creates a EPCDifferenceRecord object, which is used to store the difference between the current and
expected EPC
It will be the same starting and ending EPC, as we don't have the expected EPC yet
"""
self.energy = {
"primary_energy_consumption": float(self.data["energy-consumption-current"]),
"co2_emissions": float(self.data["co2-emissions-current"]),
}
difference_record = self.epc_record - self.epc_record
def set_ventilation(self):
# TODO: change these lower and replace in the settings file
fixed_data_col_names = MANDATORY_FIXED_FEATURES + LATEST_FIELD
print("NEED TO CHANGE THE DASH TO LOWER CASE")
fixed_data_col_names = [x.lower().replace("_", "-") for x in fixed_data_col_names]
fixed_data = {k.replace("-", "_"):v for k,v in self.data.items() if k in fixed_data_col_names}
difference_record.append_fixed_data(fixed_data)
self.base_difference_record = TrainingDataset(datasets=[difference_record], cleaned_lookup=cleaned_lookup)
# TODO: adjust the base difference record with the previously calculated u values + features
# estimated_perimeter is different to the perimeter in the epc record
# self.base_difference_record.df
def adjust_difference_record_with_recommendations(self, property_recommendations):
"""
Extracts and formats data about the home's ventilation
To being with, this is just formatting epc data
Data:
- ventilation
This is based on the "ventilation-type" field in the EPC data.
Ventilation type of the property.
"""
ventilation = self.data["mechanical-ventilation"]
# perform some simple cleaning - when checking 300k epc, the only unique values were
# {'', 'mechanical, supply and extract', 'NO DATA!', 'natural', 'mechanical, extract only'}
if ventilation in self.DATA_ANOMALY_MATCHES or ventilation in [""]:
ventilation = None
self.ventilation = {
"ventilation": ventilation,
}
def set_solar_pv(self):
"""
Extracts and formats data about the home's solar pv
To being with, this is just formatting epc data
Data:
- solar_pv
This is based on the "photo-supply" field in the EPC data.
When checking 100k epc, either the value was "" or a stringified number
"""
solar_pv = self.data["photo-supply"]
if solar_pv in ["", None]:
solar_pv = None
else:
solar_pv = float(solar_pv)
self.solar_pv = {
"solar_pv": solar_pv,
}
def set_solar_hot_water(self):
"""
Extracts and formats data about the home's solar hot water
We are just formatting the solar-water-heating-flag in the epc data
This method will adjust the difference record, based on the recommendations made for the property
:param recommendations: dictionary of recommendations for the property
:return:
"""
value_map = {
"Y": True,
"N": False,
"": None,
None: None,
}
for recommendations_by_type in property_recommendations:
for i, rec in enumerate(recommendations_by_type):
scoring_dict = self.create_recommendation_scoring_data(
recommendation=rec,
)
scoring_dict['id'] = "+".join([str(self.id), str(rec["recommendation_id"])])
self.solar_hot_water = {
"solar_hot_water": value_map[self.data["solar-water-heating-flag"]],
}
self.recommendations_scoring_data.append(scoring_dict)
def create_recommendation_scoring_data(self, recommendation: dict):
def set_wind_turbine(self):
"""
Extracts and formats data about the home's wind turbine
We are just formatting the wind-turbine-flag in the epc data
:return:
"""
recommendation_record = self.base_difference_record.df.to_dict("records")[0].copy()
wind_turbine_count = self.data["wind-turbine-count"]
if wind_turbine_count == "":
wind_turbine_count = None
for col in [
"walls_insulation_thickness", "floor_insulation_thickness", "roof_insulation_thickness"
]:
if recommendation_record[col] is None:
recommendation_record[col] = "none"
# We update the description to indicate it's insulated
if recommendation["type"] in ["internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation"]:
# The upgrade made here is to the u-value of the walls and the description of the
# insulation thickness
recommendation_record["walls_thermal_transmittance_ending"] = recommendation["new_u_value"]
recommendation_record["walls_insulation_thickness_ending"] = "above average"
recommendation_record["walls_energy_eff_ending"] = "Good"
else:
wind_turbine_count = int(wind_turbine_count)
@ -242,28 +234,9 @@ class Property(Definitions):
if not self.data:
raise ValueError("Property does not contain data")
# We need to implement an EPC cleaning process, which we run on the EPC data, immediately after we download
# it
self.data["built-form"] = BUILT_FORM_REMAP.get(self.data["built-form"], self.data["built-form"])
if self.data["built-form"] in self.DATA_ANOMALY_MATCHES:
if self.data["property-type"] in ["Flat", "Maisonette"]:
self.data["built-form"] = "End-Terrace"
self.set_year_built()
self.set_energy()
self.set_ventilation()
self.set_solar_pv()
self.set_solar_hot_water()
self.set_wind_turbine()
self.set_count_variables()
self.set_heat_loss_corridor()
self.set_mains_gas()
self.set_age_band()
self.set_basic_property_dimensions()
for description, attribute in cleaned.items():
if self.data[description] in self.DATA_ANOMALY_MATCHES:
template = cleaned[description][0]
fill_dict = dict(zip(template.keys(), [None] * len(template)))
@ -306,40 +279,6 @@ class Property(Definitions):
photo_supply_lookup=photo_supply_lookup, floor_area_decile_thresholds=floor_area_decile_thresholds
)
def set_age_band(self):
"""
Sets a cleaned version of the age band of the property given the EPC data
:return:
"""
if not self.data:
raise ValueError("Property does not contain data")
self.construction_age_band = DataProcessor.clean_construction_age_band(self.data["construction-age-band"])
if self.construction_age_band in self.DATA_ANOMALY_MATCHES:
if self.old_data:
# Take the most recent
max_datetime = max(
[x["lodgement-datetime"] for x in self.old_data if
x["construction-age-band"] not in self.DATA_ANOMALY_MATCHES]
)
most_recent = [x for x in self.old_data if x["lodgement-datetime"] == max_datetime]
self.construction_age_band = DataProcessor.clean_construction_age_band(
most_recent[0]["construction-age-band"]
)
self.age_band = england_wales_age_band_lookup.get(self.construction_age_band)
if (self.data["transaction-type"] == "new dwelling") and (self.age_band is None):
self.age_band = "L"
self.construction_age_band = 'England and Wales: 2012 onwards'
if self.age_band is None:
logger.info("Age band is missing - filling with national average")
self.age_band = "C"
self.construction_age_band = "England and Wales: 1930-1949"
def set_spatial(self, spatial: pd.DataFrame):
"""
Sets whether the property is in a conservation area given the output of the ConservationAreaClient
@ -367,72 +306,6 @@ class Property(Definitions):
"is_heritage_building": spatial_dict["is_heritage_building"],
}
def set_year_built(self):
"""
Estimates when the property was built based on as much available data as possible.
"""
if self.full_sap_epc:
self.year_built = datetime.strptime(self.full_sap_epc["lodgement-date"], '%Y-%m-%d').year
return
if self.data["construction-age-band"] not in self.DATA_ANOMALY_MATCHES:
# Take the lower limit. If we're pessimistic about the age of the property, that at least means we have
# more options for recommendations if that age falls before the year that insulation in walls became
# common practice
band = [int(x) for x in re.findall(r'\b\d{4}\b', self.data["construction-age-band"])]
self.year_built = band[0]
return
# We don't know when the property was built
self.year_built = None
def set_heat_loss_corridor(self):
"""
cleans the heat-loss-corridor
:return:
"""
map = {
"no corridor": False,
"unheated corridor": True,
"heated corridor": False,
None: False
}
if self.data["heat-loss-corridor"] in self.DATA_ANOMALY_MATCHES:
has_heat_loss_corridor = False
else:
has_heat_loss_corridor = map[self.data["heat-loss-corridor"]]
length = self.data["unheated-corridor-length"]
if length in ["", None]:
length = None
else:
length = float(length)
self.heat_loss_corridor = {
"heat_loss_corridor": has_heat_loss_corridor,
"length": length
}
def set_mains_gas(self):
"""
Sets whether the property has mains gas
:return:
"""
map = {
"Y": True,
"N": False,
}
if self.data["mains-gas-flag"] == "" or self.data["mains-gas-flag"] in self.DATA_ANOMALY_MATCHES:
self.mains_gas = None
else:
self.mains_gas = map[self.data["mains-gas-flag"]]
def _clean_upload_data(self, to_update):
for k, v in to_update.items():
if v in self.DATA_ANOMALY_MATCHES:
@ -580,36 +453,6 @@ class Property(Definitions):
:return:
"""
self.floor_area = float(self.data["total-floor-area"])
if not self.data["number-habitable-rooms"] or (
self.data["floor-height"] in ["", None] or self.data["floor-height"] in self.DATA_ANOMALY_MATCHES
):
if self.property_dimensions is None:
property_dimensions = read_dataframe_from_s3_parquet(
bucket_name=DATA_BUCKET, file_key=f"property_dimensions/{self.data['local-authority']}.parquet"
)
self.property_dimensions = self._filter_property_dimensions(property_dimensions)
if not self.data["number-habitable-rooms"]:
self.number_of_rooms = float(self.property_dimensions["NUMBER_HABITABLE_ROOMS"].round())
else:
self.number_of_rooms = float(self.data["number-habitable-rooms"])
if self.data["property-type"] == "House":
self.number_of_floors = 2
elif self.data["property-type"] in ["Flat", "Bungalow"]:
self.number_of_floors = 1
elif self.data["property-type"] == "Maisonette":
self.number_of_floors = 2
else:
raise NotImplementedError("Implement me")
if self.data["floor-height"] in [None, ""] or self.data["floor-height"] in self.DATA_ANOMALY_MATCHES:
self.floor_height = float(self.property_dimensions["FLOOR_HEIGHT"].round(2))
else:
self.floor_height = float(self.data["floor-height"])
self.perimeter = estimate_perimeter(
self.floor_area / self.number_of_floors, self.number_of_rooms / self.number_of_floors
)
@ -630,7 +473,8 @@ class Property(Definitions):
def set_floor_level(self):
self.floor_level = (
FLOOR_LEVEL_MAP[self.data["floor-level"]] if
self.data["floor-level"] not in list(self.DATA_ANOMALY_MATCHES) + [None] else None
self.data["floor-level"] not in self.DATA_ANOMALY_MATCHES and self.data['floor-level'] is not None
else None
)
if self.floor_level is None:
@ -775,7 +619,7 @@ class Property(Definitions):
**hotwater,
**windows,
"SECONDHEAT_DESCRIPTION": second_heating,
"DAYS_TO": DataProcessor.calculate_days_to(self.data["lodgement-date"]),
"DAYS_TO": EPCDataProcessor.calculate_days_to(self.data["lodgement-date"]),
"SAP": float(self.data["current-energy-efficiency"]),
"CARBON": float(self.data["co2-emissions-current"]),
"HEAT_DEMAND": float(self.data["energy-consumption-current"]),

View file

@ -2,6 +2,8 @@ from datetime import datetime
import numpy as np
import pandas as pd
from epc_api.client import EpcClient
from etl.epc.Record import EPCRecord
from backend.SearchEpc import SearchEpc
from fastapi import APIRouter, Depends
from sqlalchemy.exc import IntegrityError, OperationalError
@ -23,13 +25,14 @@ from backend.app.db.models.portfolio import rating_lookup
from backend.app.dependencies import validate_token
from backend.app.plan.schemas import PlanTriggerRequest
from backend.app.plan.utils import create_recommendation_scoring_data, get_cleaned
from backend.app.utils import epc_to_sap_lower_bound, read_csv_from_s3, sap_to_epc
from backend.app.utils import epc_to_sap_lower_bound, read_csv_from_s3, read_parquet_from_s3, sap_to_epc
from backend.ml_models.api import ModelApi
from backend.Property import Property
from etl.epc.DataProcessor import DataProcessor
from etl.epc.DataProcessor import EPCDataProcessor
from etl.epc.settings import COLUMNS_TO_MERGE_ON
from etl.solar.SolarPhotoSupply import SolarPhotoSupply
from recommendations.optimiser.CostOptimiser import CostOptimiser
from recommendations.optimiser.GainOptimiser import GainOptimiser
from recommendations.optimiser.optimiser_functions import prepare_input_measures
@ -50,6 +53,7 @@ router = APIRouter(
responses={404: {"description": "Not found"}}
)
# TODO: Need to install base.txt requirements into new env
@router.post("/trigger")
async def trigger_plan(body: PlanTriggerRequest):
@ -60,9 +64,11 @@ async def trigger_plan(body: PlanTriggerRequest):
try:
session.begin()
logger.info("Getting the inputs")
epc_client = EpcClient(auth_token=get_settings().EPC_AUTH_TOKEN)
plan_input = read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path)
input_properties = []
for config in plan_input:
# We validate each record in the file. If the record is NOT valid, we need to handle this accordingly
@ -89,24 +95,32 @@ async def trigger_plan(body: PlanTriggerRequest):
heat_demand_target=None
)
epc_records ={
'original_epc': epc_searcher.newest_epc,
'full_sap_epc': epc_searcher.full_sap_epc,
'old_data': epc_searcher.old_data,
}
prepared_epc = EPCRecord(epc_records=epc_records, run_mode="newdata", cleaning_data=cleaning_data) # This uses all the epc records to clean the data
input_properties.append(
Property(
id=property_id,
address=epc_searcher.address_clean,
postcode=epc_searcher.postcode_clean,
data=epc_searcher.newest_epc,
old_data=epc_searcher.older_epcs,
full_sap_epc=epc_searcher.full_sap_epc,
address1=config['address'],
postcode=config['postcode'],
epc_record=prepared_epc,
)
)
if not input_properties:
if not input_properties:
return Response(status_code=204)
# The materials data could be cached or local so we don't need to make
# consistent requests to the backend for
# the same data
logger.info("Reading in data sources required for the engine")
logger.info("Reading in materials and cleaned datasets")
materials = get_materials(session)
cleaned = get_cleaned()
@ -133,14 +147,6 @@ async def trigger_plan(body: PlanTriggerRequest):
# Property recommendations
p.get_components(cleaned, photo_supply_lookup, floor_area_decile_thresholds)
# This is temp - this should happen after scoring
cleaned_property_data = DataProcessor.apply_averages_cleaning(
data_to_clean=pd.DataFrame([dict(**p.get_model_data(), LOCAL_AUTHORITY=p.data["local-authority"])]),
cleaning_data=cleaning_data,
cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'],
)
p.set_number_lighting_outlets(cleaned_property_data)
recommender = Recommendations(property_instance=p, materials=materials)
property_recommendations = recommender.recommend()
@ -149,64 +155,17 @@ async def trigger_plan(body: PlanTriggerRequest):
recommendations[p.id] = property_recommendations
# Finally, we'll prepare data for predicting the impact on SAP
data_processor = DataProcessor(None, newdata=True)
data_processor.insert_data(pd.DataFrame([p.get_model_data()]))
p.create_base_difference_epc_record(cleaned_lookup=cleaned)
p.adjust_difference_record_with_recommendations(property_recommendations)
data_processor.pre_process()
recommendations_scoring_data.extend(p.recommendations_scoring_data)
starting_epc_data = data_processor.get_component_features(suffix="_STARTING")
ending_epc_data = data_processor.get_component_features(suffix="_ENDING")
fixed_data = data_processor.get_fixed_features()
# We update the ending record with the recommended updates and we set lodgement date to today
ending_epc_data["DAYS_TO_ENDING"] = data_processor.calculate_days_to(created_at)
property_scoring_data[p.id] = {
"starting_epc_data": starting_epc_data,
"ending_epc_data": ending_epc_data,
"fixed_data": fixed_data
}
for recommendations_by_type in property_recommendations:
for i, rec in enumerate(recommendations_by_type):
scoring_dict = create_recommendation_scoring_data(
property=p,
recommendation=rec,
starting_epc_data=starting_epc_data,
ending_epc_data=ending_epc_data,
fixed_data=fixed_data,
)
recommendations_scoring_data.append(scoring_dict)
# cleanup
del data_processor
logger.info("Preparing data for scoring in sap change api")
recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data)
# Perform the same cleaning as in the model - first clean number of room variables though
recommendations_scoring_data = DataProcessor.apply_averages_cleaning(
data_to_clean=recommendations_scoring_data,
cleaning_data=cleaning_data,
cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'],
colnames=["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"],
)
recommendations_scoring_data = DataProcessor.apply_averages_cleaning(
data_to_clean=recommendations_scoring_data,
cleaning_data=cleaning_data,
cols_to_merge_on=COLUMNS_TO_MERGE_ON + ["LOCAL_AUTHORITY"],
).drop(columns=["LOCAL_AUTHORITY"])
recommendations_scoring_data = DataProcessor.clean_missings_after_description_process(
recommendations_scoring_data,
ignore_cols=[c for c in recommendations_scoring_data.columns if ("thermal_transmittance" in c) or (
"insulation_thickness" in c) or ("ENERGY_EFF" in c)]
)
recommendations_scoring_data = DataProcessor.clean_efficiency_variables(recommendations_scoring_data)
recommendations_scoring_data = recommendations_scoring_data.drop(
columns=["rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending", "carbon_ending"]
)
model_api = ModelApi(portfolio_id=body.portfolio_id, timestamp=created_at)
all_predictions = model_api.predict_all(
@ -513,6 +472,11 @@ async def trigger_plan(body: PlanTriggerRequest):
update_or_create_property_spatial_details(session, p.uprn, p.spatial)
# TODO: TEMP
if p.data["uprn"] == "":
print("Get rid of me!")
p.data["uprn"] = 0
property_data = p.get_full_property_data()
update_property_data(
session, property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data
@ -542,7 +506,9 @@ async def trigger_plan(body: PlanTriggerRequest):
valuations = PropertyValuation.estimate(property_instance=p, target_epc=new_epc)
property_valuation_increases.append(valuations["average_increase"])
property_valuation_increases.append(
valuations["average_increased_value"] - valuations["current_value"]
)
# Commit the session after each batch
session.commit()
@ -560,7 +526,7 @@ async def trigger_plan(body: PlanTriggerRequest):
# recommendation from being default to not default, we'll need to re-run this process to re-calculate the
# portfolio-level impact
total_valuation_increase = sum([v for v in property_valuation_increases if v is not None])
total_valuation_increase = sum(property_valuation_increases)
labour_days = round(max(
[sum(r["labour_days"] for r in rec_group if r["default"]) for p_id, rec_group in recommendations.items()]
))

View file

@ -5,6 +5,9 @@ from BaseUtility import Definitions
from etl.epc.settings import (
DATA_PROCESSOR_SETTINGS,
EARLIEST_EPC_DATE,
IGNORED_TRANSACTION_TYPES,
IGNORED_FLOOR_LEVELS,
IGNORED_PROPERTY_TYPES,
FULLY_GLAZED_DESCRIPTIONS,
AVERAGE_FIXED_FEATURES,
BUILT_FORM_REMAP,
@ -24,8 +27,15 @@ from recommendations.rdsap_tables import FLOOR_LEVEL_MAP
from typing import List
# TODO: change the setting columns to lower
STARTING_SUFFIX_COMPONENT_COLS = [x.lower() for x in STARTING_SUFFIX_COMPONENT_COLS]
NO_SUFFIX_COMPONENT_COLS = [x.lower() for x in NO_SUFFIX_COMPONENT_COLS]
ENDING_SUFFIX_COMPONENT_COLS = [x.lower() for x in ENDING_SUFFIX_COMPONENT_COLS]
POTENTIAL_COLUMNS = [x.lower() for x in POTENTIAL_COLUMNS]
# These lookups are used to clean the construction age band
bounds_map = {
construction_age_bounds_map = {
"England and Wales: before 1900": {"l": 0, "u": 1899},
"England and Wales: 1930-1949": {"l": 1930, "u": 1949},
"England and Wales: 1900-1929": {"l": 1900, "u": 1929},
@ -40,13 +50,13 @@ bounds_map = {
"England and Wales: 2012 onwards": {"l": 2012, "u": 3000},
}
remap = {
construction_age_remap = {
"England and Wales: 2007 onwards": "England and Wales: 2007-2011"
}
expanded_map = {
i: [
label for label, bounds in bounds_map.items() if (i <= bounds["u"]) and (i >= bounds['l'])
label for label, bounds in construction_age_bounds_map.items() if (i <= bounds["u"]) and (i >= bounds['l'])
][0] for i in range(0, 3001)
}
@ -59,26 +69,202 @@ def is_int(x):
return False
class DataProcessor:
class EPCDataProcessor:
"""
Handle data loading and data preprocessing
"""
def __init__(self, filepath: Path | None, newdata: bool = False) -> None:
def __init__(self, data: pd.DataFrame | None = None, cleaning_averages: pd.DataFrame | None = None, run_mode: str = "training", violation_mode: bool = False) -> None:
"""
:param filepath: If specified, is the physical location of the data
:param newdata: Indicates if we are processing new, testing data.
:param is_newdata: Indicates if we are processing new, testing data.
In this instance, there are some operations we do not
want to perform, such as confine_data()
"""
self.filepath = filepath
self.data = None
self.newdata = newdata
is_data_a_dataframe = isinstance(data, pd.DataFrame)
self.data : pd.DataFrame = data if is_data_a_dataframe else pd.DataFrame()
def load_data(self, low_memory=False) -> None:
if not self.filepath:
is_cleaning_averages_a_dataframe = isinstance(cleaning_averages, pd.DataFrame)
self.cleaning_averages : pd.DataFrame = cleaning_averages if is_cleaning_averages_a_dataframe else pd.DataFrame()
# FOR NOW IF VIOLATION MODE IS ON, WE USE RUN MODE AS NEWDATA
self.violation_mode = violation_mode
if run_mode not in ["training", "newdata"]:
raise ValueError("Run mode must be either training or newdata")
self.run_mode = run_mode if not violation_mode else "newdata"
def prepare_data(self, filepath: Path | str | None = None) -> None:
    """
    Run the cleaning pipeline over the held data, for the configured run mode

    Steps that accept ignore_step are passed True when the run mode is "newdata", which
    marks them as not needed for new data

    :param filepath: Optional location of a csv to load into self.data before processing.
        When omitted, the data already held on the instance is processed
    :raises ValueError: If there is no data to process once the optional load has run
    """
    ignore_step = self.run_mode == "newdata"

    if filepath is not None:
        self.load_data(filepath=filepath, low_memory=DATA_PROCESSOR_SETTINGS["low_memory"])

    if len(self.data) == 0:
        raise ValueError("No data to process - check filepath/ data being passed in")

    self.confine_data(ignore_step=ignore_step)
    self.remap_anomalies()
    self.remap_floor_level(ignore_step=ignore_step)
    self.remap_build_form()
    self.cast_data_column_values_to_lower()
    self.standardise_construction_age_band(ignore_step=ignore_step)
    self.clean_missing_rooms(ignore_step=ignore_step)
    self.recast_df_columns(
        column_mappings=DATA_PROCESSOR_SETTINGS["column_mappings"]
    )
    self.clean_multi_glaze_proportion(ignore_step=ignore_step)
    self.clean_photo_supply()
    self.retain_multiple_epc_properties(
        epc_minimum_count=DATA_PROCESSOR_SETTINGS["epc_minimum_count"], ignore_step=ignore_step
    )
    self.fill_na_fields()
    self.sort_data_by_uprn_lodgement_date(ignore_step=ignore_step)

    # Final re-casting after data transformed and prepared
    self.recast_df_columns(column_mappings=COLUMNTYPES, auto_subset_columns=True)
    self.recast_all_data(column_mappings=COLUMNTYPES, auto_subset_columns=True)
    self.na_remapping(auto_subset_columns=True)
    self.fill_invalid_constituency_fields(ignore_step=ignore_step)

    self.make_cleaning_averages(ignore_step=ignore_step)

    # TODO: check if this has impact on training dataset
    # First pass: clean the room counts only. The result is stored straight away so that the
    # second pass builds on it - previously both passes read self.data, which meant the output
    # of this pass was overwritten and never used
    rooms_cleaned = self.apply_averages_cleaning(
        data_to_clean=self.data,
        cleaning_data=self.cleaning_averages,
        cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'],
        colnames=["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"],
    )
    if rooms_cleaned is not None:
        self.data = rooms_cleaned

    # Second pass: clean the remaining columns. A None result leaves the data as it is
    fully_cleaned = self.apply_averages_cleaning(
        data_to_clean=self.data,
        cleaning_data=self.cleaning_averages,
        cols_to_merge_on=COLUMNS_TO_MERGE_ON,
    )
    if fully_cleaned is not None:
        self.data = fully_cleaned

    self.add_local_authority_to_cleaning_average(ignore_step=ignore_step)
    self.cast_cleaning_averages_columns_to_lower(ignore_step=ignore_step)
    self.cast_data_columns_to_lower()
def cast_data_columns_to_lower(self):
    """
    Lower-case every column name of the data, in place
    """
    lowered_names = self.data.columns.str.lower()
    self.data.columns = lowered_names
def cast_cleaning_averages_columns_to_lower(self, ignore_step: bool = False):
    """
    Lower-case every column name of the cleaning averages, in place
    Nothing is done when ignore_step is set (not needed in newdata mode)
    """
    if not ignore_step:
        averages = self.cleaning_averages
        averages.columns = averages.columns.str.lower()
def add_local_authority_to_cleaning_average(self, ignore_step: bool = False):
    """
    Stamp the cleaning averages with the local authority found on the first row of the data
    Nothing is done when ignore_step is set (not needed in newdata mode)
    """
    if not ignore_step:
        # Only the first row is read - presumably the data holds a single local authority
        first_local_authority = self.data["LOCAL_AUTHORITY"].values[0]
        self.cleaning_averages["LOCAL_AUTHORITY"] = first_local_authority
def fill_invalid_constituency_fields(self, ignore_step: bool = False):
    """
    For some weird cases, where data has missing constituency, we fill the gaps with the most
    common constituency in the data

    Nothing is done in violation mode, when ignore_step is set, or when no constituency is
    populated at all (there is then no value to fill with)
    """
    # TODO: violation mode still needs its own handling
    if self.violation_mode or ignore_step:
        return

    # mode() ignores missing values, so it is empty when the whole column is missing.
    # Indexing the empty result would raise an IndexError
    most_common = self.data["CONSTITUENCY"].mode()
    if most_common.empty:
        return

    self.data = self.data.fillna({"CONSTITUENCY": most_common.values[0]})
def sort_data_by_uprn_lodgement_date(self, ignore_step: bool = False):
    """
    Order the data by uprn, then by lodgement date, both ascending
    Nothing is done when ignore_step is set
    No Violation mode needed
    """
    if not ignore_step:
        sort_keys = ["UPRN", "LODGEMENT_DATE"]
        self.data = self.data.sort_values(by=sort_keys, ascending=True)
def cast_data_column_values_to_lower(self):
    """
    Lower-case the string values held in a fixed list of columns
    No Violation mode or newdata modes required
    """
    columns_to_lower = ["TRANSACTION_TYPE"]

    for column_name in columns_to_lower:
        lowered_values = self.data[column_name].str.lower()
        self.data[column_name] = lowered_values
def remap_build_form(self):
    """
    Replace built form values using the BUILT_FORM_REMAP lookup
    No Violation mode or newdata modes required
    """
    remapped_built_form = self.data["BUILT_FORM"].replace(BUILT_FORM_REMAP)
    self.data["BUILT_FORM"] = remapped_built_form
def remap_anomalies(self):
    """
    Remap anomaly values, and NaN values, to None
    No Violation mode or newdata modes required
    """
    # Every value listed in DATA_ANOMALY_MATCHES is mapped to None
    anomaly_to_none = dict.fromkeys(Definitions.DATA_ANOMALY_MATCHES)

    # replace() swaps any cell matching a key for its mapped value - i.e. removes invalid values
    without_anomalies = self.data.replace(anomaly_to_none)

    # np.nan is used rather than the np.NAN alias, which was removed in NumPy 2.0
    self.data = without_anomalies.replace(np.nan, None)
def remap_floor_level(self, ignore_step: bool = False):
    """
    Replace floor level values using the FLOOR_LEVEL_MAP lookup
    Nothing is done in violation mode or when ignore_step is set
    """
    # TODO: We need to handle the violation mode case
    if self.violation_mode or ignore_step:
        return

    remapped_floor_level = self.data["FLOOR_LEVEL"].replace(FLOOR_LEVEL_MAP)
    self.data["FLOOR_LEVEL"] = remapped_floor_level
def load_data(self, filepath, low_memory=False) -> None:
if not filepath:
raise ValueError("No filepath specified")
self.data = pd.read_csv(self.filepath, low_memory=low_memory)
self.data = pd.read_csv(filepath, low_memory=low_memory)
def insert_data(self, data: pd.DataFrame) -> None:
self.data = data
@ -90,11 +276,11 @@ class DataProcessor:
return x
# Next, we check if it's a value in our map
if bounds_map.get(x):
if construction_age_bounds_map.get(x):
return x
# We check if it's a standard remap value
remap_value = remap.get(x, None)
remap_value = construction_age_remap.get(x, None)
if remap_value:
return remap_value
@ -105,12 +291,19 @@ class DataProcessor:
raise NotImplementedError("Not handled the case for value %s" % x)
def standardise_construction_age_band(self):
def standardise_construction_age_band(self, ignore_step: bool = False):
"""
This function will tidy up some of the non-standard values that are populated in the construction age
band, which is useful for cleaning
"""
if self.violation_mode:
# TODO: to fill in
return
if ignore_step:
return
self.data["CONSTRUCTION_AGE_BAND"] = self.data["CONSTRUCTION_AGE_BAND"].apply(
lambda x: self.clean_construction_age_band(x)
)
@ -119,7 +312,7 @@ class DataProcessor:
~pd.isnull(self.data["CONSTRUCTION_AGE_BAND"])
]
def clean_missing_rooms(self):
def clean_missing_rooms(self, ignore_step: bool = False):
"""
For the number of heated rooms and number of habitable rooms, we clean these values up front,
based on property archetype and age
@ -127,6 +320,14 @@ class DataProcessor:
TODO: We could use a model based impution approach for possibly more accurate cleaning
"""
if self.violation_mode:
# TODO: to fill in
return
if ignore_step:
return
# TODO: DO we want to move this out of this function? (i.e. alter the data before we do any cleaning)
self.data["POSTAL_AREA"] = self.data["POSTCODE"].apply(lambda x: x.split(" ")[0])
def apply_clean(data, matching_columns):
@ -164,59 +365,78 @@ class DataProcessor:
break
to_index -= 1
# NOTE(review): every step of this method appears twice: once as live code and
# once, directly underneath, as a commented-out variant. The commented variant
# takes a ``filepath`` argument, reads ``self.is_newdata`` instead of
# ``self.newdata`` and is annotated as returning a tuple of two DataFrames.
# Presumably only one of the two is meant to remain - TODO confirm.
def pre_process(self) -> pd.DataFrame:
"""
Load data (only when ``self.data`` is None) and run the initial cleaning steps.

The live code runs, in order: confine_data, remap_columns,
standardise_construction_age_band, clean_missing_rooms, recast_df_columns,
clean_multi_glaze_proportion, clean_photo_supply,
retain_multiple_epc_properties, fill_na_fields, a sort by UPRN and
LODGEMENT_DATE, a final dtype cast and na_remapping. Several of these are
skipped when ``self.newdata`` is truthy.

:return: ``self.data`` after processing.
"""
if self.data is None:
# NOTE(review): no filepath is passed here, while the
# ``load_data(self, filepath, low_memory=False)`` visible in this file takes
# ``filepath`` as a required argument - verify this call.
self.load_data(low_memory=DATA_PROCESSOR_SETTINGS["low_memory"])
# def pre_process(self, filepath: Path | None = None) -> tuple[pd.DataFrame, pd.DataFrame]:
# """
# Load data and begin initial cleaning
# """
# if self.data is None:
# self.load_data(filepath=filepath, low_memory=DATA_PROCESSOR_SETTINGS["low_memory"])
if not self.newdata:
self.confine_data()
# if not self.is_newdata:
# self.confine_data()
self.remap_columns()
# self.remap_columns()
# We have some non-standard construction age bands which we'll clean for matching
if not self.newdata:
self.standardise_construction_age_band()
self.clean_missing_rooms()
# # We have some non-standard construction age bands which we'll clean for matching
# if not self.is_newdata:
# self.standardise_construction_age_band()
# self.clean_missing_rooms()
self.recast_df_columns(
column_mappings=DATA_PROCESSOR_SETTINGS["column_mappings"]
)
# self.recast_df_columns(
# column_mappings=DATA_PROCESSOR_SETTINGS["column_mappings"]
# )
if not self.newdata:
self.clean_multi_glaze_proportion()
# if not self.is_newdata:
# self.clean_multi_glaze_proportion()
self.clean_photo_supply()
# self.clean_photo_supply()
if not self.newdata:
self.retain_multiple_epc_properties(
epc_minimum_count=DATA_PROCESSOR_SETTINGS["epc_minimum_count"]
)
# if not self.is_newdata:
# self.retain_multiple_epc_properties(
# epc_minimum_count=DATA_PROCESSOR_SETTINGS["epc_minimum_count"]
# )
if DATA_PROCESSOR_SETTINGS["epc_minimum_count"] >= 1:
# If we have multiple EPC records, we can try and do filling
self.fill_na_fields()
# if DATA_PROCESSOR_SETTINGS["epc_minimum_count"] >= 1:
# # If we have multiple EPC records, we can try and do filling
# self.fill_na_fields()
if not self.newdata:
self.data = self.data.sort_values(["UPRN", "LODGEMENT_DATE"], ascending=True)
# if not self.is_newdata:
# self.data = self.data.sort_values(["UPRN", "LODGEMENT_DATE"], ascending=True)
# Final re-casting after data transformed and prepared
# In newdata mode only the COLUMNTYPES entries whose column is present are applied
coltypes = {k: v for k, v in COLUMNTYPES.items() if k in self.data.columns} if self.newdata else COLUMNTYPES
# NOTE(review): the columns are cast one at a time here and then again in a
# single astype call on the line after the loop - presumably only one is needed.
for k, v in coltypes.items():
self.data[k] = self.data[k].astype(v)
self.data = self.data.astype(coltypes)
# # Final re-casting after data transformed and prepared
# coltypes = {k: v for k, v in COLUMNTYPES.items() if k in self.data.columns} if self.is_newdata else COLUMNTYPES
# for k, v in coltypes.items():
# self.data[k] = self.data[k].astype(v)
# self.data = self.data.astype(coltypes)
self.na_remapping()
# self.na_remapping()
return self.data
# self.cleaning_averages = None
# if not self.is_newdata:
# # We have some odd cases with missing constituency so we fill
# self.data = self.data.fillna({"CONSTITUENCY": self.data["CONSTITUENCY"].mode().values[0]})
def na_remapping(self):
# self.cleaning_averages = self.make_cleaning_averages()
# # We apply averages cleaning to the data
# self.data = self.apply_averages_cleaning(
# data_to_clean=self.data,
# cleaning_data=self.cleaning_averages,
# cols_to_merge_on=COLUMNS_TO_MERGE_ON
# )
# self.cleaning_averages["LOCAL_AUTHORITY"] = self.data["LOCAL_AUTHORITY"].values[0]
# self.cleaning_averages.columns = self.cleaning_averages.columns.str.lower()
# self.data.columns = self.data.columns.str.lower()
# return self.data, self.cleaning_averages
def na_remapping(self, auto_subset_columns: bool = False):
fill_na_map_apply = {
k: v for k, v in fill_na_map.items() if k in self.data.columns
} if self.newdata else fill_na_map
} if auto_subset_columns else fill_na_map
for column, fill_value in fill_na_map_apply.items():
self.data[column] = self.data[column].fillna(fill_value)
@ -243,35 +463,15 @@ class DataProcessor:
["FLOOR_HEIGHT", "TOTAL_FLOOR_AREA"]
].replace("", None)
def remap_columns(self):
def make_cleaning_averages(self, ignore_step: bool = False) -> pd.DataFrame:
"""
Remap all columns, for any non values
Create a dataset to hold averages based on property type, built form, construction age, and rooms.
Not require in newdata mode
"""
# Map all anomaly values to None
data_anomaly_map = dict(
zip(
Definitions.DATA_ANOMALY_MATCHES,
[None] * len(Definitions.DATA_ANOMALY_MATCHES),
)
)
if ignore_step:
return pd.DataFrame()
# Use replace function to map data (if exists in key), to corresponding value - i.e. Remove invalid values
data = self.data.replace(data_anomaly_map)
data = data.replace(np.NAN, None)
# Remap certain columns
if not self.newdata:
data["FLOOR_LEVEL"] = data["FLOOR_LEVEL"].replace(FLOOR_LEVEL_MAP)
data["BUILT_FORM"] = data["BUILT_FORM"].replace(BUILT_FORM_REMAP)
convert_to_lower = ["TRANSACTION_TYPE"]
for col in convert_to_lower:
data[col] = data[col].str.lower()
self.data = data
def make_cleaning_averages(self) -> pd.DataFrame:
# Define a custom function to calculate the median, excluding missing values
def median_without_missing(group):
return group[AVERAGE_FIXED_FEATURES].median(skipna=True)
@ -368,13 +568,20 @@ class DataProcessor:
# "FLOOR_HEIGHT"
# ].fillna(FLOOR_HEIGHT_NATIONAL_AVERAGE)
return cleaning_averages_filled
self.cleaning_averages = cleaning_averages_filled
def retain_multiple_epc_properties(self, epc_minimum_count: int = 1) -> None:
def retain_multiple_epc_properties(self, epc_minimum_count: int = 1, ignore_step: bool = False) -> None:
"""
Reduce the data futher by keeping only datasets with multiple epcs
"""
if self.violation_mode:
# TODO: to fill in
return
if ignore_step:
return
counts = self.data.groupby("UPRN").size().reset_index()
counts.columns = ["UPRN", "count"]
@ -382,22 +589,78 @@ class DataProcessor:
counts = counts[counts["count"] > epc_minimum_count]
self.data = pd.merge(self.data, counts, on="UPRN")
def recast_df_columns(self, column_mappings: dict, auto_subset_columns: bool = False) -> None:
    """
    Recast columns of ``self.data`` to ensure the dtypes we want.

    :param column_mappings: Maps a column name to either a single dtype or a
        list/tuple of dtypes. A sequence is applied in order, each cast
        operating on the result of the previous one.
    :param auto_subset_columns: When True, mapped columns that are absent from
        ``self.data`` are skipped instead of raising.
    :raises ValueError: If a mapped column is missing and
        ``auto_subset_columns`` is False.
    """
    if auto_subset_columns:
        column_mappings = {k: v for k, v in column_mappings.items() if k in self.data.columns}

    for key, values in column_mappings.items():
        if key not in self.data.columns:
            raise ValueError("Column mapping incorrectly specified")

        # A bare dtype is treated as a one-step chain, so that a dtype given
        # as a string (e.g. "int64") is not iterated character by character
        casts = values if isinstance(values, (list, tuple)) else [values]
        for cast in casts:
            self.data[key] = self.data[key].astype(cast)
def recast_all_data(self, column_mappings: dict, auto_subset_columns: bool = False) -> None:
    """
    Recast several columns of ``self.data`` at once with a single ``astype`` call.

    :param column_mappings: Maps column names to the dtype each should be cast to.
    :param auto_subset_columns: When True, entries whose column is absent from
        ``self.data`` are dropped from the mapping before casting.
    """
    if auto_subset_columns:
        column_mappings = {k: v for k, v in column_mappings.items() if k in self.data.columns}

    self.data = self.data.astype(column_mappings)
def confine_data(self, ignore_step: bool = False):
"""
Include all step to reduce down the data based on assumptions
"""
if self.violation_mode:
violation_uprn_missing = pd.isnull(self.data["UPRN"])
violation_old_lodgment_date = self.data["LODGEMENT_DATE"] < EARLIEST_EPC_DATE
violation_invalid_transaction_type = self.data["TRANSACTION_TYPE"] == IGNORED_TRANSACTION_TYPES
violation_ignored_floor_level = self.data["FLOOR_LEVEL"].isin(IGNORED_FLOOR_LEVELS)
violation_rdsap_score_above_max = self.data[RDSAP_RESPONSE] > MAX_SAP_SCORE
violation_missing_windows_description = pd.isnull(self.data["WINDOWS_DESCRIPTION"])
violation_missing_hotwater_description = pd.isnull(self.data["HOTWATER_DESCRIPTION"])
violation_missing_roof_description = pd.isnull(self.data["ROOF_DESCRIPTION"])
violation_invalid_property_type = self.data["PROPERTY_TYPE"] == IGNORED_PROPERTY_TYPES
violation_df = pd.concat(
[
violation_uprn_missing,
violation_old_lodgment_date,
violation_invalid_transaction_type,
violation_ignored_floor_level,
violation_rdsap_score_above_max,
violation_missing_windows_description,
violation_missing_hotwater_description,
violation_missing_roof_description,
violation_invalid_property_type,
], axis=1,
keys=[
"violation_uprn_missing",
"violation_old_lodgment_date",
"violation_invalid_transaction_type",
"violation_ignored_floor_level",
"violation_rdsap_score_above_max",
"violation_missing_windows_description",
"violation_missing_hotwater_description",
"violation_missing_roof_description",
"violation_invalid_property_type",
]
)
self.data = pd.concat([self.data, violation_df], axis=1)
if ignore_step:
return
# Filter 1: UPRN is a unique identifier for a property, so we remove any EPCs that don't have one
# Filter 2: Lodgement date is the date the EPC was lodged, so we remove any EPCs that were lodged
@ -416,9 +679,9 @@ class DataProcessor:
self.data = self.data[~pd.isnull(self.data["UPRN"])]
self.data = self.data[self.data["LODGEMENT_DATE"] >= EARLIEST_EPC_DATE]
self.data = self.data[self.data["TRANSACTION_TYPE"] != "new dwelling"]
self.data = self.data[self.data["TRANSACTION_TYPE"] != IGNORED_TRANSACTION_TYPES]
self.data = self.data[
~self.data["FLOOR_LEVEL"].isin(["top floor", "mid floor"])
~self.data["FLOOR_LEVEL"].isin(IGNORED_FLOOR_LEVELS)
]
self.data = self.data[self.data[RDSAP_RESPONSE] <= MAX_SAP_SCORE]
@ -430,16 +693,24 @@ class DataProcessor:
# Because park homes are surveyed unusually (for example, we don't have u-values to
# look up for their different components, they need to be collected in survey and aren't reflected in
# EPCs) we'll ignore them from the model
self.data = self.data[self.data["PROPERTY_TYPE"] != "Park home"]
self.data = self.data[self.data["PROPERTY_TYPE"] != IGNORED_PROPERTY_TYPES]
def clean_multi_glaze_proportion(self, ignore_step: bool = False) -> None:
    """
    If there is no multi-glaze proportion but the windows are fully glazed,
    assume a proportion of 100.

    A window counts as fully glazed when its ``WINDOWS_DESCRIPTION`` is one of
    ``FULLY_GLAZED_DESCRIPTIONS``. Nothing happens in violation mode or when
    the step is explicitly ignored.

    :param ignore_step: When True, ``self.data`` is left untouched.
    """
    # TODO: violation mode is not handled yet
    if self.violation_mode or ignore_step:
        return

    proportion_missing = pd.isnull(self.data["MULTI_GLAZE_PROPORTION"])
    fully_glazed = self.data["WINDOWS_DESCRIPTION"].isin(FULLY_GLAZED_DESCRIPTIONS)
    self.data.loc[proportion_missing & fully_glazed, "MULTI_GLAZE_PROPORTION"] = 100
def clean_photo_supply(self) -> None:
@ -450,7 +721,7 @@ class DataProcessor:
self.data["PHOTO_SUPPLY"] = self.data["PHOTO_SUPPLY"].fillna(0)
@staticmethod
def apply_averages_cleaning(data_to_clean, cleaning_data, cols_to_merge_on, colnames=None):
def apply_averages_cleaning(data_to_clean, cleaning_data, cols_to_merge_on, colnames=None, ignore_step: bool = False):
"""
Clean the input DataFrame using averages from a cleaning DataFrame.
@ -462,6 +733,9 @@ class DataProcessor:
:return: Cleaned DataFrame.
"""
if ignore_step:
return None
# The desired colnames to clean - which may not be present
if colnames is None:
colnames = ["TOTAL_FLOOR_AREA", "FLOOR_HEIGHT", "FIXED_LIGHTING_OUTLETS_COUNT"]
@ -514,8 +788,8 @@ class DataProcessor:
:return: Pandas dataframe containing the subset of columns defined in COMPONENT_FEATURES
"""
if suffix not in ["_STARTING", "_ENDING"]:
raise Exception("Suffix should be one of _STARTING or _ENDING")
if suffix not in ["_starting", "_ending"]:
raise Exception("Suffix should be one of _starting or _ending")
if suffix == "_STARTING":
starting_cols = self.data[STARTING_SUFFIX_COMPONENT_COLS + EFFICIENCY_FEATURES].copy().add_suffix(suffix)
@ -577,6 +851,7 @@ class DataProcessor:
for col in missings.index:
unique_values = df[col].unique()
# TODO: confirm this behaviour
if True in unique_values or False in unique_values:
df[col] = df[col].fillna(False)
if "none" in unique_values:

529
etl/epc/Dataset.py Normal file
View file

@ -0,0 +1,529 @@
import numpy as np
import pandas as pd
from typing import List
from etl.epc.Record import EPCDifferenceRecord
from etl.epc.ValidationConfiguration import DatasetValidationConfiguration
from etl.epc.settings import EARLIEST_EPC_DATE
from recommendations.rdsap_tables import england_wales_age_band_lookup
from recommendations.recommendation_utils import (
estimate_number_of_floors, get_wall_u_value, get_roof_u_value, get_floor_u_value, estimate_perimeter,
get_wall_type
)
class BaseDataset:
    """
    Common parent of the dataset classes.
    """

    def __init__(self) -> None:
        # Starts out empty
        self.pipeline_steps = {}

    def validate_dataset(self):
        """
        Attach the dataset validation configuration to this instance.

        Only stores ``DatasetValidationConfiguration`` on
        ``self.dataset_validation``; no checks are run here.
        """
        self.dataset_validation: dict = DatasetValidationConfiguration

    # def pipeline_factory(self, pipeline_type: str) -> dict:
    #     """
    #     Factory method for creating a pipeline
    #     """
    #     if pipeline_type not in self.pipeline_steps:
    #         raise ValueError(f"Pipeline type {pipeline_type} not found")
    #     return self.pipeline_steps[pipeline_type]
class TrainingDataset(BaseDataset):
    """
    A collection of EPCDifferenceRecords can be combined into a TrainingDataset.

    On construction the difference records are loaded into ``self.df`` and the
    whole preparation sequence (feature generation, description expansion,
    u-value generation, missing-value cleaning and null validation) is run.
    """

    def __init__(self, datasets: List[EPCDifferenceRecord], cleaned_lookup: dict) -> None:
        """
        :param datasets: Records whose ``difference_record`` each become one row of ``self.df``.
        :param cleaned_lookup: Cleaned description lookups, keyed by
            ``"<component>-description"`` and ``"main-fuel"``.
        :raises ValueError: If nulls remain after either validation checkpoint.
        """
        super().__init__()
        # self.pipeline_steps = self.pipeline_factory("training")
        self.datasets = datasets
        self.df = pd.DataFrame([dataset.difference_record for dataset in datasets])

        self._feature_generation()
        self._drop_features()
        self._clean_efficiency_variables()
        self._null_validation(information="Clean Efficiency Variables")
        self._expand_description_to_features(cleaned_lookup)
        self._adjust_assumed_values_in_wall_descriptions()
        self._generate_u_values_from_features()
        # TODO: For some of the features that we clean, we have either a true, false or possibly null value
        #  Those nulls should be False. clean_missings_after_description_process handles this but shouldn't
        #  need to
        self._clean_missing_values()
        self._null_validation(information="Clean Missing Values")
        self._remove_abnormal_change_in_floor_area()
        self._ensure_numeric()

    def _remove_abnormal_change_in_floor_area(self):
        """
        Keep only properties whose total floor area changed by less than 50%
        of the starting floor area.

        Rows where the proportion cannot be compared (e.g. NaN) are dropped too.
        """
        starting_area = self.df["total_floor_area_starting"]
        ending_area = self.df["total_floor_area_ending"]
        tfa_diff_prop = abs(ending_area - starting_area) / starting_area
        self.df = self.df[tfa_diff_prop < 0.5]

    def _ensure_numeric(self):
        """
        Ensure that every thermal transmittance (u-value) column is numeric
        """
        # TODO: move into EPCRecord record
        uvalue_columns = [col for col in self.df.columns if "thermal_transmittance" in col]
        for uvalue_col in uvalue_columns:
            self.df[uvalue_col] = pd.to_numeric(self.df[uvalue_col])

    @staticmethod
    def _lambda_function_to_generate_roof_uvalue(row, is_end=False):
        """
        Row-wise helper for ``DataFrame.apply`` that derives a roof u-value
        with ``get_roof_u_value``.

        :param row: One row of ``self.df``.
        :param is_end: Use the ending EPC's insulation thickness rather than the starting one.
        :raises ValueError: If there is a dwelling above but either recorded roof u-value is not 0.
        """
        col_name = "roof_insulation_thickness_ending" if is_end else "roof_insulation_thickness"

        if row["has_dwelling_above"]:
            for uvalue_col in ("roof_thermal_transmittance", "roof_thermal_transmittance_ending"):
                if row[uvalue_col] != 0:
                    raise ValueError("Should have 0 u-value for roof")

        return get_roof_u_value(
            insulation_thickness=row[col_name],
            has_dwelling_above=row["has_dwelling_above"],
            is_loft=row["is_loft"],
            is_roof_room=row["is_roof_room"],
            is_thatched=row["is_thatched"],
            is_flat=row["is_flat"],
            is_pitched=row["is_pitched"],
            is_at_rafters=row["is_at_rafters"],
            age_band=england_wales_age_band_lookup[row["construction_age_band"]]
        )

    @staticmethod
    def _lambda_function_to_generate_wall_uvalue(row, is_end=False):
        """
        Row-wise helper for ``DataFrame.apply`` that returns the recorded wall
        u-value, or derives one with ``get_wall_u_value`` when it is missing.

        :param row: One row of ``self.df``.
        :param is_end: Work on the ending EPC's columns rather than the starting ones.
        """
        suffix = "_ending" if is_end else ""
        description_col_name = f"walls_clean_description{suffix}"
        thermal_transmittance_col_name = f"walls_thermal_transmittance{suffix}"

        if not pd.isnull(row[thermal_transmittance_col_name]):
            return row[thermal_transmittance_col_name]

        return get_wall_u_value(
            clean_description=row[description_col_name],
            age_band=england_wales_age_band_lookup[row["construction_age_band"]],
            is_granite_or_whinstone=row["is_granite_or_whinstone"],
            is_sandstone_or_limestone=row["is_sandstone_or_limestone"],
        )

    @staticmethod
    def _lambda_function_to_generate_floor_uvalue(row, is_end=False):
        """
        Row-wise helper for ``DataFrame.apply`` that returns the recorded floor
        u-value, or derives one with ``get_floor_u_value`` when it is missing.

        :param row: One row of ``self.df``.
        :param is_end: Work on the ending EPC's columns rather than the starting ones.
        :raises ValueError: If there is a property below but either recorded floor u-value is not 0.
        """
        if row["another_property_below"]:
            for uvalue_col in ("floor_thermal_transmittance", "floor_thermal_transmittance_ending"):
                if row[uvalue_col] != 0:
                    raise ValueError("Should have 0 u-value for floor")
            return 0

        floor_thermal_col_name = "floor_thermal_transmittance_ending" if is_end else "floor_thermal_transmittance"
        uvalue = row[floor_thermal_col_name]
        if pd.isnull(uvalue):
            insulation_col_name = "floor_insulation_thickness_ending" if is_end else "floor_insulation_thickness"
            perimeter_col_name = "estimated_perimeter_ending" if is_end else "estimated_perimeter_starting"
            floor_area_col_name = "total_floor_area_ending" if is_end else "total_floor_area_starting"

            uvalue = get_floor_u_value(
                floor_type=row["floor_type"],
                perimeter=row[perimeter_col_name],
                area=row[floor_area_col_name],
                insulation_thickness=row[insulation_col_name],
                wall_type=row["wall_type"],
                age_band=england_wales_age_band_lookup[row["construction_age_band"]]
            )
        return uvalue

    def _generate_u_values_from_features(self):
        """
        Fill missing wall, roof and floor u-values (starting and ending) with
        values derived from the expanded description features.

        Also adds the ``estimated_perimeter_starting`` and
        ``estimated_perimeter_ending`` columns, and drops the helper columns
        that are only needed for the derivation.
        """
        # ~~~~~~~~~~~~~~~~~~
        # Walls
        # ~~~~~~~~~~~~~~~~~~

        walls_starting_uvalue = self.df.apply(
            lambda row: self._lambda_function_to_generate_wall_uvalue(row),
            axis=1
        )
        walls_ending_uvalue = self.df.apply(
            lambda row: self._lambda_function_to_generate_wall_uvalue(row, is_end=True),
            axis=1
        )

        walls_starting_uvalue = self.df["walls_thermal_transmittance"].fillna(walls_starting_uvalue)
        # When the wall description has not changed, the ending u-value takes the starting one
        walls_starting_equals_ending_flag = (
            self.df["walls_clean_description"] == self.df["walls_clean_description_ending"]
        )
        walls_ending_uvalue[walls_starting_equals_ending_flag] = (
            walls_starting_uvalue[walls_starting_equals_ending_flag]
        )

        # ~~~~~~~~~~~~~~~~~~
        # Roof
        # ~~~~~~~~~~~~~~~~~~

        roof_starting_uvalue = self.df.apply(
            lambda row: self._lambda_function_to_generate_roof_uvalue(row),
            axis=1
        )
        roof_ending_uvalue = self.df.apply(
            lambda row: self._lambda_function_to_generate_roof_uvalue(row, is_end=True),
            axis=1
        )

        roof_starting_uvalue = self.df["roof_thermal_transmittance"].fillna(roof_starting_uvalue)
        roof_ending_uvalue = self.df["roof_thermal_transmittance_ending"].fillna(roof_ending_uvalue)

        # ~~~~~~~~~~~~~~~~~~
        # Floor
        # ~~~~~~~~~~~~~~~~~~

        self.df["estimated_number_of_floors"] = self.df.apply(
            lambda row: estimate_number_of_floors(row["property_type"]),
            axis=1
        )

        # Both perimeters are estimated from per-floor figures. The ending
        # perimeter previously used the whole-dwelling floor area and room
        # count, which was inconsistent with the starting perimeter.
        for suffix in ("starting", "ending"):
            self.df[f"estimated_perimeter_{suffix}"] = self.df.apply(
                lambda row, suffix=suffix: estimate_perimeter(
                    row[f"total_floor_area_{suffix}"] / row["estimated_number_of_floors"],
                    row["number_habitable_rooms"] / row["estimated_number_of_floors"],
                ),
                axis=1
            )

        self.df["floor_type"] = self.df["is_suspended"].replace({True: "suspended", False: "solid"})
        self.df["wall_type"] = self.df.apply(
            lambda row: get_wall_type(
                is_cavity_wall=row["is_cavity_wall"],
                is_solid_brick=row["is_solid_brick"],
                is_timber_frame=row["is_timber_frame"],
                is_granite_or_whinstone=row["is_granite_or_whinstone"],
                is_cob=row["is_cob"],
                is_sandstone_or_limestone=row["is_sandstone_or_limestone"],
                is_system_built=row["is_system_built"],
                is_park_home=row["is_park_home"]
            ),
            axis=1
        )

        floor_starting_uvalue = self.df.apply(
            lambda row: self._lambda_function_to_generate_floor_uvalue(row),
            axis=1
        )
        floor_ending_uvalue = self.df.apply(
            lambda row: self._lambda_function_to_generate_floor_uvalue(row, is_end=True),
            axis=1
        )

        floor_starting_uvalue = self.df["floor_thermal_transmittance"].fillna(floor_starting_uvalue)
        floor_ending_uvalue = self.df["floor_thermal_transmittance_ending"].fillna(floor_ending_uvalue)

        # Explicit lookup of the computed series per component (no eval)
        computed_uvalues = {
            "walls": (walls_starting_uvalue, walls_ending_uvalue),
            "roof": (roof_starting_uvalue, roof_ending_uvalue),
            "floor": (floor_starting_uvalue, floor_ending_uvalue),
        }
        for component, (starting_uvalue, ending_uvalue) in computed_uvalues.items():
            starting_col = f"{component}_thermal_transmittance"
            ending_col = f"{component}_thermal_transmittance_ending"
            self.df[starting_col] = self.df[starting_col].fillna(starting_uvalue)
            self.df[ending_col] = self.df[ending_col].fillna(ending_uvalue)

        self.df = self.df.drop(
            columns=[
                "floor_type", "wall_type", "walls_clean_description", "walls_clean_description_ending",
                "estimated_number_of_floors"
            ]
        )

    def _adjust_assumed_values_in_wall_descriptions(self):
        """
        Strip out the "(assumed)" marker from all wall descriptions
        """
        for col in ["walls_clean_description", "walls_clean_description_ending"]:
            # regex=False: the parentheses are literal text. Treated as a regular
            # expression they would form a group and be left behind in the output.
            self.df[col] = self.df[col].str.replace("(assumed)", "", regex=False).str.rstrip()

    def _drop_inconsistent_properties(self, expanded_df: pd.DataFrame, component: str):
        """
        Drop properties that have inconsistent data, i.e. changing material types

        :param expanded_df: Frame holding the starting features and their ``_ending`` counterparts.
        :param component: Only "walls", "floor" and "roof" are filtered; any other
            component is returned unchanged.
        :return: The rows whose key features are equal in the starting and ending EPC.
        """
        consistency_features = {
            "walls": [
                "is_cavity_wall", "is_solid_brick", "is_timber_frame", "is_granite_or_whinstone",
                "is_cob", "is_sandstone_or_limestone",
            ],
            "floor": [
                "is_suspended", "is_solid", "another_property_below", "is_to_unheated_space",
                "is_to_external_air",
            ],
            "roof": [
                "is_pitched", "is_roof_room", "is_loft", "is_flat", "is_thatched", "is_at_rafters",
                "has_dwelling_above",
            ],
        }

        features = consistency_features.get(component)
        if not features:
            return expanded_df

        keep = None
        for feature in features:
            unchanged = expanded_df[feature] == expanded_df[f"{feature}_ending"]
            keep = unchanged if keep is None else keep & unchanged

        return expanded_df[keep]

    def _expand_description_to_features(self, cleaned_lookup: dict):
        """
        This method will merge on the cleaned lookup table and ensure that the building fabric in the
        starting and ending EPC is consistent, to ensure that we are performing our modelling on the
        cleanest possible dataset.

        We look for key building fabric features that have changed from one EPC to the next.
        If, for example, we see that a home has gone from being a cavity wall to a solid wall, we
        remove this record, as it indicates that the quality of the EPC conducted in the first instance
        is low.

        We also replace descriptions with their cleaned variants.

        :param cleaned_lookup: Cleaned description lookups, keyed by
            ``"<component>-description"`` and ``"main-fuel"``.
        """
        cols_to_drop = {
            "walls": [
                # The cleaned descriptions are kept, as they are needed for pulling out u-values
                "original_description", "thermal_transmittance_unit",
                "original_description_ending",
                "thermal_transmittance_unit_ending",
                "is_cavity_wall_ending", "is_filled_cavity_ending",
                "is_solid_brick_ending", "is_system_built_ending",
                "is_timber_frame_ending", "is_granite_or_whinstone_ending",
                "is_as_built_ending", "is_cob_ending",
                "is_sandstone_or_limestone_ending",
                # We remove the is_assumed columns
                "is_assumed", "is_assumed_ending"
            ],
            "floor": [
                "original_description", "clean_description", "thermal_transmittance_unit",
                "no_data", "no_data_ending", "original_description_ending",
                "clean_description_ending", "thermal_transmittance_unit_ending",
                "is_suspended_ending", "is_solid_ending", "another_property_below_ending",
                "is_to_unheated_space_ending", "is_to_external_air_ending", "is_assumed",
                "is_assumed_ending"
            ],
            "roof": [
                "original_description", "clean_description", "thermal_transmittance_unit",
                "is_assumed", "is_valid", "original_description_ending", "clean_description_ending",
                "thermal_transmittance_unit_ending", "is_pitched_ending", "is_roof_room_ending",
                "is_loft_ending", "is_flat_ending", "is_thatched_ending", "is_at_rafters_ending",
                "has_dwelling_above_ending", "is_assumed_ending", "is_valid_ending"
            ],
            "hotwater": [
                "original_description", "clean_description", "assumed", "original_description_ending",
                "clean_description_ending", "assumed_ending"
            ],
            "mainheat": [
                "original_description", "clean_description", "original_description_ending",
                "has_assumed", "clean_description_ending", "has_assumed_ending",
            ],
            "mainheatcont": [
                "original_description", "clean_description", "original_description_ending",
                "clean_description_ending"
            ],
            "windows": [
                "original_description", "clean_description", "original_description_ending",
                "clean_description_ending",
                # We don't need many of the glazing coverage features because we have the
                # multi_glaze_proportion feature
                "has_glazing", "glazing_coverage", "no_data", "has_glazing_ending",
                "glazing_coverage_ending", "no_data_ending"
            ],
            "main-fuel": [
                "original_description", "clean_description", "original_description_ending",
                "clean_description_ending"
            ],
        }

        for component in cols_to_drop:
            # TODO: change cleaned dataframe to have underscores instead of dashes
            if component == "main-fuel":
                cleaned_key = "main-fuel"
                left_on_starting = "main_fuel_starting"
                left_on_ending = "main_fuel_ending"
            else:
                cleaned_key = f"{component}-description"
                left_on_starting = f"{component}_description_starting"
                left_on_ending = f"{component}_description_ending"
            original_cols = [left_on_starting, left_on_ending]

            cleaned_lookup_df_for_key = pd.DataFrame(cleaned_lookup[cleaned_key])

            # The lookup is merged twice: once for the starting description and
            # once for the ending one, whose columns get the "_ending" suffix
            expanded_df = self.df.merge(
                cleaned_lookup_df_for_key,
                how="left",
                left_on=left_on_starting,
                right_on="original_description",
            ).merge(
                cleaned_lookup_df_for_key,
                how="left",
                left_on=left_on_ending,
                right_on="original_description",
                suffixes=("", "_ending")
            )

            # Drop properties where key material types have changed
            expanded_df = self._drop_inconsistent_properties(expanded_df, component)

            # Drop original cols and cols to drop
            expanded_df = expanded_df.drop(columns=cols_to_drop[component] + original_cols)

            # Rename columns to component specific names, if they have not been dropped
            expanded_df = expanded_df.rename(
                columns={
                    "insulation_thickness": f"{component}_insulation_thickness",
                    "insulation_thickness_ending": f"{component}_insulation_thickness_ending",
                    "thermal_transmittance": f"{component}_thermal_transmittance",
                    "thermal_transmittance_ending": f"{component}_thermal_transmittance_ending",
                    "tariff_type": f"{component}_tariff_type",
                    "tariff_type_ending": f"{component}_tariff_type_ending",
                    "clean_description": f"{component}_clean_description",
                    "clean_description_ending": f"{component}_clean_description_ending",
                }
            )

            self.df = expanded_df

        # We don't need any lighting specific cleaning, we just drop the original description as we use
        # LOW_ENERGY_LIGHTING_STARTING, LOW_ENERGY_LIGHTING_ENDING
        self.df = self.df.drop(columns=["lighting_description_starting", "lighting_description_ending"])

    def _clean_missing_values(self, ignore_cols=None):
        """
        Fill the remaining nulls in ``self.df`` column by column.

        A column containing True/False is filled with False, a column containing
        "none" with "none", and anything else with "Unknown".

        :param ignore_cols: Optional collection of column names to leave untouched.
        """
        missings = pd.isnull(self.df).sum()
        missings = missings[missings > 0]
        if ignore_cols:
            missings = missings[~missings.index.isin(ignore_cols)]

        for col in missings.index:
            unique_values = self.df[col].unique()
            # NOTE(review): ``True in ...`` / ``False in ...`` also match the
            # numbers 1 and 0, so numeric columns containing them take this
            # branch as well - TODO confirm this behaviour
            if True in unique_values or False in unique_values:
                fill_value = False
            elif "none" in unique_values:
                fill_value = "none"
            else:
                fill_value = "Unknown"
            self.df[col] = self.df[col].fillna(fill_value)

    def _null_validation(self, information: str):
        """
        Raise if ``self.df`` still contains any null value.

        :param information: Name of the step that has just run, used in the messages.
        :raises ValueError: If at least one null value is present.
        """
        print(f"Null validation after {information}")
        if pd.isnull(self.df).sum().sum():
            raise ValueError(f"Null values found in dataset, after step {information}")

    def _drop_features(self):
        """
        Drop features that are not needed for modelling
        """
        self.df = self.df.drop(columns=["lodgement_date_starting", "lodgement_date_ending"])

    def _feature_generation(self):
        """
        Generate features for modelling: the number of days between
        ``EARLIEST_EPC_DATE`` and each lodgement date
        """
        self.df["days_to_starting"] = self._calculate_days_to(self.df["lodgement_date_starting"])
        # NOTE(review): "day_to_ending" is not named like "days_to_starting"; it is
        # left as-is here because other code may depend on the column name
        self.df["day_to_ending"] = self._calculate_days_to(self.df["lodgement_date_ending"])

    def _clean_efficiency_variables(self):
        """
        Fill missing energy efficiency ratings with "NO_RATING".

        There is scope to clean this by the mode per corresponding description.
        E.g. for WALLS_ENG_EFF we could look at the mode efficiency rating by description and
        fill in the missing values with this.

        When looking at this initially, there were a large volume of records with missing energy
        efficiency values and therefore a simpler approach was taken just to test including these
        variables.

        :raises ValueError: If any column with missing values is not an efficiency column.
        """
        missings = pd.isnull(self.df).sum()
        missings = missings[missings >= 1]

        if len(missings) == 0:
            return

        # Make sure they are all efficiency columns
        if any(~missings.index.str.contains("energy_eff")):
            raise ValueError("Non efficiency columns are missing")

        for m in missings.index:
            self.df[m] = self.df[m].fillna("NO_RATING")

    @staticmethod
    def _calculate_days_to(lodgement_date):
        """
        Number of days from ``EARLIEST_EPC_DATE`` to ``lodgement_date``.

        :param lodgement_date: Either a single date string or a Series of dates.
        :return: An int for a string input, otherwise a Series of day counts.
        """
        elapsed = pd.to_datetime(lodgement_date) - pd.to_datetime(EARLIEST_EPC_DATE)
        if isinstance(lodgement_date, str):
            return elapsed.days
        return elapsed.dt.days

    # def __add__(self, other) -> "TrainingDataset":
    #     if not isinstance(other, TrainingDataset):
    #         raise TypeError("Addition can only be performed with another instance of TrainingDataset")
    #     return TrainingDataset(self.datasets + other.datasets)

    # def __radd__(self, other):
    #     """
    #     Required for sum() to work
    #     """
    #     if isinstance(other, int):
    #         return self
    #     else:
    #         return self.__add__(other)
class NewDataset(BaseDataset):
    """
    A collection of EPCDifferenceRecords.

    Instances can be concatenated with ``+`` and aggregated with ``sum()``.
    """

    def __init__(self, datasets: List[EPCDifferenceRecord]) -> None:
        """
        :param datasets: The records held by this dataset.
        """
        super().__init__()
        # self.pipeline_steps = self.pipeline_factory("newdata")
        self.datasets = datasets

    def __add__(self, other) -> "NewDataset":
        """
        Return a new dataset holding this dataset's records followed by ``other``'s.

        :raises TypeError: If ``other`` is not a NewDataset.
        """
        if not isinstance(other, NewDataset):
            raise TypeError("Addition can only be performed with another instance of NewDataset")
        return NewDataset(self.datasets + other.datasets)

    def __radd__(self, other):
        """
        Required for sum() to work: sum() starts from the int 0, which is
        absorbed here by returning ``self``
        """
        if isinstance(other, int):
            return self
        return self.__add__(other)

302
etl/epc/Pipeline.py Normal file
View file

@ -0,0 +1,302 @@
import msgpack
import pandas as pd
from typing import List
from pathlib import Path
from tqdm import tqdm
from etl.epc.DataProcessor import EPCDataProcessor
from etl.epc.Record import EPCRecord, EPCDifferenceRecord
from etl.epc.Dataset import TrainingDataset
from utils.s3 import save_dataframe_to_s3_parquet, read_from_s3
from etl.epc.settings import (
MANDATORY_FIXED_FEATURES,
LATEST_FIELD,
COMPONENT_FEATURES,
RDSAP_RESPONSE,
HEAT_DEMAND_RESPONSE,
CARBON_RESPONSE,
CORE_COMPONENT_FEATURES,
EFFICIENCY_FEATURES,
POTENTIAL_COLUMNS,
)
# TODO: change in setting file
# The names imported from etl.epc.settings are rebound here to lower-case
# versions, so the rest of this module only sees lower-case column names.
(
    MANDATORY_FIXED_FEATURES,
    LATEST_FIELD,
    COMPONENT_FEATURES,
    CORE_COMPONENT_FEATURES,
    EFFICIENCY_FEATURES,
    POTENTIAL_COLUMNS,
) = (
    [column.lower() for column in column_group]
    for column_group in (
        MANDATORY_FIXED_FEATURES,
        LATEST_FIELD,
        COMPONENT_FEATURES,
        CORE_COMPONENT_FEATURES,
        EFFICIENCY_FEATURES,
        POTENTIAL_COLUMNS,
    )
)
RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE, CARBON_RESPONSE = (
    response.lower() for response in (RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE, CARBON_RESPONSE)
)
# Columns that can differ between two EPCs lodged for the same property
VARIABLE_DATA_FEATURES = [
    *COMPONENT_FEATURES,
    *EFFICIENCY_FEATURES,
    *POTENTIAL_COLUMNS,
    "lodgement_date",
    RDSAP_RESPONSE,
    HEAT_DEMAND_RESPONSE,
    CARBON_RESPONSE,
]
def get_cleaned_description_mapping(
    s3_file_name="cleaned_epc_data/cleaned.bson",
    bucket_name="retrofit-data-dev",
):
    """
    Retrieve the cleaned dataset from s3 which holds the cleaned descriptions
    for the epc dataset.

    The data is stored in MessagePack format and therefore needs to be decoded
    (note that the default object key nevertheless ends in ".bson").

    :param s3_file_name: key of the MessagePack file inside the bucket
    :param bucket_name: S3 bucket to read from
    :return: the decoded MessagePack payload
    """
    packed = read_from_s3(s3_file_name=s3_file_name, bucket_name=bucket_name)
    return msgpack.unpackb(packed, raw=False)
# Evaluated at import time: importing this module calls
# get_cleaned_description_mapping(), which reads from S3. The result is handed
# to TrainingDataset in EPCPipeline.process_directory.
clean_lookup = get_cleaned_description_mapping()
class EPCPipeline:
"""
This class will take a list of directories and process them to create a dataset:
- Load the data
- Pre-process the data
- Create a dataset
- Clean the dataset
- Store the dataset
"""
def __init__(
    self,
    epc_data_processor: EPCDataProcessor,
    api_epc_records: dict = None,
    directories: List[Path] | None = None,
    run_mode="training",
    epc_local_file="certificates.csv",
    epc_bucket_name="retrofit-data-dev",
    epc_cleaning_dataset_key="sap_change_model/cleaning_dataset.parquet",
    epc_all_equal_rows_key="sap_change_model/all_equal_rows.parquet",
    epc_compiled_dataset_key="sap_change_model/dataset.parquet"
):
    """
    Store the pipeline configuration and create the empty result containers.

    :param epc_data_processor: EPCDataProcessor object
    :param api_epc_records: EPC records handed to EPCRecord by the newdata pipeline
    :param directories: List of directories to process (needed by the training pipeline)
    :param run_mode: Either training or newdata
    :param epc_local_file: Local file name of the EPC data
    :param epc_bucket_name: S3 bucket name
    :param epc_cleaning_dataset_key: S3 key for the cleaning dataset
    :param epc_all_equal_rows_key: S3 key for the all equal rows dataset
    :param epc_compiled_dataset_key: S3 key for the compiled dataset
    """
    # What to run and on which inputs
    self.run_mode = run_mode
    self.epc_data_processor = epc_data_processor
    self.directories = directories
    self.api_epc_records = api_epc_records
    self.epc_local_file = epc_local_file

    # Where the outputs are meant to be stored
    self.epc_bucket_name = epc_bucket_name
    self.epc_compiled_dataset_key = epc_compiled_dataset_key
    self.epc_all_equal_rows_key = epc_all_equal_rows_key
    self.epc_cleaning_dataset_key = epc_cleaning_dataset_key

    # Results accumulated while directories are processed
    self.compiled_dataset: pd.DataFrame = pd.DataFrame()
    self.compiled_all_equal_rows: list = []
    self.compiled_cleaning_averages: list = []
def run(self):
    """
    Entrypoint to run the pipeline: dispatches on `run_mode`.

    :raises ValueError: if `run_mode` is neither 'training' nor 'newdata'
    """
    mode = self.run_mode
    if mode == "training":
        self.run_training_dataset_pipeline()
        return
    if mode == "newdata":
        self.run_newdata_dataset_pipeline()
        return
    raise ValueError("Run mode defined needs to be in 'training' or 'newdata'")
def run_newdata_dataset_pipeline(self):
    """
    Main function to run the newdata pipeline

    :return: list of EPCRecord objects, one per row of the processed data
        (previously this list was built and then discarded)
    """
    # EPCRecord is a dataclass whose first positional field is `uprn`, so the
    # API payload has to be passed by keyword to end up in `epc_records`.
    # This uses all the epc records to clean the data.
    # NOTE(review): in newdata mode EPCRecord also raises when `cleaning_data`
    # is not supplied - confirm where that should come from.
    prepared_epc = EPCRecord(epc_records=self.api_epc_records, run_mode="newdata")
    self.epc_data_processor.insert_data(prepared_epc)
    self.epc_data_processor.prepare_data()
    data = self.epc_data_processor.data
    return [EPCRecord(**row, run_mode="newdata") for row in data.to_dict(orient='records')]
def run_training_dataset_pipeline(self):
    """
    Main function to run the training dataset generation pipeline: every
    configured directory is processed in turn (with a progress bar).

    :raises ValueError: if no directories were configured
    """
    directories = self.directories
    if directories is None:
        raise ValueError("Directories not specified - Unable to run Training pipeline")

    for constituency_directory in tqdm(directories):
        self.process_directory(constituency_directory)

    # Persisting the results is currently switched off.
    # NOTE(review): compiled_all_equal_rows is filled with plain dicts, so the
    # pd.concat below needs revisiting before it is re-enabled.
    # save_dataframe_to_s3_parquet(
    # df=self.compiled_dataset,
    # bucket_name=self.epc_bucket_name,
    # file_key=self.epc_compiled_dataset_key,
    # )
    # save_dataframe_to_s3_parquet(
    # df=pd.concat(self.compiled_all_equal_rows),
    # bucket_name=self.epc_bucket_name,
    # file_key=self.epc_all_equal_rows_key,
    # )
    # save_dataframe_to_s3_parquet(
    # df=pd.concat(self.compiled_cleaning_averages),
    # bucket_name=self.epc_bucket_name,
    # file_key=self.epc_cleaning_dataset_key,
    # )
def process_directory(self, directory: Path):
    """
    Process a single directory: prepare its EPC file, build difference records
    for every UPRN and append the resulting dataset to `compiled_dataset`.

    :param directory: directory that contains the local EPC file
    :return: None
    """
    processor = self.epc_data_processor
    processor.prepare_data(filepath=directory / self.epc_local_file)
    constituency_data = processor.data
    self.compiled_cleaning_averages.append(processor.cleaning_averages)

    # One batch of difference records (or None) per UPRN
    per_uprn_batches = (
        self.process_uprn(uprn=str(uprn), property_data=uprn_rows, directory=directory)
        for uprn, uprn_rows in constituency_data.groupby("uprn", observed=True)
    )
    constituency_difference_records = [
        record
        for batch in per_uprn_batches
        if batch is not None
        for record in batch
    ]

    constituency_dataset = TrainingDataset(
        datasets=constituency_difference_records,
        cleaned_lookup=clean_lookup,
    )
    self.compiled_dataset = pd.concat([self.compiled_dataset, constituency_dataset.df])
def process_uprn(self, uprn: str, property_data: pd.DataFrame, directory: Path):
    """
    Process a single UPRN, which may have multiple different EPCs

    :param uprn: UPRN
    :param property_data: pd.DataFrame, Data for a single UPRN
    :param directory: Path, Directory of the UPRN
    :return: list of difference records, or None when the property is skipped
    """
    # If a property has changed building type (or a fixed feature is missing) we
    # ignore it, i.e. the fixed features should collapse to 1 unique row
    fixed_columns = property_data[MANDATORY_FIXED_FEATURES]
    if fixed_columns.nunique().gt(1).any() or fixed_columns.isnull().values.any():
        return None

    # Fixed features - these are property attributes that shouldn't change over time.
    # Take the last row for both the LATEST_FIELD and MANDATORY_FIXED_FEATURES columns.
    # NOTE(review): "last" is only "latest" if the rows arrive sorted by lodgement date - confirm upstream.
    last_row = property_data[MANDATORY_FIXED_FEATURES + LATEST_FIELD].iloc[-1]
    fixed_data = last_row.to_dict()

    # The lodgement date is part of the variable data as we probably need to factor
    # time into the model, since EPC standards and rigour have changed over time
    uprn = str(uprn)
    epc_records = []
    for row in property_data[VARIABLE_DATA_FEATURES].to_dict(orient='records'):
        epc_records.append(EPCRecord(uprn, **row, run_mode="training"))

    # TODO: We want to be able to provide value for the u values in the main pipeline so this will need to be part of the EPCRecord
    # We can use multiple types of comparison datasets - i.e. Compare consecutive records, or compare all permutations of records
    return self._generate_property_difference_records(epc_records, uprn, directory, fixed_data)
def _generate_property_difference_records(self, epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict):
    """
    Build the difference records for one property.

    Several comparison strategies are possible, for example:
    - First vs second
    - Second vs third
    - First vs third
    Only the consecutive comparison is active at the moment.

    :param epc_records: EPC records of the property
    :param uprn: UPRN of the property
    :param directory: directory the property data came from
    :param fixed_data: fixed property attributes appended to every kept record
    :return: list of difference records
    """
    # Alternative strategy, currently disabled:
    # property_difference_records = self._compare_all_permutation_epcs(epc_records, uprn, directory, fixed_data, property_difference_records)
    return self._compare_consecutive_epcs(epc_records, uprn, directory, fixed_data, [])
def _compare_all_permutation_epcs(self, epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict, property_difference_records: list):
    """
    Compare every pair of EPCs for a given UPRN (each earlier record against
    each later record in the list).

    :param epc_records: EPC records of the property
    :param uprn: UPRN, recorded when a pair is skipped as "all equal"
    :param directory: directory of the UPRN, recorded alongside the uprn
    :param fixed_data: fixed property attributes appended to every kept record
    :param property_difference_records: list the kept records are appended to
    :return: the same list, extended with the kept difference records
    """
    # The field list does not depend on the pair, so build it once
    core_fields = [x.lower() for x in CORE_COMPONENT_FEATURES]

    for position, earliest_record in enumerate(epc_records):
        for latest_record in epc_records[position + 1:]:
            # EPCRecord subtraction builds the difference record with auto_sort=True
            difference_record: EPCDifferenceRecord = latest_record - earliest_record

            # TODO: Pull out RDSAP_CHANGE to a variable
            if difference_record.get("rdsap_change") == 0:
                continue

            if difference_record.compare_fields_in_records(fields=core_fields):
                # Keep track of this for the moment so we can analyse
                self.compiled_all_equal_rows.append({"uprn": uprn, "directory_name": directory.name})
                continue

            difference_record.append_fixed_data(fixed_data)
            property_difference_records.append(difference_record)

    return property_difference_records
def _compare_consecutive_epcs(self, epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict, property_difference_records: list):
    """
    Compare consecutive EPCs for a given UPRN (first vs second, second vs third, ...).

    :param epc_records: EPC records of the property
    :param uprn: UPRN, recorded when a pair is skipped as "all equal"
    :param directory: directory of the UPRN, recorded alongside the uprn
    :param fixed_data: fixed property attributes appended to every kept record
    :param property_difference_records: list the kept records are appended to
    :return: the same list, extended with the kept difference records
    """
    # The field list does not depend on the pair, so build it once
    core_fields = [x.lower() for x in CORE_COMPONENT_FEATURES]

    # zip() stops at the shorter sequence, so no explicit bounds check is
    # needed (the old in-loop `break` guard could never trigger)
    for earliest_record, latest_record in zip(epc_records, epc_records[1:]):
        # EPCRecord subtraction builds the difference record with auto_sort=True
        difference_record: EPCDifferenceRecord = latest_record - earliest_record

        # TODO: Pull out RDSAP_CHANGE to a variable
        if difference_record.get("rdsap_change") == 0:
            continue

        if difference_record.compare_fields_in_records(fields=core_fields):
            # Keep track of this for the moment so we can analyse
            self.compiled_all_equal_rows.append({"uprn": uprn, "directory_name": directory.name})
            continue

        difference_record.append_fixed_data(fixed_data)
        property_difference_records.append(difference_record)

    return property_difference_records

850
etl/epc/Record.py Normal file
View file

@ -0,0 +1,850 @@
from datetime import datetime
from dataclasses import dataclass
from etl.epc.ValidationConfiguration import (
EPCRecordValidationConfiguration,
EPCDifferenceRecordValidationConfiguration,
EPCDifferenceRecordFixedDataValidationConfiguration
)
from etl.epc.DataProcessor import EPCDataProcessor
from recommendations.rdsap_tables import england_wales_age_band_lookup, FLOOR_LEVEL_MAP
from etl.epc.settings import DATA_ANOMALY_MATCHES, BUILT_FORM_REMAP
import re
import os
import numpy as np
import pandas as pd
from typing import Any, Union, List
from etl.epc.settings import (
RDSAP_RESPONSE,
HEAT_DEMAND_RESPONSE,
CARBON_RESPONSE,
COMPONENT_FEATURES,
EFFICIENCY_FEATURES
)
from utils.s3 import read_dataframe_from_s3_parquet
from etl.epc.settings import EARLIEST_EPC_DATE
# TODO: Change these in the settings file
# The settings values are rebound to lower case for use in this module.
RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE, CARBON_RESPONSE = (
    response.lower() for response in (RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE, CARBON_RESPONSE)
)
COMPONENT_FEATURES, EFFICIENCY_FEATURES = (
    [feature.lower() for feature in feature_group]
    for feature_group in (COMPONENT_FEATURES, EFFICIENCY_FEATURES)
)

ENVIRONMENT = os.environ.get('ENVIRONMENT', 'dev')
# Only the dev environment has a built-in default bucket; elsewhere DATA_BUCKET
# is None unless the environment variable is set.
if ENVIRONMENT == 'dev':
    DATA_BUCKET = os.environ.get('DATA_BUCKET', 'retrofit-data-dev')
else:
    DATA_BUCKET = os.environ.get('DATA_BUCKET', None)
@dataclass
class EPCRecord:
"""
Base class for a EPC record
"""
uprn: int = None
walls_description: str = None
floor_description : str = None
lighting_description : str = None
roof_description : str = None
mainheat_description : str = None
hotwater_description : str = None
main_fuel : str = None
mechanical_ventilation : str = None
secondheat_description : str = None
windows_description : str = None
glazed_type : str = None
multi_glaze_proportion : float = None
low_energy_lighting : float = None
number_open_fireplaces : float = None
mainheatcont_description : str = None
solar_water_heating_flag : str = None
photo_supply : float = None
transaction_type : str = None
energy_tariff : str = None
extension_count : float = None
total_floor_area : float = None
floor_height : float = None
hot_water_energy_eff : str = None
floor_energy_eff : str = None
windows_energy_eff : str = None
walls_energy_eff : str = None
sheating_energy_eff : str = None
roof_energy_eff : str = None
mainheat_energy_eff : str = None
mainheatc_energy_eff : str = None
lighting_energy_eff : str = None
potential_energy_efficiency : float = None
environment_impact_potential : float = None
energy_consumption_potential : float = None
co2_emissions_potential : float = None
lodgement_date : str = None
current_energy_efficiency : int = None
energy_consumption_current : int = None
co2_emissions_current : float = None
# u_values_walls = None
# u_values_roof = None
# u_values_floor = None
run_mode: str = "training"
# TODO: Make this a class so that api_records is structured
epc_records: dict = None
full_sap_epc: dict = None
old_data: list[dict] = None
original_epc: dict = None
prepared_epc: dict = None
prepared_epc_delta_metadata: pd.DataFrame = None
cleaning_data: pd.DataFrame = None
# Not used in training mode but used in newdata mode
age_band: str = None
construction_age_band: str = None
year_built: int = None
number_of_floors: int = None
number_of_open_fireplaces: int = None
def __post_init__(self):
    """
    Finish construction of the record.

    In "training" mode only the validation configuration is attached. Any other
    run mode is treated as newdata: the supplied epc records are unpacked,
    cleaned and expanded onto the attributes of this record.

    :raises ValueError: in newdata mode, if `epc_records` or `cleaning_data`
        was not provided
    """
    # We can have validation and cleaning steps for each of the fields
    # Could also have cleaning of records if needed
    if self.run_mode == "training":
        self.validation_configuration = EPCRecordValidationConfiguration
        # self._field_validation()
        return

    # We are running in newdata mode
    if self.epc_records is None:
        raise ValueError("Must provide epc records if running in newdata mode")

    # prepared_epc is the very dict supplied by the caller and is edited in
    # place by the cleaning steps; original_epc is a shallow copy taken before
    # any cleaning happens.
    self.prepared_epc = self.epc_records['original_epc']
    self.original_epc = self.epc_records['original_epc'].copy()
    self.full_sap_epc = self.epc_records['full_sap_epc']
    self.old_data = self.epc_records['old_data']

    if self.cleaning_data is None:
        raise ValueError("Must provide cleaning data if running in newdata mode")

    self._clean_records_using_epc_records()
    self._clean_with_data_processor()
    self._temp_uprn_catch()
    self._expand_prepared_epc_to_attributes()
    self._identify_delta_between_prepared_and_original_records()

    # Planned follow-up steps that are not wired in yet (they previously sat
    # behind an unconditional `return` and were never executed):
    # self.df = self.epc_record_as_dataframe('prepared_epc')
    # self._feature_generation()
    # self._drop_features()
    # self._expand_description_to_features()
    # self._expand_description_to_uvalues()
    # self._generate_uvalues()
    # self._validate_expanded_description()
    # self._validate_u_values()
def _drop_features(self):
"""
Drop features that are not needed for modelling
"""
self.df = self.df.drop(columns=["lodgement_date_starting", "lodgement_date_ending"])
def _feature_generation(self):
"""
Generate features for modelling
"""
self.df["days_to_lodgement_date"] = self._calculate_days_to(self.prepared_epc["lodgement_date"])
@staticmethod
def _calculate_days_to(lodgement_date):
    """
    Number of whole days between EARLIEST_EPC_DATE and the lodgement date(s).

    :param lodgement_date: a single date (string, datetime, Timestamp, ...) or a
        pandas Series of dates
    :return: the day count for a single date, or a Series of day counts when a
        Series is passed in
    """
    elapsed = pd.to_datetime(lodgement_date) - pd.to_datetime(EARLIEST_EPC_DATE)
    # Only a Series exposes the `.dt` accessor. Scalar results carry `.days`
    # directly, which also covers datetime/Timestamp inputs that previously
    # fell through to `.dt` and raised AttributeError.
    if isinstance(elapsed, pd.Series):
        return elapsed.dt.days
    return elapsed.days
def _temp_uprn_catch(self):
"""
Catch the case we do now have uprn
"""
if self.prepared_epc["uprn"] == "":
self.prepared_epc["uprn"] = 0
def _clean_with_data_processor(self):
    """
    Run the prepared EPC through an EPCDataProcessor (newdata mode, using the
    cleaning averages of this record) and keep the first resulting row as the
    new prepared EPC.
    """
    single_row_frame = self.epc_record_as_dataframe("prepared_epc")
    processor = EPCDataProcessor(
        data=single_row_frame,
        run_mode="newdata",
        cleaning_averages=self.cleaning_data,
    )
    processor.prepare_data()
    cleaned_rows = processor.data.to_dict(orient="records")
    self.prepared_epc = cleaned_rows[0]
def _expand_prepared_epc_to_attributes(self):
"""
This method will expand the prepared epc to attributes
"""
# for key, value in self.prepared_epc.items():
# setattr(self, key, value)
self.uprn: int = int(self.prepared_epc["uprn"])
self.walls_description: str = self.prepared_epc["walls_description"]
self.floor_description : str = self.prepared_epc["floor_description"]
self.lighting_description : str = self.prepared_epc["lighting_description"]
self.roof_description : str = self.prepared_epc["roof_description"]
self.mainheat_description : str = self.prepared_epc["mainheat_description"]
self.hotwater_description : str = self.prepared_epc["hotwater_description"]
self.main_fuel : str = self.prepared_epc["main_fuel"]
self.mechanical_ventilation : str = self.prepared_epc["mechanical_ventilation"]
self.secondheat_description : str = self.prepared_epc["secondheat_description"]
self.windows_description : str = self.prepared_epc["windows_description"]
self.glazed_type : str = self.prepared_epc["glazed_type"]
self.multi_glaze_proportion : float = float(self.prepared_epc["multi_glaze_proportion"])
self.low_energy_lighting : float = float(self.prepared_epc["low_energy_lighting"])
self.number_open_fireplaces : float = float(self.prepared_epc["number_open_fireplaces"])
self.mainheatcont_description : str = self.prepared_epc["mainheatcont_description"]
self.solar_water_heating_flag : str = self.prepared_epc["solar_water_heating_flag"]
self.photo_supply : float = float(self.prepared_epc["photo_supply"])
self.transaction_type : str = self.prepared_epc["transaction_type"]
self.energy_tariff : str = self.prepared_epc["energy_tariff"]
self.extension_count : float = float(self.prepared_epc["extension_count"])
self.total_floor_area : float = float(self.prepared_epc["total_floor_area"])
self.floor_height : float = float(self.prepared_epc["floor_height"])
self.hot_water_energy_eff : str = self.prepared_epc["hot_water_energy_eff"]
self.floor_energy_eff : str = self.prepared_epc["floor_energy_eff"]
self.windows_energy_eff : str = self.prepared_epc["windows_energy_eff"]
self.walls_energy_eff : str = self.prepared_epc["walls_energy_eff"]
self.sheating_energy_eff : str = self.prepared_epc["sheating_energy_eff"]
self.roof_energy_eff : str = self.prepared_epc["roof_energy_eff"]
self.mainheat_energy_eff : str = self.prepared_epc["mainheat_energy_eff"]
self.mainheatc_energy_eff : str = self.prepared_epc["mainheatc_energy_eff"]
self.lighting_energy_eff : str = self.prepared_epc["lighting_energy_eff"]
self.potential_energy_efficiency : float = float(self.prepared_epc["potential_energy_efficiency"])
self.environment_impact_potential : float = float(self.prepared_epc["environment_impact_potential"])
self.energy_consumption_potential : float = float(self.prepared_epc["energy_consumption_potential"])
self.co2_emissions_potential : float = float(self.prepared_epc["co2_emissions_potential"])
self.lodgement_date : str = self.prepared_epc["lodgement_date"]
self.current_energy_efficiency : int = int(self.prepared_epc["current_energy_efficiency"])
self.energy_consumption_current : int = int(self.prepared_epc["energy_consumption_current"])
self.co2_emissions_current : float = float(self.prepared_epc["co2_emissions_current"])
def _identify_delta_between_prepared_and_original_records(self):
"""
This method will identify the delta between the prepared and original records
"""
prepared_epc_df = self.epc_record_as_dataframe("prepared_epc")
original_epc_df = self.epc_record_as_dataframe("original_epc")
df = pd.concat([prepared_epc_df, original_epc_df], keys=["prepared_epc", "original_epc"], axis=0)
same_index = df.apply(pd.Series.duplicated).any()
self.prepared_epc_delta_metadata = df[same_index[~same_index].index]
def _expand_description_to_features(self):
pass
def _expand_description_to_uvalues(self):
# TODO: can be loop over all the descriptions, or done in one
pass
# def _process_and_prune(self, cleaned_lookup: dict):
# """
# This method will merge on the cleaned lookup table and ensure that the building fabric in the
# starting and ending EPC is consistent, so ensure that we are performing our modelling on the cleanest
# possible dataset.
# """
# for component in ["walls", "floor", "roof", "hotwater", "mainheat", "mainheatcont", "windows", "main-fuel"]:
# if component == "main-fuel":
# component = component.replace("-", "_")
# cleaned_key = "main-fuel" if component == "main-fuel" else f"{component}-description"
# left_on_starting = (
# f"{component}_starting" if component == "main-fuel" else f"{component}_description_starting"
# )
# left_on_ending = (
# f"{component}_ending" if component == "main-fuel" else f"{component}_description_ending"
# )
# self.df2 = self.df.merge(
# pd.DataFrame(cleaned_lookup[cleaned_key]),
# how="left",
# left_on=left_on_starting,
# right_on="original_description",
# ).merge(
# pd.DataFrame(cleaned_lookup[cleaned_key]),
# how="left",
# left_on=left_on_ending,
# right_on="original_description",
# suffixes=("", "_ending")
# )
def _clean_records_using_epc_records(self):
"""
This method will clean the records
"""
# TODO: Move all the cleaning steps in the Property class into there
self._clean_built_form()
self._clean_energy()
self._clean_ventilation()
self._clean_solar_pv()
self._clean_solar_hot_water()
self._clean_wind_turbine()
self._clean_count_variables()
self._clean_heat_loss_corridor()
self._clean_mains_gas()
self._clean_age_band()
self._clean_year_built()
self._clean_floor_area()
self._clean_property_dimensions()
self._clean_number_lighting_outlets()
self._clean_floor_level()
# self._clean_potential_energy_efficiency()
# self._clean_environment_impact_potential()
# self._clean_energy_consumption_potential()
# self._clean_co2_emissions_potential()
# self._clean_current_energy_efficiency()
# self._clean_energy_consumption_current()
# self._clean_co2_emissions_current()
def epc_record_as_dataframe(self, epc_type: str = "prepared_epc", use_upper_columns: bool = True, replace_empty_string: bool = False):
    """
    Return a single-row dataframe representation of one of the epc records.

    :param epc_type: name of the record to fetch via `self.get`
    :param use_upper_columns: upper-case the column names and turn "-" into "_"
    :param replace_empty_string: replace empty strings with NaN
    :return: dataframe with one column per key of the record
    """
    record = self.get(epc_type)
    frame = pd.DataFrame.from_dict(record, orient="index").T
    if use_upper_columns:
        frame = frame.rename(columns=lambda label: label.upper().replace("-", "_"))
    return frame.replace("", np.nan) if replace_empty_string else frame
def _clean_floor_level(self):
    """
    Clean the floor level: anomalous values become None, everything else is
    translated through FLOOR_LEVEL_MAP.

    :raises ValueError: if the record holds no prepared EPC data
    """
    if not self.prepared_epc:
        raise ValueError("EPC Recrod doesn not contain epc data")

    raw_level = self.prepared_epc["floor-level"]
    if raw_level in DATA_ANOMALY_MATCHES:
        self.prepared_epc["floor-level"] = None
    else:
        self.prepared_epc["floor-level"] = FLOOR_LEVEL_MAP[raw_level]
def _clean_number_lighting_outlets(self):
"""
This method will clean the number of lighting outlets, if empty or invalid
"""
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
if self.prepared_epc["fixed-lighting-outlets-count"] == "":
# We check old EPCs and the full SAP EPC
lighting_data = []
if len(self.old_data):
lighting_data.extend([
int(old_record["fixed-lighting-outlets-count"]) for old_record in self.old_data if
old_record["fixed-lighting-outlets-count"] != ""
])
if len(self.full_sap_epc):
if self.full_sap_epc["fixed-lighting-outlets-count"] != "":
lighting_data.append(int(self.full_sap_epc["fixed-lighting-outlets-count"]))
if lighting_data:
self.prepared_epc["fixed-lighting-outlets-count"] = round(np.median(lighting_data))
else:
# Use averages from the cleaning dataset, based on the property type, built form, construction age band and local authority
cleaned_property_data = EPCDataProcessor.apply_averages_cleaning(
data_to_clean=self.epc_record_as_dataframe("prepared_epc", replace_empty_string=True),
cleaning_data=self.cleaning_data,
cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'],
)
self.prepared_epc["fixed-lighting-outlets-count"] = round(cleaned_property_data["FIXED_LIGHTING_OUTLETS_COUNT"].values[0])
else:
self.prepared_epc["fixed-lighting-outlets-count"] = float(self.prepared_epc["fixed-lighting-outlets-count"])
def _filter_property_dimensions(self, property_dimensions):
    """
    Filter the property dimensions dataframe down to the rows relevant for this
    property and average them.

    Rows are always filtered on the property type. The construction age band
    and built form filters are only applied when the value is usable; the built
    form filter additionally requires at least one matching row.

    :param property_dimensions: dataframe with PROPERTY_TYPE, CONSTRUCTION_AGE_BAND,
        BUILT_FORM, NUMBER_HABITABLE_ROOMS, TOTAL_FLOOR_AREA and FLOOR_HEIGHT columns
    :return: Series with the mean NUMBER_HABITABLE_ROOMS, TOTAL_FLOOR_AREA and
        FLOOR_HEIGHT of the remaining rows
    """
    result = property_dimensions[property_dimensions["PROPERTY_TYPE"] == self.prepared_epc["property-type"]]

    if self.construction_age_band is not None and self.construction_age_band not in DATA_ANOMALY_MATCHES:
        result = result[result["CONSTRUCTION_AGE_BAND"] == self.construction_age_band]

    built_form = self.prepared_epc["built-form"]
    if built_form not in DATA_ANOMALY_MATCHES:
        # `value in series` tests the index labels, not the values, so the
        # membership check has to be done on the comparison mask instead.
        built_form_rows = result["BUILT_FORM"] == built_form
        if built_form_rows.any():
            result = result[built_form_rows]

    return result[["NUMBER_HABITABLE_ROOMS", "TOTAL_FLOOR_AREA", "FLOOR_HEIGHT"]].mean()
def _clean_property_dimensions(self):
    """
    Clean up the number of habitable rooms, the number of floors and the floor
    height. Missing rooms/height are filled from the averaged property
    dimensions of the local authority, which are only loaded when needed.

    :raises ValueError: if the record holds no prepared EPC data
    :raises NotImplementedError: for a property type without a floor count rule
    """
    if not self.prepared_epc:
        raise ValueError("EPC Recrod doesn not contain epc data")

    epc = self.prepared_epc

    def floor_height_missing():
        return epc["floor-height"] == "" or epc["floor-height"] in DATA_ANOMALY_MATCHES

    if not epc["number-habitable-rooms"] or floor_height_missing():
        dimensions_key = f"property_dimensions/{epc['local-authority']}.parquet"
        all_dimensions = read_dataframe_from_s3_parquet(bucket_name=DATA_BUCKET, file_key=dimensions_key)
        self.property_dimensions = self._filter_property_dimensions(all_dimensions)

    if epc["number-habitable-rooms"]:
        epc["number-habitable-rooms"] = float(epc["number-habitable-rooms"])
    else:
        epc["number-habitable-rooms"] = float(self.property_dimensions["NUMBER_HABITABLE_ROOMS"].round())

    # Floor count is assumed from the property type
    floors_by_property_type = {"House": 2, "Flat": 1, "Bungalow": 1, "Maisonette": 2}
    if epc["property-type"] not in floors_by_property_type:
        raise NotImplementedError("Implement me")
    self.number_of_floors = floors_by_property_type[epc["property-type"]]

    if floor_height_missing():
        epc["floor-height"] = float(self.property_dimensions["FLOOR_HEIGHT"].round(2))
    else:
        epc["floor-height"] = float(epc["floor-height"])
def _clean_floor_area(self):
"""
This method will clean the floor area, if empty or invalid
"""
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
self.prepared_epc["total-floor-area"] = float(self.prepared_epc["total-floor-area"])
def _clean_mains_gas(self):
    """
    Clean the mains gas flag: "Y"/"N" become True/False, empty or anomalous
    values become None.

    :raises ValueError: if the record holds no prepared EPC data
    """
    if not self.prepared_epc:
        raise ValueError("EPC Recrod doesn not contain epc data")

    flag_lookup = {
        "Y": True,
        "N": False,
    }
    raw_flag = self.prepared_epc["mains-gas-flag"]
    if raw_flag == "" or raw_flag in DATA_ANOMALY_MATCHES:
        self.prepared_epc["mains-gas-flag"] = None
    else:
        self.prepared_epc["mains-gas-flag"] = flag_lookup[raw_flag]
def _clean_heat_loss_corridor(self):
    """
    Clean the heat loss corridor fields: the corridor description becomes a
    boolean (True only for an unheated corridor, False for anomalous values)
    and the unheated corridor length becomes a float, or None when empty.

    :raises ValueError: if the record holds no prepared EPC data
    """
    if not self.prepared_epc:
        raise ValueError("EPC Recrod doesn not contain epc data")

    epc = self.prepared_epc
    loses_heat = {
        "no corridor": False,
        "unheated corridor": True,
        "heated corridor": False
    }

    corridor = epc["heat-loss-corridor"]
    if corridor in DATA_ANOMALY_MATCHES:
        epc["heat-loss-corridor"] = False
    else:
        epc["heat-loss-corridor"] = loses_heat[corridor]

    corridor_length = epc["unheated-corridor-length"]
    epc["unheated-corridor-length"] = None if corridor_length == "" else float(corridor_length)
def _clean_count_variables(self):
    """
    Clean the count variables: each EPC count field is converted to an int and
    stored in the prepared EPC under a new attribute-style key. Empty or
    anomalous values fall back to 0, or to None for storeys and rooms.

    :raises ValueError: if the record holds no prepared EPC data
    """
    if not self.prepared_epc:
        raise ValueError("EPC Recrod doesn not contain epc data")

    epc = self.prepared_epc
    # (new key, EPC field, value used when the EPC field is empty/anomalous)
    count_fields = (
        ("number_of_open_fireplaces", "number-open-fireplaces", 0),
        ("number_of_extensions", "extension-count", 0),
        ("number_of_storeys", "flat-storey-count", None),
        ("number_of_rooms", "number-habitable-rooms", None),
    )
    for attribute, epc_field, fallback in count_fields:
        # TODO: check this
        raw_count = epc[epc_field]
        is_missing = raw_count == "" or raw_count in DATA_ANOMALY_MATCHES
        epc[attribute] = fallback if is_missing else int(raw_count)
def _clean_wind_turbine(self):
"""
This method will clean the wind turbine, if empty or invalid
"""
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
self.prepared_epc['wind-turbine-count'] = int(self.prepared_epc['wind-turbine-count']) if self.prepared_epc['wind-turbine-count'] != "" else None
def _clean_solar_hot_water(self):
"""
This method will clean the solar hot water, if empty or invalid
"""
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
value_map = {
"Y": True,
"N": False,
"": None,
}
self.prepared_epc['solar-water-heating-flag'] = value_map[self.prepared_epc['solar-water-heating-flag']]
def _clean_solar_pv(self):
"""
This method will clean the solar pv, if empty or invalid
"""
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
self.prepared_epc['photo-supply'] = float(self.prepared_epc['photo-supply']) if self.prepared_epc['photo-supply'] != "" else None
def _clean_energy(self):
"""
This method will clean the energy, if empty or invalid
"""
if not self.prepared_epc:
raise ValueError("EPC Recrod doesn not contain epc data")
self.prepared_epc['energy-consumption-current'] = float(self.prepared_epc["energy-consumption-current"])
self.prepared_epc['co2-emissions-current'] = float(self.prepared_epc["co2-emissions-current"])
def _clean_built_form(self):
    """
    Clean the built form: apply BUILT_FORM_REMAP (unmapped values are kept) and
    give flats with an anomalous built form the value "Semi-Detached".

    :raises ValueError: if the record holds no prepared EPC data
    """
    if not self.prepared_epc:
        raise ValueError("EPC Recrod doesn not contain epc data")

    epc = self.prepared_epc
    remapped = BUILT_FORM_REMAP.get(epc["built-form"], epc["built-form"])
    epc['built-form'] = remapped
    if remapped in DATA_ANOMALY_MATCHES and epc["property-type"] == "Flat":
        epc["built-form"] = "Semi-Detached"
def _clean_age_band(self):
    """
    Clean the construction age band and derive the age band from it.

    An anomalous construction age band is replaced by the one of the most
    recently lodged old EPC that has a usable value (if any). New dwellings
    without an age band default to band "L" / 'England and Wales: 2012 onwards'.

    :raises ValueError: if the record holds no prepared EPC data, or if no age
        band could be determined
    """
    if not self.prepared_epc:
        raise ValueError("EPC Recrod doesn not contain epc data")

    self.construction_age_band = EPCDataProcessor.clean_construction_age_band(
        self.prepared_epc["construction-age-band"]
    )

    if self.construction_age_band in DATA_ANOMALY_MATCHES and self.old_data:
        # Only old records with a usable age band are candidates. When there is
        # none we fall through to the checks below (max() on an empty list used
        # to raise here).
        usable_records = [
            old_record for old_record in self.old_data
            if old_record["construction-age-band"] not in DATA_ANOMALY_MATCHES
        ]
        if usable_records:
            # Take the most recent
            most_recent = max(usable_records, key=lambda old_record: old_record["lodgement-datetime"])
            self.construction_age_band = EPCDataProcessor.clean_construction_age_band(
                most_recent["construction-age-band"]
            )

    self.age_band = england_wales_age_band_lookup.get(self.construction_age_band)

    if (self.prepared_epc["transaction-type"] == "new dwelling") and (self.age_band is None):
        self.age_band = "L"
        self.construction_age_band = 'England and Wales: 2012 onwards'

    if self.age_band is None:
        raise ValueError("age_band is missing")
def _clean_year_built(self):
    """
    Determine the year the property was built.

    The lodgement year of the full SAP EPC is used when one exists. Otherwise
    the first four-digit year found in the construction age band is used, and
    `year_built` is None when no year can be determined.
    """
    if self.full_sap_epc:
        self.year_built = datetime.strptime(self.full_sap_epc["lodgement-date"], '%Y-%m-%d').year
        return

    if self.construction_age_band not in DATA_ANOMALY_MATCHES:
        # Take the lower limit. If we're pessimistic about the age of the property, that at least means we have
        # more options for recommendations if that age falls before the year that insulation in walls became
        # common practice.
        # The raw EPC field is tried first; the cleaned band is the fallback
        # for the case where the raw field holds no year (e.g. the band was
        # recovered from an older EPC), which used to end in an IndexError.
        for band_text in (self.prepared_epc["construction-age-band"], self.construction_age_band):
            if not isinstance(band_text, str):
                continue
            years = re.findall(r'\b\d{4}\b', band_text)
            if years:
                self.year_built = int(years[0])
                return

    # We don't know when the property was built
    self.year_built = None
def _clean_ventilation(self):
    """
    Clean the mechanical ventilation field of the prepared EPC: an empty,
    missing or anomalous value becomes None, anything else is kept.

    :raises ValueError: if the record holds no prepared EPC data
    """
    if not self.prepared_epc:
        raise ValueError("EPC Recrod doesn not contain epc data")

    # Read the value from the EPC itself. The `mechanical_ventilation`
    # attribute is only filled in later by _expand_prepared_epc_to_attributes,
    # so using it here overwrote the EPC value with the attribute's default.
    ventilation = self.prepared_epc.get('mechanical-ventilation')
    if ventilation == "" or ventilation in DATA_ANOMALY_MATCHES:
        ventilation = None
    self.prepared_epc['mechanical-ventilation'] = ventilation
def _field_validation(self):
"""
This method will validate each of the fields in the EPC record
"""
for record_key, validation_config in self.validation_configuration.items():
# Get the variable named record key from self
field_value = self.__dict__[record_key]
if validation_config['type'] == "string":
self._validate_string(record_key, field_value, validation_config)
elif validation_config['type'] == "float":
self._validate_float(record_key, field_value, validation_config)
else:
raise ValueError(f"Validation type {validation_config['type']} not supported")
def _validate_string(self, record_key: str, field_value: Union[str, float], validation_config: dict):
"""
Validate a string field
"""
if not isinstance(field_value, str):
raise ValueError(f"Field {record_key} has value {field_value} which is not a string")
if 'function' in validation_config:
try:
validation_config['function'](field_value)
except:
raise ValueError(f"Field {record_key} has value {field_value} which does not pass the validation function {validation_config['function']}")
if validation_config['acceptable_values'] is not None:
if field_value not in validation_config['acceptable_values']:
raise ValueError(f"Field {record_key} has value {field_value} which is not in the acceptable values of {validation_config['acceptable_values']}")
def _validate_float(self, record_key: str, field_value: Union[str, float], validation_config: dict):
"""
Validate a float field
"""
if not isinstance(field_value, float):
raise ValueError(f"Field {record_key} has value {field_value} which is not a float")
if 'function' in validation_config:
try:
validation_config['function'](field_value)
except:
raise ValueError(f"Field {record_key} has value {field_value} which does not pass the validation function {validation_config['function']}")
if validation_config['range'] is not None:
if field_value < validation_config['range'][0] or field_value > validation_config['range'][1]:
raise ValueError(f"Field {record_key} has value {field_value} which is not in the acceptable range of {validation_config['range']}")
def __sub__(self, other):
    """
    Build the difference between this EPC record and another one.

    :param other: the record to pair with this one; must be an ``EPCRecord``
    :return: an ``EPCDifferenceRecord`` built with ``record1=self``, ``record2=other`` and ``auto_sort=True``
    :raises ValueError: if ``other`` is not an ``EPCRecord``
    """
    if isinstance(other, EPCRecord):
        return EPCDifferenceRecord(record1=self, record2=other, auto_sort=True)
    raise ValueError("Can only subtract EPCRecord from EPCRecord")
def __gt__(self, other):
    """
    Return True if this record's ``RDSAP_RESPONSE`` value is strictly greater than the other record's.

    :raises ValueError: if ``other`` is not an ``EPCRecord``
    """
    if isinstance(other, EPCRecord):
        own_score = self.__dict__[RDSAP_RESPONSE]
        other_score = other.__dict__[RDSAP_RESPONSE]
        return own_score > other_score
    raise ValueError("Can only compare EPCRecord to EPCRecord")
def __ge__(self, other):
    """
    Return True if this record's ``RDSAP_RESPONSE`` value is greater than or equal to the other record's.

    :raises ValueError: if ``other`` is not an ``EPCRecord``
    """
    if isinstance(other, EPCRecord):
        own_score = self.__dict__[RDSAP_RESPONSE]
        other_score = other.__dict__[RDSAP_RESPONSE]
        return own_score >= other_score
    raise ValueError("Can only compare EPCRecord to EPCRecord")
def __lt__(self, other):
    """
    Return True if this record's ``RDSAP_RESPONSE`` value is strictly less than the other record's.

    :raises ValueError: if ``other`` is not an ``EPCRecord``
    """
    if isinstance(other, EPCRecord):
        own_score = self.__dict__[RDSAP_RESPONSE]
        other_score = other.__dict__[RDSAP_RESPONSE]
        return own_score < other_score
    raise ValueError("Can only compare EPCRecord to EPCRecord")
def __le__(self, other):
    """
    Return True if this record's ``RDSAP_RESPONSE`` value is less than or equal to the other record's.

    :raises ValueError: if ``other`` is not an ``EPCRecord``
    """
    if isinstance(other, EPCRecord):
        own_score = self.__dict__[RDSAP_RESPONSE]
        other_score = other.__dict__[RDSAP_RESPONSE]
        return own_score <= other_score
    raise ValueError("Can only compare EPCRecord to EPCRecord")
def get(self, key: Union[str, List[str]], return_asdict: bool = False, key_suffix: str | None = None) -> Any:
"""
This method will return the value of the key
"""
if return_asdict:
output_dict = {x: self.__dict__[x] if x in self.__dict__.keys() else None for x in key}
if key_suffix is not None:
output_dict = {f"{x}{key_suffix}": y for x, y in output_dict.items()}
return output_dict
if isinstance(key, list):
return [self.__dict__[x] if x in self.__dict__.keys() else None for x in key]
elif isinstance(key, str):
return self.__dict__[key] if key in self.__dict__.keys() else None
class EPCDifferenceRecord:
    """
    Pairs two EPC records and holds a flat dictionary describing the change between them.

    ``record1`` is used as the starting record and ``record2`` as the ending record.
    """

    def __init__(self, record1: EPCRecord, record2: EPCRecord, auto_sort: bool = False):
        """
        Store both records, build the difference record and run its validation hook.

        :param record1: starting record
        :param record2: ending record; the default usage expects it to have the higher RDSAP score
        :param auto_sort: when True, the two records are swapped if ``record2 <= record1``
        """
        self.record1 = record1
        self.record2 = record2

        # Chosen on lodgement date before any score-based swap; record2 is used when the dates are not
        # strictly ordered
        if record1.lodgement_date < record2.lodgement_date:
            self.earliest_record = record1
        else:
            self.earliest_record = record2

        self.flag_fabric_consistency = False
        self.difference_record = {}
        self.difference_validation_configuration = EPCDifferenceRecordValidationConfiguration
        self.fixed_data_validation_configuration = EPCDifferenceRecordFixedDataValidationConfiguration

        needs_swap = auto_sort and (self.record2 <= self.record1)
        if needs_swap:
            self.record1, self.record2 = self.record2, self.record1

        self._construct_difference_record()
        self._validate_difference_record()
        # self._detect_fabric_consistency()

    def _construct_difference_record(self):
        """
        Populate ``self.difference_record`` from the starting (``record1``) and ending (``record2``) records.

        The record holds the uprn, the change in each response (ending minus starting), the starting and ending
        response values, the potential figures of the earliest record, and every component/efficiency feature
        plus the lodgement date, suffixed with ``_ending`` and ``_starting``.
        """
        start, end = self.record1, self.record2

        rdsap_change = end.get(RDSAP_RESPONSE) - start.get(RDSAP_RESPONSE)
        heat_demand_change = end.get(HEAT_DEMAND_RESPONSE) - start.get(HEAT_DEMAND_RESPONSE)
        carbon_change = end.get(CARBON_RESPONSE) - start.get(CARBON_RESPONSE)

        tracked_fields = COMPONENT_FEATURES + EFFICIENCY_FEATURES + ["lodgement_date"]
        ending_record = end.get(tracked_fields, return_asdict=True, key_suffix="_ending")
        starting_record = start.get(tracked_fields, return_asdict=True, key_suffix="_starting")

        earliest = self.earliest_record
        self.difference_record = {
            "uprn": start.get("uprn"),
            "rdsap_change": rdsap_change,
            "heat_demand_change": heat_demand_change,
            "carbon_change": carbon_change,
            "sap_starting": start.get(RDSAP_RESPONSE),
            "sap_ending": end.get(RDSAP_RESPONSE),
            "heat_demand_starting": start.get(HEAT_DEMAND_RESPONSE),
            "heat_demand_ending": end.get(HEAT_DEMAND_RESPONSE),
            "carbon_starting": start.get(CARBON_RESPONSE),
            "carbon_ending": end.get(CARBON_RESPONSE),
            "potential_energy_efficiency": earliest.get("potential_energy_efficiency"),
            "environment_impact_potential": earliest.get("environment_impact_potential"),
            "energy_consumption_potential": earliest.get("energy_consumption_potential"),
            "co2_emissions_potential": earliest.get("co2_emissions_potential"),
            **ending_record,
            **starting_record,
        }

    def _validate_difference_record(self):
        """
        Validation hook for the difference record; currently performs no checks.
        """
        # Disabled draft, kept for reference:
        # for key, value in self.difference_record.items():
        #     if key == "LODGEMENT_DATE":
        #         continue
        #     if isinstance(value, str):
        #         continue
        #     if value < 0:
        #         raise ValueError(f"Difference record has negative value for {key}")
        return None

    def compare_fields_in_records(self, fields: List[str]):
        """
        Report whether the two records agree on every one of the given fields.

        :param fields: names of the fields to compare
        :return: False as soon as one field differs between the records, True otherwise
        """
        return not any(self.record1.get(name) != self.record2.get(name) for name in fields)

    def get(self, key: str):
        """
        Return the difference-record value stored under ``key``, or None when the key is absent.
        """
        return self.difference_record.get(key)

    def append_fixed_data(self, fixed_data: dict):
        """
        Validate the fixed data and merge it into the difference record, overwriting existing keys.
        """
        self._validate_fixed_data(fixed_data)
        self.difference_record.update(fixed_data)

    def _validate_fixed_data(self, fixed_data: dict):
        """
        Validation hook for fixed data; currently performs no checks.
        """
        # Can have more sophisticated checks here
        # self.fixed_data_validation_configuration
        return None

View file

@ -0,0 +1,61 @@
"""
Specify the validation rules for each field in the EPC record and the difference record.
"""
def validate_walls_description(value):
    """
    Check that a walls description is one of the recognised categories.

    :param value: the walls description to check
    :raises ValueError: if the value is not a recognised walls description
    """
    recognised = ("Cavity", "Solid", "System built", "Timber frame", "Suspended timber", "Other")
    if value in recognised:
        return
    raise ValueError("Walls description is not valid")
# Validation rules for individual EPC record fields, keyed by field name.
# Each entry declares a "type" ("string" or "float" here) plus type-specific rules:
#   - "acceptable_values": the allowed values for a string field
#   - "function": an optional callable that raises when the value is invalid
#   - "range": the [lower, upper] bounds for a float field
EPCRecordValidationConfiguration = {
"WALLS_DESCRIPTION": {
"type": "string",
"acceptable_values": ["Cavity", "Solid", "System built", "Timber frame", "Suspended timber", "Other"],
# Enforces the same list as "acceptable_values" above
"function": validate_walls_description
},
"FLOOR_DESCRIPTION": {
"type": "string",
"acceptable_values": ["Solid", "Suspended", "Other"]
},
"ENERGY_CONSUMPTION_CURRENT": {
"type": "float",
# NOTE(review): confirm that 0-100 is the intended range for this field
"range": [0, 100]
}
}
# Validation rules for the computed difference record; none are defined yet.
EPCDifferenceRecordValidationConfiguration = {
}

# Validation rules for the fixed (non-changing) property data appended to a difference record, keyed by
# field name.
# NOTE(review): the "integer" type used below has no validator visible alongside the "string" and "float"
# ones; confirm before wiring this configuration into validation.
EPCDifferenceRecordFixedDataValidationConfiguration = {
    "PROPERTY_TYPE": {
        "type": "string",
        "acceptable_values": ["House", "Flat", "Bungalow", "Maisonette", "Park home", "Other"]
    },
    "BUILT_FORM": {
        "type": "string",
        "acceptable_values": ["Detached", "Semi-Detached", "End-Terrace", "Mid-Terrace", "Enclosed Mid-Terrace", "Enclosed End-Terrace", "Enclosed Detached", "Not applicable"]
    },
    # Key corrected from the misspelt "CONSITUENCY" so that it matches the CONSTITUENCY feature name
    "CONSTITUENCY": {
        "type": "string",
        "acceptable_values": ["England", "Wales", "Scotland", "Northern Ireland"]
    },
    "NUMBER_HABITABLE_ROOMS": {
        "type": "integer",
        "range": [0, 100]
    },
    "NUMBER_HEATED_ROOMS": {
        "type": "integer",
        "range": [0, 100]
    },
    "FIXED_LIGHTING_OUTLETS_COUNT": {
        "type": "integer",
        "range": [0, 100]
    },
    "CONSTRUCTION_AGE_BAND": {
        "type": "string",
        # NOTE(review): an empty list accepts no value at all if this rule is ever enforced
        "acceptable_values": []
    }
}
# Placeholder for dataset-level validation rules; none are defined yet.
DatasetValidationConfiguration = {
}

View file

@ -1,635 +1,27 @@
import pandas as pd
import numpy as np
from tqdm import tqdm
import msgpack
from pathlib import Path
from etl.epc.settings import (
MANDATORY_FIXED_FEATURES,
LATEST_FIELD,
COMPONENT_FEATURES,
RDSAP_RESPONSE,
HEAT_DEMAND_RESPONSE,
COLUMNS_TO_MERGE_ON,
CARBON_RESPONSE,
CORE_COMPONENT_FEATURES,
EFFICIENCY_FEATURES,
POTENTIAL_COLUMNS,
MINIMUM_FLOOR_HEIGHT
)
from etl.epc.DataProcessor import DataProcessor
from utils.s3 import save_dataframe_to_s3_parquet, read_from_s3
from recommendations.rdsap_tables import england_wales_age_band_lookup
from recommendations.recommendation_utils import (
get_wall_u_value, get_roof_u_value, get_floor_u_value, estimate_perimeter,
get_wall_type
)
from etl.epc.DataProcessor import EPCDataProcessor
from etl.epc.Pipeline import EPCPipeline
DATA_DIRECTORY = Path(__file__).parent / "local_data" / "all-domestic-certificates"
def get_cleaned():
def main():
"""
This function will retrieve the cleaned dataset from s3 which has the cleaned
descriptions for the epc dataset
This data is stored in MessagePack format and therefore needs to be decoded
:return:
Orchestration function
"""
cleaned = read_from_s3(
s3_file_name="cleaned_epc_data/cleaned.bson",
bucket_name="retrofit-data-dev"
)
cleaned = msgpack.unpackb(cleaned, raw=False)
return cleaned
def process_and_prune_desriptions(df, cleaned_lookup):
"""
This method will merge on the cleaned lookup table and ensure that the building fabric in the
starting and ending EPC is consistent, to ensure that we are performing our modelling on the cleanest
possible dataset.
For each component the cleaned lookup is merged on twice (once against the starting description and once
against the ending description), rows whose wall/floor/roof construction flags differ between the two are
removed, and the raw description columns are dropped.
:param df: DataFrame holding the *_STARTING / *_ENDING description columns for every component
:param cleaned_lookup: mapping of component key (e.g. "walls-description", "main-fuel") to a list of
cleaned records, each carrying an "original_description" to join on
:return: the merged and filtered DataFrame
"""
# Columns removed after each component has been merged and checked. Unsuffixed names come from the merge
# against the starting description, *_ENDING names from the merge against the ending description.
cols_to_drop = {
"walls": [
# We need the cleaned descriptions for pulling out u-values
'original_description', 'thermal_transmittance_unit',
'original_description_ENDING',
'thermal_transmittance_unit_ENDING',
'is_cavity_wall_ENDING', 'is_filled_cavity_ENDING',
'is_solid_brick_ENDING', 'is_system_built_ENDING',
'is_timber_frame_ENDING', 'is_granite_or_whinstone_ENDING',
'is_as_built_ENDING', 'is_cob_ENDING', 'is_assumed_ENDING',
'is_sandstone_or_limestone_ENDING',
# We remove the is_assumed columns (is_assumed_ENDING is also listed above)
"is_assumed", "is_assumed_ENDING"
],
"floor": [
"original_description", "clean_description", "thermal_transmittance_unit",
"no_data", "no_data_ENDING", "original_description_ENDING",
"clean_description_ENDING", "thermal_transmittance_unit_ENDING",
"is_suspended_ENDING", "is_solid_ENDING", "another_property_below_ENDING",
"is_to_unheated_space_ENDING", "is_to_external_air_ENDING", "is_assumed",
"is_assumed_ENDING"
],
"roof": [
"original_description", "clean_description", "thermal_transmittance_unit",
"is_assumed", "is_valid", "original_description_ENDING", "clean_description_ENDING",
"thermal_transmittance_unit_ENDING", "is_pitched_ENDING", "is_roof_room_ENDING",
"is_loft_ENDING", "is_flat_ENDING", "is_thatched_ENDING", "is_at_rafters_ENDING",
"has_dwelling_above_ENDING", "is_assumed_ENDING", "is_valid_ENDING"
],
"hotwater": [
"original_description", "clean_description", "assumed", "original_description_ENDING",
"clean_description_ENDING", "assumed_ENDING"
],
"mainheat": [
# NOTE: original_description_ENDING appears twice in this list
"original_description", "clean_description", "original_description_ENDING",
"has_assumed", "original_description_ENDING", "clean_description_ENDING",
"has_assumed_ENDING",
],
"mainheatcont": [
"original_description", "clean_description", "original_description_ENDING", "clean_description_ENDING"
],
"windows": [
"original_description", "clean_description", "original_description_ENDING", "clean_description_ENDING",
# We don't need many of the glazing coverage features because we have the multi_glaze_proportion feature
"has_glazing", "glazing_coverage", "no_data", "has_glazing_ENDING", "glazing_coverage_ENDING",
"no_data_ENDING"
],
"main-fuel": [
"original_description", "clean_description", "original_description_ENDING", "clean_description_ENDING"
],
}
for component in ["walls", "floor", "roof", "hotwater", "mainheat", "mainheatcont", "windows", "main-fuel"]:
# "main-fuel" is the odd one out: its columns are MAIN_FUEL_STARTING / MAIN_FUEL_ENDING (no _DESCRIPTION
# part) and its lookup key is "main-fuel" rather than "<component>-description"
component_upper = component.upper()
if component == "main-fuel":
component_upper = component_upper.replace("-", "_")
cleaned_key = "main-fuel" if component == "main-fuel" else f"{component}-description"
left_on_starting = (
f"{component_upper}_STARTING" if component == "main-fuel" else f"{component_upper}_DESCRIPTION_STARTING"
)
left_on_ending = (
f"{component_upper}_ENDING" if component == "main-fuel" else f"{component_upper}_DESCRIPTION_ENDING"
)
# First merge attaches the cleaned attributes of the starting description under their plain names; the
# second merge attaches those of the ending description, whose clashing names get the "_ENDING" suffix
df = df.merge(
pd.DataFrame(cleaned_lookup[cleaned_key]),
how="left",
left_on=left_on_starting,
right_on="original_description",
).merge(
pd.DataFrame(cleaned_lookup[cleaned_key]),
how="left",
left_on=left_on_ending,
right_on="original_description",
suffixes=("", "_ENDING")
)
if component == "walls":
# We make sure the wall construction hasn't changed
df = df[
(df["is_cavity_wall"] == df["is_cavity_wall_ENDING"]) &
(df["is_solid_brick"] == df["is_solid_brick_ENDING"]) &
(df["is_timber_frame"] == df["is_timber_frame_ENDING"]) &
(df["is_granite_or_whinstone"] == df["is_granite_or_whinstone_ENDING"]) &
(df["is_cob"] == df["is_cob_ENDING"]) &
(df["is_sandstone_or_limestone"] == df["is_sandstone_or_limestone_ENDING"])
]
elif component == "floor":
# Same consistency check for the floor construction
df = df[
(df["is_suspended"] == df["is_suspended_ENDING"]) &
(df["is_solid"] == df["is_solid_ENDING"]) &
(df["another_property_below"] == df["another_property_below_ENDING"]) &
(df["is_to_unheated_space"] == df["is_to_unheated_space_ENDING"]) &
(df["is_to_external_air"] == df["is_to_external_air_ENDING"])
]
elif component == "roof":
# Same consistency check for the roof construction
df = df[
(df["is_pitched"] == df["is_pitched_ENDING"]) &
(df["is_roof_room"] == df["is_roof_room_ENDING"]) &
(df["is_loft"] == df["is_loft_ENDING"]) &
(df["is_flat"] == df["is_flat_ENDING"]) &
(df["is_thatched"] == df["is_thatched_ENDING"]) &
(df["is_at_rafters"] == df["is_at_rafters_ENDING"]) &
(df["has_dwelling_above"] == df["has_dwelling_above_ENDING"])
]
# Drop the binary indicators and replace the original description with the cleaned version
# Drop original cols
original_cols = [
f"{component_upper}_DESCRIPTION_STARTING", f"{component_upper}_DESCRIPTION_ENDING"
] if component != "main-fuel" else [
f"{component_upper}_STARTING", f"{component_upper}_ENDING"
]
# Generic names such as original_description must be gone before the next component is merged on
df = df.drop(columns=cols_to_drop[component] + original_cols)
# The renames below prefix generic lookup columns with the component name. The first lookup record is
# inspected to see which columns this component provides.
# If we have an insulation_thickness column, rename it
if "insulation_thickness" in cleaned_lookup[cleaned_key][0]:
df = df.rename(
columns={
"insulation_thickness": f"{component}_insulation_thickness",
"insulation_thickness_ENDING": f"{component}_insulation_thickness_ENDING",
}
)
# If we have thermal transmittance, rename it
if "thermal_transmittance" in cleaned_lookup[cleaned_key][0]:
df = df.rename(
columns={
"thermal_transmittance": f"{component}_thermal_transmittance",
"thermal_transmittance_ENDING": f"{component}_thermal_transmittance_ENDING",
}
)
# If we have tariff, rename it
if "tariff_type" in cleaned_lookup[cleaned_key][0]:
df = df.rename(
columns={
"tariff_type": f"{component}_tariff_type",
"tariff_type_ENDING": f"{component}_tariff_type_ENDING",
}
)
# We need the walls descriptions so we rename them to distinguish them
if component == "walls":
df = df.rename(
columns={
"clean_description": f"{component}_clean_description",
"clean_description_ENDING": f"{component}_clean_description_ENDING",
}
)
# We don't need any lighting specific cleaning, we just drop the original description as we use
# LOW_ENERGY_LIGHTING_STARTING, LOW_ENERGY_LIGHTING_ENDING
df = df.drop(columns=["LIGHTING_DESCRIPTION_STARTING", "LIGHTING_DESCRIPTION_ENDING"])
return df
def make_uvalues(df):
# Fill in missing wall, roof and floor thermal transmittance (u-value) columns, for both the starting
# (unsuffixed) and ending (_ENDING) EPC, using the recommendation helpers, and add the estimated perimeters.
# A u-value already present in the data is kept; only nulls are replaced.
# Returns the merged DataFrame. NOTE: a "row_index" column is added to the DataFrame passed in.
df["row_index"] = df.index
uvalues = []
for _, x in df.iterrows():
uprn = x["UPRN"]
row_index = x["row_index"]
# Raises KeyError for a construction age band that is not in the lookup
age_band = england_wales_age_band_lookup[x["CONSTRUCTION_AGE_BAND"]]
# ~~~~~~~~~~~~~~~~~~
# Walls
# ~~~~~~~~~~~~~~~~~~
starting_wall_uvalue = x["walls_thermal_transmittance"]
if pd.isnull(starting_wall_uvalue):
starting_wall_uvalue = get_wall_u_value(
clean_description=x["walls_clean_description"],
age_band=age_band,
is_granite_or_whinstone=x["is_granite_or_whinstone"],
is_sandstone_or_limestone=x["is_sandstone_or_limestone"],
)
ending_wall_uvalue = x["walls_thermal_transmittance_ENDING"]
if pd.isnull(ending_wall_uvalue):
# Only look the ending value up again when the description changed; otherwise reuse the starting value
if x["walls_clean_description"] != x["walls_clean_description_ENDING"]:
ending_wall_uvalue = get_wall_u_value(
clean_description=x["walls_clean_description_ENDING"],
age_band=age_band,
is_granite_or_whinstone=x["is_granite_or_whinstone"],
is_sandstone_or_limestone=x["is_sandstone_or_limestone"],
)
else:
ending_wall_uvalue = starting_wall_uvalue
# ~~~~~~~~~~~~~~~~~~
# Roof
# ~~~~~~~~~~~~~~~~~~
# With a dwelling above, both recorded roof u-values must already be 0 (a null value also fails this check)
if x["has_dwelling_above"]:
if x["roof_thermal_transmittance"] != 0:
raise ValueError("Should have 0 u-value for roof")
if x["roof_thermal_transmittance_ENDING"] != 0:
raise ValueError("Should have 0 u-value for roof")
starting_roof_uvalue = x["roof_thermal_transmittance"]
if pd.isnull(starting_roof_uvalue):
starting_roof_uvalue = get_roof_u_value(
insulation_thickness=x["roof_insulation_thickness"],
has_dwelling_above=x["has_dwelling_above"],
is_loft=x["is_loft"],
is_roof_room=x["is_roof_room"],
is_thatched=x["is_thatched"],
is_flat=x["is_flat"],
is_pitched=x["is_pitched"],
is_at_rafters=x["is_at_rafters"],
age_band=age_band
)
ending_roof_uvalue = x["roof_thermal_transmittance_ENDING"]
if pd.isnull(ending_roof_uvalue):
# Only the insulation thickness is taken from the ending EPC; the roof type flags are the starting ones
ending_roof_uvalue = get_roof_u_value(
insulation_thickness=x["roof_insulation_thickness_ENDING"],
has_dwelling_above=x["has_dwelling_above"],
is_loft=x["is_loft"],
is_roof_room=x["is_roof_room"],
is_thatched=x["is_thatched"],
is_flat=x["is_flat"],
is_pitched=x["is_pitched"],
is_at_rafters=x["is_at_rafters"],
age_band=age_band
)
# ~~~~~~~~~~~~~~~~~~
# Floor
# ~~~~~~~~~~~~~~~~~~
# One estimated perimeter per EPC, keyed estimated_perimeter_STARTING / estimated_perimeter_ENDING
perimeters = {}
for suffix in ["_STARTING", "_ENDING"]:
floor_area = x[f"TOTAL_FLOOR_AREA{suffix}"]
n_rooms = x["NUMBER_HABITABLE_ROOMS"]
perimeters[f"estimated_perimeter{suffix}"] = estimate_perimeter(floor_area, n_rooms)
floor_type = "suspended" if x["is_suspended"] else "solid"
# Every column of the row is passed to get_wall_type as a keyword argument
wall_type = get_wall_type(**x)
if x["another_property_below"]:
if x["floor_thermal_transmittance"] != 0:
raise ValueError("Should have 0 u-value for floor")
if x["floor_thermal_transmittance_ENDING"] != 0:
raise ValueError("Should have 0 u-value for floor")
starting_floor_uvalue, ending_floor_uvalue = 0, 0
else:
starting_floor_uvalue = x["floor_thermal_transmittance"]
ending_floor_uvalue = x["floor_thermal_transmittance_ENDING"]
if pd.isnull(starting_floor_uvalue):
starting_floor_uvalue = get_floor_u_value(
floor_type=floor_type,
perimeter=perimeters["estimated_perimeter_STARTING"],
area=x[f"TOTAL_FLOOR_AREA_STARTING"],
insulation_thickness=x["floor_insulation_thickness"],
wall_type=wall_type,
age_band=age_band
)
if pd.isnull(ending_floor_uvalue):
ending_floor_uvalue = get_floor_u_value(
floor_type=floor_type,
perimeter=perimeters["estimated_perimeter_ENDING"],
area=x[f"TOTAL_FLOOR_AREA_ENDING"],
insulation_thickness=x["floor_insulation_thickness_ENDING"],
wall_type=wall_type,
age_band=age_band
)
uvalues.append(
{
"UPRN": uprn,
"row_index": row_index,
"starting_walls_uvalue": starting_wall_uvalue,
"ending_walls_uvalue": ending_wall_uvalue,
"starting_roof_uvalue": starting_roof_uvalue,
"ending_roof_uvalue": ending_roof_uvalue,
"starting_floor_uvalue": starting_floor_uvalue,
"ending_floor_uvalue": ending_floor_uvalue,
**perimeters
}
)
uvalues = pd.DataFrame(uvalues)
# Join the per-row results back on (UPRN, row_index); row_index is only needed for this join
df = df.merge(
uvalues, how="left", on=["UPRN", "row_index"]
).drop(columns="row_index")
# Fill missings
for component in ["walls", "floor", "roof"]:
for suffix in ["", "_ENDING"]:
fill_col = f"starting_{component}_uvalue" if suffix == "" else f"ending_{component}_uvalue"
df[f"{component}_thermal_transmittance{suffix}"] = np.where(
pd.isnull(df[f"{component}_thermal_transmittance{suffix}"]),
df[fill_col],
df[f"{component}_thermal_transmittance{suffix}"]
)
# The helper columns have been folded into the thermal transmittance columns; the perimeter columns are kept
df = df.drop(
columns=[
"starting_walls_uvalue", "ending_walls_uvalue", "starting_roof_uvalue",
"ending_roof_uvalue", "starting_floor_uvalue", "ending_floor_uvalue"
]
)
return df
def compare_records(earliest_record: pd.Series, latest_record: pd.Series, columns: list):
    """
    Check whether two records hold the same value in every one of the given columns.

    This is used to spot cases where the SAP score changed although none of the compared features did.

    :param earliest_record: pd.Series
    :param latest_record: pd.Series
    :param columns: list of columns to compare
    :return: False as soon as one column differs, True when all of them match (including an empty list)
    """
    return not any(earliest_record[name] != latest_record[name] for name in columns)
def app():
# Get all the files in the directory
# Data glossary:
# https://epc.opendatacommunities.org/docs/guidance#glossary
cleaned_lookup = get_cleaned()
# List all subdirectories
directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()]
# directories = directories[125:128]
dataset = []
cleaning_dataset = []
# Keep track of the all equals
all_equal_rows = []
epc_pipeline = EPCPipeline(directories=directories, epc_data_processor=EPCDataProcessor(run_mode="training"))
for directory in tqdm(directories):
filepath = directory / "certificates.csv"
epc_pipeline.run()
data_processor = DataProcessor(filepath=filepath)
df = data_processor.pre_process()
cleaning_averages = data_processor.make_cleaning_averages()
# We have some odd cases with missing constituency so we fill
df = df.fillna({"CONSTITUENCY": df["CONSTITUENCY"].mode().values[0]})
df = DataProcessor.apply_averages_cleaning(
data_to_clean=df,
cleaning_data=cleaning_averages,
cols_to_merge_on=COLUMNS_TO_MERGE_ON
)
data_by_urpn = []
for uprn, property_data in df.groupby("UPRN", observed=True):
# Fixed features - these are property attributes that shouldn't change over time
fixed_data = {}
# If a property has changed building type, we can ignore the epc rating i.e. this should be 1 unique row
if any(property_data[MANDATORY_FIXED_FEATURES].nunique() > 1) or (
pd.isnull(property_data[MANDATORY_FIXED_FEATURES]).sum().sum() > 0
):
continue
# Take the latest row for both the LATEST_FEILDS and MANDATORY FIELDS
latest_field_data = property_data[LATEST_FIELD].iloc[-1].to_dict()
mandatory_field_data = (
property_data[MANDATORY_FIXED_FEATURES].iloc[-1].to_dict()
)
# Combine all fields together
fixed_data.update(mandatory_field_data)
fixed_data.update(latest_field_data)
# We include the lodgement date here as we probably need to factor time into the
# model, since EPC standards and rigour have changed over time
variable_data = property_data[
COMPONENT_FEATURES + EFFICIENCY_FEATURES + POTENTIAL_COLUMNS + [
"LODGEMENT_DATE", RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE, CARBON_RESPONSE
]
]
# Note: we look at changes between subsequent EPCS, however we could look at other permutations
# e.g. first vs second, second vs third and also first vs third
property_model_data = []
for idx in range(0, property_data.shape[0] - 1):
if idx >= property_data.shape[0] - 1:
break
earliest_record = variable_data.iloc[idx]
latest_record = variable_data.iloc[idx + 1]
# Check if the sap gets better or worse
gets_better = earliest_record[RDSAP_RESPONSE] <= latest_record[RDSAP_RESPONSE]
component_variables = COMPONENT_FEATURES + EFFICIENCY_FEATURES
if gets_better:
starting_sap = earliest_record[RDSAP_RESPONSE]
starting_heat_demand = earliest_record[HEAT_DEMAND_RESPONSE]
starting_carbon = earliest_record[CARBON_RESPONSE]
ending_sap = latest_record[RDSAP_RESPONSE]
ending_heat_demand = latest_record[HEAT_DEMAND_RESPONSE]
ending_carbon = latest_record[CARBON_RESPONSE]
rdsap_change = latest_record[RDSAP_RESPONSE] - starting_sap
heat_demand_change = latest_record[HEAT_DEMAND_RESPONSE] - starting_heat_demand
carbon_change = latest_record[CARBON_RESPONSE] - starting_carbon
starting_record = earliest_record[component_variables + ["LODGEMENT_DATE"]].add_suffix("_STARTING")
ending_record = latest_record[component_variables + ["LODGEMENT_DATE"]].add_suffix("_ENDING")
else:
starting_sap = latest_record[RDSAP_RESPONSE]
starting_heat_demand = latest_record[HEAT_DEMAND_RESPONSE]
starting_carbon = latest_record[CARBON_RESPONSE]
ending_sap = earliest_record[RDSAP_RESPONSE]
ending_heat_demand = earliest_record[HEAT_DEMAND_RESPONSE]
ending_carbon = earliest_record[CARBON_RESPONSE]
rdsap_change = earliest_record[RDSAP_RESPONSE] - starting_sap
heat_demand_change = earliest_record[HEAT_DEMAND_RESPONSE] - starting_heat_demand
carbon_change = earliest_record[CARBON_RESPONSE] - starting_carbon
starting_record = latest_record[component_variables + ["LODGEMENT_DATE"]].add_suffix("_STARTING")
ending_record = earliest_record[component_variables + ["LODGEMENT_DATE"]].add_suffix("_ENDING")
if rdsap_change == 0:
continue
all_equal = compare_records(
earliest_record=earliest_record,
latest_record=latest_record,
columns=CORE_COMPONENT_FEATURES
)
if all_equal:
# Keep track of this for the moment so we can analyse
all_equal_rows.append({"uprn": uprn, "directory_name": directory.name})
continue
features = pd.concat([starting_record, ending_record])
property_model_data.append(
{
"UPRN": uprn,
"RDSAP_CHANGE": rdsap_change,
"HEAT_DEMAND_CHANGE": heat_demand_change,
"CARBON_CHANGE": carbon_change,
"SAP_STARTING": starting_sap,
"SAP_ENDING": ending_sap,
"HEAT_DEMAND_STARTING": starting_heat_demand,
"HEAT_DEMAND_ENDING": ending_heat_demand,
"CARBON_STARTING": starting_carbon,
"CARBON_ENDING": ending_carbon,
"POTENTIAL_ENERGY_EFFICIENCY": earliest_record["POTENTIAL_ENERGY_EFFICIENCY"],
"ENVIRONMENT_IMPACT_POTENTIAL": earliest_record["ENVIRONMENT_IMPACT_POTENTIAL"],
"ENERGY_CONSUMPTION_POTENTIAL": earliest_record["ENERGY_CONSUMPTION_POTENTIAL"],
"CO2_EMISSIONS_POTENTIAL": earliest_record["CO2_EMISSIONS_POTENTIAL"],
**fixed_data,
**features.to_dict(),
}
)
data_by_urpn.extend(property_model_data)
data_by_urpn_df = pd.DataFrame(data_by_urpn)
data_by_urpn_df["DAYS_TO_STARTING"] = DataProcessor.calculate_days_to(
data_by_urpn_df["LODGEMENT_DATE_STARTING"]
)
data_by_urpn_df["DAYS_TO_ENDING"] = DataProcessor.calculate_days_to(
data_by_urpn_df["LODGEMENT_DATE_ENDING"]
)
data_by_urpn_df = data_by_urpn_df.drop(columns=["LODGEMENT_DATE_STARTING", "LODGEMENT_DATE_ENDING"])
data_by_urpn_df = DataProcessor.clean_efficiency_variables(data_by_urpn_df)
# We look for key building fabric features that have changed from one EPC to the next.
# if, for example, we see that a home has gone from being a cavity wall to a solid wall, we
# remove this record, as it indicates that the quality of the EPC conducted in the first instance
# is low
# We also replace descriptions with their cleaned variants
if pd.isnull(data_by_urpn_df).sum().sum():
raise ValueError("Null values found in dataset")
data_by_urpn_df = process_and_prune_desriptions(data_by_urpn_df, cleaned_lookup)
# Apply u-values
for col in ["walls_clean_description", "walls_clean_description_ENDING"]:
data_by_urpn_df[col] = data_by_urpn_df[col].str.replace("(assumed)", "").str.rstrip()
data_by_urpn_df = make_uvalues(data_by_urpn_df).drop(
columns=["walls_clean_description", "walls_clean_description_ENDING"]
)
# TODO: For some of the features that we clean, we have either a true, false or possibly null value
# Those nulls should be False. clean_missings_after_description_process handles this but shouldn't
# need to
data_by_urpn_df = DataProcessor.clean_missings_after_description_process(data_by_urpn_df)
if pd.isnull(data_by_urpn_df).sum().sum():
raise ValueError("Null values found in dataset after process_and_prune_desriptions")
dataset.append(data_by_urpn_df)
cleaning_averages["LOCAL_AUTHORITY"] = df["LOCAL_AUTHORITY"].values[0]
cleaning_dataset.append(cleaning_averages)
print("Final all equal count: %s" % str(len(all_equal_rows)))
# Store cleaning dataset in s3 as a parquet file
cleaning_dataset = pd.concat(cleaning_dataset)
save_dataframe_to_s3_parquet(
df=cleaning_dataset,
bucket_name="retrofit-data-dev",
file_key="sap_change_model/cleaning_dataset.parquet",
)
output = pd.concat(dataset)
# Remove any records that have huge swings in their floor area
output["tfa_diff_abs"] = abs(output["TOTAL_FLOOR_AREA_ENDING"] - output["TOTAL_FLOOR_AREA_STARTING"])
output["tfa_diff_prop"] = output["tfa_diff_abs"] / output["TOTAL_FLOOR_AREA_STARTING"]
output = output[output["tfa_diff_prop"] < 0.5]
output = output.drop(columns=["tfa_diff_abs", "tfa_diff_prop"])
uvalue_columns = [col for col in output.columns if "thermal_transmittance" in col]
for uvalue_col in uvalue_columns:
output[uvalue_col] = pd.to_numeric(output[uvalue_col])
save_dataframe_to_s3_parquet(
df=output,
bucket_name="retrofit-data-dev",
file_key="sap_change_model/dataset.parquet",
)
# Store all_equal_rows
all_equal_rows = pd.DataFrame(all_equal_rows)
save_dataframe_to_s3_parquet(
df=all_equal_rows,
bucket_name="retrofit-data-dev",
file_key="sap_change_model/all_equal_rows.parquet",
)
# For testing
dataset_df = epc_pipeline.compiled_dataset
dataset_df.to_parquet("refactor_datasets/dataset.parquet")
pd.DataFrame(epc_pipeline.compiled_all_equal_rows).to_parquet("refactor_datasets/all_equal_rows.parquet")
pd.concat(epc_pipeline.compiled_cleaning_averages).to_parquet("refactor_datasets/cleaning_averages.parquet")
from utils.s3 import read_dataframe_from_s3_parquet
dataset = read_dataframe_from_s3_parquet(
@ -639,4 +31,4 @@ def app():
if __name__ == "__main__":
app()
main()

View file

@ -0,0 +1,4 @@
pandas==2.1.3
tqdm==4.66.1
msgpack==1.0.7
boto3==1.29.6

View file

@ -2,6 +2,59 @@
# TODO: migrate to dynaconf
from pathlib import Path
# Exact field values that mark an EPC data item as anomalous / missing.
DATA_ANOMALY_MATCHES = {
    # Invalid reports are where the value provided is out of bounds, e.g. a negative energy rating of -1199 or a
    # non-integer, there is no valid energy band for this, so it is marked as INVALID!
    "INVALID",
    "INVALID!",
    # When the energy certificate was first lodged on the register there was no requirement to lodge this data
    # item, i.e. a non-mandatory item.
    "NO DATA!",
    "NODATA!",
    # When the energy certificate was first lodged on the register there was no requirement to lodge this data
    # item, i.e. a non-mandatory item.
    "N/A",
    # A value generated by the register to account for a data item that was not mandatory when the lodgement of
    # the energy certificate occurred. When the data item became mandatory the register operator, for backwards
    # compatibility purposes, populated the data field with a value of not recorded to ensure that the energy
    # certificate retrieval process is successfully completed. Mandatory data items cannot be applied
    # retrospectively to energy certificates lodged before the date of the change.
    "Not recorded",
    # The data also contains DECs with an operational rating of 9999 (a default DEC). The production of a
    # default DEC value was allowed to enable building occupiers, with poor quality or no energy data,
    # the opportunity to comply with the regulations. From April 2011 the ability to lodge a default DEC was no
    # longer allowed.
    "9999",
    # The Building Emission Rate (BER) data field for non-domestic buildings may contain a blank value. The BER
    # was only lodged on the register from 7 March 2010.
    # The trailing comma matters: without it "Blank" and "NULL" are joined into the single entry "BlankNULL".
    "Blank",
    # There are currently just over 8,600 records where the local authority identifier is null. This is due to
    # the Register Operator not being able to match the building address in the Markermap Ordinance Survey (GB)
    # lookup tables or OS MasterMap Address Layer 2 data. The majority of these addresses have been requested
    # manually by energy assessors for inclusion by the Register Operator in the registers (e.g. new builds,
    # etc). These records are being published for completeness. An ongoing process to manage these manually added
    # addresses will take time to develop to deal with these and future anomalies.
    #
    # There are several fields within the lodged data where it is possible to enter multiple entries to cater for
    # different data_types of build within a single property, i.e. extensions. This results in multiple entries for
    # the description fields for floor, roof and wall. For the purposes of this data release only the information
    # contained within the first of these multiple entries is being provided. As there are no restrictions on the
    # value in this first field it means that sometimes the first field in a multiple entry description field may
    # contain a null value. A resolution to correct these anomalies will be considered for future data releases.
    "NULL",
    # We sometimes see fields populated with just an empty string.
    "",
}
# Anomaly markers that appear inside a longer field value rather than being the whole value.
# NOTE(review): presumably matched as substrings of field values - usage is not visible here, confirm.
DATA_ANOMALY_SUBSTRINGS = {
# Where values in a pick list that have been superseded by another value. For example, where a value for
# pitched roof has been replaced by three sub-categories of pitched roof. The original value is retained
# but for backward compatibility only it is appended to ensure that the energy certificate retrieval
# process can be successfully completed. Replacement data items cannot be applied retrospectively to energy
# certificates lodged on the register before the date of the change.
"for backward compatibility only"
}
METRIC_FILENAME = "metrics.csv"
OPTIMISE_METRIC = "mean_absolute_error"
@ -155,6 +208,10 @@ MANDATORY_FIXED_FEATURES = ["PROPERTY_TYPE", "BUILT_FORM", "CONSTITUENCY"]
# and Wales from 31 July 2014
EARLIEST_EPC_DATE = "2014-08-01"
# Transaction types, floor levels and property types that are presumably excluded from processing.
# NOTE(review): IGNORED_TRANSACTION_TYPES and IGNORED_PROPERTY_TYPES are plain strings, whereas
# IGNORED_FLOOR_LEVELS is a list. An `in` test against a str does substring matching, so confirm how
# callers compare against these values.
IGNORED_TRANSACTION_TYPES = "new dwelling"
IGNORED_FLOOR_LEVELS = ["top floor", "mid floor"]
IGNORED_PROPERTY_TYPES = "Park home"
RDSAP_RESPONSE = "CURRENT_ENERGY_EFFICIENCY"
HEAT_DEMAND_RESPONSE = "ENERGY_CONSUMPTION_CURRENT"
CARBON_RESPONSE = "CO2_EMISSIONS_CURRENT"

10001
etl/epc/testfile.csv Normal file

File diff suppressed because it is too large Load diff

View file

@ -1,6 +1,7 @@
import math
from datetime import datetime
from copy import deepcopy
from typing import Union
import numpy as np
import pandas as pd
@ -310,6 +311,22 @@ def get_roof_u_value(
return float(u_value)
def estimate_number_of_floors(property_type):
    """
    Estimate how many floors a property has from its property type.

    :param property_type: one of "House", "Flat", "Bungalow" or "Maisonette"
    :return: 1 for a flat or bungalow, 2 for a house or maisonette
    :raises NotImplementedError: for any other property type
    """
    if property_type in ("Flat", "Bungalow"):
        return 1
    if property_type in ("House", "Maisonette"):
        return 2
    raise NotImplementedError("Implement me")
def estimate_perimeter(floor_area, num_rooms):
"""
@ -498,7 +515,7 @@ def get_wall_type(
is_system_built,
is_park_home,
**kwargs
):
) -> Union[str, None]:
"""
Converts booleans to a string wall type, for querying the wall thickness table
:return: