diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile new file mode 100644 index 00000000..56c366f4 --- /dev/null +++ b/.devcontainer/Dockerfile @@ -0,0 +1,40 @@ +FROM python:3.12-bullseye + +ARG USER=vscode +ARG DEBIAN_FRONTEND=noninteractive + +# 1) Toolchain + utilities for building libpostal +RUN apt-get update && apt-get install -y --no-install-recommends \ + sudo jq vim curl git ca-certificates \ + build-essential pkg-config automake autoconf libtool \ + && rm -rf /var/lib/apt/lists/* + +# # 2) Build and install libpostal from source +# RUN git clone --depth 1 https://github.com/openvenues/libpostal /tmp/libpostal \ +# && cd /tmp/libpostal \ +# && ./bootstrap.sh \ +# && ./configure --datadir=/usr/local/share/libpostal \ +# && make -j"$(nproc)" \ +# && make install \ +# && ldconfig \ +# && rm -rf /tmp/libpostal + +# 3) Create the user and grant sudo privileges +RUN useradd -m -s /usr/bin/bash ${USER} \ + && echo "${USER} ALL=(ALL) NOPASSWD: ALL" >/etc/sudoers.d/${USER} \ + && chmod 0440 /etc/sudoers.d/${USER} + +# 4) Python deps +ENV PIP_NO_CACHE_DIR=1 PIP_DISABLE_PIP_VERSION_CHECK=1 +# Model +# ADD asset_list/requirements.txt requirements.txt +# FASTAPI backend +ADD .devcontainer/requirements.txt requirements.txt +RUN pip install -r requirements.txt + +# 5) Workdir +WORKDIR /workspaces/model + +# 6) Make Python find your package +# Add project root to PYTHONPATH for all processes +ENV PYTHONPATH=/workspaces/model:${PYTHONPATH} diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json new file mode 100644 index 00000000..91a76c3d --- /dev/null +++ b/.devcontainer/devcontainer.json @@ -0,0 +1,31 @@ +{ + "name": "Basic Python", + "dockerComposeFile": "docker-compose.yml", + "service": "model", + "remoteUser": "vscode", + "workspaceFolder": "/workspaces/model", + "postStartCommand": "bash .devcontainer/post-install.sh", + "mounts": [ + // Optional, just makes getting from Downloads (local env) easier + "source=${localEnv:HOME},target=/workspaces/home,type=bind" + ], + "customizations": { + "vscode": { + "settings": { + "files.defaultWorkspace": "/workspaces/model" + }, + "extensions": [ + "ms-python.python", + "ms-toolsai.jupyter", + "mechatroner.rainbow-csv", + "ms-toolsai.datawrangler", + "lindacong.vscode-book-reader", + "4ops.terraform", + "fabiospampinato.vscode-todo-plus", + "jgclark.vscode-todo-highlight", + "corentinartaud.pdfpreview", + "ms-python.vscode-python-envs" + ] + } + } +} diff --git a/.devcontainer/docker-compose.yml b/.devcontainer/docker-compose.yml new file mode 100644 index 00000000..7f60d34d --- /dev/null +++ b/.devcontainer/docker-compose.yml @@ -0,0 +1,18 @@ +version: '3.8' + +services: + model: + user: "${UID}:${GID}" + build: + context: .. + dockerfile: .devcontainer/Dockerfile + command: sleep infinity + volumes: + - ..:/workspaces/model + networks: + - model-net + +networks: + model-net: + driver: bridge + diff --git a/.devcontainer/post-install.sh b/.devcontainer/post-install.sh new file mode 100644 index 00000000..dc6da006 --- /dev/null +++ b/.devcontainer/post-install.sh @@ -0,0 +1,14 @@ +mkdir -p ~/.ipython/profile_default/startup + +cat << 'EOF' > ~/.ipython/profile_default/startup/00-load-env.py +from dotenv import load_dotenv +import os + +# Adjust path as needed +env_path = "/workspaces/model/backend/.env" +if os.path.exists(env_path): + load_dotenv(env_path) + print("✔ Loaded .env into Jupyter kernel") +else: + print("⚠ No .env file found to load") +EOF \ No newline at end of file diff --git a/.devcontainer/requirements.txt b/.devcontainer/requirements.txt new file mode 100644 index 00000000..d8c51f19 --- /dev/null +++ b/.devcontainer/requirements.txt @@ -0,0 +1,17 @@ +# fastapi +fastapi==0.115.2 +sqlalchemy==2.0.36 +pydantic-settings==2.6.0 +psycopg2-binary==2.9.10 +python-jose==3.3.0 +cryptography==43.0.3 +mangum==0.19.0 +# AWS +boto3==1.35.44 +# Data +openpyxl==3.1.2 +# Basic +pytz +uvicorn[standard] +sqlmodel + diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 00000000..27782c10 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,19 @@ +{ + "jupyter.interactiveWindow.textEditor.executeSelection": true, + "python.REPL.sendToNativeREPL": true, + "notebook.output.scrolling": true, + "terminal.integrated.defaultProfile.linux": "bash", + "editor.rulers": [67], + "terminal.integrated.profiles.linux": { + "bash": { + "path": "/bin/bash" + } + }, + + // Hot reload setting that needs to be in user settings + // "jupyter.runStartupCommands": [ + // "%load_ext autoreload", "%autoreload 2" + // ] + + +} \ No newline at end of file diff --git a/backend/app/config.py b/backend/app/config.py index b53d5223..98e1c447 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -1,5 +1,7 @@ from functools import lru_cache from pydantic_settings import BaseSettings +from typing import Optional + class Settings(BaseSettings): @@ -35,9 +37,13 @@ class Settings(BaseSettings): # Other S3 buckts ENERGY_ASSESSMENTS_BUCKET: str - class Config: - env_file = "backend/.env" + # Optional AWS creds (only required in local) + AWS_ACCESS_KEY_ID: Optional[str] = None + AWS_SECRET_KEY_ID: Optional[str] = None + AWS_DEFAULT_REGION: Optional[str] = None + class Config: + env_file = "backend.env" @lru_cache() def get_settings(): diff --git a/backend/app/db/connection.py b/backend/app/db/connection.py index 9efdfd25..fbec9102 100644 --- a/backend/app/db/connection.py +++ b/backend/app/db/connection.py @@ -1,5 +1,6 @@ from sqlalchemy import create_engine from backend.app.config import get_settings +from sqlmodel import Session connection_string = "postgresql+{drivername}://{username}:{password}@{server}:{port}/{dbname}" db_string = connection_string.format( @@ -12,3 +13,8 @@ db_string = connection_string.format( ) db_engine = create_engine(db_string, pool_size=5, max_overflow=5) + +def get_db_session(): + if db_engine is None: + raise RuntimeError("Database is not configured. Set DATABASE_URL in environment variables.") + return Session(db_engine) diff --git a/backend/app/db/functions/tasks/Tasks.py b/backend/app/db/functions/tasks/Tasks.py new file mode 100644 index 00000000..06e1c6fe --- /dev/null +++ b/backend/app/db/functions/tasks/Tasks.py @@ -0,0 +1,293 @@ +from __future__ import annotations + +# ---- Standard Library ---- +from typing import Optional, Dict, Any +from datetime import datetime, timezone +from uuid import UUID +import json + +# ---- SQLModel / SQLAlchemy ---- +from sqlmodel import Session, select + +# ---- DB Session ---- +from backend.app.db.connection import get_db_session + +# ---- Models ---- +from backend.app.db.models.tasks import Task, SubTask + + +# ============================================================ +# SubTask Interface +# ============================================================ +class SubTaskInterface: + """ + CRUD operations for SubTask + cascading Task progress updates. + """ + + # -------------------------------------------------------- + # CREATE SUBTASK + # -------------------------------------------------------- + def create_subtask(self, task_id: UUID, inputs: Optional[Dict[str, Any]] = None): + now = datetime.now(timezone.utc) + + with get_db_session() as session: + task = session.get(Task, task_id) + if not task: + raise ValueError(f"Task {task_id} not found") + + subtask = SubTask( + taskId=task_id, + inputs=json.dumps(inputs) if inputs else None, + status="waiting", + jobStarted=None, + jobCompleted=None, + ) + + session.add(subtask) + session.commit() + session.refresh(subtask) + + # Recalculate parent task progress + self._update_task_progress(session, task_id) + return subtask + + # -------------------------------------------------------- + # UPDATE STATUS (in progress, complete, failed) + # -------------------------------------------------------- + def update_subtask_status(self, subtask_id: UUID, status: str): + now = datetime.now(timezone.utc) + + with get_db_session() as session: + subtask = session.get(SubTask, subtask_id) + if not subtask: + raise ValueError(f"SubTask {subtask_id} not found") + + normalized = status.lower() + + # When job really starts + if normalized == "in progress" and subtask.jobStarted is None: + subtask.jobStarted = now + + # Completed or failed + if normalized in ("complete", "failed"): + subtask.jobCompleted = now + + subtask.status = normalized + subtask.updatedAt = now + + session.add(subtask) + session.commit() + + # Recalculate task status + self._update_task_progress(session, subtask.taskId) + + session.refresh(subtask) + return subtask + + # -------------------------------------------------------- + # UPDATE OUTPUTS + # -------------------------------------------------------- + def update_subtask_output(self, subtask_id: UUID, outputs: Dict[str, Any]): + now = datetime.now(timezone.utc) + + with get_db_session() as session: + subtask = session.get(SubTask, subtask_id) + if not subtask: + raise ValueError(f"SubTask {subtask_id} not found") + + subtask.outputs = json.dumps(outputs) + subtask.updatedAt = now + + session.add(subtask) + session.commit() + session.refresh(subtask) + return subtask + + # -------------------------------------------------------- + # UPDATE CLOUD LOGS URL + # -------------------------------------------------------- + def update_subtask_logs(self, subtask_id: UUID, cloud_logs_url: str): + now = datetime.now(timezone.utc) + + with get_db_session() as session: + subtask = session.get(SubTask, subtask_id) + if not subtask: + raise ValueError(f"SubTask {subtask_id} not found") + + subtask.cloudLogsURL = cloud_logs_url + subtask.updatedAt = now + + session.add(subtask) + session.commit() + session.refresh(subtask) + return subtask + + # -------------------------------------------------------- + # SET BOTH OUTPUT + LOGS + # -------------------------------------------------------- + def set_subtask_result( + self, + subtask_id: UUID, + outputs: Optional[Dict[str, Any]] = None, + cloud_logs_url: Optional[str] = None, + ): + now = datetime.now(timezone.utc) + + with get_db_session() as session: + subtask = session.get(SubTask, subtask_id) + if not subtask: + raise ValueError(f"SubTask {subtask_id} not found") + + if outputs is not None: + subtask.outputs = json.dumps(outputs) + + if cloud_logs_url is not None: + subtask.cloudLogsURL = cloud_logs_url + + subtask.updatedAt = now + session.add(subtask) + session.commit() + session.refresh(subtask) + return subtask + + # -------------------------------------------------------- + # TASK PROGRESS CALCULATION + # -------------------------------------------------------- + def _update_task_progress(self, session: Session, task_id: UUID): + task = session.get(Task, task_id) + if not task: + return + + subtasks = session.exec( + select(SubTask).where(SubTask.taskId == task_id) + ).all() + + statuses = [s.status.lower() for s in subtasks] + now = datetime.now(timezone.utc) + + if "failed" in statuses: + task.status = "failed" + task.jobCompleted = now + + elif all(s == "complete" for s in statuses): + task.status = "complete" + task.jobCompleted = now + + elif "in progress" in statuses: + task.status = "in progress" + if task.jobStarted is None: + task.jobStarted = now + + else: + # All waiting + task.status = "waiting" + task.jobStarted = None + task.jobCompleted = None + + task.updatedAt = now + session.add(task) + session.commit() + + def finalize_subtask( + self, + subtask_id: UUID, + status: str, + outputs: Optional[Dict[str, Any]], + cloud_logs_url: Optional[str] + ): + now = datetime.now(timezone.utc) + + with get_db_session() as session: + subtask = session.get(SubTask, subtask_id) + if not subtask: + raise ValueError(f"SubTask {subtask_id} not found") + + normalized = status.lower() + if normalized not in ("complete", "failed"): + raise ValueError("Status must be 'complete' or 'failed'") + + # Set outputs + if outputs is not None: + subtask.outputs = json.dumps(outputs) + + # Set logs + if cloud_logs_url is not None: + subtask.cloudLogsURL = cloud_logs_url + + # Status + timestamps + subtask.status = normalized + subtask.jobCompleted = now + subtask.updatedAt = now + + session.add(subtask) + session.commit() + + # Update parent task (complete/failed) + self._update_task_progress(session, subtask.taskId) + + session.refresh(subtask) + return subtask + + +# ============================================================ +# Task Interface +# ============================================================ +class TasksInterface: + """ + High-level operations for Task records. + """ + + def create_task( + self, + *, + task_source: str, + service: Optional[str] = None, + inputs: Optional[Dict[str, Any]] = None, + ): + now = datetime.now(timezone.utc) + + with get_db_session() as session: + task = Task( + taskSource=task_source, + service=service, + status="waiting", + jobStarted=None, + jobCompleted=None, + ) + + session.add(task) + session.commit() + session.refresh(task) + + # Create first subtask in waiting state + subtask_interface = SubTaskInterface() + subtask = subtask_interface.create_subtask( + task_id=task.id, + inputs=inputs, + ) + + return task.id, subtask.id + + def update_task_status(self, task_id: UUID, status: str): + now = datetime.now(timezone.utc) + + with get_db_session() as session: + task = session.get(Task, task_id) + if not task: + raise ValueError(f"Task {task_id} not found") + + normalized = status.lower() + + if normalized == "in progress" and task.jobStarted is None: + task.jobStarted = now + + if normalized == "complete": + task.jobCompleted = now + + task.status = normalized + task.updatedAt = now + + session.add(task) + session.commit() + session.refresh(task) + return task diff --git a/backend/app/db/functions/whlg_functions.py b/backend/app/db/functions/whlg_functions.py new file mode 100644 index 00000000..e318d004 --- /dev/null +++ b/backend/app/db/functions/whlg_functions.py @@ -0,0 +1,80 @@ +from backend.app.db.connection import get_db_session +from backend.app.db.models.whlg import Whlg + + +def upsert_whlg_postcode(postcode: str): + """ + Manually upsert a postcode into the WHLG table. + No unique constraint is required. + """ + + cleaned = postcode.lower().replace(" ", "") + + with get_db_session() as session: + # Check if record exists + existing = session.query(Whlg).filter(Whlg.postcode == cleaned).first() + + if existing: + return existing # nothing to update, just return it + + # Insert a new row + record = Whlg(postcode=cleaned) + session.add(record) + session.commit() + session.refresh(record) + + return record + + +# One time script to upload 400,000 records in one go with the pay +# of pandas and one insert +from backend.app.db.connection import get_db_session +from backend.app.db.models.whlg import Whlg +from sqlalchemy import select +from sqlalchemy.orm import Session + + +def upload_whlg_from_dataframe(df): + """ + FAST bulk insert of WHLG postcodes (400k+ rows). + No unique constraint needed. + """ + + if "Postcode" not in df.columns: + raise ValueError("DataFrame must contain a 'Postcode' column") + + # 1. Clean incoming postcodes + cleaned_postcodes = ( + df["Postcode"] + .astype(str) + .str.lower() + .str.replace(" ", "", regex=False) + .dropna() + .unique() + .tolist() + ) + + with get_db_session() as session: + # 2. Fetch existing postcodes once (VERY FAST) + existing = session.exec(select(Whlg.postcode)).all() + existing_set = set(existing) + + # 3. Determine which are new + new_postcodes = [ + pc for pc in cleaned_postcodes if pc not in existing_set + ] + + if not new_postcodes: + return {"inserted": 0, "skipped_existing": len(cleaned_postcodes)} + + # 4. Bulk insert new postcodes in one shot + objects = [Whlg(postcode=pc) for pc in new_postcodes] + + session.bulk_save_objects(objects) + session.commit() + + return { + "inserted": len(new_postcodes), + "skipped_existing": len(cleaned_postcodes) - len(new_postcodes), + "total_provided": len(cleaned_postcodes) + } diff --git a/backend/app/db/models/tasks.py b/backend/app/db/models/tasks.py new file mode 100644 index 00000000..d8007dcd --- /dev/null +++ b/backend/app/db/models/tasks.py @@ -0,0 +1,73 @@ +from __future__ import annotations + +from typing import Optional, List +from datetime import datetime +from uuid import UUID, uuid4 + +from sqlmodel import SQLModel, Field, Relationship + + +class Task(SQLModel, table=True): + __tablename__ = "tasks" + + id: UUID = Field( + default_factory=uuid4, + primary_key=True, + index=True, + ) + + taskSource: str = Field(alias="task_source") + + jobStarted: Optional[datetime] = Field( + default=None, alias="job_started" + ) + jobCompleted: Optional[datetime] = Field( + default=None, alias="job_completed" + ) + + status: str = Field(default="In Progress") + service: Optional[str] = None + + updatedAt: datetime = Field( + default_factory=datetime.utcnow, + alias="updated_at", + ) + + # Relationship + subTasks: List["SubTask"] = Relationship(back_populates="task") + + +class SubTask(SQLModel, table=True): + __tablename__ = "sub_task" + + id: UUID = Field( + default_factory=uuid4, + primary_key=True, + index=True, + ) + + taskId: UUID = Field( + foreign_key="tasks.id", + alias="task_id", + ) + + jobStarted: Optional[datetime] = Field( + default=None, alias="job_started" + ) + jobCompleted: Optional[datetime] = Field( + default=None, alias="job_completed" + ) + + status: str = Field(default="In Progress") + + inputs: Optional[str] = None + outputs: Optional[str] = None + cloudLogsURL: Optional[str] = Field(alias="cloud_logs_url") + + updatedAt: datetime = Field( + default_factory=datetime.utcnow, + alias="updated_at", + ) + + # Relationship + task: Optional[Task] = Relationship(back_populates="subTasks") diff --git a/backend/app/db/models/whlg.py b/backend/app/db/models/whlg.py new file mode 100644 index 00000000..29d907e4 --- /dev/null +++ b/backend/app/db/models/whlg.py @@ -0,0 +1,15 @@ +import uuid +from typing import Optional +from sqlmodel import SQLModel, Field + + +class Whlg(SQLModel, table=True): + __tablename__ = "whlg" + + id: Optional[int] = Field( + default=None, + primary_key=True, + index=True, + ) + + postcode: str = Field(nullable=False) \ No newline at end of file diff --git a/backend/app/local/router.py b/backend/app/local/router.py index 4ebb490c..0977be04 100644 --- a/backend/app/local/router.py +++ b/backend/app/local/router.py @@ -31,6 +31,11 @@ def create_dummy_token(secret: str) -> str: return token +@router.get("/") +async def dummy_token(): + return {"hello": "world"} + + @router.get("/dummy-token") async def dummy_token(): settings = get_settings() diff --git a/backend/app/main.py b/backend/app/main.py index de6f0795..f0ab4d86 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -6,14 +6,19 @@ from fastapi.encoders import jsonable_encoder from starlette.exceptions import HTTPException as StarletteHTTPException from mangum import Mangum from backend.app.portfolio import router as portfolio_router +from backend.app.whlg import router as whlg_router from backend.app.plan import router as plan_router +from backend.app.tasks import router as tasks_router from backend.app.dependencies import validate_api_key from backend.app.config import get_settings logger = logging.getLogger("uvicorn.error") logging.basicConfig(level=logging.INFO) -app = FastAPI(dependencies=[Depends(validate_api_key)]) +if get_settings().ENVIRONMENT == "local": + app = FastAPI() +else: + app = FastAPI(dependencies=[Depends(validate_api_key)]) # Handle 422 errors (validation failures) @@ -52,10 +57,76 @@ async def log_requests(request: Request, call_next): app.include_router(portfolio_router.router, prefix="/v1") app.include_router(plan_router.router, prefix="/v1") +app.include_router(whlg_router.router, prefix="/v1") +app.include_router(tasks_router.router, prefix="/v1") + +if get_settings().ENVIRONMENT == "local": + from app.local import router as local_router + app.include_router(local_router.router) + +handler = Mangum(app) +import logging +from fastapi.responses import JSONResponse +from fastapi import FastAPI, Depends, Request, status +from fastapi.exceptions import RequestValidationError +from fastapi.encoders import jsonable_encoder +from starlette.exceptions import HTTPException as StarletteHTTPException +from mangum import Mangum +from backend.app.portfolio import router as portfolio_router +from backend.app.whlg import router as whlg_router +from backend.app.plan import router as plan_router +from backend.app.dependencies import validate_api_key +from backend.app.config import get_settings + +logger = logging.getLogger("uvicorn.error") +logging.basicConfig(level=logging.INFO) + +if get_settings().ENVIRONMENT == "local": + app = FastAPI() +else: + app = FastAPI(dependencies=[Depends(validate_api_key)]) + + +# Handle 422 errors (validation failures) +@app.exception_handler(RequestValidationError) +async def validation_exception_handler(request: Request, exc: RequestValidationError): + logger.error(f"422 Validation Error at {request.url}") + logger.error(f"Body: {exc.body}") + logger.error(f"Validation Errors: {exc.errors()}") + return JSONResponse( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + content=jsonable_encoder({ + "detail": exc.errors(), + "body": exc.body + }), + ) + + +# Handle generic HTTP exceptions (optional, useful for catching 404, 403, etc.) +@app.exception_handler(StarletteHTTPException) +async def http_exception_handler(request: Request, exc: StarletteHTTPException): + logger.warning(f"{exc.status_code} Error at {request.url} - Detail: {exc.detail}") + return JSONResponse( + status_code=exc.status_code, + content={"detail": exc.detail}, + ) + + +# Middleware to log requests +@app.middleware("http") +async def log_requests(request: Request, call_next): + logger.info(f"Incoming request: {request.method} {request.url}") + response = await call_next(request) + logger.info(f"Response status: {response.status_code}") + return response + + +app.include_router(portfolio_router.router, prefix="/v1") +app.include_router(plan_router.router, prefix="/v1") +app.include_router(whlg_router.router, prefix="/v1") if get_settings().ENVIRONMENT == "local": from app.local import router as local_router - app.include_router(local_router.router) handler = Mangum(app) diff --git a/backend/app/requirements/requirements.txt b/backend/app/requirements/requirements.txt index a213214d..dff7a546 100644 --- a/backend/app/requirements/requirements.txt +++ b/backend/app/requirements/requirements.txt @@ -12,3 +12,5 @@ boto3==1.35.44 openpyxl==3.1.2 # Basic pytz +sqlmodel + diff --git a/backend/app/tasks/__init__.py b/backend/app/tasks/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/backend/app/tasks/router.py b/backend/app/tasks/router.py new file mode 100644 index 00000000..90b62dd1 --- /dev/null +++ b/backend/app/tasks/router.py @@ -0,0 +1,189 @@ +from fastapi import APIRouter, Depends, HTTPException +from uuid import UUID +import json # ← REQUIRED for json.loads + +from backend.app.dependencies import validate_token +from backend.app.tasks.schema import ( + CreateTaskRequest, + UpdateTaskStatusRequest, + CreateSubTaskRequest, + UpdateSubTaskStatusRequest, + FinalizeSubTaskRequest, + TaskSqsTriggerRequest +) + +# Correct location of interfaces +from backend.app.db.functions.tasks.Tasks import TasksInterface, SubTaskInterface + +from backend.app.db.connection import get_db_session +from backend.app.db.models.tasks import Task, SubTask +from sqlmodel import select + + +router = APIRouter( + prefix="/tasks", + tags=["tasks"], + dependencies=[Depends(validate_token)], +) + + +# ============================================================ +# Create Task +# ============================================================ +@router.post("/", summary="Create a new task and its first subtask") +async def create_task(req: CreateTaskRequest): + tasks = TasksInterface() + task_id, subtask_id = tasks.create_task( + task_source=req.task_source, + service=req.service, + inputs=req.inputs, + ) + return {"task_id": task_id, "subtask_id": subtask_id} + + +# ============================================================ +# Get Task + Subtasks +# ============================================================ +@router.get("/{task_id}", summary="Get a task and its subtasks") +async def get_task(task_id: UUID): + with get_db_session() as session: + task = session.get(Task, task_id) + if not task: + raise HTTPException(status_code=404, detail="Task not found") + + subtasks = session.exec( + select(SubTask).where(SubTask.taskId == task_id) + ).all() + + formatted = [] + for st in subtasks: + formatted.append({ + **st.dict(), + "inputs": json.loads(st.inputs) if st.inputs else None, + "outputs": json.loads(st.outputs) if st.outputs else None, + "cloud_logs_url": st.cloudLogsURL, + }) + + return { + "task": task, + "subtasks": formatted, + } + + +# ============================================================ +# Update Task Status +# ============================================================ +@router.put("/{task_id}/status", summary="Update a task's status") +async def update_task_status(task_id: UUID, req: UpdateTaskStatusRequest): + tasks = TasksInterface() + try: + updated = tasks.update_task_status(task_id, req.status) + return {"task_id": updated.id, "status": updated.status} + except ValueError as e: + raise HTTPException(status_code=404, detail=str(e)) + + +# ============================================================ +# Create Additional Subtask +# ============================================================ +@router.post("/{task_id}/subtasks", summary="Create a new subtask under a task") +async def create_subtask(task_id: UUID, req: CreateSubTaskRequest): + subtasks = SubTaskInterface() + try: + st = subtasks.create_subtask(task_id, req.inputs) + return {"subtask_id": st.id, "task_id": task_id, "status": st.status} + except ValueError as e: + raise HTTPException(status_code=404, detail=str(e)) + + +# ============================================================ +# Update Subtask Status +# ============================================================ +@router.put("/subtask/{subtask_id}/status", summary="Update a subtask's status") +async def update_subtask_status(subtask_id: UUID, req: UpdateSubTaskStatusRequest): + subtasks = SubTaskInterface() + try: + st = subtasks.update_subtask_status(subtask_id, req.status) + return {"subtask_id": st.id, "status": st.status} + except ValueError as e: + raise HTTPException(status_code=404, detail=str(e)) + + +# === +# Sub task is complete +@router.post("/subtask/{subtask_id}/finalize", summary="Finalize a subtask with status, outputs, logs") +async def finalize_subtask(subtask_id: UUID, req: FinalizeSubTaskRequest): + subtasks = SubTaskInterface() + + try: + st = subtasks.finalize_subtask( + subtask_id=subtask_id, + status=req.status, + outputs=req.outputs, + cloud_logs_url=req.cloud_logs_url + ) + + return { + "subtask_id": st.id, + "status": st.status, + "outputs": req.outputs, + "cloud_logs_url": req.cloud_logs_url, + } + + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + +# for testing: + +import boto3 +import json +from backend.app.tasks.schema import TaskSqsTriggerRequest +from backend.app.db.functions.tasks.Tasks import TasksInterface, SubTaskInterface +from backend.app.config import get_settings + +sqs = boto3.client("sqs") + +@router.post("/trigger", summary="Create task + subtask and publish to SQS", status_code=202) +async def trigger_task(req: TaskSqsTriggerRequest): + """ + Creates a Task + SubTask, then pushes the SubTask into SQS so a Lambda can process it. + If inputs are empty, automatically replaced with {}. + """ + + settings = get_settings() + + tasks = TasksInterface() + + # ---- Normalize empty inputs ---- + inputs = req.inputs or {} # ensures {} even if null + + # ---- 1. Create Task + SubTask ---- + task_id, subtask_id = tasks.create_task( + task_source=req.task_source, + service=req.service, + inputs=inputs, + ) + + # ---- 2. Prepare SQS payload ---- + sqs_payload = { + "subtask_id": str(subtask_id), + "params": inputs, + } + + try: + response = sqs.send_message( + QueueUrl=f"https://sqs.{settings.AWS_REGION}.amazonaws.com/" + f"{settings.AWS_ACCOUNT_ID}/lambda-example-queue", + MessageBody=json.dumps(sqs_payload) + ) + except Exception as e: + raise HTTPException(status_code=500, detail=f"SQS error: {e}") + + return { + "message": "Task triggered", + "task_id": task_id, + "subtask_id": subtask_id, + "sqs_message_id": response.get("MessageId"), + "inputs_sent": inputs, + } \ No newline at end of file diff --git a/backend/app/tasks/schema.py b/backend/app/tasks/schema.py new file mode 100644 index 00000000..a5b4424b --- /dev/null +++ b/backend/app/tasks/schema.py @@ -0,0 +1,31 @@ +from typing import Optional, Any, Dict +from uuid import UUID +from pydantic import BaseModel + + +class CreateTaskRequest(BaseModel): + task_source: str + service: Optional[str] = None + inputs: Optional[Dict[str, Any]] = None # JSON object + + +class UpdateTaskStatusRequest(BaseModel): + status: str + + +class CreateSubTaskRequest(BaseModel): + inputs: Optional[Dict[str, Any]] = None # JSON object + + +class UpdateSubTaskStatusRequest(BaseModel): + status: str + +class FinalizeSubTaskRequest(BaseModel): + status: str # "complete" or "failed" + outputs: Optional[Dict[str, Any]] = None + cloud_logs_url: Optional[str] = None + +class TaskSqsTriggerRequest(BaseModel): + task_source: str + service: Optional[str] = None + inputs: Dict[str, Any] # forwarded into SubTask.inputs + SQS message \ No newline at end of file diff --git a/backend/app/whlg/route.py b/backend/app/whlg/route.py deleted file mode 100644 index 21d417c5..00000000 --- a/backend/app/whlg/route.py +++ /dev/null @@ -1,47 +0,0 @@ -import boto3 -import json -import math -import asyncio -import random - -from datetime import datetime - -from fastapi import APIRouter, Depends -from backend.app.dependencies import validate_token -from backend.app.plan.schemas import PlanTriggerRequest -from backend.app.config import get_settings -from sqlalchemy.orm import sessionmaker -from utils.logger import setup_logger -from backend.app.db.connection import db_engine - -from backend.app.db.functions.recommendations_functions import create_scenario - -logger = setup_logger() - -router = APIRouter( - prefix="/whlg", - tags=["whlg"], - dependencies=[Depends(validate_token)], - responses={404: {"description": "Not found"}} -) - - -@router.post("/") -async def whlg_entrypoint(body): - # body needs to include postcode, UPRN [task ID?] - # - # Refer to the plan trigger route for code - # 1) Create an event schema and store it in the schemas file - # 2) Build the tasks functions - # 3) Read in the funding csx. This can be found as such: - # whlg_eligible_postcodes = read_csv_from_s3( - # bucket_name=get_settings().DATA_BUCKET, - # filepath="funding/whlg eligible postcodes.csv", - # ) - # whlg_eligible_postcodes = pd.DataFrame(whlg_eligible_postcodes) - # Check the postcode against this file - # We need to store this somewhere????!!!??!??!?!?!?!??!??!??!??!??!??!??!??!??!??! Create a new table! - # Update subtask to be complete - # Once this is complete, build the logs stuff, add the cloudwatch logs ID to the database - - print("We're gonna do stuff!") diff --git a/backend/app/whlg/router.py b/backend/app/whlg/router.py new file mode 100644 index 00000000..3957a3f4 --- /dev/null +++ b/backend/app/whlg/router.py @@ -0,0 +1,78 @@ +import boto3 +import json +import math +import asyncio +import random + +from datetime import datetime + +from fastapi import APIRouter, Depends +from backend.app.dependencies import validate_token +from backend.app.plan.schemas import PlanTriggerRequest +from backend.app.config import get_settings +from sqlalchemy.orm import sessionmaker +from utils.logger import setup_logger +from backend.app.db.connection import db_engine +from backend.app.db.functions.recommendations_functions import create_scenario +import pandas as pd +from backend.app.whlg.schema import WHLGElligibilityRequest + +from utils.s3 import read_csv_from_s3 +from sqlalchemy.dialects.postgresql import insert +from backend.app.db.connection import get_db_session +from backend.app.db.models.whlg import Whlg +from backend.app.db.functions.whlg_functions import upsert_whlg_postcode + +logger = setup_logger() + + +if get_settings().ENVIRONMENT == "local": + router = APIRouter( + prefix="/whlg", + tags=["whlg"], + ) + +else: + router = APIRouter( + prefix="/whlg", + tags=["whlg"], + dependencies=[Depends(validate_token)], + responses={404: {"description": "Not found"}} + ) + +@router.get("/") +async def whlg_entrypoint(): + # body needs to include postcode, UPRN [task ID?] + # + # Refer to the plan trigger route for code + # 1) Create an event schema and store it in the schemas file + # 2) Build the tasks functions + # 3) Read in the funding csx. This can be found as such: + # whlg_eligible_postcodes = read_csv_from_s3( + # bucket_name=get_settings().DATA_BUCKET, + # filepath="funding/whlg eligible postcodes.csv", + # ) + # whlg_eligible_postcodes = pd.DataFrame(whlg_eligible_postcodes) + # Check the postcode against this file + # We need to store this somewhere????!!!??!??!?!?!?!??!??!??!??!??!??!??!??!??!??! Create a new table! + # Update subtask to be complete + # Once this is complete, build the logs stuff, add the cloudwatch logs ID to the database + return {"hello": "from whlg"} + + +@router.post("/eligible") +async def eligiable(body: WHLGElligibilityRequest): + postcode = body.postcode or "" + postcode = postcode.lower().replace(" ", "") + + whlg_eligible_postcodes = read_csv_from_s3( + bucket_name=get_settings().DATA_BUCKET, + filepath="funding/whlg eligible postcodes.csv", + ) + whlg_eligible_postcodes = pd.DataFrame(whlg_eligible_postcodes) + whlg_eligible_postcodes['Postcode'] = whlg_eligible_postcodes['Postcode'].str.replace(' ', '', regex=False) + + is_eligible = postcode in whlg_eligible_postcodes['Postcode'].values + return {"whlg_eligible": is_eligible} + + diff --git a/backend/app/whlg/schema.py b/backend/app/whlg/schema.py index e69de29b..648ecbf3 100644 --- a/backend/app/whlg/schema.py +++ b/backend/app/whlg/schema.py @@ -0,0 +1,4 @@ +from pydantic import BaseModel, Field + +class WHLGElligibilityRequest(BaseModel): + postcode: str = Field(..., example="B93 8SY") \ No newline at end of file diff --git a/backend/run_curl.sh b/backend/run_curl.sh new file mode 100644 index 00000000..22433e39 --- /dev/null +++ b/backend/run_curl.sh @@ -0,0 +1,11 @@ +curl -X POST "http://localhost:8000/v1/whlg/eligible" \ + -H "Content-Type: application/json" \ + -d '{"postcode": "B93 8SY"}' + +curl -X POST "http://localhost:8000/v1/whlg/eligible" \ + -H "Content-Type: application/json" \ + -d '{"postcode": "BN15 0FD"}' + +curl -X POST "http://localhost:8000/v1/whlg/eligible" \ + -H "Content-Type: application/json" \ + -d '{"postcode": "DY6 0LB"}' diff --git a/backend/run_local.sh b/backend/run_local.sh new file mode 100644 index 00000000..be45a54a --- /dev/null +++ b/backend/run_local.sh @@ -0,0 +1,6 @@ +set -a +source ./.env +set +a + +uvicorn app.main:app --reload +