Merge branch 'etl-michael-recommend' of github.com:Hestia-Homes/Model into etl-michael-recommend

This commit is contained in:
Michael Duong 2024-05-15 09:03:54 +00:00
commit f9533ce500
7 changed files with 556 additions and 92 deletions

View file

@ -56,8 +56,11 @@ construction_age_remap = {
expanded_map = {
i: [
label for label, bounds in construction_age_bounds_map.items() if (i <= bounds["u"]) and (i >= bounds['l'])
][0] for i in range(0, 3001)
label
for label, bounds in construction_age_bounds_map.items()
if (i <= bounds["u"]) and (i >= bounds["l"])
][0]
for i in range(0, 3001)
}
@ -74,8 +77,13 @@ class EPCDataProcessor:
Handle data loading and data preprocessing
"""
def __init__(self, data: pd.DataFrame | None = None, cleaning_averages: pd.DataFrame | None = None,
run_mode: str = "training", violation_mode: bool = False) -> None:
def __init__(
self,
data: pd.DataFrame | None = None,
cleaning_averages: pd.DataFrame | None = None,
run_mode: str = "training",
violation_mode: bool = False,
) -> None:
"""
:param filepath: If specified, is the physical location of the data
:param is_newdata: Indicates if we are processing new, testing data.
@ -86,7 +94,9 @@ class EPCDataProcessor:
self.data: pd.DataFrame = data if is_data_a_dataframe else pd.DataFrame()
is_cleaning_averages_a_dataframe = isinstance(cleaning_averages, pd.DataFrame)
self.cleaning_averages: pd.DataFrame = cleaning_averages if is_cleaning_averages_a_dataframe else pd.DataFrame()
self.cleaning_averages: pd.DataFrame = (
cleaning_averages if is_cleaning_averages_a_dataframe else pd.DataFrame()
)
# FOR NOW IF VIOLATION MODE IS ON, WE USE RUN MODE AS NEWDATA
self.violation_mode = violation_mode
@ -103,7 +113,9 @@ class EPCDataProcessor:
ignore_step = True if self.run_mode == "newdata" else False
if filepath is not None:
self.load_data(filepath=filepath, low_memory=DATA_PROCESSOR_SETTINGS["low_memory"])
self.load_data(
filepath=filepath, low_memory=DATA_PROCESSOR_SETTINGS["low_memory"]
)
if len(self.data) == 0:
raise Exception("No data to process - check filepath/ data being passed in")
@ -121,7 +133,8 @@ class EPCDataProcessor:
self.clean_multi_glaze_proportion(ignore_step=ignore_step)
self.clean_photo_supply()
self.retain_multiple_epc_properties(
epc_minimum_count=DATA_PROCESSOR_SETTINGS["epc_minimum_count"], ignore_step=ignore_step
epc_minimum_count=DATA_PROCESSOR_SETTINGS["epc_minimum_count"],
ignore_step=ignore_step,
)
self.fill_na_fields()
@ -188,7 +201,9 @@ class EPCDataProcessor:
if ignore_step:
return
self.cleaning_averages["LOCAL_AUTHORITY"] = self.data["LOCAL_AUTHORITY"].values[0]
self.cleaning_averages["LOCAL_AUTHORITY"] = self.data["LOCAL_AUTHORITY"].values[
0
]
def fill_invalid_constituency_fields(self, ignore_step: bool = False):
"""
@ -201,7 +216,9 @@ class EPCDataProcessor:
if ignore_step:
return
self.data = self.data.fillna({"CONSTITUENCY": self.data["CONSTITUENCY"].mode().values[0]})
self.data = self.data.fillna(
{"CONSTITUENCY": self.data["CONSTITUENCY"].mode().values[0]}
)
def sort_data_by_uprn_lodgement_date(self, ignore_step: bool = False):
"""
@ -301,7 +318,7 @@ class EPCDataProcessor:
"""
if self.violation_mode:
# TODO: to fill in
# TODO: to fill in
return
if ignore_step:
@ -311,9 +328,7 @@ class EPCDataProcessor:
lambda x: self.clean_construction_age_band(x)
)
self.data = self.data[
~pd.isnull(self.data["CONSTRUCTION_AGE_BAND"])
]
self.data = self.data[~pd.isnull(self.data["CONSTRUCTION_AGE_BAND"])]
def clean_missing_rooms(self, ignore_step: bool = False):
"""
@ -331,31 +346,45 @@ class EPCDataProcessor:
return
# TODO: DO we want to move this out of this function? (i.e. alter the data before we do any cleaning)
self.data["POSTAL_AREA"] = self.data["POSTCODE"].apply(lambda x: x.split(" ")[0])
self.data["POSTAL_AREA"] = self.data["POSTCODE"].apply(
lambda x: x.split(" ")[0]
)
def apply_clean(data, matching_columns):
cleaning_data = data[~pd.isnull(data[col])].groupby(
matching_columns
)[col].median().reset_index()
data = data.merge(
cleaning_data, how="left", on=matching_columns, suffixes=("", "_CLEANING")
cleaning_data = (
data[~pd.isnull(data[col])]
.groupby(matching_columns)[col]
.median()
.reset_index()
)
data[col] = np.where(pd.isnull(data[col]), data[f"{col}_CLEANING"], data[col])
data = data.merge(
cleaning_data,
how="left",
on=matching_columns,
suffixes=("", "_CLEANING"),
)
data[col] = np.where(
pd.isnull(data[col]), data[f"{col}_CLEANING"], data[col]
)
data = data.drop(columns=f"{col}_CLEANING")
return data
for col in ["NUMBER_HEATED_ROOMS", "NUMBER_HABITABLE_ROOMS"]:
to_index = 3
matching_columns = ["PROPERTY_TYPE", "BUILT_FORM", "CONSTRUCTION_AGE_BAND", "POSTAL_AREA"]
matching_columns = [
"PROPERTY_TYPE",
"BUILT_FORM",
"CONSTRUCTION_AGE_BAND",
"POSTAL_AREA",
]
has_missings = pd.isnull(self.data[col]).sum()
while has_missings:
self.data = apply_clean(
data=self.data,
matching_columns=matching_columns[0:to_index + 1]
data=self.data, matching_columns=matching_columns[0 : to_index + 1]
)
has_missings = pd.isnull(self.data[col]).sum()
@ -363,7 +392,10 @@ class EPCDataProcessor:
# Check if we've gotten to index 0 and still have missings - something has gone wrong or
# we have a very unique property type
if has_missings:
raise NotImplementedError("Handle this edge case, we still have missings for column %s" % col)
raise NotImplementedError(
"Handle this edge case, we still have missings for column %s"
% col
)
break
to_index -= 1
@ -410,7 +442,7 @@ class EPCDataProcessor:
# coltypes = {k: v for k, v in COLUMNTYPES.items() if k in self.data.columns} if self.is_newdata else
# COLUMNTYPES
# for k, v in coltypes.items():
# self.data[k] = self.data[k].astype(v)
# self.data[k] = self.data[k].astype(v)
# self.data = self.data.astype(coltypes)
# self.na_remapping()
@ -437,9 +469,11 @@ class EPCDataProcessor:
def na_remapping(self, auto_subset_columns: bool = False):
fill_na_map_apply = {
k: v for k, v in fill_na_map.items() if k in self.data.columns
} if auto_subset_columns else fill_na_map
fill_na_map_apply = (
{k: v for k, v in fill_na_map.items() if k in self.data.columns}
if auto_subset_columns
else fill_na_map
)
for column, fill_value in fill_na_map_apply.items():
self.data[column] = self.data[column].fillna(fill_value)
@ -535,28 +569,34 @@ class EPCDataProcessor:
for variable in AVERAGE_FIXED_FEATURES:
# Replace any missing NAN values with averages for the same Property type and built form
cleaning_averages_filled[variable] = cleaning_averages_filled[variable].fillna(
cleaning_averages_filled[f"{variable}_AVERAGE"]
)
cleaning_averages_filled[variable] = cleaning_averages_filled[
variable
].fillna(cleaning_averages_filled[f"{variable}_AVERAGE"])
cleaning_averages_filled = cleaning_averages_filled.drop(columns=f"{variable}_AVERAGE")
cleaning_averages_filled = cleaning_averages_filled.drop(
columns=f"{variable}_AVERAGE"
)
# If there are still NA values, i.e. the averages do not have values for a specific group of property type
# and built form,
# we can use just the property type average and replace
cleaning_averages_filled[variable] = cleaning_averages_filled[variable].fillna(
cleaning_averages_filled[f"{variable}_PROPERTY_AVERAGE"]
)
cleaning_averages_filled[variable] = cleaning_averages_filled[
variable
].fillna(cleaning_averages_filled[f"{variable}_PROPERTY_AVERAGE"])
cleaning_averages_filled = cleaning_averages_filled.drop(columns=f"{variable}_PROPERTY_AVERAGE")
cleaning_averages_filled = cleaning_averages_filled.drop(
columns=f"{variable}_PROPERTY_AVERAGE"
)
# If there are still NA values, use BUILT FORM averages
cleaning_averages_filled["variable"] = cleaning_averages_filled[variable].fillna(
cleaning_averages_filled[f"{variable}_BUILT_FORM_AVERAGE"]
)
# Write the BUILT_FORM fallback back into the column being cleaned. The key
# must be the loop variable itself: using the literal string "variable" would
# create an unrelated column and silently discard this fill step.
cleaning_averages_filled[variable] = cleaning_averages_filled[
variable
].fillna(cleaning_averages_filled[f"{variable}_BUILT_FORM_AVERAGE"])
cleaning_averages_filled = cleaning_averages_filled.drop(columns=f"{variable}_BUILT_FORM_AVERAGE")
cleaning_averages_filled = cleaning_averages_filled.drop(
columns=f"{variable}_BUILT_FORM_AVERAGE"
)
# If there are still NA values, use the average across all EPCs in the constituency
cleaning_averages_filled[variable] = cleaning_averages_filled[
@ -573,7 +613,9 @@ class EPCDataProcessor:
self.cleaning_averages = cleaning_averages_filled
def retain_multiple_epc_properties(self, epc_minimum_count: int = 1, ignore_step: bool = False) -> None:
def retain_multiple_epc_properties(
self, epc_minimum_count: int = 1, ignore_step: bool = False
) -> None:
"""
Reduce the data further by keeping only datasets with multiple epcs
"""
@ -592,12 +634,16 @@ class EPCDataProcessor:
counts = counts[counts["count"] > epc_minimum_count]
self.data = pd.merge(self.data, counts, on="UPRN")
def recast_df_columns(self, column_mappings: dict, auto_subset_columns: bool = False) -> None:
def recast_df_columns(
self, column_mappings: dict, auto_subset_columns: bool = False
) -> None:
"""
Recast columns from the dataframe to ensure the behaviour we want
"""
if auto_subset_columns:
column_mappings = {k: v for k, v in column_mappings.items() if k in self.data.columns}
column_mappings = {
k: v for k, v in column_mappings.items() if k in self.data.columns
}
for key, values in column_mappings.items():
if key not in self.data.columns:
@ -608,13 +654,17 @@ class EPCDataProcessor:
else:
self.data[key] = self.data[key].astype(values)
def recast_all_data(self, column_mappings: dict, auto_subset_columns: bool = False) -> None:
def recast_all_data(
self, column_mappings: dict, auto_subset_columns: bool = False
) -> None:
"""
Using a dictionary to recast all columns at once
"""
if auto_subset_columns:
column_mappings = {k: v for k, v in column_mappings.items() if k in self.data.columns}
column_mappings = {
k: v for k, v in column_mappings.items() if k in self.data.columns
}
self.data = self.data.astype(column_mappings)
@ -625,14 +675,28 @@ class EPCDataProcessor:
if self.violation_mode:
violation_uprn_missing = pd.isnull(self.data["UPRN"])
violation_old_lodgment_date = self.data["LODGEMENT_DATE"] < EARLIEST_EPC_DATE
violation_invalid_transaction_type = self.data["TRANSACTION_TYPE"] == IGNORED_TRANSACTION_TYPES
violation_ignored_floor_level = self.data["FLOOR_LEVEL"].isin(IGNORED_FLOOR_LEVELS)
violation_old_lodgment_date = (
self.data["LODGEMENT_DATE"] < EARLIEST_EPC_DATE
)
violation_invalid_transaction_type = (
self.data["TRANSACTION_TYPE"] == IGNORED_TRANSACTION_TYPES
)
violation_ignored_floor_level = self.data["FLOOR_LEVEL"].isin(
IGNORED_FLOOR_LEVELS
)
violation_rdsap_score_above_max = self.data[RDSAP_RESPONSE] > MAX_SAP_SCORE
violation_missing_windows_description = pd.isnull(self.data["WINDOWS_DESCRIPTION"])
violation_missing_hotwater_description = pd.isnull(self.data["HOTWATER_DESCRIPTION"])
violation_missing_roof_description = pd.isnull(self.data["ROOF_DESCRIPTION"])
violation_invalid_property_type = self.data["PROPERTY_TYPE"] == IGNORED_PROPERTY_TYPES
violation_missing_windows_description = pd.isnull(
self.data["WINDOWS_DESCRIPTION"]
)
violation_missing_hotwater_description = pd.isnull(
self.data["HOTWATER_DESCRIPTION"]
)
violation_missing_roof_description = pd.isnull(
self.data["ROOF_DESCRIPTION"]
)
violation_invalid_property_type = (
self.data["PROPERTY_TYPE"] == IGNORED_PROPERTY_TYPES
)
violation_invalid_tenure = self.data["TENURE"].isin(IGNORED_TENURES)
violation_df = pd.concat(
@ -647,7 +711,8 @@ class EPCDataProcessor:
violation_missing_roof_description,
violation_invalid_property_type,
violation_invalid_tenure,
], axis=1,
],
axis=1,
keys=[
"violation_uprn_missing",
"violation_old_lodgment_date",
@ -658,8 +723,8 @@ class EPCDataProcessor:
"violation_missing_hotwater_description",
"violation_missing_roof_description",
"violation_invalid_property_type",
"violation_invalid_tenure"
]
"violation_invalid_tenure",
],
)
self.data = pd.concat([self.data, violation_df], axis=1)
@ -685,10 +750,10 @@ class EPCDataProcessor:
self.data = self.data[~pd.isnull(self.data["UPRN"])]
self.data = self.data[self.data["LODGEMENT_DATE"] >= EARLIEST_EPC_DATE]
self.data = self.data[self.data["TRANSACTION_TYPE"] != IGNORED_TRANSACTION_TYPES]
self.data = self.data[
~self.data["FLOOR_LEVEL"].isin(IGNORED_FLOOR_LEVELS)
self.data["TRANSACTION_TYPE"] != IGNORED_TRANSACTION_TYPES
]
self.data = self.data[~self.data["FLOOR_LEVEL"].isin(IGNORED_FLOOR_LEVELS)]
self.data = self.data[self.data[RDSAP_RESPONSE] <= MAX_SAP_SCORE]
# We observed 7 final records with missing windows and 2 records with missing hot water so we shall remove them
@ -705,7 +770,10 @@ class EPCDataProcessor:
self.data = self.data[~self.data["TENURE"].isin(IGNORED_TENURES)]
# We remap zero values to None
self.data.loc[self.data['FLOOR_HEIGHT'] == 0, 'FLOOR_HEIGHT'] = None
self.data.loc[self.data["FLOOR_HEIGHT"] == 0, "FLOOR_HEIGHT"] = None
# Keep only non zero floor area
self.data = self.data[self.data["TOTAL_FLOOR_AREA"] != 0]
def clean_multi_glaze_proportion(self, ignore_step: bool = False) -> None:
"""
@ -734,7 +802,11 @@ class EPCDataProcessor:
@staticmethod
def apply_averages_cleaning(
data_to_clean, cleaning_data, cols_to_merge_on, colnames=None, ignore_step: bool = False
data_to_clean,
cleaning_data,
cols_to_merge_on,
colnames=None,
ignore_step: bool = False,
):
"""
Clean the input DataFrame using averages from a cleaning DataFrame.
@ -752,12 +824,13 @@ class EPCDataProcessor:
# The desired colnames to clean - which may not be present
if colnames is None:
colnames = ["TOTAL_FLOOR_AREA", "FLOOR_HEIGHT", "FIXED_LIGHTING_OUTLETS_COUNT"]
colnames = [
"TOTAL_FLOOR_AREA",
"FLOOR_HEIGHT",
"FIXED_LIGHTING_OUTLETS_COUNT",
]
cols_to_clean = [
c for c in colnames if
c in data_to_clean.columns
]
cols_to_clean = [c for c in colnames if c in data_to_clean.columns]
# Enforce data types
for col in ["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"]:
@ -768,7 +841,15 @@ class EPCDataProcessor:
# Calculate averages
cleaning_averages_to_merge = cleaning_data.groupby(columns_to_merge_on).agg(
dict(zip(cols_to_clean, ["mean", ] * len(cols_to_clean)))
dict(
zip(
cols_to_clean,
[
"mean",
]
* len(cols_to_clean),
)
)
)
# Merge with the original data
@ -777,7 +858,7 @@ class EPCDataProcessor:
cleaning_averages_to_merge,
on=columns_to_merge_on,
suffixes=("", "_AVERAGE"),
how='left'
how="left",
)
global_averages = cleaning_data[cols_to_clean].mean()
@ -806,14 +887,20 @@ class EPCDataProcessor:
raise Exception("Suffix should be one of _starting or _ending")
if suffix == "_STARTING":
starting_cols = self.data[STARTING_SUFFIX_COMPONENT_COLS + EFFICIENCY_FEATURES].copy().add_suffix(suffix)
starting_cols = (
self.data[STARTING_SUFFIX_COMPONENT_COLS + EFFICIENCY_FEATURES]
.copy()
.add_suffix(suffix)
)
fixed_cols = self.data[NO_SUFFIX_COMPONENT_COLS + POTENTIAL_COLUMNS].copy()
return pd.concat([starting_cols, fixed_cols], axis=1)
return self.data[
ENDING_SUFFIX_COMPONENT_COLS + EFFICIENCY_FEATURES
].copy().add_suffix(suffix)
return (
self.data[ENDING_SUFFIX_COMPONENT_COLS + EFFICIENCY_FEATURES]
.copy()
.add_suffix(suffix)
)
def get_fixed_features(self) -> pd.DataFrame:
"""
@ -831,14 +918,17 @@ class EPCDataProcessor:
:param cols_to_ignore: If specified, is a list of columns to ignore, e.g. uuids
:return: DataFrame with coerced columns.
"""
object_columns = df.select_dtypes(include=['object']).columns
object_columns = df.select_dtypes(include=["object"]).columns
if cols_to_ignore:
object_columns = [c for c in object_columns if c not in cols_to_ignore]
for column in object_columns:
unique_values = df[column].dropna().unique()
# If the unique values in the column are 'True' and 'False', convert the column to boolean
if set(unique_values) == {'True', 'False'} or set(unique_values) == {True, False}:
if set(unique_values) == {"True", "False"} or set(unique_values) == {
True,
False,
}:
df[column] = df[column].astype(bool)
return df
@ -877,7 +967,6 @@ class EPCDataProcessor:
@staticmethod
def clean_efficiency_variables(df):
"""
There is scope to clean this by the model per corresponding description.
E.g. for WALLS_ENG_EFF we could look at the mode efficiency rating by description and

View file

@ -93,6 +93,8 @@ class EPCPipeline:
epc_all_equal_rows_key="sap_change_model/{}/all_equal_rows_rooms.parquet",
epc_compiled_dataset_key="sap_change_model/{}/dataset_rooms.parquet",
use_parallel=False,
use_recommendations=False,
epc_recommendations_file="recommendations.csv",
):
"""
:param directories: List of directories to process
@ -107,6 +109,7 @@ class EPCPipeline:
self.compiled_dataset: pd.DataFrame = pd.DataFrame()
self.compiled_all_equal_rows: list = []
self.compiled_cleaning_averages: list = []
self.recommendation_dataset: pd.DataFrame = pd.DataFrame()
self.directories = directories
self.epc_data_processor = epc_data_processor
@ -115,6 +118,9 @@ class EPCPipeline:
self.epc_local_file = epc_local_file
self.epc_bucket_name = epc_bucket_name
self.use_recommendations = use_recommendations
self.epc_recommendations_file = epc_recommendations_file
self.use_parallel = use_parallel
self.timeprefix = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
@ -257,6 +263,9 @@ class EPCPipeline:
self.compiled_dataset = pd.concat(
[self.compiled_dataset, result["dataset"]]
)
self.recommendation_dataset = pd.concat(
[self.recommendation_dataset, result["recommendation_dataset"]]
)
self.compiled_cleaning_averages.append(result["cleaning_averages"])
self.compiled_all_equal_rows.extend(result["all_equal_rows"])
@ -271,6 +280,7 @@ class EPCPipeline:
"dataset": self.compiled_dataset,
"cleaning_averages": self.epc_data_processor.cleaning_averages,
"all_equal_rows": self.compiled_all_equal_rows,
"recommendation_dataset": self.recommendation_dataset,
}
return output
@ -287,15 +297,54 @@ class EPCPipeline:
constituency_data = self.epc_data_processor.data
if self.use_recommendations:
# Use only the most recent epc for each uprn
constituency_data = constituency_data.sort_values(
"lodgement_date", ascending=False
).drop_duplicates("uprn")
recommendations_filepath = directory / self.epc_recommendations_file
recommendations_df = pd.read_csv(recommendations_filepath)
recommendations_df = recommendations_df[
recommendations_df["IMPROVEMENT_ID"].notnull()
]
recommendations_df["IMPROVEMENT_ID"] = recommendations_df[
"IMPROVEMENT_ID"
].astype(int)
recommendations_df.columns = recommendations_df.columns.str.lower()
# Get all recommendations for all properties in the constituency (after cleaning)
recommendations_df = recommendations_df.merge(
constituency_data[["lmk_key", "uprn"]], on="lmk_key", how="inner"
)
# Keep all properties that have recommendations
constituency_data = constituency_data[
constituency_data["lmk_key"].isin(recommendations_df["lmk_key"])
]
# In order to create a difference record, we repeat each row for each uprn
constituency_data = pd.concat(
[constituency_data, constituency_data]
).reset_index(drop=True)
constituency_data = constituency_data.sort_values("uprn")
self.compiled_cleaning_averages.append(
self.epc_data_processor.cleaning_averages
)
constituency_difference_records = []
require_adequate_data_check = False if self.use_recommendations else True
for uprn, property_data in constituency_data.groupby("uprn", observed=True):
difference_records = self.process_uprn(
uprn=str(uprn), property_data=property_data, directory=directory
uprn=str(uprn),
property_data=property_data,
directory=directory,
require_adequate_data_check=require_adequate_data_check,
)
if difference_records is not None:
constituency_difference_records.extend(difference_records)
@ -308,7 +357,18 @@ class EPCPipeline:
[self.compiled_dataset, constituency_dataset.df]
)
def process_uprn(self, uprn: str, property_data: pd.DataFrame, directory: Path):
if self.use_recommendations:
self.recommendation_dataset = pd.concat(
[self.recommendation_dataset, recommendations_df]
)
def process_uprn(
self,
uprn: str,
property_data: pd.DataFrame,
directory: Path,
require_adequate_data_check: bool = True,
):
"""
Process a single UPRN, which may have multiple different EPCs
:param uprn: UPRN
@ -342,13 +402,18 @@ class EPCPipeline:
# We can use multiple types of comparison datasets - i.e. Compare consecutive records, or compare all permutations of records
property_difference_records = self._generate_property_difference_records(
epc_records, uprn, directory, fixed_data
epc_records, uprn, directory, fixed_data, require_adequate_data_check
)
return property_difference_records
def _generate_property_difference_records(
self, epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict
self,
epc_records: List[EPCRecord],
uprn: str,
directory: Path,
fixed_data: dict,
require_adequate_data_check: bool = True,
):
"""
We can use multiple types of comparison datasets, for example:
@ -364,7 +429,12 @@ class EPCPipeline:
# property_difference_records = self._compare_consecutive_epcs(epc_records, uprn, directory, fixed_data, property_difference_records)
property_difference_records = self._compare_all_permutation_epcs(
epc_records, uprn, directory, fixed_data, property_difference_records
epc_records,
uprn,
directory,
fixed_data,
property_difference_records,
require_adequate_data_check,
)
return property_difference_records
@ -376,6 +446,7 @@ class EPCPipeline:
directory: Path,
fixed_data: dict,
property_difference_records: list,
require_adequate_data_check: bool = True,
):
"""
Compare all permutations of EPCs for a given UPRN
@ -400,7 +471,10 @@ class EPCPipeline:
# TODO: Pull out RDSAP_CHANGE to a variable
if difference_record.get("rdsap_change") == 0:
if not difference_record.ensure_adequate_data():
if (
not difference_record.ensure_adequate_data()
and require_adequate_data_check
):
# Rdsap hasn't changed but we have enough data to use this record
# i.e. all fields aside from mechanical ventilation are the same
# self.check_records.append({"uprn": uprn, "directory_name": directory.name, "difference_record": difference_record, "earliest_record": earliest_record, "latest_record": latest_record})
@ -410,7 +484,7 @@ class EPCPipeline:
fields=[x.lower() for x in CORE_COMPONENT_FEATURES]
)
if all_equal:
if all_equal and require_adequate_data_check:
# Keep track of this for the moment so we can analyse
self.compiled_all_equal_rows.append(
{"uprn": uprn, "directory_name": directory.name}

View file

@ -18,6 +18,7 @@ def main():
directories=directories,
use_parallel=True,
epc_data_processor=EPCDataProcessor(run_mode="training"),
use_recommendations=True,
)
epc_pipeline.run()

View file

@ -0,0 +1,223 @@
# Pipeline to load all EPC data, similar to EPCPipeline, but once the data is made into an EPCRecord
# we instantiate a Property instance so that we can get both the recommendations and the classification of the
# walls, roof and floor (i.e. average, above average, etc.)
import itertools
import os
from datetime import datetime
from pathlib import Path

import pandas as pd
from sqlalchemy.orm import sessionmaker
from tqdm import tqdm

from etl.epc.Record import EPCRecord
from backend.SearchEpc import SearchEpc
from backend.app.config import get_settings
from backend.app.db.connection import db_engine
from backend.app.db.functions.materials_functions import get_materials
from backend.app.plan.utils import get_cleaned
from backend.Property import Property
from etl.solar.SolarPhotoSupply import SolarPhotoSupply
from recommendations.Recommendations import Recommendations
from utils.logger import setup_logger
from utils.s3 import read_dataframe_from_s3_parquet, save_dataframe_to_s3_parquet
# --- Module-level setup: everything below runs as soon as the file is executed/imported. ---

# Run timestamp formatted as day-month-year-hour-minute-second.
# NOTE(review): not referenced again in the visible part of this script.
now = datetime.now().strftime("%d-%m-%Y-%H-%M-%S")
logger = setup_logger()
logger.info("Connecting to db")
# Create a session bound to the project's database engine.
session = sessionmaker(bind=db_engine)()
# ISO-8601 timestamp of the run.
# NOTE(review): not referenced again in the visible part of this script.
created_at = datetime.now().isoformat()
# NOTE(review): no matching commit/rollback/close is visible in this script.
session.begin()
logger.info("Getting the inputs")
# Cleaning dataset read from S3; passed to EPCRecord as `cleaning_data` further down.
cleaning_data = read_dataframe_from_s3_parquet(
bucket_name=get_settings().DATA_BUCKET,
file_key="sap_change_model/cleaning_dataset.parquet",
)
# Materials (handed to Recommendations) and the "cleaned" lookup (handed to the Property methods).
materials = get_materials(session)
cleaned = get_cleaned()
# Filename metadata used by Property.get_spatial_data.
uprn_filenames = read_dataframe_from_s3_parquet(
bucket_name=get_settings().DATA_BUCKET, file_key="spatial/filename_meta.parquet"
)
# Solar inputs used by Property.get_components.
photo_supply_lookup, floor_area_decile_thresholds = SolarPhotoSupply.load(
bucket=get_settings().DATA_BUCKET
)
# Sample of properties with improvements, stored as a CSV next to this file.
scenario_properties_df = pd.read_csv(
Path(__file__).parent / "improvement_data_sample.csv"
)
# Restrict the sample to the properties carrying the improvement under investigation.
improvement_id_to_check = 1
properties_to_check = scenario_properties_df[
    scenario_properties_df["IMPROVEMENT_ID"] == improvement_id_to_check
]

# One base difference-record DataFrame per successfully processed property.
property_list = []
for _, row in tqdm(properties_to_check.iterrows(), total=len(properties_to_check)):
    try:
        # Look the property up by address/postcode and collect its EPCs.
        epc_searcher = SearchEpc(
            address1=row["ADDRESS1"],
            postcode=row["POSTCODE"],
            auth_token=get_settings().EPC_AUTH_TOKEN,
            os_api_key=get_settings().ORDNANCE_SURVEY_API_KEY,
        )
        epc_searcher.find_property()

        epc_records = {
            "original_epc": epc_searcher.newest_epc.copy(),
            "full_sap_epc": epc_searcher.full_sap_epc.copy(),
            "old_data": epc_searcher.older_epcs.copy(),
        }
        prepared_epc = EPCRecord(
            epc_records=epc_records, run_mode="newdata", cleaning_data=cleaning_data
        )

        # Build the Property and enrich it with spatial data and components.
        p = Property(
            id=prepared_epc.uprn,
            address=epc_searcher.address_clean,
            postcode=epc_searcher.postcode_clean,
            epc_record=prepared_epc,
        )
        p.get_spatial_data(uprn_filenames)
        p.get_components(cleaned, photo_supply_lookup, floor_area_decile_thresholds)

        # Run the recommenders. The per-component results below are only pulled
        # out for inspection; none of them feeds into property_list.
        recommender = Recommendations(property_instance=p, materials=materials)
        property_recommendations = recommender.recommend()
        # NOTE(review): "wall_recomender" is spelled differently from the other
        # recommender attributes - confirm it matches the Recommendations class.
        wall_recommendations = recommender.wall_recomender.recommendations
        loft_recommendations = recommender.roof_recommender.recommendations
        solar_recommendations = recommender.solar_recommender.recommendation
        windows_recommendations = recommender.windows_recommender.recommendation

        p.create_base_difference_epc_record(cleaned_lookup=cleaned)
        property_list.append(p.base_difference_record.df)
    except Exception:
        # Best-effort processing: one failing property must not abort the run,
        # but the failure is logged (with traceback) instead of being silently
        # swallowed. Catching Exception rather than using a bare except keeps
        # Ctrl-C / SystemExit working.
        logger.exception(
            "Failed to process property %s, %s",
            row.get("ADDRESS1"),
            row.get("POSTCODE"),
        )

if property_list:
    property_df = pd.concat(property_list)
else:
    # pd.concat raises on an empty list; fall back to an empty frame instead.
    logger.warning(
        "No properties were processed for improvement id %s",
        improvement_id_to_check,
    )
    property_df = pd.DataFrame()
# Hand-picked scenario properties, each identified by address, postcode and EPC "lmk-key".
# Every entry in "measures" is a 4-item list:
#   [list of measure names, a numeric string, dict of expected "*_ending" field values, list of ints]
# NOTE(review): the meaning of the numeric string and of the trailing int list is not
# visible in this file - presumably an id and indices into the measure names; confirm.
# NOTE(review): this list is not referenced anywhere else in the visible code.
scenario_properties = [
{
"address": "2 South Terrace",
"postcode": "NN1 5JY",
"lmk-key": "1459796789102016070507274146560098",
"measures": [
[
["internal_wall_insulation"],
"11",
{"walls_insulation_thickness_ending": "average"},
[0],
],
[
["external_wall_insulation"],
"10",
{"walls_insulation_thickness_ending": "average"},
[0],
],
[["solar", "windows"], "15", {"photo_supply_ending": 50}, [0, 1]],
],
},
{
"address": "8 Lindlings",
"postcode": "HP1 2HA",
"lmk-key": "c14029235739827d5f627dc8aa9bb567d026b267e851e0db0001db24638667b1",
"measures": [
[
["cavity_wall_insulation", "loft_insulation"],
"15",
{"walls_insulation_thickness_ending": "average"},
[0, 1],
],
],
},
{
"address": "44 Lindlings",
"postcode": "HP1 2HE",
"lmk-key": "99296a6dda21314fef3a61cda59e441e9a2aacf115eb96f4a0fa85696bf7b117",
"measures": [
[
["cavity_wall_insulation", "loft_insulation"],
"15",
{"walls_insulation_thickness_ending": "average"},
[0, 1],
],
],
},
{
"address": "46 Chaulden Terrace",
"postcode": "HP1 2AN",
"lmk-key": "d1e0534be3a44c33003323b21d0e322e3daddc65b5ee71936f89c59ddab96b50",
"measures": [
[
["cavity_wall_insulation", "loft_insulation"],
"15",
{"walls_insulation_thickness_ending": "average"},
[0, 1],
],
],
},
{
"address": "73 Long Chaulden",
"postcode": "HP1 2HX",
"lmk-key": "1eae354db522a95188018d9cd0502ed8c609910b6c88f8797d3a25f59b11770a",
"measures": [
[
["cavity_wall_insulation", "loft_insulation"],
"15",
{"walls_insulation_thickness_ending": "average"},
[0, 1],
],
],
},
]
from pathlib import Path
from etl.epc.DataProcessor import EPCDataProcessor
from etl.epc.Pipeline import EPCPipeline
# Location of the locally downloaded EPC certificates:
# <directory of this file>/../epc/local_data/all-domestic-certificates
DATA_DIRECTORY = Path(__file__).parent.parent.joinpath(
    "epc", "local_data", "all-domestic-certificates"
)


def main():
    """
    Orchestration function

    Collects the sub-directories of DATA_DIRECTORY. The pipeline run that
    would consume them is currently commented out, so nothing else happens.
    """
    directories = []
    for entry in DATA_DIRECTORY.iterdir():
        if entry.is_dir():
            directories.append(entry)

    # Set up a new pipeline only up to the EPCRecord stage
    # so that we can instantiate a Property instance and get the recommendations

    # directories = directories[0:3]
    # epc_pipeline = EPCPipeline(
    #     directories=directories,
    #     use_parallel=True,
    #     epc_data_processor=EPCDataProcessor(run_mode="training"),
    # )
    # epc_pipeline.run()


if __name__ == "__main__":
    main()

View file

@ -0,0 +1,59 @@
| | improvement_description |
|---:|:---------------------------------------------------------|
| 1 | Hot water cylinder insulation |
| 2 | Hot water cylinder insulation |
| 3 | Hot water cylinder insulation |
| 4 | Hot water cylinder thermostat |
| 5 | Floor insulation (suspended floor) |
| 6 | Cavity wall insulation |
| 7 | Internal or external wall insulation |
| 8 | Double glazed windows |
| 9 | Secondary glazing |
| 10 | Solar water heating |
| 11 | Heating controls (programmer, room thermostat and TRVs) |
| 12 | Heating controls (room thermostat and TRVs) |
| 13 | Heating controls (thermostatic radiator valves) |
| 14 | Heating controls (room thermostat) |
| 15 | Heating controls (programmer and TRVs) |
| 16 | Heating controls (time and temperature zone control) |
| 17 | Heating controls (programmer and room thermostat) |
| 18 | Heating controls (room thermostat) |
| 19 | Solar water heating |
| 20 | Replace boiler with new condensing boiler |
| 21 | Replace boiler with new condensing boiler |
| 22 | Replace boiler with biomass boiler |
| 23 | Biomass stove with boiler |
| 24 | Fan assisted storage heaters and dual immersion cylinder |
| 25 | Fan assisted storage heaters |
| 26 | Replacement warm air unit |
| 27 | Change heating to gas condensing boiler |
| 28 | Condensing oil boiler with radiators |
| 29 | Gas condensing boiler |
| 30 | Internal or external wall insulation |
| 31 | Fan-assisted storage heaters |
| 32 | Change heating to gas condensing boiler |
| 34 | Solar photovoltaic panels, 2.5 kWp |
| 35 | Low energy lighting |
| 36 | Condensing heating unit |
| 37 | Condensing boiler (separate from the range cooker) |
| 38 | Condensing boiler (separate from the range cooker) |
| 39 | Biomass stove with boiler |
| 40 | Change room heaters to condensing boiler |
| 41 | Translation missing |
| 42 | Mains gas condensing heating unit |
| 43 | Translation missing |
| 44 | Wind turbine |
| 45 | Flat roof or sloping ceiling insulation |
| 46 | Room-in-roof insulation |
| 47 | Floor insulation (solid floor) |
| 48 | High performance external doors |
| 49 | Heat recovery system for mixer showers |
| 50 | Flue gas heat recovery device in conjunction with boiler |
| 56 | Replacement glazing units |
| 57 | Floor insulation (suspended floor) |
| 58 | Floor insulation (solid floor) |
| 59 | High heat retention storage heaters |
| 60 | High heat retention storage heaters |
| 61 | High heat retention storage heaters |
| 62 | High heat retention storage heaters |
| 63 | Party wall insulation |

View file

@ -0,0 +1,4 @@
beautifulsoup4==4.12.3
requests==2.31.0
pandas==2.2.2
tqdm==4.66.2

View file

@ -4,7 +4,7 @@ import numpy as np
from backend.Property import Property
from recommendations.Costs import Costs
from recommendation_utils import override_costs
from recommendations.recommendation_utils import override_costs
class WindowsRecommendations:
@ -14,7 +14,7 @@ class WindowsRecommendations:
# glazed
"most": 0.33,
# If glazing is partial, we assume 50/50 split between glazed and unglazed
"partial": 0.5
"partial": 0.5,
}
def __init__(self, property_instance: Property, materials: List):
@ -52,14 +52,20 @@ class WindowsRecommendations:
if not number_of_windows:
raise ValueError("Number of windows not specified")
if self.property.windows["has_glazing"] & (self.property.windows["glazing_coverage"] == "full"):
if self.property.windows["has_glazing"] & (
self.property.windows["glazing_coverage"] == "full"
):
return
# We scale the number of windows based on the proportion of existing glazing
if self.property.data["multi-glaze-proportion"] != "":
n_windows_scalar = 1 - (int(self.property.data["multi-glaze-proportion"]) / 100)
n_windows_scalar = 1 - (
int(self.property.data["multi-glaze-proportion"]) / 100
)
else:
n_windows_scalar = self.COVERAGE_MAP.get(self.property.windows["glazing_coverage"], 1)
n_windows_scalar = self.COVERAGE_MAP.get(
self.property.windows["glazing_coverage"], 1
)
number_of_windows *= n_windows_scalar
number_of_windows = np.ceil(number_of_windows)
@ -68,7 +74,7 @@ class WindowsRecommendations:
cost_result = self.costs.window_glazing(
number_of_windows=number_of_windows,
material=self.glazing_material,
is_secondary_glazing=is_secondary_glazing
is_secondary_glazing=is_secondary_glazing,
)
already_installed = "windows_glazing" in self.property.already_installed
@ -76,18 +82,26 @@ class WindowsRecommendations:
cost_result = override_costs(cost_result)
description = "The property already has double glazing installed. No further action is required."
else:
glazing_type = "secondary glazing" if is_secondary_glazing else "double glazing"
glazing_type = (
"secondary glazing" if is_secondary_glazing else "double glazing"
)
if self.property.windows["glazing_coverage"] in ["partial", "most"]:
description = f"Install {glazing_type} to the remaining windows"
else:
description = f"Install {glazing_type} to all windows"
if self.property.is_listed:
description += ". Secondary glazing recommended due to listed building status"
description += (
". Secondary glazing recommended due to listed building status"
)
elif self.property.is_heritage:
description += ". Secondary glazing recommended due to herigate building status"
description += (
". Secondary glazing recommended due to herigate building status"
)
elif self.property.in_conservation_area:
description += ". Secondary glazing recommended due to conservation area status"
description += (
". Secondary glazing recommended due to conservation area status"
)
self.recommendation = [
{
@ -100,6 +114,6 @@ class WindowsRecommendations:
"sap_points": None,
"already_installed": already_installed,
**cost_result,
"is_secondary_glazing": is_secondary_glazing
"is_secondary_glazing": is_secondary_glazing,
}
]