diff --git a/etl/bill_savings/EnergyConsumptionModel.py b/etl/bill_savings/EnergyConsumptionModel.py
new file mode 100644
index 00000000..2ca88da5
--- /dev/null
+++ b/etl/bill_savings/EnergyConsumptionModel.py
@@ -0,0 +1,140 @@
+import pandas as pd
+from datetime import datetime
+from sklearn.model_selection import train_test_split
+from sklearn.linear_model import LinearRegression
+from sklearn.metrics import mean_squared_error, r2_score
+from utils.s3 import save_pickle_to_s3, read_pickle_from_s3
+
+
+class EnergyConsumptionModel:
+    """
+    Train, evaluate, persist and apply one linear regression model per energy consumption target.
+
+    Fitted or loaded models are kept in ``self.models``, keyed by target name. The train/test
+    attributes only ever hold the split made by the most recent ``split_dataset`` call, so each
+    target should be split, fitted and evaluated before moving on to the next one.
+    """
+
+    FEATURES = ['feature_1', 'feature_2']
+    TARGETS = ['heating_kwh', 'hot_water_kwh']
+    # Bucket that trained models are pickled to and loaded from
+    MODEL_BUCKET = "retrofit-model-directory-dev"
+
+    def __init__(self, model_paths=None):
+        """
+        :param model_paths: Optional mapping of target name to the s3 key of a pickled model. Every
+            entry is loaded from MODEL_BUCKET when the instance is created.
+        """
+        self.models = {}
+        self.model_paths = model_paths or {}
+        self.data = None
+
+        self.X_train = None
+        self.X_test = None
+        self.y_train = None
+        self.y_test = None
+
+        # NOTE: models are unpickled on load, so the bucket must only contain trusted artefacts
+        for target, path in self.model_paths.items():
+            self.models[target] = read_pickle_from_s3(bucket_name=self.MODEL_BUCKET, s3_file_name=path)
+
+    @staticmethod
+    def _add_engineered_features(data):
+        """
+        Add the model's feature columns to the given dataframe in place and return it.
+
+        Shared by training and scoring so that both always apply identical transformations.
+        """
+        # Example feature engineering steps
+        data['feature_1'] = data['original_feature_1'] ** 2
+        data['feature_2'] = data['original_feature_2'] ** 0.5
+        # Add more feature engineering steps as required
+        return data
+
+    def read_dataset(self, file_path):
+        """Load the training dataset from a csv file into ``self.data``."""
+        self.data = pd.read_csv(file_path)
+
+    def feature_engineering(self):
+        """Add the engineered feature columns to ``self.data``."""
+        self._add_engineered_features(self.data)
+
+    def split_dataset(self, target, test_size=0.2, random_state=42):
+        """
+        Split ``self.data`` into train and test sets for the given target column.
+
+        :param target: Name of the column to predict.
+        :param test_size: Proportion of rows held out for evaluation.
+        :param random_state: Seed passed to train_test_split so the split is reproducible.
+        """
+        X = self.data[self.FEATURES]
+        y = self.data[target]
+        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
+            X, y, test_size=test_size, random_state=random_state
+        )
+
+    def fit_model(self, target):
+        """Fit a new LinearRegression for the target on the current training split."""
+        self.models[target] = LinearRegression()
+        self.models[target].fit(self.X_train, self.y_train)
+
+    def evaluate_model(self, target):
+        """
+        Score the target's model against the current test split.
+
+        :return: A dict holding the mean squared error ('MSE') and r-squared score ('R2').
+        """
+        y_pred = self.models[target].predict(self.X_test)
+        mse = mean_squared_error(self.y_test, y_pred)
+        r2 = r2_score(self.y_test, y_pred)
+        return {'MSE': mse, 'R2': r2}
+
+    def save_model(self, target):
+        """Pickle the target's model to MODEL_BUCKET under a key stamped with today's date."""
+        run_date = datetime.now().strftime("%Y-%m-%d")
+        save_pickle_to_s3(
+            self.models[target],
+            bucket_name=self.MODEL_BUCKET,
+            s3_file_name=f"model_directory/energy_consumption_model/{target}_{run_date}.pkl"
+        )
+
+    def score_new_data(self, new_data, target):
+        """
+        Predict the target for unseen data.
+
+        :param new_data: Dataframe containing the original (un-engineered) feature columns.
+        :param target: Name of the target whose model should be used.
+        :return: The model's predictions for the rows of new_data.
+        :raises ValueError: If no model has been loaded or trained for the target.
+        """
+        if target not in self.models:
+            raise ValueError(f"Model for target {target} not loaded or trained")
+
+        new_data_transformed = self.transform_new_data(new_data)
+        return self.models[target].predict(new_data_transformed)
+
+    def transform_new_data(self, new_data):
+        """
+        Apply the same transformations as in feature_engineering and return only the model features.
+
+        Works on a copy so that the caller's dataframe is left unmodified.
+        """
+        transformed = self._add_engineered_features(new_data.copy())
+        return transformed[self.FEATURES]
+
+# Example usage:
+# model = EnergyConsumptionModel()
+# model.read_dataset('/mnt/data/energy_consumption_dataset.csv')
+# model.feature_engineering()
+
+# For heating_kwh
+# model.split_dataset(target='heating_kwh')
+# model.fit_model(target='heating_kwh')
+# print(model.evaluate_model(target='heating_kwh'))
+# model.save_model(target='heating_kwh')
+
+# For hot_water_kwh
+# model.split_dataset(target='hot_water_kwh')
+# model.fit_model(target='hot_water_kwh')
+# print(model.evaluate_model(target='hot_water_kwh'))
+# model.save_model(target='hot_water_kwh')
diff --git a/etl/bill_savings/data_collation.py b/etl/bill_savings/data_collation.py
new file mode 100644
index 00000000..ef2b286b
--- /dev/null
+++ b/etl/bill_savings/data_collation.py
@@ -0,0 +1,101 @@
+import re
+from datetime import datetime
+
+import pandas as pd
+
+from utils.s3 import list_files_in_s3_folder, read_pickle_from_s3, save_dataframe_to_s3_parquet
+
+# These columns we coerce to strings before saving
+PROBLEMATIC_COLUMNS = ["main-heating-controls"]
+
+
+def extract_kwh_value(text):
+    """
+    Extract the numerical kWh value from a given string.
+
+    :param text: The input string containing the kWh value.
+    :return: The extracted numerical kWh value as an integer, or None if text is not a string or
+        does not contain a value followed by "kWh per year".
+    """
+    # Missing values (e.g. None or NaN) cannot be searched, so treat them as "no value found"
+    if not isinstance(text, str):
+        return None
+
+    # Use regular expression to find the numerical value followed by "kWh per year"
+    match = re.search(r'([\d,]+) kWh per year', text)
+    if match is None:
+        return None
+
+    # Remove commas from the extracted value and convert to integer
+    return int(match.group(1).replace(',', ''))
+
+
+def app():
+    """
+    Given the files written in our datalake in s3, this application will collate the data into a single file
+    and store it back in s3 for analysis
+
+    :return: None
+    :raises ValueError: If no records could be collated, since there would be nothing to dedupe or save.
+    """
+
+    # Firstly, list all of the saved files in s3
+    data_files = list_files_in_s3_folder(bucket_name="retrofit-datalake-dev", folder_name="energy_consumption_data")
+
+    run_date = datetime.now().strftime("%Y-%m-%d")
+
+    complete_data = []
+    for file_key in data_files:
+        # Listing by prefix can also return the zero-byte "folder" placeholder key, which holds no data
+        if file_key.endswith("/"):
+            continue
+
+        # Extract the date from the file name
+        dataset_run_date = pd.Timestamp(file_key.split("/")[-1].split(".")[0])
+
+        # Load the data from the file
+        data = read_pickle_from_s3(bucket_name="retrofit-datalake-dev", s3_file_name=file_key)
+
+        # We check that the retrieved energy consumption sufficiently matches the EPC data
+        for x in data:
+            epc_data = x["epc"]
+            epc_sap = epc_data["current-energy-efficiency"]
+            epc_potential_sap = epc_data["potential-energy-efficiency"]
+            # Make sure this matches the extracted sap
+            if int(epc_sap) != int(x["current_epc_efficiency"]) or int(epc_potential_sap) != int(
+                x["potential_epc_efficiency"]
+            ):
+                continue
+
+            complete_data.append(
+                {
+                    **epc_data,
+                    "heating_kwh": extract_kwh_value(x["heating_text"]),
+                    "hot_water_kwh": extract_kwh_value(x["hot_water_text"]),
+                    "dataset_run_date": dataset_run_date
+                }
+            )
+
+    if not complete_data:
+        # Without this guard the sort below fails with an opaque KeyError on the empty dataframe
+        raise ValueError("No energy consumption records were collated from s3")
+
+    df = pd.DataFrame(complete_data)
+    # Because we collate multiple runs into a single data source, it's possible that we have duplicated data at
+    # the uprn level, so we dedupe based on the newest dataset_run_date
+
+    df = df.sort_values("dataset_run_date", ascending=False).drop_duplicates(subset="uprn", keep="first")
+    df = df.drop(columns=["dataset_run_date"])
+
+    for col in PROBLEMATIC_COLUMNS:
+        df[col] = df[col].astype(str)
+
+    # Save the data back to s3, but this time as a parquet file
+    save_dataframe_to_s3_parquet(
+        bucket_name="retrofit-data-dev",
+        file_key=f"energy_consumption/{run_date}/energy_consumption_dataset.parquet",
+        df=df
+    )
+
+    # Also keep a local csv copy in the working directory
+    df.to_csv("energy_consumption_dataset.csv", index=False)
diff --git a/etl/bill_savings/data_collection.py b/etl/bill_savings/data_collection.py
index 1f787d48..3b503122 100644
--- a/etl/bill_savings/data_collection.py
+++ b/etl/bill_savings/data_collection.py
@@ -131,7 +131,8 @@ def app():
     sample_size = 100
 
     energy_consumption_data = []
-    for directory in tqdm(epc_directories):
+    for i, directory in tqdm(enumerate(epc_directories), total=len(epc_directories)):
+
         data = pd.read_csv(directory / "certificates.csv", low_memory=False)
         # Rename the columns to the same format as the api returns
         data.columns = [c.replace("_", "-").lower() for c in data.columns]
@@ -147,8 +148,8 @@ def app():
 
         collected_data = []
         for _, property_data in data.iterrows():
-            # Sleep for a random time between 0.1 and 1.5 seconds
-            time.sleep(np.random.uniform(0.1, 1.5))
+            # Sleep for a random time between 0.1 and 1.4 seconds
+            time.sleep(np.random.uniform(0.1, 1.4))
 
             uprn = int(property_data["uprn"])
             address = property_data["address1"]
@@ -167,7 +168,7 @@ def app():
                 {
                     **response,
                     "epc": property_data.to_dict(),
-                    "epc_directory": directory
+                    "epc_directory": str(directory)
                 }
             )
 
diff --git a/utils/s3.py b/utils/s3.py
index 05482271..1b14ca97 100644
--- a/utils/s3.py
+++ b/utils/s3.py
@@ -246,3 +246,37 @@ def read_csv_from_s3(bucket_name, filepath):
     data = list(reader)
 
     return data
+
+
+def list_files_in_s3_folder(bucket_name, folder_name):
+    """
+    List all files in a given folder in an S3 bucket.
+
+    :param bucket_name: The name of the S3 bucket.
+    :param folder_name: The folder name within the S3 bucket.
+    :return: A list of file keys in the specified S3 folder. An empty list is returned if the folder holds
+        no files or if the listing fails, in which case the failure is logged.
+    """
+    try:
+        s3 = boto3.client('s3')
+        # A single list_objects_v2 call returns at most 1,000 keys, so page through the full listing
+        paginator = s3.get_paginator('list_objects_v2')
+
+        file_keys = []
+        for page in paginator.paginate(Bucket=bucket_name, Prefix=folder_name):
+            file_keys.extend(content['Key'] for content in page.get('Contents', []))
+
+        if not file_keys:
+            logger.info(f"No files found in folder {folder_name} in bucket {bucket_name}.")
+
+        return file_keys
+
+    except NoCredentialsError:
+        logger.error("Credentials not available.")
+        return []
+    except PartialCredentialsError:
+        logger.error("Incomplete credentials provided.")
+        return []
+    except Exception as e:
+        logger.error(f'Failed to list files in folder {folder_name} in bucket {bucket_name}: {str(e)}')
+        return []