mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
202 lines
5.5 KiB
Python
202 lines
5.5 KiB
Python
from bs4 import BeautifulSoup
|
|
import pandas as pd
|
|
import time
|
|
from stealth_requests import StealthSession
|
|
import random
|
|
import os
|
|
from multiprocessing import Pool
|
|
from tqdm import tqdm
|
|
import re
|
|
import json
|
|
|
|
# Browser profiles passed as the ``impersonate`` option of each request;
# one is chosen at random per fetch.
ENGINES = ["safari", "chrome"]

# Folder holding the saved property pages, one ``<uprn>.html`` file each.
CACHE_DIR = "zoopla_cache"

# Make sure the cache folder is there before any worker touches it;
# an already existing folder is not an error.
os.makedirs(CACHE_DIR, exist_ok=True)
|
|
|
|
|
|
def random_delay(min_seconds=0.5, max_seconds=2.0):
    """Block the caller for a randomly chosen interval.

    Args:
        min_seconds: Lower bound of the pause in seconds (default 0.5).
        max_seconds: Upper bound of the pause in seconds (default 2.0).

    Returns:
        None.

    Raises:
        ValueError: Propagated from ``time.sleep`` if the drawn interval
            is negative.
    """
    time.sleep(random.uniform(min_seconds, max_seconds))
|
|
|
|
|
|
def extract_embedded_json(text):
    """Pull the property data embedded as JSON inside a page's raw text.

    Two strategies are tried in order:

    1. Match the span that starts at the ``"attributes"`` object and runs
       to the first ``]`` after ``"historicSales"``, wrap it in braces and
       parse it as a single JSON object. If that parses, it is returned
       unchanged.
    2. Otherwise search for each known key on its own and decode the
       complete object or array that follows it.

    Args:
        text: Raw page text to search.

    Returns:
        A dict with whatever could be parsed. It is empty when nothing
        matched; on the fallback path it contains only those known keys
        whose value decoded successfully.
    """
    span = re.search(
        r'"attributes"\s*:\s*\{.*?\}\s*,.*?"historicSales".*?\]',
        text,
        re.DOTALL,
    )
    if span:
        snippet = "{" + span.group(0) + "}"
        # Turn literal \u0022 escape sequences back into plain quotes.
        snippet = re.sub(r"\\u0022", '"', snippet)
        # Drop trailing commas sitting directly before a closing bracket.
        snippet = re.sub(r",(\s*[}\]])", r"\1", snippet)
        try:
            return json.loads(snippet)
        except json.JSONDecodeError:
            pass  # fall through to the per-key strategy below

    decoder = json.JSONDecoder()
    result = {}
    for key in (
        "attributes", "energy", "rentEstimate",
        "saleEstimate", "saleHistory", "historicSales",
    ):
        # Only consider occurrences whose value opens an object or array.
        key_match = re.search(rf'"{key}"\s*:\s*(?=[{{\[])', text)
        if key_match is None:
            continue
        try:
            # raw_decode consumes exactly one JSON value and ignores what
            # follows, so nested braces/brackets are balanced correctly
            # (a non-greedy regex would stop at the first closing one).
            value, _ = decoder.raw_decode(text[key_match.end():])
        except (ValueError, RecursionError):
            # Best effort: a value that cannot be decoded is left out.
            continue
        result[key] = value
    return result
|
|
|
|
|
|
def scrape_all_estimates(session, url):
    """Download one property page and collect what can be read from it.

    Args:
        session: HTTP session whose ``get`` accepts an ``impersonate``
            keyword.
        url: Address of the property page to fetch.

    Returns:
        A dict with the keys ``estimates`` (the ``sale-estimate`` div
        elements found), ``is_blocked`` (True when none were found),
        ``response_html`` (the raw page text) and ``attributes``,
        ``rentEstimate`` and ``historicSales`` taken from the embedded
        JSON, with empty defaults when absent.
    """
    engine = random.choice(ENGINES)
    page_source = session.get(url, impersonate=engine).text

    sale_blocks = BeautifulSoup(page_source, "html.parser").find_all(
        "div", {"data-testid": "sale-estimate"}
    )
    embedded = extract_embedded_json(page_source)

    scraped = {
        "estimates": sale_blocks,
        # A page without any estimate block is treated as a blocked response.
        "is_blocked": not sale_blocks,
        "response_html": page_source,
    }
    scraped["attributes"] = embedded.get("attributes", {})
    scraped["rentEstimate"] = embedded.get("rentEstimate", {})
    scraped["historicSales"] = embedded.get("historicSales", [])
    return scraped
