adding the optimisation approach for mds

This commit is contained in:
Khalim Conn-Kowlessar 2024-06-03 18:17:31 +01:00
parent 5a9bc15306
commit 1eca5af64c
5 changed files with 335 additions and 37 deletions

View file

@ -742,24 +742,42 @@ async def build_mds(body: MdsRequest):
logger.info("Getting components and epc recommendations")
recommendations_scoring_data = []
representative_recommendations = {}
recommendations = {}
# TODO: Action the optimise_measures flat
for p in tqdm(input_properties):
p.get_components(cleaned, photo_supply_lookup, floor_area_decile_thresholds)
mds = Mds(property_instance=p, materials=materials, optimise_measures=optimise_measures)
property_representative_recommendations, errors = mds.build()
mds_recommendations, property_representative_recommendations, errors = mds.build()
if errors:
logger.info("Errors occurred during MDS build")
recommendations[p.id] = mds_recommendations
representative_recommendations[p.id] = property_representative_recommendations
# Build the scoring data
p.create_base_difference_epc_record(cleaned_lookup=cleaned)
recommendations_scoring_data.append(
p.simulate_all_representative_recommendations(property_representative_recommendations)
)
if optimise_measures:
for _id, mds_recs in mds_recommendations.items():
representative_ids = [r["recommendation_id"] for r in property_representative_recommendations[_id]]
simulation_mds_recs = []
for recs in mds_recs:
simulation_mds_recs.append(
[r for r in recs if r["recommendation_id"] in representative_ids]
)
p.adjust_difference_record_with_recommendations(
simulation_mds_recs, property_representative_recommendations[_id]
)
recommendations_scoring_data.extend(p.recommendations_scoring_data)
else:
recommendations_scoring_data.append(
p.simulate_all_representative_recommendations(property_representative_recommendations)
)
logger.info("Preparing data for scoring in sap change api")
recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data)

View file

