mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
added bulk address 2 uprn lambda
This commit is contained in:
parent
2ffabe94d5
commit
ec4c870465
12 changed files with 319 additions and 0 deletions
40
.github/workflows/deploy_terraform.yml
vendored
40
.github/workflows/deploy_terraform.yml
vendored
|
|
@ -201,6 +201,46 @@ jobs:
|
|||
AWS_SECRET_ACCESS_KEY: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY }}
|
||||
AWS_REGION: ${{ secrets.DEV_AWS_REGION }}
|
||||
|
||||
# ============================================================
|
||||
# Build Bulk Address2UPRN Combiner image and Push
|
||||
# ============================================================
|
||||
bulk_address2uprn_combiner_image:
|
||||
needs: [determine_stage, shared_terraform]
|
||||
uses: ./.github/workflows/_build_image.yml
|
||||
with:
|
||||
ecr_repo: bulk_address2uprn_combiner-${{ needs.determine_stage.outputs.stage }}
|
||||
dockerfile_path: backend/bulk_address2uprn_combiner/handler/Dockerfile
|
||||
build_context: .
|
||||
build_args: |
|
||||
DEV_DB_HOST=$DEV_DB_HOST
|
||||
DEV_DB_PORT=$DEV_DB_PORT
|
||||
DEV_DB_NAME=$DEV_DB_NAME
|
||||
secrets:
|
||||
AWS_ACCESS_KEY_ID: ${{ secrets.DEV_AWS_ACCESS_KEY_ID }}
|
||||
AWS_SECRET_ACCESS_KEY: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY }}
|
||||
AWS_REGION: ${{ secrets.DEV_AWS_REGION }}
|
||||
DEV_DB_HOST: ${{ secrets.DEV_DB_HOST }}
|
||||
DEV_DB_PORT: ${{ secrets.DEV_DB_PORT }}
|
||||
DEV_DB_NAME: ${{ secrets.DEV_DB_NAME }}
|
||||
|
||||
# ============================================================
|
||||
# Deploy Bulk Address2UPRN Combiner Lambda
|
||||
# ============================================================
|
||||
bulk_address2uprn_combiner_lambda:
|
||||
needs: [bulk_address2uprn_combiner_image, determine_stage, shared_terraform]
|
||||
uses: ./.github/workflows/_deploy_lambda.yml
|
||||
with:
|
||||
lambda_name: bulk_address2uprn_combiner
|
||||
lambda_path: infrastructure/terraform/lambda/bulk_address2uprn_combiner
|
||||
stage: ${{ needs.determine_stage.outputs.stage }}
|
||||
ecr_repo: bulk_address2uprn_combiner-${{ needs.determine_stage.outputs.stage }}
|
||||
image_digest: ${{ needs.bulk_address2uprn_combiner_image.outputs.image_digest }}
|
||||
terraform_apply: ${{ needs.determine_stage.outputs.terraform_apply }}
|
||||
secrets:
|
||||
AWS_ACCESS_KEY_ID: ${{ secrets.DEV_AWS_ACCESS_KEY_ID }}
|
||||
AWS_SECRET_ACCESS_KEY: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY }}
|
||||
AWS_REGION: ${{ secrets.DEV_AWS_REGION }}
|
||||
|
||||
# ============================================================
|
||||
# Condition ETL image and Push
|
||||
# ============================================================
|
||||
|
|
|
|||
5
.gitignore
vendored
5
.gitignore
vendored
|
|
@ -278,6 +278,11 @@ cache/
|
|||
|
||||
*.png
|
||||
*.pptx
|
||||
*.csv
|
||||
*.xlsx
|
||||
*.pdf
|
||||
**/Chunks/
|
||||
*.ipynb
|
||||
|
||||
local_data*
|
||||
|
||||
|
|
|
|||
30
backend/app/db/models/bulk_address_uploads.py
Normal file
30
backend/app/db/models/bulk_address_uploads.py
Normal file
|
|
@ -0,0 +1,30 @@
|
|||
from typing import Optional
|
||||
from uuid import UUID, uuid4
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from sqlmodel import SQLModel, Field, select
|
||||
|
||||
from backend.app.db.connection import get_db_session
|
||||
|
||||
|
||||
class BulkAddressUpload(SQLModel, table=True):
    """ORM model for the ``bulk_address_uploads`` table.

    Each row is linked to a task through ``task_id`` and can carry the S3 URI
    of a combined CSV once one has been recorded.
    """

    __tablename__ = "bulk_address_uploads"

    # Primary key, generated with uuid4 when not supplied.
    id: UUID = Field(default_factory=uuid4, primary_key=True, index=True)
    # Foreign key to ``tasks.id``; indexed because rows are looked up by task.
    task_id: UUID = Field(foreign_key="tasks.id", index=True)
    # Stays None until set_combined_csv_s3_uri() stores a value.
    combined_csv_s3_uri: Optional[str] = Field(default=None)
    # Timezone-aware UTC default, matching the aware timestamp that
    # set_combined_csv_s3_uri() writes (datetime.utcnow() is naive and
    # deprecated, so the two code paths previously disagreed).
    updated_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc)
    )
