working on export logic

This commit is contained in:
Khalim Conn-Kowlessar 2026-02-20 17:01:09 +00:00
parent d51af41125
commit 3e0444b3a7
5 changed files with 394 additions and 2 deletions

2
.idea/Model.iml generated
View file

@ -7,7 +7,7 @@
<sourceFolder url="file://$MODULE_DIR$/open_uprn" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/recommendations" isTestSource="false" />
</content>
<orderEntry type="jdk" jdkName="AssetList" jdkType="Python SDK" />
<orderEntry type="jdk" jdkName="Fastapi-backend" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

2
.idea/misc.xml generated
View file

@ -3,7 +3,7 @@
<component name="Black">
<option name="sdkName" value="Python 3.10 (backend)" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="AssetList" project-jdk-type="Python SDK" />
<component name="ProjectRootManager" version="2" project-jdk-name="Fastapi-backend" project-jdk-type="Python SDK" />
<component name="PyCharmProfessionalAdvertiser">
<option name="shown" value="true" />
</component>

View file

@ -0,0 +1,205 @@
from typing import List, Any, Dict, Optional
import pandas as pd
from sqlalchemy import func
from sqlalchemy.orm import Session
from collections import defaultdict
from backend.app.db.models.recommendations import (
Recommendation,
PlanModel,
PlanRecommendations,
RecommendationMaterials,
)
from backend.app.db.models.portfolio import (
PropertyModel,
PropertyDetailsEpcModel,
)
from utils.logger import setup_logger
logger = setup_logger()
class DbMethods:
def __init__(self, session: Session):
self.session = session
def get_properties(self, portfolio_id: int) -> pd.DataFrame:
"""
Function to fetch the property data, for property scenario exports
:param portfolio_id:
:return:
"""
query = (
self.session.query(PropertyModel, PropertyDetailsEpcModel)
.join(
PropertyDetailsEpcModel,
PropertyModel.id == PropertyDetailsEpcModel.property_id,
)
.filter(PropertyModel.portfolio_id == portfolio_id)
.all()
)
data = [
{
**{
col.name: getattr(row.PropertyModel, col.name)
for col in PropertyModel.__table__.columns
},
**{
col.name: getattr(row.PropertyDetailsEpcModel, col.name)
for col in PropertyDetailsEpcModel.__table__.columns
},
}
for row in query
]
return pd.DataFrame(data)
def get_latest_plans(
self,
portfolio_id: int,
scenario_ids: Optional[List[int]] = None,
default_only: bool = False,
) -> pd.DataFrame:
"""
Fetch latest plans.
Modes:
1) Scenario mode: latest per (scenario_id, property_id)
2) Default mode: latest default plan per property (ignores scenario_ids)
"""
# -----------------------------
# Sanity checks
# -----------------------------
if default_only and scenario_ids:
# Override scenario_ids to make it explicit that they will be ignored in the query
scenario_ids = None
if not default_only and not scenario_ids:
raise ValueError(
"Either scenario_ids must be provided "
"or default_only must be True."
)
# -----------------------------
# Filter on just the default plans - we ignore the scenario ids. NOTE - this is specific to postgres
# and relies on DISTINCT ON behaviour.
# -----------------------------
if default_only:
# Latest default plan per property (ignore scenarios entirely)
# DISTINCT ON (property_id) keeps the first row per property,
# ordered by created_at DESC so we get the newest one.
plans_query = (
self.session.query(PlanModel)
.filter(PlanModel.is_default.is_(True))
.distinct(PlanModel.property_id)
.order_by(
PlanModel.property_id,
PlanModel.created_at.desc(),
)
)
else:
# Latest plan per (scenario_id, property_id)
# DISTINCT ON (scenario_id, property_id) keeps the newest
# plan per scenario/property combination.
plans_query = (
self.session.query(PlanModel)
.filter(PlanModel.scenario_id.in_(scenario_ids))
.distinct(
PlanModel.scenario_id,
PlanModel.property_id,
)
.order_by(
PlanModel.scenario_id,
PlanModel.property_id,
PlanModel.created_at.desc(),
)
)
logger.info("Fetching plans")
plans = plans_query.all()
return pd.DataFrame(
[
{
col.name: getattr(plan, col.name)
for col in PlanModel.__table__.columns
}
for plan in plans
]
)
def get_recommendations(self, plan_ids: List[int]) -> pd.DataFrame:
if not plan_ids:
return pd.DataFrame()
recs_query = (
self.session.query(
Recommendation,
PlanModel.scenario_id,
)
.join(
PlanRecommendations,
Recommendation.id == PlanRecommendations.recommendation_id,
)
.join(PlanModel, PlanModel.id == PlanRecommendations.plan_id)
.filter(
PlanRecommendations.plan_id.in_(plan_ids),
Recommendation.default.is_(True),
Recommendation.already_installed.is_(False),
)
.all()
)
data = [
{
**{
col.name: getattr(r.Recommendation, col.name)
for col in Recommendation.__table__.columns
},
"scenario_id": r.scenario_id,
}
for r in recs_query
]
return pd.DataFrame(data)
def attach_materials(self, recommendations_df: pd.DataFrame) -> pd.DataFrame:
if recommendations_df.empty:
recommendations_df["materials"] = []
return recommendations_df
rec_ids = recommendations_df["id"].tolist()
materials_query = (
self.session.query(RecommendationMaterials)
.filter(RecommendationMaterials.recommendation_id.in_(rec_ids))
.all()
)
materials_map: Dict[int, List[Dict[str, Any]]] = defaultdict(list)
for m in materials_query:
materials_map[m.recommendation_id].append(
{
"material_id": m.material_id,
"depth": m.depth,
"quantity": m.quantity,
"quantity_unit": m.quantity_unit,
"estimated_cost": m.estimated_cost,
}
)
recommendations_df["materials"] = recommendations_df["id"].apply(
lambda x: materials_map.get(x, [])
)
return recommendations_df

View file

@ -0,0 +1,33 @@
from typing import Optional, Union, List
from pydantic import BaseModel, model_validator
class ExportRequest(BaseModel):
# uuid which maps to a specific export request, used for tracking and logging
task_id: Union[str, None]
# uuid which maps to a specific export operation, used for tracking and logging. subtask is the child of the
# task, where the work has been distributed across workers
subtask_id: Union[str, None]
# associated portfolio id for the export request
portfolio_id: int
# list of scenario ids to export
scenario_ids: List[int]
# boolean which will overwrite the scenario ids. If this is true, we will only export the default plan for each
# property and will ignore the scenario ids
default_plans_only: Optional[bool] = False
@model_validator(mode="after")
def validate_default_plan_override(self):
"""
If default_plans_only is True and scenario_ids were provided,
we allow execution but make it explicit that scenario_ids
will be ignored.
"""
if self.default_plans_only and self.scenario_ids:
# We do NOT raise — we allow execution.
# We just mark the object so the handler can log/return a warning.
object.__setattr__(self, "_scenario_ids_ignored", True)
else:
object.__setattr__(self, "_scenario_ids_ignored", False)
return self

View file

@ -0,0 +1,154 @@
import json
from typing import List, Optional, Any, Mapping
import pandas as pd
from sqlalchemy.orm import Session
from backend.export.property_scenarios.input_schema import ExportRequest
from backend.export.property_scenarios.db_functions import DbMethods
from backend.app.db.connection import db_engine
from backend.app.utils import sap_to_epc
from utils.logger import setup_logger
logger = setup_logger()
def process_export(config: ExportRequest) -> List[str]:
exported_files: List[str] = []
with Session(bind=db_engine) as session:
db_methods = DbMethods(session)
properties_df = db_methods.get_properties(config.portfolio_id)
plans_df = db_methods.get_latest_plans(
portfolio_id=config.portfolio_id,
scenario_ids=config.scenario_ids,
default_only=config.default_plans_only,
)
if plans_df.empty:
return exported_files
recommendations_df = db_methods.get_recommendations(
plans_df["id"].tolist()
)
recommendations_df = db_methods.attach_materials(recommendations_df)
for scenario_id in config.scenario_ids:
scenario_recs = recommendations_df[
recommendations_df["scenario_id"] == scenario_id
]
if scenario_recs.empty:
continue
measures_df = scenario_recs[
["property_id", "measure_type", "estimated_cost"]
].drop_duplicates()
pivot = measures_df.pivot(
index="property_id",
columns="measure_type",
values="estimated_cost",
).reset_index()
pivot["total_retrofit_cost"] = (
pivot.drop(columns=["property_id"]).sum(axis=1)
)
post_sap = (
scenario_recs.groupby("property_id")[["sap_points"]]
.sum()
.reset_index()
)
df = (
properties_df
.merge(pivot, how="left", on="property_id")
.merge(post_sap, how="left", on="property_id")
)
df["sap_points"] = df["sap_points"].fillna(0)
df["predicted_post_works_sap"] = (
df["current_sap_points"] + df["sap_points"]
)
df["predicted_post_works_epc"] = df[
"predicted_post_works_sap"
].apply(sap_to_epc)
filename = (
f"/tmp/{config.scenario_names[scenario_id]} - "
f"{config.project_name}.xlsx"
)
with pd.ExcelWriter(filename) as writer:
df.to_excel(writer, sheet_name="properties", index=False)
exported_files.append(filename)
return exported_files
# ============================================================
# Lambda Handler
# ============================================================
def handler(event: dict, context: Optional[Any]) -> Mapping[str, int | str]:
"""
Lambda event should have the following structure:
1) task id - unique identifier for the export task (optional, can be used for tracking/logging)
2) subtask id - unique identifier for the specific export operation (optional, can be used for tracking/logging)
2) portfolio id - id of the portfolio to export
3) scenario ids - list of scenario ids to export
4) default_plans_only - flag indicating if we should only consider default plans for export (optional,
defaults to False)
:param event:
:param context:
:return:
"""
for record in event.get("Records", []):
try:
body_dict = json.loads(record["body"])
# body_dict = {
# "task_id": "test",
# "subtask_id": "test",
# "portfolio_id": 569,
# "scenario_ids": [],
# "default_plans_only": True,
# }
logger.debug("Validating request body")
payload = ExportRequest.model_validate(body_dict)
if payload._scenario_ids_ignored:
logger.warning(
"Received scenario_ids in request body but they will be ignored "
"because default_plans_only is set to True"
)
logger.debug("Successfully validated request body")
process_export(payload)
# TODO: Need to handle the exported files - e.g. upload to s3 and email a presigned url
return {
"statusCode": 200,
"body": json.dumps({}),
}
except Exception as e:
logger.error(f"Failed to process record: {e}")
return {
"statusCode": 500,
"body": json.dumps({"message": "Failed to process export request"}),
}
return {
"statusCode": 201,
"body": json.dumps({"message": "No records to process"}),
}