diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py index ef148ed0..996ddcfd 100644 --- a/backend/app/plan/router.py +++ b/backend/app/plan/router.py @@ -145,6 +145,10 @@ async def trigger_plan(body: PlanTriggerRequest): recommendations[p.id] = property_recommendations + from etl.epc.Pipeline import EPCPipeline + from etl.epc.DataProcessor import EPCDataProcessor + newdata_pipeline = EPCPipeline(epc_data_processor=EPCDataProcessor(data=p.get_model_data(), run_mode="newdata"), run_mode="newdata") + # TODO: p.get_model_data() -> EPCRecord # Finally, we'll prepare data for predicting the impact on SAP diff --git a/etl/epc/DataProcessor.py b/etl/epc/DataProcessor.py index 7f41c96f..48b62dc7 100644 --- a/etl/epc/DataProcessor.py +++ b/etl/epc/DataProcessor.py @@ -83,7 +83,7 @@ class EPCDataProcessor: raise ValueError("Run mode must be either training or newdata") self.run_mode = run_mode if not violation_mode else "newdata" - def prepare_data(self, filepath: Path | str) -> None: + def prepare_data(self, filepath: Path | str | None = None) -> None: """ Given the run mode, we apply the relevant pipeline steps Ignore step is used to highlight which steps are not needed in newdata diff --git a/etl/epc/Dataset.py b/etl/epc/Dataset.py index b0c7f4c7..92efd0b3 100644 --- a/etl/epc/Dataset.py +++ b/etl/epc/Dataset.py @@ -486,19 +486,19 @@ class TrainingDataset(BaseDataset): pd.to_datetime(lodgement_date) - pd.to_datetime(EARLIEST_EPC_DATE) ).dt.days - def __add__(self, other) -> "TrainingDataset": - if not isinstance(other, TrainingDataset): - raise TypeError("Addition can only be performed with another instance of TrainingDataset") - return TrainingDataset(self.datasets + other.datasets) + # def __add__(self, other) -> "TrainingDataset": + # if not isinstance(other, TrainingDataset): + # raise TypeError("Addition can only be performed with another instance of TrainingDataset") + # return TrainingDataset(self.datasets + other.datasets) - def __radd__(self, other): - """ - Required for sum() to work - """ - if isinstance(other, int): - return self - else: - return self.__add__(other) + # def __radd__(self, other): + # """ + # Required for sum() to work + # """ + # if isinstance(other, int): + # return self + # else: + # return self.__add__(other) class NewDataset(BaseDataset): """ @@ -506,7 +506,7 @@ class NewDataset(BaseDataset): """ def __init__(self, datasets: List[EPCDifferenceRecord]) -> None: - self.pipeline_steps = self.pipeline_factory("newdata") + # self.pipeline_steps = self.pipeline_factory("newdata") self.datasets = datasets def __add__(self, other) -> "NewDataset": diff --git a/etl/epc/Pipeline.py b/etl/epc/Pipeline.py index c4af6911..de827dee 100644 --- a/etl/epc/Pipeline.py +++ b/etl/epc/Pipeline.py @@ -66,8 +66,8 @@ class EPCPipeline: def __init__( self, - directories: List[Path], epc_data_processor: EPCDataProcessor, + directories: List[Path] | None = None, run_mode="training", epc_local_file="certificates.csv", epc_bucket_name="retrofit-data-dev", @@ -102,6 +102,29 @@ class EPCPipeline: """ Entrypoint to run the pipeline """ + if self.run_mode == "training": + self.run_training_dataset_pipeline() + elif self.run_mode == "newdata": + self.run_newdata_dataset_pipeline() + else: + raise ValueError("Run mode defined needs to be in 'training' or 'newdata'") + + def run_newdata_dataset_pipeline(self): + """ + Main function to run the newdata pipeline + """ + self.epc_data_processor.prepare_data() + data = self.epc_data_processor.data + + epc_records = [EPCRecord(**x, run_mode="newdata") for x in data.to_dict(orient='records')] + + def run_training_dataset_pipeline(self): + """ + Main function to run the training dataset generation pipeline + """ + if self.directories is None: + raise ValueError("Directories not specified - Unable to run Training pipeline") + for directory in self.directories: self.process_directory(directory) @@ -169,7 +192,7 @@ class EPCPipeline: variable_data = property_data[VARIABLE_DATA_FEATURES] uprn = str(uprn) - epc_records = [EPCRecord(uprn, **x) for x in variable_data.to_dict(orient='records')] + epc_records = [EPCRecord(uprn, **x, run_mode="training") for x in variable_data.to_dict(orient='records')] # TODO: We want to be able to provide value for the u values in the main pipeline so this will need to be part of the EPCRecord diff --git a/etl/epc/Record.py b/etl/epc/Record.py index f8c72969..f2b3f519 100644 --- a/etl/epc/Record.py +++ b/etl/epc/Record.py @@ -5,7 +5,7 @@ from etl.epc.ValidationConfiguration import ( EPCDifferenceRecordValidationConfiguration, EPCDifferenceRecordFixedDataValidationConfiguration ) -from typing import Union, List +from typing import Any, Union, List from etl.epc.settings import ( RDSAP_RESPONSE, HEAT_DEMAND_RESPONSE, @@ -242,7 +242,7 @@ class EPCRecord: return self.__dict__[RDSAP_RESPONSE] <= other.__dict__[RDSAP_RESPONSE] - def get(self, key: Union[str, List[str]], return_asdict: bool = False, key_suffix: str = None): + def get(self, key: Union[str, List[str]], return_asdict: bool = False, key_suffix: str | None = None) -> Any: """ This method will return the value of the key """