diff --git a/backend/apis/GoogleSolarApi.py b/backend/apis/GoogleSolarApi.py index 6d2ddf6c..d29e3da5 100644 --- a/backend/apis/GoogleSolarApi.py +++ b/backend/apis/GoogleSolarApi.py @@ -213,6 +213,10 @@ class GoogleSolarApi: # 1) Convert Solar Energy AD production from the DC production panel_performance["initial_ac_kwh_per_year"] = panel_performance["yearly_dc_energy"] * self.dc_to_ac_rate + # This is just a benchmark figure, based on the national figure. This doesn't not respect the fact that a + # property could be 100% electric + average_electricity_consumption + # Remove anything where the total ac energy is less than half of the array wattage panel_performance = panel_performance[ (panel_performance["initial_ac_kwh_per_year"] / panel_performance["array_warrage"]) >= 0.5 diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py index 80392c88..258449c2 100644 --- a/backend/app/plan/router.py +++ b/backend/app/plan/router.py @@ -284,16 +284,16 @@ async def trigger_plan(body: PlanTriggerRequest): property_id, is_new = create_property( session, body.portfolio_id, epc_searcher.address_clean, epc_searcher.postcode_clean, epc_searcher.uprn ) - # if not is_new: - # continue - # - # create_property_targets( - # session, - # property_id=property_id, - # portfolio_id=body.portfolio_id, - # epc_target=body.goal_value, - # heat_demand_target=None - # ) + if not is_new: + continue + + create_property_targets( + session, + property_id=property_id, + portfolio_id=body.portfolio_id, + epc_target=body.goal_value, + heat_demand_target=None + ) epc_records = { 'original_epc': epc_searcher.newest_epc.copy(), @@ -356,7 +356,7 @@ async def trigger_plan(body: PlanTriggerRequest): p.get_spatial_data(uprn_filenames) # Call Google Solar API # TODO: Complete me - # solar_performance = solar_api_client.get(longitude=p.spatial["longitude"], latitude=p.spatial["latitude"]) + solar_performance = solar_api_client.get(longitude=p.spatial["longitude"], latitude=p.spatial["latitude"]) logger.info("Getting components and epc recommendations") recommendations = {} diff --git a/backend/ml_models/AnnualBillSavings.py b/backend/ml_models/AnnualBillSavings.py index 7395ab6b..e6494bcd 100644 --- a/backend/ml_models/AnnualBillSavings.py +++ b/backend/ml_models/AnnualBillSavings.py @@ -1,5 +1,16 @@ import numpy as np +QUARTERLY_ENERGY_PRICES = [ + # 2024 Q1 + {"start": "2024-01-01", "end": "2024-03-31", "electricity": 0.2, "gas": 0.042}, + # 2023 Q4 + {"start": "2023-10-01", "end": "2023-12-31", "electricity": 0.202, "gas": 0.51}, + # 2023 Q3 + {"start": "2023-07-01", "end": "2023-09-30", "electricity": 0.188, "gas": 0.46}, + # 2023 Q2 + {"start": "2023-04-01", "end": "2023-06-30", "electricity": 0.177, "gas": 0.456}, +] + class AnnualBillSavings: """ diff --git a/etl/bill_savings/EnergyConsumptionModel.py b/etl/bill_savings/EnergyConsumptionModel.py new file mode 100644 index 00000000..b616be08 --- /dev/null +++ b/etl/bill_savings/EnergyConsumptionModel.py @@ -0,0 +1,526 @@ +import pandas as pd +import numpy as np +import msgpack +from xgboost import XGBRegressor +from datetime import datetime +from sklearn.model_selection import train_test_split +from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_percentage_error +from sklearn.feature_selection import RFECV +from utils.s3 import save_pickle_to_s3, read_pickle_from_s3, read_dataframe_from_s3_parquet, read_from_s3 +import logging +from pprint import pprint + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + + +class EnergyConsumptionModel: + FEATURES = { + "heating_kwh": [ + "lodgement-year", "lodgement-month", "current-energy-efficiency", "energy-consumption-current", + "heating-cost-current", "heating-cost-potential", "total-floor-area", "number-heated-rooms", + "mainheat-description", "mainheat-energy-eff", "main-fuel", "secondheat-description", "property-type", + "built-form", "mainheatcont-description", "hotwater-description", "hot-water-energy-eff", + "walls-energy-eff", + "roof-energy-eff", "windows-description", "windows-energy-eff", "floor-description", "flat-top-storey", + "flat-storey-count", "unheated-corridor-length", "solar-water-heating-flag", "mechanical-ventilation", + "low-energy-lighting", "environment-impact-current", "energy-tariff", + "county", "construction-age-band", "co2-emissions-current", + # TODO: Testing + "lighting-cost-current", "hot-water-cost-current", "current-energy-rating" + ], + "hot_water_kwh": [ + "lodgement-year", "lodgement-month", + "current-energy-efficiency", + "energy-consumption-current", + "hot-water-cost-current", + "total-floor-area", "number-heated-rooms", + "hotwater-description", "hot-water-energy-eff", "main-fuel", "property-type", "built-form", + "co2-emissions-current", + ] + } + TARGETS = ['heating_kwh', 'hot_water_kwh'] + CATEGORICAL_COLUMNS = [ + "lodgement-year", "lodgement-month", "main-fuel", "mainheat-description", "number-heated-rooms", + "number-habitable-rooms", "mainheat-energy-eff", "mainheatcont-description", "property-type", "built-form", + "construction-age-band", "secondheat-description", "hotwater-description", "hot-water-energy-eff", + "walls-description", "walls-energy-eff", "roof-description", "roof-energy-eff", "floor-description", + "county", + "windows-description", "windows-energy-eff", "flat-top-storey", + "flat-storey-count", "unheated-corridor-length", "solar-water-heating-flag", "mechanical-ventilation", + "low-energy-lighting", "environment-impact-current", "energy-tariff", "current-energy-rating" + ] + + def __init__(self, cleaned, model_paths=None, n_jobs=1): + self.cleaned = cleaned + self.models = {} + self.model_paths = model_paths or {} + self.n_jobs = n_jobs + + self.data = None + self.input_data = None + self.dummy_columns = None + self.training_predictions = {} + self.testing_predictions = {} + self.best_iteration = {} + + self.x_train = {} + self.x_test = {} + self.x_val = {} + self.y_val = {} + self.y_train = {} + self.y_test = {} + self.selected_features = {} + + self.NUMERICAL_COLUMNS = list({ + x for x in self.FEATURES["heating_kwh"] + self.FEATURES["hot_water_kwh"] + if x not in self.CATEGORICAL_COLUMNS + }) + + if model_paths: + for target, path in model_paths.items(): + self.models[target] = read_pickle_from_s3(bucket_name="retrofit-model-directory-dev", s3_file_name=path) + + def read_dataset(self, file_path): + """Reads the dataset from the specified file path.""" + logging.info(f"Reading dataset from {file_path}") + self.data = read_dataframe_from_s3_parquet(bucket_name="retrofit-data-dev", file_key=file_path) + self.input_data = self.data.copy() + + def feature_engineering(self): + """Performs feature engineering on the dataset.""" + logging.info("Starting feature engineering") + self.data["lodgement-date"] = pd.to_datetime(self.data["lodgement-date"]) + self.data["lodgement-year"] = self.data["lodgement-date"].dt.year + self.data["lodgement-month"] = self.data["lodgement-date"].dt.month + + # For walls, roof, floor description where we have average thermal transmittance, to avoid too many categories + # we group them + ranges = { + "lessthan 0.1": (0, 0.1), + "0.1 - 0.3": (0.1, 0.3), + "0.3 - 0.5": (0.3, 0.5), + "morethan 0.5": (0.5, 2.5), + } + + # Generate the lookup table + thermal_transmittance_lookup_table = [] + for i in range(1, 251): + value = i / 100 + for label, (low, high) in ranges.items(): + if low < value <= high: + thermal_transmittance_lookup_table.append({"from": value, "to": label}) + break + + # Convert to DataFrame for display + thermal_transmittance_lookup_table = pd.DataFrame(thermal_transmittance_lookup_table) + thermal_transmittance_lookup_table["from"] = thermal_transmittance_lookup_table["from"].astype(str) + + # Apply the lookup table to the data + for feature in ["walls-description", "roof-description", "floor-description"]: + cleaned_df = pd.DataFrame(self.cleaned[feature])[["original_description", "thermal_transmittance"]] + # Round to 2 decimal places and convert to string + cleaned_df["thermal_transmittance"] = cleaned_df["thermal_transmittance"].round(2).astype(str) + + self.data = self.data.merge( + cleaned_df, + how="left", + left_on=feature, + right_on="original_description", + ) + # We now have the thermal transmittance in the data, which we can use to group with the lookup table + self.data = self.data.merge( + thermal_transmittance_lookup_table, + how="left", + left_on="thermal_transmittance", + right_on="from", + ) + # Where "to" is populated, replace feature with to + self.data[feature] = np.where( + ~pd.isnull(self.data["to"]), + self.data["to"], + self.data[feature] + ) + self.data = self.data.drop(columns=["original_description", "thermal_transmittance", "from", "to"]) + + # Modify number of heated rooms and number of habitable rooms + self.data["number-heated-rooms"] = self.data["number-heated-rooms"].apply( + lambda x: "16_or_more" if x > 15 else str(x) + ) + # self.data["number-habitable-rooms"] = self.data["number-habitable-rooms"].apply( + # lambda x: "10+" if x > 10 else str(x) + # ) + + # Convert data types + self.data[self.NUMERICAL_COLUMNS] = self.data[self.NUMERICAL_COLUMNS].apply(pd.to_numeric) + self.data[self.CATEGORICAL_COLUMNS] = self.data[self.CATEGORICAL_COLUMNS].astype(str) + + # Convert categorical columns to dummies + self.data = pd.get_dummies(self.data, columns=self.CATEGORICAL_COLUMNS, drop_first=True) + + # Store the dummy columns + self.dummy_columns = {} + for target in self.TARGETS: + target_features = self.FEATURES[target] + dummy_feature_columns = [] + for feature in target_features: + if feature in self.CATEGORICAL_COLUMNS: + dummy_feature_columns.extend([col for col in self.data.columns if col.startswith(feature + '_')]) + else: + dummy_feature_columns.append(feature) + self.dummy_columns[target] = dummy_feature_columns + + logging.info("Feature engineering completed") + + def split_dataset(self, target, test_size=0.2, validation_size=0.2, random_state=42): + """Splits the dataset into training, validation, and testing sets.""" + if target not in self.TARGETS: + raise ValueError(f"Target {target} not in {self.TARGETS}") + + logging.info(f"Splitting dataset for target {target}") + + # Split into train + validation and test sets + x_train_val, x_test, y_train_val, y_test = train_test_split( + self.data[self.dummy_columns[target]], + self.data[target], + test_size=test_size, + random_state=random_state + ) + + # Split train + validation into train and validation sets + x_train, x_val, y_train, y_val = train_test_split( + x_train_val, + y_train_val, + test_size=validation_size / (1 - test_size), + random_state=random_state + ) + + self.x_train[target], self.x_val[target], self.x_test[target] = x_train, x_val, x_test + self.y_train[target], self.y_val[target], self.y_test[target] = y_train, y_val, y_test + + def feature_selection(self, target, cv_folds=3, sample_fraction=0.1, random_state=42): + """ + Performs feature selection using RFECV with XGBoost. + + Parameters: + - target: The target variable for feature selection. + - cv_folds: Number of cross-validation folds. + - sample_fraction: Fraction of the data to use for feature selection. + - random_state: Random state for reproducibility. + """ + if target not in self.TARGETS: + raise ValueError(f"Target {target} not in {self.TARGETS}") + + logging.info(f"Starting feature selection for target {target}") + + # Sample the data if specified + if sample_fraction < 1.0: + x_sample, _, y_sample, _ = train_test_split( + self.x_train[target], self.y_train[target], + train_size=sample_fraction, random_state=random_state + ) + else: + x_sample = self.x_train[target] + y_sample = self.y_train[target] + + # Initialize the XGBoost model and RFECV + model = self.init_model(feature_selection=True) + selector = RFECV( + model, step=1, cv=cv_folds, scoring='neg_mean_absolute_percentage_error', verbose=1, n_jobs=self.n_jobs + ) + selector = selector.fit(x_sample, y_sample) + + # Get the selected features + self.selected_features[target] = x_sample.columns[selector.support_] + + # Update x_train, x_test and x_val with selected features + self.x_train[target] = self.x_train[target][self.selected_features[target]] + self.x_test[target] = self.x_test[target][self.selected_features[target]] + self.x_val[target] = self.x_val[target][self.selected_features[target]] + + logging.info(f"Feature selection completed for target {target}") + + def init_model(self, feature_selection=False): + + if feature_selection: + # Set up a smaller model to work it + return XGBRegressor( + objective='reg:squarederror', + n_estimators=50, + learning_rate=0.05, + max_depth=6, + subsample=0.8, + colsample_bytree=0.8, + reg_alpha=0.1, + reg_lambda=0.1 + ) + + return XGBRegressor( + objective='reg:squarederror', + n_estimators=1000, + learning_rate=0.05, + max_depth=6, + min_child_weight=3, + subsample=0.8, + colsample_bytree=0.8, + reg_alpha=0.1, + reg_lambda=0.1 + # n_jobs=self.n_jobs + ) + + def fit_model(self, target): + """Fits the model to the training data and removes zero-importance features.""" + + logging.info(f"Fitting model for target {target}") + + # Initialize and fit the model + model = self.init_model() + model.fit( + self.x_train[target], + self.y_train[target], + eval_set=[(self.x_val[target], self.y_val[target])], + early_stopping_rounds=50 + ) + + # Store the model + self.models[target] = model + + # Identify and remove zero-importance features + feature_importance = pd.DataFrame({ + 'Feature': self.x_train[target].columns, + 'Importance': model.feature_importances_ + }) + zero_importance_features = feature_importance[feature_importance['Importance'] == 0]['Feature'].tolist() + + if zero_importance_features: + logging.info(f"Removing zero-importance features for target {target}: {zero_importance_features}") + + self.x_train[target] = self.x_train[target].drop(columns=zero_importance_features) + self.x_val[target] = self.x_val[target].drop(columns=zero_importance_features) + self.x_test[target] = self.x_test[target].drop(columns=zero_importance_features) + + # Re-fit the model with the reduced feature set + model = self.init_model() + model.fit( + self.x_train[target], + self.y_train[target], + eval_set=[(self.x_val[target], self.y_val[target])], + early_stopping_rounds=50 + ) + + # Update the model + self.models[target] = model + + # Store the best iteration + self.best_iteration[target] = self.models[target].best_iteration + + logging.info(f"Model fitting completed for target {target}") + + def re_train_final_model(self, target): + """Re-trains the final model on the combined training and validation set.""" + logging.info(f"Re-training final model for target {target}") + x_train_val = pd.concat([self.x_train[target], self.x_val[target]]) + y_train_val = pd.concat([self.y_train[target], self.y_val[target]]) + + self.models[target] = self.init_model() + + self.models[target].fit(x_train_val, y_train_val, verbose=False) + logging.info(f"Re-training final model completed for target {target}") + + def evaluate_model(self, target): + """Evaluates the model on training and testing data.""" + logging.info(f"Evaluating model for target {target}") + y_train_pred = self.models[target].predict(self.x_train[target]) + train_mse = mean_squared_error(self.y_train[target], y_train_pred) + train_r2 = r2_score(self.y_train[target], y_train_pred) + train_mape = mean_absolute_percentage_error(self.y_train[target], y_train_pred) + + self.training_predictions[target] = pd.DataFrame({ + 'Actual': self.y_train[target], + 'Predicted': y_train_pred + }) + self.training_predictions[target]["residual"] = abs( + self.training_predictions[target]["Actual"] - self.training_predictions[target]["Predicted"] + ) + + y_test_pred = self.models[target].predict(self.x_test[target]) + test_mse = mean_squared_error(self.y_test[target], y_test_pred) + test_r2 = r2_score(self.y_test[target], y_test_pred) + test_mape = mean_absolute_percentage_error(self.y_test[target], y_test_pred) + + self.testing_predictions[target] = pd.DataFrame({ + 'Actual': self.y_test[target], + 'Predicted': y_test_pred + }) + self.testing_predictions[target]["residual"] = abs( + self.testing_predictions[target]["Actual"] - self.testing_predictions[target]["Predicted"] + ) + + if target in self.selected_features: + feature_importance = pd.DataFrame({ + 'Feature': self.selected_features[target], + 'Importance': self.models[target].feature_importances_ + }).sort_values(by='Importance', ascending=False) + else: + feature_importance = pd.DataFrame({ + 'Feature': self.x_train[target].columns, + 'Importance': self.models[target].feature_importances_ + }).sort_values(by='Importance', ascending=False) + + logging.info(f"Evaluation completed for target {target}") + + return { + 'train': { + 'MSE': train_mse, + 'R2': train_r2, + 'MAPE': train_mape, + 'Feature Importance': feature_importance + }, + 'test': { + 'MSE': test_mse, + 'R2': test_r2, + 'MAPE': test_mape + } + } + + def save_model(self, target): + """Saves the model to S3.""" + logging.info(f"Saving model for target {target}") + run_date = datetime.now().strftime("%Y-%m-%d") + save_pickle_to_s3( + self.models[target], + bucket_name="retrofit-model-directory-dev", + s3_file_name=f"model_directory/energy_consumption_model/{target}_{run_date}.pkl" + ) + + def score_new_data(self, new_data, target): + """Scores new data using the trained model.""" + if target not in self.models: + raise ValueError(f"Model for target {target} not loaded or trained") + + new_data_transformed = self.transform_new_data(new_data, target) + return self.models[target].predict(new_data_transformed) + + def transform_new_data(self, new_data, target): + """Applies the same transformations to new data as were applied to the training data.""" + + # TODO THis should jsut use our other transformation function + new_data["lodgement-date"] = pd.to_datetime(new_data["lodgement-date"]) + new_data["lodgement-year"] = new_data["lodgement-date"].dt.year + new_data["lodgement-month"] = new_data["lodgement-date"].dt.month + + # Convert categorical columns to dummies + new_data = pd.get_dummies(new_data, columns=self.CATEGORICAL_COLUMNS, drop_first=True) + + # Align new data with the dummy columns from training data + new_data = new_data.reindex(columns=self.dummy_columns[target], fill_value=0) + + # Select the features used by the model + new_data = new_data[self.selected_features[target]] + + return new_data + + def error_analysis(self, target, top_n=10, unique_threshold=0.8): + """ + Perform error analysis on the provided model and dataset. + + Parameters: + - target: The target variable to analyze. + - top_n: Number of top residuals to consider for analysis. + - unique_threshold: Threshold to exclude columns with high unique values. + + Returns: + - summary: Dictionary summarizing common features among poorly performing rows. + """ + + # Calculate predictions and residuals + y_train_pred = self.models[target].predict(self.x_train[target]) + y_test_pred = self.models[target].predict(self.x_test[target]) + + train_residuals = self.y_train[target] - y_train_pred + test_residuals = self.y_test[target] - y_test_pred + + # Identify top N poorly performing rows by absolute residuals + top_train_indices = train_residuals.abs().nlargest(top_n).index + top_test_indices = test_residuals.abs().nlargest(top_n).index + + top_train_data = self.input_data.loc[top_train_indices] + top_test_data = self.input_data.loc[top_test_indices] + + # Automatically detect and exclude columns + def exclude_columns(data, threshold): + exclude_cols = [] + num_rows = data.shape[0] + for col in data.columns: + if data[col].dtype == 'object' and data[col].nunique() / num_rows >= threshold: + exclude_cols.append(col) + return exclude_cols + + exclude_cols = exclude_columns(top_train_data, unique_threshold) + + top_train_data = top_train_data.drop(columns=exclude_cols) + top_test_data = top_test_data.drop(columns=exclude_cols) + + # One-hot encode categorical variables + categorical_columns = top_train_data.select_dtypes(include=['object']).columns.tolist() + top_train_data_encoded = pd.get_dummies(top_train_data, columns=categorical_columns, drop_first=True) + top_test_data_encoded = pd.get_dummies(top_test_data, columns=categorical_columns, drop_first=True) + + # Ensure all original columns are included in the encoded data + top_train_data_encoded = top_train_data_encoded.reindex(columns=self.input_data.columns, fill_value=0) + top_test_data_encoded = top_test_data_encoded.reindex(columns=self.input_data.columns, fill_value=0) + + # Correlation analysis with residuals + train_corr = top_train_data_encoded.corrwith(train_residuals.loc[top_train_indices]) + test_corr = top_test_data_encoded.corrwith(test_residuals.loc[top_test_indices]) + + # Return summaries + summary = { + "train_summary": top_train_data.describe(include='all').T, + "test_summary": top_test_data.describe(include='all').T, + "train_corr": train_corr, + "test_corr": test_corr, + "top_train_data": top_train_data, + "top_test_data": top_test_data + } + + return summary + + +# Usage: +cleaned = read_from_s3( + s3_file_name="cleaned_epc_data/cleaned.bson", + bucket_name="retrofit-data-dev" +) + +cleaned = msgpack.unpackb(cleaned, raw=False) + +model = EnergyConsumptionModel(cleaned=cleaned, n_jobs=2) +model.read_dataset('energy_consumption/2024-07-05/energy_consumption_dataset.parquet') +model.feature_engineering() + +# For heating_kwh +model.split_dataset(target='heating_kwh') +model.fit_model(target='heating_kwh') +model.re_train_final_model(target='heating_kwh') +evaluation_results = model.evaluate_model(target='heating_kwh') + +pprint(evaluation_results["train"]) +pprint(evaluation_results["test"]) + +importance_df = evaluation_results["train"]["Feature Importance"] +testing_predictions = model.testing_predictions["heating_kwh"] +testing_predictions = testing_predictions.sort_values("residual", ascending=False) +training_predictions = model.training_predictions["heating_kwh"] +training_predictions = training_predictions.sort_values("residual", ascending=False) +# Merge on model.input_data, by the index +merged_data = testing_predictions.merge(model.input_data, left_index=True, right_index=True) +merged_data_train = training_predictions.merge(model.input_data, left_index=True, right_index=True) + +# For hot_water_kwh +model.split_dataset(target='hot_water_kwh') +model.fit_model(target='hot_water_kwh') +model.re_train_final_model(target='hot_water_kwh') +evaluation_results = model.evaluate_model(target='hot_water_kwh') +pprint(evaluation_results["train"]) +pprint(evaluation_results["test"]) diff --git a/etl/bill_savings/data_collection.py b/etl/bill_savings/data_collection.py new file mode 100644 index 00000000..4d913e8f --- /dev/null +++ b/etl/bill_savings/data_collection.py @@ -0,0 +1,184 @@ +import time +from datetime import datetime, timedelta +from dateutil.relativedelta import relativedelta + +import requests +import inspect +import pandas as pd +from tqdm import tqdm +from bs4 import BeautifulSoup +from etl.epc.settings import EARLIEST_EPC_DATE +from pathlib import Path +import numpy as np +from utils.s3 import save_pickle_to_s3 + +src_file_path = inspect.getfile(lambda: None) + +EPC_DIRECTORY = Path(src_file_path).parent / "local_data" / "all-domestic-certificates" +SEARCH_POSTCODE_URL = ( + "https://find-energy-certificate.service.gov.uk/find-a-certificate/search-by-postcode?postcode={postcode_input}" +) +BASE_ENERGY_URL = "https://find-energy-certificate.service.gov.uk" + + +def calculate_expiry_date(lodgement_date): + lodgement_date_dt = datetime.strptime(lodgement_date, '%Y-%m-%d') + expiry_date_dt = lodgement_date_dt + relativedelta(years=10) - timedelta(days=1) + return expiry_date_dt.strftime('%-d %B %Y') + + +def retrieve_find_my_epc_data(uprn: int, postcode: str, address: str, expected_expiry_date: str): + """ + For a post code and address, we pull out all the required data from the find my epc website + """ + + headers = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) ' + 'Chrome/111.0.0.0 Safari/537.36' + } + postcode_input = postcode.replace(" ", "+") + postcode_search = SEARCH_POSTCODE_URL.format(postcode_input=postcode_input) + postcode_response = requests.get(postcode_search, headers=headers) + + address_cleaned = address.replace(",", "").replace(" ", "").lower() + postcode_res = BeautifulSoup(postcode_response.text, features="html.parser") + rows = postcode_res.find_all('tr', class_='govuk-table__row') + + extracted_table = [] + for row in rows: + # Extract the address and URL + address_tag = row.find('a', class_='govuk-link') + if address_tag is None: + continue + extracted_address = None + extracted_address_url = None + if address_tag: + extracted_address = address_tag.text.strip() + extracted_address_url = address_tag['href'] + + extracted_address_cleaned = extracted_address.replace(",", "").replace(" ", "").lower() + if not extracted_address_cleaned.startswith(address_cleaned): + continue + + # If the address is a match, we can extract the data + + # Extract the expiry date + expiry_date_tag = row.find('td', class_='govuk-table__cell date') + expiry_date = None + if expiry_date_tag is not None: + expiry_date = expiry_date_tag.parent.find('span').text.strip() + + extracted_table.append( + { + "extracted_address": extracted_address, + "extracted_address_url": extracted_address_url, + "expiry_date": expiry_date + } + ) + + extracted_table = [entry for entry in extracted_table if entry['expiry_date'] == expected_expiry_date] + + if len(extracted_table) > 1: + print("Multiple candidates found, skipping for now") + return None + + if not extracted_table: + print("No candidates found, skipping for now") + return None + + chosen_epc = BASE_ENERGY_URL + extracted_table[0]['extracted_address_url'] + epc_certificate = chosen_epc.split('/')[-1] + + address_response = requests.get(chosen_epc, headers=headers) + address_res = BeautifulSoup(address_response.text, features="html.parser") + + ratings = address_res.find('desc', {'id': 'svg-desc'}).text + current_rating = ratings.split(".")[0] + potential_rating = ratings.split(".")[1] + + # Retrieve the energy consumption + bills = address_res.find('div', {'id': 'bills-affected'}) + bills_list = bills.find_all('li') + if not bills_list: + return None + heating_text = bills_list[0].text + hot_water_text = bills_list[1].text + + resulting_data = { + 'extracted_uprn': uprn, + 'extracted_address': address, + 'epc_certificate': epc_certificate, + 'current_epc_rating': current_rating.split(' ')[-6], + 'current_epc_efficiency': int(current_rating.split(' ')[-1]), + 'potential_epc_rating': potential_rating.split(' ')[-6], + "potential_epc_efficiency": int(potential_rating.split(' ')[-1]), + "heating_text": heating_text, + "hot_water_text": hot_water_text, + } + + return resulting_data + + +def app(): + """ + This application is tasked with pulling a large quantity of data from the find my epc website, containing the + estimated energy consumption for properties + :return: + """ + + epc_directories = [entry for entry in EPC_DIRECTORY.iterdir() if entry.is_dir()] + + sample_size = 100 + + energy_consumption_data = [] + for i, directory in tqdm(enumerate(epc_directories), total=len(epc_directories)): + # Skip the first 50 + if i < 36: + continue + + data = pd.read_csv(directory / "certificates.csv", low_memory=False) + # Rename the columns to the same format as the api returns + data.columns = [c.replace("_", "-").lower() for c in data.columns] + # Take just date before the date threshold + data = data[data["lodgement-date"] >= EARLIEST_EPC_DATE] + + data = data[~pd.isnull(data["uprn"])] + # Take just the newest EPC per uprn, based on lodgement-date + data = data.sort_values("lodgement-date", ascending=False).drop_duplicates("uprn") + + data = data.sample(sample_size) + # We use the addreess data to find the related information + + collected_data = [] + for _, property_data in data.iterrows(): + time.sleep(np.random.uniform(0.3, 2)) + + uprn = int(property_data["uprn"]) + address = property_data["address1"] + postcode = property_data["postcode"] + expected_expiry_date = calculate_expiry_date(property_data["lodgement-date"]) + + response = retrieve_find_my_epc_data( + uprn=uprn, + postcode=postcode, + address=address, + expected_expiry_date=expected_expiry_date + ) + if response is None: + continue + collected_data.append( + { + **response, + "epc": property_data.to_dict(), + "epc_directory": str(directory) + } + ) + + energy_consumption_data.extend(collected_data) + + # Store the pickle in s3 + save_time = datetime.now() + save_pickle_to_s3( + energy_consumption_data, bucket_name="retrofit-datalake-dev", + s3_file_name=f"energy_consumption_data/{save_time}.pkl" + ) diff --git a/etl/bill_savings/data_combining.py b/etl/bill_savings/data_combining.py new file mode 100644 index 00000000..a111ecf2 --- /dev/null +++ b/etl/bill_savings/data_combining.py @@ -0,0 +1,93 @@ +import re +from datetime import datetime +from tqdm import tqdm + +import pandas as pd + +from utils.s3 import list_files_in_s3_folder, read_pickle_from_s3, save_dataframe_to_s3_parquet + +# These columns we co-erce to strings before saving +PROBLEMATIC_COLUMNS = ["main-heating-controls", "floor-level"] + + +def extract_kwh_value(text): + """ + Extract the numerical kWh value from a given string. + + :param text: The input string containing the kWh value. + :return: The extracted numerical kWh value as an integer. + """ + # Use regular expression to find the numerical value followed by "kWh per year" + match = re.search(r'([\d,]+) kWh per year', text) + + if match: + # Remove commas from the extracted value and convert to integer + kwh_value = int(match.group(1).replace(',', '')) + return kwh_value + else: + # If no match is found, return None or raise an exception + return None + + +def app(): + """ + Given the files written in our datalake in s3, this application will collate the data into a single file + and store it back in s3 for analysis + :return: + """ + + # Firstly, list all of the saved files in s3 + data_files = list_files_in_s3_folder(bucket_name="retrofit-datalake-dev", folder_name="energy_consumption_data") + + run_date = datetime.now().strftime("%Y-%m-%d") + + complete_data = [] + for files in tqdm(data_files): + dataset_run_date = files.split("/")[-1].split(".")[0] + # Extract the date from the file name + dataset_run_date = pd.Timestamp(dataset_run_date) + + # Load the data from the file + data = read_pickle_from_s3(bucket_name="retrofit-datalake-dev", s3_file_name=files) + + # We check that the retrieved energy consumption sufficiently matches the EPC data + internal_dataset = [] + for x in data: + epc_data = x["epc"] + epc_sap = epc_data["current-energy-efficiency"] + epc_potential_sap = epc_data["potential-energy-efficiency"] + # Make sure this matches the extracted sap + if int(epc_sap) != int(x["current_epc_efficiency"]) or int(epc_potential_sap) != int( + x["potential_epc_efficiency"] + ): + continue + + heating_kwh = extract_kwh_value(x["heating_text"]) + hot_water_kwh = extract_kwh_value(x["hot_water_text"]) + internal_dataset.append( + { + **epc_data, + "heating_kwh": heating_kwh, + "hot_water_kwh": hot_water_kwh, + "dataset_run_date": dataset_run_date + } + ) + + complete_data.extend(internal_dataset) + + df = pd.DataFrame(complete_data) + # Because we collate multiple runs into a single data source, it's possible that we have duplicated data at + # the uprn level, so we dedupe based on the newest dataset_run_date + + df = df.sort_values("dataset_run_date", ascending=False).drop_duplicates(subset="uprn", keep="first") + df = df.drop(columns=["dataset_run_date"]) + + for col in PROBLEMATIC_COLUMNS: + df[col] = df[col].astype(str) + + # Save the data back to s3, but this time as a parquet file + save_dataframe_to_s3_parquet( + bucket_name="retrofit-data-dev", + file_key=f"energy_consumption/{run_date}/energy_consumption_dataset.parquet", + df=df + ) diff --git a/etl/epc_clean/app.py b/etl/epc_clean/app.py index 59561b3c..1d833b72 100644 --- a/etl/epc_clean/app.py +++ b/etl/epc_clean/app.py @@ -39,11 +39,8 @@ def app(): cleaned_data = {} epc_directories = [entry for entry in EPC_DIRECTORY.iterdir() if entry.is_dir()] - WALLS = [] for directory in tqdm(epc_directories): data = pd.read_csv(directory / "certificates.csv", low_memory=False) - z = data["WALLS_DESCRIPTION"].unique().tolist() - WALLS.extend(z) # Rename the columns to the same format as the api returns data.columns = [c.replace("_", "-").lower() for c in data.columns] # Take just date before the date threshold diff --git a/infrastructure/terraform/main.tf b/infrastructure/terraform/main.tf index 0da850c5..b90c73a8 100644 --- a/infrastructure/terraform/main.tf +++ b/infrastructure/terraform/main.tf @@ -49,30 +49,30 @@ resource "aws_security_group" "allow_db" { ingress { # TLS (change to whatever ports you need) - from_port = 5432 - to_port = 5432 - protocol = "tcp" + from_port = 5432 + to_port = 5432 + protocol = "tcp" cidr_blocks = ["0.0.0.0/0"] } egress { - from_port = 0 - to_port = 0 - protocol = "-1" + from_port = 0 + to_port = 0 + protocol = "-1" cidr_blocks = ["0.0.0.0/0"] } } resource "aws_db_instance" "default" { - allocated_storage = var.allocated_storage - engine = "postgres" - engine_version = "14.10" - instance_class = var.instance_class - db_name = var.database_name - username = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string)["db_assessment_model_username"] - password = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string)["db_assessment_model_password"] - parameter_group_name = "default.postgres14" - skip_final_snapshot = true + allocated_storage = var.allocated_storage + engine = "postgres" + engine_version = "14.10" + instance_class = var.instance_class + db_name = var.database_name + username = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string)["db_assessment_model_username"] + password = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string)["db_assessment_model_password"] + parameter_group_name = "default.postgres14" + skip_final_snapshot = true vpc_security_group_ids = [aws_security_group.allow_db.id] lifecycle { prevent_destroy = true @@ -145,6 +145,23 @@ module "retrofit_heat_predictions" { allowed_origins = var.allowed_origins } +module "retrofit_lighting_cost_predictions" { + source = "./modules/s3" + bucketname = "retrofit-lighting-cost-predictions-${var.stage}" + allowed_origins = var.allowed_origins +} + +module "retrofit_heating_cost_predictions" { + source = "./modules/s3" + bucketname = "retrofit-heating-cost-predictions-${var.stage}" + allowed_origins = var.allowed_origins +} + +module "retrofit_hot_water_cost_predictions" { + source = "./modules/s3" + bucketname = "retrofit-hot-water-cost-predictions-${var.stage}" + allowed_origins = var.allowed_origins +} # Set up the route53 record for the API module "route53" { @@ -187,6 +204,22 @@ module "lambda_heat_prediction_ecr" { source = "./modules/ecr" } +# ECR repos for lighting cost, heating cost and hot water cost models +module "lambda_lighting_cost_prediction_ecr" { + ecr_name = "lighting-cost-prediction-${var.stage}" + source = "./modules/ecr" +} + +module "lambda_heating_cost_prediction_ecr" { + ecr_name = "heating-cost-prediction-${var.stage}" + source = "./modules/ecr" +} + +module "lambda_hot_water_cost_prediction_ecr" { + ecr_name = "hot-water-cost-prediction-${var.stage}" + source = "./modules/ecr" +} + ############################################## # CDN - Cloudfront ############################################## diff --git a/utils/s3.py b/utils/s3.py index 05482271..1b14ca97 100644 --- a/utils/s3.py +++ b/utils/s3.py @@ -246,3 +246,33 @@ def read_csv_from_s3(bucket_name, filepath): data = list(reader) return data + + +def list_files_in_s3_folder(bucket_name, folder_name): + """ + List all files in a given folder in an S3 bucket. + + :param bucket_name: The name of the S3 bucket. + :param folder_name: The folder name within the S3 bucket. + :return: A list of file keys in the specified S3 folder. + """ + try: + s3 = boto3.client('s3') + response = s3.list_objects_v2(Bucket=bucket_name, Prefix=folder_name) + + if 'Contents' not in response: + logger.info(f"No files found in folder {folder_name} in bucket {bucket_name}.") + return [] + + file_keys = [content['Key'] for content in response['Contents']] + return file_keys + + except NoCredentialsError: + logger.error("Credentials not available.") + return [] + except PartialCredentialsError: + logger.error("Incomplete credentials provided.") + return [] + except Exception as e: + logger.error(f'Failed to list files in folder {folder_name} in bucket {bucket_name}: {str(e)}') + return []