set up structure

This commit is contained in:
Khalim Conn-Kowlessar 2024-08-19 12:07:18 +01:00
parent aca7e6935e
commit 0db2f59230
2 changed files with 105 additions and 16 deletions

View file

@ -7,6 +7,7 @@ import re
from utils.s3 import save_excel_to_s3
from utils.logger import setup_logger
from backend.SearchEpc import SearchEpc
from etl.spatial.OpenUprnClient import OpenUprnClient
logger = setup_logger()
@ -100,6 +101,12 @@ class Ownership:
self.combined_matching_lookup_pre_filter_filepath = (
f"ownership/{self.project_name}/{self.run_timestamp}/combined_matching_lookup_pre_filter.xlsx"
)
# Final output paths
self.portfolio_owners_filepath = f"ownership/{self.project_name}/{self.run_timestamp}/portfolio_owners.xlsx"
self.portfolio_properties_filepath = (
f"ownership/{self.project_name}/{self.run_timestamp}/portfolio_properties.xlsx"
)
# Data
self.epc_data = None
self.ownership_data = None
@ -114,6 +121,11 @@ class Ownership:
self.matched_addresses = None
self.land_registry_matches = None
# Final outputs data
self.portfolio_owners = None
self.portfolio_properties = None
self.portfolio_epc_data = None
def pipeline(self, column_filters=None):
"""
Runs the full ownership process
@ -917,24 +929,64 @@ class Ownership:
~self.matched_addresses["sale_lodged_recently"]
]
logger.info("Performing conservation area and listed/herigage building filtering")
portfolio_spatial_data = OpenUprnClient.get_spatial_data(
self.epc_data["UPRN"].tolist(), bucket_name="retrofit-data-dev"
)
portfolio_spatial_data = portfolio_spatial_data[
["UPRN", "conservation_status", "is_listed_building", "is_heritage_building"]
]
# Filter matched_addresses_final and filter combined_matching_lookup_final
matched_addresses_final = matched_addresses_final.merge(
portfolio_spatial_data, how="left", on="UPRN"
)
matched_addresses_final = matched_addresses_final[
~matched_addresses_final["conservation_status"] &
~matched_addresses_final["is_listed_building"] &
~matched_addresses_final["is_heritage_building"]
]
# Filter combined_matching_lookup accordingly
combined_matching_lookup_final = self.combined_matching_lookup[
self.combined_matching_lookup["UPRN"].isin(self.combined_matching_lookup["UPRN"])
]
# Roll up portfolio
combined_aggregate = self.aggregate_matches(
matching_lookup=combined_matching_lookup_final,
company_ownership=self.ownership_data,
properties=self.epc_paths
)
investment_owners = combined_aggregate[combined_aggregate["cumulative_value"] <= self.portfolio_value]
self.portfolio_owners = combined_aggregate[combined_aggregate["cumulative_value"] <= self.portfolio_value]
investment_properties = matched_addresses_final[
self.portfolio_properties = matched_addresses_final[
matched_addresses_final["Company Registration No. (1)"].isin(
investment_owners["Company Registration No. (1)"])
self.portfolio_owners["Company Registration No. (1)"]
)
]
portfolio_epc_data = self.epc_data[self.epc_data["UPRN"].isin(investment_properties["UPRN"])]
self.portfolio_epc_data = self.epc_data[self.epc_data["UPRN"].isin(self.portfolio_properties["UPRN"])]
#
logger.info("Storing final outptus")
# Store data
save_excel_to_s3(
df=self.portfolio_owners,
bucket_name=self.bucket,
file_key=self.portfolio_owners_filepath,
)
save_excel_to_s3(
df=self.portfolio_properties,
bucket_name=self.bucket,
file_key=self.portfolio_properties_filepath,
)
save_excel_to_s3(
df=self.portfolio_epc_data,
bucket_name=self.bucket,
file_key=self.portfolio_epc_data_filepath,
)

View file

@ -119,7 +119,28 @@ class OpenUprnClient:
)
@staticmethod
def set_spatial_data(input_properties: list[Property], bucket_name):
def make_uprn_map(uprns, uprn_filenames):
"""
Given a list of UPRNs, this method will return a map of the UPRN to the filename that the UPRN is contained in
:param uprns: List of UPRNs
:param uprn_filenames: Lookup from UPRN range to filename
:return:
"""
uprn_map = {}
for uprn in uprns:
filtered_df = uprn_filenames[
(uprn_filenames["lower"] <= int(uprn))
& (uprn_filenames["upper"] >= int(uprn))
]
if filtered_df["filenames"].values[0] in uprn_map:
uprn_map[filtered_df["filenames"].values[0]].append(int(uprn))
else:
uprn_map[filtered_df["filenames"].values[0]] = [int(uprn)]
return uprn_map
@classmethod
def set_spatial_data(cls, input_properties: list[Property], bucket_name):
"""
Given a list of properties, this method will set the spatial data for each property
The method will look for the minimal set of uprn datasets that it needs to read in to get all of the spatial
@ -130,16 +151,8 @@ class OpenUprnClient:
bucket_name=bucket_name, file_key="spatial/filename_meta.parquet"
)
uprn_map = {}
for uprn in [p.uprn for p in input_properties]:
filtered_df = uprn_filenames[
(uprn_filenames["lower"] <= int(uprn))
& (uprn_filenames["upper"] >= int(uprn))
]
if filtered_df["filenames"].values[0] in uprn_map:
uprn_map[filtered_df["filenames"].values[0]].append(int(uprn))
else:
uprn_map[filtered_df["filenames"].values[0]] = [int(uprn)]
uprns = [p.uprn for p in input_properties]
uprn_map = cls.make_uprn_map(uprns, uprn_filenames)
for filename, associated_uprn in tqdm(uprn_map.items(), total=len(uprn_map)):
# Read in the file
@ -158,3 +171,27 @@ class OpenUprnClient:
raise Exception(f"Property with UPRN {p.uprn} does not have spatial data")
return input_properties
@classmethod
def get_spatial_data(cls, uprns: list[int], bucket_name):
"""
Similar method to set_spatial_data, but designed to work more generally on a list of uprns
:return:
"""
uprn_filenames = read_dataframe_from_s3_parquet(
bucket_name=bucket_name, file_key="spatial/filename_meta.parquet"
)
uprn_map = cls.make_uprn_map(uprns, uprn_filenames)
uprn_spatial_table = []
for filename, associated_uprn in tqdm(uprn_map.items(), total=len(uprn_map)):
# Read in the file
spatial_data = read_dataframe_from_s3_parquet(
bucket_name="retrofit-data-dev", file_key=f"spatial/{filename}"
)
spatial_df = spatial_data[spatial_data["UPRN"].isin(associated_uprn)]
uprn_spatial_table.append(spatial_df)
return pd.concat(uprn_spatial_table)