From aa998b3b71e2791aaca058d83c99c98a15c0dd72 Mon Sep 17 00:00:00 2001 From: Michael Duong Date: Fri, 8 Dec 2023 20:07:48 +0000 Subject: [PATCH] add notes --- backend/app/plan/router.py | 38 ++++---- etl/epc/DataProcessor.py | 140 ++++++++++++++++++++++++++++- etl/epc/Dataset.py | 47 ++++++++-- etl/epc/EPCRecord.py | 39 +++++++- etl/epc/ValidationConfiguration.py | 4 + etl/epc/property_change_app.py | 60 ++++++++++++- 6 files changed, 296 insertions(+), 32 deletions(-) diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py index b5acb3c0..75b42ff1 100644 --- a/backend/app/plan/router.py +++ b/backend/app/plan/router.py @@ -145,27 +145,31 @@ async def trigger_plan(body: PlanTriggerRequest): recommendations[p.id] = property_recommendations + # TODO: p.get_model_data() -> EPCRecord + # Finally, we'll prepare data for predicting the impact on SAP - data_processor = DataProcessor(None, newdata=True) - data_processor.insert_data(pd.DataFrame([p.get_model_data()])) - # TODO: Temp - if data_processor.data["UPRN"].values[0] == "": - data_processor.data["UPRN"] = 0 + # data_processor = DataProcessor(None, newdata=True) + # data_processor.insert_data(pd.DataFrame([p.get_model_data()])) + # # TODO: Temp + # if data_processor.data["UPRN"].values[0] == "": + # data_processor.data["UPRN"] = 0 - data_processor.pre_process() + # data_processor.pre_process() - starting_epc_data = data_processor.get_component_features(suffix="_STARTING") - ending_epc_data = data_processor.get_component_features(suffix="_ENDING") - fixed_data = data_processor.get_fixed_features() + # starting_epc_data = data_processor.get_component_features(suffix="_STARTING") + # ending_epc_data = data_processor.get_component_features(suffix="_ENDING") + # fixed_data = data_processor.get_fixed_features() - # We update the ending record with the recommended updates and we set lodgement date to today - ending_epc_data["DAYS_TO_ENDING"] = data_processor.calculate_days_to(created_at) + # # We update the 
ending record with the recommended updates and we set lodgement date to today + # ending_epc_data["DAYS_TO_ENDING"] = data_processor.calculate_days_to(created_at) - property_scoring_data[p.id] = { - "starting_epc_data": starting_epc_data, - "ending_epc_data": ending_epc_data, - "fixed_data": fixed_data - } + # TODO: EPCRecord - AdjustedEPCRecord + + # property_scoring_data[p.id] = { + # "starting_epc_data": starting_epc_data, + # "ending_epc_data": ending_epc_data, + # "fixed_data": fixed_data + # } for recommendations_by_type in property_recommendations: for i, rec in enumerate(recommendations_by_type): @@ -180,7 +184,7 @@ async def trigger_plan(body: PlanTriggerRequest): recommendations_scoring_data.append(scoring_dict) # cleanup - del data_processor + # del data_processor logger.info("Preparing data for scoring in sap change api") recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data) diff --git a/etl/epc/DataProcessor.py b/etl/epc/DataProcessor.py index 9fc66bab..5bb54088 100644 --- a/etl/epc/DataProcessor.py +++ b/etl/epc/DataProcessor.py @@ -67,6 +67,106 @@ class DataProcessor: Handle data loading and data preprocessing """ + training_pipeline = { + "load_data": { + "function": "load_data", + "args": [], + "kwargs": {"low_memory": DATA_PROCESSOR_SETTINGS["low_memory"]}, + }, + "confine_data": { + "function": "confine_data", + "args": [], + "kwargs": {}, + }, + "remap_columns": { + "function": "remap_columns", + "args": [], + "kwargs": {}, + }, + "standardise_construction_age_band": { + "function": "standardise_construction_age_band", + "args": [], + "kwargs": {}, + }, + "clean_missing_rooms": { + "function": "clean_missing_rooms", + "args": [], + "kwargs": {}, + }, + "recast_df_columns": { + "function": "recast_df_columns", + "args": [], + "kwargs": {"column_mappings": DATA_PROCESSOR_SETTINGS["column_mappings"]}, + }, + "clean_multi_glaze_proportion": { + "function": "clean_multi_glaze_proportion", + "args": [], + "kwargs": {}, + }, + 
"clean_photo_supply": { + "function": "clean_photo_supply", + "args": [], + "kwargs": {}, + }, + "retain_multiple_epc_properties": { + "function": "retain_multiple_epc_properties", + "args": [], + "kwargs": {"epc_minimum_count": DATA_PROCESSOR_SETTINGS["epc_minimum_count"]}, + }, + "fill_na_fields": { + "function": "fill_na_fields", + "args": [], + "kwargs": {"columns_to_fill": COLUMNS_TO_MERGE_ON}, + }, + "cleaning_averages": { + "function": "make_cleaning_averages", + "args": [], + "kwargs": {}, + }, + "apply_averages_cleaning": { + "function": "apply_averages_cleaning", + "args": [], + "kwargs": { + "data_to_clean": "data", + "cleaning_data": "cleaning_averages", + "cols_to_merge_on": COLUMNS_TO_MERGE_ON, + }, + }, + "na_remapping": { + "function": "na_remapping", + "args": [], + "kwargs": {}, + }, + } + + newdata_pipeline = { + "remap_columns": { + "function": "remap_columns", + "args": [], + "kwargs": {}, + }, + "recast_df_columns": { + "function": "recast_df_columns", + "args": [], + "kwargs": {"column_mappings": DATA_PROCESSOR_SETTINGS["column_mappings"]}, + }, + "clean_multi_glaze_proportion": { + "function": "clean_multi_glaze_proportion", + "args": [], + "kwargs": {}, + }, + "clean_photo_supply": { + "function": "clean_photo_supply", + "args": [], + "kwargs": {}, + }, + "na_remapping": { + "function": "na_remapping", + "args": [], + "kwargs": {}, + }, + } + def __init__(self, filepath: Path | None, is_newdata: bool = False) -> None: """ :param filepath: If specified, is the physical location of the data @@ -74,11 +174,47 @@ class DataProcessor: In this instance, there are some operations we do not want to perform, such as confine_data() """ + self.data : pd.DataFrame = None + self.cleaning_averages : pd.DataFrame = None + self.filepath = filepath - self.data = None - self.cleaning_averages = None + self.pipeline_steps = self.pipeline_factory("newdata" if is_newdata else "training") self.is_newdata = is_newdata + 
self.load_data(low_memory=DATA_PROCESSOR_SETTINGS["low_memory"]) + + def pre_process_pipeline(self) -> None: + """ + For the pipeline_steps, we apply each function in turn + """ + + for step in self.pipeline_steps: + step_function = getattr(self, self.pipeline_steps[step]["function"]) + step_args = self.pipeline_steps[step]["args"] + step_kwargs = self.pipeline_steps[step]["kwargs"] + + if step_args: + step_function(*step_args, **step_kwargs) + else: + step_function(**step_kwargs) + + + def pipeline_factory(self, pipeline_type: str) -> dict: + """ + Return the pipeline definition ("training" or "newdata") for the given pipeline type + """ + + pipelines = { + "training": self.training_pipeline, + "newdata": self.newdata_pipeline, + } + + if pipeline_type not in pipelines: + raise ValueError("Pipeline type specified is not in factory") + + return pipelines[pipeline_type] + + def load_data(self, low_memory=False) -> None: if not self.filepath: raise ValueError("No filepath specified") diff --git a/etl/epc/Dataset.py b/etl/epc/Dataset.py index 66715083..3c38eaa6 100644 --- a/etl/epc/Dataset.py +++ b/etl/epc/Dataset.py @@ -1,22 +1,50 @@ import pandas as pd from typing import List from etl.epc.EPCRecord import EPCDifferenceRecord +from ValidationConfiguration import DatasetValidationConfiguration +from etl.epc.settings import EARLIEST_EPC_DATE -class TrainingDataset: + +class BaseDataset: + """ + Base class for all datasets + """ + + def __init__(self) -> None: + self.pipeline_steps = {} + + def validate_dataset(self): + """ + Validate the dataset against the validation configuration + """ + self.dataset_validation: dict = DatasetValidationConfiguration + + def pipeline_factory(self, pipeline_type: str) -> dict: + """ + Factory method for creating a pipeline + """ + if pipeline_type not in self.pipeline_steps: + raise ValueError(f"Pipeline type {pipeline_type} not found") + + return self.pipeline_steps[pipeline_type] + +class TrainingDataset(BaseDataset): """ A collection of EPCDifferenceRecords can be combined into a 
TrainingDataset. """ def __init__(self, datasets: List[EPCDifferenceRecord]) -> None: + self.pipeline_steps = self.pipeline_factory("training") self.datasets = datasets self.df = pd.DataFrame([dataset.difference_record for dataset in datasets]) + # TODO: replace these with the pipeline steps self._feature_generation() self._drop_features() - self._clean_dataframe() + # self._clean_dataframe() self._clean_efficiency_variables() self._null_validation(information="Clean Efficiency Variables") - self._process_and_prune() + # self._process_and_prune() self._clean_missing_values() self._null_validation(information="Clean Missing Values") @@ -56,7 +84,7 @@ class TrainingDataset: self.df["DAYS_TO_STARTING"] = self._calculate_days_to(self.df["LODGEMENT_DATE_STARTING"]) self.df["DAYS_TO_ENDING"] = self._calculate_days_to(self.df["LODGEMENT_DATE_ENDING"]) - def _clean_efficiency_variables(self, df): + def _clean_efficiency_variables(self): """ These is scope to clean this by the model per corresponding description. @@ -88,7 +116,7 @@ class TrainingDataset: if isinstance(lodgement_date, str): return ( pd.to_datetime(lodgement_date) - pd.to_datetime(EARLIEST_EPC_DATE) - ).daye + ).days return ( pd.to_datetime(lodgement_date) - pd.to_datetime(EARLIEST_EPC_DATE) @@ -108,18 +136,19 @@ class TrainingDataset: else: return self.__add__(other) -class ScoringDataset: +class NewDataset(BaseDataset): """ A collection of EPCDifferenceRecords can be combined into a ScoringDataset. 
""" def __init__(self, datasets: List[EPCDifferenceRecord]) -> None: + self.pipeline_steps = self.pipeline_factory("newdata") self.datasets = datasets - def __add__(self, other) -> "ScoringDataset": - if not isinstance(other, ScoringDataset): + def __add__(self, other) -> "NewDataset": + if not isinstance(other, NewDataset): raise TypeError("Addition can only be performed with another instance of ScoringDataset") - return ScoringDataset(self.datasets + other.datasets) + return NewDataset(self.datasets + other.datasets) def __radd__(self, other): """ diff --git a/etl/epc/EPCRecord.py b/etl/epc/EPCRecord.py index 73e60483..a5079f39 100644 --- a/etl/epc/EPCRecord.py +++ b/etl/epc/EPCRecord.py @@ -19,6 +19,12 @@ class EPCRecord: """ Base class for a EPC record """ + # TODO: lower case and underscore + walls = None + floor = None + roof = None + + UPRN: str WALLS_DESCRIPTION: str FLOOR_DESCRIPTION: str @@ -60,6 +66,10 @@ class EPCRecord: ENERGY_CONSUMPTION_CURRENT: int CO2_EMISSIONS_CURRENT: float + u_values_walls = None + u_values_roof = None + u_values_floor = None + def __post_init__(self): # We can have validation and cleaning steps for each of the fields # self.WALLS_DESCRIPTION = 'check' @@ -67,8 +77,31 @@ class EPCRecord: self.validation_configuration = EPCRecordValidationConfiguration # self._field_validation() + self._clean_records() + self._expand_description() + self._generate_uvalues() + self._validate_expanded_description() + self._validate_u_values() + # etc pass + def _expand_description(self): + # TODO: can be loop over all the descriptions, or done in one + pass + + def _clean_records(self): + """ + This method will clean the records + """ + # self._clean_potential_energy_efficiency() + # self._clean_environment_impact_potential() + # self._clean_energy_consumption_potential() + # self._clean_co2_emissions_potential() + # self._clean_current_energy_efficiency() + # self._clean_energy_consumption_current() + # self._clean_co2_emissions_current() + + def 
_field_validation(self): """ This method will validate each of the fields in the EPC record @@ -194,8 +227,10 @@ class EPCDifferenceRecord: """ self.record1 = record1 self.record2 = record2 + self.flag_fabric_consistency = False self.difference_record = {} + self.difference_validation_configuration = EPCDifferenceRecordValidationConfiguration self.fixed_data_validation_configuration = EPCDifferenceRecordFixedDataValidationConfiguration @@ -204,7 +239,7 @@ class EPCDifferenceRecord: self._construct_difference_record() self._validate_difference_record() - + self._detect_fabric_consistency() def _construct_difference_record(self): @@ -220,7 +255,7 @@ class EPCDifferenceRecord: ending_record = self.record2.get(component_variables + ["LODGEMENT_DATE"], return_asdict=True, key_suffix="_ENDING") starting_record = self.record1.get(component_variables + ["LODGEMENT_DATE"], return_asdict=True, key_suffix="_STARTING") - # TODO: DO we want to take the earliest potentials or max potentials? + # TODO: Take the earliest potentials self.difference_record = { "UPRN": self.record1.get("UPRN"), "RDSAP_CHANGE": rdsap_change, diff --git a/etl/epc/ValidationConfiguration.py b/etl/epc/ValidationConfiguration.py index fdca024a..24427c20 100644 --- a/etl/epc/ValidationConfiguration.py +++ b/etl/epc/ValidationConfiguration.py @@ -54,4 +54,8 @@ EPCDifferenceRecordFixedDataValidationConfiguration = { "type": "string", "acceptable_values": [] } +} + +DatasetValidationConfiguration = { + } \ No newline at end of file diff --git a/etl/epc/property_change_app.py b/etl/epc/property_change_app.py index 14a1bfd2..1aec2b55 100644 --- a/etl/epc/property_change_app.py +++ b/etl/epc/property_change_app.py @@ -399,6 +399,58 @@ def make_uvalues(df): # if all_equal: # return True +from typing import List +class EPCPipeline: + """ + This class will take a list of directories and process them to create a dataset: + - Load the data + - Pre-process the data + - Create a dataset + - Clean the dataset + - Store 
the dataset + """ + + def __init__(self, directories: List[Path]): + self.directories = directories + self.dataset = [] + self.cleaning_dataset = [] + self.all_equal_rows = [] + + def load_data(self): + """ + Load the data from the directories + :return: + """ + for directory in self.directories: + filepath = directory / "certificates.csv" + + data_processor = DataProcessor(filepath=filepath) + + data_processor.pre_process() + + self.dataset.append(data_processor.data) + + def create_dataset(self): + """ + Create a dataset from the data + :return: + """ + pass + + def clean_dataset(self): + """ + Clean the dataset + :return: + """ + pass + + def store_dataset(self): + """ + Store the dataset + :return: + """ + pass + def app(): # Get all the files in the directory @@ -410,6 +462,8 @@ def app(): # List all subdirectories directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()] + + dataset = [] cleaning_dataset = [] # Keep track of the all equals @@ -606,8 +660,8 @@ def app(): data_by_urpn_df = process_and_prune_desriptions(data_by_urpn_df, cleaned_lookup) # Apply u-values - for col in ["walls_clean_description", "walls_clean_description_ENDING"]: - data_by_urpn_df[col] = data_by_urpn_df[col].str.replace("(assumed)", "").str.rstrip() + # for col in ["walls_clean_description", "walls_clean_description_ENDING"]: + # data_by_urpn_df[col] = data_by_urpn_df[col].str.replace("(assumed)", "").str.rstrip() data_by_urpn_df = make_uvalues(data_by_urpn_df).drop( columns=["walls_clean_description", "walls_clean_description_ENDING"] @@ -638,11 +692,13 @@ def app(): # TODO: move into difference record # Remove any records that have huge swings in their floor area + # Move this into TrainingDataset as this won't be run in newdata output["tfa_diff_abs"] = abs(output["TOTAL_FLOOR_AREA_ENDING"] - output["TOTAL_FLOOR_AREA_STARTING"]) output["tfa_diff_prop"] = output["tfa_diff_abs"] / output["TOTAL_FLOOR_AREA_STARTING"] output = output[output["tfa_diff_prop"] < 0.5] 
output = output.drop(columns=["tfa_diff_abs", "tfa_diff_prop"]) + # TODO: move this numeric cast of the u-value columns into EPCRecord uvalue_columns = [col for col in output.columns if "thermal_transmittance" in col] for uvalue_col in uvalue_columns: output[uvalue_col] = pd.to_numeric(output[uvalue_col])