diff --git a/backend/Property.py b/backend/Property.py index 2e6cbbb6..b7753413 100644 --- a/backend/Property.py +++ b/backend/Property.py @@ -61,7 +61,14 @@ class Property: n_bedrooms = None def __init__( - self, id, postcode, address, epc_record, already_installed=None, non_invasive_recommendations=None, + self, + id, + postcode, + address, + epc_record, + already_installed=None, + non_invasive_recommendations=None, + measures=None, **kwargs ): @@ -85,6 +92,8 @@ class Property: ast.literal_eval(non_invasive_recommendations['recommendations']) if non_invasive_recommendations else [] ) + # This is a list of measures that have been recommended for the property + self.measures = ast.literal_eval(measures) if measures else None self.uprn = epc_record.get("uprn") self.full_sap_epc = epc_record.get("full_sap_epc") @@ -163,12 +172,12 @@ class Property: :return: """ n_bathrooms = kwargs.get("n_bathrooms", None) - if n_bathrooms is not None: + if n_bathrooms not in [None, ""]: # We add on a small value to ensure that the number of bathrooms is rounded up, in case the value is 0.5 n_bathrooms = int(round(float(n_bathrooms) + 1e-5)) n_bedrooms = kwargs.get("n_bedrooms", None) - if n_bedrooms is not None: + if n_bedrooms not in [None, ""]: n_bedrooms = int(round(float(n_bedrooms) + 1e-5)) return { @@ -221,6 +230,29 @@ class Property: # self.base_difference_record.df + def simulate_all_representative_recommendations( + self, property_representative_recommendations, + ): + """ + This method was put together to simulate the impact of the representative recommendations on the property + all at once, for usage within the mds report + :return: + """ + + recommendation_record = self.base_difference_record.df.to_dict("records")[ + 0 + ].copy() + + scoring_dict = self.create_recommendation_scoring_data( + property_id=self.id, + recommendation_record=recommendation_record, + recommendations=property_representative_recommendations, + primary_recommendation_id=self.id, + 
non_invasive_recommendations=self.non_invasive_recommendations, + ) + + return scoring_dict + def adjust_difference_record_with_recommendations( self, property_recommendations, property_representative_recommendations ): @@ -321,49 +353,6 @@ class Property: for recommendation in recommendations: # For the list of recommendations we have, we iteratively update the output - # We update the description to indicate it's insulated - if recommendation["type"] in [ - "internal_wall_insulation", - "external_wall_insulation", - "cavity_wall_insulation", - ]: - - # # If we have a non-incasive recommendation that the cavity wall is partially filled, we skip the - # # cavity wall insulation recommendation (since on the EPC, the property will look like how it did - # # before any works) - # if "cavity_surveyed_as_filled_is_partial" in non_invasive_recommendations: - # continue - - # The upgrade made here is to the u-value of the walls and the description of the - # insulation thickness - output["walls_thermal_transmittance_ending"] = recommendation[ - "new_u_value" - ] - # Setting the insulation thickness here to above average should be tested further because we - # don't see a high volume of instances for this - output["walls_insulation_thickness_ending"] = "average" - output["walls_energy_eff_ending"] = "Good" - - # Note: often when the wall is insulatied, the internal/external insulation is not noted so we should - # test the impact of using these booleans - if recommendation["type"] == "external_wall_insulation": - output["external_insulation_ending"] = True - output["internal_insulation_ending"] = False - - if recommendation["type"] == "internal_wall_insulation": - output["external_insulation_ending"] = False - output["internal_insulation_ending"] = True - - if recommendation["type"] == "cavity_wall_insulation": - output["is_filled_cavity_ending"] = True - - else: - if output["walls_thermal_transmittance_ending"] is None: - raise ValueError("We should not have a None value 
for the u value") - - if output["walls_insulation_thickness_ending"] is None: - output["walls_insulation_thickness_ending"] = "none" - # Update description to indicate it's insulate if recommendation["type"] in [ "solid_floor_insulation", @@ -375,11 +364,8 @@ class Property: "Have more than 1 floor insulation part - handle this case" ) - # output["floor_thermal_transmittance_ending"] = recommendation["new_u_value"] # We don't really see above average for this in the training data output["floor_insulation_thickness_ending"] = "average" - # This is rarely ever populated in the training data - # output["floor_energy_eff_ending"] = "Good" else: if output["floor_thermal_transmittance_ending"] is None: raise ValueError("We should not have a None value for the u value") @@ -418,19 +404,20 @@ class Property: 400, ] - proposed_depth = int(parts[0]["depth"]) + proposed_depth = recommendation["new_thickness"] if proposed_depth not in valid_numeric_values: # Take the nearest value for scoring proposed_depth = min( valid_numeric_values, key=lambda x: abs(x - proposed_depth) ) - output["roof_insulation_thickness_ending"] = str(proposed_depth) + output["roof_insulation_thickness_ending"] = str(int(proposed_depth)) if recommendation["type"] == "loft_insulation": if proposed_depth >= 270: output["roof_energy_eff_ending"] = "Very Good" else: - output["roof_energy_eff_ending"] = "Good" + if output["roof_energy_eff_ending"] not in ["Good", "Very Good"]: + output["roof_energy_eff_ending"] = "Good" else: output["roof_energy_eff_ending"] = "Very Good" else: @@ -450,7 +437,8 @@ class Property: if recommendation["type"] == "windows_glazing": output["multi_glaze_proportion_ending"] = 100 - output["windows_energy_eff_ending"] = "Average" + if output["windows_energy_eff_ending"] not in ["Average", "Good", "Very Good"]: + output["windows_energy_eff_ending"] = "Average" is_secondary_glazing = recommendation["is_secondary_glazing"] @@ -481,9 +469,12 @@ class Property: ) if recommendation["type"] 
in [ - "heating", "hot_water_tank_insulation", "heating_control", "secondary_heating" + "heating", "hot_water_tank_insulation", "heating_control", "secondary_heating", + "internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation", ]: # We update the data, as defined in the recommendaton + if output["walls_insulation_thickness_ending"] is None: + output["walls_insulation_thickness_ending"] = "none" simulation_config = recommendation["simulation_config"] # If any entries in simulation_config are None, we will set them to "Unknown" which is the cleaning diff --git a/backend/apis/GoogleSolarApi.py b/backend/apis/GoogleSolarApi.py index 86324c58..205a3560 100644 --- a/backend/apis/GoogleSolarApi.py +++ b/backend/apis/GoogleSolarApi.py @@ -11,12 +11,14 @@ EPC_AUTH_TOKEN = os.getenv("EPC_AUTH_TOKEN") # This is for 6 Laura Close, Tintagel, PL34 0EB (same property that Cotswolrd energy used) uprn = 100040099104 +# This is for 353A, Hermitage Lane, ME16 9NT (one of the e.on properties) +uprn = 200000964454 cleaning_data = read_dataframe_from_s3_parquet( bucket_name="retrofit-data-dev", file_key="sap_change_model/cleaning_dataset.parquet", ) -searcher = SearchEpc(address1="6 Laura Close", postcode="PL34 0EB", uprn=uprn, auth_token=EPC_AUTH_TOKEN, os_api_key="") +searcher = SearchEpc(address1="", postcode="", uprn=uprn, auth_token=EPC_AUTH_TOKEN, os_api_key="") searcher.find_property(skip_os=True) @@ -80,7 +82,7 @@ solar_potential["panelWidthMeters"] solar_potential["wholeRoofStats"] -# Copy of response for testing: +# Copy of response for testing - 6 Laura Close, Tintagel, PL34 0EB # {'name': 'buildings/ChIJ2yC6t4KEa0gRh2TIssogI7k', 'center': {'latitude': 50.667375, 'longitude': -4.7416833}, # 'imageryDate': {'year': 2021, 'month': 7, 'day': 19}, 'regionCode': 'GB', 'solarPotential': {'maxArrayPanelsCount': # 39, 'maxArrayAreaMeters2': 76.578636, 'maxSunshineHoursPerYear': 1172.0627, 'carbonOffsetFactorKgPerMwh': diff --git a/backend/app/plan/router.py 
@router.post("/mds")
async def build_mds(body: PlanTriggerRequest):
    """
    Assemble the MDS report table for every property listed in the trigger file.

    The endpoint reads the plan input CSV, looks each property up via the EPC search, builds the
    representative recommendations with ``Mds``, scores them through ``ModelApi.predict_all`` and derives a
    before/after table (SAP, EPC band, heat demand, carbon, bill and energy savings).

    NOTE(review): the resulting table is only held in memory - it is neither stored nor returned yet.

    :param body: The trigger request; ``trigger_file_path`` and ``portfolio_id`` are read here
    :return: 200 on success, 400 on a ValueError (malformed data), 500 on any other failure
    """
    # TODO: This is a placeholder location for the MDS endpoint, which this is being assembled

    # Imported once per request rather than once per property inside the results loop
    from backend.ml_models.AnnualBillSavings import AnnualBillSavings

    logger.info("Connecting to db")
    session = sessionmaker(bind=db_engine)()
    created_at = datetime.now().isoformat()

    try:
        session.begin()
        logger.info("Getting the inputs")
        plan_input = read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path)

        cleaning_data = read_dataframe_from_s3_parquet(
            bucket_name=get_settings().DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet",
        )

        input_properties = []
        for property_id, config in tqdm(enumerate(plan_input), total=len(plan_input)):
            # We validate each record in the file. If the record is NOT valid, we need to handle this accordingly
            uprn = config.get("uprn", None)
            uprn = None if uprn == "" else uprn
            if uprn:
                # The value may arrive as a float-formatted string, so we go via float
                uprn = int(float(uprn))

            epc_searcher = SearchEpc(
                address1=config["address"],
                postcode=config["postcode"],
                uprn=uprn,
                auth_token=get_settings().EPC_AUTH_TOKEN,
                os_api_key=get_settings().ORDNANCE_SURVEY_API_KEY,
            )
            epc_searcher.ordnance_survey_client.built_form = config.get("built_form", None)
            epc_searcher.ordnance_survey_client.property_type = config.get("property_type", None)
            # For the moment, our OS API access is unavailable, so we skip and interpolate
            epc_searcher.find_property(skip_os=True)

            # HACK: hard-coded UPRN override for a single pilot address
            if config["address"] == "35b High Street":
                logger.info("Performing temporary patch")
                epc_searcher.newest_epc["uprn"] = 10002911892
                epc_searcher.full_sap_epc["uprn"] = 10002911892

            # Create a record in db
            # TODO: If we productionise the creation of this mds report, we will need to store this in the db
            # property_id, is_new = create_property(
            #     session, body.portfolio_id, epc_searcher.address_clean, epc_searcher.postcode_clean, epc_searcher.uprn
            # )
            # if not is_new:
            #     continue
            #
            # create_property_targets(
            #     session,
            #     property_id=property_id,
            #     portfolio_id=body.portfolio_id,
            #     epc_target=body.goal_value,
            #     heat_demand_target=None
            # )

            epc_records = {
                'original_epc': epc_searcher.newest_epc.copy(),
                'full_sap_epc': epc_searcher.full_sap_epc.copy(),
                'old_data': epc_searcher.older_epcs.copy(),
            }

            # patch = next((
            #     x for x in patches if (x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
            # ), {})
            # epc_records = patch_epc(patch, epc_records)

            prepared_epc = EPCRecord(
                epc_records=epc_records,
                run_mode="newdata",
                cleaning_data=cleaning_data
            )

            # property_already_installed = next((
            #     x for x in already_installed if
            #     (x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
            # ), {})
            #
            # property_non_invasive_recommendations = next((
            #     x for x in non_invasive_recommendations if
            #     (x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
            # ), {})

            measures = config["measures"] if "measures" in config else None

            input_properties.append(
                Property(
                    id=property_id,
                    address=epc_searcher.address_clean,
                    postcode=epc_searcher.postcode_clean,
                    epc_record=prepared_epc,
                    # already_installed=property_already_installed,
                    # non_invasive_recommendations=property_non_invasive_recommendations,
                    measures=measures,
                    **Property.extract_kwargs(config)
                )
            )

        logger.info("Reading in materials and cleaned datasets")
        materials = get_materials(session)
        cleaned = get_cleaned()

        uprn_filenames = read_dataframe_from_s3_parquet(
            bucket_name=get_settings().DATA_BUCKET, file_key="spatial/filename_meta.parquet"
        )
        photo_supply_lookup, floor_area_decile_thresholds = SolarPhotoSupply.load(bucket=get_settings().DATA_BUCKET)

        logger.info("Getting spatial data")
        for p in tqdm(input_properties):
            p.get_spatial_data(uprn_filenames)

        logger.info("Getting components and epc recommendations")
        recommendations_scoring_data = []
        representative_recommendations = {}

        for p in tqdm(input_properties):

            p.get_components(cleaned, photo_supply_lookup, floor_area_decile_thresholds)

            mds = Mds(property_instance=p, materials=materials)
            property_representative_recommendations, errors = mds.build()

            if errors:
                logger.info("Errors occurred during MDS build")

            representative_recommendations[p.id] = property_representative_recommendations

            # Build the scoring data
            p.create_base_difference_epc_record(cleaned_lookup=cleaned)
            recommendations_scoring_data.append(
                p.simulate_all_representative_recommendations(property_representative_recommendations)
            )

        logger.info("Preparing data for scoring in sap change api")
        recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data)

        recommendations_scoring_data = recommendations_scoring_data.drop(
            columns=["rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending",
                     "carbon_ending"]
        )

        model_api = ModelApi(portfolio_id=body.portfolio_id, timestamp=created_at)

        all_predictions = {
            "sap_change_predictions": pd.DataFrame(),
            "heat_demand_predictions": pd.DataFrame(),
            "carbon_change_predictions": pd.DataFrame()
        }
        to_loop_over = range(0, recommendations_scoring_data.shape[0], SCORING_BATCH_SIZE)
        for chunk in tqdm(to_loop_over, total=len(to_loop_over)):
            predictions_dict = model_api.predict_all(
                df=recommendations_scoring_data.iloc[chunk:chunk + SCORING_BATCH_SIZE],
                bucket=get_settings().DATA_BUCKET,
                prediction_buckets={
                    "sap_change_predictions": get_settings().SAP_PREDICTIONS_BUCKET,
                    "heat_demand_predictions": get_settings().HEAT_PREDICTIONS_BUCKET,
                    "carbon_change_predictions": get_settings().CARBON_PREDICTIONS_BUCKET
                }
            )

            # Append the predictions to the predictions dictionary
            for key, scored in predictions_dict.items():
                all_predictions[key] = pd.concat([all_predictions[key], scored])

        # We now produce a table of results for the mds report

        # TODO: TEMP
        # Normalise the input uprn to a string so that it can be compared against the property uprn below.
        # We use .get so a file without a uprn column behaves like the first loop, instead of raising a KeyError
        for row in plan_input:
            if row.get("uprn"):
                row["uprn"] = str(int(float(row["uprn"])))

        results = []
        for p in input_properties:
            measures = p.measures
            property_recommendations = [r['type'] for r in representative_recommendations[p.id]]

            # TODO: Check high heat retention storage heaters - looks like it's excluded controls!

            sap_prediction = all_predictions["sap_change_predictions"][
                all_predictions["sap_change_predictions"]["property_id"] == str(p.id)
            ]

            heat_demand_prediction = all_predictions["heat_demand_predictions"][
                all_predictions["heat_demand_predictions"]["property_id"] == str(p.id)
            ]

            carbon_prediction = all_predictions["carbon_change_predictions"][
                all_predictions["carbon_change_predictions"]["property_id"] == str(p.id)
            ]

            # Get a before and after for SAP, heat demand, CO2 and also calculate energy bill and energy savings
            # When the property has no measures, the "after" values are simply the "before" values
            sap_before = int(p.data["current-energy-efficiency"])
            sap_after = sap_prediction["predictions"].values[0] if measures else sap_before

            epc_before = p.data["current-energy-rating"]
            epc_after = sap_to_epc(sap_after) if measures else epc_before

            heat_demand_before = p.data["energy-consumption-current"]
            heat_demand_after = heat_demand_prediction["predictions"].values[0] if measures else heat_demand_before

            carbon_before = p.data["co2-emissions-current"]
            carbon_after = carbon_prediction["predictions"].values[0] if measures else carbon_before

            # Estimate bill savings
            current_adjusted_energy = AnnualBillSavings.adjust_energy_to_metered(
                epc_energy_consumption=heat_demand_before * p.floor_area,
                current_epc_rating=epc_before,
            )

            # TODO: This isn't quite right as this is based on EVERY possible measure, not just the ones that are
            #  actually implemented
            expected_adjusted_energy = AnnualBillSavings.adjust_energy_to_metered(
                epc_energy_consumption=heat_demand_after * p.floor_area,
                current_epc_rating=epc_before,
            )

            # TODO: We should determine if the home is gas & electricity or just electricity
            current_energy_bill = AnnualBillSavings.calculate_annual_bill(
                current_adjusted_energy,
            )
            expected_energy_bill = AnnualBillSavings.calculate_annual_bill(
                expected_adjusted_energy,
            )

            bill_savings = current_energy_bill - expected_energy_bill
            energy_savings = current_adjusted_energy - expected_adjusted_energy

            # Find the input row for this property by uprn; properties without a match get a blank config
            config = [c for c in plan_input if c.get("uprn") == str(p.uprn)]
            if not config:
                config = {"address": None, "postcode": None}
            else:
                config = config[0]

            to_append = {
                "config_address": config["address"],
                "config_postcode": config["postcode"],
                "address": p.address,
                "postcode": p.postcode,
                "measures": measures,
                "property_recommendations": property_recommendations,
                "year_of_epc": p.data['lodgement-date'],
                "sap_before": sap_before,
                "sap_after": sap_after,
                "epc_before": epc_before,
                "epc_after": epc_after,
                "heat_demand_before": heat_demand_before,
                "heat_demand_after": heat_demand_after,
                "carbon_before": carbon_before,
                "carbon_after": carbon_after,
                "bill_savings": bill_savings,
                "energy_savings": energy_savings,
            }
            results.append(to_append)

        results = pd.DataFrame(results)
        results["sap_uplift"] = results["sap_after"] - results["sap_before"]
        logger.info(f"Built MDS results for {results.shape[0]} properties")

    except IntegrityError:
        logger.error("Database integrity error occurred", exc_info=True)
        session.rollback()
        return Response(status_code=500, content="Database integrity error.")
    except OperationalError:
        logger.error("Database operational error occurred", exc_info=True)
        session.rollback()
        return Response(status_code=500, content="Database operational error.")
    except ValueError:
        logger.error("Value error - possibly due to malformed data", exc_info=True)
        session.rollback()
        return Response(status_code=400, content="Bad request: malformed data.")
    except Exception as e:  # General exception handling
        # We include the traceback, in line with the other handlers
        logger.error(f"An error occurred: {e}", exc_info=True)
        session.rollback()
        return Response(status_code=500, content="An unexpected error occurred.")
    finally:
        session.close()

    # Explicit success response, in line with the trigger_plan endpoint
    return Response(status_code=200)
