merged from main

This commit is contained in:
Jun-te Kim 2026-02-03 17:01:19 +00:00
commit 96a6557e4b
30 changed files with 1881 additions and 384 deletions

View file

@ -9,10 +9,20 @@ services:
command: sleep infinity
volumes:
- ../../:/workspaces/model
networks:
- model-net
networks:
model-net:
driver: bridge
db:
image: postgres:17.4
restart: unless-stopped
ports:
- 5432:5432
environment:
- PGDATABASE=tech_team_local_db
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=makingwarmerhomes
volumes:
- postgres-data-two:/var/lib/postgresql/data
volumes:
postgres-data-two:

6
.idea/copilot.data.migration.agent.xml generated Normal file
View file

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="AgentMigrationStateService">
<option name="migrationStatus" value="COMPLETED" />
</component>
</project>

6
.idea/copilot.data.migration.ask.xml generated Normal file
View file

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="AskMigrationStateService">
<option name="migrationStatus" value="COMPLETED" />
</component>
</project>

View file

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Ask2AgentMigrationStateService">
<option name="migrationStatus" value="COMPLETED" />
</component>
</project>

6
.idea/copilot.data.migration.edit.xml generated Normal file
View file

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="EditMigrationStateService">
<option name="migrationStatus" value="COMPLETED" />
</component>
</project>

View file

@ -12,35 +12,23 @@ from asset_list.utils import get_data
from dotenv import load_dotenv
from backend.SearchEpc import SearchEpc
load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv(
"EPC_AUTH_TOKEN",
"a2Nvbm5rb3dsZXNzYXJAZ21haWwuY29tOjY5MGJiMWM0NmIyOGI5ZDUxYzAxMzQzYzNiZGNlZGJjZDNmODQwMzA=",
)
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN", "a2Nvbm5rb3dsZXNzYXJAZ21haWwuY29tOjY5MGJiMWM0NmIyOGI5ZDUxYzAxMzQzYzNiZGNlZGJjZDNmODQwMzA=")
def extract_address1(
asset_list, full_address_col, postcode_col, method="first_two_words"
):
def extract_address1(asset_list, full_address_col, postcode_col, method="first_two_words"):
if method == "first_two_words":
asset_list["address1_extracted"] = (
asset_list[full_address_col].str.split(" ").str[:2].str.join(" ")
)
asset_list["address1_extracted"] = asset_list[full_address_col].str.split(" ").str[:2].str.join(" ")
return asset_list
if method == "first_word":
asset_list["address1_extracted"] = (
asset_list[full_address_col].str.split(" ").str[0]
)
asset_list["address1_extracted"] = asset_list[full_address_col].str.split(" ").str[0]
return asset_list
if method == "house_number_extraction":
asset_list["address1_extracted"] = asset_list.apply(
lambda x: SearchEpc.get_house_number(
address=x[full_address_col], postcode=x[postcode_col]
),
axis=1,
lambda x: SearchEpc.get_house_number(address=x[full_address_col], postcode=x[postcode_col]),
axis=1
)
return asset_list
@ -69,24 +57,65 @@ def app():
EPC recommendations
Property UPRN
"""
data_folder = "/workspaces/model/asset_list"
<<<<<<< HEAD
data_folder = ("/workspaces/model/asset_list")
data_filename = "assets.xlsx"
=======
data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Hackney"
data_filename = "Domna SHF Wave 3 (3).xlsx"
sheet_name = "Domna Wave 3"
postcode_column = 'Postcode'
address1_column = "Address 1"
address1_method = None
fulladdress_column = None
address_cols_to_concat = ["Address 1"]
missing_postcodes_method = None
landlord_year_built = "Construction Years"
landlord_os_uprn = "UPRN"
landlord_property_type = "Type"
landlord_built_form = "Attachment"
landlord_wall_construction = "Wall type"
landlord_roof_construction = None
landlord_heating_system = None
landlord_existing_pv = None
landlord_property_id = "Row ID"
landlord_sap = None
outcomes_filename = None
outcomes_sheetname = None
outcomes_postcode = None
outcomes_houseno = None
outcomes_id = None
outcomes_address = None
master_filepaths = []
master_id_colnames = []
master_to_asset_list_filepath = None
phase = False
ecosurv_landlords = None
asset_list_header = 0
landlord_block_reference = None
# Peabody data for cleaning
data_folder = ("/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting "
"Project/data_validation")
data_filename = "to_standardise_uprns.xlsx"
>>>>>>> 3874da6177cbcc37f7a488bec0a06e387906653c
sheet_name = "Sheet1"
postcode_column = "POSTCODE"
postcode_column = 'Postcode'
address1_column = None
address1_method = "house_number_extraction"
fulladdress_column = "ADDRESS"
address1_method = 'house_number_extraction'
fulladdress_column = 'Address'
address_cols_to_concat = None
missing_postcodes_method = None
landlord_year_built = None
landlord_os_uprn = "UPRN"
landlord_os_uprn = None
landlord_property_type = None
landlord_built_form = "BUILD FORM"
landlord_built_form = None
landlord_wall_construction = None
landlord_roof_construction = None
landlord_heating_system = None
landlord_existing_pv = None
landlord_property_id = "UPRN"
landlord_property_id = "LLUPRN"
landlord_sap = None
outcomes_filename = None
outcomes_sheetname = None
@ -126,62 +155,49 @@ def app():
landlord_existing_pv=landlord_existing_pv,
landlord_sap=landlord_sap,
landlord_block_reference=landlord_block_reference,
phase=phase,
phase=phase
)
asset_list.init_standardise()
# We produce the new maps, which can be saved for future useage
new_property_type_map = {
k: v
for k, v in (
asset_list.variable_mappings[asset_list.landlord_property_type]
if asset_list.landlord_property_type
else {}
k: v for k, v in (
asset_list.variable_mappings[asset_list.landlord_property_type] if
asset_list.landlord_property_type else {}
).items()
if k not in PROPERTY_MAPPING
}
new_built_form_map = {
k: v
for k, v in (
asset_list.variable_mappings[asset_list.landlord_built_form]
if asset_list.landlord_built_form
else {}
k: v for k, v in (
asset_list.variable_mappings[asset_list.landlord_built_form] if
asset_list.landlord_built_form else {}
).items()
if k not in BUILT_FORM_MAPPINGS
}
new_wall_map = {
k: v
for k, v in (
asset_list.variable_mappings[asset_list.landlord_wall_construction]
if asset_list.landlord_wall_construction
else {}
k: v for k, v in (
asset_list.variable_mappings[asset_list.landlord_wall_construction] if
asset_list.landlord_wall_construction else {}
).items()
if k not in WALL_CONSTRUCTION_MAPPINGS
}
new_heating_map = {
k: v
for k, v in (
asset_list.variable_mappings[asset_list.landlord_heating_system]
if asset_list.landlord_heating_system
else {}
k: v for k, v in (
asset_list.variable_mappings[asset_list.landlord_heating_system] if
asset_list.landlord_heating_system else {}
).items()
if k not in HEATING_MAPPINGS
}
new_existing_pv_map = {
k: v
for k, v in (
asset_list.variable_mappings[asset_list.landlord_existing_pv]
if asset_list.landlord_existing_pv
else {}
k: v for k, v in (
asset_list.variable_mappings[asset_list.landlord_existing_pv] if asset_list.landlord_existing_pv else {}
).items()
if k not in EXISTING_PV_MAPPINGS
}
new_roof_construction_map = {
k: v
for k, v in (
asset_list.variable_mappings[asset_list.landlord_roof_construction]
if asset_list.landlord_roof_construction
else {}
k: v for k, v in (
asset_list.variable_mappings[asset_list.landlord_roof_construction] if
asset_list.landlord_roof_construction else {}
).items()
if k not in ROOF_CONSTRUCTION_MAPPINGS
}
@ -195,7 +211,7 @@ def app():
outcomes_address=outcomes_address,
outcomes_postcode=outcomes_postcode,
outcomes_houseno=outcomes_houseno,
outcomes_id=outcomes_id,
outcomes_id=outcomes_id
)
asset_list.flag_survey_master(
@ -229,16 +245,14 @@ def app():
skip = max(chunk_indexes)
if any(x in folder_contents for x in downloaded_files):
skip = max(
[i for i in chunk_indexes if filename.format(i=i) in folder_contents]
)
skip = max([i for i in chunk_indexes if filename.format(i=i) in folder_contents])
for i in range(0, len(asset_list.standardised_asset_list), chunk_size):
print(f"Processing chunk {i} to {i + chunk_size}")
if skip is not None and not force_retrieve_data:
if i <= skip:
continue
chunk = asset_list.standardised_asset_list[i : i + chunk_size]
chunk = asset_list.standardised_asset_list[i:i + chunk_size]
epc_data_chunk, errors_chunk, no_epc_chunk = get_data(
df=chunk,
row_id_name=asset_list.DOMNA_PROPERTY_ID,
@ -250,7 +264,7 @@ def app():
built_form_column=AssetList.STANDARD_BUILT_FORM,
manual_uprn_map=manual_uprn_map,
epc_api_only=epc_api_only,
epc_auth_token=EPC_AUTH_TOKEN,
epc_auth_token=EPC_AUTH_TOKEN
)
# We now retrieve any failed properties
@ -273,9 +287,7 @@ def app():
# Append the failed data to the main data
# Store the chunk locally as a csv
pd.DataFrame(epc_data_chunk).to_csv(
os.path.join(data_folder, f"Chunks/Chunk {i}.csv"), index=False
)
pd.DataFrame(epc_data_chunk).to_csv(os.path.join(data_folder, f"Chunks/Chunk {i}.csv"), index=False)
# Store the errors and no-data locally
with open(os.path.join(data_folder, f"Chunks/Chunk {i} errors.json"), "w") as f:
json.dump(errors_chunk, f)
@ -306,9 +318,7 @@ def app():
unique_recommendations = set()
for _, row in recommendations_df.iterrows():
unique_recommendations.update(
[rec["improvement-summary-text"] for rec in row["recommendations"]]
)
unique_recommendations.update([rec["improvement-summary-text"] for rec in row["recommendations"]])
columns = [asset_list.DOMNA_PROPERTY_ID] + list(unique_recommendations)
transformed_data = []
@ -328,24 +338,20 @@ def app():
transformed_df = pd.DataFrame(transformed_data)
for col in [
"Floor insulation (solid floor)",
"Floor insulation",
"Floor insulation (suspended floor)",
"Floor insulation", "Floor insulation (suspended floor)"
]:
if col not in transformed_df.columns:
transformed_df[col] = False
transformed_df = transformed_df[
[
asset_list.DOMNA_PROPERTY_ID,
"Floor insulation (solid floor)",
"Floor insulation",
"Floor insulation (suspended floor)",
asset_list.DOMNA_PROPERTY_ID, "Floor insulation (solid floor)",
"Floor insulation", "Floor insulation (suspended floor)"
]
]
transformed_df["epc_has_floor_recommendation"] = (
transformed_df["Floor insulation (solid floor)"]
| transformed_df["Floor insulation"]
| transformed_df["Floor insulation (suspended floor)"]
transformed_df["Floor insulation (solid floor)"] | transformed_df["Floor insulation"] |
transformed_df["Floor insulation (suspended floor)"]
)
# Get the find my epc data
@ -358,20 +364,21 @@ def app():
find_my_epc_data.append(
{
asset_list.DOMNA_PROPERTY_ID: x[asset_list.DOMNA_PROPERTY_ID],
**x["find_my_epc_data"],
**x["find_my_epc_data"]
}
)
else:
find_my_epc_data.append(
{asset_list.DOMNA_PROPERTY_ID: x[asset_list.DOMNA_PROPERTY_ID]}
{
asset_list.DOMNA_PROPERTY_ID: x[asset_list.DOMNA_PROPERTY_ID]
}
)
find_my_epc_data = pd.DataFrame(find_my_epc_data)
find_my_epc_data = find_my_epc_data.merge(
transformed_df[[asset_list.DOMNA_PROPERTY_ID, "epc_has_floor_recommendation"]],
how="left",
on=asset_list.DOMNA_PROPERTY_ID,
how="left", on=asset_list.DOMNA_PROPERTY_ID
)
# We check if we get the solar pv column:
@ -381,26 +388,24 @@ def app():
# Retrieve just the data we need
epc_df = epc_df[
[asset_list.DOMNA_PROPERTY_ID] + list(asset_list.EPC_API_DATA_NAMES.keys())
].rename(columns=asset_list.EPC_API_DATA_NAMES)
].rename(
columns=asset_list.EPC_API_DATA_NAMES
)
# Look for columns not in the find my EPC data, which will have happened if we didn't
# retrieve it in the first place
missed_find_epc_cols = [
c
for c in list(asset_list.FIND_EPC_DATA_NAMES.keys())
if c not in find_my_epc_data.columns
]
missed_find_epc_cols = [c for c in list(asset_list.FIND_EPC_DATA_NAMES.keys()) if c not in find_my_epc_data.columns]
if missed_find_epc_cols:
for c in missed_find_epc_cols:
find_my_epc_data[c] = None
epc_df = epc_df.merge(
find_my_epc_data[
[asset_list.DOMNA_PROPERTY_ID, "epc_has_floor_recommendation"]
+ list(asset_list.FIND_EPC_DATA_NAMES.keys())
].rename(columns=asset_list.FIND_EPC_DATA_NAMES),
[asset_list.DOMNA_PROPERTY_ID, "epc_has_floor_recommendation"] + list(asset_list.FIND_EPC_DATA_NAMES.keys())
]
.rename(columns=asset_list.FIND_EPC_DATA_NAMES),
how="left",
on=asset_list.DOMNA_PROPERTY_ID,
on=asset_list.DOMNA_PROPERTY_ID
)
asset_list.merge_data(epc_df)
@ -417,10 +422,7 @@ def app():
asset_list.get_work_figures()
# Store as an excel
filename = (
os.path.join(data_folder, ".".join(data_filename.split(".")[:-1]))
+ " - Standardised.xlsx"
)
filename = os.path.join(data_folder, ".".join(data_filename.split(".")[:-1])) + " - Standardised.xlsx"
# Store the data in two tabs. One for the asset list with the EPC data and the second with the flat data
# Determine inspections priority
@ -444,42 +446,26 @@ def app():
# )
with pd.ExcelWriter(filename) as writer:
asset_list.standardised_asset_list.to_excel(
writer, sheet_name="Standardised Asset List", index=False
)
asset_list.standardised_asset_list.to_excel(writer, sheet_name="Standardised Asset List", index=False)
if asset_list.block_analysis_df is not None:
asset_list.block_analysis_df.to_excel(
writer, sheet_name="Block Analysis", index=False
)
asset_list.block_analysis_df.to_excel(writer, sheet_name="Block Analysis", index=False)
# If we have outcomes, we add a tab with the outcomes
if not asset_list.outcomes_for_output.empty:
asset_list.outcomes_for_output.to_excel(
writer, sheet_name="Outcomes", index=False
)
asset_list.outcomes_for_output.to_excel(writer, sheet_name="Outcomes", index=False)
if not asset_list.unmatched_submissions.empty:
asset_list.unmatched_submissions.to_excel(
writer, sheet_name="Unmatched Submissions", index=False
)
asset_list.unmatched_submissions.to_excel(writer, sheet_name="Unmatched Submissions", index=False)
if not asset_list.outcomes_no_match.empty:
asset_list.outcomes_no_match.to_excel(
writer, sheet_name="Unmatched Outcomes", index=False
)
asset_list.outcomes_no_match.to_excel(writer, sheet_name="Unmatched Outcomes", index=False)
if not asset_list.ecosurv_no_match.empty:
asset_list.ecosurv_no_match.to_excel(
writer, sheet_name="Unmatched Ecosurv", index=False
)
asset_list.ecosurv_no_match.to_excel(writer, sheet_name="Unmatched Ecosurv", index=False)
if not asset_list.geographical_areas.empty:
asset_list.geographical_areas.to_excel(
writer, sheet_name="Geographical Areas", index=False
)
asset_list.geographical_areas.to_excel(writer, sheet_name="Geographical Areas", index=False)
# Store dupes
if asset_list.duplicated_addresses is not None:
if not asset_list.duplicated_addresses.empty:
asset_list.duplicated_addresses.to_excel(
writer, sheet_name="Duplicate Properties", index=False
)
asset_list.duplicated_addresses.to_excel(writer, sheet_name="Duplicate Properties", index=False)

22
backend/.env.local Normal file
View file

@ -0,0 +1,22 @@
DB_HOST=db
DB_PORT=5432
DB_NAME=tech_team_local_db
DB_USERNAME=postgres
DB_PASSWORD=makingwarmerhomes
#not used
GOOGLE_SOLAR_API_KEY="test"
SAP_PREDICTIONS_BUCKET="test"
CARBON_PREDICTIONS_BUCKET="test"
HEAT_PREDICTIONS_BUCKET="test"
HEATING_KWH_PREDICTIONS_BUCKET="test"
HOTWATER_KWH_PREDICTIONS_BUCKET="test"
API_KEY="test"
ENVIRONMENT="test"
SECRET_KEY="test"
PLAN_TRIGGER_BUCKET="test"
DATA_BUCKET="test"
EPC_AUTH_TOKEN="test"
ENGINE_SQS_URL="test"
ENERGY_ASSESSMENTS_BUCKET="test"

View file

@ -84,6 +84,7 @@ class Property:
uprn=None, # Pass as an optional input
property_valuation=None,
already_installed=None,
find_my_epc_components=None,
non_invasive_recommendations=None,
measures=None,
energy_assessment=None,
@ -114,6 +115,7 @@ class Property:
non_invasive_recommendations['recommendations'] if
non_invasive_recommendations else []
)
self.find_my_epc_components = find_my_epc_components # Store the find my epc components
# This is a list of measures that have been recommended for the property
if isinstance(measures, list):
self.measures = measures
@ -551,7 +553,7 @@ class Property:
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation",
"cylinder_thermostat", "loft_insulation", "room_roof_insulation", "flat_roof_insulation",
"solid_floor_insulation", "suspended_floor_insulation", "mixed_glazing",
"windows_glazing", "mechanical_ventilation", "solar_pv"
"windows_glazing", "mechanical_ventilation", "solar_pv", "sloping_ceiling_insulation"
]:
# We update the data, as defined in the recommendaton
for prefix in ["walls", "roof", "floor"]:
@ -574,7 +576,7 @@ class Property:
"solid_floor_insulation", "suspended_floor_insulation",
"windows_glazing", "solar_pv", "heating", "hot_water_tank_insulation",
"heating_control", "secondary_heating", "cylinder_thermostat", "mixed_glazing",
"extension_cavity_wall_insulation", "mechanical_ventilation",
"extension_cavity_wall_insulation", "mechanical_ventilation", "sloping_ceiling_insulation"
]:
raise NotImplementedError(
"Implement me, given type %s" % recommendation["type"]

View file

@ -42,7 +42,7 @@ class Settings(BaseSettings):
AWS_DEFAULT_REGION: Optional[str] = None
class Config:
env_file = "backend/.env"
env_file = "backend/.env.local"
@lru_cache()

View file

@ -3,7 +3,9 @@ from contextlib import contextmanager
from backend.app.config import get_settings
from sqlmodel import Session
connection_string = "postgresql+{drivername}://{username}:{password}@{server}:{port}/{dbname}"
connection_string = (
"postgresql+{drivername}://{username}:{password}@{server}:{port}/{dbname}"
)
db_string = connection_string.format(
drivername="psycopg2", # You'll need to use psycopg2 driver for PostgreSQL
username=get_settings().DB_USERNAME,
@ -28,7 +30,9 @@ db_engine = create_engine(
def get_db_session():
if db_engine is None:
raise RuntimeError("Database is not configured. Set DATABASE_URL in environment variables.")
raise RuntimeError(
"Database is not configured. Set DATABASE_URL in environment variables."
)
return Session(db_engine)

View file

@ -0,0 +1,12 @@
from typing import List
from sqlalchemy import insert, delete
from sqlalchemy.orm import Session
from backend.app.db.connection import db_session, db_read_session
from backend.app.db.models.condition import PropertyConditionSurveyModel
def bulk_insert_property_surveys(
session: Session, surveys: List[PropertyConditionSurveyModel]
) -> None:
raise NotImplementedError

View file

@ -0,0 +1,97 @@
from sqlalchemy import (
BigInteger,
Column,
Date,
ForeignKey,
Integer,
String,
Enum as SqlEnum,
)
from sqlalchemy.orm import declarative_base, relationship
from backend.condition.domain.aspect_type import AspectType
from backend.condition.domain.element_type import ElementType
Base = declarative_base()
ElementTypeDb = SqlEnum(
ElementType,
name="element_type",
native_enum=True,
values_callable=lambda enum: [e.value for e in enum],
)
AspectTypeDb = SqlEnum(
AspectType,
name="aspect_type",
native_enum=True,
values_callable=lambda enum: [a.value for a in enum],
)
class PropertyConditionSurveyModel(Base):
__tablename__ = "property_condition_survey"
id = Column(BigInteger, primary_key=True, autoincrement=True)
uprn = Column(BigInteger, nullable=False)
date = Column(Date, nullable=False)
source = Column(String, nullable=False)
elements = relationship(
"ElementModel",
back_populates="survey",
cascade="all, delete-orphan",
)
class ElementModel(Base):
__tablename__ = "element" # TODO: rename to survey_element?
id = Column(BigInteger, primary_key=True, autoincrement=True)
survey_id = Column(
BigInteger,
ForeignKey("property_condition_survey.id"),
nullable=False,
)
element_type = Column(ElementTypeDb, nullable=False)
element_instance = Column(BigInteger, nullable=False)
survey = relationship(
"PropertyConditionSurveyModel",
back_populates="elements",
)
aspect_conditions = relationship(
"AspectConditionModel",
back_populates="element",
cascade="all, delete-orphan",
)
class AspectConditionModel(Base):
__tablename__ = "aspect_condition" # TODO: rename to survey_aspect?
id = Column(BigInteger, primary_key=True, autoincrement=True)
element_id = Column(
BigInteger,
ForeignKey("element.id"),
nullable=False,
)
aspect_type = Column(AspectTypeDb, nullable=False)
aspect_instance = Column(BigInteger, nullable=False)
value = Column(String)
quantity = Column(Integer)
install_date = Column(Date)
renewal_year = Column(Integer)
comments = Column(String)
element = relationship(
"ElementModel",
back_populates="aspect_conditions",
)

View file

@ -9,7 +9,9 @@ TYPICAL_MEASURE_TYPES = [
]
WALL_INSULATION_MEASURES = ["internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation"]
ROOF_INSULATION_MEASURES = ["loft_insulation", "flat_roof_insulation", "room_roof_insulation"]
ROOF_INSULATION_MEASURES = [
"loft_insulation", "flat_roof_insulation", "room_roof_insulation", "sloping_ceiling_insulation"
]
# Both all and roof insulaiton measures are eligible for ECO4. These are the remaining fabric and heating measures
# This is based on th measures we have recommendations for
@ -31,7 +33,7 @@ SPECIFIC_MEASURES = (
INSULATION_MEASURES = [
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation",
"loft_insulation", "flat_roof_insulation", "room_roof_insulation",
"loft_insulation", "flat_roof_insulation", "room_roof_insulation", "sloping_ceiling_insulation",
"suspended_floor_insulation", "solid_floor_insulation",
]
@ -46,7 +48,9 @@ MEASURE_MAP = {
"wall_insulation": [
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation",
],
"roof_insulation": ["loft_insulation", "flat_roof_insulation", "room_roof_insulation"],
"roof_insulation": [
"loft_insulation", "flat_roof_insulation", "room_roof_insulation", "sloping_ceiling_insulation"
],
"floor_insulation": ["suspended_floor_insulation", "solid_floor_insulation"],
"heating": ["boiler_upgrade", "high_heat_retention_storage_heaters", "air_source_heat_pump"],
"windows": ["double_glazing", "secondary_glazing"],

View file

@ -20,7 +20,7 @@ The processor currently supports file formats provided by **Peabody** and **LBWF
The `local_runner` script allows the processor to be executed in a local environment.
1. Copy a sample input file into the `sample_data/` directory.
1. Copy sample input file(s) into the `sample_data/` directory. If working with Peabody data, you'll need the Landlord Reference / UPRN lookup file as well.
2. Update `local_runner.py` as required, specifically the definitions of:
- `lbwf_path`
- `peabody_path`

View file

@ -21,6 +21,8 @@ def main() -> None:
/ "2026_01_06 - Peabody - Stock Condition Data - Survey Records - D Lower.xlsx"
)
filepaths = [lbwf_path, peabody_path]
# filepaths = [lbwf_path]
# filepaths = [peabody_path]
for fp in filepaths:
with fp.open("rb") as f:

View file

@ -1,4 +1,4 @@
from typing import BinaryIO, Any, Dict, Iterator, List, Tuple
from typing import BinaryIO, Any, Dict, Iterator, List, Optional, Tuple
from openpyxl import Workbook, load_workbook
from collections import defaultdict
@ -15,7 +15,11 @@ logger = setup_logger()
class LbwfParser(Parser):
def parse(self, file_stream: BinaryIO) -> Any:
def parse(
self,
file_stream: BinaryIO,
location_ref_to_uprn_map: Optional[Dict[str, int]] = None,
) -> Any:
wb: Workbook = load_workbook(file_stream)
address_to_uprn_map: Dict[str, int] = LbwfParser._generate_address_to_uprn_dict(
wb

View file

@ -1,8 +1,13 @@
from abc import ABC, abstractmethod
from typing import BinaryIO, Any
from typing import BinaryIO, Any, Dict, Optional
class Parser(ABC):
@abstractmethod
def parse(self, file_stream: BinaryIO) -> Any:
pass
def parse(
self,
file_stream: BinaryIO,
location_ref_to_uprn_map: Optional[Dict[str, int]] = None,
) -> Any:
pass

View file

@ -1,26 +1,55 @@
from typing import Any, BinaryIO, Dict, Iterator, List, Tuple, DefaultDict
import csv
from pathlib import Path
from typing import Any, BinaryIO, Dict, List, Optional, Tuple, DefaultDict
from openpyxl import Workbook, load_workbook
from collections import defaultdict
from backend.condition.parsing.parser import Parser
from backend.condition.parsing.records.peabody.peabody_asset_condition import PeabodyAssetCondition
from backend.condition.parsing.records.peabody.peabody_asset_condition import (
PeabodyAssetCondition,
)
from backend.condition.parsing.records.peabody.peabody_property import PeabodyProperty
from utils.logger import setup_logger
logger = setup_logger()
class PeabodyParser(Parser):
def parse(self, file_stream: BinaryIO) -> Any:
wb: Workbook = load_workbook(file_stream)
address_to_uprn_map: Dict[str, int] = PeabodyParser._generate_address_to_uprn_dict(wb)
assets = self._parse_assets(wb)
return self._group_assets_into_properties(
class PeabodyParser(Parser):
def parse(
self,
file_stream: BinaryIO,
location_ref_to_uprn_map: Optional[Dict[str, int]] = None,
) -> Any:
wb: Workbook = load_workbook(file_stream)
if location_ref_to_uprn_map is None:
location_ref_to_uprn_map: Dict[str, int] = (
PeabodyParser._build_location_ref_to_uprn_map()
)
assets = PeabodyParser._parse_assets(wb)
return PeabodyParser._group_assets_into_properties(
assets=assets,
address_to_uprn_map=address_to_uprn_map,
location_ref_to_uprn_map=location_ref_to_uprn_map,
)
@staticmethod
def _build_location_ref_to_uprn_map() -> Dict[str, int]:
location_ref_to_uprn_filepath: Path = (
Path(__file__).resolve().parents[1]
/ "sample_data"
/ "peabody"
/ "PeabodyPropertymatched_Dec25_propref_UPRN.csv"
)
location_ref_to_uprn_map: Dict[str, int] = {}
with location_ref_to_uprn_filepath.open(newline="") as f:
reader: Any = csv.DictReader(f)
for row in reader:
location_ref_to_uprn_map[row["reference"]] = int(row["out_uprn"])
return location_ref_to_uprn_map
@staticmethod
def _parse_assets(wb: Workbook) -> List[PeabodyAssetCondition]:
@ -33,39 +62,43 @@ class PeabodyParser(Parser):
assets: List[PeabodyAssetCondition] = []
for row in asset_rows:
try:
asset = PeabodyParser._map_row_to_asset_record(row, asset_header_indexes)
asset = PeabodyParser._map_row_to_asset_record(
row, asset_header_indexes
)
if not asset.is_block_level:
# Block-level condition surveys are out of scope for now
# until we have a wider think on how to handle block
assets.append(asset) # TODO: handle block-level assets
# until we have a wider think on how to handle block
assets.append(asset) # TODO: handle block-level assets
except Exception as e:
logger.error(f"Error mapping Peabody row to asset record: {e}")
continue
return assets
@staticmethod
def _group_assets_into_properties(
assets: List[PeabodyAssetCondition],
address_to_uprn_map: Dict[str, int],
location_ref_to_uprn_map: Dict[str, int],
) -> List[PeabodyProperty]:
assets_by_address: DefaultDict[str, List[PeabodyAssetCondition]] = defaultdict(list)
assets_by_location_reference: DefaultDict[str, List[PeabodyAssetCondition]] = (
defaultdict(list)
)
for asset in assets:
if asset.full_address is None:
if asset.lo_reference is None:
continue
address = asset.full_address.strip()
assets_by_address[address].append(asset)
assets_by_location_reference[asset.lo_reference].append(asset)
properties: List[PeabodyProperty] = []
for address, grouped_assets in assets_by_address.items():
uprn = address_to_uprn_map.get(address)
for location_ref, grouped_assets in assets_by_location_reference.items():
uprn = location_ref_to_uprn_map.get(location_ref)
if uprn is None:
logger.warning(f"No UPRN found for address: {address}")
logger.warning(f"No UPRN found for Location Reference: {location_ref}")
continue
properties.append(
@ -77,7 +110,6 @@ class PeabodyParser(Parser):
return properties
@staticmethod
def _map_row_to_asset_record(
row: Any | Tuple[object | None, ...],
@ -102,39 +134,9 @@ class PeabodyParser(Parser):
condition_survey_date=row[header_indexes["condition_survey_date"]],
)
@staticmethod
def _generate_address_to_uprn_dict(wb: Workbook) -> Dict[str, int | None]:
sheet = wb["Survey Records - D & Lower"]
rows: Iterator[Tuple[object | None, ...]] = sheet.iter_rows(values_only=True)
headers = next(rows)
header_indexes: Dict[str, int] = PeabodyParser._get_column_indexes_by_name(headers)
address_idx = header_indexes["full_address"]
address_to_uprn: Dict[str, int] = {}
# Generate random UPRNs for now
next_uprn = 1 # TODO: get real UPRNs
for row in rows:
address = row[address_idx]
if address is None:
continue
address = address.strip()
if address not in address_to_uprn:
address_to_uprn[address] = next_uprn
next_uprn += 1
return address_to_uprn
@staticmethod
def _get_column_indexes_by_name(
headers: Tuple[object | None, ...]
headers: Tuple[object | None, ...],
) -> Dict[str, int]:
index: Dict[str, int] = {}
@ -142,4 +144,4 @@ class PeabodyParser(Parser):
if isinstance(header, str):
index[header] = i
return index
return index

View file

@ -0,0 +1,86 @@
import time
from typing import List, Optional
from sqlmodel import Session
from utils.logger import setup_logger
from backend.app.db.models.condition import (
AspectConditionModel,
ElementModel,
PropertyConditionSurveyModel,
)
from backend.condition.domain.property_condition_survey import PropertyConditionSurvey
from backend.app.db.connection import db_session
logger = setup_logger()
class ConditionPostgres:
def bulk_insert_surveys(
self, surveys: List[PropertyConditionSurvey], batch_size: Optional[int] = 100
) -> None:
logger.info(
f"Preparing to load {len(surveys)} property surveys to Postgres. Mapping to SQLModel objects..."
)
survey_models: List[PropertyConditionSurveyModel] = [
ConditionPostgres.map_survey_to_model(s) for s in surveys
]
total: int = len(survey_models)
logger.info(
f"Finished mapping {total} surveys. Writing to database in batches of {batch_size}..."
)
with db_session() as session:
for start in range(0, total, batch_size):
end = min(start + batch_size, total)
batch = survey_models[start:end]
t0: float = time.perf_counter()
ConditionPostgres._insert_surveys_batch(batch, session)
elapsed: float = time.perf_counter() - t0
logger.info(
f"Inserted batch {start} - {end} ({len(batch)} surveys) in {elapsed} seconds",
)
@staticmethod
def map_survey_to_model(
survey: PropertyConditionSurvey,
) -> PropertyConditionSurveyModel:
survey_model = PropertyConditionSurveyModel(
uprn=survey.uprn,
date=survey.date,
source=survey.source,
elements=[],
)
for element in survey.elements:
element_model = ElementModel(
element_type=element.element_type,
element_instance=element.element_instance,
aspect_conditions=[],
)
for aspect in element.aspect_conditions:
aspect_model = AspectConditionModel(
aspect_type=aspect.aspect_type,
aspect_instance=aspect.aspect_instance,
value=aspect.value,
quantity=aspect.quantity,
install_date=aspect.install_date,
renewal_year=aspect.renewal_year,
comments=aspect.comments,
)
element_model.aspect_conditions.append(aspect_model)
survey_model.elements.append(element_model)
return survey_model
@staticmethod
def _insert_surveys_batch(
surveys: List[PropertyConditionSurveyModel], session: Session
) -> None:
session.add_all(surveys)
session.commit()

View file

@ -1,25 +1,33 @@
from typing import Any, BinaryIO, List
from datetime import datetime
from utils.logger import setup_logger
from backend.condition.domain.mapping.mapper import Mapper
from backend.condition.domain.property_condition_survey import PropertyConditionSurvey
from backend.condition.parsing.parser import Parser
from utils.logger import setup_logger
from backend.condition.persistence.condition_postgres import ConditionPostgres
from backend.condition.file_type import FileType, detect_file_type
from backend.condition.parsing.factory import select_parser, select_mapper
logger = setup_logger()
def process_file(file_stream: BinaryIO, source_key: str) -> None:
print(f"[processor] Received file: {source_key}")
logger.info(f"[processor] Received file: {source_key}")
# Instantiation
file_type: FileType = detect_file_type(source_key)
parser: Parser = select_parser(file_type)
mapper: Mapper = select_mapper(file_type)
persistence = ConditionPostgres()
# Orchestration
raw_properties: List[Any] = parser.parse(file_stream)
logger.info(
f"[processor] Finished loading customer survey data for {len(raw_properties)} properties. Mapping..."
)
survey_year = datetime.now().year # TODO: get this from filepath or elsewhere
property_condition_surveys: List[PropertyConditionSurvey] = []
@ -29,4 +37,10 @@ def process_file(file_stream: BinaryIO, source_key: str) -> None:
mapper.map_asset_conditions_for_property(p, survey_year)
)
print("done") # temp
logger.info(
f"[processor] Finished mapping {len(property_condition_surveys)} properties. Writing to database..."
)
persistence.bulk_insert_surveys(property_condition_surveys)
logger.info(f"[processor] Finished loading surveys to database")

View file

@ -1,3 +1,4 @@
from backend.app.db.models.condition import PropertyConditionSurveyModel
from backend.condition.domain.property_condition_survey import PropertyConditionSurvey
@ -72,3 +73,41 @@ class CustomAsserts:
f"{actual_aspect.comments} != {expected_aspect.comments}"
)
return True
def assert_property_condition_survey_model_matches_expected(
actual_model: PropertyConditionSurveyModel,
expected: dict,
) -> None:
assert actual_model.uprn == expected["uprn"], "UPRN differs"
assert actual_model.date == expected["date"], "Date differs"
assert actual_model.source == expected["source"], "Source differs"
assert len(actual_model.elements) == len(expected["elements"]), (
f"Expected {len(expected['elements'])} elements, "
f"got {len(actual_model.elements)}"
)
for i, (actual_element, expected_element) in enumerate(
zip(actual_model.elements, expected["elements"])
):
assert (
actual_element.element_type == expected_element["element_type"]
), f"Element[{i}].element_type differs"
assert (
actual_element.element_instance == expected_element["element_instance"]
), f"Element[{i}].element_instance differs"
assert len(actual_element.aspect_conditions) == len(
expected_element["aspects"]
), f"Element[{i}] aspect count differs"
for j, (actual_aspect, expected_aspect) in enumerate(
zip(actual_element.aspect_conditions, expected_element["aspects"])
):
prefix = f"Element[{i}].Aspect[{j}]"
for key, value in expected_aspect.items():
assert getattr(actual_aspect, key) == value, (
f"{prefix}.{key} differs: "
f"{getattr(actual_aspect, key)} != {value}"
)

View file

@ -1,127 +1,141 @@
import pytest
from typing import Any
from typing import Any, Dict
from io import BytesIO
from openpyxl import Workbook
from datetime import datetime
from backend.condition.parsing.peabody_parser import PeabodyParser
from backend.condition.parsing.records.peabody.peabody_asset_condition import PeabodyAssetCondition
from backend.condition.parsing.records.peabody.peabody_asset_condition import (
PeabodyAssetCondition,
)
from backend.condition.parsing.records.peabody.peabody_property import PeabodyProperty
@pytest.fixture
def peabody_assets_xlsx_bytes() -> BytesIO:
wb = Workbook()
survey_records_d_and_lower = wb.active
survey_records_d_and_lower.title = "Survey Records - D & Lower"
survey_records_d_and_lower.append([
"Lo_Reference",
"full_address",
"location_type_code",
"Parent_Lo_Reference",
"Element_Code",
"Element",
"Sub_Element_Code",
"Sub_Element",
"Material_Code",
"material_or_answer",
"Renewal_Quantity",
"Renewal_Year",
"Renewal_Cost",
"cloned",
"lo_type_code",
"condition_survey_date",
])
survey_records_d_and_lower.append([
"B000RAND",
"1 RANDOM HOUSE LONDON",
3,
"RAND2EST",
110,
"ROOFS",
1,
"Primary Roof",
9,
"Other",
3,
2054,
330,
"N",
3,
datetime(2025,12,4,9,17,0)
])
survey_records_d_and_lower.append([
"B000BLOCK",
"1100 BLOCK",
3,
"RAND2EST",
110,
"ROOFS",
1,
"Primary Roof",
9,
"Other",
3,
2054,
330,
"N",
3,
datetime(2025,12,4,9,17,0)
])
survey_records_d_and_lower.append([
"B000FAKE",
"3 FAKE CLOSE LONDON",
3,
"FAKEEST",
100,
"GENERAL",
15,
"External Decoration",
2,
"Normal",
1,
2035,
1500.7,
"N",
3,
datetime(2025,7,5,0,0,0)
])
survey_records_d_and_lower.append([
"B000MIS",
"99 MISC ROAD LONDON",
3,
"300828",
54,
"HHSRS",
29,
"HHSRS Structural Collapse & Falling Elements",
4,
"HHSRS Moderate",
2,
2027,
None,
"N",
3,
None
])
survey_records_d_and_lower.append([
"B000MIS",
"99 MISC ROAD LONDON",
3,
"300828",
53,
"External",
2,
"Chimney",
2,
"Present",
33,
2053,
3531,
"N",
3,
None
])
survey_records_d_and_lower.append(
[
"Lo_Reference",
"full_address",
"location_type_code",
"Parent_Lo_Reference",
"Element_Code",
"Element",
"Sub_Element_Code",
"Sub_Element",
"Material_Code",
"material_or_answer",
"Renewal_Quantity",
"Renewal_Year",
"Renewal_Cost",
"cloned",
"lo_type_code",
"condition_survey_date",
]
)
survey_records_d_and_lower.append(
[
"B000RAND",
"1 RANDOM HOUSE LONDON",
3,
"RAND2EST",
110,
"ROOFS",
1,
"Primary Roof",
9,
"Other",
3,
2054,
330,
"N",
3,
datetime(2025, 12, 4, 9, 17, 0),
]
)
survey_records_d_and_lower.append(
[
"B000BLOCK",
"1100 BLOCK",
3,
"RAND2EST",
110,
"ROOFS",
1,
"Primary Roof",
9,
"Other",
3,
2054,
330,
"N",
3,
datetime(2025, 12, 4, 9, 17, 0),
]
)
survey_records_d_and_lower.append(
[
"B000FAKE",
"3 FAKE CLOSE LONDON",
3,
"FAKEEST",
100,
"GENERAL",
15,
"External Decoration",
2,
"Normal",
1,
2035,
1500.7,
"N",
3,
datetime(2025, 7, 5, 0, 0, 0),
]
)
survey_records_d_and_lower.append(
[
"B000MIS",
"99 MISC ROAD LONDON",
3,
"300828",
54,
"HHSRS",
29,
"HHSRS Structural Collapse & Falling Elements",
4,
"HHSRS Moderate",
2,
2027,
None,
"N",
3,
None,
]
)
survey_records_d_and_lower.append(
[
"B000MIS",
"99 MISC ROAD LONDON",
3,
"300828",
53,
"External",
2,
"Chimney",
2,
"Present",
33,
2053,
3531,
"N",
3,
None,
]
)
stream = BytesIO()
wb.save(stream)
@ -129,18 +143,32 @@ def peabody_assets_xlsx_bytes() -> BytesIO:
return stream
def test_peabody_parser_parses_conditions(peabody_assets_xlsx_bytes):
@pytest.fixture
def location_ref_to_uprn_map() -> Dict[str, int]:
return {
"B000RAND": 1,
"B000BLOCK": 2,
"B000FAKE": 3,
"B000MIS": 4,
}
def test_peabody_parser_parses_conditions(
peabody_assets_xlsx_bytes, location_ref_to_uprn_map
):
# arrange
parser = PeabodyParser()
# act
result: Any = parser.parse(peabody_assets_xlsx_bytes)
result: Any = parser.parse(peabody_assets_xlsx_bytes, location_ref_to_uprn_map)
# assert
assert len(result) == 3
assert all(isinstance(item, PeabodyProperty) for item in result)
@pytest.fixture
def asset_condition_factory():
def _factory(full_address: str) -> PeabodyAssetCondition:
@ -165,6 +193,7 @@ def asset_condition_factory():
return _factory
@pytest.mark.parametrize(
"full_address, expected_block_level",
[
@ -175,7 +204,7 @@ def asset_condition_factory():
("81A-B GORE ROAD LONDON", True),
("73 & 74 HARVEST COURT ST. ALBANS", True),
("25 HAVERSHAM COURT GREENFORD", False),
("FLAT 10 SPARROW COURT SOUTHMERE DRIVE LONDON SE2 9ES", False)
("FLAT 10 SPARROW COURT SOUTHMERE DRIVE LONDON SE2 9ES", False),
],
)
def test_peabody_asset_is_block_level(
@ -187,4 +216,4 @@ def test_peabody_asset_is_block_level(
asset_condition = asset_condition_factory(full_address)
# act + assert
assert asset_condition.is_block_level == expected_block_level
assert asset_condition.is_block_level == expected_block_level

View file

@ -0,0 +1,164 @@
import pytest
from datetime import date
from backend.condition.persistence.condition_postgres import ConditionPostgres
from backend.condition.domain.property_condition_survey import PropertyConditionSurvey
from backend.condition.domain.element import Element
from backend.condition.domain.element_type import ElementType
from backend.condition.domain.aspect_condition import AspectCondition
from backend.condition.domain.aspect_type import AspectType
from backend.app.db.models.condition import PropertyConditionSurveyModel
from backend.condition.tests.custom_asserts import CustomAsserts
def test_map_survey_to_model() -> None:
# arrange
survey = PropertyConditionSurvey(
uprn=1,
elements=[
Element(
element_type=ElementType.EXTERNAL_WINDOWS,
element_instance=1,
aspect_conditions=[
AspectCondition(
aspect_type=AspectType.MATERIAL,
aspect_instance=1,
value="UPVC Double Glazed",
quantity=8,
install_date=None,
renewal_year=2036,
comments=None,
),
],
),
Element(
element_type=ElementType.EXTERNAL_DECORATION,
element_instance=1,
aspect_conditions=[
AspectCondition(
aspect_type=AspectType.CONDITION,
aspect_instance=1,
value="Normal",
quantity=1,
install_date=None,
renewal_year=2029,
comments=None,
)
],
),
Element(
element_type=ElementType.EXTERNAL_WALL,
element_instance=1,
aspect_conditions=[
AspectCondition(
aspect_type=AspectType.FINISH,
aspect_instance=1,
value="Pointed",
quantity=65,
install_date=None,
renewal_year=2045,
comments=None,
),
AspectCondition(
aspect_type=AspectType.FINISH,
aspect_instance=1,
value="Pointing",
quantity=1,
install_date=None,
renewal_year=2069,
comments=None,
),
AspectCondition(
aspect_type=AspectType.FINISH,
aspect_instance=2,
value="Tile Hung",
quantity=8,
install_date=None,
renewal_year=2049,
comments=None,
),
],
),
],
date=date(2000, 1, 1),
source="Peabody",
)
expected = {
"uprn": 1,
"date": date(2000, 1, 1),
"source": "Peabody",
"elements": [
{
"element_type": ElementType.EXTERNAL_WINDOWS,
"element_instance": 1,
"aspects": [
{
"aspect_type": AspectType.MATERIAL,
"aspect_instance": 1,
"value": "UPVC Double Glazed",
"quantity": 8,
"install_date": None,
"renewal_year": 2036,
"comments": None,
}
],
},
{
"element_type": ElementType.EXTERNAL_DECORATION,
"element_instance": 1,
"aspects": [
{
"aspect_type": AspectType.CONDITION,
"aspect_instance": 1,
"value": "Normal",
"quantity": 1,
"install_date": None,
"renewal_year": 2029,
"comments": None,
}
],
},
{
"element_type": ElementType.EXTERNAL_WALL,
"element_instance": 1,
"aspects": [
{
"aspect_instance": 1,
"value": "Pointed",
"quantity": 65,
"install_date": None,
"renewal_year": 2045,
"comments": None,
},
{
"aspect_type": AspectType.FINISH,
"aspect_instance": 1,
"value": "Pointing",
"quantity": 1,
"install_date": None,
"renewal_year": 2069,
"comments": None,
},
{
"aspect_type": AspectType.FINISH,
"aspect_instance": 2,
"value": "Tile Hung",
"quantity": 8,
"install_date": None,
"renewal_year": 2049,
"comments": None,
},
],
},
],
}
# act
model: PropertyConditionSurveyModel = ConditionPostgres.map_survey_to_model(survey)
# assert (survey level)
CustomAsserts.assert_property_condition_survey_model_matches_expected(
model,
expected,
)

View file

@ -796,9 +796,9 @@ async def model_engine(body: PlanTriggerRequest):
property_non_invasive_recommendations, patch = req_data.non_invasive_recommendations, req_data.patch
# if we have a remote assment data type, we pull the additional data and include it
epc_page_source = {}
epc_page_source, find_my_epc_components = {}, []
if (body.event_type == "remote_assessment") and not (epc_searcher.newest_epc.get("estimated")):
property_non_invasive_recommendations, patch, epc_page_source = (
property_non_invasive_recommendations, patch, epc_page_source, find_my_epc_components = (
RetrieveFindMyEpc.get_from_epc_with_fallback(
epc=epc_searcher.newest_epc,
epc_page=epc_page,
@ -834,6 +834,7 @@ async def model_engine(body: PlanTriggerRequest):
postcode=epc_searcher.postcode_clean,
epc_record=prepared_epc,
already_installed=property_already_installed + eco_packages.get(property_id)[3],
find_my_epc_components=find_my_epc_components,
property_valuation=req_data.valuation,
non_invasive_recommendations=property_non_invasive_recommendations,
energy_assessment=energy_assessment,
@ -1050,11 +1051,14 @@ async def model_engine(body: PlanTriggerRequest):
property_required_measures = [m for m in recommendations[p.id] if m[0]["type"] in body.required_measures]
measures_to_optimise = [m for m in recommendations[p.id] if m[0]["type"] not in body.required_measures]
ventilation_included = "ventilation" in property_measure_types
# If a measure requiring ventilation is selected, and the property does not have ventilation, we enfore
# its inclusion
needs_ventilation = any(
x in property_measure_types for x in assumptions.measures_needing_ventilation
) and not p.has_ventilation
) and not p.has_ventilation and ventilation_included
if not measures_to_optimise:
# Nothing to do, we just reshape the recommendations

View file

@ -36,6 +36,8 @@ class RetrieveFindMyEpc:
self.rrn = rrn
self.address_cleaned = self.address.replace(",", "").replace(" ", "").lower()
# Containers for the extracted components
self.walls = []
self.address_postal_town = address_postal_town
@ -256,12 +258,10 @@ class RetrieveFindMyEpc:
property_features_table = soup.find("tbody", class_="govuk-table__body")
property_features_table = property_features_table.find_all("tr")
# Extract wall types
self.walls = []
for row in property_features_table:
cells = row.find_all("td")
if row.find("th").text.strip() == "Wall":
self.walls.append(cells[0].text.strip())
property_components = self.extract_property_components(property_features_table)
# Extract walls
self.walls = [x["description"] for x in property_components if x["component_name"] == "Wall"]
# Finally, we format the recommendations
recommendations = self.format_recommendations(recommendations, assessment_data, sap_2012_date)
@ -424,6 +424,37 @@ class RetrieveFindMyEpc:
return chosen_epc, epc_certificate
@staticmethod
def extract_property_components(property_features_table: list):
"""
Function to pull out a table for property components, marking their appearance index
:param property_features_table: The table of property features, as extracted by BeautifulSoup
:return: List of property components with appearance index
"""
property_components = []
for row in property_features_table:
cells = row.find_all("td")
component_name = row.find("th").text.strip()
property_components.append(
{
"component_name": component_name,
"description": cells[0].text.strip(),
"efficiency": cells[1].text.strip(),
}
)
# Add an appearance index, which will indicate if the component appears multiple times, so this
# becomes a reference for the building part the component is associated to (main, extensions, etc)
# We want to inject this appearance index into the component dictionaries
component_count = {}
for component in property_components:
name = component['component_name']
if name not in component_count:
component_count[name] = 0
component['appearance_index'] = component_count[name]
component_count[name] += 1
return property_components
def retrieve_newest_find_my_epc_data(
self, sap_2012_date=None, return_page=False, epc_page_source=None, rrn=None
):
@ -577,12 +608,10 @@ class RetrieveFindMyEpc:
property_features_table = address_res.find("tbody", class_="govuk-table__body")
property_features_table = property_features_table.find_all("tr")
# Extract wall types
self.walls = []
for row in property_features_table:
cells = row.find_all("td")
if row.find("th").text.strip() == "Wall":
self.walls.append(cells[0].text.strip())
property_components = self.extract_property_components(property_features_table)
# Extract walls
self.walls = [x["description"] for x in property_components if x["component_name"] == "Wall"]
# Finally, we format the recommendations
recommendations = self.format_recommendations(recommendations, assessment_data, sap_2012_date)
@ -615,6 +644,7 @@ class RetrieveFindMyEpc:
"heating_text": heating_text,
"hot_water_text": hot_water_text,
"recommendations": recommendations,
"property_components": property_components,
"epc_data": epc_data,
**assessment_data,
**low_carbon_energy_sources,
@ -665,7 +695,7 @@ class RetrieveFindMyEpc:
],
"Change heating to gas condensing boiler": ["boiler_upgrade"],
"Fan assisted storage heaters and dual immersion cylinder": ["high_heat_retention_storage_heaters"],
"Flat roof or sloping ceiling insulation": ["flat_roof_insulation"],
"Flat roof or sloping ceiling insulation": ["flat_roof_insulation", "sloping_ceiling_insulation"],
"Heating controls (room thermostat)": [
"roomstat_programmer_trvs", "time_temperature_zone_control"
],
@ -804,7 +834,9 @@ class RetrieveFindMyEpc:
"page_source": find_epc_data.get("page_source")
}
return non_invasive_recommendations, patch, page_source
property_components = find_epc_data.get("property_components", [])
return non_invasive_recommendations, patch, page_source, property_components
@classmethod
def get_from_epc_with_fallback(

View file

@ -1,4 +1,6 @@
from typing import Mapping, Any
import numpy as np
from recommendations.county_to_region import county_to_region_map
from utils.logger import setup_logger
from backend.ml_models.AnnualBillSavings import AnnualBillSavings
@ -160,6 +162,14 @@ class Costs:
"low_energy_lighting": 0.26,
"high_heat_retention_storage_heaters": 0.1,
"windows_glazing": 0.15,
"boiler_upgrade": 0.26,
"time_and_temperature_zone_control": 0.1,
"roomstat_programmer_trvs": 0.1,
"room_roof_insulation": 0.26,
"heater_removal": 0.1,
"sealing_open_fireplace": 0.1,
"mechanical_ventilation": 0.26,
"sloping_ceiling_insulation": 0.26 # Similar to IWI so using the same contingency
}
# Preliminaries are a percentage of the total cost of the work and covers the cost of site-specific costs
@ -664,10 +674,12 @@ class Costs:
subtotal_before_vat = total_cost / (1 + self.VAT_RATE)
vat = total_cost - subtotal_before_vat
contingency_rate = self.CONTINGENCIES["roomstat_programmer_trvs"]
return {
"total": total_cost,
"contingency": total_cost * self.CONTINGENCY,
"contingency_rate": self.CONTINGENCY,
"contingency": total_cost * contingency_rate,
"contingency_rate": contingency_rate,
"subtotal": subtotal_before_vat,
"vat": vat,
"labour_hours": labour_hours,
@ -698,10 +710,12 @@ class Costs:
labour_days = np.ceil(labour_hours / 8)
contingency_rate = self.CONTINGENCIES["time_and_temperature_zone_control"]
return {
"total": total_cost,
"contingency": total_cost * self.CONTINGENCY,
"contingency_rate": self.CONTINGENCY,
"contingency": total_cost * contingency_rate,
"contingency_rate": contingency_rate,
"subtotal": subtotal_before_vat,
"vat": vat,
"labour_hours": labour_hours,
@ -752,10 +766,12 @@ class Costs:
subtotal_before_vat = removal_cost
total_cost = subtotal_before_vat + vat
contingency_rate = self.CONTINGENCIES["heater_removal"]
return {
"total": total_cost,
"contingency": total_cost * self.CONTINGENCY,
"contingency_rate": self.CONTINGENCY,
"contingency": total_cost * contingency_rate,
"contingency_rate": contingency_rate,
"subtotal": subtotal_before_vat,
"vat": vat,
"labour_hours": removal_labour_hours,
@ -858,10 +874,12 @@ class Costs:
subtotal_before_vat += system_change_cost_before_vat
vat += system_change_vat
contingency_rate = self.CONTINGENCIES["boiler_upgrade"]
return {
"total": total_cost,
"contingency": total_cost * self.CONTINGENCY,
"contingency_rate": self.CONTINGENCY,
"contingency": total_cost * contingency_rate,
"contingency_rate": contingency_rate,
"subtotal": subtotal_before_vat,
"vat": vat,
"labour_hours": labour_hours,
@ -920,3 +938,70 @@ class Costs:
"labour_hours": 80,
"labour_days": 10,
}
@staticmethod
def _estimate_number_of_days_for_sloping_ceiling(insulation_roof_area: float) -> float:
"""
Estimate labour days required to insulate an existing sloping ceiling.
Heuristic model based on retrofit guidance (Checkatrade, The Green Age)
and analogy with internal wall insulation.
See _estimate_number_of_days_for_solid_floor for detailed explanation regarding assumptions
and methodology, however for the purpose of placeholder, this function mimics the approach
to that method but is detached to allow for future changes
Assumptions:
- ~30 of sloping ceiling takes ~4 working days
- Small jobs still require multiple days (setup, stripping, reboarding)
- Larger areas benefit from economies of scale, but not linearly
:param insulation_roof_area: of sloping ceiling to be insulated
"""
base_days = 4
base_area = 30 # m2 reference case
labour_exponent = 0.85
min_days = 2
labour_days = max(
min_days,
base_days * (insulation_roof_area / base_area) ** labour_exponent
)
return labour_days
@classmethod
def sloping_ceiling_insulation(cls, insulation_roof_area: float) -> Mapping[str, float]:
"""
This costing for this is based on Checkatrade desktop research, since we are yet to receive installer quotes.
:param insulation_roof_area: Area of the sloping ceiling to be insulated
:return:
"""
################
# Assumptions
################
# Sources:
# https://www.checkatrade.com/blog/cost-guides/vaulted-ceiling-cost/
# https://www.thegreenage.co.uk/can-i-insulate-my-sloping-ceiling/
# These assumptions last updated 21/02/2026
insulation_cost_per_m2 = 52 # The actual install process is quite similar to IWI
labour_rate = 250 # per day
contingency_rate = cls.CONTINGENCIES["sloping_ceiling_insulation"]
labour_days = cls._estimate_number_of_days_for_sloping_ceiling(insulation_roof_area)
labour_hours = labour_days * 8
total = (insulation_cost_per_m2 * insulation_roof_area) + (labour_rate * labour_days)
# Assume VAT included in the total => total is 120% of subtotal
vat = total - (total / 1.2)
return {
"total": float(total),
"contingency": float(total * contingency_rate),
"contingency_rate": contingency_rate,
"vat": float(vat),
"labour_hours": float(labour_hours),
"labour_days": float(labour_days),
}

View file

@ -2,7 +2,7 @@ import math
import pandas as pd
from backend.Property import Property
from backend.app.plan.schemas import MEASURE_MAP
from typing import List
from typing import List, Mapping, Any
from datatypes.enums import QuantityUnits
from recommendations.recommendation_utils import (
get_roof_u_value, r_value_per_mm_to_u_value, calculate_u_value_uplift, is_diminishing_returns,
@ -11,6 +11,7 @@ from recommendations.recommendation_utils import (
)
from recommendations.Costs import Costs
from etl.epc_clean.epc_attributes.RoofAttributes import RoofAttributes
from backend.app.plan.schemas import ROOF_INSULATION_MEASURES
class RoofRecommendations:
@ -119,41 +120,377 @@ class RoofRecommendations:
return (full_insulated_room_roof or room_roof_insulated_at_rafters) and not has_non_invasive_recommendation
def recommend(self, phase, measures=None, default_u_values=False):
@staticmethod
def is_sloping_ceiling_appropriate(
is_pitched: bool,
is_loft: bool,
is_assumed: bool,
is_flat: bool,
has_sloping_ceiling_recommendation: bool,
primary_roof_looks_sloped: bool,
insulation_thickness: str,
has_loft_insulation_recommendation: bool
) -> bool:
"""
:param is_pitched: Boolean - indicates whether or not the roof is pitched
:param is_flat: Boolean - indicates whether or not the roof is flat
:param is_loft: Boolean - indicates whether or not the roof is described as a loft
:param is_assumed: Boolean - indiates if the assessment of the roof is assumed or actually confirmed
:param has_sloping_ceiling_recommendation: Boolean - indicates if the property has a sloping ceiling
recommendation
:param primary_roof_looks_sloped: Boolean - indicates if the primary room is described a sloped (as opposed to
an extension)
:param insulation_thickness: String - insulation thickness of the roof
:param has_loft_insulation_recommendation: Boolean - indicates whether or not there
:return:
"""
# We need to check:
# 1) If the property has a pitched roof
# 2) Does it have a recommendation for sloping ceiling
# 3) Is the insulation status NOT assumed
# 4) Is there a sloping ceiling recommendation (this may relate to the primary or secondary roof)
# If we have a loft primary roof and sloping ceiling
has_suitable_features = (
is_pitched and not is_loft and not is_assumed and primary_roof_looks_sloped
)
# Check if it needs a recommendation
needs_recommendation_condition1 = has_sloping_ceiling_recommendation | (
insulation_thickness in ["below average"]
)
needs_recommendation_condition2 = has_sloping_ceiling_recommendation & (
insulation_thickness in ["none"]
)
# If the insulation thickness is 'none' this isn't alone conclusive for us to determine if it's
# a sloped ceiling
needs_recommendation = needs_recommendation_condition1 | needs_recommendation_condition2
# The property is pitched, not a loft, not assumed and has a sloping ceiling rec
if has_suitable_features and needs_recommendation:
return True
# In this case, we have an assumed pitched roof with average or below average insulation
# but a sloping ceiling insulation without loft
if has_sloping_ceiling_recommendation and not has_loft_insulation_recommendation and not is_flat:
return True
return False
@staticmethod
def is_loft_insulation_appropriate(
measures: List,
is_pitched: bool,
is_at_rafters: bool,
rir_over_loft: bool,
is_assumed: bool,
insulation_thickness: str,
has_loft_insulation_recommendation: bool,
has_sloping_ceiling_recommendation: bool
) -> bool:
"""
Determine if loft insulation is appropriate
:param measures: List - list of measures
:param is_pitched: Boolean - indicates whether or not the roof is pitched
:param is_at_rafters: Boolean - indicates whether or not the loft insulation is at rafters
:param rir_over_loft: Boolean - indicates whether or not there we should be doing RIR insulation
:param is_assumed: Boolean - indicates whether or not the roof insulation status is assumed
:param insulation_thickness: String - insulation thickness of the roof
:param has_loft_insulation_recommendation: Boolean - indicates whether or not there
is a loft insulation non-invasive recommendation
:param has_sloping_ceiling_recommendation: Boolean - indicates whether or not there
is a sloping ceiling non-invasive recommendation
:return:
"""
has_li_in_measures = "loft_insulation" in measures
# Key business logic:
# If we have a pitched roof, no insulation, it's not assumed and we have a sloping ceiling recommendation,
# we do NOT recommend loft insulation
if is_pitched and not is_assumed and has_sloping_ceiling_recommendation:
return False
# We check the insulation thickness. If it's one of the "average", "below average", "none" values,
if (
is_assumed and is_pitched and insulation_thickness in ["average", "below average", "above average"]
and not has_sloping_ceiling_recommendation and not has_loft_insulation_recommendation
):
# This is a pitched roof, without access to the loft, with unknown insulation status
return True
return has_loft_insulation_recommendation or (
is_pitched and has_li_in_measures and not is_at_rafters
) and not rir_over_loft
@staticmethod
def is_flat_roof_insulation_appropriate(
is_flat: bool, measures: List, has_flat_roof_recommendation: bool, primary_roof_looks_sloped: bool
) -> bool:
"""
Determine if flat roof insulation is appropriate
:param is_flat: Boolean - indicates whether or not the roof is flat
:param measures: List - list of measures
:param has_flat_roof_recommendation: Boolean - indicates whether or not there is a flat roof non-invasive
recommendation
:param primary_roof_looks_sloped: Boolean - indicates if the primary roof looks like a sloped roof
:return: Boolean
When checking if has_flat_roof_recommendation and primary_roof_looks_sloped, we need to check both
conditions. This is because within a default EPC recommendation, the EPC will pair these recommendations
together. Therefore, weneed to ensure the primary roof isn't sloped
"""
flat_roof_in_measures = "flat_roof_insulation" in measures
return (is_flat and flat_roof_in_measures) or (has_flat_roof_recommendation and not primary_roof_looks_sloped)
@staticmethod
def is_room_roof_insulation_appropriate(
is_room_roof, measures, rir_over_loft, has_room_roof_recommendation
):
"""
Determine if room roof insulation is appropriate
:param is_room_roof: Boolean - indicates whether or not the roof is a room roof
:param measures: List - list of measures
:param rir_over_loft: Boolean - indicates whether or not there we should be doing RIR insulation
:param has_room_roof_recommendation: Boolean - indicates whether or not there is a room roof non-invasive
recommendation
:return:
"""
return is_room_roof and ("room_roof_insulation" in measures) or (
has_room_roof_recommendation or rir_over_loft
)
def _does_roof_need_recommendation(self, measures: List | None = None, u_value: float | None = None):
"""
Utility function to recommend which contains the logic to determine whether the roof needs a recommendation
:return:
"""
# If there is a property above, nothing can be done
if self.property.roof["has_dwelling_above"]:
return
return False
measures = MEASURE_MAP["roof_insulation"] if measures is None else measures
u_value = self.property.roof["thermal_transmittance"]
# If we have a flat roof but we don't have flat roof as a measure, we exit
# If we have a flat roof but not flat roof insulation recommendation
if self.property.roof["is_flat"] and "flat_roof_insulation" not in measures:
return
return False
# We check if the roof is already insulated and if so, we exit
# Building regulations part L recommend installing at least 270mm of insulation, however generally we
# experience diminishing returns in terms of SAP once we go beyond around 150mm of insulation
# This only holds true for pitched roofs.
# Logic to check if we have an already insulated loft
if self.is_loft_already_insulated(measures):
return
return False
# Logic to check if we have an insulated flat roof
if (self.insulation_thickness >= self.MINIMUM_FLAT_ROOF_ISULATION_MM) and self.property.roof["is_flat"]:
return
return False
# Logic to check if we have an already insulated room in roof
if self.is_room_roof_insulated_or_unsuitable(measures):
return
return False
if self.property.roof["is_thatched"]:
return
return False
# If we have a u-value and we don't have a non-invasive recommendation, we can't recommend anything
if (u_value is not None) and not any(
x in MEASURE_MAP["roof_insulation"] for x in [r["type"] for r in self.property.non_invasive_recommendations]
):
# We don't have enough information to provide a recommendation
return False
return True
@staticmethod
def _does_primary_roof_look_sloped(
is_pitched: bool, is_loft: bool, is_assumed: bool
):
"""
Determine if the primary roof is sloped
:param is_pitched: bool - is the roof pitched
:param is_loft: bool - is the roof a loft
:param is_assumed: bool - is the roof insulation status assumed
:return:
"""
# Conditions for this to be true
# Case 1
# In the property roof description (primary roof)
# 1) Pitched Roof
# 2) Uninsulated
# 3) Not assumed
if is_pitched and not is_loft and not is_assumed:
return True
return False
@staticmethod
def _deduce_primary_roof(component_needs: dict) -> str:
"""
Helper function for deducing the primary roof type used by _handle_multi_roof_types
"""
# Can a non-primary part satisfy loft insulation?
primary_needs_loft = component_needs[1]["needs_loft_insulation"]
secondary_needs_loft = any(
p['needs_loft_insulation'] for idx, p in component_needs.items() if idx != 1
)
if primary_needs_loft and not secondary_needs_loft:
# Only option is loft
return "loft"
primary_needs_sloping = component_needs[1]["needs_sloping_ceiling"]
secondary_needs_sloping = any(
p['needs_sloping_ceiling'] for idx, p in component_needs.items() if idx != 1
)
if primary_needs_sloping and not secondary_needs_sloping:
# Only option is sloping ceiling
return "sloping_ceiling"
return "loft_insulation" # Defer to the cheaper option
def _handle_multi_roof_types(
self,
measures: List,
find_my_epc_components: List[Mapping[str, Any]],
non_invasive_recommendations: List[Mapping[str, Any]],
has_sloping_ceiling_recommendation: bool,
has_loft_insulation_recommendation: bool,
rir_over_loft: bool
) -> tuple[bool, bool]:
"""
This is a rough function to handle some edge cases, where we have two roof descriptions where
both look like they could be sloping ceilings or lofts. In this case, we need to deduce
which roof is the primary roof, and therefore whether or not we should recommend sloping ceiling insulation
:param measures: List - list of measures
:param find_my_epc_components: List - list of components from find my epc
:param non_invasive_recommendations: List - list of non-invasive recommendations
:param has_sloping_ceiling_recommendation: Boolean - indicates whether or not there is a sloping ceiling
recommendation
:param has_loft_insulation_recommendation: Boolean - indicates whether or not there is a loft insulation
recommendation
:param rir_over_loft: Boolean - indicates whether or not there we should be doing RIR insulation
:return: tuple[bool, bool] - (needs_sloping_ceiling, needs_loft_insulation)
"""
# We utilise the find my EPC data to solve cases where the primary roof and secondary roof
# being loft and sloped ceiling is ambiguous
# We need to:
# 1) Check if we have two roof types
# 2) check if both could be considered sloped
# 3) Check if we have two non-invasive recommendations for both roof types
# 4) Determine which roof is the primary roof
# We check a specific condition - which will imply loft insulation isn't appropriate but room in roof
# insulation is
# 1) We have an uninsulated loft (assumed)
# 2) We have a non-intrusive recommendation for room in roof insulation
# We only use this when we have sloping ceiling and loft insulation recommendations
# Components are indexed from 0
needs_sloping = True
needs_loft = True
roof_count = max(
x["appearance_index"] for x in find_my_epc_components if x["component_name"] == "Roof"
) + 1
roof_non_invasive_recommendations = [
x["type"] for x in non_invasive_recommendations if x['type'] in ROOF_INSULATION_MEASURES
]
has_both_recommendations = (
"loft_insulation" in roof_non_invasive_recommendations and \
"sloping_ceiling_insulation" in roof_non_invasive_recommendations
)
if (roof_count <= 1) or not has_both_recommendations:
if roof_count > 1:
if "loft_insulation" in roof_non_invasive_recommendations:
return not needs_sloping, needs_loft
if "sloping_ceiling_insulation" in roof_non_invasive_recommendations:
return needs_sloping, not needs_loft
return needs_sloping, not needs_loft # Indicates that the property needs sloping ceiling as we only run
# this in that case
extracted_roof_descriptions = {
idx: {
"description": component["description"],
**RoofAttributes(component["description"]).process()
} for idx, component in enumerate(find_my_epc_components) if component["component_name"] == "Roof"
}
component_needs = {}
for component_idx, mapped in extracted_roof_descriptions.items():
is_pitched = mapped["is_pitched"]
is_loft = mapped["is_loft"]
is_assumed = mapped["is_assumed"]
insulation_thickness = mapped["insulation_thickness"]
is_at_rafters = mapped["is_at_rafters"]
is_flat = mapped["is_flat"]
needs_sloping_ceiling = self.is_sloping_ceiling_appropriate(
is_flat=is_flat,
is_pitched=is_pitched,
is_loft=is_loft,
is_assumed=is_assumed,
has_sloping_ceiling_recommendation=has_sloping_ceiling_recommendation,
primary_roof_looks_sloped=True,
insulation_thickness=insulation_thickness,
has_loft_insulation_recommendation=has_loft_insulation_recommendation
)
# If the roof has some form of insulation already but isn't a loft, it's
# not a loft. E.g. "pitched, limited insulation" is for sloping ceiling, not loft
needs_loft_insulation = self.is_loft_insulation_appropriate(
measures=measures,
is_pitched=is_pitched,
is_at_rafters=is_at_rafters,
rir_over_loft=rir_over_loft,
insulation_thickness=insulation_thickness,
has_loft_insulation_recommendation=has_loft_insulation_recommendation,
is_assumed=is_assumed,
has_sloping_ceiling_recommendation=False
)
component_needs[component_idx] = {
"needs_sloping_ceiling": needs_sloping_ceiling,
"needs_loft_insulation": needs_loft_insulation
}
# Given the results we determine if the primary roof is sloped. The situation we may be in is
# one where the only otion is to assign one of the primary or secondary roof as a loft or sloped ceiling
# forcing our hand on whether the primary roof is sloped
primary_roof_type = self._deduce_primary_roof(component_needs)
if primary_roof_type in ["ambiguous", "sloping_ceiling"]:
return needs_sloping, not needs_loft # Set sloping ceiling to true, loft to false
return not needs_sloping, needs_loft # Set sloping ceiling to false, loft to true
def recommend(self, phase: int, measures: List | None = None, default_u_values: bool = False):
"""
Main method to recommend roof insulation measures
:param phase: Integer - phase of the recommendation, determines the order in which recommendations are
applied to the property
:param measures: List - list of measures to consider for recommendation
:param default_u_values: Boolean - whether or not to use default u-values for the recommendations
:return:
"""
measures = MEASURE_MAP["roof_insulation"] if measures is None else measures
u_value = self.property.roof["thermal_transmittance"]
property_needs_roof_recommendation = self._does_roof_need_recommendation(measures, u_value)
if not property_needs_roof_recommendation:
# Roof is either:
# - already sufficiently insulated
# - unsuitable (dwelling above, thatched, etc.)
# - not matching available measures
return
u_value = get_roof_u_value(
@ -169,33 +506,103 @@ class RoofRecommendations:
)
self.estimated_u_value = u_value
# The Roof is already compliant - in this case, the u-value is beyond the requirements for
# Building Regs Part L and so we don't recommend anything
if (u_value <= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE) or all(
m not in measures for m in MEASURE_MAP["roof_insulation"]
):
# The Roof is already compliant
return
non_invasive_recommendations = self.property.non_invasive_recommendations
# We check a specific condition - which will imply loft insulation isn't appropriate but room in roof
# insulation is
# 1) We have an uninsulated loft (assumed)
# 2) We have a non-intrusive recommendation for room in roof insulation
is_pitched = self.property.roof["is_pitched"]
is_loft = self.property.roof["is_loft"]
is_assumed = self.property.roof["is_assumed"]
is_at_rafters = self.property.roof["is_at_rafters"]
is_flat = self.property.roof["is_flat"]
is_room_roof = self.property.roof["is_roof_room"]
insulation_thickness = self.property.roof["insulation_thickness"]
has_sloping_ceiling_recommendation = any(
x["type"] == "sloping_ceiling_insulation" for x in non_invasive_recommendations
)
has_loft_insulation_recommendation = any(x["type"] == "loft_insulation" for x in non_invasive_recommendations)
has_flat_roof_recommendation = any(x["type"] == "flat_roof_insulation" for x in non_invasive_recommendations)
has_room_roof_recommendation = any(x["type"] == "room_roof_insulation" for x in non_invasive_recommendations)
primary_roof_looks_sloped = self._does_primary_roof_look_sloped(
is_pitched=is_pitched, is_loft=is_loft, is_assumed=is_assumed
)
rir_over_loft = (
self.property.roof["is_pitched"] and
is_pitched and
self.property.roof["insulation_thickness"] == "none" and
"room_in_roof_insulation" in [x["type"] for x in non_invasive_recommendations]
has_room_roof_recommendation
)
# We firstly handle non-intrusive recommendations, which may override the normal roof insulation recommendations
if ("loft_insulation" in [x["type"] for x in non_invasive_recommendations]) or (
self.property.roof["is_pitched"] and "loft_insulation" in measures and
not self.property.roof["is_at_rafters"]
) and not rir_over_loft:
needs_sloping_ceiling = self.is_sloping_ceiling_appropriate(
is_pitched=is_pitched,
is_flat=is_flat,
is_loft=is_loft,
is_assumed=is_assumed,
has_sloping_ceiling_recommendation=has_sloping_ceiling_recommendation,
primary_roof_looks_sloped=primary_roof_looks_sloped,
insulation_thickness=insulation_thickness,
has_loft_insulation_recommendation=has_loft_insulation_recommendation
)
needs_loft_insulation = self.is_loft_insulation_appropriate(
measures=measures,
is_pitched=is_pitched,
is_at_rafters=is_at_rafters,
rir_over_loft=rir_over_loft,
insulation_thickness=insulation_thickness,
has_loft_insulation_recommendation=has_loft_insulation_recommendation,
is_assumed=is_assumed,
has_sloping_ceiling_recommendation=has_sloping_ceiling_recommendation
)
needs_flat_roof_insulation = self.is_flat_roof_insulation_appropriate(
is_flat=is_flat,
measures=measures,
has_flat_roof_recommendation=has_flat_roof_recommendation,
primary_roof_looks_sloped=primary_roof_looks_sloped
)
needs_rir_insulation = self.is_room_roof_insulation_appropriate(
is_room_roof=is_room_roof,
measures=measures,
rir_over_loft=rir_over_loft,
has_room_roof_recommendation=has_room_roof_recommendation
)
# We handle possible multi roof types
if needs_sloping_ceiling:
# Multi-roof override:
# In ambiguous cases (extensions, mixed descriptions), EPC component analysis
# may force us to choose between loft vs sloping ceiling.
needs_sloping_ceiling, needs_loft_insulation = self._handle_multi_roof_types(
measures=measures,
find_my_epc_components=self.property.find_my_epc_components,
non_invasive_recommendations=non_invasive_recommendations,
has_sloping_ceiling_recommendation=has_sloping_ceiling_recommendation,
has_loft_insulation_recommendation=has_loft_insulation_recommendation,
rir_over_loft=rir_over_loft
)
# Explicit override
needs_flat_roof_insulation = False
needs_rir_insulation = False
if needs_sloping_ceiling and needs_loft_insulation:
raise RuntimeError(
"Multi-roof resolution produced conflicting outcomes: "
"both sloping ceiling and loft insulation required"
)
# Retrofit precedence (least → most invasive):
# Loft > Flat roof > Room in roof > Sloping ceiling
################################################################
# ~~~~~ Loft Insulation Recommendation Logic ~~~~~
################################################################
if needs_loft_insulation:
self.recommend_roof_insulation(
u_value=u_value,
insulation_thickness=self.insulation_thickness,
phase=phase,
is_flat=False,
is_pitched=True,
@ -203,13 +610,12 @@ class RoofRecommendations:
)
return
if (
(self.property.roof["is_flat"] and "flat_roof_insulation" in measures) or
"flat_roof_insulation" in [x["type"] for x in non_invasive_recommendations]
):
################################################################
# ~~~~~ Flat Roof Insulation Recommendation Logic ~~~~~
################################################################
if needs_flat_roof_insulation:
self.recommend_roof_insulation(
u_value=u_value,
insulation_thickness=0,
phase=phase,
is_flat=True,
is_pitched=False,
@ -217,16 +623,34 @@ class RoofRecommendations:
)
return
################################################################
# ~~~~~ Room Roof Insulation Recommendation Logic ~~~~~
################################################################
# There are cases where the property might have a room roof as the second roof, but we have a recommendation for
# it, so we allow this override
if self.property.roof["is_roof_room"] and ("room_roof_insulation" in measures) or (
"room_roof_insulation" in [x["type"] for x in non_invasive_recommendations] or
rir_over_loft
):
if needs_rir_insulation:
self.recommend_room_roof_insulation(u_value, phase, default_u_values)
return
raise NotImplementedError("Implement me")
####################################################################################################
# ~~~~~ Sloping Ceiling Insulation Recommendation Logic ~~~~~
####################################################################################################
if needs_sloping_ceiling:
self.recommend_sloping_ceiling(
phase=phase,
u_value=u_value,
non_invasive_recommendations=non_invasive_recommendations
)
return
raise RuntimeError(
"Roof recommendation undecidable. "
f"needs_loft={needs_loft_insulation}, "
f"needs_flat={needs_flat_roof_insulation}, "
f"needs_rir={needs_rir_insulation}, "
f"needs_sloping={needs_sloping_ceiling}, "
f"roof={self.property.roof}"
)
@staticmethod
def make_roof_insulation_description(material):
@ -245,7 +669,7 @@ class RoofRecommendations:
raise ValueError("Invalid material type")
def recommend_roof_insulation(
self, u_value, insulation_thickness, phase, is_pitched, is_flat, default_u_values
self, u_value, phase, is_pitched, is_flat, default_u_values
):
"""
@ -267,7 +691,6 @@ class RoofRecommendations:
could be traditional roofing materials like bitumen-based felt, rubber membranes like EPDM, or fiberglass.
:param u_value: U-value of the roof before any retrofit measures have been installed
:param insulation_thickness: Existing Insulation thickness of the loft
:param phase: Phase of the recommendation
:param is_pitched: Is the roof pitched
:param is_flat: Is the roof flat
@ -586,3 +1009,71 @@ class RoofRecommendations:
)
self.recommendations = recommendations
def recommend_sloping_ceiling(self, phase: int, u_value, non_invasive_recommendations: List[Mapping[str, Any]]):
"""
Sloping ceiling insulation recommendations are different from other roof types, though
the description of the roof appears to be quite similar to a roof with a loft. In order to
deduce the roof type, we apply the following logic:
1) If the roof is descrbed as pitched, insulated, without a loft insulation thickness, it's
an insulated sloped ceiling
2) If the roof insulation is assumed, it implies that the surveyor could not gain access to the
roof and therefore it's a loft
3) If it's a pitched roof that is uninsulated and is NOT assumed, and there is not loft insulation
recommendation, this implies that the surveyor was able to gain access to the roof and there was no
loft insulation recommendation so it must be a sloping ceiling since loft insulation is a default
recommendation for an uninsualted loft
Since we don't have any materials from installers for this specific recommendation, we
do not iterate through any materials. Instead, we provide a single recommendation, we estimated
prices based on desk research.
:return:
"""
sloping_ceiling_recommendation = next(
(x for x in non_invasive_recommendations if x["type"] == "sloping_ceiling_insulation"), {}
)
new_description = "Pitched, insulated"
new_efficiency = "Average" # 75mm insulation only results in average performance category
roof_ending_config = RoofAttributes(new_description).process()
roof_simulation_config = check_simulation_difference(
new_config=roof_ending_config, old_config=self.property.roof, prefix="roof_"
)
# We pull out new u-values, based on 75mm of insulation, with u-values defined from Elmhurst
new_u_value = 0.5 # This doesn't change, regardless of starting u-value
simulation_config = {
**roof_simulation_config,
"roof_thermal_transmittance_ending": new_u_value,
"roof_energy_eff_ending": new_efficiency
}
cost_result = self.costs.sloping_ceiling_insulation(
insulation_roof_area=self.property.roof_area # For a pitched roof, this is the pitched roof area
)
self.recommendations = [
{
"phase": phase,
"parts": [],
"type": "sloping_ceiling_insulation",
"measure_type": "sloping_ceiling_insulation",
"description": "Insulate sloping ceilings at the rafters and re-decorate",
"starting_u_value": u_value,
"new_u_value": None,
"sap_points": sloping_ceiling_recommendation.get("sap_points", None),
"simulation_config": simulation_config,
"description_simulation": {
"roof-description": new_description,
"roof-energy-eff": new_efficiency
},
**cost_result,
"already_installed": "sloping_ceiling_insulation" in self.property.already_installed,
"survey": sloping_ceiling_recommendation.get("survey", None),
"innovation_rate": 0
}
]

