Merge pull request #134 from Hestia-Homes/feature/proof_of_site_notes

Feature/proof of site notes
This commit is contained in:
Jun-te Kim 2026-04-23 14:20:33 +01:00 committed by GitHub
commit 2b0894bb0b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 744 additions and 77 deletions

View file

@ -29,7 +29,9 @@
"4ops.terraform",
"fabiospampinato.vscode-todo-plus",
"jgclark.vscode-todo-highlight",
"corentinartaud.pdfpreview"
"corentinartaud.pdfpreview",
"GrapeCity.gc-excelviewer",
"anthropic.claude-code"
]
}
}

View file

@ -1,51 +1,51 @@
name: Hubspot Sync
# name: Hubspot Sync
on:
schedule:
# Every 2 hours on the hour, 07:00–17:00, Monday–Friday (UTC)
- cron: '0 7-18/2 * * 1-5'
# on:
# schedule:
# # Every 2 hours on the hour, 07:00–17:00, Monday–Friday (UTC)
# - cron: '0 7-18/2 * * 1-5'
# Once on Saturday at 09:00 UTC
- cron: '0 9 * * 6'
# # Once on Saturday at 09:00 UTC
# - cron: '0 9 * * 6'
# Once on Sunday at 09:00 UTC
- cron: '0 9 * * 0'
workflow_dispatch:
# # Once on Sunday at 09:00 UTC
# - cron: '0 9 * * 0'
# workflow_dispatch:
jobs:
hubspot-sync:
runs-on: [self-hosted, mist]
steps:
- uses: actions/checkout@v4
# jobs:
# hubspot-sync:
# runs-on: [self-hosted, mist]
# steps:
# - uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
# - name: Set up Python
# uses: actions/setup-python@v5
# with:
# python-version: '3.12'
- name: Install dependencies
run: |
pip install poetry
poetry install --no-root
# - name: Install dependencies
# run: |
# pip install poetry
# poetry install --no-root
# - name: Run scripts
# env:
# PYTHONPATH: ${{ github.workspace }}
# DATABASE_URL: ${{ secrets.PROD_DATABASE_URL }}
# run: |
# pwd
# ls -la
# poetry run python etl/hubSpotClient/scripts/hubspot_gather_all_deals.py
# # - name: Run scripts
# # env:
# # PYTHONPATH: ${{ github.workspace }}
# # DATABASE_URL: ${{ secrets.PROD_DATABASE_URL }}
# # run: |
# # pwd
# # ls -la
# # poetry run python etl/hubSpotClient/scripts/hubspot_gather_all_deals.py
- name: Run scripts
env:
PYTHONPATH: ${{ github.workspace }}
DATABASE_URL: ${{ secrets.PROD_DATABASE_URL }}
run: |
pwd
ls -la
poetry run python etl/hubSpotClient/scripts/hubspot_update_script.py
# - name: Run scripts
# env:
# PYTHONPATH: ${{ github.workspace }}
# DATABASE_URL: ${{ secrets.PROD_DATABASE_URL }}
# run: |
# pwd
# ls -la
# poetry run python etl/hubSpotClient/scripts/hubspot_update_script.py

View file

@ -2,19 +2,12 @@ name: Months End
on:
schedule:
- cron: '0 7 * * 1' # Every Monday at 07:00 UTC
- cron: '0 7 23 * *' # On the 23rd of every month at 07:00 UTC
- cron: '0 7 24 * *' # On the 24th of every month at 07:00 UTC
- cron: '0 7 25 * *' # On the 25th of every month at 07:00 UTC
- cron: '0 7 26 * *' # On the 26th of every month at 07:00 UTC
- cron: '0 7 27 * *' # On the 27th of every month at 07:00 UTC
- cron: '0 7 29 * *' # On the 29th of every month at 07:00 UTC
- cron: '0 7 19 * *' # On the 19th of every month at 07:00 UTC
- cron: '0 7 23-31 * *' # Every day from the 23rd to end of month at 07:00 UTC
workflow_dispatch:
jobs:
surveyed-needs-sign-off:
runs-on: [self-hosted, mist]
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

Binary file not shown.

102
devcontainer.sh Normal file
View file

