mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
Merge branch 'main' into deploy-categorisation
This commit is contained in:
commit
a78981fd0b
24 changed files with 885 additions and 306 deletions
|
|
@ -22,7 +22,9 @@
|
|||
"jgclark.vscode-todo-highlight",
|
||||
"corentinartaud.pdfpreview",
|
||||
"ms-python.vscode-python-envs",
|
||||
"ms-python.black-formatter"
|
||||
"ms-python.black-formatter",
|
||||
"GrapeCity.gc-excelviewer",
|
||||
"jakobhoeg.vscode-pokemon"
|
||||
],
|
||||
"settings": {
|
||||
"files.defaultWorkspace": "/workspaces/model",
|
||||
|
|
|
|||
|
|
@ -43,4 +43,17 @@ WORKDIR /workspaces/model
|
|||
|
||||
# 6) Make Python find your package
|
||||
# Add project root to PYTHONPATH for all processes
|
||||
ENV PYTHONPATH=/workspaces/model:${PYTHONPATH}
|
||||
ENV PYTHONPATH=/workspaces/model:${PYTHONPATH}
|
||||
|
||||
|
||||
# Install terraform
|
||||
RUN apt-get update && sudo apt-get install -y gnupg software-properties-common
|
||||
RUN wget -O- https://apt.releases.hashicorp.com/gpg | \
|
||||
gpg --dearmor | \
|
||||
sudo tee /usr/share/keyrings/hashicorp-archive-keyring.gpg > /dev/null
|
||||
RUN echo "deb [signed-by=/usr/share/keyrings/hashicorp-archive-keyring.gpg] \
|
||||
https://apt.releases.hashicorp.com $(lsb_release -cs) main" | \
|
||||
tee /etc/apt/sources.list.d/hashicorp.list
|
||||
RUN apt update
|
||||
RUN apt-get install terraform
|
||||
RUN terraform -install-autocomplete
|
||||
|
|
@ -9,7 +9,7 @@ mangum==0.19.0
|
|||
# AWS
|
||||
boto3==1.35.44
|
||||
# Data
|
||||
openpyxl==3.1.2
|
||||
openpyxl==3.1.5
|
||||
# Basic
|
||||
pytz
|
||||
uvicorn[standard]
|
||||
|
|
|
|||
14
.github/workflows/_build_image.yml
vendored
14
.github/workflows/_build_image.yml
vendored
|
|
@ -38,6 +38,8 @@ on:
|
|||
required: false
|
||||
DEV_DB_NAME:
|
||||
required: false
|
||||
EPC_AUTH_TOKEN:
|
||||
required: false
|
||||
|
||||
jobs:
|
||||
build:
|
||||
|
|
@ -47,6 +49,7 @@ jobs:
|
|||
DEV_DB_HOST: ${{ secrets.DEV_DB_HOST }}
|
||||
DEV_DB_PORT: ${{ secrets.DEV_DB_PORT }}
|
||||
DEV_DB_NAME: ${{ secrets.DEV_DB_NAME }}
|
||||
EPC_AUTH_TOKEN: ${{ secrets.EPC_AUTH_TOKEN }}
|
||||
|
||||
outputs:
|
||||
image_digest: ${{ steps.digest.outputs.image_digest }}
|
||||
|
|
@ -87,14 +90,17 @@ jobs:
|
|||
temp=$(eval echo "$line")
|
||||
BUILD_ARGS="$BUILD_ARGS --build-arg $temp"
|
||||
done <<< "${{ inputs.build_args }}"
|
||||
|
||||
docker build \
|
||||
|
||||
docker buildx build \
|
||||
--no-cache \
|
||||
--platform linux/amd64 \
|
||||
--provenance=false \
|
||||
--sbom=false \
|
||||
--push \
|
||||
-f ${{ inputs.dockerfile_path }} \
|
||||
$BUILD_ARGS \
|
||||
-t $IMAGE_URI \
|
||||
${{ inputs.build_context }}
|
||||
|
||||
docker push $IMAGE_URI
|
||||
|
||||
- name: Resolve image digest
|
||||
id: digest
|
||||
|
|
|
|||
8
.github/workflows/_deploy_lambda.yml
vendored
8
.github/workflows/_deploy_lambda.yml
vendored
|
|
@ -106,4 +106,10 @@ jobs:
|
|||
- name: Terraform Destroy
|
||||
if: inputs.terraform_destroy == 'true' && inputs.terraform_apply != 'true'
|
||||
working-directory: ${{ inputs.lambda_path }}
|
||||
run: terraform destroy -auto-approve
|
||||
run: |
|
||||
terraform destroy -auto-approve \
|
||||
-var="stage=${{ inputs.stage }}" \
|
||||
-var="lambda_name=${{ inputs.lambda_name }}" \
|
||||
-var="ecr_repo_url=${{ steps.repo.outputs.ecr_repo_url }}" \
|
||||
-var="image_digest=${{ inputs.image_digest }}"
|
||||
|
||||
|
|
|
|||
1
.github/workflows/deploy_fastapi_backend.yml
vendored
1
.github/workflows/deploy_fastapi_backend.yml
vendored
|
|
@ -141,3 +141,4 @@ jobs:
|
|||
|
||||
# Deploy to AWS Lambda via Serverless
|
||||
sls deploy --stage ${{ github.ref_name }} --verbose
|
||||
|
||||
|
|
|
|||
19
.github/workflows/deploy_terraform.yml
vendored
19
.github/workflows/deploy_terraform.yml
vendored
|
|
@ -9,6 +9,7 @@ on:
|
|||
- '.github/workflows/deploy_terraform.yml'
|
||||
- '.github/workflows/_build_image.yml'
|
||||
- '.github/workflows/_deploy_lambda.yml'
|
||||
workflow_dispatch:
|
||||
|
||||
jobs:
|
||||
determine_stage:
|
||||
|
|
@ -76,10 +77,10 @@ jobs:
|
|||
run: terraform plan -var-file=${STAGE}.tfvars -out=tfplan
|
||||
|
||||
- name: Terraform Apply
|
||||
if: env.STAGE == 'prod'
|
||||
if: env.TERRAFORM_APPLY == 'true'
|
||||
working-directory: infrastructure/terraform/shared
|
||||
run: terraform apply -auto-approve tfplan
|
||||
|
||||
|
||||
# ============================================================
|
||||
# 2️⃣ Build Address 2 UPRN image and Push
|
||||
# ============================================================
|
||||
|
|
@ -90,10 +91,19 @@ jobs:
|
|||
ecr_repo: address2uprn-${{ needs.determine_stage.outputs.stage }}
|
||||
dockerfile_path: backend/address2UPRN/handler/Dockerfile
|
||||
build_context: .
|
||||
build_args: |
|
||||
DEV_DB_HOST=$DEV_DB_HOST
|
||||
DEV_DB_PORT=$DEV_DB_PORT
|
||||
DEV_DB_NAME=$DEV_DB_NAME
|
||||
EPC_AUTH_TOKEN=$EPC_AUTH_TOKEN
|
||||
secrets:
|
||||
AWS_ACCESS_KEY_ID: ${{ secrets.DEV_AWS_ACCESS_KEY_ID }}
|
||||
AWS_SECRET_ACCESS_KEY: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY }}
|
||||
AWS_REGION: ${{ secrets.DEV_AWS_REGION }}
|
||||
DEV_DB_HOST: ${{ secrets.DEV_DB_HOST }}
|
||||
DEV_DB_PORT: ${{ secrets.DEV_DB_PORT }}
|
||||
DEV_DB_NAME: ${{ secrets.DEV_DB_NAME }}
|
||||
EPC_AUTH_TOKEN: ${{ secrets.DEV_EPC_AUTH_TOKEN }}
|
||||
|
||||
# ============================================================
|
||||
# 3️⃣ Deploy Address 2 UPRN Lambda
|
||||
|
|
@ -140,7 +150,7 @@ jobs:
|
|||
# 3️⃣ Deploy Postcode Splitter Lambda
|
||||
# ============================================================
|
||||
postcodeSplitter_lambda:
|
||||
needs: [postcodeSplitter_image, determine_stage]
|
||||
needs: [postcodeSplitter_image, determine_stage, address2uprn_lambda]
|
||||
uses: ./.github/workflows/_deploy_lambda.yml
|
||||
with:
|
||||
lambda_name: postcodeSplitter
|
||||
|
|
@ -192,4 +202,5 @@ jobs:
|
|||
secrets:
|
||||
AWS_ACCESS_KEY_ID: ${{ secrets.DEV_AWS_ACCESS_KEY_ID }}
|
||||
AWS_SECRET_ACCESS_KEY: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY }}
|
||||
AWS_REGION: ${{ secrets.DEV_AWS_REGION }}
|
||||
AWS_REGION: ${{ secrets.DEV_AWS_REGION }}
|
||||
|
||||
|
|
|
|||
|
|
@ -34,7 +34,7 @@ from etl.epc_clean.epc_attributes.WallAttributes import WallAttributes
|
|||
logger = setup_logger()
|
||||
|
||||
# OpenAI API Key (set this in your environment variables for security)
|
||||
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "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")
|
||||
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
|
||||
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -13,11 +13,15 @@ from asset_list.utils import get_data
|
|||
from dotenv import load_dotenv
|
||||
from backend.SearchEpc import SearchEpc
|
||||
|
||||
load_dotenv(dotenv_path="backend/.env")
|
||||
load_dotenv(dotenv_path="../backend/.env")
|
||||
EPC_AUTH_TOKEN = os.getenv(
|
||||
"EPC_AUTH_TOKEN",
|
||||
)
|
||||
|
||||
OPENAI_API_KEY = os.getenv(
|
||||
"OPENAI_API_KEY",
|
||||
)
|
||||
|
||||
|
||||
def extract_address1(
|
||||
asset_list, full_address_col, postcode_col, method="first_two_words"
|
||||
|
|
@ -109,21 +113,21 @@ def app():
|
|||
)
|
||||
data_filename = "to_standardise_uprns.xlsx"
|
||||
sheet_name = "Sheet1"
|
||||
postcode_column = "Postcode"
|
||||
postcode_column = "POSTCODE"
|
||||
address1_column = None
|
||||
address1_method = "house_number_extraction"
|
||||
fulladdress_column = "Address"
|
||||
address_cols_to_concat = None
|
||||
fulladdress_column = "ADDRESS"
|
||||
address_cols_to_concat = []
|
||||
missing_postcodes_method = None
|
||||
landlord_year_built = None
|
||||
landlord_os_uprn = None
|
||||
landlord_property_type = None
|
||||
landlord_built_form = None
|
||||
landlord_wall_construction = None
|
||||
landlord_roof_construction = None
|
||||
landlord_heating_system = None
|
||||
landlord_property_type = "PROPERTY TYPE"
|
||||
landlord_built_form = None # Skipped as empty
|
||||
landlord_wall_construction = "wall combined" # combin F + G
|
||||
landlord_roof_construction = "HEATING SYSTEM" # Combine I + J
|
||||
landlord_heating_system = None # Check with Khalim
|
||||
landlord_existing_pv = None
|
||||
landlord_property_id = "LLUPRN"
|
||||
landlord_property_id = "UPRN"
|
||||
landlord_sap = None
|
||||
outcomes_filename = None
|
||||
outcomes_sheetname = None
|
||||
|
|
@ -275,7 +279,7 @@ def app():
|
|||
if skip is not None and not force_retrieve_data:
|
||||
if i <= skip:
|
||||
continue
|
||||
chunk = asset_list.standardised_asset_list[i: i + chunk_size]
|
||||
chunk = asset_list.standardised_asset_list[i : i + chunk_size]
|
||||
epc_data_chunk, errors_chunk, no_epc_chunk = get_data(
|
||||
df=chunk,
|
||||
row_id_name=asset_list.DOMNA_PROPERTY_ID,
|
||||
|
|
@ -418,7 +422,7 @@ def app():
|
|||
# Retrieve just the data we need
|
||||
epc_df = epc_df[
|
||||
[asset_list.DOMNA_PROPERTY_ID] + list(asset_list.EPC_API_DATA_NAMES.keys())
|
||||
].rename(columns=asset_list.EPC_API_DATA_NAMES)
|
||||
].rename(columns=asset_list.EPC_API_DATA_NAMES)
|
||||
|
||||
# Look for columns not in the find my EPC data, which will have happened if we didn't
|
||||
# retrieve it in the first place
|
||||
|
|
@ -435,7 +439,7 @@ def app():
|
|||
find_my_epc_data[
|
||||
[asset_list.DOMNA_PROPERTY_ID, "epc_has_floor_recommendation"]
|
||||
+ list(asset_list.FIND_EPC_DATA_NAMES.keys())
|
||||
].rename(columns=asset_list.FIND_EPC_DATA_NAMES),
|
||||
].rename(columns=asset_list.FIND_EPC_DATA_NAMES),
|
||||
how="left",
|
||||
on=asset_list.DOMNA_PROPERTY_ID,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -19,4 +19,4 @@ PLAN_TRIGGER_BUCKET=test
|
|||
DATA_BUCKET=test
|
||||
EPC_AUTH_TOKEN=test
|
||||
ENGINE_SQS_URL=test
|
||||
ENERGY_ASSESSMENTS_BUCKET=test
|
||||
ENERGY_ASSESSMENTS_BUCKET=test
|
||||
|
|
|
|||
|
|
@ -1,4 +1,17 @@
|
|||
FROM public.ecr.aws/lambda/python:3.10
|
||||
# FROM python:3.11.10-bullseye
|
||||
|
||||
|
||||
ARG DEV_DB_HOST
|
||||
ARG DEV_DB_PORT
|
||||
ARG DEV_DB_NAME
|
||||
ARG EPC_AUTH_TOKEN
|
||||
|
||||
ENV DB_HOST=${DEV_DB_HOST}
|
||||
ENV DB_PORT=${DEV_DB_PORT}
|
||||
ENV DB_NAME=${DEV_DB_NAME}
|
||||
ENV EPC_AUTH_TOKEN=${EPC_AUTH_TOKEN}
|
||||
|
||||
|
||||
# Set working directory (Lambda task root)
|
||||
WORKDIR /var/task
|
||||
|
|
@ -8,13 +21,17 @@ WORKDIR /var/task
|
|||
# -----------------------------
|
||||
COPY backend/address2UPRN/handler/requirements.txt .
|
||||
|
||||
|
||||
# Install dependencies into Lambda runtime
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
|
||||
# -----------------------------
|
||||
# Copy application code
|
||||
# -----------------------------
|
||||
|
||||
# Copy necessary files for database and utility imports
|
||||
COPY utils/ utils/
|
||||
COPY backend/ backend/
|
||||
COPY datatypes/ datatypes/
|
||||
|
||||
# Copy the handler
|
||||
COPY backend/address2UPRN/main.py .
|
||||
|
||||
# -----------------------------
|
||||
|
|
|
|||
|
|
@ -1,3 +1,11 @@
|
|||
epc-api-python==1.0.2
|
||||
pandas==2.2.2
|
||||
numpy<2.0
|
||||
requests
|
||||
tqdm
|
||||
pandas
|
||||
openpyxl
|
||||
epc-api-python==1.0.2
|
||||
boto3==1.35.44
|
||||
sqlmodel
|
||||
sqlalchemy==2.0.36
|
||||
psycopg2-binary==2.9.10
|
||||
pydantic-settings==2.6.0
|
||||
|
|
@ -3,12 +3,23 @@ import os
|
|||
from urllib.parse import urlencode
|
||||
import pandas as pd
|
||||
from difflib import SequenceMatcher
|
||||
from tqdm import tqdm
|
||||
from utils.logger import setup_logger
|
||||
import re
|
||||
from typing import Set
|
||||
import json
|
||||
import requests
|
||||
from uuid import UUID
|
||||
import uuid
|
||||
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
|
||||
from utils.s3 import (
|
||||
save_csv_to_s3,
|
||||
read_csv_from_s3 as read_csv_from_s3_dict,
|
||||
parse_s3_uri,
|
||||
)
|
||||
from datetime import datetime
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
import re
|
||||
|
||||
EPC_AUTH_TOKEN = os.getenv(
|
||||
"EPC_AUTH_TOKEN",
|
||||
|
|
@ -17,9 +28,28 @@ EPC_AUTH_TOKEN = os.getenv(
|
|||
if EPC_AUTH_TOKEN is None:
|
||||
raise RuntimeError("EPC_AUTH_TOKEN not defined in env")
|
||||
|
||||
import re
|
||||
from difflib import SequenceMatcher
|
||||
from typing import Set
|
||||
|
||||
def is_valid_postcode(postcode_clean: str) -> bool:
|
||||
"""
|
||||
Validate postcode using postcodes.io.
|
||||
|
||||
Expects a sanitised postcode (e.g. E84SQ).
|
||||
Returns True if valid, False otherwise.
|
||||
"""
|
||||
POSTCODES_IO_VALIDATE_URL = "https://api.postcodes.io/postcodes/{postcode}/validate"
|
||||
if not postcode_clean:
|
||||
return False
|
||||
|
||||
try:
|
||||
resp = requests.get(
|
||||
POSTCODES_IO_VALIDATE_URL.format(postcode=postcode_clean),
|
||||
timeout=5,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
return resp.json().get("result", False)
|
||||
except requests.RequestException:
|
||||
# Network issues, rate limits, etc.
|
||||
return False
|
||||
|
||||
|
||||
def levenshtein(a: str, b: str) -> float:
|
||||
|
|
@ -300,27 +330,29 @@ def get_uprn_candidates(
|
|||
)
|
||||
|
||||
|
||||
def get_uprn(user_inputed_address: str, postcode: str, return_address=False):
|
||||
def get_uprn_with_epc_df(
|
||||
user_inputed_address: str,
|
||||
epc_df: pd.DataFrame,
|
||||
verbose: bool = False,
|
||||
):
|
||||
"""
|
||||
Return uprn (str)
|
||||
Return False if failed to find a sensible matching epc
|
||||
Return Nons when epc found but no UPRN
|
||||
Return uprn (str) using a pre-fetched EPC dataframe.
|
||||
This avoids calling the API multiple times for the same postcode.
|
||||
"""
|
||||
df = get_epc_data_with_postcode(postcode=postcode)
|
||||
|
||||
if df.empty:
|
||||
if epc_df.empty:
|
||||
return None
|
||||
|
||||
scored_df = get_uprn_candidates(
|
||||
df,
|
||||
epc_df,
|
||||
user_address=user_inputed_address,
|
||||
)
|
||||
|
||||
# Best score
|
||||
best_score = scored_df.iloc[0]["lexiscore"]
|
||||
|
||||
if best_score <= 0:
|
||||
return None
|
||||
# # Return None if score is below threshold
|
||||
# if best_score < 0.7:
|
||||
# return None
|
||||
|
||||
# All rank-1 rows (possible draw)
|
||||
top_rank_df = scored_df[scored_df["lexirank"] == 1]
|
||||
|
|
@ -330,18 +362,41 @@ def get_uprn(user_inputed_address: str, postcode: str, return_address=False):
|
|||
return None
|
||||
|
||||
address = top_rank_df["address"].values[0]
|
||||
lexiscore = float(top_rank_df["lexiscore"].values[0])
|
||||
score = float(top_rank_df["lexiscore"].values[0])
|
||||
|
||||
logger.info(f"Address found to be: {address}, with lexiscore {lexiscore}")
|
||||
logger.info(f"Address found to be: {address}, with lexiscore {score}")
|
||||
# Safe to return the agreed UPRN
|
||||
found_uprn = top_rank_df.iloc[0]["uprn"]
|
||||
|
||||
if found_uprn == "":
|
||||
return None
|
||||
|
||||
if return_address:
|
||||
return found_uprn, address
|
||||
return found_uprn
|
||||
if verbose:
|
||||
return (found_uprn, address, score)
|
||||
else:
|
||||
return found_uprn
|
||||
|
||||
|
||||
def get_uprn(
|
||||
user_inputed_address: str,
|
||||
postcode: str,
|
||||
verbose: bool = False,
|
||||
):
|
||||
"""
|
||||
Return uprn (str)
|
||||
Return False if failed to find a sensible matching epc
|
||||
Return None when epc found but no UPRN
|
||||
|
||||
This function fetches EPC data via API for a single postcode.
|
||||
For processing multiple addresses in the same postcode, use get_uprn_with_epc_df instead.
|
||||
"""
|
||||
df = get_epc_data_with_postcode(postcode=postcode)
|
||||
|
||||
return get_uprn_with_epc_df(
|
||||
user_inputed_address=user_inputed_address,
|
||||
epc_df=df,
|
||||
verbose=verbose,
|
||||
)
|
||||
|
||||
|
||||
def resolve_uprns_for_postcode_group(
|
||||
|
|
@ -424,148 +479,302 @@ def resolve_uprns_for_postcode_group(
|
|||
)
|
||||
|
||||
|
||||
def test(a, b):
|
||||
assert a == b, f"erorr: {a}{type(a)} != {b}: {type(b)}"
|
||||
def save_results_to_s3(
|
||||
results_df: pd.DataFrame, task_id: str, sub_task_id: str, bucket_name: str = None
|
||||
) -> bool:
|
||||
"""
|
||||
Save results DataFrame to S3 as CSV.
|
||||
|
||||
:param results_df: The DataFrame containing results
|
||||
:param task_id: The task ID (used for file naming)
|
||||
:param bucket_name: The S3 bucket name (defaults to env variable)
|
||||
:return: True if successful, False otherwise
|
||||
"""
|
||||
if bucket_name is None:
|
||||
bucket_name = os.getenv("S3_BUCKET_NAME")
|
||||
|
||||
if not bucket_name:
|
||||
logger.error(
|
||||
"S3 bucket name not provided and S3_BUCKET_NAME environment variable not set"
|
||||
)
|
||||
return False
|
||||
|
||||
try:
|
||||
# Create a filename with the task ID
|
||||
file_name = f"{datetime.now().isoformat()}_{str(uuid.uuid4())[:8]}"
|
||||
file_key = f"ara_raw_outputs/{task_id}/{sub_task_id}/{file_name}.csv"
|
||||
|
||||
# Save to S3
|
||||
success = save_csv_to_s3(results_df, bucket_name, file_key)
|
||||
|
||||
if success:
|
||||
logger.info(f"Successfully saved results to s3://{bucket_name}/{file_key}")
|
||||
return True
|
||||
else:
|
||||
logger.error(f"Failed to save results to S3")
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error saving results to S3: {str(e)}")
|
||||
return False
|
||||
|
||||
|
||||
def run_all_test():
|
||||
# Basic usage with different post codes styles
|
||||
test(get_epc_data_with_postcode("b93 8sy").shape[0], 63)
|
||||
test(get_epc_data_with_postcode("B938sy").shape[0], 63)
|
||||
test(get_epc_data_with_postcode("b93 8Sy").shape[0], 63)
|
||||
test(get_epc_data_with_postcode("b93 8Sy").shape[0], 63)
|
||||
def handler(event, context, local=False):
|
||||
print("=== Address2UPRN Lambda Handler ===")
|
||||
print(f"Function: {context.function_name}")
|
||||
print(f"Request ID: {context.aws_request_id}")
|
||||
|
||||
test(get_uprn("68", "b93 8sy"), "100070989938")
|
||||
test(get_uprn("68 Glendon Way", "b93 8sy"), "100070989938")
|
||||
test(get_uprn("Flat A, 28, Nelgarde Road", "se6 4tf"), "100023278633")
|
||||
test(get_uprn("28 A", "se6 4tf"), "100023278633")
|
||||
test(get_uprn("28A", "se6 4tf"), "100023278633")
|
||||
test(get_uprn("6 Aitken Close", "E8 4SQ"), False)
|
||||
# Handle local testing
|
||||
if local is True:
|
||||
event = {
|
||||
"Records": [
|
||||
{
|
||||
"body": json.dumps(
|
||||
{
|
||||
"task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917",
|
||||
"sub_task_id": "6a427b6e-1ece-4983-b1e5-9bffccc53d1d",
|
||||
"s3_uri": "s3://retrofit-data-dev/ara_postcode_splitter_batches/e31f2f21-175b-4a91-a3ec-a6baa325e917/8673913b-1a88-42d7-8578-0449123d94b0/2026-02-16T12:00:20.257856_7b520c0e.csv",
|
||||
}
|
||||
)
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
# unique case
|
||||
test(get_uprn("Flat 5, 1, Semley Gate", "e9 5nh"), "10008238198")
|
||||
test(get_uprn("5 , 1 Semley Gate", "e9 5nh"), "10008238198")
|
||||
test(get_uprn("5 Semley Gate", "e9 5nh"), "10008238198")
|
||||
test(get_uprn("1, 5 Semley Gate", "e9 5nh"), False)
|
||||
test(
|
||||
get_uprn("1 Semley Gate", "e9 5nh"), "10008238188"
|
||||
) # this one return "flat 1, in 1 semley gate"
|
||||
test(
|
||||
get_uprn("48 Oswald Street", "E5 0BT"), False
|
||||
) # this one return "flat 1, in 1 semley gate"
|
||||
test(
|
||||
get_uprn("42 Oswald Street", "E5 0BT"), False
|
||||
) # this one return "flat 1, in 1 semley gate"
|
||||
test(
|
||||
get_uprn("46 Oswald Street", "E5 0BT"), False
|
||||
) # this one return "flat 1, in 1 semley gate"
|
||||
get_uprn_candidates(get_epc_data_with_postcode("e5 0bt"), "48 Oswald Street")
|
||||
get_uprn_candidates(
|
||||
get_epc_data_with_postcode("Cr2 7dl"),
|
||||
"FLAT 3; 42 MORETON ROAD, SOUTH CROYDON, SURREY",
|
||||
)
|
||||
print(f"Event: {json.dumps(event, indent=2, default=str)}")
|
||||
print("===================================")
|
||||
|
||||
# Handle both single event and batch events (SQS, etc.)
|
||||
records = event.get("Records", [event])
|
||||
results = []
|
||||
errors = []
|
||||
subtask_interface = SubTaskInterface()
|
||||
|
||||
if __name__ == "__main__":
|
||||
INPUT_FILE = "hackney.xlsx"
|
||||
|
||||
ADDRESS_COL = "Address 1"
|
||||
POSTCODE_COL = "Postcode"
|
||||
UPRN_COL = "UPRN"
|
||||
|
||||
df = pd.read_excel(INPUT_FILE)
|
||||
|
||||
failures = []
|
||||
|
||||
for _, row in tqdm(
|
||||
df.iterrows(),
|
||||
total=len(df),
|
||||
desc="Auditing UPRNs",
|
||||
):
|
||||
input_address = str(row[ADDRESS_COL]).strip()
|
||||
postcode = str(row[POSTCODE_COL]).strip()
|
||||
|
||||
expected_uprn = None if pd.isna(row[UPRN_COL]) else str(int(row[UPRN_COL]))
|
||||
|
||||
for record in records:
|
||||
task_id = None
|
||||
subtask_id = None
|
||||
try:
|
||||
epc_df = get_epc_data_with_postcode(postcode)
|
||||
# Parse body (inputs)
|
||||
if isinstance(record.get("body"), str):
|
||||
body = json.loads(record["body"])
|
||||
else:
|
||||
body = record.get("body", {})
|
||||
|
||||
if epc_df.empty:
|
||||
failures.append(
|
||||
{
|
||||
**row.to_dict(),
|
||||
"found_uprn": None,
|
||||
"best_match_uprn": None,
|
||||
"best_match_address": None,
|
||||
"best_match_lexiscore": None,
|
||||
"status": "no_epc_results",
|
||||
}
|
||||
# Validate required fields
|
||||
task_id = body.get("task_id")
|
||||
subtask_id = body.get("sub_task_id")
|
||||
s3_uri = body.get("s3_uri")
|
||||
|
||||
if not task_id:
|
||||
errors.append({"error": "Missing required field: task_id"})
|
||||
continue
|
||||
|
||||
if not subtask_id:
|
||||
errors.append({"error": "Missing required field: sub_task_id"})
|
||||
continue
|
||||
|
||||
if not s3_uri:
|
||||
errors.append({"error": "Missing required field: s3_uri"})
|
||||
continue
|
||||
|
||||
# Convert task_id to UUID
|
||||
try:
|
||||
task_id = UUID(task_id) if isinstance(task_id, str) else task_id
|
||||
except ValueError as e:
|
||||
errors.append({"error": f"Invalid UUID format for task_id: {str(e)}"})
|
||||
continue
|
||||
|
||||
# Convert sub_task_id to UUID
|
||||
try:
|
||||
subtask_id = (
|
||||
UUID(subtask_id) if isinstance(subtask_id, str) else subtask_id
|
||||
)
|
||||
except ValueError as e:
|
||||
errors.append(
|
||||
{"error": f"Invalid UUID format for sub_task_id: {str(e)}"}
|
||||
)
|
||||
continue
|
||||
|
||||
scored_df = get_uprn_candidates(
|
||||
epc_df,
|
||||
user_address=input_address,
|
||||
)
|
||||
# Update existing subtask to 'in progress'
|
||||
subtask_interface.update_subtask_status(subtask_id, "in progress")
|
||||
logger.info(f"Processing subtask {subtask_id} for task {task_id}")
|
||||
|
||||
best_row = scored_df.iloc[0]
|
||||
# Parse S3 URI and read CSV from S3
|
||||
logger.info(f"Reading data from S3: {s3_uri}")
|
||||
try:
|
||||
bucket, key = parse_s3_uri(s3_uri)
|
||||
csv_data = read_csv_from_s3_dict(bucket, key)
|
||||
df = pd.DataFrame(csv_data)
|
||||
logger.info(f"Loaded {len(df)} rows from S3")
|
||||
except Exception as s3_error:
|
||||
logger.error(f"Failed to read data from S3: {s3_error}")
|
||||
errors.append(
|
||||
{"error": "Failed to read data from S3", "details": str(s3_error)}
|
||||
)
|
||||
try:
|
||||
subtask_interface.update_subtask_status(
|
||||
subtask_id, "failed", outputs={"error": str(s3_error)}
|
||||
)
|
||||
except Exception as db_error:
|
||||
logger.error(f"Failed to update subtask status: {db_error}")
|
||||
continue
|
||||
|
||||
best_match_uprn = str(best_row["uprn"])
|
||||
best_match_address = best_row["address"]
|
||||
best_match_lexiscore = round(float(best_row["lexiscore"]), 4)
|
||||
# Process the rows
|
||||
logger.info(f"Processing {len(df)} rows for task {task_id}")
|
||||
|
||||
found_uprn = get_uprn(input_address, postcode)
|
||||
# Create user_input column by concatenating Address columns if not already present
|
||||
if "user_input" not in df.columns:
|
||||
df["user_input"] = (
|
||||
df["Address 1"].fillna("")
|
||||
+ " "
|
||||
+ df["Address 2"].fillna("")
|
||||
+ " "
|
||||
+ df["Address 3"].fillna("")
|
||||
).str.strip()
|
||||
logger.info(f"Created user_input column from Address 1 and Address 2")
|
||||
else:
|
||||
logger.info(f"user_input column already present in data")
|
||||
|
||||
clean_df = df.dropna(subset=["postcode_clean"])
|
||||
|
||||
postcode_to_addresses = {
|
||||
postcode: group.to_dict(orient="records")
|
||||
for postcode, group in clean_df.groupby("postcode_clean", sort=False)
|
||||
}
|
||||
|
||||
logger.info(f"Total postcodes: {len(postcode_to_addresses)}")
|
||||
|
||||
# Process each postcode group
|
||||
|
||||
results_data = []
|
||||
|
||||
for postcode, postcode_rows in postcode_to_addresses.items():
|
||||
logger.info(
|
||||
f"Processing postcode: {postcode} with {len(postcode_rows)} rows"
|
||||
)
|
||||
|
||||
# Validate postcode before processing
|
||||
if not is_valid_postcode(postcode):
|
||||
logger.warning(f"Postcode {postcode} is invalid, skipping")
|
||||
continue
|
||||
|
||||
# Fetch EPC data once per postcode
|
||||
try:
|
||||
epc_df = get_epc_data_with_postcode(postcode=postcode)
|
||||
logger.info(
|
||||
f"Fetched {len(epc_df)} EPC records for postcode {postcode}"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Failed to fetch EPC data for postcode {postcode}: {e}"
|
||||
)
|
||||
continue
|
||||
|
||||
# Process each address in this postcode with the same EPC data
|
||||
for row in postcode_rows:
|
||||
try:
|
||||
user_input = row.get("user_input", "")
|
||||
if not user_input:
|
||||
logger.warning(
|
||||
f"Skipping row with missing user_input for postcode {postcode}"
|
||||
)
|
||||
continue
|
||||
|
||||
# Get UPRN using the pre-fetched EPC data with all return options
|
||||
result = get_uprn_with_epc_df(
|
||||
user_inputed_address=user_input, epc_df=epc_df, verbose=True
|
||||
)
|
||||
|
||||
# Parse result tuple if successful
|
||||
if result:
|
||||
uprn, found_address, score = result
|
||||
logger.info(
|
||||
f"Found UPRN for {user_input} in {postcode}: {uprn} (score: {score})"
|
||||
)
|
||||
|
||||
results_data.append(
|
||||
{
|
||||
**row, # Include all original data
|
||||
"uprn": uprn,
|
||||
"domna_found_address": found_address,
|
||||
"domna_lexiscore": score,
|
||||
}
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
f"No UPRN found for {user_input} in {postcode}"
|
||||
)
|
||||
results_data.append(
|
||||
{
|
||||
**row, # Include all original data
|
||||
"uprn": None,
|
||||
"domna_found_address": None,
|
||||
"domna_lexiscore": None,
|
||||
}
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Error processing address {row.get('user_input', 'unknown')}: {e}"
|
||||
)
|
||||
# Still add the row with error markers
|
||||
results_data.append(
|
||||
{
|
||||
**row,
|
||||
"uprn": None,
|
||||
"domna_found_address": None,
|
||||
"domna_lexiscore": None,
|
||||
"error": str(e),
|
||||
}
|
||||
)
|
||||
continue
|
||||
|
||||
# Create results DataFrame
|
||||
result_df = pd.DataFrame(results_data)
|
||||
|
||||
# Save results to S3
|
||||
try:
|
||||
save_results_to_s3(result_df, str(task_id), str(subtask_id))
|
||||
except Exception as s3_error:
|
||||
logger.error(f"Failed to save results to S3: {s3_error}")
|
||||
|
||||
# Mark subtask as completed
|
||||
try:
|
||||
subtask_interface.update_subtask_status(
|
||||
subtask_id,
|
||||
"completed",
|
||||
outputs={"rows_processed": "todo -> show sensible output"},
|
||||
)
|
||||
logger.info(f"Marked subtask {subtask_id} as completed")
|
||||
except Exception as db_error:
|
||||
logger.error(f"Failed to mark subtask as completed: {db_error}")
|
||||
|
||||
except Exception as e:
|
||||
failures.append(
|
||||
{
|
||||
**row.to_dict(),
|
||||
"found_uprn": None,
|
||||
"best_match_uprn": None,
|
||||
"best_match_address": None,
|
||||
"best_match_lexiscore": None,
|
||||
"status": "exception",
|
||||
"error": str(e),
|
||||
}
|
||||
)
|
||||
continue
|
||||
logger.error(f"Unexpected error processing record: {e}", exc_info=True)
|
||||
errors.append({"error": "Unexpected error", "details": str(e)})
|
||||
# Mark subtask as failed if we have one
|
||||
if subtask_id:
|
||||
try:
|
||||
subtask_interface.update_subtask_status(
|
||||
subtask_id, "failed", outputs={"error": str(e)}
|
||||
)
|
||||
except Exception as db_error:
|
||||
logger.error(f"Failed to update subtask status: {db_error}")
|
||||
|
||||
found_uprn_norm = None if not found_uprn else str(found_uprn)
|
||||
# Return error if all records failed
|
||||
logger.info(results_data)
|
||||
logger.info(results)
|
||||
if errors and not results:
|
||||
return {"statusCode": 500, "body": json.dumps({"errors": errors})}
|
||||
|
||||
if found_uprn_norm != expected_uprn:
|
||||
failures.append(
|
||||
{
|
||||
**row.to_dict(),
|
||||
"found_uprn": found_uprn_norm,
|
||||
"best_match_uprn": best_match_uprn,
|
||||
"best_match_address": best_match_address,
|
||||
"best_match_lexiscore": best_match_lexiscore,
|
||||
"status": ("no_match" if found_uprn_norm is None else "mismatch"),
|
||||
}
|
||||
)
|
||||
|
||||
failures_df = pd.DataFrame(failures)
|
||||
|
||||
print("===================================")
|
||||
print(f"Total rows : {len(df)}")
|
||||
print(f"Failures : {len(failures_df)}")
|
||||
print("===================================")
|
||||
|
||||
failures_df.to_excel(
|
||||
"hackney_uprn_failures.xlsx",
|
||||
index=False,
|
||||
)
|
||||
return {
|
||||
"statusCode": 200,
|
||||
"body": json.dumps(
|
||||
{"processed": results, "errors": errors if errors else None}
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
def handler(event, context):
|
||||
print("hello world")
|
||||
return {"statusCode": 200, "body": "hello world"}
|
||||
|
||||
|
||||
# TO do function dispatcher,
|
||||
|
||||
# get_uprn_candidates(get_epc_data_with_postcode("E9 5NH"),"Flat 1, 5 Semley Gate" and Flat 5, 1 Semley Gate)
|
||||
# fix that
|
||||
# Look again at flat 1
|
||||
# pandas reader the seperate postcode_splitter
|
||||
# dump into s3
|
||||
# TODO:
|
||||
# Don't add results to return messages as its too verbose
|
||||
# capture the exepection as e, into s3, to find the logs go to s3
|
||||
# Upload results to s3 as well as csv
|
||||
|
|
|
|||
|
|
@ -1,3 +1,5 @@
|
|||
# one time script for a customer forhousing
|
||||
|
||||
import pandas as pd
|
||||
from tqdm import tqdm
|
||||
from backend.address2UPRN.main import get_uprn
|
||||
|
|
@ -5,20 +7,35 @@ from backend.address2UPRN.main import get_uprn
|
|||
# Enable tqdm for pandas
|
||||
tqdm.pandas()
|
||||
|
||||
df = pd.read_excel("address2.xlsx")
|
||||
file_name = "forhousing.xlsx"
|
||||
|
||||
df = pd.read_excel(file_name)
|
||||
|
||||
|
||||
def extract_uprn(row):
|
||||
print(row["User Input"], row["Postcode"])
|
||||
result = get_uprn(row["User Input"], row["Postcode"], return_address=True)
|
||||
user_input = "Address"
|
||||
postcode = "Postcode"
|
||||
result = get_uprn(
|
||||
row[user_input],
|
||||
row[postcode],
|
||||
return_address=True,
|
||||
return_EPC=True,
|
||||
return_score=True,
|
||||
)
|
||||
|
||||
if result is None:
|
||||
return pd.Series([None, None])
|
||||
return pd.Series([None, None, None, None])
|
||||
|
||||
uprn, found_address = result
|
||||
return pd.Series([uprn, found_address])
|
||||
uprn, found_address, epc, score = result
|
||||
return pd.Series([uprn, found_address, epc, score])
|
||||
|
||||
|
||||
df[["juntes uprn", "junte found address"]] = df.progress_apply(extract_uprn, axis=1)
|
||||
df[["juntes uprn", "junte found address", "junte found epc", "junte score"]] = (
|
||||
df.progress_apply(extract_uprn, axis=1)
|
||||
)
|
||||
|
||||
df.to_excel("outputs2.xlsx", index=False)
|
||||
df.to_excel(f"{file_name}_outputs.xlsx", index=False)
|
||||
|
||||
# TODO: add lexiscore
|
||||
# TODO: run it
|
||||
# TODO: give it to danny
|
||||
|
|
|
|||
|
|
@ -18,37 +18,37 @@ def resolve_env_file() -> Optional[str]:
|
|||
|
||||
|
||||
class Settings(BaseSettings):
|
||||
API_KEY: str
|
||||
API_KEY: str = "changeme"
|
||||
API_KEY_NAME: str = "X-API-KEY"
|
||||
SECRET_KEY: str
|
||||
ENVIRONMENT: str
|
||||
DATA_BUCKET: str
|
||||
SECRET_KEY: str = "changeme"
|
||||
ENVIRONMENT: str = "changeme"
|
||||
DATA_BUCKET: str = "changeme"
|
||||
PLAN_TRIGGER_BUCKET: str
|
||||
ENGINE_SQS_URL: str
|
||||
ENGINE_SQS_URL: str = "changeme"
|
||||
|
||||
# Third parties
|
||||
EPC_AUTH_TOKEN: str
|
||||
GOOGLE_SOLAR_API_KEY: str
|
||||
EPC_AUTH_TOKEN: str = "changeme"
|
||||
GOOGLE_SOLAR_API_KEY: str = "changeme"
|
||||
|
||||
# Database settings
|
||||
DB_HOST: str
|
||||
DB_PASSWORD: str
|
||||
DB_USERNAME: str
|
||||
DB_PORT: str
|
||||
DB_NAME: str
|
||||
DB_HOST: str = "changeme"
|
||||
DB_PASSWORD: str = "changeme"
|
||||
DB_USERNAME: str = "changeme"
|
||||
DB_PORT: str = "changeme"
|
||||
DB_NAME: str = "changeme"
|
||||
|
||||
# Prediction buckets
|
||||
SAP_PREDICTIONS_BUCKET: str
|
||||
CARBON_PREDICTIONS_BUCKET: str
|
||||
HEAT_PREDICTIONS_BUCKET: str
|
||||
SAP_PREDICTIONS_BUCKET: str = "changeme"
|
||||
CARBON_PREDICTIONS_BUCKET: str = "changeme"
|
||||
HEAT_PREDICTIONS_BUCKET: str = "changeme"
|
||||
# LIGHTING_COST_PREDICTIONS_BUCKET: str
|
||||
# HEATING_COST_PREDICTIONS_BUCKET: str
|
||||
# HOT_WATER_COST_PREDICTIONS_BUCKET: str
|
||||
HEATING_KWH_PREDICTIONS_BUCKET: str
|
||||
HOTWATER_KWH_PREDICTIONS_BUCKET: str
|
||||
HEATING_KWH_PREDICTIONS_BUCKET: str = "changeme"
|
||||
HOTWATER_KWH_PREDICTIONS_BUCKET: str = "changeme"
|
||||
|
||||
# Other S3 buckts
|
||||
ENERGY_ASSESSMENTS_BUCKET: str
|
||||
ENERGY_ASSESSMENTS_BUCKET: str = "changeme"
|
||||
|
||||
# Optional AWS creds (only required in local)
|
||||
AWS_ACCESS_KEY_ID: Optional[str] = None
|
||||
|
|
|
|||
0
backend/app/db/functions/tasks/__init__.py
Normal file
0
backend/app/db/functions/tasks/__init__.py
Normal file
|
|
@ -10,7 +10,7 @@ mangum==0.19.0
|
|||
# AWS
|
||||
boto3==1.35.44
|
||||
# Data
|
||||
openpyxl==3.1.2
|
||||
openpyxl==3.1.5
|
||||
# Basic
|
||||
pytz
|
||||
sqlmodel
|
||||
|
|
@ -29,5 +29,5 @@ class ConditionTriggerRequest(BaseModel):
|
|||
# {
|
||||
# "file_type": "LBWF",
|
||||
# "trigger_file_bucket": "condition-data-dev",
|
||||
# "trigger_file_key": "input/lbwf/LBWF - Example Asset Data September 2025.xlsx",
|
||||
# "trigger_file_key": "input/lbwf/LBWF - Example Asset Data September 2025.xlsx"
|
||||
# }
|
||||
|
|
|
|||
|
|
@ -1,9 +1,28 @@
|
|||
FROM public.ecr.aws/lambda/python:3.10
|
||||
FROM public.ecr.aws/lambda/python:3.11
|
||||
|
||||
ARG DEV_DB_HOST
|
||||
ARG DEV_DB_PORT
|
||||
ARG DEV_DB_NAME
|
||||
|
||||
ENV DB_HOST=${DEV_DB_HOST}
|
||||
ENV DB_PORT=${DEV_DB_PORT}
|
||||
ENV DB_NAME=${DEV_DB_NAME}
|
||||
|
||||
# Set working directory (Lambda task root)
|
||||
WORKDIR /var/task
|
||||
|
||||
# -----------------------------
|
||||
COPY backend/postcode_splitter/handler/requirements.txt .
|
||||
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
|
||||
# Copy necessary files for database and utility imports
|
||||
COPY utils/ utils/
|
||||
COPY backend/ backend/
|
||||
COPY datatypes/ datatypes/
|
||||
|
||||
# Copy the handler
|
||||
COPY backend/postcode_splitter/main.py .
|
||||
|
||||
# Lambda handler
|
||||
# -----------------------------
|
||||
CMD ["main.handler"]
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,11 @@
|
|||
pandas==2.2.2
|
||||
numpy<2.0
|
||||
requests
|
||||
tqdm
|
||||
openpyxl
|
||||
epc-api-python==1.0.2
|
||||
boto3==1.35.44
|
||||
sqlmodel
|
||||
sqlalchemy==2.0.36
|
||||
psycopg2-binary==2.9.10
|
||||
pydantic-settings==2.6.0
|
||||
|
|
@ -1,127 +1,278 @@
|
|||
import os
|
||||
import sys
|
||||
import json
|
||||
import pandas as pd
|
||||
import requests
|
||||
from backend.address2UPRN.main import (
|
||||
resolve_uprns_for_postcode_group,
|
||||
get_epc_data_with_postcode,
|
||||
import boto3
|
||||
from uuid import UUID, uuid4
|
||||
from utils.s3 import (
|
||||
read_csv_from_s3 as read_csv_from_s3_dict,
|
||||
save_csv_to_s3,
|
||||
parse_s3_uri,
|
||||
)
|
||||
from utils.logger import setup_logger
|
||||
from tqdm import tqdm
|
||||
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
|
||||
from datetime import datetime
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
|
||||
def sanitise_postcode(postcode: str) -> str | None:
|
||||
def upload_batch_to_s3(
|
||||
batch_df: pd.DataFrame, task_id: str, sub_task_id: str, bucket_name: str = None
|
||||
) -> str:
|
||||
"""
|
||||
Normalise postcode for grouping.
|
||||
|
||||
- Uppercase
|
||||
- Remove all whitespace
|
||||
Upload batch DataFrame to S3 as CSV.
|
||||
"""
|
||||
if pd.isna(postcode):
|
||||
return None
|
||||
if bucket_name is None:
|
||||
bucket_name = os.getenv("S3_BUCKET_NAME")
|
||||
|
||||
return postcode.upper().replace(" ", "")
|
||||
|
||||
|
||||
def is_valid_postcode(postcode_clean: str) -> bool:
|
||||
"""
|
||||
Validate postcode using postcodes.io.
|
||||
|
||||
Expects a sanitised postcode (e.g. E84SQ).
|
||||
Returns True if valid, False otherwise.
|
||||
"""
|
||||
POSTCODES_IO_VALIDATE_URL = "https://api.postcodes.io/postcodes/{postcode}/validate"
|
||||
if not postcode_clean:
|
||||
return False
|
||||
if not bucket_name:
|
||||
logger.error(
|
||||
"S3 bucket name not provided and S3_BUCKET_NAME environment variable not set"
|
||||
)
|
||||
raise ValueError("S3_BUCKET_NAME not configured")
|
||||
|
||||
try:
|
||||
resp = requests.get(
|
||||
POSTCODES_IO_VALIDATE_URL.format(postcode=postcode_clean),
|
||||
timeout=5,
|
||||
file_name = f"{datetime.now().isoformat()}_{str(uuid4())[:8]}"
|
||||
file_key = (
|
||||
f"ara_postcode_splitter_batches/{task_id}/{sub_task_id}/{file_name}.csv"
|
||||
)
|
||||
resp.raise_for_status()
|
||||
return resp.json().get("result", False)
|
||||
except requests.RequestException:
|
||||
# Network issues, rate limits, etc.
|
||||
return False
|
||||
|
||||
success = save_csv_to_s3(batch_df, bucket_name, file_key)
|
||||
|
||||
if success:
|
||||
s3_uri = f"s3://{bucket_name}/{file_key}"
|
||||
logger.info(f"Successfully uploaded batch to {s3_uri}")
|
||||
return s3_uri
|
||||
else:
|
||||
logger.error(f"Failed to upload batch to S3")
|
||||
raise ValueError("Failed to save CSV to S3")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error uploading batch to S3: {str(e)}")
|
||||
raise
|
||||
|
||||
|
||||
def main():
|
||||
df = pd.read_excel("hackney.xlsx", sheet_name="Sustainability")
|
||||
df = df.head(500)
|
||||
def send_to_address2uprn_queue(task_id: str, sub_task_id: str, s3_uri: str) -> str:
|
||||
"""
|
||||
Send a batch to the address2UPRN SQS queue with S3 reference.
|
||||
|
||||
# Sanitise postcodes
|
||||
df["postcode_clean"] = df["Postcode"].apply(sanitise_postcode)
|
||||
Args:
|
||||
task_id: The parent task ID
|
||||
sub_task_id: The new subtask ID for this batch
|
||||
s3_uri: S3 URI pointing to the batch CSV file
|
||||
|
||||
# --- validate AFTER grouping (save API calls) ---
|
||||
Returns:
|
||||
Message ID from SQS
|
||||
"""
|
||||
sqs_client = boto3.client("sqs")
|
||||
queue_url = os.getenv("ADDRESS2UPRN_QUEUE_URL")
|
||||
|
||||
# Get unique, non-null postcodes
|
||||
unique_postcodes = df["postcode_clean"].dropna().unique()
|
||||
if not queue_url:
|
||||
raise ValueError("ADDRESS2UPRN_QUEUE_URL environment variable not set")
|
||||
|
||||
# Validate each postcode once, TODOadd a progress bar
|
||||
postcode_validity = {
|
||||
pc: is_valid_postcode(pc)
|
||||
for pc in tqdm(unique_postcodes, total=len(unique_postcodes))
|
||||
message_body = {
|
||||
"task_id": task_id,
|
||||
"sub_task_id": sub_task_id,
|
||||
"s3_uri": s3_uri,
|
||||
}
|
||||
|
||||
# Map validity back onto dataframe
|
||||
df["postcode_valid"] = df["postcode_clean"].map(postcode_validity)
|
||||
response = sqs_client.send_message(
|
||||
QueueUrl=queue_url,
|
||||
MessageBody=json.dumps(message_body),
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Sent message to address2UPRN queue. "
|
||||
f"Task: {task_id}, SubTask: {sub_task_id}, MessageId: {response['MessageId']}"
|
||||
)
|
||||
|
||||
return response["MessageId"]
|
||||
|
||||
|
||||
def create_batch_and_send_to_address2uprn(
|
||||
batch_df: pd.DataFrame,
|
||||
task_id: str,
|
||||
sub_task_id: str,
|
||||
subtask_interface: SubTaskInterface,
|
||||
bucket_name: str,
|
||||
) -> str:
|
||||
"""
|
||||
Create a batch DataFrame, upload to S3, create subtask, and send to address2UPRN queue.
|
||||
|
||||
"""
|
||||
# Upload batch to S3
|
||||
|
||||
s3_uri = upload_batch_to_s3(batch_df, str(task_id), str(sub_task_id), bucket_name)
|
||||
|
||||
# Create a new subtask for this batch with all inputs
|
||||
created_batch_sub_task_id = subtask_interface.create_subtask(
|
||||
task_id=task_id,
|
||||
inputs={
|
||||
"task_id": str(task_id),
|
||||
"s3_uri": s3_uri,
|
||||
},
|
||||
)
|
||||
|
||||
logger.info(f"Created batch subtask {created_batch_sub_task_id}")
|
||||
|
||||
# Send message with S3 reference
|
||||
send_to_address2uprn_queue(
|
||||
task_id=str(task_id),
|
||||
sub_task_id=str(created_batch_sub_task_id),
|
||||
s3_uri=s3_uri,
|
||||
)
|
||||
|
||||
return created_batch_sub_task_id
|
||||
|
||||
|
||||
def handler(event, context, local=False):
|
||||
print(f"Function: {context.function_name}")
|
||||
print(f"Request ID: {context.aws_request_id}")
|
||||
|
||||
# Example SQS message for testing (copy and paste into SQS):
|
||||
if local is True:
|
||||
event = {
|
||||
"Records": [
|
||||
{
|
||||
"body": json.dumps(
|
||||
{
|
||||
"task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917",
|
||||
"sub_task_id": "8673913b-1a88-42d7-8578-0449123d94b0",
|
||||
"s3_uri": "s3://retrofit-data-dev/ara_raw_inputs/peabody/2025_11_11 - Peabody - Data Extracts for Domna_transformed.csv",
|
||||
}
|
||||
)
|
||||
}
|
||||
]
|
||||
}
|
||||
# Handle both single event and batch events (SQS, etc.)
|
||||
records = event.get("Records", [event])
|
||||
results = []
|
||||
errors = []
|
||||
subtask_interface = SubTaskInterface()
|
||||
bucket_name = os.getenv("S3_BUCKET_NAME")
|
||||
if local:
|
||||
bucket_name = "retrofit-data-dev"
|
||||
|
||||
for postcode, group_df in tqdm(
|
||||
df[df["postcode_valid"]].groupby("postcode_clean"),
|
||||
desc="Resolving UPRNs by postcode",
|
||||
):
|
||||
try:
|
||||
epc_df = get_epc_data_with_postcode(postcode)
|
||||
for record in records:
|
||||
if local:
|
||||
record = records[0]
|
||||
task_id = None
|
||||
subtask_id = None
|
||||
# Parse body (inputs)
|
||||
|
||||
if epc_df.empty:
|
||||
tmp = group_df.copy()
|
||||
tmp["found_uprn"] = None
|
||||
tmp["status"] = "no_epc_results"
|
||||
results.append(tmp)
|
||||
continue
|
||||
if isinstance(record.get("body"), str):
|
||||
body = json.loads(record["body"])
|
||||
else:
|
||||
body = record.get("body", {})
|
||||
|
||||
resolved = resolve_uprns_for_postcode_group(
|
||||
group_df=group_df,
|
||||
epc_df=epc_df,
|
||||
# Validate required fields
|
||||
task_id = body.get("task_id")
|
||||
subtask_id = body.get("sub_task_id")
|
||||
s3_uri = body.get("s3_uri")
|
||||
|
||||
# Convert task_id to UUID
|
||||
task_id = UUID(task_id) if isinstance(task_id, str) else task_id
|
||||
subtask_id = UUID(subtask_id) if isinstance(subtask_id, str) else subtask_id
|
||||
|
||||
# Mark subtask as in progress
|
||||
subtask_interface.update_subtask_status(subtask_id, "in progress")
|
||||
logger.info(f"Marked subtask {subtask_id} as in progress")
|
||||
|
||||
# Read CSV from S3
|
||||
bucket, key = parse_s3_uri(s3_uri)
|
||||
logger.info(f"S3 Bucket: {bucket}, Key: {key}")
|
||||
|
||||
csv_data = read_csv_from_s3_dict(bucket, key)
|
||||
df = pd.DataFrame(csv_data)
|
||||
|
||||
logger.info(f"CSV loaded: {len(df)} rows, {len(df.columns)} columns")
|
||||
|
||||
# Sanitise postcodes
|
||||
df["postcode_clean"] = df["postcode"].str.upper().str.replace(" ", "")
|
||||
|
||||
df = df.dropna(subset=["postcode_clean"])
|
||||
|
||||
batch_size = 500
|
||||
if df.shape[0] < batch_size:
|
||||
create_batch_and_send_to_address2uprn(
|
||||
batch_df=df,
|
||||
task_id=task_id,
|
||||
sub_task_id=subtask_id,
|
||||
subtask_interface=subtask_interface,
|
||||
bucket_name=bucket_name,
|
||||
)
|
||||
else:
|
||||
postcode_to_addresses = {
|
||||
postcode: group
|
||||
for postcode, group in df.groupby("postcode_clean", sort=False)
|
||||
}
|
||||
|
||||
results.append(resolved)
|
||||
count = 0
|
||||
buffer = []
|
||||
|
||||
except Exception as e:
|
||||
tmp = group_df.copy()
|
||||
tmp["found_uprn"] = None
|
||||
tmp["status"] = "exception"
|
||||
tmp["error"] = str(e)
|
||||
results.append(tmp)
|
||||
for postcode, group_df in postcode_to_addresses.items():
|
||||
group_len = len(group_df)
|
||||
|
||||
final_df = pd.concat(results, ignore_index=True)
|
||||
a = final_df[
|
||||
[
|
||||
"best_match_lexiscore",
|
||||
"Address 1",
|
||||
"best_match_address",
|
||||
"Postcode",
|
||||
"UPRN",
|
||||
"best_match_uprn",
|
||||
]
|
||||
] # add levi score to viewing
|
||||
b = final_df[final_df["best_match_lexiscore"] > 0] # add levi score to viewing
|
||||
b = b[
|
||||
[
|
||||
"best_match_lexiscore",
|
||||
"Address 1",
|
||||
"best_match_address",
|
||||
"Postcode",
|
||||
"UPRN",
|
||||
"best_match_uprn",
|
||||
]
|
||||
]
|
||||
# If single postcode is bigger than batch_size → send directly
|
||||
if group_len >= batch_size:
|
||||
if buffer:
|
||||
create_batch_and_send_to_address2uprn(
|
||||
batch_df=pd.concat(buffer, ignore_index=True),
|
||||
task_id=task_id,
|
||||
sub_task_id=subtask_id,
|
||||
subtask_interface=subtask_interface,
|
||||
bucket_name=bucket_name,
|
||||
)
|
||||
buffer = []
|
||||
count = 0
|
||||
|
||||
create_batch_and_send_to_address2uprn(
|
||||
batch_df=group_df,
|
||||
task_id=task_id,
|
||||
sub_task_id=subtask_id,
|
||||
subtask_interface=subtask_interface,
|
||||
bucket_name=bucket_name,
|
||||
)
|
||||
continue
|
||||
|
||||
def handler(event, context):
|
||||
print("hello Postcode splitter world")
|
||||
return {"statusCode": 200, "body": "hello world"}
|
||||
# If adding would exceed batch → flush first
|
||||
if count + group_len > batch_size:
|
||||
create_batch_and_send_to_address2uprn(
|
||||
batch_df=pd.concat(buffer, ignore_index=True),
|
||||
task_id=task_id,
|
||||
sub_task_id=subtask_id,
|
||||
subtask_interface=subtask_interface,
|
||||
bucket_name=bucket_name,
|
||||
)
|
||||
buffer = []
|
||||
count = 0
|
||||
|
||||
# Add group
|
||||
buffer.append(group_df)
|
||||
count += group_len
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
# Final flush
|
||||
if buffer:
|
||||
create_batch_and_send_to_address2uprn(
|
||||
batch_df=pd.concat(buffer, ignore_index=True),
|
||||
task_id=task_id,
|
||||
sub_task_id=subtask_id,
|
||||
subtask_interface=subtask_interface,
|
||||
bucket_name=bucket_name,
|
||||
)
|
||||
|
||||
# Mark subtask as completed
|
||||
subtask_interface.update_subtask_status(
|
||||
subtask_id,
|
||||
"completed",
|
||||
outputs={"rows_processed": "completed"},
|
||||
)
|
||||
|
||||
return {
|
||||
"statusCode": 200,
|
||||
"body": json.dumps(
|
||||
{"processed": results, "errors": errors if errors else None}
|
||||
),
|
||||
}
|
||||
|
|
|
|||
|
|
@ -407,5 +407,4 @@ module "categorisation_registry" {
|
|||
source = "../modules/container_registry"
|
||||
name = "categorisation"
|
||||
stage = var.stage
|
||||
|
||||
}
|
||||
|
|
@ -36,6 +36,8 @@ scenario_names = {
|
|||
1059: "EPC C - 10k budget",
|
||||
}
|
||||
|
||||
project_name = "manchester"
|
||||
|
||||
|
||||
def get_data(portfolio_id, scenario_ids):
|
||||
session = sessionmaker(bind=db_engine)()
|
||||
|
|
@ -232,7 +234,7 @@ for scenario_id in SCENARIOS:
|
|||
# Get recs for this scenario
|
||||
recommended_measures_df = recommendations_df[
|
||||
recommendations_df["scenario_id"] == scenario_id
|
||||
][["property_id", "measure_type", "estimated_cost", "default"]]
|
||||
][["property_id", "measure_type", "estimated_cost", "default"]]
|
||||
recommended_measures_df = recommended_measures_df[
|
||||
recommended_measures_df["default"]
|
||||
]
|
||||
|
|
@ -240,7 +242,7 @@ for scenario_id in SCENARIOS:
|
|||
|
||||
post_install_sap = recommendations_df[
|
||||
recommendations_df["scenario_id"] == scenario_id
|
||||
][["property_id", "default", "sap_points"]]
|
||||
][["property_id", "default", "sap_points"]]
|
||||
post_install_sap = post_install_sap[post_install_sap["default"]]
|
||||
# Sum up the sap points by property id
|
||||
post_install_sap = (
|
||||
|
|
@ -286,6 +288,8 @@ for scenario_id in SCENARIOS:
|
|||
"current_sap_points",
|
||||
"total_floor_area",
|
||||
"number_of_rooms",
|
||||
"lodgement_date",
|
||||
"is_expired",
|
||||
"id",
|
||||
]
|
||||
]
|
||||
|
|
@ -303,7 +307,58 @@ for scenario_id in SCENARIOS:
|
|||
)
|
||||
df["uprn"] = df["uprn"].astype(str)
|
||||
|
||||
relevant_plans = plans_df[plans_df["scenario_id"] == scenario_id]
|
||||
df2 = df.merge(
|
||||
relevant_plans[["property_id", "post_sap_points", "post_epc_rating"]],
|
||||
how="left",
|
||||
on="property_id",
|
||||
suffixes=("", "_plan"),
|
||||
)
|
||||
print(df2["predicted_post_works_epc"].value_counts())
|
||||
print(df2["post_epc_rating"].value_counts())
|
||||
|
||||
z = df2[
|
||||
(df2["predicted_post_works_epc"] != "D")
|
||||
& (df2["post_epc_rating"].astype(str) == "Epc.D")
|
||||
]
|
||||
|
||||
df2["predicted_post_works_epc"].value_counts()
|
||||
df2["post_epc_rating"].astype(str).value_counts()
|
||||
|
||||
df2[df2["total_retrofit_cost"] > 0].shape
|
||||
|
||||
getting_works = df[df["total_retrofit_cost"] > 0]
|
||||
getting_works["predicted_post_works_epc"].value_counts()
|
||||
|
||||
32565 / getting_works.shape[0]
|
||||
|
||||
df[df["predicted_post_works_sap"] == ""]
|
||||
|
||||
# Expected columns list
|
||||
expected_columns = [
|
||||
"suspended_floor_insulation",
|
||||
"solid_floor_insulation",
|
||||
"external_wall_insulation",
|
||||
"internal_wall_insulation",
|
||||
"cavity_wall_insulation",
|
||||
"loft_insulation",
|
||||
"flat_roof_insulation",
|
||||
"room_roof_insulation",
|
||||
"secondary_glazing",
|
||||
"double_glazing",
|
||||
"solar_pv",
|
||||
"high_heat_retention_storage_heaters",
|
||||
"air_source_heat_pump",
|
||||
"boiler_upgrade",
|
||||
"roomstat_programmer_trvs",
|
||||
"time_temperature_zone_control",
|
||||
]
|
||||
# Add missing columns with default values
|
||||
for col in expected_columns:
|
||||
if col not in df.columns:
|
||||
df[col] = ""
|
||||
|
||||
# Create excel to store to
|
||||
filename = f"{scenario_names[scenario_id]} - 20250113 final.xlsx"
|
||||
filename = f"{scenario_names[scenario_id]} - {project_name}.xlsx"
|
||||
with pd.ExcelWriter(filename) as writer:
|
||||
df.to_excel(writer, sheet_name="properties", index=False)
|
||||
|
|
|
|||
50
utils/s3.py
50
utils/s3.py
|
|
@ -3,12 +3,62 @@ import boto3
|
|||
import csv
|
||||
import pandas as pd
|
||||
from io import BytesIO, StringIO
|
||||
from urllib.parse import unquote
|
||||
from utils.logger import setup_logger
|
||||
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
|
||||
def parse_s3_uri(s3_uri: str) -> tuple[str, str]:
|
||||
"""
|
||||
Parse S3 URI to extract bucket and key.
|
||||
|
||||
Supports two formats:
|
||||
1. S3 URI format: s3://bucket/key
|
||||
2. AWS console URL format with query parameters
|
||||
"""
|
||||
logger.info("Parsing S3 URI")
|
||||
|
||||
try:
|
||||
# Check if it's an S3 URI format
|
||||
if s3_uri.startswith("s3://"):
|
||||
parts = s3_uri[5:].split("/", 1)
|
||||
if len(parts) < 2:
|
||||
raise ValueError("S3 URI must include both bucket and key")
|
||||
bucket = parts[0]
|
||||
key = parts[1]
|
||||
logger.info(f"Extracted bucket: {bucket}, key: {key}")
|
||||
return bucket, key
|
||||
|
||||
# Otherwise, treat as AWS console URL
|
||||
logger.info("Parsing as AWS console URL")
|
||||
|
||||
# Split base URL and query string
|
||||
if "?" not in s3_uri:
|
||||
raise ValueError("No query string found")
|
||||
|
||||
base, query = s3_uri.split("?", 1)
|
||||
|
||||
# Extract bucket from base URL
|
||||
if "/s3/object/" not in base:
|
||||
raise ValueError("No '/s3/object/' found in URL path")
|
||||
|
||||
path_parts = base.split("/s3/object/")
|
||||
bucket = path_parts[1]
|
||||
logger.info(f"Extracted bucket: {bucket}")
|
||||
|
||||
# Extract prefix from query parameters
|
||||
params = dict(item.split("=") for item in query.split("&") if "=" in item)
|
||||
key = unquote(params.get("prefix", ""))
|
||||
logger.info(f"Extracted key: {key}")
|
||||
|
||||
return bucket, key
|
||||
except Exception as e:
|
||||
logger.error(f"Error parsing S3 URI: {type(e).__name__}: {e}")
|
||||
raise ValueError(f"Could not parse S3 URI") from e
|
||||
|
||||
|
||||
def read_from_s3(bucket_name, s3_file_name):
|
||||
"""
|
||||
Read an object from s3. Decoding of the data is left for outside of this function
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue