diff --git a/.devcontainer/backend/docker-compose.yml b/.devcontainer/backend/docker-compose.yml
index 75526e79..683b4489 100644
--- a/.devcontainer/backend/docker-compose.yml
+++ b/.devcontainer/backend/docker-compose.yml
@@ -9,10 +9,20 @@ services:
command: sleep infinity
volumes:
- ../../:/workspaces/model
- networks:
- - model-net
-networks:
- model-net:
- driver: bridge
+ db:
+ image: postgres:17.4
+ restart: unless-stopped
+ ports:
+ - 5432:5432
+ environment:
+ - PGDATABASE=tech_team_local_db
+ - POSTGRES_USER=postgres
+ - POSTGRES_PASSWORD=makingwarmerhomes
+ volumes:
+ - postgres-data-two:/var/lib/postgresql/data
+
+
+volumes:
+ postgres-data-two:
\ No newline at end of file
diff --git a/.idea/copilot.data.migration.agent.xml b/.idea/copilot.data.migration.agent.xml
new file mode 100644
index 00000000..4ea72a91
--- /dev/null
+++ b/.idea/copilot.data.migration.agent.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/copilot.data.migration.ask.xml b/.idea/copilot.data.migration.ask.xml
new file mode 100644
index 00000000..7ef04e2e
--- /dev/null
+++ b/.idea/copilot.data.migration.ask.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/copilot.data.migration.ask2agent.xml b/.idea/copilot.data.migration.ask2agent.xml
new file mode 100644
index 00000000..1f2ea11e
--- /dev/null
+++ b/.idea/copilot.data.migration.ask2agent.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/copilot.data.migration.edit.xml b/.idea/copilot.data.migration.edit.xml
new file mode 100644
index 00000000..8648f940
--- /dev/null
+++ b/.idea/copilot.data.migration.edit.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/asset_list/app.py b/asset_list/app.py
index 8b3abefb..9907a609 100644
--- a/asset_list/app.py
+++ b/asset_list/app.py
@@ -12,35 +12,23 @@ from asset_list.utils import get_data
from dotenv import load_dotenv
from backend.SearchEpc import SearchEpc
-
load_dotenv(dotenv_path="backend/.env")
-EPC_AUTH_TOKEN = os.getenv(
- "EPC_AUTH_TOKEN",
- "a2Nvbm5rb3dsZXNzYXJAZ21haWwuY29tOjY5MGJiMWM0NmIyOGI5ZDUxYzAxMzQzYzNiZGNlZGJjZDNmODQwMzA=",
-)
+EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN", "a2Nvbm5rb3dsZXNzYXJAZ21haWwuY29tOjY5MGJiMWM0NmIyOGI5ZDUxYzAxMzQzYzNiZGNlZGJjZDNmODQwMzA=")
-def extract_address1(
- asset_list, full_address_col, postcode_col, method="first_two_words"
-):
+def extract_address1(asset_list, full_address_col, postcode_col, method="first_two_words"):
if method == "first_two_words":
- asset_list["address1_extracted"] = (
- asset_list[full_address_col].str.split(" ").str[:2].str.join(" ")
- )
+ asset_list["address1_extracted"] = asset_list[full_address_col].str.split(" ").str[:2].str.join(" ")
return asset_list
if method == "first_word":
- asset_list["address1_extracted"] = (
- asset_list[full_address_col].str.split(" ").str[0]
- )
+ asset_list["address1_extracted"] = asset_list[full_address_col].str.split(" ").str[0]
return asset_list
if method == "house_number_extraction":
asset_list["address1_extracted"] = asset_list.apply(
- lambda x: SearchEpc.get_house_number(
- address=x[full_address_col], postcode=x[postcode_col]
- ),
- axis=1,
+ lambda x: SearchEpc.get_house_number(address=x[full_address_col], postcode=x[postcode_col]),
+ axis=1
)
return asset_list
@@ -69,24 +57,65 @@ def app():
EPC recommendations
Property UPRN
"""
- data_folder = "/workspaces/model/asset_list"
+<<<<<<< HEAD
+ data_folder = ("/workspaces/model/asset_list")
data_filename = "assets.xlsx"
+=======
+
+ data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Hackney"
+ data_filename = "Domna SHF Wave 3 (3).xlsx"
+ sheet_name = "Domna Wave 3"
+ postcode_column = 'Postcode'
+ address1_column = "Address 1"
+ address1_method = None
+ fulladdress_column = None
+ address_cols_to_concat = ["Address 1"]
+ missing_postcodes_method = None
+ landlord_year_built = "Construction Years"
+ landlord_os_uprn = "UPRN"
+ landlord_property_type = "Type"
+ landlord_built_form = "Attachment"
+ landlord_wall_construction = "Wall type"
+ landlord_roof_construction = None
+ landlord_heating_system = None
+ landlord_existing_pv = None
+ landlord_property_id = "Row ID"
+ landlord_sap = None
+ outcomes_filename = None
+ outcomes_sheetname = None
+ outcomes_postcode = None
+ outcomes_houseno = None
+ outcomes_id = None
+ outcomes_address = None
+ master_filepaths = []
+ master_id_colnames = []
+ master_to_asset_list_filepath = None
+ phase = False
+ ecosurv_landlords = None
+ asset_list_header = 0
+ landlord_block_reference = None
+
+ # Peabody data for cleaning
+ data_folder = ("/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting "
+ "Project/data_validation")
+ data_filename = "to_standardise_uprns.xlsx"
+>>>>>>> 3874da6177cbcc37f7a488bec0a06e387906653c
sheet_name = "Sheet1"
- postcode_column = "POSTCODE"
+ postcode_column = 'Postcode'
address1_column = None
- address1_method = "house_number_extraction"
- fulladdress_column = "ADDRESS"
+ address1_method = 'house_number_extraction'
+ fulladdress_column = 'Address'
address_cols_to_concat = None
missing_postcodes_method = None
landlord_year_built = None
- landlord_os_uprn = "UPRN"
+ landlord_os_uprn = None
landlord_property_type = None
- landlord_built_form = "BUILD FORM"
+ landlord_built_form = None
landlord_wall_construction = None
landlord_roof_construction = None
landlord_heating_system = None
landlord_existing_pv = None
- landlord_property_id = "UPRN"
+ landlord_property_id = "LLUPRN"
landlord_sap = None
outcomes_filename = None
outcomes_sheetname = None
@@ -126,62 +155,49 @@ def app():
landlord_existing_pv=landlord_existing_pv,
landlord_sap=landlord_sap,
landlord_block_reference=landlord_block_reference,
- phase=phase,
+ phase=phase
)
asset_list.init_standardise()
# We produce the new maps, which can be saved for future useage
new_property_type_map = {
- k: v
- for k, v in (
- asset_list.variable_mappings[asset_list.landlord_property_type]
- if asset_list.landlord_property_type
- else {}
+ k: v for k, v in (
+ asset_list.variable_mappings[asset_list.landlord_property_type] if
+ asset_list.landlord_property_type else {}
).items()
if k not in PROPERTY_MAPPING
}
new_built_form_map = {
- k: v
- for k, v in (
- asset_list.variable_mappings[asset_list.landlord_built_form]
- if asset_list.landlord_built_form
- else {}
+ k: v for k, v in (
+ asset_list.variable_mappings[asset_list.landlord_built_form] if
+ asset_list.landlord_built_form else {}
).items()
if k not in BUILT_FORM_MAPPINGS
}
new_wall_map = {
- k: v
- for k, v in (
- asset_list.variable_mappings[asset_list.landlord_wall_construction]
- if asset_list.landlord_wall_construction
- else {}
+ k: v for k, v in (
+ asset_list.variable_mappings[asset_list.landlord_wall_construction] if
+ asset_list.landlord_wall_construction else {}
).items()
if k not in WALL_CONSTRUCTION_MAPPINGS
}
new_heating_map = {
- k: v
- for k, v in (
- asset_list.variable_mappings[asset_list.landlord_heating_system]
- if asset_list.landlord_heating_system
- else {}
+ k: v for k, v in (
+ asset_list.variable_mappings[asset_list.landlord_heating_system] if
+ asset_list.landlord_heating_system else {}
).items()
if k not in HEATING_MAPPINGS
}
new_existing_pv_map = {
- k: v
- for k, v in (
- asset_list.variable_mappings[asset_list.landlord_existing_pv]
- if asset_list.landlord_existing_pv
- else {}
+ k: v for k, v in (
+ asset_list.variable_mappings[asset_list.landlord_existing_pv] if asset_list.landlord_existing_pv else {}
).items()
if k not in EXISTING_PV_MAPPINGS
}
new_roof_construction_map = {
- k: v
- for k, v in (
- asset_list.variable_mappings[asset_list.landlord_roof_construction]
- if asset_list.landlord_roof_construction
- else {}
+ k: v for k, v in (
+ asset_list.variable_mappings[asset_list.landlord_roof_construction] if
+ asset_list.landlord_roof_construction else {}
).items()
if k not in ROOF_CONSTRUCTION_MAPPINGS
}
@@ -195,7 +211,7 @@ def app():
outcomes_address=outcomes_address,
outcomes_postcode=outcomes_postcode,
outcomes_houseno=outcomes_houseno,
- outcomes_id=outcomes_id,
+ outcomes_id=outcomes_id
)
asset_list.flag_survey_master(
@@ -229,16 +245,14 @@ def app():
skip = max(chunk_indexes)
if any(x in folder_contents for x in downloaded_files):
- skip = max(
- [i for i in chunk_indexes if filename.format(i=i) in folder_contents]
- )
+ skip = max([i for i in chunk_indexes if filename.format(i=i) in folder_contents])
for i in range(0, len(asset_list.standardised_asset_list), chunk_size):
print(f"Processing chunk {i} to {i + chunk_size}")
if skip is not None and not force_retrieve_data:
if i <= skip:
continue
- chunk = asset_list.standardised_asset_list[i : i + chunk_size]
+ chunk = asset_list.standardised_asset_list[i:i + chunk_size]
epc_data_chunk, errors_chunk, no_epc_chunk = get_data(
df=chunk,
row_id_name=asset_list.DOMNA_PROPERTY_ID,
@@ -250,7 +264,7 @@ def app():
built_form_column=AssetList.STANDARD_BUILT_FORM,
manual_uprn_map=manual_uprn_map,
epc_api_only=epc_api_only,
- epc_auth_token=EPC_AUTH_TOKEN,
+ epc_auth_token=EPC_AUTH_TOKEN
)
# We now retrieve any failed properties
@@ -273,9 +287,7 @@ def app():
# Append the failed data to the main data
# Store the chunk locally as a csv
- pd.DataFrame(epc_data_chunk).to_csv(
- os.path.join(data_folder, f"Chunks/Chunk {i}.csv"), index=False
- )
+ pd.DataFrame(epc_data_chunk).to_csv(os.path.join(data_folder, f"Chunks/Chunk {i}.csv"), index=False)
# Store the errors and no-data locally
with open(os.path.join(data_folder, f"Chunks/Chunk {i} errors.json"), "w") as f:
json.dump(errors_chunk, f)
@@ -306,9 +318,7 @@ def app():
unique_recommendations = set()
for _, row in recommendations_df.iterrows():
- unique_recommendations.update(
- [rec["improvement-summary-text"] for rec in row["recommendations"]]
- )
+ unique_recommendations.update([rec["improvement-summary-text"] for rec in row["recommendations"]])
columns = [asset_list.DOMNA_PROPERTY_ID] + list(unique_recommendations)
transformed_data = []
@@ -328,24 +338,20 @@ def app():
transformed_df = pd.DataFrame(transformed_data)
for col in [
"Floor insulation (solid floor)",
- "Floor insulation",
- "Floor insulation (suspended floor)",
+ "Floor insulation", "Floor insulation (suspended floor)"
]:
if col not in transformed_df.columns:
transformed_df[col] = False
transformed_df = transformed_df[
[
- asset_list.DOMNA_PROPERTY_ID,
- "Floor insulation (solid floor)",
- "Floor insulation",
- "Floor insulation (suspended floor)",
+ asset_list.DOMNA_PROPERTY_ID, "Floor insulation (solid floor)",
+ "Floor insulation", "Floor insulation (suspended floor)"
]
]
transformed_df["epc_has_floor_recommendation"] = (
- transformed_df["Floor insulation (solid floor)"]
- | transformed_df["Floor insulation"]
- | transformed_df["Floor insulation (suspended floor)"]
+ transformed_df["Floor insulation (solid floor)"] | transformed_df["Floor insulation"] |
+ transformed_df["Floor insulation (suspended floor)"]
)
# Get the find my epc data
@@ -358,20 +364,21 @@ def app():
find_my_epc_data.append(
{
asset_list.DOMNA_PROPERTY_ID: x[asset_list.DOMNA_PROPERTY_ID],
- **x["find_my_epc_data"],
+ **x["find_my_epc_data"]
}
)
else:
find_my_epc_data.append(
- {asset_list.DOMNA_PROPERTY_ID: x[asset_list.DOMNA_PROPERTY_ID]}
+ {
+ asset_list.DOMNA_PROPERTY_ID: x[asset_list.DOMNA_PROPERTY_ID]
+ }
)
find_my_epc_data = pd.DataFrame(find_my_epc_data)
find_my_epc_data = find_my_epc_data.merge(
transformed_df[[asset_list.DOMNA_PROPERTY_ID, "epc_has_floor_recommendation"]],
- how="left",
- on=asset_list.DOMNA_PROPERTY_ID,
+ how="left", on=asset_list.DOMNA_PROPERTY_ID
)
# We check if we get the solar pv column:
@@ -381,26 +388,24 @@ def app():
# Retrieve just the data we need
epc_df = epc_df[
[asset_list.DOMNA_PROPERTY_ID] + list(asset_list.EPC_API_DATA_NAMES.keys())
- ].rename(columns=asset_list.EPC_API_DATA_NAMES)
+ ].rename(
+ columns=asset_list.EPC_API_DATA_NAMES
+ )
# Look for columns not in the find my EPC data, which will have happened if we didn't
# retrieve it in the first place
- missed_find_epc_cols = [
- c
- for c in list(asset_list.FIND_EPC_DATA_NAMES.keys())
- if c not in find_my_epc_data.columns
- ]
+ missed_find_epc_cols = [c for c in list(asset_list.FIND_EPC_DATA_NAMES.keys()) if c not in find_my_epc_data.columns]
if missed_find_epc_cols:
for c in missed_find_epc_cols:
find_my_epc_data[c] = None
epc_df = epc_df.merge(
find_my_epc_data[
- [asset_list.DOMNA_PROPERTY_ID, "epc_has_floor_recommendation"]
- + list(asset_list.FIND_EPC_DATA_NAMES.keys())
- ].rename(columns=asset_list.FIND_EPC_DATA_NAMES),
+ [asset_list.DOMNA_PROPERTY_ID, "epc_has_floor_recommendation"] + list(asset_list.FIND_EPC_DATA_NAMES.keys())
+ ]
+ .rename(columns=asset_list.FIND_EPC_DATA_NAMES),
how="left",
- on=asset_list.DOMNA_PROPERTY_ID,
+ on=asset_list.DOMNA_PROPERTY_ID
)
asset_list.merge_data(epc_df)
@@ -417,10 +422,7 @@ def app():
asset_list.get_work_figures()
# Store as an excel
- filename = (
- os.path.join(data_folder, ".".join(data_filename.split(".")[:-1]))
- + " - Standardised.xlsx"
- )
+ filename = os.path.join(data_folder, ".".join(data_filename.split(".")[:-1])) + " - Standardised.xlsx"
# Store the data in two tabs. One for the asset list with the EPC data and the second with the flat data
# Determine inspections priority
@@ -444,42 +446,26 @@ def app():
# )
with pd.ExcelWriter(filename) as writer:
- asset_list.standardised_asset_list.to_excel(
- writer, sheet_name="Standardised Asset List", index=False
- )
+ asset_list.standardised_asset_list.to_excel(writer, sheet_name="Standardised Asset List", index=False)
if asset_list.block_analysis_df is not None:
- asset_list.block_analysis_df.to_excel(
- writer, sheet_name="Block Analysis", index=False
- )
+ asset_list.block_analysis_df.to_excel(writer, sheet_name="Block Analysis", index=False)
# If we have outcomes, we add a tab with the outcomes
if not asset_list.outcomes_for_output.empty:
- asset_list.outcomes_for_output.to_excel(
- writer, sheet_name="Outcomes", index=False
- )
+ asset_list.outcomes_for_output.to_excel(writer, sheet_name="Outcomes", index=False)
if not asset_list.unmatched_submissions.empty:
- asset_list.unmatched_submissions.to_excel(
- writer, sheet_name="Unmatched Submissions", index=False
- )
+ asset_list.unmatched_submissions.to_excel(writer, sheet_name="Unmatched Submissions", index=False)
if not asset_list.outcomes_no_match.empty:
- asset_list.outcomes_no_match.to_excel(
- writer, sheet_name="Unmatched Outcomes", index=False
- )
+ asset_list.outcomes_no_match.to_excel(writer, sheet_name="Unmatched Outcomes", index=False)
if not asset_list.ecosurv_no_match.empty:
- asset_list.ecosurv_no_match.to_excel(
- writer, sheet_name="Unmatched Ecosurv", index=False
- )
+ asset_list.ecosurv_no_match.to_excel(writer, sheet_name="Unmatched Ecosurv", index=False)
if not asset_list.geographical_areas.empty:
- asset_list.geographical_areas.to_excel(
- writer, sheet_name="Geographical Areas", index=False
- )
+ asset_list.geographical_areas.to_excel(writer, sheet_name="Geographical Areas", index=False)
# Store dupes
if asset_list.duplicated_addresses is not None:
if not asset_list.duplicated_addresses.empty:
- asset_list.duplicated_addresses.to_excel(
- writer, sheet_name="Duplicate Properties", index=False
- )
+ asset_list.duplicated_addresses.to_excel(writer, sheet_name="Duplicate Properties", index=False)
diff --git a/backend/.env.local b/backend/.env.local
new file mode 100644
index 00000000..a05c93a3
--- /dev/null
+++ b/backend/.env.local
@@ -0,0 +1,22 @@
+DB_HOST=db
+DB_PORT=5432
+DB_NAME=tech_team_local_db
+DB_USERNAME=postgres
+DB_PASSWORD=makingwarmerhomes
+
+
+#not used
+GOOGLE_SOLAR_API_KEY="test"
+SAP_PREDICTIONS_BUCKET="test"
+CARBON_PREDICTIONS_BUCKET="test"
+HEAT_PREDICTIONS_BUCKET="test"
+HEATING_KWH_PREDICTIONS_BUCKET="test"
+HOTWATER_KWH_PREDICTIONS_BUCKET="test"
+API_KEY="test"
+ENVIRONMENT="test"
+SECRET_KEY="test"
+PLAN_TRIGGER_BUCKET="test"
+DATA_BUCKET="test"
+EPC_AUTH_TOKEN="test"
+ENGINE_SQS_URL="test"
+ENERGY_ASSESSMENTS_BUCKET="test"
\ No newline at end of file
diff --git a/backend/Property.py b/backend/Property.py
index fa607cfd..14f7e03f 100644
--- a/backend/Property.py
+++ b/backend/Property.py
@@ -84,6 +84,7 @@ class Property:
uprn=None, # Pass as an optional input
property_valuation=None,
already_installed=None,
+ find_my_epc_components=None,
non_invasive_recommendations=None,
measures=None,
energy_assessment=None,
@@ -114,6 +115,7 @@ class Property:
non_invasive_recommendations['recommendations'] if
non_invasive_recommendations else []
)
+ self.find_my_epc_components = find_my_epc_components # Store the find my epc components
# This is a list of measures that have been recommended for the property
if isinstance(measures, list):
self.measures = measures
@@ -551,7 +553,7 @@ class Property:
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation",
"cylinder_thermostat", "loft_insulation", "room_roof_insulation", "flat_roof_insulation",
"solid_floor_insulation", "suspended_floor_insulation", "mixed_glazing",
- "windows_glazing", "mechanical_ventilation", "solar_pv"
+ "windows_glazing", "mechanical_ventilation", "solar_pv", "sloping_ceiling_insulation"
]:
# We update the data, as defined in the recommendaton
for prefix in ["walls", "roof", "floor"]:
@@ -574,7 +576,7 @@ class Property:
"solid_floor_insulation", "suspended_floor_insulation",
"windows_glazing", "solar_pv", "heating", "hot_water_tank_insulation",
"heating_control", "secondary_heating", "cylinder_thermostat", "mixed_glazing",
- "extension_cavity_wall_insulation", "mechanical_ventilation",
+ "extension_cavity_wall_insulation", "mechanical_ventilation", "sloping_ceiling_insulation"
]:
raise NotImplementedError(
"Implement me, given type %s" % recommendation["type"]
diff --git a/backend/app/config.py b/backend/app/config.py
index dd3f5db1..b335c215 100644
--- a/backend/app/config.py
+++ b/backend/app/config.py
@@ -42,7 +42,7 @@ class Settings(BaseSettings):
AWS_DEFAULT_REGION: Optional[str] = None
class Config:
- env_file = "backend/.env"
+ env_file = "backend/.env.local"
@lru_cache()
diff --git a/backend/app/db/connection.py b/backend/app/db/connection.py
index 74f3bd2e..f0649c71 100644
--- a/backend/app/db/connection.py
+++ b/backend/app/db/connection.py
@@ -3,7 +3,9 @@ from contextlib import contextmanager
from backend.app.config import get_settings
from sqlmodel import Session
-connection_string = "postgresql+{drivername}://{username}:{password}@{server}:{port}/{dbname}"
+connection_string = (
+ "postgresql+{drivername}://{username}:{password}@{server}:{port}/{dbname}"
+)
db_string = connection_string.format(
drivername="psycopg2", # You'll need to use psycopg2 driver for PostgreSQL
username=get_settings().DB_USERNAME,
@@ -28,7 +30,9 @@ db_engine = create_engine(
def get_db_session():
if db_engine is None:
- raise RuntimeError("Database is not configured. Set DATABASE_URL in environment variables.")
+ raise RuntimeError(
+ "Database is not configured. Set DATABASE_URL in environment variables."
+ )
return Session(db_engine)
diff --git a/backend/app/db/functions/condition_functions.py b/backend/app/db/functions/condition_functions.py
new file mode 100644
index 00000000..d281b9a4
--- /dev/null
+++ b/backend/app/db/functions/condition_functions.py
@@ -0,0 +1,12 @@
+from typing import List
+from sqlalchemy import insert, delete
+from sqlalchemy.orm import Session
+
+from backend.app.db.connection import db_session, db_read_session
+from backend.app.db.models.condition import PropertyConditionSurveyModel
+
+
+def bulk_insert_property_surveys(
+ session: Session, surveys: List[PropertyConditionSurveyModel]
+) -> None:
+ raise NotImplementedError
diff --git a/backend/app/db/models/condition.py b/backend/app/db/models/condition.py
new file mode 100644
index 00000000..77043366
--- /dev/null
+++ b/backend/app/db/models/condition.py
@@ -0,0 +1,97 @@
+from sqlalchemy import (
+ BigInteger,
+ Column,
+ Date,
+ ForeignKey,
+ Integer,
+ String,
+ Enum as SqlEnum,
+)
+from sqlalchemy.orm import declarative_base, relationship
+
+from backend.condition.domain.aspect_type import AspectType
+from backend.condition.domain.element_type import ElementType
+
+Base = declarative_base()
+
+ElementTypeDb = SqlEnum(
+ ElementType,
+ name="element_type",
+ native_enum=True,
+ values_callable=lambda enum: [e.value for e in enum],
+)
+
+AspectTypeDb = SqlEnum(
+ AspectType,
+ name="aspect_type",
+ native_enum=True,
+ values_callable=lambda enum: [a.value for a in enum],
+)
+
+
+class PropertyConditionSurveyModel(Base):
+ __tablename__ = "property_condition_survey"
+
+ id = Column(BigInteger, primary_key=True, autoincrement=True)
+ uprn = Column(BigInteger, nullable=False)
+
+ date = Column(Date, nullable=False)
+ source = Column(String, nullable=False)
+
+ elements = relationship(
+ "ElementModel",
+ back_populates="survey",
+ cascade="all, delete-orphan",
+ )
+
+
+class ElementModel(Base):
+ __tablename__ = "element" # TODO: rename to survey_element?
+
+ id = Column(BigInteger, primary_key=True, autoincrement=True)
+
+ survey_id = Column(
+ BigInteger,
+ ForeignKey("property_condition_survey.id"),
+ nullable=False,
+ )
+
+ element_type = Column(ElementTypeDb, nullable=False)
+ element_instance = Column(BigInteger, nullable=False)
+
+ survey = relationship(
+ "PropertyConditionSurveyModel",
+ back_populates="elements",
+ )
+
+ aspect_conditions = relationship(
+ "AspectConditionModel",
+ back_populates="element",
+ cascade="all, delete-orphan",
+ )
+
+
+class AspectConditionModel(Base):
+ __tablename__ = "aspect_condition" # TODO: rename to survey_aspect?
+
+ id = Column(BigInteger, primary_key=True, autoincrement=True)
+
+ element_id = Column(
+ BigInteger,
+ ForeignKey("element.id"),
+ nullable=False,
+ )
+
+ aspect_type = Column(AspectTypeDb, nullable=False)
+ aspect_instance = Column(BigInteger, nullable=False)
+
+ value = Column(String)
+ quantity = Column(Integer)
+ install_date = Column(Date)
+ renewal_year = Column(Integer)
+ comments = Column(String)
+
+ element = relationship(
+ "ElementModel",
+ back_populates="aspect_conditions",
+ )
diff --git a/backend/app/plan/schemas.py b/backend/app/plan/schemas.py
index edac31dc..7c352eba 100644
--- a/backend/app/plan/schemas.py
+++ b/backend/app/plan/schemas.py
@@ -9,7 +9,9 @@ TYPICAL_MEASURE_TYPES = [
]
WALL_INSULATION_MEASURES = ["internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation"]
-ROOF_INSULATION_MEASURES = ["loft_insulation", "flat_roof_insulation", "room_roof_insulation"]
+ROOF_INSULATION_MEASURES = [
+ "loft_insulation", "flat_roof_insulation", "room_roof_insulation", "sloping_ceiling_insulation"
+]
# Both all and roof insulaiton measures are eligible for ECO4. These are the remaining fabric and heating measures
# This is based on th measures we have recommendations for
@@ -31,7 +33,7 @@ SPECIFIC_MEASURES = (
INSULATION_MEASURES = [
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation",
- "loft_insulation", "flat_roof_insulation", "room_roof_insulation",
+ "loft_insulation", "flat_roof_insulation", "room_roof_insulation", "sloping_ceiling_insulation",
"suspended_floor_insulation", "solid_floor_insulation",
]
@@ -46,7 +48,9 @@ MEASURE_MAP = {
"wall_insulation": [
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation",
],
- "roof_insulation": ["loft_insulation", "flat_roof_insulation", "room_roof_insulation"],
+ "roof_insulation": [
+ "loft_insulation", "flat_roof_insulation", "room_roof_insulation", "sloping_ceiling_insulation"
+ ],
"floor_insulation": ["suspended_floor_insulation", "solid_floor_insulation"],
"heating": ["boiler_upgrade", "high_heat_retention_storage_heaters", "air_source_heat_pump"],
"windows": ["double_glazing", "secondary_glazing"],
diff --git a/backend/condition/README.md b/backend/condition/README.md
index 140d4585..46302cab 100644
--- a/backend/condition/README.md
+++ b/backend/condition/README.md
@@ -20,7 +20,7 @@ The processor currently supports file formats provided by **Peabody** and **LBWF
The `local_runner` script allows the processor to be executed in a local environment.
-1. Copy a sample input file into the `sample_data/` directory.
+1. Copy sample input file(s) into the `sample_data/` directory. If working with Peabody data, you'll need the Landlord Reference / UPRN lookup file as well.
2. Update `local_runner.py` as required, specifically the definitions of:
- `lbwf_path`
- `peabody_path`
diff --git a/backend/condition/local_runner.py b/backend/condition/local_runner.py
index 404f64d4..e39d38c7 100644
--- a/backend/condition/local_runner.py
+++ b/backend/condition/local_runner.py
@@ -21,6 +21,8 @@ def main() -> None:
/ "2026_01_06 - Peabody - Stock Condition Data - Survey Records - D Lower.xlsx"
)
filepaths = [lbwf_path, peabody_path]
+ # filepaths = [lbwf_path]
+ # filepaths = [peabody_path]
for fp in filepaths:
with fp.open("rb") as f:
diff --git a/backend/condition/parsing/lbwf_parser.py b/backend/condition/parsing/lbwf_parser.py
index 14d2efe4..3a23d028 100644
--- a/backend/condition/parsing/lbwf_parser.py
+++ b/backend/condition/parsing/lbwf_parser.py
@@ -1,4 +1,4 @@
-from typing import BinaryIO, Any, Dict, Iterator, List, Tuple
+from typing import BinaryIO, Any, Dict, Iterator, List, Optional, Tuple
from openpyxl import Workbook, load_workbook
from collections import defaultdict
@@ -15,7 +15,11 @@ logger = setup_logger()
class LbwfParser(Parser):
- def parse(self, file_stream: BinaryIO) -> Any:
+ def parse(
+ self,
+ file_stream: BinaryIO,
+ location_ref_to_uprn_map: Optional[Dict[str, int]] = None,
+ ) -> Any:
wb: Workbook = load_workbook(file_stream)
address_to_uprn_map: Dict[str, int] = LbwfParser._generate_address_to_uprn_dict(
wb
diff --git a/backend/condition/parsing/parser.py b/backend/condition/parsing/parser.py
index 105fda36..825abcd5 100644
--- a/backend/condition/parsing/parser.py
+++ b/backend/condition/parsing/parser.py
@@ -1,8 +1,13 @@
from abc import ABC, abstractmethod
-from typing import BinaryIO, Any
+from typing import BinaryIO, Any, Dict, Optional
+
class Parser(ABC):
@abstractmethod
- def parse(self, file_stream: BinaryIO) -> Any:
- pass
\ No newline at end of file
+ def parse(
+ self,
+ file_stream: BinaryIO,
+ location_ref_to_uprn_map: Optional[Dict[str, int]] = None,
+ ) -> Any:
+ pass
diff --git a/backend/condition/parsing/peabody_parser.py b/backend/condition/parsing/peabody_parser.py
index b8a548a7..c53fd6d1 100644
--- a/backend/condition/parsing/peabody_parser.py
+++ b/backend/condition/parsing/peabody_parser.py
@@ -1,26 +1,55 @@
-from typing import Any, BinaryIO, Dict, Iterator, List, Tuple, DefaultDict
+import csv
+from pathlib import Path
+from typing import Any, BinaryIO, Dict, List, Optional, Tuple, DefaultDict
from openpyxl import Workbook, load_workbook
from collections import defaultdict
from backend.condition.parsing.parser import Parser
-from backend.condition.parsing.records.peabody.peabody_asset_condition import PeabodyAssetCondition
+from backend.condition.parsing.records.peabody.peabody_asset_condition import (
+ PeabodyAssetCondition,
+)
from backend.condition.parsing.records.peabody.peabody_property import PeabodyProperty
from utils.logger import setup_logger
logger = setup_logger()
-class PeabodyParser(Parser):
- def parse(self, file_stream: BinaryIO) -> Any:
- wb: Workbook = load_workbook(file_stream)
- address_to_uprn_map: Dict[str, int] = PeabodyParser._generate_address_to_uprn_dict(wb)
-
- assets = self._parse_assets(wb)
- return self._group_assets_into_properties(
+class PeabodyParser(Parser):
+ def parse(
+ self,
+ file_stream: BinaryIO,
+ location_ref_to_uprn_map: Optional[Dict[str, int]] = None,
+ ) -> Any:
+ wb: Workbook = load_workbook(file_stream)
+
+ if location_ref_to_uprn_map is None:
+ location_ref_to_uprn_map: Dict[str, int] = (
+ PeabodyParser._build_location_ref_to_uprn_map()
+ )
+
+ assets = PeabodyParser._parse_assets(wb)
+
+ return PeabodyParser._group_assets_into_properties(
assets=assets,
- address_to_uprn_map=address_to_uprn_map,
+ location_ref_to_uprn_map=location_ref_to_uprn_map,
)
+ @staticmethod
+ def _build_location_ref_to_uprn_map() -> Dict[str, int]:
+ location_ref_to_uprn_filepath: Path = (
+ Path(__file__).resolve().parents[1]
+ / "sample_data"
+ / "peabody"
+ / "PeabodyPropertymatched_Dec25_propref_UPRN.csv"
+ )
+ location_ref_to_uprn_map: Dict[str, int] = {}
+
+ with location_ref_to_uprn_filepath.open(newline="") as f:
+ reader: Any = csv.DictReader(f)
+ for row in reader:
+ location_ref_to_uprn_map[row["reference"]] = int(row["out_uprn"])
+
+ return location_ref_to_uprn_map
@staticmethod
def _parse_assets(wb: Workbook) -> List[PeabodyAssetCondition]:
@@ -33,39 +62,43 @@ class PeabodyParser(Parser):
assets: List[PeabodyAssetCondition] = []
for row in asset_rows:
try:
- asset = PeabodyParser._map_row_to_asset_record(row, asset_header_indexes)
+ asset = PeabodyParser._map_row_to_asset_record(
+ row, asset_header_indexes
+ )
if not asset.is_block_level:
# Block-level condition surveys are out of scope for now
- # until we have a wider think on how to handle block
- assets.append(asset) # TODO: handle block-level assets
+ # until we have a wider think on how to handle block
+ assets.append(asset) # TODO: handle block-level assets
except Exception as e:
logger.error(f"Error mapping Peabody row to asset record: {e}")
continue
return assets
-
+
@staticmethod
def _group_assets_into_properties(
assets: List[PeabodyAssetCondition],
- address_to_uprn_map: Dict[str, int],
+ location_ref_to_uprn_map: Dict[str, int],
) -> List[PeabodyProperty]:
- assets_by_address: DefaultDict[str, List[PeabodyAssetCondition]] = defaultdict(list)
+ assets_by_location_reference: DefaultDict[str, List[PeabodyAssetCondition]] = (
+ defaultdict(list)
+ )
for asset in assets:
- if asset.full_address is None:
+ if asset.lo_reference is None:
continue
- address = asset.full_address.strip()
- assets_by_address[address].append(asset)
+ assets_by_location_reference[asset.lo_reference].append(asset)
properties: List[PeabodyProperty] = []
- for address, grouped_assets in assets_by_address.items():
- uprn = address_to_uprn_map.get(address)
+ for location_ref, grouped_assets in assets_by_location_reference.items():
+
+ uprn = location_ref_to_uprn_map.get(location_ref)
if uprn is None:
- logger.warning(f"No UPRN found for address: {address}")
+ logger.warning(f"No UPRN found for Location Reference: {location_ref}")
continue
properties.append(
@@ -77,7 +110,6 @@ class PeabodyParser(Parser):
return properties
-
@staticmethod
def _map_row_to_asset_record(
row: Any | Tuple[object | None, ...],
@@ -102,39 +134,9 @@ class PeabodyParser(Parser):
condition_survey_date=row[header_indexes["condition_survey_date"]],
)
- @staticmethod
- def _generate_address_to_uprn_dict(wb: Workbook) -> Dict[str, int | None]:
- sheet = wb["Survey Records - D & Lower"]
- rows: Iterator[Tuple[object | None, ...]] = sheet.iter_rows(values_only=True)
-
- headers = next(rows)
- header_indexes: Dict[str, int] = PeabodyParser._get_column_indexes_by_name(headers)
-
- address_idx = header_indexes["full_address"]
-
-
- address_to_uprn: Dict[str, int] = {}
- # Generate random UPRNs for now
- next_uprn = 1 # TODO: get real UPRNs
-
- for row in rows:
- address = row[address_idx]
-
- if address is None:
- continue
-
- address = address.strip()
-
- if address not in address_to_uprn:
- address_to_uprn[address] = next_uprn
- next_uprn += 1
-
- return address_to_uprn
-
-
@staticmethod
def _get_column_indexes_by_name(
- headers: Tuple[object | None, ...]
+ headers: Tuple[object | None, ...],
) -> Dict[str, int]:
index: Dict[str, int] = {}
@@ -142,4 +144,4 @@ class PeabodyParser(Parser):
if isinstance(header, str):
index[header] = i
- return index
\ No newline at end of file
+ return index
diff --git a/backend/condition/persistence/condition_postgres.py b/backend/condition/persistence/condition_postgres.py
new file mode 100644
index 00000000..9d7895f0
--- /dev/null
+++ b/backend/condition/persistence/condition_postgres.py
@@ -0,0 +1,86 @@
+import time
+from typing import List, Optional
+from sqlmodel import Session
+
+from utils.logger import setup_logger
+from backend.app.db.models.condition import (
+ AspectConditionModel,
+ ElementModel,
+ PropertyConditionSurveyModel,
+)
+from backend.condition.domain.property_condition_survey import PropertyConditionSurvey
+from backend.app.db.connection import db_session
+
+logger = setup_logger()
+
+
+class ConditionPostgres:
+
+ def bulk_insert_surveys(
+ self, surveys: List[PropertyConditionSurvey], batch_size: Optional[int] = 100
+ ) -> None:
+ logger.info(
+ f"Preparing to load {len(surveys)} property surveys to Postgres. Mapping to SQLModel objects..."
+ )
+ survey_models: List[PropertyConditionSurveyModel] = [
+ ConditionPostgres.map_survey_to_model(s) for s in surveys
+ ]
+ total: int = len(survey_models)
+ logger.info(
+ f"Finished mapping {total} surveys. Writing to database in batches of {batch_size}..."
+ )
+
+ with db_session() as session:
+ for start in range(0, total, batch_size):
+ end = min(start + batch_size, total)
+ batch = survey_models[start:end]
+
+ t0: float = time.perf_counter()
+ ConditionPostgres._insert_surveys_batch(batch, session)
+ elapsed: float = time.perf_counter() - t0
+
+ logger.info(
+ f"Inserted batch {start} - {end} ({len(batch)} surveys) in {elapsed} seconds",
+ )
+
+ @staticmethod
+ def map_survey_to_model(
+ survey: PropertyConditionSurvey,
+ ) -> PropertyConditionSurveyModel:
+ survey_model = PropertyConditionSurveyModel(
+ uprn=survey.uprn,
+ date=survey.date,
+ source=survey.source,
+ elements=[],
+ )
+
+ for element in survey.elements:
+ element_model = ElementModel(
+ element_type=element.element_type,
+ element_instance=element.element_instance,
+ aspect_conditions=[],
+ )
+
+ for aspect in element.aspect_conditions:
+ aspect_model = AspectConditionModel(
+ aspect_type=aspect.aspect_type,
+ aspect_instance=aspect.aspect_instance,
+ value=aspect.value,
+ quantity=aspect.quantity,
+ install_date=aspect.install_date,
+ renewal_year=aspect.renewal_year,
+ comments=aspect.comments,
+ )
+
+ element_model.aspect_conditions.append(aspect_model)
+
+ survey_model.elements.append(element_model)
+
+ return survey_model
+
+ @staticmethod
+ def _insert_surveys_batch(
+ surveys: List[PropertyConditionSurveyModel], session: Session
+ ) -> None:
+ session.add_all(surveys)
+ session.commit()
diff --git a/backend/condition/processor.py b/backend/condition/processor.py
index 3cbff498..4d8f16cf 100644
--- a/backend/condition/processor.py
+++ b/backend/condition/processor.py
@@ -1,25 +1,33 @@
from typing import Any, BinaryIO, List
from datetime import datetime
+from utils.logger import setup_logger
from backend.condition.domain.mapping.mapper import Mapper
from backend.condition.domain.property_condition_survey import PropertyConditionSurvey
from backend.condition.parsing.parser import Parser
-from utils.logger import setup_logger
+from backend.condition.persistence.condition_postgres import ConditionPostgres
from backend.condition.file_type import FileType, detect_file_type
from backend.condition.parsing.factory import select_parser, select_mapper
+logger = setup_logger()
+
def process_file(file_stream: BinaryIO, source_key: str) -> None:
- print(f"[processor] Received file: {source_key}")
+ logger.info(f"[processor] Received file: {source_key}")
# Instantiation
file_type: FileType = detect_file_type(source_key)
parser: Parser = select_parser(file_type)
mapper: Mapper = select_mapper(file_type)
+ persistence = ConditionPostgres()
# Orchestration
raw_properties: List[Any] = parser.parse(file_stream)
+ logger.info(
+ f"[processor] Finished loading customer survey data for {len(raw_properties)} properties. Mapping..."
+ )
+
survey_year = datetime.now().year # TODO: get this from filepath or elsewhere
property_condition_surveys: List[PropertyConditionSurvey] = []
@@ -29,4 +37,10 @@ def process_file(file_stream: BinaryIO, source_key: str) -> None:
mapper.map_asset_conditions_for_property(p, survey_year)
)
- print("done") # temp
+ logger.info(
+ f"[processor] Finished mapping {len(property_condition_surveys)} properties. Writing to database..."
+ )
+
+ persistence.bulk_insert_surveys(property_condition_surveys)
+
+ logger.info(f"[processor] Finished loading surveys to database")
diff --git a/backend/condition/tests/custom_asserts.py b/backend/condition/tests/custom_asserts.py
index 9e3abd7f..623dcf0c 100644
--- a/backend/condition/tests/custom_asserts.py
+++ b/backend/condition/tests/custom_asserts.py
@@ -1,3 +1,4 @@
+from backend.app.db.models.condition import PropertyConditionSurveyModel
from backend.condition.domain.property_condition_survey import PropertyConditionSurvey
@@ -72,3 +73,41 @@ class CustomAsserts:
f"{actual_aspect.comments} != {expected_aspect.comments}"
)
return True
+
+ def assert_property_condition_survey_model_matches_expected(
+ actual_model: PropertyConditionSurveyModel,
+ expected: dict,
+ ) -> None:
+ assert actual_model.uprn == expected["uprn"], "UPRN differs"
+ assert actual_model.date == expected["date"], "Date differs"
+ assert actual_model.source == expected["source"], "Source differs"
+
+ assert len(actual_model.elements) == len(expected["elements"]), (
+ f"Expected {len(expected['elements'])} elements, "
+ f"got {len(actual_model.elements)}"
+ )
+
+ for i, (actual_element, expected_element) in enumerate(
+ zip(actual_model.elements, expected["elements"])
+ ):
+ assert (
+ actual_element.element_type == expected_element["element_type"]
+ ), f"Element[{i}].element_type differs"
+ assert (
+ actual_element.element_instance == expected_element["element_instance"]
+ ), f"Element[{i}].element_instance differs"
+
+ assert len(actual_element.aspect_conditions) == len(
+ expected_element["aspects"]
+ ), f"Element[{i}] aspect count differs"
+
+ for j, (actual_aspect, expected_aspect) in enumerate(
+ zip(actual_element.aspect_conditions, expected_element["aspects"])
+ ):
+ prefix = f"Element[{i}].Aspect[{j}]"
+
+ for key, value in expected_aspect.items():
+ assert getattr(actual_aspect, key) == value, (
+ f"{prefix}.{key} differs: "
+ f"{getattr(actual_aspect, key)} != {value}"
+ )
diff --git a/backend/condition/tests/parsing/test_peabody_parser.py b/backend/condition/tests/parsing/test_peabody_parser.py
index 32ff79d8..20f7a28e 100644
--- a/backend/condition/tests/parsing/test_peabody_parser.py
+++ b/backend/condition/tests/parsing/test_peabody_parser.py
@@ -1,127 +1,141 @@
import pytest
-from typing import Any
+from typing import Any, Dict
from io import BytesIO
from openpyxl import Workbook
from datetime import datetime
from backend.condition.parsing.peabody_parser import PeabodyParser
-from backend.condition.parsing.records.peabody.peabody_asset_condition import PeabodyAssetCondition
+from backend.condition.parsing.records.peabody.peabody_asset_condition import (
+ PeabodyAssetCondition,
+)
from backend.condition.parsing.records.peabody.peabody_property import PeabodyProperty
+
@pytest.fixture
def peabody_assets_xlsx_bytes() -> BytesIO:
wb = Workbook()
survey_records_d_and_lower = wb.active
survey_records_d_and_lower.title = "Survey Records - D & Lower"
- survey_records_d_and_lower.append([
- "Lo_Reference",
- "full_address",
- "location_type_code",
- "Parent_Lo_Reference",
- "Element_Code",
- "Element",
- "Sub_Element_Code",
- "Sub_Element",
- "Material_Code",
- "material_or_answer",
- "Renewal_Quantity",
- "Renewal_Year",
- "Renewal_Cost",
- "cloned",
- "lo_type_code",
- "condition_survey_date",
- ])
- survey_records_d_and_lower.append([
- "B000RAND",
- "1 RANDOM HOUSE LONDON",
- 3,
- "RAND2EST",
- 110,
- "ROOFS",
- 1,
- "Primary Roof",
- 9,
- "Other",
- 3,
- 2054,
- 330,
- "N",
- 3,
- datetime(2025,12,4,9,17,0)
- ])
- survey_records_d_and_lower.append([
- "B000BLOCK",
- "1100 BLOCK",
- 3,
- "RAND2EST",
- 110,
- "ROOFS",
- 1,
- "Primary Roof",
- 9,
- "Other",
- 3,
- 2054,
- 330,
- "N",
- 3,
- datetime(2025,12,4,9,17,0)
- ])
- survey_records_d_and_lower.append([
- "B000FAKE",
- "3 FAKE CLOSE LONDON",
- 3,
- "FAKEEST",
- 100,
- "GENERAL",
- 15,
- "External Decoration",
- 2,
- "Normal",
- 1,
- 2035,
- 1500.7,
- "N",
- 3,
- datetime(2025,7,5,0,0,0)
- ])
- survey_records_d_and_lower.append([
- "B000MIS",
- "99 MISC ROAD LONDON",
- 3,
- "300828",
- 54,
- "HHSRS",
- 29,
- "HHSRS Structural Collapse & Falling Elements",
- 4,
- "HHSRS Moderate",
- 2,
- 2027,
- None,
- "N",
- 3,
- None
- ])
- survey_records_d_and_lower.append([
- "B000MIS",
- "99 MISC ROAD LONDON",
- 3,
- "300828",
- 53,
- "External",
- 2,
- "Chimney",
- 2,
- "Present",
- 33,
- 2053,
- 3531,
- "N",
- 3,
- None
- ])
-
+ survey_records_d_and_lower.append(
+ [
+ "Lo_Reference",
+ "full_address",
+ "location_type_code",
+ "Parent_Lo_Reference",
+ "Element_Code",
+ "Element",
+ "Sub_Element_Code",
+ "Sub_Element",
+ "Material_Code",
+ "material_or_answer",
+ "Renewal_Quantity",
+ "Renewal_Year",
+ "Renewal_Cost",
+ "cloned",
+ "lo_type_code",
+ "condition_survey_date",
+ ]
+ )
+ survey_records_d_and_lower.append(
+ [
+ "B000RAND",
+ "1 RANDOM HOUSE LONDON",
+ 3,
+ "RAND2EST",
+ 110,
+ "ROOFS",
+ 1,
+ "Primary Roof",
+ 9,
+ "Other",
+ 3,
+ 2054,
+ 330,
+ "N",
+ 3,
+ datetime(2025, 12, 4, 9, 17, 0),
+ ]
+ )
+ survey_records_d_and_lower.append(
+ [
+ "B000BLOCK",
+ "1100 BLOCK",
+ 3,
+ "RAND2EST",
+ 110,
+ "ROOFS",
+ 1,
+ "Primary Roof",
+ 9,
+ "Other",
+ 3,
+ 2054,
+ 330,
+ "N",
+ 3,
+ datetime(2025, 12, 4, 9, 17, 0),
+ ]
+ )
+ survey_records_d_and_lower.append(
+ [
+ "B000FAKE",
+ "3 FAKE CLOSE LONDON",
+ 3,
+ "FAKEEST",
+ 100,
+ "GENERAL",
+ 15,
+ "External Decoration",
+ 2,
+ "Normal",
+ 1,
+ 2035,
+ 1500.7,
+ "N",
+ 3,
+ datetime(2025, 7, 5, 0, 0, 0),
+ ]
+ )
+ survey_records_d_and_lower.append(
+ [
+ "B000MIS",
+ "99 MISC ROAD LONDON",
+ 3,
+ "300828",
+ 54,
+ "HHSRS",
+ 29,
+ "HHSRS Structural Collapse & Falling Elements",
+ 4,
+ "HHSRS Moderate",
+ 2,
+ 2027,
+ None,
+ "N",
+ 3,
+ None,
+ ]
+ )
+ survey_records_d_and_lower.append(
+ [
+ "B000MIS",
+ "99 MISC ROAD LONDON",
+ 3,
+ "300828",
+ 53,
+ "External",
+ 2,
+ "Chimney",
+ 2,
+ "Present",
+ 33,
+ 2053,
+ 3531,
+ "N",
+ 3,
+ None,
+ ]
+ )
stream = BytesIO()
wb.save(stream)
@@ -129,18 +143,32 @@ def peabody_assets_xlsx_bytes() -> BytesIO:
return stream
-def test_peabody_parser_parses_conditions(peabody_assets_xlsx_bytes):
+
+@pytest.fixture
+def location_ref_to_uprn_map() -> Dict[str, int]:
+ return {
+ "B000RAND": 1,
+ "B000BLOCK": 2,
+ "B000FAKE": 3,
+ "B000MIS": 4,
+ }
+
+
+def test_peabody_parser_parses_conditions(
+ peabody_assets_xlsx_bytes, location_ref_to_uprn_map
+):
# arrange
parser = PeabodyParser()
# act
- result: Any = parser.parse(peabody_assets_xlsx_bytes)
+ result: Any = parser.parse(peabody_assets_xlsx_bytes, location_ref_to_uprn_map)
# assert
assert len(result) == 3
assert all(isinstance(item, PeabodyProperty) for item in result)
+
@pytest.fixture
def asset_condition_factory():
def _factory(full_address: str) -> PeabodyAssetCondition:
@@ -165,6 +193,7 @@ def asset_condition_factory():
return _factory
+
@pytest.mark.parametrize(
"full_address, expected_block_level",
[
@@ -175,7 +204,7 @@ def asset_condition_factory():
("81A-B GORE ROAD LONDON", True),
("73 & 74 HARVEST COURT ST. ALBANS", True),
("25 HAVERSHAM COURT GREENFORD", False),
- ("FLAT 10 SPARROW COURT SOUTHMERE DRIVE LONDON SE2 9ES", False)
+ ("FLAT 10 SPARROW COURT SOUTHMERE DRIVE LONDON SE2 9ES", False),
],
)
def test_peabody_asset_is_block_level(
@@ -187,4 +216,4 @@ def test_peabody_asset_is_block_level(
asset_condition = asset_condition_factory(full_address)
# act + assert
- assert asset_condition.is_block_level == expected_block_level
\ No newline at end of file
+ assert asset_condition.is_block_level == expected_block_level
diff --git a/backend/condition/tests/persistence/test_condition_postgres.py b/backend/condition/tests/persistence/test_condition_postgres.py
new file mode 100644
index 00000000..ca95eaaa
--- /dev/null
+++ b/backend/condition/tests/persistence/test_condition_postgres.py
@@ -0,0 +1,164 @@
+import pytest
+from datetime import date
+
+from backend.condition.persistence.condition_postgres import ConditionPostgres
+from backend.condition.domain.property_condition_survey import PropertyConditionSurvey
+from backend.condition.domain.element import Element
+from backend.condition.domain.element_type import ElementType
+from backend.condition.domain.aspect_condition import AspectCondition
+from backend.condition.domain.aspect_type import AspectType
+from backend.app.db.models.condition import PropertyConditionSurveyModel
+from backend.condition.tests.custom_asserts import CustomAsserts
+
+
+def test_map_survey_to_model() -> None:
+ # arrange
+ survey = PropertyConditionSurvey(
+ uprn=1,
+ elements=[
+ Element(
+ element_type=ElementType.EXTERNAL_WINDOWS,
+ element_instance=1,
+ aspect_conditions=[
+ AspectCondition(
+ aspect_type=AspectType.MATERIAL,
+ aspect_instance=1,
+ value="UPVC Double Glazed",
+ quantity=8,
+ install_date=None,
+ renewal_year=2036,
+ comments=None,
+ ),
+ ],
+ ),
+ Element(
+ element_type=ElementType.EXTERNAL_DECORATION,
+ element_instance=1,
+ aspect_conditions=[
+ AspectCondition(
+ aspect_type=AspectType.CONDITION,
+ aspect_instance=1,
+ value="Normal",
+ quantity=1,
+ install_date=None,
+ renewal_year=2029,
+ comments=None,
+ )
+ ],
+ ),
+ Element(
+ element_type=ElementType.EXTERNAL_WALL,
+ element_instance=1,
+ aspect_conditions=[
+ AspectCondition(
+ aspect_type=AspectType.FINISH,
+ aspect_instance=1,
+ value="Pointed",
+ quantity=65,
+ install_date=None,
+ renewal_year=2045,
+ comments=None,
+ ),
+ AspectCondition(
+ aspect_type=AspectType.FINISH,
+ aspect_instance=1,
+ value="Pointing",
+ quantity=1,
+ install_date=None,
+ renewal_year=2069,
+ comments=None,
+ ),
+ AspectCondition(
+ aspect_type=AspectType.FINISH,
+ aspect_instance=2,
+ value="Tile Hung",
+ quantity=8,
+ install_date=None,
+ renewal_year=2049,
+ comments=None,
+ ),
+ ],
+ ),
+ ],
+ date=date(2000, 1, 1),
+ source="Peabody",
+ )
+
+ expected = {
+ "uprn": 1,
+ "date": date(2000, 1, 1),
+ "source": "Peabody",
+ "elements": [
+ {
+ "element_type": ElementType.EXTERNAL_WINDOWS,
+ "element_instance": 1,
+ "aspects": [
+ {
+ "aspect_type": AspectType.MATERIAL,
+ "aspect_instance": 1,
+ "value": "UPVC Double Glazed",
+ "quantity": 8,
+ "install_date": None,
+ "renewal_year": 2036,
+ "comments": None,
+ }
+ ],
+ },
+ {
+ "element_type": ElementType.EXTERNAL_DECORATION,
+ "element_instance": 1,
+ "aspects": [
+ {
+ "aspect_type": AspectType.CONDITION,
+ "aspect_instance": 1,
+ "value": "Normal",
+ "quantity": 1,
+ "install_date": None,
+ "renewal_year": 2029,
+ "comments": None,
+ }
+ ],
+ },
+ {
+ "element_type": ElementType.EXTERNAL_WALL,
+ "element_instance": 1,
+ "aspects": [
+ {
+ "aspect_instance": 1,
+ "value": "Pointed",
+ "quantity": 65,
+ "install_date": None,
+ "renewal_year": 2045,
+ "comments": None,
+ },
+ {
+ "aspect_type": AspectType.FINISH,
+ "aspect_instance": 1,
+ "value": "Pointing",
+ "quantity": 1,
+ "install_date": None,
+ "renewal_year": 2069,
+ "comments": None,
+ },
+ {
+ "aspect_type": AspectType.FINISH,
+ "aspect_instance": 2,
+ "value": "Tile Hung",
+ "quantity": 8,
+ "install_date": None,
+ "renewal_year": 2049,
+ "comments": None,
+ },
+ ],
+ },
+ ],
+ }
+
+ # act
+ model: PropertyConditionSurveyModel = ConditionPostgres.map_survey_to_model(survey)
+
+ # assert (survey level)
+ CustomAsserts.assert_property_condition_survey_model_matches_expected(
+ model,
+ expected,
+ )
diff --git a/backend/engine/engine.py b/backend/engine/engine.py
index a9156078..e833eb89 100644
--- a/backend/engine/engine.py
+++ b/backend/engine/engine.py
@@ -796,9 +796,9 @@ async def model_engine(body: PlanTriggerRequest):
property_non_invasive_recommendations, patch = req_data.non_invasive_recommendations, req_data.patch
# if we have a remote assment data type, we pull the additional data and include it
- epc_page_source = {}
+ epc_page_source, find_my_epc_components = {}, []
if (body.event_type == "remote_assessment") and not (epc_searcher.newest_epc.get("estimated")):
- property_non_invasive_recommendations, patch, epc_page_source = (
+ property_non_invasive_recommendations, patch, epc_page_source, find_my_epc_components = (
RetrieveFindMyEpc.get_from_epc_with_fallback(
epc=epc_searcher.newest_epc,
epc_page=epc_page,
@@ -834,6 +834,7 @@ async def model_engine(body: PlanTriggerRequest):
postcode=epc_searcher.postcode_clean,
epc_record=prepared_epc,
already_installed=property_already_installed + eco_packages.get(property_id)[3],
+ find_my_epc_components=find_my_epc_components,
property_valuation=req_data.valuation,
non_invasive_recommendations=property_non_invasive_recommendations,
energy_assessment=energy_assessment,
@@ -1050,11 +1051,14 @@ async def model_engine(body: PlanTriggerRequest):
property_required_measures = [m for m in recommendations[p.id] if m[0]["type"] in body.required_measures]
measures_to_optimise = [m for m in recommendations[p.id] if m[0]["type"] not in body.required_measures]
+ ventilation_included = "ventilation" in property_measure_types
+
# If a measure requiring ventilation is selected, and the property does not have ventilation, we enfore
# its inclusion
+
needs_ventilation = any(
x in property_measure_types for x in assumptions.measures_needing_ventilation
- ) and not p.has_ventilation
+ ) and not p.has_ventilation and ventilation_included
if not measures_to_optimise:
# Nothing to do, we just reshape the recommendations
diff --git a/etl/find_my_epc/RetrieveFindMyEpc.py b/etl/find_my_epc/RetrieveFindMyEpc.py
index cf6659f9..392e6aaa 100644
--- a/etl/find_my_epc/RetrieveFindMyEpc.py
+++ b/etl/find_my_epc/RetrieveFindMyEpc.py
@@ -36,6 +36,8 @@ class RetrieveFindMyEpc:
self.rrn = rrn
self.address_cleaned = self.address.replace(",", "").replace(" ", "").lower()
+
+ # Containers for the extracted components
self.walls = []
self.address_postal_town = address_postal_town
@@ -256,12 +258,10 @@ class RetrieveFindMyEpc:
property_features_table = soup.find("tbody", class_="govuk-table__body")
property_features_table = property_features_table.find_all("tr")
- # Extract wall types
- self.walls = []
- for row in property_features_table:
- cells = row.find_all("td")
- if row.find("th").text.strip() == "Wall":
- self.walls.append(cells[0].text.strip())
+ property_components = self.extract_property_components(property_features_table)
+
+ # Extract walls
+ self.walls = [x["description"] for x in property_components if x["component_name"] == "Wall"]
# Finally, we format the recommendations
recommendations = self.format_recommendations(recommendations, assessment_data, sap_2012_date)
@@ -424,6 +424,37 @@ class RetrieveFindMyEpc:
return chosen_epc, epc_certificate
+ @staticmethod
+ def extract_property_components(property_features_table: list):
+ """
+ Function to pull out a table for property components, marking their appearance index
+ :param property_features_table: The table of property features, as extracted by BeautifulSoup
+ :return: List of property components with appearance index
+ """
+ property_components = []
+ for row in property_features_table:
+ cells = row.find_all("td")
+ component_name = row.find("th").text.strip()
+ property_components.append(
+ {
+ "component_name": component_name,
+ "description": cells[0].text.strip(),
+ "efficiency": cells[1].text.strip(),
+ }
+ )
+ # Add an appearance index, which will indicate if the component appears multiple times, so this
+ # becomes a reference for the building part the component is associated to (main, extensions, etc)
+ # We want to inject this appearance index into the component dictionaries
+ component_count = {}
+ for component in property_components:
+ name = component['component_name']
+ if name not in component_count:
+ component_count[name] = 0
+ component['appearance_index'] = component_count[name]
+ component_count[name] += 1
+
+ return property_components
+
def retrieve_newest_find_my_epc_data(
self, sap_2012_date=None, return_page=False, epc_page_source=None, rrn=None
):
@@ -577,12 +608,10 @@ class RetrieveFindMyEpc:
property_features_table = address_res.find("tbody", class_="govuk-table__body")
property_features_table = property_features_table.find_all("tr")
- # Extract wall types
- self.walls = []
- for row in property_features_table:
- cells = row.find_all("td")
- if row.find("th").text.strip() == "Wall":
- self.walls.append(cells[0].text.strip())
+ property_components = self.extract_property_components(property_features_table)
+
+ # Extract walls
+ self.walls = [x["description"] for x in property_components if x["component_name"] == "Wall"]
# Finally, we format the recommendations
recommendations = self.format_recommendations(recommendations, assessment_data, sap_2012_date)
@@ -615,6 +644,7 @@ class RetrieveFindMyEpc:
"heating_text": heating_text,
"hot_water_text": hot_water_text,
"recommendations": recommendations,
+ "property_components": property_components,
"epc_data": epc_data,
**assessment_data,
**low_carbon_energy_sources,
@@ -665,7 +695,7 @@ class RetrieveFindMyEpc:
],
"Change heating to gas condensing boiler": ["boiler_upgrade"],
"Fan assisted storage heaters and dual immersion cylinder": ["high_heat_retention_storage_heaters"],
- "Flat roof or sloping ceiling insulation": ["flat_roof_insulation"],
+ "Flat roof or sloping ceiling insulation": ["flat_roof_insulation", "sloping_ceiling_insulation"],
"Heating controls (room thermostat)": [
"roomstat_programmer_trvs", "time_temperature_zone_control"
],
@@ -804,7 +834,9 @@ class RetrieveFindMyEpc:
"page_source": find_epc_data.get("page_source")
}
- return non_invasive_recommendations, patch, page_source
+ property_components = find_epc_data.get("property_components", [])
+
+ return non_invasive_recommendations, patch, page_source, property_components
@classmethod
def get_from_epc_with_fallback(
diff --git a/recommendations/Costs.py b/recommendations/Costs.py
index 60b1d8a2..5f312f63 100644
--- a/recommendations/Costs.py
+++ b/recommendations/Costs.py
@@ -1,4 +1,6 @@
+from typing import Mapping, Any
import numpy as np
+
from recommendations.county_to_region import county_to_region_map
from utils.logger import setup_logger
from backend.ml_models.AnnualBillSavings import AnnualBillSavings
@@ -160,6 +162,14 @@ class Costs:
"low_energy_lighting": 0.26,
"high_heat_retention_storage_heaters": 0.1,
"windows_glazing": 0.15,
+ "boiler_upgrade": 0.26,
+ "time_and_temperature_zone_control": 0.1,
+ "roomstat_programmer_trvs": 0.1,
+ "room_roof_insulation": 0.26,
+ "heater_removal": 0.1,
+ "sealing_open_fireplace": 0.1,
+ "mechanical_ventilation": 0.26,
+ "sloping_ceiling_insulation": 0.26 # Similar to IWI so using the same contingency
}
# Preliminaries are a percentage of the total cost of the work and covers the cost of site-specific costs
@@ -664,10 +674,12 @@ class Costs:
subtotal_before_vat = total_cost / (1 + self.VAT_RATE)
vat = total_cost - subtotal_before_vat
+ contingency_rate = self.CONTINGENCIES["roomstat_programmer_trvs"]
+
return {
"total": total_cost,
- "contingency": total_cost * self.CONTINGENCY,
- "contingency_rate": self.CONTINGENCY,
+ "contingency": total_cost * contingency_rate,
+ "contingency_rate": contingency_rate,
"subtotal": subtotal_before_vat,
"vat": vat,
"labour_hours": labour_hours,
@@ -698,10 +710,12 @@ class Costs:
labour_days = np.ceil(labour_hours / 8)
+ contingency_rate = self.CONTINGENCIES["time_and_temperature_zone_control"]
+
return {
"total": total_cost,
- "contingency": total_cost * self.CONTINGENCY,
- "contingency_rate": self.CONTINGENCY,
+ "contingency": total_cost * contingency_rate,
+ "contingency_rate": contingency_rate,
"subtotal": subtotal_before_vat,
"vat": vat,
"labour_hours": labour_hours,
@@ -752,10 +766,12 @@ class Costs:
subtotal_before_vat = removal_cost
total_cost = subtotal_before_vat + vat
+ contingency_rate = self.CONTINGENCIES["heater_removal"]
+
return {
"total": total_cost,
- "contingency": total_cost * self.CONTINGENCY,
- "contingency_rate": self.CONTINGENCY,
+ "contingency": total_cost * contingency_rate,
+ "contingency_rate": contingency_rate,
"subtotal": subtotal_before_vat,
"vat": vat,
"labour_hours": removal_labour_hours,
@@ -858,10 +874,12 @@ class Costs:
subtotal_before_vat += system_change_cost_before_vat
vat += system_change_vat
+ contingency_rate = self.CONTINGENCIES["boiler_upgrade"]
+
return {
"total": total_cost,
- "contingency": total_cost * self.CONTINGENCY,
- "contingency_rate": self.CONTINGENCY,
+ "contingency": total_cost * contingency_rate,
+ "contingency_rate": contingency_rate,
"subtotal": subtotal_before_vat,
"vat": vat,
"labour_hours": labour_hours,
@@ -920,3 +938,70 @@ class Costs:
"labour_hours": 80,
"labour_days": 10,
}
+
+ @staticmethod
+ def _estimate_number_of_days_for_sloping_ceiling(insulation_roof_area: float) -> float:
+ """
+ Estimate labour days required to insulate an existing sloping ceiling.
+
+ Heuristic model based on retrofit guidance (Checkatrade, The Green Age)
+ and analogy with internal wall insulation.
+
+ See _estimate_number_of_days_for_solid_floor for detailed explanation regarding assumptions
+ and methodology, however for the purpose of placeholder, this function mimics the approach
+ to that method but is detached to allow for future changes
+
+ Assumptions:
+ - ~30 m² of sloping ceiling takes ~4 working days
+ - Small jobs still require multiple days (setup, stripping, reboarding)
+ - Larger areas benefit from economies of scale, but not linearly
+
+ :param insulation_roof_area: m² of sloping ceiling to be insulated
+ """
+
+ base_days = 4
+ base_area = 30 # m2 reference case
+ labour_exponent = 0.85
+ min_days = 2
+
+ labour_days = max(
+ min_days,
+ base_days * (insulation_roof_area / base_area) ** labour_exponent
+ )
+
+ return labour_days
+
+ @classmethod
+ def sloping_ceiling_insulation(cls, insulation_roof_area: float) -> Mapping[str, float]:
+ """
+ This costing for this is based on Checkatrade desktop research, since we are yet to receive installer quotes.
+ :param insulation_roof_area: Area of the sloping ceiling to be insulated
+ :return:
+ """
+ ################
+ # Assumptions
+ ################
+ # Sources:
+ # https://www.checkatrade.com/blog/cost-guides/vaulted-ceiling-cost/
+ # https://www.thegreenage.co.uk/can-i-insulate-my-sloping-ceiling/
+ # These assumptions last updated 21/02/2026
+ insulation_cost_per_m2 = 52 # The actual install process is quite similar to IWI
+ labour_rate = 250 # per day
+ contingency_rate = cls.CONTINGENCIES["sloping_ceiling_insulation"]
+
+ labour_days = cls._estimate_number_of_days_for_sloping_ceiling(insulation_roof_area)
+ labour_hours = labour_days * 8
+
+ total = (insulation_cost_per_m2 * insulation_roof_area) + (labour_rate * labour_days)
+
+ # Assume VAT included in the total => total is 120% of subtotal
+ vat = total - (total / 1.2)
+
+ return {
+ "total": float(total),
+ "contingency": float(total * contingency_rate),
+ "contingency_rate": contingency_rate,
+ "vat": float(vat),
+ "labour_hours": float(labour_hours),
+ "labour_days": float(labour_days),
+ }
diff --git a/recommendations/RoofRecommendations.py b/recommendations/RoofRecommendations.py
index 1e5636ff..71e47ba6 100644
--- a/recommendations/RoofRecommendations.py
+++ b/recommendations/RoofRecommendations.py
@@ -2,7 +2,7 @@ import math
import pandas as pd
from backend.Property import Property
from backend.app.plan.schemas import MEASURE_MAP
-from typing import List
+from typing import List, Mapping, Any
from datatypes.enums import QuantityUnits
from recommendations.recommendation_utils import (
get_roof_u_value, r_value_per_mm_to_u_value, calculate_u_value_uplift, is_diminishing_returns,
@@ -11,6 +11,7 @@ from recommendations.recommendation_utils import (
)
from recommendations.Costs import Costs
from etl.epc_clean.epc_attributes.RoofAttributes import RoofAttributes
+from backend.app.plan.schemas import ROOF_INSULATION_MEASURES
class RoofRecommendations:
@@ -119,41 +120,377 @@ class RoofRecommendations:
return (full_insulated_room_roof or room_roof_insulated_at_rafters) and not has_non_invasive_recommendation
- def recommend(self, phase, measures=None, default_u_values=False):
+ @staticmethod
+ def is_sloping_ceiling_appropriate(
+ is_pitched: bool,
+ is_loft: bool,
+ is_assumed: bool,
+ is_flat: bool,
+ has_sloping_ceiling_recommendation: bool,
+ primary_roof_looks_sloped: bool,
+ insulation_thickness: str,
+ has_loft_insulation_recommendation: bool
+ ) -> bool:
+ """
+ :param is_pitched: Boolean - indicates whether or not the roof is pitched
+ :param is_flat: Boolean - indicates whether or not the roof is flat
+ :param is_loft: Boolean - indicates whether or not the roof is described as a loft
+ :param is_assumed: Boolean - indiates if the assessment of the roof is assumed or actually confirmed
+ :param has_sloping_ceiling_recommendation: Boolean - indicates if the property has a sloping ceiling
+ recommendation
+ :param primary_roof_looks_sloped: Boolean - indicates if the primary room is described a sloped (as opposed to
+ an extension)
+ :param insulation_thickness: String - insulation thickness of the roof
+ :param has_loft_insulation_recommendation: Boolean - indicates whether or not there
+ :return:
+ """
+ # We need to check:
+ # 1) If the property has a pitched roof
+ # 2) Does it have a recommendation for sloping ceiling
+ # 3) Is the insulation status NOT assumed
+ # 4) Is there a sloping ceiling recommendation (this may relate to the primary or secondary roof)
+
+ # If we have a loft primary roof and sloping ceiling
+
+ has_suitable_features = (
+ is_pitched and not is_loft and not is_assumed and primary_roof_looks_sloped
+ )
+
+ # Check if it needs a recommendation
+ needs_recommendation_condition1 = has_sloping_ceiling_recommendation | (
+ insulation_thickness in ["below average"]
+ )
+
+ needs_recommendation_condition2 = has_sloping_ceiling_recommendation & (
+ insulation_thickness in ["none"]
+ )
+
+ # If the insulation thickness is 'none' this isn't alone conclusive for us to determine if it's
+ # a sloped ceiling
+ needs_recommendation = needs_recommendation_condition1 | needs_recommendation_condition2
+
+ # The property is pitched, not a loft, not assumed and has a sloping ceiling rec
+ if has_suitable_features and needs_recommendation:
+ return True
+
+ # In this case, we have an assumed pitched roof with average or below average insulation
+ # but a sloping ceiling insulation without loft
+ if has_sloping_ceiling_recommendation and not has_loft_insulation_recommendation and not is_flat:
+ return True
+
+ return False
+
+ @staticmethod
+ def is_loft_insulation_appropriate(
+ measures: List,
+ is_pitched: bool,
+ is_at_rafters: bool,
+ rir_over_loft: bool,
+ is_assumed: bool,
+ insulation_thickness: str,
+ has_loft_insulation_recommendation: bool,
+ has_sloping_ceiling_recommendation: bool
+ ) -> bool:
+ """
+ Determine if loft insulation is appropriate
+ :param measures: List - list of measures
+ :param is_pitched: Boolean - indicates whether or not the roof is pitched
+ :param is_at_rafters: Boolean - indicates whether or not the loft insulation is at rafters
+ :param rir_over_loft: Boolean - indicates whether or not there we should be doing RIR insulation
+ :param is_assumed: Boolean - indicates whether or not the roof insulation status is assumed
+ :param insulation_thickness: String - insulation thickness of the roof
+ :param has_loft_insulation_recommendation: Boolean - indicates whether or not there
+ is a loft insulation non-invasive recommendation
+ :param has_sloping_ceiling_recommendation: Boolean - indicates whether or not there
+ is a sloping ceiling non-invasive recommendation
+ :return:
+ """
+
+ has_li_in_measures = "loft_insulation" in measures
+
+ # Key business logic:
+ # If we have a pitched roof, no insulation, it's not assumed and we have a sloping ceiling recommendation,
+ # we do NOT recommend loft insulation
+ if is_pitched and not is_assumed and has_sloping_ceiling_recommendation:
+ return False
+
+ # We check the insulation thickness. If it's one of the "average", "below average", "none" values,
+
+ if (
+ is_assumed and is_pitched and insulation_thickness in ["average", "below average", "above average"]
+ and not has_sloping_ceiling_recommendation and not has_loft_insulation_recommendation
+ ):
+ # This is a pitched roof, without access to the loft, with unknown insulation status
+ return True
+
+ return has_loft_insulation_recommendation or (
+ is_pitched and has_li_in_measures and not is_at_rafters
+ ) and not rir_over_loft
+
+ @staticmethod
+ def is_flat_roof_insulation_appropriate(
+ is_flat: bool, measures: List, has_flat_roof_recommendation: bool, primary_roof_looks_sloped: bool
+ ) -> bool:
+ """
+ Determine if flat roof insulation is appropriate
+ :param is_flat: Boolean - indicates whether or not the roof is flat
+ :param measures: List - list of measures
+ :param has_flat_roof_recommendation: Boolean - indicates whether or not there is a flat roof non-invasive
+ recommendation
+ :param primary_roof_looks_sloped: Boolean - indicates if the primary roof looks like a sloped roof
+ :return: Boolean
+
+ When checking if has_flat_roof_recommendation and primary_roof_looks_sloped, we need to check both
+ conditions. This is because within a default EPC recommendation, the EPC will pair these recommendations
+ together. Therefore, weneed to ensure the primary roof isn't sloped
+ """
+
+ flat_roof_in_measures = "flat_roof_insulation" in measures
+
+ return (is_flat and flat_roof_in_measures) or (has_flat_roof_recommendation and not primary_roof_looks_sloped)
+
+ @staticmethod
+ def is_room_roof_insulation_appropriate(
+ is_room_roof, measures, rir_over_loft, has_room_roof_recommendation
+ ):
+ """
+ Determine if room roof insulation is appropriate
+ :param is_room_roof: Boolean - indicates whether or not the roof is a room roof
+ :param measures: List - list of measures
+ :param rir_over_loft: Boolean - indicates whether or not there we should be doing RIR insulation
+ :param has_room_roof_recommendation: Boolean - indicates whether or not there is a room roof non-invasive
+ recommendation
+ :return:
+ """
+ return is_room_roof and ("room_roof_insulation" in measures) or (
+ has_room_roof_recommendation or rir_over_loft
+ )
+
+ def _does_roof_need_recommendation(self, measures: List | None = None, u_value: float | None = None):
+ """
+ Utility function to recommend which contains the logic to determine whether the roof needs a recommendation
+ :return:
+ """
+ # If there is a property above, nothing can be done
if self.property.roof["has_dwelling_above"]:
- return
+ return False
- measures = MEASURE_MAP["roof_insulation"] if measures is None else measures
-
- u_value = self.property.roof["thermal_transmittance"]
-
- # If we have a flat roof but we don't have flat roof as a measure, we exit
+ # If we have a flat roof but not flat roof insulation recommendation
if self.property.roof["is_flat"] and "flat_roof_insulation" not in measures:
- return
+ return False
- # We check if the roof is already insulated and if so, we exit
-
- # Building regulations part L recommend installing at least 270mm of insulation, however generally we
- # experience diminishing returns in terms of SAP once we go beyond around 150mm of insulation
- # This only holds true for pitched roofs.
+ # Logic to check if we have an already insulated loft
if self.is_loft_already_insulated(measures):
- return
+ return False
+ # Logic to check if we have an insulated flat roof
if (self.insulation_thickness >= self.MINIMUM_FLAT_ROOF_ISULATION_MM) and self.property.roof["is_flat"]:
- return
+ return False
+ # Logic to check if we have an already insulated room in roof
if self.is_room_roof_insulated_or_unsuitable(measures):
- return
+ return False
if self.property.roof["is_thatched"]:
- return
+ return False
- # If we have a u-value and we don't have a non-invasive recommendation, we can't recommend anything
if (u_value is not None) and not any(
x in MEASURE_MAP["roof_insulation"] for x in [r["type"] for r in self.property.non_invasive_recommendations]
):
- # We don't have enough information to provide a recommendation
+ return False
+
+ return True
+
+ @staticmethod
+ def _does_primary_roof_look_sloped(
+ is_pitched: bool, is_loft: bool, is_assumed: bool
+ ):
+ """
+ Determine if the primary roof is sloped
+ :param is_pitched: bool - is the roof pitched
+ :param is_loft: bool - is the roof a loft
+ :param is_assumed: bool - is the roof insulation status assumed
+ :return:
+ """
+ # Conditions for this to be true
+ # Case 1
+ # In the property roof description (primary roof)
+ # 1) Pitched Roof
+ # 2) Uninsulated
+ # 3) Not assumed
+ if is_pitched and not is_loft and not is_assumed:
+ return True
+
+ return False
+
+ @staticmethod
+ def _deduce_primary_roof(component_needs: dict) -> str:
+ """
+ Helper function for deducing the primary roof type used by _handle_multi_roof_types
+ """
+
+ # Can a non-primary part satisfy loft insulation?
+ primary_needs_loft = component_needs[1]["needs_loft_insulation"]
+ secondary_needs_loft = any(
+ p['needs_loft_insulation'] for idx, p in component_needs.items() if idx != 1
+ )
+
+ if primary_needs_loft and not secondary_needs_loft:
+ # Only option is loft
+ return "loft"
+
+ primary_needs_sloping = component_needs[1]["needs_sloping_ceiling"]
+ secondary_needs_sloping = any(
+ p['needs_sloping_ceiling'] for idx, p in component_needs.items() if idx != 1
+ )
+
+ if primary_needs_sloping and not secondary_needs_sloping:
+ # Only option is sloping ceiling
+ return "sloping_ceiling"
+
+ return "loft_insulation" # Defer to the cheaper option
+
+ def _handle_multi_roof_types(
+ self,
+ measures: List,
+ find_my_epc_components: List[Mapping[str, Any]],
+ non_invasive_recommendations: List[Mapping[str, Any]],
+ has_sloping_ceiling_recommendation: bool,
+ has_loft_insulation_recommendation: bool,
+ rir_over_loft: bool
+ ) -> tuple[bool, bool]:
+ """
+ This is a rough function to handle some edge cases, where we have two roof descriptions where
+ both look like they could be sloping ceilings or lofts. In this case, we need to deduce
+ which roof is the primary roof, and therefore whether or not we should recommend sloping ceiling insulation
+ :param measures: List - list of measures
+ :param find_my_epc_components: List - list of components from find my epc
+ :param non_invasive_recommendations: List - list of non-invasive recommendations
+ :param has_sloping_ceiling_recommendation: Boolean - indicates whether or not there is a sloping ceiling
+ recommendation
+ :param has_loft_insulation_recommendation: Boolean - indicates whether or not there is a loft insulation
+ recommendation
+ :param rir_over_loft: Boolean - indicates whether or not there we should be doing RIR insulation
+ :return: tuple[bool, bool] - (needs_sloping_ceiling, needs_loft_insulation)
+ """
+
+ # We utilise the find my EPC data to solve cases where the primary roof and secondary roof
+ # being loft and sloped ceiling is ambiguous
+ # We need to:
+ # 1) Check if we have two roof types
+ # 2) check if both could be considered sloped
+ # 3) Check if we have two non-invasive recommendations for both roof types
+ # 4) Determine which roof is the primary roof
+
+ # We check a specific condition - which will imply loft insulation isn't appropriate but room in roof
+ # insulation is
+ # 1) We have an uninsulated loft (assumed)
+ # 2) We have a non-intrusive recommendation for room in roof insulation
+
+ # We only use this when we have sloping ceiling and loft insulation recommendations
+ # Components are indexed from 0
+
+ needs_sloping = True
+ needs_loft = True
+
+ roof_count = max(
+ x["appearance_index"] for x in find_my_epc_components if x["component_name"] == "Roof"
+ ) + 1
+
+ roof_non_invasive_recommendations = [
+ x["type"] for x in non_invasive_recommendations if x['type'] in ROOF_INSULATION_MEASURES
+ ]
+
+ has_both_recommendations = (
+ "loft_insulation" in roof_non_invasive_recommendations and \
+ "sloping_ceiling_insulation" in roof_non_invasive_recommendations
+ )
+
+ if (roof_count <= 1) or not has_both_recommendations:
+ if roof_count > 1:
+ if "loft_insulation" in roof_non_invasive_recommendations:
+ return not needs_sloping, needs_loft
+
+ if "sloping_ceiling_insulation" in roof_non_invasive_recommendations:
+ return needs_sloping, not needs_loft
+
+ return needs_sloping, not needs_loft # Indicates that the property needs sloping ceiling as we only run
+ # this in that case
+
+ extracted_roof_descriptions = {
+ idx: {
+ "description": component["description"],
+ **RoofAttributes(component["description"]).process()
+ } for idx, component in enumerate(find_my_epc_components) if component["component_name"] == "Roof"
+ }
+
+ component_needs = {}
+ for component_idx, mapped in extracted_roof_descriptions.items():
+ is_pitched = mapped["is_pitched"]
+ is_loft = mapped["is_loft"]
+ is_assumed = mapped["is_assumed"]
+ insulation_thickness = mapped["insulation_thickness"]
+ is_at_rafters = mapped["is_at_rafters"]
+ is_flat = mapped["is_flat"]
+
+ needs_sloping_ceiling = self.is_sloping_ceiling_appropriate(
+ is_flat=is_flat,
+ is_pitched=is_pitched,
+ is_loft=is_loft,
+ is_assumed=is_assumed,
+ has_sloping_ceiling_recommendation=has_sloping_ceiling_recommendation,
+ primary_roof_looks_sloped=True,
+ insulation_thickness=insulation_thickness,
+ has_loft_insulation_recommendation=has_loft_insulation_recommendation
+ )
+ # If the roof has some form of insulation already but isn't a loft, it's
+ # not a loft. E.g. "pitched, limited insulation" is for sloping ceiling, not loft
+ needs_loft_insulation = self.is_loft_insulation_appropriate(
+ measures=measures,
+ is_pitched=is_pitched,
+ is_at_rafters=is_at_rafters,
+ rir_over_loft=rir_over_loft,
+ insulation_thickness=insulation_thickness,
+ has_loft_insulation_recommendation=has_loft_insulation_recommendation,
+ is_assumed=is_assumed,
+ has_sloping_ceiling_recommendation=False
+ )
+
+ component_needs[component_idx] = {
+ "needs_sloping_ceiling": needs_sloping_ceiling,
+ "needs_loft_insulation": needs_loft_insulation
+ }
+
+ # Given the results we determine if the primary roof is sloped. The situation we may be in is
+ # one where the only otion is to assign one of the primary or secondary roof as a loft or sloped ceiling
+ # forcing our hand on whether the primary roof is sloped
+ primary_roof_type = self._deduce_primary_roof(component_needs)
+
+ if primary_roof_type in ["ambiguous", "sloping_ceiling"]:
+ return needs_sloping, not needs_loft # Set sloping ceiling to true, loft to false
+
+ return not needs_sloping, needs_loft # Set sloping ceiling to false, loft to true
+
+ def recommend(self, phase: int, measures: List | None = None, default_u_values: bool = False):
+ """
+ Main method to recommend roof insulation measures
+ :param phase: Integer - phase of the recommendation, determines the order in which recommendations are
+ applied to the property
+ :param measures: List - list of measures to consider for recommendation
+ :param default_u_values: Boolean - whether or not to use default u-values for the recommendations
+ :return:
+ """
+
+ measures = MEASURE_MAP["roof_insulation"] if measures is None else measures
+ u_value = self.property.roof["thermal_transmittance"]
+ property_needs_roof_recommendation = self._does_roof_need_recommendation(measures, u_value)
+
+ if not property_needs_roof_recommendation:
+ # Roof is either:
+ # - already sufficiently insulated
+ # - unsuitable (dwelling above, thatched, etc.)
+ # - not matching available measures
return
u_value = get_roof_u_value(
@@ -169,33 +506,103 @@ class RoofRecommendations:
)
self.estimated_u_value = u_value
+ # The Roof is already compliant - in this case, the u-value is beyond the requirements for
+ # Building Regs Part L and so we don't recommend anything
if (u_value <= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE) or all(
m not in measures for m in MEASURE_MAP["roof_insulation"]
):
- # The Roof is already compliant
return
non_invasive_recommendations = self.property.non_invasive_recommendations
- # We check a specific condition - which will imply loft insulation isn't appropriate but room in roof
- # insulation is
- # 1) We have an uninsulated loft (assumed)
- # 2) We have a non-intrusive recommendation for room in roof insulation
+ is_pitched = self.property.roof["is_pitched"]
+ is_loft = self.property.roof["is_loft"]
+ is_assumed = self.property.roof["is_assumed"]
+ is_at_rafters = self.property.roof["is_at_rafters"]
+ is_flat = self.property.roof["is_flat"]
+ is_room_roof = self.property.roof["is_roof_room"]
+ insulation_thickness = self.property.roof["insulation_thickness"]
+ has_sloping_ceiling_recommendation = any(
+ x["type"] == "sloping_ceiling_insulation" for x in non_invasive_recommendations
+ )
+ has_loft_insulation_recommendation = any(x["type"] == "loft_insulation" for x in non_invasive_recommendations)
+ has_flat_roof_recommendation = any(x["type"] == "flat_roof_insulation" for x in non_invasive_recommendations)
+ has_room_roof_recommendation = any(x["type"] == "room_roof_insulation" for x in non_invasive_recommendations)
+
+ primary_roof_looks_sloped = self._does_primary_roof_look_sloped(
+ is_pitched=is_pitched, is_loft=is_loft, is_assumed=is_assumed
+ )
rir_over_loft = (
- self.property.roof["is_pitched"] and
+ is_pitched and
self.property.roof["insulation_thickness"] == "none" and
- "room_in_roof_insulation" in [x["type"] for x in non_invasive_recommendations]
+ has_room_roof_recommendation
)
- # We firstly handle non-intrusive recommendations, which may override the normal roof insulation recommendations
- if ("loft_insulation" in [x["type"] for x in non_invasive_recommendations]) or (
- self.property.roof["is_pitched"] and "loft_insulation" in measures and
- not self.property.roof["is_at_rafters"]
- ) and not rir_over_loft:
+ needs_sloping_ceiling = self.is_sloping_ceiling_appropriate(
+ is_pitched=is_pitched,
+ is_flat=is_flat,
+ is_loft=is_loft,
+ is_assumed=is_assumed,
+ has_sloping_ceiling_recommendation=has_sloping_ceiling_recommendation,
+ primary_roof_looks_sloped=primary_roof_looks_sloped,
+ insulation_thickness=insulation_thickness,
+ has_loft_insulation_recommendation=has_loft_insulation_recommendation
+ )
+ needs_loft_insulation = self.is_loft_insulation_appropriate(
+ measures=measures,
+ is_pitched=is_pitched,
+ is_at_rafters=is_at_rafters,
+ rir_over_loft=rir_over_loft,
+ insulation_thickness=insulation_thickness,
+ has_loft_insulation_recommendation=has_loft_insulation_recommendation,
+ is_assumed=is_assumed,
+ has_sloping_ceiling_recommendation=has_sloping_ceiling_recommendation
+ )
+ needs_flat_roof_insulation = self.is_flat_roof_insulation_appropriate(
+ is_flat=is_flat,
+ measures=measures,
+ has_flat_roof_recommendation=has_flat_roof_recommendation,
+ primary_roof_looks_sloped=primary_roof_looks_sloped
+ )
+ needs_rir_insulation = self.is_room_roof_insulation_appropriate(
+ is_room_roof=is_room_roof,
+ measures=measures,
+ rir_over_loft=rir_over_loft,
+ has_room_roof_recommendation=has_room_roof_recommendation
+ )
+
+ # We handle possible multi roof types
+ if needs_sloping_ceiling:
+ # Multi-roof override:
+ # In ambiguous cases (extensions, mixed descriptions), EPC component analysis
+ # may force us to choose between loft vs sloping ceiling.
+ needs_sloping_ceiling, needs_loft_insulation = self._handle_multi_roof_types(
+ measures=measures,
+ find_my_epc_components=self.property.find_my_epc_components,
+ non_invasive_recommendations=non_invasive_recommendations,
+ has_sloping_ceiling_recommendation=has_sloping_ceiling_recommendation,
+ has_loft_insulation_recommendation=has_loft_insulation_recommendation,
+ rir_over_loft=rir_over_loft
+ )
+ # Explicit override
+ needs_flat_roof_insulation = False
+ needs_rir_insulation = False
+ if needs_sloping_ceiling and needs_loft_insulation:
+ raise RuntimeError(
+ "Multi-roof resolution produced conflicting outcomes: "
+ "both sloping ceiling and loft insulation required"
+ )
+
+ # Retrofit precedence (least → most invasive):
+ # Loft > Flat roof > Room in roof > Sloping ceiling
+
+ ################################################################
+ # ~~~~~ Loft Insulation Recommendation Logic ~~~~~
+ ################################################################
+ if needs_loft_insulation:
self.recommend_roof_insulation(
u_value=u_value,
- insulation_thickness=self.insulation_thickness,
phase=phase,
is_flat=False,
is_pitched=True,
@@ -203,13 +610,12 @@ class RoofRecommendations:
)
return
- if (
- (self.property.roof["is_flat"] and "flat_roof_insulation" in measures) or
- "flat_roof_insulation" in [x["type"] for x in non_invasive_recommendations]
- ):
+ ################################################################
+ # ~~~~~ Flat Roof Insulation Recommendation Logic ~~~~~
+ ################################################################
+ if needs_flat_roof_insulation:
self.recommend_roof_insulation(
u_value=u_value,
- insulation_thickness=0,
phase=phase,
is_flat=True,
is_pitched=False,
@@ -217,16 +623,34 @@ class RoofRecommendations:
)
return
+ ################################################################
+ # ~~~~~ Room Roof Insulation Recommendation Logic ~~~~~
+ ################################################################
# There are cases where the property might have a room roof as the second roof, but we have a recommendation for
# it, so we allow this override
- if self.property.roof["is_roof_room"] and ("room_roof_insulation" in measures) or (
- "room_roof_insulation" in [x["type"] for x in non_invasive_recommendations] or
- rir_over_loft
- ):
+ if needs_rir_insulation:
self.recommend_room_roof_insulation(u_value, phase, default_u_values)
return
- raise NotImplementedError("Implement me")
+ ####################################################################################################
+ # ~~~~~ Sloping Ceiling Insulation Recommendation Logic ~~~~~
+ ####################################################################################################
+ if needs_sloping_ceiling:
+ self.recommend_sloping_ceiling(
+ phase=phase,
+ u_value=u_value,
+ non_invasive_recommendations=non_invasive_recommendations
+ )
+ return
+
+ raise RuntimeError(
+ "Roof recommendation undecidable. "
+ f"needs_loft={needs_loft_insulation}, "
+ f"needs_flat={needs_flat_roof_insulation}, "
+ f"needs_rir={needs_rir_insulation}, "
+ f"needs_sloping={needs_sloping_ceiling}, "
+ f"roof={self.property.roof}"
+ )
@staticmethod
def make_roof_insulation_description(material):
@@ -245,7 +669,7 @@ class RoofRecommendations:
raise ValueError("Invalid material type")
def recommend_roof_insulation(
- self, u_value, insulation_thickness, phase, is_pitched, is_flat, default_u_values
+ self, u_value, phase, is_pitched, is_flat, default_u_values
):
"""
@@ -267,7 +691,6 @@ class RoofRecommendations:
could be traditional roofing materials like bitumen-based felt, rubber membranes like EPDM, or fiberglass.
:param u_value: U-value of the roof before any retrofit measures have been installed
- :param insulation_thickness: Existing Insulation thickness of the loft
:param phase: Phase of the recommendation
:param is_pitched: Is the roof pitched
:param is_flat: Is the roof flat
@@ -586,3 +1009,71 @@ class RoofRecommendations:
)
self.recommendations = recommendations
+
+ def recommend_sloping_ceiling(self, phase: int, u_value, non_invasive_recommendations: List[Mapping[str, Any]]):
+ """
+ Sloping ceiling insulation recommendations are different from other roof types, though
+ the description of the roof appears to be quite similar to a roof with a loft. In order to
+ deduce the roof type, we apply the following logic:
+
+ 1) If the roof is descrbed as pitched, insulated, without a loft insulation thickness, it's
+ an insulated sloped ceiling
+ 2) If the roof insulation is assumed, it implies that the surveyor could not gain access to the
+ roof and therefore it's a loft
+ 3) If it's a pitched roof that is uninsulated and is NOT assumed, and there is not loft insulation
+ recommendation, this implies that the surveyor was able to gain access to the roof and there was no
+ loft insulation recommendation so it must be a sloping ceiling since loft insulation is a default
+ recommendation for an uninsualted loft
+
+ Since we don't have any materials from installers for this specific recommendation, we
+ do not iterate through any materials. Instead, we provide a single recommendation, we estimated
+ prices based on desk research.
+ :return:
+ """
+
+ sloping_ceiling_recommendation = next(
+ (x for x in non_invasive_recommendations if x["type"] == "sloping_ceiling_insulation"), {}
+ )
+
+ new_description = "Pitched, insulated"
+ new_efficiency = "Average" # 75mm insulation only results in average performance category
+
+ roof_ending_config = RoofAttributes(new_description).process()
+ roof_simulation_config = check_simulation_difference(
+ new_config=roof_ending_config, old_config=self.property.roof, prefix="roof_"
+ )
+
+ # We pull out new u-values, based on 75mm of insulation, with u-values defined from Elmhurst
+ new_u_value = 0.5 # This doesn't change, regardless of starting u-value
+
+ simulation_config = {
+ **roof_simulation_config,
+ "roof_thermal_transmittance_ending": new_u_value,
+ "roof_energy_eff_ending": new_efficiency
+ }
+
+ cost_result = self.costs.sloping_ceiling_insulation(
+ insulation_roof_area=self.property.roof_area # For a pitched roof, this is the pitched roof area
+ )
+
+ self.recommendations = [
+ {
+ "phase": phase,
+ "parts": [],
+ "type": "sloping_ceiling_insulation",
+ "measure_type": "sloping_ceiling_insulation",
+ "description": "Insulate sloping ceilings at the rafters and re-decorate",
+ "starting_u_value": u_value,
+ "new_u_value": None,
+ "sap_points": sloping_ceiling_recommendation.get("sap_points", None),
+ "simulation_config": simulation_config,
+ "description_simulation": {
+ "roof-description": new_description,
+ "roof-energy-eff": new_efficiency
+ },
+ **cost_result,
+ "already_installed": "sloping_ceiling_insulation" in self.property.already_installed,
+ "survey": sloping_ceiling_recommendation.get("survey", None),
+ "innovation_rate": 0
+ }
+ ]
diff --git a/recommendations/tests/test_costs.py b/recommendations/tests/test_costs.py
index 752caf8c..10a63554 100644
--- a/recommendations/tests/test_costs.py
+++ b/recommendations/tests/test_costs.py
@@ -236,3 +236,11 @@ class TestCosts:
)
assert result['total'] == pytest.approx(expected_cost, rel=0.01)
+
+ def test_sloping_ceiling_insulation(self):
+ mock_property = Mock()
+ mock_property.data = {"county": "Mansfield"}
+ costs = Costs(mock_property)
+ res = costs.sloping_ceiling_insulation(insulation_roof_area=64.085)
+ assert res["total"] == 5238.713924924947
+ assert res["contingency"] == 1362.0656204804861
diff --git a/recommendations/tests/test_roof_recommendations.py b/recommendations/tests/test_roof_recommendations.py
index 2241aeb7..0879757f 100644
--- a/recommendations/tests/test_roof_recommendations.py
+++ b/recommendations/tests/test_roof_recommendations.py
@@ -1,7 +1,9 @@
+import pytest
+from unittest.mock import Mock
from backend.Property import Property
+from etl.epc.Record import EPCRecord
from recommendations.RoofRecommendations import RoofRecommendations
from recommendations.tests.test_data.materials import materials
-from etl.epc.Record import EPCRecord
class TestRoofRecommendations:
@@ -402,3 +404,374 @@ class TestRoofRecommendations:
roof_recommender14.recommend(phase=0)
assert not roof_recommender14.recommendations
+
+ # ~~~~~~~~~~~~ Sloping Ceiling Insulation ~~~~~~~~~~~~
+ @pytest.mark.parametrize(
+ "roof, has_sloping_ceiling_recommendation, primary_roof_looks_sloped, insulation_thickness, "
+ "has_loft_insulation_recommendation, expected_result",
+ [
+ (
+ {
+ 'original_description': 'Pitched, no insulation',
+ 'thermal_transmittance': None,
+ 'thermal_transmittance_unit': None,
+ 'is_pitched': True,
+ 'is_roof_room': False,
+ 'is_loft': False,
+ 'is_flat': False,
+ 'is_thatched': False,
+ 'is_at_rafters': False,
+ 'is_assumed': False,
+ 'has_dwelling_above': False,
+ 'is_valid': True,
+ 'insulation_thickness': 'none'
+ },
+ True,
+ True,
+ "none",
+ False,
+ True,
+ ),
+ (
+ {
+ 'original_description': 'Pitched, insulated (assumed)', 'clean_description': 'Pitched, insulated',
+ 'thermal_transmittance': None, 'thermal_transmittance_unit': None, 'is_pitched': True,
+ 'is_roof_room': False, 'is_loft': False, 'is_flat': False, 'is_thatched': False,
+ 'is_at_rafters': False, 'is_assumed': True, 'has_dwelling_above': False, 'is_valid': True,
+ 'insulation_thickness': 'average'
+ },
+ False,
+ False,
+ "average",
+ False,
+ False
+ )
+ ]
+ )
+ def test_is_sloping_ceiling_appropriate(
+ self, roof, has_sloping_ceiling_recommendation, primary_roof_looks_sloped,
+ insulation_thickness, has_loft_insulation_recommendation, expected_result
+ ):
+ assert RoofRecommendations.is_sloping_ceiling_appropriate(
+ is_flat=roof["is_flat"],
+ is_pitched=roof["is_pitched"],
+ is_loft=roof["is_loft"],
+ is_assumed=roof["is_assumed"],
+ has_sloping_ceiling_recommendation=has_sloping_ceiling_recommendation,
+ primary_roof_looks_sloped=primary_roof_looks_sloped,
+ insulation_thickness=insulation_thickness,
+ has_loft_insulation_recommendation=has_loft_insulation_recommendation
+ ) == expected_result
+
+ def test_sloping_ceiling_pitched_no_insulation(self):
+ property_instance = Mock(
+ id=0,
+ roof={
+ 'original_description': 'Pitched, no insulation', 'clean_description': 'Pitched, no insulation',
+ 'thermal_transmittance': None, 'thermal_transmittance_unit': None, 'is_pitched': True,
+ 'is_roof_room': False, 'is_loft': False, 'is_flat': False, 'is_thatched': False,
+ 'is_at_rafters': False, 'is_assumed': False, 'has_dwelling_above': False, 'is_valid': True,
+ 'insulation_thickness': 'none'
+ },
+ roof_area=64.085,
+ data={"county": None, "local-authority-label": "Manchester"},
+ age_band="D",
+ already_installed=[],
+ non_invasive_recommendations=[
+ {'type': 'flat_roof_insulation', 'sap_points': 9, 'survey': True},
+ {'type': 'sloping_ceiling_insulation', 'sap_points': 9, 'survey': True},
+ {'type': 'cavity_wall_insulation', 'sap_points': 6, 'survey': True},
+ {'type': 'suspended_floor_insulation', 'sap_points': 2, 'survey': True},
+ {'type': 'roomstat_programmer_trvs', 'sap_points': 3, 'survey': True},
+ {'type': 'time_temperature_zone_control', 'sap_points': 3, 'survey': True},
+ {'type': 'solar_pv', 'sap_points': 5, 'survey': True, 'suitable': True}
+ ],
+ find_my_epc_components=[
+ {'component_name': 'Wall', 'description': 'Solid brick, as built, no insulation (assumed)',
+ 'efficiency': 'Very poor', 'appearance_index': 0},
+ {'component_name': 'Roof', 'description': 'Pitched, no insulation', 'efficiency': 'Very poor',
+ 'appearance_index': 0},
+ {'component_name': 'Roof', 'description': 'Pitched, limited insulation', 'efficiency': 'Very poor',
+ 'appearance_index': 1},
+ {'component_name': 'Window', 'description': 'Some multiple glazing', 'efficiency': 'Very poor',
+ 'appearance_index': 0},
+ {'component_name': 'Main heating', 'description': 'Boiler and radiators, mains gas',
+ 'efficiency': 'Good', 'appearance_index': 0},
+ {'component_name': 'Main heating control', 'description': 'Programmer, room thermostat and TRVs',
+ 'efficiency': 'Good', 'appearance_index': 0},
+ {'component_name': 'Hot water', 'description': 'From main system', 'efficiency': 'Good',
+ 'appearance_index': 0},
+ {'component_name': 'Lighting', 'description': 'Low energy lighting in 28% of fixed outlets',
+ 'efficiency': 'Average', 'appearance_index': 0},
+ {'component_name': 'Floor', 'description': 'Solid, no insulation (assumed)', 'efficiency': 'N/A',
+ 'appearance_index': 0},
+ {'component_name': 'Secondary heating', 'description': 'None', 'efficiency': 'N/A',
+ 'appearance_index': 0}
+ ]
+
+ )
+
+ roof_recommender = RoofRecommendations(property_instance=property_instance, materials=[])
+ assert not roof_recommender.recommendations
+
+ roof_recommender.recommend(phase=0)
+ assert len(roof_recommender.recommendations) == 1
+
+ assert roof_recommender.recommendations[0]["type"] == "sloping_ceiling_insulation"
+ assert roof_recommender.recommendations[0]["measure_type"] == "sloping_ceiling_insulation"
+ assert (
+ roof_recommender.recommendations[0]["description"] ==
+ "Insulate sloping ceilings at the rafters and re-decorate"
+ )
+ assert roof_recommender.recommendations[0]["simulation_config"] == {
+ 'roof_insulation_thickness_ending': 'average',
+ 'roof_thermal_transmittance_ending': 0.5,
+ 'roof_energy_eff_ending': 'Average'
+ }
+
+ assert roof_recommender.recommendations[0]["description_simulation"] == {
+ 'roof-description': 'Pitched, insulated', 'roof-energy-eff': 'Average'
+ }
+
+ def test_ambiguous_sloping_ceiling_or_loft(self):
+ # In this case, we actually expect loft insulation to be recommended
+ property_instance = Mock(
+ id=0,
+ roof={
+ # Roof looks like it could be a sloping ceiling but it's actually a loft
+ 'original_description': 'Pitched, no insulation', 'clean_description': 'Pitched, no insulation',
+ 'thermal_transmittance': None, 'thermal_transmittance_unit': None, 'is_pitched': True,
+ 'is_roof_room': False, 'is_loft': False, 'is_flat': False, 'is_thatched': False,
+ 'is_at_rafters': False, 'is_assumed': False, 'has_dwelling_above': False, 'is_valid': True,
+ 'insulation_thickness': 'none'
+ },
+ roof_area=197.748,
+ data={"county": None, "local-authority-label": "Manchester"},
+ already_installed=[],
+ find_my_epc_components=[
+ {'component_name': 'Wall', 'description': 'Solid brick, as built, no insulation (assumed)',
+ 'efficiency': 'Very poor', 'appearance_index': 0},
+ {'component_name': 'Roof', 'description': 'Pitched, no insulation', 'efficiency': 'Very poor',
+ 'appearance_index': 0},
+ {'component_name': 'Roof', 'description': 'Pitched, limited insulation', 'efficiency': 'Very poor',
+ 'appearance_index': 1},
+ {'component_name': 'Window', 'description': 'Some multiple glazing', 'efficiency': 'Very poor',
+ 'appearance_index': 0},
+ {'component_name': 'Main heating', 'description': 'Boiler and radiators, mains gas',
+ 'efficiency': 'Good', 'appearance_index': 0},
+ {'component_name': 'Main heating control', 'description': 'Programmer, room thermostat and TRVs',
+ 'efficiency': 'Good', 'appearance_index': 0},
+ {'component_name': 'Hot water', 'description': 'From main system', 'efficiency': 'Good',
+ 'appearance_index': 0},
+ {'component_name': 'Lighting', 'description': 'Low energy lighting in 28% of fixed outlets',
+ 'efficiency': 'Average', 'appearance_index': 0},
+ {'component_name': 'Floor', 'description': 'Solid, no insulation (assumed)', 'efficiency': 'N/A',
+ 'appearance_index': 0},
+ {'component_name': 'Secondary heating', 'description': 'None', 'efficiency': 'N/A',
+ 'appearance_index': 0}
+ ],
+ age_band="B",
+ non_invasive_recommendations=[
+ {'type': 'loft_insulation', 'sap_points': 3, 'survey': True},
+ {'type': 'flat_roof_insulation', 'sap_points': 2, 'survey': True},
+ {'type': 'sloping_ceiling_insulation', 'sap_points': 2, 'survey': True},
+ {'type': 'internal_wall_insulation', 'sap_points': 9, 'survey': True},
+ {'type': 'draught_proofing', 'sap_points': 1, 'survey': True},
+ {'type': 'low_energy_lighting', 'sap_points': 1, 'survey': True},
+ {'type': 'solar_water_heating', 'sap_points': 1, 'survey': True},
+ {'type': 'double_glazing', 'sap_points': 3, 'survey': True},
+ {'type': 'solar_pv', 'sap_points': 4, 'survey': True, 'suitable': True}
+ ],
+ insulation_floor_area=162
+ )
+
+ roof_recommender = RoofRecommendations(property_instance=property_instance, materials=materials)
+ assert not roof_recommender.recommendations
+
+ roof_recommender.recommend(phase=0)
+ assert len(roof_recommender.recommendations) == 3
+
+ # Should all be loft insulation recommendations
+ assert all(
+ rec["type"] == "loft_insulation" for rec in roof_recommender.recommendations
+ )
+
+ def test_no_access_pitched_roof_assumed(self):
+ """
+ In this case, the roof will have been surveyed as pitched, but the surveyor won't
+ have gotten access to the property to check the insulation. Therefore, we
+ recommend loft insulation. We assume that the roof is a locked off loft
+ :return:
+ """
+
+ property_instance = Mock(
+ id=0,
+ roof={
+ 'original_description': 'Pitched, limited insulation (assumed)',
+ 'clean_description': 'Pitched, limited insulation', 'thermal_transmittance': None,
+ 'thermal_transmittance_unit': None, 'is_pitched': True, 'is_roof_room': False, 'is_loft': False,
+ 'is_flat': False, 'is_thatched': False, 'is_at_rafters': False, 'is_assumed': True,
+ 'has_dwelling_above': False, 'is_valid': True, 'insulation_thickness': 'below average'
+ },
+ roof_area=73.24,
+ data={"county": None, "local-authority-label": "Manchester"},
+ already_installed=[],
+ find_my_epc_components=[
+ {'component_name': 'Wall', 'description': 'Solid brick, as built, no insulation (assumed)',
+ 'efficiency': 'Very poor', 'appearance_index': 0},
+ {'component_name': 'Wall', 'description': 'System built, as built, no insulation (assumed)',
+ 'efficiency': 'Poor', 'appearance_index': 1},
+ {'component_name': 'Wall', 'description': 'Cavity wall, filled cavity', 'efficiency': 'Average',
+ 'appearance_index': 2},
+ {'component_name': 'Roof', 'description': 'Pitched, limited insulation (assumed)',
+ 'efficiency': 'Very poor', 'appearance_index': 0},
+ {'component_name': 'Window', 'description': 'Fully double glazed', 'efficiency': 'Average',
+ 'appearance_index': 0},
+ {'component_name': 'Main heating', 'description': 'Boiler and radiators, mains gas',
+ 'efficiency': 'Good', 'appearance_index': 0},
+ {'component_name': 'Main heating control', 'description': 'Programmer and room thermostat',
+ 'efficiency': 'Average', 'appearance_index': 0},
+ {'component_name': 'Hot water', 'description': 'From main system', 'efficiency': 'Good',
+ 'appearance_index': 0},
+ {'component_name': 'Lighting', 'description': 'Low energy lighting in 75% of fixed outlets',
+ 'efficiency': 'Very good', 'appearance_index': 0},
+ {'component_name': 'Roof', 'description': '(another dwelling above)', 'efficiency': 'N/A',
+ 'appearance_index': 1},
+ {'component_name': 'Floor', 'description': 'Suspended, no insulation (assumed)', 'efficiency': 'N/A',
+ 'appearance_index': 0},
+ {'component_name': 'Floor', 'description': 'Solid, no insulation (assumed)', 'efficiency': 'N/A',
+ 'appearance_index': 1},
+ {'component_name': 'Secondary heating', 'description': 'None', 'efficiency': 'N/A',
+ 'appearance_index': 0}
+ ],
+ age_band="B",
+ non_invasive_recommendations=[
+ {'type': 'internal_wall_insulation', 'sap_points': 2, 'survey': True},
+ {'type': 'suspended_floor_insulation', 'sap_points': 2, 'survey': True},
+ {'type': 'solid_floor_insulation', 'sap_points': 1, 'survey': True},
+ {'type': 'low_energy_lighting', 'sap_points': 0, 'survey': True}
+ ],
+ insulation_floor_area=60
+ )
+
+ roof_recommender = RoofRecommendations(property_instance=property_instance, materials=materials)
+ assert not roof_recommender.recommendations
+
+ roof_recommender.recommend(phase=0)
+ assert len(roof_recommender.recommendations) == 3
+
+ # Should all be loft insulation recommendations
+ assert all(
+ rec["type"] == "loft_insulation" for rec in roof_recommender.recommendations
+ )
+
+ def test_traditional_loft_insulation(self):
+ property_instance = Mock(
+ id=0,
+ roof={
+ 'original_description': 'Pitched, no insulation', 'clean_description': 'Pitched, no insulation',
+ 'thermal_transmittance': None, 'thermal_transmittance_unit': None, 'is_pitched': True,
+ 'is_roof_room': False, 'is_loft': False, 'is_flat': False, 'is_thatched': False,
+ 'is_at_rafters': False, 'is_assumed': False, 'has_dwelling_above': False, 'is_valid': True,
+ 'insulation_thickness': 'none'
+ },
+ roof_area=48.82666666666667,
+ data={"county": None, "local-authority-label": "Manchester"},
+ already_installed=[],
+ find_my_epc_components=[
+ {'component_name': 'Wall', 'description': 'Cavity wall, filled cavity', 'efficiency': 'Good',
+ 'appearance_index': 0},
+ {'component_name': 'Roof', 'description': 'Pitched, no insulation', 'efficiency': 'Very poor',
+ 'appearance_index': 0},
+ {'component_name': 'Window', 'description': 'Fully double glazed', 'efficiency': 'Good',
+ 'appearance_index': 0},
+ {'component_name': 'Main heating', 'description': 'Boiler and radiators, mains gas',
+ 'efficiency': 'Good', 'appearance_index': 0},
+ {'component_name': 'Main heating control', 'description': 'TRVs and bypass', 'efficiency': 'Average',
+ 'appearance_index': 0},
+ {'component_name': 'Hot water', 'description': 'From main system', 'efficiency': 'Good',
+ 'appearance_index': 0},
+ {'component_name': 'Lighting', 'description': 'Low energy lighting in all fixed outlets',
+ 'efficiency': 'Very good', 'appearance_index': 0},
+ {'component_name': 'Floor', 'description': 'Solid, no insulation (assumed)', 'efficiency': 'N/A',
+ 'appearance_index': 0},
+ {'component_name': 'Secondary heating', 'description': 'Room heaters, electric', 'efficiency': 'N/A',
+ 'appearance_index': 0}
+ ],
+ age_band="F",
+ non_invasive_recommendations=[
+ {'type': 'loft_insulation', 'sap_points': 9, 'survey': True},
+ {'type': 'solid_floor_insulation', 'sap_points': 2, 'survey': True},
+ {'type': 'solar_water_heating', 'sap_points': 1, 'survey': True},
+ {'type': 'solar_pv', 'sap_points': 11, 'survey': True, 'suitable': True}
+ ],
+ insulation_floor_area=40.0
+ )
+
+ roof_recommender = RoofRecommendations(property_instance=property_instance, materials=materials)
+ assert not roof_recommender.recommendations
+
+ roof_recommender.recommend(0)
+ assert len(roof_recommender.recommendations) == 3
+ # should all be loft insulation recommendations
+ assert all(rec["type"] == "loft_insulation" for rec in roof_recommender.recommendations)
+
+ def sloping_ceiling_limited_insulation(self):
+ property_instance = Mock(
+ id=0,
+ roof={
+ "original_description": 'Pitched, limited insulation (assumed)',
+ 'clean_description': 'Pitched, limited insulation',
+ 'thermal_transmittance': None, 'thermal_transmittance_unit': None, 'is_pitched': True,
+ 'is_roof_room': False, 'is_loft': False, 'is_flat': False, 'is_thatched': False, 'is_at_rafters': False,
+ 'is_assumed': True, 'has_dwelling_above': False, 'is_valid': True,
+ 'insulation_thickness': 'below average'
+ },
+ roof_area=35,
+ data={"county": None, "local-authority-label": "Manchester"},
+ already_installed=[],
+ find_my_epc_components=[
+ {'component_name': 'Wall', 'description': 'Cavity wall, as built, no insulation (assumed)',
+ 'efficiency': 'poor', 'appearance_index': 0},
+ {'component_name': 'Roof', 'description': 'Pitched, limited insulation (assumed)',
+ 'efficiency': 'Very poor', 'appearance_index': 0},
+ {'component_name': 'Window', 'description': 'Fully double glazed', 'efficiency': 'Average',
+ 'appearance_index': 0},
+ {'component_name': 'Main heating', 'description': 'Boiler and radiators, mains gas',
+ 'efficiency': 'Good', 'appearance_index': 0},
+ {'component_name': 'Main heating control', 'description': 'TRVs and bypass',
+ 'efficiency': 'Average', 'appearance_index': 0},
+ {'component_name': 'Hot water', 'description': 'From main system', 'efficiency': 'Good',
+ 'appearance_index': 0},
+ {'component_name': 'Lighting', 'description': 'Low energy lighting in all fixed outlets',
+ 'efficiency': 'Very good', 'appearance_index': 0},
+ {'component_name': 'Floor', 'description': '(another dwelling below)', 'efficiency': 'N/A',
+ 'appearance_index': 0},
+ {'component_name': 'Secondary heating', 'description': 'None', 'efficiency': 'N/A',
+ 'appearance_index': 0}
+ ],
+ age_band="B",
+ non_invasive_recommendations=[
+ {'type': 'sloping_ceiling_insulation', 'sap_points': 2, 'survey': True},
+ {'type': 'flat_roof_insulation', 'sap_points': 2, 'survey': True},
+ ],
+ )
+
+ # We expect a sloping ceiling insulation recommendation
+ roof_recommender = RoofRecommendations(property_instance=property_instance, materials=materials)
+ assert not roof_recommender.recommendations
+
+ roof_recommender.recommend(phase=0)
+ assert len(roof_recommender.recommendations) == 1
+ assert roof_recommender.recommendations[0]["type"] == "sloping_ceiling_insulation"
+ assert roof_recommender.recommendations[0]["measure_type"] == "sloping_ceiling_insulation"
+ assert roof_recommender.recommendations[0]["description"] == \
+ "Insulate sloping ceilings at the rafters and re-decorate"
+ assert roof_recommender.recommendations[0]["simulation_config"] == {
+ 'roof_insulation_thickness_ending': 'average',
+ 'roof_thermal_transmittance_ending': 0.5,
+ 'roof_energy_eff_ending': 'Average'
+ }
+ assert roof_recommender.recommendations[0]["description_simulation"] == {
+ 'roof-description': 'Pitched, insulated', 'roof-energy-eff': 'Average'
+ }
diff --git a/recommendations/tests/test_wall_recommendations.py b/recommendations/tests/test_wall_recommendations.py
index 18560118..c54582ad 100644
--- a/recommendations/tests/test_wall_recommendations.py
+++ b/recommendations/tests/test_wall_recommendations.py
@@ -1,6 +1,4 @@
-import os
import pytest
-import pickle
import numpy as np
from unittest.mock import Mock, MagicMock