diff --git a/etl/ownership/Ownership.py b/etl/ownership/Ownership.py index 122c36e5..cfa3e3b3 100644 --- a/etl/ownership/Ownership.py +++ b/etl/ownership/Ownership.py @@ -7,6 +7,7 @@ import re from utils.s3 import save_excel_to_s3 from utils.logger import setup_logger from backend.SearchEpc import SearchEpc +from etl.spatial.OpenUprnClient import OpenUprnClient logger = setup_logger() @@ -100,6 +101,12 @@ class Ownership: self.combined_matching_lookup_pre_filter_filepath = ( f"ownership/{self.project_name}/{self.run_timestamp}/combined_matching_lookup_pre_filter.xlsx" ) + # Final output paths + self.portfolio_owners_filepath = f"ownership/{self.project_name}/{self.run_timestamp}/portfolio_owners.xlsx" + self.portfolio_properties_filepath = ( + f"ownership/{self.project_name}/{self.run_timestamp}/portfolio_properties.xlsx" + ) + # Data self.epc_data = None self.ownership_data = None @@ -114,6 +121,11 @@ class Ownership: self.matched_addresses = None self.land_registry_matches = None + # Final outputs data + self.portfolio_owners = None + self.portfolio_properties = None + self.portfolio_epc_data = None + def pipeline(self, column_filters=None): """ Runs the full ownership process @@ -917,24 +929,64 @@ class Ownership: ~self.matched_addresses["sale_lodged_recently"] ] + logger.info("Performing conservation area and listed/herigage building filtering") + + portfolio_spatial_data = OpenUprnClient.get_spatial_data( + self.epc_data["UPRN"].tolist(), bucket_name="retrofit-data-dev" + ) + + portfolio_spatial_data = portfolio_spatial_data[ + ["UPRN", "conservation_status", "is_listed_building", "is_heritage_building"] + ] + + # Filter matched_addresses_final and filter combined_matching_lookup_final + matched_addresses_final = matched_addresses_final.merge( + portfolio_spatial_data, how="left", on="UPRN" + ) + matched_addresses_final = matched_addresses_final[ + ~matched_addresses_final["conservation_status"] & + ~matched_addresses_final["is_listed_building"] & + ~matched_addresses_final["is_heritage_building"] + ] + # Filter combined_matching_lookup accordingly combined_matching_lookup_final = self.combined_matching_lookup[ self.combined_matching_lookup["UPRN"].isin(self.combined_matching_lookup["UPRN"]) ] + # Roll up portfolio combined_aggregate = self.aggregate_matches( matching_lookup=combined_matching_lookup_final, company_ownership=self.ownership_data, properties=self.epc_paths ) - investment_owners = combined_aggregate[combined_aggregate["cumulative_value"] <= self.portfolio_value] + self.portfolio_owners = combined_aggregate[combined_aggregate["cumulative_value"] <= self.portfolio_value] - investment_properties = matched_addresses_final[ + self.portfolio_properties = matched_addresses_final[ matched_addresses_final["Company Registration No. (1)"].isin( - investment_owners["Company Registration No. (1)"]) + self.portfolio_owners["Company Registration No. (1)"] + ) ] - portfolio_epc_data = self.epc_data[self.epc_data["UPRN"].isin(investment_properties["UPRN"])] + self.portfolio_epc_data = self.epc_data[self.epc_data["UPRN"].isin(self.portfolio_properties["UPRN"])] - # + logger.info("Storing final outptus") + # Store data + save_excel_to_s3( + df=self.portfolio_owners, + bucket_name=self.bucket, + file_key=self.portfolio_owners_filepath, + ) + + save_excel_to_s3( + df=self.portfolio_properties, + bucket_name=self.bucket, + file_key=self.portfolio_properties_filepath, + ) + + save_excel_to_s3( + df=self.portfolio_epc_data, + bucket_name=self.bucket, + file_key=self.portfolio_epc_data_filepath, + ) diff --git a/etl/spatial/OpenUprnClient.py b/etl/spatial/OpenUprnClient.py index 198f9945..11827f8d 100644 --- a/etl/spatial/OpenUprnClient.py +++ b/etl/spatial/OpenUprnClient.py @@ -119,7 +119,28 @@ class OpenUprnClient: ) @staticmethod - def set_spatial_data(input_properties: list[Property], bucket_name): + def make_uprn_map(uprns, uprn_filenames): + """ + Given a list of UPRNs, this method will return a map of the UPRN to the filename that the UPRN is contained in + :param uprns: List of UPRNs + :param uprn_filenames: Lookup from UPRN range to filename + :return: + """ + uprn_map = {} + for uprn in uprns: + filtered_df = uprn_filenames[ + (uprn_filenames["lower"] <= int(uprn)) + & (uprn_filenames["upper"] >= int(uprn)) + ] + if filtered_df["filenames"].values[0] in uprn_map: + uprn_map[filtered_df["filenames"].values[0]].append(int(uprn)) + else: + uprn_map[filtered_df["filenames"].values[0]] = [int(uprn)] + + return uprn_map + + @classmethod + def set_spatial_data(cls, input_properties: list[Property], bucket_name): """ Given a list of properties, this method will set the spatial data for each property The method will look for the minimal set of uprn datasets that it needs to read in to get all of the spatial @@ -130,16 +151,8 @@ class OpenUprnClient: bucket_name=bucket_name, file_key="spatial/filename_meta.parquet" ) - uprn_map = {} - for uprn in [p.uprn for p in input_properties]: - filtered_df = uprn_filenames[ - (uprn_filenames["lower"] <= int(uprn)) - & (uprn_filenames["upper"] >= int(uprn)) - ] - if filtered_df["filenames"].values[0] in uprn_map: - uprn_map[filtered_df["filenames"].values[0]].append(int(uprn)) - else: - uprn_map[filtered_df["filenames"].values[0]] = [int(uprn)] + uprns = [p.uprn for p in input_properties] + uprn_map = cls.make_uprn_map(uprns, uprn_filenames) for filename, associated_uprn in tqdm(uprn_map.items(), total=len(uprn_map)): # Read in the file @@ -158,3 +171,27 @@ class OpenUprnClient: raise Exception(f"Property with UPRN {p.uprn} does not have spatial data") return input_properties + + @classmethod + def get_spatial_data(cls, uprns: list[int], bucket_name): + """ + Similar method to set_spatial_data, but designed to work more generally on a list of uprns + :return: + """ + uprn_filenames = read_dataframe_from_s3_parquet( + bucket_name=bucket_name, file_key="spatial/filename_meta.parquet" + ) + + uprn_map = cls.make_uprn_map(uprns, uprn_filenames) + + uprn_spatial_table = [] + for filename, associated_uprn in tqdm(uprn_map.items(), total=len(uprn_map)): + # Read in the file + spatial_data = read_dataframe_from_s3_parquet( + bucket_name="retrofit-data-dev", file_key=f"spatial/{filename}" + ) + + spatial_df = spatial_data[spatial_data["UPRN"].isin(associated_uprn)] + uprn_spatial_table.append(spatial_df) + + return pd.concat(uprn_spatial_table)