diff --git a/backend/app/db/functions/uploaded_files_functions.py b/backend/app/db/functions/uploaded_files_functions.py new file mode 100644 index 00000000..3708813a --- /dev/null +++ b/backend/app/db/functions/uploaded_files_functions.py @@ -0,0 +1,25 @@ +from typing import Optional + +from sqlalchemy import select + +from backend.app.db.connection import db_read_session +from backend.app.db.models.uploaded_file import ( + FileSourceEnum, + FileTypeEnum, + UploadedFile, +) + + +def get_uploaded_file_by_listing_type_and_source( + hubspot_listing_id: int, + file_type: FileTypeEnum, + file_source: FileSourceEnum, +) -> Optional[UploadedFile]: + with db_read_session() as session: + statement = select(UploadedFile).where( + UploadedFile.hubspot_listing_id == hubspot_listing_id, + UploadedFile.file_type == file_type, + UploadedFile.file_source == file_source, + ) + + return session.exec(statement).one_or_none() diff --git a/backend/app/db/models/uploaded_file.py b/backend/app/db/models/uploaded_file.py index 9b751d34..71763790 100644 --- a/backend/app/db/models/uploaded_file.py +++ b/backend/app/db/models/uploaded_file.py @@ -14,6 +14,8 @@ class FileTypeEnum(enum.Enum): PAR_PHOTO_PACK = "par_photo_pack" PAS_2023_PROPERTY = "pas_2023_property" PAS_2023_OCCUPANCY = "pas_2023_occupancy" + ECMK_SITE_NOTE = "ecmk_site_note" + ECMK_RD_SAP_SITE_NOTE = "ecmk_rd_sap_site_note" class FileSourceEnum(enum.Enum): @@ -37,9 +39,21 @@ class UploadedFile(Base): hubspot_listing_id = Column(BigInteger, nullable=True) file_type = Column( - SqlEnum(FileTypeEnum, name="file_type", create_type=False), nullable=True + SqlEnum( + FileTypeEnum, + name="file_type", + create_type=False, + values_callable=lambda enum_cls: [e.value for e in enum_cls], + ), + nullable=True, ) file_source = Column( - SqlEnum(FileSourceEnum, name="file_source", create_type=False), nullable=True + SqlEnum( + FileSourceEnum, + name="file_source", + create_type=False, + values_callable=lambda enum_cls: [e.value for e in enum_cls], + ), + nullable=True, ) diff --git a/backend/ecmk_fetcher/address_list.py b/backend/ecmk_fetcher/address_list.py index d273c45d..ba636a70 100644 --- a/backend/ecmk_fetcher/address_list.py +++ b/backend/ecmk_fetcher/address_list.py @@ -1,40 +1,59 @@ -from typing import Dict, Optional -from openpyxl import load_workbook import re +from dataclasses import dataclass +from typing import Any, Dict, Optional +from openpyxl import Workbook, load_workbook +from openpyxl.worksheet.worksheet import Worksheet -def extract_addresses_from_spreadsheet(filepath: str) -> Dict[str, str]: - wb = load_workbook(filepath, data_only=True) - ws = wb["Southern RA-Lite Programme 3103"] +@dataclass +class PropertyRow: + row_index: int + address: str + listing_id: str - properties: Dict[str, str] = {} - header_row = 1 - id_col_index = None - deal_name_col_index = None +def extract_addresses_from_spreadsheet( + filepath: str, +) -> Dict[str, PropertyRow]: + wb: Workbook = load_workbook(filepath, data_only=True) + ws: Worksheet = wb["Southern RA-Lite Programme 3103"] + header_row: int = 1 + id_col: Optional[int] = None + deal_name_col: Optional[int] = None + listing_id_col: Optional[int] = None + + # find columns for col in range(1, ws.max_column + 1): - value = ws.cell(row=header_row, column=col).value + raw_value: Any = ws.cell(row=header_row, column=col).value + value: str = str(raw_value).strip().lower() if raw_value else "" - if value and str(value).strip().lower() == "id": - id_col_index = col + if value == "id": + id_col = col + elif value == "deal name": + deal_name_col = col + elif value == "associated listing ids": + listing_id_col = col - if value and str(value).strip().lower() == "deal name": - deal_name_col_index = col - break + if id_col is None or deal_name_col is None or listing_id_col is None: + raise Exception("Missing required columns") - if id_col_index is None or deal_name_col_index is None: - raise Exception("Required columns not found") + properties: Dict[str, PropertyRow] = {} for row in range(2, ws.max_row + 1): - id_val = ws.cell(row=row, column=id_col_index).value - deal_name = ws.cell(row=row, column=deal_name_col_index).value + id_val: Any = ws.cell(row=row, column=id_col).value + deal_name: Any = ws.cell(row=row, column=deal_name_col).value + listing_id: Any = ws.cell(row=row, column=listing_id_col).value - if not id_val or not deal_name: + if not id_val or not deal_name or not listing_id: continue - properties[str(id_val).strip()] = extract_succinct_address( - str(deal_name).strip() + property_id: str = str(id_val).strip() + + properties[property_id] = PropertyRow( + row_index=row, + address=extract_succinct_address(str(deal_name)), + listing_id=listing_id, ) return properties diff --git a/backend/ecmk_fetcher/browser.py b/backend/ecmk_fetcher/browser.py index 6d018537..de349b92 100644 --- a/backend/ecmk_fetcher/browser.py +++ b/backend/ecmk_fetcher/browser.py @@ -50,6 +50,7 @@ def get_first_row_signature(page: Page) -> str: def go_to_next_page(page: Page) -> bool: + logger.info("Going to next page") before = get_first_row_signature(page) page.locator("#assessmentDatatable_next a").click() diff --git a/backend/ecmk_fetcher/handler/Dockerfile b/backend/ecmk_fetcher/handler/Dockerfile new file mode 100644 index 00000000..fa2126fd --- /dev/null +++ b/backend/ecmk_fetcher/handler/Dockerfile @@ -0,0 +1,26 @@ +FROM mcr.microsoft.com/playwright/python:v1.58.0-jammy + +# Install AWS Lambda RIE +ADD https://github.com/aws/aws-lambda-runtime-interface-emulator/releases/latest/download/aws-lambda-rie /usr/local/bin/aws-lambda-rie +RUN chmod +x /usr/local/bin/aws-lambda-rie + +# Set working directory (Lambda task root) +WORKDIR /var/task + +COPY backend/ecmk_fetcher/handler/requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY utils/ utils/ +COPY backend/ backend/ +COPY datatypes/ datatypes/ + +# Local lambda entrypoint +# ENTRYPOINT ["/usr/local/bin/aws-lambda-rie", "python", "-m", "awslambdaric"] + +#AWS lambda entrypoint +ENTRYPOINT ["python", "-m", "awslambdaric"] + +# ----------------------------- +# Lambda handler +# ----------------------------- +CMD ["backend.ecmk_fetcher.handler.handler.handler"] \ No newline at end of file diff --git a/backend/ecmk_fetcher/handler/handler.py b/backend/ecmk_fetcher/handler/handler.py index 4ce3a949..b777cc9f 100644 --- a/backend/ecmk_fetcher/handler/handler.py +++ b/backend/ecmk_fetcher/handler/handler.py @@ -1,9 +1,13 @@ from typing import Any, Mapping from backend.ecmk_fetcher.processor import run_job +from utils.logger import setup_logger + +logger = setup_logger() def handler(event: Mapping[str, Any], context: Any) -> None: + logger.info("Entered handler") run_job() diff --git a/backend/ecmk_fetcher/handler/requirements.txt b/backend/ecmk_fetcher/handler/requirements.txt new file mode 100644 index 00000000..2692484e --- /dev/null +++ b/backend/ecmk_fetcher/handler/requirements.txt @@ -0,0 +1,12 @@ +awslambdaric +playwright==1.58.0 +msal +openpyxl +sqlalchemy==2.0.36 +sqlmodel +pytz==2024.2 +psycopg2-binary==2.9.10 +pydantic-settings==2.6.0 +boto3==1.35.44 +pandas==2.2.2 +numpy<2.0 \ No newline at end of file diff --git a/backend/ecmk_fetcher/local_handler/docker-compose.yml b/backend/ecmk_fetcher/local_handler/docker-compose.yml new file mode 100644 index 00000000..fd642499 --- /dev/null +++ b/backend/ecmk_fetcher/local_handler/docker-compose.yml @@ -0,0 +1,11 @@ +version: "3.9" + +services: + ecmk-fetcher-lambda: + build: + context: ../../../ + dockerfile: backend/ecmk_fetcher/handler/Dockerfile + ports: + - "9000:8080" + env_file: + - ../../../.env \ No newline at end of file diff --git a/backend/ecmk_fetcher/local_handler/invoke_local_lambda.py b/backend/ecmk_fetcher/local_handler/invoke_local_lambda.py new file mode 100644 index 00000000..ba76301e --- /dev/null +++ b/backend/ecmk_fetcher/local_handler/invoke_local_lambda.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python3 +import json +import requests + +HOST = "localhost" +PORT = "9000" + +LAMBDA_URL = f"http://{HOST}:{PORT}/2015-03-31/functions/function/invocations" + +payload = { + "Records": [ + { + "body": json.dumps( + { + "test": 123456, + } + ) + } + ] +} + +response = requests.post(LAMBDA_URL, json=payload) + +print("Status code:", response.status_code) +print("Response:") +print(response.text) diff --git a/backend/ecmk_fetcher/processor.py b/backend/ecmk_fetcher/processor.py index 1852b867..2f122080 100644 --- a/backend/ecmk_fetcher/processor.py +++ b/backend/ecmk_fetcher/processor.py @@ -1,6 +1,5 @@ import os -from typing import Dict, List - +from typing import Dict from playwright.sync_api import ( sync_playwright, Locator, @@ -9,7 +8,14 @@ from playwright.sync_api import ( BrowserContext, ) -from backend.ecmk_fetcher.address_list import extract_addresses_from_spreadsheet +from backend.app.db.functions.uploaded_files_functions import ( + get_uploaded_file_by_listing_type_and_source, +) +from backend.app.db.models.uploaded_file import FileSourceEnum, FileTypeEnum +from backend.ecmk_fetcher.address_list import ( + PropertyRow, + extract_addresses_from_spreadsheet, +) from backend.ecmk_fetcher.browser import ( attach_debug_listeners, download_with_retry, @@ -18,14 +24,25 @@ from backend.ecmk_fetcher.browser import ( go_to_next_page, login, ) -from backend.ecmk_fetcher.reports import REPORT_TYPES, build_property_id -from backend.ecmk_fetcher.sharepoint import upload_file_to_sharepoint +from backend.ecmk_fetcher.reports import ( + REPORT_TYPES, + build_property_id, + map_report_type_to_db_file_type, +) +from backend.ecmk_fetcher.upload import ( + upload_file_to_s3_and_update_db, + upload_file_to_sharepoint, +) +from utils.logger import setup_logger from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient from utils.sharepoint.domna_sites import DomnaSites +logger = setup_logger() + def run_job() -> None: - username: str = "" + + username: str = "" # TODO: get from github secrets password: str = "" property_list_file: str = ( @@ -35,8 +52,7 @@ def run_job() -> None: BASE_DIR: str = os.path.dirname(__file__) filepath: str = os.path.join(BASE_DIR, property_list_file) - property_map: Dict[str, str] = extract_addresses_from_spreadsheet(filepath) - property_ids: List[str] = list(property_map.keys()) + property_map: Dict[str, PropertyRow] = extract_addresses_from_spreadsheet(filepath) sharepoint_client: DomnaSharepointClient = DomnaSharepointClient( sharepoint_location=DomnaSites.PRIVATE_PAY @@ -44,6 +60,8 @@ def run_job() -> None: sharepoint_base_path: str = "/Projects/Southern Housing/SH-SURV-26-001/Assessments" + s3_bucket: str = "retrofit-energy-assessments-dev" + with sync_playwright() as p: browser: Browser = p.chromium.launch(headless=True) context: BrowserContext = browser.new_context() @@ -79,14 +97,38 @@ def run_job() -> None: property_id: str = build_property_id(address, postcode) - if property_id not in property_ids: + property_row: PropertyRow | None = property_map.get(property_id) + + if not property_row: continue - sharepoint_address: str = property_map[property_id] + logger.info(f"Match found for property {address}") + + sharepoint_address: str = property_row.address go_to_assessment_details(page, row) for report_type in REPORT_TYPES: + hubspot_listing_id: str = property_row.listing_id + try: + db_file_type: FileTypeEnum = ( + map_report_type_to_db_file_type(report_type) + ) + + except ValueError: + logger.error( + f"Unknown report type {report_type}, skipping file" + ) + continue + + if get_uploaded_file_by_listing_type_and_source( + hubspot_listing_id=int(hubspot_listing_id), + file_type=db_file_type, + file_source=FileSourceEnum.ECMK, + ): + logger.debug("File already uploaded to s3, skipping") + continue + file_path: str | None = download_with_retry( page, report_type ) @@ -94,6 +136,10 @@ def run_job() -> None: if not file_path: continue + logger.info( + f"Successfully downloaded file {os.path.basename(file_path)} from ECMK" + ) + try: upload_file_to_sharepoint( client=sharepoint_client, @@ -101,6 +147,20 @@ def run_job() -> None: base_path=sharepoint_base_path, subpath=sharepoint_address, ) + logger.info( + f"Successfully loaded {os.path.basename(file_path)} to sharepoint for {address}" + ) + + # Upload to s3 and update db + upload_file_to_s3_and_update_db( + bucket=s3_bucket, + file_path=file_path, + hubspot_listing_id=hubspot_listing_id, + file_type=db_file_type, + ) + + except Exception: + raise finally: if os.path.exists(file_path): os.remove(file_path) diff --git a/backend/ecmk_fetcher/reports.py b/backend/ecmk_fetcher/reports.py index a8f12792..d8d11d50 100644 --- a/backend/ecmk_fetcher/reports.py +++ b/backend/ecmk_fetcher/reports.py @@ -1,5 +1,7 @@ from enum import Enum +from backend.app.db.models.uploaded_file import FileTypeEnum + class FileDownloadButtonType(Enum): ASSESSOR_HUB_SITENOTE_REPORT = 11 @@ -15,6 +17,16 @@ REPORT_TYPES = [ ] +def map_report_type_to_db_file_type(report_type: int) -> FileTypeEnum: + match report_type: + case FileDownloadButtonType.ASSESSOR_HUB_SITENOTE_REPORT.value: + return FileTypeEnum.ECMK_SITE_NOTE + case FileDownloadButtonType.SITENOTE_REPORT.value: + return FileTypeEnum.ECMK_RD_SAP_SITE_NOTE + case _: + raise ValueError("Unknown report type") + + def build_report_selector(report_type: int) -> str: return f"a.download-report-btn[data-report-type='{report_type}']" diff --git a/backend/ecmk_fetcher/sharepoint.py b/backend/ecmk_fetcher/sharepoint.py deleted file mode 100644 index 79db1294..00000000 --- a/backend/ecmk_fetcher/sharepoint.py +++ /dev/null @@ -1,20 +0,0 @@ -import os - -from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient - - -def upload_file_to_sharepoint( - client: DomnaSharepointClient, - file_path: str, - base_path: str, - subpath: str, -) -> None: - filename = os.path.basename(file_path) - - full_path = f"{base_path}/{subpath}/1. Retrofit Assessment/A. Assessment" - - client.upload_file( - file_path=file_path, - sharepoint_path=full_path, - file_name=filename, - ) diff --git a/backend/ecmk_fetcher/upload.py b/backend/ecmk_fetcher/upload.py new file mode 100644 index 00000000..0a744e53 --- /dev/null +++ b/backend/ecmk_fetcher/upload.py @@ -0,0 +1,51 @@ +from datetime import datetime, timezone +import os + +from backend.app.db.connection import db_session +from backend.app.db.models.uploaded_file import ( + FileSourceEnum, + FileTypeEnum, + UploadedFile, +) +from utils.s3 import upload_file_to_s3 +from utils.sharepoint.domna_sharepoint_client import DomnaSharepointClient + + +def upload_file_to_sharepoint( + client: DomnaSharepointClient, + file_path: str, + base_path: str, + subpath: str, +) -> None: + filename = os.path.basename(file_path) + + full_path = f"{base_path}/{subpath}/1. Retrofit Assessment/A. Assessment" + + client.upload_file( + file_path=file_path, + sharepoint_path=full_path, + file_name=filename, + ) + + +def upload_file_to_s3_and_update_db( + bucket: str, file_path: str, hubspot_listing_id: str, file_type: FileTypeEnum +) -> None: + filename: str = os.path.basename(file_path) + key: str = f"documents/hubspot_listing_id/{hubspot_listing_id}/{filename}" + + upload_file_to_s3(file_path, bucket, key) + + uploaded_file = UploadedFile( + s3_file_bucket=bucket, + s3_file_key=key, + s3_upload_timestamp=datetime.now(timezone.utc), + hubspot_listing_id=hubspot_listing_id, + file_source=FileSourceEnum.ECMK.value, + file_type=file_type.value, + ) + + with db_session() as session: + # TODO: we should do multiple files at once to reduce db trips + session.add(uploaded_file) + session.commit() diff --git a/infrastructure/terraform/lambda/ecmk_to_ara/main.tf b/infrastructure/terraform/lambda/ecmk_to_ara/main.tf new file mode 100644 index 00000000..357c2f87 --- /dev/null +++ b/infrastructure/terraform/lambda/ecmk_to_ara/main.tf @@ -0,0 +1,27 @@ +data "terraform_remote_state" "shared" { + backend = "s3" + config = { + bucket = "assessment-model-terraform-state" + key = "env:/${var.stage}/terraform.tfstate" + region = "eu-west-2" + } +} + +module "lambda" { + source = "../../modules/lambda_with_sqs" + + name = "ecmk_to_ara" #"address2uprn" for example + stage = var.stage + + image_uri = local.image_uri + + # Optional: Set maximum_concurrency to limit concurrent SQS-triggered invocations (2-1000) + maximum_concurrency = var.maximum_concurrency + + batch_size = var.batch_size + + environment = { + STAGE = var.stage + LOG_LEVEL = "info" + } +} diff --git a/infrastructure/terraform/lambda/ecmk_to_ara/provider.tf b/infrastructure/terraform/lambda/ecmk_to_ara/provider.tf new file mode 100644 index 00000000..87a94150 --- /dev/null +++ b/infrastructure/terraform/lambda/ecmk_to_ara/provider.tf @@ -0,0 +1,16 @@ +terraform { + required_providers { + aws = { + source = "hashicorp/aws" + version = ">= 5.0" + } + } + + backend "s3" { + bucket = "ecmk-to-ara-terraform-state" + key = "terraform.tfstate" + region = "eu-west-2" + } + + required_version = ">= 1.2.0" +} \ No newline at end of file diff --git a/infrastructure/terraform/lambda/ecmk_to_ara/variables.tf b/infrastructure/terraform/lambda/ecmk_to_ara/variables.tf new file mode 100644 index 00000000..984e3908 --- /dev/null +++ b/infrastructure/terraform/lambda/ecmk_to_ara/variables.tf @@ -0,0 +1,37 @@ +variable "lambda_name" { + type = string + description = "Logical name of the lambda (e.g. address2uprn)" +} + +variable "stage" { + description = "Deployment stage (e.g. dev, prod)" + type = string +} +variable "ecr_repo_url" { + type = string + description = "ECR repository URL (no tag, no digest)" +} + +variable "image_digest" { + type = string + description = "Image digest (sha256:...)" +} + +variable "maximum_concurrency" { + type = number + default = 2 + description = "Maximum number of concurrent Lambda invocations from SQS (2-1000). null = no limit." +} + +variable "batch_size" { + type = number + default = 1 +} + +locals { + image_uri = "${var.ecr_repo_url}@${var.image_digest}" +} + +output "resolved_image_uri" { + value = local.image_uri +} diff --git a/infrastructure/terraform/shared/main.tf b/infrastructure/terraform/shared/main.tf index 9d272eb6..47866c92 100644 --- a/infrastructure/terraform/shared/main.tf +++ b/infrastructure/terraform/shared/main.tf @@ -538,6 +538,20 @@ module "pashub_to_ara_registry" { stage = var.stage } +################################################ +# ECMK to Ara – Lambda +################################################ +module "ecmk_to_ara_state_bucket" { + source = "../modules/tf_state_bucket" + bucket_name = "ecmk-to-ara-terraform-state" +} + +module "ecmk_to_ara_registry" { + source = "../modules/container_registry" + name = "ecmk_to_ara" + stage = var.stage +} + ################################################ # Engine – Lambda ECR ################################################