Merge pull request #558 from Hestia-Homes/eco-eligiblity-bug

Implemented tasks and subtasks into engine + Adding EPC caching
This commit is contained in:
KhalimCK 2025-11-27 19:27:38 +08:00 committed by GitHub
commit 8da9f27c92
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
20 changed files with 676 additions and 1501 deletions

File diff suppressed because it is too large Load diff

View file

@ -2,7 +2,6 @@ import time
import random
import pandas as pd
from adhoc.investigation import newest_epc
from backend.SearchEpc import SearchEpc
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
from tqdm import tqdm

View file

@ -917,7 +917,7 @@ class SearchEpc:
return agg[key].values[0]
def find_property(self, skip_os=False):
def find_property(self, skip_os=False, api_data=None):
"""
This method will attempt to identify a property. It will, at first, use the EPC api to try and
find the EPC for the property and the associated UPRN. If this fails, it will use the Ordnance Survey API to
@ -928,10 +928,17 @@ class SearchEpc:
as a final check to see if there is any EPC data.
If there is no EPC data, the epc data will be estimated based on the surrounding properties
:param skip_os: If True, the ordnance survey api will be skipped and only the EPC api will be used
:param api_data: If provided, this data will be used instead of querying the EPC api
"""
# Step 1: use the epc api to find the property and uprn
response = self.get_epc()
if api_data:
self.data = api_data
response = {"status": 200}
else:
response = self.get_epc()
if response["status"] == 200:
(

View file

@ -3,7 +3,6 @@ from pydantic_settings import BaseSettings
from typing import Optional
class Settings(BaseSettings):
API_KEY: str
API_KEY_NAME: str = "X-API-KEY"
@ -43,7 +42,8 @@ class Settings(BaseSettings):
AWS_DEFAULT_REGION: Optional[str] = None
class Config:
env_file = "backend.env"
env_file = "backend/.env"
@lru_cache()
def get_settings():

View file

@ -14,6 +14,7 @@ db_string = connection_string.format(
db_engine = create_engine(db_string, pool_size=5, max_overflow=5)
def get_db_session():
if db_engine is None:
raise RuntimeError("Database is not configured. Set DATABASE_URL in environment variables.")

View file

@ -0,0 +1,12 @@
from .epc_functions import *
from .address_functions import *
from .portfolio_functions import *
from .energy_assessment_functions import *
from .property_functions import *
from .recommendations_functions import *
from .solar_functions import *
from .funding_functions import *
from .materials_functions import *
from .inspections_functions import *
from .non_intrusive_surveys import *
from .whlg_functions import *

View file

@ -0,0 +1,125 @@
from datetime import datetime, timedelta, timezone
from sqlalchemy.orm import Session
from sqlalchemy.exc import SQLAlchemyError
from backend.app.db.models.epc import EpcStore
class EpcStoreService:
"""
Service layer for EPC data lookup and persistence.
"""
FRESHNESS_DAYS = 30
# status labels
FRESH = "fresh"
EXPIRED = "expired"
MISSING = "missing"
@classmethod
def get_epc_for_uprn(cls, session: Session, uprn: int):
"""
Query EPC data for a given UPRN and return a dict describing:
- epc_api: only if within last 30 days
- epc_page: only if epc_api exists
- status: 'fresh', 'expired', or 'missing'
"""
record = session.query(EpcStore).filter(EpcStore.uprn == uprn).first()
if not record:
return {"status": cls.MISSING, "epc_api": None, "epc_page": None}
if not record.epc_api_created_at:
# API data missing → treat as missing even if page data exists
return {"status": cls.MISSING, "epc_api": None, "epc_page": None}
# check freshness
cutoff = datetime.now(timezone.utc) - timedelta(days=EpcStoreService.FRESHNESS_DAYS)
if record.epc_api_created_at.date() < cutoff.date():
return {"status": cls.EXPIRED, "epc_api": None, "epc_page": None}
# Fresh API → include page only if present
return {
"status": cls.FRESH,
"epc_api": record.epc_api,
"epc_page": record.epc_page if record.epc_page else None,
"epc_page_rrn": record.epc_page_rrn,
"epc_api_created_at": record.epc_api_created_at,
"epc_page_created_at": record.epc_page_created_at,
}
@classmethod
def check_insert_needed(cls, epc_cache, epc_estimated, uprn):
"""
Check if an insert is needed based on existing data.
:return:
"""
no_existing_epc_cache = epc_cache.get("epc_api") is None
existing_cache_expired = (
epc_cache.get("status") == cls.EXPIRED
)
needs_insert = bool((no_existing_epc_cache or existing_cache_expired) and not epc_estimated and uprn)
return needs_insert
@staticmethod
def upsert_epc_data(
session: Session,
uprn: int,
epc_api: dict | None,
epc_page: str | None,
epc_page_rrn: str | None,
epc_api_created_at: datetime | None = None,
epc_page_created_at: datetime | None = None,
):
"""
Insert or update EPC data for a UPRN.
Rules:
- If record exists update it
- If record does not exist create new
"""
try:
record = session.query(EpcStore).filter(EpcStore.uprn == uprn).first()
if record:
# update path
if epc_api is not None:
record.epc_api = epc_api
if epc_api_created_at is None:
epc_api_created_at = datetime.now(timezone.utc)
record.epc_api_created_at = epc_api_created_at
# update page data only if BOTH:
# 1) the caller passed page data
# 2) epc_api is not None (page only allowed when API exists)
if epc_page is not None and epc_api is not None:
record.epc_page = epc_page
record.epc_page_rrn = epc_page_rrn
if epc_page_created_at is None:
epc_page_created_at = datetime.now(timezone.utc)
record.epc_page_created_at = epc_page_created_at
else:
# insert path
record = EpcStore(
uprn=uprn,
epc_api=epc_api,
epc_api_created_at=epc_api_created_at,
epc_page=epc_page if epc_api is not None else None,
epc_page_rrn=epc_page_rrn if epc_api is not None else None,
epc_page_created_at=epc_page_created_at if epc_api is not None else None,
)
session.add(record)
session.flush()
session.commit()
return record
except SQLAlchemyError as e:
session.rollback()
raise e

View file

@ -1,5 +1,3 @@
from __future__ import annotations
# ---- Standard Library ----
from typing import Optional, Dict, Any
from datetime import datetime, timezone
@ -27,20 +25,22 @@ class SubTaskInterface:
# --------------------------------------------------------
# CREATE SUBTASK
# --------------------------------------------------------
def create_subtask(self, task_id: UUID, inputs: Optional[Dict[str, Any]] = None):
now = datetime.now(timezone.utc)
def create_subtask(self, task_id: UUID, inputs: Optional[Dict[str, Any]] = None, status=None):
with get_db_session() as session:
task = session.get(Task, task_id)
if not task:
raise ValueError(f"Task {task_id} not found")
# We treat waiting as the default status
status = "waiting" if status is None else status
subtask = SubTask(
taskId=task_id,
task_id=task_id,
inputs=json.dumps(inputs) if inputs else None,
status="waiting",
jobStarted=None,
jobCompleted=None,
status=status,
job_started=None,
job_completed=None,
)
session.add(subtask)
@ -49,12 +49,21 @@ class SubTaskInterface:
# Recalculate parent task progress
self._update_task_progress(session, task_id)
return subtask
return subtask.id
# --------------------------------------------------------
# UPDATE STATUS (in progress, complete, failed)
# --------------------------------------------------------
def update_subtask_status(self, subtask_id: UUID, status: str):
def update_subtask_status(
self, subtask_id: UUID, status: str, outputs=None
):
"""
Update the status of a subtask, and recalculate the parent task progress.
:param subtask_id: UUID of the subtask to update
:param status: New status (in progress, complete, failed)
:param outputs: Optional outputs to set
:return:
"""
now = datetime.now(timezone.utc)
with get_db_session() as session:
@ -65,21 +74,23 @@ class SubTaskInterface:
normalized = status.lower()
# When job really starts
if normalized == "in progress" and subtask.jobStarted is None:
subtask.jobStarted = now
if normalized == "in progress" and subtask.job_started is None:
subtask.job_started = now
# Completed or failed
if normalized in ("complete", "failed"):
subtask.jobCompleted = now
subtask.job_completed = now
subtask.status = normalized
subtask.updatedAt = now
subtask.updated_at = now
if outputs is not None:
subtask.outputs = json.dumps(outputs)
session.add(subtask)
session.commit()
# Recalculate task status
self._update_task_progress(session, subtask.taskId)
self._update_task_progress(session, subtask.task_id)
session.refresh(subtask)
return subtask
@ -87,7 +98,8 @@ class SubTaskInterface:
# --------------------------------------------------------
# UPDATE OUTPUTS
# --------------------------------------------------------
def update_subtask_output(self, subtask_id: UUID, outputs: Dict[str, Any]):
@staticmethod
def update_subtask_output(subtask_id: UUID, outputs: Dict[str, Any]):
now = datetime.now(timezone.utc)
with get_db_session() as session:
@ -96,7 +108,7 @@ class SubTaskInterface:
raise ValueError(f"SubTask {subtask_id} not found")
subtask.outputs = json.dumps(outputs)
subtask.updatedAt = now
subtask.updated_at = now
session.add(subtask)
session.commit()
@ -106,7 +118,8 @@ class SubTaskInterface:
# --------------------------------------------------------
# UPDATE CLOUD LOGS URL
# --------------------------------------------------------
def update_subtask_logs(self, subtask_id: UUID, cloud_logs_url: str):
@staticmethod
def update_subtask_logs(subtask_id: UUID, cloud_logs_url: str):
now = datetime.now(timezone.utc)
with get_db_session() as session:
@ -114,8 +127,8 @@ class SubTaskInterface:
if not subtask:
raise ValueError(f"SubTask {subtask_id} not found")
subtask.cloudLogsURL = cloud_logs_url
subtask.updatedAt = now
subtask.cloud_logs_url = cloud_logs_url
subtask.updated_at = now
session.add(subtask)
session.commit()
@ -125,8 +138,8 @@ class SubTaskInterface:
# --------------------------------------------------------
# SET BOTH OUTPUT + LOGS
# --------------------------------------------------------
@staticmethod
def set_subtask_result(
self,
subtask_id: UUID,
outputs: Optional[Dict[str, Any]] = None,
cloud_logs_url: Optional[str] = None,
@ -142,9 +155,9 @@ class SubTaskInterface:
subtask.outputs = json.dumps(outputs)
if cloud_logs_url is not None:
subtask.cloudLogsURL = cloud_logs_url
subtask.cloud_logs_url = cloud_logs_url
subtask.updatedAt = now
subtask.updated_at = now
session.add(subtask)
session.commit()
session.refresh(subtask)
@ -153,13 +166,14 @@ class SubTaskInterface:
# --------------------------------------------------------
# TASK PROGRESS CALCULATION
# --------------------------------------------------------
def _update_task_progress(self, session: Session, task_id: UUID):
@staticmethod
def _update_task_progress(session: Session, task_id: UUID):
task = session.get(Task, task_id)
if not task:
return
subtasks = session.exec(
select(SubTask).where(SubTask.taskId == task_id)
select(SubTask).where(SubTask.task_id == task_id)
).all()
statuses = [s.status.lower() for s in subtasks]
@ -167,24 +181,24 @@ class SubTaskInterface:
if "failed" in statuses:
task.status = "failed"
task.jobCompleted = now
task.job_completed = now
elif all(s == "complete" for s in statuses):
task.status = "complete"
task.jobCompleted = now
task.job_completed = now
elif "in progress" in statuses:
task.status = "in progress"
if task.jobStarted is None:
task.jobStarted = now
if task.job_started is None:
task.job_started = now
else:
# All waiting
task.status = "waiting"
task.jobStarted = None
task.jobCompleted = None
task.job_started = None
task.job_completed = None
task.updatedAt = now
task.updated_at = now
session.add(task)
session.commit()
@ -212,18 +226,18 @@ class SubTaskInterface:
# Set logs
if cloud_logs_url is not None:
subtask.cloudLogsURL = cloud_logs_url
subtask.cloud_logs_url = cloud_logs_url
# Status + timestamps
subtask.status = normalized
subtask.jobCompleted = now
subtask.updatedAt = now
subtask.job_completed = now
subtask.updated_at = now
session.add(subtask)
session.commit()
# Update parent task (complete/failed)
self._update_task_progress(session, subtask.taskId)
self._update_task_progress(session, subtask.task_id)
session.refresh(subtask)
return subtask
@ -237,38 +251,49 @@ class TasksInterface:
High-level operations for Task records.
"""
@staticmethod
def create_task(
self,
*,
task_source: str,
service: Optional[str] = None,
inputs: Optional[Dict[str, Any]] = None,
task_only: bool = False,
):
now = datetime.now(timezone.utc)
"""
Create a new Task record, and an initial SubTask in waiting state. Can also be used to create just
a task, without a subtask
:param task_source: Text indicating source of task creation (e.g. file path + function name)
:param service: Optional service name
:param inputs: Inputs of the job being run
:param task_only: If True, only create the Task record, without a SubTask
:return:
"""
with get_db_session() as session:
task = Task(
taskSource=task_source,
task_source=task_source,
service=service,
status="waiting",
jobStarted=None,
jobCompleted=None,
job_started=None,
job_completed=None,
)
session.add(task)
session.commit()
session.refresh(task)
if task_only:
return task.id, None
# Create first subtask in waiting state
subtask_interface = SubTaskInterface()
subtask = subtask_interface.create_subtask(
subtask_id = subtask_interface.create_subtask(
task_id=task.id,
inputs=inputs,
)
return task.id, subtask.id
return task.id, subtask_id
def update_task_status(self, task_id: UUID, status: str):
@staticmethod
def update_task_status(task_id: UUID, status: str):
now = datetime.now(timezone.utc)
with get_db_session() as session:
@ -278,14 +303,14 @@ class TasksInterface:
normalized = status.lower()
if normalized == "in progress" and task.jobStarted is None:
task.jobStarted = now
if normalized == "in progress" and task.job_started is None:
task.job_started = now
if normalized == "complete":
task.jobCompleted = now
task.job_completed = now
task.status = normalized
task.updatedAt = now
task.updated_at = now
session.add(task)
session.commit()

View file

@ -0,0 +1,29 @@
from sqlalchemy import (
Column,
Integer,
String,
JSON,
TIMESTAMP,
UniqueConstraint,
)
from sqlalchemy.orm import declarative_base
Base = declarative_base()
class EpcStore(Base):
"""
Stores EPC data retrieved from the EPC API and EPC web pages.
"""
__tablename__ = "epc_store"
id = Column(Integer, primary_key=True, autoincrement=True)
uprn = Column(Integer)
epc_api_created_at = Column(TIMESTAMP(timezone=False))
epc_api = Column(JSON, nullable=False)
epc_page_created_at = Column(TIMESTAMP(timezone=False))
epc_page = Column(String)
epc_page_rrn = Column(String)
def __repr__(self):
return f"<EpcStore(id={self.id}, uprn='{self.uprn}')>"

View file

@ -20,6 +20,7 @@ class MaterialType(enum.Enum):
room_roof_insulation = "room_roof_insulation"
windows_glazing = "windows_glazing"
secondary_glazing = "secondary_glazing"
double_glazing = "double_glazing"
cavity_wall_extraction = "cavity_wall_extraction"
iwi_wall_demolition = "iwi_wall_demolition"

View file

@ -4,6 +4,7 @@ import datetime
from sqlalchemy import Column, Integer, Text, Boolean, Float, DateTime, Enum, ForeignKey, CheckConstraint
from sqlalchemy.ext.declarative import declarative_base
from backend.app.db.models.users import UserModel # noqa
from backend.app.db.models.materials import MaterialType
Base = declarative_base()
@ -225,3 +226,18 @@ class PortfolioUsers(Base):
role = Column(Text, nullable=False)
created_at = Column(DateTime, nullable=False, default=datetime.datetime.now(pytz.utc))
updated_at = Column(DateTime, nullable=False, default=datetime.datetime.now(pytz.utc))
class PropertyInstalledMeasures(Base):
"""
This model keeps a record of the installed measures for each property, at the UPRN level
"""
__tablename__ = 'property_installed_measures'
id = Column(Integer, primary_key=True, autoincrement=True)
uprn = Column(Integer, nullable=False)
measure_type = Column(
Enum(MaterialType, values_callable=lambda x: [e.value for e in x], create_constraint=False),
nullable=False
)
created_at = Column(DateTime, nullable=False, default=datetime.datetime.now(pytz.utc))
installed_at = Column(DateTime, nullable=False, default=datetime.datetime.now(pytz.utc))

View file

@ -1,6 +1,4 @@
from __future__ import annotations
from typing import Optional, List
from typing import Optional
from datetime import datetime
from uuid import UUID, uuid4
@ -10,64 +8,29 @@ from sqlmodel import SQLModel, Field, Relationship
class Task(SQLModel, table=True):
__tablename__ = "tasks"
id: UUID = Field(
default_factory=uuid4,
primary_key=True,
index=True,
)
taskSource: str = Field(alias="task_source")
jobStarted: Optional[datetime] = Field(
default=None, alias="job_started"
)
jobCompleted: Optional[datetime] = Field(
default=None, alias="job_completed"
)
id: UUID = Field(default_factory=uuid4, primary_key=True, index=True, )
task_source: str
job_started: Optional[datetime] = None
job_completed: Optional[datetime] = None
status: str = Field(default="In Progress")
service: Optional[str] = None
updated_at: datetime = Field(default_factory=datetime.utcnow)
updatedAt: datetime = Field(
default_factory=datetime.utcnow,
alias="updated_at",
)
# Relationship
subTasks: List["SubTask"] = Relationship(back_populates="task")
sub_tasks: list["SubTask"] = Relationship(back_populates="task")
class SubTask(SQLModel, table=True):
__tablename__ = "sub_task"
id: UUID = Field(
default_factory=uuid4,
primary_key=True,
index=True,
)
taskId: UUID = Field(
foreign_key="tasks.id",
alias="task_id",
)
jobStarted: Optional[datetime] = Field(
default=None, alias="job_started"
)
jobCompleted: Optional[datetime] = Field(
default=None, alias="job_completed"
)
id: UUID = Field(default_factory=uuid4, primary_key=True, index=True, )
task_id: UUID = Field(foreign_key="tasks.id")
job_started: Optional[datetime] = None
job_completed: Optional[datetime] = None
status: str = Field(default="In Progress")
inputs: Optional[str] = None
outputs: Optional[str] = None
cloudLogsURL: Optional[str] = Field(alias="cloud_logs_url")
cloud_logs_url: Optional[str] = None
updated_at: datetime = Field(default_factory=datetime.utcnow)
updatedAt: datetime = Field(
default_factory=datetime.utcnow,
alias="updated_at",
)
# Relationship
task: Optional[Task] = Relationship(back_populates="subTasks")
task: Optional["Task"] = Relationship(back_populates="sub_tasks")

View file

@ -15,6 +15,7 @@ from utils.logger import setup_logger
from backend.app.db.connection import db_engine
from backend.app.db.functions.recommendations_functions import create_scenario
from backend.app.db.functions.tasks.Tasks import TasksInterface, SubTaskInterface
logger = setup_logger()
@ -81,12 +82,34 @@ async def trigger_plan_entrypoint(body: PlanTriggerRequest):
# Insert the scenario ID into the data payload
data["scenario_id"] = scenario_id
# Create a main task
task_id, _ = TasksInterface.create_task(
task_source="backend/plan/router.py:trigger_plan_entrypoint",
service="plan_engine",
inputs=data,
task_only=True
)
subtask_interface = SubTaskInterface()
for i in range(total_chunks):
# Create an entry in the request logs table
index_start = i * chunk_size
index_end = min((i + 1) * chunk_size, total_rows)
message_payload = {**data, "index_start": index_start, "index_end": index_end}
message_payload = {
**data, "index_start": index_start, "index_end": index_end,
}
# Create a subtask for this chunk
subtask_id = subtask_interface.create_subtask(
task_id=task_id,
inputs=message_payload
)
# Add task and subtask to message
message_payload["task_id"] = str(task_id)
message_payload["subtask_id"] = str(subtask_id)
message_body = json.dumps(message_payload)
response = sqs_client.send_message(

View file

@ -129,6 +129,10 @@ class PlanTriggerRequest(BaseModel):
index_start: Optional[int] = None
index_end: Optional[int] = None
# Task and subtask IDs
task_id: Optional[str] = None
subtask_id: Optional[str] = None
@model_validator(mode="after")
def check_indexes(self):
if (self.index_start is None) != (self.index_end is None):

View file

@ -1,8 +1,10 @@
import msgpack
from uuid import UUID
from typing import Any
from utils.s3 import read_from_s3
from backend.app.config import get_settings
from backend.app.plan.data_classes import PropertyRequestData
from typing import Any
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
from starlette.responses import Response
from utils.logger import setup_logger
@ -211,8 +213,13 @@ def parse_eco_packages(config: dict[str, Any], prepared_epc) -> tuple[list[str],
return measures, mapped["target_sap"], mapped["plan_type"], already_installed
def handle_error(session, msg, status=500):
def handle_error(session, msg, e, subtask_id, status=500):
# When the pipeline fails, handles error process
SubTaskInterface().update_subtask_status(
subtask_id=UUID(subtask_id),
status="failed",
outputs=str(e)
)
logger.error(msg, exc_info=True)
session.rollback()
return Response(status_code=status, content=msg)

View file

@ -5,26 +5,20 @@ from datetime import datetime
from tqdm import tqdm
import pandas as pd
import numpy as np
from etl.epc.Record import EPCRecord
from uuid import UUID
from backend.Funding import Funding
from backend.SearchEpc import SearchEpc
from etl.epc.Record import EPCRecord
from sqlalchemy.exc import IntegrityError, OperationalError
from sqlalchemy.orm import sessionmaker
from starlette.responses import Response
from backend.app.config import get_settings, get_prediction_buckets
from backend.app.db.connection import db_engine
from backend.app.db.functions.materials_functions import get_materials
from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations
from backend.app.db.functions.property_functions import (
create_property_details_epc, create_property_targets, update_property_data,
update_or_create_property_spatial_details, ensure_property_exists
)
from backend.app.db.functions.recommendations_functions import (
create_plan, upload_recommendations, create_scenario
)
from backend.app.db.functions.funding_functions import upload_funding
from backend.app.db.functions.energy_assessment_functions import get_latest_assessment_by_uprn
from backend.app.db.functions.address_functions import get_associated_uprns
import backend.app.db.functions as db_funcs
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
from backend.app.db.models.portfolio import rating_lookup
from backend.app.plan.schemas import PlanTriggerRequest, WALL_INSULATION_MEASURES, ROOF_INSULATION_MEASURES
@ -33,9 +27,6 @@ from backend.app.plan.utils import (
)
from backend.app.utils import sap_to_epc
import backend.app.assumptions as assumptions
from backend.app.db.functions.inspections_functions import (
extract_inspection_data, bulk_upsert_inspections_pg
)
from backend.ml_models.api import ModelApi
from backend.Property import Property
@ -45,18 +36,18 @@ from recommendations.optimiser.CostOptimiser import CostOptimiser
from recommendations.optimiser.GainOptimiser import GainOptimiser
import recommendations.optimiser.optimiser_functions as optimiser_functions
from recommendations.Recommendations import Recommendations
from utils.logger import setup_logger
from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3, read_excel_from_s3
from backend.ml_models.Valuation import PropertyValuation
from etl.bill_savings.KwhData import KwhData
from etl.spatial.OpenUprnClient import OpenUprnClient
from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc
from backend.Funding import Funding
from recommendations.optimiser.funding_optimiser import optimise_with_funding_paths
from recommendations.recommendation_utils import convert_thickness_to_numeric, get_wall_u_value
from utils.logger import setup_logger
from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3, read_excel_from_s3
logger = setup_logger()
BATCH_SIZE = 5
@ -392,6 +383,26 @@ def parse_heating_system(config):
return None
def check_duplicate_uprns(plan_input):
"""
Simple function to check if the input data contains duplicated UPRNS.
If there are duplicates, an exception will be rasied
:return:
"""
# Check for duplicate UPRNS
input_uprns = [x.get("uprn") for x in plan_input if "uprn" in x and x.get("uprn")]
if input_uprns:
# Check for dupes
if len(input_uprns) != len(set(input_uprns)):
# Find the duplicate UPRNs
duplicates = set([x for x in input_uprns if input_uprns.count(x) > 1])
# de-dupe input_uprns
raise ValueError(f"Duplicate UPRNs in the input data: {duplicates}")
return True
async def model_engine(body: PlanTriggerRequest):
logger.info("Model Engine triggered with body: %s", json.loads(body.model_dump_json()))
@ -421,6 +432,9 @@ async def model_engine(body: PlanTriggerRequest):
)
# Where the EPC has been estimated, that is because a UPRN wasn't avaialble and so we remove UPRN
# This will be reflexted
if "estimated" not in plan_input.columns:
plan_input["estimated"] = False
plan_input["uprn"] = np.where(
plan_input["estimated"].isin([1, True]) & (
(plan_input["uprn"] < 0) | pd.isnull(plan_input["uprn"])
@ -480,16 +494,8 @@ async def model_engine(body: PlanTriggerRequest):
if body.index_start is not None and body.index_end is not None:
plan_input = plan_input[body.index_start:body.index_end]
# Check for duplicate UPRNS
input_uprns = [x.get("uprn") for x in plan_input if "uprn" in x and x.get("uprn")]
if input_uprns:
# Check for dupes
if len(input_uprns) != len(set(input_uprns)):
# Find the duplicate UPRNs
duplicates = set([x for x in input_uprns if input_uprns.count(x) > 1])
# de-dupe input_uprns
raise ValueError(f"Duplicate UPRNs in the input data: {duplicates}")
# Confirm no duplicate UPRNS
check_duplicate_uprns(plan_input)
# If we have patches or overrides, we should read them in here
patches, already_installed, non_invasive_recommendations, valuation_data = get_request_property_data(body)
@ -515,10 +521,18 @@ async def model_engine(body: PlanTriggerRequest):
if uprn:
uprn = int(float(uprn))
epc_api_data, epc_page, rrn, epc_cache = None, None, None, {}
if uprn:
# if we have a UPRN, we check if we already have EPC data associated with this UPRN
epc_cache = db_funcs.epc_functions.EpcStoreService.get_epc_for_uprn(session, uprn)
if epc_cache["status"] == db_funcs.epc_functions.EpcStoreService.FRESH:
epc_api_data, epc_page, rrn = epc_cache["epc_api"], epc_cache["epc_page"], epc_cache["epc_page_rrn"]
address1 = config.get("address", None)
# Handle domna address list format
if pd.isnull(address1) and body.file_format == "domna_asset_list":
address1 = config.get("domna_full_address", None)
address1 = config.get("domna_address_1", None)
address1 = str(int(address1)) if isinstance(address1, float) else str(address1)
full_address = config["domna_full_address"] if body.file_format == "domna_asset_list" else None
@ -528,7 +542,7 @@ async def model_engine(body: PlanTriggerRequest):
if (body.event_type == "remote_assessment") and config.get("property_type") == "Flat":
# We're running a remote assessment for a flat - we go and grab the associated
# UPRNS for other units in the same building
associated_uprns = get_associated_uprns(
associated_uprns = db_funcs.address_functions.get_associated_uprns(
session, postcode=config["postcode"], uprn=uprn
)
@ -545,16 +559,20 @@ async def model_engine(body: PlanTriggerRequest):
epc_searcher.ordnance_survey_client.built_form = config.get("built_form", None)
epc_searcher.ordnance_survey_client.property_type = config.get("property_type", None)
# For the moment, our OS API access is unavailable, so we skip and interpolate
epc_searcher.find_property(skip_os=True)
epc_searcher.find_property(skip_os=True, api_data=epc_api_data)
if epc_searcher.newest_epc.get("estimated") and body.file_format == "domna_asset_list" and (
epc_searcher.newest_epc["uprn"] < 0
):
epc_searcher.newest_epc["uprn-source"] = epc_searcher.UPRN_SOURCE_SIMULATED
# We check for an energy assessment we have performed on this property:
energy_assessment = get_latest_assessment_by_uprn(session, uprn if uprn is not None else epc_searcher.uprn)
energy_assessment = db_funcs.energy_assessment_functions.get_latest_assessment_by_uprn(
session, uprn if uprn is not None else epc_searcher.uprn
)
property_id, is_new = ensure_property_exists(
property_id, is_new = db_funcs.property_functions.ensure_property_exists(
session, body, epc_searcher, energy_assessment, landlord_property_id=config.get("landlord_property_id")
)
if not property_id:
@ -570,7 +588,7 @@ async def model_engine(body: PlanTriggerRequest):
)
if is_new:
create_property_targets(
db_funcs.property_functions.create_property_targets(
session,
property_id=property_id,
portfolio_id=body.portfolio_id,
@ -599,18 +617,19 @@ async def model_engine(body: PlanTriggerRequest):
patch = req_data.patch
# if we have a remote assment data type, we pull the additional data and include it
epc_page_source = {}
if (body.event_type == "remote_assessment") and not (epc_searcher.newest_epc.get("estimated")):
logger.info("Retrieving find my epc data")
try:
property_non_invasive_recommendations, patch = RetrieveFindMyEpc.get_from_epc(
epc_searcher.newest_epc
property_non_invasive_recommendations, patch, epc_page_source = RetrieveFindMyEpc.get_from_epc(
epc_searcher.newest_epc, epc_page, rrn=rrn
)
except Exception as e:
logger.error(f"Failed to retrieve without cleaning address {e}")
for k in ["address", "address1"]:
epc_searcher.newest_epc[k] = epc_searcher.address_clean
property_non_invasive_recommendations, patch = RetrieveFindMyEpc.get_from_epc(
epc_searcher.newest_epc
property_non_invasive_recommendations, patch, epc_page_source = RetrieveFindMyEpc.get_from_epc(
epc_searcher.newest_epc, epc_page, rrn=rrn
)
# If we have a property type, this means when we pull the epc data, we might need to make a patch
@ -627,7 +646,7 @@ async def model_engine(body: PlanTriggerRequest):
eco_packages[property_id] = parse_eco_packages(config, prepared_epc)
# Final step - extract inspections data, if we have it - we inject into property for usage
property_inspections = extract_inspection_data(config)
property_inspections = db_funcs.inspections_functions.extract_inspection_data(config)
if property_inspections:
inspections_map[property_id] = property_inspections
@ -647,6 +666,24 @@ async def model_engine(body: PlanTriggerRequest):
)
)
# If we have:
# 1) No EPC API data
# 2) A real EPC
# 3) A UPRN (meaning that a UPRN could be fetched against that property)
# We store this data
if db_funcs.epc_functions.EpcStoreService.check_insert_needed(
epc_cache, epc_searcher.newest_epc.get("estimated"), epc_searcher.uprn
):
# We store the EPC data we have found for this property
db_funcs.epc_functions.EpcStoreService.upsert_epc_data(
session=session,
uprn=epc_searcher.uprn,
epc_api=epc_searcher.data,
epc_page=epc_page_source.get("page_source"),
epc_page_rrn=epc_page_source.get("rrn"),
)
if not input_properties:
return Response(status_code=204)
@ -654,7 +691,7 @@ async def model_engine(body: PlanTriggerRequest):
# aginst each property if
if inspections_map:
logger.info("Inserting inspections data")
bulk_upsert_inspections_pg(session, inspections_map)
db_funcs.inspections_functions.bulk_upsert_inspections_pg(session, inspections_map)
# Set up model api and warm up the lambdas
model_api = ModelApi(
@ -671,7 +708,7 @@ async def model_engine(body: PlanTriggerRequest):
# consistent requests to the backend for
# the same data
logger.info("Reading in materials and cleaned datasets")
materials = get_materials(session)
materials = db_funcs.materials_functions.get_materials(session)
cleaned = get_cleaned()
project_scores_matrix, partial_project_scores_matrix, whlg_eligible_postcodes = get_funding_data()
@ -1096,7 +1133,7 @@ async def model_engine(body: PlanTriggerRequest):
# We don't need to create a new scenario, we just use the existing one
scenario_id = body.scenario_id
else:
engine_scenario = create_scenario(
engine_scenario = db_funcs.recommendations_functions.create_scenario(
session=session,
scenario={
"name": body.scenario_name,
@ -1140,24 +1177,26 @@ async def model_engine(body: PlanTriggerRequest):
)
property_value_increase_ranges[p.id] = valuations
# TODO - this is not right, especially if the existing run failed
if p.is_new:
property_details_epc = p.get_property_details_epc(
portfolio_id=body.portfolio_id, rating_lookup=rating_lookup,
)
create_property_details_epc(session, property_details_epc)
db_funcs.property_functions.create_property_details_epc(session, property_details_epc)
update_or_create_property_spatial_details(session, p.uprn, p.spatial)
db_funcs.property_functions.update_or_create_property_spatial_details(
session, p.uprn, p.spatial
)
property_data = p.get_full_property_data(current_valuation=valuations["current_value"])
update_property_data(
db_funcs.property_functions.update_property_data(
session, property_id=p.id, portfolio_id=body.portfolio_id, property_data=property_data
)
if not recommendations_to_upload:
continue
new_plan_id = create_plan(session, {
new_plan_id = db_funcs.recommendations_functions.create_plan(session, {
"portfolio_id": body.portfolio_id,
"property_id": p.id,
"scenario_id": scenario_id,
@ -1175,11 +1214,10 @@ async def model_engine(body: PlanTriggerRequest):
"plan_type": eco_packages.get(p.id, (None, None, None))[2]
})
upload_recommendations(
db_funcs.recommendations_functions.upload_recommendations(
session, recommendations_to_upload, p.id, new_plan_id
)
upload_funding(session, p, new_plan_id, recommendations_to_upload)
db_funcs.funding_functions.upload_funding(session, p, new_plan_id, recommendations_to_upload)
if valuations["current_value"] > 0:
property_valuation_increases.append(
@ -1218,7 +1256,7 @@ async def model_engine(body: PlanTriggerRequest):
property_value_increase_ranges=property_value_increase_ranges
)
aggregate_portfolio_recommendations(
db_funcs.portfolio_functions.aggregate_portfolio_recommendations(
session,
portfolio_id=body.portfolio_id,
scenario_id=scenario_id,
@ -1230,17 +1268,20 @@ async def model_engine(body: PlanTriggerRequest):
# Commit final changes
session.commit()
except IntegrityError:
return handle_error(session, "Database integrity error.", 500)
except OperationalError:
return handle_error(session, "Database operational error.", 500)
except ValueError:
return handle_error(session, "Bad request: malformed data.", 400)
except IntegrityError as e:
return handle_error(session, "Database integrity error.", e, body.subtask_id, 500)
except OperationalError as e:
return handle_error(session, "Database operational error.", e, body.subtask_id, 500)
except ValueError as e:
return handle_error(session, "Bad request: malformed data.", e, body.subtask_id, 400)
except Exception as e: # General exception handling
return handle_error(session, "An unexpected error occurred.", 500)
return handle_error(session, "An unexpected error occurred.", e, body.subtask_id, 500)
finally:
session.close()
# Mark the subtask as successful
SubTaskInterface().update_subtask_status(subtask_id=UUID(body.subtask_id), status="failed")
logger.info("Model Engine completed successfully")
return Response(status_code=200)

View file

@ -93,7 +93,7 @@ costs_by_floor_area = costs_by_floor_area.groupby("current-energy-efficiency")[
epc_data = epc_data[~pd.isnull(epc_data["UPRN"])]
sample_epc_data = epc_data[pd.to_datetime(epc_data["LODGEMENT_DATE"]) >= "2015-01-01"].drop_duplicates("UPRN").sample(
5000).reset_index(drop=True)
10000).reset_index(drop=True)
# TODO: In Property find_energy_sources, sort out biomass community heating - what fuel type
# TODO: We might be able to remove find_energy_sources entirely and remove estimate_electrical_consumption. It's used

View file

@ -10,6 +10,7 @@ import json
import time
import os
import pandas as pd
import numpy as np
from tqdm import tqdm
from dotenv import load_dotenv
from asset_list.utils import get_data_for_property
@ -52,8 +53,6 @@ n_postcodes = property_list["Post Code"].nunique()
postcode_summary = property_list.groupby("Post Code")["UPRN"].count().reset_index()
postcode_summary["UPRN"].mean()
test_match = property_list.merge(sustainability_data, left_on="UPRN", right_on="Org Ref")
def classify_floor_area(x):
if x <= 72:
@ -70,12 +69,187 @@ sustainability_data["Floor Area Band"] = sustainability_data["Total Floor Area (
lambda x: classify_floor_area(x)
)
archetypes = sustainability_data[
["Type", "Attachment", "Construction Years", "Wall Construction", "Wall Insulation",
"Roof Construction", "Roof Insulation", "Floor Construction", "Floor Insulation",
"Glazing", "Heating", "Boiler Efficiency", "Main Fuel", "Controls Adequacy",
"Floor Area Band"]
].drop_duplicates()
# Archetype reductions
# Roof insulation category
# 1) Split roof insulation into > 100mm loft and <= 100mm loft
sustainability_data["Roof Insulation Category"] = sustainability_data["Roof Insulation"].copy()
sustainability_data["Roof Insulation Category"] = np.where(
sustainability_data["Roof Insulation Category"].isin(
['mm200', 'mm300', 'mm250', 'mm150', 'mm270', 'mm400', 'mm350'],
),
"LI > 100mm",
sustainability_data["Roof Insulation Category"],
)
sustainability_data["Roof Insulation Category"] = np.where(
sustainability_data["Roof Insulation Category"].isin(
['mm100', 'mm50', 'mm75', 'mm25'],
),
"LI <= 100mm",
sustainability_data["Roof Insulation Category"],
)
# 2) Group all of the glazed together (e.g. double glazed, secondary glazed, triple glazed)
sustainability_data["Glazing Type"] = sustainability_data["Glazing"].copy()
sustainability_data["Glazing Type"] = np.where(
sustainability_data["Glazing Type"].isin(
['Double 2002 or later', 'Double before 2002', 'Double but age unknown', 'DoubleKnownData']
),
"Double Glazed",
sustainability_data["Glazing Type"],
)
sustainability_data["Glazing Type"] = np.where(
sustainability_data["Glazing Type"].isin(['Triple', 'TripleKnownData']),
"Triple Glazed",
sustainability_data["Glazing Type"],
)
# 3) Group up boiler efficiency A, B-D, E - G? or someting like this
sustainability_data["Boiler Efficiency Group"] = sustainability_data["Boiler Efficiency"].copy()
sustainability_data["Boiler Efficiency Group"] = np.where(
sustainability_data["Boiler Efficiency Group"].isin(['B', 'C', 'D']),
"B-D",
sustainability_data["Boiler Efficiency Group"],
)
sustainability_data["Boiler Efficiency Group"] = np.where(
sustainability_data["Boiler Efficiency Group"].isin(['E', 'F', 'G']),
"E-G",
sustainability_data["Boiler Efficiency Group"],
)
# 4) Group up main fuel into gas, electric, oil, other?
sustainability_data["Main Fuel Group"] = sustainability_data["Main Fuel"].copy()
sustainability_data["Main Fuel Group"] = np.where(
sustainability_data["Main Fuel Group"].isin(
["SmokelessCoal", "BiomassCommunity", "B30DCommunity"]
),
"Other Fuel",
sustainability_data["Main Fuel Group"],
)
# 5) Wall Construction - group up Sandstone and Granite into one category
sustainability_data["Wall Construction"] = np.where(
sustainability_data["Wall Construction"].isin(["Sandstone", "Granite"]),
"Sandstone/Granite",
sustainability_data["Wall Construction"]
)
sustainability_data["Wall Construction"] = np.where(
sustainability_data["Wall Construction"].isin(["Timber Frame", "System", "Solid Brick"]),
"Solid",
sustainability_data["Wall Construction"]
)
# 6) Reduce or remove floor construction
sustainability_data["Floor Construction"] = np.where(
sustainability_data["Floor Construction"].isin(["SuspendedTimber", "SuspendedNotTimber"]),
"Suspended Floor",
sustainability_data["Floor Construction"]
)
# 7) Reduce wall insulation
sustainability_data["Wall Insulation"] = np.where(
sustainability_data["Wall Insulation"].isin(
["FilledCavityPlusInternal", "FilledCavityPlusExternal", "FilledCavity", "External", "Internal"]
),
"Insulated",
sustainability_data["Wall Insulation"]
)
# 8) Fill floor insulation
sustainability_data["Floor Insulation"] = sustainability_data["Floor Insulation"].fillna("Unknown")
# 9) Reduce Age bands
sustainability_data["Construction Years"] = np.where(
sustainability_data["Construction Years"].isin(["2003-2006", "2007-2011", "2012 onwards"]),
"2003 onwards",
sustainability_data["Construction Years"],
)
sustainability_data["Construction Years"] = np.where(
sustainability_data["Construction Years"].isin(["Before 1900", "1900-1929"]),
"Before 1929",
sustainability_data["Construction Years"],
)
sustainability_data["Construction Years"] = np.where(
sustainability_data["Construction Years"].isin(["1983-1990", "1991-1995"]),
"1983-1995",
sustainability_data["Construction Years"],
)
sustainability_data["Construction Years"] = np.where(
sustainability_data["Construction Years"].isin(["1950-1966", "1967-1975", "1976-1982"]),
"1950-1982",
sustainability_data["Construction Years"],
)
# Roof
sustainability_data["Roof Construction"] = np.where(
sustainability_data["Roof Construction"].isin(
["PitchedNormalLoftAccess", "PitchedThatched", "PitchedNormalNoLoftAccess", "PitchedWithSlopingCeiling"]
),
"Pitched Roof",
sustainability_data["Roof Construction"]
)
archetype_variables = [
"Type", "Attachment", "Construction Years", "Wall Construction", "Wall Insulation",
"Roof Construction", "Roof Insulation Category", "Floor Construction", "Floor Insulation",
"Glazing Type", "Heating", "Boiler Efficiency Group", "Main Fuel Group", "Controls Adequacy",
"Floor Area Band"
]
archetypes = sustainability_data[archetype_variables + ["UPRN"]].dropna().groupby(archetype_variables)[
"UPRN"].nunique().reset_index().rename(columns={"UPRN": "Count"}).sort_values(by="Count",
ascending=False).reset_index(
drop=True)
# We take a sample that represents 95% of the properties
archetypes["Cumulative Count"] = archetypes["Count"].cumsum()
archetypes["Cumulative Proportion"] = archetypes["Cumulative Count"] / archetypes["Count"].sum()
archetypes_85 = archetypes[archetypes["Cumulative Proportion"] <= 0.80]
archetypes_85["Archetypes_85_reference"] = archetypes_85.index + 1
archetypes_85["Archetypes_85_reference"] = "Archetype_Sample_" + archetypes_85["Archetypes_85_reference"].astype(str)
# We now take a sample of the properties that represent 85% of the total properties
sustainability_data = sustainability_data.merge(
archetypes_85,
on=archetype_variables,
how="inner"
)
# We take 1 random property, by archetype 85 reference
modelling_sample = sustainability_data.groupby("Archetypes_85_reference").apply(
lambda x: x.sample(1, random_state=42)
).reset_index(drop=True)
# Checking distributions
def compare_distributions(full_df, sample_df, column):
full_dist = full_df[column].value_counts(normalize=True)
sample_dist = sample_df[column].value_counts(normalize=True)
comparison = pd.concat([full_dist, sample_dist], axis=1, keys=['Full', 'Sample']).fillna(0)
return comparison
for col in archetype_variables:
print(f"--- {col} ---")
print(compare_distributions(sustainability_data, modelling_sample, col))
# Save this CSV as input
modelling_sample.to_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting Project/modelling_sample.xlsx",
)
# Save the archetype definitions
archetypes_85.to_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting Project/archetypes_85.xlsx",
)
# Save the full archetypes
archetypes.to_excel(
"/Users/khalimconn-kowlessar/Documents/hestia/Customers/Peabody/Nov 2025 Consulting Project/full_archetypes.xlsx",
)
# Maps the property types to the format recognised by the EPC api
property_type_map = {}

View file

@ -20,6 +20,7 @@ DATA_ANOMALY_MATCHES = {
# certificate retrieval process is successfully completed. Mandatory data items cannot be applied
# retrospectively to energy certificates lodged before the date of the change.
"Not recorded",
"Not Recorded",
# The data also contains DECs with an operational rating of 9999 (a default DEC). The production of a
# default DEC value was allowed to enable building occupiers, with poor quality or no energy data,
# the opportunity to comply with the regulations. From April 2011 the ability to lodge a default DEC was no

View file

@ -21,14 +21,16 @@ class RetrieveFindMyEpc:
'Chrome/111.0.0.0 Safari/537.36'
}
def __init__(self, address: str, postcode: str):
def __init__(self, address: str, postcode: str, rrn: str = None):
"""
This class is tasked with retrieving the latest EPC data from the find my epc website
:param address: The address of the property
:param postcode: The postcode of the property
:param rrn: The RRN of the EPC (if known)
"""
self.address = address
self.postcode = postcode
self.rrn = rrn
self.address_cleaned = self.address.replace(",", "").replace(" ", "").lower()
self.walls = []
@ -286,54 +288,12 @@ class RetrieveFindMyEpc:
:return:
"""
postcode_input = self.postcode.replace(" ", "+")
postcode_search = self.SEARCH_POSTCODE_URL.format(postcode_input=postcode_input)
postcode_response = requests.get(postcode_search, headers=self.HEADERS)
postcode_res = BeautifulSoup(postcode_response.text, features="html.parser")
rows = postcode_res.find_all('tr', class_='govuk-table__row')
extracted_table = []
for row in rows:
# Extract the address and URL
address_tag = row.find('a', class_='govuk-link')
if address_tag is None:
continue
extracted_address = None
extracted_address_url = None
if address_tag:
extracted_address = address_tag.text.strip()
extracted_address_url = address_tag['href']
extracted_address_cleaned = extracted_address.replace(",", "").replace(" ", "").lower()
if not extracted_address_cleaned.startswith(self.address_cleaned):
continue
# If the address is a match, we can extract the data
# Extract the expiry date
expiry_date_tag = row.find('td', class_='govuk-table__cell date')
expiry_date = None
if expiry_date_tag is not None:
expiry_date = expiry_date_tag.parent.find('span').text.strip()
extracted_table.append(
{
"extracted_address": extracted_address,
"extracted_address_url": extracted_address_url,
"expiry_date": datetime.strptime(expiry_date, '%d %B %Y'),
}
)
if not extracted_table:
raise ValueError("No EPC found")
if len(extracted_table) > 1:
# We take the one with the most recent expiry date
extracted_table = sorted(extracted_table, key=lambda x: x['expiry_date'], reverse=True)
chosen_epc = self.BASE_ENERGY_URL + extracted_table[0]['extracted_address_url']
epc_certificate = chosen_epc.split('/')[-1]
if self.rrn:
# We build the URL directly
epc_certificate = self.rrn
chosen_epc = f"{self.BASE_ENERGY_URL}/energy-certificate/{epc_certificate}"
else:
chosen_epc, epc_certificate = self._find_epc_page()
address_response = requests.get(chosen_epc, headers=self.HEADERS)
address_res = BeautifulSoup(address_response.text, features="html.parser")
@ -371,9 +331,12 @@ class RetrieveFindMyEpc:
return all_find_my_epc_data
def retrieve_newest_find_my_epc_data(self, sap_2012_date=None, return_page=False):
def _find_epc_page(self):
"""
For a post code and address, we pull out all the required data from the find my epc website
This function is used to find the EPC page source for a given address and postcode.
It is done by fetching the page, associating to the postcode and then matching the
addresses on the page to the address we have been given.
:return:
"""
postcode_input = self.postcode.replace(" ", "+")
@ -398,6 +361,7 @@ class RetrieveFindMyEpc:
extracted_address_cleaned = (
extracted_address.replace(",", "").replace(" ", "").lower()
)
if not extracted_address_cleaned.startswith(self.address_cleaned):
continue
@ -427,8 +391,28 @@ class RetrieveFindMyEpc:
chosen_epc = self.BASE_ENERGY_URL + extracted_table[0]['extracted_address_url']
epc_certificate = chosen_epc.split('/')[-1]
address_response = requests.get(chosen_epc, headers=self.HEADERS)
address_res = BeautifulSoup(address_response.text, features="html.parser")
return chosen_epc, epc_certificate
def retrieve_newest_find_my_epc_data(self, sap_2012_date=None, return_page=False, epc_page_source=None, rrn=None):
"""
For a post code and address, we pull out all the required data from the find my epc website
"""
if epc_page_source is None and rrn is None:
chosen_epc, rrn = self._find_epc_page()
address_response = requests.get(chosen_epc, headers=self.HEADERS)
epc_page_source = address_response.text
address_res = BeautifulSoup(address_response.text, features="html.parser")
elif self.rrn:
epc_certificate = self.rrn
chosen_epc = f"{self.BASE_ENERGY_URL}/energy-certificate/{epc_certificate}"
address_response = requests.get(chosen_epc, headers=self.HEADERS)
epc_page_source = address_response.text
address_res = BeautifulSoup(address_response.text, features="html.parser")
else:
if rrn is None:
raise ValueError("rrn must be provided if epc_page_source is provided")
address_res = BeautifulSoup(epc_page_source, features="html.parser")
# Key data we want to retrieve:
# 1) Rating
@ -563,8 +547,21 @@ class RetrieveFindMyEpc:
# 5) Pull out the EPC data
epc_data = self.extract_epc_data(address_res)
# Pull out the address information which can be found in the box with the class "epc-address"
# We split it up on break tags
addr = address_res.find("p", class_="epc-address").get_text(separator="\n").strip()
lines = addr.split("\n")
if len(lines) > 2:
address1 = lines[0]
address2 = lines[1]
postcode = lines[-1]
else:
address1 = lines[0]
address2 = ""
postcode = lines[-1]
resulting_data = {
'epc_certificate': epc_certificate,
'epc_certificate': rrn,
'current_epc_rating': current_rating.split(' ')[-6],
'current_epc_efficiency': current_sap,
'potential_epc_rating': potential_rating.split(' ')[-6],
@ -575,11 +572,16 @@ class RetrieveFindMyEpc:
"epc_data": epc_data,
**assessment_data,
**low_carbon_energy_sources,
"page_source": epc_page_source,
# Add in address a postcode from the page - covers use cases where we are given RRN
"address1": address1,
"address2": address2,
"postcode": postcode,
}
if return_page:
# We return the page text as well, which can be parsed again later
return resulting_data, postcode_response.text
return resulting_data, epc_page_source
return resulting_data
@ -721,11 +723,15 @@ class RetrieveFindMyEpc:
return formatted_recommendations
@classmethod
def get_from_epc(cls, epc):
def get_from_epc(cls, epc, epc_page_source=None, rrn=None):
if epc_page_source is not None and rrn is None:
raise ValueError("rrn must be provided if epc_page_source is provided")
# Attempt both methods:
try:
searcher = cls(address=epc["address"], postcode=epc["postcode"])
find_epc_data = searcher.retrieve_newest_find_my_epc_data()
find_epc_data = searcher.retrieve_newest_find_my_epc_data(epc_page_source=epc_page_source, rrn=rrn)
except Exception as e:
logger.error(f"Error retrieving find my epc data: {e}")
@ -733,7 +739,7 @@ class RetrieveFindMyEpc:
address1 = ",".join(epc["address"].split(",")[:-1])
try:
searcher = cls(address=address1, postcode=epc["postcode"])
find_epc_data = searcher.retrieve_newest_find_my_epc_data()
find_epc_data = searcher.retrieve_newest_find_my_epc_data(epc_page_source=epc_page_source, rrn=rrn)
logger.info("Successfully retrieved find my epc data using trimmed address")
except Exception as e2:
logger.error(f"Error retrieving find my epc data using trimmed address: {e2}")
@ -746,7 +752,7 @@ class RetrieveFindMyEpc:
address1 = epc["address1"]
# We attempt with the backup add
searcher = cls(address=address1, postcode=epc["postcode"])
find_epc_data = searcher.retrieve_newest_find_my_epc_data()
find_epc_data = searcher.retrieve_newest_find_my_epc_data(epc_page_source=epc_page_source, rrn=rrn)
logger.info("Successfully retrieved find my epc data using backup address")
non_invasive_recommendations = {
@ -765,4 +771,9 @@ class RetrieveFindMyEpc:
**find_epc_data["epc_data"],
}
return non_invasive_recommendations, patch
page_source = {
"rrn": find_epc_data["epc_certificate"],
"page_source": find_epc_data["page_source"]
}
return non_invasive_recommendations, patch, page_source