mirror of
https://github.com/Hestia-Homes/Model.git
synced 2026-06-08 11:17:27 +00:00
post code splitter works
This commit is contained in:
parent
d6ea88adf3
commit
8e574c2401
3 changed files with 130 additions and 264 deletions
2
.github/workflows/deploy_terraform.yml
vendored
2
.github/workflows/deploy_terraform.yml
vendored
|
|
@ -77,7 +77,7 @@ jobs:
|
|||
run: terraform plan -var-file=${STAGE}.tfvars -out=tfplan
|
||||
|
||||
- name: Terraform Apply
|
||||
# if: env.STAGE == 'prod'
|
||||
if: env.STAGE == 'prod'
|
||||
working-directory: infrastructure/terraform/shared
|
||||
run: terraform apply -auto-approve tfplan
|
||||
|
||||
|
|
|
|||
|
|
@ -544,8 +544,8 @@ def handler(event, context, local=False):
|
|||
"body": json.dumps(
|
||||
{
|
||||
"task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917",
|
||||
"sub_task_id": "a1b2c3d4-e5f6-7a8b-9c0d-e1f2a3b4c5d6",
|
||||
"s3_uri": "",
|
||||
"sub_task_id": "1c09df07-fd29-4de7-b146-fafb591856a9",
|
||||
"s3_uri": "s3://retrofit-data-dev/ara_postcode_splitter_batches/e31f2f21-175b-4a91-a3ec-a6baa325e917/8673913b-1a88-42d7-8578-0449123d94b0/2026-02-13T15:54:58.568594_67557923.csv",
|
||||
}
|
||||
)
|
||||
}
|
||||
|
|
@ -573,14 +573,14 @@ def handler(event, context, local=False):
|
|||
|
||||
# Validate required fields
|
||||
task_id = body.get("task_id")
|
||||
sub_task_id = body.get("sub_task_id")
|
||||
subtask_id = body.get("sub_task_id")
|
||||
s3_uri = body.get("s3_uri")
|
||||
|
||||
if not task_id:
|
||||
errors.append({"error": "Missing required field: task_id"})
|
||||
continue
|
||||
|
||||
if not sub_task_id:
|
||||
if not subtask_id:
|
||||
errors.append({"error": "Missing required field: sub_task_id"})
|
||||
continue
|
||||
|
||||
|
|
@ -598,7 +598,7 @@ def handler(event, context, local=False):
|
|||
# Convert sub_task_id to UUID
|
||||
try:
|
||||
subtask_id = (
|
||||
UUID(sub_task_id) if isinstance(sub_task_id, str) else sub_task_id
|
||||
UUID(subtask_id) if isinstance(subtask_id, str) else subtask_id
|
||||
)
|
||||
except ValueError as e:
|
||||
errors.append(
|
||||
|
|
@ -756,16 +756,6 @@ def handler(event, context, local=False):
|
|||
except Exception as s3_error:
|
||||
logger.error(f"Failed to save results to S3: {s3_error}")
|
||||
|
||||
results.append(
|
||||
{
|
||||
"subtask_id": str(subtask_id),
|
||||
"postcodes_processed": postcodes_processed,
|
||||
"addresses_processed": addresses_processed,
|
||||
"uprns_found": uprns_found,
|
||||
"status": "processed",
|
||||
}
|
||||
)
|
||||
|
||||
# Mark subtask as completed
|
||||
try:
|
||||
subtask_interface.update_subtask_status(
|
||||
|
|
@ -777,17 +767,6 @@ def handler(event, context, local=False):
|
|||
except Exception as db_error:
|
||||
logger.error(f"Failed to mark subtask as completed: {db_error}")
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"Invalid JSON in request body: {e}")
|
||||
errors.append({"error": "Invalid JSON in request body", "details": str(e)})
|
||||
# Mark subtask as failed if we have one
|
||||
if subtask_id:
|
||||
try:
|
||||
subtask_interface.update_subtask_status(
|
||||
subtask_id, "failed", outputs={"error": str(e)}
|
||||
)
|
||||
except Exception as db_error:
|
||||
logger.error(f"Failed to update subtask status: {db_error}")
|
||||
except Exception as e:
|
||||
logger.error(f"Unexpected error processing record: {e}", exc_info=True)
|
||||
errors.append({"error": "Unexpected error", "details": str(e)})
|
||||
|
|
|
|||
|
|
@ -101,8 +101,9 @@ def send_to_address2uprn_queue(task_id: str, sub_task_id: str, s3_uri: str) -> s
|
|||
|
||||
|
||||
def create_batch_and_send_to_address2uprn(
|
||||
batch_rows: list,
|
||||
batch_df: pd.DataFrame,
|
||||
task_id: str,
|
||||
sub_task_id: str,
|
||||
subtask_interface: SubTaskInterface,
|
||||
bucket_name: str,
|
||||
) -> str:
|
||||
|
|
@ -118,291 +119,177 @@ def create_batch_and_send_to_address2uprn(
|
|||
Returns:
|
||||
The created batch subtask ID
|
||||
"""
|
||||
# Generate unique batch subtask ID
|
||||
batch_sub_task_id = str(uuid4())
|
||||
|
||||
# Upload batch to S3
|
||||
batch_df = pd.DataFrame(batch_rows)
|
||||
s3_uri = upload_batch_to_s3(batch_df, str(task_id), batch_sub_task_id, bucket_name)
|
||||
|
||||
s3_uri = upload_batch_to_s3(batch_df, str(task_id), str(sub_task_id), bucket_name)
|
||||
|
||||
# Create a new subtask for this batch with all inputs
|
||||
created_batch_sub_task_id = subtask_interface.create_subtask(
|
||||
task_id=task_id,
|
||||
inputs={
|
||||
"task_id": str(task_id),
|
||||
"sub_task_id": batch_sub_task_id,
|
||||
"batch_size": len(batch_rows),
|
||||
"s3_uri": s3_uri,
|
||||
},
|
||||
)
|
||||
|
||||
logger.info(f"Created batch subtask {created_batch_sub_task_id}")
|
||||
|
||||
# Send message with S3 reference
|
||||
send_to_address2uprn_queue(
|
||||
task_id=str(task_id),
|
||||
sub_task_id=str(created_batch_sub_task_id),
|
||||
s3_uri=s3_uri,
|
||||
)
|
||||
# # Send message with S3 reference
|
||||
# send_to_address2uprn_queue(
|
||||
# task_id=str(task_id),
|
||||
# sub_task_id=str(created_batch_sub_task_id),
|
||||
# s3_uri=s3_uri,
|
||||
# )
|
||||
|
||||
return created_batch_sub_task_id
|
||||
|
||||
|
||||
def handler(event, context):
|
||||
def handler(event, context, local=False):
|
||||
print(f"Function: {context.function_name}")
|
||||
print(f"Request ID: {context.aws_request_id}")
|
||||
|
||||
# Example SQS message for testing (copy and paste into SQS):
|
||||
# {
|
||||
# "task_id":"e31f2f21-175b-4a91-a3ec-a6baa325e917",
|
||||
# "s3_uri":"s3://retrofit-data-dev/ara_raw_inputs/peabody/2025_11_11 - Peabody - Data Extracts for Domna_transformed.csv"
|
||||
# }
|
||||
|
||||
if local is True:
|
||||
event = {
|
||||
"Records": [
|
||||
{
|
||||
"body": json.dumps(
|
||||
{
|
||||
"task_id": "e31f2f21-175b-4a91-a3ec-a6baa325e917",
|
||||
"sub_task_id": "8673913b-1a88-42d7-8578-0449123d94b0",
|
||||
"s3_uri": "s3://retrofit-data-dev/ara_raw_inputs/peabody/2025_11_11 - Peabody - Data Extracts for Domna_transformed.csv",
|
||||
}
|
||||
)
|
||||
}
|
||||
]
|
||||
}
|
||||
# Handle both single event and batch events (SQS, etc.)
|
||||
records = event.get("Records", [event])
|
||||
results = []
|
||||
errors = []
|
||||
subtask_interface = SubTaskInterface()
|
||||
bucket_name = os.getenv("S3_BUCKET_NAME")
|
||||
if local:
|
||||
bucket_name = "retrofit-data-dev"
|
||||
|
||||
for record in records:
|
||||
if local:
|
||||
record = records[0]
|
||||
task_id = None
|
||||
subtask_id = None
|
||||
try:
|
||||
# Parse body (inputs)
|
||||
if isinstance(record.get("body"), str):
|
||||
body = json.loads(record["body"])
|
||||
else:
|
||||
body = record.get("body", {})
|
||||
# Parse body (inputs)
|
||||
|
||||
# Validate required fields
|
||||
task_id = body.get("task_id")
|
||||
s3_uri = body.get("s3_uri")
|
||||
if isinstance(record.get("body"), str):
|
||||
body = json.loads(record["body"])
|
||||
else:
|
||||
body = record.get("body", {})
|
||||
|
||||
if not task_id:
|
||||
errors.append({"error": "Missing required field: task_id"})
|
||||
continue
|
||||
# Validate required fields
|
||||
task_id = body.get("task_id")
|
||||
subtask_id = body.get("sub_task_id")
|
||||
s3_uri = body.get("s3_uri")
|
||||
|
||||
if not s3_uri:
|
||||
errors.append({"error": "Missing required field: s3_uri"})
|
||||
continue
|
||||
# Convert task_id to UUID
|
||||
task_id = UUID(task_id) if isinstance(task_id, str) else task_id
|
||||
subtask_id = UUID(subtask_id) if isinstance(subtask_id, str) else subtask_id
|
||||
|
||||
# Convert task_id to UUID
|
||||
try:
|
||||
task_id = UUID(task_id) if isinstance(task_id, str) else task_id
|
||||
except ValueError as e:
|
||||
errors.append({"error": f"Invalid UUID format for task_id: {str(e)}"})
|
||||
continue
|
||||
# Mark subtask as in progress
|
||||
subtask_interface.update_subtask_status(subtask_id, "in progress")
|
||||
logger.info(f"Marked subtask {subtask_id} as in progress")
|
||||
|
||||
# Create a new subtask for this postcode splitter invocation
|
||||
subtask_id = subtask_interface.create_subtask(
|
||||
task_id=task_id, inputs={"s3_uri": s3_uri}
|
||||
# Read CSV from S3
|
||||
bucket, key = parse_s3_uri(s3_uri)
|
||||
logger.info(f"S3 Bucket: {bucket}, Key: {key}")
|
||||
|
||||
csv_data = read_csv_from_s3_dict(bucket, key)
|
||||
df = pd.DataFrame(csv_data)
|
||||
|
||||
# TODO: Change the input to the file you want
|
||||
# df = df.head(1983)
|
||||
df = df.head(502)
|
||||
|
||||
logger.info(f"CSV loaded: {len(df)} rows, {len(df.columns)} columns")
|
||||
|
||||
# Sanitise postcodes
|
||||
df["postcode_clean"] = df["postcode"].str.upper().str.replace(" ", "")
|
||||
|
||||
df = df.dropna(subset=["postcode_clean"])
|
||||
|
||||
batch_size = 500
|
||||
if df.shape[0] < batch_size:
|
||||
create_batch_and_send_to_address2uprn(
|
||||
batch_df=df,
|
||||
task_id=task_id,
|
||||
sub_task_id=subtask_id,
|
||||
subtask_interface=subtask_interface,
|
||||
bucket_name=bucket_name,
|
||||
)
|
||||
logger.info(f"Created subtask {subtask_id} for task {task_id}")
|
||||
|
||||
# Mark subtask as in progress
|
||||
subtask_interface.update_subtask_status(subtask_id, "in progress")
|
||||
logger.info(f"Marked subtask {subtask_id} as in progress")
|
||||
|
||||
# Read CSV from S3
|
||||
logger.info(f"Processing S3 URI: {s3_uri}")
|
||||
bucket, key = parse_s3_uri(s3_uri)
|
||||
logger.info(f"S3 Bucket: {bucket}, Key: {key}")
|
||||
|
||||
csv_data = read_csv_from_s3_dict(bucket, key)
|
||||
df = pd.DataFrame(csv_data)
|
||||
|
||||
# df = df.head(1983)
|
||||
df = df.head(5)
|
||||
|
||||
logger.info(f"CSV loaded: {len(df)} rows, {len(df.columns)} columns")
|
||||
|
||||
# Sanitise postcodes
|
||||
df["postcode_clean"] = df["postcode"].str.upper().str.replace(" ", "")
|
||||
|
||||
clean_df = df.dropna(subset=["postcode_clean"])
|
||||
|
||||
else:
|
||||
postcode_to_addresses = {
|
||||
postcode: group.to_dict(orient="records")
|
||||
for postcode, group in clean_df.groupby("postcode_clean", sort=False)
|
||||
postcode: group
|
||||
for postcode, group in df.groupby("postcode_clean", sort=False)
|
||||
}
|
||||
|
||||
logger.info(f"Total postcodes: {len(postcode_to_addresses)}")
|
||||
count = 0
|
||||
buffer = []
|
||||
|
||||
# Calculate total rows to send
|
||||
total_rows = sum(len(rows) for rows in postcode_to_addresses.values())
|
||||
logger.info(f"Total rows to send: {total_rows}")
|
||||
for postcode, group_df in postcode_to_addresses.items():
|
||||
group_len = len(group_df)
|
||||
|
||||
batch_size = 500
|
||||
|
||||
# If all rows fit in one batch, just send them all at once
|
||||
if total_rows <= batch_size:
|
||||
all_rows = []
|
||||
for postcode, rows in postcode_to_addresses.items():
|
||||
all_rows.extend(rows)
|
||||
try:
|
||||
create_batch_and_send_to_address2uprn(
|
||||
batch_rows=all_rows,
|
||||
task_id=task_id,
|
||||
subtask_interface=subtask_interface,
|
||||
bucket_name=bucket_name,
|
||||
)
|
||||
logger.info(
|
||||
f"Sent all {len(all_rows)} rows in single batch to address2UPRN queue"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Failed to send all rows to address2UPRN queue: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
errors.append(
|
||||
{
|
||||
"error": "Failed to send to address2UPRN queue",
|
||||
"details": str(e),
|
||||
}
|
||||
)
|
||||
else:
|
||||
# Multi-batch processing for large datasets
|
||||
batch_rows = []
|
||||
total_sent = 0
|
||||
|
||||
for postcode, rows in postcode_to_addresses.items():
|
||||
logger.info(f"Processing postcode {postcode} with {len(rows)} rows")
|
||||
# If postcode itself is larger than batch_size, send it individually
|
||||
if len(rows) > batch_size:
|
||||
# First, send the current batch if it has data
|
||||
if batch_rows:
|
||||
try:
|
||||
create_batch_and_send_to_address2uprn(
|
||||
batch_rows=batch_rows,
|
||||
task_id=task_id,
|
||||
subtask_interface=subtask_interface,
|
||||
bucket_name=bucket_name,
|
||||
)
|
||||
logger.info(
|
||||
f"Sent batch of {len(batch_rows)} rows to address2UPRN queue"
|
||||
)
|
||||
batch_rows = []
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Failed to send batch to address2UPRN queue: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
errors.append(
|
||||
{
|
||||
"error": "Failed to send to address2UPRN queue",
|
||||
"details": str(e),
|
||||
}
|
||||
)
|
||||
|
||||
# Send the large postcode on its own
|
||||
try:
|
||||
create_batch_and_send_to_address2uprn(
|
||||
batch_rows=rows,
|
||||
task_id=task_id,
|
||||
subtask_interface=subtask_interface,
|
||||
bucket_name=bucket_name,
|
||||
)
|
||||
logger.info(
|
||||
f"Sent large postcode {postcode} ({len(rows)} rows) to address2UPRN queue"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Failed to send large postcode to address2UPRN queue: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
errors.append(
|
||||
{
|
||||
"error": "Failed to send to address2UPRN queue",
|
||||
"details": str(e),
|
||||
}
|
||||
)
|
||||
continue
|
||||
|
||||
# If adding this postcode's rows would exceed batch_size, send current batch
|
||||
current_batch_size = len(batch_rows) + len(rows)
|
||||
if batch_rows and current_batch_size > batch_size:
|
||||
logger.info(
|
||||
f"Batch threshold reached: current {len(batch_rows)} + next postcode {len(rows)} = {current_batch_size} > {batch_size}"
|
||||
)
|
||||
try:
|
||||
create_batch_and_send_to_address2uprn(
|
||||
batch_rows=batch_rows,
|
||||
task_id=task_id,
|
||||
subtask_interface=subtask_interface,
|
||||
bucket_name=bucket_name,
|
||||
)
|
||||
logger.info(
|
||||
f"Sent batch of {len(batch_rows)} rows to address2UPRN queue (total sent: {total_sent})"
|
||||
)
|
||||
total_sent += len(batch_rows)
|
||||
batch_rows = []
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Failed to send batch to address2UPRN queue: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
errors.append(
|
||||
{
|
||||
"error": "Failed to send to address2UPRN queue",
|
||||
"details": str(e),
|
||||
}
|
||||
)
|
||||
|
||||
# Add current postcode's rows to batch
|
||||
batch_rows.extend(rows)
|
||||
|
||||
# Send remaining batch
|
||||
if batch_rows:
|
||||
try:
|
||||
# If single postcode is bigger than batch_size → send directly
|
||||
if group_len >= batch_size:
|
||||
if buffer:
|
||||
create_batch_and_send_to_address2uprn(
|
||||
batch_rows=batch_rows,
|
||||
batch_df=pd.concat(buffer, ignore_index=True),
|
||||
task_id=task_id,
|
||||
sub_task_id=subtask_id,
|
||||
subtask_interface=subtask_interface,
|
||||
bucket_name=bucket_name,
|
||||
)
|
||||
total_sent += len(batch_rows)
|
||||
logger.info(
|
||||
f"Sent final batch of {len(batch_rows)} rows to address2UPRN queue (total sent: {total_sent})"
|
||||
)
|
||||
batch_rows = []
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Failed to send final batch to address2UPRN queue: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
errors.append(
|
||||
{
|
||||
"error": "Failed to send to address2UPRN queue",
|
||||
"details": str(e),
|
||||
}
|
||||
)
|
||||
buffer = []
|
||||
count = 0
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"Invalid JSON in request body: {e}")
|
||||
errors.append({"error": "Invalid JSON in request body", "details": str(e)})
|
||||
# Mark subtask as failed if we have one
|
||||
if subtask_id:
|
||||
try:
|
||||
subtask_interface.update_subtask_status(
|
||||
subtask_id, "failed", outputs={"error": str(e)}
|
||||
create_batch_and_send_to_address2uprn(
|
||||
batch_df=group_df,
|
||||
task_id=task_id,
|
||||
sub_task_id=subtask_id,
|
||||
subtask_interface=subtask_interface,
|
||||
bucket_name=bucket_name,
|
||||
)
|
||||
except Exception as db_error:
|
||||
logger.error(f"Failed to update subtask status: {db_error}")
|
||||
except Exception as e:
|
||||
logger.error(f"Unexpected error processing record: {e}", exc_info=True)
|
||||
errors.append({"error": "Unexpected error", "details": str(e)})
|
||||
# Mark subtask as failed if we have one
|
||||
if subtask_id:
|
||||
try:
|
||||
subtask_interface.update_subtask_status(
|
||||
subtask_id, "failed", outputs={"error": str(e)}
|
||||
)
|
||||
except Exception as db_error:
|
||||
logger.error(f"Failed to update subtask status: {db_error}")
|
||||
continue
|
||||
|
||||
# Return error if all records failed
|
||||
if errors and not results:
|
||||
return {"statusCode": 500, "body": json.dumps({"errors": errors})}
|
||||
# If adding would exceed batch → flush first
|
||||
if count + group_len > batch_size:
|
||||
create_batch_and_send_to_address2uprn(
|
||||
batch_df=pd.concat(buffer, ignore_index=True),
|
||||
task_id=task_id,
|
||||
sub_task_id=subtask_id,
|
||||
subtask_interface=subtask_interface,
|
||||
bucket_name=bucket_name,
|
||||
)
|
||||
buffer = []
|
||||
count = 0
|
||||
|
||||
# Add group
|
||||
buffer.append(group_df)
|
||||
count += group_len
|
||||
|
||||
# Final flush
|
||||
if buffer:
|
||||
create_batch_and_send_to_address2uprn(
|
||||
batch_df=pd.concat(buffer, ignore_index=True),
|
||||
task_id=task_id,
|
||||
sub_task_id=subtask_id,
|
||||
subtask_interface=subtask_interface,
|
||||
bucket_name=bucket_name,
|
||||
)
|
||||
|
||||
# Mark subtask as completed
|
||||
subtask_interface.update_subtask_status(
|
||||
subtask_id,
|
||||
"completed",
|
||||
outputs={"rows_processed": "todo -> show sensible output"},
|
||||
)
|
||||
|
||||
return {
|
||||
"statusCode": 200,
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue