added walthamforest etl process

This commit is contained in:
Jun-te Kim 2025-09-23 10:41:53 +00:00
parent 15465eb6e0
commit 6c214c9f89
4 changed files with 891 additions and 15 deletions

View file

@ -0,0 +1,60 @@
"""added more enums
Revision ID: 4c67501b7451
Revises: ac8dba8cef50
Create Date: 2025-09-23 10:22:20.648664
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "4c67501b7451"
down_revision: Union[str, None] = "ac8dba8cef50"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None

# Name of the database enum type that this migration alters.
ENUM_NAME = "reporttype"

# Values that were already present BEFORE this migration.
# downgrade() rebuilds the enum type from exactly this list.
OLD_VALUES = (
    "QUIDOS_PRESITE_NOTE",
    "CHARTED_SURVEYOR_REPORT",
    "ENERGY_PERFORMANCE_REPORT",
    "U_VALUE_CALCULATOR_REPORT",
    "OVERWRITING_U_VALUE_DECLARATION_FORM",
    "OSMOSIS_CONDITION_PAS_2035_REPORT",
    "DOMNA_CONDITION_PAS_2035_REPORT",
)

# Values we are ADDING in this migration; upgrade() adds each one to the enum type.
NEW_VALUES = (
    "DECENT_HOMES_RAW_DATA",
    "DECENT_HOMES_SUMMARY",
    "DECENT_HOMES_PROPERTY_META",
)
def upgrade() -> None:
    """Add every label in ``NEW_VALUES`` to the ``ENUM_NAME`` enum type.

    ``IF NOT EXISTS`` makes each statement a no-op when the label is already
    present, so re-running the upgrade is harmless.
    """
    # ALTER TYPE ... ADD VALUE is rejected inside a transaction block by
    # PostgreSQL < 12, and Alembic wraps migrations in a transaction by
    # default. autocommit_block() runs the statements outside of it.
    with op.get_context().autocommit_block():
        for value in NEW_VALUES:
            op.execute(f"ALTER TYPE {ENUM_NAME} ADD VALUE IF NOT EXISTS '{value}'")
def downgrade() -> None:
    """Rebuild the ``ENUM_NAME`` type so it only contains ``OLD_VALUES``.

    A replacement type is created, the two columns that use the enum are
    re-typed onto it through a text cast, the original type is dropped and
    the replacement takes over its name.
    """
    # NOTE(review): the text cast presumably fails if any row still stores one
    # of NEW_VALUES - confirm such rows are migrated or removed beforehand.
    replacement_type = f"{ENUM_NAME}_old"

    # 1) Replacement type holding ONLY the pre-migration labels
    quoted_labels = ", ".join(f"'{label}'" for label in OLD_VALUES)
    op.execute(f"CREATE TYPE {replacement_type} AS ENUM ({quoted_labels})")

    # 2) Point every enum-typed column at the replacement type
    enum_columns = (
        ("documents", "document_type"),
        ("uploaded_files", "doc_type"),
    )
    for table_name, column_name in enum_columns:
        op.execute(
            f"ALTER TABLE {table_name} ALTER COLUMN {column_name} TYPE {replacement_type} "
            f"USING {column_name}::text::{replacement_type}"
        )

    # 3) Retire the original type and give the replacement its name
    op.execute(f"DROP TYPE {ENUM_NAME}")
    op.execute(f"ALTER TYPE {replacement_type} RENAME TO {ENUM_NAME}")

View file

@ -0,0 +1,38 @@
"""added more report type
Revision ID: ac8dba8cef50
Revises: a8cc4a5fccb6
Create Date: 2025-09-23 10:14:54.461633
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
# `revision` identifies this migration; `down_revision` names the migration it follows.
revision: str = 'ac8dba8cef50'
down_revision: Union[str, None] = 'a8cc4a5fccb6'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Remove the server-side default from ``uploaded_files.id``."""
    # Passing server_default=None asks Alembic to drop the column default;
    # the existing_* arguments only describe the column as it is today.
    op.alter_column(
        table_name='uploaded_files',
        column_name='id',
        existing_type=sa.UUID(),
        existing_nullable=False,
        server_default=None,
    )
def downgrade() -> None:
    """Restore ``gen_random_uuid()`` as the server default of ``uploaded_files.id``."""
    uuid_default = sa.text('gen_random_uuid()')
    op.alter_column(
        table_name='uploaded_files',
        column_name='id',
        existing_type=sa.UUID(),
        existing_nullable=False,
        server_default=uuid_default,
    )

View file

@ -222,29 +222,45 @@ def generate_file_uri(UPRN):
file_uri = f"https://retrofit-energy-assessments-dev.s3.eu-west-2.amazonaws.com/documents/{UPRN}/"
return file_uri
def create_or_update_uploaded_file_entry(
    db_session,
    uprn: str,
    doc_type: ReportType,
    json_uri: str,
    s3_file_uri: str,
):
    """
    Create or update an entry in uploaded_files.

    - If a record with the same (uprn, doc_type) exists, update it.
    - Otherwise, insert a new record.

    Args:
        db_session: Session used for the query, the insert and the commit.
        uprn: Property reference stored on the record.
        doc_type: Report type stored on the record.
        json_uri: Value written to ``s3_json_uri``.
        s3_file_uri: Value written to ``s3_file_uri``.

    Returns:
        The inserted or updated ORM object, committed and refreshed.
    """
    uploaded_at = datetime.now(timezone.utc)

    # Several rows can share (uprn, doc_type) when earlier runs only ever
    # inserted. Pick the most recently uploaded one instead of using
    # one_or_none(), which raises as soon as a duplicate exists.
    # NOTE(review): rows with a NULL timestamp may sort first on some
    # databases - confirm none exist.
    existing = (
        db_session.query(uploaded_files)
        .filter(uploaded_files.uprn == uprn, uploaded_files.doc_type == doc_type)
        .order_by(uploaded_files.s3_json_upload_timestamp.desc())
        .first()
    )

    if existing is not None:
        # Update existing record
        existing.s3_json_uri = json_uri
        existing.s3_json_upload_timestamp = uploaded_at
        existing.s3_file_uri = s3_file_uri
        obj = existing
    else:
        # Insert new record
        obj = uploaded_files(
            doc_type=doc_type,
            s3_json_uri=json_uri,
            s3_json_upload_timestamp=uploaded_at,
            s3_file_uri=s3_file_uri,
            uprn=uprn,
        )
        db_session.add(obj)

    db_session.commit()
    db_session.refresh(obj)
    return obj
def handler(event, context):
@ -280,7 +296,7 @@ def handler(event, context):
property_decent_home, decent_home_meta = decent_homes_calc(filepath)
json_uri_1 = upload_json_to_s3(property_decent_home, generate_file_uri(uprn), location="decent_homes/property_decent_home")
with get_db_session() as session:
create_uploaded_file_entry(
create_or_update_uploaded_file_entry(
db_session=session,
uprn=uprn,
doc_type=ReportType.DECENT_HOMES_SUMMARY,
@ -289,7 +305,7 @@ def handler(event, context):
)
json_uri_1 = upload_json_to_s3(decent_home_meta, generate_file_uri(uprn), location="decent_homes/decent_homes_meta")
with get_db_session() as session:
create_uploaded_file_entry(
create_or_update_uploaded_file_entry(
db_session=session,
uprn=uprn,
doc_type=ReportType.DECENT_HOMES_SUMMARY,

View file

@ -0,0 +1,762 @@
import json
import os
import pandas as pd
from datetime import datetime
from docutils.nodes import table
def years_between(d1, d2):
    """Return the number of whole years from ``d2`` up to ``d1``.

    The calendar-year difference is reduced by one when ``d1`` falls before
    the month/day anniversary of ``d2`` within its year.
    """
    elapsed_years = d1.year - d2.year
    anniversary_not_reached = (d1.month, d1.day) < (d2.month, d2.day)
    if anniversary_not_reached:
        elapsed_years -= 1
    return elapsed_years
def get_element(elements, label):
    """Look up the element stored under ``label``; returns ``None`` when it is absent."""
    element = elements.get(label, None)
    return element
def append_result(decent_homes_meta, criteria, variable, sub_variable, result, install_date=None, expiry_date=None):
    """Append one assessment row to ``decent_homes_meta`` (modified in place).

    The HHSRS rank and score fields are always written as ``None``.
    """
    row = dict(
        criteria=criteria,
        variable=variable,
        sub_variable=sub_variable,
        result=result,
        hhsrs_rank=None,
        hhsrs_score=None,
        install_date=install_date,
        expiry_date=expiry_date,
    )
    decent_homes_meta.append(row)
# ---------------------------------------------------------------------------
# Criterion A (HHSRS hazards)
# ---------------------------------------------------------------------------
# Outcome recorded for each attribute code the survey uses on HHSRS elements.
_HHSRS_ATTRIBUTE_RESULTS = {
    "TYPRISK": "pass",
    "MODRISK": "fail",
    "TOBEASSESS": "no_data",
}

# HHSRS hazard -> {Waltham Forest element code: sub-variable name}.
# Most hazards map to a single element code; two expand to several.
_HHSRS_ELEMENTS = {
    "damp_and_mould_growth": {"HHSRSDAMP": "damp_and_mould_growth"},
    "excess_cold": {"HHSRSCOLD": "excess_cold"},
    "excess_heat": {"HHSRSHEAT": "excess_heat"},
    "asbestos_and_mm_fibres": {"HHSRSASB": "asbestos_and_mm_fibres"},
    "biocides": {"HHSRSBIOC": "biocides"},
    "carbon_monoxide_and_fuel_combustion_products": {
        "HHSRSCO": "carbon_monoxide",
        "HHSRSSO2": "sulphur_dioxide_and_smoke",
        "HHSRSNO2": "nitrogen_dioxide",
    },
    "lead": {"HHSRSLEAD": "lead"},
    "radiation": {"HHSRSRADIA": "radiation"},
    "uncombusted_fuel_gas": {"HHSRSFUEL": "uncombusted_fuel_gas"},
    "volatile_organic_compounds": {"HHSRSORGAN": "volatile_organic_compounds"},
    "crowding_and_space": {"HHSRSCROWD": "crowding_and_space"},
    "entry_by_intruders": {"HHSRSENTRY": "entry_by_intruders"},
    "lighting": {"HHSRSLIGHT": "lighting"},
    "noise": {"HHSRSNOISE": "noise"},
    "domestic_hygiene_pests_and_refuse": {"HHSRSDOMES": "domestic_hygiene_pests_and_refuse"},
    "food_safety": {"HHSRSFOOD": "food_safety"},
    "personal_hygiene_sanitation_and_drainage": {"HHSRSPERS": "personal_hygiene_sanitation_and_drainage"},
    "water_supply": {"HHSRSWATER": "water_supply"},
    "falls_associated_with_baths": {"HHSRSFBATH": "falls_associated_with_baths"},
    "falls_on_level_surfaces": {"HHSRSFLEVE": "falls_on_level_surfaces"},
    "falls_on_stairs_and_steps": {"HHSRSFSTAI": "falls_on_stairs_and_steps"},
    "falls_between_levels": {"HHSRSFBETW": "falls_between_levels"},
    "electrical_hazards": {"HHSRSELEC": "electrical_hazards"},
    "fire": {"HHSRSFIRE": "fire"},
    "flames_hot_surfaces_and_materials": {"HHSRSFLAME": "flames_hot_surfaces_and_materials"},
    "collision_and_entrapment": {
        "HHSRSENTRP": "collision_and_entrapment",
        "HHSRSCLOW": "collision_hazards_and_low_headroom",
    },
    "explosions": {"HHSRSEXPLO": "explosions"},
    "ergonomics": {"HHSRSPOSI": "ergonomics"},
    "structural_collapse_and_falling_elements": {"HHSRSSTRUC": "structural_collapse_and_falling_elements"},
}

# ---------------------------------------------------------------------------
# Criterion B (component age and condition)
# ---------------------------------------------------------------------------
# Component -> element labels (keys of data["elements"]) assessed for it.
_B_COMPONENT_LABELS = {
    # Key components
    "wall_structure": [
        "Wall Structure in External Area",
    ],
    "lintels": [
        "Lintels in External Area",
    ],
    "brickwork_spalling": [
        "Wall Spalling in External Area",
    ],
    "wall_finish": [
        "Wall Finish 1 in External Area",
        "Wall Finish 2 in External Area",
        "External Decorations in External Area",
        "Brickwork Pointing in External Area",
    ],
    "roof_structure": [
        "Roof Structure 1 in External Area",
        "Roof Structure 2 in External Area",
        "Roof Structure 3 in External Area",
        "Garage Roof in External Area",
        "Garage and Store Roofs in External Area",
        "Store Roof in External Area",
        "Fascia / Soffit / Bargeboard in External Area",
        "Gutters in External Area",
        "Downpipes in External Area",
        "Internal Downpipes in External Area",
    ],
    "roof_finish": [
        "Roof Covering 1 in External Area",
        "Roof Covering 2 in External Area",
        "Roof Covering 3 in External Area",
    ],
    "chimneys": [
        "Chimneys in External Area",
    ],
    "windows": [
        "Windows in Property",
        "Windows 1 in External Area",
        "Windows 2 in External Area",
        "Garage and Store Windows in External Area",
        "Garage Windows in External Area",
        "Store Windows in External Area",
    ],
    "external_doors": [
        "Type and Location of Front Door in Property",
        "Front Door Fire Rating in Property",
        "Patio and French Doors 1 in External Area",
        "Back and Side Doors 1 in External Area",
        "Back and Side Doors 2 in External Area",
        "Garage and Store Doors in External Area",
        "Garage Door in External Area",
        "Store Door in External Area",
    ],
    "central_heating_boiler": [
        "Boiler Fuel in Property",
        "Type of Water Heating in Property",
    ],
    "heating_other": [
        "Boiler Fuel in Property",
        "Type of Water Heating in Property",
    ],
    "electrical_systems": [
        "Electrics Required in Property",
    ],
    # Other components
    "kitchen": [
        "Adequacy of Kitchen and Type in Property",
    ],
    "bathroom": [
        "Adequacy of Bathroom Location in Property",
    ],
    "central_heating_distribution_system": [
        "Heating Distribution System in Property",
    ],
}

# A single failing key component fails Criterion B ...
_KEY_COMPONENTS = {
    "wall_structure", "lintels", "brickwork_spalling", "wall_finish",
    "roof_structure", "roof_finish", "chimneys", "windows",
    "external_doors", "central_heating_boiler", "heating_other",
    "electrical_systems",
}
# ... whereas two or more of the other components have to fail.
_OTHER_COMPONENTS = {
    "kitchen", "bathroom", "central_heating_distribution_system",
}

# Component lifetime in years per property type (None = not applicable).
_COMPONENT_LIFESPANS = {
    # Key components
    "wall_structure": {"house": 80, "flat_below_6_storeys": 80, "flat_above_6_storeys": 80},
    "lintels": {"house": 60, "flat_below_6_storeys": 60, "flat_above_6_storeys": 60},
    "brickwork_spalling": {"house": 30, "flat_below_6_storeys": 30, "flat_above_6_storeys": 30},
    "wall_finish": {"house": 60, "flat_below_6_storeys": 60, "flat_above_6_storeys": 30},
    "roof_structure": {"house": 50, "flat_below_6_storeys": 30, "flat_above_6_storeys": 30},
    "roof_finish": {"house": 50, "flat_below_6_storeys": 30, "flat_above_6_storeys": 30},
    "chimneys": {"house": 50, "flat_below_6_storeys": 50, "flat_above_6_storeys": None},
    "windows": {"house": 40, "flat_below_6_storeys": 30, "flat_above_6_storeys": 30},
    "external_doors": {"house": 40, "flat_below_6_storeys": 30, "flat_above_6_storeys": 30},
    "central_heating_boiler": {"house": 15, "flat_below_6_storeys": 15, "flat_above_6_storeys": 15},
    "heating_other": {"house": 30, "flat_below_6_storeys": 30, "flat_above_6_storeys": 30},
    "electrical_systems": {"house": 30, "flat_below_6_storeys": 30, "flat_above_6_storeys": 30},
    # Other components
    "kitchen": {"house": 30, "flat_below_6_storeys": 30, "flat_above_6_storeys": 30},
    "bathroom": {"house": 40, "flat_below_6_storeys": 40, "flat_above_6_storeys": 40},
    "central_heating_distribution_system": {
        "house": 40, "flat_below_6_storeys": 40, "flat_above_6_storeys": 40,
    },
}

# Attribute codes meaning "nothing to assess" for a Criterion B element.
_B_NO_DATA_CODES = ("UNKNOWN", "NONE", "UNKNOWNG", "UNKNOWNS")
# Attribute code that exempts the heating components from the age check.
_NO_HEATING_CODE = "NONENOCH"

# ---------------------------------------------------------------------------
# Criterion C (modern facilities)
# ---------------------------------------------------------------------------
# Explicit age limits; these differ from the Criterion B component lifespans.
_KITCHEN_MAX_AGE_YEARS = 20
_BATHROOM_MAX_AGE_YEARS = 30

# Element labels as they appear as keys of data["elements"].
_LABEL_KITCHEN = "Adequacy of Kitchen and Type in Property"
_LABEL_BATHROOM = "Adequacy of Bathroom Location in Property"
_LABEL_NOISE = "Adequacy of Noise Insulation in Property"


def _resolve_property_type(property_info):
    """Translate the survey's PROP TYPE code into a ``_COMPONENT_LIFESPANS`` key."""
    prop_type = property_info["PROP TYPE"]
    if prop_type == "HOU":
        return "house"
    if prop_type == "FLA":
        raise NotImplementedError("Implement distrinction between below and above 6 storeys")
    raise NotImplementedError("Unknown property type")


