"""Pipeline to combine EPC recommendations and certificates data together.

Reconstructed from the patch that introduced this module:

    commit   b8622457bd3dcd31cfb29f34b99400e823ddd6c6
    author   Michael Duong
    date     Fri, 26 Apr 2024 15:34:36 +0100
    subject  start work on pulling out all recommendations in relation to
             properties
    files    etl/epc_recommendations/Pipeline.py (this module)
             etl/epc_recommendations/requirements.txt (listed at the bottom)
"""

import itertools
import multiprocessing as mp
import time
from collections.abc import Callable
from pathlib import Path

import pandas as pd

try:
    from tqdm import tqdm
except ImportError:  # progress bars are cosmetic; run without them

    def tqdm(iterable, **_kwargs):
        """Fallback used when tqdm is not installed: no progress output."""
        return iterable


DATA_DIRECTORY = (
    Path(__file__).parent.parent / "epc" / "local_data" / "all-domestic-certificates"
)
# Start with one folder in the local_data directory.
# Guarded so that merely importing this module does not fail when the local
# data has not been downloaded yet.
directories = (
    sorted(entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir())
    if DATA_DIRECTORY.is_dir()
    else []
)


class EPCRecommendationsPipeline:
    """Collect improvement ids and example recommendations from EPC exports.

    Every entry of ``directories`` is a folder that contains a
    ``recommendations.csv`` file (and a ``certificates.csv`` file for
    :meth:`_task_extract_full_improvement_dataset`).

    Args:
        directories: Folders to process.
        use_parallel: When true, folders are processed in a
            ``multiprocessing.Pool``; otherwise they are processed one after
            another in the current process.
    """

    SEARCH_POSTCODE_URL = "https://find-energy-certificate.service.gov.uk/find-a-certificate/search-by-postcode?postcode={postcode_input}"
    BASE_ENERGY_URL = "https://find-energy-certificate.service.gov.uk"
    HEADERS = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36"
    }
    # Without a timeout a stalled connection would block the scrape forever.
    REQUEST_TIMEOUT_SECONDS = 30
    # Pause before each scraped property; 0 disables throttling.
    REQUEST_DELAY_SECONDS = 0

    def __init__(self, directories: list, use_parallel: bool = True):
        self.directories = directories
        self.use_parallel = use_parallel

    def determine_number_of_improvement_ids(self):
        """Store the set of all improvement ids in ``number_improvement_ids``."""
        per_directory = self._map_over_directories(
            self._task_check_number_of_improvement_ids
        )
        self.number_improvement_ids = set(
            itertools.chain.from_iterable(per_directory)
        )

    def extract_improvement_description(self):
        """Store one example row per improvement id.

        The result is written to ``improvement_description_df``. When several
        directories provide a row for the same id, one of them is picked at
        random.
        """
        frames = self._map_over_directories(
            self._task_extract_improvement_description
        )
        if not frames:
            # pd.concat raises on an empty list; expose an empty result instead.
            self.improvement_description_df = pd.DataFrame(
                {"IMPROVEMENT_ID": pd.Series(dtype=int)}
            )
            return

        combined = pd.concat(frames)
        self.improvement_description_df = combined.groupby("IMPROVEMENT_ID").sample(1)

        # TODO: resolve the descriptions by scraping once the frame carries
        # the POSTCODE / ADDRESS1 / IMPROVEMENT_ITEM columns that
        # _get_descriptions_of_improvements reads:
        #     self.improvement_descriptions = self._get_descriptions_of_improvements(
        #         self.improvement_description_df
        #     )

    def _map_over_directories(self, task: Callable) -> list:
        """Apply ``task`` to every directory of this pipeline, in order.

        Uses a process pool when ``use_parallel`` is set and there is work to
        do, and a plain loop otherwise.
        """
        total = len(self.directories)
        if not self.use_parallel or not self.directories:
            return [task(directory) for directory in tqdm(self.directories, total=total)]

        with mp.Pool() as pool:
            return list(tqdm(pool.imap(task, self.directories), total=total))

    @staticmethod
    def _load_recommendations(directory: Path) -> pd.DataFrame:
        """Read ``recommendations.csv`` and keep rows that have an improvement id.

        Returns:
            A new frame whose ``IMPROVEMENT_ID`` column is converted to ``int``.
        """
        recommendations_df = pd.read_csv(directory / "recommendations.csv")
        # .copy() makes the filtered frame independent, so the assignment
        # below does not trigger pandas' SettingWithCopyWarning.
        recommendations_df = recommendations_df.dropna(subset=["IMPROVEMENT_ID"]).copy()
        recommendations_df["IMPROVEMENT_ID"] = recommendations_df[
            "IMPROVEMENT_ID"
        ].astype(int)
        return recommendations_df

    def _task_check_number_of_improvement_ids(self, directory: Path):
        """
        Parallel task returning the distinct improvement ids of one directory
        """
        recommendations_df = self._load_recommendations(directory)
        return recommendations_df["IMPROVEMENT_ID"].unique().tolist()

    def _task_extract_improvement_description(self, directory: Path) -> pd.DataFrame:
        """
        Parallel task returning one recommendation row per improvement id

        Rows without an IMPROVEMENT_SUMMARY_TEXT are ignored. For each id the
        first remaining row in file order is kept.
        """
        recommendations_df = self._load_recommendations(directory)
        recommendations_df = recommendations_df.dropna(
            subset=["IMPROVEMENT_SUMMARY_TEXT"]
        )
        # A stable sort keeps file order inside each id, which makes head(1)
        # deterministic.
        return (
            recommendations_df.sort_values("IMPROVEMENT_ID", kind="stable")
            .groupby("IMPROVEMENT_ID")
            .head(1)
        )

    def _task_extract_full_improvement_dataset(self, directory: Path) -> pd.DataFrame:
        """
        Parallel task joining the latest certificates with their recommendations
        Flow will be get the certificates,
        Find the latest EPC certificate for the UPRN,
        Load the recommendations,
        Merge on the LMK_KEY,
        """
        certificates_df = pd.read_csv(directory / "certificates.csv")
        latest_certificates_df = (
            certificates_df.sort_values(
                "LODGEMENT_DATE", ascending=False, kind="stable"
            )
            .groupby("UPRN")
            .head(1)
            .reset_index(drop=True)
        )

        recommendations_df = self._load_recommendations(directory)

        # sampled_df = recommendations_df.groupby("IMPROVEMENT_ID").sample(1)

        return latest_certificates_df.merge(
            recommendations_df, on="LMK_KEY", how="inner"
        )

    @staticmethod
    def _choose_certificate_url(address_links: dict, address1: str) -> str:
        """Pick the certificate URL whose listed address matches ``address1``.

        Args:
            address_links: Mapping of listed address (commas removed) to URL.
            address1: First address line of the property; commas are ignored.

        Returns:
            The URL of the matching certificate.

        Raises:
            ValueError: If no listed address contains ``address1``, or if
                several do and none of them contains the first component of
                ``address1`` as a whole word.
        """
        adjusted_address = address1.replace(",", "")
        candidates = [
            (listed_address, url)
            for listed_address, url in address_links.items()
            if adjusted_address in listed_address
        ]

        if not candidates:
            raise ValueError("Address not found")
        if len(candidates) == 1:
            return candidates[0][1]

        # "1 High Street" is also a substring of "11 High Street"; comparing
        # the first component as a whole word tells such candidates apart.
        first_component = adjusted_address.split(" ")[0]
        for listed_address, url in candidates:
            if first_component in listed_address.split(" "):
                return url
        raise ValueError("Multiple addresses found")

    def _get_descriptions_of_improvements(
        self, improvement_description_df: pd.DataFrame
    ) -> dict[int, str]:
        """
        For each row of the improvement descriptions, get the description of the improvement via web scraping

        Args:
            improvement_description_df: Frame with the columns POSTCODE,
                ADDRESS1, IMPROVEMENT_ITEM and IMPROVEMENT_ID.

        Returns:
            Mapping of improvement id to the heading text found on the
            certificate page.

        Raises:
            ValueError: If the property cannot be identified on the postcode
                search page, or its certificate page has no recommended
                improvements section.
        """
        # Imported here rather than at module level: only this scraping step
        # needs them, so the CSV stages work without the scraping stack.
        import requests
        from bs4 import BeautifulSoup

        improvement_description_mapping = {}

        for row in improvement_description_df.itertuples():
            if self.REQUEST_DELAY_SECONDS:
                time.sleep(self.REQUEST_DELAY_SECONDS)

            postcode_search = self.SEARCH_POSTCODE_URL.format(
                postcode_input=row.POSTCODE.replace(" ", "+")
            )
            postcode_response = requests.get(
                postcode_search,
                headers=self.HEADERS,
                timeout=self.REQUEST_TIMEOUT_SECONDS,
            )
            postcode_res = BeautifulSoup(postcode_response.text, features="html.parser")

            address_links = {
                element.text.strip().replace(",", ""): self.BASE_ENERGY_URL
                + element["href"]
                for element in postcode_res.find_all(
                    "a", {"class": "govuk-link", "rel": "nofollow"}
                )
            }
            chosen_epc = self._choose_certificate_url(address_links, row.ADDRESS1)

            address_response = requests.get(
                chosen_epc,
                headers=self.HEADERS,
                timeout=self.REQUEST_TIMEOUT_SECONDS,
            )
            address_res = BeautifulSoup(address_response.text, features="html.parser")

            # TODO: also capture the certificate id, the current/potential
            # ratings and the indicative cost per LMK_KEY (see the prototype
            # at the bottom of this module).

            improvements = address_res.find(
                "div",
                {"class": "govuk-body printable-area epb-recommended-improvements"},
            )
            if improvements is None:
                raise ValueError(
                    f"No recommended improvements found at {chosen_epc}"
                )

            changes = improvements.find_all("h3")
            changes_impact = improvements.find_all(
                "dl", {"class": "govuk-summary-list"}
            )
            # IMPROVEMENT_ITEM is 1-based; headings and summary lists are
            # paired so that only complete recommendations are addressable.
            element = list(zip(changes, changes_impact))[row.IMPROVEMENT_ITEM - 1]

            improvement_header = element[0].text
            col_name = improvement_header.split(":")[1].strip()

            improvement_description_mapping[row.IMPROVEMENT_ID] = col_name

        return improvement_description_mapping


