diff --git a/backend/address2UPRN/main.py b/backend/address2UPRN/main.py index 8d1ba21d..0aedd082 100644 --- a/backend/address2UPRN/main.py +++ b/backend/address2UPRN/main.py @@ -10,11 +10,13 @@ from typing import Set import json import requests from uuid import UUID +import uuid from backend.app.db.functions.tasks.Tasks import SubTaskInterface +from utils.s3 import save_csv_to_s3 +from datetime import datetime logger = setup_logger() - EPC_AUTH_TOKEN = os.getenv( "EPC_AUTH_TOKEN", ) @@ -502,6 +504,46 @@ def resolve_uprns_for_postcode_group( ) +def save_results_to_s3( + results_df: pd.DataFrame, task_id: str, sub_task_id: str, bucket_name: str = None +) -> bool: + """ + Save results DataFrame to S3 as CSV. + + :param results_df: The DataFrame containing results + :param task_id: The task ID (used for file naming) + :param bucket_name: The S3 bucket name (defaults to env variable) + :return: True if successful, False otherwise + """ + if bucket_name is None: + bucket_name = os.getenv("S3_BUCKET_NAME") + + if not bucket_name: + logger.error( + "S3 bucket name not provided and S3_BUCKET_NAME environment variable not set" + ) + return False + + try: + # Create a filename with the task ID + file_name = f"{datetime.now().isoformat()}_{str(uuid.uuid4())[:8]}" + file_key = f"ara_raw_outputs/{task_id}/{sub_task_id}/{file_name}.csv" + + # Save to S3 + success = save_csv_to_s3(results_df, bucket_name, file_key) + + if success: + logger.info(f"Successfully saved results to s3://{bucket_name}/{file_key}") + return True + else: + logger.error(f"Failed to save results to S3") + return False + + except Exception as e: + logger.error(f"Error saving results to S3: {str(e)}") + return False + + def test(a, b): assert a == b, f"erorr: {a}{type(a)} != {b}: {type(b)}" @@ -760,7 +802,12 @@ def handler(event, context, local=False): # Create results DataFrame result_df = pd.DataFrame(results_data) - logger.info(f"Created results DataFrame with {len(result_df)} rows") + + # Save results to S3 + try: + save_results_to_s3(result_df, str(task_id), str(subtask_id)) + except Exception as s3_error: + logger.error(f"Failed to save results to S3: {s3_error}") results.append( { diff --git a/backend/postcode_splitter/main.py b/backend/postcode_splitter/main.py index 943435b9..73a79d2c 100644 --- a/backend/postcode_splitter/main.py +++ b/backend/postcode_splitter/main.py @@ -164,6 +164,12 @@ def handler(event, context, local=False): # just do 5 well we are testing, sqs connection if local: df = df.head(5) + + # TODO: DELETE ME, if you see this in the PR. + # TODO: DELETE ME, if you see this in the PR. + # TODO: DELETE ME, if you see this in the PR. + df = df.head(5) + logger.info(f"CSV loaded: {len(df)} rows, {len(df.columns)} columns") # Sanitise postcodes diff --git a/infrastructure/terraform/shared/main.tf b/infrastructure/terraform/shared/main.tf index 5e189dc9..4ec57c3e 100644 --- a/infrastructure/terraform/shared/main.tf +++ b/infrastructure/terraform/shared/main.tf @@ -305,6 +305,21 @@ module "address2uprn_registry" { } +# S3 policy for postcode splitter to read from retrofit data bucket +module "address2uprn_s3_read_and_write" { + source = "../modules/s3_iam_policy" + + policy_name = "Address2UPRNReadandWriteS3" + policy_description = "Allow address2uprn Lambda to read and write from retrofit-data bucket" + bucket_arns = ["arn:aws:s3:::retrofit-data-${var.stage}"] + actions = ["s3:GetObject", "s3:ListBucket", "s3:PutObject"] + resource_paths = ["/*"] +} + +output "postcode_splitter_s3_read_arn" { + value = module.postcode_splitter_s3_read.policy_arn +} + ################################################ # Condition ETL – Lambda ECR ################################################ diff --git a/utils/s3.py b/utils/s3.py index 2e67d4f0..0e79c26b 100644 --- a/utils/s3.py +++ b/utils/s3.py @@ -8,7 +8,6 @@ from botocore.exceptions import NoCredentialsError, PartialCredentialsError logger = setup_logger() - def read_from_s3(bucket_name, s3_file_name): """ Read an object from s3. Decoding of the data is left for outside of this function