@ -0,0 +1,102 @@
#!/usr/bin/env bash
#
# devcontainer.sh — devcontainer helper for this repo
#
# Usage:
# ./devcontainer.sh <command>
#
# Commands:
# up build + start the devcontainer (idempotent)
# shell attach a bash shell; auto-ups if not running
# down stop the devcontainer
# rebuild remove + rebuild from scratch, no cache
#
# Examples:
# ./devcontainer.sh shell # one-shot: up if needed, then bash
# ./devcontainer.sh rebuild
# Abort on any command failure, on use of an unset variable, and on a failure
# anywhere inside a pipeline.
set -euo pipefail
# Absolute path of the directory that contains this script.
SCRIPT_DIR="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)"
# The repository root is taken to be the script's own directory.
REPO_ROOT="${SCRIPT_DIR}"
# Devcontainer definition; also used as a label filter when locating the container.
CONFIG_PATH="${REPO_ROOT}/.devcontainer/devcontainer.json"
# Sub-commands accepted as the single positional argument.
VALID_COMMANDS=(up shell down rebuild)
# --- helpers ---------------------------------------------------------------
# Print the help text embedded in this script's leading comment header, then
# exit with the status given as $1 (default 0).
usage() {
    # Skip the shebang and the bare "#" on line 2, then print every following
    # comment line with its "# " prefix removed, stopping at the first
    # non-comment line. Following the header itself (instead of a hard-coded
    # line range) keeps the final example line from being cut off and stays
    # correct when the header grows or shrinks.
    awk 'NR <= 2 { next } !/^#/ { exit } { sub(/^# ?/, ""); print }' "${BASH_SOURCE[0]}"
    exit "${1:-0}"
}
# Report a fatal error on stderr (all arguments joined into one message) and
# terminate the script with status 1.
die() {
    printf 'error: %s\n' "$*" >&2
    exit 1
}
# Succeed (status 0) when the first argument is exactly equal to one of the
# remaining arguments; otherwise return status 1.
in_list() {
    local target="$1"
    local candidate
    shift
    # "for candidate; do" walks the remaining positional parameters.
    for candidate; do
        if [[ "${candidate}" == "${target}" ]]; then
            return 0
        fi
    done
    return 1
}
# Print the ID(s) of the running container(s) whose devcontainer labels match
# this repository's folder and config file; prints nothing when none is running.
container_id() {
    local -a label_filters=(
        --filter "label=devcontainer.local_folder=${REPO_ROOT}"
        --filter "label=devcontainer.config_file=${CONFIG_PATH}"
    )
    docker ps -q "${label_filters[@]}"
}
# --- argument parsing ------------------------------------------------------
# Exactly one positional argument (the sub-command) is required; anything else
# prints the usage text and exits with status 1.
if [[ $# -ne 1 ]]; then
    usage 1
fi
COMMAND="$1"
# Reject unknown sub-commands before touching docker or the devcontainer CLI.
if ! in_list "${COMMAND}" "${VALID_COMMANDS[@]}"; then
    die "invalid command '${COMMAND}' (expected: ${VALID_COMMANDS[*]})"
fi
# Every sub-command needs the devcontainer definition to exist.
if [[ ! -f "${CONFIG_PATH}" ]]; then
    die "config not found: ${CONFIG_PATH}"
fi
# Arguments shared by every devcontainer CLI invocation below.
DC_ARGS=(--workspace-folder "${REPO_ROOT}")
# --- dispatch --------------------------------------------------------------
# Run the requested sub-command. COMMAND was validated against VALID_COMMANDS
# above, so no default branch is needed.
case "${COMMAND}" in
    up)
        echo ">> bringing up devcontainer"
        devcontainer up "${DC_ARGS[@]}"
        ;;
    shell)
        # Auto-up if not already running. `devcontainer up` is idempotent —
        # it reuses an existing container, so this is cheap on warm starts.
        if [[ -z "$(container_id)" ]]; then
            echo ">> devcontainer not running, bringing it up first"
            devcontainer up "${DC_ARGS[@]}"
        fi
        echo ">> attaching shell"
        # Decide between bash and sh with a throwaway, non-interactive probe.
        # Chaining the interactive sessions with `bash ... || sh` would start a
        # second shell whenever the bash session merely ended with a non-zero
        # status (e.g. the last command typed had failed), and silencing stderr
        # on the interactive call would hide real errors for the whole session.
        if devcontainer exec "${DC_ARGS[@]}" bash -c 'exit 0' >/dev/null 2>&1; then
            devcontainer exec "${DC_ARGS[@]}" bash
        else
            devcontainer exec "${DC_ARGS[@]}" sh
        fi
        ;;
    down)
        cid="$(container_id)"
        if [[ -z "${cid}" ]]; then
            echo ">> devcontainer not running, nothing to stop"
            exit 0
        fi
        echo ">> stopping devcontainer"
        docker stop "${cid}"
        ;;
    rebuild)
        echo ">> rebuilding devcontainer from scratch"
        devcontainer up "${DC_ARGS[@]}" --remove-existing-container --build-no-cache
        ;;
esac

View file

@ -11,10 +11,6 @@ class HubspotTodb:
init_db()
self.s3 = S3Uploader()
def new_record_to_hubspot_data(self, deal_data, company, listing, hubspot_client):
print("⚠️ Deprecated — use the new interface instead.")
return self.upsert_hubspot_deal(deal_data, company, listing, hubspot_client)
def new_record_company(self, company_data):
"""Adds a new record to the hubspot_company_data table."""
with get_db_session() as session:
@ -27,6 +23,10 @@ class HubspotTodb:
session.refresh(new_record)
return new_record
def new_record_to_hubspot_data(self, deal_data, company, listing, hubspot_client):
print("⚠️ Deprecated — use the new interface instead.")
return self.upsert_hubspot_deal(deal_data, company, listing, hubspot_client)
def find_all_deals_with_company_id(self, company_id):
"""Returns a list of deals for a given company_id."""
with get_db_session() as session:
@ -35,12 +35,13 @@ class HubspotTodb:
.filter(HubspotDealData.company_id == company_id)
.all()
)
def find_deal_with_deal_id(self, deal_id):
with get_db_session() as session:
return(
return (
session.query(HubspotDealData)
.filter(HubspotDealData.deal_id == deal_id).one()
.filter(HubspotDealData.deal_id == deal_id)
.one()
)
def _sha256(self, file_path: str) -> str:
@ -163,10 +164,14 @@ class HubspotTodb:
db_record.major_condition_issue_evidence_s3_url = s3_url
session.add(db_record)
session.commit()
print(f"✅ Updated DB with S3 URL for deal_id={deal_in_db.deal_id}")
print(
f"✅ Updated DB with S3 URL for deal_id={deal_in_db.deal_id}"
)
return False
except Exception as e:
print(f"⚠️ Failed to download/upload photo for deal_id {deal_in_db.deal_id}: {e}")
print(
f"⚠️ Failed to download/upload photo for deal_id {deal_in_db.deal_id}: {e}"
)
# Continue without the file — don't crash the entire update
else:
print(f"⚠️ Photo URL missing for deal_id {deal_in_db.deal_id}")
@ -232,7 +237,9 @@ class HubspotTodb:
if photo_url:
try:
local_file = hubspot_client.download_file_from_url(photo_url)
local_file = hubspot_client.download_file_from_url(
photo_url
)
s3_url = self.s3.upload_file(
local_file,
"retrofit-data-dev",
@ -240,7 +247,9 @@ class HubspotTodb:
)
existing.major_condition_issue_evidence_s3_url = s3_url
except Exception as e:
print(f"⚠️ Failed to download photo for deal_id {existing.deal_id}: {e}")
print(
f"⚠️ Failed to download photo for deal_id {existing.deal_id}: {e}"
)
# Continue without the file — don't crash the update
else:
print(f"⚠️ Photo URL missing for deal_id {existing.deal_id}")
@ -285,12 +294,12 @@ class HubspotTodb:
)
new_record.major_condition_issue_evidence_s3_url = s3_url
except Exception as e:
print(f"⚠️ Failed to download photo for deal_id {new_record.deal_id}: {e}")
print(
f"⚠️ Failed to download photo for deal_id {new_record.deal_id}: {e}"
)
# Continue without the file — don't crash the insert
session.add(new_record)
session.commit()
session.refresh(new_record)
return new_record

View file

@ -7,8 +7,8 @@ from etl.fileReader.sitenotes import (
WarmHomesConditionReport,
ECOConditionReport,
EnergyPerformanceReportWithData,
EnergyPerformanceReportSummaryInformation
EnergyPerformanceReportSummaryInformation,
SmartEpcSiteNote,
)
from etl.fileReader.reportType import ReportType
from pprint import pprint
@ -57,6 +57,8 @@ class pdfReaderToText():
self.type = ReportType.ENERGY_PERFORMANCE_REPORT_WITH_DATA
elif "Summary Information".lower() == self.text_list[0].lower():
self.type = ReportType.ENERGY_PERFORMANCE_REPORT_SUMMARY_INFORMATION
elif "SMART EPC: Record of" in self.all_text and "Inspection & Site Notes" in self.all_text:
self.type = ReportType.SMART_EPC_SITE_NOTE
else:
pass
return self.type
@ -75,4 +77,6 @@ class pdfReaderToText():
elif self.type == ReportType.ENERGY_PERFORMANCE_REPORT_WITH_DATA:
return EnergyPerformanceReportWithData(self.text_list)
elif self.type == ReportType.ENERGY_PERFORMANCE_REPORT_SUMMARY_INFORMATION:
return EnergyPerformanceReportSummaryInformation(self.text_list)
return EnergyPerformanceReportSummaryInformation(self.text_list)
elif self.type == ReportType.SMART_EPC_SITE_NOTE:
return SmartEpcSiteNote(self.text_list)

View file

@ -8,6 +8,7 @@ class ReportType(Enum):
OVERWRITING_U_VALUE_DECLARATION_FORM = "overwriting_u_value_declaration_form"
ECO_CONDITION_REPORT = "osmosis_condition_pas_2035_report"
WARM_HOMES_CONDITION_REPORT = "warm_homes_condition_pas_2035_report"
SMART_EPC_SITE_NOTE = "smart_epc_site_note"
ENERGY_PERFORMANCE_REPORT_WITH_DATA = "energy_performance_report_with_data"
ENERGY_PERFORMANCE_REPORT_SUMMARY_INFORMATION = "energy_performance_report_summary_information"
LIG_XML = "lodgement_xml_needed_for_lodgement_to_like_trademark"

View file

@ -19,6 +19,14 @@ from etl.transform.conditionReportTypes import (
)
from datetime import datetime
from pprint import pprint
from etl.transform.smartEpcSiteNoteTypes import (
SmartEpcHeader, SmartEpcGeneral, SmartEpcBuildingConstruction,
SmartEpcFloorMeasurement, SmartEpcRoofSpace, SmartEpcWindow,
SmartEpcMainHeating, SmartEpcSecondaryHeating, SmartEpcWaterHeating,
SmartEpcVentilation, SmartEpcRenewables, SmartEpcRoomCount,
SmartEpcMisc, SmartEpcCustomerResponse, SmartEpcAddendum,
SmartEpcSiteNoteModel,
)
class SiteNotesExtractor():
def __init__(self, data_list):
@ -1653,4 +1661,305 @@ class EnergyPerformanceReportSummaryInformation(SiteNotesExtractor):
self.setup()
def setup(self):
pass
pass
class SmartEpcSiteNote(SiteNotesExtractor):
# Build the extractor from the PDF's token list and parse it immediately.
# data_list is handed to SiteNotesExtractor.__init__ unchanged; the parsed
# SmartEpcSiteNoteModel is stored on self.master_obj.
# NOTE(review): the methods below read self.raw_data, which is presumably set
# by the base-class constructor from data_list — confirm.
def __init__(self, data_list):
super().__init__(data_list)
self.type = ReportType.SMART_EPC_SITE_NOTE
# NOTE(review): if SiteNotesExtractor.__init__ also calls setup(), the
# document is parsed twice — confirm against the base class.
self.master_obj = self.setup()
def setup(self) -> SmartEpcSiteNoteModel:
    """Parse every section of the site note and assemble the full model.

    Sections are extracted one after another, in the order of the model's
    fields, and then combined into a single SmartEpcSiteNoteModel.
    """
    header = self.get_header()
    general = self.get_general()
    building_construction = self.get_building_construction()
    roof_space = self.get_roof_space()
    windows = self.get_windows()
    main_heating = self.get_main_heating()
    secondary_heating = self.get_secondary_heating()
    water_heating = self.get_water_heating()
    ventilation = self.get_ventilation()
    renewables = self.get_renewables()
    room_count = self.get_room_count()
    misc = self.get_misc()
    customer_response = self.get_customer_response()
    addendum = self.get_addendum()
    return SmartEpcSiteNoteModel(
        header=header,
        general=general,
        building_construction=building_construction,
        roof_space=roof_space,
        windows=windows,
        main_heating=main_heating,
        secondary_heating=secondary_heating,
        water_heating=water_heating,
        ventilation=ventilation,
        renewables=renewables,
        room_count=room_count,
        misc=misc,
        customer_response=customer_response,
        addendum=addendum,
    )
def _safe_get(self, key):
    """Look up ``key`` via ``get_next_value`` without letting a miss propagate.

    Returns whatever ``self.get_next_value(self.raw_data, key)`` returns, or
    None when that call raises ValueError or IndexError.
    """
    try:
        found = self.get_next_value(self.raw_data, key)
    except (ValueError, IndexError):
        found = None
    return found
def get_header(self) -> SmartEpcHeader:
    """Extract the report header, including the multi-line property address.

    The address is every non-blank token after "Property Address:" up to the
    first token that is one of the known following headers; the parts are
    joined with ", ". It is None when the label is missing or nothing was
    collected. The other fields come from ``_safe_get``.
    """
    terminators = {"RdSAP Assessment", "General", "Page 1", "Page 2"}
    collected = []
    try:
        first = self.raw_data.index("Property Address:") + 1
    except (ValueError, IndexError):
        first = None
    if first is not None:
        for token in self.raw_data[first:]:
            # The terminator test is on the raw token; only stored values are stripped.
            if token in terminators:
                break
            cleaned = token.strip()
            if cleaned:
                collected.append(cleaned)
    address = ", ".join(collected) if collected else None
    labels = {
        "inspection_surveyor": "Inspection Surveyor:",
        "email_address": "E-Mail Address:",
        "report_reference": "Report Reference:",
        "created_on": "Created On:",
        "date_of_inspection": "Date of Inspection:",
    }
    values = {field: self._safe_get(label) for field, label in labels.items()}
    return SmartEpcHeader(property_address=address, **values)
def get_general(self) -> SmartEpcGeneral:
    """Extract the "General" section.

    Each field holds the result of ``_safe_get`` for its label, which is None
    when the lookup fails.
    """
    labels = {
        "epc_checked": "Confirm you have checked for the existence of an",
        "epc_exists": "Does an EPC exist at the point of carrying out this",
        "inspection_date": "Inspection Date:",
        "transaction_type": "Transaction Type:",
        "tenure": "Tenure:",
        "property_type": "Type of Property:",
        "detachment_type": "Detachment Type:",
        "number_of_storeys": "Number of storeys:",
        "terrain_type": "Terrain Type:",
        "number_of_extensions": "Number of Extensions:",
        "electricity_smart_meter": "Is an electricity smart meter present?",
        "electric_meter_type": "Electric meter type:",
        "dwelling_export_capable": "Is the dwelling export-capable?",
        "mains_gas_available": "Is mains gas available?",
        "gas_smart_meter": "Is there a gas smart meter?",
        "gas_meter_accessible": "Is the gas meter accessible?",
    }
    values = {field: self._safe_get(label) for field, label in labels.items()}
    return SmartEpcGeneral(**values)
def get_building_construction(self) -> SmartEpcBuildingConstruction:
    """Extract the building-construction section plus its floor measurements.

    The measurements table is parsed first by ``_get_floor_measurements``;
    every other field is the ``_safe_get`` result for its label.
    """
    floor_rows = self._get_floor_measurements()
    labels = {
        "age_range": "Age Range:",
        "age_indicators": "Record indicators of property age:",
        "walls_construction_type": "Walls - Construction Type:",
        "cavity_construction_indicators": "Record external indicators of Cavity Construction:",
        "walls_insulation_type": "Walls - Insulation Type:",
        "filled_cavity_indicators": "Record indicators of filled cavity:",
        "thermal_conductivity": "Thermal conductivity of wall insulation:",
        "wall_u_value_known": "Wall U-Value known?",
        "wall_thickness": "Wall thickness:",
        "party_wall_construction_type": "Party wall construction type:",
        "floor_type": "Floor type:",
        "floor_construction": "Floor Construction:",
        "floor_insulation_type": "Floor Insulation Type:",
        "floor_u_value_known": "Floor U-Value known?",
    }
    values = {field: self._safe_get(label) for field, label in labels.items()}
    return SmartEpcBuildingConstruction(measurements=floor_rows, **values)
def _get_floor_measurements(self):
    """Parse the "Building Measurements" table into floor rows.

    The table's column headers end with the token "PWL (m)". After it, each
    floor occupies five consecutive tokens, e.g.
    "Floor 0", "41.33", "2.46", "18.63", "7.29"
    (name, area, height, heat-loss perimeter, PWL). Tokens starting with
    "Main Building" are section captions and are skipped. The first token that
    is neither ends the table.

    Returns:
        list of SmartEpcFloorMeasurement in document order; empty when the
        table or its header row is not present.
    """
    tokens = self.raw_data
    measurements = []
    try:
        table_idx = tokens.index("Building Measurements")
        row_idx = tokens.index("PWL (m)", table_idx) + 1
    except ValueError:
        return measurements

    # A row is only read when its name and all four value tokens exist.
    while row_idx + 4 < len(tokens):
        name = tokens[row_idx].strip()
        if name.startswith("Floor"):
            measurements.append(SmartEpcFloorMeasurement(
                floor_name=name,
                area_m2=tokens[row_idx + 1].strip() or None,
                height_m=tokens[row_idx + 2].strip() or None,
                heat_loss_perimeter_m=tokens[row_idx + 3].strip() or None,
                pwl_m=tokens[row_idx + 4].strip() or None,
            ))
            row_idx += 5
        elif name.startswith("Main Building"):
            row_idx += 1
        else:
            # Blank token, next section header or anything unrecognised.
            break
    return measurements
def get_roof_space(self) -> SmartEpcRoofSpace:
    """Extract the roof-space section; each field is the ``_safe_get`` result for its label."""
    labels = {
        "construction_type": "Roofs - Construction Type:",
        "insulation_at": "Roofs - Insulation At:",
        "u_value": "Roof U-Value:",
        "insulation_thickness": "Roofs - Insulation Thickness:",
        "cavity_wall_indicators_in_roof": "Record indicators of Cavity Wall Construction in roof",
        "rooms_in_roof": "Are there rooms in the roof?",
    }
    values = {field: self._safe_get(label) for field, label in labels.items()}
    return SmartEpcRoofSpace(**values)