def _require_element(elements, label, missing_message):
    """Return the element stored under ``label`` or raise NotImplementedError."""
    element = get_element(elements, label)
    if not element:
        raise NotImplementedError(missing_message)
    return element


def _results_for(decent_homes_meta, criteria):
    """Collect the ``result`` of every row recorded for one criterion."""
    return [row["result"] for row in decent_homes_meta if row["criteria"] == criteria]


def _fail_pass_or_no_data(results):
    """Roll results up: any fail -> "fail", all pass -> "pass", otherwise "no_data"."""
    if any(result == "fail" for result in results):
        return "fail"
    if all(result == "pass" for result in results):
        return "pass"
    return "no_data"


def _evaluate_criterion_a(elements, decent_homes_meta):
    """Record one row per surveyed HHSRS element and return the Criterion A result."""
    # Index the survey once so each element code is a dictionary lookup.
    elements_by_code = {}
    for element in elements.values():
        elements_by_code.setdefault(element["ELEMENT CODE"], []).append(element)

    for hazard, sub_variables in _HHSRS_ELEMENTS.items():
        # Every element code of the hazard is checked, not only the first one.
        for element_code, sub_variable in sub_variables.items():
            surveyed = elements_by_code.get(element_code, [])
            results = []
            for element in surveyed:
                attribute_code = element["ATTRIBUTE CODE"]
                if attribute_code not in _HHSRS_ATTRIBUTE_RESULTS:
                    raise ValueError(
                        f"Unknown attribute code {attribute_code!r} for {element_code}"
                    )
                results.append(_HHSRS_ATTRIBUTE_RESULTS[attribute_code])
            if not results:
                # The code is missing from the survey: record that explicitly
                # so the hazard cannot pass by omission.
                results.append("no_data")
            for result in results:
                append_result(
                    decent_homes_meta,
                    criteria="A",
                    variable=hazard,
                    sub_variable=sub_variable,
                    result=result,
                )

    # All rows count, so a failing sub-variable cannot be hidden by a later pass.
    return _fail_pass_or_no_data(_results_for(decent_homes_meta, "A"))