|
|
|
|
|
|
def extract_estimates(estimates):
    """Read the low, middle and high figures from the first estimate block.

    Args:
        estimates: Sequence of parsed elements exposing
            ``find(tag, attrs)``; only the first one is inspected.

    Returns:
        A ``(low, mid, high)`` tuple holding the text of the matching
        elements. An entry is ``None`` when its element is not present,
        and all three are ``None`` when ``estimates`` is empty.
    """
    if not estimates:
        return None, None, None

    block = estimates[0]

    def _text_of(tag, test_id):
        # ``find`` yields None when nothing matches, so guard before
        # reading ``.text`` instead of raising AttributeError.
        node = block.find(tag, {"data-testid": test_id})
        return None if node is None else node.text

    return (
        _text_of("span", "low-estimate-blurred"),
        _text_of("p", "estimate-blurred"),
        _text_of("span", "high-estimate-blurred"),
    )
|
|
|
|
|
|
def cache_path_for_url(url):
    """Return the cache file path that belongs to a property URL.

    The file is named after the last non-empty path segment of ``url``
    (the UPRN for ``.../property/uprn/<uprn>/`` addresses), so the result
    is the same with or without a trailing slash.

    Args:
        url: Property page address.

    Returns:
        Path of the form ``<CACHE_DIR>/<segment>.html``.
    """
    # Strip trailing slashes first: indexing from the end of a plain
    # ``split("/")`` would pick the segment before the UPRN whenever the
    # trailing slash is missing, making every URL share one cache file.
    uprn = url.rstrip("/").rsplit("/", 1)[-1]
    return os.path.join(CACHE_DIR, f"{uprn}.html")
|
|
|
|
|
|
def parse_cached_html(url, html):
    """Rebuild a result row from a saved copy of a property page.

    Args:
        url: Address the page was fetched from; stored under ``"URL"``.
        html: Page text to parse.

    Returns:
        ``None`` when the page has no ``sale-estimate`` block. Otherwise a
        dict with ``URL`` and the three estimate fields, merged with the
        embedded ``attributes``, ``rentEstimate`` and the first
        ``historicSales`` entry (later sources override earlier keys).
    """
    sale_blocks = BeautifulSoup(html, "html.parser").find_all(
        "div", {"data-testid": "sale-estimate"}
    )
    embedded = extract_embedded_json(html)
    # An empty dict stands in when there is no sale history to merge.
    sales = embedded.get("historicSales") or [{}]

    if not sale_blocks:
        return None

    low, mid, high = extract_estimates(sale_blocks)
    headline = {
        "URL": url,
        "Low Estimate": low,
        "Middle Estimate": mid,
        "High Estimate": high,
    }
    return {
        **headline,
        **embedded.get("attributes", {}),
        **embedded.get("rentEstimate", {}),
        **sales[0],
    }
