diff --git a/backend/app/db/models/organisation.py b/backend/app/db/models/organisation.py index e8649cdd..a9718c42 100644 --- a/backend/app/db/models/organisation.py +++ b/backend/app/db/models/organisation.py @@ -40,6 +40,30 @@ class HubspotDealData(SQLModel, table=True): coordination_status: Optional[str] = Field(default=None) design_status: Optional[str] = Field(default=None) + listing_id: Optional[str] = Field(default=None) + pashub_link: Optional[str] = Field(default=None) + sharepoint_link: Optional[str] = Field(default=None) + dampmould_growth: Optional[str] = Field(default=None) + pre_sap: Optional[str] = Field(default=None) + coordinator: Optional[str] = Field(default=None) + mtp_completion_date: Optional[datetime] = Field(default=None) + mtp_re_model_completion_date: Optional[datetime] = Field(default=None) + ioe_v3_completion_date: Optional[datetime] = Field(default=None) + proposed_measures: Optional[str] = Field(default=None) + approved_package: Optional[str] = Field(default=None) + designer: Optional[str] = Field(default=None) + design_completion_date: Optional[datetime] = Field(default=None) + actual_measures_installed: Optional[str] = Field(default=None) + installer: Optional[str] = Field(default=None) + installer_handover: Optional[str] = Field(default=None) + lodgement_status: Optional[str] = Field(default=None) + measures_lodgement_date: Optional[datetime] = Field(default=None) + lodgement_date: Optional[datetime] = Field(default=None) + expected_commencement_date: Optional[datetime] = Field(default=None) + surveyor: Optional[str] = Field(default=None) + confirmed_survey_date: Optional[datetime] = Field(default=None) + confirmed_survey_time: Optional[str] = Field(default=None) + created_at: datetime = Field( sa_column=Column( DateTime(timezone=True), diff --git a/backend/pashub_fetcher/handler/handler.py b/backend/pashub_fetcher/handler/handler.py index df7bb9dc..793b9d70 100644 --- a/backend/pashub_fetcher/handler/handler.py +++ b/backend/pashub_fetcher/handler/handler.py @@ -1,4 +1,5 @@ from datetime import datetime, timezone +import json import os import re from typing import Any, Dict, List, Mapping, Optional @@ -11,8 +12,12 @@ from backend.app.db.models.uploaded_file import ( UploadedFile, ) from backend.pashub_fetcher.core_files import infer_file_type + from backend.pashub_fetcher.job import Job from backend.pashub_fetcher.pashub_client import PashubClient, UnauthorizedError +from backend.pashub_fetcher.pashub_to_ara_trigger_request import ( + PashubToAraTriggerRequest, +) from backend.pashub_fetcher.sharepoint_subfolders import SharepointSubfolders from backend.pashub_fetcher.token_getter import get_token_from_local_storage from utils.logger import setup_logger @@ -72,21 +77,21 @@ def get_pashub_client(email: str, password: str) -> PashubClient: def upload_job_to_sharepoint( sharepoint_client: DomnaSharepointClient, - base_path: str, - job: Job, + # base_path: str, + sharepoint_link: str, job_files: List[str], ) -> None: - job_path = f"{base_path}/{job['address']}" + # job_path = f"{base_path}/{job['address']}" # Create main job folder - sharepoint_client.makedir(job["address"], base_path) + # sharepoint_client.makedir(job["address"], base_path) # Create subfolders - for folder in SharepointSubfolders: - sharepoint_client.makedir(folder.value, job_path) + # for folder in SharepointSubfolders: + # sharepoint_client.makedir(folder.value, job_path) # Upload into assessment folder - assessment_path = f"{job_path}/{SharepointSubfolders.ASSESSMENT.value}" + assessment_path = f"{sharepoint_link}/{SharepointSubfolders.ASSESSMENT.value}" for file_path in job_files: filename = file_path.split("/")[-1] @@ -131,14 +136,14 @@ def upload_job_to_s3_and_update_db(job_files: List[str], uprn: str) -> None: def process_job( - job: Job, + job: PashubToAraTriggerRequest, pashub_client: PashubClient, sharepoint_client: DomnaSharepointClient, - base_path: str, ) -> List[str]: - job_id = job["id"] + job_id = job.pashub_job_id + + uprn: Optional[str] = job.uprn or pashub_client.get_uprn_by_job_id(job_id) - uprn: Optional[str] = pashub_client.get_uprn_by_job_id(job_id) if uprn: logger.info(f"Got UPRN {uprn} for job {job_id}") else: @@ -150,24 +155,22 @@ def process_job( logger.info("Uploading files to s3") upload_job_to_s3_and_update_db(job_files, uprn) - upload_job_to_sharepoint(sharepoint_client, base_path, job, job_files) + # # Comment out sharepoint loading for now: + # Seems like the sharepoint link in pas hub is inconsistent in terms + # of whether it points to a property or a project + + # if job.sharepoint_link: + # upload_job_to_sharepoint(sharepoint_client, job.sharepoint_link, job_files) return job_files def handler(event: Mapping[str, Any], context: Any) -> None: + logger.info("Received message") + logger.info(f"Number of events: {len(event.get('Records', []))}") + settings = get_settings() - BASE_DIR = os.path.dirname(os.path.dirname(__file__)) - # filepath = os.path.join(BASE_DIR, "Watford_Warm_Homes_Wave_3_RA Downloads .xlsx") - filepath = os.path.join( - BASE_DIR, - "The_Guinness_Partnership_AtkinsR_alis_Coordination_Design_Board_1774881298.xlsx", - ) - - jobs: List[Job] = extract_jobs(filepath) - logger.info("Successfully loaded jobs from spreadsheet") - pas_hub_email = settings.PASHUB_EMAIL pas_hub_password = settings.PASHUB_PASSWORD @@ -180,36 +183,46 @@ def handler(event: Mapping[str, Any], context: Any) -> None: sharepoint_location=DomnaSites.SOCIAL_HOUSING_WAVE_3 ) - BASE_PATH = "/Osmosis-ACD Projects/Watford Warm Homes/Watford Property Folders (Shared with Client)" - saved_file_paths: List[str] = [] - for job in jobs: + for record in event.get("Records", []): try: - files = process_job( - job, - pashub_client, - sharepoint_client, - BASE_PATH, - ) - saved_file_paths.extend(files) + body_dict = json.loads(record["body"]) + logger.debug("Validating request body") - except UnauthorizedError: - logger.warning("Token expired - refreshing") + payload = PashubToAraTriggerRequest.model_validate(body_dict) - pashub_client = get_pashub_client( - pas_hub_email, - pas_hub_password, - ) + logger.debug("Successfully validated request body") - # retry once - files = process_job( - job, - pashub_client, - sharepoint_client, - BASE_PATH, - ) - saved_file_paths.extend(files) + try: + files: List[str] = process_job( + payload, + pashub_client, + sharepoint_client, + ) + saved_file_paths.extend(files) + + except UnauthorizedError: + logger.warning("Token expired - refreshing") + + pashub_client = get_pashub_client( + pas_hub_email, + pas_hub_password, + ) + + # retry once + files: List[str] = process_job( + payload, + pashub_client, + sharepoint_client, + ) + saved_file_paths.extend(files) + + except Exception as e: + logger.info("Handler exception") + logger.error(f"Failed to process record: {e}") + + logger.info("Successfully loaded jobs from spreadsheet") logger.info(f"Saved {len(saved_file_paths)} files") diff --git a/etl/hubspot/hubspotClient.py b/etl/hubspot/hubspotClient.py index 8bbe8a63..e5461c61 100644 --- a/etl/hubspot/hubspotClient.py +++ b/etl/hubspot/hubspotClient.py @@ -189,6 +189,7 @@ class HubspotClient: ) listing_info: dict[str, str] = cast(dict[str, str], listing.properties) # type: ignore[reportUnknownMemberType] + listing_info["listing_id"] = listing_id self.logger.info(f"Listing info for deal {deal_id}: {listing_info}") return listing_info @@ -201,13 +202,35 @@ class HubspotClient: "dealname", "dealstage", "pipeline", - "outcome", # outcome, - "outcome_notes", # outcome notes + "outcome", + "outcome_notes", "project_code", "major_condition_issue_description", "major_condition_issue_photos", - "coordination_status__stage_1_", # Coordiantion Status (Stage 1), - "retrofit_design_status", # Retrofit Design Status + "coordination_status__stage_1_", + "retrofit_design_status", + "pashub_link", + "sharepoint_link", + "dampmould_growth", + "pre_sap", + "coordinator", + "mtp_completion_date", + "mtp_re_model_completion_date", + "ioe_v3_completion_date", + "proposed_measures", + "approved_package", + "designer", + "design_completion_date", + "actual_measures_installed", + "installer", + "installer_handover", + "lodgement_status", + "measures_lodgement_date", + "lodgement_date", + "expected_commencement_date", + "surveyor", + "confirmed_survey_date", + "confirmed_survey_time", ], ) diff --git a/etl/hubspot/hubspotDataTodB.py b/etl/hubspot/hubspotDataTodB.py index f7f79e46..0c38f483 100644 --- a/etl/hubspot/hubspotDataTodB.py +++ b/etl/hubspot/hubspotDataTodB.py @@ -2,7 +2,7 @@ from backend.app.db.connection import db_read_session from backend.app.db.models.organisation import Organisation, HubspotDealData from sqlmodel import select from datetime import datetime, timezone -from typing import TypedDict +from typing import TypedDict, Optional from etl.hubspot.s3_uploader import S3Uploader import hashlib import os @@ -82,6 +82,14 @@ class HubspotDataToDb: .one_or_none() ) + def _parse_hs_date(self, value: Optional[str]) -> Optional[datetime]: + if not value: + return None + try: + return datetime.fromisoformat(value.replace("Z", "+00:00")) + except ValueError: + return None + def _sha256(self, file_path: str) -> str: """Compute SHA-256 checksum of a file.""" sha256 = hashlib.sha256() @@ -114,6 +122,10 @@ class HubspotDataToDb: deal_in_db.deal_id == hs_deal.get("hs_object_id"), "deal_id mismatch" ), soft_assert(deal_in_db.company_id == hs_company_id, "company_id mismatch"), + soft_assert( + deal_in_db.listing_id == hs_listing.get("listing_id"), + "listing_id mismatch", + ), soft_assert( deal_in_db.landlord_property_id == hs_listing.get("owner_property_id"), "landlord_property_id mismatch", @@ -157,6 +169,94 @@ class HubspotDataToDb: deal_in_db.design_status == hs_deal.get("retrofit_design_status"), "retrofit design mismatch", ), + soft_assert( + deal_in_db.pashub_link == hs_deal.get("pashub_link"), + "pashub_link mismatch", + ), + soft_assert( + deal_in_db.sharepoint_link == hs_deal.get("sharepoint_link"), + "sharepoint_link mismatch", + ), + soft_assert( + deal_in_db.dampmould_growth == hs_deal.get("dampmould_growth"), + "dampmould_growth mismatch", + ), + soft_assert( + deal_in_db.pre_sap == hs_deal.get("pre_sap"), + "pre_sap mismatch", + ), + soft_assert( + deal_in_db.coordinator == hs_deal.get("coordinator"), + "coordinator mismatch", + ), + soft_assert( + deal_in_db.mtp_completion_date == self._parse_hs_date(hs_deal.get("mtp_completion_date")), + "mtp_completion_date mismatch", + ), + soft_assert( + deal_in_db.mtp_re_model_completion_date == self._parse_hs_date(hs_deal.get("mtp_re_model_completion_date")), + "mtp_re_model_completion_date mismatch", + ), + soft_assert( + deal_in_db.ioe_v3_completion_date == self._parse_hs_date(hs_deal.get("ioe_v3_completion_date")), + "ioe_v3_completion_date mismatch", + ), + soft_assert( + deal_in_db.proposed_measures == hs_deal.get("proposed_measures"), + "proposed_measures mismatch", + ), + soft_assert( + deal_in_db.approved_package == hs_deal.get("approved_package"), + "approved_package mismatch", + ), + soft_assert( + deal_in_db.designer == hs_deal.get("designer"), + "designer mismatch", + ), + soft_assert( + deal_in_db.design_completion_date == self._parse_hs_date(hs_deal.get("design_completion_date")), + "design_completion_date mismatch", + ), + soft_assert( + deal_in_db.actual_measures_installed == hs_deal.get("actual_measures_installed"), + "actual_measures_installed mismatch", + ), + soft_assert( + deal_in_db.installer == hs_deal.get("installer"), + "installer mismatch", + ), + soft_assert( + deal_in_db.installer_handover == hs_deal.get("installer_handover"), + "installer_handover mismatch", + ), + soft_assert( + deal_in_db.lodgement_status == hs_deal.get("lodgement_status"), + "lodgement_status mismatch", + ), + soft_assert( + deal_in_db.measures_lodgement_date == self._parse_hs_date(hs_deal.get("measures_lodgement_date")), + "measures_lodgement_date mismatch", + ), + soft_assert( + deal_in_db.lodgement_date == self._parse_hs_date(hs_deal.get("lodgement_date")), + "lodgement_date mismatch", + ), + soft_assert( + deal_in_db.expected_commencement_date == self._parse_hs_date(hs_deal.get("expected_commencement_date")), + "expected_commencement_date mismatch", + ), + soft_assert( + deal_in_db.surveyor == hs_deal.get("surveyor"), + "surveyor mismatch", + ), + soft_assert( + deal_in_db.confirmed_survey_date == self._parse_hs_date(hs_deal.get("confirmed_survey_date")), + "confirmed_survey_date mismatch", + ), + soft_assert( + deal_in_db.confirmed_survey_time == hs_deal.get("confirmed_survey_time"), + "confirmed_survey_time mismatch", + ), ] # If discrepancies found, update from HubSpot @@ -211,6 +311,9 @@ class HubspotDataToDb: f"⚠️ Failed to download/upload photo for deal_id {deal_in_db.deal_id}: {e}" ) # Continue without the file — don't crash the entire update + finally: + if "local_file" in locals() and os.path.exists(local_file): + os.remove(local_file) else: print(f"⚠️ Photo URL missing for deal_id {deal_in_db.deal_id}") @@ -238,6 +341,7 @@ class HubspotDataToDb: for attr, value in { "dealname": deal_data.get("dealname"), "dealstage": deal_data.get("dealstage"), + "listing_id": listing.get("listing_id"), "landlord_property_id": listing.get("owner_property_id"), "uprn": listing.get("national_uprn"), "outcome": deal_data.get("outcome"), @@ -250,16 +354,32 @@ class HubspotDataToDb: "major_condition_issue_photos": deal_data.get( "major_condition_issue_photos" ), - "major_condition_issue_description": deal_data.get( - "major_condition_issue_description" - ), - "major_condition_issue_photos": deal_data.get( - "major_condition_issue_photos" - ), "coordination_status": deal_data.get( "coordination_status__stage_1_" ), "design_status": deal_data.get("retrofit_design_status"), + "pashub_link": deal_data.get("pashub_link"), + "sharepoint_link": deal_data.get("sharepoint_link"), + "dampmould_growth": deal_data.get("dampmould_growth"), + "pre_sap": deal_data.get("pre_sap"), + "coordinator": deal_data.get("coordinator"), + "mtp_completion_date": self._parse_hs_date(deal_data.get("mtp_completion_date")), + "mtp_re_model_completion_date": self._parse_hs_date(deal_data.get("mtp_re_model_completion_date")), + "ioe_v3_completion_date": self._parse_hs_date(deal_data.get("ioe_v3_completion_date")), + "proposed_measures": deal_data.get("proposed_measures"), + "approved_package": deal_data.get("approved_package"), + "designer": deal_data.get("designer"), + "design_completion_date": self._parse_hs_date(deal_data.get("design_completion_date")), + "actual_measures_installed": deal_data.get("actual_measures_installed"), + "installer": deal_data.get("installer"), + "installer_handover": deal_data.get("installer_handover"), + "lodgement_status": deal_data.get("lodgement_status"), + "measures_lodgement_date": self._parse_hs_date(deal_data.get("measures_lodgement_date")), + "lodgement_date": self._parse_hs_date(deal_data.get("lodgement_date")), + "expected_commencement_date": self._parse_hs_date(deal_data.get("expected_commencement_date")), + "surveyor": deal_data.get("surveyor"), + "confirmed_survey_date": self._parse_hs_date(deal_data.get("confirmed_survey_date")), + "confirmed_survey_time": deal_data.get("confirmed_survey_time"), }.items(): setattr(existing, attr, value or getattr(existing, attr)) @@ -288,6 +408,9 @@ class HubspotDataToDb: f"⚠️ Failed to download photo for deal_id {existing.deal_id}: {e}" ) # Continue without the file — don't crash the update + finally: + if "local_file" in locals() and os.path.exists(local_file): + os.remove(local_file) else: print(f"⚠️ Photo URL missing for deal_id {existing.deal_id}") @@ -302,6 +425,7 @@ class HubspotDataToDb: deal_id=deal_id, dealname=deal_data.get("dealname"), dealstage=deal_data.get("dealstage"), + listing_id=listing.get("listing_id"), landlord_property_id=listing.get("owner_property_id"), uprn=listing.get("national_uprn"), outcome=deal_data.get("outcome"), @@ -316,6 +440,28 @@ class HubspotDataToDb: ), coordination_status=deal_data.get("coordination_status__stage_1_"), design_status=deal_data.get("retrofit_design_status"), + pashub_link=deal_data.get("pashub_link"), + sharepoint_link=deal_data.get("sharepoint_link"), + dampmould_growth=deal_data.get("dampmould_growth"), + pre_sap=deal_data.get("pre_sap"), + coordinator=deal_data.get("coordinator"), + mtp_completion_date=self._parse_hs_date(deal_data.get("mtp_completion_date")), + mtp_re_model_completion_date=self._parse_hs_date(deal_data.get("mtp_re_model_completion_date")), + ioe_v3_completion_date=self._parse_hs_date(deal_data.get("ioe_v3_completion_date")), + proposed_measures=deal_data.get("proposed_measures"), + approved_package=deal_data.get("approved_package"), + designer=deal_data.get("designer"), + design_completion_date=self._parse_hs_date(deal_data.get("design_completion_date")), + actual_measures_installed=deal_data.get("actual_measures_installed"), + installer=deal_data.get("installer"), + installer_handover=deal_data.get("installer_handover"), + lodgement_status=deal_data.get("lodgement_status"), + measures_lodgement_date=self._parse_hs_date(deal_data.get("measures_lodgement_date")), + lodgement_date=self._parse_hs_date(deal_data.get("lodgement_date")), + expected_commencement_date=self._parse_hs_date(deal_data.get("expected_commencement_date")), + surveyor=deal_data.get("surveyor"), + confirmed_survey_date=self._parse_hs_date(deal_data.get("confirmed_survey_date")), + confirmed_survey_time=deal_data.get("confirmed_survey_time"), ) # Handle upload at insert time @@ -335,6 +481,9 @@ class HubspotDataToDb: f"⚠️ Failed to download photo for deal_id {new_record.deal_id}: {e}" ) # Continue without the file — don't crash the insert + finally: + if "local_file" in locals() and os.path.exists(local_file): + os.remove(local_file) session.add(new_record) session.commit() diff --git a/etl/hubspot/scripts/scraper/bulk_load.py b/etl/hubspot/scripts/scraper/bulk_load.py new file mode 100644 index 00000000..6fac23ea --- /dev/null +++ b/etl/hubspot/scripts/scraper/bulk_load.py @@ -0,0 +1,40 @@ +from etl.hubspot.hubspotClient import HubspotClient, Companies, Pipeline +from etl.hubspot.scripts.scraper.main import handler +from tqdm import tqdm + + +PIPELINE_ID = Pipeline.OPERATIONS_SOCIAL_HOUSING.value + +companies = list([Companies.THE_GUINESS_PARTNERSHIP, Companies.SOUTHERN_HOUSING_GROUP]) + + +def bulk_load(companies: list[Companies] | None = None) -> None: + """ + Load all deals from the given companies (defaults to all Companies enum values) + into the database, filtered to the Operations/Social Housing pipeline. + """ + hubspot = HubspotClient() + targets = companies or list(Companies) + + for company in tqdm(targets, desc="Companies", unit="co"): + company_id = company.value + deal_ids = hubspot.get_deal_ids_from_company(company_id) + + processed = 0 + with tqdm(deal_ids, desc=company.name, unit="deal", leave=False) as deal_bar: + for deal_id in deal_bar: + deal_data = hubspot.from_deal_id_get_info(deal_id) + if deal_data.get("pipeline") != PIPELINE_ID: + deal_bar.set_postfix({"status": "skip", "deal": deal_id}) + continue + + deal_bar.set_postfix({"status": "uploading", "deal": deal_id}) + handler({"hubspot_deal_id": deal_id}, context=None) + processed += 1 + deal_bar.set_postfix({"status": "done", "deal": deal_id}) + + tqdm.write(f"[{company.name}] {processed}/{len(deal_ids)} deals in pipeline") + + +if __name__ == "__main__": + bulk_load(companies) diff --git a/etl/hubspot/scripts/scraper/main.py b/etl/hubspot/scripts/scraper/main.py index 48864b22..f5afef52 100644 --- a/etl/hubspot/scripts/scraper/main.py +++ b/etl/hubspot/scripts/scraper/main.py @@ -4,7 +4,7 @@ 3) [completed] Load the db and check if upsert it into the table 4) [completed]Getting working on a AWS lambda 5) [completed] subtask and tasks history -6) [TODO]The new sexy deal properties, move it over +6) [completed]The new sexy deal properties, move it over """ from etl.hubspot.hubspotClient import HubspotClient diff --git a/infrastructure/terraform/lambda/hubspot_deal_etl/main.tf b/infrastructure/terraform/lambda/hubspot_deal_etl/main.tf index 051c7154..0f5cc6ba 100644 --- a/infrastructure/terraform/lambda/hubspot_deal_etl/main.tf +++ b/infrastructure/terraform/lambda/hubspot_deal_etl/main.tf @@ -35,10 +35,13 @@ module "hubspot_deal_etl" { LOG_LEVEL = "info" DB_USERNAME = local.db_credentials.db_assessment_model_username DB_PASSWORD = local.db_credentials.db_assessment_model_password + DB_HOST = var.db_host + DB_NAME = var.db_name + DB_PORT = var.db_port } } resource "aws_iam_role_policy_attachment" "lambda_s3_policy" { - role = module.lambda.role_name + role = module.hubspot_deal_etl.role_name policy_arn = data.terraform_remote_state.shared.outputs.hubspot_etl_s3_read_and_write_arn } \ No newline at end of file diff --git a/sfr/principal_pitch/2_export_data.py b/sfr/principal_pitch/2_export_data.py index c89560cb..fece17e0 100644 --- a/sfr/principal_pitch/2_export_data.py +++ b/sfr/principal_pitch/2_export_data.py @@ -26,13 +26,13 @@ from backend.app.db.functions.materials_functions import get_materials from collections import defaultdict from sqlalchemy import func -PORTFOLIO_ID = 639 -SCENARIOS = [1157] +PORTFOLIO_ID = 640 +SCENARIOS = [1154] scenario_names = { - 1157: "EPC C - no EWI solid floor", + 1154: "EPC - 10k Budget", } -project_name = "Instagroup Sample" +project_name = "First Charterhouse Investments" def get_data(portfolio_id, scenario_ids):