# Mirror of https://github.com/Hestia-Homes/Model.git
# Synced 2026-06-08 11:17:27 +00:00
# 193 lines, 6.2 KiB, Python
import json
|
|
from typing import Optional, Any, Mapping, Dict, Union, List
|
|
|
|
import pandas as pd
|
|
from sqlalchemy.orm import Session
|
|
|
|
from backend.export.property_scenarios.input_schema import ExportRequest
|
|
from backend.export.property_scenarios.db_functions import DbMethods
|
|
from backend.app.db.connection import db_read_session
|
|
from backend.app.utils import sap_to_epc
|
|
from utils.logger import setup_logger
|
|
|
|
# Module-level logger used by process_export and handler below.
logger = setup_logger()
|
|
|
|
|
|
def choose_group_keys(payload: ExportRequest) -> List[Union[int, str]]:
    """
    Pick the keys under which export files are grouped.

    :param payload: export request; ``default_plans_only`` and ``scenario_ids`` are read
    :return: ``["default_plans"]`` when ``payload.default_plans_only`` is truthy
        (a single export, no scenario grouping), otherwise ``payload.scenario_ids``
        exactly as carried on the request
    """
    return ["default_plans"] if payload.default_plans_only else payload.scenario_ids
|
|
|
|
|
|
def has_solar_with_battery(materials_list: Optional[List[Dict[str, Any]]]) -> bool:
    """
    Report whether any material in the list is a solar PV measure that includes a battery.

    A material matches when its ``"type"`` is ``"solar_pv"`` and its
    ``"includes_battery"`` value is exactly ``True``.

    :param materials_list: material dicts for one recommendation. Any value that
        is not a list or tuple (``None``, or the float NaN that pandas uses for a
        missing cell) is treated as "no materials".
    :return: True if at least one matching material is present, otherwise False
    """
    # This function is applied element-wise to a DataFrame column, where a
    # missing value shows up as NaN. NaN is truthy and not iterable, so an
    # ``x or []`` fallback does not protect the iteration from it.
    if not isinstance(materials_list, (list, tuple)):
        return False

    return any(
        material.get("type") == "solar_pv"
        and material.get("includes_battery") is True
        for material in materials_list
    )
|
|
|
|
|
|
def process_export(
    payload: ExportRequest, session: Session
) -> Dict[Union[str, int], pd.DataFrame]:
    """
    Build one export DataFrame per group key for the requested portfolio.

    Steps, in order:

    1. Load the portfolio's properties and the plans returned by
       ``DbMethods.get_latest_plans``; return an empty dict when there are no plans.
    2. Load the recommendations for those plans, attach their materials, and
       relabel ``measure_type`` from ``"solar_pv"`` to ``"solar_pv_with_battery"``
       on rows whose materials include a battery.
    3. For every key from ``choose_group_keys``: pivot estimated cost per measure
       type, add a row total, sum SAP points per property, left-merge both onto
       the properties frame and derive the predicted post-works SAP and EPC.

    :param payload: export request; ``portfolio_id``, ``scenario_ids`` and
        ``default_plans_only`` are read
    :param session: database session handed to ``DbMethods``
    :return: mapping of group key to export DataFrame; groups without any
        recommendations are left out
    """
    db_methods = DbMethods(session)
    export_files: Dict[Union[str, int], pd.DataFrame] = {}

    properties_df = db_methods.get_properties(payload.portfolio_id)
    logger.info("Retrieved %s properties for export", len(properties_df))

    plans_df: pd.DataFrame = db_methods.get_latest_plans(
        portfolio_id=payload.portfolio_id,
        scenario_ids=payload.scenario_ids,
        default_only=bool(payload.default_plans_only),
    )
    logger.info("Retrieved %s plans for export", len(plans_df))

    if plans_df.empty:
        logger.info(
            "Empty plans dataframe - no plans to export. Returning empty export."
        )
        return export_files

    recs: pd.DataFrame = db_methods.get_recommendations(plans_df["id"].tolist())
    logger.info("Retrieved %s recommendations for export", len(recs))

    recs = db_methods.attach_materials(recs)
    recs["has_solar_with_battery"] = recs["materials"].apply(has_solar_with_battery)

    # Solar PV rows whose materials include a battery get their own measure type,
    # so they end up in a separate cost column after the pivot below.
    battery_solar_rows = (recs["measure_type"] == "solar_pv") & (
        recs["has_solar_with_battery"]
    )
    relabelled = recs.loc[battery_solar_rows, "measure_type"] + "_with_battery"
    recs.loc[battery_solar_rows, "measure_type"] = relabelled

    for group_key in choose_group_keys(payload):
        # Default-plans mode exports everything as one group; otherwise each
        # group key is a scenario id to filter on.
        group_recs = (
            recs
            if payload.default_plans_only
            else recs[recs["scenario_id"] == group_key]
        )

        if group_recs.empty:
            logger.info(
                "No recommendations found for group_key %s - skipping export for this group",
                group_key,
            )
            continue

        unique_measures: pd.DataFrame = group_recs[
            ["property_id", "measure_type", "plan_name", "estimated_cost"]
        ].drop_duplicates()

        # One row per (property, plan), one cost column per measure type.
        cost_by_measure: pd.DataFrame = unique_measures.pivot(
            index=["property_id", "plan_name"],
            columns="measure_type",
            values="estimated_cost",
        )
        cost_by_measure = cost_by_measure.reset_index()

        measure_costs_only = cost_by_measure.drop(columns=["property_id", "plan_name"])
        cost_by_measure["total_retrofit_cost"] = measure_costs_only.sum(axis=1)

        sap_gain: pd.DataFrame = group_recs.groupby("property_id")[["sap_points"]].sum()
        sap_gain = sap_gain.reset_index()

        # NOTE(review): the rename presumably keeps the properties' own
        # "solar_pv" column from clashing with a pivoted "solar_pv" measure
        # column - confirm against the properties schema.
        export_df: pd.DataFrame = properties_df.rename(
            columns={"solar_pv": "existing_solar_pv"}
        )
        export_df = export_df.merge(cost_by_measure, how="left", on="property_id")
        export_df = export_df.merge(sap_gain, how="left", on="property_id")

        # Properties without recommendations in this group gain no SAP points.
        export_df["sap_points"] = export_df["sap_points"].fillna(0)
        export_df["predicted_post_works_sap"] = (
            export_df["current_sap_points"] + export_df["sap_points"]
        )
        export_df["predicted_post_works_epc"] = export_df[
            "predicted_post_works_sap"
        ].apply(sap_to_epc)

        export_files[group_key] = export_df

    return export_files
