From c6daf520467b0c994a67f7746b51450f36b6bea7 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Thu, 22 Feb 2024 16:00:23 +0000 Subject: [PATCH] Trying to handle streetname extraction and edge case in ciga matching --- .../ha_15_32/ha_analysis_batch_3.py | 192 +++++++++++++----- 1 file changed, 143 insertions(+), 49 deletions(-) diff --git a/etl/eligibility/ha_15_32/ha_analysis_batch_3.py b/etl/eligibility/ha_15_32/ha_analysis_batch_3.py index d27bf8e8..cb4b9885 100644 --- a/etl/eligibility/ha_15_32/ha_analysis_batch_3.py +++ b/etl/eligibility/ha_15_32/ha_analysis_batch_3.py @@ -1,4 +1,5 @@ import os +import re import openpyxl from pathlib import Path import msgpack @@ -36,6 +37,10 @@ class DataLoader: } } + UNMATCHED_CIGA = { + "HA14": 6 + } + def __init__(self, directories, use_cache): self.directories = directories self.use_cache = use_cache @@ -101,6 +106,9 @@ class DataLoader: else: split_addresses = asset_list['matching_address'].str.split(',', expand=True) house_numbers = split_addresses[0].str.split(' ', expand=True) + # If we have "flat" or valley" as the house number, then the house number is actually in the second column + house_numbers[0] = np.where(house_numbers[0].isin(["flat", "valley"]), house_numbers[1], house_numbers[0]) + # THe first column should be HouseNo - we aren't interested in the other columns, but we don't know how # many columns there might be house_numbers = house_numbers.iloc[:, 0:1] @@ -117,7 +125,7 @@ class DataLoader: :return: """ - if ha_name in ["HA6"]: + if ha_name in ["HA6", "HA14"]: split_addresses = ciga_list['Matched Address'].str.split(',', expand=True) house_numbers = split_addresses[0].str.split(' ', expand=True) # THe first column should be HouseNo - we aren't interested in the other columns, but we don't know how @@ -132,16 +140,23 @@ class DataLoader: return ciga_list @staticmethod - def get_sheetname(workbook): + def get_asset_sheetname(workbook): if "Asset List" in workbook.sheetnames: return "Asset List" else: return "Assets" + @staticmethod + def get_ciga_sheetname(workbook): + if "CIGA Checks" in workbook.sheetnames: + return "CIGA Checks" + else: + return "CIGA" + def load_asset_list(self, filepath, ha_name): workbook = openpyxl.load_workbook(filepath) - sheetname = self.get_sheetname(workbook) - asset_sheet = workbook[sheetname] + asset_sheetname = self.get_asset_sheetname(workbook) + asset_sheet = workbook[asset_sheetname] asset_sheet_colnames = [cell.value for cell in asset_sheet[1]] rows_data = [] @@ -165,41 +180,46 @@ class DataLoader: asset_list = self.append_asset_list_built_form(ha_name=ha_name, asset_list=asset_list) + # We correct the asset list if it needs it + # Correct the asset list + correction_function_name = f"correct_{ha_name.lower()}_asset_list" + if hasattr(self, correction_function_name): + asset_list_correction_function = getattr(self, f"correct_{ha_name.lower()}_asset_list") + asset_list = asset_list_correction_function(asset_list) + # We check if there is a survey list - survey_list = pd.DataFrame() - if "ECO Surveys" in workbook.sheetnames: - survey_sheet = workbook["ECO Surveys"] - survey_rows = [] - for row in survey_sheet.iter_rows(min_row=2, values_only=False): # Assuming the first row is headers - row_data = [cell.value for cell in row] # This will get you the cell values - survey_rows.append(row_data) + survey_sheetname = "ECO Surveys" + survey_sheet = workbook[survey_sheetname] + survey_rows = [] + for row in survey_sheet.iter_rows(min_row=2, values_only=False): # Assuming the first row is headers + row_data = [cell.value for cell in row] # This will get you the cell values + survey_rows.append(row_data) - survey_list = pd.DataFrame(survey_rows, columns=[cell.value for cell in survey_sheet[1]]) - # Remove columns that are None - survey_list = survey_list.loc[:, survey_list.columns.notnull()] - survey_list["survey_list_row_id"] = [ha_name + "_survey_" + str(i) for i in range(0, len(survey_list))] + survey_list = pd.DataFrame(survey_rows, columns=[cell.value for cell in survey_sheet[1]]) + # Remove columns that are None + survey_list = survey_list.loc[:, survey_list.columns.notnull()] + survey_list["survey_list_row_id"] = [ha_name + "_survey_" + str(i) for i in range(0, len(survey_list))] - # Perform survey list merge - if not survey_list.empty: - survey_list = self.merge_surveys_to_assets(asset_list, survey_list, ha_name) + # Perform survey list merge + if not survey_list.empty: + survey_list = self.merge_surveys_to_assets(asset_list, survey_list, ha_name) # We check if there are CIGA checks - ciga_list = pd.DataFrame() - if "CIGA Checks" in workbook.sheetnames: - ciga_sheet = workbook["CIGA Checks"] - ciga_rows = [] - for row in ciga_sheet.iter_rows(min_row=2, values_only=False): - row_data = [cell.value for cell in row] # This will get you the cell values - ciga_rows.append(row_data) + ciga_sheetname = self.get_ciga_sheetname(workbook) + ciga_sheet = workbook[ciga_sheetname] + ciga_rows = [] + for row in ciga_sheet.iter_rows(min_row=2, values_only=False): + row_data = [cell.value for cell in row] # This will get you the cell values + ciga_rows.append(row_data) - ciga_list = pd.DataFrame(ciga_rows, columns=[cell.value for cell in ciga_sheet[1]]) - # Remove columns that are None - ciga_list = ciga_list.loc[:, ciga_list.columns.notnull()] - survey_list["survey_list_row_id"] = [ha_name + "_ciga_" + str(i) for i in range(0, len(survey_list))] - # Perform ciga list merge - if not ciga_list.empty: - ciga_list = self.create_ciga_list_house_no(ha_name, ciga_list) - ciga_list = self.merge_ciga_to_assets(asset_list, ciga_list, ha_name) + ciga_list = pd.DataFrame(ciga_rows, columns=[cell.value for cell in ciga_sheet[1]]) + # Remove columns that are None + ciga_list = ciga_list.loc[:, ciga_list.columns.notnull()] + ciga_list["ciga_list_row_id"] = [ha_name + "_ciga_" + str(i) for i in range(0, len(ciga_list))] + # Perform ciga list merge + if not ciga_list.empty: + ciga_list = self.create_ciga_list_house_no(ha_name, ciga_list) + ciga_list = self.merge_ciga_to_assets(asset_list, ciga_list, ha_name) return asset_list, survey_list, ciga_list @@ -222,6 +242,21 @@ class DataLoader: @staticmethod def correct_ha14_asset_list(asset_list): + + # For 5 Queens Court, DE72 3NP, the postcode is actually DE72 3QZ + asset_list.loc[ + (asset_list["Address 1"] == "5 Queens Court") & + (asset_list["Postcode"].str.strip() == "DE72 3NP"), + "matching_postcode" + ] = "DE72 3QZ" + + # We then correct the matching_address + asset_list.loc[ + (asset_list["Address 1"] == "5 Queens Court") & + (asset_list["Postcode"].str.strip() == "DE72 3NP"), + "matching_address" + ] = "5 queens court, garfield avenue, draycott, derby, de72 3qz" + return asset_list @staticmethod @@ -363,13 +398,22 @@ class DataLoader: "Oiliver Road", "Oliver Road" ) + # For postodes DE7 4FB, DE7 4EZ, it's actually spelled WINDERMERE AVENUE, not WINDEREMERE AVENUE (without the + # extra e) + survey_list.loc[ + (survey_list["Street / Block Name"] == "WINDEREMERE AVENUE") & + (survey_list["Post Code"].isin(["DE7 4FB", "DE7 4EZ"])), + "Street / Block Name" + ] = "WINDERMERE AVENUE" + + survey_list["Street / Block Name"] = survey_list["Street / Block Name"].str.replace( + "MACDONALD SQAURE", "MACDONALD SQUARE" + ) + return survey_list def merge_surveys_to_assets(self, asset_list, survey_list, ha_name): - # Correct the asset list - asset_list_correction_function = getattr(self, f"correct_{ha_name.lower()}_asset_list") - asset_list = asset_list_correction_function(asset_list) # Correct the survey list survey_list_correction_function = getattr(self, f"correct_{ha_name.lower()}_survey_list") survey_list = survey_list_correction_function(survey_list) @@ -411,7 +455,7 @@ class DataLoader: print(row["Street / Block Name"]) print(house_number) - print(row["Post Code"].lower()) + print(row["Post Code"]) raise ValueError("Investigate") matching_lookup.append( @@ -428,8 +472,38 @@ class DataLoader: return survey_list + @staticmethod + def extract_streetname(address, house_number=None, postcode=None): + """ + Cleans an address by removing the house number and postcode, and converts everything to lower case. + + :param address: The full address as a string. + :param house_number: The house number to remove, as a string or integer. + :param postcode: The postcode to remove, as a string. + :return: The cleaned address. + """ + # Convert everything to lower case + address = address.lower() + + if house_number is not None: + # Remove the house number + address = re.sub(r'\b{}\b'.format(house_number), '', address, flags=re.IGNORECASE).strip() + + if postcode is not None: + # Remove the postcode + address = re.sub(r'\b{}\b'.format(re.escape(postcode)), '', address, flags=re.IGNORECASE).strip() + + # Get first section before a comma + address = address.split(",")[0] + # Additional cleaning to remove extra spaces and commas left over + address = re.sub(r'\s+', ' ', address) # Replace multiple spaces with a single space + address = re.sub(r'\s*,\s*', ', ', address) # Clean up space around commas + + return address + def merge_ciga_to_assets(self, asset_list, ciga_list, ha_name): matching_lookup = [] + unmatched_addresses = [] for _, row in tqdm(ciga_list.iterrows(), total=len(ciga_list)): house_number = row["HouseNo"] @@ -442,22 +516,35 @@ class DataLoader: ].copy() df = df[df["HouseNo"] == str(house_number)] + # For ciga, we skip + if df.empty: + if row["Matched Postcode"] == "LE3 3EE": + dew + unmatched_addresses.append( + { + "ciga_list_row_id": row["ciga_list_row_id"], + "HouseNo": house_number, + "Matched Postcode": row["Matched Postcode"] + } + ) + continue # TODO: Might need to consider street name at some point if df.shape[0] != 1: - if df.shape[0] != 1: - df = df[df["matching_postcode"].str.lower().str.contains(row["Post Code"].lower())] - if df.shape[0] != 1: - postcode_lower = row["Post Code"].lower() - # if postcode_lower in missed_postcodes: - # matching_lookup.append( - # { - # "survey_list_row_id": row["survey_list_row_id"], - # "asset_list_row_id": None, - # } - # ) - # continue + # We split house number and postcode out of the matched address for ciga + street_name = self.extract_streetname( + address=row["Matched Address"], house_number=house_number, postcode=row["Matched Postcode"] + ) + df = df[df["matching_address"].str.contains(street_name)] + if df.shape[0] != 1: + # The final check we do here is to check for the presence of flat in the address + if "flat" in row["Matched Address"]: + df = df[df["matching_address"].str.contains("flat")] + else: + df = df[df["matching_address"].str.contains("flat") == False] + + if df.shape[0] != 1: print(row["Street / Block Name"]) print(house_number) print(row["Post Code"].lower()) @@ -470,6 +557,13 @@ class DataLoader: } ) + # We have an acceptable number of ciga failures for each HA + if len(unmatched_addresses) != self.UNMATCHED_CIGA[ha_name]: + raise ValueError(f"Unmatched addresses for {ha_name} is not as expected") + + # In ciga: 35 Valley Drive, Leicester, LE3 3EE + # + matching_lookup = pd.DataFrame(matching_lookup) # Merge onto the ciga list