Merge pull request #1204 from Hestia-Homes/feature/generate-ventilation-audit-from-magicplan

New application to generate ventilation audit file for a given hubspot deal ID
This commit is contained in:
Daniel Roth 2026-06-09 16:25:12 +01:00 committed by GitHub
commit 63b1fa08a8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
46 changed files with 1567 additions and 86 deletions

View file

@ -661,6 +661,42 @@ jobs:
TF_VAR_magicplan_customer_id: ${{ secrets.MAGICPLAN_CUSTOMER_ID }}
TF_VAR_magicplan_api_key: ${{ secrets.MAGICPLAN_API_KEY }}
# ============================================================
# Build Audit Generator image
# ============================================================
audit_generator_image:
needs: [determine_stage, shared_terraform]
uses: ./.github/workflows/_build_image.yml
with:
ecr_repo: audit-generator-${{ needs.determine_stage.outputs.stage }}
dockerfile_path: applications/audit_generator/handler/Dockerfile
build_context: .
secrets:
AWS_ACCESS_KEY_ID: ${{ secrets.DEV_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY }}
AWS_REGION: ${{ secrets.DEV_AWS_REGION }}
# ============================================================
# Deploy Audit Generator Lambda
# ============================================================
audit_generator_lambda:
needs: [audit_generator_image, determine_stage]
uses: ./.github/workflows/_deploy_lambda.yml
with:
lambda_name: audit_generator
lambda_path: deployment/terraform/lambda/audit_generator
stage: ${{ needs.determine_stage.outputs.stage }}
ecr_repo: audit-generator-${{ needs.determine_stage.outputs.stage }}
image_digest: ${{ needs.audit_generator_image.outputs.image_digest }}
terraform_apply: ${{ needs.determine_stage.outputs.terraform_apply }}
secrets:
AWS_ACCESS_KEY_ID: ${{ secrets.DEV_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY }}
AWS_REGION: ${{ secrets.DEV_AWS_REGION }}
TF_VAR_db_host: ${{ secrets.DEV_DB_HOST }}
TF_VAR_db_name: ${{ secrets.DEV_DB_NAME }}
TF_VAR_db_port: ${{ secrets.DEV_DB_PORT }}
# ============================================================
# Deploy Hubspot ETL Lambda
# ============================================================

View file

@ -123,6 +123,16 @@ jobs:
build_context: .
service_name: magic-plan
# ============================================================
# Audit Generator
# ============================================================
audit_generator_smoke_test:
uses: ./.github/workflows/_smoke_test_lambda.yml
with:
dockerfile_path: applications/audit_generator/handler/Dockerfile
build_context: .
service_name: audit-generator
# ============================================================
# HubSpot Scraper
# ============================================================

View file

@ -65,6 +65,16 @@ _Avoid_: user input, raw address, user_inputed_address
The reference cohort matched to a target Property by both geographic proximity (postcode prefix / UPRN range) and physical similarity (property type, built form, age band); used by the EPC Prediction Service for gap-filling and anomaly detection.
_Avoid_: neighbours, similar properties, peer set
### Survey documents
**Ventilation Audit**:
A machine-generated `.xlsx` spreadsheet produced by the `audit-generator` Lambda from a property's parsed **MagicPlan Plan**. Written fields per room: room name, width, length, area. Per window: dimensions, opening type, number of openings, percent openable (`pct_openable`), trickle vent count and area per vent. Per door: width and undercut. Internal doors appear once per room they connect (so typically twice). Columns requiring human knowledge (Blocked, Pictured, FP reference numbers, door location labels) are left blank for the coordinator to complete. Recorded in `uploaded_files` with `file_type = VENTILATION_AUDIT` and `file_source = AUDIT_GENERATOR`. Distinct from a PAS 2023 Ventilation document, which is externally uploaded by a human.
_Avoid_: ventilation report, audit report, PAS ventilation (that is the external survey form)
**PAS 2023 Ventilation**:
An externally-uploaded ventilation survey document produced by a human assessor and ingested from an external source (e.g. Coordination Hub). Recorded in `uploaded_files` with `file_type = PAS_2023_VENTILATION`. Distinct from a **Ventilation Audit**, which is machine-generated from MagicPlan floor plan data.
_Avoid_: ventilation audit (that is the generated output)
### Source data
**Site Notes**:

View file

View file

@ -0,0 +1,9 @@
from pydantic import BaseModel, ConfigDict
class AuditGeneratorTriggerRequest(BaseModel):
model_config = ConfigDict(extra="ignore")
task_id: str
sub_task_id: str
hubspot_deal_id: str

View file

@ -0,0 +1,39 @@
from __future__ import annotations
import os
from typing import Any
import boto3
from applications.audit_generator.audit_generator_trigger_request import (
AuditGeneratorTriggerRequest,
)
from infrastructure.postgres.config import PostgresConfig
from infrastructure.postgres.engine import make_engine, make_session
from infrastructure.s3.s3_client import S3Client
from orchestration.audit_generator_orchestrator import AuditGeneratorOrchestrator
from orchestration.audit_generator_unit_of_work import AuditGeneratorUnitOfWork
from utilities.aws_lambda.subtask_handler import subtask_handler
@subtask_handler()
def handler(body: dict[str, Any], context: Any) -> None:
trigger = AuditGeneratorTriggerRequest.model_validate(body)
boto3_client: Any = boto3.client # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
boto_s3: Any = boto3_client("s3")
bucket = os.environ["S3_BUCKET_NAME"]
s3_client = S3Client(boto_s3_client=boto_s3, bucket=bucket)
engine = make_engine(PostgresConfig.from_env(os.environ))
def session_factory() -> Any:
return make_session(engine)
def uow_factory() -> AuditGeneratorUnitOfWork:
return AuditGeneratorUnitOfWork(session_factory)
AuditGeneratorOrchestrator(
hubspot_deal_id=trigger.hubspot_deal_id,
s3_client=s3_client,
uow_factory=uow_factory,
).run()

View file

@ -0,0 +1,17 @@
FROM public.ecr.aws/lambda/python:3.11
WORKDIR /var/task
COPY applications/audit_generator/handler/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY utilities/ utilities/
COPY backend/ backend/
COPY applications/ applications/
COPY domain/ domain/
COPY datatypes/ datatypes/
COPY orchestration/ orchestration/
COPY repositories/ repositories/
COPY infrastructure/ infrastructure/
CMD ["applications.audit_generator.handler.handler"]

View file

@ -0,0 +1,7 @@
awslambdaric
sqlalchemy==2.0.36
sqlmodel
psycopg2-binary==2.9.10
pydantic-settings==2.6.0
boto3==1.35.44
openpyxl

View file

