add multiprocessing to process directory

This commit is contained in:
Michael Duong 2024-03-19 11:48:48 +00:00
parent a45cf2f319
commit fb31f95457
2 changed files with 46 additions and 9 deletions

View file

@ -4,6 +4,7 @@ import pandas as pd
from typing import List
from pathlib import Path
from tqdm import tqdm
import multiprocessing as mp
from etl.epc.DataProcessor import EPCDataProcessor
from etl.epc.Record import EPCRecord, EPCDifferenceRecord
@ -83,6 +84,7 @@ class EPCPipeline:
epc_cleaning_dataset_key="sap_change_model/cleaning_dataset_rooms.parquet",
epc_all_equal_rows_key="sap_change_model/all_equal_rows_rooms.parquet",
epc_compiled_dataset_key="sap_change_model/dataset_rooms.parquet",
use_parallel=False,
):
"""
:param directories: List of directories to process
@ -107,6 +109,7 @@ class EPCPipeline:
self.epc_cleaning_dataset_key = epc_cleaning_dataset_key
self.epc_all_equal_rows_key = epc_all_equal_rows_key
self.epc_compiled_dataset_key = epc_compiled_dataset_key
self.use_parallel = use_parallel
def run(self):
"""
@ -145,8 +148,11 @@ class EPCPipeline:
"Directories not specified - Unable to run Training pipeline"
)
for directory in tqdm(self.directories):
self.process_directory(directory)
if self.use_parallel:
self.run_training_dataset_parallel_pipeline()
else:
for directory in tqdm(self.directories):
self.process_directory(directory)
save_dataframe_to_s3_parquet(
df=self.compiled_dataset,
@ -166,6 +172,41 @@ class EPCPipeline:
file_key=self.epc_cleaning_dataset_key,
)
def run_training_dataset_parallel_pipeline(self):
    """
    Run the training pipeline in parallel.

    Every entry of ``self.directories`` is passed to
    ``process_directory_task`` through a ``multiprocessing.Pool``. The
    dictionaries returned by the tasks are then merged into this instance's
    ``compiled_dataset``, ``compiled_cleaning_averages`` and
    ``compiled_all_equal_rows``.

    NOTE(review): each task reports the accumulators of the pipeline copy
    it ran on, so this merge presumably relies on those accumulators being
    empty when the pool starts; anything already present would be returned
    by every task and duplicated here - confirm against ``__init__``.
    """
    with mp.Pool() as pool:
        # imap yields results in the order of self.directories; wrapping it
        # in tqdm reports progress as each directory finishes.
        results = list(
            tqdm(
                pool.imap(self.process_directory_task, self.directories),
                total=len(self.directories),
            ),
        )

    # Gather every per-directory frame first and concatenate once at the
    # end; concatenating inside the loop re-copies the growing frame on
    # every iteration.
    frames = [self.compiled_dataset]
    for result in tqdm(results):
        frames.append(result["dataset"])
        self.compiled_cleaning_averages.append(result["cleaning_averages"])
        self.compiled_all_equal_rows.extend(result["all_equal_rows"])

    # With no results the compiled dataset is left untouched, exactly as
    # when the original loop body never ran.
    if results:
        self.compiled_dataset = pd.concat(frames)
def process_directory_task(self, directory: str) -> dict:
    """
    Task to enable parallel processing.

    Runs ``process_directory`` for ``directory`` and hands back the state
    it produced. When this method is submitted to a ``multiprocessing``
    pool the instance is pickled into the worker, so the changes made by
    ``process_directory`` live on the worker's copy only; returning them
    is how the submitting process gets them back.

    :param directory: Directory to process
    :return: Dictionary with the keys ``"dataset"`` (the compiled
        dataset), ``"cleaning_averages"`` (the data processor's cleaning
        averages) and ``"all_equal_rows"`` (the compiled all-equal rows)
    """
    self.process_directory(directory=directory)
    return {
        "dataset": self.compiled_dataset,
        "cleaning_averages": self.epc_data_processor.cleaning_averages,
        "all_equal_rows": self.compiled_all_equal_rows,
    }
def process_directory(self, directory: Path):
"""
Process a single directory
@ -177,12 +218,13 @@ class EPCPipeline:
self.epc_data_processor.prepare_data(filepath=filepath)
constituency_data = self.epc_data_processor.data
self.compiled_cleaning_averages.append(
self.epc_data_processor.cleaning_averages
)
constituency_difference_records = []
# self.check_records = []
for uprn, property_data in constituency_data.groupby("uprn", observed=True):
difference_records = self.process_uprn(
uprn=str(uprn), property_data=property_data, directory=directory
@ -190,12 +232,6 @@ class EPCPipeline:
if difference_records is not None:
constituency_difference_records.extend(difference_records)
# check_list = []
# for check_record in self.check_records:
# check_list.append(check_record["difference_record"])
# td = TrainingDataset(datasets=check_list, cleaned_lookup=clean_lookup)
constituency_dataset = TrainingDataset(
datasets=constituency_difference_records, cleaned_lookup=clean_lookup
)

View file

@ -16,6 +16,7 @@ def main():
epc_pipeline = EPCPipeline(
directories=directories,
use_parallel=True,
epc_data_processor=EPCDataProcessor(run_mode="training"),
)