Merge pull request #466 from Hestia-Homes/debugging-api

beefed up api async logic
This commit is contained in:
KhalimCK 2025-07-22 20:52:13 +01:00 committed by GitHub
commit 21b82c34fb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 98 additions and 143 deletions

View file

@ -517,13 +517,6 @@ async def model_engine(body: PlanTriggerRequest):
input_properties = []
for config in tqdm(plan_input):
if config["landlord_property_id"] in ["LE113NWIC95", "NG241FBCT", "NG51BNIC"]:
continue
if not pd.isnull(config.get("uprn")):
if int(float(config.get("uprn"))) < 0:
continue
# We validate each record in the file. If the record is NOT valid, we need to handle this accordingly
uprn = config.get("uprn", None)
if pd.isnull(uprn):
@ -532,7 +525,7 @@ async def model_engine(body: PlanTriggerRequest):
uprn = int(float(uprn))
epc_searcher = SearchEpc(
address1=config["address"],
address1=str(config["address"]),
postcode=config["postcode"],
uprn=uprn,
auth_token=get_settings().EPC_AUTH_TOKEN,

View file

@ -1,3 +1,4 @@
import json
import aiohttp
import asyncio
import pandas as pd
@ -11,6 +12,8 @@ logger = setup_logger()
class ModelApi:
_aiohttp_session: aiohttp.ClientSession = None
MODEL_PREFIXES = [
"sap_change_predictions",
"heat_demand_predictions",
@ -54,44 +57,30 @@ class ModelApi:
self.portfolio_id = portfolio_id
self.timestamp = timestamp
self.prediction_buckets = prediction_buckets
self.max_retries = max_retries
self.semaphore = asyncio.Semaphore(5)
@staticmethod
def get_aiohttp_session():
if ModelApi._aiohttp_session is None or ModelApi._aiohttp_session.closed:
connector = aiohttp.TCPConnector(limit=10)
ModelApi._aiohttp_session = aiohttp.ClientSession(connector=connector)
return ModelApi._aiohttp_session
@staticmethod
async def close_aiohttp_session():
if ModelApi._aiohttp_session and not ModelApi._aiohttp_session.closed:
await ModelApi._aiohttp_session.close()
ModelApi._aiohttp_session = None
@staticmethod
def predictions_template():
return {
"sap_change_predictions": pd.DataFrame(),
"heat_demand_predictions": pd.DataFrame(),
"carbon_change_predictions": pd.DataFrame(),
"hotwater_kwh_predictions": pd.DataFrame(),
"heating_kwh_predictions": pd.DataFrame(),
}
return {key: pd.DataFrame() for key in ModelApi.MODEL_URLS.keys()}
def upload_scoring_data(self, df: pd.DataFrame, bucket: str, model_prefix: str) -> str:
"""
The sap model api needs a scoring data that is sitting in s3 to use as a dataset to score on
This method allows the user to upload a table as a parquet file. This method will return the file
location, which can be used as the file location in the predict() method
:param df: Pandas dataframe with scoring data to be uploaded to s3
:param bucket: Name of the bucket in s3 to upload to
:param model_prefix: The model prefix to be used in the file location
:return:
"""
# if model_prefix not in self.MODEL_PREFIXES:
# raise ValueError(f"Model prefix specified is not in {self.MODEL_PREFIXES}")
# Store parquet file in s3 for scoring
file_location = f"{model_prefix}/{self.portfolio_id}/{self.timestamp}.parquet"
logger.info("Storing scoring data to s3")
save_dataframe_to_s3_parquet(
df=df,
bucket_name=bucket,
file_key=file_location
)
logger.info(f"Storing scoring data to s3: {file_location}")
save_dataframe_to_s3_parquet(df=df, bucket_name=bucket, file_key=file_location)
return file_location
def predict(self, file_location, model_prefix: str):
@ -127,25 +116,25 @@ class ModelApi:
# depending on how you want to handle errors in your application
return None
async def predict_async(self, file_location, model_prefix: str):
"""Makes an asynchronous POST request to the Model API with the provided parameters."""
logger.info(f"Making request to {model_prefix} model API")
async def predict_async(self, file_location, model_prefix: str, session: aiohttp.ClientSession):
url = f"{self.base_url}/{self.MODEL_URLS[model_prefix]}/predict"
payload = {
"file_location": file_location,
"property_id": "", # This should get removed
"property_id": "",
"portfolio_id": self.portfolio_id,
"created_at": self.timestamp
}
async with aiohttp.ClientSession() as session:
try:
async with session.post(
url, json=payload, headers={"Content-Type": "application/json"}, timeout=120
) as response:
text = await response.text()
headers = {
"Content-Type": "application/json",
"User-Agent": "model-engine-lambda/1.0"
}
async with self.semaphore:
try:
async with session.post(url, json=payload, headers=headers, timeout=120) as response:
if response.status != 200:
text = await response.text()
logger.error(
f"\n--- API ERROR [{model_prefix}] ---\n"
f"Status Code: {response.status}\n"
@ -155,10 +144,7 @@ class ModelApi:
f"----------------------------"
)
return None
# Successful response
return aiohttp.helpers.json.loads(text)
return await response.json()
except aiohttp.ClientError as e:
logger.error(f"[{model_prefix}] HTTP error: {e}")
return None
@ -227,41 +213,35 @@ class ModelApi:
return predictions
async def predict_all_async(self, df, bucket, model_prefixes=None, extract_ids=True) -> dict:
"""Uploads data and makes asynchronous requests to the model APIs for predictions."""
model_prefixes = self.MODEL_PREFIXES if model_prefixes is None else model_prefixes
predictions = {}
tasks = []
async with aiohttp.ClientSession() as session:
for model_prefix in model_prefixes:
logger.info(f"Scoring for model prefix: {model_prefix}")
logger.info("Uploading scoring data to S3")
file_location = self.upload_scoring_data(df, bucket, model_prefix)
logger.info("Data uploaded to S3, now making prediction request")
# Schedule the prediction request as a coroutine
tasks.append(
self.predict_async(f"s3://{bucket}/" + file_location, model_prefix)
)
# Gather all asynchronous tasks (execute them concurrently)
responses = await asyncio.gather(*tasks, return_exceptions=True)
session = self.get_aiohttp_session()
for model_prefix, response in zip(model_prefixes, responses):
if response:
predictions_bucket = self.prediction_buckets[model_prefix]
predictions_df = pd.DataFrame(
read_dataframe_from_s3_parquet(
bucket_name=predictions_bucket,
file_key=response["storage_filepath"].split(predictions_bucket + "/")[1]
)
for model_prefix in model_prefixes:
logger.info(f"Scoring for model prefix: {model_prefix}")
file_location = self.upload_scoring_data(df, bucket, model_prefix)
tasks.append(self.predict_async(f"s3://{bucket}/" + file_location, model_prefix, session=session))
responses = await asyncio.gather(*tasks, return_exceptions=True)
for model_prefix, response in zip(model_prefixes, responses):
if response:
predictions_bucket = self.prediction_buckets[model_prefix]
predictions_df = pd.DataFrame(
read_dataframe_from_s3_parquet(
bucket_name=predictions_bucket,
file_key=response["storage_filepath"].split(predictions_bucket + "/")[1]
)
predictions_df['predictions'] = predictions_df["predictions"].astype(float).round(1)
if extract_ids:
predictions_df[['property_id', 'recommendation_id']] = predictions_df['id'].str.split('+',
expand=True)
predictions_df['phase'] = predictions_df['recommendation_id'].apply(self.extract_phase)
)
predictions_df['predictions'] = predictions_df["predictions"].astype(float).round(1)
if extract_ids:
predictions_df[['property_id', 'recommendation_id']] = predictions_df['id'].str.split('+',
expand=True)
predictions_df['phase'] = predictions_df['recommendation_id'].apply(self.extract_phase)
predictions[model_prefix] = predictions_df
predictions[model_prefix] = predictions_df
return predictions
@ -283,75 +263,57 @@ class ModelApi:
return all_predictions
async def async_warm_up_lambdas(self, model_prefies=None):
"""Send asynchronous pre-flight requests to each model endpoint to wake up the cold Lambdas without waiting
for responses."""
logger.info("Asynchronously warming up Lambda functions...")
model_prefixes = self.MODEL_PREFIXES if model_prefies is None else model_prefies
tasks = []
async with aiohttp.ClientSession() as session:
for model_prefix in model_prefixes:
url = f"{self.base_url}/{self.MODEL_URLS[model_prefix]}/predict"
# Create a coroutine for each warm-up request and add it to the tasks list
tasks.append(self._send_warm_up_request(session, url, model_prefix))
# Run all tasks concurrently but don't wait for the responses to finish
await asyncio.gather(*tasks, return_exceptions=True)
session = self.get_aiohttp_session()
tasks = [
self._send_warm_up_request(session, f"{self.base_url}/{self.MODEL_URLS[model_prefix]}/predict",
model_prefix)
for model_prefix in model_prefixes
]
await asyncio.gather(*tasks, return_exceptions=True)
logger.info("Lambda functions are warmed up and ready to go!")
@staticmethod
async def _send_warm_up_request(session, url, model_prefix):
"""Helper method to send a pre-flight request to a given model URL."""
try:
async with session.post(url, json={}, timeout=2) as response:
# Log success for monitoring but do not block on the response
logger.info(f"Warmed up {model_prefix} with status code: {response.status}")
json_payload = {
"file_location": "s3://warm-up-placeholder",
"portfolio_id": 0,
"property_id": "",
"created_at": "2020-01-01T00:00:00"
}
async with session.post(url, json=json_payload, timeout=10) as response:
text = await response.text()
if response.status != 200:
logger.warning(
f"[{model_prefix}] Warm-up failed. Status: {response.status} | Body:\n{text}"
)
else:
logger.info(f"[{model_prefix}] Warmed up with status {response.status}")
except aiohttp.ClientError as e:
logger.warning(f"Failed to warm up {model_prefix}: {e}")
logger.info("Lambda functions are warmed up and ready to go!")
logger.warning(f"[{model_prefix}] Warm-up ClientError: {e}")
except Exception as e:
logger.warning(f"[{model_prefix}] Warm-up Exception: {e}")
async def async_paginated_predictions(self, data, bucket, batch_size, model_prefixes=None, extract_ids=True):
all_predictions = self.predictions_template()
to_loop_over = range(0, data.shape[0], batch_size)
async def run_batches():
for chunk in tqdm(to_loop_over, total=len(to_loop_over)):
attempts = 0
success = False
while attempts <= self.max_retries and not success:
try:
predictions_dict = await self.predict_all_async(
df=data.iloc[chunk:chunk + batch_size],
bucket=bucket,
model_prefixes=model_prefixes,
extract_ids=extract_ids
)
for key, scored in predictions_dict.items():
all_predictions[key] = pd.concat([all_predictions[key], scored])
success = True
except Exception as e:
attempts += 1
logger.error(
f"Batch {chunk}-{chunk + batch_size} failed (Attempt {attempts}/{self.max_retries}). "
f"Error: {e}"
)
if attempts > self.max_retries:
logger.error(
f"Skipping batch {chunk}-{chunk + batch_size} after {self.max_retries} failed attempts."
)
# Check if there is an existing event loop
try:
# If there is an existing event loop, await the coroutine directly
loop = asyncio.get_running_loop()
await run_batches()
except RuntimeError: # No running event loop
# If no event loop is running, use asyncio.run()
asyncio.run(run_batches())
for chunk in tqdm(range(0, data.shape[0], batch_size)):
attempts = 0
while attempts <= self.max_retries:
try:
predictions_dict = await self.predict_all_async(
df=data.iloc[chunk:chunk + batch_size],
bucket=bucket,
model_prefixes=model_prefixes,
extract_ids=extract_ids
)
for key, scored in predictions_dict.items():
all_predictions[key] = pd.concat([all_predictions[key], scored])
break
except Exception as e:
attempts += 1
logger.error(f"Batch {chunk}-{chunk + batch_size} failed (Attempt {attempts}): {e}")
await asyncio.sleep(2 ** attempts) # exponential backoff
await self.close_aiohttp_session()
return all_predictions

View file

@ -60,7 +60,7 @@ functions:
image:
uri: ${env:ECR_URI}:${env:GITHUB_SHA}
timeout: 900
memorySize: 2048
memorySize: 4096
role: EngineLambdaRole
events:
- sqs: