change pdf reader to file reader

This commit is contained in:
Jun-te Kim 2025-07-09 10:25:42 +00:00
parent 58e27915cb
commit 1e467cfd56
10 changed files with 136 additions and 38 deletions

View file

@ -1,8 +1,8 @@
from etl.utils.logger import Logger
import logging
import pymupdf
from etl.pdfReader.sitenotes import QuidosSiteNotesExtractor, CSR, ConditionReport
from etl.pdfReader.reportType import ReportType
from etl.fileReader.sitenotes import QuidosSiteNotesExtractor, CSR, WarmHomesConditionReport, ECOConditionReport, RDSAPEnergyReport
from etl.fileReader.reportType import ReportType
class pdfReaderToText():
@ -24,6 +24,7 @@ class pdfReaderToText():
self.all_text += text
self.text_list = self.all_text.split('\n')
print(self.text_list)
def get_list_of_text(self):
    """Return the extracted document text as the list of lines held in ``self.text_list``."""
    extracted_lines = self.text_list
    return extracted_lines
@ -41,7 +42,11 @@ class pdfReaderToText():
elif "Chartered Surveyor Report: Recommending Extraction of Defective Cavity Wall Insulation " in self.text_list:
self.type = ReportType.CHARTED_SURVEYOR_REPORT
elif "Osmosis ACD NEW PAS 2035 Condition Report".lower() in self.text_list[0].lower():
self.type = ReportType.OSMOSIS_CONDITION_PAS_2035_REPORT
self.type = ReportType.WARM_HOMES_CONDITION_REPORT
elif "Domna NEW PAS 2035 ECO Condition Report".lower() in self.text_list[0].lower():
self.type = ReportType.ECO_CONDITION_REPORT
elif "ENERGY REPORT".lower() == self.text_list[0].lower():
self.type = ReportType.RDSAP_ENERGY_REPORT
else:
pass
return self.type
@ -53,6 +58,10 @@ class pdfReaderToText():
return QuidosSiteNotesExtractor(self.text_list)
elif self.type == ReportType.CHARTED_SURVEYOR_REPORT:
return CSR(self.text_list)
elif self.type == ReportType.OSMOSIS_CONDITION_PAS_2035_REPORT:
return ConditionReport(self.text_list)
elif self.type == ReportType.WARM_HOMES_CONDITION_REPORT:
return WarmHomesConditionReport(self.text_list)
elif self.type == ReportType.ECO_CONDITION_REPORT:
return ECOConditionReport(self.text_list)
elif self.type == ReportType.RDSAP_ENERGY_REPORT:
return RDSAPEnergyReport(self.text_list)

View file

@ -7,4 +7,6 @@ class ReportType(Enum):
ENERGY_PERFORMANCE_REPORT = "energy_performance_report"
U_VALUE_CALCULATOR_REPORT = "u_value_calculator_report"
OVERWRITING_U_VALUE_DECLARATION_FORM = "overwriting_u_value_declaration_form"
OSMOSIS_CONDITION_PAS_2035_REPORT = "osmosis_condition_pas_2035_report"
ECO_CONDITION_REPORT = "osmosis_condition_pas_2035_report"
WARM_HOMES_CONDITION_REPORT = "warm_homes_condition_pas_2035_report"
RDSAP_ENERGY_REPORT = "rdsap_energy_report"

View file

@ -88,11 +88,30 @@ class CSR(SiteNotesExtractor):
type=dict_.get('detailed_description_of_existing_cavity_wall_insulation_', "")
) if dict_ is not None else None
class ConditionReport(SiteNotesExtractor):
class RDSAPEnergyReport(SiteNotesExtractor):
    """Site-notes extractor for documents classified as ``RDSAP_ENERGY_REPORT``.

    Parsing is not implemented yet: ``setup_energy_report`` is a placeholder,
    so ``master_obj`` is always ``None`` after construction.
    """

    def __init__(self, data_list):
        """Store the text lines via the base class and tag the report type.

        Args:
            data_list: Text lines of the document, forwarded unchanged to
                ``SiteNotesExtractor.__init__``.
        """
        super().__init__(data_list)
        # Only the RdSAP type is assigned; a preceding assignment of the
        # retired OSMOSIS condition-report type was dead (overwritten on the
        # very next line) and has been dropped.
        self.type = ReportType.RDSAP_ENERGY_REPORT
        self.master_obj = self.setup_energy_report()

    def setup_energy_report(self):
        """Placeholder for energy-report parsing; currently returns ``None``."""
        return None
