set up template of ownership class

This commit is contained in:
Khalim Conn-Kowlessar 2024-08-16 12:43:16 +01:00
parent cee16b8166
commit 3aa29e18a6
2 changed files with 477 additions and 0 deletions

467
etl/ownership/Ownership.py Normal file
View file

@ -0,0 +1,467 @@
from datetime import datetime
from typing import List
from tqdm import tqdm
import pandas as pd
import Levenshtein
import re
from utils.s3 import save_excel_to_s3
from utils.logger import setup_logger
from backend.SearchEpc import SearchEpc
logger = setup_logger()
class Ownership:
    """
    Match EPC certificate records to corporate ownership (CCOD / OCOD) title records.

    The data each step reads implies the following call order:
    ``source_epc_properties`` -> ``load_company_ownership`` -> ``prepare_for_matching`` -> ``match``.
    Results are stored on the instance (see the attributes initialised in ``__init__``).
    """

    # These are a number of prefix phrases found in the ownership data. If an address begins with any of these
    # terms, the record is removed because it does not reference the property itself
    OWNERSHIP_STARTING_TERMS = [
        "land adjoining", "land on the", "land to the rear of", "land and buildings on the",
        "garage adjoining", "car park adjoining", "the land adjoining", "land and buildings adjoining",
        "all royal mines"
    ]

    # Columns of the per-tenure lookups built by match(). Declared up front so that a run that finds no matches
    # still produces frames with the expected columns
    _LOOKUP_COLUMNS = ["UPRN", "Title Number", "match_type"]

    def __init__(
        self, epc_paths: List[str], domestic_ownership_path: str, overseas_ownership_path
    ):
        """
        :param epc_paths: A list of strings, which points to the location of the EPC data to be used. To date, this
                          data has been held locally, and so will require extension to read from remote locations
                          like s3
        :param domestic_ownership_path: A string which points to the location of the CCOD ownership data, that details
                                        corporate ownership of properties in the UK, where the companies are UK based
        :param overseas_ownership_path: A string which points to the location of the OCOD ownership data, that details
                                        corporate ownership of properties in the UK, where the companies are overseas
        :raises ValueError: If epc_paths is empty or any path does not end with certificates.csv
        """
        # All epc paths should end with certificates.csv. `all` is required here: `any` would accept a list in
        # which only a single path is valid
        if not epc_paths or not all(path.endswith("certificates.csv") for path in epc_paths):
            raise ValueError("epc_paths contains a path that does not end with certificates.csv")

        self.epc_paths = epc_paths
        self.domestic_ownership_path = domestic_ownership_path
        self.overseas_ownership_path = overseas_ownership_path
        self.run_timestamp = str(datetime.now())

        # Data
        self.epc_data = None
        self.ownership_data = None
        self.freehold_matching_lookup = None
        self.leasehold_matching_lookup = None
        self.shared_freehold_match = None
        self.shared_leasehold_match = None
        self.combined_matching_lookup = None
        self.matched_addresses = None

    def source_epc_properties(self, column_filters=None):
        """
        Read every EPC table in ``self.epc_paths`` and keep the newest certificate per UPRN.

        NOTE: this method is still a template. It raises ``NotImplementedError`` before any data is stored, so
        ``self.epc_data`` is never populated by it yet, and ``column_filters`` is not applied.

        :param column_filters: Reserved for the filters to apply to the EPC data; currently unused
        :return: None
        :raises Exception: If LODGEMENT_DATETIME holds values that cannot be parsed as datetimes
        :raises NotImplementedError: Always, once the first EPC table has been de-duplicated
        """
        column_filters = {} if column_filters is None else column_filters
        # TODO: Do the tenure filtering here!
        # ["rental (private)", "Rented (private)", "owner-occupied", "Owner-occupied"]
        data = []
        for path in tqdm(self.epc_paths):
            epc_data = pd.read_csv(path, low_memory=False)
            # Copy so the column assignments below do not write into a slice of the raw frame
            epc_data = epc_data[~pd.isnull(epc_data["UPRN"])].copy()
            epc_data["UPRN"] = epc_data["UPRN"].astype(int).astype(str)

            # Parse once and reuse the result for both the validation and the column itself
            lodgement_datetime = pd.to_datetime(epc_data["LODGEMENT_DATETIME"], errors="coerce")
            if lodgement_datetime.isnull().any():
                raise Exception("Lodgement datetime contains values that could not be parsed as datetimes")
            epc_data["LODGEMENT_DATETIME"] = lodgement_datetime

            # Get the newest EPC for each UPRN. We use LODGEMENT_DATE as a proxy for this
            epc_data = epc_data.sort_values(
                ["LODGEMENT_DATE", "LODGEMENT_DATETIME"], ascending=False
            ).drop_duplicates("UPRN")

            # Get G & F properties
            # Placeholder left by the author: everything below is unreachable until this is implemented
            raise NotImplementedError("IMPLEMENT ME")
            epc_data = epc_data[epc_data["CURRENT_ENERGY_RATING"].isin(["G", "F"])]
            data.append(epc_data)

        self.epc_data = pd.concat(data, ignore_index=True)
        # Save as an excel
        # TODO: Implement me
        save_excel_to_s3(
        )
        # data.to_excel("EPC F & G Properties - V2.xlsx", index=False)

    def load_company_ownership(self):
        """
        Read the domestic and overseas company ownership data into ``self.ownership_data``.

        Each record is flagged with ``is_overseas``, and the combined data is restricted to postcodes that appear
        in ``self.epc_data``.

        :return: None
        :raises ValueError: If ``self.epc_data`` has not been populated
        """
        if self.epc_data is None:
            raise ValueError("epc_data should not be null")

        logger.info("Reading in company ownership data")
        domestic_company_ownership = pd.read_csv(self.domestic_ownership_path)
        domestic_company_ownership["is_overseas"] = False
        overseas_company_ownership = pd.read_csv(self.overseas_ownership_path)
        overseas_company_ownership["is_overseas"] = True
        ownership_data = pd.concat(
            [domestic_company_ownership, overseas_company_ownership], ignore_index=True
        )

        # Filter on relevant postcodes - this is done to reduce the large size of the ownership dataset
        logger.info("Filtering ownership data on EPC postcodes")
        epc_postcodes = self.epc_data["POSTCODE"].str.lower().unique()
        self.ownership_data = ownership_data[ownership_data["Postcode"].str.lower().isin(epc_postcodes)]

    def prepare_for_matching(self):
        """
        Given the epc properties and the ownership data, this function performs a number of operations on both
        datasets to prepare them for matching.

        :raises ValueError: If either dataset has not been populated
        """
        if (self.epc_data is None) or (self.ownership_data is None):
            raise ValueError("epc_data and ownership_data should not be null")

        logger.info("Preparing data for matching")
        # Now we filter properties the other way around
        ownership_postcodes = self.ownership_data["Postcode"].str.lower().unique()
        self.epc_data = self.epc_data[self.epc_data["POSTCODE"].str.lower().isin(ownership_postcodes)]

        # We have some duplicates on UPRN, so take the newest record for each UPRN
        self.epc_data = self.epc_data.sort_values("LODGEMENT_DATE", ascending=False).drop_duplicates("UPRN")

        # Remove entries where the address begins with the term "land adjoining", or other records that don't
        # reference the property itself
        for starting_term in self.OWNERSHIP_STARTING_TERMS:
            # na=False keeps the mask boolean when an address is missing; otherwise `~` fails on NaN
            starts_with_term = (
                self.ownership_data["Property Address"].str.lower().str.startswith(starting_term, na=False)
            )
            self.ownership_data = self.ownership_data[~starts_with_term]

    @staticmethod
    def extract_numeric_part(house_number: str) -> str:
        """
        Extracts only the numeric part from a house number that may contain letters.

        Parameters:
        - house_number (str): The house number string possibly containing letters.

        Returns:
        - str: The digits of the house number, in order; empty if it holds no digits.
        """
        # Use regular expression to replace all non-digit characters with nothing
        return re.sub(r'\D', '', house_number)

    @staticmethod
    def remove_text_in_brackets(address: str) -> str:
        """
        Removes any text within parentheses, including the parentheses themselves and any whitespace directly
        before the opening parenthesis.

        Parameters:
        - address (str): The address string to clean.

        Returns:
        - str: The cleaned address with text in parentheses removed.
        """
        # Regex to find and remove content in parentheses
        return re.sub(r'\s*\([^)]*\)', '', address)

    @staticmethod
    def extract_range_from_house_number(house_number_range: str):
        """
        Detects if the house number includes a numeric range (formatted as 'x-y') and extracts all values within
        this range. Non-numeric strings containing hyphens are ignored.

        Parameters:
        - house_number_range (str): The house number string that might contain a range.

        Returns:
        - list of str: A list of all numbers within the range if it is a range; otherwise, returns None.
        """
        # Missing house numbers reach this function as None or NaN; neither can be a range
        if not isinstance(house_number_range, str) or not house_number_range:
            return None

        parts = house_number_range.split('-')
        if len(parts) != 2 or not (parts[0].isdigit() and parts[1].isdigit()):
            # No hyphen present, or not a valid numeric range
            return None

        start, end = map(int, parts)
        return [str(x) for x in range(start, end + 1)]

    @staticmethod
    def is_in_range(row, house_no):
        """
        Check if the house number is within the range provided in the row.

        :param row: The list produced by extract_range_from_house_number, or None / NaN when there is no range
        :param house_no: The house number to look for
        :return: True if house_no is one of the values in row, False otherwise
        """
        # Anything that is not a list (None, NaN) means no range was detected
        if not isinstance(row, (list, tuple)):
            return False
        return any(house_no == num for num in row)

    @staticmethod
    def _normalise_address(address) -> str:
        """ Strip punctuation and spaces and lower-case an address; missing values become an empty string. """
        if not isinstance(address, str):
            return ""
        return re.sub(r'[^\w\s]', '', address).replace(" ", "").lower()

    @staticmethod
    def levenstein_match(matching_string, df, address_col):
        """
        Return the single row of df whose address is closest to matching_string by Levenshtein distance.

        Both sides are normalised the same way (punctuation and spaces stripped, lower-cased) so the distance
        reflects the address content rather than its formatting.

        :param matching_string: The address to match against
        :param df: The candidate rows
        :param address_col: The column of df that holds the candidate addresses
        :return: A one-row DataFrame holding the best match, or df unchanged if it is empty
        """
        if df.empty:
            return df

        target = Ownership._normalise_address(matching_string)
        candidates = [Ownership._normalise_address(x) for x in df[address_col].tolist()]

        distances = [Levenshtein.distance(target, candidate) for candidate in candidates]
        best_match_index = distances.index(min(distances))
        # We might want to consider a threshold for the distance, however we don't consider this for the moment
        return df.iloc[best_match_index:best_match_index + 1]

    @classmethod
    def _drop_weaker_duplicates(
        cls, matching_lookup, properties, company_ownership,
        duplicated_col, competing_col, reference_col, candidate_col
    ):
        """
        For every value of duplicated_col that appears more than once in matching_lookup, keep only the row whose
        candidate_col address is closest to the reference_col address and drop the competing rows.

        :param duplicated_col: Column whose repeated values mark the duplicates ("UPRN" or "Title Number")
        :param competing_col: Column identifying the rows competing for each duplicated value
        :param reference_col: Address column that is the same for every competing row
        :param candidate_col: Address column that differs between the competing rows
        :return: matching_lookup without the weaker duplicate rows
        """
        is_duplicated = matching_lookup[duplicated_col].duplicated()
        duplicated_values = matching_lookup.loc[is_duplicated, duplicated_col].unique()

        to_drop = []
        for duplicated_value in duplicated_values:
            dupe_data = matching_lookup[matching_lookup[duplicated_col] == duplicated_value]
            matched_addresses = dupe_data.merge(
                properties[["UPRN", "ADDRESS"]].rename(columns={"ADDRESS": "epc_address"}),
                how="left", on="UPRN"
            ).merge(
                company_ownership[["Title Number", "Property Address"]],
                how="left", on="Title Number"
            )
            if matched_addresses.empty:
                # Happens when the duplicated value is NaN, which never compares equal to itself
                continue

            # We perform levenstein to get the best match
            best_match = cls.levenstein_match(
                matching_string=matched_addresses[reference_col].values[0],
                df=matched_addresses,
                address_col=candidate_col
            )
            matches_to_drop = matched_addresses[
                ~matched_addresses[competing_col].isin(best_match[competing_col].values)
            ]
            to_drop.append(matches_to_drop[["UPRN", "Title Number"]])

        if not to_drop:
            return matching_lookup

        # De-duplicate so the anti-join below cannot multiply rows of matching_lookup
        to_drop = pd.concat(to_drop).drop_duplicates()
        if to_drop.empty:
            return matching_lookup

        merged = pd.merge(matching_lookup, to_drop, on=['UPRN', 'Title Number'], how='left', indicator=True)
        return merged[merged['_merge'] == 'left_only'].drop(columns=['_merge'])

    @classmethod
    def remove_duplicate_matches(cls, matching_lookup, properties, company_ownership):
        """
        Resolve titles that were matched to more than one UPRN, keeping the UPRN whose EPC address is closest to
        the title's property address.

        :param matching_lookup: DataFrame with at least "UPRN" and "Title Number" columns
        :param properties: EPC data with "UPRN" and "ADDRESS" columns
        :param company_ownership: Ownership data with "Title Number" and "Property Address" columns
        :return: matching_lookup with the weaker duplicate rows removed
        """
        return cls._drop_weaker_duplicates(
            matching_lookup, properties, company_ownership,
            duplicated_col="Title Number", competing_col="UPRN",
            reference_col="Property Address", candidate_col="epc_address"
        )

    @classmethod
    def remove_duplicate_uprn_matches(cls, matching_lookup, properties, company_ownership):
        """
        Resolve UPRNs that were matched to more than one title, keeping the title whose property address is
        closest to the UPRN's EPC address.

        :param matching_lookup: DataFrame with at least "UPRN" and "Title Number" columns
        :param properties: EPC data with "UPRN" and "ADDRESS" columns
        :param company_ownership: Ownership data with "Title Number" and "Property Address" columns
        :return: matching_lookup with the weaker duplicate rows removed
        """
        # For a single UPRN the EPC address is the same on every row, so it is the reference and the differing
        # property addresses are the candidates being scored
        return cls._drop_weaker_duplicates(
            matching_lookup, properties, company_ownership,
            duplicated_col="UPRN", competing_col="Title Number",
            reference_col="epc_address", candidate_col="Property Address"
        )

    @staticmethod
    def _record_tenure_match(candidates, uprn, match_type, single_matches, shared_matches):
        """ File the candidate titles of one tenure under either the shared or the single-match collection. """
        if candidates.shape[0] > 1:
            # Several titles of the same tenure share this address
            matched = candidates[["Title Number"]].copy()
            matched.insert(0, "UPRN", uprn)
            shared_matches.append(matched)
        elif not candidates.empty:
            single_matches.append(
                {
                    "UPRN": uprn,
                    "Title Number": candidates["Title Number"].values[0],
                    "match_type": match_type,
                }
            )

    def match(self):
        """
        Match every EPC property to ownership titles in the same postcode by house number.

        Populates ``freehold_matching_lookup``, ``leasehold_matching_lookup`` (exact matches only),
        ``shared_freehold_match``, ``shared_leasehold_match`` (lists of DataFrames), ``combined_matching_lookup``
        and ``matched_addresses``.

        :return: None
        :raises ValueError: If epc_data or ownership_data has not been populated
        """
        if (self.epc_data is None) or (self.ownership_data is None):
            raise ValueError("epc_data and ownership_data should not be null")

        logger.info("Matching EPC data to ownership data")
        freehold_matching_lookup = []
        leasehold_matching_lookup = []
        shared_leasehold_match = []
        shared_freehold_match = []

        # Loop invariant: lower-case the ownership postcodes once rather than once per EPC record
        ownership_postcodes = self.ownership_data["Postcode"].str.lower()

        for _, address in tqdm(self.epc_data.iterrows(), total=len(self.epc_data)):
            match_type = "exact"

            house_no = SearchEpc.get_house_number(address["ADDRESS1"])
            if house_no is None:
                # It's hard for us to get a reliable match
                continue
            # Lower-cased because the ownership house numbers below are lower-cased too
            house_no = house_no.replace(",", "").lower()

            postcode = address["POSTCODE"]
            if not isinstance(postcode, str):
                continue
            filtered = self.ownership_data[ownership_postcodes == postcode.lower()].copy()
            if filtered.empty:
                continue

            # Remove bracketed text and commas before comparing house numbers
            filtered["house_number"] = (
                filtered["Property Address"]
                .apply(self.remove_text_in_brackets)
                .apply(SearchEpc.get_house_number)
                .str.lower()
                .str.replace(",", "")
            )

            # NOTE(review): the original indentation of this section was lost; it is reconstructed so that
            # range matches are kept rather than removed by the house-number equality filter - confirm
            if house_no in filtered["house_number"].values:
                filtered = filtered[filtered["house_number"] == house_no]
            else:
                # If this happens, we check house_number for a x-y range of addresses
                filtered["house_number_range"] = filtered["house_number"].apply(
                    self.extract_range_from_house_number
                )
                filtered['is_in_range'] = filtered['house_number_range'].apply(
                    lambda x: self.is_in_range(x, house_no)
                )
                if filtered['is_in_range'].any():
                    # If house_no is found in any range, keep only rows where it is in range
                    filtered = filtered[filtered['is_in_range']]
                else:
                    # If house_no is not found in any range, filter out rows where 'house_number_range' is not
                    # None
                    filtered = filtered[filtered['house_number_range'].isnull()].copy()
                    # Strip out letters from house_no and house_number
                    house_no = self.extract_numeric_part(house_no)
                    if not house_no:
                        # Without any digits, every digit-free ownership row would match
                        continue
                    filtered["house_number"] = (
                        filtered["house_number"].astype(str).apply(self.extract_numeric_part)
                    )
                    match_type = "approximate"
                    filtered = filtered[filtered["house_number"] == house_no]

            if filtered.empty:
                continue

            self._record_tenure_match(
                filtered[filtered["Tenure"] == "Freehold"], address["UPRN"], match_type,
                freehold_matching_lookup, shared_freehold_match
            )
            self._record_tenure_match(
                filtered[filtered["Tenure"] == "Leasehold"], address["UPRN"], match_type,
                leasehold_matching_lookup, shared_leasehold_match
            )

        # Explicit columns keep the "match_type" filter below valid when nothing was matched
        freehold_lookup = pd.DataFrame(freehold_matching_lookup, columns=self._LOOKUP_COLUMNS)
        leasehold_lookup = pd.DataFrame(leasehold_matching_lookup, columns=self._LOOKUP_COLUMNS)
        self.freehold_matching_lookup = freehold_lookup[freehold_lookup["match_type"] == "exact"]
        self.leasehold_matching_lookup = leasehold_lookup[leasehold_lookup["match_type"] == "exact"]
        self.shared_leasehold_match = shared_leasehold_match
        self.shared_freehold_match = shared_freehold_match

        # finally, we create matched addresses
        combined_matching_lookup = pd.concat([self.freehold_matching_lookup, self.leasehold_matching_lookup])
        # Remove duplicates
        combined_matching_lookup = self.remove_duplicate_matches(
            matching_lookup=combined_matching_lookup,
            properties=self.epc_data,
            company_ownership=self.ownership_data
        )
        # We also have duplicates at a UPRN level
        self.combined_matching_lookup = self.remove_duplicate_uprn_matches(
            matching_lookup=combined_matching_lookup,
            properties=self.epc_data,
            company_ownership=self.ownership_data
        )

        epc_columns = self.epc_data[
            [
                "UPRN",
                "ADDRESS",
                "ADDRESS1",
                "CURRENT_ENERGY_EFFICIENCY",
                "CURRENT_ENERGY_RATING",
                "POSTCODE",
                "LODGEMENT_DATE",
                "TRANSACTION_TYPE"
            ]
        ].rename(
            columns={
                "ADDRESS": "epc_address",
                "ADDRESS1": "epc_address1",
                "POSTCODE": "epc_postcode"
            }
        )
        ownership_columns = self.ownership_data[
            [
                "Title Number",
                "Property Address",
                "Postcode",
                "Company Registration No. (1)",
                "Proprietor Name (1)",
                "Date Proprietor Added",
            ]
        ]
        # Built from the fully de-duplicated lookup so each UPRN and title appears once
        self.matched_addresses = self.combined_matching_lookup.merge(
            epc_columns, how="left", on="UPRN"
        ).merge(
            ownership_columns, how="left", on="Title Number"
        )

        # Let's try and get the house number
        self.matched_addresses["house_number"] = (
            self.matched_addresses["epc_address"]
            .apply(self.remove_text_in_brackets)
            .apply(SearchEpc.get_house_number)
            .str.lower()
            .str.replace(",", "")
        )

10
etl/ownership/README.md Normal file
View file

@ -0,0 +1,10 @@
# Ownership Application
This application contains methods that allow us to attempt to discover
corporate ownership of properties, where possible.
Practically, it's likely that the code within this application will be
exported into other areas of this repository, and used to assemble
pipelines that solve specific property ownership questions, and so this
codebase is set up with the goal of providing fairly easy-to-use,
plug-and-play tools.