diff --git a/backend/Property.py b/backend/Property.py index 19e5cb2e..704e4f0a 100644 --- a/backend/Property.py +++ b/backend/Property.py @@ -365,7 +365,7 @@ class Property: for rec in property_recommendations_by_phase: # We simulate the impact of the recommendation at this current phase, and all of the prior phases - if rec["type"] == "mechanical_ventilation": + if rec["type"] in ["mechanical_ventilation", "trickle_vents", "draught_proofing"]: continue scoring_dict = self.create_recommendation_scoring_data( @@ -480,7 +480,6 @@ class Property: """ output = recommendation_record.copy() - non_invasive_recommendations = [] if non_invasive_recommendations is None else non_invasive_recommendations for col in [ "walls_insulation_thickness", @@ -537,28 +536,14 @@ class Property: "heating", "hot_water_tank_insulation", "heating_control", "secondary_heating", "internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation", "cylinder_thermostat", "loft_insulation", "room_roof_insulation", "flat_roof_insulation", - "solid_floor_insulation", "suspended_floor_insulation", + "solid_floor_insulation", "suspended_floor_insulation", "mixed_glazing" ]: # We update the data, as defined in the recommendaton - if output["walls_insulation_thickness_ending"] is None: - output["walls_insulation_thickness_ending"] = "none" + for prefix in ["walls", "roof", "floor"]: + if output[f"{prefix}_insulation_thickness_ending"] is None: + output[f"{prefix}_insulation_thickness_ending"] = "none" - if output["walls_thermal_transmittance_ending"] is None: - raise ValueError("We should not have a None value for the u value") - - if output["roof_insulation_thickness_ending"] is None: - output["roof_insulation_thickness_ending"] = "none" - - if output["roof_thermal_transmittance_ending"] is None: - raise ValueError("We should not have a None value for the u value") - - if output["floor_thermal_transmittance_ending"] is None: - raise ValueError("We should not have a None value for the u value") - - if output["floor_insulation_thickness_ending"] is None: - output["floor_insulation_thickness_ending"] = "none" - - simulation_config = recommendation["simulation_config"] + simulation_config = recommendation["simulation_config"].copy() # If any entries in simulation_config are None, we will set them to "Unknown" which is the cleaning # value for key, value in simulation_config.items(): @@ -576,7 +561,7 @@ class Property: "loft_insulation", "room_roof_insulation", "flat_roof_insulation", "solid_floor_insulation", "suspended_floor_insulation", "windows_glazing", "solar_pv", "heating", "hot_water_tank_insulation", - "heating_control", "secondary_heating", "cylinder_thermostat" + "heating_control", "secondary_heating", "cylinder_thermostat", "mixed_glazing" ]: raise NotImplementedError( "Implement me, given type %s" % recommendation["type"] @@ -1231,12 +1216,12 @@ class Property: else: raise Exception("Investiage me") - def is_ashp_valid(self, exclusions): + def is_ashp_valid(self, measures): if "air_source_heat_pump" in self.non_invasive_recommendations: return True - if "air_source_heat_pump" in exclusions: + if "air_source_heat_pump" not in measures: return False suitable_property_type = self.data["property-type"] in ["House", "Bungalow"] @@ -1279,9 +1264,11 @@ class Property: """ exclusions = [] if exclusions is None else exclusions + if "air_source_heat_pump" in exclusions: + return self.current_energy_consumption # If the property currently has an ASHP, we don't gain from any efficiency improvements - if not self.is_ashp_valid(exclusions=exclusions): + if not self.is_ashp_valid(measures=["air_source_heat_pump"]): return self.current_energy_consumption # If the property currently has an electric boiler, it will still benefit from the ASHP efficiency gain diff --git a/backend/app/config.py b/backend/app/config.py index b5ea72fe..9aaa0a52 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -33,6 +33,9 @@ class Settings(BaseSettings): HEATING_KWH_PREDICTIONS_BUCKET: str HOTWATER_KWH_PREDICTIONS_BUCKET: str + # Other S3 buckts + ENERGY_ASSESSMENTS_BUCKET: str + class Config: env_file = "backend/.env" diff --git a/backend/app/db/functions/energy_assessment_functions.py b/backend/app/db/functions/energy_assessment_functions.py index b223d2f5..ca2f721c 100644 --- a/backend/app/db/functions/energy_assessment_functions.py +++ b/backend/app/db/functions/energy_assessment_functions.py @@ -1,17 +1,26 @@ -from backend.app.db.models.energy_assessments import EnergyAssessment +from backend.app.db.models.energy_assessments import ( + EnergyAssessment, EnergyAssessmentScenarios, EnergyAssessmentDocuments, DocumentTypeEnum +) from sqlalchemy.orm import Session from sqlalchemy.exc import IntegrityError -from typing import Optional +from typing import Optional, List, Dict from sqlalchemy import desc +from utils.logger import setup_logger + +logger = setup_logger() -def bulk_insert_energy_assessments(session: Session, data_list): +def bulk_insert_energy_assessments(session: Session, data_list: List[dict]) -> Dict[int, int]: """ - This function inserts or updates multiple energy assessment records into the database. + This function inserts or updates multiple energy assessment records into the database and returns a mapping of + uprn to energy_assessment_id. :param session: The SQLAlchemy session. :param data_list: A list of dictionaries containing energy assessment data. + :return: A dictionary mapping each uprn to its corresponding energy_assessment_id. """ + uprn_to_assessment_id = {} + try: for data in data_list: uprn = data.get('uprn') @@ -28,19 +37,30 @@ def bulk_insert_energy_assessments(session: Session, data_list): for key, value in data.items(): setattr(existing_record, key, value) session.add(existing_record) + + # Map the uprn to the existing record's ID + uprn_to_assessment_id[uprn] = existing_record.id else: # Insert a new record new_assessment = EnergyAssessment(**data) session.add(new_assessment) + # Flush the session to get the newly created ID before commit + session.flush() + + # Map the uprn to the new record's ID + uprn_to_assessment_id[uprn] = new_assessment.id + # Commit the transaction session.commit() - print("All records inserted or updated successfully.") + logger.info("All records inserted or updated successfully.") except IntegrityError as e: # Rollback the session in case of error session.rollback() - print(f"Error occurred: {e}") + logger.info(f"Error occurred: {e}") + + return uprn_to_assessment_id def get_latest_assessment_by_uprn(session: Session, uprn: int) -> Optional[EnergyAssessment]: @@ -58,5 +78,81 @@ def get_latest_assessment_by_uprn(session: Session, uprn: int) -> Optional[Energ return latest_assessment.to_dict() if latest_assessment else EnergyAssessment.empty_response() except Exception as e: - print(f"An error occurred: {e}") + logger.info(f"An error occurred: {e}") return None + + +def create_scenarios_for_documents(session: Session, document_list: List[dict], uprn_to_assessment_id: dict): + """ + Creates scenarios for documents by UPRN and links them to the energy assessments. + + :param session: The SQLAlchemy session. + :param document_list: A list of dictionaries containing document data. + :param uprn_to_assessment_id: A dictionary mapping UPRN to energy_assessment_id. + """ + try: + for document in document_list: + uprn = document.get('uprn') + scenario_name = document.get('scenario_id') + + if scenario_name: + # Get the associated energy_assessment_id for the UPRN + energy_assessment_id = uprn_to_assessment_id.get(uprn) + + # Check if the scenario already exists + existing_scenario = session.query(EnergyAssessmentScenarios).filter_by( + scenario_name=scenario_name, + energy_assessment_id=energy_assessment_id + ).first() + + if not existing_scenario: + # Create the scenario + new_scenario = EnergyAssessmentScenarios( + scenario_name=scenario_name, + energy_assessment_id=energy_assessment_id + ) + session.add(new_scenario) + session.flush() # Get the new scenario ID + + # Update document with new scenario ID + document['scenario_id'] = new_scenario.id + else: + # If the scenario already exists, just use its ID + document['scenario_id'] = existing_scenario.id + + # Commit the scenarios + session.commit() + logger.info("Scenarios created successfully.") + + except IntegrityError as e: + session.rollback() + logger.info(f"Error occurred: {e}") + + +def create_documents(session: Session, document_list: List[dict]): + """ + Inserts documents into the energy_assessment_documents table, linking them to scenarios and assessments. + + :param session: The SQLAlchemy session. + :param document_list: A list of dictionaries containing document data. + """ + try: + for document in document_list: + # Ensure the document_type is cast to Enum + new_document = EnergyAssessmentDocuments( + uprn=document['uprn'], + document_type=DocumentTypeEnum(document['document_type']).value, + document_location=document['document_location'], + energy_assessment_id=document['energy_assessment_id'], + scenario_id=document.get('scenario_id') # Might be None if no scenario + ) + + session.add(new_document) + + # Commit all document insertions + session.commit() + logger.info("Documents created successfully.") + + except IntegrityError as e: + session.rollback() + logger.info(f"Error occurred: {e}") diff --git a/backend/app/db/models/energy_assessments.py b/backend/app/db/models/energy_assessments.py index 3928f9fa..46912c9b 100644 --- a/backend/app/db/models/energy_assessments.py +++ b/backend/app/db/models/energy_assessments.py @@ -1,5 +1,8 @@ -from sqlalchemy import Column, Integer, BigInteger, Text, Float, DateTime, Boolean, Date +from sqlalchemy import Column, Integer, BigInteger, Text, Float, DateTime, Boolean, Date, ForeignKey from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.dialects.postgresql import ENUM as PgEnum +import enum +from datetime import datetime Base = declarative_base() @@ -163,3 +166,42 @@ class EnergyAssessment(Base): @staticmethod def empty_response(): return {"epc": {}, "condition": {}} + + +class EnergyAssessmentScenarios(Base): + __tablename__ = 'energy_assessment_scenarios' + id = Column(BigInteger, primary_key=True, autoincrement=True) + scenario_name = Column(Text, nullable=False) + energy_assessment_id = Column(BigInteger, ForeignKey('energy_assessments.id'), nullable=False) + + +class DocumentTypeEnum(enum.Enum): + EPR = "EPR" + ConditionReport = "Condition Report" + EvidenceReport = "Evidence Report" + SummaryInformation = "Summary Information" + FloorPlan = "Floor Plan" + ScenarioDraftEPC = "Scenario Draft EPC" + ScenarioSiteNotes = "Scenario Site Notes" + + +class EnergyAssessmentDocuments(Base): + __tablename__ = 'energy_assessment_documents' + id = Column(BigInteger, primary_key=True, autoincrement=True) + uprn = Column(BigInteger, nullable=False) + energy_assessment_id = Column(BigInteger, ForeignKey('energy_assessments.id'), nullable=False) + document_type = Column(PgEnum(DocumentTypeEnum, name="document_type", create_type=False), nullable=False) + document_location = Column(Text, nullable=False) + uploaded_at = Column(DateTime(timezone=True), nullable=False, default=datetime.utcnow) + scenario_id = Column(BigInteger, ForeignKey('energy_assessment_scenarios.id'), nullable=True) + + @staticmethod + def empty_response(): + return { + "id": None, + "uprn": None, + "document_type": None, + "document_location": None, + "uploaded_at": None, + "scenario_id": None + } diff --git a/backend/app/energy_assessments/router.py b/backend/app/energy_assessments/router.py new file mode 100644 index 00000000..0f5fcf1b --- /dev/null +++ b/backend/app/energy_assessments/router.py @@ -0,0 +1,273 @@ +import os +from io import BytesIO +from typing import List + +from fastapi import APIRouter, Depends +from starlette.responses import Response + +from backend.app.config import get_settings +from backend.app.dependencies import validate_token +from backend.app.energy_assessments.schemas import EnergyAssessmentUploadPayload + +from sqlalchemy.orm import sessionmaker +from sqlalchemy.exc import IntegrityError, OperationalError +from backend.app.db.connection import db_engine +from backend.app.db.functions.energy_assessment_functions import ( + bulk_insert_energy_assessments, create_scenarios_for_documents, create_documents +) + +from etl.xml_survey_extraction.XmlParser import XmlParser + +from utils.s3 import ( + read_from_s3, list_files_and_subfolders_in_s3_folder, list_xmls_in_s3_folder, save_csv_to_s3, + list_files_in_s3_folder +) +from utils.logger import setup_logger + +logger = setup_logger() + + +def insert_energy_assessment_documents(document_list: List[dict], uprn_to_assessment_id: dict): + """ + Inserts or updates energy assessment documents, assigning the correct energy_assessment_id. + + :param document_list: A list of dictionaries containing document data. + :param uprn_to_assessment_id: A dictionary mapping UPRN to energy_assessment_id. + """ + for document in document_list: + uprn = document['uprn'] + # Assign the energy_assessment_id based on uprn + energy_assessment_id = uprn_to_assessment_id.get(uprn) + + if not energy_assessment_id: + logger.info(f"No energy_assessment_id found for UPRN: {uprn}. Skipping document.") + continue + + # Attach energy_assessment_id to each document + document['energy_assessment_id'] = energy_assessment_id + + logger.info("Energy Assessment IDs assigned to documents.") + + +router = APIRouter( + prefix="/energy-assessments", + tags=["energy-assessments"], + dependencies=[Depends(validate_token)], + responses={404: {"description": "Not found"}} +) + + +@router.post("/upload") +async def upload(body: EnergyAssessmentUploadPayload): + """ + Given a location in S3, this service will retrieve the data in s3 and perform the following: + 1) Extract the data and store it to the data + 2) Extract the links to other artefacts collected during the energy assessment, such as EPRs, floor plans and + condition reports + + This will allow us to do the following: + 1) Present the findings of the energy assessment to the client + 2) Allow the end use to download the artefacts collected during the energy assessment + + Eventually, we will this service to collect the key documents from the service where they're uploaded + (e.g. Onedrive) and store them to S3, but for the moment, this is sufficient + + """ + + logger.info("Connecting to db") + session = sessionmaker(bind=db_engine)() + + try: + logger.info("Extracting energy assessment data") + energy_assessments = list_files_and_subfolders_in_s3_folder( + bucket_name=get_settings().ENERGY_ASSESSMENTS_BUCKET, + folder_name=f"{body.surveyor}/{body.project_code}/" + ) + + logger.info( + f"Found {len(energy_assessments)} energy assessments for {body.surveyor} and {body.project_code}" + ) + assessments_map = {} + for assessment in energy_assessments: + uploaded_xmls = list_xmls_in_s3_folder( + bucket_name=get_settings().ENERGY_ASSESSMENTS_BUCKET, + folder_name=os.path.join(assessment, "docs & plans") + ) + + energy_assessment_files = list_files_in_s3_folder( + bucket_name=get_settings().ENERGY_ASSESSMENTS_BUCKET, + folder_name=os.path.join(assessment, "docs & plans") + ) + # Remove xmls from the list of files + energy_assessment_files = [file for file in energy_assessment_files if file not in uploaded_xmls] + # We now split this into the different types of files + # EPR + eprs = [ + file for file in energy_assessment_files if "epr.pdf" in file.split("/")[-1].replace(" ", "").lower() + ] + # Condition report + condition_reports = [ + file for file in energy_assessment_files if "cr.pdf" in file.split("/")[-1].replace(" ", "").lower() + ] + # Evidence report + evidence_reports = [ + file for file in energy_assessment_files + if "evidence.pdf" in file.split("/")[-1].replace(" ", "").lower() + ] + # Summary report + summary_reports = [ + file for file in energy_assessment_files + if "sn.pdf" in file.split("/")[-1].replace(" ", "").lower() + ] + # Floor plans - these are just the jpgs + floor_plans = [file for file in energy_assessment_files if file.endswith(".jpg")] + + # We now retrieve scenarios + scenario_folders = list_files_and_subfolders_in_s3_folder( + bucket_name=get_settings().ENERGY_ASSESSMENTS_BUCKET, + folder_name=assessment + ) + + # filter folders that contain the word scenario + scenario_folders = [ + folder for folder in scenario_folders if "scenario" in folder.rstrip("/").split("/")[-1].lower() + ] + scenario_documents = [] + for sf in scenario_folders: + scenario_files = list_files_in_s3_folder( + bucket_name=get_settings().ENERGY_ASSESSMENTS_BUCKET, + folder_name=sf + ) + notes = [ + file for file in scenario_files if "sitenotes" in file.split("/")[-1].replace(" ", "").lower() + ] + # This should be the leftovers + draft_epc = [file for file in scenario_files if file not in notes] + scenario_documents.append( + { + "identifier": sf.rstrip("/").split("/")[-1], + "Scenario Site Notes": notes, + "Scenario Draft EPC": draft_epc + } + ) + + uprn = int(assessment.rstrip("/").split("/")[-1]) + assessments_map[uprn] = { + "xmls": uploaded_xmls, + "EPR": eprs, + "Condition Report": condition_reports, + "Evidence Report": evidence_reports, + "Summary Information": summary_reports, + "Floor Plan": floor_plans, + "scenario_documents": scenario_documents + } + + logger.info("Extracted energy assessment data and storing file locations to database") + xml_data_to_store = [] + energy_assessment_documents = [] + for uprn, files in assessments_map.items(): + # Create the rows of data to insert into the energy assessment documents + property_ea_docs = [] + for doc_type, doc_files in files.items(): + if doc_type == "xmls": + continue + + if doc_type == "scenario_documents": + for doc in doc_files: + # This scenario id is put in as a placeholder means os associating the scenario documents with + # the correct scenario + scenario_id = doc["identifier"] + for sn in doc["Scenario Site Notes"]: + property_ea_docs.append( + { + "uprn": uprn, + "document_type": "Scenario Site Notes", + "document_location": sn, + "scenario_id": scenario_id + } + ) + + for d_epc in doc["Scenario Draft EPC"]: + property_ea_docs.append( + { + "uprn": uprn, + "document_type": "Scenario Draft EPC", + "document_location": d_epc, + "scenario_id": scenario_id + } + ) + + continue + + for doc in doc_files: + property_ea_docs.append( + { + "uprn": uprn, + "document_type": doc_type, + "document_location": doc, + "scenario_id": None + } + ) + energy_assessment_documents.extend(property_ea_docs) + + xmls = files["xmls"] + extracted_data = {} + for xml in xmls: + xml_data = read_from_s3(bucket_name=get_settings().ENERGY_ASSESSMENTS_BUCKET, s3_file_name=xml) + xml_data_io = BytesIO(xml_data) + xml_parser = XmlParser( + file=xml_data_io, + filekey=os.path.join(f"s3://{get_settings().ENERGY_ASSESSMENTS_BUCKET}", xml), + uprn=uprn, + surveyor_company=body.surveyor, + ) + xml_parser.run() + if xml_parser.is_lig: + logger.info(f"Extracted data from {xml}") + extracted_epc = xml_parser.epc + extracted_additional_data = xml_parser.additional_data + + data_to_update = { + **extracted_epc, **extracted_additional_data + } + + # We need to update the keys to match the database schema - i.e. we should replace all hyphens with + # underscores + data_to_update = {k.replace("-", "_"): v for k, v in data_to_update.items()} + + extracted_data.update(data_to_update) + + xml_data_to_store.append(extracted_data) + + logger.info("Storing energy assessment xml data to database") + uprn_to_assessment_id = bulk_insert_energy_assessments(session, xml_data_to_store) + + # Insert energy assessment id into the documents data + insert_energy_assessment_documents(energy_assessment_documents, uprn_to_assessment_id) + + create_scenarios_for_documents(session, energy_assessment_documents, uprn_to_assessment_id) + + create_documents(session, energy_assessment_documents) + + session.close() + + except IntegrityError: + logger.error("Database integrity error occurred", exc_info=True) + session.rollback() + return Response(status_code=500, content="Database integrity error.") + except OperationalError: + logger.error("Database operational error occurred", exc_info=True) + session.rollback() + return Response(status_code=500, content="Database operational error.") + except ValueError: + logger.error("Value error - possibly due to malformed data", exc_info=True) + session.rollback() + return Response(status_code=400, content="Bad request: malformed data.") + except Exception as e: # General exception handling + logger.error(f"An error occurred: {e}") + session.rollback() + return Response(status_code=500, content="An unexpected error occurred.") + finally: + session.close() + + return Response(status_code=200) diff --git a/backend/app/energy_assessments/schemas.py b/backend/app/energy_assessments/schemas.py new file mode 100644 index 00000000..cfee76ff --- /dev/null +++ b/backend/app/energy_assessments/schemas.py @@ -0,0 +1,10 @@ +from pydantic import BaseModel + + +class EnergyAssessmentUploadPayload(BaseModel): + portfolio_id: int + # This is the energy assessment company/individual that conducted the energy assessment, where the data is uploaded + # against + surveyor: str + # is a code, like VEC001, which is used to identify the project and also where the data is uploaded against + project_code: str diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py index e773e303..929ce7fa 100644 --- a/backend/app/plan/router.py +++ b/backend/app/plan/router.py @@ -620,6 +620,13 @@ async def trigger_plan(body: PlanTriggerRequest): if individual_units: # Model the solar potential at the property level for unit in tqdm(individual_units): + + # TODO: Tidy up this code + # We don't need to do this if we have global inclusions that don't include solar + if body.inclusions: + if "solar_pv" not in body.inclusions: + continue + property_instance = [p for p in input_properties if p.id == unit["property_id"]][0] # At this level, we check if the property is suitable for solar and if now, skip if not property_instance.is_solar_pv_valid(): @@ -668,7 +675,9 @@ async def trigger_plan(body: PlanTriggerRequest): recommendations_scoring_data = [] representative_recommendations = {} for p in tqdm(input_properties): - recommender = Recommendations(property_instance=p, materials=materials, exclusions=body.exclusions) + recommender = Recommendations( + property_instance=p, materials=materials, exclusions=body.exclusions, inclusions=body.inclusions + ) property_recommendations, property_representative_recommendations = recommender.recommend() if not property_recommendations: diff --git a/backend/app/plan/schemas.py b/backend/app/plan/schemas.py index 04a1eb89..2968babf 100644 --- a/backend/app/plan/schemas.py +++ b/backend/app/plan/schemas.py @@ -1,6 +1,56 @@ from pydantic import BaseModel, conlist, validator from typing import Optional +TYPICAL_MEASURE_TYPES = [ + "wall_insulation", + "roof_insulation", + "ventilation", + "floor_insulation", + "windows", + "fireplace", + "heating", + "hot_water", + "low_energy_lighting", + "secondary_heating", + "solar_pv" +] + +SPECIFIC_MEASURES = [ + # Specific measures + # Walls + "internal_wall_insulation", + "external_wall_insulation", + "cavity_wall_insulation" + # Roof + "loft_insulation", + "flat_roof_insulation", + "room_roof_insulation", + # Floor + "suspended_floor_insulation", + "solid_floor_insulation", + # Heating + "boiler_upgrade", + "high_heat_retention_storage_heater", + "air_source_heat_pump", + + # Specific measures that will typically come from an energy assessment + "trickle_vents", + "draught_proofing", + "mixed_glazing", # This covers partial double glazing and secondary glazing + "cavity_extract_and_refill", +] + +# This allows us to extend high level categories for measures such as "wall_insulation" to the specific measures +# such as "external_wall_insulation", "internal_wall_insulation", "cavity_wall_insulation" +MEASURE_MAP = { + "wall_insulation": [ + "internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation", "cavity_extract_and_refill" + ], + "roof_insulation": ["loft_insulation", "flat_roof_insulation", "room_roof_insulation"], + "floor_insulation": ["suspended_floor_insulation", "solid_floor_insulation"], + "heating": ["boiler_upgrade", "high_heat_retention_storage_heater", "air_source_heat_pump"], +} + class PlanTriggerRequest(BaseModel): budget: Optional[float] = None @@ -13,33 +63,13 @@ class PlanTriggerRequest(BaseModel): patches_file_path: Optional[str] = None non_invasive_recommendations_file_path: Optional[str] = None exclusions: Optional[conlist(str, min_items=1)] = None + inclusions: Optional[conlist(str, min_items=1)] = None + scenario_name: Optional[str] = "" # If true, will allow us to create multiple plans for the same portfolio, whereas if this is false, if this property # exists in the portfolio, it will be ignored multi_plan: Optional[bool] = False - # Pre-defined list of possibilities for exclusions - _allowed_exclusions = { - # Measure classes - "wall_insulation", - "ventilation", - "roof_insulation", - "floor_insulation", - "windows", - "fireplace", - "heating", - "hot_water", - "lighting", - "solar_pv", - # Specific measures - "air_source_heat_pump", - "internal_wall_insulation", - "external_wall_insulation", - "secondary_heating", - "boiler_upgrade", - "high_heat_retention_storage_heater", - } - _allowed_goals = {"Increasing EPC"} _allowed_housing_types = {"Social", "Private"} @@ -47,10 +77,16 @@ class PlanTriggerRequest(BaseModel): # Validator to ensure exclusions are within the pre-defined possibilities @validator('exclusions', each_item=True) def check_exclusions(cls, v): - if v not in cls._allowed_exclusions: + if v not in TYPICAL_MEASURE_TYPES + SPECIFIC_MEASURES: raise ValueError(f"{v} is not an allowed exclusion") return v + @validator('inclusions', each_item=True) + def check_inclusions(cls, v): + if v not in TYPICAL_MEASURE_TYPES + SPECIFIC_MEASURES: + raise ValueError(f"{v} is not an allowed inclusion") + return v + # Validator to ensure that the goal is within the pre-defined possibilities @validator('goal') def check_goal(cls, v): diff --git a/etl/customers/bcc_tender/app.py b/etl/customers/bcc_tender/app.py index 281cf864..8cdc6e13 100644 --- a/etl/customers/bcc_tender/app.py +++ b/etl/customers/bcc_tender/app.py @@ -95,6 +95,31 @@ epc_data["eligibility_type"] = np.where( epc_data["eligibility_type"] ) +# Example EPCS to analysis +analysis_epcs = epc_data[~pd.isnull(epc_data["eligibility_type"])].copy() +# Keep just columns we need +analysis_epcs = analysis_epcs[ + [ + "UPRN", "TENURE", "CURRENT_ENERGY_RATING", "WALLS_DESCRIPTION", "ROOF_DESCRIPTION", + "CONSTRUCTION_AGE_BAND", "TOTAL_FLOOR_AREA", "PROPERTY_TYPE", "BUILT_FORM", "MAINHEAT_DESCRIPTION", + "eligibility_type", + ] +] +analysis_epcs["grouped_epc_band"] = np.where( + analysis_epcs["CURRENT_ENERGY_RATING"].isin(["D"]), + "EPC D", + "EPC E-G" +) +analysis_epcs.to_csv("/Users/khalimconn-kowlessar/Documents/hestia/Customers/bcc tender/analysis_epcs.csv", index=False) + +# Create aggregations and we store this information +agg_cols = ["CURRENT_ENERGY_RATING", "CONSTRUCTION_AGE_BAND", "PROPERTY_TYPE", "BUILT_FORM", "grouped_epc_band"] +agg_cols = ["WALLS_DESCRIPTION", "ROOF_DESCRIPTION", "MAINHEAT_DESCRIPTION"] +for col in agg_cols: + agg_df = analysis_epcs.groupby([col]).size().reset_index(name="Number of Properties") + agg_df["Percentage of Properties"] = 100 * agg_df["Number of Properties"] / agg_df["Number of Properties"].sum() + agg_df.to_csv(f"/Users/khalimconn-kowlessar/Documents/hestia/Customers/bcc tender/{col}.csv", index=False) + # Eligibiilty 6: GBIS General Eligibility, Social - tenure is social rented and EPC rating D-G, but also the property # should be rented out below market rate # This is a subset of Eligibility 3 - we likely don't need to do any scaling diff --git a/etl/customers/vectis/outputs.py b/etl/customers/vectis/outputs.py new file mode 100644 index 00000000..c6d0905f --- /dev/null +++ b/etl/customers/vectis/outputs.py @@ -0,0 +1,196 @@ +import pandas as pd +from utils.s3 import save_csv_to_s3 + + +def app(): + # This is the payload to be used to extract the energy assessment data from s3 and upload it to the database, + # as well as produce links to each of the uploaded documents. + + portfolio_id = 101 + + body = { + "portfolio_id": portfolio_id, + "surveyor": "JAFFERSONS ENERGY CONSULTANTS", + "project_code": "VEC001", + } + + # These are the recommendations based on the on-site survey of the property. + non_intrusive_recommendations = [ + { + # 2 Grove Mansions + "uprn": 121016121, + "recommendations": [ + { + "type": "draught_proofing", + "cost": 123, + "survey": True, + "sap_points": 1 + }, + { + "type": "mixed_glazing", "cost": 12345, "survey": True, + "description": "Install double glazing to north facing windows and secondary glazing to the " + "remaining windows at the front of the building", + "sap_points": 3 + }, + {"type": "trickle_vents", "cost": 500, "survey": True}, + {"type": "suspended_floor_insulation", "cost": None, "survey": True, "sap_points": 2}, + {"type": "internal_wall_insulation", "cost": None, "survey": True, "sap_points": 5}, + ] + }, + { + # 8 Grove Mansions + "uprn": 10024087855, + "recommendations": [ + {"type": "draught_proofing", "cost": 123, "survey": True, "sap_points": 2}, + { + "type": "mixed_glazing", "cost": 12345, "survey": True, + "description": "Install double glazing to north facing windows and secondary glazing to the " + "remaining windows at the front of the building", + "sap_points": 4 + }, + {"type": "trickle_vents", "cost": 500, "survey": True}, + {"type": "low_energy_lighting", "cost": None, "survey": True, "sap_points": 0}, + {"type": "internal_wall_insulation", "cost": None, "survey": True, 'sap_points': 5}, + ] + }, + { + # 9 Grove Mansions + "uprn": 121016128, + "recommendations": [ + {"type": "draught_proofing", "cost": 123, "survey": True, "sap_points": 1}, + { + "type": "mixed_glazing", "cost": 12345, "survey": True, + "description": "Install double glazing to north facing windows and secondary glazing to the " + "remaining windows at the front of the building", + "sap_points": 3 + }, + {"type": "trickle_vents", "cost": 500, "survey": True}, + {"type": "low_energy_lighting", "cost": None, "survey": True, "sap_points": 1}, + {"type": "suspended_floor_insulation", "cost": None, "sap_points": 1}, + {"type": "internal_wall_insulation", "cost": None, "survey": True, "sap_points": 6}, + ] + }, + { + # 5 Grove Mansions + "uprn": 121016124, + "recommendations": [ + { + "type": "mixed_glazing", "cost": 12345, "survey": True, + "description": "Install double glazing to north facing windows and secondary glazing to the " + "remaining windows at the front of the building", + "sap_points": 5 + }, + {"type": "trickle_vents", "cost": 500, "survey": True}, + {"type": "low_energy_lighting", "cost": None, "survey": True, "sap_points": 2}, + {"type": "internal_wall_insulation", "cost": None, "survey": True, "sap_points": 8}, + ] + }, + { + # 14 Grove Mansions + "uprn": 121016117, + "recommendations": [ + {"type": "draught_proofing", "cost": 123, "survey": True, "sap_points": 1}, + { + "type": "mixed_glazing", "cost": 12345, "survey": True, + "description": "Install double glazing to north facing windows and secondary glazing to the " + "remaining windows at the front of the building", + "sap_points": 4 + }, + {"type": "trickle_vents", "cost": 500, "survey": True}, + {"type": "low_energy_lighting", "cost": None, "survey": True, "sap_points": 1}, + {"type": "internal_wall_insulation", "cost": None, "survey": True, "sap_points": 6}, + ] + }, + { + # 19 Grove Mansions + "uprn": 10024087902, + "recommendations": [ + {"type": "low_energy_lighting", "cost": None, "survey": True, "sap_points": 0}, + {"type": "internal_wall_insulation", "cost": None, "survey": True, "sap_points": 2}, + {"type": "room_roof_insulation", "cost": None, "survey": True, "sap_points": 16}, + ] + }, + ] + + asset_list = [ + { + "uprn": 121016121, "address": "", "postcode": "" + }, + { + "uprn": 10024087855, "address": "", "postcode": "" + }, + { + "uprn": 121016128, "address": "", "postcode": "" + }, + { + "uprn": 121016124, "address": "", "postcode": "" + }, + { + "uprn": 121016117, "address": "", "postcode": "" + }, + { + "uprn": 10024087902, "address": "", "postcode": "" + }, + ] + asset_list = pd.DataFrame(asset_list) + + filename = f"{8}/{portfolio_id}/asset_list.csv" + save_csv_to_s3( + dataframe=asset_list, + bucket_name="retrofit-plan-inputs-dev", + file_name=filename + ) + + # TODO Create asset list + # TODO: Store asset list & non_intrusive_recommendations + # Store non-invasive recommendations in S3 + non_invasive_recommendations_filename = f"{8}/{portfolio_id}/non_invasive_recommendations.json" + save_csv_to_s3( + dataframe=pd.DataFrame(non_intrusive_recommendations), + bucket_name="retrofit-plan-inputs-dev", + file_name=non_invasive_recommendations_filename + ) + + # This is the first scenario which includes the first batch of recommendations + body1 = { + "portfolio_id": str(portfolio_id), + "housing_type": "Private", + "goal": "Increasing EPC", + "goal_value": "A", + "trigger_file_path": filename, + "already_installed_file_path": "", + "patches_file_path": "", + "non_invasive_recommendations_file_path": non_invasive_recommendations_filename, + "inclusions": [ + "draught_proofing", "mixed_glazing", "trickle_vents", "low_energy_lighting", + ], + "budget": None, + "scenario_name": "Quick wins - do now while tenanted", + "multi_plan": True, + } + + # This is the second scenario which includes the second batch of recommendations + body2 = { + "portfolio_id": str(portfolio_id), + "housing_type": "Private", + "goal": "Increasing EPC", + "goal_value": "A", + "trigger_file_path": filename, + "already_installed_file_path": "", + "patches_file_path": "", + "non_invasive_recommendations_file_path": non_invasive_recommendations_filename, + "inclusions": [ + "draught_proofing", + "mixed_glazing", + "trickle_vents", + "low_energy_lighting", + "suspended_floor_insulation", + "internal_wall_insulation" + ], + "budget": None, + "scenario_name": "Do when void", + "multi_plan": True, + } + + print(body1) + print(body2) diff --git a/etl/xml_survey_extraction/XmlParser.py b/etl/xml_survey_extraction/XmlParser.py index 0bc3d56b..ffe191a4 100644 --- a/etl/xml_survey_extraction/XmlParser.py +++ b/etl/xml_survey_extraction/XmlParser.py @@ -9,6 +9,7 @@ from etl.xml_survey_extraction.pcdb import heating_data PROPERTY_TYPE_LOOKUP = { "0": "House", "House": "House", + "2": "Flat" } @@ -106,6 +107,8 @@ class XmlParser: BUILT_FORM_MAP = { "1": "Detached", + "3": "End-Terrace", + "4": "Mid-Terrace", } GLAZED_AREA_MAP = { @@ -121,7 +124,9 @@ class XmlParser: } TENURE_MAP = { - '1': "Owner-occupied" + "1": "Owner-occupied", + "2": "Rented (social)", + "3": "Rented (private)", } TARIFF_MAP = { @@ -179,6 +184,39 @@ class XmlParser: # Put together all of the additional data we capture self.extract_additional_data() + def _parse_heat_loss_corridor(self): + hlc_lookup = {"2": "unheated corridor", "Unheated": "unheated corridor"} + if self.is_lig: + heat_loss_corridor = self.get_node_value('Heat-Loss-Corridor') + else: + # For some reason, this tag is spelt incorrectly in the rdsap xml + heat_loss_corridor = self.get_node_value('FlatCoridor') + return hlc_lookup[heat_loss_corridor] + + def _parse_heat_loss_corridor_length(self): + if self.is_lig: + return self.get_node_value('Unheated-Corridor-Length') + return self.get_node_value('FlatShelteredWallLength') + + def _parse_flat_storey_count(self): + # in the EPR the tag is Storeys + if self.is_lig: + storeys = None + else: + storeys = self.get_node_value('Storeys') + return storeys + + def _parse_flat_top_storey(self): + if self.is_lig: + return self.get_node_value('Top-Storey') + return None + + def _parse_floor_level(self): + if self.is_lig: + flat_details = self.xml.getElementsByTagName('SAP-Flat-Details')[0] + return flat_details.getElementsByTagName("Level")[0].firstChild.nodeValue + return None + def extract_epc(self): if self.floor_dimensions is None: @@ -190,15 +228,18 @@ class XmlParser: property_type = self.get_property_type() if property_type == "Flat": - raise NotImplementedError( - "Need to handle: heat-loss-corridor, unheated-corridor-length, flat-storey-count, flat-top-storey, " - "floor-level" - ) - heat_loss_corridor = "NO DATA!" - unheated_corridor_length = "" - flat_storey_count = "" - flat_top_storey = "" - floor_level = "NO DATA!" + heat_loss_corridor = self._parse_heat_loss_corridor() + unheated_corridor_length = self._parse_heat_loss_corridor_length() + flat_storey_count = self._parse_flat_storey_count() + flat_top_storey = self._parse_flat_top_storey() + floor_level = self._parse_floor_level() + + else: + heat_loss_corridor = "NO DATA!" + unheated_corridor_length = "" + flat_storey_count = "" + flat_top_storey = "" + floor_level = "NO DATA!" floor_height = np.mean([ float(x['room_height']) for x in self.floor_dimensions if @@ -384,9 +425,17 @@ class XmlParser: } cylinder_insulation_type = { + None: "", "1": "Foam", } + cylinder_insulation_thickness = int( + self.get_node_value('Cylinder-Insulation-Thickness') + ) if self.get_node_value('Cylinder-Insulation-Thickness') else None + + cylinder_thermostat = boolean_lookup[self.get_node_value('Cylinder-Thermostat')] \ + if self.get_node_value('Cylinder-Thermostat') else None + self.additional_data = { "file_location": self.filekey, "surveyor_name": self.surveyor_name, @@ -408,8 +457,8 @@ class XmlParser: "percent_draftproofed": int(self.get_node_value('Percent-Draughtproofed')), "has_hot_water_cylinder": boolean_lookup[self.get_node_value('Has-Hot-Water-Cylinder')], "cylinder_insulation_type": cylinder_insulation_type[self.get_node_value('Cylinder-Insulation-Type')], - "cylinder_insulation_thickness": int(self.get_node_value('Cylinder-Insulation-Thickness')), - "cylinder_thermostat": boolean_lookup[self.get_node_value('Cylinder-Thermostat')], + "cylinder_insulation_thickness": cylinder_insulation_thickness, + "cylinder_thermostat": cylinder_thermostat, "main_dwelling_ground_floor_area": float(main_dwelling_ground_floor_area), "number_of_windows": int(number_of_windows), "windows_area": float(windows_area), @@ -471,6 +520,13 @@ class XmlParser: if not property_type: property_type = self.xml.getElementsByTagName('PropertyType1') + if len(property_type) > 1: + property_types = {PROPERTY_TYPE_LOOKUP[p.firstChild.nodeValue] for p in property_type} + if len(property_types) > 1: + raise ValueError("Multiple property types found") + + return property_types.pop() + return PROPERTY_TYPE_LOOKUP[property_type[0].firstChild.nodeValue] def get_sap(self): @@ -683,6 +739,30 @@ class XmlParser: self.perimeter = self.heat_loss_perimeter + self.party_wall_length + @staticmethod + def _parse_windows_content(window, glazing_type_lookup, orientation_lookup): + + # There may not be a pvc frame + pvc_frame = window.getElementsByTagName("PVC-Frame") + pvc_frame = pvc_frame[0].firstChild.nodeValue if pvc_frame else None + + # There may not be a glazing gap for single glazed windows + glazing_gap = window.getElementsByTagName("Glazing-Gap") + glazing_gap = glazing_gap[0].firstChild.nodeValue if glazing_gap else None + + parsed = { + "window_location": window.getElementsByTagName("Window-Location")[0].firstChild.nodeValue, + "window_area": window.getElementsByTagName("Window-Area")[0].firstChild.nodeValue, + "window_type": window.getElementsByTagName("Window-Type")[0].firstChild.nodeValue, + "glazing_type": glazing_type_lookup[ + window.getElementsByTagName("Glazing-Type")[0].firstChild.nodeValue + ], + "pvc_frame": pvc_frame, + "glazing_gap": glazing_gap, + "orientation": orientation_lookup[window.getElementsByTagName("Orientation")[0].firstChild.nodeValue] + } + return parsed + def get_windows(self): """ Extracts data about the windows in the property, including the number of windows and the window type. @@ -692,7 +772,8 @@ class XmlParser: sap_windows = self.xml.getElementsByTagName("SAP-Windows")[0].getElementsByTagName("SAP-Window") glazing_type_lookup = { - "3": "double glazing, unknown install date" + "3": "double glazing, unknown install date", + "5": "Single glazing", } orientation_lookup = { @@ -707,15 +788,9 @@ class XmlParser: } self.windows = [ - { - "window_location": window.getElementsByTagName("Window-Location")[0].firstChild.nodeValue, - "window_area": window.getElementsByTagName("Window-Area")[0].firstChild.nodeValue, - "window_type": window.getElementsByTagName("Window-Type")[0].firstChild.nodeValue, - "glazing_type": glazing_type_lookup[ - window.getElementsByTagName("Glazing-Type")[0].firstChild.nodeValue - ], - "pvc_frame": window.getElementsByTagName("PVC-Frame")[0].firstChild.nodeValue, - "glazing_gap": window.getElementsByTagName("Glazing-Gap")[0].firstChild.nodeValue, - "orientation": orientation_lookup[window.getElementsByTagName("Orientation")[0].firstChild.nodeValue] - } for window in sap_windows + self._parse_windows_content( + window=window, + glazing_type_lookup=glazing_type_lookup, + orientation_lookup=orientation_lookup + ) for window in sap_windows ] diff --git a/etl/xml_survey_extraction/app.py b/etl/xml_survey_extraction/app.py index f5394abf..ffe6274c 100644 --- a/etl/xml_survey_extraction/app.py +++ b/etl/xml_survey_extraction/app.py @@ -12,14 +12,15 @@ logger = setup_logger() BUCKET = "retrofit-energy-assessments-dev" USER_ID = 8 +non_invasive_recommendations_filepath = "{USER_ID}/{PORTFOLIO_ID}/non_invasive_recommendations.csv" SCENARIOS = { - 86: { - "project_code": "VDE001", + 101: { + "project_code": "VEC001", "surveyor": "JAFFERSONS ENERGY CONSULTANTS", "bodies": [ # Scenario A: Cavity wall insulation { - "portfolio_id": str(86), + "portfolio_id": str(101), "housing_type": "Private", "goal": "Increasing EPC", "goal_value": "A", @@ -27,14 +28,16 @@ SCENARIOS = { "already_installed_file_path": "", "patches_file_path": "", "non_invasive_recommendations_file_path": "", - "exclusions": ["floor_insulation", "fireplace", "solar_pv", "heating", 'lighting'], + "inclusions": [ + "draught_proofing", "secondary_glazing", "trickle_vents", "low_energy_lighting", + ], "budget": None, - "scenario_name": "Low Hanging Fruit", + "scenario_name": "Quick wins - do now while tenanted", "multi_plan": True, }, # Scenario B: CWI, Solar PV, AHSP { - "portfolio_id": str(86), + "portfolio_id": str(101), "housing_type": "Private", "goal": "Increasing EPC", "goal_value": "A", @@ -42,66 +45,97 @@ SCENARIOS = { "already_installed_file_path": "", "patches_file_path": "", "non_invasive_recommendations_file_path": "", - "exclusions": ["floor_insulation", "fireplace", 'lighting'], + "inclusions": [ + "draught_proofing", + "secondary_glazing", + "trickle_vents", + "low_energy_lighting", + "suspended_floor_insulation", + "internal_wall_insulation" + ], "budget": None, - "scenario_name": "Deep Retrofit", + "scenario_name": "Do when void", "multi_plan": True, }, - # Scenario C, CWI, floor insulation, PV, AHSP - { - "portfolio_id": str(86), - "housing_type": "Private", - "goal": "Increasing EPC", - "goal_value": "A", - "trigger_file_path": "", - "already_installed_file_path": "", - "patches_file_path": "", - "non_invasive_recommendations_file_path": "", - "exclusions": ["fireplace", 'lighting'], - "budget": None, - "scenario_name": "Whole House Retrofit", - "multi_plan": True, - } ] }, - 87: { - "project_code": "VDE002", - "surveyor": "JAFFERSONS ENERGY CONSULTANTS", - "bodies": [ - # Scenario A: Solar PV, AHSP - { - "portfolio_id": str(87), - "housing_type": "Private", - "goal": "Increasing EPC", - "goal_value": "A", - "trigger_file_path": "", - "already_installed_file_path": "", - "patches_file_path": "", - "non_invasive_recommendations_file_path": "", - "exclusions": ["floor_insulation", "fireplace"], - "budget": None, - "scenario_name": "Deep Retrofit", - "multi_plan": True, - }, - # Scenario B, floor insulation, PV, AHSP - { - "portfolio_id": str(87), - "housing_type": "Private", - "goal": "Increasing EPC", - "goal_value": "A", - "trigger_file_path": "", - "already_installed_file_path": "", - "patches_file_path": "", - "non_invasive_recommendations_file_path": "", - "exclusions": ["fireplace"], - "budget": None, - "scenario_name": "Whole House Retrofit", - "multi_plan": True, - } - ] - } } +# TODO: These non-intrusive recommendations should be detected from the EPRs, the scenarios and the condition report? +# For recommendations like trickle vents, we can deduce this from the condition report, depending on the +# ventilation of the room and the presence of trickle vents. +NON_INTRUSITVE_RECOMMENDATIONS = [ + { + # 2 Grove Mansions + "uprn": 121016121, + "recommendations": [ + { + "type": "draught_proofing", + "cost": None, + "survey": True + }, + {"type": "secondary_glazing", "cost": None, "survey": True}, + {"type": "trickle_vents", "cost": None, "survey": True}, + {"type": "suspended_floor_insulation", "cost": None, "survey": True}, + {"type": "internal_wall_insulation", "cost": None, "survey": True}, + ] + }, + { + # 8 Grove Mansions + "uprn": 10024087855, + "recommendations": [ + {"type": "draught_proofing", "cost": None, "survey": True}, + {"type": "secondary_glazing", "cost": None, "survey": True}, + {"type": "trickle_vents", "cost": None, "survey": True}, + {"type": "low_energy_lighting", "cost": None, "survey": True}, + {"type": "internal_wall_insulation", "cost": None, "survey": True}, + ] + }, + { + # 9 Grove Mansions + "uprn": 121016128, + "recommendations": [ + {"type": "draught_proofing", "cost": None, "survey": True}, + {"type": "secondary_glazing", "cost": None, "survey": True}, + {"type": "trickle_vents", "cost": None, "survey": True}, + {"type": "low_energy_lighting", "cost": None, "survey": True}, + {"type": "suspended_floor_insulation", "cost": None}, + {"type": "internal_wall_insulation", "cost": None, "survey": True}, + ] + }, + { + # 5 Grove Mansions + "uprn": 121016124, + "recommendations": [ + {"type": "secondary_glazing", "cost": None, "survey": True}, + {"type": "trickle_vents", "cost": None, "survey": True}, + {"type": "low_energy_lighting", "cost": None, "survey": True}, + {"type": "internal_wall_insulation", "cost": None, "survey": True}, + ] + }, + { + # 14 Grove Mansions + "uprn": 121016117, + "recommendations": [ + {"type": "draught_proofing", "cost": None, "survey": True}, + {"type": "secondary_glazing", "cost": None, "survey": True}, + {"type": "trickle_vents", "cost": None, "survey": True}, + {"type": "low_energy_lighting", "cost": None, "survey": True}, + {"type": "internal_wall_insulation", "cost": None, "survey": True}, + ] + }, + { + # 19 Grove Mansions + "uprn": 121016117, + "recommendations": [ + {"type": "low_energy_lighting", "cost": None, "survey": True}, + {"type": "secondary_glazing", "cost": None, "survey": True}, + {"type": "internal_wall_insulation", "cost": None, "survey": True}, + {"type": "room_roof_insulation", "cost": None, "survey": True}, + ] + }, +] + def main(): """ @@ -166,7 +200,7 @@ def main(): # For each property, we download the xmls and extract the data database_data = [] for uprn, xmls in assessments_map.items(): - + extracted_data = {} for xml in xmls: xml_data = read_from_s3(bucket_name=BUCKET, s3_file_name=xml) diff --git a/infrastructure/terraform/main.tf b/infrastructure/terraform/main.tf index 972722bb..9c2b7d47 100644 --- a/infrastructure/terraform/main.tf +++ b/infrastructure/terraform/main.tf @@ -176,7 +176,7 @@ module "retrofit_hotwater_kwh_predictions" { } module "retrofit_energy_assessments" { - source = "./modules/s3" + source = "./modules/s3_presignable_bucket" bucketname = "retrofit-energy-assessments-${var.stage}" allowed_origins = var.allowed_origins } diff --git a/recommendations/DraughtProofingRecommendations.py b/recommendations/DraughtProofingRecommendations.py new file mode 100644 index 00000000..197d80cc --- /dev/null +++ b/recommendations/DraughtProofingRecommendations.py @@ -0,0 +1,56 @@ +from backend.Property import Property + + +class DraughtProofingRecommendations: + + def __init__(self, property_instance: Property): + self.property = property_instance + + self.recommendation = [] + + def recommend(self): + """ + In some cases, we can identify the need for draught proofing from the EPC recommendations, however the initial + implementation of this class will just assume that we are picking up a non-invasive recommendation from the + survey + """ + + # For the moment, draught proofing doesn't have a phase impact + + draught_proofing_recommendation_config = next( + (r for r in self.property.non_invasive_recommendations if + r["type"] == "draught_proofing"), + {} + ) + + if not draught_proofing_recommendation_config: + return + + description = ( + "Draught proof doors and windows to improve energy efficiency" if + not draught_proofing_recommendation_config.get("description") + else draught_proofing_recommendation_config["description"] + ) + + # We recommend installing two mechanical ventilation systems + self.recommendation = [ + { + "phase": None, + "parts": [], + "type": "draught_proofing", + "description": description, + "starting_u_value": None, + "new_u_value": None, + "already_installed": False, + "sap_points": draught_proofing_recommendation_config["sap_points"], + "heat_demand": 0, + "kwh_savings": 0, + "co2_equivalent_savings": 0, + "energy_cost_savings": 0, + "total": draught_proofing_recommendation_config["cost"], + # We use a very simple and rough estimate of 4 hours per unit + "labour_hours": draught_proofing_recommendation_config.get("labour_hours", 8), + "labour_days": draught_proofing_recommendation_config.get("labour_days", 1), # Assume 8 hour day + "survey": True + } + ] diff --git a/recommendations/FloorRecommendations.py b/recommendations/FloorRecommendations.py index c63d45c2..a1f63f96 100644 --- a/recommendations/FloorRecommendations.py +++ b/recommendations/FloorRecommendations.py @@ -5,6 +5,7 @@ import pandas as pd from BaseUtility import Definitions from datatypes.enums import QuantityUnits +from backend.app.plan.schemas import MEASURE_MAP from backend.Property import Property from recommendations.recommendation_utils import ( r_value_per_mm_to_u_value, calculate_u_value_uplift, is_diminishing_returns, update_lowest_selected_u_value, @@ -63,14 +64,13 @@ class FloorRecommendations(Definitions): ] ] - self.exposed_floor_insulation_materials = [ - part for part in materials if part["type"] == "exposed_floor_insulation" - ] + def recommend(self, phase=0, measures=None): - # TODO: To be completed - self.exposed_floor_non_insulation_materials = [] + measures = MEASURE_MAP["floor_insulation"] if measures is None else measures + + if not measures: + return - def recommend(self, phase=0): u_value = self.property.floor["thermal_transmittance"] property_type = self.property.data["property-type"] floor_area = self.property.insulation_floor_area @@ -124,7 +124,7 @@ class FloorRecommendations(Definitions): self.property.floor["is_suspended"] or self.property.floor["is_to_unheated_space"] or self.property.floor["is_to_external_air"] - ): + ) and "suspended_floor_insulation" in measures: # Given the U-value, we recommend underfloor insulation self.recommend_floor_insulation( phase=phase, @@ -134,7 +134,7 @@ class FloorRecommendations(Definitions): ) return - if self.property.floor["is_solid"]: + if self.property.floor["is_solid"] and "solid_floor_insulation" in measures: # Given the U-value, we recommend solid floor insulation options which are usually solid foam self.recommend_floor_insulation( u_value=u_value, diff --git a/recommendations/HeatingRecommender.py b/recommendations/HeatingRecommender.py index 78dce329..dc433806 100644 --- a/recommendations/HeatingRecommender.py +++ b/recommendations/HeatingRecommender.py @@ -1,6 +1,7 @@ from recommendations.Costs import Costs, BOILER_UPGRADE_SCHEME_ASHP_VALUE from recommendations.recommendation_utils import check_simulation_difference, override_costs from backend.Property import Property +from backend.app.plan.schemas import MEASURE_MAP from etl.epc_clean.epc_attributes.MainheatAttributes import MainHeatAttributes from etl.epc_clean.epc_attributes.HotWaterAttributes import HotWaterAttributes from etl.epc_clean.epc_attributes.MainFuelAttributes import MainFuelAttributes @@ -28,7 +29,7 @@ class HeatingRecommender: self.property.main_heating["clean_description"] in self.ELECTRIC_HEATING_DESCRIPTIONS ) - def is_high_heat_retention_valid(self, ashp_only_heating_recommendation, exclusions): + def is_high_heat_retention_valid(self, ashp_only_heating_recommendation, measures): """ Check conditions if high heat retention storage is valid :return: @@ -43,10 +44,11 @@ class HeatingRecommender: has_electric = self.has_electric_heating_description or electric_heating_assumed return ( - has_electric and (not ashp_only_heating_recommendation) and ("boiler_upgrade" not in exclusions) + has_electric and (not ashp_only_heating_recommendation) and + ("high_heat_retention_storage_heater" in measures) ) - def is_boiler_upgrade_suitable(self, exclusions, ashp_only_heating_recommendation): + def is_boiler_upgrade_suitable(self, measures, ashp_only_heating_recommendation): """ These are the conditions we apply to recommend a boiler installation :return: @@ -84,12 +86,12 @@ class HeatingRecommender: portable_heaters_has_mains ) and (not ashp_only_heating_recommendation) and - ("boiler_upgrade" not in exclusions) + ("boiler_upgrade" in measures) ) return is_valid, has_boiler - def recommend(self, has_cavity_or_loft_recommendations, phase=0, exclusions=None): + def recommend(self, has_cavity_or_loft_recommendations, phase=0, measures=None): """ Produces heating recommendations @@ -97,16 +99,18 @@ class HeatingRecommender: recommendation. If there are cavity or loft recommendations, the property would need to complete those measures before being able to get the boiler upgrade scheme benefits. The messaging in the front end would be to :param phase: indicates the phase of the retrofit programme - :param exclusions: A list of exclusions for the recommendations + :param measures: A list of measures for the recommendations """ + measures = MEASURE_MAP["heating"] if measures is None else measures + # TODO: We could have a system flush recommendation for an existing boiler, where there is no need to replace # the boiler, but instead flushing the system will make it run more efficiently. There is a cost for this # in the Costs class, stored as SYSTEM_FLUSH_COST # TODO: Right now, we don't have recommendations for electric boilers - we should probably have one - exclusions = [] if exclusions is None else exclusions + # if we have a non-invasive ashp recommendation, we get the configuration directly from the property instance non_invasive_ashp_recommendation = next( (r for r in self.property.non_invasive_recommendations if r["type"] == "air_source_heat_pump"), {"suitable": True} @@ -122,7 +126,7 @@ class HeatingRecommender: # This first iteration of the recommender will provide very basic recommendation # We recommend heating controls based on the main heating system - hhr_valid = self.is_high_heat_retention_valid(ashp_only_heating_recommendation, exclusions) + hhr_valid = self.is_high_heat_retention_valid(ashp_only_heating_recommendation, measures) if hhr_valid: # Recommend high heat retention storage heaters @@ -131,7 +135,7 @@ class HeatingRecommender: self.recommend_hhr_storage_heaters(phase=phase, system_change=True, heating_controls_only=False) gas_boiler_suitable, has_boiler = self.is_boiler_upgrade_suitable( - exclusions=exclusions, ashp_only_heating_recommendation=ashp_only_heating_recommendation + measures=measures, ashp_only_heating_recommendation=ashp_only_heating_recommendation ) if gas_boiler_suitable: @@ -153,7 +157,7 @@ class HeatingRecommender: # In the future, we'll allow overrides, so that non-intrusive surveys can contradict these conditions # and either allow or prevent the recommendation of an air source heat pump - if self.property.is_ashp_valid(exclusions=exclusions) and non_invasive_ashp_recommendation["suitable"]: + if self.property.is_ashp_valid(measures=measures) and non_invasive_ashp_recommendation["suitable"]: self.recommend_air_source_heat_pump( phase=phase, has_cavity_or_loft_recommendations=has_cavity_or_loft_recommendations, diff --git a/recommendations/LightingRecommendations.py b/recommendations/LightingRecommendations.py index 1186b0a9..b9456f8d 100644 --- a/recommendations/LightingRecommendations.py +++ b/recommendations/LightingRecommendations.py @@ -66,6 +66,11 @@ class LightingRecommendations: if self.property.lighting["low_energy_proportion"] == 100: return + leds_recommendation_config = next( + (r for r in self.property.non_invasive_recommendations if r["type"] == "low_energy_lighting"), + {} + ) + number_lighting_outlets = self.property.number_lighting_outlets # Number non lel outlets @@ -79,6 +84,9 @@ class LightingRecommendations: return # Get the cost of the fittings + if leds_recommendation_config.get("cost"): + raise NotImplementedError("Costs from for low energy lighting have not been implemented") + cost_result = self.costs.low_energy_lighting( number_of_lights=number_non_lel_outlets, number_current_lel_lights=number_lighting_outlets - number_non_lel_outlets, @@ -97,6 +105,12 @@ class LightingRecommendations: cost_result = override_costs(cost_result) description = "Low energy lighting has already been installed, no further action required" + if leds_recommendation_config.get("sap_points") is not None: + # This could be zero points + sap_points = leds_recommendation_config["sap_points"] + else: + sap_points = round(2 * (number_non_lel_outlets / number_lighting_outlets), 2) + self.recommendation = [ { "phase": phase, @@ -108,13 +122,14 @@ class LightingRecommendations: "already_installed": already_installed, # For SAP points, we use the fact that lighting is usually worth 2 points and we scale this to # the proportion of lights that will be set to low energy - "sap_points": round(2 * (number_non_lel_outlets / number_lighting_outlets), 2), + "sap_points": sap_points, "kwh_savings": heat_demand_change, "co2_equivalent_savings": carbon_change, "description_simulation": { "lighting-energy-eff": "Very Good", "lighting-description": "Low energy lighting in all fixed outlets", }, - **cost_result + **cost_result, + "survey": leds_recommendation_config.get("survey", False) } ] diff --git a/recommendations/Recommendations.py b/recommendations/Recommendations.py index 4f75b30b..45498a8a 100644 --- a/recommendations/Recommendations.py +++ b/recommendations/Recommendations.py @@ -14,9 +14,11 @@ from recommendations.WindowsRecommendations import WindowsRecommendations from recommendations.HeatingRecommender import HeatingRecommender from recommendations.HotwaterRecommendations import HotwaterRecommendations from recommendations.SecondaryHeating import SecondaryHeating +from recommendations.DraughtProofingRecommendations import DraughtProofingRecommendations from backend.ml_models.AnnualBillSavings import AnnualBillSavings from backend.apis.GoogleSolarApi import GoogleSolarApi import backend.app.assumptions as assumptions +from backend.app.plan.schemas import TYPICAL_MEASURE_TYPES, SPECIFIC_MEASURES, MEASURE_MAP ASHP_COP = 3 STARTING_DUMMY_ID_VALUE = -9999 @@ -32,15 +34,24 @@ class Recommendations: property_instance: Property, materials: List, exclusions: List[str] = None, + inclusions: List[str] = None, ): """ :param property_instance: Instance of the Property class, for the home associated to property_id :param materials: List of materials to be used in the recommendations + :param exclusions: List of specific measures or measure types to exclude from recommendations. Defaulted to + None, meaning no exclusions to be applied + :param inclusions: List of specific measures of measure types to include. Defaulted to None, meaning all + measures are included """ self.property_instance = property_instance self.materials = materials self.exclusions = exclusions if exclusions else [] + self.inclusions = inclusions if inclusions else [] + + self.all_typical_measures = TYPICAL_MEASURE_TYPES + self.all_specific_measures = SPECIFIC_MEASURES self.floor_recommender = FloorRecommendations(property_instance=property_instance, materials=materials) self.wall_recomender = WallRecommendations(property_instance=property_instance, materials=materials) @@ -48,6 +59,7 @@ class Recommendations: self.ventilation_recomender = VentilationRecommendations( property_instance=property_instance, materials=materials ) + self.draught_proofing_recommender = DraughtProofingRecommendations(property_instance=property_instance) self.fireplace_recommender = FireplaceRecommendations(property_instance=property_instance) self.lighting_recommender = LightingRecommendations(property_instance=property_instance, materials=materials) self.windows_recommender = WindowsRecommendations(property_instance=property_instance, materials=materials) @@ -56,6 +68,27 @@ class Recommendations: self.hotwater_recommender = HotwaterRecommendations(property_instance=property_instance) self.secondary_heating_recommender = SecondaryHeating(property_instance=property_instance) + def find_included_measures(self): + """ + Determines the set of measures to be included in recommendations + """ + + # Generally, inclusions is a global option and will overrule specific property non-invasive recommendations. + # This is done so that we can use inclusions to specify scenarios. + + inclusions_full = [MEASURE_MAP[x] if x in MEASURE_MAP else x for x in self.inclusions] + exclusions_full = [MEASURE_MAP[x] if x in MEASURE_MAP else x for x in self.exclusions] + + if inclusions_full and exclusions_full: + # All typical measures + return self.all_specific_measures + + if inclusions_full: + return inclusions_full + + if exclusions_full: + return [m for m in self.all_specific_measures if m not in exclusions_full] + def recommend(self): """ @@ -68,19 +101,18 @@ class Recommendations: property_recommendations = [] phase = 0 + measures = self.find_included_measures() # Building Fabric - if "wall_insulation" not in self.exclusions: - self.wall_recomender.recommend(phase=phase, exclusions=self.exclusions) - if self.wall_recomender.recommendations: - property_recommendations.append(self.wall_recomender.recommendations) - phase += 1 + self.wall_recomender.recommend(phase=phase, measures=measures) + if self.wall_recomender.recommendations: + property_recommendations.append(self.wall_recomender.recommendations) + phase += 1 - if "roof_insulation" not in self.exclusions: - self.roof_recommender.recommend(phase=phase) - if self.roof_recommender.recommendations: - property_recommendations.append(self.roof_recommender.recommendations) - phase += 1 + self.roof_recommender.recommend(phase=phase, measures=measures) + if self.roof_recommender.recommendations: + property_recommendations.append(self.roof_recommender.recommendations) + phase += 1 # Ventilation recommendations # We only produce a ventilation recommendation if the property is recommended to have wall or roof @@ -90,103 +122,122 @@ class Recommendations: # real impact on the SAP score. Therefore, we don't need to include phasing for ventilation. If we # have any # wall or roof recommendations, we will ensure that ventilation is included in the simulation - if "ventilation" not in self.exclusions: - if self.wall_recomender.recommendations or self.roof_recommender.recommendations: - self.ventilation_recomender.recommend() - if self.ventilation_recomender.recommendation: - property_recommendations.append(self.ventilation_recomender.recommendation) + if ( + (self.wall_recomender.recommendations or self.roof_recommender.recommendations) and + ("ventilation" in measures) + ): + self.ventilation_recomender.recommend() + if self.ventilation_recomender.recommendation: + property_recommendations.append(self.ventilation_recomender.recommendation) - if "floor_insulation" not in self.exclusions: - self.floor_recommender.recommend(phase=phase) + if "trickle_vents" in measures: + # This is a recommendatin that typically comes from an energy assessment + trickle_vents_rec = self.ventilation_recomender.recommend_trickle_vents() + if trickle_vents_rec: + property_recommendations.append(trickle_vents_rec) + + if "draught_proofing" in measures: + # This is a recommendation that in some instances we can recommend, by deducing it from the SAP + # recommendations, however we will implement this later + self.draught_proofing_recommender.recommend() + if self.draught_proofing_recommender.recommendation: + property_recommendations.append(self.draught_proofing_recommender.recommendation) + + if "floor_insulation" in measures: + self.floor_recommender.recommend(phase=phase, measures=measures) if self.floor_recommender.recommendations: property_recommendations.append(self.floor_recommender.recommendations) phase += 1 - if "windows" not in self.exclusions: + if "windows" in measures: self.windows_recommender.recommend(phase=phase) if self.windows_recommender.recommendation: property_recommendations.append(self.windows_recommender.recommendation) phase += 1 - if "fireplace" not in self.exclusions: + if "mixed_glazing" in measures: + # This is a recommendation that comes exclusively from an energy assessment + mixed_glazing_rec = self.windows_recommender.recommend_mixed_glazing(phase=phase) + if mixed_glazing_rec: + property_recommendations.append(mixed_glazing_rec) + phase += 1 + + if "fireplace" in measures: self.fireplace_recommender.recommend(phase=phase) if self.fireplace_recommender.recommendation: property_recommendations.append(self.fireplace_recommender.recommendation) phase += 1 - # Heating and Electical systems - if "heating" not in self.exclusions: + cavity_or_loft_recommendations = [ + r for r in self.wall_recomender.recommendations + self.roof_recommender.recommendations + if r["type"] in ["cavity_wall_insulation", "loft_insulation"] + ] + has_cavity_or_loft_recommendations = len(cavity_or_loft_recommendations) > 0 - cavity_or_loft_recommendations = [ - r for r in self.wall_recomender.recommendations + self.roof_recommender.recommendations - if r["type"] in ["cavity_wall_insulation", "loft_insulation"] - ] - has_cavity_or_loft_recommendations = len(cavity_or_loft_recommendations) > 0 + self.heating_recommender.recommend( + phase=phase, + measures=measures, + has_cavity_or_loft_recommendations=has_cavity_or_loft_recommendations, + ) + if ( + self.heating_recommender.heating_recommendations or + self.heating_recommender.heating_control_recommendations + ): - self.heating_recommender.recommend( - phase=phase, - has_cavity_or_loft_recommendations=has_cavity_or_loft_recommendations, - exclusions=self.exclusions - ) - if ( - self.heating_recommender.heating_recommendations or - self.heating_recommender.heating_control_recommendations - ): - - # We split into first and second phase recommendations - first_phase_recommendations = [ - r for r in ( - self.heating_recommender.heating_recommendations + - self.heating_recommender.heating_control_recommendations - ) - if r["phase"] == phase - ] - second_phase_recommendations = [ - r for r in ( - self.heating_recommender.heating_recommendations + - self.heating_recommender.heating_control_recommendations - ) - if r["phase"] == phase + 1 - ] - - if first_phase_recommendations: - property_recommendations.append(first_phase_recommendations) - - if second_phase_recommendations: - property_recommendations.append(second_phase_recommendations) - - # We check if we have distinct heating and heating controls recommendations - # If so, we increment by 2 (one of the heating system, one for the heating controls) - # otherwise we incremenet by 1 - max_used_phase = max( - [rec["phase"] for rec in - self.heating_recommender.heating_recommendations + - self.heating_recommender.heating_control_recommendations] + # We split into first and second phase recommendations + first_phase_recommendations = [ + r for r in ( + self.heating_recommender.heating_recommendations + + self.heating_recommender.heating_control_recommendations ) - amount_to_increment = max_used_phase - phase + 1 - phase += amount_to_increment + if r["phase"] == phase + ] + second_phase_recommendations = [ + r for r in ( + self.heating_recommender.heating_recommendations + + self.heating_recommender.heating_control_recommendations + ) + if r["phase"] == phase + 1 + ] + + if first_phase_recommendations: + property_recommendations.append(first_phase_recommendations) + + if second_phase_recommendations: + property_recommendations.append(second_phase_recommendations) + + # We check if we have distinct heating and heating controls recommendations + # If so, we increment by 2 (one of the heating system, one for the heating controls) + # otherwise we incremenet by 1 + max_used_phase = max( + [rec["phase"] for rec in + self.heating_recommender.heating_recommendations + + self.heating_recommender.heating_control_recommendations] + ) + amount_to_increment = max_used_phase - phase + 1 + phase += amount_to_increment # Hot water - if "hot_water" not in self.exclusions: + if "hot_water" in measures: self.hotwater_recommender.recommend(phase=phase) if self.hotwater_recommender.recommendations: property_recommendations.append(self.hotwater_recommender.recommendations) phase += 1 - if "lighting" not in self.exclusions: + if "low_energy_lighting" in measures: self.lighting_recommender.recommend(phase=phase) if self.lighting_recommender.recommendation: property_recommendations.append(self.lighting_recommender.recommendation) phase += 1 - if "secondary_heating" not in self.exclusions: + if "secondary_heating" in measures: self.secondary_heating_recommender.recommend(phase=phase) if self.secondary_heating_recommender.recommendation: property_recommendations.append(self.secondary_heating_recommender.recommendation) phase += 1 # Renewables - if "solar_pv" not in self.exclusions: + if "solar_pv" in measures: self.solar_recommender.recommend(phase=phase) if self.solar_recommender.recommendation: property_recommendations.append(self.solar_recommender.recommendation) @@ -197,13 +248,13 @@ class Recommendations: # We also need to create the representative recommendations for each recommendation type property_representative_recommendations = self.create_representative_recommendations( - property_recommendations, non_invasive_recommendations=self.property_instance.non_invasive_recommendations + property_recommendations, ) return property_recommendations, property_representative_recommendations @staticmethod - def create_representative_recommendations(property_recommendations, non_invasive_recommendations): + def create_representative_recommendations(property_recommendations): """ This method will create a representative recommendation for each recommendation type In order to create a representative recommendation, we choose the recommendation that has: @@ -220,12 +271,10 @@ class Recommendations: # If the property was initially surveyed as filled, but the cavity was only partially filled, we don't # want to include the cavity wall insulation recommendation in the defaults - # if (recommendations_by_type[0].get("type") == "cavity_wall_insulation") and ( - # "cavity_surveyed_as_filled_is_partial" in non_invasive_recommendations - # ): - # continue - if recommendations_by_type[0].get("type") == "mechanical_ventilation": + if recommendations_by_type[0].get("type") in [ + "mechanical_ventilation", "trickle_vents", "draught_proofing" + ]: continue has_u_value = recommendations_by_type[0].get("new_u_value") is not None @@ -257,7 +306,10 @@ class Recommendations: elif not has_u_value and has_sap_points: # Sort the options by the cost per SAP point improvement - the lower the better for rec in recommendations: - rec["efficiency"] = rec["total"] / rec["sap_points"] + if rec["sap_points"] == 0: + rec["efficiency"] = 0 + else: + rec["efficiency"] = rec["total"] / rec["sap_points"] elif has_rank: # Sort the options by rank - the lower the better for rec in recommendations: @@ -394,8 +446,9 @@ class Recommendations: impact_summary = [] for recommendations_by_type in property_recommendations: for rec in recommendations_by_type: - if rec["type"] == "mechanical_ventilation": - # We don't have a percieved sap impact of mechanical ventilation + if rec["type"] in ["mechanical_ventilation", "trickle_vents", "draught_proofing"]: + # We don't have a percieved sap impact of mechanical ventilation or trickle vents, and we don't + # have the capacity to score draught proofing continue phase_energy_efficiency_metrics = { @@ -481,8 +534,10 @@ class Recommendations: property_phase_impact["carbon"], rec["co2_equivalent_savings"] ) - # Insert this information into the recommendation - rec["sap_points"] = property_phase_impact["sap"] + # Insert this information into the recommendation. + if not rec.get("survey", False): + rec["sap_points"] = property_phase_impact["sap"] + rec["co2_equivalent_savings"] = property_phase_impact["carbon"] rec["heat_demand"] = property_phase_impact["heat_demand"] @@ -536,7 +591,7 @@ class Recommendations: "heating_cop": mapped["cop"], "hotwater_cop": 1 } - mapped_hotwater = DESCRIPTIONS_TO_FUEL_TYPES[hotwater_description] + mapped_hotwater = assumptions.DESCRIPTIONS_TO_FUEL_TYPES[hotwater_description] return { "heating_fuel_type": heating_fuel, "hotwater_fuel_type": mapped_hotwater["fuel"], @@ -656,18 +711,6 @@ class Recommendations: pd.isnull(kwh_impact_table["hotwater_fuel_type"]).sum()): raise Exception("Fuel type is missing") - # kwh_impact_table["heating_fuel_type"] = np.where( - # kwh_impact_table["id"] == STARTING_DUMMY_ID_VALUE, - # property_instance.heating_energy_source, - # kwh_impact_table["heating_fuel_type"] - # ) - # - # kwh_impact_table["hotwater_fuel_type"] = np.where( - # kwh_impact_table["id"] == STARTING_DUMMY_ID_VALUE, - # property_instance.hot_water_energy_source, - # kwh_impact_table["hotwater_fuel_type"] - # ) - # We now calculate the fuel cost for k in ["heating", "hotwater"]: kwh_impact_table[f"{k}_cost"] = kwh_impact_table.apply( @@ -679,7 +722,8 @@ class Recommendations: # We now deduce if any of the recommendations result in a change of fuel type for recs in property_recommendations: for rec in recs: - if rec["type"] == "mechanical_ventilation": + if rec["type"] in ["mechanical_ventilation", "trickle_vents", "draught_proofing"]: + # We cannot score the impact on draught proofing continue rec_impact = kwh_impact_table[kwh_impact_table["recommendation_id"] == rec["recommendation_id"]] diff --git a/recommendations/RoofRecommendations.py b/recommendations/RoofRecommendations.py index 5075928e..fe027371 100644 --- a/recommendations/RoofRecommendations.py +++ b/recommendations/RoofRecommendations.py @@ -1,6 +1,7 @@ import math import pandas as pd from backend.Property import Property +from backend.app.plan.schemas import MEASURE_MAP from typing import List from datatypes.enums import QuantityUnits from recommendations.recommendation_utils import ( @@ -78,13 +79,13 @@ class RoofRecommendations: return self.recommendations - def is_loft_already_insulated(self): + def is_loft_already_insulated(self, measures): """ Check if the loft is already insulated """ # If we have a non-invasive recommendation for the loft insulation, we can assume that the loft is not insulated - if "loft_insulation" in self.property.non_invasive_recommendations: + if "loft_insulation" in measures: return False return (self.insulation_thickness > self.MINIMUM_LOFT_ISULATION_MM) and self.property.roof["is_pitched"] @@ -108,11 +109,13 @@ class RoofRecommendations: return full_insulated_room_roof or room_roof_insulated_at_rafters - def recommend(self, phase): + def recommend(self, phase, measures=None): if self.property.roof["has_dwelling_above"]: return + measures = MEASURE_MAP["roof_insulation"] if measures is None else measures + u_value = self.property.roof["thermal_transmittance"] # We check if the roof is already insulated and if so, we exit @@ -120,7 +123,7 @@ class RoofRecommendations: # Building regulations part L recommend installing at least 270mm of insulation, however generally we # experience diminishing returns in terms of SAP once we go beyond around 150mm of insulation # This only holds true for pitched roofs. - if self.is_loft_already_insulated(): + if self.is_loft_already_insulated(measures): return if (self.insulation_thickness >= self.MINIMUM_FLAT_ROOF_ISULATION_MM) and self.property.roof["is_flat"]: @@ -152,20 +155,24 @@ class RoofRecommendations: ) self.estimated_u_value = u_value - if (u_value <= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE) and ( - "loft_insulation" not in self.property.non_invasive_recommendations + if (u_value <= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE) or ( + "loft_insulation" not in measures ): # The Roof is already compliant return - if self.property.roof["is_pitched"] or self.property.roof["is_flat"]: - insulation_thickness = ( - 0 if "loft_insulation" not in self.property.non_invasive_recommendations else self.insulation_thickness - ) + if (self.property.roof["is_pitched"] and "loft_insulation" in measures) or ( + self.property.roof["is_flat"] and "flat_roof_insulation" in measures + ): + insulation_thickness = 0 if "loft_insulation" not in measures else self.insulation_thickness self.recommend_roof_insulation(u_value, insulation_thickness, self.property.roof, phase) return - if self.property.roof["is_roof_room"]: + # There are cases where the property might have a room roof as the second roof, but we have a recommendation for + # it, so we allow this override + if self.property.roof["is_roof_room"] and ("room_roof_insulation" in measures) or ( + "room_roof_insulation" in [x["type"] for x in self.property.non_invasive_recommendations] + ): self.recommend_room_roof_insulation(u_value, phase) return @@ -418,6 +425,10 @@ class RoofRecommendations: } ] + rir_non_invasive_recommendation = next( + (x for x in self.property.non_invasive_recommendations if x["type"] == "room_roof_insulation"), {} + ) + # lowest_selected_u_value = None recommendations = [] for material in roof_roof_insulation_materials: @@ -442,7 +453,13 @@ class RoofRecommendations: # if new_u_value <= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE: # lowest_selected_u_value = update_lowest_selected_u_value(lowest_selected_u_value, new_u_value) - estimated_cost = cost_per_unit * self.property.insulation_floor_area + estimated_cost = ( + cost_per_unit * self.property.insulation_floor_area if + rir_non_invasive_recommendation.get("cost") is None else + rir_non_invasive_recommendation.get("cost") + ) + + sap_points = rir_non_invasive_recommendation.get("sap_points", None) # Could also be Roof room(s), ceiling insulated new_descriptin = "Pitched, insulated at rafters" @@ -480,14 +497,15 @@ class RoofRecommendations: "description": "Insulate room in roof at rafters and re-decorate", "starting_u_value": u_value, "new_u_value": None, - "sap_points": None, + "sap_points": sap_points, "simulation_config": simulation_config, "description_simulation": { "roof-description": new_descriptin, "roof-energy-eff": new_efficiency }, **cost_result, - "already_installed": already_installed + "already_installed": already_installed, + "survey": rir_non_invasive_recommendation.get("survey", None) } ) diff --git a/recommendations/VentilationRecommendations.py b/recommendations/VentilationRecommendations.py index 1120654a..34439827 100644 --- a/recommendations/VentilationRecommendations.py +++ b/recommendations/VentilationRecommendations.py @@ -81,3 +81,44 @@ class VentilationRecommendations(Definitions): "labour_days": labour_days # Assume 8 hour day } ] + + def recommend_trickle_vents(self): + """ + This is not something that we can identify completely non-invasively, however a recommendation which may come + about as a result of an energy assessment is the installation of trickle vents. This function handles that + """ + + trickle_vents_recommendation_config = next( + (r for r in self.property.non_invasive_recommendations if r["type"] == "trickle_vents"), {} + ) + + if not trickle_vents_recommendation_config: + return + + description = ( + "Install trickle vents on your windows" if + not trickle_vents_recommendation_config.get("description") + else trickle_vents_recommendation_config["description"] + ) + + return [ + { + "phase": None, + "parts": [], + "type": "trickle_vents", + "description": description, + "starting_u_value": None, + "new_u_value": None, + "already_installed": False, + "sap_points": 0, + "heat_demand": 0, + "kwh_savings": 0, + "co2_equivalent_savings": 0, + "energy_cost_savings": 0, + "total": trickle_vents_recommendation_config["cost"], + # We use a very simple and rough estimate of 4 hours per unit + "labour_hours": trickle_vents_recommendation_config.get("labour_hours", 8), + "labour_days": trickle_vents_recommendation_config.get("labour_days", 1), # Assume 8 hour day + "survey": True + } + ] diff --git a/recommendations/WallRecommendations.py b/recommendations/WallRecommendations.py index b73f187c..a0c71860 100644 --- a/recommendations/WallRecommendations.py +++ b/recommendations/WallRecommendations.py @@ -5,6 +5,7 @@ import pandas as pd from datatypes.enums import QuantityUnits from backend.Property import Property +from backend.app.plan.schemas import MEASURE_MAP from BaseUtility import Definitions from etl.epc_clean.epc_attributes.WallAttributes import WallAttributes from recommendations.recommendation_utils import ( @@ -162,7 +163,7 @@ class WallRecommendations(Definitions): ) # Test filling cavity - self.find_cavity_insulation(u_value, insulation_thickness, phase) + self.find_cavity_insulation(u_value, insulation_thickness, phase, measures) return self.recommendations @@ -190,11 +191,15 @@ class WallRecommendations(Definitions): return ewi_recommendations - def recommend(self, phase=0, exclusions=None): + def recommend(self, phase=0, measures=None): # if building built after 1990 + we're able to identify U-value + # U-value less than 0.18 and if in or close to a conversation area, # recommend internal wall insulation as a possible measure + measures = MEASURE_MAP["wall_insulation"] if measures is None else measures + if not measures: + return + u_value = self.property.walls["thermal_transmittance"] u_value = None if pd.isnull(u_value) else u_value @@ -207,7 +212,7 @@ class WallRecommendations(Definitions): or self.property.walls["is_filled_cavity"] ) and ( "cavity_extract_and_refill" - not in self.property.non_invasive_recommendations + not in measures ): return @@ -235,7 +240,7 @@ class WallRecommendations(Definitions): and (u_value >= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE) ): # Recommend insulation - self.find_insulation(u_value, phase) + self.find_insulation(u_value, phase, measures) return # We can't detect it's a cavity wall, but it was built after 1990 so likely built with insulation already @@ -259,22 +264,22 @@ class WallRecommendations(Definitions): self.estimated_u_value = u_value - if is_cavity_wall or "cavity_extract_and_refill" in self.property.non_invasive_recommendations: + if (is_cavity_wall and "cavity_wall_insulation" in measures) or "cavity_extract_and_refill" in measures: if u_value >= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE: # Test filling cavity - self.find_cavity_insulation(u_value, insulation_thickness, phase) + self.find_cavity_insulation(u_value, insulation_thickness, phase, measures) return # Remaining wall types are treated with IWI or EWI if (u_value >= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE) and self.is_suitable_for_solid_insulation(): - self.find_insulation(u_value, phase, exclusions=exclusions) + self.find_insulation(u_value, phase, measures=measures) return # If the u-value is within regulations, we don't do anything return - def find_cavity_insulation(self, u_value, insulation_thickness, phase): + def find_cavity_insulation(self, u_value, insulation_thickness, phase, measures): """ This method tests different materials to fill the cavity wall, determining which material will give us the best U-value. @@ -294,6 +299,8 @@ class WallRecommendations(Definitions): :param u_value: u_value of the starting wall :param insulation_thickness: describes the insulation level of the wall. If "below average", we have a partially filled cavity wall + :param phase: The phase of the recommendation + :param measures: The measures we're considering """ insulation_materials = pd.DataFrame(self.cavity_wall_insulation_materials) @@ -328,7 +335,7 @@ class WallRecommendations(Definitions): is_extraction_and_refill = ( "cavity_extract_and_refill" - in self.property.non_invasive_recommendations + in measures ) cost_result = self.costs.cavity_wall_insulation( @@ -447,6 +454,16 @@ class WallRecommendations(Definitions): lowest_selected_u_value = None recommendations = [] + + iwi_non_invasive_recommendations = next( + (r for r in self.property.non_invasive_recommendations if r["type"] == "internal_wall_insulation"), {} + ) + ewi_non_invasive_recommendations = next( + (r for r in self.property.non_invasive_recommendations if r["type"] == "external_wall_insulation"), {} + ) + if ewi_non_invasive_recommendations: + raise NotImplementedError("Implement ewi non-invasive recommendations") + for _, insulation_material_group in insulation_materials.groupby("description"): for _, material in insulation_material_group.iterrows(): @@ -479,6 +496,15 @@ class WallRecommendations(Definitions): ) if material["type"] == "internal_wall_insulation": + + if iwi_non_invasive_recommendations.get("cost") is not None: + raise NotImplementedError( + "Not handled passing costs from non-invasive recommendations for iwi" + ) + + sap_points = iwi_non_invasive_recommendations.get("sap_points", None) + survey = iwi_non_invasive_recommendations.get("survey", False) + cost_result = self.costs.internal_wall_insulation( wall_area=self.property.insulation_wall_area, material=material.to_dict(), @@ -496,6 +522,10 @@ class WallRecommendations(Definitions): ) elif material["type"] == "external_wall_insulation": + + sap_points = ewi_non_invasive_recommendations.get("sap_points", None) + survey = ewi_non_invasive_recommendations.get("survey", False) + cost_result = self.costs.external_wall_insulation( wall_area=self.property.insulation_wall_area, material=material.to_dict(), @@ -546,19 +576,20 @@ class WallRecommendations(Definitions): "starting_u_value": u_value, "new_u_value": new_u_value, "already_installed": already_installed, - "sap_points": None, + "sap_points": sap_points, "simulation_config": simulation_config, "description_simulation": { "walls-description": new_description, "walls-energy-eff": simulation_config["walls_energy_eff_ending"] }, - **cost_result + **cost_result, + "survey": survey } ) return recommendations - def find_insulation(self, u_value, phase, exclusions=None): + def find_insulation(self, u_value, phase, measures): """ This function contains the logic for finding potential insulation measures for a property, depending on the parts available and whether the property can have external wall insulation installed @@ -570,10 +601,8 @@ class WallRecommendations(Definitions): # we separate the logic for for recommending them, therefore we don't # consider diminishing returns between the two as they are considered to be separate measures - exclusions = [] if exclusions is None else exclusions - ewi_recommendations = [] - if self.ewi_valid() and "external_wall_insulation" not in exclusions: + if self.ewi_valid() and "external_wall_insulation" in measures: ewi_recommendations = self._find_insulation( u_value=u_value, insulation_materials=pd.DataFrame( @@ -584,7 +613,7 @@ class WallRecommendations(Definitions): ) iwi_recommendations = [] - if "internal_wall_insulation" not in exclusions: + if "internal_wall_insulation" in measures: iwi_recommendations = self._find_insulation( u_value=u_value, insulation_materials=pd.DataFrame(self.internal_wall_insulation_materials), diff --git a/recommendations/WindowsRecommendations.py b/recommendations/WindowsRecommendations.py index 3826a470..ae7f7057 100644 --- a/recommendations/WindowsRecommendations.py +++ b/recommendations/WindowsRecommendations.py @@ -3,8 +3,9 @@ from typing import List import numpy as np from backend.Property import Property +from etl.epc_clean.epc_attributes.WindowAttributes import WindowAttributes from recommendations.Costs import Costs -from recommendations.recommendation_utils import override_costs +from recommendations.recommendation_utils import override_costs, check_simulation_difference class WindowsRecommendations: @@ -128,3 +129,64 @@ class WindowsRecommendations: } } ] + + def recommend_mixed_glazing(self, phase): + """ + This function will recommend mixed glazing to the property. This is a more specific recommendation than + the general windows recommendation, but is almost certain to arise from a survey + :return: + """ + + mixed_glazing_recommendation_config = next( + (r for r in self.property.non_invasive_recommendations if r["type"] == "mixed_glazing"), {} + ) + if not mixed_glazing_recommendation_config: + return + + description = ( + "Install a combination of secondary and double glazing to single glazed windows" if + not mixed_glazing_recommendation_config.get("description") + else mixed_glazing_recommendation_config["description"] + ) + + windows_ending_config = WindowAttributes("Full secondary glazing").process() + + windows_simulation_config = check_simulation_difference( + new_config=windows_ending_config, old_config=self.property.windows, prefix="windows_" + ) + + windows_simulation_config = { + **windows_simulation_config, + "windows_energy_eff_ending": "Average", + "glazed_type_ending": "secondary glazing", + "multi_glaze_proportion_ending": 100, + } + + return [ + { + "phase": phase, + "parts": [], + "type": "mixed_glazing", + "description": description, + "starting_u_value": None, + "new_u_value": None, + "already_installed": False, + "sap_points": mixed_glazing_recommendation_config["sap_points"], + "heat_demand": None, # We will predict this + "kwh_savings": None, # We will predict this + "co2_equivalent_savings": None, # We will predict this + "energy_cost_savings": None, # We will predict this + "total": mixed_glazing_recommendation_config["cost"], + # We use a very simple and rough estimate of 4 hours per unit + "labour_hours": mixed_glazing_recommendation_config.get("labour_hours", 8), + "labour_days": mixed_glazing_recommendation_config.get("labour_days", 1), # Assume 8 hour day + "survey": mixed_glazing_recommendation_config["survey"], + "simulation_config": windows_simulation_config, + "description_simulation": { + "multi-glaze-proportion": 100, + "windows-energy-eff": "Average", + "windows-description": "Multiple glazing throughout", + "glazed-type": "secondary glazing", + }, + } + ]