"""Run the example site-notes PDF through Amazon Textract and save the blocks.

The script uploads a local sample PDF to S3, starts a Textract
document-analysis job on it, polls until the job finishes, pages through the
results and writes the collected blocks to a local JSON file.
"""

import json
import os
import time
from typing import Any, List, Mapping, Optional

import boto3

from utils.logger import setup_logger
from utils.s3 import upload_file_to_s3

logger = setup_logger()

BUCKET = "retrofit-energy-assessments-dev"
PDF_S3_KEY = "example/SiteNotesExample.pdf"
# Sample PDF, resolved relative to this file (two directories up, then
# tests/test_data/).
PDF_LOCAL_PATH = os.path.join(
    os.path.dirname(__file__),
    "..",
    "..",
    "tests",
    "test_data",
    "SiteNotesExample.pdf",
)


def upload_pdf(local_path: str, bucket: str, key: str) -> None:
    """Upload the file at ``local_path`` to ``s3://bucket/key``.

    Delegates the transfer to ``upload_file_to_s3`` and logs before and
    after the call.
    """
    logger.info(f"Uploading {local_path} to s3://{bucket}/{key}")
    upload_file_to_s3(local_path, bucket, key)
    logger.info("Upload complete")


def start_textract_job(bucket: str, key: str) -> str:
    """Start a Textract document-analysis job for an S3 object.

    The job is requested with the ``FORMS`` feature type only.

    Args:
        bucket: Name of the S3 bucket holding the document.
        key: Object key of the document inside ``bucket``.

    Returns:
        The ``JobId`` reported by Textract for the new job.
    """
    client = boto3.client("textract")
    response = client.start_document_analysis(
        DocumentLocation={"S3Object": {"Bucket": bucket, "Name": key}},
        FeatureTypes=["FORMS"],
    )
    job_id: str = response["JobId"]
    logger.info(f"Started Textract job {job_id}")
    return job_id


def wait_for_job(
    job_id: str,
    poll_interval_seconds: int = 5,
    timeout_seconds: Optional[float] = None,
) -> None:
    """Block until the Textract job reaches a terminal status.

    Args:
        job_id: Identifier returned when the job was started.
        poll_interval_seconds: Seconds to sleep between status checks.
        timeout_seconds: Maximum time to keep polling. ``None`` (the
            default) polls without an upper bound.

    Raises:
        RuntimeError: If the job reports ``FAILED`` or a status this
            function does not recognise.
        TimeoutError: If ``timeout_seconds`` elapses while the job is still
            in progress.
    """
    client = boto3.client("textract")
    logger.info(f"Polling Textract job {job_id}...")
    deadline = (
        None if timeout_seconds is None else time.monotonic() + timeout_seconds
    )

    while True:
        # MaxResults=1 keeps the status probe small; the full result set is
        # fetched separately once the job has finished.
        response = client.get_document_analysis(JobId=job_id, MaxResults=1)
        status = response["JobStatus"]
        logger.info(f"Status: {status}")

        if status == "SUCCEEDED":
            return
        if status == "PARTIAL_SUCCESS":
            # Terminal status: polling again would never see a change, so
            # return and let the caller collect whatever was produced.
            logger.warning(
                f"Textract job {job_id} finished with partial success: "
                f"{response.get('StatusMessage')}"
            )
            return
        if status == "FAILED":
            raise RuntimeError(
                f"Textract job {job_id} failed: {response.get('StatusMessage')}"
            )
        if status != "IN_PROGRESS":
            # Anything else is unexpected; fail loudly instead of looping
            # forever on a status that will never become SUCCEEDED.
            raise RuntimeError(
                f"Textract job {job_id} returned unexpected status: {status}"
            )

        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Textract job {job_id} still in progress after "
                f"{timeout_seconds} seconds"
            )
        time.sleep(poll_interval_seconds)


def collect_blocks(job_id: str) -> List[Any]:
    """Fetch every result page of a finished job and return all blocks.

    Follows ``NextToken`` from one response to the next until a response
    carries no token.

    Args:
        job_id: Identifier of the Textract job to read results from.

    Returns:
        The concatenated ``Blocks`` entries from all pages, in page order.
    """
    client = boto3.client("textract")
    blocks: List[Any] = []
    next_token = None

    while True:
        kwargs: dict = {"JobId": job_id, "MaxResults": 1000}
        if next_token:
            kwargs["NextToken"] = next_token
        response = client.get_document_analysis(**kwargs)
        blocks.extend(response.get("Blocks", []))
        next_token = response.get("NextToken")
        if not next_token:
            break

    logger.info(f"Collected {len(blocks)} blocks")
    return blocks


def save_blocks(blocks: List[Any], output_path: str) -> None:
    """Write ``blocks`` to ``output_path`` as indented JSON.

    Values that ``json`` cannot serialise natively are converted with
    ``str``. The file is overwritten if it already exists.
    """
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(blocks, f, indent=2, default=str)
    logger.info(f"Saved blocks to {output_path}")


def handler(event: Mapping[str, Any], context: Any) -> None:
    """Run the full upload -> analyse -> collect -> save pipeline.

    ``event`` and ``context`` are accepted but not read. The blocks are
    written to ``textract_blocks.json`` one directory above this file.
    """
    logger.info("Entered handler")
    output_path = os.path.join(
        os.path.dirname(__file__), "..", "textract_blocks.json"
    )

    upload_pdf(PDF_LOCAL_PATH, BUCKET, PDF_S3_KEY)
    job_id = start_textract_job(BUCKET, PDF_S3_KEY)
    wait_for_job(job_id)
    blocks = collect_blocks(job_id)
    save_blocks(blocks, output_path)

    logger.info("Done")


if __name__ == "__main__":
    handler({}, None)