@ -3,7 +3,7 @@ from typing import Optional
from sqlalchemy import select
from backend.app.db.connection import db_read_session
from backend.app.db.models.uploaded_file import (
from infrastructure.postgres.uploaded_file_table import (
FileSourceEnum,
FileTypeEnum,
UploadedFile,

View file

@ -1,69 +0,0 @@
import enum
from sqlalchemy import TIMESTAMP, BigInteger, Column, Text, Enum as SqlEnum
from backend.app.db.base import Base
class FileTypeEnum(enum.Enum):
PHOTO_PACK = "photo_pack"
SITE_NOTE = "site_note"
RD_SAP_SITE_NOTE = "rd_sap_site_note"
PAS_2023_VENTILATION = "pas_2023_ventilation"
PAS_2023_CONDITION = "pas_2023_condition"
PAS_SIGNIFICANCE = "pas_significance"
PAR_PHOTO_PACK = "par_photo_pack"
PAS_2023_PROPERTY = "pas_2023_property"
PAS_2023_OCCUPANCY = "pas_2023_occupancy"
ECMK_SITE_NOTE = "ecmk_site_note"
ECMK_RD_SAP_SITE_NOTE = "ecmk_rd_sap_site_note"
ECMK_SURVEY_XML = "ecmk_survey_xml"
MAGIC_PLAN_JSON = "magic_plan_json"
IMPROVEMENT_OPTION_EVALUATION = "improvement_option_evaluation"
MEDIUM_TERM_IMPROVEMENT_PLAN = "medium_term_improvement_plan"
RETROFIT_DESIGN_DOC = "retrofit_design_doc"
MCS_COMPLIANCE_CERTIFICATE = "mcs_compliance_certificate"
OTHER = "other"
class FileSourceEnum(enum.Enum):
PAS_HUB = "pas hub"
COORDINATION_HUB = "coordination_hub"
SHAREPOINT = "sharepoint"
HUBSPOT = "hubspot"
ECMK = "ecmk"
MAGIC_PLAN = "magic_plan"
class UploadedFile(Base):
__tablename__ = "uploaded_files"
id = Column(BigInteger, primary_key=True, autoincrement=True)
s3_file_bucket = Column(Text, nullable=False)
s3_file_key = Column(Text, nullable=False)
s3_upload_timestamp = Column(TIMESTAMP(timezone=True), nullable=False)
landlord_property_id = Column(Text, nullable=True)
uprn = Column(BigInteger, nullable=True)
hubspot_listing_id = Column(BigInteger, nullable=True)
hubspot_deal_id = Column(Text, nullable=True)
file_type = Column(
SqlEnum(
FileTypeEnum,
name="file_type",
create_type=False,
values_callable=lambda enum_cls: [e.value for e in enum_cls],
),
nullable=True,
)
file_source = Column(
SqlEnum(
FileSourceEnum,
name="file_source",
create_type=False,
values_callable=lambda enum_cls: [e.value for e in enum_cls],
),
nullable=True,
)

View file

@ -7,7 +7,7 @@ from backend.app.db.connection import db_session
from backend.app.db.functions.uploaded_files_functions import (
get_uploaded_file_by_listing_type_and_source,
)
from backend.app.db.models.uploaded_file import FileSourceEnum, FileTypeEnum
from infrastructure.postgres.uploaded_file_table import FileSourceEnum, FileTypeEnum
from backend.documents_parser.db_writer import save_epc_property_data
from backend.documents_parser.parser import parse_site_notes_pdf
from backend.ecmk_fetcher.address_list import (

View file

@ -1,6 +1,6 @@
from enum import Enum
from backend.app.db.models.uploaded_file import FileTypeEnum
from infrastructure.postgres.uploaded_file_table import FileTypeEnum
class FileDownloadButtonType(Enum):

View file

@ -1,7 +1,7 @@
from typing import Dict
from unittest.mock import MagicMock, call, patch
from backend.app.db.models.uploaded_file import FileTypeEnum
from infrastructure.postgres.uploaded_file_table import FileTypeEnum
from backend.ecmk_fetcher.address_list import PropertyRow
from backend.ecmk_fetcher.ecmk_service import EcmkService
from backend.ecmk_fetcher.reports import FileDownloadButtonType

View file

@ -3,7 +3,7 @@ from unittest.mock import MagicMock, call, patch
import pytest
from backend.app.db.models.uploaded_file import FileTypeEnum
from infrastructure.postgres.uploaded_file_table import FileTypeEnum
from backend.ecmk_fetcher.upload import upload_file_to_s3_and_record

View file

@ -3,7 +3,7 @@ import os
from typing import cast
from backend.app.db.connection import db_session
from backend.app.db.models.uploaded_file import (
from infrastructure.postgres.uploaded_file_table import (
FileSourceEnum,
FileTypeEnum,
UploadedFile,

View file

@ -1,7 +1,7 @@
from enum import Enum
from typing import Optional
from backend.app.db.models.uploaded_file import FileTypeEnum
from infrastructure.postgres.uploaded_file_table import FileTypeEnum
class CoreFiles(Enum):

View file

@ -3,7 +3,7 @@ from datetime import datetime, timezone
from typing import Callable, List, NamedTuple, Optional, cast
from backend.app.db.connection import db_session
from backend.app.db.models.uploaded_file import (
from infrastructure.postgres.uploaded_file_table import (
FileSourceEnum,
FileTypeEnum,
UploadedFile,

View file

@ -4,7 +4,7 @@ from typing import Any, Callable, Optional
from unittest.mock import MagicMock, call, patch
from backend.app.db.models.uploaded_file import FileSourceEnum, FileTypeEnum
from infrastructure.postgres.uploaded_file_table import FileSourceEnum, FileTypeEnum
from backend.pashub_fetcher.pashub_client import (
DownloadedFile,
DownloadedFiles,

View file

@ -0,0 +1,126 @@
# PRD: Ventilation Audit Generator from MagicPlan
## Problem Statement
When a surveyor completes a MagicPlan survey for a property, the resulting floor plan data (rooms, windows, doors, ventilation measurements) needs to be transformed into a structured ventilation audit spreadsheet. Currently this transformation is manual — someone must extract plan data and populate a report by hand, which is slow and error-prone.
## Solution
An AWS Lambda (`audit-generator`) triggered via SQS receives a HubSpot deal ID, fetches the parsed MagicPlan `Plan` from the database, populates a pre-formatted `.xlsx` template with plan data, uploads the result to S3, and records it in `uploaded_files`. The populated spreadsheet is then accessible to the UI so the user knows an audit file exists for that deal.
## User Stories
1. As a coordinator, I want clicking a button in the UI to trigger generation of a ventilation audit spreadsheet, so that I do not have to manually populate it from the floor plan.
2. As a coordinator, I want the audit spreadsheet to be automatically populated with room, window, and door data from the MagicPlan survey, so that the data entry step is eliminated.
3. As a coordinator, I want the system to use a pre-formatted `.xlsx` template when generating the audit, so that conditional formatting and layout are preserved without requiring code changes.
4. As a coordinator, I want the UI to indicate whether a ventilation audit already exists for a deal, so that I avoid triggering duplicate generation unnecessarily.
5. As a coordinator, I want re-triggering generation to overwrite the previous audit file, so that I can regenerate after a corrected survey is uploaded.
6. As an engineer, I want the lambda to raise a clear error if no MagicPlan JSON has been uploaded for the deal, so that misconfigured triggers are diagnosed quickly.
7. As an engineer, I want the lambda to raise a distinct error if a MagicPlan JSON exists but has not yet been parsed into the database, so that timing issues are distinguishable from missing data.
8. As an engineer, I want the generated spreadsheet recorded in `uploaded_files` with a `VENTILATION_AUDIT` file type, so that the UI and other systems can query for its existence.
9. As an engineer, I want the lambda to follow the `@subtask_handler()` pattern, so that it integrates with the task orchestration system and benefits from standard error handling and observability.
## Implementation Decisions
- **Lambda pattern**: `@subtask_handler()` decorator. Trigger body contains `task_id`, `sub_task_id`, and `hubspot_deal_id`.
- **MAGIC_PLAN_JSON lookup**: Query `uploaded_files` filtered by `hubspot_deal_id` and `file_type = MAGIC_PLAN_JSON`, ordered by `s3_upload_timestamp DESC`, taking the most recent row. Rationale: a re-upload supersedes the earlier file.
- **Plan retrieval**: Use the existing `MagicPlanPostgresRepository.get_plan_by_uploaded_file_id` to fetch the parsed domain `Plan` from postgres. The lambda does not re-parse from S3 — that is the magic_plan lambda's responsibility.
- **Error handling — two distinct cases**:
- No `uploaded_files` row found → raise with message indicating no MagicPlan has been uploaded for this deal.
- Row found but `get_plan_by_uploaded_file_id` returns `None` → raise with message indicating the plan has been uploaded but not yet parsed.
- Both use the same exception type; distinct messages enable diagnosis in CloudWatch.
- **Spreadsheet generation**:
- Format: `.xlsx` via `openpyxl`.
- The template `d1_ventilation_template.xlsx` is bundled with the lambda at `applications/audit-generator/d1_ventilation_template.xlsx` and loaded from the deployment package via `importlib.resources` or a path relative to the handler file. No S3 round-trip for the template.
- The template is loaded with `openpyxl.load_workbook(path)` (default `data_only=False` to preserve formulas), populated, and serialised to bytes via `BytesIO` for upload.
- Cell targeting uses fixed column letters (see Spreadsheet Layout below). Named ranges are not defined in the template.
- The template has formulas in columns J (`=H*I`), N (`=J*M`), S (`=Q*R`), and Y (`=W*X`) — the lambda does not write to these cells; they are calculated by Excel/Sheets when the file is opened.
- The template has 50 data rows (rows 655), extended programmatically. The footer merge sits at A56:Z56; legend rows at 5760.
- **Output S3 key**: `documents/hubspot_deal_id/{hubspot_deal_id}/ventilation_audit.xlsx`. Re-running the lambda overwrites the previous file.
- **Operation order**: S3 upload first, then `uploaded_files` DB insert. An orphaned S3 file on DB failure is harmless and will be overwritten on retry. A DB record pointing to a non-existent file is worse.
- **New enum values** (added to `FileTypeEnum` and `FileSourceEnum`):
- `FileTypeEnum.VENTILATION_AUDIT = "ventilation_audit"`
- `FileSourceEnum.AUDIT_GENERATOR = "audit_generator"`
- **DDD migration of `UploadedFile`**: The existing `backend/app/db/models/uploaded_file.py` (SQLAlchemy `Base`) is replaced by `infrastructure/postgres/uploaded_file_table.py` (SQLModel). `FileTypeEnum`, `FileSourceEnum`, and `UploadedFile` all move there. The class name `UploadedFile` is kept (no `Model` suffix — there is no domain counterpart). All seven consumers update their import path; `backend/app/db/models/uploaded_file.py` is deleted. Because `UploadedFile` is now registered on `SQLModel.metadata`, the shared `tests/conftest.py` `db_engine` fixture must emit `CREATE TYPE IF NOT EXISTS` for `file_type` and `file_source` via raw SQL before calling `SQLModel.metadata.create_all(engine)` — otherwise the table creation fails for all integration tests. The dedicated per-test conftest approach (Question 6) is therefore superseded.
- **New `UploadedFileRepository`**: A new repository (`UploadedFilePostgresRepository`) is introduced with a `get_latest_by_hubspot_deal_id(hubspot_deal_id: str, file_type: FileTypeEnum) -> Optional[UploadedFile]` method. Queries `uploaded_files` filtered by `hubspot_deal_id` and `file_type`, ordered by `s3_upload_timestamp DESC`, returning the most recent row.
- **Session management**: A dedicated `AuditGeneratorUnitOfWork` context manager (standalone — does not inherit from `PostgresUnitOfWork` or `UnitOfWork`) holds `uploaded_file: UploadedFilePostgresRepository` and `magic_plan: MagicPlanPostgresRepository`, both bound to the same session. Opens the session on `__enter__`, rolls back and closes on `__exit__`, exposes `commit()`. The handler holds a module-scoped engine (reused across warm Lambda invocations) and passes a `session_factory` callable to `AuditGeneratorUnitOfWork` — the session is created fresh per invocation and never long-lived.
- **Idempotency**: No duplicate guard. `uploaded_files` is append-only — the lambda always inserts a new row; rows are never updated or deleted. The S3 file is always overwritten at the fixed key. The UI and any future queries treat the most recent row by `s3_upload_timestamp` as authoritative.
- **Environment variables**:
- `S3_BUCKET_NAME` (shared convention)
- `DATABASE_URL` (shared convention)
- **Trigger**: The SQS message is sent by a UI action in a separate repo. No SQS publishing client is required in this PR.
## Testing Decisions
Good tests assert observable outputs given controlled inputs — they do not assert on internal call sequences or implementation details. Prefer mocking at the boundary of the system under test, not inside it.
**Handler tests** (`tests/applications/audit_generator/test_audit_generator_handler.py`):
- Test that an invalid trigger body raises `ValidationError`.
- Test that the orchestrator is constructed with values derived from env vars and the trigger body.
- Test that the handler returns the expected value on success.
- Use `handler.__wrapped__` to bypass the `@subtask_handler` decorator (prior art: `test_magic_plan_handler.py`).
**Orchestrator tests** (`tests/orchestration/audit_generator/test_audit_generator_orchestrator.py`):
- Mock `S3Client` with `MagicMock(spec=S3Client)`. Mock the `AuditGeneratorUnitOfWork` factory: the factory returns a mock UoW whose `__enter__` returns itself and whose `.uploaded_file` and `.magic_plan` attributes are mock repos.
- Test happy path: correct S3 key used for output upload; `uploaded_files` insert called with correct `file_type` and `file_source`; `uow.commit()` called.
- Test error path: raises with appropriate message when `uploaded_file_repo.get_latest_by_hubspot_deal_id` returns `None`.
- Test error path: raises with appropriate message when `magic_plan_repo.get_plan_by_uploaded_file_id` returns `None`.
**Repository tests** (`tests/repositories/uploaded_file/test_uploaded_file_postgres_repository.py`):
- Integration tests using the shared `db_engine` fixture. The fixture already calls `SQLModel.metadata.create_all(engine)`; after the DDD migration `UploadedFile` is in `SQLModel.metadata`, so no dedicated conftest is needed. The shared `tests/conftest.py` must emit `CREATE TYPE IF NOT EXISTS` for `file_type` and `file_source` before `create_all`.
- Test that `get_latest_by_hubspot_deal_id` returns the most recent row by `s3_upload_timestamp` when multiple rows with the same `file_type` exist.
- Test that it returns `None` when no matching row exists.
- Test that it filters correctly by `file_type` (a row with a different `file_type` is not returned).
## Out of Scope
- The SQS trigger — the UI button that sends the SQS message lives in a separate repo.
- Any ventilation calculation or compliance logic — the spreadsheet is populated with raw plan data only.
## Spreadsheet Layout
Sheet name: `D1 Ventilation`. Data starts at row 6. The three series run in parallel columns — each row may contain room data, window data, and door data independently; the longest series determines the last row used.
| Column | Content | Source |
|--------|---------|--------|
| B | Room name | `Room.name` |
| D | Room area (m²) | `Room.area_m2` |
| G | Window location (room name) | `Room.name` (parent room) |
| H | Window width (m) | `Window.width_m` |
| I | Window height (m) | `Window.height_m` |
| J | Window area (m²) | **formula** `=H*I` — do not write |
| K | Opening type | `WindowVentilation.opening_type` |
| L | Number of openings | `WindowVentilation.num_openings` |
| M | % of window (decimal) | `WindowVentilation.pct_openable / 100` |
| N | Total opening area (m²) | **formula** `=J*M` — do not write |
| O | Blocked | leave blank (visual check by auditor) |
| P | Pictured | leave blank (visual check by auditor) |
| Q | Trickle vent effective area per vent (mm²) | `WindowVentilation.trickle_vent_area_mm2` |
| R | Number of trickle vents | `WindowVentilation.num_trickle_vents` |
| S | Total trickle vent area (mm²) | **formula** `=Q*R` — do not write |
| V | Door location (room name) | `Room.name` (parent room) |
| W | Door width (mm) | `Door.width_mm` |
| X | Door undercut (mm) | `DoorVentilation.undercut_mm` |
| Y | Door area (mm²) | **formula** `=W*X` — do not write |
Internal doors appear once per room they connect (typically twice). `WindowVentilation` and `DoorVentilation` fields are `Optional`; write `0` when `None` so formula cells (J, N, S, Y) do not produce `#VALUE!` errors.
## Further Notes
- The `audit-generator` application scaffold already exists at `applications/audit-generator/` with empty `handler.py` and `audit_generator_trigger_request.py` files.
- The `MagicPlanPostgresRepository.get_plan_by_uploaded_file_id` method is the correct entry point for fetching the parsed plan — no S3 re-parsing is needed.
- The `openpyxl` library must be added to `applications/audit-generator/handler/requirements.txt`.
- The template (`d1_ventilation_template.xlsx`) has 50 data rows (rows 655) with formulas in columns J, N, S, Y. If a property exceeds 50 windows, rooms, or doors the lambda should raise a clear error rather than silently truncating.

View file

@ -0,0 +1,45 @@
data "terraform_remote_state" "shared" {
backend = "s3"
config = {
bucket = "assessment-model-terraform-state"
key = "env:/${var.stage}/terraform.tfstate"
region = "eu-west-2"
}
}
data "aws_secretsmanager_secret_version" "db_credentials" {
secret_id = "${var.stage}/assessment_model/db_credentials"
}
locals {
db_credentials = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string)
}
resource "aws_iam_role_policy_attachment" "audit_generator_s3_write" {
role = module.lambda.role_name
policy_arn = data.terraform_remote_state.shared.outputs.energy_assessments_s3_write_arn
}
module "lambda" {
source = "../../modules/lambda_with_sqs"
name = "audit_generator"
stage = var.stage
image_uri = local.image_uri
maximum_concurrency = var.maximum_concurrency
reserved_concurrent_executions = var.reserved_concurrent_executions
batch_size = var.batch_size
environment = {
STAGE = var.stage
LOG_LEVEL = "info"
S3_BUCKET_NAME = data.terraform_remote_state.shared.outputs.retrofit_energy_assessments_bucket_name
POSTGRES_USERNAME = local.db_credentials.db_assessment_model_username
POSTGRES_PASSWORD = local.db_credentials.db_assessment_model_password
POSTGRES_HOST = var.db_host
POSTGRES_DATABASE = var.db_name
POSTGRES_PORT = var.db_port
}
}

View file

@ -0,0 +1,9 @@
output "audit_generator_queue_url" {
value = module.lambda.queue_url
description = "URL of the Audit Generator SQS queue"
}
output "audit_generator_queue_arn" {
value = module.lambda.queue_arn
description = "ARN of the Audit Generator SQS queue"
}

View file

@ -0,0 +1,16 @@
terraform {
required_providers {
aws = {
source = "hashicorp/aws"
version = ">= 5.0"
}
}
backend "s3" {
bucket = "audit-generator-terraform-state"
key = "terraform.tfstate"
region = "eu-west-2"
}
required_version = ">= 1.2.0"
}

View file

@ -0,0 +1,52 @@
variable "stage" {
description = "Deployment stage (e.g. dev, prod)"
type = string
}
variable "ecr_repo_url" {
type = string
description = "ECR repository URL (no tag, no digest)"
}
variable "image_digest" {
type = string
description = "Image digest (sha256:...)"
}
variable "maximum_concurrency" {
type = number
default = null
}
variable "reserved_concurrent_executions" {
type = number
default = 1
}
variable "batch_size" {
type = number
default = 1
}
locals {
image_uri = "${var.ecr_repo_url}@${var.image_digest}"
}
output "resolved_image_uri" {
value = local.image_uri
}
variable "db_host" {
type = string
sensitive = true
}
variable "db_name" {
type = string
sensitive = true
}
variable "db_port" {
type = string
sensitive = true
}

View file

@ -830,3 +830,17 @@ module "magic_plan_client_registry" {
stage = var.stage
}
################################################
# Audit Generator Lambda
################################################
module "audit_generator_state_bucket" {
source = "../modules/tf_state_bucket"
bucket_name = "audit-generator-terraform-state"
}
module "audit_generator_registry" {
source = "../modules/container_registry"
name = "audit-generator"
stage = var.stage
}

View file

@ -0,0 +1,68 @@
# PRD: Extract ventilation audit sheet population into the magicplan domain
**Status:** Backlog
---
## Problem Statement
The logic that maps a `Plan` into spreadsheet cells — which column receives `pct_openable / 100`, which rows are rooms vs windows vs doors, what the 50-row capacity limit is, how column Y conditional formatting is applied — currently lives inside the orchestrator. Developers reading `AuditGeneratorOrchestrator` have to wade through cell-writing details to understand the orchestration flow, and there is no way to test the sheet-population rules in isolation without invoking the full orchestrator (which requires a mocked UoW, mocked S3, and the real XLSX template file).
## Solution
Move all sheet-population logic into the magicplan domain as a dedicated module (`ventilation_audit`), exposing a single public function `populate_sheet(sheet, plan)`. The orchestrator delegates to this function and retains only its infrastructure responsibilities: loading the template, serialising the workbook, uploading to S3, and persisting metadata.
This makes the mapping rules directly testable against a plain `openpyxl` sheet with no orchestration overhead, and keeps the orchestrator focused on coordination rather than domain rules.
## User Stories
1. As a developer debugging a malformed audit spreadsheet, I want the cell-mapping rules to live in the domain so that I can locate the logic without reading through orchestration code.
2. As a developer writing a test for ventilation audit content, I want to call `populate_sheet` directly with a synthetic `Plan` and a blank sheet so that I can assert cell values without mocking S3 or a unit of work.
3. As a developer adding a new opening type or ventilation field, I want the affected mapping logic to be co-located with the `Plan` domain models so that the change is easy to find and the impact is obvious.
4. As a developer reading the orchestrator, I want the `run()` method to read as a sequence of high-level steps (fetch → populate → serialise → upload → persist) with no cell-writing detail so that the orchestration intent is immediately clear.
5. As a developer running the test suite, I want the 50-row overflow validation to be covered by a domain-level test so that regressions in that constraint are caught without running the full orchestrator.
6. As a developer extending the audit template to a second sheet, I want the sheet-population contract to be a clearly bounded function so that I can add a second `populate_*` function in the same module without touching the orchestrator.
## Implementation Decisions
- **New module `domain/magicplan/ventilation_audit.py`** contains the public function `populate_sheet(sheet, plan)` and all private helpers (`_write_cell`, `_apply_column_y_formatting`) and constants (`_DATA_START_ROW`, `_MAX_ROWS`, `_Y_CF_RANGE`, `_Y_THRESHOLD`, `_Y_HEADER`). These are moved verbatim from the orchestrator — no logic changes.
- **`populate_sheet` is the sole public surface.** Helpers remain private to the module. This follows the existing `mapper.py` pattern (stateless module-level functions, no class wrapper).
- **The orchestrator imports `populate_sheet`** and replaces its `_populate_sheet(sheet, plan)` call. All `openpyxl.cell.rich_text`, `openpyxl.cell.text`, `openpyxl.formatting.rule`, and `openpyxl.styles` imports move with the logic. `openpyxl.load_workbook` stays — loading the template is an infrastructure step.
- **`_serialise_workbook` stays in the orchestrator** — converting a workbook to bytes is a serialisation step, not domain logic.
- **No interface change to the orchestrator's public API**`AuditGeneratorOrchestrator.__init__` and `run()` signatures are unchanged.
## Testing Decisions
Good tests for `populate_sheet` assert observable outputs (cell values, conditional formatting rule count) given a controlled `Plan` input. They do not assert on internal call sequences or private helper invocations.
Tests should use a fresh `openpyxl.Workbook().active` sheet — no template file needed, which keeps them fast and dependency-free.
Modules to test (new file: `tests/domain/magicplan/test_ventilation_audit.py`):
| Scenario | Assertion |
|---|---|
| Rooms written correctly | Col B = room name, col D = area_m2, starting at `_DATA_START_ROW` |
| Windows written correctly | Cols GI, KM, QR populated; pct_openable divided by 100 |
| Windows with null ventilation | Ventilation columns default to 0 |
| Doors written correctly | Cols VX populated with room name, width_mm, undercut_mm |
| Room overflow | > 50 rooms raises `ValueError` |
| Window overflow | > 50 windows raises `ValueError` |
| Door overflow | > 50 doors raises `ValueError` |
| Column Y formatting applied | Sheet has two conditional formatting rules after `populate_sheet` |
Prior art: `tests/orchestration/audit_generator/test_audit_generator_orchestrator.py` shows the `_make_plan` / `_make_window` / `_make_door` fixture pattern to reuse. The existing orchestrator tests need no changes.
## Out of Scope
- Changes to the spreadsheet template or column layout.
- Support for plans with more than 50 rooms, windows, or doors (the 50-row limit is a template constraint, not lifted here).
- Extracting `_serialise_workbook` or template-loading into the domain.
- Any changes to the `AuditGeneratorOrchestrator` public API or the Lambda entry point.
## Further Notes
The orchestrator test suite already provides integration-level coverage (S3 call order, `UploadedFile` enums, error paths). This refactor adds the missing unit-level coverage for the mapping rules, which are currently exercised only incidentally via the happy-path orchestrator tests.

View file

@ -0,0 +1,86 @@
from __future__ import annotations
from typing import Any
from openpyxl.cell.rich_text import CellRichText, TextBlock
from openpyxl.cell.text import InlineFont
from openpyxl.formatting.rule import CellIsRule # type: ignore[reportUnknownVariableType]
from openpyxl.styles import Color, Font
from domain.magicplan.models import Door, Plan, Room, Window
_DATA_START_ROW = 6
_MAX_ROWS = 50
_Y_CF_RANGE = f"Y{_DATA_START_ROW}:Y{_DATA_START_ROW + _MAX_ROWS - 1}"
_Y_THRESHOLD = 7600
_Y_HEADER = CellRichText(
TextBlock(InlineFont(b=True, sz=11, rFont="Aptos Narrow"), "Area (mm2)\n"),
TextBlock(InlineFont(b=True, sz=11, color=Color(rgb="FF0000"), rFont="Aptos Narrow"), "<"),
TextBlock(InlineFont(b=True, sz=11, rFont="Aptos Narrow"), " 7600 "),
TextBlock(InlineFont(b=True, sz=11, color=Color(rgb="196B24"), rFont="Aptos Narrow"), "<"),
)
def _apply_column_y_formatting(sheet: Any) -> None:
sheet.conditional_formatting.add(
_Y_CF_RANGE,
CellIsRule(operator="lessThan", formula=[str(_Y_THRESHOLD)], font=Font(color=Color(rgb="FF0000"))),
)
sheet.conditional_formatting.add(
_Y_CF_RANGE,
CellIsRule(operator="greaterThan", formula=[str(_Y_THRESHOLD)], font=Font(color=Color(rgb="196B24"))),
)
sheet["Y3"] = _Y_HEADER
def _write_cell(sheet: Any, row: int, col: str, value: Any) -> None:
sheet[f"{col}{row}"] = value
def populate_sheet(sheet: Any, plan: Plan) -> None:
rooms: list[Room] = [room for floor in plan.floors for room in floor.rooms]
windows: list[tuple[str, Window]] = [
(room.name, w) for room in rooms for w in room.windows
]
doors: list[tuple[str, Door]] = [
(room.name, d) for room in rooms for d in room.doors
]
if len(rooms) > _MAX_ROWS:
raise ValueError(f"Room series exceeds {_MAX_ROWS} rows ({len(rooms)} rooms)")
if len(windows) > _MAX_ROWS:
raise ValueError(f"Window series exceeds {_MAX_ROWS} rows ({len(windows)} windows)")
if len(doors) > _MAX_ROWS:
raise ValueError(f"Door series exceeds {_MAX_ROWS} rows ({len(doors)} doors)")
for i, room in enumerate(rooms):
row = _DATA_START_ROW + i
_write_cell(sheet, row, "B", room.name)
_write_cell(sheet, row, "D", room.area_m2)
for i, (room_name, window) in enumerate(windows):
row = _DATA_START_ROW + i
vent = window.ventilation
_write_cell(sheet, row, "G", room_name)
_write_cell(sheet, row, "H", window.width_m)
_write_cell(sheet, row, "I", window.height_m)
# J = formula =H*I — do not write
_write_cell(sheet, row, "K", vent.opening_type if vent else 0)
_write_cell(sheet, row, "L", vent.num_openings if vent else 0)
pct = vent.pct_openable if vent else None
_write_cell(sheet, row, "M", (pct / 100) if pct is not None else 0)
# N = formula =J*M — do not write
# O, P = blank (visual check by auditor)
_write_cell(sheet, row, "Q", vent.trickle_vent_area_mm2 if vent else 0)
_write_cell(sheet, row, "R", vent.num_trickle_vents if vent else 0)
# S = formula =Q*R — do not write
for i, (room_name, door) in enumerate(doors):
row = _DATA_START_ROW + i
vent = door.ventilation
_write_cell(sheet, row, "V", room_name)
_write_cell(sheet, row, "W", door.width_mm)
_write_cell(sheet, row, "X", vent.undercut_mm if vent else 0)
# Y = formula =W*X — do not write
_apply_column_y_formatting(sheet)

View file

@ -0,0 +1,96 @@
from __future__ import annotations
import enum
from typing import ClassVar, Optional
from sqlalchemy import TIMESTAMP, BigInteger, Column, Text
from sqlalchemy import Enum as SqlEnum
from sqlmodel import Field, SQLModel
class FileTypeEnum(enum.Enum):
PHOTO_PACK = "photo_pack"
SITE_NOTE = "site_note"
RD_SAP_SITE_NOTE = "rd_sap_site_note"
PAS_2023_VENTILATION = "pas_2023_ventilation"
PAS_2023_CONDITION = "pas_2023_condition"
PAS_SIGNIFICANCE = "pas_significance"
PAR_PHOTO_PACK = "par_photo_pack"
PAS_2023_PROPERTY = "pas_2023_property"
PAS_2023_OCCUPANCY = "pas_2023_occupancy"
ECMK_SITE_NOTE = "ecmk_site_note"
ECMK_RD_SAP_SITE_NOTE = "ecmk_rd_sap_site_note"
ECMK_SURVEY_XML = "ecmk_survey_xml"
MAGIC_PLAN_JSON = "magic_plan_json"
IMPROVEMENT_OPTION_EVALUATION = "improvement_option_evaluation"
MEDIUM_TERM_IMPROVEMENT_PLAN = "medium_term_improvement_plan"
RETROFIT_DESIGN_DOC = "retrofit_design_doc"
MCS_COMPLIANCE_CERTIFICATE = "mcs_compliance_certificate"
OTHER = "other"
VENTILATION_AUDIT = "ventilation_audit"
class FileSourceEnum(enum.Enum):
PAS_HUB = "pas hub"
COORDINATION_HUB = "coordination_hub"
SHAREPOINT = "sharepoint"
HUBSPOT = "hubspot"
ECMK = "ecmk"
MAGIC_PLAN = "magic_plan"
AUDIT_GENERATOR = "audit_generator"
def _enum_values(enum_cls: type[enum.Enum]) -> list[str]:
return [e.value for e in enum_cls]
class UploadedFile(SQLModel, table=True):
__tablename__: ClassVar[str] = "uploaded_files" # pyright: ignore[reportIncompatibleVariableOverride]
id: Optional[int] = Field(
default=None,
sa_column=Column(BigInteger, primary_key=True, autoincrement=True),
)
s3_file_bucket: str = Field(sa_column=Column(Text, nullable=False))
s3_file_key: str = Field(sa_column=Column(Text, nullable=False))
s3_upload_timestamp: object = Field(
sa_column=Column(TIMESTAMP(timezone=True), nullable=False)
)
landlord_property_id: Optional[str] = Field(
default=None, sa_column=Column(Text, nullable=True)
)
uprn: Optional[int] = Field(
default=None, sa_column=Column(BigInteger, nullable=True)
)
hubspot_listing_id: Optional[int] = Field(
default=None, sa_column=Column(BigInteger, nullable=True)
)
hubspot_deal_id: Optional[str] = Field(
default=None, sa_column=Column(Text, nullable=True)
)
file_type: Optional[str] = Field(
default=None,
sa_column=Column(
SqlEnum(
FileTypeEnum,
name="file_type",
create_type=False,
values_callable=_enum_values,
),
nullable=True,
),
)
file_source: Optional[str] = Field(
default=None,
sa_column=Column(
SqlEnum(
FileSourceEnum,
name="file_source",
create_type=False,
values_callable=_enum_values,
),
nullable=True,
),
)

View file

@ -0,0 +1,79 @@
from __future__ import annotations
from collections.abc import Callable
from datetime import datetime, timezone
from io import BytesIO
from pathlib import Path
from typing import TYPE_CHECKING, Any, cast
import openpyxl
from domain.magicplan.ventilation_audit import populate_sheet
from infrastructure.postgres.uploaded_file_table import (
FileSourceEnum,
FileTypeEnum,
UploadedFile,
)
from infrastructure.s3.s3_client import S3Client
if TYPE_CHECKING:
from orchestration.audit_generator_unit_of_work import AuditGeneratorUnitOfWork
_TEMPLATE_PATH = Path(__file__).parent.parent / "applications" / "audit_generator" / "d1_ventilation_template.xlsx"
_SHEET_NAME = "D1 Ventilation"
def _serialise_workbook(wb: Any) -> bytes:
buf = BytesIO()
wb.save(buf)
return buf.getvalue()
class AuditGeneratorOrchestrator:
def __init__(
self,
hubspot_deal_id: str,
s3_client: S3Client,
uow_factory: Callable[[], "AuditGeneratorUnitOfWork"],
) -> None:
self._hubspot_deal_id = hubspot_deal_id
self._s3_client = s3_client
self._uow_factory = uow_factory
def run(self) -> None:
with self._uow_factory() as uow:
uploaded_file = uow.uploaded_file.get_latest_by_hubspot_deal_id(
self._hubspot_deal_id, FileTypeEnum.MAGIC_PLAN_JSON
)
if uploaded_file is None:
raise ValueError(
f"No MagicPlan JSON has been uploaded for deal {self._hubspot_deal_id!r}"
)
plan = uow.magic_plan.get_plan_by_uploaded_file_id(cast(int, uploaded_file.id))
if plan is None:
raise ValueError(
f"MagicPlan JSON exists for deal {self._hubspot_deal_id!r} "
"but the plan is not yet parsed into the database"
)
wb = openpyxl.load_workbook(_TEMPLATE_PATH)
sheet = wb[_SHEET_NAME]
populate_sheet(sheet, plan)
xlsx_bytes = _serialise_workbook(wb)
s3_key = (
f"documents/hubspot_deal_id/{self._hubspot_deal_id}/ventilation_audit.xlsx"
)
self._s3_client.put_object(s3_key, xlsx_bytes)
new_row = UploadedFile(
s3_file_bucket=self._s3_client.bucket,
s3_file_key=s3_key,
s3_upload_timestamp=datetime.now(timezone.utc),
hubspot_deal_id=self._hubspot_deal_id,
file_type=FileTypeEnum.VENTILATION_AUDIT.value,
file_source=FileSourceEnum.AUDIT_GENERATOR.value,
)
uow.uploaded_file.insert(new_row)
uow.commit()

View file

@ -0,0 +1,39 @@
from __future__ import annotations
from collections.abc import Callable
from types import TracebackType
from typing import Optional
from sqlmodel import Session
from repositories.magic_plan.magic_plan_postgres_repository import (
MagicPlanPostgresRepository,
)
from repositories.uploaded_file.uploaded_file_postgres_repository import (
UploadedFilePostgresRepository,
)
class AuditGeneratorUnitOfWork:
def __init__(self, session_factory: Callable[[], Session]) -> None:
self._session_factory = session_factory
def __enter__(self) -> "AuditGeneratorUnitOfWork":
self._session = self._session_factory()
self.uploaded_file = UploadedFilePostgresRepository(self._session)
self.magic_plan = MagicPlanPostgresRepository(self._session)
return self
def __exit__(
self,
exc_type: Optional[type[BaseException]],
exc: Optional[BaseException],
tb: Optional[TracebackType],
) -> None:
try:
self._session.rollback()
finally:
self._session.close()
def commit(self) -> None:
self._session.commit()

View file

@ -8,7 +8,7 @@ from domain.magicplan.api.response import MagicPlanPlan, PlanSummary
from domain.magicplan.mapper import map_plan
from domain.magicplan.models import Plan
from backend.app.db.models.uploaded_file import (
from infrastructure.postgres.uploaded_file_table import (
FileSourceEnum,
FileTypeEnum,
UploadedFile,

View file

@ -1,12 +1,20 @@
from __future__ import annotations
from typing import Any, cast
from typing import Any, NamedTuple, Optional, cast
from sqlalchemy import delete, select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlmodel import Session, col
from domain.magicplan.models import Floor, Plan
from domain.magicplan.models import (
Door,
DoorVentilation,
Floor,
Plan,
Room,
Window,
WindowVentilation,
)
from infrastructure.postgres.magic_plan_tables import (
MagicPlanDoorModel,
MagicPlanDoorVentilationModel,
@ -19,10 +27,153 @@ from infrastructure.postgres.magic_plan_tables import (
from repositories.magic_plan.magic_plan_repository import MagicPlanRepository
class _Rows(NamedTuple):
floors: list[MagicPlanFloorModel]
rooms: list[MagicPlanRoomModel]
windows: list[MagicPlanWindowModel]
doors: list[MagicPlanDoorModel]
win_vents: list[MagicPlanWindowVentilationModel]
door_vents: list[MagicPlanDoorVentilationModel]
def _build_windows(
rows: list[MagicPlanWindowModel],
vents: list[MagicPlanWindowVentilationModel],
) -> dict[int, list[Window]]:
vent_by_id = {wv.magic_plan_window_id: wv for wv in vents}
result: dict[int, list[Window]] = {}
for row in rows:
wv = vent_by_id.get(cast(int, row.id))
result.setdefault(row.magic_plan_room_id, []).append(
Window(
width_m=cast(float, row.width_m),
height_m=cast(float, row.height_m),
area_m2=cast(float, row.area_m2),
ventilation=WindowVentilation(
opening_type=wv.opening_type,
num_openings=wv.num_openings,
pct_openable=wv.pct_openable,
trickle_vent_area_mm2=wv.trickle_vent_area_mm2,
num_trickle_vents=wv.num_trickle_vents,
) if wv else None,
)
)
return result
def _build_doors(
rows: list[MagicPlanDoorModel],
vents: list[MagicPlanDoorVentilationModel],
) -> dict[int, list[Door]]:
vent_by_id = {dv.magic_plan_door_id: dv for dv in vents}
result: dict[int, list[Door]] = {}
for row in rows:
dv = vent_by_id.get(cast(int, row.id))
result.setdefault(row.magic_plan_room_id, []).append(
Door(
width_mm=cast(float, row.width_mm),
height_mm=cast(float, row.height_mm),
ventilation=DoorVentilation(undercut_mm=dv.undercut_mm) if dv else None,
)
)
return result
def _build_rooms(
rows: list[MagicPlanRoomModel],
windows_by_room: dict[int, list[Window]],
doors_by_room: dict[int, list[Door]],
) -> dict[int, list[Room]]:
result: dict[int, list[Room]] = {}
for row in rows:
room_id = cast(int, row.id)
result.setdefault(row.magic_plan_floor_id, []).append(
Room(
name=cast(str, row.name),
width_m=cast(float, row.width_m),
length_m=cast(float, row.length_m),
area_m2=cast(float, row.area_m2),
windows=windows_by_room.get(room_id, []),
doors=doors_by_room.get(room_id, []),
)
)
return result
class MagicPlanPostgresRepository(MagicPlanRepository):
def __init__(self, session: Session) -> None:
self._session = session
def get_plan_by_uploaded_file_id(self, uploaded_file_id: int) -> Optional[Plan]:
plan_row = self._fetch_one(
select(MagicPlanPlanModel).where(
col(MagicPlanPlanModel.uploaded_file_id) == uploaded_file_id
)
)
if plan_row is None:
return None
rows = self._fetch_rows(cast(int, plan_row.id))
windows_by_room = _build_windows(rows.windows, rows.win_vents)
doors_by_room = _build_doors(rows.doors, rows.door_vents)
rooms_by_floor = _build_rooms(rows.rooms, windows_by_room, doors_by_room)
return Plan(
uid=cast(str, plan_row.magic_plan_uid),
name=plan_row.name,
address=plan_row.address,
postcode=plan_row.postcode,
floors=[
Floor(level=f.level, name=None, rooms=rooms_by_floor.get(cast(int, f.id), []))
for f in rows.floors
],
)
def _fetch_rows(self, plan_id: int) -> _Rows:
floor_rows: list[MagicPlanFloorModel] = self._fetch_many(
select(MagicPlanFloorModel).where(
col(MagicPlanFloorModel.magic_plan_plan_id) == plan_id
)
)
floor_ids = [cast(int, f.id) for f in floor_rows]
room_rows: list[MagicPlanRoomModel] = self._fetch_many(
select(MagicPlanRoomModel).where(
col(MagicPlanRoomModel.magic_plan_floor_id).in_(floor_ids)
)
)
room_ids = [cast(int, r.id) for r in room_rows]
window_rows: list[MagicPlanWindowModel] = self._fetch_many(
select(MagicPlanWindowModel).where(
col(MagicPlanWindowModel.magic_plan_room_id).in_(room_ids)
)
)
door_rows: list[MagicPlanDoorModel] = self._fetch_many(
select(MagicPlanDoorModel).where(
col(MagicPlanDoorModel.magic_plan_room_id).in_(room_ids)
)
)
win_vents: list[MagicPlanWindowVentilationModel] = self._fetch_many(
select(MagicPlanWindowVentilationModel).where(
col(MagicPlanWindowVentilationModel.magic_plan_window_id).in_(
[cast(int, w.id) for w in window_rows]
)
)
)
door_vents: list[MagicPlanDoorVentilationModel] = self._fetch_many(
select(MagicPlanDoorVentilationModel).where(
col(MagicPlanDoorVentilationModel.magic_plan_door_id).in_(
[cast(int, d.id) for d in door_rows]
)
)
)
return _Rows(floor_rows, room_rows, window_rows, door_rows, win_vents, door_vents)
def _fetch_one(self, stmt: Any) -> Any:
return self._session.execute(stmt).scalars().one_or_none() # pyright: ignore[reportDeprecated]
def _fetch_many(self, stmt: Any) -> Any:
return list(self._session.execute(stmt).scalars().all()) # pyright: ignore[reportDeprecated]
def save(self, plan: Plan, uploaded_file_id: int) -> None:
plan_id = self._upsert_plan(plan, uploaded_file_id)
self._delete_children(plan_id)

View file

View file

@ -0,0 +1,28 @@
from __future__ import annotations
from typing import Optional
from sqlalchemy import select
from sqlmodel import Session, col
from infrastructure.postgres.uploaded_file_table import FileTypeEnum, UploadedFile
class UploadedFilePostgresRepository:
def __init__(self, session: Session) -> None:
self._session = session
def get_latest_by_hubspot_deal_id(
self, hubspot_deal_id: str, file_type: FileTypeEnum
) -> Optional[UploadedFile]:
stmt = (
select(UploadedFile)
.where(col(UploadedFile.hubspot_deal_id) == hubspot_deal_id)
.where(col(UploadedFile.file_type) == file_type.value)
.order_by(col(UploadedFile.s3_upload_timestamp).desc())
.limit(1)
)
return self._session.execute(stmt).scalars().one_or_none() # pyright: ignore[reportDeprecated]
def insert(self, uploaded_file: UploadedFile) -> None:
self._session.add(uploaded_file)

View file

@ -0,0 +1,83 @@
"""
Run audit_generator locally.
Usage:
cd /workspaces/model
python scripts/run_audit_generator_local.py [<hubspot_deal_id>]
Prompts for deal ID and S3 destination (local file or real S3) if not supplied.
"""
from __future__ import annotations
import os
import sys
from pathlib import Path
from typing import Any, Union
import boto3
# Load .env before importing infra modules
from dotenv import load_dotenv
load_dotenv(Path(__file__).parent.parent / "backend" / ".env")
from infrastructure.postgres.config import PostgresConfig
from infrastructure.postgres.engine import make_engine, make_session
from infrastructure.s3.s3_client import S3Client
from orchestration.audit_generator_orchestrator import AuditGeneratorOrchestrator
from orchestration.audit_generator_unit_of_work import AuditGeneratorUnitOfWork
class _LocalS3Client:
"""Writes to local filesystem instead of S3."""
def __init__(self, output_dir: Path) -> None:
self._output_dir = output_dir
self._output_dir.mkdir(parents=True, exist_ok=True)
@property
def bucket(self) -> str:
return "local"
def get_object(self, key: str) -> bytes:
raise NotImplementedError
def put_object(self, key: str, body: bytes) -> str:
dest = self._output_dir / Path(key).name
dest.write_bytes(body)
print(f"Saved: {dest}")
return str(dest)
def _make_s3_client() -> Union[S3Client, "_LocalS3Client"]:
use_real = input("Use real S3? [y/N]: ").strip().lower() == "y"
if use_real:
bucket = "retrofit-energy-assessments-dev"
boto3_client: Any = boto3.client
return S3Client(boto_s3_client=boto3_client("s3"), bucket=bucket)
output_dir = Path(__file__).parent.parent / "local_output"
return _LocalS3Client(output_dir)
def main() -> None:
deal_id = sys.argv[1] if len(sys.argv) > 1 else input("hubspot_deal_id: ").strip()
s3_client = _make_s3_client()
engine = make_engine(PostgresConfig.from_env(os.environ))
def session_factory() -> Any:
return make_session(engine)
def uow_factory() -> AuditGeneratorUnitOfWork:
return AuditGeneratorUnitOfWork(session_factory)
AuditGeneratorOrchestrator(
hubspot_deal_id=deal_id,
s3_client=s3_client, # type: ignore[arg-type]
uow_factory=uow_factory,
).run()
if __name__ == "__main__":
main()

View file

@ -0,0 +1,99 @@
from __future__ import annotations
from typing import Any
from unittest.mock import MagicMock, patch
import pytest
from pydantic import ValidationError
from applications.audit_generator.handler import handler
_ENV = {
"DATABASE_URL": "postgresql+psycopg://user:pass@localhost/db",
"S3_BUCKET_NAME": "test-bucket",
# Tests patch PostgresConfig and make_engine to avoid needing the individual
# POSTGRES_* vars that PostgresConfig.from_env would otherwise require.
}
_VALID_BODY: dict[str, Any] = {
"task_id": "task-1",
"sub_task_id": "subtask-1",
"hubspot_deal_id": "deal-xyz",
}
def _call(body: dict[str, Any]) -> Any:
return handler.__wrapped__(body, None) # type: ignore[attr-defined]
# --- request validation ---
def test_invalid_body_raises_validation_error() -> None:
# Arrange — body missing all required fields
body: dict[str, Any] = {}
# Act / Assert
with patch("applications.audit_generator.handler.AuditGeneratorOrchestrator"):
with pytest.raises(ValidationError):
_call(body)
# --- orchestrator construction ---
def test_handler_passes_hubspot_deal_id_from_body_to_orchestrator() -> None:
# Arrange
mock_orch = MagicMock()
mock_orch.run.return_value = None
# Act
with patch("applications.audit_generator.handler.os.environ", _ENV), \
patch("applications.audit_generator.handler.PostgresConfig"), \
patch("applications.audit_generator.handler.make_engine"), \
patch("applications.audit_generator.handler.S3Client") as MockS3, \
patch("applications.audit_generator.handler.AuditGeneratorOrchestrator", return_value=mock_orch) as MockOrch:
MockS3.return_value = MagicMock()
_call(_VALID_BODY)
# Assert — deal id flows from body into the orchestrator constructor
MockOrch.assert_called_once()
assert MockOrch.call_args.kwargs["hubspot_deal_id"] == "deal-xyz"
def test_handler_passes_bucket_from_env_to_s3_client() -> None:
# Arrange
mock_orch = MagicMock()
mock_orch.run.return_value = None
# Act
with patch("applications.audit_generator.handler.os.environ", _ENV), \
patch("applications.audit_generator.handler.PostgresConfig"), \
patch("applications.audit_generator.handler.make_engine"), \
patch("applications.audit_generator.handler.S3Client") as MockS3, \
patch("applications.audit_generator.handler.AuditGeneratorOrchestrator", return_value=mock_orch):
_call(_VALID_BODY)
# Assert — bucket name from env reaches S3Client constructor
MockS3.assert_called_once()
assert MockS3.call_args.kwargs["bucket"] == "test-bucket"
# --- return value ---
def test_handler_returns_none_on_success() -> None:
# Arrange
mock_orch = MagicMock()
mock_orch.run.return_value = None
# Act
with patch("applications.audit_generator.handler.os.environ", _ENV), \
patch("applications.audit_generator.handler.PostgresConfig"), \
patch("applications.audit_generator.handler.make_engine"), \
patch("applications.audit_generator.handler.S3Client"), \
patch("applications.audit_generator.handler.AuditGeneratorOrchestrator", return_value=mock_orch):
result = _call(_VALID_BODY)
# Assert
assert result is None

View file

@ -17,29 +17,65 @@ from typing import Any
import pytest
from psycopg import Connection
from pytest_postgresql import factories
from sqlalchemy import Engine
from sqlalchemy import Engine, text
from sqlmodel import SQLModel, create_engine
# Importing the SQLModel row modules registers their tables on
# SQLModel.metadata so ``create_all`` builds the full schema. Imports look
# unused; they aren't.
import infrastructure.postgres.uploaded_file_table as _uf_table # pyright: ignore[reportUnusedImport]
# pg_ctl ships under a versioned path and is not on PATH in the dev container.
_PG_CTL = next(iter(sorted(glob.glob("/usr/lib/postgresql/*/bin/pg_ctl"))), "pg_ctl")
postgresql_proc = factories.postgresql_proc(
executable=_PG_CTL
) # pyright: ignore[reportUnknownMemberType]
postgresql_proc = factories.postgresql_proc(executable=_PG_CTL) # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]
postgresql = factories.postgresql("postgresql_proc")
def _create_pg_enum_types(engine: Engine) -> None:
"""Emit CREATE TYPE for PostgreSQL enum types used by UploadedFile.
SQLModel.metadata.create_all uses create_type=False for these enums
(they are normally created by Alembic migrations). Tests need them upfront.
A DO block swallows duplicate_object so the fixture is safe to call on a
pre-seeded database.
"""
from infrastructure.postgres.uploaded_file_table import FileSourceEnum, FileTypeEnum
ft_values = ", ".join(f"'{e.value}'" for e in FileTypeEnum)
fs_values = ", ".join(f"'{e.value}'" for e in FileSourceEnum)
with engine.connect() as conn:
conn.execute(
text(
f"""
DO $$ BEGIN
CREATE TYPE file_type AS ENUM ({ft_values});
EXCEPTION WHEN duplicate_object THEN NULL;
END $$;
"""
)
)
conn.execute(
text(
f"""
DO $$ BEGIN
CREATE TYPE file_source AS ENUM ({fs_values});
EXCEPTION WHEN duplicate_object THEN NULL;
END $$;
"""
)
)
conn.commit()
@pytest.fixture
def db_engine(postgresql: Connection[Any]) -> Iterator[Engine]:
"""A SQLModel engine bound to a fresh, ephemeral PostgreSQL database."""
info = postgresql.info
url = f"postgresql+psycopg://{info.user}:@{info.host}:{info.port}/{info.dbname}"
engine = create_engine(url)
_create_pg_enum_types(engine)
SQLModel.metadata.create_all(engine)
try:
yield engine

View file

@ -0,0 +1,94 @@
from __future__ import annotations
import openpyxl
import pytest
from domain.magicplan.models import (
Door,
DoorVentilation,
Floor,
Plan,
Room,
Window,
WindowVentilation,
)
from domain.magicplan.ventilation_audit import populate_sheet
def _make_window(with_ventilation: bool = True) -> Window:
vent = (
WindowVentilation(
opening_type="Hinged",
num_openings=1,
pct_openable=50,
trickle_vent_area_mm2=1000,
num_trickle_vents=2,
)
if with_ventilation
else None
)
return Window(width_m=1.0, height_m=1.2, area_m2=1.2, ventilation=vent)
def _make_door(with_ventilation: bool = True) -> Door:
vent = DoorVentilation(undercut_mm=10.0) if with_ventilation else None
return Door(width_mm=800.0, height_mm=2000.0, ventilation=vent)
def _make_plan(
num_rooms: int = 1,
num_windows_per_room: int = 1,
num_doors_per_room: int = 1,
) -> Plan:
rooms = [
Room(
name=f"Room {i}",
width_m=3.0,
length_m=4.0,
area_m2=12.0,
windows=[_make_window() for _ in range(num_windows_per_room)],
doors=[_make_door() for _ in range(num_doors_per_room)],
)
for i in range(num_rooms)
]
return Plan(
uid="test-uid",
name="Test Plan",
address="1 Test St",
postcode="TE1 1ST",
floors=[Floor(level=0, name="Ground", rooms=rooms)],
)
def _blank_sheet() -> object:
return openpyxl.Workbook().active
def test_raises_when_rooms_exceed_50() -> None:
# Arrange
plan = _make_plan(num_rooms=51, num_windows_per_room=0, num_doors_per_room=0)
sheet = _blank_sheet()
# Act / Assert
with pytest.raises(ValueError, match="50"):
populate_sheet(sheet, plan)
def test_raises_when_windows_exceed_50() -> None:
# Arrange
plan = _make_plan(num_rooms=1, num_windows_per_room=51, num_doors_per_room=0)
sheet = _blank_sheet()
# Act / Assert
with pytest.raises(ValueError, match="50"):
populate_sheet(sheet, plan)
def test_raises_when_doors_exceed_50() -> None:
# Arrange
plan = _make_plan(num_rooms=1, num_windows_per_room=0, num_doors_per_room=51)
sheet = _blank_sheet()
# Act / Assert
with pytest.raises(ValueError, match="50"):
populate_sheet(sheet, plan)

View file

@ -0,0 +1,17 @@
from infrastructure.postgres.uploaded_file_table import (
FileSourceEnum,
FileTypeEnum,
UploadedFile,
)
def test_file_type_enum_has_ventilation_audit() -> None:
assert FileTypeEnum.VENTILATION_AUDIT.value == "ventilation_audit"
def test_file_source_enum_has_audit_generator() -> None:
assert FileSourceEnum.AUDIT_GENERATOR.value == "audit_generator"
def test_uploaded_file_is_importable() -> None:
assert UploadedFile.__tablename__ == "uploaded_files"

View file

@ -0,0 +1,204 @@
from __future__ import annotations
from io import BytesIO
from typing import Any
from unittest.mock import MagicMock, call, patch
import pytest
from domain.magicplan.models import (
Door,
DoorVentilation,
Floor,
Plan,
Room,
Window,
WindowVentilation,
)
from infrastructure.postgres.uploaded_file_table import (
FileSourceEnum,
FileTypeEnum,
UploadedFile,
)
from infrastructure.s3.s3_client import S3Client
from orchestration.audit_generator_orchestrator import AuditGeneratorOrchestrator
_DEAL_ID = "deal-abc"
_BUCKET = "test-bucket"
_EXPECTED_S3_KEY = f"documents/hubspot_deal_id/{_DEAL_ID}/ventilation_audit.xlsx"
def _make_window(with_ventilation: bool = True) -> Window:
vent = (
WindowVentilation(
opening_type="Hinged",
num_openings=1,
pct_openable=50,
trickle_vent_area_mm2=1000,
num_trickle_vents=2,
)
if with_ventilation
else None
)
return Window(width_m=1.0, height_m=1.2, area_m2=1.2, ventilation=vent)
def _make_door(with_ventilation: bool = True) -> Door:
vent = DoorVentilation(undercut_mm=10.0) if with_ventilation else None
return Door(width_mm=800.0, height_mm=2000.0, ventilation=vent)
def _make_plan(
num_rooms: int = 1,
num_windows_per_room: int = 1,
num_doors_per_room: int = 1,
) -> Plan:
rooms = [
Room(
name=f"Room {i}",
width_m=3.0,
length_m=4.0,
area_m2=12.0,
windows=[_make_window() for _ in range(num_windows_per_room)],
doors=[_make_door() for _ in range(num_doors_per_room)],
)
for i in range(num_rooms)
]
return Plan(
uid="test-uid",
name="Test Plan",
address="1 Test St",
postcode="TE1 1ST",
floors=[Floor(level=0, name="Ground", rooms=rooms)],
)
def _make_uploaded_file_row(id: int = 1) -> UploadedFile:
return UploadedFile(
id=id,
s3_file_bucket=_BUCKET,
s3_file_key="documents/deal/plan.json",
s3_upload_timestamp=None, # type: ignore[arg-type]
hubspot_deal_id=_DEAL_ID,
file_type=FileTypeEnum.MAGIC_PLAN_JSON.value,
)
def _make_mock_uow(
uploaded_file_row: Any = None,
plan: Any = None,
) -> tuple[MagicMock, MagicMock]:
"""Return (mock_uow, mock_uow_factory)."""
mock_uow = MagicMock()
mock_uow.__enter__ = MagicMock(return_value=mock_uow)
mock_uow.__exit__ = MagicMock(return_value=False)
mock_uow.uploaded_file.get_latest_by_hubspot_deal_id.return_value = uploaded_file_row
mock_uow.magic_plan.get_plan_by_uploaded_file_id.return_value = plan
mock_uow_factory = MagicMock(return_value=mock_uow)
return mock_uow, mock_uow_factory
def _make_s3() -> MagicMock:
s3 = MagicMock(spec=S3Client)
s3.bucket = _BUCKET
return s3
def _make_orchestrator(
s3: Any = None,
uow_factory: Any = None,
deal_id: str = _DEAL_ID,
) -> AuditGeneratorOrchestrator:
return AuditGeneratorOrchestrator(
hubspot_deal_id=deal_id,
s3_client=s3 or _make_s3(),
uow_factory=uow_factory or MagicMock(),
)
# --- error: no uploaded file ---
def test_raises_when_no_magic_plan_json_uploaded() -> None:
# Arrange
mock_uow, mock_uow_factory = _make_mock_uow(uploaded_file_row=None)
orch = _make_orchestrator(uow_factory=mock_uow_factory)
# Act / Assert
with pytest.raises(ValueError, match="No MagicPlan"):
orch.run()
# --- error: plan not yet parsed ---
def test_raises_when_plan_not_yet_parsed() -> None:
# Arrange
mock_uow, mock_uow_factory = _make_mock_uow(
uploaded_file_row=_make_uploaded_file_row(), plan=None
)
orch = _make_orchestrator(uow_factory=mock_uow_factory)
# Act / Assert
with pytest.raises(ValueError, match="not yet parsed"):
orch.run()
# --- happy path ---
def test_uploads_to_correct_s3_key() -> None:
# Arrange
s3 = _make_s3()
plan = _make_plan()
mock_uow, mock_uow_factory = _make_mock_uow(
uploaded_file_row=_make_uploaded_file_row(), plan=plan
)
orch = _make_orchestrator(s3=s3, uow_factory=mock_uow_factory)
# Act
orch.run()
# Assert
s3.put_object.assert_called_once()
assert s3.put_object.call_args.args[0] == _EXPECTED_S3_KEY
def test_inserts_uploaded_file_with_correct_enums() -> None:
# Arrange
plan = _make_plan()
mock_uow, mock_uow_factory = _make_mock_uow(
uploaded_file_row=_make_uploaded_file_row(), plan=plan
)
orch = _make_orchestrator(uow_factory=mock_uow_factory)
# Act
orch.run()
# Assert — the UploadedFile inserted has the correct type/source
mock_uow.uploaded_file.insert.assert_called_once()
inserted: UploadedFile = mock_uow.uploaded_file.insert.call_args.args[0]
assert inserted.file_type == FileTypeEnum.VENTILATION_AUDIT.value
assert inserted.file_source == FileSourceEnum.AUDIT_GENERATOR.value
assert inserted.hubspot_deal_id == _DEAL_ID
assert inserted.s3_file_key == _EXPECTED_S3_KEY
def test_commits_after_s3_upload() -> None:
# Arrange
s3 = _make_s3()
plan = _make_plan()
mock_uow, mock_uow_factory = _make_mock_uow(
uploaded_file_row=_make_uploaded_file_row(), plan=plan
)
call_order: list[str] = []
s3.put_object.side_effect = lambda *a, **kw: call_order.append("s3_upload")
mock_uow.commit.side_effect = lambda: call_order.append("commit")
orch = _make_orchestrator(s3=s3, uow_factory=mock_uow_factory)
# Act
orch.run()
# Assert — S3 upload happens before DB commit
assert call_order == ["s3_upload", "commit"]

View file

@ -9,7 +9,7 @@ from domain.magicplan.api.response import MagicPlanPlan, PlanSummary
from domain.magicplan.mapper import map_plan
from domain.magicplan.models import Plan
from backend.app.db.models.uploaded_file import (
from infrastructure.postgres.uploaded_file_table import (
FileSourceEnum,
FileTypeEnum,
UploadedFile,

View file

@ -0,0 +1,80 @@
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from sqlalchemy import Engine
from sqlmodel import Session
from infrastructure.postgres.uploaded_file_table import FileTypeEnum, UploadedFile
from repositories.uploaded_file.uploaded_file_postgres_repository import (
UploadedFilePostgresRepository,
)
_DEAL_ID = "deal-abc-123"
_BUCKET = "test-bucket"
def _make_uploaded_file(
hubspot_deal_id: str = _DEAL_ID,
file_type: FileTypeEnum = FileTypeEnum.MAGIC_PLAN_JSON,
offset_seconds: int = 0,
) -> UploadedFile:
return UploadedFile(
s3_file_bucket=_BUCKET,
s3_file_key=f"documents/{hubspot_deal_id}/plan.json",
s3_upload_timestamp=datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
+ timedelta(seconds=offset_seconds),
hubspot_deal_id=hubspot_deal_id,
file_type=file_type.value,
)
def test_returns_most_recent_row_by_timestamp(db_engine: Engine) -> None:
# Arrange — two rows for the same deal/type; older first, newer second
older = _make_uploaded_file(offset_seconds=0)
newer = _make_uploaded_file(offset_seconds=60)
with Session(db_engine) as session:
session.add(older)
session.add(newer)
session.commit()
newer_id = newer.id
# Act
with Session(db_engine) as session:
result = UploadedFilePostgresRepository(session).get_latest_by_hubspot_deal_id(
_DEAL_ID, FileTypeEnum.MAGIC_PLAN_JSON
)
# Assert
assert result is not None
assert result.id == newer_id
def test_returns_none_when_no_matching_row(db_engine: Engine) -> None:
# Arrange — empty database
# Act
with Session(db_engine) as session:
result = UploadedFilePostgresRepository(session).get_latest_by_hubspot_deal_id(
"nonexistent-deal", FileTypeEnum.MAGIC_PLAN_JSON
)
# Assert
assert result is None
def test_does_not_return_row_with_different_file_type(db_engine: Engine) -> None:
# Arrange — row exists but for a different file_type
row = _make_uploaded_file(file_type=FileTypeEnum.OTHER)
with Session(db_engine) as session:
session.add(row)
session.commit()
# Act
with Session(db_engine) as session:
result = UploadedFilePostgresRepository(session).get_latest_by_hubspot_deal_id(
_DEAL_ID, FileTypeEnum.MAGIC_PLAN_JSON
)
# Assert
assert result is None