|
||||
|
||||
|
||||
def set_combined_csv_s3_uri(task_id: UUID, s3_uri: str) -> None:
    """Store the combined-CSV S3 URI on the upload row belonging to a task.

    The first ``BulkAddressUpload`` row whose ``task_id`` matches is updated:
    ``combined_csv_s3_uri`` is set to ``s3_uri`` and ``updated_at`` is set to
    the current UTC time, then the change is committed.

    Args:
        task_id: Task whose upload row should be updated.
        s3_uri: URI to record on the row.

    Raises:
        ValueError: If no row exists for ``task_id``.
    """
    # Timestamp is captured before the session is opened.
    stamped_at = datetime.now(timezone.utc)
    lookup = select(BulkAddressUpload).where(BulkAddressUpload.task_id == task_id)

    with get_db_session() as session:
        upload = session.exec(lookup).first()
        if upload is None:
            raise ValueError(f"No bulk_address_uploads row for task_id {task_id}")

        upload.combined_csv_s3_uri = s3_uri
        upload.updated_at = stamped_at
        session.add(upload)
        session.commit()
|
||||
0
backend/bulk_address2uprn_combiner/__init__.py
Normal file
0
backend/bulk_address2uprn_combiner/__init__.py
Normal file
23
backend/bulk_address2uprn_combiner/handler/Dockerfile
Normal file
23
backend/bulk_address2uprn_combiner/handler/Dockerfile
Normal file
|
|
@ -0,0 +1,23 @@
|
|||
FROM public.ecr.aws/lambda/python:3.11
|
||||
|
||||
ARG DEV_DB_HOST
|
||||
ARG DEV_DB_PORT
|
||||
ARG DEV_DB_NAME
|
||||
|
||||
ENV DB_HOST=${DEV_DB_HOST}
|
||||
ENV DB_PORT=${DEV_DB_PORT}
|
||||
ENV DB_NAME=${DEV_DB_NAME}
|
||||
|
||||
WORKDIR /var/task
|
||||
|
||||
COPY backend/bulk_address2uprn_combiner/handler/requirements.txt .
|
||||
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
|
||||
COPY utils/ utils/
|
||||
COPY backend/ backend/
|
||||
COPY datatypes/ datatypes/
|
||||
|
||||
COPY backend/bulk_address2uprn_combiner/main.py .
|
||||
|
||||
CMD ["main.handler"]
|
||||
|
|
@ -0,0 +1,7 @@
|
|||
pandas==2.2.2
|
||||
numpy<2.0
|
||||
boto3==1.35.44
|
||||
sqlmodel
|
||||
sqlalchemy==2.0.36
|
||||
psycopg2-binary==2.9.10
|
||||
pydantic-settings==2.6.0
|
||||
74
backend/bulk_address2uprn_combiner/main.py
Normal file
74
backend/bulk_address2uprn_combiner/main.py
Normal file
|
|
@ -0,0 +1,74 @@
|
|||
import os
|
||||
import boto3
|
||||
import pandas as pd
|
||||
from io import BytesIO
|
||||
from typing import Any
|
||||
from uuid import UUID
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from utils.logger import setup_logger
|
||||
from backend.utils.subtasks import subtask_handler
|
||||
from backend.app.db.models.bulk_address_uploads import set_combined_csv_s3_uri
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME")
|
||||
|
||||
|
||||
def list_csv_files(s3_client, bucket: str, task_id: str) -> list[str]:
    """Return the keys of all ``.csv`` objects under ``ara_raw_outputs/<task_id>/``.

    Uses the ``list_objects_v2`` paginator so every result page is visited;
    pages without a ``Contents`` entry contribute nothing.
    """
    pages = s3_client.get_paginator("list_objects_v2").paginate(
        Bucket=bucket,
        Prefix=f"ara_raw_outputs/{task_id}/",
    )
    return [
        entry["Key"]
        for page in pages
        for entry in page.get("Contents", [])
        if entry["Key"].endswith(".csv")
    ]
|
||||
|
||||
|
||||
def download_csv(s3_client, bucket: str, key: str) -> pd.DataFrame:
    """Fetch one S3 object and parse its entire body as CSV.

    The body is read fully into memory before being handed to pandas.
    """
    response = s3_client.get_object(Bucket=bucket, Key=key)
    raw_bytes = response["Body"].read()
    return pd.read_csv(BytesIO(raw_bytes))
|
||||
|
||||
|
||||
@subtask_handler()
def handler(body: dict[str, Any], context: Any) -> str:
    """Combine a task's raw CSV outputs into one file and record its location.

    Steps:
      1. Read ``task_id`` from ``body`` and validate it as a UUID.
      2. List and download every ``.csv`` under ``ara_raw_outputs/<task_id>/``
         in the bucket named by ``S3_BUCKET_NAME``.
      3. Concatenate them and upload the result to
         ``bulk_final_outputs/<task_id>/combined_<UTC timestamp>.csv``.
      4. Persist the resulting S3 URI via ``set_combined_csv_s3_uri``.

    Args:
        body: Message body; must contain a ``task_id`` entry.
        context: Invocation context; not used here.

    Returns:
        The ``s3://`` URI of the combined CSV.

    Raises:
        RuntimeError: If ``task_id`` is missing, ``S3_BUCKET_NAME`` is unset,
            or no CSV files exist for the task.
        ValueError: If ``task_id`` is not a valid UUID.
    """
    task_id_str: str = body.get("task_id", "")

    if not task_id_str:
        raise RuntimeError("Missing task_id in message body")

    # Validate the UUID before touching S3: previously a malformed task_id was
    # only detected after the combined file had already been uploaded, leaving
    # an output object that was never recorded in the database.
    try:
        task_uuid = UUID(task_id_str)
    except ValueError as err:
        raise ValueError(f"task_id is not a valid UUID: {task_id_str!r}") from err

    bucket = S3_BUCKET_NAME
    if not bucket:
        raise RuntimeError("S3_BUCKET_NAME env var not set")

    s3 = boto3.client("s3")

    logger.info(f"Combining ara_raw_outputs for task {task_id_str}")

    csv_keys = list_csv_files(s3, bucket, task_id_str)
    if not csv_keys:
        raise RuntimeError(f"No CSV files found under ara_raw_outputs/{task_id_str}/")

    logger.info(f"Found {len(csv_keys)} CSV files")

    dfs = [download_csv(s3, bucket, key) for key in csv_keys]
    combined = pd.concat(dfs, ignore_index=True)
    logger.info(f"Combined {len(combined)} rows from {len(dfs)} files")

    # Timestamped key so repeated runs for the same task do not overwrite
    # each other's output.
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H-%M-%S")
    output_key = f"bulk_final_outputs/{task_id_str}/combined_{timestamp}.csv"

    # getvalue() returns the whole buffer regardless of the stream position,
    # so no seek is needed before uploading.
    csv_buffer = BytesIO()
    combined.to_csv(csv_buffer, index=False)
    s3.put_object(Bucket=bucket, Key=output_key, Body=csv_buffer.getvalue())

    s3_uri = f"s3://{bucket}/{output_key}"
    logger.info(f"Saved combined CSV to {s3_uri}")
    # NOTE(review): plain print kept as-is; presumably something parses this
    # marker from stdout — confirm before removing.
    print(f"OUTPUT_S3_URI: {s3_uri}")

    set_combined_csv_s3_uri(task_uuid, s3_uri)
    logger.info(f"Persisted combined_csv_s3_uri for task {task_id_str}")

    return s3_uri
