working on filtering methodology

This commit is contained in:
Khalim Conn-Kowlessar 2024-08-19 11:52:12 +01:00
parent 56889fa4b0
commit aca7e6935e
2 changed files with 159 additions and 13 deletions

View file

@ -54,7 +54,10 @@ class Ownership:
domestic_ownership_path: str,
overseas_ownership_path: str,
land_registry_path: str,
project_name: str
project_name: str,
bucket: str,
average_property_value: float,
portfolio_value: float
):
"""
@ -67,6 +70,8 @@ class Ownership:
corporate ownership of properties in the UK, where the companies are overseas
:param land_registry_path: A string that points to the location of the land registry data
:param project_name: A string that is used to identify the project
:param bucket: The name of the s3 bucket where the data will be stored
:param average_property_value: The average property value in the area
"""
# All epc paths should end with certificates.csv
@ -78,13 +83,23 @@ class Ownership:
self.land_registry_path = land_registry_path
self.run_timestamp = str(datetime.now())
self.project_name = project_name
self.bucket = bucket
self.average_property_value = average_property_value
self.portfolio_value = portfolio_value
# Data storage paths
self.epc_data_filepath = f"ownership/{project_name}/{self.run_timestamp}/epc_data.xlsx"
self.epc_data_filepath = f"ownership/{self.project_name}/{self.run_timestamp}/epc_data.xlsx"
self.filtered_land_registry_filepath = (
f"ownership/{project_name}/{self.run_timestamp}/filtered_land_registry.xlsx"
f"ownership/{self.project_name}/{self.run_timestamp}/filtered_land_registry.xlsx"
)
self.matched_addresses_pre_filter_filepath = (
f"ownership/{self.project_name}/{self.run_timestamp}/matched_addresses_pre_filter.xlsx"
)
self.combined_matching_lookup_pre_filter_filepath = (
f"ownership/{self.project_name}/{self.run_timestamp}/combined_matching_lookup_pre_filter.xlsx"
)
# Data
self.epc_data = None
self.ownership_data = None
@ -99,8 +114,40 @@ class Ownership:
self.matched_addresses = None
self.land_registry_matches = None
def pipeline(self, column_filters=None):
    """
    Runs the ownership process end to end, up to (but not including) filtering of the matches.

    The steps are, in order: source the EPC data, load the company ownership data, prepare both for
    matching, match EPC records to ownership records, and then match land registry data onto the
    existing matches. The unfiltered results are then written to s3.

    :param column_filters: Dictionary with column names as keys and list of acceptable values as values. This
                           dictionary is used to filter the EPC data and should look like this:
                           {"column_name": ["value1", "value2", ...]}, where column_name is the name of the
                           column in the EPC data and ["value1", "value2", ...] is a list of acceptable values
                           for that column. If a column is not found in the EPC data, an exception is raised.
                           Defaults to None, which is passed straight through to source_epc_properties.
    """
    # Step 1: Get EPC data
    self.source_epc_properties(column_filters=column_filters)

    # Step 2: Get company ownership data
    self.load_company_ownership()

    # Step 3: Prepare data for matching
    self.prepare_for_matching()

    # Step 4: Match EPC data to ownership data
    self.match()

    # Step 5: Match land registry data to existing matches
    self.match_with_land_registry()

    # We store this data in s3 before we perform any filtering, so that the raw matches can be
    # inspected independently of whatever filtering is applied afterwards
    save_excel_to_s3(
        df=self.matched_addresses,
        bucket_name=self.bucket,
        file_key=self.matched_addresses_pre_filter_filepath
    )
    save_excel_to_s3(
        df=self.combined_matching_lookup,
        bucket_name=self.bucket,
        file_key=self.combined_matching_lookup_pre_filter_filepath
    )
