diff --git a/etl/ownership/Ownership.py b/etl/ownership/Ownership.py index fc5c0632..90abe147 100644 --- a/etl/ownership/Ownership.py +++ b/etl/ownership/Ownership.py @@ -20,8 +20,20 @@ class Ownership: "all royal mines" ] + # anything that is sold within this many months is flagged to have sold recently and is then + # considered to be dropped from matching + SOLD_RECENTLY_MONTHS = 12 + + # Anything that has been lodged for a marketed or unmarketed sale within this many months is + # flagged as potentially in the process of being sold + LODGED_RECENTLY_MONTHS = 12 + def __init__( - self, epc_paths: List[str], domestic_ownership_path: str, overseas_ownership_path + self, + epc_paths: List[str], + domestic_ownership_path: str, + overseas_ownership_path: str, + land_registry_path: str ): """ @@ -32,6 +44,7 @@ class Ownership: corporate ownership of properties in the UK, where the companies are UK based :param overseas_ownership_path: A string which points to the location of the OCOD ownership data, that details corporate ownership of properties in the UK, where the companies are overseas + :param land_registry_path: A string that points to the location of the land registry data """ # All epc paths should end with certificates.csv @@ -40,6 +53,7 @@ class Ownership: self.epc_paths = epc_paths self.domestic_ownership_path = domestic_ownership_path self.overseas_ownership_path = overseas_ownership_path + self.land_registry_path = land_registry_path self.run_timestamp = str(datetime.now()) @@ -48,12 +62,17 @@ class Ownership: self.ownership_data = None self.freehold_matching_lookup = None self.leasehold_matching_lookup = None - self.shared_freehold_match = None self.shared_leasehold_match = None + self.land_registry = None + # Match tables self.combined_matching_lookup = None self.matched_addresses = None + self.land_registry_matches = None + + def pipeline(self): + pass def source_epc_properties(self, column_filters=None): """ @@ -301,6 +320,36 @@ class Ownership: return 
@staticmethod
def is_substring(x, match_string):
    """
    Checks whether x is contained in match_string, ignoring the case of match_string

    :param x: The candidate substring. It is compared as-is, so it should already be lower case.
        Null or empty values never match
    :param match_string: The string to search in, which is lower-cased before the check. Null
        values never match
    :return: True if x is a non-empty substring of the lower-cased match_string, otherwise False
    """
    if pd.isnull(x) or pd.isnull(match_string):
        return False
    if not x:
        # "" is a substring of every string, so a blank value would match every address
        return False
    return x in match_string.lower()


@staticmethod
def house_number_match(paon, house_number):
    """
    Compares a land registry paon with a house number

    :param paon: The primary addressable object name from the land registry
    :param house_number: The house number extracted from the EPC address
    :return: True if both values convert to the same integer or, when either of them is not
        numeric, if they are equal as given. Null values never match
    """
    if pd.isnull(paon) or pd.isnull(house_number):
        # A missing number on both sides is not evidence of a match
        return False
    try:
        # Numeric comparison, so that e.g. "012" and "12" are treated as the same number
        return int(paon) == int(house_number)
    except (TypeError, ValueError, OverflowError):
        # If we can't convert both to numeric (e.g. "12a"), we do an equality
        return paon == house_number


@staticmethod
def check_equalities(lr_filtered):
    """
    Checks whether all rows share the same paon, saon and street. A column that is null on every
    row counts as equal, an empty frame is trivially equal

    :param lr_filtered: A dataframe with the columns "paon", "saon" and "street"
    :return: A tuple of booleans (all_paon_equal, all_saon_equal, all_street_equal)
    """
    def all_same(values):
        if values.empty:
            return True
        first = values.iloc[0]
        if pd.isnull(first):
            # null == null is False, so nulls need their own check
            return bool(values.isnull().all())
        return bool((values == first).all())

    return (
        all_same(lr_filtered["paon"]),
        all_same(lr_filtered["saon"]),
        all_same(lr_filtered["street"]),
    )


def match_with_land_registry(self):
    """
    This function matches the land registry data to the existing matches. It reads the land
    registry csv into self.land_registry, stores one record per matched UPRN in
    self.land_registry_matches, left-merges those records onto self.matched_addresses and adds
    the "sold_recently" and "sale_lodged_recently" flags

    :raises ValueError: If match() has not populated self.matched_addresses yet
    :raises NotImplementedError: If several land registry rows remain for a property and none of
        the rules can choose between them
    :return:
    """
    if self.matched_addresses is None:
        raise ValueError("Run match() first!")

    # Force the address columns to strings: with inferred dtypes an all-numeric paon/saon column
    # would not support the .str accessor
    text_columns = ["postcode", "street", "paon", "saon"]
    self.land_registry = pd.read_csv(
        self.land_registry_path, dtype={col: str for col in text_columns}
    )
    for col in text_columns:
        self.land_registry[col] = self.land_registry[col].str.lower().str.strip()

    self.land_registry["date_of_transfer"] = pd.to_datetime(self.land_registry["date_of_transfer"])

    # Group the row labels by postcode once, rather than scanning the full table for every match
    rows_by_postcode = self.land_registry.groupby("postcode").groups

    land_registry_matches = []
    for _, match in tqdm(self.matched_addresses.iterrows(), total=len(self.matched_addresses)):
        row_labels = rows_by_postcode.get(match["epc_postcode"].lower().strip())
        if row_labels is None:
            continue

        record = self._match_land_registry_row(match, self.land_registry.loc[row_labels])
        if record is not None:
            land_registry_matches.append(record)

    # Fixing the columns keeps the merge working when nothing was matched at all
    self.land_registry_matches = pd.DataFrame(
        land_registry_matches,
        columns=["uprn", "transaction_id", "price", "date_of_transfer"],
    )
    self.land_registry_matches["date_of_transfer"] = pd.to_datetime(
        self.land_registry_matches["date_of_transfer"]
    )

    # Merge onto the EPC - ownership matches
    self.matched_addresses = self.matched_addresses.merge(
        self.land_registry_matches,
        how="left",
        left_on="UPRN",
        right_on="uprn"
    ).drop(columns=["uprn"])

    now = pd.Timestamp.now()

    # Flag anything that sold recently. Note the plural "months": the singular "month" keyword
    # would replace the month of the timestamp instead of subtracting from it
    self.matched_addresses["sold_recently"] = (
        self.matched_addresses["date_of_transfer"] >=
        now - pd.DateOffset(months=self.SOLD_RECENTLY_MONTHS)
    )

    self.matched_addresses["sale_lodged_recently"] = (
        (
            pd.to_datetime(self.matched_addresses["LODGEMENT_DATE"]) >=
            now - pd.DateOffset(months=self.LODGED_RECENTLY_MONTHS)
        ) &
        (self.matched_addresses["TRANSACTION_TYPE"].isin(["marketed sale", "non marketed sale"]))
    )


@staticmethod
def _newest_record(uprn, lr_rows):
    """
    Builds the match record for a UPRN from the most recently transferred of the given rows

    :param uprn: The UPRN of the matched property
    :param lr_rows: A non-empty dataframe of land registry rows
    :return: A dictionary with the keys uprn, transaction_id, price and date_of_transfer
    """
    newest = lr_rows.sort_values("date_of_transfer", ascending=False).head(1)
    return {
        "uprn": uprn,
        "transaction_id": newest["transaction_id"].values[0],
        "price": newest["price"].values[0],
        "date_of_transfer": newest["date_of_transfer"].values[0],
    }


def _match_land_registry_row(self, match, candidates):
    """
    Picks the land registry transaction that belongs to a single EPC - ownership match

    :param match: A row of self.matched_addresses
    :param candidates: The land registry rows that share the postcode of the match
    :return: The record built by _newest_record, or None if no row could be matched
    :raises NotImplementedError: If several different rows remain and no rule separates them
    """
    if candidates.empty:
        return None

    uprn = match["UPRN"]

    # The street should be contained in the EPC address or in the ownership address
    on_street = candidates["street"].apply(
        lambda street: self.is_substring(street, match["epc_address"])
        or self.is_substring(street, match["Property Address"])
    ).astype(bool)
    candidates = candidates[on_street]
    if candidates.empty:
        return None

    # Primary match on the house number, secondary match on the saon in the first address line
    candidates = candidates.assign(
        paon_match=candidates["paon"].apply(
            lambda paon: self.house_number_match(paon, match["house_number"])
        ).astype(bool),
        saon_match=candidates["saon"].apply(
            lambda saon: self.is_substring(saon, match["epc_address1"])
        ).astype(bool),
    )
    candidates = candidates[candidates["paon_match"] | candidates["saon_match"]]
    if candidates.empty:
        return None

    # A single row, or several transactions of one and the same address: take the newest
    all_paon_equal, all_saon_equal, all_street_equal = self.check_equalities(candidates)
    if len(candidates) == 1 or (all_paon_equal and all_saon_equal and all_street_equal):
        return self._newest_record(uprn, candidates)

    if candidates["paon_match"].any() and all_street_equal:
        # Several addresses on the same street: keep the ones with the right house number
        by_paon = candidates[candidates["paon_match"]]
        if all(self.check_equalities(by_paon)):
            return self._newest_record(uprn, by_paon)

        # Several units share the house number, so look for the saon in the full EPC address
        by_saon = by_paon[
            by_paon["saon"].apply(
                lambda saon: self.is_substring(saon, match["epc_address"])
            ).astype(bool)
        ]
        if by_saon.empty:
            return None
        if len(by_saon) == 1:
            return self._newest_record(uprn, by_saon)
        raise NotImplementedError("wtf")

    # We have a final check, based on an observed case: look for the paon in the first two
    # comma separated parts of the ownership address
    property_address = match["Property Address"]
    if pd.isnull(property_address):
        lr_address_1 = ""
    else:
        lr_address_1 = " ".join(part.lower().strip() for part in property_address.split(",")[0:2])

    by_paon = candidates[
        candidates["paon"].apply(lambda paon: self.is_substring(paon, lr_address_1)).astype(bool)
    ]
    if by_paon.empty:
        return None
    if len(by_paon) == 1 or all(self.check_equalities(by_paon)):
        return self._newest_record(uprn, by_paon)

    # Check saon is house number with exact match
    saon_is_house_number = by_paon["saon"].apply(
        lambda saon: self.house_number_match(saon, match["house_number"])
    ).astype(bool)

    if saon_is_house_number.any():
        by_saon = by_paon[saon_is_house_number]
        if len(by_saon) == 1 or all(self.check_equalities(by_saon)):
            return self._newest_record(uprn, by_saon)
    else:
        # We check if we have a flat or an apartment at the start of the first address line
        address_1 = "" if pd.isnull(match["epc_address1"]) else match["epc_address1"].lower()
        unit_label = None
        for prefix in ("flat", "apartment"):
            unit_number = re.match(prefix + r" (\d+)", address_1)
            if unit_number is not None:
                unit_label = prefix + " " + unit_number.group(1)

        if unit_label is not None:
            by_unit = by_paon[by_paon["saon"] == unit_label]
            if len(by_unit) == 1:
                return self._newest_record(uprn, by_unit)

    raise NotImplementedError("wtf")


def filter_matches(self):
    """
    Placeholder, this is not implemented yet and does nothing
    """
    pass