address JTK review comments

This commit is contained in:
Daniel Roth 2026-04-20 15:11:17 +00:00
parent 9c362b3639
commit 825f5fb096
3 changed files with 149 additions and 140 deletions

View file

@ -38,38 +38,36 @@ class PasHubRdSapSiteNotesExtractor:
# --- generic helpers ---
def _get(self, key: str, offset: int = 1) -> Optional[str]:
try:
idx = self.text_list.index(key)
return self.text_list[idx + offset].strip() or None
except (ValueError, IndexError):
return None
def _get_in_doc(self, key: str, offset: int = 1) -> Optional[str]:
return self._get_in(self.text_list, key, offset)
def _bool(self, key: str, offset: int = 1) -> bool:
val = self._get(key, offset)
val = self._get_in_doc(key, offset)
return val is not None and val.lower() == "yes"
def _get_in(self, lst: List[str], key: str, offset: int = 1) -> Optional[str]:
def _get_in(
self, list_to_process: List[str], key: str, offset: int = 1
) -> Optional[str]:
try:
idx = lst.index(key)
return lst[idx + offset].strip() or None
idx = list_to_process.index(key)
return list_to_process[idx + offset].strip() or None
except (ValueError, IndexError):
return None
def _bool_in(self, lst: List[str], key: str, offset: int = 1) -> bool:
val = self._get_in(lst, key, offset)
def _bool_in(self, list_to_process: List[str], key: str, offset: int = 1) -> bool:
val = self._get_in(list_to_process, key, offset)
return val is not None and val.lower() == "yes"
def _optional_bool_in(self, lst: List[str], key: str) -> Optional[bool]:
val = self._get_in(lst, key)
def _optional_bool_in(self, list_to_process: List[str], key: str) -> Optional[bool]:
val = self._get_in(list_to_process, key)
return None if val is None else val.lower() == "yes"
def _is_known_in(self, lst: List[str], key: str) -> bool:
val = self._get_in(lst, key)
def _is_known_in(self, list_to_process: List[str], key: str) -> bool:
val = self._get_in(list_to_process, key)
return val is not None and val.lower() != "not known"
def _wall_thickness_in(self, lst: List[str]) -> int:
val = self._get_in(lst, "Wall thickness:")
def _wall_thickness_in(self, list_to_process: List[str]) -> int:
val = self._get_in(list_to_process, "Wall thickness:")
return int(val.split()[0]) if val else 0
def _section(self, start: str, end: str) -> List[str]:
@ -92,22 +90,24 @@ class PasHubRdSapSiteNotesExtractor:
except ValueError:
property_address = ""
created_on_raw = self._get("Created On:")
created_on_raw = self._get_in_doc("Created On:")
created_on = (
datetime.strptime(created_on_raw, "%d %B %Y").strftime("%Y-%m-%d")
if created_on_raw
else ""
)
date_of_inspection_raw = self._get("Date of Inspection:")
date_of_inspection_raw = self._get_in_doc("Date of Inspection:")
if not date_of_inspection_raw:
raise ValueError("Date of Inspection not found in document")
date_of_inspection = datetime.strptime(date_of_inspection_raw, "%d %B %Y").date()
date_of_inspection = datetime.strptime(
date_of_inspection_raw, "%d %B %Y"
).date()
return InspectionMetadata(
inspection_surveyor=self._get("Inspection Surveyor:") or "",
email_address=self._get("E-Mail Address:") or "",
report_reference=self._get("Report Reference:") or "",
inspection_surveyor=self._get_in_doc("Inspection Surveyor:") or "",
email_address=self._get_in_doc("E-Mail Address:") or "",
report_reference=self._get_in_doc("Report Reference:") or "",
created_on=created_on,
date_of_inspection=date_of_inspection,
property_address=property_address,
@ -133,13 +133,13 @@ class PasHubRdSapSiteNotesExtractor:
)
def extract_general(self) -> General:
inspection_date_raw = self._get("Inspection Date:")
inspection_date_raw = self._get_in_doc("Inspection Date:")
if not inspection_date_raw:
raise ValueError("Inspection Date not found in document")
inspection_date = datetime.strptime(inspection_date_raw, "%d/%m/%Y").date()
storeys_raw = self._get("Number of storeys:") or "0"
extensions_raw = self._get("Number of Extensions:") or "0"
storeys_raw = self._get_in_doc("Number of storeys:") or "0"
extensions_raw = self._get_in_doc("Number of Extensions:") or "0"
return General(
epc_checked_before_assessment=self._bool(
@ -149,20 +149,23 @@ class PasHubRdSapSiteNotesExtractor:
"Does an EPC exist at the point of carrying out this", offset=2
),
inspection_date=inspection_date,
transaction_type=self._get("Transaction Type:") or "",
tenure=self._get("Tenure:") or "",
property_type=self._get("Type of Property:") or "",
detachment_type=self._get("Detachment Type:") or "",
transaction_type=self._get_in_doc("Transaction Type:") or "",
tenure=self._get_in_doc("Tenure:") or "",
property_type=self._get_in_doc("Type of Property:") or "",
detachment_type=self._get_in_doc("Detachment Type:") or "",
number_of_storeys=int(storeys_raw.split()[0]),
terrain_type=self._get("Terrain Type:") or "",
terrain_type=self._get_in_doc("Terrain Type:") or "",
number_of_extensions=int(extensions_raw.split()[0]),
electricity_smart_meter=self._bool("Is an electricity smart meter present?"),
electric_meter_type=self._get("Electric meter type:") or "",
electricity_smart_meter=self._bool(
"Is an electricity smart meter present?"
),
electric_meter_type=self._get_in_doc("Electric meter type:") or "",
dwelling_export_capable=self._bool("Is the dwelling export-capable?"),
mains_gas_available=self._bool("Is mains gas available?"),
gas_smart_meter=self._bool("Is there a gas smart meter?"),
gas_meter_accessible=self._bool("Is the gas meter accessible?"),
measurements_location=self._get("Select Measurements Location:") or "",
measurements_location=self._get_in_doc("Select Measurements Location:")
or "",
)
def extract_building_construction(self) -> BuildingConstruction:
@ -209,23 +212,28 @@ class PasHubRdSapSiteNotesExtractor:
) -> MainBuildingConstruction:
return MainBuildingConstruction(
age_range=self._get_in(data, "Age Range:") or "",
age_indicators=self._get_in(data, "Record indicators of property age:") or "",
walls_construction_type=self._get_in(data, "Walls - Construction Type:") or "",
age_indicators=self._get_in(data, "Record indicators of property age:")
or "",
walls_construction_type=self._get_in(data, "Walls - Construction Type:")
or "",
cavity_construction_indicators=self._get_in(
data, "Record external indicators of Cavity Construction:"
) or "",
)
or "",
walls_insulation_type=self._get_in(data, "Walls - Insulation Type:") or "",
filled_cavity_indicators=self._get_in(
data, "Record indicators of filled cavity:"
),
thermal_conductivity_of_wall_insulation=self._get_in(
data, "Thermal conductivity of wall insulation:"
) or "",
)
or "",
wall_u_value_known=self._is_known_in(data, "Wall U-Value known?"),
wall_thickness_mm=self._wall_thickness_in(data),
party_wall_construction_type=self._get_in(
data, "Party wall construction type:"
) or "",
)
or "",
)
def _parse_extension_construction(
@ -234,23 +242,28 @@ class PasHubRdSapSiteNotesExtractor:
return ExtensionConstruction(
id=ext_id,
age_range=self._get_in(data, "Age Range:") or "",
age_indicators=self._get_in(data, "Record indicators of property age:") or "",
walls_construction_type=self._get_in(data, "Walls - Construction Type:") or "",
age_indicators=self._get_in(data, "Record indicators of property age:")
or "",
walls_construction_type=self._get_in(data, "Walls - Construction Type:")
or "",
cavity_construction_indicators=self._get_in(
data, "Record external indicators of Cavity Construction:"
) or "",
)
or "",
walls_insulation_type=self._get_in(data, "Walls - Insulation Type:") or "",
filled_cavity_indicators=self._get_in(
data, "Record indicators of filled cavity:"
),
thermal_conductivity_of_wall_insulation=self._get_in(
data, "Thermal conductivity of wall insulation:"
) or "",
)
or "",
wall_u_value_known=self._is_known_in(data, "Wall U-Value known?"),
wall_thickness_mm=self._wall_thickness_in(data),
party_wall_construction_type=self._get_in(
data, "Party wall construction type:"
) or "",
)
or "",
)
def extract_building_measurements(self) -> BuildingMeasurements:
@ -281,7 +294,9 @@ class PasHubRdSapSiteNotesExtractor:
extensions.append(
ExtensionMeasurements(
id=n + 1,
floors=self._parse_floor_measurements(bm_section[ext_start:ext_end]),
floors=self._parse_floor_measurements(
bm_section[ext_start:ext_end]
),
)
)
@ -352,14 +367,30 @@ class PasHubRdSapSiteNotesExtractor:
v_section = self._section("Ventilation", "Conservatories")
return Ventilation(
ventilation_type=self._get_in(v_section, "Ventilation type:") or "",
has_fixed_air_conditioning=self._bool_in(v_section, "Has fixed air conditioning?"),
number_of_open_flues=int(self._get_in(v_section, "Number of open flues:") or 0),
number_of_closed_flues=int(self._get_in(v_section, "Number of closed flues:") or 0),
number_of_boiler_flues=int(self._get_in(v_section, "Number of boiler flues:") or 0),
number_of_other_flues=int(self._get_in(v_section, "Number of other flues:") or 0),
number_of_extract_fans=int(self._get_in(v_section, "Number of extract fans:") or 0),
number_of_passive_vents=int(self._get_in(v_section, "Number of passive vents:") or 0),
number_of_flueless_gas_fires=int(self._get_in(v_section, "Number of flueless gas fires:") or 0),
has_fixed_air_conditioning=self._bool_in(
v_section, "Has fixed air conditioning?"
),
number_of_open_flues=int(
self._get_in(v_section, "Number of open flues:") or 0
),
number_of_closed_flues=int(
self._get_in(v_section, "Number of closed flues:") or 0
),
number_of_boiler_flues=int(
self._get_in(v_section, "Number of boiler flues:") or 0
),
number_of_other_flues=int(
self._get_in(v_section, "Number of other flues:") or 0
),
number_of_extract_fans=int(
self._get_in(v_section, "Number of extract fans:") or 0
),
number_of_passive_vents=int(
self._get_in(v_section, "Number of passive vents:") or 0
),
number_of_flueless_gas_fires=int(
self._get_in(v_section, "Number of flueless gas fires:") or 0
),
pressure_test=self._get_in(v_section, "Pressure test:") or "",
draught_lobby=self._bool_in(v_section, "Is there a draught lobby?"),
ventilation_in_pcdf_database=self._optional_bool_in(
@ -408,7 +439,8 @@ class PasHubRdSapSiteNotesExtractor:
self._get_in(rce_section, "Number of insulated external doors?") or 0
),
number_of_draughtproofed_external_doors=int(
self._get_in(rce_section, "Number of draughtproofed external doors?") or 0
self._get_in(rce_section, "Number of draughtproofed external doors?")
or 0
),
number_of_open_chimneys=int(
self._get_in(rce_section, "Number of open chimneys?") or 0
@ -430,15 +462,19 @@ class PasHubRdSapSiteNotesExtractor:
),
waste_water_heat_recovery=self._get_in(
rce_section, "Are there any waste water heat recovery systems?"
) or "",
)
or "",
)
def extract_water_use(self) -> WaterUse:
wu_section = self._section("Room Count Elements", "Customer Response")
baths_raw = self._get_in(wu_section, "Number of baths:") or "0"
special_raw = self._get_in(
special_raw = (
self._get_in(
wu_section, "How many special features are there at the", offset=2
) or "0"
)
or "0"
)
showers = []
n = 1
@ -481,9 +517,8 @@ class PasHubRdSapSiteNotesExtractor:
)
return SurveyAddendum(
addendum=self._get_in(a_section, "Addendum") or "",
related_party_disclosure=self._get_in(
a_section, "Related party disclosure"
) or "",
related_party_disclosure=self._get_in(a_section, "Related party disclosure")
or "",
hard_to_treat_cavity_access_issues=self._bool_in(
a_section,
"Hard to treat cavity walls: Property has access",
@ -503,7 +538,10 @@ class PasHubRdSapSiteNotesExtractor:
def _parse_main_heating(self, data: List[str]) -> MainHeating:
return MainHeating(
selection_method=self._get_in(data, "How would you like to select the Heating System?") or "",
selection_method=self._get_in(
data, "How would you like to select the Heating System?"
)
or "",
system_type=self._get_in(data, "System type:") or "",
product_id=int(self._get_in(data, "Product Id") or 0),
manufacturer=self._get_in(data, "Manufacturer") or "",
@ -518,7 +556,8 @@ class PasHubRdSapSiteNotesExtractor:
open_flue=self._get_in(data, "Open Flue") or "",
fan_assist=self._bool_in(data, "Fan Assist"),
status=self._get_in(data, "Status") or "",
central_heating_pump_age=self._get_in(data, "Central heating pump age:") or "",
central_heating_pump_age=self._get_in(data, "Central heating pump age:")
or "",
controls=self._get_in(data, "Controls:") or "",
flue_gas_heat_recovery_system=self._bool_in(
data, "Does the boiler have a Flue Gas Heat Recover", offset=2
@ -539,7 +578,9 @@ class PasHubRdSapSiteNotesExtractor:
type=self._get_in(data, "Water Heating Type:") or "",
system=self._get_in(data, "Water Heating System:") or "",
cylinder_size=self._get_in(data, "Cylinder Size:") or "",
cylinder_measured_heat_loss=self._get_in(data, "Cylinder Measured Heat Loss:"),
cylinder_measured_heat_loss=self._get_in(
data, "Cylinder Measured Heat Loss:"
),
insulation_type=self._get_in(data, "Insulation Type:"),
insulation_thickness_mm=int(thickness_raw) if thickness_raw else None,
has_thermostat=self._optional_bool_in(data, "Cylinder Thermostat:"),
@ -557,7 +598,9 @@ class PasHubRdSapSiteNotesExtractor:
frame_type=self._get_in(data, "Window frame type:") or "",
glazing_gap=self._get_in(data, "What size is the glazing gap?") or "",
draught_proofed=self._bool_in(data, "Is the window draught proofed?"),
permanent_shutters=self._bool_in(data, "Are there permanent shutters present?"),
permanent_shutters=self._bool_in(
data, "Are there permanent shutters present?"
),
height_m=float(height_raw.split()[0]) if height_raw else 0.0,
width_m=float(width_raw.split()[0]) if width_raw else 0.0,
orientation=self._get_in(data, "Orientation:") or "",
@ -583,7 +626,8 @@ class PasHubRdSapSiteNotesExtractor:
roof_u_value_known=self._is_known_in(data, "Roof U-Value:"),
cavity_wall_construction_indicators=self._get_in(
data, "Record indicators of Cavity Wall Construction in roof", offset=2
) or "",
)
or "",
rooms_in_roof=self._bool_in(data, "Are there rooms in the roof?"),
insulation_thickness_mm=thickness_mm,
insulation_thickness=thickness_str,
@ -602,7 +646,8 @@ class PasHubRdSapSiteNotesExtractor:
roof_u_value_known=self._is_known_in(data, "Roof U-Value:"),
cavity_wall_construction_indicators=self._get_in(
data, "Record indicators of Cavity Wall Construction in roof", offset=2
) or "",
)
or "",
rooms_in_roof=self._bool_in(data, "Are there rooms in the roof?"),
insulation_thickness_mm=thickness_mm,
insulation_thickness=thickness_str,

View file

@ -1,9 +1,6 @@
import json
import os
import time
from typing import Any, List, Mapping
from typing import Any, Mapping
import boto3
from utils.logger import setup_logger
from utils.s3 import upload_file_to_s3
@ -28,60 +25,6 @@ def upload_pdf(local_path: str, bucket: str, key: str) -> None:
logger.info("Upload complete")
def start_textract_job(bucket: str, key: str) -> str:
client = boto3.client("textract")
response = client.start_document_analysis(
DocumentLocation={"S3Object": {"Bucket": bucket, "Name": key}},
FeatureTypes=["FORMS"],
)
job_id: str = response["JobId"]
logger.info(f"Started Textract job {job_id}")
return job_id
def wait_for_job(job_id: str, poll_interval_seconds: int = 5) -> None:
client = boto3.client("textract")
logger.info(f"Polling Textract job {job_id}...")
while True:
response = client.get_document_analysis(JobId=job_id, MaxResults=1)
status = response["JobStatus"]
logger.info(f"Status: {status}")
if status == "SUCCEEDED":
return
if status == "FAILED":
raise RuntimeError(
f"Textract job {job_id} failed: {response.get('StatusMessage')}"
)
time.sleep(poll_interval_seconds)
def collect_blocks(job_id: str) -> List[Any]:
client = boto3.client("textract")
blocks: List[Any] = []
next_token = None
while True:
kwargs: dict = {"JobId": job_id, "MaxResults": 1000}
if next_token:
kwargs["NextToken"] = next_token
response = client.get_document_analysis(**kwargs)
blocks.extend(response.get("Blocks", []))
next_token = response.get("NextToken")
if not next_token:
break
logger.info(f"Collected {len(blocks)} blocks")
return blocks
def save_blocks(blocks: List[Any], output_path: str) -> None:
with open(output_path, "w") as f:
json.dump(blocks, f, indent=2, default=str)
logger.info(f"Saved blocks to {output_path}")
def handler(event: Mapping[str, Any], context: Any) -> None:
logger.info("Entered handler")
@ -89,12 +32,7 @@ def handler(event: Mapping[str, Any], context: Any) -> None:
upload_pdf(PDF_LOCAL_PATH, BUCKET, PDF_S3_KEY)
job_id = start_textract_job(BUCKET, PDF_S3_KEY)
wait_for_job(job_id)
blocks = collect_blocks(job_id)
save_blocks(blocks, output_path)
logger.info("Done")
logger.info(f"Uploaded file to {output_path}")
if __name__ == "__main__":

View file

@ -84,8 +84,20 @@ class EpcPropertyDataMapper:
metadata = survey.inspection_metadata
address_parts = [p.strip() for p in metadata.property_address.split(", ")]
postcode = address_parts[-1] if len(address_parts) >= 1 else None
post_town = address_parts[-3] if len(address_parts) >= 4 else (address_parts[-2] if len(address_parts) >= 3 else None)
address_line_1 = ", ".join(address_parts[:-3]) if len(address_parts) >= 4 else ", ".join(address_parts[:-2]) if len(address_parts) >= 3 else address_parts[0] if address_parts else None
post_town = (
address_parts[-3]
if len(address_parts) >= 4
else (address_parts[-2] if len(address_parts) >= 3 else None)
)
address_line_1 = (
", ".join(address_parts[:-3])
if len(address_parts) >= 4
else (
", ".join(address_parts[:-2])
if len(address_parts) >= 3
else address_parts[0] if address_parts else None
)
)
construction = survey.building_construction
measurements = survey.building_measurements
@ -95,14 +107,22 @@ class EpcPropertyDataMapper:
room_counts = survey.room_count_elements
roof_space = survey.roof_space
sap_building_parts = [_map_main_building_part(construction, measurements, roof_space.main_building)]
sap_building_parts = [
_map_main_building_part(
construction, measurements, roof_space.main_building
)
]
if construction.extensions and measurements.extensions:
for ext_c in construction.extensions:
matching_m = [m for m in measurements.extensions if m.id == ext_c.id]
matching_r = [r for r in (roof_space.extensions or []) if r.id == ext_c.id]
matching_r = [
r for r in (roof_space.extensions or []) if r.id == ext_c.id
]
if matching_m:
sap_building_parts.append(
_map_extension_building_part(ext_c, matching_m[0], matching_r[0] if matching_r else None)
_map_extension_building_part(
ext_c, matching_m[0], matching_r[0] if matching_r else None
)
)
total_floor_area = round(
@ -112,7 +132,7 @@ class EpcPropertyDataMapper:
for floor in part.sap_floor_dimensions
),
2,
)
) # TODO: verify that this is the correct approach for computing total_floor_area
return EpcPropertyData(
dwelling_type=f"{general.detachment_type} {general.property_type.lower()}",
@ -1433,10 +1453,16 @@ def _map_floor_dimensions(floors: List[FloorMeasurement]) -> List[SapFloorDimens
]
def _map_roof(roof: Optional[Union[RoofSpaceDetail, ExtensionRoofSpace]]) -> tuple[Optional[str], Optional[Union[str, int]]]:
def _map_roof(
roof: Optional[Union[RoofSpaceDetail, ExtensionRoofSpace]],
) -> tuple[Optional[str], Optional[Union[str, int]]]:
if roof is None:
return None, None
thickness: Optional[Union[str, int]] = roof.insulation_thickness_mm if roof.insulation_thickness_mm is not None else roof.insulation_thickness
thickness: Optional[Union[str, int]] = (
roof.insulation_thickness_mm
if roof.insulation_thickness_mm is not None
else roof.insulation_thickness
)
return roof.insulation_at or None, thickness