|
||||
|
|
@ -0,0 +1,44 @@
|
|||
data "terraform_remote_state" "shared" {
|
||||
backend = "s3"
|
||||
config = {
|
||||
bucket = "assessment-model-terraform-state"
|
||||
key = "env:/${var.stage}/terraform.tfstate"
|
||||
region = "eu-west-2"
|
||||
}
|
||||
}
|
||||
|
||||
data "aws_secretsmanager_secret_version" "db_credentials" {
|
||||
secret_id = "${var.stage}/assessment_model/db_credentials"
|
||||
}
|
||||
|
||||
locals {
|
||||
db_credentials = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string)
|
||||
}
|
||||
|
||||
module "lambda" {
|
||||
source = "../../modules/lambda_with_sqs"
|
||||
|
||||
name = "bulk-address2uprn-combiner"
|
||||
stage = var.stage
|
||||
|
||||
image_uri = local.image_uri
|
||||
|
||||
timeout = 900
|
||||
memory_size = 2048
|
||||
|
||||
maximum_concurrency = var.maximum_concurrency
|
||||
batch_size = var.batch_size
|
||||
|
||||
environment = {
|
||||
STAGE = var.stage
|
||||
LOG_LEVEL = "info"
|
||||
S3_BUCKET_NAME = data.terraform_remote_state.shared.outputs.retrofit_sap_data_bucket_name
|
||||
DB_USERNAME = local.db_credentials.db_assessment_model_username
|
||||
DB_PASSWORD = local.db_credentials.db_assessment_model_password
|
||||
}
|
||||
}
|
||||
|
||||
resource "aws_iam_role_policy_attachment" "bulk_address2uprn_combiner_s3" {
|
||||
role = module.lambda.role_name
|
||||
policy_arn = data.terraform_remote_state.shared.outputs.bulk_address2uprn_combiner_s3_arn
|
||||
}
|
||||
|
|
@ -0,0 +1,14 @@
|
|||
output "bulk_address2uprn_combiner_queue_url" {
|
||||
value = module.lambda.queue_url
|
||||
description = "URL of the bulk_address2uprn_combiner SQS queue"
|
||||
}
|
||||
|
||||
output "bulk_address2uprn_combiner_queue_arn" {
|
||||
value = module.lambda.queue_arn
|
||||
description = "ARN of the bulk_address2uprn_combiner SQS queue"
|
||||
}
|
||||
|
||||
output "bulk_address2uprn_combiner_lambda_arn" {
|
||||
value = module.lambda.lambda_arn
|
||||
description = "ARN of the bulk_address2uprn_combiner Lambda function"
|
||||
}
|
||||
|
|
@ -0,0 +1,16 @@
|
|||
terraform {
|
||||
required_providers {
|
||||
aws = {
|
||||
source = "hashicorp/aws"
|
||||
version = ">= 5.0"
|
||||
}
|
||||
}
|
||||
|
||||
backend "s3" {
|
||||
bucket = "bulk-address2uprn-combiner-terraform-state"
|
||||
key = "terraform.tfstate"
|
||||
region = "eu-west-2"
|
||||
}
|
||||
|
||||
required_version = ">= 1.2.0"
|
||||
}
|
||||
|
|
@ -0,0 +1,38 @@
|
|||
variable "lambda_name" {
|
||||
type = string
|
||||
description = "Logical name of the lambda"
|
||||
}
|
||||
|
||||
variable "stage" {
|
||||
description = "Deployment stage (e.g. dev, prod)"
|
||||
type = string
|
||||
}
|
||||
|
||||
variable "ecr_repo_url" {
|
||||
type = string
|
||||
description = "ECR repository URL (no tag, no digest)"
|
||||
}
|
||||
|
||||
variable "image_digest" {
|
||||
type = string
|
||||
description = "Image digest (sha256:...)"
|
||||
}
|
||||
|
||||
variable "maximum_concurrency" {
|
||||
type = number
|
||||
default = 2
|
||||
description = "Maximum concurrent Lambda invocations from SQS (2-1000)."
|
||||
}
|
||||
|
||||
# NOTE(review): presumably the number of SQS messages handed to the Lambda per
# invocation — confirm against the lambda_with_sqs module.
variable "batch_size" {
  type        = number
  default     = 1
  description = "Batch size passed through to the lambda_with_sqs module."
}
|
||||
|
||||
locals {
|
||||
image_uri = "${var.ecr_repo_url}@${var.image_digest}"
|
||||
}
|
||||
|
||||
output "resolved_image_uri" {
|
||||
value = local.image_uri
|
||||
}
|
||||
|
|
@ -477,6 +477,34 @@ output "postcode_splitter_s3_read_arn" {
|
|||
value = module.postcode_splitter_s3_read.policy_arn
|
||||
}
|
||||
|
||||
################################################
|
||||
# Bulk Address2UPRN Combiner – Lambda ECR
|
||||
################################################
|
||||
module "bulk_address2uprn_combiner_state_bucket" {
|
||||
source = "../modules/tf_state_bucket"
|
||||
bucket_name = "bulk-address2uprn-combiner-terraform-state"
|
||||
}
|
||||
|
||||
module "bulk_address2uprn_combiner_registry" {
|
||||
source = "../modules/container_registry"
|
||||
name = "bulk_address2uprn_combiner"
|
||||
stage = var.stage
|
||||
}
|
||||
|
||||
module "bulk_address2uprn_combiner_s3" {
|
||||
source = "../modules/s3_iam_policy"
|
||||
|
||||
policy_name = "BulkAddress2UprnCombinerS3"
|
||||
policy_description = "Allow bulk_address2uprn_combiner Lambda to read ara_raw_outputs and write bulk_final_outputs"
|
||||
bucket_arns = ["arn:aws:s3:::retrofit-data-${var.stage}"]
|
||||
actions = ["s3:GetObject", "s3:ListBucket", "s3:PutObject"]
|
||||
resource_paths = ["/ara_raw_outputs/*", "/bulk_final_outputs/*"]
|
||||
}
|
||||
|
||||
output "bulk_address2uprn_combiner_s3_arn" {
|
||||
value = module.bulk_address2uprn_combiner_s3.policy_arn
|
||||
}
|
||||
|
||||
################################################
|
||||
# Categorisation – Lambda ECR
|
||||
################################################
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue