From 6f053a20d159ca08a87bed6b245b4ba32fe27d1b Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Mon, 19 Aug 2024 11:20:38 +0100 Subject: [PATCH] completing source_epc_properties --- etl/ownership/Ownership.py | 48 ++++++++++++++++++++------------------ utils/s3.py | 33 ++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 23 deletions(-) diff --git a/etl/ownership/Ownership.py b/etl/ownership/Ownership.py index 90abe147..3bdae59c 100644 --- a/etl/ownership/Ownership.py +++ b/etl/ownership/Ownership.py @@ -33,7 +33,8 @@ class Ownership: epc_paths: List[str], domestic_ownership_path: str, overseas_ownership_path: str, - land_registry_path: str + land_registry_path: str, + project_name: str ): """ @@ -45,6 +46,7 @@ class Ownership: :param overseas_ownership_path: A string which points to the location of the OCOD ownership data, that details corporate ownership of properties in the UK, where the companies are overseas :param land_registry_path: A string that points to the location of the land registry data + :param project_name: A string that is used to identify the project """ # All epc paths should end with certificates.csv @@ -57,6 +59,9 @@ class Ownership: self.run_timestamp = str(datetime.now()) + # Data storage paths + self.epc_data_filepath = f"ownership/{project_name}/{self.run_timestamp}/epc_data.xlsx" + # Data self.epc_data = None self.ownership_data = None @@ -76,47 +81,44 @@ class Ownership: def source_epc_properties(self, column_filters=None): """ - This function will filter the epc data as specified by column filers, searching across all of the EPC tables - as defined by - :param column_filters: - :return: + This function will filter the epc data as specified by column filters, searching across all of the EPC tables + :param column_filters: Dictionary with column names as keys and list of acceptable values as values. 
This + dictionary is used to filter the EPC data and should look like this: + {"column_name": ["value1", "value2", ...]}, where column_name is the name of the column + in the EPC data and ["value1", "value2", ...] is a list of acceptable values for that + column. If a column is not found in the EPC data, an exception is raised. """ column_filters = {} if column_filters is None else column_filters - # TODO: Do the tenure filtering here! - # ["rental (private)", "Rented (private)", "owner-occupied", "Owner-occupied"] - data = [] for path in tqdm(self.epc_paths): epc_data = pd.read_csv(path, low_memory=False) - epc_data = epc_data[~pd.isnull(epc_data["UPRN"])] epc_data["UPRN"] = epc_data["UPRN"].astype(int).astype(str) if pd.isnull(pd.to_datetime(epc_data["LODGEMENT_DATETIME"], errors="coerce")).sum(): - raise Exception("Lodgement datetime contains ") + raise Exception("Lodgement datetime contains invalid data") - # Get the newest EPC for each UPRN. We use LODGEMENT_DATE as a proxy for this epc_data["LODGEMENT_DATETIME"] = pd.to_datetime(epc_data["LODGEMENT_DATETIME"], errors="coerce") + epc_data = epc_data.sort_values(["LODGEMENT_DATETIME"], ascending=False).drop_duplicates("UPRN") - epc_data = epc_data.sort_values( - ["LODGEMENT_DATE", "LODGEMENT_DATETIME"], ascending=False - ).drop_duplicates("UPRN") + # Apply column filters + for column, values in column_filters.items(): + if column in epc_data.columns: + epc_data = epc_data[epc_data[column].isin(values)] + else: + raise Exception(f"Column {column} not found in data. 
column_filters is malformed") - # Get G & F properties - raise Exception("IMPLEMENT ME") - epc_data = epc_data[epc_data["CURRENT_ENERGY_RATING"].isin(["G", "F"])] data.append(epc_data) - self.epc_data = pd.concat(data) - - # Save as an excel - # TODO: Implement me + self.epc_data = pd.concat(data, ignore_index=True) + # We now store the data in s3 save_excel_to_s3( - + df=self.epc_data, + bucket_name="epc_data", + file_key=self.epc_data_filepath ) - # data.to_excel("EPC F & G Properties - V2.xlsx", index=False) def load_company_ownership(self): """ diff --git a/utils/s3.py b/utils/s3.py index b3553824..ca0cbfac 100644 --- a/utils/s3.py +++ b/utils/s3.py @@ -229,6 +229,39 @@ def read_excel_from_s3(bucket_name, file_key, header_row, drop_all_na=True): return df +def save_excel_to_s3(df, bucket_name, file_key): + """ + Save a pandas DataFrame as an Excel file on S3. + + :param df: DataFrame to save. + :param bucket_name: S3 bucket name. + :param file_key: S3 file key. This includes the file name and path. + """ + # Ensure the DataFrame is not empty + if df.empty: + raise ValueError("The DataFrame is empty. Nothing to save to Excel.") + + # Ensure the file_key ends with an appropriate Excel file extension + if not file_key.endswith((".xls", ".xlsx")): + raise ValueError("The specified file key does not appear to be an Excel file.") + + # Create a BytesIO buffer + output = BytesIO() + # Save DataFrame to an Excel file buffer + df.to_excel(output, index=False) + output.seek(0) # Important: move back to the beginning of the buffer + + # Initialize a session using boto3 + session = boto3.session.Session() + s3 = session.resource('s3') + + # Upload the Excel file from the buffer to S3 + bucket = s3.Bucket(bucket_name) + bucket.put_object(Body=output, Key=file_key) + + logger.info(f"Excel file saved to S3 bucket '{bucket_name}' with key '{file_key}'") + + def read_csv_from_s3(bucket_name, filepath): s3 = boto3.client('s3')