# survey-extraction/deployment/lambda/extractor_and_loader/docker/app.py
# 2025-08-19 12:05:40 +00:00
#
# 158 lines
# No EOL
# 4.6 KiB
# Python

import os
import tempfile
import requests
import boto3
from urllib.parse import urlparse
from etl.fileReader.pdfReaderToText import pdfReaderToText
from etl.fileReader.sitenotes import (
SiteNotesExtractor,
WarmHomesConditionReport
)
from uuid import UUID
import json
from typing import Any
from etl.db.db import get_db_session, init_db
from typing import Union
import uuid
from datetime import datetime, timezone
from sqlmodel import select
from sqlalchemy import update
from etl.models.topLevel import uploaded_files
def update_uploaded_file_json_uri_by_query(
    db_session,
    file_id: Union[str, uuid.UUID],
    json_: str,
):
    """
    Update the JSON fields of a single ``uploaded_files`` row.

    Looks the row up by id, stores ``json_`` in ``s3_json_uri``, stamps
    ``s3_json_upload_timestamp`` with the current UTC time, then commits and
    refreshes the object.

    Args:
        db_session: Session object exposing ``query``/``add``/``commit``/
            ``refresh``.
        file_id: Row id. Anything parseable as a UUID is normalised to
            ``uuid.UUID``; any other value is used for the lookup unchanged.
        json_: Value written to ``s3_json_uri``.
            NOTE(review): annotated as ``str``, but the ``handler`` in this
            file passes a dict -- confirm what the column expects.

    Returns:
        The refreshed ORM object.

    Raises:
        ValueError: If no row matches ``file_id``.
    """
    try:
        file_id_norm = uuid.UUID(str(file_id))
    except (ValueError, AttributeError, TypeError):
        file_id_norm = file_id  # leave as-is if not a UUID

    obj = (
        db_session
        .query(uploaded_files)
        .filter(uploaded_files.id == file_id_norm)
        .first()
    )
    # ``first()`` yields None when nothing matches; fail with the documented
    # ValueError instead of an AttributeError on the assignment below.
    if obj is None:
        raise ValueError(f"No uploaded_files row found for id {file_id!r}")

    obj.s3_json_uri = json_
    obj.s3_json_upload_timestamp = datetime.now(timezone.utc)
    db_session.add(obj)
    db_session.commit()
    db_session.refresh(obj)
    return obj
def serialize_model(model: Any):
    """
    Recursively convert models and containers into plain dicts/lists.

    Handling, in order:
      * an object with a callable ``dict`` attribute (e.g. a Pydantic model):
        ``model.dict()`` is called and every value is converted recursively;
      * a plain ``dict``: every value is converted recursively, so models
        stored inside dict values are converted too;
      * a ``list``: every item is converted recursively;
      * anything else is returned unchanged.

    Args:
        model: The value to convert.

    Returns:
        A structure in which every model reachable through dicts and lists
        has been replaced by a plain dict.
    """
    to_dict = getattr(model, "dict", None)
    if callable(to_dict):
        return {k: serialize_model(v) for k, v in to_dict().items()}
    if isinstance(model, dict):
        # Plain dicts have no ``dict`` attribute, so they need their own
        # branch; otherwise nested models would be returned unconverted.
        return {k: serialize_model(v) for k, v in model.items()}
    if isinstance(model, list):
        return [serialize_model(item) for item in model]
    return model
def make_final_json(rooms_obj, heating_system_obj):
    """
    Combine the rooms and heating-system objects into one JSON-ready dict.

    Each argument is passed through ``serialize_model`` and stored under the
    ``"rooms"`` and ``"heating_system"`` keys respectively.

    Returns:
        A dict (not a JSON string) with those two keys.
    """
    return {
        "rooms": serialize_model(rooms_obj),
        "heating_system": serialize_model(heating_system_obj),
    }
