diff --git a/etl/epc/DataProcessor.py b/etl/epc/DataProcessor.py index 801a9456..4987a23e 100644 --- a/etl/epc/DataProcessor.py +++ b/etl/epc/DataProcessor.py @@ -135,6 +135,7 @@ class EPCDataProcessor: self.fill_invalid_constituency_fields(ignore_step=ignore_step) self.make_cleaning_averages(ignore_step=ignore_step) + self.add_local_authority_to_cleaning_average(ignore_step=ignore_step) # TODO: check if this has impact on training dataset cleaned_data = self.apply_averages_cleaning( @@ -152,7 +153,6 @@ class EPCDataProcessor: self.data = self.data if cleaned_data is None else cleaned_data - self.add_local_authority_to_cleaning_average(ignore_step=ignore_step) self.cast_cleaning_averages_columns_to_lower(ignore_step=ignore_step) self.cast_data_columns_to_lower() diff --git a/etl/epc/Pipeline.py b/etl/epc/Pipeline.py index ea484e56..168f7f71 100644 --- a/etl/epc/Pipeline.py +++ b/etl/epc/Pipeline.py @@ -123,7 +123,7 @@ class EPCPipeline: data = self.epc_data_processor.data - + epc_records = [EPCRecord(**x, run_mode="newdata") for x in data.to_dict(orient='records')] @@ -137,23 +137,23 @@ class EPCPipeline: for directory in tqdm(self.directories): self.process_directory(directory) - # save_dataframe_to_s3_parquet( - # df=self.compiled_dataset, - # bucket_name=self.epc_bucket_name, - # file_key=self.epc_compiled_dataset_key, - # ) + save_dataframe_to_s3_parquet( + df=self.compiled_dataset, + bucket_name=self.epc_bucket_name, + file_key=self.epc_compiled_dataset_key, + ) - # save_dataframe_to_s3_parquet( - # df=pd.concat(self.compiled_all_equal_rows), - # bucket_name=self.epc_bucket_name, - # file_key=self.epc_all_equal_rows_key, - # ) + save_dataframe_to_s3_parquet( + df=pd.DataFrame(self.compiled_all_equal_rows), + bucket_name=self.epc_bucket_name, + file_key=self.epc_all_equal_rows_key, + ) - # save_dataframe_to_s3_parquet( - # df=pd.concat(self.compiled_cleaning_averages), - # bucket_name=self.epc_bucket_name, - # file_key=self.epc_cleaning_dataset_key, - # ) + save_dataframe_to_s3_parquet( + df=pd.concat(self.compiled_cleaning_averages), + bucket_name=self.epc_bucket_name, + file_key=self.epc_cleaning_dataset_key, + ) def process_directory(self, directory: Path): """ diff --git a/etl/epc/property_change_app.py b/etl/epc/property_change_app.py index 3dc6e39b..5486ee99 100644 --- a/etl/epc/property_change_app.py +++ b/etl/epc/property_change_app.py @@ -18,16 +18,16 @@ def main(): epc_pipeline.run() # For testing - dataset_df = epc_pipeline.compiled_dataset - dataset_df.to_parquet("refactor_datasets/dataset.parquet") - pd.DataFrame(epc_pipeline.compiled_all_equal_rows).to_parquet("refactor_datasets/all_equal_rows.parquet") - pd.concat(epc_pipeline.compiled_cleaning_averages).to_parquet("refactor_datasets/cleaning_averages.parquet") + # dataset_df = epc_pipeline.compiled_dataset + # dataset_df.to_parquet("refactor_datasets/dataset.parquet") + # pd.DataFrame(epc_pipeline.compiled_all_equal_rows).to_parquet("refactor_datasets/all_equal_rows.parquet") + # pd.concat(epc_pipeline.compiled_cleaning_averages).to_parquet("refactor_datasets/cleaning_averages.parquet") - from utils.s3 import read_dataframe_from_s3_parquet - dataset = read_dataframe_from_s3_parquet( - bucket_name="retrofit-data-dev", - file_key="sap_change_model/dataset_test.parquet", - ) + # from utils.s3 import read_dataframe_from_s3_parquet + # dataset = read_dataframe_from_s3_parquet( + # bucket_name="retrofit-data-dev", + # file_key="sap_change_model/dataset_test.parquet", + # ) if __name__ == "__main__":