Merge pull request #301 from Hestia-Homes/eon

Eon work + addressing AWS database issues
This commit is contained in:
KhalimCK 2024-05-29 11:35:14 +01:00 committed by GitHub
commit 4482ea2628
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
24 changed files with 1146 additions and 103 deletions

View file

@ -61,7 +61,14 @@ class Property:
n_bedrooms = None
def __init__(
self, id, postcode, address, epc_record, already_installed=None, non_invasive_recommendations=None,
self,
id,
postcode,
address,
epc_record,
already_installed=None,
non_invasive_recommendations=None,
measures=None,
**kwargs
):
@ -85,6 +92,8 @@ class Property:
ast.literal_eval(non_invasive_recommendations['recommendations']) if
non_invasive_recommendations else []
)
# This is a list of measures that have been recommended for the property
self.measures = ast.literal_eval(measures) if measures else None
self.uprn = epc_record.get("uprn")
self.full_sap_epc = epc_record.get("full_sap_epc")
@ -163,12 +172,12 @@ class Property:
:return:
"""
n_bathrooms = kwargs.get("n_bathrooms", None)
if n_bathrooms is not None:
if n_bathrooms not in [None, ""]:
# We add on a small value to ensure that the number of bathrooms is rounded up, in case the value is 0.5
n_bathrooms = int(round(float(n_bathrooms) + 1e-5))
n_bedrooms = kwargs.get("n_bedrooms", None)
if n_bedrooms is not None:
if n_bedrooms not in [None, ""]:
n_bedrooms = int(round(float(n_bedrooms) + 1e-5))
return {
@ -221,6 +230,29 @@ class Property:
# self.base_difference_record.df
def simulate_all_representative_recommendations(
self, property_representative_recommendations,
):
"""
This method was put together to simulate the impact of the representative recommendations on the property
all at once, for usage within the mds report
:return:
"""
recommendation_record = self.base_difference_record.df.to_dict("records")[
0
].copy()
scoring_dict = self.create_recommendation_scoring_data(
property_id=self.id,
recommendation_record=recommendation_record,
recommendations=property_representative_recommendations,
primary_recommendation_id=self.id,
non_invasive_recommendations=self.non_invasive_recommendations,
)
return scoring_dict
def adjust_difference_record_with_recommendations(
self, property_recommendations, property_representative_recommendations
):
@ -321,49 +353,6 @@ class Property:
for recommendation in recommendations:
# For the list of recommendations we have, we iteratively update the output
# We update the description to indicate it's insulated
if recommendation["type"] in [
"internal_wall_insulation",
"external_wall_insulation",
"cavity_wall_insulation",
]:
# # If we have a non-incasive recommendation that the cavity wall is partially filled, we skip the
# # cavity wall insulation recommendation (since on the EPC, the property will look like how it did
# # before any works)
# if "cavity_surveyed_as_filled_is_partial" in non_invasive_recommendations:
# continue
# The upgrade made here is to the u-value of the walls and the description of the
# insulation thickness
output["walls_thermal_transmittance_ending"] = recommendation[
"new_u_value"
]
# Setting the insulation thickness here to above average should be tested further because we
# don't see a high volume of instances for this
output["walls_insulation_thickness_ending"] = "average"
output["walls_energy_eff_ending"] = "Good"
# Note: often when the wall is insulatied, the internal/external insulation is not noted so we should
# test the impact of using these booleans
if recommendation["type"] == "external_wall_insulation":
output["external_insulation_ending"] = True
output["internal_insulation_ending"] = False
if recommendation["type"] == "internal_wall_insulation":
output["external_insulation_ending"] = False
output["internal_insulation_ending"] = True
if recommendation["type"] == "cavity_wall_insulation":
output["is_filled_cavity_ending"] = True
else:
if output["walls_thermal_transmittance_ending"] is None:
raise ValueError("We should not have a None value for the u value")
if output["walls_insulation_thickness_ending"] is None:
output["walls_insulation_thickness_ending"] = "none"
# Update description to indicate it's insulate
if recommendation["type"] in [
"solid_floor_insulation",
@ -375,11 +364,8 @@ class Property:
"Have more than 1 floor insulation part - handle this case"
)
# output["floor_thermal_transmittance_ending"] = recommendation["new_u_value"]
# We don't really see above average for this in the training data
output["floor_insulation_thickness_ending"] = "average"
# This is rarely ever populated in the training data
# output["floor_energy_eff_ending"] = "Good"
else:
if output["floor_thermal_transmittance_ending"] is None:
raise ValueError("We should not have a None value for the u value")
@ -418,19 +404,20 @@ class Property:
400,
]
proposed_depth = int(parts[0]["depth"])
proposed_depth = recommendation["new_thickness"]
if proposed_depth not in valid_numeric_values:
# Take the nearest value for scoring
proposed_depth = min(
valid_numeric_values, key=lambda x: abs(x - proposed_depth)
)
output["roof_insulation_thickness_ending"] = str(proposed_depth)
output["roof_insulation_thickness_ending"] = str(int(proposed_depth))
if recommendation["type"] == "loft_insulation":
if proposed_depth >= 270:
output["roof_energy_eff_ending"] = "Very Good"
else:
output["roof_energy_eff_ending"] = "Good"
if output["roof_energy_eff_ending"] not in ["Good", "Very Good"]:
output["roof_energy_eff_ending"] = "Good"
else:
output["roof_energy_eff_ending"] = "Very Good"
else:
@ -450,7 +437,8 @@ class Property:
if recommendation["type"] == "windows_glazing":
output["multi_glaze_proportion_ending"] = 100
output["windows_energy_eff_ending"] = "Average"
if output["windows_energy_eff_ending"] not in ["Average", "Good", "Very Good"]:
output["windows_energy_eff_ending"] = "Average"
is_secondary_glazing = recommendation["is_secondary_glazing"]
@ -481,9 +469,12 @@ class Property:
)
if recommendation["type"] in [
"heating", "hot_water_tank_insulation", "heating_control", "secondary_heating"
"heating", "hot_water_tank_insulation", "heating_control", "secondary_heating",
"internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation",
]:
# We update the data, as defined in the recommendaton
if output["walls_insulation_thickness_ending"] is None:
output["walls_insulation_thickness_ending"] = "none"
simulation_config = recommendation["simulation_config"]
# If any entries in simulation_config are None, we will set them to "Unknown" which is the cleaning

View file

@ -11,12 +11,14 @@ EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
# This is for 6 Laura Close, Tintagel, PL34 0EB (same property that Cotswolrd energy used)
uprn = 100040099104
# This is for 353A, Hermitage Lane, ME16 9NT (one of the e.on properties)
uprn = 200000964454
cleaning_data = read_dataframe_from_s3_parquet(
bucket_name="retrofit-data-dev", file_key="sap_change_model/cleaning_dataset.parquet",
)
searcher = SearchEpc(address1="6 Laura Close", postcode="PL34 0EB", uprn=uprn, auth_token=EPC_AUTH_TOKEN, os_api_key="")
searcher = SearchEpc(address1="", postcode="", uprn=uprn, auth_token=EPC_AUTH_TOKEN, os_api_key="")
searcher.find_property(skip_os=True)
@ -80,7 +82,7 @@ solar_potential["panelWidthMeters"]
solar_potential["wholeRoofStats"]
# Copy of response for testing:
# Copy of response for testing - 6 Laura Close, Tintagel, PL34 0EB
# {'name': 'buildings/ChIJ2yC6t4KEa0gRh2TIssogI7k', 'center': {'latitude': 50.667375, 'longitude': -4.7416833},
# 'imageryDate': {'year': 2021, 'month': 7, 'day': 19}, 'regionCode': 'GB', 'solarPotential': {'maxArrayPanelsCount':
# 39, 'maxArrayAreaMeters2': 76.578636, 'maxSunshineHoursPerYear': 1172.0627, 'carbonOffsetFactorKgPerMwh':

View file

@ -35,6 +35,7 @@ from recommendations.optimiser.CostOptimiser import CostOptimiser
from recommendations.optimiser.GainOptimiser import GainOptimiser
from recommendations.optimiser.optimiser_functions import prepare_input_measures
from recommendations.Recommendations import Recommendations
from recommendations.Mds import Mds
from utils.logger import setup_logger
from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3
from backend.ml_models.Valuation import PropertyValuation
@ -618,3 +619,285 @@ async def trigger_plan(body: PlanTriggerRequest):
session.close()
return Response(status_code=200)
@router.post("/mds")
async def build_mds(body: PlanTriggerRequest):
# TODO: This is a placeholder location for the MDS endpoint, which this is being assembled
logger.info("Connecting to db")
session = sessionmaker(bind=db_engine)()
created_at = datetime.now().isoformat()
try:
session.begin()
logger.info("Getting the inputs")
plan_input = read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path)
cleaning_data = read_dataframe_from_s3_parquet(
bucket_name=get_settings().DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet",
)
input_properties = []
for property_id, config in tqdm(enumerate(plan_input), total=len(plan_input)):
# We validate each record in the file. If the record is NOT valid, we need to handle this accordingly
uprn = config.get("uprn", None)
uprn = None if uprn == "" else uprn
if uprn:
uprn = int(float(uprn))
epc_searcher = SearchEpc(
address1=config["address"],
postcode=config["postcode"],
uprn=uprn,
auth_token=get_settings().EPC_AUTH_TOKEN,
os_api_key=get_settings().ORDNANCE_SURVEY_API_KEY,
)
epc_searcher.ordnance_survey_client.built_form = config.get("built_form", None)
epc_searcher.ordnance_survey_client.property_type = config.get("property_type", None)
# For the moment, our OS API access is unavailable, so we skip and interpolate
epc_searcher.find_property(skip_os=True)
if config["address"] == "35b High Street":
print("Performing temporary patch")
epc_searcher.newest_epc["uprn"] = 10002911892
epc_searcher.full_sap_epc["uprn"] = 10002911892
# Create a record in db
# TODO: If we productionise the creation of this mds report, we will need to store this in the db
# property_id, is_new = create_property(
# session, body.portfolio_id, epc_searcher.address_clean, epc_searcher.postcode_clean, epc_searcher.uprn
# )
# if not is_new:
# continue
#
# create_property_targets(
# session,
# property_id=property_id,
# portfolio_id=body.portfolio_id,
# epc_target=body.goal_value,
# heat_demand_target=None
# )
epc_records = {
'original_epc': epc_searcher.newest_epc.copy(),
'full_sap_epc': epc_searcher.full_sap_epc.copy(),
'old_data': epc_searcher.older_epcs.copy(),
}
# patch = next((
# x for x in patches if (x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
# ), {})
# epc_records = patch_epc(patch, epc_records)
prepared_epc = EPCRecord(
epc_records=epc_records,
run_mode="newdata",
cleaning_data=cleaning_data
)
# property_already_installed = next((
# x for x in already_installed if
# (x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
# ), {})
#
# property_non_invasive_recommendations = next((
# x for x in non_invasive_recommendations if
# (x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
# ), {})
measures = config["measures"] if "measures" in config else None
input_properties.append(
Property(
id=property_id,
address=epc_searcher.address_clean,
postcode=epc_searcher.postcode_clean,
epc_record=prepared_epc,
# already_installed=property_already_installed,
# non_invasive_recommendations=property_non_invasive_recommendations,
measures=measures,
**Property.extract_kwargs(config)
)
)
logger.info("Reading in materials and cleaned datasets")
materials = get_materials(session)
cleaned = get_cleaned()
uprn_filenames = read_dataframe_from_s3_parquet(
bucket_name=get_settings().DATA_BUCKET, file_key="spatial/filename_meta.parquet"
)
photo_supply_lookup, floor_area_decile_thresholds = SolarPhotoSupply.load(bucket=get_settings().DATA_BUCKET)
logger.info("Getting spatial data")
for p in tqdm(input_properties):
p.get_spatial_data(uprn_filenames)
logger.info("Getting components and epc recommendations")
recommendations_scoring_data = []
representative_recommendations = {}
for p in tqdm(input_properties):
p.get_components(cleaned, photo_supply_lookup, floor_area_decile_thresholds)
mds = Mds(property_instance=p, materials=materials)
property_representative_recommendations, errors = mds.build()
if errors:
logger.info("Errors occurred during MDS build")
representative_recommendations[p.id] = property_representative_recommendations
# Build the scoring data
p.create_base_difference_epc_record(cleaned_lookup=cleaned)
recommendations_scoring_data.append(
p.simulate_all_representative_recommendations(property_representative_recommendations)
)
logger.info("Preparing data for scoring in sap change api")
recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data)
recommendations_scoring_data = recommendations_scoring_data.drop(
columns=["rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending",
"carbon_ending"]
)
model_api = ModelApi(portfolio_id=body.portfolio_id, timestamp=created_at)
all_predictions = {
"sap_change_predictions": pd.DataFrame(),
"heat_demand_predictions": pd.DataFrame(),
"carbon_change_predictions": pd.DataFrame()
}
to_loop_over = range(0, recommendations_scoring_data.shape[0], SCORING_BATCH_SIZE)
for chunk in tqdm(to_loop_over, total=len(to_loop_over)):
predictions_dict = model_api.predict_all(
df=recommendations_scoring_data.iloc[chunk:chunk + SCORING_BATCH_SIZE],
bucket=get_settings().DATA_BUCKET,
prediction_buckets={
"sap_change_predictions": get_settings().SAP_PREDICTIONS_BUCKET,
"heat_demand_predictions": get_settings().HEAT_PREDICTIONS_BUCKET,
"carbon_change_predictions": get_settings().CARBON_PREDICTIONS_BUCKET
}
)
# Append the predictions to the predictions dictionary
for key, scored in predictions_dict.items():
all_predictions[key] = pd.concat([all_predictions[key], scored])
# We now produce a table of results for the mds report
# TODO: TEMP
for p in plan_input:
if p["uprn"]:
p["uprn"] = str(int(float(p["uprn"])))
results = []
for p in input_properties:
measures = p.measures
property_recommendations = [r['type'] for r in representative_recommendations[p.id]]
# TODO: Check high heat retention storage heaters - looks like it's excluded controls!
sap_prediction = all_predictions["sap_change_predictions"][
all_predictions["sap_change_predictions"]["property_id"] == str(p.id)
]
heat_demand_prediction = all_predictions["heat_demand_predictions"][
all_predictions["heat_demand_predictions"]["property_id"] == str(p.id)
]
carbon_prediction = all_predictions["carbon_change_predictions"][
all_predictions["carbon_change_predictions"]["property_id"] == str(p.id)
]
# Get a before and after for SAP, heat demand, CO2 and also calculate energy bill and energy savings
sap_before = int(p.data["current-energy-efficiency"])
sap_after = sap_prediction["predictions"].values[0] if measures else sap_before
epc_before = p.data["current-energy-rating"]
epc_after = sap_to_epc(sap_after) if measures else epc_before
heat_demand_before = p.data["energy-consumption-current"]
heat_demand_after = heat_demand_prediction["predictions"].values[0] if measures else heat_demand_before
carbon_before = p.data["co2-emissions-current"]
carbon_after = carbon_prediction["predictions"].values[0] if measures else carbon_before
# Estimate bill savings
from backend.ml_models.AnnualBillSavings import AnnualBillSavings
current_adjusted_energy = AnnualBillSavings.adjust_energy_to_metered(
epc_energy_consumption=heat_demand_before * p.floor_area,
current_epc_rating=epc_before,
)
# TODO: This isn't quite right as this is based on EVERY possible measure, not just the ones that are
# actually implemented
expected_adjusted_energy = AnnualBillSavings.adjust_energy_to_metered(
epc_energy_consumption=heat_demand_after * p.floor_area,
current_epc_rating=epc_before,
)
# TODO: We should determine if the home is gas & electricity or just electricity
current_energy_bill = AnnualBillSavings.calculate_annual_bill(
current_adjusted_energy,
)
expected_energy_bill = AnnualBillSavings.calculate_annual_bill(
expected_adjusted_energy,
)
bill_savings = current_energy_bill - expected_energy_bill
energy_savings = current_adjusted_energy - expected_adjusted_energy
config = [c for c in plan_input if c["uprn"] == str(p.uprn)]
if not config:
config = {"address": None, "postcode": None}
else:
config = config[0]
to_append = {
"config_address": config["address"],
"config_postcode": config["postcode"],
"address": p.address,
"postcode": p.postcode,
"measures": measures,
"property_recommendations": property_recommendations,
"year_of_epc": p.data['lodgement-date'],
"sap_before": sap_before,
"sap_after": sap_after,
"epc_before": epc_before,
"epc_after": epc_after,
"heat_demand_before": heat_demand_before,
"heat_demand_after": heat_demand_after,
"carbon_before": carbon_before,
"carbon_after": carbon_after,
"bill_savings": bill_savings,
"energy_savings": energy_savings,
}
results.append(to_append)
results = pd.DataFrame(results)
results["sap_uplift"] = results["sap_after"] - results["sap_before"]
except IntegrityError:
logger.error("Database integrity error occurred", exc_info=True)
session.rollback()
return Response(status_code=500, content="Database integrity error.")
except OperationalError:
logger.error("Database operational error occurred", exc_info=True)
session.rollback()
return Response(status_code=500, content="Database operational error.")
except ValueError:
logger.error("Value error - possibly due to malformed data", exc_info=True)
session.rollback()
return Response(status_code=400, content="Bad request: malformed data.")
except Exception as e: # General exception handling
logger.error(f"An error occurred: {e}")
session.rollback()
return Response(status_code=500, content="An unexpected error occurred.")
finally:
session.close()

View file

@ -43,15 +43,20 @@ class AnnualBillSavings:
return cls.ELECTRICITY_PRICE_CAP * kwh
@classmethod
def calculate_annual_bill(cls, kwh):
def calculate_annual_bill(cls, kwh, mains_gas=True):
"""
This method will estimate the total annual bill for a property
It assumed gas & electricity are used
:param kwh: The total kwh consumption
:param mains_gas: Whether the property uses mains gas
:return: An estimate for annual bill
"""
return cls.PRICE_FACTOR * kwh + (cls.DAILY_STANDARD_CHARGE_GAS + cls.DAILY_STANDARD_CHARGE_ELECTRICITY * 365)
if mains_gas:
return cls.PRICE_FACTOR * kwh + (
cls.DAILY_STANDARD_CHARGE_GAS + cls.DAILY_STANDARD_CHARGE_ELECTRICITY * 365)
return cls.ELECTRICITY_PRICE_CAP * kwh + (cls.DAILY_STANDARD_CHARGE_ELECTRICITY * 365)
@classmethod
def adjust_energy_to_metered(cls, epc_energy_consumption, current_epc_rating):

View file

@ -150,6 +150,10 @@ class PropertyValuation:
]
# Additional sources:
# https://superhomes.org.uk/wp-content/uploads/2024/05/The-Impact-of-Retrofit-on-Residential-Property-Market
# -Values-7-rotated-1.pdf
EPC_BANDS = ["G", "F", "E", "D", "C", "B", "A"]
@classmethod

View file

@ -99,6 +99,13 @@ class ModelApi:
# depending on how you want to handle errors in your application
return None
@staticmethod
def extract_phase(recommendation_id):
if 'phase=' in recommendation_id:
return int(recommendation_id.split('phase=')[1][0])
else:
return None
def predict_all(self, df, bucket, prediction_buckets) -> dict:
"""
@ -135,9 +142,11 @@ class ModelApi:
# To grab the phase, we pull the integer after "phase=" in the recommendation_id. We can do this with a
# string split on phase= and then grab the second element of the resulting list. We could also use a
# regular expression to do this but we use the string split method here, for safety.
predictions_df['phase'] = predictions_df['recommendation_id'].str.split('phase=').str[1].str[0]
# We may not always have a phase to split on, so we need to handle this case. We can do this by using the
# str[1] method to grab the second element of the resulting list. We then grab the first character of this
# string to get the phase. We then convert this to an integer.
# Convert back to int
predictions_df['phase'] = predictions_df['phase'].astype(int)
predictions_df['phase'] = predictions_df['recommendation_id'].apply(self.extract_phase)
predictions[model_prefix] = predictions_df

View file

@ -0,0 +1,271 @@
import time
import pandas as pd
from utils.s3 import read_excel_from_s3
from backend.SearchEpc import SearchEpc
from dotenv import load_dotenv
import os
from tqdm import tqdm
from utils.s3 import save_csv_to_s3
# Read in the .env file in backend
load_dotenv(dotenv_path="backend/.env")
EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN")
# Stored in my notes
ORDNANCE_SURVEY_API_KEY = ""
PORTFOLIO_ID = 80
USER_ID = 8
def extract_mds_measures(config):
measures = []
if not pd.isnull(config["EWI (Trad Const)"]):
measures.append({"external_wall_insulation": "EWI (Trad Const)"})
if not pd.isnull(config["EWI (Non Trad Const)"]):
measures.append({"external_wall_insulation": "EWI (Non Trad Const)"})
if not pd.isnull(config["CWI"]):
measures.append({"cavity_wall_insulation": "CWI"})
if not pd.isnull(config["LI"]):
measures.append({"loft_insulation": "LI"})
if not pd.isnull(config["Party Wall Insu"]):
measures.append({"party_wall_insulation": "Party Wall Insu"})
if not pd.isnull(config["IWI (POA - Prov Sum Only)"]):
measures.append({"internal_wall_insulation": "IWI (POA - Prov Sum Only)"})
if not pd.isnull(config["U/F Insu (Manual install)"]):
measures.append({"suspended_floor_insulation": "U/F Insu (Manual install)"})
if not pd.isnull(config["U/F insu (Qbot)"]):
measures.append({"suspended_floor_insulation": "U/F insu (Qbot)"})
if not pd.isnull(config["Solid floor insl (Out of scope - Prov sum only)"]):
measures.append({"solid_floor_insulation": "Solid floor insl (Out of scope - Prov sum only)"})
if not pd.isnull(config["ASHP Htg"]):
measures.append({"air_source_heat_pump": "ASHP Htg"})
if not pd.isnull(config["GSHP Htg"]):
measures.append({"ground_source_heat_pump": "GSHP Htg"})
if not pd.isnull(config["Shared ground loops"]):
measures.append({"shared_ground_loops": "Shared ground loops"})
if not pd.isnull(config["Communal heat networks"]):
measures.append({"communal_heat_networks": "Communal heat networks"})
if not pd.isnull(config["District heating networks"]):
measures.append({"district_heating_networks": "District heating networks"})
if not pd.isnull(config["Elec Storage Htrs (Out of scope -Prov sum only)"]):
measures.append({"electric_storage_heaters": "Elec Storage Htrs (Out of scope -Prov sum only)"})
if not pd.isnull(config["Low Energy Bulbs"]):
measures.append({"low_energy_lighting": "Low Energy Bulbs"})
if not pd.isnull(config["Cyl Insulation"]):
measures.append({"cylinder_insulation": "Cyl Insulation"})
if not pd.isnull(config["Smart controls"]):
measures.append({"smart_controls": "Smart controls"})
if not pd.isnull(config["Zone controls"]):
measures.append({"zone_controls": "Zone controls"})
if not pd.isnull(config["Upgrade TRV's"]):
measures.append({"trvs": "Upgrade TRV's"})
if not pd.isnull(config["Solar PV"]):
measures.append({"solar_pv": "Solar PV"})
if not pd.isnull(config["Solar Thermal"]):
measures.append({"solar_thermal": "Solar Thermal"})
if not pd.isnull(config["Double Glazing (POA - Prov sum only)"]):
measures.append({"double_glazing": "Double Glazing (POA - Prov sum only)"})
if not pd.isnull(config["Draught Proofing"]):
measures.append({"draught_proofing": "Draught Proofing"})
if not pd.isnull(config["Ventilation upgrade"]):
measures.append({"mechanical_ventilation": "Ventilation upgrade"})
if not pd.isnull(config["Gas Boiler Replacement"]):
measures.append({"gas_boiler": "Gas Boiler Replacement"})
if not pd.isnull(config["Flat roof (Out of scope - prov sum only)"]):
measures.append({"flat_roof_insulation": "Flat roof (Out of scope - prov sum only)"})
if not pd.isnull(config["RIR (POA - Prov sum only)"]):
measures.append({"room_in_roof_insulation": "RIR (POA - Prov sum only)"})
if not pd.isnull(config["EV Charging"]):
measures.append({"ev_charging": "EV Charging"})
if not pd.isnull(config["Battery"]):
measures.append({"battery": "Battery"})
return measures
def parse_property_type(config):
# This should come from the ordnance survey api eventually
# array(['Detached', 'Semi-detached', 'Bungalow', 'Mid Terrace',
# 'End Terrace', 'Top Flat', 'Mid Flat',
# 'Low rise flat (1-2 storey)', nan], dtype=object)
if config["Address"] == "Flat Central Garage":
return {"property_type": "Bungalow", "built_form": "Mid-Terrace"}
if pd.isnull(config["Property Type"]):
return {"property_type": None, "built_form": None}
lookup = {
"Detached": {"property_type": "House", "built_form": "Detached"},
"Semi-detached": {"property_type": "House", "built_form": "Semi-detached"},
"Bungalow": {"property_type": "Bungalow", "built_form": "Detached"},
"Mid Terrace": {"property_type": "House", "built_form": "Mid-Terrace"},
"End Terrace": {"property_type": "House", "built_form": "End-Terrace"},
"Top Flat": {"property_type": "Flat", "built_form": None},
"Mid Flat": {"property_type": "Flat", "built_form": None},
"Low rise flat (1-2 storey)": {"property_type": "Flat", "built_form": None},
}
return lookup[config["Property Type"]]
def app():
"""
Create the initial asset list for the E.ON pilot
:return:
"""
raw_asset_list = read_excel_from_s3(
bucket_name="retrofit-datalake-dev",
file_key="customers/E.ON/sample SHDF Information MDS Template Vr3.0.xlsx",
header_row=11,
drop_all_na=False
)
# Keep just the columns we need
raw_asset_list_base = raw_asset_list[
[
"Address", "Postcode", "No Bedrooms"
]
].copy().rename(
columns={
"Address": "address",
"Postcode": "postcode",
"No Bedrooms": "n_bedrooms"
}
)
# For each property, retrieve UPRN with from the Ordnance Survey API. To do this, I have created a free
# trial with Ordnance Survey with my personal account as a temporary solution.
# Let's just pull the full EPC data for this
asset_list_with_uprn = []
for row, property_meta in tqdm(raw_asset_list_base.iterrows(), total=raw_asset_list_base.shape[0]):
if row <= 104:
continue
time.sleep(1.1)
searcher = SearchEpc(
address1=property_meta["address"],
postcode=property_meta["postcode"],
auth_token=EPC_AUTH_TOKEN,
os_api_key=ORDNANCE_SURVEY_API_KEY,
full_address=", ".join([property_meta["address"], property_meta["postcode"]])
)
# Let's just find the UPRN
searcher.ordnance_survey_client.get_places_api()
uprn = searcher.ordnance_survey_client.most_relevant_result["UPRN"]
# searcher.find_property(skip_os=False)
asset_list_with_uprn.append(
{
**property_meta,
"uprn": uprn,
}
)
# Store this as a backup
# import pandas as pd
# asset_list_with_uprn_df = pd.DataFrame(asset_list_with_uprn)
# asset_list_with_uprn_df.to_csv("eon_asset_list_with_uprn.csv", index=False)
# Read in
# asset_list_with_uprn = pd.read_csv("eon_asset_list_with_uprn.csv").to_dict(orient="records")
# Store the asset list and create the portfolio payload
asset_list_with_uprn_df = pd.DataFrame(asset_list_with_uprn)
asset_list_with_uprn_df["uprn"] = asset_list_with_uprn_df["uprn"].astype(str).astype(int)
# We now determine which measures we need for each property
finalised_asset_list = []
for i, config in raw_asset_list.iterrows():
asset_config = asset_list_with_uprn_df[
(asset_list_with_uprn_df["address"] == config["Address"]) &
(asset_list_with_uprn_df["postcode"] == config["Postcode"])
]
if asset_config.shape[0] != 1:
raise ValueError("Could not find a unique match for the property")
measures = extract_mds_measures(config)
# Get the property type
pt = parse_property_type(config)
if config["Address"] in [
"28 Hermitage Lane",
"35a High Street",
"35b High Street",
"Flat Over 20 Holborough Road",
"Flat above 7 Malling Road"
]:
print(config["Address"])
uprn = None
else:
uprn = asset_config["uprn"].values[0]
finalised_asset_list.append(
{
"address": config["Address"],
"postcode": config["Postcode"],
"uprn": uprn,
"n_bedrooms": config["No Bedrooms"],
"measures": measures,
**pt
}
)
finalised_asset_list = pd.DataFrame(finalised_asset_list)
# Store the asset list in s3
filename = f"{USER_ID}/{PORTFOLIO_ID}/pilot.csv"
save_csv_to_s3(
dataframe=finalised_asset_list,
bucket_name="retrofit-plan-inputs-dev",
file_name=filename
)
# EPC C portoflio
body = {
"portfolio_id": str(PORTFOLIO_ID),
"housing_type": "Social",
"goal": "Increase EPC",
"goal_value": "C",
"trigger_file_path": filename,
"already_installed_file_path": "",
"patches_file_path": "",
"non_invasive_recommendations_file_path": "",
"budget": None,
}
print(body)

