diff --git a/backend/ml_models/Valuation.py b/backend/ml_models/Valuation.py index 720005d3..6d4852b2 100644 --- a/backend/ml_models/Valuation.py +++ b/backend/ml_models/Valuation.py @@ -1,5 +1,4 @@ import numpy as np -from scipy.constants import value class PropertyValuation: @@ -216,6 +215,30 @@ class PropertyValuation: cls.UPRN_VALUE_LOOKUP.get(property_instance.uprn) ) + current_epc = property_instance.data["current-energy-rating"] + + if not current_value: + return { + "current_value": 0, + "lower_bound_increased_value": 0, + "upper_bound_increased_value": 0, + "average_increased_value": 0, + "average_increase": 0 + } + + return cls.estimate_valuation_improvement(current_value, current_epc, target_epc, total_cost) + + @classmethod + def estimate_valuation_improvement(cls, current_value, current_epc, target_epc, total_cost=None): + """ + This function estimates the value of a property based on the current EPC rating and the target EPC rating + :param current_value: + :param current_epc: + :param target_epc: + :param total_cost: + :return: + """ + if not current_value: return { "current_value": 0, @@ -225,7 +248,6 @@ class PropertyValuation: "average_increase": 0 } - current_epc = property_instance.data["current-energy-rating"] # We get the spectrum of ratings between the current and target EPC epc_band_range = cls.EPC_BANDS[cls.EPC_BANDS.index(current_epc): cls.EPC_BANDS.index(target_epc) + 1] diff --git a/etl/route_march_data_pull/app.py b/etl/route_march_data_pull/app.py index f9cb7cbb..ee6a46d3 100644 --- a/etl/route_march_data_pull/app.py +++ b/etl/route_march_data_pull/app.py @@ -24,21 +24,24 @@ load_dotenv(dotenv_path="backend/.env") EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN") -def get_data(asset_list, fulladdress_column, address1_column, postcode_column, manual_uprn_map, epc_api_only=False): +def get_data( + asset_list, fulladdress_column, address1_column, postcode_column, manual_uprn_map, uprn_column=None, + epc_api_only=False +): epc_data = [] errors = [] no_epc = [] for _, home in tqdm(asset_list.iterrows(), total=len(asset_list)): try: postcode = home[postcode_column] - house_number = home[address1_column].strip() + house_number = str(home[address1_column]).strip() full_address = home[fulladdress_column].strip() house_no = SearchEpc.get_house_number(address=str(house_number), postcode=postcode) if house_no is None: house_no = house_number uprn = manual_uprn_map.get(full_address, None) - if uprn is None and home.get("uprn"): - uprn = home["uprn"] + if uprn is None and home.get(uprn_column): + uprn = home[uprn_column] if pd.isnull(uprn): uprn = None @@ -149,7 +152,7 @@ def get_data(asset_list, fulladdress_column, address1_column, postcode_column, m return epc_data, errors, no_epc -def extract_address1(asset_list, full_address_col, method="first_two_words"): +def extract_address1(asset_list, full_address_col, postcode_col, method="first_two_words"): if method == "first_two_words": asset_list["address1_extracted"] = asset_list[full_address_col].str.split(" ").str[:2].str.join(" ") return asset_list @@ -158,6 +161,13 @@ def extract_address1(asset_list, full_address_col, method="first_two_words"): asset_list["address1_extracted"] = asset_list[full_address_col].str.split(" ").str[0] return asset_list + if method == "house_number_extraction": + asset_list["address1_extracted"] = asset_list.apply( + lambda x: SearchEpc.get_house_number(address=x[full_address_col], postcode=x[postcode_col]), + axis=1 + ) + return asset_list + raise ValueError(f"Method {method} not recognized") @@ -258,16 +268,29 @@ def app(): # - We want: fully insulated property (all wall types), EPC D or below (floors should be solid) # - Or the insulation required is loft/cavity (floors should be solid) - DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Community Housing" - DATA_FILENAME = "Community Housing PV data pull.xlsx" - SHEET_NAME = "Community Housing" - POSTCODE_COLUMN = "Postcode" + # For Westward + # DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Westward" + # DATA_FILENAME = "WESTWARD - completed list..xlsx" + # SHEET_NAME = "Sheet1" + # POSTCODE_COLUMN = "WFT EDIT Postcode" + # FULLADDRESS_COLUMN = "Address" + # ADDRESS1_COLUMN = None + # ADDRESS1_METHOD = "house_number_extraction" + # ADDRESS_COLS_TO_CONCAT = [] + # MISSING_POSTCODES_METHOD = None + # PROPERTY_YEAR_BUILT = "Build date" + # UPRN_COLUMN = "UPRN" + DATA_FOLDER = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Colchester" + DATA_FILENAME = "Warmfront data- Colchester Borough Homes (Complete).xlsx" + SHEET_NAME = "Sheet1" + POSTCODE_COLUMN = 'Full Address.1' FULLADDRESS_COLUMN = "Full Address" ADDRESS1_COLUMN = None ADDRESS1_METHOD = "first_word" ADDRESS_COLS_TO_CONCAT = [] MISSING_POSTCODES_METHOD = None - PROPERTY_YEAR_BUILT = "Build_Date" + PROPERTY_YEAR_BUILT = "Build Date" + UPRN_COLUMN = None # Maps addresses to uprn in problematic cases MANUAL_UPRN_MAP = {} @@ -299,7 +322,10 @@ def app(): if ADDRESS1_COLUMN is None: ADDRESS1_COLUMN = "address1_extracted" asset_list = extract_address1( - asset_list=asset_list, full_address_col=FULLADDRESS_COLUMN, method=ADDRESS1_METHOD + asset_list=asset_list, + full_address_col=FULLADDRESS_COLUMN, + postcode_col=POSTCODE_COLUMN, + method=ADDRESS1_METHOD ) if FULLADDRESS_COLUMN is None: @@ -315,6 +341,23 @@ def app(): asset_list[FULLADDRESS_COLUMN] = asset_list[FULLADDRESS_COLUMN].str.replace('\xa0', ' ', regex=False) asset_list[FULLADDRESS_COLUMN] = asset_list[FULLADDRESS_COLUMN].str.replace(' ', ' ', regex=False) + if UPRN_COLUMN is not None: + # Check if it's numeric and if so, make sure it's an integer + def convert_uprn(x): + + if pd.isnull(x): + return x + + # check if numeric + if np.isreal(x): + return str(int(x)) + + if str(x).isdigit(): + return str(int(x)) + return x + + asset_list[UPRN_COLUMN] = asset_list[UPRN_COLUMN].apply(convert_uprn) + # We check for duplicated addresses asset_list["deduper"] = asset_list[FULLADDRESS_COLUMN] + asset_list[POSTCODE_COLUMN] if asset_list["deduper"].duplicated().sum(): @@ -342,7 +385,8 @@ def app(): fulladdress_column=FULLADDRESS_COLUMN, address1_column=ADDRESS1_COLUMN, postcode_column=POSTCODE_COLUMN, - manual_uprn_map=MANUAL_UPRN_MAP + manual_uprn_map=MANUAL_UPRN_MAP, + uprn_column=UPRN_COLUMN ) # We now retrieve any failed properties @@ -535,6 +579,7 @@ def app(): # 3) If we have year in the asset list, we flag entries where the built year is different from the # EPC Age band if PROPERTY_YEAR_BUILT is not None: + raise Exception("THIS WAS WRONG!") asset_list["Does Age Match EPC Age Band?"] = asset_list.apply( lambda x: process_age_band(x, PROPERTY_YEAR_BUILT), axis=1 ) diff --git a/survey_report/app.py b/survey_report/app.py index 774d2a15..f6eddb8d 100644 --- a/survey_report/app.py +++ b/survey_report/app.py @@ -32,6 +32,15 @@ def generate_html_report(template_path, output_path, data): print(f"HTML report generated successfully: {output_path}") +def stringify_number(num: int, rounding: bool = True) -> str: + if num < 100000: # 5 figures or fewer + rounded_num = ((num + 99) // 100) * 100 if rounding else num + return f"{rounded_num:,}" + else: # More than 5 figures + rounded_num = ((num + 999) // 1000) * 1000 if rounding else num + return f"{rounded_num // 1000}k" + + class PlacidApi: # Errors as defined by docs: https://placid.app/docs/2.0/rest/errors ERROR_CODES = { @@ -89,7 +98,8 @@ class PlacidApi: ) response_body = response.json() - pdf_id = response_body["id"] + + return response_body def get_pdf(self, pdf_id: str): """ @@ -106,20 +116,22 @@ class PlacidApi: url = response_body["pdf_url"] # Download the PDF form this uurl pdf_download = requests.get(url) - with open("output.pdf", "wb") as f: + with open("survey_report/example_data/output.pdf", "wb") as f: f.write(pdf_download.content) -def handle(): +def handler(): """ Performs the data extraction process for the survey report :return: """ PLACID_API_KEY = "placid-mpkwidzer2mens9h-hifa3dmbxpfeghpa" - TEMPLATE_UUID = "hnwqgtumckfbf" + TEMPLATE_UUID = "5bst9mh1q9lk9" placid_api = PlacidApi(PLACID_API_KEY) + current_property_value = 250000 # Needs to be an input + EPC_COLOURS = { "A": "#117d58", "B": "#2da55c", @@ -136,26 +148,27 @@ def handle(): "WILLIS ROAD FLAT 1 PRE EPR SITE NOTES.pdf", "epr": "/Users/khalimconn-kowlessar/Documents/hestia/Model/survey_report/example_data/Flat 1/3 WILLIS " "ROAD FLAT 1 PRE EPR PDF.pdf", - "scenario_epr": "/Users/khalimconn-kowlessar/Documents/hestia/Model/survey_report/example_data/Flat 1/3 " - "WILLIS ROAD FLAT 1 POST EPR PDF.pdf" + "scenario_site_notes": "/Users/khalimconn-kowlessar/Documents/hestia/Model/survey_report/example_data" + "/Flat 1/3 WILLIS ROAD FLAT 1 POST EPR SITE NOTES.pdf" }, { "site_notes": "/Users/khalimconn-kowlessar/Documents/hestia/Model/survey_report/example_data/Flat 2/3 " "WILLIS ROAD FLAT 2 PRE EPR SITE NOTES.pdf", "epr": "/Users/khalimconn-kowlessar/Documents/hestia/Model/survey_report/example_data/Flat 2/3 WILLIS " "ROAD FLAT 2 PRE EPR PDF.pdf", - "scenario_epr": "/Users/khalimconn-kowlessar/Documents/hestia/Model/survey_report/example_data/Flat 2/3 " - "WILLIS ROAD FLAT 2 POST EPR PDF.pdf" + "scenario_site_notes": "/Users/khalimconn-kowlessar/Documents/hestia/Model/survey_report/example_data" + "/Flat 2/3 WILLIS ROAD FLAT 2 POST EPR SITE NOTES.pdf" }, { "site_notes": "/Users/khalimconn-kowlessar/Documents/hestia/Model/survey_report/example_data/Flat 3/3 " "WILLIS ROAD FLAT 3 PRE EPR SITE NOTES.pdf", "epr": "/Users/khalimconn-kowlessar/Documents/hestia/Model/survey_report/example_data/Flat 3/3 WILLIS " "ROAD FLAT 3 PRE EPR PDF.pdf", - "scenario_epr": "/Users/khalimconn-kowlessar/Documents/hestia/Model/survey_report/example_data/Flat 3/3 " - "WILLIS ROAD FLAT 3 POST EPR PDF.pdf" + "scenario_site_notes": "/Users/khalimconn-kowlessar/Documents/hestia/Model/survey_report/example_data" + "/Flat 3/3 WILLIS ROAD FLAT 3 POST EPR SITE NOTES.pdf" }, ] + data = [] for data_config in folders: @@ -181,26 +194,61 @@ def handle(): epr_extractor = EPRExtractor(file_mapping["epr"]) epr = epr_extractor.extract_all() - scenario_epr = EPRExtractor(file_mapping["scenario_epr"]) - scenario_epr = scenario_epr.extract_all() + # Valuation simulation + scenario_site_notes_extractor = SiteNotesExtractor(file_mapping["scenario_site_notes"]) + scenario_site_notes = scenario_site_notes_extractor.extract_all() + + from backend.ml_models.Valuation import PropertyValuation + valuation_uplift = PropertyValuation.estimate_valuation_improvement( + current_value=current_property_value, + current_epc=site_notes["Current EPC Band"], + target_epc=scenario_site_notes["Current EPC Band"], + ) + # TODO - should convert this, when it's more than 5 figures and we should certainly stringify this + + valuation_difference = round(valuation_uplift["average_increased_value"] - current_property_value) + + # Prepare the data for output + bill_savings = round( + site_notes['Estimated Annual Energy Cost (£)'] - scenario_site_notes['Estimated Annual Energy Cost (£)'] + ) + + carbon_savings = round( + site_notes["Current Carbon Emissions (TCO2)"] - scenario_site_notes["Current Carbon Emissions (TCO2)"], + 2 + ) + + payback_period = None + if payback_period is None: + raise NotImplementedError("Implement me") + + # We extract the measures from the site notes report_data = { - "template_uuid": TEMPLATE_UUID, "current_epc_rating": site_notes["Current EPC Band"], "current_epc_rating_colour": EPC_COLOURS[site_notes["Current EPC Band"]], - post_retrofit_epc_rating: str, - post_retrofit_epc_rating_colour: str, + "post_retrofit_epc_rating": scenario_site_notes["Current EPC Band"], + "post_retrofit_epc_rating_colour": EPC_COLOURS[scenario_site_notes["Current EPC Band"]], + "bill_savings": stringify_number(bill_savings), + "valuation_improvement": stringify_number(valuation_difference), + "carbon_savings": carbon_savings, + } # We now produce the combined data sheet which is the starting figure: - data_sheet = {**epr, **site_notes} - del data_sheet['Building Dimensions'] - # We unnest the Total Building Dimensions - data_sheet["Total Building Floor Area (m2)"] = data_sheet["Total Building Dimensions"]["floor_area"] - data_sheet["Total Building Heat Loss Area (m2)"] = data_sheet["Total Building Dimensions"]["heat_loss_area"] - del data_sheet["Total Building Dimensions"] + # data_sheet = {**epr, **site_notes} + # del data_sheet['Building Dimensions'] + # # We unnest the Total Building Dimensions + # data_sheet["Total Building Floor Area (m2)"] = data_sheet["Total Building Dimensions"]["floor_area"] + # data_sheet["Total Building Heat Loss Area (m2)"] = data_sheet["Total Building Dimensions"]["heat_loss_area"] + # del data_sheet["Total Building Dimensions"] - data.append(data_sheet) + create_pdf_response = placid_api.create_pdf( + template_uuid=TEMPLATE_UUID, **report_data + ) + # {'id': 769832, 'type': 'pdf', 'status': 'queued', 'pdf_url': None, 'transfer_url': None, 'passthrough': None} + # Download locally + placid_api.get_pdf(create_pdf_response["id"]) data = pd.DataFrame(data) diff --git a/survey_report/extraction/quidos.py b/survey_report/extraction/quidos.py index 374df084..2e772886 100644 --- a/survey_report/extraction/quidos.py +++ b/survey_report/extraction/quidos.py @@ -108,8 +108,98 @@ class SiteNotesExtractor: self.extract_carbon_emissions() self.extract_bills_estimate() self.extract_building_dimensions() + + # Extract specific measures + # Primary wall + # Secondary wall + # Roof + # Floor + # Heating system + # Hot water system + # Windows + # Doors + # Lighting + # Ventilation + # Solar + return self.data + def extract_walls(self): + """ + Extracts wall type, insulation, dry-lining, and thickness for each building part, + including any alternative wall details within the 7.0 Walls section of the summary PDF text. + """ + + text = self.text + wall_data = [] + + # Isolate the 7.0 Walls section + wall_section_match = re.search(r"7\.0 Walls\n(.*?)\n8\.0 Roofs", text, re.DOTALL) + if not wall_section_match: + raise ValueError("Failed to locate the walls section in the text.") + + wall_section = wall_section_match.group(1) + + # Define patterns to match walls for each building part + wall_pattern = re.compile( + r"(?P
Main Property(?: Alternative)?|Extension \d+)\s*\n" + r"(?:Construction\s*(?P[^\n]*)\n)?" + r"(?:Insulation\s*(?P[^\n]*)\n)?" + r"(?:Insulation Thickness\(mm\)\s*(?P[^\n]*)\n)?" + r"(?:Wall Thickness Measured\?\s*(?P[^\n]*)\n)?" + r"(?:Wall Thickness\(mm\)\s*(?P\d+))?", + re.MULTILINE + ) + + # TODO: We aren't effectively picking up alternative walls + # alt_wall_pattern = re.compile( + # r"Alternative Wall Sheltered\s*.*?\n" + # r".*?Construction\s*(?P[^\n]*)\n" + # r"Insulation\s*(?P[^\n]*)\n" + # r"Insulation Thickness\(mm\)\s*(?P[^\n]*)\n" + # r"Wall Thickness Measured\?\s*(?P[^\n]*)\n" + # r"Wall Thickness\(mm\)\s*(?P\d+)?", + # re.MULTILINE + # ) + + for match in wall_pattern.finditer(wall_section): + building_part = match.group("section") + # has_alternative_wall = "Alternative" in building_part + building_part = "Main Property" if "Main Property" in building_part else building_part + + wall_entry = { + "Building Part": building_part, + "Wall Type": match.group("construction") or "Unknown", + "Wall Insulation": match.group("insulation") or "Unknown", + "Insulation Thickness (mm)": match.group("insulation_thickness") or "Unknown", + "Wall Thickness Measured": match.group("thickness_measured") or "Unknown", + "Wall Thickness (mm)": int(match.group("thickness")) if match.group("thickness") and match.group( + "thickness").isdigit() else None, + "Alternative Wall Type": None, + "Alternative Wall Insulation": None, + "Alternative Insulation Thickness (mm)": None, + "Alternative Wall Thickness Measured": None, + "Alternative Wall Thickness (mm)": None, + } + + # Check if an alternative wall section exists + # if has_alternative_wall: + # alt_match = alt_wall_pattern.search(wall_section, match.end()) + # if alt_match: + # wall_entry["Alternative Wall Type"] = alt_match.group("alt_construction") or "Unknown" + # wall_entry["Alternative Wall Insulation"] = alt_match.group("alt_insulation") or "Unknown" + # wall_entry["Alternative Insulation Thickness (mm)"] = alt_match.group( + # "alt_insulation_thickness") or "Unknown" + # wall_entry["Alternative Wall Thickness Measured"] = alt_match.group( + # "alt_thickness_measured") or "Unknown" + # wall_entry["Alternative Wall Thickness (mm)"] = int( + # alt_match.group("alt_thickness")) if alt_match.group("alt_thickness") and alt_match.group( + # "alt_thickness").isdigit() else None + + wall_data.append(wall_entry) + + return wall_data + class EPRExtractor: """ @@ -123,7 +213,7 @@ class EPRExtractor: self.text = pdf_text self.data = {} - def extract_heating_data(self): + def extract_heating_consumption(self): """ Extracts space heating and water heating values from the report. """ @@ -162,5 +252,5 @@ class EPRExtractor: Runs all extraction methods and returns a dictionary with extracted data. """ self.extract_address() - self.extract_heating_data() + self.extract_heating_consumption() return self.data