"""
Ownership Application (etl/ownership/Ownership.py).

This application contains methods that allow us to attempt to discover
corporate ownership of properties, where possible.

Practically, it's likely that the code within this application will be
exported into other areas of this repository, and used to assemble
pipelines that solve specific property ownership questions, and so this
codebase is set up with the goal of providing fairly easy to use, plug
and play tools.
"""
from datetime import datetime
from typing import List, Optional
from tqdm import tqdm
import pandas as pd
import Levenshtein
import re
from utils.s3 import save_excel_to_s3
from utils.logger import setup_logger
from backend.SearchEpc import SearchEpc

logger = setup_logger()


class Ownership:
    """
    Matches EPC records (keyed on UPRN) to corporate ownership titles (keyed on "Title Number").

    Intended call order, as implied by the data each step reads:
    ``source_epc_properties`` -> ``load_company_ownership`` -> ``prepare_for_matching`` -> ``match``.
    """

    # These are a number of prefix phrases, found in the ownership data. If an address begins with any of these
    # terms, the record does not describe the property itself and we remove it
    OWNERSHIP_STARTING_TERMS = [
        "land adjoining", "land on the", "land to the rear of", "land and buildings on the",
        "garage adjoining", "car park adjoining", "the land adjoining", "land and buildings adjoining",
        "all royal mines"
    ]

    # Filter applied by source_epc_properties when the caller supplies none.
    # NOTE(review): mirrors the G & F rating filter that was hard-coded in the original template - confirm.
    _DEFAULT_EPC_FILTERS = {"CURRENT_ENERGY_RATING": ["G", "F"]}

    # Columns of the UPRN -> title lookups built by match(). Declared up front so that an empty lookup still
    # has the columns the later filtering and merging steps index on.
    _LOOKUP_COLUMNS = ["UPRN", "Title Number", "match_type"]

    def __init__(
        self, epc_paths: List[str], domestic_ownership_path: str, overseas_ownership_path: str
    ):
        """
        :param epc_paths: A list of strings, which points to the location of the EPC data to be used. To date, this
            data has been held locally, and so will require extension to read from remote locations like s3
        :param domestic_ownership_path: A string which points to the location of the CCOD ownership data, that details
            corporate ownership of properties in the UK, where the companies are UK based
        :param overseas_ownership_path: A string which points to the location of the OCOD ownership data, that details
            corporate ownership of properties in the UK, where the companies are overseas
        :raises ValueError: if epc_paths is empty, or any path does not end with certificates.csv
        """
        if not epc_paths:
            raise ValueError("epc_paths must contain at least one path")

        # All epc paths should end with certificates.csv (every path is checked, not just one of them)
        if not all(path.endswith("certificates.csv") for path in epc_paths):
            raise ValueError("epc_paths contains a path that does not end with certificates.csv")

        self.epc_paths = epc_paths
        self.domestic_ownership_path = domestic_ownership_path
        self.overseas_ownership_path = overseas_ownership_path

        self.run_timestamp = str(datetime.now())

        # Data
        self.epc_data = None
        self.ownership_data = None
        self.freehold_matching_lookup = None
        self.leasehold_matching_lookup = None

        self.shared_freehold_match = None
        self.shared_leasehold_match = None

        self.combined_matching_lookup = None
        self.matched_addresses = None

    def source_epc_properties(self, column_filters=None):
        """
        Reads every EPC file in ``self.epc_paths``, keeps the newest certificate per UPRN, applies the column
        filters and stores the concatenated result on ``self.epc_data``.

        :param column_filters: Optional dict of ``{column name: allowed value(s)}``. A row is kept only when, for
            every column listed, its value is one of the allowed values. A single allowed value may be passed
            without wrapping it in a list. When omitted, only properties with a CURRENT_ENERGY_RATING of G or F
            are kept.
        :return: None - the result is stored on ``self.epc_data``
        :raises ValueError: if LODGEMENT_DATETIME contains values that cannot be parsed as datetimes
        """
        if column_filters is None:
            column_filters = self._DEFAULT_EPC_FILTERS

        # TODO: Do the tenure filtering here! Candidate values seen so far:
        # ["rental (private)", "Rented (private)", "owner-occupied", "Owner-occupied"]

        data = []
        for path in tqdm(self.epc_paths):
            epc_data = pd.read_csv(path, low_memory=False)

            # Records without a UPRN cannot be de-duplicated or matched. Copy so that the column assignments
            # below do not write into a slice of the frame that was read in
            epc_data = epc_data[epc_data["UPRN"].notnull()].copy()
            epc_data["UPRN"] = epc_data["UPRN"].astype(int).astype(str)

            lodgement_datetime = pd.to_datetime(epc_data["LODGEMENT_DATETIME"], errors="coerce")
            if lodgement_datetime.isnull().any():
                raise ValueError(
                    f"LODGEMENT_DATETIME in {path} contains values that cannot be parsed as datetimes"
                )
            epc_data["LODGEMENT_DATETIME"] = lodgement_datetime

            # Get the newest EPC for each UPRN. We use LODGEMENT_DATE (then LODGEMENT_DATETIME) as a proxy for this
            epc_data = epc_data.sort_values(
                ["LODGEMENT_DATE", "LODGEMENT_DATETIME"], ascending=False
            ).drop_duplicates("UPRN")

            # Filters are applied after de-duplication, so they test the newest certificate for each UPRN
            for column, allowed_values in column_filters.items():
                if isinstance(allowed_values, str) or not hasattr(allowed_values, "__iter__"):
                    allowed_values = [allowed_values]
                epc_data = epc_data[epc_data[column].isin(list(allowed_values))]

            data.append(epc_data)

        self.epc_data = pd.concat(data)

        # TODO: Save the filtered data as an excel file on s3 with save_excel_to_s3. The arguments that function
        # expects are not known here, so the export is skipped rather than called without arguments.
        logger.info("Skipping export of filtered EPC data to S3 - not implemented yet")

    def load_company_ownership(self):
        """
        Reads in the domestic and overseas company ownership data, flags each record with ``is_overseas`` and
        stores the records whose postcode appears in ``self.epc_data`` on ``self.ownership_data``.

        :return: None - the result is stored on ``self.ownership_data``
        :raises ValueError: if the EPC data has not been loaded yet
        """
        if self.epc_data is None:
            raise ValueError("epc_data should not be null - run source_epc_properties first")

        logger.info("Reading in company ownership data")
        domestic_ownership = pd.read_csv(self.domestic_ownership_path)
        domestic_ownership["is_overseas"] = False
        overseas_ownership = pd.read_csv(self.overseas_ownership_path)
        overseas_ownership["is_overseas"] = True

        # ignore_index avoids the two files sharing index labels after concatenation
        ownership_data = pd.concat([domestic_ownership, overseas_ownership], ignore_index=True)

        # Filter on relevant postcodes - this is done to reduce the large size of the ownership dataset
        logger.info("Filtering ownership data on EPC postcodes")
        epc_postcodes = self.epc_data["POSTCODE"].str.lower().unique()
        self.ownership_data = ownership_data[ownership_data["Postcode"].str.lower().isin(epc_postcodes)]

    def prepare_for_matching(self):
        """
        Given the epc properties and the ownership data, this function performs a number of operations on both
        datasets to prepare them for matching:

        - EPC records whose postcode does not appear in the ownership data are dropped
        - only the newest EPC record (by LODGEMENT_DATE) is kept per UPRN
        - ownership records whose address starts with one of OWNERSHIP_STARTING_TERMS are dropped

        :raises ValueError: if either dataset has not been loaded yet
        """
        if (self.epc_data is None) or (self.ownership_data is None):
            raise ValueError("epc_data and ownership_data should not be null")

        logger.info("Preparing data for matching")
        # Now we filter properties the other way around
        ownership_postcodes = self.ownership_data["Postcode"].str.lower().unique()
        self.epc_data = self.epc_data[self.epc_data["POSTCODE"].str.lower().isin(ownership_postcodes)]

        # We have some duplicates on UPRN (e.g. across EPC files) - take the newest record
        self.epc_data = self.epc_data.sort_values("LODGEMENT_DATE", ascending=False).drop_duplicates("UPRN")

        # Remove entries where the address begins with the term "land adjoining", or other records that don't
        # reference the property itself. Records with a missing address are kept, as they do not start with
        # any of the terms.
        starting_terms = tuple(self.OWNERSHIP_STARTING_TERMS)
        keep = [
            not (isinstance(property_address, str) and property_address.lower().startswith(starting_terms))
            for property_address in self.ownership_data["Property Address"]
        ]
        self.ownership_data = self.ownership_data[keep]

    @staticmethod
    def extract_numeric_part(house_number: str) -> str:
        """
        Extracts only the numeric part from a house number that may contain letters.

        Parameters:
        - house_number (str): The house number string possibly containing letters.

        Returns:
        - str: The digits of the house number, in order (an empty string when there are none).
        """
        # Use regular expression to replace all non-digit characters with nothing
        return re.sub(r'\D', '', house_number)

    @staticmethod
    def remove_text_in_brackets(address: str) -> str:
        """
        Removes any text within parentheses, including the parentheses themselves and any whitespace
        immediately before the opening parenthesis.

        Parameters:
        - address (str): The address string to clean.

        Returns:
        - str: The cleaned address with text in parentheses removed.
        """
        # Regex to find and remove content in parentheses
        return re.sub(r'\s*\([^)]*\)', '', address)

    @staticmethod
    def extract_range_from_house_number(house_number_range: str) -> Optional[List[str]]:
        """
        Detects if the house number includes a numeric range (formatted as 'x-y') and extracts all values within
        this range. Non-numeric strings containing hyphens are ignored.

        Parameters:
        - house_number_range (str): The house number string that might contain a range. Missing values
          (None / NaN) are accepted and treated as "not a range".

        Returns:
        - list of str: A list of all numbers within the range (both ends included) if it is a range;
          otherwise, returns None.
        """
        # Missing house numbers reach this function as None or NaN
        if not isinstance(house_number_range, str) or not house_number_range:
            return None

        parts = house_number_range.split('-')
        if len(parts) == 2 and parts[0].isdigit() and parts[1].isdigit():
            # Both parts are numeric, so it's a valid range
            start, end = map(int, parts)
            return [str(x) for x in range(start, end + 1)]

        # No hyphen present, or not a valid numeric range
        return None

    @staticmethod
    def is_in_range(row, house_no):
        """
        Check if the house number is within the range provided in the row.

        :param row: The list produced by extract_range_from_house_number, or None / NaN when there is no range
        :param house_no: The house number to look for
        :return: True if house_no is one of the values in row, False otherwise
        """
        if not isinstance(row, (list, tuple, set)):
            return False
        return house_no in row

    @staticmethod
    def levenstein_match(matching_string, df, address_col):
        """
        Returns the single row of ``df`` whose ``address_col`` value is closest to ``matching_string`` by
        Levenshtein distance. Ties are resolved in favour of the earliest row.

        Both sides are normalised in the same way before comparison: punctuation and spaces are stripped and
        the text is lower-cased. Missing values are compared as an empty string.

        :param matching_string: The address to match against
        :param df: DataFrame holding the candidate addresses
        :param address_col: Name of the column in ``df`` that holds the candidate addresses
        :return: A one-row DataFrame holding the best match (``df`` itself when it is empty)
        """
        if df.empty:
            return df

        def normalise(value):
            if not isinstance(value, str):
                return ""
            # Strip out punctuation and spaces
            return re.sub(r'[^\w\s]', '', value).replace(" ", "").lower()

        matching_key = normalise(matching_string)
        distances = [Levenshtein.distance(matching_key, normalise(x)) for x in df[address_col].tolist()]
        best_match_index = distances.index(min(distances))
        # We might want to consider a threshold for the distance, however we don't consider this for the moment
        return df.iloc[best_match_index:best_match_index + 1]

    @classmethod
    def _drop_weaker_duplicates(
        cls, matching_lookup, properties, company_ownership, duplicated_col, query_col, candidate_col, keep_col
    ):
        """
        For every value of ``duplicated_col`` that appears more than once in the lookup, keeps only the
        (UPRN, Title Number) pairs belonging to the best address match and drops the rest.

        :param matching_lookup: DataFrame with at least "UPRN" and "Title Number" columns
        :param properties: EPC data, read for its "UPRN" and "ADDRESS" columns
        :param company_ownership: Ownership data, read for its "Title Number" and "Property Address" columns
        :param duplicated_col: The lookup column that should end up unique
        :param query_col: Address column that is constant within a duplicate group (the string to match)
        :param candidate_col: Address column that varies within a duplicate group (the candidates)
        :param keep_col: Lookup column identifying the candidate that wins
        :return: The lookup without the losing pairs
        """
        if matching_lookup.empty:
            return matching_lookup

        duplicated_keys = matching_lookup.loc[
            matching_lookup[duplicated_col].duplicated(), duplicated_col
        ].unique()

        epc_addresses = properties[["UPRN", "ADDRESS"]].rename(columns={"ADDRESS": "epc_address"})
        title_addresses = company_ownership[["Title Number", "Property Address"]]

        to_drop = []
        for duplicated_key in duplicated_keys:
            candidates = (
                matching_lookup[matching_lookup[duplicated_col] == duplicated_key]
                .merge(epc_addresses, how="left", on="UPRN")
                .merge(title_addresses, how="left", on="Title Number")
            )
            # We perform levenstein to get the best match
            best_match = cls.levenstein_match(
                matching_string=candidates[query_col].values[0],
                df=candidates,
                address_col=candidate_col
            )
            matches_to_drop = candidates[~candidates[keep_col].isin(best_match[keep_col].values)]
            to_drop.append(matches_to_drop[["UPRN", "Title Number"]])

        if not to_drop:
            return matching_lookup

        to_drop = pd.concat(to_drop).drop_duplicates()
        if to_drop.empty:
            return matching_lookup

        # Anti-join: keep the lookup rows that have no counterpart in to_drop
        merged = pd.merge(matching_lookup, to_drop, on=['UPRN', 'Title Number'], how='left', indicator=True)
        return merged[merged['_merge'] == 'left_only'].drop(columns=['_merge'])

    @classmethod
    def remove_duplicate_matches(cls, matching_lookup, properties, company_ownership):
        """
        Where one title has been matched to several UPRNs, keeps the UPRN whose EPC address is closest to the
        title's property address.

        :param matching_lookup: DataFrame with at least "UPRN" and "Title Number" columns
        :param properties: EPC data, read for its "UPRN" and "ADDRESS" columns
        :param company_ownership: Ownership data, read for its "Title Number" and "Property Address" columns
        :return: The lookup with at most one UPRN per title
        """
        return cls._drop_weaker_duplicates(
            matching_lookup, properties, company_ownership,
            duplicated_col="Title Number",
            query_col="Property Address",
            candidate_col="epc_address",
            keep_col="UPRN",
        )

    @classmethod
    def remove_duplicate_uprn_matches(cls, matching_lookup, properties, company_ownership):
        """
        Where one UPRN has been matched to several titles, keeps the title whose property address is closest
        to the UPRN's EPC address.

        :param matching_lookup: DataFrame with at least "UPRN" and "Title Number" columns
        :param properties: EPC data, read for its "UPRN" and "ADDRESS" columns
        :param company_ownership: Ownership data, read for its "Title Number" and "Property Address" columns
        :return: The lookup with at most one title per UPRN
        """
        # Within a duplicated UPRN the EPC address is the same on every row, so it is the string to match and
        # the title addresses are the candidates
        return cls._drop_weaker_duplicates(
            matching_lookup, properties, company_ownership,
            duplicated_col="UPRN",
            query_col="epc_address",
            candidate_col="Property Address",
            keep_col="Title Number",
        )

    @classmethod
    def _house_number_from_address(cls, address):
        """
        Returns the house number of a full address with bracketed text ignored, lower-cased and without
        commas, or None when no house number is found or the address is missing.
        """
        if not isinstance(address, str):
            return None
        # presumably get_house_number returns a string or None - verify against SearchEpc
        house_number = SearchEpc.get_house_number(cls.remove_text_in_brackets(address))
        if not isinstance(house_number, str):
            return None
        return house_number.lower().replace(",", "")

    @staticmethod
    def _house_number_from_epc_line(address_line):
        """
        Returns the house number of the first EPC address line, normalised the same way as the ownership
        house numbers (lower-cased, commas removed), or None when it cannot be determined.
        """
        if not isinstance(address_line, str):
            return None
        house_number = SearchEpc.get_house_number(address_line)
        if not isinstance(house_number, str):
            return None
        return house_number.replace(",", "").lower()

    def match(self):
        """
        Matches every EPC record to ownership titles that share its postcode and house number.

        Results are stored on the instance:

        - ``freehold_matching_lookup`` / ``leasehold_matching_lookup``: exact one-to-one matches per tenure
        - ``shared_freehold_match`` / ``shared_leasehold_match``: lists of DataFrames (UPRN, Title Number),
          one per EPC record that matched more than one title of that tenure
        - ``combined_matching_lookup``: both lookups combined, de-duplicated on title and on UPRN
        - ``matched_addresses``: the combined lookup joined to the EPC and ownership details

        :raises ValueError: if either dataset has not been loaded yet
        """
        if (self.epc_data is None) or (self.ownership_data is None):
            raise ValueError("epc_data and ownership_data should not be null")

        logger.info("Matching EPC data to ownership data")

        # The postcode key and house number of an ownership record do not depend on the EPC record being
        # matched, so they are computed once and the records are grouped by postcode up front
        ownership = self.ownership_data.copy()
        ownership["_postcode_key"] = ownership["Postcode"].str.lower()
        ownership["house_number"] = ownership["Property Address"].apply(self._house_number_from_address)
        ownership_by_postcode = {
            postcode: group for postcode, group in ownership.groupby("_postcode_key")
        }

        freehold_matching_lookup = []
        leasehold_matching_lookup = []
        shared_leasehold_match = []
        shared_freehold_match = []
        for _, address in tqdm(self.epc_data.iterrows(), total=len(self.epc_data)):
            postcode = address["POSTCODE"]
            if not isinstance(postcode, str):
                continue

            candidates = ownership_by_postcode.get(postcode.lower())
            if candidates is None:
                continue

            house_no = self._house_number_from_epc_line(address["ADDRESS1"])
            if house_no is None:
                # It's hard for us to get a reliable match without a house number
                continue

            filtered = candidates.copy()
            match_type = "exact"

            if house_no in filtered["house_number"].values:
                filtered = filtered[filtered["house_number"] == house_no]
            else:
                match_type = "approximate"
                # If this happens, we check house_number for a x-y range of addresses
                filtered["house_number_range"] = filtered["house_number"].apply(
                    self.extract_range_from_house_number
                )
                filtered["is_in_range"] = filtered["house_number_range"].apply(
                    lambda x: self.is_in_range(x, house_no)
                )

                if filtered["is_in_range"].any():
                    # If house_no is found in any range, keep only rows where it is in range. These rows are
                    # not compared on the numeric part: the digits of "10-14" would never equal "12"
                    filtered = filtered[filtered["is_in_range"]]
                else:
                    # If house_no is not found in any range, drop the rows that hold a range, then strip out
                    # letters from both house numbers and compare what is left
                    filtered = filtered[filtered["house_number_range"].isnull()]
                    numeric_house_no = self.extract_numeric_part(house_no)
                    if not numeric_house_no:
                        # Nothing left to compare on; an empty string would match every unnumbered title
                        continue
                    numeric_house_numbers = (
                        filtered["house_number"].astype(str).apply(self.extract_numeric_part)
                    )
                    filtered = filtered[numeric_house_numbers == numeric_house_no]

            if filtered.empty:
                continue

            filtered_freehold = filtered[filtered["Tenure"] == "Freehold"]
            filtered_leasehold = filtered[filtered["Tenure"] == "Leasehold"]

            if filtered_freehold.shape[0] > 1:
                matched = filtered_freehold[["Title Number"]].copy()
                matched.insert(0, "UPRN", address["UPRN"])
                shared_freehold_match.append(matched)
            elif not filtered_freehold.empty:
                freehold_matching_lookup.append(
                    {
                        "UPRN": address["UPRN"],
                        "Title Number": filtered_freehold["Title Number"].values[0],
                        "match_type": match_type,
                    }
                )

            if filtered_leasehold.shape[0] > 1:
                matched = filtered_leasehold[["Title Number"]].copy()
                matched.insert(0, "UPRN", address["UPRN"])
                shared_leasehold_match.append(matched)
            elif not filtered_leasehold.empty:
                leasehold_matching_lookup.append(
                    {
                        "UPRN": address["UPRN"],
                        "Title Number": filtered_leasehold["Title Number"].values[0],
                        "match_type": match_type,
                    }
                )

        freehold_lookup = pd.DataFrame(freehold_matching_lookup, columns=self._LOOKUP_COLUMNS)
        leasehold_lookup = pd.DataFrame(leasehold_matching_lookup, columns=self._LOOKUP_COLUMNS)

        # Only exact matches are carried forward
        self.freehold_matching_lookup = freehold_lookup[freehold_lookup["match_type"] == "exact"]
        self.leasehold_matching_lookup = leasehold_lookup[leasehold_lookup["match_type"] == "exact"]

        self.shared_leasehold_match = shared_leasehold_match
        self.shared_freehold_match = shared_freehold_match

        # finally, we create matched addresses
        combined_matching_lookup = pd.concat([self.freehold_matching_lookup, self.leasehold_matching_lookup])

        # Remove duplicates
        combined_matching_lookup = self.remove_duplicate_matches(
            matching_lookup=combined_matching_lookup,
            properties=self.epc_data,
            company_ownership=self.ownership_data
        )
        # We also have duplicates at a UPRN level
        self.combined_matching_lookup = self.remove_duplicate_uprn_matches(
            matching_lookup=combined_matching_lookup,
            properties=self.epc_data,
            company_ownership=self.ownership_data
        )

        epc_details = self.epc_data[
            [
                "UPRN",
                "ADDRESS",
                "ADDRESS1",
                "CURRENT_ENERGY_EFFICIENCY",
                "CURRENT_ENERGY_RATING",
                "POSTCODE",
                "LODGEMENT_DATE",
                "TRANSACTION_TYPE"
            ]
        ].rename(
            columns={
                "ADDRESS": "epc_address",
                "ADDRESS1": "epc_address1",
                "POSTCODE": "epc_postcode"
            }
        )
        ownership_details = self.ownership_data[
            [
                "Title Number",
                "Property Address",
                "Postcode",
                "Company Registration No. (1)",
                "Proprietor Name (1)",
                "Date Proprietor Added",
            ]
        ]

        # Built from the fully de-duplicated lookup, so the UPRN-level clean-up is reflected here as well
        self.matched_addresses = self.combined_matching_lookup.merge(
            epc_details, how="left", on="UPRN"
        ).merge(
            ownership_details, how="left", on="Title Number"
        )

        # Let's try and get the house number
        self.matched_addresses["house_number"] = [
            self._house_number_from_address(epc_address)
            for epc_address in self.matched_addresses["epc_address"]
        ]