def parse_s3_uri(uri: str):
    """
    Parse an S3 URI or virtual-hosted HTTPS S3 URL into ``(bucket, key)``.

    Supported formats:
      - s3://bucket-name/path/to/file
      - https://bucket-name.s3.region.amazonaws.com/path/to/file
      - https://bucket-name.s3.amazonaws.com/path/to/file

    Bucket names containing dots are accepted in the HTTPS form, and the key
    of an HTTPS URL is percent-decoded. For ``s3://`` URIs the key is taken
    verbatim, including any ``?`` or ``#`` characters.

    Args:
        uri: The S3 URI/URL to parse.

    Returns:
        A ``(bucket, key)`` tuple of strings; ``key`` has no leading slash.

    Raises:
        ValueError: If the scheme is unsupported, the HTTPS host is not in
            the ``<bucket>.s3.<...>`` form, or the bucket name is empty.
    """
    from urllib.parse import unquote

    parsed = urlparse(uri)

    if parsed.scheme == "s3":
        # s3://bucket/key
        bucket = parsed.netloc
        # urlparse splits "?" and "#" off the path, but both are ordinary
        # characters in an S3 key, so stitch them back on.
        key = parsed.path
        if parsed.query:
            key += "?" + parsed.query
        if parsed.fragment:
            key += "#" + parsed.fragment
        key = key.lstrip("/")
    elif parsed.scheme in ("http", "https"):
        # https://<bucket>.s3[.<region>].<domain>/key
        labels = parsed.netloc.split(".")
        # The bucket itself may contain dots, so locate the "s3" endpoint
        # label rather than assuming it is the second label. It needs at
        # least one label before it (the bucket) and one after it.
        s3_positions = [
            i for i in range(1, len(labels) - 1) if labels[i] == "s3"
        ]
        if not s3_positions:
            raise ValueError("Not a valid S3 HTTPS URL format")
        bucket = ".".join(labels[: s3_positions[-1]])
        # URL paths are percent-encoded; the object key is the decoded form.
        key = unquote(parsed.path.lstrip("/"))
    else:
        raise ValueError("Unsupported URI scheme")

    if not bucket:
        raise ValueError("S3 URI is missing a bucket name")
    return bucket, key
def download_private_s3_file(uri) -> str:
    """
    Download a private S3 object to the local temp directory.

    The bucket and key are obtained from ``parse_s3_uri(uri)``. The object is
    written to ``tempfile.gettempdir()`` under the key's base name, so two
    keys with the same base name overwrite each other.

    Args:
        uri: S3 URI or HTTPS S3 URL accepted by ``parse_s3_uri``.

    Returns:
        The local path of the downloaded file.

    Raises:
        ValueError: If the URI cannot be parsed or its key has no file name
            (for example, it ends with ``/``).
    """
    bucket_name, key = parse_s3_uri(uri)

    # Where to store the file locally
    filename = os.path.basename(key)
    if not filename:
        # Without a file name the target path would be the temp dir itself.
        raise ValueError(f"S3 URI does not name an object: {uri!r}")
    file_path = os.path.join(tempfile.gettempdir(), filename)

    # SECURITY: no credentials in source. boto3 resolves them through its
    # default provider chain (environment variables, shared config, or the
    # attached IAM role). The access key pair that used to be embedded here
    # must be treated as compromised and rotated.
    s3 = boto3.client("s3", region_name="eu-west-2")

    # Download file
    s3.download_file(bucket_name, key, file_path)
    return file_path
def handler(event, context):
    """
    Lambda entry point: download a report PDF, extract it, store the result.

    Steps: download the PDF from S3, read it with ``pdfReaderToText``, build a
    ``WarmHomesConditionReport`` from the extracted text, combine the first
    two entries of ``master_obj`` via ``make_final_json``, then update the
    matching ``uploaded_files`` row.

    NOTE(review): the file id and URI are hard-coded and ``event`` is not
    read; presumably these should come from the event -- confirm the event
    schema before wiring them in.

    Args:
        event: Lambda event payload (currently unused).
        context: Lambda context object (currently unused).

    Returns:
        ``{"statusCode": 200, ...}`` on success, or
        ``{"statusCode": 500, "body": <error message>}`` if any step raises.
    """
    from pprint import pprint

    try:
        id_ = "2d17c316-65b1-4cc6-ba29-e813a9a989ef"
        file_uri = "https://retrofit-energy-assessments-dev.s3.eu-west-2.amazonaws.com/documents/10034911080/osmosis_condition_pas_2035_report/20250819_120258.pdf"

        local_path = download_private_s3_file(file_uri)
        print(f"File saved to: {local_path}")

        reader = pdfReaderToText(local_path)
        print(reader.text_list)

        obj = WarmHomesConditionReport(reader.text_list)
        json_ = make_final_json(obj.master_obj[0], obj.master_obj[1])
        pprint(json_)

        init_db()
        with get_db_session() as session:
            # NOTE(review): ``json_`` is a dict here, while the update helper
            # annotates this argument as ``str`` -- confirm the column type.
            update_uploaded_file_json_uri_by_query(
                session,
                id_,
                json_
            )
    except Exception as e:
        # Top-level boundary: report the failure to the caller rather than
        # letting the invocation crash.
        print(f"❌ Error: {e}")
        return {
            "statusCode": 500,
            "body": str(e)
        }

    # Mirror the error path's response shape so callers always get a dict.
    return {
        "statusCode": 200,
        "body": f"Processed file {id_}"
    }