def get_windows(self):
    """Collect one SmartEpcWindow per "Window N" block, for N = 1, 2, ...

    Scanning stops at the first N whose "Window N" label is absent from
    ``self.raw_data``. A block starts at its label and ends at the next
    window's label. For the last window it ends at the nearest following
    section header, or at the end of the data. Bounds are only searched for
    *after* the block's start, so a header or label that also appears earlier
    in the document cannot produce an empty block.

    Returns:
        list of SmartEpcWindow in window-number order; empty when there is no
        "Window 1" label.
    """
    tokens = self.raw_data
    section_headers = ("Heating & Hot Water", "Main Heating Systems", "Secondary Heating System")

    def first_index(token, begin):
        # Index of the first occurrence of token at or after begin, else None.
        try:
            return tokens.index(token, begin)
        except ValueError:
            return None

    windows = []
    window_num = 1
    while True:
        start = first_index(f"Window {window_num}", 0)
        if start is None:
            break

        end = first_index(f"Window {window_num + 1}", start + 1)
        if end is None:
            # Last window: bound by the nearest section header after it.
            header_hits = (first_index(header, start + 1) for header in section_headers)
            end = min([len(tokens)] + [hit for hit in header_hits if hit is not None])
        block = tokens[start:end]

        def blk_get(key, block=block):
            # Value following key inside this window's block; None when the
            # key is missing, is the block's last token, or the value is blank.
            try:
                return block[block.index(key) + 1].strip() or None
            except (ValueError, IndexError):
                return None

        windows.append(SmartEpcWindow(
            window_number=window_num,
            location=blk_get("Window location:"),
            wall_type=blk_get("Window wall type:"),
            glazing_type=blk_get("Glazing Type:"),
            window_type=blk_get("Window type:"),
            frame_type=blk_get("Window frame type:"),
            glazing_gap=blk_get("What size is the glazing gap?"),
            draught_proofed=blk_get("Is the window draught proofed?"),
            permanent_shutters=blk_get("Are there permanent shutters present?"),
            height_m=blk_get("Window height:"),
            width_m=blk_get("Window width:"),
            orientation=blk_get("Orientation:"),
        ))
        window_num += 1
    return windows
def get_main_heating(self) -> SmartEpcMainHeating:
    """Extract the main heating system details.

    Each field is the ``_safe_get`` result for its label.

    NOTE(review): several labels ("Type", "Year", "Model", "Status", "Fuel")
    are short, generic tokens; presumably the lookup's first match is the
    heating product table — confirm against real documents.
    """
    labels = {
        "selection_method": "How would you like to select the Heating System?",
        "system_type": "System type:",
        "product_id": "Product Id",
        "manufacturer": "Manufacturer",
        "model": "Model",
        "orig_manuf": "Orig Manuf",
        "fuel": "Fuel",
        "seasonal_efficiency": "S. Efficiency",
        "heating_type": "Type",
        "condensing": "Condensing",
        "year": "Year",
        "mount": "Mount",
        "open_flue": "Open Flue",
        "fan_assist": "Fan Assist",
        "status": "Status",
        "pump_age": "Central heating pump age:",
        "controls": "Controls:",
        "fghrs": "Does the boiler have a Flue Gas Heat Recover",
        "weather_compensator": "Is there a weather compensator?",
        "emitter": "Emitter:",
        "emitter_temperature": "Emitter Temperature:",
    }
    values = {field: self._safe_get(label) for field, label in labels.items()}
    return SmartEpcMainHeating(**values)
def get_secondary_heating(self) -> SmartEpcSecondaryHeating:
    """Extract the secondary heating fuel and system via ``_safe_get``."""
    fuel = self._safe_get("Secondary Fuel")
    system = self._safe_get("Secondary System:")
    return SmartEpcSecondaryHeating(secondary_fuel=fuel, secondary_system=system)
def get_water_heating(self) -> SmartEpcWaterHeating:
    """Extract the water heating type, system and cylinder size via ``_safe_get``."""
    heating_type = self._safe_get("Water Heating Type:")
    heating_system = self._safe_get("Water Heating System:")
    cylinder = self._safe_get("Cylinder Size:")
    return SmartEpcWaterHeating(
        water_heating_type=heating_type,
        water_heating_system=heating_system,
        cylinder_size=cylinder,
    )
def get_ventilation(self) -> SmartEpcVentilation:
    """Extract the ventilation section; each field is the ``_safe_get`` result for its label."""
    labels = {
        "ventilation_type": "Ventilation type:",
        "fixed_air_conditioning": "Has fixed air conditioning?",
        "in_pcdf_database": "Is the ventilation in the PCDF database?",
        "open_flues": "Number of open flues:",
        "closed_flues": "Number of closed flues:",
        "boiler_flues": "Number of boiler flues:",
        "other_flues": "Number of other flues:",
        "extract_fans": "Number of extract fans:",
        "passive_vents": "Number of passive vents:",
        "flueless_gas_fires": "Number of flueless gas fires:",
        "pressure_test": "Pressure test:",
        "draught_lobby": "Is there a draught lobby?",
    }
    values = {field: self._safe_get(label) for field, label in labels.items()}
    return SmartEpcVentilation(**values)
def get_renewables(self) -> SmartEpcRenewables:
    """Extract the renewables section; each field is the ``_safe_get`` result for its label."""
    labels = {
        "wind_turbines": "Has wind turbines?",
        "solar_hot_water": "Has solar hot water?",
        "pv_array": "Has photovoltaic array?",
        "pv_batteries": "Number of PV batteries:",
        "hydro": "Is the dwelling connected to Hydro?",
    }
    values = {field: self._safe_get(label) for field, label in labels.items()}
    return SmartEpcRenewables(**values)
def get_room_count(self) -> SmartEpcRoomCount:
    """Extract room, door, chimney and bulb counts; each field is the ``_safe_get`` result for its label."""
    labels = {
        "habitable_rooms": "Number of habitable rooms?",
        "unheated_rooms": "Are any of these rooms unheated?",
        "external_doors": "Number of external doors?",
        "insulated_external_doors": "Number of insulated external doors?",
        "draughtproofed_external_doors": "Number of draughtproofed external doors?",
        "open_chimneys": "Number of open chimneys?",
        "blocked_chimneys": "Number of blocked chimneys?",
        "fixed_incandescent_bulbs": "Number of fixed incandescent bulbs:",
        "led_cfl_known": "Is the exact number of LED and CFL bulbs known?",
        "led_bulbs": "Number of fixed LED bulbs:",
        "cfl_bulbs": "Number of fixed CFL bulbs:",
    }
    values = {field: self._safe_get(label) for field, label in labels.items()}
    return SmartEpcRoomCount(**values)
def get_misc(self) -> SmartEpcMisc:
    """Extract miscellaneous items, including every shower outlet type.

    "Shower outlet type:" may occur several times; the non-blank token after
    each occurrence is collected, in document order. The remaining fields are
    single ``_safe_get`` lookups.
    """
    tokens = self.raw_data
    outlets = []
    # Pair every token with its successor; a label in last position has no pair.
    for label, following in zip(tokens, tokens[1:]):
        if label != "Shower outlet type:":
            continue
        cleaned = following.strip()
        if cleaned:
            outlets.append(cleaned)
    heat_recovery = self._safe_get("Are there any waste water heat recovery systems?")
    baths = self._safe_get("Number of baths:")
    features = self._safe_get("How many special features are there at the")
    conservatory = self._safe_get("Is there conservatory?")
    return SmartEpcMisc(
        waste_water_heat_recovery=heat_recovery,
        number_of_baths=baths,
        special_features=features,
        shower_outlet_types=outlets,
        conservatory=conservatory,
    )
def get_customer_response(self) -> SmartEpcCustomerResponse:
    """Extract the two customer-response answers via ``_safe_get``."""
    present = self._safe_get("Customer present?")
    willing = self._safe_get("Customer willing to answer satisfaction survey?")
    return SmartEpcCustomerResponse(
        customer_present=present,
        willing_to_answer_survey=willing,
    )
def get_addendum(self) -> SmartEpcAddendum:
    """Extract the addendum section; each field is the ``_safe_get`` result for its label."""
    labels = {
        "addendum": "Addendum",
        "related_party_disclosure": "Related party disclosure",
        "hard_to_treat_access_issues": "Hard to treat cavity walls: Property has access",
        "hard_to_treat_high_exposure": "Hard to treat cavity walls: Property has high",
        "hard_to_treat_narrow_cavities": "Hard to treat cavity walls: Property has narrow",
    }
    values = {field: self._safe_get(label) for field, label in labels.items()}
    return SmartEpcAddendum(**values)

View file

@ -13,12 +13,13 @@ from typing import Callable
from etl.scraper.scraper import SharePointInstaller, SharePointScraper
osmosis = SharePointScraper(SharePointInstaller.PRIVATE_PAY)
osmosis = SharePointScraper(SharePointInstaller.SOCIAL_HOUSING_WAVE_3)
osmosis
parent_folder = "/Projects/Southern Housing/SH-SURV-26-001/Assessments"
excel_path = "/workspaces/survey-extractor/example_data/SH-SURV-26-001-monday.com.xlsx"
asset_list = pd.read_excel(excel_path, sheet_name="SH-SURV-26-001-monday.com")
parent_folder = "/Osmosis-ACD Projects/Sero-Clarion Housing/Sero Project Documents/Property Folders"
osmosis.get_folders_in_path(parent_folder)
excel_path = "/workspaces/survey-extractor/example_data/Solar Programme Phase 1 DRAFT.xlsx"
asset_list = pd.read_excel(excel_path, sheet_name="Sheet1")
# --------------------------------------------------
# Retry Decorator (3 attempts + exponential backoff)
@ -50,7 +51,7 @@ def retry(max_attempts: int = 3, base_delay: float = 1.0):
@retry(max_attempts=5)
def process_asset(address: pd.Series):
folder_name = f"{address['Name']} {address['Postcode']}"
folder_name = f"{address['Real Full Address']}"
print(f"\n📁 Processing {folder_name}")
web_url = osmosis.create_dir(folder_name, parent_folder)
@ -73,7 +74,7 @@ def process_asset(address: pd.Series):
osmosis.create_dir("4. Post EPC", base_path)
osmosis.create_dir(
f"{address['Name']} - POST EPC Photos",
f"{folder_name} - POST EPC Photos",
f"{base_path}/4. Post EPC"
)
@ -84,8 +85,7 @@ def process_asset(address: pd.Series):
osmosis.create_dir("3. Additional Documents", trust_path)
return {
"Name": address["Name"],
"Postcode": address["Postcode"],
"Name": folder_name,
"Sharepoint": web_url,
}
@ -93,7 +93,7 @@ def process_asset(address: pd.Series):
# --------------------------------------------------
# Parallel Execution
# --------------------------------------------------
# asset_list = asset_list.head(1)
results = []
failed_rows = []

View file

