mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
101 lines
2.7 KiB
Python
101 lines
2.7 KiB
Python
import json
|
|
import os
|
|
import time
|
|
from typing import Any, List, Mapping
|
|
|
|
import boto3
|
|
|
|
from utils.logger import setup_logger
|
|
from utils.s3 import upload_file_to_s3
|
|
|
|
# Module-wide logger shared by every function in this script.
logger = setup_logger()
|
|
|
|
# S3 bucket the sample PDF is uploaded to and that Textract reads from.
BUCKET = "retrofit-energy-assessments-dev"

# Object key of the sample PDF inside BUCKET.
PDF_S3_KEY = "example/SiteNotesExample.pdf"

# Location of the sample PDF relative to this module's directory:
# two levels up, then tests/test_data/.
_PDF_RELATIVE_PARTS = ("..", "..", "tests", "test_data", "SiteNotesExample.pdf")
PDF_LOCAL_PATH = os.path.join(os.path.dirname(__file__), *_PDF_RELATIVE_PARTS)
|
|
|
|
|
|
def upload_pdf(local_path: str, bucket: str, key: str) -> None:
    """Upload a local file to S3, logging before and after the transfer.

    Args:
        local_path: Path of the file on the local filesystem.
        bucket: Name of the destination S3 bucket.
        key: Object key to store the file under.
    """
    destination = f"s3://{bucket}/{key}"
    logger.info(f"Uploading {local_path} to {destination}")
    upload_file_to_s3(local_path, bucket, key)
    logger.info("Upload complete")
|
|
|
|
|
|
def start_textract_job(bucket: str, key: str) -> str:
    """Start an asynchronous Textract document analysis for an S3 object.

    Only the ``FORMS`` feature type is requested.

    Args:
        bucket: S3 bucket holding the document.
        key: Object key of the document inside ``bucket``.

    Returns:
        The ``JobId`` reported by Textract for the newly started job.
    """
    document_location = {"S3Object": {"Bucket": bucket, "Name": key}}
    textract = boto3.client("textract")
    started = textract.start_document_analysis(
        DocumentLocation=document_location,
        FeatureTypes=["FORMS"],
    )
    job_id: str = started["JobId"]
    logger.info(f"Started Textract job {job_id}")
    return job_id
|
|
|
|
|
|
def wait_for_job(
    job_id: str,
    poll_interval_seconds: int = 5,
    max_wait_seconds: float = float("inf"),
) -> None:
    """Block until a Textract document-analysis job reaches a terminal state.

    The job is polled with ``get_document_analysis`` (requesting a single
    result per call, since only the status is needed here) and the status
    is logged on every poll.

    Args:
        job_id: Identifier returned when the job was started.
        poll_interval_seconds: Seconds to sleep between polls.
        max_wait_seconds: Upper bound on the total time spent waiting.
            Defaults to no limit.

    Raises:
        RuntimeError: If the job reports ``FAILED`` or a status other than
            the documented ones.
        TimeoutError: If the job is still in progress after
            ``max_wait_seconds``.
    """
    client = boto3.client("textract")
    logger.info(f"Polling Textract job {job_id}...")
    deadline = time.monotonic() + max_wait_seconds

    while True:
        response = client.get_document_analysis(JobId=job_id, MaxResults=1)
        status = response["JobStatus"]
        logger.info(f"Status: {status}")

        if status == "SUCCEEDED":
            return
        if status == "PARTIAL_SUCCESS":
            # Terminal state: the job has finished but not every page was
            # processed. Return so the available results can be collected
            # instead of polling forever.
            logger.warning(
                f"Textract job {job_id} partially succeeded: "
                f"{response.get('StatusMessage')}"
            )
            return
        if status == "FAILED":
            raise RuntimeError(
                f"Textract job {job_id} failed: {response.get('StatusMessage')}"
            )
        if status != "IN_PROGRESS":
            # Anything else would never resolve by waiting longer.
            raise RuntimeError(
                f"Textract job {job_id} returned unexpected status: {status}"
            )
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Textract job {job_id} still in progress after "
                f"{max_wait_seconds} seconds"
            )

        time.sleep(poll_interval_seconds)
|
|
|
|
|
|
def collect_blocks(job_id: str) -> List[Any]:
    """Fetch every result block of a Textract job, following pagination.

    Pages of up to 1000 results are requested until the response no longer
    carries a ``NextToken``.

    Args:
        job_id: Identifier of the Textract document-analysis job.

    Returns:
        All entries from the ``Blocks`` field of each page, in page order.
    """
    textract = boto3.client("textract")
    collected: List[Any] = []
    token = None
    has_more = True

    while has_more:
        request: dict = {"JobId": job_id, "MaxResults": 1000}
        if token:
            # Continue from where the previous page stopped.
            request["NextToken"] = token

        page = textract.get_document_analysis(**request)
        collected += page.get("Blocks", [])

        token = page.get("NextToken")
        has_more = bool(token)

    logger.info(f"Collected {len(collected)} blocks")
    return collected
|
|
|
|
|
|
def save_blocks(blocks: List[Any], output_path: str) -> None:
    """Write the blocks to ``output_path`` as indented JSON.

    Args:
        blocks: Blocks to serialise.
        output_path: File to create or overwrite.
    """
    with open(output_path, mode="w") as sink:
        # default=str: values json cannot encode natively are written as
        # their str() form instead of raising.
        json.dump(blocks, sink, default=str, indent=2)
    logger.info(f"Saved blocks to {output_path}")
|
|
|
|
|
|
def handler(event: Mapping[str, Any], context: Any) -> None:
    """Run the full flow: upload the sample PDF, analyse it, save the blocks.

    The PDF at ``PDF_LOCAL_PATH`` is uploaded to ``BUCKET``/``PDF_S3_KEY``,
    a Textract job is started on it and awaited, and the resulting blocks
    are written to ``textract_blocks.json`` one directory above this module.

    Args:
        event: Accepted but not used.
        context: Accepted but not used.
    """
    logger.info("Entered handler")

    upload_pdf(PDF_LOCAL_PATH, BUCKET, PDF_S3_KEY)

    textract_job = start_textract_job(BUCKET, PDF_S3_KEY)
    wait_for_job(textract_job)

    module_dir = os.path.dirname(__file__)
    save_blocks(
        collect_blocks(textract_job),
        os.path.join(module_dir, "..", "textract_blocks.json"),
    )

    logger.info("Done")
|
|
|
|
|
|
# Allow running the flow directly as a script, with an empty event and no context.
if __name__ == "__main__":
    handler(event={}, context=None)
|