integrated new models into router

This commit is contained in:
Khalim Conn-Kowlessar 2024-01-16 19:14:27 +00:00
parent 60744d83b1
commit 47016ef89c
3 changed files with 15 additions and 218 deletions

View file

@ -159,14 +159,13 @@ class Property(Definitions):
for i, rec in enumerate(recommendations_by_type):
recommendation_record = self.base_difference_record.df.to_dict("records")[0].copy()
scoring_dict = self.create_recommendation_scoring_data(
recommendation_record=recommendation_record, recommendation=rec,
property_id=self.id, recommendation_record=recommendation_record, recommendation=rec,
)
scoring_dict['id'] = "+".join([str(self.id), str(rec["recommendation_id"])])
self.recommendations_scoring_data.append(scoring_dict)
@staticmethod
def create_recommendation_scoring_data(recommendation_record, recommendation: dict):
def create_recommendation_scoring_data(property_id, recommendation_record, recommendation: dict):
for col in [
"walls_insulation_thickness", "floor_insulation_thickness", "roof_insulation_thickness"
@ -273,6 +272,8 @@ class Property(Definitions):
]:
raise NotImplementedError("Implement me")
recommendation_record['id'] = "+".join([str(property_id), str(recommendation["recommendation_id"])])
return recommendation_record
def get_components(self, cleaned, photo_supply_lookup, floor_area_decile_thresholds):
@ -437,9 +438,9 @@ class Property(Definitions):
"floor_height": self.floor_height,
"heat_loss_corridor": self.heat_loss_corridor["heat_loss_corridor"],
"unheated_corridor_length": self.heat_loss_corridor["length"],
"number_of_open_fireplaces": self.number_of_open_fireplaces,
"number_of_extensions": self.number_of_extensions,
"number_of_storeys": self.number_of_storeys,
"number_of_open_fireplaces": self.number_of_open_fireplaces["number_of_open_fireplaces"],
"number_of_extensions": self.number_of_extensions["number_of_extensions"],
"number_of_storeys": self.number_of_storeys["number_of_storeys"],
"mains_gas": self.mains_gas,
"energy_tariff": self.data["energy-tariff"],
"primary_energy_consumption": self.energy["primary_energy_consumption"],

View file

@ -23,7 +23,7 @@ from backend.app.db.functions.recommendations_functions import (
from backend.app.db.models.portfolio import rating_lookup
from backend.app.dependencies import validate_token
from backend.app.plan.schemas import PlanTriggerRequest
from backend.app.plan.utils import create_recommendation_scoring_data, get_cleaned
from backend.app.plan.utils import get_cleaned
from backend.app.utils import epc_to_sap_lower_bound, read_csv_from_s3, sap_to_epc
from backend.ml_models.api import ModelApi
@ -173,6 +173,8 @@ async def trigger_plan(body: PlanTriggerRequest):
"carbon_change_predictions": get_settings().CARBON_PREDICTIONS_BUCKET
}
)
# all_predictions["heat_demand_predictions"]= all_predictions["sap_change_predictions"].copy()
# all_predictions["carbon_change_predictions"] = all_predictions["sap_change_predictions"].copy()
# Insert the predictions into the recommendations and run the optimiser
logger.info("Optimising recommendations")
@ -283,6 +285,7 @@ async def trigger_plan(body: PlanTriggerRequest):
scoring_dict = {}
for rec in default_recommendations:
scoring_dict = Property.create_recommendation_scoring_data(
property_id=property_instance.id,
recommendation_record=recommendation_record,
recommendation=rec
)
@ -297,33 +300,6 @@ async def trigger_plan(body: PlanTriggerRequest):
# PERFORM SAME STEPS AGAIN - TODO: TO BE REMOVED
combined_recommendations_scoring_data = pd.DataFrame(combined_recommendations_scoring_data)
# Perform the same cleaning as in the model - first clean number of room variables though
combined_recommendations_scoring_data = EPCDataProcessor.apply_averages_cleaning(
data_to_clean=combined_recommendations_scoring_data,
cleaning_data=cleaning_data,
cols_to_merge_on=['PROPERTY_TYPE', 'BUILT_FORM', 'CONSTRUCTION_AGE_BAND', 'LOCAL_AUTHORITY'],
colnames=["NUMBER_HABITABLE_ROOMS", "NUMBER_HEATED_ROOMS"],
)
combined_recommendations_scoring_data = EPCDataProcessor.apply_averages_cleaning(
data_to_clean=combined_recommendations_scoring_data,
cleaning_data=cleaning_data,
cols_to_merge_on=COLUMNS_TO_MERGE_ON + ["LOCAL_AUTHORITY"],
).drop(columns=["LOCAL_AUTHORITY"])
combined_recommendations_scoring_data = EPCDataProcessor.clean_missings_after_description_process(
combined_recommendations_scoring_data,
ignore_cols=[
c for c in combined_recommendations_scoring_data.columns if ("thermal_transmittance" in c) or (
"insulation_thickness" in c) or ("ENERGY_EFF" in c)
]
)
combined_recommendations_scoring_data = EPCDataProcessor.clean_efficiency_variables(
combined_recommendations_scoring_data
)
model_api = ModelApi(portfolio_id=body.portfolio_id, timestamp=created_at)
all_combined_predictions = model_api.predict_all(
df=combined_recommendations_scoring_data,
bucket=get_settings().DATA_BUCKET,
@ -334,6 +310,10 @@ async def trigger_plan(body: PlanTriggerRequest):
}
)
# all_combined_predictions["heat_demand_predictions"]= all_combined_predictions["sap_change_predictions"].copy()
# all_combined_predictions["carbon_change_predictions"] = all_combined_predictions[
# "sap_change_predictions"].copy()
# We update the carbon and heat demand predictions
for property_id, property_recommendations in recommendations.items():
combined_heat_demand = all_combined_predictions["heat_demand_predictions"]

View file

@ -25,187 +25,3 @@ def get_cleaned():
cleaned = msgpack.unpackb(cleaned, raw=False)
return cleaned
def create_recommendation_scoring_data(
property: Property,
recommendation: dict,
starting_epc_data: pd.DataFrame,
ending_epc_data: pd.DataFrame,
fixed_data: pd.DataFrame,
):
"""
This wrapper function prepares data to be passed to the sap model api
:return:
"""
# TODO: This needs to be complete depracated
scoring_dict = {
"UPRN": property.data["uprn"],
"id": "+".join([str(property.id), str(recommendation["recommendation_id"])]),
"LOCAL_AUTHORITY": property.data["local-authority"],
**starting_epc_data.to_dict("records")[0],
**ending_epc_data.to_dict("records")[0],
**fixed_data.to_dict("records")[0]
}
# Set staring u-values if we don't have them
if scoring_dict["walls_thermal_transmittance"] is None:
scoring_dict["walls_thermal_transmittance"] = get_wall_u_value(
clean_description=property.walls["clean_description"],
age_band=property.age_band,
is_granite_or_whinstone=property.walls["is_granite_or_whinstone"],
is_sandstone_or_limestone=property.walls["is_sandstone_or_limestone"]
)
if scoring_dict["floor_thermal_transmittance"] is None:
scoring_dict["floor_thermal_transmittance"] = get_floor_u_value(
floor_type=property.floor_type,
area=property.floor_area,
perimeter=property.perimeter,
wall_type=property.wall_type,
insulation_thickness=property.floor["insulation_thickness"],
age_band=property.age_band,
)
if scoring_dict["roof_thermal_transmittance"] is None:
scoring_dict["roof_thermal_transmittance"] = get_roof_u_value(
insulation_thickness=property.roof["insulation_thickness"],
has_dwelling_above=property.roof["has_dwelling_above"],
is_loft=property.roof["is_loft"],
is_roof_room=property.roof["is_roof_room"],
is_thatched=property.roof["is_thatched"],
age_band=property.age_band,
is_flat=property.roof["is_flat"],
is_pitched=property.roof["is_pitched"],
is_at_rafters=property.roof["is_at_rafters"],
)
for col in [
"walls_insulation_thickness", "floor_insulation_thickness", "roof_insulation_thickness"
]:
if scoring_dict[col] is None:
scoring_dict[col] = "none"
# We update the description to indicate it's insulated
if recommendation["type"] in ["internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation"]:
# The upgrade made here is to the u-value of the walls and the description of the
# insulation thickness
scoring_dict["walls_thermal_transmittance_ending"] = recommendation["new_u_value"]
scoring_dict["walls_insulation_thickness_ending"] = "above average"
scoring_dict["walls_energy_eff_ending"] = "Good"
else:
if scoring_dict["walls_thermal_transmittance_ending"] is None:
scoring_dict["walls_thermal_transmittance_ending"] = get_wall_u_value(
clean_description=property.walls["clean_description"],
age_band=property.age_band,
is_granite_or_whinstone=property.walls["is_granite_or_whinstone"],
is_sandstone_or_limestone=property.walls["is_sandstone_or_limestone"]
)
if scoring_dict["walls_insulation_thickness_ending"] is None:
scoring_dict["walls_insulation_thickness_ending"] = "none"
# Update description to indicate it's insulate
if recommendation["type"] in ["solid_floor_insulation", "suspended_floor_insulation", "exposed_floor_insulation"]:
if len(recommendation["parts"]) > 1:
raise NotImplementedError("Have more than 1 floor insulation part - handle this case")
scoring_dict["floor_thermal_transmittance_ending"] = recommendation["new_u_value"]
# We don't really see above average for this in the training data
scoring_dict["floor_insulation_thickness_ending"] = "average"
scoring_dict["floor_energy_eff_ending"] = "Good"
else:
if scoring_dict["floor_thermal_transmittance_ending"] is None:
scoring_dict["floor_thermal_transmittance_ending"] = get_floor_u_value(
floor_type=property.floor_type,
area=property.floor_area,
perimeter=property.perimeter,
wall_type=property.wall_type,
insulation_thickness=property.floor["insulation_thickness"],
age_band=property.age_band,
)
if scoring_dict["floor_insulation_thickness_ENDING"] is None:
scoring_dict["floor_insulation_thickness_ENDING"] = "none"
if recommendation["type"] in ["loft_insulation", "room_roof_insulation", "flat_roof_insulation"]:
scoring_dict["roof_thermal_transmittance_ENDING"] = recommendation["new_u_value"]
parts = recommendation["parts"]
if len(parts) != 1:
raise ValueError("More than one part for roof insulation - investiage me")
# This is based on the values we have in the training data
valid_numeric_values = [
12, 25, 50, 75, 100, 150, 200, 250, 270, 300, 350, 400
]
proposed_depth = int(parts[0]["depth"])
if proposed_depth not in valid_numeric_values:
# Take the nearest value for scoring
proposed_depth = min(valid_numeric_values, key=lambda x: abs(x - proposed_depth))
scoring_dict["roof_insulation_thickness_ENDING"] = str(proposed_depth)
scoring_dict["ROOF_ENERGY_EFF_ENDING"] = "Very Good"
else:
# Fill missing roof u-values - this fill is not based on recommended upgrades
if scoring_dict["roof_thermal_transmittance_ENDING"] is None:
scoring_dict["roof_thermal_transmittance_ENDING"] = get_roof_u_value(
insulation_thickness=property.roof["insulation_thickness"],
has_dwelling_above=property.roof["has_dwelling_above"],
is_loft=property.roof["is_loft"],
is_roof_room=property.roof["is_roof_room"],
is_thatched=property.roof["is_thatched"],
age_band=property.age_band,
is_flat=property.roof["is_flat"],
is_pitched=property.roof["is_pitched"],
is_at_rafters=property.roof["is_at_rafters"],
)
if scoring_dict["roof_insulation_thickness_ENDING"] is None:
scoring_dict["roof_insulation_thickness_ENDING"] = "none"
if recommendation["type"] == "mechanical_ventilation":
scoring_dict["MECHANICAL_VENTILATION_ENDING"] = 'mechanical, extract only'
if recommendation["type"] == "sealing_open_fireplace":
scoring_dict["NUMBER_OPEN_FIREPLACES_ENDING"] = 0
if recommendation["type"] == "low_energy_lighting":
scoring_dict["LOW_ENERGY_LIGHTING_ENDING"] = 100
scoring_dict["LIGHTING_ENERGY_EFF_STARTING"] = "Very Good"
if recommendation["type"] == "windows_glazing":
scoring_dict["MULTI_GLAZE_PROPORTION_ENDING"] = 100
scoring_dict["WINDOWS_ENERGY_EFF_ENDING"] = "Average"
is_secondary_glazing = recommendation["is_secondary_glazing"]
if scoring_dict["glazing_type_ENDING"] == "multiple":
pass
elif scoring_dict["glazing_type_ENDING"] == "single":
scoring_dict["glazing_type_ENDING"] = "secondary" if is_secondary_glazing else "double"
elif scoring_dict["glazing_type_ENDING"] == "double":
scoring_dict["glazing_type_ENDING"] = "multiple" if is_secondary_glazing else "double"
elif scoring_dict["glazing_type_ENDING"] == "secondary":
scoring_dict["glazing_type_ENDING"] = "secondary" if is_secondary_glazing else "multiple"
elif scoring_dict["glazing_type_ENDING"] in ["triple", "high performance"]:
scoring_dict["glazing_type_ENDING"] = "multiple"
else:
raise ValueError("Invalid glazing type - implement me")
if recommendation["type"] == "solar_pv":
scoring_dict["PHOTO_SUPPLY_ENDING"] = recommendation["photo_supply"]
if recommendation["type"] not in [
"mechanical_ventilation", "sealing_open_fireplace", "low_energy_lighting",
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation",
"loft_insulation", "room_roof_insulation", "flat_roof_insulation",
"solid_floor_insulation", "suspended_floor_insulation", "exposed_floor_insulation",
"windows_glazing", "solar_pv"
]:
raise NotImplementedError("Implement me")
return scoring_dict