mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
beefed up api async logic
This commit is contained in:
parent
2b4ac6ae53
commit
b68dc49ce2
3 changed files with 98 additions and 143 deletions
|
|
@ -517,13 +517,6 @@ async def model_engine(body: PlanTriggerRequest):
|
|||
input_properties = []
|
||||
for config in tqdm(plan_input):
|
||||
|
||||
if config["landlord_property_id"] in ["LE113NWIC95", "NG241FBCT", "NG51BNIC"]:
|
||||
continue
|
||||
|
||||
if not pd.isnull(config.get("uprn")):
|
||||
if int(float(config.get("uprn"))) < 0:
|
||||
continue
|
||||
|
||||
# We validate each record in the file. If the record is NOT valid, we need to handle this accordingly
|
||||
uprn = config.get("uprn", None)
|
||||
if pd.isnull(uprn):
|
||||
|
|
@ -532,7 +525,7 @@ async def model_engine(body: PlanTriggerRequest):
|
|||
uprn = int(float(uprn))
|
||||
|
||||
epc_searcher = SearchEpc(
|
||||
address1=config["address"],
|
||||
address1=str(config["address"]),
|
||||
postcode=config["postcode"],
|
||||
uprn=uprn,
|
||||
auth_token=get_settings().EPC_AUTH_TOKEN,
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
import json
|
||||
import aiohttp
|
||||
import asyncio
|
||||
import pandas as pd
|
||||
|
|
@ -11,6 +12,8 @@ logger = setup_logger()
|
|||
|
||||
|
||||
class ModelApi:
|
||||
_aiohttp_session: aiohttp.ClientSession = None
|
||||
|
||||
MODEL_PREFIXES = [
|
||||
"sap_change_predictions",
|
||||
"heat_demand_predictions",
|
||||
|
|
@ -54,44 +57,30 @@ class ModelApi:
|
|||
self.portfolio_id = portfolio_id
|
||||
self.timestamp = timestamp
|
||||
self.prediction_buckets = prediction_buckets
|
||||
|
||||
self.max_retries = max_retries
|
||||
self.semaphore = asyncio.Semaphore(5)
|
||||
|
||||
@staticmethod
|
||||
def get_aiohttp_session():
|
||||
if ModelApi._aiohttp_session is None or ModelApi._aiohttp_session.closed:
|
||||
connector = aiohttp.TCPConnector(limit=10)
|
||||
ModelApi._aiohttp_session = aiohttp.ClientSession(connector=connector)
|
||||
return ModelApi._aiohttp_session
|
||||
|
||||
@staticmethod
|
||||
async def close_aiohttp_session():
|
||||
if ModelApi._aiohttp_session and not ModelApi._aiohttp_session.closed:
|
||||
await ModelApi._aiohttp_session.close()
|
||||
ModelApi._aiohttp_session = None
|
||||
|
||||
@staticmethod
|
||||
def predictions_template():
|
||||
return {
|
||||
"sap_change_predictions": pd.DataFrame(),
|
||||
"heat_demand_predictions": pd.DataFrame(),
|
||||
"carbon_change_predictions": pd.DataFrame(),
|
||||
"hotwater_kwh_predictions": pd.DataFrame(),
|
||||
"heating_kwh_predictions": pd.DataFrame(),
|
||||
}
|
||||
return {key: pd.DataFrame() for key in ModelApi.MODEL_URLS.keys()}
|
||||
|
||||
def upload_scoring_data(self, df: pd.DataFrame, bucket: str, model_prefix: str) -> str:
|
||||
"""
|
||||
The sap model api needs a scoring data that is sitting in s3 to use as a dataset to score on
|
||||
This method allows the user to upload a table as a parquet file. This method will return the file
|
||||
location, which can be used as the file location in the predict() method
|
||||
|
||||
:param df: Pandas dataframe with scoring data to be uploaded to s3
|
||||
:param bucket: Name of the bucket in s3 to upload to
|
||||
:param model_prefix: The model prefix to be used in the file location
|
||||
:return:
|
||||
"""
|
||||
|
||||
# if model_prefix not in self.MODEL_PREFIXES:
|
||||
# raise ValueError(f"Model prefix specified is not in {self.MODEL_PREFIXES}")
|
||||
|
||||
# Store parquet file in s3 for scoring
|
||||
file_location = f"{model_prefix}/{self.portfolio_id}/{self.timestamp}.parquet"
|
||||
|
||||
logger.info("Storing scoring data to s3")
|
||||
save_dataframe_to_s3_parquet(
|
||||
df=df,
|
||||
bucket_name=bucket,
|
||||
file_key=file_location
|
||||
)
|
||||
|
||||
logger.info(f"Storing scoring data to s3: {file_location}")
|
||||
save_dataframe_to_s3_parquet(df=df, bucket_name=bucket, file_key=file_location)
|
||||
return file_location
|
||||
|
||||
def predict(self, file_location, model_prefix: str):
|
||||
|
|
@ -127,25 +116,25 @@ class ModelApi:
|
|||
# depending on how you want to handle errors in your application
|
||||
return None
|
||||
|
||||
async def predict_async(self, file_location, model_prefix: str):
|
||||
"""Makes an asynchronous POST request to the Model API with the provided parameters."""
|
||||
logger.info(f"Making request to {model_prefix} model API")
|
||||
async def predict_async(self, file_location, model_prefix: str, session: aiohttp.ClientSession):
|
||||
url = f"{self.base_url}/{self.MODEL_URLS[model_prefix]}/predict"
|
||||
payload = {
|
||||
"file_location": file_location,
|
||||
"property_id": "", # This should get removed
|
||||
"property_id": "",
|
||||
"portfolio_id": self.portfolio_id,
|
||||
"created_at": self.timestamp
|
||||
}
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
try:
|
||||
async with session.post(
|
||||
url, json=payload, headers={"Content-Type": "application/json"}, timeout=120
|
||||
) as response:
|
||||
text = await response.text()
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"User-Agent": "model-engine-lambda/1.0"
|
||||
}
|
||||
|
||||
async with self.semaphore:
|
||||
try:
|
||||
async with session.post(url, json=payload, headers=headers, timeout=120) as response:
|
||||
if response.status != 200:
|
||||
text = await response.text()
|
||||
logger.error(
|
||||
f"\n--- API ERROR [{model_prefix}] ---\n"
|
||||
f"Status Code: {response.status}\n"
|
||||
|
|
@ -155,10 +144,7 @@ class ModelApi:
|
|||
f"----------------------------"
|
||||
)
|
||||
return None
|
||||
|
||||
# Successful response
|
||||
return aiohttp.helpers.json.loads(text)
|
||||
|
||||
return await response.json()
|
||||
except aiohttp.ClientError as e:
|
||||
logger.error(f"[{model_prefix}] HTTP error: {e}")
|
||||
return None
|
||||
|
|
@ -227,41 +213,35 @@ class ModelApi:
|
|||
return predictions
|
||||
|
||||
async def predict_all_async(self, df, bucket, model_prefixes=None, extract_ids=True) -> dict:
|
||||
"""Uploads data and makes asynchronous requests to the model APIs for predictions."""
|
||||
model_prefixes = self.MODEL_PREFIXES if model_prefixes is None else model_prefixes
|
||||
|
||||
predictions = {}
|
||||
tasks = []
|
||||
async with aiohttp.ClientSession() as session:
|
||||
for model_prefix in model_prefixes:
|
||||
logger.info(f"Scoring for model prefix: {model_prefix}")
|
||||
logger.info("Uploading scoring data to S3")
|
||||
file_location = self.upload_scoring_data(df, bucket, model_prefix)
|
||||
logger.info("Data uploaded to S3, now making prediction request")
|
||||
# Schedule the prediction request as a coroutine
|
||||
tasks.append(
|
||||
self.predict_async(f"s3://{bucket}/" + file_location, model_prefix)
|
||||
)
|
||||
|
||||
# Gather all asynchronous tasks (execute them concurrently)
|
||||
responses = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
session = self.get_aiohttp_session()
|
||||
|
||||
for model_prefix, response in zip(model_prefixes, responses):
|
||||
if response:
|
||||
predictions_bucket = self.prediction_buckets[model_prefix]
|
||||
predictions_df = pd.DataFrame(
|
||||
read_dataframe_from_s3_parquet(
|
||||
bucket_name=predictions_bucket,
|
||||
file_key=response["storage_filepath"].split(predictions_bucket + "/")[1]
|
||||
)
|
||||
for model_prefix in model_prefixes:
|
||||
logger.info(f"Scoring for model prefix: {model_prefix}")
|
||||
file_location = self.upload_scoring_data(df, bucket, model_prefix)
|
||||
tasks.append(self.predict_async(f"s3://{bucket}/" + file_location, model_prefix, session=session))
|
||||
|
||||
responses = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
for model_prefix, response in zip(model_prefixes, responses):
|
||||
if response:
|
||||
predictions_bucket = self.prediction_buckets[model_prefix]
|
||||
predictions_df = pd.DataFrame(
|
||||
read_dataframe_from_s3_parquet(
|
||||
bucket_name=predictions_bucket,
|
||||
file_key=response["storage_filepath"].split(predictions_bucket + "/")[1]
|
||||
)
|
||||
predictions_df['predictions'] = predictions_df["predictions"].astype(float).round(1)
|
||||
if extract_ids:
|
||||
predictions_df[['property_id', 'recommendation_id']] = predictions_df['id'].str.split('+',
|
||||
expand=True)
|
||||
predictions_df['phase'] = predictions_df['recommendation_id'].apply(self.extract_phase)
|
||||
)
|
||||
predictions_df['predictions'] = predictions_df["predictions"].astype(float).round(1)
|
||||
if extract_ids:
|
||||
predictions_df[['property_id', 'recommendation_id']] = predictions_df['id'].str.split('+',
|
||||
expand=True)
|
||||
predictions_df['phase'] = predictions_df['recommendation_id'].apply(self.extract_phase)
|
||||
|
||||
predictions[model_prefix] = predictions_df
|
||||
predictions[model_prefix] = predictions_df
|
||||
|
||||
return predictions
|
||||
|
||||
|
|
@ -283,75 +263,57 @@ class ModelApi:
|
|||
return all_predictions
|
||||
|
||||
async def async_warm_up_lambdas(self, model_prefies=None):
|
||||
"""Send asynchronous pre-flight requests to each model endpoint to wake up the cold Lambdas without waiting
|
||||
for responses."""
|
||||
logger.info("Asynchronously warming up Lambda functions...")
|
||||
|
||||
model_prefixes = self.MODEL_PREFIXES if model_prefies is None else model_prefies
|
||||
|
||||
tasks = []
|
||||
async with aiohttp.ClientSession() as session:
|
||||
for model_prefix in model_prefixes:
|
||||
url = f"{self.base_url}/{self.MODEL_URLS[model_prefix]}/predict"
|
||||
# Create a coroutine for each warm-up request and add it to the tasks list
|
||||
tasks.append(self._send_warm_up_request(session, url, model_prefix))
|
||||
|
||||
# Run all tasks concurrently but don't wait for the responses to finish
|
||||
await asyncio.gather(*tasks, return_exceptions=True)
|
||||
session = self.get_aiohttp_session()
|
||||
tasks = [
|
||||
self._send_warm_up_request(session, f"{self.base_url}/{self.MODEL_URLS[model_prefix]}/predict",
|
||||
model_prefix)
|
||||
for model_prefix in model_prefixes
|
||||
]
|
||||
await asyncio.gather(*tasks, return_exceptions=True)
|
||||
logger.info("Lambda functions are warmed up and ready to go!")
|
||||
|
||||
@staticmethod
|
||||
async def _send_warm_up_request(session, url, model_prefix):
|
||||
"""Helper method to send a pre-flight request to a given model URL."""
|
||||
try:
|
||||
async with session.post(url, json={}, timeout=2) as response:
|
||||
# Log success for monitoring but do not block on the response
|
||||
logger.info(f"Warmed up {model_prefix} with status code: {response.status}")
|
||||
json_payload = {
|
||||
"file_location": "s3://warm-up-placeholder",
|
||||
"portfolio_id": 0,
|
||||
"property_id": "",
|
||||
"created_at": "2020-01-01T00:00:00"
|
||||
}
|
||||
async with session.post(url, json=json_payload, timeout=10) as response:
|
||||
text = await response.text()
|
||||
if response.status != 200:
|
||||
logger.warning(
|
||||
f"[{model_prefix}] Warm-up failed. Status: {response.status} | Body:\n{text}"
|
||||
)
|
||||
else:
|
||||
logger.info(f"[{model_prefix}] Warmed up with status {response.status}")
|
||||
except aiohttp.ClientError as e:
|
||||
logger.warning(f"Failed to warm up {model_prefix}: {e}")
|
||||
|
||||
logger.info("Lambda functions are warmed up and ready to go!")
|
||||
logger.warning(f"[{model_prefix}] Warm-up ClientError: {e}")
|
||||
except Exception as e:
|
||||
logger.warning(f"[{model_prefix}] Warm-up Exception: {e}")
|
||||
|
||||
async def async_paginated_predictions(self, data, bucket, batch_size, model_prefixes=None, extract_ids=True):
|
||||
all_predictions = self.predictions_template()
|
||||
to_loop_over = range(0, data.shape[0], batch_size)
|
||||
|
||||
async def run_batches():
|
||||
for chunk in tqdm(to_loop_over, total=len(to_loop_over)):
|
||||
|
||||
attempts = 0
|
||||
success = False
|
||||
while attempts <= self.max_retries and not success:
|
||||
try:
|
||||
predictions_dict = await self.predict_all_async(
|
||||
df=data.iloc[chunk:chunk + batch_size],
|
||||
bucket=bucket,
|
||||
model_prefixes=model_prefixes,
|
||||
extract_ids=extract_ids
|
||||
)
|
||||
|
||||
for key, scored in predictions_dict.items():
|
||||
all_predictions[key] = pd.concat([all_predictions[key], scored])
|
||||
|
||||
success = True
|
||||
except Exception as e:
|
||||
attempts += 1
|
||||
logger.error(
|
||||
f"Batch {chunk}-{chunk + batch_size} failed (Attempt {attempts}/{self.max_retries}). "
|
||||
f"Error: {e}"
|
||||
)
|
||||
|
||||
if attempts > self.max_retries:
|
||||
logger.error(
|
||||
f"Skipping batch {chunk}-{chunk + batch_size} after {self.max_retries} failed attempts."
|
||||
)
|
||||
|
||||
# Check if there is an existing event loop
|
||||
try:
|
||||
# If there is an existing event loop, await the coroutine directly
|
||||
loop = asyncio.get_running_loop()
|
||||
await run_batches()
|
||||
except RuntimeError: # No running event loop
|
||||
# If no event loop is running, use asyncio.run()
|
||||
asyncio.run(run_batches())
|
||||
|
||||
for chunk in tqdm(range(0, data.shape[0], batch_size)):
|
||||
attempts = 0
|
||||
while attempts <= self.max_retries:
|
||||
try:
|
||||
predictions_dict = await self.predict_all_async(
|
||||
df=data.iloc[chunk:chunk + batch_size],
|
||||
bucket=bucket,
|
||||
model_prefixes=model_prefixes,
|
||||
extract_ids=extract_ids
|
||||
)
|
||||
for key, scored in predictions_dict.items():
|
||||
all_predictions[key] = pd.concat([all_predictions[key], scored])
|
||||
break
|
||||
except Exception as e:
|
||||
attempts += 1
|
||||
logger.error(f"Batch {chunk}-{chunk + batch_size} failed (Attempt {attempts}): {e}")
|
||||
await asyncio.sleep(2 ** attempts) # exponential backoff
|
||||
await self.close_aiohttp_session()
|
||||
return all_predictions
|
||||
|
|
|
|||
|
|
@ -60,7 +60,7 @@ functions:
|
|||
image:
|
||||
uri: ${env:ECR_URI}:${env:GITHUB_SHA}
|
||||
timeout: 900
|
||||
memorySize: 2048
|
||||
memorySize: 4096
|
||||
role: EngineLambdaRole
|
||||
events:
|
||||
- sqs:
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue