added additional secondary heating recommendation

This commit is contained in:
Khalim Conn-Kowlessar 2024-11-13 12:13:37 +00:00
parent 6d01490962
commit b01635ddd6
9 changed files with 169 additions and 25 deletions

2
.idea/Model.iml generated
View file

@ -7,7 +7,7 @@
<sourceFolder url="file://$MODULE_DIR$/open_uprn" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/recommendations" isTestSource="false" />
</content>
<orderEntry type="jdk" jdkName="Stonewater-wave-3" jdkType="Python SDK" />
<orderEntry type="jdk" jdkName="Fastapi-backend" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PyNamespacePackagesService">

2
.idea/misc.xml generated
View file

@ -3,7 +3,7 @@
<component name="Black">
<option name="sdkName" value="Python 3.10 (backend)" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Stonewater-wave-3" project-jdk-type="Python SDK" />
<component name="ProjectRootManager" version="2" project-jdk-name="Fastapi-backend" project-jdk-type="Python SDK" />
<component name="PyCharmProfessionalAdvertiser">
<option name="shown" value="true" />
</component>

View file

@ -792,9 +792,14 @@ class GoogleSolarApi:
property_instance = [p for p in input_properties if p.id == unit["property_id"]][0]
# At this level, we check if the property is suitable for solar and if now, skip
# Or if we have a solar non-invasive recommendation
non_invasive_rec = next(
(r for r in property_instance.non_invasive_recommendations if r["type"] == "solar_pv"), {}
).get("array_wattage")
if (
(not property_instance.is_solar_pv_valid()) or
[r for r in property_instance.non_invasive_recommendations if r["type"] == "solar_pv"]
non_invasive_rec is not None
):
continue

View file

@ -394,7 +394,7 @@ async def trigger_plan(body: PlanTriggerRequest):
logger.info("Getting the inputs")
plan_input = read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path)
# Check for duplicate UPRNS
input_uprns = [x.get("uprn") for x in plan_input if "uprn" in x]
input_uprns = [x.get("uprn") for x in plan_input if "uprn" in x and x.get("uprn")]
if input_uprns:
# Check for dupes
if len(input_uprns) != len(set(input_uprns)):

View file

@ -2,6 +2,7 @@ import os
import time
import re
from etl.epc.settings import EARLIEST_EPC_DATE
from dotenv import load_dotenv
from tqdm import tqdm
import pandas as pd
@ -236,7 +237,7 @@ def caha():
address = remap_address(address)
find_epc_searcher = RetrieveFindMyEpc(address=address, postcode=postcode)
find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data()
find_epc_data = find_epc_searcher.retrieve_newest_find_my_epc_data(sap_2012_date=EARLIEST_EPC_DATE)
time.sleep(0.5)
# We need uprn
searcher = SearchEpc(
@ -249,18 +250,102 @@ def caha():
searcher.find_property(skip_os=True)
newest_epc = searcher.newest_epc
uprn = newest_epc["uprn"]
if address in ["Flat D, 11 Victoria Avenue", "Flat B, 11 Victoria Avenue"]:
uprn = None
extracted_data.append(
{
"uprn": newest_epc["uprn"],
"uprn": uprn,
**find_epc_data,
}
)
asset_list.append(
{
"uprn": newest_epc["uprn"],
"address": home["Address letter or number"],
"uprn": uprn,
"address": address,
"postcode": home["Postcode"],
"property_type": newest_epc["property-type"],
}
)
non_invasive_recommendations = [
{
"uprn": r["uprn"],
"recommendations": r["recommendations"]
} for r in extracted_data
]
# for r in non_invasive_recommendations:
# new_recommendations = []
# extracted = [r for r in extracted_data if r["uprn"] == r["uprn"]][0]
# for rec in r["recommendations"]:
# if extracted["hotwater-description"] == "Gas boiler/circulator, no cylinder thermostat":
# if rec["type"] in ["hot_water_tank_insulation", "cylinder_thermostat"]:
# continue
# rec["survey"] = False
# new_recommendations.append(rec)
# r["recommendations"] = new_recommendations
# We model the two properties separately
asset_list = pd.DataFrame(asset_list)
# Drop Flat D, 11 Victoria Avenue
asset_list1 = asset_list[asset_list["address"] != "Flat D, 11 Victoria Avenue"]
asset_list2 = asset_list[asset_list["address"] == "Flat D, 11 Victoria Avenue"]
# Store the asset list in s3
filename = f"{USER_ID}/{CAHA_PORTFOLIO_ID}/asset_list1.csv"
save_csv_to_s3(
dataframe=asset_list1,
bucket_name="retrofit-plan-inputs-dev",
file_name=filename
)
filename2 = f"{USER_ID}/{CAHA_PORTFOLIO_ID}/asset_list2.csv"
save_csv_to_s3(
dataframe=asset_list2,
bucket_name="retrofit-plan-inputs-dev",
file_name=filename2
)
# Store the non-invasive recommendations in s3
non_invasive_recommendations_filename = f"{USER_ID}/{CAHA_PORTFOLIO_ID}/non_invasive_recommendations.csv"
save_csv_to_s3(
dataframe=pd.DataFrame(non_invasive_recommendations),
bucket_name="retrofit-plan-inputs-dev",
file_name=non_invasive_recommendations_filename
)
body = {
"portfolio_id": str(CAHA_PORTFOLIO_ID),
"housing_type": "Social",
"goal": "Increasing EPC",
"goal_value": "C",
"trigger_file_path": filename,
"already_installed_file_path": "",
"patches_file_path": "",
"non_invasive_recommendations_file_path": non_invasive_recommendations_filename,
"valuation_file_path": "",
"scenario_name": "Wave 3 Packages",
"multi_plan": True,
"budget": None,
"exclusions": ["boiler_upgrade"]
}
print(body)
body2 = {
"portfolio_id": str(CAHA_PORTFOLIO_ID),
"housing_type": "Social",
"goal": "Increasing EPC",
"goal_value": "C",
"trigger_file_path": filename2,
"already_installed_file_path": "",
"patches_file_path": "",
"non_invasive_recommendations_file_path": non_invasive_recommendations_filename,
"valuation_file_path": "",
"scenario_name": "Wave 3 Packages",
"multi_plan": True,
"budget": None,
"exclusions": ["boiler_upgrade"]
}
print(body2)

View file

@ -1411,5 +1411,45 @@ def find_remaining_surveys():
assert needed.shape[0] + costed.shape[0] == surveyed.shape[0]
def append_stonewater_id():
"""
This completes an adhoc request from Stonewater to add in their organisation Reference onto the model
:return:
"""
model_proposed_sample = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater - Bid Packages WIP 13.11.24.xlsx",
sheet_name="Modelled Packages",
header=13
)
model_proposed_sample = model_proposed_sample[~pd.isnull(model_proposed_sample["Address ID"])]
model_proposed_sample["Address ID"] = model_proposed_sample["Address ID"].astype(int)
original_archetypes = pd.read_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater SHDF_3_0_Board Triage 22.05.24 "
"- Archetyped V3.1.xlsx",
header=4
)
original_archetypes = original_archetypes[~pd.isnull(original_archetypes["Address ID"])]
original_archetypes = original_archetypes[original_archetypes["Address ID"] != "Address ID"]
original_archetypes["Address ID"] = original_archetypes["Address ID"].astype(int)
matched = model_proposed_sample.merge(
original_archetypes[["Address ID", 'Org. ref.']],
on="Address ID",
how="left"
)
if pd.isnull(matched["Org. ref."]).sum():
raise ValueError("Something went wrong")
# Save as CSV
matched.to_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Stonewater/Stonewater IDs.xlsx",
sheet_name="Proposed Wave 3 Sample",
index=False
)
# if __name__ == "__main__":
# main()

View file

