def set_solar_panel_area(self, photo_supply_data):
    """
    Match this property against the photo-supply lookup, as groundwork for sizing its solar panels.

    Work in progress: the floor-area decile and the matching lookup rows are computed here, but nothing is
    stored on the instance or returned yet.

    :param photo_supply_data: mapping that provides "photo_supply_lookup" (a table filtered with boolean masks)
        and "floor_area_decile_thresholds" (the upper bounds used to bucket floor areas, presumably ascending)
    :return: None
    """

    lookup = photo_supply_data["photo_supply_lookup"]
    decile_thresholds = photo_supply_data["floor_area_decile_thresholds"]

    # Approximate solar panel area (not used yet)
    solar_panel_area = 1.6
    # Wattage per panel (not used yet)
    solar_panel_wattage = 360

    # TODO: Create a class for the solar etl process and make the decile classification one of its functions,
    #  applying a different method depending on the data type
    # The decile is the index of the first threshold that is not below the floor area; an area above every
    # threshold lands in the final bucket, whose index equals the number of thresholds.
    floor_area = self.floor_area
    decile_hits = (index for index, upper_bound in enumerate(decile_thresholds) if floor_area <= upper_bound)
    floor_area_decile = next(decile_hits, None)
    if floor_area_decile is None:
        floor_area_decile = len(decile_thresholds)

    # Use the lookup to estimate the percentage of the roof that is suitable for solar panels.
    # TODO: Move the lower-casing to the ETL process, since we need to know that tenure should be lower
    tenure = self.data["tenure"].lower()

    # Narrow the lookup one attribute at a time; a row must agree with the property on every attribute.
    is_match = lookup["tenure"] == tenure
    is_match = is_match & (lookup["built_form"] == self.data["built-form"])
    is_match = is_match & (lookup["property_type"] == self.data["property-type"])
    is_match = is_match & (lookup["construction_age_band"] == self.construction_age_band)
    is_match = is_match & (lookup["is_flat"] == self.roof["is_flat"])
    is_match = is_match & (lookup["is_pitched"] == self.roof["is_pitched"])
    is_match = is_match & (lookup["is_roof_room"] == self.roof["is_roof_room"])
    photo_supply_matched = lookup[is_match]

    # TODO: derive the number of panels from solar_panel_area once the matched rows are put to use
def app():
    """
    Build the photo-supply lookup from the raw EPC certificates and store it in S3.

    This code reads in the EPC data and attempts to produce a reasonable figure for the photo-supply variable,
    which is the following:
    "Percentage of photovoltaic area as a percentage of total roof area. 0% indicates that a Photovoltaic Supply
    is not present in the property."

    When recommending solar, we want to simulate the retrofit by increasing this value from 0, so we need a sensible
    figure to increase this to. This script will pull the data for that, to allow us to try and deduce what
    a sensible figure would be.

    Steps:
      1. For every sub-directory of DATA_DIRECTORY, read certificates.csv, keep the newest certificate per UPRN and
         retain only the properties with a non-zero PHOTO_SUPPLY.
      2. Bucket TOTAL_FLOOR_AREA into deciles computed over the retained properties.
      3. Attach the cleaned roof attributes by joining on the roof description.
      4. Aggregate the median and mean PHOTO_SUPPLY per property/roof/decile combination.
      5. Save the aggregate as a parquet file in S3.

    :return: None
    :raises ValueError: if DATA_DIRECTORY has no sub-directories to read certificates from
    """

    # Rows missing any of these cannot be identified or grouped, so they are discarded up front
    required_columns = ["UPRN", "PROPERTY_TYPE", "BUILT_FORM", "CONSTRUCTION_AGE_BAND", "TOTAL_FLOOR_AREA"]
    output_columns = [
        "UPRN", "PROPERTY_TYPE", "TENURE", "BUILT_FORM", "ROOF_DESCRIPTION", "PHOTO_SUPPLY", "TOTAL_FLOOR_AREA",
        "CONSTRUCTION_AGE_BAND"
    ]

    directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()]
    if not directories:
        # Fail with an actionable message instead of pandas' "No objects to concatenate"
        raise ValueError(f"No certificate directories found in {DATA_DIRECTORY}")

    results = []
    for directory in tqdm(directories):
        certificates = pd.read_csv(directory / "certificates.csv", low_memory=False)
        certificates = certificates.dropna(subset=required_columns)
        # assign() builds a new frame, so the UPRN conversion never writes into a filtered slice
        certificates = certificates.assign(UPRN=certificates["UPRN"].astype(int).astype(str))
        # Take newest LODGEMENT_DATE per UPRN
        newest = certificates.sort_values(by="LODGEMENT_DATE", ascending=False).drop_duplicates(subset=["UPRN"])

        data = newest[output_columns]
        # A missing PHOTO_SUPPLY is treated like 0, i.e. no photovoltaic supply present
        data = data[data["PHOTO_SUPPLY"].fillna(0) != 0]
        results.append(data)

    results = pd.concat(results)

    # Convert total floor area to deciles
    # NOTE(review): the thresholds are only used locally and are not saved by this function; anything that needs
    # to bucket a new floor area against this lookup has to obtain them some other way - confirm.
    decile_thresholds = results["TOTAL_FLOOR_AREA"].quantile([0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]).values
    results["floor_area_decile"] = pd.cut(
        results["TOTAL_FLOOR_AREA"],
        bins=[0, *decile_thresholds, float("inf")],
        labels=False,
        include_lowest=True
    )

    # Convert tenure to lower
    results["TENURE"] = results["TENURE"].str.lower()

    # Append on the roof details
    cleaned_lookup = get_cleaned()
    roof_lookup = pd.DataFrame(cleaned_lookup["roof-description"]).drop(
        columns=[
            "clean_description", "thermal_transmittance", "thermal_transmittance_unit", "insulation_thickness",
            "is_assumed"
        ]
    )
    results = results.merge(
        roof_lookup,
        left_on="ROOF_DESCRIPTION",
        right_on="original_description",
        how="left"
    )

    aggregated = results.groupby(
        [
            "PROPERTY_TYPE", "BUILT_FORM", "TENURE", "is_pitched", "is_roof_room", "is_loft", "is_flat", "is_thatched",
            "is_at_rafters", "has_dwelling_above", "CONSTRUCTION_AGE_BAND", "floor_area_decile"
        ],
        observed=True
    ).agg(
        {
            "PHOTO_SUPPLY": ["median", "mean"],
        }
    ).reset_index()

    # Flatten the two-level column index into lowercase names: grouping keys have an empty second level and keep
    # their own name, aggregates become e.g. "photo_supply_median"
    aggregated.columns = ["_".join(part for part in column if part).lower() for column in aggregated.columns]

    # Store this data in s3 as a parquet file
    save_dataframe_to_s3_parquet(
        df=aggregated,
        bucket_name="retrofit-data-dev",
        file_key="solar_pv_supply/photo_supply_lookup.parquet",
    )
class SolarPvRecommendations:
    """
    Decides whether a property is a potential candidate for a solar PV recommendation.

    The suitability check is in place; no recommendation is produced yet, so `recommendations` stays empty.
    """

    # Property types that are considered for solar PV
    _VALID_PROPERTY_TYPES = ("House", "Bungalow")
    # Roof flags of which at least one must be set for the roof to be considered
    _VALID_ROOF_FLAGS = ("is_flat", "is_pitched", "is_roof_room")

    def __init__(self, property_instance):
        """
        :param property_instance: Instance of the Property class, for the home associated to property_id
        """

        self.property = property_instance
        self.costs = Costs(self.property)

        self.recommendations = []

    def recommend(self):
        """
        We check if a property is potentially suitable for solar PV based on the following criteria:
        - The property is a house or bungalow
        - The property has a flat roof, a pitched roof or a roof room
        - The property does not have existing solar pv
        :return: None
        """

        # All three checks are evaluated up front, so a missing key surfaces regardless of the other answers
        is_valid_property_type = self.property.data["property-type"] in self._VALID_PROPERTY_TYPES
        is_valid_roof_type = any(self.property.roof[flag] for flag in self._VALID_ROOF_FLAGS)
        has_existing_solar_pv = self._has_existing_solar_pv()

        if not is_valid_property_type or not is_valid_roof_type or has_existing_solar_pv:
            return

        # We now have a property which is potentially suitable for solar PV

    def _has_existing_solar_pv(self):
        """
        Tell whether the property's "photo-supply" value indicates that solar PV is already installed.

        None, 0 and the property's data-anomaly marker(s) all mean that no photovoltaic supply is recorded.

        :return: True if a photovoltaic supply is recorded, False otherwise
        """

        photo_supply = self.property.data["photo-supply"]

        # DATA_ANOMALY_MATCHES is defined on the property, outside this class. When it is a collection of markers
        # each member is matched individually; any other value is compared as a single marker.
        anomaly_markers = self.property.DATA_ANOMALY_MATCHES
        if isinstance(anomaly_markers, (list, tuple, set, frozenset)):
            no_pv_markers = [None, 0, *anomaly_markers]
        else:
            no_pv_markers = [None, 0, anomaly_markers]

        return photo_supply not in no_pv_markers