completing combination of available measures

This commit is contained in:
Khalim Conn-Kowlessar 2024-06-01 14:32:20 +01:00
parent dc39d6690b
commit 5a9bc15306
7 changed files with 462 additions and 14 deletions

View file

@ -93,7 +93,10 @@ class Property:
non_invasive_recommendations else []
)
# This is a list of measures that have been recommended for the property
self.measures = ast.literal_eval(measures) if measures else None
if isinstance(measures, list):
self.measures = measures
else:
self.measures = ast.literal_eval(measures) if measures else None
self.uprn = epc_record.get("uprn")
self.full_sap_epc = epc_record.get("full_sap_epc")

View file

@ -23,7 +23,7 @@ from backend.app.db.functions.recommendations_functions import (
)
from backend.app.db.models.portfolio import rating_lookup
from backend.app.dependencies import validate_token
from backend.app.plan.schemas import PlanTriggerRequest
from backend.app.plan.schemas import PlanTriggerRequest, MdsRequest
from backend.app.plan.utils import get_cleaned
from backend.app.utils import epc_to_sap_lower_bound, sap_to_epc
@ -622,7 +622,7 @@ async def trigger_plan(body: PlanTriggerRequest):
@router.post("/mds")
async def build_mds(body: PlanTriggerRequest):
async def build_mds(body: MdsRequest):
# TODO: This is a placeholder location for the MDS endpoint, which this is being assembled
logger.info("Connecting to db")
@ -633,6 +633,8 @@ async def build_mds(body: PlanTriggerRequest):
session.begin()
logger.info("Getting the inputs")
plan_input = read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path)
measure_set = body.measures
optimise_measures = measure_set is not None
cleaning_data = read_dataframe_from_s3_parquet(
bucket_name=get_settings().DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet",
@ -706,7 +708,10 @@ async def build_mds(body: PlanTriggerRequest):
# (x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
# ), {})
measures = config["measures"] if "measures" in config else None
if measure_set is None:
measures = config["measures"] if "measures" in config else None
else:
measures = measure_set
input_properties.append(
Property(
@ -738,13 +743,11 @@ async def build_mds(body: PlanTriggerRequest):
recommendations_scoring_data = []
representative_recommendations = {}
# TODO: Action the optimise_measures flat
for p in tqdm(input_properties):
p.get_components(cleaned, photo_supply_lookup, floor_area_decile_thresholds)
# [{'external_wall_insulation': 'EWI (Trad Const)'}, {'loft_insulation': 'LI'}, {'air_source_heat_pump':
# 'ASHP Htg'}, {'solar_pv': 'Solar PV'}]
mds = Mds(property_instance=p, materials=materials)
mds = Mds(property_instance=p, materials=materials, optimise_measures=optimise_measures)
property_representative_recommendations, errors = mds.build()
if errors:
@ -886,6 +889,7 @@ async def build_mds(body: PlanTriggerRequest):
results = pd.DataFrame(results)
results["sap_uplift"] = results["sap_after"] - results["sap_before"]
# results.to_excel("mds_results 30th May.xlsx")
except IntegrityError:
logger.error("Database integrity error occurred", exc_info=True)

View file

@ -52,3 +52,9 @@ class PlanTriggerRequest(BaseModel):
if v not in cls._allowed_housing_types:
raise ValueError(f"{v} is not a valid housing type")
return v
class MdsRequest(PlanTriggerRequest):
# When creating the mds report, we allow an optional list of measures to select from. If this is passed, it will
# cause the service to select the optimal package from the list of measures
measures: Optional[conlist(str, min_items=1)] = None

View file

@ -64,7 +64,7 @@ def extract_mds_measures(config):
measures.append({"district_heating_networks": "District heating networks"})
if not pd.isnull(config["Elec Storage Htrs (Out of scope -Prov sum only)"]):
measures.append({"electric_storage_heaters": "Elec Storage Htrs (Out of scope -Prov sum only)"})
measures.append({"high_heat_retention_storage_heaters": "Elec Storage Htrs (Out of scope -Prov sum only)"})
if not pd.isnull(config["Low Energy Bulbs"]):
measures.append({"low_energy_lighting": "Low Energy Bulbs"})
@ -269,3 +269,26 @@ def app():
"budget": None,
}
print(body)
# Optimised version where we specify the measures
measures = [
"external_wall_insulation",
"cavity_wall_insulation",
"loft_insulation",
"air_source_heat_pump",
"high_heat_retention_storage_heaters",
"solar_pv"
]
body = {
"portfolio_id": str(PORTFOLIO_ID),
"housing_type": "Social",
"goal": "Increase EPC",
"goal_value": "C",
"trigger_file_path": filename,
"already_installed_file_path": "",
"patches_file_path": "",
"non_invasive_recommendations_file_path": "",
"measures": measures,
"budget": None,
}

View file

@ -0,0 +1,148 @@
import os
import pandas as pd
from tqdm import tqdm
from dotenv import load_dotenv
from utils.s3 import read_excel_from_s3
from backend.SearchEpc import SearchEpc
from etl.epc_clean.epc_attributes.RoofAttributes import RoofAttributes
from recommendations.recommendation_utils import (
estimate_perimeter,
estimate_external_wall_area,
estimate_number_of_floors
)
load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
def app():
"""
This app is EPC pulling data for some properties owned by LHP
:return:
"""
# asset_list = read_excel_from_s3(
# bucket_name="retrofit-datalake-dev",
# file_key="customers/guiness/TGP CW Properties PV.xlsx",
# header_row=0
# )
asset_list = pd.read_excel("/Users/khalimconn-kowlessar/Downloads/Echo4 3.4.24.xlsx", header=0)
epc_data = []
for _, home in tqdm(asset_list.iterrows(), total=len(asset_list)):
full_address = home["ADDRESS"]
address_split = full_address.split(",")
address1 = address_split[0].strip()
postcode = address_split[-1].strip()
searcher = SearchEpc(
address1=address1,
postcode=postcode,
auth_token=EPC_AUTH_TOKEN,
os_api_key="",
property_type=None,
fast=True,
full_address=full_address
)
# Force the skipping of estimating the EPC
searcher.ordnance_survey_client.property_type = None
searcher.ordnance_survey_client.built_form = None
searcher.find_property(skip_os=True)
if searcher.newest_epc is None:
continue
epc = {
"asset_list_address": full_address,
**searcher.newest_epc.copy()
}
epc_data.append(epc)
epc_df = pd.DataFrame(epc_data)
# Retrieve just the data we need
epc_df = epc_df[
[
"asset_list_address",
"uprn",
"property-type",
"built-form",
"inspection-date",
"current-energy-rating",
"current-energy-efficiency",
"roof-description",
"walls-description",
"transaction-type",
# New fields needed
"secondheat-description",
"total-floor-area",
"construction-age-band",
"floor-height",
"number-habitable-rooms",
"mainheat-description"
]
]
asset_list = asset_list.merge(
epc_df,
how="left",
left_on=["ADDRESS"],
right_on=["asset_list_address"]
)
asset_list = asset_list.drop(columns=["asset_list_address"])
# Rename the columns
asset_list = asset_list.rename(columns={
"inspection-date": "Date of last EPC",
"current-energy-efficiency": "SAP score on register",
"current-energy-rating": "EPC rating on register",
"property-type": "Property Type",
"built-form": "Archetype",
"total-floor-area": "Property Floor Area",
"construction-age-band": "Property Age Band",
"floor-height": "Property Floor Height",
"number-habitable-rooms": "Number of Habitable Rooms",
"walls-description": "Wall Construction",
"roof-description": "Roof Construction",
"mainheat-description": "Heating Type",
"secondheat-description": "Secondary Heating",
"transaction-type": "Reason for last EPC"
})
asset_list["Estimated Number of Floors"] = asset_list.apply(
lambda x: estimate_number_of_floors(property_type=x["Property Type"]), axis=1
)
asset_list["Property Floor Area"] = asset_list["Property Floor Area"].astype(float)
asset_list["Number of Habitable Rooms"] = asset_list["Number of Habitable Rooms"].astype(float)
asset_list["Estimated Perimeter (m)"] = asset_list.apply(
lambda x: estimate_perimeter(
floor_area=x["Property Floor Area"] / x["Estimated Number of Floors"],
num_rooms=x["Number of Habitable Rooms"] / x["Estimated Number of Floors"],
), axis=1
)
asset_list["Estimated Heat Loss Perimeter (m)"] = asset_list.apply(
lambda x: estimate_external_wall_area(
num_floors=x["Estimated Number of Floors"],
floor_height=float(x["Property Floor Height"]) if x["Property Floor Height"] else 2.5,
perimeter=x["Estimated Perimeter (m)"],
built_form=x["Archetype"]
),
axis=1
)
asset_list["Roof Insulation Thickness"] = asset_list.apply(
lambda x: RoofAttributes(description=x["Roof Construction"]).process()["insulation_thickness"],
axis=1
)
# Store as an excel
filename = "LHP EPC Data pull.xlsx"
asset_list.to_excel(filename, index=False)

View file

@ -0,0 +1,148 @@
import os
import pandas as pd
from tqdm import tqdm
from dotenv import load_dotenv
from backend.SearchEpc import SearchEpc
from etl.epc_clean.epc_attributes.RoofAttributes import RoofAttributes
from recommendations.recommendation_utils import (
estimate_perimeter,
estimate_external_wall_area,
estimate_number_of_floors
)
load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
def app():
"""
This app is EPC pulling data for some properties owned by Unitas
:return:
"""
# asset_list = read_excel_from_s3(
# bucket_name="retrofit-datalake-dev",
# file_key="customers/guiness/TGP CW Properties PV.xlsx",
# header_row=0
# )
asset_list = pd.read_excel(
"/Users/khalimconn-kowlessar/Downloads/UNITAS BUNGALOWS - EPC DATA PULL.xlsx", header=0
)
epc_data = []
for _, home in tqdm(asset_list.iterrows(), total=len(asset_list)):
searcher = SearchEpc(
address1=str(home["Address Line 1"]),
postcode=home["Post Code"],
uprn=home["Property Reference"],
auth_token=EPC_AUTH_TOKEN,
os_api_key="",
property_type=None,
fast=True
)
# Force the skipping of estimating the EPC
searcher.ordnance_survey_client.property_type = None
searcher.ordnance_survey_client.built_form = None
searcher.find_property(skip_os=True)
if searcher.newest_epc is None:
continue
epc = {
"asset_list_address": home["Address Line 1"],
"asset_list_postcode": home["Post Code"],
**searcher.newest_epc.copy()
}
epc_data.append(epc)
epc_df = pd.DataFrame(epc_data)
# Retrieve just the data we need
epc_df = epc_df[
[
"asset_list_address",
"uprn",
"property-type",
"built-form",
"inspection-date",
"current-energy-rating",
"current-energy-efficiency",
"roof-description",
"walls-description",
"transaction-type",
# New fields needed
"secondheat-description",
"total-floor-area",
"construction-age-band",
"floor-height",
"number-habitable-rooms",
"mainheat-description"
]
]
asset_list = asset_list.merge(
epc_df,
how="left",
left_on=["Address Line 1"],
right_on=["asset_list_address"]
)
asset_list = asset_list.drop(columns=["asset_list_address"])
# Rename the columns
asset_list = asset_list.rename(columns={
"inspection-date": "Date of last EPC",
"current-energy-efficiency": "SAP score on register",
"current-energy-rating": "EPC rating on register",
"property-type": "EPC Property Type",
"built-form": "Archetype",
"total-floor-area": "Property Floor Area",
"construction-age-band": "Property Age Band",
"floor-height": "Property Floor Height",
"number-habitable-rooms": "Number of Habitable Rooms",
"walls-description": "Wall Construction",
"roof-description": "Roof Construction",
"mainheat-description": "Heating Type",
"secondheat-description": "Secondary Heating",
"transaction-type": "Reason for last EPC"
})
asset_list["Estimated Number of Floors"] = asset_list.apply(
lambda x: estimate_number_of_floors(property_type=x["EPC Property Type"]) if not pd.isnull(
x["EPC Property Type"]) else None,
axis=1
)
asset_list["Property Floor Area"] = asset_list["Property Floor Area"].astype(float)
asset_list["Number of Habitable Rooms"] = asset_list["Number of Habitable Rooms"].astype(float)
asset_list["Estimated Perimeter (m)"] = asset_list.apply(
lambda x: estimate_perimeter(
floor_area=x["Property Floor Area"] / x["Estimated Number of Floors"],
num_rooms=x["Number of Habitable Rooms"] / x["Estimated Number of Floors"],
) if not pd.isnull(x["uprn"]) else None, axis=1
)
asset_list["Estimated Heat Loss Perimeter (m)"] = asset_list.apply(
lambda x: estimate_external_wall_area(
num_floors=x["Estimated Number of Floors"],
floor_height=float(x["Property Floor Height"]) if x["Property Floor Height"] else 2.5,
perimeter=x["Estimated Perimeter (m)"],
built_form=x["Archetype"]
) if not pd.isnull(x["uprn"]) else None,
axis=1
)
asset_list["Roof Insulation Thickness"] = asset_list.apply(
lambda x: RoofAttributes(description=x["Roof Construction"]).process()["insulation_thickness"] if not pd.isnull(
x["uprn"]) else None,
axis=1
)
# Store as an excel
filename = "UNITAS BUNGALOWS - EPC DATA PULL - May 30tg 2024.xlsx"
asset_list.to_excel(filename, index=False)

View file

@ -1,3 +1,4 @@
import itertools
from backend.Property import Property
from recommendations.FloorRecommendations import FloorRecommendations
from recommendations.WallRecommendations import WallRecommendations
@ -18,7 +19,7 @@ class Mds:
Handles the contruction of the MDS report
"""
def __init__(self, property_instance: Property, materials):
def __init__(self, property_instance: Property, materials, optimise_measures: bool = False):
self.property_instance = property_instance
self.floor_recommender = FloorRecommendations(property_instance=property_instance, materials=materials)
@ -35,13 +36,128 @@ class Mds:
self.hotwater_recommender = HotwaterRecommendations(property_instance=property_instance)
self.secondary_heating_recommender = SecondaryHeating(property_instance=property_instance)
# This flag indicates that we wish to optimise the measures, to the property, depending on the set of measures
# we have been provided
self.optimise_measures = optimise_measures
def select_optimal_measure_set(self, measures):
# This is the set
all_considered_measures = [
'external_wall_insulation',
'cavity_wall_insulation',
'loft_insulation',
'air_source_heat_pump',
'high_heat_retention_storage_heaters',
'solar_pv'
]
# Check if our measures are within the ones we've handled
new = [m for m in measures if m not in all_considered_measures]
if new:
raise NotImplementedError("New measures - handle me")
def prune_options(options, measures):
options_pruned = []
for _group in options:
group_pruned = [m for m in _group if m in measures]
if not group_pruned:
continue
options_pruned.append(group_pruned)
return options_pruned
# For options in here, a property could only possibly have one of these
one_choice_options = [
["external_wall_insulation", "cavity_wall_insulation", "internal_wall_insulation"],
["loft_insulation", "flat_roof_insulation", "room_in_roof_insulation"],
["solid_floor_insulation", "suspended_floor_insulation"],
]
# prune one_choice_options based on the measure set considered for this property
one_choice_options_pruned = prune_options(one_choice_options, measures)
# For options in here, a property could have one or the other so all should be considered
multi_path_options = [
["air_source_heat_pump", "high_heat_retention_storage_heaters", "gas_boiler"]
]
multi_path_options_pruned = prune_options(multi_path_options, measures)
one_choice_combinations = [list(itertools.product(*one_choice_options_pruned))]
one_choice_combinations = [list(x) for sublist in one_choice_combinations for x in sublist]
multi_path_combinations = [list(itertools.product(*multi_path_options_pruned))]
multi_path_combinations = [list(x) for sublist in multi_path_combinations for x in sublist]
one_choice_flat = [item for sublist in one_choice_options_pruned for item in sublist]
multi_path_flat = [item for sublist in multi_path_options_pruned for item in sublist]
remaining_measures = [
measure for measure in measures
if measure not in one_choice_flat and measure not in multi_path_flat
]
# Combine one_choice and multi_path combinations with remaining measures
final_combinations = []
for one_choice in one_choice_combinations:
for multi_path in multi_path_combinations:
final_combinations.append([m for m in one_choice + multi_path + remaining_measures])
pruned_combinations = []
for combination in final_combinations:
pruned_measures = []
for measure in combination:
if measure not in measures:
continue
# There are certain measures where we need to
if measure == "external_wall_insulation":
# Check if the wall is solid
if self.property_instance.walls['is_solid_brick']:
pruned_measures.append(measure)
continue
if measure == "cavity_wall_insulation":
# Check if the wall is cavity
if self.property_instance.walls['is_cavity_wall']:
pruned_measures.append(measure)
continue
if measure == "loft_insulation":
# Check if the roof is suitable for loft insulation
if self.property_instance.roof["is_pitched"]:
pruned_measures.append(measure)
continue
if measure == "solid_floor_insulation":
# Check if the floor is solid
if self.property_instance.floor["is_solid"]:
pruned_measures.append(measure)
continue
if measure == "suspended_floor_insulation":
# Check if the floor is suspended
if self.property_instance.floor["is_suspended"]:
pruned_measures.append(measure)
continue
pruned_measures.append(measure)
if len(combination) != len(pruned_measures):
continue
pruned_combinations.append(pruned_measures)
# We're left with the subset of measures that are possible for this property
# These are the possible groups of measures that could be applied to this home
return pruned_combinations
def build(self):
if self.property_instance.measures is None:
raise NotImplementedError("No measures in the property - implement me")
measures = self.property_instance.measures
measure_config_list = [list(m.keys())[0] for m in measures]
if self.optimise_measures:
measure_config_list = self.select_optimal_measure_set(self.property_instance.measures)
else:
measure_config_list = [list(m.keys())[0] for m in self.property_instance.measures]
not_implemented_measures = [
"party_wall_insulation",
@ -105,7 +221,7 @@ class Mds:
recs = self.insert_recommendation_id(recs, measures, "air_source_heat_pump")
mds_recommendations.append(recs)
if "electric_storage_heaters" in measure_config_list:
if "high_heat_retention_storage_heaters" in measure_config_list:
recs = self.heating_recommender.recommend_hhr_storage_heaters(
phase=0, system_change=True, heating_controls_only=False, _return=True
)