From c90c6d860b668f4d1960e4380ec170be1b95ddb1 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Thu, 25 Jul 2024 23:56:36 +0100 Subject: [PATCH] starting looking at sfr --- .../functions/energy_assessment_functions.py | 30 ++++++-- etl/bill_savings/data_collection.py | 4 +- etl/customers/goldman/property_ownership.py | 75 +++++++++++++++---- etl/xml_survey_extraction/XmlParser.py | 2 + etl/xml_survey_extraction/app.py | 7 +- 5 files changed, 94 insertions(+), 24 deletions(-) diff --git a/backend/app/db/functions/energy_assessment_functions.py b/backend/app/db/functions/energy_assessment_functions.py index 8befe903..0970b71f 100644 --- a/backend/app/db/functions/energy_assessment_functions.py +++ b/backend/app/db/functions/energy_assessment_functions.py @@ -5,21 +5,35 @@ from sqlalchemy.exc import IntegrityError def bulk_insert_energy_assessments(session: Session, data_list): """ - This function inserts multiple energy assessment records into the database. + This function inserts or updates multiple energy assessment records into the database. - :param session: The database session + :param session: The SQLAlchemy session. :param data_list: A list of dictionaries containing energy assessment data. """ - try: - # Map dictionaries to EnergyAssessment instances - assessments = [EnergyAssessment(**data) for data in data_list] + for data in data_list: + uprn = data.get('uprn') + lodgement_date = data.get('lodgement_date') + + # Check if a record with the same uprn and lodgement_date exists + existing_record = session.query(EnergyAssessment).filter_by( + uprn=uprn, + lodgement_date=lodgement_date + ).first() + + if existing_record: + # Update the existing record with new data + for key, value in data.items(): + setattr(existing_record, key, value) + session.add(existing_record) + else: + # Insert a new record + new_assessment = EnergyAssessment(**data) + session.add(new_assessment) - # Add all instances to the session - session.add_all(assessments) # Commit the transaction session.commit() - print("All records inserted successfully.") + print("All records inserted or updated successfully.") except IntegrityError as e: # Rollback the session in case of error diff --git a/etl/bill_savings/data_collection.py b/etl/bill_savings/data_collection.py index 6095741f..e6f6de6f 100644 --- a/etl/bill_savings/data_collection.py +++ b/etl/bill_savings/data_collection.py @@ -133,8 +133,8 @@ def app(): energy_consumption_data = [] for i, directory in tqdm(enumerate(epc_directories), total=len(epc_directories)): # Skip the first 50 - # if i < 344: - # continue + if i < 57: + continue data = pd.read_csv(directory / "certificates.csv", low_memory=False) # Rename the columns to the same format as the api returns diff --git a/etl/customers/goldman/property_ownership.py b/etl/customers/goldman/property_ownership.py index 500963a1..1b1cf014 100644 --- a/etl/customers/goldman/property_ownership.py +++ b/etl/customers/goldman/property_ownership.py @@ -73,7 +73,7 @@ def find_f_g_properties(paths): epc_data["UPRN"] = epc_data["UPRN"].astype(int).astype(str) # Get the newest EPC for each UPRN. We use LODGEMENT_DATE as a proxy for this - epc_data["LODGEMENT_DATETIME"] = pd.to_datetime(epc_data["LODGEMENT_DATETIME"], format='mixed') + epc_data["LODGEMENT_DATETIME"] = pd.to_datetime(epc_data["LODGEMENT_DATETIME"], format='mixed', errors="coerce") epc_data = epc_data.sort_values("LODGEMENT_DATETIME", ascending=False).drop_duplicates("UPRN") @@ -84,7 +84,7 @@ def find_f_g_properties(paths): data = pd.concat(data) # Save as an excel - data.to_excel("EPC F & G Properties.xlsx", index=False) + data.to_excel("EPC F & G Properties - V2.xlsx", index=False) def remove_text_in_brackets(address: str) -> str: @@ -196,7 +196,7 @@ def remove_duplicate_matches(matching_lookup, properties, company_ownership): matches_to_drop[["UPRN", "Title Number"]].copy() ) - to_drop = pd.concat(to_drop) + to_drop = pd.concat(to_drop) if to_drop else pd.DataFrame() if not to_drop.empty: merged = pd.merge(matching_lookup, to_drop, on=['UPRN', 'Title Number'], how='left', indicator=True) @@ -245,6 +245,44 @@ def remove_duplicate_uprn_matches(matching_lookup, properties, company_ownership return matching_lookup +def filter_land_registry(properties): + column_names = [ + "transaction_id", + "price", + "date_of_transfer", + "postcode", + "property_type", + "old_new", + "duration", + "paon", + "saon", + "street", + "locality", + "town_city", + "district", + "county", + "ppd_category_type", + "record_status", + ] + land_registry = pd.read_csv("/Users/khalimconn-kowlessar/Downloads/pp-complete.csv", header=None) + land_registry.columns = column_names + land_registry = land_registry[ + land_registry["postcode"].str.lower().isin(properties["POSTCODE"].str.lower().unique()) + ] + land_registry["date_of_transfer"] = pd.to_datetime( + land_registry["date_of_transfer"], format="%Y-%m-%d", errors="coerce" + ) + # Take data from the last 5 years + land_registry = land_registry[ + (land_registry["date_of_transfer"] >= "2019-01-01") + ] + + # Filter this + land_registry.to_csv( + "/Users/khalimconn-kowlessar/Downloads/land_registry_prices_paid_filtered.csv", index=False + ) + + def app(): """ This script is for scoping property ownership for EPC F & G rated properties in Birmingam, for Goldman Sachs @@ -293,17 +331,22 @@ def app(): # paths = list(set(paths)) # find_f_g_properties(paths) - properties = pd.read_excel("EPC F & G Properties.xlsx") - company_ownership = pd.read_csv("/Users/khalimconn-kowlessar/Downloads/CCOD_FULL_2024_04.csv") + properties = pd.read_excel("EPC F & G Properties - V2.xlsx") + # filter_land_registry(properties) + company_ownership = pd.read_csv("/Users/khalimconn-kowlessar/Downloads/CCOD_FULL_2024_07.csv") company_ownership["is_overseas"] = False - overseas_company_ownership = pd.read_csv("/Users/khalimconn-kowlessar/Downloads/OCOD_FULL_2024_04 2.csv") + overseas_company_ownership = pd.read_csv("/Users/khalimconn-kowlessar/Downloads/OCOD_FULL_2024_07.csv") overseas_company_ownership["is_overseas"] = True company_ownership = pd.concat([company_ownership, overseas_company_ownership]) # FIlter on relevant postcodes company_ownership = company_ownership[ - company_ownership["Postcode"].str.lower().isin(properties["POSTCODE"].str.lower().unique())] + company_ownership["Postcode"].str.lower().isin(properties["POSTCODE"].str.lower().unique()) + ] + + # Read in land registry + land_registry = pd.read_csv("/Users/khalimconn-kowlessar/Downloads/land_registry_prices_paid_filtered.csv") # Now we filter properties the other way around properties = properties[properties["POSTCODE"].str.lower().isin(company_ownership["Postcode"].str.lower().unique())] @@ -316,6 +359,8 @@ def app(): # Take the newest UPRN properties = properties.sort_values("LODGEMENT_DATE", ascending=False).drop_duplicates("UPRN") + # TODO: Do we want to filter properties based on lodgement dates? + # Remove entries where the address begins with the term "land adjoining", or other records that don't reference the # the property itself starting_terms = [ @@ -414,8 +459,8 @@ def app(): freehold_matching_lookup = pd.DataFrame(freehold_matching_lookup) leasehold_matching_lookup = pd.DataFrame(leasehold_matching_lookup) - shared_leasehold_match = pd.concat(shared_leasehold_match) - shared_freehold_match = pd.concat(shared_freehold_match) + # shared_leasehold_match = pd.concat(shared_leasehold_match) + # shared_freehold_match = pd.concat(shared_freehold_match) # freehold_matching_lookup.to_excel("freehold_matching_lookup_new.xlsx") # leasehold_matching_lookup.to_excel("leasehold_matching_lookup_new.xlsx") @@ -429,7 +474,9 @@ def app(): # Combine combined_matching_lookup = pd.concat([freehold_matching_lookup, leasehold_matching_lookup]) # Remove duplicates - combined_matching_lookup = remove_duplicate_matches(combined_matching_lookup, properties, company_ownership) + combined_matching_lookup = remove_duplicate_matches( + matching_lookup=combined_matching_lookup, properties=properties, company_ownership=company_ownership + ) # We also have duplicates at a UPRN level combined_matching_lookup = remove_duplicate_uprn_matches(combined_matching_lookup, properties, company_ownership) @@ -457,11 +504,13 @@ def app(): # leasehold_matching_lookup = pd.read_excel("leasehold_matching_lookup.xlsx") # shared_leasehold_match = pd.read_excel("shared_leasehold_match.xlsx") - freehold_aggregate = aggregate_matches(freehold_matching_lookup, company_ownership, properties) - leasehold_aggregate = aggregate_matches(leasehold_matching_lookup, company_ownership, properties) + # freehold_aggregate = aggregate_matches(freehold_matching_lookup, company_ownership, properties) + # leasehold_aggregate = aggregate_matches(leasehold_matching_lookup, company_ownership, properties) combined_aggregate = aggregate_matches( - combined_matching_lookup, company_ownership, properties + matching_lookup=combined_matching_lookup, + company_ownership=company_ownership, + properties=properties ) investment_20m = combined_aggregate[combined_aggregate["cumulative_value"] <= 20_500_000] diff --git a/etl/xml_survey_extraction/XmlParser.py b/etl/xml_survey_extraction/XmlParser.py index 478891bf..90a51ae6 100644 --- a/etl/xml_survey_extraction/XmlParser.py +++ b/etl/xml_survey_extraction/XmlParser.py @@ -546,6 +546,8 @@ class XmlParser: county = property_tag.getElementsByTagName("County") if county: county = county[0].firstChild.nodeValue + else: + county = "" # Seems to be unavailable in the xml constituency = None diff --git a/etl/xml_survey_extraction/app.py b/etl/xml_survey_extraction/app.py index eea030e5..0cb95332 100644 --- a/etl/xml_survey_extraction/app.py +++ b/etl/xml_survey_extraction/app.py @@ -1,7 +1,7 @@ from backend.app.db.functions.energy_assessment_functions import bulk_insert_energy_assessments from sqlalchemy.orm import sessionmaker from backend.app.db.connection import db_engine -from utils.s3 import read_from_s3, list_files_and_subfolders_in_s3_folder, list_xmls_in_s3_folder +from utils.s3 import read_from_s3, list_files_and_subfolders_in_s3_folder, list_xmls_in_s3_folder, save_csv_to_s3 from utils.logger import setup_logger from etl.xml_survey_extraction.XmlParser import XmlParser import os @@ -70,6 +70,11 @@ def main(): data_to_update = { **extracted_epc, **extracted_additional_data } + + # We need to update the keys to match the database schema - i.e. we should replace all hyphens with + # underscores + data_to_update = {k.replace("-", "_"): v for k, v in data_to_update.items()} + extracted_data.update(data_to_update) database_data.append(extracted_data)