mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
Merge pull request #795 from Hestia-Homes/feature/ordanant_survey_api
Feature/ordanant survey api
This commit is contained in:
commit
c7a395efaf
27 changed files with 923 additions and 272 deletions
|
|
@ -21,7 +21,7 @@ RUN git clone --depth 1 https://github.com/openvenues/libpostal /tmp/libpostal \
|
|||
&& rm -rf /tmp/libpostal
|
||||
|
||||
# 3) Create the user and grant sudo privileges
|
||||
RUN useradd -m -s /usr/bin/bash ${USER} \
|
||||
RUN useradd -m -s /bin/bash ${USER} \
|
||||
&& echo "${USER} ALL=(ALL) NOPASSWD: ALL" >/etc/sudoers.d/${USER} \
|
||||
&& chmod 0440 /etc/sudoers.d/${USER}
|
||||
|
||||
|
|
@ -32,6 +32,11 @@ ADD asset_list/requirements.txt requirements1.txt
|
|||
RUN cat requirements1.txt requirements2.txt >> requirements.txt
|
||||
|
||||
RUN pip install -r requirements.txt
|
||||
|
||||
# Install code server
|
||||
RUN curl -fsSL https://code-server.dev/install.sh | sh
|
||||
|
||||
|
||||
# 5) Workdir
|
||||
WORKDIR /workspaces/model
|
||||
|
||||
|
|
|
|||
|
|
@ -2,13 +2,14 @@
|
|||
"name": "SAL ENV",
|
||||
"dockerComposeFile": "docker-compose.yml",
|
||||
"service": "model-sal",
|
||||
"remoteUser": "vscode",
|
||||
// "remoteUser": "vscode",
|
||||
"workspaceFolder": "/workspaces/model",
|
||||
"postStartCommand": "bash .devcontainer/post-install.sh",
|
||||
"postStartCommand": "bash .devcontainer/asset_list/post-install.sh",
|
||||
"mounts": [
|
||||
// Optional, just makes getting from Downloads (local env) easier
|
||||
"source=${localEnv:HOME},target=/workspaces/home,type=bind"
|
||||
"source=${localEnv:HOME},target=/home/vscode,type=bind"
|
||||
],
|
||||
"forwardPorts": [8081],
|
||||
"customizations": {
|
||||
"vscode": {
|
||||
"extensions": [
|
||||
|
|
|
|||
|
|
@ -2,15 +2,17 @@ version: '3.8'
|
|||
|
||||
services:
|
||||
model-sal:
|
||||
user: "${UID}:${GID}"
|
||||
build:
|
||||
context: ../..
|
||||
dockerfile: .devcontainer/asset_list/Dockerfile
|
||||
command: sleep infinity
|
||||
command: code-server --bind-addr 0.0.0.0:8080
|
||||
user: vscode
|
||||
volumes:
|
||||
- ../../:/workspaces/model
|
||||
networks:
|
||||
- model-net
|
||||
ports:
|
||||
- "8081:8080"
|
||||
|
||||
networks:
|
||||
model-net:
|
||||
|
|
|
|||
|
|
@ -1,14 +1,14 @@
|
|||
mkdir -p ~/.ipython/profile_default/startup
|
||||
# mkdir -p ~/.ipython/profile_default/startup
|
||||
|
||||
cat << 'EOF' > ~/.ipython/profile_default/startup/00-load-env.py
|
||||
from dotenv import load_dotenv
|
||||
import os
|
||||
# cat << 'EOF' > ~/.ipython/profile_default/startup/00-load-env.py
|
||||
# from dotenv import load_dotenv
|
||||
# import os
|
||||
|
||||
# Adjust path as needed
|
||||
env_path = "/workspaces/model/backend/.env"
|
||||
if os.path.exists(env_path):
|
||||
load_dotenv(env_path)
|
||||
print("✔ Loaded .env into Jupyter kernel")
|
||||
else:
|
||||
print("⚠ No .env file found to load")
|
||||
EOF
|
||||
# # Adjust path as needed
|
||||
# env_path = "/workspaces/model/backend/.env"
|
||||
# if os.path.exists(env_path):
|
||||
# load_dotenv(env_path)
|
||||
# print("✔ Loaded .env into Jupyter kernel")
|
||||
# else:
|
||||
# print("⚠ No .env file found to load")
|
||||
# EOF
|
||||
|
|
|
|||
39
.github/workflows/deploy_terraform.yml
vendored
39
.github/workflows/deploy_terraform.yml
vendored
|
|
@ -257,7 +257,7 @@ jobs:
|
|||
AWS_REGION: ${{ secrets.DEV_AWS_REGION }}
|
||||
|
||||
# ============================================================
|
||||
# Deploy Categorisation Lambda
|
||||
# Deploy Ara Engine Lambda
|
||||
# ============================================================
|
||||
ara_engine_lambda:
|
||||
needs: [ara_engine_image, determine_stage]
|
||||
|
|
@ -280,4 +280,39 @@ jobs:
|
|||
TF_VAR_secret_key: ${{ secrets.DEV_SECRET_KEY }}
|
||||
TF_VAR_domain_name: ${{ secrets.DEV_DOMAIN_NAME }}
|
||||
TF_VAR_epc_auth_token: ${{ secrets.DEV_EPC_AUTH_TOKEN }}
|
||||
TF_VAR_google_solar_api_key: ${{ secrets.DEV_GOOGLE_SOLAR_API_KEY }}
|
||||
TF_VAR_google_solar_api_key: ${{ secrets.DEV_GOOGLE_SOLAR_API_KEY }}
|
||||
|
||||
# ============================================================
|
||||
# 2️⃣ Build OrdanceSurvey image and Push
|
||||
# ============================================================
|
||||
ordnanceSurvey_image:
|
||||
needs: [determine_stage, shared_terraform]
|
||||
uses: ./.github/workflows/_build_image.yml
|
||||
with:
|
||||
ecr_repo: ordnance-${{ needs.determine_stage.outputs.stage }}
|
||||
dockerfile_path: backend/ordnanceSurvey/handler/Dockerfile
|
||||
build_context: .
|
||||
build_args: |
|
||||
DEV_DB_HOST=$DEV_DB_HOST
|
||||
DEV_DB_PORT=$DEV_DB_PORT
|
||||
DEV_DB_NAME=$DEV_DB_NAME
|
||||
secrets:
|
||||
AWS_ACCESS_KEY_ID: ${{ secrets.DEV_AWS_ACCESS_KEY_ID }}
|
||||
AWS_SECRET_ACCESS_KEY: ${{ secrets.DEV_AWS_SECRET_ACCESS_KEY }}
|
||||
AWS_REGION: ${{ secrets.DEV_AWS_REGION }}
|
||||
DEV_DB_HOST: ${{ secrets.DEV_DB_HOST }}
|
||||
DEV_DB_PORT: ${{ secrets.DEV_DB_PORT }}
|
||||
DEV_DB_NAME: ${{ secrets.DEV_DB_NAME }}
|
||||
|
||||
# ============================================================
|
||||
# 3️⃣ Deploy OrdanceSurvey Lambda
|
||||
# ============================================================
|
||||
ordnanceSurvey_lambda:
|
||||
needs: [ordnanceSurvey_image, determine_stage]
|
||||
uses: ./.github/workflows/_deploy_lambda.yml
|
||||
with:
|
||||
lambda_name: ordnanceSurvey
|
||||
lambda_path: infrastructure/terraform/lambda/ordnanceSurvey
|
||||
stage: ${{ needs.determine_stage.outputs.stage }}
|
||||
ecr_repo: postcode_splitter-${{ needs.determine_stage.outputs.stage }}
|
||||
image_digest: ${{ needs.ordnanceSurvey_image.outputs.image_digest }}
|
||||
|
|
|
|||
8
.vscode/settings.json
vendored
8
.vscode/settings.json
vendored
|
|
@ -16,7 +16,13 @@
|
|||
"python.languageServer": "Pylance",
|
||||
"python.analysis.typeCheckingMode": "strict",
|
||||
"python.analysis.autoSearchPaths": true,
|
||||
"python.analysis.extraPaths": ["./src"]
|
||||
"python.analysis.extraPaths": ["./src"],
|
||||
|
||||
"vim.useCtrlKeys": true,
|
||||
"vim.handleKeys": {
|
||||
"<C-c>": false,
|
||||
"<C-v>": false
|
||||
}
|
||||
|
||||
// Hot reload setting that needs to be in user settings
|
||||
// "jupyter.runStartupCommands": [
|
||||
|
|
|
|||
|
|
@ -243,7 +243,7 @@ def app():
|
|||
if skip is not None and not force_retrieve_data:
|
||||
if i <= skip:
|
||||
continue
|
||||
chunk = asset_list.standardised_asset_list[i: i + chunk_size]
|
||||
chunk = asset_list.standardised_asset_list[i : i + chunk_size]
|
||||
epc_data_chunk, errors_chunk, no_epc_chunk = get_data(
|
||||
df=chunk,
|
||||
row_id_name=asset_list.DOMNA_PROPERTY_ID,
|
||||
|
|
@ -386,7 +386,7 @@ def app():
|
|||
# Retrieve just the data we need
|
||||
epc_df = epc_df[
|
||||
[asset_list.DOMNA_PROPERTY_ID] + list(asset_list.EPC_API_DATA_NAMES.keys())
|
||||
].rename(columns=asset_list.EPC_API_DATA_NAMES)
|
||||
].rename(columns=asset_list.EPC_API_DATA_NAMES)
|
||||
|
||||
# Look for columns not in the find my EPC data, which will have happened if we didn't
|
||||
# retrieve it in the first place
|
||||
|
|
@ -403,7 +403,7 @@ def app():
|
|||
find_my_epc_data[
|
||||
[asset_list.DOMNA_PROPERTY_ID, "epc_has_floor_recommendation"]
|
||||
+ list(asset_list.FIND_EPC_DATA_NAMES.keys())
|
||||
].rename(columns=asset_list.FIND_EPC_DATA_NAMES),
|
||||
].rename(columns=asset_list.FIND_EPC_DATA_NAMES),
|
||||
how="left",
|
||||
on=asset_list.DOMNA_PROPERTY_ID,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -1,13 +1,11 @@
|
|||
from typing import Optional
|
||||
|
||||
from epc_api.client import EpcClient
|
||||
import os
|
||||
from urllib.parse import urlencode
|
||||
import pandas as pd
|
||||
from difflib import SequenceMatcher
|
||||
from utils.logger import setup_logger
|
||||
import re
|
||||
from typing import Set
|
||||
import json
|
||||
import requests
|
||||
from uuid import UUID
|
||||
import uuid
|
||||
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
|
||||
|
|
@ -18,6 +16,8 @@ from utils.s3 import (
|
|||
)
|
||||
from datetime import datetime
|
||||
|
||||
from backend.utils.addressMatch import AddressMatch
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
|
||||
|
|
@ -29,191 +29,6 @@ if EPC_AUTH_TOKEN is None:
|
|||
raise RuntimeError("EPC_AUTH_TOKEN not defined in env")
|
||||
|
||||
|
||||
def is_valid_postcode(postcode_clean: str) -> bool:
|
||||
"""
|
||||
Validate postcode using postcodes.io.
|
||||
|
||||
Expects a sanitised postcode (e.g. E84SQ).
|
||||
Returns True if valid, False otherwise.
|
||||
"""
|
||||
POSTCODES_IO_VALIDATE_URL = "https://api.postcodes.io/postcodes/{postcode}/validate"
|
||||
if not postcode_clean:
|
||||
return False
|
||||
|
||||
try:
|
||||
resp = requests.get(
|
||||
POSTCODES_IO_VALIDATE_URL.format(postcode=postcode_clean),
|
||||
timeout=5,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
return resp.json().get("result", False)
|
||||
except requests.RequestException:
|
||||
# Network issues, rate limits, etc.
|
||||
return False
|
||||
|
||||
|
||||
def levenshtein(a: str, b: str) -> float:
|
||||
"""
|
||||
Address similarity score in [0, 1].
|
||||
|
||||
Strategy:
|
||||
- Normalise
|
||||
- Strongly penalise mismatched house/flat numbers
|
||||
- Combine token overlap + character similarity
|
||||
"""
|
||||
|
||||
def extract_number_sequence(s: str) -> list[str]:
|
||||
return re.findall(r"\d+[a-z]?", s)
|
||||
|
||||
def extract_numbers(s: str) -> Set[str]:
|
||||
return set(extract_number_sequence(s))
|
||||
|
||||
def tokenise(s: str) -> Set[str]:
|
||||
return set(s.split())
|
||||
|
||||
def extract_building_number(s: str) -> str | None:
|
||||
"""
|
||||
Extract the main building number (NOT flat/unit).
|
||||
Assumes formats like:
|
||||
- '42 moreton road'
|
||||
- 'flat 3 42 moreton road'
|
||||
"""
|
||||
tokens = s.split()
|
||||
|
||||
# remove flat/unit context
|
||||
cleaned = []
|
||||
skip_next = False
|
||||
for t in tokens:
|
||||
if t in ("flat", "apt", "apartment", "unit"):
|
||||
skip_next = True
|
||||
continue
|
||||
if skip_next:
|
||||
skip_next = False
|
||||
continue
|
||||
cleaned.append(t)
|
||||
|
||||
# first remaining number is building number
|
||||
for t in cleaned:
|
||||
if re.fullmatch(r"\d+[a-z]?", t):
|
||||
return t
|
||||
|
||||
return None
|
||||
|
||||
a_norm = normalise_address(a)
|
||||
b_norm = normalise_address(b)
|
||||
|
||||
# --- hard signal: numbers ---
|
||||
nums_a = extract_numbers(a_norm)
|
||||
nums_b = extract_numbers(b_norm)
|
||||
|
||||
if nums_a and not nums_b:
|
||||
return 0.0
|
||||
|
||||
# No shared numbers at all → impossible match
|
||||
if nums_a and nums_b and nums_a.isdisjoint(nums_b):
|
||||
return 0.0
|
||||
|
||||
# 🔒 HARD GUARD: building number must match
|
||||
bld_a = extract_building_number(a_norm)
|
||||
bld_b = extract_building_number(b_norm)
|
||||
|
||||
if bld_a and bld_b and bld_a != bld_b:
|
||||
return 0.0
|
||||
|
||||
# --- order-sensitive flat/building guard ---
|
||||
seq_a = extract_number_sequence(a_norm)
|
||||
seq_b = extract_number_sequence(b_norm)
|
||||
|
||||
has_flat_token_user = any(
|
||||
tok in a_norm for tok in ("flat", "apt", "apartment", "unit")
|
||||
)
|
||||
has_flat_token_epc = "flat" in b_norm
|
||||
|
||||
if (
|
||||
len(seq_a) == 2
|
||||
and len(seq_b) >= 2
|
||||
and has_flat_token_epc
|
||||
and not has_flat_token_user
|
||||
and seq_a != seq_b[:2]
|
||||
):
|
||||
return 0.0
|
||||
|
||||
# --- token similarity (order-independent) ---
|
||||
toks_a = tokenise(a_norm)
|
||||
toks_b = tokenise(b_norm)
|
||||
|
||||
if not toks_a or not toks_b:
|
||||
token_score = 0.0
|
||||
else:
|
||||
token_score = len(toks_a & toks_b) / len(toks_a | toks_b)
|
||||
|
||||
# --- character similarity (soft signal) ---
|
||||
char_score = SequenceMatcher(None, a_norm, b_norm).ratio()
|
||||
|
||||
# --- weighted blend ---
|
||||
return round(
|
||||
0.65 * token_score + 0.35 * char_score,
|
||||
4,
|
||||
)
|
||||
|
||||
|
||||
def normalise_address(s: str) -> str:
|
||||
"""
|
||||
Canonical UK-focused address normalisation.
|
||||
|
||||
- Lowercases
|
||||
- Removes punctuation (keeps / for flats)
|
||||
- Normalises whitespace
|
||||
- Applies synonym compression at token level
|
||||
"""
|
||||
|
||||
if not s:
|
||||
return ""
|
||||
|
||||
ADDRESS_SYNONYMS = {
|
||||
# street types
|
||||
"rd": "road",
|
||||
"rd.": "road",
|
||||
"st": "street",
|
||||
"st.": "street",
|
||||
"ave": "avenue",
|
||||
"ave.": "avenue",
|
||||
"ln": "lane",
|
||||
"ln.": "lane",
|
||||
"cres": "crescent",
|
||||
"ct": "court",
|
||||
"dr": "drive",
|
||||
# flats / units
|
||||
"apt": "flat",
|
||||
"apartment": "flat",
|
||||
"unit": "flat",
|
||||
"ste": "suite",
|
||||
# numbering noise
|
||||
"no": "",
|
||||
"no.": "",
|
||||
}
|
||||
# 1. lowercase
|
||||
s = s.lower()
|
||||
|
||||
# 1.5 split digit-letter suffixes
|
||||
s = re.sub(r"(\d+)([a-z])\b", r"\1 \2", s)
|
||||
|
||||
# 2. remove punctuation except /
|
||||
s = re.sub(r"[^\w\s/]", " ", s)
|
||||
|
||||
# 3. normalise whitespace
|
||||
s = re.sub(r"\s+", " ", s).strip()
|
||||
|
||||
# 4. tokenise + synonym normalisation
|
||||
tokens = []
|
||||
for tok in s.split():
|
||||
replacement = ADDRESS_SYNONYMS.get(tok, tok)
|
||||
if replacement:
|
||||
tokens.append(replacement)
|
||||
|
||||
return " ".join(tokens)
|
||||
|
||||
|
||||
def score_addresses(
|
||||
df: pd.DataFrame,
|
||||
user_address: str,
|
||||
|
|
@ -222,7 +37,7 @@ def score_addresses(
|
|||
if column not in df.columns:
|
||||
raise ValueError(f"Missing column: {column}")
|
||||
|
||||
return df[column].apply(lambda x: levenshtein(user_address, x))
|
||||
return df[column].apply(lambda x: AddressMatch.score(user_address, x))
|
||||
|
||||
|
||||
def get_epc_data_with_postcode(postcode, size=500, attempt=1, max_attempts=3):
|
||||
|
|
@ -314,9 +129,11 @@ def get_uprn_candidates(
|
|||
|
||||
out = df.copy()
|
||||
|
||||
user_norm = normalise_address(user_address)
|
||||
user_norm = AddressMatch.normalise_address(user_address)
|
||||
|
||||
out["lexiscore"] = out[address_column].apply(lambda x: levenshtein(user_norm, x))
|
||||
out["lexiscore"] = out[address_column].apply(
|
||||
lambda x: AddressMatch.levenshtein(user_norm, x)
|
||||
)
|
||||
|
||||
# Normalise UPRN to string
|
||||
out[uprn_column] = out[uprn_column].astype(str).str.replace(r"\.0$", "", regex=True)
|
||||
|
|
@ -480,7 +297,10 @@ def resolve_uprns_for_postcode_group(
|
|||
|
||||
|
||||
def save_results_to_s3(
|
||||
results_df: pd.DataFrame, task_id: str, sub_task_id: str, bucket_name: str = None
|
||||
results_df: pd.DataFrame,
|
||||
task_id: str,
|
||||
sub_task_id: str,
|
||||
bucket_name: Optional[str] = None,
|
||||
) -> bool:
|
||||
"""
|
||||
Save results DataFrame to S3 as CSV.
|
||||
|
|
@ -533,7 +353,7 @@ def handler(event, context, local=False):
|
|||
{
|
||||
"task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917",
|
||||
"sub_task_id": "6a427b6e-1ece-4983-b1e5-9bffccc53d1d",
|
||||
"s3_uri": "s3://retrofit-data-dev/ara_postcode_splitter_batches/e31f2f21-175b-4a91-a3ec-a6baa325e917/8673913b-1a88-42d7-8578-0449123d94b0/2026-02-16T12:00:20.257856_7b520c0e.csv",
|
||||
"s3_uri": "s3://retrofit-data-dev/ara_postcode_splitter_batches/e31f2f21-175b-4a91-a3ec-a6baa325e917/8673913b-1a88-42d7-8578-0449123d94b0/2026-02-18T11:47:00.822579_f95467f5.csv",
|
||||
}
|
||||
)
|
||||
}
|
||||
|
|
@ -621,19 +441,6 @@ def handler(event, context, local=False):
|
|||
# Process the rows
|
||||
logger.info(f"Processing {len(df)} rows for task {task_id}")
|
||||
|
||||
# Create user_input column by concatenating Address columns if not already present
|
||||
if "user_input" not in df.columns:
|
||||
df["user_input"] = (
|
||||
df["Address 1"].fillna("")
|
||||
+ " "
|
||||
+ df["Address 2"].fillna("")
|
||||
+ " "
|
||||
+ df["Address 3"].fillna("")
|
||||
).str.strip()
|
||||
logger.info(f"Created user_input column from Address 1 and Address 2")
|
||||
else:
|
||||
logger.info(f"user_input column already present in data")
|
||||
|
||||
clean_df = df.dropna(subset=["postcode_clean"])
|
||||
|
||||
postcode_to_addresses = {
|
||||
|
|
@ -653,7 +460,7 @@ def handler(event, context, local=False):
|
|||
)
|
||||
|
||||
# Validate postcode before processing
|
||||
if not is_valid_postcode(postcode):
|
||||
if not AddressMatch.is_valid_postcode(postcode):
|
||||
logger.warning(f"Postcode {postcode} is invalid, skipping")
|
||||
continue
|
||||
|
||||
|
|
@ -672,57 +479,67 @@ def handler(event, context, local=False):
|
|||
# Process each address in this postcode with the same EPC data
|
||||
for row in postcode_rows:
|
||||
try:
|
||||
user_input = row.get("user_input", "")
|
||||
if not user_input:
|
||||
# Concatenate Address columns directly
|
||||
address2uprn_user_input = (
|
||||
str(row.get("Address 1", "")).strip()
|
||||
+ " "
|
||||
+ str(row.get("Address 2", "")).strip()
|
||||
+ " "
|
||||
+ str(row.get("Address 3", "")).strip()
|
||||
).strip()
|
||||
|
||||
if not address2uprn_user_input:
|
||||
logger.warning(
|
||||
f"Skipping row with missing user_input for postcode {postcode}"
|
||||
f"Skipping row with missing address components for postcode {postcode}"
|
||||
)
|
||||
continue
|
||||
|
||||
# Get UPRN using the pre-fetched EPC data with all return options
|
||||
result = get_uprn_with_epc_df(
|
||||
user_inputed_address=user_input, epc_df=epc_df, verbose=True
|
||||
user_inputed_address=address2uprn_user_input,
|
||||
epc_df=epc_df,
|
||||
verbose=True,
|
||||
)
|
||||
|
||||
# Parse result tuple if successful
|
||||
if result:
|
||||
uprn, found_address, score = result
|
||||
logger.info(
|
||||
f"Found UPRN for {user_input} in {postcode}: {uprn} (score: {score})"
|
||||
f"Found UPRN for {address2uprn_user_input} in {postcode}: {uprn} (score: {score})"
|
||||
)
|
||||
|
||||
results_data.append(
|
||||
{
|
||||
**row, # Include all original data
|
||||
"uprn": uprn,
|
||||
"domna_found_address": found_address,
|
||||
"domna_lexiscore": score,
|
||||
"address2uprn_uprn": uprn,
|
||||
"address2uprn_address": found_address,
|
||||
"address2uprn_lexiscore": score,
|
||||
}
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
f"No UPRN found for {user_input} in {postcode}"
|
||||
f"No UPRN found for {address2uprn_user_input} in {postcode}"
|
||||
)
|
||||
results_data.append(
|
||||
{
|
||||
**row, # Include all original data
|
||||
"uprn": None,
|
||||
"domna_found_address": None,
|
||||
"domna_lexiscore": None,
|
||||
"address2uprn_uprn": None,
|
||||
"address2uprn_address": None,
|
||||
"address2uprn_lexiscore": None,
|
||||
}
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Error processing address {row.get('user_input', 'unknown')}: {e}"
|
||||
f"Error processing address {row.get('address2uprn_user_input', 'unknown')}: {e}"
|
||||
)
|
||||
# Still add the row with error markers
|
||||
results_data.append(
|
||||
{
|
||||
**row,
|
||||
"uprn": None,
|
||||
"domna_found_address": None,
|
||||
"domna_lexiscore": None,
|
||||
"address2uprn_uprn": None,
|
||||
"address2uprn_address": None,
|
||||
"address2uprn_lexiscore": None,
|
||||
"error": str(e),
|
||||
}
|
||||
)
|
||||
|
|
|
|||
|
|
@ -63,6 +63,8 @@ class Settings(BaseSettings):
|
|||
# Other S3 buckts
|
||||
ENERGY_ASSESSMENTS_BUCKET: str = "changeme"
|
||||
|
||||
ORDNANCE_SURVEY_API_KEY: str = "changeme"
|
||||
|
||||
# Optional AWS creds (only required in local)
|
||||
AWS_ACCESS_KEY_ID: Optional[str] = None
|
||||
AWS_SECRET_KEY_ID: Optional[str] = None
|
||||
|
|
|
|||
24
backend/app/db/models/postcode_search.py
Normal file
24
backend/app/db/models/postcode_search.py
Normal file
|
|
@ -0,0 +1,24 @@
|
|||
import pytz
|
||||
import datetime
|
||||
from sqlalchemy import (
|
||||
Column,
|
||||
BigInteger,
|
||||
Text,
|
||||
DateTime,
|
||||
)
|
||||
from sqlalchemy.dialects.postgresql import JSONB
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
|
||||
Base = declarative_base()
|
||||
|
||||
|
||||
class PostcodeSearchModel(Base):
|
||||
__tablename__ = "postcode_search"
|
||||
|
||||
id = Column(BigInteger, primary_key=True, autoincrement=True)
|
||||
postcode = Column(Text, nullable=False)
|
||||
result_data = Column(JSONB, nullable=True)
|
||||
|
||||
created_at = Column(
|
||||
DateTime(timezone=True), nullable=False, default=datetime.datetime.now(pytz.utc)
|
||||
)
|
||||
|
|
@ -43,7 +43,7 @@ def generate_api_key():
|
|||
# Define the characters that will be used to generate the api key
|
||||
characters = string.ascii_letters + string.digits
|
||||
# Generate a 40 character long api key
|
||||
api_key = ''.join(secrets.choice(characters) for _ in range(40))
|
||||
api_key = "".join(secrets.choice(characters) for _ in range(40))
|
||||
return api_key
|
||||
|
||||
|
||||
|
|
@ -113,7 +113,7 @@ def save_dataframe_to_s3_parquet(df, bucket_name, file_key):
|
|||
df.to_parquet(parquet_buffer)
|
||||
|
||||
# Create the boto3 client
|
||||
s3 = boto3.resource('s3')
|
||||
s3 = boto3.resource("s3")
|
||||
|
||||
# Upload the Parquet file to S3
|
||||
s3.Object(bucket_name, file_key).put(Body=parquet_buffer.getvalue())
|
||||
|
|
|
|||
0
backend/ordnanceSurvey/__init__.py
Normal file
0
backend/ordnanceSurvey/__init__.py
Normal file
25
backend/ordnanceSurvey/handler/Dockerfile
Normal file
25
backend/ordnanceSurvey/handler/Dockerfile
Normal file
|
|
@ -0,0 +1,25 @@
|
|||
FROM public.ecr.aws/lambda/python:3.11
|
||||
|
||||
ARG DEV_DB_HOST
|
||||
ARG DEV_DB_PORT
|
||||
ARG DEV_DB_NAME
|
||||
|
||||
ENV DB_HOST=${DEV_DB_HOST}
|
||||
ENV DB_PORT=${DEV_DB_PORT}
|
||||
ENV DB_NAME=${DEV_DB_NAME}
|
||||
|
||||
# Set working directory (Lambda task root)
|
||||
WORKDIR /var/task
|
||||
|
||||
COPY backend/ordnanceSurvey/handler/requirements.txt .
|
||||
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
|
||||
# Copy necessary files for database and utility imports
|
||||
COPY utils/ utils/
|
||||
COPY backend/ backend/
|
||||
COPY datatypes/ datatypes/
|
||||
|
||||
# Lambda handler
|
||||
CMD ["backend/ordnanceSurvey/main.handler"]
|
||||
|
||||
11
backend/ordnanceSurvey/handler/requirements.txt
Normal file
11
backend/ordnanceSurvey/handler/requirements.txt
Normal file
|
|
@ -0,0 +1,11 @@
|
|||
pandas==2.2.2
|
||||
numpy<2.0
|
||||
requests
|
||||
tqdm
|
||||
openpyxl
|
||||
epc-api-python==1.0.2
|
||||
boto3==1.35.44
|
||||
sqlmodel
|
||||
sqlalchemy==2.0.36
|
||||
psycopg2-binary==2.9.10
|
||||
pydantic-settings==2.6.0
|
||||
48
backend/ordnanceSurvey/helpers.py
Normal file
48
backend/ordnanceSurvey/helpers.py
Normal file
|
|
@ -0,0 +1,48 @@
|
|||
import urllib.parse
|
||||
from pydantic import ValidationError
|
||||
import requests
|
||||
import pandas as pd
|
||||
from utils.logger import setup_logger
|
||||
from backend.ordnanceSurvey.types import PostcodeResponse
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
|
||||
def os_places_results_to_dataframe(data: dict) -> pd.DataFrame:
|
||||
"""
|
||||
Flatten the OS Places API response results into a DataFrame.
|
||||
Each result contains either a DPA or LPI record.
|
||||
"""
|
||||
results = data.get("results", [])
|
||||
rows = []
|
||||
for r in results:
|
||||
if "DPA" in r:
|
||||
rows.append(r["DPA"])
|
||||
elif "LPI" in r:
|
||||
rows.append(r["LPI"])
|
||||
return pd.DataFrame(rows)
|
||||
|
||||
|
||||
def lookup_os_places(postcode: str, api_key: str) -> dict:
|
||||
"""
|
||||
Lookup a postcode using the OS Places API.
|
||||
Returns the full API response data or an error dict.
|
||||
"""
|
||||
if not api_key:
|
||||
return {"error": "Ordnance Survey API key not specified", "status": 400}
|
||||
|
||||
encoded_postcode = urllib.parse.quote(postcode)
|
||||
url = (
|
||||
f"https://api.os.uk/search/places/v1/postcode?postcode={encoded_postcode}"
|
||||
f"&dataset=DPA,LPI&key={api_key}"
|
||||
)
|
||||
|
||||
response = requests.get(url)
|
||||
if response.status_code != 200:
|
||||
logger.error(
|
||||
f"OS Places API error for postcode {postcode}: {response.status_code}"
|
||||
)
|
||||
return {"error": "Failed to fetch address data", "status": response.status_code}
|
||||
|
||||
data = response.json()
|
||||
return {"data": data, "status": 200}
|
||||
11
backend/ordnanceSurvey/local_handler/docker-compose.yml
Normal file
11
backend/ordnanceSurvey/local_handler/docker-compose.yml
Normal file
|
|
@ -0,0 +1,11 @@
|
|||
version: "3.9"
|
||||
|
||||
services:
|
||||
ordnance-survey-lambda:
|
||||
build:
|
||||
context: ../../../
|
||||
dockerfile: backend/ordnanceSurvey/handler/Dockerfile
|
||||
ports:
|
||||
- "9000:8080"
|
||||
env_file:
|
||||
- ../../../.env
|
||||
29
backend/ordnanceSurvey/local_handler/invoke_local_lambda.py
Normal file
29
backend/ordnanceSurvey/local_handler/invoke_local_lambda.py
Normal file
|
|
@ -0,0 +1,29 @@
|
|||
#!/usr/bin/env python3
|
||||
import json
|
||||
import requests
|
||||
|
||||
HOST = "localhost"
|
||||
PORT = "9000"
|
||||
|
||||
LAMBDA_URL = f"http://{HOST}:{PORT}/2015-03-31/functions/function/invocations"
|
||||
|
||||
payload = {
|
||||
"Records": [
|
||||
{
|
||||
"body": json.dumps(
|
||||
{
|
||||
"task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917",
|
||||
"sub_task_id": "8673913b-1a88-42d7-8578-0449123d94b0",
|
||||
"s3_uri": "s3://retrofit-data-dev/ara_raw_outputs/e31f2f21-175b-4a91-a3ec-a6baa325e917/6a427b6e-1ece-4983-b1e5-9bffccc53d1d/2026-03-04T16:48:22.339995_634c88fc.csv",
|
||||
"lexiscore_column": "address2uprn_lexiscore",
|
||||
}
|
||||
)
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
response = requests.post(LAMBDA_URL, json=payload)
|
||||
|
||||
print("Status code:", response.status_code)
|
||||
print("Response:")
|
||||
print(response.text)
|
||||
227
backend/ordnanceSurvey/main.py
Normal file
227
backend/ordnanceSurvey/main.py
Normal file
|
|
@ -0,0 +1,227 @@
|
|||
from typing import Any, Optional
|
||||
import json
|
||||
from utils.logger import setup_logger
|
||||
import logging
|
||||
from backend.utils.subtasks import subtask_handler
|
||||
from utils.s3 import (
|
||||
save_csv_to_s3,
|
||||
read_csv_from_s3 as read_csv_from_s3_dict,
|
||||
parse_s3_uri,
|
||||
)
|
||||
from backend.utils.addressMatch import AddressMatch
|
||||
from backend.app.db.connection import get_db_session
|
||||
from backend.app.db.models.postcode_search import PostcodeSearchModel
|
||||
from backend.utils.ordnance_survey import (
|
||||
lookup_os_places,
|
||||
os_places_results_to_dataframe,
|
||||
)
|
||||
from backend.app.config import get_settings
|
||||
from sqlalchemy import select
|
||||
from datetime import datetime
|
||||
import uuid
|
||||
import os
|
||||
|
||||
import pandas as pd
|
||||
|
||||
logger: logging.Logger = setup_logger()
|
||||
|
||||
|
||||
def check_if_post_code_exists_in_db_cache(postcode):
|
||||
|
||||
with get_db_session() as session:
|
||||
result = (
|
||||
session.execute(
|
||||
select(PostcodeSearchModel).where(
|
||||
PostcodeSearchModel.postcode == postcode
|
||||
)
|
||||
)
|
||||
.scalars()
|
||||
.first()
|
||||
)
|
||||
if result:
|
||||
return os_places_results_to_dataframe(result.result_data)
|
||||
|
||||
# Cache miss — fetch from OS Places API
|
||||
api_key = get_settings().ORDNANCE_SURVEY_API_KEY
|
||||
response = lookup_os_places(postcode, api_key)
|
||||
|
||||
if response.get("status") != 200 or "data" not in response:
|
||||
logger.error(f"OS Places API failed for {postcode}: {response}")
|
||||
return pd.DataFrame()
|
||||
|
||||
# Save to cache
|
||||
new_record = PostcodeSearchModel(
|
||||
postcode=postcode,
|
||||
result_data=response["data"],
|
||||
)
|
||||
session.add(new_record)
|
||||
session.commit()
|
||||
|
||||
return os_places_results_to_dataframe(response["data"])
|
||||
|
||||
|
||||
def get_ordance_survey_record(row, cache=None):
|
||||
if cache is None:
|
||||
cache = check_if_post_code_exists_in_db_cache(postcode)
|
||||
|
||||
# process cache with row
|
||||
|
||||
|
||||
def save_results_to_s3(
|
||||
results_df: pd.DataFrame,
|
||||
task_id: str,
|
||||
sub_task_id: str,
|
||||
bucket_name: Optional[str] = None,
|
||||
) -> bool:
|
||||
"""
|
||||
Save results DataFrame to S3 as CSV in a parent folder structure.
|
||||
|
||||
:param results_df: The DataFrame containing results
|
||||
:param task_id: The task ID (used for file naming)
|
||||
:param sub_task_id: The subtask ID (used for file naming)
|
||||
:param bucket_name: The S3 bucket name (defaults to env variable)
|
||||
:return: True if successful, False otherwise
|
||||
"""
|
||||
if bucket_name is None:
|
||||
bucket_name = os.getenv("S3_BUCKET_NAME")
|
||||
|
||||
if not bucket_name:
|
||||
logger.error(
|
||||
"S3 bucket name not provided and S3_BUCKET_NAME environment variable not set"
|
||||
)
|
||||
return False
|
||||
|
||||
try:
|
||||
# Create a filename with timestamp and UUID
|
||||
file_name = f"{datetime.now().isoformat()}_{str(uuid.uuid4())[:8]}"
|
||||
file_key = f"ara_ordnance_survey_outputs/{task_id}/{sub_task_id}/ordnanceSurvey/{file_name}.csv"
|
||||
|
||||
# Save to S3
|
||||
success = save_csv_to_s3(results_df, bucket_name, file_key)
|
||||
|
||||
if success:
|
||||
logger.info(f"Successfully saved results to s3://{bucket_name}/{file_key}")
|
||||
return True
|
||||
else:
|
||||
logger.error(f"Failed to save results to S3")
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error saving results to S3: {str(e)}")
|
||||
return False
|
||||
|
||||
|
||||
@subtask_handler() # This assumes task_id and subtask_id is defined in event.Records.body
|
||||
def handler(body: dict[str, Any], context: Any, local: bool = False) -> None:
|
||||
|
||||
# delete this line after test
|
||||
# local = True
|
||||
# Example SQS message for testing (copy and paste into SQS):
|
||||
if local is True:
|
||||
body = {
|
||||
"task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917",
|
||||
"sub_task_id": "8673913b-1a88-42d7-8578-0449123d94b0",
|
||||
"s3_uri": "s3://retrofit-data-dev/ara_raw_outputs/e31f2f21-175b-4a91-a3ec-a6baa325e917/6a427b6e-1ece-4983-b1e5-9bffccc53d1d/2026-03-04T16:48:22.339995_634c88fc.csv",
|
||||
"lexiscore_column": "address2uprn_lexiscore",
|
||||
}
|
||||
|
||||
s3_uri: str = body.get("s3_uri", "")
|
||||
lexiscore_threshold: float = body.get("lexiscore_threshold", 0.5)
|
||||
lexiscore_column: Optional[str] = body.get("lexiscore_column", None)
|
||||
task_id: str = body.get("task_id", "")
|
||||
sub_task_id: str = body.get("sub_task_id", "")
|
||||
|
||||
if s3_uri == "":
|
||||
raise RuntimeError("Missing s3_uri in message body")
|
||||
|
||||
bucket, key = parse_s3_uri(s3_uri)
|
||||
|
||||
# Assumption designing with address2uprn was ran first
|
||||
csv_data = read_csv_from_s3_dict(bucket, key)
|
||||
df = pd.DataFrame(csv_data)
|
||||
# df = df.head(5)
|
||||
|
||||
# If lexiscore_column is specified, use it; otherwise process all rows
|
||||
if lexiscore_column and lexiscore_column in df.columns:
|
||||
df[lexiscore_column] = pd.to_numeric(df[lexiscore_column], errors="coerce")
|
||||
needs_processing = df[
|
||||
df[lexiscore_column].isna() | (df[lexiscore_column] < lexiscore_threshold)
|
||||
]
|
||||
else:
|
||||
# Default: process all rows
|
||||
needs_processing = df
|
||||
|
||||
grouped = needs_processing.groupby("postcode_clean")
|
||||
|
||||
# Initialise new columns
|
||||
df["ordnance_survey_address"] = None
|
||||
df["ordnance_survey_uprn"] = None
|
||||
df["ordnance_survey_lexiscore"] = None
|
||||
|
||||
# Process each postcode group at a time
|
||||
for postcode, group in grouped:
|
||||
print(f"Processing postcode: {postcode} ({len(group)} rows)")
|
||||
valid_group = AddressMatch.is_valid_postcode(postcode)
|
||||
if not valid_group:
|
||||
logger.warning(f"Postcode {postcode} is invalid, skipping")
|
||||
for idx in group.index:
|
||||
df.at[idx, "ordnance_survey_address"] = (
|
||||
"postcode not found in ordnance survey"
|
||||
)
|
||||
df.at[idx, "ordnance_survey_uprn"] = (
|
||||
"postcode not found in ordnance survey"
|
||||
)
|
||||
df.at[idx, "ordnance_survey_lexiscore"] = (
|
||||
"postcode not found in ordnance survey"
|
||||
)
|
||||
continue
|
||||
|
||||
postcode_cache = check_if_post_code_exists_in_db_cache(postcode)
|
||||
if postcode_cache.empty:
|
||||
logger.warning(f"No OS Places data for {postcode}")
|
||||
for idx in group.index:
|
||||
df.at[idx, "ordnance_survey_address"] = (
|
||||
"postcode not found in ordnance survey"
|
||||
)
|
||||
df.at[idx, "ordnance_survey_uprn"] = (
|
||||
"postcode not found in ordnance survey"
|
||||
)
|
||||
df.at[idx, "ordnance_survey_lexiscore"] = (
|
||||
"postcode not found in ordnance survey"
|
||||
)
|
||||
continue
|
||||
|
||||
for idx, row in group.iterrows():
|
||||
# Concatenate Address columns directly
|
||||
ordnancy_survey_user_input = (
|
||||
str(row.get("Address 1", "")).strip()
|
||||
+ " "
|
||||
+ str(row.get("Address 2", "")).strip()
|
||||
+ " "
|
||||
+ str(row.get("Address 3", "")).strip()
|
||||
).strip()
|
||||
|
||||
if not ordnancy_survey_user_input:
|
||||
continue
|
||||
|
||||
# Score against OS Places addresses
|
||||
scores = postcode_cache["ADDRESS"].apply(
|
||||
lambda addr: AddressMatch.score(ordnancy_survey_user_input, addr)
|
||||
)
|
||||
best_idx = scores.idxmax()
|
||||
best_score = scores[best_idx]
|
||||
|
||||
df.at[idx, "ordnance_survey_address"] = postcode_cache.at[
|
||||
best_idx, "ADDRESS"
|
||||
]
|
||||
df.at[idx, "ordnance_survey_uprn"] = postcode_cache.at[best_idx, "UPRN"]
|
||||
df.at[idx, "ordnance_survey_lexiscore"] = best_score
|
||||
|
||||
# Save results locally
|
||||
if local:
|
||||
df.to_csv("ordnance_survey_results.csv", index=False)
|
||||
print(f"Results saved to ordnance_survey_results.csv ({len(df)} rows)")
|
||||
|
||||
# Save results to S3
|
||||
if task_id and sub_task_id:
|
||||
save_results_to_s3(df, task_id, sub_task_id)
|
||||
201
backend/utils/addressMatch.py
Normal file
201
backend/utils/addressMatch.py
Normal file
|
|
@ -0,0 +1,201 @@
|
|||
import re
|
||||
from typing import Any, Optional
|
||||
from difflib import SequenceMatcher
|
||||
import requests
|
||||
|
||||
|
||||
class AddressMatch:
|
||||
def __init__(self):
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def score(a: str, b: str) -> float:
|
||||
score: float = AddressMatch.levenshtein(a, b)
|
||||
|
||||
return score
|
||||
|
||||
@staticmethod
|
||||
def is_valid_postcode(postcode_clean: str) -> bool:
|
||||
"""
|
||||
Validate postcode using postcodes.io.
|
||||
|
||||
Expects a sanitised postcode (e.g. E84SQ).
|
||||
Returns True if valid, False otherwise.
|
||||
"""
|
||||
POSTCODES_IO_VALIDATE_URL = (
|
||||
"https://api.postcodes.io/postcodes/{postcode}/validate"
|
||||
)
|
||||
if not postcode_clean:
|
||||
return False
|
||||
|
||||
try:
|
||||
resp = requests.get(
|
||||
POSTCODES_IO_VALIDATE_URL.format(postcode=postcode_clean),
|
||||
timeout=5,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
return resp.json().get("result", False)
|
||||
except requests.RequestException:
|
||||
# Network issues, rate limits, etc.
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def normalise_address(s: str) -> str:
|
||||
"""
|
||||
Canonical UK-focused address normalisation.
|
||||
|
||||
- Lowercases
|
||||
- Removes punctuation (keeps / for flats)
|
||||
- Normalises whitespace
|
||||
- Applies synonym compression at token level
|
||||
"""
|
||||
|
||||
if not s:
|
||||
return ""
|
||||
|
||||
ADDRESS_SYNONYMS = {
|
||||
# street types
|
||||
"rd": "road",
|
||||
"rd.": "road",
|
||||
"st": "street",
|
||||
"st.": "street",
|
||||
"ave": "avenue",
|
||||
"ave.": "avenue",
|
||||
"ln": "lane",
|
||||
"ln.": "lane",
|
||||
"cres": "crescent",
|
||||
"ct": "court",
|
||||
"dr": "drive",
|
||||
# flats / units
|
||||
"apt": "flat",
|
||||
"apartment": "flat",
|
||||
"unit": "flat",
|
||||
"ste": "suite",
|
||||
# numbering noise
|
||||
"no": "",
|
||||
"no.": "",
|
||||
}
|
||||
# 1. lowercase
|
||||
s = s.lower()
|
||||
|
||||
# 1.5 split digit-letter suffixes
|
||||
s = re.sub(r"(\d+)([a-z])\b", r"\1 \2", s)
|
||||
|
||||
# 2. remove punctuation except /
|
||||
s = re.sub(r"[^\w\s/]", " ", s)
|
||||
|
||||
# 3. normalise whitespace
|
||||
s = re.sub(r"\s+", " ", s).strip()
|
||||
|
||||
# 4. tokenise + synonym normalisation
|
||||
tokens: list[str] = []
|
||||
for tok in s.split():
|
||||
replacement = ADDRESS_SYNONYMS.get(tok, tok)
|
||||
if replacement:
|
||||
tokens.append(replacement)
|
||||
return " ".join(tokens)
|
||||
|
||||
@staticmethod
|
||||
def levenshtein(a: str, b: str) -> float:
|
||||
"""
|
||||
Address similarity score in [0, 1].
|
||||
|
||||
Strategy:
|
||||
- Normalise
|
||||
- Strongly penalise mismatched house/flat numbers
|
||||
- Combine token overlap + character similarity
|
||||
"""
|
||||
|
||||
def extract_number_sequence(s: str) -> list[str]:
|
||||
return re.findall(r"\d+[a-z]?", s)
|
||||
|
||||
def extract_numbers(s: str) -> set[str]:
|
||||
return set(extract_number_sequence(s))
|
||||
|
||||
def tokenise(s: str) -> set[str]:
|
||||
return set(s.split())
|
||||
|
||||
def extract_building_number(s: str) -> Optional[str]:
|
||||
"""
|
||||
Extract the main building number (NOT flat/unit).
|
||||
Assumes formats like:
|
||||
- '42 moreton road'
|
||||
- 'flat 3 42 moreton road'
|
||||
"""
|
||||
tokens = s.split()
|
||||
|
||||
# remove flat/unit context
|
||||
cleaned: list[Any] = []
|
||||
skip_next = False
|
||||
for t in tokens:
|
||||
if t in ("flat", "apt", "apartment", "unit"):
|
||||
skip_next = True
|
||||
continue
|
||||
if skip_next:
|
||||
skip_next = False
|
||||
continue
|
||||
cleaned.append(t)
|
||||
|
||||
# first remaining number is building number
|
||||
for t in cleaned:
|
||||
if re.fullmatch(r"\d+[a-z]?", t):
|
||||
return t
|
||||
|
||||
return None
|
||||
|
||||
a_norm = AddressMatch.normalise_address(a)
|
||||
b_norm = AddressMatch.normalise_address(b)
|
||||
|
||||
# --- hard signal: numbers ---
|
||||
nums_a = extract_numbers(a_norm)
|
||||
nums_b = extract_numbers(b_norm)
|
||||
|
||||
if nums_a and not nums_b:
|
||||
return 0.0
|
||||
|
||||
# No shared numbers at all → impossible match
|
||||
if nums_a and nums_b and nums_a.isdisjoint(nums_b):
|
||||
return 0.0
|
||||
|
||||
# 🔒 HARD GUARD: building number must match
|
||||
bld_a = extract_building_number(a_norm)
|
||||
bld_b = extract_building_number(b_norm)
|
||||
|
||||
if bld_a and bld_b and bld_a != bld_b:
|
||||
return 0.0
|
||||
|
||||
# --- order-sensitive flat/building guard ---
|
||||
seq_a = extract_number_sequence(a_norm)
|
||||
seq_b = extract_number_sequence(b_norm)
|
||||
|
||||
has_flat_token_user = any(
|
||||
tok in a_norm for tok in ("flat", "apt", "apartment", "unit")
|
||||
)
|
||||
has_flat_token_epc = "flat" in b_norm
|
||||
|
||||
if (
|
||||
len(seq_a) == 2
|
||||
and len(seq_b) >= 2
|
||||
and has_flat_token_epc
|
||||
and not has_flat_token_user
|
||||
and seq_a != seq_b[:2]
|
||||
):
|
||||
return 0.0
|
||||
|
||||
# --- token similarity (order-independent) ---
|
||||
toks_a: set[str] = tokenise(a_norm)
|
||||
toks_b: set[str] = tokenise(b_norm)
|
||||
|
||||
if not toks_a or not toks_b:
|
||||
token_score = 0.0
|
||||
else:
|
||||
token_score = len(toks_a & toks_b) / len(toks_a | toks_b)
|
||||
|
||||
# --- character similarity (soft signal) ---
|
||||
char_score: float = SequenceMatcher(None, a_norm, b_norm).ratio()
|
||||
|
||||
# --- weighted blend ---
|
||||
return round(
|
||||
0.65 * token_score + 0.35 * char_score,
|
||||
4,
|
||||
)
|
||||
95
backend/utils/subtasks.py
Normal file
95
backend/utils/subtasks.py
Normal file
|
|
@ -0,0 +1,95 @@
|
|||
# decorators/subtask_handler.py
|
||||
|
||||
from functools import wraps
|
||||
from typing import Callable, Any
|
||||
from uuid import UUID
|
||||
import json
|
||||
|
||||
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
|
||||
|
||||
|
||||
def subtask_handler():
|
||||
"""
|
||||
Decorator that wraps your existing handler and automatically:
|
||||
|
||||
- Extracts task_id + sub_task_id from event
|
||||
- Marks subtask as in progress
|
||||
- Executes handler logic
|
||||
- Marks subtask complete on success
|
||||
- Marks failed on exception
|
||||
"""
|
||||
|
||||
def decorator(func: Callable[..., Any]):
|
||||
|
||||
@wraps(func)
|
||||
def wrapper(event: dict[str, Any], context: Any, *args, **kwargs):
|
||||
|
||||
records = event.get("Records", [event])
|
||||
|
||||
interface = SubTaskInterface()
|
||||
|
||||
for record in records:
|
||||
|
||||
# -------------------------------
|
||||
# Parse body safely
|
||||
# -------------------------------
|
||||
body = {}
|
||||
|
||||
if isinstance(record.get("body"), str):
|
||||
try:
|
||||
body = json.loads(record["body"])
|
||||
except Exception:
|
||||
body = {}
|
||||
else:
|
||||
body = record.get("body", {}) or {}
|
||||
|
||||
task_id_raw = body.get("task_id")
|
||||
subtask_id_raw = body.get("sub_task_id")
|
||||
|
||||
task_id = UUID(task_id_raw) if isinstance(task_id_raw, str) else None
|
||||
subtask_id = (
|
||||
UUID(subtask_id_raw) if isinstance(subtask_id_raw, str) else None
|
||||
)
|
||||
|
||||
if not task_id or not subtask_id:
|
||||
raise RuntimeError("task_id or sub_task_id missing")
|
||||
|
||||
# -------------------------------
|
||||
# Mark in progress
|
||||
# -------------------------------
|
||||
interface.update_subtask_status(
|
||||
subtask_id=subtask_id,
|
||||
status="in progress",
|
||||
)
|
||||
|
||||
try:
|
||||
# Pass the parsed body into your function
|
||||
result = func(body, context, *args, **kwargs)
|
||||
|
||||
# -------------------------------
|
||||
# Success → mark complete
|
||||
# -------------------------------
|
||||
interface.update_subtask_status(
|
||||
subtask_id=subtask_id,
|
||||
status="complete",
|
||||
outputs={"result": result} if result else None,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
|
||||
# -------------------------------
|
||||
# Failure → mark failed
|
||||
# -------------------------------
|
||||
interface.update_subtask_status(
|
||||
subtask_id=subtask_id,
|
||||
status="failed",
|
||||
outputs={"error": str(e)},
|
||||
)
|
||||
|
||||
raise
|
||||
|
||||
return None
|
||||
|
||||
return wrapper
|
||||
|
||||
return decorator
|
||||
|
|
@ -33,19 +33,6 @@ module "address2uprn" {
|
|||
LOG_LEVEL = "info"
|
||||
DB_USERNAME = local.db_credentials.db_assessment_model_username
|
||||
DB_PASSWORD = local.db_credentials.db_assessment_model_password
|
||||
GOOGLE_SOLAR_API_KEY = "test"
|
||||
SAP_PREDICTIONS_BUCKET = "test"
|
||||
CARBON_PREDICTIONS_BUCKET = "test"
|
||||
HEAT_PREDICTIONS_BUCKET = "test"
|
||||
HEATING_KWH_PREDICTIONS_BUCKET = "test"
|
||||
HOTWATER_KWH_PREDICTIONS_BUCKET = "test"
|
||||
API_KEY = "test"
|
||||
ENVIRONMENT = "test"
|
||||
SECRET_KEY = "test"
|
||||
PLAN_TRIGGER_BUCKET = "test"
|
||||
DATA_BUCKET = "test"
|
||||
ENGINE_SQS_URL = "test"
|
||||
ENERGY_ASSESSMENTS_BUCKET = "test"
|
||||
S3_BUCKET_NAME = data.terraform_remote_state.shared.outputs.retrofit_sap_data_bucket_name
|
||||
},
|
||||
)
|
||||
|
|
|
|||
45
infrastructure/terraform/lambda/ordnanceSurvey/main.tf
Normal file
45
infrastructure/terraform/lambda/ordnanceSurvey/main.tf
Normal file
|
|
@ -0,0 +1,45 @@
|
|||
data "terraform_remote_state" "shared" {
|
||||
backend = "s3"
|
||||
config = {
|
||||
bucket = "assessment-model-terraform-state"
|
||||
key = "env:/${var.stage}/terraform.tfstate"
|
||||
region = "eu-west-2"
|
||||
}
|
||||
}
|
||||
data "aws_secretsmanager_secret_version" "db_credentials" {
|
||||
secret_id = "${var.stage}/assessment_model/db_credentials"
|
||||
}
|
||||
locals {
|
||||
db_credentials = jsondecode(data.aws_secretsmanager_secret_version.db_credentials.secret_string)
|
||||
}
|
||||
|
||||
module "ordnance" {
|
||||
source = "../modules/lambda_with_sqs"
|
||||
|
||||
name = ordnanceSurvey #"address2uprn" for example
|
||||
stage = var.stage
|
||||
|
||||
image_uri = local.image_uri
|
||||
|
||||
timeout = 900
|
||||
|
||||
# Optional: Set maximum_concurrency to limit concurrent SQS-triggered invocations (2-1000)
|
||||
maximum_concurrency = var.maximum_concurrency
|
||||
|
||||
environment = merge(
|
||||
{
|
||||
STAGE = var.stage
|
||||
LOG_LEVEL = "info"
|
||||
DB_USERNAME = local.db_credentials.db_assessment_model_username
|
||||
DB_PASSWORD = local.db_credentials.db_assessment_model_password
|
||||
S3_BUCKET_NAME = data.terraform_remote_state.shared.outputs.retrofit_sap_data_bucket_name
|
||||
ORDNANCE_SURVEY_API_KEY:= "Reminder to add This somehow, ask if we are doing aws secret method or github secret method"
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
# Attach S3 read policy to the Lambda execution role
|
||||
resource "aws_iam_role_policy_attachment" "ordanceSurvey_read_and_write" {
|
||||
role = module.ordnance.role_name
|
||||
policy_arn = data.terraform_remote_state.shared.outputs.ordnance_s3_read_and_write_arn
|
||||
}
|
||||
16
infrastructure/terraform/lambda/ordnanceSurvey/provider.tf
Normal file
16
infrastructure/terraform/lambda/ordnanceSurvey/provider.tf
Normal file
|
|
@ -0,0 +1,16 @@
|
|||
terraform {
|
||||
required_providers {
|
||||
aws = {
|
||||
source = "hashicorp/aws"
|
||||
version = "~> 4.16"
|
||||
}
|
||||
}
|
||||
|
||||
backend "s3" {
|
||||
bucket = "ordnance-terraform-state"
|
||||
key = "terraform.tfstate"
|
||||
region = "eu-west-2"
|
||||
}
|
||||
|
||||
required_version = ">= 1.2.0"
|
||||
}
|
||||
37
infrastructure/terraform/lambda/ordnanceSurvey/variables.tf
Normal file
37
infrastructure/terraform/lambda/ordnanceSurvey/variables.tf
Normal file
|
|
@ -0,0 +1,37 @@
|
|||
variable "lambda_name" {
|
||||
type = string
|
||||
description = "Logical name of the lambda (e.g. address2uprn)"
|
||||
}
|
||||
|
||||
variable "stage" {
|
||||
description = "Deployment stage (e.g. dev, prod)"
|
||||
type = string
|
||||
}
|
||||
variable "ecr_repo_url" {
|
||||
type = string
|
||||
description = "ECR repository URL (no tag, no digest)"
|
||||
}
|
||||
|
||||
variable "image_digest" {
|
||||
type = string
|
||||
description = "Image digest (sha256:...)"
|
||||
}
|
||||
|
||||
variable "maximum_concurrency" {
|
||||
type = number
|
||||
default = null
|
||||
description = "Maximum number of concurrent Lambda invocations from SQS (2-1000). null = no limit."
|
||||
}
|
||||
|
||||
variable "batch_size" {
|
||||
type = number
|
||||
default = 1
|
||||
}
|
||||
|
||||
locals {
|
||||
image_uri = "${var.ecr_repo_url}@${var.image_digest}"
|
||||
}
|
||||
|
||||
output "resolved_image_uri" {
|
||||
value = local.image_uri
|
||||
}
|
||||
|
|
@ -451,6 +451,36 @@ module "categorisation_registry" {
|
|||
stage = var.stage
|
||||
}
|
||||
|
||||
|
||||
################################################
|
||||
# OrdnanceSurveyAPI – Lambda
|
||||
################################################
|
||||
module "ordnance_state_bucket" {
|
||||
source = "../modules/tf_state_bucket"
|
||||
bucket_name = "ordnance-terraform-state"
|
||||
|
||||
}
|
||||
|
||||
module "ordnance_registry" {
|
||||
source = "../modules/container_registry"
|
||||
name = "ordnance"
|
||||
stage = var.stage
|
||||
|
||||
}
|
||||
|
||||
# S3 policy for postcode splitter to read from retrofit data bucket
|
||||
module "ordnance_s3_read_and_write" {
|
||||
source = "../modules/s3_iam_policy"
|
||||
|
||||
policy_name = "OrdnanceSurveyReadandWriteS3"
|
||||
policy_description = "Allow ordnance Lambda to read and write from retrofit-data bucket"
|
||||
bucket_arns = ["arn:aws:s3:::retrofit-data-${var.stage}"]
|
||||
actions = ["s3:GetObject", "s3:ListBucket", "s3:PutObject"]
|
||||
resource_paths = ["/*"]
|
||||
}
|
||||
|
||||
output "ordnance_s3_read_and_write_arn" {
|
||||
value = module.ordnance_s3_read_and_write.policy_arn
|
||||
################################################
|
||||
# Engine – Lambda ECR
|
||||
################################################
|
||||
|
|
|
|||
|
|
@ -29,9 +29,7 @@ from sqlalchemy import func
|
|||
# PORTFOLIO_ID = 206
|
||||
# SCENARIOS = [389]
|
||||
PORTFOLIO_ID = 581
|
||||
SCENARIOS = [
|
||||
1124
|
||||
]
|
||||
SCENARIOS = [1124]
|
||||
scenario_names = {
|
||||
1124: "EPC C - Solar Focused",
|
||||
}
|
||||
|
|
@ -234,7 +232,7 @@ for scenario_id in SCENARIOS:
|
|||
# Get recs for this scenario
|
||||
recommended_measures_df = recommendations_df[
|
||||
recommendations_df["scenario_id"] == scenario_id
|
||||
][["property_id", "measure_type", "estimated_cost", "default"]]
|
||||
][["property_id", "measure_type", "estimated_cost", "default"]]
|
||||
recommended_measures_df = recommended_measures_df[
|
||||
recommended_measures_df["default"]
|
||||
]
|
||||
|
|
@ -242,7 +240,7 @@ for scenario_id in SCENARIOS:
|
|||
|
||||
post_install_sap = recommendations_df[
|
||||
recommendations_df["scenario_id"] == scenario_id
|
||||
][["property_id", "default", "sap_points"]]
|
||||
][["property_id", "default", "sap_points"]]
|
||||
post_install_sap = post_install_sap[post_install_sap["default"]]
|
||||
# Sum up the sap points by property id
|
||||
post_install_sap = (
|
||||
|
|
@ -320,7 +318,7 @@ for scenario_id in SCENARIOS:
|
|||
z = df2[
|
||||
(df2["predicted_post_works_epc"] != "D")
|
||||
& (df2["post_epc_rating"].astype(str) == "Epc.D")
|
||||
]
|
||||
]
|
||||
|
||||
df2["predicted_post_works_epc"].value_counts()
|
||||
df2["post_epc_rating"].astype(str).value_counts()
|
||||
|
|
@ -330,8 +328,6 @@ for scenario_id in SCENARIOS:
|
|||
getting_works = df[df["total_retrofit_cost"] > 0]
|
||||
getting_works["predicted_post_works_epc"].value_counts()
|
||||
|
||||
32565 / getting_works.shape[0]
|
||||
|
||||
df[df["predicted_post_works_sap"] == ""]
|
||||
|
||||
# Expected columns list
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ from io import BytesIO, StringIO
|
|||
from urllib.parse import unquote
|
||||
from utils.logger import setup_logger
|
||||
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
|
||||
from typing import Any
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
|
|
@ -316,7 +317,7 @@ def save_excel_to_s3(df, bucket_name, file_key):
|
|||
logger.info(f"Excel file saved to S3 bucket '{bucket_name}' with key '{file_key}'")
|
||||
|
||||
|
||||
def read_csv_from_s3(bucket_name, filepath):
|
||||
def read_csv_from_s3(bucket_name: str, filepath: str) -> list[dict[str, str]]:
|
||||
logger.info(
|
||||
f"Reading CSV file from S3 bucket '{bucket_name}' with key '{filepath}'"
|
||||
)
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue