fixing stict typing issues

This commit is contained in:
Khalim Conn-Kowlessar 2026-02-25 16:45:52 +00:00
parent bf865811c0
commit e645f90b0e
6 changed files with 65 additions and 24 deletions

5
.gitignore vendored
View file

@ -279,4 +279,7 @@ cache/
*.png
*.pptx
local_data*
local_data*
# pyright local config
pyrightconfig.json

25
.idea/watcherTasks.xml generated Normal file
View file

@ -0,0 +1,25 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectTasksOptions">
<TaskOptions isEnabled="true">
<option name="arguments" value="$FilePath$" />
<option name="checkSyntaxErrors" value="true" />
<option name="description" />
<option name="exitCodeBehavior" value="ERROR" />
<option name="fileExtension" value="py" />
<option name="immediateSync" value="true" />
<option name="name" value="Pyright" />
<option name="output" value="" />
<option name="outputFilters">
<array />
</option>
<option name="outputFromStdout" value="false" />
<option name="program" value="$USER_HOME$/.nvm/versions/node/v18.15.0/bin/pyright" />
<option name="runOnExternalChanges" value="true" />
<option name="scopeName" value="Project Files" />
<option name="trackOnlyRoot" value="false" />
<option name="workingDir" value="$ProjectFileDir$" />
<envs />
</TaskOptions>
</component>
</project>

View file

@ -1,5 +1,5 @@
from typing import Optional, Union, List
from pydantic import BaseModel, model_validator
from pydantic import BaseModel, model_validator, PrivateAttr
class ExportRequest(BaseModel):
@ -15,7 +15,10 @@ class ExportRequest(BaseModel):
# boolean which will overwrite the scenario ids. If this is true, we will only export the default plan for each
# property and will ignore the scenario ids
default_plans_only: Optional[bool] = False
# Private attribute to indicate whether scenario_ids should be ignored due to default_plans_only being True
_scenario_ids_ignored: bool = PrivateAttr(default=False)
@model_validator(mode="after")
def validate_default_plan_override(self):
"""

View file

@ -1,5 +1,5 @@
import json
from typing import Optional, Any, Mapping, Dict, Union
from typing import Optional, Any, Mapping, Dict, Union, List
import pandas as pd
from sqlalchemy.orm import Session
@ -13,6 +13,12 @@ from utils.logger import setup_logger
logger = setup_logger()
def choose_group_keys(payload: ExportRequest) -> List[int]:
if payload.default_plans_only:
return [] # Single export, no scenario grouping
return payload.scenario_ids or []
def process_export(payload: ExportRequest, session: Session) -> Dict[Union[str, int], pd.DataFrame]:
export_files: Dict[Union[str, int], pd.DataFrame] = {}
@ -22,33 +28,28 @@ def process_export(payload: ExportRequest, session: Session) -> Dict[Union[str,
logger.info("Retrieved %s properties for export", len(properties_df))
plans_df = db_methods.get_latest_plans(
plans_df: pd.DataFrame = db_methods.get_latest_plans(
portfolio_id=payload.portfolio_id,
scenario_ids=payload.scenario_ids,
default_only=payload.default_plans_only,
default_only=bool(payload.default_plans_only),
)
logger.info("Retrieved %s plans for export", len(plans_df))
if plans_df.empty:
return export_files
recommendations_df = db_methods.get_recommendations(
plans_df["id"].tolist()
)
plan_ids: List[int] = plans_df["id"].tolist()
recommendations_df: pd.DataFrame = db_methods.get_recommendations(plan_ids)
recommendations_df = db_methods.attach_materials(recommendations_df)
if payload.default_plans_only:
group_keys = [None] # Single export, no scenario grouping
else:
group_keys = payload.scenario_ids
group_keys: List[Union[str, int]] = choose_group_keys(payload)
for group_key in group_keys:
if payload.default_plans_only:
scenario_recs = recommendations_df
export_label = "default_plans"
export_label: Union[str, int] = "default_plans"
else:
scenario_recs = recommendations_df[
recommendations_df["scenario_id"] == group_key
@ -62,7 +63,7 @@ def process_export(payload: ExportRequest, session: Session) -> Dict[Union[str,
["property_id", "measure_type", "plan_name", "estimated_cost"]
].drop_duplicates()
pivot = measures_df.pivot(
pivot: pd.DataFrame = measures_df.pivot(
index=["property_id", "plan_name"],
columns="measure_type",
values="estimated_cost",
@ -72,13 +73,13 @@ def process_export(payload: ExportRequest, session: Session) -> Dict[Union[str,
pivot.drop(columns=["property_id", "plan_name"]).sum(axis=1)
)
post_sap = (
post_sap: pd.DataFrame = (
scenario_recs.groupby("property_id")[["sap_points"]]
.sum()
.reset_index()
)
df = (
df: pd.DataFrame = (
properties_df.rename(columns={"solar_pv": "existing_solar_pv"})
.merge(pivot, how="left", on="property_id")
.merge(post_sap, how="left", on="property_id")
@ -86,9 +87,7 @@ def process_export(payload: ExportRequest, session: Session) -> Dict[Union[str,
df["sap_points"] = df["sap_points"].fillna(0)
df["predicted_post_works_sap"] = df["current_sap_points"] + df["sap_points"]
df["predicted_post_works_epc"] = df[
"predicted_post_works_sap"
].apply(sap_to_epc)
df["predicted_post_works_epc"] = df["predicted_post_works_sap"].apply(sap_to_epc)
export_files[export_label] = df
@ -99,7 +98,7 @@ def process_export(payload: ExportRequest, session: Session) -> Dict[Union[str,
# Lambda Handler
# ============================================================
def handler(event: Mapping[str, Any], context: Optional[Any]) -> Mapping[str, int | str]:
def handler(event: Mapping[str, Any], context: Optional[Any]) -> Mapping[str, Union[int, str]]:
"""
Lambda event should have the following structure:
1) task id - unique identifier for the export task (optional, can be used for tracking/logging)
@ -128,7 +127,7 @@ def handler(event: Mapping[str, Any], context: Optional[Any]) -> Mapping[str, in
logger.debug("Validating request body")
payload = ExportRequest.model_validate(body_dict)
if payload._scenario_ids_ignored:
if getattr(payload, "_scenario_ids_ignored", False):
logger.warning(
"Received scenario_ids in request body but they will be ignored "
"because default_plans_only is set to True"
@ -139,7 +138,7 @@ def handler(event: Mapping[str, Any], context: Optional[Any]) -> Mapping[str, in
exported_files = process_export(payload, session)
# TODO: Need to handle the exported files - e.g. upload to s3 and email a presigned url
_ = exported_files
return {
"statusCode": 200,
"body": json.dumps({}),

3
pyproject.toml Normal file
View file

@ -0,0 +1,3 @@
[tool.pyright]
reportUnknownMemberType = false
reportUnknownVariableType = false

8
pyrightconfig.json Normal file
View file

@ -0,0 +1,8 @@
{
"typeCheckingMode": "strict",
"venvPath": "/Users/khalimconn-kowlessar/opt/anaconda3/envs/",
"venv": "Fastapi-backend",
"include": [
"."
]
}