Model/etl/epc/Pipeline.py
2025-11-02 09:44:41 +00:00

477 lines
17 KiB
Python

import msgpack
import pandas as pd
from datetime import datetime
from typing import List
from pathlib import Path
from tqdm import tqdm
import multiprocessing as mp
from etl.epc.DataProcessor import EPCDataProcessor
from etl.epc.Record import EPCRecord, EPCDifferenceRecord
from etl.epc.Dataset import TrainingDataset
from utils.s3 import save_dataframe_to_s3_parquet, read_from_s3
from etl.epc.settings import (
MANDATORY_FIXED_FEATURES,
LATEST_FIELD,
COMPONENT_FEATURES,
RDSAP_RESPONSE,
HEAT_DEMAND_RESPONSE,
CARBON_RESPONSE,
CORE_COMPONENT_FEATURES,
EFFICIENCY_FEATURES,
POTENTIAL_COLUMNS,
ROOM_FEATURES,
COST_FEATURES,
POST_SAP10_FEATURE,
)
# TODO: change in setting file
# The names imported from etl.epc.settings are re-bound here to lower-cased copies.
# ROOM_FEATURES and POST_SAP10_FEATURE are used exactly as imported.
def _lowercase_all(names):
    """Return a new list holding the lower-cased form of every entry in ``names``."""
    return [name.lower() for name in names]


MANDATORY_FIXED_FEATURES = _lowercase_all(MANDATORY_FIXED_FEATURES)
# LATEST_FIELD = [x.lower() for x in LATEST_FIELD if x.lower() not in ROOM_FEATURES]
LATEST_FIELD = _lowercase_all(LATEST_FIELD)
COMPONENT_FEATURES = _lowercase_all(COMPONENT_FEATURES)
CORE_COMPONENT_FEATURES = _lowercase_all(CORE_COMPONENT_FEATURES)
EFFICIENCY_FEATURES = _lowercase_all(EFFICIENCY_FEATURES)
POTENTIAL_COLUMNS = _lowercase_all(POTENTIAL_COLUMNS)
COST_FEATURES = _lowercase_all(COST_FEATURES)

# The three response names are single strings rather than lists.
RDSAP_RESPONSE = RDSAP_RESPONSE.lower()
HEAT_DEMAND_RESPONSE = HEAT_DEMAND_RESPONSE.lower()
CARBON_RESPONSE = CARBON_RESPONSE.lower()

# Columns selected per EPC row in EPCPipeline.process_uprn (together with
# COST_FEATURES and POST_SAP10_FEATURE).
VARIABLE_DATA_FEATURES = (
    COMPONENT_FEATURES
    + ROOM_FEATURES
    + EFFICIENCY_FEATURES
    + POTENTIAL_COLUMNS
    + ["lodgement_date", RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE, CARBON_RESPONSE]
)
def get_cleaned_description_mapping(
    s3_file_name="cleaned_epc_data/cleaned.bson", bucket_name="retrofit-data-dev"
):
    """
    Retrieve the cleaned-description lookup for the EPC dataset from S3.

    The object is stored in MessagePack format and is decoded with
    ``msgpack.unpackb`` (note that the default key carries a ``.bson`` suffix
    even though the payload is unpacked as MessagePack).

    :param s3_file_name: S3 key of the packed lookup; defaults to the key this
        module has always read
    :param bucket_name: S3 bucket holding the lookup; defaults to the bucket this
        module has always read
    :return: The decoded lookup object (``raw=False``, so packed raw values are
        decoded to ``str`` rather than returned as ``bytes``)
    """
    packed = read_from_s3(s3_file_name=s3_file_name, bucket_name=bucket_name)
    return msgpack.unpackb(packed, raw=False)
# NOTE: this runs at import time and therefore reads from S3 whenever the module
# is imported.
clean_lookup = get_cleaned_description_mapping()

