mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
351 lines
15 KiB
Python
351 lines
15 KiB
Python
import aiohttp
|
|
import asyncio
|
|
import pandas as pd
|
|
from typing import List, Dict
|
|
from tqdm import tqdm
|
|
import requests
|
|
from requests.exceptions import RequestException
|
|
from utils.logger import setup_logger
|
|
from utils.s3 import save_dataframe_to_s3_parquet, read_dataframe_from_s3_parquet
|
|
|
|
# Module-level logger used by ModelApi below; configured by the project's setup_logger helper.
logger = setup_logger()
|
|
|
|
|
|
class ModelApi:
|
|
_aiohttp_session: aiohttp.ClientSession = None
|
|
|
|
MODEL_PREFIXES = [
|
|
"sap_change_predictions",
|
|
"heat_demand_predictions",
|
|
"carbon_change_predictions",
|
|
]
|
|
|
|
KWH_MODEL_PREFIXES = ["heating_kwh_predictions", "hotwater_kwh_predictions"]
|
|
|
|
BASELINE_MODEL_PREFIXES = [
|
|
"retrofit_sap_baseline_predictions",
|
|
"retrofit_heat_baseline_predictions",
|
|
"retrofit_carbon_baseline_predictions",
|
|
]
|
|
|
|
MODEL_URLS: Dict[str, str] = {
|
|
"sap_change_predictions": "sapmodel",
|
|
"heat_demand_predictions": "heatmodel",
|
|
"carbon_change_predictions": "carbonmodel",
|
|
"hotwater_kwh_predictions": "hotwaterkwhmodel",
|
|
"heating_kwh_predictions": "heatingkwhmodel",
|
|
# Baseline prediction models
|
|
"retrofit_sap_baseline_predictions": "sapbaselinemodel",
|
|
"retrofit_heat_baseline_predictions": "heatbaselinemodel",
|
|
"retrofit_carbon_baseline_predictions": "carbonbaselinemodel",
|
|
}
|
|
|
|
def __init__(
|
|
self,
|
|
portfolio_id,
|
|
timestamp,
|
|
prediction_buckets,
|
|
base_url="https://api.dev.hestia.homes",
|
|
max_retries=2,
|
|
):
|
|
self.base_url = base_url
|
|
self.portfolio_id = portfolio_id
|
|
self.timestamp = timestamp
|
|
self.prediction_buckets = prediction_buckets
|
|
self.max_retries = max_retries
|
|
self.semaphore = asyncio.Semaphore(3)
|
|
|
|
@staticmethod
|
|
def get_aiohttp_session():
|
|
if ModelApi._aiohttp_session is None or ModelApi._aiohttp_session.closed:
|
|
connector = aiohttp.TCPConnector(limit=10)
|
|
ModelApi._aiohttp_session = aiohttp.ClientSession(connector=connector)
|
|
return ModelApi._aiohttp_session
|
|
|
|
@staticmethod
|
|
async def close_aiohttp_session():
|
|
if ModelApi._aiohttp_session and not ModelApi._aiohttp_session.closed:
|
|
await ModelApi._aiohttp_session.close()
|
|
ModelApi._aiohttp_session = None
|
|
|
|
@staticmethod
|
|
def predictions_template():
|
|
return {key: pd.DataFrame() for key in ModelApi.MODEL_URLS.keys()}
|
|
|
|
def upload_scoring_data(self, df: pd.DataFrame, bucket: str, model_prefix: str) -> str:
|
|
file_location = f"{model_prefix}/{self.portfolio_id}/{self.timestamp}.parquet"
|
|
logger.info(f"Storing scoring data to s3: {file_location}")
|
|
save_dataframe_to_s3_parquet(df=df, bucket_name=bucket, file_key=file_location)
|
|
return file_location
|
|
|
|
def predict(self, file_location, model_prefix: str):
|
|
"""Makes a POST request to the SAP Change Model API with the provided parameters.
|
|
|
|
Args:
|
|
file_location (str): The file location to be passed in the request payload.
|
|
model_prefix (str): The model prefix to be used in the request URL.
|
|
|
|
Returns:
|
|
dict: The API response as a dictionary if the request was successful, None otherwise.
|
|
"""
|
|
logger.info(f"Making request to {model_prefix} change api")
|
|
url = f"{self.base_url}/{self.MODEL_URLS[model_prefix]}/predict"
|
|
payload = {
|
|
"file_location": file_location,
|
|
"property_id": "", # This should get removed
|
|
"portfolio_id": self.portfolio_id,
|
|
"created_at": self.timestamp
|
|
}
|
|
|
|
try:
|
|
response = requests.post(url, json=payload, headers={"Content-Type": "application/json"}, timeout=120)
|
|
|
|
# Check if the response status code is 2xx (success)
|
|
response.raise_for_status()
|
|
|
|
# Return the JSON response as a Python dictionary
|
|
return response.json()
|
|
except RequestException as e:
|
|
logger.error(f"An error occurred: {e}")
|
|
# In case of an error, you might want to return None or raise the exception
|
|
# depending on how you want to handle errors in your application
|
|
return None
|
|
|
|
async def predict_async(self, file_location, model_prefix: str, session: aiohttp.ClientSession):
|
|
url = f"{self.base_url}/{self.MODEL_URLS[model_prefix]}/predict"
|
|
payload = {
|
|
"file_location": file_location,
|
|
"property_id": "",
|
|
"portfolio_id": self.portfolio_id,
|
|
"created_at": self.timestamp
|
|
}
|
|
|
|
headers = {
|
|
"Content-Type": "application/json",
|
|
"User-Agent": "model-engine-lambda/1.0"
|
|
}
|
|
|
|
async with self.semaphore:
|
|
# await asyncio.sleep(random.uniform(0.3, 1.2))
|
|
try:
|
|
async with session.post(url, json=payload, headers=headers, timeout=120) as response:
|
|
if response.status != 200:
|
|
text = await response.text()
|
|
logger.error(
|
|
f"\n--- API ERROR [{model_prefix}] ---\n"
|
|
f"Status Code: {response.status}\n"
|
|
f"URL: {url}\n"
|
|
f"Payload: {payload}\n"
|
|
f"Response Text:\n{text}\n"
|
|
f"----------------------------"
|
|
)
|
|
return None
|
|
return await response.json()
|
|
except aiohttp.ClientError as e:
|
|
logger.error(f"[{model_prefix}] HTTP error: {e}")
|
|
return None
|
|
except Exception as e:
|
|
logger.exception(f"[{model_prefix}] Unexpected error: {e}")
|
|
return None
|
|
|
|
@staticmethod
|
|
def extract_phase(recommendation_id):
|
|
if 'phase=' in recommendation_id:
|
|
extracted = recommendation_id.split('phase=')[1]
|
|
return int(extracted.strip())
|
|
else:
|
|
return None
|
|
|
|
def predict_all(
|
|
self, df: pd.DataFrame,
|
|
bucket: str,
|
|
model_prefixes: List[str] | None = None,
|
|
extract_ids: bool = True,
|
|
extract_uprn: bool = False
|
|
) -> dict:
|
|
|
|
"""
|
|
For each model prefix, this method will upload the scoring data to s3 and then make a request to the
|
|
model api to generate predictions. The predictions will be stored in the predictions bucket.
|
|
This method will then fetch the stored predictions and format them, returning all of the predictions as
|
|
a dictionary of panaas dataframes
|
|
:param df: Pandas dataframe with scoring data to be uploaded to s3
|
|
:param bucket: Name of the bucket in s3 to upload to
|
|
:param model_prefixes: List of model prefixes to generate predictions for. If None, all model prefixes will be
|
|
used
|
|
:param extract_ids: Boolean to determine if the property_id and recommendation_id should be extracted from the
|
|
scoring data
|
|
:param extract_uprn: Boolean to determine if the uprn should be extracted from the scoring data
|
|
id column
|
|
:return:
|
|
"""
|
|
|
|
model_prefixes = self.MODEL_PREFIXES if model_prefixes is None else model_prefixes
|
|
|
|
predictions = {}
|
|
for model_prefix in model_prefixes:
|
|
logger.info(f"Scoring for model prefix: {model_prefix}")
|
|
file_location = self.upload_scoring_data(df, bucket, model_prefix)
|
|
response = self.predict(
|
|
"s3://{DATA_BUCKET}/".format(DATA_BUCKET=bucket) + file_location, model_prefix
|
|
)
|
|
|
|
predictions_bucket = self.prediction_buckets[model_prefix]
|
|
|
|
# Retrieve the predictions
|
|
predictions_df = pd.DataFrame(
|
|
read_dataframe_from_s3_parquet(
|
|
bucket_name=predictions_bucket,
|
|
file_key=response["storage_filepath"].split(predictions_bucket + "/")[1]
|
|
)
|
|
)
|
|
|
|
predictions_df['predictions'] = predictions_df["predictions"].astype(float).round(1)
|
|
if extract_ids:
|
|
predictions_df[['property_id', 'recommendation_id']] = predictions_df['id'].str.split('+', expand=True)
|
|
# To grab the phase, we pull the integer after "phase=" in the recommendation_id. We can do this with a
|
|
# string split on phase= and then grab the second element of the resulting list. We could also use a
|
|
# regular expression to do this but we use the string split method here, for safety.
|
|
# We may not always have a phase to split on, so we need to handle this case. We can do this by using
|
|
# the str[1] method to grab the second element of the resulting list. We then grab the first
|
|
# character of this
|
|
# string to get the phase. We then convert this to an integer.
|
|
# Convert back to int
|
|
predictions_df['phase'] = predictions_df['recommendation_id'].apply(self.extract_phase)
|
|
|
|
if extract_uprn and "uprn" in df.columns:
|
|
predictions_df["uprn"] = df["uprn"].values
|
|
|
|
predictions[model_prefix] = predictions_df
|
|
|
|
return predictions
|
|
|
|
async def predict_all_async(self, df, bucket, model_prefixes=None, extract_ids=True) -> dict:
|
|
model_prefixes = self.MODEL_PREFIXES if model_prefixes is None else model_prefixes
|
|
predictions = {}
|
|
|
|
session = self.get_aiohttp_session()
|
|
|
|
async def run_model(model_prefix):
|
|
logger.info(f"Scoring for model prefix: {model_prefix}")
|
|
file_location = self.upload_scoring_data(df, bucket, model_prefix)
|
|
response = await self.predict_async(f"s3://{bucket}/" + file_location, model_prefix, session=session)
|
|
return model_prefix, response
|
|
|
|
# Run all model calls concurrently
|
|
results = await asyncio.gather(
|
|
*(run_model(mp) for mp in model_prefixes),
|
|
return_exceptions=True
|
|
)
|
|
|
|
for model_prefix, response in results:
|
|
if response and not isinstance(response, Exception):
|
|
predictions_bucket = self.prediction_buckets[model_prefix]
|
|
predictions_df = pd.DataFrame(
|
|
read_dataframe_from_s3_parquet(
|
|
bucket_name=predictions_bucket,
|
|
file_key=response["storage_filepath"].split(predictions_bucket + "/")[1]
|
|
)
|
|
)
|
|
predictions_df['predictions'] = predictions_df["predictions"].astype(float).round(1)
|
|
if extract_ids:
|
|
predictions_df[['property_id', 'recommendation_id']] = predictions_df['id'].str.split('+',
|
|
expand=True)
|
|
predictions_df['phase'] = predictions_df['recommendation_id'].apply(self.extract_phase)
|
|
|
|
predictions[model_prefix] = predictions_df
|
|
|
|
return predictions
|
|
|
|
def paginated_predictions(self, data, bucket, batch_size, model_prefixes=None, extract_ids=True):
|
|
all_predictions = self.predictions_template()
|
|
to_loop_over = range(0, data.shape[0], batch_size)
|
|
for chunk in tqdm(to_loop_over, total=len(to_loop_over)):
|
|
predictions_dict = self.predict_all(
|
|
df=data.iloc[chunk:chunk + batch_size],
|
|
bucket=bucket,
|
|
model_prefixes=model_prefixes,
|
|
extract_ids=extract_ids
|
|
)
|
|
|
|
# Append the predictions to the predictions dictionary
|
|
for key, scored in predictions_dict.items():
|
|
all_predictions[key] = pd.concat([all_predictions[key], scored])
|
|
|
|
return all_predictions
|
|
|
|
async def async_warm_up_lambdas(self, model_prefies=None):
|
|
logger.info("Asynchronously warming up Lambda functions...")
|
|
model_prefixes = self.MODEL_PREFIXES if model_prefies is None else model_prefies
|
|
session = self.get_aiohttp_session()
|
|
tasks = [
|
|
self._send_warm_up_request(
|
|
session, f"{self.base_url}/{self.MODEL_URLS[model_prefix]}/predict",
|
|
model_prefix
|
|
)
|
|
for model_prefix in model_prefixes
|
|
]
|
|
await asyncio.gather(*tasks, return_exceptions=True)
|
|
logger.info("Lambda functions are warmed up and ready to go!")
|
|
|
|
@staticmethod
|
|
async def _send_warm_up_request(session, url, model_prefix):
|
|
try:
|
|
json_payload = {
|
|
"file_location": "s3://warm-up-placeholder",
|
|
"portfolio_id": 0,
|
|
"property_id": "",
|
|
"created_at": "2020-01-01T00:00:00",
|
|
"warm": True
|
|
# The presence of this key will send the api down a specific warm up route, to call
|
|
# prediction and load the font manager, because that is a key bottleneck for cold starts
|
|
}
|
|
async with session.post(url, json=json_payload, timeout=10) as response:
|
|
text = await response.text()
|
|
if response.status != 200:
|
|
logger.warning(
|
|
f"[{model_prefix}] Warm-up failed. Status: {response.status} | Body:\n{text}"
|
|
)
|
|
else:
|
|
logger.info(f"[{model_prefix}] Warmed up with status {response.status}")
|
|
except aiohttp.ClientError as e:
|
|
logger.warning(f"[{model_prefix}] Warm-up ClientError: {e}")
|
|
except Exception as e:
|
|
logger.warning(f"[{model_prefix}] Warm-up Exception: {e}")
|
|
|
|
async def async_paginated_predictions(self, data, bucket, batch_size, model_prefixes=None, extract_ids=True):
|
|
all_predictions = self.predictions_template()
|
|
for chunk in tqdm(range(0, data.shape[0], batch_size)):
|
|
attempts = 0
|
|
while attempts <= self.max_retries:
|
|
try:
|
|
predictions_dict = await self.predict_all_async(
|
|
df=data.iloc[chunk:chunk + batch_size],
|
|
bucket=bucket,
|
|
model_prefixes=model_prefixes,
|
|
extract_ids=extract_ids
|
|
)
|
|
for key, scored in predictions_dict.items():
|
|
all_predictions[key] = pd.concat([all_predictions[key], scored])
|
|
break
|
|
except Exception as e:
|
|
attempts += 1
|
|
logger.error(f"Batch {chunk}-{chunk + batch_size} failed (Attempt {attempts}): {e}")
|
|
await asyncio.sleep(2 ** attempts) # exponential backoff
|
|
await self.close_aiohttp_session()
|
|
|
|
# Ensure stable output structure for the datagrame to be utilised by other functions downstream
|
|
for k in all_predictions.keys():
|
|
if all_predictions[k].empty:
|
|
col_template = ['id', 'predictions', 'property_id', 'recommendation_id', 'phase'] if (
|
|
extract_ids) else ['id', 'predictions']
|
|
|
|
all_predictions[k] = pd.DataFrame(
|
|
columns=col_template
|
|
)
|
|
|
|
return all_predictions
|
|
|
|
@property
|
|
def models_for_warm_up(self):
|
|
return self.KWH_MODEL_PREFIXES + self.MODEL_PREFIXES + self.BASELINE_MODEL_PREFIXES
|