diff --git a/backend/etl/etl_opendatacommunities/README.md b/backend/etl/etl_opendatacommunities/README.md new file mode 100644 index 00000000..bf16ba89 --- /dev/null +++ b/backend/etl/etl_opendatacommunities/README.md @@ -0,0 +1,14 @@ +This website https://epc.opendatacommunities.org/ has closed down on 30th May 2026 + +So we downloaded the data and moved everything to S3 ( s3://retrofit-data-dev/epc_opendatacommunities/master_backup/ ) + +This scripts assumes the following: + +1) You downloaded the master copy, uncompressed it and set it to a path so we can read the csv + + +The script funciton is: + +1) reads csv for all data, seperate each iteration by postcode +2) compresses the csv and save it in the location +2) only gets the postcode data, compresses and uploads to s3 -> location s3://retrofit-data-dev/epc_opendatacommunities//compressed data \ No newline at end of file diff --git a/backend/etl/etl_opendatacommunities/main.py b/backend/etl/etl_opendatacommunities/main.py new file mode 100644 index 00000000..30b4045a --- /dev/null +++ b/backend/etl/etl_opendatacommunities/main.py @@ -0,0 +1,144 @@ +from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait +from io import BytesIO +from pathlib import Path +from typing import Any + +import boto3 +import pandas as pd +from botocore.config import Config +from tqdm import tqdm + +from utils.logger import setup_logger + +logger = setup_logger() + +SRC_ROOT = Path("/workspaces/home/epc_data") +TMP_ROOT = Path("/tmp/epc_postcodes") +S3_BUCKET = "retrofit-data-dev" +S3_PREFIX = "epc_opendatacommunities" + +REC_COLS = { + "IMPROVEMENT_ITEM", + "IMPROVEMENT_SUMMARY_TEXT", + "IMPROVEMENT_DESCR_TEXT", + "IMPROVEMENT_ID", + "IMPROVEMENT_ID_TEXT", + "INDICATIVE_COST", +} + +# This scripts assume you downloading the zip, unzip it, and running it locally + + +def sanitise(pc: pd.Series) -> pd.Series: + return pc.astype("string").str.upper().str.replace(" ", "", regex=False) + + +def shard_la(la_dir: Path) -> None: + certs = pd.read_csv(la_dir / "certificates.csv", low_memory=False) + recs = pd.read_csv(la_dir / "recommendations.csv", low_memory=False) + merged = certs.merge(recs, on="LMK_KEY", how="left") + + merged["POSTCODE_CLEAN"] = sanitise(merged["POSTCODE"]) + before = len(merged) + merged = merged.dropna(subset=["POSTCODE_CLEAN"]) + merged = merged[merged["POSTCODE_CLEAN"] != ""] + dropped = before - len(merged) + if dropped: + logger.warning(f"{la_dir.name}: dropped {dropped} rows with empty postcode") + + for pc, group in merged.groupby("POSTCODE_CLEAN", sort=False): + out = TMP_ROOT / f"{pc}.csv" + group.drop(columns=["POSTCODE_CLEAN"]).to_csv( + out, mode="a", header=not out.exists(), index=False + ) + + +def list_existing_keys(s3: Any) -> set[str]: + existing: set[str] = set() + paginator = s3.get_paginator("list_objects_v2") + pages = paginator.paginate(Bucket=S3_BUCKET, Prefix=f"{S3_PREFIX}/") + for page in tqdm(pages, desc="list s3"): + for obj in page.get("Contents", []): + existing.add(obj["Key"]) + logger.info(f"Found {len(existing)} existing objects under {S3_PREFIX}/") + return existing + + +def upload_postcode(path: Path, s3: Any) -> None: + df = pd.read_csv(path, low_memory=False).drop_duplicates() + + cert_cols = [c for c in df.columns if c not in REC_COLS] + cert_only = df[cert_cols].drop_duplicates() + dupes = cert_only["LMK_KEY"].value_counts() + bad = dupes[dupes > 1] + if not bad.empty: + raise ValueError( + f"Postcode {path.stem}: LMK_KEY appears with conflicting cert data: " + f"{bad.index.tolist()[:5]}" + ) + + buf = BytesIO() + df.to_csv(buf, index=False, compression="gzip") + s3.put_object( + Bucket=S3_BUCKET, + Key=f"{S3_PREFIX}/{path.stem}/data.csv.gz", + Body=buf.getvalue(), + ContentType="text/csv", + ContentEncoding="gzip", + ) + + +def main(): + TMP_ROOT.mkdir(parents=True, exist_ok=True) + la_dirs = sorted( + p for p in SRC_ROOT.iterdir() if p.is_dir() and p.name.startswith("domestic-") + ) + logger.info(f"Sharding {len(la_dirs)} LA folders -> {TMP_ROOT}") + + # for la in tqdm(la_dirs, desc="shard"): + # shard_la(la) + + s3 = boto3.client( + "s3", + config=Config(max_pool_connections=512, retries={"max_attempts": 5, "mode": "standard"}), + ) + pc_files = sorted(TMP_ROOT.glob("*.csv")) + logger.info(f"Found {len(pc_files)} local shards") + + existing = list_existing_keys(s3) + todo = [p for p in pc_files if f"{S3_PREFIX}/{p.stem}/data.csv.gz" not in existing] + skipped = len(pc_files) - len(todo) + logger.info( + f"Uploading {len(todo)} shards (skipping {skipped} already in S3) -> " + f"s3://{S3_BUCKET}/{S3_PREFIX}/" + ) + + workers = 256 + todo_iter = iter(todo) + inflight: dict[Any, Path] = {} + pbar = tqdm(total=len(todo), desc="upload") + with ThreadPoolExecutor(max_workers=workers) as pool: + for _ in range(workers * 2): + pc = next(todo_iter, None) + if pc is None: + break + inflight[pool.submit(upload_postcode, pc, s3)] = pc + + while inflight: + done, _ = wait(inflight.keys(), return_when=FIRST_COMPLETED) + for fut in done: + pc = inflight.pop(fut) + try: + fut.result() + except Exception as e: + logger.error(f"{pc.name}: {e}") + raise + pbar.update(1) + nxt = next(todo_iter, None) + if nxt is not None: + inflight[pool.submit(upload_postcode, nxt, s3)] = nxt + pbar.close() + + +if __name__ == "__main__": + main()