# ---------------------------------------------------------------------------
# Reference only: Streamlit prototype that the scraping logic above was
# derived from. Kept as comments; it is not executed.
# ---------------------------------------------------------------------------
# headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36'}
# postcode_input = postcode_input.replace(" ", "+")
# postcode_search = SEARCH_POSTCODE_URL.format(postcode_input=postcode_input)
# postcode_response = requests.get(postcode_search, headers=headers)

# postcode_res = BeautifulSoup(postcode_response.text)
# address_links_full = postcode_res.findAll('a', {'class': 'govuk-link', 'rel': 'nofollow'})
# address_links = {element.text.lstrip().rstrip(): BASE_ENERGY_URL + element['href'] for element in address_links_full}
# address_input = st.selectbox('Please select an address:', address_links.keys())

# if address_input is None:
#     st.stop()

# chosen_epc = address_links[address_input]

# st.write("### The EPC Certificate of this property is:")
# epc_certificate = chosen_epc.split('/')[-1]
# st.write("##### " + epc_certificate)

# address_response = requests.get(chosen_epc, headers=headers)
# address_res = BeautifulSoup(address_response.text)

# svg = address_res.find("svg", {'class': 'epc-energy-rating-graph'})
# render_svg(svg)

# st.write("## Energy rating - current and potential")
# # st.write(address_res.find('desc', {'id': 'svg-desc'}).text)
# # st.image(address_res.find_all('svg', {'class': 'epc-energy-rating-graph'})[0])
# ratings = address_res.find('desc', {'id': 'svg-desc'}).text

