From 0daa1592d7f218d73acab5e02d5b14906f85b6aa Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Tue, 11 Nov 2025 14:06:35 +0000 Subject: [PATCH 01/11] added lambda files --- .devcontainer/Dockerfile | 37 +++++++++ .devcontainer/devcontainer.json | 30 +++++++ .devcontainer/docker-compose.yml | 18 ++++ .devcontainer/post-install.sh | 27 ++++++ .../whlg_calculator/docker/.dockerignore | 21 +++++ .../lambda/whlg_calculator/docker/Dockerfile | 25 ++++++ .../lambda/whlg_calculator/docker/app.py | 3 + .../lambda/whlg_calculator/docker/ecr.tf | 63 ++++++++++++++ .../lambda/whlg_calculator/docker/main.tf | 0 .../lambda/whlg_calculator/docker/provider.tf | 15 ++++ deployment/lambda/whlg_calculator/main.tf | 0 deployment/lambda/whlg_calculator/provider.tf | 15 ++++ deployment/lambda/whlg_calculator/vars.tf | 5 ++ .../lambda/whlg_calculator/whlg_lambda.tf | 83 +++++++++++++++++++ 14 files changed, 342 insertions(+) create mode 100644 .devcontainer/Dockerfile create mode 100644 .devcontainer/devcontainer.json create mode 100644 .devcontainer/docker-compose.yml create mode 100644 .devcontainer/post-install.sh create mode 100644 deployment/lambda/whlg_calculator/docker/.dockerignore create mode 100644 deployment/lambda/whlg_calculator/docker/Dockerfile create mode 100644 deployment/lambda/whlg_calculator/docker/app.py create mode 100644 deployment/lambda/whlg_calculator/docker/ecr.tf create mode 100644 deployment/lambda/whlg_calculator/docker/main.tf create mode 100644 deployment/lambda/whlg_calculator/docker/provider.tf create mode 100644 deployment/lambda/whlg_calculator/main.tf create mode 100644 deployment/lambda/whlg_calculator/provider.tf create mode 100644 deployment/lambda/whlg_calculator/vars.tf create mode 100644 deployment/lambda/whlg_calculator/whlg_lambda.tf diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile new file mode 100644 index 00000000..4d898973 --- /dev/null +++ b/.devcontainer/Dockerfile @@ -0,0 +1,37 @@ +FROM python:3.12-bullseye + +ARG USER=vscode +ARG DEBIAN_FRONTEND=noninteractive + +# 1) Toolchain + utilities for building libpostal +RUN apt-get update && apt-get install -y --no-install-recommends \ + sudo jq vim curl git ca-certificates \ + build-essential pkg-config automake autoconf libtool \ + && rm -rf /var/lib/apt/lists/* + +# 2) Build and install libpostal from source +RUN git clone --depth 1 https://github.com/openvenues/libpostal /tmp/libpostal \ + && cd /tmp/libpostal \ + && ./bootstrap.sh \ + && ./configure --datadir=/usr/local/share/libpostal \ + && make -j"$(nproc)" \ + && make install \ + && ldconfig \ + && rm -rf /tmp/libpostal + +# 3) Create the user and grant sudo privileges +RUN useradd -m -s /usr/bin/bash ${USER} \ + && echo "${USER} ALL=(ALL) NOPASSWD: ALL" >/etc/sudoers.d/${USER} \ + && chmod 0440 /etc/sudoers.d/${USER} + +# 4) Python deps +ENV PIP_NO_CACHE_DIR=1 PIP_DISABLE_PIP_VERSION_CHECK=1 +ADD asset_list/requirements.txt requirements.txt +RUN pip install -r requirements.txt + +# 5) Workdir +WORKDIR /workspaces/model + +# 6) Make Python find your package +# Add project root to PYTHONPATH for all processes +ENV PYTHONPATH=/workspaces/model:${PYTHONPATH} diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json new file mode 100644 index 00000000..49bd6f83 --- /dev/null +++ b/.devcontainer/devcontainer.json @@ -0,0 +1,30 @@ +{ + "name": "Basic Python", + "dockerComposeFile": "docker-compose.yml", + "service": "model", + "remoteUser": "vscode", + "workspaceFolder": "/workspaces/model", + "postStartCommand": "bash .devcontainer/post-install.sh", + "mounts": [ + // Optional, just makes getting from Downloads (local env) easier + "source=${localEnv:HOME},target=/workspaces/home,type=bind" + ], + "customizations": { + "vscode": { + "settings": { + "files.defaultWorkspace": "/workspaces/model" + }, + "extensions": [ + "ms-python.python", + "ms-toolsai.jupyter", + "mechatroner.rainbow-csv", + "ms-toolsai.datawrangler", + "lindacong.vscode-book-reader", + "4ops.terraform", + "fabiospampinato.vscode-todo-plus", + "jgclark.vscode-todo-highlight", + "corentinartaud.pdfpreview" + ] + } + } +} diff --git a/.devcontainer/docker-compose.yml b/.devcontainer/docker-compose.yml new file mode 100644 index 00000000..7f60d34d --- /dev/null +++ b/.devcontainer/docker-compose.yml @@ -0,0 +1,18 @@ +version: '3.8' + +services: + model: + user: "${UID}:${GID}" + build: + context: .. + dockerfile: .devcontainer/Dockerfile + command: sleep infinity + volumes: + - ..:/workspaces/model + networks: + - model-net + +networks: + model-net: + driver: bridge + diff --git a/.devcontainer/post-install.sh b/.devcontainer/post-install.sh new file mode 100644 index 00000000..d9fc3a9e --- /dev/null +++ b/.devcontainer/post-install.sh @@ -0,0 +1,27 @@ +# #!/bin/bash +# poetry install; + +# # Get the Poetry virtual environment path +# VENV_PATH=$(poetry env info --path 2>/dev/null) + +# if [ -z "$VENV_PATH" ]; then +# echo "No Poetry environment found. Did you run 'poetry install'?" +# exit 1 +# fi + +# # Ensure VS Code settings directory exists +# SETTINGS_DIR="/home/vscode/.vscode-server/data/Machine" +# SETTINGS_FILE="$SETTINGS_DIR/settings.json" + +# mkdir -p "$SETTINGS_DIR" + +# # If settings.json doesn't exist, create a default one +# if [ ! -f "$SETTINGS_FILE" ]; then +# echo "{}" > "$SETTINGS_FILE" +# fi + +# # Update VS Code settings to use the Poetry virtual environment +# jq --arg venv "$VENV_PATH/bin/python" '.["python.defaultInterpreterPath"] = $venv' \ +# "$SETTINGS_FILE" > "$SETTINGS_FILE.tmp" && mv "$SETTINGS_FILE.tmp" "$SETTINGS_FILE" + +# echo "✅ Updated VS Code to use Poetry environment: $VENV_PATH" diff --git a/deployment/lambda/whlg_calculator/docker/.dockerignore b/deployment/lambda/whlg_calculator/docker/.dockerignore new file mode 100644 index 00000000..d587d341 --- /dev/null +++ b/deployment/lambda/whlg_calculator/docker/.dockerignore @@ -0,0 +1,21 @@ +# Ignore junk and large files +*.pdf +*.csv +*.xml +*.parquet +*.ipynb +*.mp4 +*.mov +*.jpg +*.png +*.zip +*.tar.gz +__pycache__/ +*.pyc +*.pyo +*.pyd +build/ +dist/ +.etl_cache/ +tests/ +docs/ diff --git a/deployment/lambda/whlg_calculator/docker/Dockerfile b/deployment/lambda/whlg_calculator/docker/Dockerfile new file mode 100644 index 00000000..cdd1f8a3 --- /dev/null +++ b/deployment/lambda/whlg_calculator/docker/Dockerfile @@ -0,0 +1,25 @@ +FROM public.ecr.aws/lambda/python:3.12 + +# Install Poetry (you could pin a version if you like) +RUN curl -sSL https://install.python-poetry.org | python3 - + +# Add Poetry to PATH +ENV PATH="/root/.local/bin:$PATH" + +# Set working directory +WORKDIR /var/task + +# Copy Poetry files first to leverage Docker layer caching +COPY pyproject.toml poetry.lock README.md ./ +COPY etl/ etl/ + + +# Install dependencies into /var/task +RUN poetry config virtualenvs.create false \ + && poetry install --only main --no-interaction --no-ansi + +# Copy app code +COPY deployment/lambda/extractor_and_loader/docker/app.py ./ + +# Set Lambda handler +CMD ["app.handler"] \ No newline at end of file diff --git a/deployment/lambda/whlg_calculator/docker/app.py b/deployment/lambda/whlg_calculator/docker/app.py new file mode 100644 index 00000000..4dcf1a8e --- /dev/null +++ b/deployment/lambda/whlg_calculator/docker/app.py @@ -0,0 +1,3 @@ +def handler(event, context): + print("Hello and welcome to the WHLG Calculator") + print("Please contact the tech team for implementation") \ No newline at end of file diff --git a/deployment/lambda/whlg_calculator/docker/ecr.tf b/deployment/lambda/whlg_calculator/docker/ecr.tf new file mode 100644 index 00000000..a1501dff --- /dev/null +++ b/deployment/lambda/whlg_calculator/docker/ecr.tf @@ -0,0 +1,63 @@ +# ECR repo +resource "aws_ecr_repository" "whlg_calc_adhoc_ecr" { + name = "whlg_calc_adhoc_ecr" +} + +# ECR policy to allow Lambda access +resource "aws_ecr_repository_policy" "whlg_calc_adhoc_ecr_access" { + repository = aws_ecr_repository.whlg_calc_adhoc_ecr.name + + policy = jsonencode({ + Version = "2008-10-17", + Statement = [{ + Sid = "AllowLambdaPull", + Effect = "Allow", + Principal = { + Service = "lambda.amazonaws.com" + }, + Action = [ + "ecr:GetDownloadUrlForLayer", + "ecr:BatchGetImage", + "ecr:BatchCheckLayerAvailability" + ] + }] + }) +} + + + +# ECR lifecycle policy to delete tagged images older than 14 days +resource "aws_ecr_lifecycle_policy" "whlg_calc_adhoc_loader_lifecycle" { + repository = aws_ecr_repository.whlg_calc_adhoc_ecr.name + + policy = jsonencode({ + "rules": [ + { + "rulePriority": 2, + "description": "Expire images older than 14 days", + "selection": { + "tagStatus": "untagged", + "countType": "sinceImagePushed", + "countUnit": "days", + "countNumber": 1 + }, + "action": { + "type": "expire" + } + }, + { + "rulePriority": 1, + "description": "Keep last 5 images", + "selection": { + "tagStatus": "tagged", + "tagPrefixList": ["feature"], + "countType": "imageCountMoreThan", + "countNumber": 5 + }, + "action": { + "type": "expire" + } + } + ] + }) +} \ No newline at end of file diff --git a/deployment/lambda/whlg_calculator/docker/main.tf b/deployment/lambda/whlg_calculator/docker/main.tf new file mode 100644 index 00000000..e69de29b diff --git a/deployment/lambda/whlg_calculator/docker/provider.tf b/deployment/lambda/whlg_calculator/docker/provider.tf new file mode 100644 index 00000000..5f0fef0f --- /dev/null +++ b/deployment/lambda/whlg_calculator/docker/provider.tf @@ -0,0 +1,15 @@ +terraform { + required_providers { + aws = { + source = "hashicorp/aws" + version = "~> 6.3.0" + } + } + backend "s3" { + bucket = "whlg-calc-tf-state" + region = "eu-west-2" + key = "env:/dev/lambda/ecr/whlg-calc.tfstate" + } + + required_version = ">= 1.2.0" +} diff --git a/deployment/lambda/whlg_calculator/main.tf b/deployment/lambda/whlg_calculator/main.tf new file mode 100644 index 00000000..e69de29b diff --git a/deployment/lambda/whlg_calculator/provider.tf b/deployment/lambda/whlg_calculator/provider.tf new file mode 100644 index 00000000..df9abf1c --- /dev/null +++ b/deployment/lambda/whlg_calculator/provider.tf @@ -0,0 +1,15 @@ +terraform { + required_providers { + aws = { + source = "hashicorp/aws" + version = "~> 6.3.0" + } + } + backend "s3" { + bucket = "whlg-calc-tf-state" + region = "eu-west-2" + key = "env:/dev/lambda/eachlambda/whlg_calc_lambda.tfstate" + } + + required_version = ">= 1.2.0" +} diff --git a/deployment/lambda/whlg_calculator/vars.tf b/deployment/lambda/whlg_calculator/vars.tf new file mode 100644 index 00000000..ecdf359d --- /dev/null +++ b/deployment/lambda/whlg_calculator/vars.tf @@ -0,0 +1,5 @@ +variable "lambda_image_tag" { + description = "Docker image tag (e.g. GitHub SHA)" + type = string + default = "local-dev-latest" +} \ No newline at end of file diff --git a/deployment/lambda/whlg_calculator/whlg_lambda.tf b/deployment/lambda/whlg_calculator/whlg_lambda.tf new file mode 100644 index 00000000..0a5433a9 --- /dev/null +++ b/deployment/lambda/whlg_calculator/whlg_lambda.tf @@ -0,0 +1,83 @@ +# Reference existing IAM role +data "aws_iam_role" "lambda_exec_role" { + name = "lambda-exec-role" +} + +# Reference existing ECR repository +data "aws_ecr_repository" "whlg_calc_adhoc_ecr" { + name = "whlg_calc_adhoc_ecr" +} + +# SQS queue +resource "aws_sqs_queue" "whlg_calc_adhoc_queue" { + name = "whlg_calc_adhoc-queue" + visibility_timeout_seconds = 1800 # 30 minutes (>= 300s and ~6x Lambda timeout) +} + + +# Custom IAM policy specific to lambda_example +resource "aws_iam_policy" "whlg_calc_adhoc_policy" { + name = "walthamforest_adhoc_policy_lambda" + + policy = jsonencode({ + Version = "2012-10-17", + Statement = [ + { + Effect = "Allow", + Action = [ + "sqs:ReceiveMessage", + "sqs:DeleteMessage", + "sqs:GetQueueAttributes", + "sqs:GetQueueUrl", + "sqs:ChangeMessageVisibility" + ], + Resource = aws_sqs_queue.whlg_calc_adhoc_queue.arn + }, + { + Effect = "Allow", + Action = [ + "ecr:GetDownloadUrlForLayer", + "ecr:BatchGetImage", + "ecr:BatchCheckLayerAvailability" + ], + Resource = data.aws_ecr_repository.whlg_calc_adhoc_ecr.arn + }, + { + Effect = "Allow", + Action = ["ecr:GetAuthorizationToken"], + Resource = "*" + } + ] + }) +} + +resource "aws_iam_role_policy_attachment" "whlg_calc_adhoc_policy_attach" { + role = data.aws_iam_role.lambda_exec_role.name + policy_arn = aws_iam_policy.whlg_calc_adhoc_policy.arn +} + +# Lambda function +resource "aws_lambda_function" "whlg_calc_adhoc" { + function_name = "whlg_calc_adhoc" + role = data.aws_iam_role.lambda_exec_role.arn + package_type = "Image" + image_uri = "${data.aws_ecr_repository.whlg_calc_adhoc_ecr.repository_url}:${var.lambda_image_tag}" + # Increase timeout (max 900 sec / 15 min) + # timeout = 300 # e.g. 5 minutes + + # Increase memory (default 128 MB) + memory_size = 2048 # try 1024 or 2048 MB to start + + # environment { + # variables = { + # DATABASE_URL = "postgresql://postgres:makingwarmhomes@terraform-20250331175522503500000002.cdgzupxvdyp0.eu-west-2.rds.amazonaws.com:5432/surveyDB" + # } + # } +} + +# SQS trigger +resource "aws_lambda_event_source_mapping" "whlg_calc_adhoc_trigger" { + event_source_arn = aws_sqs_queue.whlg_calc_adhoc_queue.arn + function_name = aws_lambda_function.whlg_calc_adhoc.arn + batch_size = 1 +} From 91db4fb86c24e1a76fcdfe9af002011794644a7c Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Tue, 11 Nov 2025 14:29:03 +0000 Subject: [PATCH 02/11] lambda --- .../actions/lambda-deploy/action.yml | 86 +++++++++++++++++++ .../actions/terraform-deploy/action.yml | 55 ++++++++++++ .github/workflows/lambda_main.yml | 33 +++++++ 3 files changed, 174 insertions(+) create mode 100644 .github/workflows/actions/lambda-deploy/action.yml create mode 100644 .github/workflows/actions/terraform-deploy/action.yml create mode 100644 .github/workflows/lambda_main.yml diff --git a/.github/workflows/actions/lambda-deploy/action.yml b/.github/workflows/actions/lambda-deploy/action.yml new file mode 100644 index 00000000..3ca0fc8d --- /dev/null +++ b/.github/workflows/actions/lambda-deploy/action.yml @@ -0,0 +1,86 @@ +name: "Build and Push Lambda Image to ECR" +description: "Reusable action for building and pushing lambda Docker image to ECR" + +inputs: + ecr_name: + description: "Lambda name / ECR repo name" + required: true + dockerfile_path: + description: "Path to Dockerfile" + required: true + ecr_tf_dir: + description: "Path to ECR terraform directory" + required: true + lambda_tf_dir: + description: "Path to Lambda terraform directory" + required: true + aws-access-key-id: + description: "AWS access key" + required: true + aws-secret-access-key: + description: "AWS secret key" + required: true + aws-region: + description: "AWS region" + required: true + git-sha: + description: "Git commit SHA" + required: true + git-ref: + description: "Git ref name" + required: true + +runs: + using: "composite" + steps: + - uses: actions/checkout@v4 + + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v4 + with: + aws-access-key-id: ${{ inputs.aws-access-key-id }} + aws-secret-access-key: ${{ inputs.aws-secret-access-key }} + aws-region: ${{ inputs.aws-region }} + + - name: Log in to Amazon ECR + id: login-ecr + uses: aws-actions/amazon-ecr-login@v2 + + - name: Deploy ECR + uses: ./.github/workflows/actions/terraform-deploy + with: + working_directory: ${{ inputs.ecr_tf_dir }} + aws-access-key-id: ${{ inputs.aws-access-key-id }} + aws-secret-access-key: ${{ inputs.aws-secret-access-key }} + aws-region: ${{ inputs.aws-region }} + - name: Set Docker image tag + id: set_tag + shell: bash + run: | + SHORT_SHA=$(echo "${{ inputs.git-sha }}" | cut -c1-7) + BRANCH=$(echo "${{ inputs.git-ref }}" | tr '/' '-') + TAG="${BRANCH}-${SHORT_SHA}" + echo "IMAGE_TAG=${TAG}" >> $GITHUB_ENV + echo "tag=$TAG" >> $GITHUB_OUTPUT + + - name: Build and push Docker image + shell: bash + run: | + IMAGE_URI=${{ steps.login-ecr.outputs.registry }}/${{ inputs.ecr_name }}:${{ steps.set_tag.outputs.tag }} + echo "Building Docker image for ${{ inputs.ecr_name }}..." + docker build -t $IMAGE_URI -f ${{ inputs.dockerfile_path }} . + + echo "Pushing to ECR..." + docker push $IMAGE_URI + + - name: Deploy Lambda + uses: ./.github/workflows/actions/terraform-deploy + with: + working_directory: ${{ inputs.lambda_tf_dir }} + aws-access-key-id: ${{ inputs.aws-access-key-id }} + aws-secret-access-key: ${{ inputs.aws-secret-access-key }} + aws-region: ${{ inputs.aws-region }} + lambda-image-tag: ${{ steps.set_tag.outputs.tag }} + + + diff --git a/.github/workflows/actions/terraform-deploy/action.yml b/.github/workflows/actions/terraform-deploy/action.yml new file mode 100644 index 00000000..56133299 --- /dev/null +++ b/.github/workflows/actions/terraform-deploy/action.yml @@ -0,0 +1,55 @@ +name: "Terraform Plan Shared Config" +description: "Plans shared Terraform config for Lambdas" + +inputs: + working_directory: + description: "Directory containing Terraform config" + required: true + aws-access-key-id: + description: "AWS access key" + required: true + aws-secret-access-key: + description: "AWS secret key" + required: true + aws-region: + description: "AWS region" + required: true + lambda-image-tag: + description: "Tag of the Lambda image (e.g., GitHub SHA)" + required: false + +runs: + using: "composite" + steps: + - uses: actions/checkout@v4 + + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v4 + with: + aws-access-key-id: ${{ inputs.aws-access-key-id }} + aws-secret-access-key: ${{ inputs.aws-secret-access-key }} + aws-region: ${{ inputs.aws-region }} + + - name: Setup Terraform + uses: hashicorp/setup-terraform@v3 + + - name: Terraform Init + working-directory: ${{ inputs.working_directory }} + shell: bash + run: terraform init -reconfigure + + - name: Terraform Plan + working-directory: ${{ inputs.working_directory }} + shell: bash + run: | + if [ -n "${{ inputs.lambda-image-tag }}" ]; then + terraform plan -out=tfplan -var="lambda_image_tag=${{ inputs.lambda-image-tag }}" + else + terraform plan -out=tfplan + fi + + - name: Terraform Apply + working-directory: ${{ inputs.working_directory }} + shell: bash + run: terraform apply -auto-approve tfplan + diff --git a/.github/workflows/lambda_main.yml b/.github/workflows/lambda_main.yml new file mode 100644 index 00000000..73645ac5 --- /dev/null +++ b/.github/workflows/lambda_main.yml @@ -0,0 +1,33 @@ +# Please note, this github work flows assumes that shared-terrform is deployed in aws env +# The shared-terraform files lives in https://github.com/Hestia-Homes/survey-extraction/tree/main/deployment/lambda/lambda_shared + +name: Deploy Lambdas +on: + push: + branches: [main, feautre/whlg_lambda] + +env: + AWS_REGION: eu-west-2 + +jobs: + whlg-calc: + runs-on: ubuntu-latest + permissions: + id-token: write + contents: read + + steps: + - name: Checkout repo + uses: actions/checkout@v4 + - name: Build and deploy Warm Homes Local Grant Calc (whlg-calc) + uses: ./.github/workflows/actions/lambda-deploy + with: + ecr_name: whlg_calc_adhoc_ecr + dockerfile_path: ./deployment/lambda/whlg_calculator/docker/Dockerfile + ecr_tf_dir: ./deployment/lambda/whlg_calculator/docker/ + lambda_tf_dir: ./deployment/lambda/whlg_calculator/ + aws-access-key-id: ${{ secrets.DEV_AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY}} + aws-region: eu-west-2 + git-sha: ${{ github.sha }} + git-ref: ${{ github.ref_name }} \ No newline at end of file From f10473cbf3952ac76ab3e30b977aa290bb2b20ff Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Tue, 11 Nov 2025 14:30:11 +0000 Subject: [PATCH 03/11] wrong branch --- .github/workflows/lambda_main.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/lambda_main.yml b/.github/workflows/lambda_main.yml index 73645ac5..960adbe5 100644 --- a/.github/workflows/lambda_main.yml +++ b/.github/workflows/lambda_main.yml @@ -4,7 +4,7 @@ name: Deploy Lambdas on: push: - branches: [main, feautre/whlg_lambda] + branches: [main, feature/whlg_lambda] env: AWS_REGION: eu-west-2 From 91276919be422224e865d6c2a3121a7cb79080ad Mon Sep 17 00:00:00 2001 From: Khalim Conn-Kowlessar Date: Thu, 13 Nov 2025 13:50:38 +0000 Subject: [PATCH 04/11] Adding template for new routes --- backend/app/whlg/__init__.py | 0 backend/app/whlg/route.py | 47 ++ backend/app/whlg/schema.py | 0 backend/tests/test_integration.py | 1061 ++++++++++++++--------------- 4 files changed, 577 insertions(+), 531 deletions(-) create mode 100644 backend/app/whlg/__init__.py create mode 100644 backend/app/whlg/route.py create mode 100644 backend/app/whlg/schema.py diff --git a/backend/app/whlg/__init__.py b/backend/app/whlg/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/backend/app/whlg/route.py b/backend/app/whlg/route.py new file mode 100644 index 00000000..21d417c5 --- /dev/null +++ b/backend/app/whlg/route.py @@ -0,0 +1,47 @@ +import boto3 +import json +import math +import asyncio +import random + +from datetime import datetime + +from fastapi import APIRouter, Depends +from backend.app.dependencies import validate_token +from backend.app.plan.schemas import PlanTriggerRequest +from backend.app.config import get_settings +from sqlalchemy.orm import sessionmaker +from utils.logger import setup_logger +from backend.app.db.connection import db_engine + +from backend.app.db.functions.recommendations_functions import create_scenario + +logger = setup_logger() + +router = APIRouter( + prefix="/whlg", + tags=["whlg"], + dependencies=[Depends(validate_token)], + responses={404: {"description": "Not found"}} +) + + +@router.post("/") +async def whlg_entrypoint(body): + # body needs to include postcode, UPRN [task ID?] + # + # Refer to the plan trigger route for code + # 1) Create an event schema and store it in the schemas file + # 2) Build the tasks functions + # 3) Read in the funding csx. This can be found as such: + # whlg_eligible_postcodes = read_csv_from_s3( + # bucket_name=get_settings().DATA_BUCKET, + # filepath="funding/whlg eligible postcodes.csv", + # ) + # whlg_eligible_postcodes = pd.DataFrame(whlg_eligible_postcodes) + # Check the postcode against this file + # We need to store this somewhere????!!!??!??!?!?!?!??!??!??!??!??!??!??!??!??!??! Create a new table! + # Update subtask to be complete + # Once this is complete, build the logs stuff, add the cloudwatch logs ID to the database + + print("We're gonna do stuff!") diff --git a/backend/app/whlg/schema.py b/backend/app/whlg/schema.py new file mode 100644 index 00000000..e69de29b diff --git a/backend/tests/test_integration.py b/backend/tests/test_integration.py index 60778132..1ba80223 100644 --- a/backend/tests/test_integration.py +++ b/backend/tests/test_integration.py @@ -1,532 +1,531 @@ -# import ast -# import json -# from copy import deepcopy -# from dataclasses import replace -# from datetime import datetime -# -# import random -# from tqdm import tqdm -# import pandas as pd -# import numpy as np -# from etl.epc.Record import EPCRecord -# from backend.SearchEpc import SearchEpc -# from sqlalchemy.exc import IntegrityError, OperationalError -# from sqlalchemy.orm import sessionmaker -# from starlette.responses import Response -# -# from backend.app.config import get_settings, get_prediction_buckets -# from backend.app.db.connection import db_engine -# from backend.app.db.functions.materials_functions import get_materials -# from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations -# from backend.app.db.functions.property_functions import ( -# create_property, create_property_details_epc, create_property_targets, update_property_data, -# update_or_create_property_spatial_details -# ) -# from backend.app.db.functions.recommendations_functions import ( -# create_plan, upload_recommendations, create_scenario -# ) -# from backend.app.db.functions.funding_functions import upload_funding -# from backend.app.db.functions.energy_assessment_functions import get_latest_assessment_by_uprn -# from backend.app.db.models.portfolio import rating_lookup -# from backend.app.plan.schemas import PlanTriggerRequest, WALL_INSULATION_MEASURES, ROOF_INSULATION_MEASURES -# from backend.app.plan.utils import get_cleaned -# from backend.app.utils import sap_to_epc -# import backend.app.assumptions as assumptions -# -# from backend.ml_models.api import ModelApi -# from backend.Property import Property -# from backend.apis.GoogleSolarApi import GoogleSolarApi -# -# from recommendations.optimiser.CostOptimiser import CostOptimiser -# from recommendations.optimiser.GainOptimiser import GainOptimiser -# import recommendations.optimiser.optimiser_functions as optimiser_functions -# from recommendations.Recommendations import Recommendations -# from utils.logger import setup_logger -# from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3, read_excel_from_s3 -# from backend.ml_models.Valuation import PropertyValuation -# -# from etl.bill_savings.KwhData import KwhData -# from etl.spatial.OpenUprnClient import OpenUprnClient -# from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc -# -# from backend.Funding import Funding -# from recommendations.optimiser.funding_optimiser import optimise_with_funding_paths -# from recommendations.recommendation_utils import convert_thickness_to_numeric, get_wall_u_value -# -# # Input data (temp) -# import pickle -# -# import pandas as pd -# -# with open("local_data_for_deletion.pkl", 'rb') as f: -# local_data = pickle.load(f) -# -# cleaning_data = local_data["cleaning_data"] -# materials = local_data["materials"] -# cleaned = local_data["cleaned"] -# project_scores_matrix = local_data["project_scores_matrix"] -# partial_project_scores_matrix = local_data["partial_project_scores_matrix"] -# whlg_eligible_postcodes = local_data["whlg_eligible_postcodes"] -# -# with open("kwh_client_for_deletion.pkl", "rb") as f: -# kwh_client = pickle.load(f) -# -# epc_data = pd.read_csv( -# "/Users/khalimconn-kowlessar/Downloads/all-domestic-certificates/domestic-E06000002-Middlesbrough/certificates -# .csv", -# low_memory=False -# ) -# -# # TODO: Store this for cleaning -# costs_by_floor_area = epc_data[ -# pd.to_datetime(epc_data["LODGEMENT_DATE"]) >= "2024-01-01" -# ][["TOTAL_FLOOR_AREA", "CURRENT_ENERGY_EFFICIENCY", "LIGHTING_COST_CURRENT", "HEATING_COST_CURRENT", -# "HOT_WATER_COST_CURRENT"]].copy() -# -# costs_by_floor_area.columns = [c.lower().replace("_", "-") for c in costs_by_floor_area.columns] -# for c in ["lighting-cost-current", "heating-cost-current", "hot-water-cost-current"]: -# costs_by_floor_area[c + "_scaled"] = costs_by_floor_area[c] / costs_by_floor_area["total-floor-area"] -# -# costs_by_floor_area = costs_by_floor_area.groupby("current-energy-efficiency")[ -# ["lighting-cost-current_scaled", "heating-cost-current_scaled", "hot-water-cost-current_scaled"] -# ].mean().reset_index() -# -# sample_epc_data = epc_data[pd.to_datetime(epc_data["LODGEMENT_DATE"]) >= "2015-01-01"].drop_duplicates("UPRN").sample( -# 1000).reset_index(drop=True) -# -# # TODO: In Property find_energy_sources, sort out biomass community heating - what fuel type -# # TODO: We might be able to remove find_energy_sources entirely and remove estimate_electrical_consumption. It's used -# # in the google solar api but is it really needed? I don't think it's super accurate. It might be better to -# # just use an average energy consumption by floor area for UK households? -# # Load the input properties -# input_properties = [] -# for row_id, config in tqdm(sample_epc_data.iterrows(), total=len(sample_epc_data)): -# epc = { -# k.lower().replace("_", "-"): v if not pd.isnull(v) else None for k, v in config.items() -# } -# # Avoid the data load inside of EPCRecord - something we should pull out -# for x in ["number-habitable-rooms", "floor-height", "number-heated-rooms"]: -# if pd.isnull(epc[x]): -# if x == "floor-height": -# epc[x] = 2.4 -# if x == "number-habitable-rooms": -# epc[x] = 3 -# if x == "number-heated-rooms": -# epc[x] = 3 -# -# epc_records = {'original_epc': epc, 'full_sap_epc': {}, 'old_data': []} -# -# prepared_epc = EPCRecord( -# epc_records=epc_records, -# run_mode="newdata", -# cleaning_data=cleaning_data, -# ) -# -# input_properties.append( -# Property( -# id=row_id, -# is_new=True, -# address=epc["address"], -# postcode=epc["postcode"], -# epc_record=prepared_epc, -# already_installed={}, -# property_valuation={}, -# non_invasive_recommendations=[], -# energy_assessment=None, -# **Property.extract_kwargs(config), # TODO: Depraecate this -# ) -# ) -# -# # For each property, insert the default solar configuration -# for p in tqdm(input_properties): -# solar_api = GoogleSolarApi( -# api_key=None, solar_materials=[m for m in materials if m["type"] == "solar_pv"], max_retries=5 -# ) -# panel_performance = solar_api.default_panel_performance(property_instance=p) -# p.set_solar_panel_configuration( -# solar_panel_configuration={ -# "insights_data": None, "panel_performance": panel_performance, "unit_share_of_energy": 1 -# }, -# ) -# -# # We mock kwh preds -# mocked_kwh_predictions = {"heating_kwh_predictions": [], "hotwater_kwh_predictions": []} -# for p in tqdm(input_properties): -# mocked_kwh_predictions["heating_kwh_predictions"].append({ -# "id": p.uprn, "predictions": random.sample(range(100, 3000), 1)[0] -# }) -# mocked_kwh_predictions["hotwater_kwh_predictions"].append({ -# "id": p.uprn, "predictions": random.sample(range(100, 3000), 1)[0] -# }) -# mocked_kwh_predictions["heating_kwh_predictions"] = pd.DataFrame(mocked_kwh_predictions["heating_kwh_predictions"]) -# mocked_kwh_predictions["hotwater_kwh_predictions"] = pd.DataFrame(mocked_kwh_predictions["hotwater_kwh_predictions"]) -# -# # TODO: We might want to implement this generally, via an ETL process +import ast +import json +from copy import deepcopy +from dataclasses import replace +from datetime import datetime + +import random +from tqdm import tqdm +import pandas as pd +import numpy as np +from etl.epc.Record import EPCRecord +from backend.SearchEpc import SearchEpc +from sqlalchemy.exc import IntegrityError, OperationalError +from sqlalchemy.orm import sessionmaker +from starlette.responses import Response + +from backend.app.config import get_settings, get_prediction_buckets +from backend.app.db.connection import db_engine +from backend.app.db.functions.materials_functions import get_materials +from backend.app.db.functions.portfolio_functions import aggregate_portfolio_recommendations +from backend.app.db.functions.property_functions import ( + create_property, create_property_details_epc, create_property_targets, update_property_data, + update_or_create_property_spatial_details +) +from backend.app.db.functions.recommendations_functions import ( + create_plan, upload_recommendations, create_scenario +) +from backend.app.db.functions.funding_functions import upload_funding +from backend.app.db.functions.energy_assessment_functions import get_latest_assessment_by_uprn +from backend.app.db.models.portfolio import rating_lookup +from backend.app.plan.schemas import PlanTriggerRequest, WALL_INSULATION_MEASURES, ROOF_INSULATION_MEASURES +from backend.app.plan.utils import get_cleaned +from backend.app.utils import sap_to_epc +import backend.app.assumptions as assumptions + +from backend.ml_models.api import ModelApi +from backend.Property import Property +from backend.apis.GoogleSolarApi import GoogleSolarApi + +from recommendations.optimiser.CostOptimiser import CostOptimiser +from recommendations.optimiser.GainOptimiser import GainOptimiser +import recommendations.optimiser.optimiser_functions as optimiser_functions +from recommendations.Recommendations import Recommendations +from utils.logger import setup_logger +from utils.s3 import read_dataframe_from_s3_parquet, read_csv_from_s3, read_excel_from_s3 +from backend.ml_models.Valuation import PropertyValuation + +from etl.bill_savings.KwhData import KwhData +from etl.spatial.OpenUprnClient import OpenUprnClient +from etl.find_my_epc.RetrieveFindMyEpc import RetrieveFindMyEpc + +from backend.Funding import Funding +from recommendations.optimiser.funding_optimiser import optimise_with_funding_paths +from recommendations.recommendation_utils import convert_thickness_to_numeric, get_wall_u_value + +# Input data (temp) +import pickle + +import pandas as pd + +with open("local_data_for_deletion.pkl", 'rb') as f: + local_data = pickle.load(f) + +cleaning_data = local_data["cleaning_data"] +materials = local_data["materials"] +cleaned = local_data["cleaned"] +project_scores_matrix = local_data["project_scores_matrix"] +partial_project_scores_matrix = local_data["partial_project_scores_matrix"] +whlg_eligible_postcodes = local_data["whlg_eligible_postcodes"] + +with open("kwh_client_for_deletion.pkl", "rb") as f: + kwh_client = pickle.load(f) + +epc_data = pd.read_csv( + "/Users/khalimconn-kowlessar/Downloads/domestic-E06000002-Middlesbrough/certificates.csv", + low_memory=False +) + +# TODO: Store this for cleaning +costs_by_floor_area = epc_data[ + pd.to_datetime(epc_data["LODGEMENT_DATE"]) >= "2024-01-01" + ][["TOTAL_FLOOR_AREA", "CURRENT_ENERGY_EFFICIENCY", "LIGHTING_COST_CURRENT", "HEATING_COST_CURRENT", + "HOT_WATER_COST_CURRENT"]].copy() + +costs_by_floor_area.columns = [c.lower().replace("_", "-") for c in costs_by_floor_area.columns] +for c in ["lighting-cost-current", "heating-cost-current", "hot-water-cost-current"]: + costs_by_floor_area[c + "_scaled"] = costs_by_floor_area[c] / costs_by_floor_area["total-floor-area"] + +costs_by_floor_area = costs_by_floor_area.groupby("current-energy-efficiency")[ + ["lighting-cost-current_scaled", "heating-cost-current_scaled", "hot-water-cost-current_scaled"] +].mean().reset_index() + +sample_epc_data = epc_data[pd.to_datetime(epc_data["LODGEMENT_DATE"]) >= "2015-01-01"].drop_duplicates("UPRN").sample( + 10000).reset_index(drop=True) + +# TODO: In Property find_energy_sources, sort out biomass community heating - what fuel type +# TODO: We might be able to remove find_energy_sources entirely and remove estimate_electrical_consumption. It's used +# in the google solar api but is it really needed? I don't think it's super accurate. It might be better to +# just use an average energy consumption by floor area for UK households? +# Load the input properties +input_properties = [] +for row_id, config in tqdm(sample_epc_data.iterrows(), total=len(sample_epc_data)): + epc = { + k.lower().replace("_", "-"): v if not pd.isnull(v) else None for k, v in config.items() + } + # Avoid the data load inside of EPCRecord - something we should pull out + for x in ["number-habitable-rooms", "floor-height", "number-heated-rooms"]: + if pd.isnull(epc[x]): + if x == "floor-height": + epc[x] = 2.4 + if x == "number-habitable-rooms": + epc[x] = 3 + if x == "number-heated-rooms": + epc[x] = 3 + + epc_records = {'original_epc': epc, 'full_sap_epc': {}, 'old_data': []} + + prepared_epc = EPCRecord( + epc_records=epc_records, + run_mode="newdata", + cleaning_data=cleaning_data, + ) + + input_properties.append( + Property( + id=row_id, + is_new=True, + address=epc["address"], + postcode=epc["postcode"], + epc_record=prepared_epc, + already_installed={}, + property_valuation={}, + non_invasive_recommendations=[], + energy_assessment=None, + **Property.extract_kwargs(config), # TODO: Depraecate this + ) + ) + +# For each property, insert the default solar configuration +for p in tqdm(input_properties): + solar_api = GoogleSolarApi( + api_key=None, solar_materials=[m for m in materials if m["type"] == "solar_pv"], max_retries=5 + ) + panel_performance = solar_api.default_panel_performance(property_instance=p) + p.set_solar_panel_configuration( + solar_panel_configuration={ + "insights_data": None, "panel_performance": panel_performance, "unit_share_of_energy": 1 + }, + ) + +# We mock kwh preds +mocked_kwh_predictions = {"heating_kwh_predictions": [], "hotwater_kwh_predictions": []} +for p in tqdm(input_properties): + mocked_kwh_predictions["heating_kwh_predictions"].append({ + "id": p.uprn, "predictions": random.sample(range(100, 3000), 1)[0] + }) + mocked_kwh_predictions["hotwater_kwh_predictions"].append({ + "id": p.uprn, "predictions": random.sample(range(100, 3000), 1)[0] + }) +mocked_kwh_predictions["heating_kwh_predictions"] = pd.DataFrame(mocked_kwh_predictions["heating_kwh_predictions"]) +mocked_kwh_predictions["hotwater_kwh_predictions"] = pd.DataFrame(mocked_kwh_predictions["hotwater_kwh_predictions"]) + +# TODO: We might want to implement this generally, via an ETL process +for p in input_properties: + for col in ["lighting-cost-current", "heating-cost-current", "hot-water-cost-current"]: + if pd.isnull(p.data[col]): + min_diff = abs( + (costs_by_floor_area["current-energy-efficiency"] - p.data["current-energy-efficiency"]) + ).min() + df = costs_by_floor_area[ + abs((costs_by_floor_area["current-energy-efficiency"] - p.data[ + "current-energy-efficiency"])) == min_diff + ] + if df.shape[0] > 1: + df = df.head(1) + p.data[col] = (df[col + "_scaled"] * p.data["total-floor-area"]).values[0] + +[ + p.set_features(cleaned=cleaned, kwh_client=kwh_client, kwh_predictions=mocked_kwh_predictions) for p in + input_properties +] # for p in input_properties: -# for col in ["lighting-cost-current", "heating-cost-current", "hot-water-cost-current"]: -# if pd.isnull(p.data[col]): -# min_diff = abs( -# (costs_by_floor_area["current-energy-efficiency"] - p.data["current-energy-efficiency"]) -# ).min() -# df = costs_by_floor_area[ -# abs((costs_by_floor_area["current-energy-efficiency"] - p.data[ -# "current-energy-efficiency"])) == min_diff -# ] -# if df.shape[0] > 1: -# df = df.head(1) -# p.data[col] = (df[col + "_scaled"] * p.data["total-floor-area"]).values[0] -# -# [ -# p.set_features(cleaned=cleaned, kwh_client=kwh_client, kwh_predictions=mocked_kwh_predictions) for p in -# input_properties -# ] -# # for p in input_properties: -# # p.set_features(cleaned=cleaned, kwh_client=kwh_client, kwh_predictions=mocked_kwh_predictions) -# -# # Run the recommendations -# recommendations = {} -# recommendations_scoring_data = [] -# representative_recommendations = {} -# for p in tqdm(input_properties): -# if p.data["property-type"] == "House" and pd.isnull(p.data["built-form"]): -# p.data["built-form"] = "Semi-Detached" -# recommender = Recommendations( -# property_instance=p, -# materials=materials, -# exclusions=[], -# inclusions=[], -# default_u_values=True -# ) -# property_recommendations, property_representative_recommendations = recommender.recommend() -# -# if not property_recommendations: -# continue -# -# recommendations[p.id] = property_recommendations -# representative_recommendations[p.id] = property_representative_recommendations -# -# p.create_base_difference_epc_record(cleaned_lookup=cleaned) -# p.adjust_difference_record_with_recommendations( -# property_recommendations, property_representative_recommendations -# ) -# -# recommendations_scoring_data.extend(p.recommendations_scoring_data) -# -# recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data) -# recommendations_scoring_data = recommendations_scoring_data.drop( -# columns=[ -# "rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending", -# "carbon_ending" -# ] -# ) -# -# model_predictions_mocked = { -# "sap_change_predictions": None, -# "heat_demand_predictions": None, -# "carbon_change_predictions": None, -# "heating_kwh_predictions": None, -# "hotwater_kwh_predictions": None, -# } -# -# for k in model_predictions_mocked.keys(): -# model_predictions_mocked[k] = recommendations_scoring_data[["id"]].copy() -# model_predictions_mocked[k][['property_id', 'recommendation_id']] = ( -# model_predictions_mocked[k]['id'].str.split('+', expand=True) -# ) -# model_predictions_mocked[k]['phase'] = model_predictions_mocked[k]['recommendation_id'].apply( -# ModelApi.extract_phase) -# -# if k in ["heating_kwh_predictions", "hotwater_kwh_predictions"]: -# model_predictions_mocked[k]["predictions"] = random.choices(range(100, 3000), -# k=len(recommendations_scoring_data)) -# continue -# -# model_predictions_mocked[k] = model_predictions_mocked[k].sort_values(["property_id", "phase"], ascending=True) -# preds = [] -# for p_id in model_predictions_mocked[k]["property_id"].unique(): -# # We add some amount each time -# p = [p for p in input_properties if str(p.id) == p_id][0] -# if k == "sap_change_predictions": -# start = p.data["current-energy-efficiency"] -# elif k == "heat_demand_predictions": -# start = p.data["energy-consumption-current"] -# else: -# start = p.data["co2-emissions-current"] -# df = model_predictions_mocked[k][model_predictions_mocked[k]["property_id"] == p_id].copy() -# # Add some amount each time -# to_add = random.choices(range(0, 15), k=len(df)) -# to_add = np.cumsum(to_add) -# df["predictions"] = start + to_add -# preds.append(df) -# preds = pd.concat(preds) -# model_predictions_mocked[k] = preds -# -# for property_id in tqdm(recommendations.keys(), total=len(recommendations)): -# property_instance = [p for p in input_properties if p.id == property_id][0] -# -# recommendations_with_impact, impact_summary = ( -# Recommendations.calculate_recommendation_impact( -# property_instance=property_instance, -# all_predictions=model_predictions_mocked, -# recommendations=recommendations, -# representative_recommendations=representative_recommendations -# ) -# ) -# -# # We use the impact_summary to update the simulation_epcs with the new SAP, heat demand, carbon, cost etc -# # at each phase -# property_instance.update_simulation_epcs(impact_summary) -# recommendations[property_id] = recommendations_with_impact -# -# for property_id in tqdm([p.id for p in input_properties]): -# property_recommendations = recommendations.get(property_id, []) -# property_instance = [p for p in input_properties if p.id == property_id][0] -# -# property_current_energy_bill = ( -# Recommendations.calculate_recommendation_tenant_savings( -# property_instance=property_instance, -# kwh_simulation_predictions=model_predictions_mocked, -# property_recommendations=property_recommendations, -# ashp_cop=2.8 -# ) -# ) -# property_instance.current_energy_bill = property_current_energy_bill -# -# body = PlanTriggerRequest( -# **{'budget': None, 'goal': 'Increasing EPC', 'housing_type': 'Social', 'goal_value': 'B', 'portfolio_id': 0, -# 'trigger_file_path': '', 'already_installed_file_path': '', -# 'patches_file_path': None, 'non_invasive_recommendations_file_path': None, -# 'valuation_file_path': '', -# 'required_measures': [], 'scenario_name': 'EPC B', 'scenario_id': None, -# 'multi_plan': True, 'optimise': True, 'default_u_values': True, 'ashp_cop': 2.8, -# 'event_type': 'remote_assessment', 'simulate_sap_10': False, 'file_type': None, 'file_format': None, -# 'sheet_name': None, 'sheet_count': None, 'index_start': None, 'index_end': None} -# ) -# -# for p in tqdm(input_properties): -# if not recommendations.get(p.id): -# continue -# -# # we need to double unlist because we have a list of lists -# property_measure_types = {rec["type"] for recs in recommendations[p.id] for rec in recs} -# property_required_measures = [m for m in recommendations[p.id] if m[0]["type"] in body.required_measures] -# measures_to_optimise = [m for m in recommendations[p.id] if m[0]["type"] not in body.required_measures] -# -# # If a measure requiring ventilation is selected, and the property does not have ventilation, we enfore -# # its inclusion -# needs_ventilation = any( -# x in property_measure_types for x in assumptions.measures_needing_ventilation -# ) and not p.has_ventilation -# -# if not measures_to_optimise: -# # Nothing to do, we just reshape the recommendations -# recommendations[p.id] = optimiser_functions.flatten_recommendations_with_defaults( -# p.id, recommendations, set() -# ) -# continue -# -# fixed_gain = optimiser_functions.calculate_fixed_gain( -# property_required_measures, recommendations, p, needs_ventilation -# ) -# gain = optimiser_functions.calculate_gain(body=body, p=p, fixed_gain=fixed_gain) -# -# funding = Funding( -# tenure="Social", -# project_scores_matrix=project_scores_matrix, -# partial_project_scores_matrix=partial_project_scores_matrix, -# whlg_eligible_postcodes=whlg_eligible_postcodes, -# eco4_social_cavity_abs_rate=12.5, -# eco4_social_solid_abs_rate=17, -# eco4_private_cavity_abs_rate=12.5, -# eco4_private_solid_abs_rate=17, -# gbis_social_cavity_abs_rate=21, -# gbis_social_solid_abs_rate=25, -# gbis_private_cavity_abs_rate=21, -# gbis_private_solid_abs_rate=28, -# ) -# -# li_thickness = convert_thickness_to_numeric( -# p.roof["insulation_thickness"], p.roof["is_pitched"], p.roof["is_flat"] -# ) -# current_wall_u_value = p.walls["thermal_transmittance"] -# if current_wall_u_value is None: -# current_wall_u_value = get_wall_u_value( -# clean_description=p.walls["clean_description"], -# age_band=p.age_band, -# is_granite_or_whinstone=p.walls["is_granite_or_whinstone"], -# is_sandstone_or_limestone=p.walls["is_sandstone_or_limestone"], -# ) -# -# # We insert the innovation uplift -# measures_to_optimise_with_uplift = deepcopy(measures_to_optimise) -# -# # TODO: Turn this into a function and store the innovaiton uplift -# for group in measures_to_optimise_with_uplift: -# for r in group: -# -# if r["type"] in ["mechanical_ventilation", "low_energy_lighting", "secondary_heating", -# "extension_cavity_wall_insulation", "draught_proofing", "sealing_open_fireplace"]: -# ( -# r["partial_project_score"], -# r["partial_project_funding"], -# r["innovation_uplift"], -# r["uplift_project_score"], -# ) = ( -# 0, 0, 0, 0 -# ) -# continue -# -# ( -# r["partial_project_score"], r["partial_project_funding"], r["innovation_uplift"], -# r["uplift_project_score"] -# ) = funding.get_innovation_uplift( -# measure=r, -# starting_sap=p.data["current-energy-efficiency"], -# floor_area=p.floor_area, -# is_cavity=p.walls["is_cavity_wall"], -# current_wall_uvalue=current_wall_u_value, -# is_partial="partial" in p.walls["clean_description"].lower(), -# existing_li_thickness=li_thickness, -# mainheating=p.main_heating, -# main_fuel=p.main_fuel, -# mainheat_energy_eff=p.data["mainheat-energy-eff"], -# ) -# -# input_measures = optimiser_functions.prepare_input_measures( -# measures_to_optimise_with_uplift, body.goal, needs_ventilation, funding=True -# ) -# -# # When the goal is Increasing EPC, we can run the funding optimiser -# if body.goal == "Increasing EPC": -# -# solutions = optimise_with_funding_paths( -# p=p, -# input_measures=input_measures, -# housing_type=body.housing_type, -# budget=body.budget, -# target_gain=gain, -# funding=funding -# ) -# -# # Given the solutions we select the optimal one -# solutions["cost_less_full_project_funding"] = np.where( -# solutions["scheme"] == "eco4", -# solutions["total_cost"] - solutions["full_project_funding"] - solutions["total_uplift"], -# solutions["total_cost"] - solutions["partial_project_funding"] - solutions["total_uplift"] -# ) -# -# solutions["cost_less_full_project_funding"] = ( -# solutions["total_cost"] - solutions["full_project_funding"] - solutions["total_uplift"] -# ) -# solutions = solutions.sort_values("cost_less_full_project_funding", ascending=True) -# -# if solutions["meets_upgrade_target"].any(): -# # If we have a solution that meets the upgrade target, we select that one -# optimal_solution = solutions[solutions["meets_upgrade_target"]].iloc[0] -# else: -# # Pick the cheapest -# optimal_solution = solutions.iloc[0] -# -# # This is the list of measures that we will recommend -# scheme = optimal_solution["scheme"] -# funded_measures = optimal_solution["items"] if scheme != "none" else [] -# solution = optimal_solution["items"] + optimal_solution["unfunded_items"] -# # This is the total amount of funding that the project will produce (including uplifts) (£) -# project_funding = optimal_solution["full_project_funding"] if scheme == "eco4" else \ -# optimal_solution["partial_project_funding"] -# # This is the total amount of funding associated to the uplift (£) -# total_uplift = optimal_solution["total_uplift"] -# # This is the funding scheme selected -# # This is the full project ABS -# full_project_score = optimal_solution["project_score"] -# # This is the partial project ABS -# partial_project_score = optimal_solution["partial_project_score"] -# # This is the uplift score ABS -# uplift_project_score = optimal_solution["total_uplift_score"] -# else: -# # We optimise and then we determine eligibility for funding, based on the measures selected -# optimiser = ( -# GainOptimiser( -# input_measures, max_cost=body.budget, max_gain=gain, allow_slack=False -# ) if body.budget else CostOptimiser(input_measures, min_gain=gain) -# ) -# optimiser.setup() -# optimiser.solve() -# solution = optimiser.solution -# -# recommendation_types = [] -# for measures in input_measures: -# for measure in measures: -# recommendation_types.append(measure["type"]) -# recommendation_types = set(recommendation_types) -# -# has_wall_insulation_recommendation = any( -# (m in recommendation_types or "+".join([m, "mechanical_ventilation"])) for m in -# WALL_INSULATION_MEASURES -# ) -# has_roof_insulation_recommendation = any( -# (m in recommendation_types or "+".join([m, "mechanical_ventilation"])) for m in -# ROOF_INSULATION_MEASURES -# ) -# -# funding.check_funding( -# measures=solution, -# starting_sap=p.data["current-energy-efficiency"], -# ending_sap=p.data["current-energy-efficiency"] + sum([x["gain"] for x in solution]), -# floor_area=p.floor_area, -# mainheat_description=p.main_heating["clean_description"], -# heating_control_description=p.main_heating_controls["clean_description"], -# is_cavity=p.walls["is_cavity_wall"], -# current_wall_uvalue=current_wall_u_value, -# is_partial="partial" in p.walls["clean_description"].lower(), -# existing_li_thickness=li_thickness, -# mainheating=p.main_heating, -# main_fuel=p.main_fuel, -# mainheat_energy_eff=p.data["mainheat-energy-eff"], -# has_wall_insulation_recommendation=has_wall_insulation_recommendation, -# has_roof_insulation_recommendation=has_roof_insulation_recommendation, -# ) -# -# # Determine the scheme -# scheme = "none" -# if funding.eco4_eligible: -# scheme = "eco4" -# if scheme == "none" and funding.gbis_eligible: -# scheme = "gbis" -# -# funded_measures = solution if scheme in ["gbis", "eco4"] else [] -# project_funding = 0 if funding.full_project_abs is not None else funding.full_project_abs -# total_uplift = funding.eco4_uplift -# full_project_score = 0 if funding.full_project_abs is not None else funding.full_project_abs -# partial_project_score = funding.partial_project_abs -# uplift_project_score = funding.eco4_uplift if scheme == "eco4" else funding.gbis_uplift -# -# selected = {r["id"] for r in solution} -# -# if property_required_measures: -# solution = optimiser_functions.add_required_measures( -# property_id=p.id, property_required_measures=property_required_measures, -# recommendations=recommendations, selected=selected, -# ) -# -# # Add best practice measures (ventilation/trickle vents) -# selected = optimiser_functions.add_best_practice_measures(p.id, solution, recommendations, selected) -# # Final flattening - Don't do this! -# # recommendations[p.id] = optimiser_functions.flatten_recommendations_with_defaults( -# # p.id, recommendations, selected -# # ) -# -# # TODO: functionise -# for measure in funded_measures: -# if "+mechanical_ventilation" in measure["type"]: -# measure["type"] = measure["type"].split("+mechanical_ventilation")[0] -# -# p.insert_funding( -# scheme=scheme, -# funded_measures=funded_measures, -# project_funding=project_funding, -# total_uplift=total_uplift, -# full_project_score=full_project_score, -# partial_project_score=partial_project_score, -# uplift_project_score=uplift_project_score -# ) +# p.set_features(cleaned=cleaned, kwh_client=kwh_client, kwh_predictions=mocked_kwh_predictions) + +# Run the recommendations +recommendations = {} +recommendations_scoring_data = [] +representative_recommendations = {} +for p in tqdm(input_properties): + if p.data["property-type"] == "House" and pd.isnull(p.data["built-form"]): + p.data["built-form"] = "Semi-Detached" + recommender = Recommendations( + property_instance=p, + materials=materials, + exclusions=[], + inclusions=[], + default_u_values=True + ) + property_recommendations, property_representative_recommendations = recommender.recommend() + + if not property_recommendations: + continue + + recommendations[p.id] = property_recommendations + representative_recommendations[p.id] = property_representative_recommendations + + p.create_base_difference_epc_record(cleaned_lookup=cleaned) + p.adjust_difference_record_with_recommendations( + property_recommendations, property_representative_recommendations + ) + + recommendations_scoring_data.extend(p.recommendations_scoring_data) + +recommendations_scoring_data = pd.DataFrame(recommendations_scoring_data) +recommendations_scoring_data = recommendations_scoring_data.drop( + columns=[ + "rdsap_change", "heat_demand_change", "carbon_change", "sap_ending", "heat_demand_ending", + "carbon_ending" + ] +) + +model_predictions_mocked = { + "sap_change_predictions": None, + "heat_demand_predictions": None, + "carbon_change_predictions": None, + "heating_kwh_predictions": None, + "hotwater_kwh_predictions": None, +} + +for k in model_predictions_mocked.keys(): + model_predictions_mocked[k] = recommendations_scoring_data[["id"]].copy() + model_predictions_mocked[k][['property_id', 'recommendation_id']] = ( + model_predictions_mocked[k]['id'].str.split('+', expand=True) + ) + model_predictions_mocked[k]['phase'] = model_predictions_mocked[k]['recommendation_id'].apply( + ModelApi.extract_phase) + + if k in ["heating_kwh_predictions", "hotwater_kwh_predictions"]: + model_predictions_mocked[k]["predictions"] = random.choices(range(100, 3000), + k=len(recommendations_scoring_data)) + continue + + model_predictions_mocked[k] = model_predictions_mocked[k].sort_values(["property_id", "phase"], ascending=True) + preds = [] + for p_id in model_predictions_mocked[k]["property_id"].unique(): + # We add some amount each time + p = [p for p in input_properties if str(p.id) == p_id][0] + if k == "sap_change_predictions": + start = p.data["current-energy-efficiency"] + elif k == "heat_demand_predictions": + start = p.data["energy-consumption-current"] + else: + start = p.data["co2-emissions-current"] + df = model_predictions_mocked[k][model_predictions_mocked[k]["property_id"] == p_id].copy() + # Add some amount each time + to_add = random.choices(range(0, 15), k=len(df)) + to_add = np.cumsum(to_add) + df["predictions"] = start + to_add + preds.append(df) + preds = pd.concat(preds) + model_predictions_mocked[k] = preds + +for property_id in tqdm(recommendations.keys(), total=len(recommendations)): + property_instance = [p for p in input_properties if p.id == property_id][0] + + recommendations_with_impact, impact_summary = ( + Recommendations.calculate_recommendation_impact( + property_instance=property_instance, + all_predictions=model_predictions_mocked, + recommendations=recommendations, + representative_recommendations=representative_recommendations + ) + ) + + # We use the impact_summary to update the simulation_epcs with the new SAP, heat demand, carbon, cost etc + # at each phase + property_instance.update_simulation_epcs(impact_summary) + recommendations[property_id] = recommendations_with_impact + +for property_id in tqdm([p.id for p in input_properties]): + property_recommendations = recommendations.get(property_id, []) + property_instance = [p for p in input_properties if p.id == property_id][0] + + property_current_energy_bill = ( + Recommendations.calculate_recommendation_tenant_savings( + property_instance=property_instance, + kwh_simulation_predictions=model_predictions_mocked, + property_recommendations=property_recommendations, + ashp_cop=2.8 + ) + ) + property_instance.current_energy_bill = property_current_energy_bill + +body = PlanTriggerRequest( + **{'budget': None, 'goal': 'Increasing EPC', 'housing_type': 'Social', 'goal_value': 'B', 'portfolio_id': 0, + 'trigger_file_path': '', 'already_installed_file_path': '', + 'patches_file_path': None, 'non_invasive_recommendations_file_path': None, + 'valuation_file_path': '', + 'required_measures': [], 'scenario_name': 'EPC B', 'scenario_id': None, + 'multi_plan': True, 'optimise': True, 'default_u_values': True, 'ashp_cop': 2.8, + 'event_type': 'remote_assessment', 'simulate_sap_10': False, 'file_type': None, 'file_format': None, + 'sheet_name': None, 'sheet_count': None, 'index_start': None, 'index_end': None} +) + +for p in tqdm(input_properties): + if not recommendations.get(p.id): + continue + + # we need to double unlist because we have a list of lists + property_measure_types = {rec["type"] for recs in recommendations[p.id] for rec in recs} + property_required_measures = [m for m in recommendations[p.id] if m[0]["type"] in body.required_measures] + measures_to_optimise = [m for m in recommendations[p.id] if m[0]["type"] not in body.required_measures] + + # If a measure requiring ventilation is selected, and the property does not have ventilation, we enfore + # its inclusion + needs_ventilation = any( + x in property_measure_types for x in assumptions.measures_needing_ventilation + ) and not p.has_ventilation + + if not measures_to_optimise: + # Nothing to do, we just reshape the recommendations + recommendations[p.id] = optimiser_functions.flatten_recommendations_with_defaults( + p.id, recommendations, set() + ) + continue + + fixed_gain = optimiser_functions.calculate_fixed_gain( + property_required_measures, recommendations, p, needs_ventilation + ) + gain = optimiser_functions.calculate_gain(body=body, p=p, fixed_gain=fixed_gain) + + funding = Funding( + tenure="Social", + project_scores_matrix=project_scores_matrix, + partial_project_scores_matrix=partial_project_scores_matrix, + whlg_eligible_postcodes=whlg_eligible_postcodes, + eco4_social_cavity_abs_rate=12.5, + eco4_social_solid_abs_rate=17, + eco4_private_cavity_abs_rate=12.5, + eco4_private_solid_abs_rate=17, + gbis_social_cavity_abs_rate=21, + gbis_social_solid_abs_rate=25, + gbis_private_cavity_abs_rate=21, + gbis_private_solid_abs_rate=28, + ) + + li_thickness = convert_thickness_to_numeric( + p.roof["insulation_thickness"], p.roof["is_pitched"], p.roof["is_flat"] + ) + current_wall_u_value = p.walls["thermal_transmittance"] + if current_wall_u_value is None: + current_wall_u_value = get_wall_u_value( + clean_description=p.walls["clean_description"], + age_band=p.age_band, + is_granite_or_whinstone=p.walls["is_granite_or_whinstone"], + is_sandstone_or_limestone=p.walls["is_sandstone_or_limestone"], + ) + + # We insert the innovation uplift + measures_to_optimise_with_uplift = deepcopy(measures_to_optimise) + + # TODO: Turn this into a function and store the innovaiton uplift + for group in measures_to_optimise_with_uplift: + for r in group: + + if r["type"] in ["mechanical_ventilation", "low_energy_lighting", "secondary_heating", + "extension_cavity_wall_insulation", "draught_proofing", "sealing_open_fireplace"]: + ( + r["partial_project_score"], + r["partial_project_funding"], + r["innovation_uplift"], + r["uplift_project_score"], + ) = ( + 0, 0, 0, 0 + ) + continue + + ( + r["partial_project_score"], r["partial_project_funding"], r["innovation_uplift"], + r["uplift_project_score"] + ) = funding.get_innovation_uplift( + measure=r, + starting_sap=p.data["current-energy-efficiency"], + floor_area=p.floor_area, + is_cavity=p.walls["is_cavity_wall"], + current_wall_uvalue=current_wall_u_value, + is_partial="partial" in p.walls["clean_description"].lower(), + existing_li_thickness=li_thickness, + mainheating=p.main_heating, + main_fuel=p.main_fuel, + mainheat_energy_eff=p.data["mainheat-energy-eff"], + ) + + input_measures = optimiser_functions.prepare_input_measures( + measures_to_optimise_with_uplift, body.goal, needs_ventilation, funding=True + ) + + # When the goal is Increasing EPC, we can run the funding optimiser + if body.goal == "Increasing EPC": + + solutions = optimise_with_funding_paths( + p=p, + input_measures=input_measures, + housing_type=body.housing_type, + budget=body.budget, + target_gain=gain, + funding=funding + ) + + # Given the solutions we select the optimal one + solutions["cost_less_full_project_funding"] = np.where( + solutions["scheme"] == "eco4", + solutions["total_cost"] - solutions["full_project_funding"] - solutions["total_uplift"], + solutions["total_cost"] - solutions["partial_project_funding"] - solutions["total_uplift"] + ) + + solutions["cost_less_full_project_funding"] = ( + solutions["total_cost"] - solutions["full_project_funding"] - solutions["total_uplift"] + ) + solutions = solutions.sort_values("cost_less_full_project_funding", ascending=True) + + if solutions["meets_upgrade_target"].any(): + # If we have a solution that meets the upgrade target, we select that one + optimal_solution = solutions[solutions["meets_upgrade_target"]].iloc[0] + else: + # Pick the cheapest + optimal_solution = solutions.iloc[0] + + # This is the list of measures that we will recommend + scheme = optimal_solution["scheme"] + funded_measures = optimal_solution["items"] if scheme != "none" else [] + solution = optimal_solution["items"] + optimal_solution["unfunded_items"] + # This is the total amount of funding that the project will produce (including uplifts) (£) + project_funding = optimal_solution["full_project_funding"] if scheme == "eco4" else \ + optimal_solution["partial_project_funding"] + # This is the total amount of funding associated to the uplift (£) + total_uplift = optimal_solution["total_uplift"] + # This is the funding scheme selected + # This is the full project ABS + full_project_score = optimal_solution["project_score"] + # This is the partial project ABS + partial_project_score = optimal_solution["partial_project_score"] + # This is the uplift score ABS + uplift_project_score = optimal_solution["total_uplift_score"] + else: + # We optimise and then we determine eligibility for funding, based on the measures selected + optimiser = ( + GainOptimiser( + input_measures, max_cost=body.budget, max_gain=gain, allow_slack=False + ) if body.budget else CostOptimiser(input_measures, min_gain=gain) + ) + optimiser.setup() + optimiser.solve() + solution = optimiser.solution + + recommendation_types = [] + for measures in input_measures: + for measure in measures: + recommendation_types.append(measure["type"]) + recommendation_types = set(recommendation_types) + + has_wall_insulation_recommendation = any( + (m in recommendation_types or "+".join([m, "mechanical_ventilation"])) for m in + WALL_INSULATION_MEASURES + ) + has_roof_insulation_recommendation = any( + (m in recommendation_types or "+".join([m, "mechanical_ventilation"])) for m in + ROOF_INSULATION_MEASURES + ) + + funding.check_funding( + measures=solution, + starting_sap=p.data["current-energy-efficiency"], + ending_sap=p.data["current-energy-efficiency"] + sum([x["gain"] for x in solution]), + floor_area=p.floor_area, + mainheat_description=p.main_heating["clean_description"], + heating_control_description=p.main_heating_controls["clean_description"], + is_cavity=p.walls["is_cavity_wall"], + current_wall_uvalue=current_wall_u_value, + is_partial="partial" in p.walls["clean_description"].lower(), + existing_li_thickness=li_thickness, + mainheating=p.main_heating, + main_fuel=p.main_fuel, + mainheat_energy_eff=p.data["mainheat-energy-eff"], + has_wall_insulation_recommendation=has_wall_insulation_recommendation, + has_roof_insulation_recommendation=has_roof_insulation_recommendation, + ) + + # Determine the scheme + scheme = "none" + if funding.eco4_eligible: + scheme = "eco4" + if scheme == "none" and funding.gbis_eligible: + scheme = "gbis" + + funded_measures = solution if scheme in ["gbis", "eco4"] else [] + project_funding = 0 if funding.full_project_abs is not None else funding.full_project_abs + total_uplift = funding.eco4_uplift + full_project_score = 0 if funding.full_project_abs is not None else funding.full_project_abs + partial_project_score = funding.partial_project_abs + uplift_project_score = funding.eco4_uplift if scheme == "eco4" else funding.gbis_uplift + + selected = {r["id"] for r in solution} + + if property_required_measures: + solution = optimiser_functions.add_required_measures( + property_id=p.id, property_required_measures=property_required_measures, + recommendations=recommendations, selected=selected, + ) + + # Add best practice measures (ventilation/trickle vents) + selected = optimiser_functions.add_best_practice_measures(p.id, solution, recommendations, selected) + # Final flattening - Don't do this! + # recommendations[p.id] = optimiser_functions.flatten_recommendations_with_defaults( + # p.id, recommendations, selected + # ) + + # TODO: functionise + for measure in funded_measures: + if "+mechanical_ventilation" in measure["type"]: + measure["type"] = measure["type"].split("+mechanical_ventilation")[0] + + p.insert_funding( + scheme=scheme, + funded_measures=funded_measures, + project_funding=project_funding, + total_uplift=total_uplift, + full_project_score=full_project_score, + partial_project_score=partial_project_score, + uplift_project_score=uplift_project_score + ) From 5ab4d5a6d850c407f6678fbd83937cef8266b6f4 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Fri, 14 Nov 2025 13:36:09 +0000 Subject: [PATCH 05/11] add my code to main --- .devcontainer/Dockerfile | 23 ++++--- .devcontainer/devcontainer.json | 3 +- .devcontainer/post-install.sh | 37 ++++------ .vscode/settings.json | 19 +++++ backend/app/config.py | 10 ++- backend/app/db/connection.py | 6 ++ backend/app/db/functions/tasks/Tasks.py | 10 +++ backend/app/db/functions/whlg_functions.py | 80 ++++++++++++++++++++++ backend/app/db/models/tasks.py | 37 ++++++++++ backend/app/db/models/whlg.py | 15 ++++ backend/app/local/router.py | 5 ++ backend/app/main.py | 73 +++++++++++++++++++- backend/app/requirements/requirements.txt | 5 ++ backend/app/whlg/route.py | 47 ------------- backend/app/whlg/router.py | 78 +++++++++++++++++++++ backend/app/whlg/schema.py | 4 ++ backend/run_curl.sh | 11 +++ backend/run_local.sh | 6 ++ 18 files changed, 382 insertions(+), 87 deletions(-) create mode 100644 .vscode/settings.json create mode 100644 backend/app/db/functions/tasks/Tasks.py create mode 100644 backend/app/db/functions/whlg_functions.py create mode 100644 backend/app/db/models/tasks.py create mode 100644 backend/app/db/models/whlg.py delete mode 100644 backend/app/whlg/route.py create mode 100644 backend/app/whlg/router.py create mode 100644 backend/run_curl.sh create mode 100644 backend/run_local.sh diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index 4d898973..c061c9f8 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -9,15 +9,15 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ build-essential pkg-config automake autoconf libtool \ && rm -rf /var/lib/apt/lists/* -# 2) Build and install libpostal from source -RUN git clone --depth 1 https://github.com/openvenues/libpostal /tmp/libpostal \ - && cd /tmp/libpostal \ - && ./bootstrap.sh \ - && ./configure --datadir=/usr/local/share/libpostal \ - && make -j"$(nproc)" \ - && make install \ - && ldconfig \ - && rm -rf /tmp/libpostal +# # 2) Build and install libpostal from source +# RUN git clone --depth 1 https://github.com/openvenues/libpostal /tmp/libpostal \ +# && cd /tmp/libpostal \ +# && ./bootstrap.sh \ +# && ./configure --datadir=/usr/local/share/libpostal \ +# && make -j"$(nproc)" \ +# && make install \ +# && ldconfig \ +# && rm -rf /tmp/libpostal # 3) Create the user and grant sudo privileges RUN useradd -m -s /usr/bin/bash ${USER} \ @@ -26,7 +26,10 @@ RUN useradd -m -s /usr/bin/bash ${USER} \ # 4) Python deps ENV PIP_NO_CACHE_DIR=1 PIP_DISABLE_PIP_VERSION_CHECK=1 -ADD asset_list/requirements.txt requirements.txt +# Model +# ADD asset_list/requirements.txt requirements.txt +# FASTAPI backend +ADD backend/app/requirements/requirements.txt requirements.txt RUN pip install -r requirements.txt # 5) Workdir diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 49bd6f83..91a76c3d 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -23,7 +23,8 @@ "4ops.terraform", "fabiospampinato.vscode-todo-plus", "jgclark.vscode-todo-highlight", - "corentinartaud.pdfpreview" + "corentinartaud.pdfpreview", + "ms-python.vscode-python-envs" ] } } diff --git a/.devcontainer/post-install.sh b/.devcontainer/post-install.sh index d9fc3a9e..dc6da006 100644 --- a/.devcontainer/post-install.sh +++ b/.devcontainer/post-install.sh @@ -1,27 +1,14 @@ -# #!/bin/bash -# poetry install; +mkdir -p ~/.ipython/profile_default/startup -# # Get the Poetry virtual environment path -# VENV_PATH=$(poetry env info --path 2>/dev/null) +cat << 'EOF' > ~/.ipython/profile_default/startup/00-load-env.py +from dotenv import load_dotenv +import os -# if [ -z "$VENV_PATH" ]; then -# echo "No Poetry environment found. Did you run 'poetry install'?" -# exit 1 -# fi - -# # Ensure VS Code settings directory exists -# SETTINGS_DIR="/home/vscode/.vscode-server/data/Machine" -# SETTINGS_FILE="$SETTINGS_DIR/settings.json" - -# mkdir -p "$SETTINGS_DIR" - -# # If settings.json doesn't exist, create a default one -# if [ ! -f "$SETTINGS_FILE" ]; then -# echo "{}" > "$SETTINGS_FILE" -# fi - -# # Update VS Code settings to use the Poetry virtual environment -# jq --arg venv "$VENV_PATH/bin/python" '.["python.defaultInterpreterPath"] = $venv' \ -# "$SETTINGS_FILE" > "$SETTINGS_FILE.tmp" && mv "$SETTINGS_FILE.tmp" "$SETTINGS_FILE" - -# echo "✅ Updated VS Code to use Poetry environment: $VENV_PATH" +# Adjust path as needed +env_path = "/workspaces/model/backend/.env" +if os.path.exists(env_path): + load_dotenv(env_path) + print("✔ Loaded .env into Jupyter kernel") +else: + print("⚠ No .env file found to load") +EOF \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..27782c10 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,19 @@ +{ + "jupyter.interactiveWindow.textEditor.executeSelection": true, + "python.REPL.sendToNativeREPL": true, + "notebook.output.scrolling": true, + "terminal.integrated.defaultProfile.linux": "bash", + "editor.rulers": [67], + "terminal.integrated.profiles.linux": { + "bash": { + "path": "/bin/bash" + } + }, + + // Hot reload setting that needs to be in user settings + // "jupyter.runStartupCommands": [ + // "%load_ext autoreload", "%autoreload 2" + // ] + + +} \ No newline at end of file diff --git a/backend/app/config.py b/backend/app/config.py index b53d5223..98e1c447 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -1,5 +1,7 @@ from functools import lru_cache from pydantic_settings import BaseSettings +from typing import Optional + class Settings(BaseSettings): @@ -35,9 +37,13 @@ class Settings(BaseSettings): # Other S3 buckts ENERGY_ASSESSMENTS_BUCKET: str - class Config: - env_file = "backend/.env" + # Optional AWS creds (only required in local) + AWS_ACCESS_KEY_ID: Optional[str] = None + AWS_SECRET_KEY_ID: Optional[str] = None + AWS_DEFAULT_REGION: Optional[str] = None + class Config: + env_file = "backend.env" @lru_cache() def get_settings(): diff --git a/backend/app/db/connection.py b/backend/app/db/connection.py index 9efdfd25..fbec9102 100644 --- a/backend/app/db/connection.py +++ b/backend/app/db/connection.py @@ -1,5 +1,6 @@ from sqlalchemy import create_engine from backend.app.config import get_settings +from sqlmodel import Session connection_string = "postgresql+{drivername}://{username}:{password}@{server}:{port}/{dbname}" db_string = connection_string.format( @@ -12,3 +13,8 @@ db_string = connection_string.format( ) db_engine = create_engine(db_string, pool_size=5, max_overflow=5) + +def get_db_session(): + if db_engine is None: + raise RuntimeError("Database is not configured. Set DATABASE_URL in environment variables.") + return Session(db_engine) diff --git a/backend/app/db/functions/tasks/Tasks.py b/backend/app/db/functions/tasks/Tasks.py new file mode 100644 index 00000000..d3f06d33 --- /dev/null +++ b/backend/app/db/functions/tasks/Tasks.py @@ -0,0 +1,10 @@ + +class TasksInterface: + def __init__(self): + pass + + + +class SubTaskInterface: + def __init__(self) + pass \ No newline at end of file diff --git a/backend/app/db/functions/whlg_functions.py b/backend/app/db/functions/whlg_functions.py new file mode 100644 index 00000000..e318d004 --- /dev/null +++ b/backend/app/db/functions/whlg_functions.py @@ -0,0 +1,80 @@ +from backend.app.db.connection import get_db_session +from backend.app.db.models.whlg import Whlg + + +def upsert_whlg_postcode(postcode: str): + """ + Manually upsert a postcode into the WHLG table. + No unique constraint is required. + """ + + cleaned = postcode.lower().replace(" ", "") + + with get_db_session() as session: + # Check if record exists + existing = session.query(Whlg).filter(Whlg.postcode == cleaned).first() + + if existing: + return existing # nothing to update, just return it + + # Insert a new row + record = Whlg(postcode=cleaned) + session.add(record) + session.commit() + session.refresh(record) + + return record + + +# One time script to upload 400,000 records in one go with the pay +# of pandas and one insert +from backend.app.db.connection import get_db_session +from backend.app.db.models.whlg import Whlg +from sqlalchemy import select +from sqlalchemy.orm import Session + + +def upload_whlg_from_dataframe(df): + """ + FAST bulk insert of WHLG postcodes (400k+ rows). + No unique constraint needed. + """ + + if "Postcode" not in df.columns: + raise ValueError("DataFrame must contain a 'Postcode' column") + + # 1. Clean incoming postcodes + cleaned_postcodes = ( + df["Postcode"] + .astype(str) + .str.lower() + .str.replace(" ", "", regex=False) + .dropna() + .unique() + .tolist() + ) + + with get_db_session() as session: + # 2. Fetch existing postcodes once (VERY FAST) + existing = session.exec(select(Whlg.postcode)).all() + existing_set = set(existing) + + # 3. Determine which are new + new_postcodes = [ + pc for pc in cleaned_postcodes if pc not in existing_set + ] + + if not new_postcodes: + return {"inserted": 0, "skipped_existing": len(cleaned_postcodes)} + + # 4. Bulk insert new postcodes in one shot + objects = [Whlg(postcode=pc) for pc in new_postcodes] + + session.bulk_save_objects(objects) + session.commit() + + return { + "inserted": len(new_postcodes), + "skipped_existing": len(cleaned_postcodes) - len(new_postcodes), + "total_provided": len(cleaned_postcodes) + } diff --git a/backend/app/db/models/tasks.py b/backend/app/db/models/tasks.py new file mode 100644 index 00000000..ed5b3710 --- /dev/null +++ b/backend/app/db/models/tasks.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +from typing import Optional, List +from datetime import datetime +from uuid import UUID, uuid4 + +from sqlmodel import SQLModel, Field, Relationship + + +class Task(SQLModel, table=True): + __tablename__ = "tasks" + + id: UUID = Field( + default_factory=uuid4, + primary_key=True, + index=True, + ) + + taskSource: str = Field(alias="task_source") + + jobStarted: Optional[datetime] = Field( + default=None, alias="job_started" + ) + jobCompleted: Optional[datetime] = Field( + default=None, alias="job_completed" + ) + + status: str = Field(default="In Progress") + service: Optional[str] = None + + updatedAt: datetime = Field( + default_factory=datetime.utcnow, + alias="updated_at", + ) + + # Relationship + subTasks: List["SubTask"] = Relationship(back_populates="task") diff --git a/backend/app/db/models/whlg.py b/backend/app/db/models/whlg.py new file mode 100644 index 00000000..29d907e4 --- /dev/null +++ b/backend/app/db/models/whlg.py @@ -0,0 +1,15 @@ +import uuid +from typing import Optional +from sqlmodel import SQLModel, Field + + +class Whlg(SQLModel, table=True): + __tablename__ = "whlg" + + id: Optional[int] = Field( + default=None, + primary_key=True, + index=True, + ) + + postcode: str = Field(nullable=False) \ No newline at end of file diff --git a/backend/app/local/router.py b/backend/app/local/router.py index 4ebb490c..0977be04 100644 --- a/backend/app/local/router.py +++ b/backend/app/local/router.py @@ -31,6 +31,11 @@ def create_dummy_token(secret: str) -> str: return token +@router.get("/") +async def dummy_token(): + return {"hello": "world"} + + @router.get("/dummy-token") async def dummy_token(): settings = get_settings() diff --git a/backend/app/main.py b/backend/app/main.py index de6f0795..261e2f34 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -6,6 +6,7 @@ from fastapi.encoders import jsonable_encoder from starlette.exceptions import HTTPException as StarletteHTTPException from mangum import Mangum from backend.app.portfolio import router as portfolio_router +from backend.app.whlg import router as whlg_router from backend.app.plan import router as plan_router from backend.app.dependencies import validate_api_key from backend.app.config import get_settings @@ -13,7 +14,10 @@ from backend.app.config import get_settings logger = logging.getLogger("uvicorn.error") logging.basicConfig(level=logging.INFO) -app = FastAPI(dependencies=[Depends(validate_api_key)]) +if get_settings().ENVIRONMENT == "local": + app = FastAPI() +else: + app = FastAPI(dependencies=[Depends(validate_api_key)]) # Handle 422 errors (validation failures) @@ -52,10 +56,75 @@ async def log_requests(request: Request, call_next): app.include_router(portfolio_router.router, prefix="/v1") app.include_router(plan_router.router, prefix="/v1") +app.include_router(whlg_router.router, prefix="/v1") + +if get_settings().ENVIRONMENT == "local": + from app.local import router as local_router + app.include_router(local_router.router) + +handler = Mangum(app) +import logging +from fastapi.responses import JSONResponse +from fastapi import FastAPI, Depends, Request, status +from fastapi.exceptions import RequestValidationError +from fastapi.encoders import jsonable_encoder +from starlette.exceptions import HTTPException as StarletteHTTPException +from mangum import Mangum +from backend.app.portfolio import router as portfolio_router +from backend.app.whlg import router as whlg_router +from backend.app.plan import router as plan_router +from backend.app.dependencies import validate_api_key +from backend.app.config import get_settings + +logger = logging.getLogger("uvicorn.error") +logging.basicConfig(level=logging.INFO) + +if get_settings().ENVIRONMENT == "local": + app = FastAPI() +else: + app = FastAPI(dependencies=[Depends(validate_api_key)]) + + +# Handle 422 errors (validation failures) +@app.exception_handler(RequestValidationError) +async def validation_exception_handler(request: Request, exc: RequestValidationError): + logger.error(f"422 Validation Error at {request.url}") + logger.error(f"Body: {exc.body}") + logger.error(f"Validation Errors: {exc.errors()}") + return JSONResponse( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + content=jsonable_encoder({ + "detail": exc.errors(), + "body": exc.body + }), + ) + + +# Handle generic HTTP exceptions (optional, useful for catching 404, 403, etc.) +@app.exception_handler(StarletteHTTPException) +async def http_exception_handler(request: Request, exc: StarletteHTTPException): + logger.warning(f"{exc.status_code} Error at {request.url} - Detail: {exc.detail}") + return JSONResponse( + status_code=exc.status_code, + content={"detail": exc.detail}, + ) + + +# Middleware to log requests +@app.middleware("http") +async def log_requests(request: Request, call_next): + logger.info(f"Incoming request: {request.method} {request.url}") + response = await call_next(request) + logger.info(f"Response status: {response.status_code}") + return response + + +app.include_router(portfolio_router.router, prefix="/v1") +app.include_router(plan_router.router, prefix="/v1") +app.include_router(whlg_router.router, prefix="/v1") if get_settings().ENVIRONMENT == "local": from app.local import router as local_router - app.include_router(local_router.router) handler = Mangum(app) diff --git a/backend/app/requirements/requirements.txt b/backend/app/requirements/requirements.txt index a213214d..8a151e83 100644 --- a/backend/app/requirements/requirements.txt +++ b/backend/app/requirements/requirements.txt @@ -12,3 +12,8 @@ boto3==1.35.44 openpyxl==3.1.2 # Basic pytz +uvicorn[standard] +pandas +ipykernel +sqlmodel + diff --git a/backend/app/whlg/route.py b/backend/app/whlg/route.py deleted file mode 100644 index 21d417c5..00000000 --- a/backend/app/whlg/route.py +++ /dev/null @@ -1,47 +0,0 @@ -import boto3 -import json -import math -import asyncio -import random - -from datetime import datetime - -from fastapi import APIRouter, Depends -from backend.app.dependencies import validate_token -from backend.app.plan.schemas import PlanTriggerRequest -from backend.app.config import get_settings -from sqlalchemy.orm import sessionmaker -from utils.logger import setup_logger -from backend.app.db.connection import db_engine - -from backend.app.db.functions.recommendations_functions import create_scenario - -logger = setup_logger() - -router = APIRouter( - prefix="/whlg", - tags=["whlg"], - dependencies=[Depends(validate_token)], - responses={404: {"description": "Not found"}} -) - - -@router.post("/") -async def whlg_entrypoint(body): - # body needs to include postcode, UPRN [task ID?] - # - # Refer to the plan trigger route for code - # 1) Create an event schema and store it in the schemas file - # 2) Build the tasks functions - # 3) Read in the funding csx. This can be found as such: - # whlg_eligible_postcodes = read_csv_from_s3( - # bucket_name=get_settings().DATA_BUCKET, - # filepath="funding/whlg eligible postcodes.csv", - # ) - # whlg_eligible_postcodes = pd.DataFrame(whlg_eligible_postcodes) - # Check the postcode against this file - # We need to store this somewhere????!!!??!??!?!?!?!??!??!??!??!??!??!??!??!??!??! Create a new table! - # Update subtask to be complete - # Once this is complete, build the logs stuff, add the cloudwatch logs ID to the database - - print("We're gonna do stuff!") diff --git a/backend/app/whlg/router.py b/backend/app/whlg/router.py new file mode 100644 index 00000000..3957a3f4 --- /dev/null +++ b/backend/app/whlg/router.py @@ -0,0 +1,78 @@ +import boto3 +import json +import math +import asyncio +import random + +from datetime import datetime + +from fastapi import APIRouter, Depends +from backend.app.dependencies import validate_token +from backend.app.plan.schemas import PlanTriggerRequest +from backend.app.config import get_settings +from sqlalchemy.orm import sessionmaker +from utils.logger import setup_logger +from backend.app.db.connection import db_engine +from backend.app.db.functions.recommendations_functions import create_scenario +import pandas as pd +from backend.app.whlg.schema import WHLGElligibilityRequest + +from utils.s3 import read_csv_from_s3 +from sqlalchemy.dialects.postgresql import insert +from backend.app.db.connection import get_db_session +from backend.app.db.models.whlg import Whlg +from backend.app.db.functions.whlg_functions import upsert_whlg_postcode + +logger = setup_logger() + + +if get_settings().ENVIRONMENT == "local": + router = APIRouter( + prefix="/whlg", + tags=["whlg"], + ) + +else: + router = APIRouter( + prefix="/whlg", + tags=["whlg"], + dependencies=[Depends(validate_token)], + responses={404: {"description": "Not found"}} + ) + +@router.get("/") +async def whlg_entrypoint(): + # body needs to include postcode, UPRN [task ID?] + # + # Refer to the plan trigger route for code + # 1) Create an event schema and store it in the schemas file + # 2) Build the tasks functions + # 3) Read in the funding csx. This can be found as such: + # whlg_eligible_postcodes = read_csv_from_s3( + # bucket_name=get_settings().DATA_BUCKET, + # filepath="funding/whlg eligible postcodes.csv", + # ) + # whlg_eligible_postcodes = pd.DataFrame(whlg_eligible_postcodes) + # Check the postcode against this file + # We need to store this somewhere????!!!??!??!?!?!?!??!??!??!??!??!??!??!??!??!??! Create a new table! + # Update subtask to be complete + # Once this is complete, build the logs stuff, add the cloudwatch logs ID to the database + return {"hello": "from whlg"} + + +@router.post("/eligible") +async def eligiable(body: WHLGElligibilityRequest): + postcode = body.postcode or "" + postcode = postcode.lower().replace(" ", "") + + whlg_eligible_postcodes = read_csv_from_s3( + bucket_name=get_settings().DATA_BUCKET, + filepath="funding/whlg eligible postcodes.csv", + ) + whlg_eligible_postcodes = pd.DataFrame(whlg_eligible_postcodes) + whlg_eligible_postcodes['Postcode'] = whlg_eligible_postcodes['Postcode'].str.replace(' ', '', regex=False) + + is_eligible = postcode in whlg_eligible_postcodes['Postcode'].values + return {"whlg_eligible": is_eligible} + + diff --git a/backend/app/whlg/schema.py b/backend/app/whlg/schema.py index e69de29b..648ecbf3 100644 --- a/backend/app/whlg/schema.py +++ b/backend/app/whlg/schema.py @@ -0,0 +1,4 @@ +from pydantic import BaseModel, Field + +class WHLGElligibilityRequest(BaseModel): + postcode: str = Field(..., example="B93 8SY") \ No newline at end of file diff --git a/backend/run_curl.sh b/backend/run_curl.sh new file mode 100644 index 00000000..22433e39 --- /dev/null +++ b/backend/run_curl.sh @@ -0,0 +1,11 @@ +curl -X POST "http://localhost:8000/v1/whlg/eligible" \ + -H "Content-Type: application/json" \ + -d '{"postcode": "B93 8SY"}' + +curl -X POST "http://localhost:8000/v1/whlg/eligible" \ + -H "Content-Type: application/json" \ + -d '{"postcode": "BN15 0FD"}' + +curl -X POST "http://localhost:8000/v1/whlg/eligible" \ + -H "Content-Type: application/json" \ + -d '{"postcode": "DY6 0LB"}' diff --git a/backend/run_local.sh b/backend/run_local.sh new file mode 100644 index 00000000..be45a54a --- /dev/null +++ b/backend/run_local.sh @@ -0,0 +1,6 @@ +set -a +source ./.env +set +a + +uvicorn app.main:app --reload + From 05740f82a4026dc7f5de23519576dea1498b2dec Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Fri, 14 Nov 2025 13:40:29 +0000 Subject: [PATCH 06/11] uploade without workflows --- .../actions/lambda-deploy/action.yml | 86 ------------------- .../actions/terraform-deploy/action.yml | 55 ------------ .github/workflows/lambda_main.yml | 33 ------- 3 files changed, 174 deletions(-) delete mode 100644 .github/workflows/actions/lambda-deploy/action.yml delete mode 100644 .github/workflows/actions/terraform-deploy/action.yml delete mode 100644 .github/workflows/lambda_main.yml diff --git a/.github/workflows/actions/lambda-deploy/action.yml b/.github/workflows/actions/lambda-deploy/action.yml deleted file mode 100644 index 3ca0fc8d..00000000 --- a/.github/workflows/actions/lambda-deploy/action.yml +++ /dev/null @@ -1,86 +0,0 @@ -name: "Build and Push Lambda Image to ECR" -description: "Reusable action for building and pushing lambda Docker image to ECR" - -inputs: - ecr_name: - description: "Lambda name / ECR repo name" - required: true - dockerfile_path: - description: "Path to Dockerfile" - required: true - ecr_tf_dir: - description: "Path to ECR terraform directory" - required: true - lambda_tf_dir: - description: "Path to Lambda terraform directory" - required: true - aws-access-key-id: - description: "AWS access key" - required: true - aws-secret-access-key: - description: "AWS secret key" - required: true - aws-region: - description: "AWS region" - required: true - git-sha: - description: "Git commit SHA" - required: true - git-ref: - description: "Git ref name" - required: true - -runs: - using: "composite" - steps: - - uses: actions/checkout@v4 - - - name: Configure AWS credentials - uses: aws-actions/configure-aws-credentials@v4 - with: - aws-access-key-id: ${{ inputs.aws-access-key-id }} - aws-secret-access-key: ${{ inputs.aws-secret-access-key }} - aws-region: ${{ inputs.aws-region }} - - - name: Log in to Amazon ECR - id: login-ecr - uses: aws-actions/amazon-ecr-login@v2 - - - name: Deploy ECR - uses: ./.github/workflows/actions/terraform-deploy - with: - working_directory: ${{ inputs.ecr_tf_dir }} - aws-access-key-id: ${{ inputs.aws-access-key-id }} - aws-secret-access-key: ${{ inputs.aws-secret-access-key }} - aws-region: ${{ inputs.aws-region }} - - name: Set Docker image tag - id: set_tag - shell: bash - run: | - SHORT_SHA=$(echo "${{ inputs.git-sha }}" | cut -c1-7) - BRANCH=$(echo "${{ inputs.git-ref }}" | tr '/' '-') - TAG="${BRANCH}-${SHORT_SHA}" - echo "IMAGE_TAG=${TAG}" >> $GITHUB_ENV - echo "tag=$TAG" >> $GITHUB_OUTPUT - - - name: Build and push Docker image - shell: bash - run: | - IMAGE_URI=${{ steps.login-ecr.outputs.registry }}/${{ inputs.ecr_name }}:${{ steps.set_tag.outputs.tag }} - echo "Building Docker image for ${{ inputs.ecr_name }}..." - docker build -t $IMAGE_URI -f ${{ inputs.dockerfile_path }} . - - echo "Pushing to ECR..." - docker push $IMAGE_URI - - - name: Deploy Lambda - uses: ./.github/workflows/actions/terraform-deploy - with: - working_directory: ${{ inputs.lambda_tf_dir }} - aws-access-key-id: ${{ inputs.aws-access-key-id }} - aws-secret-access-key: ${{ inputs.aws-secret-access-key }} - aws-region: ${{ inputs.aws-region }} - lambda-image-tag: ${{ steps.set_tag.outputs.tag }} - - - diff --git a/.github/workflows/actions/terraform-deploy/action.yml b/.github/workflows/actions/terraform-deploy/action.yml deleted file mode 100644 index 56133299..00000000 --- a/.github/workflows/actions/terraform-deploy/action.yml +++ /dev/null @@ -1,55 +0,0 @@ -name: "Terraform Plan Shared Config" -description: "Plans shared Terraform config for Lambdas" - -inputs: - working_directory: - description: "Directory containing Terraform config" - required: true - aws-access-key-id: - description: "AWS access key" - required: true - aws-secret-access-key: - description: "AWS secret key" - required: true - aws-region: - description: "AWS region" - required: true - lambda-image-tag: - description: "Tag of the Lambda image (e.g., GitHub SHA)" - required: false - -runs: - using: "composite" - steps: - - uses: actions/checkout@v4 - - - name: Configure AWS credentials - uses: aws-actions/configure-aws-credentials@v4 - with: - aws-access-key-id: ${{ inputs.aws-access-key-id }} - aws-secret-access-key: ${{ inputs.aws-secret-access-key }} - aws-region: ${{ inputs.aws-region }} - - - name: Setup Terraform - uses: hashicorp/setup-terraform@v3 - - - name: Terraform Init - working-directory: ${{ inputs.working_directory }} - shell: bash - run: terraform init -reconfigure - - - name: Terraform Plan - working-directory: ${{ inputs.working_directory }} - shell: bash - run: | - if [ -n "${{ inputs.lambda-image-tag }}" ]; then - terraform plan -out=tfplan -var="lambda_image_tag=${{ inputs.lambda-image-tag }}" - else - terraform plan -out=tfplan - fi - - - name: Terraform Apply - working-directory: ${{ inputs.working_directory }} - shell: bash - run: terraform apply -auto-approve tfplan - diff --git a/.github/workflows/lambda_main.yml b/.github/workflows/lambda_main.yml deleted file mode 100644 index 960adbe5..00000000 --- a/.github/workflows/lambda_main.yml +++ /dev/null @@ -1,33 +0,0 @@ -# Please note, this github work flows assumes that shared-terrform is deployed in aws env -# The shared-terraform files lives in https://github.com/Hestia-Homes/survey-extraction/tree/main/deployment/lambda/lambda_shared - -name: Deploy Lambdas -on: - push: - branches: [main, feature/whlg_lambda] - -env: - AWS_REGION: eu-west-2 - -jobs: - whlg-calc: - runs-on: ubuntu-latest - permissions: - id-token: write - contents: read - - steps: - - name: Checkout repo - uses: actions/checkout@v4 - - name: Build and deploy Warm Homes Local Grant Calc (whlg-calc) - uses: ./.github/workflows/actions/lambda-deploy - with: - ecr_name: whlg_calc_adhoc_ecr - dockerfile_path: ./deployment/lambda/whlg_calculator/docker/Dockerfile - ecr_tf_dir: ./deployment/lambda/whlg_calculator/docker/ - lambda_tf_dir: ./deployment/lambda/whlg_calculator/ - aws-access-key-id: ${{ secrets.DEV_AWS_ACCESS_KEY_ID }} - aws-secret-access-key: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY}} - aws-region: eu-west-2 - git-sha: ${{ github.sha }} - git-ref: ${{ github.ref_name }} \ No newline at end of file From c617d603a360477c36a9a463c0c262d2de800b48 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Fri, 14 Nov 2025 13:41:24 +0000 Subject: [PATCH 07/11] uploade without workflows --- .../whlg_calculator/docker/.dockerignore | 21 ----- .../lambda/whlg_calculator/docker/Dockerfile | 25 ------ .../lambda/whlg_calculator/docker/app.py | 3 - .../lambda/whlg_calculator/docker/ecr.tf | 63 -------------- .../lambda/whlg_calculator/docker/main.tf | 0 .../lambda/whlg_calculator/docker/provider.tf | 15 ---- deployment/lambda/whlg_calculator/main.tf | 0 deployment/lambda/whlg_calculator/provider.tf | 15 ---- deployment/lambda/whlg_calculator/vars.tf | 5 -- .../lambda/whlg_calculator/whlg_lambda.tf | 83 ------------------- 10 files changed, 230 deletions(-) delete mode 100644 deployment/lambda/whlg_calculator/docker/.dockerignore delete mode 100644 deployment/lambda/whlg_calculator/docker/Dockerfile delete mode 100644 deployment/lambda/whlg_calculator/docker/app.py delete mode 100644 deployment/lambda/whlg_calculator/docker/ecr.tf delete mode 100644 deployment/lambda/whlg_calculator/docker/main.tf delete mode 100644 deployment/lambda/whlg_calculator/docker/provider.tf delete mode 100644 deployment/lambda/whlg_calculator/main.tf delete mode 100644 deployment/lambda/whlg_calculator/provider.tf delete mode 100644 deployment/lambda/whlg_calculator/vars.tf delete mode 100644 deployment/lambda/whlg_calculator/whlg_lambda.tf diff --git a/deployment/lambda/whlg_calculator/docker/.dockerignore b/deployment/lambda/whlg_calculator/docker/.dockerignore deleted file mode 100644 index d587d341..00000000 --- a/deployment/lambda/whlg_calculator/docker/.dockerignore +++ /dev/null @@ -1,21 +0,0 @@ -# Ignore junk and large files -*.pdf -*.csv -*.xml -*.parquet -*.ipynb -*.mp4 -*.mov -*.jpg -*.png -*.zip -*.tar.gz -__pycache__/ -*.pyc -*.pyo -*.pyd -build/ -dist/ -.etl_cache/ -tests/ -docs/ diff --git a/deployment/lambda/whlg_calculator/docker/Dockerfile b/deployment/lambda/whlg_calculator/docker/Dockerfile deleted file mode 100644 index cdd1f8a3..00000000 --- a/deployment/lambda/whlg_calculator/docker/Dockerfile +++ /dev/null @@ -1,25 +0,0 @@ -FROM public.ecr.aws/lambda/python:3.12 - -# Install Poetry (you could pin a version if you like) -RUN curl -sSL https://install.python-poetry.org | python3 - - -# Add Poetry to PATH -ENV PATH="/root/.local/bin:$PATH" - -# Set working directory -WORKDIR /var/task - -# Copy Poetry files first to leverage Docker layer caching -COPY pyproject.toml poetry.lock README.md ./ -COPY etl/ etl/ - - -# Install dependencies into /var/task -RUN poetry config virtualenvs.create false \ - && poetry install --only main --no-interaction --no-ansi - -# Copy app code -COPY deployment/lambda/extractor_and_loader/docker/app.py ./ - -# Set Lambda handler -CMD ["app.handler"] \ No newline at end of file diff --git a/deployment/lambda/whlg_calculator/docker/app.py b/deployment/lambda/whlg_calculator/docker/app.py deleted file mode 100644 index 4dcf1a8e..00000000 --- a/deployment/lambda/whlg_calculator/docker/app.py +++ /dev/null @@ -1,3 +0,0 @@ -def handler(event, context): - print("Hello and welcome to the WHLG Calculator") - print("Please contact the tech team for implementation") \ No newline at end of file diff --git a/deployment/lambda/whlg_calculator/docker/ecr.tf b/deployment/lambda/whlg_calculator/docker/ecr.tf deleted file mode 100644 index a1501dff..00000000 --- a/deployment/lambda/whlg_calculator/docker/ecr.tf +++ /dev/null @@ -1,63 +0,0 @@ -# ECR repo -resource "aws_ecr_repository" "whlg_calc_adhoc_ecr" { - name = "whlg_calc_adhoc_ecr" -} - -# ECR policy to allow Lambda access -resource "aws_ecr_repository_policy" "whlg_calc_adhoc_ecr_access" { - repository = aws_ecr_repository.whlg_calc_adhoc_ecr.name - - policy = jsonencode({ - Version = "2008-10-17", - Statement = [{ - Sid = "AllowLambdaPull", - Effect = "Allow", - Principal = { - Service = "lambda.amazonaws.com" - }, - Action = [ - "ecr:GetDownloadUrlForLayer", - "ecr:BatchGetImage", - "ecr:BatchCheckLayerAvailability" - ] - }] - }) -} - - - -# ECR lifecycle policy to delete tagged images older than 14 days -resource "aws_ecr_lifecycle_policy" "whlg_calc_adhoc_loader_lifecycle" { - repository = aws_ecr_repository.whlg_calc_adhoc_ecr.name - - policy = jsonencode({ - "rules": [ - { - "rulePriority": 2, - "description": "Expire images older than 14 days", - "selection": { - "tagStatus": "untagged", - "countType": "sinceImagePushed", - "countUnit": "days", - "countNumber": 1 - }, - "action": { - "type": "expire" - } - }, - { - "rulePriority": 1, - "description": "Keep last 5 images", - "selection": { - "tagStatus": "tagged", - "tagPrefixList": ["feature"], - "countType": "imageCountMoreThan", - "countNumber": 5 - }, - "action": { - "type": "expire" - } - } - ] - }) -} \ No newline at end of file diff --git a/deployment/lambda/whlg_calculator/docker/main.tf b/deployment/lambda/whlg_calculator/docker/main.tf deleted file mode 100644 index e69de29b..00000000 diff --git a/deployment/lambda/whlg_calculator/docker/provider.tf b/deployment/lambda/whlg_calculator/docker/provider.tf deleted file mode 100644 index 5f0fef0f..00000000 --- a/deployment/lambda/whlg_calculator/docker/provider.tf +++ /dev/null @@ -1,15 +0,0 @@ -terraform { - required_providers { - aws = { - source = "hashicorp/aws" - version = "~> 6.3.0" - } - } - backend "s3" { - bucket = "whlg-calc-tf-state" - region = "eu-west-2" - key = "env:/dev/lambda/ecr/whlg-calc.tfstate" - } - - required_version = ">= 1.2.0" -} diff --git a/deployment/lambda/whlg_calculator/main.tf b/deployment/lambda/whlg_calculator/main.tf deleted file mode 100644 index e69de29b..00000000 diff --git a/deployment/lambda/whlg_calculator/provider.tf b/deployment/lambda/whlg_calculator/provider.tf deleted file mode 100644 index df9abf1c..00000000 --- a/deployment/lambda/whlg_calculator/provider.tf +++ /dev/null @@ -1,15 +0,0 @@ -terraform { - required_providers { - aws = { - source = "hashicorp/aws" - version = "~> 6.3.0" - } - } - backend "s3" { - bucket = "whlg-calc-tf-state" - region = "eu-west-2" - key = "env:/dev/lambda/eachlambda/whlg_calc_lambda.tfstate" - } - - required_version = ">= 1.2.0" -} diff --git a/deployment/lambda/whlg_calculator/vars.tf b/deployment/lambda/whlg_calculator/vars.tf deleted file mode 100644 index ecdf359d..00000000 --- a/deployment/lambda/whlg_calculator/vars.tf +++ /dev/null @@ -1,5 +0,0 @@ -variable "lambda_image_tag" { - description = "Docker image tag (e.g. GitHub SHA)" - type = string - default = "local-dev-latest" -} \ No newline at end of file diff --git a/deployment/lambda/whlg_calculator/whlg_lambda.tf b/deployment/lambda/whlg_calculator/whlg_lambda.tf deleted file mode 100644 index 0a5433a9..00000000 --- a/deployment/lambda/whlg_calculator/whlg_lambda.tf +++ /dev/null @@ -1,83 +0,0 @@ -# Reference existing IAM role -data "aws_iam_role" "lambda_exec_role" { - name = "lambda-exec-role" -} - -# Reference existing ECR repository -data "aws_ecr_repository" "whlg_calc_adhoc_ecr" { - name = "whlg_calc_adhoc_ecr" -} - -# SQS queue -resource "aws_sqs_queue" "whlg_calc_adhoc_queue" { - name = "whlg_calc_adhoc-queue" - visibility_timeout_seconds = 1800 # 30 minutes (>= 300s and ~6x Lambda timeout) -} - - -# Custom IAM policy specific to lambda_example -resource "aws_iam_policy" "whlg_calc_adhoc_policy" { - name = "walthamforest_adhoc_policy_lambda" - - policy = jsonencode({ - Version = "2012-10-17", - Statement = [ - { - Effect = "Allow", - Action = [ - "sqs:ReceiveMessage", - "sqs:DeleteMessage", - "sqs:GetQueueAttributes", - "sqs:GetQueueUrl", - "sqs:ChangeMessageVisibility" - ], - Resource = aws_sqs_queue.whlg_calc_adhoc_queue.arn - }, - { - Effect = "Allow", - Action = [ - "ecr:GetDownloadUrlForLayer", - "ecr:BatchGetImage", - "ecr:BatchCheckLayerAvailability" - ], - Resource = data.aws_ecr_repository.whlg_calc_adhoc_ecr.arn - }, - { - Effect = "Allow", - Action = ["ecr:GetAuthorizationToken"], - Resource = "*" - } - ] - }) -} - -resource "aws_iam_role_policy_attachment" "whlg_calc_adhoc_policy_attach" { - role = data.aws_iam_role.lambda_exec_role.name - policy_arn = aws_iam_policy.whlg_calc_adhoc_policy.arn -} - -# Lambda function -resource "aws_lambda_function" "whlg_calc_adhoc" { - function_name = "whlg_calc_adhoc" - role = data.aws_iam_role.lambda_exec_role.arn - package_type = "Image" - image_uri = "${data.aws_ecr_repository.whlg_calc_adhoc_ecr.repository_url}:${var.lambda_image_tag}" - # Increase timeout (max 900 sec / 15 min) - # timeout = 300 # e.g. 5 minutes - - # Increase memory (default 128 MB) - memory_size = 2048 # try 1024 or 2048 MB to start - - # environment { - # variables = { - # DATABASE_URL = "postgresql://postgres:makingwarmhomes@terraform-20250331175522503500000002.cdgzupxvdyp0.eu-west-2.rds.amazonaws.com:5432/surveyDB" - # } - # } -} - -# SQS trigger -resource "aws_lambda_event_source_mapping" "whlg_calc_adhoc_trigger" { - event_source_arn = aws_sqs_queue.whlg_calc_adhoc_queue.arn - function_name = aws_lambda_function.whlg_calc_adhoc.arn - batch_size = 1 -} From d98ae7db7b7713ad1fc1a9d7d7d73c1f91cedfb6 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Fri, 14 Nov 2025 14:25:28 +0000 Subject: [PATCH 08/11] added task and sub task interface --- backend/app/db/functions/tasks/Tasks.py | 182 +++++++++++++++++++++++- backend/app/db/models/tasks.py | 36 +++++ backend/app/tasks/__init__.py | 0 backend/app/tasks/router.py | 87 +++++++++++ backend/app/tasks/schema.py | 21 +++ 5 files changed, 320 insertions(+), 6 deletions(-) create mode 100644 backend/app/tasks/__init__.py create mode 100644 backend/app/tasks/router.py create mode 100644 backend/app/tasks/schema.py diff --git a/backend/app/db/functions/tasks/Tasks.py b/backend/app/db/functions/tasks/Tasks.py index d3f06d33..18900c83 100644 --- a/backend/app/db/functions/tasks/Tasks.py +++ b/backend/app/db/functions/tasks/Tasks.py @@ -1,10 +1,180 @@ +from __future__ import annotations -class TasksInterface: - def __init__(self): - pass - +# ---- Standard Library ---- +from typing import Optional, Dict, Any +from datetime import datetime, timezone +from uuid import UUID +import json + +# ---- SQLModel / SQLAlchemy ---- +from sqlmodel import Session, select + +# ---- DB Session ---- +from backend.app.db.connection import get_db_session + +# ---- Models ---- +from backend.app.db.models.tasks import Task, SubTask +# ============================================================ +# SubTask Interface +# ============================================================ class SubTaskInterface: - def __init__(self) - pass \ No newline at end of file + """ + CRUD operations for SubTask + cascading Task progress updates. + """ + + def create_subtask(self, task_id: UUID, inputs: Optional[Dict[str, Any]] = None): + now = datetime.now(timezone.utc) + + with get_db_session() as session: + task = session.get(Task, task_id) + if not task: + raise ValueError(f"Task {task_id} not found") + + subtask = SubTask( + taskId=task_id, + inputs=json.dumps(inputs) if inputs else None, + jobStarted=now, + ) + + session.add(subtask) + session.commit() + session.refresh(subtask) + + # Recalculate the parent task status + self._update_task_progress(session, task_id) + + return subtask + + def update_subtask_status(self, subtask_id: UUID, status: str): + now = datetime.now(timezone.utc) + + with get_db_session() as session: + subtask = session.get(SubTask, subtask_id) + if not subtask: + raise ValueError(f"SubTask {subtask_id} not found") + + normalized = status.lower() + + # Start time + if normalized == "in progress" and subtask.jobStarted is None: + subtask.jobStarted = now + + # Completed time + if normalized == "complete": + subtask.jobCompleted = now + + subtask.status = normalized + subtask.updatedAt = now + + session.add(subtask) + session.commit() + + # Re-evaluate the task + self._update_task_progress(session, subtask.taskId) + + session.refresh(subtask) + return subtask + + # -------------------------------------------------------- + # Task Progress Calculation + # -------------------------------------------------------- + def _update_task_progress(self, session: Session, task_id: UUID): + task = session.get(Task, task_id) + if not task: + return + + subtasks = session.exec( + select(SubTask).where(SubTask.taskId == task_id) + ).all() + + if not subtasks: + return + + statuses = [st.status.lower() for st in subtasks] + now = datetime.now(timezone.utc) + + # Priority: + # failed > in progress > complete + if "failed" in statuses: + task.status = "failed" + task.jobCompleted = now + + elif all(s == "complete" for s in statuses): + task.status = "complete" + task.jobCompleted = now + + else: + task.status = "in progress" + if task.jobStarted is None: + task.jobStarted = now + task.jobCompleted = None # still running + + task.updatedAt = now + session.add(task) + session.commit() + + +# ============================================================ +# Task Interface +# ============================================================ +class TasksInterface: + """ + High-level operations for Task records. + """ + + def create_task( + self, + *, + task_source: str, + service: Optional[str] = None, + inputs: Optional[Dict[str, Any]] = None, + ): + now = datetime.now(timezone.utc) + + # Step 1: Create the task + with get_db_session() as session: + task = Task( + taskSource=task_source, + service=service, + jobStarted=now, + ) + + session.add(task) + session.commit() + session.refresh(task) + + # Step 2: Create first subtask using SubTaskInterface + subtask_interface = SubTaskInterface() + subtask = subtask_interface.create_subtask( + task_id=task.id, + inputs=inputs + ) + + return task.id, subtask.id + + def update_task_status(self, task_id: UUID, status: str): + now = datetime.now(timezone.utc) + + with get_db_session() as session: + task = session.get(Task, task_id) + if not task: + raise ValueError(f"Task {task_id} not found") + + normalized = status.lower() + + if normalized == "in progress" and task.jobStarted is None: + task.jobStarted = now + + if normalized == "complete": + task.jobCompleted = now + + task.status = normalized + task.updatedAt = now + + session.add(task) + session.commit() + session.refresh(task) + + return task diff --git a/backend/app/db/models/tasks.py b/backend/app/db/models/tasks.py index ed5b3710..d8007dcd 100644 --- a/backend/app/db/models/tasks.py +++ b/backend/app/db/models/tasks.py @@ -35,3 +35,39 @@ class Task(SQLModel, table=True): # Relationship subTasks: List["SubTask"] = Relationship(back_populates="task") + + +class SubTask(SQLModel, table=True): + __tablename__ = "sub_task" + + id: UUID = Field( + default_factory=uuid4, + primary_key=True, + index=True, + ) + + taskId: UUID = Field( + foreign_key="tasks.id", + alias="task_id", + ) + + jobStarted: Optional[datetime] = Field( + default=None, alias="job_started" + ) + jobCompleted: Optional[datetime] = Field( + default=None, alias="job_completed" + ) + + status: str = Field(default="In Progress") + + inputs: Optional[str] = None + outputs: Optional[str] = None + cloudLogsURL: Optional[str] = Field(alias="cloud_logs_url") + + updatedAt: datetime = Field( + default_factory=datetime.utcnow, + alias="updated_at", + ) + + # Relationship + task: Optional[Task] = Relationship(back_populates="subTasks") diff --git a/backend/app/tasks/__init__.py b/backend/app/tasks/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/backend/app/tasks/router.py b/backend/app/tasks/router.py new file mode 100644 index 00000000..d324f9ba --- /dev/null +++ b/backend/app/tasks/router.py @@ -0,0 +1,87 @@ +from fastapi import APIRouter, Depends, HTTPException +from uuid import UUID + +from backend.app.dependencies import validate_token +from backend.app.tasks.schema import ( + CreateTaskRequest, + UpdateTaskStatusRequest, + CreateSubTaskRequest, + UpdateSubTaskStatusRequest, +) + +from backend.app.db.functions.tasks.Tasks import TasksInterface, SubTaskInterface +from backend.app.db.connection import get_db_session +from backend.app.db.models.tasks import Task, SubTask +from sqlmodel import select + + +router = APIRouter( + prefix="/tasks", + tags=["tasks"], + dependencies=[Depends(validate_token)], +) + + +@router.post("/", summary="Create a new task and its first subtask") +async def create_task(req: CreateTaskRequest): + tasks = TasksInterface() + task_id, subtask_id = tasks.create_task( + task_source=req.task_source, + service=req.service, + inputs=req.inputs, + ) + return {"task_id": task_id, "subtask_id": subtask_id} + + +@router.get("/{task_id}", summary="Get a task and its subtasks") +async def get_task(task_id: UUID): + with get_db_session() as session: + task = session.get(Task, task_id) + if not task: + raise HTTPException(status_code=404, detail="Task not found") + + subtasks = session.exec( + select(SubTask).where(SubTask.taskId == task_id) + ).all() + + # Deserialize JSON inputs back to dict + formatted = [] + for st in subtasks: + formatted.append({ + **st.dict(), + "inputs": json.loads(st.inputs) if st.inputs else None + }) + + return { + "task": task, + "subtasks": formatted, + } + + +@router.put("/{task_id}/status", summary="Update a task's status") +async def update_task_status(task_id: UUID, req: UpdateTaskStatusRequest): + tasks = TasksInterface() + try: + updated = tasks.update_task_status(task_id, req.status) + return {"task_id": updated.id, "status": updated.status} + except ValueError as e: + raise HTTPException(status_code=404, detail=str(e)) + +@router.post("/{task_id}/subtasks", summary="Create a new subtask under a task") +async def create_subtask(task_id: UUID, req: CreateSubTaskRequest): + subtasks = SubTaskInterface() + try: + st = subtasks.create_subtask(task_id, req.inputs) + return {"subtask_id": st.id, "task_id": task_id} + except ValueError as e: + raise HTTPException(status_code=404, detail=str(e)) + + +@router.put("/subtasks/{subtask_id}/status", summary="Update a subtask's status") +async def update_subtask_status(subtask_id: UUID, req: UpdateSubTaskStatusRequest): + subtasks = SubTaskInterface() + try: + st = subtasks.update_subtask_status(subtask_id, req.status) + return {"subtask_id": st.id, "status": st.status} + except ValueError as e: + raise HTTPException(status_code=404, detail=str(e)) diff --git a/backend/app/tasks/schema.py b/backend/app/tasks/schema.py new file mode 100644 index 00000000..66be61e7 --- /dev/null +++ b/backend/app/tasks/schema.py @@ -0,0 +1,21 @@ +from typing import Optional, Any, Dict +from uuid import UUID +from pydantic import BaseModel + + +class CreateTaskRequest(BaseModel): + task_source: str + service: Optional[str] = None + inputs: Optional[Dict[str, Any]] = None # JSON object + + +class UpdateTaskStatusRequest(BaseModel): + status: str + + +class CreateSubTaskRequest(BaseModel): + inputs: Optional[Dict[str, Any]] = None # JSON object + + +class UpdateSubTaskStatusRequest(BaseModel): + status: str From 47be3ffea37ee03f820c33fcbf0a37c974638594 Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Fri, 14 Nov 2025 14:26:17 +0000 Subject: [PATCH 09/11] added tasks interface --- backend/app/main.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/backend/app/main.py b/backend/app/main.py index 261e2f34..f0ab4d86 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -8,6 +8,7 @@ from mangum import Mangum from backend.app.portfolio import router as portfolio_router from backend.app.whlg import router as whlg_router from backend.app.plan import router as plan_router +from backend.app.tasks import router as tasks_router from backend.app.dependencies import validate_api_key from backend.app.config import get_settings @@ -57,6 +58,7 @@ async def log_requests(request: Request, call_next): app.include_router(portfolio_router.router, prefix="/v1") app.include_router(plan_router.router, prefix="/v1") app.include_router(whlg_router.router, prefix="/v1") +app.include_router(tasks_router.router, prefix="/v1") if get_settings().ENVIRONMENT == "local": from app.local import router as local_router From 68a5de28e2b5b34d82e12eb3e1c869c00db4594b Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Fri, 14 Nov 2025 15:02:33 +0000 Subject: [PATCH 10/11] added tasks so Khalim can reeview it --- backend/app/db/functions/tasks/Tasks.py | 155 ++++++++++++++++++++---- backend/app/tasks/router.py | 54 ++++++++- backend/app/tasks/schema.py | 5 + 3 files changed, 189 insertions(+), 25 deletions(-) diff --git a/backend/app/db/functions/tasks/Tasks.py b/backend/app/db/functions/tasks/Tasks.py index 18900c83..06e1c6fe 100644 --- a/backend/app/db/functions/tasks/Tasks.py +++ b/backend/app/db/functions/tasks/Tasks.py @@ -24,6 +24,9 @@ class SubTaskInterface: CRUD operations for SubTask + cascading Task progress updates. """ + # -------------------------------------------------------- + # CREATE SUBTASK + # -------------------------------------------------------- def create_subtask(self, task_id: UUID, inputs: Optional[Dict[str, Any]] = None): now = datetime.now(timezone.utc) @@ -35,18 +38,22 @@ class SubTaskInterface: subtask = SubTask( taskId=task_id, inputs=json.dumps(inputs) if inputs else None, - jobStarted=now, + status="waiting", + jobStarted=None, + jobCompleted=None, ) session.add(subtask) session.commit() session.refresh(subtask) - # Recalculate the parent task status + # Recalculate parent task progress self._update_task_progress(session, task_id) - return subtask + # -------------------------------------------------------- + # UPDATE STATUS (in progress, complete, failed) + # -------------------------------------------------------- def update_subtask_status(self, subtask_id: UUID, status: str): now = datetime.now(timezone.utc) @@ -57,12 +64,12 @@ class SubTaskInterface: normalized = status.lower() - # Start time + # When job really starts if normalized == "in progress" and subtask.jobStarted is None: subtask.jobStarted = now - # Completed time - if normalized == "complete": + # Completed or failed + if normalized in ("complete", "failed"): subtask.jobCompleted = now subtask.status = normalized @@ -71,14 +78,80 @@ class SubTaskInterface: session.add(subtask) session.commit() - # Re-evaluate the task + # Recalculate task status self._update_task_progress(session, subtask.taskId) session.refresh(subtask) return subtask # -------------------------------------------------------- - # Task Progress Calculation + # UPDATE OUTPUTS + # -------------------------------------------------------- + def update_subtask_output(self, subtask_id: UUID, outputs: Dict[str, Any]): + now = datetime.now(timezone.utc) + + with get_db_session() as session: + subtask = session.get(SubTask, subtask_id) + if not subtask: + raise ValueError(f"SubTask {subtask_id} not found") + + subtask.outputs = json.dumps(outputs) + subtask.updatedAt = now + + session.add(subtask) + session.commit() + session.refresh(subtask) + return subtask + + # -------------------------------------------------------- + # UPDATE CLOUD LOGS URL + # -------------------------------------------------------- + def update_subtask_logs(self, subtask_id: UUID, cloud_logs_url: str): + now = datetime.now(timezone.utc) + + with get_db_session() as session: + subtask = session.get(SubTask, subtask_id) + if not subtask: + raise ValueError(f"SubTask {subtask_id} not found") + + subtask.cloudLogsURL = cloud_logs_url + subtask.updatedAt = now + + session.add(subtask) + session.commit() + session.refresh(subtask) + return subtask + + # -------------------------------------------------------- + # SET BOTH OUTPUT + LOGS + # -------------------------------------------------------- + def set_subtask_result( + self, + subtask_id: UUID, + outputs: Optional[Dict[str, Any]] = None, + cloud_logs_url: Optional[str] = None, + ): + now = datetime.now(timezone.utc) + + with get_db_session() as session: + subtask = session.get(SubTask, subtask_id) + if not subtask: + raise ValueError(f"SubTask {subtask_id} not found") + + if outputs is not None: + subtask.outputs = json.dumps(outputs) + + if cloud_logs_url is not None: + subtask.cloudLogsURL = cloud_logs_url + + subtask.updatedAt = now + session.add(subtask) + session.commit() + session.refresh(subtask) + return subtask + + # -------------------------------------------------------- + # TASK PROGRESS CALCULATION # -------------------------------------------------------- def _update_task_progress(self, session: Session, task_id: UUID): task = session.get(Task, task_id) @@ -89,14 +162,9 @@ class SubTaskInterface: select(SubTask).where(SubTask.taskId == task_id) ).all() - if not subtasks: - return - - statuses = [st.status.lower() for st in subtasks] + statuses = [s.status.lower() for s in subtasks] now = datetime.now(timezone.utc) - # Priority: - # failed > in progress > complete if "failed" in statuses: task.status = "failed" task.jobCompleted = now @@ -105,16 +173,61 @@ class SubTaskInterface: task.status = "complete" task.jobCompleted = now - else: + elif "in progress" in statuses: task.status = "in progress" if task.jobStarted is None: task.jobStarted = now - task.jobCompleted = None # still running + + else: + # All waiting + task.status = "waiting" + task.jobStarted = None + task.jobCompleted = None task.updatedAt = now session.add(task) session.commit() + def finalize_subtask( + self, + subtask_id: UUID, + status: str, + outputs: Optional[Dict[str, Any]], + cloud_logs_url: Optional[str] + ): + now = datetime.now(timezone.utc) + + with get_db_session() as session: + subtask = session.get(SubTask, subtask_id) + if not subtask: + raise ValueError(f"SubTask {subtask_id} not found") + + normalized = status.lower() + if normalized not in ("complete", "failed"): + raise ValueError("Status must be 'complete' or 'failed'") + + # Set outputs + if outputs is not None: + subtask.outputs = json.dumps(outputs) + + # Set logs + if cloud_logs_url is not None: + subtask.cloudLogsURL = cloud_logs_url + + # Status + timestamps + subtask.status = normalized + subtask.jobCompleted = now + subtask.updatedAt = now + + session.add(subtask) + session.commit() + + # Update parent task (complete/failed) + self._update_task_progress(session, subtask.taskId) + + session.refresh(subtask) + return subtask + # ============================================================ # Task Interface @@ -133,23 +246,24 @@ class TasksInterface: ): now = datetime.now(timezone.utc) - # Step 1: Create the task with get_db_session() as session: task = Task( taskSource=task_source, service=service, - jobStarted=now, + status="waiting", + jobStarted=None, + jobCompleted=None, ) session.add(task) session.commit() session.refresh(task) - # Step 2: Create first subtask using SubTaskInterface + # Create first subtask in waiting state subtask_interface = SubTaskInterface() subtask = subtask_interface.create_subtask( task_id=task.id, - inputs=inputs + inputs=inputs, ) return task.id, subtask.id @@ -176,5 +290,4 @@ class TasksInterface: session.add(task) session.commit() session.refresh(task) - return task diff --git a/backend/app/tasks/router.py b/backend/app/tasks/router.py index d324f9ba..2a45a303 100644 --- a/backend/app/tasks/router.py +++ b/backend/app/tasks/router.py @@ -1,5 +1,6 @@ from fastapi import APIRouter, Depends, HTTPException from uuid import UUID +import json # ← REQUIRED for json.loads from backend.app.dependencies import validate_token from backend.app.tasks.schema import ( @@ -7,9 +8,12 @@ from backend.app.tasks.schema import ( UpdateTaskStatusRequest, CreateSubTaskRequest, UpdateSubTaskStatusRequest, + FinalizeSubTaskRequest, ) +# Correct location of interfaces from backend.app.db.functions.tasks.Tasks import TasksInterface, SubTaskInterface + from backend.app.db.connection import get_db_session from backend.app.db.models.tasks import Task, SubTask from sqlmodel import select @@ -22,6 +26,9 @@ router = APIRouter( ) +# ============================================================ +# Create Task +# ============================================================ @router.post("/", summary="Create a new task and its first subtask") async def create_task(req: CreateTaskRequest): tasks = TasksInterface() @@ -33,6 +40,9 @@ async def create_task(req: CreateTaskRequest): return {"task_id": task_id, "subtask_id": subtask_id} +# ============================================================ +# Get Task + Subtasks +# ============================================================ @router.get("/{task_id}", summary="Get a task and its subtasks") async def get_task(task_id: UUID): with get_db_session() as session: @@ -44,12 +54,13 @@ async def get_task(task_id: UUID): select(SubTask).where(SubTask.taskId == task_id) ).all() - # Deserialize JSON inputs back to dict formatted = [] for st in subtasks: formatted.append({ **st.dict(), - "inputs": json.loads(st.inputs) if st.inputs else None + "inputs": json.loads(st.inputs) if st.inputs else None, + "outputs": json.loads(st.outputs) if st.outputs else None, + "cloud_logs_url": st.cloudLogsURL, }) return { @@ -58,6 +69,9 @@ async def get_task(task_id: UUID): } +# ============================================================ +# Update Task Status +# ============================================================ @router.put("/{task_id}/status", summary="Update a task's status") async def update_task_status(task_id: UUID, req: UpdateTaskStatusRequest): tasks = TasksInterface() @@ -67,17 +81,24 @@ async def update_task_status(task_id: UUID, req: UpdateTaskStatusRequest): except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) + +# ============================================================ +# Create Additional Subtask +# ============================================================ @router.post("/{task_id}/subtasks", summary="Create a new subtask under a task") async def create_subtask(task_id: UUID, req: CreateSubTaskRequest): subtasks = SubTaskInterface() try: st = subtasks.create_subtask(task_id, req.inputs) - return {"subtask_id": st.id, "task_id": task_id} + return {"subtask_id": st.id, "task_id": task_id, "status": st.status} except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) -@router.put("/subtasks/{subtask_id}/status", summary="Update a subtask's status") +# ============================================================ +# Update Subtask Status +# ============================================================ +@router.put("/subtask/{subtask_id}/status", summary="Update a subtask's status") async def update_subtask_status(subtask_id: UUID, req: UpdateSubTaskStatusRequest): subtasks = SubTaskInterface() try: @@ -85,3 +106,28 @@ async def update_subtask_status(subtask_id: UUID, req: UpdateSubTaskStatusReques return {"subtask_id": st.id, "status": st.status} except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) + + +# === +# Sub task is complete +@router.post("/subtask/{subtask_id}/finalize", summary="Finalize a subtask with status, outputs, logs") +async def finalize_subtask(subtask_id: UUID, req: FinalizeSubTaskRequest): + subtasks = SubTaskInterface() + + try: + st = subtasks.finalize_subtask( + subtask_id=subtask_id, + status=req.status, + outputs=req.outputs, + cloud_logs_url=req.cloud_logs_url + ) + + return { + "subtask_id": st.id, + "status": st.status, + "outputs": req.outputs, + "cloud_logs_url": req.cloud_logs_url, + } + + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) diff --git a/backend/app/tasks/schema.py b/backend/app/tasks/schema.py index 66be61e7..b1a923b3 100644 --- a/backend/app/tasks/schema.py +++ b/backend/app/tasks/schema.py @@ -19,3 +19,8 @@ class CreateSubTaskRequest(BaseModel): class UpdateSubTaskStatusRequest(BaseModel): status: str + +class FinalizeSubTaskRequest(BaseModel): + status: str # "complete" or "failed" + outputs: Optional[Dict[str, Any]] = None + cloud_logs_url: Optional[str] = None \ No newline at end of file From d5b7fb21b3e405e20d05acf498d399f602a12bff Mon Sep 17 00:00:00 2001 From: Jun-te Kim Date: Fri, 14 Nov 2025 15:36:58 +0000 Subject: [PATCH 11/11] add this to dev so i can test --- backend/app/tasks/router.py | 56 +++++++++++++++++++++++++++++++++++++ backend/app/tasks/schema.py | 7 ++++- 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/backend/app/tasks/router.py b/backend/app/tasks/router.py index 2a45a303..90b62dd1 100644 --- a/backend/app/tasks/router.py +++ b/backend/app/tasks/router.py @@ -9,6 +9,7 @@ from backend.app.tasks.schema import ( CreateSubTaskRequest, UpdateSubTaskStatusRequest, FinalizeSubTaskRequest, + TaskSqsTriggerRequest ) # Correct location of interfaces @@ -131,3 +132,58 @@ async def finalize_subtask(subtask_id: UUID, req: FinalizeSubTaskRequest): except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) + + +# for testing: + +import boto3 +import json +from backend.app.tasks.schema import TaskSqsTriggerRequest +from backend.app.db.functions.tasks.Tasks import TasksInterface, SubTaskInterface +from backend.app.config import get_settings + +sqs = boto3.client("sqs") + +@router.post("/trigger", summary="Create task + subtask and publish to SQS", status_code=202) +async def trigger_task(req: TaskSqsTriggerRequest): + """ + Creates a Task + SubTask, then pushes the SubTask into SQS so a Lambda can process it. + If inputs are empty, automatically replaced with {}. + """ + + settings = get_settings() + + tasks = TasksInterface() + + # ---- Normalize empty inputs ---- + inputs = req.inputs or {} # ensures {} even if null + + # ---- 1. Create Task + SubTask ---- + task_id, subtask_id = tasks.create_task( + task_source=req.task_source, + service=req.service, + inputs=inputs, + ) + + # ---- 2. Prepare SQS payload ---- + sqs_payload = { + "subtask_id": str(subtask_id), + "params": inputs, + } + + try: + response = sqs.send_message( + QueueUrl=f"https://sqs.{settings.AWS_REGION}.amazonaws.com/" + f"{settings.AWS_ACCOUNT_ID}/lambda-example-queue", + MessageBody=json.dumps(sqs_payload) + ) + except Exception as e: + raise HTTPException(status_code=500, detail=f"SQS error: {e}") + + return { + "message": "Task triggered", + "task_id": task_id, + "subtask_id": subtask_id, + "sqs_message_id": response.get("MessageId"), + "inputs_sent": inputs, + } \ No newline at end of file diff --git a/backend/app/tasks/schema.py b/backend/app/tasks/schema.py index b1a923b3..a5b4424b 100644 --- a/backend/app/tasks/schema.py +++ b/backend/app/tasks/schema.py @@ -23,4 +23,9 @@ class UpdateSubTaskStatusRequest(BaseModel): class FinalizeSubTaskRequest(BaseModel): status: str # "complete" or "failed" outputs: Optional[Dict[str, Any]] = None - cloud_logs_url: Optional[str] = None \ No newline at end of file + cloud_logs_url: Optional[str] = None + +class TaskSqsTriggerRequest(BaseModel): + task_source: str + service: Optional[str] = None + inputs: Dict[str, Any] # forwarded into SubTask.inputs + SQS message \ No newline at end of file