mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
477 lines
17 KiB
Python
477 lines
17 KiB
Python
import msgpack
|
|
import pandas as pd
|
|
from datetime import datetime
|
|
|
|
from typing import List
|
|
from pathlib import Path
|
|
from tqdm import tqdm
|
|
import multiprocessing as mp
|
|
|
|
from etl.epc.DataProcessor import EPCDataProcessor
|
|
from etl.epc.Record import EPCRecord, EPCDifferenceRecord
|
|
from etl.epc.Dataset import TrainingDataset
|
|
from utils.s3 import save_dataframe_to_s3_parquet, read_from_s3
|
|
from etl.epc.settings import (
|
|
MANDATORY_FIXED_FEATURES,
|
|
LATEST_FIELD,
|
|
COMPONENT_FEATURES,
|
|
RDSAP_RESPONSE,
|
|
HEAT_DEMAND_RESPONSE,
|
|
CARBON_RESPONSE,
|
|
CORE_COMPONENT_FEATURES,
|
|
EFFICIENCY_FEATURES,
|
|
POTENTIAL_COLUMNS,
|
|
ROOM_FEATURES,
|
|
COST_FEATURES,
|
|
POST_SAP10_FEATURE,
|
|
)
|
|
|
|
# TODO: change in setting file
def _lowercase_all(names):
    """Return a new list holding the lower-cased form of every entry in ``names``."""
    return [name.lower() for name in names]


# The feature-name lists imported from etl.epc.settings are rebound here to
# lower-cased copies. ROOM_FEATURES and POST_SAP10_FEATURE are used exactly as
# imported (they are not lower-cased in this module).
MANDATORY_FIXED_FEATURES = _lowercase_all(MANDATORY_FIXED_FEATURES)
# LATEST_FIELD = [x.lower() for x in LATEST_FIELD if x.lower() not in ROOM_FEATURES]
LATEST_FIELD = _lowercase_all(LATEST_FIELD)
COMPONENT_FEATURES = _lowercase_all(COMPONENT_FEATURES)
CORE_COMPONENT_FEATURES = _lowercase_all(CORE_COMPONENT_FEATURES)
EFFICIENCY_FEATURES = _lowercase_all(EFFICIENCY_FEATURES)
POTENTIAL_COLUMNS = _lowercase_all(POTENTIAL_COLUMNS)
COST_FEATURES = _lowercase_all(COST_FEATURES)

# The three response names are single strings rather than lists.
RDSAP_RESPONSE = RDSAP_RESPONSE.lower()
HEAT_DEMAND_RESPONSE = HEAT_DEMAND_RESPONSE.lower()
CARBON_RESPONSE = CARBON_RESPONSE.lower()

# Columns read per EPC lodgement: component, room, efficiency and potential
# features, followed by the lodgement date and the three response columns.
_RESPONSE_AND_DATE_COLUMNS = [
    "lodgement_date",
    RDSAP_RESPONSE,
    HEAT_DEMAND_RESPONSE,
    CARBON_RESPONSE,
]
VARIABLE_DATA_FEATURES = (
    COMPONENT_FEATURES
    + ROOM_FEATURES
    + EFFICIENCY_FEATURES
    + POTENTIAL_COLUMNS
    + _RESPONSE_AND_DATE_COLUMNS
)
|
|
|
|
|
|
def get_cleaned_description_mapping():
    """
    Fetch the cleaned EPC description lookup from S3 and decode it.

    The object is read from the "retrofit-data-dev" bucket under the key
    "cleaned_epc_data/cleaned.bson". Despite the ".bson" suffix in the key, the
    payload is decoded as MessagePack.

    :return: The object produced by unpacking the MessagePack payload
    """
    packed_payload = read_from_s3(
        s3_file_name="cleaned_epc_data/cleaned.bson",
        bucket_name="retrofit-data-dev",
    )

    # raw=False asks msgpack to decode raw byte strings to str.
    return msgpack.unpackb(packed_payload, raw=False)