def _evaluate_criterion_b(elements, property_type, today, decent_homes_meta):
    """Record one row per assessed component element and return the Criterion B result."""
    boiler = get_element(elements, "Boiler Fuel in Property")
    property_has_no_boiler = bool(boiler) and boiler["ATTRIBUTE CODE"] == _NO_HEATING_CODE

    for component, labels in _B_COMPONENT_LABELS.items():
        lifetime_years = _COMPONENT_LIFESPANS[component][property_type]
        if lifetime_years is None:
            # Component does not apply to this property type.
            continue
        for label in labels:
            label_data = get_element(elements, label)
            if label_data is None:
                # Element not present in this survey: nothing to assess.
                continue
            attribute_code = label_data["ATTRIBUTE CODE"]
            if attribute_code in _B_NO_DATA_CODES:
                continue
            # Heating components are exempt when there is no heating to age.
            if component == "central_heating_boiler" and property_has_no_boiler:
                continue
            if component == "heating_other" and attribute_code == _NO_HEATING_CODE:
                continue

            install_date = pd.to_datetime(label_data["INSTALL DATE"])
            if pd.isnull(install_date):
                raise ValueError(f"Missing install date for {component}/{label}")
            age_years = years_between(today.to_pydatetime(), install_date.to_pydatetime())
            is_old = age_years > lifetime_years

            remaining_life = label_data["REMAINING LIFE"]
            if pd.isnull(remaining_life):
                raise ValueError(f"Missing remaining life for {component}/{label}")
            has_failed = remaining_life < 0

            expiry_date = install_date + pd.DateOffset(years=lifetime_years)
            # An element only fails when it is past its lifetime AND has no
            # remaining life left.
            append_result(
                decent_homes_meta,
                criteria="B",
                variable=component,
                sub_variable=label,
                result="fail" if is_old and has_failed else "pass",
                install_date=str(install_date),
                expiry_date=str(expiry_date),
            )

    # Roll the element rows up to one result per component.
    component_results = {}
    for component in _B_COMPONENT_LABELS:
        element_results = [
            row["result"]
            for row in decent_homes_meta
            if row["criteria"] == "B"
            and row["variable"] == component
            and row["sub_variable"] is not None
        ]
        if not element_results:
            component_results[component] = "no_data"
        elif "fail" in element_results:
            component_results[component] = "fail"
        else:
            component_results[component] = "pass"

    key_fails = [c for c, r in component_results.items() if c in _KEY_COMPONENTS and r == "fail"]
    other_fails = [c for c, r in component_results.items() if c in _OTHER_COMPONENTS and r == "fail"]
    if key_fails or len(other_fails) >= 2:
        return "fail"
    if all(result == "no_data" for result in component_results.values()):
        return "no_data"
    return "pass"


