From c5361706efc9d383b4bab9f0a6db9ecc54135af7 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Fri, 5 Jan 2024 15:25:48 +0000 Subject: [PATCH] setting up solar client --- backend/app/plan/router.py | 40 +++---- backend/app/plan/utils.py | 2 +- backend/app/utils.py | 13 --- etl/solar/SolarPhotoSupply.py | 136 ++++++++++++++++++++++ etl/solar/app.py | 34 ++++++ recommendations/SolarPvRecommendations.py | 4 +- 6 files changed, 194 insertions(+), 35 deletions(-) create mode 100644 etl/solar/SolarPhotoSupply.py create mode 100644 etl/solar/app.py diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py index 77ee9869..d28e6518 100644 --- a/backend/app/plan/router.py +++ b/backend/app/plan/router.py @@ -23,7 +23,7 @@ from backend.app.db.models.portfolio import rating_lookup from backend.app.dependencies import validate_token from backend.app.plan.schemas import PlanTriggerRequest from backend.app.plan.utils import create_recommendation_scoring_data, get_cleaned -from backend.app.utils import epc_to_sap_lower_bound, read_csv_from_s3, read_parquet_from_s3, sap_to_epc +from backend.app.utils import epc_to_sap_lower_bound, read_csv_from_s3, sap_to_epc from backend.ml_models.api import ModelApi from backend.Property import Property @@ -34,7 +34,7 @@ from recommendations.optimiser.GainOptimiser import GainOptimiser from recommendations.optimiser.optimiser_functions import prepare_input_measures from recommendations.Recommendations import Recommendations from utils.logger import setup_logger -from utils.s3 import read_dataframe_from_s3_parquet +from utils.s3 import read_dataframe_from_s3_parquet, read_dataframe_from_s3_parquet from backend.ml_models.Valuation import PropertyValuation from backend.ml_models.AnnualBillSavings import AnnualBillSavings @@ -61,19 +61,6 @@ async def trigger_plan(body: PlanTriggerRequest): logger.info("Getting the inputs") plan_input = read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path) - uprn_filenames = read_dataframe_from_s3_parquet( - bucket_name=get_settings().DATA_BUCKET, file_key="spatial/filename_meta.parquet" - ) - cleaning_data = read_parquet_from_s3( - bucket_name=get_settings().DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet", - ) - photo_supply_lookup = read_parquet_from_s3( - bucket_name=get_settings().DATA_BUCKET, file_key="solar_pv_supply/photo_supply_lookup.parquet", - ) - floor_area_decile_thresholds = read_parquet_from_s3( - bucket_name=get_settings().DATA_BUCKET, file_key="solar_pv_supply/floor_area_decile_thresholds.parquet", - ) - input_properties = [] for config in plan_input: # We validate each record in the file. If the record is NOT valid, we need to handle this accordingly @@ -115,17 +102,30 @@ async def trigger_plan(body: PlanTriggerRequest): if not input_properties: return Response(status_code=204) - logger.info("Getting spatial data") - for p in input_properties: - p.get_spatial_data(uprn_filenames) - # The materials data could be cached or local so we don't need to make # consistent requests to the backend for # the same data - logger.info("Reading in materials and cleaned datasets") + logger.info("Reading in data sources required for the engine") materials = get_materials(session) cleaned = get_cleaned() + uprn_filenames = read_dataframe_from_s3_parquet( + bucket_name=get_settings().DATA_BUCKET, file_key="spatial/filename_meta.parquet" + ) + cleaning_data = read_dataframe_from_s3_parquet( + bucket_name=get_settings().DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet", + ) + photo_supply_lookup = read_dataframe_from_s3_parquet( + bucket_name=get_settings().DATA_BUCKET, file_key="solar_pv_supply/photo_supply_lookup.parquet", + ) + floor_area_decile_thresholds = read_dataframe_from_s3_parquet( + bucket_name=get_settings().DATA_BUCKET, file_key="solar_pv_supply/floor_area_decile_thresholds.parquet", + ) + + logger.info("Getting spatial data") + for p in input_properties: + p.get_spatial_data(uprn_filenames) + logger.info("Getting components and epc recommendations") recommendations = {} diff --git a/backend/app/plan/utils.py b/backend/app/plan/utils.py index fe7939f3..7672c316 100644 --- a/backend/app/plan/utils.py +++ b/backend/app/plan/utils.py @@ -195,7 +195,7 @@ def create_recommendation_scoring_data( raise ValueError("Invalid glazing type - implement me") if recommendation["type"] == "solar_pv": - scoring_dict["PHOTO_SUPPLY_ENDING"] = property.solar_pv_percentage + scoring_dict["PHOTO_SUPPLY_ENDING"] = recommendation["photo_supply"] if recommendation["type"] not in [ "mechanical_ventilation", "sealing_open_fireplace", "low_energy_lighting", diff --git a/backend/app/utils.py b/backend/app/utils.py index d912a94a..9a03ab21 100644 --- a/backend/app/utils.py +++ b/backend/app/utils.py @@ -121,19 +121,6 @@ def epc_to_sap_lower_bound(epc: str): raise ValueError("EPC rating should be between A and G") -def read_parquet_from_s3(bucket_name, file_key): - client = boto3.client('s3') - - # Get the object - s3_object = client.get_object(Bucket=bucket_name, Key=file_key) - - # Read the CSV body into a DataFrame - csv_body = s3_object["Body"].read() - df = pd.read_parquet(BytesIO(csv_body)) - - return df - - def save_dataframe_to_s3_parquet(df, bucket_name, file_key): """ Save a pandas DataFrame to S3 as a Parquet file. diff --git a/etl/solar/SolarPhotoSupply.py b/etl/solar/SolarPhotoSupply.py new file mode 100644 index 00000000..c6e2f9cb --- /dev/null +++ b/etl/solar/SolarPhotoSupply.py @@ -0,0 +1,136 @@ +import pandas as pd +from tqdm import tqdm +from utils.s3 import save_dataframe_to_s3_parquet, read_dataframe_from_s3_parquet +from utils.logger import setup_logger + +logger = setup_logger() + + +class SolarPhotoSupply: + DATASET_COLUMNS = [ + "UPRN", "PROPERTY_TYPE", "TENURE", "BUILT_FORM", "ROOF_DESCRIPTION", "PHOTO_SUPPLY", "TOTAL_FLOOR_AREA", + "CONSTRUCTION_AGE_BAND", "SOLAR_WATER_HEATING_FLAG" + ] + + def __init__(self, file_directories, cleaned_lookup): + self.file_directories = file_directories + + self.results = [] + self.decile_thresholds = None + + self.roof_lookup = pd.DataFrame(cleaned_lookup["roof-description"]) + + self.photo_supply_lookup = pd.DataFrame() + self.floor_area_decile_thresholds = pd.DataFrame() + + def create_dataset(self): + + results = [] + + logger.info("Creating solar photo supply dataset") + for dir in tqdm(self.file_directories): + filepath = dir / "certificates.csv" + df = pd.read_csv(filepath, low_memory=False) + df = df[~pd.isnull(df["UPRN"])] + df["UPRN"] = df["UPRN"].astype(int).astype(str) + # Drop rows that have a missing PROPERTY_TYPE, BUILT_FORM, CONSTRUCTION_AGE_BAND, TOTAL_FLOOR_AREA + for col in ["PROPERTY_TYPE", "BUILT_FORM", "CONSTRUCTION_AGE_BAND", "TOTAL_FLOOR_AREA"]: + df = df[~pd.isnull(df[col])] + # Take newest LODGEMENT_DATE per UPRN + df = df.sort_values(by="LODGEMENT_DATE", ascending=False).drop_duplicates(subset=["UPRN"]) + + data = df[self.DATASET_COLUMNS].copy() + data["PHOTO_SUPPLY"] = data["PHOTO_SUPPLY"].fillna(0) + data = data[data["PHOTO_SUPPLY"] != 0] + results.append(data) + + self.results = pd.concat(results) + + # Convert total floor area to deciles + self.decile_thresholds = self.results["TOTAL_FLOOR_AREA"].quantile( + [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9] + ).values + + self.results["floor_area_decile"] = pd.cut( + self.results["TOTAL_FLOOR_AREA"], + bins=[0] + list(self.decile_thresholds) + [float('inf')], + labels=False, + include_lowest=True + ) + + # Convert tenure to lower + self.results["TENURE"] = self.results["TENURE"].str.lower() + + self.results = self.results.merge( + self.roof_lookup.drop( + columns=[ + "clean_description", "thermal_transmittance", "thermal_transmittance_unit", "insulation_thickness", + "is_assumed" + ] + ), + left_on="ROOF_DESCRIPTION", + right_on="original_description", + how="left" + ) + + self.photo_supply_lookup = self.results.groupby( + [ + "PROPERTY_TYPE", "BUILT_FORM", "TENURE", "is_pitched", "is_roof_room", "is_flat", + "CONSTRUCTION_AGE_BAND", "floor_area_decile" + ], + observed=True + ).agg( + { + "PHOTO_SUPPLY": ["median", "mean"], + } + ).reset_index() + + self.photo_supply_lookup.columns = ['_'.join(col).strip() for col in self.photo_supply_lookup.columns.values] + # Remove trailing underscore from columns + self.photo_supply_lookup.columns = [ + col[:-1] if col.endswith("_") else col for col in self.photo_supply_lookup.columns.values + ] + # Convert columns to lowercase + self.photo_supply_lookup.columns = [col.lower() for col in self.photo_supply_lookup.columns.values] + + self.floor_area_decile_thresholds = pd.DataFrame( + self.decile_thresholds, + columns=["floor_area_decile_thresholds"] + ) + + @staticmethod + def classify_floor_area(new_area, thresholds): + + for i, threshold in enumerate(thresholds): + if new_area <= threshold: + return i # Returns the decile index (0 to 9) + return len(thresholds) + + def save(self): + if self.photo_supply_lookup.empty: + raise ValueError("No data to save") + + # Store this data in s3 as a parquet file + + save_dataframe_to_s3_parquet( + df=self.photo_supply_lookup, + bucket_name="retrofit-data-dev", + file_key="solar_pv_supply/photo_supply_lookup.parquet", + ) + + save_dataframe_to_s3_parquet( + df=self.floor_area_decile_thresholds, + bucket_name="retrofit-data-dev", + file_key=f"solar_pv_supply/floor_area_decile_thresholds.parquet", + ) + + @staticmethod + def load(bucket): + photo_supply_lookup = read_dataframe_from_s3_parquet( + bucket_name=bucket, file_key="solar_pv_supply/photo_supply_lookup.parquet", + ) + floor_area_decile_thresholds = read_dataframe_from_s3_parquet( + bucket_name=bucket, file_key="solar_pv_supply/floor_area_decile_thresholds.parquet", + ) + + return photo_supply_lookup, floor_area_decile_thresholds diff --git a/etl/solar/app.py b/etl/solar/app.py new file mode 100644 index 00000000..29802e72 --- /dev/null +++ b/etl/solar/app.py @@ -0,0 +1,34 @@ +import pandas as pd +from pathlib import Path +from tqdm import tqdm +from etl.epc.property_change_app import get_cleaned +from etl.solar.SolarPhotoSupply import SolarPhotoSupply +from utils.s3 import save_dataframe_to_s3_parquet + +DATA_DIRECTORY = Path(__file__).parent / "local_data" / "all-domestic-certificates" + + +def app(): + """ + This code reads in the EPC data and attempt to produce a reasonable figure for the photo-supply variable, which + is the following: + "Percentage of photovoltaic area as a percentage of total roof area. 0% indicates that a Photovoltaic Supply + is not present in the property." + + When recommending solar, we want to simulate the retrofit by increasing this value from 0, so we need a sensible + figure to increase this to. This script will pull the data for that, to allow us to try and deduce what + a sensible figure would be + :return: + """ + + directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()] + cleaned_lookup = get_cleaned() + + solar_data_client = SolarPhotoSupply( + file_directories=directories, + cleaned_lookup=cleaned_lookup + ) + + solar_data_client.create_dataset() + + solar_data_client.save() diff --git a/recommendations/SolarPvRecommendations.py b/recommendations/SolarPvRecommendations.py index 4dcf6104..5163c1cb 100644 --- a/recommendations/SolarPvRecommendations.py +++ b/recommendations/SolarPvRecommendations.py @@ -57,6 +57,8 @@ class SolarPvRecommendations: "new_u_value": None, "sap_points": None, **cost_result, - "photo_supply": self.property.solar_pv_percentage # This is required for simulating the SAP impact + # This is required for simulating the SAP impact. solar_pv_percentage is between 0 & 1 so we scale + # back up here + "photo_supply": 100 * self.property.solar_pv_percentage } ]