start work on pulling out all recommendations in relation to properties

This commit is contained in:
Michael Duong 2024-04-26 15:34:36 +01:00
parent 2f45ed8955
commit b8622457bd
2 changed files with 331 additions and 0 deletions

View file

@ -0,0 +1,327 @@
# Pipeline to combine recommendations and certificates data
import pandas as pd
from pathlib import Path
from tqdm import tqdm
import multiprocessing as mp
import itertools
import requests
from bs4 import BeautifulSoup
import time
# Folder that holds the extracted EPC data, resolved relative to this file:
# <two levels up>/epc/local_data/all-domestic-certificates
DATA_DIRECTORY = Path(__file__).parent.parent.joinpath(
    "epc", "local_data", "all-domestic-certificates"
)
# Every immediate sub-directory of DATA_DIRECTORY (plain files are skipped).
# NOTE: this listing happens at import time, so the folder must exist.
directories = [path for path in DATA_DIRECTORY.iterdir() if path.is_dir()]
# Start with one folder in the local_data directory
class EPCRecommendationsPipeline:
    """
    Collect EPC improvement recommendations from a set of data directories.

    Each directory is expected to contain a ``recommendations.csv`` (and, for
    the full dataset task, a ``certificates.csv``). The public methods fan a
    per-directory task out over ``self.directories`` and store the combined
    result on the instance:

    * ``determine_number_of_improvement_ids`` -> ``self.number_improvement_ids``
    * ``extract_improvement_description`` -> ``self.improvement_description_df``
    """

    SEARCH_POSTCODE_URL = "https://find-energy-certificate.service.gov.uk/find-a-certificate/search-by-postcode?postcode={postcode_input}"
    BASE_ENERGY_URL = "https://find-energy-certificate.service.gov.uk"
    HEADERS = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36"
    }
    # Seconds to wait on the certificate website before giving up; without a
    # timeout a stalled connection would block the scrape forever.
    REQUEST_TIMEOUT = 30

    def __init__(self, directories: list, use_parallel: bool = True):
        """
        :param directories: directories to process, one task call per entry
        :param use_parallel: run the per-directory tasks in a process pool when
            True, otherwise run them one after another in this process
        """
        self.directories = directories
        self.use_parallel = use_parallel

    def determine_number_of_improvement_ids(self):
        """
        Store the set of distinct IMPROVEMENT_ID values found across all
        directories in ``self.number_improvement_ids``.
        """
        results = self._run_over_directories(
            self._task_check_number_of_improvement_ids
        )
        self.number_improvement_ids = set(itertools.chain.from_iterable(results))

    def extract_improvement_description(self):
        """
        Store one example recommendation row per IMPROVEMENT_ID in
        ``self.improvement_description_df``.

        Every directory contributes at most one row per id; the final row for
        each id is then picked at random from those candidates.

        :raises ValueError: if there are no directories to process
        """
        results = self._run_over_directories(
            self._task_extract_improvement_description
        )
        if not results:
            raise ValueError("No directories to extract improvement descriptions from")
        combined = pd.concat(results)
        self.improvement_description_df = combined.groupby("IMPROVEMENT_ID").sample(1)
        # TODO: optionally enrich the result via
        # self._get_descriptions_of_improvements(...), which reads the
        # POSTCODE, ADDRESS1, IMPROVEMENT_ITEM and IMPROVEMENT_ID columns.

    def _run_over_directories(self, task) -> list:
        """Apply ``task`` to every directory, honouring ``self.use_parallel``."""
        if self.use_parallel:
            with mp.Pool() as pool:
                return list(
                    tqdm(
                        pool.imap(task, self.directories),
                        total=len(self.directories),
                    )
                )
        return [task(directory) for directory in tqdm(self.directories)]

    @staticmethod
    def _load_recommendations(directory: Path) -> pd.DataFrame:
        """Read ``recommendations.csv``, keeping rows with an integer IMPROVEMENT_ID."""
        recommendations_df = pd.read_csv(directory / "recommendations.csv")
        # .copy() so the column assignment below works on an independent frame
        # rather than on a filtered view of the frame that was read.
        recommendations_df = recommendations_df[
            recommendations_df["IMPROVEMENT_ID"].notna()
        ].copy()
        recommendations_df["IMPROVEMENT_ID"] = recommendations_df[
            "IMPROVEMENT_ID"
        ].astype(int)
        return recommendations_df

    def _task_check_number_of_improvement_ids(self, directory: Path):
        """
        Per-directory task: return the distinct IMPROVEMENT_ID values (as a
        list of ints) present in the directory's recommendations.
        """
        recommendations_df = self._load_recommendations(directory)
        return recommendations_df["IMPROVEMENT_ID"].unique().tolist()

    def _task_extract_improvement_description(self, directory: Path) -> pd.DataFrame:
        """
        Per-directory task: return one recommendation row per IMPROVEMENT_ID.

        Rows without an IMPROVEMENT_SUMMARY_TEXT are ignored. For each id the
        first remaining row in file order is kept, and the result is ordered
        by IMPROVEMENT_ID.
        """
        recommendations_df = self._load_recommendations(directory)
        recommendations_df = recommendations_df[
            recommendations_df["IMPROVEMENT_SUMMARY_TEXT"].notna()
        ]
        return recommendations_df.drop_duplicates(subset="IMPROVEMENT_ID").sort_values(
            "IMPROVEMENT_ID"
        )

    def _task_extract_full_improvement_dataset(self, directory: Path) -> pd.DataFrame:
        """
        Per-directory task: join the latest certificate of every UPRN with its
        recommendations.

        Flow:
        load the certificates,
        keep the first certificate per UPRN after sorting by LODGEMENT_DATE
        descending,
        load the recommendations,
        inner-merge both on LMK_KEY.
        """
        certificates_df = pd.read_csv(directory / "certificates.csv")
        # assumes LODGEMENT_DATE sorts chronologically as stored (e.g. ISO
        # formatted dates) - TODO confirm
        certificates_df = (
            certificates_df.sort_values("LODGEMENT_DATE", ascending=False)
            .groupby("UPRN")
            .head(1)
            .reset_index(drop=True)
        )
        recommendations_df = self._load_recommendations(directory)
        return certificates_df.merge(recommendations_df, on="LMK_KEY", how="inner")

    def _fetch_page(self, url: str):
        """Download ``url`` and return it parsed with BeautifulSoup."""
        response = requests.get(
            url, headers=self.HEADERS, timeout=self.REQUEST_TIMEOUT
        )
        # Fail loudly on HTTP errors instead of parsing an error page.
        response.raise_for_status()
        return BeautifulSoup(response.text, features="html.parser")

    def _find_certificate_url(self, postcode: str, address_line: str) -> str:
        """Return the certificate page URL listed for ``address_line`` under ``postcode``."""
        search_page = self._fetch_page(
            self.SEARCH_POSTCODE_URL.format(postcode_input=postcode.replace(" ", "+"))
        )
        # Listed address (commas removed) -> absolute certificate URL.
        address_links = {
            link.text.strip().replace(",", ""): self.BASE_ENERGY_URL + link["href"]
            for link in search_page.find_all(
                "a", {"class": "govuk-link", "rel": "nofollow"}
            )
        }
        wanted = address_line.replace(",", "")
        matches = [
            (address, url) for address, url in address_links.items() if wanted in address
        ]
        if not matches:
            raise ValueError("Address not found")
        if len(matches) == 1:
            return matches[0][1]
        # Substring matching can hit several listings ("1 High Street" is also
        # contained in "11 High Street"), so require the first token of the
        # wanted address to appear as a whole word in the listing.
        first_token = wanted.split(" ")[0]
        for address, url in matches:
            if first_token in address.split(" "):
                return url
        raise ValueError("Multiple addresses found")

    @staticmethod
    def _extract_improvement_title(certificate_page, improvement_item) -> str:
        """Return the title of the ``improvement_item``-th (1-based) recommendation on a certificate page."""
        improvements = certificate_page.find(
            "div",
            {"class": "govuk-body printable-area epb-recommended-improvements"},
        )
        if improvements is None:
            raise ValueError("No recommended improvements found on certificate page")
        # Each recommendation is an <h3> heading paired with a summary list.
        steps = list(
            zip(
                improvements.find_all("h3"),
                improvements.find_all("dl", {"class": "govuk-summary-list"}),
            )
        )
        position = int(improvement_item)
        # Guard explicitly: an item of 0 would otherwise index from the end.
        if not 1 <= position <= len(steps):
            raise ValueError(
                f"Improvement item {position} not found on certificate page"
            )
        header = steps[position - 1][0].text
        # The part after the first ":" is taken as the title; fall back to the
        # whole heading if there is no colon.
        _, separator, title = header.partition(":")
        return (title if separator else header).strip()

    def _get_descriptions_of_improvements(
        self, improvement_description_df: pd.DataFrame
    ) -> dict[int, str]:
        """
        For each row of the improvement descriptions, get the description of
        the improvement via web scraping.

        The row's POSTCODE and ADDRESS1 are used to locate the property's
        certificate page; the heading of the recommendation at position
        IMPROVEMENT_ITEM on that page becomes the description.

        :param improvement_description_df: frame with POSTCODE, ADDRESS1,
            IMPROVEMENT_ITEM and IMPROVEMENT_ID columns
        :return: mapping of IMPROVEMENT_ID to the scraped description
        :raises ValueError: if the address or the recommendation cannot be
            identified unambiguously
        :raises requests.RequestException: on network or HTTP errors
        """
        improvement_description_mapping = {}
        for row in improvement_description_df.itertuples():
            # TODO: requests are sent back to back; consider throttling
            # (e.g. time.sleep) to be gentle on the service.
            certificate_url = self._find_certificate_url(row.POSTCODE, row.ADDRESS1)
            certificate_page = self._fetch_page(certificate_url)
            # TODO: the <desc id="svg-desc"> element of the certificate page
            # was explored as a source for current/potential ratings; that
            # information is not extracted yet.
            improvement_description_mapping[row.IMPROVEMENT_ID] = (
                self._extract_improvement_title(certificate_page, row.IMPROVEMENT_ITEM)
            )
        return improvement_description_mapping
# headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36'}
# postcode_input = postcode_input.replace(" ", "+")
# postcode_search = SEARCH_POSTCODE_URL.format(postcode_input=postcode_input)
# postcode_response = requests.get(postcode_search, headers=headers)
# postcode_res = BeautifulSoup(postcode_response.text)
# address_links_full = postcode_res.findAll('a', {'class': 'govuk-link', 'rel': 'nofollow'})
# address_links = {element.text.lstrip().rstrip(): BASE_ENERGY_URL + element['href'] for element in address_links_full}
# address_input = st.selectbox('Please select an address:', address_links.keys())
# if address_input is None:
# st.stop()
# chosen_epc = address_links[address_input]
# st.write("### The EPC Certificate of this property is:")
# epc_certificate = chosen_epc.split('/')[-1]
# st.write("##### " + epc_certificate)
# address_response = requests.get(chosen_epc, headers=headers)
# address_res = BeautifulSoup(address_response.text)
# svg = address_res.find("svg", {'class': 'epc-energy-rating-graph'})
# render_svg(svg)
# st.write("## Energy rating - current and potential")
# # st.write(address_res.find('desc', {'id': 'svg-desc'}).text)
# # st.image(address_res.find_all('svg', {'class': 'epc-energy-rating-graph'})[0])
# ratings = address_res.find('desc', {'id': 'svg-desc'}).text
# st.write('### Current EPC rating')
# current_rating = ratings.split(".")[0]
# st.write("##### " + current_rating)
# st.write('### Potential EPC rating')
# potential_rating = ratings.split(".")[1]
# st.write("##### " + potential_rating)
# new_property_df = pd.DataFrame(
# {'address': [address_input],
# 'epc_certificate': [epc_certificate],
# 'current_epc_rating': [current_rating.split(' ')[-6]],
# 'current_epc_efficiency': [current_rating.split(' ')[-1]],
# 'potential_epc_rating': [potential_rating.split(' ')[-6]],
# "potential_epc_efficiency": [potential_rating.split(' ')[-1]]}
# )
# st.write('### Changes that can be made:')
# improvements = address_res.find('div', {"class": "govuk-body printable-area epb-recommended-improvements"})
# if improvements is None:
# st.write("No changes suggested")
# else:
# changes = improvements.find_all('h3')
# changes_impact = improvements.find_all('dl', {"class": 'govuk-summary-list'})
# for element in zip(changes, changes_impact):
# improvement_header = element[0].text
# st.write("#### " + improvement_header)
# improvement_text = element[1].text
# st.write(improvement_text)
# col_name = improvement_header.split(":")[1]
# cost = element[1].find('dd', {"class": "govuk-summary-list__value"}).text.lstrip().rstrip()
# impact = element[1].find('text', {"class": "govuk-!-font-weight-bold"}).text.split(" ")
# impact_num = impact[0]
# impact_cat = impact[1]
# print(cost)
# new_property_df[col_name] = True
# # cost_column = col_name + '-cost'
# # new_property_df.assign(cost_column=cost)
# new_property_df[col_name + '-cost'] = cost
# new_property_df[col_name + '-impact_num'] = impact_num
# new_property_df[col_name + '-impact_cat'] = impact_cat
# st.markdown("---")
if __name__ == "__main__":
    # Keep the pipeline run behind the main guard: the pipeline starts a
    # multiprocessing pool, and worker processes may re-import this module.
    pipeline = EPCRecommendationsPipeline(directories=directories, use_parallel=True)
    pipeline.determine_number_of_improvement_ids()
    pipeline.extract_improvement_description()

    # Sort the ids so the rows of the generated table come out in a stable,
    # readable order (a set has no defined ordering).
    full_id = pd.DataFrame(
        {"IMPROVEMENT_ID": sorted(pipeline.number_improvement_ids)}
    )
    # Right join: every observed improvement id gets a row, even when no
    # example description row was extracted for it.
    # NOTE: DataFrame.to_markdown needs the optional `tabulate` package.
    pipeline.improvement_description_df.merge(
        full_id, on="IMPROVEMENT_ID", how="right"
    ).to_markdown("improvement_description.md")

View file

@ -0,0 +1,4 @@
beautifulsoup4==4.12.3
requests==2.31.0
pandas==2.2.2
tqdm==4.66.2