def _record_age_limit(decent_homes_meta, element, variable, max_age_years, today):
    """Record whether ``element`` was installed at most ``max_age_years`` years ago."""
    installed_on = pd.to_datetime(element["INSTALL DATE"])
    age_years = years_between(today.to_pydatetime(), installed_on.to_pydatetime())
    # The expiry date is the install date plus the Criterion C age limit.
    next_due = installed_on + pd.DateOffset(years=max_age_years)
    append_result(
        decent_homes_meta,
        criteria="C",
        variable=variable,
        sub_variable=variable,
        result="pass" if age_years <= max_age_years else "fail",
        install_date=str(installed_on),
        # Stored as text like every other date in the meta rows.
        expiry_date=str(next_due),
    )


def _record_observed_code(decent_homes_meta, criteria, variable, attribute_code, outcomes):
    """Record the outcome mapped to ``attribute_code``; unknown codes are rejected."""
    if attribute_code not in outcomes:
        raise NotImplementedError("No other observed codes yet")
    append_result(
        decent_homes_meta,
        criteria=criteria,
        variable=variable,
        sub_variable=variable,
        result=outcomes[attribute_code],
    )


def _evaluate_criterion_c(elements, today, decent_homes_meta):
    """Record the modern-facilities checks and return the Criterion C result.

    The flats-only "adequate_common_entrance_areas" check is not evaluated:
    flats are rejected earlier, when the property type is resolved.
    """
    # 1) Kitchen age and 2) kitchen space/layout
    kitchen = _require_element(elements, _LABEL_KITCHEN, "Kitchen data missing - pls check")
    _record_age_limit(
        decent_homes_meta, kitchen, "kitchen_less_than_20_years_old", _KITCHEN_MAX_AGE_YEARS, today
    )
    _record_observed_code(
        decent_homes_meta, "C", "kitchen_adequate_space_and_layout",
        kitchen["ATTRIBUTE CODE"], {"STDKITADQ": "pass"},
    )

    # 3) Bathroom age and 4) bathroom/WC location
    bathroom = _require_element(elements, _LABEL_BATHROOM, "Bathroom data missing - pls check")
    _record_age_limit(
        decent_homes_meta, bathroom, "bathroom_less_than_30_years_old", _BATHROOM_MAX_AGE_YEARS, today
    )
    _record_observed_code(
        decent_homes_meta, "C", "bathroom_wc_appropriately_located",
        bathroom["ATTRIBUTE CODE"], {"STDBTHADQ": "pass", "ADPBTHADQ": "pass"},
    )

    # 5) External noise insulation
    noise = _require_element(elements, _LABEL_NOISE, "Noise insulation data missing - pls check")
    _record_observed_code(
        decent_homes_meta, "C", "adequate_external_noise_insulation",
        noise["ATTRIBUTE CODE"], {"ADEQUATE": "pass"},
    )

    # The criterion is only failed when three or more facilities fail.
    failures = sum(1 for result in _results_for(decent_homes_meta, "C") if result == "fail")
    return "fail" if failures >= 3 else "pass"


