merge dans code

This commit is contained in:
Jun-te Kim 2026-02-10 13:24:18 +00:00
commit 95dff50455
13 changed files with 180 additions and 112 deletions

View file

@ -157,10 +157,17 @@ jobs:
ecr_repo: condition-etl-${{ needs.determine_stage.outputs.stage }}
dockerfile_path: backend/condition/handler/Dockerfile
build_context: .
build_args: |
DEV_DB_HOST=$DEV_DB_HOST
DEV_DB_PORT=$DEV_DB_PORT
DEV_DB_NAME=$DEV_DB_NAME
secrets:
AWS_ACCESS_KEY_ID: ${{ secrets.DEV_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY }}
AWS_REGION: ${{ secrets.DEV_AWS_REGION }}
DEV_DB_HOST: ${{ secrets.DEV_DB_HOST }}
DEV_DB_PORT: ${{ secrets.DEV_DB_PORT }}
DEV_DB_NAME: ${{ secrets.DEV_DB_NAME }}
# ============================================================
# Deploy Condition ETL Lambda

View file

@ -6,35 +6,17 @@ DB_PASSWORD=makingwarmerhomes
#not used
GOOGLE_SOLAR_API_KEY="test"
SAP_PREDICTIONS_BUCKET="test"
CARBON_PREDICTIONS_BUCKET="test"
HEAT_PREDICTIONS_BUCKET="test"
HEATING_KWH_PREDICTIONS_BUCKET="test"
HOTWATER_KWH_PREDICTIONS_BUCKET="test"
API_KEY="test"
ENVIRONMENT="test"
SECRET_KEY="test"
PLAN_TRIGGER_BUCKET="test"
DATA_BUCKET="test"
EPC_AUTH_TOKEN="test"
ENGINE_SQS_URL="test"
ENERGY_ASSESSMENTS_BUCKET="test"
API_KEY="test"
SECRET_KEY="test"
ENVIRONMENT="test"
DATA_BUCKET="test"
PLAN_TRIGGER_BUCKET="test"
ENGINE_SQS_URL="test"
GOOGLE_SOLAR_API_KEY="test"
DB_HOST="test"
DB_PASSWORD="test"
DB_USERNAME="test"
DB_PORT="5432"
DB_NAME="test"
SAP_PREDICTIONS_BUCKET="test"
CARBON_PREDICTIONS_BUCKET="test"
HEAT_PREDICTIONS_BUCKET="test"
HEATING_KWH_PREDICTIONS_BUCKET="test"
HOTWATER_KWH_PREDICTIONS_BUCKET="test"
ENERGY_ASSESSMENTS_BUCKET="test"
GOOGLE_SOLAR_API_KEY=test
SAP_PREDICTIONS_BUCKET=test
CARBON_PREDICTIONS_BUCKET=test
HEAT_PREDICTIONS_BUCKET=test
HEATING_KWH_PREDICTIONS_BUCKET=test
HOTWATER_KWH_PREDICTIONS_BUCKET=test
API_KEY=test
ENVIRONMENT=test
SECRET_KEY=test
PLAN_TRIGGER_BUCKET=test
DATA_BUCKET=test
EPC_AUTH_TOKEN=test
ENGINE_SQS_URL=test
ENERGY_ASSESSMENTS_BUCKET=test

View file

@ -1,9 +1,6 @@
FROM public.ecr.aws/lambda/python:3.10
# FROM python:3.11.10-bullseye
# This is not going to be permanent - it stays only until we solve for env variables in live prod
ENV EPC_AUTH_TOKEN=a2Nvbm5rb3dsZXNzYXJAZ21haWwuY29tOjY5MGJiMWM0NmIyOGI5ZDUxYzAxMzQzYzNiZGNlZGJjZDNmODQwMzAg
# Set working directory (Lambda task root)
WORKDIR /var/task

View file

@ -1,4 +1,5 @@
from enum import Enum
from typing import Optional
from pydantic import BaseModel
@ -12,5 +13,21 @@ class ConditionTriggerRequest(BaseModel):
file_type: ConditionFileType
trigger_file_bucket: str # TODO: get this from settings
trigger_file_key: str
uprn_lookup_file_bucket: str # TODO: get this from settings
uprn_lookup_file_key: str
uprn_lookup_file_bucket: Optional[str] = None # TODO: get this from settings
uprn_lookup_file_key: Optional[str] = None
# {
# "file_type": "Peabody",
# "trigger_file_bucket": "condition-data-dev",
# "trigger_file_key": "input/peabody/2026_01_06 - Peabody - Stock Condition Data - Survey Records - D Lower.xlsx",
# "uprn_lookup_file_bucket": "condition-data-dev",
# "uprn_lookup_file_key": "input/peabody/uprn-lookup/PeabodyPropertymatched_Dec25_propref_UPRN.csv"
# }
# {
# "file_type": "LBWF",
# "trigger_file_bucket": "condition-data-dev",
# "trigger_file_key": "input/lbwf/LBWF - Example Asset Data September 2025.xlsx"
# }

View file

@ -2,26 +2,21 @@ FROM public.ecr.aws/lambda/python:3.11
# For local running:
# FROM python:3.11.10-bullseye
# ARG EPC_AUTH_TOKEN
# ARG DEV_DB_HOST
ARG JUNTE
ENV JUNTE=${JUNTE}
ARG DEV_DB_HOST
ENV DEV_DB_HOST=${DEV_DB_HOST}
ARG DEV_DB_PORT
ARG DEV_DB_NAME
ARG AWS_ACCESS_KEY_ID
ENV AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
ARG AWS_REGION
ENV AWS_REGION=${AWS_REGION}
# Set working directory (Lambda task root)
WORKDIR /var/task
# Environment
ENV DB_HOST=${DEV_DB_HOST}
ENV DB_PORT=${DEV_DB_PORT}
ENV DB_NAME=${DEV_DB_NAME}
COPY backend/.env.local backend/.env.local
# -----------------------------
# Copy requirements FIRST (for Docker layer caching)
# -----------------------------
@ -45,8 +40,6 @@ COPY backend/app/__init__.py backend/app/__init__.py
COPY backend/app/db/__init__.py backend/app/db/__init__.py
# ENV EPC_AUTH_TOKEN=${EPC_AUTH_TOKEN}
# -----------------------------
# Lambda handler
# -----------------------------

View file

@ -1,55 +1,51 @@
# import json
import json
from typing import Mapping, Any
import os
from io import BytesIO
# from io import BytesIO
# from backend.condition.condition_trigger_request import ConditionTriggerRequest
# from backend.condition.lookups.uprn_lookup_s3 import UprnLookupS3
# from backend.condition.processor import process_file
# from utils.logger import setup_logger
# from utils.s3 import read_io_from_s3
from backend.condition.condition_trigger_request import ConditionTriggerRequest
from backend.condition.lookups.uprn_lookup_s3 import UprnLookupS3
from backend.condition.processor import process_file
from utils.logger import setup_logger
from utils.s3 import read_io_from_s3
# logger = setup_logger()
logger = setup_logger()
def handler(event: Mapping[str, Any], context: Any) -> None:
print(
"hello Jun-te",
os.getenv("JUNTE", "empty junte"),
)
print(
"hello DEV DB HOST:",
os.getenv("DEV_DB_HOST", "empty db"),
)
print(
"hello access key",
os.getenv("AWS_ACCESS_KEY_ID", "empty key"),
)
print(
"hello region",
os.getenv("AWS_REGION", "empty region"),
)
# uprn_lookup = UprnLookupS3(
# bucket="", key=""
# ) # TODO: replace with postgres implementation
# for record in event.get("Records", []):
# try:
# body_dict = json.loads(record["body"])
# payload = ConditionTriggerRequest.model_validate(body_dict)
for record in event.get("Records", []):
try:
body_dict = json.loads(record["body"])
logger.debug("Validating request body")
payload = ConditionTriggerRequest.model_validate(body_dict)
# file_bytes: BytesIO = read_io_from_s3(
# bucket_name=payload.trigger_file_bucket,
# file_key=payload.trigger_file_key,
# )
logger.debug("Successfully validated request body")
# process_file(
# file_stream=file_bytes,
# file_type=payload.file_type,
# uprn_lookup=uprn_lookup,
# )
if payload.uprn_lookup_file_bucket and payload.uprn_lookup_file_key:
logger.debug("Getting UPRN lookup file from s3")
uprn_lookup = UprnLookupS3(
bucket=payload.uprn_lookup_file_bucket,
key=payload.uprn_lookup_file_key,
) # TODO: replace with postgres implementation
logger.debug("Successfully got UPRN lookup file from s3")
else:
uprn_lookup = None
# except Exception as e:
# logger.error(f"Failed to process record: {e}")
logger.debug("Getting conditions data from s3")
file_bytes: BytesIO = read_io_from_s3(
bucket_name=payload.trigger_file_bucket,
file_key=payload.trigger_file_key,
)
logger.debug(
"Successfully got conditions data from s3. Moving on to process file..."
)
process_file(
file_stream=file_bytes,
file_type=payload.file_type,
uprn_lookup=uprn_lookup,
)
except Exception as e:
logger.error(f"Failed to process record: {e}")

View file

@ -1,6 +1,7 @@
openpyxl
sqlmodel
pydantic-settings
psycopg2-binary==2.9.10
# pandas isn't used directly, but it must be installed because importing from utils.s3 requires it
pandas==2.2.2

View file

@ -23,11 +23,17 @@ class PeabodyParser(Parser):
self,
file_stream: BinaryIO,
) -> Any:
wb: Workbook = load_workbook(file_stream)
file_stream.seek(0)
logger.debug("[PeabodyParser] Loading workbook...")
wb: Workbook = load_workbook(file_stream, read_only=True, data_only=True)
logger.debug("[PeabodyParser] Successfully loaded workbook. Parsing assets...")
assets = PeabodyParser._parse_assets(wb)
logger.debug(
"[PeabodyParser] Successfully parsed assets. Parsing UPRN lookup..."
)
location_ref_to_uprn_map = self.uprn_lookup.get_property_ref_to_uprn_lookup()
logger.debug("[PeabodyParser] Successfully parsed UPRN lookup")
return PeabodyParser._group_assets_into_properties(
assets=assets,
location_ref_to_uprn_map=location_ref_to_uprn_map,
@ -49,7 +55,7 @@ class PeabodyParser(Parser):
)
if not asset.is_block_level:
# Block-level condition surveys are out of scope for now
# until we have a wider think on how to handle block
# until we have a wider think on how to handle blocks
assets.append(asset) # TODO: handle block-level assets
except Exception as e:
@ -74,13 +80,14 @@ class PeabodyParser(Parser):
assets_by_location_reference[asset.lo_reference].append(asset)
properties: List[PeabodyProperty] = []
failed_mappings_count = 0
for location_ref, grouped_assets in assets_by_location_reference.items():
uprn = location_ref_to_uprn_map.get(location_ref)
if uprn is None:
logger.warning(f"No UPRN found for Location Reference: {location_ref}")
failed_mappings_count += 1
continue
properties.append(
@ -90,6 +97,7 @@ class PeabodyParser(Parser):
)
)
logger.warning(f"No UPRN found for {failed_mappings_count} Location References")
return properties
@staticmethod

View file

