Merge pull request #358 from Hestia-Homes/remote-assessment

Remote assessment
This commit is contained in:
KhalimCK 2024-10-21 12:28:43 +01:00 committed by GitHub
commit 27737d82fd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 510 additions and 95 deletions

View file

@ -75,6 +75,7 @@ class Property:
postcode,
address,
epc_record,
property_valuation=None,
already_installed=None,
non_invasive_recommendations=None,
measures=None,
@ -111,6 +112,8 @@ class Property:
else:
self.measures = ast.literal_eval(measures) if measures else None
self.valuation = property_valuation
self.uprn = epc_record.get("uprn")
self.uprn_source = self.data.get("uprn-source")
@ -535,7 +538,8 @@ class Property:
"loft_insulation", "room_roof_insulation", "flat_roof_insulation",
"solid_floor_insulation", "suspended_floor_insulation",
"windows_glazing", "solar_pv", "heating", "hot_water_tank_insulation",
"heating_control", "secondary_heating", "cylinder_thermostat", "mixed_glazing"
"heating_control", "secondary_heating", "cylinder_thermostat", "mixed_glazing",
"extension_cavity_wall_insulation",
]:
raise NotImplementedError(
"Implement me, given type %s" % recommendation["type"]

View file

@ -272,23 +272,10 @@ class GoogleSolarApi:
roi_summary = []
for segment in roof_segment_summaries:
if segment["panelsCount"] < min_panels:
continue
wattage = segment["panelsCount"] * self.insights_data["solarPotential"]["panelCapacityWatts"]
generated_dc_energy = segment["yearlyEnergyDcKwh"]
ratio = generated_dc_energy / wattage
if cost_instance is None:
cost = MCS_SOLAR_PV_COST_DATA["average_cost_per_kwh"] * (wattage / 1000)
else:
cost = cost_instance.solar_pv(
n_panels=segment["panelsCount"],
has_battery=False,
n_floors=property_instance.number_of_floors,
)["total"]
roi_summary.append(
{
"segmentIndex": segment["segmentIndex"],
@ -296,7 +283,6 @@ class GoogleSolarApi:
"generated_dc_energy": generated_dc_energy,
"ratio": ratio,
"n_panels": segment["panelsCount"],
"cost": cost,
"panneled_roof_area": self.panel_area * int(segment["panelsCount"])
}
)
@ -305,10 +291,21 @@ class GoogleSolarApi:
if roi_summary.empty:
continue
if roi_summary["n_panels"].sum() < min_panels:
continue
if cost_instance is None:
total_cost = MCS_SOLAR_PV_COST_DATA["average_cost_per_kwh"] * (wattage / 1000)
else:
total_cost = cost_instance.solar_pv(
n_panels=roi_summary["n_panels"].sum(),
has_battery=False,
n_floors=property_instance.number_of_floors,
)["total"]
weighted_ratio = np.average(
roi_summary["ratio"].values, weights=roi_summary["generated_dc_energy"].values
)
total_cost = roi_summary["cost"].sum()
yearly_dc_energy = roi_summary["generated_dc_energy"].sum()
panel_performance.append(

View file

@ -4,8 +4,10 @@ PESSIMISTIC_ASHP_EFFICIENCY = 200
AVERAGE_ASHP_EFFICIENCY = 250
# Conservative estimate of the proportion of electricity that will be consumed, whereas the rest will
# be exported
# be exported. These are averages based on Google research. E.g
# https://www.nea.org.uk/who-we-are/innovation-technical-evaluation/solarpv/solarpv-batteries
SOLAR_CONSUMPTION_PROPORTION = 0.5
SOLAR_CONSUMPTION_WITH_BATTERY_PROPORTION = 0.7
# Typically, each solar panel takes up around 3.4 m2 of roof space under RdSAP. This was been verified in Elmhurst
RDSAP_AREA_PER_PANEL = 3.4

View file

@ -289,7 +289,7 @@ def create_epc_records(epc_searcher: SearchEpc, energy_assessment: dict):
}, energy_assessment_is_newer
def get_on_site_data(body: PlanTriggerRequest):
def get_request_property_data(body: PlanTriggerRequest):
"""
This function will read in the on-site data from the S3 bucket
:param body: The request body
@ -311,10 +311,18 @@ def get_on_site_data(body: PlanTriggerRequest):
bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.non_invasive_recommendations_file_path
)
return patches, already_installed, non_invasive_recommendations
valuation_data = []
if body.valuation_file_path:
valuation_data = read_csv_from_s3(
bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.valuation_file_path
)
return patches, already_installed, non_invasive_recommendations, valuation_data
def extract_property_on_site_recommendations(config, patches, already_installed, non_invasive_recommendations, uprn):
def extract_property_request_data(
config, patches, already_installed, non_invasive_recommendations, valuation_data, uprn
):
patch_has_uprn = "uprn" in patches[0] if patches else True
if patch_has_uprn:
patch = next((
@ -360,7 +368,12 @@ def extract_property_on_site_recommendations(config, patches, already_installed,
property_non_invasive_recommendations["recommendations"] = str(transformed)
return patch, property_already_installed, property_non_invasive_recommendations
property_valution = next((
float(x["value"]) for x in valuation_data if
(str(x["uprn"]) == str(uprn))
), None)
return patch, property_already_installed, property_non_invasive_recommendations, property_valution
router = APIRouter(
@ -384,7 +397,7 @@ async def trigger_plan(body: PlanTriggerRequest):
logger.info("Getting the inputs")
plan_input = read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path)
# If we have patches or overrides, we should read them in here
patches, already_installed, non_invasive_recommendations = get_on_site_data(body)
patches, already_installed, non_invasive_recommendations, valuation_data = get_request_property_data(body)
cleaning_data = read_dataframe_from_s3_parquet(
bucket_name=get_settings().DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet",
@ -412,8 +425,6 @@ async def trigger_plan(body: PlanTriggerRequest):
# We check for an energy assessment we have performed on this property:
energy_assessment = get_latest_assessment_by_uprn(session, uprn if uprn is not None else epc_searcher.uprn)
if not energy_assessment["epc"]:
continue
# Create a record in db
property_id, is_new = create_property(
session, body.portfolio_id, epc_searcher.address_clean, epc_searcher.postcode_clean,
@ -440,9 +451,14 @@ async def trigger_plan(body: PlanTriggerRequest):
epc_searcher, energy_assessment
)
patch, property_already_installed, property_non_invasive_recommendations = (
extract_property_on_site_recommendations(
config, patches, already_installed, non_invasive_recommendations, uprn
patch, property_already_installed, property_non_invasive_recommendations, property_valuation = (
extract_property_request_data(
config=config,
patches=patches,
already_installed=already_installed,
non_invasive_recommendations=non_invasive_recommendations,
valuation_data=valuation_data,
uprn=epc_searcher.uprn,
)
)
@ -462,6 +478,7 @@ async def trigger_plan(body: PlanTriggerRequest):
postcode=epc_searcher.postcode_clean,
epc_record=prepared_epc,
already_installed=property_already_installed,
property_valuation=property_valuation,
non_invasive_recommendations=property_non_invasive_recommendations,
energy_assessment=energy_assessment,
**Property.extract_kwargs(config), # TODO: Depraecate this

View file

@ -52,6 +52,9 @@ NON_INVASIVE_SPECIFIC_MEASURES = [
"draught_proofing",
"mixed_glazing", # This covers partial double glazing and secondary glazing
"cavity_extract_and_refill",
# Indicates that there is one (need to handle the case where there are multiple)
# extension that requires cavity wall insulation
"extension_cavity_wall_insulation",
]
# This allows us to extend high level categories for measures such as "wall_insulation" to the specific measures
@ -78,6 +81,7 @@ class PlanTriggerRequest(BaseModel):
already_installed_file_path: Optional[str] = None
patches_file_path: Optional[str] = None
non_invasive_recommendations_file_path: Optional[str] = None
valuation_file_path: Optional[str] = None
exclusions: Optional[conlist(str, min_items=1)] = None
inclusions: Optional[conlist(str, min_items=1)] = None

View file

@ -1,4 +1,5 @@
import numpy as np
from scipy.constants import value
class PropertyValuation:
@ -203,9 +204,12 @@ class PropertyValuation:
@classmethod
def estimate(cls, property_instance, target_epc):
value = cls.UPRN_VALUE_LOOKUP.get(property_instance.uprn)
current_value = (
property_instance.valuation if property_instance.valuation else
cls.UPRN_VALUE_LOOKUP.get(property_instance.uprn)
)
if not value:
if not current_value:
return {
"current_value": 0,
"lower_bound_increased_value": 0,
@ -235,12 +239,13 @@ class PropertyValuation:
max_increase = max(all_increases)
min_increase = min(all_increases)
avg_increase = np.mean(all_increases)
return {
"current_value": value,
"lower_bound_increased_value": value * (1 + min_increase),
"upper_bound_increased_value": value * (1 + max_increase),
"average_increased_value": value * (1 + avg_increase),
"average_increase": value * (1 + avg_increase) - value
"current_value": current_value,
"lower_bound_increased_value": current_value * (1 + min_increase),
"upper_bound_increased_value": current_value * (1 + max_increase),
"average_increased_value": current_value * (1 + avg_increase),
"average_increase": current_value * (1 + avg_increase) - current_value
}

View file

@ -417,9 +417,14 @@ def slides():
# Show more characters in a column
pd.set_option('display.max_colwidth', None)
# preparing of this data for the following 2 needs:
# 1) dataset to share with Nextgen heating
# 2) Breakdown of results by property type
def lewes_outputs():
"""
preparing of this data for the following 2 needs:
1) dataset to share with Nextgen heating
2) Breakdown of results by property type
:return:
"""
# get the asset list
asset_list = read_csv_from_s3(bucket_name="retrofit-plan-inputs-dev", filepath="8/90/pilot.csv")
@ -431,6 +436,14 @@ def slides():
)
non_intrusive_recommendations = pd.DataFrame(non_intrusive_recommendations)
# Right now this is the second version of the nehaven portfolio
portfolio_id = 90
# Look at one scenario at a time, otherwise this is agony
scenario_ids = [47, 48, 49, 50, 51]
properties_data, plans_data, recommendations_data = get_data(portfolio_id, scenario_ids)
properties_df = pd.DataFrame(properties_data)
recommendations_df = pd.DataFrame(recommendations_data)
# Unnest this
import ast
survey_recs = []
@ -502,27 +515,73 @@ def slides():
# We now pull out the recommendations impact by property type and sub type
# Exclude sealing open fireplaces
recommendations_df = recommendations_df[recommendations_df["type"] != "sealing_open_fireplace"]
# We update the type column so that if type == heating, and the description contains "air source heat pump",
# the type is "air_source_heat_pump", else if the description contains "high heat retention storage heaters", else
# if the description contains "condensing boiler, the type is updated to "boiler_upgrade"
recommendations_df["type"] = np.where(
recommendations_df["type"] == "heating",
np.where(
recommendations_df["description"].str.contains("air source heat pump"),
"Air Source Heat Pump",
np.where(
recommendations_df["description"].str.contains("high heat retention"),
"High Heat Retention Storage",
np.where(
recommendations_df["description"].str.contains("condensing boiler"),
"Boiler Upgrade",
recommendations_df["type"]
)
)
),
recommendations_df["type"]
)
recommendation_types = recommendations_df["type"].unique().tolist()
rename_dict = {
'hot_water_tank_insulation': 'Hot Water Tank Insulation',
'windows_glazing': 'Windows Glazing',
'secondary_heating': 'Secondary Heating',
'cavity_wall_insulation': 'Cavity Wall Insulation',
'flat_roof_insulation': 'Flat Roof Insulation',
'mechanical_ventilation': 'Mechanical Ventilation',
'loft_insulation': 'Loft Insulation',
'cylinder_thermostat': 'Cylinder Thermostat',
'room_roof_insulation': 'Room Roof Insulation',
'low_energy_lighting': 'Low Energy Lighting',
'external_wall_insulation': 'External Wall Insulation',
'solar_pv': 'Solar PV',
'heating_control': 'Heating Control',
'solid_floor_insulation': 'Solid Floor Insulation',
'suspended_floor_insulation': 'Suspended Floor Insulation',
'internal_wall_insulation': 'Internal Wall Insulation'
}
property_scenario_impact = []
for scenario_id in scenario_ids:
for scenario_id in tqdm(scenario_ids):
# Get the recommendations for the scenario, default
scenario_recommendations = recommendations_df[
(recommendations_df["Scenario ID"] == scenario_id) &
(recommendations_df["default"] == True)
].copy()
scenario_recommendations['ligting_kwh'] = scenario_recommendations.apply(
scenario_recommendations['Estimated Lighting kWh Savings'] = scenario_recommendations.apply(
lambda x: x['kwh_savings'] if x['type'] == 'low_energy_lighting' else 0,
axis=1)
scenario_recommendations['solar_kwh'] = scenario_recommendations.apply(
scenario_recommendations['Estimated Solar kWh Savings'] = scenario_recommendations.apply(
lambda x: x['kwh_savings'] if x['type'] == 'solar_pv' else 0, axis=1)
# Set 'Estimated Kwh Savings' to zero where specific kwh columns are used
scenario_recommendations['Estimated Kwh Savings'] = scenario_recommendations.apply(
scenario_recommendations['Estimated Heating Demand kWh Savings'] = scenario_recommendations.apply(
lambda x: 0 if x['type'] in ['low_energy_lighting', 'solar_pv'] else x[
'kwh_savings'], axis=1)
scenario_grouped_data = scenario_recommendations.groupby(['property_id']).agg({
'Estimated Kwh Savings': 'sum',
'Estimated Heating Demand kWh Savings': 'sum',
'Estimated Lighting kWh Savings': 'sum',
'Estimated Solar kWh Savings': 'sum',
"estimated_cost": "sum"
}).reset_index()
@ -531,18 +590,52 @@ def slides():
].merge(
scenario_grouped_data, on=["property_id"], how="left"
)
comparison["Estimated Kwh Savings"] = comparison["Estimated Kwh Savings"].fillna(0)
comparison["Estimated Heating Demand kWh Savings"] = (
comparison["Estimated Heating Demand kWh Savings"].fillna(0)
)
comparison["Estimated Lighting kWh Savings"] = (
comparison["Estimated Lighting kWh Savings"].fillna(0)
)
comparison["Estimated Solar kWh Savings"] = (
comparison["Estimated Solar kWh Savings"].fillna(0)
)
comparison["estimated_cost"] = comparison["estimated_cost"].fillna(0)
comparison["post_scenario_heating_hotwater_kwh"] = (
comparison["current_energy_demand_heating_hotwater"] - comparison["Estimated Kwh Savings"]
comparison["current_energy_demand_heating_hotwater"] - comparison["Estimated Heating Demand kWh Savings"]
)
# For each scenario, we create a measure matrix
measure_matrix = scenario_recommendations.pivot_table(
index='property_id',
columns='type',
values='id', # Using 'id' just as a placeholder for the pivot
aggfunc=lambda x: True, # If an ID exists for a given type, mark as True
fill_value=False # Fill other entries as False
).reset_index()
non_zero_heat_demand_impact = comparison[
(comparison["Estimated Heating Demand kWh Savings"] > 0) |
(comparison["Estimated Lighting kWh Savings"] > 0) |
(comparison["Estimated Solar kWh Savings"] > 0)
]
measure_matrix = measure_matrix[
measure_matrix["property_id"].isin(non_zero_heat_demand_impact["property_id"].values)
]
measure_matrix = measure_matrix.rename(columns=rename_dict)
comparison = comparison.merge(
measure_matrix, on="property_id", how="left"
)
comparison["scenario_id"] = scenario_id
property_scenario_impact.append(comparison)
property_scenario_impact = pd.concat(property_scenario_impact)
property_scenario_impact = property_scenario_impact.drop(columns=["property_id", "Estimated Kwh Savings"])
# property_scenario_impact = property_scenario_impact.drop(columns=["property_id", "Estimated Kwh Savings"])
for v in list(rename_dict.values()) + ["Air Source Heat Pump", "High Heat Retention Storage", "Boiler Upgrade"]:
# Fill NaNs with False
property_scenario_impact[v] = property_scenario_impact[v].fillna(False)
# Scale
property_scenario_impact["post_scenario_heating_hotwater_kwh_scaled"] = (
@ -600,57 +693,119 @@ def slides():
"post_scenario_heating_hotwater_kwh_scaled"]].empty:
raise Exception("someting went wrong")
# Reorder the columns
grouped_data = grouped_data[
[
'property_type',
'property_sub_type',
'scenario',
'estimated_heating_hotwater_kwh',
'post_scenario_heating_hotwater_kwh',
'estimated_heating_hotwater_kwh_scaled',
'post_scenario_heating_hotwater_kwh_scaled',
'estimated_cost',
]
# Reorder the columns
grouped_data = grouped_data[
[
'property_type',
'property_sub_type',
'scenario',
'estimated_heating_hotwater_kwh',
'post_scenario_heating_hotwater_kwh',
'estimated_heating_hotwater_kwh_scaled',
'post_scenario_heating_hotwater_kwh_scaled',
'estimated_cost',
]
]
grouped_data = grouped_data.rename(
columns={
"property_type": "Property Type",
"property_sub_type": "Property Sub Type",
"scenario": "Scenario",
"estimated_heating_hotwater_kwh": "Estimated Heating & Hot Water kwh",
"post_scenario_heating_hotwater_kwh": "Post Scenario Heating & Hot Water kwh",
"estimated_heating_hotwater_kwh_scaled": "Estimated Heating & Hot Water kwh (scaled)",
"post_scenario_heating_hotwater_kwh_scaled": "Post Scenario Heating & Hot Water kwh (scaled)",
"estimated_cost": "Estimated Cost or Retrofit",
}
)
grouped_data = grouped_data.rename(
columns={
"property_type": "Property Type",
"property_sub_type": "Property Sub Type",
"scenario": "Scenario",
"estimated_heating_hotwater_kwh": "Estimated Heating & Hot Water kwh",
"post_scenario_heating_hotwater_kwh": "Post Scenario Heating & Hot Water kwh",
"estimated_heating_hotwater_kwh_scaled": "Estimated Heating & Hot Water kwh (scaled)",
"post_scenario_heating_hotwater_kwh_scaled": "Post Scenario Heating & Hot Water kwh (scaled)",
"estimated_cost": "Estimated Cost or Retrofit",
}
)
grouped_data.to_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Newhaven/outputs/Scenario kWh Impact by Property "
"Type.xlsx",
index=False
)
# grouped_data.to_excel(
# "/Users/khalimconn-kowlessar/Documents/hestia/Customers/Newhaven/outputs/Scenario kWh Impact by Property "
# "Type.xlsx",
# index=False
# )
property_scenario_impact = property_scenario_impact.merge(
scenario_names, how="left", on="scenario_id"
)
property_scenario_impact = property_scenario_impact.merge(
scenario_names, how="left", on="scenario_id"
)
df_pivot = property_scenario_impact.pivot_table(index='uprn', columns='scenario',
values=['post_scenario_heating_hotwater_kwh',
'post_scenario_heating_hotwater_kwh_scaled'])
lewes_data = next_gen_dataset.merge(
property_scenario_impact, how="left", on="uprn"
)
# Flattening multi-index columns
df_pivot.columns = [f'{col[0]}_{col[1]}' for col in df_pivot.columns]
lewes_data = lewes_data.sort_values(
["postcode", "uprn", "scenario_id"], ascending=True
)
# Reset the index to have a clean dataframe
df_pivot.reset_index(inplace=True)
# Rearrange, rename columns and drop what we don't need
# TODO - remap the heating type
lewes_data = lewes_data[
[
'uprn', 'address', 'postcode', 'property_type', 'built_form',
# 'estimated_heating_hotwater_kwh',
'primary_fuel_type', 'gross_floor_area', 'floor_height', 'number_of_floors', 'ashp_suitable',
'ashp_size_kw',
'ashp_cost', 'solar_suitable', 'solar_size_kwp', 'solar_cost',
'scenario',
'estimated_heating_hotwater_kwh_scaled',
'post_scenario_heating_hotwater_kwh_scaled',
# 'property_id', - dropped
# 'current_energy_demand_heating_hotwater',
'Estimated Heating Demand kWh Savings',
'Estimated Lighting kWh Savings',
'Estimated Solar kWh Savings',
'estimated_cost',
'post_scenario_heating_hotwater_kwh', 'Cavity Wall Insulation', 'Cylinder Thermostat',
'Flat Roof Insulation',
'Hot Water Tank Insulation', 'Loft Insulation', 'Mechanical Ventilation', 'Room Roof Insulation',
# 'scenario_id', - dropped
'Low Energy Lighting', 'Secondary Heating', 'Windows Glazing', 'External Wall Insulation',
'Heating Control',
'Solar PV',
'Air Source Heat Pump', 'Boiler Upgrade', 'High Heat Retention Storage',
'Internal Wall Insulation',
'Solid Floor Insulation',
'Suspended Floor Insulation',
]
].rename(
columns={
"primary_fuel_type": "Primary Fuel Type",
"gross_floor_area": "Gross Floor Area",
"floor_height": "Floor Height",
"number_of_floors": "Number of Floors",
"ashp_suitable": "Is an ASHP Suitable?",
"ashp_size_kw": "ASHP Size (kW)",
"ashp_cost": "ASHP Cost",
"solar_suitable": "Is Solar PV Suitable?",
"solar_size_kwp": "Solar PV Size (kWp)",
"solar_cost": "Solar PV Cost",
# "estimated_heating_hotwater_kwh": "Estimated Heating & Hot Water kwh",
"estimated_heating_hotwater_kwh_scaled": "Estimated Heating & Hot Water kwh",
"post_scenario_heating_hotwater_kwh_scaled": "Post Scenario Heating & Hot Water kwh",
"estimated_cost": "Estimated Cost of Scenario"
}
)
next_gen_dataset = next_gen_dataset.merge(
df_pivot, how="left", on="uprn"
)
# We save this dataset, which will be shared with Lewes Council
lewes_data.to_csv(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Newhaven/outputs/Lewes property data.csv", index=False
)
next_gen_dataset.to_csv(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Newhaven/outputs/next_gen_dataset.csv", index=False
)
df_pivot = property_scenario_impact.pivot_table(index='uprn', columns='scenario',
values=['post_scenario_heating_hotwater_kwh',
'post_scenario_heating_hotwater_kwh_scaled'])
# Flattening multi-index columns
df_pivot.columns = [f'{col[0]}_{col[1]}' for col in df_pivot.columns]
# Reset the index to have a clean dataframe
df_pivot.reset_index(inplace=True)
next_gen_dataset = next_gen_dataset.merge(
df_pivot, how="left", on="uprn"
)
next_gen_dataset.to_csv(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Newhaven/outputs/next_gen_dataset.csv", index=False
)

View file

@ -0,0 +1,78 @@
import pandas as pd
from utils.s3 import save_csv_to_s3
PORTFOLIO_ID = 111
USER_ID = 8
def app():
"""
This application is used to initialise and run remote assessments
:return:
"""
asset_list = [
{
"uprn": 100050770761,
"address": "12 Sheardown Street",
"postcode": "DN4 0BH"
}
]
asset_list = pd.DataFrame(asset_list)
# Store the asset list in s3
filename = f"{USER_ID}/{PORTFOLIO_ID}/asset_list.csv"
save_csv_to_s3(
dataframe=asset_list,
bucket_name="retrofit-plan-inputs-dev",
file_name=filename
)
non_invasive_recommendations = [
{
"uprn": 100050770761,
"recommendations": [
{
"type": "extension_cavity_wall_insulation",
"sap_points": 2,
}
]
}
]
# Store non-invasive recommendations in S3
non_invasive_recommendations_filename = f"{USER_ID}/{PORTFOLIO_ID}/non_invasive_recommendations.csv"
save_csv_to_s3(
dataframe=pd.DataFrame(non_invasive_recommendations),
bucket_name="retrofit-plan-inputs-dev",
file_name=non_invasive_recommendations_filename
)
valuation_data = [
{
"uprn": 100050770761,
"value": 67_000
}
]
# Store valuation data to s3
valuation_filename = f"{USER_ID}/{PORTFOLIO_ID}/valuation.csv"
save_csv_to_s3(
dataframe=pd.DataFrame(valuation_data),
bucket_name="retrofit-plan-inputs-dev",
file_name=valuation_filename
)
body = {
"portfolio_id": str(PORTFOLIO_ID),
"housing_type": "Private",
"goal": "Increasing EPC",
"goal_value": "C",
"trigger_file_path": filename,
"already_installed_file_path": "",
"patches_file_path": "",
"non_invasive_recommendations_file_path": non_invasive_recommendations_filename,
"valuation_file_path": valuation_filename,
"scenario_name": "Full package remote assessment",
"multi_plan": True,
"budget": None,
}
print(body)

View file

@ -128,6 +128,13 @@ class Recommendations:
property_recommendations.append(self.wall_recomender.recommendations)
phase += 1
# We handle recommendations covering specific non-invasive measures
new_phase = self.wall_recomender.recommend_extended(phase=phase, measures=measures)
if self.wall_recomender.extended_recommendations:
property_recommendations.append(self.wall_recomender.extended_recommendations)
# We don't have any phasing here
phase = new_phase
self.roof_recommender.recommend(phase=phase, measures=measures, default_u_values=self.default_u_values)
if self.roof_recommender.recommendations:
property_recommendations.append(self.roof_recommender.recommendations)
@ -470,9 +477,32 @@ class Recommendations:
impact_summary = []
for recommendations_by_type in property_recommendations:
for rec in recommendations_by_type:
if rec["type"] in ["mechanical_ventilation", "trickle_vents", "draught_proofing"]:
if rec["type"] in [
"mechanical_ventilation", "trickle_vents", "draught_proofing", "extension_cavity_wall_insulation"
]:
# We don't have a percieved sap impact of mechanical ventilation or trickle vents, and we don't
# have the capacity to score draught proofing
if rec["type"] == "extension_cavity_wall_insulation":
previous_phase = [x for x in impact_summary if x["phase"] == (rec["phase"] - 1)]
if previous_phase:
sap = previous_phase[0]["sap"]
carbon = previous_phase[0]["carbon"]
heat_demand = previous_phase[0]["heat_demand"]
else:
sap = float(property_instance.data["current-energy-efficiency"])
carbon = float(property_instance.data["co2-emissions-current"])
heat_demand = float(property_instance.data["energy-consumption-current"])
impact_summary.append(
{
"phase": rec["phase"],
"recommendation_id": rec["recommendation_id"],
"sap": sap + rec["sap_points"],
"carbon": carbon - rec["co2_equivalent_savings"],
"heat_demand": heat_demand - rec["heat_demand"],
}
)
continue
phase_energy_efficiency_metrics = {
@ -563,6 +593,17 @@ class Recommendations:
property_phase_impact["carbon"], rec["co2_equivalent_savings"]
)
if rec["type"] == "loft_insulation":
# When we have a loft insulation recommendation, where there is an extension and the existing
# amount of loft insulation is already good, we limit the SAP points
# By limiting here, we don't change the value in current_phase_values. This means that the
# future recommendations won't have an impact that is too large
li_sap_limit = RoofRecommendations.get_loft_insulation_sap_limit(
property_instance.data["roof-energy-eff"], property_instance.data["extension-count"]
)
if li_sap_limit is not None:
property_phase_impact["sap"] = min(property_phase_impact["sap"], li_sap_limit)
# Insert this information into the recommendation.
if not rec.get("survey", False):
rec["sap_points"] = property_phase_impact["sap"]
@ -664,7 +705,11 @@ class Recommendations:
{
"phase": r["phase"],
"recommendation_id": r["recommendation_id"],
"solar_kwh_savings": r["initial_ac_kwh_per_year"] * assumptions.SOLAR_CONSUMPTION_PROPORTION,
"solar_kwh_savings": (
r["initial_ac_kwh_per_year"] * assumptions.SOLAR_CONSUMPTION_PROPORTION
) if not r["has_battery"] else (
r["initial_ac_kwh_per_year"] * assumptions.SOLAR_CONSUMPTION_WITH_BATTERY_PROPORTION
),
} for recs in property_recommendations for r in recs if r["type"] == "solar_pv"
], columns=["phase", "recommendation_id", "solar_kwh_savings"])
@ -751,7 +796,9 @@ class Recommendations:
# We now deduce if any of the recommendations result in a change of fuel type
for recs in property_recommendations:
for rec in recs:
if rec["type"] in ["mechanical_ventilation", "trickle_vents", "draught_proofing"]:
if rec["type"] in [
"mechanical_ventilation", "trickle_vents", "draught_proofing", "extension_cavity_wall_insulation"
]:
# We cannot score the impact on draught proofing
continue

View file

@ -59,6 +59,23 @@ class RoofRecommendations:
self.property.roof["is_flat"]
)
@classmethod
def get_loft_insulation_sap_limit(cls, roof_energy_eff, extension_count):
"""
Get the SAP limit for loft insulation
:param roof_energy_eff:
:return:
"""
if extension_count == 0:
# No limit
return None
if roof_energy_eff in ["Good", "Very Good"]:
return 1
return None
def mds_loft_insulation(self, phase):
"""
For usages within the mds report
@ -273,7 +290,7 @@ class RoofRecommendations:
# loft is already partially insulated.
# Note: This requirement is only for loft insulation
if (
(material["depth"] + insulation_thickness) < self.MINIMUM_RECOMMENDED_LOFT_INSULATION
material["depth"] < self.MINIMUM_RECOMMENDED_LOFT_INSULATION
) and is_pitched:
continue
@ -295,6 +312,7 @@ class RoofRecommendations:
# We allow a small tolerance for error so we don't discount the recommendation entirely
if new_u_value <= self.BUILDING_REGULATIONS_PART_L_MAX_U_VALUE:
lowest_selected_u_value = update_lowest_selected_u_value(lowest_selected_u_value, new_u_value)
cost_result = self.costs.loft_and_flat_insulation(

View file

@ -196,7 +196,7 @@ class SolarPvRecommendations:
]
roof_area = self.property.roof_area
solar_configurations = panel_performance.head(3).reset_index(drop=True)
solar_configurations = panel_performance.head(6).reset_index(drop=True)
# We combine each of these configurations with estimates with and without a battery
for rank, recommendation_config in solar_configurations.iterrows():