@ -295,6 +295,49 @@ def main():
addresses_df2.to_excel("Places For People EPC data with surveyor.xlsx", index=False)
# Read in
df = pd.read_excel("Places For People EPC data with surveyor.xlsx")
df = df[
df["assessor_name"].isin(
[
"Arsalan Khalid", "Kieran Bradnock", "Wayne Davies", "Lindsay Sands", "Bruce Nethercot",
"Christopher Hearn", "Robert Sigerson", "Daniel Riddle", "Leroy Sands",
]
)
]
# Get the EPC
heights = []
for _, row in tqdm(df.iterrows(), total=len(df)):
searcher = SearchEpc(
address1=str(row["Matched EPC Address"]),
postcode=str(row["POSTCODE"]),
uprn=str(int(row["uprn"])),
auth_token=EPC_AUTH_TOKEN,
os_api_key="",
property_type=None,
fast=True,
)
# Force the skipping of estimating the EPC
searcher.ordnance_survey_client.property_type = None
searcher.ordnance_survey_client.built_form = None
searcher.find_property(skip_os=True)
height = {
"uprn": row["uprn"],
"floor_height": searcher.newest_epc["floor-height"]
}
heights.append(height)
df = df.merge(
pd.DataFrame(heights),
how="left",
on="uprn"
)
df.to_excel("WF surveyors with floor heights.xlsx", index=False)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,182 @@
import pandas as pd
import os
from tqdm import tqdm
from dotenv import load_dotenv
from backend.SearchEpc import SearchEpc
load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
def app():
# Read in rolling master
master = pd.read_csv(
"/Users/khalimconn-kowlessar/Downloads/UNITAS ( STOKE) MASTER ROLLING SHEET UPDATED 16.5.24 K - PASSWORD "
"PROTECTED/ECO 4 - PHASE 1-Table 1.csv"
)
master = master[master["INSTALLER"] == "SCIS"]
master = master[
[
'UPRN', 'NO.', 'Street / Block Name', 'Town/Area', 'Post Code', 'Surveyor', "SUBMISSION DATE"
]
]
master = master[~pd.isnull(master["UPRN"])]
master = master[master["UPRN"] != "NOT ON ASSET LIST"]
heights = []
eco_assessment_epcs = []
for _, row in tqdm(master.iterrows(), total=len(master)):
searcher = SearchEpc(
address1="",
postcode="",
uprn=str(int(row["UPRN"])),
auth_token=EPC_AUTH_TOKEN,
os_api_key="",
property_type=None,
fast=False,
)
# Force the skipping of estimating the EPC
searcher.ordnance_survey_client.property_type = None
searcher.ordnance_survey_client.built_form = None
searcher.find_property(skip_os=True)
if searcher.newest_epc is None:
continue
# Look for eco assessment epcs
eco_epc = [x for x in [searcher.newest_epc] + searcher.older_epcs if x['transaction-type'] == 'ECO assessment']
# Take the newest
eco_epc = sorted(eco_epc, key=lambda x: x['inspection-date'], reverse=True)
if eco_epc:
eco_assessment_epcs.append(eco_epc[0])
height = {
"uprn": row["UPRN"],
"floor_height": searcher.newest_epc["floor-height"]
}
heights.append(height)
heights_df = pd.DataFrame(heights)
eco_assessment_epcs_df = pd.DataFrame(eco_assessment_epcs)
merged_heights_df = master.merge(heights_df, left_on="UPRN", right_on="uprn", how="inner")
merged_heights_df = merged_heights_df[merged_heights_df["floor_height"] != ""]
merged_eco_assessment_epcs_df = master.merge(eco_assessment_epcs_df[["uprn", "floor-height"]], left_on="UPRN",
right_on="uprn", how="inner")
merged_eco_assessment_epcs_df["floor-height"] = merged_eco_assessment_epcs_df["floor-height"].astype(float)
merged_eco_assessment_epcs_df.groupby("Surveyor")["floor-height"].mean()
# Store
merged_heights_df.to_csv("Unitas 2022 heights - based on newest EPC.csv", index=False)
merged_eco_assessment_epcs_df.to_csv("Unitas 2022 heights - based on ECO assessment EPC.csv", index=False)
# Read in a diferent sheet
master = pd.read_excel(
"/Users/khalimconn-kowlessar/Downloads/COMMUNITY HOUSING SURVEYS WITH A POST EPC.xlsx"
)
master["row_number"] = master.index
heights = []
eco_assessment_epcs = []
expected_pre = []
expected_post = []
biggest_floor_height = []
for _, row in tqdm(master.iterrows(), total=len(master)):
full_address = ", ".join([
str(row["NO."]), row["Street / Block Name"], row["Town/Area"], row["Post Code"]
])
searcher = SearchEpc(
address1=str(row["NO."]),
postcode=str(row["Post Code"]),
auth_token=EPC_AUTH_TOKEN,
os_api_key="",
property_type=None,
fast=False,
full_address=full_address
)
# Force the skipping of estimating the EPC
searcher.ordnance_survey_client.property_type = None
searcher.ordnance_survey_client.built_form = None
searcher.find_property(skip_os=True)
if searcher.newest_epc is None:
continue
all_epcs = [searcher.newest_epc] + searcher.older_epcs
# Search for SAP 54s
sap_54s = [x for x in all_epcs if x["current-energy-efficiency"] == "54"]
sap_69s = [x for x in all_epcs if x["current-energy-efficiency"] == "69"]
heights = [float(x["floor-height"]) for x in all_epcs if x["floor-height"] != ""]
# Look for eco assessment epcs
eco_epc = [x for x in [searcher.newest_epc] + searcher.older_epcs if x['transaction-type'] == 'ECO assessment']
# Take the newest
eco_epc = sorted(eco_epc, key=lambda x: x['inspection-date'], reverse=True)
if eco_epc:
eco_assessment_epcs.append(
{
"row_number": row["row_number"],
**eco_epc[0]
}
)
if heights:
floor_height_max = max(heights)
biggest_floor_height.append(
{
"row_number": row["row_number"],
"floor_height": floor_height_max
}
)
if sap_54s:
expected_pre.append(
{
"row_number": row["row_number"],
**sap_54s[0]
}
)
if sap_69s:
expected_post.append(
{
"row_number": row["row_number"],
**sap_69s[0]
}
)
expected_pre_df = pd.DataFrame(expected_pre)
expected_post_df = pd.DataFrame(expected_post)
heights_df = pd.DataFrame(biggest_floor_height)
eco_assessment_epcs_df = pd.DataFrame(eco_assessment_epcs)
merged_heights_df = master.merge(heights_df, on="row_number", how="inner")
merged_heights_df = merged_heights_df[merged_heights_df["floor_height"] != ""]
merged_eco_assessment_epcs_df = master.merge(
eco_assessment_epcs_df[["row_number", "floor-height"]], on="row_number", how="inner"
)
merged_eco_assessment_epcs_df["floor-height"] = merged_eco_assessment_epcs_df["floor-height"].astype(float)
merged_eco_assessment_epcs_df.groupby("Surveyor")["floor-height"].mean()
# Check average floor height for social housing properties with ECO assessment EPCs in Birmingham
sample = pd.read_csv("local_data/all-domestic-certificates/domestic-E08000025-Birmingham/certificates.csv")
sample = sample[sample["TRANSACTION_TYPE"] == "ECO assessment"]
sample = sample[sample["TENURE"].isin(["rental (social)", "Rented (social)"])]
sample["FLOOR_HEIGHT"] = sample["FLOOR_HEIGHT"].astype(float)
sample["FLOOR_HEIGHT"].mean()
sample[pd.to_datetime(sample["LODGEMENT_DATE"]) >= "2022-01-01"]["FLOOR_HEIGHT"].mean()

