Model/etl/solar/SolarPhotoSupply.py
2024-01-12 11:21:13 +00:00

244 lines
9.8 KiB
Python

import pandas as pd
from tqdm import tqdm
from utils.s3 import save_dataframe_to_s3_parquet, read_dataframe_from_s3_parquet
from utils.logger import setup_logger
logger = setup_logger()
class SolarPhotoSupply:
DATASET_COLUMNS = [
"UPRN", "PROPERTY_TYPE", "TENURE", "BUILT_FORM", "ROOF_DESCRIPTION", "PHOTO_SUPPLY", "TOTAL_FLOOR_AREA",
"CONSTRUCTION_AGE_BAND", "SOLAR_WATER_HEATING_FLAG"
]
def __init__(self, file_directories, cleaned_lookup):
"""
Initialize the SolarPhotoSupply class with file directories and a cleaned lookup. Currently, this class
just works with locally stored data, but this could be extended to work with data stored in S3.
:param file_directories: A list of directories where files are stored.
:param cleaned_lookup: A dictionary containing cleaned lookup data.
"""
self.file_directories = file_directories
self.results = []
self.decile_thresholds = None
self.roof_lookup = pd.DataFrame(cleaned_lookup.get("roof-description"))
self.photo_supply_lookup = pd.DataFrame()
self.floor_area_decile_thresholds = pd.DataFrame()
def create_dataset(self):
"""
Create a dataset from the provided file directories. This method processes the data files,
applies transformations, and aggregates data into a useful format.
"""
if self.roof_lookup.empty:
raise ValueError("No roof lookup data")
results = []
logger.info("Creating solar photo supply dataset")
for dir in tqdm(self.file_directories):
filepath = dir / "certificates.csv"
df = pd.read_csv(filepath, low_memory=False)
df = df[~pd.isnull(df["UPRN"])]
df["UPRN"] = df["UPRN"].astype(int).astype(str)
# Drop rows that have a missing PROPERTY_TYPE, BUILT_FORM, CONSTRUCTION_AGE_BAND, TOTAL_FLOOR_AREA
for col in ["PROPERTY_TYPE", "BUILT_FORM", "CONSTRUCTION_AGE_BAND", "TOTAL_FLOOR_AREA"]:
df = df[~pd.isnull(df[col])]
# Take newest LODGEMENT_DATE per UPRN
df = df.sort_values(by="LODGEMENT_DATE", ascending=False).drop_duplicates(subset=["UPRN"])
data = df[self.DATASET_COLUMNS].copy()
data["PHOTO_SUPPLY"] = data["PHOTO_SUPPLY"].fillna(0)
data = data[data["PHOTO_SUPPLY"] != 0]
results.append(data)
self.results = pd.concat(results)
# Convert total floor area to deciles
self.decile_thresholds = self.results["TOTAL_FLOOR_AREA"].quantile(
[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]
).values
self.results["floor_area_decile"] = pd.cut(
self.results["TOTAL_FLOOR_AREA"],
bins=[0] + list(self.decile_thresholds) + [float('inf')],
labels=False,
include_lowest=True
)
# Convert tenure to lower
self.results["TENURE"] = self.results["TENURE"].str.lower()
self.results = self.results.merge(
self.roof_lookup.drop(
columns=[
"clean_description", "thermal_transmittance", "thermal_transmittance_unit", "insulation_thickness",
"is_assumed"
]
),
left_on="ROOF_DESCRIPTION",
right_on="original_description",
how="left"
)
self.photo_supply_lookup = self.results.groupby(
[
"PROPERTY_TYPE", "BUILT_FORM", "TENURE", "is_pitched", "is_roof_room", "is_flat",
"CONSTRUCTION_AGE_BAND", "floor_area_decile"
],
observed=True
).agg(
{
"PHOTO_SUPPLY": ["median", "mean"],
}
).reset_index()
self.photo_supply_lookup.columns = ['_'.join(col).strip() for col in self.photo_supply_lookup.columns.values]
# Remove trailing underscore from columns
self.photo_supply_lookup.columns = [
col[:-1] if col.endswith("_") else col for col in self.photo_supply_lookup.columns.values
]
# Convert columns to lowercase
self.photo_supply_lookup.columns = [col.lower() for col in self.photo_supply_lookup.columns.values]
self.floor_area_decile_thresholds = pd.DataFrame(
self.decile_thresholds,
columns=["floor_area_decile_thresholds"]
)
@staticmethod
def classify_floor_area(new_area, thresholds):
"""
Classify a given floor area into a decile based on provided thresholds.
:param new_area: The new floor area to be classified.
:param thresholds: A list of thresholds used for classification.
:return: An integer representing the decile index.
"""
for i, threshold in enumerate(thresholds):
if new_area <= threshold:
return i # Returns the decile index (0 to 9)
return len(thresholds)
def save(self):
"""
Save the processed data to an S3 bucket in the parquet format. This method also handles
logging and validation to ensure data is present before saving.
"""
if self.photo_supply_lookup.empty:
raise ValueError("No data to save")
logger.info("Storing outputs to S3")
# Store this data in s3 as a parquet file
save_dataframe_to_s3_parquet(
df=self.photo_supply_lookup,
bucket_name="retrofit-data-dev",
file_key="solar_pv_supply/photo_supply_lookup.parquet",
)
save_dataframe_to_s3_parquet(
df=self.floor_area_decile_thresholds,
bucket_name="retrofit-data-dev",
file_key=f"solar_pv_supply/floor_area_decile_thresholds.parquet",
)
@staticmethod
def load(bucket):
"""
Load datasets from an S3 bucket.
:param bucket: The name of the S3 bucket to load data from.
:return: A tuple containing photo supply lookup and floor area decile thresholds dataframes.
"""
photo_supply_lookup = read_dataframe_from_s3_parquet(
bucket_name=bucket, file_key="solar_pv_supply/photo_supply_lookup.parquet",
)
floor_area_decile_thresholds = read_dataframe_from_s3_parquet(
bucket_name=bucket, file_key="solar_pv_supply/floor_area_decile_thresholds.parquet",
)
return photo_supply_lookup, floor_area_decile_thresholds
@classmethod
def filter_photo_supply_lookup(
cls,
photo_supply_lookup: pd.DataFrame,
floor_area_decile_thresholds: pd.DataFrame,
tenure: str,
built_form: str,
property_type: str,
construction_age_band: str,
is_flat: bool,
is_pitched: bool,
is_roof_room: bool,
floor_area: float
):
"""
Filter the photo supply lookup to find the most appropriate photo supply for a given property.
:param photo_supply_lookup: The photo supply lookup dataframe.
:param floor_area_decile_thresholds: The floor area decile thresholds dataframe.
:param tenure: The tenure of the property.
:param built_form: The built form of the property.
:param property_type: The property type of the property.
:param construction_age_band: The construction age band of the property.
:param is_flat: Whether the property has a flat roof.
:param is_pitched: Whether the property has a pitched roof.
:param is_roof_room: Whether the property has a roof room.
:param floor_area: The floor area of the property.
:return:
"""
# Convert the tenure to lower case, as is done in the creation of the dataset
tenure = tenure.lower()
# We remap the "not defined"
tenure = {
"not defined - use in the case of a new dwelling for which the intended tenure in not known. it is not to "
"be used for an existing dwelling":
"not defined - use in the case of a new dwelling for which the intended tenure in not known. it is no"
}.get(tenure, tenure)
photo_supply_matched = photo_supply_lookup[
(photo_supply_lookup["tenure"] == tenure) &
(photo_supply_lookup["built_form"] == built_form) &
(photo_supply_lookup["property_type"] == property_type) &
(photo_supply_lookup["construction_age_band"] == construction_age_band) &
(photo_supply_lookup["is_flat"] == is_flat) &
(photo_supply_lookup["is_pitched"] == is_pitched) &
(photo_supply_lookup["is_roof_room"] == is_roof_room)
]
if photo_supply_matched.empty:
# There are a small number of cases where we don't get a full match so try again with a more aggregated
# average
photo_supply_matched = photo_supply_lookup[
(photo_supply_lookup["tenure"] == tenure) &
(photo_supply_lookup["built_form"] == built_form) &
(photo_supply_lookup["property_type"] == property_type)
]
if construction_age_band in photo_supply_matched["construction_age_band"].values:
photo_supply_matched = photo_supply_matched[
photo_supply_matched["construction_age_band"] == construction_age_band
]
if photo_supply_matched.empty:
raise ValueError("No photo supply matches")
floor_area_decile = cls.classify_floor_area(
floor_area, floor_area_decile_thresholds["floor_area_decile_thresholds"].values
)
if floor_area_decile in photo_supply_matched["floor_area_decile"].values:
photo_supply_matched = photo_supply_matched[
photo_supply_matched["floor_area_decile"] == floor_area_decile
]
return photo_supply_matched