@classmethod
def calculate_annual_bill(cls, kwh, mains_gas=True):
    """
    This method will estimate the total annual bill for a property

    The bill is the unit cost of the energy consumed, plus the standing charges for the year. Every applicable
    daily standing charge is multiplied by 365. Previously only the electricity charge was multiplied, due to
    operator precedence, so the gas standing charge was only counted for a single day.

    :param kwh: The total kwh consumption
    :param mains_gas: Whether the property uses mains gas. If True, the blended price factor and both the gas and
        electricity standing charges are used. If False, only the electricity price cap and the electricity
        standing charge are used
    :return: An estimate for annual bill
    """
    if mains_gas:
        daily_standing_charge = cls.DAILY_STANDARD_CHARGE_GAS + cls.DAILY_STANDARD_CHARGE_ELECTRICITY
        return cls.PRICE_FACTOR * kwh + daily_standing_charge * 365

    return cls.ELECTRICITY_PRICE_CAP * kwh + cls.DAILY_STANDARD_CHARGE_ELECTRICITY * 365
# Ordered (MDS template column, measure name) pairs. The order defines the order of the extracted measures, and
# two columns may map onto the same measure (e.g. the two EWI and the two underfloor insulation columns).
_MDS_COLUMN_MEASURES = (
    ("EWI (Trad Const)", "external_wall_insulation"),
    ("EWI (Non Trad Const)", "external_wall_insulation"),
    ("CWI", "cavity_wall_insulation"),
    ("LI", "loft_insulation"),
    ("Party Wall Insu", "party_wall_insulation"),
    ("IWI (POA - Prov Sum Only)", "internal_wall_insulation"),
    ("U/F Insu (Manual install)", "suspended_floor_insulation"),
    ("U/F insu (Qbot)", "suspended_floor_insulation"),
    ("Solid floor insl (Out of scope - Prov sum only)", "solid_floor_insulation"),
    ("ASHP Htg", "air_source_heat_pump"),
    ("GSHP Htg", "ground_source_heat_pump"),
    ("Shared ground loops", "shared_ground_loops"),
    ("Communal heat networks", "communal_heat_networks"),
    ("District heating networks", "district_heating_networks"),
    ("Elec Storage Htrs (Out of scope -Prov sum only)", "electric_storage_heaters"),
    ("Low Energy Bulbs", "low_energy_lighting"),
    ("Cyl Insulation", "cylinder_insulation"),
    ("Smart controls", "smart_controls"),
    ("Zone controls", "zone_controls"),
    ("Upgrade TRV's", "trvs"),
    ("Solar PV", "solar_pv"),
    ("Solar Thermal", "solar_thermal"),
    ("Double Glazing (POA - Prov sum only)", "double_glazing"),
    ("Draught Proofing", "draught_proofing"),
    ("Ventilation upgrade", "mechanical_ventilation"),
    ("Gas Boiler Replacement", "gas_boiler"),
    ("Flat roof (Out of scope - prov sum only)", "flat_roof_insulation"),
    ("RIR (POA - Prov sum only)", "room_in_roof_insulation"),
    ("EV Charging", "ev_charging"),
    ("Battery", "battery"),
)


