diff --git a/etl/ownership/Ownership.py b/etl/ownership/Ownership.py index 0bbb4689..122c36e5 100644 --- a/etl/ownership/Ownership.py +++ b/etl/ownership/Ownership.py @@ -54,7 +54,10 @@ class Ownership: domestic_ownership_path: str, overseas_ownership_path: str, land_registry_path: str, - project_name: str + project_name: str, + bucket: str, + average_property_value: float, + portfolio_value: float ): """ @@ -67,6 +70,8 @@ class Ownership: corporate ownership of properties in the UK, where the companies are overseas :param land_registry_path: A string that points to the location of the land registry data :param project_name: A string that is used to identify the project + :param bucket: The name of the s3 bucket where the data will be stored + :param average_property_value: The average property value in the area """ # All epc paths should end with certificates.csv @@ -78,13 +83,23 @@ class Ownership: self.land_registry_path = land_registry_path self.run_timestamp = str(datetime.now()) + self.project_name = project_name + self.bucket = bucket + + self.average_property_value = average_property_value + self.portfolio_value = portfolio_value # Data storage paths - self.epc_data_filepath = f"ownership/{project_name}/{self.run_timestamp}/epc_data.xlsx" + self.epc_data_filepath = f"ownership/{self.project_name}/{self.run_timestamp}/epc_data.xlsx" self.filtered_land_registry_filepath = ( - f"ownership/{project_name}/{self.run_timestamp}/filtered_land_registry.xlsx" + f"ownership/{self.project_name}/{self.run_timestamp}/filtered_land_registry.xlsx" + ) + self.matched_addresses_pre_filter_filepath = ( + f"ownership/{self.project_name}/{self.run_timestamp}/matched_addresses_pre_filter.xlsx" + ) + self.combined_matching_lookup_pre_filter_filepath = ( + f"ownership/{self.project_name}/{self.run_timestamp}/combined_matching_lookup_pre_filter.xlsx" ) - # Data self.epc_data = None self.ownership_data = None @@ -99,8 +114,40 @@ class Ownership: self.matched_addresses = None 
self.land_registry_matches = None - def pipeline(self): - pass + def pipeline(self, column_filters=None): + """ + Runs the ownership process up to land registry matching and stores the pre-filter matches in s3 + :param column_filters: Dictionary with column names as keys and list of acceptable values as values. This + dictionary is used to filter the EPC data and should look like this: + {"column_name": ["value1", "value2", ...]}, where column_name is the name of the column + in the EPC data and ["value1", "value2", ...] is a list of acceptable values for that + column. If a column is not found in the EPC data, an exception is raised. + """ + # Step 1: Get EPC data + self.source_epc_properties(column_filters=column_filters) + + # Step 2: Get company ownership data + self.load_company_ownership() + + # Step 3: Prepare data for matching + self.prepare_for_matching() + + # Step 4: Match EPC data to ownership data + self.match() + + # Step 5: Match land registry data to existing matches + self.match_with_land_registry() + # We store this data in s3 before we perform any filtering + save_excel_to_s3( + df=self.matched_addresses, + bucket_name=self.bucket, + file_key=self.matched_addresses_pre_filter_filepath + ) + save_excel_to_s3( + df=self.combined_matching_lookup, + bucket_name=self.bucket, + file_key=self.combined_matching_lookup_pre_filter_filepath + ) def source_epc_properties(self, column_filters=None): """ @@ -139,7 +186,7 @@ class Ownership: # We now store the data in s3 save_excel_to_s3( df=self.epc_data, - bucket_name="epc_data", + bucket_name=self.bucket, file_key=self.epc_data_filepath ) @@ -169,7 +216,8 @@ class Ownership: """ logger.info("Preparing data for matching") - # Now we filter properties the other way around + # Now we filter properties the other way around, since the ownership data might not have all of the + # postcodes that appear in the EPC data self.epc_data = self.epc_data[ self.epc_data["POSTCODE"].str.lower().isin(self.ownership_data["Postcode"].str.lower().unique()) ] @@ -468,6 +516,8 @@ class 
Ownership: } ) + logger.info("Matching complete - creating lookup tables") + self.freehold_matching_lookup = pd.DataFrame(freehold_matching_lookup) self.leasehold_matching_lookup = pd.DataFrame(leasehold_matching_lookup) @@ -540,6 +590,8 @@ class Ownership: .str.replace(",", "") ) + logger.info("Successfully completed matching") + def get_land_registry(self): """ This function reads in the land registry data and filters it on the postcodes found in the EPC data @@ -573,7 +625,7 @@ class Ownership: # Store this fitereed version in s3 save_excel_to_s3( df=self.land_registry, - bucket_name="epc_data", + bucket_name=self.bucket, file_key=self.filtered_land_registry_filepath, ) @@ -780,6 +832,7 @@ class Ownership: self.land_registry_matches = pd.DataFrame(land_registry_matches) + logger.info("Sucessfully completed land registry matching - merging onto matched_addresses") # Merge onto the EPC - ownership matches self.matched_addresses = self.matched_addresses.merge( land_registry_matches, @@ -803,5 +856,95 @@ class Ownership: (self.matched_addresses["TRANSACTION_TYPE"].isin(["marketed sale", "non marketed sale"])) ) - def filter_matches(self): - pass + def aggregate_matches(self, matching_lookup, company_ownership, properties): + """ + Counts the matched properties per company and local authority and estimates the value of each company's holdings + :param matching_lookup: DataFrame of matches, joined to company_ownership on "Title Number" and to properties + on "UPRN" + :param company_ownership: DataFrame of company ownership data, containing a "Title Number" column + :param properties: DataFrame of properties, containing "UPRN" and "LOCAL_AUTHORITY_LABEL" columns + :return: DataFrame with one row per "Company Registration No. (1)", a count column per local authority, the + proprietor name, total_number_of_properties, approx_value and cumulative_value, ordered by + total_number_of_properties descending + """ + df = matching_lookup.merge( + company_ownership, how="left", on="Title Number" + ).merge( + properties[["UPRN", "LOCAL_AUTHORITY_LABEL"]], how="left", on="UPRN" + ) + counts = ( + df.groupby(["Company Registration No. (1)", "LOCAL_AUTHORITY_LABEL"])["UPRN"] + .count() + .reset_index(name="number_of_properties") + ) + counts = counts.sort_values("number_of_properties", ascending=False) + + pivot_counts = counts.pivot_table( + index=["Company Registration No. 
(1)"], # Rows: one per company registration number + columns="LOCAL_AUTHORITY_LABEL", # Columns: each local authority + values="number_of_properties", # The counts of properties + fill_value=0 # Fill missing values with 0 (where there are no properties owned) + ).reset_index() + + total_counts = ( + df.groupby(["Company Registration No. (1)"])["UPRN"] + .count() + .reset_index(name="total_number_of_properties") + ) + + # We have cases where the same company registration number appears with different company names, so we + # take the first name found as the best + # name per company registration number + best_names = ( + df.groupby(["Company Registration No. (1)"])["Proprietor Name (1)"] + .first() + .reset_index() + ) + + total_counts = best_names.merge( + total_counts, how="left", on=["Company Registration No. (1)"] + ) + + pivot_counts = pivot_counts.merge( + total_counts, how="left", on=["Company Registration No. (1)"] + ) + + pivot_counts = pivot_counts.sort_values("total_number_of_properties", ascending=False) + + pivot_counts["approx_value"] = self.average_property_value * pivot_counts["total_number_of_properties"] + pivot_counts["cumulative_value"] = pivot_counts["approx_value"].cumsum() + + return pivot_counts + + def create_final_matches(self): + """ + Given the matching to this point, this method creates the final matching tables + :return: None - the final tables are currently only held in local variables + """ + logger.info("Creating final matches") + matched_addresses_final = self.matched_addresses[ + ~self.matched_addresses["sold_recently"] & + ~self.matched_addresses["sale_lodged_recently"] + ] + + # Filter combined_matching_lookup accordingly + combined_matching_lookup_final = self.combined_matching_lookup[ + self.combined_matching_lookup["UPRN"].isin(matched_addresses_final["UPRN"]) + ] + + combined_aggregate = self.aggregate_matches( + matching_lookup=combined_matching_lookup_final, + company_ownership=self.ownership_data, + properties=self.epc_data + ) + + investment_owners = combined_aggregate[combined_aggregate["cumulative_value"] <= 
self.portfolio_value] + + investment_properties = matched_addresses_final[ + matched_addresses_final["Company Registration No. (1)"].isin( + investment_owners["Company Registration No. (1)"]) + ] + + portfolio_epc_data = self.epc_data[self.epc_data["UPRN"].isin(investment_properties["UPRN"])] + + # diff --git a/etl/ownership/projects/midlands_portfolio/app.py b/etl/ownership/projects/midlands_portfolio/app.py index d370ba1e..17baed07 100644 --- a/etl/ownership/projects/midlands_portfolio/app.py +++ b/etl/ownership/projects/midlands_portfolio/app.py @@ -49,16 +49,29 @@ OVERSEAS_OWNERSHIP_PATH = "/Users/khalimconn-kowlessar/Downloads/OCOD_FULL_2024_ LAND_REGISTRY_PATH = "/Users/khalimconn-kowlessar/Downloads/pp-complete.csv" PROJECT_NAME = "Midlands Portfolio" +DATA_BUCKET = "retrofit-data-dev" + +# We use this as a rough figure, which helps us shape the portfolio +PROPERTY_VALUE_ESTIMATE = 200_000 +# We want a 50m portfolio, but we create a bigger portfolio than needed, since properties will be filtered out +PORTFOLIO_VALUE = 75_000_000 def app(): + epc_column_filters = { + "CURRENT_ENERGY_RATING": ["F", "G"] + } + ownership_instance = Ownership( epc_paths=EPC_PATHS, domestic_ownership_path=DOMESTIC_OWNERSHIP_PATH, overseas_ownership_path=OVERSEAS_OWNERSHIP_PATH, land_registry_path=LAND_REGISTRY_PATH, - project_name=PROJECT_NAME + project_name=PROJECT_NAME, + bucket=DATA_BUCKET, + average_property_value=PROPERTY_VALUE_ESTIMATE, + portfolio_value=PORTFOLIO_VALUE ) - ownership_instance.pipeline() + ownership_instance.pipeline(column_filters=epc_column_filters) # TODO: Create portfolio and payload