new pipeline working

This commit is contained in:
Michael Duong 2023-12-20 16:27:27 +00:00
parent 174299dcd3
commit 1eb98d2d35
4 changed files with 72 additions and 643 deletions

View file

@ -83,7 +83,7 @@ class EPCDataProcessor:
raise ValueError("Run mode must be either training or newdata")
self.run_mode = run_mode if not violation_mode else "newdata"
def prepare_data(self, filepath: str) -> tuple[pd.DataFrame, pd.DataFrame]:
def prepare_data(self, filepath: Path | str) -> None:
"""
Given the run mode, we apply the relevant pipeline steps
Ignore step is used to highlight which steps are not needed in newdata
@ -91,9 +91,12 @@ class EPCDataProcessor:
ignore_step = True if self.run_mode == "newdata" else False
if self.data is None:
if filepath is not None:
self.load_data(filepath=filepath, low_memory=DATA_PROCESSOR_SETTINGS["low_memory"])
if len(self.data) == 0:
raise Exception("No data to process - check filepath/ data being passed in")
self.confine_data(ignore_step=ignore_step)
self.remap_anomalies()
self.remap_floor_level(ignore_step=ignore_step)
@ -116,6 +119,7 @@ class EPCDataProcessor:
# Final re-casting after data transformed and prepared
self.recast_df_columns(column_mappings=COLUMNTYPES, auto_subset_columns=True)
self.recast_all_data(column_mappings=COLUMNTYPES, auto_subset_columns=True)
self.na_remapping(auto_subset_columns=True)
self.fill_invalid_constituency_fields(ignore_step=ignore_step)
@ -132,9 +136,8 @@ class EPCDataProcessor:
self.add_local_authority_to_cleaning_average(ignore_step=ignore_step)
self.cast_cleaning_averages_columns_to_lower(ignore_step=ignore_step)
self.cast_data_columns_to_lower()
return self.data, self.cleaning_averages
def cast_data_columns_to_lower(self):
"""
Convert all columns names to lower
@ -577,8 +580,19 @@ class EPCDataProcessor:
for key, values in column_mappings.items():
if key not in self.data.columns:
raise ValueError("Column mapping incorrectly specified")
for value in values:
self.data[key] = self.data[key].astype(value)
if isinstance(values, list):
for value in values:
self.data[key] = self.data[key].astype(value)
else:
self.data[key] = self.data[key].astype(values)
def recast_all_data(self, column_mappings: dict, auto_subset_columns: bool = False) -> None:
"""
Using a dictionary to recast all columns at once
"""
if auto_subset_columns:
column_mappings = {k: v for k, v in column_mappings.items() if k in self.data.columns}
self.data = self.data.astype(column_mappings)

View file