def extract_mds_measures(config):
    """
    Extract the measures that have been populated for a property in the MDS template

    A measure is included when its template column holds any non-null value (as judged by pd.isnull). The value
    itself is not used - each entry records the measure name against the column it was read from.

    :param config: A row of the MDS template, indexable by column name (e.g. a pandas Series or a dict)
    :return: A list of single-entry dicts of the form {measure_name: template_column}, in template column order
    :raises KeyError: If one of the expected template columns is missing from config
    """
    return [
        {measure: column}
        for column, measure in _MDS_COLUMN_MEASURES
        if not pd.isnull(config[column])
    ]
"built_form": None}, + "Mid Flat": {"property_type": "Flat", "built_form": None}, + "Low rise flat (1-2 storey)": {"property_type": "Flat", "built_form": None}, + } + + return lookup[config["Property Type"]] + + +def app(): + """ + Create the initial asset list for the E.ON pilot + :return: + """ + + raw_asset_list = read_excel_from_s3( + bucket_name="retrofit-datalake-dev", + file_key="customers/E.ON/sample SHDF Information MDS Template Vr3.0.xlsx", + header_row=11, + drop_all_na=False + ) + + # Keep just the columns we need + raw_asset_list_base = raw_asset_list[ + [ + "Address", "Postcode", "No Bedrooms" + ] + ].copy().rename( + columns={ + "Address": "address", + "Postcode": "postcode", + "No Bedrooms": "n_bedrooms" + } + ) + + # For each property, retrieve UPRN with from the Ordnance Survey API. To do this, I have created a free + # trial with Ordnance Survey with my personal account as a temporary solution. + # Let's just pull the full EPC data for this + asset_list_with_uprn = [] + for row, property_meta in tqdm(raw_asset_list_base.iterrows(), total=raw_asset_list_base.shape[0]): + if row <= 104: + continue + time.sleep(1.1) + searcher = SearchEpc( + address1=property_meta["address"], + postcode=property_meta["postcode"], + auth_token=EPC_AUTH_TOKEN, + os_api_key=ORDNANCE_SURVEY_API_KEY, + full_address=", ".join([property_meta["address"], property_meta["postcode"]]) + ) + + # Let's just find the UPRN + searcher.ordnance_survey_client.get_places_api() + + uprn = searcher.ordnance_survey_client.most_relevant_result["UPRN"] + + # searcher.find_property(skip_os=False) + + asset_list_with_uprn.append( + { + **property_meta, + "uprn": uprn, + } + ) + + # Store this as a backup + # import pandas as pd + # asset_list_with_uprn_df = pd.DataFrame(asset_list_with_uprn) + # asset_list_with_uprn_df.to_csv("eon_asset_list_with_uprn.csv", index=False) + # Read in + # asset_list_with_uprn = pd.read_csv("eon_asset_list_with_uprn.csv").to_dict(orient="records") + + # Store 
the asset list and create the portfolio payload + asset_list_with_uprn_df = pd.DataFrame(asset_list_with_uprn) + asset_list_with_uprn_df["uprn"] = asset_list_with_uprn_df["uprn"].astype(str).astype(int) + + # We now determine which measures we need for each property + finalised_asset_list = [] + for i, config in raw_asset_list.iterrows(): + + asset_config = asset_list_with_uprn_df[ + (asset_list_with_uprn_df["address"] == config["Address"]) & + (asset_list_with_uprn_df["postcode"] == config["Postcode"]) + ] + if asset_config.shape[0] != 1: + raise ValueError("Could not find a unique match for the property") + + measures = extract_mds_measures(config) + + # Get the property type + pt = parse_property_type(config) + + if config["Address"] in [ + "28 Hermitage Lane", + "35a High Street", + "35b High Street", + "Flat Over 20 Holborough Road", + "Flat above 7 Malling Road" + ]: + print(config["Address"]) + uprn = None + else: + uprn = asset_config["uprn"].values[0] + + finalised_asset_list.append( + { + "address": config["Address"], + "postcode": config["Postcode"], + "uprn": uprn, + "n_bedrooms": config["No Bedrooms"], + "measures": measures, + **pt + } + ) + finalised_asset_list = pd.DataFrame(finalised_asset_list) + + # Store the asset list in s3 + filename = f"{USER_ID}/{PORTFOLIO_ID}/pilot.csv" + save_csv_to_s3( + dataframe=finalised_asset_list, + bucket_name="retrofit-plan-inputs-dev", + file_name=filename + ) + + # EPC C portoflio + body = { + "portfolio_id": str(PORTFOLIO_ID), + "housing_type": "Social", + "goal": "Increase EPC", + "goal_value": "C", + "trigger_file_path": filename, + "already_installed_file_path": "", + "patches_file_path": "", + "non_invasive_recommendations_file_path": "", + "budget": None, + } + print(body) diff --git a/etl/customers/goldman/property_ownership.py b/etl/customers/goldman/property_ownership.py index 24922f68..d30205ae 100644 --- a/etl/customers/goldman/property_ownership.py +++ b/etl/customers/goldman/property_ownership.py @@ 
-20,27 +20,39 @@ def aggregate_matches(matching_lookup, company_ownership, properties): properties[["UPRN", "LOCAL_AUTHORITY_LABEL"]], how="left", on="UPRN" ) counts = ( - df.groupby(["Company Registration No. (1)", "Proprietor Name (1)", "LOCAL_AUTHORITY_LABEL"])["UPRN"] + df.groupby(["Company Registration No. (1)", "LOCAL_AUTHORITY_LABEL"])["UPRN"] .count() .reset_index(name="number_of_properties") ) counts = counts.sort_values("number_of_properties", ascending=False) pivot_counts = counts.pivot_table( - index=["Company Registration No. (1)", "Proprietor Name (1)"], # Rows: companies and proprietors + index=["Company Registration No. (1)"], # Rows: companies and proprietors columns="LOCAL_AUTHORITY_LABEL", # Columns: each local authority values="number_of_properties", # The counts of properties fill_value=0 # Fill missing values with 0 (where there are no properties owned) ).reset_index() total_counts = ( - df.groupby(["Company Registration No. (1)", "Proprietor Name (1)"])["UPRN"] + df.groupby(["Company Registration No. (1)"])["UPRN"] .count() .reset_index(name="total_number_of_properties") ) + # We have cases where the same company registration number results in the same company name, so we produce a best + # name per company registration number + best_names = ( + df.groupby(["Company Registration No. (1)"])["Proprietor Name (1)"] + .first() + .reset_index() + ) + + total_counts = best_names.merge( + total_counts, how="left", on=["Company Registration No. (1)"] + ) + pivot_counts = pivot_counts.merge( - total_counts, how="left", on=["Company Registration No. (1)", "Proprietor Name (1)"] + total_counts, how="left", on=["Company Registration No. 
(1)"] ) pivot_counts = pivot_counts.sort_values("total_number_of_properties", ascending=False) @@ -187,7 +199,45 @@ def remove_duplicate_matches(matching_lookup, properties, company_ownership): if not to_drop.empty: merged = pd.merge(matching_lookup, to_drop, on=['UPRN', 'Title Number'], how='left', indicator=True) - merged[merged['_merge'] == 'left_only'].drop(columns=['_merge']) + merged = merged[merged['_merge'] == 'left_only'].drop(columns=['_merge']) + + return merged + + return matching_lookup + + +def remove_duplicate_uprn_matches(matching_lookup, properties, company_ownership): + dupe_uprns = matching_lookup[matching_lookup["UPRN"].duplicated()]["UPRN"].unique().tolist() + + to_drop = [] + for dupe_uprn in dupe_uprns: + dupe_data = matching_lookup[matching_lookup["UPRN"] == dupe_uprn].copy() + matched_addresses = dupe_data.merge( + properties[["UPRN", "ADDRESS"]].rename(columns={"ADDRESS": "epc_address"}), + how="left", on="UPRN" + ).merge( + company_ownership[["Title Number", "Property Address"]], + how="left", on="Title Number" + ) + # We perform levenstein to get the best match + best_match = levenstein_match( + matching_string=matched_addresses["Property Address"].values[0], + df=matched_addresses, + address_col="epc_address" + ) + matches_to_drop = matched_addresses[ + ~matched_addresses["Title Number"].isin(best_match["Title Number"].values) + ] + + to_drop.append( + matches_to_drop[["UPRN", "Title Number"]].copy() + ) + + to_drop = pd.concat(to_drop) + + if not to_drop.empty: + merged = pd.merge(matching_lookup, to_drop, on=['UPRN', 'Title Number'], how='left', indicator=True) + merged = merged[merged['_merge'] == 'left_only'].drop(columns=['_merge']) return merged @@ -254,6 +304,9 @@ def app(): properties = properties[ properties["TENURE"].isin(["rental (private)", "Rented (private)", "owner-occupied", "Owner-occupied"]) ] + # We have some duplicated on UPRN + # Take the newest UPRN + properties = properties.sort_values("LODGEMENT_DATE", 
ascending=False).drop_duplicates("UPRN") # Remove entries where the address begins with the term "land adjoining", or other records that don't reference the # the property itself @@ -354,43 +407,75 @@ def app(): freehold_matching_lookup = pd.DataFrame(freehold_matching_lookup) leasehold_matching_lookup = pd.DataFrame(leasehold_matching_lookup) shared_leasehold_match = pd.concat(shared_leasehold_match) + shared_freehold_match = pd.concat(shared_freehold_match) + + # freehold_matching_lookup.to_excel("freehold_matching_lookup_new.xlsx") + # leasehold_matching_lookup.to_excel("leasehold_matching_lookup_new.xlsx") + # shared_leasehold_match.to_excel("shared_leasehold_match_new.xlsx") + # shared_freehold_match.to_excel("shared_freehold_match_new.xlsx") # The approximate matches aren't very good freehold_matching_lookup = freehold_matching_lookup[freehold_matching_lookup["match_type"] == "exact"] leasehold_matching_lookup = leasehold_matching_lookup[leasehold_matching_lookup["match_type"] == "exact"] - # There are some cases where we have duplicates - freehold_matching_lookup = remove_duplicate_matches(freehold_matching_lookup, properties, company_ownership) - leasehold_matching_lookup = remove_duplicate_matches(leasehold_matching_lookup, properties, company_ownership) + # Combine + combined_matching_lookup = pd.concat([freehold_matching_lookup, leasehold_matching_lookup]) + # Remove duplicates + combined_matching_lookup = remove_duplicate_matches(combined_matching_lookup, properties, company_ownership) + # We also have duplicates at a UPRN level + combined_matching_lookup = remove_duplicate_uprn_matches(combined_matching_lookup, properties, company_ownership) - matched_addresses = freehold_matching_lookup.merge( - properties[["UPRN", "ADDRESS"]].rename(columns={"ADDRESS": "epc_address"}), + # There are some cases where we have duplicates + # freehold_matching_lookup = remove_duplicate_matches(freehold_matching_lookup, properties, company_ownership) + # 
leasehold_matching_lookup = remove_duplicate_matches(leasehold_matching_lookup, properties, company_ownership) + + matched_addresses = combined_matching_lookup.merge( + properties[["UPRN", "ADDRESS", "CURRENT_ENERGY_EFFICIENCY", "CURRENT_ENERGY_RATING"]].rename( + columns={"ADDRESS": "epc_address"}), how="left", on="UPRN" ).merge( - company_ownership[["Title Number", "Property Address"]], + company_ownership[["Title Number", "Property Address", "Company Registration No. (1)", "Proprietor Name (1)"]], how="left", on="Title Number" ) # shared_freehold_match = pd.DataFrame(shared_freehold_match) # Strore these files - freehold_matching_lookup.to_excel("freehold_matching_lookup.xlsx") - leasehold_matching_lookup.to_excel("leasehold_matching_lookup.xlsx") - shared_leasehold_match.to_excel("shared_leasehold_match.xlsx") + # freehold_matching_lookup.to_excel("freehold_matching_lookup.xlsx") + # leasehold_matching_lookup.to_excel("leasehold_matching_lookup.xlsx") + # shared_leasehold_match.to_excel("shared_leasehold_match.xlsx") # shared_freehold_match.to_excel("shared_freehold_match.xlsx") + # read the files + # freehold_matching_lookup = pd.read_excel("freehold_matching_lookup.xlsx") + # leasehold_matching_lookup = pd.read_excel("leasehold_matching_lookup.xlsx") + # shared_leasehold_match = pd.read_excel("shared_leasehold_match.xlsx") freehold_aggregate = aggregate_matches(freehold_matching_lookup, company_ownership, properties) leasehold_aggregate = aggregate_matches(leasehold_matching_lookup, company_ownership, properties) combined_aggregate = aggregate_matches( - pd.concat([freehold_matching_lookup, leasehold_matching_lookup]), company_ownership, properties + combined_matching_lookup, company_ownership, properties ) - df = pd.concat([freehold_matching_lookup, leasehold_matching_lookup]) - investment_20m = combined_aggregate[combined_aggregate["cumulative_value"] <= 20_500_000] investment_50m = combined_aggregate[combined_aggregate["cumulative_value"] <= 51_000_000] - 
properties["WALLS_DESCRIPTION"].value_counts(normalize=True) + investment_20m_properties = matched_addresses[ + matched_addresses["Company Registration No. (1)"].isin(investment_20m["Company Registration No. (1)"]) + ] + + investment_50m_properties = matched_addresses[ + matched_addresses["Company Registration No. (1)"].isin(investment_50m["Company Registration No. (1)"]) + ] + + portfolio_epc_data_50m = properties[properties["UPRN"].isin(investment_50m_properties["UPRN"])] + portfolio_epc_data_20m = properties[properties["UPRN"].isin(investment_20m_properties["UPRN"])] + + investment_20m_properties.to_excel("investment_20m_properties 28th May.xlsx", index=False) + investment_50m_properties.to_excel("investment_50m_properties 28th May.xlsx", index=False) + + # Store the EPC data + portfolio_epc_data_50m.to_excel("portfolio_epc_data_50m 28th May.xlsx", index=False) + portfolio_epc_data_20m.to_excel("portfolio_epc_data_20m 28th May.xlsx", index=False) def company_aggregation(): diff --git a/etl/epc_clean/app.py b/etl/epc_clean/app.py index 3f1a1a80..59561b3c 100644 --- a/etl/epc_clean/app.py +++ b/etl/epc_clean/app.py @@ -2,24 +2,27 @@ from tqdm import tqdm import os import pandas as pd import msgpack +import inspect from etl.epc_clean.EpcClean import EpcClean from etl.epc.settings import EARLIEST_EPC_DATE from pathlib import Path from utils.s3 import save_data_to_s3 +src_file_path = inspect.getfile(lambda: None) + LAND_REGISTRY_PATHS = [ - os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/pp-monthly-update-new-version.csv", - os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/pp-2022 (1).csv", - os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/pp-2021.csv", - os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/pp-2020.csv", - os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/pp-2019.csv", - os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/pp-2018.csv", - 
os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/pp-2017-part1.csv", - os.path.abspath(os.path.dirname(__file__)) + "/model_data/local_data/pp-2017-part2.csv", + os.path.abspath(os.path.dirname(src_file_path)) + "/model_data/local_data/pp-monthly-update-new-version.csv", + os.path.abspath(os.path.dirname(src_file_path)) + "/model_data/local_data/pp-2022 (1).csv", + os.path.abspath(os.path.dirname(src_file_path)) + "/model_data/local_data/pp-2021.csv", + os.path.abspath(os.path.dirname(src_file_path)) + "/model_data/local_data/pp-2020.csv", + os.path.abspath(os.path.dirname(src_file_path)) + "/model_data/local_data/pp-2019.csv", + os.path.abspath(os.path.dirname(src_file_path)) + "/model_data/local_data/pp-2018.csv", + os.path.abspath(os.path.dirname(src_file_path)) + "/model_data/local_data/pp-2017-part1.csv", + os.path.abspath(os.path.dirname(src_file_path)) + "/model_data/local_data/pp-2017-part2.csv", ] -EPC_DIRECTORY = Path(__file__).parent / "local_data" / "all-domestic-certificates" +EPC_DIRECTORY = Path(src_file_path).parent / "local_data" / "all-domestic-certificates" ENVIRONMENT = os.getenv("ENVIRONMENT", "dev") diff --git a/etl/epc_clean/epc_attributes/HotWaterAttributes.py b/etl/epc_clean/epc_attributes/HotWaterAttributes.py index e8bce0bb..54deaa09 100644 --- a/etl/epc_clean/epc_attributes/HotWaterAttributes.py +++ b/etl/epc_clean/epc_attributes/HotWaterAttributes.py @@ -116,7 +116,14 @@ class HotWaterAttributes(Definitions): "instantaneous at " "point of use, " "waste water heat " - "recovery" + "recovery", + "ogçör brif system, adfer gwres d+¦r gwastraff": "from main system, waste water heat recovery", + "twymwr tanddwr, tarriff safonol, adfer gwres d+¦r gwastraff": "electric immersion, standard tariff, waste " + "water heat recovery", + "ogçör brif system, dim thermostat ar y silindr, adfer gwres nwyon ffliw": "from main system, no cylinder " + "thermostat, flue gas heat recovery", + "ogçör brif system, gydag ynnigçör haul, adfer 
gwres nwyon ffliw": "from main system, plus solar, flue gas " + "heat recovery", } def __init__(self, description: str): diff --git a/etl/epc_clean/epc_attributes/MainheatAttributes.py b/etl/epc_clean/epc_attributes/MainheatAttributes.py index 673b460a..9f0931a3 100644 --- a/etl/epc_clean/epc_attributes/MainheatAttributes.py +++ b/etl/epc_clean/epc_attributes/MainheatAttributes.py @@ -56,6 +56,9 @@ class MainHeatAttributes(Definitions): "bwyler a gwres dan y llawr, lpg": "boiler and underfloor heating, lpg", "bwyler a gwres dan y llawr, trydan": "boiler and underfloor heating, electric", "boiler and radiators, nwy prif gyflenwad, mains gas": "boiler and radiators, mains gas", + "bwyler a rheiddiaduron, olew, st+¦r wresogyddion trydan": "boiler and radiators, oil, electric storage " + "heaters", + "pwmp gwres sygçön tarddu yn yr awyr, awyr gynnes, trydan": "air source heat pump, warm air, electric", } REMAP = { diff --git a/etl/epc_clean/epc_attributes/MainheatControlAttributes.py b/etl/epc_clean/epc_attributes/MainheatControlAttributes.py index 23f39d08..887bdda7 100644 --- a/etl/epc_clean/epc_attributes/MainheatControlAttributes.py +++ b/etl/epc_clean/epc_attributes/MainheatControlAttributes.py @@ -111,7 +111,8 @@ class MainheatControlAttributes(Definitions): 't+-ól un gyfradd, trvs': 'single rate heating, trvs', 't+ól un gyfradd, rhaglennydd a trvs': 'single rate heating, programmer, trvs', 't+ól un gyfradd, trvs': 'single rate heating, trvs', - 'trvs a falf osgoi': 'trvs and bypass' + 'trvs a falf osgoi': 'trvs and bypass', + 'rheolaeth celect': 'celect-type control', } def __init__(self, description: str): diff --git a/etl/epc_clean/epc_attributes/WindowAttributes.py b/etl/epc_clean/epc_attributes/WindowAttributes.py index ce0b156a..5286fc5a 100644 --- a/etl/epc_clean/epc_attributes/WindowAttributes.py +++ b/etl/epc_clean/epc_attributes/WindowAttributes.py @@ -30,6 +30,7 @@ class WindowAttributes(Definitions): "gwydrau eilaidd llawn": "full secondary glazing", 
"gwydrau eilaidd mwyaf": "mostly secondary glazing", "gwydrau eilaidd rhannol": "partial secondary glazing", + "gwydrau lluosog ym mhobman": "multiple glazing throughout", } def __init__(self, description: str): diff --git a/etl/epc_clean/epc_attributes/attribute_utils.py b/etl/epc_clean/epc_attributes/attribute_utils.py index b5fc590d..60f4653e 100644 --- a/etl/epc_clean/epc_attributes/attribute_utils.py +++ b/etl/epc_clean/epc_attributes/attribute_utils.py @@ -24,7 +24,7 @@ def extract_thermal_transmittance(result: dict, description: str) -> Tuple[ if match: result['thermal_transmittance'] = float(match.group(1)) - result['thermal_transmittance_unit'] = match.group(3) + result['thermal_transmittance_unit'] = "w/m-¦k" # We standardise the unit # Remove the match from the description description = re.sub(THERMAL_TRANSMITTANCE_STR, "", description) else: diff --git a/infrastructure/terraform/main.tf b/infrastructure/terraform/main.tf index 55266e10..0da850c5 100644 --- a/infrastructure/terraform/main.tf +++ b/infrastructure/terraform/main.tf @@ -81,6 +81,10 @@ resource "aws_db_instance" "default" { # We will look to change this in the future but as we are pre-MVP at the time of setting this, we don't # have major security demand and don't want to set this up now publicly_accessible = true + # Specify the CA certificate with the default RDS CA certificate + ca_cert_identifier = "rds-ca-rsa2048-g1" + # Temporary to enfore immediate change + apply_immediately = true } # Set up the bucket that recieve the csv uploads of epc to be retrofit @@ -147,7 +151,7 @@ module "route53" { source = "./modules/route53" domain_name = var.domain_name api_url_prefix = var.api_url_prefix - providers = { + providers = { aws.aws_use1 = aws.aws_use1 } } diff --git a/recommendations/Costs.py b/recommendations/Costs.py index fd3c1692..03190727 100644 --- a/recommendations/Costs.py +++ b/recommendations/Costs.py @@ -626,12 +626,10 @@ class Costs: preliminaries_rate = 
self.EWI_SCAFFOLDING_PRELIMINARIES else: preliminaries_rate = self.EWI_NO_SCAFFOLDING_PRELIMINARIES - elif self.property.data["property-type"] == "Maisonette": + elif self.property.data["property-type"] in ["Maisonette", "Flat"]: preliminaries_rate = self.EWI_SCAFFOLDING_PRELIMINARIES elif self.property.data["property-type"] == "Bungalow": preliminaries_rate = self.EWI_NO_SCAFFOLDING_PRELIMINARIES - else: - raise ValueError("Unsupported property type - haven't handled flats") demolition_data = [x for x in non_insulation_materials if x["type"] == "ewi_wall_demolition"] preparation_data = [x for x in non_insulation_materials if x["type"] == "ewi_wall_preparation"] diff --git a/recommendations/HeatingRecommender.py b/recommendations/HeatingRecommender.py index a51803f2..2041f783 100644 --- a/recommendations/HeatingRecommender.py +++ b/recommendations/HeatingRecommender.py @@ -103,7 +103,7 @@ class HeatingRecommender: return - def recommend_air_source_heat_pump(self, phase, has_cavity_or_loft_recommendations): + def recommend_air_source_heat_pump(self, phase, has_cavity_or_loft_recommendations, _return=False): """ This method will implement the recommendation for an air source heat pump This is ultimately an overhaul to the heating system and so is recommended as an alternative to other @@ -200,6 +200,8 @@ class HeatingRecommender: **ashp_costs } + if _return: + return [ashp_recommendation] self.heating_recommendations.append(ashp_recommendation) @staticmethod @@ -312,7 +314,7 @@ class HeatingRecommender: return output - def recommend_hhr_storage_heaters(self, phase, system_change, heating_controls_only): + def recommend_hhr_storage_heaters(self, phase, system_change, heating_controls_only, _return=False): """ We will recommend upgrading to a high heat retention storage system, if the current system is not already high heat retention storage @@ -321,6 +323,8 @@ class HeatingRecommender: :param system_change: Indicates if we are recommending a different type of heating 
system, compared to the current system :param heating_controls_only: Indicates if we should include a recommendation for just heating controls + :param _return: Indicates if we should return the recommendations, rather than appending them to the + recommendations list :return: """ @@ -374,6 +378,8 @@ class HeatingRecommender: heating_controls_only=heating_controls_only, system_change=system_change ) + if _return: + return recommendations self.heating_recommendations.extend(recommendations) diff --git a/recommendations/Mds.py b/recommendations/Mds.py new file mode 100644 index 00000000..7453e5e9 --- /dev/null +++ b/recommendations/Mds.py @@ -0,0 +1,173 @@ +from backend.Property import Property +from recommendations.FloorRecommendations import FloorRecommendations +from recommendations.WallRecommendations import WallRecommendations +from recommendations.RoofRecommendations import RoofRecommendations +from recommendations.VentilationRecommendations import VentilationRecommendations +from recommendations.FireplaceRecommendations import FireplaceRecommendations +from recommendations.LightingRecommendations import LightingRecommendations +from recommendations.SolarPvRecommendations import SolarPvRecommendations +from recommendations.WindowsRecommendations import WindowsRecommendations +from recommendations.HeatingRecommender import HeatingRecommender +from recommendations.HotwaterRecommendations import HotwaterRecommendations +from recommendations.SecondaryHeating import SecondaryHeating +from recommendations.Recommendations import Recommendations + + +class Mds: + """ + Handles the construction of the MDS report + """ + + def __init__(self, property_instance: Property, materials): + self.property_instance = property_instance + + self.floor_recommender = FloorRecommendations(property_instance=property_instance, materials=materials) + self.wall_recommender = WallRecommendations(property_instance=property_instance, materials=materials) + self.roof_recommender = 
RoofRecommendations(property_instance=property_instance, materials=materials) + self.ventilation_recomender = VentilationRecommendations( + property_instance=property_instance, materials=materials + ) + self.fireplace_recommender = FireplaceRecommendations(property_instance=property_instance) + self.lighting_recommender = LightingRecommendations(property_instance=property_instance, materials=materials) + self.windows_recommender = WindowsRecommendations(property_instance=property_instance, materials=materials) + self.solar_recommender = SolarPvRecommendations(property_instance=property_instance) + self.heating_recommender = HeatingRecommender(property_instance=property_instance) + self.hotwater_recommender = HotwaterRecommendations(property_instance=property_instance) + self.secondary_heating_recommender = SecondaryHeating(property_instance=property_instance) + + def build(self): + if self.property_instance.measures is None: + raise NotImplementedError("No measures in the property - implement me") + + measures = self.property_instance.measures + + measure_config_list = [list(m.keys())[0] for m in measures] + + not_implemented_measures = [ + "party_wall_insulation", + "ground_source_heat_pump", + "shared_ground_loops", + "communal_heat_networks", + "district_heating_networks", + "solar_thermal", + "draught_proofing", + "ev_charging", + "battery", + ] + # Check if we have a not implemented measure + if any([m in not_implemented_measures for m in measure_config_list]): + raise NotImplementedError("Not implemented measure in the property - implement me") + + mds_recommendations = [] + errors = [] + + # TODO: Could use a decorator to reduce the boilerplate code - insert_recommendation_id and then the append + + if "external_wall_insulation" in measure_config_list: + recs = self.wall_recommender.mds_recommend_ewi(phase=0) + if not recs: + raise Exception("No recommendations for external wall insulation") + recs = self.insert_recommendation_id(recs, measures, 
"external_wall_insulation") + mds_recommendations.append(recs) + + if "cavity_wall_insulation" in measure_config_list: + recs = self.wall_recommender.mds_recommend_cavity_wall_insulation(phase=0) + recs = self.insert_recommendation_id(recs, measures, "cavity_wall_insulation") + mds_recommendations.append(recs) + + if "loft_insulation" in measure_config_list: + # Check if the roof is suitable for loft insulation + if self.property_instance.roof['is_roof_room']: + errors.append("Roof is a room") + else: + recs = self.roof_recommender.mds_loft_insulation(phase=0) + if not recs: + raise Exception("No recommendations for loft insulation") + recs = self.insert_recommendation_id(recs, measures, "loft_insulation") + mds_recommendations.append(recs) + + if "internal_wall_insulation" in measure_config_list: + raise Exception("check me out 4") + self.wall_recommender.recommend(phase=0) + + if "suspended_floor_insulation" in measure_config_list: + raise Exception("check me out 5") + self.floor_recommender.recommend(phase=0) + + if "solid_floor_insulation" in measure_config_list: + raise Exception("check me out 6") + self.floor_recommender.recommend(phase=0) + + if "air_source_heat_pump" in measure_config_list: + recs = self.heating_recommender.recommend_air_source_heat_pump( + phase=0, has_cavity_or_loft_recommendations=False, _return=True + ) + recs = self.insert_recommendation_id(recs, measures, "air_source_heat_pump") + mds_recommendations.append(recs) + + if "electric_storage_heaters" in measure_config_list: + recs = self.heating_recommender.recommend_hhr_storage_heaters( + phase=0, system_change=True, heating_controls_only=False, _return=True + ) + recs = self.insert_recommendation_id(recs, measures, "electric_storage_heaters") + mds_recommendations.append(recs) + + if "low_energy_lighting" in measure_config_list: + raise Exception("check me out 9") + self.lighting_recommender.recommend(phase=0) + + if "cylinder_insulation" in measure_config_list: + raise Exception("check 
me out 10") + self.hotwater_recommender.recommend(phase=0) + + if "smart_controls" in measure_config_list: + raise Exception("check me out 11") + self.heating_recommender.recommend(phase=0) + + if "zone_controls" in measure_config_list: + raise Exception("check me out 12") + self.heating_recommender.recommend(phase=0) + + if "trvs" in measure_config_list: + raise Exception("check me out 13") + self.heating_recommender.recommend(phase=0) + + if "solar_pv" in measure_config_list: + recs = self.solar_recommender.mds_recommend(phase=0, solar_pv_percentage=0.5) + recs = self.insert_recommendation_id(recs, measures, "solar_pv") + mds_recommendations.append(recs) + + if "double_glazing" in measure_config_list: + raise Exception("check me out 15") + self.windows_recommender.recommend(phase=0) + + if "mechanical_ventilation" in measure_config_list: + raise Exception("check me out 16") + self.ventilation_recomender.recommend(phase=0) + + if "gas_boiler" in measure_config_list: + raise Exception("check me out 17") + self.heating_recommender.recommend(phase=0) + + if "flat_roof_insulation" in measure_config_list: + raise Exception("check me out 18") + self.roof_recommender.recommend(phase=0) + + if "room_in_roof_insulation" in measure_config_list: + raise Exception("check me out 19") + self.roof_recommender.recommend(phase=0) + + property_representative_recommendations = Recommendations.create_representative_recommendations( + mds_recommendations, non_invasive_recommendations=[] + ) + + return property_representative_recommendations, errors + + @staticmethod + def insert_recommendation_id(recommendations, measures, measure_name): + # Insert the recommendation identifier into this recommendation + measure_config = [m for m in measures if measure_name in m][0] + for r in recommendations: + r["recommendation_id"] = list(measure_config.values())[0] + + return recommendations diff --git a/recommendations/RoofRecommendations.py b/recommendations/RoofRecommendations.py index 
dc5ee7db..538d90e4 100644 --- a/recommendations/RoofRecommendations.py +++ b/recommendations/RoofRecommendations.py @@ -54,6 +54,26 @@ class RoofRecommendations: ] ] + def mds_loft_insulation(self, phase): + """ + For usages within the mds report + :param phase: + :return: + """ + self.recommendations = [] + + insulation_thickness = convert_thickness_to_numeric( + self.property.roof["insulation_thickness"], + self.property.roof["is_pitched"], + self.property.roof["is_flat"] + ) + + u_value = get_roof_u_value(**{**self.property.roof, "age_band": self.property.age_band}) + + self.recommend_roof_insulation(u_value, insulation_thickness, self.property.roof, phase) + + return self.recommendations + def recommend(self, phase): if self.property.roof["has_dwelling_above"]: @@ -210,6 +230,7 @@ class RoofRecommendations: already_installed = "loft_insulation" in self.property.already_installed if already_installed: cost_result = override_costs(cost_result) + new_thickness = insulation_thickness + material["depth"] elif material["type"] == "flat_roof_insulation": cost_result = self.costs.flat_roof_insulation( floor_area=self.property.insulation_floor_area, @@ -219,6 +240,7 @@ class RoofRecommendations: already_installed = "flat_roof_insulation" in self.property.already_installed if already_installed: cost_result = override_costs(cost_result) + new_thickness = None else: raise ValueError("Invalid material type") @@ -239,6 +261,7 @@ class RoofRecommendations: "new_u_value": new_u_value, "sap_points": None, "already_installed": already_installed, + "new_thickness": new_thickness, **cost_result } ) diff --git a/recommendations/SolarPvRecommendations.py b/recommendations/SolarPvRecommendations.py index 58d4b123..14161da3 100644 --- a/recommendations/SolarPvRecommendations.py +++ b/recommendations/SolarPvRecommendations.py @@ -35,6 +35,46 @@ class SolarPvRecommendations: return trimmed_list + def mds_recommend(self, phase=None, solar_pv_percentage=0.5): + # For specific usage within 
the mds report + + solar_pv_roof_area = self.property.get_solar_pv_roof_area(solar_pv_percentage) + + number_solar_panels = np.floor(solar_pv_roof_area / self.SOLAR_PANEL_AREA) + solar_panel_wattage = number_solar_panels * self.SOLAR_PANEL_WATTAGE + + solar_panel_wattage = np.clip( + a=solar_panel_wattage, a_min=self.MIN_SYSTEM_WATTAGE, a_max=self.MAX_SYSTEM_WATTAGE + ) + + # We now have a property which is potentially suitable for solar PV + roof_coverage_percent = round(solar_pv_percentage * 100) + # Given the wattage, we estimate the cost of the solar PV system. This is based on the MCS database + # of solar PV installations + cost_result = self.costs.solar_pv(wattage=solar_panel_wattage, has_battery=False) + kw = np.floor(solar_panel_wattage / 100) / 10 + + description = (f"Install a {kw} kilowatt-peak (kWp) solar photovoltaic (PV) p" + f"anel system on {round(roof_coverage_percent)}% the roof.") + + return [ + { + "phase": phase, + "parts": [], + "type": "solar_pv", + "description": description, + "starting_u_value": None, + "new_u_value": None, + "sap_points": None, + "already_installed": False, + **cost_result, + # This is required for simulating the SAP impact. 
solar_pv_percentage is between 0 & 1 so we scale + # back up here + "photo_supply": roof_coverage_percent, + "has_battery": False + } + ] + def recommend(self, phase): """ We check if a property is potentially suitable for solar PV based on the following criteria: diff --git a/recommendations/WallRecommendations.py b/recommendations/WallRecommendations.py index 8ca34bc8..ea58c4e3 100644 --- a/recommendations/WallRecommendations.py +++ b/recommendations/WallRecommendations.py @@ -6,14 +6,10 @@ import pandas as pd from datatypes.enums import QuantityUnits from backend.Property import Property from BaseUtility import Definitions +from etl.epc_clean.epc_attributes.WallAttributes import WallAttributes from recommendations.recommendation_utils import ( - r_value_per_mm_to_u_value, - calculate_u_value_uplift, - is_diminishing_returns, - update_lowest_selected_u_value, - get_recommended_part, - get_wall_u_value, - override_costs, + r_value_per_mm_to_u_value, calculate_u_value_uplift, is_diminishing_returns, update_lowest_selected_u_value, + get_recommended_part, get_wall_u_value, override_costs, check_simulation_difference ) from recommendations.config import PARTIALLY_FILLED_PERCENTAGE_ASSUMPTION from recommendations.Costs import Costs @@ -58,7 +54,31 @@ class WallRecommendations(Definitions): # threshold NEW_BUILD_INSULATED = 0.75 - def __init__(self, property_instance: Property, materials: List): + # These are the ending descriptions we consider for walls with external insulation + EXTERNALLY_INSULATED_WALL_DESCRIPTIONS = { + "solid_brick": "Solid brick, with external insulation", + "cob": "Cob, with external insulation", + "system_built": "System built, with external insulation", + "granite_or_whinstone": 'Granite or whinstone, with external insulation', + "sandstone_or_limestone": 'Sandstone or limestone, with external insulation', + "timber_frame": "Timber frame, with external insulation" + } + + # These are the ending descriptions we consider for walls with internal 
insulation + INTERNALLY_INSULATED_WALL_DESCRIPTIONS = { + "solid_brick": "Solid brick, with internal insulation", + "cob": "Cob, with internal insulation", + "system_built": "System built, with internal insulation", + "granite_or_whinstone": 'Granite or whinstone, with internal insulation', + "sandstone_or_limestone": 'Sandstone or limestone, with internal insulation', + "timber_frame": "Timber frame, with internal insulation" + } + + def __init__( + self, + property_instance: Property, + materials: List + ): self.property = property_instance self.costs = Costs(self.property) # For audit purposes, when estimating u values we'll store it @@ -108,6 +128,47 @@ class WallRecommendations(Definitions): return True + def mds_recommend_cavity_wall_insulation(self, phase=None): + # Function specifically for cavity wall insulation, for usage in the mds report + self.recommendations = [] + insulation_thickness = self.property.walls["insulation_thickness"] + + u_value = get_wall_u_value( + clean_description=self.property.walls["clean_description"], + age_band=self.property.age_band, + is_granite_or_whinstone=self.property.walls["is_granite_or_whinstone"], + is_sandstone_or_limestone=self.property.walls["is_sandstone_or_limestone"], + ) + + # Test filling cavity + self.find_cavity_insulation(u_value, insulation_thickness, phase) + + return self.recommendations + + def mds_recommend_ewi(self, phase=None): + # Function specifically for external wall insulation, for usage in the mds report + self.recommendations = [] + + u_value = self.property.walls["thermal_transmittance"] + + if u_value is None: + u_value = get_wall_u_value( + clean_description=self.property.walls["clean_description"], + age_band=self.property.age_band, + is_granite_or_whinstone=self.property.walls["is_granite_or_whinstone"], + is_sandstone_or_limestone=self.property.walls["is_sandstone_or_limestone"], + ) + + # EWI + ewi_recommendations = self._find_insulation( + u_value=u_value, + 
insulation_materials=pd.DataFrame(self.external_wall_insulation_materials), + non_insulation_materials=self.external_wall_non_insulation_materials, + phase=phase + ) + + return ewi_recommendations + def recommend(self, phase=0): # if building built after 1990 + we're able to identify U-value + # U-value less than 0.18 and if in or close to a conversation area, @@ -269,6 +330,21 @@ class WallRecommendations(Definitions): # updated the new u-value with the best possible our installers have new_u_value = max(0.31, new_u_value) + wall_ending_config = WallAttributes("Cavity wall, filled cavity").process() + + simulation_config = {} + if self.property.data["walls-energy-eff"] not in ["Good", "Very Good"]: + simulation_config = { + "walls_energy_eff_ending": "Good", + "walls_thermal_transmittance_ending": new_u_value + } + + walls_simulation_config = check_simulation_difference( + new_config=wall_ending_config, old_config=self.property.walls, prefix="walls_" + ) + + simulation_config = {**simulation_config, **walls_simulation_config} + recommendations.append( { "phase": phase, @@ -286,15 +362,40 @@ class WallRecommendations(Definitions): "new_u_value": new_u_value, "sap_points": None, "already_installed": already_installed, - **cost_result, + "simulation_config": simulation_config, + **cost_result } ) self.recommendations = recommendations - def _find_insulation( - self, u_value, insulation_materials, non_insulation_materials, phase - ): + def get_internal_external_wall_description(self, description_map, new_u_value): + if self.property.walls["is_solid_brick"]: + return description_map["solid_brick"] + + if self.property.walls["is_cob"]: + return description_map["cob"] + + if self.property.walls["is_system_built"]: + return description_map["system_built"] + + if self.property.walls["is_granite_or_whinstone"]: + return description_map["granite_or_whinstone"] + + if self.property.walls["is_sandstone_or_limestone"]: + return description_map["sandstone_or_limestone"] + + if 
self.property.walls["is_timber_frame"]: + return description_map["timber_frame"] + + if "Average thermal transmittance" in self.property.walls["clean_description"]: + if new_u_value is None: + raise ValueError("New u value is None") + return f'Average thermal transmittance {new_u_value} W/m-¦K' + + raise NotImplementedError("Not implemented yet") + + def _find_insulation(self, u_value, insulation_materials, non_insulation_materials, phase): lowest_selected_u_value = None recommendations = [] @@ -342,6 +443,10 @@ class WallRecommendations(Definitions): if already_installed: cost_result = override_costs(cost_result) + new_description = self.get_internal_external_wall_description( + self.INTERNALLY_INSULATED_WALL_DESCRIPTIONS, new_u_value + ) + elif material["type"] == "external_wall_insulation": cost_result = self.costs.external_wall_insulation( wall_area=self.property.insulation_wall_area, @@ -354,9 +459,31 @@ class WallRecommendations(Definitions): ) if already_installed: cost_result = override_costs(cost_result) + + new_description = self.get_internal_external_wall_description( + self.EXTERNALLY_INSULATED_WALL_DESCRIPTIONS, new_u_value + ) else: raise ValueError("Invalid material type") + wall_ending_config = WallAttributes(new_description).process() + + simulation_config = {} + if self.property.data["walls-energy-eff"] not in ["Good", "Very Good"]: + simulation_config = { + "walls_energy_eff_ending": "Good" + } + + walls_simulation_config = check_simulation_difference( + new_config=wall_ending_config, old_config=self.property.walls, prefix="walls_" + ) + + simulation_config = { + **walls_simulation_config, + **simulation_config, + "walls_thermal_transmittance_ending": new_u_value + } + recommendations.append( { "phase": phase, @@ -374,7 +501,8 @@ class WallRecommendations(Definitions): "new_u_value": new_u_value, "already_installed": already_installed, "sap_points": None, - **cost_result, + "simulation_config": simulation_config, + **cost_result } ) diff --git 
a/recommendations/recommendation_utils.py b/recommendations/recommendation_utils.py index a3043c31..996f5c9c 100644 --- a/recommendations/recommendation_utils.py +++ b/recommendations/recommendation_utils.py @@ -756,15 +756,18 @@ def calculate_cavity_age(newest_epc, older_epcs, cleaned): return cavity_age -def check_simulation_difference(old_config, new_config): +def check_simulation_difference(old_config, new_config, prefix=""): """ Given two dictionaries, that describe the heating control configurations, this method will compare the two and pick out the differences. These differences will be things that have been added and things that have been removed. This will be used to determine how we should be updating the configuration in the simulation :return: """ - - differences = {key + "_ending": new_config[key] for key in new_config if old_config[key] != new_config[key]} + differences = {} + for key in new_config: + if old_config[key] != new_config[key]: + new_key = prefix + key + "_ending" if key in ["is_assumed", "thermal_transmittance"] else key + "_ending" + differences[new_key] = new_config[key] return differences diff --git a/utils/s3.py b/utils/s3.py index fd5992ce..05482271 100644 --- a/utils/s3.py +++ b/utils/s3.py @@ -198,13 +198,14 @@ def read_pickle_from_s3(bucket_name, s3_file_name): return data -def read_excel_from_s3(bucket_name, file_key, header_row): +def read_excel_from_s3(bucket_name, file_key, header_row, drop_all_na=True): """ Read an Excel file from an S3 bucket and return it as a pandas DataFrame. :param bucket_name: Name of the S3 bucket. :param file_key: Key of the file (including directory path within the bucket). :param header_row: The row number to use as the header (0-indexed). + :param drop_all_na: Whether to drop columns where all values are NaN. :return: A pandas DataFrame containing the data from the Excel file. 
""" @@ -219,7 +220,8 @@ def read_excel_from_s3(bucket_name, file_key, header_row): df = pd.read_excel(excel_buffer, header=header_row) # Drop columns where all values are NaN - df.dropna(axis=1, how='all', inplace=True) + if drop_all_na: + df.dropna(axis=1, how='all', inplace=True) # Reset index if the first column is just an index or entirely NaN df.reset_index(drop=True, inplace=True)