|
|
|
|
|
|
# Loaded once at import time; the temporary fixes below patch it in place.
clean_lookup = get_cleaned_description_mapping()

# TODO: THIS IS A TEMPORARY FIX
# Every walls row that already carries a thermal transmittance unit is given
# the same unit string.
new_walls_description_mapping = pd.DataFrame(clean_lookup["walls-description"])
_walls_with_unit = new_walls_description_mapping[
    "thermal_transmittance_unit"
].notnull()
new_walls_description_mapping.loc[
    _walls_with_unit, "thermal_transmittance_unit"
] = "w/m-¦k"
clean_lookup["walls-description"] = new_walls_description_mapping.to_dict(
    orient="records"
)

# TODO: THIS IS A TEMPORARY FIX
# Blank out the control columns for the two descriptions listed below.
_MAINHEATCONT_CONTROL_COLUMNS = [
    "thermostatic_control",
    "charging_system",
    "switch_system",
    "no_control",
    "dhw_control",
    "community_heating",
    "multiple_room_thermostats",
    "auxiliary_systems",
    "trvs",
    "rate_control",
]
new_mainheatcont_mapping = pd.DataFrame(clean_lookup["mainheatcont-description"])
for _placeholder_description in ("SAP:Main-Heating-Controls", " "):
    new_mainheatcont_mapping.loc[
        new_mainheatcont_mapping["original_description"] == _placeholder_description,
        _MAINHEATCONT_CONTROL_COLUMNS,
    ] = None
clean_lookup["mainheatcont-description"] = new_mainheatcont_mapping.to_dict(
    orient="records"
)

# TEMP FIX - GRANITE OR WHINSTONE BOOLEAN ISSUE
# Rows whose original description contains "Granite" are flagged as
# granite-or-whinstone. The frame is rebuilt from the already-patched lookup.
new_walls_description_mapping = pd.DataFrame(clean_lookup["walls-description"])
_mentions_granite = new_walls_description_mapping[
    "original_description"
].str.contains("Granite")
new_walls_description_mapping.loc[
    _mentions_granite, "is_granite_or_whinstone"
] = True
clean_lookup["walls-description"] = new_walls_description_mapping.to_dict(
    orient="records"
)
|
|
|
|
|
|
class EPCPipeline:
    """
    Build EPC datasets from raw certificate data.

    In "training" mode the pipeline takes a list of directories and, for each one:
        - Loads and pre-processes the data through the supplied EPCDataProcessor
        - Builds difference records between the EPCs lodged for each UPRN
        - Compiles those records into a dataset
        - Stores the dataset, the cleaning averages and the "all equal" rows in S3

    In "newdata" mode it prepares the EPC records supplied via ``api_epc_records``.
    """

    def __init__(
        self,
        epc_data_processor: EPCDataProcessor,
        api_epc_records: dict | None = None,
        directories: List[Path] | None = None,
        run_mode="training",
        epc_local_file="certificates.csv",
        epc_bucket_name="retrofit-data-dev",
        epc_cleaning_dataset_key="sap_change_model/{}/cleaning_dataset_rooms.parquet",
        epc_all_equal_rows_key="sap_change_model/{}/all_equal_rows_rooms.parquet",
        epc_compiled_dataset_key="sap_change_model/{}/dataset_rooms.parquet",
        use_parallel=False,
    ):
        """
        :param epc_data_processor: EPCDataProcessor object
        :param api_epc_records: EPC records used by the "newdata" run mode
        :param directories: List of directories to process ("training" run mode)
        :param run_mode: Either training or newdata
        :param epc_local_file: Local file name of the EPC data
        :param epc_bucket_name: S3 bucket name
        :param epc_cleaning_dataset_key: S3 key for the cleaning dataset
        :param epc_all_equal_rows_key: S3 key for the all equal rows dataset
        :param epc_compiled_dataset_key: S3 key for the compiled dataset
        :param use_parallel: Process the directories with a multiprocessing pool

        Each S3 key is a template whose ``{}`` placeholder is filled with the
        timestamp taken when the pipeline object is created.
        """
        self.compiled_dataset: pd.DataFrame = pd.DataFrame()
        self.compiled_all_equal_rows: list = []
        self.compiled_cleaning_averages: list = []

        self.directories = directories
        self.epc_data_processor = epc_data_processor
        self.api_epc_records = api_epc_records
        self.run_mode = run_mode
        self.epc_local_file = epc_local_file
        self.epc_bucket_name = epc_bucket_name

        self.use_parallel = use_parallel
        self.timeprefix = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")

        self.epc_cleaning_dataset_key = epc_cleaning_dataset_key.format(self.timeprefix)
        self.epc_all_equal_rows_key = epc_all_equal_rows_key.format(self.timeprefix)
        self.epc_compiled_dataset_key = epc_compiled_dataset_key.format(self.timeprefix)

    def run(self):
        """
        Entrypoint to run the pipeline

        :raises ValueError: If ``run_mode`` is neither "training" nor "newdata"
        """
        if self.run_mode == "training":
            self.run_training_dataset_pipeline()
        elif self.run_mode == "newdata":
            self.run_newdata_dataset_pipeline()
        else:
            raise ValueError("Run mode defined needs to be in 'training' or 'newdata'")

    def run_newdata_dataset_pipeline(self):
        """
        Main function to run the newdata pipeline

        The API records are wrapped in a single EPCRecord, inserted into the data
        processor and prepared; one EPCRecord is then built per prepared row.

        :return: List of EPCRecord objects built from the prepared rows. The list
            used to be built and discarded; returning it is backward compatible
            because the method previously returned None.
        """
        prepared_epc = EPCRecord(
            self.api_epc_records, run_mode="newdata"
        )  # This uses all the epc records to clean the data

        self.epc_data_processor.insert_data(prepared_epc)
        self.epc_data_processor.prepare_data()

        data = self.epc_data_processor.data

        return [
            EPCRecord(**x, run_mode="newdata") for x in data.to_dict(orient="records")
        ]

    def run_training_dataset_pipeline(self):
        """
        Main function to run the training dataset generation pipeline

        Processes every directory (sequentially or in parallel, depending on
        ``use_parallel``) and then stores three parquet files in S3: the compiled
        dataset, the all equal rows and the cleaning averages.

        :raises ValueError: If no directories were supplied (None or empty)
        """
        # An empty list is rejected up front: it would otherwise write two empty
        # files and then fail when concatenating the (empty) cleaning averages.
        if not self.directories:
            raise ValueError(
                "Directories not specified - Unable to run Training pipeline"
            )

        if self.use_parallel:
            self.run_training_dataset_parallel_pipeline()
        else:
            for directory in tqdm(self.directories):
                self.process_directory(directory)

        save_dataframe_to_s3_parquet(
            df=self.compiled_dataset,
            bucket_name=self.epc_bucket_name,
            file_key=self.epc_compiled_dataset_key,
        )

        save_dataframe_to_s3_parquet(
            df=pd.DataFrame(self.compiled_all_equal_rows),
            bucket_name=self.epc_bucket_name,
            file_key=self.epc_all_equal_rows_key,
        )

        save_dataframe_to_s3_parquet(
            df=pd.concat(self.compiled_cleaning_averages),
            bucket_name=self.epc_bucket_name,
            file_key=self.epc_cleaning_dataset_key,
        )

    def run_training_dataset_parallel_pipeline(self):
        """
        Run the training pipeline in parallel

        Each directory is handed to a worker of a multiprocessing pool; the
        per-directory results are then merged into this object's compiled
        dataset, cleaning averages and all equal rows.
        """
        with mp.Pool() as pool:
            results = list(
                tqdm(
                    pool.imap(self.process_directory_task, self.directories),
                    total=len(self.directories),
                ),
            )

        # Collect the per-directory frames and concatenate once, instead of
        # re-copying the growing dataset for every result.
        datasets = [self.compiled_dataset]
        for result in tqdm(results):
            datasets.append(result["dataset"])
            self.compiled_cleaning_averages.append(result["cleaning_averages"])
            self.compiled_all_equal_rows.extend(result["all_equal_rows"])

        self.compiled_dataset = pd.concat(datasets)

    def process_directory_task(self, directory: Path) -> dict:
        """
        Task to enable parallel processing

        :param directory: Directory to process
        :return: dict with the keys "dataset", "cleaning_averages" and
            "all_equal_rows" taken from this object after processing the directory
        """
        self.process_directory(directory=directory)

        return {
            "dataset": self.compiled_dataset,
            "cleaning_averages": self.epc_data_processor.cleaning_averages,
            "all_equal_rows": self.compiled_all_equal_rows,
        }

    def process_directory(self, directory: Path):
        """
        Process a single directory

        Prepares the EPC file found in the directory, builds the difference
        records for every UPRN group and appends the resulting dataset to
        ``self.compiled_dataset``.

        :param directory: Directory containing the local EPC file
        :return: None
        """
        filepath = directory / self.epc_local_file

        self.epc_data_processor.prepare_data(filepath=filepath)

        constituency_data = self.epc_data_processor.data

        self.compiled_cleaning_averages.append(
            self.epc_data_processor.cleaning_averages
        )

        constituency_difference_records = []

        for uprn, property_data in constituency_data.groupby("uprn", observed=True):
            difference_records = self.process_uprn(
                uprn=str(uprn), property_data=property_data, directory=directory
            )
            # process_uprn returns None for properties that are skipped
            if difference_records is not None:
                constituency_difference_records.extend(difference_records)

        constituency_dataset = TrainingDataset(
            datasets=constituency_difference_records, cleaned_lookup=clean_lookup
        )

        self.compiled_dataset = pd.concat(
            [self.compiled_dataset, constituency_dataset.df]
        )

    def process_uprn(self, uprn: str, property_data: pd.DataFrame, directory: Path):
        """
        Process a single UPRN, which may have multiple different EPCs

        :param uprn: UPRN
        :param property_data: pd.DataFrame, Data for a single UPRN
        :param directory: Path, Directory of the UPRN
        :return: List of difference records, or None when the property is skipped
        """
        # If a property has changed building type, we can ignore the epc rating i.e. this should be 1 unique row.
        # Properties with a missing mandatory fixed feature are skipped as well.
        mandatory_fixed = property_data[MANDATORY_FIXED_FEATURES]
        if (mandatory_fixed.nunique() > 1).any() or mandatory_fixed.isnull().any().any():
            return None

        # Fixed features - these are property attributes that shouldn't change over time
        # Take the last row for both the LATEST_FIELD and MANDATORY fields and combine all fields together
        # NOTE(review): "last row" is only the latest EPC if property_data is ordered by date - confirm upstream
        fixed_data = (
            property_data[MANDATORY_FIXED_FEATURES + LATEST_FIELD].iloc[-1].to_dict()
        )

        # We include the lodgement date here as we probably need to factor time into the
        # model, since EPC standards and rigour have changed over time
        variable_data = property_data[
            VARIABLE_DATA_FEATURES + COST_FEATURES + POST_SAP10_FEATURE
        ]

        uprn = str(uprn)
        epc_records = [
            EPCRecord(uprn, **x, run_mode="training")
            for x in variable_data.to_dict(orient="records")
        ]

        # TODO: We want to be able to provide value for the u values in the main pipeline so this will need to be
        #  part of the EPCRecord

        # We can use multiple types of comparison datasets - i.e. Compare consecutive records, or compare all
        # permutations of records
        return self._generate_property_difference_records(
            epc_records, uprn, directory, fixed_data
        )

    def _generate_property_difference_records(
        self, epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict
    ):
        """
        We can use multiple types of comparison datasets, for example:
        - First vs second
        - Second vs third
        - First vs third

        Currently every pair of records is compared (see
        ``_compare_all_permutation_epcs``); ``_compare_consecutive_epcs`` is the
        alternative strategy.

        :param epc_records: EPC records of a single UPRN
        :param uprn: UPRN
        :param directory: Directory the UPRN was read from
        :param fixed_data: Fixed features of the property
        :return: List of difference records
        """
        return self._compare_all_permutation_epcs(
            epc_records, uprn, directory, fixed_data, []
        )

    def _keep_difference_record(
        self, difference_record: EPCDifferenceRecord, uprn: str, directory: Path
    ) -> bool:
        """
        Decide whether a difference record goes into the training data

        A record is dropped when its "rdsap_change" is 0 and
        ``ensure_adequate_data()`` is falsy, or when all core component fields
        compare equal. In the latter case the UPRN and directory name are
        remembered in ``self.compiled_all_equal_rows``.

        NOTE(review): the source this was rebuilt from had lost its indentation.
        The "all equal" check is applied to every pair here, not only to pairs
        whose RdSAP score is unchanged - confirm against the upstream file.

        :return: True if the record should be kept
        """
        # TODO: Pull out RDSAP_CHANGE to a variable
        if (
            difference_record.get("rdsap_change") == 0
            and not difference_record.ensure_adequate_data()
        ):
            return False

        # CORE_COMPONENT_FEATURES is already lower-cased at module level
        if difference_record.compare_fields_in_records(fields=CORE_COMPONENT_FEATURES):
            # Keep track of this for the moment so we can analyse
            self.compiled_all_equal_rows.append(
                {"uprn": uprn, "directory_name": directory.name}
            )
            return False

        return True

    def _compare_all_permutation_epcs(
        self,
        epc_records: List[EPCRecord],
        uprn: str,
        directory: Path,
        fixed_data: dict,
        property_difference_records: list,
    ):
        """
        Compare every pair of EPCs for a given UPRN

        Each record is paired with every record that follows it in
        ``epc_records``, so each unordered pair is visited exactly once.

        :param epc_records: EPC records of a single UPRN
        :param uprn: UPRN
        :param directory: Directory the UPRN was read from
        :param fixed_data: Fixed features attached to every difference record
        :param property_difference_records: List the kept records are appended to
        :return: ``property_difference_records``
        """
        for position, earliest_record in enumerate(epc_records):
            for latest_record in epc_records[position + 1:]:
                # NOTE(review): an earlier comment said the records are auto-sorted so the record with
                # the highest RDSAP score is always record1 - that is not visible here, confirm in EPCRecord
                difference_record: EPCDifferenceRecord = (
                    latest_record.create_EPCDifferenceRecord(
                        other=earliest_record, fixed_data=fixed_data
                    )
                )

                if self._keep_difference_record(difference_record, uprn, directory):
                    property_difference_records.append(difference_record)

        return property_difference_records

    def _compare_consecutive_epcs(
        self,
        epc_records: List[EPCRecord],
        uprn: str,
        directory: Path,
        fixed_data: dict,
        property_difference_records: list,
    ):
        """
        Compare consecutive EPCs for a given UPRN

        :param epc_records: EPC records of a single UPRN
        :param uprn: UPRN
        :param directory: Directory the UPRN was read from
        :param fixed_data: Fixed features attached to every difference record
        :param property_difference_records: List the kept records are appended to
        :return: ``property_difference_records``
        """
        # zip stops at the shorter input, so this yields (0, 1), (1, 2), ...
        for earliest_record, latest_record in zip(epc_records, epc_records[1:]):
            difference_record: EPCDifferenceRecord = latest_record - earliest_record
            # TODO: Use create_EPCDifferenceRecord instead of overloading operator
            difference_record.append_fixed_data(fixed_data)

            if self._keep_difference_record(difference_record, uprn, directory):
                property_difference_records.append(difference_record)

        return property_difference_records
|