def _evaluate_criterion_d(elements, decent_homes_meta):
    """Record the heating and insulation checks and return the Criterion D result."""
    # Heating system type
    heating = _require_element(
        elements, "Heating Improvement Required in Property", "Heating element missing in dataset"
    )
    _record_observed_code(
        decent_homes_meta, "D", "efficient_heating_system_type",
        heating["ATTRIBUTE CODE"], {"NOTAPPLIC": "pass", "WETINSFULL": "fail"},
    )

    # Heating distribution. In the observed case there was no heating and wet
    # heating had to be installed in full, so the value was unknown.
    # NOTE(review): "UNKNOWN" is the only accepted code, so Criterion D can
    # currently never come out as "pass".
    heating_dist = _require_element(
        elements, "Heating Distribution System in Property",
        "Heating distribution element missing in dataset",
    )
    _record_observed_code(
        decent_homes_meta, "D", "efficient_heating_distribution",
        heating_dist["ATTRIBUTE CODE"], {"UNKNOWN": "no_data"},
    )

    # Loft insulation: a dedicated code flags that more insulation is needed,
    # otherwise the code is the numeric thickness.
    loft = _require_element(
        elements, "Size in mm of Loft Insulation Thickness in Property",
        "Loft insulation data missing - pls check",
    )
    loft_code = loft["ATTRIBUTE CODE"]
    if loft_code == "LOFTINSRQD":
        loft_result = "fail"
    elif loft_code.isnumeric():
        loft_result = "pass"
    else:
        raise NotImplementedError("Unknown loft insulation code - pls check")
    append_result(
        decent_homes_meta,
        criteria="D",
        variable="loft_insulation_sufficient",
        sub_variable="loft_insulation_sufficient",
        result=loft_result,
    )

    # Wall insulation: "NONE" means no insulation improvement is required.
    wall = _require_element(
        elements, "Wall Insulation Improvement in External Area",
        "Wall insulation data missing - pls check",
    )
    _record_observed_code(
        decent_homes_meta, "D", "wall_insulation_sufficient",
        wall["ATTRIBUTE CODE"], {"NONE": "pass"},
    )

    # Heating and insulation all have to pass.
    return _fail_pass_or_no_data(_results_for(decent_homes_meta, "D"))


