added historic epc data class with shape

This commit is contained in:
Jun-te Kim 2026-05-08 12:03:35 +00:00
parent 32bf1cc98d
commit a39c3a0772
5 changed files with 27 additions and 40 deletions

View file

@ -15,16 +15,7 @@ logger = setup_logger()
SRC_ROOT = Path("/workspaces/home/epc_data")
TMP_ROOT = Path("/tmp/epc_postcodes")
S3_BUCKET = "retrofit-data-dev"
S3_PREFIX = "epc_opendatacommunities"
REC_COLS = {
"IMPROVEMENT_ITEM",
"IMPROVEMENT_SUMMARY_TEXT",
"IMPROVEMENT_DESCR_TEXT",
"IMPROVEMENT_ID",
"IMPROVEMENT_ID_TEXT",
"INDICATIVE_COST",
}
S3_PREFIX = "historical_epc"
# This script assumes you have downloaded the zip, unzipped it, and are running the script locally
@ -35,18 +26,16 @@ def sanitise(pc: pd.Series) -> pd.Series:
def shard_la(la_dir: Path) -> None:
certs = pd.read_csv(la_dir / "certificates.csv", low_memory=False)
recs = pd.read_csv(la_dir / "recommendations.csv", low_memory=False)
merged = certs.merge(recs, on="LMK_KEY", how="left")
merged["POSTCODE_CLEAN"] = sanitise(merged["POSTCODE"])
before = len(merged)
merged = merged.dropna(subset=["POSTCODE_CLEAN"])
merged = merged[merged["POSTCODE_CLEAN"] != ""]
dropped = before - len(merged)
certs["POSTCODE_CLEAN"] = sanitise(certs["POSTCODE"])
before = len(certs)
certs = certs.dropna(subset=["POSTCODE_CLEAN"])
certs = certs[certs["POSTCODE_CLEAN"] != ""]
dropped = before - len(certs)
if dropped:
logger.warning(f"{la_dir.name}: dropped {dropped} rows with empty postcode")
for pc, group in merged.groupby("POSTCODE_CLEAN", sort=False):
for pc, group in certs.groupby("POSTCODE_CLEAN", sort=False):
out = TMP_ROOT / f"{pc}.csv"
group.drop(columns=["POSTCODE_CLEAN"]).to_csv(
out, mode="a", header=not out.exists(), index=False
@ -67,9 +56,7 @@ def list_existing_keys(s3: Any) -> set[str]:
def upload_postcode(path: Path, s3: Any) -> None:
df = pd.read_csv(path, low_memory=False).drop_duplicates()
cert_cols = [c for c in df.columns if c not in REC_COLS]
cert_only = df[cert_cols].drop_duplicates()
dupes = cert_only["LMK_KEY"].value_counts()
dupes = df["LMK_KEY"].value_counts()
bad = dupes[dupes > 1]
if not bad.empty:
raise ValueError(
@ -95,12 +82,14 @@ def main():
)
logger.info(f"Sharding {len(la_dirs)} LA folders -> {TMP_ROOT}")
# for la in tqdm(la_dirs, desc="shard"):
# shard_la(la)
for la in tqdm(la_dirs, desc="shard"):
shard_la(la)
s3 = boto3.client(
"s3",
config=Config(max_pool_connections=512, retries={"max_attempts": 5, "mode": "standard"}),
config=Config(
max_pool_connections=512, retries={"max_attempts": 5, "mode": "standard"}
),
)
pc_files = sorted(TMP_ROOT.glob("*.csv"))
logger.info(f"Found {len(pc_files)} local shards")

View file

@ -0,0 +1,10 @@
from dataclasses import dataclass
@dataclass
class HistoricEpc:
    """Plain record describing the address details of one historic EPC entry.

    Every field is annotated as ``str``; the dataclass decorator generates
    ``__init__``, ``__repr__`` and ``__eq__`` from the fields in the order
    they are declared below.
    """

    # Address lines one to three, in that order.
    address1: str
    address2: str
    address3: str
    # Postcode of the property.
    postcode: str
    # NOTE(review): presumably the Unique Property Reference Number, kept as
    # a string rather than an int -- confirm against the data source.
    uprn: str

View file

@ -96,9 +96,3 @@ class HistoricEpc:
uprn: str
uprn_source: str
report_type: str
improvement_item: str
improvement_summary_text: str
improvement_descr_text: str
improvement_id: str
improvement_id_text: str
indicative_cost: str

View file

@ -47,9 +47,3 @@ class TestHistoricEpcLoading:
def test_report_type(self, epc: HistoricEpc) -> None:
assert epc.report_type == "100"
def test_improvement_summary_text(self, epc: HistoricEpc) -> None:
assert epc.improvement_summary_text == "Increase loft insulation to 270 mm"
def test_indicative_cost(self, epc: HistoricEpc) -> None:
assert epc.indicative_cost == "£100 - £350"

View file

@ -26,13 +26,13 @@ from backend.app.db.functions.materials_functions import get_materials
from collections import defaultdict
from sqlalchemy import func
PORTFOLIO_ID = 711
SCENARIOS = [1233]
PORTFOLIO_ID = 632
SCENARIOS = [1144]
scenario_names = {
1233: "Reach EPC C",
1144: "EPC C",
}
project_name = "Novus"
project_name = "Calico Refresh"
def get_data(portfolio_id, scenario_ids):