diff --git a/.devcontainer/backend/Dockerfile b/.devcontainer/backend/Dockerfile index 6a1cc120..a92d37f6 100644 --- a/.devcontainer/backend/Dockerfile +++ b/.devcontainer/backend/Dockerfile @@ -1,15 +1,23 @@ -FROM python:3.11.10-bullseye +FROM python:3.11.10-bookworm ARG USER=vscode +ARG USER_UID=1000 +ARG USER_GID=1000 ARG DEBIAN_FRONTEND=noninteractive -# 1) Toolchain + utilities for building libpostal +# 1) Toolchain + utilities for building libpostal, plus LazyVim deps RUN apt-get update && apt-get install -y --no-install-recommends \ - sudo jq vim curl git ca-certificates \ + sudo jq vim curl git ca-certificates wget \ build-essential pkg-config automake autoconf libtool \ + ripgrep fd-find make unzip \ && rm -rf /var/lib/apt/lists/* +# Neovim latest (LazyVim needs >=0.9) +RUN curl -fsSL https://github.com/neovim/neovim/releases/latest/download/nvim-linux-x86_64.tar.gz \ + | tar -xz -C /opt \ + && ln -s /opt/nvim-linux-x86_64/bin/nvim /usr/local/bin/nvim + # # 2) Build and install libpostal from source # RUN git clone --depth 1 https://github.com/openvenues/libpostal /tmp/libpostal \ # && cd /tmp/libpostal \ @@ -21,7 +29,8 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ # && rm -rf /tmp/libpostal # 3) Create the user and grant sudo privileges -RUN useradd -m -s /usr/bin/bash ${USER} \ +RUN groupadd -g ${USER_GID} ${USER} \ + && useradd -m -u ${USER_UID} -g ${USER_GID} -s /usr/bin/bash ${USER} \ && echo "${USER} ALL=(ALL) NOPASSWD: ALL" >/etc/sudoers.d/${USER} \ && chmod 0440 /etc/sudoers.d/${USER} @@ -64,4 +73,23 @@ RUN apt install -y wget gnupg2 lsb-release RUN echo "deb http://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" | sudo tee /etc/apt/sources.list.d/pgdg.list RUN wget -qO - https://www.postgresql.org/media/keys/ACCC4CF8.asc | sudo apt-key add - RUN apt update -RUN apt install -y postgresql-14 \ No newline at end of file +RUN apt install -y postgresql-14 + +# Install Node.js + backlog.md +RUN curl -fsSL https://deb.nodesource.com/setup_20.x | bash - \ + && apt-get install -y nodejs \ + && npm install -g backlog.md \ + && rm -rf /var/lib/apt/lists/* + +USER ${USER} + +# Bootstrap LazyVim starter config +RUN git clone https://github.com/LazyVim/starter /home/${USER}/.config/nvim \ + && rm -rf /home/${USER}/.config/nvim/.git +# Install Claude +RUN curl -fsSL https://claude.ai/install.sh | bash \ + && export PATH="/home/${USER}/.local/bin:${PATH}" \ + && claude plugin marketplace add JuliusBrussee/caveman \ + && claude plugin install caveman@caveman +ENV PATH="/home/vscode/.local/bin:${PATH}" +USER root diff --git a/.devcontainer/backend/devcontainer.json b/.devcontainer/backend/devcontainer.json index 48a58bd6..a9b7352a 100644 --- a/.devcontainer/backend/devcontainer.json +++ b/.devcontainer/backend/devcontainer.json @@ -6,7 +6,7 @@ "workspaceFolder": "/workspaces/model", "postStartCommand": "bash .devcontainer/backend/post-install.sh", "mounts": [ - // "source=${localEnv:HOME},target=/home/vscode,type=bind", + "source=${localEnv:HOME},target=/workspaces/home,type=bind", "source=${localEnv:HOME}/.aws,target=/home/vscode/.aws,type=bind,consistency=cached" ], "customizations": { @@ -43,6 +43,17 @@ }, "containerEnv": { "PYTHONFLAGS": "-Xfrozen_modules=off" + }, + "forwardPorts": [6421, 8000], + "portsAttributes": { + "6421": { + "label": "Backlog.md", + "onAutoForward": "notify" + }, + "8000": { + "label": "FastAPI", + "onAutoForward": "notify" + } } } \ No newline at end of file diff --git a/.devcontainer/backend/docker-compose.yml b/.devcontainer/backend/docker-compose.yml index 683b4489..757cfbe0 100644 --- a/.devcontainer/backend/docker-compose.yml +++ b/.devcontainer/backend/docker-compose.yml @@ -2,13 +2,23 @@ version: '3.8' services: model-backend: - user: "${UID}:${GID}" build: context: ../.. dockerfile: .devcontainer/backend/Dockerfile + args: + USER_UID: ${UID:-1000} + USER_GID: ${GID:-1000} command: sleep infinity + ports: + - "8000:8000" volumes: - ../../:/workspaces/model + - ~/.gitconfig:/home/vscode/.gitconfig:ro + environment: + - SSH_AUTH_SOCK=${SSH_AUTH_SOCK:-} + networks: + - backend-net + - shared-dev db: @@ -22,7 +32,15 @@ services: - POSTGRES_PASSWORD=makingwarmerhomes volumes: - postgres-data-two:/var/lib/postgresql/data + networks: + - backend-net +networks: + backend-net: + driver: bridge + shared-dev: + external: true + volumes: postgres-data-two: \ No newline at end of file diff --git a/.github/workflows/deploy_terraform.yml b/.github/workflows/deploy_terraform.yml index 22f16fee..398232c6 100644 --- a/.github/workflows/deploy_terraform.yml +++ b/.github/workflows/deploy_terraform.yml @@ -201,6 +201,46 @@ jobs: AWS_SECRET_ACCESS_KEY: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY }} AWS_REGION: ${{ secrets.DEV_AWS_REGION }} + # ============================================================ + # Build Bulk Address2UPRN Combiner image and Push + # ============================================================ + bulk_address2uprn_combiner_image: + needs: [determine_stage, shared_terraform] + uses: ./.github/workflows/_build_image.yml + with: + ecr_repo: bulk_address2uprn_combiner-${{ needs.determine_stage.outputs.stage }} + dockerfile_path: backend/bulk_address2uprn_combiner/handler/Dockerfile + build_context: . + build_args: | + DEV_DB_HOST=$DEV_DB_HOST + DEV_DB_PORT=$DEV_DB_PORT + DEV_DB_NAME=$DEV_DB_NAME + secrets: + AWS_ACCESS_KEY_ID: ${{ secrets.DEV_AWS_ACCESS_KEY_ID }} + AWS_SECRET_ACCESS_KEY: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY }} + AWS_REGION: ${{ secrets.DEV_AWS_REGION }} + DEV_DB_HOST: ${{ secrets.DEV_DB_HOST }} + DEV_DB_PORT: ${{ secrets.DEV_DB_PORT }} + DEV_DB_NAME: ${{ secrets.DEV_DB_NAME }} + + # ============================================================ + # Deploy Bulk Address2UPRN Combiner Lambda + # ============================================================ + bulk_address2uprn_combiner_lambda: + needs: [bulk_address2uprn_combiner_image, determine_stage, shared_terraform] + uses: ./.github/workflows/_deploy_lambda.yml + with: + lambda_name: bulk_address2uprn_combiner + lambda_path: infrastructure/terraform/lambda/bulk_address2uprn_combiner + stage: ${{ needs.determine_stage.outputs.stage }} + ecr_repo: bulk_address2uprn_combiner-${{ needs.determine_stage.outputs.stage }} + image_digest: ${{ needs.bulk_address2uprn_combiner_image.outputs.image_digest }} + terraform_apply: ${{ needs.determine_stage.outputs.terraform_apply }} + secrets: + AWS_ACCESS_KEY_ID: ${{ secrets.DEV_AWS_ACCESS_KEY_ID }} + AWS_SECRET_ACCESS_KEY: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY }} + AWS_REGION: ${{ secrets.DEV_AWS_REGION }} + # ============================================================ # Condition ETL image and Push # ============================================================ @@ -332,17 +372,10 @@ jobs: ecr_repo: pashub_to_ara-${{ needs.determine_stage.outputs.stage }} dockerfile_path: backend/pashub_fetcher/handler/Dockerfile build_context: . - build_args: | - DEV_DB_HOST=$DEV_DB_HOST - DEV_DB_PORT=$DEV_DB_PORT - DEV_DB_NAME=$DEV_DB_NAME secrets: AWS_ACCESS_KEY_ID: ${{ secrets.DEV_AWS_ACCESS_KEY_ID }} AWS_SECRET_ACCESS_KEY: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY }} AWS_REGION: ${{ secrets.DEV_AWS_REGION }} - DEV_DB_HOST: ${{ secrets.DEV_DB_HOST }} - DEV_DB_PORT: ${{ secrets.DEV_DB_PORT }} - DEV_DB_NAME: ${{ secrets.DEV_DB_NAME }} # ============================================================ @@ -362,6 +395,9 @@ jobs: AWS_ACCESS_KEY_ID: ${{ secrets.DEV_AWS_ACCESS_KEY_ID }} AWS_SECRET_ACCESS_KEY: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY }} AWS_REGION: ${{ secrets.DEV_AWS_REGION }} + TF_VAR_db_host: ${{ secrets.DEV_DB_HOST }} + TF_VAR_db_name: ${{ secrets.DEV_DB_NAME }} + TF_VAR_db_port: ${{ secrets.DEV_DB_PORT }} TF_VAR_sharepoint_client_id: ${{ secrets.SHAREPOINT_CLIENT_ID }} TF_VAR_sharepoint_client_secret: ${{ secrets.SHAREPOINT_CLIENT_SECRET }} TF_VAR_sharepoint_tenant_id: ${{ secrets.SHAREPOINT_TENANT_ID }} @@ -377,7 +413,7 @@ jobs: # Deploy FastAPI Lambda # ============================================================ fast_api_lambda: - needs: [determine_stage, ara_engine_lambda, categorisation_lambda] + needs: [determine_stage, ara_engine_lambda, categorisation_lambda, postcodeSplitter_lambda, bulk_address2uprn_combiner_lambda] uses: ./.github/workflows/_deploy_lambda.yml with: lambda_name: ara_fast_api diff --git a/.gitignore b/.gitignore index 299e03d4..ee5aa8ab 100644 --- a/.gitignore +++ b/.gitignore @@ -278,6 +278,11 @@ cache/ *.png *.pptx +*.csv +*.xlsx +*.pdf +**/Chunks/ +*.ipynb local_data* @@ -285,4 +290,5 @@ local_data* pyrightconfig.json # playwright output -*/pashub_fetcher/videos/* \ No newline at end of file +*/pashub_fetcher/videos/* +backlog/* diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 00000000..aa0426a0 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,29 @@ + + + + + +## BACKLOG WORKFLOW INSTRUCTIONS + +This project uses Backlog.md MCP for all task and project management activities. + +**CRITICAL GUIDANCE** + +- If your client supports MCP resources, read `backlog://workflow/overview` to understand when and how to use Backlog for this project. +- If your client only supports tools or the above request fails, call `backlog.get_backlog_instructions()` to load the tool-oriented overview. Use the `instruction` selector when you need `task-creation`, `task-execution`, or `task-finalization`. + +- **First time working here?** Read the overview resource IMMEDIATELY to learn the workflow +- **Already familiar?** You should have the overview cached ("## Backlog.md Overview (MCP)") +- **When to read it**: BEFORE creating tasks, or when you're unsure whether to track work + +These guides cover: +- Decision framework for when to create tasks +- Search-first workflow to avoid duplicates +- Links to detailed guides for task creation, execution, and finalization +- MCP tools reference + +You MUST read the overview resource to understand the complete workflow. The information is NOT summarized here. + + + + diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 00000000..de2917f2 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,30 @@ + + + + + +## BACKLOG WORKFLOW INSTRUCTIONS + +This project uses Backlog.md MCP for all task and project management activities. + +**CRITICAL GUIDANCE** + +- If your client supports MCP resources, read `backlog://workflow/overview` to understand when and how to use Backlog for this project. +- If your client only supports tools or the above request fails, call `backlog.get_backlog_instructions()` to load the tool-oriented overview. Use the `instruction` selector when you need `task-creation`, `task-execution`, or `task-finalization`. + +- **First time working here?** Read the overview resource IMMEDIATELY to learn the workflow +- **Already familiar?** You should have the overview cached ("## Backlog.md Overview (MCP)") +- **When to read it**: BEFORE creating tasks, or when you're unsure whether to track work + +These guides cover: +- Decision framework for when to create tasks +- Search-first workflow to avoid duplicates +- Links to detailed guides for task creation, execution, and finalization +- MCP tools reference + +You MUST read the overview resource to understand the complete workflow. The information is NOT summarized here. + + + + + diff --git a/asset_list/app.py b/asset_list/app.py index b0030667..49ec48a0 100644 --- a/asset_list/app.py +++ b/asset_list/app.py @@ -74,23 +74,23 @@ def app(): """ data_folder = "/workspaces/model/asset_list" - data_filename = "Waverley UPRN Match.xlsx" + data_filename = "2026-04-22T08_22_00.779745_61049fd3.xlsx" sheet_name = "in" postcode_column = "postcode_clean" - address1_column = "domna_found_address" + address1_column = "address2uprn_address" address1_method = None - fulladdress_column = "domna_found_address" + fulladdress_column = "address2uprn_address" address_cols_to_concat = [] missing_postcodes_method = None landlord_year_built = None - landlord_os_uprn = "domna_found_uprn" - landlord_property_type = "Property Type 1" # Good to include if landlord gave - landlord_built_form = None # Good to include if landlord gave + landlord_os_uprn = "address2uprn_uprn" + landlord_property_type = "Property Type" # Good to include if landlord gave + landlord_built_form = "Built Form" # Good to include if landlord gave landlord_wall_construction = None landlord_roof_construction = None landlord_heating_system = None landlord_existing_pv = None - landlord_property_id = "WBC Ref" + landlord_property_id = "UPRN" landlord_sap = None outcomes_filename = None outcomes_sheetname = None diff --git a/backend/address2UPRN/main.py b/backend/address2UPRN/main.py index 79c0de69..28ad344f 100644 --- a/backend/address2UPRN/main.py +++ b/backend/address2UPRN/main.py @@ -563,14 +563,14 @@ def handler(event, context, local=False): except Exception as s3_error: logger.error(f"Failed to save results to S3: {s3_error}") - # Mark subtask as completed + # Mark subtask as complete try: subtask_interface.update_subtask_status( subtask_id, - "completed", + "complete", outputs={"rows_processed": "todo -> show sensible output"}, ) - logger.info(f"Marked subtask {subtask_id} as completed") + logger.info(f"Marked subtask {subtask_id} as complete") except Exception as db_error: logger.error(f"Failed to mark subtask as completed: {db_error}") diff --git a/backend/app/bulk_uploads/__init__.py b/backend/app/bulk_uploads/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/backend/app/bulk_uploads/router.py b/backend/app/bulk_uploads/router.py new file mode 100644 index 00000000..9928b456 --- /dev/null +++ b/backend/app/bulk_uploads/router.py @@ -0,0 +1,178 @@ +import boto3 +from collections import Counter +from uuid import UUID +from fastapi import APIRouter, Depends, HTTPException, Query +from sqlmodel import select + +from backend.app.dependencies import validate_token +from backend.app.config import get_settings +from backend.app.db.connection import get_db_session +from backend.app.db.models.bulk_address_uploads import BulkAddressUpload +from backend.app.bulk_uploads.schema import ( + CombinedResultRow, + CombinedResultsResponse, + CombinerTriggerRequest, + FlagsSummary, + PostcodeSplitterTriggerRequest, +) +from backend.app.bulk_uploads.scoring import score_bucket + +ADDRESS_COLS = ("Address 1", "Address 2", "Address 3", "postcode") +INTERNAL_REF_COL = "Internal Reference" +UPRN_COL = "address2uprn_uprn" +MATCHED_ADDRESS_COL = "address2uprn_address" +LEXISCORE_COL = "address2uprn_lexiscore" +MISSING_SENTINEL = "invalid postcode" + + +def _normalize(value) -> str: + if value is None: + return "" + return str(value).strip() + + +def _is_missing_uprn(uprn: str) -> bool: + return uprn == "" or uprn.lower() == MISSING_SENTINEL + + +def _parse_lexiscore(raw) -> float | None: + val = _normalize(raw) + if not val or val.lower() == MISSING_SENTINEL: + return None + try: + return float(val) + except ValueError: + return None + + +router = APIRouter( + prefix="/bulk-uploads", + tags=["bulk-uploads"], + dependencies=[Depends(validate_token)], +) + + +@router.post("/trigger-postcode-splitter", status_code=202) +async def trigger_postcode_splitter(req: PostcodeSplitterTriggerRequest): + settings = get_settings() + + try: + sqs = boto3.client("sqs", settings.AWS_DEFAULT_REGION) + response = sqs.send_message( + QueueUrl=settings.POSTCODE_SPLITTER_SQS_URL, + MessageBody=req.model_dump_json(), + ) + except Exception as e: + raise HTTPException(status_code=500, detail=f"SQS error: {e}") + + return { + "task_id": req.task_id, + "sub_task_id": req.sub_task_id, + "sqs_message_id": response.get("MessageId"), + } + + +@router.post("/trigger-combiner", status_code=202) +async def trigger_combiner(req: CombinerTriggerRequest): + settings = get_settings() + + try: + sqs = boto3.client("sqs", settings.AWS_DEFAULT_REGION) + response = sqs.send_message( + QueueUrl=settings.COMBINER_SQS_URL, + MessageBody=req.model_dump_json(), + ) + except Exception as e: + raise HTTPException(status_code=500, detail=f"SQS error: {e}") + + return { + "task_id": req.task_id, + "sub_task_id": req.sub_task_id, + "sqs_message_id": response.get("MessageId"), + } + + +@router.get("/{task_id}/combined-results", response_model=CombinedResultsResponse) +async def get_combined_results( + task_id: UUID, + offset: int = Query(0, ge=0), + limit: int = Query(500, ge=1, le=5000), +): + with get_db_session() as session: + upload = session.exec( + select(BulkAddressUpload).where(BulkAddressUpload.task_id == task_id) + ).first() + + if upload is None: + raise HTTPException(status_code=404, detail="Upload not found for task_id") + if not upload.combined_output_s3_uri: + raise HTTPException(status_code=409, detail="Combiner not finished") + + bucket, key = parse_s3_uri(upload.combined_output_s3_uri) + try: + raw_rows = read_csv_from_s3(bucket, key) + except Exception as e: + raise HTTPException(status_code=502, detail=f"Failed to read combined CSV: {e}") + + uprn_values = [_normalize(r.get(UPRN_COL)) for r in raw_rows] + uprn_counts = Counter(u for u in uprn_values if not _is_missing_uprn(u)) + duplicate_uprns = {u for u, count in uprn_counts.items() if count >= 2} + + missing_count = sum(1 for u in uprn_values if _is_missing_uprn(u)) + duplicate_count = sum(1 for u in uprn_values if u in duplicate_uprns) + matched_count = len(raw_rows) - missing_count + + page = raw_rows[offset : offset + limit] + rows: list[CombinedResultRow] = [] + for i, raw in enumerate(page): + absolute_index = offset + i + address_parts = [ + _normalize(raw.get(col)) for col in ADDRESS_COLS if _normalize(raw.get(col)) + ] + input_address = ", ".join(address_parts) + internal_ref = _normalize(raw.get(INTERNAL_REF_COL)) or None + + uprn_raw = _normalize(raw.get(UPRN_COL)) + uprn = None if _is_missing_uprn(uprn_raw) else uprn_raw + + matched_address_raw = _normalize(raw.get(MATCHED_ADDRESS_COL)) + matched_address = ( + None + if not matched_address_raw + or matched_address_raw.lower() == MISSING_SENTINEL + else matched_address_raw + ) + + lexiscore = _parse_lexiscore(raw.get(LEXISCORE_COL)) + + flags: list[str] = [] + if uprn is None: + flags.append("missing") + elif uprn in duplicate_uprns: + flags.append("duplicate") + + rows.append( + CombinedResultRow( + row_index=absolute_index, + input_address=input_address, + internal_reference=internal_ref, + uprn=uprn, + matched_address=matched_address, + lexiscore=lexiscore, + score_bucket=score_bucket(lexiscore), + flags=flags, + ) + ) + + return CombinedResultsResponse( + task_id=str(task_id), + total=len(raw_rows), + offset=offset, + limit=limit, + flags_summary=FlagsSummary( + duplicates=duplicate_count, + missing=missing_count, + matched=matched_count, + ), + rows=rows, + ) diff --git a/backend/app/bulk_uploads/schema.py b/backend/app/bulk_uploads/schema.py new file mode 100644 index 00000000..ca3b39ea --- /dev/null +++ b/backend/app/bulk_uploads/schema.py @@ -0,0 +1,40 @@ +from typing import List, Literal, Optional + +from pydantic import BaseModel + + +class PostcodeSplitterTriggerRequest(BaseModel): + task_id: str + sub_task_id: str + s3_uri: str + + +class CombinerTriggerRequest(BaseModel): + task_id: str + sub_task_id: str + + +class FlagsSummary(BaseModel): + duplicates: int + missing: int + matched: int + + +class CombinedResultRow(BaseModel): + row_index: int + input_address: str + internal_reference: Optional[str] = None + uprn: Optional[str] = None + matched_address: Optional[str] = None + lexiscore: Optional[float] = None + score_bucket: Optional[Literal["high", "med", "low"]] = None + flags: List[Literal["duplicate", "missing"]] = [] + + +class CombinedResultsResponse(BaseModel): + task_id: str + total: int + offset: int + limit: int + flags_summary: FlagsSummary + rows: List[CombinedResultRow] diff --git a/backend/app/bulk_uploads/scoring.py b/backend/app/bulk_uploads/scoring.py new file mode 100644 index 00000000..5ed946fb --- /dev/null +++ b/backend/app/bulk_uploads/scoring.py @@ -0,0 +1,14 @@ +from typing import Optional + +HIGH_THRESHOLD = 0.85 +MED_THRESHOLD = 0.65 + + +def score_bucket(score: Optional[float]) -> Optional[str]: + if score is None: + return None + if score >= HIGH_THRESHOLD: + return "high" + if score >= MED_THRESHOLD: + return "med" + return "low" diff --git a/backend/app/config.py b/backend/app/config.py index 9532ddd6..70a6b50c 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -39,6 +39,8 @@ class Settings(BaseSettings): ENGINE_SQS_URL: str = "changeme" CATEGORISATION_SQS_URL: str = "changeme" PASHUB_TO_ARA_SQS_URL: str = "changeme" + POSTCODE_SPLITTER_SQS_URL: str = "changeme" + COMBINER_SQS_URL: str = "changeme" # Third parties EPC_AUTH_TOKEN: str = "changeme" diff --git a/backend/app/db/functions/bulk_address_uploads_functions.py b/backend/app/db/functions/bulk_address_uploads_functions.py new file mode 100644 index 00000000..6252fe20 --- /dev/null +++ b/backend/app/db/functions/bulk_address_uploads_functions.py @@ -0,0 +1,36 @@ +from uuid import UUID +from datetime import datetime, timezone + +from sqlmodel import select + +from backend.app.db.connection import get_db_session +from backend.app.db.models.bulk_address_uploads import BulkAddressUpload + + +def set_combining_status(task_id: UUID) -> None: + now = datetime.now(timezone.utc) + with get_db_session() as session: + row = session.exec( + select(BulkAddressUpload).where(BulkAddressUpload.task_id == task_id) + ).first() + if not row: + raise ValueError(f"No bulk_address_uploads row for task_id {task_id}") + row.status = "combining" + row.updated_at = now + session.add(row) + session.commit() + + +def set_combined_output_s3_uri(task_id: UUID, s3_uri: str) -> None: + now = datetime.now(timezone.utc) + with get_db_session() as session: + row = session.exec( + select(BulkAddressUpload).where(BulkAddressUpload.task_id == task_id) + ).first() + if not row: + raise ValueError(f"No bulk_address_uploads row for task_id {task_id}") + row.combined_output_s3_uri = s3_uri + row.status = "awaiting_review" + row.updated_at = now + session.add(row) + session.commit() diff --git a/backend/app/db/models/bulk_address_uploads.py b/backend/app/db/models/bulk_address_uploads.py new file mode 100644 index 00000000..0d87ff47 --- /dev/null +++ b/backend/app/db/models/bulk_address_uploads.py @@ -0,0 +1,24 @@ +from typing import Optional +from uuid import UUID, uuid4 +from datetime import datetime + +from sqlalchemy import Column +from sqlalchemy.dialects.postgresql import JSONB +from sqlmodel import SQLModel, Field + + +class BulkAddressUpload(SQLModel, table=True): + __tablename__ = "bulk_address_uploads" + + id: UUID = Field(default_factory=uuid4, primary_key=True, index=True) + portfolio_id: str = Field(nullable=False) + user_id: str = Field(nullable=False) + s3_bucket: str = Field(nullable=False) + s3_key: str = Field(nullable=False) + filename: str = Field(nullable=False) + status: str = Field(default="ready_for_processing", nullable=False) + column_mapping: Optional[dict] = Field(default=None, sa_column=Column(JSONB)) + task_id: Optional[UUID] = Field(default=None) + combined_output_s3_uri: Optional[str] = Field(default=None) + created_at: datetime = Field(default_factory=datetime.utcnow) + updated_at: datetime = Field(default_factory=datetime.utcnow) diff --git a/backend/app/db/models/hubspot_deal_data.py b/backend/app/db/models/hubspot_deal_data.py index 758f688d..fa508fbe 100644 --- a/backend/app/db/models/hubspot_deal_data.py +++ b/backend/app/db/models/hubspot_deal_data.py @@ -38,6 +38,14 @@ class HubspotDealData(SQLModel, table=True): dampmould_growth: Optional[str] = Field(default=None) damp_mould_and_repairs_comments: Optional[str] = Field(default=None) pre_sap: Optional[str] = Field(default=None) + batch: Optional[str] = Field(default=None) + block_reference: Optional[str] = Field(default=None) + epc_prn: Optional[str] = Field(default=None) + potential_post_sap_score_dropdown: Optional[str] = Field(default=None) + ei_score: Optional[str] = Field(default=None) + ei_score__potential_: Optional[str] = Field(default=None) + epc_sap_score: Optional[str] = Field(default=None) + epc_sap_score__potential_: Optional[str] = Field(default=None) coordinator: Optional[str] = Field(default=None) mtp_completion_date: Optional[datetime] = Field(default=None) mtp_re_model_completion_date: Optional[datetime] = Field(default=None) diff --git a/backend/app/db/models/uploaded_file.py b/backend/app/db/models/uploaded_file.py index 5b34a752..a516a1df 100644 --- a/backend/app/db/models/uploaded_file.py +++ b/backend/app/db/models/uploaded_file.py @@ -38,6 +38,7 @@ class UploadedFile(Base): landlord_property_id = Column(Text, nullable=True) uprn = Column(BigInteger, nullable=True) hubspot_listing_id = Column(BigInteger, nullable=True) + hubspot_deal_id = Column(Text, nullable=True) file_type = Column( SqlEnum( diff --git a/backend/app/main.py b/backend/app/main.py index f0ab4d86..c9733c18 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -9,6 +9,7 @@ from backend.app.portfolio import router as portfolio_router from backend.app.whlg import router as whlg_router from backend.app.plan import router as plan_router from backend.app.tasks import router as tasks_router +from backend.app.bulk_uploads import router as bulk_uploads_router from backend.app.dependencies import validate_api_key from backend.app.config import get_settings @@ -59,6 +60,7 @@ app.include_router(portfolio_router.router, prefix="/v1") app.include_router(plan_router.router, prefix="/v1") app.include_router(whlg_router.router, prefix="/v1") app.include_router(tasks_router.router, prefix="/v1") +app.include_router(bulk_uploads_router.router, prefix="/v1") if get_settings().ENVIRONMENT == "local": from app.local import router as local_router @@ -75,6 +77,7 @@ from mangum import Mangum from backend.app.portfolio import router as portfolio_router from backend.app.whlg import router as whlg_router from backend.app.plan import router as plan_router +from backend.app.bulk_uploads import router as bulk_uploads_router from backend.app.dependencies import validate_api_key from backend.app.config import get_settings @@ -124,6 +127,7 @@ async def log_requests(request: Request, call_next): app.include_router(portfolio_router.router, prefix="/v1") app.include_router(plan_router.router, prefix="/v1") app.include_router(whlg_router.router, prefix="/v1") +app.include_router(bulk_uploads_router.router, prefix="/v1") if get_settings().ENVIRONMENT == "local": from app.local import router as local_router diff --git a/backend/bulk_address2uprn_combiner/__init__.py b/backend/bulk_address2uprn_combiner/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/backend/bulk_address2uprn_combiner/handler/Dockerfile b/backend/bulk_address2uprn_combiner/handler/Dockerfile new file mode 100644 index 00000000..35f91d09 --- /dev/null +++ b/backend/bulk_address2uprn_combiner/handler/Dockerfile @@ -0,0 +1,23 @@ +FROM public.ecr.aws/lambda/python:3.11 + +ARG DEV_DB_HOST +ARG DEV_DB_PORT +ARG DEV_DB_NAME + +ENV DB_HOST=${DEV_DB_HOST} +ENV DB_PORT=${DEV_DB_PORT} +ENV DB_NAME=${DEV_DB_NAME} + +WORKDIR /var/task + +COPY backend/bulk_address2uprn_combiner/handler/requirements.txt . + +RUN pip install --no-cache-dir -r requirements.txt + +COPY utils/ utils/ +COPY backend/ backend/ +COPY datatypes/ datatypes/ + +COPY backend/bulk_address2uprn_combiner/main.py . + +CMD ["main.handler"] diff --git a/backend/bulk_address2uprn_combiner/handler/requirements.txt b/backend/bulk_address2uprn_combiner/handler/requirements.txt new file mode 100644 index 00000000..14d13e0d --- /dev/null +++ b/backend/bulk_address2uprn_combiner/handler/requirements.txt @@ -0,0 +1,7 @@ +pandas==2.2.2 +numpy<2.0 +boto3==1.35.44 +sqlmodel +sqlalchemy==2.0.36 +psycopg2-binary==2.9.10 +pydantic-settings==2.6.0 diff --git a/backend/bulk_address2uprn_combiner/main.py b/backend/bulk_address2uprn_combiner/main.py new file mode 100644 index 00000000..44f0b3f9 --- /dev/null +++ b/backend/bulk_address2uprn_combiner/main.py @@ -0,0 +1,79 @@ +import os +import boto3 +import pandas as pd +from io import BytesIO +from typing import Any +from uuid import UUID +from datetime import datetime, timezone + +from utils.logger import setup_logger +from backend.utils.subtasks import subtask_handler +from backend.app.db.functions.bulk_address_uploads_functions import ( + set_combined_output_s3_uri, + set_combining_status, +) + +logger = setup_logger() + +S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME") + + +def list_csv_files(s3_client, bucket: str, task_id: str) -> list[str]: + paginator = s3_client.get_paginator("list_objects_v2") + prefix = f"ara_raw_outputs/{task_id}/" + keys = [] + for page in paginator.paginate(Bucket=bucket, Prefix=prefix): + for obj in page.get("Contents", []): + if obj["Key"].endswith(".csv"): + keys.append(obj["Key"]) + return keys + + +def download_csv(s3_client, bucket: str, key: str) -> pd.DataFrame: + obj = s3_client.get_object(Bucket=bucket, Key=key) + return pd.read_csv(BytesIO(obj["Body"].read())) + + +@subtask_handler() +def handler(body: dict[str, Any], context: Any) -> str: + task_id_str: str = body.get("task_id", "") + + if not task_id_str: + raise RuntimeError("Missing task_id in message body") + + set_combining_status(UUID(task_id_str)) + + bucket = S3_BUCKET_NAME + if not bucket: + raise RuntimeError("S3_BUCKET_NAME env var not set") + + s3 = boto3.client("s3") + + logger.info(f"Combining ara_raw_outputs for task {task_id_str}") + + csv_keys = list_csv_files(s3, bucket, task_id_str) + if not csv_keys: + raise RuntimeError(f"No CSV files found under ara_raw_outputs/{task_id_str}/") + + logger.info(f"Found {len(csv_keys)} CSV files") + + dfs = [download_csv(s3, bucket, key) for key in csv_keys] + combined = pd.concat(dfs, ignore_index=True) + logger.info(f"Combined {len(combined)} rows from {len(dfs)} files") + + timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H-%M-%S") + output_key = f"bulk_final_outputs/{task_id_str}/combined_{timestamp}.csv" + + csv_buffer = BytesIO() + combined.to_csv(csv_buffer, index=False) + csv_buffer.seek(0) + s3.put_object(Bucket=bucket, Key=output_key, Body=csv_buffer.getvalue()) + + s3_uri = f"s3://{bucket}/{output_key}" + logger.info(f"Saved combined CSV to {s3_uri}") + print(f"OUTPUT_S3_URI: {s3_uri}") + + set_combined_output_s3_uri(UUID(task_id_str), s3_uri) + logger.info(f"Persisted combined_output_s3_uri + awaiting_review status for task {task_id_str}") + + return s3_uri diff --git a/backend/ecmk_fetcher/upload.py b/backend/ecmk_fetcher/upload.py index 8cb451b0..cc2c908d 100644 --- a/backend/ecmk_fetcher/upload.py +++ b/backend/ecmk_fetcher/upload.py @@ -40,6 +40,7 @@ def upload_excel_to_sharepoint( ) +# TODO: this should be moved to somewhere common and called by pashub fetcher def upload_file_to_s3_and_update_db( bucket: str, file_path: str, hubspot_listing_id: str, file_type: FileTypeEnum ) -> None: diff --git a/backend/export/property_scenarios/main.py b/backend/export/property_scenarios/main.py index f3ea0100..64627e01 100644 --- a/backend/export/property_scenarios/main.py +++ b/backend/export/property_scenarios/main.py @@ -26,15 +26,14 @@ def has_solar_with_battery(materials_list: Optional[List[Dict[str, Any]]]) -> bo :return: """ for m in materials_list or []: - if ( - m.get("type") == "solar_pv" - and m.get("includes_battery") is True - ): + if m.get("type") == "solar_pv" and m.get("includes_battery") is True: return True return False -def process_export(payload: ExportRequest, session: Session) -> Dict[Union[str, int], pd.DataFrame]: +def process_export( + payload: ExportRequest, session: Session +) -> Dict[Union[str, int], pd.DataFrame]: export_files: Dict[Union[str, int], pd.DataFrame] = {} db_methods = DbMethods(session) @@ -52,7 +51,9 @@ def process_export(payload: ExportRequest, session: Session) -> Dict[Union[str, logger.info("Retrieved %s plans for export", len(plans_df)) if plans_df.empty: - logger.info("Empty plans dataframe - no plans to export. Returning empty export.") + logger.info( + "Empty plans dataframe - no plans to export. Returning empty export." + ) return export_files plan_ids: List[int] = plans_df["id"].tolist() recommendations_df: pd.DataFrame = db_methods.get_recommendations(plan_ids) @@ -61,13 +62,12 @@ def process_export(payload: ExportRequest, session: Session) -> Dict[Union[str, recommendations_df = db_methods.attach_materials(recommendations_df) - recommendations_df["has_solar_with_battery"] = ( - recommendations_df["materials"].apply(has_solar_with_battery) - ) + recommendations_df["has_solar_with_battery"] = recommendations_df[ + "materials" + ].apply(has_solar_with_battery) - _filter = ( - (recommendations_df["measure_type"] == "solar_pv") - & (recommendations_df["has_solar_with_battery"]) + _filter = (recommendations_df["measure_type"] == "solar_pv") & ( + recommendations_df["has_solar_with_battery"] ) recommendations_df.loc[_filter, "measure_type"] = ( @@ -83,10 +83,13 @@ def process_export(payload: ExportRequest, session: Session) -> Dict[Union[str, else: scenario_recs = recommendations_df[ recommendations_df["scenario_id"] == group_key - ] + ] if scenario_recs.empty: - logger.info("No recommendations found for group_key %s - skipping export for this group", group_key) + logger.info( + "No recommendations found for group_key %s - skipping export for this group", + group_key, + ) continue measures_df: pd.DataFrame = scenario_recs[ @@ -99,14 +102,12 @@ def process_export(payload: ExportRequest, session: Session) -> Dict[Union[str, values="estimated_cost", ).reset_index() - pivot["total_retrofit_cost"] = ( - pivot.drop(columns=["property_id", "plan_name"]).sum(axis=1) - ) + pivot["total_retrofit_cost"] = pivot.drop( + columns=["property_id", "plan_name"] + ).sum(axis=1) post_sap: pd.DataFrame = ( - scenario_recs.groupby("property_id")[["sap_points"]] - .sum() - .reset_index() + scenario_recs.groupby("property_id")[["sap_points"]].sum().reset_index() ) df: pd.DataFrame = ( @@ -117,7 +118,9 @@ def process_export(payload: ExportRequest, session: Session) -> Dict[Union[str, df["sap_points"] = df["sap_points"].fillna(0) df["predicted_post_works_sap"] = df["current_sap_points"] + df["sap_points"] - df["predicted_post_works_epc"] = df["predicted_post_works_sap"].apply(sap_to_epc) + df["predicted_post_works_epc"] = df["predicted_post_works_sap"].apply( + sap_to_epc + ) export_files[group_key] = df @@ -128,22 +131,17 @@ def process_export(payload: ExportRequest, session: Session) -> Dict[Union[str, # Lambda Handler # ============================================================ -def handler(event: Mapping[str, Any], context: Optional[Any]) -> Mapping[str, Union[int, str]]: + +def handler( + event: Mapping[str, Any], context: Optional[Any] +) -> Mapping[str, Union[int, str]]: """ Example event: body_dict = { "task_id": "test", "subtask_id": "test", - "portfolio_id": 655, - "scenario_ids": [], - "default_plans_only": True, - } - - body_dict = { - "task_id": "test", - "subtask_id": "test", - "portfolio_id": 655, - "scenario_ids": [1174], + "portfolio_id": 682, + "scenario_ids": [1210], "default_plans_only": False, } :param event: Lambda event containing export request details @@ -168,7 +166,12 @@ def handler(event: Mapping[str, Any], context: Optional[Any]) -> Mapping[str, Un exported_files = process_export(payload, session) # TODO: Need to handle the exported files - e.g. upload to s3 and email a presigned url - _ = exported_files + output_path = f"/tmp/export_{payload.portfolio_id}.xlsx" + with pd.ExcelWriter(output_path, engine="openpyxl") as writer: + for group_key, df in exported_files.items(): + sheet_name = str(group_key)[:31] # Excel sheet names max 31 chars + df.to_excel(writer, sheet_name=sheet_name, index=False) + logger.info("Exported files written to %s", output_path) return { "statusCode": 200, "body": json.dumps({}), diff --git a/backend/pashub_fetcher/The_Guinness_Partnership_AtkinsR_alis_Coordination_Design_Board_1774881298.xlsx b/backend/pashub_fetcher/The_Guinness_Partnership_AtkinsR_alis_Coordination_Design_Board_1774881298.xlsx index a6478e3b..beb679c1 100644 Binary files a/backend/pashub_fetcher/The_Guinness_Partnership_AtkinsR_alis_Coordination_Design_Board_1774881298.xlsx and b/backend/pashub_fetcher/The_Guinness_Partnership_AtkinsR_alis_Coordination_Design_Board_1774881298.xlsx differ diff --git a/backend/pashub_fetcher/handler/Dockerfile b/backend/pashub_fetcher/handler/Dockerfile index d045becd..f8c2008c 100644 --- a/backend/pashub_fetcher/handler/Dockerfile +++ b/backend/pashub_fetcher/handler/Dockerfile @@ -8,7 +8,8 @@ RUN chmod +x /usr/local/bin/aws-lambda-rie WORKDIR /var/task COPY utils/ utils/ -COPY backend/pashub_fetcher/ backend/pashub_fetcher/ +COPY backend/ backend/ +COPY datatypes/ datatypes/ COPY backend/pashub_fetcher/handler/requirements.txt . RUN pip install --no-cache-dir -r requirements.txt @@ -22,5 +23,4 @@ ENTRYPOINT ["python", "-m", "awslambdaric"] # ----------------------------- # Lambda handler # ----------------------------- -CMD ["backend.pashub_fetcher.handler.test_handler.handler"] -# CMD ["backend.pashub_fetcher.handler.handler.handler"] \ No newline at end of file +CMD ["backend.pashub_fetcher.handler.handler.handler"] \ No newline at end of file diff --git a/backend/pashub_fetcher/handler/handler.py b/backend/pashub_fetcher/handler/handler.py index 3689efe9..60b946c1 100644 --- a/backend/pashub_fetcher/handler/handler.py +++ b/backend/pashub_fetcher/handler/handler.py @@ -1,8 +1,7 @@ from datetime import datetime, timezone -import json import os import re -from typing import Any, Dict, List, Mapping, Optional +from typing import Any, Dict, List, Optional from openpyxl import load_workbook from backend.app.config import get_settings @@ -104,10 +103,19 @@ def upload_job_to_sharepoint( ) -def upload_job_to_s3_and_update_db(job_files: List[str], uprn: str) -> None: +def upload_job_to_s3_and_update_db( + job_files: List[str], uprn: Optional[str], hubspot_deal_id: Optional[str] +) -> None: bucket = "retrofit-energy-assessments-dev" - base_path = f"documents/uprn/{uprn}" + if not uprn and not hubspot_deal_id: + return + + base_path = ( + f"documents/uprn/{uprn}" + if uprn + else f"documents/hubspot_deal_id/{hubspot_deal_id}" + ) uploaded_files: List[UploadedFile] = [] @@ -118,12 +126,14 @@ def upload_job_to_s3_and_update_db(job_files: List[str], uprn: str) -> None: upload_file_to_s3(file_path, bucket, file_key) # load row to db + # TODO: use same upload_file_to_s3_and_update_db method as ecmk fetcher does uploaded_files.append( UploadedFile( s3_file_bucket=bucket, s3_file_key=file_key, s3_upload_timestamp=datetime.now(timezone.utc), - uprn=int(uprn), + uprn=int(uprn) if uprn else None, + hubspot_deal_id=hubspot_deal_id, file_source=FileSourceEnum.PAS_HUB.value, file_type=infer_file_type(filename), ) @@ -144,6 +154,7 @@ def process_job( job_id = job.pashub_job_id uprn: Optional[str] = job.uprn or pashub_client.get_uprn_by_job_id(job_id) + hubspot_deal_id: Optional[str] = job.hubspot_deal_id if uprn: logger.info(f"Got UPRN {uprn} for job {job_id}") @@ -152,9 +163,9 @@ def process_job( job_files: List[str] = pashub_client.get_core_evidence_files_by_job_id(job_id) - if uprn: + if uprn or hubspot_deal_id: logger.info("Uploading files to s3") - upload_job_to_s3_and_update_db(job_files, uprn) + upload_job_to_s3_and_update_db(job_files, uprn, hubspot_deal_id) # # Comment out sharepoint loading for now: # Seems like the sharepoint link in pas hub is inconsistent in terms @@ -167,9 +178,8 @@ def process_job( @task_handler() -def handler(event: Mapping[str, Any], context: Any) -> None: +def handler(body: Dict[str, Any], context: Any) -> List[str]: logger.info("Received message") - logger.info(f"Number of events: {len(event.get('Records', []))}") settings = get_settings() @@ -185,48 +195,34 @@ def handler(event: Mapping[str, Any], context: Any) -> None: sharepoint_location=DomnaSites.SOCIAL_HOUSING_WAVE_3 ) - saved_file_paths: List[str] = [] + logger.debug("Validating request body") + payload = PashubToAraTriggerRequest.model_validate(body) + logger.debug("Successfully validated request body") - for record in event.get("Records", []): - try: - body_dict = json.loads(record["body"]) - logger.debug("Validating request body") + try: + files: List[str] = process_job( + payload, + pashub_client, + sharepoint_client, + ) + except UnauthorizedError: + logger.warning("Token expired - refreshing") - payload = PashubToAraTriggerRequest.model_validate(body_dict) + pashub_client = get_pashub_client( + pas_hub_email, + pas_hub_password, + ) - logger.debug("Successfully validated request body") + # retry once + files = process_job( + payload, + pashub_client, + sharepoint_client, + ) - try: - files: List[str] = process_job( - payload, - pashub_client, - sharepoint_client, - ) - saved_file_paths.extend(files) + logger.info(f"Saved {len(files)} files") - except UnauthorizedError: - logger.warning("Token expired - refreshing") - - pashub_client = get_pashub_client( - pas_hub_email, - pas_hub_password, - ) - - # retry once - files: List[str] = process_job( - payload, - pashub_client, - sharepoint_client, - ) - saved_file_paths.extend(files) - - except Exception as e: - logger.info("Handler exception") - logger.error(f"Failed to process record: {e}") - - logger.info("Successfully loaded jobs from spreadsheet") - - logger.info(f"Saved {len(saved_file_paths)} files") + return files if __name__ == "__main__": diff --git a/backend/pashub_fetcher/handler/requirements.txt b/backend/pashub_fetcher/handler/requirements.txt index c4e416a8..ba235c7f 100644 --- a/backend/pashub_fetcher/handler/requirements.txt +++ b/backend/pashub_fetcher/handler/requirements.txt @@ -2,4 +2,12 @@ awslambdaric playwright==1.58.0 requests msal -openpyxl \ No newline at end of file +openpyxl +pydantic-settings +sqlalchemy +sqlmodel +psycopg2-binary +pytz +boto3==1.35.44 +pandas==2.2.2 +numpy<2.0 diff --git a/backend/pashub_fetcher/local_handler/docker-compose.yml b/backend/pashub_fetcher/local_handler/docker-compose.yml index 34ba9277..8b183d80 100644 --- a/backend/pashub_fetcher/local_handler/docker-compose.yml +++ b/backend/pashub_fetcher/local_handler/docker-compose.yml @@ -5,6 +5,8 @@ services: build: context: ../../../ dockerfile: backend/pashub_fetcher/handler/Dockerfile + entrypoint: ["/usr/local/bin/aws-lambda-rie", "python", "-m", "awslambdaric"] + command: ["backend.pashub_fetcher.handler.handler.handler"] ports: - "9000:8080" env_file: diff --git a/backend/pashub_fetcher/local_handler/invoke_local_lambda.py b/backend/pashub_fetcher/local_handler/invoke_local_lambda.py index 463ef9d8..5248a874 100644 --- a/backend/pashub_fetcher/local_handler/invoke_local_lambda.py +++ b/backend/pashub_fetcher/local_handler/invoke_local_lambda.py @@ -12,7 +12,9 @@ payload = { { "body": json.dumps( { - "uprn": 123456, + "pashub_link": "https://google.co.uk", + "uprn": "123456", + "hubspot_deal_id": "498926855369", } ) } diff --git a/backend/pashub_fetcher/pashub_to_ara_trigger_request.py b/backend/pashub_fetcher/pashub_to_ara_trigger_request.py index 2e4f8380..518a8dc3 100644 --- a/backend/pashub_fetcher/pashub_to_ara_trigger_request.py +++ b/backend/pashub_fetcher/pashub_to_ara_trigger_request.py @@ -12,6 +12,8 @@ class PashubToAraTriggerRequest(BaseModel): uprn: Optional[str] = None landlord_property_id: Optional[str] = None deal_stage: Optional[str] = None + hubspot_listing_id: Optional[int] = None + hubspot_deal_id: Optional[str] = None @property def pashub_job_id(self) -> str: diff --git a/backend/pashub_fetcher/token_getter.py b/backend/pashub_fetcher/token_getter.py index 5534d114..2e2d1440 100644 --- a/backend/pashub_fetcher/token_getter.py +++ b/backend/pashub_fetcher/token_getter.py @@ -7,24 +7,40 @@ from utils.logger import setup_logger logger = setup_logger() -def get_token_from_local_storage(email: str, password: str) -> str: +def get_token_from_local_storage( + email: str, password: str, record_video: bool = False +) -> str: logger.info("Starting Playwright flow") - # For local testing / debugging, we save videos of the flow - video_dir = os.path.join(os.path.dirname(__file__), "videos") - os.makedirs(video_dir, exist_ok=True) - with sync_playwright() as p: + logger.info("Playwright server started") + browser = p.chromium.launch( headless=True, - args=["--no-sandbox", "--disable-dev-shm-usage"], + args=[ + "--no-sandbox", + "--disable-dev-shm-usage", + "--disable-gpu", + "--single-process", + "--no-zygote", + ], ) + logger.info("Chromium launched successfully") - context = browser.new_context( - record_video_dir=video_dir, - record_video_size={"width": 1280, "height": 720}, - ) + video_dir = None + if record_video: + video_dir = os.path.join(os.path.dirname(__file__), "videos") + os.makedirs(video_dir, exist_ok=True) + + if record_video: + context = browser.new_context( + record_video_dir=video_dir, + record_video_size={"width": 1280, "height": 720}, + ) + else: + context = browser.new_context() page = context.new_page() + logger.info("Page created") try: logger.info("Navigating to site...") @@ -71,8 +87,8 @@ def get_token_from_local_storage(email: str, password: str) -> str: raise Exception(f"Unexpected error: {str(e)}") finally: - logger.info("Closing browser context (saving video)...") context.close() browser.close() - logger.info(f"Video(s) saved in: {video_dir}") + if record_video and video_dir: + logger.info(f"Video(s) saved in: {video_dir}") diff --git a/backend/pashub_fetcher/trigger_lambda_from_file.py b/backend/pashub_fetcher/trigger_lambda_from_file.py new file mode 100644 index 00000000..fb9d1cbf --- /dev/null +++ b/backend/pashub_fetcher/trigger_lambda_from_file.py @@ -0,0 +1,63 @@ +import json +import os +import re +from typing import Any, Dict, List + +from openpyxl import load_workbook + +from backend.pashub_fetcher.pashub_to_ara_trigger_request import ( + PashubToAraTriggerRequest, +) +from backend.pashub_fetcher.handler.handler import handler + + +if __name__ == "__main__": + BASE_DIR = os.path.dirname(os.path.dirname(__file__)) + filepath: str = os.path.join( + BASE_DIR, + "pashub_fetcher", + "The_Guinness_Partnership_AtkinsR_alis_Coordination_Design_Board_1774881298.xlsx", + ) + + wb = load_workbook(filepath, data_only=True) + ws = wb["filtered_2"] + + HEADER_ROW = 3 + + headers: Dict[str, int] = {} + for col in range(1, ws.max_column + 1): + value = str(ws.cell(row=HEADER_ROW, column=col).value) + if value: + headers[value.strip()] = col + + name_col = headers["Name"] + link_col = headers["PasHub Link"] + hubspot_deal_id_col = headers["HubSpot ID"] + + trigger_requests: List[PashubToAraTriggerRequest] = [] + + for row in range(HEADER_ROW + 1, ws.max_row + 1): + name = ws.cell(row=row, column=name_col).value + link = ws.cell(row=row, column=link_col).value + hubspot_deal_id = ws.cell(row=row, column=hubspot_deal_id_col).value + + if not name or not link or not hubspot_deal_id: + continue + + match = re.search(r"/jobs/([0-9a-fA-F\-]+)/", str(link)) + if not match: + continue + + trigger_requests.append( + PashubToAraTriggerRequest( + pashub_link=str(link), hubspot_deal_id=str(hubspot_deal_id) + ) + ) + + # ---- Build fake SQS event ---- + event: Dict[str, Any] = { + "Records": [{"body": json.dumps(req.model_dump())} for req in trigger_requests] + } + + context = None + handler(event, context) diff --git a/backend/postcode_splitter/main.py b/backend/postcode_splitter/main.py index 4f63ed4b..3d4b9aa0 100644 --- a/backend/postcode_splitter/main.py +++ b/backend/postcode_splitter/main.py @@ -263,10 +263,10 @@ def handler(event, context, local=False): bucket_name=bucket_name, ) - # Mark subtask as completed + # Mark subtask as complete subtask_interface.update_subtask_status( subtask_id, - "completed", + "complete", outputs={"rows_processed": "completed"}, ) diff --git a/backend/run_local.sh b/backend/run_local.sh index be45a54a..0201e508 100644 --- a/backend/run_local.sh +++ b/backend/run_local.sh @@ -2,5 +2,6 @@ set -a source ./.env set +a -uvicorn app.main:app --reload +cd .. +uvicorn backend.app.main:app --reload --host 0.0.0.0 --port 8000 diff --git a/backend/scripts/combine_address2uprn_outputs.py b/backend/scripts/combine_address2uprn_outputs.py index 105b8639..085240a9 100644 --- a/backend/scripts/combine_address2uprn_outputs.py +++ b/backend/scripts/combine_address2uprn_outputs.py @@ -31,6 +31,8 @@ def download_csv(key): def main(task_id, output): + task_id = "3fb9a9b7-ff49-4c11-b9e1-9d00da955a75" + print(f"Scanning task: {task_id}") csv_files = list_csv_files(task_id) diff --git a/backend/tests/test_bulk_combined_results.py b/backend/tests/test_bulk_combined_results.py new file mode 100644 index 00000000..33fae09b --- /dev/null +++ b/backend/tests/test_bulk_combined_results.py @@ -0,0 +1,21 @@ +import pytest + +from backend.app.bulk_uploads.scoring import score_bucket + + +@pytest.mark.parametrize( + "score,expected", + [ + (None, None), + (0.0, "low"), + (0.5, "low"), + (0.64, "low"), + (0.65, "med"), + (0.84, "med"), + (0.85, "high"), + (0.9, "high"), + (1.0, "high"), + ], +) +def test_score_bucket_thresholds(score, expected): + assert score_bucket(score) == expected diff --git a/backend/tests/test_bulk_combiner_status.py b/backend/tests/test_bulk_combiner_status.py new file mode 100644 index 00000000..5ac75037 --- /dev/null +++ b/backend/tests/test_bulk_combiner_status.py @@ -0,0 +1,103 @@ +from datetime import datetime, timezone +from uuid import uuid4 + +import pytest +from sqlalchemy import create_engine +from sqlmodel import Session, SQLModel + +from backend.app.db.functions import bulk_address_uploads_functions as module +from backend.app.db.functions.bulk_address_uploads_functions import ( + set_combined_output_s3_uri, + set_combining_status, +) +from backend.app.db.models.bulk_address_uploads import BulkAddressUpload + + +@pytest.fixture(scope="function") +def pg_engine(postgresql): + connection_string = ( + f"postgresql+psycopg://" + f"{postgresql.info.user}:" + f"{postgresql.info.password}@" + f"{postgresql.info.host}:" + f"{postgresql.info.port}/" + f"{postgresql.info.dbname}" + ) + engine = create_engine(connection_string) + SQLModel.metadata.create_all(engine) + yield engine + SQLModel.metadata.drop_all(engine) + engine.dispose() + + +@pytest.fixture +def patched_session(pg_engine, monkeypatch): + sessions = [] + + def factory(): + s = Session(pg_engine) + sessions.append(s) + return s + + monkeypatch.setattr(module, "get_db_session", factory) + yield pg_engine + for s in sessions: + s.close() + + +def _insert_row(engine, task_id, status="processing"): + with Session(engine) as session: + row = BulkAddressUpload( + id=uuid4(), + portfolio_id="p1", + user_id="u1", + s3_bucket="b", + s3_key="k", + filename="f.csv", + status=status, + task_id=task_id, + created_at=datetime.now(timezone.utc), + updated_at=datetime.now(timezone.utc), + ) + session.add(row) + session.commit() + + +def _fetch(engine, task_id): + from sqlmodel import select + with Session(engine) as session: + return session.exec( + select(BulkAddressUpload).where(BulkAddressUpload.task_id == task_id) + ).first() + + +def test_set_combining_status_updates_row(patched_session): + task_id = uuid4() + _insert_row(patched_session, task_id, status="processing") + + set_combining_status(task_id) + + row = _fetch(patched_session, task_id) + assert row.status == "combining" + assert row.combined_output_s3_uri is None + + +def test_set_combined_output_s3_uri_writes_uri_and_awaiting_review(patched_session): + task_id = uuid4() + _insert_row(patched_session, task_id, status="combining") + + set_combined_output_s3_uri(task_id, "s3://bucket/bulk_final_outputs/abc/combined.csv") + + row = _fetch(patched_session, task_id) + assert row.status == "awaiting_review" + assert row.combined_output_s3_uri == "s3://bucket/bulk_final_outputs/abc/combined.csv" + + +def test_set_combining_status_missing_row_raises(patched_session): + with pytest.raises(ValueError, match="No bulk_address_uploads row"): + set_combining_status(uuid4()) + + +def test_set_combined_output_s3_uri_missing_row_raises(patched_session): + with pytest.raises(ValueError, match="No bulk_address_uploads row"): + set_combined_output_s3_uri(uuid4(), "s3://x/y.csv") diff --git a/backlog/config.yml b/backlog/config.yml new file mode 100644 index 00000000..edf9b80b --- /dev/null +++ b/backlog/config.yml @@ -0,0 +1,17 @@ +project_name: "model-backend" +default_status: "To Do" +statuses: ["To Do", "In Progress", "Done"] +labels: [] +definition_of_done: [] +date_format: yyyy-mm-dd +max_column_width: 20 +default_editor: "vim" +auto_open_browser: true +default_port: 6420 +remote_operations: false +auto_commit: false +zero_padded_ids: 3 +bypass_git_hooks: false +check_active_branches: false +active_branch_days: 30 +task_prefix: "task" diff --git a/devcontainer.sh b/devcontainer.sh new file mode 100644 index 00000000..ba35e7f0 --- /dev/null +++ b/devcontainer.sh @@ -0,0 +1,110 @@ +#!/usr/bin/env bash +# +# dc.sh — devcontainer helper for this repo +# +# Usage: +# ./devcontainer.sh +# +# Configs: backend | asset_list +# Commands: up, shell, down, rebuild +# +# `shell` auto-runs `up` first if the container isn't already running, +# so it's safe to call cold. +# +# Examples: +# ./devcontainer.sh backend shell # up + exec bash +# ./devcontainer.sh asset_list up +# ./devcontainer.sh backend rebuild +# ./devcontainer.sh backend down + +set -euo pipefail + +SCRIPT_DIR="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)" +REPO_ROOT="${SCRIPT_DIR}" + +VALID_CONFIGS=(backend asset_list) +VALID_COMMANDS=(up shell down rebuild) + +# --- helpers --------------------------------------------------------------- + +usage() { + sed -n '3,20p' "${BASH_SOURCE[0]}" | sed 's/^# \{0,1\}//' + exit "${1:-0}" +} + +die() { + echo "error: $*" >&2 + exit 1 +} + +in_list() { + # in_list + local needle="$1" + shift + local item + for item in "$@"; do + [[ "${item}" == "${needle}" ]] && return 0 + done + return 1 +} + +container_id_for() { + # Find a running container for the given config path via devcontainer labels. + local config_path="$1" + docker ps -q \ + --filter "label=devcontainer.local_folder=${REPO_ROOT}" \ + --filter "label=devcontainer.config_file=${config_path}" +} + +# --- argument parsing ------------------------------------------------------ + +[[ $# -eq 2 ]] || usage 1 + +CONFIG_NAME="$1" +COMMAND="$2" + +in_list "${CONFIG_NAME}" "${VALID_CONFIGS[@]}" \ + || die "invalid config '${CONFIG_NAME}' (expected: ${VALID_CONFIGS[*]})" +in_list "${COMMAND}" "${VALID_COMMANDS[@]}" \ + || die "invalid command '${COMMAND}' (expected: ${VALID_COMMANDS[*]})" + +CONFIG_PATH="${REPO_ROOT}/.devcontainer/${CONFIG_NAME}/devcontainer.json" +[[ -f "${CONFIG_PATH}" ]] || die "config not found: ${CONFIG_PATH}" + +DC_ARGS=(--workspace-folder "${REPO_ROOT}" --config "${CONFIG_PATH}") + +# --- dispatch -------------------------------------------------------------- + +case "${COMMAND}" in + up) + echo ">> bringing up '${CONFIG_NAME}'" + devcontainer up "${DC_ARGS[@]}" + ;; + + shell) + # Auto-up if not already running. `devcontainer up` is idempotent — + # it reuses an existing container, so this is cheap on warm starts. + if [[ -z "$(container_id_for "${CONFIG_PATH}")" ]]; then + echo ">> '${CONFIG_NAME}' not running, bringing it up first" + devcontainer up "${DC_ARGS[@]}" + fi + echo ">> attaching shell to '${CONFIG_NAME}'" + devcontainer exec "${DC_ARGS[@]}" bash 2>/dev/null \ + || devcontainer exec "${DC_ARGS[@]}" sh + ;; + + down) + cid="$(container_id_for "${CONFIG_PATH}")" + if [[ -z "${cid}" ]]; then + echo ">> '${CONFIG_NAME}' not running, nothing to stop" + exit 0 + fi + echo ">> stopping '${CONFIG_NAME}'" + docker stop "${cid}" + ;; + + rebuild) + echo ">> rebuilding '${CONFIG_NAME}' from scratch" + devcontainer up "${DC_ARGS[@]}" --remove-existing-container --build-no-cache + ;; +esac diff --git a/etl/hubspot/hubspotClient.py b/etl/hubspot/hubspotClient.py index 6bdf71ed..df28e4d6 100644 --- a/etl/hubspot/hubspotClient.py +++ b/etl/hubspot/hubspotClient.py @@ -254,12 +254,12 @@ class HubspotClient: "sharepoint_link", "dampmould_growth", "damp_mould_and_repairs_comments", - "pre_sap", + "pre_sap_score_dropdown", "coordinator", "mtp_completion_date", "mtp_re_model_completion_date", "ioe_v3_completion_date", - "proposed_measures", + "proposed_measures_dropdown", "approved_package", "designer", "design_completion_date", @@ -275,6 +275,14 @@ class HubspotClient: "confirmed_survey_time", "surveyed_date", "design_type", + "batch", + "block_reference", + "epc_prn", + "potential_post_sap_score_dropdown", + "ei_score", + "ei_score__potential_", + "epc_sap_score", + "epc_sap_score__potential_", ], ) ) diff --git a/etl/hubspot/hubspotDataTodB.py b/etl/hubspot/hubspotDataTodB.py index 9756833b..a2eb24c2 100644 --- a/etl/hubspot/hubspotDataTodB.py +++ b/etl/hubspot/hubspotDataTodB.py @@ -159,7 +159,17 @@ class HubspotDataToDb: "damp_mould_and_repairs_comments": deal_data.get( "damp_mould_and_repairs_comments" ), - "pre_sap": deal_data.get("pre_sap"), + "pre_sap": deal_data.get("pre_sap_score_dropdown"), + "batch": deal_data.get("batch"), + "block_reference": deal_data.get("block_reference"), + "epc_prn": deal_data.get("epc_prn"), + "potential_post_sap_score_dropdown": deal_data.get( + "potential_post_sap_score_dropdown" + ), + "ei_score": deal_data.get("ei_score"), + "ei_score__potential_": deal_data.get("ei_score__potential_"), + "epc_sap_score": deal_data.get("epc_sap_score"), + "epc_sap_score__potential_": deal_data.get("epc_sap_score__potential_"), "coordinator": deal_data.get("coordinator"), "mtp_completion_date": parse_hs_date(deal_data.get("mtp_completion_date")), "mtp_re_model_completion_date": parse_hs_date( @@ -168,7 +178,7 @@ class HubspotDataToDb: "ioe_v3_completion_date": parse_hs_date( deal_data.get("ioe_v3_completion_date") ), - "proposed_measures": deal_data.get("proposed_measures"), + "proposed_measures": deal_data.get("proposed_measures_dropdown"), "approved_package": deal_data.get("approved_package"), "designer": deal_data.get("designer"), "design_completion_date": parse_hs_date( @@ -228,7 +238,17 @@ class HubspotDataToDb: damp_mould_and_repairs_comments=deal_data.get( "damp_mould_and_repairs_comments" ), - pre_sap=deal_data.get("pre_sap"), + pre_sap=deal_data.get("pre_sap_score_dropdown"), + batch=deal_data.get("batch"), + block_reference=deal_data.get("block_reference"), + epc_prn=deal_data.get("epc_prn"), + potential_post_sap_score_dropdown=deal_data.get( + "potential_post_sap_score_dropdown" + ), + ei_score=deal_data.get("ei_score"), + ei_score__potential_=deal_data.get("ei_score__potential_"), + epc_sap_score=deal_data.get("epc_sap_score"), + epc_sap_score__potential_=deal_data.get("epc_sap_score__potential_"), coordinator=deal_data.get("coordinator"), mtp_completion_date=parse_hs_date(deal_data.get("mtp_completion_date")), mtp_re_model_completion_date=parse_hs_date( @@ -237,7 +257,7 @@ class HubspotDataToDb: ioe_v3_completion_date=parse_hs_date( deal_data.get("ioe_v3_completion_date") ), - proposed_measures=deal_data.get("proposed_measures"), + proposed_measures=deal_data.get("proposed_measures_dropdown"), approved_package=deal_data.get("approved_package"), designer=deal_data.get("designer"), design_completion_date=parse_hs_date( diff --git a/etl/hubspot/hubspot_deal_differ.py b/etl/hubspot/hubspot_deal_differ.py index b95b544c..fa5bbe42 100644 --- a/etl/hubspot/hubspot_deal_differ.py +++ b/etl/hubspot/hubspot_deal_differ.py @@ -6,9 +6,9 @@ from etl.hubspot.utils import parse_hs_date class HubspotDealDiffer: COORDINATION_COMPLETE: List[str] = [ - "v1 ioe/mtp complete", - "v2 ioe/mtp complete", - "v3 ioe/mtp complete", + "(v1) ioe/mtp complete", + "(v2) ioe/mtp complete", + "(v3) ioe/mtp complete", ] RETROFIT_DESIGN_COMPLETE = "uploaded" LODGEMENT_COMPLETE: List[str] = ["lodgement complete", "measures lodged"] @@ -62,9 +62,17 @@ class HubspotDealDiffer: "sharepoint_link": "sharepoint_link", "dampmould_growth": "dampmould_growth", "damp_mould_and_repairs_comments": "damp_mould_and_repairs_comments", - "pre_sap": "pre_sap", + "pre_sap_score_dropdown": "pre_sap", + "batch": "batch", + "block_reference": "block_reference", + "epc_prn": "epc_prn", + "potential_post_sap_score_dropdown": "potential_post_sap_score_dropdown", + "ei_score": "ei_score", + "ei_score__potential_": "ei_score__potential_", + "epc_sap_score": "epc_sap_score", + "epc_sap_score__potential_": "epc_sap_score__potential_", "coordinator": "coordinator", - "proposed_measures": "proposed_measures", + "proposed_measures_dropdown": "proposed_measures", "approved_package": "approved_package", "designer": "designer", "actual_measures_installed": "actual_measures_installed", @@ -149,19 +157,19 @@ class HubspotDealDiffer: def _coordination_completed( new_deal: Dict[str, str], old_deal: HubspotDealData ) -> bool: - new_status: str = new_deal.get("coordination_status", "") + new_status: str = new_deal.get("coordination_status") or "" return ( new_status != "" - and new_status in HubspotDealDiffer.COORDINATION_COMPLETE + and new_status.lower() in HubspotDealDiffer.COORDINATION_COMPLETE and new_status != old_deal.coordination_status ) @staticmethod def _design_completed(new_deal: Dict[str, str], old_deal: HubspotDealData) -> bool: - new_status: str = new_deal.get("design_status", "") + new_status: str = new_deal.get("design_status") or "" return ( new_status != "" - and new_status == HubspotDealDiffer.RETROFIT_DESIGN_COMPLETE + and new_status.lower() == HubspotDealDiffer.RETROFIT_DESIGN_COMPLETE and new_status != old_deal.design_status ) @@ -169,9 +177,9 @@ class HubspotDealDiffer: def _lodgement_completed( new_deal: Dict[str, str], old_deal: HubspotDealData ) -> bool: - new_status: str = new_deal.get("lodgement_status", "") + new_status: str = new_deal.get("lodgement_status") or "" return ( new_status != "" - and new_status in HubspotDealDiffer.LODGEMENT_COMPLETE + and new_status.lower() in HubspotDealDiffer.LODGEMENT_COMPLETE and new_status != old_deal.lodgement_status ) diff --git a/etl/hubspot/scripts/scraper/main.py b/etl/hubspot/scripts/scraper/main.py index e6c788ea..f7dc1076 100644 --- a/etl/hubspot/scripts/scraper/main.py +++ b/etl/hubspot/scripts/scraper/main.py @@ -104,10 +104,10 @@ def _trigger_pashub_fetcher(sqs_client: Any, hubspot_deal: Dict[str, str]) -> No message_body: Dict[str, Optional[str]] = { "pashub_link": hubspot_deal["pashub_link"], "address": None, # potentially available from Listing, leave as None for now - "sharepoint_link": hubspot_deal["sharepoint_link"], - "uprn": hubspot_deal["national_uprn"], - "landlord_property_id": hubspot_deal["owner_property_id"], - "deal_stage": hubspot_deal["deal_stage"], + "sharepoint_link": hubspot_deal.get("sharepoint_link", None), + "uprn": hubspot_deal.get("national_uprn", None), + "landlord_property_id": hubspot_deal.get("owner_property_id", None), + "deal_stage": hubspot_deal.get("deal_stage", None), } response = sqs_client.send_message( @@ -121,5 +121,5 @@ def _trigger_pashub_fetcher(sqs_client: Any, hubspot_deal: Dict[str, str]) -> No if __name__ == "__main__": - handler({"hubspot_deal_id": "371470706915"}, "") + handler({"hubspot_deal_id": "498926855369"}, "") print("beep") diff --git a/etl/hubspot/tests/test_hubspot_deal_differ.py b/etl/hubspot/tests/test_hubspot_deal_differ.py index 69f7668b..0523c982 100644 --- a/etl/hubspot/tests/test_hubspot_deal_differ.py +++ b/etl/hubspot/tests/test_hubspot_deal_differ.py @@ -90,8 +90,8 @@ def test_pashub_trigger__pashub_link_changed__returns_true( @pytest.mark.parametrize( "coordination_status,expected", [ - ("v1 ioe/mtp complete", True), - ("v2 ioe/mtp complete", True), + ("(v1) ioe/mtp complete", True), + ("(v2) ioe/mtp complete", True), ], ) def test_pashub_trigger__coordination_completed_and_pashub_link_set__returns_true( diff --git a/infrastructure/terraform/lambda/_template/main.tf b/infrastructure/terraform/lambda/_template/main.tf index 81b1c7f1..1cd63ffe 100644 --- a/infrastructure/terraform/lambda/_template/main.tf +++ b/infrastructure/terraform/lambda/_template/main.tf @@ -36,6 +36,8 @@ module "lambda" { # Optional: Set maximum_concurrency to limit concurrent SQS-triggered invocations (2-1000) maximum_concurrency = var.maximum_concurrency + reserved_concurrent_executions = var.reserved_concurrent_executions + batch_size = var.batch_size environment = { diff --git a/infrastructure/terraform/lambda/_template/variables.tf b/infrastructure/terraform/lambda/_template/variables.tf index 0a3092ee..daaa0b7c 100644 --- a/infrastructure/terraform/lambda/_template/variables.tf +++ b/infrastructure/terraform/lambda/_template/variables.tf @@ -23,6 +23,12 @@ variable "maximum_concurrency" { description = "Maximum number of concurrent Lambda invocations from SQS (2-1000). null = no limit." } +variable "reserved_concurrent_executions" { + type = number + default = -1 + description = "Reserved concurrency for the Lambda function. -1 = unreserved. 1+ = hard limit across all triggers." +} + variable "batch_size" { type = number default = 1 diff --git a/infrastructure/terraform/lambda/bulk_address2uprn_combiner/main.tf b/infrastructure/terraform/lambda/bulk_address2uprn_combiner/main.tf new file mode 100644 index 00000000..4be4fc20 --- /dev/null +++ b/infrastructure/terraform/lambda/bulk_address2uprn_combiner/main.tf @@ -0,0 +1,44 @@ +data "terraform_remote_state" "shared" { + backend = "s3" + config = { + bucket = "assessment-model-terraform-state" + key = "env:/${var.stage}/terraform.tfstate" + region = "eu-west-2" + } +} + +data "aws_secretsmanager_secret_version" "db_credentials" { + secret_id = "${var.stage}/assessment_model/db_credentials" +} + +locals { + db_credentials = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string) +} + +module "lambda" { + source = "../../modules/lambda_with_sqs" + + name = "bulk-address2uprn-combiner" + stage = var.stage + + image_uri = local.image_uri + + timeout = 900 + memory_size = 2048 + + maximum_concurrency = var.maximum_concurrency + batch_size = var.batch_size + + environment = { + STAGE = var.stage + LOG_LEVEL = "info" + S3_BUCKET_NAME = data.terraform_remote_state.shared.outputs.retrofit_sap_data_bucket_name + DB_USERNAME = local.db_credentials.db_assessment_model_username + DB_PASSWORD = local.db_credentials.db_assessment_model_password + } +} + +resource "aws_iam_role_policy_attachment" "bulk_address2uprn_combiner_s3" { + role = module.lambda.role_name + policy_arn = data.terraform_remote_state.shared.outputs.bulk_address2uprn_combiner_s3_arn +} diff --git a/infrastructure/terraform/lambda/bulk_address2uprn_combiner/outputs.tf b/infrastructure/terraform/lambda/bulk_address2uprn_combiner/outputs.tf new file mode 100644 index 00000000..e5155388 --- /dev/null +++ b/infrastructure/terraform/lambda/bulk_address2uprn_combiner/outputs.tf @@ -0,0 +1,14 @@ +output "bulk_address2uprn_combiner_queue_url" { + value = module.lambda.queue_url + description = "URL of the bulk_address2uprn_combiner SQS queue" +} + +output "bulk_address2uprn_combiner_queue_arn" { + value = module.lambda.queue_arn + description = "ARN of the bulk_address2uprn_combiner SQS queue" +} + +output "bulk_address2uprn_combiner_lambda_arn" { + value = module.lambda.lambda_arn + description = "ARN of the bulk_address2uprn_combiner Lambda function" +} diff --git a/infrastructure/terraform/lambda/bulk_address2uprn_combiner/provider.tf b/infrastructure/terraform/lambda/bulk_address2uprn_combiner/provider.tf new file mode 100644 index 00000000..45422d9f --- /dev/null +++ b/infrastructure/terraform/lambda/bulk_address2uprn_combiner/provider.tf @@ -0,0 +1,16 @@ +terraform { + required_providers { + aws = { + source = "hashicorp/aws" + version = ">= 5.0" + } + } + + backend "s3" { + bucket = "bulk-address2uprn-combiner-terraform-state" + key = "terraform.tfstate" + region = "eu-west-2" + } + + required_version = ">= 1.2.0" +} diff --git a/infrastructure/terraform/lambda/bulk_address2uprn_combiner/variables.tf b/infrastructure/terraform/lambda/bulk_address2uprn_combiner/variables.tf new file mode 100644 index 00000000..6e1c84a2 --- /dev/null +++ b/infrastructure/terraform/lambda/bulk_address2uprn_combiner/variables.tf @@ -0,0 +1,38 @@ +variable "lambda_name" { + type = string + description = "Logical name of the lambda" +} + +variable "stage" { + description = "Deployment stage (e.g. dev, prod)" + type = string +} + +variable "ecr_repo_url" { + type = string + description = "ECR repository URL (no tag, no digest)" +} + +variable "image_digest" { + type = string + description = "Image digest (sha256:...)" +} + +variable "maximum_concurrency" { + type = number + default = 2 + description = "Maximum concurrent Lambda invocations from SQS (2-1000)." +} + +variable "batch_size" { + type = number + default = 1 +} + +locals { + image_uri = "${var.ecr_repo_url}@${var.image_digest}" +} + +output "resolved_image_uri" { + value = local.image_uri +} diff --git a/infrastructure/terraform/lambda/fast-api/main.tf b/infrastructure/terraform/lambda/fast-api/main.tf index 05447657..3a2b5a5f 100644 --- a/infrastructure/terraform/lambda/fast-api/main.tf +++ b/infrastructure/terraform/lambda/fast-api/main.tf @@ -28,6 +28,24 @@ data "terraform_remote_state" "categorisation" { } } +data "terraform_remote_state" "postcode_splitter" { + backend = "s3" + config = { + bucket = "postcode-splitter-terraform-state", + key = "env:/${var.stage}/terraform.tfstate" + region = "eu-west-2" + } +} + +data "terraform_remote_state" "bulk_address2uprn_combiner" { + backend = "s3" + config = { + bucket = "bulk-address2uprn-combiner-terraform-state", + key = "env:/${var.stage}/terraform.tfstate" + region = "eu-west-2" + } +} + ############################################ # Load Credentials ############################################ @@ -83,8 +101,10 @@ module "fastapi" { CARBON_BASELINE_PREDICTIONS_BUCKET = data.terraform_remote_state.shared.outputs.retrofit_carbon_baseline_predictions_bucket_name HEAT_BASELINE_PREDICTIONS_BUCKET = data.terraform_remote_state.shared.outputs.retrofit_heat_baseline_predictions_bucket_name - ENGINE_SQS_URL = data.terraform_remote_state.engine.outputs.ara_engine_queue_url - CATEGORISATION_SQS_URL = data.terraform_remote_state.categorisation.outputs.categorisation_queue_url + ENGINE_SQS_URL = data.terraform_remote_state.engine.outputs.ara_engine_queue_url + CATEGORISATION_SQS_URL = data.terraform_remote_state.categorisation.outputs.categorisation_queue_url + POSTCODE_SPLITTER_SQS_URL = data.terraform_remote_state.postcode_splitter.outputs.postcode_splitter_queue_url + COMBINER_SQS_URL = data.terraform_remote_state.bulk_address2uprn_combiner.outputs.bulk_address2uprn_combiner_queue_url } } @@ -104,7 +124,9 @@ module "fastapi_sqs_policy" { resources = [ data.terraform_remote_state.engine.outputs.ara_engine_queue_arn, - data.terraform_remote_state.categorisation.outputs.categorisation_queue_arn + data.terraform_remote_state.categorisation.outputs.categorisation_queue_arn, + data.terraform_remote_state.postcode_splitter.outputs.postcode_splitter_queue_arn, + data.terraform_remote_state.bulk_address2uprn_combiner.outputs.bulk_address2uprn_combiner_queue_arn ] conditions = null diff --git a/infrastructure/terraform/lambda/hubspot_deal_etl/main.tf b/infrastructure/terraform/lambda/hubspot_deal_etl/main.tf index e8762337..48dd6b78 100644 --- a/infrastructure/terraform/lambda/hubspot_deal_etl/main.tf +++ b/infrastructure/terraform/lambda/hubspot_deal_etl/main.tf @@ -55,4 +55,25 @@ module "hubspot_deal_etl" { resource "aws_iam_role_policy_attachment" "lambda_s3_policy" { role = module.hubspot_deal_etl.role_name policy_arn = data.terraform_remote_state.shared.outputs.hubspot_etl_s3_read_and_write_arn +} + +# Create and attach S3 send policy for PasHub Fetcher queue +module "hubspot_deal_etl_sqs_policy" { + source = "../../modules/general_iam_policy" + + policy_name = "hubspot-deal-etl-sqs-send-${var.stage}" + policy_description = "Allow Hubspot ETL Lambda to send messages to PasHub Fetcher queue" + + actions = [ + "sqs:SendMessage" + ] + + resources = [ + data.terraform_remote_state.pashub_to_ara.outputs.pashub_to_ara_queue_arn + ] +} + +resource "aws_iam_role_policy_attachment" "hubspot_deal_etl_sqs_send" { + role = module.hubspot_deal_etl.role_name + policy_arn = module.hubspot_deal_etl_sqs_policy.policy_arn } \ No newline at end of file diff --git a/infrastructure/terraform/lambda/pashub_to_ara/main.tf b/infrastructure/terraform/lambda/pashub_to_ara/main.tf index e898e949..ae719a99 100644 --- a/infrastructure/terraform/lambda/pashub_to_ara/main.tf +++ b/infrastructure/terraform/lambda/pashub_to_ara/main.tf @@ -7,6 +7,14 @@ data "terraform_remote_state" "shared" { } } +data "aws_secretsmanager_secret_version" "db_credentials" { + secret_id = "${var.stage}/assessment_model/db_credentials" +} + +locals { + db_credentials = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string) +} + module "lambda" { source = "../../modules/lambda_with_sqs" @@ -18,10 +26,27 @@ module "lambda" { # Optional: Set maximum_concurrency to limit concurrent SQS-triggered invocations (2-1000) maximum_concurrency = var.maximum_concurrency + reserved_concurrent_executions = var.reserved_concurrent_executions + batch_size = var.batch_size environment = { STAGE = var.stage LOG_LEVEL = "info" + + DB_USERNAME = local.db_credentials.db_assessment_model_username + DB_PASSWORD = local.db_credentials.db_assessment_model_password + DB_HOST = var.db_host + DB_NAME = var.db_name + DB_PORT = var.db_port + + SHAREPOINT_CLIENT_ID = var.sharepoint_client_id + SHAREPOINT_CLIENT_SECRET = var.sharepoint_client_secret + DOMNA_SHAREPOINT_ID = var.domna_sharepoint_id + OSMOSIS_ACD_SHAREPOINT_ID = var.osmosis_acd_sharepoint_id + PRIVATE_PAY_SHAREPOINT_ID = var.private_pay_sharepoint_id + SOCIAL_HOUSING_WAVE_3_SHAREPOINT_ID = var.social_housing_wave_3_sharepoint_id + PASHUB_EMAIL = var.pashub_email + PASHUB_PASSWORD = var.pashub_password } } diff --git a/infrastructure/terraform/lambda/pashub_to_ara/outputs.tf b/infrastructure/terraform/lambda/pashub_to_ara/outputs.tf index d44b8763..584c9b63 100644 --- a/infrastructure/terraform/lambda/pashub_to_ara/outputs.tf +++ b/infrastructure/terraform/lambda/pashub_to_ara/outputs.tf @@ -2,3 +2,8 @@ output "pashub_to_ara_queue_url" { value = module.lambda.queue_url description = "URL of the PasHub to Ara SQS queue" } + +output "pashub_to_ara_queue_arn" { + value = module.lambda.queue_arn + description = "ARN of the PasHub to Ara SQS queue" +} diff --git a/infrastructure/terraform/lambda/pashub_to_ara/variables.tf b/infrastructure/terraform/lambda/pashub_to_ara/variables.tf index e7646811..e68a26b6 100644 --- a/infrastructure/terraform/lambda/pashub_to_ara/variables.tf +++ b/infrastructure/terraform/lambda/pashub_to_ara/variables.tf @@ -23,6 +23,12 @@ variable "maximum_concurrency" { description = "Maximum number of concurrent Lambda invocations from SQS (2-1000). null = no limit." } +variable "reserved_concurrent_executions" { + type = number + default = 1 + description = "Reserved concurrency. Defaults to 1 to prevent concurrent Playwright browser collisions." +} + variable "batch_size" { type = number default = 1 @@ -35,3 +41,58 @@ locals { output "resolved_image_uri" { value = local.image_uri } + +variable "db_host" { + type = string + sensitive = true +} + +variable "db_name" { + type = string + sensitive = true +} + +variable "db_port" { + type = string + sensitive = true +} + +variable "sharepoint_client_id" { + type = string + sensitive = true +} + +variable "sharepoint_client_secret" { + type = string + sensitive = true +} + +variable "domna_sharepoint_id" { + type = string + sensitive = true +} + +variable "osmosis_acd_sharepoint_id" { + type = string + sensitive = true +} + +variable "private_pay_sharepoint_id" { + type = string + sensitive = true +} + +variable "social_housing_wave_3_sharepoint_id" { + type = string + sensitive = true +} + +variable "pashub_email" { + type = string + sensitive = true +} + +variable "pashub_password" { + type = string + sensitive = true +} \ No newline at end of file diff --git a/infrastructure/terraform/lambda/postcodeSplitter/outputs.tf b/infrastructure/terraform/lambda/postcodeSplitter/outputs.tf new file mode 100644 index 00000000..847cefa4 --- /dev/null +++ b/infrastructure/terraform/lambda/postcodeSplitter/outputs.tf @@ -0,0 +1,9 @@ +output "postcode_splitter_queue_url" { + value = module.lambda.queue_url + description = "URL of the Postcode Splitter SQS queue" +} + +output "postcode_splitter_queue_arn" { + value = module.lambda.queue_arn + description = "ARN of the Postcode Splitter SQS queue" +} diff --git a/infrastructure/terraform/modules/lambda_service/main.tf b/infrastructure/terraform/modules/lambda_service/main.tf index 8a159db1..3250110b 100644 --- a/infrastructure/terraform/modules/lambda_service/main.tf +++ b/infrastructure/terraform/modules/lambda_service/main.tf @@ -9,6 +9,8 @@ resource "aws_lambda_function" "this" { memory_size = var.memory_size publish = true + reserved_concurrent_executions = var.reserved_concurrent_executions + environment { variables = var.environment } diff --git a/infrastructure/terraform/modules/lambda_service/variables.tf b/infrastructure/terraform/modules/lambda_service/variables.tf index 43def6ad..46241f30 100644 --- a/infrastructure/terraform/modules/lambda_service/variables.tf +++ b/infrastructure/terraform/modules/lambda_service/variables.tf @@ -16,3 +16,9 @@ variable "environment" { type = map(string) default = {} } + +variable "reserved_concurrent_executions" { + type = number + default = -1 + description = "Reserved concurrency for the Lambda function. -1 = unreserved (default). 0 = throttle all. 1+ = hard limit." +} diff --git a/infrastructure/terraform/modules/lambda_with_sqs/main.tf b/infrastructure/terraform/modules/lambda_with_sqs/main.tf index 35626487..97f86793 100644 --- a/infrastructure/terraform/modules/lambda_with_sqs/main.tf +++ b/infrastructure/terraform/modules/lambda_with_sqs/main.tf @@ -31,7 +31,8 @@ module "lambda" { timeout = var.timeout memory_size = var.memory_size - environment = var.environment + environment = var.environment + reserved_concurrent_executions = var.reserved_concurrent_executions } ############################################ diff --git a/infrastructure/terraform/modules/lambda_with_sqs/variables.tf b/infrastructure/terraform/modules/lambda_with_sqs/variables.tf index 7c2832d2..90585e92 100644 --- a/infrastructure/terraform/modules/lambda_with_sqs/variables.tf +++ b/infrastructure/terraform/modules/lambda_with_sqs/variables.tf @@ -40,3 +40,9 @@ variable "maximum_concurrency" { default = null description = "Maximum number of concurrent Lambda invocations from SQS. null = no limit." } + +variable "reserved_concurrent_executions" { + type = number + default = -1 + description = "Reserved concurrency for the Lambda function. -1 = unreserved. 1 = single-threaded." +} diff --git a/infrastructure/terraform/shared/main.tf b/infrastructure/terraform/shared/main.tf index 47866c92..fbd09565 100644 --- a/infrastructure/terraform/shared/main.tf +++ b/infrastructure/terraform/shared/main.tf @@ -477,6 +477,34 @@ output "postcode_splitter_s3_read_arn" { value = module.postcode_splitter_s3_read.policy_arn } +################################################ +# Bulk Address2UPRN Combiner – Lambda ECR +################################################ +module "bulk_address2uprn_combiner_state_bucket" { + source = "../modules/tf_state_bucket" + bucket_name = "bulk-address2uprn-combiner-terraform-state" +} + +module "bulk_address2uprn_combiner_registry" { + source = "../modules/container_registry" + name = "bulk_address2uprn_combiner" + stage = var.stage +} + +module "bulk_address2uprn_combiner_s3" { + source = "../modules/s3_iam_policy" + + policy_name = "BulkAddress2UprnCombinerS3" + policy_description = "Allow bulk_address2uprn_combiner Lambda to read ara_raw_outputs and write bulk_final_outputs" + bucket_arns = ["arn:aws:s3:::retrofit-data-${var.stage}"] + actions = ["s3:GetObject", "s3:ListBucket", "s3:PutObject"] + resource_paths = ["/ara_raw_outputs/*", "/bulk_final_outputs/*"] +} + +output "bulk_address2uprn_combiner_s3_arn" { + value = module.bulk_address2uprn_combiner_s3.policy_arn +} + ################################################ # Categorisation – Lambda ECR ################################################ diff --git a/run_backlog.sh b/run_backlog.sh new file mode 100644 index 00000000..398e921c --- /dev/null +++ b/run_backlog.sh @@ -0,0 +1,2 @@ +#!/bin/bash +backlog browser --port 6421 diff --git a/serverless.yml b/serverless.yml index cf369a36..43b8b4e5 100644 --- a/serverless.yml +++ b/serverless.yml @@ -33,8 +33,10 @@ provider: HEAT_BASELINE_PREDICTIONS_BUCKET: ${env:HEAT_BASELINE_PREDICTIONS_BUCKET} ENGINE_SQS_URL: Ref: EngineQueue - # hardcode the categorisation queue for now as it's created in terraform + # hardcoded queues (created in terraform) CATEGORISATION_SQS_URL: "https://sqs.eu-west-2.amazonaws.com/337213553626/categorisation-queue-dev" + POSTCODE_SPLITTER_SQS_URL: "https://sqs.eu-west-2.amazonaws.com/337213553626/postcode-splitter-queue-dev" + COMBINER_SQS_URL: "https://sqs.eu-west-2.amazonaws.com/337213553626/bulk-address2uprn-combiner-queue-dev" plugins: - serverless-python-requirements @@ -112,6 +114,8 @@ resources: Resource: - Fn::GetAtt: [ EngineQueue, Arn ] - "arn:aws:sqs:eu-west-2:337213553626:categorisation-queue-dev" + - "arn:aws:sqs:eu-west-2:337213553626:postcode-splitter-queue-dev" + - "arn:aws:sqs:eu-west-2:337213553626:bulk-address2uprn-combiner-queue-dev" - Effect: Allow Action: - s3:GetObject diff --git a/sfr/principal_pitch/2_export_data.py b/sfr/principal_pitch/2_export_data.py index 06727f86..b275086d 100644 --- a/sfr/principal_pitch/2_export_data.py +++ b/sfr/principal_pitch/2_export_data.py @@ -26,13 +26,13 @@ from backend.app.db.functions.materials_functions import get_materials from collections import defaultdict from sqlalchemy import func -PORTFOLIO_ID = 632 -SCENARIOS = [1144] +PORTFOLIO_ID = 711 +SCENARIOS = [1233] scenario_names = { - 1144: "EPC C", + 1233: "Reach EPC C", } -project_name = "Calico Project" +project_name = "Novus" def get_data(portfolio_id, scenario_ids): @@ -230,7 +230,7 @@ for scenario_id in SCENARIOS: # Get recs for this scenario recommended_measures_df = recommendations_df[ recommendations_df["scenario_id"] == scenario_id - ][["property_id", "measure_type", "estimated_cost", "default"]] + ][["property_id", "measure_type", "estimated_cost", "default"]] recommended_measures_df = recommended_measures_df[ recommended_measures_df["default"] ] @@ -238,7 +238,7 @@ for scenario_id in SCENARIOS: post_install_sap = recommendations_df[ recommendations_df["scenario_id"] == scenario_id - ][["property_id", "default", "sap_points"]] + ][["property_id", "default", "sap_points"]] post_install_sap = post_install_sap[post_install_sap["default"]] # Sum up the sap points by property id post_install_sap = (