@ -45,7 +45,7 @@ class TrainingDataset(BaseDataset):
# self.pipeline_steps = self.pipeline_factory("training")
self.datasets = datasets
self.df = pd.DataFrame([dataset.difference_record for dataset in datasets])
self._feature_generation()
self._drop_features()
self._clean_efficiency_variables()
@ -112,14 +112,20 @@ class TrainingDataset(BaseDataset):
"""
Using the apply method, use the get_wall_u_value method to generate the u-value
"""
col_name = "walls_clean_description" if not is_end else "walls_clean_description_ending"
description_col_name = "walls_clean_description" if not is_end else "walls_clean_description_ending"
thermal_transistance_col_name = "walls_thermal_transmittance" if not is_end else "walls_thermal_transmittance_ending"
return get_wall_u_value(
clean_description=row[col_name],
age_band=england_wales_age_band_lookup[row["construction_age_band"]],
is_granite_or_whinstone=row["is_granite_or_whinstone"],
is_sandstone_or_limestone=row["is_sandstone_or_limestone"],
)
if pd.isnull(row[thermal_transistance_col_name]):
output = get_wall_u_value(
clean_description=row[description_col_name],
age_band=england_wales_age_band_lookup[row["construction_age_band"]],
is_granite_or_whinstone=row["is_granite_or_whinstone"],
is_sandstone_or_limestone=row["is_sandstone_or_limestone"],
)
else:
output = row[thermal_transistance_col_name]
return output
@staticmethod
def _lambda_function_to_generate_floor_uvalue(row, is_end=False):
@ -235,8 +241,8 @@ class TrainingDataset(BaseDataset):
floor_ending_uvalue = self.df['floor_thermal_transmittance_ending'].fillna(floor_ending_uvalue)
for component in ["walls", "roof", "floor"]:
self.df[f"{component}_thermal_transmittance"] = self.df[f"{component}_thermal_transmittance"].fillna(f"{component}_starting_uvalue")
self.df[f"{component}_thermal_transmittance_ending"] = self.df[f"{component}_thermal_transmittance_ending"].fillna(f"{component}_ending_uvalue")
self.df[f"{component}_thermal_transmittance"] = self.df[f"{component}_thermal_transmittance"].fillna(eval(f"{component}_starting_uvalue"))
self.df[f"{component}_thermal_transmittance_ending"] = self.df[f"{component}_thermal_transmittance_ending"].fillna(eval(f"{component}_ending_uvalue"))
self.df = self.df.drop(columns=["floor_type", "wall_type", "walls_clean_description", "walls_clean_description_ending"])
@ -353,11 +359,17 @@ class TrainingDataset(BaseDataset):
for component in components_to_expand:
# TODO: change cleaned dataframe to have underscores instead of dashes
cleaned_key, left_on_starting, left_on_ending, original_cols = (
("main-fuel", "main_fuel_starting", "main_fuel_ending", ["main_fuel_starting", "main_fuel_ending"]) if component == "main-fuel" else
(f"{component}-description", f"{component}_description_starting", f"{component}_description_ending", [f"{component}_description_starting", f"{component}_description_ending"])
)
# TODO: change cleaned dataframe to have underscores instead of dashes
if component == "main-fuel":
cleaned_key = "main-fuel"
left_on_starting = "main_fuel_starting"
left_on_ending = "main_fuel_ending"
original_cols = ["main_fuel_starting", "main_fuel_ending"]
else:
cleaned_key = f"{component}-description"
left_on_starting = f"{component}_description_starting"
left_on_ending = f"{component}_description_ending"
original_cols = [f"{component}_description_starting", f"{component}_description_ending"]
cleaned_lookup_df_for_key = pd.DataFrame(cleaned_lookup[cleaned_key])

View file

@ -105,23 +105,23 @@ class EPCPipeline:
for directory in self.directories:
self.process_directory(directory)
save_dataframe_to_s3_parquet(
df=self.compiled_dataset,
bucket_name=self.epc_bucket_name,
file_key=self.epc_compiled_dataset_key,
)
# save_dataframe_to_s3_parquet(
# df=self.compiled_dataset,
# bucket_name=self.epc_bucket_name,
# file_key=self.epc_compiled_dataset_key,
# )
save_dataframe_to_s3_parquet(
df=pd.concat(self.compiled_all_equal_rows),
bucket_name=self.epc_bucket_name,
file_key=self.epc_all_equal_rows_key,
)
# save_dataframe_to_s3_parquet(
# df=pd.concat(self.compiled_all_equal_rows),
# bucket_name=self.epc_bucket_name,
# file_key=self.epc_all_equal_rows_key,
# )
save_dataframe_to_s3_parquet(
df=pd.concat(self.compiled_cleaning_averages),
bucket_name=self.epc_bucket_name,
file_key=self.epc_cleaning_dataset_key,
)
# save_dataframe_to_s3_parquet(
# df=pd.concat(self.compiled_cleaning_averages),
# bucket_name=self.epc_bucket_name,
# file_key=self.epc_cleaning_dataset_key,
# )
def process_directory(self, directory: Path):
"""
@ -131,7 +131,7 @@ class EPCPipeline:
"""
filepath = directory / self.epc_local_file
self.epc_data_processor.pre_process(filepath=filepath)
self.epc_data_processor.prepare_data(filepath=filepath)
constituency_data = self.epc_data_processor.data
self.compiled_cleaning_averages.append(self.epc_data_processor.cleaning_averages)

View file

@ -1,625 +1,28 @@
import pandas as pd
import numpy as np
from tqdm import tqdm
import msgpack
from typing import List
from pathlib import Path
from etl.epc.settings import (
MANDATORY_FIXED_FEATURES,
LATEST_FIELD,
COMPONENT_FEATURES,
RDSAP_RESPONSE,
HEAT_DEMAND_RESPONSE,
CARBON_RESPONSE,
CORE_COMPONENT_FEATURES,
EFFICIENCY_FEATURES,
POTENTIAL_COLUMNS,
MINIMUM_FLOOR_HEIGHT
)
from etl.epc.DataProcessor import EPCDataProcessor
from etl.epc.Record import EPCRecord, EPCDifferenceRecord
from utils.s3 import save_dataframe_to_s3_parquet, read_from_s3
from recommendations.rdsap_tables import england_wales_age_band_lookup
from recommendations.recommendation_utils import (
get_wall_u_value, get_roof_u_value, get_floor_u_value, estimate_perimeter,
get_wall_type
)
from etl.epc.Pipeline import EPCPipeline
DATA_DIRECTORY = Path(__file__).parent / "local_data" / "all-domestic-certificates"
# TODO: change in setting file
MANDATORY_FIXED_FEATURES = [x.lower() for x in MANDATORY_FIXED_FEATURES]
LATEST_FIELD = [x.lower() for x in LATEST_FIELD]
COMPONENT_FEATURES = [x.lower() for x in COMPONENT_FEATURES]
RDSAP_RESPONSE = RDSAP_RESPONSE.lower()
HEAT_DEMAND_RESPONSE = HEAT_DEMAND_RESPONSE.lower()
CARBON_RESPONSE = CARBON_RESPONSE.lower()
CORE_COMPONENT_FEATURES = [x.lower() for x in CORE_COMPONENT_FEATURES]
EFFICIENCY_FEATURES = [x.lower() for x in EFFICIENCY_FEATURES]
POTENTIAL_COLUMNS = [x.lower() for x in POTENTIAL_COLUMNS]
VARIABLE_DATA_FEATURES = COMPONENT_FEATURES + EFFICIENCY_FEATURES + POTENTIAL_COLUMNS + [
"lodgement_date", RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE, CARBON_RESPONSE
]
def get_cleaned_description_mapping():
"""
This function will retrieve the cleaned dataset from s3 which has the cleaned
descriptions for the epc dataset
This data is stored in MessagePack format and therefore needs to be decoded
:return:
"""
cleaned = read_from_s3(
s3_file_name="cleaned_epc_data/cleaned.bson",
bucket_name="retrofit-data-dev"
)
cleaned = msgpack.unpackb(cleaned, raw=False)
return cleaned
def process_and_prune_desriptions(df, cleaned_lookup):
"""
This method will merge on the cleaned lookup table and ensure that the building fabric in the
starting and ending EPC is consistent, so ensure that we are performing our modelling on the cleanest
possible dataset.
:param df:
:param cleaned_lookup:
:return:
"""
cols_to_drop = {
"walls": [
# We need to cleaned descriptions for pulling out u-values
'original_description', 'thermal_transmittance_unit',
'original_description_ENDING',
'thermal_transmittance_unit_ENDING',
'is_cavity_wall_ENDING', 'is_filled_cavity_ENDING',
'is_solid_brick_ENDING', 'is_system_built_ENDING',
'is_timber_frame_ENDING', 'is_granite_or_whinstone_ENDING',
'is_as_built_ENDING', 'is_cob_ENDING', 'is_assumed_ENDING',
'is_sandstone_or_limestone_ENDING',
# Re remove the is_assumed columns
"is_assumed", "is_assumed_ENDING"
],
"floor": [
"original_description", "clean_description", "thermal_transmittance_unit",
"no_data", "no_data_ENDING", "original_description_ENDING",
"clean_description_ENDING", "thermal_transmittance_unit_ENDING",
"is_suspended_ENDING", "is_solid_ENDING", "another_property_below_ENDING",
"is_to_unheated_space_ENDING", "is_to_external_air_ENDING", "is_assumed",
"is_assumed_ENDING"
],
"roof": [
"original_description", "clean_description", "thermal_transmittance_unit",
"is_assumed", "is_valid", "original_description_ENDING", "clean_description_ENDING",
"thermal_transmittance_unit_ENDING", "is_pitched_ENDING", "is_roof_room_ENDING",
"is_loft_ENDING", "is_flat_ENDING", "is_thatched_ENDING", "is_at_rafters_ENDING",
"has_dwelling_above_ENDING", "is_assumed_ENDING", "is_valid_ENDING"
],
"hotwater": [
"original_description", "clean_description", "assumed", "original_description_ENDING",
"clean_description_ENDING", "assumed_ENDING"
],
"mainheat": [
"original_description", "clean_description", "original_description_ENDING",
"has_assumed", "original_description_ENDING", "clean_description_ENDING",
"has_assumed_ENDING",
],
"mainheatcont": [
"original_description", "clean_description", "original_description_ENDING", "clean_description_ENDING"
],
"windows": [
"original_description", "clean_description", "original_description_ENDING", "clean_description_ENDING",
# We don't need many of the glazing coverage features because we have the multi_glaze_proportion feature
"has_glazing", "glazing_coverage", "no_data", "has_glazing_ENDING", "glazing_coverage_ENDING",
"no_data_ENDING"
],
"main-fuel": [
"original_description", "clean_description", "original_description_ENDING", "clean_description_ENDING"
],
}
for component in ["walls", "floor", "roof", "hotwater", "mainheat", "mainheatcont", "windows", "main-fuel"]:
component_upper = component.upper()
if component == "main-fuel":
component_upper = component_upper.replace("-", "_")
cleaned_key = "main-fuel" if component == "main-fuel" else f"{component}-description"
left_on_starting = (
f"{component_upper}_STARTING" if component == "main-fuel" else f"{component_upper}_DESCRIPTION_STARTING"
)
left_on_ending = (
f"{component_upper}_ENDING" if component == "main-fuel" else f"{component_upper}_DESCRIPTION_ENDING"
)
df = df.merge(
pd.DataFrame(cleaned_lookup[cleaned_key]),
how="left",
left_on=left_on_starting,
right_on="original_description",
).merge(
pd.DataFrame(cleaned_lookup[cleaned_key]),
how="left",
left_on=left_on_ending,
right_on="original_description",
suffixes=("", "_ENDING")
)
if component == "walls":
# We make sure the wall construction hasn't changed
df = df[
(df["is_cavity_wall"] == df["is_cavity_wall_ENDING"]) &
(df["is_solid_brick"] == df["is_solid_brick_ENDING"]) &
(df["is_timber_frame"] == df["is_timber_frame_ENDING"]) &
(df["is_granite_or_whinstone"] == df["is_granite_or_whinstone_ENDING"]) &
(df["is_cob"] == df["is_cob_ENDING"]) &
(df["is_sandstone_or_limestone"] == df["is_sandstone_or_limestone_ENDING"])
]
elif component == "floor":
df = df[
(df["is_suspended"] == df["is_suspended_ENDING"]) &
(df["is_solid"] == df["is_solid_ENDING"]) &
(df["another_property_below"] == df["another_property_below_ENDING"]) &
(df["is_to_unheated_space"] == df["is_to_unheated_space_ENDING"]) &
(df["is_to_external_air"] == df["is_to_external_air_ENDING"])
]
elif component == "roof":
df = df[
(df["is_pitched"] == df["is_pitched_ENDING"]) &
(df["is_roof_room"] == df["is_roof_room_ENDING"]) &
(df["is_loft"] == df["is_loft_ENDING"]) &
(df["is_flat"] == df["is_flat_ENDING"]) &
(df["is_thatched"] == df["is_thatched_ENDING"]) &
(df["is_at_rafters"] == df["is_at_rafters_ENDING"]) &
(df["has_dwelling_above"] == df["has_dwelling_above_ENDING"])
]
# Drop the binary indicators and replace the original description with the cleaned version
# Drop original cols
original_cols = [
f"{component_upper}_DESCRIPTION_STARTING", f"{component_upper}_DESCRIPTION_ENDING"
] if component != "main-fuel" else [
f"{component_upper}_STARTING", f"{component_upper}_ENDING"
]
df = df.drop(columns=cols_to_drop[component] + original_cols)
# If we have an insulation_thickness column, rename it
if "insulation_thickness" in cleaned_lookup[cleaned_key][0]:
df = df.rename(
columns={
"insulation_thickness": f"{component}_insulation_thickness",
"insulation_thickness_ENDING": f"{component}_insulation_thickness_ENDING",
}
)
# If we have thermal transmittance, rename it
if "thermal_transmittance" in cleaned_lookup[cleaned_key][0]:
df = df.rename(
columns={
"thermal_transmittance": f"{component}_thermal_transmittance",
"thermal_transmittance_ENDING": f"{component}_thermal_transmittance_ENDING",
}
)
# If we have tarrif, rename it
if "tariff_type" in cleaned_lookup[cleaned_key][0]:
df = df.rename(
columns={
"tariff_type": f"{component}_tariff_type",
"tariff_type_ENDING": f"{component}_tariff_type_ENDING",
}
)
# We need the walls descriptions so we rename them to distinguish them
if component == "walls":
df = df.rename(
columns={
"clean_description": f"{component}_clean_description",
"clean_description_ENDING": f"{component}_clean_description_ENDING",
}
)
# We don't need any lighting specific cleaning, we just drop the original description as we use
# LOW_ENERGY_LIGHTING_STARTING, LOW_ENERGY_LIGHTING_ENDING
df = df.drop(columns=["LIGHTING_DESCRIPTION_STARTING", "LIGHTING_DESCRIPTION_ENDING"])
return df
def make_uvalues(df):
df["row_index"] = df.index
uvalues = []
# TODO: iterrows is the slowest way to do this, we should use a vectorised approach or itertuples
for index_no, x in df.iterrows():
uprn = x["uprn"]
row_index = x["row_index"]
age_band = england_wales_age_band_lookup[x["construction_age_band"]]
# ~~~~~~~~~~~~~~~~~~
# Walls
# ~~~~~~~~~~~~~~~~~~
starting_wall_uvalue = x["walls_thermal_transmittance"]
if pd.isnull(starting_wall_uvalue):
starting_wall_uvalue = get_wall_u_value(
clean_description=x["walls_clean_description"],
age_band=age_band,
is_granite_or_whinstone=x["is_granite_or_whinstone"],
is_sandstone_or_limestone=x["is_sandstone_or_limestone"],
)
ending_wall_uvalue = x["walls_thermal_transmittance_ending"]
if pd.isnull(ending_wall_uvalue):
if x["walls_clean_description"] != x["walls_clean_description_ending"]:
ending_wall_uvalue = get_wall_u_value(
clean_description=x["walls_clean_description_ending"],
age_band=age_band,
is_granite_or_whinstone=x["is_granite_or_whinstone"],
is_sandstone_or_limestone=x["is_sandstone_or_limestone"],
)
else:
ending_wall_uvalue = starting_wall_uvalue
# ~~~~~~~~~~~~~~~~~~
# Roof
# ~~~~~~~~~~~~~~~~~~
if x["has_dwelling_above"]:
if x["roof_thermal_transmittance"] != 0:
raise ValueError("Should have 0 u-value for roof")
if x["roof_thermal_transmittance_ENDING"] != 0:
raise ValueError("Should have 0 u-value for roof")
starting_roof_uvalue = x["roof_thermal_transmittance"]
if pd.isnull(starting_roof_uvalue):
starting_roof_uvalue = get_roof_u_value(
insulation_thickness=x["roof_insulation_thickness"],
has_dwelling_above=x["has_dwelling_above"],
is_loft=x["is_loft"],
is_roof_room=x["is_roof_room"],
is_thatched=x["is_thatched"],
is_flat=x["is_flat"],
is_pitched=x["is_pitched"],
is_at_rafters=x["is_at_rafters"],
age_band=age_band
)
ending_roof_uvalue = x["roof_thermal_transmittance_ENDING"]
if pd.isnull(ending_roof_uvalue):
ending_roof_uvalue = get_roof_u_value(
insulation_thickness=x["roof_insulation_thickness_ENDING"],
has_dwelling_above=x["has_dwelling_above"],
is_loft=x["is_loft"],
is_roof_room=x["is_roof_room"],
is_thatched=x["is_thatched"],
is_flat=x["is_flat"],
is_pitched=x["is_pitched"],
is_at_rafters=x["is_at_rafters"],
age_band=age_band
)
# ~~~~~~~~~~~~~~~~~~
# Floor
# ~~~~~~~~~~~~~~~~~~
perimeters = {}
for suffix in ["_STARTING", "_ENDING"]:
floor_area = x[f"TOTAL_FLOOR_AREA{suffix}"]
n_rooms = x["NUMBER_HABITABLE_ROOMS"]
perimeters[f"estimated_perimeter{suffix}"] = estimate_perimeter(floor_area, n_rooms)
floor_type = "suspended" if x["is_suspended"] else "solid"
wall_type = get_wall_type(**x)
if x["another_property_below"]:
if x["floor_thermal_transmittance"] != 0:
raise ValueError("Should have 0 u-value for floor")
if x["floor_thermal_transmittance_ENDING"] != 0:
raise ValueError("Should have 0 u-value for floor")
starting_floor_uvalue, ending_floor_uvalue = 0, 0
else:
starting_floor_uvalue = x["floor_thermal_transmittance"]
ending_floor_uvalue = x["floor_thermal_transmittance_ENDING"]
if pd.isnull(starting_floor_uvalue):
starting_floor_uvalue = get_floor_u_value(
floor_type=floor_type,
perimeter=perimeters["estimated_perimeter_STARTING"],
area=x[f"TOTAL_FLOOR_AREA_STARTING"],
insulation_thickness=x["floor_insulation_thickness"],
wall_type=wall_type,
age_band=age_band
)
if pd.isnull(ending_floor_uvalue):
ending_floor_uvalue = get_floor_u_value(
floor_type=floor_type,
perimeter=perimeters["estimated_perimeter_ENDING"],
area=x[f"TOTAL_FLOOR_AREA_ENDING"],
insulation_thickness=x["floor_insulation_thickness_ENDING"],
wall_type=wall_type,
age_band=age_band
)
uvalues.append(
{
"UPRN": uprn,
"row_index": row_index,
"starting_walls_uvalue": starting_wall_uvalue,
"ending_walls_uvalue": ending_wall_uvalue,
"starting_roof_uvalue": starting_roof_uvalue,
"ending_roof_uvalue": ending_roof_uvalue,
"starting_floor_uvalue": starting_floor_uvalue,
"ending_floor_uvalue": ending_floor_uvalue,
**perimeters
}
)
uvalues = pd.DataFrame(uvalues)
df = df.merge(
uvalues, how="left", on=["UPRN", "row_index"]
).drop(columns="row_index")
# Fill missings
for component in ["walls", "floor", "roof"]:
for suffix in ["", "_ENDING"]:
fill_col = f"starting_{component}_uvalue" if suffix == "" else f"ending_{component}_uvalue"
df[f"{component}_thermal_transmittance{suffix}"] = np.where(
pd.isnull(df[f"{component}_thermal_transmittance{suffix}"]),
df[fill_col],
df[f"{component}_thermal_transmittance{suffix}"]
)
df = df.drop(
columns=[
"starting_walls_uvalue", "ending_walls_uvalue", "starting_roof_uvalue",
"ending_roof_uvalue", "starting_floor_uvalue", "ending_floor_uvalue"
]
)
return df
# def compare_records(earliest_record: pd.Series, latest_record: pd.Series, columns: list):
# """
# For a list of columns, check if the earliest and latest record are the same
# If they are the same, we indicate this, because we have example of SAP scores changing
# without any feature changes
# :param earliest_record: pd.Series
# :param latest_record: pd.Series
# :param columns: list of columns to compare
# :return: boolean indicating whether or not all features are the same
# """
# all_equal = True
# for col in columns:
# if earliest_record[col] != latest_record[col]:
# return False
# if all_equal:
# return True
def generate_property_difference_records(epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict, all_equal_rows: list):
"""
We can use multiple types of comparison datasets, for example:
- First vs second
- Second vs third
- First vs third
:param epc_records:
:return:
"""
property_difference_records: list = []
property_difference_records, all_equal_rows = compare_consecutive_epcs(epc_records, uprn, directory, fixed_data, property_difference_records, all_equal_rows)
# property_difference_records, all_equal_rows = compare_all_permutation_epcs(epc_records, uprn, directory, fixed_data, property_difference_records, all_equal_rows)
return property_difference_records, all_equal_rows
def compare_all_permutation_epcs(epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict, property_difference_records: list, all_equal_rows: list):
"""
Compare all permutations of EPCs for a given UPRN
:param epc_records:
:return:
"""
for idx in range(0, len(epc_records) - 1):
for idx2 in range(idx + 1, len(epc_records)):
earliest_record: EPCRecord = epc_records[idx]
latest_record: EPCRecord = epc_records[idx2]
# Auto sort the records so that the record with highest RDSAP score is always record1
difference_record: EPCDifferenceRecord = latest_record - earliest_record
# TODO: Pull out RDSAP_CHANGE to a variable
if difference_record.get("rdsap_change") == 0:
continue
all_equal = difference_record.compare_fields_in_records(
fields=[x.lower() for x in CORE_COMPONENT_FEATURES]
)
if all_equal:
# Keep track of this for the moment so we can analyse
all_equal_rows.append({"uprn": uprn, "directory_name": directory.name})
continue
difference_record.append_fixed_data(fixed_data)
property_difference_records.append(difference_record)
return property_difference_records, all_equal_rows
def compare_consecutive_epcs(epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict, property_difference_records: list, all_equal_rows: list):
"""
Compare consecutive EPCs for a given UPRN
:param epc_records:
:return:
"""
for idx in range(0, len(epc_records) - 1):
if idx >= len(epc_records) - 1:
break
earliest_record: EPCRecord = epc_records[idx]
latest_record: EPCRecord = epc_records[idx + 1]
# Auto sort the records so that the record with highest RDSAP score is always record1
difference_record: EPCDifferenceRecord = latest_record - earliest_record
# TODO: Pull out RDSAP_CHANGE to a variable
if difference_record.get("rdsap_change") == 0:
continue
all_equal = difference_record.compare_fields_in_records(
fields=[x.lower() for x in CORE_COMPONENT_FEATURES]
)
if all_equal:
# Keep track of this for the moment so we can analyse
all_equal_rows.append({"uprn": uprn, "directory_name": directory.name})
continue
difference_record.append_fixed_data(fixed_data)
property_difference_records.append(difference_record)
return property_difference_records, all_equal_rows
from etl.epc.Dataset import TrainingDataset
def app():
# Get all the files in the directory
# Data glossary:
# https://epc.opendatacommunities.org/docs/guidance#glossary
cleaned_lookup = get_cleaned_description_mapping()
# List all subdirectories
directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()]
dataset = []
cleaning_dataset = []
# Keep track of the all equals
all_equal_rows = []
for directory in tqdm(directories):
filepath = directory / "certificates.csv"
epc_data_processor = EPCDataProcessor(filepath=filepath)
epc_data_processor.pre_process()
df = epc_data_processor.data
cleaning_dataset.append(epc_data_processor.cleaning_averages)
constituency_difference_records = []
for uprn, property_data in df.groupby("uprn", observed=True):
# If a property has changed building type, we can ignore the epc rating i.e. this should be 1 unique row
if any(property_data[MANDATORY_FIXED_FEATURES].nunique() > 1) or (
pd.isnull(property_data[MANDATORY_FIXED_FEATURES]).sum().sum() > 0
):
continue
# Fixed features - these are property attributes that shouldn't change over time
# Take the latest row for both the LATEST_FEILDS and MANDATORY FIELDS and combine all fields together
fixed_data = property_data[MANDATORY_FIXED_FEATURES + LATEST_FIELD].iloc[-1].to_dict()
# We include the lodgement date here as we probably need to factor time into the
# model, since EPC standards and rigour have changed over time
variable_data = property_data[VARIABLE_DATA_FEATURES]
uprn = str(uprn)
epc_records = [EPCRecord(uprn, **x, run_mode="training") for x in variable_data.to_dict(orient='records')]
# TODO: We want to be able to provide value for the u values in the main pipeline so this will need to be part of the EPCRecord
# We can use multiple types of comparison datasets - i.e. Compare consecutive records, or compare all permutations of records
property_difference_records, all_equal_rows = generate_property_difference_records(epc_records, uprn, directory, fixed_data, all_equal_rows)
constituency_difference_records.extend(property_difference_records)
constituency_dataset = TrainingDataset(datasets=constituency_difference_records, cleaned_lookup=cleaned_lookup)
constituency_dataset_df = constituency_dataset.df
dataset.append(constituency_dataset_df)
print("Final all equal count: %s" % str(len(all_equal_rows)))
# Store cleaning dataset in s3 as a parquet file
cleaning_dataset = pd.concat(cleaning_dataset)
save_dataframe_to_s3_parquet(
df=cleaning_dataset,
bucket_name="retrofit-data-dev",
file_key="sap_change_model/cleaning_dataset.parquet",
)
output = pd.concat(dataset)
# TODO: move into difference record
# Remove any records that have huge swings in their floor area
# Move this into TrainingDataset as this won't be run in newdata
# output["tfa_diff_abs"] = abs(output["TOTAL_FLOOR_AREA_ENDING"] - output["TOTAL_FLOOR_AREA_STARTING"])
# output["tfa_diff_prop"] = output["tfa_diff_abs"] / output["TOTAL_FLOOR_AREA_STARTING"]
# output = output[output["tfa_diff_prop"] < 0.5]
# output = output.drop(columns=["tfa_diff_abs", "tfa_diff_prop"])
# # TODO: move into EPCRecord record
# uvalue_columns = [col for col in output.columns if "thermal_transmittance" in col]
# for uvalue_col in uvalue_columns:
# output[uvalue_col] = pd.to_numeric(output[uvalue_col])
save_dataframe_to_s3_parquet(
df=output,
bucket_name="retrofit-data-dev",
file_key="sap_change_model/dataset.parquet",
)
# Store all_equal_rows
all_equal_rows = pd.DataFrame(all_equal_rows)
save_dataframe_to_s3_parquet(
df=all_equal_rows,
bucket_name="retrofit-data-dev",
file_key="sap_change_model/all_equal_rows.parquet",
)
from etl.epc.Pipeline import EPCPipeline
def main():
"""
Orchestration function
"""
directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()]
directories = directories[0:2]
epc_pipeline = EPCPipeline(directories=directories, epc_data_processor=EPCDataProcessor(run_mode="training"))
epc_pipeline.run()
# For testing
epc_pipeline.compiled_dataset
epc_pipeline.compiled_all_equal_rows
import pandas as pd
pd.concat(epc_pipeline.compiled_cleaning_averages)
if __name__ == "__main__":