added utils to allow easier subtask management

This commit is contained in:
Jun-te Kim 2026-03-02 15:15:39 +00:00
parent 484d7a201e
commit aaa1bdc077
11 changed files with 169 additions and 20 deletions

View file

@ -21,7 +21,7 @@ RUN git clone --depth 1 https://github.com/openvenues/libpostal /tmp/libpostal \
&& rm -rf /tmp/libpostal
# 3) Create the user and grant sudo privileges
RUN useradd -m -s /usr/bin/bash ${USER} \
RUN useradd -m -s /bin/bash ${USER} \
&& echo "${USER} ALL=(ALL) NOPASSWD: ALL" >/etc/sudoers.d/${USER} \
&& chmod 0440 /etc/sudoers.d/${USER}
@ -32,6 +32,11 @@ ADD asset_list/requirements.txt requirements1.txt
RUN cat requirements1.txt requirements2.txt >> requirements.txt
RUN pip install -r requirements.txt
# Install code server
RUN curl -fsSL https://code-server.dev/install.sh | sh
# 5) Workdir
WORKDIR /workspaces/model

View file

@ -2,13 +2,14 @@
"name": "SAL ENV",
"dockerComposeFile": "docker-compose.yml",
"service": "model-sal",
"remoteUser": "vscode",
// "remoteUser": "vscode",
"workspaceFolder": "/workspaces/model",
"postStartCommand": "bash .devcontainer/post-install.sh",
"postStartCommand": "bash .devcontainer/asset_list/post-install.sh",
"mounts": [
// Optional, just makes getting from Downloads (local env) easier
"source=${localEnv:HOME},target=/workspaces/home,type=bind"
"source=${localEnv:HOME},target=/home/vscode,type=bind"
],
"forwardPorts": [8081],
"customizations": {
"vscode": {
"extensions": [

View file

@ -2,15 +2,17 @@ version: '3.8'
services:
model-sal:
user: "${UID}:${GID}"
build:
context: ../..
dockerfile: .devcontainer/asset_list/Dockerfile
command: sleep infinity
command: code-server --bind-addr 0.0.0.0:8080
user: vscode
volumes:
- ../../:/workspaces/model
networks:
- model-net
ports:
- "8081:8080"
networks:
model-net:

View file

@ -16,7 +16,13 @@
"python.languageServer": "Pylance",
"python.analysis.typeCheckingMode": "strict",
"python.analysis.autoSearchPaths": true,
"python.analysis.extraPaths": ["./src"]
"python.analysis.extraPaths": ["./src"],
"vim.useCtrlKeys": true,
"vim.handleKeys": {
"<C-c>": false,
"<C-v>": false
}
// Hot reload setting that needs to be in user settings
// "jupyter.runStartupCommands": [

View file

@ -77,21 +77,21 @@ def app():
data_folder = "/workspaces/model/asset_list"
data_filename = "assests.xlsx"
sheet_name = "Sheet1"
postcode_column = "Postcode"
address1_column = "Address"
postcode_column = "POSTCODE"
address1_column = "ADDRESS"
address1_method = "house_number_extraction"
fulladdress_column = None
address_cols_to_concat = ["Address"]
address_cols_to_concat = ["ADDRESS"]
missing_postcodes_method = None
landlord_year_built = None
landlord_os_uprn = "UPRN"
landlord_property_type = "Archetype"
landlord_built_form = "Bedroom Count"
landlord_wall_construction = "Wall Insulation Type"
landlord_roof_construction = "Roof Type"
landlord_heating_system = "Boiler Type"
landlord_os_uprn = None
landlord_property_type = None
landlord_built_form = None
landlord_wall_construction = None
landlord_roof_construction = None
landlord_heating_system = None
landlord_existing_pv = None
landlord_property_id = "Tab"
landlord_property_id = "UPRN"
landlord_sap = None
outcomes_filename = None
outcomes_sheetname = None
@ -488,3 +488,5 @@ def app():
asset_list.duplicated_addresses.to_excel(
writer, sheet_name="Duplicate Properties", index=False
)

View file

@ -43,7 +43,7 @@ def generate_api_key():
# Define the characters that will be used to generate the api key
characters = string.ascii_letters + string.digits
# Generate a 40 character long api key
api_key = ''.join(secrets.choice(characters) for _ in range(40))
api_key = "".join(secrets.choice(characters) for _ in range(40))
return api_key
@ -113,7 +113,7 @@ def save_dataframe_to_s3_parquet(df, bucket_name, file_key):
df.to_parquet(parquet_buffer)
# Create the boto3 client
s3 = boto3.resource('s3')
s3 = boto3.resource("s3")
# Upload the Parquet file to S3
s3.Object(bucket_name, file_key).put(Body=parquet_buffer.getvalue())

View file

View file

@ -0,0 +1,37 @@
from typing import Any
import json
from utils.logger import setup_logger
import logging
from backend.utils.subtasks import subtask_handler
logger: logging.Logger = setup_logger()
import time
@subtask_handler()
def handler(event: dict[str, Any], context: Any, local: bool = False) -> None:
local = True
# Example SQS message for testing (copy and paste into SQS):
if local is True:
event = {
"Records": [
{
"body": json.dumps(
{
"task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917",
"sub_task_id": "8673913b-1a88-42d7-8578-0449123d94b0",
}
)
}
]
}
print("sleeping for 30 seconds, subtask should be in progress")
raise RuntimeError("test")
time.sleep(30)
print("subtask should be marked as done")
# ------------------------------
# YOUR BUSINESS LOGIC HERE
# ------------------------------

95
backend/utils/subtasks.py Normal file
View file

@ -0,0 +1,95 @@
# decorators/subtask_handler.py
from functools import wraps
from typing import Callable, Any
from uuid import UUID
import json
from backend.app.db.functions.tasks.Tasks import SubTaskInterface
def subtask_handler():
"""
Decorator that wraps your existing handler and automatically:
- Extracts task_id + sub_task_id from event
- Marks subtask as in progress
- Executes handler logic
- Marks subtask complete on success
- Marks failed on exception
"""
def decorator(func: Callable[..., Any]):
@wraps(func)
def wrapper(event: dict[str, Any], context: Any, *args, **kwargs):
records = event.get("Records", [event])
interface = SubTaskInterface()
for record in records:
# -------------------------------
# Parse body safely
# -------------------------------
body = {}
if isinstance(record.get("body"), str):
try:
body = json.loads(record["body"])
except Exception:
body = {}
else:
body = record.get("body", {}) or {}
task_id_raw = body.get("task_id")
subtask_id_raw = body.get("sub_task_id")
task_id = UUID(task_id_raw) if isinstance(task_id_raw, str) else None
subtask_id = (
UUID(subtask_id_raw) if isinstance(subtask_id_raw, str) else None
)
if not task_id or not subtask_id:
raise RuntimeError("task_id or sub_task_id missing")
# -------------------------------
# Mark in progress
# -------------------------------
interface.update_subtask_status(
subtask_id=subtask_id,
status="in progress",
)
try:
# Pass the parsed body into your function
result = func(body, context, *args, **kwargs)
# -------------------------------
# Success → mark complete
# -------------------------------
interface.update_subtask_status(
subtask_id=subtask_id,
status="complete",
outputs={"result": result} if result else None,
)
except Exception as e:
# -------------------------------
# Failure → mark failed
# -------------------------------
interface.update_subtask_status(
subtask_id=subtask_id,
status="failed",
outputs={"error": str(e)},
)
raise
return None
return wrapper
return decorator

View file

@ -6,6 +6,7 @@ from io import BytesIO, StringIO
from urllib.parse import unquote
from utils.logger import setup_logger
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
from typing import Any
logger = setup_logger()
@ -316,7 +317,7 @@ def save_excel_to_s3(df, bucket_name, file_key):
logger.info(f"Excel file saved to S3 bucket '{bucket_name}' with key '{file_key}'")
def read_csv_from_s3(bucket_name, filepath):
def read_csv_from_s3(bucket_name: str, filepath: str) -> list[dict[str, str]]:
logger.info(
f"Reading CSV file from S3 bucket '{bucket_name}' with key '{filepath}'"
)