add record mode for testing

This commit is contained in:
Michael Duong 2024-02-22 16:11:26 +00:00
parent c53e4ce849
commit f715538c53
2 changed files with 558 additions and 503 deletions

File diff suppressed because it is too large Load diff

View file

@ -7,7 +7,7 @@ from tqdm import tqdm
from etl.epc.DataProcessor import EPCDataProcessor
from etl.epc.Record import EPCRecord, EPCDifferenceRecord
from etl.epc.Dataset import TrainingDataset
from etl.epc.Dataset import TrainingDataset, RecordDataset
from utils.s3 import save_dataframe_to_s3_parquet, read_from_s3
from etl.epc.settings import (
MANDATORY_FIXED_FEATURES,
@ -24,8 +24,8 @@ from etl.epc.settings import (
# TODO: change in setting file
MANDATORY_FIXED_FEATURES = [x.lower() for x in MANDATORY_FIXED_FEATURES]
# LATEST_FIELD = [x.lower() for x in LATEST_FIELD if x.lower() not in ROOM_FEATURES]
LATEST_FIELD = [x.lower() for x in LATEST_FIELD]
LATEST_FIELD = [x.lower() for x in LATEST_FIELD if x.lower() not in ROOM_FEATURES]
# LATEST_FIELD = [x.lower() for x in LATEST_FIELD]
COMPONENT_FEATURES = [x.lower() for x in COMPONENT_FEATURES]
RDSAP_RESPONSE = RDSAP_RESPONSE.lower()
HEAT_DEMAND_RESPONSE = HEAT_DEMAND_RESPONSE.lower()
@ -62,6 +62,12 @@ def get_cleaned_description_mapping():
clean_lookup = get_cleaned_description_mapping()
# import pickle
# with open("./clean_lookup.pkl", "wb") as f:
# pickle.dump(clean_lookup, f)
# clean_lookup = pickle.load(open("./clean_lookup.pkl", "rb"))
class EPCPipeline:
"""
@ -117,8 +123,58 @@ class EPCPipeline:
self.run_training_dataset_pipeline()
elif self.run_mode == "newdata":
self.run_newdata_dataset_pipeline()
elif self.run_mode == "record":
self.run_record_dataset_pipeline()
else:
raise ValueError("Run mode defined needs to be in 'training' or 'newdata'")
def run_record_dataset_pipeline(self):
"""
Running pipeline with just the EPCRecords
"""
if self.directories is None:
raise ValueError(
"Directories not specified - Unable to run Training pipeline"
)
for directory in tqdm(self.directories):
filepath = directory / self.epc_local_file
self.epc_data_processor.prepare_data(filepath=filepath)
constituency_data = self.epc_data_processor.data
self.compiled_cleaning_averages.append(
self.epc_data_processor.cleaning_averages
)
# TODO: integrate with EPCRecord
record_dataset = constituency_data[['uprn'] + VARIABLE_DATA_FEATURES + MANDATORY_FIXED_FEATURES + LATEST_FIELD]
constituency_dataset = RecordDataset(datasets=record_dataset, cleaned_lookup=clean_lookup)
self.compiled_dataset = pd.concat(
[self.compiled_dataset, constituency_dataset.df]
)
save_dataframe_to_s3_parquet(
df=self.compiled_dataset,
bucket_name=self.epc_bucket_name,
file_key=self.epc_compiled_dataset_key,
)
save_dataframe_to_s3_parquet(
df=pd.DataFrame(self.compiled_all_equal_rows),
bucket_name=self.epc_bucket_name,
file_key=self.epc_all_equal_rows_key,
)
save_dataframe_to_s3_parquet(
df=pd.concat(self.compiled_cleaning_averages),
bucket_name=self.epc_bucket_name,
file_key=self.epc_cleaning_dataset_key,
)
def run_newdata_dataset_pipeline(self):
"""