mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
Merge pull request #524 from Hestia-Homes/eco-eligiblity-bug
Eco eligiblity bug
This commit is contained in:
commit
e8f5f09263
16 changed files with 797 additions and 239 deletions
|
|
@ -298,13 +298,13 @@ def app():
|
|||
landlord_block_reference = None
|
||||
|
||||
# Project from Nick
|
||||
data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/sfr/Sep2025 Project"
|
||||
data_filename = "AL Test.xlsx"
|
||||
sheet_name = "Sheet1"
|
||||
postcode_column = 'postcode'
|
||||
data_folder = "/Users/khalimconn-kowlessar/Documents/hestia/Customers/sfr/October 2025 AL portfolio"
|
||||
data_filename = "22.10 AL Portfolio.xlsx"
|
||||
sheet_name = "22.10 AL Portfolio"
|
||||
postcode_column = 'Postcode'
|
||||
address1_column = None
|
||||
address1_method = 'house_number_extraction'
|
||||
fulladdress_column = "address"
|
||||
fulladdress_column = "Address"
|
||||
address_cols_to_concat = []
|
||||
missing_postcodes_method = None
|
||||
landlord_year_built = None
|
||||
|
|
@ -315,7 +315,7 @@ def app():
|
|||
landlord_roof_construction = None
|
||||
landlord_heating_system = None
|
||||
landlord_existing_pv = None
|
||||
landlord_property_id = "row_id"
|
||||
landlord_property_id = "Row ID"
|
||||
landlord_sap = None
|
||||
outcomes_filename = None
|
||||
outcomes_sheetname = None
|
||||
|
|
|
|||
|
|
@ -65,6 +65,7 @@ class Property:
|
|||
# Surplus information, that can be provided as optional inputs, by a customer
|
||||
n_bathrooms = None
|
||||
n_bedrooms = None
|
||||
landlord_property_id = None # unique reference for the property as recognised by the landlord
|
||||
building_id = None # Used to group properties together into a single building
|
||||
|
||||
# Contains the solar panel optimisation results from the Google Solar API
|
||||
|
|
@ -265,8 +266,9 @@ class Property:
|
|||
"number_of_floors": number_of_floors,
|
||||
"insulation_floor_area": insulation_floor_area,
|
||||
"insulation_wall_area": insulation_wall_area,
|
||||
"building_id": kwargs.get("building_id", None),
|
||||
"floor_area": floor_area
|
||||
"building_id": kwargs.get("building_id", kwargs.get("landlord_block_reference", None)),
|
||||
"floor_area": floor_area,
|
||||
"landlord_property_id": kwargs.get("landlord_property_id"),
|
||||
}
|
||||
|
||||
def parse_kwargs(self, kwargs):
|
||||
|
|
|
|||
|
|
@ -479,9 +479,7 @@ class GoogleSolarApi:
|
|||
|
||||
roi_results = pd.DataFrame(roi_results)
|
||||
|
||||
panel_performance = panel_performance.merge(
|
||||
roi_results, how="left", on="n_panels"
|
||||
)
|
||||
panel_performance = panel_performance.merge(roi_results, how="left", on="n_panels")
|
||||
|
||||
# We want max roi, minimal generation deficit, and max generation value - we create a ranking score
|
||||
# Assign equal weights to each metric
|
||||
|
|
@ -742,7 +740,7 @@ class GoogleSolarApi:
|
|||
@classmethod
|
||||
def building_solar_analysis(
|
||||
cls, building_solar_config: List, input_properties: List[Property], session, google_solar_api_key: str,
|
||||
solar_materials: list
|
||||
solar_materials: list,
|
||||
):
|
||||
"""
|
||||
Perform the solar analysis for the building level
|
||||
|
|
@ -826,9 +824,21 @@ class GoogleSolarApi:
|
|||
@classmethod
|
||||
def unit_solar_analysis(
|
||||
cls, unit_solar_config: List, input_properties: List[Property], session, body, google_solar_api_key: str,
|
||||
solar_materials: list
|
||||
solar_materials: list, inspections_map: dict
|
||||
):
|
||||
|
||||
"""
|
||||
Perform the solar analysis for the unit level
|
||||
:param unit_solar_config: List of unit solar configurations
|
||||
:param input_properties: List of properties
|
||||
:param session: Database session
|
||||
:param body: PlanTriggerRequest instance
|
||||
:param google_solar_api_key: Google Solar API key
|
||||
:param solar_materials: List of solar materials
|
||||
:param inspections_map: Dictionary mapping property IDs to inspection data
|
||||
:return:
|
||||
"""
|
||||
|
||||
if not unit_solar_config:
|
||||
return input_properties
|
||||
|
||||
|
|
@ -879,6 +889,15 @@ class GoogleSolarApi:
|
|||
property_instance=property_instance,
|
||||
)
|
||||
|
||||
property_inspections = inspections_map.get(property_instance.id, {})
|
||||
|
||||
if property_inspections:
|
||||
# If we have some inspections data, we check if we have some data which indicates solar cannot
|
||||
# be installed. We're loose about this now since this is post review
|
||||
if solar_api_client.panel_performance.empty:
|
||||
# We assume solar is a suitable option
|
||||
solar_api_client.panel_performance = solar_api_client.default_panel_performance(property_instance)
|
||||
|
||||
# Store the data in the database
|
||||
solar_api_client.save_to_db(
|
||||
session=session,
|
||||
|
|
@ -923,12 +942,43 @@ class GoogleSolarApi:
|
|||
None
|
||||
)
|
||||
|
||||
if material_1_6 is None or material_3_2 is None:
|
||||
material_4_35 = next(
|
||||
(m for m in self.solar_materials if m["type"] == "solar_pv" and
|
||||
abs(m["size"] - 4.35) < 0.1 and not m["includes_battery"]),
|
||||
None
|
||||
)
|
||||
|
||||
if material_1_6 is None or material_3_2 is None or material_4_35 is None:
|
||||
raise ValueError("No suitable solar product found for the default configuration.")
|
||||
|
||||
# We return a 1.6 and 3.2 kwp system
|
||||
panel_performance = pd.DataFrame(
|
||||
[
|
||||
{
|
||||
'n_panels': 10,
|
||||
'yearly_dc_energy': 4350 * assumptions.MEDIAN_WATTAGE_TO_DC,
|
||||
'total_cost': cost_instance.solar_pv(
|
||||
solar_product=material_4_35,
|
||||
scaffolding_options=[
|
||||
{"total_cost": 1000, "size": property_instance.number_of_floors},
|
||||
{"total_cost": 1000, "size": 3}
|
||||
],
|
||||
n_floors=property_instance.number_of_floors
|
||||
)["total"],
|
||||
'weighted_ratio': None,
|
||||
'panneled_roof_area': 9 * assumptions.RDSAP_AREA_PER_PANEL,
|
||||
'array_wattage': 4350,
|
||||
'initial_ac_kwh_per_year': 4350 * assumptions.MEDIAN_WATTAGE_TO_AC,
|
||||
'lifetime_ac_kwh': None,
|
||||
'lifetime_dc_kwh': None,
|
||||
'roi': None,
|
||||
'generation_value': None,
|
||||
'generation_deficit': None,
|
||||
'expected_payback_years': None,
|
||||
'surplus': None,
|
||||
'combined_score': None,
|
||||
'rank': None
|
||||
},
|
||||
{
|
||||
'n_panels': 8,
|
||||
'yearly_dc_energy': 3200 * assumptions.MEDIAN_WATTAGE_TO_DC,
|
||||
|
|
|
|||
214
backend/app/db/functions/inspections_functions.py
Normal file
214
backend/app/db/functions/inspections_functions.py
Normal file
|
|
@ -0,0 +1,214 @@
|
|||
import re
|
||||
from dataclasses import dataclass, asdict
|
||||
from typing import Optional, Dict, Any, Type, TypeVar
|
||||
from sqlalchemy.orm import Session
|
||||
from datetime import timezone
|
||||
|
||||
from enum import Enum
|
||||
from datetime import datetime, timedelta
|
||||
import math
|
||||
import pytz
|
||||
import enum
|
||||
|
||||
from backend.app.db.models.inspections import (
|
||||
InspectionModel,
|
||||
InspectionArchetype,
|
||||
InspectionArchetype2,
|
||||
InspectionsWallConstruction,
|
||||
InspectionsWallInsulation,
|
||||
InspectionsInsulationMaterial,
|
||||
InspectionBorescoped,
|
||||
InspectionsRoofOrientation,
|
||||
InspectionsTileHung,
|
||||
InspectionsRendered,
|
||||
InspectionsCladding,
|
||||
InspectionsAccessIssues,
|
||||
)
|
||||
from sqlalchemy.dialects.postgresql import insert
|
||||
|
||||
NON_INTRUSIVE_PREFIX = "non-intrusives:"
|
||||
|
||||
|
||||
@dataclass
|
||||
class InspectionData:
|
||||
archetype: Optional[InspectionArchetype] = None
|
||||
archetype_2: Optional[InspectionArchetype2] = None
|
||||
wall_construction: Optional[InspectionsWallConstruction] = None
|
||||
insulation: Optional[InspectionsWallInsulation] = None
|
||||
insulation_material: Optional[InspectionsInsulationMaterial] = None
|
||||
borescoped: Optional[InspectionBorescoped] = None
|
||||
roof_orientation: Optional[InspectionsRoofOrientation] = None
|
||||
tile_hung: Optional[InspectionsTileHung] = None
|
||||
rendered: Optional[InspectionsRendered] = None
|
||||
cladding: Optional[InspectionsCladding] = None
|
||||
access_issues: Optional[InspectionsAccessIssues] = None
|
||||
date: Optional[datetime] = None # Reflects the date when the survey was actually conducted
|
||||
notes: Optional[str] = None
|
||||
surveyor_name: Optional[str] = None
|
||||
|
||||
|
||||
def _clean_string(value: Any) -> Optional[str]:
|
||||
"""Normalize strings for enum matching, tolerant of NaN/None."""
|
||||
if value is None:
|
||||
return None
|
||||
if isinstance(value, float) and math.isnan(value):
|
||||
return None
|
||||
if not isinstance(value, str):
|
||||
return None
|
||||
|
||||
v = (
|
||||
value.strip()
|
||||
.lower()
|
||||
.replace("“", '"')
|
||||
.replace("”", '"')
|
||||
.replace("’", "'")
|
||||
)
|
||||
return re.sub(r"\s+", " ", v)
|
||||
|
||||
|
||||
E = TypeVar("E", bound=Enum)
|
||||
|
||||
|
||||
def _match_enum(value: Any, enum_cls: Type[E]) -> Optional[E]:
|
||||
"""Case-insensitive fuzzy matching for enums, tolerant of NaN/None."""
|
||||
v = _clean_string(value)
|
||||
if not v:
|
||||
return None
|
||||
|
||||
for e in enum_cls:
|
||||
if v == e.value.lower():
|
||||
return e
|
||||
|
||||
for e in enum_cls:
|
||||
if v in e.value.lower() or e.value.lower() in v:
|
||||
return e
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _lower_key_dict(d: dict) -> dict:
|
||||
"""Convert all keys to lowercase for case-insensitive lookup."""
|
||||
return {str(k).lower(): v for k, v in d.items() if isinstance(k, str)}
|
||||
|
||||
|
||||
def extract_inspection_data(config: Dict[str, Any]) -> Optional[InspectionData]:
|
||||
"""Extract and map inspection data from a config row."""
|
||||
config_lower = _lower_key_dict(config)
|
||||
|
||||
non_intrusive_fields = {
|
||||
k: v for k, v in config_lower.items()
|
||||
if k.startswith(NON_INTRUSIVE_PREFIX)
|
||||
}
|
||||
|
||||
if not non_intrusive_fields:
|
||||
return None
|
||||
|
||||
data = InspectionData()
|
||||
|
||||
data.archetype = _match_enum(
|
||||
config_lower.get("non-intrusives: archetype"), InspectionArchetype
|
||||
)
|
||||
data.archetype_2 = _match_enum(
|
||||
config_lower.get("non-intrusives: archetype 2"), InspectionArchetype2
|
||||
)
|
||||
data.wall_construction = _match_enum(
|
||||
config_lower.get("non-intrusives: construction"), InspectionsWallConstruction
|
||||
)
|
||||
data.insulation = _match_enum(
|
||||
config_lower.get("non-intrusives: insulated"), InspectionsWallInsulation
|
||||
)
|
||||
data.insulation_material = _match_enum(
|
||||
config_lower.get("non-intrusives: material"), InspectionsInsulationMaterial
|
||||
)
|
||||
data.borescoped = _match_enum(
|
||||
config_lower.get("non-intrusives: boroscoped?"), InspectionBorescoped
|
||||
)
|
||||
data.roof_orientation = _match_enum(
|
||||
config_lower.get("non-intrusives: roof orientation"), InspectionsRoofOrientation
|
||||
)
|
||||
data.tile_hung = _match_enum(
|
||||
config_lower.get("non-intrusives: tile hung"), InspectionsTileHung
|
||||
)
|
||||
data.rendered = _match_enum(
|
||||
config_lower.get("non-intrusives: rendered"), InspectionsRendered
|
||||
)
|
||||
data.cladding = _match_enum(
|
||||
config_lower.get("non-intrusives: cladding"), InspectionsCladding
|
||||
)
|
||||
data.access_issues = _match_enum(
|
||||
config_lower.get("non-intrusives: access issues"), InspectionsAccessIssues
|
||||
)
|
||||
|
||||
data.date = config_lower.get("non-intrusives: date")
|
||||
data.notes = config_lower.get("non-intrusives: further surveyor notes")
|
||||
# convert surveyor name to title case if present
|
||||
data.surveyor_name = config_lower.get("non-intrusives: name of surveyor").title() if config_lower.get(
|
||||
"non-intrusives: name of surveyor") else None
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def bulk_upsert_inspections_pg(session: Session, inspections_map):
|
||||
"""
|
||||
Bulk insert/update inspection records:
|
||||
- 'created_at' = actual survey date
|
||||
- 'uploaded_at' = time of upload or update
|
||||
- If an inspection exists for the same property on the same date → overwrite
|
||||
- Otherwise → insert a new record
|
||||
"""
|
||||
|
||||
if not inspections_map:
|
||||
return
|
||||
|
||||
now = datetime.now(pytz.utc)
|
||||
|
||||
for property_id, data in inspections_map.items():
|
||||
# Extract survey date from the data
|
||||
record = asdict(data)
|
||||
survey_date = getattr(data, "survey_date", None) or record.get("survey_date")
|
||||
|
||||
if not survey_date:
|
||||
continue # skip if no survey date available
|
||||
|
||||
# Convert to UTC datetime if needed
|
||||
if hasattr(survey_date, "to_pydatetime"):
|
||||
survey_date = survey_date.to_pydatetime()
|
||||
if survey_date.tzinfo is None:
|
||||
survey_date = survey_date.replace(tzinfo=pytz.utc)
|
||||
|
||||
record["property_id"] = property_id
|
||||
record["created_at"] = survey_date
|
||||
record["uploaded_at"] = now
|
||||
|
||||
# Normalize enums and NaNs
|
||||
for key, value in record.items():
|
||||
if isinstance(value, enum.Enum):
|
||||
record[key] = value.value
|
||||
elif isinstance(value, float) and math.isnan(value):
|
||||
record[key] = None
|
||||
|
||||
# Find existing inspection *for same property on same day*
|
||||
start_of_day = survey_date.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
end_of_day = start_of_day + timedelta(days=1)
|
||||
|
||||
existing_inspection = (
|
||||
session.query(InspectionModel)
|
||||
.filter(
|
||||
InspectionModel.property_id == property_id,
|
||||
InspectionModel.created_at >= start_of_day,
|
||||
InspectionModel.created_at < end_of_day,
|
||||
)
|
||||
.first()
|
||||
)
|
||||
|
||||
if existing_inspection:
|
||||
# Overwrite existing record (same survey day)
|
||||
for field, value in record.items():
|
||||
setattr(existing_inspection, field, value)
|
||||
existing_inspection.uploaded_at = now
|
||||
else:
|
||||
# Create new inspection for new day
|
||||
new_inspection = InspectionModel(**record)
|
||||
session.add(new_inspection)
|
||||
|
||||
session.flush()
|
||||
|
|
@ -12,7 +12,7 @@ from sqlalchemy.orm.exc import NoResultFound
|
|||
|
||||
|
||||
def create_property(session: Session, portfolio_id: int, address: str, postcode: str, uprn: str,
|
||||
energy_assessment: dict) -> (int, bool):
|
||||
energy_assessment: dict, landlord_property_id: str | None = None) -> (int, bool):
|
||||
"""
|
||||
This function will create a record for the property in the database if it does not exist.
|
||||
If it does exist, it will just update the updated_at field.
|
||||
|
|
@ -20,6 +20,9 @@ def create_property(session: Session, portfolio_id: int, address: str, postcode:
|
|||
:param portfolio_id: The ID of the portfolio the property belongs to
|
||||
:param address: The address of the property
|
||||
:param postcode: The postcode of the property
|
||||
:param uprn: The UPRN of the property
|
||||
:param energy_assessment: The energy assessment data for the property
|
||||
:param landlord_property_id: The landlord property ID if available
|
||||
:return: The ID of the property and a boolean indicating whether it was created or not
|
||||
"""
|
||||
|
||||
|
|
@ -49,6 +52,7 @@ def create_property(session: Session, portfolio_id: int, address: str, postcode:
|
|||
postcode=postcode,
|
||||
portfolio_id=portfolio_id,
|
||||
uprn=uprn,
|
||||
landlord_property_id=landlord_property_id,
|
||||
creation_status=PropertyCreationStatus.LOADING,
|
||||
status=status,
|
||||
has_pre_condition_report=False,
|
||||
|
|
@ -63,6 +67,30 @@ def create_property(session: Session, portfolio_id: int, address: str, postcode:
|
|||
return new_property.id, True
|
||||
|
||||
|
||||
def ensure_property_exists(session, body, epc_searcher, energy_assessment, landlord_property_id=None):
|
||||
"""
|
||||
Wrapper funtion which checks if a property is new and will return the roperty type if not
|
||||
:param session:
|
||||
:param body:
|
||||
:param epc_searcher:
|
||||
:param energy_assessment:
|
||||
:param landlord_property_id:
|
||||
:return:
|
||||
"""
|
||||
property_id, is_new = create_property(
|
||||
session=session,
|
||||
portfolio_id=body.portfolio_id,
|
||||
address=epc_searcher.address_clean,
|
||||
postcode=epc_searcher.postcode_clean,
|
||||
uprn=epc_searcher.uprn,
|
||||
energy_assessment=energy_assessment,
|
||||
landlord_property_id=str(landlord_property_id) if landlord_property_id is not None else None
|
||||
)
|
||||
if not is_new and not body.multi_plan:
|
||||
return None, False
|
||||
return property_id, is_new
|
||||
|
||||
|
||||
def create_property_targets(
|
||||
session: Session, property_id: int, portfolio_id: int, epc_target=None, heat_demand_target=None
|
||||
):
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ from sqlalchemy import (
|
|||
ForeignKey,
|
||||
)
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
from backend.app.db.models.portfolio import PropertyModel
|
||||
|
||||
Base = declarative_base()
|
||||
|
||||
|
|
@ -138,19 +139,117 @@ class InspectionModel(Base):
|
|||
__tablename__ = "inspections"
|
||||
|
||||
id = Column(BigInteger, primary_key=True, autoincrement=True)
|
||||
property_id = Column(BigInteger, ForeignKey("property.id"), nullable=False)
|
||||
property_id = Column(BigInteger, ForeignKey(PropertyModel.id), nullable=False)
|
||||
|
||||
archetype = Column(Enum(InspectionArchetype), nullable=True)
|
||||
archetype_2 = Column(Enum(InspectionArchetype2), nullable=True)
|
||||
wall_construction = Column(Enum(InspectionsWallConstruction), nullable=True)
|
||||
insulation = Column(Enum(InspectionsWallInsulation), nullable=True)
|
||||
insulation_material = Column(Enum(InspectionsInsulationMaterial), nullable=True)
|
||||
borescoped = Column(Enum(InspectionBorescoped), nullable=True)
|
||||
roof_orientation = Column(Enum(InspectionsRoofOrientation), nullable=True)
|
||||
tile_hung = Column(Enum(InspectionsTileHung), nullable=True)
|
||||
rendered = Column(Enum(InspectionsRendered), nullable=True)
|
||||
cladding = Column(Enum(InspectionsCladding), nullable=True)
|
||||
access_issues = Column(Enum(InspectionsAccessIssues), nullable=True)
|
||||
archetype = Column(
|
||||
Enum(
|
||||
InspectionArchetype,
|
||||
name="inspection_archetype",
|
||||
values_callable=lambda e: [m.value for m in e],
|
||||
create_type=False,
|
||||
),
|
||||
nullable=True,
|
||||
)
|
||||
|
||||
archetype_2 = Column(
|
||||
Enum(
|
||||
InspectionArchetype2,
|
||||
name="inspection_archetype_2",
|
||||
values_callable=lambda e: [m.value for m in e],
|
||||
create_type=False,
|
||||
),
|
||||
nullable=True,
|
||||
)
|
||||
|
||||
wall_construction = Column(
|
||||
Enum(
|
||||
InspectionsWallConstruction,
|
||||
name="inspections_wall_construction",
|
||||
values_callable=lambda e: [m.value for m in e],
|
||||
create_type=False,
|
||||
),
|
||||
nullable=True,
|
||||
)
|
||||
|
||||
insulation = Column(
|
||||
Enum(
|
||||
InspectionsWallInsulation,
|
||||
name="inspections_wall_insulation",
|
||||
values_callable=lambda e: [m.value for m in e],
|
||||
create_type=False,
|
||||
),
|
||||
nullable=True,
|
||||
)
|
||||
|
||||
insulation_material = Column(
|
||||
Enum(
|
||||
InspectionsInsulationMaterial,
|
||||
name="inspections_insulation_material",
|
||||
values_callable=lambda e: [m.value for m in e],
|
||||
create_type=False,
|
||||
),
|
||||
nullable=True,
|
||||
)
|
||||
|
||||
borescoped = Column(
|
||||
Enum(
|
||||
InspectionBorescoped,
|
||||
name="inspection_borescoped",
|
||||
values_callable=lambda e: [m.value for m in e],
|
||||
create_type=False,
|
||||
),
|
||||
nullable=True,
|
||||
)
|
||||
|
||||
roof_orientation = Column(
|
||||
Enum(
|
||||
InspectionsRoofOrientation,
|
||||
name="inspections_roof_orientation",
|
||||
values_callable=lambda e: [m.value for m in e],
|
||||
create_type=False,
|
||||
),
|
||||
nullable=True,
|
||||
)
|
||||
|
||||
tile_hung = Column(
|
||||
Enum(
|
||||
InspectionsTileHung,
|
||||
name="inspections_tile_hung",
|
||||
values_callable=lambda e: [m.value for m in e],
|
||||
create_type=False,
|
||||
),
|
||||
nullable=True,
|
||||
)
|
||||
|
||||
rendered = Column(
|
||||
Enum(
|
||||
InspectionsRendered,
|
||||
name="inspections_rendered",
|
||||
values_callable=lambda e: [m.value for m in e],
|
||||
create_type=False,
|
||||
),
|
||||
nullable=True,
|
||||
)
|
||||
|
||||
cladding = Column(
|
||||
Enum(
|
||||
InspectionsCladding,
|
||||
name="inspections_cladding",
|
||||
values_callable=lambda e: [m.value for m in e],
|
||||
create_type=False,
|
||||
),
|
||||
nullable=True,
|
||||
)
|
||||
|
||||
access_issues = Column(
|
||||
Enum(
|
||||
InspectionsAccessIssues,
|
||||
name="inspections_access_issues",
|
||||
values_callable=lambda e: [m.value for m in e],
|
||||
create_type=False,
|
||||
),
|
||||
nullable=True,
|
||||
)
|
||||
|
||||
notes = Column(Text)
|
||||
surveyor_name = Column(Text)
|
||||
|
|
|
|||
|
|
@ -86,6 +86,7 @@ class PropertyModel(Base):
|
|||
portfolio_id = Column(Integer, ForeignKey('portfolio.id'), nullable=False)
|
||||
creation_status = Column(Enum(PropertyCreationStatus), nullable=False)
|
||||
uprn = Column(Integer)
|
||||
landlord_property_id = Column(Text)
|
||||
building_reference_number = Column(Integer)
|
||||
status = Column(Enum(PortfolioStatus, values_callable=lambda x: [e.value for e in x]), nullable=False)
|
||||
address = Column(Text)
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ from sqlalchemy.sql import func
|
|||
from backend.app.db.models.portfolio import Portfolio, PropertyModel
|
||||
from backend.app.db.models.materials import Material
|
||||
from datatypes.enums import QuantityUnits
|
||||
import enum
|
||||
|
||||
Base = declarative_base()
|
||||
|
||||
|
|
@ -47,6 +48,14 @@ class RecommendationMaterials(Base):
|
|||
estimated_cost = Column(Float, nullable=False)
|
||||
|
||||
|
||||
class PlanTypeEnum(enum.Enum):
|
||||
SOLAR_ECO4 = "solar_eco4"
|
||||
SOLAR_HHRSH_ECO4 = "solar_hhrsh_eco4"
|
||||
EMPTY_CAVITY_ECO = "empty_cavity_eco"
|
||||
PARTIAL_CAVITY_ECO = "partial_cavity_eco"
|
||||
EXTRACTION_ECO = "extraction_eco"
|
||||
|
||||
|
||||
class Plan(Base):
|
||||
__tablename__ = 'plan'
|
||||
|
||||
|
|
@ -60,6 +69,15 @@ class Plan(Base):
|
|||
valuation_increase_lower_bound = Column(Float)
|
||||
valuation_increase_upper_bound = Column(Float)
|
||||
valuation_increase_average = Column(Float)
|
||||
plan_type = Column(
|
||||
Enum(
|
||||
PlanTypeEnum,
|
||||
name="plan_type",
|
||||
values_callable=lambda e: [m.value for m in e],
|
||||
create_type=False,
|
||||
),
|
||||
nullable=True,
|
||||
)
|
||||
|
||||
|
||||
class PlanRecommendations(Base):
|
||||
|
|
|
|||
10
backend/app/plan/data_classes.py
Normal file
10
backend/app/plan/data_classes.py
Normal file
|
|
@ -0,0 +1,10 @@
|
|||
from dataclasses import dataclass
|
||||
from typing import Any, Optional
|
||||
|
||||
|
||||
@dataclass
|
||||
class PropertyRequestData:
|
||||
patch: dict
|
||||
already_installed: dict
|
||||
non_invasive_recommendations: dict
|
||||
valuation: Optional[float]
|
||||
|
|
@ -55,7 +55,7 @@ MEASURE_MAP = {
|
|||
|
||||
VALID_GOALS = ["Increasing EPC", "Energy Savings", "Reducing CO2 emissions"]
|
||||
VALID_HOUSING_TYPES = ["Social", "Private"]
|
||||
VALID_EVENT_TYPES = ["remote_assessment"]
|
||||
VALID_EVENT_TYPES = ["remote_assessment", "eco_project"]
|
||||
|
||||
|
||||
# Define the validation function for inclusions/exclusions
|
||||
|
|
@ -113,7 +113,7 @@ class PlanTriggerRequest(BaseModel):
|
|||
|
||||
# When performing a remote assessment, if this has been set, it will allow the engine to
|
||||
# pull data from the find my epc website, to utilise as part of a remote assessment
|
||||
event_type: Optional[Literal["remote_assessment"]] = None
|
||||
event_type: Optional[Literal["remote_assessment", "eco_project"]] = None
|
||||
|
||||
# If true, before optimising the engine will select a slightly larger package, to account for the SAP 10 causing
|
||||
# scores to drop by a few points
|
||||
|
|
|
|||
|
|
@ -1,7 +1,8 @@
|
|||
from utils.s3 import read_from_s3
|
||||
|
||||
from backend.app.config import get_settings
|
||||
import msgpack
|
||||
from utils.s3 import read_from_s3
|
||||
from backend.app.config import get_settings
|
||||
from backend.app.plan.data_classes import PropertyRequestData
|
||||
from typing import Any
|
||||
|
||||
|
||||
def get_cleaned():
|
||||
|
|
@ -21,3 +22,169 @@ def get_cleaned():
|
|||
cleaned = msgpack.unpackb(cleaned, raw=False)
|
||||
|
||||
return cleaned
|
||||
|
||||
|
||||
def patch_epc(patch, epc_records):
|
||||
"""
|
||||
This utility function is useful to patch the epc data if we have data from the customer
|
||||
:return:
|
||||
"""
|
||||
|
||||
for patch_variable, patch_value in patch.items():
|
||||
|
||||
if patch_variable in ["address", "postcode"]:
|
||||
continue
|
||||
|
||||
if patch_value == "":
|
||||
continue
|
||||
if patch_variable in epc_records["original_epc"]:
|
||||
epc_records["original_epc"][patch_variable] = patch_value
|
||||
|
||||
return epc_records
|
||||
|
||||
|
||||
def extract_property_request_data(
|
||||
config, patches, already_installed, non_invasive_recommendations, valuation_data, uprn
|
||||
):
|
||||
patch_has_uprn = "uprn" in patches[0] if patches else True
|
||||
if patch_has_uprn:
|
||||
patch = next((
|
||||
x for x in patches if str(x["uprn"]) == str(config["uprn"])
|
||||
), {})
|
||||
else:
|
||||
patch = next((
|
||||
x for x in patches if (x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
|
||||
), {})
|
||||
|
||||
property_already_installed = next((
|
||||
x for x in already_installed if
|
||||
(x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
|
||||
), {})
|
||||
|
||||
# Because we have some non-invasive recommendations that match on address and postcode, but not UPRN
|
||||
# we need to check existence of uprn
|
||||
has_uprn = "uprn" in non_invasive_recommendations[0] if non_invasive_recommendations else False
|
||||
if has_uprn:
|
||||
has_uprn = non_invasive_recommendations[0]["uprn"] not in ["", None]
|
||||
|
||||
if has_uprn:
|
||||
property_non_invasive_recommendations = next((
|
||||
x for x in non_invasive_recommendations if
|
||||
(str(x["uprn"]) == str(uprn))
|
||||
), {})
|
||||
|
||||
# We patch the non-invasive recs that are ['cavity_extract_and_refill']
|
||||
else:
|
||||
property_non_invasive_recommendations = next((
|
||||
x for x in non_invasive_recommendations if
|
||||
(x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
|
||||
), {})
|
||||
|
||||
if isinstance(property_non_invasive_recommendations.get("recommendations"), str):
|
||||
property_non_invasive_recommendations["recommendations"] = ast.literal_eval(
|
||||
property_non_invasive_recommendations["recommendations"]
|
||||
)
|
||||
transformed = []
|
||||
for rec in property_non_invasive_recommendations["recommendations"]:
|
||||
if isinstance(rec, str):
|
||||
transformed.append({"type": rec, })
|
||||
else:
|
||||
transformed.append(rec)
|
||||
|
||||
property_non_invasive_recommendations["recommendations"] = transformed
|
||||
|
||||
# Check if the valuation data has uprn
|
||||
valuation_has_uprn = "uprn" in valuation_data[0] if valuation_data else False
|
||||
if valuation_has_uprn:
|
||||
valuation_has_uprn = valuation_data[0]["uprn"] not in ["", None]
|
||||
|
||||
if valuation_has_uprn:
|
||||
property_valuation = next((
|
||||
float(x["valuation"]) for x in valuation_data if
|
||||
(str(x["uprn"]) == str(uprn))
|
||||
), None)
|
||||
else:
|
||||
property_valuation = next((
|
||||
float(x["valuation"]) for x in valuation_data if
|
||||
(x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
|
||||
), None)
|
||||
|
||||
# Return data class to give a structured format
|
||||
return PropertyRequestData(
|
||||
patch=patch,
|
||||
already_installed=property_already_installed,
|
||||
non_invasive_recommendations=property_non_invasive_recommendations,
|
||||
valuation=property_valuation
|
||||
)
|
||||
|
||||
|
||||
def parse_eco_packages(config: dict[str, Any]) -> tuple[list[str], int, str] | tuple[None, None, None]:
|
||||
solar_identification = config.get("solar_reason", None)
|
||||
cavity_identification = config.get("cavity_reason", None)
|
||||
if not solar_identification and not cavity_identification:
|
||||
return None, None, None
|
||||
|
||||
# We map the categories to the desired measures and upgrade targets
|
||||
# We note that the categories are placeholder until we move the standardised asset list
|
||||
|
||||
identification_map = {
|
||||
"Solar Eligible": {
|
||||
"measures": ["solar_pv", "loft_insulation", "mechanical_ventilation"],
|
||||
"target_sap": 86, # High B
|
||||
"plan_type": "solar_eco4"
|
||||
},
|
||||
"Solar Eligible, Solid Wall Uninsulated, EPC E or Below": {
|
||||
"measures": ["solar_pv", "loft_insulation", "mechanical_ventilation"],
|
||||
"target_sap": 86, # High B
|
||||
"plan_type": "solar_eco4"
|
||||
},
|
||||
"Solar Eligible, Needs Heating Upgrade": {
|
||||
"measures": ["solar_pv", "loft_insulation", "high_heat_retention_storage_heater", "mechanical_ventilation"],
|
||||
"target_sap": 86, # High B
|
||||
"plan_type": "solar_hhrsh_eco4"
|
||||
},
|
||||
"Non-Intrusive Data Shows Empty Cavity": {
|
||||
"measures": ["cavity_wall_insulation", "mechanical_ventilation"],
|
||||
"target_sap": 69, # Low C
|
||||
"plan_type": "empty_cavity_eco"
|
||||
},
|
||||
'Non-Intrusive Data Shows Empty Cavity, built after 2002': {
|
||||
"measures": ["cavity_wall_insulation", "mechanical_ventilation"],
|
||||
"target_sap": 69, # Low C
|
||||
"plan_type": "empty_cavity_eco"
|
||||
},
|
||||
"EPC Shows Empty Cavity, inspections show retro drilled": {
|
||||
# EPC Indicates it's empty, so we simulate a fill
|
||||
"measures": ["cavity_wall_insulation", "mechanical_ventilation"],
|
||||
"target_sap": 69, # Low C
|
||||
"plan_type": "extraction_eco"
|
||||
},
|
||||
"EPC Shows Empty Cavity, inspections show filled at build": {
|
||||
# EPC Indicates it's empty, so we simulate a fill
|
||||
"measures": ["cavity_wall_insulation", "mechanical_ventilation"],
|
||||
"target_sap": 69, # Low C
|
||||
"plan_type": "extraction_eco"
|
||||
},
|
||||
"EPC Shows Empty Cavity": {
|
||||
# EPC Indicates it's empty, so we simulate a fill
|
||||
"measures": ["cavity_wall_insulation", "mechanical_ventilation"],
|
||||
"target_sap": 69, # Low C
|
||||
"plan_type": "empty_cavity_eco"
|
||||
}
|
||||
}
|
||||
|
||||
# Always prioritise solar
|
||||
if solar_identification:
|
||||
_key = solar_identification.split(":")[0]
|
||||
else:
|
||||
_key = cavity_identification.split(":")[0]
|
||||
|
||||
mapped = identification_map[_key]
|
||||
return mapped["measures"], mapped["target_sap"], mapped["plan_type"]
|
||||
|
||||
|
||||
def handle_error(session, msg, status=500):
|
||||
# When the pipeline fails, handles error process
|
||||
logger.error(msg, exc_info=True)
|
||||
session.rollback()
|
||||
return Response(status_code=status, content=msg)
|
||||
|
|
|
|||
|
|
@ -17,8 +17,8 @@ from backend.app.db.connection import db_engine
|
|||
from backend.app.db.functions.materials_functions import get_materials
|
||||
from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations
|
||||
from backend.app.db.functions.property_functions import (
|
||||
create_property, create_property_details_epc, create_property_targets, update_property_data,
|
||||
update_or_create_property_spatial_details
|
||||
create_property_details_epc, create_property_targets, update_property_data,
|
||||
update_or_create_property_spatial_details, ensure_property_exists
|
||||
)
|
||||
from backend.app.db.functions.recommendations_functions import (
|
||||
create_plan, upload_recommendations, create_scenario
|
||||
|
|
@ -27,9 +27,14 @@ from backend.app.db.functions.funding_functions import upload_funding
|
|||
from backend.app.db.functions.energy_assessment_functions import get_latest_assessment_by_uprn
|
||||
from backend.app.db.models.portfolio import rating_lookup
|
||||
from backend.app.plan.schemas import PlanTriggerRequest, WALL_INSULATION_MEASURES, ROOF_INSULATION_MEASURES
|
||||
from backend.app.plan.utils import get_cleaned
|
||||
from backend.app.plan.utils import (
|
||||
get_cleaned, patch_epc, extract_property_request_data, parse_eco_packages, handle_error
|
||||
)
|
||||
from backend.app.utils import sap_to_epc
|
||||
import backend.app.assumptions as assumptions
|
||||
from backend.app.db.functions.inspections_functions import (
|
||||
extract_inspection_data, bulk_upsert_inspections_pg
|
||||
)
|
||||
|
||||
from backend.ml_models.api import ModelApi
|
||||
from backend.Property import Property
|
||||
|
|
@ -57,25 +62,6 @@ BATCH_SIZE = 5
|
|||
SCORING_BATCH_SIZE = 100
|
||||
|
||||
|
||||
def patch_epc(patch, epc_records):
|
||||
"""
|
||||
This utility function is useful to patch the epc data if we have data from the customer
|
||||
:return:
|
||||
"""
|
||||
|
||||
for patch_variable, patch_value in patch.items():
|
||||
|
||||
if patch_variable in ["address", "postcode"]:
|
||||
continue
|
||||
|
||||
if patch_value == "":
|
||||
continue
|
||||
if patch_variable in epc_records["original_epc"]:
|
||||
epc_records["original_epc"][patch_variable] = patch_value
|
||||
|
||||
return epc_records
|
||||
|
||||
|
||||
def extract_portfolio_aggregation_data(
|
||||
input_properties, total_valuation_increase, recommendations, new_epc_bands, property_value_increase_ranges
|
||||
):
|
||||
|
|
@ -349,75 +335,6 @@ def get_request_property_data(body: PlanTriggerRequest):
|
|||
return patches, already_installed, non_invasive_recommendations, valuation_data
|
||||
|
||||
|
||||
def extract_property_request_data(
|
||||
config, patches, already_installed, non_invasive_recommendations, valuation_data, uprn
|
||||
):
|
||||
patch_has_uprn = "uprn" in patches[0] if patches else True
|
||||
if patch_has_uprn:
|
||||
patch = next((
|
||||
x for x in patches if str(x["uprn"]) == str(config["uprn"])
|
||||
), {})
|
||||
else:
|
||||
patch = next((
|
||||
x for x in patches if (x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
|
||||
), {})
|
||||
|
||||
property_already_installed = next((
|
||||
x for x in already_installed if
|
||||
(x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
|
||||
), {})
|
||||
|
||||
# Because we have some non-invasive recommendations that match on address and postcode, but not UPRN
|
||||
# we need to check existence of uprn
|
||||
has_uprn = "uprn" in non_invasive_recommendations[0] if non_invasive_recommendations else False
|
||||
if has_uprn:
|
||||
has_uprn = non_invasive_recommendations[0]["uprn"] not in ["", None]
|
||||
|
||||
if has_uprn:
|
||||
property_non_invasive_recommendations = next((
|
||||
x for x in non_invasive_recommendations if
|
||||
(str(x["uprn"]) == str(uprn))
|
||||
), {})
|
||||
|
||||
# We patch the non-invasive recs that are ['cavity_extract_and_refill']
|
||||
else:
|
||||
property_non_invasive_recommendations = next((
|
||||
x for x in non_invasive_recommendations if
|
||||
(x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
|
||||
), {})
|
||||
|
||||
if isinstance(property_non_invasive_recommendations.get("recommendations"), str):
|
||||
property_non_invasive_recommendations["recommendations"] = ast.literal_eval(
|
||||
property_non_invasive_recommendations["recommendations"]
|
||||
)
|
||||
transformed = []
|
||||
for rec in property_non_invasive_recommendations["recommendations"]:
|
||||
if isinstance(rec, str):
|
||||
transformed.append({"type": rec, })
|
||||
else:
|
||||
transformed.append(rec)
|
||||
|
||||
property_non_invasive_recommendations["recommendations"] = transformed
|
||||
|
||||
# Check if the valuation data has uprn
|
||||
valuation_has_uprn = "uprn" in valuation_data[0] if valuation_data else False
|
||||
if valuation_has_uprn:
|
||||
valuation_has_uprn = valuation_data[0]["uprn"] not in ["", None]
|
||||
|
||||
if valuation_has_uprn:
|
||||
property_valution = next((
|
||||
float(x["valuation"]) for x in valuation_data if
|
||||
(str(x["uprn"]) == str(uprn))
|
||||
), None)
|
||||
else:
|
||||
property_valution = next((
|
||||
float(x["valuation"]) for x in valuation_data if
|
||||
(x["address"] == config["address"]) and (x["postcode"] == config["postcode"])
|
||||
), None)
|
||||
|
||||
return patch, property_already_installed, property_non_invasive_recommendations, property_valution
|
||||
|
||||
|
||||
def get_funding_data():
|
||||
"""
|
||||
This function retrieves the eco project scores matrix and the warm homes local grant funding data
|
||||
|
|
@ -483,8 +400,13 @@ async def model_engine(body: PlanTriggerRequest):
|
|||
plan_input = plan_input.rename(
|
||||
columns={"domna_address_1": "address", "domna_postcode": "postcode", "epc_os_uprn": "uprn"}
|
||||
)
|
||||
# Where the EPC has been estimated, that is because a UPRN wasn't avaialble and so we remote UPRN
|
||||
plan_input["uprn"] = np.where(plan_input["estimated"].isin([1, True]), None, plan_input["uprn"])
|
||||
# Where the EPC has been estimated, that is because a UPRN wasn't avaialble and so we remove UPRN
|
||||
# This will be reflexted
|
||||
plan_input["uprn"] = np.where(
|
||||
plan_input["estimated"].isin([1, True]) & (
|
||||
(plan_input["uprn"] < 0) | pd.isnull(plan_input["uprn"])
|
||||
), None, plan_input["uprn"]
|
||||
)
|
||||
# We handle the landlord property type and built form
|
||||
plan_input["property_type"] = plan_input["landlord_property_type"].copy()
|
||||
if "landlord_built_form" in plan_input.columns:
|
||||
|
|
@ -564,7 +486,7 @@ async def model_engine(body: PlanTriggerRequest):
|
|||
bucket_name=get_settings().DATA_BUCKET, file_key="sap_change_model/cleaning_dataset.parquet",
|
||||
)
|
||||
|
||||
input_properties = []
|
||||
input_properties, inspections_map, eco_packages = [], {}, {}
|
||||
for config in tqdm(plan_input):
|
||||
|
||||
# We validate each record in the file. If the record is NOT valid, we need to handle this accordingly
|
||||
|
|
@ -595,21 +517,20 @@ async def model_engine(body: PlanTriggerRequest):
|
|||
epc_searcher.ordnance_survey_client.property_type = config.get("property_type", None)
|
||||
# For the moment, our OS API access is unavailable, so we skip and interpolate
|
||||
epc_searcher.find_property(skip_os=True)
|
||||
if epc_searcher.newest_epc.get("estimated") and body.file_format == "domna_asset_list":
|
||||
if epc_searcher.newest_epc.get("estimated") and body.file_format == "domna_asset_list" and (
|
||||
epc_searcher.newest_epc["uprn"] < 0
|
||||
):
|
||||
epc_searcher.newest_epc["uprn-source"] = epc_searcher.UPRN_SOURCE_SIMULATED
|
||||
|
||||
# We check for an energy assessment we have performed on this property:
|
||||
energy_assessment = get_latest_assessment_by_uprn(session, uprn if uprn is not None else epc_searcher.uprn)
|
||||
|
||||
# Create a record in db
|
||||
property_id, is_new = create_property(
|
||||
session=session,
|
||||
portfolio_id=body.portfolio_id,
|
||||
address=epc_searcher.address_clean,
|
||||
postcode=epc_searcher.postcode_clean,
|
||||
uprn=epc_searcher.uprn,
|
||||
energy_assessment=energy_assessment
|
||||
property_id, is_new = ensure_property_exists(
|
||||
session, body, epc_searcher, energy_assessment, landlord_property_id=config.get("landlord_property_id")
|
||||
)
|
||||
if not property_id:
|
||||
continue
|
||||
|
||||
if not is_new and not body.multi_plan:
|
||||
continue
|
||||
|
||||
|
|
@ -636,16 +557,17 @@ async def model_engine(body: PlanTriggerRequest):
|
|||
epc_searcher, energy_assessment
|
||||
)
|
||||
|
||||
patch, property_already_installed, property_non_invasive_recommendations, property_valuation = (
|
||||
extract_property_request_data(
|
||||
config=config,
|
||||
patches=patches,
|
||||
already_installed=already_installed,
|
||||
non_invasive_recommendations=non_invasive_recommendations,
|
||||
valuation_data=valuation_data,
|
||||
uprn=epc_searcher.uprn,
|
||||
)
|
||||
req_data = extract_property_request_data(
|
||||
config=config,
|
||||
patches=patches,
|
||||
already_installed=already_installed,
|
||||
non_invasive_recommendations=non_invasive_recommendations,
|
||||
valuation_data=valuation_data,
|
||||
uprn=epc_searcher.uprn,
|
||||
)
|
||||
# Pull this out as it may get overwritten
|
||||
property_non_invasive_recommendations = req_data.non_invasive_recommendations
|
||||
patch = req_data.patch
|
||||
|
||||
# if we have a remote assment data type, we pull the additional data and include it
|
||||
if (body.event_type == "remote_assessment") and not (epc_searcher.newest_epc.get("estimated")):
|
||||
|
|
@ -679,17 +601,31 @@ async def model_engine(body: PlanTriggerRequest):
|
|||
address=epc_searcher.address_clean,
|
||||
postcode=epc_searcher.postcode_clean,
|
||||
epc_record=prepared_epc,
|
||||
already_installed=property_already_installed,
|
||||
property_valuation=property_valuation,
|
||||
already_installed=req_data.already_installed,
|
||||
property_valuation=req_data.valuation,
|
||||
non_invasive_recommendations=property_non_invasive_recommendations,
|
||||
energy_assessment=energy_assessment,
|
||||
**Property.extract_kwargs(config), # TODO: Depraecate this
|
||||
)
|
||||
)
|
||||
|
||||
# If we have an ECO project, we parse the cavity/solar reasons
|
||||
eco_packages[property_id] = parse_eco_packages(config)
|
||||
|
||||
# Final step - extract inspections data, if we have it
|
||||
property_inspections = extract_inspection_data(config)
|
||||
if property_inspections:
|
||||
inspections_map[property_id] = property_inspections
|
||||
|
||||
if not input_properties:
|
||||
return Response(status_code=204)
|
||||
|
||||
# We check if we have inspections data and store it in the database if so. We'll update or create
|
||||
# aginst each property if
|
||||
if inspections_map:
|
||||
logger.info("Inserting inspections data")
|
||||
bulk_upsert_inspections_pg(session, inspections_map)
|
||||
|
||||
# Set up model api and warm up the lambdas
|
||||
model_api = ModelApi(
|
||||
portfolio_id=body.portfolio_id,
|
||||
|
|
@ -749,7 +685,7 @@ async def model_engine(body: PlanTriggerRequest):
|
|||
input_properties=input_properties,
|
||||
session=session,
|
||||
google_solar_api_key=get_settings().GOOGLE_SOLAR_API_KEY,
|
||||
solar_materials=[m for m in materials if m["type"] == "solar_pv"]
|
||||
solar_materials=[m for m in materials if m["type"] == "solar_pv"],
|
||||
)
|
||||
|
||||
input_properties = GoogleSolarApi.unit_solar_analysis(
|
||||
|
|
@ -759,18 +695,31 @@ async def model_engine(body: PlanTriggerRequest):
|
|||
body=body,
|
||||
solar_materials=[m for m in materials if m["type"] == "solar_pv"],
|
||||
google_solar_api_key=get_settings().GOOGLE_SOLAR_API_KEY,
|
||||
inspections_map=inspections_map
|
||||
)
|
||||
|
||||
# We also make a tweak - if the property has been flagged for solar but doesn't contain
|
||||
# any panel performance, we ensure that we have a 3kWp and 4kWp option for the property
|
||||
|
||||
logger.info("Identifying property recommendations")
|
||||
recommendations = {}
|
||||
recommendations_scoring_data = []
|
||||
representative_recommendations = {}
|
||||
for p in tqdm(input_properties):
|
||||
# We set the ECO package data, if we have it
|
||||
property_eco_package = eco_packages.get(p.id, (None, None, None))
|
||||
if property_eco_package[0] is not None:
|
||||
inclusions = property_eco_package[0]
|
||||
exclusions = []
|
||||
else:
|
||||
inclusions = body.inclusions
|
||||
exclusions = body.exclusions
|
||||
|
||||
recommender = Recommendations(
|
||||
property_instance=p,
|
||||
materials=materials,
|
||||
exclusions=body.exclusions,
|
||||
inclusions=body.inclusions,
|
||||
exclusions=exclusions,
|
||||
inclusions=inclusions,
|
||||
default_u_values=body.default_u_values
|
||||
)
|
||||
property_recommendations, property_representative_recommendations = recommender.recommend()
|
||||
|
|
@ -788,7 +737,6 @@ async def model_engine(body: PlanTriggerRequest):
|
|||
|
||||
recommendations_scoring_data.extend(p.recommendations_scoring_data)
|
||||
|
||||
# TODO: Make sure that number_habitable_rooms has been dropped
|
||||
logger.info("Preparing data for scoring in sap change api")
|
||||
recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data)
|
||||
|
||||
|
|
@ -878,16 +826,16 @@ async def model_engine(body: PlanTriggerRequest):
|
|||
fixed_gain = optimiser_functions.calculate_fixed_gain(
|
||||
property_required_measures, recommendations, p, needs_ventilation
|
||||
)
|
||||
gain = optimiser_functions.calculate_gain(body=body, p=p, fixed_gain=fixed_gain)
|
||||
gain = optimiser_functions.calculate_gain(body=body, p=p, fixed_gain=fixed_gain, eco_packages=eco_packages)
|
||||
|
||||
funding = Funding(
|
||||
tenure=body.housing_type,
|
||||
project_scores_matrix=project_scores_matrix,
|
||||
partial_project_scores_matrix=partial_project_scores_matrix,
|
||||
whlg_eligible_postcodes=whlg_eligible_postcodes,
|
||||
eco4_social_cavity_abs_rate=12.5,
|
||||
eco4_social_cavity_abs_rate=13,
|
||||
eco4_social_solid_abs_rate=17,
|
||||
eco4_private_cavity_abs_rate=12.5,
|
||||
eco4_private_cavity_abs_rate=13,
|
||||
eco4_private_solid_abs_rate=17,
|
||||
gbis_social_cavity_abs_rate=21,
|
||||
gbis_social_solid_abs_rate=25,
|
||||
|
|
@ -1025,8 +973,8 @@ async def model_engine(body: PlanTriggerRequest):
|
|||
|
||||
funding.check_funding(
|
||||
measures=solution,
|
||||
starting_sap=p.data["current-energy-efficiency"],
|
||||
ending_sap=p.data["current-energy-efficiency"] + sum([x["gain"] for x in solution]),
|
||||
starting_sap=int(p.data["current-energy-efficiency"]),
|
||||
ending_sap=int(p.data["current-energy-efficiency"]) + sum([x["gain"] for x in solution]),
|
||||
floor_area=p.floor_area,
|
||||
mainheat_description=p.main_heating["clean_description"],
|
||||
heating_control_description=p.main_heating_controls["clean_description"],
|
||||
|
|
@ -1193,6 +1141,7 @@ async def model_engine(body: PlanTriggerRequest):
|
|||
"valuation_increase_average": (
|
||||
valuations["average_increased_value"] - valuations["current_value"]
|
||||
),
|
||||
"plan_type": eco_packages.get(p.id, (None, None, None))[2]
|
||||
})
|
||||
|
||||
upload_recommendations(
|
||||
|
|
@ -1212,7 +1161,7 @@ async def model_engine(body: PlanTriggerRequest):
|
|||
except Exception as e:
|
||||
# Rollback the session if an error occurs
|
||||
session.rollback()
|
||||
print("Failed i = %s" % str(i))
|
||||
logger.warning("Failed i = %s" % str(i))
|
||||
logger.error(f"An error occurred during batch starting at index {i}: {e}")
|
||||
logger.error(f"property is uprn {p.uprn} id {p.id} address {p.address}")
|
||||
|
||||
|
|
@ -1251,21 +1200,13 @@ async def model_engine(body: PlanTriggerRequest):
|
|||
session.commit()
|
||||
|
||||
except IntegrityError:
|
||||
logger.error("Database integrity error occurred", exc_info=True)
|
||||
session.rollback()
|
||||
return Response(status_code=500, content="Database integrity error.")
|
||||
return handle_error(session, "Database integrity error.", 500)
|
||||
except OperationalError:
|
||||
logger.error("Database operational error occurred", exc_info=True)
|
||||
session.rollback()
|
||||
return Response(status_code=500, content="Database operational error.")
|
||||
return handle_error(session, "Database operational error.", 500)
|
||||
except ValueError:
|
||||
logger.error("Value error - possibly due to malformed data", exc_info=True)
|
||||
session.rollback()
|
||||
return Response(status_code=400, content="Bad request: malformed data.")
|
||||
return handle_error(session, "Bad request: malformed data.", 400)
|
||||
except Exception as e: # General exception handling
|
||||
logger.error(f"An error occurred: {e}")
|
||||
session.rollback()
|
||||
return Response(status_code=500, content="An unexpected error occurred.")
|
||||
return handle_error(session, "An unexpected error occurred.", 500)
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
|
|
|
|||
|
|
@ -380,7 +380,7 @@ class EPCRecord:
|
|||
df.columns = [x.upper().replace("-", "_") for x in df.columns]
|
||||
|
||||
if replace_empty_string:
|
||||
df = df.replace("", np.nan)
|
||||
df = df.replace("", np.nan).infer_objects(copy=False)
|
||||
|
||||
return df
|
||||
|
||||
|
|
|
|||
|
|
@ -3,109 +3,126 @@ import pandas as pd
|
|||
import time
|
||||
from stealth_requests import StealthSession
|
||||
import random
|
||||
import os
|
||||
from multiprocessing import Pool
|
||||
from tqdm import tqdm
|
||||
|
||||
ENGINES = ["safari", "chrome"]
|
||||
CACHE_DIR = "zoopla_cache"
|
||||
os.makedirs(CACHE_DIR, exist_ok=True)
|
||||
|
||||
|
||||
def random_delay():
|
||||
"""Pause randomly between requests (0.5–2 s)."""
|
||||
time.sleep(random.uniform(0.5, 2))
|
||||
|
||||
|
||||
def scrape_all_estimates(session, url):
|
||||
# Rotate impersonation per request
|
||||
resp = session.get(url, impersonate=ENGINES[random.randint(0, 1)])
|
||||
"""Scrape valuation estimates for one Zoopla property URL."""
|
||||
resp = session.get(url, impersonate=random.choice(ENGINES))
|
||||
page_source = BeautifulSoup(resp.text, "html.parser")
|
||||
estimates = page_source.find_all("div", {"data-testid": "sale-estimate"})
|
||||
is_blocked = len(estimates) == 0
|
||||
return estimates, is_blocked
|
||||
return estimates, is_blocked, resp.text
|
||||
|
||||
|
||||
def extract_estimates(estimates):
|
||||
"""Extract low, mid, and high estimates from parsed HTML."""
|
||||
est = estimates[0]
|
||||
low = est.find("span", {"data-testid": "low-estimate-blurred"}).text
|
||||
mid = est.find("p", {"data-testid": "estimate-blurred"}).text
|
||||
high = est.find("span", {"data-testid": "high-estimate-blurred"}).text
|
||||
return low, mid, high
|
||||
|
||||
|
||||
def cache_path_for_url(url):
|
||||
"""Return a deterministic local cache path for a URL."""
|
||||
uprn = url.split("/")[-2]
|
||||
return os.path.join(CACHE_DIR, f"{uprn}.html")
|
||||
|
||||
|
||||
def parallel_task(url):
|
||||
# No impersonate argument here
|
||||
"""Main worker function executed in each process."""
|
||||
cache_path = cache_path_for_url(url)
|
||||
|
||||
# Use cached file if it exists
|
||||
if os.path.exists(cache_path):
|
||||
html = open(cache_path, "r").read()
|
||||
page_source = BeautifulSoup(html, "html.parser")
|
||||
estimates = page_source.find_all("div", {"data-testid": "sale-estimate"})
|
||||
if estimates:
|
||||
low, mid, high = extract_estimates(estimates)
|
||||
return {"URL": url, "Low Estimate": low, "Middle Estimate": mid, "High Estimate": high}
|
||||
|
||||
# Otherwise scrape live
|
||||
with StealthSession() as session:
|
||||
estimates, is_blocked = scrape_all_estimates(session, url)
|
||||
attempts = 0
|
||||
while attempts < 5:
|
||||
estimates, is_blocked, html = scrape_all_estimates(session, url)
|
||||
if not is_blocked and estimates:
|
||||
open(cache_path, "w").write(html)
|
||||
low, mid, high = extract_estimates(estimates)
|
||||
return {"URL": url, "Low Estimate": low, "Middle Estimate": mid, "High Estimate": high}
|
||||
attempts += 1
|
||||
print(f"[Attempt {attempts}] Blocked or empty for {url}")
|
||||
random_delay()
|
||||
|
||||
while is_blocked:
|
||||
print(f"Blocked by Zoopla for URL: {url}")
|
||||
time.sleep(random.uniform(0, 1))
|
||||
estimates, is_blocked = scrape_all_estimates(session, url)
|
||||
|
||||
low_estimate = estimates[0].find("span", {"data-testid": "low-estimate-blurred"}).text
|
||||
middle_estimate = estimates[0].find("p", {"data-testid": "estimate-blurred"}).text
|
||||
high_estimate = estimates[0].find("span", {"data-testid": "high-estimate-blurred"}).text
|
||||
|
||||
return {
|
||||
"URL": url,
|
||||
"Low Estimate": low_estimate,
|
||||
"Middle Estimate": middle_estimate,
|
||||
"High Estimate": high_estimate,
|
||||
}
|
||||
# If still blocked, return placeholders
|
||||
return {"URL": url, "Low Estimate": None, "Middle Estimate": None, "High Estimate": None}
|
||||
|
||||
|
||||
def parse_price(p):
|
||||
if p is None:
|
||||
return None
|
||||
|
||||
p = p.replace("£", "").strip().lower()
|
||||
if not p:
|
||||
return None
|
||||
if p.endswith("k"):
|
||||
return float(p[:-1]) * 1000
|
||||
return float(p[:-1]) * 1_000
|
||||
elif p.endswith("m"):
|
||||
return float(p[:-1]) * 1_000_000
|
||||
else:
|
||||
return float(p)
|
||||
|
||||
|
||||
# def parallel_task(url):
|
||||
# with StealthSession(impersonate=ENGINES[random.randint(0, 1)]) as session:
|
||||
# estimates, is_blocked = scrape_all_estimates(session, url)
|
||||
#
|
||||
# while is_blocked:
|
||||
# # Will need to wait and retry if blocked by Zoopla
|
||||
# print(f"Blocked by Zoopla for URL: {url}")
|
||||
# sleep_factor = random.uniform(0, 1) # Random delay to avoid detection
|
||||
# time.sleep(sleep_factor * 1)
|
||||
# estimates, is_blocked = scrape_all_estimates(session, url)
|
||||
#
|
||||
# low_estimate = (
|
||||
# estimates[0].find("span", {"data-testid": "low-estimate-blurred"}).text
|
||||
# ) # Find all span elements with data-testid="low-estimate"
|
||||
# middle_estimate = (
|
||||
# estimates[0].find("p", {"data-testid": "estimate-blurred"}).text
|
||||
# ) # Find all span elements with data-testid="middle-estimate"
|
||||
# high_estimate = (
|
||||
# estimates[0].find("span", {"data-testid": "high-estimate-blurred"}).text
|
||||
# ) # Find all span elements with data-testid="high-estimate-blurred"
|
||||
#
|
||||
# return {
|
||||
# "URL": url,
|
||||
# "Low Estimate": low_estimate,
|
||||
# "Middle Estimate": middle_estimate,
|
||||
# "High Estimate": high_estimate,
|
||||
# }
|
||||
try:
|
||||
return float(p.replace(",", ""))
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Get a SAL
|
||||
# Load portfolio
|
||||
asset_list = pd.read_excel(
|
||||
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/NRLA/Property Box/Property Box Finance Portfolio - "
|
||||
"Standardised.xlsx",
|
||||
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/sfr/October 2025 AL portfolio/22.10 AL Portfolio - "
|
||||
"Standardised - partial UPRN fill.xlsx",
|
||||
sheet_name="Standardised Asset List"
|
||||
)
|
||||
asset_list = asset_list[~pd.isnull(asset_list["epc_os_uprn"])]
|
||||
asset_list["epc_os_uprn"] = asset_list["epc_os_uprn"].astype(int).astype(str)
|
||||
uprns = asset_list["epc_os_uprn"].tolist()
|
||||
urls = [f"https://www.zoopla.co.uk/property/uprn/{uprn}/" for uprn in uprns]
|
||||
|
||||
with Pool(processes=5) as pool:
|
||||
# Limit concurrency to avoid blocks
|
||||
with Pool(processes=2) as pool: # fewer processes = fewer fingerprints
|
||||
estimates_list = list(
|
||||
tqdm(
|
||||
pool.imap(parallel_task, urls),
|
||||
total=len(urls),
|
||||
)
|
||||
tqdm(pool.imap(parallel_task, urls), total=len(urls))
|
||||
)
|
||||
|
||||
df = pd.DataFrame(estimates_list)
|
||||
# Extract UPRN from URL
|
||||
df["uprn"] = df["URL"].str.extract(r"uprn/(\d+)/")
|
||||
df["valuation"] = df["Middle Estimate"].apply(parse_price)
|
||||
|
||||
df.to_csv("zoopla_estimates.csv", index=False)
|
||||
|
||||
df["uprn"] = df["uprn"].astype(int).astype(str)
|
||||
|
||||
asset_list.merge(df[["uprn", "valuation"]], left_on="epc_os_uprn", right_on="uprn", how="left").to_excel(
|
||||
"Property Box Finance Portfolio - Standardised - with valuations.xlsx", index=False
|
||||
# Merge with asset list
|
||||
merged = asset_list.merge(
|
||||
df[["uprn", "valuation"]],
|
||||
left_on="epc_os_uprn",
|
||||
right_on="uprn",
|
||||
how="left"
|
||||
)
|
||||
merged.to_excel(
|
||||
"20251029 AL Portfolio - Standardised - with valuations.xlsx",
|
||||
index=False
|
||||
)
|
||||
|
||||
print("Done. Results saved.")
|
||||
|
|
|
|||
|
|
@ -416,7 +416,9 @@ def optimise_with_funding_paths(p, input_measures, housing_type, funding: Fundin
|
|||
"total_gain": total_gain,
|
||||
"path": path_spec,
|
||||
"scheme": scheme,
|
||||
"is_eligible": _is_eligible_funding_package(scheme, p.data["current-energy-efficiency"], total_gain),
|
||||
"is_eligible": _is_eligible_funding_package(
|
||||
scheme, int(p.data["current-energy-efficiency"]), total_gain
|
||||
),
|
||||
"unfunded_items": unfunded_picked,
|
||||
})
|
||||
|
||||
|
|
@ -427,11 +429,12 @@ def optimise_with_funding_paths(p, input_measures, housing_type, funding: Fundin
|
|||
solutions["meets_upgrade_target"] = solutions["total_gain"] >= target_gain - 0.1
|
||||
|
||||
# If we have packages that are fundable, but do not meet the upgrade target, we can run a final optimisation pass
|
||||
if not solutions[solutions["is_eligible"] & ~solutions["meets_upgrade_target"]].empty:
|
||||
logger.info("We have some packages that are fundable but do not meet the target gain")
|
||||
# Turned off logging - too noisy
|
||||
# if not solutions[solutions["is_eligible"] & ~solutions["meets_upgrade_target"]].empty:
|
||||
# logger.info("We have some packages that are fundable but do not meet the target gain")
|
||||
|
||||
# We now can calculate the project ABS, which subtracts from the cost, but this is only relevant for ECO4
|
||||
solutions["starting_sap"] = p.data["current-energy-efficiency"]
|
||||
solutions["starting_sap"] = int(p.data["current-energy-efficiency"])
|
||||
solutions["floor_area"] = p.floor_area
|
||||
solutions["ending_sap"] = solutions["starting_sap"] + solutions["total_gain"]
|
||||
solutions["starting_band"] = solutions["starting_sap"].apply(funding.get_sap_band)
|
||||
|
|
|
|||
|
|
@ -176,7 +176,8 @@ def calculate_fixed_gain(property_required_measures, recommendations, p, needs_v
|
|||
return fixed_gain
|
||||
|
||||
|
||||
def calculate_gain(body: PlanTriggerRequest, p: Property, fixed_gain: float) -> float | None:
|
||||
def calculate_gain(body: PlanTriggerRequest, p: Property, fixed_gain: float,
|
||||
eco_packages: None | dict = None) -> float | None:
|
||||
"""
|
||||
Calculates the target gain value for optimisation based on the goal.
|
||||
|
||||
|
|
@ -193,6 +194,7 @@ def calculate_gain(body: PlanTriggerRequest, p: Property, fixed_gain: float) ->
|
|||
Property object with EPC data (must have p.data["current-energy-efficiency"]).
|
||||
fixed_gain : float
|
||||
Total fixed gain from required measures (returned by calculate_fixed_gain).
|
||||
eco_packages : dict, optional
|
||||
|
||||
Returns
|
||||
-------
|
||||
|
|
@ -201,8 +203,14 @@ def calculate_gain(body: PlanTriggerRequest, p: Property, fixed_gain: float) ->
|
|||
"""
|
||||
if body.goal == "Increasing EPC":
|
||||
current_sap = int(p.data["current-energy-efficiency"])
|
||||
|
||||
target_sap = (
|
||||
eco_packages.get(p.id)[1] if eco_packages.get(p.id)[1] is not None
|
||||
else epc_to_sap_lower_bound(body.goal_value)
|
||||
)
|
||||
|
||||
gain = CostOptimiser.calculate_sap_gain_with_slack(
|
||||
epc_to_sap_lower_bound(body.goal_value) - current_sap
|
||||
target_sap - current_sap
|
||||
) - fixed_gain
|
||||
if body.simulate_sap_10:
|
||||
gain += 3
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue