Merge pull request #704 from Hestia-Homes/feautre/postcode_splitter_address_uprn_automator_2000

Feautre/postcode splitter address uprn automator 2000
This commit is contained in:
Jun-te Kim 2026-02-16 15:51:43 +00:00 committed by GitHub
commit 1b5d0312b5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
38 changed files with 1283 additions and 355 deletions

View file

@ -22,7 +22,9 @@
"jgclark.vscode-todo-highlight",
"corentinartaud.pdfpreview",
"ms-python.vscode-python-envs",
"ms-python.black-formatter"
"ms-python.black-formatter",
"GrapeCity.gc-excelviewer",
"jakobhoeg.vscode-pokemon"
],
"settings": {
"files.defaultWorkspace": "/workspaces/model",

View file

@ -43,4 +43,17 @@ WORKDIR /workspaces/model
# 6) Make Python find your package
# Add project root to PYTHONPATH for all processes
ENV PYTHONPATH=/workspaces/model:${PYTHONPATH}
ENV PYTHONPATH=/workspaces/model:${PYTHONPATH}
# Install terraform
RUN apt-get update && sudo apt-get install -y gnupg software-properties-common
RUN wget -O- https://apt.releases.hashicorp.com/gpg | \
gpg --dearmor | \
sudo tee /usr/share/keyrings/hashicorp-archive-keyring.gpg > /dev/null
RUN echo "deb [signed-by=/usr/share/keyrings/hashicorp-archive-keyring.gpg] \
https://apt.releases.hashicorp.com $(lsb_release -cs) main" | \
tee /etc/apt/sources.list.d/hashicorp.list
RUN apt update
RUN apt-get install terraform
RUN terraform -install-autocomplete

View file

@ -9,7 +9,7 @@ mangum==0.19.0
# AWS
boto3==1.35.44
# Data
openpyxl==3.1.2
openpyxl==3.1.5
# Basic
pytz
uvicorn[standard]

View file

@ -38,6 +38,8 @@ on:
required: false
DEV_DB_NAME:
required: false
EPC_AUTH_TOKEN:
required: false
jobs:
build:
@ -47,6 +49,7 @@ jobs:
DEV_DB_HOST: ${{ secrets.DEV_DB_HOST }}
DEV_DB_PORT: ${{ secrets.DEV_DB_PORT }}
DEV_DB_NAME: ${{ secrets.DEV_DB_NAME }}
EPC_AUTH_TOKEN: ${{ secrets.EPC_AUTH_TOKEN }}
outputs:
image_digest: ${{ steps.digest.outputs.image_digest }}
@ -87,14 +90,17 @@ jobs:
temp=$(eval echo "$line")
BUILD_ARGS="$BUILD_ARGS --build-arg $temp"
done <<< "${{ inputs.build_args }}"
docker build \
docker buildx build \
--no-cache \
--platform linux/amd64 \
--provenance=false \
--sbom=false \
--push \
-f ${{ inputs.dockerfile_path }} \
$BUILD_ARGS \
-t $IMAGE_URI \
${{ inputs.build_context }}
docker push $IMAGE_URI
- name: Resolve image digest
id: digest

View file

@ -106,4 +106,10 @@ jobs:
- name: Terraform Destroy
if: inputs.terraform_destroy == 'true' && inputs.terraform_apply != 'true'
working-directory: ${{ inputs.lambda_path }}
run: terraform destroy -auto-approve
run: |
terraform destroy -auto-approve \
-var="stage=${{ inputs.stage }}" \
-var="lambda_name=${{ inputs.lambda_name }}" \
-var="ecr_repo_url=${{ steps.repo.outputs.ecr_repo_url }}" \
-var="image_digest=${{ inputs.image_digest }}"

View file

@ -141,3 +141,4 @@ jobs:
# Deploy to AWS Lambda via Serverless
sls deploy --stage ${{ github.ref_name }} --verbose

View file

@ -9,6 +9,7 @@ on:
- '.github/workflows/deploy_terraform.yml'
- '.github/workflows/_build_image.yml'
- '.github/workflows/_deploy_lambda.yml'
workflow_dispatch:
jobs:
determine_stage:
@ -76,10 +77,10 @@ jobs:
run: terraform plan -var-file=${STAGE}.tfvars -out=tfplan
- name: Terraform Apply
if: env.STAGE == 'prod'
if: env.TERRAFORM_APPLY == 'true'
working-directory: infrastructure/terraform/shared
run: terraform apply -auto-approve tfplan
# ============================================================
# 2⃣ Build Address 2 UPRN image and Push
# ============================================================
@ -90,10 +91,19 @@ jobs:
ecr_repo: address2uprn-${{ needs.determine_stage.outputs.stage }}
dockerfile_path: backend/address2UPRN/handler/Dockerfile
build_context: .
build_args: |
DEV_DB_HOST=$DEV_DB_HOST
DEV_DB_PORT=$DEV_DB_PORT
DEV_DB_NAME=$DEV_DB_NAME
EPC_AUTH_TOKEN=$EPC_AUTH_TOKEN
secrets:
AWS_ACCESS_KEY_ID: ${{ secrets.DEV_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY }}
AWS_REGION: ${{ secrets.DEV_AWS_REGION }}
DEV_DB_HOST: ${{ secrets.DEV_DB_HOST }}
DEV_DB_PORT: ${{ secrets.DEV_DB_PORT }}
DEV_DB_NAME: ${{ secrets.DEV_DB_NAME }}
EPC_AUTH_TOKEN: ${{ secrets.DEV_EPC_AUTH_TOKEN }}
# ============================================================
# 3⃣ Deploy Address 2 UPRN Lambda
@ -140,7 +150,7 @@ jobs:
# 3⃣ Deploy Postcode Splitter Lambda
# ============================================================
postcodeSplitter_lambda:
needs: [postcodeSplitter_image, determine_stage]
needs: [postcodeSplitter_image, determine_stage, address2uprn_lambda]
uses: ./.github/workflows/_deploy_lambda.yml
with:
lambda_name: postcodeSplitter
@ -192,4 +202,5 @@ jobs:
secrets:
AWS_ACCESS_KEY_ID: ${{ secrets.DEV_AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY }}
AWS_REGION: ${{ secrets.DEV_AWS_REGION }}
AWS_REGION: ${{ secrets.DEV_AWS_REGION }}

View file

@ -34,7 +34,7 @@ from etl.epc_clean.epc_attributes.WallAttributes import WallAttributes
logger = setup_logger()
# OpenAI API Key (set this in your environment variables for security)
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "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")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")

View file

@ -13,11 +13,15 @@ from asset_list.utils import get_data
from dotenv import load_dotenv
from backend.SearchEpc import SearchEpc
load_dotenv(dotenv_path="backend/.env")
load_dotenv(dotenv_path="../backend/.env")
EPC_AUTH_TOKEN = os.getenv(
"EPC_AUTH_TOKEN",
)
OPENAI_API_KEY = os.getenv(
"OPENAI_API_KEY",
)
def extract_address1(
asset_list, full_address_col, postcode_col, method="first_two_words"
@ -109,21 +113,21 @@ def app():
)
data_filename = "to_standardise_uprns.xlsx"
sheet_name = "Sheet1"
postcode_column = "Postcode"
postcode_column = "POSTCODE"
address1_column = None
address1_method = "house_number_extraction"
fulladdress_column = "Address"
address_cols_to_concat = None
fulladdress_column = "ADDRESS"
address_cols_to_concat = []
missing_postcodes_method = None
landlord_year_built = None
landlord_os_uprn = None
landlord_property_type = None
landlord_built_form = None
landlord_wall_construction = None
landlord_roof_construction = None
landlord_heating_system = None
landlord_property_type = "PROPERTY TYPE"
landlord_built_form = None # Skipped as empty
landlord_wall_construction = "wall combined" # combin F + G
landlord_roof_construction = "HEATING SYSTEM" # Combine I + J
landlord_heating_system = None # Check with Khalim
landlord_existing_pv = None
landlord_property_id = "LLUPRN"
landlord_property_id = "UPRN"
landlord_sap = None
outcomes_filename = None
outcomes_sheetname = None
@ -275,7 +279,7 @@ def app():
if skip is not None and not force_retrieve_data:
if i <= skip:
continue
chunk = asset_list.standardised_asset_list[i: i + chunk_size]
chunk = asset_list.standardised_asset_list[i : i + chunk_size]
epc_data_chunk, errors_chunk, no_epc_chunk = get_data(
df=chunk,
row_id_name=asset_list.DOMNA_PROPERTY_ID,
@ -418,7 +422,7 @@ def app():
# Retrieve just the data we need
epc_df = epc_df[
[asset_list.DOMNA_PROPERTY_ID] + list(asset_list.EPC_API_DATA_NAMES.keys())
].rename(columns=asset_list.EPC_API_DATA_NAMES)
].rename(columns=asset_list.EPC_API_DATA_NAMES)
# Look for columns not in the find my EPC data, which will have happened if we didn't
# retrieve it in the first place
@ -435,7 +439,7 @@ def app():
find_my_epc_data[
[asset_list.DOMNA_PROPERTY_ID, "epc_has_floor_recommendation"]
+ list(asset_list.FIND_EPC_DATA_NAMES.keys())
].rename(columns=asset_list.FIND_EPC_DATA_NAMES),
].rename(columns=asset_list.FIND_EPC_DATA_NAMES),
how="left",
on=asset_list.DOMNA_PROPERTY_ID,
)

View file

@ -19,4 +19,4 @@ PLAN_TRIGGER_BUCKET=test
DATA_BUCKET=test
EPC_AUTH_TOKEN=test
ENGINE_SQS_URL=test
ENERGY_ASSESSMENTS_BUCKET=test
ENERGY_ASSESSMENTS_BUCKET=test

View file

@ -1,4 +1,17 @@
FROM public.ecr.aws/lambda/python:3.10
# FROM python:3.11.10-bullseye
ARG DEV_DB_HOST
ARG DEV_DB_PORT
ARG DEV_DB_NAME
ARG EPC_AUTH_TOKEN
ENV DB_HOST=${DEV_DB_HOST}
ENV DB_PORT=${DEV_DB_PORT}
ENV DB_NAME=${DEV_DB_NAME}
ENV EPC_AUTH_TOKEN=${EPC_AUTH_TOKEN}
# Set working directory (Lambda task root)
WORKDIR /var/task
@ -8,13 +21,17 @@ WORKDIR /var/task
# -----------------------------
COPY backend/address2UPRN/handler/requirements.txt .
# Install dependencies into Lambda runtime
RUN pip install --no-cache-dir -r requirements.txt
# -----------------------------
# Copy application code
# -----------------------------
# Copy necessary files for database and utility imports
COPY utils/ utils/
COPY backend/ backend/
COPY datatypes/ datatypes/
# Copy the handler
COPY backend/address2UPRN/main.py .
# -----------------------------

View file

@ -1,3 +1,11 @@
epc-api-python==1.0.2
pandas==2.2.2
numpy<2.0
requests
tqdm
pandas
openpyxl
epc-api-python==1.0.2
boto3==1.35.44
sqlmodel
sqlalchemy==2.0.36
psycopg2-binary==2.9.10
pydantic-settings==2.6.0

View file

@ -3,12 +3,23 @@ import os
from urllib.parse import urlencode
import pandas as pd
from difflib import SequenceMatcher
from tqdm import tqdm
from utils.logger import setup_logger
import re
from typing import Set
import json
import requests
from uuid import UUID
import uuid
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
from utils.s3 import (
save_csv_to_s3,
read_csv_from_s3 as read_csv_from_s3_dict,
parse_s3_uri,
)
from datetime import datetime
logger = setup_logger()
import re
EPC_AUTH_TOKEN = os.getenv(
"EPC_AUTH_TOKEN",
@ -17,9 +28,28 @@ EPC_AUTH_TOKEN = os.getenv(
if EPC_AUTH_TOKEN is None:
raise RuntimeError("EPC_AUTH_TOKEN not defined in env")
import re
from difflib import SequenceMatcher
from typing import Set
def is_valid_postcode(postcode_clean: str) -> bool:
"""
Validate postcode using postcodes.io.
Expects a sanitised postcode (e.g. E84SQ).
Returns True if valid, False otherwise.
"""
POSTCODES_IO_VALIDATE_URL = "https://api.postcodes.io/postcodes/{postcode}/validate"
if not postcode_clean:
return False
try:
resp = requests.get(
POSTCODES_IO_VALIDATE_URL.format(postcode=postcode_clean),
timeout=5,
)
resp.raise_for_status()
return resp.json().get("result", False)
except requests.RequestException:
# Network issues, rate limits, etc.
return False
def levenshtein(a: str, b: str) -> float:
@ -300,27 +330,29 @@ def get_uprn_candidates(
)
def get_uprn(user_inputed_address: str, postcode: str, return_address=False):
def get_uprn_with_epc_df(
user_inputed_address: str,
epc_df: pd.DataFrame,
verbose: bool = False,
):
"""
Return uprn (str)
Return False if failed to find a sensible matching epc
Return Nons when epc found but no UPRN
Return uprn (str) using a pre-fetched EPC dataframe.
This avoids calling the API multiple times for the same postcode.
"""
df = get_epc_data_with_postcode(postcode=postcode)
if df.empty:
if epc_df.empty:
return None
scored_df = get_uprn_candidates(
df,
epc_df,
user_address=user_inputed_address,
)
# Best score
best_score = scored_df.iloc[0]["lexiscore"]
if best_score <= 0:
return None
# # Return None if score is below threshold
# if best_score < 0.7:
# return None
# All rank-1 rows (possible draw)
top_rank_df = scored_df[scored_df["lexirank"] == 1]
@ -330,18 +362,41 @@ def get_uprn(user_inputed_address: str, postcode: str, return_address=False):
return None
address = top_rank_df["address"].values[0]
lexiscore = float(top_rank_df["lexiscore"].values[0])
score = float(top_rank_df["lexiscore"].values[0])
logger.info(f"Address found to be: {address}, with lexiscore {lexiscore}")
logger.info(f"Address found to be: {address}, with lexiscore {score}")
# Safe to return the agreed UPRN
found_uprn = top_rank_df.iloc[0]["uprn"]
if found_uprn == "":
return None
if return_address:
return found_uprn, address
return found_uprn
if verbose:
return (found_uprn, address, score)
else:
return found_uprn
def get_uprn(
user_inputed_address: str,
postcode: str,
verbose: bool = False,
):
"""
Return uprn (str)
Return False if failed to find a sensible matching epc
Return None when epc found but no UPRN
This function fetches EPC data via API for a single postcode.
For processing multiple addresses in the same postcode, use get_uprn_with_epc_df instead.
"""
df = get_epc_data_with_postcode(postcode=postcode)
return get_uprn_with_epc_df(
user_inputed_address=user_inputed_address,
epc_df=df,
verbose=verbose,
)
def resolve_uprns_for_postcode_group(
@ -424,148 +479,302 @@ def resolve_uprns_for_postcode_group(
)
def test(a, b):
assert a == b, f"erorr: {a}{type(a)} != {b}: {type(b)}"
def save_results_to_s3(
results_df: pd.DataFrame, task_id: str, sub_task_id: str, bucket_name: str = None
) -> bool:
"""
Save results DataFrame to S3 as CSV.
:param results_df: The DataFrame containing results
:param task_id: The task ID (used for file naming)
:param bucket_name: The S3 bucket name (defaults to env variable)
:return: True if successful, False otherwise
"""
if bucket_name is None:
bucket_name = os.getenv("S3_BUCKET_NAME")
if not bucket_name:
logger.error(
"S3 bucket name not provided and S3_BUCKET_NAME environment variable not set"
)
return False
try:
# Create a filename with the task ID
file_name = f"{datetime.now().isoformat()}_{str(uuid.uuid4())[:8]}"
file_key = f"ara_raw_outputs/{task_id}/{sub_task_id}/{file_name}.csv"
# Save to S3
success = save_csv_to_s3(results_df, bucket_name, file_key)
if success:
logger.info(f"Successfully saved results to s3://{bucket_name}/{file_key}")
return True
else:
logger.error(f"Failed to save results to S3")
return False
except Exception as e:
logger.error(f"Error saving results to S3: {str(e)}")
return False
def run_all_test():
# Basic usage with different post codes styles
test(get_epc_data_with_postcode("b93 8sy").shape[0], 63)
test(get_epc_data_with_postcode("B938sy").shape[0], 63)
test(get_epc_data_with_postcode("b93 8Sy").shape[0], 63)
test(get_epc_data_with_postcode("b93 8Sy").shape[0], 63)
def handler(event, context, local=False):
print("=== Address2UPRN Lambda Handler ===")
print(f"Function: {context.function_name}")
print(f"Request ID: {context.aws_request_id}")
test(get_uprn("68", "b93 8sy"), "100070989938")
test(get_uprn("68 Glendon Way", "b93 8sy"), "100070989938")
test(get_uprn("Flat A, 28, Nelgarde Road", "se6 4tf"), "100023278633")
test(get_uprn("28 A", "se6 4tf"), "100023278633")
test(get_uprn("28A", "se6 4tf"), "100023278633")
test(get_uprn("6 Aitken Close", "E8 4SQ"), False)
# Handle local testing
if local is True:
event = {
"Records": [
{
"body": json.dumps(
{
"task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917",
"sub_task_id": "6a427b6e-1ece-4983-b1e5-9bffccc53d1d",
"s3_uri": "s3://retrofit-data-dev/ara_postcode_splitter_batches/e31f2f21-175b-4a91-a3ec-a6baa325e917/8673913b-1a88-42d7-8578-0449123d94b0/2026-02-16T12:00:20.257856_7b520c0e.csv",
}
)
}
]
}
# unique case
test(get_uprn("Flat 5, 1, Semley Gate", "e9 5nh"), "10008238198")
test(get_uprn("5 , 1 Semley Gate", "e9 5nh"), "10008238198")
test(get_uprn("5 Semley Gate", "e9 5nh"), "10008238198")
test(get_uprn("1, 5 Semley Gate", "e9 5nh"), False)
test(
get_uprn("1 Semley Gate", "e9 5nh"), "10008238188"
) # this one return "flat 1, in 1 semley gate"
test(
get_uprn("48 Oswald Street", "E5 0BT"), False
) # this one return "flat 1, in 1 semley gate"
test(
get_uprn("42 Oswald Street", "E5 0BT"), False
) # this one return "flat 1, in 1 semley gate"
test(
get_uprn("46 Oswald Street", "E5 0BT"), False
) # this one return "flat 1, in 1 semley gate"
get_uprn_candidates(get_epc_data_with_postcode("e5 0bt"), "48 Oswald Street")
get_uprn_candidates(
get_epc_data_with_postcode("Cr2 7dl"),
"FLAT 3; 42 MORETON ROAD, SOUTH CROYDON, SURREY",
)
print(f"Event: {json.dumps(event, indent=2, default=str)}")
print("===================================")
# Handle both single event and batch events (SQS, etc.)
records = event.get("Records", [event])
results = []
errors = []
subtask_interface = SubTaskInterface()
if __name__ == "__main__":
INPUT_FILE = "hackney.xlsx"
ADDRESS_COL = "Address 1"
POSTCODE_COL = "Postcode"
UPRN_COL = "UPRN"
df = pd.read_excel(INPUT_FILE)
failures = []
for _, row in tqdm(
df.iterrows(),
total=len(df),
desc="Auditing UPRNs",
):
input_address = str(row[ADDRESS_COL]).strip()
postcode = str(row[POSTCODE_COL]).strip()
expected_uprn = None if pd.isna(row[UPRN_COL]) else str(int(row[UPRN_COL]))
for record in records:
task_id = None
subtask_id = None
try:
epc_df = get_epc_data_with_postcode(postcode)
# Parse body (inputs)
if isinstance(record.get("body"), str):
body = json.loads(record["body"])
else:
body = record.get("body", {})
if epc_df.empty:
failures.append(
{
**row.to_dict(),
"found_uprn": None,
"best_match_uprn": None,
"best_match_address": None,
"best_match_lexiscore": None,
"status": "no_epc_results",
}
# Validate required fields
task_id = body.get("task_id")
subtask_id = body.get("sub_task_id")
s3_uri = body.get("s3_uri")
if not task_id:
errors.append({"error": "Missing required field: task_id"})
continue
if not subtask_id:
errors.append({"error": "Missing required field: sub_task_id"})
continue
if not s3_uri:
errors.append({"error": "Missing required field: s3_uri"})
continue
# Convert task_id to UUID
try:
task_id = UUID(task_id) if isinstance(task_id, str) else task_id
except ValueError as e:
errors.append({"error": f"Invalid UUID format for task_id: {str(e)}"})
continue
# Convert sub_task_id to UUID
try:
subtask_id = (
UUID(subtask_id) if isinstance(subtask_id, str) else subtask_id
)
except ValueError as e:
errors.append(
{"error": f"Invalid UUID format for sub_task_id: {str(e)}"}
)
continue
scored_df = get_uprn_candidates(
epc_df,
user_address=input_address,
)
# Update existing subtask to 'in progress'
subtask_interface.update_subtask_status(subtask_id, "in progress")
logger.info(f"Processing subtask {subtask_id} for task {task_id}")
best_row = scored_df.iloc[0]
# Parse S3 URI and read CSV from S3
logger.info(f"Reading data from S3: {s3_uri}")
try:
bucket, key = parse_s3_uri(s3_uri)
csv_data = read_csv_from_s3_dict(bucket, key)
df = pd.DataFrame(csv_data)
logger.info(f"Loaded {len(df)} rows from S3")
except Exception as s3_error:
logger.error(f"Failed to read data from S3: {s3_error}")
errors.append(
{"error": "Failed to read data from S3", "details": str(s3_error)}
)
try:
subtask_interface.update_subtask_status(
subtask_id, "failed", outputs={"error": str(s3_error)}
)
except Exception as db_error:
logger.error(f"Failed to update subtask status: {db_error}")
continue
best_match_uprn = str(best_row["uprn"])
best_match_address = best_row["address"]
best_match_lexiscore = round(float(best_row["lexiscore"]), 4)
# Process the rows
logger.info(f"Processing {len(df)} rows for task {task_id}")
found_uprn = get_uprn(input_address, postcode)
# Create user_input column by concatenating Address columns if not already present
if "user_input" not in df.columns:
df["user_input"] = (
df["Address 1"].fillna("")
+ " "
+ df["Address 2"].fillna("")
+ " "
+ df["Address 3"].fillna("")
).str.strip()
logger.info(f"Created user_input column from Address 1 and Address 2")
else:
logger.info(f"user_input column already present in data")
clean_df = df.dropna(subset=["postcode_clean"])
postcode_to_addresses = {
postcode: group.to_dict(orient="records")
for postcode, group in clean_df.groupby("postcode_clean", sort=False)
}
logger.info(f"Total postcodes: {len(postcode_to_addresses)}")
# Process each postcode group
results_data = []
for postcode, postcode_rows in postcode_to_addresses.items():
logger.info(
f"Processing postcode: {postcode} with {len(postcode_rows)} rows"
)
# Validate postcode before processing
if not is_valid_postcode(postcode):
logger.warning(f"Postcode {postcode} is invalid, skipping")
continue
# Fetch EPC data once per postcode
try:
epc_df = get_epc_data_with_postcode(postcode=postcode)
logger.info(
f"Fetched {len(epc_df)} EPC records for postcode {postcode}"
)
except Exception as e:
logger.error(
f"Failed to fetch EPC data for postcode {postcode}: {e}"
)
continue
# Process each address in this postcode with the same EPC data
for row in postcode_rows:
try:
user_input = row.get("user_input", "")
if not user_input:
logger.warning(
f"Skipping row with missing user_input for postcode {postcode}"
)
continue
# Get UPRN using the pre-fetched EPC data with all return options
result = get_uprn_with_epc_df(
user_inputed_address=user_input, epc_df=epc_df, verbose=True
)
# Parse result tuple if successful
if result:
uprn, found_address, score = result
logger.info(
f"Found UPRN for {user_input} in {postcode}: {uprn} (score: {score})"
)
results_data.append(
{
**row, # Include all original data
"uprn": uprn,
"domna_found_address": found_address,
"domna_lexiscore": score,
}
)
else:
logger.warning(
f"No UPRN found for {user_input} in {postcode}"
)
results_data.append(
{
**row, # Include all original data
"uprn": None,
"domna_found_address": None,
"domna_lexiscore": None,
}
)
except Exception as e:
logger.error(
f"Error processing address {row.get('user_input', 'unknown')}: {e}"
)
# Still add the row with error markers
results_data.append(
{
**row,
"uprn": None,
"domna_found_address": None,
"domna_lexiscore": None,
"error": str(e),
}
)
continue
# Create results DataFrame
result_df = pd.DataFrame(results_data)
# Save results to S3
try:
save_results_to_s3(result_df, str(task_id), str(subtask_id))
except Exception as s3_error:
logger.error(f"Failed to save results to S3: {s3_error}")
# Mark subtask as completed
try:
subtask_interface.update_subtask_status(
subtask_id,
"completed",
outputs={"rows_processed": "todo -> show sensible output"},
)
logger.info(f"Marked subtask {subtask_id} as completed")
except Exception as db_error:
logger.error(f"Failed to mark subtask as completed: {db_error}")
except Exception as e:
failures.append(
{
**row.to_dict(),
"found_uprn": None,
"best_match_uprn": None,
"best_match_address": None,
"best_match_lexiscore": None,
"status": "exception",
"error": str(e),
}
)
continue
logger.error(f"Unexpected error processing record: {e}", exc_info=True)
errors.append({"error": "Unexpected error", "details": str(e)})
# Mark subtask as failed if we have one
if subtask_id:
try:
subtask_interface.update_subtask_status(
subtask_id, "failed", outputs={"error": str(e)}
)
except Exception as db_error:
logger.error(f"Failed to update subtask status: {db_error}")
found_uprn_norm = None if not found_uprn else str(found_uprn)
# Return error if all records failed
logger.info(results_data)
logger.info(results)
if errors and not results:
return {"statusCode": 500, "body": json.dumps({"errors": errors})}
if found_uprn_norm != expected_uprn:
failures.append(
{
**row.to_dict(),
"found_uprn": found_uprn_norm,
"best_match_uprn": best_match_uprn,
"best_match_address": best_match_address,
"best_match_lexiscore": best_match_lexiscore,
"status": ("no_match" if found_uprn_norm is None else "mismatch"),
}
)
failures_df = pd.DataFrame(failures)
print("===================================")
print(f"Total rows : {len(df)}")
print(f"Failures : {len(failures_df)}")
print("===================================")
failures_df.to_excel(
"hackney_uprn_failures.xlsx",
index=False,
)
return {
"statusCode": 200,
"body": json.dumps(
{"processed": results, "errors": errors if errors else None}
),
}
def handler(event, context):
print("hello world")
return {"statusCode": 200, "body": "hello world"}
# TO do function dispatcher,
# get_uprn_candidates(get_epc_data_with_postcode("E9 5NH"),"Flat 1, 5 Semley Gate" and Flat 5, 1 Semley Gate)
# fix that
# Look again at flat 1
# pandas reader the seperate postcode_splitter
# dump into s3
# TODO:
# Don't add results to return messages as its too verbose
# capture the exepection as e, into s3, to find the logs go to s3
# Upload results to s3 as well as csv

View file

@ -1,3 +1,5 @@
# one time script for a customer forhousing
import pandas as pd
from tqdm import tqdm
from backend.address2UPRN.main import get_uprn
@ -5,20 +7,35 @@ from backend.address2UPRN.main import get_uprn
# Enable tqdm for pandas
tqdm.pandas()
df = pd.read_excel("address2.xlsx")
file_name = "forhousing.xlsx"
df = pd.read_excel(file_name)
def extract_uprn(row):
print(row["User Input"], row["Postcode"])
result = get_uprn(row["User Input"], row["Postcode"], return_address=True)
user_input = "Address"
postcode = "Postcode"
result = get_uprn(
row[user_input],
row[postcode],
return_address=True,
return_EPC=True,
return_score=True,
)
if result is None:
return pd.Series([None, None])
return pd.Series([None, None, None, None])
uprn, found_address = result
return pd.Series([uprn, found_address])
uprn, found_address, epc, score = result
return pd.Series([uprn, found_address, epc, score])
df[["juntes uprn", "junte found address"]] = df.progress_apply(extract_uprn, axis=1)
df[["juntes uprn", "junte found address", "junte found epc", "junte score"]] = (
df.progress_apply(extract_uprn, axis=1)
)
df.to_excel("outputs2.xlsx", index=False)
df.to_excel(f"{file_name}_outputs.xlsx", index=False)
# TODO: add lexiscore
# TODO: run it
# TODO: give it to danny

View file

@ -18,37 +18,37 @@ def resolve_env_file() -> Optional[str]:
class Settings(BaseSettings):
API_KEY: str
API_KEY: str = "changeme"
API_KEY_NAME: str = "X-API-KEY"
SECRET_KEY: str
ENVIRONMENT: str
DATA_BUCKET: str
SECRET_KEY: str = "changeme"
ENVIRONMENT: str = "changeme"
DATA_BUCKET: str = "changeme"
PLAN_TRIGGER_BUCKET: str
ENGINE_SQS_URL: str
ENGINE_SQS_URL: str = "changeme"
# Third parties
EPC_AUTH_TOKEN: str
GOOGLE_SOLAR_API_KEY: str
EPC_AUTH_TOKEN: str = "changeme"
GOOGLE_SOLAR_API_KEY: str = "changeme"
# Database settings
DB_HOST: str
DB_PASSWORD: str
DB_USERNAME: str
DB_PORT: str
DB_NAME: str
DB_HOST: str = "changeme"
DB_PASSWORD: str = "changeme"
DB_USERNAME: str = "changeme"
DB_PORT: str = "changeme"
DB_NAME: str = "changeme"
# Prediction buckets
SAP_PREDICTIONS_BUCKET: str
CARBON_PREDICTIONS_BUCKET: str
HEAT_PREDICTIONS_BUCKET: str
SAP_PREDICTIONS_BUCKET: str = "changeme"
CARBON_PREDICTIONS_BUCKET: str = "changeme"
HEAT_PREDICTIONS_BUCKET: str = "changeme"
# LIGHTING_COST_PREDICTIONS_BUCKET: str
# HEATING_COST_PREDICTIONS_BUCKET: str
# HOT_WATER_COST_PREDICTIONS_BUCKET: str
HEATING_KWH_PREDICTIONS_BUCKET: str
HOTWATER_KWH_PREDICTIONS_BUCKET: str
HEATING_KWH_PREDICTIONS_BUCKET: str = "changeme"
HOTWATER_KWH_PREDICTIONS_BUCKET: str = "changeme"
# Other S3 buckts
ENERGY_ASSESSMENTS_BUCKET: str
ENERGY_ASSESSMENTS_BUCKET: str = "changeme"
# Optional AWS creds (only required in local)
AWS_ACCESS_KEY_ID: Optional[str] = None

View file

@ -10,7 +10,7 @@ mangum==0.19.0
# AWS
boto3==1.35.44
# Data
openpyxl==3.1.2
openpyxl==3.1.5
# Basic
pytz
sqlmodel

View file

@ -29,5 +29,5 @@ class ConditionTriggerRequest(BaseModel):
# {
# "file_type": "LBWF",
# "trigger_file_bucket": "condition-data-dev",
# "trigger_file_key": "input/lbwf/LBWF - Example Asset Data September 2025.xlsx",
# "trigger_file_key": "input/lbwf/LBWF - Example Asset Data September 2025.xlsx"
# }

View file

@ -1,9 +1,28 @@
FROM public.ecr.aws/lambda/python:3.10
FROM public.ecr.aws/lambda/python:3.11
ARG DEV_DB_HOST
ARG DEV_DB_PORT
ARG DEV_DB_NAME
ENV DB_HOST=${DEV_DB_HOST}
ENV DB_PORT=${DEV_DB_PORT}
ENV DB_NAME=${DEV_DB_NAME}
# Set working directory (Lambda task root)
WORKDIR /var/task
# -----------------------------
COPY backend/postcode_splitter/handler/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy necessary files for database and utility imports
COPY utils/ utils/
COPY backend/ backend/
COPY datatypes/ datatypes/
# Copy the handler
COPY backend/postcode_splitter/main.py .
# Lambda handler
# -----------------------------
CMD ["main.handler"]

View file

@ -0,0 +1,11 @@
pandas==2.2.2
numpy<2.0
requests
tqdm
openpyxl
epc-api-python==1.0.2
boto3==1.35.44
sqlmodel
sqlalchemy==2.0.36
psycopg2-binary==2.9.10
pydantic-settings==2.6.0

View file

@ -1,127 +1,278 @@
import os
import sys
import json
import pandas as pd
import requests
from backend.address2UPRN.main import (
resolve_uprns_for_postcode_group,
get_epc_data_with_postcode,
import boto3
from uuid import UUID, uuid4
from utils.s3 import (
read_csv_from_s3 as read_csv_from_s3_dict,
save_csv_to_s3,
parse_s3_uri,
)
from utils.logger import setup_logger
from tqdm import tqdm
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
from datetime import datetime
logger = setup_logger()
def sanitise_postcode(postcode: str) -> str | None:
def upload_batch_to_s3(
batch_df: pd.DataFrame, task_id: str, sub_task_id: str, bucket_name: str = None
) -> str:
"""
Normalise postcode for grouping.
- Uppercase
- Remove all whitespace
Upload batch DataFrame to S3 as CSV.
"""
if pd.isna(postcode):
return None
if bucket_name is None:
bucket_name = os.getenv("S3_BUCKET_NAME")
return postcode.upper().replace(" ", "")
def is_valid_postcode(postcode_clean: str) -> bool:
"""
Validate postcode using postcodes.io.
Expects a sanitised postcode (e.g. E84SQ).
Returns True if valid, False otherwise.
"""
POSTCODES_IO_VALIDATE_URL = "https://api.postcodes.io/postcodes/{postcode}/validate"
if not postcode_clean:
return False
if not bucket_name:
logger.error(
"S3 bucket name not provided and S3_BUCKET_NAME environment variable not set"
)
raise ValueError("S3_BUCKET_NAME not configured")
try:
resp = requests.get(
POSTCODES_IO_VALIDATE_URL.format(postcode=postcode_clean),
timeout=5,
file_name = f"{datetime.now().isoformat()}_{str(uuid4())[:8]}"
file_key = (
f"ara_postcode_splitter_batches/{task_id}/{sub_task_id}/{file_name}.csv"
)
resp.raise_for_status()
return resp.json().get("result", False)
except requests.RequestException:
# Network issues, rate limits, etc.
return False
success = save_csv_to_s3(batch_df, bucket_name, file_key)
if success:
s3_uri = f"s3://{bucket_name}/{file_key}"
logger.info(f"Successfully uploaded batch to {s3_uri}")
return s3_uri
else:
logger.error(f"Failed to upload batch to S3")
raise ValueError("Failed to save CSV to S3")
except Exception as e:
logger.error(f"Error uploading batch to S3: {str(e)}")
raise
def main():
df = pd.read_excel("hackney.xlsx", sheet_name="Sustainability")
df = df.head(500)
def send_to_address2uprn_queue(task_id: str, sub_task_id: str, s3_uri: str) -> str:
"""
Send a batch to the address2UPRN SQS queue with S3 reference.
# Sanitise postcodes
df["postcode_clean"] = df["Postcode"].apply(sanitise_postcode)
Args:
task_id: The parent task ID
sub_task_id: The new subtask ID for this batch
s3_uri: S3 URI pointing to the batch CSV file
# --- validate AFTER grouping (save API calls) ---
Returns:
Message ID from SQS
"""
sqs_client = boto3.client("sqs")
queue_url = os.getenv("ADDRESS2UPRN_QUEUE_URL")
# Get unique, non-null postcodes
unique_postcodes = df["postcode_clean"].dropna().unique()
if not queue_url:
raise ValueError("ADDRESS2UPRN_QUEUE_URL environment variable not set")
# Validate each postcode once, TODOadd a progress bar
postcode_validity = {
pc: is_valid_postcode(pc)
for pc in tqdm(unique_postcodes, total=len(unique_postcodes))
message_body = {
"task_id": task_id,
"sub_task_id": sub_task_id,
"s3_uri": s3_uri,
}
# Map validity back onto dataframe
df["postcode_valid"] = df["postcode_clean"].map(postcode_validity)
response = sqs_client.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps(message_body),
)
logger.info(
f"Sent message to address2UPRN queue. "
f"Task: {task_id}, SubTask: {sub_task_id}, MessageId: {response['MessageId']}"
)
return response["MessageId"]
def create_batch_and_send_to_address2uprn(
batch_df: pd.DataFrame,
task_id: str,
sub_task_id: str,
subtask_interface: SubTaskInterface,
bucket_name: str,
) -> str:
"""
Create a batch DataFrame, upload to S3, create subtask, and send to address2UPRN queue.
"""
# Upload batch to S3
s3_uri = upload_batch_to_s3(batch_df, str(task_id), str(sub_task_id), bucket_name)
# Create a new subtask for this batch with all inputs
created_batch_sub_task_id = subtask_interface.create_subtask(
task_id=task_id,
inputs={
"task_id": str(task_id),
"s3_uri": s3_uri,
},
)
logger.info(f"Created batch subtask {created_batch_sub_task_id}")
# Send message with S3 reference
send_to_address2uprn_queue(
task_id=str(task_id),
sub_task_id=str(created_batch_sub_task_id),
s3_uri=s3_uri,
)
return created_batch_sub_task_id
def handler(event, context, local=False):
print(f"Function: {context.function_name}")
print(f"Request ID: {context.aws_request_id}")
# Example SQS message for testing (copy and paste into SQS):
if local is True:
event = {
"Records": [
{
"body": json.dumps(
{
"task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917",
"sub_task_id": "8673913b-1a88-42d7-8578-0449123d94b0",
"s3_uri": "s3://retrofit-data-dev/ara_raw_inputs/peabody/2025_11_11 - Peabody - Data Extracts for Domna_transformed.csv",
}
)
}
]
}
# Handle both single event and batch events (SQS, etc.)
records = event.get("Records", [event])
results = []
errors = []
subtask_interface = SubTaskInterface()
bucket_name = os.getenv("S3_BUCKET_NAME")
if local:
bucket_name = "retrofit-data-dev"
for postcode, group_df in tqdm(
df[df["postcode_valid"]].groupby("postcode_clean"),
desc="Resolving UPRNs by postcode",
):
try:
epc_df = get_epc_data_with_postcode(postcode)
for record in records:
if local:
record = records[0]
task_id = None
subtask_id = None
# Parse body (inputs)
if epc_df.empty:
tmp = group_df.copy()
tmp["found_uprn"] = None
tmp["status"] = "no_epc_results"
results.append(tmp)
continue
if isinstance(record.get("body"), str):
body = json.loads(record["body"])
else:
body = record.get("body", {})
resolved = resolve_uprns_for_postcode_group(
group_df=group_df,
epc_df=epc_df,
# Validate required fields
task_id = body.get("task_id")
subtask_id = body.get("sub_task_id")
s3_uri = body.get("s3_uri")
# Convert task_id to UUID
task_id = UUID(task_id) if isinstance(task_id, str) else task_id
subtask_id = UUID(subtask_id) if isinstance(subtask_id, str) else subtask_id
# Mark subtask as in progress
subtask_interface.update_subtask_status(subtask_id, "in progress")
logger.info(f"Marked subtask {subtask_id} as in progress")
# Read CSV from S3
bucket, key = parse_s3_uri(s3_uri)
logger.info(f"S3 Bucket: {bucket}, Key: {key}")
csv_data = read_csv_from_s3_dict(bucket, key)
df = pd.DataFrame(csv_data)
logger.info(f"CSV loaded: {len(df)} rows, {len(df.columns)} columns")
# Sanitise postcodes
df["postcode_clean"] = df["postcode"].str.upper().str.replace(" ", "")
df = df.dropna(subset=["postcode_clean"])
batch_size = 500
if df.shape[0] < batch_size:
create_batch_and_send_to_address2uprn(
batch_df=df,
task_id=task_id,
sub_task_id=subtask_id,
subtask_interface=subtask_interface,
bucket_name=bucket_name,
)
else:
postcode_to_addresses = {
postcode: group
for postcode, group in df.groupby("postcode_clean", sort=False)
}
results.append(resolved)
count = 0
buffer = []
except Exception as e:
tmp = group_df.copy()
tmp["found_uprn"] = None
tmp["status"] = "exception"
tmp["error"] = str(e)
results.append(tmp)
for postcode, group_df in postcode_to_addresses.items():
group_len = len(group_df)
final_df = pd.concat(results, ignore_index=True)
a = final_df[
[
"best_match_lexiscore",
"Address 1",
"best_match_address",
"Postcode",
"UPRN",
"best_match_uprn",
]
] # add levi score to viewing
b = final_df[final_df["best_match_lexiscore"] > 0] # add levi score to viewing
b = b[
[
"best_match_lexiscore",
"Address 1",
"best_match_address",
"Postcode",
"UPRN",
"best_match_uprn",
]
]
# If single postcode is bigger than batch_size → send directly
if group_len >= batch_size:
if buffer:
create_batch_and_send_to_address2uprn(
batch_df=pd.concat(buffer, ignore_index=True),
task_id=task_id,
sub_task_id=subtask_id,
subtask_interface=subtask_interface,
bucket_name=bucket_name,
)
buffer = []
count = 0
create_batch_and_send_to_address2uprn(
batch_df=group_df,
task_id=task_id,
sub_task_id=subtask_id,
subtask_interface=subtask_interface,
bucket_name=bucket_name,
)
continue
def handler(event, context):
print("hello Postcode splitter world")
return {"statusCode": 200, "body": "hello world"}
# If adding would exceed batch → flush first
if count + group_len > batch_size:
create_batch_and_send_to_address2uprn(
batch_df=pd.concat(buffer, ignore_index=True),
task_id=task_id,
sub_task_id=subtask_id,
subtask_interface=subtask_interface,
bucket_name=bucket_name,
)
buffer = []
count = 0
# Add group
buffer.append(group_df)
count += group_len
if __name__ == "__main__":
main()
# Final flush
if buffer:
create_batch_and_send_to_address2uprn(
batch_df=pd.concat(buffer, ignore_index=True),
task_id=task_id,
sub_task_id=subtask_id,
subtask_interface=subtask_interface,
bucket_name=bucket_name,
)
# Mark subtask as completed
subtask_interface.update_subtask_status(
subtask_id,
"completed",
outputs={"rows_processed": "completed"},
)
return {
"statusCode": 200,
"body": json.dumps(
{"processed": results, "errors": errors if errors else None}
),
}

View file

@ -1,3 +1,30 @@
# ==============================================================================
# TEMPLATE: Lambda Configuration with Optional S3 IAM Policy
# ==============================================================================
# Instructions:
# 1. Replace "REPLACE ME" with your lambda name (e.g., "my-lambda-name")
# 2. Add any additional environment variables as needed
# 3. To attach S3 IAM policies from shared state:
# - Uncomment the S3 policy attachment section below
# - Update the policy_arn to match the output from shared/main.tf
# - Available shared outputs (examples):
# - data.terraform_remote_state.shared.outputs.condition_etl_s3_read_arn
# - data.terraform_remote_state.shared.outputs.postcode_splitter_s3_read_arn
# 4. To create a NEW S3 policy:
# - Add a new module "lambda_s3_policy" in shared/main.tf using the
# s3_iam_policy module (see examples in shared/main.tf)
# - Then reference it here using data.terraform_remote_state.shared.outputs
# ==============================================================================
data "terraform_remote_state" "shared" {
backend = "s3"
config = {
bucket = "assessment-model-terraform-state"
key = "env:/${var.stage}/terraform.tfstate"
region = "eu-west-2"
}
}
module "lambda" {
source = "../modules/lambda_with_sqs"
@ -12,3 +39,25 @@ module "lambda" {
LOG_LEVEL = "info"
}
}
# ======================================================================
# OPTIONAL: Attach S3 IAM policy to Lambda execution role
# ======================================================================
# Uncomment and configure the resource below to attach S3 permissions
#
# Example 1: Attach existing policy from shared state
# resource "aws_iam_role_policy_attachment" "lambda_s3_policy" {
# role = module.lambda.role_name
# policy_arn = data.terraform_remote_state.shared.outputs.YOUR_POLICY_OUTPUT_NAME_arn
# }
#
# Example 2: Attach multiple policies
# resource "aws_iam_role_policy_attachment" "lambda_read_policy" {
# role = module.lambda.role_name
# policy_arn = data.terraform_remote_state.shared.outputs.postcode_splitter_s3_read_arn
# }
#
# resource "aws_iam_role_policy_attachment" "lambda_write_policy" {
# role = module.lambda.role_name
# policy_arn = data.terraform_remote_state.shared.outputs.another_policy_arn
# }

View file

@ -1,3 +1,19 @@
data "terraform_remote_state" "shared" {
backend = "s3"
config = {
bucket = "assessment-model-terraform-state"
key = "env:/${var.stage}/terraform.tfstate"
region = "eu-west-2"
}
}
data "aws_secretsmanager_secret_version" "db_credentials" {
secret_id = "${var.stage}/assessment_model/db_credentials"
}
locals {
db_credentials = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string)
}
module "address2uprn" {
source = "../modules/lambda_with_sqs"
@ -6,9 +22,32 @@ module "address2uprn" {
image_uri = local.image_uri
environment = {
STAGE = var.stage
LOG_LEVEL = "info"
}
environment = merge(
{
STAGE = var.stage
LOG_LEVEL = "info"
DB_USERNAME = local.db_credentials.db_assessment_model_username
DB_PASSWORD = local.db_credentials.db_assessment_model_password
GOOGLE_SOLAR_API_KEY = "test"
SAP_PREDICTIONS_BUCKET = "test"
CARBON_PREDICTIONS_BUCKET = "test"
HEAT_PREDICTIONS_BUCKET = "test"
HEATING_KWH_PREDICTIONS_BUCKET = "test"
HOTWATER_KWH_PREDICTIONS_BUCKET = "test"
API_KEY = "test"
ENVIRONMENT = "test"
SECRET_KEY = "test"
PLAN_TRIGGER_BUCKET = "test"
DATA_BUCKET = "test"
ENGINE_SQS_URL = "test"
ENERGY_ASSESSMENTS_BUCKET = "test"
S3_BUCKET_NAME = data.terraform_remote_state.shared.outputs.retrofit_sap_data_bucket_name
},
)
}
# Attach S3 read policy to the Lambda execution role
resource "aws_iam_role_policy_attachment" "address2uprn_read_and_write" {
role = module.address2uprn.role_name
policy_arn = data.terraform_remote_state.shared.outputs.address_2_uprn_s3_read_and_write_arn
}

View file

@ -0,0 +1,14 @@
output "address2uprn_queue_url" {
value = module.address2uprn.queue_url
description = "URL of the address2UPRN SQS queue"
}
output "address2uprn_queue_arn" {
value = module.address2uprn.queue_arn
description = "ARN of the address2UPRN SQS queue"
}
output "address2uprn_lambda_arn" {
value = module.address2uprn.lambda_arn
description = "ARN of the address2UPRN Lambda function"
}

View file

@ -23,7 +23,6 @@ module "lambda" {
stage = var.stage
image_uri = local.image_uri
timeout = 180
environment = merge(

View file

@ -9,3 +9,4 @@ output "queue_arn" {
output "queue_url" {
value = module.queue.queue_url
}

View file

@ -1,3 +1,30 @@
data "terraform_remote_state" "shared" {
backend = "s3"
config = {
bucket = "assessment-model-terraform-state"
key = "env:/${var.stage}/terraform.tfstate"
region = "eu-west-2"
}
}
data "aws_secretsmanager_secret_version" "db_credentials" {
secret_id = "${var.stage}/assessment_model/db_credentials"
}
locals {
db_credentials = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string)
}
# Reference the existing address2UPRN Lambda outputs from address2uprn state
data "terraform_remote_state" "address2uprn" {
backend = "s3"
config = {
bucket = "address2uprn-terraform-state"
key = "env:/${var.stage}/terraform.tfstate"
region = "eu-west-2"
}
}
module "lambda" {
source = "../modules/lambda_with_sqs"
@ -7,8 +34,56 @@ module "lambda" {
image_uri = local.image_uri
environment = {
STAGE = var.stage
LOG_LEVEL = "info"
}
environment = merge(
{
STAGE = var.stage
LOG_LEVEL = "info"
DB_USERNAME = local.db_credentials.db_assessment_model_username
DB_PASSWORD = local.db_credentials.db_assessment_model_password
GOOGLE_SOLAR_API_KEY = "test"
SAP_PREDICTIONS_BUCKET = "test"
CARBON_PREDICTIONS_BUCKET = "test"
HEAT_PREDICTIONS_BUCKET = "test"
HEATING_KWH_PREDICTIONS_BUCKET = "test"
HOTWATER_KWH_PREDICTIONS_BUCKET = "test"
API_KEY = "test"
ENVIRONMENT = "test"
SECRET_KEY = "test"
PLAN_TRIGGER_BUCKET = "test"
DATA_BUCKET = "test"
EPC_AUTH_TOKEN = "test"
ENGINE_SQS_URL = "test"
ENERGY_ASSESSMENTS_BUCKET = "test"
ADDRESS2UPRN_QUEUE_URL = data.terraform_remote_state.address2uprn.outputs.address2uprn_queue_url
S3_BUCKET_NAME = data.terraform_remote_state.shared.outputs.retrofit_sap_data_bucket_name
},
)
}
# Attach S3 read policy to the Lambda execution role
resource "aws_iam_role_policy_attachment" "postcode_splitter_s3_read" {
role = module.lambda.role_name
policy_arn = data.terraform_remote_state.shared.outputs.postcode_splitter_s3_read_arn
}
# Create SQS send policy for address2UPRN queue
module "postcode_splitter_sqs_policy" {
source = "../../modules/general_iam_policy"
policy_name = "postcode-splitter-sqs-send-${var.stage}"
policy_description = "Allow postcode-splitter Lambda to send messages to address2UPRN queue"
actions = [
"sqs:SendMessage"
]
resources = [
data.terraform_remote_state.address2uprn.outputs.address2uprn_queue_arn
]
}
# Attach SQS policy to the Lambda execution role
resource "aws_iam_role_policy_attachment" "postcode_splitter_sqs_send" {
role = module.lambda.role_name
policy_arn = module.postcode_splitter_sqs_policy.policy_arn
}

View file

@ -24,3 +24,12 @@ locals {
output "resolved_image_uri" {
value = local.image_uri
}

View file

@ -0,0 +1,21 @@
# IAM Policy with dynamic actions and resources
resource "aws_iam_policy" "policy" {
name = var.policy_name
description = var.policy_description
policy = jsonencode({
Version = "2012-10-17"
Statement = [
merge(
{
Effect = "Allow"
Action = var.actions
Resource = var.resources
},
var.conditions != null ? { Condition = var.conditions } : {}
)
]
})
tags = var.tags
}

View file

@ -0,0 +1,9 @@
output "policy_arn" {
value = aws_iam_policy.policy.arn
description = "ARN of the created IAM policy"
}
output "policy_name" {
value = aws_iam_policy.policy.name
description = "Name of the created IAM policy"
}

View file

@ -0,0 +1,32 @@
variable "policy_name" {
description = "Name of the IAM policy"
type = string
}
variable "policy_description" {
description = "Description of the IAM policy"
type = string
default = ""
}
variable "actions" {
description = "List of IAM actions allowed by this policy"
type = list(string)
}
variable "resources" {
description = "List of AWS resources this policy applies to"
type = list(string)
}
variable "conditions" {
description = "Optional IAM policy conditions"
type = any
default = null
}
variable "tags" {
description = "Tags to apply to the policy"
type = map(string)
default = {}
}

View file

@ -19,19 +19,3 @@ resource "aws_iam_role_policy_attachment" "basic_logs" {
policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}
resource "aws_iam_role_policy" "ecr_pull" {
role = aws_iam_role.this.name
policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Effect = "Allow"
Action = [
"ecr:GetAuthorizationToken",
"ecr:BatchGetImage",
"ecr:GetDownloadUrlForLayer"
]
Resource = "*"
}]
})
}

View file

@ -0,0 +1,31 @@
# Dynamically build S3 resources list from bucket ARNs and resource paths
locals {
# Generate full resource ARNs by combining bucket ARNs with resource paths
resources = flatten([
for bucket_arn in var.bucket_arns : [
for path in var.resource_paths : "${bucket_arn}${path}"
]
])
}
# IAM Policy with dynamic actions and resources
resource "aws_iam_policy" "s3_policy" {
name = var.policy_name
description = var.policy_description
policy = jsonencode({
Version = "2012-10-17"
Statement = [
merge(
{
Effect = "Allow"
Action = var.actions
Resource = local.resources
},
var.conditions != null ? { Condition = var.conditions } : {}
)
]
})
tags = var.tags
}

View file

@ -0,0 +1,14 @@
output "policy_arn" {
description = "ARN of the S3 IAM policy"
value = aws_iam_policy.s3_policy.arn
}
output "policy_name" {
description = "Name of the S3 IAM policy"
value = aws_iam_policy.s3_policy.name
}
output "policy_id" {
description = "ID of the S3 IAM policy"
value = aws_iam_policy.s3_policy.id
}

View file

@ -0,0 +1,42 @@
variable "policy_name" {
description = "Name of the IAM policy"
type = string
}
variable "policy_description" {
description = "Description of the IAM policy"
type = string
default = ""
}
variable "bucket_arns" {
description = "List of S3 bucket ARNs to grant access to"
type = list(string)
}
variable "actions" {
description = "List of S3 actions to allow (e.g., ['s3:GetObject'], ['s3:PutObject'], ['s3:DeleteObject'])"
type = list(string)
default = ["s3:GetObject"]
}
variable "resource_paths" {
description = "List of resource paths within buckets (e.g., ['/*'] for all objects, ['/specific-prefix/*'] for specific prefix)"
type = list(string)
default = ["/*"]
}
variable "conditions" {
description = "Optional IAM policy conditions to apply to the statement"
type = any
default = null
}
variable "tags" {
description = "Tags to apply to the policy"
type = map(string)
default = {}
}

View file

@ -133,6 +133,11 @@ module "retrofit_sap_data" {
allowed_origins = var.allowed_origins
}
output "retrofit_sap_data_bucket_name" {
value = module.retrofit_sap_data.bucket_name
description = "Name of the retrofit SAP data bucket"
}
module "retrofit_carbon_predictions" {
source = "../modules/s3"
bucketname = "retrofit-carbon-predictions-${var.stage}"
@ -305,6 +310,21 @@ module "address2uprn_registry" {
}
# S3 policy for postcode splitter to read from retrofit data bucket
module "address2uprn_s3_read_and_write" {
source = "../modules/s3_iam_policy"
policy_name = "Address2UPRNReadandWriteS3"
policy_description = "Allow address2uprn Lambda to read and write from retrofit-data bucket"
bucket_arns = ["arn:aws:s3:::retrofit-data-${var.stage}"]
actions = ["s3:GetObject", "s3:ListBucket", "s3:PutObject"]
resource_paths = ["/*"]
}
output "address_2_uprn_s3_read_and_write_arn" {
value = module.address2uprn_s3_read_and_write.policy_arn
}
################################################
# Condition ETL Lambda ECR
################################################
@ -321,6 +341,28 @@ module "condition_etl_registry" {
}
# Condition Data S3 Bucket to store initial data
module "condition_data_bucket" {
source = "../modules/s3"
bucketname = "condition-data-${var.stage}"
allowed_origins = var.allowed_origins
}
module "condition_etl_s3_read" {
source = "../modules/s3_iam_policy"
policy_name = "ConditionETLReadS3"
policy_description = "Allow Lambda to read objects from condition-data-${var.stage}"
bucket_arns = ["arn:aws:s3:::condition-data-${var.stage}"]
actions = ["s3:GetObject"]
resource_paths = ["/*"]
}
output "condition_etl_s3_read_arn" {
value = module.condition_etl_s3_read.policy_arn
}
################################################
# Postcode Splitter Lambda ECR
################################################
@ -337,30 +379,17 @@ module "postcode_splitter_registry" {
}
################################################
# Conidition data S3 bucket
################################################
module "condition_data_bucket" {
source = "../modules/s3"
bucketname = "condition-data-${var.stage}"
allowed_origins = var.allowed_origins
# S3 policy for postcode splitter to read from retrofit data bucket
module "postcode_splitter_s3_read" {
source = "../modules/s3_iam_policy"
policy_name = "PostcodeSplitterReadS3"
policy_description = "Allow postcode splitter Lambda to read from retrofit-data bucket"
bucket_arns = ["arn:aws:s3:::retrofit-data-${var.stage}"]
actions = ["s3:GetObject", "s3:ListBucket", "s3:PutObject"]
resource_paths = ["/*"]
}
resource "aws_iam_policy" "condition_etl_s3_read" {
name = "ConditionETLReadS3"
description = "Allow Lambda to read objects from condition-data-${var.stage}"
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = ["s3:GetObject"]
Resource = "arn:aws:s3:::condition-data-${var.stage}/*"
}
]
})
}
output "condition_etl_s3_read_arn" {
value = aws_iam_policy.condition_etl_s3_read.arn
output "postcode_splitter_s3_read_arn" {
value = module.postcode_splitter_s3_read.policy_arn
}

View file

@ -36,6 +36,8 @@ scenario_names = {
1059: "EPC C - 10k budget",
}
project_name = "manchester"
def get_data(portfolio_id, scenario_ids):
session = sessionmaker(bind=db_engine)()
@ -232,7 +234,7 @@ for scenario_id in SCENARIOS:
# Get recs for this scenario
recommended_measures_df = recommendations_df[
recommendations_df["scenario_id"] == scenario_id
][["property_id", "measure_type", "estimated_cost", "default"]]
][["property_id", "measure_type", "estimated_cost", "default"]]
recommended_measures_df = recommended_measures_df[
recommended_measures_df["default"]
]
@ -240,7 +242,7 @@ for scenario_id in SCENARIOS:
post_install_sap = recommendations_df[
recommendations_df["scenario_id"] == scenario_id
][["property_id", "default", "sap_points"]]
][["property_id", "default", "sap_points"]]
post_install_sap = post_install_sap[post_install_sap["default"]]
# Sum up the sap points by property id
post_install_sap = (
@ -286,6 +288,8 @@ for scenario_id in SCENARIOS:
"current_sap_points",
"total_floor_area",
"number_of_rooms",
"lodgement_date",
"is_expired",
"id",
]
]
@ -303,7 +307,58 @@ for scenario_id in SCENARIOS:
)
df["uprn"] = df["uprn"].astype(str)
relevant_plans = plans_df[plans_df["scenario_id"] == scenario_id]
df2 = df.merge(
relevant_plans[["property_id", "post_sap_points", "post_epc_rating"]],
how="left",
on="property_id",
suffixes=("", "_plan"),
)
print(df2["predicted_post_works_epc"].value_counts())
print(df2["post_epc_rating"].value_counts())
z = df2[
(df2["predicted_post_works_epc"] != "D")
& (df2["post_epc_rating"].astype(str) == "Epc.D")
]
df2["predicted_post_works_epc"].value_counts()
df2["post_epc_rating"].astype(str).value_counts()
df2[df2["total_retrofit_cost"] > 0].shape
getting_works = df[df["total_retrofit_cost"] > 0]
getting_works["predicted_post_works_epc"].value_counts()
32565 / getting_works.shape[0]
df[df["predicted_post_works_sap"] == ""]
# Expected columns list
expected_columns = [
"suspended_floor_insulation",
"solid_floor_insulation",
"external_wall_insulation",
"internal_wall_insulation",
"cavity_wall_insulation",
"loft_insulation",
"flat_roof_insulation",
"room_roof_insulation",
"secondary_glazing",
"double_glazing",
"solar_pv",
"high_heat_retention_storage_heaters",
"air_source_heat_pump",
"boiler_upgrade",
"roomstat_programmer_trvs",
"time_temperature_zone_control",
]
# Add missing columns with default values
for col in expected_columns:
if col not in df.columns:
df[col] = ""
# Create excel to store to
filename = f"{scenario_names[scenario_id]} - 20250113 final.xlsx"
filename = f"{scenario_names[scenario_id]} - {project_name}.xlsx"
with pd.ExcelWriter(filename) as writer:
df.to_excel(writer, sheet_name="properties", index=False)

View file

@ -3,12 +3,62 @@ import boto3
import csv
import pandas as pd
from io import BytesIO, StringIO
from urllib.parse import unquote
from utils.logger import setup_logger
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
logger = setup_logger()
def parse_s3_uri(s3_uri: str) -> tuple[str, str]:
"""
Parse S3 URI to extract bucket and key.
Supports two formats:
1. S3 URI format: s3://bucket/key
2. AWS console URL format with query parameters
"""
logger.info("Parsing S3 URI")
try:
# Check if it's an S3 URI format
if s3_uri.startswith("s3://"):
parts = s3_uri[5:].split("/", 1)
if len(parts) < 2:
raise ValueError("S3 URI must include both bucket and key")
bucket = parts[0]
key = parts[1]
logger.info(f"Extracted bucket: {bucket}, key: {key}")
return bucket, key
# Otherwise, treat as AWS console URL
logger.info("Parsing as AWS console URL")
# Split base URL and query string
if "?" not in s3_uri:
raise ValueError("No query string found")
base, query = s3_uri.split("?", 1)
# Extract bucket from base URL
if "/s3/object/" not in base:
raise ValueError("No '/s3/object/' found in URL path")
path_parts = base.split("/s3/object/")
bucket = path_parts[1]
logger.info(f"Extracted bucket: {bucket}")
# Extract prefix from query parameters
params = dict(item.split("=") for item in query.split("&") if "=" in item)
key = unquote(params.get("prefix", ""))
logger.info(f"Extracted key: {key}")
return bucket, key
except Exception as e:
logger.error(f"Error parsing S3 URI: {type(e).__name__}: {e}")
raise ValueError(f"Could not parse S3 URI") from e
def read_from_s3(bucket_name, s3_file_name):
"""
Read an object from s3. Decoding of the data is left for outside of this function