@ -31,6 +31,7 @@ class SharePointInstaller(Enum):
# NEW_JJC = os.getenv("NEW JJC", "10d96eba-b4f9-4e30-804f-05a8b60507b0")
OSMOSIS_ACD = os.getenv("OSMOSIS_ACD_SHAREPOINT_ID", "931c4361-681b-44e4-86f6-1a54aba3ae8a")
PRIVATE_PAY = os.getenv("PRIVATE_PAY_SHAREPOINT_ID", "16812ae4-5898-4fec-a6f6-382d1435586f")
SOCIAL_HOUSING_WAVE_3 = "c60c58fe-94c5-4647-9b5f-8202f1309f0f"
class SharePointScraper():
"""

View file

@ -47,6 +47,7 @@ class surveyedDataProcessor():
self.full_sap_xml = None
self.lig_sap_xml = None
self.rd_sap_xml = None
self.smart_epc_site_note = None
self.identify_files()
@ -71,6 +72,8 @@ class surveyedDataProcessor():
elif pdf.type == ReportType.ENERGY_PERFORMANCE_REPORT_SUMMARY_INFORMATION:
self.epr_summary_information = pdf.get_reader()
self.epr_summary_information_file_path = file
elif pdf.type == ReportType.SMART_EPC_SITE_NOTE:
self.smart_epc_site_note = pdf.get_reader()
elif file.lower().endswith('.xml'):
xml = xmlReader(file)

View file

@ -0,0 +1,194 @@
from sqlmodel import SQLModel
from typing import Optional, List
# Shared base for every Smart EPC model in this module; adds nothing to SQLModel.
class BaseModel(SQLModel):
pass
# Report header of a Smart EPC site note.
# Every field is an optional string that defaults to None.
class SmartEpcHeader(BaseModel):
inspection_surveyor: Optional[str] = None
email_address: Optional[str] = None
report_reference: Optional[str] = None
created_on: Optional[str] = None
date_of_inspection: Optional[str] = None
property_address: Optional[str] = None
# "General" section: EPC checks, property classification and meter details.
# Every field is an optional string that defaults to None.
class SmartEpcGeneral(BaseModel):
epc_checked: Optional[str] = None
epc_exists: Optional[str] = None
inspection_date: Optional[str] = None
transaction_type: Optional[str] = None
tenure: Optional[str] = None
property_type: Optional[str] = None
detachment_type: Optional[str] = None
number_of_storeys: Optional[str] = None
terrain_type: Optional[str] = None
number_of_extensions: Optional[str] = None
electricity_smart_meter: Optional[str] = None
electric_meter_type: Optional[str] = None
dwelling_export_capable: Optional[str] = None
mains_gas_available: Optional[str] = None
gas_smart_meter: Optional[str] = None
gas_meter_accessible: Optional[str] = None
# One floor row of the building-measurements table.
# Numeric-looking values are declared as optional strings, not numbers.
class SmartEpcFloorMeasurement(BaseModel):
floor_name: Optional[str] = None
area_m2: Optional[str] = None
height_m: Optional[str] = None
heat_loss_perimeter_m: Optional[str] = None
pwl_m: Optional[str] = None
# Building-construction section: age, wall and floor details, plus the
# per-floor measurement rows.
class SmartEpcBuildingConstruction(BaseModel):
age_range: Optional[str] = None
age_indicators: Optional[str] = None
walls_construction_type: Optional[str] = None
cavity_construction_indicators: Optional[str] = None
walls_insulation_type: Optional[str] = None
filled_cavity_indicators: Optional[str] = None
thermal_conductivity: Optional[str] = None
wall_u_value_known: Optional[str] = None
wall_thickness: Optional[str] = None
party_wall_construction_type: Optional[str] = None
floor_type: Optional[str] = None
floor_construction: Optional[str] = None
floor_insulation_type: Optional[str] = None
floor_u_value_known: Optional[str] = None
# NOTE(review): mutable default; presumably the model library copies field
# defaults per instance so the list is not shared — confirm.
measurements: List[SmartEpcFloorMeasurement] = []
# Roof-space section. Every field is an optional string that defaults to None.
class SmartEpcRoofSpace(BaseModel):
construction_type: Optional[str] = None
insulation_at: Optional[str] = None
u_value: Optional[str] = None
insulation_thickness: Optional[str] = None
cavity_wall_indicators_in_roof: Optional[str] = None
rooms_in_roof: Optional[str] = None
# One window entry of the site note.
# window_number is the only integer field; all others are optional strings.
class SmartEpcWindow(BaseModel):
window_number: Optional[int] = None
location: Optional[str] = None
wall_type: Optional[str] = None
glazing_type: Optional[str] = None
window_type: Optional[str] = None
frame_type: Optional[str] = None
glazing_gap: Optional[str] = None
draught_proofed: Optional[str] = None
permanent_shutters: Optional[str] = None
height_m: Optional[str] = None
width_m: Optional[str] = None
orientation: Optional[str] = None
# Main heating system: product details, controls and emitters.
# Every field is an optional string that defaults to None.
class SmartEpcMainHeating(BaseModel):
selection_method: Optional[str] = None
system_type: Optional[str] = None
product_id: Optional[str] = None
manufacturer: Optional[str] = None
model: Optional[str] = None
orig_manuf: Optional[str] = None
fuel: Optional[str] = None
seasonal_efficiency: Optional[str] = None
heating_type: Optional[str] = None
condensing: Optional[str] = None
year: Optional[str] = None
mount: Optional[str] = None
open_flue: Optional[str] = None
fan_assist: Optional[str] = None
status: Optional[str] = None
pump_age: Optional[str] = None
controls: Optional[str] = None
fghrs: Optional[str] = None
weather_compensator: Optional[str] = None
emitter: Optional[str] = None
emitter_temperature: Optional[str] = None
# Secondary heating fuel and system, both optional strings.
class SmartEpcSecondaryHeating(BaseModel):
secondary_fuel: Optional[str] = None
secondary_system: Optional[str] = None
# Water heating type, system and cylinder size, all optional strings.
class SmartEpcWaterHeating(BaseModel):
water_heating_type: Optional[str] = None
water_heating_system: Optional[str] = None
cylinder_size: Optional[str] = None
# Ventilation section. Counts (flues, fans, vents) are declared as optional
# strings, not integers.
class SmartEpcVentilation(BaseModel):
ventilation_type: Optional[str] = None
fixed_air_conditioning: Optional[str] = None
in_pcdf_database: Optional[str] = None
open_flues: Optional[str] = None
closed_flues: Optional[str] = None
boiler_flues: Optional[str] = None
other_flues: Optional[str] = None
extract_fans: Optional[str] = None
passive_vents: Optional[str] = None
flueless_gas_fires: Optional[str] = None
pressure_test: Optional[str] = None
draught_lobby: Optional[str] = None
# Renewables section. Every field is an optional string that defaults to None.
class SmartEpcRenewables(BaseModel):
wind_turbines: Optional[str] = None
solar_hot_water: Optional[str] = None
pv_array: Optional[str] = None
pv_batteries: Optional[str] = None
hydro: Optional[str] = None
# Room, door, chimney and light-bulb counts, declared as optional strings.
class SmartEpcRoomCount(BaseModel):
habitable_rooms: Optional[str] = None
unheated_rooms: Optional[str] = None
external_doors: Optional[str] = None
insulated_external_doors: Optional[str] = None
draughtproofed_external_doors: Optional[str] = None
open_chimneys: Optional[str] = None
blocked_chimneys: Optional[str] = None
fixed_incandescent_bulbs: Optional[str] = None
led_cfl_known: Optional[str] = None
led_bulbs: Optional[str] = None
cfl_bulbs: Optional[str] = None
# Miscellaneous items: waste-water heat recovery, baths, special features,
# shower outlets and conservatory.
class SmartEpcMisc(BaseModel):
waste_water_heat_recovery: Optional[str] = None
number_of_baths: Optional[str] = None
special_features: Optional[str] = None
# A list because a site note can contain several shower outlet entries.
# NOTE(review): mutable default; presumably copied per instance by the model
# library — confirm.
shower_outlet_types: List[str] = []
conservatory: Optional[str] = None
# Customer-response answers, both optional strings.
class SmartEpcCustomerResponse(BaseModel):
customer_present: Optional[str] = None
willing_to_answer_survey: Optional[str] = None
# Addendum section, including the three "hard to treat cavity walls" entries.
# Every field is an optional string that defaults to None.
class SmartEpcAddendum(BaseModel):
addendum: Optional[str] = None
related_party_disclosure: Optional[str] = None
hard_to_treat_access_issues: Optional[str] = None
hard_to_treat_high_exposure: Optional[str] = None
hard_to_treat_narrow_cavities: Optional[str] = None
# Complete parsed Smart EPC site note: one sub-model per document section.
# Every section is required except windows, which defaults to an empty list.
class SmartEpcSiteNoteModel(BaseModel):
header: SmartEpcHeader
general: SmartEpcGeneral
building_construction: SmartEpcBuildingConstruction
roof_space: SmartEpcRoofSpace
# NOTE(review): mutable default; presumably copied per instance by the model
# library — confirm.
windows: List[SmartEpcWindow] = []
main_heating: SmartEpcMainHeating
secondary_heating: SmartEpcSecondaryHeating
water_heating: SmartEpcWaterHeating
ventilation: SmartEpcVentilation
renewables: SmartEpcRenewables
room_count: SmartEpcRoomCount
misc: SmartEpcMisc
customer_response: SmartEpcCustomerResponse
addendum: SmartEpcAddendum

Binary file not shown.

Binary file not shown.

49
smart_epc_explorer.py Normal file
View file

@ -0,0 +1,49 @@
"""Ad-hoc explorer for Smart EPC site-note PDFs.

