diff --git a/.github/workflows/deploy_terraform.yml b/.github/workflows/deploy_terraform.yml index c863f6f1..0d235ab1 100644 --- a/.github/workflows/deploy_terraform.yml +++ b/.github/workflows/deploy_terraform.yml @@ -157,10 +157,17 @@ jobs: ecr_repo: condition-etl-${{ needs.determine_stage.outputs.stage }} dockerfile_path: backend/condition/handler/Dockerfile build_context: . + build_args: | + DEV_DB_HOST=$DEV_DB_HOST + DEV_DB_PORT=$DEV_DB_PORT + DEV_DB_NAME=$DEV_DB_NAME secrets: AWS_ACCESS_KEY_ID: ${{ secrets.DEV_AWS_ACCESS_KEY_ID }} AWS_SECRET_ACCESS_KEY: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY }} AWS_REGION: ${{ secrets.DEV_AWS_REGION }} + DEV_DB_HOST: ${{ secrets.DEV_DB_HOST }} + DEV_DB_PORT: ${{ secrets.DEV_DB_PORT }} + DEV_DB_NAME: ${{ secrets.DEV_DB_NAME }} # ============================================================ # Deploy Condition ETL Lambda diff --git a/backend/.env.local b/backend/.env.local index 9b478e53..1679f10f 100644 --- a/backend/.env.local +++ b/backend/.env.local @@ -6,35 +6,17 @@ DB_PASSWORD=makingwarmerhomes #not used -GOOGLE_SOLAR_API_KEY="test" -SAP_PREDICTIONS_BUCKET="test" -CARBON_PREDICTIONS_BUCKET="test" -HEAT_PREDICTIONS_BUCKET="test" -HEATING_KWH_PREDICTIONS_BUCKET="test" -HOTWATER_KWH_PREDICTIONS_BUCKET="test" -API_KEY="test" -ENVIRONMENT="test" -SECRET_KEY="test" -PLAN_TRIGGER_BUCKET="test" -DATA_BUCKET="test" -EPC_AUTH_TOKEN="test" -ENGINE_SQS_URL="test" -ENERGY_ASSESSMENTS_BUCKET="test" -API_KEY="test" -SECRET_KEY="test" -ENVIRONMENT="test" -DATA_BUCKET="test" -PLAN_TRIGGER_BUCKET="test" -ENGINE_SQS_URL="test" -GOOGLE_SOLAR_API_KEY="test" -DB_HOST="test" -DB_PASSWORD="test" -DB_USERNAME="test" -DB_PORT="5432" -DB_NAME="test" -SAP_PREDICTIONS_BUCKET="test" -CARBON_PREDICTIONS_BUCKET="test" -HEAT_PREDICTIONS_BUCKET="test" -HEATING_KWH_PREDICTIONS_BUCKET="test" -HOTWATER_KWH_PREDICTIONS_BUCKET="test" -ENERGY_ASSESSMENTS_BUCKET="test" \ No newline at end of file +GOOGLE_SOLAR_API_KEY=test +SAP_PREDICTIONS_BUCKET=test 
+CARBON_PREDICTIONS_BUCKET=test +HEAT_PREDICTIONS_BUCKET=test +HEATING_KWH_PREDICTIONS_BUCKET=test +HOTWATER_KWH_PREDICTIONS_BUCKET=test +API_KEY=test +ENVIRONMENT=test +SECRET_KEY=test +PLAN_TRIGGER_BUCKET=test +DATA_BUCKET=test +EPC_AUTH_TOKEN=test +ENGINE_SQS_URL=test +ENERGY_ASSESSMENTS_BUCKET=test diff --git a/backend/address2UPRN/handler/Dockerfile b/backend/address2UPRN/handler/Dockerfile index c6dc1180..d01550a2 100644 --- a/backend/address2UPRN/handler/Dockerfile +++ b/backend/address2UPRN/handler/Dockerfile @@ -1,9 +1,6 @@ FROM public.ecr.aws/lambda/python:3.10 # FROM python:3.11.10-bullseye -# This is not going to be permenant - but until we solve for env variables in live prod -ENV EPC_AUTH_TOKEN=a2Nvbm5rb3dsZXNzYXJAZ21haWwuY29tOjY5MGJiMWM0NmIyOGI5ZDUxYzAxMzQzYzNiZGNlZGJjZDNmODQwMzAg - # Set working directory (Lambda task root) WORKDIR /var/task diff --git a/backend/condition/condition_trigger_request.py b/backend/condition/condition_trigger_request.py index 1bea6a0d..03bd6ad1 100644 --- a/backend/condition/condition_trigger_request.py +++ b/backend/condition/condition_trigger_request.py @@ -1,4 +1,5 @@ from enum import Enum +from typing import Optional from pydantic import BaseModel @@ -12,5 +13,21 @@ class ConditionTriggerRequest(BaseModel): file_type: ConditionFileType trigger_file_bucket: str # TODO: get this from settings trigger_file_key: str - uprn_lookup_file_bucket: str # TODO: get this from settings - uprn_lookup_file_key: str + + uprn_lookup_file_bucket: Optional[str] = None # TODO: get this from settings + uprn_lookup_file_key: Optional[str] = None + + +# { +# "file_type": "Peabody", +# "trigger_file_bucket": "condition-data-dev", +# "trigger_file_key": "input/peabody/2026_01_06 - Peabody - Stock Condition Data - Survey Records - D Lower.xlsx", +# "uprn_lookup_file_bucket": "condition-data-dev", +# "uprn_lookup_file_key": "input/peabody/uprn-lookup/PeabodyPropertymatched_Dec25_propref_UPRN.csv" +# } + +# { +# "file_type": "LBWF", +# 
"trigger_file_bucket": "condition-data-dev", +# "trigger_file_key": "input/lbwf/LBWF - Example Asset Data September 2025.xlsx", +# } diff --git a/backend/condition/handler/Dockerfile b/backend/condition/handler/Dockerfile index 8759dff3..031d981e 100644 --- a/backend/condition/handler/Dockerfile +++ b/backend/condition/handler/Dockerfile @@ -2,26 +2,21 @@ FROM public.ecr.aws/lambda/python:3.11 # For local running: # FROM python:3.11.10-bullseye -# ARG EPC_AUTH_TOKEN - - -# ARG DEV_DB_HOST -ARG JUNTE -ENV JUNTE=${JUNTE} - ARG DEV_DB_HOST -ENV DEV_DB_HOST=${DEV_DB_HOST} +ARG DEV_DB_PORT +ARG DEV_DB_NAME -ARG AWS_ACCESS_KEY_ID -ENV AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} - -ARG AWS_REGION -ENV AWS_REGION=${AWS_REGION} - # Set working directory (Lambda task root) WORKDIR /var/task +# Environment +ENV DB_HOST=${DEV_DB_HOST} +ENV DB_PORT=${DEV_DB_PORT} +ENV DB_NAME=${DEV_DB_NAME} + +COPY backend/.env.local backend/.env.local + # ----------------------------- # Copy requirements FIRST (for Docker layer caching) # ----------------------------- @@ -45,8 +40,6 @@ COPY backend/app/__init__.py backend/app/__init__.py COPY backend/app/db/__init__.py backend/app/db/__init__.py -# ENV EPC_AUTH_TOKEN=${EPC_AUTH_TOKEN} - # ----------------------------- # Lambda handler # ----------------------------- diff --git a/backend/condition/handler/handler.py b/backend/condition/handler/handler.py index 0f8dd940..2f3616a4 100644 --- a/backend/condition/handler/handler.py +++ b/backend/condition/handler/handler.py @@ -1,55 +1,51 @@ -# import json +import json from typing import Mapping, Any -import os +from io import BytesIO -# from io import BytesIO - -# from backend.condition.condition_trigger_request import ConditionTriggerRequest -# from backend.condition.lookups.uprn_lookup_s3 import UprnLookupS3 -# from backend.condition.processor import process_file -# from utils.logger import setup_logger -# from utils.s3 import read_io_from_s3 +from backend.condition.condition_trigger_request import 
ConditionTriggerRequest +from backend.condition.lookups.uprn_lookup_s3 import UprnLookupS3 +from backend.condition.processor import process_file +from utils.logger import setup_logger +from utils.s3 import read_io_from_s3 -# logger = setup_logger() +logger = setup_logger() def handler(event: Mapping[str, Any], context: Any) -> None: - print( - "hello Jun-te", - os.getenv("JUNTE", "empty junte"), - ) - print( - "hello DEV DB HOST:", - os.getenv("DEV_DB_HOST", "empty db"), - ) - print( - "hello access key", - os.getenv("AWS_ACCESS_KEY_ID", "empty key"), - ) - print( - "hello region", - os.getenv("AWS_REGION", "empty region"), - ) - # uprn_lookup = UprnLookupS3( - # bucket="", key="" - # ) # TODO: replace with postgres implementation - # for record in event.get("Records", []): - # try: - # body_dict = json.loads(record["body"]) - # payload = ConditionTriggerRequest.model_validate(body_dict) + for record in event.get("Records", []): + try: + body_dict = json.loads(record["body"]) + logger.debug("Validating request body") + payload = ConditionTriggerRequest.model_validate(body_dict) - # file_bytes: BytesIO = read_io_from_s3( - # bucket_name=payload.trigger_file_bucket, - # file_key=payload.trigger_file_key, - # ) + logger.debug("Successfully validated request body") - # process_file( - # file_stream=file_bytes, - # file_type=payload.file_type, - # uprn_lookup=uprn_lookup, - # ) + if payload.uprn_lookup_file_bucket and payload.uprn_lookup_file_key: + logger.debug("Getting UPRN lookup file from s3") + uprn_lookup = UprnLookupS3( + bucket=payload.uprn_lookup_file_bucket, + key=payload.uprn_lookup_file_key, + ) # TODO: replace with postgres implementation + logger.debug("Successfully got UPRN lookup file from s3") + else: + uprn_lookup = None - # except Exception as e: - # logger.error(f"Failed to process record: {e}") + logger.debug("Getting conditions data from s3") + file_bytes: BytesIO = read_io_from_s3( + bucket_name=payload.trigger_file_bucket, + 
file_key=payload.trigger_file_key, + ) + logger.debug( + "Successfully got conditions data from s3. Moving on to process file..." + ) + + process_file( + file_stream=file_bytes, + file_type=payload.file_type, + uprn_lookup=uprn_lookup, + ) + + except Exception as e: + logger.error(f"Failed to process record: {e}") diff --git a/backend/condition/handler/requirements.txt b/backend/condition/handler/requirements.txt index 2a54437f..1e259a95 100644 --- a/backend/condition/handler/requirements.txt +++ b/backend/condition/handler/requirements.txt @@ -1,6 +1,7 @@ openpyxl sqlmodel pydantic-settings +psycopg2-binary==2.9.10 # pandas isn't used, but needed for importing from utils.s3 pandas==2.2.2 diff --git a/backend/condition/parsing/peabody_parser.py b/backend/condition/parsing/peabody_parser.py index 126bcfea..4620ba82 100644 --- a/backend/condition/parsing/peabody_parser.py +++ b/backend/condition/parsing/peabody_parser.py @@ -23,11 +23,17 @@ class PeabodyParser(Parser): self, file_stream: BinaryIO, ) -> Any: - wb: Workbook = load_workbook(file_stream) + file_stream.seek(0) + logger.debug("[PeabodyParser] Loading workbook...") + wb: Workbook = load_workbook(file_stream, read_only=True, data_only=True) + logger.debug("[PeabodyParser] Successfully loaded workbook. Parsing assets...") assets = PeabodyParser._parse_assets(wb) + logger.debug( + "[PeabodyParser] Successfully parsed assets. Parsing UPRN lookup..." 
+ ) location_ref_to_uprn_map = self.uprn_lookup.get_property_ref_to_uprn_lookup() - + logger.debug("[PeabodyParser] Successfully parsed UPRN lookup") return PeabodyParser._group_assets_into_properties( assets=assets, location_ref_to_uprn_map=location_ref_to_uprn_map, @@ -49,7 +55,7 @@ class PeabodyParser(Parser): ) if not asset.is_block_level: # Block-level condition surveys are out of scope for now - # until we have a wider think on how to handle block + # until we have a wider think on how to handle blocks assets.append(asset) # TODO: handle block-level assets except Exception as e: @@ -74,13 +80,14 @@ class PeabodyParser(Parser): assets_by_location_reference[asset.lo_reference].append(asset) properties: List[PeabodyProperty] = [] + failed_mappings_count = 0 for location_ref, grouped_assets in assets_by_location_reference.items(): uprn = location_ref_to_uprn_map.get(location_ref) if uprn is None: - logger.warning(f"No UPRN found for Location Reference: {location_ref}") + failed_mappings_count += 1 continue properties.append( @@ -90,6 +97,7 @@ class PeabodyParser(Parser): ) ) + logger.warning(f"No UPRN found for {failed_mappings_count} Location References") return properties @staticmethod diff --git a/backend/condition/persistence/condition_postgres.py b/backend/condition/persistence/condition_postgres.py index 9d7895f0..e83df540 100644 --- a/backend/condition/persistence/condition_postgres.py +++ b/backend/condition/persistence/condition_postgres.py @@ -19,18 +19,19 @@ class ConditionPostgres: def bulk_insert_surveys( self, surveys: List[PropertyConditionSurvey], batch_size: Optional[int] = 100 ) -> None: - logger.info( - f"Preparing to load {len(surveys)} property surveys to Postgres. Mapping to SQLModel objects..." + logger.debug( + f"[ConditionPostgres] Preparing to load {len(surveys)} property surveys to Postgres. Mapping to SQLModel objects..." 
) survey_models: List[PropertyConditionSurveyModel] = [ ConditionPostgres.map_survey_to_model(s) for s in surveys ] total: int = len(survey_models) - logger.info( - f"Finished mapping {total} surveys. Writing to database in batches of {batch_size}..." + logger.debug( + f"[ConditionPostgres] Finished mapping {total} surveys. Writing to database in batches of {batch_size}..." ) with db_session() as session: + logger.info("[ConditionPostgres] Successfully made connection to database") for start in range(0, total, batch_size): end = min(start + batch_size, total) batch = survey_models[start:end] diff --git a/backend/condition/processor.py b/backend/condition/processor.py index 70ce2df9..ad5b4232 100644 --- a/backend/condition/processor.py +++ b/backend/condition/processor.py @@ -1,4 +1,4 @@ -from typing import Any, BinaryIO, List +from typing import Any, BinaryIO, List, Optional from datetime import datetime from backend.condition.condition_trigger_request import ConditionFileType @@ -14,13 +14,18 @@ logger = setup_logger() def process_file( - file_stream: BinaryIO, file_type: ConditionFileType, uprn_lookup: UprnLookup + file_stream: BinaryIO, + file_type: ConditionFileType, + uprn_lookup: Optional[UprnLookup], ) -> None: # Instantiation + logger.debug(f"[processor] Instantiating classes...") parser: Parser = select_parser(file_type, uprn_lookup) mapper: Mapper = select_mapper(file_type) persistence = ConditionPostgres() + logger.debug(f"[processor] Finished instantiating classes. Calling Parser...") + # Orchestration raw_properties: List[Any] = parser.parse(file_stream) @@ -41,6 +46,6 @@ def process_file( f"[processor] Finished mapping {len(property_condition_surveys)} properties. Writing to database..." 
) - # persistence.bulk_insert_surveys(property_condition_surveys) + persistence.bulk_insert_surveys(property_condition_surveys) logger.info(f"[processor] Finished loading surveys to database") diff --git a/infrastructure/terraform/lambda/condition-etl/main.tf b/infrastructure/terraform/lambda/condition-etl/main.tf index a421f898..4219f209 100644 --- a/infrastructure/terraform/lambda/condition-etl/main.tf +++ b/infrastructure/terraform/lambda/condition-etl/main.tf @@ -1,3 +1,21 @@ +data "aws_secretsmanager_secret_version" "db_credentials" { + secret_id = "${var.stage}/assessment_model/db_credentials" +} + +data "terraform_remote_state" "shared" { + backend = "s3" + config = { + bucket = "assessment-model-terraform-state" + key = "env:/${var.stage}/terraform.tfstate" # TODO: dont hardcode this + region = "eu-west-2" + } +} + +locals { + db_credentials = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string) +} + + module "lambda" { source = "../modules/lambda_with_sqs" @@ -5,10 +23,21 @@ module "lambda" { stage = var.stage image_uri = local.image_uri + timeout = 180 - environment = { - STAGE = var.stage - LOG_LEVEL = "info" - } + environment = merge( + { + STAGE = var.stage + LOG_LEVEL = "info" + DB_USERNAME = local.db_credentials.db_assessment_model_username + DB_PASSWORD = local.db_credentials.db_assessment_model_password + }, + ) + } + +resource "aws_iam_role_policy_attachment" "attach_condition_etl_s3_read" { + role = module.lambda.role_name + policy_arn = data.terraform_remote_state.shared.outputs.condition_etl_s3_read_arn +} \ No newline at end of file diff --git a/infrastructure/terraform/lambda/modules/lambda_with_sqs/main.tf b/infrastructure/terraform/lambda/modules/lambda_with_sqs/main.tf index 3816c206..065fb790 100644 --- a/infrastructure/terraform/lambda/modules/lambda_with_sqs/main.tf +++ b/infrastructure/terraform/lambda/modules/lambda_with_sqs/main.tf @@ -6,6 +6,10 @@ module "role" { name = "${var.name}-lambda-${var.stage}" } 
+output "role_name" { + value = module.role.role_name +} + ############################################ # SQS queue + DLQ ############################################ diff --git a/infrastructure/terraform/shared/main.tf b/infrastructure/terraform/shared/main.tf index a5b5ea4c..b1474055 100644 --- a/infrastructure/terraform/shared/main.tf +++ b/infrastructure/terraform/shared/main.tf @@ -335,4 +335,32 @@ module "postcode_splitter_registry" { name = "postcode_splitter" stage = var.stage +} + +################################################ +# Condition data – S3 bucket +################################################ +module "condition_data_bucket" { + source = "../modules/s3" + bucketname = "condition-data-${var.stage}" + allowed_origins = var.allowed_origins +} + +resource "aws_iam_policy" "condition_etl_s3_read" { + name = "ConditionETLReadS3" + description = "Allow Lambda to read objects from condition-data-${var.stage}" + policy = jsonencode({ + Version = "2012-10-17" + Statement = [ + { + Effect = "Allow" + Action = ["s3:GetObject"] + Resource = "arn:aws:s3:::condition-data-${var.stage}/*" + } + ] + }) +} + +output "condition_etl_s3_read_arn" { + value = aws_iam_policy.condition_etl_s3_read.arn } \ No newline at end of file