mvp implementation for A, C, D, year mapping for B

This commit is contained in:
Khalim Conn-Kowlessar 2025-09-22 12:32:51 +01:00
parent d3f941349a
commit a5ae166971

View file

@ -16,26 +16,8 @@ def get_element(elements, label):
return elements.get(label)
def adequacy_result_by_text(attr_desc: str):
    """
    Classify a free-text adequacy description as 'pass', 'fail' or 'no_data'.

    Matching is case-insensitive and works as follows:
      * 'fail'    - the text contains a negative marker ('inadequate',
                    'unsatisfactory', 'problems', 'inappropriate',
                    'substandard' / 'sub-standard', 'non-standard' / 'nonstandard').
      * 'pass'    - otherwise, a word in the text starts with 'adequate',
                    'standard' or 'appropriate'.
      * 'no_data' - the input is empty, not a string, or matches neither list.

    Negative markers are checked first, so a description that contains both a
    positive and a negative word is reported as 'fail'.
    """
    # Local import: the file's import block is not visible from this chunk.
    import re

    if not attr_desc or not isinstance(attr_desc, str):
        return "no_data"

    text = attr_desc.strip().lower()

    # Negated forms must be listed explicitly: 'inappropriate' and 'substandard'
    # contain the positive words 'appropriate' / 'standard' as substrings and
    # would otherwise be misread as a pass.
    fail_markers = (
        "inadequate",
        "unsatisfactory",
        "problems",
        "inappropriate",
        "substandard",
        "sub-standard",
        "non-standard",
        "nonstandard",
    )
    if any(marker in text for marker in fail_markers):
        return "fail"

    # Anchor positive words at a word start so that an unlisted prefix cannot
    # turn a negated word into a pass; suffixes ('adequately') still match.
    if re.search(r"\b(?:adequate|standard|appropriate)", text):
        return "pass"

    return "no_data"