Prints the detected report type, the raw token list, each parsed section and
finally the whole model as JSON.

Usage:
    python smart_epc_explorer.py [path/to/site_note.pdf]
"""
import sys
import json
from pprint import pprint

# Make the project package importable when the script is run directly.
sys.path.insert(0, '/workspaces/survey-extractor')
from etl.fileReader.pdfReaderToText import pdfReaderToText

# Default PDF; pass a path as the first command-line argument to override it.
SOURCE_PDF_PATH = "/workspaces/survey-extractor/RdSAP_SiteNote_95053636_V3_Assessment (1).pdf"
pdf_path = sys.argv[1] if len(sys.argv) > 1 else SOURCE_PDF_PATH

reader = pdfReaderToText(pdf_path)
print(f"Detected type: {reader.type}")
print(f"Total text tokens: {len(reader.text_list)}\n")

print("--- Raw Token List ---")
for i, line in enumerate(reader.text_list):
    print(f"[{i:04d}] {repr(line)}")

# The section dump below only fits the Smart EPC model. For any other (or an
# unrecognised) PDF, get_reader() returns a different extractor or None, so
# stop here with a clear message instead of an AttributeError. The raw tokens
# above are still printed, which is what is needed to debug detection.
if getattr(reader.type, "name", None) != "SMART_EPC_SITE_NOTE":
    sys.exit(f"Not a Smart EPC site note (detected type: {reader.type}); nothing to extract.")

print("\n\n--- Extracted Structured Data ---")
extractor = reader.get_reader()
obj = extractor.master_obj

sections = [
    ("Header", obj.header),
    ("General", obj.general),
    ("Building Construction", obj.building_construction),
    ("Roof Space", obj.roof_space),
    ("Main Heating", obj.main_heating),
    ("Secondary Heating", obj.secondary_heating),
    ("Water Heating", obj.water_heating),
    ("Ventilation", obj.ventilation),
    ("Renewables", obj.renewables),
    ("Room Count", obj.room_count),
    ("Misc", obj.misc),
    ("Customer Response", obj.customer_response),
    ("Addendum", obj.addendum),
]
for title, section in sections:
    print(f"\n=== {title} ===")
    pprint(section.model_dump())

# Windows are a list of models, so they are printed one by one.
print(f"\n=== Windows ({len(obj.windows)} found) ===")
for w in obj.windows:
    pprint(w.model_dump())

print("\n\n--- Full JSON ---")
print(json.dumps(obj.model_dump(), indent=2, default=str))