Merge branch 'main' of github.com:Hestia-Homes/Model into etl-michael-cost

This commit is contained in:
Michael Duong 2024-07-10 23:20:23 +01:00
commit 834db6701f
9 changed files with 907 additions and 29 deletions

View file

@ -213,6 +213,10 @@ class GoogleSolarApi:
# 1) Convert the solar DC energy production into AC energy production
panel_performance["initial_ac_kwh_per_year"] = panel_performance["yearly_dc_energy"] * self.dc_to_ac_rate
# This is just a benchmark figure, based on the national figure. It does not account for the fact that a
# property could be 100% electric
average_electricity_consumption
# Remove anything where the total ac energy is less than half of the array wattage
panel_performance = panel_performance[
(panel_performance["initial_ac_kwh_per_year"] / panel_performance["array_warrage"]) >= 0.5

View file

@ -284,16 +284,16 @@ async def trigger_plan(body: PlanTriggerRequest):
property_id, is_new = create_property(
session, body.portfolio_id, epc_searcher.address_clean, epc_searcher.postcode_clean, epc_searcher.uprn
)
# if not is_new:
# continue
#
# create_property_targets(
# session,
# property_id=property_id,
# portfolio_id=body.portfolio_id,
# epc_target=body.goal_value,
# heat_demand_target=None
# )
if not is_new:
continue
create_property_targets(
session,
property_id=property_id,
portfolio_id=body.portfolio_id,
epc_target=body.goal_value,
heat_demand_target=None
)
epc_records = {
'original_epc': epc_searcher.newest_epc.copy(),
@ -356,7 +356,7 @@ async def trigger_plan(body: PlanTriggerRequest):
p.get_spatial_data(uprn_filenames)
# Call Google Solar API
# TODO: Complete me
# solar_performance = solar_api_client.get(longitude=p.spatial["longitude"], latitude=p.spatial["latitude"])
solar_performance = solar_api_client.get(longitude=p.spatial["longitude"], latitude=p.spatial["latitude"])
logger.info("Getting components and epc recommendations")
recommendations = {}

View file

@ -1,5 +1,16 @@
import numpy as np
# Energy unit prices per calendar quarter, newest first. Each entry holds the inclusive "start"/"end" dates of
# the quarter (ISO strings) and a price for "electricity" and "gas".
# NOTE(review): units are not stated here - presumably GBP per kWh; confirm against the consumer of this table.
# NOTE(review): the 2024 Q1 gas price (0.042) is roughly a tenth of the other quarters (0.51, 0.46, 0.456), so
# one side is likely mis-scaled by a factor of ten - TODO confirm against the published figures.
QUARTERLY_ENERGY_PRICES = [
# 2024 Q1
{"start": "2024-01-01", "end": "2024-03-31", "electricity": 0.2, "gas": 0.042},
# 2023 Q4
{"start": "2023-10-01", "end": "2023-12-31", "electricity": 0.202, "gas": 0.51},
# 2023 Q3
{"start": "2023-07-01", "end": "2023-09-30", "electricity": 0.188, "gas": 0.46},
# 2023 Q2
{"start": "2023-04-01", "end": "2023-06-30", "electricity": 0.177, "gas": 0.456},
]
class AnnualBillSavings:
"""

View file

@ -0,0 +1,526 @@
import pandas as pd
import numpy as np
import msgpack
from xgboost import XGBRegressor
from datetime import datetime
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_percentage_error
from sklearn.feature_selection import RFECV
from utils.s3 import save_pickle_to_s3, read_pickle_from_s3, read_dataframe_from_s3_parquet, read_from_s3
import logging
from pprint import pprint
# Configure the root logger at import time: INFO level, "<timestamp> - <level> - <message>" format.
# NOTE(review): this runs whenever the module is imported, so it also affects the importing application's
# logging setup if the root logger has not been configured yet.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class EnergyConsumptionModel:
    """
    Trains, evaluates and scores XGBoost regressors that predict annual energy consumption from EPC data.

    One model is kept per target in TARGETS ("heating_kwh", "hot_water_kwh"). The typical training flow is:
    read_dataset -> feature_engineering -> split_dataset -> (feature_selection) -> fit_model ->
    re_train_final_model -> evaluate_model -> save_model. New data is scored with score_new_data, which
    re-applies the training-time transformations through transform_new_data.

    All per-target state (splits, models, selected columns, predictions) is stored in dictionaries keyed by
    target name.
    """

    # S3 buckets used for persisted models and for the training dataset
    MODEL_BUCKET = "retrofit-model-directory-dev"
    DATA_BUCKET = "retrofit-data-dev"

    FEATURES = {
        "heating_kwh": [
            "lodgement-year", "lodgement-month", "current-energy-efficiency", "energy-consumption-current",
            "heating-cost-current", "heating-cost-potential", "total-floor-area", "number-heated-rooms",
            "mainheat-description", "mainheat-energy-eff", "main-fuel", "secondheat-description", "property-type",
            "built-form", "mainheatcont-description", "hotwater-description", "hot-water-energy-eff",
            "walls-energy-eff",
            "roof-energy-eff", "windows-description", "windows-energy-eff", "floor-description", "flat-top-storey",
            "flat-storey-count", "unheated-corridor-length", "solar-water-heating-flag", "mechanical-ventilation",
            "low-energy-lighting", "environment-impact-current", "energy-tariff",
            "county", "construction-age-band", "co2-emissions-current",
            # TODO: Testing
            "lighting-cost-current", "hot-water-cost-current", "current-energy-rating"
        ],
        "hot_water_kwh": [
            "lodgement-year", "lodgement-month",
            "current-energy-efficiency",
            "energy-consumption-current",
            "hot-water-cost-current",
            "total-floor-area", "number-heated-rooms",
            "hotwater-description", "hot-water-energy-eff", "main-fuel", "property-type", "built-form",
            "co2-emissions-current",
        ]
    }
    TARGETS = ['heating_kwh', 'hot_water_kwh']
    CATEGORICAL_COLUMNS = [
        "lodgement-year", "lodgement-month", "main-fuel", "mainheat-description", "number-heated-rooms",
        "number-habitable-rooms", "mainheat-energy-eff", "mainheatcont-description", "property-type", "built-form",
        "construction-age-band", "secondheat-description", "hotwater-description", "hot-water-energy-eff",
        "walls-description", "walls-energy-eff", "roof-description", "roof-energy-eff", "floor-description",
        "county",
        "windows-description", "windows-energy-eff", "flat-top-storey",
        "flat-storey-count", "unheated-corridor-length", "solar-water-heating-flag", "mechanical-ventilation",
        "low-energy-lighting", "environment-impact-current", "energy-tariff", "current-energy-rating"
    ]

    # Description columns whose free-text value is replaced by a thermal transmittance band when one is known
    THERMAL_DESCRIPTION_COLUMNS = ["walls-description", "roof-description", "floor-description"]
    # Band label -> (exclusive lower bound, inclusive upper bound) of the thermal transmittance
    THERMAL_TRANSMITTANCE_BANDS = {
        "lessthan 0.1": (0, 0.1),
        "0.1 - 0.3": (0.1, 0.3),
        "0.3 - 0.5": (0.3, 0.5),
        "morethan 0.5": (0.5, 2.5),
    }

    def __init__(self, cleaned, model_paths=None, n_jobs=1):
        """
        :param cleaned: Mapping of description column name -> records holding "original_description" and
            "thermal_transmittance"; used to group wall/roof/floor descriptions into bands.
        :param model_paths: Optional mapping of target -> S3 key of a pickled model to load immediately.
        :param n_jobs: Parallelism passed to RFECV during feature selection.
        """
        self.cleaned = cleaned
        self.models = {}
        self.model_paths = model_paths or {}
        self.n_jobs = n_jobs
        self.data = None
        self.input_data = None
        self.dummy_columns = None
        self.training_predictions = {}
        self.testing_predictions = {}
        self.best_iteration = {}
        self.x_train = {}
        self.x_test = {}
        self.x_val = {}
        self.y_val = {}
        self.y_train = {}
        self.y_test = {}
        self.selected_features = {}
        # Every feature that is not categorical is treated as numerical; sorted so the order is deterministic
        self.NUMERICAL_COLUMNS = sorted({
            x for x in self.FEATURES["heating_kwh"] + self.FEATURES["hot_water_kwh"]
            if x not in self.CATEGORICAL_COLUMNS
        })

        # NOTE(review): the helper name suggests the models are unpickled; only load keys from a trusted bucket
        for target, path in self.model_paths.items():
            self.models[target] = read_pickle_from_s3(bucket_name=self.MODEL_BUCKET, s3_file_name=path)

    def read_dataset(self, file_path):
        """Reads the dataset from the specified S3 key and keeps an untouched copy in input_data."""
        logging.info(f"Reading dataset from {file_path}")
        self.data = read_dataframe_from_s3_parquet(bucket_name=self.DATA_BUCKET, file_key=file_path)
        self.input_data = self.data.copy()

    def _thermal_transmittance_lookup(self):
        """Returns a dict mapping each 2-decimal value in (0, 2.5], as a string, to its band label."""
        lookup = {}
        for hundredths in range(1, 251):
            value = hundredths / 100
            for label, (low, high) in self.THERMAL_TRANSMITTANCE_BANDS.items():
                if low < value <= high:
                    lookup[str(value)] = label
                    break
        return lookup

    def _group_thermal_descriptions(self, data):
        """
        Replaces wall/roof/floor descriptions with a thermal transmittance band where the cleaned lookup
        provides a transmittance for that description; other descriptions are left unchanged.

        A per-description mapping is used rather than DataFrame.merge so that the row count and the index of
        `data` are preserved (error_analysis relies on the index lining up with input_data).
        """
        band_lookup = self._thermal_transmittance_lookup()
        for feature in self.THERMAL_DESCRIPTION_COLUMNS:
            cleaned_df = pd.DataFrame(self.cleaned[feature])[["original_description", "thermal_transmittance"]]
            # Round to 2 decimal places and convert to string so the values line up with the lookup keys
            bands = cleaned_df["thermal_transmittance"].round(2).astype(str).map(band_lookup)
            has_band = bands.notna()
            description_to_band = dict(zip(cleaned_df.loc[has_band, "original_description"], bands[has_band]))
            grouped = data[feature].map(description_to_band)
            data[feature] = grouped.where(grouped.notna(), data[feature])
        return data

    @staticmethod
    def _bucket_heated_rooms(value):
        """Caps the number of heated rooms at a "16_or_more" category; everything else becomes its string."""
        try:
            return "16_or_more" if value > 15 else str(value)
        except TypeError:
            # Non-numeric values cannot be compared; keep them as their own category
            return str(value)

    def _prepare_features(self, data, training):
        """
        Applies the pre-encoding transformations shared by training and scoring.

        :param data: Frame to transform; it is modified in place and returned.
        :param training: When True every numerical column must be present; when False only the numerical
            columns that exist are converted (missing ones are zero-filled later by the column alignment).
        """
        data["lodgement-date"] = pd.to_datetime(data["lodgement-date"])
        data["lodgement-year"] = data["lodgement-date"].dt.year
        data["lodgement-month"] = data["lodgement-date"].dt.month

        # For walls, roof, floor description where we have average thermal transmittance, we group the values
        # to avoid too many categories
        data = self._group_thermal_descriptions(data)

        data["number-heated-rooms"] = data["number-heated-rooms"].apply(self._bucket_heated_rooms)

        if training:
            numeric_columns = list(self.NUMERICAL_COLUMNS)
        else:
            numeric_columns = [col for col in self.NUMERICAL_COLUMNS if col in data.columns]
        if numeric_columns:
            data[numeric_columns] = data[numeric_columns].apply(pd.to_numeric)
        data[self.CATEGORICAL_COLUMNS] = data[self.CATEGORICAL_COLUMNS].astype(str)
        return data

    def feature_engineering(self):
        """Performs feature engineering on the dataset and records the encoded columns for each target."""
        logging.info("Starting feature engineering")
        self.data = self._prepare_features(self.data, training=True)

        # Convert categorical columns to dummies; the first category of each column is the implicit baseline
        self.data = pd.get_dummies(self.data, columns=self.CATEGORICAL_COLUMNS, drop_first=True)

        # Store, per target, the encoded columns that belong to its features
        self.dummy_columns = {}
        for target in self.TARGETS:
            encoded_columns = []
            for feature in self.FEATURES[target]:
                if feature in self.CATEGORICAL_COLUMNS:
                    encoded_columns.extend(col for col in self.data.columns if col.startswith(feature + '_'))
                else:
                    encoded_columns.append(feature)
            self.dummy_columns[target] = encoded_columns
        logging.info("Feature engineering completed")

    def split_dataset(self, target, test_size=0.2, validation_size=0.2, random_state=42):
        """
        Splits the dataset into training, validation, and testing sets.

        test_size and validation_size are both fractions of the full dataset.
        :raises ValueError: If target is not one of TARGETS.
        """
        if target not in self.TARGETS:
            raise ValueError(f"Target {target} not in {self.TARGETS}")
        logging.info(f"Splitting dataset for target {target}")

        # Split into train + validation and test sets
        x_train_val, x_test, y_train_val, y_test = train_test_split(
            self.data[self.dummy_columns[target]],
            self.data[target],
            test_size=test_size,
            random_state=random_state
        )
        # Split train + validation into train and validation sets; rescale the fraction because the test
        # rows have already been removed
        x_train, x_val, y_train, y_val = train_test_split(
            x_train_val,
            y_train_val,
            test_size=validation_size / (1 - test_size),
            random_state=random_state
        )
        self.x_train[target], self.x_val[target], self.x_test[target] = x_train, x_val, x_test
        self.y_train[target], self.y_val[target], self.y_test[target] = y_train, y_val, y_test

    def feature_selection(self, target, cv_folds=3, sample_fraction=0.1, random_state=42):
        """
        Performs feature selection using RFECV with XGBoost.

        Parameters:
        - target: The target variable for feature selection.
        - cv_folds: Number of cross-validation folds.
        - sample_fraction: Fraction of the data to use for feature selection.
        - random_state: Random state for reproducibility.
        """
        if target not in self.TARGETS:
            raise ValueError(f"Target {target} not in {self.TARGETS}")
        logging.info(f"Starting feature selection for target {target}")

        # Sample the data if specified
        if sample_fraction < 1.0:
            x_sample, _, y_sample, _ = train_test_split(
                self.x_train[target], self.y_train[target],
                train_size=sample_fraction, random_state=random_state
            )
        else:
            x_sample = self.x_train[target]
            y_sample = self.y_train[target]

        # Initialize the XGBoost model and RFECV
        model = self.init_model(feature_selection=True)
        selector = RFECV(
            model, step=1, cv=cv_folds, scoring='neg_mean_absolute_percentage_error', verbose=1, n_jobs=self.n_jobs
        )
        selector = selector.fit(x_sample, y_sample)

        # Keep only the selected features in every split
        self.selected_features[target] = x_sample.columns[selector.support_]
        self.x_train[target] = self.x_train[target][self.selected_features[target]]
        self.x_test[target] = self.x_test[target][self.selected_features[target]]
        self.x_val[target] = self.x_val[target][self.selected_features[target]]
        logging.info(f"Feature selection completed for target {target}")

    def init_model(self, feature_selection=False):
        """Returns an unfitted XGBRegressor; a smaller one is used when feature_selection is True."""
        if feature_selection:
            # Set up a smaller model so that recursive feature elimination stays affordable
            return XGBRegressor(
                objective='reg:squarederror',
                n_estimators=50,
                learning_rate=0.05,
                max_depth=6,
                subsample=0.8,
                colsample_bytree=0.8,
                reg_alpha=0.1,
                reg_lambda=0.1
            )
        # n_jobs is deliberately not forwarded here (it was disabled in the original configuration)
        return XGBRegressor(
            objective='reg:squarederror',
            n_estimators=1000,
            learning_rate=0.05,
            max_depth=6,
            min_child_weight=3,
            subsample=0.8,
            colsample_bytree=0.8,
            reg_alpha=0.1,
            reg_lambda=0.1
        )

    def _fit_with_early_stopping(self, target):
        """Fits a fresh model on the training split, early-stopping on the validation split."""
        model = self.init_model()
        # NOTE(review): newer XGBoost releases deprecate passing early_stopping_rounds to fit() in favour of
        # the constructor - confirm against the pinned XGBoost version before upgrading.
        model.fit(
            self.x_train[target],
            self.y_train[target],
            eval_set=[(self.x_val[target], self.y_val[target])],
            early_stopping_rounds=50
        )
        return model

    def fit_model(self, target):
        """Fits the model to the training data and removes zero-importance features."""
        logging.info(f"Fitting model for target {target}")
        model = self._fit_with_early_stopping(target)

        # Identify and remove zero-importance features, then re-fit on the reduced feature set
        importances = pd.Series(model.feature_importances_, index=self.x_train[target].columns)
        zero_importance_features = importances[importances == 0].index.tolist()
        if zero_importance_features:
            logging.info(f"Removing zero-importance features for target {target}: {zero_importance_features}")
            self.x_train[target] = self.x_train[target].drop(columns=zero_importance_features)
            self.x_val[target] = self.x_val[target].drop(columns=zero_importance_features)
            self.x_test[target] = self.x_test[target].drop(columns=zero_importance_features)
            model = self._fit_with_early_stopping(target)

        self.models[target] = model
        # Record the columns the model was actually fitted on so that scoring and evaluation stay aligned
        # with it, whether or not feature_selection was run beforehand
        self.selected_features[target] = self.x_train[target].columns
        self.best_iteration[target] = model.best_iteration
        logging.info(f"Model fitting completed for target {target}")

    def re_train_final_model(self, target):
        """
        Re-trains the final model on the combined training and validation set.

        No validation data is left for early stopping, so when fit_model recorded a best iteration the number
        of boosting rounds is capped at that value instead of always growing the full default ensemble.
        """
        logging.info(f"Re-training final model for target {target}")
        x_train_val = pd.concat([self.x_train[target], self.x_val[target]])
        y_train_val = pd.concat([self.y_train[target], self.y_val[target]])

        final_model = self.init_model()
        best_iteration = self.best_iteration.get(target)
        if best_iteration is not None:
            # best_iteration is zero-based, hence the + 1
            final_model.set_params(n_estimators=int(best_iteration) + 1)
        final_model.fit(x_train_val, y_train_val, verbose=False)
        self.models[target] = final_model
        logging.info(f"Re-training final model completed for target {target}")

    @staticmethod
    def _regression_metrics(actual, predicted):
        """Returns the MSE, R2 and MAPE of the predictions."""
        return {
            'MSE': mean_squared_error(actual, predicted),
            'R2': r2_score(actual, predicted),
            'MAPE': mean_absolute_percentage_error(actual, predicted),
        }

    @staticmethod
    def _prediction_frame(actual, predicted):
        """Returns a frame of actual vs predicted values with the absolute residual."""
        frame = pd.DataFrame({'Actual': actual, 'Predicted': predicted})
        frame["residual"] = abs(frame["Actual"] - frame["Predicted"])
        return frame

    def evaluate_model(self, target):
        """
        Evaluates the model on training and testing data.

        Stores the per-row predictions in training_predictions / testing_predictions.
        :return: {"train": {MSE, R2, MAPE, "Feature Importance"}, "test": {MSE, R2, MAPE}}
        """
        logging.info(f"Evaluating model for target {target}")
        model = self.models[target]

        y_train_pred = model.predict(self.x_train[target])
        train_metrics = self._regression_metrics(self.y_train[target], y_train_pred)
        self.training_predictions[target] = self._prediction_frame(self.y_train[target], y_train_pred)

        y_test_pred = model.predict(self.x_test[target])
        test_metrics = self._regression_metrics(self.y_test[target], y_test_pred)
        self.testing_predictions[target] = self._prediction_frame(self.y_test[target], y_test_pred)

        # x_train always carries exactly the columns the model was fitted on, whereas the RFECV selection can
        # be longer than the importances once fit_model has dropped zero-importance features
        feature_importance = pd.DataFrame({
            'Feature': self.x_train[target].columns,
            'Importance': model.feature_importances_
        }).sort_values(by='Importance', ascending=False)

        logging.info(f"Evaluation completed for target {target}")
        return {
            'train': {**train_metrics, 'Feature Importance': feature_importance},
            'test': test_metrics
        }

    def save_model(self, target):
        """Saves the model to S3 under a key containing the target and today's date."""
        logging.info(f"Saving model for target {target}")
        run_date = datetime.now().strftime("%Y-%m-%d")
        save_pickle_to_s3(
            self.models[target],
            bucket_name=self.MODEL_BUCKET,
            s3_file_name=f"model_directory/energy_consumption_model/{target}_{run_date}.pkl"
        )

    def score_new_data(self, new_data, target):
        """
        Scores new data using the trained model.

        :raises ValueError: If no model is loaded or trained for the target.
        """
        if target not in self.models:
            raise ValueError(f"Model for target {target} not loaded or trained")
        new_data_transformed = self.transform_new_data(new_data, target)
        return self.models[target].predict(new_data_transformed)

    def _model_columns(self, target):
        """
        Returns the ordered list of encoded columns the model for `target` expects.

        :raises ValueError: If the columns cannot be determined.
        """
        if target in self.selected_features:
            return list(self.selected_features[target])
        # Models loaded from S3 carry no training state on this object; fall back to the names stored on the
        # fitted estimator when it exposes them
        fitted_names = getattr(self.models.get(target), "feature_names_in_", None)
        if fitted_names is not None:
            return list(fitted_names)
        if self.dummy_columns and target in self.dummy_columns:
            return list(self.dummy_columns[target])
        raise ValueError(f"Feature columns for target {target} are unknown; train or load a model first")

    def transform_new_data(self, new_data, target):
        """
        Applies the same transformations to new data as were applied to the training data.

        The caller's frame is not modified. Categories are encoded without dropping the first level: with only
        a few rows the "first" category is arbitrary, and the training baseline is removed anyway when the
        frame is aligned to the training columns (categories unseen in training are dropped, absent columns
        are filled with 0).
        """
        model_columns = self._model_columns(target)
        prepared = self._prepare_features(new_data.copy(), training=False)
        encoded = pd.get_dummies(prepared, columns=self.CATEGORICAL_COLUMNS, drop_first=False)
        return encoded.reindex(columns=model_columns, fill_value=0)

    def error_analysis(self, target, top_n=10, unique_threshold=0.8):
        """
        Perform error analysis on the provided model and dataset.

        Parameters:
        - target: The target variable to analyze.
        - top_n: Number of top residuals to consider for analysis.
        - unique_threshold: Threshold to exclude columns with high unique values.
        Returns:
        - summary: Dictionary summarizing common features among poorly performing rows.
        """
        model = self.models[target]
        train_residuals = self.y_train[target] - model.predict(self.x_train[target])
        test_residuals = self.y_test[target] - model.predict(self.x_test[target])

        # Identify top N poorly performing rows by absolute residuals and look them up in the raw input data
        top_train_indices = train_residuals.abs().nlargest(top_n).index
        top_test_indices = test_residuals.abs().nlargest(top_n).index
        top_train_data = self.input_data.loc[top_train_indices]
        top_test_data = self.input_data.loc[top_test_indices]

        # Exclude text columns that are (almost) unique per row, judged on the training rows
        num_rows = top_train_data.shape[0]
        exclude_cols = [
            col for col in top_train_data.columns
            if top_train_data[col].dtype == 'object' and top_train_data[col].nunique() / num_rows >= unique_threshold
        ]
        top_train_data = top_train_data.drop(columns=exclude_cols)
        top_test_data = top_test_data.drop(columns=exclude_cols)

        # One-hot encode categorical variables
        categorical_columns = top_train_data.select_dtypes(include=['object']).columns.tolist()
        top_train_data_encoded = pd.get_dummies(top_train_data, columns=categorical_columns, drop_first=True)
        top_test_data_encoded = pd.get_dummies(top_test_data, columns=categorical_columns, drop_first=True)

        # NOTE(review): re-indexing onto the original columns discards the dummy columns created above and
        # re-adds the categorical/excluded columns as constant 0, so only numeric columns yield a meaningful
        # correlation. Behaviour kept as-is - confirm whether the dummies were meant to be retained.
        top_train_data_encoded = top_train_data_encoded.reindex(columns=self.input_data.columns, fill_value=0)
        top_test_data_encoded = top_test_data_encoded.reindex(columns=self.input_data.columns, fill_value=0)

        # Correlation analysis with residuals
        train_corr = top_train_data_encoded.corrwith(train_residuals.loc[top_train_indices])
        test_corr = top_test_data_encoded.corrwith(test_residuals.loc[top_test_indices])

        return {
            "train_summary": top_train_data.describe(include='all').T,
            "test_summary": top_test_data.describe(include='all').T,
            "train_corr": train_corr,
            "test_corr": test_corr,
            "top_train_data": top_train_data,
            "top_test_data": top_test_data
        }
# Usage: end-to-end training run for both targets.
# NOTE(review): these statements sit at module level, so importing this module reads from S3 and trains both
# models as a side effect; consider moving them behind an `if __name__ == "__main__":` guard.
# Load the cleaned EPC description lookup (msgpack-encoded) used to band wall/roof/floor descriptions
cleaned = read_from_s3(
s3_file_name="cleaned_epc_data/cleaned.bson",
bucket_name="retrofit-data-dev"
)
cleaned = msgpack.unpackb(cleaned, raw=False)
model = EnergyConsumptionModel(cleaned=cleaned, n_jobs=2)
model.read_dataset('energy_consumption/2024-07-05/energy_consumption_dataset.parquet')
model.feature_engineering()
# For heating_kwh: split, fit with early stopping, re-train on train + validation, then evaluate
model.split_dataset(target='heating_kwh')
model.fit_model(target='heating_kwh')
model.re_train_final_model(target='heating_kwh')
evaluation_results = model.evaluate_model(target='heating_kwh')
pprint(evaluation_results["train"])
pprint(evaluation_results["test"])
importance_df = evaluation_results["train"]["Feature Importance"]
# Rank the heating predictions by absolute residual so the worst rows come first
testing_predictions = model.testing_predictions["heating_kwh"]
testing_predictions = testing_predictions.sort_values("residual", ascending=False)
training_predictions = model.training_predictions["heating_kwh"]
training_predictions = training_predictions.sort_values("residual", ascending=False)
# Merge on model.input_data, by the index, to see the raw EPC fields next to each prediction
merged_data = testing_predictions.merge(model.input_data, left_index=True, right_index=True)
merged_data_train = training_predictions.merge(model.input_data, left_index=True, right_index=True)
# For hot_water_kwh: same flow; feature selection is not run for either target
model.split_dataset(target='hot_water_kwh')
model.fit_model(target='hot_water_kwh')
model.re_train_final_model(target='hot_water_kwh')
evaluation_results = model.evaluate_model(target='hot_water_kwh')
pprint(evaluation_results["train"])
pprint(evaluation_results["test"])

View file

@ -0,0 +1,184 @@
import time
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import requests
import inspect
import pandas as pd
from tqdm import tqdm
from bs4 import BeautifulSoup
from etl.epc.settings import EARLIEST_EPC_DATE
from pathlib import Path
import numpy as np
from utils.s3 import save_pickle_to_s3
# Path of this source file, obtained from a lambda defined right here
src_file_path = inspect.getfile(lambda: None)
# Directory next to this file that holds one sub-directory of certificates per area
EPC_DIRECTORY = Path(src_file_path).parent / "local_data" / "all-domestic-certificates"
# Search page of the find-energy-certificate service; {postcode_input} is the postcode with spaces as "+"
SEARCH_POSTCODE_URL = (
"https://find-energy-certificate.service.gov.uk/find-a-certificate/search-by-postcode?postcode={postcode_input}"
)
# Site root, prepended to the relative certificate links found in the search results
BASE_ENERGY_URL = "https://find-energy-certificate.service.gov.uk"
def calculate_expiry_date(lodgement_date):
    """
    Return the expiry date of a certificate lodged on `lodgement_date`.

    The expiry date is the day before the tenth anniversary of the lodgement date.

    :param lodgement_date: Lodgement date as a 'YYYY-MM-DD' string.
    :return: The expiry date formatted as '<day> <month name> <year>' with no leading zero on the day,
        e.g. '9 March 2024'.
    :raises ValueError: If lodgement_date is not in 'YYYY-MM-DD' format.
    """
    lodged = datetime.strptime(lodgement_date, '%Y-%m-%d')
    try:
        anniversary = lodged.replace(year=lodged.year + 10)
    except ValueError:
        # Lodged on 29 February and the anniversary year has no leap day: clamp to 28 February
        anniversary = lodged.replace(year=lodged.year + 10, day=28)
    expiry = anniversary - timedelta(days=1)
    # '%-d' is a platform-specific strftime extension (unsupported on Windows), so the day is formatted
    # directly to drop the leading zero portably
    return f"{expiry.day} {expiry.strftime('%B %Y')}"
def retrieve_find_my_epc_data(uprn: int, postcode: str, address: str, expected_expiry_date: str, timeout: float = 30):
    """
    For a postcode and address, pull the rating and energy consumption text from the find-my-epc website.

    The postcode search results are filtered to rows whose address starts with `address` (ignoring commas,
    spaces and case) and whose expiry date equals `expected_expiry_date`. The certificate page of the single
    remaining candidate is then parsed.

    :param uprn: UPRN of the property; echoed back in the result.
    :param postcode: Postcode to search for.
    :param address: First address line used to pick the property out of the postcode results.
    :param expected_expiry_date: Expiry date as displayed on the site, e.g. '10 July 2024'.
    :param timeout: Seconds to wait for each HTTP request before giving up.
    :return: A dict with the certificate id, current/potential rating and efficiency, and the heating and
        hot water text; None when no unique candidate is found or the page lacks the expected content.
    :raises requests.RequestException: If an HTTP request fails or times out.
    """
    # Missing addresses arrive as NaN from pandas; without a usable address there is nothing to match on
    if not isinstance(address, str) or not isinstance(postcode, str):
        print("No candidates found, skipping for now")
        return None

    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/111.0.0.0 Safari/537.36'
    }
    postcode_input = postcode.replace(" ", "+")
    postcode_search = SEARCH_POSTCODE_URL.format(postcode_input=postcode_input)
    # A timeout keeps one stalled connection from hanging the whole scraping run
    postcode_response = requests.get(postcode_search, headers=headers, timeout=timeout)
    address_cleaned = address.replace(",", "").replace(" ", "").lower()
    postcode_res = BeautifulSoup(postcode_response.text, features="html.parser")

    extracted_table = []
    for row in postcode_res.find_all('tr', class_='govuk-table__row'):
        # Extract the address and URL; rows without a link (e.g. the header row) are skipped
        address_tag = row.find('a', class_='govuk-link')
        if address_tag is None:
            continue
        extracted_address = address_tag.text.strip()
        extracted_address_url = address_tag['href']
        extracted_address_cleaned = extracted_address.replace(",", "").replace(" ", "").lower()
        if not extracted_address_cleaned.startswith(address_cleaned):
            continue

        # The address is a match, so extract the expiry date shown in the same row
        expiry_date = None
        expiry_date_tag = row.find('td', class_='govuk-table__cell date')
        if expiry_date_tag is not None:
            expiry_span = expiry_date_tag.parent.find('span')
            if expiry_span is not None:
                expiry_date = expiry_span.text.strip()
        extracted_table.append(
            {
                "extracted_address": extracted_address,
                "extracted_address_url": extracted_address_url,
                "expiry_date": expiry_date
            }
        )

    extracted_table = [entry for entry in extracted_table if entry['expiry_date'] == expected_expiry_date]
    if len(extracted_table) > 1:
        print("Multiple candidates found, skipping for now")
        return None
    if not extracted_table:
        print("No candidates found, skipping for now")
        return None

    chosen_epc = BASE_ENERGY_URL + extracted_table[0]['extracted_address_url']
    epc_certificate = chosen_epc.split('/')[-1]
    address_response = requests.get(chosen_epc, headers=headers, timeout=timeout)
    address_res = BeautifulSoup(address_response.text, features="html.parser")

    # Both elements are required; an error page or a changed layout yields None instead of an AttributeError
    ratings_tag = address_res.find('desc', {'id': 'svg-desc'})
    bills = address_res.find('div', {'id': 'bills-affected'})
    if ratings_tag is None or bills is None:
        return None

    # Retrieve the energy consumption: the first item is read as heating, the second as hot water
    bills_list = bills.find_all('li')
    if len(bills_list) < 2:
        return None
    heating_text = bills_list[0].text
    hot_water_text = bills_list[1].text

    # The description holds one sentence per rating; within a sentence the rating letter is the sixth token
    # from the end and the efficiency score is the last token
    rating_sentences = ratings_tag.text.split(".")
    try:
        current_words = rating_sentences[0].split(' ')
        potential_words = rating_sentences[1].split(' ')
        current_epc_rating = current_words[-6]
        current_epc_efficiency = int(current_words[-1])
        potential_epc_rating = potential_words[-6]
        potential_epc_efficiency = int(potential_words[-1])
    except (IndexError, ValueError):
        # The description did not have the expected wording; treat it like any other unusable page
        return None

    return {
        'extracted_uprn': uprn,
        'extracted_address': address,
        'epc_certificate': epc_certificate,
        'current_epc_rating': current_epc_rating,
        'current_epc_efficiency': current_epc_efficiency,
        'potential_epc_rating': potential_epc_rating,
        "potential_epc_efficiency": potential_epc_efficiency,
        "heating_text": heating_text,
        "hot_water_text": hot_water_text,
    }
def app():
"""
This application is tasked with pulling a large quantity of data from the find my epc website, containing the
estimated energy consumption for properties.

For each certificate directory it samples properties from the newest EPC per UPRN, looks each one up on the
website and pickles the collected records to S3.
:return:
"""
epc_directories = [entry for entry in EPC_DIRECTORY.iterdir() if entry.is_dir()]
sample_size = 100
energy_consumption_data = []
for i, directory in tqdm(enumerate(epc_directories), total=len(epc_directories)):
# Skip the directories with index below 36
# NOTE(review): the previous comment said "the first 50" while the code skips 36; this looks like a
# hard-coded resume point from an earlier run - confirm before re-running from scratch.
if i < 36:
continue
data = pd.read_csv(directory / "certificates.csv", low_memory=False)
# Rename the columns to the same format as the api returns
data.columns = [c.replace("_", "-").lower() for c in data.columns]
# Keep only certificates lodged on or after the earliest allowed date
# NOTE(review): presumably both sides are ISO date strings so the comparison is chronological - confirm.
data = data[data["lodgement-date"] >= EARLIEST_EPC_DATE]
data = data[~pd.isnull(data["uprn"])]
# Take just the newest EPC per uprn, based on lodgement-date
data = data.sort_values("lodgement-date", ascending=False).drop_duplicates("uprn")
# NOTE(review): sample() is called without replacement, so a directory with fewer than sample_size rows
# left at this point would presumably raise - confirm and cap the sample size if so.
data = data.sample(sample_size)
# We use the address data to find the related information
collected_data = []
for _, property_data in data.iterrows():
# Random pause between requests to avoid hammering the website
time.sleep(np.random.uniform(0.3, 2))
uprn = int(property_data["uprn"])
address = property_data["address1"]
postcode = property_data["postcode"]
expected_expiry_date = calculate_expiry_date(property_data["lodgement-date"])
response = retrieve_find_my_epc_data(
uprn=uprn,
postcode=postcode,
address=address,
expected_expiry_date=expected_expiry_date
)
# None means no unique match was found for this property
if response is None:
continue
collected_data.append(
{
**response,
"epc": property_data.to_dict(),
"epc_directory": str(directory)
}
)
energy_consumption_data.extend(collected_data)
# Store the pickle in s3; the object key embeds the current timestamp
# NOTE(review): the indentation of this step was lost in this view, so it is unclear whether the save runs
# once per directory or once at the end - confirm against the repository.
save_time = datetime.now()
save_pickle_to_s3(
energy_consumption_data, bucket_name="retrofit-datalake-dev",
s3_file_name=f"energy_consumption_data/{save_time}.pkl"
)

View file

@ -0,0 +1,93 @@
import re
from datetime import datetime
from tqdm import tqdm
import pandas as pd
from utils.s3 import list_files_in_s3_folder, read_pickle_from_s3, save_dataframe_to_s3_parquet
# These columns are coerced to strings before the dataset is saved
PROBLEMATIC_COLUMNS = ["main-heating-controls", "floor-level"]
def extract_kwh_value(text):
    """
    Pull the annual kWh figure out of a sentence.

    :param text: Text expected to contain a number (optionally with thousands separators) directly followed
        by " kWh per year".
    :return: The figure as an integer, or None when the text contains no such phrase.
    """
    found = re.search(r'([\d,]+) kWh per year', text)
    if found is None:
        # Nothing that looks like a kWh figure in this text
        return None
    # Strip the thousands separators before converting
    digits = found.group(1).replace(',', '')
    return int(digits)
def app():
    """
    Given the files written in our datalake in s3, this application will collate the data into a single file
    and store it back in s3 for analysis.

    Only records whose scraped current and potential efficiency scores match the EPC record are kept. When a
    UPRN occurs in several runs, the record from the newest run wins.

    :return:
    :raises ValueError: If no usable record is found in any of the files.
    """
    # Firstly, list all of the saved files in s3
    data_files = list_files_in_s3_folder(bucket_name="retrofit-datalake-dev", folder_name="energy_consumption_data")
    # The listing may contain keys that are not pickled runs (e.g. a folder placeholder); skip those
    pickle_files = [file_key for file_key in data_files if file_key.endswith(".pkl")]
    run_date = datetime.now().strftime("%Y-%m-%d")

    complete_data = []
    for file_key in tqdm(pickle_files):
        # The file name (up to its first ".") is the timestamp of the run that produced it
        dataset_run_date = pd.Timestamp(file_key.split("/")[-1].split(".")[0])
        # Load the data from the file
        data = read_pickle_from_s3(bucket_name="retrofit-datalake-dev", s3_file_name=file_key)

        # We check that the retrieved energy consumption sufficiently matches the EPC data
        for record in data:
            epc_data = record["epc"]
            try:
                scores_match = (
                    int(epc_data["current-energy-efficiency"]) == int(record["current_epc_efficiency"])
                    and int(epc_data["potential-energy-efficiency"]) == int(record["potential_epc_efficiency"])
                )
            except (TypeError, ValueError):
                # A missing or non-numeric score cannot be verified, so it is treated as a mismatch
                scores_match = False
            if not scores_match:
                continue
            complete_data.append(
                {
                    **epc_data,
                    "heating_kwh": extract_kwh_value(record["heating_text"]),
                    "hot_water_kwh": extract_kwh_value(record["hot_water_text"]),
                    "dataset_run_date": dataset_run_date
                }
            )

    if not complete_data:
        # Fail with a clear message instead of a KeyError from sorting an empty frame further down
        raise ValueError("No energy consumption records matching their EPC data were found in the datalake")

    df = pd.DataFrame(complete_data)
    # Because we collate multiple runs into a single data source, it's possible that we have duplicated data at
    # the uprn level, so we dedupe based on the newest dataset_run_date
    df = df.sort_values("dataset_run_date", ascending=False).drop_duplicates(subset="uprn", keep="first")
    df = df.drop(columns=["dataset_run_date"])
    for col in PROBLEMATIC_COLUMNS:
        df[col] = df[col].astype(str)

    # Save the data back to s3, but this time as a parquet file
    save_dataframe_to_s3_parquet(
        bucket_name="retrofit-data-dev",
        file_key=f"energy_consumption/{run_date}/energy_consumption_dataset.parquet",
        df=df
    )

View file

@ -39,11 +39,8 @@ def app():
cleaned_data = {}
epc_directories = [entry for entry in EPC_DIRECTORY.iterdir() if entry.is_dir()]
WALLS = []
for directory in tqdm(epc_directories):
data = pd.read_csv(directory / "certificates.csv", low_memory=False)
z = data["WALLS_DESCRIPTION"].unique().tolist()
WALLS.extend(z)
# Rename the columns to the same format as the api returns
data.columns = [c.replace("_", "-").lower() for c in data.columns]
# Take just date before the date threshold

View file

@ -49,30 +49,30 @@ resource "aws_security_group" "allow_db" {
ingress {
# TLS (change to whatever ports you need)
from_port = 5432
to_port = 5432
protocol = "tcp"
from_port = 5432
to_port = 5432
protocol = "tcp"
cidr_blocks = ["0.0.0.0/0"]
}
egress {
from_port = 0
to_port = 0
protocol = "-1"
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
}
resource "aws_db_instance" "default" {
allocated_storage = var.allocated_storage
engine = "postgres"
engine_version = "14.10"
instance_class = var.instance_class
db_name = var.database_name
username = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string)["db_assessment_model_username"]
password = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string)["db_assessment_model_password"]
parameter_group_name = "default.postgres14"
skip_final_snapshot = true
allocated_storage = var.allocated_storage
engine = "postgres"
engine_version = "14.10"
instance_class = var.instance_class
db_name = var.database_name
username = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string)["db_assessment_model_username"]
password = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string)["db_assessment_model_password"]
parameter_group_name = "default.postgres14"
skip_final_snapshot = true
vpc_security_group_ids = [aws_security_group.allow_db.id]
lifecycle {
prevent_destroy = true
@ -145,6 +145,23 @@ module "retrofit_heat_predictions" {
allowed_origins = var.allowed_origins
}
# S3 buckets for the lighting cost, heating cost and hot water cost predictions. Each one instantiates the
# shared ./modules/s3 module with a stage-suffixed bucket name and the common allowed origins.
module "retrofit_lighting_cost_predictions" {
source = "./modules/s3"
bucketname = "retrofit-lighting-cost-predictions-${var.stage}"
allowed_origins = var.allowed_origins
}
module "retrofit_heating_cost_predictions" {
source = "./modules/s3"
bucketname = "retrofit-heating-cost-predictions-${var.stage}"
allowed_origins = var.allowed_origins
}
module "retrofit_hot_water_cost_predictions" {
source = "./modules/s3"
bucketname = "retrofit-hot-water-cost-predictions-${var.stage}"
allowed_origins = var.allowed_origins
}
# Set up the route53 record for the API
module "route53" {
@ -187,6 +204,22 @@ module "lambda_heat_prediction_ecr" {
source = "./modules/ecr"
}
# ECR repos for lighting cost, heating cost and hot water cost models. Each one instantiates the shared
# ./modules/ecr module with a stage-suffixed repository name.
module "lambda_lighting_cost_prediction_ecr" {
ecr_name = "lighting-cost-prediction-${var.stage}"
source = "./modules/ecr"
}
module "lambda_heating_cost_prediction_ecr" {
ecr_name = "heating-cost-prediction-${var.stage}"
source = "./modules/ecr"
}
module "lambda_hot_water_cost_prediction_ecr" {
ecr_name = "hot-water-cost-prediction-${var.stage}"
source = "./modules/ecr"
}
##############################################
# CDN - Cloudfront
##############################################

View file

@ -246,3 +246,33 @@ def read_csv_from_s3(bucket_name, filepath):
data = list(reader)
return data
def list_files_in_s3_folder(bucket_name, folder_name):
    """
    List all files in a given folder in an S3 bucket.

    The listing is paginated, so folders holding more objects than a single list_objects_v2 response can
    carry are returned in full rather than silently truncated.

    :param bucket_name: The name of the S3 bucket.
    :param folder_name: The folder name within the S3 bucket.
    :return: A list of file keys in the specified S3 folder; an empty list if there are none or if the
        listing fails (the failure is logged).
    """
    try:
        s3 = boto3.client('s3')
        paginator = s3.get_paginator('list_objects_v2')
        file_keys = [
            content['Key']
            for page in paginator.paginate(Bucket=bucket_name, Prefix=folder_name)
            # Pages without any matching object carry no 'Contents' entry
            for content in page.get('Contents', [])
        ]
    except NoCredentialsError:
        logger.error("Credentials not available.")
        return []
    except PartialCredentialsError:
        logger.error("Incomplete credentials provided.")
        return []
    except Exception as e:
        # Best effort by design: callers receive an empty listing and the failure is logged
        logger.error(f'Failed to list files in folder {folder_name} in bucket {bucket_name}: {str(e)}')
        return []

    if not file_keys:
        logger.info(f"No files found in folder {folder_name} in bucket {bucket_name}.")
    return file_keys