class ECOConditionReport(SiteNotesExtractor):
    """Extractor wrapper for documents classified as ``ECO_CONDITION_REPORT``.

    No parsing is implemented yet, so ``master_obj`` ends up as ``None``.
    """

    def __init__(self, data_list):
        """Initialise the base extractor with ``data_list`` and tag the type."""
        super().__init__(data_list)
        self.type = ReportType.ECO_CONDITION_REPORT
        parsed = self.setup_condition_report()
        self.master_obj = parsed

    def setup_condition_report(self):
        """Stub for condition-report parsing; yields ``None`` for now."""
        return None
class WarmHomesConditionReport(SiteNotesExtractor):
def __init__(self, data_list):
super().__init__(data_list)
self.type = ReportType.WARM_HOMES_CONDITION_REPORT
self.master_obj = self.setup_condition_report()
def setup_condition_report(self):

View file

@ -81,7 +81,7 @@ class HubSpotClient():
def get_domna_and_landlord_id(self, deals_id):
    """Return the Domna id, owner (landlord) id and UPRN of a deal's listing.

    Args:
        deals_id: Deal identifier passed to ``get_listings_from_deals_id``.

    Returns:
        tuple: ``(domna_property_id, owner_property_id, national_uprn)``.
        The UPRN is ``''`` when the property is absent, ``None`` or empty.

    Raises:
        KeyError: If ``domna_property_id`` or ``owner_property_id`` is
            missing (assuming ``properties`` is a plain dict -- confirm
            against the HubSpot client in use).
    """
    properties = self.get_listings_from_deals_id(deals_id).properties
    # The UPRN is optional on a listing: fall back to an empty string rather
    # than raising, and normalise an explicit None to '' as well.
    uprn = properties.get('national_uprn', '') or ''
    return properties['domna_property_id'], properties['owner_property_id'], uprn
def get_notes_from_deals_id(self, deals_id):
from hubspot.crm.objects import PublicObjectSearchRequest
@ -211,7 +211,7 @@ class HubSpotClient():
try:
deal_name = deal.properties['dealname']
self.logger.info(f"Validating <{deal_name}>")
input
# input(f"Press enter to verify <{deal_name}>")
all_deals.append(SubmissionInfoFromDeal(
deal_id= deal.properties["hs_object_id"],
deal_name=deal.properties["dealname"],
@ -228,7 +228,7 @@ class HubSpotClient():
))
except Exception as e:
def format_error_note(e):
note_text = "⚠️ <b>Error occurred while verifying deal data:</b><br><br>"
note_text = "⚠️ <b>Automated Verification Failed:</b><br><br>"
if hasattr(e, "errors") and callable(e.errors):
note_text += "❌ <b>Validation Errors:</b><br>"
@ -267,7 +267,6 @@ class HubSpotClient():
self.logger.info(f"Deal name <{deal_name}> moving to 'needs additional information'")
self.move_deals_to_different_stage([deal_id], DealStage.NEEDS_ADDITIONAL_INFORMATION_FROM_ASSESSOR.value)
return all_deals
def print_all_pipeline_ids(self):

View file