|
|
|
|
|
|
def parallel_task(url):
    """Produce the result row for one property URL, trying the cache first.

    A saved page is used when it exists and still yields a row. Otherwise
    the page is fetched up to five times with a random pause after each
    failed attempt; the first successful page is written to the cache and
    turned into a row.

    Args:
        url: Property page address.

    Returns:
        A dict with ``URL`` and the three estimate fields plus any
        embedded attributes, rent estimate and first sale record. When
        every attempt fails, the three estimate fields are ``None`` and
        no extra keys are present.
    """
    cache_path = cache_path_for_url(url)

    if os.path.exists(cache_path):
        with open(cache_path, "r", encoding="utf-8") as cached_file:
            saved_page = cached_file.read()
        row = parse_cached_html(url, saved_page)
        if row:
            return row

    with StealthSession() as session:
        for _ in range(5):
            scraped = scrape_all_estimates(session, url)

            # Nothing usable came back: wait a little, then try again.
            if scraped["is_blocked"] or not scraped["estimates"]:
                random_delay()
                continue

            page = scraped.get("response_html")
            if page:
                with open(cache_path, "w", encoding="utf-8") as cache_file:
                    cache_file.write(page)

            # An empty dict stands in when there is no sale history.
            sales = scraped.get("historicSales") or [{}]
            low, mid, high = extract_estimates(scraped["estimates"])

            return {
                "URL": url,
                "Low Estimate": low,
                "Middle Estimate": mid,
                "High Estimate": high,
                **scraped.get("attributes", {}),
                **scraped.get("rentEstimate", {}),
                **sales[0],
            }

    # Every attempt failed: return a placeholder row for this URL.
    return {
        "URL": url,
        "Low Estimate": None,
        "Middle Estimate": None,
        "High Estimate": None,
    }
|
|
|
|
|
|
def parse_price(p):
    """Convert a displayed price such as ``"£250k"`` into a number.

    Args:
        p: Price text. It may carry a ``£`` sign, thousands separators
            and a trailing ``k`` (thousands) or ``m`` (millions) suffix,
            in either letter case.

    Returns:
        The price as a float, or ``None`` when ``p`` is empty, is not a
        string, or does not contain a parsable number.
    """
    # Missing values can arrive as None or NaN rather than text.
    if not isinstance(p, str) or not p:
        return None

    cleaned = p.replace("£", "").replace(",", "").strip().lower()

    multiplier = 1
    if cleaned.endswith("k"):
        multiplier, cleaned = 1_000, cleaned[:-1]
    elif cleaned.endswith("m"):
        multiplier, cleaned = 1_000_000, cleaned[:-1]

    # One guarded conversion for every form, so malformed suffixed
    # values yield None instead of raising ValueError.
    try:
        return float(cleaned) * multiplier
    except ValueError:
        return None
|
|
|
|
|
|
if __name__ == "__main__":
    # Load the asset list and keep one row per known UPRN.
    source_workbook = (
        "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting "
        "Project/modelling_sample.xlsx"
    )
    asset_list = pd.read_excel(source_workbook, sheet_name="Standardised Asset List")

    has_uprn = asset_list["epc_os_uprn"].notna()
    asset_list = asset_list[has_uprn]
    asset_list = asset_list.drop_duplicates("epc_os_uprn")
    # Convert via int so the UPRN is rendered as a whole number before
    # it becomes the string used in URLs and as the merge key.
    asset_list["epc_os_uprn"] = asset_list["epc_os_uprn"].astype(int).astype(str)

    # One property page per UPRN.
    uprns = asset_list["epc_os_uprn"].tolist()
    urls = [f"https://www.zoopla.co.uk/property/uprn/{uprn}/" for uprn in uprns]

    # Scrape with two worker processes, showing progress as rows arrive.
    with Pool(processes=2) as pool:
        progress = tqdm(pool.imap(parallel_task, urls), total=len(urls))
        estimates_list = list(progress)

    # Tabulate the rows and derive the numeric valuation.
    df = pd.DataFrame(estimates_list)
    df["uprn"] = df["URL"].str.extract(r"uprn/(\d+)/")
    df["valuation"] = df["Middle Estimate"].apply(parse_price)

    df.to_csv("zoopla_estimates.csv", index=False)

    # Attach the valuation to every asset; assets without one are kept.
    valuations = df[["uprn", "valuation"]]
    merged = asset_list.merge(
        valuations,
        left_on="epc_os_uprn",
        right_on="uprn",
        how="left",
    )

    merged.to_excel(
        "20251029 AL Portfolio - Standardised - with valuations.xlsx",
        index=False,
    )

    print("Done. Results saved.")
|