def source_epc_properties(self, column_filters=None):
"""
@ -139,7 +186,7 @@ class Ownership:
# We now store the data in s3
save_excel_to_s3(
df=self.epc_data,
bucket_name="epc_data",
bucket_name=self.bucket,
file_key=self.epc_data_filepath
)
@ -169,7 +216,8 @@ class Ownership:
"""
logger.info("Preparing data for matching")
# Now we filter properties the other way around
# Now we filter properties the other way around, since the ownership data might not have all of the
# postcodes that appear in the EPC data
self.epc_data = self.epc_data[
self.epc_data["POSTCODE"].str.lower().isin(self.ownership_data["Postcode"].str.lower().unique())
]
@ -468,6 +516,8 @@ class Ownership:
}
)
logger.info("Matching complete - creating lookup tables")
self.freehold_matching_lookup = pd.DataFrame(freehold_matching_lookup)
self.leasehold_matching_lookup = pd.DataFrame(leasehold_matching_lookup)
@ -540,6 +590,8 @@ class Ownership:
.str.replace(",", "")
)
logger.info("Successfully completed matching")
def get_land_registry(self):
"""
This function reads in the land registry data and filters it on the postcodes found in the EPC data
@ -573,7 +625,7 @@ class Ownership:
# Store this filtered version in s3
save_excel_to_s3(
df=self.land_registry,
bucket_name="epc_data",
bucket_name=self.bucket,
file_key=self.filtered_land_registry_filepath,
)
@ -780,6 +832,7 @@ class Ownership:
self.land_registry_matches = pd.DataFrame(land_registry_matches)
logger.info("Sucessfully completed land registry matching - merging onto matched_addresses")
# Merge onto the EPC - ownership matches
self.matched_addresses = self.matched_addresses.merge(
land_registry_matches,
@ -803,5 +856,85 @@ class Ownership:
(self.matched_addresses["TRANSACTION_TYPE"].isin(["marketed sale", "non marketed sale"]))
)
def filter_matches(self):
    """
    Placeholder for the match filtering step. Not implemented yet: it performs no work and returns None.
    """
    return None
def aggregate_matches(self, matching_lookup, company_ownership, properties):
    """
    Aggregates matched properties per company, producing a property count per local authority, a total
    property count, and an approximate (cumulative) portfolio value for each company.

    :param matching_lookup: DataFrame of matches. Must contain the columns "UPRN" and "Title Number"
    :param company_ownership: DataFrame of ownership data. Must contain the columns "Title Number",
                              "Company Registration No. (1)" and "Proprietor Name (1)"
    :param properties: DataFrame of property data. Must contain the columns "UPRN" and
                       "LOCAL_AUTHORITY_LABEL"
    :return: DataFrame with one row per company registration number, one column per local authority holding
             the number of properties owned there, plus "Proprietor Name (1)", "total_number_of_properties",
             "approx_value" and "cumulative_value". Rows are ordered by total_number_of_properties,
             largest first, and cumulative_value is the running total of approx_value in that order.
    """
    company_col = "Company Registration No. (1)"
    authority_col = "LOCAL_AUTHORITY_LABEL"

    # We only need one local authority per UPRN. If the property data holds several rows for the same UPRN,
    # merging it directly would duplicate matches and inflate both the counts and the approximate values,
    # so we keep the first row for each UPRN
    authority_lookup = properties[["UPRN", authority_col]].drop_duplicates(subset="UPRN")

    df = matching_lookup.merge(
        company_ownership, how="left", on="Title Number"
    ).merge(
        authority_lookup, how="left", on="UPRN"
    )

    # Number of properties per company, per local authority
    counts = (
        df.groupby([company_col, authority_col])["UPRN"]
        .count()
        .reset_index(name="number_of_properties")
    )

    pivot_counts = counts.pivot_table(
        index=[company_col],  # Rows: one per company registration number
        columns=authority_col,  # Columns: each local authority
        values="number_of_properties",  # The counts of properties
        fill_value=0  # Fill missing values with 0 (where there are no properties owned)
    ).reset_index()

    total_counts = (
        df.groupby([company_col])["UPRN"]
        .count()
        .reset_index(name="total_number_of_properties")
    )

    # We have cases where the same company registration number appears with different company names, so we
    # produce a single best name per company registration number (the first one encountered)
    best_names = (
        df.groupby([company_col])["Proprietor Name (1)"]
        .first()
        .reset_index()
    )

    total_counts = best_names.merge(
        total_counts, how="left", on=[company_col]
    )

    pivot_counts = pivot_counts.merge(
        total_counts, how="left", on=[company_col]
    )

    # A stable sort keeps companies with equal totals in a deterministic order, which matters because the
    # cumulative value below depends on row order
    pivot_counts = pivot_counts.sort_values("total_number_of_properties", ascending=False, kind="stable")

    pivot_counts["approx_value"] = self.average_property_value * pivot_counts["total_number_of_properties"]
    pivot_counts["cumulative_value"] = pivot_counts["approx_value"].cumsum()

    return pivot_counts
def create_final_matches(self):
    """
    Given the matching to this point, this method creates the final matching tables

    Matches flagged as sold recently, or with a sale lodged recently, are removed. The remaining matches are
    aggregated per company and companies are selected, largest first, for as long as the cumulative
    approximate value stays within self.portfolio_value. The results are stored on the instance as
    investment_owners, investment_properties and portfolio_epc_data.

    :return: The subset of self.epc_data whose UPRN belongs to one of the selected investment properties
    """
    logger.info("Creating final matches")

    matched_addresses_final = self.matched_addresses[
        ~self.matched_addresses["sold_recently"] &
        ~self.matched_addresses["sale_lodged_recently"]
    ]

    # Filter combined_matching_lookup accordingly, keeping only the UPRNs that survived the filter above
    combined_matching_lookup_final = self.combined_matching_lookup[
        self.combined_matching_lookup["UPRN"].isin(matched_addresses_final["UPRN"])
    ]

    # aggregate_matches needs the property DataFrame (UPRN and LOCAL_AUTHORITY_LABEL columns), so we pass
    # the loaded EPC data rather than the input file paths
    combined_aggregate = self.aggregate_matches(
        matching_lookup=combined_matching_lookup_final,
        company_ownership=self.ownership_data,
        properties=self.epc_data
    )

    # combined_aggregate is ordered by company size, so this keeps the largest owners until the cumulative
    # approximate value exceeds the target portfolio value
    investment_owners = combined_aggregate[combined_aggregate["cumulative_value"] <= self.portfolio_value]

    investment_properties = matched_addresses_final[
        matched_addresses_final["Company Registration No. (1)"].isin(
            investment_owners["Company Registration No. (1)"])
    ]

    portfolio_epc_data = self.epc_data[self.epc_data["UPRN"].isin(investment_properties["UPRN"])]

    # Keep the results on the instance so that later steps can use them, rather than discarding them
    self.investment_owners = investment_owners
    self.investment_properties = investment_properties
    self.portfolio_epc_data = portfolio_epc_data

    return portfolio_epc_data

View file

@ -49,16 +49,29 @@ OVERSEAS_OWNERSHIP_PATH = "/Users/khalimconn-kowlessar/Downloads/OCOD_FULL_2024_
# Location of the land registry data on the local machine; passed to Ownership as land_registry_path
LAND_REGISTRY_PATH = "/Users/khalimconn-kowlessar/Downloads/pp-complete.csv"
# Identifies the project; Ownership embeds this in the s3 file keys it writes to
PROJECT_NAME = "Midlands Portfolio"
# Name of the s3 bucket that Ownership stores its data in
DATA_BUCKET = "retrofit-data-dev"
# We use this as a rough figure for the value of a single property, which helps us shape the portfolio
PROPERTY_VALUE_ESTIMATE = 200_000
# We want a 50m portfolio, but we create a bigger portfolio than needed, since properties will be filtered out
PORTFOLIO_VALUE = 75_000_000
def app():
    """
    Builds an Ownership instance from the module level configuration and runs its pipeline, with the EPC
    data restricted to the filters defined below.
    """
    # Only keep EPC records whose CURRENT_ENERGY_RATING is F or G
    epc_column_filters = {
        "CURRENT_ENERGY_RATING": ["F", "G"]
    }

    ownership_instance = Ownership(
        epc_paths=EPC_PATHS,
        domestic_ownership_path=DOMESTIC_OWNERSHIP_PATH,
        overseas_ownership_path=OVERSEAS_OWNERSHIP_PATH,
        land_registry_path=LAND_REGISTRY_PATH,
        project_name=PROJECT_NAME,
        bucket=DATA_BUCKET,
        average_property_value=PROPERTY_VALUE_ESTIMATE,
        portfolio_value=PORTFOLIO_VALUE
    )

    # Run the pipeline once, with the EPC filters applied
    ownership_instance.pipeline(column_filters=epc_column_filters)

    # TODO: Create portfolio and payload