def append_result(decent_homes, variable, result, install_date=None):
decent_homes.append({
def append_result(decent_homes_meta, variable, result, install_date=None):
decent_homes_meta.append({
"variable": variable,
"result": result,
"hhsrs_rank": None,
@ -97,7 +79,8 @@ CRITERION_B_VARIABLES = [
]
CRITERION_C_VARIABLES = [
"kitchen_facilities",
"kitchen_less_than_20_years_old", "kitchen_adequate_space_and_layout", "bathroom_less_than_30_years_old",
"bathroom_wc_appropriately_located", "adequate_external_noise_insulation", "adequate_common_entrance_areas",
]
# Criterion C explicit age limits (different from component lifespans used elsewhere)
@ -151,40 +134,163 @@ HHSRS_MAPPING = {
"structural_collapse_and_falling_elements": {"HHSRSSTRUC": STANDARD_HHSRS_MAPPING}
}
print(houses_waltham_forest_data[
houses_waltham_forest_data["ELEMENT CODE"] == "INTHTIMP"
][["ATTRIBUTE CODE", "ATTRIBUTE CODE DESCRIPTION"]].drop_duplicates())
# print(houses_waltham_forest_data[
# houses_waltham_forest_data["ELEMENT CODE"] == "INTBTHADEQ"
# ][["ATTRIBUTE CODE", "ATTRIBUTE CODE DESCRIPTION"]].drop_duplicates())
# print(flats_waltham_forest_data[
# flats_waltham_forest_data["ELEMENT CODE"] == "INTBTHADEQ"
# ][["ATTRIBUTE CODE", "ATTRIBUTE CODE DESCRIPTION"]].drop_duplicates())
print(flats_waltham_forest_data[
flats_waltham_forest_data["ELEMENT CODE"] == "INTBTHADEQ"
][["ATTRIBUTE CODE", "ATTRIBUTE CODE DESCRIPTION"]].drop_duplicates())
# Criterion B
CRITERION_B_MAPPING = {
# TODO: Needs to be sorted!!!
# "external_walls_structure": {
# "EXTWALLSTR": {"pass": "GOOD", "fail": "POOR", "no_data": "Unknown if Structural Defects in External Area"}
# }
"lintels": {
"EXTLINTELS": {"pass": "GOOD", "fail": "POOR", "no_data": "Unknown Condition of Lintels"}
}
# Criterion B: component key -> list of element labels that represent that component.
# The Criterion B loop iterates this dict in insertion order and looks each label up in
# data["elements"] via get_element(); the same component key is used to index
# COMPONENT_LIFESPANS. Each label looked up is expected to carry "INSTALL DATE" and
# "REMAINING LIFE" - the loop raises if either is missing.
# An empty list means no element is currently checked for that component.
B_COMPONENT_LABELS = {
# Key components
"wall_structure": [
"Wall Structure in External Area",
],
"lintels": [
"Lintels in External Area",
],
"brickwork_spalling": [
"Wall Spalling in External Area",
],
"wall_finish": [
"Wall Finish 1 in External Area",
"Wall Finish 2 in External Area",
"External Decorations in External Area",
"Brickwork Pointing in External Area",
],
"roof_structure": [
"Roof Structure 1 in External Area",
"Roof Structure 2 in External Area",
"Roof Structure 3 in External Area",
# If you later decide to include ancillary items, add:
# "Fascia / Soffit / Bargeboard in External Area",
# "Gutters in External Area", "Downpipes in External Area",
# "Internal Downpipes in External Area",
# and give them a clear condition rule.
],
"roof_finish": [
"Roof Covering 1 in External Area",
"Roof Covering 2 in External Area",
"Roof Covering 3 in External Area",
],
"chimneys": [
"Chimneys in External Area",
],
"windows": [
"Windows in Property",
"Windows 1 in External Area",
"Windows 2 in External Area",
"Garage and Store Windows in External Area",
"Garage Windows in External Area",
"Store Windows in External Area",
],
"external_doors": [
"Type and Location of Front Door in Property",
"Front Door Fire Rating in Property",
"Patio and French Doors 1 in External Area",
"Back and Side Doors 1 in External Area",
"Back and Side Doors 2 in External Area",
"Garage and Store Doors in External Area",
"Garage Door in External Area",
"Store Door in External Area",
],
"central_heating_boiler": [
# If the dataset exposes a specific boiler element, put it here.
# For now we only have "Heating Improvement Required in Property" elsewhere (Criterion D),
# which isn't reliable for age. If your JSON later includes a boiler line with INSTALL DATE,
# add its label here.
],
"heating_other": [
# e.g., gas fires/storage heaters if present as discrete elements later.
],
"electrical_systems": [
# If you have an installation line with dates (e.g. "Electrics Required in Property")
# add it here; we will rely on INSTALL DATE + REMAINING LIFE.
"Electrics Required in Property",
],
# Other components
"kitchen": [
"Adequacy of Kitchen and Type in Property",
],
"bathroom": [
"Adequacy of Bathroom Location in Property",
],
"central_heating_distribution_system": [
"Heating Distribution System in Property",
],
}
# Component keys (same spelling as the keys of B_COMPONENT_LABELS) grouped as "key" components.
# NOTE(review): not referenced in the visible part of the file - presumably intended for the
# overall Criterion B roll-up; confirm before relying on it.
KEY_COMPONENTS = {
"wall_structure", "lintels", "brickwork_spalling", "wall_finish",
"roof_structure", "roof_finish", "chimneys", "windows",
"external_doors", "central_heating_boiler", "heating_other",
"electrical_systems",
}
# Component keys grouped as "other" components; together with KEY_COMPONENTS this covers
# every key of B_COMPONENT_LABELS.
OTHER_COMPONENTS = {
"kitchen", "bathroom", "central_heating_distribution_system",
}
# Criterion C
CRITERION_C_MAPPING = {
# "kitchen_less_than_20_years_old":
}
COMPONENT_LIFESPANS = {
"kitchen": {"house": 30, "flat_below_6_storeys": 30, "flat_above_6_storeys": 30},
"bathroom": {"house": 50, "flat_below_6_storeys": 50, "flat_above_6_storeys": 50}
# Key components
"wall_structure": {
"house": 80, "flat_below_6_storeys": 80, "flat_above_6_storeys": 80
},
"lintels": {
"house": 60, "flat_below_6_storeys": 60, "flat_above_6_storeys": 60
},
"brickwork_spalling": {
"house": 30, "flat_below_6_storeys": 30, "flat_above_6_storeys": 30
},
"wall_finish": {
"house": 60, "flat_below_6_storeys": 60, "flat_above_6_storeys": 30
},
"roof_structure": {
"house": 50, "flat_below_6_storeys": 30, "flat_above_6_storeys": 30
},
"roof_finish": {
"house": 50, "flat_below_6_storeys": 30, "flat_above_6_storeys": 30
},
"chimneys": {
"house": 50, "flat_below_6_storeys": 50, "flat_above_6_storeys": None # N/A
},
"windows": {
"house": 40, "flat_below_6_storeys": 30, "flat_above_6_storeys": 30
},
"external_doors": {
"house": 40, "flat_below_6_storeys": 30, "flat_above_6_storeys": 30
},
"central_heating_boiler": {
"house": 15, "flat_below_6_storeys": 15, "flat_above_6_storeys": 15
},
"heating_other": {
"house": 30, "flat_below_6_storeys": 30, "flat_above_6_storeys": 30
},
"electrical_systems": {
"house": 30, "flat_below_6_storeys": 30, "flat_above_6_storeys": 30
},
# Other components
"kitchen": {
"house": 30, "flat_below_6_storeys": 30, "flat_above_6_storeys": 30
},
"bathroom": {
"house": 40, "flat_below_6_storeys": 40, "flat_above_6_storeys": 40
},
"central_heating_distribution_system": {
"house": 40, "flat_below_6_storeys": 40, "flat_above_6_storeys": 40
},
}
# Database design
# creation_date, uprn, variable, result, hhsrs_score (optional, numeric), hhsrs_rank (A-J), install_date (for
# components which expire, e.g. kitchen)
decent_homes = []
decent_homes_meta = []
# Use to capture criterion A, B, C and D. Should be:
# {"uprn": int, "creation_date": datetime, "criterion_a": bool, "criterion_b": bool, "criterion_c": bool,
# "criterion_d": bool, "decent_homes": bool}
@ -193,20 +299,16 @@ for fn in filenames:
with open(os.path.join(folder, fn), "rb") as f:
data = json.load(f)
from pprint import pprint
pprint(data["elements"])
property_info = data["property_info"]
if property_info["PROP TYPE"] in ["HOU"]:
property_type = "house"
elif property_info["PROP TYPE"] == "FLA":
raise Exception("Implement distrinction between below and above 6 storeys")
property_type = "flat"
# property_type = "flat"
else:
raise NotImplementedError("Unknown property type")
# Criterion A
# ---------------- Criterion A ----------------
for hhsrs_variable, mapping in HHSRS_MAPPING.items():
element_code = list(mapping.keys())[0]
@ -235,14 +337,48 @@ for fn in filenames:
hhsrs_result = "no_data"
else:
raise NotImplementedError("Mixed results not implemented")
decent_homes.append(
decent_homes_meta.append(
{"variable": hhsrs_variable, 'result': hhsrs_result, "hhsrs_rank": None, "hhsrs_score": None,
"install_date": None}
)
# Criterion B
# ---------------- Criterion B ----------------
# Check each of the components
# --- Criterion C ---
# Per-label audit trail for Criterion B: one dict per element label that was checked.
component_pass_or_fail = []
# In the visible code `today` is first assigned in the Criterion C section further down,
# so it is set here as well to avoid a NameError when the first file is processed.
today = pd.Timestamp.today().normalize()
today_dt = today.to_pydatetime()
for component, labels in B_COMPONENT_LABELS.items():
    component_lifetime = COMPONENT_LIFESPANS[component][property_type]
    if component_lifetime is None:
        # Lifespan is marked N/A for this property type, so there is no age threshold
        # to compare against and the component is not assessed.
        continue
    # TODO: labels may not need to be multiple variables
    for label in labels:
        # get_element returns None when the label is absent from the survey data
        label_data = get_element(data["elements"], label)
        if label_data is None:
            raise ValueError(f"Missing element '{label}' for component '{component}' - pls check")

        # 1) We check if the component is old
        install_date = pd.to_datetime(label_data["INSTALL DATE"])
        if pd.isnull(install_date):
            raise ValueError("Missing install date - pls check")
        # This should be populated, and for the pilot it's okay if this errors if missing - we'll handle
        # accordingly
        is_old = years_between(today_dt, install_date.to_pydatetime()) >= component_lifetime

        # 2) We check if the component is in poor condition
        remaining_life = label_data["REMAINING LIFE"]
        if pd.isnull(remaining_life):
            raise ValueError("Missing remaining life - pls check")
        has_failed = remaining_life < 0

        # The component needs to have both failed and be old to fail criterion B
        component_result = "fail" if is_old and has_failed else "pass"
        component_pass_or_fail.append(
            {
                "component": component,
                "label": label,
                "install_date": str(install_date),
                "remaining_life": remaining_life,
                "is_old": is_old,
                "has_failed": has_failed,
                "result": component_result,
            }
        )
# ---------------- Criterion C ----------------
today = pd.Timestamp.today().normalize()
# Guard: property type string already set earlier
@ -251,71 +387,67 @@ for fn in filenames:
# 1) Kitchen age ≤ 20 years
kitchen = get_element(data["elements"], LABEL_KITCHEN)
if kitchen:
kit_install_raw = kitchen.get("INSTALL DATE")
try:
kit_install = pd.to_datetime(kit_install_raw)
kit_age_years = years_between(today.to_pydatetime(), kit_install.to_pydatetime())
kitchen_age_result = "pass" if kit_age_years <= CRITERION_C_AGE_LIMITS["kitchen_years_max"] else "fail"
# For transparency, store next renewal as install + 20 years (criterion C perspective)
kit_next_due = kit_install + pd.DateOffset(years=CRITERION_C_AGE_LIMITS["kitchen_years_max"])
except Exception:
kitchen_age_result = "no_data"
kit_next_due = None
kit_install_raw = kitchen["INSTALL DATE"]
kit_install = pd.to_datetime(kit_install_raw)
kit_age_years = years_between(today.to_pydatetime(), kit_install.to_pydatetime())
kitchen_age_result = "pass" if kit_age_years <= CRITERION_C_AGE_LIMITS["kitchen_years_max"] else "fail"
# For transparency, store next renewal as install + 20 years (criterion C perspective)
kit_next_due = kit_install + pd.DateOffset(years=CRITERION_C_AGE_LIMITS["kitchen_years_max"])
else:
kitchen_age_result = "no_data"
kit_next_due = None
append_result(decent_homes, "kitchen_less_than_20_years_old", kitchen_age_result, kit_next_due)
raise NotImplementedError("Kitchen data missing - pls check")
append_result(
decent_homes_meta, "kitchen_less_than_20_years_old", kitchen_age_result, install_date=str(kit_install)
)
# 2) Kitchen adequate space/layout
# Prefer explicit codes if you have them, fall back to text in ATTRIBUTE CODE DESCRIPTION
if kitchen:
kit_attr_desc = kitchen.get("ATTRIBUTE CODE DESCRIPTION", "")
# If you prefer codes, you can also branch here on kitchen.get("ATTRIBUTE CODE") == "STDKITADQ"
kitchen_adequacy_result = adequacy_result_by_text(kit_attr_desc)
kit_attr_desc = kitchen["ATTRIBUTE CODE"]
if kit_attr_desc == "STDKITADQ":
kitchen_adequacy_result = "pass"
else:
raise NotImplementedError("No other observed codes yet")
else:
kitchen_adequacy_result = "no_data"
append_result(decent_homes, "kitchen_adequate_space_and_layout", kitchen_adequacy_result)
raise NotImplementedError("Kitchen data missing - pls check")
append_result(decent_homes_meta, "kitchen_adequate_space_and_layout", kitchen_adequacy_result)
# 3) Bathroom age ≤ 30 years
bath = get_element(data["elements"], LABEL_BATHROOM)
if bath:
bth_install_raw = bath.get("INSTALL DATE")
try:
bth_install = pd.to_datetime(bth_install_raw)
bth_age_years = years_between(today.to_pydatetime(), bth_install.to_pydatetime())
bathroom_age_result = "pass" if bth_age_years <= CRITERION_C_AGE_LIMITS["bathroom_years_max"] else "fail"
bth_next_due = bth_install + pd.DateOffset(years=CRITERION_C_AGE_LIMITS["bathroom_years_max"])
except Exception:
bathroom_age_result = "no_data"
bth_next_due = None
bth_install_raw = bath["INSTALL DATE"]
bth_install = pd.to_datetime(bth_install_raw)
bth_age_years = years_between(today.to_pydatetime(), bth_install.to_pydatetime())
bathroom_age_result = "pass" if bth_age_years <= CRITERION_C_AGE_LIMITS["bathroom_years_max"] else "fail"
bth_next_due = bth_install + pd.DateOffset(years=CRITERION_C_AGE_LIMITS["bathroom_years_max"])
else:
bathroom_age_result = "no_data"
bth_next_due = None
append_result(decent_homes, "bathroom_less_than_30_years_old", bathroom_age_result, bth_next_due)
raise NotImplementedError("Bathroom data missing - pls check")
append_result(
decent_homes_meta, "bathroom_less_than_30_years_old", bathroom_age_result, install_date=str(bth_install)
)
# 4) Bathroom/WC appropriately located
if bath:
# You already observed codes like STDBTHADQ / ADPBTHADQ as 'pass'
bth_attr_code = bath.get("ATTRIBUTE CODE", "")
bth_attr_desc = bath.get("ATTRIBUTE CODE DESCRIPTION", "")
known_pass_codes = {"STDBTHADQ", "ADPBTHADQ"}
if bth_attr_code in known_pass_codes:
bth_attr_code = bath["ATTRIBUTE CODE"]
if bth_attr_code in {"STDBTHADQ", "ADPBTHADQ"}:
bathroom_location_result = "pass"
else:
# Fallback to text adequacy check
bathroom_location_result = adequacy_result_by_text(bth_attr_desc)
raise NotImplementedError("No other observed codes yet")
else:
bathroom_location_result = "no_data"
append_result(decent_homes, "bathroom_wc_appropriately_located", bathroom_location_result)
raise NotImplementedError("Bathroom data missing - pls check")
append_result(decent_homes_meta, "bathroom_wc_appropriately_located", bathroom_location_result)
# 5) Adequate external noise insulation
noise = get_element(data["elements"], LABEL_NOISE)
if noise:
noise_desc = noise.get("ATTRIBUTE CODE DESCRIPTION", "")
noise_result = adequacy_result_by_text(noise_desc)
noise_code = noise["ATTRIBUTE CODE"]
if noise_code in {"ADEQUATE"}:
noise_result = "pass"
else:
raise NotImplementedError("No other observed codes yet")
else:
noise_result = "no_data"
append_result(decent_homes, "adequate_external_noise_insulation", noise_result)
raise NotImplementedError("Noise insulation data missing - pls check")
append_result(decent_homes_meta, "adequate_external_noise_insulation", noise_result)
# 6) Adequate common entrance areas (flats only)
if is_flat:
@ -326,14 +458,13 @@ for fn in filenames:
common_areas_result = adequacy_result_by_text(circ_desc)
else:
common_areas_result = "no_data"
append_result(decent_homes, "adequate_common_entrance_areas", common_areas_result)
append_result(decent_homes_meta, "adequate_common_entrance_areas", common_areas_result)
# ---------------- Criterion D ----------------
# heating system type
heating = get_element(data["elements"], "Heating Improvement Required in Property")
if heating:
# Example: ATTRIBUTE CODE == "GOOD" means pass, "POOR" means fail
heat_type_code = heating.get("ATTRIBUTE CODE", "")
heat_type_code = heating["ATTRIBUTE CODE"]
if heat_type_code in {"NOTAPPLIC"}:
heating_type_result = "pass"
elif heat_type_code in {"WETINSFULL"}:
@ -343,28 +474,33 @@ for fn in filenames:
else:
raise NotImplementedError("Heating element missing in dataset")
append_result(decent_homes, "efficient_heating_system_type", heating_type_result)
append_result(decent_homes_meta, "efficient_heating_system_type", heating_type_result)
# heating distribution
heating_dist = get_element(data["elements"], "Heating Distribution System in Property")
if heating_dist:
dist_desc = heating_dist.get("ATTRIBUTE CODE DESCRIPTION", "")
heating_dist_result = adequacy_result_by_text(dist_desc)
dist_code = heating_dist["ATTRIBUTE CODE"]
if dist_code == "UNKNOWN":
# For the observed case, there was no heating and wet heating needed to be installed in full so the value
# was unknown
heating_dist_result = "no_data"
else:
raise NotImplementedError("No other observed codes yet")
else:
raise NotImplementedError("Heating distribution element missing in dataset")
append_result(decent_homes, "efficient_heating_distribution", heating_dist_result)
append_result(decent_homes_meta, "efficient_heating_distribution", heating_dist_result)
# insulation
loft = get_element(data["elements"], "Size in mm of Loft Insulation Thickness in Property")
wall = get_element(data["elements"], "Wall Insulation Improvement in External Area")
heating = get_element(data["elements"], "Heating Improvement Required in Property")
# To determine how much loft insulation is required
# Loft insulation check (example threshold: ≥ 270mm = pass)
if loft:
# We have a specific code, where further loft insulation is needed
loft_code = loft.get("ATTRIBUTE CODE", "")
# We have a specific code, where further loft insulation is needed - It appears the heating type check has
# already been completed in this dataset and so we just need to check the code
loft_code = loft["ATTRIBUTE CODE"]
if loft_code == "LOFTINSRQD":
loft_result = "fail"
elif loft_code.isnumeric():
@ -373,19 +509,22 @@ for fn in filenames:
raise NotImplementedError("Unknown loft insulation code - pls check")
else:
raise NotImplementedError("Loft insulation data missing - pls check")
append_result(decent_homes, "loft_insulation_sufficient", loft_result)
append_result(decent_homes_meta, "loft_insulation_sufficient", loft_result)
# Wall insulation check (simple adequacy parser)
# Wall insulation check
if wall:
wall_desc = wall.get("ATTRIBUTE CODE DESCRIPTION", "")
wall_result = adequacy_result_by_text(wall_desc)
wall_code = wall["ATTRIBUTE CODE"]
if wall_code in {"NONE"}: # Means no insulation improvement required
wall_result = "pass"
else:
raise NotImplementedError("No other observed codes yet")
else:
raise NotImplementedError("Wall insulation data missing - pls check")
append_result(decent_homes, "wall_insulation_sufficient", wall_result)
append_result(decent_homes_meta, "wall_insulation_sufficient", wall_result)
# ---------------- Criterion A overall ----------------
a_vars = set(HHSRS_MAPPING.keys())
latest_a_results = {r["variable"]: r["result"] for r in decent_homes if r["variable"] in a_vars}
latest_a_results = {r["variable"]: r["result"] for r in decent_homes_meta if r["variable"] in a_vars}
if any(v == "fail" for v in latest_a_results.values()):
criterion_a_result = "fail"
@ -405,20 +544,21 @@ for fn in filenames:
if is_flat:
criterion_c_vars.append("adequate_common_entrance_areas")
latest_c_results = {r["variable"]: r["result"] for r in decent_homes if r["variable"] in criterion_c_vars}
latest_c_results = {r["variable"]: r["result"] for r in decent_homes_meta if r["variable"] in criterion_c_vars}
count_fails = sum(1 for v in latest_c_results.values() if v == "fail")
# optionally count no_data too if you want strict interpretation
criterion_c_result = "fail" if count_fails >= 3 else "pass"
# ---------------- Criterion D overall ----------------
# Needs to have both efficient heating and distribution so all should pass
criterion_d_vars = [
"efficient_heating_system_type",
"efficient_heating_distribution",
"loft_insulation_sufficient",
"wall_insulation_sufficient",
]
latest_d_results = {r["variable"]: r["result"] for r in decent_homes if r["variable"] in criterion_d_vars}
latest_d_results = {r["variable"]: r["result"] for r in decent_homes_meta if r["variable"] in criterion_d_vars}
if any(v == "fail" for v in latest_d_results.values()):
criterion_d_result = "fail"
@ -429,7 +569,7 @@ for fn in filenames:
# ---------------- Append to property_decent_homes ----------------
property_decent_homes.append({
"uprn": property_info.get("UPRN"), # update field name if needed
"uprn": property_info.get("UPRN"), # TODO: Need UPRN
"creation_date": datetime.now().date().isoformat(),
"criterion_a": criterion_a_result,
"criterion_b": None, # not yet implemented
@ -438,5 +578,6 @@ for fn in filenames:
"decent_homes": (
criterion_a_result == "pass"
and criterion_c_result == "pass"
and criterion_d_result == "pass"
)
})