From 96235ed3a9d4c60b9d83bf6a61888df1dac4d1d1 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Mon, 8 Jul 2024 10:48:00 +0100 Subject: [PATCH] adding storage of dummy schema --- backend/Property.py | 26 +++- backend/app/plan/router.py | 30 ++++- backend/requirements/base.txt | 5 +- etl/bill_savings/EnergyConsumptionModel.py | 119 ++++++------------ etl/bill_savings/data_collection.py | 2 +- etl/bill_savings/training.py | 53 +++++++- .../places_for_people/demo_portfolio.py | 118 +++++++++++++++++ 7 files changed, 269 insertions(+), 84 deletions(-) create mode 100644 etl/customers/places_for_people/demo_portfolio.py diff --git a/backend/Property.py b/backend/Property.py index a80c3057..76bea0a6 100644 --- a/backend/Property.py +++ b/backend/Property.py @@ -512,7 +512,11 @@ class Property: return output def get_components( - self, cleaned, photo_supply_lookup, floor_area_decile_thresholds + self, + cleaned, + photo_supply_lookup, + floor_area_decile_thresholds, + energy_consumption_client ): """ Given the cleaning that has been performed, we'll use this to identify the property @@ -522,6 +526,8 @@ class Property: of the roof that is suitable for solar panels :param floor_area_decile_thresholds: This is the decile thresholds for the floor area, used in estimating the solar pv roof area + :param energy_consumption_client: Contains the heating and hot water kwh models - used to predict current + energy annual consumption in kWh :return: """ @@ -592,12 +598,28 @@ class Property: self.find_energy_sources() self.set_current_energy_bill() - def set_current_energy_bill(self): + def set_current_energy_bill(self, energy_consumption_client): """ Given what we know about the property now, estimates the current energy consumption using the UCL paper https://www.sciencedirect.com/science/article/pii/S0378778823002542 :return: """ + scoring_df = pd.DataFrame([self.epc_record.prepared_epc]) + # Change columns from underscores to hyphens + scoring_df.columns = [ + x.lower().replace("_", "-") for x in scoring_df.columns + ] + for col in ["heating_kwh", "hot_water_kwh"]: + scoring_df[col] = None + energy_consumption_client.data = None + heating_prediction = energy_consumption_client.score_new_data( + new_data=scoring_df, target="heating_kwh" + ) + + hot_water_prediction = energy_consumption_client.score_new_data( + new_data=scoring_df, target="hot_water_kwh" + ) + starting_heat_demand = ( float(self.data["energy-consumption-current"]) * self.floor_area ) diff --git a/backend/app/plan/router.py b/backend/app/plan/router.py index 258449c2..7c2d156b 100644 --- a/backend/app/plan/router.py +++ b/backend/app/plan/router.py @@ -40,6 +40,7 @@ from recommendations.Mds import Mds from utils.logger import setup_logger from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3 from backend.ml_models.Valuation import PropertyValuation +from etl.bill_savings.EnergyConsumptionModel import EnergyConsumptionModel logger = setup_logger() @@ -262,6 +263,7 @@ async def trigger_plan(body: PlanTriggerRequest): bucket_name=get_settings().DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet", ) + # TODO: insert building id input_properties = [] for config in tqdm(plan_input): # We validate each record in the file. If the record is NOT valid, we need to handle this accordingly @@ -337,6 +339,11 @@ async def trigger_plan(body: PlanTriggerRequest): if not input_properties: return Response(status_code=204) + # TOOD: TEMP - store locally as pickle + # import pickle + # with open("input_properties.pkl", "wb") as f: + # pickle.dump(input_properties, f) + # The materials data could be cached or local so we don't need to make # consistent requests to the backend for # the same data @@ -350,9 +357,30 @@ async def trigger_plan(body: PlanTriggerRequest): photo_supply_lookup, floor_area_decile_thresholds = SolarPhotoSupply.load(bucket=get_settings().DATA_BUCKET) solar_api_client = GoogleSolarApi(api_key=get_settings().GOOGLE_SOLAR_API_KEY) + dataset_version = "2024-07-05" + energy_consumption_client = EnergyConsumptionModel( + model_paths={ + "heating_kwh": f"model_directory/energy_consumption_model/heating_kwh_{dataset_version}.pkl", + "hot_water_kwh": f"model_directory/energy_consumption_model/hot_water_kwh_{dataset_version}.pkl" + }, + cleaned=cleaned + ) + + # Store all of these locally + # with open("temp_inputs.pkl", "wb") as f: + # pickle.dump({ + # "input_properties": input_properties, + # "materials": materials, + # "cleaned": cleaned, + # "uprn_filenames": uprn_filenames, + # "photo_supply_lookup": photo_supply_lookup, + # "floor_area_decile_thresholds": floor_area_decile_thresholds, + # "model_client": model_client + # }, f) + logger.info("Getting spatial data") for p in input_properties: - p.get_components(cleaned, photo_supply_lookup, floor_area_decile_thresholds) + p.get_components(cleaned, photo_supply_lookup, floor_area_decile_thresholds, energy_consumption_client) p.get_spatial_data(uprn_filenames) # Call Google Solar API # TODO: Complete me diff --git a/backend/requirements/base.txt b/backend/requirements/base.txt index 3173f7f8..c4e7367c 100644 --- a/backend/requirements/base.txt +++ b/backend/requirements/base.txt @@ -36,4 +36,7 @@ boto3==1.28.3 pandas==1.5.3 pyarrow==12.0.1 textblob -usaddress==0.5.10 \ No newline at end of file +usaddress==0.5.10 + +# Requirements we may not need +xgboost==1.7.6 \ No newline at end of file diff --git a/etl/bill_savings/EnergyConsumptionModel.py b/etl/bill_savings/EnergyConsumptionModel.py index 14ece803..c77001b3 100644 --- a/etl/bill_savings/EnergyConsumptionModel.py +++ b/etl/bill_savings/EnergyConsumptionModel.py @@ -1,17 +1,14 @@ import pandas as pd import numpy as np -import msgpack from xgboost import XGBRegressor from datetime import datetime from sklearn.model_selection import train_test_split from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_percentage_error from sklearn.feature_selection import RFECV -from utils.s3 import save_pickle_to_s3, read_pickle_from_s3, read_dataframe_from_s3_parquet, read_from_s3 -import logging -from pprint import pprint +from utils.s3 import save_pickle_to_s3, read_pickle_from_s3, read_dataframe_from_s3_parquet +from utils.logger import setup_logger -# Configure logging -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = setup_logger() class EnergyConsumptionModel: @@ -61,6 +58,7 @@ class EnergyConsumptionModel: self.training_predictions = {} self.testing_predictions = {} self.best_iteration = {} + self.dummy_schema = None self.x_train = {} self.x_test = {} @@ -81,13 +79,13 @@ class EnergyConsumptionModel: def read_dataset(self, file_path): """Reads the dataset from the specified file path.""" - logging.info(f"Reading dataset from {file_path}") + logger.info(f"Reading dataset from {file_path}") self.data = read_dataframe_from_s3_parquet(bucket_name="retrofit-data-dev", file_key=file_path) self.input_data = self.data.copy() - def feature_engineering(self): + def feature_engineering(self, drop_first=False): """Performs feature engineering on the dataset.""" - logging.info("Starting feature engineering") + logger.info("Starting feature engineering") self.data["lodgement-date"] = pd.to_datetime(self.data["lodgement-date"]) self.data["lodgement-year"] = self.data["lodgement-date"].dt.year self.data["lodgement-month"] = self.data["lodgement-date"].dt.month @@ -141,20 +139,13 @@ class EnergyConsumptionModel: ) self.data = self.data.drop(columns=["original_description", "thermal_transmittance", "from", "to"]) - # Modify number of heated rooms and number of habitable rooms - # self.data["number-heated-rooms"] = self.data["number-heated-rooms"].apply( - # lambda x: "16_or_more" if x > 15 else str(x) - # ) - # self.data["number-habitable-rooms"] = self.data["number-habitable-rooms"].apply( - # lambda x: "10+" if x > 10 else str(x) - # ) - # Convert data types self.data[self.NUMERICAL_COLUMNS] = self.data[self.NUMERICAL_COLUMNS].apply(pd.to_numeric) self.data[self.CATEGORICAL_COLUMNS] = self.data[self.CATEGORICAL_COLUMNS].astype(str) # Convert categorical columns to dummies - self.data = pd.get_dummies(self.data, columns=self.CATEGORICAL_COLUMNS, drop_first=True) + self.data = pd.get_dummies(self.data, columns=self.CATEGORICAL_COLUMNS, drop_first=drop_first) + self.dummy_schema = self.data.columns.tolist() # Store the dummy columns self.dummy_columns = {} @@ -168,14 +159,14 @@ class EnergyConsumptionModel: dummy_feature_columns.append(feature) self.dummy_columns[target] = dummy_feature_columns - logging.info("Feature engineering completed") + logger.info("Feature engineering completed") def split_dataset(self, target, test_size=0.2, validation_size=0.2, random_state=42): """Splits the dataset into training, validation, and testing sets.""" if target not in self.TARGETS: raise ValueError(f"Target {target} not in {self.TARGETS}") - logging.info(f"Splitting dataset for target {target}") + logger.info(f"Splitting dataset for target {target}") # Split into train + validation and test sets x_train_val, x_test, y_train_val, y_test = train_test_split( @@ -209,7 +200,7 @@ class EnergyConsumptionModel: if target not in self.TARGETS: raise ValueError(f"Target {target} not in {self.TARGETS}") - logging.info(f"Starting feature selection for target {target}") + logger.info(f"Starting feature selection for target {target}") # Sample the data if specified if sample_fraction < 1.0: @@ -236,7 +227,7 @@ class EnergyConsumptionModel: self.x_test[target] = self.x_test[target][self.selected_features[target]] self.x_val[target] = self.x_val[target][self.selected_features[target]] - logging.info(f"Feature selection completed for target {target}") + logger.info(f"Feature selection completed for target {target}") def init_model(self, feature_selection=False): @@ -269,7 +260,7 @@ class EnergyConsumptionModel: def fit_model(self, target): """Fits the model to the training data and removes zero-importance features.""" - logging.info(f"Fitting model for target {target}") + logger.info(f"Fitting model for target {target}") # Initialize and fit the model model = self.init_model() @@ -291,7 +282,7 @@ class EnergyConsumptionModel: zero_importance_features = feature_importance[feature_importance['Importance'] == 0]['Feature'].tolist() if zero_importance_features: - logging.info(f"Removing zero-importance features for target {target}: {zero_importance_features}") + logger.info(f"Removing zero-importance features for target {target}: {zero_importance_features}") self.x_train[target] = self.x_train[target].drop(columns=zero_importance_features) self.x_val[target] = self.x_val[target].drop(columns=zero_importance_features) @@ -312,22 +303,22 @@ class EnergyConsumptionModel: # Store the best iteration self.best_iteration[target] = self.models[target].best_iteration - logging.info(f"Model fitting completed for target {target}") + logger.info(f"Model fitting completed for target {target}") def re_train_final_model(self, target): """Re-trains the final model on the combined training and validation set.""" - logging.info(f"Re-training final model for target {target}") + logger.info(f"Re-training final model for target {target}") x_train_val = pd.concat([self.x_train[target], self.x_val[target]]) y_train_val = pd.concat([self.y_train[target], self.y_val[target]]) self.models[target] = self.init_model() self.models[target].fit(x_train_val, y_train_val, verbose=False) - logging.info(f"Re-training final model completed for target {target}") + logger.info(f"Re-training final model completed for target {target}") def evaluate_model(self, target): """Evaluates the model on training and testing data.""" - logging.info(f"Evaluating model for target {target}") + logger.info(f"Evaluating model for target {target}") y_train_pred = self.models[target].predict(self.x_train[target]) train_mse = mean_squared_error(self.y_train[target], y_train_pred) train_r2 = r2_score(self.y_train[target], y_train_pred) @@ -365,7 +356,7 @@ class EnergyConsumptionModel: 'Importance': self.models[target].feature_importances_ }).sort_values(by='Importance', ascending=False) - logging.info(f"Evaluation completed for target {target}") + logger.info(f"Evaluation completed for target {target}") return { 'train': { @@ -381,14 +372,19 @@ class EnergyConsumptionModel: } } - def save_model(self, target): + def save_model(self, target, dataset_version): """Saves the model to S3.""" - logging.info(f"Saving model for target {target}") - run_date = datetime.now().strftime("%Y-%m-%d") + logger.info(f"Saving model for target {target}") save_pickle_to_s3( self.models[target], bucket_name="retrofit-model-directory-dev", - s3_file_name=f"model_directory/energy_consumption_model/{target}_{run_date}.pkl" + s3_file_name=f"model_directory/energy_consumption_model/{target}_{dataset_version}.pkl" + ) + logger.info("Saving dummy schema for target {target}") + save_pickle_to_s3( + self.dummy_schema, + bucket_name="retrofit-model-directory-dev", + s3_file_name=f"model_directory/energy_consumption_model/{target}_{dataset_version}_dummy_schema.pkl" ) def score_new_data(self, new_data, target): @@ -404,57 +400,24 @@ class EnergyConsumptionModel: self.data = new_data.copy() # Run feature engineering - self.feature_engineering() + # TODO: This needs to be dummied out according to the training data + self.feature_engineering(drop_first=False) # Select the transformed data - new_data_transformed = self.data[self.dummy_columns[target]] + new_data_transformed = self.data[self.dummy_columns[target]].copy() - # Ensure the columns match the selected features - new_data_transformed = new_data_transformed[self.selected_features[target]] + missed_dummies = [c for c in self.models[target].feature_names_in_ if c not in new_data_transformed.columns] + zero_df = pd.DataFrame([dict(zip(missed_dummies, [0, ] * len(missed_dummies)))]) + + new_data_transformed = pd.concat([new_data_transformed, zero_df], axis=1) + # When we dummy in this case, we run with drop_first = False so we may end up with some of those + # first columns, we we'll need to dorp them + new_data_transformed = new_data_transformed[self.models[target].feature_names_in_] # Generate predictions - predictions = self.models[target].predict(new_data_transformed) + prediction = self.models[target].predict(new_data_transformed) # Reset self.data to None self.data = None - return predictions - - -# Usage: -cleaned = read_from_s3( - s3_file_name="cleaned_epc_data/cleaned.bson", - bucket_name="retrofit-data-dev" -) - -cleaned = msgpack.unpackb(cleaned, raw=False) - -model = EnergyConsumptionModel(cleaned=cleaned, n_jobs=2) -model.read_dataset('energy_consumption/2024-07-05/energy_consumption_dataset.parquet') -model.feature_engineering() - -# For heating_kwh -model.split_dataset(target='heating_kwh') -model.fit_model(target='heating_kwh') -model.re_train_final_model(target='heating_kwh') -evaluation_results = model.evaluate_model(target='heating_kwh') - -pprint(evaluation_results["train"]) -pprint(evaluation_results["test"]) - -importance_df = evaluation_results["train"]["Feature Importance"] -testing_predictions = model.testing_predictions["heating_kwh"] -testing_predictions = testing_predictions.sort_values("residual", ascending=False) -training_predictions = model.training_predictions["heating_kwh"] -training_predictions = training_predictions.sort_values("residual", ascending=False) -# Merge on model.input_data, by the index -merged_data = testing_predictions.merge(model.input_data, left_index=True, right_index=True) -merged_data_train = training_predictions.merge(model.input_data, left_index=True, right_index=True) - -# For hot_water_kwh -model.split_dataset(target='hot_water_kwh') -model.fit_model(target='hot_water_kwh') -model.re_train_final_model(target='hot_water_kwh') -evaluation_results = model.evaluate_model(target='hot_water_kwh') -pprint(evaluation_results["train"]) -pprint(evaluation_results["test"]) + return prediction diff --git a/etl/bill_savings/data_collection.py b/etl/bill_savings/data_collection.py index 4d913e8f..4fc03f99 100644 --- a/etl/bill_savings/data_collection.py +++ b/etl/bill_savings/data_collection.py @@ -133,7 +133,7 @@ def app(): energy_consumption_data = [] for i, directory in tqdm(enumerate(epc_directories), total=len(epc_directories)): # Skip the first 50 - if i < 36: + if i < 260: continue data = pd.read_csv(directory / "certificates.csv", low_memory=False) diff --git a/etl/bill_savings/training.py b/etl/bill_savings/training.py index 2c29c317..b1a939a1 100644 --- a/etl/bill_savings/training.py +++ b/etl/bill_savings/training.py @@ -1,5 +1,56 @@ -def hanlder(): +from pprint import pprint +import msgpack +from utils.s3 import read_from_s3 +from etl.bill_savings.EnergyConsumptionModel import EnergyConsumptionModel + + +def handler(): """ This function is used to train the model and store the final models in s3 as pickles :return: """ + + dataset_version = "2024-07-05" + + # Usage: + cleaned = read_from_s3( + s3_file_name="cleaned_epc_data/cleaned.bson", + bucket_name="retrofit-data-dev" + ) + + cleaned = msgpack.unpackb(cleaned, raw=False) + + model = EnergyConsumptionModel(cleaned=cleaned, n_jobs=2) + model.read_dataset(f'energy_consumption/{dataset_version}/energy_consumption_dataset.parquet') + model.feature_engineering() + + # For heating_kwh + model.split_dataset(target='heating_kwh') + model.fit_model(target='heating_kwh') + model.re_train_final_model(target='heating_kwh') + evaluation_results = model.evaluate_model(target='heating_kwh') + + pprint(evaluation_results["train"]) + pprint(evaluation_results["test"]) + + model.save_model(target='heating_kwh', dataset_version=dataset_version) + + # importance_df = evaluation_results["train"]["Feature Importance"] + # testing_predictions = model.testing_predictions["heating_kwh"] + # testing_predictions = testing_predictions.sort_values("residual", ascending=False) + # training_predictions = model.training_predictions["heating_kwh"] + # training_predictions = training_predictions.sort_values("residual", ascending=False) + # # Merge on model.input_data, by the index + # merged_data = testing_predictions.merge(model.input_data, left_index=True, right_index=True) + # merged_data_train = training_predictions.merge(model.input_data, left_index=True, right_index=True) + + # For hot_water_kwh + model.split_dataset(target='hot_water_kwh') + model.fit_model(target='hot_water_kwh') + model.re_train_final_model(target='hot_water_kwh') + evaluation_results = model.evaluate_model(target='hot_water_kwh') + + pprint(evaluation_results["train"]) + pprint(evaluation_results["test"]) + + model.save_model(target='hot_water_kwh', dataset_version=dataset_version) diff --git a/etl/customers/places_for_people/demo_portfolio.py b/etl/customers/places_for_people/demo_portfolio.py new file mode 100644 index 00000000..5c290ad7 --- /dev/null +++ b/etl/customers/places_for_people/demo_portfolio.py @@ -0,0 +1,118 @@ +import pandas as pd + +from utils.s3 import save_csv_to_s3 + +PORTFOLIO_ID = 83 +USER_ID = 8 + + +def app(): + # TODO: We can insert a variable, indicating the they own all of the units in the building + asset_list = [ + { + "address": "Flat 1, Fenton Court", + "postcode": "N2 8DS", + "uprn": 200140644, + "building_id": 1, + }, + { + "address": "Flat 2, Fenton Court", + "postcode": "N2 8DS", + "uprn": 200140645, + "building_id": 1, + }, + { + "address": "Flat 3, Fenton Court", + "postcode": "N2 8DS", + "uprn": 200140646, + "building_id": 1, + }, + { + "address": "Flat 4, Fenton Court", + "postcode": "N2 8DS", + "uprn": 200140647, + "building_id": 1, + }, + { + "address": "Flat 5, Fenton Court", + "postcode": "N2 8DS", + "uprn": 200140648, + "building_id": 1, + }, + { + "address": "Flat 6, Fenton Court", + "postcode": "N2 8DS", + "uprn": 200140649, + "building_id": 1, + } + ] + + asset_list = pd.DataFrame(asset_list) + + # Store the asset list in s3 + filename = f"{USER_ID}/{PORTFOLIO_ID}/non_intrusives.csv" + save_csv_to_s3( + dataframe=asset_list, + bucket_name="retrofit-plan-inputs-dev", + file_name=filename + ) + + body = { + "portfolio_id": str(PORTFOLIO_ID), + "housing_type": "Private", + "goal": "Increase EPC", + "goal_value": "B", + "trigger_file_path": filename, + "already_installed_file_path": "", + "patches_file_path": "", + "non_invasive_recommendations_file_path": "", + "budget": None, + } + print(body) + + # Get an example of flats with solar panels from epc data + + # import inspect + # import pandas as pd + # from tqdm import tqdm + # from pathlib import Path + # + # src_file_path = inspect.getfile(lambda: None) + # + # EPC_DIRECTORY = Path(src_file_path).parent / "local_data" / "all-domestic-certificates" + # + # epc_directories = [entry for entry in EPC_DIRECTORY.iterdir() if entry.is_dir()] + # + # directory = epc_directories[1] + # data = pd.read_csv(directory / "certificates.csv", low_memory=False) + # # Get flats + # data = data[data["PROPERTY_TYPE"].str.lower().str.contains("flat")] + # data = data[~pd.isnull(data["UPRN"])] + # data["UPRN"] = data["UPRN"].astype(int).astype(str) + # data = data[pd.to_datetime(data["LODGEMENT_DATE"]) > "2020-01-01"] + # flats_with_solar = data[data['PHOTO_SUPPLY'] > 0] + # + # print(flats_with_solar["UPRN"]) + # + # flats_with_solar[["ADDRESS", "UPRN"]] + # + # # Good example: + # # UPRN: 10013160824, Flat 39, The Meadow, 30 Busk Meadow S5 7JH (care home with 39 flats, have solar panels) + # # + # # Mostly, For a mid-floor flat, the property doesn't show as having solar panels through the photo_supply variable + # # But actually for UPRN: 10013245713, Apartment 4, Orchard House, Gill Lane PR4 5QN, this has a dwelling above + # # but the photo_supply variable is 20 + # + # # Small flat consisting of 2 units + # # UPRN: 42172953, FLAT 2, 276 CLAUGHTON ROAD, BIRKENHEAD CH41 4DX + # + # # Flat containing 5 units + # # UPRN: 10013247127 Flat 1, Old Church House PR4 5GE + # # UPRN: 10013247130 Flat 4, Old Church House PR4 5GE + # + # # Flat containing multiple units: + # # UPRNS: 10013245710, 10013245716, 10013245711, 10013245717, 10013245714, 10013245715, 10013245712, 10013245713 + # + # # Look for flats with air source heat pumps! + # flats_with_asps = data[data["MAINHEAT_DESCRIPTION"].str.lower().str.contains("air source heat pump")] + # print(flats_with_asps[["UPRN", "ADDRESS"]])