From 5cb35e1d9eb3beec22d772293208fef09c18fbba Mon Sep 17 00:00:00 2001
From: Khalim Conn-Kowlessar
Date: Thu, 2 May 2024 18:33:25 +0100
Subject: [PATCH] working on property ownership pipeline

---
 backend/SearchEpc.py                        |  13 +-
 etl/customers/goldman/property_ownership.py | 398 ++++++++++++++++--
 etl/customers/vander_elliot/__init__.py     |   0
 .../vander_elliot/single_property_pilot.py  |  56 +++
 recommendations/HeatingRecommender.py       |  14 +-
 recommendations/Recommendations.py          |   2 +-
 recommendations/SolarPvRecommendations.py   |   2 +-
 7 files changed, 447 insertions(+), 38 deletions(-)
 create mode 100644 etl/customers/vander_elliot/__init__.py
 create mode 100644 etl/customers/vander_elliot/single_property_pilot.py

diff --git a/backend/SearchEpc.py b/backend/SearchEpc.py
index 06eea258..db9ec4ff 100644
--- a/backend/SearchEpc.py
+++ b/backend/SearchEpc.py
@@ -196,6 +196,13 @@ class SearchEpc:
         This method uses the usaddress library to parse an address and extract the primary house or flat number.
         """
         try:
+
+            # Custom regex to catch a broad range of cases
+            pattern = r'(?i)(?:flat|apartment)\s*(\d+)|^\s*(\d+)'
+            match = re.search(pattern, address)
+            if match:
+                return next(g for g in match.groups() if g is not None)
+
             parsed = usaddress.parse(address)
             # First, try to get the 'OccupancyIdentifier' if 'OccupancyType' is detected
             for part, type_ in parsed:
@@ -208,12 +215,6 @@ class SearchEpc:
             if address_number:
                 return address_number.replace(",", "")  # Remove any trailing commas
 
-            # Further fallback to custom regex (in case usaddress completely fails)
-            pattern = r'(?i)(?:flat|apartment)\s*(\d+)|^\s*(\d+)'
-            match = re.search(pattern, address)
-            if match:
-                return next(g for g in match.groups() if g is not None)
-
         except Exception as e:
             print(f"Error parsing address: {e}")
 
diff --git a/etl/customers/goldman/property_ownership.py b/etl/customers/goldman/property_ownership.py
index 4a6faede..abc2645d 100644
--- a/etl/customers/goldman/property_ownership.py
+++ b/etl/customers/goldman/property_ownership.py
@@ -1,27 +1,277 @@
+import re
 import pandas as pd
 from tqdm import tqdm
+import Levenshtein
 from backend.SearchEpc import SearchEpc
 
+# Average value of a property in the midlands in 2024 was £238,000. Since these are EPC F & G properties, we assume
+# ~£213,000 since they trade at a discount. This is based on the rightmove study where moving from an EPC F/G -> C has a
+# +15% impact on valuation and D -> C has a +3% impact on valuation.
+# The mode EPC rating is D, so we associate the £238k valuation with an EPC D property
+# Therefore value_of_F * 1.15 = value_of_D * 1.03
+# Therefore value_of_F = value_of_D * 1.03/1.15 = 238k * (1.03/1.15) = 213165
+PROPERTY_VALUE_ESTIMATE = 213_165
 
-def aggregate_matches(matching_lookup, company_ownership):
-    df = matching_lookup.merge(company_ownership, how="left", on="Title Number")
+
+def aggregate_matches(matching_lookup, company_ownership, properties):
+    """
+    Counts matched properties per owning company, split by local authority.
+
+    Parameters:
+    - matching_lookup (DataFrame): one row per match, with "UPRN" and "Title Number".
+    - company_ownership (DataFrame): ownership records, joined on "Title Number".
+    - properties (DataFrame): EPC records, supplying "LOCAL_AUTHORITY_LABEL" per "UPRN".
+
+    Returns:
+    - DataFrame: one row per company (largest owner first) with a column per local authority,
+      plus "total_number_of_properties", "approx_value" and "cumulative_value".
+    """
+    df = matching_lookup.merge(
+        company_ownership, how="left", on="Title Number"
+    ).merge(
+        properties[["UPRN", "LOCAL_AUTHORITY_LABEL"]], how="left", on="UPRN"
+    )
 
     counts = (
-        df.groupby(["Company Registration No. (1)", "Proprietor Name (1)"])["UPRN"]
+        df.groupby(["Company Registration No. (1)", "Proprietor Name (1)", "LOCAL_AUTHORITY_LABEL"])["UPRN"]
         .count()
         .reset_index(name="number_of_properties")
     )
     counts = counts.sort_values("number_of_properties", ascending=False)
 
-    return counts
+    pivot_counts = counts.pivot_table(
+        index=["Company Registration No. (1)", "Proprietor Name (1)"],  # Rows: companies and proprietors
+        columns="LOCAL_AUTHORITY_LABEL",  # Columns: each local authority
+        values="number_of_properties",  # The counts of properties
+        fill_value=0  # Fill missing values with 0 (where there are no properties owned)
+    ).reset_index()
+
+    total_counts = (
+        df.groupby(["Company Registration No. (1)", "Proprietor Name (1)"])["UPRN"]
+        .count()
+        .reset_index(name="total_number_of_properties")
+    )
+
+    pivot_counts = pivot_counts.merge(
+        total_counts, how="left", on=["Company Registration No. (1)", "Proprietor Name (1)"]
+    )
+
+    pivot_counts = pivot_counts.sort_values("total_number_of_properties", ascending=False)
+
+    pivot_counts["approx_value"] = PROPERTY_VALUE_ESTIMATE * pivot_counts["total_number_of_properties"]
+    pivot_counts["cumulative_value"] = pivot_counts["approx_value"].cumsum()
+
+    return pivot_counts
+
+
+def find_f_g_properties(paths):
+    """
+    For each EPC certificates CSV in paths, keeps the newest certificate per UPRN and then only the F and G
+    rated ones. The combined result is written to "EPC F & G Properties.xlsx".
+    """
+    data = []
+    for path in tqdm(paths):
+        epc_data = pd.read_csv(path, low_memory=False)
+
+        epc_data = epc_data[~pd.isnull(epc_data["UPRN"])]
+        epc_data["UPRN"] = epc_data["UPRN"].astype(int).astype(str)
+
+        # Get the newest EPC for each UPRN. We use LODGEMENT_DATETIME as a proxy for this
+        epc_data["LODGEMENT_DATETIME"] = pd.to_datetime(epc_data["LODGEMENT_DATETIME"], format='mixed')
+
+        epc_data = epc_data.sort_values("LODGEMENT_DATETIME", ascending=False).drop_duplicates("UPRN")
+
+        # Get G & F properties
+        epc_data = epc_data[epc_data["CURRENT_ENERGY_RATING"].isin(["G", "F"])]
+        data.append(epc_data)
+
+    data = pd.concat(data)
+
+    # Save as an excel
+    data.to_excel("EPC F & G Properties.xlsx", index=False)
+
+
+def remove_text_in_brackets(address: str) -> str:
+    """
+    Removes any text within parentheses, including the parentheses themselves.
+
+    Parameters:
+    - address (str): The address string to clean.
+
+    Returns:
+    - str: The cleaned address with text in parentheses removed.
+    """
+    # Regex to find and remove content in parentheses
+    cleaned_address = re.sub(r'\s*\([^)]*\)', '', address)
+    return cleaned_address
+
+
+def extract_numeric_part(house_number: str) -> str:
+    """
+    Extracts only the numeric part from a house number that may contain letters.
+
+    Parameters:
+    - house_number (str): The house number string possibly containing letters.
+
+    Returns:
+    - str: The numeric part of the house number.
+    """
+    # Use regular expression to replace all non-digit characters with nothing
+    numeric_part = re.sub(r'\D', '', house_number)
+    return numeric_part
+
+
+def levenstein_match(matching_string, df, address_col):
+    """
+    Returns the single row of df whose address_col value is closest to matching_string.
+
+    Both sides are compared with punctuation and spaces removed, using the Levenshtein distance.
+    Ties go to the first row.
+    """
+    # Strip out punctuation and spaces from both sides so they are compared like-for-like
+    matching_string = re.sub(r'[^\w\s]', '', matching_string).replace(" ", "")
+    match_to = [re.sub(r'[^\w\s]', '', x).replace(" ", "") for x in df[address_col].tolist()]
+
+    # Perform matching between full key and match_to
+    distances = [Levenshtein.distance(matching_string, s) for s in match_to]
+    best_match_index = distances.index(min(distances))
+    # We might want to consider a threshold for the distance, however
+    # we don't consider this for the moment
+    df = df.iloc[best_match_index:best_match_index + 1]
+
+    return df
+
+
+def extract_range_from_house_number(house_number_range: str):
+    """
+    Detects if the house number includes a numeric range (formatted as 'x-y') and extracts all values within this range.
+    Non-numeric strings containing hyphens are ignored.
+
+    Parameters:
+    - house_number_range (str): The house number string that might contain a range.
+
+    Returns:
+    - list of str: A list of all numbers within the range if it is a range; otherwise, returns None.
+    """
+
+    if not isinstance(house_number_range, str) or not house_number_range:
+        return None
+
+    if '-' in house_number_range:
+        parts = house_number_range.split('-')
+        if len(parts) == 2 and parts[0].isdigit() and parts[1].isdigit():
+            # Both parts are numeric, so it's a valid range
+            start, end = map(int, parts)  # Convert parts to integers
+            return [str(x) for x in range(start, end + 1)]
+        else:
+            # Not a valid numeric range
+            return None
+    else:
+        # No hyphen present or not a range
+        return None
+
+
+def is_in_range(row, house_no):
+    """ Check if the house number is within the range provided in the row. """
+    if row and any(house_no == num for num in row):
+        return True
+    return False
+
+
+def remove_duplicate_matches(matching_lookup, properties, company_ownership):
+    """
+    Resolves title numbers that were matched to more than one UPRN.
+
+    For each duplicated "Title Number", only the UPRN whose EPC address is closest to the title's
+    "Property Address" (see levenstein_match) is kept; the remaining matches are removed.
+    """
+    duplicated_titles = matching_lookup[matching_lookup["Title Number"].duplicated()]["Title Number"].unique()
+
+    to_drop = []
+    for dupe_title in duplicated_titles:
+        dupe_data = matching_lookup[matching_lookup["Title Number"] == dupe_title].copy()
+        matched_addresses = dupe_data.merge(
+            properties[["UPRN", "ADDRESS"]].rename(columns={"ADDRESS": "epc_address"}),
+            how="left", on="UPRN"
+        ).merge(
+            company_ownership[["Title Number", "Property Address"]],
+            how="left", on="Title Number"
+        )
+        # We perform levenstein to get the best match
+        best_match = levenstein_match(
+            matching_string=matched_addresses["Property Address"].values[0],
+            df=matched_addresses,
+            address_col="epc_address"
+        )
+        matches_to_drop = matched_addresses[
+            ~matched_addresses["UPRN"].isin(best_match["UPRN"].values)
+        ]
+
+        to_drop.append(
+            matches_to_drop[["UPRN", "Title Number"]].copy()
+        )
+
+    if not to_drop:
+        # No duplicated titles: pd.concat raises on an empty list
+        return matching_lookup
+
+    to_drop = pd.concat(to_drop)
+
+    # Anti-join: keep only the rows of matching_lookup that are not listed in to_drop
+    merged = pd.merge(matching_lookup, to_drop, on=['UPRN', 'Title Number'], how='left', indicator=True)
+    merged = merged[merged['_merge'] == 'left_only'].drop(columns=['_merge'])
+
+    return merged
 
 
 def app():
     """
     This script is for scoping property ownership for EPC F & G rated properties in Birmingam, for Goldman Sachs
     """
+    # paths = [
+    #     "local_data/all-domestic-certificates/domestic-E08000025-Birmingham/certificates.csv",
+    #     "local_data/all-domestic-certificates/domestic-E08000031-Wolverhampton/certificates.csv",
+    #     "local_data/all-domestic-certificates/domestic-E08000026-Coventry/certificates.csv",
+    #     "local_data/all-domestic-certificates/domestic-E06000016-Leicester/certificates.csv",
+    #     "local_data/all-domestic-certificates/domestic-E06000015-Derby/certificates.csv",
+    #     "local_data/all-domestic-certificates/domestic-E06000021-Stoke-on-Trent/certificates.csv",
+    #     "local_data/all-domestic-certificates/domestic-E06000018-Nottingham/certificates.csv",
+    #     "local_data/all-domestic-certificates/domestic-E07000154-Northampton/certificates.csv",
+    #     "local_data/all-domestic-certificates/domestic-E06000061-North-Northamptonshire/certificates.csv",
+    #     "local_data/all-domestic-certificates/domestic-E06000062-West-Northamptonshire/certificates.csv",
+    #     "local_data/all-domestic-certificates/domestic-E07000152-East-Northamptonshire/certificates.csv",
+    #     "local_data/all-domestic-certificates/domestic-E07000155-South-Northamptonshire/certificates.csv",
+    #     #
+    #     "local_data/all-domestic-certificates/domestic-E08000027-Dudley/certificates.csv",
+    #     "local_data/all-domestic-certificates/domestic-E08000029-Solihull/certificates.csv",
+    #     "local_data/all-domestic-certificates/domestic-E07000234-Bromsgrove/certificates.csv",
+    #     "local_data/all-domestic-certificates/domestic-E08000030-Walsall/certificates.csv",
+    #     "local_data/all-domestic-certificates/domestic-E08000028-Sandwell/certificates.csv",
+    #     "local_data/all-domestic-certificates/domestic-E06000019-Herefordshire-County-of/certificates.csv",
+    #     "local_data/all-domestic-certificates/domestic-E06000020-Telford-and-Wrekin/certificates.csv",
+    #     "local_data/all-domestic-certificates/domestic-E07000218-North-Warwickshire/certificates.csv",
+    #     "local_data/all-domestic-certificates/domestic-E07000222-Warwick/certificates.csv",
+    #     "local_data/all-domestic-certificates/domestic-E07000237-Worcester/certificates.csv",
+    #     # East midlands
+    #     "local_data/all-domestic-certificates/domestic-E07000035-Derbyshire-Dales/certificates.csv",
+    #     "local_data/all-domestic-certificates/domestic-E07000038-North-East-Derbyshire/certificates.csv",
+    #     "local_data/all-domestic-certificates/domestic-E07000039-South-Derbyshire/certificates.csv",
+    #     "local_data/all-domestic-certificates/domestic-E06000012-North-East-Lincolnshire/certificates.csv",
+    #     "local_data/all-domestic-certificates/domestic-E06000013-North-Lincolnshire/certificates.csv",
+    #     "local_data/all-domestic-certificates/domestic-E07000138-Lincoln/certificates.csv",
+    #     "local_data/all-domestic-certificates/domestic-E07000134-North-West-Leicestershire/certificates.csv",
+    #     "local_data/all-domestic-certificates/domestic-E06000017-Rutland/certificates.csv",
+    # ]
+    # paths = list(set(paths))
+    # find_f_g_properties(paths)
-    properties = pd.read_excel("Birmingham EPC F & G Properties.xlsx")
+    properties = pd.read_excel("EPC F & G Properties.xlsx")
     company_ownership = pd.read_csv("/Users/khalimconn-kowlessar/Downloads/CCOD_FULL_2024_04.csv")
+    company_ownership["is_overseas"] = False
+    overseas_company_ownership = pd.read_csv("/Users/khalimconn-kowlessar/Downloads/OCOD_FULL_2024_04 2.csv")
+    overseas_company_ownership["is_overseas"] = True
+
+    company_ownership = pd.concat([company_ownership, overseas_company_ownership])
+
     # FIlter on relevant postcodes
     company_ownership = company_ownership[
         company_ownership["Postcode"].str.lower().isin(properties["POSTCODE"].str.lower().unique())]
@@ -29,6 +279,10 @@ def app():
     # Now we filter properties the other way around
     properties = properties[properties["POSTCODE"].str.lower().isin(company_ownership["Postcode"].str.lower().unique())]
     # We end up with 7.4k entires on a postcode match, however we need to now do a direct address match
+    # Keep only privately rented and owner-occupied properties
+    properties = properties[
+        properties["TENURE"].isin(["rental (private)", "Rented (private)", "owner-occupied", "Owner-occupied"])
+    ]
 
     ignore_title_numbers = [
         "WM922695",  # Land at the back of 17 Plumstead Road, Birmingham (B44 0EA): relates to WM154788
@@ -36,22 +290,78 @@ def app():
         "WM44948",
     ]
     company_ownership = company_ownership[~company_ownership["Title Number"].isin(ignore_title_numbers)]
-    # Remove entries where the address begins with the term "land adjoining":
-    company_ownership = company_ownership[~company_ownership["Property Address"].str.startswith("land adjoining")]
+    # Remove entries where the address begins with the term "land adjoining", or other records that don't reference
+    # the property itself
+    starting_terms = [
+        "land adjoining", "land on the", "land to the rear of", "land and buildings on the",
+        "garage adjoining", "car park adjoining", "the land adjoining", "land and buildings adjoining",
+        "all royal mines"
+    ]
+    for starting_term in starting_terms:
+        company_ownership = company_ownership[
+            ~company_ownership["Property Address"].str.lower().str.startswith(starting_term, na=False)
+        ]
 
-    freehold_matching_lookup = []
-    leasehold_matching_lookup = []
+    biggest_ownership = (
+        company_ownership
+        .groupby(["Company Registration No. (1)", "Proprietor Name (1)"])["Title Number"]
+        .count()
+        .reset_index(name="n_owned_properties")
+    )
+    biggest_ownership = biggest_ownership.sort_values("n_owned_properties", ascending=False)
+
+    freehold_matching_lookup = []  # 634
+    leasehold_matching_lookup = []  # 86
     shared_leasehold_match = []
+    shared_freehold_match = []
 
     for _, address in tqdm(properties.iterrows(), total=len(properties)):
+        match_type = "exact"
         filtered = company_ownership[
             company_ownership["Postcode"].str.lower() == address["POSTCODE"].lower()
         ].copy()
 
-        filtered["house_number"] = filtered["Property Address"].apply(SearchEpc.get_house_number)
+        # Remove postcode and remove trailing commas
+        filtered["house_number"] = (
+            filtered["Property Address"]
+            .apply(remove_text_in_brackets)
+            .apply(SearchEpc.get_house_number)
+            .str.lower()
+            .str.replace(",", "")
+        )
         house_no = SearchEpc.get_house_number(address["ADDRESS1"])
+        if house_no is not None:
+            house_no = house_no.replace(",", "")
 
-        filtered = filtered[filtered["house_number"] == house_no]
+        if house_no is None:
+            # It's hard for us to get a reliable match
+            # filtered = filtered[filtered["Property Address"].str.contains(address["ADDRESS1"])]
+            # if filtered.shape[0] > 1:
+            #     raise Exception("No valid - maybe we should do levenstein?")
+            continue
+
+        else:
+
+            if house_no not in filtered["house_number"].values:
+                # If this happens, we check house_number for a x-y range of addresses
+                filtered["house_number_range"] = filtered["house_number"].apply(extract_range_from_house_number)
+                # If we have found a house number range, we check if the house number is in the range and if not,
+                # we drop the row
+                filtered['is_in_range'] = filtered['house_number_range'].apply(lambda x: is_in_range(x, house_no))
+
+                if filtered['is_in_range'].any():
+                    # If house_no is found in any range, keep only rows where it is in range
+                    filtered = filtered[filtered['is_in_range']]
+                else:
+                    # If house_no is not found in any range, filter out rows where 'house_number_range' is not None
+                    filtered = filtered[filtered['house_number_range'].isnull()]
+
+                # Strip out letters from house_no and house_number
+                house_no = extract_numeric_part(house_no)
+                filtered["house_number"] = filtered["house_number"].astype(str).apply(extract_numeric_part)
+                match_type = "approximate"
+
+            filtered = filtered[filtered["house_number"] == house_no]
 
         if filtered.empty:
             continue
@@ -60,7 +370,17 @@ def app():
         filtered_leasehold = filtered[filtered["Tenure"] == "Leasehold"]
 
         if filtered_freehold.shape[0] > 1:
-            raise ValueError("Multiple freehold matches")
+            matched = filtered_freehold[["Title Number"]].copy()
+            matched.insert(0, "UPRN", address["UPRN"])
+            shared_freehold_match.append(matched)
+        elif not filtered_freehold.empty:
+            freehold_matching_lookup.append(
+                {
+                    "UPRN": address["UPRN"],
+                    "Title Number": filtered_freehold["Title Number"].values[0],
+                    "match_type": match_type,
+                }
+            )
 
         if filtered_leasehold.shape[0] > 1:
             matched = filtered_leasehold[["Title Number"]].copy()
@@ -70,20 +390,52 @@ def app():
             leasehold_matching_lookup.append(
                 {
                     "UPRN": address["UPRN"],
-                    "Title Number": filtered_leasehold["Title Number"].values[0]
-                }
-            )
-
-        if not filtered_freehold.empty:
-            freehold_matching_lookup.append(
-                {
-                    "UPRN": address["UPRN"],
-                    "Title Number": filtered_freehold["Title Number"].values[0]
+                    "Title Number": filtered_leasehold["Title Number"].values[0],
+                    "match_type": match_type,
                 }
             )
 
     freehold_matching_lookup = pd.DataFrame(freehold_matching_lookup)
     leasehold_matching_lookup = pd.DataFrame(leasehold_matching_lookup)
+    shared_leasehold_match = pd.concat(shared_leasehold_match)
 
-    freehold_aggregate = aggregate_matches(freehold_matching_lookup, company_ownership)
-    leasehold_aggregate = aggregate_matches(leasehold_matching_lookup, company_ownership)
+    # The approximate matches aren't very good
+    freehold_matching_lookup = freehold_matching_lookup[freehold_matching_lookup["match_type"] == "exact"]
+    leasehold_matching_lookup = leasehold_matching_lookup[leasehold_matching_lookup["match_type"] == "exact"]
+
+    # There are some cases where we have duplicates
+    freehold_matching_lookup = remove_duplicate_matches(freehold_matching_lookup, properties, company_ownership)
+    leasehold_matching_lookup = remove_duplicate_matches(leasehold_matching_lookup, properties, company_ownership)
+
+    matched_addresses = freehold_matching_lookup.merge(
+        properties[["UPRN", "ADDRESS"]].rename(columns={"ADDRESS": "epc_address"}),
+        how="left", on="UPRN"
+    ).merge(
+        company_ownership[["Title Number", "Property Address"]],
+        how="left", on="Title Number"
+    )
+
+    # shared_freehold_match = pd.DataFrame(shared_freehold_match)
+    # Store these files
+    freehold_matching_lookup.to_excel("freehold_matching_lookup.xlsx")
+    leasehold_matching_lookup.to_excel("leasehold_matching_lookup.xlsx")
+    shared_leasehold_match.to_excel("shared_leasehold_match.xlsx")
+    # shared_freehold_match.to_excel("shared_freehold_match.xlsx")
+
+    freehold_aggregate = aggregate_matches(freehold_matching_lookup, company_ownership, properties)
+    leasehold_aggregate = aggregate_matches(leasehold_matching_lookup, company_ownership, properties)
+
+    combined_aggregate = aggregate_matches(
+        pd.concat([freehold_matching_lookup, leasehold_matching_lookup]), company_ownership, properties
+    )
+
+    investment_20m = combined_aggregate[combined_aggregate["cumulative_value"] <= 20_500_000]
+    investment_50m = combined_aggregate[combined_aggregate["cumulative_value"] <= 51_000_000]
+
+    z = company_ownership[
+        (company_ownership["Company Registration No. (1)"] == freehold_aggregate["Company Registration No. (1)"].values[
+            0]) &
+        (company_ownership["Title Number"].isin(freehold_matching_lookup["Title Number"].values))
+    ]
+
+    df = freehold_matching_lookup.merge(company_ownership, how="left", on="Title Number")
diff --git a/etl/customers/vander_elliot/__init__.py b/etl/customers/vander_elliot/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/etl/customers/vander_elliot/single_property_pilot.py b/etl/customers/vander_elliot/single_property_pilot.py
new file mode 100644
index 00000000..99624dfc
--- /dev/null
+++ b/etl/customers/vander_elliot/single_property_pilot.py
@@ -0,0 +1,56 @@
+import pandas as pd
+from utils.s3 import read_excel_from_s3
+from utils.s3 import save_csv_to_s3
+
+PORTFOLIO_ID = 77
+USER_ID = 8
+
+patches = [
+    {
+        "address": "79 Perryn Road",
+        "postcode": "W3 7LT",
+        "roof-description": "Pitched, no insulation (assumed)"
+    }
+]
+
+
+def app():
+    asset_list = [
+        {
+            'uprn': 12103117,
+            "address": "79 Perryn Road",
+            "postcode": "W3 7LT",
+        },
+
+    ]
+
+    asset_list = pd.DataFrame(asset_list)
+
+    # Store the asset list in s3
+    filename = f"{USER_ID}/{PORTFOLIO_ID}/pilot.csv"
+    save_csv_to_s3(
+        dataframe=asset_list,
+        bucket_name="retrofit-plan-inputs-dev",
+        file_name=filename
+    )
+
+    # Store patches in s3
+    patches_filename = f"{USER_ID}/{PORTFOLIO_ID}/patches.json"
+    save_csv_to_s3(
+        dataframe=pd.DataFrame(patches),
+        bucket_name="retrofit-plan-inputs-dev",
+        file_name=patches_filename
+    )
+
+    body = {
+        "portfolio_id": str(PORTFOLIO_ID),
+        "housing_type": "Private",
+        "goal": "Increase EPC",
+        "goal_value": "B",
+        "trigger_file_path": filename,
+        "already_installed_file_path": "",
+        "patches_file_path": patches_filename,
+        "non_invasive_recommendations_file_path": "",
+        "budget": None,
+    }
+    print(body)
diff --git a/recommendations/HeatingRecommender.py b/recommendations/HeatingRecommender.py
index b197d817..b42a9d5b 100644
--- a/recommendations/HeatingRecommender.py
+++ b/recommendations/HeatingRecommender.py
@@ -93,13 +93,13 @@ class HeatingRecommender:
 
         # In the future, we'll allow overrides, so that non-intrusive surveys can contradict these conditions
         # and either allow or prevent the recommendation of an air source heat pump
-        suitable_property_types = self.property.data["property-type"] in ["House", "Bungalow"]
-        has_air_source_heat_pump = self.property.main_heating["has_air_source_heat_pump"]
-
-        if suitable_property_types and not has_air_source_heat_pump:
-            self.recommend_air_source_heat_pump(
-                phase=phase, has_cavity_and_loft_recommendations=has_cavity_and_loft_recommendations
-            )
+        # suitable_property_types = self.property.data["property-type"] in ["House", "Bungalow"]
+        # has_air_source_heat_pump = self.property.main_heating["has_air_source_heat_pump"]
+        #
+        # if suitable_property_types and not has_air_source_heat_pump:
+        #     self.recommend_air_source_heat_pump(
+        #         phase=phase, has_cavity_and_loft_recommendations=has_cavity_and_loft_recommendations
+        #     )
 
         return
 
diff --git a/recommendations/Recommendations.py b/recommendations/Recommendations.py
index 06dc2d61..1a6d7a1c 100644
--- a/recommendations/Recommendations.py
+++ b/recommendations/Recommendations.py
@@ -109,7 +109,7 @@ class Recommendations:
 
         # Heating and Electical systems
         if "heating" not in self.exclusions:
-            self.heating_recommender.recommend(phase=phase)
+            self.heating_recommender.recommend(phase=phase, has_cavity_and_loft_recommendations=None)
             if (
                 self.heating_recommender.heating_recommendations or
                 self.heating_recommender.heating_control_recommendations
diff --git a/recommendations/SolarPvRecommendations.py b/recommendations/SolarPvRecommendations.py
index b44557ab..58d4b123 100644
--- a/recommendations/SolarPvRecommendations.py
+++ b/recommendations/SolarPvRecommendations.py
@@ -44,7 +44,7 @@ class SolarPvRecommendations:
         :return:
         """
 
-        is_valid_property_type = self.property.data["property-type"] in ["House", "Bungalow"]
+        is_valid_property_type = self.property.data["property-type"] in ["House", "Bungalow", "Maisonette"]
         is_valid_roof_type = (
             self.property.roof["is_flat"] or self.property.roof["is_pitched"] or self.property.roof["is_roof_room"]
         )