@ -5,6 +5,7 @@ import uuid
from pydantic import Field, field_validator, model_validator
from etl.utils.utils import get_sharepoint_path
from etl.scraper.scraper import SharePointScraper, SharePointInstaller
from etl.surveyedData.surveryedData import surveyedDataProcessor
@ -16,7 +17,7 @@ def string_to_installer(installer):
elif installer.upper() == "SCIS":
return SharePointInstaller.SOUTH_COAST_INSULATION
elif installer.upper() == "SGEC":
return SharePointInstaller.SGEC
return SharePointInstaller.JJC
else:
return None
@ -40,7 +41,7 @@ class SubmissionInfoFromDeal(BaseModel):
submission_folder_path: str = Field(..., min_length=1)
landlord_id: str = Field(..., min_length=1)
domna_id: str = Field(..., min_length=1)
uprn: str = Field(..., min_length=1)
uprn: str
@field_validator('post_sap_score', 'no_of_wet_rooms')
@classmethod
@ -50,18 +51,37 @@ class SubmissionInfoFromDeal(BaseModel):
return v
@model_validator(mode="after")
def check_sharepoint_link_and_contents(self):
    """Validate the submission folder: reachable, non-empty, required reports present.

    The check runs in three stages:

    1. Resolve the SharePoint path from ``submission_folder_path`` and build
       a ``SharePointScraper`` for the deal's installer.
    2. List the folder and require at least one entry.
    3. Download the folder's files and require that both a condition report
       and an energy report are identified among them.

    Returns:
        The validated model instance (``self``).

    Raises:
        ValueError: If the path cannot be resolved, the folder cannot be
            listed or is empty, the download fails, or a required report is
            missing.
    """
    try:
        path = get_sharepoint_path(self.submission_folder_path)
        installer = string_to_installer(self.installer)
        sp = SharePointScraper(installer)
    except Exception as e:
        raise ValueError(
            f"Error accessing SharePoint path: {self.submission_folder_path}. Error: {str(e)}"
        ) from e

    # Check that the SharePoint link is reachable and has any contents.
    try:
        listing = sp.get_folders_in_path(path)
        has_contents = "value" in listing and len(listing["value"]) > 0
    except Exception as e:
        raise ValueError(str(e)) from e
    if not has_contents:
        raise ValueError(f"SharePoint folder is empty: {self.submission_folder_path}")

    # Download the files and let the processor classify them.
    try:
        files = sp.download_files_from_path(path)
        # NOTE(review): "fake address" is a placeholder; the real address is
        # presumably filled in by the processor from the pre-site note -- confirm.
        sdp = surveyedDataProcessor("fake address", files)
    except Exception as e:
        raise ValueError(str(e)) from e

    # Explicit checks instead of `assert`: asserts are stripped under
    # `python -O`, which would silently disable this validation.
    if sdp.condition_report is None:
        raise ValueError("Condition Report is missing")
    if sdp.energy_report is None:
        raise ValueError("Energy Report pdf is missing")
    return self

View file

@ -13,3 +13,8 @@ os.environ["DATABASE_URL"] = "postgresql://postgres:makingwarmhomes@db:5432/post
hubspotClient = HubSpotClient()
deals = hubspotClient.get_deals_from_deal_stage(DealStage.SURVEYED_COMPLETE_NEEDS_SIGN_OFF)
# TODO sanity address check
# TODO load

View file

@ -301,13 +301,46 @@ class SharePointScraper():
file_names_to_download.update({file["name"]: file['@microsoft.graph.downloadUrl']})
each_file = []
for file_name, url in file_names_to_download.items():
self.logger.info(f"Downloading {file_name} from {url}")
self.logger.debug(f"Downloading {file_name} from {url}")
content = self.get_file_content(url)
file_path = self.create_temp_file(content, f"{name}/{WEEK_COMMENCING}/{house_ass}/{address}/{file_name}")
each_file.append(file_path)
address_paths.update({address: each_file})
paths.append(address_paths)
return paths
def download_files_from_path(self, path):
    """
    Download every non-media file located directly inside one folder.

    Only entries that carry a ``'file'`` key in the listing are considered,
    so sub-folders are not descended into. Photo and video files are skipped.

    Args:
        path (str): Folder path handed to ``get_folders_in_path``.

    Returns:
        List[str]: The paths returned by ``create_temp_file``, one per
        downloaded file, in listing order.

    Raises:
        RuntimeError: If the folder listing has no ``'value'`` entry.
    """
    # Compared case-insensitively so variants such as ".MOV" or ".Jpg" are
    # skipped as well, not only the spellings that were listed by hand.
    media_extensions = (".jpg", ".jpeg", ".png", ".heic", ".mov", ".mp4")

    files_info = self.get_folders_in_path(path)
    if 'value' not in files_info:
        raise RuntimeError(f"Failed to get files from {path}")

    file_names_to_download = {
        item["name"]: item["@microsoft.graph.downloadUrl"]
        for item in files_info['value']
        if 'file' in item and not item["name"].lower().endswith(media_extensions)
    }

    downloaded_files = []
    for file_name, url in file_names_to_download.items():
        # Logged at debug level like the sibling download method: the Graph
        # download URL is pre-authenticated and should not land in info logs.
        self.logger.debug(f"Downloading {file_name} from {url}")
        content = self.get_file_content(url)
        downloaded_files.append(self.create_temp_file(content, f"{path}/{file_name}"))
    return downloaded_files
