Merge branch 'main' of https://github.com/Hestia-Homes/Model into eco-eligiblity-bug

This commit is contained in:
Khalim Conn-Kowlessar 2025-11-14 16:53:56 +00:00
commit 469a1483a5
23 changed files with 1013 additions and 51 deletions

40
.devcontainer/Dockerfile Normal file
View file

@ -0,0 +1,40 @@
FROM python:3.12-bullseye
ARG USER=vscode
ARG DEBIAN_FRONTEND=noninteractive
# 1) Toolchain + utilities for building libpostal
RUN apt-get update && apt-get install -y --no-install-recommends \
sudo jq vim curl git ca-certificates \
build-essential pkg-config automake autoconf libtool \
&& rm -rf /var/lib/apt/lists/*
# # 2) Build and install libpostal from source
# RUN git clone --depth 1 https://github.com/openvenues/libpostal /tmp/libpostal \
# && cd /tmp/libpostal \
# && ./bootstrap.sh \
# && ./configure --datadir=/usr/local/share/libpostal \
# && make -j"$(nproc)" \
# && make install \
# && ldconfig \
# && rm -rf /tmp/libpostal
# 3) Create the user and grant sudo privileges
RUN useradd -m -s /usr/bin/bash ${USER} \
&& echo "${USER} ALL=(ALL) NOPASSWD: ALL" >/etc/sudoers.d/${USER} \
&& chmod 0440 /etc/sudoers.d/${USER}
# 4) Python deps
ENV PIP_NO_CACHE_DIR=1 PIP_DISABLE_PIP_VERSION_CHECK=1
# Model
# ADD asset_list/requirements.txt requirements.txt
# FASTAPI backend
ADD .devcontainer/requirements.txt requirements.txt
RUN pip install -r requirements.txt
# 5) Workdir
WORKDIR /workspaces/model
# 6) Make Python find your package
# Add project root to PYTHONPATH for all processes
ENV PYTHONPATH=/workspaces/model:${PYTHONPATH}

View file

@ -0,0 +1,31 @@
{
"name": "Basic Python",
"dockerComposeFile": "docker-compose.yml",
"service": "model",
"remoteUser": "vscode",
"workspaceFolder": "/workspaces/model",
"postStartCommand": "bash .devcontainer/post-install.sh",
"mounts": [
// Optional, just makes getting from Downloads (local env) easier
"source=${localEnv:HOME},target=/workspaces/home,type=bind"
],
"customizations": {
"vscode": {
"settings": {
"files.defaultWorkspace": "/workspaces/model"
},
"extensions": [
"ms-python.python",
"ms-toolsai.jupyter",
"mechatroner.rainbow-csv",
"ms-toolsai.datawrangler",
"lindacong.vscode-book-reader",
"4ops.terraform",
"fabiospampinato.vscode-todo-plus",
"jgclark.vscode-todo-highlight",
"corentinartaud.pdfpreview",
"ms-python.vscode-python-envs"
]
}
}
}

View file

@ -0,0 +1,18 @@
version: '3.8'
services:
model:
user: "${UID}:${GID}"
build:
context: ..
dockerfile: .devcontainer/Dockerfile
command: sleep infinity
volumes:
- ..:/workspaces/model
networks:
- model-net
networks:
model-net:
driver: bridge

View file

@ -0,0 +1,14 @@
mkdir -p ~/.ipython/profile_default/startup
cat << 'EOF' > ~/.ipython/profile_default/startup/00-load-env.py
from dotenv import load_dotenv
import os
# Adjust path as needed
env_path = "/workspaces/model/backend/.env"
if os.path.exists(env_path):
load_dotenv(env_path)
print("✔ Loaded .env into Jupyter kernel")
else:
print("⚠ No .env file found to load")
EOF

View file

@ -0,0 +1,17 @@
# fastapi
fastapi==0.115.2
sqlalchemy==2.0.36
pydantic-settings==2.6.0
psycopg2-binary==2.9.10
python-jose==3.3.0
cryptography==43.0.3
mangum==0.19.0
# AWS
boto3==1.35.44
# Data
openpyxl==3.1.2
# Basic
pytz
uvicorn[standard]
sqlmodel

19
.vscode/settings.json vendored Normal file
View file

@ -0,0 +1,19 @@
{
"jupyter.interactiveWindow.textEditor.executeSelection": true,
"python.REPL.sendToNativeREPL": true,
"notebook.output.scrolling": true,
"terminal.integrated.defaultProfile.linux": "bash",
"editor.rulers": [67],
"terminal.integrated.profiles.linux": {
"bash": {
"path": "/bin/bash"
}
},
// Hot reload setting that needs to be in user settings
// "jupyter.runStartupCommands": [
// "%load_ext autoreload", "%autoreload 2"
// ]
}

View file

@ -1,5 +1,7 @@
from functools import lru_cache
from pydantic_settings import BaseSettings
from typing import Optional
class Settings(BaseSettings):
@ -35,9 +37,13 @@ class Settings(BaseSettings):
# Other S3 buckts
ENERGY_ASSESSMENTS_BUCKET: str
class Config:
env_file = "backend/.env"
# Optional AWS creds (only required in local)
AWS_ACCESS_KEY_ID: Optional[str] = None
AWS_SECRET_KEY_ID: Optional[str] = None
AWS_DEFAULT_REGION: Optional[str] = None
class Config:
env_file = "backend.env"
@lru_cache()
def get_settings():

View file

@ -1,5 +1,6 @@
from sqlalchemy import create_engine
from backend.app.config import get_settings
from sqlmodel import Session
connection_string = "postgresql+{drivername}://{username}:{password}@{server}:{port}/{dbname}"
db_string = connection_string.format(
@ -12,3 +13,8 @@ db_string = connection_string.format(
)
db_engine = create_engine(db_string, pool_size=5, max_overflow=5)
def get_db_session():
if db_engine is None:
raise RuntimeError("Database is not configured. Set DATABASE_URL in environment variables.")
return Session(db_engine)

View file

@ -0,0 +1,293 @@
from __future__ import annotations
# ---- Standard Library ----
from typing import Optional, Dict, Any
from datetime import datetime, timezone
from uuid import UUID
import json
# ---- SQLModel / SQLAlchemy ----
from sqlmodel import Session, select
# ---- DB Session ----
from backend.app.db.connection import get_db_session
# ---- Models ----
from backend.app.db.models.tasks import Task, SubTask
# ============================================================
# SubTask Interface
# ============================================================
class SubTaskInterface:
"""
CRUD operations for SubTask + cascading Task progress updates.
"""
# --------------------------------------------------------
# CREATE SUBTASK
# --------------------------------------------------------
def create_subtask(self, task_id: UUID, inputs: Optional[Dict[str, Any]] = None):
now = datetime.now(timezone.utc)
with get_db_session() as session:
task = session.get(Task, task_id)
if not task:
raise ValueError(f"Task {task_id} not found")
subtask = SubTask(
taskId=task_id,
inputs=json.dumps(inputs) if inputs else None,
status="waiting",
jobStarted=None,
jobCompleted=None,
)
session.add(subtask)
session.commit()
session.refresh(subtask)
# Recalculate parent task progress
self._update_task_progress(session, task_id)
return subtask
# --------------------------------------------------------
# UPDATE STATUS (in progress, complete, failed)
# --------------------------------------------------------
def update_subtask_status(self, subtask_id: UUID, status: str):
now = datetime.now(timezone.utc)
with get_db_session() as session:
subtask = session.get(SubTask, subtask_id)
if not subtask:
raise ValueError(f"SubTask {subtask_id} not found")
normalized = status.lower()
# When job really starts
if normalized == "in progress" and subtask.jobStarted is None:
subtask.jobStarted = now
# Completed or failed
if normalized in ("complete", "failed"):
subtask.jobCompleted = now
subtask.status = normalized
subtask.updatedAt = now
session.add(subtask)
session.commit()
# Recalculate task status
self._update_task_progress(session, subtask.taskId)
session.refresh(subtask)
return subtask
# --------------------------------------------------------
# UPDATE OUTPUTS
# --------------------------------------------------------
def update_subtask_output(self, subtask_id: UUID, outputs: Dict[str, Any]):
now = datetime.now(timezone.utc)
with get_db_session() as session:
subtask = session.get(SubTask, subtask_id)
if not subtask:
raise ValueError(f"SubTask {subtask_id} not found")
subtask.outputs = json.dumps(outputs)
subtask.updatedAt = now
session.add(subtask)
session.commit()
session.refresh(subtask)
return subtask
# --------------------------------------------------------
# UPDATE CLOUD LOGS URL
# --------------------------------------------------------
def update_subtask_logs(self, subtask_id: UUID, cloud_logs_url: str):
now = datetime.now(timezone.utc)
with get_db_session() as session:
subtask = session.get(SubTask, subtask_id)
if not subtask:
raise ValueError(f"SubTask {subtask_id} not found")
subtask.cloudLogsURL = cloud_logs_url
subtask.updatedAt = now
session.add(subtask)
session.commit()
session.refresh(subtask)
return subtask
# --------------------------------------------------------
# SET BOTH OUTPUT + LOGS
# --------------------------------------------------------
def set_subtask_result(
self,
subtask_id: UUID,
outputs: Optional[Dict[str, Any]] = None,
cloud_logs_url: Optional[str] = None,
):
now = datetime.now(timezone.utc)
with get_db_session() as session:
subtask = session.get(SubTask, subtask_id)
if not subtask:
raise ValueError(f"SubTask {subtask_id} not found")
if outputs is not None:
subtask.outputs = json.dumps(outputs)
if cloud_logs_url is not None:
subtask.cloudLogsURL = cloud_logs_url
subtask.updatedAt = now
session.add(subtask)
session.commit()
session.refresh(subtask)
return subtask
# --------------------------------------------------------
# TASK PROGRESS CALCULATION
# --------------------------------------------------------
def _update_task_progress(self, session: Session, task_id: UUID):
task = session.get(Task, task_id)
if not task:
return
subtasks = session.exec(
select(SubTask).where(SubTask.taskId == task_id)
).all()
statuses = [s.status.lower() for s in subtasks]
now = datetime.now(timezone.utc)
if "failed" in statuses:
task.status = "failed"
task.jobCompleted = now
elif all(s == "complete" for s in statuses):
task.status = "complete"
task.jobCompleted = now
elif "in progress" in statuses:
task.status = "in progress"
if task.jobStarted is None:
task.jobStarted = now
else:
# All waiting
task.status = "waiting"
task.jobStarted = None
task.jobCompleted = None
task.updatedAt = now
session.add(task)
session.commit()
def finalize_subtask(
self,
subtask_id: UUID,
status: str,
outputs: Optional[Dict[str, Any]],
cloud_logs_url: Optional[str]
):
now = datetime.now(timezone.utc)
with get_db_session() as session:
subtask = session.get(SubTask, subtask_id)
if not subtask:
raise ValueError(f"SubTask {subtask_id} not found")
normalized = status.lower()
if normalized not in ("complete", "failed"):
raise ValueError("Status must be 'complete' or 'failed'")
# Set outputs
if outputs is not None:
subtask.outputs = json.dumps(outputs)
# Set logs
if cloud_logs_url is not None:
subtask.cloudLogsURL = cloud_logs_url
# Status + timestamps
subtask.status = normalized
subtask.jobCompleted = now
subtask.updatedAt = now
session.add(subtask)
session.commit()
# Update parent task (complete/failed)
self._update_task_progress(session, subtask.taskId)
session.refresh(subtask)
return subtask
# ============================================================
# Task Interface
# ============================================================
class TasksInterface:
"""
High-level operations for Task records.
"""
def create_task(
self,
*,
task_source: str,
service: Optional[str] = None,
inputs: Optional[Dict[str, Any]] = None,
):
now = datetime.now(timezone.utc)
with get_db_session() as session:
task = Task(
taskSource=task_source,
service=service,
status="waiting",
jobStarted=None,
jobCompleted=None,
)
session.add(task)
session.commit()
session.refresh(task)
# Create first subtask in waiting state
subtask_interface = SubTaskInterface()
subtask = subtask_interface.create_subtask(
task_id=task.id,
inputs=inputs,
)
return task.id, subtask.id
def update_task_status(self, task_id: UUID, status: str):
now = datetime.now(timezone.utc)
with get_db_session() as session:
task = session.get(Task, task_id)
if not task:
raise ValueError(f"Task {task_id} not found")
normalized = status.lower()
if normalized == "in progress" and task.jobStarted is None:
task.jobStarted = now
if normalized == "complete":
task.jobCompleted = now
task.status = normalized
task.updatedAt = now
session.add(task)
session.commit()
session.refresh(task)
return task

View file

@ -0,0 +1,80 @@
from backend.app.db.connection import get_db_session
from backend.app.db.models.whlg import Whlg
def upsert_whlg_postcode(postcode: str):
"""
Manually upsert a postcode into the WHLG table.
No unique constraint is required.
"""
cleaned = postcode.lower().replace(" ", "")
with get_db_session() as session:
# Check if record exists
existing = session.query(Whlg).filter(Whlg.postcode == cleaned).first()
if existing:
return existing # nothing to update, just return it
# Insert a new row
record = Whlg(postcode=cleaned)
session.add(record)
session.commit()
session.refresh(record)
return record
# One time script to upload 400,000 records in one go with the pay
# of pandas and one insert
from backend.app.db.connection import get_db_session
from backend.app.db.models.whlg import Whlg
from sqlalchemy import select
from sqlalchemy.orm import Session
def upload_whlg_from_dataframe(df):
"""
FAST bulk insert of WHLG postcodes (400k+ rows).
No unique constraint needed.
"""
if "Postcode" not in df.columns:
raise ValueError("DataFrame must contain a 'Postcode' column")
# 1. Clean incoming postcodes
cleaned_postcodes = (
df["Postcode"]
.astype(str)
.str.lower()
.str.replace(" ", "", regex=False)
.dropna()
.unique()
.tolist()
)
with get_db_session() as session:
# 2. Fetch existing postcodes once (VERY FAST)
existing = session.exec(select(Whlg.postcode)).all()
existing_set = set(existing)
# 3. Determine which are new
new_postcodes = [
pc for pc in cleaned_postcodes if pc not in existing_set
]
if not new_postcodes:
return {"inserted": 0, "skipped_existing": len(cleaned_postcodes)}
# 4. Bulk insert new postcodes in one shot
objects = [Whlg(postcode=pc) for pc in new_postcodes]
session.bulk_save_objects(objects)
session.commit()
return {
"inserted": len(new_postcodes),
"skipped_existing": len(cleaned_postcodes) - len(new_postcodes),
"total_provided": len(cleaned_postcodes)
}

View file

@ -0,0 +1,73 @@
from __future__ import annotations
from typing import Optional, List
from datetime import datetime
from uuid import UUID, uuid4
from sqlmodel import SQLModel, Field, Relationship
class Task(SQLModel, table=True):
__tablename__ = "tasks"
id: UUID = Field(
default_factory=uuid4,
primary_key=True,
index=True,
)
taskSource: str = Field(alias="task_source")
jobStarted: Optional[datetime] = Field(
default=None, alias="job_started"
)
jobCompleted: Optional[datetime] = Field(
default=None, alias="job_completed"
)
status: str = Field(default="In Progress")
service: Optional[str] = None
updatedAt: datetime = Field(
default_factory=datetime.utcnow,
alias="updated_at",
)
# Relationship
subTasks: List["SubTask"] = Relationship(back_populates="task")
class SubTask(SQLModel, table=True):
__tablename__ = "sub_task"
id: UUID = Field(
default_factory=uuid4,
primary_key=True,
index=True,
)
taskId: UUID = Field(
foreign_key="tasks.id",
alias="task_id",
)
jobStarted: Optional[datetime] = Field(
default=None, alias="job_started"
)
jobCompleted: Optional[datetime] = Field(
default=None, alias="job_completed"
)
status: str = Field(default="In Progress")
inputs: Optional[str] = None
outputs: Optional[str] = None
cloudLogsURL: Optional[str] = Field(alias="cloud_logs_url")
updatedAt: datetime = Field(
default_factory=datetime.utcnow,
alias="updated_at",
)
# Relationship
task: Optional[Task] = Relationship(back_populates="subTasks")

View file

@ -0,0 +1,15 @@
import uuid
from typing import Optional
from sqlmodel import SQLModel, Field
class Whlg(SQLModel, table=True):
__tablename__ = "whlg"
id: Optional[int] = Field(
default=None,
primary_key=True,
index=True,
)
postcode: str = Field(nullable=False)

View file

@ -31,6 +31,11 @@ def create_dummy_token(secret: str) -> str:
return token
@router.get("/")
async def dummy_token():
return {"hello": "world"}
@router.get("/dummy-token")
async def dummy_token():
settings = get_settings()

View file

@ -6,14 +6,19 @@ from fastapi.encoders import jsonable_encoder
from starlette.exceptions import HTTPException as StarletteHTTPException
from mangum import Mangum
from backend.app.portfolio import router as portfolio_router
from backend.app.whlg import router as whlg_router
from backend.app.plan import router as plan_router
from backend.app.tasks import router as tasks_router
from backend.app.dependencies import validate_api_key
from backend.app.config import get_settings
logger = logging.getLogger("uvicorn.error")
logging.basicConfig(level=logging.INFO)
app = FastAPI(dependencies=[Depends(validate_api_key)])
if get_settings().ENVIRONMENT == "local":
app = FastAPI()
else:
app = FastAPI(dependencies=[Depends(validate_api_key)])
# Handle 422 errors (validation failures)
@ -52,10 +57,76 @@ async def log_requests(request: Request, call_next):
app.include_router(portfolio_router.router, prefix="/v1")
app.include_router(plan_router.router, prefix="/v1")
app.include_router(whlg_router.router, prefix="/v1")
app.include_router(tasks_router.router, prefix="/v1")
if get_settings().ENVIRONMENT == "local":
from app.local import router as local_router
app.include_router(local_router.router)
handler = Mangum(app)
import logging
from fastapi.responses import JSONResponse
from fastapi import FastAPI, Depends, Request, status
from fastapi.exceptions import RequestValidationError
from fastapi.encoders import jsonable_encoder
from starlette.exceptions import HTTPException as StarletteHTTPException
from mangum import Mangum
from backend.app.portfolio import router as portfolio_router
from backend.app.whlg import router as whlg_router
from backend.app.plan import router as plan_router
from backend.app.dependencies import validate_api_key
from backend.app.config import get_settings
logger = logging.getLogger("uvicorn.error")
logging.basicConfig(level=logging.INFO)
if get_settings().ENVIRONMENT == "local":
app = FastAPI()
else:
app = FastAPI(dependencies=[Depends(validate_api_key)])
# Handle 422 errors (validation failures)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
logger.error(f"422 Validation Error at {request.url}")
logger.error(f"Body: {exc.body}")
logger.error(f"Validation Errors: {exc.errors()}")
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content=jsonable_encoder({
"detail": exc.errors(),
"body": exc.body
}),
)
# Handle generic HTTP exceptions (optional, useful for catching 404, 403, etc.)
@app.exception_handler(StarletteHTTPException)
async def http_exception_handler(request: Request, exc: StarletteHTTPException):
logger.warning(f"{exc.status_code} Error at {request.url} - Detail: {exc.detail}")
return JSONResponse(
status_code=exc.status_code,
content={"detail": exc.detail},
)
# Middleware to log requests
@app.middleware("http")
async def log_requests(request: Request, call_next):
logger.info(f"Incoming request: {request.method} {request.url}")
response = await call_next(request)
logger.info(f"Response status: {response.status_code}")
return response
app.include_router(portfolio_router.router, prefix="/v1")
app.include_router(plan_router.router, prefix="/v1")
app.include_router(whlg_router.router, prefix="/v1")
if get_settings().ENVIRONMENT == "local":
from app.local import router as local_router
app.include_router(local_router.router)
handler = Mangum(app)

View file

@ -12,3 +12,5 @@ boto3==1.35.44
openpyxl==3.1.2
# Basic
pytz
sqlmodel

View file

189
backend/app/tasks/router.py Normal file
View file

@ -0,0 +1,189 @@
from fastapi import APIRouter, Depends, HTTPException
from uuid import UUID
import json # ← REQUIRED for json.loads
from backend.app.dependencies import validate_token
from backend.app.tasks.schema import (
CreateTaskRequest,
UpdateTaskStatusRequest,
CreateSubTaskRequest,
UpdateSubTaskStatusRequest,
FinalizeSubTaskRequest,
TaskSqsTriggerRequest
)
# Correct location of interfaces
from backend.app.db.functions.tasks.Tasks import TasksInterface, SubTaskInterface
from backend.app.db.connection import get_db_session
from backend.app.db.models.tasks import Task, SubTask
from sqlmodel import select
router = APIRouter(
prefix="/tasks",
tags=["tasks"],
dependencies=[Depends(validate_token)],
)
# ============================================================
# Create Task
# ============================================================
@router.post("/", summary="Create a new task and its first subtask")
async def create_task(req: CreateTaskRequest):
tasks = TasksInterface()
task_id, subtask_id = tasks.create_task(
task_source=req.task_source,
service=req.service,
inputs=req.inputs,
)
return {"task_id": task_id, "subtask_id": subtask_id}
# ============================================================
# Get Task + Subtasks
# ============================================================
@router.get("/{task_id}", summary="Get a task and its subtasks")
async def get_task(task_id: UUID):
with get_db_session() as session:
task = session.get(Task, task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
subtasks = session.exec(
select(SubTask).where(SubTask.taskId == task_id)
).all()
formatted = []
for st in subtasks:
formatted.append({
**st.dict(),
"inputs": json.loads(st.inputs) if st.inputs else None,
"outputs": json.loads(st.outputs) if st.outputs else None,
"cloud_logs_url": st.cloudLogsURL,
})
return {
"task": task,
"subtasks": formatted,
}
# ============================================================
# Update Task Status
# ============================================================
@router.put("/{task_id}/status", summary="Update a task's status")
async def update_task_status(task_id: UUID, req: UpdateTaskStatusRequest):
tasks = TasksInterface()
try:
updated = tasks.update_task_status(task_id, req.status)
return {"task_id": updated.id, "status": updated.status}
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
# ============================================================
# Create Additional Subtask
# ============================================================
@router.post("/{task_id}/subtasks", summary="Create a new subtask under a task")
async def create_subtask(task_id: UUID, req: CreateSubTaskRequest):
subtasks = SubTaskInterface()
try:
st = subtasks.create_subtask(task_id, req.inputs)
return {"subtask_id": st.id, "task_id": task_id, "status": st.status}
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
# ============================================================
# Update Subtask Status
# ============================================================
@router.put("/subtask/{subtask_id}/status", summary="Update a subtask's status")
async def update_subtask_status(subtask_id: UUID, req: UpdateSubTaskStatusRequest):
subtasks = SubTaskInterface()
try:
st = subtasks.update_subtask_status(subtask_id, req.status)
return {"subtask_id": st.id, "status": st.status}
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
# ===
# Sub task is complete
@router.post("/subtask/{subtask_id}/finalize", summary="Finalize a subtask with status, outputs, logs")
async def finalize_subtask(subtask_id: UUID, req: FinalizeSubTaskRequest):
subtasks = SubTaskInterface()
try:
st = subtasks.finalize_subtask(
subtask_id=subtask_id,
status=req.status,
outputs=req.outputs,
cloud_logs_url=req.cloud_logs_url
)
return {
"subtask_id": st.id,
"status": st.status,
"outputs": req.outputs,
"cloud_logs_url": req.cloud_logs_url,
}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
# for testing:
import boto3
import json
from backend.app.tasks.schema import TaskSqsTriggerRequest
from backend.app.db.functions.tasks.Tasks import TasksInterface, SubTaskInterface
from backend.app.config import get_settings
sqs = boto3.client("sqs")
@router.post("/trigger", summary="Create task + subtask and publish to SQS", status_code=202)
async def trigger_task(req: TaskSqsTriggerRequest):
"""
Creates a Task + SubTask, then pushes the SubTask into SQS so a Lambda can process it.
If inputs are empty, automatically replaced with {}.
"""
settings = get_settings()
tasks = TasksInterface()
# ---- Normalize empty inputs ----
inputs = req.inputs or {} # ensures {} even if null
# ---- 1. Create Task + SubTask ----
task_id, subtask_id = tasks.create_task(
task_source=req.task_source,
service=req.service,
inputs=inputs,
)
# ---- 2. Prepare SQS payload ----
sqs_payload = {
"subtask_id": str(subtask_id),
"params": inputs,
}
try:
response = sqs.send_message(
QueueUrl=f"https://sqs.{settings.AWS_REGION}.amazonaws.com/"
f"{settings.AWS_ACCOUNT_ID}/lambda-example-queue",
MessageBody=json.dumps(sqs_payload)
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"SQS error: {e}")
return {
"message": "Task triggered",
"task_id": task_id,
"subtask_id": subtask_id,
"sqs_message_id": response.get("MessageId"),
"inputs_sent": inputs,
}

View file

@ -0,0 +1,31 @@
from typing import Optional, Any, Dict
from uuid import UUID
from pydantic import BaseModel
class CreateTaskRequest(BaseModel):
task_source: str
service: Optional[str] = None
inputs: Optional[Dict[str, Any]] = None # JSON object
class UpdateTaskStatusRequest(BaseModel):
status: str
class CreateSubTaskRequest(BaseModel):
inputs: Optional[Dict[str, Any]] = None # JSON object
class UpdateSubTaskStatusRequest(BaseModel):
status: str
class FinalizeSubTaskRequest(BaseModel):
status: str # "complete" or "failed"
outputs: Optional[Dict[str, Any]] = None
cloud_logs_url: Optional[str] = None
class TaskSqsTriggerRequest(BaseModel):
task_source: str
service: Optional[str] = None
inputs: Dict[str, Any] # forwarded into SubTask.inputs + SQS message

View file

@ -1,47 +0,0 @@
import boto3
import json
import math
import asyncio
import random
from datetime import datetime
from fastapi import APIRouter, Depends
from backend.app.dependencies import validate_token
from backend.app.plan.schemas import PlanTriggerRequest
from backend.app.config import get_settings
from sqlalchemy.orm import sessionmaker
from utils.logger import setup_logger
from backend.app.db.connection import db_engine
from backend.app.db.functions.recommendations_functions import create_scenario
logger = setup_logger()
router = APIRouter(
prefix="/whlg",
tags=["whlg"],
dependencies=[Depends(validate_token)],
responses={404: {"description": "Not found"}}
)
@router.post("/")
async def whlg_entrypoint(body):
# body needs to include postcode, UPRN [task ID?]
#
# Refer to the plan trigger route for code
# 1) Create an event schema and store it in the schemas file
# 2) Build the tasks functions
# 3) Read in the funding csx. This can be found as such:
# whlg_eligible_postcodes = read_csv_from_s3(
# bucket_name=get_settings().DATA_BUCKET,
# filepath="funding/whlg eligible postcodes.csv",
# )
# whlg_eligible_postcodes = pd.DataFrame(whlg_eligible_postcodes)
# Check the postcode against this file
# We need to store this somewhere????!!!??!??!?!?!?!??!??!??!??!??!??!??!??!??!??! Create a new table!
# Update subtask to be complete
# Once this is complete, build the logs stuff, add the cloudwatch logs ID to the database
print("We're gonna do stuff!")

View file

@ -0,0 +1,78 @@
import boto3
import json
import math
import asyncio
import random
from datetime import datetime
from fastapi import APIRouter, Depends
from backend.app.dependencies import validate_token
from backend.app.plan.schemas import PlanTriggerRequest
from backend.app.config import get_settings
from sqlalchemy.orm import sessionmaker
from utils.logger import setup_logger
from backend.app.db.connection import db_engine
from backend.app.db.functions.recommendations_functions import create_scenario
import pandas as pd
from backend.app.whlg.schema import WHLGElligibilityRequest
from utils.s3 import read_csv_from_s3
from sqlalchemy.dialects.postgresql import insert
from backend.app.db.connection import get_db_session
from backend.app.db.models.whlg import Whlg
from backend.app.db.functions.whlg_functions import upsert_whlg_postcode
logger = setup_logger()
if get_settings().ENVIRONMENT == "local":
router = APIRouter(
prefix="/whlg",
tags=["whlg"],
)
else:
router = APIRouter(
prefix="/whlg",
tags=["whlg"],
dependencies=[Depends(validate_token)],
responses={404: {"description": "Not found"}}
)
@router.get("/")
async def whlg_entrypoint():
# body needs to include postcode, UPRN [task ID?]
#
# Refer to the plan trigger route for code
# 1) Create an event schema and store it in the schemas file
# 2) Build the tasks functions
# 3) Read in the funding csx. This can be found as such:
# whlg_eligible_postcodes = read_csv_from_s3(
# bucket_name=get_settings().DATA_BUCKET,
# filepath="funding/whlg eligible postcodes.csv",
# )
# whlg_eligible_postcodes = pd.DataFrame(whlg_eligible_postcodes)
# Check the postcode against this file
# We need to store this somewhere????!!!??!??!?!?!?!??!??!??!??!??!??!??!??!??!??! Create a new table!
# Update subtask to be complete
# Once this is complete, build the logs stuff, add the cloudwatch logs ID to the database
return {"hello": "from whlg"}
@router.post("/eligible")
async def eligiable(body: WHLGElligibilityRequest):
postcode = body.postcode or ""
postcode = postcode.lower().replace(" ", "")
whlg_eligible_postcodes = read_csv_from_s3(
bucket_name=get_settings().DATA_BUCKET,
filepath="funding/whlg eligible postcodes.csv",
)
whlg_eligible_postcodes = pd.DataFrame(whlg_eligible_postcodes)
whlg_eligible_postcodes['Postcode'] = whlg_eligible_postcodes['Postcode'].str.replace(' ', '', regex=False)
is_eligible = postcode in whlg_eligible_postcodes['Postcode'].values
return {"whlg_eligible": is_eligible}

View file

@ -0,0 +1,4 @@
from pydantic import BaseModel, Field
class WHLGElligibilityRequest(BaseModel):
postcode: str = Field(..., example="B93 8SY")

11
backend/run_curl.sh Normal file
View file

@ -0,0 +1,11 @@
curl -X POST "http://localhost:8000/v1/whlg/eligible" \
-H "Content-Type: application/json" \
-d '{"postcode": "B93 8SY"}'
curl -X POST "http://localhost:8000/v1/whlg/eligible" \
-H "Content-Type: application/json" \
-d '{"postcode": "BN15 0FD"}'
curl -X POST "http://localhost:8000/v1/whlg/eligible" \
-H "Content-Type: application/json" \
-d '{"postcode": "DY6 0LB"}'

6
backend/run_local.sh Normal file
View file

@ -0,0 +1,6 @@
set -a
source ./.env
set +a
uvicorn app.main:app --reload