From fb31f95457f3a40a60bc6ff502c1c2fe5e8233f1 Mon Sep 17 00:00:00 2001
From: Michael Duong
Date: Tue, 19 Mar 2024 11:48:48 +0000
Subject: [PATCH] add multiprocessing to process directory

---
Reviewer notes (free text placed before the first "diff --git" line is not
part of the commit):
 * The line structure of this patch was rebuilt from a whitespace-collapsed
   copy. Indentation, blank context lines and the inner spacing of removed
   comment lines are inferred from Python syntax and the hunk line counts;
   run `git apply --check` before relying on it.
 * process_directory_task: the return annotation now says dict (the method
   returned a dict while declaring pd.DataFrame) and directory is annotated
   as Path to match process_directory.
 * run_training_dataset_parallel_pipeline: the worker results are merged with
   a single pd.concat call instead of one call per result.
 * Hunk headers and the diffstat were recomputed for the longer hunk; the
   post-image blob id on the "index" line predates these edits.
 * The trailing context line of the last hunk was not recoverable, so that
   hunk carries one context line fewer than the original header declared.

 etl/epc/Pipeline.py            | 65 +++++++++++++++++++++++++++++-----
 etl/epc/property_change_app.py |  1 +
 2 files changed, 57 insertions(+), 9 deletions(-)

diff --git a/etl/epc/Pipeline.py b/etl/epc/Pipeline.py
index 0943b206..36c381ce 100644
--- a/etl/epc/Pipeline.py
+++ b/etl/epc/Pipeline.py
@@ -4,6 +4,7 @@ import pandas as pd
 from typing import List
 from pathlib import Path
 from tqdm import tqdm
+import multiprocessing as mp
 
 from etl.epc.DataProcessor import EPCDataProcessor
 from etl.epc.Record import EPCRecord, EPCDifferenceRecord
@@ -83,6 +84,7 @@ class EPCPipeline:
         epc_cleaning_dataset_key="sap_change_model/cleaning_dataset_rooms.parquet",
         epc_all_equal_rows_key="sap_change_model/all_equal_rows_rooms.parquet",
         epc_compiled_dataset_key="sap_change_model/dataset_rooms.parquet",
+        use_parallel=False,
     ):
         """
         :param directories: List of directories to process
@@ -107,6 +109,7 @@ class EPCPipeline:
         self.epc_cleaning_dataset_key = epc_cleaning_dataset_key
         self.epc_all_equal_rows_key = epc_all_equal_rows_key
         self.epc_compiled_dataset_key = epc_compiled_dataset_key
+        self.use_parallel = use_parallel
 
     def run(self):
         """
@@ -145,8 +148,11 @@ class EPCPipeline:
                 "Directories not specified - Unable to run Training pipeline"
             )
 
-        for directory in tqdm(self.directories):
-            self.process_directory(directory)
+        if self.use_parallel:
+            self.run_training_dataset_parallel_pipeline()
+        else:
+            for directory in tqdm(self.directories):
+                self.process_directory(directory)
 
         save_dataframe_to_s3_parquet(
             df=self.compiled_dataset,
@@ -166,6 +172,52 @@ class EPCPipeline:
             file_key=self.epc_cleaning_dataset_key,
         )
 
+    def run_training_dataset_parallel_pipeline(self):
+        """
+        Run the training pipeline in parallel
+
+        Each directory is processed in a worker process and the per-directory
+        results are merged back into the compiled attributes of this instance
+        """
+
+        with mp.Pool() as pool:
+            results = list(
+                tqdm(
+                    pool.imap(self.process_directory_task, self.directories),
+                    total=len(self.directories),
+                ),
+            )
+
+        # Concatenate once rather than once per result: every pd.concat call
+        # copies all of the rows accumulated so far
+        self.compiled_dataset = pd.concat(
+            [self.compiled_dataset, *(result["dataset"] for result in results)]
+        )
+
+        for result in results:
+            self.compiled_cleaning_averages.append(result["cleaning_averages"])
+            self.compiled_all_equal_rows.extend(result["all_equal_rows"])
+
+    def process_directory_task(self, directory: Path) -> dict:
+        """
+        Task to enable parallel processing
+
+        Processes one directory and returns the resulting state, because
+        attribute changes made inside a worker process are not visible to
+        the parent process
+        :param directory: Directory to process
+        :return: dict with the keys "dataset", "cleaning_averages" and
+            "all_equal_rows"
+        """
+
+        self.process_directory(directory=directory)
+
+        return {
+            "dataset": self.compiled_dataset,
+            "cleaning_averages": self.epc_data_processor.cleaning_averages,
+            "all_equal_rows": self.compiled_all_equal_rows,
+        }
+
     def process_directory(self, directory: Path):
         """
         Process a single directory
@@ -177,12 +229,13 @@ class EPCPipeline:
 
         self.epc_data_processor.prepare_data(filepath=filepath)
         constituency_data = self.epc_data_processor.data
+
         self.compiled_cleaning_averages.append(
             self.epc_data_processor.cleaning_averages
         )
 
         constituency_difference_records = []
-        # self.check_records = []
+
         for uprn, property_data in constituency_data.groupby("uprn", observed=True):
             difference_records = self.process_uprn(
                 uprn=str(uprn), property_data=property_data, directory=directory
@@ -190,12 +243,6 @@ class EPCPipeline:
             if difference_records is not None:
                 constituency_difference_records.extend(difference_records)
 
-        # check_list = []
-        # for check_record in self.check_records:
-        #     check_list.append(check_record["difference_record"])
-
-        # td = TrainingDataset(datasets=check_list, cleaned_lookup=clean_lookup)
-
         constituency_dataset = TrainingDataset(
             datasets=constituency_difference_records, cleaned_lookup=clean_lookup
         )
diff --git a/etl/epc/property_change_app.py b/etl/epc/property_change_app.py
index c8923d6d..c985567d 100644
--- a/etl/epc/property_change_app.py
+++ b/etl/epc/property_change_app.py
@@ -16,5 +16,6 @@ def main():
 
     epc_pipeline = EPCPipeline(
         directories=directories,
+        use_parallel=True,
         epc_data_processor=EPCDataProcessor(run_mode="training"),
     )