diff --git a/.dockerignore b/.dockerignore index 083ae2c7..246f8354 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,6 +1,9 @@ +# Ignore all test directories model_data/local_data/* backend/tests/* backend/node_modules/* +backend/.idea/* +backend/.env recommendations/tests/* model_data/tests/* infrastructure/* @@ -12,3 +15,8 @@ land_registry/* pytest.ini */README.md utils/tests/* +etl/epc/tests/* +etl/epc_clean/tests/* +etl/spatial/tests/* + + diff --git a/.idea/Model.iml b/.idea/Model.iml index b0f9c00d..850c0cda 100644 --- a/.idea/Model.iml +++ b/.idea/Model.iml @@ -7,7 +7,7 @@ - + diff --git a/.idea/misc.xml b/.idea/misc.xml index 78660f34..e4070118 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -3,7 +3,7 @@ - + diff --git a/backend/apis/GoogleSolarApi.py b/backend/apis/GoogleSolarApi.py index ed7b7422..75f28ceb 100644 --- a/backend/apis/GoogleSolarApi.py +++ b/backend/apis/GoogleSolarApi.py @@ -589,14 +589,61 @@ class GoogleSolarApi: self.double_property = True @staticmethod + def calculate_percentage_decrease(start_efficiency, end_efficiency, consumption_averages): + """ + Calculate the percentage decrease in consumption between two energy efficiency ratings. + :param start_efficiency: The starting energy efficiency rating. + :param end_efficiency: The ending energy efficiency rating. + :param consumption_averages: The DataFrame containing the consumption averages. 
+ :return: + """ + + start_consumption = consumption_averages.loc[ + consumption_averages["current-energy-efficiency"].astype(str) == str(start_efficiency), "total_consumption" + ].values[0] + + end_consumption = consumption_averages.loc[ + consumption_averages["current-energy-efficiency"].astype(str) == str(end_efficiency), "total_consumption" + ].values[0] + + percentage_decrease = ((start_consumption - end_consumption) / start_consumption) * 100 + # percentage_decrease cannot be negative + if percentage_decrease < 0: + percentage_decrease = 0 + return percentage_decrease + + @classmethod + def estimate_new_consumption( + cls, current_energy_efficiency, target_efficiency, current_consumption, ofgem_consumption_averages + ): + """ + Given the consumption_averages dataset, which is produced as a result of the training_data.py script, + for the energy kwh models, this function will estimate the new consumption based on the current consumption, + and on the expected reduction in consumption from the current rating to the target rating. 
+ :param current_energy_efficiency: The current energy efficiency rating + :param target_efficiency: The target energy efficiency rating + :param current_consumption: The current consumption of the property + :param ofgem_consumption_averages: DataFrame of the Ofgem consumption averages + :return: + """ + percentage_decrease = cls.calculate_percentage_decrease( + start_efficiency=current_energy_efficiency, + end_efficiency=target_efficiency, + consumption_averages=ofgem_consumption_averages + ) + new_consumption = current_consumption * (1 - percentage_decrease / 100) + return new_consumption + + @classmethod def prepare_input_data( + cls, input_properties: List[Property], - energy_consumption_client: EnergyConsumptionModel, + ofgem_consumption_averages: pd.DataFrame, body: PlanTriggerRequest ): """ :param input_properties: List of properties - :param energy_consumption_client: EnergyConsumptionModel instance + :param ofgem_consumption_averages: DataFrame of the Ofgem consumption averages :param body: PlanTriggerRequest instance This sets up the data required to make the solar api request :return: @@ -610,12 +657,13 @@ class GoogleSolarApi: # Energy consumption is adjusted for the property's expected post retrofit state # We set the target rating to EPC C, which is the typical EPC rating we would expect the # property to achieve post retrofit of just the fabric - "energy_consumption": energy_consumption_client.estimate_new_consumption( + "energy_consumption": cls.estimate_new_consumption( current_energy_efficiency=p.data["current-energy-efficiency"], target_efficiency="69", current_consumption=p.estimate_electrical_consumption( assumed_ashp_efficiency=assumptions.AVERAGE_ASHP_EFFICIENCY, exclusions=body.exclusions - ) + ), + ofgem_consumption_averages=ofgem_consumption_averages ), "property_id": p.id, "uprn": p.uprn @@ -628,12 +676,13 @@ class GoogleSolarApi: # Energy consumption is adjusted for the property's expected post retrofit state # We set the target rating 
to EPC C, which is the typical EPC rating we would expect the # property to achieve post retrofit of just the fabric - "energy_consumption": energy_consumption_client.estimate_new_consumption( + "energy_consumption": cls.estimate_new_consumption( current_energy_efficiency=p.data["current-energy-efficiency"], target_efficiency="69", current_consumption=p.estimate_electrical_consumption( assumed_ashp_efficiency=assumptions.AVERAGE_ASHP_EFFICIENCY, exclusions=body.exclusions ), + ofgem_consumption_averages=ofgem_consumption_averages ), "property_id": p.id, "uprn": p.uprn diff --git a/backend/app/config.py b/backend/app/config.py index 9aaa0a52..21e8f21c 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -1,5 +1,5 @@ from functools import lru_cache -from pydantic import BaseSettings +from pydantic_settings import BaseSettings class Settings(BaseSettings): diff --git a/backend/app/db/functions/recommendations_functions.py b/backend/app/db/functions/recommendations_functions.py index feeced10..d6e41c61 100644 --- a/backend/app/db/functions/recommendations_functions.py +++ b/backend/app/db/functions/recommendations_functions.py @@ -110,19 +110,19 @@ def upload_recommendations(session: Session, recommendations_to_upload, property "type": rec["type"], "measure_type": rec["measure_type"], "description": rec["description"], - "estimated_cost": rec["total"], + "estimated_cost": float(rec["total"]), "default": rec["default"], - "starting_u_value": rec.get("starting_u_value"), - "new_u_value": rec.get("new_u_value"), - "sap_points": rec["sap_points"], - "energy_savings": rec["heat_demand"], - "kwh_savings": rec["kwh_savings"], - "co2_equivalent_savings": rec["co2_equivalent_savings"], - "total_work_hours": rec["labour_hours"], - "energy_cost_savings": rec["energy_cost_savings"], - "labour_days": rec["labour_days"], + "starting_u_value": float(rec.get("starting_u_value")) if rec.get("starting_u_value") else None, + "new_u_value": float(rec.get("new_u_value")) if 
rec.get("new_u_value") else None, + "sap_points": float(rec["sap_points"]), + "energy_savings": float(rec["heat_demand"]), + "kwh_savings": float(rec["kwh_savings"]), + "co2_equivalent_savings": float(rec["co2_equivalent_savings"]), + "total_work_hours": float(rec["labour_hours"]), + "energy_cost_savings": float(rec["energy_cost_savings"]), + "labour_days": float(rec["labour_days"]), "already_installed": rec["already_installed"], - "heat_demand": rec["heat_demand"] + "heat_demand": float(rec["heat_demand"]) } for rec in recommendations_to_upload ] diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py index 925fb05b..26c84c81 100644 --- a/backend/app/plan/router.py +++ b/backend/app/plan/router.py @@ -24,7 +24,7 @@ from backend.app.db.functions.recommendations_functions import ( from backend.app.db.functions.energy_assessment_functions import get_latest_assessment_by_uprn from backend.app.db.models.portfolio import rating_lookup from backend.app.dependencies import validate_token -from backend.app.plan.schemas import PlanTriggerRequest, MdsRequest +from backend.app.plan.schemas import PlanTriggerRequest from backend.app.plan.utils import get_cleaned from backend.app.utils import epc_to_sap_lower_bound, sap_to_epc @@ -36,15 +36,12 @@ from recommendations.optimiser.CostOptimiser import CostOptimiser from recommendations.optimiser.GainOptimiser import GainOptimiser from recommendations.optimiser.optimiser_functions import prepare_input_measures from recommendations.Recommendations import Recommendations -from recommendations.Mds import Mds from utils.logger import setup_logger from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3 from backend.ml_models.Valuation import PropertyValuation -from etl.bill_savings.EnergyConsumptionModel import EnergyConsumptionModel from etl.bill_savings.KwhData import KwhData from etl.spatial.OpenUprnClient import OpenUprnClient -from etl.solar.SolarPhotoSupply import SolarPhotoSupply logger = setup_logger() 
@@ -488,6 +485,16 @@ async def trigger_plan(body: PlanTriggerRequest): if not input_properties: return Response(status_code=204) + # Set up model api and warm up the lambdas + model_api = ModelApi( + portfolio_id=body.portfolio_id, + timestamp=created_at, + prediction_buckets=get_prediction_buckets() + ) + await model_api.async_warm_up_lambdas( + model_prefies=model_api.KWH_MODEL_PREFIXES + model_api.MODEL_PREFIXES + ) + # The materials data could be cached or local so we don't need to make # consistent requests to the backend for # the same data @@ -495,32 +502,14 @@ async def trigger_plan(body: PlanTriggerRequest): materials = get_materials(session) cleaned = get_cleaned() - dataset_version = "2024-07-08" - energy_consumption_client = EnergyConsumptionModel( - model_paths={ - "heating_kwh": f"model_directory/energy_consumption_model/heating_kwh_{dataset_version}.pkl", - "hot_water_kwh": f"model_directory/energy_consumption_model/hot_water_kwh_{dataset_version}.pkl" - }, - dummy_schema_path=f"model_directory/energy_consumption_model/{dataset_version}_dummy_schema.pkl", - consumption_average_path=f"energy_consumption/{dataset_version}/consumption_averages.parquet", - cleaned=cleaned, - environment=get_settings().ENVIRONMENT - ) - kwh_client = KwhData(bucket=get_settings().DATA_BUCKET, read_consumption_data=True) - model_api = ModelApi( - portfolio_id=body.portfolio_id, - timestamp=created_at, - prediction_buckets=get_prediction_buckets() - ) - epcs_for_scoring = kwh_client.transform(data=kwh_client.prepare_epc(input_properties), cleaned=cleaned) - kwh_preds = model_api.paginated_predictions( + kwh_preds = await model_api.async_paginated_predictions( data=epcs_for_scoring, bucket=get_settings().DATA_BUCKET, - model_prefixes=["heating_kwh_predictions", "hotwater_kwh_predictions"], + model_prefixes=model_api.KWH_MODEL_PREFIXES, extract_ids=False, batch_size=SCORING_BATCH_SIZE ) @@ -536,9 +525,15 @@ async def trigger_plan(body: PlanTriggerRequest): # extensions, since 
it doesn't seem to do a great job logger.info("Performing solar analysis") + + ofgem_consumption_averages = read_dataframe_from_s3_parquet( + bucket_name=get_settings().DATA_BUCKET, + file_key=f"energy_consumption/2024-07-08/consumption_averages.parquet" + ) + building_solar_config, unit_solar_config = GoogleSolarApi.prepare_input_data( input_properties=input_properties, - energy_consumption_client=energy_consumption_client, + ofgem_consumption_averages=ofgem_consumption_averages, body=body ) @@ -593,7 +588,7 @@ async def trigger_plan(body: PlanTriggerRequest): "carbon_ending"] ) - all_predictions = model_api.paginated_predictions( + all_predictions = await model_api.async_paginated_predictions( data=recommendations_scoring_data, bucket=get_settings().DATA_BUCKET, batch_size=SCORING_BATCH_SIZE @@ -622,10 +617,10 @@ async def trigger_plan(body: PlanTriggerRequest): scoring_epcs = pd.DataFrame(scoring_epcs) scoring_epcs = kwh_client.transform(data=scoring_epcs, cleaned=cleaned) - kwh_simulation_predictions = model_api.paginated_predictions( + kwh_simulation_predictions = await model_api.async_paginated_predictions( data=scoring_epcs, bucket=get_settings().DATA_BUCKET, - model_prefixes=["heating_kwh_predictions", "hotwater_kwh_predictions"], + model_prefixes=model_api.KWH_MODEL_PREFIXES, batch_size=SCORING_BATCH_SIZE ) @@ -776,6 +771,7 @@ async def trigger_plan(body: PlanTriggerRequest): update_or_create_property_spatial_details(session, p.uprn, p.spatial) property_data = p.get_full_property_data(current_valuation=valuations["current_value"]) + update_property_data( session, property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data ) @@ -868,587 +864,3 @@ async def trigger_plan(body: PlanTriggerRequest): session.close() return Response(status_code=200) - - -@router.post("/mds") -async def build_mds(body: MdsRequest): - # TODO: This is a placeholder location for the MDS endpoint, which this is being assembled - - logger.info("Connecting to db") - 
session = sessionmaker(bind=db_engine)() - created_at = datetime.now().isoformat() - - try: - session.begin() - logger.info("Getting the inputs") - plan_input = read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path) - measure_set = body.measures - optimise_measures = measure_set is not None - - cleaning_data = read_dataframe_from_s3_parquet( - bucket_name=get_settings().DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet", - ) - - input_properties = [] - for property_id, config in tqdm(enumerate(plan_input), total=len(plan_input)): - # We validate each record in the file. If the record is NOT valid, we need to handle this accordingly - uprn = config.get("uprn", None) - uprn = None if uprn == "" else uprn - if uprn: - uprn = int(float(uprn)) - - epc_searcher = SearchEpc( - address1=config["address"], - postcode=config["postcode"], - uprn=uprn, - auth_token=get_settings().EPC_AUTH_TOKEN, - os_api_key=get_settings().ORDNANCE_SURVEY_API_KEY, - ) - epc_searcher.ordnance_survey_client.built_form = config.get("built_form", None) - epc_searcher.ordnance_survey_client.property_type = config.get("property_type", None) - # For the moment, our OS API access is unavailable, so we skip and interpolate - epc_searcher.find_property(skip_os=True) - - if config["address"] == "35b High Street": - print("Performing temporary patch on 35b High Street") - epc_searcher.newest_epc["uprn"] = 10002911892 - epc_searcher.full_sap_epc["uprn"] = 10002911892 - - if config["address"] == "Cobnut Barn": - print("Performing temporary patch on Cobnut Barn") - epc_searcher.newest_epc["uprn"] = 10013924689 - - # Create a record in db - # TODO: If we productionise the creation of this mds report, we will need to store this in the db - # property_id, is_new = create_property( - # session, body.portfolio_id, epc_searcher.address_clean, epc_searcher.postcode_clean, epc_searcher.uprn - # ) - # if not is_new: - # continue - # - # 
create_property_targets( - # session, - # property_id=property_id, - # portfolio_id=body.portfolio_id, - # epc_target=body.goal_value, - # heat_demand_target=None - # ) - - epc_records = { - 'original_epc': epc_searcher.newest_epc.copy(), - 'full_sap_epc': epc_searcher.full_sap_epc.copy(), - 'old_data': epc_searcher.older_epcs.copy(), - } - - # patch = next(( - # x for x in patches if (x["address"] == config["address"]) and (x["postcode"] == config["postcode"]) - # ), {}) - # epc_records = patch_epc(patch, epc_records) - - prepared_epc = EPCRecord( - epc_records=epc_records, - run_mode="newdata", - cleaning_data=cleaning_data - ) - - # property_already_installed = next(( - # x for x in already_installed if - # (x["address"] == config["address"]) and (x["postcode"] == config["postcode"]) - # ), {}) - # - # property_non_invasive_recommendations = next(( - # x for x in non_invasive_recommendations if - # (x["address"] == config["address"]) and (x["postcode"] == config["postcode"]) - # ), {}) - - if measure_set is None: - measures = config["measures"] if "measures" in config else None - else: - measures = measure_set - - input_properties.append( - Property( - id=property_id, - address=epc_searcher.address_clean, - postcode=epc_searcher.postcode_clean, - epc_record=prepared_epc, - # already_installed=property_already_installed, - # non_invasive_recommendations=property_non_invasive_recommendations, - measures=measures, - is_new=is_new, - **Property.extract_kwargs(config) - ) - ) - - logger.info("Reading in materials and cleaned datasets") - materials = get_materials(session) - cleaned = get_cleaned() - - uprn_filenames = read_dataframe_from_s3_parquet( - bucket_name=get_settings().DATA_BUCKET, file_key="spatial/filename_meta.parquet" - ) - photo_supply_lookup, floor_area_decile_thresholds = SolarPhotoSupply.load(bucket=get_settings().DATA_BUCKET) - - logger.info("Getting spatial data") - for p in tqdm(input_properties): - p.get_spatial_data(uprn_filenames) - - 
logger.info("Getting components and epc recommendations") - recommendations_scoring_data = [] - representative_recommendations = {} - recommendations = {} - - for p in tqdm(input_properties): - p.set_features(cleaned, photo_supply_lookup, floor_area_decile_thresholds) - - mds = Mds(property_instance=p, materials=materials, optimise_measures=optimise_measures) - mds_recommendations, property_representative_recommendations, errors = mds.build() - - if isinstance(errors, list): - if errors: - raise Exception("Errors occurred during MDS build") - else: - if any([len(x) for x in errors.values()]): - raise Exception("Errors occurred during MDS build") - - recommendations[p.id] = mds_recommendations - representative_recommendations[p.id] = property_representative_recommendations - - # Build the scoring data - p.create_base_difference_epc_record(cleaned_lookup=cleaned) - if optimise_measures: - for _id, mds_recs in mds_recommendations.items(): - representative_ids = [r["recommendation_id"] for r in property_representative_recommendations[_id]] - simulation_mds_recs = [] - for recs in mds_recs: - simulation_mds_recs.append( - [r for r in recs if r["recommendation_id"] in representative_ids] - ) - - p.adjust_difference_record_with_recommendations( - simulation_mds_recs, property_representative_recommendations[_id] - ) - - data = p.recommendations_scoring_data.copy() - for d in data: - d["id"] = d["id"] + "*" + _id - - recommendations_scoring_data.extend(data) - - else: - recommendations_scoring_data.append( - p.simulate_all_representative_recommendations(property_representative_recommendations) - ) - - logger.info("Preparing data for scoring in sap change api") - recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data) - - recommendations_scoring_data = recommendations_scoring_data.drop( - columns=["rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending", - "carbon_ending"] - ) - - model_api = ModelApi( - 
portfolio_id=body.portfolio_id, timestamp=created_at, prediction_buckets=get_prediction_buckets() - ) - - all_predictions = { - "sap_change_predictions": pd.DataFrame(), - "heat_demand_predictions": pd.DataFrame(), - "carbon_change_predictions": pd.DataFrame() - } - to_loop_over = range(0, recommendations_scoring_data.shape[0], SCORING_BATCH_SIZE) - for chunk in tqdm(to_loop_over, total=len(to_loop_over)): - predictions_dict = model_api.predict_all( - df=recommendations_scoring_data.iloc[chunk:chunk + SCORING_BATCH_SIZE], - ) - - # Append the predictions to the predictions dictionary - for key, scored in predictions_dict.items(): - all_predictions[key] = pd.concat([all_predictions[key], scored]) - - # TODO: 1) walls_insulation_thickness_ending is not being set in the recommendations_scoring_data, - # insulation_thickness_ending is being set instead - # 2) - - # TODO: TEMP - for p in plan_input: - if p["uprn"]: - p["uprn"] = str(int(float(p["uprn"]))) - - import re - from backend.ml_models.AnnualBillSavings import AnnualBillSavings - - if optimise_measures: - results = [] - for p in input_properties: - - sap_before = int(p.data["current-energy-efficiency"]) - epc_before = p.data["current-energy-rating"] - heat_demand_before = p.data["energy-consumption-current"] - carbon_before = p.data["co2-emissions-current"] - current_adjusted_energy = AnnualBillSavings.adjust_energy_to_metered( - epc_energy_consumption=heat_demand_before * p.floor_area, - current_epc_rating=epc_before, - ) - current_energy_bill = AnnualBillSavings.calculate_annual_bill(current_adjusted_energy) - - package_comparison = [] - for _id in recommendations[p.id].keys(): - - sap_prediction = all_predictions["sap_change_predictions"][ - (all_predictions["sap_change_predictions"]["property_id"] == str(p.id)) & - (all_predictions["sap_change_predictions"]["recommendation_id"].str.contains(re.escape(_id))) - ].copy().reset_index(drop=True) - sap_prediction["row_id"] = sap_prediction.index - - 
heat_demand_prediction = all_predictions["heat_demand_predictions"][ - (all_predictions["heat_demand_predictions"]["property_id"] == str(p.id)) & - (all_predictions["heat_demand_predictions"]["recommendation_id"].str.contains(re.escape(_id))) - ].copy().reset_index(drop=True) - heat_demand_prediction["row_id"] = heat_demand_prediction.index - - carbon_prediction = all_predictions["carbon_change_predictions"][ - (all_predictions["carbon_change_predictions"]["property_id"] == str(p.id)) & - (all_predictions["carbon_change_predictions"]["recommendation_id"].str.contains(re.escape(_id))) - ].copy().reset_index(drop=True) - carbon_prediction["row_id"] = carbon_prediction.index - - epc_target = body.goal_value - if epc_before == epc_target: - continue - - sap_target = epc_to_sap_lower_bound(epc_target) - # Define the measures - sap_threshold_barrier = sap_prediction[sap_prediction["predictions"] >= sap_target] - meets_threshold = True - if sap_threshold_barrier.empty: - sap_threshold_barrier = sap_prediction.tail(1) - meets_threshold = False - sap_threshold_barrier = sap_threshold_barrier.head(1) - - sap_prediction = sap_prediction[ - sap_prediction["row_id"] <= sap_threshold_barrier["row_id"].values[0] - ] - heat_demand_prediction = heat_demand_prediction[ - heat_demand_prediction["row_id"] <= sap_threshold_barrier["row_id"].values[0] - ] - carbon_prediction = carbon_prediction[ - carbon_prediction["row_id"] <= sap_threshold_barrier["row_id"].values[0] - ] - - reverse_map = {v: k for k, v in Mds.format_map.items()} - - selected_measures = [ - reverse_map[x.split("-")[0]] for x in sap_prediction["recommendation_id"].values - ] - selected_measure_ids = [x.split("*")[0] for x in sap_prediction["recommendation_id"].values] - - costs = [ - r["total"] for r in representative_recommendations[p.id][_id] if - r["recommendation_id"] in selected_measure_ids - ] - costs = sum(costs) - - sap_after = sap_prediction["predictions"].values[-1] - epc_after = sap_to_epc(sap_after) - 
heat_demand_after = heat_demand_prediction["predictions"].values[-1] - carbon_after = carbon_prediction["predictions"].values[-1] - - expected_adjusted_energy = AnnualBillSavings.adjust_energy_to_metered( - epc_energy_consumption=heat_demand_after * p.floor_area, - current_epc_rating=epc_before, - ) - - expected_energy_bill = AnnualBillSavings.calculate_annual_bill(expected_adjusted_energy) - - bill_savings = current_energy_bill - expected_energy_bill - energy_savings = current_adjusted_energy - expected_adjusted_energy - - package_comparison.append( - { - "id": _id, - "cost": costs, - "measures": selected_measures, - "sap_before": sap_before, - "sap_after": sap_after, - "epc_before": epc_before, - "epc_after": epc_after, - "heat_demand_before": heat_demand_before, - "heat_demand_after": heat_demand_after, - "carbon_before": carbon_before, - "carbon_after": carbon_after, - "bill_savings": bill_savings, - "energy_savings": energy_savings, - "current_energy_bill": current_energy_bill, - "meets_threshold": meets_threshold - } - ) - - package_comparison = pd.DataFrame(package_comparison) - # Find the smallest cost package - if not package_comparison.empty: - - # We check if any of the packages meet the threshold - # If none of them do, take the one that gets closest to the target - if package_comparison["meets_threshold"].any(): - package_comparison = package_comparison[package_comparison["meets_threshold"]] - package_comparison = package_comparison.sort_values("cost") - else: - package_comparison = package_comparison.sort_values("sap_after", ascending=False) - - package_comparison = package_comparison.head(1).to_dict("records")[0] - else: - package_comparison = { - "measures": [], - "sap_before": sap_before, - "sap_after": sap_before, - "epc_before": epc_before, - "epc_after": epc_before, - "heat_demand_before": heat_demand_before, - "heat_demand_after": heat_demand_before, - "carbon_before": carbon_before, - "carbon_after": carbon_before, - "bill_savings": 0, - 
"energy_savings": 0, - "current_energy_bill": current_energy_bill, - "meets_threshold": False - } - - config = [c for c in plan_input if c["uprn"] == str(p.uprn)] - if not config: - config = {"address": None, "postcode": None} - else: - config = config[0] - - results.append({ - "config_address": config["address"], - "config_postcode": config["postcode"], - "uprn": p.uprn, - "address": p.address, - "postcode": p.postcode, - "measures": package_comparison["measures"], - "year_of_epc": p.data['lodgement-date'], - "sap_before": package_comparison["sap_before"], - "sap_after": package_comparison["sap_after"], - "epc_before": package_comparison["epc_before"], - "epc_after": package_comparison["epc_after"], - "heat_demand_before": package_comparison["heat_demand_before"], - "heat_demand_after": package_comparison["heat_demand_after"], - "carbon_before": package_comparison["carbon_before"], - "carbon_after": package_comparison["carbon_after"], - "bill_savings": round(package_comparison["bill_savings"], 2), - "energy_savings": round(package_comparison["energy_savings"], 2), - "current_energy_bill": round(package_comparison["current_energy_bill"], 2), - "EWI": "EWI" if "external_wall_insulation" in package_comparison["measures"] else None, - "CWI": "CWI" if "cavity_wall_insulation" in package_comparison["measures"] else None, - "LI": "LI" if "loft_insulation" in package_comparison["measures"] else None, - "ASHP Htg": "ASHP Htg" if "air_source_heat_pump" in package_comparison["measures"] else None, - "Elec Storage": ( - "Elec Storage Htrs (Out of scope -Prov sum only)" if "high_heat_retention_storage_heaters" in - package_comparison["measures"] else None - ), - "Solar PV": "Solar PV" if "solar_pv" in package_comparison["measures"] else None, - }) - - results = pd.DataFrame(results) - - # For the different measures, we check the impact with a few debugging functions - - walls_check, hhr_check = check_mds(results, input_properties, recommendations, optimise_measures) - - 
results.to_excel("optimised mds_results 5th June.xlsx") - - results = [] - for p in input_properties: - measures = p.measures - property_recommendations = [r['type'] for r in representative_recommendations[p.id]] - - # TODO: Check high heat retention storage heaters - looks like it's excluded controls! - - sap_prediction = all_predictions["sap_change_predictions"][ - all_predictions["sap_change_predictions"]["property_id"] == str(p.id) - ] - - heat_demand_prediction = all_predictions["heat_demand_predictions"][ - all_predictions["heat_demand_predictions"]["property_id"] == str(p.id) - ] - - carbon_prediction = all_predictions["carbon_change_predictions"][ - all_predictions["carbon_change_predictions"]["property_id"] == str(p.id) - ] - - # Get a before and after for SAP, heat demand, CO2 and also calculate energy bill and energy savings - sap_before = int(p.data["current-energy-efficiency"]) - sap_after = sap_prediction["predictions"].values[0] if measures else sap_before - - epc_before = p.data["current-energy-rating"] - epc_after = sap_to_epc(sap_after) if measures else epc_before - - heat_demand_before = p.data["energy-consumption-current"] - heat_demand_after = heat_demand_prediction["predictions"].values[0] if measures else heat_demand_before - - carbon_before = p.data["co2-emissions-current"] - carbon_after = carbon_prediction["predictions"].values[0] if measures else carbon_before - - # Estimate bill savings - - from backend.ml_models.AnnualBillSavings import AnnualBillSavings - current_adjusted_energy = AnnualBillSavings.adjust_energy_to_metered( - epc_energy_consumption=heat_demand_before * p.floor_area, - current_epc_rating=epc_before, - ) - - # TODO: This isn't quite right as this is based on EVERY possible measure, not just the ones that are - # actually implemented - expected_adjusted_energy = AnnualBillSavings.adjust_energy_to_metered( - epc_energy_consumption=heat_demand_after * p.floor_area, - current_epc_rating=epc_before, - ) - - # TODO: We should 
determine if the home is gas & electricity or just electricity - - # Determine if the heating and hotwater was previously electric only or both - - current_energy_bill = AnnualBillSavings.calculate_annual_bill( - kwh=current_adjusted_energy, - ) - expected_energy_bill = AnnualBillSavings.calculate_annual_bill( - kwh=expected_adjusted_energy, - ) - - bill_savings = current_energy_bill - expected_energy_bill - energy_savings = current_adjusted_energy - expected_adjusted_energy - - config = [c for c in plan_input if c["uprn"] == str(p.uprn)] - if not config: - config = {"address": None, "postcode": None} - else: - config = config[0] - - to_append = { - "config_address": config["address"], - "config_postcode": config["postcode"], - "uprn": p.uprn, - "address": p.address, - "postcode": p.postcode, - "measures": measures, - "property_recommendations": property_recommendations, - "year_of_epc": p.data['lodgement-date'], - "sap_before": sap_before, - "sap_after": sap_after, - "epc_before": epc_before, - "epc_after": epc_after, - "heat_demand_before": heat_demand_before, - "heat_demand_after": heat_demand_after, - "carbon_before": carbon_before, - "carbon_after": carbon_after, - "bill_savings": round(bill_savings, 2), - "energy_savings": round(energy_savings, 2), - "current_energy_bill": round(current_energy_bill, 2), - "fuel_type": p.main_fuel["fuel_type"], - } - results.append(to_append) - - results = pd.DataFrame(results) - results["sap_uplift"] = results["sap_after"] - results["sap_before"] - - # results.to_excel("mds_results 5th June.xlsx") - - walls_check, hhr_check = check_mds(results, input_properties, recommendations, optimise_measures) - - except IntegrityError: - logger.error("Database integrity error occurred", exc_info=True) - session.rollback() - return Response(status_code=500, content="Database integrity error.") - except OperationalError: - logger.error("Database operational error occurred", exc_info=True) - session.rollback() - return 
Response(status_code=500, content="Database operational error.") - except ValueError: - logger.error("Value error - possibly due to malformed data", exc_info=True) - session.rollback() - return Response(status_code=400, content="Bad request: malformed data.") - except Exception as e: # General exception handling - logger.error(f"An error occurred: {e}") - session.rollback() - return Response(status_code=500, content="An unexpected error occurred.") - finally: - session.close() - - -def check_mds(results, input_properties, recommendations, optimise_measures): - import ast - walls_check = [] - hhr_check = [] - for p in input_properties: - res = results[results["uprn"] == p.uprn] - wall = p.walls - heating = p.main_heating - heating_controls = p.main_heating_controls - - if optimise_measures: - measures = res["measures"].values[0] - else: - measures = [list(z.keys())[0] for z in res["measures"].values[0]] - - wall_recommendation = [ - x for x in measures if - x in ["internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation"] - ] - - hhr_recommendation = [ - x for x in measures if - x in ["high_heat_retention_storage_heaters"] - ] - - if optimise_measures: - possible_measures = [ast.literal_eval(x) for x in list(recommendations[p.id].keys())] - # Unlist them - possible_measures = [x for sublist in possible_measures for x in sublist] - possible_measures = list(set(possible_measures)) - else: - possible_measures = p.measures - - if wall_recommendation: - if len(wall_recommendation) > 1: - raise Exception("something went wrong") - wall_recommendation = wall_recommendation[0] - else: - wall_recommendation = None - - hhr_recommendation = hhr_recommendation[0] if hhr_recommendation else None - - walls_check.append( - { - "uprn": p.uprn, - "address": p.address, - "postcode": p.postcode, - "property_type": p.data['property-type'], - "conservation_status": p.spatial["conservation_status"], - "is_listed_building": p.spatial["is_listed_building"], - 
"is_heritage_building": p.spatial["is_heritage_building"], - "wall": wall["clean_description"], - "recommendation": wall_recommendation, - "possible_measures": possible_measures, - "selected_measures": res["measures"].values[0], - } - ) - - hhr_check.append( - { - "uprn": p.uprn, - "address": p.address, - "postcode": p.postcode, - "heating": heating["clean_description"], - "heating_controls": heating_controls["clean_description"], - "recommendation": hhr_recommendation, - "possible_measures": possible_measures, - "selected_measures": res["measures"].values[0], - } - ) - - walls_check = pd.DataFrame(walls_check) - hhr_check = pd.DataFrame(hhr_check) - - return walls_check, hhr_check diff --git a/backend/app/plan/schemas.py b/backend/app/plan/schemas.py index e0c5f35d..f84912fe 100644 --- a/backend/app/plan/schemas.py +++ b/backend/app/plan/schemas.py @@ -1,60 +1,25 @@ -from pydantic import BaseModel, conlist, validator -from typing import Optional +from pydantic import BaseModel, Field, BeforeValidator +from typing import Annotated, List, Optional +# Example constants for validation TYPICAL_MEASURE_TYPES = [ - "wall_insulation", - "roof_insulation", - "ventilation", - "floor_insulation", - "windows", - "fireplace", - "heating", - "hot_water", - "low_energy_lighting", - "secondary_heating", - "solar_pv" + "wall_insulation", "roof_insulation", "ventilation", "floor_insulation", + "windows", "fireplace", "heating", "hot_water", "low_energy_lighting", + "secondary_heating", "solar_pv" ] SPECIFIC_MEASURES = [ - # Specific measures - # Walls - "internal_wall_insulation", - "external_wall_insulation", - "cavity_wall_insulation", - # Roof - "loft_insulation", - "flat_roof_insulation", - "room_roof_insulation", - # Floor - "suspended_floor_insulation", - "solid_floor_insulation", - # Heating - "boiler_upgrade", - "high_heat_retention_storage_heater", - "air_source_heat_pump", - "secondary_heating", - # Solar - "solar_pv", - # Windows Glazing - "double_glazing", - 
"secondary_glazing", - # Mechanical ventilation - "ventilation", - # Other - "low_energy_lighting", - "fireplace", - "hot_water", + "internal_wall_insulation", "external_wall_insulation", "cavity_wall_insulation", + "loft_insulation", "flat_roof_insulation", "room_roof_insulation", + "suspended_floor_insulation", "solid_floor_insulation", + "boiler_upgrade", "high_heat_retention_storage_heater", "air_source_heat_pump", + "secondary_heating", "solar_pv", "double_glazing", "secondary_glazing", + "ventilation", "low_energy_lighting", "fireplace", "hot_water" ] NON_INVASIVE_SPECIFIC_MEASURES = [ - # Specific measures that will typically come from an energy assessment - "trickle_vents", - "draught_proofing", - "mixed_glazing", # This covers partial double glazing and secondary glazing - "cavity_extract_and_refill", - # Indicates that there is one (need to handle the case where there are multiple) - # extension that requires cavity wall insulation - "extension_cavity_wall_insulation", + "trickle_vents", "draught_proofing", "mixed_glazing", "cavity_extract_and_refill", + "extension_cavity_wall_insulation" ] # This allows us to extend high level categories for measures such as "wall_insulation" to the specific measures @@ -70,11 +35,37 @@ MEASURE_MAP = { "heating_controls": ["roomstat_programmer_trvs", "time_temperature_zone_control"] } +VALID_GOALS = ["Increasing EPC"] +VALID_HOUSING_TYPES = ["Social", "Private"] + + +# Define the validation function for inclusions/exclusions +def check_inclusion_or_exclusion(value: str) -> str: + if value not in TYPICAL_MEASURE_TYPES + SPECIFIC_MEASURES + NON_INVASIVE_SPECIFIC_MEASURES: + raise ValueError(f"{value} is not an allowed inclusion") + return value + + +def check_goals(value: str) -> str: + assert value in VALID_GOALS, f"{value} is not a valid goal" + return value + + +def check_housing_type(value: str) -> str: + assert value in VALID_HOUSING_TYPES, f"{value} is not a valid housing type" + return value + + +# Use Annotated 
with BeforeValidator for each list item validation +InclusionOrExclusionItem = Annotated[str, BeforeValidator(check_inclusion_or_exclusion)] +Goal = Annotated[str, BeforeValidator(check_goals)] +HousingType = Annotated[str, BeforeValidator(check_housing_type)] + class PlanTriggerRequest(BaseModel): budget: Optional[float] = None - goal: str - housing_type: str + goal: Goal + housing_type: HousingType goal_value: str portfolio_id: int trigger_file_path: str @@ -82,53 +73,10 @@ class PlanTriggerRequest(BaseModel): patches_file_path: Optional[str] = None non_invasive_recommendations_file_path: Optional[str] = None valuation_file_path: Optional[str] = None - exclusions: Optional[conlist(str, min_items=1)] = None - inclusions: Optional[conlist(str, min_items=1)] = None + exclusions: Optional[List[InclusionOrExclusionItem]] = Field(default=None, min_length=1) + inclusions: Optional[List[InclusionOrExclusionItem]] = Field(default=None, min_length=1) scenario_name: Optional[str] = "" - # If true, will allow us to create multiple plans for the same portfolio, whereas if this is false, if this property - # exists in the portfolio, it will be ignored multi_plan: Optional[bool] = False - - # if False, allows optimisation to be switched off optimise: Optional[bool] = True - - # If True, uses default u-values for models default_u_values: Optional[bool] = True - - _allowed_goals = {"Increasing EPC"} - - _allowed_housing_types = {"Social", "Private"} - - # Validator to ensure exclusions are within the pre-defined possibilities - @validator('exclusions', each_item=True) - def check_exclusions(cls, v): - if v not in TYPICAL_MEASURE_TYPES + SPECIFIC_MEASURES + NON_INVASIVE_SPECIFIC_MEASURES: - raise ValueError(f"{v} is not an allowed exclusion") - return v - - @validator('inclusions', each_item=True) - def check_inclusions(cls, v): - if v not in TYPICAL_MEASURE_TYPES + SPECIFIC_MEASURES + NON_INVASIVE_SPECIFIC_MEASURES: - raise ValueError(f"{v} is not an allowed inclusion") - return 
v - - # Validator to ensure that the goal is within the pre-defined possibilities - @validator('goal') - def check_goal(cls, v): - if v not in cls._allowed_goals: - raise ValueError(f"{v} is not a valid goal") - return v - - # Validator to ensure that the housing type is within the pre-defined possibilities - @validator('housing_type') - def check_housing_type(cls, v): - if v not in cls._allowed_housing_types: - raise ValueError(f"{v} is not a valid housing type") - return v - - -class MdsRequest(PlanTriggerRequest): - # When creating the mds report, we allow an optional list of measures to select from. If this is passed, it will - # cause the service to select the optimal package from the list of measures - measures: Optional[conlist(str, min_items=1)] = None diff --git a/backend/docker/Dockerfile b/backend/docker/Dockerfile index fd498cdb..006f088a 100644 --- a/backend/docker/Dockerfile +++ b/backend/docker/Dockerfile @@ -1,5 +1,5 @@ # Pull base image -FROM python:3.10.12-slim-buster as build-image +FROM python:3.11.10-slim-bullseye as build-image # Set environment variables ENV PYTHONDONTWRITEBYTECODE 1 @@ -12,10 +12,10 @@ WORKDIR var/task/Model RUN #apt-get update && apt-get install -y netcat-openbsd # Install python dependencies -COPY ./backend/requirements/base.txt ./backend/requirements/base.txt +COPY ./backend/requirements/requirements.txt ./backend/requirements/requirements.txt RUN pip install --upgrade pip # Install and clean up temp caches -RUN pip install -r backend/requirements/base.txt && rm -rf /root/.cache +RUN pip install -r backend/requirements/requirements.txt && rm -rf /root/.cache # Since we are not using a base AWS image, there is some additional setup required. 
We need to set up the runtime # interface client @@ -35,16 +35,12 @@ COPY --from=build-image /usr/local/lib/python3.10/site-packages/ /usr/local/lib/ # Copy project files COPY ./backend/ ./backend COPY ./recommendations/ ./recommendations -COPY ./model_data/BaseUtility.py ./model_data/BaseUtility.py -COPY ./model_data/config.py ./model_data/config.py -COPY ./model_data/optimiser/ ./model_data/optimiser/ -COPY ./model_data/__init__.py ./model_data/__init__.py -COPY ./model_data/EpcClean.py ./model_data/EpcClean.py -COPT ./model_data/simulation_system/core/ ./model_data/simulation_system/core/ -COPY ./model_data/utils.py ./model_data/utils.py -COPY ./model_data/epc_attributes/ ./model_data/epc_attributes/ -COPY ./datatypes/ ./datatypes/ COPY ./utils/ ./utils/ +COPY ./etl/epc/ ./etl/epc/ +COPY ./etl/epc_clean/ ./etl/epc_clean/ +COPY ./etl/bill_savings/ ./etl/bill_savings/ +COPY ./etl/spatial/ ./etl/spatial/ +COPY ./datatypes/ ./datatypes/ # Set the ENTRYPOINT to the AWS Lambda RIC and CMD to your function handler ENTRYPOINT [ "/usr/local/bin/python", "-m", "awslambdaric" ] diff --git a/backend/docker/lambda.Dockerfile b/backend/docker/lambda.Dockerfile index 13c30b88..1c079981 100644 --- a/backend/docker/lambda.Dockerfile +++ b/backend/docker/lambda.Dockerfile @@ -1,5 +1,5 @@ # Pull base image -FROM python:3.10.12-slim-buster as build-image +FROM python:3.11.10-slim-bullseye as build-image # Set environment variables ENV PYTHONDONTWRITEBYTECODE 1 @@ -12,10 +12,10 @@ WORKDIR var/task/Model #RUN apt-get update && apt-get install -y netcat-openbsd # Install python dependencies -COPY ./backend/requirements/base.txt ./backend/requirements/base.txt +COPY ./backend/requirements/requirements.txt ./backend/requirements/requirements.txt # Install and clean up temp caches RUN pip install --upgrade pip \ - && pip install -r backend/requirements/base.txt && rm -rf /root/.cache + && pip install -r backend/requirements/requirements.txt && rm -rf /root/.cache # Since we are not using 
a base AWS image, there is some additional setup required. We need to set up the runtime # interface client @@ -24,28 +24,28 @@ RUN pip install --upgrade pip \ RUN pip install awslambdaric # Second stage: "runtime-image" -FROM python:3.10.12-slim-buster +FROM python:3.11.10-slim-bullseye + +# Create the extensions directory to avoid warnings with RIE +RUN mkdir -p /opt/extensions # Set work directory to the root of your project WORKDIR /var/task/Model # Copy the python dependencies from the build-image -COPY --from=build-image /usr/local/lib/python3.10/site-packages/ /usr/local/lib/python3.10/site-packages/ +COPY --from=build-image /usr/local/lib/python3.11/site-packages/ /usr/local/lib/python3.11/site-packages/ # Copy project files COPY ./backend/ ./backend COPY ./recommendations/ ./recommendations -COPY ./model_data/BaseUtility.py ./model_data/BaseUtility.py -COPY ./model_data/config.py ./model_data/config.py -COPY ./model_data/optimiser/ ./model_data/optimiser/ -COPY ./model_data/__init__.py ./model_data/__init__.py -COPY ./model_data/EpcClean.py ./model_data/EpcClean.py -COPY ./model_data/utils.py ./model_data/utils.py -COPY ./model_data/epc_attributes/ ./model_data/epc_attributes/ -COPY ./model_data/simulation_system/core/DataProcessor.py ./model_data/simulation_system/core/DataProcessor.py -COPY ./model_data/simulation_system/core/Settings.py ./model_data/simulation_system/core/Settings.py -COPY ./datatypes/ ./datatypes/ COPY ./utils/ ./utils/ +COPY ./etl/epc/ ./etl/epc/ +COPY ./etl/epc_clean/ ./etl/epc_clean/ +COPY ./etl/bill_savings/ ./etl/bill_savings/ +COPY ./etl/spatial/ ./etl/spatial/ +COPY ./BaseUtility.py ./BaseUtility.py +COPY ./datatypes/ ./datatypes/ + # Set the ENTRYPOINT to the AWS Lambda RIC and CMD to your function handler ENTRYPOINT [ "/usr/local/bin/python", "-m", "awslambdaric" ] diff --git a/backend/ml_models/Valuation.py b/backend/ml_models/Valuation.py index c6c1582b..92c55641 100644 --- a/backend/ml_models/Valuation.py +++ 
b/backend/ml_models/Valuation.py @@ -244,8 +244,8 @@ class PropertyValuation: return { "current_value": current_value, - "lower_bound_increased_value": current_value * (1 + min_increase), - "upper_bound_increased_value": current_value * (1 + max_increase), - "average_increased_value": current_value * (1 + avg_increase), - "average_increase": current_value * (1 + avg_increase) - current_value + "lower_bound_increased_value": float(current_value * (1 + min_increase)), + "upper_bound_increased_value": float(current_value * (1 + max_increase)), + "average_increased_value": float(current_value * (1 + avg_increase)), + "average_increase": float(current_value * (1 + avg_increase) - current_value) } diff --git a/backend/ml_models/api.py b/backend/ml_models/api.py index e922d7fc..c2f2dcd9 100644 --- a/backend/ml_models/api.py +++ b/backend/ml_models/api.py @@ -1,3 +1,5 @@ +import aiohttp +import asyncio import pandas as pd from tqdm import tqdm import requests @@ -18,6 +20,8 @@ class ModelApi: # "hot_water_cost_predictions", ] + KWH_MODEL_PREFIXES = ["heating_kwh_predictions", "hotwater_kwh_predictions"] + MODEL_URLS = { "sap_change_predictions": "sapmodel", "heat_demand_predictions": "heatmodel", @@ -120,6 +124,28 @@ class ModelApi: # depending on how you want to handle errors in your application return None + async def predict_async(self, file_location, model_prefix: str): + """Makes an asynchronous POST request to the Model API with the provided parameters.""" + logger.info(f"Making request to {model_prefix} change api") + url = f"{self.base_url}/{self.MODEL_URLS[model_prefix]}/predict" + payload = { + "file_location": file_location, + "property_id": "", # This should get removed + "portfolio_id": self.portfolio_id, + "created_at": self.timestamp + } + + async with aiohttp.ClientSession() as session: + try: + async with session.post( + url, json=payload, headers={"Content-Type": "application/json"}, timeout=120 + ) as response: + response.raise_for_status() + return 
await response.json() + except aiohttp.ClientError as e: + logger.error(f"An error occurred: {e}") + return None + @staticmethod def extract_phase(recommendation_id): if 'phase=' in recommendation_id: @@ -180,6 +206,43 @@ class ModelApi: return predictions + async def predict_all_async(self, df, bucket, model_prefixes=None, extract_ids=True) -> dict: + """Uploads data and makes asynchronous requests to the model APIs for predictions.""" + model_prefixes = self.MODEL_PREFIXES if model_prefixes is None else model_prefixes + + predictions = {} + tasks = [] + async with aiohttp.ClientSession() as session: + for model_prefix in model_prefixes: + logger.info(f"Scoring for model prefix: {model_prefix}") + file_location = self.upload_scoring_data(df, bucket, model_prefix) + # Schedule the prediction request as a coroutine + tasks.append( + self.predict_async(f"s3://{bucket}/" + file_location, model_prefix) + ) + + # Gather all asynchronous tasks (execute them concurrently) + responses = await asyncio.gather(*tasks, return_exceptions=True) + + for model_prefix, response in zip(model_prefixes, responses): + if response: + predictions_bucket = self.prediction_buckets[model_prefix] + predictions_df = pd.DataFrame( + read_dataframe_from_s3_parquet( + bucket_name=predictions_bucket, + file_key=response["storage_filepath"].split(predictions_bucket + "/")[1] + ) + ) + predictions_df['predictions'] = predictions_df["predictions"].astype(float).round(1) + if extract_ids: + predictions_df[['property_id', 'recommendation_id']] = predictions_df['id'].str.split('+', + expand=True) + predictions_df['phase'] = predictions_df['recommendation_id'].apply(self.extract_phase) + + predictions[model_prefix] = predictions_df + + return predictions + def paginated_predictions(self, data, bucket, batch_size, model_prefixes=None, extract_ids=True): all_predictions = self.predictions_template() to_loop_over = range(0, data.shape[0], batch_size) @@ -196,3 +259,59 @@ class ModelApi: 
all_predictions[key] = pd.concat([all_predictions[key], scored]) return all_predictions + + async def async_warm_up_lambdas(self, model_prefies=None): + """Send asynchronous pre-flight requests to each model endpoint to wake up the cold Lambdas without waiting + for responses.""" + logger.info("Asynchronously warming up Lambda functions...") + + model_prefixes = self.MODEL_PREFIXES if model_prefies is None else model_prefies + + tasks = [] + async with aiohttp.ClientSession() as session: + for model_prefix in model_prefixes: + url = f"{self.base_url}/{self.MODEL_URLS[model_prefix]}/predict" + # Create a coroutine for each warm-up request and add it to the tasks list + tasks.append(self._send_warm_up_request(session, url, model_prefix)) + + # Run all tasks concurrently but don't wait for the responses to finish + await asyncio.gather(*tasks, return_exceptions=True) + + @staticmethod + async def _send_warm_up_request(session, url, model_prefix): + """Helper method to send a pre-flight request to a given model URL.""" + try: + async with session.post(url, json={}, timeout=2) as response: + # Log success for monitoring but do not block on the response + logger.info(f"Warmed up {model_prefix} with status code: {response.status}") + except aiohttp.ClientError as e: + logger.warning(f"Failed to warm up {model_prefix}: {e}") + + logger.info("Lambda functions are warmed up and ready to go!") + + async def async_paginated_predictions(self, data, bucket, batch_size, model_prefixes=None, extract_ids=True): + all_predictions = self.predictions_template() + to_loop_over = range(0, data.shape[0], batch_size) + + async def run_batches(): + for chunk in tqdm(to_loop_over, total=len(to_loop_over)): + predictions_dict = await self.predict_all_async( + df=data.iloc[chunk:chunk + batch_size], + bucket=bucket, + model_prefixes=model_prefixes, + extract_ids=extract_ids + ) + + for key, scored in predictions_dict.items(): + all_predictions[key] = pd.concat([all_predictions[key], scored]) 
+ + # Check if there is an existing event loop + try: + # If there is an existing event loop, await the coroutine directly + loop = asyncio.get_running_loop() + await run_batches() + except RuntimeError: # No running event loop + # If no event loop is running, use asyncio.run() + asyncio.run(run_batches()) + + return all_predictions diff --git a/backend/requirements/base.txt b/backend/requirements/base.txt deleted file mode 100644 index c4e7367c..00000000 --- a/backend/requirements/base.txt +++ /dev/null @@ -1,42 +0,0 @@ -msgpack==1.0.5 -anyio==3.7.1 -cffi==1.15.1 -click==8.1.3 -cryptography==37.0.4 -ecdsa==0.18.0 -epc-api-python==1.0.2 -exceptiongroup==1.1.2 -fastapi==0.99.1 -h11==0.14.0 -httptools==0.5.0 -idna==3.4 -mangum==0.17.0 -pyasn1==0.5.0 -pycparser==2.21 -pydantic==1.10.11 -PyJWT==2.7.0 -python-dotenv==1.0.0 -python-jose==3.3.0 -PyYAML==6.0 -rsa==4.9 -six==1.16.0 -sniffio==1.3.0 -starlette==0.27.0 -typing_extensions==4.7.1 -uvicorn==0.22.0 -uvloop==0.17.0 -urllib3<2 -watchfiles==0.19.0 -websockets==11.0.3 -sqlalchemy==2.0.19 -psycopg2-binary -pytz==2023.3 -mip==1.15.0 -boto3==1.28.3 -pandas==1.5.3 -pyarrow==12.0.1 -textblob -usaddress==0.5.10 - -# Requirements we may not need -xgboost==1.7.6 \ No newline at end of file diff --git a/backend/requirements/local.txt b/backend/requirements/local.txt deleted file mode 100644 index 5a1693c4..00000000 --- a/backend/requirements/local.txt +++ /dev/null @@ -1,28 +0,0 @@ -anyio==3.7.1 -cffi==1.15.1 -click==8.1.3 -cryptography==37.0.4 -ecdsa==0.18.0 -exceptiongroup==1.1.2 -fastapi==0.99.1 -h11==0.14.0 -httptools==0.5.0 -idna==3.4 -mangum==0.17.0 -pyasn1==0.5.0 -pycparser==2.21 -pydantic==1.10.11 -PyJWT==2.7.0 -python-dotenv==1.0.0 -python-jose==3.3.0 -PyYAML==6.0 -rsa==4.9 -six==1.16.0 -sniffio==1.3.0 -starlette==0.27.0 -typing_extensions==4.7.1 -uvicorn==0.22.0 -uvloop==0.17.0 -watchfiles==0.19.0 -websockets==11.0.3 -boto3 \ No newline at end of file diff --git a/backend/requirements/requirements.txt 
b/backend/requirements/requirements.txt new file mode 100644 index 00000000..dd5c34ca --- /dev/null +++ b/backend/requirements/requirements.txt @@ -0,0 +1,31 @@ +# Pandas and numpy +numpy==2.1.2 +pandas==2.2.3 +pytz==2024.2 +six==1.16.0 +# tqdm +tqdm==4.66.5 +# fastapi +fastapi==0.115.2 +sqlalchemy==2.0.36 +pydantic-settings==2.6.0 +psycopg2-binary==2.9.10 +python-jose==3.3.0 +cryptography==43.0.3 +mangum==0.19.0 +# AWS +boto3==1.35.44 +# ML, Data Science +usaddress==0.5.11 +epc-api-python==1.0.2 +fuzzywuzzy==0.18.0 +python-Levenshtein==0.26.0 +textblob==0.18.0.post0 +msgpack==1.1.0 +scikit-learn==1.5.2 +cffi==1.15.1 +mip==1.15.0 +# Data +pyarrow==17.0.0 +fastparquet==2024.5.0 +aiohttp==3.10.10 diff --git a/etl/bill_savings/EnergyConsumptionModel.py b/etl/bill_savings/EnergyConsumptionModel.py index 4daf2b31..1ccfee60 100644 --- a/etl/bill_savings/EnergyConsumptionModel.py +++ b/etl/bill_savings/EnergyConsumptionModel.py @@ -1,6 +1,6 @@ import pandas as pd import numpy as np -from xgboost import XGBRegressor +# from xgboost import XGBRegressor from sklearn.model_selection import train_test_split from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_percentage_error from sklearn.feature_selection import RFECV @@ -79,13 +79,13 @@ class EnergyConsumptionModel: if x not in self.CATEGORICAL_COLUMNS }) - if model_paths: - for target, path in model_paths.items(): - # Read model - self.models[target] = read_pickle_from_s3( - bucket_name=f"retrofit-model-directory-{environment}", s3_file_name=path - ) - # Read dummy schema + # if model_paths: + # for target, path in model_paths.items(): + # # Read model + # self.models[target] = read_pickle_from_s3( + # bucket_name=f"retrofit-model-directory-{environment}", s3_file_name=path + # ) + # Read dummy schema if dummy_schema_path: self.dummy_schema = read_pickle_from_s3( @@ -278,33 +278,33 @@ class EnergyConsumptionModel: logger.info(f"Feature selection completed for target {target}") - def init_model(self, 
feature_selection=False): - - if feature_selection: - # Set up a smaller model to work it - return XGBRegressor( - objective='reg:squarederror', - n_estimators=50, - learning_rate=0.05, - max_depth=6, - subsample=0.8, - colsample_bytree=0.8, - reg_alpha=0.1, - reg_lambda=0.1 - ) - - return XGBRegressor( - objective='reg:squarederror', - n_estimators=1000, - learning_rate=0.05, - max_depth=6, - min_child_weight=3, - subsample=0.8, - colsample_bytree=0.8, - reg_alpha=0.1, - reg_lambda=0.1 - # n_jobs=self.n_jobs - ) + # def init_model(self, feature_selection=False): + # + # if feature_selection: + # # Set up a smaller model to work it + # return XGBRegressor( + # objective='reg:squarederror', + # n_estimators=50, + # learning_rate=0.05, + # max_depth=6, + # subsample=0.8, + # colsample_bytree=0.8, + # reg_alpha=0.1, + # reg_lambda=0.1 + # ) + # + # return XGBRegressor( + # objective='reg:squarederror', + # n_estimators=1000, + # learning_rate=0.05, + # max_depth=6, + # min_child_weight=3, + # subsample=0.8, + # colsample_bytree=0.8, + # reg_alpha=0.1, + # reg_lambda=0.1 + # # n_jobs=self.n_jobs + # ) def fit_model(self, target): """Fits the model to the training data and removes zero-importance features.""" diff --git a/etl/epc/DataProcessor.py b/etl/epc/DataProcessor.py index 4ad854c1..9655cf77 100644 --- a/etl/epc/DataProcessor.py +++ b/etl/epc/DataProcessor.py @@ -263,7 +263,7 @@ class EPCDataProcessor: # Use replace function to map data (if exists in key), to corresponding value - i.e. 
Remove invalid values data = self.data.replace(data_anomaly_map) - data = data.replace(np.NAN, None) + data = data.replace(np.nan, None) self.data = data @@ -384,7 +384,7 @@ class EPCDataProcessor: has_missings = pd.isnull(self.data[col]).sum() while has_missings: self.data = apply_clean( - data=self.data, matching_columns=matching_columns[0 : to_index + 1] + data=self.data, matching_columns=matching_columns[0: to_index + 1] ) has_missings = pd.isnull(self.data[col]).sum() @@ -487,7 +487,7 @@ class EPCDataProcessor: filled_data = ( self.data.groupby("UPRN", group_keys=True)[columns_to_fill] - .apply(lambda group: group.fillna(method="bfill").fillna(method="ffill")) + .apply(lambda group: group.bfill().ffill().infer_objects(copy=False)) .reset_index() .set_index("level_1") .sort_index() @@ -791,7 +791,7 @@ class EPCDataProcessor: We fill photo supply with zeros where it's missing """ - self.data["PHOTO_SUPPLY"] = self.data["PHOTO_SUPPLY"].fillna(0) + self.data["PHOTO_SUPPLY"] = self.data["PHOTO_SUPPLY"].astype("Int64").fillna(0) @staticmethod def apply_averages_cleaning( @@ -858,12 +858,12 @@ class EPCDataProcessor: # Fill NaN values with averages for col in cols_to_clean: - data_to_clean[col].fillna(data_to_clean[f"{col}_AVERAGE"], inplace=True) - data_to_clean.drop(columns=[f"{col}_AVERAGE"], inplace=True) + data_to_clean[col] = data_to_clean[col].fillna(data_to_clean[f"{col}_AVERAGE"]) + data_to_clean = data_to_clean.drop(columns=[f"{col}_AVERAGE"]) # If we still have missings - data_to_clean[col].fillna(data_to_clean[col].mean(), inplace=True) + data_to_clean[col] = data_to_clean[col].fillna(data_to_clean[col].mean()) # Final step if we still have missings - use global mean - data_to_clean[col].fillna(global_averages[col], inplace=True) + data_to_clean[col] = data_to_clean[col].fillna(global_averages[col]) return data_to_clean diff --git a/etl/epc/settings.py b/etl/epc/settings.py index a814750f..2a9b1746 100644 --- a/etl/epc/settings.py +++ 
b/etl/epc/settings.py @@ -182,7 +182,6 @@ EFFICIENCY_FEATURES = [ ROOM_FEATURES = ["number_habitable_rooms", "number_heated_rooms"] - COMPONENT_FEATURES = CORE_COMPONENT_FEATURES + [ "TRANSACTION_TYPE", "ENERGY_TARIFF", # Not sure if this is relevant @@ -241,7 +240,11 @@ BUILT_FORM_REMAP = { DATA_PROCESSOR_SETTINGS = { "low_memory": False, "epc_minimum_count": 1, - "column_mappings": {"UPRN": [int, str]}, + "column_mappings": { + "UPRN": [int, str], + "NUMBER_HEATED_ROOMS": [float], + "NUMBER_HABITABLE_ROOMS": [float], + }, } # This has a manual mapping of the column types required diff --git a/etl/spatial/OpenUprnClient.py b/etl/spatial/OpenUprnClient.py index 5c43347a..c0cd3992 100644 --- a/etl/spatial/OpenUprnClient.py +++ b/etl/spatial/OpenUprnClient.py @@ -1,7 +1,6 @@ import os from tqdm import tqdm import pandas as pd -import geopandas as gpd from utils.logger import setup_logger from utils.s3 import read_io_from_s3, save_dataframe_to_s3_parquet, read_dataframe_from_s3_parquet from backend.Property import Property @@ -86,17 +85,6 @@ class OpenUprnClient: return filename return None - @staticmethod - def convert_bng_data_to_gpd(df): - - gpd_data = gpd.GeoDataFrame( - df, - geometry=gpd.points_from_xy(df.X_COORDINATE, df.Y_COORDINATE), - crs="EPSG:27700" # British National Grid - ) - - return gpd_data - def save_filenames_to_s3(self, bucket_name): """ Save the filenames to s3 diff --git a/etl/spatial/app.py b/etl/spatial/app.py index d58509dd..e8055432 100644 --- a/etl/spatial/app.py +++ b/etl/spatial/app.py @@ -6,6 +6,7 @@ our database for querying from other services import os from tqdm import tqdm import pandas as pd +import geopandas as gpd from etl.spatial.ConservationAreaClient import ConservationAreaClient from etl.spatial.OpenUprnClient import OpenUprnClient from etl.spatial.SpecialBuildingsClient import SpecialBuildingsClient @@ -25,6 +26,16 @@ HISTORIC_ENGLAND_HERITAGE_BUILDINGS_PATHNAME = \ logger = setup_logger() +def convert_bng_data_to_gpd(df): + 
gpd_data = gpd.GeoDataFrame( + df, + geometry=gpd.points_from_xy(df.X_COORDINATE, df.Y_COORDINATE), + crs="EPSG:27700" # British National Grid + ) + + return gpd_data + + def app(): """ This application uses the conservation area datasets to determine if a UPRN is @@ -85,7 +96,7 @@ def app(): to_loop_over = open_uprn_client.data.groupby("filename") for filename, uprn_df in tqdm(open_uprn_client.data.groupby("filename"), total=len(to_loop_over)): - uprn_gdf = OpenUprnClient.convert_bng_data_to_gpd(uprn_df) + uprn_gdf = convert_bng_data_to_gpd(uprn_df) uprn_gdf = conservation_area_client.is_in_conservation_area_vectorised(uprn_gdf=uprn_gdf) uprn_gdf = special_buildings_client.is_listed_building_vectorised(uprn_gdf=uprn_gdf) diff --git a/model_data/requirements/requirements.txt b/model_data/requirements/requirements.txt index 1d84fc3d..845166d9 100644 --- a/model_data/requirements/requirements.txt +++ b/model_data/requirements/requirements.txt @@ -1,8 +1,9 @@ -pydantic==1.10.11 +pydantic==2.9.2 +pydantic-settings==2.6.0 epc-api-python==1.0.2 -pandas==2.0.3 -numpy==1.25.1 -pytz==2023.3 +numpy==2.1.2 +pandas==2.2.3 +pytz==2024.2 tzdata==2023.3 tqdm mypy @@ -20,4 +21,6 @@ pyspellchecker textblob boto3 pyarrow -msgpack==1.0.5 +msgpack==1.1.0 + + diff --git a/recommendations/Recommendations.py b/recommendations/Recommendations.py index aa7e041e..dd51b47d 100644 --- a/recommendations/Recommendations.py +++ b/recommendations/Recommendations.py @@ -730,8 +730,8 @@ class Recommendations: "id": STARTING_DUMMY_ID_VALUE, "phase": STARTING_DUMMY_ID_VALUE, "recommendation_id": STARTING_DUMMY_ID_VALUE, - "predictions_heating": property_kwh["heating"], - "predictions_hotwater": property_kwh["hot_water"], + "predictions_heating": float(property_kwh["heating"]), + "predictions_hotwater": float(property_kwh["hot_water"]), } ] ), @@ -854,12 +854,12 @@ class Recommendations: # We return a dictionary that contains the individual costs, that can be stored to the database current_energy_bill 
= { - "heating_cost_current": starting_figures["heating_cost"], - "hot_water_cost_current": starting_figures["hotwater_cost"], - "lighting_cost_current": property_instance.energy_cost_estimates["unadjusted"]["lighting"], - "appliances_cost_current": property_instance.energy_cost_estimates["unadjusted"]["appliances"], - "gas_standing_charge": gas_standing_charge, - "electricity_standing_charge": electricity_standing_charge, + "heating_cost_current": float(starting_figures["heating_cost"]), + "hot_water_cost_current": float(starting_figures["hotwater_cost"]), + "lighting_cost_current": float(property_instance.energy_cost_estimates["unadjusted"]["lighting"]), + "appliances_cost_current": float(property_instance.energy_cost_estimates["unadjusted"]["appliances"]), + "gas_standing_charge": float(gas_standing_charge), + "electricity_standing_charge": float(electricity_standing_charge), } return current_energy_bill diff --git a/recommendations/tests/test_data/heating_recommendations_data.py b/recommendations/tests/test_data/heating_recommendations_data.py index 51bf0378..26263826 100644 --- a/recommendations/tests/test_data/heating_recommendations_data.py +++ b/recommendations/tests/test_data/heating_recommendations_data.py @@ -946,9 +946,10 @@ testing_examples = [ }, "heating_measure_types": [ 'high_heat_retention_storage_heater', + 'air_source_heat_pump', ], "heating_controls_measure_types": [], - "notes": "This is an end-terrace house, without mains gas connection, so all we recommend is HHR" + "notes": "This is an end-terrace house, without mains gas connection, so we recommend is HHR & ASHP" }, { "epc": { diff --git a/recommendations/tests/test_roof_recommendations.py b/recommendations/tests/test_roof_recommendations.py index 6dbc3b72..214ea6c0 100644 --- a/recommendations/tests/test_roof_recommendations.py +++ b/recommendations/tests/test_roof_recommendations.py @@ -57,10 +57,10 @@ class TestRoofRecommendations: assert len(roof_recommender2.recommendations) == 1 - 
assert roof_recommender2.recommendations[0]["total"] == 1610.0000000000002 - assert roof_recommender2.recommendations[0]["new_u_value"] == 0.14 + assert roof_recommender2.recommendations[0]["total"] == 1653 + assert roof_recommender2.recommendations[0]["new_u_value"] == 0.13 assert roof_recommender2.recommendations[0]["starting_u_value"] == 0.68 - assert roof_recommender2.recommendations[0]["parts"][0]["depth"] == 270 + assert roof_recommender2.recommendations[0]["parts"][0]["depth"] == 300 epc_record = EPCRecord() epc_record.prepared_epc = {"county": "Greater London Authority", "roof-energy-eff": "Very Poor"} @@ -85,7 +85,7 @@ class TestRoofRecommendations: assert roof_recommender3.recommendations assert len(roof_recommender3.recommendations) == 1 - assert roof_recommender3.recommendations[0]["parts"][0]["depth"] == 270 + assert roof_recommender3.recommendations[0]["parts"][0]["depth"] == 300.0 def test_loft_insulation_recommendation_150mm_insulation(self): epc_record = EPCRecord() @@ -107,14 +107,14 @@ class TestRoofRecommendations: assert not roof_recommender4.recommendations - roof_recommender4.recommend(phase=0) + roof_recommender4.recommend(phase=0, default_u_values=True) assert len(roof_recommender4.recommendations) == 1 - assert roof_recommender4.recommendations[0]["total"] == 1552.5 - assert roof_recommender4.recommendations[0]["new_u_value"] == 0.13 + assert roof_recommender4.recommendations[0]["total"] == 1653.0 + assert roof_recommender4.recommendations[0]["new_u_value"] == 0.14 assert roof_recommender4.recommendations[0]["starting_u_value"] == 0.3 - assert roof_recommender4.recommendations[0]["parts"][0]["depth"] == 200 + assert roof_recommender4.recommendations[0]["parts"][0]["depth"] == 300 epc_record = EPCRecord() epc_record.prepared_epc = {"county": "Somerset", "roof-energy-eff": "Good"} @@ -139,7 +139,7 @@ class TestRoofRecommendations: assert roof_recommender5.recommendations assert len(roof_recommender5.recommendations) == 1 - assert 
roof_recommender5.recommendations[0]["parts"][0]["depth"] == 200 + assert roof_recommender5.recommendations[0]["parts"][0]["depth"] == 300 def test_loft_insulation_recommendation_270mm_insulation(self): # We shouldn't recommend anything in this case diff --git a/recommendations/tests/test_window_recommendations.py b/recommendations/tests/test_window_recommendations.py index ae6c6377..baef3574 100644 --- a/recommendations/tests/test_window_recommendations.py +++ b/recommendations/tests/test_window_recommendations.py @@ -48,13 +48,13 @@ class TestWindowRecommendations: 'starting_u_value': None, 'new_u_value': None, 'sap_points': None, 'already_installed': False, 'total': 7980.0, 'labour_hours': 0.0, 'labour_days': 0.0, 'is_secondary_glazing': False, 'description_simulation': { - 'multi-glaze-proportion': 100, 'windows-energy-eff': 'Average', + 'multi-glaze-proportion': 100, 'windows-energy-eff': 'Good', 'windows-description': 'Fully double glazed', 'glazed-type': 'double glazing installed during or after 2002' }, 'simulation_config': { 'has_glazing_ending': True, 'glazing_type_ending': 'double', - 'multi_glaze_proportion_ending': 100, 'windows_energy_eff_ending': 'Average', + 'multi_glaze_proportion_ending': 100, 'windows_energy_eff_ending': 'Good', 'glazed_type_ending': 'double glazing installed during or after 2002' } } @@ -401,14 +401,14 @@ class TestWindowRecommendations: 'sap_points': None, 'already_installed': False, 'total': 7980.0, 'labour_hours': 0.0, 'labour_days': 0.0, 'is_secondary_glazing': False, 'description_simulation': { - 'multi-glaze-proportion': 100, 'windows-energy-eff': 'Average', + 'multi-glaze-proportion': 100, 'windows-energy-eff': 'Good', 'windows-description': 'Fully double glazed', 'glazed-type': 'double glazing installed during or after 2002' }, 'simulation_config': { 'has_glazing_ending': True, 'glazing_coverage_ending': 'full', 'glazing_type_ending': 'double', 'multi_glaze_proportion_ending': 100, - 'windows_energy_eff_ending': 
'Average', + 'windows_energy_eff_ending': 'Good', 'glazed_type_ending': 'double glazing installed during or after 2002' } } @@ -624,7 +624,7 @@ class TestWindowRecommendations: 'total_floor_area_ending': 61.0, 'floor_height_starting': 2.37, 'floor_height_ending': 2.37, 'hot_water_energy_eff_starting': 'Good', 'hot_water_energy_eff_ending': 'Good', 'floor_energy_eff_starting': 'NO_RATING', 'floor_energy_eff_ending': 'NO_RATING', - 'windows_energy_eff_starting': 'Very Poor', 'windows_energy_eff_ending': 'Average', + 'windows_energy_eff_starting': 'Very Poor', 'windows_energy_eff_ending': 'Good', 'walls_energy_eff_starting': 'Very Poor', 'walls_energy_eff_ending': 'Very Poor', 'sheating_energy_eff_starting': 'NO_RATING', 'sheating_energy_eff_ending': 'NO_RATING', 'roof_energy_eff_starting': 'Very Poor', 'roof_energy_eff_ending': 'Very Poor', @@ -666,7 +666,7 @@ class TestWindowRecommendations: {'variable': 'glazed_type_ending', 'starting': 'not defined', 'simulated': 'double glazing installed during or after 2002'}, {'variable': 'multi_glaze_proportion_ending', 'starting': 0.0, 'simulated': 100}, - {'variable': 'windows_energy_eff_ending', 'starting': 'Very Poor', 'simulated': 'Average'}, + {'variable': 'windows_energy_eff_ending', 'starting': 'Very Poor', 'simulated': 'Good'}, ] assert different == expected_different diff --git a/utils/s3.py b/utils/s3.py index ca0cbfac..1a686b55 100644 --- a/utils/s3.py +++ b/utils/s3.py @@ -192,7 +192,7 @@ def read_pickle_from_s3(bucket_name, s3_file_name): try: data = pickle.loads(serialized_data) except Exception as e: - logger.errpr(f'Failed to deserialize data: {str(e)}') + logger.error(f'Failed to deserialize data: {str(e)}') return None return data