View file

@ -236,3 +236,11 @@ class TestCosts:
)
assert result['total'] == pytest.approx(expected_cost, rel=0.01)
def test_sloping_ceiling_insulation(self):
mock_property = Mock()
mock_property.data = {"county": "Mansfield"}
costs = Costs(mock_property)
res = costs.sloping_ceiling_insulation(insulation_roof_area=64.085)
assert res["total"] == 5238.713924924947
assert res["contingency"] == 1362.0656204804861

View file

@ -1,7 +1,9 @@
import pytest
from unittest.mock import Mock
from backend.Property import Property
from etl.epc.Record import EPCRecord
from recommendations.RoofRecommendations import RoofRecommendations
from recommendations.tests.test_data.materials import materials
from etl.epc.Record import EPCRecord
class TestRoofRecommendations:
@ -402,3 +404,374 @@ class TestRoofRecommendations:
roof_recommender14.recommend(phase=0)
assert not roof_recommender14.recommendations
# ~~~~~~~~~~~~ Sloping Ceiling Insulation ~~~~~~~~~~~~
@pytest.mark.parametrize(
"roof, has_sloping_ceiling_recommendation, primary_roof_looks_sloped, insulation_thickness, "
"has_loft_insulation_recommendation, expected_result",
[
(
{
'original_description': 'Pitched, no insulation',
'thermal_transmittance': None,
'thermal_transmittance_unit': None,
'is_pitched': True,
'is_roof_room': False,
'is_loft': False,
'is_flat': False,
'is_thatched': False,
'is_at_rafters': False,
'is_assumed': False,
'has_dwelling_above': False,
'is_valid': True,
'insulation_thickness': 'none'
},
True,
True,
"none",
False,
True,
),
(
{
'original_description': 'Pitched, insulated (assumed)', 'clean_description': 'Pitched, insulated',
'thermal_transmittance': None, 'thermal_transmittance_unit': None, 'is_pitched': True,
'is_roof_room': False, 'is_loft': False, 'is_flat': False, 'is_thatched': False,
'is_at_rafters': False, 'is_assumed': True, 'has_dwelling_above': False, 'is_valid': True,
'insulation_thickness': 'average'
},
False,
False,
"average",
False,
False
)
]
)
def test_is_sloping_ceiling_appropriate(
self, roof, has_sloping_ceiling_recommendation, primary_roof_looks_sloped,
insulation_thickness, has_loft_insulation_recommendation, expected_result
):
assert RoofRecommendations.is_sloping_ceiling_appropriate(
is_flat=roof["is_flat"],
is_pitched=roof["is_pitched"],
is_loft=roof["is_loft"],
is_assumed=roof["is_assumed"],
has_sloping_ceiling_recommendation=has_sloping_ceiling_recommendation,
primary_roof_looks_sloped=primary_roof_looks_sloped,
insulation_thickness=insulation_thickness,
has_loft_insulation_recommendation=has_loft_insulation_recommendation
) == expected_result
def test_sloping_ceiling_pitched_no_insulation(self):
property_instance = Mock(
id=0,
roof={
'original_description': 'Pitched, no insulation', 'clean_description': 'Pitched, no insulation',
'thermal_transmittance': None, 'thermal_transmittance_unit': None, 'is_pitched': True,
'is_roof_room': False, 'is_loft': False, 'is_flat': False, 'is_thatched': False,
'is_at_rafters': False, 'is_assumed': False, 'has_dwelling_above': False, 'is_valid': True,
'insulation_thickness': 'none'
},
roof_area=64.085,
data={"county": None, "local-authority-label": "Manchester"},
age_band="D",
already_installed=[],
non_invasive_recommendations=[
{'type': 'flat_roof_insulation', 'sap_points': 9, 'survey': True},
{'type': 'sloping_ceiling_insulation', 'sap_points': 9, 'survey': True},
{'type': 'cavity_wall_insulation', 'sap_points': 6, 'survey': True},
{'type': 'suspended_floor_insulation', 'sap_points': 2, 'survey': True},
{'type': 'roomstat_programmer_trvs', 'sap_points': 3, 'survey': True},
{'type': 'time_temperature_zone_control', 'sap_points': 3, 'survey': True},
{'type': 'solar_pv', 'sap_points': 5, 'survey': True, 'suitable': True}
],
find_my_epc_components=[
{'component_name': 'Wall', 'description': 'Solid brick, as built, no insulation (assumed)',
'efficiency': 'Very poor', 'appearance_index': 0},
{'component_name': 'Roof', 'description': 'Pitched, no insulation', 'efficiency': 'Very poor',
'appearance_index': 0},
{'component_name': 'Roof', 'description': 'Pitched, limited insulation', 'efficiency': 'Very poor',
'appearance_index': 1},
{'component_name': 'Window', 'description': 'Some multiple glazing', 'efficiency': 'Very poor',
'appearance_index': 0},
{'component_name': 'Main heating', 'description': 'Boiler and radiators, mains gas',
'efficiency': 'Good', 'appearance_index': 0},
{'component_name': 'Main heating control', 'description': 'Programmer, room thermostat and TRVs',
'efficiency': 'Good', 'appearance_index': 0},
{'component_name': 'Hot water', 'description': 'From main system', 'efficiency': 'Good',
'appearance_index': 0},
{'component_name': 'Lighting', 'description': 'Low energy lighting in 28% of fixed outlets',
'efficiency': 'Average', 'appearance_index': 0},
{'component_name': 'Floor', 'description': 'Solid, no insulation (assumed)', 'efficiency': 'N/A',
'appearance_index': 0},
{'component_name': 'Secondary heating', 'description': 'None', 'efficiency': 'N/A',
'appearance_index': 0}
]
)
roof_recommender = RoofRecommendations(property_instance=property_instance, materials=[])
assert not roof_recommender.recommendations
roof_recommender.recommend(phase=0)
assert len(roof_recommender.recommendations) == 1
assert roof_recommender.recommendations[0]["type"] == "sloping_ceiling_insulation"
assert roof_recommender.recommendations[0]["measure_type"] == "sloping_ceiling_insulation"
assert (
roof_recommender.recommendations[0]["description"] ==
"Insulate sloping ceilings at the rafters and re-decorate"
)
assert roof_recommender.recommendations[0]["simulation_config"] == {
'roof_insulation_thickness_ending': 'average',
'roof_thermal_transmittance_ending': 0.5,
'roof_energy_eff_ending': 'Average'
}
assert roof_recommender.recommendations[0]["description_simulation"] == {
'roof-description': 'Pitched, insulated', 'roof-energy-eff': 'Average'
}
def test_ambiguous_sloping_ceiling_or_loft(self):
# In this case, we actually expect loft insulation to be recommended
property_instance = Mock(
id=0,
roof={
# Roof looks like it could be a sloping ceiling but it's actually a loft
'original_description': 'Pitched, no insulation', 'clean_description': 'Pitched, no insulation',
'thermal_transmittance': None, 'thermal_transmittance_unit': None, 'is_pitched': True,
'is_roof_room': False, 'is_loft': False, 'is_flat': False, 'is_thatched': False,
'is_at_rafters': False, 'is_assumed': False, 'has_dwelling_above': False, 'is_valid': True,
'insulation_thickness': 'none'
},
roof_area=197.748,
data={"county": None, "local-authority-label": "Manchester"},
already_installed=[],
find_my_epc_components=[
{'component_name': 'Wall', 'description': 'Solid brick, as built, no insulation (assumed)',
'efficiency': 'Very poor', 'appearance_index': 0},
{'component_name': 'Roof', 'description': 'Pitched, no insulation', 'efficiency': 'Very poor',
'appearance_index': 0},
{'component_name': 'Roof', 'description': 'Pitched, limited insulation', 'efficiency': 'Very poor',
'appearance_index': 1},
{'component_name': 'Window', 'description': 'Some multiple glazing', 'efficiency': 'Very poor',
'appearance_index': 0},
{'component_name': 'Main heating', 'description': 'Boiler and radiators, mains gas',
'efficiency': 'Good', 'appearance_index': 0},
{'component_name': 'Main heating control', 'description': 'Programmer, room thermostat and TRVs',
'efficiency': 'Good', 'appearance_index': 0},
{'component_name': 'Hot water', 'description': 'From main system', 'efficiency': 'Good',
'appearance_index': 0},
{'component_name': 'Lighting', 'description': 'Low energy lighting in 28% of fixed outlets',
'efficiency': 'Average', 'appearance_index': 0},
{'component_name': 'Floor', 'description': 'Solid, no insulation (assumed)', 'efficiency': 'N/A',
'appearance_index': 0},
{'component_name': 'Secondary heating', 'description': 'None', 'efficiency': 'N/A',
'appearance_index': 0}
],
age_band="B",
non_invasive_recommendations=[
{'type': 'loft_insulation', 'sap_points': 3, 'survey': True},
{'type': 'flat_roof_insulation', 'sap_points': 2, 'survey': True},
{'type': 'sloping_ceiling_insulation', 'sap_points': 2, 'survey': True},
{'type': 'internal_wall_insulation', 'sap_points': 9, 'survey': True},
{'type': 'draught_proofing', 'sap_points': 1, 'survey': True},
{'type': 'low_energy_lighting', 'sap_points': 1, 'survey': True},
{'type': 'solar_water_heating', 'sap_points': 1, 'survey': True},
{'type': 'double_glazing', 'sap_points': 3, 'survey': True},
{'type': 'solar_pv', 'sap_points': 4, 'survey': True, 'suitable': True}
],
insulation_floor_area=162
)
roof_recommender = RoofRecommendations(property_instance=property_instance, materials=materials)
assert not roof_recommender.recommendations
roof_recommender.recommend(phase=0)
assert len(roof_recommender.recommendations) == 3
# Should all be loft insulation recommendations
assert all(
rec["type"] == "loft_insulation" for rec in roof_recommender.recommendations
)
def test_no_access_pitched_roof_assumed(self):
"""
In this case, the roof will have been surveyed as pitched, but the surveyor won't
have gotten access to the property to check the insulation. Therefore, we
recommend loft insulation. We assume that the roof is a locked off loft
:return:
"""
property_instance = Mock(
id=0,
roof={
'original_description': 'Pitched, limited insulation (assumed)',
'clean_description': 'Pitched, limited insulation', 'thermal_transmittance': None,
'thermal_transmittance_unit': None, 'is_pitched': True, 'is_roof_room': False, 'is_loft': False,
'is_flat': False, 'is_thatched': False, 'is_at_rafters': False, 'is_assumed': True,
'has_dwelling_above': False, 'is_valid': True, 'insulation_thickness': 'below average'
},
roof_area=73.24,
data={"county": None, "local-authority-label": "Manchester"},
already_installed=[],
find_my_epc_components=[
{'component_name': 'Wall', 'description': 'Solid brick, as built, no insulation (assumed)',
'efficiency': 'Very poor', 'appearance_index': 0},
{'component_name': 'Wall', 'description': 'System built, as built, no insulation (assumed)',
'efficiency': 'Poor', 'appearance_index': 1},
{'component_name': 'Wall', 'description': 'Cavity wall, filled cavity', 'efficiency': 'Average',
'appearance_index': 2},
{'component_name': 'Roof', 'description': 'Pitched, limited insulation (assumed)',
'efficiency': 'Very poor', 'appearance_index': 0},
{'component_name': 'Window', 'description': 'Fully double glazed', 'efficiency': 'Average',
'appearance_index': 0},
{'component_name': 'Main heating', 'description': 'Boiler and radiators, mains gas',
'efficiency': 'Good', 'appearance_index': 0},
{'component_name': 'Main heating control', 'description': 'Programmer and room thermostat',
'efficiency': 'Average', 'appearance_index': 0},
{'component_name': 'Hot water', 'description': 'From main system', 'efficiency': 'Good',
'appearance_index': 0},
{'component_name': 'Lighting', 'description': 'Low energy lighting in 75% of fixed outlets',
'efficiency': 'Very good', 'appearance_index': 0},
{'component_name': 'Roof', 'description': '(another dwelling above)', 'efficiency': 'N/A',
'appearance_index': 1},
{'component_name': 'Floor', 'description': 'Suspended, no insulation (assumed)', 'efficiency': 'N/A',
'appearance_index': 0},
{'component_name': 'Floor', 'description': 'Solid, no insulation (assumed)', 'efficiency': 'N/A',
'appearance_index': 1},
{'component_name': 'Secondary heating', 'description': 'None', 'efficiency': 'N/A',
'appearance_index': 0}
],
age_band="B",
non_invasive_recommendations=[
{'type': 'internal_wall_insulation', 'sap_points': 2, 'survey': True},
{'type': 'suspended_floor_insulation', 'sap_points': 2, 'survey': True},
{'type': 'solid_floor_insulation', 'sap_points': 1, 'survey': True},
{'type': 'low_energy_lighting', 'sap_points': 0, 'survey': True}
],
insulation_floor_area=60
)
roof_recommender = RoofRecommendations(property_instance=property_instance, materials=materials)
assert not roof_recommender.recommendations
roof_recommender.recommend(phase=0)
assert len(roof_recommender.recommendations) == 3
# Should all be loft insulation recommendations
assert all(
rec["type"] == "loft_insulation" for rec in roof_recommender.recommendations
)
def test_traditional_loft_insulation(self):
property_instance = Mock(
id=0,
roof={
'original_description': 'Pitched, no insulation', 'clean_description': 'Pitched, no insulation',
'thermal_transmittance': None, 'thermal_transmittance_unit': None, 'is_pitched': True,
'is_roof_room': False, 'is_loft': False, 'is_flat': False, 'is_thatched': False,
'is_at_rafters': False, 'is_assumed': False, 'has_dwelling_above': False, 'is_valid': True,
'insulation_thickness': 'none'
},
roof_area=48.82666666666667,
data={"county": None, "local-authority-label": "Manchester"},
already_installed=[],
find_my_epc_components=[
{'component_name': 'Wall', 'description': 'Cavity wall, filled cavity', 'efficiency': 'Good',
'appearance_index': 0},
{'component_name': 'Roof', 'description': 'Pitched, no insulation', 'efficiency': 'Very poor',
'appearance_index': 0},
{'component_name': 'Window', 'description': 'Fully double glazed', 'efficiency': 'Good',
'appearance_index': 0},
{'component_name': 'Main heating', 'description': 'Boiler and radiators, mains gas',
'efficiency': 'Good', 'appearance_index': 0},
{'component_name': 'Main heating control', 'description': 'TRVs and bypass', 'efficiency': 'Average',
'appearance_index': 0},
{'component_name': 'Hot water', 'description': 'From main system', 'efficiency': 'Good',
'appearance_index': 0},
{'component_name': 'Lighting', 'description': 'Low energy lighting in all fixed outlets',
'efficiency': 'Very good', 'appearance_index': 0},
{'component_name': 'Floor', 'description': 'Solid, no insulation (assumed)', 'efficiency': 'N/A',
'appearance_index': 0},
{'component_name': 'Secondary heating', 'description': 'Room heaters, electric', 'efficiency': 'N/A',
'appearance_index': 0}
],
age_band="F",
non_invasive_recommendations=[
{'type': 'loft_insulation', 'sap_points': 9, 'survey': True},
{'type': 'solid_floor_insulation', 'sap_points': 2, 'survey': True},
{'type': 'solar_water_heating', 'sap_points': 1, 'survey': True},
{'type': 'solar_pv', 'sap_points': 11, 'survey': True, 'suitable': True}
],
insulation_floor_area=40.0
)
roof_recommender = RoofRecommendations(property_instance=property_instance, materials=materials)
assert not roof_recommender.recommendations
roof_recommender.recommend(0)
assert len(roof_recommender.recommendations) == 3
# should all be loft insulation recommendations
assert all(rec["type"] == "loft_insulation" for rec in roof_recommender.recommendations)
def sloping_ceiling_limited_insulation(self):
property_instance = Mock(
id=0,
roof={
"original_description": 'Pitched, limited insulation (assumed)',
'clean_description': 'Pitched, limited insulation',
'thermal_transmittance': None, 'thermal_transmittance_unit': None, 'is_pitched': True,
'is_roof_room': False, 'is_loft': False, 'is_flat': False, 'is_thatched': False, 'is_at_rafters': False,
'is_assumed': True, 'has_dwelling_above': False, 'is_valid': True,
'insulation_thickness': 'below average'
},
roof_area=35,
data={"county": None, "local-authority-label": "Manchester"},
already_installed=[],
find_my_epc_components=[
{'component_name': 'Wall', 'description': 'Cavity wall, as built, no insulation (assumed)',
'efficiency': 'poor', 'appearance_index': 0},
{'component_name': 'Roof', 'description': 'Pitched, limited insulation (assumed)',
'efficiency': 'Very poor', 'appearance_index': 0},
{'component_name': 'Window', 'description': 'Fully double glazed', 'efficiency': 'Average',
'appearance_index': 0},
{'component_name': 'Main heating', 'description': 'Boiler and radiators, mains gas',
'efficiency': 'Good', 'appearance_index': 0},
{'component_name': 'Main heating control', 'description': 'TRVs and bypass',
'efficiency': 'Average', 'appearance_index': 0},
{'component_name': 'Hot water', 'description': 'From main system', 'efficiency': 'Good',
'appearance_index': 0},
{'component_name': 'Lighting', 'description': 'Low energy lighting in all fixed outlets',
'efficiency': 'Very good', 'appearance_index': 0},
{'component_name': 'Floor', 'description': '(another dwelling below)', 'efficiency': 'N/A',
'appearance_index': 0},
{'component_name': 'Secondary heating', 'description': 'None', 'efficiency': 'N/A',
'appearance_index': 0}
],
age_band="B",
non_invasive_recommendations=[
{'type': 'sloping_ceiling_insulation', 'sap_points': 2, 'survey': True},
{'type': 'flat_roof_insulation', 'sap_points': 2, 'survey': True},
],
)
# We expect a sloping ceiling insulation recommendation
roof_recommender = RoofRecommendations(property_instance=property_instance, materials=materials)
assert not roof_recommender.recommendations
roof_recommender.recommend(phase=0)
assert len(roof_recommender.recommendations) == 1
assert roof_recommender.recommendations[0]["type"] == "sloping_ceiling_insulation"
assert roof_recommender.recommendations[0]["measure_type"] == "sloping_ceiling_insulation"
assert roof_recommender.recommendations[0]["description"] == \
"Insulate sloping ceilings at the rafters and re-decorate"
assert roof_recommender.recommendations[0]["simulation_config"] == {
'roof_insulation_thickness_ending': 'average',
'roof_thermal_transmittance_ending': 0.5,
'roof_energy_eff_ending': 'Average'
}
assert roof_recommender.recommendations[0]["description_simulation"] == {
'roof-description': 'Pitched, insulated', 'roof-energy-eff': 'Average'
}

View file

@ -1,6 +1,4 @@
import os
import pytest
import pickle
import numpy as np
from unittest.mock import Mock, MagicMock