diff --git a/.devcontainer/asset_list/devcontainer.json b/.devcontainer/asset_list/devcontainer.json index 4834d559..945dcd88 100644 --- a/.devcontainer/asset_list/devcontainer.json +++ b/.devcontainer/asset_list/devcontainer.json @@ -22,7 +22,9 @@ "jgclark.vscode-todo-highlight", "corentinartaud.pdfpreview", "ms-python.vscode-python-envs", - "ms-python.black-formatter" + "ms-python.black-formatter", + "GrapeCity.gc-excelviewer", + "jakobhoeg.vscode-pokemon" ], "settings": { "files.defaultWorkspace": "/workspaces/model", diff --git a/.devcontainer/backend/Dockerfile b/.devcontainer/backend/Dockerfile index 4c5d16f5..99cd66d6 100644 --- a/.devcontainer/backend/Dockerfile +++ b/.devcontainer/backend/Dockerfile @@ -43,4 +43,17 @@ WORKDIR /workspaces/model # 6) Make Python find your package # Add project root to PYTHONPATH for all processes -ENV PYTHONPATH=/workspaces/model:${PYTHONPATH} \ No newline at end of file +ENV PYTHONPATH=/workspaces/model:${PYTHONPATH} + + +# Install terraform +RUN apt-get update && sudo apt-get install -y gnupg software-properties-common +RUN wget -O- https://apt.releases.hashicorp.com/gpg | \ +gpg --dearmor | \ +sudo tee /usr/share/keyrings/hashicorp-archive-keyring.gpg > /dev/null +RUN echo "deb [signed-by=/usr/share/keyrings/hashicorp-archive-keyring.gpg] \ +https://apt.releases.hashicorp.com $(lsb_release -cs) main" | \ +tee /etc/apt/sources.list.d/hashicorp.list +RUN apt update +RUN apt-get install terraform +RUN terraform -install-autocomplete \ No newline at end of file diff --git a/.devcontainer/backend/requirements.txt b/.devcontainer/backend/requirements.txt index 9562aa6a..9814c8d4 100644 --- a/.devcontainer/backend/requirements.txt +++ b/.devcontainer/backend/requirements.txt @@ -9,7 +9,7 @@ mangum==0.19.0 # AWS boto3==1.35.44 # Data -openpyxl==3.1.2 +openpyxl==3.1.5 # Basic pytz uvicorn[standard] diff --git a/.github/workflows/_build_image.yml b/.github/workflows/_build_image.yml index 641e31f9..3435c92d 100644 --- a/.github/workflows/_build_image.yml +++ b/.github/workflows/_build_image.yml @@ -38,6 +38,8 @@ on: required: false DEV_DB_NAME: required: false + EPC_AUTH_TOKEN: + required: false jobs: build: @@ -47,6 +49,7 @@ jobs: DEV_DB_HOST: ${{ secrets.DEV_DB_HOST }} DEV_DB_PORT: ${{ secrets.DEV_DB_PORT }} DEV_DB_NAME: ${{ secrets.DEV_DB_NAME }} + EPC_AUTH_TOKEN: ${{ secrets.EPC_AUTH_TOKEN }} outputs: image_digest: ${{ steps.digest.outputs.image_digest }} @@ -87,14 +90,17 @@ jobs: temp=$(eval echo "$line") BUILD_ARGS="$BUILD_ARGS --build-arg $temp" done <<< "${{ inputs.build_args }}" - - docker build \ + + docker buildx build \ + --no-cache \ + --platform linux/amd64 \ + --provenance=false \ + --sbom=false \ + --push \ -f ${{ inputs.dockerfile_path }} \ $BUILD_ARGS \ -t $IMAGE_URI \ ${{ inputs.build_context }} - - docker push $IMAGE_URI - name: Resolve image digest id: digest diff --git a/.github/workflows/_deploy_lambda.yml b/.github/workflows/_deploy_lambda.yml index 3612ab43..528300f8 100644 --- a/.github/workflows/_deploy_lambda.yml +++ b/.github/workflows/_deploy_lambda.yml @@ -106,4 +106,10 @@ jobs: - name: Terraform Destroy if: inputs.terraform_destroy == 'true' && inputs.terraform_apply != 'true' working-directory: ${{ inputs.lambda_path }} - run: terraform destroy -auto-approve \ No newline at end of file + run: | + terraform destroy -auto-approve \ + -var="stage=${{ inputs.stage }}" \ + -var="lambda_name=${{ inputs.lambda_name }}" \ + -var="ecr_repo_url=${{ steps.repo.outputs.ecr_repo_url }}" \ + -var="image_digest=${{ inputs.image_digest }}" + diff --git a/.github/workflows/deploy_fastapi_backend.yml b/.github/workflows/deploy_fastapi_backend.yml index e4037c19..7b00d3f2 100644 --- a/.github/workflows/deploy_fastapi_backend.yml +++ b/.github/workflows/deploy_fastapi_backend.yml @@ -141,3 +141,4 @@ jobs: # Deploy to AWS Lambda via Serverless sls deploy --stage ${{ github.ref_name }} --verbose + diff --git a/.github/workflows/deploy_terraform.yml b/.github/workflows/deploy_terraform.yml index 71e2ad9d..6280abcd 100644 --- a/.github/workflows/deploy_terraform.yml +++ b/.github/workflows/deploy_terraform.yml @@ -9,6 +9,7 @@ on: - '.github/workflows/deploy_terraform.yml' - '.github/workflows/_build_image.yml' - '.github/workflows/_deploy_lambda.yml' + workflow_dispatch: jobs: determine_stage: @@ -76,10 +77,10 @@ jobs: run: terraform plan -var-file=${STAGE}.tfvars -out=tfplan - name: Terraform Apply - if: env.STAGE == 'prod' + if: env.TERRAFORM_APPLY == 'true' working-directory: infrastructure/terraform/shared run: terraform apply -auto-approve tfplan - + # ============================================================ # 2️⃣ Build Address 2 UPRN image and Push # ============================================================ @@ -90,10 +91,19 @@ jobs: ecr_repo: address2uprn-${{ needs.determine_stage.outputs.stage }} dockerfile_path: backend/address2UPRN/handler/Dockerfile build_context: . + build_args: | + DEV_DB_HOST=$DEV_DB_HOST + DEV_DB_PORT=$DEV_DB_PORT + DEV_DB_NAME=$DEV_DB_NAME + EPC_AUTH_TOKEN=$EPC_AUTH_TOKEN secrets: AWS_ACCESS_KEY_ID: ${{ secrets.DEV_AWS_ACCESS_KEY_ID }} AWS_SECRET_ACCESS_KEY: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY }} AWS_REGION: ${{ secrets.DEV_AWS_REGION }} + DEV_DB_HOST: ${{ secrets.DEV_DB_HOST }} + DEV_DB_PORT: ${{ secrets.DEV_DB_PORT }} + DEV_DB_NAME: ${{ secrets.DEV_DB_NAME }} + EPC_AUTH_TOKEN: ${{ secrets.DEV_EPC_AUTH_TOKEN }} # ============================================================ # 3️⃣ Deploy Address 2 UPRN Lambda @@ -140,7 +150,7 @@ jobs: # 3️⃣ Deploy Postcode Splitter Lambda # ============================================================ postcodeSplitter_lambda: - needs: [postcodeSplitter_image, determine_stage] + needs: [postcodeSplitter_image, determine_stage, address2uprn_lambda] uses: ./.github/workflows/_deploy_lambda.yml with: lambda_name: postcodeSplitter @@ -192,4 +202,5 @@ jobs: secrets: AWS_ACCESS_KEY_ID: ${{ secrets.DEV_AWS_ACCESS_KEY_ID }} AWS_SECRET_ACCESS_KEY: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY }} - AWS_REGION: ${{ secrets.DEV_AWS_REGION }} \ No newline at end of file + AWS_REGION: ${{ secrets.DEV_AWS_REGION }} + diff --git a/asset_list/AssetList.py b/asset_list/AssetList.py index ea4d8b34..36b3d58e 100644 --- a/asset_list/AssetList.py +++ b/asset_list/AssetList.py @@ -34,7 +34,7 @@ from etl.epc_clean.epc_attributes.WallAttributes import WallAttributes logger = setup_logger() # OpenAI API Key (set this in your environment variables for security) -OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "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") +OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY") diff --git a/asset_list/app.py b/asset_list/app.py index 13a6a025..b9c6bcf0 100644 --- a/asset_list/app.py +++ b/asset_list/app.py @@ -13,11 +13,15 @@ from asset_list.utils import get_data from dotenv import load_dotenv from backend.SearchEpc import SearchEpc -load_dotenv(dotenv_path="backend/.env") +load_dotenv(dotenv_path="../backend/.env") EPC_AUTH_TOKEN = os.getenv( "EPC_AUTH_TOKEN", ) +OPENAI_API_KEY = os.getenv( + "OPENAI_API_KEY", +) + def extract_address1( asset_list, full_address_col, postcode_col, method="first_two_words" @@ -109,21 +113,21 @@ def app(): ) data_filename = "to_standardise_uprns.xlsx" sheet_name = "Sheet1" - postcode_column = "Postcode" + postcode_column = "POSTCODE" address1_column = None address1_method = "house_number_extraction" - fulladdress_column = "Address" - address_cols_to_concat = None + fulladdress_column = "ADDRESS" + address_cols_to_concat = [] missing_postcodes_method = None landlord_year_built = None landlord_os_uprn = None - landlord_property_type = None - landlord_built_form = None - landlord_wall_construction = None - landlord_roof_construction = None - landlord_heating_system = None + landlord_property_type = "PROPERTY TYPE" + landlord_built_form = None # Skipped as empty + landlord_wall_construction = "wall combined" # combin F + G + landlord_roof_construction = "HEATING SYSTEM" # Combine I + J + landlord_heating_system = None # Check with Khalim landlord_existing_pv = None - landlord_property_id = "LLUPRN" + landlord_property_id = "UPRN" landlord_sap = None outcomes_filename = None outcomes_sheetname = None @@ -275,7 +279,7 @@ def app(): if skip is not None and not force_retrieve_data: if i <= skip: continue - chunk = asset_list.standardised_asset_list[i: i + chunk_size] + chunk = asset_list.standardised_asset_list[i : i + chunk_size] epc_data_chunk, errors_chunk, no_epc_chunk = get_data( df=chunk, row_id_name=asset_list.DOMNA_PROPERTY_ID, @@ -418,7 +422,7 @@ def app(): # Retrieve just the data we need epc_df = epc_df[ [asset_list.DOMNA_PROPERTY_ID] + list(asset_list.EPC_API_DATA_NAMES.keys()) - ].rename(columns=asset_list.EPC_API_DATA_NAMES) + ].rename(columns=asset_list.EPC_API_DATA_NAMES) # Look for columns not in the find my EPC data, which will have happened if we didn't # retrieve it in the first place @@ -435,7 +439,7 @@ def app(): find_my_epc_data[ [asset_list.DOMNA_PROPERTY_ID, "epc_has_floor_recommendation"] + list(asset_list.FIND_EPC_DATA_NAMES.keys()) - ].rename(columns=asset_list.FIND_EPC_DATA_NAMES), + ].rename(columns=asset_list.FIND_EPC_DATA_NAMES), how="left", on=asset_list.DOMNA_PROPERTY_ID, ) diff --git a/backend/.env.test b/backend/.env.test index 5b77f243..1679f10f 100644 --- a/backend/.env.test +++ b/backend/.env.test @@ -19,4 +19,4 @@ PLAN_TRIGGER_BUCKET=test DATA_BUCKET=test EPC_AUTH_TOKEN=test ENGINE_SQS_URL=test -ENERGY_ASSESSMENTS_BUCKET=test \ No newline at end of file +ENERGY_ASSESSMENTS_BUCKET=test diff --git a/backend/address2UPRN/handler/Dockerfile b/backend/address2UPRN/handler/Dockerfile index 5a09bd44..07159357 100644 --- a/backend/address2UPRN/handler/Dockerfile +++ b/backend/address2UPRN/handler/Dockerfile @@ -1,4 +1,17 @@ FROM public.ecr.aws/lambda/python:3.10 +# FROM python:3.11.10-bullseye + + +ARG DEV_DB_HOST +ARG DEV_DB_PORT +ARG DEV_DB_NAME +ARG EPC_AUTH_TOKEN + +ENV DB_HOST=${DEV_DB_HOST} +ENV DB_PORT=${DEV_DB_PORT} +ENV DB_NAME=${DEV_DB_NAME} +ENV EPC_AUTH_TOKEN=${EPC_AUTH_TOKEN} + # Set working directory (Lambda task root) WORKDIR /var/task @@ -8,13 +21,17 @@ WORKDIR /var/task # ----------------------------- COPY backend/address2UPRN/handler/requirements.txt . + # Install dependencies into Lambda runtime RUN pip install --no-cache-dir -r requirements.txt -# ----------------------------- -# Copy application code -# ----------------------------- + +# Copy necessary files for database and utility imports COPY utils/ utils/ +COPY backend/ backend/ +COPY datatypes/ datatypes/ + +# Copy the handler COPY backend/address2UPRN/main.py . # ----------------------------- diff --git a/backend/address2UPRN/handler/requirements.txt b/backend/address2UPRN/handler/requirements.txt index bc753841..6ef41b2d 100644 --- a/backend/address2UPRN/handler/requirements.txt +++ b/backend/address2UPRN/handler/requirements.txt @@ -1,3 +1,11 @@ -epc-api-python==1.0.2 +pandas==2.2.2 +numpy<2.0 +requests tqdm -pandas \ No newline at end of file +openpyxl +epc-api-python==1.0.2 +boto3==1.35.44 +sqlmodel +sqlalchemy==2.0.36 +psycopg2-binary==2.9.10 +pydantic-settings==2.6.0 \ No newline at end of file diff --git a/backend/address2UPRN/main.py b/backend/address2UPRN/main.py index ba386e0a..af29a095 100644 --- a/backend/address2UPRN/main.py +++ b/backend/address2UPRN/main.py @@ -3,12 +3,23 @@ import os from urllib.parse import urlencode import pandas as pd from difflib import SequenceMatcher -from tqdm import tqdm from utils.logger import setup_logger +import re +from typing import Set +import json +import requests +from uuid import UUID +import uuid +from backend.app.db.functions.tasks.Tasks import SubTaskInterface +from utils.s3 import ( + save_csv_to_s3, + read_csv_from_s3 as read_csv_from_s3_dict, + parse_s3_uri, +) +from datetime import datetime logger = setup_logger() -import re EPC_AUTH_TOKEN = os.getenv( "EPC_AUTH_TOKEN", @@ -17,9 +28,28 @@ EPC_AUTH_TOKEN = os.getenv( if EPC_AUTH_TOKEN is None: raise RuntimeError("EPC_AUTH_TOKEN not defined in env") -import re -from difflib import SequenceMatcher -from typing import Set + +def is_valid_postcode(postcode_clean: str) -> bool: + """ + Validate postcode using postcodes.io. + + Expects a sanitised postcode (e.g. E84SQ). + Returns True if valid, False otherwise. + """ + POSTCODES_IO_VALIDATE_URL = "https://api.postcodes.io/postcodes/{postcode}/validate" + if not postcode_clean: + return False + + try: + resp = requests.get( + POSTCODES_IO_VALIDATE_URL.format(postcode=postcode_clean), + timeout=5, + ) + resp.raise_for_status() + return resp.json().get("result", False) + except requests.RequestException: + # Network issues, rate limits, etc. + return False def levenshtein(a: str, b: str) -> float: @@ -300,27 +330,29 @@ def get_uprn_candidates( ) -def get_uprn(user_inputed_address: str, postcode: str, return_address=False): +def get_uprn_with_epc_df( + user_inputed_address: str, + epc_df: pd.DataFrame, + verbose: bool = False, +): """ - Return uprn (str) - Return False if failed to find a sensible matching epc - Return Nons when epc found but no UPRN + Return uprn (str) using a pre-fetched EPC dataframe. + This avoids calling the API multiple times for the same postcode. """ - df = get_epc_data_with_postcode(postcode=postcode) - - if df.empty: + if epc_df.empty: return None scored_df = get_uprn_candidates( - df, + epc_df, user_address=user_inputed_address, ) # Best score best_score = scored_df.iloc[0]["lexiscore"] - if best_score <= 0: - return None + # # Return None if score is below threshold + # if best_score < 0.7: + # return None # All rank-1 rows (possible draw) top_rank_df = scored_df[scored_df["lexirank"] == 1] @@ -330,18 +362,41 @@ def get_uprn(user_inputed_address: str, postcode: str, return_address=False): return None address = top_rank_df["address"].values[0] - lexiscore = float(top_rank_df["lexiscore"].values[0]) + score = float(top_rank_df["lexiscore"].values[0]) - logger.info(f"Address found to be: {address}, with lexiscore {lexiscore}") + logger.info(f"Address found to be: {address}, with lexiscore {score}") # Safe to return the agreed UPRN found_uprn = top_rank_df.iloc[0]["uprn"] if found_uprn == "": return None - if return_address: - return found_uprn, address - return found_uprn + if verbose: + return (found_uprn, address, score) + else: + return found_uprn + + +def get_uprn( + user_inputed_address: str, + postcode: str, + verbose: bool = False, +): + """ + Return uprn (str) + Return False if failed to find a sensible matching epc + Return None when epc found but no UPRN + + This function fetches EPC data via API for a single postcode. + For processing multiple addresses in the same postcode, use get_uprn_with_epc_df instead. + """ + df = get_epc_data_with_postcode(postcode=postcode) + + return get_uprn_with_epc_df( + user_inputed_address=user_inputed_address, + epc_df=df, + verbose=verbose, + ) def resolve_uprns_for_postcode_group( @@ -424,148 +479,302 @@ def resolve_uprns_for_postcode_group( ) -def test(a, b): - assert a == b, f"erorr: {a}{type(a)} != {b}: {type(b)}" +def save_results_to_s3( + results_df: pd.DataFrame, task_id: str, sub_task_id: str, bucket_name: str = None +) -> bool: + """ + Save results DataFrame to S3 as CSV. + + :param results_df: The DataFrame containing results + :param task_id: The task ID (used for file naming) + :param bucket_name: The S3 bucket name (defaults to env variable) + :return: True if successful, False otherwise + """ + if bucket_name is None: + bucket_name = os.getenv("S3_BUCKET_NAME") + + if not bucket_name: + logger.error( + "S3 bucket name not provided and S3_BUCKET_NAME environment variable not set" + ) + return False + + try: + # Create a filename with the task ID + file_name = f"{datetime.now().isoformat()}_{str(uuid.uuid4())[:8]}" + file_key = f"ara_raw_outputs/{task_id}/{sub_task_id}/{file_name}.csv" + + # Save to S3 + success = save_csv_to_s3(results_df, bucket_name, file_key) + + if success: + logger.info(f"Successfully saved results to s3://{bucket_name}/{file_key}") + return True + else: + logger.error(f"Failed to save results to S3") + return False + + except Exception as e: + logger.error(f"Error saving results to S3: {str(e)}") + return False -def run_all_test(): - # Basic usage with different post codes styles - test(get_epc_data_with_postcode("b93 8sy").shape[0], 63) - test(get_epc_data_with_postcode("B938sy").shape[0], 63) - test(get_epc_data_with_postcode("b93 8Sy").shape[0], 63) - test(get_epc_data_with_postcode("b93 8Sy").shape[0], 63) +def handler(event, context, local=False): + print("=== Address2UPRN Lambda Handler ===") + print(f"Function: {context.function_name}") + print(f"Request ID: {context.aws_request_id}") - test(get_uprn("68", "b93 8sy"), "100070989938") - test(get_uprn("68 Glendon Way", "b93 8sy"), "100070989938") - test(get_uprn("Flat A, 28, Nelgarde Road", "se6 4tf"), "100023278633") - test(get_uprn("28 A", "se6 4tf"), "100023278633") - test(get_uprn("28A", "se6 4tf"), "100023278633") - test(get_uprn("6 Aitken Close", "E8 4SQ"), False) + # Handle local testing + if local is True: + event = { + "Records": [ + { + "body": json.dumps( + { + "task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917", + "sub_task_id": "6a427b6e-1ece-4983-b1e5-9bffccc53d1d", + "s3_uri": "s3://retrofit-data-dev/ara_postcode_splitter_batches/e31f2f21-175b-4a91-a3ec-a6baa325e917/8673913b-1a88-42d7-8578-0449123d94b0/2026-02-16T12:00:20.257856_7b520c0e.csv", + } + ) + } + ] + } - # unique case - test(get_uprn("Flat 5, 1, Semley Gate", "e9 5nh"), "10008238198") - test(get_uprn("5 , 1 Semley Gate", "e9 5nh"), "10008238198") - test(get_uprn("5 Semley Gate", "e9 5nh"), "10008238198") - test(get_uprn("1, 5 Semley Gate", "e9 5nh"), False) - test( - get_uprn("1 Semley Gate", "e9 5nh"), "10008238188" - ) # this one return "flat 1, in 1 semley gate" - test( - get_uprn("48 Oswald Street", "E5 0BT"), False - ) # this one return "flat 1, in 1 semley gate" - test( - get_uprn("42 Oswald Street", "E5 0BT"), False - ) # this one return "flat 1, in 1 semley gate" - test( - get_uprn("46 Oswald Street", "E5 0BT"), False - ) # this one return "flat 1, in 1 semley gate" - get_uprn_candidates(get_epc_data_with_postcode("e5 0bt"), "48 Oswald Street") - get_uprn_candidates( - get_epc_data_with_postcode("Cr2 7dl"), - "FLAT 3; 42 MORETON ROAD, SOUTH CROYDON, SURREY", - ) + print(f"Event: {json.dumps(event, indent=2, default=str)}") + print("===================================") + # Handle both single event and batch events (SQS, etc.) + records = event.get("Records", [event]) + results = [] + errors = [] + subtask_interface = SubTaskInterface() -if __name__ == "__main__": - INPUT_FILE = "hackney.xlsx" - - ADDRESS_COL = "Address 1" - POSTCODE_COL = "Postcode" - UPRN_COL = "UPRN" - - df = pd.read_excel(INPUT_FILE) - - failures = [] - - for _, row in tqdm( - df.iterrows(), - total=len(df), - desc="Auditing UPRNs", - ): - input_address = str(row[ADDRESS_COL]).strip() - postcode = str(row[POSTCODE_COL]).strip() - - expected_uprn = None if pd.isna(row[UPRN_COL]) else str(int(row[UPRN_COL])) - + for record in records: + task_id = None + subtask_id = None try: - epc_df = get_epc_data_with_postcode(postcode) + # Parse body (inputs) + if isinstance(record.get("body"), str): + body = json.loads(record["body"]) + else: + body = record.get("body", {}) - if epc_df.empty: - failures.append( - { - **row.to_dict(), - "found_uprn": None, - "best_match_uprn": None, - "best_match_address": None, - "best_match_lexiscore": None, - "status": "no_epc_results", - } + # Validate required fields + task_id = body.get("task_id") + subtask_id = body.get("sub_task_id") + s3_uri = body.get("s3_uri") + + if not task_id: + errors.append({"error": "Missing required field: task_id"}) + continue + + if not subtask_id: + errors.append({"error": "Missing required field: sub_task_id"}) + continue + + if not s3_uri: + errors.append({"error": "Missing required field: s3_uri"}) + continue + + # Convert task_id to UUID + try: + task_id = UUID(task_id) if isinstance(task_id, str) else task_id + except ValueError as e: + errors.append({"error": f"Invalid UUID format for task_id: {str(e)}"}) + continue + + # Convert sub_task_id to UUID + try: + subtask_id = ( + UUID(subtask_id) if isinstance(subtask_id, str) else subtask_id + ) + except ValueError as e: + errors.append( + {"error": f"Invalid UUID format for sub_task_id: {str(e)}"} ) continue - scored_df = get_uprn_candidates( - epc_df, - user_address=input_address, - ) + # Update existing subtask to 'in progress' + subtask_interface.update_subtask_status(subtask_id, "in progress") + logger.info(f"Processing subtask {subtask_id} for task {task_id}") - best_row = scored_df.iloc[0] + # Parse S3 URI and read CSV from S3 + logger.info(f"Reading data from S3: {s3_uri}") + try: + bucket, key = parse_s3_uri(s3_uri) + csv_data = read_csv_from_s3_dict(bucket, key) + df = pd.DataFrame(csv_data) + logger.info(f"Loaded {len(df)} rows from S3") + except Exception as s3_error: + logger.error(f"Failed to read data from S3: {s3_error}") + errors.append( + {"error": "Failed to read data from S3", "details": str(s3_error)} + ) + try: + subtask_interface.update_subtask_status( + subtask_id, "failed", outputs={"error": str(s3_error)} + ) + except Exception as db_error: + logger.error(f"Failed to update subtask status: {db_error}") + continue - best_match_uprn = str(best_row["uprn"]) - best_match_address = best_row["address"] - best_match_lexiscore = round(float(best_row["lexiscore"]), 4) + # Process the rows + logger.info(f"Processing {len(df)} rows for task {task_id}") - found_uprn = get_uprn(input_address, postcode) + # Create user_input column by concatenating Address columns if not already present + if "user_input" not in df.columns: + df["user_input"] = ( + df["Address 1"].fillna("") + + " " + + df["Address 2"].fillna("") + + " " + + df["Address 3"].fillna("") + ).str.strip() + logger.info(f"Created user_input column from Address 1 and Address 2") + else: + logger.info(f"user_input column already present in data") + + clean_df = df.dropna(subset=["postcode_clean"]) + + postcode_to_addresses = { + postcode: group.to_dict(orient="records") + for postcode, group in clean_df.groupby("postcode_clean", sort=False) + } + + logger.info(f"Total postcodes: {len(postcode_to_addresses)}") + + # Process each postcode group + + results_data = [] + + for postcode, postcode_rows in postcode_to_addresses.items(): + logger.info( + f"Processing postcode: {postcode} with {len(postcode_rows)} rows" + ) + + # Validate postcode before processing + if not is_valid_postcode(postcode): + logger.warning(f"Postcode {postcode} is invalid, skipping") + continue + + # Fetch EPC data once per postcode + try: + epc_df = get_epc_data_with_postcode(postcode=postcode) + logger.info( + f"Fetched {len(epc_df)} EPC records for postcode {postcode}" + ) + except Exception as e: + logger.error( + f"Failed to fetch EPC data for postcode {postcode}: {e}" + ) + continue + + # Process each address in this postcode with the same EPC data + for row in postcode_rows: + try: + user_input = row.get("user_input", "") + if not user_input: + logger.warning( + f"Skipping row with missing user_input for postcode {postcode}" + ) + continue + + # Get UPRN using the pre-fetched EPC data with all return options + result = get_uprn_with_epc_df( + user_inputed_address=user_input, epc_df=epc_df, verbose=True + ) + + # Parse result tuple if successful + if result: + uprn, found_address, score = result + logger.info( + f"Found UPRN for {user_input} in {postcode}: {uprn} (score: {score})" + ) + + results_data.append( + { + **row, # Include all original data + "uprn": uprn, + "domna_found_address": found_address, + "domna_lexiscore": score, + } + ) + else: + logger.warning( + f"No UPRN found for {user_input} in {postcode}" + ) + results_data.append( + { + **row, # Include all original data + "uprn": None, + "domna_found_address": None, + "domna_lexiscore": None, + } + ) + + except Exception as e: + logger.error( + f"Error processing address {row.get('user_input', 'unknown')}: {e}" + ) + # Still add the row with error markers + results_data.append( + { + **row, + "uprn": None, + "domna_found_address": None, + "domna_lexiscore": None, + "error": str(e), + } + ) + continue + + # Create results DataFrame + result_df = pd.DataFrame(results_data) + + # Save results to S3 + try: + save_results_to_s3(result_df, str(task_id), str(subtask_id)) + except Exception as s3_error: + logger.error(f"Failed to save results to S3: {s3_error}") + + # Mark subtask as completed + try: + subtask_interface.update_subtask_status( + subtask_id, + "completed", + outputs={"rows_processed": "todo -> show sensible output"}, + ) + logger.info(f"Marked subtask {subtask_id} as completed") + except Exception as db_error: + logger.error(f"Failed to mark subtask as completed: {db_error}") except Exception as e: - failures.append( - { - **row.to_dict(), - "found_uprn": None, - "best_match_uprn": None, - "best_match_address": None, - "best_match_lexiscore": None, - "status": "exception", - "error": str(e), - } - ) - continue + logger.error(f"Unexpected error processing record: {e}", exc_info=True) + errors.append({"error": "Unexpected error", "details": str(e)}) + # Mark subtask as failed if we have one + if subtask_id: + try: + subtask_interface.update_subtask_status( + subtask_id, "failed", outputs={"error": str(e)} + ) + except Exception as db_error: + logger.error(f"Failed to update subtask status: {db_error}") - found_uprn_norm = None if not found_uprn else str(found_uprn) + # Return error if all records failed + logger.info(results_data) + logger.info(results) + if errors and not results: + return {"statusCode": 500, "body": json.dumps({"errors": errors})} - if found_uprn_norm != expected_uprn: - failures.append( - { - **row.to_dict(), - "found_uprn": found_uprn_norm, - "best_match_uprn": best_match_uprn, - "best_match_address": best_match_address, - "best_match_lexiscore": best_match_lexiscore, - "status": ("no_match" if found_uprn_norm is None else "mismatch"), - } - ) - - failures_df = pd.DataFrame(failures) - - print("===================================") - print(f"Total rows : {len(df)}") - print(f"Failures : {len(failures_df)}") - print("===================================") - - failures_df.to_excel( - "hackney_uprn_failures.xlsx", - index=False, - ) + return { + "statusCode": 200, + "body": json.dumps( + {"processed": results, "errors": errors if errors else None} + ), + } -def handler(event, context): - print("hello world") - return {"statusCode": 200, "body": "hello world"} - - -# TO do function dispatcher, - -# get_uprn_candidates(get_epc_data_with_postcode("E9 5NH"),"Flat 1, 5 Semley Gate" and Flat 5, 1 Semley Gate) -# fix that -# Look again at flat 1 -# pandas reader the seperate postcode_splitter -# dump into s3 +# TODO: +# Don't add results to return messages as its too verbose +# capture the exepection as e, into s3, to find the logs go to s3 +# Upload results to s3 as well as csv diff --git a/backend/address2UPRN/script.py b/backend/address2UPRN/script.py index a71b5827..090ac5ae 100644 --- a/backend/address2UPRN/script.py +++ b/backend/address2UPRN/script.py @@ -1,3 +1,5 @@ +# one time script for a customer forhousing + import pandas as pd from tqdm import tqdm from backend.address2UPRN.main import get_uprn @@ -5,20 +7,35 @@ from backend.address2UPRN.main import get_uprn # Enable tqdm for pandas tqdm.pandas() -df = pd.read_excel("address2.xlsx") +file_name = "forhousing.xlsx" + +df = pd.read_excel(file_name) def extract_uprn(row): - print(row["User Input"], row["Postcode"]) - result = get_uprn(row["User Input"], row["Postcode"], return_address=True) + user_input = "Address" + postcode = "Postcode" + result = get_uprn( + row[user_input], + row[postcode], + return_address=True, + return_EPC=True, + return_score=True, + ) if result is None: - return pd.Series([None, None]) + return pd.Series([None, None, None, None]) - uprn, found_address = result - return pd.Series([uprn, found_address]) + uprn, found_address, epc, score = result + return pd.Series([uprn, found_address, epc, score]) -df[["juntes uprn", "junte found address"]] = df.progress_apply(extract_uprn, axis=1) +df[["juntes uprn", "junte found address", "junte found epc", "junte score"]] = ( + df.progress_apply(extract_uprn, axis=1) +) -df.to_excel("outputs2.xlsx", index=False) +df.to_excel(f"{file_name}_outputs.xlsx", index=False) + +# TODO: add lexiscore +# TODO: run it +# TODO: give it to danny diff --git a/backend/app/config.py b/backend/app/config.py index 41552ae5..feb312b4 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -18,37 +18,37 @@ def resolve_env_file() -> Optional[str]: class Settings(BaseSettings): - API_KEY: str + API_KEY: str = "changeme" API_KEY_NAME: str = "X-API-KEY" - SECRET_KEY: str - ENVIRONMENT: str - DATA_BUCKET: str + SECRET_KEY: str = "changeme" + ENVIRONMENT: str = "changeme" + DATA_BUCKET: str = "changeme" PLAN_TRIGGER_BUCKET: str - ENGINE_SQS_URL: str + ENGINE_SQS_URL: str = "changeme" # Third parties - EPC_AUTH_TOKEN: str - GOOGLE_SOLAR_API_KEY: str + EPC_AUTH_TOKEN: str = "changeme" + GOOGLE_SOLAR_API_KEY: str = "changeme" # Database settings - DB_HOST: str - DB_PASSWORD: str - DB_USERNAME: str - DB_PORT: str - DB_NAME: str + DB_HOST: str = "changeme" + DB_PASSWORD: str = "changeme" + DB_USERNAME: str = "changeme" + DB_PORT: str = "changeme" + DB_NAME: str = "changeme" # Prediction buckets - SAP_PREDICTIONS_BUCKET: str - CARBON_PREDICTIONS_BUCKET: str - HEAT_PREDICTIONS_BUCKET: str + SAP_PREDICTIONS_BUCKET: str = "changeme" + CARBON_PREDICTIONS_BUCKET: str = "changeme" + HEAT_PREDICTIONS_BUCKET: str = "changeme" # LIGHTING_COST_PREDICTIONS_BUCKET: str # HEATING_COST_PREDICTIONS_BUCKET: str # HOT_WATER_COST_PREDICTIONS_BUCKET: str - HEATING_KWH_PREDICTIONS_BUCKET: str - HOTWATER_KWH_PREDICTIONS_BUCKET: str + HEATING_KWH_PREDICTIONS_BUCKET: str = "changeme" + HOTWATER_KWH_PREDICTIONS_BUCKET: str = "changeme" # Other S3 buckts - ENERGY_ASSESSMENTS_BUCKET: str + ENERGY_ASSESSMENTS_BUCKET: str = "changeme" # Optional AWS creds (only required in local) AWS_ACCESS_KEY_ID: Optional[str] = None diff --git a/backend/app/db/functions/tasks/__init__.py b/backend/app/db/functions/tasks/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/backend/app/requirements/requirements.txt b/backend/app/requirements/requirements.txt index 3124034e..9fdbfe4c 100644 --- a/backend/app/requirements/requirements.txt +++ b/backend/app/requirements/requirements.txt @@ -10,7 +10,7 @@ mangum==0.19.0 # AWS boto3==1.35.44 # Data -openpyxl==3.1.2 +openpyxl==3.1.5 # Basic pytz sqlmodel \ No newline at end of file diff --git a/backend/condition/condition_trigger_request.py b/backend/condition/condition_trigger_request.py index 03bd6ad1..daa82949 100644 --- a/backend/condition/condition_trigger_request.py +++ b/backend/condition/condition_trigger_request.py @@ -29,5 +29,5 @@ class ConditionTriggerRequest(BaseModel): # { # "file_type": "LBWF", # "trigger_file_bucket": "condition-data-dev", -# "trigger_file_key": "input/lbwf/LBWF - Example Asset Data September 2025.xlsx", +# "trigger_file_key": "input/lbwf/LBWF - Example Asset Data September 2025.xlsx" # } diff --git a/backend/postcode_splitter/handler/Dockerfile b/backend/postcode_splitter/handler/Dockerfile index 7c1a7989..74c00b9f 100644 --- a/backend/postcode_splitter/handler/Dockerfile +++ b/backend/postcode_splitter/handler/Dockerfile @@ -1,9 +1,28 @@ -FROM public.ecr.aws/lambda/python:3.10 +FROM public.ecr.aws/lambda/python:3.11 + +ARG DEV_DB_HOST +ARG DEV_DB_PORT +ARG DEV_DB_NAME + +ENV DB_HOST=${DEV_DB_HOST} +ENV DB_PORT=${DEV_DB_PORT} +ENV DB_NAME=${DEV_DB_NAME} # Set working directory (Lambda task root) WORKDIR /var/task -# ----------------------------- +COPY backend/postcode_splitter/handler/requirements.txt . + +RUN pip install --no-cache-dir -r requirements.txt + +# Copy necessary files for database and utility imports +COPY utils/ utils/ +COPY backend/ backend/ +COPY datatypes/ datatypes/ + +# Copy the handler +COPY backend/postcode_splitter/main.py . + # Lambda handler -# ----------------------------- CMD ["main.handler"] + diff --git a/backend/postcode_splitter/handler/requirements.txt b/backend/postcode_splitter/handler/requirements.txt index e69de29b..6ef41b2d 100644 --- a/backend/postcode_splitter/handler/requirements.txt +++ b/backend/postcode_splitter/handler/requirements.txt @@ -0,0 +1,11 @@ +pandas==2.2.2 +numpy<2.0 +requests +tqdm +openpyxl +epc-api-python==1.0.2 +boto3==1.35.44 +sqlmodel +sqlalchemy==2.0.36 +psycopg2-binary==2.9.10 +pydantic-settings==2.6.0 \ No newline at end of file diff --git a/backend/postcode_splitter/main.py b/backend/postcode_splitter/main.py index d55f618a..4f63ed4b 100644 --- a/backend/postcode_splitter/main.py +++ b/backend/postcode_splitter/main.py @@ -1,127 +1,278 @@ +import os +import sys +import json import pandas as pd import requests -from backend.address2UPRN.main import ( - resolve_uprns_for_postcode_group, - get_epc_data_with_postcode, +import boto3 +from uuid import UUID, uuid4 +from utils.s3 import ( + read_csv_from_s3 as read_csv_from_s3_dict, + save_csv_to_s3, + parse_s3_uri, ) +from utils.logger import setup_logger from tqdm import tqdm +from backend.app.db.functions.tasks.Tasks import SubTaskInterface +from datetime import datetime + +logger = setup_logger() -def sanitise_postcode(postcode: str) -> str | None: +def upload_batch_to_s3( + batch_df: pd.DataFrame, task_id: str, sub_task_id: str, bucket_name: str = None +) -> str: """ - Normalise postcode for grouping. - - - Uppercase - - Remove all whitespace + Upload batch DataFrame to S3 as CSV. """ - if pd.isna(postcode): - return None + if bucket_name is None: + bucket_name = os.getenv("S3_BUCKET_NAME") - return postcode.upper().replace(" ", "") - - -def is_valid_postcode(postcode_clean: str) -> bool: - """ - Validate postcode using postcodes.io. - - Expects a sanitised postcode (e.g. E84SQ). - Returns True if valid, False otherwise. - """ - POSTCODES_IO_VALIDATE_URL = "https://api.postcodes.io/postcodes/{postcode}/validate" - if not postcode_clean: - return False + if not bucket_name: + logger.error( + "S3 bucket name not provided and S3_BUCKET_NAME environment variable not set" + ) + raise ValueError("S3_BUCKET_NAME not configured") try: - resp = requests.get( - POSTCODES_IO_VALIDATE_URL.format(postcode=postcode_clean), - timeout=5, + file_name = f"{datetime.now().isoformat()}_{str(uuid4())[:8]}" + file_key = ( + f"ara_postcode_splitter_batches/{task_id}/{sub_task_id}/{file_name}.csv" ) - resp.raise_for_status() - return resp.json().get("result", False) - except requests.RequestException: - # Network issues, rate limits, etc. - return False + + success = save_csv_to_s3(batch_df, bucket_name, file_key) + + if success: + s3_uri = f"s3://{bucket_name}/{file_key}" + logger.info(f"Successfully uploaded batch to {s3_uri}") + return s3_uri + else: + logger.error(f"Failed to upload batch to S3") + raise ValueError("Failed to save CSV to S3") + + except Exception as e: + logger.error(f"Error uploading batch to S3: {str(e)}") + raise -def main(): - df = pd.read_excel("hackney.xlsx", sheet_name="Sustainability") - df = df.head(500) +def send_to_address2uprn_queue(task_id: str, sub_task_id: str, s3_uri: str) -> str: + """ + Send a batch to the address2UPRN SQS queue with S3 reference. - # Sanitise postcodes - df["postcode_clean"] = df["Postcode"].apply(sanitise_postcode) + Args: + task_id: The parent task ID + sub_task_id: The new subtask ID for this batch + s3_uri: S3 URI pointing to the batch CSV file - # --- validate AFTER grouping (save API calls) --- + Returns: + Message ID from SQS + """ + sqs_client = boto3.client("sqs") + queue_url = os.getenv("ADDRESS2UPRN_QUEUE_URL") - # Get unique, non-null postcodes - unique_postcodes = df["postcode_clean"].dropna().unique() + if not queue_url: + raise ValueError("ADDRESS2UPRN_QUEUE_URL environment variable not set") - # Validate each postcode once, TODOadd a progress bar - postcode_validity = { - pc: is_valid_postcode(pc) - for pc in tqdm(unique_postcodes, total=len(unique_postcodes)) + message_body = { + "task_id": task_id, + "sub_task_id": sub_task_id, + "s3_uri": s3_uri, } - # Map validity back onto dataframe - df["postcode_valid"] = df["postcode_clean"].map(postcode_validity) + response = sqs_client.send_message( + QueueUrl=queue_url, + MessageBody=json.dumps(message_body), + ) + logger.info( + f"Sent message to address2UPRN queue. " + f"Task: {task_id}, SubTask: {sub_task_id}, MessageId: {response['MessageId']}" + ) + + return response["MessageId"] + + +def create_batch_and_send_to_address2uprn( + batch_df: pd.DataFrame, + task_id: str, + sub_task_id: str, + subtask_interface: SubTaskInterface, + bucket_name: str, +) -> str: + """ + Create a batch DataFrame, upload to S3, create subtask, and send to address2UPRN queue. + + """ + # Upload batch to S3 + + s3_uri = upload_batch_to_s3(batch_df, str(task_id), str(sub_task_id), bucket_name) + + # Create a new subtask for this batch with all inputs + created_batch_sub_task_id = subtask_interface.create_subtask( + task_id=task_id, + inputs={ + "task_id": str(task_id), + "s3_uri": s3_uri, + }, + ) + + logger.info(f"Created batch subtask {created_batch_sub_task_id}") + + # Send message with S3 reference + send_to_address2uprn_queue( + task_id=str(task_id), + sub_task_id=str(created_batch_sub_task_id), + s3_uri=s3_uri, + ) + + return created_batch_sub_task_id + + +def handler(event, context, local=False): + print(f"Function: {context.function_name}") + print(f"Request ID: {context.aws_request_id}") + + # Example SQS message for testing (copy and paste into SQS): + if local is True: + event = { + "Records": [ + { + "body": json.dumps( + { + "task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917", + "sub_task_id": "8673913b-1a88-42d7-8578-0449123d94b0", + "s3_uri": "s3://retrofit-data-dev/ara_raw_inputs/peabody/2025_11_11 - Peabody - Data Extracts for Domna_transformed.csv", + } + ) + } + ] + } + # Handle both single event and batch events (SQS, etc.) + records = event.get("Records", [event]) results = [] + errors = [] + subtask_interface = SubTaskInterface() + bucket_name = os.getenv("S3_BUCKET_NAME") + if local: + bucket_name = "retrofit-data-dev" - for postcode, group_df in tqdm( - df[df["postcode_valid"]].groupby("postcode_clean"), - desc="Resolving UPRNs by postcode", - ): - try: - epc_df = get_epc_data_with_postcode(postcode) + for record in records: + if local: + record = records[0] + task_id = None + subtask_id = None + # Parse body (inputs) - if epc_df.empty: - tmp = group_df.copy() - tmp["found_uprn"] = None - tmp["status"] = "no_epc_results" - results.append(tmp) - continue + if isinstance(record.get("body"), str): + body = json.loads(record["body"]) + else: + body = record.get("body", {}) - resolved = resolve_uprns_for_postcode_group( - group_df=group_df, - epc_df=epc_df, + # Validate required fields + task_id = body.get("task_id") + subtask_id = body.get("sub_task_id") + s3_uri = body.get("s3_uri") + + # Convert task_id to UUID + task_id = UUID(task_id) if isinstance(task_id, str) else task_id + subtask_id = UUID(subtask_id) if isinstance(subtask_id, str) else subtask_id + + # Mark subtask as in progress + subtask_interface.update_subtask_status(subtask_id, "in progress") + logger.info(f"Marked subtask {subtask_id} as in progress") + + # Read CSV from S3 + bucket, key = parse_s3_uri(s3_uri) + logger.info(f"S3 Bucket: {bucket}, Key: {key}") + + csv_data = read_csv_from_s3_dict(bucket, key) + df = pd.DataFrame(csv_data) + + logger.info(f"CSV loaded: {len(df)} rows, {len(df.columns)} columns") + + # Sanitise postcodes + df["postcode_clean"] = df["postcode"].str.upper().str.replace(" ", "") + + df = df.dropna(subset=["postcode_clean"]) + + batch_size = 500 + if df.shape[0] < batch_size: + create_batch_and_send_to_address2uprn( + batch_df=df, + task_id=task_id, + sub_task_id=subtask_id, + subtask_interface=subtask_interface, + bucket_name=bucket_name, ) + else: + postcode_to_addresses = { + postcode: group + for postcode, group in df.groupby("postcode_clean", sort=False) + } - results.append(resolved) + count = 0 + buffer = [] - except Exception as e: - tmp = group_df.copy() - tmp["found_uprn"] = None - tmp["status"] = "exception" - tmp["error"] = str(e) - results.append(tmp) + for postcode, group_df in postcode_to_addresses.items(): + group_len = len(group_df) - final_df = pd.concat(results, ignore_index=True) - a = final_df[ - [ - "best_match_lexiscore", - "Address 1", - "best_match_address", - "Postcode", - "UPRN", - "best_match_uprn", - ] - ] # add levi score to viewing - b = final_df[final_df["best_match_lexiscore"] > 0] # add levi score to viewing - b = b[ - [ - "best_match_lexiscore", - "Address 1", - "best_match_address", - "Postcode", - "UPRN", - "best_match_uprn", - ] - ] + # If single postcode is bigger than batch_size → send directly + if group_len >= batch_size: + if buffer: + create_batch_and_send_to_address2uprn( + batch_df=pd.concat(buffer, ignore_index=True), + task_id=task_id, + sub_task_id=subtask_id, + subtask_interface=subtask_interface, + bucket_name=bucket_name, + ) + buffer = [] + count = 0 + create_batch_and_send_to_address2uprn( + batch_df=group_df, + task_id=task_id, + sub_task_id=subtask_id, + subtask_interface=subtask_interface, + bucket_name=bucket_name, + ) + continue -def handler(event, context): - print("hello Postcode splitter world") - return {"statusCode": 200, "body": "hello world"} + # If adding would exceed batch → flush first + if count + group_len > batch_size: + create_batch_and_send_to_address2uprn( + batch_df=pd.concat(buffer, ignore_index=True), + task_id=task_id, + sub_task_id=subtask_id, + subtask_interface=subtask_interface, + bucket_name=bucket_name, + ) + buffer = [] + count = 0 + # Add group + buffer.append(group_df) + count += group_len -if __name__ == "__main__": - main() + # Final flush + if buffer: + create_batch_and_send_to_address2uprn( + batch_df=pd.concat(buffer, ignore_index=True), + task_id=task_id, + sub_task_id=subtask_id, + subtask_interface=subtask_interface, + bucket_name=bucket_name, + ) + + # Mark subtask as completed + subtask_interface.update_subtask_status( + subtask_id, + "completed", + outputs={"rows_processed": "completed"}, + ) + + return { + "statusCode": 200, + "body": json.dumps( + {"processed": results, "errors": errors if errors else None} + ), + } diff --git a/infrastructure/terraform/lambda/_template/main.tf b/infrastructure/terraform/lambda/_template/main.tf index 3010aa8a..7f60d684 100644 --- a/infrastructure/terraform/lambda/_template/main.tf +++ b/infrastructure/terraform/lambda/_template/main.tf @@ -1,3 +1,30 @@ +# ============================================================================== +# TEMPLATE: Lambda Configuration with Optional S3 IAM Policy +# ============================================================================== +# Instructions: +# 1. Replace "REPLACE ME" with your lambda name (e.g., "my-lambda-name") +# 2. Add any additional environment variables as needed +# 3. To attach S3 IAM policies from shared state: +# - Uncomment the S3 policy attachment section below +# - Update the policy_arn to match the output from shared/main.tf +# - Available shared outputs (examples): +# - data.terraform_remote_state.shared.outputs.condition_etl_s3_read_arn +# - data.terraform_remote_state.shared.outputs.postcode_splitter_s3_read_arn +# 4. To create a NEW S3 policy: +# - Add a new module "lambda_s3_policy" in shared/main.tf using the +# s3_iam_policy module (see examples in shared/main.tf) +# - Then reference it here using data.terraform_remote_state.shared.outputs +# ============================================================================== + +data "terraform_remote_state" "shared" { + backend = "s3" + config = { + bucket = "assessment-model-terraform-state" + key = "env:/${var.stage}/terraform.tfstate" + region = "eu-west-2" + } +} + module "lambda" { source = "../modules/lambda_with_sqs" @@ -12,3 +39,25 @@ module "lambda" { LOG_LEVEL = "info" } } + +# ====================================================================== +# OPTIONAL: Attach S3 IAM policy to Lambda execution role +# ====================================================================== +# Uncomment and configure the resource below to attach S3 permissions +# +# Example 1: Attach existing policy from shared state +# resource "aws_iam_role_policy_attachment" "lambda_s3_policy" { +# role = module.lambda.role_name +# policy_arn = data.terraform_remote_state.shared.outputs.YOUR_POLICY_OUTPUT_NAME_arn +# } +# +# Example 2: Attach multiple policies +# resource "aws_iam_role_policy_attachment" "lambda_read_policy" { +# role = module.lambda.role_name +# policy_arn = data.terraform_remote_state.shared.outputs.postcode_splitter_s3_read_arn +# } +# +# resource "aws_iam_role_policy_attachment" "lambda_write_policy" { +# role = module.lambda.role_name +# policy_arn = data.terraform_remote_state.shared.outputs.another_policy_arn +# } diff --git a/infrastructure/terraform/lambda/address2UPRN/main.tf b/infrastructure/terraform/lambda/address2UPRN/main.tf index 46b193f2..5a36153e 100644 --- a/infrastructure/terraform/lambda/address2UPRN/main.tf +++ b/infrastructure/terraform/lambda/address2UPRN/main.tf @@ -1,3 +1,19 @@ +data "terraform_remote_state" "shared" { + backend = "s3" + config = { + bucket = "assessment-model-terraform-state" + key = "env:/${var.stage}/terraform.tfstate" + region = "eu-west-2" + } +} +data "aws_secretsmanager_secret_version" "db_credentials" { + secret_id = "${var.stage}/assessment_model/db_credentials" +} + +locals { + db_credentials = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string) +} + module "address2uprn" { source = "../modules/lambda_with_sqs" @@ -6,9 +22,32 @@ module "address2uprn" { image_uri = local.image_uri - - environment = { - STAGE = var.stage - LOG_LEVEL = "info" - } + environment = merge( + { + STAGE = var.stage + LOG_LEVEL = "info" + DB_USERNAME = local.db_credentials.db_assessment_model_username + DB_PASSWORD = local.db_credentials.db_assessment_model_password + GOOGLE_SOLAR_API_KEY = "test" + SAP_PREDICTIONS_BUCKET = "test" + CARBON_PREDICTIONS_BUCKET = "test" + HEAT_PREDICTIONS_BUCKET = "test" + HEATING_KWH_PREDICTIONS_BUCKET = "test" + HOTWATER_KWH_PREDICTIONS_BUCKET = "test" + API_KEY = "test" + ENVIRONMENT = "test" + SECRET_KEY = "test" + PLAN_TRIGGER_BUCKET = "test" + DATA_BUCKET = "test" + ENGINE_SQS_URL = "test" + ENERGY_ASSESSMENTS_BUCKET = "test" + S3_BUCKET_NAME = data.terraform_remote_state.shared.outputs.retrofit_sap_data_bucket_name + }, + ) } + +# Attach S3 read policy to the Lambda execution role +resource "aws_iam_role_policy_attachment" "address2uprn_read_and_write" { + role = module.address2uprn.role_name + policy_arn = data.terraform_remote_state.shared.outputs.address_2_uprn_s3_read_and_write_arn +} \ No newline at end of file diff --git a/infrastructure/terraform/lambda/address2UPRN/outputs.tf b/infrastructure/terraform/lambda/address2UPRN/outputs.tf new file mode 100644 index 00000000..e4645a0a --- /dev/null +++ b/infrastructure/terraform/lambda/address2UPRN/outputs.tf @@ -0,0 +1,14 @@ +output "address2uprn_queue_url" { + value = module.address2uprn.queue_url + description = "URL of the address2UPRN SQS queue" +} + +output "address2uprn_queue_arn" { + value = module.address2uprn.queue_arn + description = "ARN of the address2UPRN SQS queue" +} + +output "address2uprn_lambda_arn" { + value = module.address2uprn.lambda_arn + description = "ARN of the address2UPRN Lambda function" +} diff --git a/infrastructure/terraform/lambda/condition-etl/main.tf b/infrastructure/terraform/lambda/condition-etl/main.tf index 4219f209..0128f975 100644 --- a/infrastructure/terraform/lambda/condition-etl/main.tf +++ b/infrastructure/terraform/lambda/condition-etl/main.tf @@ -23,7 +23,6 @@ module "lambda" { stage = var.stage image_uri = local.image_uri - timeout = 180 environment = merge( diff --git a/infrastructure/terraform/lambda/modules/lambda_with_sqs/outputs.tf b/infrastructure/terraform/lambda/modules/lambda_with_sqs/outputs.tf index afc9246d..b408593f 100644 --- a/infrastructure/terraform/lambda/modules/lambda_with_sqs/outputs.tf +++ b/infrastructure/terraform/lambda/modules/lambda_with_sqs/outputs.tf @@ -9,3 +9,4 @@ output "queue_arn" { output "queue_url" { value = module.queue.queue_url } + diff --git a/infrastructure/terraform/lambda/postcodeSplitter/main.tf b/infrastructure/terraform/lambda/postcodeSplitter/main.tf index ebbdbfdc..d37a01c9 100644 --- a/infrastructure/terraform/lambda/postcodeSplitter/main.tf +++ b/infrastructure/terraform/lambda/postcodeSplitter/main.tf @@ -1,3 +1,30 @@ +data "terraform_remote_state" "shared" { + backend = "s3" + config = { + bucket = "assessment-model-terraform-state" + key = "env:/${var.stage}/terraform.tfstate" + region = "eu-west-2" + } +} +data "aws_secretsmanager_secret_version" "db_credentials" { + secret_id = "${var.stage}/assessment_model/db_credentials" +} + + +locals { + db_credentials = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string) +} + +# Reference the existing address2UPRN Lambda outputs from address2uprn state +data "terraform_remote_state" "address2uprn" { + backend = "s3" + config = { + bucket = "address2uprn-terraform-state" + key = "env:/${var.stage}/terraform.tfstate" + region = "eu-west-2" + } +} + module "lambda" { source = "../modules/lambda_with_sqs" @@ -7,8 +34,56 @@ module "lambda" { image_uri = local.image_uri - environment = { - STAGE = var.stage - LOG_LEVEL = "info" - } + environment = merge( + { + STAGE = var.stage + LOG_LEVEL = "info" + DB_USERNAME = local.db_credentials.db_assessment_model_username + DB_PASSWORD = local.db_credentials.db_assessment_model_password + GOOGLE_SOLAR_API_KEY = "test" + SAP_PREDICTIONS_BUCKET = "test" + CARBON_PREDICTIONS_BUCKET = "test" + HEAT_PREDICTIONS_BUCKET = "test" + HEATING_KWH_PREDICTIONS_BUCKET = "test" + HOTWATER_KWH_PREDICTIONS_BUCKET = "test" + API_KEY = "test" + ENVIRONMENT = "test" + SECRET_KEY = "test" + PLAN_TRIGGER_BUCKET = "test" + DATA_BUCKET = "test" + EPC_AUTH_TOKEN = "test" + ENGINE_SQS_URL = "test" + ENERGY_ASSESSMENTS_BUCKET = "test" + ADDRESS2UPRN_QUEUE_URL = data.terraform_remote_state.address2uprn.outputs.address2uprn_queue_url + S3_BUCKET_NAME = data.terraform_remote_state.shared.outputs.retrofit_sap_data_bucket_name + }, + ) } + +# Attach S3 read policy to the Lambda execution role +resource "aws_iam_role_policy_attachment" "postcode_splitter_s3_read" { + role = module.lambda.role_name + policy_arn = data.terraform_remote_state.shared.outputs.postcode_splitter_s3_read_arn +} + +# Create SQS send policy for address2UPRN queue +module "postcode_splitter_sqs_policy" { + source = "../../modules/general_iam_policy" + + policy_name = "postcode-splitter-sqs-send-${var.stage}" + policy_description = "Allow postcode-splitter Lambda to send messages to address2UPRN queue" + + actions = [ + "sqs:SendMessage" + ] + + resources = [ + data.terraform_remote_state.address2uprn.outputs.address2uprn_queue_arn + ] +} + +# Attach SQS policy to the Lambda execution role +resource "aws_iam_role_policy_attachment" "postcode_splitter_sqs_send" { + role = module.lambda.role_name + policy_arn = module.postcode_splitter_sqs_policy.policy_arn +} \ No newline at end of file diff --git a/infrastructure/terraform/lambda/postcodeSplitter/variables.tf b/infrastructure/terraform/lambda/postcodeSplitter/variables.tf index 9ce45fa5..7bd68543 100644 --- a/infrastructure/terraform/lambda/postcodeSplitter/variables.tf +++ b/infrastructure/terraform/lambda/postcodeSplitter/variables.tf @@ -24,3 +24,12 @@ locals { output "resolved_image_uri" { value = local.image_uri } + + + + + + + + + diff --git a/infrastructure/terraform/modules/general_iam_policy/main.tf b/infrastructure/terraform/modules/general_iam_policy/main.tf new file mode 100644 index 00000000..f7ffe4a1 --- /dev/null +++ b/infrastructure/terraform/modules/general_iam_policy/main.tf @@ -0,0 +1,21 @@ +# IAM Policy with dynamic actions and resources +resource "aws_iam_policy" "policy" { + name = var.policy_name + description = var.policy_description + + policy = jsonencode({ + Version = "2012-10-17" + Statement = [ + merge( + { + Effect = "Allow" + Action = var.actions + Resource = var.resources + }, + var.conditions != null ? { Condition = var.conditions } : {} + ) + ] + }) + + tags = var.tags +} diff --git a/infrastructure/terraform/modules/general_iam_policy/outputs.tf b/infrastructure/terraform/modules/general_iam_policy/outputs.tf new file mode 100644 index 00000000..cfceab05 --- /dev/null +++ b/infrastructure/terraform/modules/general_iam_policy/outputs.tf @@ -0,0 +1,9 @@ +output "policy_arn" { + value = aws_iam_policy.policy.arn + description = "ARN of the created IAM policy" +} + +output "policy_name" { + value = aws_iam_policy.policy.name + description = "Name of the created IAM policy" +} diff --git a/infrastructure/terraform/modules/general_iam_policy/variables.tf b/infrastructure/terraform/modules/general_iam_policy/variables.tf new file mode 100644 index 00000000..0d824eb5 --- /dev/null +++ b/infrastructure/terraform/modules/general_iam_policy/variables.tf @@ -0,0 +1,32 @@ +variable "policy_name" { + description = "Name of the IAM policy" + type = string +} + +variable "policy_description" { + description = "Description of the IAM policy" + type = string + default = "" +} + +variable "actions" { + description = "List of IAM actions allowed by this policy" + type = list(string) +} + +variable "resources" { + description = "List of AWS resources this policy applies to" + type = list(string) +} + +variable "conditions" { + description = "Optional IAM policy conditions" + type = any + default = null +} + +variable "tags" { + description = "Tags to apply to the policy" + type = map(string) + default = {} +} diff --git a/infrastructure/terraform/modules/lambda_execution_role/main.tf b/infrastructure/terraform/modules/lambda_execution_role/main.tf index fa657afd..e593b17c 100644 --- a/infrastructure/terraform/modules/lambda_execution_role/main.tf +++ b/infrastructure/terraform/modules/lambda_execution_role/main.tf @@ -19,19 +19,3 @@ resource "aws_iam_role_policy_attachment" "basic_logs" { policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" } -resource "aws_iam_role_policy" "ecr_pull" { - role = aws_iam_role.this.name - - policy = jsonencode({ - Version = "2012-10-17" - Statement = [{ - Effect = "Allow" - Action = [ - "ecr:GetAuthorizationToken", - "ecr:BatchGetImage", - "ecr:GetDownloadUrlForLayer" - ] - Resource = "*" - }] - }) -} diff --git a/infrastructure/terraform/modules/s3_iam_policy/main.tf b/infrastructure/terraform/modules/s3_iam_policy/main.tf new file mode 100644 index 00000000..397bd963 --- /dev/null +++ b/infrastructure/terraform/modules/s3_iam_policy/main.tf @@ -0,0 +1,31 @@ +# Dynamically build S3 resources list from bucket ARNs and resource paths +locals { + # Generate full resource ARNs by combining bucket ARNs with resource paths + resources = flatten([ + for bucket_arn in var.bucket_arns : [ + for path in var.resource_paths : "${bucket_arn}${path}" + ] + ]) +} + +# IAM Policy with dynamic actions and resources +resource "aws_iam_policy" "s3_policy" { + name = var.policy_name + description = var.policy_description + + policy = jsonencode({ + Version = "2012-10-17" + Statement = [ + merge( + { + Effect = "Allow" + Action = var.actions + Resource = local.resources + }, + var.conditions != null ? { Condition = var.conditions } : {} + ) + ] + }) + + tags = var.tags +} diff --git a/infrastructure/terraform/modules/s3_iam_policy/outputs.tf b/infrastructure/terraform/modules/s3_iam_policy/outputs.tf new file mode 100644 index 00000000..85defd9c --- /dev/null +++ b/infrastructure/terraform/modules/s3_iam_policy/outputs.tf @@ -0,0 +1,14 @@ +output "policy_arn" { + description = "ARN of the S3 IAM policy" + value = aws_iam_policy.s3_policy.arn +} + +output "policy_name" { + description = "Name of the S3 IAM policy" + value = aws_iam_policy.s3_policy.name +} + +output "policy_id" { + description = "ID of the S3 IAM policy" + value = aws_iam_policy.s3_policy.id +} diff --git a/infrastructure/terraform/modules/s3_iam_policy/variables.tf b/infrastructure/terraform/modules/s3_iam_policy/variables.tf new file mode 100644 index 00000000..e2b3d7a8 --- /dev/null +++ b/infrastructure/terraform/modules/s3_iam_policy/variables.tf @@ -0,0 +1,42 @@ +variable "policy_name" { + description = "Name of the IAM policy" + type = string +} + +variable "policy_description" { + description = "Description of the IAM policy" + type = string + default = "" +} + +variable "bucket_arns" { + description = "List of S3 bucket ARNs to grant access to" + type = list(string) +} + +variable "actions" { + description = "List of S3 actions to allow (e.g., ['s3:GetObject'], ['s3:PutObject'], ['s3:DeleteObject'])" + type = list(string) + default = ["s3:GetObject"] +} + +variable "resource_paths" { + description = "List of resource paths within buckets (e.g., ['/*'] for all objects, ['/specific-prefix/*'] for specific prefix)" + type = list(string) + default = ["/*"] +} + +variable "conditions" { + description = "Optional IAM policy conditions to apply to the statement" + type = any + default = null +} + +variable "tags" { + description = "Tags to apply to the policy" + type = map(string) + default = {} +} + + + diff --git a/infrastructure/terraform/shared/main.tf b/infrastructure/terraform/shared/main.tf index b1474055..acf8c281 100644 --- a/infrastructure/terraform/shared/main.tf +++ b/infrastructure/terraform/shared/main.tf @@ -133,6 +133,11 @@ module "retrofit_sap_data" { allowed_origins = var.allowed_origins } +output "retrofit_sap_data_bucket_name" { + value = module.retrofit_sap_data.bucket_name + description = "Name of the retrofit SAP data bucket" +} + module "retrofit_carbon_predictions" { source = "../modules/s3" bucketname = "retrofit-carbon-predictions-${var.stage}" @@ -305,6 +310,21 @@ module "address2uprn_registry" { } +# S3 policy for postcode splitter to read from retrofit data bucket +module "address2uprn_s3_read_and_write" { + source = "../modules/s3_iam_policy" + + policy_name = "Address2UPRNReadandWriteS3" + policy_description = "Allow address2uprn Lambda to read and write from retrofit-data bucket" + bucket_arns = ["arn:aws:s3:::retrofit-data-${var.stage}"] + actions = ["s3:GetObject", "s3:ListBucket", "s3:PutObject"] + resource_paths = ["/*"] +} + +output "address_2_uprn_s3_read_and_write_arn" { + value = module.address2uprn_s3_read_and_write.policy_arn +} + ################################################ # Condition ETL – Lambda ECR ################################################ @@ -321,6 +341,28 @@ module "condition_etl_registry" { } +# Condition Data S3 Bucket to store initial data +module "condition_data_bucket" { + source = "../modules/s3" + bucketname = "condition-data-${var.stage}" + allowed_origins = var.allowed_origins +} + +module "condition_etl_s3_read" { + source = "../modules/s3_iam_policy" + + policy_name = "ConditionETLReadS3" + policy_description = "Allow Lambda to read objects from condition-data-${var.stage}" + bucket_arns = ["arn:aws:s3:::condition-data-${var.stage}"] + actions = ["s3:GetObject"] + resource_paths = ["/*"] +} + +output "condition_etl_s3_read_arn" { + value = module.condition_etl_s3_read.policy_arn +} + + ################################################ # Postcode Splitter – Lambda ECR ################################################ @@ -337,30 +379,17 @@ module "postcode_splitter_registry" { } -################################################ -# Conidition data – S3 bucket -################################################ -module "condition_data_bucket" { - source = "../modules/s3" - bucketname = "condition-data-${var.stage}" - allowed_origins = var.allowed_origins +# S3 policy for postcode splitter to read from retrofit data bucket +module "postcode_splitter_s3_read" { + source = "../modules/s3_iam_policy" + + policy_name = "PostcodeSplitterReadS3" + policy_description = "Allow postcode splitter Lambda to read from retrofit-data bucket" + bucket_arns = ["arn:aws:s3:::retrofit-data-${var.stage}"] + actions = ["s3:GetObject", "s3:ListBucket", "s3:PutObject"] + resource_paths = ["/*"] } -resource "aws_iam_policy" "condition_etl_s3_read" { - name = "ConditionETLReadS3" - description = "Allow Lambda to read objects from condition-data-${var.stage}" - policy = jsonencode({ - Version = "2012-10-17" - Statement = [ - { - Effect = "Allow" - Action = ["s3:GetObject"] - Resource = "arn:aws:s3:::condition-data-${var.stage}/*" - } - ] - }) -} - -output "condition_etl_s3_read_arn" { - value = aws_iam_policy.condition_etl_s3_read.arn +output "postcode_splitter_s3_read_arn" { + value = module.postcode_splitter_s3_read.policy_arn } \ No newline at end of file diff --git a/sfr/principal_pitch/2_export_data.py b/sfr/principal_pitch/2_export_data.py index 42d70a3a..4f430209 100644 --- a/sfr/principal_pitch/2_export_data.py +++ b/sfr/principal_pitch/2_export_data.py @@ -36,6 +36,8 @@ scenario_names = { 1059: "EPC C - 10k budget", } +project_name = "manchester" + def get_data(portfolio_id, scenario_ids): session = sessionmaker(bind=db_engine)() @@ -232,7 +234,7 @@ for scenario_id in SCENARIOS: # Get recs for this scenario recommended_measures_df = recommendations_df[ recommendations_df["scenario_id"] == scenario_id - ][["property_id", "measure_type", "estimated_cost", "default"]] + ][["property_id", "measure_type", "estimated_cost", "default"]] recommended_measures_df = recommended_measures_df[ recommended_measures_df["default"] ] @@ -240,7 +242,7 @@ for scenario_id in SCENARIOS: post_install_sap = recommendations_df[ recommendations_df["scenario_id"] == scenario_id - ][["property_id", "default", "sap_points"]] + ][["property_id", "default", "sap_points"]] post_install_sap = post_install_sap[post_install_sap["default"]] # Sum up the sap points by property id post_install_sap = ( @@ -286,6 +288,8 @@ for scenario_id in SCENARIOS: "current_sap_points", "total_floor_area", "number_of_rooms", + "lodgement_date", + "is_expired", "id", ] ] @@ -303,7 +307,58 @@ for scenario_id in SCENARIOS: ) df["uprn"] = df["uprn"].astype(str) + relevant_plans = plans_df[plans_df["scenario_id"] == scenario_id] + df2 = df.merge( + relevant_plans[["property_id", "post_sap_points", "post_epc_rating"]], + how="left", + on="property_id", + suffixes=("", "_plan"), + ) + print(df2["predicted_post_works_epc"].value_counts()) + print(df2["post_epc_rating"].value_counts()) + + z = df2[ + (df2["predicted_post_works_epc"] != "D") + & (df2["post_epc_rating"].astype(str) == "Epc.D") + ] + + df2["predicted_post_works_epc"].value_counts() + df2["post_epc_rating"].astype(str).value_counts() + + df2[df2["total_retrofit_cost"] > 0].shape + + getting_works = df[df["total_retrofit_cost"] > 0] + getting_works["predicted_post_works_epc"].value_counts() + + 32565 / getting_works.shape[0] + + df[df["predicted_post_works_sap"] == ""] + + # Expected columns list + expected_columns = [ + "suspended_floor_insulation", + "solid_floor_insulation", + "external_wall_insulation", + "internal_wall_insulation", + "cavity_wall_insulation", + "loft_insulation", + "flat_roof_insulation", + "room_roof_insulation", + "secondary_glazing", + "double_glazing", + "solar_pv", + "high_heat_retention_storage_heaters", + "air_source_heat_pump", + "boiler_upgrade", + "roomstat_programmer_trvs", + "time_temperature_zone_control", + ] + # Add missing columns with default values + for col in expected_columns: + if col not in df.columns: + df[col] = "" + # Create excel to store to - filename = f"{scenario_names[scenario_id]} - 20250113 final.xlsx" + filename = f"{scenario_names[scenario_id]} - {project_name}.xlsx" with pd.ExcelWriter(filename) as writer: df.to_excel(writer, sheet_name="properties", index=False) diff --git a/utils/s3.py b/utils/s3.py index b243b2ab..b3a96dba 100644 --- a/utils/s3.py +++ b/utils/s3.py @@ -3,12 +3,62 @@ import boto3 import csv import pandas as pd from io import BytesIO, StringIO +from urllib.parse import unquote from utils.logger import setup_logger from botocore.exceptions import NoCredentialsError, PartialCredentialsError logger = setup_logger() +def parse_s3_uri(s3_uri: str) -> tuple[str, str]: + """ + Parse S3 URI to extract bucket and key. + + Supports two formats: + 1. S3 URI format: s3://bucket/key + 2. AWS console URL format with query parameters + """ + logger.info("Parsing S3 URI") + + try: + # Check if it's an S3 URI format + if s3_uri.startswith("s3://"): + parts = s3_uri[5:].split("/", 1) + if len(parts) < 2: + raise ValueError("S3 URI must include both bucket and key") + bucket = parts[0] + key = parts[1] + logger.info(f"Extracted bucket: {bucket}, key: {key}") + return bucket, key + + # Otherwise, treat as AWS console URL + logger.info("Parsing as AWS console URL") + + # Split base URL and query string + if "?" not in s3_uri: + raise ValueError("No query string found") + + base, query = s3_uri.split("?", 1) + + # Extract bucket from base URL + if "/s3/object/" not in base: + raise ValueError("No '/s3/object/' found in URL path") + + path_parts = base.split("/s3/object/") + bucket = path_parts[1] + logger.info(f"Extracted bucket: {bucket}") + + # Extract prefix from query parameters + params = dict(item.split("=") for item in query.split("&") if "=" in item) + key = unquote(params.get("prefix", "")) + logger.info(f"Extracted key: {key}") + + return bucket, key + except Exception as e: + logger.error(f"Error parsing S3 URI: {type(e).__name__}: {e}") + raise ValueError(f"Could not parse S3 URI") from e + + def read_from_s3(bucket_name, s3_file_name): """ Read an object from s3. Decoding of the data is left for outside of this function