|
|
|
|
|
|
# ============================================================
|
|
# Lambda Handler
|
|
# ============================================================
|
|
|
|
|
|
def handler(
    event: Mapping[str, Any], context: Optional[Any]
) -> Mapping[str, Union[int, str]]:
    """
    Lambda entry point: run an export for every record in the event.

    Each record's ``body`` is a JSON document that is validated against
    ``ExportRequest`` and then passed to ``process_export``. All records in the
    event are processed; processing stops at the first record that fails.

    Example event bodies:

        body_dict = {
            "task_id": "test",
            "subtask_id": "test",
            "portfolio_id": 655,
            "scenario_ids": [],
            "default_plans_only": True,
        }

        body_dict = {
            "task_id": "test",
            "subtask_id": "test",
            "portfolio_id": 655,
            "scenario_ids": [1174],
            "default_plans_only": False,
        }

    :param event: Lambda event containing export request details under ``"Records"``
    :param context: Lambda context (not used in this handler but included for completeness)
    :return: HTTP-style response: 200 when every record was processed, 500 as
        soon as one record fails, 201 when the event carries no records
    """
    records = event.get("Records", [])
    if not records:
        return {
            "statusCode": 201,
            "body": json.dumps({"message": "No records to process"}),
        }

    for record in records:
        try:
            body_dict = json.loads(record["body"])

            logger.debug("Validating request body")
            payload = ExportRequest.model_validate(body_dict)

            if payload.scenario_ids_ignored:
                logger.warning(
                    "Received scenario_ids in request body but they will be ignored "
                    "because default_plans_only is set to True"
                )

            logger.debug("Successfully validated request body")
            with db_read_session() as session:
                exported_files = process_export(payload, session)

            # TODO: Need to handle the exported files - e.g. upload to s3 and email a presigned url
            _ = exported_files

        except Exception as e:
            # Top-level boundary: log with the traceback and report the failure
            # instead of letting the exception escape the Lambda.
            logger.exception("Failed to process record: %s", e)
            return {
                "statusCode": 500,
                "body": json.dumps({"message": "Failed to process export request"}),
            }

    # Only report success once the whole batch is done; returning from inside
    # the loop would silently skip every record after the first.
    return {
        "statusCode": 200,
        "body": json.dumps({}),
    }
|