refactoring repo wip

This commit is contained in:
Khalim Conn-Kowlessar 2023-10-05 14:52:01 +01:00
parent 9d14953efb
commit 311c8ff171
18 changed files with 9 additions and 8690 deletions

View file

@ -39,12 +39,8 @@ from model_data.optimiser.GainOptimiser import GainOptimiser
from model_data.optimiser.CostOptimiser import CostOptimiser
from backend.app.utils import epc_to_sap_lower_bound, read_parquet_from_s3
from model_data.optimiser.optimiser_functions import prepare_input_measures
from model_data.simulation_system.core.DataProcessor import DataProcessor
from model_data.simulation_system.core.Settings import COLUMNS_TO_MERGE_ON
# TODO: This is placeholder until data is stored in DB
from backend.app.plan.uvalue_estimates_walls import uvalue_estimates_walls
from backend.app.plan.uvalue_estimates_floors import uvalue_estimates_floors
from etl.property_change.DataProcessor import DataProcessor
from etl.property_change.settings import COLUMNS_TO_MERGE_ON
logger = setup_logger()
@ -273,22 +269,9 @@ async def trigger_plan(body: PlanTriggerRequest):
# Property recommendations
p.get_components(cleaned)
# This is placeholder, until the full dataset is loaded into the database and we just make a read to the
# database
floors_u_value_estimate = [
x for x in uvalue_estimates_floors
if (x['local-authority'] == p.data["local-authority"]) &
(x['property-type'] == p.data["property-type"]) &
(x['built-form'] == p.data["built-form"]) &
(x['floor-energy-eff'] == p.data["floor-energy-eff"] if p.data[
"floor-energy-eff"] != 'N/A' else True) &
(x['floor-env-eff'] == p.data["floor-env-eff"] if p.data["floor-env-eff"] != 'N/A' else True)
]
# Floor recommendations
floor_recommender = FloorRecommendations(
property_instance=p,
uvalue_estimates=floors_u_value_estimate,
total_floor_area_group_decile=total_floor_area_group_decile,
materials=materials_by_type["suspended_floor_insulation"] + materials_by_type["solid_floor_insulation"],
)

File diff suppressed because it is too large Load diff

File diff suppressed because it is too large Load diff

View file

@ -3,8 +3,8 @@ import os
import pandas as pd
import msgpack
from model_data.EpcClean import EpcClean
from model_data.simulation_system.core.Settings import EARLIEST_EPC_DATE
from etl.epc_clean.EpcClean import EpcClean
from etl.property_change.settings import EARLIEST_EPC_DATE
from pathlib import Path
from utils.s3 import save_data_to_s3
@ -37,8 +37,6 @@ def app():
epc_directories = [entry for entry in EPC_DIRECTORY.iterdir() if entry.is_dir()]
for directory in tqdm(epc_directories):
directory_destructured = str(directory).split("/")[-1].split("-")
gss_code = directory_destructured[1]
local_authority = directory_destructured[2]
data = pd.read_csv(directory / "certificates.csv", low_memory=False)
# Rename the columns to the same format as the api returns
@ -62,14 +60,6 @@ def app():
new_data = [x for x in data if x["original_description"] not in existing_descriptions]
cleaned_data[k].extend(new_data)
# TODO: Add property age band into this
# uvalue_estimates = UvalueEstimations(data=data)
# uvalue_estimates.get_estimates(cleaner=cleaner)
# # TODO: Store these to a s3
# uvalue_estimates.walls
# uvalue_estimates.floors
# uvalue_estimates.roofs
# Basic check to make sure all descriptions are unique
for _, cleaned in cleaned_data.items():
descriptions = [x["original_description"] for x in cleaned]

View file