View file

@ -1,4 +1,5 @@
import itertools
from utils.logger import setup_logger
from backend.Property import Property
from recommendations.FloorRecommendations import FloorRecommendations
from recommendations.WallRecommendations import WallRecommendations
@ -13,6 +14,8 @@ from recommendations.HotwaterRecommendations import HotwaterRecommendations
from recommendations.SecondaryHeating import SecondaryHeating
from recommendations.Recommendations import Recommendations
logger = setup_logger()
class Mds:
"""
@ -52,6 +55,16 @@ class Mds:
'solar_pv'
]
format_map = {
"external_wall_insulation": "EWI (Trad Const)",
"internal_wall_insualtion": "IWI",
"cavity_wall_insulation": "CWI",
"loft_insulation": "LI",
"air_source_heat_pump": "ASHP Htg",
"high_heat_retention_storage_heaters": "High Heat Retention Storage Heaters",
"solar_pv": "Solar PV",
}
# Check if our measures are within the ones we've handled
new = [m for m in measures if m not in all_considered_measures]
if new:
@ -144,21 +157,18 @@ class Mds:
if len(combination) != len(pruned_measures):
continue
pruned_combinations.append(pruned_measures)
pruned_measures_formatted = []
for pm in pruned_measures:
pruned_measures_formatted.append({pm: format_map[pm]})
pruned_combinations.append(pruned_measures_formatted)
# We're left with the subset of measures that are possible for this property
# These are the possible groups of measures that could be applied to this home
return pruned_combinations
def build(self):
if self.property_instance.measures is None:
raise NotImplementedError("No measures in the property - implement me")
if self.optimise_measures:
measure_config_list = self.select_optimal_measure_set(self.property_instance.measures)
else:
measure_config_list = [list(m.keys())[0] for m in self.property_instance.measures]
def _build(self, measure_config_list, measures):
not_implemented_measures = [
"party_wall_insulation",
"ground_source_heat_pump",
@ -176,114 +186,159 @@ class Mds:
mds_recommendations = []
errors = []
phase = 0
# TODO: Could use a decarator to reduce the boilerplate code - insert_recommendation_id and then the append
if "external_wall_insulation" in measure_config_list:
recs = self.wall_recommender.mds_recommend_ewi(phase=0)
recs = self.wall_recommender.mds_recommend_ewi(phase=phase)
if not recs:
raise Exception("No recommendations for external wall insulation")
recs = self.insert_recommendation_id(recs, measures, "external_wall_insulation")
mds_recommendations.append(recs)
if self.optimise_measures and len(recs):
phase += 1
if "cavity_wall_insulation" in measure_config_list:
recs = self.wall_recommender.mds_recommend_cavity_wall_insulation(phase=0)
recs = self.wall_recommender.mds_recommend_cavity_wall_insulation(phase=phase)
recs = self.insert_recommendation_id(recs, measures, "cavity_wall_insulation")
mds_recommendations.append(recs)
if self.optimise_measures and len(recs):
phase += 1
if "loft_insulation" in measure_config_list:
# Check if the roof is suitable for loft insulation
if self.property_instance.roof['is_roof_room']:
errors.append("Roof is a room")
else:
recs = self.roof_recommender.mds_loft_insulation(phase=0)
recs = self.roof_recommender.mds_loft_insulation(phase=phase)
if not recs:
raise Exception("No recommendations for loft insulation")
recs = self.insert_recommendation_id(recs, measures, "loft_insulation")
mds_recommendations.append(recs)
if self.optimise_measures and len(recs):
phase += 1
if "internal_wall_insulation" in measure_config_list:
raise Exception("check me out 4")
self.wall_recommender.recommend(phase=0)
self.wall_recommender.recommend(phase=phase)
if "suspended_floor_insulation" in measure_config_list:
raise Exception("check me out 5")
self.floor_recommender.recommend(phase=0)
self.floor_recommender.recommend(phase=phase)
if "solid_floor_insulation" in measure_config_list:
raise Exception("check me out 6")
self.floor_recommender.recommend(phase=0)
self.floor_recommender.recommend(phase=phase)
if "air_source_heat_pump" in measure_config_list:
recs = self.heating_recommender.recommend_air_source_heat_pump(
phase=0, has_cavity_or_loft_recommendations=False, _return=True
phase=phase, has_cavity_or_loft_recommendations=False, _return=True
)
recs = self.insert_recommendation_id(recs, measures, "air_source_heat_pump")
mds_recommendations.append(recs)
if self.optimise_measures and len(recs):
phase += 1
if "high_heat_retention_storage_heaters" in measure_config_list:
recs = self.heating_recommender.recommend_hhr_storage_heaters(
phase=0, system_change=True, heating_controls_only=False, _return=True
phase=phase, system_change=True, heating_controls_only=False, _return=True
)
recs = self.insert_recommendation_id(recs, measures, "electric_storage_heaters")
recs = self.insert_recommendation_id(recs, measures, "high_heat_retention_storage_heaters")
mds_recommendations.append(recs)
if self.optimise_measures and len(recs):
phase += 1
if "low_energy_lighting" in measure_config_list:
raise Exception("check me out 9")
self.lighting_recommender.recommend(phase=0)
self.lighting_recommender.recommend(phase=phase)
if "cylinder_insulation" in measure_config_list:
raise Exception("check me out 10")
self.hotwater_recommender.recommend(phase=0)
self.hotwater_recommender.recommend(phase=phase)
if "smart_controls" in measure_config_list:
raise Exception("check me out 11")
self.heating_recommender.recommend(phase=0)
self.heating_recommender.recommend(phase=phase)
if "zone_controls" in measure_config_list:
raise Exception("check me out 12")
self.heating_recommender.recommend(phase=0)
self.heating_recommender.recommend(phase=phase)
if "trvs" in measure_config_list:
raise Exception("check me out 13")
self.heating_recommender.recommend(phase=0)
self.heating_recommender.recommend(phase=phase)
if "solar_pv" in measure_config_list:
recs = self.solar_recommender.mds_recommend(phase=0, solar_pv_percentage=0.5)
recs = self.solar_recommender.mds_recommend(phase=phase, solar_pv_percentage=0.5)
recs = self.insert_recommendation_id(recs, measures, "solar_pv")
mds_recommendations.append(recs)
if self.optimise_measures and len(recs):
phase += 1
if "double_glazing" in measure_config_list:
raise Exception("check me out 15")
self.windows_recommender.recommend(phase=0)
self.windows_recommender.recommend(phase=phase)
if "mechanical_ventilation" in measure_config_list:
raise Exception("check me out 16")
self.ventilation_recomender.recommend(phase=0)
self.ventilation_recomender.recommend(phase=phase)
if "gas_boiler" in measure_config_list:
raise Exception("check me out 17")
self.heating_recommender.recommend(phase=0)
self.heating_recommender.recommend(phase=phase)
if "flat_roof_insulation" in measure_config_list:
raise Exception("check me out 18")
self.roof_recommender.recommend(phase=0)
self.roof_recommender.recommend(phase=phase)
if "room_in_roof_insulation" in measure_config_list:
raise Exception("check me out 19")
self.roof_recommender.recommend(phase=0)
self.roof_recommender.recommend(phase=phase)
property_representative_recommendations = Recommendations.create_representative_recommendations(
mds_recommendations, non_invasive_recommendations=[]
)
return property_representative_recommendations, errors
return mds_recommendations, property_representative_recommendations, errors
def build(self):
if self.property_instance.measures is None:
raise NotImplementedError("No measures in the property - implement me")
if self.optimise_measures:
measures_set = self.select_optimal_measure_set(self.property_instance.measures)
logger.info(f"Building recommendations for {len(measures_set)} combinations of measures")
mds_recommendations_map = {}
representative_recommendations_map = {}
errors_map = {}
for measures in measures_set:
measure_config_list = [list(x.keys())[0] for x in measures]
mds_recommendations, rep_recommendations, errors = self._build(
measure_config_list=measure_config_list,
measures=measures
)
if errors:
logger.info(f"Errors: {errors}")
mds_recommendations_map[str(measure_config_list)] = mds_recommendations
representative_recommendations_map[str(measure_config_list)] = rep_recommendations
errors_map[str(measure_config_list)] = errors
return mds_recommendations_map, representative_recommendations_map, errors_map
else:
measure_config_list = [list(m.keys())[0] for m in self.property_instance.measures]
return self._build(measure_config_list=measure_config_list, measures=self.property_instance.measures)
@staticmethod
def insert_recommendation_id(recommendations, measures, measure_name):
# Insert the recommendation identifier into this recommendation
measure_config = [m for m in measures if measure_name in m][0]
idx = 0
for r in recommendations:
r["recommendation_id"] = list(measure_config.values())[0]
r["recommendation_id"] = list(measure_config.values())[0] + "-" + str(idx)
idx += 1
return recommendations

View file

@ -227,7 +227,7 @@ class Recommendations:
recommendations_by_type = sorted(recommendations_by_type, key=lambda x: x["type"])
representative_recommendations = []
for type, recommendations in groupby(recommendations_by_type, key=lambda x: x["type"]):
for _type, recommendations in groupby(recommendations_by_type, key=lambda x: x["type"]):
recommendations = list(recommendations)
# We also create an efficiency key, which is used to sort the recommendations
if has_u_value: