Merge pull request #335 from Hestia-Homes/retrofit-assessmet-api

Retrofit assessmet api
This commit is contained in:
KhalimCK 2024-09-09 12:44:20 +01:00 committed by GitHub
commit 8e2dec0d68
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 1345 additions and 290 deletions

View file

@ -365,7 +365,7 @@ class Property:
for rec in property_recommendations_by_phase:
# We simulate the impact of the recommendation at this current phase, and all of the prior phases
if rec["type"] == "mechanical_ventilation":
if rec["type"] in ["mechanical_ventilation", "trickle_vents", "draught_proofing"]:
continue
scoring_dict = self.create_recommendation_scoring_data(
@ -480,7 +480,6 @@ class Property:
"""
output = recommendation_record.copy()
non_invasive_recommendations = [] if non_invasive_recommendations is None else non_invasive_recommendations
for col in [
"walls_insulation_thickness",
@ -537,28 +536,14 @@ class Property:
"heating", "hot_water_tank_insulation", "heating_control", "secondary_heating",
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation",
"cylinder_thermostat", "loft_insulation", "room_roof_insulation", "flat_roof_insulation",
"solid_floor_insulation", "suspended_floor_insulation",
"solid_floor_insulation", "suspended_floor_insulation", "mixed_glazing"
]:
# We update the data, as defined in the recommendaton
if output["walls_insulation_thickness_ending"] is None:
output["walls_insulation_thickness_ending"] = "none"
for prefix in ["walls", "roof", "floor"]:
if output[f"{prefix}_insulation_thickness_ending"] is None:
output[f"{prefix}_insulation_thickness_ending"] = "none"
if output["walls_thermal_transmittance_ending"] is None:
raise ValueError("We should not have a None value for the u value")
if output["roof_insulation_thickness_ending"] is None:
output["roof_insulation_thickness_ending"] = "none"
if output["roof_thermal_transmittance_ending"] is None:
raise ValueError("We should not have a None value for the u value")
if output["floor_thermal_transmittance_ending"] is None:
raise ValueError("We should not have a None value for the u value")
if output["floor_insulation_thickness_ending"] is None:
output["floor_insulation_thickness_ending"] = "none"
simulation_config = recommendation["simulation_config"]
simulation_config = recommendation["simulation_config"].copy()
# If any entries in simulation_config are None, we will set them to "Unknown" which is the cleaning
# value
for key, value in simulation_config.items():
@ -576,7 +561,7 @@ class Property:
"loft_insulation", "room_roof_insulation", "flat_roof_insulation",
"solid_floor_insulation", "suspended_floor_insulation",
"windows_glazing", "solar_pv", "heating", "hot_water_tank_insulation",
"heating_control", "secondary_heating", "cylinder_thermostat"
"heating_control", "secondary_heating", "cylinder_thermostat", "mixed_glazing"
]:
raise NotImplementedError(
"Implement me, given type %s" % recommendation["type"]
@ -1231,12 +1216,12 @@ class Property:
else:
raise Exception("Investiage me")
def is_ashp_valid(self, exclusions):
def is_ashp_valid(self, measures):
if "air_source_heat_pump" in self.non_invasive_recommendations:
return True
if "air_source_heat_pump" in exclusions:
if "air_source_heat_pump" not in measures:
return False
suitable_property_type = self.data["property-type"] in ["House", "Bungalow"]
@ -1279,9 +1264,11 @@ class Property:
"""
exclusions = [] if exclusions is None else exclusions
if "air_source_heat_pump" in exclusions:
return self.current_energy_consumption
# If the property currently has an ASHP, we don't gain from any efficiency improvements
if not self.is_ashp_valid(exclusions=exclusions):
if not self.is_ashp_valid(measures=["air_source_heat_pump"]):
return self.current_energy_consumption
# If the property currently has an electric boiler, it will still benefit from the ASHP efficiency gain

View file

@ -33,6 +33,9 @@ class Settings(BaseSettings):
HEATING_KWH_PREDICTIONS_BUCKET: str
HOTWATER_KWH_PREDICTIONS_BUCKET: str
# Other S3 buckts
ENERGY_ASSESSMENTS_BUCKET: str
class Config:
env_file = "backend/.env"

View file

@ -1,17 +1,26 @@
from backend.app.db.models.energy_assessments import EnergyAssessment
from backend.app.db.models.energy_assessments import (
EnergyAssessment, EnergyAssessmentScenarios, EnergyAssessmentDocuments, DocumentTypeEnum
)
from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError
from typing import Optional
from typing import Optional, List, Dict
from sqlalchemy import desc
from utils.logger import setup_logger
logger = setup_logger()
def bulk_insert_energy_assessments(session: Session, data_list):
def bulk_insert_energy_assessments(session: Session, data_list: List[dict]) -> Dict[int, int]:
"""
This function inserts or updates multiple energy assessment records into the database.
This function inserts or updates multiple energy assessment records into the database and returns a mapping of
uprn to energy_assessment_id.
:param session: The SQLAlchemy session.
:param data_list: A list of dictionaries containing energy assessment data.
:return: A dictionary mapping each uprn to its corresponding energy_assessment_id.
"""
uprn_to_assessment_id = {}
try:
for data in data_list:
uprn = data.get('uprn')
@ -28,19 +37,30 @@ def bulk_insert_energy_assessments(session: Session, data_list):
for key, value in data.items():
setattr(existing_record, key, value)
session.add(existing_record)
# Map the uprn to the existing record's ID
uprn_to_assessment_id[uprn] = existing_record.id
else:
# Insert a new record
new_assessment = EnergyAssessment(**data)
session.add(new_assessment)
# Flush the session to get the newly created ID before commit
session.flush()
# Map the uprn to the new record's ID
uprn_to_assessment_id[uprn] = new_assessment.id
# Commit the transaction
session.commit()
print("All records inserted or updated successfully.")
logger.info("All records inserted or updated successfully.")
except IntegrityError as e:
# Rollback the session in case of error
session.rollback()
print(f"Error occurred: {e}")
logger.info(f"Error occurred: {e}")
return uprn_to_assessment_id
def get_latest_assessment_by_uprn(session: Session, uprn: int) -> Optional[EnergyAssessment]:
@ -58,5 +78,81 @@ def get_latest_assessment_by_uprn(session: Session, uprn: int) -> Optional[Energ
return latest_assessment.to_dict() if latest_assessment else EnergyAssessment.empty_response()
except Exception as e:
print(f"An error occurred: {e}")
logger.info(f"An error occurred: {e}")
return None
def create_scenarios_for_documents(session: Session, document_list: List[dict], uprn_to_assessment_id: dict):
"""
Creates scenarios for documents by UPRN and links them to the energy assessments.
:param session: The SQLAlchemy session.
:param document_list: A list of dictionaries containing document data.
:param uprn_to_assessment_id: A dictionary mapping UPRN to energy_assessment_id.
"""
try:
for document in document_list:
uprn = document.get('uprn')
scenario_name = document.get('scenario_id')
if scenario_name:
# Get the associated energy_assessment_id for the UPRN
energy_assessment_id = uprn_to_assessment_id.get(uprn)
# Check if the scenario already exists
existing_scenario = session.query(EnergyAssessmentScenarios).filter_by(
scenario_name=scenario_name,
energy_assessment_id=energy_assessment_id
).first()
if not existing_scenario:
# Create the scenario
new_scenario = EnergyAssessmentScenarios(
scenario_name=scenario_name,
energy_assessment_id=energy_assessment_id
)
session.add(new_scenario)
session.flush() # Get the new scenario ID
# Update document with new scenario ID
document['scenario_id'] = new_scenario.id
else:
# If the scenario already exists, just use its ID
document['scenario_id'] = existing_scenario.id
# Commit the scenarios
session.commit()
logger.info("Scenarios created successfully.")
except IntegrityError as e:
session.rollback()
logger.info(f"Error occurred: {e}")
def create_documents(session: Session, document_list: List[dict]):
"""
Inserts documents into the energy_assessment_documents table, linking them to scenarios and assessments.
:param session: The SQLAlchemy session.
:param document_list: A list of dictionaries containing document data.
"""
try:
for document in document_list:
# Ensure the document_type is cast to Enum
new_document = EnergyAssessmentDocuments(
uprn=document['uprn'],
document_type=DocumentTypeEnum(document['document_type']).value,
document_location=document['document_location'],
energy_assessment_id=document['energy_assessment_id'],
scenario_id=document.get('scenario_id') # Might be None if no scenario
)
session.add(new_document)
# Commit all document insertions
session.commit()
logger.info("Documents created successfully.")
except IntegrityError as e:
session.rollback()
logger.info(f"Error occurred: {e}")

View file

@ -1,5 +1,8 @@
from sqlalchemy import Column, Integer, BigInteger, Text, Float, DateTime, Boolean, Date
from sqlalchemy import Column, Integer, BigInteger, Text, Float, DateTime, Boolean, Date, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.dialects.postgresql import ENUM as PgEnum
import enum
from datetime import datetime
Base = declarative_base()
@ -163,3 +166,42 @@ class EnergyAssessment(Base):
@staticmethod
def empty_response():
return {"epc": {}, "condition": {}}
class EnergyAssessmentScenarios(Base):
__tablename__ = 'energy_assessment_scenarios'
id = Column(BigInteger, primary_key=True, autoincrement=True)
scenario_name = Column(Text, nullable=False)
energy_assessment_id = Column(BigInteger, ForeignKey('energy_assessments.id'), nullable=False)
class DocumentTypeEnum(enum.Enum):
EPR = "EPR"
ConditionReport = "Condition Report"
EvidenceReport = "Evidence Report"
SummaryInformation = "Summary Information"
FloorPlan = "Floor Plan"
ScenarioDraftEPC = "Scenario Draft EPC"
ScenarioSiteNotes = "Scenario Site Notes"
class EnergyAssessmentDocuments(Base):
__tablename__ = 'energy_assessment_documents'
id = Column(BigInteger, primary_key=True, autoincrement=True)
uprn = Column(BigInteger, nullable=False)
energy_assessment_id = Column(BigInteger, ForeignKey('energy_assessments.id'), nullable=False)
document_type = Column(PgEnum(DocumentTypeEnum, name="document_type", create_type=False), nullable=False)
document_location = Column(Text, nullable=False)
uploaded_at = Column(DateTime(timezone=True), nullable=False, default=datetime.utcnow)
scenario_id = Column(BigInteger, ForeignKey('energy_assessment_scenarios.id'), nullable=True)
@staticmethod
def empty_response():
return {
"id": None,
"uprn": None,
"document_type": None,
"document_location": None,
"uploaded_at": None,
"scenario_id": None
}

View file

@ -0,0 +1,273 @@
import os
from io import BytesIO
from typing import List
from fastapi import APIRouter, Depends
from starlette.responses import Response
from backend.app.config import get_settings
from backend.app.dependencies import validate_token
from backend.app.energy_assessments.schemas import EnergyAssessmentUploadPayload
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import IntegrityError, OperationalError
from backend.app.db.connection import db_engine
from backend.app.db.functions.energy_assessment_functions import (
bulk_insert_energy_assessments, create_scenarios_for_documents, create_documents
)
from etl.xml_survey_extraction.XmlParser import XmlParser
from utils.s3 import (
read_from_s3, list_files_and_subfolders_in_s3_folder, list_xmls_in_s3_folder, save_csv_to_s3,
list_files_in_s3_folder
)
from utils.logger import setup_logger
logger = setup_logger()
def insert_energy_assessment_documents(document_list: List[dict], uprn_to_assessment_id: dict):
"""
Inserts or updates energy assessment documents, assigning the correct energy_assessment_id.
:param document_list: A list of dictionaries containing document data.
:param uprn_to_assessment_id: A dictionary mapping UPRN to energy_assessment_id.
"""
for document in document_list:
uprn = document['uprn']
# Assign the energy_assessment_id based on uprn
energy_assessment_id = uprn_to_assessment_id.get(uprn)
if not energy_assessment_id:
logger.info(f"No energy_assessment_id found for UPRN: {uprn}. Skipping document.")
continue
# Attach energy_assessment_id to each document
document['energy_assessment_id'] = energy_assessment_id
logger.info("Energy Assessment IDs assigned to documents.")
router = APIRouter(
prefix="/energy-assessments",
tags=["energy-assessments"],
dependencies=[Depends(validate_token)],
responses={404: {"description": "Not found"}}
)
@router.post("/upload")
async def upload(body: EnergyAssessmentUploadPayload):
"""
Given a location in S3, this service will retrieve the data in s3 and perform the following:
1) Extract the data and store it to the data
2) Extract the links to other artefacts collected during the energy assessment, such as EPRs, floor plans and
condition reports
This will allow us to do the following:
1) Present the findings of the energy assessment to the client
2) Allow the end use to download the artefacts collected during the energy assessment
Eventually, we will this service to collect the key documents from the service where they're uploaded
(e.g. Onedrive) and store them to S3, but for the moment, this is sufficient
"""
logger.info("Connecting to db")
session = sessionmaker(bind=db_engine)()
try:
logger.info("Extracting energy assessment data")
energy_assessments = list_files_and_subfolders_in_s3_folder(
bucket_name=get_settings().ENERGY_ASSESSMENTS_BUCKET,
folder_name=f"{body.surveyor}/{body.project_code}/"
)
logger.info(
f"Found {len(energy_assessments)} energy assessments for {body.surveyor} and {body.project_code}"
)
assessments_map = {}
for assessment in energy_assessments:
uploaded_xmls = list_xmls_in_s3_folder(
bucket_name=get_settings().ENERGY_ASSESSMENTS_BUCKET,
folder_name=os.path.join(assessment, "docs & plans")
)
energy_assessment_files = list_files_in_s3_folder(
bucket_name=get_settings().ENERGY_ASSESSMENTS_BUCKET,
folder_name=os.path.join(assessment, "docs & plans")
)
# Remove xmls from the list of files
energy_assessment_files = [file for file in energy_assessment_files if file not in uploaded_xmls]
# We now split this into the different types of files
# EPR
eprs = [
file for file in energy_assessment_files if "epr.pdf" in file.split("/")[-1].replace(" ", "").lower()
]
# Condition report
condition_reports = [
file for file in energy_assessment_files if "cr.pdf" in file.split("/")[-1].replace(" ", "").lower()
]
# Evidence report
evidence_reports = [
file for file in energy_assessment_files
if "evidence.pdf" in file.split("/")[-1].replace(" ", "").lower()
]
# Summary report
summary_reports = [
file for file in energy_assessment_files
if "sn.pdf" in file.split("/")[-1].replace(" ", "").lower()
]
# Floor plans - these are just the jpgs
floor_plans = [file for file in energy_assessment_files if file.endswith(".jpg")]
# We now retrieve scenarios
scenario_folders = list_files_and_subfolders_in_s3_folder(
bucket_name=get_settings().ENERGY_ASSESSMENTS_BUCKET,
folder_name=assessment
)
# filter folders that contain the word scenario
scenario_folders = [
folder for folder in scenario_folders if "scenario" in folder.rstrip("/").split("/")[-1].lower()
]
scenario_documents = []
for sf in scenario_folders:
scenario_files = list_files_in_s3_folder(
bucket_name=get_settings().ENERGY_ASSESSMENTS_BUCKET,
folder_name=sf
)
notes = [
file for file in scenario_files if "sitenotes" in file.split("/")[-1].replace(" ", "").lower()
]
# This should be the leftovers
draft_epc = [file for file in scenario_files if file not in notes]
scenario_documents.append(
{
"identifier": sf.rstrip("/").split("/")[-1],
"Scenario Site Notes": notes,
"Scenario Draft EPC": draft_epc
}
)
uprn = int(assessment.rstrip("/").split("/")[-1])
assessments_map[uprn] = {
"xmls": uploaded_xmls,
"EPR": eprs,
"Condition Report": condition_reports,
"Evidence Report": evidence_reports,
"Summary Information": summary_reports,
"Floor Plan": floor_plans,
"scenario_documents": scenario_documents
}
logger.info("Extracted energy assessment data and storing file locations to database")
xml_data_to_store = []
energy_assessment_documents = []
for uprn, files in assessments_map.items():
# Create the rows of data to insert into the energy assessment documents
property_ea_docs = []
for doc_type, doc_files in files.items():
if doc_type == "xmls":
continue
if doc_type == "scenario_documents":
for doc in doc_files:
# This scenario id is put in as a placeholder means os associating the scenario documents with
# the correct scenario
scenario_id = doc["identifier"]
for sn in doc["Scenario Site Notes"]:
property_ea_docs.append(
{
"uprn": uprn,
"document_type": "Scenario Site Notes",
"document_location": sn,
"scenario_id": scenario_id
}
)
for d_epc in doc["Scenario Draft EPC"]:
property_ea_docs.append(
{
"uprn": uprn,
"document_type": "Scenario Draft EPC",
"document_location": d_epc,
"scenario_id": scenario_id
}
)
continue
for doc in doc_files:
property_ea_docs.append(
{
"uprn": uprn,
"document_type": doc_type,
"document_location": doc,
"scenario_id": None
}
)
energy_assessment_documents.extend(property_ea_docs)
xmls = files["xmls"]
extracted_data = {}
for xml in xmls:
xml_data = read_from_s3(bucket_name=get_settings().ENERGY_ASSESSMENTS_BUCKET, s3_file_name=xml)
xml_data_io = BytesIO(xml_data)
xml_parser = XmlParser(
file=xml_data_io,
filekey=os.path.join(f"s3://{get_settings().ENERGY_ASSESSMENTS_BUCKET}", xml),
uprn=uprn,
surveyor_company=body.surveyor,
)
xml_parser.run()
if xml_parser.is_lig:
logger.info(f"Extracted data from {xml}")
extracted_epc = xml_parser.epc
extracted_additional_data = xml_parser.additional_data
data_to_update = {
**extracted_epc, **extracted_additional_data
}
# We need to update the keys to match the database schema - i.e. we should replace all hyphens with
# underscores
data_to_update = {k.replace("-", "_"): v for k, v in data_to_update.items()}
extracted_data.update(data_to_update)
xml_data_to_store.append(extracted_data)
logger.info("Storing energy assessment xml data to database")
uprn_to_assessment_id = bulk_insert_energy_assessments(session, xml_data_to_store)
# Insert energy assessment id into the documents data
insert_energy_assessment_documents(energy_assessment_documents, uprn_to_assessment_id)
create_scenarios_for_documents(session, energy_assessment_documents, uprn_to_assessment_id)
create_documents(session, energy_assessment_documents)
session.close()
except IntegrityError:
logger.error("Database integrity error occurred", exc_info=True)
session.rollback()
return Response(status_code=500, content="Database integrity error.")
except OperationalError:
logger.error("Database operational error occurred", exc_info=True)
session.rollback()
return Response(status_code=500, content="Database operational error.")
except ValueError:
logger.error("Value error - possibly due to malformed data", exc_info=True)
session.rollback()
return Response(status_code=400, content="Bad request: malformed data.")
except Exception as e: # General exception handling
logger.error(f"An error occurred: {e}")
session.rollback()
return Response(status_code=500, content="An unexpected error occurred.")
finally:
session.close()
return Response(status_code=200)

View file

@ -0,0 +1,10 @@
from pydantic import BaseModel
class EnergyAssessmentUploadPayload(BaseModel):
portfolio_id: int
# This is the energy assessment company/individual that conducted the energy assessment, where the data is uploaded
# against
surveyor: str
# is a code, like VEC001, which is used to identify the project and also where the data is uploaded against
project_code: str

View file

@ -620,6 +620,13 @@ async def trigger_plan(body: PlanTriggerRequest):
if individual_units:
# Model the solar potential at the property level
for unit in tqdm(individual_units):
# TODO: Tidy up this code
# We don't need to do this if we have global inclusions that don't include solar
if body.inclusions:
if "solar_pv" not in body.inclusions:
continue
property_instance = [p for p in input_properties if p.id == unit["property_id"]][0]
# At this level, we check if the property is suitable for solar and if now, skip
if not property_instance.is_solar_pv_valid():
@ -668,7 +675,9 @@ async def trigger_plan(body: PlanTriggerRequest):
recommendations_scoring_data = []
representative_recommendations = {}
for p in tqdm(input_properties):
recommender = Recommendations(property_instance=p, materials=materials, exclusions=body.exclusions)
recommender = Recommendations(
property_instance=p, materials=materials, exclusions=body.exclusions, inclusions=body.inclusions
)
property_recommendations, property_representative_recommendations = recommender.recommend()
if not property_recommendations:

View file

@ -1,6 +1,56 @@
from pydantic import BaseModel, conlist, validator
from typing import Optional
TYPICAL_MEASURE_TYPES = [
"wall_insulation",
"roof_insulation",
"ventilation",
"floor_insulation",
"windows",
"fireplace",
"heating",
"hot_water",
"low_energy_lighting",
"secondary_heating",
"solar_pv"
]
SPECIFIC_MEASURES = [
# Specific measures
# Walls
"internal_wall_insulation",
"external_wall_insulation",
"cavity_wall_insulation"
# Roof
"loft_insulation",
"flat_roof_insulation",
"room_roof_insulation",
# Floor
"suspended_floor_insulation",
"solid_floor_insulation",
# Heating
"boiler_upgrade",
"high_heat_retention_storage_heater",
"air_source_heat_pump",
# Specific measures that will typically come from an energy assessment
"trickle_vents",
"draught_proofing",
"mixed_glazing", # This covers partial double glazing and secondary glazing
"cavity_extract_and_refill",
]
# This allows us to extend high level categories for measures such as "wall_insulation" to the specific measures
# such as "external_wall_insulation", "internal_wall_insulation", "cavity_wall_insulation"
MEASURE_MAP = {
"wall_insulation": [
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation", "cavity_extract_and_refill"
],
"roof_insulation": ["loft_insulation", "flat_roof_insulation", "room_roof_insulation"],
"floor_insulation": ["suspended_floor_insulation", "solid_floor_insulation"],
"heating": ["boiler_upgrade", "high_heat_retention_storage_heater", "air_source_heat_pump"],
}
class PlanTriggerRequest(BaseModel):
budget: Optional[float] = None
@ -13,33 +63,13 @@ class PlanTriggerRequest(BaseModel):
patches_file_path: Optional[str] = None
non_invasive_recommendations_file_path: Optional[str] = None
exclusions: Optional[conlist(str, min_items=1)] = None
inclusions: Optional[conlist(str, min_items=1)] = None
scenario_name: Optional[str] = ""
# If true, will allow us to create multiple plans for the same portfolio, whereas if this is false, if this property
# exists in the portfolio, it will be ignored
multi_plan: Optional[bool] = False
# Pre-defined list of possibilities for exclusions
_allowed_exclusions = {
# Measure classes
"wall_insulation",
"ventilation",
"roof_insulation",
"floor_insulation",
"windows",
"fireplace",
"heating",
"hot_water",
"lighting",
"solar_pv",
# Specific measures
"air_source_heat_pump",
"internal_wall_insulation",
"external_wall_insulation",
"secondary_heating",
"boiler_upgrade",
"high_heat_retention_storage_heater",
}
_allowed_goals = {"Increasing EPC"}
_allowed_housing_types = {"Social", "Private"}
@ -47,10 +77,16 @@ class PlanTriggerRequest(BaseModel):
# Validator to ensure exclusions are within the pre-defined possibilities
@validator('exclusions', each_item=True)
def check_exclusions(cls, v):
if v not in cls._allowed_exclusions:
if v not in TYPICAL_MEASURE_TYPES + SPECIFIC_MEASURES:
raise ValueError(f"{v} is not an allowed exclusion")
return v
@validator('inclusions', each_item=True)
def check_inclusions(cls, v):
if v not in TYPICAL_MEASURE_TYPES + SPECIFIC_MEASURES:
raise ValueError(f"{v} is not an allowed inclusion")
return v
# Validator to ensure that the goal is within the pre-defined possibilities
@validator('goal')
def check_goal(cls, v):

View file

@ -95,6 +95,31 @@ epc_data["eligibility_type"] = np.where(
epc_data["eligibility_type"]
)
# Example EPCS to analysis
analysis_epcs = epc_data[~pd.isnull(epc_data["eligibility_type"])].copy()
# Keep just columns we need
analysis_epcs = analysis_epcs[
[
"UPRN", "TENURE", "CURRENT_ENERGY_RATING", "WALLS_DESCRIPTION", "ROOF_DESCRIPTION",
"CONSTRUCTION_AGE_BAND", "TOTAL_FLOOR_AREA", "PROPERTY_TYPE", "BUILT_FORM", "MAINHEAT_DESCRIPTION",
"eligibility_type",
]
]
analysis_epcs["grouped_epc_band"] = np.where(
analysis_epcs["CURRENT_ENERGY_RATING"].isin(["D"]),
"EPC D",
"EPC E-G"
)
analysis_epcs.to_csv("/Users/khalimconn-kowlessar/Documents/hestia/Customers/bcc tender/analysis_epcs.csv", index=False)
# Create aggregations and we store this information
agg_cols = ["CURRENT_ENERGY_RATING", "CONSTRUCTION_AGE_BAND", "PROPERTY_TYPE", "BUILT_FORM", "grouped_epc_band"]
agg_cols = ["WALLS_DESCRIPTION", "ROOF_DESCRIPTION", "MAINHEAT_DESCRIPTION"]
for col in agg_cols:
agg_df = analysis_epcs.groupby([col]).size().reset_index(name="Number of Properties")
agg_df["Percentage of Properties"] = 100 * agg_df["Number of Properties"] / agg_df["Number of Properties"].sum()
agg_df.to_csv(f"/Users/khalimconn-kowlessar/Documents/hestia/Customers/bcc tender/{col}.csv", index=False)
# Eligibiilty 6: GBIS General Eligibility, Social - tenure is social rented and EPC rating D-G, but also the property
# should be rented out below market rate
# This is a subset of Eligibility 3 - we likely don't need to do any scaling

View file

@ -0,0 +1,196 @@
import pandas as pd
from utils.s3 import save_csv_to_s3
def app():
# This is the payload to be used to extract the energy assessment data from s3 and upload it to the database,
# as well as produce links to each of the uploaded documents.
portfolio_id = 101
body = {
"portfolio_id": portfolio_id,
"surveyor": "JAFFERSONS ENERGY CONSULTANTS",
"project_code": "VEC001",
}
# These are the recommendations based on the on-site survey of the property.
non_intrusive_recommendations = [
{
# 2 Grove Mansions
"uprn": 121016121,
"recommendations": [
{
"type": "draught_proofing",
"cost": 123,
"survey": True,
"sap_points": 1
},
{
"type": "mixed_glazing", "cost": 12345, "survey": True,
"description": "Install double glazing to north facing windows and secondary glazing to the "
"remaining windows at the front of the building",
"sap_points": 3
},
{"type": "trickle_vents", "cost": 500, "survey": True},
{"type": "suspended_floor_insulation", "cost": None, "survey": True, "sap_points": 2},
{"type": "internal_wall_insulation", "cost": None, "survey": True, "sap_points": 5},
]
},
{
# 8 Grove Mansions
"uprn": 10024087855,
"recommendations": [
{"type": "draught_proofing", "cost": 123, "survey": True, "sap_points": 2},
{
"type": "mixed_glazing", "cost": 12345, "survey": True,
"description": "Install double glazing to north facing windows and secondary glazing to the "
"remaining windows at the front of the building",
"sap_points": 4
},
{"type": "trickle_vents", "cost": 500, "survey": True},
{"type": "low_energy_lighting", "cost": None, "survey": True, "sap_points": 0},
{"type": "internal_wall_insulation", "cost": None, "survey": True, 'sap_points': 5},
]
},
{
# 9 Grove Mansions
"uprn": 121016128,
"recommendations": [
{"type": "draught_proofing", "cost": 123, "survey": True, "sap_points": 1},
{
"type": "mixed_glazing", "cost": 12345, "survey": True,
"description": "Install double glazing to north facing windows and secondary glazing to the "
"remaining windows at the front of the building",
"sap_points": 3
},
{"type": "trickle_vents", "cost": 500, "survey": True},
{"type": "low_energy_lighting", "cost": None, "survey": True, "sap_points": 1},
{"type": "suspended_floor_insulation", "cost": None, "sap_points": 1},
{"type": "internal_wall_insulation", "cost": None, "survey": True, "sap_points": 6},
]
},
{
# 5 Grove Mansions
"uprn": 121016124,
"recommendations": [
{
"type": "mixed_glazing", "cost": 12345, "survey": True,
"description": "Install double glazing to north facing windows and secondary glazing to the "
"remaining windows at the front of the building",
"sap_points": 5
},
{"type": "trickle_vents", "cost": 500, "survey": True},
{"type": "low_energy_lighting", "cost": None, "survey": True, "sap_points": 2},
{"type": "internal_wall_insulation", "cost": None, "survey": True, "sap_points": 8},
]
},
{
# 14 Grove Mansions
"uprn": 121016117,
"recommendations": [
{"type": "draught_proofing", "cost": 123, "survey": True, "sap_points": 1},
{
"type": "mixed_glazing", "cost": 12345, "survey": True,
"description": "Install double glazing to north facing windows and secondary glazing to the "
"remaining windows at the front of the building",
"sap_points": 4
},
{"type": "trickle_vents", "cost": 500, "survey": True},
{"type": "low_energy_lighting", "cost": None, "survey": True, "sap_points": 1},
{"type": "internal_wall_insulation", "cost": None, "survey": True, "sap_points": 6},
]
},
{
# 19 Grove Mansions
"uprn": 10024087902,
"recommendations": [
{"type": "low_energy_lighting", "cost": None, "survey": True, "sap_points": 0},
{"type": "internal_wall_insulation", "cost": None, "survey": True, "sap_points": 2},
{"type": "room_roof_insulation", "cost": None, "survey": True, "sap_points": 16},
]
},
]
asset_list = [
{
"uprn": 121016121, "address": "", "postcode": ""
},
{
"uprn": 10024087855, "address": "", "postcode": ""
},
{
"uprn": 121016128, "address": "", "postcode": ""
},
{
"uprn": 121016124, "address": "", "postcode": ""
},
{
"uprn": 121016117, "address": "", "postcode": ""
},
{
"uprn": 10024087902, "address": "", "postcode": ""
},
]
asset_list = pd.DataFrame(asset_list)
filename = f"{8}/{portfolio_id}/asset_list.csv"
save_csv_to_s3(
dataframe=asset_list,
bucket_name="retrofit-plan-inputs-dev",
file_name=filename
)
# TODO Create asset list
# TODO: Store asset list & non_intrusive_recommendations
# Store non-invasive recommendations in S3
non_invasive_recommendations_filename = f"{8}/{portfolio_id}/non_invasive_recommendations.json"
save_csv_to_s3(
dataframe=pd.DataFrame(non_intrusive_recommendations),
bucket_name="retrofit-plan-inputs-dev",
file_name=non_invasive_recommendations_filename
)
# This is the first scenario which includes the first batch of recommendations
body1 = {
"portfolio_id": str(portfolio_id),
"housing_type": "Private",
"goal": "Increasing EPC",
"goal_value": "A",
"trigger_file_path": filename,
"already_installed_file_path": "",
"patches_file_path": "",
"non_invasive_recommendations_file_path": non_invasive_recommendations_filename,
"inclusions": [
"draught_proofing", "mixed_glazing", "trickle_vents", "low_energy_lighting",
],
"budget": None,
"scenario_name": "Quick wins - do now while tenanted",
"multi_plan": True,
}
# This is the second scenario which includes the second batch of recommendations
body2 = {
"portfolio_id": str(portfolio_id),
"housing_type": "Private",
"goal": "Increasing EPC",
"goal_value": "A",
"trigger_file_path": filename,
"already_installed_file_path": "",
"patches_file_path": "",
"non_invasive_recommendations_file_path": non_invasive_recommendations_filename,
"inclusions": [
"draught_proofing",
"mixed_glazing",
"trickle_vents",
"low_energy_lighting",
"suspended_floor_insulation",
"internal_wall_insulation"
],
"budget": None,
"scenario_name": "Do when void",
"multi_plan": True,
}
print(body1)
print(body2)

View file

@ -9,6 +9,7 @@ from etl.xml_survey_extraction.pcdb import heating_data
PROPERTY_TYPE_LOOKUP = {
"0": "House",
"House": "House",
"2": "Flat"
}
@ -106,6 +107,8 @@ class XmlParser:
BUILT_FORM_MAP = {
"1": "Detached",
"3": "End-Terrace",
"4": "Mid-Terrace",
}
GLAZED_AREA_MAP = {
@ -121,7 +124,9 @@ class XmlParser:
}
TENURE_MAP = {
'1': "Owner-occupied"
"1": "Owner-occupied",
"2": "Rented (social)",
"3": "Rented (private)",
}
TARIFF_MAP = {
@ -179,6 +184,39 @@ class XmlParser:
# Put together all of the additional data we capture
self.extract_additional_data()
def _parse_heat_loss_corridor(self):
hlc_lookup = {"2": "unheated corridor", "Unheated": "unheated corridor"}
if self.is_lig:
heat_loss_corridor = self.get_node_value('Heat-Loss-Corridor')
else:
# For some reason, this tag is spelt incorrectly in the rdsap xml
heat_loss_corridor = self.get_node_value('FlatCoridor')
return hlc_lookup[heat_loss_corridor]
def _parse_heat_loss_corridor_length(self):
if self.is_lig:
return self.get_node_value('Unheated-Corridor-Length')
return self.get_node_value('FlatShelteredWallLength')
def _parse_flat_storey_count(self):
# in the EPR the tag is Storeys
if self.is_lig:
storeys = None
else:
storeys = self.get_node_value('Storeys')
return storeys
def _parse_flat_top_storey(self):
if self.is_lig:
return self.get_node_value('Top-Storey')
return None
def _parse_floor_level(self):
if self.is_lig:
flat_details = self.xml.getElementsByTagName('SAP-Flat-Details')[0]
return flat_details.getElementsByTagName("Level")[0].firstChild.nodeValue
return None
def extract_epc(self):
if self.floor_dimensions is None:
@ -190,15 +228,18 @@ class XmlParser:
property_type = self.get_property_type()
if property_type == "Flat":
raise NotImplementedError(
"Need to handle: heat-loss-corridor, unheated-corridor-length, flat-storey-count, flat-top-storey, "
"floor-level"
)
heat_loss_corridor = "NO DATA!"
unheated_corridor_length = ""
flat_storey_count = ""
flat_top_storey = ""
floor_level = "NO DATA!"
heat_loss_corridor = self._parse_heat_loss_corridor()
unheated_corridor_length = self._parse_heat_loss_corridor_length()
flat_storey_count = self._parse_flat_storey_count()
flat_top_storey = self._parse_flat_top_storey()
floor_level = self._parse_floor_level()
else:
heat_loss_corridor = "NO DATA!"
unheated_corridor_length = ""
flat_storey_count = ""
flat_top_storey = ""
floor_level = "NO DATA!"
floor_height = np.mean([
float(x['room_height']) for x in self.floor_dimensions if
@ -384,9 +425,17 @@ class XmlParser:
}
cylinder_insulation_type = {
None: "",
"1": "Foam",
}
cylinder_insulation_thickness = int(
self.get_node_value('Cylinder-Insulation-Thickness')
) if self.get_node_value('Cylinder-Insulation-Thickness') else None
cylinder_thermostat = boolean_lookup[self.get_node_value('Cylinder-Thermostat')] \
if self.get_node_value('Cylinder-Thermostat') else None
self.additional_data = {
"file_location": self.filekey,
"surveyor_name": self.surveyor_name,
@ -408,8 +457,8 @@ class XmlParser:
"percent_draftproofed": int(self.get_node_value('Percent-Draughtproofed')),
"has_hot_water_cylinder": boolean_lookup[self.get_node_value('Has-Hot-Water-Cylinder')],
"cylinder_insulation_type": cylinder_insulation_type[self.get_node_value('Cylinder-Insulation-Type')],
"cylinder_insulation_thickness": int(self.get_node_value('Cylinder-Insulation-Thickness')),
"cylinder_thermostat": boolean_lookup[self.get_node_value('Cylinder-Thermostat')],
"cylinder_insulation_thickness": cylinder_insulation_thickness,
"cylinder_thermostat": cylinder_thermostat,
"main_dwelling_ground_floor_area": float(main_dwelling_ground_floor_area),
"number_of_windows": int(number_of_windows),
"windows_area": float(windows_area),
@ -471,6 +520,13 @@ class XmlParser:
if not property_type:
property_type = self.xml.getElementsByTagName('PropertyType1')
if len(property_type) > 1:
property_types = {PROPERTY_TYPE_LOOKUP[p.firstChild.nodeValue] for p in property_type}
if len(property_types) > 1:
raise ValueError("Multiple property types found")
return property_types.pop()
return PROPERTY_TYPE_LOOKUP[property_type[0].firstChild.nodeValue]
def get_sap(self):
@ -683,6 +739,30 @@ class XmlParser:
self.perimeter = self.heat_loss_perimeter + self.party_wall_length
@staticmethod
def _parse_windows_content(window, glazing_type_lookup, orientation_lookup):
# There may not be a pvc frame
pvc_frame = window.getElementsByTagName("PVC-Frame")
pvc_frame = pvc_frame[0].firstChild.nodeValue if pvc_frame else None
# There may not be a glazing gap for single glazed windows
glazing_gap = window.getElementsByTagName("Glazing-Gap")
glazing_gap = glazing_gap[0].firstChild.nodeValue if glazing_gap else None
parsed = {
"window_location": window.getElementsByTagName("Window-Location")[0].firstChild.nodeValue,
"window_area": window.getElementsByTagName("Window-Area")[0].firstChild.nodeValue,
"window_type": window.getElementsByTagName("Window-Type")[0].firstChild.nodeValue,
"glazing_type": glazing_type_lookup[
window.getElementsByTagName("Glazing-Type")[0].firstChild.nodeValue
],
"pvc_frame": pvc_frame,
"glazing_gap": glazing_gap,
"orientation": orientation_lookup[window.getElementsByTagName("Orientation")[0].firstChild.nodeValue]
}
return parsed
def get_windows(self):
"""
Extracts data about the windows in the property, including the number of windows and the window type.
@ -692,7 +772,8 @@ class XmlParser:
sap_windows = self.xml.getElementsByTagName("SAP-Windows")[0].getElementsByTagName("SAP-Window")
glazing_type_lookup = {
"3": "double glazing, unknown install date"
"3": "double glazing, unknown install date",
"5": "Single glazing",
}
orientation_lookup = {
@ -707,15 +788,9 @@ class XmlParser:
}
self.windows = [
{
"window_location": window.getElementsByTagName("Window-Location")[0].firstChild.nodeValue,
"window_area": window.getElementsByTagName("Window-Area")[0].firstChild.nodeValue,
"window_type": window.getElementsByTagName("Window-Type")[0].firstChild.nodeValue,
"glazing_type": glazing_type_lookup[
window.getElementsByTagName("Glazing-Type")[0].firstChild.nodeValue
],
"pvc_frame": window.getElementsByTagName("PVC-Frame")[0].firstChild.nodeValue,
"glazing_gap": window.getElementsByTagName("Glazing-Gap")[0].firstChild.nodeValue,
"orientation": orientation_lookup[window.getElementsByTagName("Orientation")[0].firstChild.nodeValue]
} for window in sap_windows
self._parse_windows_content(
window=window,
glazing_type_lookup=glazing_type_lookup,
orientation_lookup=orientation_lookup
) for window in sap_windows
]

View file

@ -12,14 +12,15 @@ logger = setup_logger()
BUCKET = "retrofit-energy-assessments-dev"
USER_ID = 8
non_invasive_recommendations_filepath = "{USER_ID}/{PORTFOLIO_ID}/non_invasive_recommendations.csv"
SCENARIOS = {
86: {
"project_code": "VDE001",
101: {
"project_code": "VEC001",
"surveyor": "JAFFERSONS ENERGY CONSULTANTS",
"bodies": [
# Scenario A: Cavity wall insulation
{
"portfolio_id": str(86),
"portfolio_id": str(101),
"housing_type": "Private",
"goal": "Increasing EPC",
"goal_value": "A",
@ -27,14 +28,16 @@ SCENARIOS = {
"already_installed_file_path": "",
"patches_file_path": "",
"non_invasive_recommendations_file_path": "",
"exclusions": ["floor_insulation", "fireplace", "solar_pv", "heating", 'lighting'],
"inclusions": [
"draught_proofing", "secondary_glazing", "trickle_vents", "low_energy_lighting",
],
"budget": None,
"scenario_name": "Low Hanging Fruit",
"scenario_name": "Quick wins - do now while tenanted",
"multi_plan": True,
},
# Scenario B: CWI, Solar PV, AHSP
{
"portfolio_id": str(86),
"portfolio_id": str(101),
"housing_type": "Private",
"goal": "Increasing EPC",
"goal_value": "A",
@ -42,66 +45,97 @@ SCENARIOS = {
"already_installed_file_path": "",
"patches_file_path": "",
"non_invasive_recommendations_file_path": "",
"exclusions": ["floor_insulation", "fireplace", 'lighting'],
"inclusions": [
"draught_proofing",
"secondary_glazing",
"trickle_vents",
"low_energy_lighting",
"suspended_floor_insulation",
"internal_wall_insulation"
],
"budget": None,
"scenario_name": "Deep Retrofit",
"scenario_name": "Do when void",
"multi_plan": True,
},
# Scenario C, CWI, floor insulation, PV, AHSP
{
"portfolio_id": str(86),
"housing_type": "Private",
"goal": "Increasing EPC",
"goal_value": "A",
"trigger_file_path": "",
"already_installed_file_path": "",
"patches_file_path": "",
"non_invasive_recommendations_file_path": "",
"exclusions": ["fireplace", 'lighting'],
"budget": None,
"scenario_name": "Whole House Retrofit",
"multi_plan": True,
}
]
},
87: {
"project_code": "VDE002",
"surveyor": "JAFFERSONS ENERGY CONSULTANTS",
"bodies": [
# Scenario A: Solar PV, AHSP
{
"portfolio_id": str(87),
"housing_type": "Private",
"goal": "Increasing EPC",
"goal_value": "A",
"trigger_file_path": "",
"already_installed_file_path": "",
"patches_file_path": "",
"non_invasive_recommendations_file_path": "",
"exclusions": ["floor_insulation", "fireplace"],
"budget": None,
"scenario_name": "Deep Retrofit",
"multi_plan": True,
},
# Scenario B, floor insulation, PV, AHSP
{
"portfolio_id": str(87),
"housing_type": "Private",
"goal": "Increasing EPC",
"goal_value": "A",
"trigger_file_path": "",
"already_installed_file_path": "",
"patches_file_path": "",
"non_invasive_recommendations_file_path": "",
"exclusions": ["fireplace"],
"budget": None,
"scenario_name": "Whole House Retrofit",
"multi_plan": True,
}
]
}
}
# TODO: These non-intrusive recommendations should be detected from the EPRs, the scenarios and the condition report?
# For recommendations like trickle vents, we can deduce this from the condition report, depending on the
# ventilation of the room and the presence of trickle vents.
NON_INTRUSITVE_RECOMMENDATIONS = [
{
# 2 Grove Mansions
"uprn": 121016121,
"recommendations": [
{
"type": "draught_proofing",
"cost": None,
"survey": True
},
{"type": "secondary_glazing", "cost": None, "survey": True},
{"type": "trickle_vents", "cost": None, "survey": True},
{"type": "suspended_floor_insulation", "cost": None, "survey": True},
{"type": "internal_wall_insulation", "cost": None, "survey": True},
]
},
{
# 8 Grove Mansions
"uprn": 10024087855,
"recommendations": [
{"type": "draught_proofing", "cost": None, "survey": True},
{"type": "secondary_glazing", "cost": None, "survey": True},
{"type": "trickle_vents", "cost": None, "survey": True},
{"type": "low_energy_lighting", "cost": None, "survey": True},
{"type": "internal_wall_insulation", "cost": None, "survey": True},
]
},
{
# 9 Grove Mansions
"uprn": 121016128,
"recommendations": [
{"type": "draught_proofing", "cost": None, "survey": True},
{"type": "secondary_glazing", "cost": None, "survey": True},
{"type": "trickle_vents", "cost": None, "survey": True},
{"type": "low_energy_lighting", "cost": None, "survey": True},
{"type": "suspended_floor_insulation", "cost": None},
{"type": "internal_wall_insulation", "cost": None, "survey": True},
]
},
{
# 5 Grove Mansions
"uprn": 121016124,
"recommendations": [
{"type": "secondary_glazing", "cost": None, "survey": True},
{"type": "trickle_vents", "cost": None, "survey": True},
{"type": "low_energy_lighting", "cost": None, "survey": True},
{"type": "internal_wall_insulation", "cost": None, "survey": True},
]
},
{
# 14 Grove Mansions
"uprn": 121016117,
"recommendations": [
{"type": "draught_proofing", "cost": None, "survey": True},
{"type": "secondary_glazing", "cost": None, "survey": True},
{"type": "trickle_vents", "cost": None, "survey": True},
{"type": "low_energy_lighting", "cost": None, "survey": True},
{"type": "internal_wall_insulation", "cost": None, "survey": True},
]
},
{
# 19 Grove Mansions
"uprn": 121016117,
"recommendations": [
{"type": "low_energy_lighting", "cost": None, "survey": True},
{"type": "secondary_glazing", "cost": None, "survey": True},
{"type": "internal_wall_insulation", "cost": None, "survey": True},
{"type": "room_roof_insulation", "cost": None, "survey": True},
]
},
]
def main():
"""
@ -166,7 +200,7 @@ def main():
# For each property, we download the xmls and extract the data
database_data = []
for uprn, xmls in assessments_map.items():
extracted_data = {}
for xml in xmls:
xml_data = read_from_s3(bucket_name=BUCKET, s3_file_name=xml)

View file

@ -176,7 +176,7 @@ module "retrofit_hotwater_kwh_predictions" {
}
module "retrofit_energy_assessments" {
source = "./modules/s3"
source = "./modules/s3_presignable_bucket"
bucketname = "retrofit-energy-assessments-${var.stage}"
allowed_origins = var.allowed_origins
}

View file

@ -0,0 +1,56 @@
from backend.Property import Property
class DraughtProofingRecommendations:
def __init__(self, property_instance: Property):
self.property = property_instance
self.recommendation = []
def recommend(self):
"""
In some cases, we can identify the need for draught proofing from the EPC recommendations, however the initial
implementation of this class will just assume that we are picking up a non-invasive recommendation from the
survey
"""
# For the moment, draught proofing doesn't have a phase impact
draught_proofing_recommendation_config = next(
(r for r in self.property.non_invasive_recommendations if
r["type"] == "draught_proofing"),
{}
)
if not draught_proofing_recommendation_config:
return
description = (
"Draught proof doors and windows to improve energy efficiency" if
not draught_proofing_recommendation_config.get("description")
else draught_proofing_recommendation_config["description"]
)
# We recommend installing two mechanical ventilation systems
self.recommendation = [
{
"phase": None,
"parts": [],
"type": "draught_proofing",
"description": description,
"starting_u_value": None,
"new_u_value": None,
"already_installed": False,
"sap_points": draught_proofing_recommendation_config["sap_points"],
"heat_demand": 0,
"kwh_savings": 0,
"co2_equivalent_savings": 0,
"energy_cost_savings": 0,
"total": draught_proofing_recommendation_config["cost"],
# We use a very simple and rough estimate of 4 hours per unit
"labour_hours": draught_proofing_recommendation_config.get("labour_hours", 8),
"labour_days": draught_proofing_recommendation_config.get("labour_days", 1), # Assume 8 hour day
"survey": True
}
]

View file

@ -5,6 +5,7 @@ import pandas as pd
from BaseUtility import Definitions
from datatypes.enums import QuantityUnits
from backend.app.plan.schemas import MEASURE_MAP
from backend.Property import Property
from recommendations.recommendation_utils import (
r_value_per_mm_to_u_value, calculate_u_value_uplift, is_diminishing_returns, update_lowest_selected_u_value,
@ -63,14 +64,13 @@ class FloorRecommendations(Definitions):
]
]
self.exposed_floor_insulation_materials = [
part for part in materials if part["type"] == "exposed_floor_insulation"
]
def recommend(self, phase=0, measures=None):
# TODO: To be completed
self.exposed_floor_non_insulation_materials = []
measures = MEASURE_MAP["floor_insulation"] if measures is None else measures
if not measures:
return
def recommend(self, phase=0):
u_value = self.property.floor["thermal_transmittance"]
property_type = self.property.data["property-type"]
floor_area = self.property.insulation_floor_area
@ -124,7 +124,7 @@ class FloorRecommendations(Definitions):
self.property.floor["is_suspended"] or
self.property.floor["is_to_unheated_space"] or
self.property.floor["is_to_external_air"]
):
) and "suspended_floor_insulation" in measures:
# Given the U-value, we recommend underfloor insulation
self.recommend_floor_insulation(
phase=phase,
@ -134,7 +134,7 @@ class FloorRecommendations(Definitions):
)
return
if self.property.floor["is_solid"]:
if self.property.floor["is_solid"] and "solid_floor_insulation" in measures:
# Given the U-value, we recommend solid floor insulation options which are usually solid foam
self.recommend_floor_insulation(
u_value=u_value,

View file

@ -1,6 +1,7 @@
from recommendations.Costs import Costs, BOILER_UPGRADE_SCHEME_ASHP_VALUE
from recommendations.recommendation_utils import check_simulation_difference, override_costs
from backend.Property import Property
from backend.app.plan.schemas import MEASURE_MAP
from etl.epc_clean.epc_attributes.MainheatAttributes import MainHeatAttributes
from etl.epc_clean.epc_attributes.HotWaterAttributes import HotWaterAttributes
from etl.epc_clean.epc_attributes.MainFuelAttributes import MainFuelAttributes
@ -28,7 +29,7 @@ class HeatingRecommender:
self.property.main_heating["clean_description"] in self.ELECTRIC_HEATING_DESCRIPTIONS
)
def is_high_heat_retention_valid(self, ashp_only_heating_recommendation, exclusions):
def is_high_heat_retention_valid(self, ashp_only_heating_recommendation, measures):
"""
Check conditions if high heat retention storage is valid
:return:
@ -43,10 +44,11 @@ class HeatingRecommender:
has_electric = self.has_electric_heating_description or electric_heating_assumed
return (
has_electric and (not ashp_only_heating_recommendation) and ("boiler_upgrade" not in exclusions)
has_electric and (not ashp_only_heating_recommendation) and
("high_heat_retention_storage_heater" in measures)
)
def is_boiler_upgrade_suitable(self, exclusions, ashp_only_heating_recommendation):
def is_boiler_upgrade_suitable(self, measures, ashp_only_heating_recommendation):
"""
These are the conditions we apply to recommend a boiler installation
:return:
@ -84,12 +86,12 @@ class HeatingRecommender:
portable_heaters_has_mains
) and
(not ashp_only_heating_recommendation) and
("boiler_upgrade" not in exclusions)
("boiler_upgrade" in measures)
)
return is_valid, has_boiler
def recommend(self, has_cavity_or_loft_recommendations, phase=0, exclusions=None):
def recommend(self, has_cavity_or_loft_recommendations, phase=0, measures=None):
"""
Produces heating recommendations
@ -97,16 +99,18 @@ class HeatingRecommender:
recommendation. If there are cavity or loft recommendations, the property would need to complete those measures
before being able to get the boiler upgrade scheme benefits. The messaging in the front end would be to
:param phase: indicates the phase of the retrofit programme
:param exclusions: A list of exclusions for the recommendations
:param measures: A list of measures for the recommendations
"""
measures = MEASURE_MAP["heating"] if measures is None else measures
# TODO: We could have a system flush recommendation for an existing boiler, where there is no need to replace
# the boiler, but instead flushing the system will make it run more efficiently. There is a cost for this
# in the Costs class, stored as SYSTEM_FLUSH_COST
# TODO: Right now, we don't have recommendations for electric boilers - we should probably have one
exclusions = [] if exclusions is None else exclusions
# if we have a non-invasive ashp recommendation, we get the configuration directly from the property instance
non_invasive_ashp_recommendation = next(
(r for r in self.property.non_invasive_recommendations if r["type"] == "air_source_heat_pump"),
{"suitable": True}
@ -122,7 +126,7 @@ class HeatingRecommender:
# This first iteration of the recommender will provide very basic recommendation
# We recommend heating controls based on the main heating system
hhr_valid = self.is_high_heat_retention_valid(ashp_only_heating_recommendation, exclusions)
hhr_valid = self.is_high_heat_retention_valid(ashp_only_heating_recommendation, measures)
if hhr_valid:
# Recommend high heat retention storage heaters
@ -131,7 +135,7 @@ class HeatingRecommender:
self.recommend_hhr_storage_heaters(phase=phase, system_change=True, heating_controls_only=False)
gas_boiler_suitable, has_boiler = self.is_boiler_upgrade_suitable(
exclusions=exclusions, ashp_only_heating_recommendation=ashp_only_heating_recommendation
measures=measures, ashp_only_heating_recommendation=ashp_only_heating_recommendation
)
if gas_boiler_suitable:
@ -153,7 +157,7 @@ class HeatingRecommender:
# In the future, we'll allow overrides, so that non-intrusive surveys can contradict these conditions
# and either allow or prevent the recommendation of an air source heat pump
if self.property.is_ashp_valid(exclusions=exclusions) and non_invasive_ashp_recommendation["suitable"]:
if self.property.is_ashp_valid(measures=measures) and non_invasive_ashp_recommendation["suitable"]:
self.recommend_air_source_heat_pump(
phase=phase,
has_cavity_or_loft_recommendations=has_cavity_or_loft_recommendations,

View file

@ -66,6 +66,11 @@ class LightingRecommendations:
if self.property.lighting["low_energy_proportion"] == 100:
return
leds_recommendation_config = next(
(r for r in self.property.non_invasive_recommendations if r["type"] == "low_energy_lighting"),
{}
)
number_lighting_outlets = self.property.number_lighting_outlets
# Number non lel outlets
@ -79,6 +84,9 @@ class LightingRecommendations:
return
# Get the cost of the fittings
if leds_recommendation_config.get("cost"):
raise NotImplementedError("Costs from for low energy lighting have not been implemented")
cost_result = self.costs.low_energy_lighting(
number_of_lights=number_non_lel_outlets,
number_current_lel_lights=number_lighting_outlets - number_non_lel_outlets,
@ -97,6 +105,12 @@ class LightingRecommendations:
cost_result = override_costs(cost_result)
description = "Low energy lighting has already been installed, no further action required"
if leds_recommendation_config.get("sap_points") is not None:
# This could be zero points
sap_points = leds_recommendation_config["sap_points"]
else:
sap_points = round(2 * (number_non_lel_outlets / number_lighting_outlets), 2)
self.recommendation = [
{
"phase": phase,
@ -108,13 +122,14 @@ class LightingRecommendations:
"already_installed": already_installed,
# For SAP points, we use the fact that lighting is usually worth 2 points and we scale this to
# the proportion of lights that will be set to low energy
"sap_points": round(2 * (number_non_lel_outlets / number_lighting_outlets), 2),
"sap_points": sap_points,
"kwh_savings": heat_demand_change,
"co2_equivalent_savings": carbon_change,
"description_simulation": {
"lighting-energy-eff": "Very Good",
"lighting-description": "Low energy lighting in all fixed outlets",
},
**cost_result
**cost_result,
"survey": leds_recommendation_config.get("survey", False)
}
]

View file

@ -14,9 +14,11 @@ from recommendations.WindowsRecommendations import WindowsRecommendations
from recommendations.HeatingRecommender import HeatingRecommender
from recommendations.HotwaterRecommendations import HotwaterRecommendations
from recommendations.SecondaryHeating import SecondaryHeating
from recommendations.DraughtProofingRecommendations import DraughtProofingRecommendations
from backend.ml_models.AnnualBillSavings import AnnualBillSavings
from backend.apis.GoogleSolarApi import GoogleSolarApi
import backend.app.assumptions as assumptions
from backend.app.plan.schemas import TYPICAL_MEASURE_TYPES, SPECIFIC_MEASURES, MEASURE_MAP
ASHP_COP = 3
STARTING_DUMMY_ID_VALUE = -9999
@ -32,15 +34,24 @@ class Recommendations:
property_instance: Property,
materials: List,
exclusions: List[str] = None,
inclusions: List[str] = None,
):
"""
:param property_instance: Instance of the Property class, for the home associated to property_id
:param materials: List of materials to be used in the recommendations
:param exclusions: List of specific measures or measure types to exclude from recommendations. Defaulted to
None, meaning no exclusions to be applied
:param inclusions: List of specific measures of measure types to include. Defaulted to None, meaning all
measures are included
"""
self.property_instance = property_instance
self.materials = materials
self.exclusions = exclusions if exclusions else []
self.inclusions = inclusions if inclusions else []
self.all_typical_measures = TYPICAL_MEASURE_TYPES
self.all_specific_measures = SPECIFIC_MEASURES
self.floor_recommender = FloorRecommendations(property_instance=property_instance, materials=materials)
self.wall_recomender = WallRecommendations(property_instance=property_instance, materials=materials)
@ -48,6 +59,7 @@ class Recommendations:
self.ventilation_recomender = VentilationRecommendations(
property_instance=property_instance, materials=materials
)
self.draught_proofing_recommender = DraughtProofingRecommendations(property_instance=property_instance)
self.fireplace_recommender = FireplaceRecommendations(property_instance=property_instance)
self.lighting_recommender = LightingRecommendations(property_instance=property_instance, materials=materials)
self.windows_recommender = WindowsRecommendations(property_instance=property_instance, materials=materials)
@ -56,6 +68,27 @@ class Recommendations:
self.hotwater_recommender = HotwaterRecommendations(property_instance=property_instance)
self.secondary_heating_recommender = SecondaryHeating(property_instance=property_instance)
def find_included_measures(self):
"""
Determines the set of measures to be included in recommendations
"""
# Generally, inclusions is a global option and will overrule specific property non-invasive recommendations.
# This is done so that we can use inclusions to specify scenarios.
inclusions_full = [MEASURE_MAP[x] if x in MEASURE_MAP else x for x in self.inclusions]
exclusions_full = [MEASURE_MAP[x] if x in MEASURE_MAP else x for x in self.exclusions]
if inclusions_full and exclusions_full:
# All typical measures
return self.all_specific_measures
if inclusions_full:
return inclusions_full
if exclusions_full:
return [m for m in self.all_specific_measures if m not in exclusions_full]
def recommend(self):
"""
@ -68,19 +101,18 @@ class Recommendations:
property_recommendations = []
phase = 0
measures = self.find_included_measures()
# Building Fabric
if "wall_insulation" not in self.exclusions:
self.wall_recomender.recommend(phase=phase, exclusions=self.exclusions)
if self.wall_recomender.recommendations:
property_recommendations.append(self.wall_recomender.recommendations)
phase += 1
self.wall_recomender.recommend(phase=phase, measures=measures)
if self.wall_recomender.recommendations:
property_recommendations.append(self.wall_recomender.recommendations)
phase += 1
if "roof_insulation" not in self.exclusions:
self.roof_recommender.recommend(phase=phase)
if self.roof_recommender.recommendations:
property_recommendations.append(self.roof_recommender.recommendations)
phase += 1
self.roof_recommender.recommend(phase=phase, measures=measures)
if self.roof_recommender.recommendations:
property_recommendations.append(self.roof_recommender.recommendations)
phase += 1
# Ventilation recommendations
# We only produce a ventilation recommendation if the property is recommended to have wall or roof
@ -90,103 +122,122 @@ class Recommendations:
# real impact on the SAP score. Therefore, we don't need to include phasing for ventilation. If we
# have any
# wall or roof recommendations, we will ensure that ventilation is included in the simulation
if "ventilation" not in self.exclusions:
if self.wall_recomender.recommendations or self.roof_recommender.recommendations:
self.ventilation_recomender.recommend()
if self.ventilation_recomender.recommendation:
property_recommendations.append(self.ventilation_recomender.recommendation)
if (
(self.wall_recomender.recommendations or self.roof_recommender.recommendations) and
("ventilation" in measures)
):
self.ventilation_recomender.recommend()
if self.ventilation_recomender.recommendation:
property_recommendations.append(self.ventilation_recomender.recommendation)
if "floor_insulation" not in self.exclusions:
self.floor_recommender.recommend(phase=phase)
if "trickle_vents" in measures:
# This is a recommendatin that typically comes from an energy assessment
trickle_vents_rec = self.ventilation_recomender.recommend_trickle_vents()
if trickle_vents_rec:
property_recommendations.append(trickle_vents_rec)
if "draught_proofing" in measures:
# This is a recommendation that in some instances we can recommend, by deducing it from the SAP
# recommendations, however we will implement this later
self.draught_proofing_recommender.recommend()
if self.draught_proofing_recommender.recommendation:
property_recommendations.append(self.draught_proofing_recommender.recommendation)
if "floor_insulation" in measures:
self.floor_recommender.recommend(phase=phase, measures=measures)
if self.floor_recommender.recommendations:
property_recommendations.append(self.floor_recommender.recommendations)
phase += 1
if "windows" not in self.exclusions:
if "windows" in measures:
self.windows_recommender.recommend(phase=phase)
if self.windows_recommender.recommendation:
property_recommendations.append(self.windows_recommender.recommendation)
phase += 1
if "fireplace" not in self.exclusions:
if "mixed_glazing" in measures:
# This is a recommendation that comes exclusively from an energy assessment
mixed_glazing_rec = self.windows_recommender.recommend_mixed_glazing(phase=phase)
if mixed_glazing_rec:
property_recommendations.append(mixed_glazing_rec)
phase += 1
if "fireplace" in measures:
self.fireplace_recommender.recommend(phase=phase)
if self.fireplace_recommender.recommendation:
property_recommendations.append(self.fireplace_recommender.recommendation)
phase += 1
# Heating and Electical systems
if "heating" not in self.exclusions:
cavity_or_loft_recommendations = [
r for r in self.wall_recomender.recommendations + self.roof_recommender.recommendations
if r["type"] in ["cavity_wall_insulation", "loft_insulation"]
]
has_cavity_or_loft_recommendations = len(cavity_or_loft_recommendations) > 0
cavity_or_loft_recommendations = [
r for r in self.wall_recomender.recommendations + self.roof_recommender.recommendations
if r["type"] in ["cavity_wall_insulation", "loft_insulation"]
]
has_cavity_or_loft_recommendations = len(cavity_or_loft_recommendations) > 0
self.heating_recommender.recommend(
phase=phase,
measures=measures,
has_cavity_or_loft_recommendations=has_cavity_or_loft_recommendations,
)
if (
self.heating_recommender.heating_recommendations or
self.heating_recommender.heating_control_recommendations
):
self.heating_recommender.recommend(
phase=phase,
has_cavity_or_loft_recommendations=has_cavity_or_loft_recommendations,
exclusions=self.exclusions
)
if (
self.heating_recommender.heating_recommendations or
self.heating_recommender.heating_control_recommendations
):
# We split into first and second phase recommendations
first_phase_recommendations = [
r for r in (
self.heating_recommender.heating_recommendations +
self.heating_recommender.heating_control_recommendations
)
if r["phase"] == phase
]
second_phase_recommendations = [
r for r in (
self.heating_recommender.heating_recommendations +
self.heating_recommender.heating_control_recommendations
)
if r["phase"] == phase + 1
]
if first_phase_recommendations:
property_recommendations.append(first_phase_recommendations)
if second_phase_recommendations:
property_recommendations.append(second_phase_recommendations)
# We check if we have distinct heating and heating controls recommendations
# If so, we increment by 2 (one of the heating system, one for the heating controls)
# otherwise we incremenet by 1
max_used_phase = max(
[rec["phase"] for rec in
self.heating_recommender.heating_recommendations +
self.heating_recommender.heating_control_recommendations]
# We split into first and second phase recommendations
first_phase_recommendations = [
r for r in (
self.heating_recommender.heating_recommendations +
self.heating_recommender.heating_control_recommendations
)
amount_to_increment = max_used_phase - phase + 1
phase += amount_to_increment
if r["phase"] == phase
]
second_phase_recommendations = [
r for r in (
self.heating_recommender.heating_recommendations +
self.heating_recommender.heating_control_recommendations
)
if r["phase"] == phase + 1
]
if first_phase_recommendations:
property_recommendations.append(first_phase_recommendations)
if second_phase_recommendations:
property_recommendations.append(second_phase_recommendations)
# We check if we have distinct heating and heating controls recommendations
# If so, we increment by 2 (one of the heating system, one for the heating controls)
# otherwise we incremenet by 1
max_used_phase = max(
[rec["phase"] for rec in
self.heating_recommender.heating_recommendations +
self.heating_recommender.heating_control_recommendations]
)
amount_to_increment = max_used_phase - phase + 1
phase += amount_to_increment
# Hot water
if "hot_water" not in self.exclusions:
if "hot_water" in measures:
self.hotwater_recommender.recommend(phase=phase)
if self.hotwater_recommender.recommendations:
property_recommendations.append(self.hotwater_recommender.recommendations)
phase += 1
if "lighting" not in self.exclusions:
if "low_energy_lighting" in measures:
self.lighting_recommender.recommend(phase=phase)
if self.lighting_recommender.recommendation:
property_recommendations.append(self.lighting_recommender.recommendation)
phase += 1
if "secondary_heating" not in self.exclusions:
if "secondary_heating" in measures:
self.secondary_heating_recommender.recommend(phase=phase)
if self.secondary_heating_recommender.recommendation:
property_recommendations.append(self.secondary_heating_recommender.recommendation)
phase += 1
# Renewables
if "solar_pv" not in self.exclusions:
if "solar_pv" in measures:
self.solar_recommender.recommend(phase=phase)
if self.solar_recommender.recommendation:
property_recommendations.append(self.solar_recommender.recommendation)
@ -197,13 +248,13 @@ class Recommendations:
# We also need to create the representative recommendations for each recommendation type
property_representative_recommendations = self.create_representative_recommendations(
property_recommendations, non_invasive_recommendations=self.property_instance.non_invasive_recommendations
property_recommendations,
)
return property_recommendations, property_representative_recommendations
@staticmethod
def create_representative_recommendations(property_recommendations, non_invasive_recommendations):
def create_representative_recommendations(property_recommendations):
"""
This method will create a representative recommendation for each recommendation type
In order to create a representative recommendation, we choose the recommendation that has:
@ -220,12 +271,10 @@ class Recommendations:
# If the property was initially surveyed as filled, but the cavity was only partially filled, we don't
# want to include the cavity wall insulation recommendation in the defaults
# if (recommendations_by_type[0].get("type") == "cavity_wall_insulation") and (
# "cavity_surveyed_as_filled_is_partial" in non_invasive_recommendations
# ):
# continue
if recommendations_by_type[0].get("type") == "mechanical_ventilation":
if recommendations_by_type[0].get("type") in [
"mechanical_ventilation", "trickle_vents", "draught_proofing"
]:
continue
has_u_value = recommendations_by_type[0].get("new_u_value") is not None
@ -257,7 +306,10 @@ class Recommendations:
elif not has_u_value and has_sap_points:
# Sort the options by the cost per SAP point improvement - the lower the better
for rec in recommendations:
rec["efficiency"] = rec["total"] / rec["sap_points"]
if rec["sap_points"] == 0:
rec["efficiency"] = 0
else:
rec["efficiency"] = rec["total"] / rec["sap_points"]
elif has_rank:
# Sort the options by rank - the lower the better
for rec in recommendations:
@ -394,8 +446,9 @@ class Recommendations:
impact_summary = []
for recommendations_by_type in property_recommendations:
for rec in recommendations_by_type:
if rec["type"] == "mechanical_ventilation":
# We don't have a percieved sap impact of mechanical ventilation
if rec["type"] in ["mechanical_ventilation", "trickle_vents", "draught_proofing"]:
# We don't have a percieved sap impact of mechanical ventilation or trickle vents, and we don't
# have the capacity to score draught proofing
continue
phase_energy_efficiency_metrics = {
@ -481,8 +534,10 @@ class Recommendations:
property_phase_impact["carbon"], rec["co2_equivalent_savings"]
)
# Insert this information into the recommendation
rec["sap_points"] = property_phase_impact["sap"]
# Insert this information into the recommendation.
if not rec.get("survey", False):
rec["sap_points"] = property_phase_impact["sap"]
rec["co2_equivalent_savings"] = property_phase_impact["carbon"]
rec["heat_demand"] = property_phase_impact["heat_demand"]
@ -536,7 +591,7 @@ class Recommendations:
"heating_cop": mapped["cop"], "hotwater_cop": 1
}
mapped_hotwater = DESCRIPTIONS_TO_FUEL_TYPES[hotwater_description]
mapped_hotwater = assumptions.DESCRIPTIONS_TO_FUEL_TYPES[hotwater_description]
return {
"heating_fuel_type": heating_fuel, "hotwater_fuel_type": mapped_hotwater["fuel"],
@ -656,18 +711,6 @@ class Recommendations:
pd.isnull(kwh_impact_table["hotwater_fuel_type"]).sum()):
raise Exception("Fuel type is missing")
# kwh_impact_table["heating_fuel_type"] = np.where(
# kwh_impact_table["id"] == STARTING_DUMMY_ID_VALUE,
# property_instance.heating_energy_source,
# kwh_impact_table["heating_fuel_type"]
# )
#
# kwh_impact_table["hotwater_fuel_type"] = np.where(
# kwh_impact_table["id"] == STARTING_DUMMY_ID_VALUE,
# property_instance.hot_water_energy_source,
# kwh_impact_table["hotwater_fuel_type"]
# )
# We now calculate the fuel cost
for k in ["heating", "hotwater"]:
kwh_impact_table[f"{k}_cost"] = kwh_impact_table.apply(
@ -679,7 +722,8 @@ class Recommendations:
# We now deduce if any of the recommendations result in a change of fuel type
for recs in property_recommendations:
for rec in recs:
if rec["type"] == "mechanical_ventilation":
if rec["type"] in ["mechanical_ventilation", "trickle_vents", "draught_proofing"]:
# We cannot score the impact on draught proofing
continue
rec_impact = kwh_impact_table[kwh_impact_table["recommendation_id"] == rec["recommendation_id"]]

View file

@ -1,6 +1,7 @@
import math
import pandas as pd
from backend.Property import Property
from backend.app.plan.schemas import MEASURE_MAP
from typing import List
from datatypes.enums import QuantityUnits
from recommendations.recommendation_utils import (
@ -78,13 +79,13 @@ class RoofRecommendations:
return self.recommendations
def is_loft_already_insulated(self):
def is_loft_already_insulated(self, measures):
"""
Check if the loft is already insulated
"""
# If we have a non-invasive recommendation for the loft insulation, we can assume that the loft is not insulated
if "loft_insulation" in self.property.non_invasive_recommendations:
if "loft_insulation" in measures:
return False
return (self.insulation_thickness > self.MINIMUM_LOFT_ISULATION_MM) and self.property.roof["is_pitched"]
@ -108,11 +109,13 @@ class RoofRecommendations:
return full_insulated_room_roof or room_roof_insulated_at_rafters
def recommend(self, phase):
def recommend(self, phase, measures=None):
if self.property.roof["has_dwelling_above"]:
return
measures = MEASURE_MAP["roof_insulation"] if measures is None else measures
u_value = self.property.roof["thermal_transmittance"]
# We check if the roof is already insulated and if so, we exit
@ -120,7 +123,7 @@ class RoofRecommendations:
# Building regulations part L recommend installing at least 270mm of insulation, however generally we
# experience diminishing returns in terms of SAP once we go beyond around 150mm of insulation
# This only holds true for pitched roofs.
if self.is_loft_already_insulated():
if self.is_loft_already_insulated(measures):
return
if (self.insulation_thickness >= self.MINIMUM_FLAT_ROOF_ISULATION_MM) and self.property.roof["is_flat"]:
@ -152,20 +155,24 @@ class RoofRecommendations:
)
self.estimated_u_value = u_value
if (u_value <= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE) and (
"loft_insulation" not in self.property.non_invasive_recommendations
if (u_value <= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE) or (
"loft_insulation" not in measures
):
# The Roof is already compliant
return
if self.property.roof["is_pitched"] or self.property.roof["is_flat"]:
insulation_thickness = (
0 if "loft_insulation" not in self.property.non_invasive_recommendations else self.insulation_thickness
)
if (self.property.roof["is_pitched"] and "loft_insulation" in measures) or (
self.property.roof["is_flat"] and "flat_roof_insulation" in measures
):
insulation_thickness = 0 if "loft_insulation" not in measures else self.insulation_thickness
self.recommend_roof_insulation(u_value, insulation_thickness, self.property.roof, phase)
return
if self.property.roof["is_roof_room"]:
# There are cases where the property might have a room roof as the second roof, but we have a recommendation for
# it, so we allow this override
if self.property.roof["is_roof_room"] and ("room_roof_insulation" in measures) or (
"room_roof_insulation" in [x["type"] for x in self.property.non_invasive_recommendations]
):
self.recommend_room_roof_insulation(u_value, phase)
return
@ -418,6 +425,10 @@ class RoofRecommendations:
}
]
rir_non_invasive_recommendation = next(
(x for x in self.property.non_invasive_recommendations if x["type"] == "room_roof_insulation"), {}
)
# lowest_selected_u_value = None
recommendations = []
for material in roof_roof_insulation_materials:
@ -442,7 +453,13 @@ class RoofRecommendations:
# if new_u_value <= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE:
# lowest_selected_u_value = update_lowest_selected_u_value(lowest_selected_u_value, new_u_value)
estimated_cost = cost_per_unit * self.property.insulation_floor_area
estimated_cost = (
cost_per_unit * self.property.insulation_floor_area if
rir_non_invasive_recommendation.get("cost") is None else
rir_non_invasive_recommendation.get("cost")
)
sap_points = rir_non_invasive_recommendation.get("sap_points", None)
# Could also be Roof room(s), ceiling insulated
new_descriptin = "Pitched, insulated at rafters"
@ -480,14 +497,15 @@ class RoofRecommendations:
"description": "Insulate room in roof at rafters and re-decorate",
"starting_u_value": u_value,
"new_u_value": None,
"sap_points": None,
"sap_points": sap_points,
"simulation_config": simulation_config,
"description_simulation": {
"roof-description": new_descriptin,
"roof-energy-eff": new_efficiency
},
**cost_result,
"already_installed": already_installed
"already_installed": already_installed,
"survey": rir_non_invasive_recommendation.get("survey", None)
}
)

View file

@ -81,3 +81,44 @@ class VentilationRecommendations(Definitions):
"labour_days": labour_days # Assume 8 hour day
}
]
def recommend_trickle_vents(self):
"""
This is not something that we can identify completely non-invasively, however a recommendation which may come
about as a result of an energy assessment is the installation of trickle vents. This function handles that
"""
trickle_vents_recommendation_config = next(
(r for r in self.property.non_invasive_recommendations if r["type"] == "trickle_vents"), {}
)
if not trickle_vents_recommendation_config:
return
description = (
"Install trickle vents on your windows" if
not trickle_vents_recommendation_config.get("description")
else trickle_vents_recommendation_config["description"]
)
return [
{
"phase": None,
"parts": [],
"type": "trickle_vents",
"description": description,
"starting_u_value": None,
"new_u_value": None,
"already_installed": False,
"sap_points": 0,
"heat_demand": 0,
"kwh_savings": 0,
"co2_equivalent_savings": 0,
"energy_cost_savings": 0,
"total": trickle_vents_recommendation_config["cost"],
# We use a very simple and rough estimate of 4 hours per unit
"labour_hours": trickle_vents_recommendation_config.get("labour_hours", 8),
"labour_days": trickle_vents_recommendation_config.get("labour_days", 1), # Assume 8 hour day
"survey": True
}
]

View file

@ -5,6 +5,7 @@ import pandas as pd
from datatypes.enums import QuantityUnits
from backend.Property import Property
from backend.app.plan.schemas import MEASURE_MAP
from BaseUtility import Definitions
from etl.epc_clean.epc_attributes.WallAttributes import WallAttributes
from recommendations.recommendation_utils import (
@ -162,7 +163,7 @@ class WallRecommendations(Definitions):
)
# Test filling cavity
self.find_cavity_insulation(u_value, insulation_thickness, phase)
self.find_cavity_insulation(u_value, insulation_thickness, phase, measures)
return self.recommendations
@ -190,11 +191,15 @@ class WallRecommendations(Definitions):
return ewi_recommendations
def recommend(self, phase=0, exclusions=None):
def recommend(self, phase=0, measures=None):
# if building built after 1990 + we're able to identify U-value +
# U-value less than 0.18 and if in or close to a conversation area,
# recommend internal wall insulation as a possible measure
measures = MEASURE_MAP["wall_insulation"] if measures is None else measures
if not measures:
return
u_value = self.property.walls["thermal_transmittance"]
u_value = None if pd.isnull(u_value) else u_value
@ -207,7 +212,7 @@ class WallRecommendations(Definitions):
or self.property.walls["is_filled_cavity"]
) and (
"cavity_extract_and_refill"
not in self.property.non_invasive_recommendations
not in measures
):
return
@ -235,7 +240,7 @@ class WallRecommendations(Definitions):
and (u_value >= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE)
):
# Recommend insulation
self.find_insulation(u_value, phase)
self.find_insulation(u_value, phase, measures)
return
# We can't detect it's a cavity wall, but it was built after 1990 so likely built with insulation already
@ -259,22 +264,22 @@ class WallRecommendations(Definitions):
self.estimated_u_value = u_value
if is_cavity_wall or "cavity_extract_and_refill" in self.property.non_invasive_recommendations:
if (is_cavity_wall and "cavity_wall_insulation" in measures) or "cavity_extract_and_refill" in measures:
if u_value >= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE:
# Test filling cavity
self.find_cavity_insulation(u_value, insulation_thickness, phase)
self.find_cavity_insulation(u_value, insulation_thickness, phase, measures)
return
# Remaining wall types are treated with IWI or EWI
if (u_value >= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE) and self.is_suitable_for_solid_insulation():
self.find_insulation(u_value, phase, exclusions=exclusions)
self.find_insulation(u_value, phase, measures=measures)
return
# If the u-value is within regulations, we don't do anything
return
def find_cavity_insulation(self, u_value, insulation_thickness, phase):
def find_cavity_insulation(self, u_value, insulation_thickness, phase, measures):
"""
This method tests different materials to fill the cavity wall, determining which
material will give us the best U-value.
@ -294,6 +299,8 @@ class WallRecommendations(Definitions):
:param u_value: u_value of the starting wall
:param insulation_thickness: describes the insulation level of the wall. If "below average", we have a partially
filled cavity wall
:param phase: The phase of the recommendation
:param measures: The measures we're considering
"""
insulation_materials = pd.DataFrame(self.cavity_wall_insulation_materials)
@ -328,7 +335,7 @@ class WallRecommendations(Definitions):
is_extraction_and_refill = (
"cavity_extract_and_refill"
in self.property.non_invasive_recommendations
in measures
)
cost_result = self.costs.cavity_wall_insulation(
@ -447,6 +454,16 @@ class WallRecommendations(Definitions):
lowest_selected_u_value = None
recommendations = []
iwi_non_invasive_recommendations = next(
(r for r in self.property.non_invasive_recommendations if r["type"] == "internal_wall_insulation"), {}
)
ewi_non_invasive_recommendations = next(
(r for r in self.property.non_invasive_recommendations if r["type"] == "external_wall_insulation"), {}
)
if ewi_non_invasive_recommendations:
raise NotImplementedError("Implement ewi non-invasive recommendations")
for _, insulation_material_group in insulation_materials.groupby("description"):
for _, material in insulation_material_group.iterrows():
@ -479,6 +496,15 @@ class WallRecommendations(Definitions):
)
if material["type"] == "internal_wall_insulation":
if iwi_non_invasive_recommendations.get("cost") is not None:
raise NotImplementedError(
"Not handled passing costs from non-invasive recommendations for iwi"
)
sap_points = iwi_non_invasive_recommendations.get("sap_points", None)
survey = iwi_non_invasive_recommendations.get("survey", False)
cost_result = self.costs.internal_wall_insulation(
wall_area=self.property.insulation_wall_area,
material=material.to_dict(),
@ -496,6 +522,10 @@ class WallRecommendations(Definitions):
)
elif material["type"] == "external_wall_insulation":
sap_points = ewi_non_invasive_recommendations.get("sap_points", None)
survey = ewi_non_invasive_recommendations.get("survey", False)
cost_result = self.costs.external_wall_insulation(
wall_area=self.property.insulation_wall_area,
material=material.to_dict(),
@ -546,19 +576,20 @@ class WallRecommendations(Definitions):
"starting_u_value": u_value,
"new_u_value": new_u_value,
"already_installed": already_installed,
"sap_points": None,
"sap_points": sap_points,
"simulation_config": simulation_config,
"description_simulation": {
"walls-description": new_description,
"walls-energy-eff": simulation_config["walls_energy_eff_ending"]
},
**cost_result
**cost_result,
"survey": survey
}
)
return recommendations
def find_insulation(self, u_value, phase, exclusions=None):
def find_insulation(self, u_value, phase, measures):
"""
This function contains the logic for finding potential insulation measures for a property, depending
on the parts available and whether the property can have external wall insulation installed
@ -570,10 +601,8 @@ class WallRecommendations(Definitions):
# we separate the logic for for recommending them, therefore we don't
# consider diminishing returns between the two as they are considered to be separate measures
exclusions = [] if exclusions is None else exclusions
ewi_recommendations = []
if self.ewi_valid() and "external_wall_insulation" not in exclusions:
if self.ewi_valid() and "external_wall_insulation" in measures:
ewi_recommendations = self._find_insulation(
u_value=u_value,
insulation_materials=pd.DataFrame(
@ -584,7 +613,7 @@ class WallRecommendations(Definitions):
)
iwi_recommendations = []
if "internal_wall_insulation" not in exclusions:
if "internal_wall_insulation" in measures:
iwi_recommendations = self._find_insulation(
u_value=u_value,
insulation_materials=pd.DataFrame(self.internal_wall_insulation_materials),

View file

@ -3,8 +3,9 @@ from typing import List
import numpy as np
from backend.Property import Property
from etl.epc_clean.epc_attributes.WindowAttributes import WindowAttributes
from recommendations.Costs import Costs
from recommendations.recommendation_utils import override_costs
from recommendations.recommendation_utils import override_costs, check_simulation_difference
class WindowsRecommendations:
@ -128,3 +129,64 @@ class WindowsRecommendations:
}
}
]
def recommend_mixed_glazing(self, phase):
"""
This function will recommend mixed glazing to the property. This is a more specific recommendation than
the general windows recommendation, but is almost certain to arise from a survey
:return:
"""
mixed_glazing_recommendation_config = next(
(r for r in self.property.non_invasive_recommendations if r["type"] == "mixed_glazing"), {}
)
if not mixed_glazing_recommendation_config:
return
description = (
"Install a combination of secondary and double glazing to single glazed windows" if
not mixed_glazing_recommendation_config.get("description")
else mixed_glazing_recommendation_config["description"]
)
windows_ending_config = WindowAttributes("Full secondary glazing").process()
windows_simulation_config = check_simulation_difference(
new_config=windows_ending_config, old_config=self.property.windows, prefix="windows_"
)
windows_simulation_config = {
**windows_simulation_config,
"windows_energy_eff_ending": "Average",
"glazed_type_ending": "secondary glazing",
"multi_glaze_proportion_ending": 100,
}
return [
{
"phase": phase,
"parts": [],
"type": "mixed_glazing",
"description": description,
"starting_u_value": None,
"new_u_value": None,
"already_installed": False,
"sap_points": mixed_glazing_recommendation_config["sap_points"],
"heat_demand": None, # We will predict this
"kwh_savings": None, # We will predict this
"co2_equivalent_savings": None, # We will predict this
"energy_cost_savings": None, # We will predict this
"total": mixed_glazing_recommendation_config["cost"],
# We use a very simple and rough estimate of 4 hours per unit
"labour_hours": mixed_glazing_recommendation_config.get("labour_hours", 8),
"labour_days": mixed_glazing_recommendation_config.get("labour_days", 1), # Assume 8 hour day
"survey": mixed_glazing_recommendation_config["survey"],
"simulation_config": windows_simulation_config,
"description_simulation": {
"multi-glaze-proportion": 100,
"windows-energy-eff": "Average",
"windows-description": "Multiple glazing throughout",
"glazed-type": "secondary glazing",
},
}
]