def create_temp_file(self, content, path):
# Ensure the path is under /tmp/
@ -320,6 +353,6 @@ class SharePointScraper():
with open(path, 'wb+') as temp_file:
temp_file.write(content.getvalue())
self.logger.info(f"Temporary file created at: {path}")
self.logger.debug(f"Temporary file created at: {path}")
return path

View file

@ -1,6 +1,7 @@
from etl.pdfReader.pdfReaderToText import pdfReaderToText
from etl.pdfReader.reportType import ReportType
import math
from xml.dom.minidom import parseString
from etl.models.preSiteNoteTypes import (
AssessorInfo, CompanyInfo,
PreSiteNotesSummaryInfo,
@ -38,21 +39,31 @@ class surveyedDataProcessor():
self.pre_site_note = None
self.csr = None
self.condition_report = None
self.identify_files()
self.hubspot_deal_id = None
self.energy_report = None
self.identify_files()
def identify_files(self):
    """Classify every file in ``self.files`` and keep a reader per recognised report.

    PDF files are opened with ``pdfReaderToText`` and routed by their
    detected ``ReportType``:

    * Quidos pre-site note -> ``self.pre_site_note`` (also sets
      ``self.address`` from its survey information)
    * Chartered surveyor report -> ``self.csr``
    * Warm Homes or ECO condition report -> ``self.condition_report``
    * RdSAP energy report -> ``self.energy_report``

    XML files are only announced on stdout; other extensions are ignored.
    If several files map to the same slot, the last one wins.
    """
    # Both condition-report flavours share the same slot.
    condition_report_types = (
        ReportType.WARM_HOMES_CONDITION_REPORT,
        ReportType.ECO_CONDITION_REPORT,
    )

    for file in self.files:
        lowered = file.lower()
        if lowered.endswith('.pdf'):
            # Only PDFs go to the PDF reader; other file types used to be
            # passed to it as well.
            pdf = pdfReaderToText(file)
            if not pdf:
                continue
            if pdf.type == ReportType.QUIDOS_PRESITE_NOTE:
                self.pre_site_note = pdf.get_reader()
                self.address = self.pre_site_note.survey_information.address
            elif pdf.type == ReportType.CHARTED_SURVEYOR_REPORT:
                self.csr = pdf.get_reader()
            elif pdf.type in condition_report_types:
                self.condition_report = pdf.get_reader()
            elif pdf.type == ReportType.RDSAP_ENERGY_REPORT:
                self.energy_report = pdf.get_reader()
        elif lowered.endswith('.xml'):
            # TODO: XML files are detected but nothing is parsed from them yet.
            print(f"identified an xml file {lowered}")
def load_condition_report(self, db_session):
general_information = self.load_general_information_from_condition_report(db_session)

View file

@ -37,4 +37,4 @@ def get_sharepoint_path(url):
raise SharePointURLError(f"The URL does not contain 'id=' parameter. URL: {url}")
except (IndexError, ValueError) as e:
raise SharePointURLError(f"Error parsing SharePoint URL: {url}. Reason: {e}")
raise SharePointURLError(f"Error with SharePoint URL, please check {url}. Reason: {e}")