refactor near complete

This commit is contained in:
Michael Duong 2023-12-21 11:39:42 +00:00
parent 1eb98d2d35
commit 069b59f2ce
5 changed files with 45 additions and 18 deletions

View file

@ -145,6 +145,10 @@ async def trigger_plan(body: PlanTriggerRequest):
recommendations[p.id] = property_recommendations recommendations[p.id] = property_recommendations
from etl.epc.Pipeline import EPCPipeline
from etl.epc.DataProcessor import EPCDataProcessor
newdata_pipeline = EPCPipeline(epc_data_processor=EPCDataProcessor(data=p.get_model_data(), run_mode="newdata"), run_mode="newdata")
# TODO: p.get_model_data() -> EPCRecord # TODO: p.get_model_data() -> EPCRecord
# Finally, we'll prepare data for predicting the impact on SAP # Finally, we'll prepare data for predicting the impact on SAP

View file

@ -83,7 +83,7 @@ class EPCDataProcessor:
raise ValueError("Run mode must be either training or newdata") raise ValueError("Run mode must be either training or newdata")
self.run_mode = run_mode if not violation_mode else "newdata" self.run_mode = run_mode if not violation_mode else "newdata"
def prepare_data(self, filepath: Path | str) -> None: def prepare_data(self, filepath: Path | str | None = None) -> None:
""" """
Given the run mode, we apply the relevant pipeline steps Given the run mode, we apply the relevant pipeline steps
Ignore step is used to highlight which steps are not needed in newdata Ignore step is used to highlight which steps are not needed in newdata

View file

@ -486,19 +486,19 @@ class TrainingDataset(BaseDataset):
pd.to_datetime(lodgement_date) - pd.to_datetime(EARLIEST_EPC_DATE) pd.to_datetime(lodgement_date) - pd.to_datetime(EARLIEST_EPC_DATE)
).dt.days ).dt.days
def __add__(self, other) -> "TrainingDataset": # def __add__(self, other) -> "TrainingDataset":
if not isinstance(other, TrainingDataset): # if not isinstance(other, TrainingDataset):
raise TypeError("Addition can only be performed with another instance of TrainingDataset") # raise TypeError("Addition can only be performed with another instance of TrainingDataset")
return TrainingDataset(self.datasets + other.datasets) # return TrainingDataset(self.datasets + other.datasets)
def __radd__(self, other): # def __radd__(self, other):
""" # """
Required for sum() to work # Required for sum() to work
""" # """
if isinstance(other, int): # if isinstance(other, int):
return self # return self
else: # else:
return self.__add__(other) # return self.__add__(other)
class NewDataset(BaseDataset): class NewDataset(BaseDataset):
""" """
@ -506,7 +506,7 @@ class NewDataset(BaseDataset):
""" """
def __init__(self, datasets: List[EPCDifferenceRecord]) -> None: def __init__(self, datasets: List[EPCDifferenceRecord]) -> None:
self.pipeline_steps = self.pipeline_factory("newdata") # self.pipeline_steps = self.pipeline_factory("newdata")
self.datasets = datasets self.datasets = datasets
def __add__(self, other) -> "NewDataset": def __add__(self, other) -> "NewDataset":

View file

@ -66,8 +66,8 @@ class EPCPipeline:
def __init__( def __init__(
self, self,
directories: List[Path],
epc_data_processor: EPCDataProcessor, epc_data_processor: EPCDataProcessor,
directories: List[Path] | None = None,
run_mode="training", run_mode="training",
epc_local_file="certificates.csv", epc_local_file="certificates.csv",
epc_bucket_name="retrofit-data-dev", epc_bucket_name="retrofit-data-dev",
@ -102,6 +102,29 @@ class EPCPipeline:
""" """
Entrypoint to run the pipeline Entrypoint to run the pipeline
""" """
if self.run_mode == "training":
self.run_training_dataset_pipeline()
elif self.run_mode == "newdata":
self.run_newdata_dataset_pipeline()
else:
raise ValueError("Run mode defined needs to be in 'training' or 'newdata'")
def run_newdata_dataset_pipeline(self):
"""
Main function to run the newdata pipeline
"""
self.epc_data_processor.prepare_data()
data = self.epc_data_processor.data
epc_records = [EPCRecord(**x, run_mode="newdata") for x in data.to_dict(orient='records')]
def run_training_dataset_pipeline(self):
"""
Main function to run the training dataset generation pipeline
"""
if self.directories is None:
raise ValueError("Directories not specified - Unable to run Training pipeline")
for directory in self.directories: for directory in self.directories:
self.process_directory(directory) self.process_directory(directory)
@ -169,7 +192,7 @@ class EPCPipeline:
variable_data = property_data[VARIABLE_DATA_FEATURES] variable_data = property_data[VARIABLE_DATA_FEATURES]
uprn = str(uprn) uprn = str(uprn)
epc_records = [EPCRecord(uprn, **x) for x in variable_data.to_dict(orient='records')] epc_records = [EPCRecord(uprn, **x, run_mode="training") for x in variable_data.to_dict(orient='records')]
# TODO: We want to be able to provide value for the u values in the main pipeline so this will need to be part of the EPCRecord # TODO: We want to be able to provide value for the u values in the main pipeline so this will need to be part of the EPCRecord

View file

@ -5,7 +5,7 @@ from etl.epc.ValidationConfiguration import (
EPCDifferenceRecordValidationConfiguration, EPCDifferenceRecordValidationConfiguration,
EPCDifferenceRecordFixedDataValidationConfiguration EPCDifferenceRecordFixedDataValidationConfiguration
) )
from typing import Union, List from typing import Any, Union, List
from etl.epc.settings import ( from etl.epc.settings import (
RDSAP_RESPONSE, RDSAP_RESPONSE,
HEAT_DEMAND_RESPONSE, HEAT_DEMAND_RESPONSE,
@ -242,7 +242,7 @@ class EPCRecord:
return self.__dict__[RDSAP_RESPONSE] <= other.__dict__[RDSAP_RESPONSE] return self.__dict__[RDSAP_RESPONSE] <= other.__dict__[RDSAP_RESPONSE]
def get(self, key: Union[str, List[str]], return_asdict: bool = False, key_suffix: str = None): def get(self, key: Union[str, List[str]], return_asdict: bool = False, key_suffix: str | None = None) -> Any:
""" """
This method will return the value of the key This method will return the value of the key
""" """