set up solar etc process and deleted research script

This commit is contained in:
Khalim Conn-Kowlessar 2024-01-05 16:07:23 +00:00
parent c5361706ef
commit e9d3577cf6
3 changed files with 1 additions and 115 deletions

View file

@ -110,6 +110,7 @@ class SolarPhotoSupply:
if self.photo_supply_lookup.empty:
raise ValueError("No data to save")
logger.info("Storing outputs to S3")
# Store this data in s3 as a parquet file
save_dataframe_to_s3_parquet(

View file

@ -1,9 +1,6 @@
import pandas as pd
from pathlib import Path
from tqdm import tqdm
from etl.epc.property_change_app import get_cleaned
from etl.solar.SolarPhotoSupply import SolarPhotoSupply
from utils.s3 import save_dataframe_to_s3_parquet
DATA_DIRECTORY = Path(__file__).parent / "local_data" / "all-domestic-certificates"

View file

@ -1,112 +0,0 @@
import pandas as pd
from pathlib import Path
from tqdm import tqdm
from etl.epc.property_change_app import get_cleaned
from utils.s3 import save_dataframe_to_s3_parquet
DATA_DIRECTORY = Path(__file__).parent / "local_data" / "all-domestic-certificates"
def app():
"""
This code reads in the EPC data and attempt to produce a reasonable figure for the photo-supply variable, which
is the following:
"Percentage of photovoltaic area as a percentage of total roof area. 0% indicates that a Photovoltaic Supply
is not present in the property."
When recommending solar, we want to simulate the retrofit by increasing this value from 0, so we need a sensible
figure to increase this to. This script will pull the data for that, to allow us to try and deduce what
a sensible figure would be
:return:
"""
directories = [entry for entry in DATA_DIRECTORY.iterdir() if entry.is_dir()]
results = []
for dir in tqdm(directories):
filepath = dir / "certificates.csv"
df = pd.read_csv(filepath, low_memory=False)
df = df[~pd.isnull(df["UPRN"])]
df["UPRN"] = df["UPRN"].astype(int).astype(str)
# Drop rows that have a missing PROPERTY_TYPE, BUILT_FORM, CONSTRUCTION_AGE_BAND, TOTAL_FLOOR_AREA
for col in ["PROPERTY_TYPE", "BUILT_FORM", "CONSTRUCTION_AGE_BAND", "TOTAL_FLOOR_AREA"]:
df = df[~pd.isnull(df[col])]
# Take newest LODGEMENT_DATE per UPRN
df = df.sort_values(by="LODGEMENT_DATE", ascending=False).drop_duplicates(subset=["UPRN"])
data = df[
["UPRN", "PROPERTY_TYPE", "TENURE", "BUILT_FORM", "ROOF_DESCRIPTION", "PHOTO_SUPPLY", "TOTAL_FLOOR_AREA",
"CONSTRUCTION_AGE_BAND", "SOLAR_WATER_HEATING_FLAG"]
].copy()
data["PHOTO_SUPPLY"] = data["PHOTO_SUPPLY"].fillna(0)
data = data[data["PHOTO_SUPPLY"] != 0]
results.append(data)
results = pd.concat(results)
# Convert total floor area to deciles
decile_thresholds = results["TOTAL_FLOOR_AREA"].quantile([0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]).values
def classify_floor_area(new_area, thresholds):
for i, threshold in enumerate(thresholds):
if new_area <= threshold:
return i # Returns the decile index (0 to 9)
return len(thresholds)
# Assuming 'new_data' is your new DataFrame with floor area data
results["floor_area_decile"] = pd.cut(
results["TOTAL_FLOOR_AREA"],
bins=[0] + list(decile_thresholds) + [float('inf')],
labels=False,
include_lowest=True
)
# Convert tenure to lower
results["TENURE"] = results["TENURE"].str.lower()
# Append on the roof details
cleaned_lookup = get_cleaned()
lookup = pd.DataFrame(cleaned_lookup["roof-description"])
results = results.merge(
lookup.drop(
columns=[
"clean_description", "thermal_transmittance", "thermal_transmittance_unit", "insulation_thickness",
"is_assumed"
]
),
left_on="ROOF_DESCRIPTION",
right_on="original_description",
how="left"
)
aggregated = results.groupby(
[
"PROPERTY_TYPE", "BUILT_FORM", "TENURE", "is_pitched", "is_roof_room", "is_flat",
"CONSTRUCTION_AGE_BAND", "floor_area_decile"
],
observed=True
).agg(
{
"PHOTO_SUPPLY": ["median", "mean"],
}
).reset_index()
aggregated.columns = ['_'.join(col).strip() for col in aggregated.columns.values]
# Remove trailing underscore from columns
aggregated.columns = [col[:-1] if col.endswith("_") else col for col in aggregated.columns.values]
# Convert columns to lowercase
aggregated.columns = [col.lower() for col in aggregated.columns.values]
# Store this data in s3 as a parquet file
save_dataframe_to_s3_parquet(
df=aggregated,
bucket_name="retrofit-data-dev",
file_key="solar_pv_supply/photo_supply_lookup.parquet",
)
floor_area_decile_thresholds = pd.DataFrame(decile_thresholds, columns=["floor_area_decile_thresholds"])
save_dataframe_to_s3_parquet(
df=floor_area_decile_thresholds,
bucket_name="retrofit-data-dev",
file_key=f"solar_pv_supply/floor_area_decile_thresholds.parquet",
)