@ -1,3 +1,4 @@
import pandas as pd
import requests
from bs4 import BeautifulSoup
from datetime import datetime
@ -25,7 +26,7 @@ class RetrieveFindMyEpc:
self.address_cleaned = self.address.replace(",", "").replace(" ", "").lower()
def retrieve_newest_find_my_epc_data(self):
def retrieve_newest_find_my_epc_data(self, sap_2012_date=None):
"""
For a post code and address, we pull out all the required data from the find my epc website
"""
@ -188,7 +189,7 @@ class RetrieveFindMyEpc:
raise ValueError(f"Missing key: {key}")
# Finally, we format the recommendations
recommendations = self.format_recommendations(recommendations)
recommendations = self.format_recommendations(recommendations, assessment_data, sap_2012_date)
resulting_data = {
'epc_certificate': epc_certificate,
@ -205,11 +206,12 @@ class RetrieveFindMyEpc:
return resulting_data
@staticmethod
def format_recommendations(recommendations):
def format_recommendations(recommendations, assessment_data, sap_2012_date=None):
"""
This function converts the recommendations to a format that we can use in the engine as a non-intrusive survey
:param recommendations:
:return:
:param recommendations: The recommendations from the EPC
:param assessment_data: The assessment data from the EPC
:param sap_2012_date: The date of the SAP 2012 update
"""
measure_map = {
@ -246,17 +248,23 @@ class RetrieveFindMyEpc:
"Double glazing": ["double_glazing"],
}
survey = True
if sap_2012_date is not None:
certificate_date = datetime.strptime(assessment_data["Date of certificate"], "%d %B %Y")
if certificate_date < pd.to_datetime(sap_2012_date):
survey = False
formatted_recommendations = []
for rec in recommendations:
mapped = measure_map[rec["measure"]]
for measure in mapped:
formatted_recommendations.append(
{
"type": measure,
"sap_points": rec["sap_points"],
"survey": True
}
)
to_append = {
"type": measure,
"sap_points": rec["sap_points"],
"survey": survey,
}
if measure == "solar_pv":
to_append["suitable"] = True
formatted_recommendations.append(to_append)
return formatted_recommendations

View file

@ -60,15 +60,21 @@ class HotwaterRecommendations:
# If there is no system present, but access to the mains, we
has_tank_recommendation = [r for r in self.recommendations if r["type"] == "hot_water_tank_insulation"]
if (
(self.property.hotwater["heater_type"] in ["electric immersion"]) &
(self.property.data["hot-water-energy-eff"] == "Very Poor") &
(self.property.hotwater["no_system_present"] is None)
(self.property.hotwater["no_system_present"] is None) &
len(has_tank_recommendation) == 0
):
self.recommend_tank_insulation(phase=phase)
return
if self.property.hotwater["clean_description"] == "From main system, no cylinder thermostat":
has_cylinder_recommendation = [r for r in self.recommendations if r["type"] == "cylinder_thermostat"]
if ((self.property.hotwater["clean_description"] == "From main system, no cylinder thermostat") &
(len(has_cylinder_recommendation) == 0)):
self.recommend_cylinder_thermostat(phase=phase)
return

View file

@ -13,7 +13,7 @@ class SecondaryHeating:
ACCEPTED_MAINHEAT_DESCRIPTIONS = ["Boiler and radiators, mains gas"]
ACCEPTED_SECONDHEAT_DESCRIPTIONS = ["Room heaters, electric"]
# These are the heaters where works are required to remove them
FIXED_HEATER_DESCRIPTIONS = ["Room heaters, electric"]
FIXED_HEATER_DESCRIPTIONS = ["Room heaters, electric", 'Portable electric heaters (assumed)']
def __init__(self, property_instance: Property):
self.property = property_instance
@ -34,7 +34,7 @@ class SecondaryHeating:
if self.property.data['secondheat-description'] in self.FIXED_HEATER_DESCRIPTIONS:
# We have an associated cost otherwise, there is no cost
n_rooms = self.property.data['number-heated-rooms']
n_rooms = self.property.data['number-habitable-rooms'] - self.property.data['number-heated-rooms']
else:
n_rooms = 0