From b8622457bd3dcd31cfb29f34b99400e823ddd6c6 Mon Sep 17 00:00:00 2001 From: Michael Duong Date: Fri, 26 Apr 2024 15:34:36 +0100 Subject: [PATCH 1/4] start work on pulling out all recommendations in relation to properties --- etl/epc_recommendations/Pipeline.py | 327 +++++++++++++++++++++++ etl/epc_recommendations/requirements.txt | 4 + 2 files changed, 331 insertions(+) create mode 100644 etl/epc_recommendations/Pipeline.py create mode 100644 etl/epc_recommendations/requirements.txt diff --git a/etl/epc_recommendations/Pipeline.py b/etl/epc_recommendations/Pipeline.py new file mode 100644 index 00000000..a6de78e5 --- /dev/null +++ b/etl/epc_recommendations/Pipeline.py @@ -0,0 +1,327 @@ +# Pipeline to combined recommendations and certificates data together + +import pandas as pd +from pathlib import Path +from tqdm import tqdm +import multiprocessing as mp +import itertools +import requests +from bs4 import BeautifulSoup +import time + +DATA_DIRECTORY = ( + Path(__file__).parent.parent / "epc" / "local_data" / "all-domestic-certificates" +) +directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()] +# Start with one folder in the local_data directory + + +class EPCRecommendationsPipeline: + + SEARCH_POSTCODE_URL = "https://find-energy-certificate.service.gov.uk/find-a-certificate/search-by-postcode?postcode={postcode_input}" + BASE_ENERGY_URL = "https://find-energy-certificate.service.gov.uk" + HEADERS = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36" + } + + def __init__(self, directories: list, use_parallel: bool = True): + self.directories = directories + self.use_parallel = use_parallel + + def determine_number_of_improvement_ids(self): + with mp.Pool() as pool: + results = list( + tqdm( + pool.imap(self._task_check_number_of_improvement_ids, directories), + total=len(directories), + ), + ) + + results = list(itertools.chain(*results)) + + self.number_improvement_ids = set(results) + + def extract_improvement_description(self): + with mp.Pool() as pool: + results = list( + tqdm( + pool.imap(self._task_extract_improvement_description, directories), + total=len(directories), + ), + ) + + results = pd.concat(results) + self.improvement_description_df = results.groupby("IMPROVEMENT_ID").sample(1) + + # improvement_description = self._get_descriptions_of_improvements( + # improvement_description_df + # ) + + # self.improvement_descriptions = improvement_description + + def _task_check_number_of_improvement_ids(self, directory: Path): + """ + Parallel task for checking the number of improvement ids + """ + + recommendations_filepath = directory / "recommendations.csv" + recommendations_df = pd.read_csv(recommendations_filepath) + + recommendations_df = recommendations_df[ + recommendations_df["IMPROVEMENT_ID"].notnull() + ] + recommendations_df["IMPROVEMENT_ID"] = recommendations_df[ + "IMPROVEMENT_ID" + ].astype(int) + + output = list(recommendations_df["IMPROVEMENT_ID"].unique()) + + return output + + def _task_extract_improvement_description(self, directory: Path) -> pd.DataFrame: + """ + Parallel task for checking the number of improvement ids + Flow will be get the certificates, + Find the latest EPC certificate for the UPRN, + Load the recommendations, + Merge on the LMK_KEY, + """ + + recommendations_filepath = directory / "recommendations.csv" + recommendations_df = pd.read_csv(recommendations_filepath) + + recommendations_df = recommendations_df[ + recommendations_df["IMPROVEMENT_ID"].notnull() + ] + recommendations_df["IMPROVEMENT_ID"] = recommendations_df[ + "IMPROVEMENT_ID" + ].astype(int) + + recommendations_df = recommendations_df[ + ~recommendations_df["IMPROVEMENT_SUMMARY_TEXT"].isnull() + ] + + recommendations_df = ( + recommendations_df.sort_values("IMPROVEMENT_ID") + .groupby("IMPROVEMENT_ID") + .head(1) + ) + + return recommendations_df + + def _task_extract_full_improvement_dataset(self, directory: Path) -> pd.DataFrame: + """ + Parallel task for checking the number of improvement ids + Flow will be get the certificates, + Find the latest EPC certificate for the UPRN, + Load the recommendations, + Merge on the LMK_KEY, + """ + + certificates_filepath = directory / "certificates.csv" + certificates_df = pd.read_csv(certificates_filepath) + + certificates_df = ( + certificates_df.sort_values("LODGEMENT_DATE", ascending=False) + .groupby("UPRN") + .head(1) + .reset_index(drop=True) + ) + + recommendations_filepath = directory / "recommendations.csv" + recommendations_df = pd.read_csv(recommendations_filepath) + + recommendations_df = recommendations_df[ + recommendations_df["IMPROVEMENT_ID"].notnull() + ] + recommendations_df["IMPROVEMENT_ID"] = recommendations_df[ + "IMPROVEMENT_ID" + ].astype(int) + + # sampled_df = recommendations_df.groupby("IMPROVEMENT_ID").sample(1) + + output = certificates_df.merge(recommendations_df, on="LMK_KEY", how="inner") + + return output + + def _get_descriptions_of_improvements( + self, improvement_description_df: pd.DataFrame + ) -> dict[int, str]: + """ + For each row of the improvement descriptions, get the description of the improvement via web scraping + """ + + improvement_description_mapping = {} + + for row in improvement_description_df.itertuples(): + # time.sleep(1) + postcode = row.POSTCODE + postcode_input = postcode.replace(" ", "+") + postcode_search = self.SEARCH_POSTCODE_URL.format( + postcode_input=postcode_input + ) + postcode_response = requests.get(postcode_search, headers=self.HEADERS) + + postcode_res = BeautifulSoup(postcode_response.text, features="html.parser") + address_links_full = postcode_res.findAll( + "a", {"class": "govuk-link", "rel": "nofollow"} + ) + address_links = { + element.text.lstrip().rstrip(): self.BASE_ENERGY_URL + element["href"] + for element in address_links_full + } + + address_links = {k.replace(",", ""): v for k, v in address_links.items()} + + adjusted_address = row.ADDRESS1.replace(",", "") + + address_link = [ + (k, v) for k, v in address_links.items() if adjusted_address in k + ] + + if len(address_link) == 0: + raise ValueError("Address not found") + + if len(address_link) > 1: + split_address_components = adjusted_address.split(" ") + for address in address_link: + if split_address_components[0] in address[0].split(" "): + chosen_epc = address[1] + break + raise ValueError("Multiple addresses found") + else: + chosen_epc = address_link[0][1] + + # time.sleep(1) + address_response = requests.get(chosen_epc, headers=self.HEADERS) + address_res = BeautifulSoup(address_response.text, features="html.parser") + + # epc_certificate = chosen_epc.split('/')[-1] + + # ratings = address_res.find("desc", {"id": "svg-desc"}).text + # current_rating = ratings.split(".")[0] + # potential_rating = ratings.split(".")[1] + + # new_property_df = pd.DataFrame( + # { + # "address": [address_link[0][0]], + # "epc_certificate": [epc_certificate], + # "current_epc_rating": [current_rating.split(" ")[-6]], + # "current_epc_efficiency": [current_rating.split(" ")[-1]], + # "potential_epc_rating": [potential_rating.split(" ")[-6]], + # "potential_epc_efficiency": [potential_rating.split(" ")[-1]], + # "LMK_KEY": [row.LMK_KEY], + # } + # ) + + improvements = address_res.find( + "div", + {"class": "govuk-body printable-area epb-recommended-improvements"}, + ) + + changes = improvements.find_all("h3") + changes_impact = improvements.find_all( + "dl", {"class": "govuk-summary-list"} + ) + element = list(zip(changes, changes_impact))[row.IMPROVEMENT_ITEM - 1] + + improvement_header = element[0].text + + col_name = improvement_header.split(":")[1].lstrip().rstrip() + # cost = element[1].find('dd', {"class": "govuk-summary-list__value"}).text.lstrip().rstrip() + + improvement_description_mapping[row.IMPROVEMENT_ID] = col_name + + +# headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36'} +# postcode_input = postcode_input.replace(" ", "+") +# postcode_search = SEARCH_POSTCODE_URL.format(postcode_input=postcode_input) +# postcode_response = requests.get(postcode_search, headers=headers) + +# postcode_res = BeautifulSoup(postcode_response.text) +# address_links_full = postcode_res.findAll('a', {'class': 'govuk-link', 'rel': 'nofollow'}) +# address_links = {element.text.lstrip().rstrip(): BASE_ENERGY_URL + element['href'] for element in address_links_full} +# address_input = st.selectbox('Please select an address:', address_links.keys()) + +# if address_input is None: +# st.stop() + +# chosen_epc = address_links[address_input] + +# st.write("### The EPC Certificate of this property is:") +# epc_certificate = chosen_epc.split('/')[-1] +# st.write("##### " + epc_certificate) + +# address_response = requests.get(chosen_epc, headers=headers) +# address_res = BeautifulSoup(address_response.text) + +# svg = address_res.find("svg", {'class': 'epc-energy-rating-graph'}) +# render_svg(svg) + +# st.write("## Energy rating - current and potential") +# # st.write(address_res.find('desc', {'id': 'svg-desc'}).text) +# # st.image(address_res.find_all('svg', {'class': 'epc-energy-rating-graph'})[0]) +# ratings = address_res.find('desc', {'id': 'svg-desc'}).text + +# st.write('### Current EPC rating') +# current_rating = ratings.split(".")[0] +# st.write("##### " + current_rating) + +# st.write('### Potential EPC rating') +# potential_rating = ratings.split(".")[1] +# st.write("##### " + potential_rating) + +# new_property_df = pd.DataFrame( +# {'address': [address_input], +# 'epc_certificate': [epc_certificate], +# 'current_epc_rating': [current_rating.split(' ')[-6]], +# 'current_epc_efficiency': [current_rating.split(' ')[-1]], +# 'potential_epc_rating': [potential_rating.split(' ')[-6]], +# "potential_epc_efficiency": [potential_rating.split(' ')[-1]]} +# ) + +# st.write('### Changes that can be made:') +# improvements = address_res.find('div', {"class": "govuk-body printable-area epb-recommended-improvements"}) + +# if improvements is None: +# st.write("No changes suggested") +# else: +# changes = improvements.find_all('h3') +# changes_impact = improvements.find_all('dl', {"class": 'govuk-summary-list'}) + +# for element in zip(changes, changes_impact): +# improvement_header = element[0].text +# st.write("#### " + improvement_header) + +# improvement_text = element[1].text +# st.write(improvement_text) + +# col_name = improvement_header.split(":")[1] +# cost = element[1].find('dd', {"class": "govuk-summary-list__value"}).text.lstrip().rstrip() + +# impact = element[1].find('text', {"class": "govuk-!-font-weight-bold"}).text.split(" ") +# impact_num = impact[0] +# impact_cat = impact[1] +# print(cost) +# new_property_df[col_name] = True +# # cost_column = col_name + '-cost' +# # new_property_df.assign(cost_column=cost) +# new_property_df[col_name + '-cost'] = cost +# new_property_df[col_name + '-impact_num'] = impact_num +# new_property_df[col_name + '-impact_cat'] = impact_cat +# st.markdown("---") + +if __name__ == "__main__": + e = EPCRecommendationsPipeline(directories=directories, use_parallel=True) + e.determine_number_of_improvement_ids() + e.number_improvement_ids + e.extract_improvement_description() + e.improvement_description_df + + full_id = pd.DataFrame(e.number_improvement_ids, columns=["IMPROVEMENT_ID"]) + + e.improvement_description_df.merge( + full_id, on="IMPROVEMENT_ID", how="right" + ).to_markdown("improvement_description.md") + + # e. diff --git a/etl/epc_recommendations/requirements.txt b/etl/epc_recommendations/requirements.txt new file mode 100644 index 00000000..44d37f07 --- /dev/null +++ b/etl/epc_recommendations/requirements.txt @@ -0,0 +1,4 @@ +beautifulsoup4==4.12.3 +requests==2.31.0 +pandas==2.2.2 +tqdm==4.66.2 \ No newline at end of file From a66b3782950366f2213019ed533b8031be16c5fe Mon Sep 17 00:00:00 2001 From: Michael Duong Date: Fri, 26 Apr 2024 18:51:59 +0100 Subject: [PATCH 2/4] Markdown file with all possible improvements --- etl/epc_recommendations/Pipeline.py | 35 +++++++++-- .../improvement_description.md | 59 +++++++++++++++++++ 2 files changed, 89 insertions(+), 5 deletions(-) create mode 100644 etl/epc_recommendations/improvement_description.md diff --git a/etl/epc_recommendations/Pipeline.py b/etl/epc_recommendations/Pipeline.py index a6de78e5..014d3c56 100644 --- a/etl/epc_recommendations/Pipeline.py +++ b/etl/epc_recommendations/Pipeline.py @@ -59,6 +59,26 @@ class EPCRecommendationsPipeline: # self.improvement_descriptions = improvement_description + def extract_full_improvement_dataset(self): + with mp.Pool() as pool: + results = list( + tqdm( + pool.imap(self._task_extract_full_improvement_dataset, directories), + total=len(directories), + ), + ) + + results_df = pd.concat(results) + + # Only sample one for each improvement as we just want to hit the find my energy website minimally for now + sampled_df = results_df.groupby("IMPROVEMENT_ID").sample(1) + + improvement_description = self._get_descriptions_of_improvements(sampled_df) + + self.improvement_description = improvement_description + + # self.full_improvement_df = sampled_df + def _task_check_number_of_improvement_ids(self, directory: Path): """ Parallel task for checking the number of improvement ids @@ -81,10 +101,6 @@ class EPCRecommendationsPipeline: def _task_extract_improvement_description(self, directory: Path) -> pd.DataFrame: """ Parallel task for checking the number of improvement ids - Flow will be get the certificates, - Find the latest EPC certificate for the UPRN, - Load the recommendations, - Merge on the LMK_KEY, """ recommendations_filepath = directory / "recommendations.csv" @@ -142,7 +158,9 @@ class EPCRecommendationsPipeline: output = certificates_df.merge(recommendations_df, on="LMK_KEY", how="inner") - return output + res = output.groupby("IMPROVEMENT_ID").sample(1) + + return res def _get_descriptions_of_improvements( self, improvement_description_df: pd.DataFrame @@ -232,6 +250,8 @@ class EPCRecommendationsPipeline: improvement_description_mapping[row.IMPROVEMENT_ID] = col_name + return improvement_description_mapping + # headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36'} # postcode_input = postcode_input.replace(" ", "+") @@ -318,6 +338,11 @@ if __name__ == "__main__": e.extract_improvement_description() e.improvement_description_df + e.extract_full_improvement_dataset() + pd.DataFrame.from_dict( + e.improvement_description, orient="index", columns=["improvement_description"] + ).to_markdown("improvement_description.md") + full_id = pd.DataFrame(e.number_improvement_ids, columns=["IMPROVEMENT_ID"]) e.improvement_description_df.merge( diff --git a/etl/epc_recommendations/improvement_description.md b/etl/epc_recommendations/improvement_description.md new file mode 100644 index 00000000..abbf1eff --- /dev/null +++ b/etl/epc_recommendations/improvement_description.md @@ -0,0 +1,59 @@ +| | improvement_description | +|---:|:---------------------------------------------------------| +| 1 | Hot water cylinder insulation | +| 2 | Hot water cylinder insulation | +| 3 | Hot water cylinder insulation | +| 4 | Hot water cylinder thermostat | +| 5 | Floor insulation (suspended floor) | +| 6 | Cavity wall insulation | +| 7 | Internal or external wall insulation | +| 8 | Double glazed windows | +| 9 | Secondary glazing | +| 10 | Solar water heating | +| 11 | Heating controls (programmer, room thermostat and TRVs) | +| 12 | Heating controls (room thermostat and TRVs) | +| 13 | Heating controls (thermostatic radiator valves) | +| 14 | Heating controls (room thermostat) | +| 15 | Heating controls (programmer and TRVs) | +| 16 | Heating controls (time and temperature zone control) | +| 17 | Heating controls (programmer and room thermostat) | +| 18 | Heating controls (room thermostat) | +| 19 | Solar water heating | +| 20 | Replace boiler with new condensing boiler | +| 21 | Replace boiler with new condensing boiler | +| 22 | Replace boiler with biomass boiler | +| 23 | Biomass stove with boiler | +| 24 | Fan assisted storage heaters and dual immersion cylinder | +| 25 | Fan assisted storage heaters | +| 26 | Replacement warm air unit | +| 27 | Change heating to gas condensing boiler | +| 28 | Condensing oil boiler with radiators | +| 29 | Gas condensing boiler | +| 30 | Internal or external wall insulation | +| 31 | Fan-assisted storage heaters | +| 32 | Change heating to gas condensing boiler | +| 34 | Solar photovoltaic panels, 2.5 kWp | +| 35 | Low energy lighting | +| 36 | Condensing heating unit | +| 37 | Condensing boiler (separate from the range cooker) | +| 38 | Condensing boiler (separate from the range cooker) | +| 39 | Biomass stove with boiler | +| 40 | Change room heaters to condensing boiler | +| 41 | Translation missing | +| 42 | Mains gas condensing heating unit | +| 43 | Translation missing | +| 44 | Wind turbine | +| 45 | Flat roof or sloping ceiling insulation | +| 46 | Room-in-roof insulation | +| 47 | Floor insulation (solid floor) | +| 48 | High performance external doors | +| 49 | Heat recovery system for mixer showers | +| 50 | Flue gas heat recovery device in conjunction with boiler | +| 56 | Replacement glazing units | +| 57 | Floor insulation (suspended floor) | +| 58 | Floor insulation (solid floor) | +| 59 | High heat retention storage heaters | +| 60 | High heat retention storage heaters | +| 61 | High heat retention storage heaters | +| 62 | High heat retention storage heaters | +| 63 | Party wall insulation | \ No newline at end of file From fae578ba3aa620af3982b81a9c9e1c582c682991 Mon Sep 17 00:00:00 2001 From: Michael Duong Date: Mon, 13 May 2024 08:22:21 +0100 Subject: [PATCH 3/4] push code to generate recommendations pickle file --- etl/epc/DataProcessor.py | 237 +++++++--- etl/epc/Pipeline.py | 88 +++- etl/epc/property_change_app.py | 1 + etl/epc_recommendations/Pipeline.py | 541 ++++++++-------------- recommendations/WindowsRecommendations.py | 36 +- 5 files changed, 476 insertions(+), 427 deletions(-) diff --git a/etl/epc/DataProcessor.py b/etl/epc/DataProcessor.py index a77bcaa3..6275177e 100644 --- a/etl/epc/DataProcessor.py +++ b/etl/epc/DataProcessor.py @@ -56,8 +56,11 @@ construction_age_remap = { expanded_map = { i: [ - label for label, bounds in construction_age_bounds_map.items() if (i <= bounds["u"]) and (i >= bounds['l']) - ][0] for i in range(0, 3001) + label + for label, bounds in construction_age_bounds_map.items() + if (i <= bounds["u"]) and (i >= bounds["l"]) + ][0] + for i in range(0, 3001) } @@ -74,8 +77,13 @@ class EPCDataProcessor: Handle data loading and data preprocessing """ - def __init__(self, data: pd.DataFrame | None = None, cleaning_averages: pd.DataFrame | None = None, - run_mode: str = "training", violation_mode: bool = False) -> None: + def __init__( + self, + data: pd.DataFrame | None = None, + cleaning_averages: pd.DataFrame | None = None, + run_mode: str = "training", + violation_mode: bool = False, + ) -> None: """ :param filepath: If specified, is the physical location of the data :param is_newdata: Indicates if we are processing new, testing data. @@ -86,7 +94,9 @@ class EPCDataProcessor: self.data: pd.DataFrame = data if is_data_a_dataframe else pd.DataFrame() is_cleaning_averages_a_dataframe = isinstance(cleaning_averages, pd.DataFrame) - self.cleaning_averages: pd.DataFrame = cleaning_averages if is_cleaning_averages_a_dataframe else pd.DataFrame() + self.cleaning_averages: pd.DataFrame = ( + cleaning_averages if is_cleaning_averages_a_dataframe else pd.DataFrame() + ) # FOR NOW IF VIOLATION MODE IS ON, WE USE RUN MODE AS NEWDATA self.violation_mode = violation_mode @@ -103,7 +113,9 @@ class EPCDataProcessor: ignore_step = True if self.run_mode == "newdata" else False if filepath is not None: - self.load_data(filepath=filepath, low_memory=DATA_PROCESSOR_SETTINGS["low_memory"]) + self.load_data( + filepath=filepath, low_memory=DATA_PROCESSOR_SETTINGS["low_memory"] + ) if len(self.data) == 0: raise Exception("No data to process - check filepath/ data being passed in") @@ -121,7 +133,8 @@ class EPCDataProcessor: self.clean_multi_glaze_proportion(ignore_step=ignore_step) self.clean_photo_supply() self.retain_multiple_epc_properties( - epc_minimum_count=DATA_PROCESSOR_SETTINGS["epc_minimum_count"], ignore_step=ignore_step + epc_minimum_count=DATA_PROCESSOR_SETTINGS["epc_minimum_count"], + ignore_step=ignore_step, ) self.fill_na_fields() @@ -188,7 +201,9 @@ class EPCDataProcessor: if ignore_step: return - self.cleaning_averages["LOCAL_AUTHORITY"] = self.data["LOCAL_AUTHORITY"].values[0] + self.cleaning_averages["LOCAL_AUTHORITY"] = self.data["LOCAL_AUTHORITY"].values[ + 0 + ] def fill_invalid_constituency_fields(self, ignore_step: bool = False): """ @@ -201,7 +216,9 @@ class EPCDataProcessor: if ignore_step: return - self.data = self.data.fillna({"CONSTITUENCY": self.data["CONSTITUENCY"].mode().values[0]}) + self.data = self.data.fillna( + {"CONSTITUENCY": self.data["CONSTITUENCY"].mode().values[0]} + ) def sort_data_by_uprn_lodgement_date(self, ignore_step: bool = False): """ @@ -301,7 +318,7 @@ class EPCDataProcessor: """ if self.violation_mode: - # TODO: to fill in + # TODO: to fill in return if ignore_step: @@ -311,9 +328,7 @@ class EPCDataProcessor: lambda x: self.clean_construction_age_band(x) ) - self.data = self.data[ - ~pd.isnull(self.data["CONSTRUCTION_AGE_BAND"]) - ] + self.data = self.data[~pd.isnull(self.data["CONSTRUCTION_AGE_BAND"])] def clean_missing_rooms(self, ignore_step: bool = False): """ @@ -331,31 +346,45 @@ class EPCDataProcessor: return # TODO: DO we want to move this out of this function? (i.e. alter the data before we do any cleaning) - self.data["POSTAL_AREA"] = self.data["POSTCODE"].apply(lambda x: x.split(" ")[0]) + self.data["POSTAL_AREA"] = self.data["POSTCODE"].apply( + lambda x: x.split(" ")[0] + ) def apply_clean(data, matching_columns): - cleaning_data = data[~pd.isnull(data[col])].groupby( - matching_columns - )[col].median().reset_index() - - data = data.merge( - cleaning_data, how="left", on=matching_columns, suffixes=("", "_CLEANING") + cleaning_data = ( + data[~pd.isnull(data[col])] + .groupby(matching_columns)[col] + .median() + .reset_index() ) - data[col] = np.where(pd.isnull(data[col]), data[f"{col}_CLEANING"], data[col]) + data = data.merge( + cleaning_data, + how="left", + on=matching_columns, + suffixes=("", "_CLEANING"), + ) + + data[col] = np.where( + pd.isnull(data[col]), data[f"{col}_CLEANING"], data[col] + ) data = data.drop(columns=f"{col}_CLEANING") return data for col in ["NUMBER_HEATED_ROOMS", "NUMBER_HABITABLE_ROOMS"]: to_index = 3 - matching_columns = ["PROPERTY_TYPE", "BUILT_FORM", "CONSTRUCTION_AGE_BAND", "POSTAL_AREA"] + matching_columns = [ + "PROPERTY_TYPE", + "BUILT_FORM", + "CONSTRUCTION_AGE_BAND", + "POSTAL_AREA", + ] has_missings = pd.isnull(self.data[col]).sum() while has_missings: self.data = apply_clean( - data=self.data, - matching_columns=matching_columns[0:to_index + 1] + data=self.data, matching_columns=matching_columns[0 : to_index + 1] ) has_missings = pd.isnull(self.data[col]).sum() @@ -363,7 +392,10 @@ class EPCDataProcessor: # Check if we've gotten to index 0 and still have missings - something has gone wrong or # we have a very unique property type if has_missings: - raise NotImplementedError("Handle this edge case, we still have missings for column %s" % col) + raise NotImplementedError( + "Handle this edge case, we still have missings for column %s" + % col + ) break to_index -= 1 @@ -410,7 +442,7 @@ class EPCDataProcessor: # coltypes = {k: v for k, v in COLUMNTYPES.items() if k in self.data.columns} if self.is_newdata else # COLUMNTYPES # for k, v in coltypes.items(): - # self.data[k] = self.data[k].astype(v) + # self.data[k] = self.data[k].astype(v) # self.data = self.data.astype(coltypes) # self.na_remapping() @@ -437,9 +469,11 @@ class EPCDataProcessor: def na_remapping(self, auto_subset_columns: bool = False): - fill_na_map_apply = { - k: v for k, v in fill_na_map.items() if k in self.data.columns - } if auto_subset_columns else fill_na_map + fill_na_map_apply = ( + {k: v for k, v in fill_na_map.items() if k in self.data.columns} + if auto_subset_columns + else fill_na_map + ) for column, fill_value in fill_na_map_apply.items(): self.data[column] = self.data[column].fillna(fill_value) @@ -535,28 +569,34 @@ class EPCDataProcessor: for variable in AVERAGE_FIXED_FEATURES: # Replace any missing NAN values with averages for the same Property type and built form - cleaning_averages_filled[variable] = cleaning_averages_filled[variable].fillna( - cleaning_averages_filled[f"{variable}_AVERAGE"] - ) + cleaning_averages_filled[variable] = cleaning_averages_filled[ + variable + ].fillna(cleaning_averages_filled[f"{variable}_AVERAGE"]) - cleaning_averages_filled = cleaning_averages_filled.drop(columns=f"{variable}_AVERAGE") + cleaning_averages_filled = cleaning_averages_filled.drop( + columns=f"{variable}_AVERAGE" + ) # If there are still NA values i.e. the averages do not have values for a speicifc group of property tyope # and built form # We can use just the property type average and replace - cleaning_averages_filled[variable] = cleaning_averages_filled[variable].fillna( - cleaning_averages_filled[f"{variable}_PROPERTY_AVERAGE"] - ) + cleaning_averages_filled[variable] = cleaning_averages_filled[ + variable + ].fillna(cleaning_averages_filled[f"{variable}_PROPERTY_AVERAGE"]) - cleaning_averages_filled = cleaning_averages_filled.drop(columns=f"{variable}_PROPERTY_AVERAGE") + cleaning_averages_filled = cleaning_averages_filled.drop( + columns=f"{variable}_PROPERTY_AVERAGE" + ) # If there are still NA values, use BUILT FORM averages - cleaning_averages_filled["variable"] = cleaning_averages_filled[variable].fillna( - cleaning_averages_filled[f"{variable}_BUILT_FORM_AVERAGE"] - ) + cleaning_averages_filled["variable"] = cleaning_averages_filled[ + variable + ].fillna(cleaning_averages_filled[f"{variable}_BUILT_FORM_AVERAGE"]) - cleaning_averages_filled = cleaning_averages_filled.drop(columns=f"{variable}_BUILT_FORM_AVERAGE") + cleaning_averages_filled = cleaning_averages_filled.drop( + columns=f"{variable}_BUILT_FORM_AVERAGE" + ) # If there still is na values, use average across all epc in consituecy cleaning_averages_filled[variable] = cleaning_averages_filled[ @@ -573,7 +613,9 @@ class EPCDataProcessor: self.cleaning_averages = cleaning_averages_filled - def retain_multiple_epc_properties(self, epc_minimum_count: int = 1, ignore_step: bool = False) -> None: + def retain_multiple_epc_properties( + self, epc_minimum_count: int = 1, ignore_step: bool = False + ) -> None: """ Reduce the data futher by keeping only datasets with multiple epcs """ @@ -592,12 +634,16 @@ class EPCDataProcessor: counts = counts[counts["count"] > epc_minimum_count] self.data = pd.merge(self.data, counts, on="UPRN") - def recast_df_columns(self, column_mappings: dict, auto_subset_columns: bool = False) -> None: + def recast_df_columns( + self, column_mappings: dict, auto_subset_columns: bool = False + ) -> None: """ Recast columns from the dataframe to ensure the behaviour we want """ if auto_subset_columns: - column_mappings = {k: v for k, v in column_mappings.items() if k in self.data.columns} + column_mappings = { + k: v for k, v in column_mappings.items() if k in self.data.columns + } for key, values in column_mappings.items(): if key not in self.data.columns: @@ -608,13 +654,17 @@ class EPCDataProcessor: else: self.data[key] = self.data[key].astype(values) - def recast_all_data(self, column_mappings: dict, auto_subset_columns: bool = False) -> None: + def recast_all_data( + self, column_mappings: dict, auto_subset_columns: bool = False + ) -> None: """ Using a dictionary to recast all columns at once """ if auto_subset_columns: - column_mappings = {k: v for k, v in column_mappings.items() if k in self.data.columns} + column_mappings = { + k: v for k, v in column_mappings.items() if k in self.data.columns + } self.data = self.data.astype(column_mappings) @@ -625,14 +675,28 @@ class EPCDataProcessor: if self.violation_mode: violation_uprn_missing = pd.isnull(self.data["UPRN"]) - violation_old_lodgment_date = self.data["LODGEMENT_DATE"] < EARLIEST_EPC_DATE - violation_invalid_transaction_type = self.data["TRANSACTION_TYPE"] == IGNORED_TRANSACTION_TYPES - violation_ignored_floor_level = self.data["FLOOR_LEVEL"].isin(IGNORED_FLOOR_LEVELS) + violation_old_lodgment_date = ( + self.data["LODGEMENT_DATE"] < EARLIEST_EPC_DATE + ) + violation_invalid_transaction_type = ( + self.data["TRANSACTION_TYPE"] == IGNORED_TRANSACTION_TYPES + ) + violation_ignored_floor_level = self.data["FLOOR_LEVEL"].isin( + IGNORED_FLOOR_LEVELS + ) violation_rdsap_score_above_max = self.data[RDSAP_RESPONSE] > MAX_SAP_SCORE - violation_missing_windows_description = pd.isnull(self.data["WINDOWS_DESCRIPTION"]) - violation_missing_hotwater_description = pd.isnull(self.data["HOTWATER_DESCRIPTION"]) - violation_missing_roof_description = pd.isnull(self.data["ROOF_DESCRIPTION"]) - violation_invalid_property_type = self.data["PROPERTY_TYPE"] == IGNORED_PROPERTY_TYPES + violation_missing_windows_description = pd.isnull( + self.data["WINDOWS_DESCRIPTION"] + ) + violation_missing_hotwater_description = pd.isnull( + self.data["HOTWATER_DESCRIPTION"] + ) + violation_missing_roof_description = pd.isnull( + self.data["ROOF_DESCRIPTION"] + ) + violation_invalid_property_type = ( + self.data["PROPERTY_TYPE"] == IGNORED_PROPERTY_TYPES + ) violation_invalid_tenure = self.data["TENURE"].isin(IGNORED_TENURES) violation_df = pd.concat( @@ -647,7 +711,8 @@ class EPCDataProcessor: violation_missing_roof_description, violation_invalid_property_type, violation_invalid_tenure, - ], axis=1, + ], + axis=1, keys=[ "violation_uprn_missing", "violation_old_lodgment_date", @@ -658,8 +723,8 @@ class EPCDataProcessor: "violation_missing_hotwater_description", "violation_missing_roof_description", "violation_invalid_property_type", - "violation_invalid_tenure" - ] + "violation_invalid_tenure", + ], ) self.data = pd.concat([self.data, violation_df], axis=1) @@ -685,10 +750,10 @@ class EPCDataProcessor: self.data = self.data[~pd.isnull(self.data["UPRN"])] self.data = self.data[self.data["LODGEMENT_DATE"] >= EARLIEST_EPC_DATE] - self.data = self.data[self.data["TRANSACTION_TYPE"] != IGNORED_TRANSACTION_TYPES] self.data = self.data[ - ~self.data["FLOOR_LEVEL"].isin(IGNORED_FLOOR_LEVELS) + self.data["TRANSACTION_TYPE"] != IGNORED_TRANSACTION_TYPES ] + self.data = self.data[~self.data["FLOOR_LEVEL"].isin(IGNORED_FLOOR_LEVELS)] self.data = self.data[self.data[RDSAP_RESPONSE] <= MAX_SAP_SCORE] # We observed 7 final records with missing windows and 2 records with missing hot water so we shall remove them @@ -705,7 +770,10 @@ class EPCDataProcessor: self.data = self.data[~self.data["TENURE"].isin(IGNORED_TENURES)] # We remap zero values to None - self.data.loc[self.data['FLOOR_HEIGHT'] == 0, 'FLOOR_HEIGHT'] = None + self.data.loc[self.data["FLOOR_HEIGHT"] == 0, "FLOOR_HEIGHT"] = None + + # Keep only non zero floor area + self.data = self.data[self.data["TOTAL_FLOOR_AREA"] != 0] def clean_multi_glaze_proportion(self, ignore_step: bool = False) -> None: """ @@ -734,7 +802,11 @@ class EPCDataProcessor: @staticmethod def apply_averages_cleaning( - data_to_clean, cleaning_data, cols_to_merge_on, colnames=None, ignore_step: bool = False + data_to_clean, + cleaning_data, + cols_to_merge_on, + colnames=None, + ignore_step: bool = False, ): """ Clean the input DataFrame using averages from a cleaning DataFrame. @@ -752,12 +824,13 @@ class EPCDataProcessor: # The desired colnames to clean - which may not be present if colnames is None: - colnames = ["TOTAL_FLOOR_AREA", "FLOOR_HEIGHT", "FIXED_LIGHTING_OUTLETS_COUNT"] + colnames = [ + "TOTAL_FLOOR_AREA", + "FLOOR_HEIGHT", + "FIXED_LIGHTING_OUTLETS_COUNT", + ] - cols_to_clean = [ - c for c in colnames if - c in data_to_clean.columns - ] + cols_to_clean = [c for c in colnames if c in data_to_clean.columns] # Enforce data types for col in ["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"]: @@ -768,7 +841,15 @@ class EPCDataProcessor: # Calculate averages cleaning_averages_to_merge = cleaning_data.groupby(columns_to_merge_on).agg( - dict(zip(cols_to_clean, ["mean", ] * len(cols_to_clean))) + dict( + zip( + cols_to_clean, + [ + "mean", + ] + * len(cols_to_clean), + ) + ) ) # Merge with the original data @@ -777,7 +858,7 @@ class EPCDataProcessor: cleaning_averages_to_merge, on=columns_to_merge_on, suffixes=("", "_AVERAGE"), - how='left' + how="left", ) global_averages = cleaning_data[cols_to_clean].mean() @@ -806,14 +887,20 @@ class EPCDataProcessor: raise Exception("Suffix should be one of _starting or _ending") if suffix == "_STARTING": - starting_cols = self.data[STARTING_SUFFIX_COMPONENT_COLS + EFFICIENCY_FEATURES].copy().add_suffix(suffix) + starting_cols = ( + self.data[STARTING_SUFFIX_COMPONENT_COLS + EFFICIENCY_FEATURES] + .copy() + .add_suffix(suffix) + ) fixed_cols = self.data[NO_SUFFIX_COMPONENT_COLS + POTENTIAL_COLUMNS].copy() return pd.concat([starting_cols, fixed_cols], axis=1) - return self.data[ - ENDING_SUFFIX_COMPONENT_COLS + EFFICIENCY_FEATURES - ].copy().add_suffix(suffix) + return ( + self.data[ENDING_SUFFIX_COMPONENT_COLS + EFFICIENCY_FEATURES] + .copy() + .add_suffix(suffix) + ) def get_fixed_features(self) -> pd.DataFrame: """ @@ -831,14 +918,17 @@ class EPCDataProcessor: :param cols_to_ignore: If specified, is a list of columns to ignore, e.g. uuids :return: DataFrame with coerced columns. """ - object_columns = df.select_dtypes(include=['object']).columns + object_columns = df.select_dtypes(include=["object"]).columns if cols_to_ignore: object_columns = [c for c in object_columns if c not in cols_to_ignore] for column in object_columns: unique_values = df[column].dropna().unique() # If the unique values in the column are 'True' and 'False', convert the column to boolean - if set(unique_values) == {'True', 'False'} or set(unique_values) == {True, False}: + if set(unique_values) == {"True", "False"} or set(unique_values) == { + True, + False, + }: df[column] = df[column].astype(bool) return df @@ -877,7 +967,6 @@ class EPCDataProcessor: @staticmethod def clean_efficiency_variables(df): - """ These is scope to clean this by the model per corresponding description. E.g. for WALLS_ENG_EFF we could look at the mode efficiency rating by description and diff --git a/etl/epc/Pipeline.py b/etl/epc/Pipeline.py index 6abf05bd..dcd5f6a7 100644 --- a/etl/epc/Pipeline.py +++ b/etl/epc/Pipeline.py @@ -87,6 +87,8 @@ class EPCPipeline: epc_all_equal_rows_key="sap_change_model/{}/all_equal_rows_rooms.parquet", epc_compiled_dataset_key="sap_change_model/{}/dataset_rooms.parquet", use_parallel=False, + use_recommendations=False, + epc_recommendations_file="recommendations.csv", ): """ :param directories: List of directories to process @@ -101,6 +103,7 @@ class EPCPipeline: self.compiled_dataset: pd.DataFrame = pd.DataFrame() self.compiled_all_equal_rows: list = [] self.compiled_cleaning_averages: list = [] + self.recommendation_dataset: pd.DataFrame = pd.DataFrame() self.directories = directories self.epc_data_processor = epc_data_processor @@ -109,6 +112,9 @@ class EPCPipeline: self.epc_local_file = epc_local_file self.epc_bucket_name = epc_bucket_name + self.use_recommendations = use_recommendations + self.epc_recommendations_file = epc_recommendations_file + self.use_parallel = use_parallel self.timeprefix = datetime.now().strftime("%Y-%m-%d-%H-%M-%S") @@ -194,6 +200,9 @@ class EPCPipeline: self.compiled_dataset = pd.concat( [self.compiled_dataset, result["dataset"]] ) + self.recommendation_dataset = pd.concat( + [self.recommendation_dataset, result["recommendation_dataset"]] + ) self.compiled_cleaning_averages.append(result["cleaning_averages"]) self.compiled_all_equal_rows.extend(result["all_equal_rows"]) @@ -208,6 +217,7 @@ class EPCPipeline: "dataset": self.compiled_dataset, "cleaning_averages": self.epc_data_processor.cleaning_averages, "all_equal_rows": self.compiled_all_equal_rows, + "recommendation_dataset": self.recommendation_dataset, } return output @@ -224,15 +234,54 @@ class EPCPipeline: constituency_data = self.epc_data_processor.data + if self.use_recommendations: + + # Use only the most recent epc for each uprn + constituency_data = constituency_data.sort_values( + "lodgement_date", ascending=False + ).drop_duplicates("uprn") + + recommendations_filepath = directory / self.epc_recommendations_file + recommendations_df = pd.read_csv(recommendations_filepath) + + recommendations_df = recommendations_df[ + recommendations_df["IMPROVEMENT_ID"].notnull() + ] + recommendations_df["IMPROVEMENT_ID"] = recommendations_df[ + "IMPROVEMENT_ID" + ].astype(int) + recommendations_df.columns = recommendations_df.columns.str.lower() + + # Get all recommendations for all properties in the constituency (after cleaning) + recommendations_df = recommendations_df.merge( + constituency_data[["lmk_key", "uprn"]], on="lmk_key", how="inner" + ) + + # Keep all properties that have recommendations + constituency_data = constituency_data[ + constituency_data["lmk_key"].isin(recommendations_df["lmk_key"]) + ] + + # In order to create a difference record, we repeat each row for each uprn + constituency_data = pd.concat( + [constituency_data, constituency_data] + ).reset_index(drop=True) + constituency_data = constituency_data.sort_values("uprn") + self.compiled_cleaning_averages.append( self.epc_data_processor.cleaning_averages ) constituency_difference_records = [] + require_adequate_data_check = False if self.use_recommendations else True + for uprn, property_data in constituency_data.groupby("uprn", observed=True): difference_records = self.process_uprn( - uprn=str(uprn), property_data=property_data, directory=directory + uprn=str(uprn), + property_data=property_data, + directory=directory, + require_adequate_data_check=require_adequate_data_check, ) if difference_records is not None: constituency_difference_records.extend(difference_records) @@ -245,7 +294,18 @@ class EPCPipeline: [self.compiled_dataset, constituency_dataset.df] ) - def process_uprn(self, uprn: str, property_data: pd.DataFrame, directory: Path): + if self.use_recommendations: + self.recommendation_dataset = pd.concat( + [self.recommendation_dataset, recommendations_df] + ) + + def process_uprn( + self, + uprn: str, + property_data: pd.DataFrame, + directory: Path, + require_adequate_data_check: bool = True, + ): """ Process a single UPRN, which may have multiple different EPCs :param uprn: UPRN @@ -279,13 +339,18 @@ class EPCPipeline: # We can use multiple types of comparison datasets - i.e. Compare consecutive records, or compare all permutations of records property_difference_records = self._generate_property_difference_records( - epc_records, uprn, directory, fixed_data + epc_records, uprn, directory, fixed_data, require_adequate_data_check ) return property_difference_records def _generate_property_difference_records( - self, epc_records: List[EPCRecord], uprn: str, directory: Path, fixed_data: dict + self, + epc_records: List[EPCRecord], + uprn: str, + directory: Path, + fixed_data: dict, + require_adequate_data_check: bool = True, ): """ We can use multiple types of comparison datasets, for example: @@ -301,7 +366,12 @@ class EPCPipeline: # property_difference_records = self._compare_consecutive_epcs(epc_records, uprn, directory, fixed_data, property_difference_records) property_difference_records = self._compare_all_permutation_epcs( - epc_records, uprn, directory, fixed_data, property_difference_records + epc_records, + uprn, + directory, + fixed_data, + property_difference_records, + require_adequate_data_check, ) return property_difference_records @@ -313,6 +383,7 @@ class EPCPipeline: directory: Path, fixed_data: dict, property_difference_records: list, + require_adequate_data_check: bool = True, ): """ Compare all permutations of EPCs for a given UPRN @@ -337,7 +408,10 @@ class EPCPipeline: # TODO: Pull out RDSAP_CHANGE to a variable if difference_record.get("rdsap_change") == 0: - if not difference_record.ensure_adequate_data(): + if ( + not difference_record.ensure_adequate_data() + and require_adequate_data_check + ): # Rdsap hasn't changed but we have enough data to use this record # i.e. all fields aside from mechnical ventilation are the same] # self.check_records.append({"uprn": uprn, "directory_name": directory.name, "difference_record": difference_record, "earliest_record": earliest_record, "latest_record": latest_record}) @@ -347,7 +421,7 @@ class EPCPipeline: fields=[x.lower() for x in CORE_COMPONENT_FEATURES] ) - if all_equal: + if all_equal and require_adequate_data_check: # Keep track of this for the moment so we can analyse self.compiled_all_equal_rows.append( {"uprn": uprn, "directory_name": directory.name} diff --git a/etl/epc/property_change_app.py b/etl/epc/property_change_app.py index c985567d..b6e0f6e6 100644 --- a/etl/epc/property_change_app.py +++ b/etl/epc/property_change_app.py @@ -18,6 +18,7 @@ def main(): directories=directories, use_parallel=True, epc_data_processor=EPCDataProcessor(run_mode="training"), + use_recommendations=True, ) epc_pipeline.run() diff --git a/etl/epc_recommendations/Pipeline.py b/etl/epc_recommendations/Pipeline.py index 014d3c56..de98a520 100644 --- a/etl/epc_recommendations/Pipeline.py +++ b/etl/epc_recommendations/Pipeline.py @@ -1,352 +1,223 @@ -# Pipeline to combined recommendations and certificates data together +# Pipeline to load all EPC data similar to EPCPipeline but once data is made into EPCRecord, +# We intantiate a Property instance so that we can get both the recommendations and the classification of the +# walls, roof and floor (i.e. average, above average etc) + +import os +from datetime import datetime +import itertools +from tqdm import tqdm import pandas as pd +from etl.epc.Record import EPCRecord +from backend.SearchEpc import SearchEpc + +from sqlalchemy.orm import sessionmaker + +from backend.app.config import get_settings +from backend.app.db.connection import db_engine +from backend.app.db.functions.materials_functions import get_materials + +from backend.app.plan.utils import get_cleaned + +from backend.Property import Property +from etl.solar.SolarPhotoSupply import SolarPhotoSupply + +from recommendations.Recommendations import Recommendations +from utils.logger import setup_logger +from utils.s3 import read_dataframe_from_s3_parquet, save_dataframe_to_s3_parquet + +from datetime import datetime + +now = datetime.now().strftime("%d-%m-%Y-%H-%M-%S") + +logger = setup_logger() + +logger.info("Connecting to db") +session = sessionmaker(bind=db_engine)() +created_at = datetime.now().isoformat() + +session.begin() +logger.info("Getting the inputs") + +cleaning_data = read_dataframe_from_s3_parquet( + bucket_name=get_settings().DATA_BUCKET, + file_key="sap_change_model/cleaning_dataset.parquet", +) + +materials = get_materials(session) +cleaned = get_cleaned() + +uprn_filenames = read_dataframe_from_s3_parquet( + bucket_name=get_settings().DATA_BUCKET, file_key="spatial/filename_meta.parquet" +) +photo_supply_lookup, floor_area_decile_thresholds = SolarPhotoSupply.load( + bucket=get_settings().DATA_BUCKET +) + +scenario_properties_df = pd.read_csv( + Path(__file__).parent / "improvement_data_sample.csv" +) + +improvement_id_to_check = 1 +properties_to_check = scenario_properties_df[ + scenario_properties_df["IMPROVEMENT_ID"] == improvement_id_to_check +] + +property_list = [] + +for i, row in tdqm(properties_to_check.iterrows()): + try: + epc_searcher = SearchEpc( + address1=row["ADDRESS1"], + postcode=row["POSTCODE"], + auth_token=get_settings().EPC_AUTH_TOKEN, + os_api_key=get_settings().ORDNANCE_SURVEY_API_KEY, + ) + epc_searcher.find_property() + + epc_records = { + "original_epc": epc_searcher.newest_epc.copy(), + "full_sap_epc": epc_searcher.full_sap_epc.copy(), + "old_data": epc_searcher.older_epcs.copy(), + } + + prepared_epc = EPCRecord( + epc_records=epc_records, run_mode="newdata", cleaning_data=cleaning_data + ) + + p = Property( + id=prepared_epc.uprn, + address=epc_searcher.address_clean, + postcode=epc_searcher.postcode_clean, + epc_record=prepared_epc, + ) + + p.get_spatial_data(uprn_filenames) + p.get_components(cleaned, photo_supply_lookup, floor_area_decile_thresholds) + + recommender = Recommendations(property_instance=p, materials=materials) + property_recommendations = recommender.recommend() + + wall_recommendations = recommender.wall_recomender.recommendations + loft_recommendations = recommender.roof_recommender.recommendations + solar_recommendations = recommender.solar_recommender.recommendation + windows_recommendations = recommender.windows_recommender.recommendation + + p.create_base_difference_epc_record(cleaned_lookup=cleaned) + + property_list.append(p.base_difference_record.df) + except: + pass + +property_df = pd.concat(property_list) + +property_df["walls_insulation_thickness"] + +scenario_properties = [ + { + "address": "2 South Terrace", + "postcode": "NN1 5JY", + "lmk-key": "1459796789102016070507274146560098", + "measures": [ + [ + ["internal_wall_insulation"], + "11", + {"walls_insulation_thickness_ending": "average"}, + [0], + ], + [ + ["external_wall_insulation"], + "10", + {"walls_insulation_thickness_ending": "average"}, + [0], + ], + [["solar", "windows"], "15", {"photo_supply_ending": 50}, [0, 1]], + ], + }, + { + "address": "8 Lindlings", + "postcode": "HP1 2HA", + "lmk-key": "c14029235739827d5f627dc8aa9bb567d026b267e851e0db0001db24638667b1", + "measures": [ + [ + ["cavity_wall_insulation", "loft_insulation"], + "15", + {"walls_insulation_thickness_ending": "average"}, + [0, 1], + ], + ], + }, + { + "address": "44 Lindlings", + "postcode": "HP1 2HE", + "lmk-key": "99296a6dda21314fef3a61cda59e441e9a2aacf115eb96f4a0fa85696bf7b117", + "measures": [ + [ + ["cavity_wall_insulation", "loft_insulation"], + "15", + {"walls_insulation_thickness_ending": "average"}, + [0, 1], + ], + ], + }, + { + "address": "46 Chaulden Terrace", + "postcode": "HP1 2AN", + "lmk-key": "d1e0534be3a44c33003323b21d0e322e3daddc65b5ee71936f89c59ddab96b50", + "measures": [ + [ + ["cavity_wall_insulation", "loft_insulation"], + "15", + {"walls_insulation_thickness_ending": "average"}, + [0, 1], + ], + ], + }, + { + "address": "73 Long Chaulden", + "postcode": "HP1 2HX", + "lmk-key": "1eae354db522a95188018d9cd0502ed8c609910b6c88f8797d3a25f59b11770a", + "measures": [ + [ + ["cavity_wall_insulation", "loft_insulation"], + "15", + {"walls_insulation_thickness_ending": "average"}, + [0, 1], + ], + ], + }, +] + + from pathlib import Path -from tqdm import tqdm -import multiprocessing as mp -import itertools -import requests -from bs4 import BeautifulSoup -import time +from etl.epc.DataProcessor import EPCDataProcessor +from etl.epc.Pipeline import EPCPipeline DATA_DIRECTORY = ( Path(__file__).parent.parent / "epc" / "local_data" / "all-domestic-certificates" ) -directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()] -# Start with one folder in the local_data directory -class EPCRecommendationsPipeline: +def main(): + """ + Orchestration function + """ - SEARCH_POSTCODE_URL = "https://find-energy-certificate.service.gov.uk/find-a-certificate/search-by-postcode?postcode={postcode_input}" - BASE_ENERGY_URL = "https://find-energy-certificate.service.gov.uk" - HEADERS = { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36" - } + directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()] - def __init__(self, directories: list, use_parallel: bool = True): - self.directories = directories - self.use_parallel = use_parallel + # Set up the a new pipeline only up into the EPCRecord stage + # So that we can instantiate a Property instance and get the recommendations - def determine_number_of_improvement_ids(self): - with mp.Pool() as pool: - results = list( - tqdm( - pool.imap(self._task_check_number_of_improvement_ids, directories), - total=len(directories), - ), - ) + # directories = directories[0:3] - results = list(itertools.chain(*results)) + # epc_pipeline = EPCPipeline( + # directories=directories, + # use_parallel=True, + # epc_data_processor=EPCDataProcessor(run_mode="training"), + # ) - self.number_improvement_ids = set(results) + # epc_pipeline.run() - def extract_improvement_description(self): - with mp.Pool() as pool: - results = list( - tqdm( - pool.imap(self._task_extract_improvement_description, directories), - total=len(directories), - ), - ) - - results = pd.concat(results) - self.improvement_description_df = results.groupby("IMPROVEMENT_ID").sample(1) - - # improvement_description = self._get_descriptions_of_improvements( - # improvement_description_df - # ) - - # self.improvement_descriptions = improvement_description - - def extract_full_improvement_dataset(self): - with mp.Pool() as pool: - results = list( - tqdm( - pool.imap(self._task_extract_full_improvement_dataset, directories), - total=len(directories), - ), - ) - - results_df = pd.concat(results) - - # Only sample one for each improvement as we just want to hit the find my energy website minimally for now - sampled_df = results_df.groupby("IMPROVEMENT_ID").sample(1) - - improvement_description = self._get_descriptions_of_improvements(sampled_df) - - self.improvement_description = improvement_description - - # self.full_improvement_df = sampled_df - - def _task_check_number_of_improvement_ids(self, directory: Path): - """ - Parallel task for checking the number of improvement ids - """ - - recommendations_filepath = directory / "recommendations.csv" - recommendations_df = pd.read_csv(recommendations_filepath) - - recommendations_df = recommendations_df[ - recommendations_df["IMPROVEMENT_ID"].notnull() - ] - recommendations_df["IMPROVEMENT_ID"] = recommendations_df[ - "IMPROVEMENT_ID" - ].astype(int) - - output = list(recommendations_df["IMPROVEMENT_ID"].unique()) - - return output - - def _task_extract_improvement_description(self, directory: Path) -> pd.DataFrame: - """ - Parallel task for checking the number of improvement ids - """ - - recommendations_filepath = directory / "recommendations.csv" - recommendations_df = pd.read_csv(recommendations_filepath) - - recommendations_df = recommendations_df[ - recommendations_df["IMPROVEMENT_ID"].notnull() - ] - recommendations_df["IMPROVEMENT_ID"] = recommendations_df[ - "IMPROVEMENT_ID" - ].astype(int) - - recommendations_df = recommendations_df[ - ~recommendations_df["IMPROVEMENT_SUMMARY_TEXT"].isnull() - ] - - recommendations_df = ( - recommendations_df.sort_values("IMPROVEMENT_ID") - .groupby("IMPROVEMENT_ID") - .head(1) - ) - - return recommendations_df - - def _task_extract_full_improvement_dataset(self, directory: Path) -> pd.DataFrame: - """ - Parallel task for checking the number of improvement ids - Flow will be get the certificates, - Find the latest EPC certificate for the UPRN, - Load the recommendations, - Merge on the LMK_KEY, - """ - - certificates_filepath = directory / "certificates.csv" - certificates_df = pd.read_csv(certificates_filepath) - - certificates_df = ( - certificates_df.sort_values("LODGEMENT_DATE", ascending=False) - .groupby("UPRN") - .head(1) - .reset_index(drop=True) - ) - - recommendations_filepath = directory / "recommendations.csv" - recommendations_df = pd.read_csv(recommendations_filepath) - - recommendations_df = recommendations_df[ - recommendations_df["IMPROVEMENT_ID"].notnull() - ] - recommendations_df["IMPROVEMENT_ID"] = recommendations_df[ - "IMPROVEMENT_ID" - ].astype(int) - - # sampled_df = recommendations_df.groupby("IMPROVEMENT_ID").sample(1) - - output = certificates_df.merge(recommendations_df, on="LMK_KEY", how="inner") - - res = output.groupby("IMPROVEMENT_ID").sample(1) - - return res - - def _get_descriptions_of_improvements( - self, improvement_description_df: pd.DataFrame - ) -> dict[int, str]: - """ - For each row of the improvement descriptions, get the description of the improvement via web scraping - """ - - improvement_description_mapping = {} - - for row in improvement_description_df.itertuples(): - # time.sleep(1) - postcode = row.POSTCODE - postcode_input = postcode.replace(" ", "+") - postcode_search = self.SEARCH_POSTCODE_URL.format( - postcode_input=postcode_input - ) - postcode_response = requests.get(postcode_search, headers=self.HEADERS) - - postcode_res = BeautifulSoup(postcode_response.text, features="html.parser") - address_links_full = postcode_res.findAll( - "a", {"class": "govuk-link", "rel": "nofollow"} - ) - address_links = { - element.text.lstrip().rstrip(): self.BASE_ENERGY_URL + element["href"] - for element in address_links_full - } - - address_links = {k.replace(",", ""): v for k, v in address_links.items()} - - adjusted_address = row.ADDRESS1.replace(",", "") - - address_link = [ - (k, v) for k, v in address_links.items() if adjusted_address in k - ] - - if len(address_link) == 0: - raise ValueError("Address not found") - - if len(address_link) > 1: - split_address_components = adjusted_address.split(" ") - for address in address_link: - if split_address_components[0] in address[0].split(" "): - chosen_epc = address[1] - break - raise ValueError("Multiple addresses found") - else: - chosen_epc = address_link[0][1] - - # time.sleep(1) - address_response = requests.get(chosen_epc, headers=self.HEADERS) - address_res = BeautifulSoup(address_response.text, features="html.parser") - - # epc_certificate = chosen_epc.split('/')[-1] - - # ratings = address_res.find("desc", {"id": "svg-desc"}).text - # current_rating = ratings.split(".")[0] - # potential_rating = ratings.split(".")[1] - - # new_property_df = pd.DataFrame( - # { - # "address": [address_link[0][0]], - # "epc_certificate": [epc_certificate], - # "current_epc_rating": [current_rating.split(" ")[-6]], - # "current_epc_efficiency": [current_rating.split(" ")[-1]], - # "potential_epc_rating": [potential_rating.split(" ")[-6]], - # "potential_epc_efficiency": [potential_rating.split(" ")[-1]], - # "LMK_KEY": [row.LMK_KEY], - # } - # ) - - improvements = address_res.find( - "div", - {"class": "govuk-body printable-area epb-recommended-improvements"}, - ) - - changes = improvements.find_all("h3") - changes_impact = improvements.find_all( - "dl", {"class": "govuk-summary-list"} - ) - element = list(zip(changes, changes_impact))[row.IMPROVEMENT_ITEM - 1] - - improvement_header = element[0].text - - col_name = improvement_header.split(":")[1].lstrip().rstrip() - # cost = element[1].find('dd', {"class": "govuk-summary-list__value"}).text.lstrip().rstrip() - - improvement_description_mapping[row.IMPROVEMENT_ID] = col_name - - return improvement_description_mapping - - -# headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36'} -# postcode_input = postcode_input.replace(" ", "+") -# postcode_search = SEARCH_POSTCODE_URL.format(postcode_input=postcode_input) -# postcode_response = requests.get(postcode_search, headers=headers) - -# postcode_res = BeautifulSoup(postcode_response.text) -# address_links_full = postcode_res.findAll('a', {'class': 'govuk-link', 'rel': 'nofollow'}) -# address_links = {element.text.lstrip().rstrip(): BASE_ENERGY_URL + element['href'] for element in address_links_full} -# address_input = st.selectbox('Please select an address:', address_links.keys()) - -# if address_input is None: -# st.stop() - -# chosen_epc = address_links[address_input] - -# st.write("### The EPC Certificate of this property is:") -# epc_certificate = chosen_epc.split('/')[-1] -# st.write("##### " + epc_certificate) - -# address_response = requests.get(chosen_epc, headers=headers) -# address_res = BeautifulSoup(address_response.text) - -# svg = address_res.find("svg", {'class': 'epc-energy-rating-graph'}) -# render_svg(svg) - -# st.write("## Energy rating - current and potential") -# # st.write(address_res.find('desc', {'id': 'svg-desc'}).text) -# # st.image(address_res.find_all('svg', {'class': 'epc-energy-rating-graph'})[0]) -# ratings = address_res.find('desc', {'id': 'svg-desc'}).text - -# st.write('### Current EPC rating') -# current_rating = ratings.split(".")[0] -# st.write("##### " + current_rating) - -# st.write('### Potential EPC rating') -# potential_rating = ratings.split(".")[1] -# st.write("##### " + potential_rating) - -# new_property_df = pd.DataFrame( -# {'address': [address_input], -# 'epc_certificate': [epc_certificate], -# 'current_epc_rating': [current_rating.split(' ')[-6]], -# 'current_epc_efficiency': [current_rating.split(' ')[-1]], -# 'potential_epc_rating': [potential_rating.split(' ')[-6]], -# "potential_epc_efficiency": [potential_rating.split(' ')[-1]]} -# ) - -# st.write('### Changes that can be made:') -# improvements = address_res.find('div', {"class": "govuk-body printable-area epb-recommended-improvements"}) - -# if improvements is None: -# st.write("No changes suggested") -# else: -# changes = improvements.find_all('h3') -# changes_impact = improvements.find_all('dl', {"class": 'govuk-summary-list'}) - -# for element in zip(changes, changes_impact): -# improvement_header = element[0].text -# st.write("#### " + improvement_header) - -# improvement_text = element[1].text -# st.write(improvement_text) - -# col_name = improvement_header.split(":")[1] -# cost = element[1].find('dd', {"class": "govuk-summary-list__value"}).text.lstrip().rstrip() - -# impact = element[1].find('text', {"class": "govuk-!-font-weight-bold"}).text.split(" ") -# impact_num = impact[0] -# impact_cat = impact[1] -# print(cost) -# new_property_df[col_name] = True -# # cost_column = col_name + '-cost' -# # new_property_df.assign(cost_column=cost) -# new_property_df[col_name + '-cost'] = cost -# new_property_df[col_name + '-impact_num'] = impact_num -# new_property_df[col_name + '-impact_cat'] = impact_cat -# st.markdown("---") if __name__ == "__main__": - e = EPCRecommendationsPipeline(directories=directories, use_parallel=True) - e.determine_number_of_improvement_ids() - e.number_improvement_ids - e.extract_improvement_description() - e.improvement_description_df - - e.extract_full_improvement_dataset() - pd.DataFrame.from_dict( - e.improvement_description, orient="index", columns=["improvement_description"] - ).to_markdown("improvement_description.md") - - full_id = pd.DataFrame(e.number_improvement_ids, columns=["IMPROVEMENT_ID"]) - - e.improvement_description_df.merge( - full_id, on="IMPROVEMENT_ID", how="right" - ).to_markdown("improvement_description.md") - - # e. + main() diff --git a/recommendations/WindowsRecommendations.py b/recommendations/WindowsRecommendations.py index b7c2823a..8c0cc493 100644 --- a/recommendations/WindowsRecommendations.py +++ b/recommendations/WindowsRecommendations.py @@ -4,7 +4,7 @@ import numpy as np from backend.Property import Property from recommendations.Costs import Costs -from recommendation_utils import override_costs +from recommendations.recommendation_utils import override_costs class WindowsRecommendations: @@ -14,7 +14,7 @@ class WindowsRecommendations: # glazed "most": 0.33, # If glazing is partial, we assume 50/50 split between glazed and unglazed - "partial": 0.5 + "partial": 0.5, } def __init__(self, property_instance: Property, materials: List): @@ -52,14 +52,20 @@ class WindowsRecommendations: if not number_of_windows: raise ValueError("Number of windows not specified") - if self.property.windows["has_glazing"] & (self.property.windows["glazing_coverage"] == "full"): + if self.property.windows["has_glazing"] & ( + self.property.windows["glazing_coverage"] == "full" + ): return # We scale the number of windows based on the proportion of existing glazing if self.property.data["multi-glaze-proportion"] != "": - n_windows_scalar = 1 - (int(self.property.data["multi-glaze-proportion"]) / 100) + n_windows_scalar = 1 - ( + int(self.property.data["multi-glaze-proportion"]) / 100 + ) else: - n_windows_scalar = self.COVERAGE_MAP.get(self.property.windows["glazing_coverage"], 1) + n_windows_scalar = self.COVERAGE_MAP.get( + self.property.windows["glazing_coverage"], 1 + ) number_of_windows *= n_windows_scalar number_of_windows = np.ceil(number_of_windows) @@ -68,7 +74,7 @@ class WindowsRecommendations: cost_result = self.costs.window_glazing( number_of_windows=number_of_windows, material=self.glazing_material, - is_secondary_glazing=is_secondary_glazing + is_secondary_glazing=is_secondary_glazing, ) already_installed = "windows_glazing" in self.property.already_installed @@ -76,18 +82,26 @@ class WindowsRecommendations: cost_result = override_costs(cost_result) description = "The property already has double glazing installed. No further action is required." else: - glazing_type = "secondary glazing" if is_secondary_glazing else "double glazing" + glazing_type = ( + "secondary glazing" if is_secondary_glazing else "double glazing" + ) if self.property.windows["glazing_coverage"] in ["partial", "most"]: description = f"Install {glazing_type} to the remaining windows" else: description = f"Install {glazing_type} to all windows" if self.property.is_listed: - description += ". Secondary glazing recommended due to listed building status" + description += ( + ". Secondary glazing recommended due to listed building status" + ) elif self.property.is_heritage: - description += ". Secondary glazing recommended due to herigate building status" + description += ( + ". Secondary glazing recommended due to herigate building status" + ) elif self.property.in_conservation_area: - description += ". Secondary glazing recommended due to conservation area status" + description += ( + ". Secondary glazing recommended due to conservation area status" + ) self.recommendation = [ { @@ -100,6 +114,6 @@ class WindowsRecommendations: "sap_points": None, "already_installed": already_installed, **cost_result, - "is_secondary_glazing": is_secondary_glazing + "is_secondary_glazing": is_secondary_glazing, } ] From 1ba73c8115b8ee7024f1f648d42be93090272060 Mon Sep 17 00:00:00 2001 From: Michael Duong Date: Wed, 15 May 2024 09:00:16 +0000 Subject: [PATCH 4/4] ignore env --- .gitignore | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 63884ad7..4a204ac3 100644 --- a/.gitignore +++ b/.gitignore @@ -268,4 +268,6 @@ adhoc adhoc/* etl-router-venv/ -refactor_datasets/ \ No newline at end of file +refactor_datasets/ +etl-router-*/ +.vscode/ \ No newline at end of file