fixed merge conflict

This commit is contained in:
Michael Duong 2024-03-21 18:52:06 +00:00
commit 25965f3801
2 changed files with 45 additions and 9 deletions

View file

@ -91,6 +91,7 @@ class EPCPipeline:
epc_cleaning_dataset_key="sap_change_model/cleaning_dataset_record.parquet",
epc_all_equal_rows_key="sap_change_model/all_equal_rows_record.parquet",
epc_compiled_dataset_key="sap_change_model/dataset_record.parquet",
use_parallel=False,
):
"""
:param directories: List of directories to process
@ -115,6 +116,7 @@ class EPCPipeline:
self.epc_cleaning_dataset_key = epc_cleaning_dataset_key
self.epc_all_equal_rows_key = epc_all_equal_rows_key
self.epc_compiled_dataset_key = epc_compiled_dataset_key
self.use_parallel = use_parallel
def run(self):
"""
@ -254,8 +256,11 @@ class EPCPipeline:
"Directories not specified - Unable to run Training pipeline"
)
for directory in tqdm(self.directories):
self.process_directory(directory)
if self.use_parallel:
self.run_training_dataset_parallel_pipeline()
else:
for directory in tqdm(self.directories):
self.process_directory(directory)
save_dataframe_to_s3_parquet(
df=self.compiled_dataset,
@ -275,6 +280,41 @@ class EPCPipeline:
file_key=self.epc_cleaning_dataset_key,
)
def run_training_dataset_parallel_pipeline(self):
"""
Run the training pipeline in parallel
"""
with mp.Pool() as pool:
results = list(
tqdm(
pool.imap(self.process_directory_task, self.directories),
total=len(self.directories),
),
)
for result in tqdm(results):
self.compiled_dataset = pd.concat(
[self.compiled_dataset, result["dataset"]]
)
self.compiled_cleaning_averages.append(result["cleaning_averages"])
self.compiled_all_equal_rows.extend(result["all_equal_rows"])
def process_directory_task(self, directory: str) -> pd.DataFrame:
"""
Task to enable parallel processing
"""
self.process_directory(directory=directory)
output = {
"dataset": self.compiled_dataset,
"cleaning_averages": self.epc_data_processor.cleaning_averages,
"all_equal_rows": self.compiled_all_equal_rows,
}
return output
def process_directory(self, directory: Path):
"""
Process a single directory
@ -286,12 +326,13 @@ class EPCPipeline:
self.epc_data_processor.prepare_data(filepath=filepath)
constituency_data = self.epc_data_processor.data
self.compiled_cleaning_averages.append(
self.epc_data_processor.cleaning_averages
)
constituency_difference_records = []
# self.check_records = []
for uprn, property_data in constituency_data.groupby("uprn", observed=True):
difference_records = self.process_uprn(
uprn=str(uprn), property_data=property_data, directory=directory
@ -299,12 +340,6 @@ class EPCPipeline:
if difference_records is not None:
constituency_difference_records.extend(difference_records)
# check_list = []
# for check_record in self.check_records:
# check_list.append(check_record["difference_record"])
# td = TrainingDataset(datasets=check_list, cleaned_lookup=clean_lookup)
constituency_dataset = TrainingDataset(
datasets=constituency_difference_records, cleaned_lookup=clean_lookup
)

View file

@ -17,6 +17,7 @@ def main():
epc_pipeline = EPCPipeline(
directories=directories,
run_mode="record",
use_parallel=True,
epc_data_processor=EPCDataProcessor(run_mode="training"),
)