# st.write('### Current EPC rating')
# current_rating = ratings.split(".")[0]
# st.write("##### " + current_rating)

# st.write('### Potential EPC rating')
# potential_rating = ratings.split(".")[1]
# st.write("##### " + potential_rating)

# new_property_df = pd.DataFrame(
#     {'address': [address_input],
#      'epc_certificate': [epc_certificate],
#      'current_epc_rating': [current_rating.split(' ')[-6]],
#      'current_epc_efficiency': [current_rating.split(' ')[-1]],
#      'potential_epc_rating': [potential_rating.split(' ')[-6]],
#      "potential_epc_efficiency": [potential_rating.split(' ')[-1]]}
# )

# st.write('### Changes that can be made:')
# improvements = address_res.find('div', {"class": "govuk-body printable-area epb-recommended-improvements"})

# if improvements is None:
#     st.write("No changes suggested")
# else:
#     changes = improvements.find_all('h3')
#     changes_impact = improvements.find_all('dl', {"class": 'govuk-summary-list'})

#     for element in zip(changes, changes_impact):
#         improvement_header = element[0].text
#         st.write("#### " + improvement_header)

#         improvement_text = element[1].text
#         st.write(improvement_text)

#         col_name = improvement_header.split(":")[1]
#         cost = element[1].find('dd', {"class": "govuk-summary-list__value"}).text.lstrip().rstrip()

#         impact = element[1].find('text', {"class": "govuk-!-font-weight-bold"}).text.split(" ")
#         impact_num = impact[0]
#         impact_cat = impact[1]
#         print(cost)
#         new_property_df[col_name] = True
#         # cost_column = col_name + '-cost'
#         # new_property_df.assign(cost_column=cost)
#         new_property_df[col_name + '-cost'] = cost
#         new_property_df[col_name + '-impact_num'] = impact_num
#         new_property_df[col_name + '-impact_cat'] = impact_cat
#         st.markdown("---")


def main() -> None:
    """Write every improvement id with an example recommendation to markdown.

    The table is written to ``improvement_description.md`` in the current
    working directory. Ids without any summary text still get a row.
    """
    pipeline = EPCRecommendationsPipeline(directories=directories, use_parallel=True)
    pipeline.determine_number_of_improvement_ids()
    pipeline.extract_improvement_description()

    # Sorted so that the rows of the generated table have a stable order.
    full_id = pd.DataFrame(
        sorted(pipeline.number_improvement_ids), columns=["IMPROVEMENT_ID"]
    )

    # NOTE(review): DataFrame.to_markdown needs the optional "tabulate"
    # package, which requirements.txt does not pin - confirm and add it.
    pipeline.improvement_description_df.merge(
        full_id, on="IMPROVEMENT_ID", how="right"
    ).to_markdown("improvement_description.md")


if __name__ == "__main__":
    main()

# ---------------------------------------------------------------------------
# etl/epc_recommendations/requirements.txt (added by the same patch):
#
#     beautifulsoup4==4.12.3
#     requests==2.31.0
#     pandas==2.2.2
#     tqdm==4.66.2
# ---------------------------------------------------------------------------