def decent_homes_calc(one_property):
    """Assess one property against the four Decent Homes criteria.

    Args:
        one_property: Path of the property JSON file. The file must provide
            ``property_info`` (with ``PROP TYPE``) and ``elements`` (a mapping
            of element label to element record).

    Returns:
        A ``(summary, meta)`` tuple. ``summary`` is a dict with ``uprn``,
        ``creation_date``, the result of each criterion (``"pass"``,
        ``"fail"`` or ``"no_data"``) and the boolean ``decent_homes``.
        ``meta`` is the list of per-check rows written by ``append_result``.

    Raises:
        NotImplementedError: for flats, unknown property types, missing
            elements or attribute codes that have not been observed yet.
        ValueError: for an unknown HHSRS attribute code, or a Criterion B
            element without install date or remaining life.
    """
    with open(one_property, "rb") as f:
        data = json.load(f)

    today = pd.Timestamp.today().normalize()
    property_type = _resolve_property_type(data["property_info"])
    elements = data["elements"]

    decent_homes_meta = []
    criterion_a_result = _evaluate_criterion_a(elements, decent_homes_meta)
    criterion_b_result = _evaluate_criterion_b(elements, property_type, today, decent_homes_meta)
    criterion_c_result = _evaluate_criterion_c(elements, today, decent_homes_meta)
    criterion_d_result = _evaluate_criterion_d(elements, decent_homes_meta)

    criterion_results = (
        criterion_a_result,
        criterion_b_result,
        criterion_c_result,
        criterion_d_result,
    )
    property_decent_home = {
        "uprn": data.get("UPRN"),  # TODO: Need UPRN
        "creation_date": datetime.now().date().isoformat(),
        "criterion_a": criterion_a_result,
        "criterion_b": criterion_b_result,
        "criterion_c": criterion_c_result,
        "criterion_d": criterion_d_result,
        # A home is only decent when ALL four criteria pass.
        "decent_homes": all(result == "pass" for result in criterion_results),
    }
    return property_decent_home, decent_homes_meta