@ -19,18 +19,19 @@ class ConditionPostgres:
def bulk_insert_surveys(
self, surveys: List[PropertyConditionSurvey], batch_size: Optional[int] = 100
) -> None:
logger.info(
f"Preparing to load {len(surveys)} property surveys to Postgres. Mapping to SQLModel objects..."
logger.debug(
f"[ConditionPostgres] Preparing to load {len(surveys)} property surveys to Postgres. Mapping to SQLModel objects..."
)
survey_models: List[PropertyConditionSurveyModel] = [
ConditionPostgres.map_survey_to_model(s) for s in surveys
]
total: int = len(survey_models)
logger.info(
f"Finished mapping {total} surveys. Writing to database in batches of {batch_size}..."
logger.debug(
f"[ConditionPostgres] Finished mapping {total} surveys. Writing to database in batches of {batch_size}..."
)
with db_session() as session:
logger.info("[ConditionPostgres] Successfully made connection to database")
for start in range(0, total, batch_size):
end = min(start + batch_size, total)
batch = survey_models[start:end]

View file

@ -1,4 +1,4 @@
from typing import Any, BinaryIO, List
from typing import Any, BinaryIO, List, Optional
from datetime import datetime
from backend.condition.condition_trigger_request import ConditionFileType
@ -14,13 +14,18 @@ logger = setup_logger()
def process_file(
file_stream: BinaryIO, file_type: ConditionFileType, uprn_lookup: UprnLookup
file_stream: BinaryIO,
file_type: ConditionFileType,
uprn_lookup: Optional[UprnLookup],
) -> None:
# Instantiation
logger.debug(f"[processor] Instantiating classes...")
parser: Parser = select_parser(file_type, uprn_lookup)
mapper: Mapper = select_mapper(file_type)
persistence = ConditionPostgres()
logger.debug(f"[processor] Finished instantiating classes. Calling Parser...")
# Orchestration
raw_properties: List[Any] = parser.parse(file_stream)
@ -41,6 +46,6 @@ def process_file(
f"[processor] Finished mapping {len(property_condition_surveys)} properties. Writing to database..."
)
# persistence.bulk_insert_surveys(property_condition_surveys)
persistence.bulk_insert_surveys(property_condition_surveys)
logger.info(f"[processor] Finished loading surveys to database")

View file

@ -1,3 +1,21 @@
# Looks up the Secrets Manager secret stored under
# "<stage>/assessment_model/db_credentials" (no specific version is pinned here).
# Its secret_string is JSON-decoded into local.db_credentials.
data "aws_secretsmanager_secret_version" "db_credentials" {
secret_id = "${var.stage}/assessment_model/db_credentials"
}
# Reads another Terraform state from the "assessment-model-terraform-state" S3
# bucket (eu-west-2), keyed by stage. Its outputs are used to obtain
# condition_etl_s3_read_arn for the Lambda role policy attachment.
# NOTE(review): the "env:/<stage>/" key prefix presumably matches the workspace
# layout of the stack that owns the shared resources - confirm.
data "terraform_remote_state" "shared" {
backend = "s3"
config = {
bucket = "assessment-model-terraform-state"
key = "env:/${var.stage}/terraform.tfstate" # TODO: don't hardcode this
region = "eu-west-2"
}
}
# Decodes the secret's JSON payload once so that individual fields
# (db_assessment_model_username / db_assessment_model_password) can be
# referenced as local.db_credentials.<field>.
locals {
db_credentials = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string)
}
module "lambda" {
source = "../modules/lambda_with_sqs"
@ -5,10 +23,21 @@ module "lambda" {
stage = var.stage
image_uri = local.image_uri
timeout = 180
environment = {
STAGE = var.stage
LOG_LEVEL = "info"
}
environment = merge(
{
STAGE = var.stage
LOG_LEVEL = "info"
DB_USERNAME = local.db_credentials.db_assessment_model_username
DB_PASSWORD = local.db_credentials.db_assessment_model_password
},
)
}
# Attaches the policy whose ARN is exported by the shared remote state as
# condition_etl_s3_read_arn to the role created by module.lambda.
resource "aws_iam_role_policy_attachment" "attach_condition_etl_s3_read" {
role = module.lambda.role_name
policy_arn = data.terraform_remote_state.shared.outputs.condition_etl_s3_read_arn
}

View file

@ -6,6 +6,10 @@ module "role" {
name = "${var.name}-lambda-${var.stage}"
}
# Exposes the role name produced by module.role so that callers of this module
# can reference it (e.g. to attach additional IAM policies to the role).
output "role_name" {
value = module.role.role_name
}
############################################
# SQS queue + DLQ
############################################

View file

@ -335,4 +335,32 @@ module "postcode_splitter_registry" {
name = "postcode_splitter"
stage = var.stage
}
################################################
# Condition data S3 bucket
################################################
# Instantiates the local ../modules/s3 module for the stage-specific
# "condition-data-<stage>" bucket, passing through the configured allowed origins.
module "condition_data_bucket" {
source = "../modules/s3"
bucketname = "condition-data-${var.stage}"
allowed_origins = var.allowed_origins
}
# IAM policy granting read-only (s3:GetObject) access to every object in the
# stage-specific "condition-data-<stage>" bucket. Its ARN is exported through the
# condition_etl_s3_read_arn output.
resource "aws_iam_policy" "condition_etl_s3_read" {
  # IAM policy names are unique per AWS account. The name carries the stage,
  # like the bucket it grants access to, so that several stages can be
  # deployed side by side without a name collision.
  name        = "ConditionETLReadS3-${var.stage}"
  description = "Allow Lambda to read objects from condition-data-${var.stage}"
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = ["s3:GetObject"]
        # Object-level ARN ("/*"): GetObject applies to objects, not to the bucket itself.
        Resource = "arn:aws:s3:::condition-data-${var.stage}/*"
      }
    ]
  })
}
# Exports the ARN of the condition-data read policy so that other stacks can
# look it up from this stack's state and attach it to their roles.
output "condition_etl_s3_read_arn" {
value = aws_iam_policy.condition_etl_s3_read.arn
}