be the same as main

This commit is contained in:
Jun-te Kim 2026-04-13 15:20:55 +00:00
parent ef366f1cd5
commit 9c1181475e

View file

@ -26,14 +26,15 @@ def has_solar_with_battery(materials_list: Optional[List[Dict[str, Any]]]) -> bo
:return:
"""
for m in materials_list or []:
if m.get("type") == "solar_pv" and m.get("includes_battery") is True:
if (
m.get("type") == "solar_pv"
and m.get("includes_battery") is True
):
return True
return False
def process_export(
payload: ExportRequest, session: Session
) -> Dict[Union[str, int], pd.DataFrame]:
def process_export(payload: ExportRequest, session: Session) -> Dict[Union[str, int], pd.DataFrame]:
export_files: Dict[Union[str, int], pd.DataFrame] = {}
db_methods = DbMethods(session)
@ -51,9 +52,7 @@ def process_export(
logger.info("Retrieved %s plans for export", len(plans_df))
if plans_df.empty:
logger.info(
"Empty plans dataframe - no plans to export. Returning empty export."
)
logger.info("Empty plans dataframe - no plans to export. Returning empty export.")
return export_files
plan_ids: List[int] = plans_df["id"].tolist()
recommendations_df: pd.DataFrame = db_methods.get_recommendations(plan_ids)
@ -62,12 +61,13 @@ def process_export(
recommendations_df = db_methods.attach_materials(recommendations_df)
recommendations_df["has_solar_with_battery"] = recommendations_df[
"materials"
].apply(has_solar_with_battery)
recommendations_df["has_solar_with_battery"] = (
recommendations_df["materials"].apply(has_solar_with_battery)
)
_filter = (recommendations_df["measure_type"] == "solar_pv") & (
recommendations_df["has_solar_with_battery"]
_filter = (
(recommendations_df["measure_type"] == "solar_pv")
& (recommendations_df["has_solar_with_battery"])
)
recommendations_df.loc[_filter, "measure_type"] = (
@ -83,13 +83,10 @@ def process_export(
else:
scenario_recs = recommendations_df[
recommendations_df["scenario_id"] == group_key
]
]
if scenario_recs.empty:
logger.info(
"No recommendations found for group_key %s - skipping export for this group",
group_key,
)
logger.info("No recommendations found for group_key %s - skipping export for this group", group_key)
continue
measures_df: pd.DataFrame = scenario_recs[
@ -102,12 +99,14 @@ def process_export(
values="estimated_cost",
).reset_index()
pivot["total_retrofit_cost"] = pivot.drop(
columns=["property_id", "plan_name"]
).sum(axis=1)
pivot["total_retrofit_cost"] = (
pivot.drop(columns=["property_id", "plan_name"]).sum(axis=1)
)
post_sap: pd.DataFrame = (
scenario_recs.groupby("property_id")[["sap_points"]].sum().reset_index()
scenario_recs.groupby("property_id")[["sap_points"]]
.sum()
.reset_index()
)
df: pd.DataFrame = (
@ -118,9 +117,7 @@ def process_export(
df["sap_points"] = df["sap_points"].fillna(0)
df["predicted_post_works_sap"] = df["current_sap_points"] + df["sap_points"]
df["predicted_post_works_epc"] = df["predicted_post_works_sap"].apply(
sap_to_epc
)
df["predicted_post_works_epc"] = df["predicted_post_works_sap"].apply(sap_to_epc)
export_files[group_key] = df
@ -131,10 +128,7 @@ def process_export(
# Lambda Handler
# ============================================================
def handler(
event: Mapping[str, Any], context: Optional[Any]
) -> Mapping[str, Union[int, str]]:
def handler(event: Mapping[str, Any], context: Optional[Any]) -> Mapping[str, Union[int, str]]:
"""
Example event:
body_dict = {