setting up solar client

This commit is contained in:
Khalim Conn-Kowlessar 2024-01-05 15:25:48 +00:00
parent 49b0a1d901
commit c5361706ef
6 changed files with 194 additions and 35 deletions

View file

@ -23,7 +23,7 @@ from backend.app.db.models.portfolio import rating_lookup
from backend.app.dependencies import validate_token
from backend.app.plan.schemas import PlanTriggerRequest
from backend.app.plan.utils import create_recommendation_scoring_data, get_cleaned
from backend.app.utils import epc_to_sap_lower_bound, read_csv_from_s3, read_parquet_from_s3, sap_to_epc
from backend.app.utils import epc_to_sap_lower_bound, read_csv_from_s3, sap_to_epc
from backend.ml_models.api import ModelApi
from backend.Property import Property
@ -34,7 +34,7 @@ from recommendations.optimiser.GainOptimiser import GainOptimiser
from recommendations.optimiser.optimiser_functions import prepare_input_measures
from recommendations.Recommendations import Recommendations
from utils.logger import setup_logger
from utils.s3 import read_dataframe_from_s3_parquet
from utils.s3 import read_dataframe_from_s3_parquet, read_dataframe_from_s3_parquet
from backend.ml_models.Valuation import PropertyValuation
from backend.ml_models.AnnualBillSavings import AnnualBillSavings
@ -61,19 +61,6 @@ async def trigger_plan(body: PlanTriggerRequest):
logger.info("Getting the inputs")
plan_input = read_csv_from_s3(bucket_name=get_settings().PLAN_TRIGGER_BUCKET, filepath=body.trigger_file_path)
uprn_filenames = read_dataframe_from_s3_parquet(
bucket_name=get_settings().DATA_BUCKET, file_key="spatial/filename_meta.parquet"
)
cleaning_data = read_parquet_from_s3(
bucket_name=get_settings().DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet",
)
photo_supply_lookup = read_parquet_from_s3(
bucket_name=get_settings().DATA_BUCKET, file_key="solar_pv_supply/photo_supply_lookup.parquet",
)
floor_area_decile_thresholds = read_parquet_from_s3(
bucket_name=get_settings().DATA_BUCKET, file_key="solar_pv_supply/floor_area_decile_thresholds.parquet",
)
input_properties = []
for config in plan_input:
# We validate each record in the file. If the record is NOT valid, we need to handle this accordingly
@ -115,17 +102,30 @@ async def trigger_plan(body: PlanTriggerRequest):
if not input_properties:
return Response(status_code=204)
logger.info("Getting spatial data")
for p in input_properties:
p.get_spatial_data(uprn_filenames)
# The materials data could be cached or local so we don't need to make
# consistent requests to the backend for
# the same data
logger.info("Reading in materials and cleaned datasets")
logger.info("Reading in data sources required for the engine")
materials = get_materials(session)
cleaned = get_cleaned()
uprn_filenames = read_dataframe_from_s3_parquet(
bucket_name=get_settings().DATA_BUCKET, file_key="spatial/filename_meta.parquet"
)
cleaning_data = read_dataframe_from_s3_parquet(
bucket_name=get_settings().DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet",
)
photo_supply_lookup = read_dataframe_from_s3_parquet(
bucket_name=get_settings().DATA_BUCKET, file_key="solar_pv_supply/photo_supply_lookup.parquet",
)
floor_area_decile_thresholds = read_dataframe_from_s3_parquet(
bucket_name=get_settings().DATA_BUCKET, file_key="solar_pv_supply/floor_area_decile_thresholds.parquet",
)
logger.info("Getting spatial data")
for p in input_properties:
p.get_spatial_data(uprn_filenames)
logger.info("Getting components and epc recommendations")
recommendations = {}

View file

@ -195,7 +195,7 @@ def create_recommendation_scoring_data(
raise ValueError("Invalid glazing type - implement me")
if recommendation["type"] == "solar_pv":
scoring_dict["PHOTO_SUPPLY_ENDING"] = property.solar_pv_percentage
scoring_dict["PHOTO_SUPPLY_ENDING"] = recommendation["photo_supply"]
if recommendation["type"] not in [
"mechanical_ventilation", "sealing_open_fireplace", "low_energy_lighting",

View file

@ -121,19 +121,6 @@ def epc_to_sap_lower_bound(epc: str):
raise ValueError("EPC rating should be between A and G")
def read_parquet_from_s3(bucket_name, file_key):
client = boto3.client('s3')
# Get the object
s3_object = client.get_object(Bucket=bucket_name, Key=file_key)
# Read the CSV body into a DataFrame
csv_body = s3_object["Body"].read()
df = pd.read_parquet(BytesIO(csv_body))
return df
def save_dataframe_to_s3_parquet(df, bucket_name, file_key):
"""
Save a pandas DataFrame to S3 as a Parquet file.

View file

@ -0,0 +1,136 @@
import pandas as pd
from tqdm import tqdm
from utils.s3 import save_dataframe_to_s3_parquet, read_dataframe_from_s3_parquet
from utils.logger import setup_logger
logger = setup_logger()
class SolarPhotoSupply:
DATASET_COLUMNS = [
"UPRN", "PROPERTY_TYPE", "TENURE", "BUILT_FORM", "ROOF_DESCRIPTION", "PHOTO_SUPPLY", "TOTAL_FLOOR_AREA",
"CONSTRUCTION_AGE_BAND", "SOLAR_WATER_HEATING_FLAG"
]
def __init__(self, file_directories, cleaned_lookup):
self.file_directories = file_directories
self.results = []
self.decile_thresholds = None
self.roof_lookup = pd.DataFrame(cleaned_lookup["roof-description"])
self.photo_supply_lookup = pd.DataFrame()
self.floor_area_decile_thresholds = pd.DataFrame()
def create_dataset(self):
results = []
logger.info("Creating solar photo supply dataset")
for dir in tqdm(self.file_directories):
filepath = dir / "certificates.csv"
df = pd.read_csv(filepath, low_memory=False)
df = df[~pd.isnull(df["UPRN"])]
df["UPRN"] = df["UPRN"].astype(int).astype(str)
# Drop rows that have a missing PROPERTY_TYPE, BUILT_FORM, CONSTRUCTION_AGE_BAND, TOTAL_FLOOR_AREA
for col in ["PROPERTY_TYPE", "BUILT_FORM", "CONSTRUCTION_AGE_BAND", "TOTAL_FLOOR_AREA"]:
df = df[~pd.isnull(df[col])]
# Take newest LODGEMENT_DATE per UPRN
df = df.sort_values(by="LODGEMENT_DATE", ascending=False).drop_duplicates(subset=["UPRN"])
data = df[self.DATASET_COLUMNS].copy()
data["PHOTO_SUPPLY"] = data["PHOTO_SUPPLY"].fillna(0)
data = data[data["PHOTO_SUPPLY"] != 0]
results.append(data)
self.results = pd.concat(results)
# Convert total floor area to deciles
self.decile_thresholds = self.results["TOTAL_FLOOR_AREA"].quantile(
[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]
).values
self.results["floor_area_decile"] = pd.cut(
self.results["TOTAL_FLOOR_AREA"],
bins=[0] + list(self.decile_thresholds) + [float('inf')],
labels=False,
include_lowest=True
)
# Convert tenure to lower
self.results["TENURE"] = self.results["TENURE"].str.lower()
self.results = self.results.merge(
self.roof_lookup.drop(
columns=[
"clean_description", "thermal_transmittance", "thermal_transmittance_unit", "insulation_thickness",
"is_assumed"
]
),
left_on="ROOF_DESCRIPTION",
right_on="original_description",
how="left"
)
self.photo_supply_lookup = self.results.groupby(
[
"PROPERTY_TYPE", "BUILT_FORM", "TENURE", "is_pitched", "is_roof_room", "is_flat",
"CONSTRUCTION_AGE_BAND", "floor_area_decile"
],
observed=True
).agg(
{
"PHOTO_SUPPLY": ["median", "mean"],
}
).reset_index()
self.photo_supply_lookup.columns = ['_'.join(col).strip() for col in self.photo_supply_lookup.columns.values]
# Remove trailing underscore from columns
self.photo_supply_lookup.columns = [
col[:-1] if col.endswith("_") else col for col in self.photo_supply_lookup.columns.values
]
# Convert columns to lowercase
self.photo_supply_lookup.columns = [col.lower() for col in self.photo_supply_lookup.columns.values]
self.floor_area_decile_thresholds = pd.DataFrame(
self.decile_thresholds,
columns=["floor_area_decile_thresholds"]
)
@staticmethod
def classify_floor_area(new_area, thresholds):
for i, threshold in enumerate(thresholds):
if new_area <= threshold:
return i # Returns the decile index (0 to 9)
return len(thresholds)
def save(self):
if self.photo_supply_lookup.empty:
raise ValueError("No data to save")
# Store this data in s3 as a parquet file
save_dataframe_to_s3_parquet(
df=self.photo_supply_lookup,
bucket_name="retrofit-data-dev",
file_key="solar_pv_supply/photo_supply_lookup.parquet",
)
save_dataframe_to_s3_parquet(
df=self.floor_area_decile_thresholds,
bucket_name="retrofit-data-dev",
file_key=f"solar_pv_supply/floor_area_decile_thresholds.parquet",
)
@staticmethod
def load(bucket):
photo_supply_lookup = read_dataframe_from_s3_parquet(
bucket_name=bucket, file_key="solar_pv_supply/photo_supply_lookup.parquet",
)
floor_area_decile_thresholds = read_dataframe_from_s3_parquet(
bucket_name=bucket, file_key="solar_pv_supply/floor_area_decile_thresholds.parquet",
)
return photo_supply_lookup, floor_area_decile_thresholds

34
etl/solar/app.py Normal file
View file

@ -0,0 +1,34 @@
import pandas as pd
from pathlib import Path
from tqdm import tqdm
from etl.epc.property_change_app import get_cleaned
from etl.solar.SolarPhotoSupply import SolarPhotoSupply
from utils.s3 import save_dataframe_to_s3_parquet
DATA_DIRECTORY = Path(__file__).parent / "local_data" / "all-domestic-certificates"
def app():
"""
This code reads in the EPC data and attempt to produce a reasonable figure for the photo-supply variable, which
is the following:
"Percentage of photovoltaic area as a percentage of total roof area. 0% indicates that a Photovoltaic Supply
is not present in the property."
When recommending solar, we want to simulate the retrofit by increasing this value from 0, so we need a sensible
figure to increase this to. This script will pull the data for that, to allow us to try and deduce what
a sensible figure would be
:return:
"""
directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()]
cleaned_lookup = get_cleaned()
solar_data_client = SolarPhotoSupply(
file_directories=directories,
cleaned_lookup=cleaned_lookup
)
solar_data_client.create_dataset()
solar_data_client.save()

View file

@ -57,6 +57,8 @@ class SolarPvRecommendations:
"new_u_value": None,
"sap_points": None,
**cost_result,
"photo_supply": self.property.solar_pv_percentage # This is required for simulating the SAP impact
# This is required for simulating the SAP impact. solar_pv_percentage is between 0 & 1 so we scale
# back up here
"photo_supply": 100 * self.property.solar_pv_percentage
}
]