View file

@ -20,27 +20,39 @@ def aggregate_matches(matching_lookup, company_ownership, properties):
properties[["UPRN", "LOCAL_AUTHORITY_LABEL"]], how="left", on="UPRN"
)
counts = (
df.groupby(["Company Registration No. (1)", "Proprietor Name (1)", "LOCAL_AUTHORITY_LABEL"])["UPRN"]
df.groupby(["Company Registration No. (1)", "LOCAL_AUTHORITY_LABEL"])["UPRN"]
.count()
.reset_index(name="number_of_properties")
)
counts = counts.sort_values("number_of_properties", ascending=False)
pivot_counts = counts.pivot_table(
index=["Company Registration No. (1)", "Proprietor Name (1)"], # Rows: companies and proprietors
index=["Company Registration No. (1)"], # Rows: companies and proprietors
columns="LOCAL_AUTHORITY_LABEL", # Columns: each local authority
values="number_of_properties", # The counts of properties
fill_value=0 # Fill missing values with 0 (where there are no properties owned)
).reset_index()
total_counts = (
df.groupby(["Company Registration No. (1)", "Proprietor Name (1)"])["UPRN"]
df.groupby(["Company Registration No. (1)"])["UPRN"]
.count()
.reset_index(name="total_number_of_properties")
)
# We have cases where the same company registration number results in the same company name, so we produce a best
# name per company registration number
best_names = (
df.groupby(["Company Registration No. (1)"])["Proprietor Name (1)"]
.first()
.reset_index()
)
total_counts = best_names.merge(
total_counts, how="left", on=["Company Registration No. (1)"]
)
pivot_counts = pivot_counts.merge(
total_counts, how="left", on=["Company Registration No. (1)", "Proprietor Name (1)"]
total_counts, how="left", on=["Company Registration No. (1)"]
)
pivot_counts = pivot_counts.sort_values("total_number_of_properties", ascending=False)
@ -187,7 +199,45 @@ def remove_duplicate_matches(matching_lookup, properties, company_ownership):
if not to_drop.empty:
merged = pd.merge(matching_lookup, to_drop, on=['UPRN', 'Title Number'], how='left', indicator=True)
merged[merged['_merge'] == 'left_only'].drop(columns=['_merge'])
merged = merged[merged['_merge'] == 'left_only'].drop(columns=['_merge'])
return merged
return matching_lookup
def remove_duplicate_uprn_matches(matching_lookup, properties, company_ownership):
dupe_uprns = matching_lookup[matching_lookup["UPRN"].duplicated()]["UPRN"].unique().tolist()
to_drop = []
for dupe_uprn in dupe_uprns:
dupe_data = matching_lookup[matching_lookup["UPRN"] == dupe_uprn].copy()
matched_addresses = dupe_data.merge(
properties[["UPRN", "ADDRESS"]].rename(columns={"ADDRESS": "epc_address"}),
how="left", on="UPRN"
).merge(
company_ownership[["Title Number", "Property Address"]],
how="left", on="Title Number"
)
# We perform levenstein to get the best match
best_match = levenstein_match(
matching_string=matched_addresses["Property Address"].values[0],
df=matched_addresses,
address_col="epc_address"
)
matches_to_drop = matched_addresses[
~matched_addresses["Title Number"].isin(best_match["Title Number"].values)
]
to_drop.append(
matches_to_drop[["UPRN", "Title Number"]].copy()
)
to_drop = pd.concat(to_drop)
if not to_drop.empty:
merged = pd.merge(matching_lookup, to_drop, on=['UPRN', 'Title Number'], how='left', indicator=True)
merged = merged[merged['_merge'] == 'left_only'].drop(columns=['_merge'])
return merged
@ -254,6 +304,9 @@ def app():
properties = properties[
properties["TENURE"].isin(["rental (private)", "Rented (private)", "owner-occupied", "Owner-occupied"])
]
# We have some duplicated on UPRN
# Take the newest UPRN
properties = properties.sort_values("LODGEMENT_DATE", ascending=False).drop_duplicates("UPRN")
# Remove entries where the address begins with the term "land adjoining", or other records that don't reference the
# the property itself
@ -354,43 +407,75 @@ def app():
freehold_matching_lookup = pd.DataFrame(freehold_matching_lookup)
leasehold_matching_lookup = pd.DataFrame(leasehold_matching_lookup)
shared_leasehold_match = pd.concat(shared_leasehold_match)
shared_freehold_match = pd.concat(shared_freehold_match)
# freehold_matching_lookup.to_excel("freehold_matching_lookup_new.xlsx")
# leasehold_matching_lookup.to_excel("leasehold_matching_lookup_new.xlsx")
# shared_leasehold_match.to_excel("shared_leasehold_match_new.xlsx")
# shared_freehold_match.to_excel("shared_freehold_match_new.xlsx")
# The approximate matches aren't very good
freehold_matching_lookup = freehold_matching_lookup[freehold_matching_lookup["match_type"] == "exact"]
leasehold_matching_lookup = leasehold_matching_lookup[leasehold_matching_lookup["match_type"] == "exact"]
# There are some cases where we have duplicates
freehold_matching_lookup = remove_duplicate_matches(freehold_matching_lookup, properties, company_ownership)
leasehold_matching_lookup = remove_duplicate_matches(leasehold_matching_lookup, properties, company_ownership)
# Combine
combined_matching_lookup = pd.concat([freehold_matching_lookup, leasehold_matching_lookup])
# Remove duplicates
combined_matching_lookup = remove_duplicate_matches(combined_matching_lookup, properties, company_ownership)
# We also have duplicates at a UPRN level
combined_matching_lookup = remove_duplicate_uprn_matches(combined_matching_lookup, properties, company_ownership)
matched_addresses = freehold_matching_lookup.merge(
properties[["UPRN", "ADDRESS"]].rename(columns={"ADDRESS": "epc_address"}),
# There are some cases where we have duplicates
# freehold_matching_lookup = remove_duplicate_matches(freehold_matching_lookup, properties, company_ownership)
# leasehold_matching_lookup = remove_duplicate_matches(leasehold_matching_lookup, properties, company_ownership)
matched_addresses = combined_matching_lookup.merge(
properties[["UPRN", "ADDRESS", "CURRENT_ENERGY_EFFICIENCY", "CURRENT_ENERGY_RATING"]].rename(
columns={"ADDRESS": "epc_address"}),
how="left", on="UPRN"
).merge(
company_ownership[["Title Number", "Property Address"]],
company_ownership[["Title Number", "Property Address", "Company Registration No. (1)", "Proprietor Name (1)"]],
how="left", on="Title Number"
)
# shared_freehold_match = pd.DataFrame(shared_freehold_match)
# Strore these files
freehold_matching_lookup.to_excel("freehold_matching_lookup.xlsx")
leasehold_matching_lookup.to_excel("leasehold_matching_lookup.xlsx")
shared_leasehold_match.to_excel("shared_leasehold_match.xlsx")
# freehold_matching_lookup.to_excel("freehold_matching_lookup.xlsx")
# leasehold_matching_lookup.to_excel("leasehold_matching_lookup.xlsx")
# shared_leasehold_match.to_excel("shared_leasehold_match.xlsx")
# shared_freehold_match.to_excel("shared_freehold_match.xlsx")
# read the files
# freehold_matching_lookup = pd.read_excel("freehold_matching_lookup.xlsx")
# leasehold_matching_lookup = pd.read_excel("leasehold_matching_lookup.xlsx")
# shared_leasehold_match = pd.read_excel("shared_leasehold_match.xlsx")
freehold_aggregate = aggregate_matches(freehold_matching_lookup, company_ownership, properties)
leasehold_aggregate = aggregate_matches(leasehold_matching_lookup, company_ownership, properties)
combined_aggregate = aggregate_matches(
pd.concat([freehold_matching_lookup, leasehold_matching_lookup]), company_ownership, properties
combined_matching_lookup, company_ownership, properties
)
df = pd.concat([freehold_matching_lookup, leasehold_matching_lookup])
investment_20m = combined_aggregate[combined_aggregate["cumulative_value"] <= 20_500_000]
investment_50m = combined_aggregate[combined_aggregate["cumulative_value"] <= 51_000_000]
properties["WALLS_DESCRIPTION"].value_counts(normalize=True)
investment_20m_properties = matched_addresses[
matched_addresses["Company Registration No. (1)"].isin(investment_20m["Company Registration No. (1)"])
]
investment_50m_properties = matched_addresses[
matched_addresses["Company Registration No. (1)"].isin(investment_50m["Company Registration No. (1)"])
]
portfolio_epc_data_50m = properties[properties["UPRN"].isin(investment_50m_properties["UPRN"])]
portfolio_epc_data_20m = properties[properties["UPRN"].isin(investment_20m_properties["UPRN"])]
investment_20m_properties.to_excel("investment_20m_properties 28th May.xlsx", index=False)
investment_50m_properties.to_excel("investment_50m_properties 28th May.xlsx", index=False)
# Store the EPC data
portfolio_epc_data_50m.to_excel("portfolio_epc_data_50m 28th May.xlsx", index=False)
portfolio_epc_data_20m.to_excel("portfolio_epc_data_20m 28th May.xlsx", index=False)
def company_aggregation():

View file

@ -2,24 +2,27 @@ from tqdm import tqdm
import os
import pandas as pd
import msgpack
import inspect
from etl.epc_clean.EpcClean import EpcClean
from etl.epc.settings import EARLIEST_EPC_DATE
from pathlib import Path
from utils.s3 import save_data_to_s3
src_file_path = inspect.getfile(lambda: None)
LAND_REGISTRY_PATHS = [
os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/pp-monthly-update-new-version.csv",
os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/pp-2022 (1).csv",
os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/pp-2021.csv",
os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/pp-2020.csv",
os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/pp-2019.csv",
os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/pp-2018.csv",
os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/pp-2017-part1.csv",
os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/pp-2017-part2.csv",
os.path.abspath(os.path.dirname(src_file_path)) + "/model_data/local_data/pp-monthly-update-new-version.csv",
os.path.abspath(os.path.dirname(src_file_path)) + "/model_data/local_data/pp-2022 (1).csv",
os.path.abspath(os.path.dirname(src_file_path)) + "/model_data/local_data/pp-2021.csv",
os.path.abspath(os.path.dirname(src_file_path)) + "/model_data/local_data/pp-2020.csv",
os.path.abspath(os.path.dirname(src_file_path)) + "/model_data/local_data/pp-2019.csv",
os.path.abspath(os.path.dirname(src_file_path)) + "/model_data/local_data/pp-2018.csv",
os.path.abspath(os.path.dirname(src_file_path)) + "/model_data/local_data/pp-2017-part1.csv",
os.path.abspath(os.path.dirname(src_file_path)) + "/model_data/local_data/pp-2017-part2.csv",
]
EPC_DIRECTORY = Path(__file__).parent / "local_data" / "all-domestic-certificates"
EPC_DIRECTORY = Path(src_file_path).parent / "local_data" / "all-domestic-certificates"
ENVIRONMENT = os.getenv("ENVIRONMENT", "dev")

View file

@ -116,7 +116,14 @@ class HotWaterAttributes(Definitions):
"instantaneous at "
"point of use, "
"waste water heat "
"recovery"
"recovery",
"ogçör brif system, adfer gwres d+¦r gwastraff": "from main system, waste water heat recovery",
"twymwr tanddwr, tarriff safonol, adfer gwres d+¦r gwastraff": "electric immersion, standard tariff, waste "
"water heat recovery",
"ogçör brif system, dim thermostat ar y silindr, adfer gwres nwyon ffliw": "from main system, no cylinder "
"thermostat, flue gas heat recovery",
"ogçör brif system, gydag ynnigçör haul, adfer gwres nwyon ffliw": "from main system, plus solar, flue gas "
"heat recovery",
}
def __init__(self, description: str):

View file

@ -56,6 +56,9 @@ class MainHeatAttributes(Definitions):
"bwyler a gwres dan y llawr, lpg": "boiler and underfloor heating, lpg",
"bwyler a gwres dan y llawr, trydan": "boiler and underfloor heating, electric",
"boiler and radiators, nwy prif gyflenwad, mains gas": "boiler and radiators, mains gas",
"bwyler a rheiddiaduron, olew, st+¦r wresogyddion trydan": "boiler and radiators, oil, electric storage "
"heaters",
"pwmp gwres sygçön tarddu yn yr awyr, awyr gynnes, trydan": "air source heat pump, warm air, electric",
}
REMAP = {

View file

@ -111,7 +111,8 @@ class MainheatControlAttributes(Definitions):
't+-ól un gyfradd, trvs': 'single rate heating, trvs',
't+ól un gyfradd, rhaglennydd a trvs': 'single rate heating, programmer, trvs',
't+ól un gyfradd, trvs': 'single rate heating, trvs',
'trvs a falf osgoi': 'trvs and bypass'
'trvs a falf osgoi': 'trvs and bypass',
'rheolaeth celect': 'celect-type control',
}
def __init__(self, description: str):

View file

@ -30,6 +30,7 @@ class WindowAttributes(Definitions):
"gwydrau eilaidd llawn": "full secondary glazing",
"gwydrau eilaidd mwyaf": "mostly secondary glazing",
"gwydrau eilaidd rhannol": "partial secondary glazing",
"gwydrau lluosog ym mhobman": "multiple glazing throughout",
}
def __init__(self, description: str):

View file

@ -24,7 +24,7 @@ def extract_thermal_transmittance(result: dict, description: str) -> Tuple[
if match:
result['thermal_transmittance'] = float(match.group(1))
result['thermal_transmittance_unit'] = match.group(3)
result['thermal_transmittance_unit'] = "w/m-¦k" # We standardise the unit
# Remove the match from the description
description = re.sub(THERMAL_TRANSMITTANCE_STR, "", description)
else:

View file

@ -81,6 +81,8 @@ resource "aws_db_instance" "default" {
# We will look to change this in the future but as we are pre-MVP at the time of setting this, we don't
# have major security demand and don't want to set this up now
publicly_accessible = true
# Specify the CA certificate with the default RDS CA certificate
ca_cert_identifier = "rds-ca-rsa2048-g1"
}
# Set up the bucket that recieve the csv uploads of epc to be retrofit
@ -147,7 +149,7 @@ module "route53" {
source = "./modules/route53"
domain_name = var.domain_name
api_url_prefix = var.api_url_prefix
providers = {
providers = {
aws.aws_use1 = aws.aws_use1
}
}

View file

@ -626,12 +626,10 @@ class Costs:
preliminaries_rate = self.EWI_SCAFFOLDING_PRELIMINARIES
else:
preliminaries_rate = self.EWI_NO_SCAFFOLDING_PRELIMINARIES
elif self.property.data["property-type"] == "Maisonette":
elif self.property.data["property-type"] in ["Maisonette", "Flat"]:
preliminaries_rate = self.EWI_SCAFFOLDING_PRELIMINARIES
elif self.property.data["property-type"] == "Bungalow":
preliminaries_rate = self.EWI_NO_SCAFFOLDING_PRELIMINARIES
else:
raise ValueError("Unsupported property type - haven't handled flats")
demolition_data = [x for x in non_insulation_materials if x["type"] == "ewi_wall_demolition"]
preparation_data = [x for x in non_insulation_materials if x["type"] == "ewi_wall_preparation"]

View file

@ -103,7 +103,7 @@ class HeatingRecommender:
return
def recommend_air_source_heat_pump(self, phase, has_cavity_or_loft_recommendations):
def recommend_air_source_heat_pump(self, phase, has_cavity_or_loft_recommendations, _return=False):
"""
This method will implement the recommendation for an air source heat pump
This is ultimately an overhaul to the heating system and so is recommended as an alternative to other
@ -200,6 +200,8 @@ class HeatingRecommender:
**ashp_costs
}
if _return:
return [ashp_recommendation]
self.heating_recommendations.append(ashp_recommendation)
@staticmethod
@ -312,7 +314,7 @@ class HeatingRecommender:
return output
def recommend_hhr_storage_heaters(self, phase, system_change, heating_controls_only):
def recommend_hhr_storage_heaters(self, phase, system_change, heating_controls_only, _return=False):
"""
We will recommend upgrading to a high heat retention storage system, if the current system is not already
high heat retention storage
@ -321,6 +323,8 @@ class HeatingRecommender:
:param system_change: Indicates if we are recommending a different type of heating system, compared to the
current system
:param heating_controls_only: Indicates if we should include a recommendation for just heating controls
:param _return: Indicates if we should return the recommendations, rather than appending them to the
recommendations list
:return:
"""
@ -374,6 +378,8 @@ class HeatingRecommender:
heating_controls_only=heating_controls_only,
system_change=system_change
)
if _return:
return recommendations
self.heating_recommendations.extend(recommendations)

173
recommendations/Mds.py Normal file
View file

@ -0,0 +1,173 @@
from backend.Property import Property
from recommendations.FloorRecommendations import FloorRecommendations
from recommendations.WallRecommendations import WallRecommendations
from recommendations.RoofRecommendations import RoofRecommendations
from recommendations.VentilationRecommendations import VentilationRecommendations
from recommendations.FireplaceRecommendations import FireplaceRecommendations
from recommendations.LightingRecommendations import LightingRecommendations
from recommendations.SolarPvRecommendations import SolarPvRecommendations
from recommendations.WindowsRecommendations import WindowsRecommendations
from recommendations.HeatingRecommender import HeatingRecommender
from recommendations.HotwaterRecommendations import HotwaterRecommendations
from recommendations.SecondaryHeating import SecondaryHeating
from recommendations.Recommendations import Recommendations
class Mds:
"""
Handles the contruction of the MDS report
"""
def __init__(self, property_instance: Property, materials):
self.property_instance = property_instance
self.floor_recommender = FloorRecommendations(property_instance=property_instance, materials=materials)
self.wall_recommender = WallRecommendations(property_instance=property_instance, materials=materials)
self.roof_recommender = RoofRecommendations(property_instance=property_instance, materials=materials)
self.ventilation_recomender = VentilationRecommendations(
property_instance=property_instance, materials=materials
)
self.fireplace_recommender = FireplaceRecommendations(property_instance=property_instance)
self.lighting_recommender = LightingRecommendations(property_instance=property_instance, materials=materials)
self.windows_recommender = WindowsRecommendations(property_instance=property_instance, materials=materials)
self.solar_recommender = SolarPvRecommendations(property_instance=property_instance)
self.heating_recommender = HeatingRecommender(property_instance=property_instance)
self.hotwater_recommender = HotwaterRecommendations(property_instance=property_instance)
self.secondary_heating_recommender = SecondaryHeating(property_instance=property_instance)
def build(self):
if self.property_instance.measures is None:
raise NotImplementedError("No measures in the property - implement me")
measures = self.property_instance.measures
measure_config_list = [list(m.keys())[0] for m in measures]
not_implemented_measures = [
"party_wall_insulation",
"ground_source_heat_pump",
"shared_ground_loops",
"communal_heat_networks",
"district_heating_networks",
"solar_thermal",
"draught_proofing",
"ev_charging",
"battery",
]
# Check if we have a not implemented measure
if any([m in not_implemented_measures for m in measure_config_list]):
raise NotImplementedError("Not implemented measure in the property - implement me")
mds_recommendations = []
errors = []
# TODO: Could use a decarator to reduce the boilerplate code - insert_recommendation_id and then the append
if "external_wall_insulation" in measure_config_list:
recs = self.wall_recommender.mds_recommend_ewi(phase=0)
if not recs:
raise Exception("No recommendations for external wall insulation")
recs = self.insert_recommendation_id(recs, measures, "external_wall_insulation")
mds_recommendations.append(recs)
if "cavity_wall_insulation" in measure_config_list:
recs = self.wall_recommender.mds_recommend_cavity_wall_insulation(phase=0)
recs = self.insert_recommendation_id(recs, measures, "cavity_wall_insulation")
mds_recommendations.append(recs)
if "loft_insulation" in measure_config_list:
# Check if the roof is suitable for loft insulation
if self.property_instance.roof['is_roof_room']:
errors.append("Roof is a room")
else:
recs = self.roof_recommender.mds_loft_insulation(phase=0)
if not recs:
raise Exception("No recommendations for loft insulation")
recs = self.insert_recommendation_id(recs, measures, "loft_insulation")
mds_recommendations.append(recs)
if "internal_wall_insulation" in measure_config_list:
raise Exception("check me out 4")
self.wall_recommender.recommend(phase=0)
if "suspended_floor_insulation" in measure_config_list:
raise Exception("check me out 5")
self.floor_recommender.recommend(phase=0)
if "solid_floor_insulation" in measure_config_list:
raise Exception("check me out 6")
self.floor_recommender.recommend(phase=0)
if "air_source_heat_pump" in measure_config_list:
recs = self.heating_recommender.recommend_air_source_heat_pump(
phase=0, has_cavity_or_loft_recommendations=False, _return=True
)
recs = self.insert_recommendation_id(recs, measures, "air_source_heat_pump")
mds_recommendations.append(recs)
if "electric_storage_heaters" in measure_config_list:
recs = self.heating_recommender.recommend_hhr_storage_heaters(
phase=0, system_change=True, heating_controls_only=False, _return=True
)
recs = self.insert_recommendation_id(recs, measures, "electric_storage_heaters")
mds_recommendations.append(recs)
if "low_energy_lighting" in measure_config_list:
raise Exception("check me out 9")
self.lighting_recommender.recommend(phase=0)
if "cylinder_insulation" in measure_config_list:
raise Exception("check me out 10")
self.hotwater_recommender.recommend(phase=0)
if "smart_controls" in measure_config_list:
raise Exception("check me out 11")
self.heating_recommender.recommend(phase=0)
if "zone_controls" in measure_config_list:
raise Exception("check me out 12")
self.heating_recommender.recommend(phase=0)
if "trvs" in measure_config_list:
raise Exception("check me out 13")
self.heating_recommender.recommend(phase=0)
if "solar_pv" in measure_config_list:
recs = self.solar_recommender.mds_recommend(phase=0, solar_pv_percentage=0.5)
recs = self.insert_recommendation_id(recs, measures, "solar_pv")
mds_recommendations.append(recs)
if "double_glazing" in measure_config_list:
raise Exception("check me out 15")
self.windows_recommender.recommend(phase=0)
if "mechanical_ventilation" in measure_config_list:
raise Exception("check me out 16")
self.ventilation_recomender.recommend(phase=0)
if "gas_boiler" in measure_config_list:
raise Exception("check me out 17")
self.heating_recommender.recommend(phase=0)
if "flat_roof_insulation" in measure_config_list:
raise Exception("check me out 18")
self.roof_recommender.recommend(phase=0)
if "room_in_roof_insulation" in measure_config_list:
raise Exception("check me out 19")
self.roof_recommender.recommend(phase=0)
property_representative_recommendations = Recommendations.create_representative_recommendations(
mds_recommendations, non_invasive_recommendations=[]
)
return property_representative_recommendations, errors
@staticmethod
def insert_recommendation_id(recommendations, measures, measure_name):
# Insert the recommendation identifier into this recommendation
measure_config = [m for m in measures if measure_name in m][0]
for r in recommendations:
r["recommendation_id"] = list(measure_config.values())[0]
return recommendations

View file

@ -54,6 +54,26 @@ class RoofRecommendations:
]
]
def mds_loft_insulation(self, phase):
"""
For usages within the mds report
:param phase:
:return:
"""
self.recommendations = []
insulation_thickness = convert_thickness_to_numeric(
self.property.roof["insulation_thickness"],
self.property.roof["is_pitched"],
self.property.roof["is_flat"]
)
u_value = get_roof_u_value(**{**self.property.roof, "age_band": self.property.age_band})
self.recommend_roof_insulation(u_value, insulation_thickness, self.property.roof, phase)
return self.recommendations
def recommend(self, phase):
if self.property.roof["has_dwelling_above"]:
@ -210,6 +230,7 @@ class RoofRecommendations:
already_installed = "loft_insulation" in self.property.already_installed
if already_installed:
cost_result = override_costs(cost_result)
new_thickness = insulation_thickness + material["depth"]
elif material["type"] == "flat_roof_insulation":
cost_result = self.costs.flat_roof_insulation(
floor_area=self.property.insulation_floor_area,
@ -219,6 +240,7 @@ class RoofRecommendations:
already_installed = "flat_roof_insulation" in self.property.already_installed
if already_installed:
cost_result = override_costs(cost_result)
new_thickness = None
else:
raise ValueError("Invalid material type")
@ -239,6 +261,7 @@ class RoofRecommendations:
"new_u_value": new_u_value,
"sap_points": None,
"already_installed": already_installed,
"new_thickness": new_thickness,
**cost_result
}
)

View file

@ -35,6 +35,46 @@ class SolarPvRecommendations:
return trimmed_list
def mds_recommend(self, phase=None, solar_pv_percentage=0.5):
# For specific usage within the mds report
solar_pv_roof_area = self.property.get_solar_pv_roof_area(solar_pv_percentage)
number_solar_panels = np.floor(solar_pv_roof_area / self.SOLAR_PANEL_AREA)
solar_panel_wattage = number_solar_panels * self.SOLAR_PANEL_WATTAGE
solar_panel_wattage = np.clip(
a=solar_panel_wattage, a_min=self.MIN_SYSTEM_WATTAGE, a_max=self.MAX_SYSTEM_WATTAGE
)
# We now have a property which is potentially suitable for solar PV
roof_coverage_percent = round(solar_pv_percentage * 100)
# Given the wattage, we estimate the cost of the solar PV system. This is based on the MCS database
# of solar PV installations
cost_result = self.costs.solar_pv(wattage=solar_panel_wattage, has_battery=False)
kw = np.floor(solar_panel_wattage / 100) / 10
description = (f"Install a {kw} kilowatt-peak (kWp) solar photovoltaic (PV) p"
f"anel system on {round(roof_coverage_percent)}% the roof.")
return [
{
"phase": phase,
"parts": [],
"type": "solar_pv",
"description": description,
"starting_u_value": None,
"new_u_value": None,
"sap_points": None,
"already_installed": False,
**cost_result,
# This is required for simulating the SAP impact. solar_pv_percentage is between 0 & 1 so we scale
# back up here
"photo_supply": roof_coverage_percent,
"has_battery": False
}
]
def recommend(self, phase):
"""
We check if a property is potentially suitable for solar PV based on the following criteria:

View file

@ -6,9 +6,10 @@ import pandas as pd
from datatypes.enums import QuantityUnits
from backend.Property import Property
from BaseUtility import Definitions
from etl.epc_clean.epc_attributes.WallAttributes import WallAttributes
from recommendations.recommendation_utils import (
r_value_per_mm_to_u_value, calculate_u_value_uplift, is_diminishing_returns, update_lowest_selected_u_value,
get_recommended_part, get_wall_u_value, override_costs
get_recommended_part, get_wall_u_value, override_costs, check_simulation_difference
)
from recommendations.config import PARTIALLY_FILLED_PERCENTAGE_ASSUMPTION
from recommendations.Costs import Costs
@ -53,6 +54,26 @@ class WallRecommendations(Definitions):
# threshold
NEW_BUILD_INSULATED = 0.75
# These are the ending descriptions we consider for walls with external insulation
EXTERNALLY_INSULATED_WALL_DESCRIPTIONS = {
"solid_brick": "Solid brick, with external insulation",
"cob": "Cob, with external insulation",
"system_built": "System built, with external insulation",
"granite_or_whinstone": 'Granite or whinstone, with external insulation',
"sandstone_or_limestone": 'Sandstone or limestone, with external insulation',
"timber_frame": "Timber frame, with external insulation"
}
# These are the ending descriptions we consider for walls with internal insulation
INTERNALLY_INSULATED_WALL_DESCRIPTIONS = {
"solid_brick": "Solid brick, with internal insulation",
"cob": "Cob, with internal insulation",
"system_built": "System built, with internal insulation",
"granite_or_whinstone": 'Granite or whinstone, with internal insulation',
"sandstone_or_limestone": 'Sandstone or limestone, with internal insulation',
"timber_frame": "Timber frame, with internal insulation"
}
def __init__(
self,
property_instance: Property,
@ -103,6 +124,47 @@ class WallRecommendations(Definitions):
return True
def mds_recommend_cavity_wall_insulation(self, phase=None):
# Function specifically for cavity wall insulation, for usage in the mds report
self.recommendations = []
insulation_thickness = self.property.walls["insulation_thickness"]
u_value = get_wall_u_value(
clean_description=self.property.walls["clean_description"],
age_band=self.property.age_band,
is_granite_or_whinstone=self.property.walls["is_granite_or_whinstone"],
is_sandstone_or_limestone=self.property.walls["is_sandstone_or_limestone"],
)
# Test filling cavity
self.find_cavity_insulation(u_value, insulation_thickness, phase)
return self.recommendations
def mds_recommend_ewi(self, phase=None):
# Function specifically for external wall insulation, for usage in the mds report
self.recommendations = []
u_value = self.property.walls["thermal_transmittance"]
if u_value is None:
u_value = get_wall_u_value(
clean_description=self.property.walls["clean_description"],
age_band=self.property.age_band,
is_granite_or_whinstone=self.property.walls["is_granite_or_whinstone"],
is_sandstone_or_limestone=self.property.walls["is_sandstone_or_limestone"],
)
# EWI
ewi_recommendations = self._find_insulation(
u_value=u_value,
insulation_materials=pd.DataFrame(self.external_wall_insulation_materials),
non_insulation_materials=self.external_wall_non_insulation_materials,
phase=phase
)
return ewi_recommendations
def recommend(self, phase=0):
# if building built after 1990 + we're able to identify U-value +
# U-value less than 0.18 and if in or close to a conversation area,
@ -238,6 +300,21 @@ class WallRecommendations(Definitions):
# updated the new u-value with the best possible our installers have
new_u_value = max(0.31, new_u_value)
wall_ending_config = WallAttributes("Cavity wall, filled cavity").process()
simulation_config = {}
if self.property.data["walls-energy-eff"] not in ["Good", "Very Good"]:
simulation_config = {
"walls_energy_eff_ending": "Good",
"walls_thermal_transmittance_ending": new_u_value
}
walls_simulation_config = check_simulation_difference(
new_config=wall_ending_config, old_config=self.property.walls, prefix="walls_"
)
simulation_config = {**simulation_config, **walls_simulation_config}
recommendations.append(
{
"phase": phase,
@ -255,12 +332,39 @@ class WallRecommendations(Definitions):
"new_u_value": new_u_value,
"sap_points": None,
"already_installed": already_installed,
"simulation_config": simulation_config,
**cost_result
}
)
self.recommendations = recommendations
def get_internal_external_wall_description(self, description_map, new_u_value):
if self.property.walls["is_solid_brick"]:
return description_map["solid_brick"]
if self.property.walls["is_cob"]:
return description_map["cob"]
if self.property.walls["is_system_built"]:
return description_map["system_built"]
if self.property.walls["is_granite_or_whinstone"]:
return description_map["granite_or_whinstone"]
if self.property.walls["is_sandstone_or_limestone"]:
return description_map["sandstone_or_limestone"]
if self.property.walls["is_timber_frame"]:
return description_map["timber_frame"]
if "Average thermal transmittance" in self.property.walls["clean_description"]:
if new_u_value is None:
raise ValueError("New u value is None")
return f'Average thermal transmittance {new_u_value} W/m-¦K'
raise NotImplementedError("Not implemented yet")
def _find_insulation(self, u_value, insulation_materials, non_insulation_materials, phase):
lowest_selected_u_value = None
@ -299,6 +403,10 @@ class WallRecommendations(Definitions):
if already_installed:
cost_result = override_costs(cost_result)
new_description = self.get_internal_external_wall_description(
self.INTERNALLY_INSULATED_WALL_DESCRIPTIONS, new_u_value
)
elif material["type"] == "external_wall_insulation":
cost_result = self.costs.external_wall_insulation(
wall_area=self.property.insulation_wall_area,
@ -308,9 +416,31 @@ class WallRecommendations(Definitions):
already_installed = "external_wall_insulation" in self.property.already_installed
if already_installed:
cost_result = override_costs(cost_result)
new_description = self.get_internal_external_wall_description(
self.EXTERNALLY_INSULATED_WALL_DESCRIPTIONS, new_u_value
)
else:
raise ValueError("Invalid material type")
wall_ending_config = WallAttributes(new_description).process()
simulation_config = {}
if self.property.data["walls-energy-eff"] not in ["Good", "Very Good"]:
simulation_config = {
"walls_energy_eff_ending": "Good"
}
walls_simulation_config = check_simulation_difference(
new_config=wall_ending_config, old_config=self.property.walls, prefix="walls_"
)
simulation_config = {
**walls_simulation_config,
**simulation_config,
"walls_thermal_transmittance_ending": new_u_value
}
recommendations.append(
{
"phase": phase,
@ -328,6 +458,7 @@ class WallRecommendations(Definitions):
"new_u_value": new_u_value,
"already_installed": already_installed,
"sap_points": None,
"simulation_config": simulation_config,
**cost_result
}
)

View file

@ -4,7 +4,7 @@ import numpy as np
from backend.Property import Property
from recommendations.Costs import Costs
from recommendation_utils import override_costs
from recommendations.recommendation_utils import override_costs
class WindowsRecommendations:

View file

@ -756,15 +756,18 @@ def calculate_cavity_age(newest_epc, older_epcs, cleaned):
return cavity_age
def check_simulation_difference(old_config, new_config):
def check_simulation_difference(old_config, new_config, prefix=""):
"""
Given two dictionaries, that describe the heating control configurations, this method will compare the two
and pick out the differences. These differences will be things that have been added and things that have been
removed. This will be used to determine how we should be updating the configuration in the simulation
:return:
"""
differences = {key + "_ending": new_config[key] for key in new_config if old_config[key] != new_config[key]}
differences = {}
for key in new_config:
if old_config[key] != new_config[key]:
new_key = prefix + key + "_ending" if key in ["is_assumed", "thermal_transmittance"] else key + "_ending"
differences[new_key] = new_config[key]
return differences

View file

@ -198,13 +198,14 @@ def read_pickle_from_s3(bucket_name, s3_file_name):
return data
def read_excel_from_s3(bucket_name, file_key, header_row):
def read_excel_from_s3(bucket_name, file_key, header_row, drop_all_na=True):
"""
Read an Excel file from an S3 bucket and return it as a pandas DataFrame.
:param bucket_name: Name of the S3 bucket.
:param file_key: Key of the file (including directory path within the bucket).
:param header_row: The row number to use as the header (0-indexed).
:param drop_all_na: Whether to drop columns where all values are NaN.
:return: A pandas DataFrame containing the data from the Excel file.
"""
@ -219,7 +220,8 @@ def read_excel_from_s3(bucket_name, file_key, header_row):
df = pd.read_excel(excel_buffer, header=header_row)
# Drop columns where all values are NaN
df.dropna(axis=1, how='all', inplace=True)
if drop_all_na:
df.dropna(axis=1, how='all', inplace=True)
# Reset index if the first column is just an index or entirely NaN
df.reset_index(drop=True, inplace=True)