# TODO: THIS IS A TEMPORARY FIX
# Walls lookup, patched in a single pass:
#   1. every populated thermal-transmittance unit is overwritten with one literal;
#   2. TEMP FIX - GRANITE OR WHINSTONE BOOLEAN ISSUE: rows whose original
#      description contains "Granite" get is_granite_or_whinstone forced to True.
new_walls_description_mapping = pd.DataFrame(clean_lookup["walls-description"])
# NOTE(review): the literal below looks mis-encoded, but it is a runtime value that
# other code may match on, so it is kept byte-for-byte - confirm before changing.
new_walls_description_mapping.loc[
    new_walls_description_mapping["thermal_transmittance_unit"].notnull(),
    "thermal_transmittance_unit",
] = "w/m-¦k"
# na=False: a missing description yields NaN from str.contains, and a boolean mask
# containing NaN makes .loc raise instead of simply skipping that row.
new_walls_description_mapping.loc[
    new_walls_description_mapping["original_description"].str.contains(
        "Granite", na=False
    ),
    "is_granite_or_whinstone",
] = True
clean_lookup["walls-description"] = new_walls_description_mapping.to_dict(
    orient="records"
)

# TODO: THIS IS A TEMPORARY FIX
# Main-heating-controls lookup: these two original descriptions have every
# control flag blanked out.
_MAINHEATCONT_PLACEHOLDER_DESCRIPTIONS = ["SAP:Main-Heating-Controls", " "]
_MAINHEATCONT_CONTROL_COLUMNS = [
    "thermostatic_control",
    "charging_system",
    "switch_system",
    "no_control",
    "dhw_control",
    "community_heating",
    "multiple_room_thermostats",
    "auxiliary_systems",
    "trvs",
    "rate_control",
]
new_mainheatcont_mapping = pd.DataFrame(clean_lookup["mainheatcont-description"])
new_mainheatcont_mapping.loc[
    new_mainheatcont_mapping["original_description"].isin(
        _MAINHEATCONT_PLACEHOLDER_DESCRIPTIONS
    ),
    _MAINHEATCONT_CONTROL_COLUMNS,
] = None
clean_lookup["mainheatcont-description"] = new_mainheatcont_mapping.to_dict(
    orient="records"
)
class EPCPipeline:
"""
This class will take a list of directories and process them to create a dataset:
- Load the data
- Pre-process the data
- Create a dataset
- Clean the dataset
- Store the dataset
"""
def __init__(
self,
epc_data_processor: EPCDataProcessor,
api_epc_records: dict = None,
directories: List[Path] | None = None,
run_mode="training",
epc_local_file="certificates.csv",
epc_bucket_name="retrofit-data-dev",
epc_cleaning_dataset_key="sap_change_model/{}/cleaning_dataset_rooms.parquet",
epc_all_equal_rows_key="sap_change_model/{}/all_equal_rows_rooms.parquet",
epc_compiled_dataset_key="sap_change_model/{}/dataset_rooms.parquet",
use_parallel=False,
):
"""
:param directories: List of directories to process
:param epc_data_processor: EPCDataProcessor object
:param run_mode: Either training or newdata
:param epc_local_file: Local file name of the EPC data
:param epc_bucket_name: S3 bucket name
:param epc_cleaning_dataset_key: S3 key for the cleaning dataset
:param epc_all_equal_rows_key: S3 key for the all equal rows dataset
:param epc_compiled_dataset_key: S3 key for the compiled dataset
"""
self.compiled_dataset: pd.DataFrame = pd.DataFrame()
self.compiled_all_equal_rows: list = []
self.compiled_cleaning_averages: list = []
self.directories = directories
self.epc_data_processor = epc_data_processor
self.api_epc_records = api_epc_records
self.run_mode = run_mode
self.epc_local_file = epc_local_file
self.epc_bucket_name = epc_bucket_name
self.use_parallel = use_parallel
self.timeprefix = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
self.epc_cleaning_dataset_key = epc_cleaning_dataset_key.format(self.timeprefix)
self.epc_all_equal_rows_key = epc_all_equal_rows_key.format(self.timeprefix)
self.epc_compiled_dataset_key = epc_compiled_dataset_key.format(self.timeprefix)
def run(self):
"""
Entrypoint to run the pipeline
"""
if self.run_mode == "training":
self.run_training_dataset_pipeline()
elif self.run_mode == "newdata":
self.run_newdata_dataset_pipeline()
else:
raise ValueError("Run mode defined needs to be in 'training' or 'newdata'")
def run_newdata_dataset_pipeline(self):
"""
Main function to run the newdata pipeline
"""
prepared_epc = EPCRecord(
self.api_epc_records, run_mode="newdata"
) # This uses all the epc records to clean the data
self.epc_data_processor.insert_data(prepared_epc)
self.epc_data_processor.prepare_data()
data = self.epc_data_processor.data
epc_records = [
EPCRecord(**x, run_mode="newdata") for x in data.to_dict(orient="records")
]
def run_training_dataset_pipeline(self):
"""
Main function to run the training dataset generation pipeline
"""
if self.directories is None:
raise ValueError(
"Directories not specified - Unable to run Training pipeline"
)
if self.use_parallel:
self.run_training_dataset_parallel_pipeline()
else:
for directory in tqdm(self.directories):
self.process_directory(directory)
save_dataframe_to_s3_parquet(
df=self.compiled_dataset,
bucket_name=self.epc_bucket_name,
file_key=self.epc_compiled_dataset_key,
)
save_dataframe_to_s3_parquet(
df=pd.DataFrame(self.compiled_all_equal_rows),
bucket_name=self.epc_bucket_name,
file_key=self.epc_all_equal_rows_key,
)
save_dataframe_to_s3_parquet(
df=pd.concat(self.compiled_cleaning_averages),
bucket_name=self.epc_bucket_name,
file_key=self.epc_cleaning_dataset_key,
)
def run_training_dataset_parallel_pipeline(self):
"""
Run the training pipeline in parallel
"""
with mp.Pool() as pool:
results = list(
tqdm(
pool.imap(self.process_directory_task, self.directories),
total=len(self.directories),
),
)
for result in tqdm(results):
self.compiled_dataset = pd.concat(
[self.compiled_dataset, result["dataset"]]
)
self.compiled_cleaning_averages.append(result["cleaning_averages"])
self.compiled_all_equal_rows.extend(result["all_equal_rows"])
def process_directory_task(self, directory: str) -> pd.DataFrame:
"""
Task to enable parallel processing
"""
self.process_directory(directory=directory)
output = {
"dataset": self.compiled_dataset,
"cleaning_averages": self.epc_data_processor.cleaning_averages,
"all_equal_rows": self.compiled_all_equal_rows,
}
return output
def process_directory(self, directory: Path):
"""
Process a single directory
:param directory:
:return:
"""
filepath = directory / self.epc_local_file
self.epc_data_processor.prepare_data(filepath=filepath)
constituency_data = self.epc_data_processor.data
self.compiled_cleaning_averages.append(
self.epc_data_processor.cleaning_averages
)
constituency_difference_records = []
for uprn, property_data in constituency_data.groupby("uprn", observed=True):
difference_records = self.process_uprn(
uprn=str(uprn), property_data=property_data, directory=directory
)
if difference_records is not None:
constituency_difference_records.extend(difference_records)
constituency_dataset = TrainingDataset(
datasets=constituency_difference_records, cleaned_lookup=clean_lookup
)
self.compiled_dataset = pd.concat(
[self.compiled_dataset, constituency_dataset.df]
)
def process_uprn(self, uprn: str, property_data: pd.DataFrame, directory: Path):
"""
Process a single UPRN, which may have multiple different EPCs
:param uprn: UPRN
:param property_data: pd.DataFrame, Data for a single UPRN
:param directory: Path, Directory of the UPRN
:return:
"""
# If a property has changed building type, we can ignore the epc rating i.e. this should be 1 unique row
if any(property_data[MANDATORY_FIXED_FEATURES].nunique() > 1) or (
pd.isnull(property_data[MANDATORY_FIXED_FEATURES]).sum().sum() > 0
):
return None
# Fixed features - these are property attributes that shouldn't change over time
# Take the latest row for both the LATEST_FEILDS and MANDATORY FIELDS and combine all fields together
fixed_data = (
property_data[MANDATORY_FIXED_FEATURES + LATEST_FIELD].iloc[-1].to_dict()
)
# We include the lodgement date here as we probably need to factor time into the
# model, since EPC standards and rigour have changed over time
variable_data = property_data[
VARIABLE_DATA_FEATURES + COST_FEATURES + POST_SAP10_FEATURE
]
uprn = str(uprn)
epc_records = [
EPCRecord(uprn, **x, run_mode="training")
for x in variable_data.to_dict(orient="records")
]
# TODO: We want to be able to provide value for the u values in the main pipeline so this will need to be
# part of the EPCRecord
# We can use multiple types of comparison datasets - i.e. Compare consecutive records, or compare all
# permutations of records
property_difference_records = self._generate_property_difference_records(
epc_records, uprn, directory, fixed_data
)
return property_difference_records
def _generate_property_difference_records(
self, epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict
):
"""
We can use multiple types of comparison datasets, for example:
- First vs second
- Second vs third
- First vs third
:param epc_records:
:return:
"""
property_difference_records: list = []
# property_difference_records = self._compare_consecutive_epcs(epc_records, uprn, directory, fixed_data,
# property_difference_records)
property_difference_records = self._compare_all_permutation_epcs(
epc_records, uprn, directory, fixed_data, property_difference_records
)
return property_difference_records
def _compare_all_permutation_epcs(
self,
epc_records: List[EPCRecord],
uprn: str,
directory: Path,
fixed_data: dict,
property_difference_records: list,
):
"""
Compare all permutations of EPCs for a given UPRN
:param epc_records:
:return:
"""
for idx in range(0, len(epc_records) - 1):
for idx2 in range(idx + 1, len(epc_records)):
earliest_record: EPCRecord = epc_records[idx]
latest_record: EPCRecord = epc_records[idx2]
# Auto sort the records so that the record with highest RDSAP score is always record1
difference_record: EPCDifferenceRecord = (
latest_record.create_EPCDifferenceRecord(
other=earliest_record, fixed_data=fixed_data
)
)
# difference_record: EPCDifferenceRecord = latest_record - earliest_record
# # TODO: Use method above instead of overloading operator
# difference_record.append_fixed_data(fixed_data)
# TODO: Pull out RDSAP_CHANGE to a variable
if difference_record.get("rdsap_change") == 0:
if not difference_record.ensure_adequate_data():
# Rdsap hasn't changed but we have enough data to use this record
# i.e. all fields aside from mechnical ventilation are the same]
# self.check_records.append({"uprn": uprn, "directory_name": directory.name,
# "difference_record": difference_record, "earliest_record": earliest_record,
# "latest_record": latest_record})
continue
all_equal = difference_record.compare_fields_in_records(
fields=[x.lower() for x in CORE_COMPONENT_FEATURES]
)
if all_equal:
# Keep track of this for the moment so we can analyse
self.compiled_all_equal_rows.append(
{"uprn": uprn, "directory_name": directory.name}
)
continue
property_difference_records.append(difference_record)
return property_difference_records
def _compare_consecutive_epcs(
self,
epc_records: List[EPCRecord],
uprn: str,
directory: Path,
fixed_data: dict,
property_difference_records: list,
):
"""
Compare consecutive EPCs for a given UPRN
:param epc_records:
:return:
"""
for idx in range(0, len(epc_records) - 1):
if idx >= len(epc_records) - 1:
break
earliest_record: EPCRecord = epc_records[idx]
latest_record: EPCRecord = epc_records[idx + 1]
# Auto sort the records so that the record with highest RDSAP score is always record1
difference_record: EPCDifferenceRecord = latest_record - earliest_record
# TODO: Use method above instead of overloading operator
difference_record.append_fixed_data(fixed_data)
# TODO: Pull out RDSAP_CHANGE to a variable
if difference_record.get("rdsap_change") == 0:
if not difference_record.ensure_adequate_data():
# Rdsap hasn't changed but we have enough data to use this record
# i.e. all fields aside from mechnical ventilation are the same]
# self.check_records.append({"uprn": uprn, "directory_name": directory.name, "difference_record":
# difference_record, "earliest_record": earliest_record, "latest_record": latest_record})
continue
all_equal = difference_record.compare_fields_in_records(
fields=[x.lower() for x in CORE_COMPONENT_FEATURES]
)
if all_equal:
# Keep track of this for the moment so we can analyse
self.compiled_all_equal_rows.append(
{"uprn": uprn, "directory_name": directory.name}
)
continue
# difference_record.append_fixed_data(fixed_data)
property_difference_records.append(difference_record)
return property_difference_records