diff --git a/asset_list/AssetList.py b/asset_list/AssetList.py index ad3087c3..3f5b99cb 100644 --- a/asset_list/AssetList.py +++ b/asset_list/AssetList.py @@ -1104,7 +1104,7 @@ class AssetList: num_floors=x[self.ATTRIBUTE_NUMBER_OF_FLOORS], floor_height=( float(x[self.EPC_API_DATA_NAMES["floor-height"]]) if - x[self.EPC_API_DATA_NAMES["floor-height"]] else 2.5 + not pd.isnull(x[self.EPC_API_DATA_NAMES["floor-height"]]) else 2.5 ), perimeter=x[self.ATTRIBUTE_ESTIMATED_PERIMETER], built_form=x[self.EPC_API_DATA_NAMES["built-form"]] diff --git a/backend/app/assumptions.py b/backend/app/assumptions.py index 7dd986ed..08bd6b6b 100644 --- a/backend/app/assumptions.py +++ b/backend/app/assumptions.py @@ -63,6 +63,7 @@ DESCRIPTIONS_TO_FUEL_TYPES = { 'Room heaters, electric, Boiler and radiators, mains gas': {"fuel": "Natural Gas", "cop": 0.85}, 'Boiler and radiators, mains gas, Boiler and radiators, mains gas': {"fuel": "Natural Gas", "cop": 0.85}, 'Room heaters, electric, Electric storage heaters': {"fuel": "Electricity", "cop": 1}, + "Boiler and radiators, mains gas, Electric storage heaters": {"fuel": "Natural Gas", "cop": 0.85}, } # These are the measure types where if there is a ventilation recommendation, we force the inclusion of it diff --git a/etl/customers/acis/solid_wall_funding.py b/etl/customers/acis/solid_wall_funding.py new file mode 100644 index 00000000..5515b29c --- /dev/null +++ b/etl/customers/acis/solid_wall_funding.py @@ -0,0 +1,144 @@ +import os +import pandas as pd +import numpy as np +from dotenv import load_dotenv +from etl.find_my_epc.AssetListEpcData import AssetListEpcData +from backend.Funding import Funding +from backend.app.utils import sap_to_epc +from recommendations.recommendation_utils import estimate_external_wall_area + +load_dotenv(dotenv_path="backend/.env") +EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN") + +abs_matrix = pd.read_csv( + "/Users/khalimconn-kowlessar/Downloads/ECO4 Full Project Scores Matrix.csv" +) +pps_matrix = pd.read_excel( + "/Users/khalimconn-kowlessar/Downloads/ECO4 Partial Project Scores Matrix v5.xlsx", + header=1 +) +pps_matrix.columns = [c.strip() for c in pps_matrix.columns] + +asset_list = pd.read_excel( + "/Users/khalimconn-kowlessar/Documents/hestia/Customers/ACIS/Solid Wall Properties - Standardised_2.xlsx", + sheet_name="Standardised Asset List" +) + +asset_list = asset_list.rename( + columns={"domna_address_1": "address", "domna_postcode": "postcode"} +) +asset_list["address"] = asset_list["address"].astype(str) + +# Pull the find my EPC data and get the SAP points for solid wall +asset_list_epc_client = AssetListEpcData( + asset_list=asset_list, + epc_auth_token=EPC_AUTH_TOKEN +) +asset_list_epc_client.get_data() +asset_list_epc_client.get_non_invasive_recommendations() +# We pull out solid wall insulation +solid_wall_sap_points = [] +for r in asset_list_epc_client.non_invasive_recommendations: + solid_recommendations = [ + x for x in r["recommendations"] if ("internal_wall_insulation" in x["type"]) or ( + "external_wall_insulation" in x["type"] + ) + ] + if solid_recommendations: + solid_recommendations = solid_recommendations[0] + else: + continue + + address = r["address"] + postcode = r["postcode"] + + solid_wall_sap_points.append( + { + "address": address, + "postcode": postcode, + "sap_points": solid_recommendations["sap_points"] + } + ) + +solid_wall_sap_points = pd.DataFrame(solid_wall_sap_points) +avg_points = solid_wall_sap_points["sap_points"].median() + +asset_list = asset_list.merge(solid_wall_sap_points, how="left", on=["address", "postcode"]) +asset_list["sap_points"] = asset_list["sap_points"].fillna(avg_points) +asset_list["post_works_sap"] = asset_list["epc_sap_score_on_register"] + asset_list["sap_points"] +asset_list["post_works_epc"] = asset_list["post_works_sap"].apply(lambda x: sap_to_epc(x)) +asset_list["starting_half_band"] = asset_list["epc_sap_score_on_register"].apply(lambda x: Funding.get_sap_band(x)) +asset_list["ending_half_band"] = asset_list["post_works_sap"].apply(lambda x: Funding.get_sap_band(x)) +asset_list["floor_area_band"] = asset_list["epc_total_floor_area"].apply(lambda x: Funding.get_floor_area_band(x)) + +asset_list["funding_scheme"] = np.where( + ( + (asset_list["post_works_epc"] == asset_list["epc_rating_on_register"]) + ), + "GBIS", + "ECO4" +) + +# Merge on the ABS matrix +asset_list = asset_list.merge( + abs_matrix, how="left", left_on=["starting_half_band", "ending_half_band", "floor_area_band"], + right_on=['Starting Band', 'Finishing Band', 'Floor Area Segment', ] +) +asset_list = asset_list.drop(columns=['Starting Band', 'Finishing Band', 'Floor Area Segment']) + +# store for backup +# asset_list.to_csv( +# "/Users/khalimconn-kowlessar/Documents/hestia/Customers/ACIS/Solid Wall Properties - +# Standardised_2_with_funding.csv", +# index=False +# ) + +# For GBIS, we use the PPS +# Almost all properties are gas + +# Using IWI solid 1.7 -> 0.3 rates +pps_matrix = pps_matrix[ + pps_matrix["Measure_Type"].isin(["IWI_solid_1.7_0.3"]) +] + +# Merge on +asset_list = asset_list.merge( + pps_matrix[['Starting Band', 'Total Floor Area Band', 'Cost Savings']].rename( + columns={ + "Cost Savings": "partial_project_score", + "Starting Band": "starting_half_band", + "Total Floor Area Band": "floor_area_band" + } + ), + how="left", + on=["starting_half_band", "floor_area_band"], +) +asset_list["partial_project_score"] = np.where( + asset_list["starting_half_band"].isin(["Low_C", "High_C"]), + None, + asset_list["partial_project_score"] +) + +asset_list["funding_abs"] = np.where( + asset_list["funding_scheme"] == "GBIS", + asset_list["partial_project_score"], + asset_list["Cost Savings"] +) + +asset_list["heat_loss_area"] = asset_list.apply( + lambda x: estimate_external_wall_area( + num_floors=x["attribute_est_number_floors"], + floor_height=( + float(x["epc_floor_height"]) if + not pd.isnull(x["epc_floor_height"]) else 2.5 + ), + perimeter=x["attribute_est_perimter"], + built_form=x["epc_archetype"] + ), + axis=1 +) + +filename = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/ACIS/20250624 ACIS solid wall - standardised.xlsx" + +with pd.ExcelWriter(filename) as writer: + asset_list.to_excel(writer, sheet_name="Standardised Asset List", index=False) diff --git a/etl/find_my_epc/AssetListEpcData.py b/etl/find_my_epc/AssetListEpcData.py index f085c8fb..9845ee3b 100644 --- a/etl/find_my_epc/AssetListEpcData.py +++ b/etl/find_my_epc/AssetListEpcData.py @@ -27,6 +27,7 @@ class AssetListEpcData: self.extracted_data = None self.non_invasive_recommendations = None self.patches = None + self.epc_data = None @staticmethod def check_asset_list(asset_list): @@ -74,7 +75,9 @@ class AssetListEpcData: # Pull the additional data extracted_data = [] + epc_data = [] for _, home in tqdm(self.asset_list.iterrows(), total=len(self.asset_list)): + add1 = home["address"] pc = home["postcode"] # Retrieve the EPC data @@ -92,9 +95,6 @@ class AssetListEpcData: if epc_searcher.newest_epc is None: continue - if not pd.isnull(home.get("patch")): - epc_searcher.newest_epc["address1"] = add1 - # Attempt both methods: try: find_epc_searcher = RetrieveFindMyEpc( @@ -104,6 +104,8 @@ class AssetListEpcData: find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data() except Exception as e: logger.error(f"Error retrieving find my epc data: {e}") + if not pd.isnull(home.get("patch")): + epc_searcher.newest_epc["address1"] = add1 find_epc_searcher = RetrieveFindMyEpc( address=epc_searcher.newest_epc["address1"], postcode=epc_searcher.newest_epc["postcode"] @@ -113,7 +115,7 @@ class AssetListEpcData: # We need uprn to_append = { - "uprn": home.get("uprn"), + "uprn": home.get("uprn", epc_searcher.newest_epc["uprn"]), "address": home["address"], "postcode": home["postcode"], **find_epc_data, @@ -128,6 +130,8 @@ class AssetListEpcData: } extracted_data.append(to_append) + epc_data.append(epc_searcher.newest_epc) self.extracted_data = extracted_data + self.epc_data = epc_data logger.info("Data Extrction complete") diff --git a/etl/find_my_epc/RetrieveFindMyEpc.py b/etl/find_my_epc/RetrieveFindMyEpc.py index fad0c78e..d4092fe7 100644 --- a/etl/find_my_epc/RetrieveFindMyEpc.py +++ b/etl/find_my_epc/RetrieveFindMyEpc.py @@ -1,3 +1,4 @@ +import time import re import pandas as pd import requests @@ -125,6 +126,243 @@ class RetrieveFindMyEpc: return results + def _extract_epc_from_soup(self, soup, epc_certificate, sap_2012_date=None): + + ratings = soup.find('desc', {'id': 'svg-desc'}).text + current_rating = ratings.split(".")[0] + potential_rating = ratings.split(".")[1] + current_sap = int(current_rating.split(' ')[-1]) + + # Retrieve the energy consumption + bills = soup.find('div', {'id': 'bills-affected'}) + bills_list = bills.find_all('li') + if not bills_list: + # If this is the case, it's usually becaue the EPC was very old. Early EPCs did not have this information + heating_text = None + hot_water_text = None + else: + heating_text = bills_list[0].text + hot_water_text = bills_list[1].text + + # Retrieve the recommendations and SAP points + recommendations = [] + recommendations_div = soup.find('div', class_='epb-recommended-improvements') + if recommendations_div: + # Find all h3 headers for each step and extract their related information + step_headers = recommendations_div.find_all('h3', class_='govuk-heading-m') + previous_sap_score = current_sap + previous_epc = current_rating.split(' ')[-6] + for step_num, step_header in enumerate(step_headers, start=1): + # Extract the step title (the measure) + measure_title = step_header.text.strip().replace(f"Step {step_num}: ", "") + + # Find the div containing the potential rating within the same section + potential_rating_div = step_header.find_next( + 'div', class_='epb-recommended-improvements__potential-rating' + ) + + # Check if the potential rating div is found + if potential_rating_div: + # Extract the rating text within the SVG text element + extracted_rating_text = potential_rating_div.find('text', class_='govuk-!-font-weight-bold') + if extracted_rating_text is not None: + rating_text = extracted_rating_text.text.strip() + else: + rating_text = " ".join([str(previous_sap_score), previous_epc]) + # Parse the rating text to separate the numeric rating and EPC letter + new_rating = int(rating_text.split()[0]) + new_epc = rating_text.split()[1] + + # Append the information as a dictionary to the recommendations list + recommendations.append({ + "step": step_num, + "measure": measure_title, + "new_rating": new_rating, + "new_epc": new_epc, + "sap_points": new_rating - previous_sap_score + }) + previous_sap_score = new_rating + previous_epc = new_epc + + # Search for the assessment informaton + assessment_information = soup.find('div', {'id': 'information'}) + # Parse this information + rows = assessment_information.find_all('div', class_='govuk-summary-list__row') + # Create a dictionary to hold the parsed information + assessment_data = {} + for row in rows: + key = row.find('dt').text.strip() + if key == "Type of assessment": + # We dont reliably extract this + continue + value_tag = row.find('dd') + + # Check if value contains a link (email) + if value_tag.find('a'): + value = value_tag.find('a').text.strip() + elif value_tag.find('summary'): + value = value_tag.find('span').text.strip() + else: + value = value_tag.text.strip() + + # These are keys that we have for both the surveyor and the acreditation scheme. Firstly, we'll + # get the surveyor's name and email so we make that information clear + if key in ["Telephone", "Email"]: + if "Assessor's " + key not in assessment_data: + assessment_data["Assessor's " + key] = value + else: + assessment_data["Accreditation Scheme's " + key] = value + continue + + assessment_data[key] = value + + expected_keys = [ + 'Assessor’s name', + "Assessor's Telephone", + "Assessor's Email", + 'Assessor’s ID', + 'Accreditation scheme', + 'Assessor’s declaration', + "Accreditation Scheme's Telephone", + "Accreditation Scheme's Email", + 'Date of assessment', + 'Date of certificate' + ] + # Check we have all the expected keys + for key in expected_keys: + if key not in assessment_data: + raise ValueError(f"Missing key: {key}") + + # The wall types of the property + property_features_table = soup.find("tbody", class_="govuk-table__body") + property_features_table = property_features_table.find_all("tr") + + # Extract wall types + self.walls = [] + for row in property_features_table: + cells = row.find_all("td") + if row.find("th").text.strip() == "Wall": + self.walls.append(cells[0].text.strip()) + + # Finally, we format the recommendations + recommendations = self.format_recommendations(recommendations, assessment_data, sap_2012_date) + + # 4) Low and zero carbon energy sources + low_carbon_energy_sources = self.extract_low_carbon_sources(soup) + + # 5) Pull out the EPC data + epc_data = self.extract_epc_data(soup) + + resulting_data = { + 'epc_certificate': epc_certificate, + 'current_epc_rating': current_rating.split(' ')[-6], + 'current_epc_efficiency': current_sap, + 'potential_epc_rating': potential_rating.split(' ')[-6], + "potential_epc_efficiency": int(potential_rating.split(' ')[-1]), + "heating_text": heating_text, + "hot_water_text": hot_water_text, + "recommendations": recommendations, + "epc_data": epc_data, + **assessment_data, + **low_carbon_energy_sources, + } + + return resulting_data + + def retrieve_all_find_my_epc_data(self, sap_2012_date=None): + + """ + This is a quick function to retrieve all the data from the find my epc website for a given postcode and address. + Using this to fulfill a short term need to retrieve all history for a property + :param sap_2012_date: + :return: + """ + + postcode_input = self.postcode.replace(" ", "+") + postcode_search = self.SEARCH_POSTCODE_URL.format(postcode_input=postcode_input) + postcode_response = requests.get(postcode_search, headers=self.HEADERS) + + postcode_res = BeautifulSoup(postcode_response.text, features="html.parser") + rows = postcode_res.find_all('tr', class_='govuk-table__row') + + extracted_table = [] + for row in rows: + # Extract the address and URL + address_tag = row.find('a', class_='govuk-link') + if address_tag is None: + continue + extracted_address = None + extracted_address_url = None + if address_tag: + extracted_address = address_tag.text.strip() + extracted_address_url = address_tag['href'] + + extracted_address_cleaned = extracted_address.replace(",", "").replace(" ", "").lower() + if not extracted_address_cleaned.startswith(self.address_cleaned): + continue + + # If the address is a match, we can extract the data + + # Extract the expiry date + expiry_date_tag = row.find('td', class_='govuk-table__cell date') + expiry_date = None + if expiry_date_tag is not None: + expiry_date = expiry_date_tag.parent.find('span').text.strip() + + extracted_table.append( + { + "extracted_address": extracted_address, + "extracted_address_url": extracted_address_url, + "expiry_date": datetime.strptime(expiry_date, '%d %B %Y'), + } + ) + + if not extracted_table: + raise ValueError("No EPC found") + + if len(extracted_table) > 1: + # We take the one with the most recent expiry date + extracted_table = sorted(extracted_table, key=lambda x: x['expiry_date'], reverse=True) + + chosen_epc = self.BASE_ENERGY_URL + extracted_table[0]['extracted_address_url'] + epc_certificate = chosen_epc.split('/')[-1] + + address_response = requests.get(chosen_epc, headers=self.HEADERS) + address_res = BeautifulSoup(address_response.text, features="html.parser") + + # We check the section on "Other cerificates for this property and get the url" + # Find the section for other certificates + other_cert_section = address_res.find('div', id='other_certificates_and_reports') + + # Extract all certificate number rows (anchor tags within a govuk-summary-list) + other_cert_links = other_cert_section.select('dd.govuk-summary-list__value a') + + other_certificates = [] + for link in other_cert_links: + cert_number = link.text.strip() + cert_url = link['href'].strip() + other_certificates.append({ + "certificate_number": cert_number, + "certificate_url": f"https://find-energy-certificate.service.gov.uk{cert_url}" + }) + + # Always include the currently selected EPC first + soup_list = [address_res] + + # Add additional historic certificates + for link in other_cert_links: + cert_url = f"https://find-energy-certificate.service.gov.uk{link['href'].strip()}" + response = requests.get(cert_url, headers=self.HEADERS) + time.sleep(0.3) + soup_list.append(BeautifulSoup(response.text, features="html.parser")) + + all_find_my_epc_data = [] + for soup in soup_list: + # Start with the primary one + all_find_my_epc_data.append(self._extract_epc_from_soup(soup, epc_certificate, sap_2012_date)) + + return all_find_my_epc_data + def retrieve_newest_find_my_epc_data(self, sap_2012_date=None): """ For a post code and address, we pull out all the required data from the find my epc website @@ -195,9 +433,6 @@ class RetrieveFindMyEpc: potential_rating = ratings.split(".")[1] current_sap = int(current_rating.split(' ')[-1]) - # Floor area - address_res.find() - # Retrieve the energy consumption bills = address_res.find('div', {'id': 'bills-affected'}) bills_list = bills.find_all('li') diff --git a/recommendations/Costs.py b/recommendations/Costs.py index ee384b22..0ef37add 100644 --- a/recommendations/Costs.py +++ b/recommendations/Costs.py @@ -194,7 +194,7 @@ class Costs: IWI_CONTINGENCY = 0.2 # For air source heat pumps, we inflate the assume cost by quite a bit to account for design and installation - ASHP_CONTINGENCY = 0.35 + ASHP_CONTINGENCY = 0.25 # Where there is more uncertainty, a higher contingency rate is used HIGH_RISK_CONTINGENCY = 0.2 # When there is less uncertainty, a lower contingency rate is used diff --git a/recommendations/HeatingRecommender.py b/recommendations/HeatingRecommender.py index 576b545b..9d1a094e 100644 --- a/recommendations/HeatingRecommender.py +++ b/recommendations/HeatingRecommender.py @@ -517,17 +517,30 @@ class HeatingRecommender: ] # This is a map from the heating controls description to the description of the air source heat pump set up - ashp_descriptions = { - "Time and temperature zone control": ( - f"Install a {ashp_size}KW air source heat pump, and upgrade heating controls to Smart Thermostats, " - "room sensors and smart radiator valves (time & temperature zone control). Ensure you have an 18 or " - "24 hour tariff" - ), - "Programmer, TRVs and bypass": ( - f"Install a {ashp_size}KW air source heat pump, with programmer, TRVs and a Bypass valve. Ensure you " - "have an 18 or 24 hour tariff" - ), - } + if ashp_size is None: + ashp_descriptions = { + "Time and temperature zone control": ( + f"Install two cascaded air source heat pumps, and upgrade heating controls to Smart Thermostats, " + "room sensors and smart radiator valves (time & temperature zone control). Ensure you have an 18 " + "or " + "24 hour tariff" + ) + } + else: + + ashp_descriptions = { + "Time and temperature zone control": ( + f"Install a {ashp_size}KW air source heat pump, and upgrade heating controls to Smart Thermostats, " + "room sensors and smart radiator valves (time & temperature zone control). Ensure you have an 18 " + "or " + "24 hour tariff" + ), + "Programmer, TRVs and bypass": ( + f"Install a {ashp_size}KW air source heat pump, with programmer, TRVs and a Bypass valve. Ensure " + f"you " + "have an 18 or 24 hour tariff" + ), + } new_heating_description = "Air source heat pump, radiators, electric" new_hot_water_description = "From main system" diff --git a/recommendations/RoofRecommendations.py b/recommendations/RoofRecommendations.py index fa8b831c..31ac2433 100644 --- a/recommendations/RoofRecommendations.py +++ b/recommendations/RoofRecommendations.py @@ -191,11 +191,22 @@ class RoofRecommendations: non_invasive_recommendations = self.property.non_invasive_recommendations + # We check a specific condition - which will imply loft insulation isn't appropriate but room in roof + # insulation is + # 1) We have an uninsulated loft (assumed) + # 2) We have a non-intrusive recommendation for room in roof insulation + + rir_over_loft = ( + self.property.roof["is_pitched"] and + self.property.roof["insulation_thickness"] == "none" and + "room_in_roof_insulation" in [x["type"] for x in non_invasive_recommendations] + ) + # We firstly handle non-intrusive recommendations, which may override the normal roof insulation recommendations if ("loft_insulation" in [x["type"] for x in non_invasive_recommendations]) or ( self.property.roof["is_pitched"] and "loft_insulation" in measures and not self.property.roof["is_at_rafters"] - ): + ) and not rir_over_loft: self.recommend_roof_insulation( u_value=u_value, insulation_thickness=self.insulation_thickness, @@ -223,7 +234,8 @@ class RoofRecommendations: # There are cases where the property might have a room roof as the second roof, but we have a recommendation for # it, so we allow this override if self.property.roof["is_roof_room"] and ("room_roof_insulation" in measures) or ( - "room_roof_insulation" in [x["type"] for x in non_invasive_recommendations] + "room_roof_insulation" in [x["type"] for x in non_invasive_recommendations] or + rir_over_loft ): self.recommend_room_roof_insulation(u_value, phase, default_u_values) return @@ -502,7 +514,7 @@ class RoofRecommendations: # and the cost of the materials rir_non_invasive_recommendation = next( - (x for x in self.property.non_invasive_recommendations if x["type"] == "room_roof_insulation"), {} + (x for x in self.property.non_invasive_recommendations if x["type"] == "room_in_roof_insulation"), {} ) insulation_materials = pd.DataFrame(self.room_roof_insulation_materials) diff --git a/recommendations/VentilationRecommendations.py b/recommendations/VentilationRecommendations.py index 31c4d023..05113acf 100644 --- a/recommendations/VentilationRecommendations.py +++ b/recommendations/VentilationRecommendations.py @@ -31,7 +31,7 @@ class VentilationRecommendations(Definitions): """ self.property.identify_ventilation() - if self.property.has_ventilaion: + if self.property.has_ventilation: return if len(self.materials) != 1: diff --git a/sfr/principal_pitch/0_prepare_sample.py b/sfr/principal_pitch/0_prepare_sample.py index dc1f0a1f..bcab16b9 100644 --- a/sfr/principal_pitch/0_prepare_sample.py +++ b/sfr/principal_pitch/0_prepare_sample.py @@ -4,6 +4,7 @@ data, we know it will work. """ import pandas as pd +from utils.s3 import read_csv_from_s3 birmingham_epcs = pd.read_csv( "/Users/khalimconn-kowlessar/Documents/hestia/sfr/Spring JV/domestic-E08000025-Birmingham/certificates.csv" @@ -17,6 +18,29 @@ birmingham_epcs = birmingham_epcs.sort_values( ascending=[True, False] ).drop_duplicates(subset='UPRN') +birmingham_epcs["postal_region"] = birmingham_epcs["POSTCODE"].str.split(" ").str[0] + +addressable_market = birmingham_epcs[ + (birmingham_epcs['CURRENT_ENERGY_RATING'].isin(['F', 'G', 'E'])) & + (birmingham_epcs['LODGEMENT_DATE'] >= '2020-01-01') & + (birmingham_epcs['PROPERTY_TYPE'].isin(['House', 'Bungalow'])) & + (birmingham_epcs['TENURE'].isin( + ['rental (private)', 'Rented (private)'] + )) + ] + +# We take the Spring portfolio and remove the properties in their sample +asset_list = read_csv_from_s3(bucket_name="retrofit-plan-inputs-dev", filepath='8/206/asset_list.csv') +asset_list = pd.DataFrame(asset_list) +asset_list["postal_region"] = asset_list["postcode"].str.split(" ").str[0] + +addressable_market = addressable_market[ + ~addressable_market["UPRN"].astype(int).astype(str).isin(asset_list["uprn"].values) +] +addressable_market = addressable_market[ + addressable_market["postal_region"].isin(asset_list["postal_region"].unique()) +] + # Take a sample of properties, EPC F or G, EPC lodged in 2025. We focus on houses/bingalows sample = birmingham_epcs[ (birmingham_epcs['CURRENT_ENERGY_RATING'].isin(['F', 'G'])) & diff --git a/sfr/principal_pitch/1_prepare_data.py b/sfr/principal_pitch/1_prepare_data.py index 43002b88..53969ec9 100644 --- a/sfr/principal_pitch/1_prepare_data.py +++ b/sfr/principal_pitch/1_prepare_data.py @@ -16,8 +16,23 @@ EPC_TARGET = "C" # Read the input file properties = pd.read_excel( - "/Users/khalimconn-kowlessar/Documents/hestia/sfr/Spring JV/birmingham_sample.xlsx" + "/Users/khalimconn-kowlessar/Documents/hestia/sfr/Spring JV/Birmingham_price_top300.xlsx" ) +# Keep just the D's and below +properties = properties[properties["current_energy_rating"].isin(["D", "E", "F", "G"])].copy() +# Focus on houses +properties = properties[properties["property_type_std"] != "Flat"] +properties = properties[properties["property_type"] != "flat"] + +# Rename the key columns +properties = properties.rename( + columns={ + "address1": "address", + "number_of_bathrooms": "n_bathrooms", + "num_beds": "n_bedrooms" + } +) +properties["patch"] = True # Pull the non-invasive recommendations asset_list_epc_client = AssetListEpcData( @@ -27,7 +42,39 @@ asset_list_epc_client = AssetListEpcData( asset_list_epc_client.get_data() asset_list_epc_client.get_non_invasive_recommendations() asset_list_epc_client.get_patch() -# TODO; Find some new, on-market opportunities that aren't on the EPC API, so we definitely have a patch + +extracted_df = pd.DataFrame(asset_list_epc_client.extracted_data) +epc_df = pd.DataFrame(asset_list_epc_client.epc_data) + +# Find examples where patches are different to the api +compare_epc = [] +for patch in asset_list_epc_client.patches: + extracted = extracted_df[extracted_df["uprn"] == patch["uprn"]].squeeze() + epc = epc_df[epc_df["uprn"] == patch["uprn"]].squeeze() + compare_epc.append( + { + "uprn": extracted["uprn"], + "address": extracted["address"], + "postcode": extracted["postcode"], + "api_epc": int(extracted["current_epc_efficiency"]), + "fme_epc": int(epc["current-energy-efficiency"]), + } + ) +compare_epc = pd.DataFrame(compare_epc) +diff = compare_epc[compare_epc["api_epc"] != compare_epc["fme_epc"]] +# Compare matched addresses to make sure they are the same +compare_addresses = extracted_df[["address", "postcode", "uprn"]].merge( + epc_df[["uprn", "address1", "postcode"]].rename(columns={"address1": "epc_address1", "postcode": "epc_postcode"}), + how="left", + on=["uprn"] +) + +# Add on uprn +properties = properties.merge( + extracted_df[["address", "postcode", "uprn"]], + how="left", + on=["address", "postcode"] +) # Store the asset list in s3 filename = f"{USER_ID}/{PORTFOLIO_ID}/asset_list.csv" diff --git a/sfr/principal_pitch/2_export_data.py b/sfr/principal_pitch/2_export_data.py index e69de29b..5660b78d 100644 --- a/sfr/principal_pitch/2_export_data.py +++ b/sfr/principal_pitch/2_export_data.py @@ -0,0 +1,224 @@ +""" +This script prepares the data for the financial model +""" + +import pandas as pd +from backend.app.utils import sap_to_epc +from sqlalchemy.orm import sessionmaker +from backend.app.db.connection import db_engine +from backend.app.db.models.recommendations import Recommendation, Plan, PlanRecommendations +from backend.app.db.models.portfolio import PropertyModel, PropertyDetailsEpcModel + +PORTFOLIO_ID = 206 +SCENARIOS = [389] + + +def get_data(portfolio_id, scenario_ids): + session = sessionmaker(bind=db_engine)() + session.begin() + + # Get properties and their details for a specific portfolio + properties_query = session.query( + PropertyModel, + PropertyDetailsEpcModel + ).join( + PropertyDetailsEpcModel, PropertyModel.id == PropertyDetailsEpcModel.property_id + ).filter( + PropertyModel.portfolio_id == portfolio_id # Filter by portfolio ID + ).all() + + # Transform properties data to include all fields dynamically + properties_data = [ + {**{col.name: getattr(prop.PropertyModel, col.name) for col in PropertyModel.__table__.columns}, + **{col.name: getattr(prop.PropertyDetailsEpcModel, col.name) for col in + PropertyDetailsEpcModel.__table__.columns}} + for prop in properties_query + ] + + # Get property IDs from fetched properties + + # Get plans linked to the fetched properties + plans_query = session.query(Plan).filter(Plan.scenario_id.in_(scenario_ids)).all() + + # Transform plans data to include all fields dynamically + plans_data = [ + {col.name: getattr(plan, col.name) for col in Plan.__table__.columns} + for plan in plans_query + ] + + # Extract plan IDs for filtering recommendations through PlanRecommendations + plan_ids = [plan['id'] for plan in plans_data] + + # Get recommendations through PlanRecommendations for those plans and that are default + recommendations_query = session.query( + Recommendation, + Plan.scenario_id + ).join( + PlanRecommendations, Recommendation.id == PlanRecommendations.recommendation_id + ).join( + Plan, Plan.id == PlanRecommendations.plan_id # Join with Plan to access scenario_id + ).filter( + PlanRecommendations.plan_id.in_(plan_ids), + Recommendation.default == True # Filtering for default recommendations + ).all() + + # Transform recommendations data to include all fields dynamically and include scenario_id + recommendations_data = [ + {**{col.name: getattr(rec.Recommendation, col.name) if hasattr(rec, 'Recommendation') else getattr(rec, + col.name) for + col in Recommendation.__table__.columns}, + "Scenario ID": rec.scenario_id} + for rec in recommendations_query + ] + + session.close() + + return properties_data, plans_data, recommendations_data + + +properties_data, plans_data, recommendations_data = get_data(portfolio_id=PORTFOLIO_ID, scenario_ids=SCENARIOS) + +properties_df = pd.DataFrame(properties_data) +plans_df = pd.DataFrame(plans_data) +recommendations_df = pd.DataFrame(recommendations_data) + +recommended_measures_df = recommendations_df[ + ["property_id", "measure_type", "estimated_cost", "default"] +] +recommended_measures_df = recommended_measures_df[recommended_measures_df["default"]] +recommended_measures_df = recommended_measures_df.drop(columns=["default"]) + +post_install_sap = recommendations_df[["property_id", "default", "sap_points"]] +post_install_sap = post_install_sap[post_install_sap["default"]] +# Sum up the sap points by property id +post_install_sap = post_install_sap.groupby("property_id")[["sap_points"]].sum().reset_index() + +recommendations_measures_pivot = recommended_measures_df.pivot( + index='property_id', + columns='measure_type', + values='estimated_cost' +) +recommendations_measures_pivot = recommendations_measures_pivot.reset_index() + +# Total cost is the row sum, excluding the property_id column +recommendations_measures_pivot["total_retrofit_cost"] = recommendations_measures_pivot.drop( + columns=["property_id"] +).sum(axis=1) + +df = properties_df[ + [ + "property_id", "uprn", "address", "postcode", "property_type", "walls", "roof", "heating", "windows", + "current_epc_rating", + "current_sap_points", "total_floor_area", "number_of_rooms", + ] +].merge( + recommendations_measures_pivot, how="left", on="property_id" +).merge( + post_install_sap, how="left", on="property_id" +) + +df = df.drop(columns=["property_id"]) +df["sap_points"] = df["sap_points"].fillna(0) + +df["predicted_post_works_sap"] = df["current_sap_points"] + df["sap_points"] +df["predicted_post_works_sap"] = df["predicted_post_works_sap"].round() +df["predicted_post_works_epc"] = df["predicted_post_works_sap"].apply(lambda x: sap_to_epc(x)) + +# We merge this back to the main dataframe, which will contain the bathrooms +from utils.s3 import read_csv_from_s3 + +asset_list = read_csv_from_s3(bucket_name="retrofit-plan-inputs-dev", filepath='8/206/asset_list.csv') +asset_list = pd.DataFrame(asset_list) +df["uprn"] = df["uprn"].astype(str) +asset_list = asset_list.merge( + df.drop(columns=["address", "postcode", "property_type", "total_floor_area"]), + how="left", + on="uprn" +) + +condition_costs = pd.read_excel( + "/Users/khalimconn-kowlessar/Documents/hestia/sfr/Spring JV/Condition costs.xlsx", + sheet_name="Prices - Khalim", + header=35 +) +# Remove unnamed columns and reset index +condition_costs = condition_costs.loc[:, ~condition_costs.columns.str.contains('^Unnamed')] +condition_costs = condition_costs.reset_index(drop=True) + + +# We now estimate condition cost +def simulate_condition(asset_list, condition_costs): + """ + This function is for testing, and will simulate condition cost from 1-10 for each property to see what the + costing array looks like. + :param df: + :return: + """ + + condition_df = [] + for _, row in asset_list.iterrows(): + + n_bathrooms = row["bathrooms"] + + conditions = {} + for condition in reversed(range(1, 11)): + condition_cost = condition_costs[ + condition_costs["Condition"] == condition + ].drop(columns=["Condition"]).iloc[0] + + # Each cost is scaled by floor area + condition_cost = condition_cost * row["total_floor_area"] + condition_cost["Bathroom"] = condition_cost["Bathroom"] * n_bathrooms + + total_condition_cost = condition_cost.sum() + conditions["Condition " + str(condition)] = (total_condition_cost) + + condition_df.append( + { + "uprn": row["uprn"], + **conditions + } + ) + + condition_df = pd.DataFrame(condition_df) + + asset_list = asset_list.merge( + condition_df, + how="left", + on="uprn" + ) + + return asset_list + + +# asset_list = simulate_condition(asset_list, condition_costs) + +# We calculate the condition cost based on the condition +for _, row in asset_list.iterrows(): + + condition = row["condition_score"] + if condition in [None, ""]: + continue + condition = int(float(condition)) + + condition_cost = condition_costs[ + condition_costs["Condition"] == condition + ].drop(columns=["Condition"]).iloc[0] + + # Each cost is scaled by floor area + condition_cost = condition_cost * float(row["total_floor_area"]) + n_bathrooms = row["n_bathrooms"] + condition_cost["Bathroom"] = condition_cost["Bathroom"] * float(n_bathrooms) + + total_condition_cost = condition_cost.sum() + asset_list.loc[asset_list["uprn"] == row["uprn"], "domna_condition_cost"] = total_condition_cost + +# Store output +asset_list.to_excel( + "/Users/khalimconn-kowlessar/Documents/hestia/sfr/Spring JV/20250624_portfolio_retrofit_packages.xlsx", + index=False +) + +condition_cost_comparison = asset_list[ + ["condition_score", "decoration_sum_min ", "decoration_sum_max", "domna_condition_cost"] +]