@ -2,7 +2,7 @@ from pathlib import Path
import numpy as np
import pandas as pd
from model_data.BaseUtility import Definitions
from model_data.simulation_system.core.Settings import (
from etl.property_change.settings import (
DATA_PROCESSOR_SETTINGS,
EARLIEST_EPC_DATE,
FULLY_GLAZED_DESCRIPTIONS,

View file

@ -1 +1,4 @@
PyPDF2
boto3==1.28.38
PyPDF2==3.0.1
pydantic==1.10.11
epc-api-python==1.0.2

View file

@ -1,183 +0,0 @@
"""
Set up the client to be used for downloading and uploading model files
"""
import os
import boto3
from core.Logger import logger
# class S3FSClient:
# """
# Set up the correct client to upload files to s3
# """
# def __init__(self, runtime_environment: str = "local") -> None:
# self.client: s3fs.S3FileSystem | None = None
# self.model_bucket: str
# self.client_factory(runtime_environment)
# self.determine_model_bucket(runtime_environment)
# def client_factory(self, runtime_environment: str = "local"):
# """
# Select the correct s3 client to use
# """
# if runtime_environment == "local":
# logger.info("No S3 client setup required")
# elif runtime_environment == "local-mock":
# logger.info(f"S3 settings for {runtime_environment}")
# self.client = s3fs.S3FileSystem(
# key=os.environ.get("AWS_ACCESS_KEY_ID", "admin"),
# secret=os.environ.get("AWS_SECRET_ACCESS_KEY", "password"),
# client_kwargs={
# "endpoint_url": os.environ.get(
# "ENDPOINT_URL", "http://localhost:9000"
# )
# },
# )
# elif runtime_environment in ["dev", "staging", "prod"]:
# logger.info(f"S3 settings for {runtime_environment}")
# # Key/ token should be in session/lambda for this
# self.client = s3fs.S3FileSystem()
# else:
# raise NotImplementedError("No correspnding runtime environment")
# def determine_model_bucket(self, runtime_environment: str) -> None:
# """
# For the given environment, return the correct bucket for models
# """
# if runtime_environment == "local":
# logger.info("In local development - no need for s3")
# elif runtime_environment in ["local-mock", "dev"]:
# # TODO: get from enironment
# self.model_bucket = "retrofit-model-directory-dev"
# elif runtime_environment in ["staging", "prod"]:
# self.model_bucket = f"retrofit-model-directory-{runtime_environment}"
# else:
# raise NotImplementedError("No corresponding runtime environment")
# def download_model(self, filepath: str, model_folder: str):
# """
# For the file path, download the model locally so that we can load the model
# """
# if self.client is None:
# logger.info("No need to download model as local development")
# else:
# def list_files_recursively(folder_path, client):
# all_files = []
# for root, dirs, files in client.walk(folder_path):
# for file in files:
# s3_path = os.path.join(root, file)
# all_files.append(s3_path)
# return all_files
# # List all files in the specified S3 folder and its subfolders
# files = list_files_recursively(
# f"{self.model_bucket}/{filepath}", client=self.client
# )
# # Download each file
# for file in files:
# # Extract the filename from the S3 path
# filename = file.split(filepath)[-1]
# # Define the local path where you want to save the file
# local_path = os.path.join(model_folder, filename)
# # Download the file from S3 to the local directory
# self.client.get(file, local_path)
# print(f"Downloaded {filename} to {local_path}")
# print("Download completed.")
class BotoClient:
"""
Using boto3 to access the different aws storage configurations
"""
def __init__(self, runtime_environment: str = "local") -> None:
self.client = None
self.model_bucket: str
self.client_factory(runtime_environment)
self.determine_model_bucket(runtime_environment)
def client_factory(self, runtime_environment: str = "local"):
"""
Select the correct s3 client to use
"""
if runtime_environment == "local":
logger.info("No S3 client setup required")
elif runtime_environment == "local-mock":
logger.info(f"S3 settings for {runtime_environment}")
session = boto3.Session()
self.client = session.client(
service_name="s3",
aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID", "admin"),
aws_secret_access_key=os.environ.get(
"AWS_SECRET_ACCESS_KEY", "password"
),
endpoint_url=os.environ.get("ENDPOINT_URL", "http://localhost:9000"),
)
elif runtime_environment in ["dev", "staging", "prod"]:
logger.info(f"S3 settings for {runtime_environment}")
# Key/ token should be in session/lambda for this
self.client = boto3.client("s3")
else:
raise NotImplementedError("No correspnding runtime environment")
def determine_model_bucket(self, runtime_environment: str) -> None:
"""
For the given environment, return the correct bucket for models
"""
if runtime_environment == "local":
logger.info("In local development - no need for s3")
elif runtime_environment in ["local-mock", "dev"]:
# TODO: get from enironment
self.model_bucket = "retrofit-model-directory-dev"
elif runtime_environment in ["staging", "prod"]:
self.model_bucket = f"retrofit-model-directory-{runtime_environment}"
else:
raise NotImplementedError("No corresponding runtime environment")
def download_model(self, filepath: str, model_folder: str):
"""
For the file path, download the model locally so that we can load the model
"""
# List all objects with the specified prefix in the bucket
if self.client is None:
raise ValueError("SHould not be in here!")
objects = self.client.list_objects_v2(Bucket=self.model_bucket, Prefix=filepath)
# Ensure the local directory for downloads exists
if not os.path.exists(model_folder):
os.makedirs(model_folder)
# Download each object with the specified prefix
for obj in objects.get("Contents", []):
# Get the object key (file path)
object_key = obj["Key"]
# Determine the local file path to save the object
local_file_path = os.path.join(
model_folder, object_key.split(f"{filepath}/")[-1]
)
# Create the local directory if it doesn't exist
local_directory = os.path.dirname(local_file_path)
if not os.path.exists(local_directory):
os.makedirs(local_directory)
# Download the object from S3 to the local directory
self.client.download_file(self.model_bucket, object_key, local_file_path)
print(f"Downloaded {object_key} to {local_file_path}")
print("Download completed.")

View file

@ -1,145 +0,0 @@
import pandas as pd
import os
from typing import Protocol
import boto3
from io import BytesIO, StringIO
from core.CloudClient import BotoClient
def read_parquet_from_s3(client, bucket_name, file_key):
"""
Read a CSV file from S3 using boto3 and pandas.
:param bucket_name: Name of the S3 bucket.
:param file_key: Key of the file (including directory path within the bucket).
:param aws_access_key_id: AWS Access Key ID
:param aws_secret_access_key: AWS Secret Access Key
:return: DataFrame containing the CSV data.
"""
# Get the object
s3_object = client.get_object(Bucket=bucket_name, Key=file_key)
# Read the CSV body into a DataFrame
csv_body = s3_object["Body"].read()
df = pd.read_parquet(BytesIO(csv_body))
return df
def read_csv_from_s3(client, bucket_name, file_key, index_col):
"""
Read a CSV file from S3 using boto3 and pandas.
:param bucket_name: Name of the S3 bucket.
:param file_key: Key of the file (including directory path within the bucket).
:param aws_access_key_id: AWS Access Key ID
:param aws_secret_access_key: AWS Secret Access Key
:return: DataFrame containing the CSV data.
"""
# Get the object
s3_object = client.get_object(Bucket=bucket_name, Key=file_key)
# Read the CSV body into a DataFrame
csv_body = s3_object["Body"].read().decode("utf-8")
df = pd.read_csv(StringIO(csv_body), index_col=index_col)
return df
class DataLoader(Protocol):
"""
Interface for all DataLoader classes
"""
@staticmethod
def load(
client: BotoClient, filepath: str, index_col: str | None = None
) -> pd.DataFrame | None:
"""
Loading data from the relevant source
"""
class LocalDataLoader:
"""
Implements the DataLoader Protocol for local files
"""
@staticmethod
def load(
client: BotoClient, filepath: str, index_col: str | None = None
) -> pd.DataFrame:
if not os.path.exists(filepath):
raise FileNotFoundError(f"File not found: {filepath}")
if filepath.endswith(".parquet"):
df = pd.read_parquet(filepath)
if index_col is not None:
df = df.set_index(index_col)
elif filepath.endswith(".csv"):
df = pd.read_csv(filepath, index_col=index_col)
else:
raise ValueError(f"File format not supported for file: {filepath}")
return df
class S3DataLoader:
"""
Implements the DataLoader Protocol for s3 files, hosting locally in a mocked service
"""
@staticmethod
def load(
client: BotoClient, filepath: str, index_col: str | None = None
) -> pd.DataFrame:
if not filepath.startswith("s3://"):
filepath = "s3://" + filepath
filepath_split = filepath.split("s3://")[-1].split("/", 1)
bucket = filepath_split[0]
key = filepath_split[1]
if filepath.endswith(".parquet"):
df = read_parquet_from_s3(
client=client.client, bucket_name=bucket, file_key=key
)
if index_col is not None:
df = df.set_index(index_col)
elif filepath.endswith(".csv"):
df = read_csv_from_s3(
client=client.client,
bucket_name=bucket,
file_key=key,
index_col=index_col,
)
else:
raise ValueError(f"File format not supported for file: {filepath}")
return df
def dataloader_factory(runtime_environment: str | None = None) -> DataLoader:
"""
Use factory pattern to determine which loading method we use
"""
if runtime_environment is None:
runtime_environment = "local"
dataloader_types = {
"local": LocalDataLoader(),
"local-mock": S3DataLoader(),
"dev": S3DataLoader(),
"staging": S3DataLoader(),
"prod": S3DataLoader(),
}
if runtime_environment not in dataloader_types:
raise ValueError("Incorrect runtime environment specified")
return dataloader_types[runtime_environment]

View file

@ -1,26 +0,0 @@
"""
Logger that will be used throughout the application
"""
import logging
def setup_logger():
# Create a logger
logger = logging.getLogger()
# Set the log level
logger.setLevel(logging.INFO)
# Create a formatter
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
# Create a stream handler to direct logs to stdout
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
# Add the stream handler to the logger
logger.addHandler(stream_handler)
return logger
logger = setup_logger()

View file

@ -1,167 +0,0 @@
"""
Generate metrics and enable regeneration of metrics if new metrics are generated
Key tasks:
- Specify metric functions that take in prediction vs actual to generate a metric value
- Given a model and test data, produce a suite of all metrics
"""
import os
import pandas as pd
from pathlib import Path
import seaborn as sns
import matplotlib.pyplot as plt
from core.CloudClient import BotoClient
from core.Logger import logger
from core.Settings import (
RESIDUAL_TRUE_LABEL,
RESIDUAL_PREDICTION_LABEL,
SEABORN_RESIDUAL_AXIS_FONTSIZE,
SEABORN_RESIDUAL_TITLE_FONTSIZE,
SEABORN_RESIDUAL_STYLE,
SEABORN_RESIDUAL_ASPECT_RATIO,
SEABORN_RESIDUAL_PLOT_DPI,
SEABORN_RESIDUAL_RANGE,
SEABORN_RESIDUAL_LINE_COLOUR,
SEABORN_RESIDUAL_LINE_WIDTH,
)
from sklearn.metrics import (
mean_absolute_error,
median_absolute_error,
mean_squared_error,
mean_absolute_percentage_error,
)
# Dummy example of new metric that can be added - must be true and prediction as arguments
def max_error(y_true: pd.Series, y_pred: pd.Series):
return max(y_true - y_pred)
METRIC_TO_APPLY = [
mean_absolute_error,
median_absolute_error,
mean_squared_error,
mean_absolute_percentage_error,
# max_error
]
def sort_by_metric(
data: pd.DataFrame, optimse_metric: str, best_model_column_name: str
) -> pd.DataFrame:
"""
Helper function to sort data frame by metric and append a best model flag
"""
# Ascending as we want lowest error values
data = data.sort_values(optimse_metric, ascending=True).reset_index(drop=True)
data[best_model_column_name] = [False] * len(data)
data.loc[0, best_model_column_name] = True
return data
class Metrics:
"""
All metric functions used to generate a dictionary of metrics
"""
def upload_metrics(self, output_filepath: Path, client: BotoClient) -> None:
"""
Providing a path, this function will save the metrics folders/files.
"""
if client.client is None:
logger.info("In local development mode - no need to upload")
else:
logger.info(f"Saving metrics into s3")
s3_location = client.model_bucket + "/" + str(output_filepath)
self.directory_upload(
client=client,
local_directory=str(output_filepath),
bucket_name=client.model_bucket,
)
logger.info("Save complete")
def directory_upload(self, client, local_directory, bucket_name):
# Iterate through the local directory and upload each file
for root, dirs, files in os.walk(local_directory):
for file in files:
# Determine the local file path and S3 object key
local_file_path = os.path.join(root, file)
s3_object_key = os.path.relpath(local_file_path, local_directory)
# Upload the file to S3
client.client.upload_file(local_file_path, bucket_name, local_file_path)
logger.info(
f"Uploaded {local_file_path} to {bucket_name}/{local_file_path}"
)
@staticmethod
def list_metric_functions() -> list:
"""
Gather all metric functions to run
"""
return [metric_to_apply.__name__ for metric_to_apply in METRIC_TO_APPLY]
@staticmethod
def generate_metric_suite(actuals: pd.Series, predictions: pd.Series) -> pd.Series:
"""
For the model, test data and target, generate predictions and then iterative over all metrics to generate a Series of metric values
"""
metric_dict = {}
for metric_function in METRIC_TO_APPLY:
metric_dict[metric_function.__name__] = metric_function(
actuals, predictions
)
metrics = pd.Series(metric_dict)
return metrics
@staticmethod
def generate_plot_suite():
"""
Can do all metric ploting
"""
@staticmethod
def generate_residual_plot(
actuals: pd.Series,
predictions: pd.Series,
target_column: str,
output_filepath: Path | str,
):
# TODO: can have a model.metric_outputs method
# FOr not just do it here
residual_df = pd.DataFrame(
list(zip(actuals, predictions)),
columns=[RESIDUAL_TRUE_LABEL, RESIDUAL_PREDICTION_LABEL],
)
# image formatting
sns.set(style=SEABORN_RESIDUAL_STYLE)
ax = sns.scatterplot(
x=RESIDUAL_TRUE_LABEL, y=RESIDUAL_PREDICTION_LABEL, data=residual_df
)
ax.set_aspect(SEABORN_RESIDUAL_ASPECT_RATIO)
ax.set_xlabel(f"True {target_column}", fontsize=SEABORN_RESIDUAL_AXIS_FONTSIZE)
ax.set_ylabel(
f"Predicted {target_column}", fontsize=SEABORN_RESIDUAL_AXIS_FONTSIZE
) # ylabel
ax.set_title("Residuals", fontsize=SEABORN_RESIDUAL_TITLE_FONTSIZE)
# Square aspect ratio
ax.plot(
SEABORN_RESIDUAL_RANGE,
SEABORN_RESIDUAL_RANGE,
SEABORN_RESIDUAL_LINE_COLOUR,
linewidth=SEABORN_RESIDUAL_LINE_WIDTH,
)
plt.tight_layout()
plt.savefig(output_filepath, dpi=SEABORN_RESIDUAL_PLOT_DPI)

View file

@ -1,114 +0,0 @@
"""
"""
from io import StringIO
import pandas as pd
from pathlib import Path
from core.Logger import logger
from core.CloudClient import BotoClient
from core.Metrics import Metrics
from core.Settings import BEST_MODEL_COLUMN_NAME
def read_csv_from_s3(client, bucket_name, file_key, index_col):
"""
Read a CSV file from S3 using boto3 and pandas.
:param bucket_name: Name of the S3 bucket.
:param file_key: Key of the file (including directory path within the bucket).
:param aws_access_key_id: AWS Access Key ID
:param aws_secret_access_key: AWS Secret Access Key
:return: DataFrame containing the CSV data.
"""
# Get the object
s3_object = client.get_object(Bucket=bucket_name, Key=file_key)
# Read the CSV body into a DataFrame
csv_body = s3_object["Body"].read().decode("utf-8")
df = pd.read_csv(StringIO(csv_body), index_col=index_col)
return df
class RegistryHandler:
"""
Handles the loading of the registry depending on the environment
"""
def load_registry(self, registry_path: Path, client: BotoClient, metrics: Metrics):
"""
Depening on the environment, we will have to load from locally or s3 (mock/real)
"""
if client.client is None:
logger.info("Using local development - no need for s3 load")
return self.load_local_registry(
registry_path=registry_path, metrics=metrics
)
logger.info(f"Check if registry exists")
check_exists = client.client.list_objects_v2(
Bucket=client.model_bucket, Prefix=str(registry_path)
)
if "Contents" in check_exists:
logger.info("Loading existing registry")
registry_df = read_csv_from_s3(
client=client.client,
bucket_name=client.model_bucket,
file_key=str(registry_path),
index_col=None,
)
else:
logger.info("No registry found - creating new one")
registry_df = self.create_new_registry(metrics=metrics)
return registry_df
def load_local_registry(self, registry_path: Path, metrics: Metrics):
"""
In local development mode, load the registry
"""
if registry_path.exists():
logger.info("Registry file found - Loading into Dataframe")
registry_df = pd.read_csv(registry_path, index_col=None)
else:
logger.info("No registry found - creating new one")
registry_df = self.create_new_registry(metrics=metrics)
return registry_df
def create_new_registry(self, metrics: Metrics):
"""
If no registry is found, create a new one
"""
# TODO: Moved columns into settings: MODEL_DETAILS and Metrics class columns
columns = [
BEST_MODEL_COLUMN_NAME,
"model_type",
"model_name",
"model_location",
] + metrics.list_metric_functions()
registry_df = pd.DataFrame(columns=columns)
return registry_df
def save_registry(self, output_filepath: Path, client: BotoClient) -> None:
"""
Providing a path, this function will save the model to be used.
"""
if client.client is None:
logger.info("In local development mode - no need for s3 client")
else:
logger.info(f"Saving registry into s3")
s3_location = client.model_bucket + "/" + str(output_filepath)
client.client.upload_file(
str(output_filepath), client.model_bucket, str(output_filepath)
)
logger.info("Save complete")

View file

@ -1,221 +0,0 @@
# Using a simply python file as settings for now
# TODO: migrate to dynaconf
from pathlib import Path
METRIC_FILENAME = "metrics.csv"
OPTIMISE_METRIC = "mean_absolute_error"
BEST_MODEL_COLUMN_NAME = "best_model"
# TODO: remove these setting elsewhere for CML
RESIDUAL_TRUE_LABEL = "true"
RESIDUAL_PREDICTION_LABEL = "pred"
RESIDUAL_FILE = "residual.png"
SEABORN_RESIDUAL_AXIS_FONTSIZE = 12
SEABORN_RESIDUAL_TITLE_FONTSIZE = 22
SEABORN_RESIDUAL_STYLE = "whitegrid"
SEABORN_RESIDUAL_ASPECT_RATIO = "equal"
SEABORN_RESIDUAL_PLOT_DPI = 120
SEABORN_RESIDUAL_RANGE = [-100, 100]
SEABORN_RESIDUAL_LINE_COLOUR = "black"
SEABORN_RESIDUAL_LINE_WIDTH = 1
# Can move to a hyperparmeters file
# If anything we might want to have a file that can be loaded and sent to this script
MODEL_HYPERPARAMETERS = {
"autogluon": {
"problem_type": "regression",
"eval_metric": "mean_absolute_error",
"time_limit": 45,
"presets": "medium_quality",
"excluded_model_types": None,
}
}
TIMESTAMP_FORMAT = "%Y_%m_%d_%H_%M_%S"
RANDOM_SEED = 0
SUBSAMPLE_FACTOR = 200
TRAIN_AND_VALIDATION_DATA_NAME = "train_validation_data.parquet"
TEST_DATA_NAME = "test_data.parquet"
REGISTRY_FILE = "model_registry.csv"
MODEL_DIRECTORY = "model_directory"
BASE_REGISTRY_PATH = Path(__file__).parent.parent / MODEL_DIRECTORY
PREDICTION_LOCATION = Path("predictions")
PREDICTION_FILE = "prediction.json"
METADATA_FILE = "metadata.json"
MODEL_FOLDER = "model"
METRICS_FOLDER = "metrics"
DEPLOYMENT_FOLDER = "deployment"
TOTAL_FLOOR_AREA_NATIONAL_AVERAGE = 70
FLOOR_HEIGHT_NATIONAL_AVERAGE = 2.45
AVERAGE_FIXED_FEATURES = [
"TOTAL_FLOOR_AREA",
"FLOOR_HEIGHT",
"FIXED_LIGHTING_OUTLETS_COUNT",
]
COLUMNS_TO_MERGE_ON = [
"PROPERTY_TYPE",
"BUILT_FORM",
"CONSTRUCTION_AGE_BAND",
"NUMBER_HABITABLE_ROOMS",
"NUMBER_HEATED_ROOMS",
]
FULLY_GLAZED_DESCRIPTIONS = [
"Fully double glazed",
"High performance glazing",
"Fully triple glazed",
"Full secondary glazing",
"Multiple glazing throughout",
]
FIXED_FEATURES = [
"PROPERTY_TYPE",
"BUILT_FORM",
"CONSTRUCTION_AGE_BAND",
"NUMBER_HABITABLE_ROOMS",
"CONSTITUENCY",
"NUMBER_HEATED_ROOMS",
"FIXED_LIGHTING_OUTLETS_COUNT",
]
COMPONENT_FEATURES = [
"TRANSACTION_TYPE",
"WALLS_DESCRIPTION",
"FLOOR_DESCRIPTION",
"LIGHTING_DESCRIPTION",
"ROOF_DESCRIPTION",
"MAINHEAT_DESCRIPTION",
"HOTWATER_DESCRIPTION",
"MAIN_FUEL",
"MECHANICAL_VENTILATION",
"SECONDHEAT_DESCRIPTION",
"ENERGY_TARIFF", # Not sure if this is relevant
"SOLAR_WATER_HEATING_FLAG",
"PHOTO_SUPPLY",
"WINDOWS_DESCRIPTION",
"GLAZED_TYPE",
"MULTI_GLAZE_PROPORTION",
"LOW_ENERGY_LIGHTING",
"NUMBER_OPEN_FIREPLACES",
"MAINHEATCONT_DESCRIPTION",
"EXTENSION_COUNT",
"TOTAL_FLOOR_AREA",
"FLOOR_HEIGHT",
# 'GLAZED_AREA', # May not need this since we have MULTI_GLAZE_PROPORTION
]
# For these fields, we take the latest value if we have multiple values
# Since more recent EPCs have been conducted with more rigour, we assume that the latest value is
# the most accurate
LATEST_FIELD = [
"NUMBER_HABITABLE_ROOMS",
"NUMBER_HEATED_ROOMS",
"FIXED_LIGHTING_OUTLETS_COUNT",
"CONSTRUCTION_AGE_BAND", # This is a field we're probably want to use verisk data for
]
# If we see thee features changing, we don't use the EPC, since deem it not to be reliable
MANDATORY_FIXED_FEATURES = ["PROPERTY_TYPE", "BUILT_FORM", "CONSTITUENCY"]
# For particularly old EPC data, we have inconsistent records so we'll only include EPCS that were
# conducted after 2010, since SAP09 was introduced in 2009 an later SAP12 was introduced in England
# and Wales from 31 July 2014
EARLIEST_EPC_DATE = "2014-08-01"
RDSAP_RESPONSE = "CURRENT_ENERGY_EFFICIENCY"
HEAT_DEMAND_RESPONSE = "ENERGY_CONSUMPTION_CURRENT"
CARBON_RESPONSE = "CO2_EMISSIONS_CURRENT"
def ordinal(n):
if 10 <= n % 100 <= 20:
suffix = "th"
else:
suffix = {1: "st", 2: "nd", 3: "rd"}.get(n % 10, "th")
return str(n) + suffix
FLOOR_LEVEL_MAP = {
"Basement": -1,
"Ground": 0,
"ground floor": 0,
"20+": 20,
"21st or above": 21,
**{str(i).zfill(2): i for i in range(0, 21)},
**{ordinal(i): i for i in range(-1, 21)},
**{str(i): i for i in range(-1, 21)},
**{i: i for i in range(-1, 21)},
}
BUILT_FORM_REMAP = {
"Enclosed End-Terrace": "End-Terrace",
"Enclosed Mid-Terrace": "Mid-Terrace",
}
DATA_PROCESSOR_SETTINGS = {
"low_memory": False,
"epc_minimum_count": 1,
"column_mappings": {"UPRN": [int, str]},
}
# This has a manual mapping of the column types required
COLUMNTYPES = {
'UPRN': 'object', 'TOTAL_FLOOR_AREA': 'float64', 'FLOOR_HEIGHT': 'float64', 'PROPERTY_TYPE': 'object',
'BUILT_FORM': 'object', 'CONSTITUENCY': 'object', 'NUMBER_HABITABLE_ROOMS': 'float64',
'NUMBER_HEATED_ROOMS': 'float64', 'FIXED_LIGHTING_OUTLETS_COUNT': 'float64',
'CONSTRUCTION_AGE_BAND': 'object',
'TRANSACTION_TYPE': 'object',
'WALLS_DESCRIPTION': 'object',
'FLOOR_DESCRIPTION': 'object',
'LIGHTING_DESCRIPTION': 'object',
'ROOF_DESCRIPTION': 'object',
'MAINHEAT_DESCRIPTION': 'object',
'HOTWATER_DESCRIPTION': 'object', 'MAIN_FUEL': 'object',
'MECHANICAL_VENTILATION': 'object',
'SECONDHEAT_DESCRIPTION': 'object', 'ENERGY_TARIFF': 'object',
'SOLAR_WATER_HEATING_FLAG': 'object', 'PHOTO_SUPPLY': 'float64',
'WINDOWS_DESCRIPTION': 'object',
'GLAZED_TYPE': 'object',
'MULTI_GLAZE_PROPORTION': 'float64',
'LOW_ENERGY_LIGHTING': 'float64',
'NUMBER_OPEN_FIREPLACES': 'float64',
'MAINHEATCONT_DESCRIPTION': 'object',
'EXTENSION_COUNT': 'float64',
'LODGEMENT_DATE': 'object',
}
# For modelling, we don't allow records with more than 100 SAP points
MAX_SAP_SCORE = 100
fill_na_map = {
# There are some descriptions, such as "To be used only when there is no heating/hot-water system or data is from
# a community network" that could be clustered with unknown fuel
"MAIN_FUEL": "UNKNOWN",
"MECHANICAL_VENTILATION": "Unknown",
"SECONDHEAT_DESCRIPTION": "None",
"ENERGY_TARIFF": "Unknown",
# We set solar water heating flag to N - we could investigate using a different category entirely
"SOLAR_WATER_HEATING_FLAG": "N",
"GLAZED_TYPE": "not defined",
"MULTI_GLAZE_PROPORTION": 0,
"LOW_ENERGY_LIGHTING": 0,
"MAINHEATCONT_DESCRIPTION": "Unknown",
"EXTENSION_COUNT": 0,
"NUMBER_OPEN_FIREPLACES": 0
}
# After the property descriptions have been re-remapped, we expect these features to be fixed
FIXED_DESCRIPTON_MAPPED_FEATURES = [
'another_property_below', 'is_roof_room', 'is_granite_or_whinstone', 'is_flat', 'is_suspended',
'has_dwelling_above', 'is_as_built', 'is_to_external_air', 'is_cob', 'is_pitched', 'is_solid', 'is_at_rafters',
'is_solid_brick', 'is_loft', 'is_system_built', 'is_timber_frame', 'is_sandstone_or_limestone', 'is_filled_cavity',
'is_cavity_wall', 'is_thatched', 'is_to_unheated_space'
]

View file

@ -1,91 +0,0 @@
import boto3
from botocore.exceptions import NoCredentialsError
import json
from io import StringIO
import os
import logging
from predictions import prediction
logger = logging.getLogger()
logger.setLevel(logging.INFO)
RUNTIME_ENVIRONMENT = os.environ.get("RUNTIME_ENVIRONMENT", "dev")
def upload_dataframe_to_s3(df, bucket, s3_file_name):
"""
Upload a pandas DataFrame to an S3 bucket as CSV
:param df: DataFrame to upload
:param bucket: Bucket to upload to
:param s3_file_name: S3 object name
:return: True if file was uploaded, else False
"""
# Initialize the S3 client
s3 = boto3.client('s3')
csv_buffer = StringIO()
# Write the DataFrame to the buffer as CSV
df.to_csv(csv_buffer, index=False)
try:
# Upload the CSV from the buffer to S3
s3.put_object(Bucket=bucket, Key=s3_file_name, Body=csv_buffer.getvalue())
print(f"Successfully uploaded DataFrame to {bucket}/{s3_file_name}")
return True
except NoCredentialsError:
print("Credentials not available")
return False
def handler(event, context):
"""
Take in event and trigger the prediction pipeline
"""
logger.info("received event: " + str(event))
try:
body = json.loads(event["body"]) if not isinstance(event["body"], dict) else event["body"]
data_path = body["file_location"]
property_id = body["property_id"]
portfolio_id = body["portfolio_id"]
created_at = body["created_at"]
# We could fix the model path but for the moment, we just take the best model path based on the registry
outputs = prediction(model_path=None, data_path=data_path)
# Store into s3, with key of {portfolio_id}-{property_id}
storage_filepath = f"{portfolio_id}/{property_id}/{created_at}.csv"
upload_dataframe_to_s3(
df=outputs,
bucket=f"retrofit-sap-predictions-{RUNTIME_ENVIRONMENT}",
s3_file_name=storage_filepath
)
return {
"statusCode": 200,
"body": json.dumps({
"message": "Successfully processed input",
"storage_filepath": storage_filepath
})
}
except (Exception, KeyError, ValueError) as e:
logger.info("Prediction failed")
logger.info(e)
return {
"statusCode": 500,
"body": json.dumps({
"message": "Prediction failed",
"error": str(e)
})
}
if __name__ == "__main__":
handler()