From 3e0444b3a7228ea165ff8de39f6c8bfdbde1fa35 Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Fri, 20 Feb 2026 17:01:09 +0000 Subject: [PATCH] working on export logic --- .idea/Model.iml | 2 +- .idea/misc.xml | 2 +- .../export/property_scenarios/db_functions.py | 205 ++++++++++++++++++ .../export/property_scenarios/input_schema.py | 33 +++ backend/export/property_scenarios/main.py | 154 +++++++++++++ 5 files changed, 394 insertions(+), 2 deletions(-) create mode 100644 backend/export/property_scenarios/db_functions.py create mode 100644 backend/export/property_scenarios/input_schema.py create mode 100644 backend/export/property_scenarios/main.py diff --git a/.idea/Model.iml b/.idea/Model.iml index 09f2e496..c6561970 100644 --- a/.idea/Model.iml +++ b/.idea/Model.iml @@ -7,7 +7,7 @@ - + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml index fb10c6b0..50cad4ca 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -3,7 +3,7 @@ - + diff --git a/backend/export/property_scenarios/db_functions.py b/backend/export/property_scenarios/db_functions.py new file mode 100644 index 00000000..f527e738 --- /dev/null +++ b/backend/export/property_scenarios/db_functions.py @@ -0,0 +1,205 @@ +from typing import List, Any, Dict, Optional +import pandas as pd +from sqlalchemy import func +from sqlalchemy.orm import Session +from collections import defaultdict + +from backend.app.db.models.recommendations import ( + Recommendation, + PlanModel, + PlanRecommendations, + RecommendationMaterials, +) +from backend.app.db.models.portfolio import ( + PropertyModel, + PropertyDetailsEpcModel, +) +from utils.logger import setup_logger + +logger = setup_logger() + + +class DbMethods: + + def __init__(self, session: Session): + self.session = session + + def get_properties(self, portfolio_id: int) -> pd.DataFrame: + """ + Function to fetch the property data, for property scenario exports + :param portfolio_id: + :return: + """ + query = ( + self.session.query(PropertyModel, PropertyDetailsEpcModel) + .join( + PropertyDetailsEpcModel, + PropertyModel.id == PropertyDetailsEpcModel.property_id, + ) + .filter(PropertyModel.portfolio_id == portfolio_id) + .all() + ) + + data = [ + { + **{ + col.name: getattr(row.PropertyModel, col.name) + for col in PropertyModel.__table__.columns + }, + **{ + col.name: getattr(row.PropertyDetailsEpcModel, col.name) + for col in PropertyDetailsEpcModel.__table__.columns + }, + } + for row in query + ] + + return pd.DataFrame(data) + + def get_latest_plans( + self, + portfolio_id: int, + scenario_ids: Optional[List[int]] = None, + default_only: bool = False, + ) -> pd.DataFrame: + """ + Fetch latest plans. + + Modes: + 1) Scenario mode: latest per (scenario_id, property_id) + 2) Default mode: latest default plan per property (ignores scenario_ids) + + """ + + # ----------------------------- + # Sanity checks + # ----------------------------- + if default_only and scenario_ids: + # Override scenario_ids to make it explicit that they will be ignored in the query + scenario_ids = None + + if not default_only and not scenario_ids: + raise ValueError( + "Either scenario_ids must be provided " + "or default_only must be True." + ) + + # ----------------------------- + # Filter on just the default plans - we ignore the scenario ids. NOTE - this is specific to postgres + # and relies on DISTINCT ON behaviour. + # ----------------------------- + if default_only: + # Latest default plan per property (ignore scenarios entirely) + # DISTINCT ON (property_id) keeps the first row per property, + # ordered by created_at DESC so we get the newest one. + + plans_query = ( + self.session.query(PlanModel) + .filter(PlanModel.is_default.is_(True)) + .distinct(PlanModel.property_id) + .order_by( + PlanModel.property_id, + PlanModel.created_at.desc(), + ) + ) + + else: + # Latest plan per (scenario_id, property_id) + # DISTINCT ON (scenario_id, property_id) keeps the newest + # plan per scenario/property combination. + + plans_query = ( + self.session.query(PlanModel) + .filter(PlanModel.scenario_id.in_(scenario_ids)) + .distinct( + PlanModel.scenario_id, + PlanModel.property_id, + ) + .order_by( + PlanModel.scenario_id, + PlanModel.property_id, + PlanModel.created_at.desc(), + ) + ) + + logger.info("Fetching plans") + plans = plans_query.all() + + return pd.DataFrame( + [ + { + col.name: getattr(plan, col.name) + for col in PlanModel.__table__.columns + } + for plan in plans + ] + ) + + def get_recommendations(self, plan_ids: List[int]) -> pd.DataFrame: + + if not plan_ids: + return pd.DataFrame() + + recs_query = ( + self.session.query( + Recommendation, + PlanModel.scenario_id, + ) + .join( + PlanRecommendations, + Recommendation.id == PlanRecommendations.recommendation_id, + ) + .join(PlanModel, PlanModel.id == PlanRecommendations.plan_id) + .filter( + PlanRecommendations.plan_id.in_(plan_ids), + Recommendation.default.is_(True), + Recommendation.already_installed.is_(False), + ) + .all() + ) + + data = [ + { + **{ + col.name: getattr(r.Recommendation, col.name) + for col in Recommendation.__table__.columns + }, + "scenario_id": r.scenario_id, + } + for r in recs_query + ] + + return pd.DataFrame(data) + + def attach_materials(self, recommendations_df: pd.DataFrame) -> pd.DataFrame: + + if recommendations_df.empty: + recommendations_df["materials"] = [] + return recommendations_df + + rec_ids = recommendations_df["id"].tolist() + + materials_query = ( + self.session.query(RecommendationMaterials) + .filter(RecommendationMaterials.recommendation_id.in_(rec_ids)) + .all() + ) + + materials_map: Dict[int, List[Dict[str, Any]]] = defaultdict(list) + + for m in materials_query: + materials_map[m.recommendation_id].append( + { + "material_id": m.material_id, + "depth": m.depth, + "quantity": m.quantity, + "quantity_unit": m.quantity_unit, + "estimated_cost": m.estimated_cost, + } + ) + + recommendations_df["materials"] = recommendations_df["id"].apply( + lambda x: materials_map.get(x, []) + ) + + return recommendations_df diff --git a/backend/export/property_scenarios/input_schema.py b/backend/export/property_scenarios/input_schema.py new file mode 100644 index 00000000..4ef704a3 --- /dev/null +++ b/backend/export/property_scenarios/input_schema.py @@ -0,0 +1,33 @@ +from typing import Optional, Union, List +from pydantic import BaseModel, model_validator + + +class ExportRequest(BaseModel): + # uuid which maps to a specific export request, used for tracking and logging + task_id: Union[str, None] + # uuid which maps to a specific export operation, used for tracking and logging. subtask is the child of the + # task, where the work has been distributed across workers + subtask_id: Union[str, None] + # associated portfolio id for the export request + portfolio_id: int + # list of scenario ids to export + scenario_ids: List[int] + # boolean which will overwrite the scenario ids. If this is true, we will only export the default plan for each + # property and will ignore the scenario ids + default_plans_only: Optional[bool] = False + + @model_validator(mode="after") + def validate_default_plan_override(self): + """ + If default_plans_only is True and scenario_ids were provided, + we allow execution but make it explicit that scenario_ids + will be ignored. + """ + if self.default_plans_only and self.scenario_ids: + # We do NOT raise — we allow execution. + # We just mark the object so the handler can log/return a warning. + object.__setattr__(self, "_scenario_ids_ignored", True) + else: + object.__setattr__(self, "_scenario_ids_ignored", False) + + return self diff --git a/backend/export/property_scenarios/main.py b/backend/export/property_scenarios/main.py new file mode 100644 index 00000000..88ebf326 --- /dev/null +++ b/backend/export/property_scenarios/main.py @@ -0,0 +1,154 @@ +import json +from typing import List, Optional, Any, Mapping + +import pandas as pd +from sqlalchemy.orm import Session + +from backend.export.property_scenarios.input_schema import ExportRequest +from backend.export.property_scenarios.db_functions import DbMethods +from backend.app.db.connection import db_engine +from backend.app.utils import sap_to_epc +from utils.logger import setup_logger + +logger = setup_logger() + + +def process_export(config: ExportRequest) -> List[str]: + exported_files: List[str] = [] + + with Session(bind=db_engine) as session: + + db_methods = DbMethods(session) + + properties_df = db_methods.get_properties(config.portfolio_id) + + plans_df = db_methods.get_latest_plans( + portfolio_id=config.portfolio_id, + scenario_ids=config.scenario_ids, + default_only=config.default_plans_only, + ) + + if plans_df.empty: + return exported_files + + recommendations_df = db_methods.get_recommendations( + plans_df["id"].tolist() + ) + + recommendations_df = db_methods.attach_materials(recommendations_df) + + for scenario_id in config.scenario_ids: + + scenario_recs = recommendations_df[ + recommendations_df["scenario_id"] == scenario_id + ] + + if scenario_recs.empty: + continue + + measures_df = scenario_recs[ + ["property_id", "measure_type", "estimated_cost"] + ].drop_duplicates() + + pivot = measures_df.pivot( + index="property_id", + columns="measure_type", + values="estimated_cost", + ).reset_index() + + pivot["total_retrofit_cost"] = ( + pivot.drop(columns=["property_id"]).sum(axis=1) + ) + + post_sap = ( + scenario_recs.groupby("property_id")[["sap_points"]] + .sum() + .reset_index() + ) + + df = ( + properties_df + .merge(pivot, how="left", on="property_id") + .merge(post_sap, how="left", on="property_id") + ) + + df["sap_points"] = df["sap_points"].fillna(0) + df["predicted_post_works_sap"] = ( + df["current_sap_points"] + df["sap_points"] + ) + df["predicted_post_works_epc"] = df[ + "predicted_post_works_sap" + ].apply(sap_to_epc) + + filename = ( + f"/tmp/{config.scenario_names[scenario_id]} - " + f"{config.project_name}.xlsx" + ) + + with pd.ExcelWriter(filename) as writer: + df.to_excel(writer, sheet_name="properties", index=False) + + exported_files.append(filename) + + return exported_files + + +# ============================================================ +# Lambda Handler +# ============================================================ + +def handler(event: dict, context: Optional[Any]) -> Mapping[str, int | str]: + """ + Lambda event should have the following structure: + 1) task id - unique identifier for the export task (optional, can be used for tracking/logging) + 2) subtask id - unique identifier for the specific export operation (optional, can be used for tracking/logging) + 2) portfolio id - id of the portfolio to export + 3) scenario ids - list of scenario ids to export + 4) default_plans_only - flag indicating if we should only consider default plans for export (optional, + defaults to False) + :param event: + :param context: + :return: + """ + for record in event.get("Records", []): + try: + body_dict = json.loads(record["body"]) + + # body_dict = { + # "task_id": "test", + # "subtask_id": "test", + # "portfolio_id": 569, + # "scenario_ids": [], + # "default_plans_only": True, + # } + + logger.debug("Validating request body") + payload = ExportRequest.model_validate(body_dict) + + if payload._scenario_ids_ignored: + logger.warning( + "Received scenario_ids in request body but they will be ignored " + "because default_plans_only is set to True" + ) + + logger.debug("Successfully validated request body") + process_export(payload) + + # TODO: Need to handle the exported files - e.g. upload to s3 and email a presigned url + + return { + "statusCode": 200, + "body": json.dumps({}), + } + + except Exception as e: + logger.error(f"Failed to process record: {e}") + return { + "statusCode": 500, + "body": json.dumps({"message": "Failed to process export request"}), + } + + return { + "statusCode": 201, + "body": json.dumps({"message": "No records to process"}), + }