Model/backend/export/property_scenarios/main.py
2026-04-16 22:21:54 +00:00

190 lines
6.4 KiB
Python

import json
from typing import Optional, Any, Mapping, Dict, Union, List
import pandas as pd
from sqlalchemy.orm import Session
from backend.export.property_scenarios.input_schema import ExportRequest
from backend.export.property_scenarios.db_functions import DbMethods
from backend.app.db.connection import db_read_session
from backend.app.utils import sap_to_epc
from utils.logger import setup_logger
logger = setup_logger()
def choose_group_keys(payload: ExportRequest) -> List[Union[int, str]]:
    """
    Select the keys under which the export results are grouped.

    :param payload: Export request describing what should be exported.
    :return: ``["default_plans"]`` when ``payload.default_plans_only`` is truthy,
        otherwise ``payload.scenario_ids``.
    """
    default_only = payload.default_plans_only
    # A default-plans export is a single export with no scenario grouping,
    # so one fixed key is enough.
    return ["default_plans"] if default_only else payload.scenario_ids
def has_solar_with_battery(materials_list: Optional[List[Dict[str, Any]]]) -> bool:
    """
    Check whether any material in the list is a solar PV measure that includes a battery.

    The function is applied element-wise to a DataFrame column, so it has to
    cope with missing cells. Anything that cannot be iterated (``None``, NaN,
    other scalars) and any entry that is not a mapping (for example a ``None``
    inside the list) is treated as "no match" instead of raising.

    :param materials_list: Materials attached to a recommendation, or a missing value.
    :return: True if at least one material has ``type == "solar_pv"`` and
        ``includes_battery`` set to exactly ``True``; otherwise False.
    """
    try:
        materials = iter(materials_list)
    except TypeError:
        # None, NaN and other non-iterable cell values carry no materials.
        return False
    return any(
        isinstance(material, Mapping)
        and material.get("type") == "solar_pv"
        and material.get("includes_battery") is True
        for material in materials
    )
def _build_group_export(
    properties_df: pd.DataFrame, group_recs: pd.DataFrame
) -> pd.DataFrame:
    """
    Assemble the export table for one group of recommendations.

    :param properties_df: Properties to export; its ``solar_pv`` column is renamed
        to ``existing_solar_pv`` before merging.
    :param group_recs: Recommendations belonging to the group being exported.
    :return: Properties left-joined with one cost column per measure type, the
        total retrofit cost, summed SAP points and the predicted post-works
        SAP score and EPC band.
    """
    # One cost column per measure type, keyed by property and plan name.
    costs = group_recs[
        ["property_id", "measure_type", "plan_name", "estimated_cost"]
    ].drop_duplicates()
    cost_table = costs.pivot(
        index=["property_id", "plan_name"],
        columns="measure_type",
        values="estimated_cost",
    ).reset_index()
    measure_costs = cost_table.drop(columns=["property_id", "plan_name"])
    cost_table["total_retrofit_cost"] = measure_costs.sum(axis=1)

    # Total SAP points contributed by the group's recommendations per property.
    sap_gain = group_recs.groupby("property_id")[["sap_points"]].sum().reset_index()

    export_df = properties_df.rename(columns={"solar_pv": "existing_solar_pv"})
    export_df = export_df.merge(cost_table, how="left", on="property_id")
    export_df = export_df.merge(sap_gain, how="left", on="property_id")

    # Properties without recommendations come out of the left join with no
    # SAP points; count those as zero.
    export_df["sap_points"] = export_df["sap_points"].fillna(0)
    export_df["predicted_post_works_sap"] = (
        export_df["current_sap_points"] + export_df["sap_points"]
    )
    export_df["predicted_post_works_epc"] = export_df[
        "predicted_post_works_sap"
    ].apply(sap_to_epc)
    return export_df


def process_export(
    payload: ExportRequest, session: Session
) -> Dict[Union[str, int], pd.DataFrame]:
    """
    Build one export DataFrame per group key for the requested portfolio.

    :param payload: Export request (portfolio, scenario ids, default-plans flag).
    :param session: Database session handed to ``DbMethods``.
    :return: Mapping of group key to export DataFrame. Empty when no plans are
        found; groups without recommendations are omitted.
    """
    results: Dict[Union[str, int], pd.DataFrame] = {}
    db = DbMethods(session)

    properties = db.get_properties(payload.portfolio_id)
    logger.info("Retrieved %s properties for export", len(properties))

    plans: pd.DataFrame = db.get_latest_plans(
        portfolio_id=payload.portfolio_id,
        scenario_ids=payload.scenario_ids,
        default_only=bool(payload.default_plans_only),
    )
    logger.info("Retrieved %s plans for export", len(plans))
    if plans.empty:
        logger.info(
            "Empty plans dataframe - no plans to export. Returning empty export."
        )
        return results

    recs: pd.DataFrame = db.get_recommendations(plans["id"].tolist())
    logger.info("Retrieved %s recommendations for export", len(recs))
    recs = db.attach_materials(recs)

    # Solar PV recommendations whose materials include a battery are exported
    # under their own measure type.
    recs["has_solar_with_battery"] = recs["materials"].apply(has_solar_with_battery)
    battery_solar = (recs["measure_type"] == "solar_pv") & (
        recs["has_solar_with_battery"]
    )
    relabelled = recs.loc[battery_solar, "measure_type"] + "_with_battery"
    recs.loc[battery_solar, "measure_type"] = relabelled

    for group_key in choose_group_keys(payload):
        # Default-plan exports use every recommendation; scenario exports only
        # the ones recorded against that scenario.
        group_recs = (
            recs
            if payload.default_plans_only
            else recs[recs["scenario_id"] == group_key]
        )
        if group_recs.empty:
            logger.info(
                "No recommendations found for group_key %s - skipping export for this group",
                group_key,
            )
            continue
        results[group_key] = _build_group_export(properties, group_recs)

    return results
# ============================================================
# Lambda Handler
# ============================================================
def handler(
    event: Mapping[str, Any], context: Optional[Any]
) -> Mapping[str, Union[int, str]]:
    """
    Process the export requests carried by a Lambda event.

    Every entry of ``event["Records"]`` is handled in order: its ``body`` is
    parsed as JSON, validated into an ``ExportRequest``, exported with
    ``process_export`` and written to ``/tmp/export_<portfolio_id>.xlsx`` with
    one sheet per group key. A request that yields no export data is skipped
    without creating a workbook. Processing stops at the first failing record.

    Example record body:
        body_dict = {
            "task_id": "test",
            "subtask_id": "test",
            "portfolio_id": 682,
            "scenario_ids": [1210],
            "default_plans_only": False,
        }

    :param event: Lambda event containing export request details
    :param context: Lambda context (not used in this handler but included for completeness)
    :return: Response whose ``statusCode`` is 200 when every record was
        processed, 500 as soon as one record fails, or 201 when the event
        contains no records
    """
    records = event.get("Records") or []
    if not records:
        return {
            "statusCode": 201,
            "body": json.dumps({"message": "No records to process"}),
        }

    for record in records:
        try:
            body_dict = json.loads(record["body"])
            logger.debug("Validating request body")
            payload = ExportRequest.model_validate(body_dict)
            if payload.scenario_ids_ignored:
                logger.warning(
                    "Received scenario_ids in request body but they will be ignored "
                    "because default_plans_only is set to True"
                )
            logger.debug("Successfully validated request body")

            with db_read_session() as session:
                exported_files = process_export(payload, session)

            if not exported_files:
                # A workbook without any sheet cannot be saved by openpyxl, so an
                # empty export is a successful no-op rather than an error.
                logger.info(
                    "No data to export for portfolio %s - skipping workbook creation",
                    payload.portfolio_id,
                )
                continue

            # TODO: Need to handle the exported files - e.g. upload to s3 and email a presigned url
            output_path = f"/tmp/export_{payload.portfolio_id}.xlsx"
            with pd.ExcelWriter(output_path, engine="openpyxl") as writer:
                for group_key, df in exported_files.items():
                    sheet_name = str(group_key)[:31]  # Excel sheet names max 31 chars
                    df.to_excel(writer, sheet_name=sheet_name, index=False)
            logger.info("Exported files written to %s", output_path)
        except Exception as e:
            # Top-level boundary: keep the traceback in the logs and report failure.
            logger.exception("Failed to process record: %s", e)
            return {
                "statusCode": 500,
                "body": json.dumps({"message": "Failed to process export request"}),
            }

    return {
        "statusCode": 200,
        "body": json.dumps({}),
    }