Merge pull request #1040 from Hestia-Homes/feature/trigger-sitenotes-to-db

Save site notes to DB as part of pashub to ara workflow
This commit is contained in:
Daniel Roth 2026-04-30 14:32:45 +01:00 committed by GitHub
commit 72ccb7c19d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 1625 additions and 397 deletions

View file

@ -20,6 +20,7 @@ class EpcPropertyModel(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
property_id: Optional[int] = Field(default=None)
portfolio_id: Optional[int] = Field(default=None)
uploaded_file_id: Optional[int] = Field(default=None)
# Identity / admin
uprn: Optional[int] = Field(default=None)

View file

@ -0,0 +1,76 @@
from typing import Optional
from sqlmodel import Session
from backend.app.db.models.epc_property import (
EpcBuildingPartModel,
EpcEnergyElementModel,
EpcFlatDetailsModel,
EpcFloorDimensionModel,
EpcMainHeatingDetailModel,
EpcPropertyEnergyPerformanceModel,
EpcPropertyModel,
EpcWindowModel,
)
from datatypes.epc.domain.epc_property_data import EpcPropertyData
def save_epc_property_data(
    session: Session,
    data: EpcPropertyData,
    uploaded_file_id: Optional[int] = None,
    property_id: Optional[int] = None,
    portfolio_id: Optional[int] = None,
) -> EpcPropertyModel:
    """Add an EPC property and all of its child rows to ``session``.

    The parent ``EpcPropertyModel`` is added and flushed first so that its
    primary key is available for the child rows. The function only calls
    ``session.add`` and ``session.flush``; it never commits.

    Args:
        session: Open SQLModel session the rows are added to.
        data: Parsed EPC property data to persist.
        uploaded_file_id: Optional id stored on the parent row.
        property_id: Optional id forwarded to the parent row factory.
        portfolio_id: Optional id forwarded to the parent row factory.

    Returns:
        The flushed parent ``EpcPropertyModel`` instance.
    """
    property_row = EpcPropertyModel.from_epc_property_data(
        data, property_id=property_id, portfolio_id=portfolio_id
    )
    property_row.uploaded_file_id = uploaded_file_id
    session.add(property_row)
    # Flush so the parent id exists before any child row references it.
    session.flush()
    assert property_row.id is not None
    parent_id: int = property_row.id

    performance_row = EpcPropertyEnergyPerformanceModel.from_epc_property_data(
        data, epc_property_id=parent_id
    )
    session.add(performance_row)

    for heating_detail in data.sap_heating.main_heating_details:
        session.add(EpcMainHeatingDetailModel.from_domain(heating_detail, parent_id))

    for building_part in data.sap_building_parts:
        part_row = EpcBuildingPartModel.from_domain(building_part, parent_id)
        session.add(part_row)
        # Each building part needs its own id before its floor dimensions
        # can point at it.
        session.flush()
        assert part_row.id is not None
        for floor_dimension in building_part.sap_floor_dimensions:
            session.add(
                EpcFloorDimensionModel.from_domain(floor_dimension, part_row.id)
            )

    for sap_window in data.sap_windows:
        session.add(EpcWindowModel.from_domain(sap_window, parent_id))

    # Energy elements that come as collections, in the order they are stored.
    repeated_elements = (
        (data.roofs, "roof"),
        (data.walls, "wall"),
        (data.floors, "floor"),
        (data.main_heating, "main_heating"),
    )
    for elements, element_type in repeated_elements:
        for element in elements:
            session.add(
                EpcEnergyElementModel.from_domain(element, element_type, parent_id)
            )

    # Energy elements that are single, optional values.
    single_elements = {
        "window": data.window,
        "lighting": data.lighting,
        "hot_water": data.hot_water,
        "secondary_heating": data.secondary_heating,
        "main_heating_controls": data.main_heating_controls,
    }
    for element_type, element in single_elements.items():
        if element is None:
            continue
        session.add(
            EpcEnergyElementModel.from_domain(element, element_type, parent_id)
        )

    flat_details = data.sap_flat_details
    if flat_details is not None:
        session.add(EpcFlatDetailsModel.from_domain(flat_details, parent_id))

    return property_row

View file

@ -0,0 +1,28 @@
from typing import List
from datatypes.epc.domain.epc_property_data import EpcPropertyData
from datatypes.epc.domain.mapper import EpcPropertyDataMapper
from backend.documents_parser.elmhurst_extractor import ElmhurstSiteNotesExtractor
from backend.documents_parser.extractor import PasHubRdSapSiteNotesExtractor
from backend.documents_parser.pdf import pdf_to_pages, pdf_to_text_list
def parse_site_notes_pdf(file_path: str) -> EpcPropertyData:
    """Parse a site-notes PDF from disk into ``EpcPropertyData``.

    The PDF is treated as an Elmhurst document when the text
    "Elmhurst Energy Systems" appears in its extracted pages; otherwise it is
    handed to the PasHub parser.

    Args:
        file_path: Path of the PDF file to read.

    Returns:
        The EPC property data produced by the matching parser.
    """
    with open(file_path, "rb") as pdf_file:
        raw_pdf = pdf_file.read()

    page_texts = pdf_to_pages(raw_pdf)
    combined_text = "\n".join(page_texts)
    is_elmhurst = "Elmhurst Energy Systems" in combined_text
    if not is_elmhurst:
        return _parse_pashub(raw_pdf)
    return _parse_elmhurst(page_texts)
def _parse_elmhurst(pages: List[str]) -> EpcPropertyData:
    """Extract Elmhurst site notes from page texts and map them to EPC data."""
    extractor = ElmhurstSiteNotesExtractor(pages)
    elmhurst_notes = extractor.extract()
    return EpcPropertyDataMapper.from_elmhurst_site_notes(elmhurst_notes)
def _parse_pashub(pdf_bytes: bytes) -> EpcPropertyData:
    """Extract PasHub RdSAP site notes from raw PDF bytes and map them to EPC data."""
    text_tokens = pdf_to_text_list(pdf_bytes)
    extractor = PasHubRdSapSiteNotesExtractor(text_tokens)
    pashub_notes = extractor.extract()
    return EpcPropertyDataMapper.from_site_notes(pashub_notes)

View file

@ -0,0 +1,257 @@
import os
from typing import Dict
from playwright.sync_api import Browser, BrowserContext, Locator, Page, sync_playwright
from backend.app.db.connection import db_session
from backend.app.db.functions.uploaded_files_functions import (
get_uploaded_file_by_listing_type_and_source,
)
from backend.app.db.models.uploaded_file import FileSourceEnum, FileTypeEnum
from backend.documents_parser.db_writer import save_epc_property_data
from backend.documents_parser.parser import parse_site_notes_pdf
from backend.ecmk_fetcher.address_list import (
PropertyRow,
extract_addresses_from_spreadsheet,
)
from backend.ecmk_fetcher.browser import (
attach_debug_listeners,
download_with_retry,
go_to_assessment_details,
go_to_assessments,
go_to_next_page,
login,
)
from backend.ecmk_fetcher.excel_writer import write_row
from backend.ecmk_fetcher.reports import (
REPORT_TYPES,
FileDownloadButtonType,
build_property_id,
map_report_type_to_db_file_type,
)
from backend.ecmk_fetcher.upload import (
upload_excel_to_sharepoint,
upload_file_to_s3_and_record,
upload_file_to_sharepoint,
)
from backend.ecmk_fetcher.xml_processor import flatten_sap_property, parse_rdsap
from utils.logger import setup_logger
from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient
# Module-level logger used by EcmkService for progress and failure messages.
logger = setup_logger()
# File name of the dimensions workbook; EcmkService.run appends it to the
# SharePoint Excel folder path to build the download location.
DIMENSIONS_FILENAME: str = "Dimensions.xlsx"
class EcmkService:
    """Download ECMK assessment reports and distribute them.

    For each assessment row in the ECMK portal that matches a property from
    the supplied spreadsheet, every report type is downloaded and processed:
    raw XML is flattened into the dimensions workbook (which is re-uploaded to
    SharePoint) and stored in S3; other reports are uploaded to SharePoint and
    S3, and RdSAP site-note PDFs are additionally parsed and saved to the
    database.
    """

    def __init__(
        self,
        sharepoint_client: DomnaSharepointClient,
        s3_bucket: str,
        property_list_filepath: str,
        sharepoint_base_path: str,
        sharepoint_excel_path: str,
        local_dimensions_path: str,
    ) -> None:
        """Store configuration and load the property lookup table.

        Args:
            sharepoint_client: Client used for SharePoint downloads/uploads.
            s3_bucket: Bucket name passed to the S3 upload helper.
            property_list_filepath: Spreadsheet read once, at construction,
                to build the property-id -> ``PropertyRow`` map.
            sharepoint_base_path: SharePoint folder under which report files
                are uploaded.
            sharepoint_excel_path: SharePoint folder holding the dimensions
                workbook.
            local_dimensions_path: Local path of the dimensions workbook.
        """
        self._sharepoint_client = sharepoint_client
        self._s3_bucket = s3_bucket
        self._sharepoint_base_path = sharepoint_base_path
        self._sharepoint_excel_path = sharepoint_excel_path
        self._local_dimensions_path = local_dimensions_path
        self._property_map: Dict[str, PropertyRow] = extract_addresses_from_spreadsheet(
            property_list_filepath
        )

    def run(self) -> None:
        """Download the dimensions workbook, then run one browser session.

        The workbook is fetched before the browser is launched. The browser
        context and browser are closed even if the session raises.
        """
        self._sharepoint_client.download_file(
            sharepoint_path=f"{self._sharepoint_excel_path}/{DIMENSIONS_FILENAME}",
            local_path=self._local_dimensions_path,
        )
        with sync_playwright() as p:
            browser: Browser = p.chromium.launch(headless=True)
            context: BrowserContext = browser.new_context()
            page: Page = context.new_page()
            try:
                self._run_browser_session(page)
            finally:
                context.close()
                browser.close()

    def _run_browser_session(self, page: Page) -> None:
        """Log in, walk every page of the assessment table and process matches.

        Rows are skipped when the assessor is "Oliver Stephens", when the
        status is not "Submitted (not Lodged)", or when the address/postcode
        does not map to a known property. Report types that are unknown,
        already recorded as uploaded, or fail to download are skipped.

        Raises:
            Exception: Wrapping any error raised while handling a row; the
                original error is chained as ``__cause__``.
        """
        username: str = ""  # TODO: get from github secrets
        password: str = ""
        attach_debug_listeners(page)
        login(page, username, password)
        go_to_assessments(page)
        while True:
            rows: Locator = page.locator("#assessmentDatatable tbody tr")
            row_count: int = rows.count()
            for i in range(row_count):
                row: Locator = rows.nth(i)
                try:
                    cells: Locator = row.locator("td")
                    first_name: str = cells.nth(1).inner_text().strip()
                    last_name: str = cells.nth(2).inner_text().strip()
                    address: str = cells.nth(5).inner_text().strip()
                    postcode: str = cells.nth(7).inner_text().strip()
                    status: str = cells.nth(9).inner_text().strip()
                    if first_name == "Oliver" and last_name == "Stephens":
                        continue
                    if status != "Submitted (not Lodged)":
                        continue
                    property_id: str = build_property_id(address, postcode)
                    property_row: PropertyRow | None = self._property_map.get(
                        property_id
                    )
                    if not property_row:
                        continue
                    logger.info(f"Match found for property {address}")
                    sharepoint_address: str = property_row.address
                    go_to_assessment_details(page, row)
                    # The listing id is the same for every report of this row.
                    hubspot_listing_id: str = property_row.listing_id
                    for report_type in REPORT_TYPES:
                        try:
                            db_file_type: FileTypeEnum = (
                                map_report_type_to_db_file_type(report_type)
                            )
                        except ValueError:
                            logger.error(
                                f"Unknown report type {report_type}, skipping file"
                            )
                            continue
                        if get_uploaded_file_by_listing_type_and_source(
                            hubspot_listing_id=int(hubspot_listing_id),
                            file_type=db_file_type,
                            file_source=FileSourceEnum.ECMK,
                        ):
                            logger.debug("File already uploaded to s3, skipping")
                            continue
                        file_path: str | None = download_with_retry(page, report_type)
                        if not file_path:
                            continue
                        logger.info(
                            f"Successfully downloaded file {os.path.basename(file_path)} from ECMK"
                        )
                        try:
                            self._process_file(
                                file_path=file_path,
                                report_type=report_type,
                                db_file_type=db_file_type,
                                sharepoint_address=sharepoint_address,
                                hubspot_listing_id=hubspot_listing_id,
                            )
                        finally:
                            # Remove the local download whether or not
                            # processing succeeded; errors still propagate.
                            if os.path.exists(file_path):
                                os.remove(file_path)
                    # Return to the assessment list before the next row.
                    page.go_back()
                    page.wait_for_selector(
                        "#assessmentDatatable tbody tr", timeout=15000
                    )
                except Exception as e:
                    raise Exception(f"Row processing failed: {str(e)}") from e
            if not go_to_next_page(page):
                break

    def _process_file(
        self,
        file_path: str,
        report_type: int,
        db_file_type: FileTypeEnum,
        sharepoint_address: str,
        hubspot_listing_id: str,
    ) -> None:
        """Route a downloaded file to the XML or the PDF pipeline.

        Args:
            file_path: Local path of the downloaded file.
            report_type: Download-button value; ``RAW_XML`` selects the XML
                pipeline, anything else the PDF pipeline.
            db_file_type: File type recorded with the upload.
            sharepoint_address: SharePoint sub-folder (PDF pipeline only).
            hubspot_listing_id: Listing id recorded with the upload.
        """
        if report_type == FileDownloadButtonType.RAW_XML.value:
            self._process_xml_file(
                file_path=file_path,
                db_file_type=db_file_type,
                hubspot_listing_id=hubspot_listing_id,
            )
        else:
            self._process_pdf_file(
                file_path=file_path,
                file_type=db_file_type,
                sharepoint_address=sharepoint_address,
                hubspot_listing_id=hubspot_listing_id,
            )

    def _process_xml_file(
        self,
        file_path: str,
        db_file_type: FileTypeEnum,
        hubspot_listing_id: str,
    ) -> None:
        """Append the XML's data to the dimensions workbook and store the XML.

        The XML is parsed and flattened into one row, the row is written to
        the local workbook, the workbook is uploaded to SharePoint, and the
        XML file itself is uploaded to S3 and recorded.
        """
        with open(file_path, "r", encoding="utf-8") as f:
            xml_string: str = f.read()
        details = parse_rdsap(xml_string)
        row_data = flatten_sap_property(details)
        write_row(self._local_dimensions_path, row_data)
        upload_excel_to_sharepoint(
            client=self._sharepoint_client,
            file_path=self._local_dimensions_path,
            sharepoint_path=self._sharepoint_excel_path,
        )
        upload_file_to_s3_and_record(
            bucket=self._s3_bucket,
            file_path=file_path,
            hubspot_listing_id=hubspot_listing_id,
            file_type=db_file_type,
        )

    def _process_pdf_file(
        self,
        file_path: str,
        file_type: FileTypeEnum,
        sharepoint_address: str,
        hubspot_listing_id: str,
    ) -> None:
        """Upload a PDF to SharePoint and S3; extract EPC data from site notes.

        EPC extraction runs only for ``ECMK_RD_SAP_SITE_NOTE`` files and is
        best-effort: any failure in parsing or saving is logged as a warning
        (with the traceback) and not re-raised, so the uploads already made
        are kept.
        """
        upload_file_to_sharepoint(
            client=self._sharepoint_client,
            file_path=file_path,
            base_path=self._sharepoint_base_path,
            subpath=sharepoint_address,
        )
        uploaded_file_id: int = upload_file_to_s3_and_record(
            bucket=self._s3_bucket,
            file_path=file_path,
            hubspot_listing_id=hubspot_listing_id,
            file_type=file_type,
        )
        if file_type == FileTypeEnum.ECMK_RD_SAP_SITE_NOTE:
            try:
                epc_data = parse_site_notes_pdf(file_path)
                with db_session() as session:
                    save_epc_property_data(
                        session=session,
                        data=epc_data,
                        uploaded_file_id=uploaded_file_id,
                    )
            except Exception:
                # Deliberately swallowed, but keep the traceback so the
                # failure can be diagnosed from the logs.
                logger.warning(
                    f"EPC extraction failed for {os.path.basename(file_path)} — file record retained",
                    exc_info=True,
                )

View file

@ -1,14 +1,32 @@
import os
from typing import Any, Mapping
from backend.ecmk_fetcher.processor import run_job
from backend.ecmk_fetcher.ecmk_service import EcmkService
from utils.logger import setup_logger
from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient
from utils.sharepoint.domna_sites import DomnaSites
# Module-level logger used by the handler.
logger = setup_logger()
# Name of the property spreadsheet; joined with _BASE_DIR in handler() to
# form the path passed to EcmkService.
_PROPERTY_LIST_FILE: str = (
    "hubspot-crm-exports-southern-ra-lite-programme-3103-2026-03-31-2.xlsx"
)
# Parent of the directory that contains this file.
_BASE_DIR: str = os.path.dirname(os.path.dirname(__file__))
def handler(event: Mapping[str, Any], context: Any) -> None:
    """Build an ``EcmkService`` with the project's fixed settings and run it.

    Args:
        event: Invocation payload; not read by this function.
        context: Invocation context; not read by this function.
    """
    logger.info("Entered handler")
    # The job is run exactly once, through the service.
    service = EcmkService(
        sharepoint_client=DomnaSharepointClient(
            sharepoint_location=DomnaSites.PRIVATE_PAY
        ),
        s3_bucket="retrofit-energy-assessments-dev",
        property_list_filepath=os.path.join(_BASE_DIR, _PROPERTY_LIST_FILE),
        sharepoint_base_path="/Projects/Southern Housing/SH-SURV-26-001/Assessments",
        sharepoint_excel_path="/Projects/Southern Housing/SH-SURV-26-001/Modelling",
        local_dimensions_path=os.path.join(_BASE_DIR, "Dimensions.xlsx"),
    )
    service.run()
if __name__ == "__main__":

View file

@ -1,209 +0,0 @@
import os
from typing import Dict
from playwright.sync_api import (
sync_playwright,
Locator,
Page,
Browser,
BrowserContext,
)
from backend.app.db.functions.uploaded_files_functions import (
get_uploaded_file_by_listing_type_and_source,
)
from backend.app.db.models.uploaded_file import FileSourceEnum, FileTypeEnum
from backend.ecmk_fetcher.address_list import (
PropertyRow,
extract_addresses_from_spreadsheet,
)
from backend.ecmk_fetcher.browser import (
attach_debug_listeners,
download_with_retry,
go_to_assessment_details,
go_to_assessments,
go_to_next_page,
login,
)
from backend.ecmk_fetcher.reports import (
REPORT_TYPES,
FileDownloadButtonType,
build_property_id,
map_report_type_to_db_file_type,
)
from backend.ecmk_fetcher.excel_writer import write_row
from backend.ecmk_fetcher.upload import (
upload_excel_to_sharepoint,
upload_file_to_s3_and_update_db,
upload_file_to_sharepoint,
)
from backend.ecmk_fetcher.xml_processor import flatten_sap_property, parse_rdsap
from utils.logger import setup_logger
from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient
from utils.sharepoint.domna_sites import DomnaSites
# Module-level logger used by run_job.
logger = setup_logger()
def run_job() -> None:
    """Fetch ECMK reports for known properties and upload them.

    Loads the property spreadsheet, downloads the dimensions workbook from
    SharePoint, then drives a headless Chromium session: for every assessment
    row that matches a property, each report type is downloaded; raw XML is
    flattened into the dimensions workbook (re-uploaded to SharePoint), other
    files are uploaded to SharePoint, and every file is uploaded to S3 with a
    database record.

    Raises:
        Exception: Wrapping any error raised while handling a row.
    """
    username: str = ""  # TODO: get from github secrets
    password: str = ""
    property_list_file: str = (
        "hubspot-crm-exports-southern-ra-lite-programme-3103-2026-03-31-2.xlsx"
    )
    # Both the spreadsheet and the local workbook live next to this module.
    BASE_DIR: str = os.path.dirname(__file__)
    filepath: str = os.path.join(BASE_DIR, property_list_file)
    property_map: Dict[str, PropertyRow] = extract_addresses_from_spreadsheet(filepath)
    sharepoint_client: DomnaSharepointClient = DomnaSharepointClient(
        sharepoint_location=DomnaSites.PRIVATE_PAY
    )
    sharepoint_base_path: str = "/Projects/Southern Housing/SH-SURV-26-001/Assessments"
    sharepoint_excel_path: str = "/Projects/Southern Housing/SH-SURV-26-001/Modelling"
    DIMENSIONS_FILENAME: str = "Dimensions.xlsx"
    local_dimensions_path: str = os.path.join(BASE_DIR, DIMENSIONS_FILENAME)
    # Fetch the current workbook before any rows are appended to it.
    sharepoint_client.download_file(
        sharepoint_path=f"{sharepoint_excel_path}/{DIMENSIONS_FILENAME}",
        local_path=local_dimensions_path,
    )
    s3_bucket: str = "retrofit-energy-assessments-dev"
    with sync_playwright() as p:
        browser: Browser = p.chromium.launch(headless=True)
        context: BrowserContext = browser.new_context()
        page: Page = context.new_page()
        attach_debug_listeners(page)
        try:
            login(page, username, password)
            go_to_assessments(page)
            # One iteration per page of the assessment table.
            while True:
                rows: Locator = page.locator("#assessmentDatatable tbody tr")
                row_count: int = rows.count()
                for i in range(row_count):
                    row: Locator = rows.nth(i)
                    try:
                        cells: Locator = row.locator("td")
                        first_name: str = cells.nth(1).inner_text().strip()
                        last_name: str = cells.nth(2).inner_text().strip()
                        address: str = cells.nth(5).inner_text().strip()
                        postcode: str = cells.nth(7).inner_text().strip()
                        status: str = cells.nth(9).inner_text().strip()
                        # Skip rows for this assessor name.
                        if first_name == "Oliver" and last_name == "Stephens":
                            continue
                        # Only rows in this exact status are processed.
                        if status != "Submitted (not Lodged)":
                            continue
                        property_id: str = build_property_id(address, postcode)
                        property_row: PropertyRow | None = property_map.get(property_id)
                        if not property_row:
                            continue
                        logger.info(f"Match found for property {address}")
                        sharepoint_address: str = property_row.address
                        go_to_assessment_details(page, row)
                        for report_type in REPORT_TYPES:
                            hubspot_listing_id: str = property_row.listing_id
                            try:
                                db_file_type: FileTypeEnum = (
                                    map_report_type_to_db_file_type(report_type)
                                )
                            except ValueError:
                                logger.error(
                                    f"Unknown report type {report_type}, skipping file"
                                )
                                continue
                            # Skip files already recorded for this listing,
                            # type and source.
                            if get_uploaded_file_by_listing_type_and_source(
                                hubspot_listing_id=int(hubspot_listing_id),
                                file_type=db_file_type,
                                file_source=FileSourceEnum.ECMK,
                            ):
                                logger.debug("File already uploaded to s3, skipping")
                                continue
                            file_path: str | None = download_with_retry(
                                page, report_type
                            )
                            if not file_path:
                                continue
                            logger.info(
                                f"Successfully downloaded file {os.path.basename(file_path)} from ECMK"
                            )
                            try:
                                if report_type == FileDownloadButtonType.RAW_XML.value:
                                    with open(file_path, "r", encoding="utf-8") as f:
                                        xml_string = f.read()
                                    details = parse_rdsap(xml_string)
                                    row_data = flatten_sap_property(details)
                                    write_row(local_dimensions_path, row_data)
                                    upload_excel_to_sharepoint(
                                        client=sharepoint_client,
                                        file_path=local_dimensions_path,
                                        sharepoint_path=sharepoint_excel_path,
                                    )
                                    logger.info(
                                        f"Written dimensions row and uploaded Dimensions.xlsx for {address}"
                                    )
                                else:
                                    upload_file_to_sharepoint(
                                        client=sharepoint_client,
                                        file_path=file_path,
                                        base_path=sharepoint_base_path,
                                        subpath=sharepoint_address,
                                    )
                                    logger.info(
                                        f"Successfully loaded {os.path.basename(file_path)} to sharepoint for {address}"
                                    )
                                # Upload to s3 and update db
                                upload_file_to_s3_and_update_db(
                                    bucket=s3_bucket,
                                    file_path=file_path,
                                    hubspot_listing_id=hubspot_listing_id,
                                    file_type=db_file_type,
                                )
                            # NOTE(review): this handler only re-raises, so it
                            # has no effect beyond the finally clause below.
                            except Exception:
                                raise
                            finally:
                                # Remove the local download in every case.
                                if os.path.exists(file_path):
                                    os.remove(file_path)
                        # Return to the assessment list before the next row.
                        page.go_back()
                        page.wait_for_selector(
                            "#assessmentDatatable tbody tr", timeout=15000
                        )
                    except Exception as e:
                        raise Exception(f"Row processing failed: {str(e)}") from e
                if not go_to_next_page(page):
                    break
        finally:
            context.close()
            browser.close()

View file

@ -0,0 +1,594 @@
from typing import Dict
from unittest.mock import MagicMock, call, patch
from backend.app.db.models.uploaded_file import FileTypeEnum
from backend.ecmk_fetcher.address_list import PropertyRow
from backend.ecmk_fetcher.ecmk_service import EcmkService
from backend.ecmk_fetcher.reports import FileDownloadButtonType
from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient
# Property map handed back by the patched extract_addresses_from_spreadsheet
# in the tests below that only need a constructed service.
FAKE_PROPERTY_MAP: Dict[str, PropertyRow] = {
    "10 FAKE ST SW1A 1AA": PropertyRow(
        row_index=2, address="10 Fake St SW1A 1AA", listing_id="hs-001"
    )
}
def make_service(
    sharepoint_client: DomnaSharepointClient | None = None,
    s3_bucket: str = "test-bucket",
    property_list_filepath: str = "/fake/properties.xlsx",
    sharepoint_base_path: str = "/base",
    sharepoint_excel_path: str = "/excel",
    local_dimensions_path: str = "/fake/Dimensions.xlsx",
) -> EcmkService:
    """Construct an ``EcmkService`` with test defaults.

    A ``MagicMock`` specced on ``DomnaSharepointClient`` is used when no
    client is supplied.
    """
    client = (
        sharepoint_client
        if sharepoint_client
        else MagicMock(spec=DomnaSharepointClient)
    )
    service_kwargs = {
        "sharepoint_client": client,
        "s3_bucket": s3_bucket,
        "property_list_filepath": property_list_filepath,
        "sharepoint_base_path": sharepoint_base_path,
        "sharepoint_excel_path": sharepoint_excel_path,
        "local_dimensions_path": local_dimensions_path,
    }
    return EcmkService(**service_kwargs)
# ---------------------------------------------------------------------------
# __init__: loads property map from spreadsheet filepath
# ---------------------------------------------------------------------------
def test_init_loads_property_map_from_filepath() -> None:
    """The constructor passes the spreadsheet path to the extractor once."""
    spreadsheet_path = "/some/props.xlsx"
    extract_patch = patch(
        "backend.ecmk_fetcher.ecmk_service.extract_addresses_from_spreadsheet",
        return_value=FAKE_PROPERTY_MAP,
    )
    with extract_patch as extract_mock:
        make_service(property_list_filepath=spreadsheet_path)
    extract_mock.assert_called_once_with(spreadsheet_path)
# ---------------------------------------------------------------------------
# run(): downloads Dimensions.xlsx before Playwright browser launches
# ---------------------------------------------------------------------------
def _make_playwright_mocks() -> tuple[MagicMock, MagicMock, MagicMock, MagicMock]:
mock_page = MagicMock()
mock_context = MagicMock()
mock_context.new_page.return_value = mock_page
mock_browser = MagicMock()
mock_browser.new_context.return_value = mock_context
mock_playwright = MagicMock()
mock_playwright.chromium.launch.return_value = mock_browser
return mock_page, mock_context, mock_browser, mock_playwright
def test_run_downloads_dimensions_before_browser_launch() -> None:
    """``run`` fetches the workbook before Chromium is launched."""
    events: list[str] = []
    _, _, browser_mock, playwright_mock = _make_playwright_mocks()

    def _record_launch(**_: object) -> MagicMock:
        events.append("browser")
        return browser_mock

    def _record_download(**_: object) -> None:
        events.append("download")

    playwright_mock.chromium.launch.side_effect = _record_launch
    client_mock = MagicMock(spec=DomnaSharepointClient)
    client_mock.download_file.side_effect = _record_download

    with (
        patch(
            "backend.ecmk_fetcher.ecmk_service.extract_addresses_from_spreadsheet",
            return_value=FAKE_PROPERTY_MAP,
        ),
        patch("backend.ecmk_fetcher.ecmk_service.sync_playwright") as sync_pw_mock,
    ):
        sync_pw_mock.return_value.__enter__.return_value = playwright_mock
        service = make_service(
            sharepoint_client=client_mock,
            sharepoint_excel_path="/excel",
            local_dimensions_path="/fake/Dimensions.xlsx",
        )
        # The browser session itself is out of scope for this test.
        with patch.object(service, "_run_browser_session"):
            service.run()

    assert events == ["download", "browser"]
def test_run_downloads_dimensions_with_correct_paths() -> None:
    """``run`` downloads ``<excel path>/Dimensions.xlsx`` to the local path."""
    excel_folder = "/excel"
    local_workbook = "/fake/Dimensions.xlsx"
    client_mock = MagicMock(spec=DomnaSharepointClient)
    playwright_mock = _make_playwright_mocks()[3]

    with (
        patch(
            "backend.ecmk_fetcher.ecmk_service.extract_addresses_from_spreadsheet",
            return_value=FAKE_PROPERTY_MAP,
        ),
        patch("backend.ecmk_fetcher.ecmk_service.sync_playwright") as sync_pw_mock,
    ):
        sync_pw_mock.return_value.__enter__.return_value = playwright_mock
        service = make_service(
            sharepoint_client=client_mock,
            sharepoint_excel_path=excel_folder,
            local_dimensions_path=local_workbook,
        )
        with patch.object(service, "_run_browser_session"):
            service.run()

    client_mock.download_file.assert_called_once_with(
        sharepoint_path="/excel/Dimensions.xlsx",
        local_path=local_workbook,
    )
# ---------------------------------------------------------------------------
# run(): passes the Playwright Page into _run_browser_session
# ---------------------------------------------------------------------------
def test_run_passes_page_to_run_browser_session() -> None:
    """The page created from the browser context is what the session receives."""
    playwright_mocks = _make_playwright_mocks()
    page_mock = playwright_mocks[0]
    playwright_mock = playwright_mocks[3]

    with (
        patch(
            "backend.ecmk_fetcher.ecmk_service.extract_addresses_from_spreadsheet",
            return_value=FAKE_PROPERTY_MAP,
        ),
        patch("backend.ecmk_fetcher.ecmk_service.sync_playwright") as sync_pw_mock,
    ):
        sync_pw_mock.return_value.__enter__.return_value = playwright_mock
        service = make_service()
        with patch.object(service, "_run_browser_session") as session_mock:
            service.run()

    session_mock.assert_called_once_with(page_mock)
# ---------------------------------------------------------------------------
# _process_file: dispatches based on report_type
# ---------------------------------------------------------------------------
def test_process_file_dispatches_to_xml_for_raw_xml() -> None:
    """A RAW_XML report type goes to the XML pipeline only."""
    with patch(
        "backend.ecmk_fetcher.ecmk_service.extract_addresses_from_spreadsheet",
        return_value=FAKE_PROPERTY_MAP,
    ):
        service = make_service()

    call_kwargs = {
        "file_path": "/tmp/file.xml",
        "report_type": FileDownloadButtonType.RAW_XML.value,
        "db_file_type": FileTypeEnum.ECMK_SURVEY_XML,
        "sharepoint_address": "10 Fake St",
        "hubspot_listing_id": "hs-001",
    }
    with (
        patch.object(service, "_process_xml_file") as xml_handler,
        patch.object(service, "_process_pdf_file") as pdf_handler,
    ):
        service._process_file(**call_kwargs)

    xml_handler.assert_called_once()
    pdf_handler.assert_not_called()
def test_process_file_dispatches_to_pdf_for_non_xml() -> None:
    """A non-XML report type goes to the PDF pipeline only."""
    with patch(
        "backend.ecmk_fetcher.ecmk_service.extract_addresses_from_spreadsheet",
        return_value=FAKE_PROPERTY_MAP,
    ):
        service = make_service()

    call_kwargs = {
        "file_path": "/tmp/file.pdf",
        "report_type": FileDownloadButtonType.SITENOTE_REPORT.value,
        "db_file_type": FileTypeEnum.ECMK_RD_SAP_SITE_NOTE,
        "sharepoint_address": "10 Fake St",
        "hubspot_listing_id": "hs-001",
    }
    with (
        patch.object(service, "_process_xml_file") as xml_handler,
        patch.object(service, "_process_pdf_file") as pdf_handler,
    ):
        service._process_file(**call_kwargs)

    pdf_handler.assert_called_once()
    xml_handler.assert_not_called()
# ---------------------------------------------------------------------------
# _process_xml_file: parse → flatten → write row → upload excel → S3
# ---------------------------------------------------------------------------
def test_process_xml_file_full_chain() -> None:
    """XML content is parsed, flattened, written, and both uploads happen.

    ``open`` is replaced with ``mock_open`` so no file is read from disk, and
    the content it returns is checked all the way into ``parse_rdsap``.
    """
    # Local import keeps this test self-contained with respect to the
    # module's import line.
    from unittest.mock import mock_open

    xml_content = "<xml/>"
    fake_details = MagicMock()
    fake_row_data = MagicMock()
    with patch(
        "backend.ecmk_fetcher.ecmk_service.extract_addresses_from_spreadsheet",
        return_value=FAKE_PROPERTY_MAP,
    ):
        service = make_service(
            s3_bucket="my-bucket",
            sharepoint_excel_path="/excel",
            local_dimensions_path="/dims/Dimensions.xlsx",
        )
    with (
        patch(
            "backend.ecmk_fetcher.ecmk_service.parse_rdsap", return_value=fake_details
        ) as mock_parse,
        patch(
            "backend.ecmk_fetcher.ecmk_service.flatten_sap_property",
            return_value=fake_row_data,
        ) as mock_flatten,
        patch("backend.ecmk_fetcher.ecmk_service.write_row") as mock_write,
        patch(
            "backend.ecmk_fetcher.ecmk_service.upload_excel_to_sharepoint"
        ) as mock_upload_excel,
        patch(
            "backend.ecmk_fetcher.ecmk_service.upload_file_to_s3_and_record"
        ) as mock_s3,
        patch("builtins.open", mock_open(read_data=xml_content)),
    ):
        service._process_xml_file(
            file_path="/tmp/report.xml",
            db_file_type=FileTypeEnum.ECMK_SURVEY_XML,
            hubspot_listing_id="hs-001",
        )
    # The parser must receive exactly what was read from the file.
    mock_parse.assert_called_once_with(xml_content)
    mock_flatten.assert_called_once_with(fake_details)
    mock_write.assert_called_once_with("/dims/Dimensions.xlsx", fake_row_data)
    mock_upload_excel.assert_called_once_with(
        client=service._sharepoint_client,
        file_path="/dims/Dimensions.xlsx",
        sharepoint_path="/excel",
    )
    mock_s3.assert_called_once_with(
        bucket="my-bucket",
        file_path="/tmp/report.xml",
        hubspot_listing_id="hs-001",
        file_type=FileTypeEnum.ECMK_SURVEY_XML,
    )
# ---------------------------------------------------------------------------
# _process_pdf_file: sharepoint upload → S3 upload
# ---------------------------------------------------------------------------
def test_process_pdf_file_uploads_to_sharepoint_then_s3() -> None:
    """A PDF is uploaded to SharePoint first and to S3 second.

    Both the call arguments and the relative order of the two uploads are
    checked.
    """
    call_order: list[str] = []

    def _on_sharepoint(**_: object) -> None:
        call_order.append("sharepoint")

    def _on_s3(**_: object) -> int:
        call_order.append("s3")
        return 42

    with patch(
        "backend.ecmk_fetcher.ecmk_service.extract_addresses_from_spreadsheet",
        return_value=FAKE_PROPERTY_MAP,
    ):
        service = make_service(
            s3_bucket="my-bucket",
            sharepoint_base_path="/base",
        )
    with (
        patch(
            "backend.ecmk_fetcher.ecmk_service.upload_file_to_sharepoint",
            side_effect=_on_sharepoint,
        ) as mock_sp,
        patch(
            "backend.ecmk_fetcher.ecmk_service.upload_file_to_s3_and_record",
            side_effect=_on_s3,
        ) as mock_s3,
        patch("backend.ecmk_fetcher.ecmk_service.parse_site_notes_pdf"),
        patch("backend.ecmk_fetcher.ecmk_service.db_session"),
    ):
        service._process_pdf_file(
            file_path="/tmp/report.pdf",
            file_type=FileTypeEnum.ECMK_SITE_NOTE,
            sharepoint_address="10 Fake St",
            hubspot_listing_id="hs-001",
        )
    mock_sp.assert_called_once_with(
        client=service._sharepoint_client,
        file_path="/tmp/report.pdf",
        base_path="/base",
        subpath="10 Fake St",
    )
    mock_s3.assert_called_once_with(
        bucket="my-bucket",
        file_path="/tmp/report.pdf",
        hubspot_listing_id="hs-001",
        file_type=FileTypeEnum.ECMK_SITE_NOTE,
    )
    # The ordering promised by the test name.
    assert call_order == ["sharepoint", "s3"]
# ---------------------------------------------------------------------------
# _process_pdf_file: EPC extraction conditional on file_type
# ---------------------------------------------------------------------------
def test_process_pdf_file_runs_epc_extraction_for_rd_sap_site_note() -> None:
    """RdSAP site notes are parsed and saved with the uploaded file's id."""
    pdf_path = "/tmp/sitenote.pdf"
    parsed_epc = MagicMock()
    session_mock = MagicMock()

    with patch(
        "backend.ecmk_fetcher.ecmk_service.extract_addresses_from_spreadsheet",
        return_value=FAKE_PROPERTY_MAP,
    ):
        service = make_service()

    with (
        patch("backend.ecmk_fetcher.ecmk_service.upload_file_to_sharepoint"),
        patch(
            "backend.ecmk_fetcher.ecmk_service.upload_file_to_s3_and_record",
            return_value=99,
        ),
        patch(
            "backend.ecmk_fetcher.ecmk_service.parse_site_notes_pdf",
            return_value=parsed_epc,
        ) as parse_mock,
        patch(
            "backend.ecmk_fetcher.ecmk_service.save_epc_property_data"
        ) as save_mock,
        patch(
            "backend.ecmk_fetcher.ecmk_service.db_session"
        ) as db_session_mock,
    ):
        # ``with db_session() as session`` must yield our session mock.
        db_session_mock.return_value.__enter__.return_value = session_mock
        service._process_pdf_file(
            file_path=pdf_path,
            file_type=FileTypeEnum.ECMK_RD_SAP_SITE_NOTE,
            sharepoint_address="10 Fake St",
            hubspot_listing_id="hs-001",
        )

    parse_mock.assert_called_once_with(pdf_path)
    save_mock.assert_called_once_with(
        session=session_mock,
        data=parsed_epc,
        uploaded_file_id=99,
    )
def test_process_pdf_file_skips_epc_extraction_for_ecmk_site_note() -> None:
    """Plain ECMK site notes are uploaded but never parsed or saved."""
    with patch(
        "backend.ecmk_fetcher.ecmk_service.extract_addresses_from_spreadsheet",
        return_value=FAKE_PROPERTY_MAP,
    ):
        service = make_service()

    pdf_kwargs = {
        "file_path": "/tmp/sitenote.pdf",
        "file_type": FileTypeEnum.ECMK_SITE_NOTE,
        "sharepoint_address": "10 Fake St",
        "hubspot_listing_id": "hs-001",
    }
    with (
        patch("backend.ecmk_fetcher.ecmk_service.upload_file_to_sharepoint"),
        patch(
            "backend.ecmk_fetcher.ecmk_service.upload_file_to_s3_and_record",
            return_value=42,
        ),
        patch(
            "backend.ecmk_fetcher.ecmk_service.parse_site_notes_pdf"
        ) as parse_mock,
        patch("backend.ecmk_fetcher.ecmk_service.db_session") as db_session_mock,
    ):
        service._process_pdf_file(**pdf_kwargs)

    parse_mock.assert_not_called()
    db_session_mock.assert_not_called()
def test_process_pdf_file_epc_uses_separate_db_session_from_s3_upload() -> None:
    """EPC db_session opens only after upload_file_to_s3_and_record returns."""
    events: list[str] = []

    def _record_db_session() -> MagicMock:
        events.append("db_session")
        session_ctx = MagicMock()
        session_ctx.__enter__ = MagicMock(return_value=MagicMock())
        session_ctx.__exit__ = MagicMock(return_value=False)
        return session_ctx

    def _record_s3(**_: object) -> int:
        events.append("s3")
        return 77

    with patch(
        "backend.ecmk_fetcher.ecmk_service.extract_addresses_from_spreadsheet",
        return_value=FAKE_PROPERTY_MAP,
    ):
        service = make_service()

    with (
        patch("backend.ecmk_fetcher.ecmk_service.upload_file_to_sharepoint"),
        patch(
            "backend.ecmk_fetcher.ecmk_service.upload_file_to_s3_and_record",
            side_effect=_record_s3,
        ),
        patch("backend.ecmk_fetcher.ecmk_service.parse_site_notes_pdf"),
        patch("backend.ecmk_fetcher.ecmk_service.save_epc_property_data"),
        patch(
            "backend.ecmk_fetcher.ecmk_service.db_session",
            side_effect=_record_db_session,
        ),
    ):
        service._process_pdf_file(
            file_path="/tmp/sitenote.pdf",
            file_type=FileTypeEnum.ECMK_RD_SAP_SITE_NOTE,
            sharepoint_address="10 Fake St",
            hubspot_listing_id="hs-001",
        )

    assert events == ["s3", "db_session"]
# ---------------------------------------------------------------------------
# _process_pdf_file: EPC failures swallowed with warning
# ---------------------------------------------------------------------------
def _pdf_file_patches_for_failure() -> tuple: # type: ignore[type-arg]
return (
patch("backend.ecmk_fetcher.ecmk_service.upload_file_to_sharepoint"),
patch(
"backend.ecmk_fetcher.ecmk_service.upload_file_to_s3_and_record",
return_value=1,
),
)
def test_process_pdf_file_parse_failure_logged_as_warning_not_raised() -> None:
    """A parser error yields one warning, no save, and no exception."""
    with patch(
        "backend.ecmk_fetcher.ecmk_service.extract_addresses_from_spreadsheet",
        return_value=FAKE_PROPERTY_MAP,
    ):
        service = make_service()

    sharepoint_patcher, s3_patcher = _pdf_file_patches_for_failure()
    parse_patcher = patch(
        "backend.ecmk_fetcher.ecmk_service.parse_site_notes_pdf",
        side_effect=ValueError("bad pdf"),
    )
    with (
        sharepoint_patcher,
        s3_patcher,
        parse_patcher,
        patch("backend.ecmk_fetcher.ecmk_service.save_epc_property_data") as save_mock,
        patch("backend.ecmk_fetcher.ecmk_service.db_session"),
        patch("backend.ecmk_fetcher.ecmk_service.logger") as logger_mock,
    ):
        # Must return normally even though parsing raises.
        service._process_pdf_file(
            file_path="/tmp/sitenote.pdf",
            file_type=FileTypeEnum.ECMK_RD_SAP_SITE_NOTE,
            sharepoint_address="10 Fake St",
            hubspot_listing_id="hs-001",
        )

    logger_mock.warning.assert_called_once()
    save_mock.assert_not_called()
def test_process_pdf_file_save_failure_logged_as_warning_not_raised() -> None:
    """A database save error yields one warning and no exception."""
    session_mock = MagicMock()
    with patch(
        "backend.ecmk_fetcher.ecmk_service.extract_addresses_from_spreadsheet",
        return_value=FAKE_PROPERTY_MAP,
    ):
        service = make_service()

    sharepoint_patcher, s3_patcher = _pdf_file_patches_for_failure()
    parse_patcher = patch(
        "backend.ecmk_fetcher.ecmk_service.parse_site_notes_pdf",
        return_value=MagicMock(),
    )
    save_patcher = patch(
        "backend.ecmk_fetcher.ecmk_service.save_epc_property_data",
        side_effect=RuntimeError("db exploded"),
    )
    with (
        sharepoint_patcher,
        s3_patcher,
        parse_patcher,
        save_patcher,
        patch("backend.ecmk_fetcher.ecmk_service.db_session") as db_session_mock,
        patch("backend.ecmk_fetcher.ecmk_service.logger") as logger_mock,
    ):
        db_session_mock.return_value.__enter__.return_value = session_mock
        # Must return normally even though saving raises.
        service._process_pdf_file(
            file_path="/tmp/sitenote.pdf",
            file_type=FileTypeEnum.ECMK_RD_SAP_SITE_NOTE,
            sharepoint_address="10 Fake St",
            hubspot_listing_id="hs-001",
        )

    logger_mock.warning.assert_called_once()
# ---------------------------------------------------------------------------
# _run_browser_session: delegates file processing to _process_file
# ---------------------------------------------------------------------------
def _make_page_mock_with_one_matching_row() -> MagicMock:
cells_nth: dict[int, MagicMock] = {n: MagicMock() for n in (1, 2, 5, 7, 9)}
cells_nth[1].inner_text.return_value = "John"
cells_nth[2].inner_text.return_value = "Doe"
cells_nth[5].inner_text.return_value = "10 FAKE ST"
cells_nth[7].inner_text.return_value = "SW1A 1AA"
cells_nth[9].inner_text.return_value = "Submitted (not Lodged)"
cells_mock = MagicMock()
cells_mock.nth.side_effect = lambda n: cells_nth[n]
row_mock = MagicMock()
row_mock.locator.return_value = cells_mock
rows_mock = MagicMock()
rows_mock.count.return_value = 1
rows_mock.nth.return_value = row_mock
page = MagicMock()
page.locator.return_value = rows_mock
return page
# Property map handed to the service in the browser-session test below.
# The key is the property id for the single row of the mocked page:
# address "10 FAKE ST" + postcode "SW1A 1AA" → build_property_id → "10SW1A1AA"
_BROWSER_SESSION_PROPERTY_MAP: Dict[str, PropertyRow] = {
    "10SW1A1AA": PropertyRow(
        row_index=2, address="10 Fake St SW1A 1AA", listing_id="12345"
    )
}
def test_run_browser_session_calls_process_file_for_downloaded_file() -> None:
    """``_run_browser_session`` forwards the downloaded file to ``_process_file``.

    Navigation helpers are stubbed, the page mock exposes one row, no prior
    upload exists, and the download returns ``/tmp/fake.pdf``; the test then
    pins the exact keyword arguments ``_process_file`` receives.
    """
    page = _make_page_mock_with_one_matching_row()

    with patch(
        "backend.ecmk_fetcher.ecmk_service.extract_addresses_from_spreadsheet",
        return_value=_BROWSER_SESSION_PROPERTY_MAP,
    ):
        svc = make_service()

    sitenote_report = FileDownloadButtonType.SITENOTE_REPORT.value

    with (
        patch("backend.ecmk_fetcher.ecmk_service.attach_debug_listeners"),
        patch("backend.ecmk_fetcher.ecmk_service.login"),
        patch("backend.ecmk_fetcher.ecmk_service.go_to_assessments"),
        patch("backend.ecmk_fetcher.ecmk_service.go_to_assessment_details"),
        # A single page of results only.
        patch("backend.ecmk_fetcher.ecmk_service.go_to_next_page", return_value=False),
        # No previously uploaded file is found for the listing.
        patch(
            "backend.ecmk_fetcher.ecmk_service.get_uploaded_file_by_listing_type_and_source",
            return_value=None,
        ),
        patch(
            "backend.ecmk_fetcher.ecmk_service.download_with_retry",
            return_value="/tmp/fake.pdf",
        ),
        patch(
            "backend.ecmk_fetcher.ecmk_service.map_report_type_to_db_file_type",
            return_value=FileTypeEnum.ECMK_SITE_NOTE,
        ),
        # Restrict the run to a single report type.
        patch(
            "backend.ecmk_fetcher.ecmk_service.REPORT_TYPES",
            [sitenote_report],
        ),
        patch.object(svc, "_process_file") as process_file_mock,
        patch("os.path.exists", return_value=False),
    ):
        svc._run_browser_session(page)

    process_file_mock.assert_called_once_with(
        file_path="/tmp/fake.pdf",
        report_type=sitenote_report,
        db_file_type=FileTypeEnum.ECMK_SITE_NOTE,
        sharepoint_address="10 Fake St SW1A 1AA",
        hubspot_listing_id="12345",
    )

View file

@ -0,0 +1,59 @@
from unittest.mock import MagicMock, patch
from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient
def test_handler_constructs_ecmk_service_and_calls_run() -> None:
    """The handler instantiates ``EcmkService`` once and calls ``run`` on it once."""
    service_instance = MagicMock()
    service_factory = MagicMock(return_value=service_instance)
    sharepoint_stub = MagicMock(spec=DomnaSharepointClient)

    with (
        patch(
            "backend.ecmk_fetcher.handler.handler.EcmkService",
            service_factory,
        ),
        patch(
            "backend.ecmk_fetcher.handler.handler.DomnaSharepointClient",
            return_value=sharepoint_stub,
        ),
    ):
        # Imported while the patches are active, as in the original test.
        from backend.ecmk_fetcher.handler.handler import handler

        handler({}, None)

    service_factory.assert_called_once()
    service_instance.run.assert_called_once()
def test_handler_passes_correct_config_to_ecmk_service() -> None:
    """The handler passes the expected keyword configuration to ``EcmkService``."""
    service_instance = MagicMock()
    service_factory = MagicMock(return_value=service_instance)
    sharepoint_stub = MagicMock(spec=DomnaSharepointClient)

    with (
        patch(
            "backend.ecmk_fetcher.handler.handler.EcmkService",
            service_factory,
        ),
        patch(
            "backend.ecmk_fetcher.handler.handler.DomnaSharepointClient",
            return_value=sharepoint_stub,
        ),
    ):
        # Imported while the patches are active, as in the original test.
        from backend.ecmk_fetcher.handler.handler import handler

        handler({}, None)

    # Only the keyword arguments of the constructor call are inspected.
    _, ctor_kwargs = service_factory.call_args

    assert ctor_kwargs["s3_bucket"] == "retrofit-energy-assessments-dev"
    assert (
        ctor_kwargs["sharepoint_base_path"]
        == "/Projects/Southern Housing/SH-SURV-26-001/Assessments"
    )
    assert (
        ctor_kwargs["sharepoint_excel_path"]
        == "/Projects/Southern Housing/SH-SURV-26-001/Modelling"
    )
    # Local paths are checked by suffix only.
    assert ctor_kwargs["property_list_filepath"].endswith(
        "hubspot-crm-exports-southern-ra-lite-programme-3103-2026-03-31-2.xlsx"
    )
    assert ctor_kwargs["local_dimensions_path"].endswith("Dimensions.xlsx")

View file

@ -0,0 +1,108 @@
from typing import Generator
from unittest.mock import MagicMock, call, patch
import pytest
from backend.app.db.models.uploaded_file import FileTypeEnum
from backend.ecmk_fetcher.upload import upload_file_to_s3_and_record
@pytest.fixture
def mock_uploaded_file() -> MagicMock:
    """Mock standing in for an ``UploadedFile`` row whose ``id`` is 42."""
    return MagicMock(id=42)
@pytest.fixture
def mock_session() -> MagicMock:
    """Bare mock used as the session yielded by the patched ``db_session``."""
    session_stub = MagicMock()
    return session_stub
@pytest.fixture
def patched_deps(
    mock_uploaded_file: MagicMock, mock_session: MagicMock
) -> Generator[dict[str, MagicMock], None, None]:
    """Patch the collaborators of ``backend.ecmk_fetcher.upload`` and yield them.

    The S3 uploader, ``db_session`` and the ``UploadedFile`` constructor are
    replaced for the duration of a test. The yielded dict exposes each mock
    under the keys ``s3``, ``db_ctx``, ``model``, ``session`` and
    ``uploaded_file``.
    """
    with (
        patch("backend.ecmk_fetcher.upload.upload_file_to_s3") as s3_mock,
        patch("backend.ecmk_fetcher.upload.db_session") as db_ctx_mock,
        patch(
            "backend.ecmk_fetcher.upload.UploadedFile",
            return_value=mock_uploaded_file,
        ) as model_mock,
    ):
        # ``with db_session() as session`` yields the session mock, and the
        # context manager does not swallow exceptions.
        context_manager = db_ctx_mock.return_value
        context_manager.__enter__.return_value = mock_session
        context_manager.__exit__.return_value = False

        deps: dict[str, MagicMock] = {
            "s3": s3_mock,
            "db_ctx": db_ctx_mock,
            "model": model_mock,
            "session": mock_session,
            "uploaded_file": mock_uploaded_file,
        }
        yield deps
def test_returns_uploaded_file_id_as_int(
    patched_deps: dict[str, MagicMock],
) -> None:
    """The function returns the new record's id as a plain ``int``."""
    returned_id = upload_file_to_s3_and_record(
        bucket="test-bucket",
        file_path="/tmp/report.pdf",
        hubspot_listing_id="hs-001",
        file_type=FileTypeEnum.ECMK_RD_SAP_SITE_NOTE,
    )

    # 42 is the id configured on the mocked UploadedFile.
    assert isinstance(returned_id, int)
    assert returned_id == 42
def test_uploads_to_s3_with_key_derived_from_listing_id_and_filename(
    patched_deps: dict[str, MagicMock],
) -> None:
    """The S3 key is ``documents/hubspot_listing_id/<listing id>/<file name>``."""
    upload_file_to_s3_and_record(
        bucket="my-bucket",
        file_path="/some/path/site_note.pdf",
        hubspot_listing_id="hs-999",
        file_type=FileTypeEnum.ECMK_RD_SAP_SITE_NOTE,
    )

    s3_mock = patched_deps["s3"]
    s3_mock.assert_called_once_with(
        "/some/path/site_note.pdf",
        "my-bucket",
        "documents/hubspot_listing_id/hs-999/site_note.pdf",
    )
def test_adds_uploaded_file_record_to_session(
    patched_deps: dict[str, MagicMock],
) -> None:
    """The new ``UploadedFile`` is added to the session, which is flushed once."""
    upload_file_to_s3_and_record(
        bucket="test-bucket",
        file_path="/tmp/report.pdf",
        hubspot_listing_id="hs-001",
        file_type=FileTypeEnum.ECMK_RD_SAP_SITE_NOTE,
    )

    session_mock = patched_deps["session"]
    session_mock.add.assert_called_once_with(patched_deps["uploaded_file"])
    session_mock.flush.assert_called_once()
def test_site_note_type_does_not_trigger_pdf_parsing(
    patched_deps: dict[str, MagicMock],
) -> None:
    """Uploading a site-note file succeeds without any PDF parser being mocked."""
    # No parse_site_notes_pdf mock is installed here. If the upload helper
    # still contained a parsing branch this call would blow up, so the test
    # passes only when that branch is absent.
    returned_id = upload_file_to_s3_and_record(
        bucket="test-bucket",
        file_path="/tmp/site_note.pdf",
        hubspot_listing_id="hs-002",
        file_type=FileTypeEnum.ECMK_RD_SAP_SITE_NOTE,
    )

    assert returned_id == 42

View file

@ -1,5 +1,6 @@
from datetime import datetime, timezone
import os
from typing import cast
from backend.app.db.connection import db_session
from backend.app.db.models.uploaded_file import (
@ -7,9 +8,12 @@ from backend.app.db.models.uploaded_file import (
FileTypeEnum,
UploadedFile,
)
from utils.logger import setup_logger
from utils.s3 import upload_file_to_s3
from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient
logger = setup_logger()
def upload_file_to_sharepoint(
client: DomnaSharepointClient,
@ -41,9 +45,9 @@ def upload_excel_to_sharepoint(
# TODO: this should be moved to somewhere common and called by pashub fetcher
def upload_file_to_s3_and_update_db(
def upload_file_to_s3_and_record(
bucket: str, file_path: str, hubspot_listing_id: str, file_type: FileTypeEnum
) -> None:
) -> int:
filename: str = os.path.basename(file_path)
key: str = f"documents/hubspot_listing_id/{hubspot_listing_id}/{filename}"
@ -61,4 +65,7 @@ def upload_file_to_s3_and_update_db(
with db_session() as session:
# TODO: we should do multiple files at once to reduce db trips
session.add(uploaded_file)
session.commit()
session.flush()
uploaded_file_id: int = int(cast(int, uploaded_file.id))
return uploaded_file_id

View file

@ -1,72 +1,18 @@
from datetime import datetime, timezone
import os
import re
from typing import Any, Dict, List, Optional
from openpyxl import load_workbook
from typing import Any, Dict, List
from backend.app.config import get_settings
from backend.app.db.connection import db_session
from backend.app.db.models.uploaded_file import (
FileSourceEnum,
UploadedFile,
)
from backend.pashub_fetcher.core_files import infer_file_type
from backend.pashub_fetcher.job import Job
from backend.pashub_fetcher.pashub_client import PashubClient, UnauthorizedError
from backend.pashub_fetcher.pashub_to_ara_trigger_request import (
PashubToAraTriggerRequest,
)
from backend.pashub_fetcher.sharepoint_subfolders import SharepointSubfolders
from backend.pashub_fetcher.pashub_service import PashubService
from backend.pashub_fetcher.pashub_to_ara_trigger_request import PashubToAraTriggerRequest
from backend.pashub_fetcher.token_getter import get_token_from_local_storage
from backend.utils.subtasks import task_handler
from utils.logger import setup_logger
from utils.s3 import upload_file_to_s3
from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient
from utils.sharepoint.domna_sites import DomnaSites
logger = setup_logger()
def extract_jobs(filepath: str) -> List[Job]:
    """Read PasHub jobs from the ``filtered`` sheet of an Excel workbook.

    Row 3 of the sheet holds the column headers. The ``Name`` and
    ``PasHub Link`` columns are read from every row below it; rows missing
    either value, or whose link has no ``/jobs/<id>/`` segment, are skipped.

    Args:
        filepath: Path of the workbook to load.

    Returns:
        One ``{"id": ..., "address": ...}`` entry per usable row, in sheet order.

    Raises:
        KeyError: If the sheet or one of the two required headers is missing.
    """
    wb = load_workbook(filepath, data_only=True)
    ws = wb["filtered"]

    HEADER_ROW = 3

    headers: Dict[str, int] = {}
    for col in range(1, ws.max_column + 1):
        raw_header = ws.cell(row=HEADER_ROW, column=col).value
        # An empty cell has value None. Test the raw value: str(None) is the
        # truthy string "None" and would be registered as a real header.
        if raw_header is None:
            continue
        header = str(raw_header).strip()
        if header:
            headers[header] = col

    name_col = headers["Name"]
    link_col = headers["PasHub Link"]

    # The job id is the hex/dash segment that follows "/jobs/" in the link.
    job_id_pattern = re.compile(r"/jobs/([0-9a-fA-F\-]+)/")

    jobs: List[Job] = []
    for row in range(HEADER_ROW + 1, ws.max_row + 1):
        name = ws.cell(row=row, column=name_col).value
        link = ws.cell(row=row, column=link_col).value

        if not name or not link:
            continue

        match = job_id_pattern.search(str(link))
        if not match:
            continue

        jobs.append(
            {
                "id": match.group(1),
                "address": str(name),
            }
        )

    return jobs
S3_BUCKET = "retrofit-energy-assessments-dev"
def get_pashub_client(email: str, password: str) -> PashubClient:
@ -75,114 +21,6 @@ def get_pashub_client(email: str, password: str) -> PashubClient:
return PashubClient(token=token)
def upload_job_to_sharepoint(
    sharepoint_client: DomnaSharepointClient,
    sharepoint_link: str,
    job_files: List[str],
) -> None:
    """Upload every job file into the assessment subfolder under ``sharepoint_link``.

    Args:
        sharepoint_client: Client used to perform the uploads.
        sharepoint_link: Base SharePoint path the assessment subfolder hangs off.
        job_files: Local paths of the files to upload.
    """
    # No folders are created here; files go straight into the assessment folder.
    target_folder = f"{sharepoint_link}/{SharepointSubfolders.ASSESSMENT.value}"

    for local_path in job_files:
        # The last "/"-separated component is the remote file name.
        remote_name = local_path.rsplit("/", 1)[-1]
        sharepoint_client.upload_file(local_path, target_folder, remote_name)
def upload_job_to_s3_and_update_db(
    job_files: List[str], uprn: Optional[str], hubspot_deal_id: Optional[str]
) -> None:
    """Upload job files to S3 and insert one ``UploadedFile`` row per file.

    Files are keyed under ``documents/uprn/<uprn>`` when a UPRN is given,
    otherwise under ``documents/hubspot_deal_id/<deal id>``. Nothing happens
    when neither identifier is provided.

    Args:
        job_files: Local paths of the files to upload.
        uprn: Property UPRN as a string, if known.
        hubspot_deal_id: HubSpot deal id, if known.
    """
    if not (uprn or hubspot_deal_id):
        return

    bucket = "retrofit-energy-assessments-dev"

    if uprn:
        key_prefix = f"documents/uprn/{uprn}"
    else:
        key_prefix = f"documents/hubspot_deal_id/{hubspot_deal_id}"

    records: List[UploadedFile] = []
    for local_path in job_files:
        name = os.path.basename(local_path)
        s3_key = f"{key_prefix}/{name}"

        upload_file_to_s3(local_path, bucket, s3_key)

        # Queue the DB row for this file.
        # TODO: use same upload_file_to_s3_and_update_db method as ecmk fetcher does
        record = UploadedFile(
            s3_file_bucket=bucket,
            s3_file_key=s3_key,
            s3_upload_timestamp=datetime.now(timezone.utc),
            uprn=int(uprn) if uprn else None,
            hubspot_deal_id=hubspot_deal_id,
            file_source=FileSourceEnum.PAS_HUB.value,
            file_type=infer_file_type(name),
        )
        records.append(record)

    # All rows are written in a single session.
    with db_session() as session:
        session.add_all(records)
        session.commit()
def process_job(
    job: PashubToAraTriggerRequest,
    pashub_client: PashubClient,
    sharepoint_client: DomnaSharepointClient,
) -> List[str]:
    """Fetch a job's core evidence files, archive them, then delete the local copies.

    Args:
        job: Trigger request identifying the PasHub job and, optionally, the
            UPRN and HubSpot deal id.
        pashub_client: Client used to look up the UPRN and fetch the files.
        sharepoint_client: Currently unused; the SharePoint upload is disabled.

    Returns:
        The local paths returned by the PasHub client. The files themselves
        have been removed by the time this function returns.
    """
    job_id = job.pashub_job_id

    # Prefer the UPRN supplied in the request; otherwise ask PasHub.
    uprn: Optional[str] = job.uprn or pashub_client.get_uprn_by_job_id(job_id)
    hubspot_deal_id: Optional[str] = job.hubspot_deal_id

    if uprn:
        logger.info(f"Got UPRN {uprn} for job {job_id}")
    else:
        logger.info(f"No UPRN found for job {job_id}")

    job_files: List[str] = pashub_client.get_core_evidence_files_by_job_id(job_id)

    try:
        if uprn or hubspot_deal_id:
            logger.info("Uploading files to s3")
            upload_job_to_s3_and_update_db(job_files, uprn, hubspot_deal_id)

        # SharePoint upload is disabled for now: the sharepoint link in
        # PasHub is inconsistent about whether it points to a property or a
        # project.
    finally:
        # Clean up even when the upload raises, so a failed run (or a retry
        # by the caller) does not leave temp files behind.
        for file_path in job_files:
            try:
                os.remove(file_path)
            except OSError:
                logger.warning(f"Failed to delete temp file {file_path}")

    return job_files
@task_handler()
def handler(body: Dict[str, Any], context: Any) -> List[str]:
logger.info("Received message")
@ -195,8 +33,6 @@ def handler(body: Dict[str, Any], context: Any) -> List[str]:
if (not pas_hub_email) or (not pas_hub_password):
raise ValueError("Pas Hub credentials not provided")
pashub_client = get_pashub_client(pas_hub_email, pas_hub_password)
sharepoint_client = DomnaSharepointClient(
sharepoint_location=DomnaSites.SOCIAL_HOUSING_WAVE_3
)
@ -205,26 +41,24 @@ def handler(body: Dict[str, Any], context: Any) -> List[str]:
payload = PashubToAraTriggerRequest.model_validate(body)
logger.debug("Successfully validated request body")
service = PashubService(
pashub_client=get_pashub_client(pas_hub_email, pas_hub_password),
sharepoint_client=sharepoint_client,
s3_bucket=S3_BUCKET,
)
try:
files: List[str] = process_job(
payload,
pashub_client,
sharepoint_client,
)
files: List[str] = service.run(payload)
except UnauthorizedError:
logger.warning("Token expired - refreshing")
pashub_client = get_pashub_client(
pas_hub_email,
pas_hub_password,
service = PashubService(
pashub_client=get_pashub_client(pas_hub_email, pas_hub_password),
sharepoint_client=sharepoint_client,
s3_bucket=S3_BUCKET,
)
# retry once
files = process_job(
payload,
pashub_client,
sharepoint_client,
)
files = service.run(payload)
logger.info(f"Saved {len(files)} files")

View file

@ -0,0 +1,158 @@
import os
from datetime import datetime, timezone
from typing import List, NamedTuple, Optional, cast
from backend.app.db.connection import db_session
from backend.app.db.models.uploaded_file import (
FileSourceEnum,
FileTypeEnum,
UploadedFile,
)
from backend.documents_parser.db_writer import save_epc_property_data
from backend.documents_parser.parser import parse_site_notes_pdf
from backend.pashub_fetcher.core_files import infer_file_type
from backend.pashub_fetcher.pashub_client import PashubClient
from backend.pashub_fetcher.pashub_to_ara_trigger_request import (
PashubToAraTriggerRequest,
)
from backend.pashub_fetcher.sharepoint_subfolders import SharepointSubfolders
from datatypes.epc.domain.epc_property_data import EpcPropertyData
from utils.logger import setup_logger
from utils.s3 import upload_file_to_s3
from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient
logger = setup_logger()
class _FileUploadRecord(NamedTuple):
file_path: str
file_type: Optional[str]
uploaded_file_id: int
class PashubService:
    """Process one PasHub trigger request end to end.

    For a request the service fetches the job's core evidence files through
    the PasHub client, uploads them to S3, inserts one ``UploadedFile`` row
    per file, parses RdSAP site-note PDFs into EPC property rows, and removes
    the local files afterwards.
    """

    def __init__(
        self,
        pashub_client: PashubClient,
        sharepoint_client: DomnaSharepointClient,
        s3_bucket: str,
    ) -> None:
        """Store the collaborators.

        Args:
            pashub_client: Client used to look up UPRNs and fetch job files.
            sharepoint_client: Client for the (currently disabled) SharePoint
                upload.
            s3_bucket: Bucket every file is uploaded to.
        """
        self._pashub_client = pashub_client
        self._sharepoint_client = sharepoint_client
        self._s3_bucket = s3_bucket

    def run(self, request: PashubToAraTriggerRequest) -> List[str]:
        """Fetch, archive and record the core evidence files of one job.

        Args:
            request: Trigger request carrying the PasHub job id and,
                optionally, the UPRN and HubSpot deal id.

        Returns:
            The local paths returned by the PasHub client. The files
            themselves have been removed by the time this method returns.
        """
        job_id = request.pashub_job_id

        # Prefer the UPRN supplied in the request; otherwise ask PasHub.
        uprn: Optional[str] = request.uprn or self._pashub_client.get_uprn_by_job_id(
            job_id
        )
        hubspot_deal_id: Optional[str] = request.hubspot_deal_id

        if uprn:
            logger.info(f"Got UPRN {uprn} for job {job_id}")
        else:
            logger.info(f"No UPRN found for job {job_id}")

        job_files: List[str] = self._pashub_client.get_core_evidence_files_by_job_id(
            job_id
        )

        try:
            if uprn or hubspot_deal_id:
                logger.info("Uploading files to s3")
                upload_records = self._upload_to_s3_and_update_db(
                    job_files, uprn, hubspot_deal_id
                )
                self._save_site_notes(upload_records)

            # SharePoint upload disabled: pashub sharepoint_link is inconsistent
            # (points to property or project unpredictably)
            # if request.sharepoint_link:
            #     self._upload_to_sharepoint(request.sharepoint_link, job_files)
        finally:
            # Clean up even when the upload raises, so a failed run (or a
            # retry by the caller) does not leave temp files behind.
            self._remove_temp_files(job_files)

        return job_files

    @staticmethod
    def _remove_temp_files(file_paths: List[str]) -> None:
        """Delete local files best-effort, logging any that cannot be removed."""
        for file_path in file_paths:
            try:
                os.remove(file_path)
            except OSError:
                logger.warning(f"Failed to delete temp file {file_path}")

    def _upload_to_s3_and_update_db(
        self,
        job_files: List[str],
        uprn: Optional[str],
        hubspot_deal_id: Optional[str],
    ) -> List[_FileUploadRecord]:
        """Upload files to S3 and insert one ``UploadedFile`` row per file.

        Files are keyed under ``documents/uprn/<uprn>`` when a UPRN is given,
        otherwise under ``documents/hubspot_deal_id/<deal id>``.

        Args:
            job_files: Local paths of the files to upload.
            uprn: Property UPRN as a string, if known.
            hubspot_deal_id: HubSpot deal id, if known.

        Returns:
            One record per input file, in input order, carrying the row's
            file type and generated id. Empty when neither identifier is
            provided.
        """
        if not uprn and not hubspot_deal_id:
            return []

        base_path = (
            f"documents/uprn/{uprn}"
            if uprn
            else f"documents/hubspot_deal_id/{hubspot_deal_id}"
        )

        uploaded_files: List[UploadedFile] = []
        for file_path in job_files:
            filename = os.path.basename(file_path)
            file_key = f"{base_path}/{filename}"

            upload_file_to_s3(file_path, self._s3_bucket, file_key)

            uploaded_files.append(
                UploadedFile(
                    s3_file_bucket=self._s3_bucket,
                    s3_file_key=file_key,
                    s3_upload_timestamp=datetime.now(timezone.utc),
                    uprn=int(uprn) if uprn else None,
                    hubspot_deal_id=hubspot_deal_id,
                    file_source=FileSourceEnum.PAS_HUB.value,
                    file_type=infer_file_type(filename),
                )
            )

        with db_session() as session:
            session.add_all(uploaded_files)
            # Flush so the ids can be read below.
            # NOTE(review): this relies on db_session committing on exit — confirm.
            session.flush()

            # uploaded_files lines up one-to-one with job_files, so the local
            # paths can be zipped back on. Attributes are read while the
            # session is still open.
            upload_records = [
                _FileUploadRecord(
                    file_path=file_path,
                    file_type=cast(Optional[str], uploaded_file.file_type),
                    uploaded_file_id=cast(int, uploaded_file.id),
                )
                for file_path, uploaded_file in zip(job_files, uploaded_files)
            ]

        return upload_records

    def _save_site_notes(self, upload_records: List[_FileUploadRecord]) -> None:
        """Parse each RdSAP site-note PDF and save its EPC data.

        Records of any other file type are skipped. A failure while parsing
        or saving one file is logged as a warning and does not stop the
        remaining files.

        Args:
            upload_records: Records returned by ``_upload_to_s3_and_update_db``.
        """
        for record in upload_records:
            if (
                record.file_type is None
                or FileTypeEnum(record.file_type) != FileTypeEnum.RD_SAP_SITE_NOTE
            ):
                continue

            try:
                epc_data: EpcPropertyData = parse_site_notes_pdf(record.file_path)
                with db_session() as session:
                    save_epc_property_data(
                        session, epc_data, uploaded_file_id=record.uploaded_file_id
                    )
            except Exception:
                # Deliberately broad: site-note extraction is best-effort and
                # must not fail a run whose uploads already succeeded.
                logger.warning(
                    f"Failed to parse site notes {record.file_path}", exc_info=True
                )

    def _upload_to_sharepoint(
        self,
        sharepoint_link: str,
        job_files: List[str],
    ) -> None:
        """Upload every job file into the assessment subfolder under ``sharepoint_link``.

        Args:
            sharepoint_link: Base SharePoint path the assessment subfolder
                hangs off.
            job_files: Local paths of the files to upload.
        """
        assessment_path = f"{sharepoint_link}/{SharepointSubfolders.ASSESSMENT.value}"

        for file_path in job_files:
            # Same filename derivation as the S3 upload path.
            filename = os.path.basename(file_path)
            self._sharepoint_client.upload_file(file_path, assessment_path, filename)

View file

@ -0,0 +1,43 @@
import re
from typing import Dict, List
from openpyxl import load_workbook
from backend.pashub_fetcher.job import Job
def extract_jobs(filepath: str) -> List[Job]:
    """Read PasHub jobs from the ``filtered`` sheet of an Excel workbook.

    Row 3 of the sheet holds the column headers. The ``Name`` and
    ``PasHub Link`` columns are read from every row below it; rows missing
    either value, or whose link has no ``/jobs/<id>/`` segment, are skipped.

    Args:
        filepath: Path of the workbook to load.

    Returns:
        One ``{"id": ..., "address": ...}`` entry per usable row, in sheet order.

    Raises:
        KeyError: If the sheet or one of the two required headers is missing.
    """
    wb = load_workbook(filepath, data_only=True)
    ws = wb["filtered"]

    HEADER_ROW = 3

    headers: Dict[str, int] = {}
    for col in range(1, ws.max_column + 1):
        raw_header = ws.cell(row=HEADER_ROW, column=col).value
        # An empty cell has value None. Test the raw value: str(None) is the
        # truthy string "None" and would be registered as a real header.
        if raw_header is None:
            continue
        header = str(raw_header).strip()
        if header:
            headers[header] = col

    name_col = headers["Name"]
    link_col = headers["PasHub Link"]

    # The job id is the hex/dash segment that follows "/jobs/" in the link.
    job_id_pattern = re.compile(r"/jobs/([0-9a-fA-F\-]+)/")

    jobs: List[Job] = []
    for row in range(HEADER_ROW + 1, ws.max_row + 1):
        name = ws.cell(row=row, column=name_col).value
        link = ws.cell(row=row, column=link_col).value

        if not name or not link:
            continue

        match = job_id_pattern.search(str(link))
        if not match:
            continue

        jobs.append(
            {
                "id": match.group(1),
                "address": str(name),
            }
        )

    return jobs

View file

@ -0,0 +1,254 @@
from typing import Optional
from unittest.mock import MagicMock, call, patch
from backend.pashub_fetcher.pashub_client import PashubClient
from backend.pashub_fetcher.pashub_service import PashubService
from backend.pashub_fetcher.pashub_to_ara_trigger_request import (
PashubToAraTriggerRequest,
)
from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient
FAKE_JOB_LINK = "https://pashub.net/jobs/job-id-123/details"
def make_request(
    pashub_link: str = FAKE_JOB_LINK,
    uprn: Optional[str] = None,
    hubspot_deal_id: Optional[str] = None,
    sharepoint_link: Optional[str] = None,
) -> PashubToAraTriggerRequest:
    """Build a trigger request for tests; every field has a default."""
    fields = {
        "pashub_link": pashub_link,
        "uprn": uprn,
        "hubspot_deal_id": hubspot_deal_id,
        "sharepoint_link": sharepoint_link,
    }
    return PashubToAraTriggerRequest(**fields)
def make_service(
    pashub_client: Optional[PashubClient] = None,
    sharepoint_client: Optional[DomnaSharepointClient] = None,
    s3_bucket: str = "test-bucket",
) -> PashubService:
    """Build a ``PashubService``, substituting spec'd mocks for omitted clients."""
    if not pashub_client:
        pashub_client = MagicMock(spec=PashubClient)
    if not sharepoint_client:
        sharepoint_client = MagicMock(spec=DomnaSharepointClient)

    return PashubService(
        pashub_client=pashub_client,
        sharepoint_client=sharepoint_client,
        s3_bucket=s3_bucket,
    )
# ---------------------------------------------------------------------------
# run(): returns file paths
# ---------------------------------------------------------------------------
def test_run_returns_file_paths() -> None:
    """``run`` returns exactly the paths reported by the PasHub client."""
    expected_paths = ["/tmp/a.pdf", "/tmp/b.pdf"]

    client = MagicMock(spec=PashubClient)
    client.get_uprn_by_job_id.return_value = None
    client.get_core_evidence_files_by_job_id.return_value = list(expected_paths)

    svc = make_service(pashub_client=client)

    # os.remove is stubbed so no real files are touched.
    with patch("backend.pashub_fetcher.pashub_service.os.remove"):
        returned = svc.run(make_request())

    assert returned == ["/tmp/a.pdf", "/tmp/b.pdf"]
# ---------------------------------------------------------------------------
# run(): skips upload when neither uprn nor hubspot_deal_id
# ---------------------------------------------------------------------------
def test_run_skips_upload_when_no_uprn_and_no_deal_id() -> None:
    """With neither a UPRN nor a deal id, nothing is uploaded to S3."""
    client = MagicMock(spec=PashubClient)
    client.get_uprn_by_job_id.return_value = None
    client.get_core_evidence_files_by_job_id.return_value = ["/tmp/a.pdf"]

    svc = make_service(pashub_client=client)
    request = make_request(uprn=None, hubspot_deal_id=None)

    with (
        patch("backend.pashub_fetcher.pashub_service.upload_file_to_s3") as s3_mock,
        patch("backend.pashub_fetcher.pashub_service.os.remove"),
    ):
        svc.run(request)

    s3_mock.assert_not_called()
# ---------------------------------------------------------------------------
# run(): UPRN present → uploads each file to S3 with correct bucket/key
# ---------------------------------------------------------------------------
def test_run_uploads_files_to_s3_using_uprn_path() -> None:
    """With a UPRN, each file is uploaded under ``documents/uprn/<uprn>/`` in order."""
    client = MagicMock(spec=PashubClient)
    client.get_uprn_by_job_id.return_value = None
    client.get_core_evidence_files_by_job_id.return_value = [
        "/tmp/SiteNote_001.pdf",
        "/tmp/Photopack_002.pdf",
    ]

    svc = make_service(pashub_client=client, s3_bucket="my-bucket")

    with (
        patch("backend.pashub_fetcher.pashub_service.upload_file_to_s3") as s3_mock,
        patch("backend.pashub_fetcher.pashub_service.db_session"),
        patch("backend.pashub_fetcher.pashub_service.os.remove"),
    ):
        svc.run(make_request(uprn="12345"))

    expected_calls = [
        call(
            "/tmp/SiteNote_001.pdf",
            "my-bucket",
            "documents/uprn/12345/SiteNote_001.pdf",
        ),
        call(
            "/tmp/Photopack_002.pdf",
            "my-bucket",
            "documents/uprn/12345/Photopack_002.pdf",
        ),
    ]
    # Order matters: uploads must follow the order of the fetched files.
    s3_mock.assert_has_calls(expected_calls, any_order=False)
# ---------------------------------------------------------------------------
# run(): UPRN present → UploadedFile records added to DB session
# ---------------------------------------------------------------------------
def test_run_persists_uploaded_file_records_to_db() -> None:
    """One ``UploadedFile`` with the right bucket and integer UPRN is added to the session."""
    client = MagicMock(spec=PashubClient)
    client.get_uprn_by_job_id.return_value = None
    client.get_core_evidence_files_by_job_id.return_value = [
        "/tmp/SiteNote_001.pdf"
    ]

    session_stub = MagicMock()
    svc = make_service(pashub_client=client)

    with (
        patch("backend.pashub_fetcher.pashub_service.upload_file_to_s3"),
        patch("backend.pashub_fetcher.pashub_service.db_session") as db_session_mock,
        patch("backend.pashub_fetcher.pashub_service.os.remove"),
    ):
        # ``with db_session() as session`` inside the service yields our stub.
        db_session_mock.return_value.__enter__.return_value = session_stub
        svc.run(make_request(uprn="12345"))

    session_stub.add_all.assert_called_once()
    positional_args = session_stub.add_all.call_args[0]
    persisted: list = positional_args[0]
    assert len(persisted) == 1

    record = persisted[0]
    assert record.s3_file_bucket == "test-bucket"
    # The string UPRN from the request is stored as an int.
    assert record.uprn == 12345
# ---------------------------------------------------------------------------
# run(): hubspot_deal_id only → uses deal_id S3 path prefix
# ---------------------------------------------------------------------------
def test_run_uses_hubspot_deal_id_path_when_no_uprn() -> None:
    """Without a UPRN, the S3 key falls back to the ``hubspot_deal_id`` prefix."""
    client = MagicMock(spec=PashubClient)
    client.get_uprn_by_job_id.return_value = None
    client.get_core_evidence_files_by_job_id.return_value = [
        "/tmp/SiteNote_001.pdf"
    ]

    svc = make_service(pashub_client=client, s3_bucket="my-bucket")
    request = make_request(uprn=None, hubspot_deal_id="deal-abc")

    with (
        patch("backend.pashub_fetcher.pashub_service.upload_file_to_s3") as s3_mock,
        patch("backend.pashub_fetcher.pashub_service.db_session"),
        patch("backend.pashub_fetcher.pashub_service.os.remove"),
    ):
        svc.run(request)

    s3_mock.assert_called_once_with(
        "/tmp/SiteNote_001.pdf",
        "my-bucket",
        "documents/hubspot_deal_id/deal-abc/SiteNote_001.pdf",
    )
# ---------------------------------------------------------------------------
# run(): RD_SAP_SITE_NOTE file → site notes parsed and saved to DB
# ---------------------------------------------------------------------------
def test_run_parses_and_saves_site_notes_for_rd_sap_site_note_file() -> None:
    """An RdSAP site-note file is parsed and saved with its uploaded-file id."""
    client = MagicMock(spec=PashubClient)
    client.get_uprn_by_job_id.return_value = None
    client.get_core_evidence_files_by_job_id.return_value = [
        "/tmp/RdSAP_SiteNote_001.pdf"
    ]

    parsed_epc = MagicMock()
    session_stub = MagicMock()
    generated_id = 99

    def assign_id_to_first(files: list) -> None:
        # Mimic the DB assigning a primary key once the rows are added.
        setattr(files[0], "id", generated_id)

    svc = make_service(pashub_client=client)

    with (
        patch("backend.pashub_fetcher.pashub_service.upload_file_to_s3"),
        patch(
            "backend.pashub_fetcher.pashub_service.parse_site_notes_pdf",
            return_value=parsed_epc,
        ) as parse_mock,
        patch(
            "backend.pashub_fetcher.pashub_service.save_epc_property_data"
        ) as save_mock,
        patch("backend.pashub_fetcher.pashub_service.db_session") as db_session_mock,
        patch("backend.pashub_fetcher.pashub_service.os.remove"),
    ):
        session_stub.add_all = MagicMock(side_effect=assign_id_to_first)
        db_session_mock.return_value.__enter__.return_value = session_stub
        svc.run(make_request(uprn="12345"))

    parse_mock.assert_called_once_with("/tmp/RdSAP_SiteNote_001.pdf")
    save_mock.assert_called_once_with(
        session_stub, parsed_epc, uploaded_file_id=generated_id
    )
# ---------------------------------------------------------------------------
# run(): site notes parse failure → warning logged, run returns normally
# ---------------------------------------------------------------------------
def test_run_warns_and_continues_when_site_notes_parsing_fails() -> None:
    """A parser error is logged as a warning; ``run`` still returns and saves nothing."""
    client = MagicMock(spec=PashubClient)
    client.get_uprn_by_job_id.return_value = None
    client.get_core_evidence_files_by_job_id.return_value = [
        "/tmp/RdSAP_SiteNote_001.pdf"
    ]

    svc = make_service(pashub_client=client)

    with (
        patch("backend.pashub_fetcher.pashub_service.upload_file_to_s3"),
        patch(
            "backend.pashub_fetcher.pashub_service.parse_site_notes_pdf",
            side_effect=ValueError("corrupt pdf"),
        ),
        patch(
            "backend.pashub_fetcher.pashub_service.save_epc_property_data"
        ) as save_mock,
        patch("backend.pashub_fetcher.pashub_service.db_session"),
        patch("backend.pashub_fetcher.pashub_service.logger") as logger_mock,
        patch("backend.pashub_fetcher.pashub_service.os.remove"),
    ):
        # Must not propagate the parser's ValueError.
        returned = svc.run(make_request(uprn="12345"))

    assert returned == ["/tmp/RdSAP_SiteNote_001.pdf"]
    logger_mock.warning.assert_called()
    save_mock.assert_not_called()

View file

@ -3,6 +3,6 @@ pythonpath = .
log_cli = true
log_cli_level = INFO
addopts = --cov-report term-missing --cov=etl/epc --cov=recommendations --cov=backend --cov=etl/epc_clean --cov=etl/spatial
testpaths = recommendations/tests backend/tests etl/epc/tests etl/epc_clean/tests etl/spatial/tests backend/condition/tests backend/address2UPRN/tests backend/onboarders/tests backend/categorisation/tests backend/export/tests etl/hubspot/tests backend/hubspot_trigger_orchestrator/tests datatypes/epc/schema/tests datatypes/epc/surveys/tests datatypes/epc/domain/tests backend/ecmk_fetcher/tests/ backend/documents_parser/tests
testpaths = recommendations/tests backend/tests etl/epc/tests etl/epc_clean/tests etl/spatial/tests backend/condition/tests backend/address2UPRN/tests backend/onboarders/tests backend/categorisation/tests backend/export/tests etl/hubspot/tests backend/hubspot_trigger_orchestrator/tests datatypes/epc/schema/tests datatypes/epc/surveys/tests datatypes/epc/domain/tests backend/ecmk_fetcher/tests/ backend/pashub_fetcher/tests backend/documents_parser/tests
markers =
integration: mark a test as an integration test