View file

@ -1,6 +1,7 @@
import math
from typing import List
import numpy as np
import pandas as pd
from datatypes.enums import QuantityUnits
@ -99,6 +100,8 @@ class WallRecommendations(Definitions):
# Will contains a list of recommended measures
self.recommendations = []
# Contains a list of extended recommendation measures, such as extension insulation
self.extended_recommendations = []
self.cavity_wall_insulation_materials = [
part for part in materials if part["type"] == "cavity_wall_insulation"
@ -267,6 +270,91 @@ class WallRecommendations(Definitions):
# If the u-value is within regulations, we don't do anything
return
def recommend_extended(self, phase, measures):
"""
Where we have extended measures, such as extension insulation, which cannot typically be picked up
from the EPC api, we handle the recommendation of these here
:param measures:
:return:
"""
# These are the measures that are covered by this function
extended_measures = ["extension_cavity_wall_insulation"]
measures_to_recommend = [measure for measure in measures if measure in extended_measures]
if not measures_to_recommend:
return phase
# We reset this to be empty
self.extended_recommendations = []
recommendation_phase = phase
for measure in measures_to_recommend:
if measure == "extension_cavity_wall_insulation":
recommendation = self.recommend_extension_cavity_wall_insulation(phase=recommendation_phase)
else:
raise NotImplementedError(f"Measure {measure} is not implemented")
recommendation_phase += 1
self.extended_recommendations.append(recommendation)
return recommendation_phase
def recommend_extension_cavity_wall_insulation(self, phase):
"""
This function produces the recommendation for extension cavity wall insulation
:return:
"""
# TODO: We aren't provided with carbon, heat or bill savings figures for this measure
extension_cavity_insulation_recommendation = [
r for r in self.property.non_invasive_recommendations if r["type"] == "extension_cavity_wall_insulation"
][0]
# https://surreybuildingprojects.co.uk/how-much-does-a-24m2-extension-cost
average_extension_floor_area = 24
# https://assets.publishing.service.gov.uk/media/5f047a01d3bf7f2be8350262
# /Size_of_English_Homes_Fact_Sheet_EHS_2018.pdf
# This is rough
average_house_floor_area = 94
proposed_extension_floor_area = self.property.floor_area * (
average_extension_floor_area / average_house_floor_area
)
# assume 3 walls are external
proposed_extension_insulation_wall_area = (
np.sqrt(proposed_extension_floor_area) * self.property.floor_height * 3
)
cost_result = self.costs.cavity_wall_insulation(
wall_area=proposed_extension_insulation_wall_area,
material=self.cavity_wall_insulation_materials[0],
)
recommendation = {
"phase": phase,
"parts": [],
"type": "extension_cavity_wall_insulation",
"measure_type": "extension_cavity_wall_insulation",
"description": "Insulate the cavity walls of the extension",
"starting_u_value": None,
"new_u_value": None,
"sap_points": extension_cavity_insulation_recommendation["sap_points"],
"heat_demand": 0,
"kwh_savings": 0,
"energy_savings": 0,
"energy_cost_savings": 0,
"co2_equivalent_savings": 0,
"already_installed": False,
"simulation_config": {},
"description_simulation": {},
**cost_result,
"default": True,
}
return recommendation
def find_cavity_insulation(self, u_value, insulation_thickness, phase, measures, default_u_values):
"""
This method tests different materials to fill the cavity wall, determining which