diff --git a/alembic/versions/4c67501b7451_added_more_enums.py b/alembic/versions/4c67501b7451_added_more_enums.py new file mode 100644 index 0000000..19a813a --- /dev/null +++ b/alembic/versions/4c67501b7451_added_more_enums.py @@ -0,0 +1,60 @@ +"""added more enums + +Revision ID: 4c67501b7451 +Revises: ac8dba8cef50 +Create Date: 2025-09-23 10:22:20.648664 +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + +# revision identifiers, used by Alembic. +revision: str = "4c67501b7451" +down_revision: Union[str, None] = "ac8dba8cef50" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + +ENUM_NAME = "reporttype" + +# Values that were already present BEFORE this migration +OLD_VALUES = ( + "QUIDOS_PRESITE_NOTE", + "CHARTED_SURVEYOR_REPORT", + "ENERGY_PERFORMANCE_REPORT", + "U_VALUE_CALCULATOR_REPORT", + "OVERWRITING_U_VALUE_DECLARATION_FORM", + "OSMOSIS_CONDITION_PAS_2035_REPORT", + "DOMNA_CONDITION_PAS_2035_REPORT", +) + +# Values we are ADDING in this migration +NEW_VALUES = ( + "DECENT_HOMES_RAW_DATA", + "DECENT_HOMES_SUMMARY", + "DECENT_HOMES_PROPERTY_META", +) + +def upgrade() -> None: + for v in NEW_VALUES: + op.execute(f"ALTER TYPE {ENUM_NAME} ADD VALUE IF NOT EXISTS '{v}'") + + +def downgrade() -> None: + # 1) Create a replacement type with ONLY the old values + old_vals = ", ".join(f"'{v}'" for v in OLD_VALUES) + op.execute(f"CREATE TYPE {ENUM_NAME}_old AS ENUM ({old_vals})") + + # 2) Move columns to the temporary type + op.execute( + f"ALTER TABLE documents ALTER COLUMN document_type TYPE {ENUM_NAME}_old " + f"USING document_type::text::{ENUM_NAME}_old" + ) + op.execute( + f"ALTER TABLE uploaded_files ALTER COLUMN doc_type TYPE {ENUM_NAME}_old " + f"USING doc_type::text::{ENUM_NAME}_old" + ) + + # 3) Drop original type and rename the temp back + op.execute(f"DROP TYPE {ENUM_NAME}") + op.execute(f"ALTER TYPE {ENUM_NAME}_old RENAME TO {ENUM_NAME}") diff --git a/alembic/versions/ac8dba8cef50_added_more_report_type.py b/alembic/versions/ac8dba8cef50_added_more_report_type.py new file mode 100644 index 0000000..9d60b57 --- /dev/null +++ b/alembic/versions/ac8dba8cef50_added_more_report_type.py @@ -0,0 +1,38 @@ +"""added more report type + +Revision ID: ac8dba8cef50 +Revises: a8cc4a5fccb6 +Create Date: 2025-09-23 10:14:54.461633 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = 'ac8dba8cef50' +down_revision: Union[str, None] = 'a8cc4a5fccb6' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.alter_column('uploaded_files', 'id', + existing_type=sa.UUID(), + server_default=None, + existing_nullable=False) + # ### end Alembic commands ### + + +def downgrade() -> None: + """Downgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.alter_column('uploaded_files', 'id', + existing_type=sa.UUID(), + server_default=sa.text('gen_random_uuid()'), + existing_nullable=False) + # ### end Alembic commands ### diff --git a/deployment/lambda/walthamforest_etl/docker/app.py b/deployment/lambda/walthamforest_etl/docker/app.py index 9d6d19f..7183b2b 100644 --- a/deployment/lambda/walthamforest_etl/docker/app.py +++ b/deployment/lambda/walthamforest_etl/docker/app.py @@ -222,29 +222,45 @@ def generate_file_uri(UPRN): file_uri = f"https://retrofit-energy-assessments-dev.s3.eu-west-2.amazonaws.com/documents/{UPRN}/" return file_uri -def create_uploaded_file_entry( +def create_or_update_uploaded_file_entry( db_session, - uprn, + uprn: str, doc_type: ReportType, json_uri: str, - s3_file_uri:str + s3_file_uri: str ): """ - Create a new entry in uploaded_files with s3_json_uri and timestamp. + Create or update an entry in uploaded_files. + - If a record with the same (uprn, doc_type) exists, update it. + - Otherwise, insert a new record. Commits, refreshes, and returns the ORM object. """ - new_obj = uploaded_files( - doc_type=doc_type, - s3_json_uri=json_uri, - s3_json_upload_timestamp=datetime.now(timezone.utc), - s3_file_uri=s3_file_uri, - uprn=uprn, + existing = ( + db_session.query(uploaded_files) + .filter(uploaded_files.uprn == uprn, uploaded_files.doc_type == doc_type) + .one_or_none() ) - db_session.add(new_obj) + if existing: + # Update existing record + existing.s3_json_uri = json_uri + existing.s3_json_upload_timestamp = datetime.now(timezone.utc) + existing.s3_file_uri = s3_file_uri + obj = existing + else: + # Insert new record + obj = uploaded_files( + doc_type=doc_type, + s3_json_uri=json_uri, + s3_json_upload_timestamp=datetime.now(timezone.utc), + s3_file_uri=s3_file_uri, + uprn=uprn, + ) + db_session.add(obj) + db_session.commit() - db_session.refresh(new_obj) - return new_obj + db_session.refresh(obj) + return obj def handler(event, context): @@ -280,7 +296,7 @@ def handler(event, context): property_decent_home, decent_home_meta = decent_homes_calc(filepath) json_uri_1 = upload_json_to_s3(property_decent_home, generate_file_uri(uprn), location="decent_homes/property_decent_home") with get_db_session() as session: - create_uploaded_file_entry( + create_or_update_uploaded_file_entry( db_session=session, uprn=uprn, doc_type=ReportType.DECENT_HOMES_SUMMARY, @@ -289,7 +305,7 @@ def handler(event, context): ) json_uri_1 = upload_json_to_s3(decent_home_meta, generate_file_uri(uprn), location="decent_homes/decent_homes_meta") with get_db_session() as session: - create_uploaded_file_entry( + create_or_update_uploaded_file_entry( db_session=session, uprn=uprn, doc_type=ReportType.DECENT_HOMES_SUMMARY, diff --git a/deployment/lambda/walthamforest_etl/docker/decent_homes_pilot.py b/deployment/lambda/walthamforest_etl/docker/decent_homes_pilot.py new file mode 100644 index 0000000..02995c7 --- /dev/null +++ b/deployment/lambda/walthamforest_etl/docker/decent_homes_pilot.py @@ -0,0 +1,762 @@ +import json +import os + +import pandas as pd + +from datetime import datetime + +from docutils.nodes import table + + +def years_between(d1, d2): + # precise year difference (accounts for months/days) + return (d1.year - d2.year) - ((d1.month, d1.day) < (d2.month, d2.day)) + + +def get_element(elements, label): + """Safely get an element dict by display label (your JSON keys).""" + return elements.get(label) + + +def append_result(decent_homes_meta, criteria, variable, sub_variable, result, install_date=None, expiry_date=None): + decent_homes_meta.append({ + "criteria": criteria, + "variable": variable, + "sub_variable": sub_variable, + "result": result, + "hhsrs_rank": None, + "hhsrs_score": None, + "install_date": install_date, + "expiry_date": expiry_date, + }) + + +def decent_homes_calc(one_property): + # Read in static json, which is transformed by Jun-te's script + folder = "../../../../../home/Downloads/" + fn = one_property + + # filenames = ["flat 1.json", "house 1.json"] + + houses_waltham_forest_data = pd.read_excel( + os.path.join(folder, "data.xlsx"), + sheet_name="Houses Asset Data" + ) + flats_waltham_forest_data = pd.read_excel( + os.path.join(folder, "data.xlsx"), + sheet_name="CHINGFORD ROAD 236-254 Asset Bl" + ) + + # Standardised variables which will form the enums in the db + HHSRS_VARIABLES = [ + "damp_and_mould_growth", + "excess_cold", + "excess_heat", + "asbestos_and_mm_fibres", + "biocides", + "carbon_monoxide_and_fuel_combustion_products", + "lead", + "radiation", + "uncombusted_fuel_gas", + "volatile_organic_compounds", + "crowding_and_space", + "entry_by_intruders", + "lighting", + "noise", + "domestic_hygiene_pests_and_refuse", + "food_safety", + "personal_hygiene_sanitation_and_drainage", + "water_supply", + "falls_associated_with_baths", + "falls_on_level_surfaces", + "falls_on_stairs_and_steps", + "falls_between_levels", + "electrical_hazards", + "fire", + "flames_hot_surfaces_and_materials", + "collision_and_entrapment", + "explosions", + "ergonomics", + "structural_collapse_and_falling_elements" + ] + + ELEMENT_CODE_TO_DESCRIPTION = { + # One-to-one + "HHSRSDAMP": "damp_and_mould_growth", + "HHSRSCOLD": "excess_cold", + "HHSRSHEAT": "excess_heat", + "HHSRSASB": "asbestos_and_mm_fibres", + "HHSRSBIOC": "biocides", + "HHSRSLEAD": "lead", + "HHSRSRADIA": "radiation", + "HHSRSFUEL": "uncombusted_fuel_gas", + "HHSRSORGAN": "volatile_organic_compounds", + "HHSRSCROWD": "crowding_and_space", + "HHSRSENTRY": "entry_by_intruders", + "HHSRSLIGHT": "lighting", + "HHSRSNOISE": "noise", + "HHSRSDOMES": "domestic_hygiene_pests_and_refuse", + "HHSRSFOOD": "food_safety", + "HHSRSPERS": "personal_hygiene_sanitation_and_drainage", + "HHSRSWATER": "water_supply", + "HHSRSFBATH": "falls_associated_with_baths", + "HHSRSFLEVE": "falls_on_level_surfaces", + "HHSRSFSTAI": "falls_on_stairs_and_steps", + "HHSRSFBETW": "falls_between_levels", + "HHSRSELEC": "electrical_hazards", + "HHSRSFIRE": "fire", + "HHSRSFLAME": "flames_hot_surfaces_and_materials", + "HHSRSEXPLO": "explosions", + "HHSRSPOSI": "ergonomics", + "HHSRSSTRUC": "structural_collapse_and_falling_elements", + + # One-to-many expansions + "HHSRSCO": "carbon_monoxide", + "HHSRSSO2": "sulphur_dioxide_and_smoke", + "HHSRSNO2": "nitrogen_dioxide", + "HHSRSENTRP": "collision_and_entrapment", + "HHSRSCLOW": "collision_hazards_and_low_headroom", + } + + CRITERION_B_VARIABLES = [ + "external_walls_structure", "lintels", "brickwork_spalling", "wall_finish", "roof_structure", "roof_finish", + "chimneys", "windows", "external_doors", "kitchens", "bathrooms", "central_heating_boiler", + "central_heating_distribution_system", "heating_other", "electrical_systems", + ] + + CRITERION_C_VARIABLES = [ + "kitchen_less_than_20_years_old", "kitchen_adequate_space_and_layout", "bathroom_less_than_30_years_old", + "bathroom_wc_appropriately_located", "adequate_external_noise_insulation", "adequate_common_entrance_areas", + ] + + # Criterion C explicit age limits (different from component lifespans used elsewhere) + CRITERION_C_AGE_LIMITS = { + "kitchen_years_max": 20, + "bathroom_years_max": 30, + } + + # Field labels as they appear in your JSON (based on your code) + LABEL_KITCHEN = "Adequacy of Kitchen and Type in Property" + LABEL_BATHROOM = "Adequacy of Bathroom Location in Property" + LABEL_NOISE = "Adequacy of Noise Insulation in Property" + LABEL_COMMON_CIRC = "Circulation Space in Common Area" # flats only + + STANDARD_HHSRS_MAPPING = {"pass": "TYPRISK", "fail": "MODRISK", "no_data": "TOBEASSESS"} + + # Criterion A - mapping of HHSRS variables to Waltham forest element codes + HHSRS_MAPPING = { + "damp_and_mould_growth": {"HHSRSDAMP": STANDARD_HHSRS_MAPPING}, + "excess_cold": {"HHSRSCOLD": STANDARD_HHSRS_MAPPING}, + "excess_heat": {"HHSRSHEAT": STANDARD_HHSRS_MAPPING}, + "asbestos_and_mm_fibres": {"HHSRSASB": STANDARD_HHSRS_MAPPING}, + "biocides": {"HHSRSBIOC": STANDARD_HHSRS_MAPPING}, + "carbon_monoxide_and_fuel_combustion_products": { + "HHSRSCO": STANDARD_HHSRS_MAPPING, + "HHSRSSO2": STANDARD_HHSRS_MAPPING, + "HHSRSNO2": STANDARD_HHSRS_MAPPING + }, + "lead": {"HHSRSLEAD": STANDARD_HHSRS_MAPPING}, + "radiation": {"HHSRSRADIA": STANDARD_HHSRS_MAPPING}, + "uncombusted_fuel_gas": {"HHSRSFUEL": STANDARD_HHSRS_MAPPING}, + "volatile_organic_compounds": {"HHSRSORGAN": STANDARD_HHSRS_MAPPING}, + "crowding_and_space": {"HHSRSCROWD": STANDARD_HHSRS_MAPPING}, + "entry_by_intruders": {"HHSRSENTRY": STANDARD_HHSRS_MAPPING}, + "lighting": {"HHSRSLIGHT": STANDARD_HHSRS_MAPPING}, + "noise": {"HHSRSNOISE": STANDARD_HHSRS_MAPPING}, + "domestic_hygiene_pests_and_refuse": {"HHSRSDOMES": STANDARD_HHSRS_MAPPING}, + "food_safety": {"HHSRSFOOD": STANDARD_HHSRS_MAPPING}, + "personal_hygiene_sanitation_and_drainage": {"HHSRSPERS": STANDARD_HHSRS_MAPPING}, + "water_supply": {"HHSRSWATER": STANDARD_HHSRS_MAPPING}, + "falls_associated_with_baths": {"HHSRSFBATH": STANDARD_HHSRS_MAPPING}, + "falls_on_level_surfaces": {"HHSRSFLEVE": STANDARD_HHSRS_MAPPING}, + "falls_on_stairs_and_steps": {"HHSRSFSTAI": STANDARD_HHSRS_MAPPING}, + "falls_between_levels": {"HHSRSFBETW": STANDARD_HHSRS_MAPPING}, + "electrical_hazards": {"HHSRSELEC": STANDARD_HHSRS_MAPPING}, + "fire": {"HHSRSFIRE": STANDARD_HHSRS_MAPPING}, + "flames_hot_surfaces_and_materials": {"HHSRSFLAME": STANDARD_HHSRS_MAPPING}, + "collision_and_entrapment": {"HHSRSENTRP": STANDARD_HHSRS_MAPPING, "HHSRSCLOW": STANDARD_HHSRS_MAPPING}, + "explosions": {"HHSRSEXPLO": STANDARD_HHSRS_MAPPING}, + "ergonomics": {"HHSRSPOSI": STANDARD_HHSRS_MAPPING}, + "structural_collapse_and_falling_elements": {"HHSRSSTRUC": STANDARD_HHSRS_MAPPING} + } + + # print(houses_waltham_forest_data[ + # houses_waltham_forest_data["ELEMENT CODE"] == "INTBTHADEQ" + # ][["ATTRIBUTE CODE", "ATTRIBUTE CODE DESCRIPTION"]].drop_duplicates()) + + # print(flats_waltham_forest_data[ + # flats_waltham_forest_data["ELEMENT CODE"] == "INTBTHADEQ" + # ][["ATTRIBUTE CODE", "ATTRIBUTE CODE DESCRIPTION"]].drop_duplicates()) + + + # Criterion B + B_COMPONENT_LABELS = { + # Key components + "wall_structure": [ + "Wall Structure in External Area", + ], + "lintels": [ + "Lintels in External Area", + ], + "brickwork_spalling": [ + "Wall Spalling in External Area", + ], + "wall_finish": [ + "Wall Finish 1 in External Area", + "Wall Finish 2 in External Area", + "External Decorations in External Area", + "Brickwork Pointing in External Area", + ], + "roof_structure": [ + "Roof Structure 1 in External Area", + "Roof Structure 2 in External Area", + "Roof Structure 3 in External Area", + "Garage Roof in External Area", + "Garage and Store Roofs in External Area", + "Store Roof in External Area", + "Fascia / Soffit / Bargeboard in External Area", + "Gutters in External Area", + "Downpipes in External Area", + "Internal Downpipes in External Area" + ], + "roof_finish": [ + "Roof Covering 1 in External Area", + "Roof Covering 2 in External Area", + "Roof Covering 3 in External Area", + ], + "chimneys": [ + "Chimneys in External Area", + ], + "windows": [ + "Windows in Property", + "Windows 1 in External Area", + "Windows 2 in External Area", + "Garage and Store Windows in External Area", + "Garage Windows in External Area", + "Store Windows in External Area", + ], + "external_doors": [ + "Type and Location of Front Door in Property", + "Front Door Fire Rating in Property", + "Patio and French Doors 1 in External Area", + "Back and Side Doors 1 in External Area", + "Back and Side Doors 2 in External Area", + "Garage and Store Doors in External Area", + "Garage Door in External Area", + "Store Door in External Area", + ], + "central_heating_boiler": [ + # "Heating Improvement Required in Property", + "Boiler Fuel in Property", + "Type of Water Heating in Property", + ], + "heating_other": [ + # "Heating Distribution System in Property" + "Boiler Fuel in Property", + "Type of Water Heating in Property", + ], + "electrical_systems": [ + "Electrics Required in Property", + ], + # Other components + "kitchen": [ + "Adequacy of Kitchen and Type in Property", + ], + "bathroom": [ + "Adequacy of Bathroom Location in Property", + ], + "central_heating_distribution_system": [ + "Heating Distribution System in Property", + ], + } + + KEY_COMPONENTS = { + "wall_structure", "lintels", "brickwork_spalling", "wall_finish", + "roof_structure", "roof_finish", "chimneys", "windows", + "external_doors", "central_heating_boiler", "heating_other", + "electrical_systems", + } + OTHER_COMPONENTS = { + "kitchen", "bathroom", "central_heating_distribution_system", + } + + # Criterion C + COMPONENT_LIFESPANS = { + # Key components + "wall_structure": { + "house": 80, "flat_below_6_storeys": 80, "flat_above_6_storeys": 80 + }, + "lintels": { + "house": 60, "flat_below_6_storeys": 60, "flat_above_6_storeys": 60 + }, + "brickwork_spalling": { + "house": 30, "flat_below_6_storeys": 30, "flat_above_6_storeys": 30 + }, + "wall_finish": { + "house": 60, "flat_below_6_storeys": 60, "flat_above_6_storeys": 30 + }, + "roof_structure": { + "house": 50, "flat_below_6_storeys": 30, "flat_above_6_storeys": 30 + }, + "roof_finish": { + "house": 50, "flat_below_6_storeys": 30, "flat_above_6_storeys": 30 + }, + "chimneys": { + "house": 50, "flat_below_6_storeys": 50, "flat_above_6_storeys": None # N/A + }, + "windows": { + "house": 40, "flat_below_6_storeys": 30, "flat_above_6_storeys": 30 + }, + "external_doors": { + "house": 40, "flat_below_6_storeys": 30, "flat_above_6_storeys": 30 + }, + "central_heating_boiler": { + "house": 15, "flat_below_6_storeys": 15, "flat_above_6_storeys": 15 + }, + "heating_other": { + "house": 30, "flat_below_6_storeys": 30, "flat_above_6_storeys": 30 + }, + "electrical_systems": { + "house": 30, "flat_below_6_storeys": 30, "flat_above_6_storeys": 30 + }, + + # Other components + "kitchen": { + "house": 30, "flat_below_6_storeys": 30, "flat_above_6_storeys": 30 + }, + "bathroom": { + "house": 40, "flat_below_6_storeys": 40, "flat_above_6_storeys": 40 + }, + "central_heating_distribution_system": { + "house": 40, "flat_below_6_storeys": 40, "flat_above_6_storeys": 40 + }, + } + + # Database design + # creation_date, uprn, variable, result (pass/fail/nodata), hhsrs_score (optional, numeric), hhsrs_rank (A-J), + # install_date (for components which expire, e.g. kitchen), remaining_life (for components which expire, e.g. kitchen), + + # TODO: Add the criterion + decent_homes_meta = [] + # Use to capture criterion A, B, C and D. Should be: + # {"uprn": int, "creation_date": datetime, "criterion_a": bool, "criterion_b": bool, "criterion_c": bool, + # "criterion_d": bool, "decent_homes": bool"} + property_decent_homes = [] + with open(os.path.join(fn), "rb") as f: + data = json.load(f) + + today = pd.Timestamp.today().normalize() + + property_info = data["property_info"] + if property_info["PROP TYPE"] in ["HOU"]: + property_type = "house" + elif property_info["PROP TYPE"] == "FLA": + raise NotImplementedError("Implement distrinction between below and above 6 storeys") + # property_type = "flat" + else: + raise NotImplementedError("Unknown property type") + + # ---------------- Criterion A ---------------- + # Critrion A: pass/fail + # If fail, why? + for hhsrs_variable, mapping in HHSRS_MAPPING.items(): + element_code = list(mapping.keys())[0] + + # Find the data in the JSON within data["elements"] + check_pass = [] + for k, v in data["elements"].items(): + if v["ELEMENT CODE"] == element_code: + # We check the attribute code + # Check if pass + if v["ATTRIBUTE CODE"] == mapping[element_code]["pass"]: + result = "pass" + elif v["ATTRIBUTE CODE"] == mapping[element_code]["fail"]: + result = "fail" + elif v["ATTRIBUTE CODE"] == mapping[element_code]["no_data"]: + result = "no_data" + else: + raise ValueError("Unknown attribute code") + check_pass.append(result) + append_result( + decent_homes_meta, + criteria="A", + variable=hhsrs_variable, + sub_variable=ELEMENT_CODE_TO_DESCRIPTION[element_code], + result=result, + install_date=None, + expiry_date=None, + ) + + # We check if we have a pass, fail or no_data + # if all([x == "pass" for x in check_pass]): + # hhsrs_result = "pass" + # elif any([x == "fail" for x in check_pass]): + # hhsrs_result = "fail" + # elif any([x == "no_data" for x in check_pass]): + # hhsrs_result = "no_data" + # else: + # raise NotImplementedError("Mixed results not implemented") + + # ---------------- Criterion B ---------------- + # Check each of the components + + # ---------------- Criterion B ---------------- + property_boiler = get_element(data["elements"], "Boiler Fuel in Property") + + for component, labels in B_COMPONENT_LABELS.items(): + for label in labels: + label_data = get_element(data["elements"], label) + + # Handle no-data or not-applicable + if label_data["ATTRIBUTE CODE"] in ["UNKNOWN", "NONE", "UNKNOWNG", "UNKNOWNS"]: + # append_result( + # decent_homes_meta, + # criteria="B", + # variable=component, + # sub_variable=label, + # result="pass", + # install_date=None, + # expiry_date=None, + # ) + continue + + # Special skip conditions for heating + no_boiler_condition = ( + property_boiler["ATTRIBUTE CODE"] in ["NONENOCH"] + and component == "central_heating_boiler" + ) + other_heating_condition = ( + label_data["ATTRIBUTE CODE"] in ["NONENOCH"] + and component == "heating_other" + ) + if no_boiler_condition or other_heating_condition: + # append_result( + # decent_homes_meta, + # criteria="B", + # variable=component, + # sub_variable=label, + # result="pass", + # install_date=None, + # expiry_date=None, + # ) + continue + + # Normal case: evaluate install date + lifetime + remaining life + install_date = pd.to_datetime(label_data["INSTALL DATE"]) + if pd.isnull(install_date): + raise ValueError(f"Missing install date for {component}/{label}") + + component_lifetime = COMPONENT_LIFESPANS[component][property_type] + is_old = years_between(today.to_pydatetime(), install_date.to_pydatetime()) > component_lifetime + + if pd.isnull(label_data["REMAINING LIFE"]): + raise ValueError(f"Missing remaining life for {component}/{label}") + has_failed = label_data["REMAINING LIFE"] < 0 + + expiry_date = install_date + pd.DateOffset(years=component_lifetime) + component_result = "fail" if is_old and has_failed else "pass" + + # Push into decent_homes_meta + append_result( + decent_homes_meta, + criteria="B", + variable=component, + sub_variable=label, + result=component_result, + install_date=str(install_date), + expiry_date=str(expiry_date), + ) + + # ---------------- Criterion C ---------------- + + # Guard: property type string already set earlier + is_flat = (property_info["PROP TYPE"] == "FLA") + + # 1) Kitchen age ≤ 20 years + kitchen = get_element(data["elements"], LABEL_KITCHEN) + if kitchen: + kit_install_raw = kitchen["INSTALL DATE"] + kit_install = pd.to_datetime(kit_install_raw) + kit_age_years = years_between(today.to_pydatetime(), kit_install.to_pydatetime()) + kitchen_age_result = "pass" if kit_age_years <= CRITERION_C_AGE_LIMITS["kitchen_years_max"] else "fail" + # For transparency, store next renewal as install + 20 years (criterion C perspective) + kit_next_due = kit_install + pd.DateOffset(years=CRITERION_C_AGE_LIMITS["kitchen_years_max"]) + else: + raise NotImplementedError("Kitchen data missing - pls check") + append_result( + decent_homes_meta, + criteria="C", + variable="kitchen_less_than_20_years_old", + sub_variable="kitchen_less_than_20_years_old", + result=kitchen_age_result, + install_date=str(kit_install), + expiry_date=str(kit_next_due) + ) + + # 2) Kitchen adequate space/layout + # Prefer explicit codes if you have them, fall back to text in ATTRIBUTE CODE DESCRIPTION + if kitchen: + kit_attr_desc = kitchen["ATTRIBUTE CODE"] + if kit_attr_desc == "STDKITADQ": + kitchen_adequacy_result = "pass" + else: + raise NotImplementedError("No other observed codes yet") + else: + raise NotImplementedError("Kitchen data missing - pls check") + append_result( + decent_homes_meta, + criteria="C", + variable="kitchen_adequate_space_and_layout", + sub_variable="kitchen_adequate_space_and_layout", + result=kitchen_adequacy_result, + ) + + # 3) Bathroom age ≤ 30 years + bath = get_element(data["elements"], LABEL_BATHROOM) + if bath: + bth_install_raw = bath["INSTALL DATE"] + bth_install = pd.to_datetime(bth_install_raw) + bth_age_years = years_between(today.to_pydatetime(), bth_install.to_pydatetime()) + bathroom_age_result = "pass" if bth_age_years <= CRITERION_C_AGE_LIMITS["bathroom_years_max"] else "fail" + bth_next_due = bth_install + pd.DateOffset(years=CRITERION_C_AGE_LIMITS["bathroom_years_max"]) + else: + raise NotImplementedError("Bathroom data missing - pls check") + append_result( + decent_homes_meta, + criteria="C", + variable="bathroom_less_than_30_years_old", + sub_variable="bathroom_less_than_30_years_old", + result=bathroom_age_result, + install_date=str(bth_install), + expiry_date=bth_next_due + ) + + # 4) Bathroom/WC appropriately located + if bath: + bth_attr_code = bath["ATTRIBUTE CODE"] + if bth_attr_code in {"STDBTHADQ", "ADPBTHADQ"}: + bathroom_location_result = "pass" + else: + raise NotImplementedError("No other observed codes yet") + else: + raise NotImplementedError("Bathroom data missing - pls check") + + append_result( + decent_homes_meta, + criteria="C", + variable="bathroom_wc_appropriately_located", + sub_variable="bathroom_wc_appropriately_located", + result=bathroom_location_result + ) + + # 5) Adequate external noise insulation + noise = get_element(data["elements"], LABEL_NOISE) + if noise: + noise_code = noise["ATTRIBUTE CODE"] + if noise_code in {"ADEQUATE"}: + noise_result = "pass" + else: + raise NotImplementedError("No other observed codes yet") + else: + raise NotImplementedError("Noise insulation data missing - pls check") + append_result( + decent_homes_meta, + criteria="C", + variable="adequate_external_noise_insulation", + sub_variable="adequate_external_noise_insulation", + result=noise_result + ) + + # 6) Adequate common entrance areas (flats only) + if is_flat: + raise Exception("Pls check this") + common = get_element(data["elements"], LABEL_COMMON_CIRC) + if common: + circ_desc = common.get("ATTRIBUTE CODE DESCRIPTION", "") + common_areas_result = adequacy_result_by_text(circ_desc) + else: + common_areas_result = "no_data" + append_result(decent_homes_meta, "adequate_common_entrance_areas", common_areas_result) + + # ---------------- Criterion D ---------------- + # heating system type + heating = get_element(data["elements"], "Heating Improvement Required in Property") + if heating: + heat_type_code = heating["ATTRIBUTE CODE"] + if heat_type_code in {"NOTAPPLIC"}: + heating_type_result = "pass" + elif heat_type_code in {"WETINSFULL"}: + heating_type_result = "fail" + else: + raise NotImplementedError("No other observed codes yet") + else: + raise NotImplementedError("Heating element missing in dataset") + + append_result( + decent_homes_meta, + criteria="D", + variable="efficient_heating_system_type", + sub_variable="efficient_heating_system_type", + result=heating_type_result + ) + + # heating distribution + heating_dist = get_element(data["elements"], "Heating Distribution System in Property") + if heating_dist: + dist_code = heating_dist["ATTRIBUTE CODE"] + if dist_code == "UNKNOWN": + # For the observed case, there was no heating and wet heating needed to be installed in full so the value + # was unknown + heating_dist_result = "no_data" + else: + raise NotImplementedError("No other observed codes yet") + else: + raise NotImplementedError("Heating distribution element missing in dataset") + + append_result( + decent_homes_meta, + criteria="D", + variable="efficient_heating_distribution", + sub_variable="efficient_heating_distribution", + result=heating_dist_result + ) + + # insulation + loft = get_element(data["elements"], "Size in mm of Loft Insulation Thickness in Property") + wall = get_element(data["elements"], "Wall Insulation Improvement in External Area") + # To determine how much loft insulation is required + + # Loft insulation check (example threshold: ≥ 270mm = pass) + if loft: + # We have a specific code, where further loft insulation is needed - It appears the heating type check has + # already been completed in this dataset and so we just need to check the code + loft_code = loft["ATTRIBUTE CODE"] + if loft_code == "LOFTINSRQD": + loft_result = "fail" + elif loft_code.isnumeric(): + loft_result = "pass" + else: + raise NotImplementedError("Unknown loft insulation code - pls check") + else: + raise NotImplementedError("Loft insulation data missing - pls check") + append_result( + decent_homes_meta, + criteria="D", + variable="loft_insulation_sufficient", + sub_variable="loft_insulation_sufficient", + result=loft_result + ) + + # Wall insulation check + if wall: + wall_code = wall["ATTRIBUTE CODE"] + if wall_code in {"NONE"}: # Means no insulation improvement required + wall_result = "pass" + else: + raise NotImplementedError("No other observed codes yet") + else: + raise NotImplementedError("Wall insulation data missing - pls check") + append_result( + decent_homes_meta, + criteria="D", + variable="wall_insulation_sufficient", + sub_variable="wall_insulation_sufficient", + result=wall_result + ) + + # ---------------- Criterion A overall ---------------- + a_vars = set(HHSRS_MAPPING.keys()) + latest_a_results = {r["variable"]: r["result"] for r in decent_homes_meta if r["variable"] in a_vars} + + if any(v == "fail" for v in latest_a_results.values()): + criterion_a_result = "fail" + elif all(v == "pass" for v in latest_a_results.values()): + criterion_a_result = "pass" + else: + criterion_a_result = "no_data" + + # ---------------- Criterion B overall ---------------- + + component_results = {} + + for component in B_COMPONENT_LABELS.keys(): + comp_rows = [r for r in decent_homes_meta if + r["criteria"] == "B" and r["variable"] == component and r["sub_variable"] is not None] + comp_sub_results = [r["result"] for r in comp_rows] + + if not comp_sub_results: # no rows at all + comp_result = "no_data" + elif any(r == "fail" for r in comp_sub_results): + comp_result = "fail" + elif all(r == "pass" for r in comp_sub_results if r != "no_data"): + comp_result = "pass" + elif all(r == "no_data" for r in comp_sub_results): + comp_result = "no_data" + else: + comp_result = "no_data" + + component_results[component] = comp_result + + key_fails = [c for c, r in component_results.items() if c in KEY_COMPONENTS and r == "fail"] + other_fails = [c for c, r in component_results.items() if c in OTHER_COMPONENTS and r == "fail"] + + if key_fails: + criterion_b_result = "fail" + elif len(other_fails) >= 2: + criterion_b_result = "fail" + elif all(r == "no_data" for r in component_results.values()): + criterion_b_result = "no_data" + else: + criterion_b_result = "pass" + + # ---------------- Criterion C overall ---------------- + criterion_c_vars = [ + "kitchen_less_than_20_years_old", + "kitchen_adequate_space_and_layout", + "bathroom_less_than_30_years_old", + "bathroom_wc_appropriately_located", + "adequate_external_noise_insulation", + ] + if is_flat: + criterion_c_vars.append("adequate_common_entrance_areas") + + latest_c_results = {r["variable"]: r["result"] for r in decent_homes_meta if r["variable"] in criterion_c_vars} + + count_fails = sum(1 for v in latest_c_results.values() if v == "fail") + # optionally count no_data too if you want strict interpretation + criterion_c_result = "fail" if count_fails >= 3 else "pass" + + # ---------------- Criterion D overall ---------------- + # Needs to have both efficient geating and distribution so all should pass + criterion_d_vars = [ + "efficient_heating_system_type", + "efficient_heating_distribution", + "loft_insulation_sufficient", + "wall_insulation_sufficient", + ] + latest_d_results = {r["variable"]: r["result"] for r in decent_homes_meta if r["variable"] in criterion_d_vars} + + if any(v == "fail" for v in latest_d_results.values()): + criterion_d_result = "fail" + elif all(v == "pass" for v in latest_d_results.values()): + criterion_d_result = "pass" + else: + criterion_d_result = "no_data" + + # ---------------- Append to property_decent_homes ---------------- + property_decent_homes.append({ + "uprn": data.get("UPRN"), # TODO: Need UPRN + "creation_date": datetime.now().date().isoformat(), + "criterion_a": criterion_a_result, + "criterion_b": criterion_b_result, + "criterion_c": criterion_c_result, + "criterion_d": criterion_d_result, + "decent_homes": ( + criterion_a_result == "pass" + and criterion_c_result == "pass" + and criterion_d_result == "pass" + ) + }) + + return property_decent_homes[0], decent_homes_meta, +